1 #!/usr/bin/env python 2 3 """ 4 A simple parallel processing API for Python, inspired somewhat by the thread 5 module, slightly less by pypar, and slightly less still by pypvm. 6 7 Copyright (C) 2005, 2006, 2007, 2008, 2009 Paul Boddie <paul@boddie.org.uk> 8 9 This program is free software; you can redistribute it and/or modify it under 10 the terms of the GNU Lesser General Public License as published by the Free 11 Software Foundation; either version 3 of the License, or (at your option) any 12 later version. 13 14 This program is distributed in the hope that it will be useful, but WITHOUT 15 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 16 FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 17 details. 18 19 You should have received a copy of the GNU Lesser General Public License along 20 with this program. If not, see <http://www.gnu.org/licenses/>. 21 """ 22 23 __version__ = "0.4.1" 24 25 import os 26 import sys 27 import select 28 import socket 29 import platform 30 31 try: 32 import cPickle as pickle 33 except ImportError: 34 import pickle 35 36 try: 37 set 38 except NameError: 39 from sets import Set as set 40 41 # Communications. 42 43 class AcknowledgementError(Exception): 44 pass 45 46 class Channel: 47 48 "A communications channel." 49 50 def __init__(self, pid, read_pipe, write_pipe): 51 52 """ 53 Initialise the channel with a process identifier 'pid', a 'read_pipe' 54 from which messages will be received, and a 'write_pipe' into which 55 messages will be sent. 56 """ 57 58 self.pid = pid 59 self.read_pipe = read_pipe 60 self.write_pipe = write_pipe 61 62 def __del__(self): 63 64 # Since signals don't work well with I/O, we close pipes and wait for 65 # created processes upon finalisation. 66 67 self.close() 68 69 def close(self): 70 71 "Explicitly close the channel." 72 73 if self.read_pipe is not None: 74 self.read_pipe.close() 75 self.read_pipe = None 76 if self.write_pipe is not None: 77 self.write_pipe.close() 78 self.write_pipe = None 79 #self.wait(os.WNOHANG) 80 81 def wait(self, options=0): 82 83 "Wait for the created process, if any, to exit." 84 85 if self.pid != 0: 86 try: 87 os.waitpid(self.pid, options) 88 except OSError: 89 pass 90 91 def _send(self, obj): 92 93 "Send the given object 'obj' through the channel." 94 95 pickle.dump(obj, self.write_pipe) 96 self.write_pipe.flush() 97 98 def send(self, obj): 99 100 """ 101 Send the given object 'obj' through the channel. Then wait for an 102 acknowledgement. (The acknowledgement makes the caller wait, thus 103 preventing processes from exiting and disrupting the communications 104 channel and losing data.) 105 """ 106 107 self._send(obj) 108 if self._receive() != "OK": 109 raise AcknowledgementError, obj 110 111 def _receive(self): 112 113 "Receive an object through the channel, returning the object." 114 115 obj = pickle.load(self.read_pipe) 116 if isinstance(obj, Exception): 117 raise obj 118 else: 119 return obj 120 121 def receive(self): 122 123 """ 124 Receive an object through the channel, returning the object. Send an 125 acknowledgement of receipt. (The acknowledgement makes the sender wait, 126 thus preventing processes from exiting and disrupting the communications 127 channel and losing data.) 128 """ 129 130 try: 131 obj = self._receive() 132 return obj 133 finally: 134 self._send("OK") 135 136 class PersistentChannel(Channel): 137 138 """ 139 A persistent communications channel which can handle peer disconnection, 140 acting as a server, meaning that this channel is associated with a specific 141 address which can be contacted by other processes. 142 """ 143 144 def __init__(self, pid, endpoint, address): 145 Channel.__init__(self, pid, None, None) 146 self.endpoint = endpoint 147 self.address = address 148 self.poller = select.poll() 149 150 # Listen for connections before this process is interested in 151 # communicating. It is not desirable to wait for connections at this 152 # point because this will block the process. 153 154 self.endpoint.listen(1) 155 156 def close(self): 157 158 "Close the persistent channel and remove the socket file." 159 160 Channel.close(self) 161 try: 162 os.unlink(self.address) 163 except OSError: 164 pass 165 166 def _ensure_pipes(self): 167 168 "Ensure that the channel is capable of communicating." 169 170 if self.read_pipe is None or self.write_pipe is None: 171 172 # Accept any incoming connections. 173 174 endpoint, address = self.endpoint.accept() 175 self.read_pipe = endpoint.makefile("r", 0) 176 self.write_pipe = endpoint.makefile("w", 0) 177 178 # Monitor the write pipe for error conditions. 179 180 fileno = self.write_pipe.fileno() 181 self.poller.register(fileno, select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR) 182 183 def _reset_pipes(self): 184 185 "Discard the existing connection." 186 187 fileno = self.write_pipe.fileno() 188 self.poller.unregister(fileno) 189 self.read_pipe = None 190 self.write_pipe = None 191 self.endpoint.listen(1) 192 193 def _ensure_communication(self, timeout=None): 194 195 "Ensure that sending and receiving are possible." 196 197 while 1: 198 self._ensure_pipes() 199 fileno = self.write_pipe.fileno() 200 fds = self.poller.poll(timeout) 201 for fd, status in fds: 202 if fd != fileno: 203 continue 204 if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR): 205 206 # Broken connection: discard it and start all over again. 207 208 self._reset_pipes() 209 break 210 else: 211 return 212 213 def _send(self, obj): 214 215 "Send the given object 'obj' through the channel." 216 217 self._ensure_communication() 218 Channel._send(self, obj) 219 220 def _receive(self): 221 222 "Receive an object through the channel, returning the object." 223 224 self._ensure_communication() 225 return Channel._receive(self) 226 227 # Management of processes and communications. 228 229 class Exchange: 230 231 """ 232 A communications exchange that can be used to detect channels which are 233 ready to communicate. Subclasses of this class can define the 'store_data' 234 method in order to enable the 'add_wait', 'wait' and 'finish' methods. 235 """ 236 237 def __init__(self, channels=None, limit=None, reuse=0, autoclose=1): 238 239 """ 240 Initialise the exchange with an optional list of 'channels'. 241 242 If the optional 'limit' is specified, restrictions on the addition of 243 new channels can be enforced and observed through the 'add_wait', 'wait' 244 and 'finish' methods. To make use of these methods, create a subclass of 245 this class and define a working 'store_data' method. 246 247 If the optional 'reuse' parameter is set to a true value, channels and 248 processes will be reused for waiting computations. 249 250 If the optional 'autoclose' parameter is set to a false value, channels 251 will not be closed automatically when they are removed from the exchange 252 - by default they are closed when removed. 253 """ 254 255 self.limit = limit 256 self.reuse = reuse 257 self.autoclose = autoclose 258 self.waiting = [] 259 self.readables = {} 260 self.removed = [] 261 self.poller = select.poll() 262 for channel in channels or []: 263 self.add(channel) 264 265 def add(self, channel): 266 267 "Add the given 'channel' to the exchange." 268 269 fileno = channel.read_pipe.fileno() 270 self.readables[fileno] = channel 271 self.poller.register(fileno, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR) 272 273 def active(self): 274 275 "Return a list of active channels." 276 277 return self.readables.values() 278 279 def ready(self, timeout=None): 280 281 """ 282 Wait for a period of time specified by the optional 'timeout' (or until 283 communication is possible) and return a list of channels which are ready 284 to be read from. 285 """ 286 287 fds = self.poller.poll(timeout) 288 readables = [] 289 self.removed = [] 290 291 for fd, status in fds: 292 channel = self.readables[fd] 293 removed = 0 294 295 # Remove ended/error channels. 296 297 if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR): 298 self.remove(channel) 299 self.removed.append(channel) 300 removed = 1 301 302 # Record readable channels. 303 304 if status & select.POLLIN: 305 if not (removed and self.autoclose): 306 readables.append(channel) 307 308 return readables 309 310 def remove(self, channel): 311 312 """ 313 Remove the given 'channel' from the exchange. 314 """ 315 316 fileno = channel.read_pipe.fileno() 317 del self.readables[fileno] 318 self.poller.unregister(fileno) 319 if self.autoclose: 320 channel.close() 321 channel.wait() 322 323 # Enhanced exchange methods involving channel limits. 324 325 def add_wait(self, channel): 326 327 """ 328 Add the given 'channel' to the exchange, waiting if the limit on active 329 channels would be exceeded by adding the channel. 330 """ 331 332 self.wait() 333 self.add(channel) 334 335 def wait(self): 336 337 """ 338 Test for the limit on channels, blocking and reading incoming data until 339 the number of channels is below the limit. 340 """ 341 342 # If limited, block until channels have been closed. 343 344 while self.limit is not None and len(self.active()) >= self.limit: 345 self.store() 346 347 def _get_waiting(self, channel): 348 349 """ 350 Get waiting callable and argument information for new processes, given 351 the reception of data on the given 'channel'. 352 """ 353 354 if self.waiting: 355 callable, args, kw = self.waiting.pop() 356 357 # Try and reuse existing channels if possible. 358 359 if self.reuse: 360 361 # Re-add the channel - this may update information related to 362 # the channel in subclasses. 363 364 self.add(channel) 365 channel.send((args, kw)) 366 else: 367 return callable, args, kw 368 369 # Where channels are being reused, but where no processes are waiting 370 # any more, send a special value to tell them to quit. 371 372 elif self.reuse: 373 channel.send(None) 374 375 return None 376 377 def finish(self): 378 379 """ 380 Finish the use of the exchange by waiting for all channels to complete. 381 """ 382 383 while self.active(): 384 self.store() 385 386 def store(self): 387 388 "For each ready channel, process the incoming data." 389 390 for channel in self.ready(): 391 self.store_data(channel) 392 self.start_waiting(channel) 393 394 def store_data(self, channel): 395 396 """ 397 Store incoming data from the specified 'channel'. In subclasses of this 398 class, such data could be stored using instance attributes. 399 """ 400 401 raise NotImplementedError, "store_data" 402 403 # Support for the convenience methods. 404 405 def _set_waiting(self, callable, args, kw): 406 407 """ 408 Support process creation by returning whether the given 'callable' has 409 been queued for later invocation. 410 """ 411 412 if self.limit is not None and len(self.active()) >= self.limit: 413 self.waiting.insert(0, (callable, args, kw)) 414 return 1 415 else: 416 return 0 417 418 def _get_channel_for_process(self, channel): 419 420 """ 421 Support process creation by returning the given 'channel' to the 422 creating process, and None to the created process. 423 """ 424 425 if channel.pid == 0: 426 return channel 427 else: 428 self.add_wait(channel) 429 return None 430 431 # Methods for overriding, related to the convenience methods. 432 433 def start_waiting(self, channel): 434 435 """ 436 Start a waiting process given the reception of data on the given 437 'channel'. 438 """ 439 440 details = self._get_waiting(channel) 441 if details is not None: 442 callable, args, kw = details 443 self.add(start(callable, *args, **kw)) 444 445 # Convenience methods. 446 447 def start(self, callable, *args, **kw): 448 449 """ 450 Create a new process for the given 'callable' using any additional 451 arguments provided. Then, monitor the channel created between this 452 process and the created process. 453 """ 454 455 if self._set_waiting(callable, args, kw): 456 return 457 458 self.add_wait(start(callable, *args, **kw)) 459 460 def create(self): 461 462 """ 463 Create a new process and return the created communications channel to 464 the created process. In the creating process, return None - the channel 465 receiving data from the created process will be automatically managed by 466 this exchange. 467 """ 468 469 channel = create() 470 return self._get_channel_for_process(channel) 471 472 def manage(self, callable): 473 474 """ 475 Wrap the given 'callable' in an object which can then be called in the 476 same way as 'callable', but with new processes and communications 477 managed automatically. 478 """ 479 480 return ManagedCallable(callable, self) 481 482 class Persistent: 483 484 """ 485 A mix-in class providing methods to exchanges for the management of 486 persistent communications. 487 """ 488 489 def start_waiting(self, channel): 490 491 """ 492 Start a waiting process given the reception of data on the given 493 'channel'. 494 """ 495 496 details = self._get_waiting(channel) 497 if details is not None: 498 callable, args, kw = details 499 self.add(start_persistent(channel.address, callable, *args, **kw)) 500 501 def start(self, address, callable, *args, **kw): 502 503 """ 504 Create a new process, located at the given 'address', for the given 505 'callable' using any additional arguments provided. Then, monitor the 506 channel created between this process and the created process. 507 """ 508 509 if self._set_waiting(callable, args, kw): 510 return 511 512 start_persistent(address, callable, *args, **kw) 513 514 def create(self, address): 515 516 """ 517 Create a new process, located at the given 'address', and return the 518 created communications channel to the created process. In the creating 519 process, return None - the channel receiving data from the created 520 process will be automatically managed by this exchange. 521 """ 522 523 channel = create_persistent(address) 524 return self._get_channel_for_process(channel) 525 526 def manage(self, address, callable): 527 528 """ 529 Using the given 'address', publish the given 'callable' in an object 530 which can then be called in the same way as 'callable', but with new 531 processes and communications managed automatically. 532 """ 533 534 return PersistentCallable(address, callable, self) 535 536 def connect(self, address): 537 538 "Connect to a process which is contactable via the given 'address'." 539 540 channel = connect_persistent(address) 541 self.add_wait(channel) 542 543 class ManagedCallable: 544 545 "A callable managed by an exchange." 546 547 def __init__(self, callable, exchange): 548 549 """ 550 Wrap the given 'callable', using the given 'exchange' to monitor the 551 channels created for communications between this and the created 552 processes. Note that the 'callable' must be parallel-aware (that is, 553 have a 'channel' parameter). Use the MakeParallel class to wrap other 554 kinds of callable objects. 555 """ 556 557 self.callable = callable 558 self.exchange = exchange 559 560 def __call__(self, *args, **kw): 561 562 "Invoke the callable with the supplied arguments." 563 564 self.exchange.start(self.callable, *args, **kw) 565 566 class PersistentCallable: 567 568 "A callable which sets up a persistent communications channel." 569 570 def __init__(self, address, callable, exchange): 571 572 """ 573 Using the given 'address', wrap the given 'callable', using the given 574 'exchange' to monitor the channels created for communications between 575 this and the created processes, so that when it is called, a background 576 process is started within which the 'callable' will run. Note that the 577 'callable' must be parallel-aware (that is, have a 'channel' parameter). 578 Use the MakeParallel class to wrap other kinds of callable objects. 579 """ 580 581 self.callable = callable 582 self.exchange = exchange 583 self.address = address 584 585 def __call__(self, *args, **kw): 586 587 "Invoke the callable with the supplied arguments." 588 589 self.exchange.start(self.address, self.callable, *args, **kw) 590 591 class BackgroundCallable: 592 593 """ 594 A callable which sets up a persistent communications channel, but is 595 unmanaged by an exchange. 596 """ 597 598 def __init__(self, address, callable): 599 600 """ 601 Using the given 'address', wrap the given 'callable'. This object can 602 then be invoked, but the wrapped callable will be run in a background 603 process. Note that the 'callable' must be parallel-aware (that is, have 604 a 'channel' parameter). Use the MakeParallel class to wrap other kinds 605 of callable objects. 606 """ 607 608 self.callable = callable 609 self.address = address 610 611 def __call__(self, *args, **kw): 612 613 "Invoke the callable with the supplied arguments." 614 615 start_persistent(self.address, self.callable, *args, **kw) 616 617 # Abstractions and utilities. 618 619 class Map(Exchange): 620 621 "An exchange which can be used like the built-in 'map' function." 622 623 def __init__(self, *args, **kw): 624 Exchange.__init__(self, *args, **kw) 625 self.init() 626 627 def init(self): 628 629 "Remember the channel addition order to order output." 630 631 self.channel_number = 0 632 self.channels = {} 633 self.results = [] 634 635 def add(self, channel): 636 637 "Add the given 'channel' to the exchange." 638 639 Exchange.add(self, channel) 640 self.channels[channel] = self.channel_number 641 self.channel_number += 1 642 643 def start(self, callable, *args, **kw): 644 645 """ 646 Create a new process for the given 'callable' using any additional 647 arguments provided. Then, monitor the channel created between this 648 process and the created process. 649 """ 650 651 self.results.append(None) # placeholder 652 Exchange.start(self, callable, *args, **kw) 653 654 def create(self): 655 656 """ 657 Create a new process and return the created communications channel to 658 the created process. In the creating process, return None - the channel 659 receiving data from the created process will be automatically managed by 660 this exchange. 661 """ 662 663 self.results.append(None) # placeholder 664 return Exchange.create(self) 665 666 def __call__(self, callable, sequence): 667 668 "Wrap and invoke 'callable' for each element in the 'sequence'." 669 670 if not isinstance(callable, MakeParallel): 671 wrapped = MakeParallel(callable) 672 else: 673 wrapped = callable 674 675 self.init() 676 677 # Start processes for each element in the sequence. 678 679 for i in sequence: 680 self.start(wrapped, i) 681 682 # Access to the results occurs through this object. 683 684 return self 685 686 def __getitem__(self, i): 687 self.finish() 688 return self.results[i] 689 690 def __iter__(self): 691 self.finish() 692 return iter(self.results) 693 694 def store_data(self, channel): 695 696 "Accumulate the incoming data, associating results with channels." 697 698 data = channel.receive() 699 self.results[self.channels[channel]] = data 700 del self.channels[channel] 701 702 class Queue(Exchange): 703 704 """ 705 An exchange acting as a queue, making data from created processes available 706 in the order in which it is received. 707 """ 708 709 def __init__(self, *args, **kw): 710 Exchange.__init__(self, *args, **kw) 711 self.queue = [] 712 713 def store_data(self, channel): 714 715 "Accumulate the incoming data, associating results with channels." 716 717 data = channel.receive() 718 self.queue.insert(0, data) 719 720 def __iter__(self): 721 return self 722 723 def next(self): 724 725 "Return the next element in the queue." 726 727 if self.queue: 728 return self.queue.pop() 729 while self.active(): 730 self.store() 731 if self.queue: 732 return self.queue.pop() 733 else: 734 raise StopIteration 735 736 class MakeParallel: 737 738 "A wrapper around functions making them able to communicate results." 739 740 def __init__(self, callable): 741 742 """ 743 Initialise the wrapper with the given 'callable'. This object will then 744 be able to accept a 'channel' parameter when invoked, and to forward the 745 result of the given 'callable' via the channel provided back to the 746 invoking process. 747 """ 748 749 self.callable = callable 750 751 def __call__(self, channel, *args, **kw): 752 753 "Invoke the callable and return its result via the given 'channel'." 754 755 channel.send(self.callable(*args, **kw)) 756 757 class MakeReusable(MakeParallel): 758 759 """ 760 A wrapper around functions making them able to communicate results in a 761 reusable fashion. 762 """ 763 764 def __call__(self, channel, *args, **kw): 765 766 "Invoke the callable and return its result via the given 'channel'." 767 768 channel.send(self.callable(*args, **kw)) 769 t = channel.receive() 770 while t is not None: 771 args, kw = t 772 channel.send(self.callable(*args, **kw)) 773 t = channel.receive() 774 775 # Persistent variants. 776 777 class PersistentExchange(Persistent, Exchange): 778 779 "An exchange which manages persistent communications." 780 781 pass 782 783 class PersistentQueue(Persistent, Queue): 784 785 "A queue which manages persistent communications." 786 787 pass 788 789 # Convenience functions. 790 791 def BackgroundQueue(address): 792 793 """ 794 Connect to a process reachable via the given 'address', making the results 795 of which accessible via a queue. 796 """ 797 798 queue = PersistentQueue(limit=1) 799 queue.connect(address) 800 return queue 801 802 def pmap(callable, sequence, limit=None): 803 804 """ 805 A parallel version of the built-in map function with an optional process 806 'limit'. The given 'callable' should not be parallel-aware (that is, have a 807 'channel' parameter) since it will be wrapped for parallel communications 808 before being invoked. 809 810 Return the processed 'sequence' where each element in the sequence is 811 processed by a different process. 812 """ 813 814 mymap = Map(limit=limit) 815 return mymap(callable, sequence) 816 817 # Utility functions. 818 819 _cpuinfo_fields = "processor", "physical id", "core id" 820 821 def _get_number_of_cores(): 822 823 """ 824 Return the number of distinct, genuine processor cores. If the platform is 825 not supported by this function, None is returned. 826 """ 827 828 try: 829 f = open("/proc/cpuinfo") 830 try: 831 processors = set() 832 833 # Use the _cpuinfo_field values as "digits" in a larger unique 834 # core identifier. 835 836 processor = [None, None, None] 837 838 for line in f.xreadlines(): 839 for i, field in enumerate(_cpuinfo_fields): 840 841 # Where the field is found, insert the value into the 842 # appropriate location in the processor identifier. 843 844 if line.startswith(field): 845 t = line.split(":") 846 processor[i] = int(t[1].strip()) 847 break 848 849 # Where a new processor description is started, record the 850 # identifier. 851 852 if line.startswith("processor") and processor[0] is not None: 853 processors.add(tuple(processor)) 854 processor = [None, None, None] 855 856 # At the end of reading the file, add any unrecorded processors. 857 858 if processor[0] is not None: 859 processors.add(tuple(processor)) 860 861 return len(processors) 862 863 finally: 864 f.close() 865 866 except OSError: 867 return None 868 869 def _get_number_of_cores_solaris(): 870 871 """ 872 Return the number of cores for OpenSolaris 2008.05 and possibly other 873 editions of Solaris. 874 """ 875 876 f = os.popen("psrinfo -p") 877 try: 878 return int(f.read().strip()) 879 finally: 880 f.close() 881 882 # Low-level functions. 883 884 def create_socketpair(): 885 886 """ 887 Create a new process, returning a communications channel to both the 888 creating process and the created process. 889 """ 890 891 parent, child = socket.socketpair() 892 for s in [parent, child]: 893 s.setblocking(1) 894 895 pid = os.fork() 896 if pid == 0: 897 parent.close() 898 return Channel(pid, child.makefile("r", 0), child.makefile("w", 0)) 899 else: 900 child.close() 901 return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0)) 902 903 def create_pipes(): 904 905 """ 906 Create a new process, returning a communications channel to both the 907 creating process and the created process. 908 909 This function uses pipes instead of a socket pair, since some platforms 910 seem to have problems with poll and such socket pairs. 911 """ 912 913 pr, cw = os.pipe() 914 cr, pw = os.pipe() 915 916 pid = os.fork() 917 if pid == 0: 918 os.close(pr) 919 os.close(pw) 920 return Channel(pid, os.fdopen(cr, "r", 0), os.fdopen(cw, "w", 0)) 921 else: 922 os.close(cr) 923 os.close(cw) 924 return Channel(pid, os.fdopen(pr, "r", 0), os.fdopen(pw, "w", 0)) 925 926 if platform.system() == "SunOS": 927 create = create_pipes 928 get_number_of_cores = _get_number_of_cores_solaris 929 else: 930 create = create_socketpair 931 get_number_of_cores = _get_number_of_cores 932 933 def create_persistent(address): 934 935 """ 936 Create a new process, returning a persistent communications channel between 937 the creating process and the created process. This channel can be 938 disconnected from the creating process and connected to another process, and 939 thus can be used to collect results from daemon processes. 940 941 In order to be able to reconnect to created processes, the 'address' of the 942 communications endpoint for the created process needs to be provided. This 943 should be a filename. 944 """ 945 946 parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 947 child = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 948 child.bind(address) 949 950 for s in [parent, child]: 951 s.setblocking(1) 952 953 pid = os.fork() 954 if pid == 0: 955 parent.close() 956 return PersistentChannel(pid, child, address) 957 else: 958 child.close() 959 #parent.connect(address) 960 return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0)) 961 962 def connect_persistent(address): 963 964 """ 965 Connect via a persistent channel to an existing created process, reachable 966 at the given 'address'. 967 """ 968 969 parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 970 parent.setblocking(1) 971 parent.connect(address) 972 return Channel(0, parent.makefile("r", 0), parent.makefile("w", 0)) 973 974 def exit(channel): 975 976 """ 977 Terminate a created process, closing the given 'channel'. 978 """ 979 980 channel.close() 981 os._exit(0) 982 983 def start(callable, *args, **kw): 984 985 """ 986 Create a new process which shall start running in the given 'callable'. 987 Additional arguments to the 'callable' can be given as additional arguments 988 to this function. 989 990 Return a communications channel to the creating process. For the created 991 process, supply a channel as the 'channel' parameter in the given 'callable' 992 so that it may send data back to the creating process. 993 """ 994 995 channel = create() 996 if channel.pid == 0: 997 try: 998 try: 999 callable(channel, *args, **kw) 1000 except: 1001 exc_type, exc_value, exc_traceback = sys.exc_info() 1002 channel.send(exc_value) 1003 finally: 1004 exit(channel) 1005 else: 1006 return channel 1007 1008 def start_persistent(address, callable, *args, **kw): 1009 1010 """ 1011 Create a new process which shall be reachable using the given 'address' and 1012 which will start running in the given 'callable'. Additional arguments to 1013 the 'callable' can be given as additional arguments to this function. 1014 1015 Return a communications channel to the creating process. For the created 1016 process, supply a channel as the 'channel' parameter in the given 'callable' 1017 so that it may send data back to the creating process. 1018 1019 Note that the created process employs a channel which is persistent: it can 1020 withstand disconnection from the creating process and subsequent connections 1021 from other processes. 1022 """ 1023 1024 channel = create_persistent(address) 1025 if channel.pid == 0: 1026 close_streams() 1027 try: 1028 try: 1029 callable(channel, *args, **kw) 1030 except: 1031 exc_type, exc_value, exc_traceback = sys.exc_info() 1032 channel.send(exc_value) 1033 finally: 1034 exit(channel) 1035 else: 1036 return channel 1037 1038 def close_streams(): 1039 1040 """ 1041 Close streams which keep the current process attached to any creating 1042 processes. 1043 """ 1044 1045 os.close(sys.stdin.fileno()) 1046 os.close(sys.stdout.fileno()) 1047 os.close(sys.stderr.fileno()) 1048 1049 def waitall(): 1050 1051 "Wait for all created processes to terminate." 1052 1053 try: 1054 while 1: 1055 os.wait() 1056 except OSError: 1057 pass 1058 1059 # vim: tabstop=4 expandtab shiftwidth=4