我有google-pubsub api的Java客户端。

我同时向pubsub发送了4条消息。

我的订阅者每小时触发一次,并在pubsub上循环直到其为空。

尽管如此,我看到消息是在不同的时间获取的,即使它们都是同时发送的,并且直到下一个自动的用户同步获取还有很长的延迟。

// A hotfix as cofman is too slow for ack
while (pubSubIsFull) {
    // Fetch messages by order
    log("before fetched messages by order");
    List<ReceivedMessage> messages = mySyncSubscriber.fetch(100000, true);

    messages = messages.stream().sorted(Comparator.comparingLong(message2 -> message2.getMessage().getPublishTime().getSeconds())).collect(Collectors.toList());

    log("fetched " + messages.size() + " messages by order");
    pubSubIsFull = messages.size() > 0;

    log("before processMessages");
    List<String> ackIds = processMessages(messages);
    log("after processMessages");
    mySyncSubscriber.sendAck(ackIds);
}

import com.google.pubsub.v1.PullRequest.Builder;

public List<ReceivedMessage> fetch(int maxMessages, boolean returnImmediately) {
    String subscriptionName = this.getSubscriptionName(this.getSubscriptionId()).toString();
    Builder pullRequestBuilder = PullRequest.newBuilder().setSubscription(subscriptionName).setReturnImmediately(returnImmediately);

    if (maxMessages != 0) {
      pullRequestBuilder.setMaxMessages(maxMessages);
    }

    PullRequest pullRequest = pullRequestBuilder.build();
    PullResponse pullResponse = (PullResponse)this.subscriber.pullCallable().call(pullRequest);

    return pullResponse.getReceivedMessagesList();
}

这是pub sub中的错误吗?

我可以做些改变吗?

最佳答案

Cloud Pub / Sub不提供任何订购保证。如果您发布多条消息(甚至间隔一定的时间),然后在以后获取所有消息,则不能保证订阅者会首先收到较早发布的消息。

此外,在单个响应中接收0条消息不是确定没有可用消息的好方法。有时即使有可用消息,响应也将包含0条消息,特别是如果returnImmediately设置为true(如my response to your other question中所述)。至少,在确定没有可用消息之前,您需要确保一段时间内的几个请求都返回0条消息。更好的方法是查询Stackdriver metric for subscription/num_undelivered_messages ,它指示您的订阅尚未确认的消息数。

07-25 22:12
查看更多