我正在用python测试子流程管道。我知道我可以直接在python中执行下面的程序,但这不是重点。我只想测试管道,以便知道如何使用它。
我的系统是带有默认python 2.6的Linux Ubuntu 9.04。
我从documentation example开始。
from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]
print output
那行得通,但是因为
p1
的stdin
没有被重定向,所以我必须在终端中输入内容来填充管道。当我键入^D
关闭stdin时,我得到了想要的输出。但是,我想使用python字符串变量将数据发送到管道。首先,我尝试在stdin上编写:
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.stdin.write('test\n')
output = p2.communicate()[0] # blocks forever here
没用我尝试在最后一行使用
p2.stdout.read()
代替,但它也会阻塞。我添加了p1.stdin.flush()
和p1.stdin.close()
,但是它也不起作用。我然后我开始交流:p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.communicate('test\n') # blocks forever here
output = p2.communicate()[0]
所以还不是。
我注意到运行单个进程(如上面的
p1
,删除p2
)非常有效。并且将文件句柄传递给p1
(stdin=open(...)
)也可以。所以问题是:是否可以在不阻塞的情况下将数据传递给python中2个或更多子进程的管道?为什么不?
我知道我可以运行一个Shell并在Shell中运行管道,但这不是我想要的。
更新1 :按照下面的Aaron Digulla的提示,我现在尝试使用线程来使其工作。
首先,我尝试在线程上运行p1.communicate。
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=p1.communicate, args=('some data\n',))
t.start()
output = p2.communicate()[0] # blocks forever here
好的,没用。尝试了其他组合,例如将其更改为
.write()
以及p2.read()
。没有。现在让我们尝试相反的方法:def get_output(subp):
output = subp.communicate()[0] # blocks on thread
print 'GOT:', output
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,))
t.start()
p1.communicate('data\n') # blocks here.
t.join()
代码最终在某处阻塞。在派生线程中,或在主线程中,或两者都存在。所以它没有用。如果您知道如何使它工作,那么可以提供工作代码会更容易。我在这里尝试。
更新2
Paul Du Bois在下方提供了一些信息,所以我做了更多测试。
我已经阅读了整个
subprocess.py
模块,并了解了它的工作原理。因此,我尝试将其确切地应用于代码。我在Linux上,但是由于我正在使用线程进行测试,因此我的第一种方法是复制
subprocess.py
的communicate()
方法上看到的确切Windows线程代码,但是要复制两个进程而不是一个进程。这是我尝试过的全部 list :import os
from subprocess import Popen, PIPE
import threading
def get_output(fobj, buffer):
while True:
chunk = fobj.read() # BLOCKS HERE
if not chunk:
break
buffer.append(chunk)
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
b = [] # create a buffer
t = threading.Thread(target=get_output, args=(p2.stdout, b))
t.start() # start reading thread
for x in xrange(100000):
p1.stdin.write('hello world\n') # write data
p1.stdin.flush()
p1.stdin.close() # close input...
t.join()
出色地。没用即使在调用
p1.stdin.close()
之后,p2.stdout.read()
仍然会阻塞。然后我在
subprocess.py
上尝试了posix代码:import os
from subprocess import Popen, PIPE
import select
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
numwrites = 100000
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
while to_read or to_write:
read_now, write_now, xlist = select.select(to_read, to_write, [])
if read_now:
data = os.read(p2.stdout.fileno(), 1024)
if not data:
p2.stdout.close()
to_read = []
else:
b.append(data)
if write_now:
if numwrites > 0:
numwrites -= 1
p1.stdin.write('hello world!\n'); p1.stdin.flush()
else:
p1.stdin.close()
to_write = []
print b
还会阻止
select.select()
。通过在周围散布print
,我发现了这一点:p1.stdin
。 numwrites
的末尾,调用p1.stdin.close()
。 select()
开始阻止时,只有to_read
具有某些内容p2.stdout
。 to_write
已经为空。 os.read()
调用始终返回某些内容,因此永远不会调用p2.stdout.close()
。 这两个测试的结论:关闭管道上第一个进程的
stdin
(在示例中为grep
)并不会使它的缓冲输出转储到下一个并死亡。没有办法使其工作吗?
PS:我不想使用临时文件,我已经对文件进行了测试,并且知道它可以工作。而且我不想使用Windows。
最佳答案
我知道了怎么做。
它与线程无关,与select()也无关。
当我运行第一个进程(grep
)时,它将创建两个低级文件描述符,每个管道一个。让我们将它们称为a
和b
。
当我运行第二个过程时,b
被传递给cut
sdtin
。但是Popen
-close_fds=False
上有一个让人脑瘫的默认设置。
这样做的结果是cut
也继承了a
。因此,即使我关闭grep
,a
也不会死,因为stdin在cut
的进程中仍然处于打开状态(cut
忽略了它)。
现在,以下代码可以完美运行。
from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n')
p1.stdin.close()
result = p2.stdout.read()
assert result == "Hello Worl\n"
unix系统上的
close_fds=True
应该默认为。在Windows上,它将关闭所有 fds的,因此可以防止管道传输。编辑:
PS:对于有类似问题的人,请阅读以下答案:pooryorick在评论中说,如果写入
p1.stdin
的数据大于缓冲区,这也可能会阻塞。在这种情况下,您应该将数据分成较小的块,并使用select.select()
知道何时进行读取/写入。问题中的代码应提示如何实现。EDIT2:找到了另一个解决方案,在pooryorick的更多帮助下-除了使用
close_fds=True
并关闭 ALL fds外,还可以在执行第二个过程时关闭属于第一个过程的fd
,它将起作用。关闭操作必须在子进程中完成,因此Popen的preexec_fn
函数非常有用。在执行p2时,您可以执行以下操作:p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=devnull, preexec_fn=p1.stdin.close)