编辑:
我对视频流是什么有疑问,所以我将提供更多的清晰度。该流是来自我的网络摄像头的实时视频供稿,可通过OpenCV访问。当相机读取图像时,会得到每个帧,并将其发送到单独的进程进行处理。该过程基于对图像进行的计算返回文本。然后将文本显示在图像上。我需要实时显示视频流,如果文本和所显示的视频之间存在滞后就可以了(即,如果文本适用于上一帧,那就可以了)。
也许更容易想到的是我正在对摄像头看到的图像进行识别。我一次将一帧发送到一个单独的过程中,以对该帧进行识别分析,然后将文本发送回以作为字幕添加到实时供稿中。显然,处理比从网络摄像头抓取帧并显示它们要花费更多的时间,因此,如果字幕说明和网络摄像头提要显示的内容有所延迟,这是可以接受的,也是可以预期的。
现在发生的是,由于其他进程,我正在显示的实时视频出现滞后(当我不将帧发送到进程进行计算时,就没有滞后)。我还确保一次只排队一帧,这样可以避免队列重载并导致延迟。我已经更新了下面的代码以反射(reflect)此细节。
我在python中使用多处理模块来帮助加快主程序的速度。但是我相信我可能做错了什么,因为我认为计算不是并行进行的。
我希望程序在主进程中从视频流中读取图像,然后将帧传递给两个子进程,对它们进行计算,然后将文本(包含计算结果)发送回主进程。
但是,当我使用多重处理时,主要流程似乎滞后了,运行速度几乎是没有多重处理时的一半,这使我相信这些流程不会完全并行运行。
经过一些研究,我推测延迟可能是由于使用队列在进程之间进行通信(将图像从主对象传递到子对象,以及将文本从子对象传递到主对象)造成的。
但是,我注释掉了计算步骤,只是让主进程传递了一个图像,而子进程返回了空白文本,在这种情况下,主进程根本没有变慢。它全速运行。
因此,我相信
1)我不是最优地使用多处理
或者
2)这些进程不能真正并行运行(我会理解有些滞后,但是这会使主进程减慢一半)。
这是我的代码的概述。只有一个消费者而不是2个消费者,但是两个消费者几乎是相同的。如果有人可以提供指导,我将不胜感激。
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
#other initialization stuff
def run(self):
while True:
image = self.task_queue.get()
#Do computations on image
self.result_queue.put("text")
return
import cv2
tasks = multiprocessing.Queue()
results = multiprocessing.Queue()
consumer = Consumer(tasks,results)
consumer.start()
#Creating window and starting video capturer from camera
cv2.namedWindow("preview")
vc = cv2.VideoCapture(0)
#Try to get the first frame
if vc.isOpened():
rval, frame = vc.read()
else:
rval = False
while rval:
if tasks.empty():
tasks.put(image)
else:
text = tasks.get()
#Add text to frame
cv2.putText(frame,text)
#Showing the frame with all the applied modifications
cv2.imshow("preview", frame)
#Getting next frame from camera
rval, frame = vc.read()
最佳答案
在上面的生产者/消费者实现中,将任务放入消费者要执行的队列的生产者需要与主过程/控制过程分开,以便它可以与主过程并行添加任务处理从结果队列读取输出。
尝试以下方法。在使用者进程中增加了 sleep 以模拟处理,并添加了第二个使用者以表明它们正在并行运行。
限制任务队列的大小也是一个好主意,如果处理无法跟上输入流的速度,可以避免它因内存使用而耗尽。可以在调用Queue(<size>)
时指定大小。如果队列是该大小,则对.put
的调用将阻塞,直到队列未满为止。
import time
import multiprocessing
import cv2
class ImageProcessor(multiprocessing.Process):
def __init__(self, tasks_q, results_q):
multiprocessing.Process.__init__(self)
self.tasks_q = tasks_q
self.results_q = results_q
def run(self):
while True:
image = self.tasks_q.get()
# Do computations on image
time.sleep(1)
# Display the result on stream
self.results_q.put("text")
# Tasks queue with size 1 - only want one image queued
# for processing.
# Queue size should therefore match number of processes
tasks_q, results_q = multiprocessing.Queue(1), multiprocessing.Queue()
processor = ImageProcessor(tasks_q, results_q)
processor.start()
def capture_display_video(vc):
rval, frame = vc.read()
while rval:
image = frame.get_image()
if not tasks_q.full():
tasks_q.put(image)
if not results_q.empty():
text = results_q.get()
cv2.putText(frame, text)
cv2.imshow("preview", frame)
rval, frame = vc.read()
cv2.namedWindow("preview")
vc = cv2.VideoCapture(0)
if not vc.isOpened():
raise Exception("Cannot capture video")
capture_display_video(vc)
processor.terminate()