我正在用python测试子流程管道。我知道我可以直接在python中执行下面的程序,但这不是重点。我只想测试管道,以便知道如何使用它。

我的系统是带有默认python 2.6的Linux Ubuntu 9.04。

我从documentation example开始。

from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]
print output

那行得通,但是因为p1stdin没有被重定向,所以我必须在终端中输入内容来填充管道。当我键入^D关闭stdin时,我得到了想要的输出。

但是,我想使用python字符串变量将数据发送到管道。首先,我尝试在stdin上编写:
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.stdin.write('test\n')
output = p2.communicate()[0] # blocks forever here

没用我尝试在最后一行使用p2.stdout.read()代替,但它也会阻塞。我添加了p1.stdin.flush()p1.stdin.close(),但是它也不起作用。我然后我开始交流:
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.communicate('test\n') # blocks forever here
output = p2.communicate()[0]

所以还不是。

我注意到运行单个进程(如上面的p1,删除p2)非常有效。并且将文件句柄传递给p1(stdin=open(...))也可以。所以问题是:

是否可以在不阻塞的情况下将数据传递给python中2个或更多子进程的管道?为什么不?

我知道我可以运行一个Shell并在Shell中运行管道,但这不是我想要的。

更新1 :按照下面的Aaron Digulla的提示,我现在尝试使用线程来使其工作。

首先,我尝试在线程上运行p1.communicate。
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=p1.communicate, args=('some data\n',))
t.start()
output = p2.communicate()[0] # blocks forever here

好的,没用。尝试了其他组合,例如将其更改为.write()以及p2.read()。没有。现在让我们尝试相反的方法:
def get_output(subp):
    output = subp.communicate()[0] # blocks on thread
    print 'GOT:', output

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,))
t.start()
p1.communicate('data\n') # blocks here.
t.join()

代码最终在某处阻塞。在派生线程中,或在主线程中,或两者都存在。所以它没有用。如果您知道如何使它工作,那么可以提供工作代码会更容易。我在这里尝试。

更新2

Paul Du Bois在下方提供了一些信息,所以我做了更多测试。
我已经阅读了整个subprocess.py模块,并了解了它的工作原理。因此,我尝试将其确切地应用于代码。

我在Linux上,但是由于我正在使用线程进行测试,因此我的第一种方法是复制subprocess.pycommunicate()方法上看到的确切Windows线程代码,但是要复制两个进程而不是一个进程。这是我尝试过的全部 list :
import os
from subprocess import Popen, PIPE
import threading

def get_output(fobj, buffer):
    while True:
        chunk = fobj.read() # BLOCKS HERE
        if not chunk:
            break
        buffer.append(chunk)

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

b = [] # create a buffer
t = threading.Thread(target=get_output, args=(p2.stdout, b))
t.start() # start reading thread

for x in xrange(100000):
    p1.stdin.write('hello world\n') # write data
    p1.stdin.flush()
p1.stdin.close() # close input...
t.join()

出色地。没用即使在调用p1.stdin.close()之后,p2.stdout.read()仍然会阻塞。

然后我在subprocess.py上尝试了posix代码:
import os
from subprocess import Popen, PIPE
import select

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

numwrites = 100000
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer

while to_read or to_write:
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        data = os.read(p2.stdout.fileno(), 1024)
        if not data:
            p2.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        if numwrites > 0:
            numwrites -= 1
            p1.stdin.write('hello world!\n'); p1.stdin.flush()
        else:
            p1.stdin.close()
            to_write = []

print b

还会阻止select.select()。通过在周围散布print,我发现了这一点:
  • 正在读取。代码在执行期间被读取多次。
  • 写作也可以。数据被写入p1.stdin
  • numwrites的末尾,调用p1.stdin.close()
  • select()开始阻止时,只有to_read具有某些内容p2.stdoutto_write已经为空。
  • os.read()调用始终返回某些内容,因此永远不会调用p2.stdout.close()

  • 这两个测试的结论:关闭管道上第一个进程的stdin(在示例中为grep)并不会使它的缓冲输出转储到下一个并死亡。

    没有办法使其工作吗?

    PS:我不想使用临时文件,我已经对文件进行了测试,并且知道它可以工作。而且我不想使用Windows。

    最佳答案

    我知道了怎么做。

    它与线程无关,与select()也无关。

    当我运行第一个进程(grep)时,它将创建两个低级文件描述符,每个管道一个。让我们将它们称为ab

    当我运行第二个过程时,b被传递给cut sdtin。但是Popen-close_fds=False上有一个让人脑瘫的默认设置。

    这样做的结果是cut也继承了a。因此,即使我关闭grepa也不会死,因为stdin在cut的进程中仍然处于打开状态(cut忽略了它)。

    现在,以下代码可以完美运行。

    from subprocess import Popen, PIPE
    
    p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
    p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
    p1.stdin.write('Hello World\n')
    p1.stdin.close()
    result = p2.stdout.read()
    assert result == "Hello Worl\n"
    

    unix系统上的 close_fds=True应该默认为。在Windows上,它将关闭所有 fds的,因此可以防止管道传输。

    编辑:

    PS:对于有类似问题的人,请阅读以下答案:pooryorick在评论中说,如果写入p1.stdin的数据大于缓冲区,这也可能会阻塞。在这种情况下,您应该将数据分成较小的块,并使用select.select()知道何时进行读取/写入。问题中的代码应提示如何实现。

    EDIT2:找到了另一个解决方案,在pooryorick的更多帮助下-除了使用close_fds=True并关闭 ALL fds外,还可以在执行第二个过程时关闭属于第一个过程的fd,它将起作用。关闭操作必须在子进程中完成,因此Popen的preexec_fn函数非常有用。在执行p2时,您可以执行以下操作:
    p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=devnull, preexec_fn=p1.stdin.close)
    

    10-08 08:04