我不能确定错误在哪里,但是我正在尝试在Python客户端和Chapel服务器之间传递消息。客户端代码是

import zmq

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")

for request in range(10):
    print("Sending request %s ..." % request)
    socket.send(str("Yo"))
    message = socket.recv()
    print("OMG!! He said %s" % message)


而教堂服务器是

use ZMQ;
var context: Context;
var socket = context.socket(ZMQ.REP);
socket.bind("tcp://*:5555");

while ( 1 < 2) {
  var msg = socket.recv(string);
  socket.recv(string);
  writeln("got something");
  socket.send("back from chapel");
}


该信息似乎很普通,但我并没有真正理解。

server.chpl:7: error: halt reached - Error in Socket.recv(string): Operation cannot be accomplished in current state


我想双方都在发送/接收。最初的Chapel示例on the Chapel site工作正常,但是我在修改它时遇到了麻烦。

更新资料

this thread上的Chapel团队的帮助下,这现在可以工作了。

client.py

import zmq

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")

for request in range(10):
    message = "Hello %i from Python" % request
    print("[Python] Sending request: %s" % message)
    socket.send_string(message)
    message = socket.recv_string()
    print("[Python] Received response: %s" % message)


server.chpl

use ZMQ;

var context: Context;
var socket = context.socket(ZMQ.REP);
socket.bind("tcp://*:5555");

for i in 0..#10 {
  var msg = socket.recv(string);
  writeln("[Chapel] Received message: ", msg);
  socket.send("Hello %i from Chapel".format(i));
}

最佳答案

chapel团队解决并重新确认之前,请仅用int有效负载测试任何chapel ZMQ模块服务,并避免使用PUB/SUB原型(由于字符串匹配待处理问题)。

正如@Nick最近disclosed here,所提供的那样,尚有一种方法可以使ZMQ服务满足ZeroMQ API的合规性,并为异构分布式系统完全打开交叉兼容的大门:


  要发送字符串,Chapel发送一条消息,其大小为字符串大小,然后发送另一条消息,其字节缓冲区为;接收作品类似。这意味着您对<aSocket>.recv( string )的一次呼叫实际上是在后台对zmq_recv()进行了两次背对背呼叫。使用REQ/REP模式,这两个背对背zmq_recv()调用将ZeroMQ状态机置于无效状态,因此出现错误消息。
  
  这绝对是Chapel的ZMQ模块的错误。




采取一些步骤可以使场景更加生动:

在诊断根本原因之前,让我提出一些应采取的措施。 ZeroMQ是一个功能非常强大的框架,在该框架中,很难找到比REQ/REP更难(而且更脆弱)的消息传递原型。

内部的有限状态自动机(实际上是分布式FSA)都处于阻塞状态(通过设计,以强制在连接的对等方之间传递类似钟摆的消息(不必只是前2个),以便SEQ的SEQ [A]在一侧的[A]-.send()-.recv()-.send()-.recv() -...与[B]-.recv()-.send()-.recv()-的SEQ匹配。 ..),并且如果双方出于某种原因进入等待状态,则此dFSA原则上也无法挽回彼此的死锁,其中[A]和[B]都希望收到来自另一方的下一条消息渠道。

这就是说,我的建议是首先进入最简单的测试-使用一对无限制的单纯形通道(是[A] PUSH / [B] PULL + [B] PUSH / [A ] PULL,或者使用PUB/SUB的更为复杂的方案)。

不需要为完全网格化的多Agent基础架构进行设置,而是简化的版本(不需要并且不打算使用ROUTER/DEALER通道,但是如果扩展了PUSH/PULL -s,则可能会复制(反向)Socket -s)模拟方案):

python - 带Chapel和Python的ZeroMQ,无法在当前状态下回答-LMLPHP

由于当前chapel实施限制,尚需对隐含限制进行更多的努力:


  在Chapel中,在Reflection上发送或接收消息会使用多部分消息和ZMQ模块来尽可能地序列化原始数据和用户定义的数据类型。当前,try:/except:/finally:模块序列化原始数字类型,字符串和由这些类型组成的记录。字符串被编码为长度(如int),后跟字符数组(以字节为单位)。


如果这些评论不仅是有线级别的内部性,而且还扩展到了顶级ZeroMQ消息传递/信令层(请参阅管理订阅的详细信息,其中ZeroMQ主题为-过滤器匹配基于与接收到的消息的左侧精确匹配等)。



python面享有更大的设计自由度:

#
# python
# #########

import time
import zmq; context = zmq.Context()

print( "INF: This Agent uses ZeroMQ v.{0:}".format( zmq.__version__ ) )

dataAB = context.socket( zmq.REQ )
dataAB.setsockopt( zmq.LINGER, 0 )        # ( a must in pre v4.0+ )
dataAB.connect( "tcp://localhost:5555" )

heartB = context.socket( zmq.SUB )
heartB.setsockopt( zmq.LINGER,   0 )      # ( a must in pre v4.0+ )
heartB.setsockopt( zmq.CONFLATE, 0 )      # ( ignore history, keep just last )

heartB.connect( "tcp://localhost:6666" )
heartB.setsockopt( zmq.SUBSCRIBE, "[chapel2python.HB]" )
heartB.setsockopt( zmq.SUBSCRIBE, "" )    # in case [Chapel] complicates serialisation
# -------------------------------------------------------------------
while ( True ):
      pass;             print( "INF: waiting for a [Chapel] HeartBeat-Message" )
      hbIN = heartB.recv( zmq.NOBLOCK );
      if len( hbIN ) > 0:
         pass;          print( "ACK: [Chapel] Heart-Beat-Message .recv()-ed" )
         break
      else:
         time.sleep( 0.5 )
# -------------------------------------------------------------------
for request in range(10):
    pass;               print( "INF: Sending a request %s to [Chapel] ..." % request )
    dataAB.send( str( "Yo" ) )
    pass;               print( "INF: a blocking .recv(), [Chapel] is to answer ..." )
    message = dataAB.recv()
    pass;               print( "INF: [Chapel] said %s" % message )
# -------------------------------------------------------------------
dataAB.close()
heartB.close()
context.term()
# -------------------------------------------------------------------


应当从无限的KeyboardInterrupt -loops等中为while() -s提供一些其他的.send()构造,但为清楚起见,此处将其省略。



chapel方面,我们将按原样努力与API保持同步:

照原样,文档尚不能帮助您确定用户代码是否可以控制选项,而在您的代码假定正在运行的情况下,对.recv() / zmq_send()方法的调用是否隐式始终处于阻塞状态在阻塞模式下(对于任何分布式系统设计,我一直(并在原则上一直强烈反对)阻塞是一种较差的做法-more on this here)。


  尽管C级调用Socket.send()可能是阻塞调用(取决于套接字类型和标志参数),但希望对zmq_send()进行语义阻塞的调用允许在操作系统线程上安排其他Chapel任务,如下所示:任务层支持。在内部,ZMQ模块使用对zmq_recv()和的非阻塞调用来传输数据,并在其他情况下阻塞时通过chpl_task_yield()产生任务层。
  
  Source


use ZMQ;
use Reflection;

var context: Context;
var dataBA = context.socket( ZMQ.REP ),
    heartB = context.socket( ZMQ.PUB );
var WAITms = 0;                             // setup as explicit int
    dataBA.setsockopt( ZMQ.LINGER, WAITms );// a must
    heartB.setsockopt( ZMQ.LINGER, WAITms );// a preventive step

    dataBA.bind( "tcp://*:5555" );          // may reverse .bind()/.connect()

    writeln( "INF: This Agent uses ZeroMQ v.", ZMQ.version() );

// /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/
config var   MAX_LOOPS  = 120;              //  --MAX_LOOPS = 10 set on cmdline
       var            i =   0;

while ( i < MAX_LOOPS ) {
 // --------------------------------------- // .send HeartBeat
    heartB.send( "[chapel2python.HB]" );
    i += 1;
    writeln( "INF: Sent HeartBeat # ", i );
 // --------------------------------------- // .send HeartBeat

    var msg = dataBA.recv( string );        // .recv() from python
 // - - - - - - - - - - - - - - - - - - - - // - - - - -WILL-[BLOCK]!!!
                                            //          ( ref. src )
    writeln( "INF: [Chapel] got: ",
              getField( msg, 1 )
              );

    dataBA.send( "back from chapel" );      // .send() to   python
}
writeln( "INF: MAX_LOOPS were exhausted,",
             " will exit-{} & .close()",
             " channels' sockets before",
             " [Chapel] exits to system."
             );
// /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/
dataBA.close( WAITms );                     // explicit graceful termination
heartB.close( WAITms );                     // explicit graceful termination
context.deinit();                           // explicit context  termination
                                            //       as not yet sure
                                            //       on auto-termination
                                            //       warranties

关于python - 带Chapel和Python的ZeroMQ,无法在当前状态下回答,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/45651724/

10-12 21:34