目前,我有一个服务负责从本地存储中检索数据,同时也会检查远程网络上是否有修改过的数据。它现在采用的是带有 Result 类型参数的完成处理程序(completion handler)模式,但我希望将其转换为基于 Observable 的方式。
以下是当前的逻辑:

/// Coordinates author lookups between a local store and a remote source.
struct AuthorWorker: AuthorWorkerType, Loggable {
    /// Local storage that is queried first and updated when newer remote data arrives.
    private let store: AuthorStore
    /// Remote source that is checked for modified data.
    private let remote: AuthorRemote

    /// Creates a worker backed by the given local store and remote source.
    ///
    /// - Parameters:
    ///   - store: The local storage to read from and write to.
    ///   - remote: The remote source to check for updates.
    init(store: AuthorStore, remote: AuthorRemote) {
        self.store = store
        self.remote = remote
    }
}

extension AuthorWorker {

    /// Fetches an author, answering from the local store first and again if the remote has newer data.
    ///
    /// `completion` is invoked once with the local result. If that result is a success and the
    /// remote returns an element whose `modifiedAt` is later than the cached one, the element is
    /// written to the local store and `completion` is invoked a second time with the write result.
    /// Remote failures and write failures are silently ignored.
    ///
    /// - Parameters:
    ///   - id: Identifier of the author to fetch.
    ///   - completion: Called one or two times, as described above.
    func fetch(id: Int, completion: @escaping (Result<AuthorType, DataError>) -> Void) {
        store.fetch(id: id) { localResult in
            // Hand the cached result back right away so the caller never waits on the network.
            completion(localResult)

            // Without a cached element there is nothing to compare the remote against.
            guard case .success(let cached) = localResult else { return }

            remote.fetch(id: id) { remoteResult in
                // Only continue when the remote succeeded and is strictly newer than the cache.
                guard case .success(let fresh) = remoteResult,
                    fresh.modifiedAt > cached.modifiedAt else {
                        return
                }

                // Persist the newer element, then notify the caller a second time.
                self.store.createOrUpdate(fresh) { saveResult in
                    guard case .success = saveResult else { return }

                    completion(saveResult)
                }
            }
        }
    }
}

我总是立即将本地数据返回给ui,这样用户就不用等待了。在后台,它正在检查远程网络中是否有修改过的数据,并仅在必要时再次更新ui。我是这样用的:
// Called once with the cached author and possibly a second time after a remote update.
authorWorker.fetch(id: 1) { [weak self] in
    guard case .success(let value) = $0 else {
        // alert error
        // A guard's else branch must leave the scope, otherwise this does not compile.
        return
    }

    self?.myLabel.text = value.name
}

如何将其转换为 RxSwift 或 Observable 的写法?下面是我目前的尝试,但在 Rx 方面,我还无法像《黑客帝国》里的尼奥那样一眼看穿代码,所以需要有人帮我指点迷津。
extension AuthorWorker {

    /// Fetches an author as an observable sequence.
    ///
    /// The sequence emits the locally cached author first. If the remote returns an element whose
    /// `modifiedAt` is later than the cached one, that element is written to the local store and,
    /// when the write succeeds, emitted as a second value. The sequence then completes.
    ///
    /// A failure of the local fetch terminates the sequence with that error. Remote failures and
    /// write failures are not surfaced; the sequence simply completes after the cached value.
    ///
    /// - Parameter id: Identifier of the author to fetch.
    /// - Returns: An observable emitting one or two authors.
    func fetch(id: Int) -> Observable<AuthorType> {
        return Observable<AuthorType>.create { observer in
            self.store.fetch(id: id) { localResult in
                switch localResult {
                case .failure(let error):
                    // An Observable<AuthorType> cannot carry a Result, so report failure as an error event.
                    observer.onError(error)

                case .success(let cached):
                    // Immediately emit the local value.
                    observer.onNext(cached)

                    // Sync remote updates to the cache if applicable.
                    self.remote.fetch(id: id) { remoteResult in
                        guard case .success(let fresh) = remoteResult,
                            fresh.modifiedAt > cached.modifiedAt else {
                                observer.onCompleted()
                                return
                        }

                        // Update local storage, then emit the stored element if the write succeeded.
                        self.store.createOrUpdate(fresh) { saveResult in
                            if case .success(let saved) = saveResult {
                                observer.onNext(saved)
                            }
                            observer.onCompleted()
                        }
                    }
                }
            }

            // `create` requires a Disposable; the underlying fetches expose nothing to cancel.
            return Disposables.create()
        }
    }
}

那我就这样用它?
// NOTE: keep the Disposable returned by `subscribe` (for example in a DisposeBag)
// so the subscription is released together with its owner.
authorWorker.fetch(id: 1).subscribe { [weak self] event in
    // Switch over the event so that `.completed` is not mistaken for a failure.
    switch event {
    case .next(let element):
        self?.myLabel.text = element.name
    case .error:
        // Handle the error here, e.g. present an alert.
        break
    case .completed:
        break
    }
}

这是正确的做法吗,还是有更推荐的方式?另外,是否值得把底层的远程存储和本地存储也转换为 Observable,还是说没有必要把所有东西都转换为 Observable?

最佳答案

新答案
根据评论,我了解到你想要一个比我第一个答案更详细的方案,下面就是。

/// Combines a local and a remote source into a UI stream and a write-back stream.
///
/// - Parameters:
///   - store: Sequence producing the locally stored value.
///   - remote: Sequence producing the remote value.
/// - Returns: A tuple where
///   - `value` merges both sources, drops consecutive duplicates and stops as soon as the
///     remote sequence terminates;
///   - `store` pairs the local and remote values and emits the remote one only when the two
///     differ, i.e. the values that should be written back to local storage.
func worker<T: Equatable>(store: Observable<T>, remote: Observable<T>) -> (value: Observable<T>, store: Observable<T>) {
    // Share both sources so the two derived streams reuse a single underlying subscription each.
    let local = store.share(replay: 1)
    let network = remote.share(replay: 1)

    // Emits once the remote sequence has produced a stop event (completed or error).
    let remoteStopped = network
        .materialize()
        .filter { $0.isStopEvent }

    let value = Observable
        .merge(local, network)
        .distinctUntilChanged()
        .takeUntil(remoteStopped)

    // Named differently from the `store` parameter to avoid shadowing it.
    let pendingWrites = Observable
        .zip(local, network)
        .filter { pair in pair.0 != pair.1 }
        .map { pair in pair.1 }

    return (value: value, store: pendingWrites)
}

以下是在您的 AuthorWorker 中使用上述代码的方式:
extension AuthorWorker {
    /// Fetches an author from the local store and the remote via `worker`.
    ///
    /// The write-back stream is subscribed to immediately when this method is called, and every
    /// value it emits is passed to `store.createOrUpdate`. The resulting disposable is discarded.
    ///
    /// - Parameter id: Identifier of the author to fetch.
    /// - Returns: The `value` stream produced by `worker`.
    func fetch(id: Int) -> Observable<AuthorType> {
        let streams = worker(store: store.fetch(id: id), remote: remote.fetch(id: id))

        // Persist remote values that differ from what is stored locally.
        _ = streams.store
            .subscribe(onNext: store.createOrUpdate)

        return streams.value
    }
}

这里有一个测试套件证明它可以正常工作:
/// Verifies the two streams returned by `worker` against scripted local and remote sources.
class Tests: XCTestCase {

    var scheduler: TestScheduler!
    /// Records what the `value` stream emits.
    var emission: TestableObserver<String>!
    /// Records what the `store` (write-back) stream emits.
    var storage: TestableObserver<String>!
    var disposeBag: DisposeBag!

    override func setUp() {
        super.setUp()
        disposeBag = DisposeBag()
        scheduler = TestScheduler(initialClock: 0)
        emission = scheduler.createObserver(String.self)
        storage = scheduler.createObserver(String.self)
    }

    /// Local value arrives first, remote value differs: both are emitted and the remote one is stored.
    func testHappyPath() {
        let local = scheduler.createColdObservable([.next(10, "store"), .completed(11)])
        let network = scheduler.createColdObservable([.next(20, "remote"), .completed(21)])
        let outputs = worker(store: local.asObservable(), remote: network.asObservable())

        disposeBag.insert(
            outputs.value.subscribe(emission),
            outputs.store.subscribe(storage)
        )
        scheduler.start()

        XCTAssertEqual(emission.events, [.next(10, "store"), .next(20, "remote"), .completed(21)])
        XCTAssertEqual(storage.events, [.next(20, "remote"), .completed(21)])
    }

    /// Local value arrives first, remote value is identical: emitted once, nothing stored.
    func testSameValue() {
        let local = scheduler.createColdObservable([.next(10, "store"), .completed(11)])
        let network = scheduler.createColdObservable([.next(20, "store"), .completed(21)])
        let outputs = worker(store: local.asObservable(), remote: network.asObservable())

        disposeBag.insert(
            outputs.value.subscribe(emission),
            outputs.store.subscribe(storage)
        )
        scheduler.start()

        XCTAssertEqual(emission.events, [.next(10, "store"), .completed(21)])
        XCTAssertEqual(storage.events, [.completed(21)])
    }

    /// Remote value arrives before the local one: only the remote value is emitted, and it is stored.
    func testRemoteFirst() {
        let local = scheduler.createColdObservable([.next(20, "store"), .completed(21)])
        let network = scheduler.createColdObservable([.next(10, "remote"), .completed(11)])
        let outputs = worker(store: local.asObservable(), remote: network.asObservable())

        disposeBag.insert(
            outputs.value.subscribe(emission),
            outputs.store.subscribe(storage)
        )
        scheduler.start()

        XCTAssertEqual(emission.events, [.next(10, "remote"), .completed(11)])
        XCTAssertEqual(storage.events, [.next(20, "remote"), .completed(21)])
    }

    /// Remote value arrives first and matches the local one: emitted once, nothing stored.
    func testRemoteFirstSameValue() {
        let local = scheduler.createColdObservable([.next(20, "store"), .completed(21)])
        let network = scheduler.createColdObservable([.next(10, "store"), .completed(11)])
        let outputs = worker(store: local.asObservable(), remote: network.asObservable())

        disposeBag.insert(
            outputs.value.subscribe(emission),
            outputs.store.subscribe(storage)
        )
        scheduler.start()

        XCTAssertEqual(emission.events, [.next(10, "store"), .completed(11)])
        XCTAssertEqual(storage.events, [.completed(21)])
    }
}

上一个答案
我倾向于这样的用法:
// Share the fetch so the two subscriptions below reuse one underlying sequence.
let result = authorWorker.fetch(id: 1)
    .share()

// Drive the label from the author's description; on error, fall back to an empty string.
result
    .map { $0.description }
    .catchErrorJustReturn("")
    .bind(to: myLabel.rx.text)
    .disposed(by: disposeBag)

// Separate subscription that only reacts to errors.
result
    .subscribe(onError: { error in
        // handle error here
    })
    .disposed(by: disposeBag)

如果您有如下所示的东西,则可以完成上述操作:
extension AuthorWorker {

    /// Fetches an author from both the local store and the remote.
    ///
    /// Values from either source are forwarded as they arrive; a value equal to the one emitted
    /// immediately before it is dropped.
    ///
    /// - Parameter id: Identifier of the author to fetch.
    /// - Returns: The merged, de-duplicated sequence of authors.
    func fetch(id: Int) -> Observable<AuthorType> {
        let cached: Observable<AuthorType> = store.fetch(id: id)
        let fresh: Observable<AuthorType> = remote.fetch(id: id)

        return Observable
            .merge(cached, fresh)
            .distinctUntilChanged()
    }
}

extension AuthorStore {
    /// Observable wrapper around the completion-based `fetch(id:completion:)`.
    ///
    /// On success the sequence emits the fetched author and completes; on failure it terminates
    /// with the reported error. The returned disposable performs no cancellation.
    ///
    /// - Parameter id: Identifier of the author to fetch.
    /// - Returns: A sequence emitting at most one author.
    func fetch(id: Int) -> Observable<AuthorType> {
        return Observable.create { observer in
            self.fetch(id: id, completion: { outcome in
                switch outcome {
                case .failure(let error):
                    observer.onError(error)
                case .success(let author):
                    observer.onNext(author)
                    observer.onCompleted()
                }
            })

            return Disposables.create()
        }
    }
}

extension AuthorRemote {
    /// Observable wrapper around the completion-based `fetch(id:completion:)`.
    ///
    /// On success the sequence emits the fetched author and completes; on failure it terminates
    /// with the reported error. The returned disposable performs no cancellation.
    ///
    /// - Parameter id: Identifier of the author to fetch.
    /// - Returns: A sequence emitting at most one author.
    func fetch(id: Int) -> Observable<AuthorType> {
        return Observable.create { observer in
            self.fetch(id: id, completion: { outcome in
                switch outcome {
                case .failure(let error):
                    observer.onError(error)
                case .success(let author):
                    observer.onNext(author)
                    observer.onCompleted()
                }
            })

            return Disposables.create()
        }
    }
}

10-01 02:24
查看更多