pprocess

pprocess.py

51:3af372b68f5d
2006-10-01 paulb [project @ 2006-10-01 00:57:19 by paulb] Updated copyright and licensing information.
#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Copyright (C) 2005 Paul Boddie <paul@boddie.org.uk>

This software is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
published by the Free Software Foundation; either version 2 of
the License, or (at your option) any later version.

This software is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public
License along with this library; see the file LICENCE.txt
If not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA

--------

Thread-style Processing
-----------------------

To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:

channel = start(fn, arg1, arg2, named1=value1, named2=value2)

This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:

def fn(channel, arg1, arg2, named1, named2):
    # Read from and write to the channel.
    # Return value is ignored.
    ...

Fork-style Processing
---------------------

To create new processes in a similar way to that employed when using os.fork
(ie. the fork system call on various operating systems), use the following
method:

channel = create()
if channel.pid == 0:
    # This code is run by the created process.
    # Read from and write to the channel to communicate with the
    # creating/calling process.
    # An explicit exit of the process may be desirable to prevent the process
    # from running code which is intended for the creating/calling process.
    ...
else:
    # This code is run by the creating/calling process.
    # Read from and write to the channel to communicate with the created
    # process.
    ...

Message Exchanges
-----------------

When creating many processes, each providing results for the consumption of the
main process, the collection of those results in an efficient fashion can be
problematic: if some processes take longer than others, and if we decide to read
from those processes when they are not ready instead of other processes which
are ready, the whole activity will take much longer than necessary.

One solution to the problem of knowing when to read from channels is to create
an Exchange object, optionally initialising it with a list of channels through
which data is expected to arrive:

exchange = Exchange()           # populate the exchange later
exchange = Exchange(channels)   # populate the exchange with channels

We can add channels to the exchange using the add method:

exchange.add(channel)

To test whether an exchange is active - that is, whether it is actually
monitoring any channels - we can use the active method which returns all
channels being monitored by the exchange:

channels = exchange.active()

We may then check the exchange to see whether any data is ready to be received;
for example:

for channel in exchange.ready():
    # Read from and write to the channel.
    ...

If we do not wish to wait indefinitely for a list of channels, we can set a
timeout value as an argument to the ready method. The value is passed directly
to the poll method of a select.poll object, which interprets it as a number of
milliseconds; 0 means a non-blocking poll.

Signals and Waiting
-------------------

When created/child processes terminate, one would typically want to be informed
of such conditions using a signal handler. Unfortunately, Python seems to have
issues with restartable reads from file descriptors when interrupted by signals:

http://mail.python.org/pipermail/python-dev/2002-September/028572.html
http://twistedmatrix.com/bugs/issue733

Select and Poll
---------------

The exact combination of conditions indicating closed pipes remains relatively
obscure. Here is a message/thread describing them (in the context of another
topic):

http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html

It would seem, from using sockets and from studying the asyncore module, that
sockets are more predictable than pipes.
"""

__version__ = "0.2"

import os
import sys
import select
import socket

try:
    import cPickle as pickle
except ImportError:
    import pickle

class AcknowledgementError(Exception):
    pass

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe
        self.closed = 0

    def __del__(self):

        # Since signals don't work well with I/O, we close pipes and wait for
        # created processes upon finalisation.

        self.close()

    def close(self):

        "Explicitly close the channel."

        if not self.closed:
            self.closed = 1
            self.read_pipe.close()
            self.write_pipe.close()
            #self.wait(os.WNOHANG)

    def wait(self, options=0):

        "Wait for the created process, if any, to exit."

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)
        """

        self._send(obj)
        if self._receive() != "OK":
            raise AcknowledgementError, obj

    def _receive(self):

        "Receive an object through the channel, returning the object."

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        else:
            return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)
        """

        try:
            obj = self._receive()
            return obj
        finally:
            self._send("OK")

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate.
    """

    def __init__(self, channels=None, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'. If the
        optional 'autoclose' parameter is set to a false value, channels will
        not be closed automatically when they are removed from the exchange - by
        default they are closed when removed.
        """

        self.autoclose = autoclose
        self.readables = {}
        self.poller = select.poll()
        for channel in channels or []:
            self.add(channel)

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        self.readables[channel.read_pipe.fileno()] = channel
        self.poller.register(channel.read_pipe.fileno(), select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        return self.readables.values()

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (or until
        communication is possible) and return a list of channels which are ready
        to be read from.
        """

        fds = self.poller.poll(timeout)
        readables = []
        for fd, status in fds:
            channel = self.readables[fd]

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)

            # Record readable channels.

            elif status & select.POLLIN:
                readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange.
        """

        del self.readables[channel.read_pipe.fileno()]
        self.poller.unregister(channel.read_pipe.fileno())
        if self.autoclose:
            channel.close()
            channel.wait()

def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process.
    """

    parent, child = socket.socketpair()
    for s in [parent, child]:
        s.setblocking(1)

    pid = os.fork()
    if pid == 0:
        parent.close()
        return Channel(pid, child.makefile("r"), child.makefile("w"))
    else:
        child.close()
        return Channel(pid, parent.makefile("r"), parent.makefile("w"))

def start(callable, *args, **kwargs):

    """
    Create a new process which shall start running in the given 'callable'.
    Return a communications channel to the creating process, and supply such a
    channel to the created process as the 'channel' parameter in the given
    'callable'. Additional arguments to the 'callable' can be given as
    additional arguments to this function.
    """

    channel = create()
    if channel.pid == 0:
        try:
            try:
                callable(channel, *args, **kwargs)
            except:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                channel.send(exc_value)
        finally:
            channel.close()
            sys.exit(0)
    else:
        return channel

def waitall():

    "Wait for all created processes to terminate."

    try:
        while 1:
            os.wait()
    except OSError:
        pass

# vim: tabstop=4 expandtab shiftwidth=4
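
The readiness test performed by Exchange.ready can be tried in isolation, without creating any processes. The following is a minimal sketch, not part of pprocess.py: it registers one end of a socket pair with a select.poll object using the same event mask as Exchange.add, then polls before and after data is written. The helper name ready_fds and the variables before, after and fd are hypothetical, chosen only for this illustration. Note that the timeout given to poll is in milliseconds.

```python
import select
import socket

def ready_fds(poller, timeout_ms):
    # Hypothetical helper: return the descriptors that poll reports as
    # readable within 'timeout_ms' milliseconds.
    return [fd for fd, status in poller.poll(timeout_ms) if status & select.POLLIN]

# A connected pair of sockets stands in for the two ends of a channel.
reader, writer = socket.socketpair()
fd = reader.fileno()

# Register the reading end with the same event mask used by Exchange.add.
poller = select.poll()
poller.register(fd, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

# Nothing has been written yet, so a non-blocking poll (timeout 0) finds nothing.
before = ready_fds(poller, 0)

# Once data is written, the reading end is reported as readable.
writer.sendall(b"x")
after = ready_fds(poller, 1000)

poller.unregister(fd)
reader.close()
writer.close()
```

Here before is an empty list and after contains only the descriptor of the reading end. An Exchange applies the same idea to many channels at once, mapping each reported descriptor back to its Channel object.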