#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Copyright (C) 2005, 2006, 2007, 2008, 2009, 2013, 2014,
              2016 Paul Boddie <paul@boddie.org.uk>
Copyright (C) 2013 Yaroslav Halchenko <debian@onerussian.com>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the Free
Software Foundation; either version 3 of the License, or (at your option) any
later version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
details.

You should have received a copy of the GNU Lesser General Public License along
with this program.  If not, see <http://www.gnu.org/licenses/>.
"""

__version__ = "0.5.1"

import os
import sys
import select
import socket
import platform
import errno

from time import sleep
from warnings import warn

try:
    import cPickle as pickle
except ImportError:
    import pickle

try:
    set
except NameError:
    from sets import Set as set

# NOTE: exceptions are raised using the call form and caught without the
# "except X, e" syntax (sys.exc_info is used instead), so that this module
# parses with old and new Python versions alike.

# Special values.

class Undefined:

    "Placeholder for results not yet received (see Map)."

    pass

# Communications.

class AcknowledgementError(Exception):

    "Raised when a sent object is not acknowledged with \"OK\" by the peer."

    pass

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe

    def __del__(self):

        # Since signals don't work well with I/O, we close pipes upon
        # finalisation. Waiting for the created process is left to 'wait',
        # which Exchange.remove calls after closing a channel.

        self.close()

    def close(self):

        """
        Explicitly close the channel. Closing is idempotent: pipes already
        closed are skipped. This method does not wait for the created process.
        """

        if self.read_pipe is not None:
            self.read_pipe.close()
            self.read_pipe = None
        if self.write_pipe is not None:
            self.write_pipe.close()
            self.write_pipe = None

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit. The 'options' are
        passed to os.waitpid. In a created process (pid 0) nothing is done,
        and errors from os.waitpid are ignored.
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)

        Raise AcknowledgementError, carrying 'obj', if the reply is not "OK".
        """

        self._send(obj)
        if self._receive() != "OK":
            raise AcknowledgementError(obj)

    def _receive(self):

        """
        Receive an object through the channel, returning the object. If the
        object is an exception instance, it is raised instead.
        """

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        else:
            return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)

        The acknowledgement is sent even when the received object is an
        exception, which is then raised in this process.
        """

        try:
            obj = self._receive()
            return obj
        finally:
            self._send("OK")

class PersistentChannel(Channel):

    """
    A persistent communications channel which can handle peer disconnection,
    acting as a server, meaning that this channel is associated with a specific
    address which can be contacted by other processes.
    """

    def __init__(self, pid, endpoint, address):

        """
        Initialise the channel with process identifier 'pid', the listening
        socket 'endpoint' and its socket file 'address'. Pipes are only
        created once a connection is accepted.
        """

        Channel.__init__(self, pid, None, None)
        self.endpoint = endpoint
        self.address = address
        self.poller = select.poll()

        # Listen for connections before this process is interested in
        # communicating. It is not desirable to wait for connections at this
        # point because this will block the process.

        self.endpoint.listen(1)

    def close(self):

        "Close the persistent channel and remove the socket file."

        Channel.close(self)
        try:
            os.unlink(self.address)
        except OSError:
            pass

    def _ensure_pipes(self):

        "Ensure that the channel is capable of communicating."

        if self.read_pipe is None or self.write_pipe is None:

            # Accept any incoming connections (this blocks until a peer
            # connects).

            endpoint, address = self.endpoint.accept()
            self.read_pipe = endpoint.makefile("r", 0)
            self.write_pipe = endpoint.makefile("w", 0)

            # Monitor the write pipe for error conditions.

            fileno = self.write_pipe.fileno()
            self.poller.register(fileno, select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def _reset_pipes(self):

        "Discard the existing connection."

        fileno = self.write_pipe.fileno()
        self.poller.unregister(fileno)
        self.read_pipe = None
        self.write_pipe = None
        self.endpoint.listen(1)

    def _ensure_communication(self, timeout=None):

        """
        Ensure that sending and receiving are possible, discarding broken
        connections and accepting new ones until the connection is usable.
        """

        while 1:
            self._ensure_pipes()
            fileno = self.write_pipe.fileno()
            fds = self.poller.poll(timeout)
            for fd, status in fds:
                if fd != fileno:
                    continue
                if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):

                    # Broken connection: discard it and start all over again.

                    self._reset_pipes()
                    break
                else:
                    return

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        self._ensure_communication()
        Channel._send(self, obj)

    def _receive(self):

        "Receive an object through the channel, returning the object."

        self._ensure_communication()
        return Channel._receive(self)

# Management of processes and communications.

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate. Subclasses of this class can define the 'store_data'
    method in order to enable the 'add_wait', 'wait' and 'finish' methods.

    Once exchanges are populated with active channels, use of the principal
    methods of the exchange typically causes the 'store' method to be invoked,
    resulting in the processing of any incoming data.
    """

    def __init__(self, channels=None, limit=None, reuse=0, continuous=0, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'.

        If the optional 'limit' is specified, restrictions on the addition of
        new channels can be enforced and observed through the 'add_wait', 'wait'
        and 'finish' methods. To make use of these methods, create a subclass of
        this class and define a working 'store_data' method.

        If the optional 'reuse' parameter is set to a true value, channels and
        processes will be reused for waiting computations, but the callable will
        be invoked for each computation.

        If the optional 'continuous' parameter is set to a true value, channels
        and processes will be retained after receiving data sent from such
        processes, since it will be assumed that they will communicate more
        data.

        If the optional 'autoclose' parameter is set to a false value, channels
        will not be closed automatically when they are removed from the exchange
        - by default they are closed when removed.
        """

        self.limit = limit
        self.reuse = reuse
        self.autoclose = autoclose
        self.continuous = continuous

        # Queued (callable, args, kw) tuples: new work is inserted at the
        # front and taken from the end, making this a FIFO queue.
        self.waiting = []
        # Maps read pipe file descriptors to their channels.
        self.readables = {}
        # Channels removed during the most recent call to 'ready'.
        self.removed = []
        self.poller = select.poll()

        for channel in channels or []:
            self.add(channel)

    # Core methods, registering and reporting on channels.

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        fileno = channel.read_pipe.fileno()
        self.readables[fileno] = channel
        self.poller.register(fileno, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        return self.readables.values()

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' in
        milliseconds (or until communication is possible) and return a list of
        channels which are ready to be read from.

        Channels reporting hang-up or error conditions are removed and recorded
        in the 'removed' attribute.
        """

        fds = self.poller.poll(timeout)
        readables = []
        self.removed = []

        for fd, status in fds:
            channel = self.readables[fd]
            removed = 0

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)
                self.removed.append(channel)
                removed = 1

            # Record readable channels. A removed channel is only reported
            # as readable if it was not closed by the removal.

            if status & select.POLLIN:
                if not (removed and self.autoclose):
                    readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange. When 'autoclose' is set,
        the channel is also closed and its created process waited for.
        """

        fileno = channel.read_pipe.fileno()
        del self.readables[fileno]
        self.poller.unregister(fileno)
        if self.autoclose:
            channel.close()
            channel.wait()

    # Enhanced exchange methods involving channel limits.

    def unfinished(self):

        "Return whether the exchange still has work scheduled or in progress."

        return self.active() or self.waiting

    def busy(self):

        "Return whether the exchange uses as many channels as it is allowed to."

        return self.limit is not None and len(self.active()) >= self.limit

    def add_wait(self, channel):

        """
        Add the given 'channel' to the exchange, waiting if the limit on active
        channels would be exceeded by adding the channel.
        """

        self.wait()
        self.add(channel)

    def wait(self):

        """
        Test for the limit on channels, blocking and reading incoming data until
        the number of channels is below the limit.
        """

        # If limited, block until channels have been closed.

        while self.busy():
            self.store()

    def finish(self):

        """
        Finish the use of the exchange by waiting for all channels to complete.
        """

        while self.unfinished():
            self.store()

    def store(self, timeout=None):

        """
        For each ready channel, process the incoming data. If the optional
        'timeout' parameter (a duration in milliseconds) is specified, wait only
        for the specified duration if no channels are ready to provide data.

        Channels whose data cannot be read because of an IOError or OSError
        are removed with a warning.
        """

        # Either process input from active channels.

        if self.active():
            for channel in self.ready(timeout):
                try:
                    self.store_data(channel)
                    self.start_waiting(channel)
                except (IOError, OSError):
                    exc = sys.exc_info()[1]
                    self.remove(channel)
                    warn("Removed channel %r due to exception: %s" % (channel, exc))

        # Or schedule new processes and channels.

        else:
            while self.waiting and not self.busy():
                details = self.waiting.pop()

                # Stop actively scheduling if resources are exhausted.

                if not self.start_new_waiting(details):
                    if not self.active():
                        sleep(1)
                    break

    def store_data(self, channel):

        """
        Store incoming data from the specified 'channel'. In subclasses of this
        class, such data could be stored using instance attributes.

        Raise NotImplementedError in this base class.
        """

        raise NotImplementedError("store_data")

    # Support for the convenience methods.

    def _get_waiting(self, channel):

        """
        Get waiting callable and argument information for new processes, given
        the reception of data on the given 'channel'. Return the details for a
        new process, or None if no new process is needed.
        """

        # For continuous channels, no scheduling is requested.

        if self.waiting and not self.continuous:

            # Schedule this callable and arguments.

            callable, args, kw = self.waiting.pop()

            # Try and reuse existing channels if possible.

            if self.reuse:

                # Re-add the channel - this may update information related to
                # the channel in subclasses.

                self.add(channel)
                channel.send((args, kw))

            # Return the details for a new channel.

            else:
                return callable, args, kw

        # Where channels are being reused, but where no processes are waiting
        # any more, send a special value to tell them to quit.

        elif self.reuse:
            channel.send(None)

        return None

    def _set_waiting(self, callable, args, kw):

        """
        Support process creation by returning whether the given 'callable' has
        been queued for later invocation (which happens when the exchange is
        busy).
        """

        if self.busy():
            self.waiting.insert(0, (callable, args, kw))
            return 1
        else:
            return 0

    def _get_channel_for_process(self, channel):

        """
        Support process creation by returning the given 'channel' to the
        created process, and None to the creating process (where the channel
        is added to the exchange instead).
        """

        if channel.pid == 0:
            return channel
        else:
            self.add_wait(channel)
            return None

    # Methods for overriding, related to the convenience methods.

    def start_waiting(self, channel):

        """
        Start a waiting process given the reception of data on the given
        'channel'.
        """

        details = self._get_waiting(channel)

        if details is not None:
            self.start_new_waiting(details)

    def start_new_waiting(self, details):

        """
        Start a waiting process with the given 'details', obtaining a new
        channel. Return whether the process was started; if not, the details
        are queued again to be taken next.
        """

        callable, args, kw = details
        channel = self._start(callable, *args, **kw)

        # Monitor any newly-created process.

        if channel:
            self.add(channel)
            return True

        # Push the details back onto the end of the waiting list.

        else:
            self.waiting.append(details)
            return False

    # Convenience methods.

    def start(self, callable, *args, **kw):

        """
        Create a new process for the given 'callable' using any additional
        arguments provided. Then, monitor the channel created between this
        process and the created process.

        Return True if a process was started, False if the work was queued.
        """

        if self._set_waiting(callable, args, kw):
            return False

        channel = self._start(callable, *args, **kw)

        # Monitor any newly-created process.

        if channel:
            self.add_wait(channel)
            return True

        # Otherwise, add the details to the waiting list unconditionally.

        else:
            self.waiting.insert(0, (callable, args, kw))
            return False

    def create(self):

        """
        Create a new process and return the created communications channel to
        the created process. In the creating process, return None - the channel
        receiving data from the created process will be automatically managed by
        this exchange.
        """

        channel = create()
        return self._get_channel_for_process(channel)

    def manage(self, callable):

        """
        Wrap the given 'callable' in an object which can then be called in the
        same way as 'callable', but with new processes and communications
        managed automatically.
        """

        return ManagedCallable(callable, self)

    def _start(self, callable, *args, **kw):

        """
        Create a new process for the given 'callable' using any additional
        arguments provided. Return any successfully created channel or None if
        no process could be created at the present time (EAGAIN from the
        operating system). Other OSErrors are re-raised.
        """

        try:
            return start(callable, *args, **kw)
        except OSError:
            exc = sys.exc_info()[1]
            if exc.errno != errno.EAGAIN:
                raise
            else:
                return None

class Persistent:

    """
    A mix-in class providing methods to exchanges for the management of
    persistent communications.
    """

    def start_waiting(self, channel):

        """
        Start a waiting process given the reception of data on the given
        'channel'.
        """

        details = self._get_waiting(channel)
        if details is not None:
            callable, args, kw = details

            # NOTE(review): channels added via 'connect' come from
            # connect_persistent, which returns a plain Channel without an
            # 'address' attribute, so this appears to raise AttributeError
            # whenever waiting work exists - confirm the intended design.

            self.add(start_persistent(channel.address, callable, *args, **kw))

    def start(self, address, callable, *args, **kw):

        """
        Create a new process, located at the given 'address', for the given
        'callable' using any additional arguments provided.

        If the exchange is busy, the callable and arguments are queued instead
        (the 'address' is not queued with them). The channel returned by
        start_persistent is not monitored by this exchange; use 'connect' with
        the same 'address' to communicate with the created process.
        """

        if self._set_waiting(callable, args, kw):
            return

        start_persistent(address, callable, *args, **kw)

    def create(self, address):

        """
        Create a new process, located at the given 'address', and return the
        created communications channel to the created process. In the creating
        process, return None - the channel receiving data from the created
        process will be automatically managed by this exchange.
        """

        channel = create_persistent(address)
        return self._get_channel_for_process(channel)

    def manage(self, address, callable):

        """
        Using the given 'address', publish the given 'callable' in an object
        which can then be called in the same way as 'callable', but with new
        processes and communications managed automatically.
        """

        return PersistentCallable(address, callable, self)

    def connect(self, address):

        "Connect to a process which is contactable via the given 'address'."

        channel = connect_persistent(address)
        self.add_wait(channel)

class ManagedCallable:

    "A callable managed by an exchange."

    def __init__(self, callable, exchange):

        """
        Wrap the given 'callable', using the given 'exchange' to monitor the
        channels created for communications between this and the created
        processes. Note that the 'callable' must be parallel-aware (that is,
        have a 'channel' parameter). Use the MakeParallel class to wrap other
        kinds of callable objects.
        """

        self.callable = callable
        self.exchange = exchange

    def __call__(self, *args, **kw):

        "Invoke the callable with the supplied arguments."

        self.exchange.start(self.callable, *args, **kw)

class PersistentCallable:

    "A callable which sets up a persistent communications channel."

    def __init__(self, address, callable, exchange):

        """
        Using the given 'address', wrap the given 'callable', using the given
        'exchange' to monitor the channels created for communications between
        this and the created processes, so that when it is called, a background
        process is started within which the 'callable' will run. Note that the
        'callable' must be parallel-aware (that is, have a 'channel' parameter).
        Use the MakeParallel class to wrap other kinds of callable objects.
        """

        self.callable = callable
        self.exchange = exchange
        self.address = address

    def __call__(self, *args, **kw):

        "Invoke the callable with the supplied arguments."

        self.exchange.start(self.address, self.callable, *args, **kw)

class BackgroundCallable:

    """
    A callable which sets up a persistent communications channel, but is
    unmanaged by an exchange.
    """

    def __init__(self, address, callable):

        """
        Using the given 'address', wrap the given 'callable'. This object can
        then be invoked, but the wrapped callable will be run in a background
        process. Note that the 'callable' must be parallel-aware (that is, have
        a 'channel' parameter). Use the MakeParallel class to wrap other kinds
        of callable objects.
        """

        self.callable = callable
        self.address = address

    def __call__(self, *args, **kw):

        "Invoke the callable with the supplied arguments."

        start_persistent(self.address, self.callable, *args, **kw)

# Abstractions and utilities.

class Map(Exchange):

    "An exchange which can be used like the built-in 'map' function."

    def __init__(self, *args, **kw):
        Exchange.__init__(self, *args, **kw)
        self.init()

    def init(self):

        "Remember the channel addition order to order output."

        self.channel_number = 0
        self.channels = {}
        self.results = []
        self.current_index = 0

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        Exchange.add(self, channel)
        self.channels[channel] = self.channel_number
        self.channel_number += 1

    def start(self, callable, *args, **kw):

        """
        Create a new process for the given 'callable' using any additional
        arguments provided. Then, monitor the channel created between this
        process and the created process.
        """

        self.results.append(Undefined) # placeholder
        Exchange.start(self, callable, *args, **kw)

    def create(self):

        """
        Create a new process and return the created communications channel to
        the created process. In the creating process, return None - the channel
        receiving data from the created process will be automatically managed by
        this exchange.
        """

        self.results.append(Undefined) # placeholder
        return Exchange.create(self)

    def __call__(self, callable, sequence):

        "Wrap and invoke 'callable' for each element in the 'sequence'."

        if not isinstance(callable, MakeParallel):
            wrapped = MakeParallel(callable)
        else:
            wrapped = callable

        self.init()

        # Start processes for each element in the sequence.

        for i in sequence:
            self.start(wrapped, i)

        # Access to the results occurs through this object.

        return self

    def store_data(self, channel):

        "Accumulate the incoming data, associating results with channels."

        data = channel.receive()
        self.results[self.channels[channel]] = data
        del self.channels[channel]

    def __iter__(self):
        return self

    def next(self):

        """
        Return the next element in the map, processing incoming data until it
        is available. Raise StopIteration when no more work remains.
        """

        try:
            return self._next()
        except IndexError:
            pass

        while self.unfinished():
            self.store()
            try:
                return self._next()
            except IndexError:
                pass
        else:
            raise StopIteration

    def __getitem__(self, i):

        """
        Return element 'i' (an index or slice) from the map, processing incoming
        data until it is available. Raise IndexError if it never becomes
        available.
        """

        try:
            return self._get(i)
        except IndexError:
            pass

        while self.unfinished():
            self.store()
            try:
                return self._get(i)
            except IndexError:
                pass
        else:
            raise IndexError(i)

    # Helper methods for the above access methods.

    def _next(self):

        "Return the result at the current position, then advance the position."

        result = self._get(self.current_index)
        self.current_index += 1
        return result

    def _get(self, i):

        "Return result(s) 'i', raising IndexError if any are still pending."

        result = self.results[i]
        if result is Undefined or isinstance(i, slice) and Undefined in result:
            raise IndexError(i)
        return result

class Queue(Exchange):

    """
    An exchange acting as a queue, making data from created processes available
    in the order in which it is received.
    """

    def __init__(self, *args, **kw):
        Exchange.__init__(self, *args, **kw)
        self.queue = []

    def store_data(self, channel):

        "Accumulate the incoming data, associating results with channels."

        data = channel.receive()
        self.queue.insert(0, data)

    def __iter__(self):
        return self

    def next(self):

        """
        Return the next element in the queue, processing incoming data until
        one is available. Raise StopIteration when no more work remains.
        """

        if self.queue:
            return self.queue.pop()

        while self.unfinished():
            self.store()
            if self.queue:
                return self.queue.pop()
        else:
            raise StopIteration

    def __len__(self):

        "Return the current length of the queue."

        return len(self.queue)

class MakeParallel:

    "A wrapper around functions making them able to communicate results."

    def __init__(self, callable):

        """
        Initialise the wrapper with the given 'callable'. This object will then
        be able to accept a 'channel' parameter when invoked, and to forward the
        result of the given 'callable' via the channel provided back to the
        invoking process.
        """

        self.callable = callable

    def __call__(self, channel, *args, **kw):

        "Invoke the callable and return its result via the given 'channel'."

        channel.send(self.callable(*args, **kw))

class MakeReusable(MakeParallel):

    """
    A wrapper around functions making them able to communicate results in a
    reusable fashion.
    """

    def __call__(self, channel, *args, **kw):

        """
        Invoke the callable and return its result via the given 'channel', then
        keep receiving (args, kw) tuples and sending results until None is
        received.
        """

        channel.send(self.callable(*args, **kw))
        t = channel.receive()
        while t is not None:
            args, kw = t
            channel.send(self.callable(*args, **kw))
            t = channel.receive()

# Persistent variants.

class PersistentExchange(Persistent, Exchange):

    "An exchange which manages persistent communications."

    pass

class PersistentQueue(Persistent, Queue):

    "A queue which manages persistent communications."

    pass

# Convenience functions.

def BackgroundQueue(address):

    """
    Connect to a process reachable via the given 'address', making the results
    of which accessible via a queue.
    """

    queue = PersistentQueue(limit=1)
    queue.connect(address)
    return queue

def pmap(callable, sequence, limit=None):

    """
    A parallel version of the built-in map function with an optional process
    'limit'. The given 'callable' should not be parallel-aware (that is, have a
    'channel' parameter) since it will be wrapped for parallel communications
    before being invoked.

    Return the processed 'sequence' where each element in the sequence is
    processed by a different process.
    """

    mymap = Map(limit=limit)
    return mymap(callable, sequence)

# Utility functions.

_cpuinfo_fields = "processor", "physical id", "core id"

def _get_number_of_cores(path="/proc/cpuinfo"):

    """
    Return the number of distinct processor entries described in the cpuinfo
    file at 'path' (by default "/proc/cpuinfo", as found on Linux). If the
    file cannot be read or its contents cannot be interpreted, the platform
    is considered unsupported and None is returned.

    Each entry is identified by its "processor", "physical id" and "core id"
    values. Since the logical "processor" number is part of the identifier,
    this presumably counts logical processors (including any hardware
    threads) rather than physical cores. NOTE(review): confirm the intent.
    """

    try:
        f = open(path)
        try:
            processors = set()

            # Use the _cpuinfo_field values as "digits" in a larger unique
            # core identifier.

            processor = [None, None, None]

            for line in f:

                # Where a new processor description is started, record the
                # identifier of the previous, now complete, description before
                # its fields are overwritten.

                if line.startswith("processor") and processor[0] is not None:
                    processors.add(tuple(processor))
                    processor = [None, None, None]

                for i, field in enumerate(_cpuinfo_fields):

                    # Where the field is found, insert the value into the
                    # appropriate location in the processor identifier.

                    if line.startswith(field):
                        t = line.split(":")
                        processor[i] = int(t[1].strip())
                        break

            # At the end of reading the file, add any unrecorded processors.

            if processor[0] is not None:
                processors.add(tuple(processor))

            return len(processors)

        finally:
            f.close()

    # IOError (from open under Python 2) or OSError: no readable file.
    # ValueError/IndexError: a field line in an unrecognised format.

    except (IOError, OSError, ValueError, IndexError):
        return None

def _get_number_of_cores_solaris():

    """
    Return the number of cores for OpenSolaris 2008.05 and possibly other
    editions of Solaris, as reported by "psrinfo -p". Return None if the
    command output cannot be interpreted as a number (for example, when it
    produces no output).
    """

    f = os.popen("psrinfo -p")
    try:
        output = f.read().strip()
    finally:
        f.close()

    try:
        return int(output)
    except ValueError:
        return None

_system_profiler_field = "Total Number of Cores:"

def _get_number_of_cores_macosx():

    """
    Return the number of cores for Mac OS X, as reported by system_profiler,
    or None if the relevant field is not found.
    """

    f = os.popen("/usr/sbin/system_profiler -detailLevel full SPHardwareDataType")
    try:
        for line in f:
            line = line.strip()
            if line.startswith(_system_profiler_field):
                return int(line[len(_system_profiler_field):].strip())

        return None

    finally:
        f.close()

# Low-level functions.

def create_socketpair():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process.

    If the process cannot be created, both sockets are closed before the
    OSError from os.fork is propagated, so that repeated attempts (such as
    those made by Exchange when resources are exhausted) do not leak
    descriptors.
    """

    parent, child = socket.socketpair()
    for s in [parent, child]:
        s.setblocking(1)

    try:
        pid = os.fork()
    except OSError:
        parent.close()
        child.close()
        raise

    if pid == 0:
        parent.close()
        return Channel(pid, child.makefile("r", 0), child.makefile("w", 0))
    else:
        child.close()
        return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))

def create_pipes():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process.

    This function uses pipes instead of a socket pair, since some platforms
    seem to have problems with poll and such socket pairs.

    If the pipes or the process cannot be created, any pipe descriptors
    already obtained are closed before the OSError is propagated, so that
    repeated attempts do not leak descriptors.
    """

    pr, cw = os.pipe()
    try:
        cr, pw = os.pipe()
    except OSError:
        os.close(pr)
        os.close(cw)
        raise

    try:
        pid = os.fork()
    except OSError:
        for fd in (pr, cw, cr, pw):
            os.close(fd)
        raise

    if pid == 0:
        os.close(pr)
        os.close(pw)
        return Channel(pid, os.fdopen(cr, "r", 0), os.fdopen(cw, "w", 0))
    else:
        os.close(cr)
        os.close(cw)
        return Channel(pid, os.fdopen(pr, "r", 0), os.fdopen(pw, "w", 0))

# Configure the interprocess communications and core-counting functions.
1116 1117 if platform.system() == "SunOS": 1118 create = create_pipes 1119 get_number_of_cores = _get_number_of_cores_solaris 1120 elif platform.system() == "Darwin": 1121 create = create_socketpair 1122 get_number_of_cores = _get_number_of_cores_macosx 1123 else: 1124 create = create_socketpair 1125 get_number_of_cores = _get_number_of_cores 1126 1127 def create_persistent(address): 1128 1129 """ 1130 Create a new process, returning a persistent communications channel between 1131 the creating process and the created process. This channel can be 1132 disconnected from the creating process and connected to another process, and 1133 thus can be used to collect results from daemon processes. 1134 1135 In order to be able to reconnect to created processes, the 'address' of the 1136 communications endpoint for the created process needs to be provided. This 1137 should be a filename. 1138 """ 1139 1140 parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 1141 child = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 1142 child.bind(address) 1143 1144 for s in [parent, child]: 1145 s.setblocking(1) 1146 1147 pid = os.fork() 1148 if pid == 0: 1149 parent.close() 1150 return PersistentChannel(pid, child, address) 1151 else: 1152 child.close() 1153 #parent.connect(address) 1154 return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0)) 1155 1156 def connect_persistent(address): 1157 1158 """ 1159 Connect via a persistent channel to an existing created process, reachable 1160 at the given 'address'. 1161 """ 1162 1163 parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 1164 parent.setblocking(1) 1165 parent.connect(address) 1166 return Channel(0, parent.makefile("r", 0), parent.makefile("w", 0)) 1167 1168 def exit(channel): 1169 1170 """ 1171 Terminate a created process, closing the given 'channel'. 
1172 """ 1173 1174 channel.close() 1175 os._exit(0) 1176 1177 def start(callable, *args, **kw): 1178 1179 """ 1180 Create a new process which shall start running in the given 'callable'. 1181 Additional arguments to the 'callable' can be given as additional arguments 1182 to this function. 1183 1184 Return a communications channel to the creating process. For the created 1185 process, supply a channel as the 'channel' parameter in the given 'callable' 1186 so that it may send data back to the creating process. 1187 """ 1188 1189 channel = create() 1190 if channel.pid == 0: 1191 try: 1192 try: 1193 callable(channel, *args, **kw) 1194 except: 1195 exc_type, exc_value, exc_traceback = sys.exc_info() 1196 channel.send(exc_value) 1197 finally: 1198 exit(channel) 1199 else: 1200 return channel 1201 1202 def start_persistent(address, callable, *args, **kw): 1203 1204 """ 1205 Create a new process which shall be reachable using the given 'address' and 1206 which will start running in the given 'callable'. Additional arguments to 1207 the 'callable' can be given as additional arguments to this function. 1208 1209 Return a communications channel to the creating process. For the created 1210 process, supply a channel as the 'channel' parameter in the given 'callable' 1211 so that it may send data back to the creating process. 1212 1213 Note that the created process employs a channel which is persistent: it can 1214 withstand disconnection from the creating process and subsequent connections 1215 from other processes. 1216 """ 1217 1218 channel = create_persistent(address) 1219 if channel.pid == 0: 1220 close_streams() 1221 try: 1222 try: 1223 callable(channel, *args, **kw) 1224 except: 1225 exc_type, exc_value, exc_traceback = sys.exc_info() 1226 channel.send(exc_value) 1227 finally: 1228 exit(channel) 1229 else: 1230 return channel 1231 1232 def close_streams(): 1233 1234 """ 1235 Close streams which keep the current process attached to any creating 1236 processes. 
1237 """ 1238 1239 os.close(sys.stdin.fileno()) 1240 os.close(sys.stdout.fileno()) 1241 os.close(sys.stderr.fileno()) 1242 1243 def waitall(): 1244 1245 "Wait for all created processes to terminate." 1246 1247 try: 1248 while 1: 1249 os.wait() 1250 except OSError: 1251 pass 1252 1253 # vim: tabstop=4 expandtab shiftwidth=4