我第一次在python中使用asyncio玩,并尝试将其与ZMQ结合使用。

基本上,我的问题是我有一个REP / REQ系统,它在async def中具有我需要等待的功能。值如何不更新。
这是一段代码来说明这一点:

#Declaring the zmq context
context = zmq_asyncio.Context()
REP_server_django = context.socket(zmq.REP)
REP_server_django.bind("tcp://*:5558")


我将此对象发送到一个类,并在此函数中将其取回

async def readsonar(self, trigger_pin, REP_server_django):
        i= 0
        while True:

            ping_from_view = await REP_server_django.recv()  # line.1
            value = await self.board.sonar_read(trigger_pin) # line.2
            print(value)                                     # line.3
            json_data = json.dumps(value)                    # line.4
            #json_data = json.dumps(i)                       # line.4bis
            REP_server_django.send(json_data.encode())       # line.5
            i+=1                                             # line.6
            await asyncio.sleep(1/1000)                      # line.7


sonar_read正在使用pymata_express读取超声波传感器。如果我评论line.2line.4,我将得到正确的i值。如果我注释line.1line.5,则print(value)sonar_read打印正确的值。但是,当我按如下所示运行它时,value不会更新。

我想念什么吗?



编辑:编辑有关行注释的类型。我的意思是,如果我只读声纳并打印该值。它工作正常。如果我只有.recv().send(json.dumps(i).encode()),它可以工作。但是,如果我尝试从声纳发送值。它锁定到未更新的给定value



EDIT2 :(对Alan Yorinks的回答):这是MWE,它考虑您发送的有关类中zmq声明的内容。它来自pymata_express示例concurrent_tasks.py

要重现该错误,请在两个不同的终端中运行这两个脚本。您将需要安装Frimata_express的arduino板。如果一切顺利
PART A.仅应在mve_req.py端吐出相同的值。您可以编辑不同的块(PARTS A,B或C)以查看行为。

mve_rep.py

#ADAPTED FROM PYMATA EXPRESS EXAMPLE CONCURRENTTAKS
#https://github.com/MrYsLab/pymata-express/blob/master/examples/concurrent_tasks.py
import asyncio
import zmq
import json
import zmq.asyncio as zmq_asyncio
from pymata_express.pymata_express import PymataExpress


class ConcurrentTasks:

    def __init__(self, board):


        self.loop = board.get_event_loop()
        self.board = board

        self.ctxsync = zmq.Context()
        self.context = zmq.asyncio.Context()
        self.rep = self.context.socket(zmq.REP)
        self.rep.bind("tcp://*:5558")

        self.trigger_pin = 53
        self.echo_pin = 51

        loop.run_until_complete(self.async_init_and_run())

    async def readsonar(self):
        i = 0
        while True:


            #PART. A. WHAT I HOPE COULD WORK
            rep_recv = await self.rep.recv()                       # line.1
            value = await self.board.sonar_read(self.trigger_pin)  # line.2
            print(value)                                           # line.3
            json_data = json.dumps(value)                          # line.4
            # json_data = json.dumps(i)                            # line.4bis
            await self.rep.send(json_data.encode())                # line.5
            i += 1                                                 # line.6
            await asyncio.sleep(1 / 1000)                          # line.7


            '''
            #PART. B. WORKS FINE IN UPDATING THE SONAR_RAED VALUE AND PRINTING IT
            value = await self.board.sonar_read(self.trigger_pin)  # line.2
            print(value)                                           # line.3
            json_data = json.dumps(value)                          # line.4
            i += 1                                                 # line.6
            await asyncio.sleep(1 / 1000)                          # line.7
            '''

            '''
            #PART. C. WORKS FINE IN SENDING THE i VALUE OVER ZMQ
            rep_recv = await self.rep.recv()                       # line.1
            json_data = json.dumps(i)                              # line.4bis
            await self.rep.send(json_data.encode())                # line.5
            i += 1                                                 # line.6
            await asyncio.sleep(1 / 1000)                          # line.7
            '''



    async def async_init_and_run(self):

        await self.board.set_pin_mode_sonar(self.trigger_pin, self.echo_pin)

        readsonar = asyncio.create_task(self.readsonar())
        await readsonar

        # OTHER CREATED_TASK GO HERE, (removed them in the MVE, but they work fine)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    my_board = PymataExpress()
    try:
        ConcurrentTasks(my_board)
    except (KeyboardInterrupt, RuntimeError):
        loop.run_until_complete(my_board.shutdown())
        print('goodbye')
    finally:
        loop.close()


mve_req.py

import zmq
import time
import json

def start_zmq():
    context = zmq.Context()
    REQ_django  = context.socket(zmq.REQ)
    REQ_django.connect("tcp://localhost:5558")

    return REQ_django, context

def get_sonar(REQ_django):
    REQ_django.send(b"server_django")
    ping_from_server_django = REQ_django.recv()
    return ping_from_server_django.decode()

if __name__ == '__main__':

    data = {"sensors":{}}

    REQ_django, context = start_zmq()
    while REQ_django:

            data['sensors']['sonar'] = get_sonar(REQ_django)
            json_data = json.dumps(data)
            print(data)

            #DO OTHER WORK
            time.sleep(1)

    REQ_django.close()
    context.term()

最佳答案

完全公开的是,我是pymata-expresspython-banyan.的作者。OP要求我发布此解决方案,因此,这并不意味着要无耻。

自从在Python 3中首次引入asyncio以来,我就一直在进行开发。当asyncio代码工作时,asyncio(IMHO)可以简化并发性和代码。但是,当出现问题时,调试和了解问题的原因可能会令人沮丧。

我提前表示歉意,因为这可能会有点冗长,但是我需要提供一些背景信息,以使该示例看起来不会像是一些随机的代码。

开发python-banyan框架是为了提供线程,多处理和异步的替代方法。简而言之,Banyan应用程序由针对性小的可执行文件组成,这些可执行文件使用通过LAN共享的协议消息相互通信。它的核心使用Zeromq。它的设计目的不是让流量通过WAN传输,而是将LAN用作“软件底板”。在某些方面,Banyan与MQTT相似,但是在LAN中使用时,速度要快得多。如果需要,它确实具有连接到MQTT网络的能力。

Banyan的一部分是一个称为OneGPIO的概念。它是一个协议消息传递规范,抽象了GPIO功能以独立于任何硬件实现。为了实现硬件细节,开发了专用的Banyan组件,称为Banyan硬件网关。有适用于Raspberry Pi,Arduino,ESP-8266和Adafruit Crickit Hat的网关。 GPIO应用程序发布通用的OneGPIO消息,任何或所有网关都可以选择接收该消息。要从一个硬件平台转移到另一个硬件平台,将启动与硬件相关的网关,并且无需进行修改即可启动控制组件(如下所示的代码)。从一个硬件平台到另一个硬件平台,任何组件都不需要修改代码,控制组件和网关都无需修改。启动控制组件时,可通过命令行选项指定诸如引脚号之类的变量。对于Arduino网关,使用pymata-express来控制Arduino的GPIO。 Pymata-express是StandardFirmata客户端的异步实现。需要注意的是,下面的代码不是asyncio。 Banyan框架允许人们使用适合问题的工具进行开发,但允许解耦解决方案的各个部分,在这种情况下,该应用程序允许将asyncio与非asyncio混合使用,而不会在执行过程中通常遇到任何麻烦所以。

在提供的代码中,类定义下的所有代码均用于提供对命令行配置选项的支持。

import argparse
import signal
import sys
import threading
import time

from python_banyan.banyan_base import BanyanBase


class HCSR04(BanyanBase, threading.Thread):
    def __init__(self, **kwargs):
        """
        kwargs contains the following parameters
        :param back_plane_ip_address: If none, the local IP address is used
        :param process_name: HCSR04
        :param publisher_port: publishing port
        :param subscriber_port: subscriber port
        :param loop_time: receive loop idle time
        :param trigger_pin: GPIO trigger pin number
        :param echo_pin: GPIO echo pin number
        """

        self.back_plane_ip_address = kwargs['back_plane_ip_address'],
        self.process_name = kwargs['process_name']
        self.publisher_port = kwargs['publisher_port']
        self.subscriber_port = kwargs['subscriber_port'],
        self.loop_time = kwargs['loop_time']
        self.trigger_pin = kwargs['trigger_pin']
        self.echo_pin = kwargs['echo_pin']
        self.poll_interval = kwargs['poll_interval']

        self.last_distance_value = 0

        # initialize the base class
        super(HCSR04, self).__init__(back_plane_ip_address=kwargs['back_plane_ip_address'],
                                     subscriber_port=kwargs['subscriber_port'],
                                     publisher_port=kwargs['publisher_port'],
                                     process_name=kwargs['process_name'],
                                     loop_time=kwargs['loop_time'])

        threading.Thread.__init__(self)
        self.daemon = True

        self.lock = threading.Lock()

        # subscribe to receive messages from arduino gateway
        self.set_subscriber_topic('from_arduino_gateway')

        # enable hc-sr04 in arduino gateway
        payload = {'command': 'set_mode_sonar', 'trigger_pin': self.trigger_pin,
                   'echo_pin': self.echo_pin}
        self.publish_payload(payload, 'to_arduino_gateway')

        # start the thread
        self.start()

        try:
            self.receive_loop()
        except KeyboardInterrupt:
            self.clean_up()
            sys.exit(0)

    def incoming_message_processing(self, topic, payload):
        print(topic, payload)
        with self.lock:
            self.last_distance_value = payload['value']

    def run(self):
        while True:
            with self.lock:
                distance = self.last_distance_value
            payload = {'distance': distance}
            topic = 'distance_poll'
            self.publish_payload(payload, topic)
            time.sleep(self.poll_interval)


def hcsr04():
    parser = argparse.ArgumentParser()
    # allow user to bypass the IP address auto-discovery.
    # This is necessary if the component resides on a computer
    # other than the computing running the backplane.
    parser.add_argument("-b", dest="back_plane_ip_address", default="None",
                        help="None or IP address used by Back Plane")
    parser.add_argument("-i", dest="poll_interval", default=1.0,
                        help="Distance polling interval")
    parser.add_argument("-n", dest="process_name", default="HC-SRO4 Demo",
                        help="Set process name in banner")
    parser.add_argument("-p", dest="publisher_port", default="43124",
                        help="Publisher IP port")
    parser.add_argument("-s", dest="subscriber_port", default="43125",
                        help="Subscriber IP port")
    parser.add_argument("-t", dest="loop_time", default=".1",
                        help="Event Loop Timer in seconds")
    parser.add_argument("-x", dest="trigger_pin", default="12",
                        help="Trigger GPIO pin number")
    parser.add_argument("-y", dest="echo_pin", default="13",
                        help="Echo GPIO pin number")

    args = parser.parse_args()

    if args.back_plane_ip_address == 'None':
        args.back_plane_ip_address = None
    kw_options = {'back_plane_ip_address': args.back_plane_ip_address,
                  'publisher_port': args.publisher_port,
                  'subscriber_port': args.subscriber_port,
                  'process_name': args.process_name,
                  'loop_time': float(args.loop_time),
                  'trigger_pin': int(args.trigger_pin),
                  'echo_pin': int(args.echo_pin),
                  'poll_interval': int(args.poll_interval)
                  }

    # replace with the name of your class
    HCSR04(**kw_options)


# signal handler function called when Control-C occurs
def signal_handler(sig, frame):
    print('Exiting Through Signal Handler')
    raise KeyboardInterrupt


# listen for SIGINT
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

if __name__ == '__main__':
    hcsr04()

关于python - 带有异步的pyzmq REQ/REP等待变量,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/57156822/

10-10 18:00