#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Copyright (C) 2005, 2006, 2007 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the Free
Software Foundation; either version 3 of the License, or (at your option) any
later version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
details.

You should have received a copy of the GNU Lesser General Public License along
with this program. If not, see <http://www.gnu.org/licenses/>.

--------

The recommended styles of programming using pprocess involve the "Thread-style
Processing" and "Convenient Message Exchanges" sections below, although
developers may wish to read the "Message Exchanges" section for more details of
the API concerned, and the "Fork-style Processing" section may be of interest to
those with experience of large-scale parallel processing systems.

Thread-style Processing
-----------------------

To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:

  channel = pprocess.start(fn, arg1, arg2, named1=value1, named2=value2)

This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:

  def fn(channel, arg1, arg2, named1, named2):
      # Read from and write to the channel.
      # Return value is ignored.
      ...

If the callable raises an exception, the exception is sent through the channel.
When the creating process receives an Exception instance in this way, that
exception is raised in the creating process.

Fork-style Processing
---------------------

To create new processes in a similar way to that employed when using os.fork
(i.e. the fork system call on various operating systems), use the following
method:

  channel = pprocess.create()
  if channel.pid == 0:
      # This code is run by the created process.
      # Read from and write to the channel to communicate with the
      # creating/calling process.
      # An explicit exit of the process (for example, using os._exit as
      # pprocess.start does) is usually needed to prevent the process from
      # running code which is intended for the creating/calling process.
      ...
  else:
      # This code is run by the creating/calling process.
      # Read from and write to the channel to communicate with the created
      # process.
      ...

Message Exchanges
-----------------

When creating many processes, each providing results for the consumption of the
main process, the collection of those results in an efficient fashion can be
problematic: if some processes take longer than others, and if we decide to read
from those processes when they are not ready instead of other processes which
are ready, the whole activity will take much longer than necessary.

One solution to the problem of knowing when to read from channels is to create
an Exchange object, optionally initialising it with a list of channels through
which data is expected to arrive:

  exchange = pprocess.Exchange()            # populate the exchange later
  exchange = pprocess.Exchange(channels)    # populate the exchange with channels

We can add channels to the exchange using the add method:

  exchange.add(channel)

To test whether an exchange is active - that is, whether it is actually
monitoring any channels - we can use the active method which returns all
channels being monitored by the exchange:

  channels = exchange.active()

We may then check the exchange to see whether any data is ready to be received;
for example:

  for channel in exchange.ready():
      # Read from and write to the channel.
      ...

If we do not wish to wait indefinitely for a list of channels, we can set a
timeout value as an argument to the ready method (as a floating point number
specifying the timeout in seconds, where 0 means a non-blocking check that
returns immediately).

Convenient Message Exchanges
----------------------------

A convenient form of message exchanges can be adopted by defining a subclass of
the Exchange class and defining a particular method:

  class MyExchange(pprocess.Exchange):
      def store_data(self, channel):
          data = channel.receive()
          # Do something with data here.

The exact operations performed on the received data might be as simple as
storing it on an instance attribute. To make use of the exchange, we would
instantiate it as usual:

  exchange = MyExchange()           # populate the exchange later
  exchange = MyExchange(limit=10)   # set a limit for later population

The exchange can now be used in a simpler fashion than that shown above. We can
add channels as before using the add method, or we can choose to wait until the
number of active channels is below the specified limit before adding a channel:

  exchange.add(channel)         # add a channel as normal
  exchange.add_wait(channel)    # add a channel, waiting if the limit would be
                                # exceeded

We can even start processes and monitor channels without ever handling the
channel ourselves:

  exchange.start(fn, arg1, arg2, named1=value1, named2=value2)

We can explicitly wait for "free space" for channels by calling the wait method,
although the start and add_wait methods make this less interesting:

  exchange.wait()

Finally, when finishing the computation, we can choose to merely call the finish
method and have the remaining data processed automatically:

  exchange.finish()

This approach is less flexible than the raw message exchange API described
above, but it is more convenient and permits much simpler and clearer code.

Signals and Waiting
-------------------

When created/child processes terminate, one would typically want to be informed
of such conditions using a signal handler. Unfortunately, Python seems to have
issues with restartable reads from file descriptors when interrupted by signals:

  http://mail.python.org/pipermail/python-dev/2002-September/028572.html
  http://twistedmatrix.com/bugs/issue733

For this reason, pprocess does not install a signal handler. Instead, created
processes are waited for explicitly: when an exchange removes a channel with
automatic closing enabled, by calling a channel's wait method, or by calling
the waitall function.

Select and Poll
---------------

The exact combination of conditions indicating closed pipes remains relatively
obscure. Here is a message/thread describing them (in the context of another
topic):

  http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html

It would seem, from using sockets and from studying the asyncore module, that
sockets are more predictable than pipes.

Notes about poll implementations can be found here:

  http://www.greenend.org.uk/rjk/2001/06/poll.html
"""

__version__ = "0.2.5"

import errno
import os
import sys
import select
import socket

try:
    import cPickle as pickle
except ImportError:
    import pickle

class AcknowledgementError(Exception):

    "Raised by Channel.send when the reply to a sent object is not \"OK\"."

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.

        A 'pid' of 0 marks the channel end held by the created process (see
        'create'); the 'wait' method does nothing for such channels.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe
        self.closed = 0

    def __del__(self):

        # Since signals don't work well with I/O, pipes are closed upon
        # finalisation. Created processes are not waited for here; see
        # Exchange.remove, Channel.wait and waitall.

        self.close()

    def close(self):

        """
        Explicitly close the channel. Closing an already-closed channel does
        nothing. The write pipe is closed even if closing the read pipe fails.
        """

        if self.closed:
            return
        self.closed = 1
        try:
            self.read_pipe.close()
        finally:
            self.write_pipe.close()

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit. The 'options' are passed
        to os.waitpid (for example, os.WNOHANG).
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                # Best-effort: the process may already have been reaped.
                pass

    def _send(self, obj):

        """
        Send the given object 'obj' through the channel without waiting for an
        acknowledgement.

        The object is pickled completely before anything is written, so a
        pickling failure leaves the stream intact instead of leaving a partial
        message for the receiver.
        """

        data = pickle.dumps(obj)
        self.write_pipe.write(data)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)

        Raises AcknowledgementError (carrying 'obj') if the reply is not "OK".
        """

        self._send(obj)
        if self._receive() != "OK":
            raise AcknowledgementError(obj)

    def _receive(self):

        """
        Receive an object through the channel, returning the object. If the
        object is an Exception instance (for example, one forwarded by 'start'
        from a failing callable), it is raised instead of returned.
        """

        # The peer is expected to be a process created by this module; pickle
        # must not be used with untrusted peers.

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)
        """

        # The acknowledgement is sent even when a forwarded exception is raised
        # so that the sender's 'send' call can complete.

        try:
            return self._receive()
        finally:
            self._send("OK")

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate. Subclasses of this class can define the 'store_data'
    method in order to enable the 'add_wait', 'wait' and 'finish' methods.
    """

    def __init__(self, channels=None, limit=None, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'.

        If the optional 'limit' is specified, restrictions on the addition of
        new channels can be enforced and observed through the 'add_wait', 'wait'
        and 'finish' methods. To make use of these methods, create a subclass of
        this class and define a working 'store_data' method.

        If the optional 'autoclose' parameter is set to a false value, channels
        will not be closed automatically when they are removed from the exchange
        - by default they are closed (and their created processes waited for)
        when removed.
        """

        self.limit = limit
        self.autoclose = autoclose
        self.readables = {}     # read file descriptor -> channel
        self.removed = []       # channels removed by the latest 'ready' call
        self.poller = select.poll()
        for channel in channels or []:
            self.add(channel)

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        fd = channel.read_pipe.fileno()
        self.readables[fd] = channel
        self.poller.register(
            fd, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        # A copy is returned so that callers may remove channels while
        # iterating over the result.

        return list(self.readables.values())

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (or until
        communication is possible) and return a list of channels which are ready
        to be read from.

        The 'timeout' is given in seconds (floating point values are permitted);
        0 performs a non-blocking check and None waits indefinitely. Channels
        found to have ended or failed are removed from the exchange and recorded
        in the 'removed' attribute; when 'autoclose' is set, such channels are
        not returned as readable because they have already been closed.
        """

        # select.poll measures timeouts in milliseconds, not seconds.

        if timeout is None:
            fds = self.poller.poll()
        else:
            fds = self.poller.poll(timeout * 1000)

        readables = []
        self.removed = []

        for fd, status in fds:
            channel = self.readables[fd]
            removed = 0

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)
                self.removed.append(channel)
                removed = 1

            # Record readable channels, unless they were just closed.

            if status & select.POLLIN:
                if not (removed and self.autoclose):
                    readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange. If 'autoclose' is set, the
        channel is also closed and its created process (if any) waited for.
        """

        fd = channel.read_pipe.fileno()
        del self.readables[fd]
        self.poller.unregister(fd)
        if self.autoclose:
            channel.close()
            channel.wait()

    # Enhanced exchange methods involving channel limits.

    def add_wait(self, channel):

        """
        Add the given 'channel' to the exchange, waiting if the limit on active
        channels would be exceeded by adding the channel.
        """

        self.wait()
        self.add(channel)

    def wait(self):

        """
        Test for the limit on channels, blocking and reading incoming data until
        the number of channels is below the limit.
        """

        # If limited, block until channels have been closed.

        while self.limit is not None and len(self.active()) >= self.limit:
            self.store()

    def finish(self):

        """
        Finish the use of the exchange by waiting for all channels to complete.
        """

        while self.active():
            self.store()

    def store(self):

        "For each ready channel, process the incoming data."

        for channel in self.ready():
            self.store_data(channel)

    def store_data(self, channel):

        """
        Store incoming data from the specified 'channel'. In subclasses of this
        class, such data could be stored using instance attributes.
        """

        raise NotImplementedError("store_data")

    # Convenience methods.

    def start(self, callable, *args, **kwargs):

        """
        Using pprocess.start, create a new process for the given 'callable'
        using any additional arguments provided. Then, monitor the channel
        created between this process and the created process.
        """

        # 'start' here is the module-level function, not this method.

        self.add_wait(start(callable, *args, **kwargs))

def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process. In the created process the
    channel's 'pid' is 0; in the creating process it is the created process's
    identifier.
    """

    # NOTE(review): text-mode, unbuffered makefile("r", 0) is accepted by
    # Python 2 sockets but rejected by Python 3 ("unbuffered streams must be
    # binary") - TODO choose a binary mode that keeps poll() readiness
    # accurate if Python 3 support is wanted.

    parent, child = socket.socketpair()
    for s in [parent, child]:
        s.setblocking(1)

    pid = os.fork()
    if pid == 0:
        parent.close()
        return Channel(pid, child.makefile("r", 0), child.makefile("w", 0))
    else:
        child.close()
        return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))

def start(callable, *args, **kwargs):

    """
    Create a new process which shall start running in the given 'callable'.
    Return a communications channel to the creating process, and supply such a
    channel to the created process as the 'channel' parameter in the given
    'callable'. Additional arguments to the 'callable' can be given as
    additional arguments to this function.

    In the created process, any exception raised by the 'callable' is sent
    through the channel, after which the process exits without returning.
    """

    channel = create()
    if channel.pid != 0:
        return channel

    # Created process: it must never return into the caller's code, so
    # os._exit is reached even if reporting or closing fails.

    try:
        try:
            callable(channel, *args, **kwargs)
        except:
            # Deliberately broad: forward any failure to the creating process.
            channel.send(sys.exc_info()[1])
    finally:
        try:
            channel.close()
        finally:
            os._exit(0)

def waitall():

    """
    Wait for all created processes to terminate, returning once os.wait reports
    an error other than an interrupted call (typically, no remaining children).
    """

    while 1:
        try:
            os.wait()
        except OSError:
            # Python 2 does not restart a signal-interrupted wait; retry it.
            if sys.exc_info()[1].errno == errno.EINTR:
                continue
            return

# vim: tabstop=4 expandtab shiftwidth=4