#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Copyright (C) 2005, 2006, 2007 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the Free
Software Foundation; either version 3 of the License, or (at your option) any
later version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
details.

You should have received a copy of the GNU Lesser General Public License along
with this program.  If not, see <http://www.gnu.org/licenses/>.

--------

The recommended styles of programming using pprocess involve the "Thread-style
Processing", "Convenient Message Exchanges", "Managed Callables" and "Map-style
Processing" sections below, although developers may wish to read the "Message
Exchanges" section for more details of the API concerned, and the "Fork-style
Processing" section may be of interest to those with experience of large scale
parallel processing systems.

Thread-style Processing
-----------------------

To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:

channel = pprocess.start(fn, arg1, arg2, named1=value1, named2=value2)

This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:

def fn(channel, arg1, arg2, named1, named2):
    # Read from and write to the channel.
    # Return value is ignored.
    ...

Fork-style Processing
---------------------

To create new processes in a similar way to that employed when using os.fork
(ie. the fork system call on various operating systems), use the following
method:

channel = pprocess.create()
if channel.pid == 0:
    # This code is run by the created process.
    # Read from and write to the channel to communicate with the
    # creating/calling process.
    # An explicit exit of the process may be desirable to prevent the process
    # from running code which is intended for the creating/calling process.
    ...
    pprocess.exit(channel)
else:
    # This code is run by the creating/calling process.
    # Read from and write to the channel to communicate with the created
    # process.
    ...

Message Exchanges
-----------------

When creating many processes, each providing results for the consumption of the
main process, the collection of those results in an efficient fashion can be
problematic: if some processes take longer than others, and if we decide to read
from those processes when they are not ready instead of other processes which
are ready, the whole activity will take much longer than necessary.

One solution to the problem of knowing when to read from channels is to create
an Exchange object, optionally initialising it with a list of channels through
which data is expected to arrive:

exchange = pprocess.Exchange()           # populate the exchange later
exchange = pprocess.Exchange(channels)   # populate the exchange with channels

We can add channels to the exchange using the add method:

exchange.add(channel)

To test whether an exchange is active - that is, whether it is actually
monitoring any channels - we can use the active method which returns all
channels being monitored by the exchange:

channels = exchange.active()

We may then check the exchange to see whether any data is ready to be received;
for example:

for channel in exchange.ready():
    # Read from and write to the channel.
    ...

If we do not wish to wait indefinitely for a list of channels, we can set a
timeout value as an argument to the ready method (as a number specifying the
timeout in milliseconds, where 0 means a non-blocking poll, as stated in the
documentation of the poll method of the select module's polling objects, to
which the value is passed unchanged).

Convenient Message Exchanges
----------------------------

A convenient form of message exchanges can be adopted by defining a subclass of
the Exchange class and defining a particular method:

class MyExchange(pprocess.Exchange):
    def store_data(self, channel):
        data = channel.receive()
        # Do something with data here.

The exact operations performed on the received data might be as simple as
storing it on an instance attribute. To make use of the exchange, we would
instantiate it as usual:

exchange = MyExchange()         # populate the exchange later
exchange = MyExchange(limit=10) # set a limit for later population

The exchange can now be used in a simpler fashion than that shown above. We can
add channels as before using the add method, or we can choose to only add
channels if the specified limit of channels is not exceeded:

exchange.add(channel)           # add a channel as normal
exchange.add_wait(channel)      # add a channel, waiting if the limit would be
                                # exceeded

Or we can request that the exchange create a channel on our behalf:

channel = exchange.create()

We can even start processes and monitor channels without ever handling the
channel ourselves:

exchange.start(fn, arg1, arg2, named1=value1, named2=value2)

We can explicitly wait for "free space" for channels by calling the wait method,
although the start and add_wait methods make this less interesting:

exchange.wait()

Finally, when finishing the computation, we can choose to merely call the finish
method and have the remaining data processed automatically:

exchange.finish()

Clearly, this approach is less flexible but more convenient than the raw message
exchange API as described above. However, it permits much simpler and clearer
code.

Exchanges as Queues
-------------------

Instead of having to subclass the pprocess.Exchange class and to define the
store_data method, it might be more desirable to let the exchange manage the
communications between created and creating processes and to let the creating
process just consume received data as it arrives, without particular regard for
the order of the received data - perhaps the creating process has its own way of
managing such issues.

For such situations, the Queue class may be instantiated and channels added to
the queue using the various methods provided:

queue = pprocess.Queue(limit=10)
channel = queue.create()
if channel:
    # Do some computation.
    pprocess.exit(channel)

The results can then be consumed by treating the queue like an iterator:

for result in queue:
    # Capture each result.

This approach does not, of course, require the direct handling of channels. One
could instead use the start method on the queue to create processes and to
initiate computations (since a queue is merely an enhanced exchange with a
specific implementation of the store_data method).

Managed Callables
-----------------

A further simplification of the above convenient use of message exchanges
involves the creation of callables (eg. functions) which are automatically
monitored by an exchange. We create such a callable by calling the manage method
on an exchange:

myfn = exchange.manage(fn)

This callable can then be invoked instead of using the exchange's start method:

myfn(arg1, arg2, named1=value1, named2=value2)

The exchange's finish method can be used as usual to process incoming data.

Making Existing Functions Parallel
----------------------------------

In making a program parallel, existing functions which only return results can
be manually modified to accept and use channels to communicate results back to
the main process. However, a simple alternative is to use the MakeParallel class
to provide a wrapper around unmodified functions which will return the results
from those functions in the channels provided. For example:

fn = pprocess.MakeParallel(originalfn)

Map-style Processing
--------------------

In situations where a callable would normally be used in conjunction with the
Python built-in map function, an alternative solution can be adopted by using
the pmap function:

pprocess.pmap(fn, sequence)

Here, the sequence would have to contain elements that each contain the required
parameters of the specified callable, fn. Note that the callable does not need
to be a parallel-aware function which has a channel argument: the pmap function
automatically wraps the given callable internally.

Signals and Waiting
-------------------

When created/child processes terminate, one would typically want to be informed
of such conditions using a signal handler. Unfortunately, Python seems to have
issues with restartable reads from file descriptors when interrupted by signals:

http://mail.python.org/pipermail/python-dev/2002-September/028572.html
http://twistedmatrix.com/bugs/issue733

Select and Poll
---------------

The exact combination of conditions indicating closed pipes remains relatively
obscure. Here is a message/thread describing them (in the context of another
topic):

http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html

It would seem, from using sockets and from studying the asyncore module, that
sockets are more predictable than pipes.

Notes about poll implementations can be found here:

http://www.greenend.org.uk/rjk/2001/06/poll.html
"""

__version__ = "0.3"

import os
import sys
import select
import socket

try:
    import cPickle as pickle
except ImportError:
    import pickle

# Communications.

class AcknowledgementError(Exception):

    "Raised by Channel.send when the reply to a message is not 'OK'."

    pass

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.

        Both pipes are file-like objects: pickled objects are read from the
        'read_pipe' and written to (and flushed through) the 'write_pipe'.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe
        self.closed = False

    def __del__(self):

        # Since signals don't work well with I/O, we close pipes and wait for
        # created processes upon finalisation.

        self.close()

    def close(self):

        "Explicitly close the channel. Closing more than once is harmless."

        if not self.closed:
            self.closed = True
            self.read_pipe.close()
            self.write_pipe.close()
            #self.wait(os.WNOHANG)

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit. The 'options' are
        passed to os.waitpid. Nothing is done in a created process (where the
        'pid' attribute is 0).
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                # Deliberately best-effort: the process may already have been
                # waited for elsewhere (for example, by waitall).
                pass

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)

        # The write pipe may be buffered: push the message out immediately.

        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)

        Raise AcknowledgementError, with 'obj' as its argument, if the reply
        received is not "OK".
        """

        self._send(obj)
        if self._receive() != "OK":
            raise AcknowledgementError(obj)

    def _receive(self):

        """
        Receive an object through the channel, returning the object. If the
        received object is an exception instance, raise it instead of
        returning it.
        """

        # NOTE: unpickling is only safe with trusted peers. Channels made by
        # NOTE: this module's create function talk to a process forked from
        # NOTE: this one.

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)

        The acknowledgement is sent even if receiving raises an exception,
        such as one forwarded by the sending process.
        """

        try:
            return self._receive()
        finally:
            self._send("OK")

# Management of processes and communications.

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate. Subclasses of this class can define the 'store_data'
    method in order to enable the 'add_wait', 'wait' and 'finish' methods.
    """

    def __init__(self, channels=None, limit=None, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'.

        If the optional 'limit' is specified, restrictions on the addition of
        new channels can be enforced and observed through the 'add_wait', 'wait'
        and 'finish' methods. To make use of these methods, create a subclass of
        this class and define a working 'store_data' method.

        If the optional 'autoclose' parameter is set to a false value, channels
        will not be closed automatically when they are removed from the exchange
        - by default they are closed when removed.
        """

        self.limit = limit

        # Pending (callable, args, kw) tuples deferred by the start method.

        self.waiting = []
        self.autoclose = autoclose

        # A mapping from read pipe file descriptors to channels.

        self.readables = {}

        # The channels removed by the most recent call to the ready method.

        self.removed = []
        self.poller = select.poll()
        for channel in channels or []:
            self.add(channel)

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        fd = channel.read_pipe.fileno()
        self.readables[fd] = channel
        self.poller.register(fd, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        # Make a real list: in Python 3, dictionaries provide views.

        return list(self.readables.values())

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (or until
        communication is possible) and return a list of channels which are ready
        to be read from.

        The 'timeout' is passed unchanged to the poll method of the polling
        object and is therefore expressed in milliseconds; None (the default)
        means that there is no time limit.

        Channels found to have ended or to be in error are removed from the
        exchange and recorded in the 'removed' attribute, which is reset upon
        each call.
        """

        fds = self.poller.poll(timeout)
        readables = []
        self.removed = []

        for fd, status in fds:
            channel = self.readables[fd]
            removed = False

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)
                self.removed.append(channel)
                removed = True

            # Record readable channels. A removed channel is only reported if
            # it was left open (that is, if autoclose is disabled).

            if status & select.POLLIN:
                if not (removed and self.autoclose):
                    readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange. If the exchange closes
        channels automatically, also close the channel and wait for its process.
        """

        fd = channel.read_pipe.fileno()
        del self.readables[fd]
        self.poller.unregister(fd)
        if self.autoclose:
            channel.close()
            channel.wait()

    # Enhanced exchange methods involving channel limits.

    def add_wait(self, channel):

        """
        Add the given 'channel' to the exchange, waiting if the limit on active
        channels would be exceeded by adding the channel.
        """

        self.wait()
        self.add(channel)

    def wait(self):

        """
        Test for the limit on channels, blocking and reading incoming data until
        the number of channels is below the limit.
        """

        # If limited, block until channels have been closed.

        while self.limit is not None and len(self.active()) >= self.limit:
            self.store()

    def start_waiting(self):

        """
        Start a waiting process, if any: the oldest request deferred by the
        start method is started and its channel added to the exchange.
        """

        if self.waiting:
            callable, args, kw = self.waiting.pop()
            self.add(start(callable, *args, **kw))

    def finish(self):

        """
        Finish the use of the exchange by waiting for all channels to complete.
        """

        while self.active():
            self.store()

    def store(self):

        """
        For each ready channel, process the incoming data and then start a
        waiting process, if any.
        """

        for channel in self.ready():
            self.store_data(channel)
            self.start_waiting()

    def store_data(self, channel):

        """
        Store incoming data from the specified 'channel'. In subclasses of this
        class, such data could be stored using instance attributes.

        This base implementation always raises NotImplementedError.
        """

        raise NotImplementedError("store_data")

    # Convenience methods.

    def start(self, callable, *args, **kw):

        """
        Using pprocess.start, create a new process for the given 'callable'
        using any additional arguments provided. Then, monitor the channel
        created between this process and the created process.

        If the limit on active channels has been reached, the request is
        instead queued and started later as data arrives on other channels.
        """

        if self.limit is not None and len(self.active()) >= self.limit:
            self.waiting.insert(0, (callable, args, kw))
            return

        self.add_wait(start(callable, *args, **kw))

    def create(self):

        """
        Using pprocess.create, create a new process and return the created
        communications channel to the created process. In the creating process,
        return None - the channel receiving data from the created process will
        be automatically managed by this exchange.
        """

        channel = create()
        if channel.pid == 0:
            return channel
        else:
            self.add_wait(channel)
            return None

    def manage(self, callable):

        """
        Wrap the given 'callable' in an object which can then be called in the
        same way as 'callable', but with new processes and communications
        managed automatically.
        """

        return ManagedCallable(callable, self)

class ManagedCallable:

    "A callable managed by an exchange."

    def __init__(self, callable, exchange):

        """
        Wrap the given 'callable', using the given 'exchange' to monitor the
        channels created for communications between this and the created
        processes. Note that the 'callable' must be parallel-aware (that is,
        have a 'channel' parameter). Use the MakeParallel class to wrap other
        kinds of callable objects.
        """

        self.callable = callable
        self.exchange = exchange

    def __call__(self, *args, **kw):

        """
        Invoke the callable with the supplied arguments, using the start
        method of the exchange. No value is returned.
        """

        self.exchange.start(self.callable, *args, **kw)

# Abstractions and utilities.

class Map(Exchange):

    "An exchange which can be used like the built-in 'map' function."

    def __init__(self, *args, **kw):

        "Initialise the map, accepting the same arguments as Exchange."

        # Prepare the ordering state first: the superclass initialiser calls
        # the add method for any channels supplied.

        self.init()
        Exchange.__init__(self, *args, **kw)

    def init(self):

        "Remember the channel addition order to order output."

        self.channel_number = 0
        self.channels = {}
        self.results = []

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        Exchange.add(self, channel)
        number = self.channel_number
        self.channels[channel] = number
        self.channel_number += 1

        # Channels added directly (rather than through start or create) have
        # no placeholder yet: make sure that a result slot exists.

        while len(self.results) <= number:
            self.results.append(None)

    def start(self, callable, *args, **kw):

        """
        Using pprocess.start, create a new process for the given 'callable'
        using any additional arguments provided. Then, monitor the channel
        created between this process and the created process.
        """

        self.results.append(None) # placeholder
        Exchange.start(self, callable, *args, **kw)

    def create(self):

        """
        Using pprocess.create, create a new process and return the created
        communications channel to the created process. In the creating process,
        return None - the channel receiving data from the created process will
        be automatically managed by this exchange.
        """

        self.results.append(None) # placeholder
        return Exchange.create(self)

    def __call__(self, callable, sequence):

        """
        Wrap and invoke 'callable' for each element in the 'sequence'. Any
        previously recorded results are discarded. Return this object, through
        which the results can be accessed.
        """

        if not isinstance(callable, MakeParallel):
            wrapped = MakeParallel(callable)
        else:
            wrapped = callable

        self.init()

        # Start processes for each element in the sequence.

        for i in sequence:
            self.start(wrapped, i)

        # Access to the results occurs through this object.

        return self

    def __getitem__(self, i):

        "Wait for all channels to complete and return result 'i'."

        self.finish()
        return self.results[i]

    def __iter__(self):

        "Wait for all channels to complete and return a results iterator."

        self.finish()
        return iter(self.results)

    def store_data(self, channel):

        "Accumulate the incoming data, associating results with channels."

        data = channel.receive()
        self.results[self.channels[channel]] = data
        del self.channels[channel]

class Queue(Exchange):

    """
    An exchange acting as a queue, making data from created processes available
    in the order in which it is received.
    """

    def __init__(self, *args, **kw):

        "Initialise the queue, accepting the same arguments as Exchange."

        Exchange.__init__(self, *args, **kw)
        self.queue = []

    def store_data(self, channel):

        "Accumulate the incoming data at the front of the queue list."

        data = channel.receive()
        self.queue.insert(0, data)

    def __iter__(self):

        "Return this object as its own iterator."

        return self

    def next(self):

        """
        Return the next element in the queue, waiting for data while channels
        remain active. Raise StopIteration when no data is queued and no
        channels remain.
        """

        if self.queue:
            return self.queue.pop()
        while self.active():
            self.store()
            if self.queue:
                return self.queue.pop()
        raise StopIteration

    # Python 3 uses this name in the iterator protocol.

    __next__ = next

class MakeParallel:

    "A wrapper around functions making them able to communicate results."

    def __init__(self, callable):

        """
        Initialise the wrapper with the given 'callable'. This object will then
        be able to accept a 'channel' parameter when invoked, and to forward the
        result of the given 'callable' via the channel provided back to the
        invoking process.
        """

        self.callable = callable

    def __call__(self, channel, *args, **kw):

        "Invoke the callable and return its result via the given 'channel'."

        channel.send(self.callable(*args, **kw))

# Utility functions.

def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process. In the created process, the 'pid'
    attribute of the channel is 0; in the creating process, it is the process
    identifier of the created process.
    """

    parent, child = socket.socketpair()
    for s in [parent, child]:
        s.setblocking(1)

    pid = os.fork()
    if pid == 0:
        unused, used = parent, child
    else:
        unused, used = child, parent

    unused.close()

    # Use buffered binary files: pickled data is binary, and raw socket files
    # may read or write less than requested. Channel._send flushes each message.

    channel = Channel(pid, used.makefile("rb"), used.makefile("wb"))

    # The files keep the connection open; closing the socket object here lets
    # the connection be released as soon as the channel is closed.

    used.close()
    return channel

def exit(channel):

    """
    Terminate a created process, closing the given 'channel'. The process is
    ended using os._exit with a status of 0.
    """

    channel.close()
    os._exit(0)

def start(callable, *args, **kw):

    """
    Create a new process which shall start running in the given 'callable'.
    Additional arguments to the 'callable' can be given as additional arguments
    to this function.

    Return a communications channel to the creating process. For the created
    process, supply a channel as the 'channel' parameter in the given 'callable'
    so that it may send data back to the creating process.

    An exception raised by the 'callable' is sent through the channel to the
    creating process. The created process always exits and never returns from
    this function.
    """

    channel = create()
    if channel.pid == 0:
        try:
            try:
                callable(channel, *args, **kw)
            except:
                # Deliberately catch everything: whatever went wrong is
                # forwarded to the creating process instead of being lost.
                channel.send(sys.exc_info()[1])
        finally:
            exit(channel)
    else:
        return channel

def waitall():

    "Wait for all created processes to terminate."

    try:
        while 1:
            os.wait()
    except OSError:
        # Raised by os.wait when there are no more processes to wait for.
        pass

def pmap(callable, sequence, limit=None):

    """
    A parallel version of the built-in map function with an optional process
    'limit'. The given 'callable' should not be parallel-aware (that is, have a
    'channel' parameter) since it will be wrapped for parallel communications
    before being invoked.

    Return the processed 'sequence' where each element in the sequence is
    processed by a different process. The returned object is a Map instance:
    indexing it or iterating over it waits for all results to arrive.
    """

    mymap = Map(limit=limit)
    return mymap(callable, sequence)

# vim: tabstop=4 expandtab shiftwidth=4