pprocess

pprocess.py

65:06a385740b93
2007-01-14 paulb [project @ 2007-01-14 21:56:31 by paulb] Converted the example to use the newer add_wait, wait and finish methods on an Exchange subclass.
#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Copyright (C) 2005, 2006 Paul Boddie <paul@boddie.org.uk>

This software is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
published by the Free Software Foundation; either version 2 of
the License, or (at your option) any later version.

This software is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public
License along with this library; see the file LICENCE.txt
If not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA

--------

Thread-style Processing
-----------------------

To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:

channel = start(fn, arg1, arg2, named1=value1, named2=value2)

This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:

def fn(channel, arg1, arg2, named1, named2):
    # Read from and write to the channel.
    # Return value is ignored.
    ...

Fork-style Processing
---------------------

To create new processes in a similar way to that employed when using os.fork
(i.e. the fork system call on various operating systems), use the following
function:

channel = create()
if channel.pid == 0:
    # This code is run by the created process.
    # Read from and write to the channel to communicate with the
    # creating/calling process.
    # An explicit exit of the process may be desirable to prevent the process
    # from running code which is intended for the creating/calling process.
    ...
else:
    # This code is run by the creating/calling process.
    # Read from and write to the channel to communicate with the created
    # process.
    ...

Message Exchanges
-----------------

When creating many processes, each providing results for the consumption of the
main process, the collection of those results in an efficient fashion can be
problematic: if some processes take longer than others, and if we decide to read
from those processes when they are not ready instead of other processes which
are ready, the whole activity will take much longer than necessary.

One solution to the problem of knowing when to read from channels is to create
an Exchange object, optionally initialising it with a list of channels through
which data is expected to arrive:

exchange = Exchange()           # populate the exchange later
exchange = Exchange(channels)   # populate the exchange with channels

We can add channels to the exchange using the add method:

exchange.add(channel)

To test whether an exchange is active - that is, whether it is actually
monitoring any channels - we can use the active method which returns all
channels being monitored by the exchange:

channels = exchange.active()

We may then check the exchange to see whether any data is ready to be received;
for example:

for channel in exchange.ready():
    # Read from and write to the channel.
    ...

If we do not wish to wait indefinitely for a list of channels, we can set a
timeout value as an argument to the ready method. The value is passed unchanged
to the poll method of a select.poll object, which interprets it as a number of
milliseconds, where 0 means a non-blocking poll.

Signals and Waiting
-------------------

When created/child processes terminate, one would typically want to be informed
of such conditions using a signal handler. Unfortunately, Python seems to have
issues with restartable reads from file descriptors when interrupted by signals:

http://mail.python.org/pipermail/python-dev/2002-September/028572.html
http://twistedmatrix.com/bugs/issue733

Select and Poll
---------------

The exact combination of conditions indicating closed pipes remains relatively
obscure. Here is a message/thread describing them (in the context of another
topic):

http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html

It would seem, from using sockets and from studying the asyncore module, that
sockets are more predictable than pipes.

Notes about poll implementations can be found here:

http://www.greenend.org.uk/rjk/2001/06/poll.html
"""

__version__ = "0.2.2"

import os
import sys
import select
import socket

try:
    import cPickle as pickle
except ImportError:
    import pickle

class AcknowledgementError(Exception):
    pass

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe
        self.closed = 0

    def __del__(self):

        # Since signals don't work well with I/O, we close pipes and wait for
        # created processes upon finalisation.

        self.close()

    def close(self):

        "Explicitly close the channel."

        if not self.closed:
            self.closed = 1
            self.read_pipe.close()
            self.write_pipe.close()
            #self.wait(os.WNOHANG)

    def wait(self, options=0):

        "Wait for the created process, if any, to exit."

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)
        """

        self._send(obj)
        if self._receive() != "OK":
            raise AcknowledgementError(obj)

    def _receive(self):

        "Receive an object through the channel, returning the object."

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        else:
            return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)
        """

        try:
            obj = self._receive()
            return obj
        finally:
            self._send("OK")

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate.
    """

    def __init__(self, channels=None, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'. If the
        optional 'autoclose' parameter is set to a false value, channels will
        not be closed automatically when they are removed from the exchange - by
        default they are closed when removed.
        """

        self.autoclose = autoclose
        self.readables = {}
        self.removed = []
        self.poller = select.poll()
        for channel in channels or []:
            self.add(channel)

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        self.readables[channel.read_pipe.fileno()] = channel
        self.poller.register(channel.read_pipe.fileno(), select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        return self.readables.values()

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' in
        milliseconds (or until communication is possible) and return a list of
        channels which are ready to be read from.
        """

        fds = self.poller.poll(timeout)
        readables = []
        self.removed = []

        for fd, status in fds:
            channel = self.readables[fd]
            removed = 0

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)
                self.removed.append(channel)
                removed = 1

            # Record readable channels.

            if status & select.POLLIN:
                if not (removed and self.autoclose):
                    readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange.
        """

        del self.readables[channel.read_pipe.fileno()]
        self.poller.unregister(channel.read_pipe.fileno())
        if self.autoclose:
            channel.close()
            channel.wait()

def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process.
    """

    parent, child = socket.socketpair()
    for s in [parent, child]:
        s.setblocking(1)

    pid = os.fork()
    if pid == 0:
        parent.close()
        return Channel(pid, child.makefile("r"), child.makefile("w"))
    else:
        child.close()
        return Channel(pid, parent.makefile("r"), parent.makefile("w"))

def start(callable, *args, **kwargs):

    """
    Create a new process which shall start running in the given 'callable'.
    Return a communications channel to the creating process, and supply such a
    channel to the created process as the 'channel' parameter in the given
    'callable'. Additional arguments to the 'callable' can be given as
    additional arguments to this function.
    """

    channel = create()
    if channel.pid == 0:
        try:
            try:
                callable(channel, *args, **kwargs)
            except:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                channel.send(exc_value)
        finally:
            channel.close()
            os._exit(0)
    else:
        return channel

def waitall():

    "Wait for all created processes to terminate."

    try:
        while 1:
            os.wait()
    except OSError:
        pass

# vim: tabstop=4 expandtab shiftwidth=4
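
The listing above combines three ideas: a socket pair shared across os.fork, pickled messages, and a poll object that reports which created process has data ready. The following is a minimal, self-contained sketch of that combination. It assumes Python 3 on a POSIX system where os.fork and select.poll are available, and it is not part of pprocess: the names spawn, gather and square are hypothetical, the acknowledgement handshake used by Channel.send and Channel.receive is left out, and each created process sends exactly one result.

```python
import os
import pickle
import select
import socket


def spawn(fn, *args):
    """Fork a process that runs fn(*args) and sends back the pickled result.

    Hypothetical helper for illustration; returns (pid, socket) to the caller.
    """
    parent_sock, child_sock = socket.socketpair()
    pid = os.fork()
    if pid == 0:
        # Created process: compute, send one message, then exit immediately
        # so that no code intended for the creating process is run.
        parent_sock.close()
        status = 0
        try:
            with child_sock.makefile("wb") as out:
                pickle.dump(fn(*args), out)
        except BaseException:
            status = 1
        finally:
            os._exit(status)
    # Creating process: keep only its own end of the socket pair.
    child_sock.close()
    return pid, parent_sock


def gather(jobs):
    """Collect one result per (pid, socket) pair as each becomes readable."""
    poller = select.poll()
    pending = {}
    for pid, sock in jobs:
        pending[sock.fileno()] = (pid, sock)
        poller.register(sock.fileno(), select.POLLIN)

    results = {}
    while pending:
        # poll() with no timeout blocks until at least one socket has an event.
        for fd, _status in poller.poll():
            pid, sock = pending.pop(fd)
            poller.unregister(fd)
            try:
                with sock.makefile("rb") as stream:
                    results[pid] = pickle.load(stream)
            except EOFError:
                # The process exited without sending anything.
                results[pid] = None
            sock.close()
            os.waitpid(pid, 0)  # reap the created process
    return results


def square(n):
    return n * n


if __name__ == "__main__":
    jobs = [spawn(square, n) for n in range(4)]
    print(sorted(gather(jobs).values()))  # prints [0, 1, 4, 9]
```

Results are keyed by process identifier because the order in which processes finish is not predictable. Reading only from sockets that poll reports as ready is what prevents a slow process from holding up the collection of results from faster ones.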