本文介绍了ZeroMQ:即使设置了HWM和bufsize,数据包也会丢失的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们在ZeroMQ中使用 PUSH / PULL 可扩展形式通信模式。发送者应用程序总共发送30,000条消息,每条消息10kB。有很多数据丢失,因此我们在发送方方面设置以下内容:

We're using a PUSH/PULL Scalable Formal Communication Pattern in ZeroMQ. The sender application sends a total of 30,000 messages, 10kB each. There's a lot of data loss and hence we set the following on sender's side:

zmq_socket = context.socket(zmq.PUSH)
zmq_socket.setsockopt(zmq.SNDBUF, 10240)
zmq_socket.setsockopt(zmq.SNDHWM, 1)
zmq_socket.bind("tcp://127.0.0.1:4999")

在接收方方面:

zmq_socket = context.socket(zmq.PULL)
zmqSocket.setReceiveBufferSize(10240);
zmqSocket.setRcvHWM(1);
zmq_socket.connect("tcp://127.0.0.1:4999")

有仍然数据丢失。不确定如何避免数据包被静默丢弃。

There's still data loss. Not sure how we can avoid packets being dropped silently.

编辑1:

Python中的发件人代码:

context = zmq.Context()
zmq_socket = context.socket(zmq.PUSH)
zmq_socket.setsockopt(zmq.SNDBUF, 10240)
zmq_socket.setsockopt(zmq.SNDHWM, 1)
zmq_socket.bind("tcp://127.0.0.1:4999")

for file_name in list_of_files:              # Reads data from a list of files:
    while True:                              #       data from a file_name
          with open(os.path.join(self.local_base_dir,file_name), 'r') as sf:
               socket_data = sf.read(5120)
               if socket_data == '':
                  sf.close()
                  break                      #       until EoF

               ret = zmq_socket.send(socket_data)

               if ret == 0:
                  return True

               if ret == -1:
                  print zmq_errno()

接收器代码:

    private ZMQ.Socket zmqSocket = zmqContext.socket(ZMQ.PULL);       
    zmqSocket.setReceiveBufferSize(10240);
    zmqSocket.setRcvHWM(1);
    zmqSocket.connect(socketEndpoint);
    String message = new String(zmqSocket.recv());
    messages.add(message);


推荐答案

基于ZeroMQ的注释:



从不会过早地优化资源分配:在代码运行时操作满足所有功能要求之前,代码中引入的所有准优化决策都是令人误解的。 优化故障的分布式系统根本没有任何意义。只有对运行时进行了测试并保证可以按指定的方式运行,某些资源才可能在事后进行微调,因为某些积极的输出理由(这样做的新应计成本+积极的性能优势+减少的资源使用)对于这样做。反之亦然。 从不。

Remarks on ZeroMQ:

Never pre-maturely optimise resources allocations: all quasi-optimisation decisions introduced in code before the code runtime operations meet all functional requirements are misleading. "Optimising" ill-functioning distributed system makes no sense at all. Only once the runtime was tested and is guaranteed to work as specified, some resources may get fine-tuned ex-post, given some positive output rationale of ( newly accrued cost of doing so + positive performance benefits + reduced resource use ) makes sense for doing so. Never vice versa. Never.

从不将ZeroMQ基础结构设计为易耗品(如上述Java MCVE片段所示)。 )。 zmq.Context(n_IO_threads) 花费了大量的时间来进行操作系统设置和调整ZeroMQ世界。 >实例化和所有临时派生的 .Socket()实例必须创建(全部在O / S调度程序指定的时间范围内+在某些实际的多代理系统下行为动力学,因为将分布式交易对手引入了底层原始套接字分布式握手和多边谈判)中,因此即使SLOC单行代码或幼稚的教科书示例看起来像那样,也永远不要跳入ZeroMQ的ALAP设置基础结构正确,然后只读取一条消息并处理ZeroMQ框架的整个大教堂(隐藏在引擎盖下)-是显式的还是隐式的(更糟糕的是)-在 .recv()仅一封邮件。 从不。

Never design a ZeroMQ infrastructure as a consumable ( ref. as seen in the Java MCVE snippet above ). It takes a remarkable time to the O/S to setup and adjust the ZeroMQ world "under the hood", that the zmq.Context( n_IO_threads ) instantiation and all the ad-hoc derived .Socket() instances have to create ( all inside the O/S scheduler dictated time-frames + under some realistic multi-agent system behaviour dynamics, given the distributed counterparties are introduced into underlying raw socket distributed handshaking & multi-lateral negotiations ), so even if SLOC one-liners or naive school-book examples may look that way, never jump into an ALAP setup of the ZeroMQ infrastructure "right" before reading just one message and dispose the whole cathedral of the ZeroMQ framework ( hidden under the hood ) - be that explicitly or ( the worse ) implicitly - right after .recv() just one message. Never.

从不忽略记录的ZeroMQ功能。如果它指出:

Never ignore documented ZeroMQ features. If it states:

永远不要尝试声明 .setsockopt(ZMQ_SNDHWM,1)永远不会。

one ought never ever attempt to declare .setsockopt( ZMQ_SNDHWM, 1 ). Never.

永远不会将ZeroMQ基础设施教科书赤裸。设计主体 try有很多合理的理由:除外:最后;为ZeroMQ基础结构使用设计 结构,以便释放所有资源,直到操作系统保持 port# -s,在每种情况下,包括未处理的异常。这种做法是一种严肃的资源管理,任何专业设计都绝不能跳过(不仅针对可重入的Factory模式)。 从不。

Never leave the ZeroMQ infrastructure school-book "naked". There are many fair reasons to design principal try: except: finally; structures for the ZeroMQ infrastructure usage so as to release all the resources, down to the O/S maintained port#-s, in every case, incl. the unhandled exceptions. This practice is a serious resource-management must ( not only for re-entrant Factory patterns ) any professional design shall never skip. Never.

   GLOBAL_context = zmq.Context( setMoreIOthreadsForPERFORMANCE ) // future PERF
   PUSH_socket = GLOBAL_context.socket( zmq.PUSH )
// -------------------------------------------------------------- // .SET
   PUSH_socket.setsockopt(       zmq.SNDBUF,    123456 )          // ^ HI 1st
   PUSH_socket.setsockopt(       zmq.SNDHWM,    123456 )          // ^ HI 1st
// PULL_socket.setsockopt(       zmq.MAXMSGSIZE, 12345 )          // ~ LOCK !DDoS
   PUSH_socket.setsockopt(       zmq.AFFINITY,       0 )          // [0] 1st
// -------------------------------------------------------------- // GRACEFUL TERMINATION:
   PUSH_socket.setsockopt(       zmq.LINGER,         0 )          // ALWAYS
// -------------------------------------------------------------- // 
   PUSH_socket.bind( "tcp://127.0.0.1:4999" )                     // IPC: w/o TCP overheads
// -------------------------------------------------------------- //
   ...
   app logic
   ...
// -------------------------------------------------------------- // ALWAYS
   PUSH_socket.close()                                            // ALWAYS
   GLOBAL_context.term()                                          // ALWAYS






结语:



ZeroMQ明确指出,一个人要么获得完整的消息,要么一无所获。因此,如果需要传递每条消息,则应用程序设计必须采取所有应有的谨慎,因为智能ZeroMQ工具可以处理很多事情,但这由设计者负责。


Epilogue:

ZeroMQ clearly states, that one either gets the complete message or nothing. So the application design has to take all due care, if in a need to deliver each and every message, as the smart ZeroMQ tools handle many things, but leave this at the designer's responsibility.

鉴于您的评论已解释,提议的系统只有一个接收器,可能更愿意使用 PAIR / PAIR 模式+避免与L3 | L2 | L1->-L1 | L2 | L3多堆栈数据包的所有层相关的所有开销。与 tcp:// 传输类关联的O / S缓冲区管理,然后进入 ipc:/ / (如果操作系统允许)或 vmci:// 因此,传输类的速度更快了,因此延迟时间更短(更好),协议开销。

Given your comment has explained there is just one receiver for the proposed system, one may rather go into PAIR/PAIR Scalable Formal Communication Pattern + avoiding all the overheads associated with all the layers of the L3|L2|L1->-L1|L2|L3 multi-stack packet assembly / disassembly & O/S buffers management associated with the tcp:// transport-class and go into ipc:// ( if O/S permits ) or vmci:// less complex & thus a lot faster transport-classes, having thus lower ( better ) latency & protocol overheads.

使用 PAIR / PAIR 原型还可以防止DoS攻击 .connect() -s放在PUSH_socket端,ZeroMQ无法避免这种步骤,而这种改变游戏规则的步骤有效地破坏了您的设计工作,这是默认设置行为是开始为所有潜在连接的对等节点提供循环服务(即使在某些攻击者被迫断开连接之后也是如此)。

Using PAIR/PAIR archetype also prevents nasty surprises once a DoS-attack .connect()-s onto a PUSH_socket side, where ZeroMQ has no means to avoid such step and such game-changing step efficiently devastates your design efforts, as the default behaviour is to start round-robin serving all the "potentially" connected peers ( even after some attacker(s) 's('ve) been forced to disconnect ).

最后但并非最不重要的一点是,应该管理这样的分布式过程,而总是选择 .send()/ .recv()非阻塞模式c> 以专业的高性能信令/消息传递和呼叫进行呼叫基于 .poll() 的控制工具位于各个应用程序域内使用的数据泵送服务的最关键的实时部分。

Last but not least, one ought manage such distributed processes to rather always prefer non-blocking mode of .send() / .recv() calls in professional high-performance signalling / messaging & .poll() based control tools inside the most critical real-time sections of the data-pumping services used inside the respective application domain.

这篇关于ZeroMQ:即使设置了HWM和bufsize,数据包也会丢失的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-21 06:52