I have incoming processing requests, and I do not want too many of them processing concurrently because they deplete shared resources. I would also prefer that requests sharing the same key are not executed concurrently:
def process(request: Request): Observable[Answer] = ???

requests // the incoming Observable[Request]
  .groupBy(request => request.key)
  .flatMap(maxConcurrentProcessing, { case (key, requestsForKey) =>
    requestsForKey
      .flatMap(1, process)
  })
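However, this doesn't work: the per-key observable created by groupBy never completes, so once maxConcurrentProcessing distinct keys have been seen, flatMap never subscribes to the groups of any further keys and their requests are never processed. What is the correct way to achieve this?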
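To make the failure concrete, here is a toy model in plain Java (not Rx code; `BoundedMergeModel` and its methods are hypothetical names made up for illustration) of a concurrency-limited `flatMap`: each inner observable holds one slot from the moment it is subscribed until it completes. Because the groups from `groupBy` never complete, a third key gets no slot with a limit of 2, and would only get one if an earlier group completed, which in the real pipeline never happens.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

// Toy model of flatMap(maxConcurrent, f): every inner source holds one
// permit from subscription until it completes. Per-key groups from groupBy
// never complete, so in the real pipeline their permits are never returned.
class BoundedMergeModel {
    private final Semaphore permits;
    private final List<String> subscribed = new ArrayList<>();

    BoundedMergeModel(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    // Returns true if the inner source for this key can be subscribed now;
    // false means it has to wait for a free slot.
    boolean trySubscribe(String key) {
        if (permits.tryAcquire()) {
            subscribed.add(key);
            return true;
        }
        return false;
    }

    // Models an inner source completing; a groupBy group never does this.
    void complete(String key) {
        if (subscribed.remove(key)) {
            permits.release();
        }
    }

    List<String> subscribed() {
        return subscribed;
    }
}
```

With a limit of 2, keys "a" and "b" are subscribed and "c" is refused; only calling `complete("a")` lets "c" in.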
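I tried take(1), but that breaks the per-key guarantee:
requests
  .groupBy(request => request.key)
  .flatMap(maxConcurrentProcessing, { case (key, requestsForKey) =>
    // take(1) unsubscribes after the first request, which causes groupBy to create a new observable
    // for that key, so the next request for the key can execute concurrently with the first
    requestsForKey.take(1)
      .flatMap(1, process)
  })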
I also tried timeout:
requests
  .groupBy(request => request.key)
  .flatMap(maxConcurrentProcessing, { case (key, requestsForKey) =>
    // The idea was to unsubscribe after 100 milliseconds to "free up" a maxConcurrentProcessing slot
    // This discards all requests after the first if processing takes more than 100 milliseconds
    requestsForKey.timeout(100.millis, Observable.empty)
      .flatMap(1, process)
  })
Here's how I managed to achieve this. For each unique key I assign a dedicated single-thread scheduler, so that messages with the same key are processed in order:
public void groupBy() throws InterruptedException {
    final int NUM_GROUPS = 10;
    Observable.interval(1, TimeUnit.MILLISECONDS)
            .map(v -> {
                logger.info("received {}", v);
                return v;
            })
            .groupBy(v -> v % NUM_GROUPS)
            .flatMap(grouped -> {
                long key = grouped.getKey();
                logger.info("selecting scheduler for key {}", key);
                return grouped
                        // move this key's items onto its own single thread so they are processed in order
                        .observeOn(assignScheduler(key))
                        .map(v -> {
                            String threadName = Thread.currentThread().getName();
                            Assert.assertEquals("proc-" + key, threadName);
                            logger.info("processing {} on {}", v, threadName);
                            return v;
                        })
                        .observeOn(Schedulers.single()); // re-schedule results onto one shared thread
            })
            .subscribe(v -> logger.info("got {}", v));

    // interval emits asynchronously, so keep the test alive long enough to see some output
    Thread.sleep(1000);
}
In my case the number of keys (NUM_GROUPS) is small, so I create a dedicated scheduler for each key:
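Scheduler assignScheduler(long key) {
    // Called once per group; the thread is named "proc-<key>" to match the assertion in the test.
    // Note: these executors are never shut down here.
    return Schedulers.from(Executors.newSingleThreadExecutor(
            r -> new Thread(r, "proc-" + key)));
}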
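Outside Rx, where nothing guarantees that each key is looked up only once, the same one-thread-per-key idea can be sketched with plain JDK executors by caching one single-thread executor per key. This is a hypothetical helper (`PerKeyExecutors` is not part of any library), assuming `long` keys as in the test:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: one lazily created single-thread executor per key.
// Tasks for the same key queue behind each other on a thread named "proc-<key>".
class PerKeyExecutors {
    private final Map<Long, ExecutorService> executors = new ConcurrentHashMap<>();

    // ConcurrentHashMap.computeIfAbsent creates at most one executor per key,
    // even if several threads submit for a new key at the same time.
    void submit(long key, Runnable task) {
        executors
                .computeIfAbsent(key, k -> Executors.newSingleThreadExecutor(
                        r -> new Thread(r, "proc-" + k)))
                .execute(task);
    }

    // Stops accepting tasks, then waits for queued ones to finish.
    // Returns false on timeout or if the wait is interrupted.
    boolean shutdownAndWait(long timeoutMillis) {
        executors.values().forEach(ExecutorService::shutdown);
        try {
            for (ExecutorService e : executors.values()) {
                if (!e.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Like the Rx version, this only suits a small, bounded set of keys: executors are never removed from the map, and their non-daemon threads keep the JVM alive until `shutdownAndWait` is called.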
If the number of keys is unbounded or too large to dedicate a thread to each one, you can create a pool of schedulers and reuse them like this:
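Scheduler assignScheduler(long key) {
    // Assign randomly from a fixed pool; poolOfSchedulers, random and SIZE_OF_POOL are fields set up elsewhere.
    // Several keys may share a scheduler, but each group keeps the one it was given, so per-key order is preserved.
    return poolOfSchedulers[random.nextInt(SIZE_OF_POOL)];
}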
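A variation on the pool, swapping the random choice for hashing: in the Rx code `assignScheduler` runs once per group, so a random pick is safe, but a plain executor pool that is consulted for every task needs a deterministic key-to-executor mapping to keep same-key tasks ordered. The sketch below (a hypothetical `KeyedExecutorPool`, JDK only) also caps total concurrency at the pool size, which covers the first requirement in the question:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: a fixed pool of single-thread executors.
// A key always maps to the same executor, so tasks for one key run in
// submission order, and at most `size` tasks run at once across all keys.
class KeyedExecutorPool {
    private final ExecutorService[] pool;

    KeyedExecutorPool(int size) {
        pool = new ExecutorService[size];
        for (int i = 0; i < size; i++) {
            final String name = "pool-" + i;
            pool[i] = Executors.newSingleThreadExecutor(r -> new Thread(r, name));
        }
    }

    // Math.floorMod keeps the index in [0, size) even for negative hash codes.
    int indexFor(Object key) {
        return Math.floorMod(key.hashCode(), pool.length);
    }

    void submit(Object key, Runnable task) {
        pool[indexFor(key)].execute(task);
    }

    // Stops accepting tasks, then waits for queued ones to finish.
    // Returns false on timeout or if the wait is interrupted.
    boolean shutdownAndWait(long timeoutMillis) {
        for (ExecutorService e : pool) {
            e.shutdown();
        }
        try {
            for (ExecutorService e : pool) {
                if (!e.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The trade-off is that two busy keys hashing to the same executor delay each other; that is the price of the fixed bound.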