pprocess

pprocess.py

56:fe4ac9803e8a
2006-11-19 paulb [project @ 2006-11-19 00:10:24 by paulb] Fixed process accounting.
#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Copyright (C) 2005 Paul Boddie <paul@boddie.org.uk>

This software is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
published by the Free Software Foundation; either version 2 of
the License, or (at your option) any later version.

This software is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public
License along with this library; see the file LICENCE.txt
If not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA

--------

Thread-style Processing
-----------------------

To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:

channel = start(fn, arg1, arg2, named1=value1, named2=value2)

This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:

def fn(channel, arg1, arg2, named1, named2):
    # Read from and write to the channel.
    # Return value is ignored.
    ...

Fork-style Processing
---------------------

To create new processes in a similar way to that employed when using os.fork
(ie. the fork system call on various operating systems), use the following
method:

channel = create()
if channel.pid == 0:
    # This code is run by the created process.
    # Read from and write to the channel to communicate with the
    # creating/calling process.
    # An explicit exit of the process may be desirable to prevent the process
    # from running code which is intended for the creating/calling process.
    ...
else:
    # This code is run by the creating/calling process.
    # Read from and write to the channel to communicate with the created
    # process.
    ...

Message Exchanges
-----------------

When creating many processes, each providing results for the consumption of the
main process, the collection of those results in an efficient fashion can be
problematic: if some processes take longer than others, and if we decide to read
from those processes when they are not ready instead of other processes which
are ready, the whole activity will take much longer than necessary.

One solution to the problem of knowing when to read from channels is to create
an Exchange object, optionally initialising it with a list of channels through
which data is expected to arrive:

exchange = Exchange()           # populate the exchange later
exchange = Exchange(channels)   # populate the exchange with channels

We can add channels to the exchange using the add method:

exchange.add(channel)

To test whether an exchange is active - that is, whether it is actually
monitoring any channels - we can use the active method which returns all
channels being monitored by the exchange:

channels = exchange.active()

We may then check the exchange to see whether any data is ready to be received;
for example:

for channel in exchange.ready():
    # Read from and write to the channel.
    ...

If we do not wish to wait indefinitely for a list of channels, we can set a
timeout value as an argument to the ready method (as a floating point number
specifying the timeout in seconds, where 0 means a non-blocking poll as stated
in the select module's select function documentation).

Signals and Waiting
-------------------

When created/child processes terminate, one would typically want to be informed
of such conditions using a signal handler. Unfortunately, Python seems to have
issues with restartable reads from file descriptors when interrupted by signals:

http://mail.python.org/pipermail/python-dev/2002-September/028572.html
http://twistedmatrix.com/bugs/issue733

Select and Poll
---------------

The exact combination of conditions indicating closed pipes remains relatively
obscure. Here is a message/thread describing them (in the context of another
topic):

http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html

It would seem, from using sockets and from studying the asyncore module, that
sockets are more predictable than pipes.
"""

__version__ = "0.2.2"

import os
import sys
import select
import socket

try:
    import cPickle as pickle
except ImportError:
    import pickle

class AcknowledgementError(Exception):
    pass

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe
        self.closed = 0

    def __del__(self):

        # Since signals don't work well with I/O, we close pipes and wait for
        # created processes upon finalisation.

        self.close()

    def close(self):

        "Explicitly close the channel."

        if not self.closed:
            self.closed = 1
            self.read_pipe.close()
            self.write_pipe.close()
            #self.wait(os.WNOHANG)

    def wait(self, options=0):

        "Wait for the created process, if any, to exit."

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)
        """

        self._send(obj)
        if self._receive() != "OK":
            raise AcknowledgementError, obj

    def _receive(self):

        "Receive an object through the channel, returning the object."

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        else:
            return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)
        """

        try:
            obj = self._receive()
            return obj
        finally:
            self._send("OK")

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate.
    """

    def __init__(self, channels=None, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'. If the
        optional 'autoclose' parameter is set to a false value, channels will
        not be closed automatically when they are removed from the exchange - by
        default they are closed when removed.
        """

        self.autoclose = autoclose
        self.readables = {}
        self.poller = select.poll()
        for channel in channels or []:
            self.add(channel)

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        self.readables[channel.read_pipe.fileno()] = channel
        self.poller.register(channel.read_pipe.fileno(), select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        return self.readables.values()

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (or until
        communication is possible) and return a list of channels which are ready
        to be read from.
        """

        fds = self.poller.poll(timeout)
        readables = []
        for fd, status in fds:
            channel = self.readables[fd]
            removed = 0

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)
                removed = 1

            # Record readable channels.

            if status & select.POLLIN:
                if not (removed and self.autoclose):
                    readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange.
        """

        del self.readables[channel.read_pipe.fileno()]
        self.poller.unregister(channel.read_pipe.fileno())
        if self.autoclose:
            channel.close()
            channel.wait()

def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process.
    """

    parent, child = socket.socketpair()
    for s in [parent, child]:
        s.setblocking(1)

    pid = os.fork()
    if pid == 0:
        parent.close()
        return Channel(pid, child.makefile("r"), child.makefile("w"))
    else:
        child.close()
        return Channel(pid, parent.makefile("r"), parent.makefile("w"))

def start(callable, *args, **kwargs):

    """
    Create a new process which shall start running in the given 'callable'.
    Return a communications channel to the creating process, and supply such a
    channel to the created process as the 'channel' parameter in the given
    'callable'. Additional arguments to the 'callable' can be given as
    additional arguments to this function.
    """

    channel = create()
    if channel.pid == 0:
        try:
            try:
                callable(channel, *args, **kwargs)
            except:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                channel.send(exc_value)
        finally:
            channel.close()
            sys.exit(0)
    else:
        return channel

def waitall():

    "Wait for all created processes to terminate."

    try:
        while 1:
            os.wait()
    except OSError:
        pass

# vim: tabstop=4 expandtab shiftwidth=4