I am testing gRPC with a list of one million items, sending all of them over a single stream.

I have the following code on the client:

In my test, host = "localhost" and ipPort = 7777.

ManagedChannel comunicationChanel = ManagedChannelBuilder.forAddress(host, ipPort)
                .enableFullStreamDecompression().compressorRegistry(CompressorRegistry.getDefaultInstance())
                .decompressorRegistry(DecompressorRegistry.getDefaultInstance()).usePlaintext(true)
                .maxInboundMessageSize(200888896).build();


ListMessageSRVStub asyncStub = ListMessageSRVGrpc.newStub(comunicationChanel);

List<MessageValue> millionMessages = new ArrayList<MessageValue>();
for (long i = 0L; i < 1000000; i++) {
    millionMessages.add(MessageValue.newBuilder().build());
}

long before = System.currentTimeMillis();
StreamObserver<MessageValue> requestObserver = asyncStub.recievetonm(responseObserverTonMessages);
long i = 0;
for (MessageValue messageValue : millionMessages) {
    requestObserver.onNext(messageValue);
    i++;
    if (i % 50000 == 0) {
        LOG.info("Sended: " + i);
    }
}
requestObserver.onCompleted();
long total = System.currentTimeMillis() - before;
LOG.info("Time = " + total);


But I get this exception:

Exception in thread "main" io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 1879048487, max: 1894252544)
    at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:640)
    at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:594)
    at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:764)
    at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:740)
    at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:214)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:146)
    at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324)
    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)
    at io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:121)
    at io.grpc.netty.NettyWritableBufferAllocator.allocate(NettyWritableBufferAllocator.java:51)
    at io.grpc.internal.MessageFramer.writeKnownLengthUncompressed(MessageFramer.java:226)
    at io.grpc.internal.MessageFramer.writeUncompressed(MessageFramer.java:167)
    at io.grpc.internal.MessageFramer.writePayload(MessageFramer.java:140)
    at io.grpc.internal.AbstractStream.writeMessage(AbstractStream.java:52)
    at io.grpc.internal.ClientCallImpl.sendMessage(ClientCallImpl.java:438)
    at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:52)
    at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:52)
    at io.grpc.stub.ClientCalls$CallToStreamObserverAdapter.onNext(ClientCalls.java:320)
    at com.oesia.grpgtest.server.TestClient.tonsofMSG(TestClient.java:130)
    at com.oesia.grpgtest.server.TestClient.main(TestClient.java:146)


Is there any way to solve this problem of sending a large amount of data?

Best answer

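Have you tried writing in a way that accounts for slow receivers, so that you only write while the channel is writable: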

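import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// Writes only while the channel is writable, so a slow receiver cannot make
// the outbound buffers grow without bound.
// needsToWrite and createMessage() are placeholders the application must supply.
public class GracefulWriteHandler extends ChannelInboundHandlerAdapter {
  @Override
  public void channelActive(ChannelHandlerContext ctx) {
    writeIfPossible(ctx.channel());
  }

  // Called when the channel's writability changes.
  @Override
  public void channelWritabilityChanged(ChannelHandlerContext ctx) {
    writeIfPossible(ctx.channel());
  }

  private void writeIfPossible(Channel channel) {
    // Stop as soon as the channel reports that it is no longer writable.
    while (needsToWrite && channel.isWritable()) {
      channel.writeAndFlush(createMessage());
    }
  }
}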
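
The handler above works at the Netty level. With a grpc-java stub, the same idea is usually expressed through `ClientCallStreamObserver.isReady()` and `setOnReadyHandler(Runnable)` rather than a custom channel handler. The following is only a minimal plain-Java sketch of that pattern, not the gRPC API itself: `BoundedSink`, `sendAll`, and the capacity of 16 are hypothetical names and values invented for illustration, and the sink simply stands in for a transport with a bounded outbound buffer.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import java.util.stream.IntStream;

class BackpressureSketch {

    /** Hypothetical stand-in for a transport with a bounded outbound buffer. */
    static final class BoundedSink {
        private final int capacity;
        private final Queue<Integer> buffer = new ArrayDeque<>();
        private Runnable onReady = () -> {};
        private int delivered = 0;
        private int maxBuffered = 0;

        BoundedSink(int capacity) { this.capacity = capacity; }

        /** True while the buffer still has room for another message. */
        boolean isReady() { return buffer.size() < capacity; }

        void setOnReadyHandler(Runnable handler) { this.onReady = handler; }

        void write(int message) {
            buffer.add(message);
            maxBuffered = Math.max(maxBuffered, buffer.size());
        }

        /** Simulates the peer consuming the buffer; signals readiness if it was full. */
        void drain() {
            boolean wasFull = !isReady();
            delivered += buffer.size();
            buffer.clear();
            if (wasFull) {
                onReady.run();
            }
        }

        int buffered() { return buffer.size(); }
        int delivered() { return delivered; }
        int maxBuffered() { return maxBuffered; }
    }

    /** Writes messages only while the sink is ready, resuming on each readiness signal. */
    static void sendAll(BoundedSink sink, Iterator<Integer> messages) {
        Runnable pump = () -> {
            while (sink.isReady() && messages.hasNext()) {
                sink.write(messages.next());
            }
        };
        sink.setOnReadyHandler(pump);
        pump.run(); // initial burst, stops once the buffer is full
    }

    public static void main(String[] args) {
        BoundedSink sink = new BoundedSink(16); // assumed capacity, for illustration only
        Iterator<Integer> messages = IntStream.range(0, 1_000_000).iterator();
        sendAll(sink, messages);
        while (sink.buffered() > 0) {
            sink.drain(); // the simulated receiver catches up
        }
        System.out.println("delivered=" + sink.delivered()
                + " maxBuffered=" + sink.maxBuffered());
    }
}
```

In this model all one million messages are delivered while the buffer never holds more than 16 at a time, in contrast to the loop in the question, which calls `onNext` a million times without checking whether the transport can keep up.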

Regarding Java: testing gRPC by streaming a million protocol buffer messages, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/48419744/
