本文介绍了证明我RxJava中的PublishSubject不是线程安全的的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
已声明PublishSubject在RxJava中不是线程安全的.好的.
It is being declared that PublishSubject is not thread safe in RxJava. Ok.
我试图找到任何示例,我试图构建任何示例来模拟竞争条件,这会导致不良结果.但是我不能:(
I'm trying to find any example, I'm trying to construct any example to emulate race condition, that leads to unwanted results. But I can't :(
任何人都可以提供一个示例来证明PublishSubject不是线程安全的吗?
Can anyone provide an example proving that PublishSubject is not thread safe?
推荐答案
我已经找到了证明.我认为这个示例比提供的@akarnokd更明显.
I've found the proof. I think this example more obvious then @akarnokd provided.
AtomicInteger counter = new AtomicInteger();
// Thread-safe
// SerializedSubject<Object, Object> subject = PublishSubject.create().toSerialized();
// Not Thread Safe
PublishSubject<Object> subject = PublishSubject.create();
Action1<Object> print = (x) -> System.out.println(Thread.currentThread().getName() + " " + counter);
Consumer<Integer> sleep = (s) -> {
try {
Thread.sleep(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
subject
.doOnNext(i -> counter.incrementAndGet())
.doOnNext(i -> counter.decrementAndGet())
.doOnNext(print)
.filter(i -> counter.get() != 0)
.doOnNext(i -> {
throw new NullPointerException("Concurrency detected");
}
)
.subscribe();
Runnable r = () -> {
for (int i = 0; i < 100000; i++) {
sleep.accept(1);
subject.onNext(i);
}
};
ExecutorService pool = Executors.newFixedThreadPool(2);
pool.execute(r);
pool.execute(r);
这篇关于证明我RxJava中的PublishSubject不是线程安全的的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!