编辑:
我对视频流是什么有疑问,所以我将提供更多的清晰度。该流是来自我的网络摄像头的实时视频供稿,可通过OpenCV访问。当相机读取图像时,会得到每个帧,并将其发送到单独的进程进行处理。该过程基于对图像进行的计算返回文本。然后将文本显示在图像上。我需要实时显示视频流,如果文本和所显示的视频之间存在滞后就可以了(即,如果文本适用于上一帧,那就可以了)。

也许更容易想到的是我正在对摄像头看到的图像进行识别。我一次将一帧发送到一个单独的过程中,以对该帧进行识别分析,然后将文本发送回以作为字幕添加到实时供稿中。显然,处理比从网络摄像头抓取帧并显示它们要花费更多的时间,因此,如果字幕说明和网络摄像头提要显示的内容有所延迟,这是可以接受的,也是可以预期的。

现在发生的是,由于其他进程,我正在显示的实时视频出现滞后(当我不将帧发送到进程进行计算时,就没有滞后)。我还确保一次只排队一帧,这样可以避免队列重载并导致延迟。我已经更新了下面的代码以反射(reflect)此细节。

我在python中使用多处理模块来帮助加快主程序的速度。但是我相信我可能做错了什么,因为我认为计算不是并行进行的。

我希望程序在主进程中从视频流中读取图像,然后将帧传递给两个子进程,对它们进行计算,然后将文本(包含计算结果)发送回主进程。

但是,当我使用多重处理时,主要流程似乎滞后了,运行速度几乎是没有多重处理时的一半,这使我相信这些流程不会完全并行运行。

经过一些研究,我推测延迟可能是由于使用队列在进程之间进行通信(将图像从主对象传递到子对象,以及将文本从子对象传递到主对象)造成的。

但是,我注释掉了计算步骤,只是让主进程传递了一个图像,而子进程返回了空白文本,在这种情况下,主进程根本没有变慢。它全速运行。

因此,我相信

1)我不是最优地使用多处理

或者

2)这些进程不能真正并行运行(我会理解有些滞后,但是这会使主进程减慢一半)。

这是我的代码的概述。只有一个消费者而不是2个消费者,但是两个消费者几乎是相同的。如果有人可以提供指导,我将不胜感激。

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        #other initialization stuff

    def run(self):
        while True:
            image = self.task_queue.get()
            #Do computations on image
            self.result_queue.put("text")

        return

import cv2

tasks = multiprocessing.Queue()
results = multiprocessing.Queue()
consumer = Consumer(tasks,results)
consumer.start()

#Creating window and starting video capturer from camera
cv2.namedWindow("preview")
vc = cv2.VideoCapture(0)
#Try to get the first frame
if vc.isOpened():
    rval, frame = vc.read()
else:
    rval = False

while rval:
    if tasks.empty():
       tasks.put(image)
    else:
       text = tasks.get()
       #Add text to frame
       cv2.putText(frame,text)

    #Showing the frame with all the applied modifications
    cv2.imshow("preview", frame)

    #Getting next frame from camera
    rval, frame = vc.read()

最佳答案



在上面的生产者/消费者实现中,将任务放入消费者要执行的队列的生产者需要与主过程/控制过程分开,以便它可以与主过程并行添加任务处理从结果队列读取输出。

尝试以下方法。在使用者进程中增加了 sleep 以模拟处理,并添加了第二个使用者以表明它们正在并行运行。

限制任务队列的大小也是一个好主意,如果处理无法跟上输入流的速度,可以避免它因内存使用而耗尽。可以在调用Queue(<size>)时指定大小。如果队列是该大小,则对.put的调用将阻塞,直到队列未满为止。

import time
import multiprocessing
import cv2

class ImageProcessor(multiprocessing.Process):

    def __init__(self, tasks_q, results_q):
        multiprocessing.Process.__init__(self)
        self.tasks_q = tasks_q
        self.results_q = results_q

    def run(self):
        while True:
            image = self.tasks_q.get()
            # Do computations on image
            time.sleep(1)
            # Display the result on stream
            self.results_q.put("text")

# Tasks queue with size 1 - only want one image queued
# for processing.
# Queue size should therefore match number of processes
tasks_q, results_q = multiprocessing.Queue(1), multiprocessing.Queue()
processor = ImageProcessor(tasks_q, results_q)
processor.start()

def capture_display_video(vc):
    rval, frame = vc.read()
    while rval:
        image = frame.get_image()
        if not tasks_q.full():
            tasks_q.put(image)
        if not results_q.empty():
            text = results_q.get()
            cv2.putText(frame, text)
        cv2.imshow("preview", frame)
        rval, frame = vc.read()

cv2.namedWindow("preview")
vc = cv2.VideoCapture(0)
if not vc.isOpened():
    raise Exception("Cannot capture video")

capture_display_video(vc)
processor.terminate()

10-07 13:06
查看更多