pprocess

parallel.py

37:758590a37212
2005-09-29 paulb [project @ 2005-09-29 17:30:53 by paulb] Changed the indexing demonstration to accept various other arguments, with the first argument being the directory containing files to index. Made the code observe these new parameters, using them in conjunction with the new Indexer functionality. Changed the Indexer to process many files and to impose a delay after each file if specified.
#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Thread-style Processing
-----------------------

To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:

channel = start(fn, arg1, arg2, named1=value1, named2=value2)

This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:

def fn(channel, arg1, arg2, named1, named2):
    # Read from and write to the channel.
    # Return value is ignored.
    ...

Fork-style Processing
---------------------

To create new processes in a similar way to that employed when using os.fork
(i.e. the fork system call on various operating systems), use the following
method:

channel = create()
if channel.pid == 0:
    # This code is run by the created process.
    # Read from and write to the channel to communicate with the
    # creating/calling process.
    # An explicit exit of the process may be desirable to prevent the process
    # from running code which is intended for the creating/calling process.
    ...
else:
    # This code is run by the creating/calling process.
    # Read from and write to the channel to communicate with the created
    # process.
    ...

Message Exchanges
-----------------

When creating many processes, each providing results for the consumption of the
main process, the collection of those results in an efficient fashion can be
problematic: if some processes take longer than others, and if we decide to read
from those processes when they are not ready instead of other processes which
are ready, the whole activity will take much longer than necessary.

One solution to the problem of knowing when to read from channels is to create
an Exchange object, optionally initialising it with a list of channels through
which data is expected to arrive:

exchange = Exchange()           # populate the exchange later
exchange = Exchange(channels)   # populate the exchange with channels

We can add channels to the exchange using the add method:

exchange.add(channel)

To test whether an exchange is active - that is, whether it is actually
monitoring any channels - we can use the active method which returns all
channels being monitored by the exchange:

channels = exchange.active()

We may then check the exchange to see whether any data is ready to be received;
for example:

for channel in exchange.ready():
    # Read from and write to the channel.
    ...

If we do not wish to wait indefinitely for a list of channels, we can pass a
timeout value as an argument to the ready method. Since the exchange uses a
poll object from the select module, the timeout is a number of milliseconds
(not seconds, as it would be for select.select), where 0 means a non-blocking
poll.

Signals and Waiting
-------------------

When created/child processes terminate, one would typically want to be informed
of such conditions using a signal handler. Unfortunately, Python seems to have
issues with restartable reads from file descriptors when interrupted by signals:

http://mail.python.org/pipermail/python-dev/2002-September/028572.html
http://twistedmatrix.com/bugs/issue733

Select and Poll
---------------

The exact combination of conditions indicating closed pipes remains relatively
obscure. Here is a message/thread describing them (in the context of another
topic):

http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html

It would seem, from using sockets and from studying the asyncore module, that
sockets are more predictable than pipes.
"""

__version__ = "0.1"

import os
import sys
import select
import socket

try:
    import cPickle as pickle
except ImportError:
    import pickle

class AcknowledgementError(Exception):
    pass

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe
        self.closed = 0

    def __del__(self):

        # Since signals don't work well with I/O, we close pipes and wait for
        # created processes upon finalisation.

        self.close()

    def close(self):

        "Explicitly close the channel."

        if not self.closed:
            self.closed = 1
            self.read_pipe.close()
            self.write_pipe.close()
            #self.wait(os.WNOHANG)

    def wait(self, options=0):

        "Wait for the created process, if any, to exit."

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)
        """

        self._send(obj)
        if self._receive() != "OK":
            raise AcknowledgementError(obj)

    def _receive(self):

        "Receive an object through the channel, returning the object."

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        else:
            return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)
        """

        try:
            obj = self._receive()
            return obj
        finally:
            self._send("OK")

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate.
    """

    def __init__(self, channels=None, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'. If the
        optional 'autoclose' parameter is set to a false value, channels will
        not be closed automatically when they are removed from the exchange;
        by default they are closed, and their processes waited for, when
        removed.
        """

        self.autoclose = autoclose
        self.readables = {}
        self.poller = select.poll()
        for channel in channels or []:
            self.add(channel)

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        self.readables[channel.read_pipe.fileno()] = channel
        self.poller.register(channel.read_pipe.fileno(), select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        return self.readables.values()

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (in
        milliseconds, as for select.poll objects) or until communication is
        possible, and return a list of channels which are ready to be read
        from. Channels whose processes have ended, or which report errors, are
        removed from the exchange.
        """

        fds = self.poller.poll(timeout)
        readables = []
        for fd, status in fds:
            channel = self.readables[fd]

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)

            # Record readable channels.

            elif status & select.POLLIN:
                readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange.
        """

        del self.readables[channel.read_pipe.fileno()]
        self.poller.unregister(channel.read_pipe.fileno())
        if self.autoclose:
            channel.close()
            channel.wait()

def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process.
    """

    parent, child = socket.socketpair()
    for s in [parent, child]:
        s.setblocking(1)

    pid = os.fork()
    if pid == 0:
        parent.close()
        return Channel(pid, child.makefile("r"), child.makefile("w"))
    else:
        child.close()
        return Channel(pid, parent.makefile("r"), parent.makefile("w"))

def start(callable, *args, **kwargs):

    """
    Create a new process which shall start running in the given 'callable'.
    Return a communications channel to the creating process, and supply such a
    channel to the created process as the 'channel' parameter in the given
    'callable'. Additional arguments to the 'callable' can be given as
    additional arguments to this function.
    """

    channel = create()
    if channel.pid == 0:
        try:
            try:
                callable(channel, *args, **kwargs)
            except:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                channel.send(exc_value)
        finally:
            channel.close()
            sys.exit(0)
    else:
        return channel

def waitall():

    "Wait for all created processes to terminate."

    try:
        while 1:
            os.wait()
    except OSError:
        pass

# vim: tabstop=4 expandtab shiftwidth=4
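The module above implements two things that a modern sketch can make concrete: the thread-style start function and the acknowledged send/receive protocol. What follows is a minimal Python 3 sketch of both, not the module's own code. MiniChannel, mini_start, add and fail are hypothetical names. The sketch assumes a Unix system with os.fork. It differs from the module in three ways:

- It uses binary file objects from socket.makefile, which pickle requires in Python 3.
- It raises RuntimeError in place of AcknowledgementError.
- The child leaves through os._exit rather than sys.exit, so it can never fall back into the caller's code.

```python
import os
import pickle
import socket


class MiniChannel:
    """Hypothetical Python 3 analogue of Channel: pickles over a socket."""

    def __init__(self, pid, sock):
        self.pid = pid
        self.sock = sock
        self.reader = sock.makefile("rb")   # binary mode, as pickle requires
        self.writer = sock.makefile("wb")

    def _send(self, obj):
        pickle.dump(obj, self.writer)
        self.writer.flush()

    def _receive(self):
        obj = pickle.load(self.reader)
        if isinstance(obj, Exception):      # re-raise a reported failure
            raise obj
        return obj

    def send(self, obj):
        # Wait for "OK" so the sender cannot exit before the data is read.
        self._send(obj)
        if self._receive() != "OK":
            raise RuntimeError("missing acknowledgement")

    def receive(self):
        try:
            return self._receive()
        finally:
            self._send("OK")                # acknowledge even on failure

    def close(self):
        self.reader.close()
        self.writer.close()
        self.sock.close()

    def wait(self):
        if self.pid != 0:
            os.waitpid(self.pid, 0)


def mini_start(fn, *args, **kwargs):
    """Fork a child running fn(channel, *args, **kwargs); return a channel."""
    parent_sock, child_sock = socket.socketpair()
    pid = os.fork()
    if pid == 0:
        parent_sock.close()
        channel = MiniChannel(0, child_sock)
        try:
            fn(channel, *args, **kwargs)
        except Exception as exc:
            channel.send(exc)               # report the error to the parent
        finally:
            channel.close()
            os._exit(0)                     # never return into parent code
    child_sock.close()
    return MiniChannel(pid, parent_sock)


def add(channel, a, b):
    channel.send(a + b)


def fail(channel):
    raise ValueError("bad input")


if __name__ == "__main__":
    ch = mini_start(add, 2, 3)
    print(ch.receive())                     # prints 5
    ch.close()
    ch.wait()
```

An exception raised in the child is pickled and sent like any other object. The parent's receive re-raises it, but only after acknowledging it, so the child is released and can exit.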
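The fork-style create pattern from the module docstring can be illustrated with a Python 3 sketch. It assumes Unix, and create_channel and square_remotely are hypothetical helpers. The sketch forks with a socket pair between the two processes and returns a tuple whose first element is 0 in the child, as os.fork does. The child's work is wrapped in try/finally ending in os._exit, which is one way to make the explicit exit the docstring recommends.

```python
import os
import pickle
import socket


def create_channel():
    """Fork; return (pid, reader, writer) in both processes.

    pid is 0 in the child and the child's process ID in the parent.
    """
    parent_sock, child_sock = socket.socketpair()
    pid = os.fork()
    if pid == 0:
        parent_sock.close()
        mine = child_sock
    else:
        child_sock.close()
        mine = parent_sock
    # The file objects keep the socket open until both are closed.
    reader, writer = mine.makefile("rb"), mine.makefile("wb")
    mine.close()
    return pid, reader, writer


def square_remotely(n):
    """Send n to a forked child and return the square it sends back."""
    pid, reader, writer = create_channel()
    if pid == 0:
        # Child: read a request, reply, then exit so that the parent's
        # code below is never run here.
        try:
            value = pickle.load(reader)
            pickle.dump(value * value, writer)
            writer.flush()
        finally:
            os._exit(0)
    # Parent: send the request and wait for the reply.
    pickle.dump(n, writer)
    writer.flush()
    result = pickle.load(reader)
    reader.close()
    writer.close()
    os.waitpid(pid, 0)
    return result


if __name__ == "__main__":
    print(square_remotely(7))               # prints 49
```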
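The Exchange class watches read ends with a select.poll object, so a timeout passed to ready is in milliseconds. The following is a hypothetical MiniExchange over plain socket objects, a Python 3 sketch and not the module's class. As a deliberate change from Exchange.ready, it tests POLLIN before the hang-up and error flags. That way a socket whose peer wrote and then closed is still reported as readable, and the caller treats an empty recv as end-of-stream.

```python
import select
import socket


class MiniExchange:
    """Hypothetical poll-based exchange over socket objects."""

    EVENTS = select.POLLIN | select.POLLHUP | select.POLLERR | select.POLLNVAL

    def __init__(self, socks=()):
        self.by_fd = {}
        self.poller = select.poll()
        for sock in socks:
            self.add(sock)

    def add(self, sock):
        self.by_fd[sock.fileno()] = sock
        self.poller.register(sock.fileno(), self.EVENTS)

    def remove(self, sock):
        del self.by_fd[sock.fileno()]
        self.poller.unregister(sock.fileno())

    def active(self):
        return list(self.by_fd.values())

    def ready(self, timeout_ms=None):
        """Return readable sockets, waiting at most timeout_ms milliseconds.

        None blocks indefinitely; 0 polls without blocking.
        """
        readable = []
        for fd, events in self.poller.poll(timeout_ms):
            sock = self.by_fd[fd]
            if events & select.POLLIN:
                readable.append(sock)
            elif events & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
                self.remove(sock)       # ended or failed, nothing to read
        return readable


if __name__ == "__main__":
    pairs = [socket.socketpair() for _ in range(3)]
    exchange = MiniExchange(ours for ours, _theirs in pairs)
    pairs[1][1].sendall(b"hello")
    for sock in exchange.ready(500):    # wait up to half a second
        print(sock.recv(5))
```

The module's own ready checks the hang-up flags first. That is safe there because its acknowledgement step keeps a sender alive until its data has been read.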
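The Select and Poll section notes that the conditions signalling a closed pipe are obscure. A small Python 3 experiment can make them visible on the current platform; flag_names, poll_once and the two probe functions are hypothetical. It closes the writing end of a socket pair and of an os.pipe, reports which poll flags the reading end shows, and then reads. The exact flags differ between sockets and pipes and between operating systems. The sketch therefore treats only the empty read as a certain end-of-stream signal.

```python
import os
import select
import socket

FLAG_NAMES = ["POLLIN", "POLLPRI", "POLLOUT", "POLLERR", "POLLHUP", "POLLNVAL"]


def flag_names(events):
    """Return the names of the poll flags set in an event mask."""
    return [name for name in FLAG_NAMES if events & getattr(select, name)]


def poll_once(fd):
    """Poll fd without blocking and return its event mask (0 if none)."""
    poller = select.poll()
    # POLLHUP, POLLERR and POLLNVAL are always reported, even if not asked for.
    poller.register(fd, select.POLLIN | select.POLLPRI)
    results = poller.poll(0)
    return results[0][1] if results else 0


def socket_after_close():
    reader, writer = socket.socketpair()
    writer.close()                      # the peer goes away
    events = poll_once(reader.fileno())
    data = reader.recv(1)               # b"" means end-of-stream
    reader.close()
    return events, data


def pipe_after_close():
    read_fd, write_fd = os.pipe()
    os.close(write_fd)                  # the writer goes away
    events = poll_once(read_fd)
    data = os.read(read_fd, 1)          # b"" means end-of-stream
    os.close(read_fd)
    return events, data


if __name__ == "__main__":
    for label, probe in [("socket", socket_after_close), ("pipe", pipe_after_close)]:
        events, data = probe()
        print(label, flag_names(events), data)
```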
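The module's waitall loops on os.wait until no children remain. A Python 3 variant could also record each child's exit status; waitall_statuses and spawn_exiting are hypothetical names, and the sketch assumes Unix. Like the original, it reaps every child of the calling process, not only those created through the channel API. In Python 3, running out of children raises ChildProcessError, a subclass of OSError.

```python
import os


def spawn_exiting(code):
    """Fork a child that exits immediately with the given status code."""
    pid = os.fork()
    if pid == 0:
        os._exit(code)
    return pid


def waitall_statuses():
    """Reap all children; map each pid to its exit code (None if signalled)."""
    statuses = {}
    while True:
        try:
            pid, status = os.wait()
        except ChildProcessError:       # no children left
            return statuses
        if os.WIFEXITED(status):
            statuses[pid] = os.WEXITSTATUS(status)
        else:
            statuses[pid] = None


if __name__ == "__main__":
    pids = [spawn_exiting(code) for code in (0, 1, 2)]
    print(waitall_statuses())
```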
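The Signals and Waiting section predates PEP 475. Since Python 3.5, a system call interrupted by a signal whose handler does not raise is retried automatically, so a blocking read survives a SIGCHLD. The following hedged Python 3 sketch tests that claim; fork_child and read_during_sigchld are hypothetical, and it is Unix only. It must run in the main thread, because signal.signal requires that. It reaps one child from a SIGCHLD handler while the parent is blocked in recv waiting for another, and restores the previous handler afterwards.

```python
import os
import signal
import socket
import time


def fork_child(delay, sock=None, payload=b""):
    """Fork a child that sleeps, optionally sends payload, then exits."""
    pid = os.fork()
    if pid == 0:
        try:
            time.sleep(delay)
            if sock is not None:
                sock.sendall(payload)
        finally:
            os._exit(0)
    return pid


def read_during_sigchld():
    """Block in recv() while a SIGCHLD handler reaps another child.

    Returns (data, pids reaped by the handler, all child pids).
    """
    pending, reaped = set(), []

    def on_sigchld(signum, frame):
        # Reap only our own children, without blocking.
        for pid in list(pending):
            if os.waitpid(pid, os.WNOHANG)[0] == pid:
                pending.discard(pid)
                reaped.append(pid)

    parent_sock, child_sock = socket.socketpair()
    previous = signal.signal(signal.SIGCHLD, on_sigchld)
    try:
        early = fork_child(0.05)                 # exits while we block below
        pending.add(early)
        late = fork_child(0.3, child_sock, b"done")
        pending.add(late)
        child_sock.close()
        data = parent_sock.recv(4)               # retried after the handler
    finally:
        signal.signal(signal.SIGCHLD,
                      previous if previous is not None else signal.SIG_DFL)
        parent_sock.close()
        for pid in list(pending):                # reap anything left over
            os.waitpid(pid, 0)
            pending.discard(pid)
    return data, reaped, [early, late]


if __name__ == "__main__":
    print(read_during_sigchld()[0])
```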