from collections import deque
from queue import SimpleQueue
from subprocess import DEVNULL, PIPE, Popen
from sys import argv, stderr, stdout
from threading import Thread

__license__ = """
Copyright © 2019, by Phil D. Howard - all other rights reserved

Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA, OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE, OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

The author may be contacted by decoding the number
11054987560151472272755686915985840251291393453694611309
(provu igi la numeron al duuma)
"""

# type groups to check for
sbb = (str, bytes, bytearray)
lt = (list, tuple)


def _as_text(arg):
    """Return a str rendering of one command argument for log messages."""
    if isinstance(arg, str):
        return arg
    return bytes(arg).decode(errors='replace')


#-------#-------#-------#-------#-------#-------#-------#-------#-------#
# class         cmd_pool
#-------#-------#-------#-------#-------#-------#-------#-------#-------#
class cmd_pool:
    """Asynchronous command execution pool with a concurrency limit.

    Commands or command pipelines are submitted from the main thread,
    queued, and launched by an agent thread whenever the number of running
    processes is below the configured maximum.  Each launched command or
    pipeline is supervised by its own thread.  This class also illustrates
    a way to manage processes using threads.

    The available methods in this class are:

    obj = cmd_pool()
        Create a cmd_pool instance.
    obj.cmd([solocommand,strings,...])
        Submit one simple non-pipeline command to be eventually run.
    obj.cmd([[command1,strings,...],[command2,strings,...],...])
        Submit one command pipeline to be eventually run.
    obj.join()
        Join with the cmd_pool to wait for completion of all commands.
    del obj
        Release all memory used by the object instance and remove this
        reference.
    """

    #-------#-------#-------#-------#-------#-------#-------#-------#---#
    # method        __init__
    #-------#-------#-------#-------#-------#-------#-------#-------#---#
    def __init__(self, **kwargs):
        """Create a cmd_pool instance and start its agent thread.

        Valid keywords with defaults:
            verbose=0       true to print event messages to stderr
            file=None       open file to print event messages to
                            (overrides verbose)
            max=1           maximum number of processes to run concurrently
            name=''         arbitrary name of this cmd_pool instance
            stdin=None      stdin given to the first process of each
                            command/pipeline (None inherits)
            stdout=None     stdout given to the last process of each
                            command/pipeline (None discards the output)
            stderr=None     stderr given to every process (None inherits)
            callstart=None  function(cmd) called in the command thread when
                            a command is about to start
            callend=None    function(cmd,status) called in the command
                            thread when a command has ended

        Raises:
            ValueError: if max is less than 1 (nothing could ever run).
        """
        verbose = kwargs.pop('verbose', 0)              # is verbose flag
        def_prt = stderr if verbose else None
        self.prt = kwargs.pop('file', def_prt)          # is open file to print event messages to
        self.max = kwargs.pop('max', 1)                 # is maximum number of commands/processes to run concurrently
        self.name = kwargs.pop('name', '')              # is arbitrary name of this cmd_pool instance
        self.stdin = kwargs.pop('stdin', None)          # first process of every command gets this stdin or the default
        self.stdout = kwargs.pop('stdout', None)        # last process of every command gets this stdout or the default
        self.stderr = kwargs.pop('stderr', None)        # every process gets this stderr or the default
        self.callstart = kwargs.pop('callstart', None)  # call this function(cmd) in the command thread when any command is about to start
        self.callend = kwargs.pop('callend', None)      # call this function(cmd,status) in the command thread when any command has ended
        # a limit below 1 would leave every command queued forever and
        # make join() hang, so reject it before the agent thread exists.
        if self.max < 1:
            raise ValueError(f'cmd_pool{self.name}: max must be at least 1: {self.max!r}')
        self.queue = SimpleQueue()                      # create queue for agent thread to get messages from
        self.thread = Thread(target=self._agent)        # create agent thread run by _agent method
        self.thread.start()                             # start the agent thread at initialization
        self._print(f'cmd_pool: module: {hex(id(cmd_pool))} instance: {hex(id(self))}thread: {hex(id(self.thread))} queue: {hex(id(self.queue))}')
        return

    #-------#-------#-------#-------#-------#-------#-------#-------#---#
    # thread        agent
    # method        _agent
    #-------#-------#-------#-------#-------#-------#-------#-------#---#
    def _agent(self):
        """Manage the command pool; return when everything is done.

        Messages read from the queue are lists whose first item is the
        action:
            ['cmd', pipeline, kwargs]   a submitted command or pipeline
            ['end', count]              a command thread reports that
                                        count processes are finished
            ['finis', ...]              no more commands will be submitted

        A queued command is launched whenever fewer than self.max processes
        are accounted as running; a pipeline counts as one process per
        stage, so launching a pipeline may overshoot the limit.  The agent
        returns once 'finis' has been received, nothing is queued and
        every launched command has reported its end.

        Raises:
            TypeError: if a malformed message is read from the queue.
            RuntimeError: if a message carries an unknown action.
        """
        pending = deque()   # submitted pipelines not yet launched
        count = 0           # processes currently accounted as running
        sqnum = 0           # sequence number handed to each command thread
        finis = False
        self._print(f'started agent in cmd_pool {self.name} for up to {self.max} command processes')
        while True:
            self._print(f'agent: ({self!r} in {self.thread!r}) looping around to get next message from queue: {self.queue!r}')
            msg = self.queue.get()
            self._print(f'got message from queue: {msg!r}')
            if not msg:
                raise TypeError(f'cmd_pool{self.name}._agent: got bad object from queue: {msg!r}')
            if not isinstance(msg, list):
                raise TypeError(f'cmd_pool{self.name}._agent: got bad object from queue, not list: {msg!r}')
            if len(msg) < 2:
                raise TypeError(f'cmd_pool{self.name}._agent: got bad object from queue, len < 2: {msg!r}')

            self._print(f'ready to process message action {msg[0]!r}')
            if msg[0] == 'cmd':      # submitted command
                pending.append(msg[1])
            elif msg[0] == 'end':    # running command ends
                count -= msg[1]
            elif msg[0] == 'finis':  # agent is to quit
                finis = True
            else:
                raise RuntimeError(f'cmd_pool{self.name}._agent: unknown message {msg[0]!r}')
            self._print(f'finished processing message action {msg[0]!r}')

            # launch AFTER the message is handled so that a fresh submission
            # or a freed slot takes effect immediately, and fill every
            # available slot rather than only one per message.
            while pending and count < self.max:
                cmdline = pending.popleft()
                count += len(cmdline)
                Thread(target=self._cmd, args=(cmdline, sqnum)).start()
                sqnum += 1

            # only quit when nothing is queued AND nothing is still running,
            # otherwise join() would return before the commands have ended.
            if finis and not pending and count <= 0:
                break
        self._print('ended agent')
        # the agent is done
        return

    #-------#-------#-------#-------#-------#-------#-------#-------#---#
    # thread        per command
    # method        _cmd
    # purpose       run processes to execute a command or pipeline
    #-------#-------#-------#-------#-------#-------#-------#-------#---#
    def _cmd(self, cmdline, seq):
        """Run one command or pipeline, wait for it, then notify the agent.

        Arguments:
            cmdline: list of commands, each a list of program arguments;
                     more than one command forms a pipeline.
            seq:     sequence number assigned by the agent (not used here).

        The first process reads self.stdin, every process but the last
        pipes its stdout to the next one, and the last process writes to
        self.stdout, or to the null device when that is None.  self.stderr
        is given to every process.

        self.callstart(cmdline) is called before anything is started and
        self.callend(cmdline, status) after every process has ended, where
        status is the exit status of the last process.  If a process fails
        to start, the exception propagates out of this thread and callend
        is not called, but the agent is still notified so the pool cannot
        stall.
        """
        line = '' if len(cmdline) == 1 else ' line'
        pcnt = len(cmdline)
        proc = []
        cmdstr = ' | '.join(' '.join(_as_text(y) for y in cmd) for cmd in cmdline)
        self._print(f'resdy to start command{line} {cmdstr!r}')
        try:
            if self.callstart is not None:
                self.callstart(cmdline)
            source = self.stdin
            for index, cmd in enumerate(cmdline):
                if index < pcnt - 1:
                    # all but last command pipe stdout to the next command stdin
                    sink = PIPE
                elif self.stdout is not None:
                    sink = self.stdout
                else:
                    # by default the last command stdout is discarded
                    sink = DEVNULL
                self._print('including command', ' '.join(repr(a) for a in cmd))
                proc.append(Popen(cmd, stdin=source, stdout=sink, stderr=self.stderr))
                self._print('included command', ' '.join(repr(a) for a in cmd))
                if index:
                    # drop our copy of the upstream pipe so the upstream
                    # process sees a broken pipe if this process exits early.
                    proc[index - 1].stdout.close()
                source = proc[-1].stdout

            # wait for all command processes to end
            self._print(f'waiting for command{line} {cmdstr!r}')
            status = [x.wait() for x in proc][-1]
            if self.callend is not None:
                self.callend(cmdline, status)
        finally:
            try:
                # after a partial start, release whatever did get started
                for x in proc:
                    if x.stdout is not None and not x.stdout.closed:
                        x.stdout.close()
                    x.wait()
            finally:
                # always tell the agent how many processes ended; it added
                # pcnt to its running count when this thread was launched.
                self.queue.put(['end', pcnt])
        self._print(f'finished command{line} {cmdstr!r}')
        return

    #-------#-------#-------#-------#-------#-------#-------#-------#---#
    # method        _print
    # purpose       print an event message to the designated output file,
    #               or do nothing when there is none
    #-------#-------#-------#-------#-------#-------#-------#-------#---#
    def _print(self, *args, **kwargs):
        """Print an event message.

        Takes the same arguments as print().  file= defaults to the file
        chosen at construction and flush= defaults to True.  Nothing is
        printed when the resulting file is None or otherwise untrue.
        """
        kwargs.setdefault('file', self.prt)
        kwargs.setdefault('flush', True)
        if kwargs['file']:
            return print(*args, **kwargs)
        return

    #-------#-------#-------#-------#-------#-------#-------#-------#---#
    # thread        main
    # method        cmd
    # purpose       submit a command to eventually be run
    #
    # a single command can be given 1 of 2 ways:
    #   1. a list or tuple of strings, bytes, or bytearrays
    #   2. a list or tuple containing 1 list or tuple of strings, bytes,
    #      or bytearrays
    # a pipeline of commands can be given only 1 way:
    #   1. a list or tuple containing many lists and/or tuples (or a mix)
    #      of strings, bytes, or bytearrays
    #-------#-------#-------#-------#-------#-------#-------#-------#---#
    def cmd(self, cmd_line, **kwargs):
        """Submit a command or pipeline to eventually be run asynchronously.

        Arguments:
            cmd_line: one command (list/tuple of str/bytes/bytearray) or a
                      pipeline (list/tuple of such commands).
            **kwargs: carried along in the queued message; the agent does
                      not currently interpret them.

        The submission is copied, so the caller may modify cmd_line
        afterwards.  Commands submitted after join() are never run.

        Raises:
            TypeError: if cmd_line is empty, is not a list/tuple, mixes
                       element types, or contains an empty command.
        """
        # validate the type of what gets passed to this method.
        if not cmd_line:
            raise TypeError(f'cmd_pool.cmd(): argument 1 is empty or otherwise untrue: {cmd_line!r}')
        if not isinstance(cmd_line, lt):
            raise TypeError(f'cmd_pool.cmd(): argument 1 is not a list or tuple: {cmd_line!r}')
        if isinstance(cmd_line[0], sbb):
            if not all(isinstance(x, sbb) for x in cmd_line):
                raise TypeError(f'cmd_pool.cmd(): argument is list of not all str/bytes/bytearray: {cmd_line!r}')
            cmd_line = [cmd_line]
        elif isinstance(cmd_line[0], lt):
            if not all(isinstance(x, lt) for x in cmd_line):
                raise TypeError(f'cmd_pool.cmd(): argument is list of not all list/tuple: {cmd_line!r}')
            if not all(all(isinstance(x, sbb) for x in y) for y in cmd_line):
                raise TypeError(f'cmd_pool.cmd(): argument is list/tuple of list/tuple of not all str/bytes/bytearray:: {cmd_line!r}')
            if not all(cmd_line):
                raise TypeError(f'cmd_pool.cmd(): argument contains an empty command: {cmd_line!r}')
        else:
            raise TypeError(f'cmd_pool.cmd(): argument 0 is list of not all list/tuple/str/bytes/bytearray: {cmd_line!r}')
        # build a private copy as a list of lists; bytearray arguments are
        # frozen to bytes because process creation does not accept bytearray.
        pipeline = [[bytes(a) if isinstance(a, bytearray) else a for a in c] for c in cmd_line]
        return self.queue.put(['cmd', pipeline, kwargs])

    #-------#-------#-------#-------#-------#-------#-------#-------#---#
    # thread        main
    # method        join
    # purpose       to wait out the agent thread
    #-------#-------#-------#-------#-------#-------#-------#-------#---#
    def join(self):
        """Wait for all submitted commands to end.

        Tells the agent that no more commands are coming, then blocks until
        the agent thread has finished.
        """
        self.queue.put(['finis', 'finis'])
        return self.thread.join()


#-------#-------#-------#-------#-------#-------#-------#-------#-------#
# function      _main
#-------#-------#-------#-------#-------#-------#-------#-------#-------#
def _main(args):
    """Return the message shown when this module is run as a command."""
    return 'this is a module, not a commamd, nothing to see here, be on your way'


#-------#-------#-------#-------#-------#-------#-------#-------#-------#
# section       __main__
#-------#-------#-------#-------#-------#-------#-------#-------#-------#
if __name__ == '__main__':
    try:
        result = _main(argv)
    except BrokenPipeError:
        result = 141
    except KeyboardInterrupt:
        print(flush=True)
        result = 98
    if result is None or result is True:
        result = 0
    elif result is False:
        result = 1
    # SystemExit prints a str result to stderr and exits with status 1
    raise SystemExit(result)
#-------#-------#-------#-------#-------#-------#-------#-------#-------#
# EOF