#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Copyright (C) 2005, 2006, 2007, 2008, 2009 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the Free
Software Foundation; either version 3 of the License, or (at your option) any
later version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
details.

You should have received a copy of the GNU Lesser General Public License along
with this program.  If not, see <http://www.gnu.org/licenses/>.
"""

__version__ = "0.5"

import os
import sys
import select
import socket
import platform

try:
    import cPickle as pickle
except ImportError:
    import pickle

try:
    set
except NameError:
    from sets import Set as set

# Special values.

class Undefined:

    "A placeholder marking a result which has not yet been received."

    pass

# Communications.

class AcknowledgementError(Exception):

    "Raised when something other than an acknowledgement follows a send."

    pass

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe

    def __del__(self):

        # Since signals don't work well with I/O, we close pipes and wait for
        # created processes upon finalisation.

        self.close()

    def close(self):

        "Explicitly close the channel."

        if self.read_pipe is not None:
            self.read_pipe.close()
            self.read_pipe = None
        if self.write_pipe is not None:
            self.write_pipe.close()
            self.write_pipe = None

        # The created process is not waited for here: see the 'wait' method.

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit. The given 'options' are
        passed to os.waitpid. Errors from waiting are ignored.
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)

        Raise AcknowledgementError if the reply is not an acknowledgement.
        """

        self._send(obj)
        if self._receive() != "OK":
            raise AcknowledgementError(obj)

    def _receive(self):

        """
        Receive an object through the channel, returning the object. Where the
        received object is an exception instance, raise it instead.
        """

        # NOTE: Unpickling is only safe where the peer process is trusted.

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        else:
            return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)
        """

        # The acknowledgement is sent even where a received exception is
        # raised, so that the sender is always released.

        try:
            obj = self._receive()
            return obj
        finally:
            self._send("OK")

class PersistentChannel(Channel):

    """
    A persistent communications channel which can handle peer disconnection,
    acting as a server, meaning that this channel is associated with a specific
    address which can be contacted by other processes.
    """

    def __init__(self, pid, endpoint, address):

        """
        Initialise the channel with a process identifier 'pid', a bound socket
        'endpoint' on which connections will be accepted, and the 'address'
        (a filename) to which the socket is bound.
        """

        Channel.__init__(self, pid, None, None)
        self.endpoint = endpoint
        self.address = address
        self.poller = select.poll()

        # Listen for connections before this process is interested in
        # communicating. It is not desirable to wait for connections at this
        # point because this will block the process.

        self.endpoint.listen(1)

    def close(self):

        "Close the persistent channel and remove the socket file."

        Channel.close(self)
        try:
            os.unlink(self.address)
        except OSError:
            pass

    def _ensure_pipes(self):

        "Ensure that the channel is capable of communicating."

        if self.read_pipe is None or self.write_pipe is None:

            # Accept any incoming connections.

            endpoint, address = self.endpoint.accept()

            # Binary, unbuffered streams are used for the pickled data.

            self.read_pipe = endpoint.makefile("rb", 0)
            self.write_pipe = endpoint.makefile("wb", 0)

            # Monitor the write pipe for error conditions.

            fileno = self.write_pipe.fileno()
            self.poller.register(fileno, select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def _reset_pipes(self):

        "Discard the existing connection."

        fileno = self.write_pipe.fileno()
        self.poller.unregister(fileno)

        # Close the stale pipes explicitly instead of relying on garbage
        # collection. The connection is already broken, so errors are ignored.

        for pipe in (self.read_pipe, self.write_pipe):
            if pipe is not None:
                try:
                    pipe.close()
                except (IOError, OSError):
                    pass

        self.read_pipe = None
        self.write_pipe = None
        self.endpoint.listen(1)

    def _ensure_communication(self, timeout=None):

        """
        Ensure that sending and receiving are possible, accepting a new
        connection whenever the current one is found to be broken. The optional
        'timeout' is passed to the poll operation.
        """

        while 1:
            self._ensure_pipes()
            fileno = self.write_pipe.fileno()
            fds = self.poller.poll(timeout)
            for fd, status in fds:
                if fd != fileno:
                    continue
                if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):

                    # Broken connection: discard it and start all over again.

                    self._reset_pipes()
                    break
            else:
                return

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        self._ensure_communication()
        Channel._send(self, obj)

    def _receive(self):

        "Receive an object through the channel, returning the object."

        self._ensure_communication()
        return Channel._receive(self)

# Management of processes and communications.

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate. Subclasses of this class can define the 'store_data'
    method in order to enable the 'add_wait', 'wait' and 'finish' methods.

    Once exchanges are populated with active channels, use of the principal
    methods of the exchange typically cause the 'store' method to be invoked,
    resulting in the processing of any incoming data.
    """

    def __init__(self, channels=None, limit=None, reuse=0, continuous=0, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'.

        If the optional 'limit' is specified, restrictions on the addition of
        new channels can be enforced and observed through the 'add_wait', 'wait'
        and 'finish' methods. To make use of these methods, create a subclass of
        this class and define a working 'store_data' method.

        If the optional 'reuse' parameter is set to a true value, channels and
        processes will be reused for waiting computations, but the callable will
        be invoked for each computation.

        If the optional 'continuous' parameter is set to a true value, channels
        and processes will be retained after receiving data sent from such
        processes, since it will be assumed that they will communicate more
        data.

        If the optional 'autoclose' parameter is set to a false value, channels
        will not be closed automatically when they are removed from the exchange
        - by default they are closed when removed.
        """

        self.limit = limit
        self.reuse = reuse
        self.autoclose = autoclose
        self.continuous = continuous

        self.waiting = []
        self.readables = {}
        self.removed = []
        self.poller = select.poll()

        for channel in channels or []:
            self.add(channel)

    # Core methods, registering and reporting on channels.

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        fileno = channel.read_pipe.fileno()
        self.readables[fileno] = channel
        self.poller.register(fileno, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        # A real list is returned, as documented, even where the dictionary
        # provides only a view of its values.

        return list(self.readables.values())

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' in
        milliseconds (or until communication is possible) and return a list of
        channels which are ready to be read from.

        Channels found to have ended or failed are removed from the exchange
        and recorded in the 'removed' attribute.
        """

        fds = self.poller.poll(timeout)
        readables = []
        self.removed = []

        for fd, status in fds:
            channel = self.readables[fd]
            removed = 0

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)
                self.removed.append(channel)
                removed = 1

            # Record readable channels, except those which have just been
            # closed as a consequence of their removal.

            if status & select.POLLIN:
                if not (removed and self.autoclose):
                    readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange, closing it and waiting
        for its process where 'autoclose' is enabled.
        """

        fileno = channel.read_pipe.fileno()
        del self.readables[fileno]
        self.poller.unregister(fileno)
        if self.autoclose:
            channel.close()
            channel.wait()

    # Enhanced exchange methods involving channel limits.

    def unfinished(self):

        "Return whether the exchange still has work scheduled or in progress."

        return self.active() or self.waiting

    def busy(self):

        "Return whether the exchange uses as many channels as it is allowed to."

        return self.limit is not None and len(self.active()) >= self.limit

    def add_wait(self, channel):

        """
        Add the given 'channel' to the exchange, waiting if the limit on active
        channels would be exceeded by adding the channel.
        """

        self.wait()
        self.add(channel)

    def wait(self):

        """
        Test for the limit on channels, blocking and reading incoming data until
        the number of channels is below the limit.
        """

        # If limited, block until channels have been closed.

        while self.busy():
            self.store()

    def finish(self):

        """
        Finish the use of the exchange by waiting for all channels to complete.
        """

        while self.unfinished():
            self.store()

    def store(self, timeout=None):

        """
        For each ready channel, process the incoming data. If the optional
        'timeout' parameter (a duration in milliseconds) is specified, wait only
        for the specified duration if no channels are ready to provide data.
        """

        # Either process input from active channels.

        if self.active():
            for channel in self.ready(timeout):
                self.store_data(channel)
                self.start_waiting(channel)

        # Or schedule new processes and channels.

        else:
            while self.waiting and not self.busy():
                callable, args, kw = self.waiting.pop()
                self.start(callable, *args, **kw)

    def store_data(self, channel):

        """
        Store incoming data from the specified 'channel'. In subclasses of this
        class, such data could be stored using instance attributes.
        """

        raise NotImplementedError("store_data")

    # Support for the convenience methods.

    def _get_waiting(self, channel):

        """
        Get waiting callable and argument information for new processes, given
        the reception of data on the given 'channel'. Return a tuple of the
        form (callable, args, kw) where a new process should be started, or
        None otherwise.
        """

        # For continuous channels, no scheduling is requested.

        if self.waiting and not self.continuous:

            # Schedule this callable and arguments.

            callable, args, kw = self.waiting.pop()

            # Try and reuse existing channels if possible.

            if self.reuse:

                # Re-add the channel - this may update information related to
                # the channel in subclasses.

                self.add(channel)
                channel.send((args, kw))

            else:
                return callable, args, kw

        # Where channels are being reused, but where no processes are waiting
        # any more, send a special value to tell them to quit.

        elif self.reuse:
            channel.send(None)

        return None

    def _set_waiting(self, callable, args, kw):

        """
        Support process creation by returning whether the given 'callable' has
        been queued for later invocation.
        """

        if self.busy():
            self.waiting.insert(0, (callable, args, kw))
            return 1
        else:
            return 0

    def _get_channel_for_process(self, channel):

        """
        Support process creation by returning the given 'channel' to the
        created process, and None to the creating process (having added the
        channel to this exchange).
        """

        if channel.pid == 0:
            return channel
        else:
            self.add_wait(channel)
            return None

    # Methods for overriding, related to the convenience methods.

    def start_waiting(self, channel):

        """
        Start a waiting process given the reception of data on the given
        'channel'.
        """

        details = self._get_waiting(channel)
        if details is not None:
            callable, args, kw = details
            self.add(start(callable, *args, **kw))

    # Convenience methods.

    def start(self, callable, *args, **kw):

        """
        Create a new process for the given 'callable' using any additional
        arguments provided. Then, monitor the channel created between this
        process and the created process.

        Where the exchange is busy, the request is queued instead.
        """

        if self._set_waiting(callable, args, kw):
            return

        self.add_wait(start(callable, *args, **kw))

    def create(self):

        """
        Create a new process and return the created communications channel to
        the created process. In the creating process, return None - the channel
        receiving data from the created process will be automatically managed by
        this exchange.
        """

        channel = create()
        return self._get_channel_for_process(channel)

    def manage(self, callable):

        """
        Wrap the given 'callable' in an object which can then be called in the
        same way as 'callable', but with new processes and communications
        managed automatically.
        """

        return ManagedCallable(callable, self)

class Persistent:

    """
    A mix-in class providing methods to exchanges for the management of
    persistent communications.
    """

    def start_waiting(self, channel):

        """
        Start a waiting process given the reception of data on the given
        'channel'.
        """

        # NOTE(review): This requires the channel to provide an 'address'
        # attribute - confirm that the channels held by the exchange do so.

        details = self._get_waiting(channel)
        if details is not None:
            callable, args, kw = details
            self.add(start_persistent(channel.address, callable, *args, **kw))

    def start(self, address, callable, *args, **kw):

        """
        Create a new process, located at the given 'address', for the given
        'callable' using any additional arguments provided.

        Where the exchange is busy, the request is queued instead. Otherwise,
        the process is started, but the channel returned upon its creation is
        discarded and not added to this exchange: use the 'connect' method to
        monitor the created process.
        """

        # NOTE(review): The 'address' is not queued along with the callable -
        # confirm how queued requests are meant to be started later.

        if self._set_waiting(callable, args, kw):
            return

        start_persistent(address, callable, *args, **kw)

    def create(self, address):

        """
        Create a new process, located at the given 'address', and return the
        created communications channel to the created process. In the creating
        process, return None - the channel receiving data from the created
        process will be automatically managed by this exchange.
        """

        channel = create_persistent(address)
        return self._get_channel_for_process(channel)

    def manage(self, address, callable):

        """
        Using the given 'address', publish the given 'callable' in an object
        which can then be called in the same way as 'callable', but with new
        processes and communications managed automatically.
        """

        return PersistentCallable(address, callable, self)

    def connect(self, address):

        "Connect to a process which is contactable via the given 'address'."

        channel = connect_persistent(address)
        self.add_wait(channel)

class ManagedCallable:

    "A callable managed by an exchange."

    def __init__(self, callable, exchange):

        """
        Wrap the given 'callable', using the given 'exchange' to monitor the
        channels created for communications between this and the created
        processes. Note that the 'callable' must be parallel-aware (that is,
        have a 'channel' parameter). Use the MakeParallel class to wrap other
        kinds of callable objects.
        """

        self.callable = callable
        self.exchange = exchange

    def __call__(self, *args, **kw):

        "Invoke the callable with the supplied arguments."

        self.exchange.start(self.callable, *args, **kw)

class PersistentCallable:

    "A callable which sets up a persistent communications channel."

    def __init__(self, address, callable, exchange):

        """
        Using the given 'address', wrap the given 'callable', using the given
        'exchange' to monitor the channels created for communications between
        this and the created processes, so that when it is called, a background
        process is started within which the 'callable' will run. Note that the
        'callable' must be parallel-aware (that is, have a 'channel' parameter).
        Use the MakeParallel class to wrap other kinds of callable objects.
        """

        self.callable = callable
        self.exchange = exchange
        self.address = address

    def __call__(self, *args, **kw):

        "Invoke the callable with the supplied arguments."

        self.exchange.start(self.address, self.callable, *args, **kw)

class BackgroundCallable:

    """
    A callable which sets up a persistent communications channel, but is
    unmanaged by an exchange.
    """

    def __init__(self, address, callable):

        """
        Using the given 'address', wrap the given 'callable'. This object can
        then be invoked, but the wrapped callable will be run in a background
        process. Note that the 'callable' must be parallel-aware (that is, have
        a 'channel' parameter). Use the MakeParallel class to wrap other kinds
        of callable objects.
        """

        self.callable = callable
        self.address = address

    def __call__(self, *args, **kw):

        "Invoke the callable with the supplied arguments."

        start_persistent(self.address, self.callable, *args, **kw)

# Abstractions and utilities.

class Map(Exchange):

    "An exchange which can be used like the built-in 'map' function."

    def __init__(self, *args, **kw):
        Exchange.__init__(self, *args, **kw)
        self.init()

    def init(self):

        "Remember the channel addition order to order output."

        self.channel_number = 0
        self.channels = {}
        self.results = []
        self.current_index = 0

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        Exchange.add(self, channel)
        self.channels[channel] = self.channel_number
        self.channel_number += 1

    def start(self, callable, *args, **kw):

        """
        Create a new process for the given 'callable' using any additional
        arguments provided. Then, monitor the channel created between this
        process and the created process.
        """

        self.results.append(Undefined) # placeholder
        Exchange.start(self, callable, *args, **kw)

    def create(self):

        """
        Create a new process and return the created communications channel to
        the created process. In the creating process, return None - the channel
        receiving data from the created process will be automatically managed by
        this exchange.
        """

        self.results.append(Undefined) # placeholder
        return Exchange.create(self)

    def __call__(self, callable, sequence):

        "Wrap and invoke 'callable' for each element in the 'sequence'."

        if not isinstance(callable, MakeParallel):
            wrapped = MakeParallel(callable)
        else:
            wrapped = callable

        self.init()

        # Start processes for each element in the sequence.

        for i in sequence:
            self.start(wrapped, i)

        # Access to the results occurs through this object.

        return self

    def store_data(self, channel):

        "Accumulate the incoming data, associating results with channels."

        data = channel.receive()
        self.results[self.channels[channel]] = data
        del self.channels[channel]

    def __iter__(self):
        return self

    def next(self):

        """
        Return the next element in the map, processing incoming data until
        that element is available. Raise StopIteration when no more elements
        can be provided.
        """

        try:
            return self._next()
        except IndexError:
            pass

        while self.unfinished():
            self.store()
            try:
                return self._next()
            except IndexError:
                pass
        else:
            raise StopIteration

    # Support the iterator protocol regardless of the name it requires.

    __next__ = next

    def __getitem__(self, i):

        """
        Return element 'i' from the map, processing incoming data until that
        element is available. Raise IndexError where it cannot be provided.
        """

        try:
            return self._get(i)
        except IndexError:
            pass

        while self.unfinished():
            self.store()
            try:
                return self._get(i)
            except IndexError:
                pass
        else:
            raise IndexError(i)

    # Helper methods for the above access methods.

    def _next(self):

        "Return the element at the current position, advancing the position."

        result = self._get(self.current_index)
        self.current_index += 1
        return result

    def _get(self, i):

        """
        Return the result (or slice of results) at 'i', raising IndexError
        where any result involved has not yet been received.
        """

        result = self.results[i]
        if result is Undefined or isinstance(i, slice) and Undefined in result:
            raise IndexError(i)
        return result

class Queue(Exchange):

    """
    An exchange acting as a queue, making data from created processes available
    in the order in which it is received.
    """

    def __init__(self, *args, **kw):
        Exchange.__init__(self, *args, **kw)
        self.queue = []

    def store_data(self, channel):

        "Accumulate the incoming data in the order of its reception."

        # New data is inserted at the front; the oldest data is taken from
        # the end.

        data = channel.receive()
        self.queue.insert(0, data)

    def __iter__(self):
        return self

    def next(self):

        """
        Return the next element in the queue, processing incoming data until
        an element is available. Raise StopIteration when no more elements can
        be provided.
        """

        if self.queue:
            return self.queue.pop()

        while self.unfinished():
            self.store()
            if self.queue:
                return self.queue.pop()
        else:
            raise StopIteration

    # Support the iterator protocol regardless of the name it requires.

    __next__ = next

    def __len__(self):

        "Return the current length of the queue."

        return len(self.queue)

class MakeParallel:

    "A wrapper around functions making them able to communicate results."

    def __init__(self, callable):

        """
        Initialise the wrapper with the given 'callable'. This object will then
        be able to accept a 'channel' parameter when invoked, and to forward the
        result of the given 'callable' via the channel provided back to the
        invoking process.
        """

        self.callable = callable

    def __call__(self, channel, *args, **kw):

        "Invoke the callable and return its result via the given 'channel'."

        channel.send(self.callable(*args, **kw))

class MakeReusable(MakeParallel):

    """
    A wrapper around functions making them able to communicate results in a
    reusable fashion.
    """

    def __call__(self, channel, *args, **kw):

        """
        Invoke the callable and return its result via the given 'channel'.
        Then, invoke the callable again for each (args, kw) tuple received via
        the channel, stopping when None is received.
        """

        channel.send(self.callable(*args, **kw))
        t = channel.receive()
        while t is not None:
            args, kw = t
            channel.send(self.callable(*args, **kw))
            t = channel.receive()

# Persistent variants.

class PersistentExchange(Persistent, Exchange):

    "An exchange which manages persistent communications."

    pass

class PersistentQueue(Persistent, Queue):

    "A queue which manages persistent communications."

    pass

# Convenience functions.

def BackgroundQueue(address):

    """
    Connect to a process reachable via the given 'address', making the results
    of which accessible via a queue.
    """

    queue = PersistentQueue(limit=1)
    queue.connect(address)
    return queue

def pmap(callable, sequence, limit=None):

    """
    A parallel version of the built-in map function with an optional process
    'limit'. The given 'callable' should not be parallel-aware (that is, have a
    'channel' parameter) since it will be wrapped for parallel communications
    before being invoked.

    Return the processed 'sequence' where each element in the sequence is
    processed by a different process.
    """

    mymap = Map(limit=limit)
    return mymap(callable, sequence)

# Utility functions.

_cpuinfo_fields = "processor", "physical id", "core id"

def _parse_cpuinfo_value(line):

    """
    Return the integer value following the colon in the given 'line', or None
    if there is no such value.
    """

    try:
        return int(line.split(":", 1)[1].strip())
    except (IndexError, ValueError):
        return None

def _count_cpuinfo_processors(lines):

    """
    Return the number of distinct processor identifiers described by the given
    'lines', these being of the form found in /proc/cpuinfo.
    """

    processors = set()

    # Use the _cpuinfo_field values as "digits" in a larger unique
    # core identifier.

    processor = [None, None, None]

    for line in lines:
        for i, field in enumerate(_cpuinfo_fields):
            if not line.startswith(field):
                continue

            value = _parse_cpuinfo_value(line)
            if value is not None:

                # Where a new processor description is started, record the
                # identifier of the description just completed.

                if i == 0 and processor[0] is not None:
                    processors.add(tuple(processor))
                    processor = [None, None, None]

                # Insert the value into the appropriate location in the
                # processor identifier.

                processor[i] = value

            break

    # At the end of reading the lines, add any unrecorded processors.

    if processor[0] is not None:
        processors.add(tuple(processor))

    return len(processors)

def _get_number_of_cores(filename="/proc/cpuinfo"):

    """
    Return the number of distinct, genuine processor cores. If the platform is
    not supported by this function, None is returned.

    The optional 'filename' indicates the file providing the processor details.
    """

    # NOTE(review): The "processor" value is part of each identifier, so every
    # processor entry is counted separately - confirm whether entries sharing
    # physical and core identifiers were meant to be counted only once.

    # Both exception types are caught because the type raised for a missing
    # file depends on the Python version.

    try:
        f = open(filename)
    except (IOError, OSError):
        return None

    try:
        try:
            return _count_cpuinfo_processors(f)
        except (IOError, OSError):
            return None
    finally:
        f.close()

def _get_number_of_cores_solaris():

    """
    Return the number of cores for OpenSolaris 2008.05 and possibly other
    editions of Solaris.
    """

    f = os.popen("psrinfo -p")
    try:
        return int(f.read().strip())
    finally:
        f.close()

# Low-level functions.

def create_socketpair():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process.
    """

    parent, child = socket.socketpair()
    for s in [parent, child]:
        s.setblocking(1)

    # Binary, unbuffered streams are used for the pickled data.

    pid = os.fork()
    if pid == 0:
        parent.close()
        return Channel(pid, child.makefile("rb", 0), child.makefile("wb", 0))
    else:
        child.close()
        return Channel(pid, parent.makefile("rb", 0), parent.makefile("wb", 0))

def create_pipes():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process.

    This function uses pipes instead of a socket pair, since some platforms
    seem to have problems with poll and such socket pairs.
    """

    pr, cw = os.pipe()
    cr, pw = os.pipe()

    # Binary, unbuffered streams are used for the pickled data.

    pid = os.fork()
    if pid == 0:
        os.close(pr)
        os.close(pw)
        return Channel(pid, os.fdopen(cr, "rb", 0), os.fdopen(cw, "wb", 0))
    else:
        os.close(cr)
        os.close(cw)
        return Channel(pid, os.fdopen(pr, "rb", 0), os.fdopen(pw, "wb", 0))

if platform.system() == "SunOS":
    create = create_pipes
    get_number_of_cores = _get_number_of_cores_solaris
else:
    create = create_socketpair
    get_number_of_cores = _get_number_of_cores

def create_persistent(address):

    """
    Create a new process, returning a persistent communications channel between
    the creating process and the created process. This channel can be
    disconnected from the creating process and connected to another process, and
    thus can be used to collect results from daemon processes.

    In order to be able to reconnect to created processes, the 'address' of the
    communications endpoint for the created process needs to be provided. This
    should be a filename.
    """

    parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    child = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    child.bind(address)

    for s in [parent, child]:
        s.setblocking(1)

    pid = os.fork()
    if pid == 0:
        parent.close()
        return PersistentChannel(pid, child, address)
    else:
        child.close()

        # The socket employed by the creating process is not connected here:
        # see the 'connect_persistent' function.

        #parent.connect(address)
        return Channel(pid, parent.makefile("rb", 0), parent.makefile("wb", 0))

def connect_persistent(address):

    """
    Connect via a persistent channel to an existing created process, reachable
    at the given 'address'.
    """

    parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    parent.setblocking(1)
    parent.connect(address)
    return Channel(0, parent.makefile("rb", 0), parent.makefile("wb", 0))

def exit(channel):

    """
    Terminate a created process, closing the given 'channel'.
    """

    channel.close()
    os._exit(0)

def start(callable, *args, **kw):

    """
    Create a new process which shall start running in the given 'callable'.
    Additional arguments to the 'callable' can be given as additional arguments
    to this function.

    Return a communications channel to the creating process. For the created
    process, supply a channel as the 'channel' parameter in the given 'callable'
    so that it may send data back to the creating process.
    """

    channel = create()
    if channel.pid == 0:
        try:

            # Everything raised in the created process is caught so that it
            # can be sent to the creating process; the created process always
            # terminates afterwards.

            try:
                callable(channel, *args, **kw)
            except:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                channel.send(exc_value)
        finally:
            exit(channel)
    else:
        return channel

def start_persistent(address, callable, *args, **kw):

    """
    Create a new process which shall be reachable using the given 'address' and
    which will start running in the given 'callable'. Additional arguments to
    the 'callable' can be given as additional arguments to this function.

    Return a communications channel to the creating process. For the created
    process, supply a channel as the 'channel' parameter in the given 'callable'
    so that it may send data back to the creating process.

    Note that the created process employs a channel which is persistent: it can
    withstand disconnection from the creating process and subsequent connections
    from other processes.
    """

    channel = create_persistent(address)
    if channel.pid == 0:
        close_streams()
        try:

            # Everything raised in the created process is caught so that it
            # can be sent to a connecting process; the created process always
            # terminates afterwards.

            try:
                callable(channel, *args, **kw)
            except:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                channel.send(exc_value)
        finally:
            exit(channel)
    else:
        return channel

def close_streams():

    """
    Close streams which keep the current process attached to any creating
    processes.
    """

    os.close(sys.stdin.fileno())
    os.close(sys.stdout.fileno())
    os.close(sys.stderr.fileno())

def waitall():

    "Wait for all created processes to terminate."

    try:
        while 1:
            os.wait()
    except OSError:
        pass

# vim: tabstop=4 expandtab shiftwidth=4