我的代码基本上遵循官方教程,主要目的是从一个订阅(Constants.UNFINISHEDSUBID)收集所有消息,然后将其重新发布到另一个订阅上。但是目前我正面临一个无法解决的问题。在我的实现中,调用subscriber.stopAsync()导致以下异常:

Mai 04, 2017 4:59:25 PM com.google.common.util.concurrent.AbstractFuture executeListener
SCHWERWIEGEND: RuntimeException while executing runnable com.google.common.util.concurrent.Futures$6@6e13e898 with executor java.util.concurrent.Executors$DelegatedScheduledExecutorService@2f3c6ac4
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@60d40af2 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@d55b6e[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 320]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
    at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
    at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
    at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:817)
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:753)
    at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:613)
    at io.grpc.stub.ClientCalls$GrpcFuture.set(ClientCalls.java:458)
    at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:437)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:428)
    at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:76)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:514)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$700(ClientCallImpl.java:431)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:546)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:52)
    at io.grpc.internal.SerializingExecutor$TaskRunner.run(SerializingExecutor.java:152)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)


我还注意到这种情况是随机的,有时是所有消息,有时只是几条消息,或者不是一条消息。调用Subscriber.stopAsync()不是正确的方法吗?

我当前的实现:

protected void pullUnfinished() throws Exception {
    List<PubsubMessage> jobsToRepublish = new ArrayList<>();
    SubscriptionName subscription =
            SubscriptionName.create(Constants.PROJECTID, Constants.UNFINISHEDSUBID);

    MessageReceiver receiver = new MessageReceiver() {
        @Override
        public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
            synchronized(jobsToRepublish){
                jobsToRepublish.add(message);
            }
            String unfinishedJob = message.getData().toStringUtf8();
            LOG.info("got message: {}", unfinishedJob);
            consumer.ack();
        }
    };

    Subscriber subscriber = null;
    try {
        ChannelProvider channelProvider = new PlainTextChannelProvider();
        subscriber = Subscriber.defaultBuilder(subscription, receiver)
                               .setChannelProvider(channelProvider)
                               .build();
        subscriber.addListener(new Subscriber.Listener() {
            @Override
            public void failed(Subscriber.State from, Throwable failure) {
                System.err.println(failure);
            }
        }, MoreExecutors.directExecutor());
        subscriber.startAsync().awaitRunning();
        Thread.sleep(60000);
    } finally {
        if (subscriber != null) {
            subscriber.stopAsync(); //Causes the exception
        }
    }
    publishJobs(jobsToRepublish);
}

public class PlainTextChannelProvider implements ChannelProvider {

    @Override
    public boolean shouldAutoClose() {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public boolean needsExecutor() {
        // TODO Auto-generated method stub
        return false;
    }

      @Override
      public ManagedChannel getChannel() throws IOException {
        return NettyChannelBuilder.forAddress("localhost", 8085)
          .negotiationType(NegotiationType.PLAINTEXT)
          .build();
      }

      @Override
      public ManagedChannel getChannel(Executor executor) throws IOException {
        return getChannel();
      }
}

最佳答案

当运行来自JUnit测试的类似代码时,我遇到了完全相同的问题,并且通常在多线程上发现了这个related answer,这表明在监听器仍在引用ThreadPool的同时将其关闭。我还研究了GitHub上的Subscriber.java代码,并找到了一个示例,用于在JavaDoc中的startAsync()上接收大量消息,建议等待stopAsync()终止。

尝试改变

subscriber.stopAsync();




subscriber.stopAsync().awaitTerminated();


为我工作。

关于java - Subscriber.stopAsync()导致RejectedExecutionException,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/43786716/

10-10 11:56