asynchronous - Python: Feed and parse stream of data to and from external program with additional input and output files -


The problem: I have a poorly designed Fortran program (I cannot change it, I'm stuck with it) that takes text input from stdin and from other input files, and writes text output results to stdout and to other output files. The size of the input and output is quite large, and I would like to avoid writing to the hard drive (a slow operation). I have written a function that iterates over the lines of the several input files, and I also have parsers for the multiple outputs. I don't know whether the program first reads all the input and then starts producing output, or whether it starts outputting while it is still reading the input.

The goal: a function that feeds the external program what it wants and parses the output as it comes from the program, without writing the data to text files on the hard drive.

Research: the naive way, using files, is:

import shlex
from subprocess import PIPE, Popen

def execute_simple(cmd, stdin_iter, stdout_parser, input_files, output_files):
    # Write each additional input file to disk before starting the program
    for filename, file_iter in input_files.iteritems():
        with open(filename, 'w') as f:
            for line in file_iter:
                f.write(line + '\n')

    p_sub = Popen(
        shlex.split(cmd),
        stdin=PIPE,
        stdout=open('stdout.txt', 'w'),
        stderr=open('stderr.txt', 'w'),
        bufsize=1
    )
    for line in stdin_iter:
        p_sub.stdin.write(line + '\n')
    p_sub.stdin.close()
    p_sub.wait()

    data = {}
    for filename, parse_func in output_files.iteritems():
        # stdout.txt and stderr.txt are included here
        with open(filename, 'r') as f:
            data[filename] = parse_func(iter(f.readline, b''))
    return data
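(For context, a hypothetical call might look like the following; the command, file names and the count_lines parser are made up purely for illustration.)

# Hypothetical usage of execute_simple - names are illustrative only.
def count_lines(line_iter):
    return sum(1 for _ in line_iter)

results = execute_simple(
    cmd='my_fortran_prog extra_in.txt extra_out.txt',
    stdin_iter=iter(['1.0', '2.0', '3.0']),
    stdout_parser=count_lines,   # note: stdout.txt actually gets parsed via output_files below
    input_files={'extra_in.txt': iter(['a', 'b', 'c'])},
    output_files={'stdout.txt': count_lines, 'extra_out.txt': count_lines},
)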

I have tried to use the subprocess module to execute the external program, with the additional input/output files handled by named pipes and multiprocessing. I want to feed stdin from an iterator (which returns the input lines), save stderr in a list, and parse stdout as it comes from the external program. The input and output can be quite large, so using communicate is not feasible.

I have a parser of the form:

def parser(iterator):
    for line in iterator:
        # ...
        if condition:
            break
    some_other_function(iterator)
    return data

I have looked at a solution using select to choose the appropriate stream, but I don't know how to make it work with the stdout parser, or how to feed stdin at the same time.
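The kind of loop I mean is roughly the sketch below (p_sub is the Popen handle from above, and handle is just a placeholder); I still don't see where my parser(iterator) or the stdin feeding would fit in:

# Rough sketch of the select-based reading I looked at - not a working solution.
import select

reads = [p_sub.stdout, p_sub.stderr]
while reads:
    readable, _, _ = select.select(reads, [], [])
    for stream in readable:
        line = stream.readline()
        if line == '':              # EOF on this stream
            reads.remove(stream)
        else:
            handle(stream, line)    # placeholder - where would my parser go?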

I have also looked at the asyncio module, but as far as I can see I would have the same problem with the parsing of stdout.

You should use named pipes as the input and output of your Fortran program to avoid writing to disk. Then, in your consumer, you can use threads to read from each of the program's output sources and add the information to a queue for in-order processing.
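The same idea applies to the extra input files. As a minimal sketch (input_pipe and write_input are just illustrative names, and your_input_lines stands for your own line iterator), you create a FIFO and have a child thread write into it while the program reads it as if it were an ordinary file:

import os, threading

def write_input(pipename, line_iter):
    # Opening a FIFO for writing blocks until the program opens it for reading,
    # so do this in a child thread rather than on the main thread.
    with open(pipename, 'w') as f:
        for line in line_iter:
            f.write(line + '\n')

input_pipe = 'inpipe'      # illustrative name; add it to setup()/cleanup() below
os.mkfifo(input_pipe)
threading.Thread(target=write_input, args=(input_pipe, your_input_lines)).start()
# ...then pass 'inpipe' to the program wherever it expects that input file name.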

To model this, I created a Python app, daemon.py, which reads from standard input and returns the square root until EOF. It logs all input to a log file specified as a command-line argument, and prints the square root to stdout and all errors to stderr. I think it simulates your program (of course the number of output files is just one, but it can be scaled). You can view the source code of the test application here. Note the explicit call to stdout.flush(). By default, printed standard output is buffered, which means it all comes out at the end and the messages will not arrive in order. I hope your Fortran application does not buffer its output. I believe my sample application will not run on Windows, due to a Unix-only use of select, but that shouldn't matter in your case.
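A minimal sketch of what daemon.py does, based only on the description above, might look like this (the actual test application also uses select, which this sketch leaves out; it simply blocks on readline):

# daemon.py - rough sketch based on the description; not the original source.
import sys
import math

def main():
    logfile = open(sys.argv[1], 'w')   # log file (or named pipe) given on the command line
    for line in iter(sys.stdin.readline, ''):
        line = line.rstrip()
        logfile.write(line + '\n')
        logfile.flush()
        try:
            print math.sqrt(float(line))
        except ValueError:
            sys.stderr.write("cannot take the square root of %r\n" % line)
            sys.stderr.flush()
        sys.stdout.flush()   # explicit flush so stdout is not buffered until exit

if __name__ == '__main__':
    main()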

I then have a consumer application that starts the daemon application as a subprocess, with stdin, stdout and stderr redirected to subprocess.PIPEs. Each of these pipes is given to a different thread: one to provide input, and three to handle the log file, the errors and the standard output respectively. They all add their messages to a shared queue which the main thread reads and sends on to your parser.

This is my consumer's code:

import os, random, time
import subprocess
import threading
import Queue
import atexit

def setup():
    # Make a named pipe for every file the program should write
    logfilepipe = 'logpipe'
    os.mkfifo(logfilepipe)

def cleanup():
    # Put named pipes here to be cleaned up
    logfilepipe = 'logpipe'
    os.remove(logfilepipe)

# Run our cleanup code no matter what - avoid leaving pipes laying around
# even if we terminate with Ctrl-C
atexit.register(cleanup)

# Example iterator that supplies input for the program. You have your own iterator
# so don't worry about this. It returns a random input from the sample_data list
# until the maximum number of iterations is reached.
class MyIter():
    sample_data = [0, 1, 2, 4, 9, -100, 16, 25, 100, -8, 'seven', 10000, 144, 8, 47, 91, 2.4, '^', 56, 18, 77, 94]

    def __init__(self, numiterations=1000):
        self.numiterations = numiterations
        self.current = 0

    def __iter__(self):
        return self

    def next(self):
        self.current += 1
        if self.current > self.numiterations:
            raise StopIteration
        else:
            return random.choice(self.__class__.sample_data)

# parse_func function - just prints out [tag] showing the source.
def parse_func(source, line):
    print "[%s] %s" % (source, line)

# Generic function for sending standard input to the program.
# p - process handle returned by subprocess
def input_func(p, queue):
    # Run the command with output redirected
    for line in MyIter(30):  # limit for testing purposes
        time.sleep(0.1)      # sleep a tiny bit
        p.stdin.write(str(line) + '\n')
        queue.put(('input', line))
    p.stdin.close()
    p.wait()

    # Once our process has ended, tell the main thread to quit
    queue.put(('quit', True))

# Generic function for reading output from the program. The source can either be
# a named pipe identified by a string, or a subprocess.PIPE for stdout and stderr.
def read_output(source, queue, tag=None):
    print "starting to read output for %r" % source
    if isinstance(source, str):
        # Must be a file or named pipe, so open it
        source = open(source, 'r')  # open file with string name
    line = source.readline()
    # Enqueue and read lines until EOF
    while line != '':
        queue.put((tag, line.rstrip()))
        line = source.readline()

if __name__ == '__main__':
    cmd = 'daemon.py'

    # Set up our FIFOs instead of using files - put the file names in setup() and cleanup()
    setup()

    logfilepipe = 'logpipe'

    # Message queue for handling output, whether it's stdout, stderr, or the file output by our command
    lq = Queue.Queue()

    # Open the subprocess for the command
    print "running command."
    p = subprocess.Popen(['/path/to/' + cmd, logfilepipe],
                         stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # Start threads to handle the input and output
    threading.Thread(target=input_func, args=(p, lq)).start()
    threading.Thread(target=read_output, args=(p.stdout, lq, 'output')).start()
    threading.Thread(target=read_output, args=(p.stderr, lq, 'errors')).start()

    # Open a thread to read any other output files (e.g. the log file) via named pipes
    threading.Thread(target=read_output, args=(logfilepipe, lq, 'log')).start()

    # Combine the results from our threads however you want
    run = True
    while run:
        (tag, line) = lq.get()
        if tag == 'quit':
            run = False
        else:
            parse_func(tag, line)

My iterator returns a random input value (some of it junk to cause errors). Yours should be a drop-in replacement. The program runs until the end of its input and then waits for the subprocess to complete before enqueueing a quit message for the main thread. My parse_func is super simple, just printing out the message and its source, but you should be able to work in something of your own. The function for reading from an output source is designed to work with both pipes and strings - don't open the pipes on the main thread, because they will block until input is available. So for file readers (e.g. reading log files), it's better to have the child thread open the file and block there. However, we spawn the subprocess on the main thread so that we can pass the handles for stdin, stdout and stderr to their respective child threads.
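If you want to stay closer to the question (collecting stderr in a list and using separate parsers per source), one possible sketch is to dispatch on the tag in the main loop instead of calling a single parse_func; handle_stdout_line and handle_log_line are placeholders for your own parsing code:

# Sketch: dispatch each tagged line to a per-source handler instead of one parse_func.
stderr_lines = []
handlers = {
    'output': handle_stdout_line,    # placeholder for your stdout parsing
    'errors': stderr_lines.append,   # the question wanted stderr collected in a list
    'log': handle_log_line,          # placeholder for your log-file parsing
}

run = True
while run:
    (tag, line) = lq.get()
    if tag == 'quit':
        run = False
    elif tag in handlers:
        handlers[tag](line)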

Based partially on this Python implementation of multitail.

