Python: Feed and parse a stream of data to and from an external program with additional input and output files
The problem: I have a poorly designed Fortran program (I cannot change it, I'm stuck with it) which takes text input on stdin and from other input files, and writes text output results to stdout and to other output files. The size of the input and output is quite large, and I would like to avoid writing to the hard drive (a slow operation). I have written a function that iterates over the lines of the several input files, and I also have parsers for the multiple outputs. I don't know whether the program first reads all the input and then starts outputting, or whether it starts outputting while still reading the input.
The goal: a function that feeds the external program what it wants and parses the output as it comes back from the program, without writing the data to text files on the hard drive.
Research: the naive way, using files, is:
from subprocess import PIPE, Popen
import shlex

def execute_simple(cmd, stdin_iter, stdout_parser, input_files, output_files):

    for filename, file_iter in input_files.iteritems():
        with open(filename, 'w') as f:
            for line in file_iter:
                f.write(line + '\n')

    p_sub = Popen(
        shlex.split(cmd),
        stdin=PIPE,
        stdout=open('stdout.txt', 'w'),
        stderr=open('stderr.txt', 'w'),
        bufsize=1
    )
    for line in stdin_iter:
        p_sub.stdin.write(line + '\n')
    p_sub.stdin.close()
    p_sub.wait()

    data = {}
    for filename, parse_func in output_files.iteritems():
        # stdout.txt and stderr.txt are included here
        with open(filename, 'r') as f:
            data[filename] = parse_func(iter(f.readline, b''))

    return data

I have tried using the subprocess module to execute the external program. The additional input/output files could be handled with named pipes and multiprocessing. I want to feed stdin from an iterator (which returns the input lines), save stderr in a list, and parse stdout as it comes from the external program. The input and output can be quite large, so using communicate is not feasible.
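For illustration, a hypothetical call to execute_simple might look like this (the file names, command and parser below are made up; in practice the iterators come from my input function and the parsers are my real ones):

# Hypothetical usage of execute_simple; names and formats are illustrative only.
def parse_lines(iterator):
    # Trivial stand-in parser: collect all lines into a list.
    return [line.rstrip() for line in iterator]

input_files = {'input.dat': iter(['1 2 3', '4 5 6'])}    # filename -> line iterator
output_files = {'stdout.txt': parse_lines,                # filename -> parser
                'stderr.txt': parse_lines,
                'result.dat': parse_lines}

data = execute_simple('./fortran_program input.dat result.dat',
                      iter(['run', 'quit']), parse_lines,
                      input_files, output_files)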
I have a parser of the form:
def parser(iterator):
    for line in iterator:
        # do something with the line
        if condition:
            break
    some_other_function(iterator)
    return data

I looked at a solution using select to choose the appropriate stream, but I don't know how to make it work with my stdout parser or how to feed stdin.
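As a concrete, made-up illustration of this pattern: a parser might consume a header block until a marker line and then hand the rest of the iterator over to another function.

# Illustration only: the 'BEGIN_DATA' marker and key=value format are invented.
def parse_header(iterator):
    data = {}
    for line in iterator:
        if line.strip() == 'BEGIN_DATA':      # stop condition
            break
        key, _, value = line.partition('=')
        data[key.strip()] = value.strip()
    data['rows'] = parse_rows(iterator)       # rest of the stream handled elsewhere
    return data

def parse_rows(iterator):
    return [line.split() for line in iterator]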
I also looked at the asyncio module, but as far as I can see I would have the same problem with the parsing of stdout.
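For reference, a minimal sketch of feeding stdin and reading stdout/stderr concurrently with asyncio might look like this (Python 3.5+; the command and the line handlers are placeholders), but it is still not obvious how to plug my iterator-based parsers into it:

import asyncio

async def feed_stdin(proc, stdin_iter):
    # Write each input line to the subprocess, then close its stdin.
    for line in stdin_iter:
        proc.stdin.write((line + '\n').encode())
        await proc.stdin.drain()
    proc.stdin.close()

async def read_stream(stream, handle_line):
    # Read lines until EOF and hand each one to a callback.
    while True:
        line = await stream.readline()
        if not line:
            break
        handle_line(line.decode().rstrip())

async def run(cmd, stdin_iter, out_handler, err_handler):
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    await asyncio.gather(
        feed_stdin(proc, stdin_iter),
        read_stream(proc.stdout, out_handler),
        read_stream(proc.stderr, err_handler))
    await proc.wait()

# asyncio.get_event_loop().run_until_complete(
#     run(['./external_program'], iter(['1', '2']),
#         lambda l: None, lambda l: None))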
You should use named pipes for the input to and output from the Fortran program to avoid writing to disk. Then, in your consumer, you can use threads to read from each of the program's output sources and add the information to a queue for in-order processing.
To model this, I created a Python app, daemon.py, that reads standard input and returns the square root of each value until EOF. It logs all input to a log file specified as a command-line argument, prints the square roots to stdout and prints all errors to stderr. I think it simulates your program (of course the number of output files is only one, but it can be scaled). You can view the source code for this test application here. Note the explicit call to stdout.flush(). By default, standard output from print is buffered, which means the output would all arrive at the end and the messages would not come in order. I hope your Fortran application does not buffer its output. I believe my sample application will not run on Windows, due to a Unix-only use of select, but that shouldn't matter in your case.
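In case the linked source is not available, here is a minimal sketch of what daemon.py could look like, based only on the description above (the exact implementation is an assumption; it reads lines from stdin via select, logs them to the file named on the command line, and writes square roots to stdout and errors to stderr):

#!/usr/bin/env python
# Hypothetical stand-in for the daemon.py test program described above.
import sys
import math
import select

def main():
    logfile = open(sys.argv[1], 'w')          # log file name given on the command line
    while True:
        # select on stdin is Unix-only here, as noted above
        ready, _, _ = select.select([sys.stdin], [], [])
        line = sys.stdin.readline()
        if line == '':                        # EOF
            break
        line = line.strip()
        logfile.write(line + '\n')            # log every input line
        logfile.flush()
        try:
            print math.sqrt(float(line))
        except ValueError:
            sys.stderr.write("error: cannot take square root of %r\n" % line)
            sys.stderr.flush()
        sys.stdout.flush()                    # explicit flush so output arrives in order
    logfile.close()

if __name__ == '__main__':
    main()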
I also have a consumer application that starts the daemon application as a subprocess, with stdin, stdout and stderr redirected to subprocess.PIPE. Each of these pipes is given to a different thread: one to provide the input, and three to handle the log file, the errors and the standard output respectively. They all add their messages to a shared queue that the main thread reads from and sends to your parser.
This is the consumer's code:
import os, random, time
import subprocess
import threading
import Queue
import atexit

def setup():
    # Make a named pipe for every file the program should write
    logfilepipe='logpipe'
    os.mkfifo(logfilepipe)

def cleanup():
    # Put named pipes here to be cleaned up
    logfilepipe='logpipe'
    os.remove(logfilepipe)

# Run our cleanup code no matter what - avoid leaving pipes lying around
# even if we terminate with Ctrl-C
atexit.register(cleanup)

# My example iterator that supplies input for the program. You already have an iterator
# so don't worry about this. It returns random input from the sample_data list
# until the maximum number of iterations is reached.
class MyIter():
    sample_data=[0,1,2,4,9,-100,16,25,100,-8,'seven',10000,144,8,47,91,2.4,'^',56,18,77,94]
    def __init__(self, numiterations=1000):
        self.numiterations=numiterations
        self.current = 0

    def __iter__(self):
        return self

    def next(self):
        self.current += 1
        if self.current > self.numiterations:
            raise StopIteration
        else:
            return random.choice(self.__class__.sample_data)

# Stand-in for your parse_func - it just prints out a [tag] showing the source.
def parse_func(source, line):
    print "[%s] %s" % (source, line)

# Generic function for sending standard input to the program.
# p - the process handle returned by subprocess
def input_func(p, queue):
    # run the command with output redirected
    for line in MyIter(30):    # limit for testing purposes
        time.sleep(0.1)        # sleep a tiny bit
        p.stdin.write(str(line)+'\n')
        queue.put(('input', line))
    p.stdin.close()
    p.wait()

    # Once our process has ended, tell the main thread to quit
    queue.put(('quit', True))

# Generic function for reading output from the program. source can either be
# a named pipe identified by a string, or subprocess.PIPE for stdout and stderr.
def read_output(source, queue, tag=None):
    print "Starting to read output for %r" % source
    if isinstance(source, str):
        # Is a file or named pipe, so open it
        source = open(source, 'r')    # open the file by its string name
    line = source.readline()
    # enqueue and read lines until EOF
    while line != '':
        queue.put((tag, line.rstrip()))
        line = source.readline()

if __name__=='__main__':
    cmd='daemon.py'

    # Set up our FIFOs instead of using files - put the file names in setup() and cleanup()
    setup()

    logfilepipe='logpipe'

    # Message queue for handling all output, whether it's stdout, stderr, or a file written by our command
    lq = Queue.Queue()

    # Open the subprocess for the command
    print "Running command."
    p = subprocess.Popen(['/path/to/'+cmd, logfilepipe],
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)

    # Start threads to handle the input and output
    threading.Thread(target=input_func, args=(p, lq)).start()
    threading.Thread(target=read_output, args=(p.stdout, lq, 'output')).start()
    threading.Thread(target=read_output, args=(p.stderr, lq, 'errors')).start()

    # Open a thread to read any other output file (e.g. the log file) via its named pipe
    threading.Thread(target=read_output, args=(logfilepipe, lq, 'log')).start()

    # Combine the results from our threads however we want
    run=True
    while(run):
        (tag, line) = lq.get()
        if tag == 'quit':
            run=False
        else:
            parse_func(tag, line)

My iterator returns a random input value (some of which are junk that will cause errors). Yours should be a drop-in replacement. The program runs until the end of its input and then waits for the subprocess to complete before enqueueing a quit message for the main thread. My parse_func is super simple, just printing out the message and its source, but you should be able to work something in. The function for reading output from a source is designed to work with both pipes and strings - don't open the pipes on your main thread because they will block until input is available. So for file readers (e.g. reading log files), it's better to have the child thread open the file and block. However, we spawn the subprocess on the main thread so we can pass the handles for stdin, stdout and stderr to the respective child threads.
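One detail not covered above: the parsers in the question expect a line iterator rather than (tag, line) tuples. One possible adapter, not part of the code above, is to give each output source its own queue and wrap it in a generator (sketch; the sentinel convention is an assumption):

# Sketch of an adapter between the threaded readers and iterator-based parsers.
# Each output source gets its own Queue; its reader thread puts lines on it and
# a None sentinel at EOF, and the generator below turns that into an iterator.
import Queue

def queue_lines(q):
    while True:
        line = q.get()
        if line is None:      # sentinel pushed by the reader thread at EOF
            return
        yield line

# Example: run one of your existing parsers against a dedicated stdout queue.
# stdout_queue = Queue.Queue()
# ... reader thread does stdout_queue.put(line) and finally stdout_queue.put(None) ...
# result = parser(queue_lines(stdout_queue))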
Based partially on this Python implementation of multitail.