在python中长时间运行的子进程上写入标准输入并从标准输出读取

在python中长时间运行的子进程上写入标准输入并从标准输出读取

本文介绍了在python中长时间运行的子进程上写入标准输入并从标准输出读取的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个长时间运行的计算模型,我希望使用 STDINSTDOUT 控制、提供数据和从中读取数据.在这个外部代码中,有一个控制反馈循环,每 100 毫秒左右需要来自 STDIN 的新数据.

I have a long-running computational model that i wish to control, feed data to, and read data from using STDIN and STDOUT. Inside this external code, there's a control feedback loop that needs new data from STDIN every 100ms or so.

出于这个原因,subprocess.communicate() 不合适,因为它等待进程完成.该流程的预计运行时间约为数周.

for this reason, subprocess.communicate() is not appropriate, since it waits for the process to finish. The process' estimated runtime is on the order of several weeks.

下面的代码不起作用,因为控制挂在 stdout.read() 上并且永远不会回来.

The code below doesn't work because control hangs on stdout.read() and never comes back.

谈论 STDIN 和 STDOUT 的正确方法是什么?

What is the correct way to talk over STDIN and STDOUT ?

import subprocess as sb


class Foo:
    def process_output(self,values: str) -> ():
        """ gets 7 comma separated floats back from ADC.elf and returns them as a tuple of two vectors """
        floats = [float(f) for f in values.split(',') if values and f]
        # if len(floats) == 7:
        mag = (floats[0], floats[1], floats[2])
        quat = (floats[3], floats[4], floats[5], floats[6])
        return (mag, quat)

    def format_input(self,invals: {}) -> bytes:
        """ takes a dict of arrays and stuffs them into a comma-separated bytestring to send to ADC.elf with a trailing newline"""
        concat = lambda s, f: ''.join([f % x for x in s])
        retval = ''
        retval += concat(invals['mag_meas'], '%3.2f,')
        retval += concat(invals['euler_angle'], '%3.2f,')
        retval += concat(invals['sun_meas'], '%3.2f,')
        retval += concat(invals['epoch'], '%02.0f,')
        retval += concat(invals['lla'], '%3.2f,')
        retval += concat([invals['s_flag']], '%1.0f,')
        retval = retval[:-1]
        retval += '\n'
        return retval.encode('utf-8')

    def page(self,input: {}) -> ():
        """ send a bytestring with 19 floats to ADC.elf.  Process the returned 7 floats into a data struture"""
        formatted = self.format_input(input)
        self.pid.stdin.write(formatted)
        response = self.pid.stdout.read()

        return self.process_output(response.decode())

    def __init__(self):
        """ start the long-running process ADC.elf that waits for input and performs some scientific computation"""
        self.pid = sb.Popen(args=['./ADC.elf'], stdin=sb.PIPE, stdout=sb.PIPE, stderr=sb.PIPE)

    def exit(self):
        """ send SIGTERM to ADC.elf"""
        self.pid.terminate()



if __name__ == "__main__":
    # some dummy data
    testData = {'mag_meas': [1, 2, 3],
                'euler_angle': [4, 5, 6],
                'sun_meas': [7, 8, 9],
                'epoch': [0, 1, 2, 3, 4, 5],
                'lla': [6, 7, 8],
                's_flag': 9
                }
    #initialize
    runner = Foo()
    # send and receive once.
    result = runner.page(testData)
    print(result)
    #clean up
    runner.exit()

推荐答案

不知道如何直接用 subprocess 做到这一点,但 pexpect 做的完全正确:

No idea how to do this with subprocess directly, but pexpect did exactly the right thing:

import pexpect, os
from time import sleep

class Foo:
    def process_output(self,values: str) -> ():
        floats = [float(f) for f in values.split(',') if values and f]
        # if len(floats) == 7:
        mag = (floats[0], floats[1], floats[2])
        quat = (floats[3], floats[4], floats[5], floats[6])
        return (mag, quat)

    def format_input(self,invals: {}) -> bytes:
        concat = lambda s, f: ''.join([f % x for x in s])
        retval = ''
        retval += concat(invals['mag_meas'], '%3.2f,')
        retval += concat(invals['euler_angle'], '%3.2f,')
        retval += concat(invals['sun_meas'], '%3.2f,')
        retval += concat(invals['epoch'], '%02.0f,')
        retval += concat(invals['lla'], '%3.2f,')
        retval += concat([invals['s_flag']], '%1.0f,')
        retval = retval[:-1]
        retval += '\n'
        return retval.encode('utf-8')

    def page(self,input: {}) -> ():
        formatted = self.format_input(input)
        self.pid.write(formatted)
        response = self.pid.readline()

        return self.process_output(response.decode())

    def __init__(self):

        self.pid = pexpect.spawn('./ADC.elf')
        self.pid.setecho(False)

    def exit(self):
        self.pid.terminate()



if __name__ == "__main__":
    testData = {'mag_meas': [1, 2, 3],
                'euler_angle': [4, 5, 6],
                'sun_meas': [7, 8, 9],
                'epoch': [0, 1, 2, 3, 4, 5],
                'lla': [6, 7, 8],
                's_flag': 9
                }
    runner = Foo()
    i = 0
    while i < 100:
        result = runner.page(testData)
        print(result)
        i += 1
        sleep(.1)



    runner.exit()

这篇关于在python中长时间运行的子进程上写入标准输入并从标准输出读取的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-23 19:30