#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Copyright (C) 2005, 2006, 2007, 2008, 2009, 2013 Paul Boddie <paul@boddie.org.uk>
Copyright (C) 2013 Yaroslav Halchenko <debian@onerussian.com>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the Free
Software Foundation; either version 3 of the License, or (at your option) any
later version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
details.

You should have received a copy of the GNU Lesser General Public License along
with this program.  If not, see <http://www.gnu.org/licenses/>.
"""

__version__ = "0.5.1"

import os
import sys
import select
import socket
import platform

from warnings import warn

try:
    import cPickle as pickle
except ImportError:
    import pickle

try:
    set
except NameError:
    from sets import Set as set

# Special values.

class Undefined: pass

# Communications.

class AcknowledgementError(Exception):

    "Raised by Channel.send when the peer's reply is not the expected \"OK\"."

    pass

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe

    def __del__(self):

        # Since signals don't work well with I/O, we close pipes and wait for
        # created processes upon finalisation.

        self.close()

    def close(self):

        "Explicitly close the channel."

        if self.read_pipe is not None:
            self.read_pipe.close()
            self.read_pipe = None
        if self.write_pipe is not None:
            self.write_pipe.close()
            self.write_pipe = None
        #self.wait(os.WNOHANG)

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit. The 'options' are passed
        to os.waitpid. Errors from waiting (OSError) are ignored.
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)

        Raise AcknowledgementError, carrying 'obj', if the reply is not "OK".
        """

        self._send(obj)
        if self._receive() != "OK":
            raise AcknowledgementError(obj)

    def _receive(self):

        """
        Receive an object through the channel, returning the object. Where the
        received object is an exception instance, it is raised instead.
        """

        # NOTE: unpickling is only appropriate for data from trusted peers.

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        else:
            return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)
        """

        # The acknowledgement is sent even if receiving raised an exception.

        try:
            obj = self._receive()
            return obj
        finally:
            self._send("OK")

class PersistentChannel(Channel):

    """
    A persistent communications channel which can handle peer disconnection,
    acting as a server, meaning that this channel is associated with a specific
    address which can be contacted by other processes.
    """

    def __init__(self, pid, endpoint, address):

        """
        Initialise the channel with a process identifier 'pid', a listening
        socket 'endpoint', and the 'address' (a filename, removed again by the
        'close' method) to which the endpoint is bound.
        """

        Channel.__init__(self, pid, None, None)
        self.endpoint = endpoint
        self.address = address
        self.poller = select.poll()

        # Listen for connections before this process is interested in
        # communicating. It is not desirable to wait for connections at this
        # point because this will block the process.

        self.endpoint.listen(1)

    def close(self):

        "Close the persistent channel and remove the socket file."

        Channel.close(self)
        try:
            os.unlink(self.address)
        except OSError:
            pass

    def _ensure_pipes(self):

        "Ensure that the channel is capable of communicating."

        if self.read_pipe is None or self.write_pipe is None:

            # Accept any incoming connections.

            endpoint, address = self.endpoint.accept()
            self.read_pipe = endpoint.makefile("r", 0)
            self.write_pipe = endpoint.makefile("w", 0)

            # Monitor the write pipe for error conditions.

            fileno = self.write_pipe.fileno()
            self.poller.register(fileno, select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def _reset_pipes(self):

        "Discard the existing connection."

        fileno = self.write_pipe.fileno()
        self.poller.unregister(fileno)
        self.read_pipe = None
        self.write_pipe = None
        self.endpoint.listen(1)

    def _ensure_communication(self, timeout=None):

        "Ensure that sending and receiving are possible."

        while 1:
            self._ensure_pipes()
            fileno = self.write_pipe.fileno()
            fds = self.poller.poll(timeout)
            for fd, status in fds:
                if fd != fileno:
                    continue
                if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):

                    # Broken connection: discard it and start all over again.

                    self._reset_pipes()
                    break

            # No error condition was reported for the write pipe.

            else:
                return

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        self._ensure_communication()
        Channel._send(self, obj)

    def _receive(self):

        "Receive an object through the channel, returning the object."

        self._ensure_communication()
        return Channel._receive(self)

# Management of processes and communications.

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate. Subclasses of this class can define the 'store_data'
    method in order to enable the 'add_wait', 'wait' and 'finish' methods.

    Once exchanges are populated with active channels, use of the principal
    methods of the exchange typically cause the 'store' method to be invoked,
    resulting in the processing of any incoming data.
    """

    def __init__(self, channels=None, limit=None, reuse=0, continuous=0, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'.

        If the optional 'limit' is specified, restrictions on the addition of
        new channels can be enforced and observed through the 'add_wait', 'wait'
        and 'finish' methods. To make use of these methods, create a subclass of
        this class and define a working 'store_data' method.

        If the optional 'reuse' parameter is set to a true value, channels and
        processes will be reused for waiting computations, but the callable will
        be invoked for each computation.

        If the optional 'continuous' parameter is set to a true value, channels
        and processes will be retained after receiving data sent from such
        processes, since it will be assumed that they will communicate more
        data.

        If the optional 'autoclose' parameter is set to a false value, channels
        will not be closed automatically when they are removed from the exchange
        - by default they are closed when removed.
        """

        self.limit = limit
        self.reuse = reuse
        self.autoclose = autoclose
        self.continuous = continuous

        self.waiting = []
        self.readables = {}
        self.removed = []
        self.poller = select.poll()

        for channel in channels or []:
            self.add(channel)

    # Core methods, registering and reporting on channels.

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        fileno = channel.read_pipe.fileno()
        self.readables[fileno] = channel
        self.poller.register(fileno, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        return list(self.readables.values())

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' in
        milliseconds (or until communication is possible) and return a list of
        channels which are ready to be read from.

        Channels reporting hang-up or error conditions are removed from the
        exchange and recorded in the 'removed' attribute, which is reset upon
        each call.
        """

        fds = self.poller.poll(timeout)
        readables = []
        self.removed = []

        for fd, status in fds:
            channel = self.readables[fd]
            removed = 0

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)
                self.removed.append(channel)
                removed = 1

            # Record readable channels.

            if status & select.POLLIN:
                if not (removed and self.autoclose):
                    readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange. Where 'autoclose' is set,
        the channel is also closed and its process waited for.
        """

        fileno = channel.read_pipe.fileno()
        del self.readables[fileno]
        self.poller.unregister(fileno)
        if self.autoclose:
            channel.close()
            channel.wait()

    # Enhanced exchange methods involving channel limits.

    def unfinished(self):

        "Return whether the exchange still has work scheduled or in progress."

        return self.active() or self.waiting

    def busy(self):

        "Return whether the exchange uses as many channels as it is allowed to."

        return self.limit is not None and len(self.active()) >= self.limit

    def add_wait(self, channel):

        """
        Add the given 'channel' to the exchange, waiting if the limit on active
        channels would be exceeded by adding the channel.
        """

        self.wait()
        self.add(channel)

    def wait(self):

        """
        Test for the limit on channels, blocking and reading incoming data until
        the number of channels is below the limit.
        """

        # If limited, block until channels have been closed.

        while self.busy():
            self.store()

    def finish(self):

        """
        Finish the use of the exchange by waiting for all channels to complete.
        """

        while self.unfinished():
            self.store()

    def store(self, timeout=None):

        """
        For each ready channel, process the incoming data. If the optional
        'timeout' parameter (a duration in milliseconds) is specified, wait only
        for the specified duration if no channels are ready to provide data.
        """

        # Either process input from active channels.

        if self.active():
            for channel in self.ready(timeout):
                try:
                    self.store_data(channel)
                    self.start_waiting(channel)
                except IOError:

                    # Obtain the exception in a way that is valid syntax for
                    # all Python versions.

                    exc = sys.exc_info()[1]
                    self.remove(channel)
                    warn("Removed channel %r due to IOError: %s" % (channel, exc))

        # Or schedule new processes and channels.

        else:
            while self.waiting and not self.busy():
                callable, args, kw = self.waiting.pop()
                self.start(callable, *args, **kw)

    def store_data(self, channel):

        """
        Store incoming data from the specified 'channel'. In subclasses of this
        class, such data could be stored using instance attributes.
        """

        raise NotImplementedError("store_data")

    # Support for the convenience methods.

    def _get_waiting(self, channel):

        """
        Get waiting callable and argument information for new processes, given
        the reception of data on the given 'channel'.

        Return a (callable, args, kw) tuple where a new process should be
        started, or None otherwise.
        """

        # For continuous channels, no scheduling is requested.

        if self.waiting and not self.continuous:

            # Schedule this callable and arguments.

            callable, args, kw = self.waiting.pop()

            # Try and reuse existing channels if possible.

            if self.reuse:

                # Re-add the channel - this may update information related to
                # the channel in subclasses.

                self.add(channel)
                channel.send((args, kw))

            else:
                return callable, args, kw

        # Where channels are being reused, but where no processes are waiting
        # any more, send a special value to tell them to quit.

        elif self.reuse:
            channel.send(None)

        return None

    def _set_waiting(self, callable, args, kw):

        """
        Support process creation by returning whether the given 'callable' has
        been queued for later invocation.
        """

        if self.busy():
            self.waiting.insert(0, (callable, args, kw))
            return 1
        else:
            return 0

    def _get_channel_for_process(self, channel):

        """
        Support process creation by returning the given 'channel' to the
        created process, and None to the creating process.
        """

        # A pid of zero indicates the created process.

        if channel.pid == 0:
            return channel
        else:
            self.add_wait(channel)
            return None

    # Methods for overriding, related to the convenience methods.

    def start_waiting(self, channel):

        """
        Start a waiting process given the reception of data on the given
        'channel'.
        """

        details = self._get_waiting(channel)
        if details is not None:
            callable, args, kw = details
            self.add(start(callable, *args, **kw))

    # Convenience methods.

    def start(self, callable, *args, **kw):

        """
        Create a new process for the given 'callable' using any additional
        arguments provided. Then, monitor the channel created between this
        process and the created process.

        Where the exchange is busy, the request is queued instead.
        """

        if self._set_waiting(callable, args, kw):
            return

        self.add_wait(start(callable, *args, **kw))

    def create(self):

        """
        Create a new process and return the created communications channel to
        the created process. In the creating process, return None - the channel
        receiving data from the created process will be automatically managed by
        this exchange.
        """

        channel = create()
        return self._get_channel_for_process(channel)

    def manage(self, callable):

        """
        Wrap the given 'callable' in an object which can then be called in the
        same way as 'callable', but with new processes and communications
        managed automatically.
        """

        return ManagedCallable(callable, self)

class Persistent:

    """
    A mix-in class providing methods to exchanges for the management of
    persistent communications.
    """

    def start_waiting(self, channel):

        """
        Start a waiting process given the reception of data on the given
        'channel'.
        """

        # NOTE(review): this relies on 'channel' having an 'address' attribute,
        # which is set by PersistentChannel but not by Channel - confirm which
        # kind of channel reaches this method.

        details = self._get_waiting(channel)
        if details is not None:
            callable, args, kw = details
            self.add(start_persistent(channel.address, callable, *args, **kw))

    def start(self, address, callable, *args, **kw):

        """
        Create a new process, located at the given 'address', for the given
        'callable' using any additional arguments provided. The returned channel
        is not added to this exchange here; use 'connect' to monitor the
        created process.
        """

        # NOTE(review): queued entries do not record 'address', yet
        # Exchange.store invokes self.start(callable, *args, **kw) for queued
        # work, which does not match this signature - confirm intended use.

        if self._set_waiting(callable, args, kw):
            return

        start_persistent(address, callable, *args, **kw)

    def create(self, address):

        """
        Create a new process, located at the given 'address', and return the
        created communications channel to the created process. In the creating
        process, return None - the channel receiving data from the created
        process will be automatically managed by this exchange.
        """

        channel = create_persistent(address)
        return self._get_channel_for_process(channel)

    def manage(self, address, callable):

        """
        Using the given 'address', publish the given 'callable' in an object
        which can then be called in the same way as 'callable', but with new
        processes and communications managed automatically.
        """

        return PersistentCallable(address, callable, self)

    def connect(self, address):

        "Connect to a process which is contactable via the given 'address'."

        channel = connect_persistent(address)
        self.add_wait(channel)

class ManagedCallable:

    "A callable managed by an exchange."

    def __init__(self, callable, exchange):

        """
        Wrap the given 'callable', using the given 'exchange' to monitor the
        channels created for communications between this and the created
        processes. Note that the 'callable' must be parallel-aware (that is,
        have a 'channel' parameter). Use the MakeParallel class to wrap other
        kinds of callable objects.
        """

        self.callable = callable
        self.exchange = exchange

    def __call__(self, *args, **kw):

        "Invoke the callable with the supplied arguments."

        self.exchange.start(self.callable, *args, **kw)

class PersistentCallable:

    "A callable which sets up a persistent communications channel."

    def __init__(self, address, callable, exchange):

        """
        Using the given 'address', wrap the given 'callable', using the given
        'exchange' to monitor the channels created for communications between
        this and the created processes, so that when it is called, a background
        process is started within which the 'callable' will run. Note that the
        'callable' must be parallel-aware (that is, have a 'channel' parameter).
        Use the MakeParallel class to wrap other kinds of callable objects.
        """

        self.callable = callable
        self.exchange = exchange
        self.address = address

    def __call__(self, *args, **kw):

        "Invoke the callable with the supplied arguments."

        self.exchange.start(self.address, self.callable, *args, **kw)

class BackgroundCallable:

    """
    A callable which sets up a persistent communications channel, but is
    unmanaged by an exchange.
    """

    def __init__(self, address, callable):

        """
        Using the given 'address', wrap the given 'callable'. This object can
        then be invoked, but the wrapped callable will be run in a background
        process. Note that the 'callable' must be parallel-aware (that is, have
        a 'channel' parameter). Use the MakeParallel class to wrap other kinds
        of callable objects.
        """

        self.callable = callable
        self.address = address

    def __call__(self, *args, **kw):

        "Invoke the callable with the supplied arguments."

        start_persistent(self.address, self.callable, *args, **kw)

# Abstractions and utilities.

class Map(Exchange):

    "An exchange which can be used like the built-in 'map' function."

    def __init__(self, *args, **kw):
        Exchange.__init__(self, *args, **kw)
        self.init()

    def init(self):

        "Remember the channel addition order to order output."

        self.channel_number = 0
        self.channels = {}
        self.results = []
        self.current_index = 0

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        Exchange.add(self, channel)
        self.channels[channel] = self.channel_number
        self.channel_number += 1

    def start(self, callable, *args, **kw):

        """
        Create a new process for the given 'callable' using any additional
        arguments provided. Then, monitor the channel created between this
        process and the created process.
        """

        self.results.append(Undefined) # placeholder
        Exchange.start(self, callable, *args, **kw)

    def create(self):

        """
        Create a new process and return the created communications channel to
        the created process. In the creating process, return None - the channel
        receiving data from the created process will be automatically managed by
        this exchange.
        """

        self.results.append(Undefined) # placeholder
        return Exchange.create(self)

    def __call__(self, callable, sequence):

        "Wrap and invoke 'callable' for each element in the 'sequence'."

        if not isinstance(callable, MakeParallel):
            wrapped = MakeParallel(callable)
        else:
            wrapped = callable

        self.init()

        # Start processes for each element in the sequence.

        for i in sequence:
            self.start(wrapped, i)

        # Access to the results occurs through this object.

        return self

    def store_data(self, channel):

        "Accumulate the incoming data, associating results with channels."

        data = channel.receive()
        self.results[self.channels[channel]] = data
        del self.channels[channel]

    def __iter__(self):
        return self

    def next(self):

        "Return the next element in the map."

        try:
            return self._next()
        except IndexError:
            pass

        # Process incoming data until the wanted result becomes available.

        while self.unfinished():
            self.store()
            try:
                return self._next()
            except IndexError:
                pass
        else:
            raise StopIteration

    # Provide the iterator method under the name used by newer Python versions.

    __next__ = next

    def __getitem__(self, i):

        "Return element 'i' from the map."

        try:
            return self._get(i)
        except IndexError:
            pass

        # Process incoming data until the wanted result becomes available.

        while self.unfinished():
            self.store()
            try:
                return self._get(i)
            except IndexError:
                pass
        else:
            raise IndexError(i)

    # Helper methods for the above access methods.

    def _next(self):

        "Return the result at the current index, advancing the index."

        result = self._get(self.current_index)
        self.current_index += 1
        return result

    def _get(self, i):

        """
        Return the result or slice of results at 'i', raising IndexError where
        any requested result is still an Undefined placeholder.
        """

        result = self.results[i]
        if result is Undefined or isinstance(i, slice) and Undefined in result:
            raise IndexError(i)
        return result

class Queue(Exchange):

    """
    An exchange acting as a queue, making data from created processes available
    in the order in which it is received.
    """

    def __init__(self, *args, **kw):
        Exchange.__init__(self, *args, **kw)
        self.queue = []

    def store_data(self, channel):

        "Accumulate the incoming data, associating results with channels."

        # New data enters at the front; the oldest data is popped from the end.

        data = channel.receive()
        self.queue.insert(0, data)

    def __iter__(self):
        return self

    def next(self):

        "Return the next element in the queue."

        if self.queue:
            return self.queue.pop()

        while self.unfinished():
            self.store()
            if self.queue:
                return self.queue.pop()
        else:
            raise StopIteration

    # Provide the iterator method under the name used by newer Python versions.

    __next__ = next

    def __len__(self):

        "Return the current length of the queue."

        return len(self.queue)

class MakeParallel:

    "A wrapper around functions making them able to communicate results."

    def __init__(self, callable):

        """
        Initialise the wrapper with the given 'callable'. This object will then
        be able to accept a 'channel' parameter when invoked, and to forward the
        result of the given 'callable' via the channel provided back to the
        invoking process.
        """

        self.callable = callable

    def __call__(self, channel, *args, **kw):

        "Invoke the callable and return its result via the given 'channel'."

        channel.send(self.callable(*args, **kw))

class MakeReusable(MakeParallel):

    """
    A wrapper around functions making them able to communicate results in a
    reusable fashion.
    """

    def __call__(self, channel, *args, **kw):

        "Invoke the callable and return its result via the given 'channel'."

        channel.send(self.callable(*args, **kw))

        # Keep accepting (args, kw) pairs until None is received.

        t = channel.receive()
        while t is not None:
            args, kw = t
            channel.send(self.callable(*args, **kw))
            t = channel.receive()

# Persistent variants.

class PersistentExchange(Persistent, Exchange):

    "An exchange which manages persistent communications."

    pass

class PersistentQueue(Persistent, Queue):

    "A queue which manages persistent communications."

    pass

# Convenience functions.

def BackgroundQueue(address):

    """
    Connect to a process reachable via the given 'address', making the results
    of which accessible via a queue.
    """

    queue = PersistentQueue(limit=1)
    queue.connect(address)
    return queue

def pmap(callable, sequence, limit=None):

    """
    A parallel version of the built-in map function with an optional process
    'limit'. The given 'callable' should not be parallel-aware (that is, have a
    'channel' parameter) since it will be wrapped for parallel communications
    before being invoked.

    Return the processed 'sequence' where each element in the sequence is
    processed by a different process.
    """

    mymap = Map(limit=limit)
    return mymap(callable, sequence)

# Utility functions.

_cpuinfo_fields = "processor", "physical id", "core id"

def _count_cpuinfo_processors(lines):

    """
    Return the number of distinct processor identifiers found in the given
    'lines', which are expected to employ the /proc/cpuinfo format.

    A ValueError or IndexError is raised where a recognised field does not have
    the form "name : integer".
    """

    processors = set()

    # Use the _cpuinfo_field values as "digits" in a larger unique
    # core identifier.

    processor = [None, None, None]

    for line in lines:
        for i, field in enumerate(_cpuinfo_fields):

            # Where the field is found, insert the value into the
            # appropriate location in the processor identifier.

            if line.startswith(field):
                t = line.split(":")
                processor[i] = int(t[1].strip())
                break

        # Where a new processor description is started, record the
        # identifier.
        # NOTE(review): the processor number is part of each identifier, so
        # entries with distinct processor numbers are all counted separately -
        # confirm whether this is the intended notion of a core.

        if line.startswith("processor") and processor[0] is not None:
            processors.add(tuple(processor))
            processor = [None, None, None]

    # At the end of reading the lines, add any unrecorded processors.

    if processor[0] is not None:
        processors.add(tuple(processor))

    return len(processors)

def _get_number_of_cores():

    """
    Return the number of distinct, genuine processor cores. If the platform is
    not supported by this function, None is returned.
    """

    try:
        f = open("/proc/cpuinfo")
        try:
            return _count_cpuinfo_processors(f)
        finally:
            f.close()

    # A missing or unreadable file is reported using IOError by older Python
    # versions and OSError by newer ones.

    except (IOError, OSError):
        return None

    # An unfamiliar file layout also means that the platform is unsupported.

    except (IndexError, ValueError):
        return None

def _get_number_of_cores_solaris():

    """
    Return the number of cores for OpenSolaris 2008.05 and possibly other
    editions of Solaris.
    """

    f = os.popen("psrinfo -p")
    try:
        return int(f.read().strip())
    finally:
        f.close()

# Low-level functions.

def create_socketpair():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process.
    """

    parent, child = socket.socketpair()
    for s in [parent, child]:
        s.setblocking(1)

    pid = os.fork()
    if pid == 0:
        parent.close()
        return Channel(pid, child.makefile("r", 0), child.makefile("w", 0))
    else:
        child.close()
        return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))

def create_pipes():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process.

    This function uses pipes instead of a socket pair, since some platforms
    seem to have problems with poll and such socket pairs.
    """

    pr, cw = os.pipe()
    cr, pw = os.pipe()

    pid = os.fork()
    if pid == 0:
        os.close(pr)
        os.close(pw)
        return Channel(pid, os.fdopen(cr, "r", 0), os.fdopen(cw, "w", 0))
    else:
        os.close(cr)
        os.close(cw)
        return Channel(pid, os.fdopen(pr, "r", 0), os.fdopen(pw, "w", 0))

# Select the platform-specific implementations.

if platform.system() == "SunOS":
    create = create_pipes
    get_number_of_cores = _get_number_of_cores_solaris
else:
    create = create_socketpair
    get_number_of_cores = _get_number_of_cores

def create_persistent(address):

    """
    Create a new process, returning a persistent communications channel between
    the creating process and the created process. This channel can be
    disconnected from the creating process and connected to another process, and
    thus can be used to collect results from daemon processes.

    In order to be able to reconnect to created processes, the 'address' of the
    communications endpoint for the created process needs to be provided. This
    should be a filename.
    """

    parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    child = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    child.bind(address)

    for s in [parent, child]:
        s.setblocking(1)

    pid = os.fork()
    if pid == 0:
        parent.close()
        return PersistentChannel(pid, child, address)
    else:
        child.close()
        #parent.connect(address)
        return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))

def connect_persistent(address):

    """
    Connect via a persistent channel to an existing created process, reachable
    at the given 'address'.
    """

    parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    parent.setblocking(1)
    parent.connect(address)
    return Channel(0, parent.makefile("r", 0), parent.makefile("w", 0))

def exit(channel):

    """
    Terminate a created process, closing the given 'channel'.
    """

    channel.close()
    os._exit(0)

def start(callable, *args, **kw):

    """
    Create a new process which shall start running in the given 'callable'.
    Additional arguments to the 'callable' can be given as additional arguments
    to this function.

    Return a communications channel to the creating process. For the created
    process, supply a channel as the 'channel' parameter in the given 'callable'
    so that it may send data back to the creating process.
    """

    channel = create()
    if channel.pid == 0:
        try:
            try:
                callable(channel, *args, **kw)

            # Any exception raised in the created process is sent to the
            # creating process instead of being handled here.

            except:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                channel.send(exc_value)
        finally:
            exit(channel)
    else:
        return channel

def start_persistent(address, callable, *args, **kw):

    """
    Create a new process which shall be reachable using the given 'address' and
    which will start running in the given 'callable'. Additional arguments to
    the 'callable' can be given as additional arguments to this function.

    Return a communications channel to the creating process. For the created
    process, supply a channel as the 'channel' parameter in the given 'callable'
    so that it may send data back to the creating process.

    Note that the created process employs a channel which is persistent: it can
    withstand disconnection from the creating process and subsequent connections
    from other processes.
    """

    channel = create_persistent(address)
    if channel.pid == 0:
        close_streams()
        try:
            try:
                callable(channel, *args, **kw)

            # Any exception raised in the created process is sent through the
            # channel instead of being handled here.

            except:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                channel.send(exc_value)
        finally:
            exit(channel)
    else:
        return channel

def close_streams():

    """
    Close streams which keep the current process attached to any creating
    processes.
    """

    os.close(sys.stdin.fileno())
    os.close(sys.stdout.fileno())
    os.close(sys.stderr.fileno())

def waitall():

    "Wait for all created processes to terminate."

    # os.wait raises OSError once there are no more processes to wait for.

    try:
        while 1:
            os.wait()
    except OSError:
        pass

# vim: tabstop=4 expandtab shiftwidth=4