本文介绍了django通道2如何在使用者中创建连续发送数据并安全断开连接的任务?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

来自答案,该答案有助于为每个消费者发送数据n 秒.

From this answer, which helps to send data from consumers for every n seconds.

尝试使用 creat_task 方法正确处理断开连接,尝试停止 while-loop (用于为每个 n 秒),方法是发送 flag = False (假设此标志未发送到创建任务的同一实例).

Tried to handle the disconnection properly, using creat_task method, tried to stop while-loop(which is used to send data for every n seconds) by sending a flag=False(Assuming, this flag is not sent to the same instance which is created the task).

consumers.py:

class AutoUpdateConsumer(AsyncConsumer):

    async def websocket_connect(self, event):
        print("connected", event)
        await self.send({
            "type": "websocket.accept"
        })
        await self.create_task(True)

    async def websocket_receive(self, event):
        print("receive", event)

    async def websocket_disconnect(self, event):
        await self.create_task(False)

        print("disconnected", event)


    async def create_task(self, flag=True):
        while flag:
            await asyncio.sleep(2)

            df= pd.DataFrame(data=[random.sample(range(100), 4) for _ in range(5)])

            await self.send({
                'type': 'websocket.send',
                'text': df.to_html(),
            })

警告:

2019-09-11 14:40:06,400 - WARNING - server - Application instance 
<Task pending coro=<SessionMiddlewareInstance.__call__() running at 
D:\Django\Django channels\django_channels_env\lib\site-packages\channels\sessions.py:175>
wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 
0x000001870E06C618>()] for connection <WebSocketProtocol client=
['127.0.0.1', 63789] path=b'/ws/home'> took too long to shut down and was 
killed.

如何安全地停止任务而不是等待 channels 杀死任务?

How to stop_task safely instead of waiting for channels to kill task?

如何从同一个类的另一个方法停止在一个方法中运行的while循环的无限循环?

How to stop infinite while loop running in a method, from another method in same class?

版本:

  • Django == 2.0.7
  • 渠道== 2.1.2

推荐答案

我建议在连接到使用者时创建一个组.这样,只要知道组名( auto_update ),就可以从django项目中的任何地方触发消息.

I would suggest creating a group when connecting to the consumer. That way you can trigger a message from anywhere in your django project as long as you know the group name (auto_update).

from channels.generic.websocket import AsyncWebsocketConsumer

class AutoUpdateConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        print('connect')

        # join the group
        self.group_name = 'auto_update'
        await self.channel_layer.group_add(
            self.group_name,
            self.channel_name
        )
        await self.accept()

    async def disconnect(self, event):
        print('disconnect')

        # leave the group
        await self.channel_layer.group_discard(
            self.group_name,
            self.channel_name
        )

    async def receive(self, event):
        print('receive')

    async def auto_update(self, event):
        print('sending df')
        df = event['df']

        await self.send({
            'text': df
        })

要发送消息,我将使用自定义管理命令.要停止命令,我将创建一个单例模型(一个只有一个实例的模型),该模型具有一个布尔字段,可以定期检查该布尔字段是否应该停止循环.

To send the message I would use a custom management command. To stop the command I would create a singleton model (a model with only one instance) that has a boolean field that can be periodically checked to see if the loop should be stopped.

首先使用 get_channel_layer()获取与Redis通信的活动层,然后在循环中调用 group_send 调用由 type指定的使用者方法键.

First use get_channel_layer() to get the active layer that communicates with redis, then in the loop call group_send to invoke a consumer method specified by the type key.

# /project/app/management/commands/auto_update.py

from django.core.management.base import BaseCommand
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from config.models import AutoUpdateSettings

class Command(BaseCommand):
    help = 'Command to start auto updating'

    def handle(self, *args, **kwargs):
        settings = AutoUpdateSettings.objects.first()
        settings.keep_running = True
        settings.save()

        group_name = 'auto_update'
        channel_layer = get_channel_layer()

        while True:
            settings.refresh_from_db()
            if not settings.keep_running:
                break

            df= pd.DataFrame(data=[random.sample(range(100), 4) for _ in range(5)])
            async_to_sync(channel_layer.group_send)(
                group_name,
                {
                    'type': 'auto_update',  # this is the name of your consumer method
                    'df': df.to_html()
                }
            )

要启动将消息发送到组的循环,您可以调用命令 python manage.py auto_update .要停止该命令,您将使用管理页面并将 keep_running 设置为false.

To start the loop that sends the message to the group you would call the command python manage.py auto_update. To stop the command you would use the admin page and set keep_running to false.

这篇关于django通道2如何在使用者中创建连续发送数据并安全断开连接的任务?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-24 07:40