#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Copyright (C) 2005, 2006, 2007, 2008 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the Free
Software Foundation; either version 3 of the License, or (at your option) any
later version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
details.

You should have received a copy of the GNU Lesser General Public License along
with this program. If not, see <http://www.gnu.org/licenses/>.
"""

__version__ = "0.3.2"

import os
import sys
import select
import socket

try:
    import cPickle as pickle
except ImportError:
    import pickle

try:
    set
except NameError:
    from sets import Set as set

# Communications.

class AcknowledgementError(Exception):

    """
    Raised by 'Channel.send' when the peer replies to a sent object with
    something other than the "OK" acknowledgement.
    """

    pass

class Channel:

    """
    A communications channel: a pair of file objects through which pickled
    objects are sent to and received from another process.
    """

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.

        A 'pid' of zero indicates that this end of the channel has no child
        process to wait for.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe

    def __del__(self):

        # Since signals don't work well with I/O, pipes are closed upon
        # finalisation. Waiting for created processes is left to explicit
        # calls to 'wait' (or to the module-level 'waitall').

        self.close()

    def close(self):

        "Explicitly close the channel. Closing an already closed channel is harmless."

        if self.read_pipe is not None:
            self.read_pipe.close()
            self.read_pipe = None
        if self.write_pipe is not None:
            self.write_pipe.close()
            self.write_pipe = None

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit. The 'options' are
        passed to 'os.waitpid' (for example, 'os.WNOHANG').
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                # Typically ECHILD: the process was already reaped elsewhere.
                pass

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)

        Raise AcknowledgementError if the reply is not the expected "OK".
        """

        self._send(obj)
        if self._receive() != "OK":
            raise AcknowledgementError(obj)

    def _receive(self):

        """
        Receive an object through the channel, returning the object. If the
        received object is an exception, it is raised instead.
        """

        # NOTE: unpickling can execute arbitrary code; only connect channels
        # to trusted processes (persistent channels listen on a filesystem
        # address that any local user with access could connect to).

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        else:
            return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)
        """

        # The acknowledgement is sent even if receiving raises (for example,
        # a forwarded exception), so that the sender is always released.

        try:
            obj = self._receive()
            return obj
        finally:
            self._send("OK")

class PersistentChannel(Channel):

    """
    A persistent communications channel which can handle peer disconnection,
    acting as a server, meaning that this channel is associated with a specific
    address which can be contacted by other processes.
    """

    def __init__(self, pid, endpoint, address):

        """
        Initialise the channel with a process identifier 'pid', a bound
        listening socket 'endpoint' and the filesystem 'address' it is bound
        to. Blocks until a first peer connects.
        """

        Channel.__init__(self, pid, None, None)
        self.endpoint = endpoint
        self.address = address
        self.poller = select.poll()
        self._ensure_pipes()

    def close(self):

        """
        Close the channel and its listening endpoint, and remove the socket
        file at the channel's address. Closing twice is harmless.
        """

        Channel.close(self)
        if self.endpoint is not None:
            self.endpoint.close()
            self.endpoint = None
        try:
            os.unlink(self.address)
        except OSError:
            # The socket file may already have been removed.
            pass

    def _ensure_pipes(self):

        "Ensure that the channel is capable of communicating."

        if self.read_pipe is None or self.write_pipe is None:

            # Block until a peer connects to the endpoint.

            self.endpoint.listen(1)
            endpoint, address = self.endpoint.accept()
            self.read_pipe = endpoint.makefile("r", 0)
            self.write_pipe = endpoint.makefile("w", 0)

            # Monitor the write pipe for error conditions.

            fileno = self.write_pipe.fileno()
            self.poller.register(fileno, select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def _reset_pipes(self):

        """
        Discard the pipes of a broken connection, closing them so that their
        descriptors are released, and leave the channel ready to accept a new
        connection.
        """

        # Unregister before closing: the descriptor is invalid afterwards.

        self.poller.unregister(self.write_pipe.fileno())

        for pipe in (self.read_pipe, self.write_pipe):
            try:
                pipe.close()
            except (IOError, OSError, socket.error):
                # The peer has already gone; closing is best-effort.
                pass

        self.read_pipe = None
        self.write_pipe = None

    def _ensure_communication(self, timeout=None):

        """
        Ensure that sending and receiving are possible, replacing a broken
        connection with a newly accepted one if necessary. The optional
        'timeout' is passed to 'poll'.
        """

        while 1:
            fileno = self.write_pipe.fileno()
            fds = self.poller.poll(timeout)
            for fd, status in fds:
                if fd != fileno:
                    continue
                if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):

                    # Broken connection: wait for a new peer, then poll again.

                    self._reset_pipes()
                    self._ensure_pipes()
                    break
            else:
                return

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        self._ensure_communication()
        Channel._send(self, obj)

    def _receive(self):

        "Receive an object through the channel, returning the object."

        self._ensure_communication()
        return Channel._receive(self)

# Management of processes and communications.
class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate. Subclasses of this class can define the 'store_data'
    method in order to enable the 'add_wait', 'wait' and 'finish' methods.
    """

    def __init__(self, channels=None, limit=None, reuse=0, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'.

        If the optional 'limit' is specified, restrictions on the addition of
        new channels can be enforced and observed through the 'add_wait', 'wait'
        and 'finish' methods. To make use of these methods, create a subclass of
        this class and define a working 'store_data' method.

        If the optional 'reuse' parameter is set to a true value, channels and
        processes will be reused for waiting computations.

        If the optional 'autoclose' parameter is set to a false value, channels
        will not be closed automatically when they are removed from the exchange
        - by default they are closed when removed.
        """

        self.limit = limit
        self.reuse = reuse
        self.autoclose = autoclose

        # Queued (callable, args, kw) entries: inserted at the front and
        # popped from the end, so they are started in submission order.

        self.waiting = []

        # Maps each channel's read pipe file descriptor to the channel.

        self.readables = {}

        # Channels removed during the most recent call to 'ready'.

        self.removed = []

        self.poller = select.poll()
        for channel in channels or []:
            self.add(channel)

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        fileno = channel.read_pipe.fileno()
        self.readables[fileno] = channel
        self.poller.register(fileno, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        return self.readables.values()

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (or until
        communication is possible) and return a list of channels which are ready
        to be read from. Channels reporting hang-up or error conditions are
        removed from the exchange and recorded in the 'removed' attribute.
        """

        fds = self.poller.poll(timeout)
        readables = []
        self.removed = []

        for fd, status in fds:
            channel = self.readables[fd]
            removed = 0

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)
                self.removed.append(channel)
                removed = 1

            # Record readable channels. A removed channel has been closed when
            # autoclose is enabled, so it can only be reported otherwise.

            if status & select.POLLIN:
                if not (removed and self.autoclose):
                    readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange, closing it and waiting
        for its process if autoclose is enabled.
        """

        fileno = channel.read_pipe.fileno()
        del self.readables[fileno]
        self.poller.unregister(fileno)
        if self.autoclose:
            channel.close()
            channel.wait()

    # Enhanced exchange methods involving channel limits.

    def add_wait(self, channel):

        """
        Add the given 'channel' to the exchange, waiting if the limit on active
        channels would be exceeded by adding the channel.
        """

        self.wait()
        self.add(channel)

    def wait(self):

        """
        Test for the limit on channels, blocking and reading incoming data until
        the number of channels is below the limit.
        """

        # If limited, block until channels have been closed.

        while self.limit is not None and len(self.active()) >= self.limit:
            self.store()

    def _get_waiting(self, channel):

        """
        Get waiting callable and argument information for new processes, given
        the reception of data on the given 'channel'.

        Return a (callable, args, kw) tuple for a process that must be created,
        or None if there is nothing to create (no waiting work, or the work was
        handed to the reused 'channel').
        """

        if self.waiting:
            callable, args, kw = self.waiting.pop()

            # Try and reuse existing channels if possible.

            if self.reuse:

                # Re-add the channel - this may update information related to
                # the channel in subclasses.

                self.add(channel)
                channel.send((args, kw))
            else:
                return callable, args, kw

        # Where channels are being reused, but where no processes are waiting
        # any more, send a special value to tell them to quit.

        elif self.reuse:
            channel.send(None)

        return None

    def finish(self):

        """
        Finish the use of the exchange by waiting for all channels to complete.
        """

        while self.active():
            self.store()

    def store(self):

        "For each ready channel, process the incoming data."

        for channel in self.ready():
            self.store_data(channel)
            self.start_waiting(channel)

    def store_data(self, channel):

        """
        Store incoming data from the specified 'channel'. In subclasses of this
        class, such data could be stored using instance attributes.
        """

        raise NotImplementedError("store_data")

    # Support for the convenience methods.

    def _set_waiting(self, callable, args, kw):

        """
        Support process creation by returning whether the given 'callable' has
        been queued for later invocation (because the channel limit has been
        reached).
        """

        if self.limit is not None and len(self.active()) >= self.limit:
            self.waiting.insert(0, (callable, args, kw))
            return 1
        else:
            return 0

    def _get_channel_for_process(self, channel):

        """
        Support process creation by returning the given 'channel' to the
        creating process, and None to the created process.
        """

        # The created process sees a pid of zero (see 'create'); the creating
        # process keeps monitoring the channel through this exchange.

        if channel.pid == 0:
            return channel
        else:
            self.add_wait(channel)
            return None

    # Methods for overriding, related to the convenience methods.

    def start_waiting(self, channel):

        """
        Start a waiting process given the reception of data on the given
        'channel'.
        """

        details = self._get_waiting(channel)
        if details is not None:
            callable, args, kw = details

            # 'start' here is the module-level function.

            self.add(start(callable, *args, **kw))

    # Convenience methods.

    def start(self, callable, *args, **kw):

        """
        Create a new process for the given 'callable' using any additional
        arguments provided. Then, monitor the channel created between this
        process and the created process.
        """

        if self._set_waiting(callable, args, kw):
            return

        self.add_wait(start(callable, *args, **kw))

    def create(self):

        """
        Create a new process and return the created communications channel to
        the created process. In the creating process, return None - the channel
        receiving data from the created process will be automatically managed by
        this exchange.
        """

        channel = create()
        return self._get_channel_for_process(channel)

    def manage(self, callable):

        """
        Wrap the given 'callable' in an object which can then be called in the
        same way as 'callable', but with new processes and communications
        managed automatically.
        """

        return ManagedCallable(callable, self)

class Persistent:

    """
    A mix-in class providing methods to exchanges for the management of
    persistent communications.
    """

    def start_waiting(self, channel):

        """
        Start a waiting process given the reception of data on the given
        'channel'.
        """

        details = self._get_waiting(channel)
        if details is not None:

            # 'start' queues the address together with the callable, since the
            # ready 'channel' (a plain Channel) carries no address of its own.

            (address, callable), args, kw = details

            # NOTE(review): 'start' discards the channel returned by
            # 'start_persistent', whereas this method adds it to the exchange;
            # confirm which behaviour is intended.

            self.add(start_persistent(address, callable, *args, **kw))

    def start(self, address, callable, *args, **kw):

        """
        Create a new process, located at the given 'address', for the given
        'callable' using any additional arguments provided. Then, monitor the
        channel created between this process and the created process.
        """

        # Queue the address with the callable so that 'start_waiting' can
        # later create the process at the requested address. (Reused channels
        # only receive 'args' and 'kw', which are left unchanged.)

        if self._set_waiting((address, callable), args, kw):
            return

        start_persistent(address, callable, *args, **kw)

    def create(self, address):

        """
        Create a new process, located at the given 'address', and return the
        created communications channel to the created process. In the creating
        process, return None - the channel receiving data from the created
        process will be automatically managed by this exchange.
        """

        channel = create_persistent(address)
        return self._get_channel_for_process(channel)

    def manage(self, address, callable):

        """
        Using the given 'address', publish the given 'callable' in an object
        which can then be called in the same way as 'callable', but with new
        processes and communications managed automatically.
        """

        return PersistentCallable(address, callable, self)

    def connect(self, address):

        "Connect to a process which is contactable via the given 'address'."

        channel = connect_persistent(address)
        self.add_wait(channel)

class ManagedCallable:

    "A callable managed by an exchange."

    def __init__(self, callable, exchange):

        """
        Wrap the given 'callable', using the given 'exchange' to monitor the
        channels created for communications between this and the created
        processes. Note that the 'callable' must be parallel-aware (that is,
        have a 'channel' parameter). Use the MakeParallel class to wrap other
        kinds of callable objects.
        """

        self.callable = callable
        self.exchange = exchange

    def __call__(self, *args, **kw):

        "Invoke the callable with the supplied arguments."

        self.exchange.start(self.callable, *args, **kw)

class PersistentCallable:

    "A callable which sets up a persistent communications channel."

    def __init__(self, address, callable, exchange):

        """
        Using the given 'address', wrap the given 'callable', using the given
        'exchange' to monitor the channels created for communications between
        this and the created processes, so that when it is called, a background
        process is started within which the 'callable' will run. Note that the
        'callable' must be parallel-aware (that is, have a 'channel' parameter).
        Use the MakeParallel class to wrap other kinds of callable objects.
        """

        self.callable = callable
        self.exchange = exchange
        self.address = address

    def __call__(self, *args, **kw):

        "Invoke the callable and discard the result."

        self.exchange.start(self.address, self.callable, *args, **kw)

# Abstractions and utilities.

class Map(Exchange):

    "An exchange which can be used like the built-in 'map' function."

    def __init__(self, *args, **kw):
        Exchange.__init__(self, *args, **kw)
        self.init()

    def init(self):

        "Remember the channel addition order to order output."

        self.channel_number = 0
        self.channels = {}
        self.results = []

    def add(self, channel):

        "Add the given 'channel' to the exchange, recording its result slot."

        Exchange.add(self, channel)
        self.channels[channel] = self.channel_number
        self.channel_number += 1

    def start(self, callable, *args, **kw):

        """
        Create a new process for the given 'callable' using any additional
        arguments provided. Then, monitor the channel created between this
        process and the created process.
        """

        self.results.append(None) # placeholder
        Exchange.start(self, callable, *args, **kw)

    def create(self):

        """
        Create a new process and return the created communications channel to
        the created process. In the creating process, return None - the channel
        receiving data from the created process will be automatically managed by
        this exchange.
        """

        self.results.append(None) # placeholder
        return Exchange.create(self)

    def __call__(self, callable, sequence):

        "Wrap and invoke 'callable' for each element in the 'sequence'."

        if not isinstance(callable, MakeParallel):
            wrapped = MakeParallel(callable)
        else:
            wrapped = callable

        self.init()

        # Start processes for each element in the sequence.

        for i in sequence:
            self.start(wrapped, i)

        # Access to the results occurs through this object.

        return self

    def __getitem__(self, i):
        self.finish()
        return self.results[i]

    def __iter__(self):
        self.finish()
        return iter(self.results)

    def store_data(self, channel):

        "Accumulate the incoming data, associating results with channels."

        data = channel.receive()
        self.results[self.channels[channel]] = data
        del self.channels[channel]

class Queue(Exchange):

    """
    An exchange acting as a queue, making data from created processes available
    in the order in which it is received.
    """

    def __init__(self, *args, **kw):
        Exchange.__init__(self, *args, **kw)

        # Received data: new items at the front, oldest popped from the end.

        self.queue = []

    def store_data(self, channel):

        "Accumulate the incoming data, associating results with channels."

        data = channel.receive()
        self.queue.insert(0, data)

    def __iter__(self):
        return self

    def next(self):

        """
        Return the next element in the queue, reading from active channels if
        necessary. Raise StopIteration when no data remains.
        """

        if self.queue:
            return self.queue.pop()
        while self.active():
            self.store()
            if self.queue:
                return self.queue.pop()
        else:
            raise StopIteration

    # Iterator protocol name used by Python 3.

    __next__ = next

class MakeParallel:

    "A wrapper around functions making them able to communicate results."

    def __init__(self, callable):

        """
        Initialise the wrapper with the given 'callable'. This object will then
        be able to accept a 'channel' parameter when invoked, and to forward the
        result of the given 'callable' via the channel provided back to the
        invoking process.
        """

        self.callable = callable

    def __call__(self, channel, *args, **kw):

        "Invoke the callable and return its result via the given 'channel'."

        channel.send(self.callable(*args, **kw))

class MakeReusable(MakeParallel):

    """
    A wrapper around functions making them able to communicate results in a
    reusable fashion.
    """

    def __call__(self, channel, *args, **kw):

        """
        Invoke the callable and return its result via the given 'channel', then
        keep serving further (args, kw) requests received on the channel until
        None is received.
        """

        channel.send(self.callable(*args, **kw))
        t = channel.receive()
        while t is not None:
            args, kw = t
            channel.send(self.callable(*args, **kw))
            t = channel.receive()

# Persistent variants.

class PersistentExchange(Persistent, Exchange):

    "An exchange which manages persistent communications."

    pass

class PersistentQueue(Persistent, Queue):

    "A queue which manages persistent communications."

    pass

# Utility functions.

_cpuinfo_fields = "physical id", "core id"

def get_number_of_cores(filename="/proc/cpuinfo"):

    """
    Return the number of distinct, genuine processor cores, as described by
    the given 'filename' (by default the Linux "/proc/cpuinfo" file). If the
    platform is not supported by this function - the file cannot be read or
    parsed, or it does not describe the processor topology - None is returned.
    """

    try:
        f = open(filename)
    except (IOError, OSError):

        # Python 2 raises IOError (not OSError) for a missing file, so both
        # are caught for unsupported platforms to yield None.

        return None

    try:
        try:
            processors = set()

            # (physical id, core id) of the processor entry being read.

            processor = [None, None]

            for line in f:
                for i, field in enumerate(_cpuinfo_fields):
                    if line.startswith(field):
                        t = line.split(":")
                        processor[i] = int(t[1].strip())
                        break
                else:

                    # A new "processor" entry completes the previous one.

                    if line.startswith("processor") and processor[0] is not None:
                        processors.add(tuple(processor))
                        processor = [None, None]

            if processor[0] is not None:
                processors.add(tuple(processor))

        except (IOError, OSError, ValueError, IndexError):

            # Unreadable or unexpectedly formatted content.

            return None

    finally:
        f.close()

    # Without any "physical id" fields the topology is unknown: report the
    # platform as unsupported rather than claiming zero cores.

    return len(processors) or None

def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process.
    """

    parent, child = socket.socketpair()
    for s in [parent, child]:
        s.setblocking(1)

    pid = os.fork()

    # Each process keeps its own end of the socket pair and closes the other.

    if pid == 0:
        parent.close()
        return Channel(pid, child.makefile("r", 0), child.makefile("w", 0))
    else:
        child.close()
        return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))

def create_persistent(address):

    """
    Create a new process, returning a persistent communications channel between
    the creating process and the created process. This channel can be
    disconnected from the creating process and connected to another process, and
    thus can be used to collect results from daemon processes.

    In order to be able to reconnect to created processes, the 'address' of the
    communications endpoint for the created process needs to be provided. This
    should be a filename.
    """

    parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    child = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    child.bind(address)

    for s in [parent, child]:
        s.setblocking(1)

    pid = os.fork()
    if pid == 0:
        parent.close()
        return PersistentChannel(pid, child, address)
    else:
        child.close()

        # NOTE(review): the creating process's socket is never connected to
        # 'address' here (the connection was disabled in the original code),
        # so the returned channel cannot carry data; 'connect_persistent' is
        # the way to reach the created process.

        return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))

def connect_persistent(address):

    """
    Connect via a persistent channel to an existing created process, reachable
    at the given 'address'.
    """

    parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    parent.setblocking(1)
    parent.connect(address)
    return Channel(0, parent.makefile("r", 0), parent.makefile("w", 0))

def exit(channel):

    """
    Terminate a created process, closing the given 'channel'.
    """

    channel.close()

    # Exit immediately, without running the creating program's cleanup code
    # that the forked process inherited.

    os._exit(0)

def _run_in_created_process(channel, callable, args, kw):

    "Run 'callable' in a created process, report any failure, then exit."

    try:
        try:
            callable(channel, *args, **kw)
        except:

            # Deliberately broad: any failure in the created process is
            # forwarded to the creating process rather than lost.

            channel.send(sys.exc_info()[1])
    finally:
        exit(channel)

def start(callable, *args, **kw):

    """
    Create a new process which shall start running in the given 'callable'.
    Additional arguments to the 'callable' can be given as additional arguments
    to this function.

    Return a communications channel to the creating process. For the created
    process, supply a channel as the 'channel' parameter in the given 'callable'
    so that it may send data back to the creating process.
    """

    channel = create()
    if channel.pid == 0:
        _run_in_created_process(channel, callable, args, kw)
    else:
        return channel

def start_persistent(address, callable, *args, **kw):

    """
    Create a new process which shall be reachable using the given 'address' and
    which will start running in the given 'callable'. Additional arguments to
    the 'callable' can be given as additional arguments to this function.

    Return a communications channel to the creating process. For the created
    process, supply a channel as the 'channel' parameter in the given 'callable'
    so that it may send data back to the creating process.

    Note that the created process employs a channel which is persistent: it can
    withstand disconnection from the creating process and subsequent connections
    from other processes.
    """

    channel = create_persistent(address)
    if channel.pid == 0:
        close_streams()
        _run_in_created_process(channel, callable, args, kw)
    else:
        return channel

def close_streams():

    """
    Close streams which keep the current process attached to any creating
    processes.
    """

    os.close(sys.stdin.fileno())
    os.close(sys.stdout.fileno())
    os.close(sys.stderr.fileno())

def waitall():

    "Wait for all created processes to terminate."

    # os.wait raises OSError (ECHILD) once no child processes remain.

    try:
        while 1:
            os.wait()
    except OSError:
        pass

def pmap(callable, sequence, limit=None):

    """
    A parallel version of the built-in map function with an optional process
    'limit'. The given 'callable' should not be parallel-aware (that is, have a
    'channel' parameter) since it will be wrapped for parallel communications
    before being invoked.

    Return the processed 'sequence' where each element in the sequence is
    processed by a different process.
    """

    mymap = Map(limit=limit)
    return mymap(callable, sequence)

# vim: tabstop=4 expandtab shiftwidth=4