问题描述
就反应式流而言,有一个发布者,它可以有尽可能多的订阅者.
In terms of Reactive Streams, there is a Publisher and it could have as many Subscribers.
但是,假设订阅者从发布者那里收到一条消息.现在,此订户(例如Subs1)更改/修改了该消息,并将其传递给其他订户(例如Subs2),后者使用了已修改的消息.
But suppose, a subscriber gets a message from Publisher. Now this Subscriber(say Subs1) changes/modifies the message and passes it to some other Subscriber(say Subs2), which consumes the modified message.
那么,这个Subs1订阅者可以充当发布者,该发布者可以将消息传递给新的Subs2订阅者吗?
So can this Subs1 subscriber can act as a Publisher which can pass on message to new Subs2 subscriber?
我不确定是否可以,但是我认为这种情况是可能的.
I am not sure if it possible or not, but the scenario is possible i think.
如果可能,请提出一个可行的方法.
If its possible, please suggest a possible way to do this.
推荐答案
这是完成此操作的完整实现:
Here is the complete implementation to do this:
-
创建一个MyTransformer类,该类实现Processor并扩展SubmissionPublisher,因为它将同时充当Subscriber和Publisher:
Create a MyTransformer class which implements Processor and extends SubmissionPublisher, as it will act both as Subscriber and Publisher:
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;
public class MyTransformer<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
private Function<T, R> function;
private Flow.Subscription subscription;
public MyTransformer(Function<T, R> function) {
super();
this.function = function;
}
@Override
public void onComplete() {
System.out.println("Transformer Completed");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(T item) {
System.out.println("Transformer Got : "+item);
submit(function.apply(item));
subscription.request(1);
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
}
创建一个TestSubscriber类,该类实现Subscriber接口并实现所需的方法:
Create an TestSubscriber class which implements the Subscriber interface and implement the required methods:
在处理开始之前调用onSubscribe()方法.订阅的实例作为参数传递.这是一个类,用于控制订阅服务器和发布服务器之间的消息流.
The onSubscribe() method is called before processing starts. The instance of the Subscription is passed as the argument. It is a class that is used to control the flow of messages between Subscriber and the Publisher.
这里的主要方法是onNext()-每当发布者发布新消息时都会调用此方法.
The main method here is onNext() – this is called whenever the Publisher publishes a new message.
我们正在使用实现Publisher接口的SubmissionPublisher类.
We are using the SubmissionPublisher class which implements the Publisher interface.
我们将向发布者提交N个元素-我们的TestSubscriber将接收该元素.
We’re going to be submitting N elements to the Publisher – which our TestSubscriber will be receiving.
请注意,我们正在TestSubscriber实例上调用close()方法.它将在给定发布者的每个订阅者下面调用onComplete()回调.
Note, that we’re calling the close() method on the instance of the TestSubscriber. It will invoke onComplete() callback underneath on every Subscriber of the given Publisher.
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
public class TestSubscriber<T> implements Subscriber<T> {
private Subscription subscription;
public List<T> consumed = new LinkedList<>();
@Override
public void onComplete() {
System.out.println("Subsciber Completed");
}
@Override
public void onError(Throwable arg0) {
arg0.printStackTrace();
}
@Override
public void onNext(T item) {
System.out.println("In Subscriber Got : "+item);
subscription.request(1);
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
}
- 这里是发布者正在发布String元素的处理流程.
MyTransformer正在将String解析为Integer,这意味着此处需要进行转换.
MyTransformer is parsing the String as Integer – which means a conversion needs to happening here.
import java.util.List;
import java.util.concurrent.SubmissionPublisher;;
public class TestTransformer {
public static void main(String... args) {
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
MyTransformer<String, Integer> transformProcessor = new MyTransformer<>(Integer::parseInt);
TestSubscriber<Integer> subscriber = new TestSubscriber<>();
List<String> items = List.of("1", "2", "3");
List<Integer> expectedResult = List.of(1, 2, 3);
publisher.subscribe(transformProcessor);
transformProcessor.subscribe(subscriber);
items.forEach(publisher::submit);
publisher.close();
}
}
这篇关于订阅者可以充当发布者吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!