我正在尝试找出一种方法来消除可变状态,从而消除可能的竞争状况。但是我似乎无法弄清楚如何以某种方式“交织”两个Observable,同时还使用“scan”。

希望通过显示更多代码,我可以给你一个主意:

private val stateRelay: BehaviorRelay<State> = BehaviorRelay.createDefault(initialState ?: DEFAULT_STATE) // maybe this should be `Observable.startWith()` somehow?

fun bindIntents(intents: Observable<Actions>, stateRenderer: StateRenderer) {
    compositeDisposable += intents.concatMap { action ->
        when (action) {
            is Actions.Increment -> {
                Observable.create<Change> { emitter ->
                   // emit things
                }
            }
            is Actions.Decrement -> {
                Observable.create<Change> { emitter ->
                   // emit things
                }
            }
        }
    }.map { change ->
        reducer(stateRelay.value, change) // TODO: figure out how to use scan() here, instead of stateRelay.value! :(
    }.subscribeBy { newState ->
        stateRelay.accept(newState) // there is a chance that the relay shouldn't be here if scan is used
    }

    compositeDisposable +=
        stateRelay // TODO: figure out how to use scan() instead of a relay!
            .distinctUntilChanged()
            .subscribeBy { state ->
                stateRenderer(state)
            }
}

fun unbindIntents() {
    compositeDisposable.clear()
}

因此,我在这种方法中收到了Observable<Actions>,从技术上讲,它是另一端的PublishRelay(应该没问题)。

但是,我应该以某种方式将BehaviorRelay替换为Observable.scan()(可能是startWith)以消除可变状态,但是我似乎无法将自己的头全都束之高阁。

至于涉及的类型,如果需要它们:
private typealias Reducer = (state: State, change: Change) -> State

private typealias StateRenderer = (state: State) -> Unit

@Parcelize
data class State(val count: Int): Parcelable

我如何包装intents.concatMap.map(作为Observable.scan()的一部分(可能还有startWith()replay(1))),以消除我对BehaviorSubject的使用?

最佳答案

我将在上面详细说明我的评论。
这是对代码的简单重写,可以完成您所要的。

fun bindIntents(intents: Observable<Actions>, stateRenderer: StateRenderer) {
    val stateObservable = intents.concatMap { action ->
        when (action) {
            is Actions.Increment -> {
                Observable.create<Change> { emitter ->
                // emit things
                }
            }
            is Actions.Decrement -> {
                Observable.create<Change> { emitter ->
                    // emit things
                }
            }
        }
    }.scan(initialState, { currentState, change -> reducer(currentState, change)})

    compositeDisposable +=
        stateObservable
                .distinctUntilChanged()
                .subscribeBy { state ->
                    stateRenderer(state)
                }
}

请注意,可以通过在下面的表达式中内联我分配给stateObservable的可观察变量,并使用方法引用作为第二个参数来进行扫描,从而进一步简化此过程
fun bindIntents(intents: Observable<Actions>, stateRenderer: StateRenderer) {
    compositeDisposable +=
            intents.concatMap { action ->
                when (action) {
                    is Actions.Increment -> {
                        Observable.create<Change> { emitter ->
                            // emit things
                        }
                    }
                    is Actions.Decrement -> {
                        Observable.create<Change> { emitter ->
                            // emit things
                        }
                    }
                }
            }.scan(initialState, this::reducer)
                    .distinctUntilChanged()
                    .subscribeBy { state ->
                        stateRenderer(state)
                    }
}

08-05 13:07