1、subprocess调用系统的命令
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""List a path in long format by running ``ls`` as a child process.

The path to list is taken from the first command-line argument.
"""
import subprocess
import sys

# Build the argument vector first; no shell is involved, so each list item
# is passed to ``ls`` as-is.
target_path = sys.argv[1]
ls_command = ['ls', target_path, '-l']

# No stream is redirected, so the listing is written by ``ls`` itself;
# only the exit status is reported here.
finished = subprocess.run(ls_command)
print('运行结果', finished.returncode)
运行效果
[root@ mnt]# python3 subprocess_os_system.py /mnt/ total 4 -rw-r--r-- 1 root root 172 Dec 2 21:38 subprocess_os_system.py 运行结果 0
2、subprocess利用shell进程运行命令
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Run a command line through the shell so that ``$HOME`` gets expanded."""
import subprocess

# With shell=True the whole string is handed to the shell, which performs
# the variable expansion before ``echo`` runs.
shell_command = 'echo $HOME'
result = subprocess.run(shell_command, shell=True)
print('执行返回码:', result.returncode)
运行效果
[root@ mnt]# python3 subprocess_shell_variables.py /root 执行返回码: 0
3、subprocess错误的处理
(1)、check=True,会检查退出码,如果返回非0,则抛出subprocess.CalledProcessError异常
(2)、check=False,不会检查退出码
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Show how ``check=True`` turns a non-zero exit status into an exception."""
import subprocess

# ``false`` always exits with a non-zero status.
failing_command = ['false']

try:
    subprocess.run(failing_command, check=True)
except subprocess.CalledProcessError as exc:
    # Raised by run() because check=True and the exit status was non-zero.
    print('运行错误:', exc)
运行效果
[root@ mnt]# python3 subprocess_run_check.py 运行错误: Command '['false']' returned non-zero exit status 1.
4、subprocess管道显示运行结果
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Capture the output of ``ls <path> -l`` through a pipe and print it.

The path to list is taken from the first command-line argument.
"""
import subprocess
import sys

ls_command = ['ls', sys.argv[1], '-l']

# stdout=PIPE makes run() collect the child's standard output as bytes
# instead of letting it reach the terminal directly.
listing = subprocess.run(ls_command, stdout=subprocess.PIPE)

print('completed.returncode: ', listing.returncode)
print('completed.stdout: \n', listing.stdout.decode('utf-8'))
运行效果
[root@ mnt]# python3 subprocess_run_output.py /mnt/ completed.returncode: 0 completed.stdout: total 4 -rw-r--r-- 1 root root 269 Dec 2 22:15 subprocess_run_output.py
5、 subprocess管道显示以及异常的捕捉
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Capture stdout of a failing shell command and trap the resulting error."""
import subprocess

# Writes one line to each stream, then exits with status 1.
shell_script = 'echo to stdout;echo to stderr 1>&2;exit 1'

try:
    # Only stdout is piped; stderr is not redirected.
    result = subprocess.run(
        shell_script,
        shell=True,
        check=True,
        stdout=subprocess.PIPE,
    )
except subprocess.CalledProcessError as exc:
    # check=True converts the non-zero exit status into this exception.
    print('执行错误: ', exc)
else:
    print('completed.returncode: ', result.returncode)
    print('completed.stdout: ', result.stdout.decode('utf-8'))
运行效果
[root@ mnt]# python3 subprocess_run_output_error.py to stderr 执行错误: Command 'echo to stdout;echo to stderr 1>&2;exit 1' returned non-zero exit status 1.
6、subprocess管道显示和管道错误信息的显示
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Capture both stdout and stderr of a failing shell command."""
import subprocess

# Writes one line to each stream, then exits with status 1.
shell_script = 'echo to stdout;echo to stderr 1>&2;exit 1'

try:
    result = subprocess.run(
        shell_script,
        shell=True,
        # With check=False run() does not raise CalledProcessError; the
        # exit status is simply reported through returncode.
        check=False,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
except subprocess.CalledProcessError as exc:
    print('执行错误: ', exc)
else:
    print('completed.returncode: ', result.returncode)
    print('completed.stdout: ', result.stdout.decode('utf-8'))
    print('completed.stderr: ', result.stderr.decode('utf-8'))
运行效果
[root@ mnt]# python3 subprocess_run_output_error_trap.py completed.returncode: 1 completed.stdout: to stdout completed.stderr: to stderr
7、subprocess之check_output函数,实现错误信息与命令执行结果一起输出显示
"""Merge stderr into stdout and capture both with ``check_output()``."""
import subprocess

try:
    # check_output() returns the captured output itself (bytes), not a
    # CompletedProcess, so there is no .returncode or .stdout attribute.
    # stderr=STDOUT folds the error stream into the captured output.
    output = subprocess.check_output(
        'echo to stdout; echo to stderr 1>&2;',
        shell=True,
        stderr=subprocess.STDOUT,
    )
except subprocess.CalledProcessError as err:
    # Raised when the command exits with a non-zero status.
    print('执行错误: ', err)
else:
    print('completed: ', output)
    print('completed.stdout: ', str(output, encoding='utf-8'))
运行效果
[root@ mnt]# python3 subprocess_run_output_error_trap.py completed: b'to stdout\nto stderr\n' completed.stdout: to stdout to stderr
8、subprocess抑制输出,相当于把标准输出或标准错误写入/dev/null
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Discard a command's stdout and stderr by redirecting them to DEVNULL."""
import subprocess

shell_script = 'echo to stdout; echo to stderr 1>&2;'

try:
    result = subprocess.run(
        shell_script,
        shell=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
except subprocess.CalledProcessError as exc:
    print('执行错误: ', exc)
else:
    # Nothing was captured, so both stream attributes are printed as-is.
    print('completed.returncode: ', result.returncode)
    print('completed.stdout: ', result.stdout)
    print('completed.stderr: ', result.stderr)
运行效果
[root@ mnt]# python3 subprocess_run_output_error_suppress.py completed.returncode: 0 completed.stdout: None completed.stderr: None
8、subprocess直接使用管道处理读取
由于call()、run()、check_output()都是对Popen类的封装(wrapper,并非装饰器),直接使用Popen可以更灵活地控制命令的标准输入、标准输出和标准错误
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Read a child process's standard output through a pipe using Popen."""
import subprocess

child = subprocess.Popen(['echo', 'Popen 输出'], stdout=subprocess.PIPE)

# communicate() returns (stdout, stderr); only stdout was piped here.
captured, _ = child.communicate()
stdout_text = captured.decode('utf-8')

# repr() makes the trailing newline written by ``echo`` visible.
print('stdout:', repr(stdout_text))
运行效果
[root@ mnt]# python3 subprocess_popen_read.py stdout: 'Popen 输出\n'
9、subprocess直接使用管道处理写数据
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Feed data to a child process's standard input through a pipe."""
import subprocess

print('write:')

# ``cat -`` copies its standard input to its standard output, which is
# not redirected here.
child = subprocess.Popen(['cat', '-'], stdin=subprocess.PIPE)

# The pipe carries bytes, so the text is encoded before being sent.
payload = '我写入数据\n'.encode('utf-8')
child.communicate(payload)
运行效果
[root@ mnt]# python3 subprocess_popen_write.py write: 我写入数据
10、 Popen进程双向通道,实现同时完成标准的读写
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Write to and read from the same child process over two pipes."""
import subprocess

# ``cat -`` echoes whatever arrives on stdin back on stdout.
child = subprocess.Popen(
    ['cat', '-'],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
)

payload = '我是标准输入的内容 stdin'.encode('utf-8')

# communicate() sends the payload and returns (stdout, stderr).
echoed, _ = child.communicate(payload)
print('stdout_value: ', echoed.decode('utf-8'))
运行效果
[root@ mnt]# python3 subprocess_popen2.py
stdout_value: 我是标准输入的内容 stdin
11、Popen捕获错误输出,实现标准错误信息的显示
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Capture stdout and stderr of a child process on separate pipes."""
import subprocess

# Echo stdin back on stdout, then write one line to stderr.
shell_script = 'cat -;echo "to stderr" 1>&2'

child = subprocess.Popen(
    shell_script,
    shell=True,
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

payload = '我是标准输入的内容 stdin'.encode('utf-8')
out_bytes, err_bytes = child.communicate(payload)

print('stdout_value: ', repr(out_bytes.decode('utf-8')))
print('stderr_value: ', repr(err_bytes.decode('utf-8')))
运行效果
[root@ mnt]# python3 subprocess_popen3.py stdout_value: '我是标准输入的内容 stdin' stderr_value: 'to stderr\n'
12、Popen把标准错误切换标准输出显示
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Redirect a child's stderr into its stdout pipe and read both together."""
import subprocess

# Echo stdin back on stdout, then write one line to stderr.
shell_script = 'cat -;echo "to stderr" 1>&2'

child = subprocess.Popen(
    shell_script,
    shell=True,
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    # STDOUT sends the error stream into the same pipe as stdout.
    stderr=subprocess.STDOUT,
)

payload = '我是标准输入的内容 stdin'.encode('utf-8')
combined, err_value = child.communicate(payload)

print('stdout_value: ', repr(combined.decode('utf-8')))
# No separate stderr pipe exists, so this value is printed undecoded.
print('stderr_value: ', repr(err_value))
运行效果
[root@ mnt]# python3 subprocess_popen4.py stdout_value: '我是标准输入的内容 stdinto stderr\n' stderr_value: None
13、Popen管道之间的连接,相当于Linux命令管道传送
如 linux命令
[root@ mnt]# cat cz.txt | grep 1400268947 | head -n 5 1400268947 1427 1400268947 613 1400268947 493 1400268947 18176 1400268947 5208
用python实现如上命令的方法
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Chain three processes like ``cat /mnt/cz.txt | grep 1400268947 | head -n 5``."""
import subprocess

cat = subprocess.Popen(
    ['cat', '/mnt/cz.txt'],
    stdout=subprocess.PIPE,
)

grep = subprocess.Popen(
    ['grep', '1400268947'],
    stdin=cat.stdout,
    stdout=subprocess.PIPE,
)
# Drop the parent's copy of the read end so that ``cat`` receives SIGPIPE
# if ``grep`` exits first (recommended by the subprocess documentation).
cat.stdout.close()

head = subprocess.Popen(
    ['head', '-n', '5'],
    stdin=grep.stdout,
    stdout=subprocess.PIPE,
)
# Same reason: ``head`` stops after five lines, and ``grep`` must be able
# to notice that its reader is gone.
grep.stdout.close()

# Only the last stage's output is read by the parent.
result_stdout = head.communicate()[0].decode('utf-8')
print(result_stdout)
运行效果
[root@ mnt]# python3 subprocess_pipes.py 1400268947 1427 1400268947 613 1400268947 493 1400268947 18176 1400268947 5208
14、Popen命令之间的交互
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Relay program: copy stdin to stdout line by line.

Start and exit notices are written to stderr so they do not mix with the
relayed data on stdout.
"""
import sys

log = sys.stderr

log.write('repeater.py: starting\n')
log.flush()

# Read until readline() returns an empty string (end of input); every
# relayed line is flushed immediately so the reader sees it at once.
line = sys.stdin.readline()
log.flush()
while line:
    sys.stdout.write(line)
    sys.stdout.flush()
    line = sys.stdin.readline()
    log.flush()

log.write('repeater.py: exiting\n')
log.flush()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Interactive driver for repeater.py.

First exchanges data with the relay one line at a time, then sends all
lines up front and reads the whole reply at once.
"""
import io
import subprocess

REPEATER_COMMAND = 'python3 /mnt/repeater.py'

print('#####一次一行输出#####')
# Start the relay with both its input and its output connected to pipes.
child = subprocess.Popen(
    REPEATER_COMMAND,
    shell=True,
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
)
# Text wrapper for the child's input; line_buffering sends data on newline.
to_child = io.TextIOWrapper(
    child.stdin,
    encoding='utf-8',
    line_buffering=True,
)
# Text wrapper for the child's output.
from_child = io.TextIOWrapper(
    child.stdout,
    encoding='utf-8',
)

for number in range(5):
    to_child.write('%d\n' % number)   # send one line
    echoed = from_child.readline()    # read the line relayed back
    print(echoed.rstrip())            # drop trailing whitespace/newline

# Collect whatever is still left in the pipe.
leftover, _ = child.communicate()
print(leftover.decode('utf-8'))

print('#####一次全部输出#####')
child = subprocess.Popen(
    REPEATER_COMMAND,
    shell=True,
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
)
# No line buffering this time: data is pushed by the explicit flush below.
to_child = io.TextIOWrapper(
    child.stdin,
    encoding='utf-8',
)

for number in range(5):
    to_child.write('%d\n' % number)
to_child.flush()

# Read the complete reply in a single call.
everything, _ = child.communicate()
print(everything.decode('utf-8'))
运行效果
[root@ mnt]# python3 repeater.py #开启中继程序监听数据收和发 repeater.py: starting [root@ mnt]# python3 interaction.py #####一次一行输出##### repeater.py: starting 0 1 2 3 4 repeater.py: exiting #####一次全部输出##### repeater.py: starting repeater.py: exiting 0 1 2 3 4
15、利用signal模块实现进程间信号的传递,即由父进程管理子进程:父进程使用os.kill向尚未运行结束的子进程发送信号(本例发送SIGUSR1,由子进程注册的处理函数接收)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Child process: install a SIGUSR1 handler, then wait for the signal."""
import os
import signal
import time
import sys

pid = os.getpid()
received = False

# Common prefix for every line this process prints.
_PREFIX = 'CHILD {:>6}: '.format(pid)


def signal_usr1(signum, frame):
    """Signal handler: remember that the signal arrived and report it."""
    global received
    received = True
    print(_PREFIX + 'Received USR1', flush=True)


print(_PREFIX + '设置信号处理程序', flush=True)
signal.signal(signal.SIGUSR1, signal_usr1)

print(_PREFIX + '暂停等待信号', flush=True)
time.sleep(5)

# The handler flips ``received``; if it never ran, say so.
if not received:
    print(_PREFIX + '从未接收到信号')
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Parent process: start signal_child.py and send it SIGUSR1."""
import os
import signal
import subprocess
import time
import sys

child = subprocess.Popen(['python3', 'signal_child.py'])

print('PARENT      : Pausing before sending signal...', flush=True)
# Give the child time to install its signal handler.
time.sleep(1)

print('PARENT      : Signaling child', flush=True)
os.kill(child.pid, signal.SIGUSR1)
运行结果
[root@ mnt]# python3 signal_parent.py PARENT : Pausing before sending signal... CHILD 9070: 设置信号处理程序 CHILD 9070: 暂停等待信号 PARENT : Signaling child CHILD 9070: Received USR1
16、进程组和会话
问题:如果Popen启动的是一个shell,再由shell去启动真正的程序,那么父进程按proc.pid发送的信号只会送到shell进程,shell创建的子进程收不到该信号,问题代码如下:
"""Send SIGUSR1 to a shell that in turn runs signal_child.py.

The signal is addressed to the pid of the shell started by Popen.
"""
import os
import signal
import subprocess
import tempfile
import time
import sys

script = '''#!/bin/sh
echo "Shell script in process $$"
set -x
python3 /mnt/signal_child.py
'''

# Write the shell script to a named temporary file so ``sh`` can run it.
script_file = tempfile.NamedTemporaryFile(mode='wt')
script_file.write(script)
script_file.flush()

# script_file.name is the temporary file's path, e.g. /tmp/tmprizk8l2p
shell_proc = subprocess.Popen(['sh', script_file.name])
shell_pid = shell_proc.pid

print('PARENT      : Pausing before signaling {}...'.format(shell_pid), flush=True)
time.sleep(1)

print('PARENT      : Signaling child {}'.format(shell_pid), flush=True)
os.kill(shell_pid, signal.SIGUSR1)

# Keep the parent alive while the child finishes its own wait.
time.sleep(3)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Child process: install a SIGUSR1 handler, then wait three seconds."""
import os
import signal
import time
import sys

pid = os.getpid()
received = False


def _say(text):
    """Print one status line tagged with this process's pid and flush it."""
    print('CHILD {:>6}: {}'.format(pid, text))
    sys.stdout.flush()


def signal_usr1(signum, frame):
    """Signal handler: remember that the signal arrived and report it."""
    global received
    received = True
    _say('Received USR1')


_say('设置信号处理程序')
signal.signal(signal.SIGUSR1, signal_usr1)

_say('暂停等待信号')
time.sleep(3)

# The handler flips ``received``; if it never ran, say so (not flushed
# explicitly, as this is the last line the script prints).
if not received:
    print('CHILD {:>6}: 从未接收到信号'.format(pid))
运行效果
[root@ mnt]# python3 subprocess_signal_parent_shell.py Shell script in process 9176 + python3 /mnt/signal_child.py PARENT : Pausing before signaling 9176... CHILD 9177: 设置信号处理程序 CHILD 9177: 暂停等待信号 PARENT : Signaling child 9176 CHILD 9177: 从未接收到信号
#以上由Popen创建了子进程shell,shell解释器又创建进程调用signal_child.py程序,所以产生进程id不一样,导致进程之间无法相互发信号通讯。
利用进程组之间的通信,解决进程之间pid识别不到,导致信号传输失败的问题
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Signal a whole process group so the shell's child also gets SIGUSR1."""
import os
import signal
import subprocess
import tempfile
import time
import sys


def show_setting_prgrp():
    """Call os.setpgrp() and report the process group before and after.

    Passed to Popen as ``preexec_fn``.
    """
    caller_pid = os.getpid()
    print('Calling os.setpgrp() from {}'.format(caller_pid))
    os.setpgrp()
    new_group = os.getpgrp()
    print('Process group is now {}'.format(new_group), flush=True)


script = '''#!/bin/sh
echo "Shell script in process $$"
set -x
python3 /mnt/signal_child.py
'''

# Write the shell script to a named temporary file so ``sh`` can run it.
script_file = tempfile.NamedTemporaryFile(mode='wt')
script_file.write(script)
script_file.flush()

shell_proc = subprocess.Popen(
    ['sh', script_file.name],
    preexec_fn=show_setting_prgrp,
)
group_id = shell_proc.pid

print('PARENT      : Pausing before signaling {}...'.format(group_id), flush=True)
time.sleep(1)

print('PARENT      : Signaling process group {}'.format(group_id), flush=True)
# killpg() addresses the process group rather than a single pid.
os.killpg(group_id, signal.SIGUSR1)

# Keep the parent alive while the child finishes its own wait.
time.sleep(3)
运行效果
[root@ mnt]# python3 subprocess_signal_setpgrp.py Calling os.setpgrp() from 9553 Process group is now 9553 Shell script in process 9553 + python3 /mnt/signal_child.py PARENT : Pausing before signaling 9553... CHILD 9554: 设置信号处理程序 CHILD 9554: 暂停等待信号 PARENT : Signaling process group 9553 CHILD 9554: Received USR1