我的类在连接到服务器时应该立即发送登录字符串,然后在会话结束时应该发送注销字符串并清理套接字下面是我的代码。

import trio

class test:

    _buffer = 8192
    _max_retry = 4

    def __init__(self, host='127.0.0.1', port=12345, usr='user', pwd='secret'):
        self.host = str(host)
        self.port = int(port)
        self.usr = str(usr)
        self.pwd = str(pwd)
        self._nl = b'\r\n'
        self._attempt = 0
        self._queue = trio.Queue(30)
        self._connected = trio.Event()
        self._end_session = trio.Event()

    @property
    def connected(self):
        return self._connected.is_set()

    async def _sender(self, client_stream, nursery):
        print('## sender: started!')
        q = self._queue
        while True:
            cmd = await q.get()
            print('## sending to the server:\n{!r}\n'.format(cmd))
            if self._end_session.is_set():
                nursery.cancel_scope.shield = True
                with trio.move_on_after(1):
                    await client_stream.send_all(cmd)
                nursery.cancel_scope.shield = False
            await client_stream.send_all(cmd)

    async def _receiver(self, client_stream, nursery):
        print('## receiver: started!')
        buff = self._buffer
        while True:
            data = await client_stream.receive_some(buff)
            if not data:
                print('## receiver: connection closed')
                self._end_session.set()
                break
            print('## got data from the server:\n{!r}'.format(data))

    async def _watchdog(self, nursery):
        await self._end_session.wait()
        await self._queue.put(self._logoff)
        self._connected.clear()
        nursery.cancel_scope.cancel()

    @property
    def _login(self, *a, **kw):
        nl = self._nl
        usr, pwd = self.usr, self.pwd
        return nl.join(x.encode() for x in ['Login', usr,pwd]) + 2*nl

    @property
    def _logoff(self, *a, **kw):
        nl = self._nl
        return nl.join(x.encode() for x in ['Logoff']) + 2*nl

    async def _connect(self):
        host, port = self.host, self.port
        print('## connecting to {}:{}'.format(host, port))
        try:
            client_stream = await trio.open_tcp_stream(host, port)
        except OSError as err:
            print('##', err)
        else:
            async with client_stream:
                self._end_session.clear()
                self._connected.set()
                self._attempt = 0
                # Sign in as soon as connected
                await self._queue.put(self._login)
                async with trio.open_nursery() as nursery:
                    print("## spawning watchdog...")
                    nursery.start_soon(self._watchdog, nursery)
                    print("## spawning sender...")
                    nursery.start_soon(self._sender, client_stream, nursery)
                    print("## spawning receiver...")
                    nursery.start_soon(self._receiver, client_stream, nursery)

    def connect(self):
        while self._attempt <= self._max_retry:
            try:
                trio.run(self._connect)
                trio.run(trio.sleep, 1)
                self._attempt += 1
            except KeyboardInterrupt:
                self._end_session.set()
                print('Bye bye...')
                break

tst = test()
tst.connect()

我的逻辑不太正确如果我杀死了netcat侦听器,那么我的会话如下所示:
## connecting to 127.0.0.1:12345
## spawning watchdog...
## spawning sender...
## spawning receiver...
## receiver: started!
## sender: started!
## sending to the server:
b'Login\r\nuser\r\nsecret\r\n\r\n'

## receiver: connection closed
## sending to the server:
b'Logoff\r\n\r\n'

请注意,Logoff字符串已经发送出去了,尽管此时连接已经断开,在这里没有意义。
然而,我的目标是当用户LogoffKeyboardInterrupt。在这种情况下,我的会话类似于:
## connecting to 127.0.0.1:12345
## spawning watchdog...
## spawning sender...
## spawning receiver...
## receiver: started!
## sender: started!
## sending to the server:
b'Login\r\nuser\r\nsecret\r\n\r\n'

Bye bye...

注意Logoff没有被发送出去。
有什么想法吗?

最佳答案

你的呼叫树看起来像:

connect
|
+- _connect*
   |
   +- _watchdog*
   |
   +- _sender*
   |
   +- _receiver*

*s表示4个三项任务。_connect任务位于托儿所块的末尾,等待子任务完成。_watchdog任务在await self._end_session.wait()中被阻止,_sender任务在await q.get()中被阻止,_receiver任务在await client_stream.receive_some(...)中被阻止。
当您点击control-c时,标准的python语义是无论运行哪一位python代码,都会突然提升KeyboardInterrupt。在本例中,您有4个不同的任务在运行,因此其中一个被阻塞的操作被随机选取[1],并引发一个KeyboardInterrupt这意味着可能会发生一些不同的事情:
如果_watchdog wait调用引发KeyboardInterrupt,那么_watchdog方法立即退出,因此它甚至从未尝试发送logout。然后,作为解压缩堆栈的一部分,三重奏取消所有其他任务,一旦它们退出,则KeyboardInterrupt继续传播直到它在finally中达到connect块。此时,您尝试使用self._end_session.set()通知监视程序任务,但它不再运行,因此它不会注意到。
如果_sender q.get()调用引发KeyboardInterrupt,那么_sender方法立即退出,因此即使_watchdog要求它发送注销消息,也不会有注意到。在任何情况下,trio都会继续取消watchdog和receiver任务,并且事情会按上述方式进行。
如果_receiver'sreceive_all调用引发KeyboardInterrupt.同样的事情也会发生。
小细节:_connect还可以接收KeyboardInterrupt,这做了相同的事情:取消所有子级,然后等待它们停止,然后允许KeyboardInterrupt继续传播。
如果您想可靠地捕获control-c并对其进行处理,那么在某个随机点上提出这种业务是相当麻烦的。最简单的方法是使用Trio's support for catching signals捕获signal.SIGINT信号,这是python通常转换为KeyboardInterrupt的东西。(int代表“中断”。)类似于:
async def _control_c_watcher(self):
    # This API is currently a little cumbersome, sorry, see
    # https://github.com/python-trio/trio/issues/354
    with trio.catch_signals({signal.SIGINT}) as batched_signal_aiter:
        async for _ in batched_signal_aiter:
            self._end_session.set()
            # We exit the loop, restoring the normal behavior of
            # control-C. This way hitting control-C once will try to
            # do a polite shutdown, but if that gets stuck the user
            # can hit control-C again to raise KeyboardInterrupt and
            # force things to exit.
            break

然后和其他任务一起运行。
您还有一个问题,在_watchdog方法中,它会将logoff请求放入队列中,从而安排稍后由_sender任务发送的消息,然后立即取消所有任务,这样_sender任务可能就没有机会看到消息并对其作出反应!一般来说,当我只在必要时使用任务时,我发现我的代码工作得更好与其有一个sender任务,然后在发送消息时将消息放入队列,不如让希望直接发送消息调用的代码stream.send_all您需要注意的一件事是,如果您有多个可能同时发送内容的任务,您可能需要使用trio.Lock()来确保它们不会通过同时调用send_all而相互碰撞:
async def send_all(self, data):
    async with self.send_lock:
        await self.send_stream.send_all(data)

async def do_logoff(self):
    # First send the message
    await self.send_all(b"Logoff\r\n\r\n")
    # And then, *after* the message has been sent, cancel the tasks
    self.nursery.cancel()

如果这样做,就可以完全摆脱监视程序任务和_end_session事件。
当我在这里时,关于您的代码的一些其他注释:
像这样多次呼叫trio.run是不寻常的通常的方式是在程序的顶部调用一次,然后将所有真正的代码放入其中。一旦退出trio.run,所有三重唱的状态都会丢失,你肯定不会运行任何并发任务(所以没有任何方法可以听到并注意到你对_end_session.set()的调用!)。一般来说,几乎所有三个函数都假设您已经在对trio.run的调用中。现在你可以在启动trio之前调用trio.Queue(),而不会出现异常,但这基本上只是一个巧合。
在我看来,内部屏蔽的使用很奇怪屏蔽通常是一个高级功能,你几乎不想使用,我不认为这是一个例外。
希望能有帮助!如果您想更多地讨论类似这样的样式/设计问题,但又担心它们可能太模糊,以致堆栈溢出(“这个程序设计得好吗?”),然后您可以随意顺路到trio chat channel
[1]实际上,Trio可能出于各种原因选择了主要任务,但这并不能保证,而且在任何情况下,这都没有什么区别。

关于python - 如何在python-trio中的KeyboardInterrupt之后清理连接,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/48798144/

10-12 06:47