#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Copyright (C) 2005, 2006, 2007, 2008 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the Free
Software Foundation; either version 3 of the License, or (at your option) any
later version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
details.

You should have received a copy of the GNU Lesser General Public License along
with this program. If not, see <http://www.gnu.org/licenses/>.
"""

__version__ = "0.4"

import os
import sys
import select
import socket
import platform

try:
    import cPickle as pickle
except ImportError:
    import pickle

try:
    set
except NameError:
    from sets import Set as set

# Communications.

class AcknowledgementError(Exception):

    """
    Raised by Channel.send when the peer's reply to a sent object is not the
    "OK" acknowledgement. The object that was sent is the sole argument.
    """

    pass

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe

    def __del__(self):

        # Since signals don't work well with I/O, we close pipes and wait for
        # created processes upon finalisation.

        self.close()

    def close(self):

        """
        Explicitly close the channel. Both pipes are closed (if still open) and
        forgotten, so calling this method more than once is harmless. The
        created process is not waited for here; see 'wait'.
        """

        if self.read_pipe is not None:
            self.read_pipe.close()
            self.read_pipe = None
        if self.write_pipe is not None:
            self.write_pipe.close()
            self.write_pipe = None

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit. The given 'options' are
        passed to os.waitpid. Nothing is done where the process identifier is
        zero, and failures to wait (OSError) are ignored.
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def _send(self, obj):

        "Send the given object 'obj' through the channel (pickled, unacknowledged)."

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)

        Raise AcknowledgementError, carrying 'obj' as its only argument, if the
        reply is anything other than "OK".
        """

        self._send(obj)
        if self._receive() != "OK":
            raise AcknowledgementError(obj)

    def _receive(self):

        """
        Receive an object through the channel, returning the object. Where the
        received object is an exception instance, it is raised instead of being
        returned.

        NOTE: this unpickles whatever the peer sends; only use channels with
        trusted peers.
        """

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)

        The acknowledgement is sent even when receiving raises an exception.
        """

        try:
            obj = self._receive()
            return obj
        finally:
            self._send("OK")

class PersistentChannel(Channel):

    """
    A persistent communications channel which can handle peer disconnection,
    acting as a server, meaning that this channel is associated with a specific
    address which can be contacted by other processes.
    """

    def __init__(self, pid, endpoint, address):

        """
        Initialise the channel with a process identifier 'pid', a bound server
        socket 'endpoint', and the 'address' to which the socket is bound. No
        pipes exist until a peer connects.
        """

        Channel.__init__(self, pid, None, None)
        self.endpoint = endpoint
        self.address = address
        self.poller = select.poll()

        # Listen for connections before this process is interested in
        # communicating. It is not desirable to wait for connections at this
        # point because this will block the process.

        self.endpoint.listen(1)

    def close(self):

        "Close the persistent channel and remove the socket file."

        Channel.close(self)
        try:
            os.unlink(self.address)
        except OSError:
            pass

    def _ensure_pipes(self):

        """
        Ensure that the channel is capable of communicating, blocking until a
        peer connects where no connection is currently held.
        """

        if self.read_pipe is None or self.write_pipe is None:

            # Accept any incoming connections.

            endpoint, address = self.endpoint.accept()
            self.read_pipe = endpoint.makefile("r", 0)
            self.write_pipe = endpoint.makefile("w", 0)

            # Monitor the write pipe for error conditions.

            fileno = self.write_pipe.fileno()
            self.poller.register(fileno, select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def _reset_pipes(self):

        "Discard the existing connection and listen for a new one."

        # The descriptor must be unregistered while the pipe is still open.

        fileno = self.write_pipe.fileno()
        self.poller.unregister(fileno)

        # Release the broken connection explicitly rather than relying on
        # garbage collection. The peer has already gone, so failures when
        # closing are of no interest.

        for pipe in (self.read_pipe, self.write_pipe):
            try:
                pipe.close()
            except (IOError, OSError, socket.error):
                pass

        self.read_pipe = None
        self.write_pipe = None
        self.endpoint.listen(1)

    def _ensure_communication(self, timeout=None):

        """
        Ensure that sending and receiving are possible, polling the connection
        using the optional 'timeout' and replacing it where it is broken.
        """

        while 1:
            self._ensure_pipes()
            fileno = self.write_pipe.fileno()
            fds = self.poller.poll(timeout)
            for fd, status in fds:
                if fd != fileno:
                    continue
                if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):

                    # Broken connection: discard it and start all over again.

                    self._reset_pipes()
                    break

            # No error condition was reported for the current connection.

            else:
                return

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        self._ensure_communication()
        Channel._send(self, obj)

    def _receive(self):

        "Receive an object through the channel, returning the object."

        self._ensure_communication()
        return Channel._receive(self)

# Management of processes and communications.

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate. Subclasses of this class can define the 'store_data'
    method in order to enable the 'add_wait', 'wait' and 'finish' methods.
    """

    def __init__(self, channels=None, limit=None, reuse=0, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'.

        If the optional 'limit' is specified, restrictions on the addition of
        new channels can be enforced and observed through the 'add_wait', 'wait'
        and 'finish' methods. To make use of these methods, create a subclass of
        this class and define a working 'store_data' method.

        If the optional 'reuse' parameter is set to a true value, channels and
        processes will be reused for waiting computations.

        If the optional 'autoclose' parameter is set to a false value, channels
        will not be closed automatically when they are removed from the exchange
        - by default they are closed when removed.
        """

        self.limit = limit
        self.reuse = reuse
        self.autoclose = autoclose
        self.waiting = []       # queued (callable, args, kw) tuples
        self.readables = {}     # read pipe file descriptor -> channel
        self.removed = []       # channels removed by the latest 'ready' call
        self.poller = select.poll()
        for channel in channels or []:
            self.add(channel)

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        fileno = channel.read_pipe.fileno()
        self.readables[fileno] = channel
        self.poller.register(fileno, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        # Always provide a real list (a snapshot), whatever the mapping type
        # returns from its 'values' method.

        return list(self.readables.values())

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (or until
        communication is possible) and return a list of channels which are ready
        to be read from.

        Channels reporting hang-up or error conditions are removed from the
        exchange and recorded in the 'removed' attribute, which is reset upon
        each call.
        """

        fds = self.poller.poll(timeout)
        readables = []
        self.removed = []

        for fd, status in fds:
            channel = self.readables[fd]
            removed = 0

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)
                self.removed.append(channel)
                removed = 1

            # Record readable channels, except those which have just been
            # removed and closed and thus cannot be read from.

            if status & select.POLLIN:
                if not (removed and self.autoclose):
                    readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange. Where 'autoclose' is set,
        the channel is also closed and its process waited for.
        """

        fileno = channel.read_pipe.fileno()
        del self.readables[fileno]
        self.poller.unregister(fileno)
        if self.autoclose:
            channel.close()
            channel.wait()

    # Enhanced exchange methods involving channel limits.

    def add_wait(self, channel):

        """
        Add the given 'channel' to the exchange, waiting if the limit on active
        channels would be exceeded by adding the channel.
        """

        self.wait()
        self.add(channel)

    def wait(self):

        """
        Test for the limit on channels, blocking and reading incoming data until
        the number of channels is below the limit.
        """

        # If limited, block until channels have been closed.

        while self.limit is not None and len(self.active()) >= self.limit:
            self.store()

    def _get_waiting(self, channel):

        """
        Get waiting callable and argument information for new processes, given
        the reception of data on the given 'channel'.

        Return a (callable, args, kw) tuple where a new process must be started
        by the caller, or None where nothing needs starting (no work is waiting,
        or the work was handed to the existing process behind 'channel').
        """

        if self.waiting:
            callable, args, kw = self.waiting.pop()

            # Try and reuse existing channels if possible.

            if self.reuse:

                # Re-add the channel - this may update information related to
                # the channel in subclasses.

                self.add(channel)
                channel.send((args, kw))
            else:
                return callable, args, kw

        # Where channels are being reused, but where no processes are waiting
        # any more, send a special value to tell them to quit.

        elif self.reuse:
            channel.send(None)

        return None

    def finish(self):

        """
        Finish the use of the exchange by waiting for all channels to complete.
        """

        while self.active():
            self.store()

    def store(self):

        "For each ready channel, process the incoming data."

        for channel in self.ready():
            self.store_data(channel)
            self.start_waiting(channel)

    def store_data(self, channel):

        """
        Store incoming data from the specified 'channel'. In subclasses of this
        class, such data could be stored using instance attributes.

        This base implementation always raises NotImplementedError.
        """

        raise NotImplementedError("store_data")

    # Support for the convenience methods.

    def _set_waiting(self, callable, args, kw):

        """
        Support process creation by returning whether the given 'callable' has
        been queued for later invocation (1) or may be started now (0).
        """

        if self.limit is not None and len(self.active()) >= self.limit:

            # New work goes in at the front and is taken from the back, giving
            # first-in, first-out ordering.

            self.waiting.insert(0, (callable, args, kw))
            return 1
        else:
            return 0

    def _get_channel_for_process(self, channel):

        """
        Support process creation by returning the given 'channel' to the
        created process, and None to the creating process (after adding the
        channel to this exchange).
        """

        if channel.pid == 0:
            return channel
        else:
            self.add_wait(channel)
            return None

    # Methods for overriding, related to the convenience methods.

    def start_waiting(self, channel):

        """
        Start a waiting process given the reception of data on the given
        'channel'.
        """

        details = self._get_waiting(channel)
        if details is not None:
            callable, args, kw = details
            self.add(start(callable, *args, **kw))

    # Convenience methods.

    def start(self, callable, *args, **kw):

        """
        Create a new process for the given 'callable' using any additional
        arguments provided. Then, monitor the channel created between this
        process and the created process.

        Where the limit on channels has been reached, the work is queued
        instead and started later.
        """

        if self._set_waiting(callable, args, kw):
            return

        self.add_wait(start(callable, *args, **kw))

    def create(self):

        """
        Create a new process and return the created communications channel to
        the created process. In the creating process, return None - the channel
        receiving data from the created process will be automatically managed by
        this exchange.
        """

        channel = create()
        return self._get_channel_for_process(channel)

    def manage(self, callable):

        """
        Wrap the given 'callable' in an object which can then be called in the
        same way as 'callable', but with new processes and communications
        managed automatically.
        """

        return ManagedCallable(callable, self)

class Persistent:

    """
    A mix-in class providing methods to exchanges for the management of
    persistent communications.
    """

    def start_waiting(self, channel):

        """
        Start a waiting process given the reception of data on the given
        'channel'.
        """

        details = self._get_waiting(channel)
        if details is not None:
            callable, args, kw = details

            # NOTE(review): this relies on 'channel' having an 'address'
            # attribute, which plain Channel objects are not given in this
            # module - confirm which channel types reach this point.

            self.add(start_persistent(channel.address, callable, *args, **kw))

    def start(self, address, callable, *args, **kw):

        """
        Create a new process, located at the given 'address', for the given
        'callable' using any additional arguments provided.

        The channel returned to this process is not added to the exchange
        here; use the 'connect' method to monitor the created process. Where
        the limit on channels has been reached, the work is queued instead.
        """

        if self._set_waiting(callable, args, kw):
            return

        start_persistent(address, callable, *args, **kw)

    def create(self, address):

        """
        Create a new process, located at the given 'address', and return the
        created communications channel to the created process. In the creating
        process, return None - the channel receiving data from the created
        process will be automatically managed by this exchange.
        """

        channel = create_persistent(address)
        return self._get_channel_for_process(channel)

    def manage(self, address, callable):

        """
        Using the given 'address', publish the given 'callable' in an object
        which can then be called in the same way as 'callable', but with new
        processes and communications managed automatically.
        """

        return PersistentCallable(address, callable, self)

    def connect(self, address):

        "Connect to a process which is contactable via the given 'address'."

        channel = connect_persistent(address)
        self.add_wait(channel)

class ManagedCallable:

    "A callable managed by an exchange."

    def __init__(self, callable, exchange):

        """
        Wrap the given 'callable', using the given 'exchange' to monitor the
        channels created for communications between this and the created
        processes. Note that the 'callable' must be parallel-aware (that is,
        have a 'channel' parameter). Use the MakeParallel class to wrap other
        kinds of callable objects.
        """

        self.callable = callable
        self.exchange = exchange

    def __call__(self, *args, **kw):

        "Invoke the callable with the supplied arguments."

        self.exchange.start(self.callable, *args, **kw)

class PersistentCallable:

    "A callable which sets up a persistent communications channel."

    def __init__(self, address, callable, exchange):

        """
        Using the given 'address', wrap the given 'callable', using the given
        'exchange' to monitor the channels created for communications between
        this and the created processes, so that when it is called, a background
        process is started within which the 'callable' will run. Note that the
        'callable' must be parallel-aware (that is, have a 'channel' parameter).
        Use the MakeParallel class to wrap other kinds of callable objects.
        """

        self.callable = callable
        self.exchange = exchange
        self.address = address

    def __call__(self, *args, **kw):

        "Invoke the callable with the supplied arguments."

        self.exchange.start(self.address, self.callable, *args, **kw)

class BackgroundCallable:

    """
    A callable which sets up a persistent communications channel, but is
    unmanaged by an exchange.
    """

    def __init__(self, address, callable):

        """
        Using the given 'address', wrap the given 'callable'. This object can
        then be invoked, but the wrapped callable will be run in a background
        process. Note that the 'callable' must be parallel-aware (that is, have
        a 'channel' parameter). Use the MakeParallel class to wrap other kinds
        of callable objects.
        """

        self.callable = callable
        self.address = address

    def __call__(self, *args, **kw):

        "Invoke the callable with the supplied arguments."

        start_persistent(self.address, self.callable, *args, **kw)

# Abstractions and utilities.

class Map(Exchange):

    "An exchange which can be used like the built-in 'map' function."

    def __init__(self, *args, **kw):

        "Initialise the exchange, accepting the same arguments as Exchange."

        Exchange.__init__(self, *args, **kw)
        self.init()

    def init(self):

        "Remember the channel addition order to order output."

        self.channel_number = 0     # position given to the next added channel
        self.channels = {}          # channel -> position in 'results'
        self.results = []

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        Exchange.add(self, channel)
        self.channels[channel] = self.channel_number
        self.channel_number += 1

    def start(self, callable, *args, **kw):

        """
        Create a new process for the given 'callable' using any additional
        arguments provided. Then, monitor the channel created between this
        process and the created process.
        """

        self.results.append(None) # placeholder
        Exchange.start(self, callable, *args, **kw)

    def create(self):

        """
        Create a new process and return the created communications channel to
        the created process. In the creating process, return None - the channel
        receiving data from the created process will be automatically managed by
        this exchange.
        """

        self.results.append(None) # placeholder
        return Exchange.create(self)

    def __call__(self, callable, sequence):

        """
        Wrap and invoke 'callable' for each element in the 'sequence',
        discarding any previously collected results. Return this object,
        through which the results are accessed.
        """

        if not isinstance(callable, MakeParallel):
            wrapped = MakeParallel(callable)
        else:
            wrapped = callable

        self.init()

        # Start processes for each element in the sequence.

        for i in sequence:
            self.start(wrapped, i)

        # Access to the results occurs through this object.

        return self

    def __getitem__(self, i):

        "Wait for all results, then return the result at position 'i'."

        self.finish()
        return self.results[i]

    def __iter__(self):

        "Wait for all results, then return an iterator over them."

        self.finish()
        return iter(self.results)

    def store_data(self, channel):

        "Accumulate the incoming data, associating results with channels."

        data = channel.receive()
        self.results[self.channels[channel]] = data
        del self.channels[channel]

class Queue(Exchange):

    """
    An exchange acting as a queue, making data from created processes available
    in the order in which it is received.
    """

    def __init__(self, *args, **kw):

        "Initialise the exchange, accepting the same arguments as Exchange."

        Exchange.__init__(self, *args, **kw)
        self.queue = []

    def store_data(self, channel):

        "Accumulate the incoming data in the order in which it is received."

        # New data goes in at the front and is taken from the back.

        data = channel.receive()
        self.queue.insert(0, data)

    def __iter__(self):

        "Return this object, which acts as its own iterator."

        return self

    def next(self):

        """
        Return the next element in the queue, reading from active channels
        where the queue is empty. Raise StopIteration where the queue is empty
        and no channels remain active.
        """

        if self.queue:
            return self.queue.pop()
        while self.active():
            self.store()
            if self.queue:
                return self.queue.pop()
        raise StopIteration

    # Support the iterator protocol spelling used by newer Python versions.

    __next__ = next

class MakeParallel:

    "A wrapper around functions making them able to communicate results."

    def __init__(self, callable):

        """
        Initialise the wrapper with the given 'callable'. This object will then
        be able to accept a 'channel' parameter when invoked, and to forward the
        result of the given 'callable' via the channel provided back to the
        invoking process.
        """

        self.callable = callable

    def __call__(self, channel, *args, **kw):

        "Invoke the callable and return its result via the given 'channel'."

        channel.send(self.callable(*args, **kw))

class MakeReusable(MakeParallel):

    """
    A wrapper around functions making them able to communicate results in a
    reusable fashion.
    """

    def __call__(self, channel, *args, **kw):

        """
        Invoke the callable and return its result via the given 'channel'. Then
        keep receiving (args, kw) tuples from the channel, invoking the callable
        for each and sending back each result, until None is received.
        """

        channel.send(self.callable(*args, **kw))
        t = channel.receive()
        while t is not None:
            args, kw = t
            channel.send(self.callable(*args, **kw))
            t = channel.receive()

# Persistent variants.

class PersistentExchange(Persistent, Exchange):

    "An exchange which manages persistent communications."

    pass

class PersistentQueue(Persistent, Queue):

    "A queue which manages persistent communications."

    pass

# Convenience functions.

def BackgroundQueue(address):

    """
    Connect to a process reachable via the given 'address', making the results
    of which accessible via a queue.
    """

    queue = PersistentQueue(limit=1)
    queue.connect(address)
    return queue

def pmap(callable, sequence, limit=None):

    """
    A parallel version of the built-in map function with an optional process
    'limit'. The given 'callable' should not be parallel-aware (that is, have a
    'channel' parameter) since it will be wrapped for parallel communications
    before being invoked.

    Return the processed 'sequence' where each element in the sequence is
    processed by a different process.
    """

    mymap = Map(limit=limit)
    return mymap(callable, sequence)

# Utility functions.

_cpuinfo_fields = "physical id", "core id"

def _get_number_of_cores(cpuinfo_path="/proc/cpuinfo"):

    """
    Return the number of distinct, genuine processor cores, as described by the
    file at the optional 'cpuinfo_path'. If the platform is not supported by
    this function (the file cannot be read, or it identifies no cores), None is
    returned.
    """

    try:
        f = open(cpuinfo_path)
        try:
            processors = set()

            # Use the _cpuinfo_fields values as "digits" in a larger unique
            # core identifier.

            processor = [None, None]

            for line in f:
                for i, field in enumerate(_cpuinfo_fields):

                    # Where the field is found, insert the value into the
                    # appropriate location in the core identifier.

                    if line.startswith(field):
                        t = line.split(":")
                        processor[i] = int(t[1].strip())
                        break

                # Where a new processor description is started, record the
                # identifier collected for the previous one.

                else:
                    if line.startswith("processor") and processor[0] is not None:
                        processors.add(tuple(processor))
                        processor = [None, None]

            # Record the identifier from the final processor description.

            if processor[0] is not None:
                processors.add(tuple(processor))

            # A file identifying no cores at all means that the format is not
            # supported: report this as None, not as zero cores.

            return len(processors) or None

        finally:
            f.close()

    # Opening and reading files raises IOError on older Python versions and
    # OSError on newer ones.

    except (IOError, OSError):
        return None

def _get_number_of_cores_solaris():

    """
    Return the number of cores for OpenSolaris 2008.05 and possibly other
    editions of Solaris. If the output of the 'psrinfo' program cannot be
    interpreted as a number, None is returned.
    """

    f = os.popen("psrinfo -p")
    try:
        try:
            return int(f.read().strip())
        except ValueError:
            return None
    finally:
        f.close()

# Low-level functions.

def create_socketpair():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process.
    """

    parent, child = socket.socketpair()
    for s in [parent, child]:
        s.setblocking(1)

    # Each process keeps its own end of the socket pair and closes the other.

    pid = os.fork()
    if pid == 0:
        parent.close()
        return Channel(pid, child.makefile("r", 0), child.makefile("w", 0))
    else:
        child.close()
        return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))

def create_pipes():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process.

    This function uses pipes instead of a socket pair, since some platforms
    seem to have problems with poll and such socket pairs.
    """

    # One pipe carries data from the created process to the creating process
    # (cw -> pr), the other carries data in the opposite direction (pw -> cr).

    pr, cw = os.pipe()
    cr, pw = os.pipe()

    pid = os.fork()
    if pid == 0:
        os.close(pr)
        os.close(pw)
        return Channel(pid, os.fdopen(cr, "r", 0), os.fdopen(cw, "w", 0))
    else:
        os.close(cr)
        os.close(cw)
        return Channel(pid, os.fdopen(pr, "r", 0), os.fdopen(pw, "w", 0))

# Select the platform-specific implementations.

if platform.system() == "SunOS":
    create = create_pipes
    get_number_of_cores = _get_number_of_cores_solaris
else:
    create = create_socketpair
    get_number_of_cores = _get_number_of_cores

def create_persistent(address):

    """
    Create a new process, returning a persistent communications channel between
    the creating process and the created process. This channel can be
    disconnected from the creating process and connected to another process, and
    thus can be used to collect results from daemon processes.

    In order to be able to reconnect to created processes, the 'address' of the
    communications endpoint for the created process needs to be provided. This
    should be a filename.
    """

    parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    child = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    child.bind(address)

    for s in [parent, child]:
        s.setblocking(1)

    pid = os.fork()
    if pid == 0:
        parent.close()
        return PersistentChannel(pid, child, address)
    else:
        child.close()

        # The creating process's socket is not connected to the address here;
        # see connect_persistent for a way of reaching the created process.

        return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))

def connect_persistent(address):

    """
    Connect via a persistent channel to an existing created process, reachable
    at the given 'address'.
    """

    parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    parent.setblocking(1)
    parent.connect(address)
    return Channel(0, parent.makefile("r", 0), parent.makefile("w", 0))

def exit(channel):

    """
    Terminate a created process, closing the given 'channel'.

    The process is terminated even where closing the channel fails, so that a
    created process can never carry on running the creating process's code.
    """

    try:
        channel.close()
    finally:
        os._exit(0)

def start(callable, *args, **kw):

    """
    Create a new process which shall start running in the given 'callable'.
    Additional arguments to the 'callable' can be given as additional arguments
    to this function.

    Return a communications channel to the creating process. For the created
    process, supply a channel as the 'channel' parameter in the given 'callable'
    so that it may send data back to the creating process.
    """

    channel = create()
    if channel.pid == 0:
        try:
            try:
                callable(channel, *args, **kw)

            # Deliberately catch everything: whatever ends the callable is
            # forwarded to the creating process through the channel.

            except:
                channel.send(sys.exc_info()[1])
        finally:
            exit(channel)
    else:
        return channel

def start_persistent(address, callable, *args, **kw):

    """
    Create a new process which shall be reachable using the given 'address' and
    which will start running in the given 'callable'. Additional arguments to
    the 'callable' can be given as additional arguments to this function.

    Return a communications channel to the creating process. For the created
    process, supply a channel as the 'channel' parameter in the given 'callable'
    so that it may send data back to the creating process.

    Note that the created process employs a channel which is persistent: it can
    withstand disconnection from the creating process and subsequent connections
    from other processes.
    """

    channel = create_persistent(address)
    if channel.pid == 0:
        close_streams()
        try:
            try:
                callable(channel, *args, **kw)

            # Deliberately catch everything: whatever ends the callable is
            # forwarded through the channel.

            except:
                channel.send(sys.exc_info()[1])
        finally:
            exit(channel)
    else:
        return channel

def close_streams():

    """
    Close streams which keep the current process attached to any creating
    processes.
    """

    os.close(sys.stdin.fileno())
    os.close(sys.stdout.fileno())
    os.close(sys.stderr.fileno())

def waitall():

    "Wait for all created processes to terminate."

    # os.wait raises OSError once no created processes remain.

    try:
        while 1:
            os.wait()
    except OSError:
        pass

# vim: tabstop=4 expandtab shiftwidth=4