是否存在用于使节点0mq pub / sub中的发布者冗余的通用模式?其动机是能够与可能会失败/定期重新启动的发布者一起运行多个流程。

我最初的想法是在母版中创建一个转发器,并从工作发布者处连接到它:

var cluster = require('cluster')
  , zmq = require('zmq')
  , endpointIn = 'ipc:///tmp/cluster_pub_sub'
  , endpointOut = 'tcp://127.0.0.1:7777';

if (cluster.isMaster) {
  for (var i = 0; i < 2; i++) cluster.fork();
  startPubSubForwarder();
} else {
  startPublisher();
}

function startPublisher() {
  var socket = zmq.socket('pub');
  socket.connect(endpointIn);
  setInterval(function () {
    socket.send('pid=' + process.pid);
  }, 1000);
}

function startPubSubForwarder() {
  var sIn = zmq.socket('sub')
    , sOut = zmq.socket('pub');

  // incoming
  sIn.subscribe('');
  sIn.bind(endpointIn, function (err) {
    if (err) throw err;
  });
  sIn.on('message', function (data) {
    sOut.send(data);
  });

  // outgoing
  sOut.bind(endpointOut, function (err) {
    if (err) throw err;
  });
}


还有其他/更好的方法吗?

最佳答案

如果您关心的是消息的持久性,那么我想您将不必担心拥有多个发布者,而应更加关注确保发布者去世时不会丢失消息。您可以立即立即重新启动发布者,然后从停下来的地方领取。您还需要知道哪些消息已成功发送。

这需要1)持久存储和2)以及向发布者确认消息已在接收者端接收(并且可能已完成处理)的方法。此设置应解决您的可靠性要求。

如果您还想实现高规模,则需要对体系结构进行一些扩充。对于发送方和接收方为1:1的发送/接收方案,它更简单,而当您需要进行1:N循环/负载分配方案时,情况可能会稍微复杂一些,这可能就是扩展规模所需要的。

我对完成横向扩展方案的投入是要进行以下设置:

sender_process-(1:1)->分发服务器-(1:N)-> receiver_process(es)

分发者在此确认收到了发件人的消息,然后将其扇形展开到接收者的进程。

您可能希望通过在每个流程的前面放置一个队列来实现此目的。因此,您不会发送到该过程。您发送到该进程读取的队列。发件人将物料放入分发服务器队列。分发者将内容放入接收者的队列中。在每个点上,每个过程都会尝试进行处理。如果经过多次最大重试后失败,它将进入错误队列。

我们使用rabbitmq / amqp来完成所有这些工作。我已经开始开源总线,用于执行1:1和1:N的总线在这里发送:https://github.com/mateodelnorte/servicebus。在接下来的几天中,我将添加自述文件和更多测试。

09-20 23:21