是否存在用于使节点0mq pub / sub中的发布者冗余的通用模式?其动机是能够与可能会失败/定期重新启动的发布者一起运行多个流程。
我最初的想法是在母版中创建一个转发器,并从工作发布者处连接到它:
var cluster = require('cluster')
, zmq = require('zmq')
, endpointIn = 'ipc:///tmp/cluster_pub_sub'
, endpointOut = 'tcp://127.0.0.1:7777';
if (cluster.isMaster) {
for (var i = 0; i < 2; i++) cluster.fork();
startPubSubForwarder();
} else {
startPublisher();
}
function startPublisher() {
var socket = zmq.socket('pub');
socket.connect(endpointIn);
setInterval(function () {
socket.send('pid=' + process.pid);
}, 1000);
}
function startPubSubForwarder() {
var sIn = zmq.socket('sub')
, sOut = zmq.socket('pub');
// incoming
sIn.subscribe('');
sIn.bind(endpointIn, function (err) {
if (err) throw err;
});
sIn.on('message', function (data) {
sOut.send(data);
});
// outgoing
sOut.bind(endpointOut, function (err) {
if (err) throw err;
});
}
还有其他/更好的方法吗?
最佳答案
如果您关心的是消息的持久性,那么我想您将不必担心拥有多个发布者,而应更加关注确保发布者去世时不会丢失消息。您可以立即立即重新启动发布者,然后从停下来的地方领取。您还需要知道哪些消息已成功发送。
这需要1)持久存储和2)以及向发布者确认消息已在接收者端接收(并且可能已完成处理)的方法。此设置应解决您的可靠性要求。
如果您还想实现高规模,则需要对体系结构进行一些扩充。对于发送方和接收方为1:1的发送/接收方案,它更简单,而当您需要进行1:N循环/负载分配方案时,情况可能会稍微复杂一些,这可能就是扩展规模所需要的。
我对完成横向扩展方案的投入是要进行以下设置:
sender_process-(1:1)->分发服务器-(1:N)-> receiver_process(es)
分发者在此确认收到了发件人的消息,然后将其扇形展开到接收者的进程。
您可能希望通过在每个流程的前面放置一个队列来实现此目的。因此,您不会发送到该过程。您发送到该进程读取的队列。发件人将物料放入分发服务器队列。分发者将内容放入接收者的队列中。在每个点上,每个过程都会尝试进行处理。如果经过多次最大重试后失败,它将进入错误队列。
我们使用rabbitmq / amqp来完成所有这些工作。我已经开始开源总线,用于执行1:1和1:N的总线在这里发送:https://github.com/mateodelnorte/servicebus。在接下来的几天中,我将添加自述文件和更多测试。