本文介绍了为什么在与PriceResource Publisher的多个连接中,只有一个获得流?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

似乎只有一个http客户端获取数据流,而其他HTTP客户端没有.

It seems that only one http client gets the stream of data, while the others do not.

发布者是否为热点数据,是否应该将其广播给所有订阅者?

Is it true that the Publisher is hot data, and that it should broadcast to all subscribers?

请在

package org.acme.kafka;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import io.reactivex.Flowable;
import io.reactivex.Observable;
import org.jboss.resteasy.annotations.SseElementType;
import org.reactivestreams.Publisher;

import io.smallrye.reactive.messaging.annotations.Channel;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static io.reactivex.Flowable.fromIterable;

/**
 * A simple resource retrieving the "in-memory" "my-data-stream" and sending the items to a server sent event.
 */
@Path("/migrations")
public class StreamingResource {
    private volatile Map<String, String> counterBySystemDate = new ConcurrentHashMap<>();

    @Inject
    @Channel("migrations")
    Flowable<String> counters;

    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS) // denotes that server side events (SSE) will be produced
    @SseElementType("text/plain") // denotes that the contained data, within this SSE, is just regular text/plain data
    public Publisher<String> stream() {
        Flowable<String> mainStream = counters.doOnNext(dateSystemToCount -> {
            String key = dateSystemToCount.substring(0, dateSystemToCount.lastIndexOf("_"));
            counterBySystemDate.put(key, dateSystemToCount);
        });
        return fromIterable(counterBySystemDate.values().stream().sorted().collect(Collectors.toList()))
                .concatWith(mainStream)
                .onBackpressureLatest();
    }
}

推荐答案

您可以使用重放运算符或 ConnectableObservable

这篇关于为什么在与PriceResource Publisher的多个连接中,只有一个获得流?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

07-28 07:02
查看更多