#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Copyright (C) 2005, 2006, 2007 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the Free
Software Foundation; either version 3 of the License, or (at your option) any
later version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
details.

You should have received a copy of the GNU Lesser General Public License along
with this program. If not, see <http://www.gnu.org/licenses/>.

--------

The recommended styles of programming using pprocess are described in the
"Thread-style Processing", "Convenient Message Exchanges", "Managed Callables"
and "Map-style Processing" sections below, although developers may wish to
read the "Message Exchanges" section for more details of the API concerned,
and the "Fork-style Processing" section may be of interest to those with
experience of large scale parallel processing systems.

Thread-style Processing
-----------------------

To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:

  channel = pprocess.start(fn, arg1, arg2, named1=value1, named2=value2)

This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:

  def fn(channel, arg1, arg2, named1, named2):
      # Read from and write to the channel.
      # Return value is ignored.
      ...

If the callable raises an exception, the exception is sent through the channel
and, if it is derived from Exception, it is raised again in the creating
process when that process next receives from the channel.

Fork-style Processing
---------------------

To create new processes in a similar way to that employed when using os.fork
(ie. the fork system call on various operating systems), use the following
method:

  channel = pprocess.create()
  if channel.pid == 0:
      # This code is run by the created process.
      # Read from and write to the channel to communicate with the
      # creating/calling process.
      # An explicit exit of the process may be desirable to prevent the process
      # from running code which is intended for the creating/calling process.
      ...
  else:
      # This code is run by the creating/calling process.
      # Read from and write to the channel to communicate with the created
      # process.
      ...

Message Exchanges
-----------------

When creating many processes, each providing results for the consumption of the
main process, the collection of those results in an efficient fashion can be
problematic: if some processes take longer than others, and if we decide to read
from those processes when they are not ready instead of other processes which
are ready, the whole activity will take much longer than necessary.

One solution to the problem of knowing when to read from channels is to create
an Exchange object, optionally initialising it with a list of channels through
which data is expected to arrive:

  exchange = pprocess.Exchange()           # populate the exchange later
  exchange = pprocess.Exchange(channels)   # populate the exchange with channels

We can add channels to the exchange using the add method:

  exchange.add(channel)

To test whether an exchange is active - that is, whether it is actually
monitoring any channels - we can use the active method which returns all
channels being monitored by the exchange:

  channels = exchange.active()

We may then check the exchange to see whether any data is ready to be received;
for example:

  for channel in exchange.ready():
      # Read from and write to the channel.
      ...

If we do not wish to wait indefinitely for a list of channels, we can set a
timeout value as an argument to the ready method (as a floating point number
specifying the timeout in seconds, where 0 means a non-blocking poll, as with
the select module's select function).

Convenient Message Exchanges
----------------------------

A convenient form of message exchanges can be adopted by defining a subclass of
the Exchange class and defining a particular method:

  class MyExchange(pprocess.Exchange):
      def store_data(self, channel):
          data = channel.receive()
          # Do something with data here.

The exact operations performed on the received data might be as simple as
storing it on an instance attribute. To make use of the exchange, we would
instantiate it as usual:

  exchange = MyExchange()         # populate the exchange later
  exchange = MyExchange(limit=10) # set a limit for later population

The exchange can now be used in a simpler fashion than that shown above. We can
add channels as before using the add method, or we can choose to wait until a
channel can be added without exceeding the specified limit of channels:

  exchange.add(channel)       # add a channel as normal
  exchange.add_wait(channel)  # add a channel, waiting if the limit would be
                              # exceeded

We can even start processes and monitor channels without ever handling the
channel ourselves:

  exchange.start(fn, arg1, arg2, named1=value1, named2=value2)

We can explicitly wait for "free space" for channels by calling the wait method,
although the start and add_wait methods make this less interesting:

  exchange.wait()

Finally, when finishing the computation, we can choose to merely call the finish
method and have the remaining data processed automatically:

  exchange.finish()

Clearly, this approach is less flexible but more convenient than the raw message
exchange API as described above. However, it permits much simpler and clearer
code.

Managed Callables
-----------------

A further simplification of the above convenient use of message exchanges
involves the creation of callables (eg. functions) which are automatically
monitored by an exchange. We create such a callable by calling the manage
method (also available under the name wrap) on an exchange:

  myfn = exchange.manage(fn)

This callable can then be invoked instead of using the exchange's start method:

  myfn(arg1, arg2, named1=value1, named2=value2)

The exchange's finish method can be used as usual to process incoming data.

Map-style Processing
--------------------

In situations where a callable would normally be used in conjunction with the
Python built-in map function, an alternative solution can be adopted by using
the pmap function:

  pprocess.pmap(fn, sequence)

Here, the sequence would have to contain elements that each contain the required
parameters of the specified callable, fn. Since fn is started in the same way as
with pprocess.start, it receives a channel as its first argument and must send
its result through that channel; a function wrapped using the MakeParallel class
(see below) does this automatically.

Making Existing Functions Parallel
----------------------------------

In making a program parallel, existing functions which only return results can
be manually modified to accept and use channels to communicate results back to
the main process. However, a simple alternative is to use the MakeParallel class
to provide a wrapper around unmodified functions which will return the results
from those functions in the channels provided. For example:

  fn = pprocess.MakeParallel(originalfn)

Signals and Waiting
-------------------

When created/child processes terminate, one would typically want to be informed
of such conditions using a signal handler. Unfortunately, Python seems to have
issues with restartable reads from file descriptors when interrupted by signals:

  http://mail.python.org/pipermail/python-dev/2002-September/028572.html
  http://twistedmatrix.com/bugs/issue733

For this reason, pprocess does not use a signal handler: instead, an exchange
waits for a created process when it removes that process's channel (with
automatic closing enabled), and the waitall function waits for all created
processes.

Select and Poll
---------------

The exact combination of conditions indicating closed pipes remains relatively
obscure. Here is a message/thread describing them (in the context of another
topic):

  http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html

It would seem, from using sockets and from studying the asyncore module, that
sockets are more predictable than pipes.

Notes about poll implementations can be found here:

  http://www.greenend.org.uk/rjk/2001/06/poll.html
"""

__version__ = "0.2.6"

import os
import sys
import select
import socket

try:
    import cPickle as pickle
except ImportError:
    import pickle

# Communications.

class AcknowledgementError(Exception):

    "Raised by Channel.send when a sent object is not acknowledged with 'OK'."

class Channel:

    """
    A communications channel: a pair of file-like pipes through which pickled
    objects are exchanged with another process.
    """

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.

        When created by 'create', 'pid' is the created process's identifier in
        the creating process, and 0 in the created process itself.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe
        self.closed = 0

    def __del__(self):

        # Since signals don't work well with I/O (see the module
        # documentation), pipes are closed upon finalisation. Note that this
        # does not wait for the created process; see the 'wait' method.

        self.close()

    def close(self):

        """
        Explicitly close the channel. Repeated calls have no effect, and the
        write pipe is closed even if closing the read pipe fails. The created
        process is not waited for here; use the 'wait' method for that.
        """

        if not self.closed:
            self.closed = 1
            try:
                self.read_pipe.close()
            finally:
                self.write_pipe.close()

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit, passing 'options' (for
        example os.WNOHANG) to os.waitpid. Nothing is done when 'pid' is 0 (in
        the created process itself), and any OSError raised by os.waitpid (for
        example, when the process has already been waited for) is ignored.
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def _send(self, obj):

        "Send the given object 'obj' through the channel without waiting."

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)

        Raises AcknowledgementError if the reply is not the expected "OK".
        """

        self._send(obj)
        if self._receive() != "OK":
            raise AcknowledgementError(obj)

    def _receive(self):

        """
        Receive an object through the channel, returning the object. Received
        objects derived from Exception are raised instead of being returned.
        """

        # Data is unpickled, so the peer process must be trusted; here it is
        # a process created by this module.

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        else:
            return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)

        The acknowledgement is sent even when the received object is an
        exception that is then raised.
        """

        try:
            obj = self._receive()
            return obj
        finally:
            self._send("OK")

# Management of processes and communications.

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate. Subclasses of this class can define the 'store_data'
    method in order to enable the 'add_wait', 'wait' and 'finish' methods.
    """

    def __init__(self, channels=None, limit=None, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'.

        If the optional 'limit' is specified, restrictions on the addition of
        new channels can be enforced and observed through the 'add_wait', 'wait'
        and 'finish' methods. To make use of these methods, create a subclass of
        this class and define a working 'store_data' method.

        If the optional 'autoclose' parameter is set to a false value, channels
        will not be closed automatically when they are removed from the exchange
        - by default they are closed when removed.
        """

        self.limit = limit
        self.autoclose = autoclose
        self.readables = {}     # file descriptor -> monitored channel
        self.removed = []       # channels removed by the latest 'ready' call
        self.poller = select.poll()
        for channel in channels or []:
            self.add(channel)

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        fd = channel.read_pipe.fileno()
        self.readables[fd] = channel
        self.poller.register(fd, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        return self.readables.values()

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (in
        seconds, where 0 means a non-blocking poll and None means waiting until
        communication is possible) and return a list of channels which are
        ready to be read from.

        Channels reporting a hang-up or error condition are removed from the
        exchange and recorded in the 'removed' attribute. When 'autoclose' is
        enabled, such channels are not returned even if data was reported.
        """

        # Poll objects take their timeout in milliseconds, not seconds.

        if timeout is not None:
            timeout = int(timeout * 1000)

        fds = self.poller.poll(timeout)
        readables = []
        self.removed = []

        for fd, status in fds:
            channel = self.readables[fd]
            removed = 0

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)
                self.removed.append(channel)
                removed = 1

            # Record readable channels, unless they have just been closed.

            if status & select.POLLIN:
                if not (removed and self.autoclose):
                    readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange. If 'autoclose' is
        enabled, the channel is also closed and its created process waited for.
        """

        fd = self._fileno(channel)
        del self.readables[fd]
        self.poller.unregister(fd)
        if self.autoclose:
            channel.close()
            channel.wait()

    def _fileno(self, channel):

        """
        Return the file descriptor under which 'channel' is registered. A closed
        channel's pipe can no longer report its descriptor, so such a channel is
        located by identity among the registered channels instead. Raises
        KeyError if a closed channel is not registered.
        """

        if not channel.closed:
            return channel.read_pipe.fileno()
        for fd, candidate in self.readables.items():
            if candidate is channel:
                return fd
        raise KeyError(channel)

    # Enhanced exchange methods involving channel limits.

    def add_wait(self, channel):

        """
        Add the given 'channel' to the exchange, waiting if the limit on active
        channels would be exceeded by adding the channel.
        """

        self.wait()
        self.add(channel)

    def wait(self):

        """
        Test for the limit on channels, blocking and reading incoming data until
        the number of channels is below the limit.
        """

        # If limited, block until channels have been closed.

        while self.limit is not None and len(self.active()) >= self.limit:
            self.store()

    def finish(self):

        """
        Finish the use of the exchange by waiting for all channels to complete.
        """

        while self.active():
            self.store()

    def store(self):

        "For each ready channel, process the incoming data."

        for channel in self.ready():
            self.store_data(channel)

    def store_data(self, channel):

        """
        Store incoming data from the specified 'channel'. In subclasses of this
        class, such data could be stored using instance attributes.
        """

        raise NotImplementedError("store_data")

    # Convenience methods.

    def start(self, callable, *args, **kw):

        """
        Using pprocess.start, create a new process for the given 'callable'
        using any additional arguments provided. Then, monitor the channel
        created between this process and the created process.
        """

        self.add_wait(start(callable, *args, **kw))

    def manage(self, callable):

        """
        Wrap the given 'callable' in an object which can then be called in the
        same way as 'callable', but with new processes and communications
        managed automatically.
        """

        return ManagedCallable(callable, self)

    # Alias so that code written against the name 'wrap' also works.

    wrap = manage

class ManagedCallable:

    "A callable managed by an exchange."

    def __init__(self, callable, exchange):

        """
        Wrap the given 'callable', using the given 'exchange' to monitor the
        channels created for communications between this and the created
        processes.
        """

        self.callable = callable
        self.exchange = exchange

    def __call__(self, *args, **kw):

        "Invoke the callable with the supplied arguments."

        self.exchange.start(self.callable, *args, **kw)

# Abstractions and utilities.

class Map(Exchange):

    "An exchange which can be used like the built-in 'map' function."

    def __init__(self, channels=None, limit=None, autoclose=1):

        """
        Initialise the map exchange, accepting the same arguments as Exchange.
        """

        # Exchange.__init__ calls 'add' for any initial channels, and 'add'
        # records channels here, so these attributes must exist first.

        self.channels = []
        self.results = {}
        Exchange.__init__(self, channels, limit, autoclose)

    def add(self, channel):

        "Add the given 'channel' to the exchange, remembering the order."

        Exchange.add(self, channel)
        self.channels.append(channel)

    def __call__(self, callable, sequence):

        """
        Invoke 'callable' for each element in the 'sequence', passing the
        element's items as arguments, and return the results in sequence order.
        """

        # Remember the channel addition order to order output.

        self.channels = []
        self.results = {}

        for i in sequence:
            self.start(callable, *i)
        self.finish()

        return [self.results[channel] for channel in self.channels]

    def store_data(self, channel):

        "Accumulate the incoming data, associating results with channels."

        data = channel.receive()
        self.results[channel] = data

class MakeParallel:

    "A wrapper around functions making them able to communicate results."

    def __init__(self, callable):

        "Initialise the wrapper with the given 'callable'."

        self.callable = callable

    def __call__(self, channel, *args, **kw):

        "Invoke the callable and return its result via the given 'channel'."

        channel.send(self.callable(*args, **kw))

# Utility functions.

def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process. The channel's 'pid' is 0 in the
    created process and the created process's identifier in the creator.
    """

    parent, child = socket.socketpair()
    for s in [parent, child]:
        s.setblocking(1)

    pid = os.fork()
    if pid == 0:
        parent.close()
        return Channel(pid, child.makefile("r", 0), child.makefile("w", 0))
    else:
        child.close()
        return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))

def start(callable, *args, **kw):

    """
    Create a new process which shall start running in the given 'callable'.
    Return a communications channel to the creating process, and supply such a
    channel to the created process as the 'channel' parameter in the given
    'callable'. Additional arguments to the 'callable' can be given as
    additional arguments to this function.

    The created process never returns from this function: it exits once the
    callable has finished, sending any exception it raised through the channel.
    """

    channel = create()
    if channel.pid == 0:
        try:
            try:
                callable(channel, *args, **kw)
            except:
                # Report any failure to the creating process. Only exceptions
                # derived from Exception are raised again by the receiver;
                # others arrive as ordinary values.
                exc_type, exc_value, exc_traceback = sys.exc_info()
                channel.send(exc_value)
        finally:
            channel.close()
            os._exit(0)
    else:
        return channel

def waitall():

    """
    Wait for all created processes to terminate, stopping when os.wait raises
    OSError (such as when no child processes remain).
    """

    try:
        while 1:
            os.wait()
    except OSError:
        pass

def pmap(callable, sequence):

    """
    A parallel version of the built-in map function: 'callable' is started in a
    new process for each element of 'sequence' (receiving a channel followed by
    the element's items) and the received results are returned in order.
    """

    mymap = Map()
    return mymap(callable, sequence)

# vim: tabstop=4 expandtab shiftwidth=4