我正在制作输入流速率计。它基本上是一种服务,它公开请求流调用并计算每秒可以处理多少消息。

由于客户端在发送消息时完全不同步,因此我使用ClientCallStreamObserver在流准备就绪时才开始发送消息,以避免内存溢出。

客户端代码如下所示:

public static void main(String[] args) throws Exception {
    ManagedChannel channel =  ManagedChannelBuilder.forAddress("server", 4242).usePlaintext(true).build();
    ServerGrpc.ServerStub asyncStub = ServerGrpc.newStub(channel);


    StreamObserver<MarketDataOuterClass.Trade> inputStream = asyncStub.reportNewTradeStream(new StreamObserver<Empty>() {
        @Override
        public void onNext(Empty empty) {

        }

        @Override
        public void onError(Throwable throwable) {
            logger.info("on error response stream");
        }

        @Override
        public void onCompleted() {
            logger.info("on completed response stream");
        }
    });

    final ClientCallStreamObserver<MarketDataOuterClass.Trade> clientCallObserver = (ClientCallStreamObserver<MarketDataOuterClass.Trade>) inputStream;

    while (!clientCallObserver.isReady()) {
        Thread.sleep(2000);
        logger.info("stream not ready yet");
    }

    counter.setLastTic(System.nanoTime());

    while (true) {
        counter.inc();
        if (counter.getCounter() % 15000 == 0 ) {
            long now = System.nanoTime();
            double rate = (double) NANOSEC_TO_SEC * counter.getCounter() / (now - counter.getLastTic());
            logger.info("rate: " + rate + " msgs per sec");
            counter.clear();
            counter.setLastTic(now);
        }
        inputStream.onNext(createRandomTrade());
    }
}


我对isReady的观察循环永无止境。

OBS:我正在使用kubernetes集群进行测试,服务器正在接收呼叫并返回StreamObserver实现。

最佳答案

只要RPC不会立即错误/完成,isReady最终应返回true。但是代码没有正确观察流控制。

每次调用onNext()发送请求后,isReady()可能会开始返回false。您的while (true)循环应该在每次迭代的开始都进行isReady()检查。

与轮询相比,最好在呼叫准备发送时呼叫serverCallObserver.setOnReadyHandler(yourRunnable)以得到通知。请注意,您仍应检查isReady()中的yourRunnable,因为可能会有虚假通知/过期通知。

关于java - ClientCallStreamObserver isReady永远不会返回true,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/44524718/

10-12 14:18