Problem description
My code basically follows the official tutorials. Its main purpose is to collect all messages from one subscription (Constants.UNFINISHEDSUBID) and republish them on another. I'm currently facing a problem that I can't solve: in my implementation, calling subscriber.stopAsync() results in the following exception:
Mai 04, 2017 4:59:25 PM com.google.common.util.concurrent.AbstractFuture executeListener
SCHWERWIEGEND: RuntimeException while executing runnable com.google.common.util.concurrent.Futures$6@6e13e898 with executor java.util.concurrent.Executors$DelegatedScheduledExecutorService@2f3c6ac4
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@60d40af2 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@d55b6e[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 320]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:817)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:753)
at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:613)
at io.grpc.stub.ClientCalls$GrpcFuture.set(ClientCalls.java:458)
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:437)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:428)
at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:76)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:514)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$700(ClientCallImpl.java:431)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:546)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:52)
at io.grpc.internal.SerializingExecutor$TaskRunner.run(SerializingExecutor.java:152)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
I also noticed that the results vary seemingly at random: sometimes all messages get collected, sometimes just a few, and sometimes not a single one. Is calling subscriber.stopAsync() not the correct way to stop the subscriber?
My current implementation:
protected void pullUnfinished() throws Exception {
    List<PubsubMessage> jobsToRepublish = new ArrayList<>();
    SubscriptionName subscription =
            SubscriptionName.create(Constants.PROJECTID, Constants.UNFINISHEDSUBID);

    MessageReceiver receiver = new MessageReceiver() {
        @Override
        public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
            synchronized (jobsToRepublish) {
                jobsToRepublish.add(message);
            }
            String unfinishedJob = message.getData().toStringUtf8();
            LOG.info("got message: {}", unfinishedJob);
            consumer.ack();
        }
    };

    Subscriber subscriber = null;
    try {
        ChannelProvider channelProvider = new PlainTextChannelProvider();
        subscriber = Subscriber.defaultBuilder(subscription, receiver)
                .setChannelProvider(channelProvider)
                .build();
        subscriber.addListener(new Subscriber.Listener() {
            @Override
            public void failed(Subscriber.State from, Throwable failure) {
                System.err.println(failure);
            }
        }, MoreExecutors.directExecutor());
        subscriber.startAsync().awaitRunning();
        Thread.sleep(60000);
    } finally {
        if (subscriber != null) {
            subscriber.stopAsync(); //Causes the exception
        }
    }
    publishJobs(jobsToRepublish);
}
public class PlainTextChannelProvider implements ChannelProvider {

    @Override
    public boolean shouldAutoClose() {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public boolean needsExecutor() {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public ManagedChannel getChannel() throws IOException {
        return NettyChannelBuilder.forAddress("localhost", 8085)
                .negotiationType(NegotiationType.PLAINTEXT)
                .build();
    }

    @Override
    public ManagedChannel getChannel(Executor executor) throws IOException {
        return getChannel();
    }
}
I had the exact same issue when running similar code from a JUnit test, and found this related answer on multithreading in general, which suggests that a thread pool is shut down while listeners are still referring to it. I also looked into the code of Subscriber.java on GitHub and found an example for receiving a number of messages in the JavaDoc on startAsync(), which suggests waiting for termination after calling stopAsync().
Try changing
subscriber.stopAsync();
to
subscriber.stopAsync().awaitTerminated();
Worked for me.
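To illustrate the race described above, here is a minimal JDK-only sketch. It does not reproduce the Pub/Sub client's internals; it only shows, under that assumption, why a callback handed to an already terminated scheduled executor is rejected, and why blocking until in-flight work has finished avoids it. The class name ShutdownRace and both method names are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the Pub/Sub client library.
final class ShutdownRace {

    // Mimics a late callback: the pool is already shut down when the
    // listener is submitted, so the default AbortPolicy rejects it.
    static boolean listenerRejectedAfterShutdown() {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        pool.shutdown();
        try {
            pool.execute(() -> System.out.println("listener ran"));
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    // Mimics the awaitTerminated() fix: wait for the in-flight callback
    // to complete, and only then shut the pool down.
    static boolean listenerRunsWhenAwaited() {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture<String> done = new CompletableFuture<>();
        pool.execute(() -> done.complete("listener ran"));
        try {
            return "listener ran".equals(done.get(5, TimeUnit.SECONDS));
        } catch (Exception e) {
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected after shutdown: " + listenerRejectedAfterShutdown());
        System.out.println("ran when awaited: " + listenerRunsWhenAwaited());
    }
}
```

The same reasoning probably explains the varying number of collected messages in the question: if publishJobs(jobsToRepublish) runs while the subscriber is still stopping, callbacks may still be adding to the list, so waiting for termination first gives a stable result.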