1 #!/usr/bin/env python 2 3 """ 4 A simple parallel processing API for Python, inspired somewhat by the thread 5 module, slightly less by pypar, and slightly less still by pypvm. 6 7 Copyright (C) 2005, 2006, 2007, 2008, 2009 Paul Boddie <paul@boddie.org.uk> 8 9 This program is free software; you can redistribute it and/or modify it under 10 the terms of the GNU Lesser General Public License as published by the Free 11 Software Foundation; either version 3 of the License, or (at your option) any 12 later version. 13 14 This program is distributed in the hope that it will be useful, but WITHOUT 15 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 16 FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 17 details. 18 19 You should have received a copy of the GNU Lesser General Public License along 20 with this program. If not, see <http://www.gnu.org/licenses/>. 21 """ 22 23 __version__ = "0.4.1" 24 25 import os 26 import sys 27 import select 28 import socket 29 import platform 30 31 try: 32 import cPickle as pickle 33 except ImportError: 34 import pickle 35 36 try: 37 set 38 except NameError: 39 from sets import Set as set 40 41 # Special values. 42 43 class Undefined: pass 44 45 # Communications. 46 47 class AcknowledgementError(Exception): 48 pass 49 50 class Channel: 51 52 "A communications channel." 53 54 def __init__(self, pid, read_pipe, write_pipe): 55 56 """ 57 Initialise the channel with a process identifier 'pid', a 'read_pipe' 58 from which messages will be received, and a 'write_pipe' into which 59 messages will be sent. 60 """ 61 62 self.pid = pid 63 self.read_pipe = read_pipe 64 self.write_pipe = write_pipe 65 66 def __del__(self): 67 68 # Since signals don't work well with I/O, we close pipes and wait for 69 # created processes upon finalisation. 70 71 self.close() 72 73 def close(self): 74 75 "Explicitly close the channel." 76 77 if self.read_pipe is not None: 78 self.read_pipe.close() 79 self.read_pipe = None 80 if self.write_pipe is not None: 81 self.write_pipe.close() 82 self.write_pipe = None 83 #self.wait(os.WNOHANG) 84 85 def wait(self, options=0): 86 87 "Wait for the created process, if any, to exit." 88 89 if self.pid != 0: 90 try: 91 os.waitpid(self.pid, options) 92 except OSError: 93 pass 94 95 def _send(self, obj): 96 97 "Send the given object 'obj' through the channel." 98 99 pickle.dump(obj, self.write_pipe) 100 self.write_pipe.flush() 101 102 def send(self, obj): 103 104 """ 105 Send the given object 'obj' through the channel. Then wait for an 106 acknowledgement. (The acknowledgement makes the caller wait, thus 107 preventing processes from exiting and disrupting the communications 108 channel and losing data.) 109 """ 110 111 self._send(obj) 112 if self._receive() != "OK": 113 raise AcknowledgementError, obj 114 115 def _receive(self): 116 117 "Receive an object through the channel, returning the object." 118 119 obj = pickle.load(self.read_pipe) 120 if isinstance(obj, Exception): 121 raise obj 122 else: 123 return obj 124 125 def receive(self): 126 127 """ 128 Receive an object through the channel, returning the object. Send an 129 acknowledgement of receipt. (The acknowledgement makes the sender wait, 130 thus preventing processes from exiting and disrupting the communications 131 channel and losing data.) 132 """ 133 134 try: 135 obj = self._receive() 136 return obj 137 finally: 138 self._send("OK") 139 140 class PersistentChannel(Channel): 141 142 """ 143 A persistent communications channel which can handle peer disconnection, 144 acting as a server, meaning that this channel is associated with a specific 145 address which can be contacted by other processes. 146 """ 147 148 def __init__(self, pid, endpoint, address): 149 Channel.__init__(self, pid, None, None) 150 self.endpoint = endpoint 151 self.address = address 152 self.poller = select.poll() 153 154 # Listen for connections before this process is interested in 155 # communicating. It is not desirable to wait for connections at this 156 # point because this will block the process. 157 158 self.endpoint.listen(1) 159 160 def close(self): 161 162 "Close the persistent channel and remove the socket file." 163 164 Channel.close(self) 165 try: 166 os.unlink(self.address) 167 except OSError: 168 pass 169 170 def _ensure_pipes(self): 171 172 "Ensure that the channel is capable of communicating." 173 174 if self.read_pipe is None or self.write_pipe is None: 175 176 # Accept any incoming connections. 177 178 endpoint, address = self.endpoint.accept() 179 self.read_pipe = endpoint.makefile("r", 0) 180 self.write_pipe = endpoint.makefile("w", 0) 181 182 # Monitor the write pipe for error conditions. 183 184 fileno = self.write_pipe.fileno() 185 self.poller.register(fileno, select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR) 186 187 def _reset_pipes(self): 188 189 "Discard the existing connection." 190 191 fileno = self.write_pipe.fileno() 192 self.poller.unregister(fileno) 193 self.read_pipe = None 194 self.write_pipe = None 195 self.endpoint.listen(1) 196 197 def _ensure_communication(self, timeout=None): 198 199 "Ensure that sending and receiving are possible." 200 201 while 1: 202 self._ensure_pipes() 203 fileno = self.write_pipe.fileno() 204 fds = self.poller.poll(timeout) 205 for fd, status in fds: 206 if fd != fileno: 207 continue 208 if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR): 209 210 # Broken connection: discard it and start all over again. 211 212 self._reset_pipes() 213 break 214 else: 215 return 216 217 def _send(self, obj): 218 219 "Send the given object 'obj' through the channel." 220 221 self._ensure_communication() 222 Channel._send(self, obj) 223 224 def _receive(self): 225 226 "Receive an object through the channel, returning the object." 227 228 self._ensure_communication() 229 return Channel._receive(self) 230 231 # Management of processes and communications. 232 233 class Exchange: 234 235 """ 236 A communications exchange that can be used to detect channels which are 237 ready to communicate. Subclasses of this class can define the 'store_data' 238 method in order to enable the 'add_wait', 'wait' and 'finish' methods. 239 """ 240 241 def __init__(self, channels=None, limit=None, reuse=0, autoclose=1): 242 243 """ 244 Initialise the exchange with an optional list of 'channels'. 245 246 If the optional 'limit' is specified, restrictions on the addition of 247 new channels can be enforced and observed through the 'add_wait', 'wait' 248 and 'finish' methods. To make use of these methods, create a subclass of 249 this class and define a working 'store_data' method. 250 251 If the optional 'reuse' parameter is set to a true value, channels and 252 processes will be reused for waiting computations. 253 254 If the optional 'autoclose' parameter is set to a false value, channels 255 will not be closed automatically when they are removed from the exchange 256 - by default they are closed when removed. 257 """ 258 259 self.limit = limit 260 self.reuse = reuse 261 self.autoclose = autoclose 262 self.waiting = [] 263 self.readables = {} 264 self.removed = [] 265 self.poller = select.poll() 266 for channel in channels or []: 267 self.add(channel) 268 269 def add(self, channel): 270 271 "Add the given 'channel' to the exchange." 272 273 fileno = channel.read_pipe.fileno() 274 self.readables[fileno] = channel 275 self.poller.register(fileno, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR) 276 277 def active(self): 278 279 "Return a list of active channels." 280 281 return self.readables.values() 282 283 def ready(self, timeout=None): 284 285 """ 286 Wait for a period of time specified by the optional 'timeout' (or until 287 communication is possible) and return a list of channels which are ready 288 to be read from. 289 """ 290 291 fds = self.poller.poll(timeout) 292 readables = [] 293 self.removed = [] 294 295 for fd, status in fds: 296 channel = self.readables[fd] 297 removed = 0 298 299 # Remove ended/error channels. 300 301 if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR): 302 self.remove(channel) 303 self.removed.append(channel) 304 removed = 1 305 306 # Record readable channels. 307 308 if status & select.POLLIN: 309 if not (removed and self.autoclose): 310 readables.append(channel) 311 312 return readables 313 314 def remove(self, channel): 315 316 """ 317 Remove the given 'channel' from the exchange. 318 """ 319 320 fileno = channel.read_pipe.fileno() 321 del self.readables[fileno] 322 self.poller.unregister(fileno) 323 if self.autoclose: 324 channel.close() 325 channel.wait() 326 327 # Enhanced exchange methods involving channel limits. 328 329 def add_wait(self, channel): 330 331 """ 332 Add the given 'channel' to the exchange, waiting if the limit on active 333 channels would be exceeded by adding the channel. 334 """ 335 336 self.wait() 337 self.add(channel) 338 339 def wait(self): 340 341 """ 342 Test for the limit on channels, blocking and reading incoming data until 343 the number of channels is below the limit. 344 """ 345 346 # If limited, block until channels have been closed. 347 348 while self.limit is not None and len(self.active()) >= self.limit: 349 self.store() 350 351 def _get_waiting(self, channel): 352 353 """ 354 Get waiting callable and argument information for new processes, given 355 the reception of data on the given 'channel'. 356 """ 357 358 if self.waiting: 359 callable, args, kw = self.waiting.pop() 360 361 # Try and reuse existing channels if possible. 362 363 if self.reuse: 364 365 # Re-add the channel - this may update information related to 366 # the channel in subclasses. 367 368 self.add(channel) 369 channel.send((args, kw)) 370 else: 371 return callable, args, kw 372 373 # Where channels are being reused, but where no processes are waiting 374 # any more, send a special value to tell them to quit. 375 376 elif self.reuse: 377 channel.send(None) 378 379 return None 380 381 def finish(self): 382 383 """ 384 Finish the use of the exchange by waiting for all channels to complete. 385 """ 386 387 while self.active(): 388 self.store() 389 390 def store(self): 391 392 "For each ready channel, process the incoming data." 393 394 for channel in self.ready(): 395 self.store_data(channel) 396 self.start_waiting(channel) 397 398 def store_data(self, channel): 399 400 """ 401 Store incoming data from the specified 'channel'. In subclasses of this 402 class, such data could be stored using instance attributes. 403 """ 404 405 raise NotImplementedError, "store_data" 406 407 # Support for the convenience methods. 408 409 def _set_waiting(self, callable, args, kw): 410 411 """ 412 Support process creation by returning whether the given 'callable' has 413 been queued for later invocation. 414 """ 415 416 if self.limit is not None and len(self.active()) >= self.limit: 417 self.waiting.insert(0, (callable, args, kw)) 418 return 1 419 else: 420 return 0 421 422 def _get_channel_for_process(self, channel): 423 424 """ 425 Support process creation by returning the given 'channel' to the 426 creating process, and None to the created process. 427 """ 428 429 if channel.pid == 0: 430 return channel 431 else: 432 self.add_wait(channel) 433 return None 434 435 # Methods for overriding, related to the convenience methods. 436 437 def start_waiting(self, channel): 438 439 """ 440 Start a waiting process given the reception of data on the given 441 'channel'. 442 """ 443 444 details = self._get_waiting(channel) 445 if details is not None: 446 callable, args, kw = details 447 self.add(start(callable, *args, **kw)) 448 449 # Convenience methods. 450 451 def start(self, callable, *args, **kw): 452 453 """ 454 Create a new process for the given 'callable' using any additional 455 arguments provided. Then, monitor the channel created between this 456 process and the created process. 457 """ 458 459 if self._set_waiting(callable, args, kw): 460 return 461 462 self.add_wait(start(callable, *args, **kw)) 463 464 def create(self): 465 466 """ 467 Create a new process and return the created communications channel to 468 the created process. In the creating process, return None - the channel 469 receiving data from the created process will be automatically managed by 470 this exchange. 471 """ 472 473 channel = create() 474 return self._get_channel_for_process(channel) 475 476 def manage(self, callable): 477 478 """ 479 Wrap the given 'callable' in an object which can then be called in the 480 same way as 'callable', but with new processes and communications 481 managed automatically. 482 """ 483 484 return ManagedCallable(callable, self) 485 486 class Persistent: 487 488 """ 489 A mix-in class providing methods to exchanges for the management of 490 persistent communications. 491 """ 492 493 def start_waiting(self, channel): 494 495 """ 496 Start a waiting process given the reception of data on the given 497 'channel'. 498 """ 499 500 details = self._get_waiting(channel) 501 if details is not None: 502 callable, args, kw = details 503 self.add(start_persistent(channel.address, callable, *args, **kw)) 504 505 def start(self, address, callable, *args, **kw): 506 507 """ 508 Create a new process, located at the given 'address', for the given 509 'callable' using any additional arguments provided. Then, monitor the 510 channel created between this process and the created process. 511 """ 512 513 if self._set_waiting(callable, args, kw): 514 return 515 516 start_persistent(address, callable, *args, **kw) 517 518 def create(self, address): 519 520 """ 521 Create a new process, located at the given 'address', and return the 522 created communications channel to the created process. In the creating 523 process, return None - the channel receiving data from the created 524 process will be automatically managed by this exchange. 525 """ 526 527 channel = create_persistent(address) 528 return self._get_channel_for_process(channel) 529 530 def manage(self, address, callable): 531 532 """ 533 Using the given 'address', publish the given 'callable' in an object 534 which can then be called in the same way as 'callable', but with new 535 processes and communications managed automatically. 536 """ 537 538 return PersistentCallable(address, callable, self) 539 540 def connect(self, address): 541 542 "Connect to a process which is contactable via the given 'address'." 543 544 channel = connect_persistent(address) 545 self.add_wait(channel) 546 547 class ManagedCallable: 548 549 "A callable managed by an exchange." 550 551 def __init__(self, callable, exchange): 552 553 """ 554 Wrap the given 'callable', using the given 'exchange' to monitor the 555 channels created for communications between this and the created 556 processes. Note that the 'callable' must be parallel-aware (that is, 557 have a 'channel' parameter). Use the MakeParallel class to wrap other 558 kinds of callable objects. 559 """ 560 561 self.callable = callable 562 self.exchange = exchange 563 564 def __call__(self, *args, **kw): 565 566 "Invoke the callable with the supplied arguments." 567 568 self.exchange.start(self.callable, *args, **kw) 569 570 class PersistentCallable: 571 572 "A callable which sets up a persistent communications channel." 573 574 def __init__(self, address, callable, exchange): 575 576 """ 577 Using the given 'address', wrap the given 'callable', using the given 578 'exchange' to monitor the channels created for communications between 579 this and the created processes, so that when it is called, a background 580 process is started within which the 'callable' will run. Note that the 581 'callable' must be parallel-aware (that is, have a 'channel' parameter). 582 Use the MakeParallel class to wrap other kinds of callable objects. 583 """ 584 585 self.callable = callable 586 self.exchange = exchange 587 self.address = address 588 589 def __call__(self, *args, **kw): 590 591 "Invoke the callable with the supplied arguments." 592 593 self.exchange.start(self.address, self.callable, *args, **kw) 594 595 class BackgroundCallable: 596 597 """ 598 A callable which sets up a persistent communications channel, but is 599 unmanaged by an exchange. 600 """ 601 602 def __init__(self, address, callable): 603 604 """ 605 Using the given 'address', wrap the given 'callable'. This object can 606 then be invoked, but the wrapped callable will be run in a background 607 process. Note that the 'callable' must be parallel-aware (that is, have 608 a 'channel' parameter). Use the MakeParallel class to wrap other kinds 609 of callable objects. 610 """ 611 612 self.callable = callable 613 self.address = address 614 615 def __call__(self, *args, **kw): 616 617 "Invoke the callable with the supplied arguments." 618 619 start_persistent(self.address, self.callable, *args, **kw) 620 621 # Abstractions and utilities. 622 623 class Map(Exchange): 624 625 "An exchange which can be used like the built-in 'map' function." 626 627 def __init__(self, *args, **kw): 628 Exchange.__init__(self, *args, **kw) 629 self.init() 630 631 def init(self): 632 633 "Remember the channel addition order to order output." 634 635 self.channel_number = 0 636 self.channels = {} 637 self.results = [] 638 self.current_index = 0 639 640 def add(self, channel): 641 642 "Add the given 'channel' to the exchange." 643 644 Exchange.add(self, channel) 645 self.channels[channel] = self.channel_number 646 self.channel_number += 1 647 648 def start(self, callable, *args, **kw): 649 650 """ 651 Create a new process for the given 'callable' using any additional 652 arguments provided. Then, monitor the channel created between this 653 process and the created process. 654 """ 655 656 self.results.append(Undefined) # placeholder 657 Exchange.start(self, callable, *args, **kw) 658 659 def create(self): 660 661 """ 662 Create a new process and return the created communications channel to 663 the created process. In the creating process, return None - the channel 664 receiving data from the created process will be automatically managed by 665 this exchange. 666 """ 667 668 self.results.append(Undefined) # placeholder 669 return Exchange.create(self) 670 671 def __call__(self, callable, sequence): 672 673 "Wrap and invoke 'callable' for each element in the 'sequence'." 674 675 if not isinstance(callable, MakeParallel): 676 wrapped = MakeParallel(callable) 677 else: 678 wrapped = callable 679 680 self.init() 681 682 # Start processes for each element in the sequence. 683 684 for i in sequence: 685 self.start(wrapped, i) 686 687 # Access to the results occurs through this object. 688 689 return self 690 691 def store_data(self, channel): 692 693 "Accumulate the incoming data, associating results with channels." 694 695 data = channel.receive() 696 self.results[self.channels[channel]] = data 697 del self.channels[channel] 698 699 def __iter__(self): 700 return self 701 702 def next(self): 703 704 "Return the next element in the map." 705 706 try: 707 return self._next() 708 except IndexError: 709 pass 710 711 while self.active(): 712 self.store() 713 try: 714 return self._next() 715 except IndexError: 716 pass 717 else: 718 raise StopIteration 719 720 def __getitem__(self, i): 721 722 "Return element 'i' from the map." 723 724 try: 725 return self._get(i) 726 except IndexError: 727 pass 728 729 while self.active(): 730 self.store() 731 try: 732 return self._get(i) 733 except IndexError: 734 pass 735 else: 736 raise IndexError, i 737 738 # Helper methods for the above access methods. 739 740 def _next(self): 741 result = self._get(self.current_index) 742 self.current_index += 1 743 return result 744 745 def _get(self, i): 746 result = self.results[i] 747 if result is Undefined or isinstance(i, slice) and Undefined in result: 748 raise IndexError, i 749 return result 750 751 class Queue(Exchange): 752 753 """ 754 An exchange acting as a queue, making data from created processes available 755 in the order in which it is received. 756 """ 757 758 def __init__(self, *args, **kw): 759 Exchange.__init__(self, *args, **kw) 760 self.queue = [] 761 762 def store_data(self, channel): 763 764 "Accumulate the incoming data, associating results with channels." 765 766 data = channel.receive() 767 self.queue.insert(0, data) 768 769 def __iter__(self): 770 return self 771 772 def next(self): 773 774 "Return the next element in the queue." 775 776 if self.queue: 777 return self.queue.pop() 778 while self.active(): 779 self.store() 780 if self.queue: 781 return self.queue.pop() 782 else: 783 raise StopIteration 784 785 class MakeParallel: 786 787 "A wrapper around functions making them able to communicate results." 788 789 def __init__(self, callable): 790 791 """ 792 Initialise the wrapper with the given 'callable'. This object will then 793 be able to accept a 'channel' parameter when invoked, and to forward the 794 result of the given 'callable' via the channel provided back to the 795 invoking process. 796 """ 797 798 self.callable = callable 799 800 def __call__(self, channel, *args, **kw): 801 802 "Invoke the callable and return its result via the given 'channel'." 803 804 channel.send(self.callable(*args, **kw)) 805 806 class MakeReusable(MakeParallel): 807 808 """ 809 A wrapper around functions making them able to communicate results in a 810 reusable fashion. 811 """ 812 813 def __call__(self, channel, *args, **kw): 814 815 "Invoke the callable and return its result via the given 'channel'." 816 817 channel.send(self.callable(*args, **kw)) 818 t = channel.receive() 819 while t is not None: 820 args, kw = t 821 channel.send(self.callable(*args, **kw)) 822 t = channel.receive() 823 824 # Persistent variants. 825 826 class PersistentExchange(Persistent, Exchange): 827 828 "An exchange which manages persistent communications." 829 830 pass 831 832 class PersistentQueue(Persistent, Queue): 833 834 "A queue which manages persistent communications." 835 836 pass 837 838 # Convenience functions. 839 840 def BackgroundQueue(address): 841 842 """ 843 Connect to a process reachable via the given 'address', making the results 844 of which accessible via a queue. 845 """ 846 847 queue = PersistentQueue(limit=1) 848 queue.connect(address) 849 return queue 850 851 def pmap(callable, sequence, limit=None): 852 853 """ 854 A parallel version of the built-in map function with an optional process 855 'limit'. The given 'callable' should not be parallel-aware (that is, have a 856 'channel' parameter) since it will be wrapped for parallel communications 857 before being invoked. 858 859 Return the processed 'sequence' where each element in the sequence is 860 processed by a different process. 861 """ 862 863 mymap = Map(limit=limit) 864 return mymap(callable, sequence) 865 866 # Utility functions. 867 868 _cpuinfo_fields = "processor", "physical id", "core id" 869 870 def _get_number_of_cores(): 871 872 """ 873 Return the number of distinct, genuine processor cores. If the platform is 874 not supported by this function, None is returned. 875 """ 876 877 try: 878 f = open("/proc/cpuinfo") 879 try: 880 processors = set() 881 882 # Use the _cpuinfo_field values as "digits" in a larger unique 883 # core identifier. 884 885 processor = [None, None, None] 886 887 for line in f.xreadlines(): 888 for i, field in enumerate(_cpuinfo_fields): 889 890 # Where the field is found, insert the value into the 891 # appropriate location in the processor identifier. 892 893 if line.startswith(field): 894 t = line.split(":") 895 processor[i] = int(t[1].strip()) 896 break 897 898 # Where a new processor description is started, record the 899 # identifier. 900 901 if line.startswith("processor") and processor[0] is not None: 902 processors.add(tuple(processor)) 903 processor = [None, None, None] 904 905 # At the end of reading the file, add any unrecorded processors. 906 907 if processor[0] is not None: 908 processors.add(tuple(processor)) 909 910 return len(processors) 911 912 finally: 913 f.close() 914 915 except OSError: 916 return None 917 918 def _get_number_of_cores_solaris(): 919 920 """ 921 Return the number of cores for OpenSolaris 2008.05 and possibly other 922 editions of Solaris. 923 """ 924 925 f = os.popen("psrinfo -p") 926 try: 927 return int(f.read().strip()) 928 finally: 929 f.close() 930 931 # Low-level functions. 932 933 def create_socketpair(): 934 935 """ 936 Create a new process, returning a communications channel to both the 937 creating process and the created process. 938 """ 939 940 parent, child = socket.socketpair() 941 for s in [parent, child]: 942 s.setblocking(1) 943 944 pid = os.fork() 945 if pid == 0: 946 parent.close() 947 return Channel(pid, child.makefile("r", 0), child.makefile("w", 0)) 948 else: 949 child.close() 950 return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0)) 951 952 def create_pipes(): 953 954 """ 955 Create a new process, returning a communications channel to both the 956 creating process and the created process. 957 958 This function uses pipes instead of a socket pair, since some platforms 959 seem to have problems with poll and such socket pairs. 960 """ 961 962 pr, cw = os.pipe() 963 cr, pw = os.pipe() 964 965 pid = os.fork() 966 if pid == 0: 967 os.close(pr) 968 os.close(pw) 969 return Channel(pid, os.fdopen(cr, "r", 0), os.fdopen(cw, "w", 0)) 970 else: 971 os.close(cr) 972 os.close(cw) 973 return Channel(pid, os.fdopen(pr, "r", 0), os.fdopen(pw, "w", 0)) 974 975 if platform.system() == "SunOS": 976 create = create_pipes 977 get_number_of_cores = _get_number_of_cores_solaris 978 else: 979 create = create_socketpair 980 get_number_of_cores = _get_number_of_cores 981 982 def create_persistent(address): 983 984 """ 985 Create a new process, returning a persistent communications channel between 986 the creating process and the created process. This channel can be 987 disconnected from the creating process and connected to another process, and 988 thus can be used to collect results from daemon processes. 989 990 In order to be able to reconnect to created processes, the 'address' of the 991 communications endpoint for the created process needs to be provided. This 992 should be a filename. 993 """ 994 995 parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 996 child = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 997 child.bind(address) 998 999 for s in [parent, child]: 1000 s.setblocking(1) 1001 1002 pid = os.fork() 1003 if pid == 0: 1004 parent.close() 1005 return PersistentChannel(pid, child, address) 1006 else: 1007 child.close() 1008 #parent.connect(address) 1009 return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0)) 1010 1011 def connect_persistent(address): 1012 1013 """ 1014 Connect via a persistent channel to an existing created process, reachable 1015 at the given 'address'. 1016 """ 1017 1018 parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 1019 parent.setblocking(1) 1020 parent.connect(address) 1021 return Channel(0, parent.makefile("r", 0), parent.makefile("w", 0)) 1022 1023 def exit(channel): 1024 1025 """ 1026 Terminate a created process, closing the given 'channel'. 1027 """ 1028 1029 channel.close() 1030 os._exit(0) 1031 1032 def start(callable, *args, **kw): 1033 1034 """ 1035 Create a new process which shall start running in the given 'callable'. 1036 Additional arguments to the 'callable' can be given as additional arguments 1037 to this function. 1038 1039 Return a communications channel to the creating process. For the created 1040 process, supply a channel as the 'channel' parameter in the given 'callable' 1041 so that it may send data back to the creating process. 1042 """ 1043 1044 channel = create() 1045 if channel.pid == 0: 1046 try: 1047 try: 1048 callable(channel, *args, **kw) 1049 except: 1050 exc_type, exc_value, exc_traceback = sys.exc_info() 1051 channel.send(exc_value) 1052 finally: 1053 exit(channel) 1054 else: 1055 return channel 1056 1057 def start_persistent(address, callable, *args, **kw): 1058 1059 """ 1060 Create a new process which shall be reachable using the given 'address' and 1061 which will start running in the given 'callable'. Additional arguments to 1062 the 'callable' can be given as additional arguments to this function. 1063 1064 Return a communications channel to the creating process. For the created 1065 process, supply a channel as the 'channel' parameter in the given 'callable' 1066 so that it may send data back to the creating process. 1067 1068 Note that the created process employs a channel which is persistent: it can 1069 withstand disconnection from the creating process and subsequent connections 1070 from other processes. 1071 """ 1072 1073 channel = create_persistent(address) 1074 if channel.pid == 0: 1075 close_streams() 1076 try: 1077 try: 1078 callable(channel, *args, **kw) 1079 except: 1080 exc_type, exc_value, exc_traceback = sys.exc_info() 1081 channel.send(exc_value) 1082 finally: 1083 exit(channel) 1084 else: 1085 return channel 1086 1087 def close_streams(): 1088 1089 """ 1090 Close streams which keep the current process attached to any creating 1091 processes. 1092 """ 1093 1094 os.close(sys.stdin.fileno()) 1095 os.close(sys.stdout.fileno()) 1096 os.close(sys.stderr.fileno()) 1097 1098 def waitall(): 1099 1100 "Wait for all created processes to terminate." 1101 1102 try: 1103 while 1: 1104 os.wait() 1105 except OSError: 1106 pass 1107 1108 # vim: tabstop=4 expandtab shiftwidth=4