目前,我有管理从本地存储检索数据的服务,但也会检查远程网络中是否有任何修改过的数据。它正在使用一个模式和协议类型为Result
的完成处理程序,但希望将其转换为可观察的方法。
以下是当前的逻辑:
struct AuthorWorker: AuthorWorkerType, Loggable {
private let store: AuthorStore
private let remote: AuthorRemote
init(store: AuthorStore, remote: AuthorRemote) {
self.store = store
self.remote = remote
}
}
extension AuthorWorker {
func fetch(id: Int, completion: @escaping (Result<AuthorType, DataError>) -> Void) {
store.fetch(id: id) {
// Immediately return local response
completion($0)
guard case .success(let cacheElement) = $0 else { return }
// Sync remote updates to cache if applicable
remote.fetch(id: id) {
// Validate if any updates occurred and return
guard case .success(let element) = $0,
element.modifiedAt > cacheElement.modifiedAt else {
return
}
// Update local storage with updated data
self.store.createOrUpdate(element) {
guard case .success = $0 else { return }
// Callback handler again if updated
completion($0)
}
}
}
}
}
我总是立即将本地数据返回给ui,这样用户就不用等待了。在后台,它正在检查远程网络中是否有修改过的数据,并仅在必要时再次更新ui。我是这样用的:
authorWorker.fetch(1) { [weak self] in
guard case .success(let value) = $0 else {
// alert error
}
self?.myLabel.text = value.name
}
如何将其转换为rxswift或可观测概念?这是我开始做的,但我还没有看到墙上的代码像尼欧谈到RX,所以我需要帮助看光。
extension AuthorWorker {
func fetch(id: Int) -> Observable<AuthorType> {
return Observable<AuthorType>.create { observer in
store.fetch(id: id) {
// Immediately return local response
observer.on(.next($0))
guard case .success(let cacheElement) = $0 else {
observer.on(.completed)
return
}
// Sync remote updates to cache if applicable
remote.fetch(id: id) {
// Validate if any updates occurred and return
guard case .success(let element) = $0,
element.modifiedAt > cacheElement.modifiedAt else {
observer.on(.completed)
return
}
// Update local storage with updated data
self.store.createOrUpdate(element) {
guard case .success = $0 else {
observer.on(.completed)
return
}
// Callback handler again if updated
observer.on(.next($0))
observer.on(.completed)
}
}
}
}
}
}
那我就这样用它?
authorWorker.fetch(1).subscribe { [weak self] in
guard let element = $0.element else {
// Handle error how?
return
}
self?.myLabel.text = element.name
}
这是正确的方法还是有更推荐的方法?是否也值得将底层的远程和本地存储转换为可观测的,或者不将所有的东西转换为可观测的有意义吗?
最佳答案
新答案
根据我的评论,我知道你想要比我的第一个答案更详细的东西,所以你来吧。
func worker<T: Equatable>(store: Observable<T>, remote: Observable<T>) -> (value: Observable<T>, store: Observable<T>) {
let sharedStore = store.share(replay: 1)
let sharedRemote = remote.share(replay: 1)
let value = Observable.merge(sharedStore, sharedRemote)
.distinctUntilChanged()
.takeUntil(sharedRemote.materialize().filter { $0.isStopEvent })
let store = Observable.zip(sharedStore, sharedRemote)
.filter { $0.0 != $0.1 }
.map { $0.1 }
return (value: value, store: store)
}
以下是在您的AuthorWorker类中使用的上面的代码:
extension AuthorWorker {
func fetch(id: Int) -> Observable<AuthorType> {
let (_value, _store) = worker(store: store.fetch(id: id), remote: remote.fetch(id: id))
_ = _store
.subscribe(onNext: store.createOrUpdate)
return _value
}
}
这里有一个测试套件证明它可以正常工作:
class Tests: XCTestCase {
var scheduler: TestScheduler!
var emission: TestableObserver<String>!
var storage: TestableObserver<String>!
var disposeBag: DisposeBag!
override func setUp() {
super.setUp()
scheduler = TestScheduler(initialClock: 0)
emission = scheduler.createObserver(String.self)
storage = scheduler.createObserver(String.self)
disposeBag = DisposeBag()
}
func testHappyPath() {
let storeProducer = scheduler.createColdObservable([.next(10, "store"), .completed(11)])
let remoteProducer = scheduler.createColdObservable([.next(20, "remote"), .completed(21)])
let (value, store) = worker(store: storeProducer.asObservable(), remote: remoteProducer.asObservable())
disposeBag.insert(
value.subscribe(emission),
store.subscribe(storage)
)
scheduler.start()
XCTAssertEqual(emission.events, [.next(10, "store"), .next(20, "remote"), .completed(21)])
XCTAssertEqual(storage.events, [.next(20, "remote"), .completed(21)])
}
func testSameValue() {
let storeProducer = scheduler.createColdObservable([.next(10, "store"), .completed(11)])
let remoteProducer = scheduler.createColdObservable([.next(20, "store"), .completed(21)])
let (value, store) = worker(store: storeProducer.asObservable(), remote: remoteProducer.asObservable())
disposeBag.insert(
value.subscribe(emission),
store.subscribe(storage)
)
scheduler.start()
XCTAssertEqual(emission.events, [.next(10, "store"), .completed(21)])
XCTAssertEqual(storage.events, [.completed(21)])
}
func testRemoteFirst() {
let storeProducer = scheduler.createColdObservable([.next(20, "store"), .completed(21)])
let remoteProducer = scheduler.createColdObservable([.next(10, "remote"), .completed(11)])
let (value, store) = worker(store: storeProducer.asObservable(), remote: remoteProducer.asObservable())
disposeBag.insert(
value.subscribe(emission),
store.subscribe(storage)
)
scheduler.start()
XCTAssertEqual(emission.events, [.next(10, "remote"), .completed(11)])
XCTAssertEqual(storage.events, [.next(20, "remote"), .completed(21)])
}
func testRemoteFirstSameValue() {
let storeProducer = scheduler.createColdObservable([.next(20, "store"), .completed(21)])
let remoteProducer = scheduler.createColdObservable([.next(10, "store"), .completed(11)])
let (value, store) = worker(store: storeProducer.asObservable(), remote: remoteProducer.asObservable())
disposeBag.insert(
value.subscribe(emission),
store.subscribe(storage)
)
scheduler.start()
XCTAssertEqual(emission.events, [.next(10, "store"), .completed(11)])
XCTAssertEqual(storage.events, [.completed(21)])
}
}
上一个答案
我倾向于这样的用法:
let result = authorWorker.fetch(id: 1)
.share()
result
.map { $0.description }
.catchErrorJustReturn("")
.bind(to: myLabel.rx.text)
.disposed(by: disposeBag)
result
.subscribe(onError: { error in
// handle error here
})
.disposed(by: disposeBag)
如果您有如下所示的东西,则可以完成上述操作:
extension AuthorWorker {
func fetch(id: Int) -> Observable<AuthorType> {
return Observable.merge(store.fetch(id: id), remote.fetch(id: id))
.distinctUntilChanged()
}
}
extension AuthorStore {
func fetch(id: Int) -> Observable<AuthorType> {
return Observable.create { observer in
self.fetch(id: id, completion: { result in
switch result {
case .success(let value):
observer.onNext(value)
observer.onCompleted()
case .failure(let error):
observer.onError(error)
}
})
return Disposables.create()
}
}
}
extension AuthorRemote {
func fetch(id: Int) -> Observable<AuthorType> {
return Observable.create { observer in
self.fetch(id: id, completion: { result in
switch result {
case .success(let value):
observer.onNext(value)
observer.onCompleted()
case .failure(let error):
observer.onError(error)
}
})
return Disposables.create()
}
}
}