Problem description
I have incoming processing requests, and I do not want too many of them to be processed concurrently because they deplete shared resources. I would also prefer that requests which share the same key are not executed concurrently:
def process(request: Request): Observable[Answer] = ???

requestsStream
  .groupBy(request => request.key)
  .flatMap(maxConcurrentProcessing, { case (key, requestsForKey) =>
    requestsForKey
      .flatMap(1, process)
  })
However, the above doesn't work. The per-key observables created by groupBy never complete, so each key permanently occupies one of the maxConcurrentProcessing slots. What is the correct way to achieve this?
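The sketch below is a simplified model of the slot accounting in flatMap(maxConcurrent, ...). FlatMapSlotModel is a hypothetical class, not RxJava code, and it models only the idea: at most maxConcurrent inner observables are subscribed at a time, and a queued one is subscribed only when an active one completes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model (not RxJava): how flatMap(maxConcurrent, ...) hands out slots
// to inner observables, e.g. the per-key groups emitted by groupBy.
final class FlatMapSlotModel {
    private final int maxConcurrent;
    private final List<String> active = new ArrayList<>();   // subscribed inner streams
    private final Deque<String> waiting = new ArrayDeque<>(); // inner streams queued for a slot

    FlatMapSlotModel(int maxConcurrent) {
        this.maxConcurrent = maxConcurrent;
    }

    // A new inner stream (a new group) arrives: subscribe if a slot is free, else queue it.
    void onInner(String key) {
        if (active.size() < maxConcurrent) {
            active.add(key);
        } else {
            waiting.add(key);
        }
    }

    // Only completion frees a slot, which then goes to the oldest waiting stream.
    void onInnerComplete(String key) {
        if (active.remove(key) && !waiting.isEmpty()) {
            active.add(waiting.poll());
        }
    }

    List<String> active() { return active; }
    Deque<String> waiting() { return waiting; }
}
```

Take maxConcurrent = 2 and keys "a", "b" and "c". In this model, "c" is subscribed only after "a" or "b" completes. A groupBy group completes only when the source does, so that never happens for an infinite request stream.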
What doesn't work:
.flatMap(maxConcurrentProcessing, { case (key, requestsForKey) =>
  // take(1) unsubscribes after the first request, causing groupBy to create a new observable
  // for that key, so the next request with the same key executes concurrently
  requestsForKey.take(1)
    .flatMap(1, process)
})

.flatMap(maxConcurrentProcessing, { case (key, requestsForKey) =>
  // The idea was to unsubscribe after 100 milliseconds to "free up" a maxConcurrentProcessing slot
  // This discards all requests after the first if processing takes more than 100 milliseconds
  requestsForKey.timeout(100.millis, Observable.empty)
    .flatMap(1, process)
})
Recommended answer
Here's how I managed to achieve this. For each unique key I assign a dedicated single-threaded scheduler, so that messages with the same key are processed in order and never concurrently:
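// Assumes RxJava 2.x, JUnit 4, and an SLF4J-style `logger` field in the test class
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;

@Test
public void groupBy() throws InterruptedException {
    final int NUM_GROUPS = 10;
    Observable.interval(1, TimeUnit.MILLISECONDS)
            .map(v -> {
                logger.info("received {}", v);
                return v;
            })
            .groupBy(v -> v % NUM_GROUPS)
            .flatMap(grouped -> {
                long key = grouped.getKey();
                logger.info("selecting scheduler for key {}", key);
                return grouped
                        // every item of this group is processed on the key's own thread
                        .observeOn(assignScheduler(key))
                        .map(v -> {
                            String threadName = Thread.currentThread().getName();
                            Assert.assertEquals("proc-" + key, threadName);
                            logger.info("processing {} on {}", v, threadName);
                            return v;
                        })
                        .observeOn(Schedulers.single()); // re-schedule results onto one shared thread
            })
            .subscribe(v -> logger.info("got {}", v));
    Thread.sleep(1000);
}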
In my case the number of keys (NUM_GROUPS) is small, so I create a dedicated scheduler for each key:
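Scheduler assignScheduler(long key) {
    // Each call creates a new single-threaded executor whose thread is named "proc-<key>";
    // the executors are never shut down in this test
    return Schedulers.from(Executors.newSingleThreadExecutor(
            r -> new Thread(r, "proc-" + key)));
}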
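The assignScheduler above builds a new executor on every call. That works because the flatMap lambda calls it only once per group. If you would rather not depend on that, the minimal sketch below caches exactly one single-threaded executor per key. PerKeyExecutors is a hypothetical helper written with plain java.util.concurrent, not part of RxJava. Each executor it returns could be wrapped with Schedulers.from(...), as assignScheduler does.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: one lazily created, cached single-threaded executor per key,
// so every task for a key runs on the thread "proc-<key>" in submission order.
final class PerKeyExecutors implements AutoCloseable {
    private final Map<Long, ExecutorService> executors = new ConcurrentHashMap<>();

    ExecutorService forKey(long key) {
        // computeIfAbsent guarantees repeated calls for the same key return the same executor
        return executors.computeIfAbsent(key, k -> Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "proc-" + k);
            t.setDaemon(true); // do not keep the JVM alive if close() is never called
            return t;
        }));
    }

    // Stops accepting tasks and waits briefly for already submitted ones to finish.
    @Override
    public void close() {
        for (ExecutorService e : executors.values()) {
            e.shutdown();
        }
        try {
            for (ExecutorService e : executors.values()) {
                e.awaitTermination(5, TimeUnit.SECONDS);
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The cache uses daemon threads and an explicit close(). This avoids the leak in the test above, where executors are created per call and never shut down.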
If the number of keys is unbounded or too large to dedicate a thread to each one, you can create a fixed pool of schedulers and reuse them like this:
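Scheduler assignScheduler(long key) {
    // assign randomly; called once per group, so each key keeps the scheduler it gets
    // (poolOfSchedulers, random and SIZE_OF_POOL are assumed to be fields of the test class)
    return poolOfSchedulers[random.nextInt(SIZE_OF_POOL)];
}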
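A deterministic alternative to random assignment hashes the key to a slot. With hashing, a key always maps to the same thread, even if its group were ever recreated. The sketch below uses plain java.util.concurrent executors, and StripedExecutors is a hypothetical name. It also ties back to the original question. With a pool of N single-threaded executors, at most N requests are processed at once, playing the role of maxConcurrentProcessing. Two requests with the same key never overlap.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: a fixed pool of single-threaded executors indexed by key hash.
// Same key -> same thread (no overlap per key); pool size bounds total concurrency.
final class StripedExecutors {
    private final ExecutorService[] stripes;

    StripedExecutors(int poolSize) {
        stripes = new ExecutorService[poolSize];
        for (int i = 0; i < poolSize; i++) {
            final int index = i;
            stripes[i] = Executors.newSingleThreadExecutor(r -> {
                Thread t = new Thread(r, "proc-" + index);
                t.setDaemon(true);
                return t;
            });
        }
    }

    ExecutorService forKey(long key) {
        // floorMod keeps the index non-negative even for negative hash codes
        return stripes[Math.floorMod(Long.hashCode(key), stripes.length)];
    }

    void shutdownAndAwait() {
        for (ExecutorService e : stripes) {
            e.shutdown();
        }
        try {
            for (ExecutorService e : stripes) {
                e.awaitTermination(5, TimeUnit.SECONDS);
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The trade-off is shared with the random pool. Unrelated keys that land in the same slot are also serialized behind each other.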