【深度学习】ONNX模型CPU多线程快速部署【基础】


前言

之前的内容已经尽可能简单、详细的介绍CPU【Pytorch2ONNX】和GPU【Pytorch2ONNX】俩种模式下Pytorch模型转ONNX格式的流程,本博文根据自己的学习和需求进一步讲解ONNX模型的部署。onnx模型博主将使用PyInstaller进行打包部署,PyInstaller是一个用于将Python脚本打包成独立可执行文件的工具,【入门篇】中已经进行了最基本的使用讲解。之前博主在【快速部署ONNX模型】中分别介绍了CPU模式和GPU模式下onnx模型打包成可执行文件的教程,本博文将进一步介绍在CPU模式下使用多线程对ONNX模型进行快速部署。
系列学习目录:
【CPU】Pytorch模型转ONNX模型流程详解
【GPU】Pytorch模型转ONNX格式流程详解
【ONNX模型】快速部署
【ONNX模型】多线程快速部署


搭建打包环境

创建一个纯净的、没有多余的第三方库和模块的小型Python环境,抛开任何pytorch相关的依赖,只使用onnx模型完成测试。

# name 环境名、3.x Python的版本
conda create -n deploy python==3.10
# 激活环境
activate deploy 
# 安装onnx
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple onnx
# 安装GPU版
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple onnxruntime-gpu==1.15.0
# 下载安装Pyinstaller模块
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple Pyinstaller
# 根据个人情况安装包,博主这里需要安装piilow
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple Pillow

python多线程并发简单教程

多线程是一种并发编程的技术,通过同时执行多个线程来提高程序的性能和效率。python的内置模块提供了两个内置模块:thread和threading,thread是源生模块,是比较底层的模块,threading是扩展模块,是对thread做了一些封装,可以更加方便的被使用,所以只需要使用threading这个模块就能完成并发的测试。

基本教程

python3.x中通过threading模块有两种方法创建新的线程:通过threading.Thread(Target=executable Method)传递给Thread对象一个可执行方法(或对象);通过继承threading.Thread定义子类并重写run()方法。下面给出了俩种创建新线程方法的例子,读者可以运行一下加深理解。

  • 普通创建方式:threading.Thread进行创建多线程
    import threading
    import time
    
    def myTestFunc():
        # 子线程开始
        print("the current threading %s is runing" % (threading.current_thread().name))
        time.sleep(1)   # 休眠线程
        # 子线程结束
        print("the current threading %s is ended" % (threading.current_thread().name))
    
    # 主线程
    print("the current threading %s is runing" % (threading.current_thread().name))
    # 子线程t1创建
    t1 = threading.Thread(target=myTestFunc)
    # 子线程t2创建
    t2 = threading.Thread(target=myTestFunc)
    
    t1.start()  # 启动线程
    t2.start()
    
    t1.join()  # join是阻塞当前线程(此处的当前线程时主线程) 主线程直到子线程t1结束之后才结束
    t2.join()
    # 主线程结束
    print("the current threading %s is ended" % (threading.current_thread().name))
    
  • 自定义线程:继承threading.Thread定义子类创建多线
    import threading
    import time
    
    class myTestThread(threading.Thread):  # 继承父类threading.Thread
      def __init__(self, threadID, name, counter):
         threading.Thread.__init__(self)
         self.threadID = threadID
         self.name = name
    
      # 把要执行的代码写到run函数里面,线程在创建后会直接运行run函数
      def run(self):
         print("the current threading %s is runing" % (self.name))
         print_time(self.name,5*self.threadID)
         print("the current threading %s is ended" % (self.name))
    
    
    def print_time(threadName, delay):
         time.sleep(delay)
         print("%s process at: %s" % (threadName, time.ctime(time.time())))
    
    # 主线程
    print("the current threading %s is runing" % (threading.current_thread().name))
    
    # 创建新线程
    t1 = myTestThread(1, "Thread-1", 1)
    t2 = myTestThread(2, "Thread-2", 2)
    
    # 开启线程
    t1.start()
    t2.start()
    
    # 等待线程结束
    t1.join()
    t2.join()
    
    print("the current threading %s is ended" % (threading.current_thread().name))
    

ONNX模型多线程并发

博主采用的是基础教程中普通创建方式创建新线程:将推理流程单独指定成目标函数,而后创建线程对象并指定目标函数,同一个推理session被分配给多个线程,多个线程会共享同一个onnx模型,这是因为深度学习模型的参数通常存储在模型对象中的共享内存中,并且模型的参数在运行时是可读写的,每个线程可以独立地使用模型对象执行任务,并且线程之间可以共享模型的状态和参数。

import onnxruntime as ort
import numpy as np
from PIL import Image
import time
import datetime
import sys
import os
import threading

def composed_transforms(image):
    mean = np.array([0.485, 0.456, 0.406])  # 均值
    std = np.array([0.229, 0.224, 0.225])  # 标准差
    # transforms.Resize是双线性插值
    resized_image = image.resize((args['scale'], args['scale']), resample=Image.BILINEAR)
    # onnx模型的输入必须是np,并且数据类型与onnx模型要求的数据类型保持一致
    resized_image = np.array(resized_image)
    normalized_image = (resized_image/255.0 - mean) / std
    return np.round(normalized_image.astype(np.float32), 4)

def check_mkdir(dir_name):
    if not os.path.exists(dir_name):
        os.makedirs(dir_name)

args = {
    'scale': 416,
    'save_results': True
}
def process_img(img_list,
                ort_session,
                image_path,
                mask_path,
                input_name,
                output_names):

    for idx, img_name in enumerate(img_list):
        img = Image.open(os.path.join(image_path, img_name + '.jpg')).convert('RGB')
        w, h = img.size
        #  对原始图像resize和归一化
        img_var = composed_transforms(img)
        # np的shape从[w,h,c]=>[c,w,h]
        img_var = np.transpose(img_var, (2, 0, 1))
        # 增加数据的维度[c,w,h]=>[bathsize,c,w,h]
        img_var = np.expand_dims(img_var, axis=0)
        start_each = time.time()
        prediction = ort_session.run(output_names, {input_name: img_var})
        time_each = time.time() - start_each
        # 除去多余的bathsize维度,NumPy变会PIL同样需要变换数据类型
        # *255替换pytorch的to_pil
        prediction = (np.squeeze(prediction[3]) * 255).astype(np.uint8)
        if args['save_results']:
            Image.fromarray(prediction).resize((w, h)).save(os.path.join(mask_path, img_name + '.jpg'))

def main():
    # 线程个数
    num_cores = 10
    # 保存检测结果的地址
    input = sys.argv[1]
    # providers = ["CUDAExecutionProvider"]
    providers = ["CPUExecutionProvider"]
    model_path = "PFNet.onnx"
    ort_session = ort.InferenceSession(model_path, providers=providers)  # 创建一个推理session
    input_name = ort_session.get_inputs()[0].name
    # 输出有四个
    output_names = [output.name for output in ort_session.get_outputs()]
    print('Load {} succeed!'.format('PFNet.onnx'))

    start = time.time()
    image_path = os.path.join(input, 'image')
    mask_path = os.path.join(input, 'mask')

    if args['save_results']:
        check_mkdir(mask_path)
    # 所有图片数量
    img_list = [os.path.splitext(f)[0] for f in os.listdir(image_path) if f.endswith('jpg')]

    # 每个线程被均匀分配的图片数量
    total_images = len(img_list)
    start_index = 0
    images_per_list = total_images // num_cores

    # 理解成线程池
    Thread_list = []
    for i in range(num_cores):
        end_index = start_index + images_per_list
        img_l = img_list[start_index:end_index]
        start_index = end_index
        # 分配线程
        t = threading.Thread(target=process_img, args=(img_l,ort_session, image_path, mask_path,input_name,output_names))
        # 假如线程池
        Thread_list.append(t)
        # 线程执行
        t.start()
    # 这里是为了阻塞主线程
    for t in Thread_list:
        t.join()
    end = time.time()
    print("Total Testing Time: {}".format(str(datetime.timedelta(seconds=int(end - start)))))
if __name__ == '__main__':
    main()

线程的数量根据需求而定,不是越多越好。


打包成可执行文件

  • 在cpu模式下打包可执行文件:
    pyinstaller -F run_t.py
    
    【深度学习】ONNX模型多线程快速部署【基础】-LMLPHP
  • 在gpu模式下打包可执行文件:
    pyinstaller -F run_t.py --add-binary "D:/ProgramData/Anaconda3_data/envs/deploy/Lib/site-packages/onnxruntime/capi/onnxruntime_providers_cuda.dll;./onnxruntime/capi" --add-binary "D:/ProgramData/Anaconda3_data/envs/deploy/Lib/site-packages/onnxruntime/capi/onnxruntime_providers_shared.dll;./onnxruntime/capi"
    
    【深度学习】ONNX模型多线程快速部署【基础】-LMLPHP
    详细的过程和结果此前已经讲解过了,可以查看博主的博文【快速部署ONNX模型】。图片数量较多时,对比此前的执行速度,多线程的执行速度快了俩倍以上。

总结

尽可能简单、详细的介绍ONNX模型多线程快速部署过程。

09-21 15:02