我对PublishSubject
中的RxJava
有疑问。
我创建了一个虚拟的PublishSubject,它发出一些对象。这是我的代码:
override fun generate(exportRequest: ExportRequest): Observable<Report> {
val faker = Faker()
val dummyPublisher = PublishSubject.create<Report>()
for(x in 1..1_000){
val dataToExport = DataToExport(UUID.randomUUID(), faker.company().buzzword(), faker.company().name())
val report = Report(dataToExport)
sddPublisher.onNext(report)
Thread.sleep(1)
}
dummyPublisher.onComplete()
return dummyPublisher
}
订阅时,不会发射任何对象。例如,什么都不打印:
... // somewhere in the code
reportStrategy.generate(exportRequest).subscribe { report: Report? ->
println(report)
}
也许我缺少了一些东西。任何帮助将不胜感激
最佳答案
正如@akarnokd在注释中指出的那样,您创建的PublishSubject
立即发出通过其onNext
方法传递给它的任何值。无论当前是否有任何订阅,都会发生这种情况。它的主要目的是帮助弥合命令式或基于回调的代码与反应式代码之间的鸿沟。
您似乎想要的是一个Observable
,一旦有人订阅它,它便开始执行一些同步代码。 Observable.create
是创建此类实例的一种方法,但是正确使用它可能很麻烦。
创建所需内容的一种更方便的方法是Observable.fromPublisher
。它以Publisher
作为参数。 Publisher
本身是一个函数,只要Subscriber
订阅由Observer
创建的Observable
并允许您直接将事件发送到该fromPublisher
,就会将Observer
实例传递给该函数。
您想要的代码如下所示:
fun generateReportStream(genFakeReport: () -> Report): Observable<Report> {
return Observable.fromPublisher { subscriber ->
for (x in 1..1_000) {
val fakeReport = genFakeReport()
subscriber.onNext(fakeReport)
Thread.sleep(1)
}
subscriber.onComplete()
}
}
fun main() {
/** supply whatever logic you want to generate a fake [Report] */
fun genFakeReport(): Report = TODO()
val subscription = generateReportStream(::genFakeReport).subscribe(::println)
}
预订
Observable
返回的generateReportStream
实例后,这将正确发出值。另外,可以对同一实例进行更多预订,并且每个预订将使用相同的逻辑发出新的序列值。关于java - 关于RxJava和PublishSubject的初学者问题,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/60946441/