asynchronous - Python: Feed and parse stream of data to and from external program with additional input and output files
The problem: I have a poorly designed Fortran program (I cannot change it, I'm stuck with it) which takes text input on stdin and from other input files, and writes text output results to stdout and to other output files. The size of the input and output is quite large, and I would like to avoid writing to the hard drive (slow operation). I have written a function that iterates over the lines of the several input files, and I also have parsers for the multiple outputs. I don't know whether the program first reads all its input and then starts to output, or starts outputting while it is still reading input.
the goal: have function feeds external program wants, , parses output comes program, without writing data text files on hard drive.
Research: the naive way, using files, is:
from subprocess import PIPE, Popen
import shlex

def execute_simple(cmd, stdin_iter, stdout_parser, input_files, output_files):
    for filename, file_iter in input_files.iteritems():
        with open(filename, 'w') as f:
            for line in file_iter:
                f.write(line + '\n')

    p_sub = Popen(
        shlex.split(cmd),
        stdin=PIPE,
        stdout=open('stdout.txt', 'w'),
        stderr=open('stderr.txt', 'w'),
        bufsize=1
    )
    for line in stdin_iter:
        p_sub.stdin.write(line + '\n')
    p_sub.stdin.close()
    p_sub.wait()

    data = {}
    for filename, parse_func in output_files.iteritems():  # stdout.txt and stderr.txt are included here
        with open(filename, 'r') as f:
            data[filename] = parse_func(iter(f.readline, ''))
    return data
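For reference, a hypothetical call to this naive version might look like the sketch below. The command, file names, iterator and parser functions are made up for illustration; note that this version parses stdout via the 'stdout.txt' entry in output_files rather than through the stdout_parser argument.

# Hypothetical usage sketch (names are illustrative, not from the real program)
def make_lines(n):
    for i in range(n):
        yield str(i)

def count_lines(iterator):
    # trivial "parser": just count the lines
    return sum(1 for _ in iterator)

results = execute_simple(
    'my_fortran_prog --flag',                 # assumed command line
    make_lines(1000),                         # iterator feeding stdin
    count_lines,                              # unused by this naive version
    input_files={'input1.txt': make_lines(500)},
    output_files={'stdout.txt': count_lines,
                  'output1.txt': count_lines},
)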
I have tried to put this together with the subprocess module to execute the external program, with the additional input/output files handled by named pipes and multiprocessing. I want to feed stdin from an iterator (which returns the input lines), save stderr in a list, and parse stdout as it comes from the external program. The input and output can be quite large, so using communicate() is not feasible.
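A minimal sketch of that idea, assuming for the moment that the program only uses stdin/stdout (no extra files): write stdin from a background thread and parse stdout line by line on the main thread, so neither stream has to be held in memory or written to disk. The command name and parser here are placeholders, and Python 2 text-mode pipes are assumed.

import shlex
import subprocess
import threading

def feed_stdin(pipe, stdin_iter):
    # Push lines into the child's stdin, then close it to signal EOF.
    for line in stdin_iter:
        pipe.write(line + '\n')
    pipe.close()

def run_streaming(cmd, stdin_iter, stdout_parser):
    p = subprocess.Popen(shlex.split(cmd),
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         bufsize=1)
    writer = threading.Thread(target=feed_stdin, args=(p.stdin, stdin_iter))
    writer.start()
    # Parse stdout as it arrives; iter(..., '') stops at EOF.
    result = stdout_parser(iter(p.stdout.readline, ''))
    writer.join()
    p.wait()
    return result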
I have a parser of the form:
def parser(iterator):
    for line in iterator:
        # ...
        if condition:
            break
    some_other_function(iterator)
    return data
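As a concrete (made-up) illustration of that pattern, such a parser might consume a header section and then hand the same iterator to a second function that parses the body; the 'BEGIN DATA' marker is purely hypothetical:

def parse_body(iterator):
    # Hypothetical body parser: collect the remaining lines as floats.
    return [float(line) for line in iterator if line.strip()]

def parser(iterator):
    header = []
    for line in iterator:
        if line.strip() == 'BEGIN DATA':  # assumed section marker
            break
        header.append(line.rstrip())
    return {'header': header, 'values': parse_body(iterator)}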
I looked at a solution using select to choose the appropriate stream, but I don't know how to make it work with my stdout parser or how to feed stdin. I have also looked at the asyncio module, but as far as I can see I would have the same problem with the parsing of stdout.
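For completeness, a rough asyncio-based sketch (Python 3.5+, so it may not fit a Python 2 code base) could look like the following; the command and line handler are placeholders. Note that it delivers stdout lines through a callback rather than an iterator, which is exactly the mismatch with the iterator-based parser described above.

import asyncio
import shlex

async def run_async(cmd, stdin_lines, handle_line):
    p = await asyncio.create_subprocess_exec(
        *shlex.split(cmd),
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE)

    async def feed():
        # Write each input line, then close stdin to signal EOF.
        for line in stdin_lines:
            p.stdin.write((line + '\n').encode())
            await p.stdin.drain()
        p.stdin.close()

    async def read():
        # Hand each stdout line to the callback as it arrives.
        while True:
            line = await p.stdout.readline()
            if not line:
                break
            handle_line(line.decode().rstrip())

    await asyncio.gather(feed(), read())
    await p.wait()

# Example: print each result line as it arrives (command name is hypothetical).
asyncio.get_event_loop().run_until_complete(
    run_async('my_fortran_prog', ['1', '2', '3'], lambda line: print(line)))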
You should use named pipes for the extra input to and output from the Fortran program to avoid writing to disk. Then, in your consumer, you can use threads to read from each of the program's output sources and add the information to a queue for in-order processing.
To model this, I created a Python test application, daemon.py, which reads standard input and returns the square root of each line until EOF. It logs all input to a log file specified as a command-line argument, prints the square roots to stdout and prints errors to stderr. I think it simulates your program (of course the number of output files is only one, but it can be scaled). You can view the source code of this test application here. Note the explicit call to stdout.flush(). By default, standard output is buffered, which means it would all be output at the end and messages would not arrive in order. I hope your Fortran application does not buffer its output. I believe my sample application will not run on Windows, due to the Unix-only use of select, but that shouldn't matter in your case.
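In case the link to the test application is no longer available, the following is only a rough reconstruction of what such a daemon.py could look like, based purely on the description above (the original also used select, which this simplified sketch omits):

#!/usr/bin/env python
# daemon.py - test application: echoes the square root of each stdin line
# to stdout, writes errors to stderr, and logs the raw input to the file
# named on the command line. Reconstructed sketch, not the original source.
import sys
import math

def main():
    log = open(sys.argv[1], 'w')
    for line in iter(sys.stdin.readline, ''):
        line = line.strip()
        log.write(line + '\n')
        log.flush()
        try:
            print math.sqrt(float(line))
        except ValueError as e:
            sys.stderr.write('error: %s\n' % e)
            sys.stderr.flush()
        # Flush so the consumer sees each result immediately.
        sys.stdout.flush()
    log.close()

if __name__ == '__main__':
    main()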
I have a consumer application that starts the daemon application as a subprocess, with stdin, stdout and stderr redirected to subprocess.PIPEs. Each of these pipes is handled by a different thread: one supplies the input, and three handle the log file, the errors and the standard output respectively. They all add their messages to a shared Queue which the main thread reads and sends to the parser.
This is the consumer's code:
import os, random, time
import subprocess
import threading
import Queue
import atexit

def setup():
    # Make a named pipe for every file the program should write.
    logfilepipe = 'logpipe'
    os.mkfifo(logfilepipe)

def cleanup():
    # Put named pipes here to be cleaned up.
    logfilepipe = 'logpipe'
    os.remove(logfilepipe)

# Run our cleanup code no matter what - avoid leaving pipes laying around
# even if we terminate with Ctrl-C.
atexit.register(cleanup)

# My example iterator that supplies input for the program. You already have an
# iterator so don't worry about this. It returns a random input from the
# sample_data list until the maximum number of iterations is reached.
class MyIter():
    sample_data = [0, 1, 2, 4, 9, -100, 16, 25, 100, -8, 'seven', 10000, 144, 8, 47, 91, 2.4, '^', 56, 18, 77, 94]
    def __init__(self, numiterations=1000):
        self.numiterations = numiterations
        self.current = 0
    def __iter__(self):
        return self
    def next(self):
        self.current += 1
        if self.current > self.numiterations:
            raise StopIteration
        else:
            return random.choice(self.__class__.sample_data)

# parse_func function - prints out the line with a [tag] showing its source.
def parse_func(source, line):
    print "[%s] %s" % (source, line)

# Generic function for sending standard input to the program.
# p - the process handle returned by subprocess.
def input_func(p, queue):
    for line in MyIter(30):  # limit for testing purposes
        time.sleep(0.1)  # sleep a tiny bit
        p.stdin.write(str(line) + '\n')
        queue.put(('INPUT', line))
    p.stdin.close()
    p.wait()
    # Once our process has ended, tell the main thread to quit.
    queue.put(('QUIT', True))

# Generic function for reading output from the program. source can either be
# a named pipe identified by a string, or subprocess.PIPE for stdout and stderr.
def read_output(source, queue, tag=None):
    print "starting to read output for %r" % source
    if isinstance(source, str):
        # It is a file or named pipe, so open it by name.
        source = open(source, 'r')
    line = source.readline()
    # Enqueue and read lines until EOF.
    while line != '':
        queue.put((tag, line.rstrip()))
        line = source.readline()

if __name__ == '__main__':
    cmd = 'daemon.py'

    # Set up our FIFOs instead of using files - put the file names in setup() and cleanup().
    setup()

    logfilepipe = 'logpipe'

    # Message queue for handling output, whether it's stdout, stderr, or file output by our command.
    lq = Queue.Queue()

    # Open the subprocess for the command.
    print "Running command."
    p = subprocess.Popen(['/path/to/' + cmd, logfilepipe],
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)

    # Start threads to handle the input and output.
    threading.Thread(target=input_func, args=(p, lq)).start()
    threading.Thread(target=read_output, args=(p.stdout, lq, 'OUTPUT')).start()
    threading.Thread(target=read_output, args=(p.stderr, lq, 'ERRORS')).start()

    # Open a thread to read any other output files (e.g. the log file) as named pipes.
    threading.Thread(target=read_output, args=(logfilepipe, lq, 'LOG')).start()

    # Combine the results from our threads as we want them.
    run = True
    while run:
        (tag, line) = lq.get()
        if tag == 'QUIT':
            run = False
        else:
            parse_func(tag, line)
My iterator returns a random input value (some of which are junk that will cause errors). Yours should be a drop-in replacement. The program runs until the end of its input and then waits for the subprocess to complete before enqueueing a QUIT message for the main thread. My parse_func is super simple, just printing out the message and its source, but you should be able to work in something of your own. The function for reading from an output source is designed to work with both pipes and strings - don't open the named pipes on the main thread, because opening them blocks until input is available. So for the file readers (e.g. reading the log file), it's better to have the child thread open the file and block. However, we spawn the subprocess on the main thread so that we can pass the handles for stdin, stdout and stderr to their respective child threads.
Based partially on this Python implementation of multitail.