Combining groupBy and flatMap(maxConcurrent, ...) in RxJava/RxScala

Question

I have incoming processing requests, and I do not want too many of them to be processed concurrently because they deplete shared resources. I would also prefer that requests sharing the same unique key are not executed concurrently:

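def process(request: Request): Observable[Answer] = ???

requestsStream
  .groupBy(request => request.key)
  // at most maxConcurrentProcessing key groups are subscribed at the same time
  .flatMap(maxConcurrentProcessing, { case (key, requestsForKey) =>
    // requests sharing a key are processed one at a time
    requestsForKey
      .flatMap(1, process)
  })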

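However, the above doesn't work because the per-key observables never complete: once maxConcurrentProcessing distinct keys have been seen, the outer flatMap never subscribes to the groups for any further keys. What is the correct way to achieve this?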

What doesn't work:

  .flatMap(maxConcurrentProcessing, { case (key, requestsForKey) =>
    // take(1) unsubscribes after the first request, so groupBy creates a new observable
    // for that key and the next request with the same key executes concurrently
    requestsForKey.take(1)
      .flatMap(1, process)
  })

  .flatMap(maxConcurrentProcessing, { case (key, requestsForKey) =>
    // The idea was to unsubscribe after 100 milliseconds to "free up" maxConcurrentProcessing
    // This discards all requests after the first if processing takes more than 100 milliseconds
    requestsForKey.timeout(100.millis, Observable.empty)
      .flatMap(1, process)
  })

Recommended answer

Here's how I managed to achieve this. For each unique key I assign a dedicated single-threaded scheduler, so that messages with the same key are processed in order:

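// Assumes RxJava 2 (io.reactivex packages), JUnit 4, and an SLF4J-style `logger` field in the test class.
// Imports: io.reactivex.Observable, io.reactivex.Scheduler, io.reactivex.schedulers.Schedulers,
//          java.util.concurrent.Executors, java.util.concurrent.TimeUnit, org.junit.Assert, org.junit.Test
@Test
public void groupBy() throws InterruptedException {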
    final int NUM_GROUPS = 10;
    Observable.interval(1, TimeUnit.MILLISECONDS)
            .map(v -> {
                logger.info("received {}", v);
                return v;
            })
            .groupBy(v -> v % NUM_GROUPS)
            .flatMap(grouped -> {
                long key = grouped.getKey();
                logger.info("selecting scheduler for key {}", key);
                return grouped
                        .observeOn(assignScheduler(key))
                        .map(v -> {
                            String threadName = Thread.currentThread().getName();
                            Assert.assertEquals("proc-" + key, threadName);
                            logger.info("processing {} on {}", v, threadName);
                            return v;
                        })
                        .observeOn(Schedulers.single()); // hop off the per-key thread so downstream work does not run on it
            })
            .subscribe(v -> logger.info("got {}", v));

    Thread.sleep(1000);
}

In my case the number of keys (NUM_GROUPS) is small, so I create a dedicated scheduler for each key:

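Scheduler assignScheduler(long key) {
    // One single-threaded executor per key; the thread name "proc-<key>" is what the test asserts on.
    // Called once per group from flatMap, so each key gets exactly one scheduler.
    return Schedulers.from(Executors.newSingleThreadExecutor(
        r -> new Thread(r, "proc-" + key)));
}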
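Outside of an Rx pipeline, the same per-key idea can be sketched with plain java.util.concurrent, which makes the ordering guarantee easy to check on its own. This is a minimal sketch, not the answer's code: the class `PerKeyExecutors` and its `submit`/`runDemo` methods are hypothetical. Because tasks are submitted one at a time here, rather than once per group as in the flatMap above, each key's executor is cached with `ConcurrentHashMap.computeIfAbsent` so that every task for that key reaches the same thread.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of RxJava): one single-threaded executor per key,
// created lazily the first time the key is seen.
class PerKeyExecutors {
    private final Map<Long, ExecutorService> executors = new ConcurrentHashMap<>();

    // Tasks for the same key run one at a time, in submission order,
    // on a thread named "proc-<key>" (the same naming as assignScheduler).
    void submit(long key, Runnable task) {
        executors
            .computeIfAbsent(key, k -> Executors.newSingleThreadExecutor(
                r -> new Thread(r, "proc-" + k)))
            .execute(task);
    }

    void shutdownAndAwait() throws InterruptedException {
        for (ExecutorService e : executors.values()) e.shutdown();
        for (ExecutorService e : executors.values()) e.awaitTermination(10, TimeUnit.SECONDS);
    }

    // Submits values 0..count-1 with key = value % numGroups and records,
    // per key, "value@threadName" in the order the tasks actually ran.
    static Map<Long, List<String>> runDemo(int numGroups, int count) throws InterruptedException {
        PerKeyExecutors pool = new PerKeyExecutors();
        Map<Long, List<String>> log = new ConcurrentHashMap<>();
        for (long v = 0; v < count; v++) {
            final long key = v % numGroups;
            final long value = v;
            pool.submit(key, () -> log
                .computeIfAbsent(key, k -> new CopyOnWriteArrayList<>())
                .add(value + "@" + Thread.currentThread().getName()));
        }
        pool.shutdownAndAwait();
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        // prints [1@proc-1, 4@proc-1, 7@proc-1, 10@proc-1]
        System.out.println(runDemo(3, 12).get(1L));
    }
}
```

Tasks for different keys still run in parallel; only tasks sharing a key are serialized, which is the property the question asks for.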

If the number of keys is unbounded, or too large to dedicate a thread to each one, you can create a pool of schedulers and reuse them like this:

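// Assumed fields of the enclosing class: Scheduler[] poolOfSchedulers (SIZE_OF_POOL entries), java.util.Random random
Scheduler assignScheduler(long key) {
    // assign randomly; this runs once per group, so each group keeps the scheduler it was given
    return poolOfSchedulers[random.nextInt(SIZE_OF_POOL)];
}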
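A note on the pool variant: picking a scheduler at random works in the answer's pipeline because assignScheduler is called once per group, so a group keeps whichever scheduler it was given. If work is instead routed task by task, the slot has to be derived from the key, for example with Math.floorMod(key, poolSize), so that all tasks for one key share a thread. A pool of N single-threaded executors also caps overall concurrency at N, which is the bound maxConcurrentProcessing was meant to provide in the question. The following is a stdlib-only sketch of such a key-striped pool, assuming Java 8 or later; `StripedExecutor` and `demo` are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical key-striped pool (not an RxJava API): a fixed number of
// single-threaded executors, each key mapped deterministically to one of them.
class StripedExecutor {
    private final ExecutorService[] stripes;

    StripedExecutor(int size) {
        stripes = new ExecutorService[size];
        for (int i = 0; i < size; i++) {
            final int index = i;
            stripes[i] = Executors.newSingleThreadExecutor(r -> new Thread(r, "pool-" + index));
        }
    }

    // Same key -> same stripe -> same thread: tasks for one key never overlap and
    // keep submission order, and at most stripes.length tasks run at the same time.
    void submit(long key, Runnable task) {
        int slot = (int) Math.floorMod(key, (long) stripes.length);
        stripes[slot].execute(task);
    }

    void shutdownAndAwait() throws InterruptedException {
        for (ExecutorService s : stripes) s.shutdown();
        for (ExecutorService s : stripes) s.awaitTermination(10, TimeUnit.SECONDS);
    }

    // Runs numTasks short tasks over numKeys keys, records which threads ran key 0,
    // and returns the highest number of tasks observed running at once.
    static int demo(int poolSize, int numKeys, int numTasks, List<String> threadsForKey0)
            throws InterruptedException {
        StripedExecutor pool = new StripedExecutor(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxRunning = new AtomicInteger();
        for (long v = 0; v < numTasks; v++) {
            final long key = v % numKeys;
            pool.submit(key, () -> {
                maxRunning.accumulateAndGet(running.incrementAndGet(), Math::max);
                if (key == 0) {
                    threadsForKey0.add(Thread.currentThread().getName());
                }
                try {
                    Thread.sleep(1); // simulate some work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                running.decrementAndGet();
            });
        }
        pool.shutdownAndAwait();
        return maxRunning.get();
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> threads = new CopyOnWriteArrayList<>();
        int max = demo(3, 10, 60, threads);
        System.out.println("max concurrent tasks: " + max + ", key 0 threads: " + threads);
    }
}
```

Two keys that map to the same stripe are serialized with respect to each other even though they could run in parallel; that is the price of a fixed pool, and a larger pool makes such collisions rarer.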

08-21 06:07