我正在考虑用io.projectreactor
替换看起来非常接近ReactiveStreams的本地日志处理库。目的是减少我们维护的代码,并利用社区添加的任何新功能(将运算符(operator)融合)。
首先,我需要使用stdio并将多行日志条目合并为文本blob,这些blob会沿管道流下。用例在Filebeat文档的multiline log entries一章中进行了详细说明(除非我们希望它进行中)。
到目前为止,我拥有的代码是:
BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
Flux<String> lines = Flux.generate(sink -> rethrow(() -> { while (true) sink.next(input.readLine()); }));
Flux<String> logRecordsStr = lines.concatMap(new LogRecordJoiner());
Flux<LogRecord> logRecords = logRecordsStr.map(new LogRecordMapper());
logRecords.doOnEach(r -> System.out.printf("%s payload: %d chars\n", r.timestamp, r.payload.length()))
.subscribe();
这会在检测到新的日志 header 时考虑多行合并,但是在现有库中,我们还会在超时后刷新累积的行(即,如果5秒钟内未收到文本,则刷新记录)。
在Reactor中对此建模的正确方法是什么?我需要编写自己的运算符,还是可以自定义任何现有运算符?
任何在Project Reactor或RxJava中实现此用例的相关示例和文档的指针都将非常受赞赏。
最佳答案
这取决于您如何确定每个缓冲区的开始和结束,因此以下RxJava 2代码旨在作为有关使用主源值打开和关闭缓冲区门的提示:
TestScheduler scheduler = new TestScheduler();
PublishProcessor<String> pp = PublishProcessor.create();
Function<Flowable<String>, Flowable<List<String>>> f = o ->
o.buffer(o.filter(v -> v.contains("Start")),
v -> Flowable.merge(o.filter(w -> w.contains("End")),
Flowable.timer(5, TimeUnit.MINUTES, scheduler)));
pp.publish(f)
.subscribe(System.out::println);
pp.onNext("Start");
pp.onNext("A");
pp.onNext("B");
pp.onNext("End");
pp.onNext("Start");
pp.onNext("C");
scheduler.advanceTimeBy(5, TimeUnit.MINUTES);
pp.onNext("Start");
pp.onNext("D");
pp.onNext("End");
pp.onComplete();
打印:
[Start, A, B, End]
[Start, C]
[Start, D, End]
它通过
publish
共享源来工作,该源允许从上游重用相同的值,而无需一次运行多个源副本。通过检测在线上的“开始”字符串来控制打开。通过检测“结束”字符串或宽限期后触发的计时器来控制关闭。编辑:
如果“开始”还是下一批的指示符,则可以将“结束”检查替换为“开始”并更改缓冲区的内容,因为它将在先前的缓冲区中包括新的 header :
pp.publish(f)
.doOnNext(v -> {
int s = v.size();
if (s > 1 && v.get(s - 1).contains("Start")) {
v.remove(s - 1);
}
})
.subscribe(System.out::println);
关于rx-java - react 性流-具有超时的批处理,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/45055568/