我正在OSX上读取一个网络摄像头,该摄像头可以通过以下简单脚本正常运行:
import cv2
camera = cv2.VideoCapture(0)
while True:
try:
(grabbed, frame) = camera.read() # grab the current frame
frame = cv2.resize(frame, (640, 480)) # resize the frame
cv2.imshow("Frame", frame) # show the frame to our screen
cv2.waitKey(1) # Display it at least one ms before going to the next frame
except KeyboardInterrupt:
# cleanup the camera and close any open windows
camera.release()
cv2.destroyAllWindows()
print "\n\nBye bye\n"
break
现在,我想在一个单独的过程中读取视频,为此我拥有更长的脚本,并且可以在Linux上的一个单独过程中正确读取视频:
import numpy as np
import time
import ctypes
import argparse
from multiprocessing import Array, Value, Process
import cv2
class VideoCapture:
"""
Class that handles video capture from device or video file
"""
def __init__(self, device=0, delay=0.):
"""
:param device: device index or video filename
:param delay: delay between frame captures in seconds(floating point is allowed)
"""
self._cap = cv2.VideoCapture(device)
self._delay = delay
def _proper_frame(self, delay=None):
"""
:param delay: delay between frames capture(in seconds)
:param finished: synchronized wrapper for int(see multiprocessing.Value)
:return: frame
"""
snapshot = None
correct_img = False
fail_counter = -1
while not correct_img:
# Capture the frame
correct_img, snapshot = self._cap.read()
fail_counter += 1
# Raise exception if there's no output from the device
if fail_counter > 10:
raise Exception("Capture: exceeded number of tries to capture the frame.")
# Delay before we get a new frame
time.sleep(delay)
return snapshot
def get_size(self):
"""
:return: size of the captured image
"""
return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))),
int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3)
def get_stream_function(self):
"""
Returns stream_function object function
"""
def stream_function(image, finished):
"""
Function keeps capturing frames until finished = 1
:param image: shared numpy array for multiprocessing(see multiprocessing.Array)
:param finished: synchronized wrapper for int(see multiprocessing.Value)
:return: nothing
"""
# Incorrect input array
if image.shape != self.get_size():
raise Exception("Capture: improper size of the input image")
print("Capture: start streaming")
# Capture frame until we get finished flag set to True
while not finished.value:
image[:, :, :] = self._proper_frame(self._delay)
# Release the device
self.release()
return stream_function
def release(self):
self._cap.release()
def main():
# Add program arguments
parser = argparse.ArgumentParser(description='Captures the video from the webcamera and \nwrites it into the output file with predefined fps.', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('-output', dest="output", default="output.avi", help='name of the output video file')
parser.add_argument('-log', dest="log", default="frames.log", help='name of the log file')
parser.add_argument('-fps', dest="fps", default=25., help='frames per second value')
# Read the arguments if any
result = parser.parse_args()
fps = float(result.fps)
output = result.output
log = result.log
# Initialize VideoCapture object and auxilary objects
cap = VideoCapture()
shape = cap.get_size()
stream = cap.get_stream_function()
# Define shared variables(which are synchronised so race condition is excluded)
shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2])
frame = np.ctypeslib.as_array(shared_array_base.get_obj())
frame = frame.reshape(shape[0], shape[1], shape[2])
finished = Value('i', 0)
# Start processes which run in parallel
video_process = Process(target=stream, args=(frame, finished))
video_process.start() # Launch capture process
# Sleep for some time to allow videocapture start working first
time.sleep(2)
# Termination function
def terminate():
print("Main: termination")
finished.value = True
# Wait for all processes to finish
time.sleep(1)
# Terminate working processes
video_process.terminate()
# The capturing works until keyboard interrupt is pressed.
while True:
try:
# Display the resulting frame
cv2.imshow('frame', frame)
cv2.waitKey(1) # Display it at least one ms before going to the next frame
time.sleep(0.1)
except KeyboardInterrupt:
cv2.destroyAllWindows()
terminate()
break
if __name__ == '__main__':
main()
这在Linux上可以正常工作,但是在OSX上却遇到了麻烦,因为它似乎无法在创建的
.read()
对象(存储在var cv2.VideoCapture(device)
中)上执行self._cap
。经过一番搜索,我发现this SO answer,建议使用Billiard,它是python多处理的替代品,据说它有一些非常有用的改进。因此,在文件的顶部,我只是在先前的多处理导入之后添加了导入(有效地覆盖了
multiprocessing.Process
):from billiard import Process, forking_enable
在
video_process
变量实例化之前,我使用forking_enable
,如下所示:forking_enable(0) # Supposedly this is all I need for billiard to do it's magic
video_process = Process(target=stream, args=(frame, finished))
因此,在这个版本(here on pastebin)中,我再次运行该文件,这给了我这个错误:
搜索该错误导致我找到an SO question with a long list of answers,其中一个建议是使用dill serialization lib解决此问题。但是,该lib应该与Pathos multiprocessing fork一起使用。所以我只是尝试从更改我的多处理导入行
from multiprocessing import Array, Value, Process
至
from pathos.multiprocessing import Array, Value, Process
但是
Array
包中似乎不存在Value
,Process
和pathos.multiprocessing
。从这一点上我完全迷失了。我正在搜索我几乎不了解的东西,甚至不知道该朝哪个方向搜索或调试。
那么,有比我更聪明的人可以帮助我在单独的过程中捕获视频吗?欢迎所有提示!
最佳答案
您的第一个问题是您无法通过forked
进程访问网络摄像头。当外部库与fork
一起使用时会出现几个问题,因为fork操作不能清除父进程打开的所有文件描述符,从而导致奇怪的行为。该库通常对于Linux上的此类问题更健壮,但是在两个进程之间共享IO对象(例如cv2.VideoCapture
)不是一个好主意。
当您使用billard.forking_enabled
并将其设置为False
时,您要求库不要使用fork
来生成新进程,而是使用spawn
或forkserver
方法来生成新进程,因为它们关闭了所有文件描述符,但它们启动起来也较慢,因此更干净,这不应该是在您的情况下。如果您使用的是python3.4+
,则可以使用multiprocessing.set_start_method('forkserver')
进行此操作。
当您使用这些方法之一时,目标函数和参数需要序列化以传递给子进程。序列化默认情况下使用pickle
完成,正如您提到的那样,它具有多个流程,无法序列化本地定义的对象以及cv2.VideoCapture
。但是您可以简化程序,使Process
的所有参数都可腌制。这是解决您的问题的一种尝试:
import numpy as np
import time
import ctypes
from multiprocessing import set_start_method
from multiprocessing import Process, Array, Value
import cv2
class VideoCapture:
"""
Class that handles video capture from device or video file
"""
def __init__(self, device=0, delay=0.):
"""
:param device: device index or video filename
:param delay: delay between frame captures in seconds(float allowed)
"""
self._delay = delay
self._device = device
self._cap = cv2.VideoCapture(device)
assert self._cap.isOpened()
def __getstate__(self):
self._cap.release()
return (self._delay, self._device)
def __setstate__(self, state):
self._delay, self._device = state
self._cap = cv2.VideoCapture(self._device)
assert self._cap.grab(), "The child could not grab the video capture"
def _proper_frame(self, delay=None):
"""
:param delay: delay between frames capture(in seconds)
:param finished: synchronized wrapper for int
:return: frame
"""
snapshot = None
correct_img = False
fail_counter = -1
while not correct_img:
# Capture the frame
correct_img, snapshot = self._cap.read()
fail_counter += 1
# Raise exception if there's no output from the device
if fail_counter > 10:
raise Exception("Capture: exceeded number of tries to capture "
"the frame.")
# Delay before we get a new frame
time.sleep(delay)
return snapshot
def get_size(self):
"""
:return: size of the captured image
"""
return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))),
int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3)
def release(self):
self._cap.release()
def stream(capturer, image, finished):
"""
Function keeps capturing frames until finished = 1
:param image: shared numpy array for multiprocessing
:param finished: synchronized wrapper for int
:return: nothing
"""
shape = capturer.get_size()
# Define shared variables
frame = np.ctypeslib.as_array(image.get_obj())
frame = frame.reshape(shape[0], shape[1], shape[2])
# Incorrect input array
if frame.shape != capturer.get_size():
raise Exception("Capture: improper size of the input image")
print("Capture: start streaming")
# Capture frame until we get finished flag set to True
while not finished.value:
frame[:, :, :] = capturer._proper_frame(capturer._delay)
# Release the device
capturer.release()
def main():
# Initialize VideoCapture object and auxilary objects
cap = VideoCapture()
shape = cap.get_size()
# Define shared variables
shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2])
frame = np.ctypeslib.as_array(shared_array_base.get_obj())
frame = frame.reshape(shape[0], shape[1], shape[2])
finished = Value('i', 0)
# Start processes which run in parallel
video_process = Process(target=stream,
args=(cap, shared_array_base, finished))
video_process.start() # Launch capture process
# Sleep for some time to allow videocapture start working first
time.sleep(2)
# Termination function
def terminate():
print("Main: termination")
finished.value = True
# Wait for all processes to finish
time.sleep(1)
# Terminate working processes
video_process.join()
# The capturing works until keyboard interrupt is pressed.
while True:
try:
# Display the resulting frame
cv2.imshow('frame', frame)
# Display it at least one ms before going to the next frame
time.sleep(0.1)
cv2.waitKey(1)
except KeyboardInterrupt:
cv2.destroyAllWindows()
terminate()
break
if __name__ == '__main__':
set_start_method("spawn")
main()
我目前无法在Mac上对其进行测试,因此它可能无法立即使用,但不应出现与
multiprocessing
相关的错误。一些注意事项:cv2.VideoCapture
对象,并抓取相机,因为仅应从相机读取一个进程。 fork
的程序中的问题仅是由于共享的cv2.VideoCapture
所致,在stream
函数中重新创建它可以解决您的问题。 mp.Array
缓冲区(这确实很奇怪,花了我一段时间才弄清楚)。您需要显式传递Array
并重新创建包装器。 fork
的程序中的问题仅是由于共享的cv2.VideoCapture
所致,在stream
函数中重新创建它可以解决您的问题。 python3.4+
中运行代码,所以我没有使用billard
,但使用forking_enabled(False)
而不是set_start_method
应该有点类似。 让我知道这项工作!
关于python - 如何在OSX上的单独过程中读取网络摄像头?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/43572744/