文章目录
前言
提示:这里可以添加本文要记录的大概内容:
最近想着有时间实现总线报文收发的动态的配置,今天特记录一下报文周期任务的动态的创建和移除
提示:以下是本篇文章正文内容,下面案例可供参考
一、can周期任务类构建
代码如下(示例):
import threading
import time
class CycleTask():
def __init__(self):
self.__task_dict = dict() # 周期任务的标志位字典
self.__task_queue = dict() # 周期任务需发送的数据字典,{cycle_time:{msg_obj:data}}
self.__lock_dict = dict() # 每个线程对应的线程锁
def create_task(self, cycle_time):
"""
功能:创建周期任务
参数1:周期时间
"""
self.__create_task_thread(cycle_time)
def __create_task_thread(self, cycle_time):
"""
功能:创建周期线程
参数1:事件
"""
task = threading.Thread(target=self.__task, args=(cycle_time,))
task.start()
def __task(self, cycle_time):
"""
功能:周期任务函数
参数1:周期事件
"""
lock = threading.Lock()
self.__lock_dict[cycle_time] = lock
task_flag = True
self.__task_dict[cycle_time] = task_flag
while self.__task_dict[cycle_time]:
self.__lock_dict[cycle_time].acquire() # 线程上锁
for msg, data in self.__task_queue[cycle_time].items():
pass
self.__lock_dict[cycle_time].release() # 线程下锁
time.sleep(cycle_time)
def cycle_send_data(self, cycle_time, msg, data):
"""
功能:将报文进行周期发送
参数1:周期事件
参数2:报文对象
参数3:数据列表
"""
if cycle_time in self.__task_queue:
self.__lock_dict[cycle_time].acquire() # 线程上锁
self.__task_queue[cycle_time][msg] = data # {cycle_time:{msg_obj:data}}
self.__lock_dict[cycle_time].release() # 线程下锁
else:
self.__task_queue[cycle_time] = dict()
self.__task_queue[cycle_time][msg] = data
def cycle_msg_stop(self, cycle_time, msg):
"""
功能:停止报文周期发送
参数1:周期事件
参数2:报文对象
"""
self.__lock_dict[cycle_time].acquire() # 线程上锁
del self.__task_queue[cycle_time][msg]
self.__lock_dict[cycle_time].release() # 线程下锁
if not self.__task_queue: # 当周期任务中没有
self.__task_dict[cycle_time] = False
总结
我是一名车载集成测试开发工程师,希望能和志同道合的朋友一起相互学习进步