#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Copyright (C) 2005, 2006, 2007, 2008, 2009 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the Free
Software Foundation; either version 3 of the License, or (at your option) any
later version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
details.

You should have received a copy of the GNU Lesser General Public License along
with this program.  If not, see <http://www.gnu.org/licenses/>.
"""

__version__ = "0.4.1"

import os
import sys
import select
import socket
import platform

try:
    import cPickle as pickle
except ImportError:
    import pickle

try:
    set
except NameError:
    from sets import Set as set

# Special values.

class Undefined:

    "A placeholder marking a result which has not yet been received."

    pass

# Communications.

class AcknowledgementError(Exception):

    "Raised by Channel.send when the peer does not acknowledge with 'OK'."

    pass

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe

    def __del__(self):

        # Since signals don't work well with I/O, we close pipes and wait for
        # created processes upon finalisation.

        self.close()

    def close(self):

        "Explicitly close the channel. Closing more than once is harmless."

        if self.read_pipe is not None:
            self.read_pipe.close()
            self.read_pipe = None
        if self.write_pipe is not None:
            self.write_pipe.close()
            self.write_pipe = None
        #self.wait(os.WNOHANG)

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit. The 'options' are passed
        to os.waitpid. Errors from waiting (OSError) are ignored.
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)

        Raise AcknowledgementError if something other than "OK" is received.
        """

        self._send(obj)
        if self._receive() != "OK":
            raise AcknowledgementError(obj)

    def _receive(self):

        """
        Receive an object through the channel, returning the object. Where the
        received object is an exception instance, it is raised instead.
        """

        # NOTE: Unpickling is only safe where the peer process is trusted.

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        else:
            return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)
        """

        # The acknowledgement is sent even where receiving raised an exception.

        try:
            obj = self._receive()
            return obj
        finally:
            self._send("OK")

class PersistentChannel(Channel):

    """
    A persistent communications channel which can handle peer disconnection,
    acting as a server, meaning that this channel is associated with a specific
    address which can be contacted by other processes.
    """

    def __init__(self, pid, endpoint, address):

        """
        Initialise the channel with a process identifier 'pid', a listening
        socket 'endpoint', and the 'address' (a filename) to which the socket
        is bound.
        """

        Channel.__init__(self, pid, None, None)
        self.endpoint = endpoint
        self.address = address
        self.poller = select.poll()

        # Listen for connections before this process is interested in
        # communicating. It is not desirable to wait for connections at this
        # point because this will block the process.

        self.endpoint.listen(1)

    def close(self):

        "Close the persistent channel and remove the socket file."

        Channel.close(self)
        try:
            os.unlink(self.address)
        except OSError:
            pass

    def _ensure_pipes(self):

        "Ensure that the channel is capable of communicating."

        if self.read_pipe is None or self.write_pipe is None:

            # Accept any incoming connections.

            endpoint, address = self.endpoint.accept()
            self.read_pipe = endpoint.makefile("r", 0)
            self.write_pipe = endpoint.makefile("w", 0)

            # Monitor the write pipe for error conditions.

            fileno = self.write_pipe.fileno()
            self.poller.register(fileno, select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def _reset_pipes(self):

        "Discard the existing connection."

        fileno = self.write_pipe.fileno()
        self.poller.unregister(fileno)
        self.read_pipe = None
        self.write_pipe = None
        self.endpoint.listen(1)

    def _ensure_communication(self, timeout=None):

        "Ensure that sending and receiving are possible."

        while 1:
            self._ensure_pipes()
            fileno = self.write_pipe.fileno()
            fds = self.poller.poll(timeout)
            for fd, status in fds:
                if fd != fileno:
                    continue
                if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):

                    # Broken connection: discard it and start all over again.

                    self._reset_pipes()
                    break

            # No broken connection was detected: communication may proceed.

            else:
                return

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        self._ensure_communication()
        Channel._send(self, obj)

    def _receive(self):

        "Receive an object through the channel, returning the object."

        self._ensure_communication()
        return Channel._receive(self)

# Management of processes and communications.

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate. Subclasses of this class can define the 'store_data'
    method in order to enable the 'add_wait', 'wait' and 'finish' methods.
    """

    def __init__(self, channels=None, limit=None, reuse=0, continuous=0, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'.

        If the optional 'limit' is specified, restrictions on the addition of
        new channels can be enforced and observed through the 'add_wait', 'wait'
        and 'finish' methods. To make use of these methods, create a subclass of
        this class and define a working 'store_data' method.

        If the optional 'reuse' parameter is set to a true value, channels and
        processes will be reused for waiting computations, but the callable will
        be invoked for each computation.

        If the optional 'continuous' parameter is set to a true value, channels
        and processes will be retained after receiving data sent from such
        processes, since it will be assumed that they will communicate more
        data.

        If the optional 'autoclose' parameter is set to a false value, channels
        will not be closed automatically when they are removed from the exchange
        - by default they are closed when removed.
        """

        self.limit = limit
        self.reuse = reuse
        self.autoclose = autoclose
        self.continuous = continuous

        self.waiting = []       # queued (callable, args, kw) tuples
        self.readables = {}     # read pipe file descriptor -> channel
        self.removed = []       # channels removed by the last 'ready' call
        self.poller = select.poll()

        for channel in channels or []:
            self.add(channel)

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        fileno = channel.read_pipe.fileno()
        self.readables[fileno] = channel
        self.poller.register(fileno, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        return list(self.readables.values())

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (or until
        communication is possible) and return a list of channels which are ready
        to be read from.
        """

        fds = self.poller.poll(timeout)
        readables = []
        self.removed = []

        for fd, status in fds:
            channel = self.readables[fd]
            removed = 0

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)
                self.removed.append(channel)
                removed = 1

            # Record readable channels, except those which have just been
            # removed and closed.

            if status & select.POLLIN:
                if not (removed and self.autoclose):
                    readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange. Where 'autoclose' is set,
        also close the channel and wait for its process.
        """

        fileno = channel.read_pipe.fileno()
        del self.readables[fileno]
        self.poller.unregister(fileno)
        if self.autoclose:
            channel.close()
            channel.wait()

    # Enhanced exchange methods involving channel limits.

    def add_wait(self, channel):

        """
        Add the given 'channel' to the exchange, waiting if the limit on active
        channels would be exceeded by adding the channel.
        """

        self.wait()
        self.add(channel)

    def wait(self):

        """
        Test for the limit on channels, blocking and reading incoming data until
        the number of channels is below the limit.
        """

        # If limited, block until channels have been closed.

        while self.limit is not None and len(self.active()) >= self.limit:
            self.store()

    def _get_waiting(self, channel):

        """
        Get waiting callable and argument information for new processes, given
        the reception of data on the given 'channel'. Return a tuple of the form
        (callable, args, kw) where a new process should be started, or None
        otherwise.
        """

        if self.waiting:
            callable, args, kw = self.waiting.pop()

            # Try and reuse existing channels if possible.

            if self.reuse:

                # Re-add the channel - this may update information related to
                # the channel in subclasses.

                self.add(channel)
                channel.send((args, kw))

            # For continuous channels, no action is taken on the channel or on
            # new callable information.

            elif self.continuous:
                return None

            else:
                return callable, args, kw

        # Where channels are being reused, but where no processes are waiting
        # any more, send a special value to tell them to quit.

        elif self.reuse:
            channel.send(None)

        return None

    def finish(self):

        """
        Finish the use of the exchange by waiting for all channels to complete.
        """

        while self.active():
            self.store()

    def store(self):

        "For each ready channel, process the incoming data."

        for channel in self.ready():
            self.store_data(channel)
            self.start_waiting(channel)

    def store_data(self, channel):

        """
        Store incoming data from the specified 'channel'. In subclasses of this
        class, such data could be stored using instance attributes.
        """

        raise NotImplementedError("store_data")

    # Support for the convenience methods.

    def _set_waiting(self, callable, args, kw):

        """
        Support process creation by returning whether the given 'callable' has
        been queued for later invocation.
        """

        if self.limit is not None and len(self.active()) >= self.limit:
            self.waiting.insert(0, (callable, args, kw))
            return 1
        else:
            return 0

    def _get_channel_for_process(self, channel):

        """
        Support process creation by returning the given 'channel' to the
        created process, and None to the creating process.
        """

        # A pid of zero indicates that this is the created process.

        if channel.pid == 0:
            return channel
        else:
            self.add_wait(channel)
            return None

    # Methods for overriding, related to the convenience methods.

    def start_waiting(self, channel):

        """
        Start a waiting process given the reception of data on the given
        'channel'.
        """

        details = self._get_waiting(channel)
        if details is not None:
            callable, args, kw = details
            self.add(start(callable, *args, **kw))

    # Convenience methods.

    def start(self, callable, *args, **kw):

        """
        Create a new process for the given 'callable' using any additional
        arguments provided. Then, monitor the channel created between this
        process and the created process.
        """

        if self._set_waiting(callable, args, kw):
            return

        self.add_wait(start(callable, *args, **kw))

    def create(self):

        """
        Create a new process and return the created communications channel to
        the created process. In the creating process, return None - the channel
        receiving data from the created process will be automatically managed by
        this exchange.
        """

        channel = create()
        return self._get_channel_for_process(channel)

    def manage(self, callable):

        """
        Wrap the given 'callable' in an object which can then be called in the
        same way as 'callable', but with new processes and communications
        managed automatically.
        """

        return ManagedCallable(callable, self)

class Persistent:

    """
    A mix-in class providing methods to exchanges for the management of
    persistent communications.
    """

    def start_waiting(self, channel):

        """
        Start a waiting process given the reception of data on the given
        'channel'.
        """

        details = self._get_waiting(channel)
        if details is not None:
            callable, args, kw = details
            self.add(start_persistent(channel.address, callable, *args, **kw))

    def start(self, address, callable, *args, **kw):

        """
        Create a new process, located at the given 'address', for the given
        'callable' using any additional arguments provided. The channel returned
        by start_persistent is not added to the exchange here; 'connect' adds a
        channel for a given address.
        """

        if self._set_waiting(callable, args, kw):
            return

        start_persistent(address, callable, *args, **kw)

    def create(self, address):

        """
        Create a new process, located at the given 'address', and return the
        created communications channel to the created process. In the creating
        process, return None - the channel receiving data from the created
        process will be automatically managed by this exchange.
        """

        channel = create_persistent(address)
        return self._get_channel_for_process(channel)

    def manage(self, address, callable):

        """
        Using the given 'address', publish the given 'callable' in an object
        which can then be called in the same way as 'callable', but with new
        processes and communications managed automatically.
        """

        return PersistentCallable(address, callable, self)

    def connect(self, address):

        "Connect to a process which is contactable via the given 'address'."

        channel = connect_persistent(address)
        self.add_wait(channel)

class ManagedCallable:

    "A callable managed by an exchange."

    def __init__(self, callable, exchange):

        """
        Wrap the given 'callable', using the given 'exchange' to monitor the
        channels created for communications between this and the created
        processes. Note that the 'callable' must be parallel-aware (that is,
        have a 'channel' parameter). Use the MakeParallel class to wrap other
        kinds of callable objects.
        """

        self.callable = callable
        self.exchange = exchange

    def __call__(self, *args, **kw):

        "Invoke the callable with the supplied arguments."

        self.exchange.start(self.callable, *args, **kw)

class PersistentCallable:

    "A callable which sets up a persistent communications channel."

    def __init__(self, address, callable, exchange):

        """
        Using the given 'address', wrap the given 'callable', using the given
        'exchange' to monitor the channels created for communications between
        this and the created processes, so that when it is called, a background
        process is started within which the 'callable' will run. Note that the
        'callable' must be parallel-aware (that is, have a 'channel' parameter).
        Use the MakeParallel class to wrap other kinds of callable objects.
        """

        self.callable = callable
        self.exchange = exchange
        self.address = address

    def __call__(self, *args, **kw):

        "Invoke the callable with the supplied arguments."

        self.exchange.start(self.address, self.callable, *args, **kw)

class BackgroundCallable:

    """
    A callable which sets up a persistent communications channel, but is
    unmanaged by an exchange.
    """

    def __init__(self, address, callable):

        """
        Using the given 'address', wrap the given 'callable'. This object can
        then be invoked, but the wrapped callable will be run in a background
        process. Note that the 'callable' must be parallel-aware (that is, have
        a 'channel' parameter). Use the MakeParallel class to wrap other kinds
        of callable objects.
        """

        self.callable = callable
        self.address = address

    def __call__(self, *args, **kw):

        "Invoke the callable with the supplied arguments."

        start_persistent(self.address, self.callable, *args, **kw)

# Abstractions and utilities.

class Map(Exchange):

    "An exchange which can be used like the built-in 'map' function."

    def __init__(self, *args, **kw):
        Exchange.__init__(self, *args, **kw)
        self.init()

    def init(self):

        "Remember the channel addition order to order output."

        self.channel_number = 0
        self.channels = {}      # channel -> index into 'results'
        self.results = []
        self.current_index = 0

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        Exchange.add(self, channel)
        self.channels[channel] = self.channel_number
        self.channel_number += 1

    def start(self, callable, *args, **kw):

        """
        Create a new process for the given 'callable' using any additional
        arguments provided. Then, monitor the channel created between this
        process and the created process.
        """

        self.results.append(Undefined) # placeholder
        Exchange.start(self, callable, *args, **kw)

    def create(self):

        """
        Create a new process and return the created communications channel to
        the created process. In the creating process, return None - the channel
        receiving data from the created process will be automatically managed by
        this exchange.
        """

        self.results.append(Undefined) # placeholder
        return Exchange.create(self)

    def __call__(self, callable, sequence):

        "Wrap and invoke 'callable' for each element in the 'sequence'."

        if not isinstance(callable, MakeParallel):
            wrapped = MakeParallel(callable)
        else:
            wrapped = callable

        self.init()

        # Start processes for each element in the sequence.

        for i in sequence:
            self.start(wrapped, i)

        # Access to the results occurs through this object.

        return self

    def store_data(self, channel):

        "Accumulate the incoming data, associating results with channels."

        data = channel.receive()
        self.results[self.channels[channel]] = data
        del self.channels[channel]

    def __iter__(self):
        return self

    def next(self):

        "Return the next element in the map."

        try:
            return self._next()
        except IndexError:
            pass

        # Collect more results until the wanted element becomes available.

        while self.active():
            self.store()
            try:
                return self._next()
            except IndexError:
                pass
        else:
            raise StopIteration

    # Also support the iterator protocol spelling used by newer Pythons.

    __next__ = next

    def __getitem__(self, i):

        "Return element 'i' from the map."

        try:
            return self._get(i)
        except IndexError:
            pass

        # Collect more results until the wanted element becomes available.

        while self.active():
            self.store()
            try:
                return self._get(i)
            except IndexError:
                pass
        else:
            raise IndexError(i)

    # Helper methods for the above access methods.

    def _next(self):

        "Return the result at the current index, advancing the index."

        result = self._get(self.current_index)
        self.current_index += 1
        return result

    def _get(self, i):

        """
        Return the result (or slice of results) for 'i', raising IndexError
        where any requested result has not yet been received.
        """

        result = self.results[i]
        if result is Undefined or (isinstance(i, slice) and Undefined in result):
            raise IndexError(i)
        return result

class Queue(Exchange):

    """
    An exchange acting as a queue, making data from created processes available
    in the order in which it is received.
    """

    def __init__(self, *args, **kw):
        Exchange.__init__(self, *args, **kw)
        self.queue = []

    def store_data(self, channel):

        "Accumulate the incoming data, associating results with channels."

        # New data is inserted at the front; the oldest data is popped from the
        # end by 'next'.

        data = channel.receive()
        self.queue.insert(0, data)

    def __iter__(self):
        return self

    def next(self):

        "Return the next element in the queue."

        if self.queue:
            return self.queue.pop()
        while self.active():
            self.store()
            if self.queue:
                return self.queue.pop()
        else:
            raise StopIteration

    # Also support the iterator protocol spelling used by newer Pythons.

    __next__ = next

class MakeParallel:

    "A wrapper around functions making them able to communicate results."

    def __init__(self, callable):

        """
        Initialise the wrapper with the given 'callable'. This object will then
        be able to accept a 'channel' parameter when invoked, and to forward the
        result of the given 'callable' via the channel provided back to the
        invoking process.
        """

        self.callable = callable

    def __call__(self, channel, *args, **kw):

        "Invoke the callable and return its result via the given 'channel'."

        channel.send(self.callable(*args, **kw))

class MakeReusable(MakeParallel):

    """
    A wrapper around functions making them able to communicate results in a
    reusable fashion.
    """

    def __call__(self, channel, *args, **kw):

        "Invoke the callable and return its result via the given 'channel'."

        channel.send(self.callable(*args, **kw))

        # Keep serving (args, kw) requests until None is received.

        t = channel.receive()
        while t is not None:
            args, kw = t
            channel.send(self.callable(*args, **kw))
            t = channel.receive()

# Persistent variants.

class PersistentExchange(Persistent, Exchange):

    "An exchange which manages persistent communications."

    pass

class PersistentQueue(Persistent, Queue):

    "A queue which manages persistent communications."

    pass

# Convenience functions.

def BackgroundQueue(address):

    """
    Connect to a process reachable via the given 'address', making the results
    of which accessible via a queue.
    """

    queue = PersistentQueue(limit=1)
    queue.connect(address)
    return queue

def pmap(callable, sequence, limit=None):

    """
    A parallel version of the built-in map function with an optional process
    'limit'. The given 'callable' should not be parallel-aware (that is, have a
    'channel' parameter) since it will be wrapped for parallel communications
    before being invoked.

    Return the processed 'sequence' where each element in the sequence is
    processed by a different process.
    """

    mymap = Map(limit=limit)
    return mymap(callable, sequence)

# Utility functions.

_cpuinfo_fields = "processor", "physical id", "core id"

def _get_number_of_cores():

    """
    Return the number of distinct processor identifiers found in /proc/cpuinfo,
    each identifier combining the _cpuinfo_fields values. If the platform is
    not supported by this function (the file cannot be opened or read), None is
    returned.
    """

    try:
        f = open("/proc/cpuinfo")
        try:
            processors = set()

            # Use the _cpuinfo_field values as "digits" in a larger unique
            # core identifier.

            processor = [None, None, None]

            for line in f:
                for i, field in enumerate(_cpuinfo_fields):

                    # Where the field is found, insert the value into the
                    # appropriate location in the processor identifier.

                    if line.startswith(field):
                        t = line.split(":")
                        processor[i] = int(t[1].strip())
                        break

                # Where a new processor description is started, record the
                # identifier.

                if line.startswith("processor") and processor[0] is not None:
                    processors.add(tuple(processor))
                    processor = [None, None, None]

            # At the end of reading the file, add any unrecorded processors.

            if processor[0] is not None:
                processors.add(tuple(processor))

            return len(processors)

        finally:
            f.close()

    # Failure to open a file is reported as IOError by older Pythons, not as
    # OSError, so both must be handled to honour the documented None result.

    except (IOError, OSError):
        return None

def _get_number_of_cores_solaris():

    """
    Return the number of cores for OpenSolaris 2008.05 and possibly other
    editions of Solaris.
    """

    f = os.popen("psrinfo -p")
    try:
        return int(f.read().strip())
    finally:
        f.close()

# Low-level functions.
def create_socketpair():

    """
    Fork a new process and return a communications channel in both the
    creating process and the created process, the two channels being joined by
    a socket pair.
    """

    parent, child = socket.socketpair()
    parent.setblocking(1)
    child.setblocking(1)

    pid = os.fork()

    # Each process keeps its own end of the pair and discards the other.

    if pid == 0:
        unused, endpoint = parent, child
    else:
        unused, endpoint = child, parent

    unused.close()
    return Channel(pid, endpoint.makefile("r", 0), endpoint.makefile("w", 0))

def create_pipes():

    """
    Fork a new process and return a communications channel in both the
    creating process and the created process.

    This function uses pipes instead of a socket pair, since some platforms
    seem to have problems with poll and such socket pairs.
    """

    # One pipe carries data from child to parent, the other from parent to
    # child.

    pr, cw = os.pipe()
    cr, pw = os.pipe()

    pid = os.fork()

    if pid == 0:
        unused, reader, writer = (pr, pw), cr, cw
    else:
        unused, reader, writer = (cr, cw), pr, pw

    for fd in unused:
        os.close(fd)

    return Channel(pid, os.fdopen(reader, "r", 0), os.fdopen(writer, "w", 0))

# Select the platform-specific implementations.

if platform.system() == "SunOS":
    create = create_pipes
    get_number_of_cores = _get_number_of_cores_solaris
else:
    create = create_socketpair
    get_number_of_cores = _get_number_of_cores

def create_persistent(address):

    """
    Create a new process, returning a persistent communications channel between
    the creating process and the created process. This channel can be
    disconnected from the creating process and connected to another process, and
    thus can be used to collect results from daemon processes.

    In order to be able to reconnect to created processes, the 'address' of the
    communications endpoint for the created process needs to be provided. This
    should be a filename.
    """

    parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    child = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    child.bind(address)

    parent.setblocking(1)
    child.setblocking(1)

    pid = os.fork()

    # The created process serves connections made to the bound socket.

    if pid == 0:
        parent.close()
        return PersistentChannel(pid, child, address)

    # The creating process keeps only its own socket.

    child.close()
    #parent.connect(address)
    return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))

def connect_persistent(address):

    """
    Connect via a persistent channel to an existing created process, reachable
    at the given 'address'.
    """

    connection = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    connection.setblocking(1)
    connection.connect(address)
    reader = connection.makefile("r", 0)
    writer = connection.makefile("w", 0)
    return Channel(0, reader, writer)

def exit(channel):

    """
    Terminate a created process, closing the given 'channel'.
    """

    channel.close()
    os._exit(0)

def _run_and_exit(channel, callable, args, kw):

    """
    Within a created process, invoke 'callable' with the 'channel' plus the
    given 'args' and 'kw', send any raised exception value through the channel,
    and then terminate the process.
    """

    try:
        try:
            callable(channel, *args, **kw)

        # Every kind of exception is caught so that it can be sent through the
        # channel instead of propagating within the created process.

        except:
            channel.send(sys.exc_info()[1])
    finally:
        exit(channel)

def start(callable, *args, **kw):

    """
    Create a new process which shall start running in the given 'callable'.
    Additional arguments to the 'callable' can be given as additional arguments
    to this function.

    Return a communications channel to the creating process. For the created
    process, supply a channel as the 'channel' parameter in the given 'callable'
    so that it may send data back to the creating process.
    """

    channel = create()

    # A non-zero pid indicates the creating process.

    if channel.pid != 0:
        return channel

    _run_and_exit(channel, callable, args, kw)

def start_persistent(address, callable, *args, **kw):

    """
    Create a new process which shall be reachable using the given 'address' and
    which will start running in the given 'callable'. Additional arguments to
    the 'callable' can be given as additional arguments to this function.

    Return a communications channel to the creating process. For the created
    process, supply a channel as the 'channel' parameter in the given 'callable'
    so that it may send data back to the creating process.

    Note that the created process employs a channel which is persistent: it can
    withstand disconnection from the creating process and subsequent connections
    from other processes.
    """

    channel = create_persistent(address)

    # A non-zero pid indicates the creating process.

    if channel.pid != 0:
        return channel

    # Detach the created process from the standard streams before running.

    close_streams()
    _run_and_exit(channel, callable, args, kw)

def close_streams():

    """
    Close streams which keep the current process attached to any creating
    processes.
    """

    for stream in (sys.stdin, sys.stdout, sys.stderr):
        os.close(stream.fileno())

def waitall():

    "Wait for all created processes to terminate."

    # os.wait raises OSError once no created processes remain.

    try:
        while 1:
            os.wait()
    except OSError:
        return

# vim: tabstop=4 expandtab shiftwidth=4