#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Copyright (C) 2005, 2006, 2007 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the Free
Software Foundation; either version 3 of the License, or (at your option) any
later version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
details.

You should have received a copy of the GNU Lesser General Public License along
with this program.  If not, see <http://www.gnu.org/licenses/>.

--------

The recommended styles of programming using pprocess involve the "Thread-style
Processing", "Convenient Message Exchanges", "Managed Callables" and "Map-style
Processing" sections below, although developers may wish to read the "Message
Exchanges" section for more details of the API concerned, and the "Fork-style
Processing" section may be of interest to those with experience of large scale
parallel processing systems.

Thread-style Processing
-----------------------

To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:

channel = pprocess.start(fn, arg1, arg2, named1=value1, named2=value2)

This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:

def fn(channel, arg1, arg2, named1, named2):
    # Read from and write to the channel.
    # Return value is ignored.
    ...

Fork-style Processing
---------------------

To create new processes in a similar way to that employed when using os.fork
(ie. the fork system call on various operating systems), use the following
method:

channel = pprocess.create()
if channel.pid == 0:
    # This code is run by the created process.
    # Read from and write to the channel to communicate with the
    # creating/calling process.
    # An explicit exit of the process may be desirable to prevent the process
    # from running code which is intended for the creating/calling process.
    ...
    pprocess.exit(channel)
else:
    # This code is run by the creating/calling process.
    # Read from and write to the channel to communicate with the created
    # process.
    ...

Message Exchanges
-----------------

When creating many processes, each providing results for the consumption of the
main process, the collection of those results in an efficient fashion can be
problematic: if some processes take longer than others, and if we decide to read
from those processes when they are not ready instead of other processes which
are ready, the whole activity will take much longer than necessary.

One solution to the problem of knowing when to read from channels is to create
an Exchange object, optionally initialising it with a list of channels through
which data is expected to arrive:

exchange = pprocess.Exchange()           # populate the exchange later
exchange = pprocess.Exchange(channels)   # populate the exchange with channels

We can add channels to the exchange using the add method:

exchange.add(channel)

To test whether an exchange is active - that is, whether it is actually
monitoring any channels - we can use the active method which returns all
channels being monitored by the exchange:

channels = exchange.active()

We may then check the exchange to see whether any data is ready to be received;
for example:

for channel in exchange.ready():
    # Read from and write to the channel.
    ...

If we do not wish to wait indefinitely for a list of channels, we can set a
timeout value as an argument to the ready method (as a number specifying the
timeout in milliseconds, where 0 means a non-blocking poll, as stated in the
select module's documentation for the poll method of polling objects).

Convenient Message Exchanges
----------------------------

A convenient form of message exchanges can be adopted by defining a subclass of
the Exchange class and defining a particular method:

class MyExchange(pprocess.Exchange):
    def store_data(self, channel):
        data = channel.receive()
        # Do something with data here.

The exact operations performed on the received data might be as simple as
storing it on an instance attribute. To make use of the exchange, we would
instantiate it as usual:

exchange = MyExchange()         # populate the exchange later
exchange = MyExchange(limit=10) # set a limit for later population

The exchange can now be used in a simpler fashion than that shown above. We can
add channels as before using the add method, or we can choose to only add
channels if the specified limit of channels is not exceeded:

exchange.add(channel)           # add a channel as normal
exchange.add_wait(channel)      # add a channel, waiting if the limit would be
                                # exceeded

Or we can request that the exchange create a channel on our behalf:

channel = exchange.create()

We can even start processes and monitor channels without ever handling the
channel ourselves:

exchange.start(fn, arg1, arg2, named1=value1, named2=value2)

We can explicitly wait for "free space" for channels by calling the wait method,
although the start and add_wait methods make this less interesting:

exchange.wait()

Finally, when finishing the computation, we can choose to merely call the finish
method and have the remaining data processed automatically:

exchange.finish()

Clearly, this approach is less flexible but more convenient than the raw message
exchange API as described above. However, it permits much simpler and clearer
code.

Exchanges as Queues
-------------------

Instead of having to subclass the pprocess.Exchange class and to define the
store_data method, it might be more desirable to let the exchange manage the
communications between created and creating processes and to let the creating
process just consume received data as it arrives, without particular regard for
the order of the received data - perhaps the creating process has its own way of
managing such issues.

For such situations, the Queue class may be instantiated and channels added to
the queue using the various methods provided:

queue = pprocess.Queue(limit=10)
channel = queue.create()
if channel:
    # Do some computation.
    pprocess.exit(channel)

The results can then be consumed by treating the queue like an iterator:

for result in queue:
    # Capture each result.

This approach does not, of course, require the direct handling of channels. One
could instead use the start method on the queue to create processes and to
initiate computations (since a queue is merely an enhanced exchange with a
specific implementation of the store_data method).

Managed Callables
-----------------

A further simplification of the above convenient use of message exchanges
involves the creation of callables (eg. functions) which are automatically
monitored by an exchange. We create such a callable by calling the manage method
on an exchange:

myfn = exchange.manage(fn)

This callable can then be invoked instead of using the exchange's start method:

myfn(arg1, arg2, named1=value1, named2=value2)

The exchange's finish method can be used as usual to process incoming data.

Making Existing Functions Parallel
----------------------------------

In making a program parallel, existing functions which only return results can
be manually modified to accept and use channels to communicate results back to
the main process. However, a simple alternative is to use the MakeParallel class
to provide a wrapper around unmodified functions which will return the results
from those functions in the channels provided. For example:

fn = pprocess.MakeParallel(originalfn)

Map-style Processing
--------------------

In situations where a callable would normally be used in conjunction with the
Python built-in map function, an alternative solution can be adopted by using
the pmap function:

pprocess.pmap(fn, sequence)

Here, the sequence would have to contain elements that each contain the required
parameters of the specified callable, fn. Note that the callable does not need
to be a parallel-aware function which has a channel argument: the pmap function
automatically wraps the given callable internally.

Signals and Waiting
-------------------

When created/child processes terminate, one would typically want to be informed
of such conditions using a signal handler. Unfortunately, Python seems to have
issues with restartable reads from file descriptors when interrupted by signals:

http://mail.python.org/pipermail/python-dev/2002-September/028572.html
http://twistedmatrix.com/bugs/issue733

Select and Poll
---------------

The exact combination of conditions indicating closed pipes remains relatively
obscure. Here is a message/thread describing them (in the context of another
topic):

http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html

It would seem, from using sockets and from studying the asyncore module, that
sockets are more predictable than pipes.

Notes about poll implementations can be found here:

http://www.greenend.org.uk/rjk/2001/06/poll.html
"""

__version__ = "0.3"

import os
import sys
import select
import socket

try:
    import cPickle as pickle
except ImportError:
    import pickle

# Communications.

class AcknowledgementError(Exception):

    "Raised by Channel.send when the reply to a message is not \"OK\"."

    pass

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe
        self.closed = 0

    def __del__(self):

        # Since signals don't work well with I/O, we close pipes and wait for
        # created processes upon finalisation.

        self.close()

    def close(self):

        "Explicitly close the channel. Closing more than once has no effect."

        if not self.closed:
            self.closed = 1
            self.read_pipe.close()
            self.write_pipe.close()
            #self.wait(os.WNOHANG)

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit. The given 'options' are
        passed to os.waitpid. A channel whose 'pid' is 0 has no process to
        wait for, and any OSError from os.waitpid is ignored.
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)

        Raise AcknowledgementError, with 'obj' as its argument, if the reply
        is anything other than "OK".
        """

        self._send(obj)
        if self._receive() != "OK":

            # The call form is valid in both Python 2 and 3, and never unpacks
            # a tuple 'obj' into separate exception arguments.

            raise AcknowledgementError(obj)

    def _receive(self):

        """
        Receive an object through the channel, returning the object. If the
        received object is an exception instance, raise it instead.
        """

        # NOTE: Unpickling is only appropriate for trusted peers. Channels made
        # NOTE: by the create function connect to a forked copy of this process.

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        else:
            return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)
        """

        # The acknowledgement is sent even if receiving raises an exception.

        try:
            obj = self._receive()
            return obj
        finally:
            self._send("OK")

# Management of processes and communications.

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate. Subclasses of this class can define the 'store_data'
    method in order to enable the 'add_wait', 'wait' and 'finish' methods.
    """

    def __init__(self, channels=None, limit=None, reuse=0, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'.

        If the optional 'limit' is specified, restrictions on the addition of
        new channels can be enforced and observed through the 'add_wait', 'wait'
        and 'finish' methods. To make use of these methods, create a subclass of
        this class and define a working 'store_data' method.

        If the optional 'reuse' parameter is set to a true value, channels and
        processes will be reused for waiting computations.

        If the optional 'autoclose' parameter is set to a false value, channels
        will not be closed automatically when they are removed from the exchange
        - by default they are closed when removed.
        """

        self.limit = limit
        self.reuse = reuse
        self.autoclose = autoclose
        self.waiting = []       # pending (callable, args, kw) tuples
        self.readables = {}     # read descriptor -> channel
        self.removed = []       # channels removed by the latest ready call
        self.poller = select.poll()
        for channel in channels or []:
            self.add(channel)

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        fd = channel.read_pipe.fileno()
        self.readables[fd] = channel
        self.poller.register(fd, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        # An explicit list is returned since dictionary values are only a view
        # in Python 3.

        return list(self.readables.values())

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (given in
        milliseconds, since it is passed directly to the poll method of the
        polling object) or until communication is possible, and return a list
        of channels which are ready to be read from.

        Channels reporting a hangup, an invalid descriptor or an error are
        removed from the exchange and recorded in the 'removed' attribute.
        """

        fds = self.poller.poll(timeout)
        readables = []
        self.removed = []

        for fd, status in fds:
            channel = self.readables[fd]
            removed = 0

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)
                self.removed.append(channel)
                removed = 1

            # Record readable channels. A removed channel has already been
            # closed when autoclose is enabled, so it cannot be read from.

            if status & select.POLLIN:
                if not (removed and self.autoclose):
                    readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange. If autoclose is enabled,
        also close the channel and wait for its process.
        """

        fd = channel.read_pipe.fileno()
        del self.readables[fd]
        self.poller.unregister(fd)
        if self.autoclose:
            channel.close()
            channel.wait()

    # Enhanced exchange methods involving channel limits.

    def add_wait(self, channel):

        """
        Add the given 'channel' to the exchange, waiting if the limit on active
        channels would be exceeded by adding the channel.
        """

        self.wait()
        self.add(channel)

    def wait(self):

        """
        Test for the limit on channels, blocking and reading incoming data until
        the number of channels is below the limit.
        """

        # If limited, block until channels have been closed.

        while self.limit is not None and len(self.active()) >= self.limit:
            self.store()

    def start_waiting(self, channel):

        """
        Start a waiting process given the reception of data on the given
        'channel'.
        """

        if self.waiting:
            callable, args, kw = self.waiting.pop()

            # Try and reuse existing channels if possible.

            if self.reuse:
                channel.send((args, kw))
            else:
                self.add(start(callable, *args, **kw))

        # Where channels are being reused, but where no processes are waiting
        # any more, send a special value to tell them to quit.

        elif self.reuse:
            channel.send(None)

    def finish(self):

        """
        Finish the use of the exchange by waiting for all channels to complete.
        """

        while self.active():
            self.store()

    def store(self):

        "For each ready channel, process the incoming data."

        for channel in self.ready():
            self.store_data(channel)
            self.start_waiting(channel)

    def store_data(self, channel):

        """
        Store incoming data from the specified 'channel'. In subclasses of this
        class, such data could be stored using instance attributes.
        """

        raise NotImplementedError("store_data")

    # Convenience methods.

    def start(self, callable, *args, **kw):

        """
        Using pprocess.start, create a new process for the given 'callable'
        using any additional arguments provided. Then, monitor the channel
        created between this process and the created process.

        If the limit on active channels has been reached, the request is
        queued instead and no process is created by this call.
        """

        # New requests are inserted at the front and taken from the back in
        # start_waiting, so waiting requests are started in the order given.

        if self.limit is not None and len(self.active()) >= self.limit:
            self.waiting.insert(0, (callable, args, kw))
            return

        self.add_wait(start(callable, *args, **kw))

    def create(self):

        """
        Using pprocess.create, create a new process and return the created
        communications channel to the created process. In the creating process,
        return None - the channel receiving data from the created process will
        be automatically managed by this exchange.
        """

        channel = create()
        if channel.pid == 0:
            return channel
        else:
            self.add_wait(channel)
            return None

    def manage(self, callable):

        """
        Wrap the given 'callable' in an object which can then be called in the
        same way as 'callable', but with new processes and communications
        managed automatically.
        """

        return ManagedCallable(callable, self)

class ManagedCallable:

    "A callable managed by an exchange."

    def __init__(self, callable, exchange):

        """
        Wrap the given 'callable', using the given 'exchange' to monitor the
        channels created for communications between this and the created
        processes. Note that the 'callable' must be parallel-aware (that is,
        have a 'channel' parameter). Use the MakeParallel class to wrap other
        kinds of callable objects.
        """

        self.callable = callable
        self.exchange = exchange

    def __call__(self, *args, **kw):

        "Invoke the callable with the supplied arguments."

        self.exchange.start(self.callable, *args, **kw)

# Abstractions and utilities.

class Map(Exchange):

    "An exchange which can be used like the built-in 'map' function."

    def __init__(self, *args, **kw):

        "Initialise the exchange, accepting the same arguments as Exchange."

        Exchange.__init__(self, *args, **kw)
        self.init()

    def init(self):

        "Remember the channel addition order to order output."

        self.channel_number = 0
        self.channels = {}      # channel -> position in the results
        self.results = []

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        Exchange.add(self, channel)
        self.channels[channel] = self.channel_number
        self.channel_number += 1

    def start(self, callable, *args, **kw):

        """
        Using pprocess.start, create a new process for the given 'callable'
        using any additional arguments provided. Then, monitor the channel
        created between this process and the created process.
        """

        self.results.append(None) # placeholder
        Exchange.start(self, callable, *args, **kw)

    def create(self):

        """
        Using pprocess.create, create a new process and return the created
        communications channel to the created process. In the creating process,
        return None - the channel receiving data from the created process will
        be automatically managed by this exchange.
        """

        self.results.append(None) # placeholder
        return Exchange.create(self)

    def __call__(self, callable, sequence):

        """
        Wrap and invoke 'callable' for each element in the 'sequence'. Return
        this object, through which the results can be accessed.
        """

        if not isinstance(callable, MakeParallel):
            wrapped = MakeParallel(callable)
        else:
            wrapped = callable

        self.init()

        # Start processes for each element in the sequence.

        for i in sequence:
            self.start(wrapped, i)

        # Access to the results occurs through this object.

        return self

    def __getitem__(self, i):

        "Wait for all channels to complete, then return result 'i'."

        self.finish()
        return self.results[i]

    def __iter__(self):

        "Wait for all channels to complete, then iterate over the results."

        self.finish()
        return iter(self.results)

    def store_data(self, channel):

        "Accumulate the incoming data, associating results with channels."

        data = channel.receive()
        self.results[self.channels[channel]] = data
        del self.channels[channel]

class Queue(Exchange):

    """
    An exchange acting as a queue, making data from created processes available
    in the order in which it is received.
    """

    def __init__(self, *args, **kw):

        "Initialise the exchange, accepting the same arguments as Exchange."

        Exchange.__init__(self, *args, **kw)
        self.queue = []

    def store_data(self, channel):

        "Accumulate the incoming data, placing it at the front of the queue."

        data = channel.receive()
        self.queue.insert(0, data)

    def __iter__(self):

        "Return this object as its own iterator."

        return self

    def next(self):

        """
        Return the next element in the queue, processing incoming data while
        the queue is empty and channels remain active. Raise StopIteration when
        no data and no active channels remain.
        """

        if self.queue:
            return self.queue.pop()
        while self.active():
            self.store()
            if self.queue:
                return self.queue.pop()
        else:
            raise StopIteration

    # Python 3 spelling of the iterator protocol method.

    __next__ = next

class MakeParallel:

    "A wrapper around functions making them able to communicate results."

    def __init__(self, callable):

        """
        Initialise the wrapper with the given 'callable'. This object will then
        be able to accept a 'channel' parameter when invoked, and to forward the
        result of the given 'callable' via the channel provided back to the
        invoking process.
        """

        self.callable = callable

    def __call__(self, channel, *args, **kw):

        "Invoke the callable and return its result via the given 'channel'."

        channel.send(self.callable(*args, **kw))

# Utility functions.

def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process.
    """

    parent, child = socket.socketpair()
    for s in [parent, child]:
        s.setblocking(1)

    # The streams carry pickled data, so they are opened in binary mode. They
    # remain unbuffered.

    pid = os.fork()
    if pid == 0:
        parent.close()
        return Channel(pid, child.makefile("rb", 0), child.makefile("wb", 0))
    else:
        child.close()
        return Channel(pid, parent.makefile("rb", 0), parent.makefile("wb", 0))

def exit(channel):

    """
    Terminate a created process, closing the given 'channel'.
    """

    channel.close()
    os._exit(0)

def start(callable, *args, **kw):

    """
    Create a new process which shall start running in the given 'callable'.
    Additional arguments to the 'callable' can be given as additional arguments
    to this function.

    Return a communications channel to the creating process. For the created
    process, supply a channel as the 'channel' parameter in the given 'callable'
    so that it may send data back to the creating process.
    """

    channel = create()
    if channel.pid == 0:
        try:
            try:
                callable(channel, *args, **kw)
            except:

                # Deliberately broad: whatever was raised in the created
                # process is sent to the creating process.

                channel.send(sys.exc_info()[1])
        finally:
            exit(channel)
    else:
        return channel

def waitall():

    "Wait for all created processes to terminate."

    # os.wait raises OSError once there are no more processes to wait for.

    try:
        while 1:
            os.wait()
    except OSError:
        pass

def pmap(callable, sequence, limit=None):

    """
    A parallel version of the built-in map function with an optional process
    'limit'. The given 'callable' should not be parallel-aware (that is, have a
    'channel' parameter) since it will be wrapped for parallel communications
    before being invoked.

    Return the processed 'sequence' where each element in the sequence is
    processed by a different process.
    """

    mymap = Map(limit=limit)
    return mymap(callable, sequence)

# vim: tabstop=4 expandtab shiftwidth=4