#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Copyright (C) 2005 Paul Boddie <paul@boddie.org.uk>

This software is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
published by the Free Software Foundation; either version 2 of
the License, or (at your option) any later version.

This software is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public
License along with this library; see the file LICENCE.txt
If not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA

--------

Thread-style Processing
-----------------------

To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:

channel = start(fn, arg1, arg2, named1=value1, named2=value2)

This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:

def fn(channel, arg1, arg2, named1, named2):
    # Read from and write to the channel.
    # Return value is ignored.
    ...

Fork-style Processing
---------------------

To create new processes in a similar way to that employed when using os.fork
(ie. the fork system call on various operating systems), use the following
method:

channel = create()
if channel.pid == 0:
    # This code is run by the created process.
    # Read from and write to the channel to communicate with the
    # creating/calling process.
    # An explicit exit of the process may be desirable to prevent the process
    # from running code which is intended for the creating/calling process.
    ...
else:
    # This code is run by the creating/calling process.
    # Read from and write to the channel to communicate with the created
    # process.
    ...

Message Exchanges
-----------------

When creating many processes, each providing results for the consumption of the
main process, the collection of those results in an efficient fashion can be
problematic: if some processes take longer than others, and if we decide to read
from those processes when they are not ready instead of other processes which
are ready, the whole activity will take much longer than necessary.

One solution to the problem of knowing when to read from channels is to create
an Exchange object, optionally initialising it with a list of channels through
which data is expected to arrive:

exchange = Exchange()           # populate the exchange later
exchange = Exchange(channels)   # populate the exchange with channels

We can add channels to the exchange using the add method:

exchange.add(channel)

To test whether an exchange is active - that is, whether it is actually
monitoring any channels - we can use the active method which returns all
channels being monitored by the exchange:

channels = exchange.active()

We may then check the exchange to see whether any data is ready to be received;
for example:

for channel in exchange.ready():
    # Read from and write to the channel.
    ...

If we do not wish to wait indefinitely for a list of channels, we can set a
timeout value as an argument to the ready method (as a floating point number
specifying the timeout in seconds, where 0 means a non-blocking poll as stated
in the select module's select function documentation).

Signals and Waiting
-------------------

When created/child processes terminate, one would typically want to be informed
of such conditions using a signal handler. Unfortunately, Python seems to have
issues with restartable reads from file descriptors when interrupted by signals:

http://mail.python.org/pipermail/python-dev/2002-September/028572.html
http://twistedmatrix.com/bugs/issue733

Select and Poll
---------------

The exact combination of conditions indicating closed pipes remains relatively
obscure. Here is a message/thread describing them (in the context of another
topic):

http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html

It would seem, from using sockets and from studying the asyncore module, that
sockets are more predictable than pipes.
"""

__version__ = "0.2"

import os
import sys
import select
import socket

try:
    import cPickle as pickle
except ImportError:
    import pickle

class AcknowledgementError(Exception):
    pass

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.

        Both pipes are file-like objects; since pickled data travels through
        them, they should be opened in binary mode.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe
        self.closed = 0

    def __del__(self):

        # Since signals don't work well with I/O, we close pipes and wait for
        # created processes upon finalisation.

        self.close()

    def close(self):

        "Explicitly close the channel. Closing more than once is harmless."

        if not self.closed:
            self.closed = 1

            # Make sure that the write pipe is closed even if closing the read
            # pipe fails, so that a descriptor is never left behind.

            try:
                self.read_pipe.close()
            finally:
                self.write_pipe.close()
            #self.wait(os.WNOHANG)

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit. The 'options' are passed
        to os.waitpid. Nothing is done in the created process itself (where the
        pid is 0), and errors from os.waitpid (such as the process having
        already been waited for) are deliberately ignored.
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)

        Raise AcknowledgementError, carrying 'obj', if something other than the
        expected acknowledgement is received.
        """

        self._send(obj)
        if self._receive() != "OK":
            raise AcknowledgementError(obj)

    def _receive(self):

        """
        Receive an object through the channel, returning the object. If the
        object received is an exception instance, raise it instead.

        NOTE: the data is unpickled, so the other end of the channel must be
        trusted.
        """

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        else:
            return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)

        The acknowledgement is sent even when a received exception is raised,
        since the sender is waiting for it in either case.
        """

        try:
            obj = self._receive()
            return obj
        finally:
            self._send("OK")

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate.
    """

    def __init__(self, channels=None, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'. If the
        optional 'autoclose' parameter is set to a false value, channels will
        not be closed automatically when they are removed from the exchange - by
        default they are closed when removed.
        """

        self.autoclose = autoclose
        self.readables = {}
        self.poller = select.poll()
        for channel in channels or []:
            self.add(channel)

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        fd = channel.read_pipe.fileno()
        self.readables[fd] = channel
        self.poller.register(fd, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        # An explicit list is made so that the result is a snapshot which stays
        # valid when channels are subsequently added or removed.

        return list(self.readables.values())

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' in
        seconds (or until communication is possible) and return a list of
        channels which are ready to be read from. Without a 'timeout', wait
        indefinitely; with a 'timeout' of 0, do not wait at all.

        Channels found to have ended or to be in error are removed from the
        exchange and are not returned.
        """

        # The poll method wants milliseconds, whereas the documented interface
        # uses seconds.

        if timeout is not None:
            timeout = timeout * 1000.0

        readables = []
        for fd, status in self.poller.poll(timeout):
            channel = self.readables[fd]

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)

            # Record readable channels.

            elif status & select.POLLIN:
                readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange. If the exchange was
        created with 'autoclose' set, also close the channel and wait for its
        process.
        """

        # The descriptor must be obtained before the channel is closed.

        fd = channel.read_pipe.fileno()
        del self.readables[fd]
        self.poller.unregister(fd)
        if self.autoclose:
            channel.close()
            channel.wait()

def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process. In the created process, the
    channel's pid attribute is 0; in the creating process, it is the process
    identifier of the created process.
    """

    parent, child = socket.socketpair()
    for s in (parent, child):
        s.setblocking(1)

    try:
        pid = os.fork()
    except OSError:

        # Do not leak the sockets if no process could be created.

        parent.close()
        child.close()
        raise

    # Each process keeps its own end of the socket pair and closes the other.

    if pid == 0:
        parent.close()
        endpoint = child
    else:
        child.close()
        endpoint = parent

    # Binary mode is needed for pickled data. The file objects keep the
    # underlying connection open, so the socket object itself can be closed
    # once they exist; the connection then ends when the channel is closed.

    try:
        return Channel(pid, endpoint.makefile("rb"), endpoint.makefile("wb"))
    finally:
        endpoint.close()

def start(callable, *args, **kwargs):

    """
    Create a new process which shall start running in the given 'callable'.
    Return a communications channel to the creating process, and supply such a
    channel to the created process as the 'channel' parameter in the given
    'callable'. Additional arguments to the 'callable' can be given as
    additional arguments to this function.

    The created process never returns from this function: it exits once the
    'callable' has finished. Any exception raised by the 'callable' is sent
    through the channel to the creating process.
    """

    channel = create()
    if channel.pid != 0:
        return channel

    try:
        try:
            callable(channel, *args, **kwargs)

        # Deliberately catch everything: this is the top level of the created
        # process, and the creating process should be told about any failure.

        except:
            channel.send(sys.exc_info()[1])
    finally:
        channel.close()
        sys.exit(0)

def waitall():

    "Wait for all created processes to terminate."

    # os.wait raises OSError when there are no more processes to wait for.

    try:
        while 1:
            os.wait()
    except OSError:
        pass

# vim: tabstop=4 expandtab shiftwidth=4