pprocess

parallel.py

13:7ca994eb4dc9
2005-09-27 paulb [project @ 2005-09-27 18:23:46 by paulb] Added waiting for created processes.
     1 #!/usr/bin/env python     2      3 """     4 A simple parallel processing API for Python, inspired somewhat by the thread     5 module, slightly less by pypar, and slightly less still by pypvm.     6      7 Thread-style Processing     8 -----------------------     9     10 To create new processes to run a function or any callable object, specify the    11 "callable" and any arguments as follows:    12     13 channel = start(fn, arg1, arg2, named1=value1, named2=value2)    14     15 This returns a channel which can then be used to communicate with the created    16 process. Meanwhile, in the created process, the given callable will be invoked    17 with another channel as its first argument followed by the specified arguments:    18     19 def fn(channel, arg1, arg2, named1, named2):    20     # Read from and write to the channel.    21     # Return value is ignored.    22     ...    23     24 Fork-style Processing    25 ---------------------    26     27 To create new processes in a similar way to that employed when using os.fork    28 (ie. the fork system call on various operating systems), use the following    29 method:    30     31 channel = create()    32 if channel.pid == 0:    33     # This code is run by the created process.    34     # Read from and write to the channel to communicate with the    35     # creating/calling process.    36     # An explicit exit of the process may be desirable to prevent the process    37     # from running code which is intended for the creating/calling process.    38     ...    39 else:    40     # This code is run by the creating/calling process.    41     # Read from and write to the channel to communicate with the created    42     # process.    43     ...    44     45 Message Exchanges    46 -----------------    47     48 When creating many processes, each providing results for the consumption of the    49 main process, the collection of those results in an efficient fashion can be    50 problematic: if some processes take longer than others, and if we decide to read    51 from those processes when they are not ready instead of other processes which    52 are ready, the whole activity will take much longer than necessary.    53     54 One solution to the problem of knowing when to read from channels is to create    55 an Exchange object, initialising it with a list of channels through which data    56 is expected to arrive:    57     58 exchange = Exchange(channels)    59     60 We may then check the exchange to see whether any data is ready to be received;    61 for example:    62     63 for channel in exchange.ready():    64     # Read from and write to the channel.    65     ...    66     67 If we do not wish to wait indefinitely for a list of channels, we can set a    68 timeout value as an argument to the ready method (as a floating point number    69 specifying the timeout in seconds, where 0 means a non-blocking poll as stated    70 in the select module's select function documentation).    71     72 Signals and Waiting    73 -------------------    74     75 When created/child processes terminate, one would typically want to be informed    76 of such conditions using a signal handler. Unfortunately, Python seems to have    77 issues with restartable reads from file descriptors when interrupted by signals:    78     79 http://mail.python.org/pipermail/python-dev/2002-September/028572.html    80 http://twistedmatrix.com/bugs/issue733    81     82 Select and Poll    83 ---------------    84     85 The exact combination of conditions indicating closed pipes remains relatively    86 obscure. Here is a message/thread describing them (in the context of another    87 topic):    88     89 http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html    90 """    91     92 import os    93 import sys    94 import select    95 #from signal import signal, SIGCHLD    96     97 try:    98     import cPickle as pickle    99 except ImportError:   100     import pickle   101    102 class Channel:   103    104     "A communications channel."   105    106     def __init__(self, pid, read_pipe, write_pipe):   107    108         """   109         Initialise the channel with a process identifier 'pid', a 'read_pipe'   110         from which messages will be received, and a 'write_pipe' into which   111         messages will be sent.   112         """   113    114         self.pid = pid   115         self.read_pipe = read_pipe   116         self.write_pipe = write_pipe   117         self.closed = 0   118    119     def __del__(self):   120    121         # NOTE: Hack until the signals vs. pipes behaviour is fixed.   122    123         if not self.closed:   124             self.close()   125    126     def close(self):   127    128         "Explicitly close the channel."   129    130         self.closed = 1   131         self.read_pipe.close()   132         self.write_pipe.close()   133         self.wait(os.WNOHANG)   134    135     def wait(self, options=0):   136    137         "Wait for the created process, if any, to exit."   138    139         if self.pid != 0:   140             try:   141                 os.waitpid(self.pid, options)   142             except OSError:   143                 pass   144    145     def send(self, obj):   146    147         "Send the given object 'obj' through the channel."   148    149         pickle.dump(obj, self.write_pipe)   150         self.write_pipe.flush()   151    152     def receive(self):   153    154         "Receive an object through the channel, returning the object."   155    156         obj = pickle.load(self.read_pipe)   157         if isinstance(obj, Exception):   158             raise obj   159         else:   160             return obj   161    162 class Exchange:   163    164     """   165     A communications exchange that can be used to detect channels which are   166     ready to communicate.   167     """   168    169     def __init__(self, channels):   170    171         "Initialise the exchange with the given 'channels'."   172    173         self.readables = {}   174         self.poller = select.poll()   175         for channel in channels:   176             self.readables[channel.read_pipe.fileno()] = channel   177             self.poller.register(channel.read_pipe.fileno(), select.POLLIN | select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR)   178    179     def active(self):   180    181         "Return the number of active channels."   182    183         return len(self.readables.keys())   184    185     def ready(self, timeout=None):   186    187         """   188         Wait for a period of time specified by the optional 'timeout' (or until   189         communication is possible) and return a list of channels which are ready   190         to be read from.   191         """   192    193         fds = self.poller.poll(timeout)   194         readables = []   195         for fd, status in fds:   196             channel = self.readables[fd]   197    198             # Record readable channels.   199    200             if status & select.POLLIN:   201                 readables.append(channel)   202    203             # Remove ended/error channels.   204    205             if status & (select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR):   206                 self.remove(channel)   207    208         return readables   209    210     def remove(self, channel):   211    212         """   213         Remove the given 'channel' from the exchange.   214         """   215    216         del self.readables[channel.read_pipe.fileno()]   217         self.poller.unregister(channel.read_pipe.fileno())   218    219 def create():   220    221     """   222     Create a new process, returning a communications channel to both the   223     creating process and the created process.   224     """   225    226     parent_read_fd, child_write_fd = os.pipe()   227     child_read_fd, parent_write_fd = os.pipe()   228    229     pid = os.fork()   230     if pid == 0:   231         os.close(parent_read_fd)   232         os.close(parent_write_fd)   233         return Channel(pid, os.fdopen(child_read_fd, "r"), os.fdopen(child_write_fd, "w"))   234     else:   235         os.close(child_read_fd)   236         os.close(child_write_fd)   237         return Channel(pid, os.fdopen(parent_read_fd, "r"), os.fdopen(parent_write_fd, "w"))   238    239 def start(callable, *args, **kwargs):   240    241     """   242     Create a new process which shall start running in the given 'callable'.   243     Return a communications channel to the creating process, and supply such a   244     channel to the created process as the 'channel' parameter in the given   245     'callable'. Additional arguments to the 'callable' can be given as   246     additional arguments to this function.   247     """   248    249     channel = create()   250     if channel.pid == 0:   251         try:   252             try:   253                 callable(channel, *args, **kwargs)   254             except:   255                 exc_type, exc_value, exc_traceback = sys.exc_info()   256                 channel.send(exc_value)   257         finally:   258             channel.close()   259             sys.exit(0)   260     else:   261         return channel   262    263 # Define and install a handler which waits for terminated child processes.   264    265 #def handler(number, frame):   266 #    os.wait()   267    268 #signal(SIGCHLD, handler)   269    270 # vim: tabstop=4 expandtab shiftwidth=4