本文介绍了Rx:结合 ThrottleFirst 和 Sample 运算符的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
给定一个源 observable S,我怎样才能让 RxJava/Rx 产生 observable D,那:
Given a source observable S, how can I ask RxJava / Rx to produce observable D, that:
- 立即从 S 发出第一项
- 在发射每个项目之后和发射下一个项目 L 之前至少等待 T 秒,其中 L 是 S 在等待期间发射的最后一个项目
- 如果 S 在等待期 T 内没有产生任何项目(从第 2 点开始),则在它出现在 S 之后立即发出下一个项目
大理石图:
我想用:
- 示例运算符,但它不满足要求 #3.
- Debounce 运算符,但它也不满足要求 #3.
- ThrottleFirst 运算符,但它不满足要求 #2,因为它不记得 L(而 Sample 会这样做).
我更喜欢最简单的答案,即使用标准运算符(如果可能).
I would prefer the most simple answer, that utilises standard operators (if it is possible).
推荐答案
如果仅限于标准运算符,则可以通过使用 publish
并在两种收集模式之间切换来实现:直接和缓冲时间.在后一种模式下,如果缓冲区为空,则切换回直接模式:
If one is limited to standard operators only, this could be achieved by using publish
and switching between two collection modes: direct, and buffer with time. In the latter mode, if the buffer turns out to be empty, switch back to the direct mode:
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import io.reactivex.*;
import io.reactivex.schedulers.TestScheduler;
public class ThrottleSampleTest {
@Test
public void test() {
TestScheduler tsch = new TestScheduler();
Flowable.fromArray(
100, // should emit 100 at T=100
110, 120, 130, 150, // should emit 150 at T=200
250, 260, // should emit 260 at T=300
400 // should emit 400 at T=400
)
.flatMap(v -> Flowable.timer(v, TimeUnit.MILLISECONDS, tsch).map(w -> v))
.compose(throttleFirstSample(100, TimeUnit.MILLISECONDS, tsch))
.subscribe(v ->
System.out.println(v + " at T=" + tsch.now(TimeUnit.MILLISECONDS))
);
tsch.advanceTimeBy(1, TimeUnit.SECONDS);
}
static final Exception RESTART_INDICATOR = new Exception();
static <T> FlowableTransformer<T, T> throttleFirstSample(
long time, TimeUnit unit, Scheduler scheduler) {
return f ->
f
.publish(g ->
g
.take(1)
.concatWith(
g
.buffer(time, unit, scheduler)
.map(v -> {
if (v.isEmpty()) {
throw RESTART_INDICATOR;
}
return v.get(v.size() - 1);
})
)
.retry(e -> e == RESTART_INDICATOR)
)
;
}
}
另一种方法是使用自定义运算符:
The alternative is to have a custom operator:
@Test
public void testObservable() {
TestScheduler tsch = new TestScheduler();
Observable.fromArray(
100, // should emit 100 at T=100
110, 120, 130, 150, // should emit 150 at T=200
250, 260, // should emit 260 at T=300
400 // should emit 400 at T=400
)
.flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS, tsch).map(w -> v))
.compose(throttleFirstSampleObservable(100, TimeUnit.MILLISECONDS, tsch))
.subscribe(v -> System.out.println(v + " at T=" + tsch.now(TimeUnit.MILLISECONDS)));
tsch.advanceTimeBy(1, TimeUnit.SECONDS);
}
static <T> ObservableTransformer<T, T> throttleFirstSampleObservable(
long time, TimeUnit unit, Scheduler scheduler) {
return f -> new Observable<T>() {
@Override
protected void subscribeActual(Observer<? super T> observer) {
f.subscribe(new ThrottleFirstSampleObserver<T>(
observer, time, unit, scheduler.createWorker()));
}
};
}
static final class ThrottleFirstSampleObserver<T>
extends AtomicInteger
implements Observer<T>, Disposable, Runnable {
private static final long serialVersionUID = 205628968660185683L;
static final Object TIMEOUT = new Object();
final Observer<? super T> actual;
final Queue<Object> queue;
final Worker worker;
final long time;
final TimeUnit unit;
Disposable upstream;
boolean latestMode;
T latest;
volatile boolean done;
Throwable error;
volatile boolean disposed;
ThrottleFirstSampleObserver(Observer<? super T> actual,
long time, TimeUnit unit, Worker worker) {
this.actual = actual;
this.time = time;
this.unit = unit;
this.worker = worker;
this.queue = new ConcurrentLinkedQueue<Object>();
}
@Override
public void onSubscribe(Disposable d) {
upstream = d;
actual.onSubscribe(this);
}
@Override
public void onNext(T t) {
queue.offer(t);
drain();
}
@Override
public void onError(Throwable e) {
error = e;
done = true;
drain();
}
@Override
public void onComplete() {
done = true;
drain();
}
@Override
public boolean isDisposed() {
return upstream.isDisposed();
}
@Override
public void dispose() {
disposed = true;
upstream.dispose();
worker.dispose();
if (getAndIncrement() == 0) {
queue.clear();
latest = null;
}
}
@Override
public void run() {
queue.offer(TIMEOUT);
drain();
}
void drain() {
if (getAndIncrement() != 0) {
return;
}
int missed = 1;
Observer<? super T> a = actual;
Queue<Object> q = queue;
for (;;) {
for (;;) {
if (disposed) {
q.clear();
latest = null;
return;
}
boolean d = done;
Object v = q.poll();
boolean empty = v == null;
if (d && empty) {
if (latestMode) {
T u = latest;
latest = null;
if (u != null) {
a.onNext(u);
}
}
Throwable ex = error;
if (ex != null) {
a.onError(ex);
} else {
a.onComplete();
}
worker.dispose();
return;
}
if (empty) {
break;
}
if (latestMode) {
if (v == TIMEOUT) {
T u = latest;
latest = null;
if (u != null) {
a.onNext(u);
worker.schedule(this, time, unit);
} else {
latestMode = false;
}
} else {
latest = (T)v;
}
} else {
latestMode = true;
a.onNext((T)v);
worker.schedule(this, time, unit);
}
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
}
这篇关于Rx:结合 ThrottleFirst 和 Sample 运算符的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!