#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Copyright (C) 2005, 2006, 2007 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the Free
Software Foundation; either version 3 of the License, or (at your option) any
later version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
details.

You should have received a copy of the GNU Lesser General Public License along
with this program.  If not, see <http://www.gnu.org/licenses/>.

--------

The recommended styles of programming using pprocess involve the "Thread-style
Processing", "Convenient Message Exchanges", "Managed Callables" and "Map-style
Processing" sections below, although developers may wish to read the "Message
Exchanges" section for more details of the API concerned, and the "Fork-style
Processing" section may be of interest to those with experience of large scale
parallel processing systems.

Thread-style Processing
-----------------------

To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:

channel = pprocess.start(fn, arg1, arg2, named1=value1, named2=value2)

This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:

def fn(channel, arg1, arg2, named1, named2):
    # Read from and write to the channel.
    # Return value is ignored.
    ...

Fork-style Processing
---------------------

To create new processes in a similar way to that employed when using os.fork
(ie. the fork system call on various operating systems), use the following
method:

channel = pprocess.create()
if channel.pid == 0:
    # This code is run by the created process.
    # Read from and write to the channel to communicate with the
    # creating/calling process.
    # An explicit exit of the process may be desirable to prevent the process
    # from running code which is intended for the creating/calling process.
    ...
    pprocess.exit(channel)
else:
    # This code is run by the creating/calling process.
    # Read from and write to the channel to communicate with the created
    # process.
    ...

Message Exchanges
-----------------

When creating many processes, each providing results for the consumption of the
main process, the collection of those results in an efficient fashion can be
problematic: if some processes take longer than others, and if we decide to read
from those processes when they are not ready instead of other processes which
are ready, the whole activity will take much longer than necessary.

One solution to the problem of knowing when to read from channels is to create
an Exchange object, optionally initialising it with a list of channels through
which data is expected to arrive:

exchange = pprocess.Exchange()           # populate the exchange later
exchange = pprocess.Exchange(channels)   # populate the exchange with channels

We can add channels to the exchange using the add method:

exchange.add(channel)

To test whether an exchange is active - that is, whether it is actually
monitoring any channels - we can use the active method which returns all
channels being monitored by the exchange:

channels = exchange.active()

We may then check the exchange to see whether any data is ready to be received;
for example:

for channel in exchange.ready():
    # Read from and write to the channel.
    ...

If we do not wish to wait indefinitely for a list of channels, we can set a
timeout value as an argument to the ready method. The value is handed unchanged
to the poll method of a select.poll object and is therefore interpreted in
milliseconds, where 0 means a non-blocking poll (see the documentation of
polling objects in the select module).

Convenient Message Exchanges
----------------------------

A convenient form of message exchanges can be adopted by defining a subclass of
the Exchange class and defining a particular method:

class MyExchange(pprocess.Exchange):
    def store_data(self, channel):
        data = channel.receive()
        # Do something with data here.

The exact operations performed on the received data might be as simple as
storing it on an instance attribute. To make use of the exchange, we would
instantiate it as usual:

exchange = MyExchange()         # populate the exchange later
exchange = MyExchange(limit=10) # set a limit for later population

The exchange can now be used in a simpler fashion than that shown above. We can
add channels as before using the add method, or we can choose to only add
channels if the specified limit of channels is not exceeded:

exchange.add(channel)       # add a channel as normal
exchange.add_wait(channel)  # add a channel, waiting if the limit would be
                            # exceeded

Or we can request that the exchange create a channel on our behalf:

channel = exchange.create()

We can even start processes and monitor channels without ever handling the
channel ourselves:

exchange.start(fn, arg1, arg2, named1=value1, named2=value2)

We can explicitly wait for "free space" for channels by calling the wait method,
although the start and add_wait methods make this less interesting:

exchange.wait()

Finally, when finishing the computation, we can choose to merely call the finish
method and have the remaining data processed automatically:

exchange.finish()

Clearly, this approach is less flexible but more convenient than the raw message
exchange API as described above. However, it permits much simpler and clearer
code.

Exchanges as Queues
-------------------

Instead of having to subclass the pprocess.Exchange class and to define the
store_data method, it might be more desirable to let the exchange manage the
communications between created and creating processes and to let the creating
process just consume received data as it arrives, without particular regard for
the order of the received data - perhaps the creating process has its own way of
managing such issues.

For such situations, the Queue class may be instantiated and channels added to
the queue using the various methods provided:

queue = pprocess.Queue(limit=10)
channel = queue.create()
if channel:
    # Do some computation.
    pprocess.exit(channel)

The results can then be consumed by treating the queue like an iterator:

for result in queue:
    # Capture each result.

This approach does not, of course, require the direct handling of channels. One
could instead use the start method on the queue to create processes and to
initiate computations (since a queue is merely an enhanced exchange with a
specific implementation of the store_data method).

Managed Callables
-----------------

A further simplification of the above convenient use of message exchanges
involves the creation of callables (eg. functions) which are automatically
monitored by an exchange. We create such a callable by calling the manage method
on an exchange:

myfn = exchange.manage(fn)

This callable can then be invoked instead of using the exchange's start method:

myfn(arg1, arg2, named1=value1, named2=value2)

The exchange's finish method can be used as usual to process incoming data.

Making Existing Functions Parallel
----------------------------------

In making a program parallel, existing functions which only return results can
be manually modified to accept and use channels to communicate results back to
the main process. However, a simple alternative is to use the MakeParallel class
to provide a wrapper around unmodified functions which will return the results
from those functions in the channels provided. For example:

fn = pprocess.MakeParallel(originalfn)

Map-style Processing
--------------------

In situations where a callable would normally be used in conjunction with the
Python built-in map function, an alternative solution can be adopted by using
the pmap function:

pprocess.pmap(fn, sequence)

Here, the sequence would have to contain elements that each contain the required
parameters of the specified callable, fn. Note that the callable does not need
to be a parallel-aware function which has a channel argument: the pmap function
automatically wraps the given callable internally.

Signals and Waiting
-------------------

When created/child processes terminate, one would typically want to be informed
of such conditions using a signal handler. Unfortunately, Python seems to have
issues with restartable reads from file descriptors when interrupted by signals:

http://mail.python.org/pipermail/python-dev/2002-September/028572.html
http://twistedmatrix.com/bugs/issue733

Select and Poll
---------------

The exact combination of conditions indicating closed pipes remains relatively
obscure. Here is a message/thread describing them (in the context of another
topic):

http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html

It would seem, from using sockets and from studying the asyncore module, that
sockets are more predictable than pipes.

Notes about poll implementations can be found here:

http://www.greenend.org.uk/rjk/2001/06/poll.html
"""

__version__ = "0.3"

import os
import sys
import select
import socket

try:
    import cPickle as pickle
except ImportError:
    import pickle

# Communications.

class AcknowledgementError(Exception):

    "Raised by Channel.send when the reply to a message is not 'OK'."

    pass

class Channel:

    """
    A communications channel.

    Objects are exchanged as pickles over a pair of file-like objects. Note
    that unpickling executes whatever the peer sent, so a channel must only be
    connected to a trusted process (here, a process forked by 'create').
    """

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.

        A 'pid' of 0 denotes the created process's end of the channel (see
        'create'); 'wait' does nothing for such channels.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe
        self.closed = 0

    def __del__(self):

        # Since signals don't work well with I/O, we close pipes upon
        # finalisation. Waiting for the created process is not done here: it
        # is performed by Exchange.remove or by calling 'wait' explicitly.

        self.close()

    def close(self):

        "Explicitly close the channel. Closing more than once is harmless."

        if not self.closed:
            self.closed = 1

            # Make sure that the write pipe is closed even if closing the read
            # pipe fails.

            try:
                self.read_pipe.close()
            finally:
                self.write_pipe.close()

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit, passing 'options' to
        os.waitpid. Errors from os.waitpid (such as the process having been
        waited for already) are ignored.
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)

        Raise AcknowledgementError, carrying 'obj', if the reply received is
        not the string "OK".
        """

        self._send(obj)
        if self._receive() != "OK":
            raise AcknowledgementError(obj)

    def _receive(self):

        """
        Receive an object through the channel, returning the object. If the
        received object is an Exception instance, it is raised instead of being
        returned.
        """

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        else:
            return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)

        The acknowledgement is sent even when receiving raises an exception, so
        that the sender is not left waiting.
        """

        try:
            obj = self._receive()
            return obj
        finally:
            self._send("OK")

# Management of processes and communications.

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate. Subclasses of this class can define the 'store_data'
    method in order to enable the 'add_wait', 'wait' and 'finish' methods.
    """

    def __init__(self, channels=None, limit=None, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'.

        If the optional 'limit' is specified, restrictions on the addition of
        new channels can be enforced and observed through the 'add_wait', 'wait'
        and 'finish' methods. To make use of these methods, create a subclass of
        this class and define a working 'store_data' method.

        If the optional 'autoclose' parameter is set to a false value, channels
        will not be closed automatically when they are removed from the exchange
        - by default they are closed when removed.
        """

        self.limit = limit

        # Pending (callable, args, kw) tuples queued by 'start' when the limit
        # has been reached; new entries go to the front, 'pop' takes the
        # oldest from the end.

        self.waiting = []
        self.autoclose = autoclose

        # Channels being monitored, keyed by read pipe file descriptor.

        self.readables = {}

        # Channels removed during the most recent call to 'ready'.

        self.removed = []
        self.poller = select.poll()
        for channel in channels or []:
            self.add(channel)

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        fd = channel.read_pipe.fileno()
        self.readables[fd] = channel
        self.poller.register(fd, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        # Always build a real list, as documented, even where the dictionary
        # method returns a view.

        return list(self.readables.values())

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (or until
        communication is possible) and return a list of channels which are ready
        to be read from.

        The 'timeout' is passed unchanged to the poll method of a select.poll
        object and is thus interpreted in milliseconds; None means waiting
        indefinitely and 0 means a non-blocking poll.

        Channels reporting a hang-up, an invalid descriptor or an error are
        removed from the exchange and recorded in the 'removed' attribute.
        """

        fds = self.poller.poll(timeout)
        readables = []
        self.removed = []

        for fd, status in fds:
            channel = self.readables[fd]
            removed = 0

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)
                self.removed.append(channel)
                removed = 1

            # Record readable channels. A channel which has just been removed
            # is only reported when it was not closed by the removal.

            if status & select.POLLIN:
                if not (removed and self.autoclose):
                    readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange. If 'autoclose' was
        requested, the channel is closed and its process is waited for.
        """

        # The descriptor must be obtained before the channel is closed.

        fd = channel.read_pipe.fileno()
        del self.readables[fd]
        self.poller.unregister(fd)
        if self.autoclose:
            channel.close()
            channel.wait()

    # Enhanced exchange methods involving channel limits.

    def add_wait(self, channel):

        """
        Add the given 'channel' to the exchange, waiting if the limit on active
        channels would be exceeded by adding the channel.
        """

        self.wait()
        self.add(channel)

    def wait(self):

        """
        Test for the limit on channels, blocking and reading incoming data until
        the number of channels is below the limit.
        """

        # If limited, block until channels have been closed.

        while self.limit is not None and len(self.active()) >= self.limit:
            self.store()

    def start_waiting(self):

        """
        Start a waiting process, if any: the oldest invocation queued by
        'start' is started and its channel added to the exchange.
        """

        if self.waiting:
            callable, args, kw = self.waiting.pop()
            self.add(start(callable, *args, **kw))

    def finish(self):

        """
        Finish the use of the exchange by waiting for all channels to complete.
        """

        while self.active():
            self.store()

    def store(self):

        """
        For each ready channel, process the incoming data. After each channel
        has been processed, one waiting invocation (if any) is started.
        """

        for channel in self.ready():
            self.store_data(channel)
            self.start_waiting()

    def store_data(self, channel):

        """
        Store incoming data from the specified 'channel'. In subclasses of this
        class, such data could be stored using instance attributes.

        This base implementation raises NotImplementedError.
        """

        raise NotImplementedError("store_data")

    # Convenience methods.

    def start(self, callable, *args, **kw):

        """
        Using pprocess.start, create a new process for the given 'callable'
        using any additional arguments provided. Then, monitor the channel
        created between this process and the created process.

        If the limit on active channels has been reached, the invocation is
        queued instead and started later by 'start_waiting'.
        """

        if self.limit is not None and len(self.active()) >= self.limit:
            self.waiting.insert(0, (callable, args, kw))
            return

        self.add_wait(start(callable, *args, **kw))

    def create(self):

        """
        Using pprocess.create, create a new process and return the created
        communications channel to the created process. In the creating process,
        return None - the channel receiving data from the created process will
        be automatically managed by this exchange.
        """

        channel = create()
        if channel.pid == 0:
            return channel
        else:
            self.add_wait(channel)
            return None

    def manage(self, callable):

        """
        Wrap the given 'callable' in an object which can then be called in the
        same way as 'callable', but with new processes and communications
        managed automatically.
        """

        return ManagedCallable(callable, self)

class ManagedCallable:

    "A callable managed by an exchange."

    def __init__(self, callable, exchange):

        """
        Wrap the given 'callable', using the given 'exchange' to monitor the
        channels created for communications between this and the created
        processes. Note that the 'callable' must be parallel-aware (that is,
        have a 'channel' parameter). Use the MakeParallel class to wrap other
        kinds of callable objects.
        """

        self.callable = callable
        self.exchange = exchange

    def __call__(self, *args, **kw):

        "Invoke the callable with the supplied arguments."

        self.exchange.start(self.callable, *args, **kw)

# Abstractions and utilities.

class Map(Exchange):

    "An exchange which can be used like the built-in 'map' function."

    def __init__(self, *args, **kw):

        """
        Initialise the exchange, accepting the same arguments as Exchange.
        """

        # The ordering state must exist before Exchange.__init__ runs, since
        # that adds any initial channels using the 'add' method below.

        self._reset()
        Exchange.__init__(self, *args, **kw)

    def _reset(self):

        "Forget all channel positions and results."

        self.channel_number = 0
        self.channels = {}
        self.results = []

    def add(self, channel):

        """
        Add the given 'channel' to the exchange, remembering its position so
        that its result can be stored at the corresponding place in 'results'.
        """

        Exchange.add(self, channel)
        index = self.channel_number
        self.channels[channel] = index
        self.channel_number = index + 1

        # Make sure that a result slot exists for channels which were added
        # directly instead of through invoking this object.

        while len(self.results) <= index:
            self.results.append(None)

    def __call__(self, callable, sequence):

        """
        Wrap and invoke 'callable' for each element in the 'sequence',
        returning a list of results in the order of the original elements.
        """

        if not isinstance(callable, MakeParallel):
            wrapped = MakeParallel(callable)
        else:
            wrapped = callable

        # Remember the channel addition order to order output.

        self._reset()

        for i in sequence:
            self.results.append(None) # placeholder
            self.start(wrapped, i)
        self.finish()

        # NOTE: Could return results as they arrive, but we would then need to
        # NOTE: return the position of each result in the original sequence.

        return self.results

    def store_data(self, channel):

        "Accumulate the incoming data, associating results with channels."

        data = channel.receive()
        self.results[self.channels[channel]] = data
        del self.channels[channel]

class Queue(Exchange):

    """
    An exchange acting as a queue, making data from created processes available
    in the order in which it is received.
    """

    def __init__(self, *args, **kw):

        "Initialise the exchange, accepting the same arguments as Exchange."

        Exchange.__init__(self, *args, **kw)

        # New data is inserted at the front and consumed from the end.

        self.queue = []

    def store_data(self, channel):

        "Accumulate the incoming data in the queue."

        data = channel.receive()
        self.queue.insert(0, data)

    def __iter__(self):

        "Return this queue as its own iterator."

        return self

    def next(self):

        """
        Return the next element in the queue, waiting for data from the active
        channels if nothing has been received yet. Raise StopIteration when the
        queue is empty and no channels remain active.
        """

        if self.queue:
            return self.queue.pop()
        while self.active():
            self.store()
            if self.queue:
                return self.queue.pop()
        else:
            raise StopIteration

    # Alias required by the iterator protocol of newer Python versions.

    __next__ = next

class MakeParallel:

    "A wrapper around functions making them able to communicate results."

    def __init__(self, callable):

        """
        Initialise the wrapper with the given 'callable'. This object will then
        be able to accept a 'channel' parameter when invoked, and to forward the
        result of the given 'callable' via the channel provided back to the
        invoking process.
        """

        self.callable = callable

    def __call__(self, channel, *args, **kw):

        "Invoke the callable and return its result via the given 'channel'."

        channel.send(self.callable(*args, **kw))

# Utility functions.

def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process.

    In the created process the returned channel has a 'pid' of 0; in the
    creating process it carries the process identifier of the created process.
    """

    parent, child = socket.socketpair()
    for s in [parent, child]:
        s.setblocking(1)

    pid = os.fork()
    if pid == 0:
        parent.close()
        return Channel(pid, child.makefile("r", 0), child.makefile("w", 0))
    else:
        child.close()
        return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))

def exit(channel):

    """
    Terminate a created process, closing the given 'channel'.
    """

    channel.close()
    os._exit(0)

def start(callable, *args, **kw):

    """
    Create a new process which shall start running in the given 'callable'.
    Additional arguments to the 'callable' can be given as additional arguments
    to this function.

    Return a communications channel to the creating process. For the created
    process, supply a channel as the 'channel' parameter in the given 'callable'
    so that it may send data back to the creating process.

    Anything raised by the 'callable' is sent through the channel to the
    creating process; the created process always terminates afterwards.
    """

    channel = create()
    if channel.pid == 0:
        try:
            try:
                callable(channel, *args, **kw)
            except:

                # Deliberately catch everything: the created process must
                # report the problem and must never return into code intended
                # for the creating process. Only the exception value is kept,
                # avoiding a lingering reference to the traceback.

                channel.send(sys.exc_info()[1])
        finally:
            exit(channel)
    else:
        return channel

def waitall():

    "Wait for all created processes to terminate."

    # os.wait raises OSError once there are no more processes to wait for.

    try:
        while 1:
            os.wait()
    except OSError:
        pass

def pmap(callable, sequence, limit=None):

    """
    A parallel version of the built-in map function with an optional process
    'limit'. The given 'callable' should not be parallel-aware (that is, have a
    'channel' parameter) since it will be wrapped for parallel communications
    before being invoked.

    Return the processed 'sequence' where each element in the sequence is
    processed by a different process.
    """

    mymap = Map(limit=limit)
    return mymap(callable, sequence)

# vim: tabstop=4 expandtab shiftwidth=4