我的期望是即时添加可观察对象(例如,上传图像),让它们开始,当我完成对所有内容的动态排队时,等待所有可观察对象完成。

这是我的课:

open class InstantObservables<T> {
    lazy var disposeBag = DisposeBag()

    public init() { }

    lazy var observables: [Observable<T>] = []
    lazy var disposables: [Disposable] = []

    open func enqueue(observable: Observable<T>) {
        observables.append(observable)

        let disposable = observable
            .subscribe()

        disposables.append(disposable)

        disposable
            .addDisposableTo(disposeBag)
    }

    open func removeAndStop(atIndex index: Int) {
        guard observables.indices.contains(index)
            && disposables.indices.contains(index) else {
            return
        }
        let disposable = disposables.remove(at: index)
        disposable.dispose()

        _ = observables.remove(at: index)
    }

    open func waitForAllObservablesToBeFinished() -> Observable<[T]> {
        let multipleObservable = Observable.zip(observables)
        observables.removeAll()
        disposables.removeAll()
        return multipleObservable
    }

    open func cancelObservables() {
        disposeBag = DisposeBag()
    }
}


但是,当我订阅waitForAllObservablesToBeFinished()发送的observable时,它们都将重新执行(这是逻辑,关于Rx的工作原理)。

无论订阅的数量是多少,我如何保证每个都执行一次?

最佳答案

在写问题时,我得到了答案!
通过shareReplay(1)更改可观察对象,并排队并订阅此更改的可观察对象。

这是更新的代码:

open class InstantObservables<T> {
    lazy var disposeBag = DisposeBag()

    public init() { }

    lazy var observables: [Observable<T>] = []
    lazy var disposables: [Disposable] = []

    open func enqueue(observable: Observable<T>) {
        let shared = observable.shareReplay(1)
        observables.append(shared)

        let disposable = shared
            .subscribe()

        disposables.append(disposable)

        disposable
            .addDisposableTo(disposeBag)
    }

    open func removeAndStop(atIndex index: Int) {
        guard observables.indices.contains(index)
            && disposables.indices.contains(index) else {
            return
        }
        let disposable = disposables.remove(at: index)
        disposable.dispose()

        _ = observables.remove(at: index)
    }

    open func waitForAllObservablesToBeFinished() -> Observable<[T]> {
        let multipleObservable = Observable.zip(observables)
        observables.removeAll()
        disposables.removeAll()
        return multipleObservable
    }

    open func cancelObservables() {
        disposeBag = DisposeBag()
    }
}

关于ios - 如何在RxSwift中等待可变数组中的所有可观察对象完成,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/45374112/

10-13 03:59