pprocess

pprocess.py

70:ef09cf46f92f
2007-01-14 paulb [project @ 2007-01-14 22:53:09 by paulb] Updated release information.
#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Copyright (C) 2005, 2006, 2007 Paul Boddie <paul@boddie.org.uk>

This software is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
published by the Free Software Foundation; either version 2 of
the License, or (at your option) any later version.

This software is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public
License along with this library; see the file LICENCE.txt
If not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA

--------

The recommended styles of programming using pprocess involve the "Thread-style
Processing" and "Convenient Message Exchanges" sections below, although
developers may wish to read the "Message Exchanges" section for more details of
the API concerned, and the "Fork-style Processing" section may be of interest to
those with experience of large scale parallel processing systems.

Thread-style Processing
-----------------------

To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:

channel = start(fn, arg1, arg2, named1=value1, named2=value2)

This returns a channel which can then be used to communicate with the created
process.
Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:

def fn(channel, arg1, arg2, named1, named2):
    # Read from and write to the channel.
    # Return value is ignored.
    ...

Fork-style Processing
---------------------

To create new processes in a similar way to that employed when using os.fork
(i.e. the fork system call on various operating systems), use the following
method:

channel = create()
if channel.pid == 0:
    # This code is run by the created process.
    # Read from and write to the channel to communicate with the
    # creating/calling process.
    # An explicit exit of the process may be desirable to prevent the process
    # from running code which is intended for the creating/calling process.
    ...
else:
    # This code is run by the creating/calling process.
    # Read from and write to the channel to communicate with the created
    # process.
    ...

Message Exchanges
-----------------

When creating many processes, each providing results for the consumption of the
main process, the collection of those results in an efficient fashion can be
problematic: if some processes take longer than others, and if we decide to read
from those processes when they are not ready instead of other processes which
are ready, the whole activity will take much longer than necessary.

One solution to the problem of knowing when to read from channels is to create
an Exchange object, optionally initialising it with a list of channels through
which data is expected to arrive:

exchange = Exchange()           # populate the exchange later
exchange = Exchange(channels)   # populate the exchange with channels

We can add channels to the exchange using the add method:

exchange.add(channel)

To test whether an exchange is active - that is, whether it is actually
monitoring any channels - we can use the active method which returns all
channels being monitored by the exchange:

channels = exchange.active()

We may then check the exchange to see whether any data is ready to be received;
for example:

for channel in exchange.ready():
    # Read from and write to the channel.
    ...

If we do not wish to wait indefinitely for a list of channels, we can set a
timeout value as an argument to the ready method (as a number specifying the
timeout in milliseconds, where 0 means a non-blocking poll, since the value is
passed directly to the poll method of the select module's polling objects).

Convenient Message Exchanges
----------------------------

A convenient form of message exchanges can be adopted by defining a subclass of
the Exchange class and defining a particular method:

class MyExchange(Exchange):
    def store_data(self, channel):
        data = channel.receive()
        # Do something with data here.

The exact operations performed on the received data might be as simple as
storing it on an instance attribute.
To make use of the exchange, we would
instantiate it as usual:

exchange = MyExchange()         # populate the exchange later
exchange = MyExchange(limit=10) # set a limit for later population

The exchange can now be used in a simpler fashion than that shown above. We can
add channels as before using the add method, or we can choose to only add
channels if the specified limit of channels is not exceeded:

exchange.add(channel)           # add a channel as normal
exchange.add_wait(channel)      # add a channel, waiting if the limit would be
                                # exceeded

We can explicitly wait for "free space" for channels by calling the wait method:

exchange.wait()

Finally, when finishing the computation, we can choose to merely call the finish
method and have the remaining data processed automatically:

exchange.finish()

Clearly, this approach is less flexible than the raw message exchange API
described above, but it is more convenient and permits much simpler and clearer
code.

Signals and Waiting
-------------------

When created/child processes terminate, one would typically want to be informed
of such conditions using a signal handler. Unfortunately, Python seems to have
issues with restartable reads from file descriptors when interrupted by signals:

http://mail.python.org/pipermail/python-dev/2002-September/028572.html
http://twistedmatrix.com/bugs/issue733

Select and Poll
---------------

The exact combination of conditions indicating closed pipes remains relatively
obscure.
Here is a message/thread describing them (in the context of another
topic):

http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html

It would seem, from using sockets and from studying the asyncore module, that
sockets are more predictable than pipes.

Notes about poll implementations can be found here:

http://www.greenend.org.uk/rjk/2001/06/poll.html
"""

__version__ = "0.2.3"

import os
import sys
import select
import socket

try:
    import cPickle as pickle
except ImportError:
    import pickle

class AcknowledgementError(Exception):
    pass

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe
        self.closed = 0

    def __del__(self):

        # Since signals don't work well with I/O, we close pipes and wait for
        # created processes upon finalisation.

        self.close()

    def close(self):

        "Explicitly close the channel."

        if not self.closed:
            self.closed = 1
            self.read_pipe.close()
            self.write_pipe.close()
            #self.wait(os.WNOHANG)

    def wait(self, options=0):

        "Wait for the created process, if any, to exit."

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)
        """

        self._send(obj)
        if self._receive() != "OK":
            raise AcknowledgementError, obj

    def _receive(self):

        "Receive an object through the channel, returning the object."

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        else:
            return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)
        """

        try:
            obj = self._receive()
            return obj
        finally:
            self._send("OK")

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate. Subclasses of this class can define the 'store_data'
    method in order to enable the 'add_wait', 'wait' and 'finish' methods.
    """

    def __init__(self, channels=None, limit=None, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'.

        If the optional 'limit' is specified, restrictions on the addition of
        new channels can be enforced and observed through the 'add_wait', 'wait'
        and 'finish' methods. To make use of these methods, create a subclass of
        this class and define a working 'store_data' method.

        If the optional 'autoclose' parameter is set to a false value, channels
        will not be closed automatically when they are removed from the exchange
        - by default they are closed when removed.
        """

        self.limit = limit
        self.autoclose = autoclose
        self.readables = {}
        self.removed = []
        self.poller = select.poll()
        for channel in channels or []:
            self.add(channel)

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        self.readables[channel.read_pipe.fileno()] = channel
        self.poller.register(channel.read_pipe.fileno(), select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        return self.readables.values()

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (or until
        communication is possible) and return a list of channels which are ready
        to be read from.
        """

        fds = self.poller.poll(timeout)
        readables = []
        self.removed = []

        for fd, status in fds:
            channel = self.readables[fd]
            removed = 0

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)
                self.removed.append(channel)
                removed = 1

            # Record readable channels.

            if status & select.POLLIN:
                if not (removed and self.autoclose):
                    readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange.
        """

        del self.readables[channel.read_pipe.fileno()]
        self.poller.unregister(channel.read_pipe.fileno())
        if self.autoclose:
            channel.close()
            channel.wait()

    # Enhanced exchange methods involving channel limits.

    def add_wait(self, channel):

        """
        Add the given 'channel' to the exchange, waiting if the limit on active
        channels would be exceeded by adding the channel.
        """

        self.wait()
        self.add(channel)

    def wait(self):

        """
        Test for the limit on channels, blocking and reading incoming data until
        the number of channels is below the limit.
        """

        # If limited, block until channels have been closed.

        while self.limit is not None and len(self.active()) >= self.limit:
            self.store()

    def finish(self):

        """
        Finish the use of the exchange by waiting for all channels to complete.
        """

        while self.active():
            self.store()

    def store(self):

        "For each ready channel, process the incoming data."

        for channel in self.ready():
            self.store_data(channel)

    def store_data(self, channel):

        """
        Store incoming data from the specified 'channel'. In subclasses of this
        class, such data could be stored using instance attributes.
        """

        raise NotImplementedError, "store_data"

def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process.
    """

    parent, child = socket.socketpair()
    for s in [parent, child]:
        s.setblocking(1)

    pid = os.fork()
    if pid == 0:
        parent.close()
        return Channel(pid, child.makefile("r"), child.makefile("w"))
    else:
        child.close()
        return Channel(pid, parent.makefile("r"), parent.makefile("w"))

def start(callable, *args, **kwargs):

    """
    Create a new process which shall start running in the given 'callable'.
    Return a communications channel to the creating process, and supply such a
    channel to the created process as the 'channel' parameter in the given
    'callable'. Additional arguments to the 'callable' can be given as
    additional arguments to this function.
    """

    channel = create()
    if channel.pid == 0:
        try:
            try:
                callable(channel, *args, **kwargs)
            except:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                channel.send(exc_value)
        finally:
            channel.close()
            os._exit(0)
    else:
        return channel

def waitall():

    "Wait for all created processes to terminate."

    try:
        while 1:
            os.wait()
    except OSError:
        pass

# vim: tabstop=4 expandtab shiftwidth=4
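
The module above targets Python 2. As a rough illustration of the same pattern (fork a process per task, send a pickled result over a socket pair, wait for an "OK" acknowledgement, and poll to see which channels are ready), here is a minimal standalone sketch for Python 3 on a Unix-like system. It does not import or reproduce pprocess; the names start_worker, collect and square are hypothetical and exist only for this example.

```python
import os
import pickle
import select
import socket

def start_worker(fn, *args):
    """Hypothetical helper: fork a process that runs fn(*args).

    Returns (pid, socket) to the creating process.
    """
    parent_sock, child_sock = socket.socketpair()
    pid = os.fork()
    if pid == 0:
        # Created process: send the pickled result, then block until the
        # creating process acknowledges it, so no data is lost on exit.
        parent_sock.close()
        try:
            reader = child_sock.makefile("rb")
            writer = child_sock.makefile("wb")
            pickle.dump(fn(*args), writer)
            writer.flush()
            pickle.load(reader)  # the "OK" acknowledgement
        finally:
            os._exit(0)
    child_sock.close()
    return pid, parent_sock

def collect(workers):
    """Hypothetical helper: gather one result per worker as each becomes ready."""
    poller = select.poll()
    pending = {}
    for pid, sock in workers:
        pending[sock.fileno()] = (pid, sock)
        poller.register(sock.fileno(), select.POLLIN)
    results = []
    while pending:
        # With no timeout, poll() blocks until at least one socket is readable.
        for fd, status in poller.poll():
            pid, sock = pending.pop(fd)
            poller.unregister(fd)
            reader = sock.makefile("rb")
            writer = sock.makefile("wb")
            results.append(pickle.load(reader))
            pickle.dump("OK", writer)  # acknowledge receipt
            writer.flush()
            reader.close()
            writer.close()
            sock.close()
            os.waitpid(pid, 0)  # reap the finished process
    return results

def square(x):
    return x * x

workers = [start_worker(square, n) for n in range(4)]
results = collect(workers)
```

Results arrive in whatever order the processes finish, so callers that need a particular order should sort them or tag each result with its input. Error handling is deliberately thin: if fn raises, the created process exits without sending anything and the creating process sees an EOFError when unpickling.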