#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Copyright (C) 2005, 2006, 2007 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the Free
Software Foundation; either version 3 of the License, or (at your option) any
later version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
details.

You should have received a copy of the GNU Lesser General Public License along
with this program.  If not, see <http://www.gnu.org/licenses/>.

--------

Thread-style Processing
-----------------------

To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:

channel = pprocess.start(fn, arg1, arg2, named1=value1, named2=value2)

This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:

def fn(channel, arg1, arg2, named1, named2):
    # Read from and write to the channel.
    # Return value is ignored.
    ...

Fork-style Processing
---------------------

To create new processes in a similar way to that employed when using os.fork
(ie. the fork system call on various operating systems), use the following
method:

channel = pprocess.create()
if channel.pid == 0:
    # This code is run by the created process.
    # Read from and write to the channel to communicate with the
    # creating/calling process.
    # An explicit exit of the process may be desirable to prevent the process
    # from running code which is intended for the creating/calling process.
    ...
    pprocess.exit(channel)
else:
    # This code is run by the creating/calling process.
    # Read from and write to the channel to communicate with the created
    # process.
    ...

Message Exchanges
-----------------

When creating many processes, each providing results for the consumption of the
main process, the collection of those results in an efficient fashion can be
problematic: if some processes take longer than others, and if we decide to read
from those processes when they are not ready instead of other processes which
are ready, the whole activity will take much longer than necessary.

One solution to the problem of knowing when to read from channels is to create
an Exchange object, optionally initialising it with a list of channels through
which data is expected to arrive:

exchange = pprocess.Exchange()           # populate the exchange later
exchange = pprocess.Exchange(channels)   # populate the exchange with channels

We can add channels to the exchange using the add method:

exchange.add(channel)

To test whether an exchange is active - that is, whether it is actually
monitoring any channels - we can use the active method which returns all
channels being monitored by the exchange:

channels = exchange.active()

We may then check the exchange to see whether any data is ready to be received;
for example:

for channel in exchange.ready():
    # Read from and write to the channel.
    ...

If we do not wish to wait indefinitely for a list of channels, we can set a
timeout value as an argument to the ready method (as a number specifying the
timeout in milliseconds, where 0 means a non-blocking poll, as stated in the
select module's documentation for the poll method of polling objects).

Convenient Message Exchanges
----------------------------

A convenient form of message exchanges can be adopted by defining a subclass of
the Exchange class and defining a particular method:

class MyExchange(pprocess.Exchange):
    def store_data(self, channel):
        data = channel.receive()
        # Do something with data here.

The exact operations performed on the received data might be as simple as
storing it on an instance attribute. To make use of the exchange, we would
instantiate it as usual:

exchange = MyExchange()         # populate the exchange later
exchange = MyExchange(limit=10) # set a limit for later population

The exchange can now be used in a simpler fashion than that shown above. We can
add channels as before using the add method, or we can choose to only add
channels if the specified limit of channels is not exceeded:

exchange.add(channel)       # add a channel as normal
exchange.add_wait(channel)  # add a channel, waiting if the limit would be
                            # exceeded

Or we can request that the exchange create a channel on our behalf:

channel = exchange.create()

We can even start processes and monitor channels without ever handling the
channel ourselves:

exchange.start(fn, arg1, arg2, named1=value1, named2=value2)

We can explicitly wait for "free space" for channels by calling the wait method,
although the start and add_wait methods make this less interesting:

exchange.wait()

Finally, when finishing the computation, we can choose to merely call the finish
method and have the remaining data processed automatically:

exchange.finish()

Clearly, this approach is less flexible but more convenient than the raw message
exchange API as described above. However, it permits much simpler and clearer
code.

Exchanges as Queues
-------------------

Instead of having to subclass the pprocess.Exchange class and to define the
store_data method, it might be more desirable to let the exchange manage the
communications between created and creating processes and to let the creating
process just consume received data as it arrives, without particular regard for
the order of the received data - perhaps the creating process has its own way of
managing such issues.

For such situations, the Queue class may be instantiated and channels added to
the queue using the various methods provided:

queue = pprocess.Queue(limit=10)
channel = queue.create()
if channel:
    # Do some computation.
    pprocess.exit(channel)

The results can then be consumed by treating the queue like an iterator:

for result in queue:
    # Capture each result.

This approach does not, of course, require the direct handling of channels. One
could instead use the start method on the queue to create processes and to
initiate computations (since a queue is merely an enhanced exchange with a
specific implementation of the store_data method).

Exchanges as Maps
-----------------

Where the above Queue class appears like an attractive solution for the
management of the results of computations, but where the order of their
consumption by the creating process remains important, the Map class may offer a
suitable way of collecting and accessing results:

results = pprocess.Map(limit=10)
for value in inputs:
    results.start(fn, args)

The results can then be consumed in an order corresponding to the order of the
computations which produced them:

for result in results:
    # Process each result.

Internally, the Map object records a particular ordering of channels, ensuring
that the received results can be mapped to this ordering, and that the results
can be made available with this ordering preserved.

Managed Callables
-----------------

A further simplification of the above convenient use of message exchanges
involves the creation of callables (eg. functions) which are automatically
monitored by an exchange. We create such a callable by calling the manage method
on an exchange:

myfn = exchange.manage(fn)

This callable can then be invoked instead of using the exchange's start method:

myfn(arg1, arg2, named1=value1, named2=value2)

The exchange's finish method can be used as usual to process incoming data.

Making Existing Functions Parallel
----------------------------------

In making a program parallel, existing functions which only return results can
be manually modified to accept and use channels to communicate results back to
the main process. However, a simple alternative is to use the MakeParallel class
to provide a wrapper around unmodified functions which will return the results
from those functions in the channels provided. For example:

fn = pprocess.MakeParallel(originalfn)

Map-style Processing
--------------------

In situations where a callable would normally be used in conjunction with the
Python built-in map function, an alternative solution can be adopted by using
the pmap function:

pprocess.pmap(fn, sequence)

Here, the sequence would have to contain elements that each contain the required
parameters of the specified callable, fn. Note that the callable does not need
to be a parallel-aware function which has a channel argument: the pmap function
automatically wraps the given callable internally.

Reusing Processes and Channels
------------------------------

So far, all parallel computations have been done with newly-created processes.
However, this can seem somewhat inefficient, especially if processes are being
continually created and destroyed (although if this happens too often, the
amount of work done by each process may be too little, anyway). One solution is
to retain processes after they have done their work and request that they
perform more work for each new parallel task or invocation. To enable the reuse
of processes in this way, a special keyword argument may be specified when
creating Exchange objects (and subclasses such as Map and Queue). For example:

exchange = MyExchange(limit=10, reuse=1) # reuse up to 10 processes

Code invoked through such exchanges must be aware of channels and be constructed
in such a way that it does not terminate after sending a result back to the
creating process. Instead, it should repeatedly wait for subsequent sets of
parameters (compatible with those either in the signature of a callable or with
the original values read from the channel). Reusable code is terminated when the
special value of None is sent from the creating process to the created process,
indicating that no more parameters will be sent; this should cause the code to
terminate.

Making Existing Functions Parallel and Reusable
-----------------------------------------------

An easier way of making reusable code sections for parallel use is to employ the
MakeReusable class to wrap an existing callable:

fn = pprocess.MakeReusable(originalfn)

This wraps the callable in a similar fashion to MakeParallel, but provides the
necessary mechanisms described above for reusable code.

Signals and Waiting
-------------------

When created/child processes terminate, one would typically want to be informed
of such conditions using a signal handler. Unfortunately, Python seems to have
issues with restartable reads from file descriptors when interrupted by signals:

http://mail.python.org/pipermail/python-dev/2002-September/028572.html
http://twistedmatrix.com/bugs/issue733

Select and Poll
---------------

The exact combination of conditions indicating closed pipes remains relatively
obscure. Here is a message/thread describing them (in the context of another
topic):

http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html

It would seem, from using sockets and from studying the asyncore module, that
sockets are more predictable than pipes.

Notes about poll implementations can be found here:

http://www.greenend.org.uk/rjk/2001/06/poll.html
"""

__version__ = "0.3"

import os
import sys
import select
import socket

try:
    import cPickle as pickle
except ImportError:
    import pickle

# Communications.

class AcknowledgementError(Exception):

    "Raised by Channel.send when the reply to a sent object is not 'OK'."

    pass

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.

        Both pipes are file-like objects carrying pickled data; the 'pid' is
        zero in a created process and the identifier of the created process in
        the creating process (see the 'create' function).
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe
        self.closed = 0

    def __del__(self):

        # Since signals don't work well with I/O, we close pipes and wait for
        # created processes upon finalisation.

        self.close()

    def close(self):

        "Explicitly close the channel. Closing more than once is harmless."

        if not self.closed:
            self.closed = 1
            self.read_pipe.close()
            self.write_pipe.close()
            #self.wait(os.WNOHANG)

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit. The 'options' are passed
        to os.waitpid. Errors from os.waitpid (such as the process having been
        waited for already) are deliberately ignored.
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)

        Raise AcknowledgementError, with 'obj' as its sole argument, if the
        reply is anything other than "OK".
        """

        self._send(obj)
        if self._receive() != "OK":

            # The call form is used so that 'obj' is never unpacked into
            # several exception arguments (as "raise E, obj" does for tuples)
            # and so that the statement is valid in all Python versions.

            raise AcknowledgementError(obj)

    def _receive(self):

        """
        Receive an object through the channel, returning the object. Where the
        received object is an exception instance, raise it instead of returning
        it.
        """

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        else:
            return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)
        """

        # The acknowledgement is sent even where a received exception is being
        # raised, so that the sender is never left waiting.

        try:
            obj = self._receive()
            return obj
        finally:
            self._send("OK")

# Management of processes and communications.

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate. Subclasses of this class can define the 'store_data'
    method in order to enable the 'add_wait', 'wait' and 'finish' methods.
    """

    def __init__(self, channels=None, limit=None, reuse=0, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'.

        If the optional 'limit' is specified, restrictions on the addition of
        new channels can be enforced and observed through the 'add_wait', 'wait'
        and 'finish' methods. To make use of these methods, create a subclass of
        this class and define a working 'store_data' method.

        If the optional 'reuse' parameter is set to a true value, channels and
        processes will be reused for waiting computations.

        If the optional 'autoclose' parameter is set to a false value, channels
        will not be closed automatically when they are removed from the exchange
        - by default they are closed when removed.
        """

        self.limit = limit
        self.reuse = reuse
        self.autoclose = autoclose

        # Pending (callable, args, kw) invocations, oldest at the end.

        self.waiting = []

        # A mapping from read file descriptors to monitored channels.

        self.readables = {}

        # The channels removed during the most recent call to 'ready'.

        self.removed = []
        self.poller = select.poll()
        for channel in channels or []:
            self.add(channel)

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        self.readables[channel.read_pipe.fileno()] = channel
        self.poller.register(channel.read_pipe.fileno(), select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        # An explicit list is made since dictionaries only provide a view of
        # their values in newer Python versions.

        return list(self.readables.values())

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (or until
        communication is possible) and return a list of channels which are ready
        to be read from.

        The 'timeout' is passed unchanged to the poll method of a polling object
        from the select module and is thus a number of milliseconds; None means
        that the call blocks until some channel has activity.

        Channels found to have ended or failed are removed from the exchange and
        recorded in the 'removed' attribute, which is reset upon each call.
        """

        fds = self.poller.poll(timeout)
        readables = []
        self.removed = []

        for fd, status in fds:
            channel = self.readables[fd]
            removed = 0

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)
                self.removed.append(channel)
                removed = 1

            # Record readable channels. A channel which has just been removed
            # and closed cannot be read from, so it is only reported where
            # automatic closing has been disabled.

            if status & select.POLLIN:
                if not (removed and self.autoclose):
                    readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange. Where the exchange closes
        channels automatically, also close the channel and wait for its process.
        """

        # The descriptor must be obtained before the channel is closed.

        del self.readables[channel.read_pipe.fileno()]
        self.poller.unregister(channel.read_pipe.fileno())
        if self.autoclose:
            channel.close()
            channel.wait()

    # Enhanced exchange methods involving channel limits.

    def add_wait(self, channel):

        """
        Add the given 'channel' to the exchange, waiting if the limit on active
        channels would be exceeded by adding the channel.
        """

        self.wait()
        self.add(channel)

    def wait(self):

        """
        Test for the limit on channels, blocking and reading incoming data until
        the number of channels is below the limit.
        """

        # If limited, block until channels have been closed.

        while self.limit is not None and len(self.active()) >= self.limit:
            self.store()

    def start_waiting(self, channel):

        """
        Start a waiting process given the reception of data on the given
        'channel'.
        """

        if self.waiting:
            callable, args, kw = self.waiting.pop()

            # Try and reuse existing channels if possible.

            if self.reuse:

                # Re-add the channel - this may update information related to
                # the channel in subclasses.

                self.add(channel)
                channel.send((args, kw))
            else:
                self.add(start(callable, *args, **kw))

        # Where channels are being reused, but where no processes are waiting
        # any more, send a special value to tell them to quit.

        elif self.reuse:
            channel.send(None)

    def finish(self):

        """
        Finish the use of the exchange by waiting for all channels to complete.
        """

        while self.active():
            self.store()

    def store(self):

        "For each ready channel, process the incoming data."

        for channel in self.ready():
            self.store_data(channel)
            self.start_waiting(channel)

    def store_data(self, channel):

        """
        Store incoming data from the specified 'channel'. In subclasses of this
        class, such data could be stored using instance attributes.

        This implementation always raises NotImplementedError.
        """

        raise NotImplementedError("store_data")

    # Convenience methods.

    def start(self, callable, *args, **kw):

        """
        Using pprocess.start, create a new process for the given 'callable'
        using any additional arguments provided. Then, monitor the channel
        created between this process and the created process.

        Where the limit on active channels has already been reached, the
        invocation is instead queued and started later by 'start_waiting'.
        """

        if self.limit is not None and len(self.active()) >= self.limit:
            self.waiting.insert(0, (callable, args, kw))
            return

        self.add_wait(start(callable, *args, **kw))

    def create(self):

        """
        Using pprocess.create, create a new process and return the created
        communications channel to the created process. In the creating process,
        return None - the channel receiving data from the created process will
        be automatically managed by this exchange.
        """

        channel = create()
        if channel.pid == 0:
            return channel
        else:
            self.add_wait(channel)
            return None

    def manage(self, callable):

        """
        Wrap the given 'callable' in an object which can then be called in the
        same way as 'callable', but with new processes and communications
        managed automatically.
        """

        return ManagedCallable(callable, self)

class ManagedCallable:

    "A callable managed by an exchange."

    def __init__(self, callable, exchange):

        """
        Wrap the given 'callable', using the given 'exchange' to monitor the
        channels created for communications between this and the created
        processes. Note that the 'callable' must be parallel-aware (that is,
        have a 'channel' parameter). Use the MakeParallel class to wrap other
        kinds of callable objects.
        """

        self.callable = callable
        self.exchange = exchange

    def __call__(self, *args, **kw):

        "Invoke the callable with the supplied arguments."

        self.exchange.start(self.callable, *args, **kw)

# Abstractions and utilities.

class Map(Exchange):

    "An exchange which can be used like the built-in 'map' function."

    def __init__(self, *args, **kw):
        Exchange.__init__(self, *args, **kw)
        self.init()

    def init(self):

        "Remember the channel addition order to order output."

        # The position to be given to the next channel added.

        self.channel_number = 0

        # A mapping from channels to positions in the results.

        self.channels = {}
        self.results = []

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        Exchange.add(self, channel)
        self.channels[channel] = self.channel_number
        self.channel_number += 1

    def start(self, callable, *args, **kw):

        """
        Using pprocess.start, create a new process for the given 'callable'
        using any additional arguments provided. Then, monitor the channel
        created between this process and the created process.
        """

        self.results.append(None) # placeholder
        Exchange.start(self, callable, *args, **kw)

    def create(self):

        """
        Using pprocess.create, create a new process and return the created
        communications channel to the created process. In the creating process,
        return None - the channel receiving data from the created process will
        be automatically managed by this exchange.
        """

        self.results.append(None) # placeholder
        return Exchange.create(self)

    def __call__(self, callable, sequence):

        """
        Wrap and invoke 'callable' for each element in the 'sequence'. Each
        element is supplied as the sole argument of an invocation. Any results
        recorded previously are discarded.
        """

        if not isinstance(callable, MakeParallel):
            wrapped = MakeParallel(callable)
        else:
            wrapped = callable

        self.init()

        # Start processes for each element in the sequence.

        for i in sequence:
            self.start(wrapped, i)

        # Access to the results occurs through this object.

        return self

    def __getitem__(self, i):

        "Wait for all results and return the result at position 'i'."

        self.finish()
        return self.results[i]

    def __iter__(self):

        "Wait for all results and return an iterator over them."

        self.finish()
        return iter(self.results)

    def store_data(self, channel):

        "Accumulate the incoming data, associating results with channels."

        data = channel.receive()
        self.results[self.channels[channel]] = data
        del self.channels[channel]

class Queue(Exchange):

    """
    An exchange acting as a queue, making data from created processes available
    in the order in which it is received.
    """

    def __init__(self, *args, **kw):
        Exchange.__init__(self, *args, **kw)

        # Received data, with the oldest item at the end.

        self.queue = []

    def store_data(self, channel):

        "Accumulate the incoming data in the order of its arrival."

        data = channel.receive()
        self.queue.insert(0, data)

    def __iter__(self):
        return self

    def next(self):

        """
        Return the next element in the queue, waiting for data while channels
        remain active. Raise StopIteration when the queue is empty and no
        channels remain.
        """

        if self.queue:
            return self.queue.pop()

        # Handling activity need not yield data (a channel may merely have
        # ended), so keep going until data arrives or no channels remain.

        while self.active():
            self.store()
            if self.queue:
                return self.queue.pop()
        else:
            raise StopIteration

    # The iterator protocol uses this name in newer Python versions.

    __next__ = next

class MakeParallel:

    "A wrapper around functions making them able to communicate results."

    def __init__(self, callable):

        """
        Initialise the wrapper with the given 'callable'. This object will then
        be able to accept a 'channel' parameter when invoked, and to forward the
        result of the given 'callable' via the channel provided back to the
        invoking process.
        """

        self.callable = callable

    def __call__(self, channel, *args, **kw):

        "Invoke the callable and return its result via the given 'channel'."

        channel.send(self.callable(*args, **kw))

class MakeReusable(MakeParallel):

    """
    A wrapper around functions making them able to communicate results in a
    reusable fashion.
    """

    def __call__(self, channel, *args, **kw):

        """
        Invoke the callable and return its result via the given 'channel'. Then
        keep receiving (args, kw) pairs from the channel, invoking the callable
        with each and sending back each result, until None is received.
        """

        channel.send(self.callable(*args, **kw))
        t = channel.receive()
        while t is not None:
            args, kw = t
            channel.send(self.callable(*args, **kw))
            t = channel.receive()

# Utility functions.

def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process.

    In the created process the returned channel has a 'pid' of zero; in the
    creating process it has the process identifier of the created process.
    """

    parent, child = socket.socketpair()
    for s in [parent, child]:
        s.setblocking(1)

    # The files are opened in binary mode since they carry pickled data, and
    # unbuffered so that no data is held back from the polling of descriptors.
    # NOTE(review): in newer Python versions unbuffered socket files are raw
    # streams; very large pickles may need a reader which retries short reads
    # - to be confirmed.

    pid = os.fork()
    if pid == 0:
        parent.close()
        return Channel(pid, child.makefile("rb", 0), child.makefile("wb", 0))
    else:
        child.close()
        return Channel(pid, parent.makefile("rb", 0), parent.makefile("wb", 0))

def exit(channel):

    """
    Terminate a created process, closing the given 'channel'.
    """

    channel.close()
    os._exit(0)

def start(callable, *args, **kw):

    """
    Create a new process which shall start running in the given 'callable'.
    Additional arguments to the 'callable' can be given as additional arguments
    to this function.

    Return a communications channel to the creating process. For the created
    process, supply a channel as the 'channel' parameter in the given 'callable'
    so that it may send data back to the creating process.

    The created process never returns from this function: it always exits once
    the 'callable' has finished, whether normally or with an exception.
    """

    channel = create()
    if channel.pid == 0:
        try:
            try:
                callable(channel, *args, **kw)

            # Everything is deliberately caught here: this is the outermost
            # level of the created process, and the exception value is sent to
            # the creating process instead of being lost.

            except:
                channel.send(sys.exc_info()[1])
        finally:
            exit(channel)
    else:
        return channel

def waitall():

    "Wait for all created processes to terminate."

    # os.wait raises OSError once there are no more processes to wait for.

    try:
        while 1:
            os.wait()
    except OSError:
        pass

def pmap(callable, sequence, limit=None):

    """
    A parallel version of the built-in map function with an optional process
    'limit'. The given 'callable' should not be parallel-aware (that is, have a
    'channel' parameter) since it will be wrapped for parallel communications
    before being invoked.

    Return the processed 'sequence' where each element in the sequence is
    processed by a different process. The value returned is a Map object whose
    results are obtained by iterating over it or by indexing it.
    """

    mymap = Map(limit=limit)
    return mymap(callable, sequence)

# vim: tabstop=4 expandtab shiftwidth=4