我有google-pubsub api的Java客户端。
我同时向pubsub发送了4条消息。
我的订阅者每小时触发一次,并在pubsub上循环直到其为空。
尽管如此,我看到消息是在不同的时间获取的,即使它们都是同时发送的,并且直到下一个自动的用户同步获取还有很长的延迟。
// A hotfix as cofman is too slow for ack
while (pubSubIsFull) {
// Fetch messages by order
log("before fetched messages by order");
List<ReceivedMessage> messages = mySyncSubscriber.fetch(100000, true);
messages = messages.stream().sorted(Comparator.comparingLong(message2 -> message2.getMessage().getPublishTime().getSeconds())).collect(Collectors.toList());
log("fetched " + messages.size() + " messages by order");
pubSubIsFull = messages.size() > 0;
log("before processMessages");
List<String> ackIds = processMessages(messages);
log("after processMessages");
mySyncSubscriber.sendAck(ackIds);
}
import com.google.pubsub.v1.PullRequest.Builder;
public List<ReceivedMessage> fetch(int maxMessages, boolean returnImmediately) {
String subscriptionName = this.getSubscriptionName(this.getSubscriptionId()).toString();
Builder pullRequestBuilder = PullRequest.newBuilder().setSubscription(subscriptionName).setReturnImmediately(returnImmediately);
if (maxMessages != 0) {
pullRequestBuilder.setMaxMessages(maxMessages);
}
PullRequest pullRequest = pullRequestBuilder.build();
PullResponse pullResponse = (PullResponse)this.subscriber.pullCallable().call(pullRequest);
return pullResponse.getReceivedMessagesList();
}
这是pub sub中的错误吗?
我可以做些改变吗?
最佳答案
Cloud Pub / Sub不提供任何订购保证。如果您发布多条消息(甚至间隔一定的时间),然后在以后获取所有消息,则不能保证订阅者会首先收到较早发布的消息。
此外,在单个响应中接收0条消息不是确定没有可用消息的好方法。有时即使有可用消息,响应也将包含0条消息,特别是如果returnImmediately
设置为true(如my response to your other question中所述)。至少,在确定没有可用消息之前,您需要确保一段时间内的几个请求都返回0条消息。更好的方法是查询Stackdriver metric for subscription/num_undelivered_messages
,它指示您的订阅尚未确认的消息数。