我有一个类型为Idsint64数组,这是我要发布的Nsq消息。

nsqMsg := st{
    Action  :     "insert",
    Ids     :     Ids
    GID     :     Gids
}

msg, err := json.Marshal(nsqMsg)
if err != nil {
    log.Println(err)
    return err
}
err = nsqProducer.Publish(TOPIC-NAME, msg)
if err != nil {
    log.Println(err)
    return err
}

在我的消费者中,我要一个一个地获取每个ID,然后从我的数据存储中获取基于我的ID的信息。

因此,在提取时,如果我的CreateObject方法返回错误,则可能会出现这种情况,因此我可以通过重新排队消息(产生错误)来处理这种情况,因此可以重试。
for i := 0; i < len(data.Ids); i++ {

    Object, err := X.CreateObject(data.Ids[i)
        if err != nil {
            requeueMsgData = append(requeueMsgData, data.Ids[i])
            continue
        }
        DataList = append(DataList, Object)
    }

    if len(requeueMsgData) > 0 {
        msg, err := json.Marshal(requeueMsgData)
        if err != nil {
            log.Println(err)
            return err
        }
        message.Body = msg
        message.Requeue(60 * time.Second)
        log.Println("error while creating Object", err)
        return n
}

那么,这是正确的方法吗?
他们在这种情况下有什么缺点吗?
再次发布更好吗?

最佳答案

某些队列(例如Kafka)支持确认,直到消费者真正确认成功接收到该项目后,才会从队列中删除已出队的项目。

这种模型的优点是,如果消费者在消费后死亡但在确认之前死亡,则该商品将自动重新排队。模型的缺点是,在这种情况下,该项目可能会丢失。

确认模型的风险在于,现在物品可能会被双重消耗。如果消费者尝试使用具有副作用(例如增加计数器或更改数据库)但没有意识到的消费,则重试可能不会产生所需的结果。 (请注意,即使您不重新排队数据,也无法保证通过nsq文档进行重试,因此您的代码无论如何都必须对此有所防御。)

如果您想更深入地了解这一点,则应该研究“完全一次”与“最多一次”处理。

仔细阅读nsq文档,似乎不支持确认,因此,如果必须使用nsq,这可能是您的最佳选择。

关于go - NSQ重新排队一半消息,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/52238258/

10-09 06:00