#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Copyright (C) 2005, 2006, 2007, 2008 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the Free
Software Foundation; either version 3 of the License, or (at your option) any
later version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
details.

You should have received a copy of the GNU Lesser General Public License along
with this program.  If not, see <http://www.gnu.org/licenses/>.
"""

__version__ = "0.4"

import os
import sys
import select
import socket

try:
    import cPickle as pickle
except ImportError:
    import pickle

try:
    set
except NameError:
    from sets import Set as set

# Communications.

class AcknowledgementError(Exception):

    "Raised by Channel.send when the peer's reply is not the expected 'OK'."

    pass

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe

    def __del__(self):

        # Since signals don't work well with I/O, we close pipes and wait for
        # created processes upon finalisation.

        self.close()

    def close(self):

        """
        Explicitly close the channel. Each pipe is closed at most once: after
        closing, the corresponding attribute is reset to None. The created
        process is not waited for here; see 'wait'.
        """

        if self.read_pipe is not None:
            self.read_pipe.close()
            self.read_pipe = None
        if self.write_pipe is not None:
            self.write_pipe.close()
            self.write_pipe = None

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit. The 'options' are passed
        to os.waitpid. Nothing is done where the 'pid' is 0, and any OSError
        from os.waitpid is ignored.
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def _send(self, obj):

        "Pickle the given object 'obj' into the write pipe and flush it."

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)

        Raise AcknowledgementError, carrying 'obj', if the reply is not "OK".
        """

        self._send(obj)
        if self._receive() != "OK":

            # The call form of raise is valid on all Python versions.

            raise AcknowledgementError(obj)

    def _receive(self):

        """
        Receive an object through the channel, returning the object. Where the
        received object is an exception instance, it is raised instead of being
        returned.
        """

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        else:
            return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)

        The acknowledgement is sent even where receiving raises an exception.
        """

        try:
            obj = self._receive()
            return obj
        finally:
            self._send("OK")

class PersistentChannel(Channel):

    """
    A persistent communications channel which can handle peer disconnection,
    acting as a server, meaning that this channel is associated with a specific
    address which can be contacted by other processes.
    """

    def __init__(self, pid, endpoint, address):

        """
        Initialise the channel with a process identifier 'pid', a socket
        'endpoint' on which connections are accepted, and the 'address' (a
        filename, removed by 'close') associated with the endpoint.
        """

        Channel.__init__(self, pid, None, None)
        self.endpoint = endpoint
        self.address = address
        self.poller = select.poll()

        # Listen for connections before this process is interested in
        # communicating. It is not desirable to wait for connections at this
        # point because this will block the process.

        self.endpoint.listen(1)

    def close(self):

        "Close the persistent channel and remove the socket file."

        Channel.close(self)
        try:
            os.unlink(self.address)
        except OSError:
            pass

    def _ensure_pipes(self):

        "Ensure that the channel is capable of communicating."

        if self.read_pipe is None or self.write_pipe is None:

            # Accept any incoming connections. Pickled data is a byte stream,
            # so the pipes are opened in binary mode.

            endpoint, address = self.endpoint.accept()
            self.read_pipe = endpoint.makefile("rb", 0)
            self.write_pipe = endpoint.makefile("wb", 0)

            # Monitor the write pipe for error conditions.

            fileno = self.write_pipe.fileno()
            self.poller.register(fileno, select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def _reset_pipes(self):

        "Discard the existing connection."

        fileno = self.write_pipe.fileno()
        self.poller.unregister(fileno)

        # Close the discarded pipes explicitly instead of leaving their
        # descriptors to the garbage collector. The connection is already
        # broken, so failures when closing are of no interest.

        for pipe in (self.read_pipe, self.write_pipe):
            try:
                pipe.close()
            except (IOError, OSError, socket.error):
                pass

        self.read_pipe = None
        self.write_pipe = None
        self.endpoint.listen(1)

    def _ensure_communication(self, timeout=None):

        """
        Ensure that sending and receiving are possible, accepting a new
        connection whenever the current one is found to be broken. The optional
        'timeout' is passed to the poll operation.
        """

        while 1:
            self._ensure_pipes()
            fileno = self.write_pipe.fileno()
            fds = self.poller.poll(timeout)
            for fd, status in fds:
                if fd != fileno:
                    continue
                if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):

                    # Broken connection: discard it and start all over again.

                    self._reset_pipes()
                    break

            # No broken connection was reported: communication can proceed.

            else:
                return

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        self._ensure_communication()
        Channel._send(self, obj)

    def _receive(self):

        "Receive an object through the channel, returning the object."

        self._ensure_communication()
        return Channel._receive(self)

# Management of processes and communications.

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate. Subclasses of this class can define the 'store_data'
    method in order to enable the 'add_wait', 'wait' and 'finish' methods.
    """

    def __init__(self, channels=None, limit=None, reuse=0, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'.

        If the optional 'limit' is specified, restrictions on the addition of
        new channels can be enforced and observed through the 'add_wait', 'wait'
        and 'finish' methods. To make use of these methods, create a subclass of
        this class and define a working 'store_data' method.

        If the optional 'reuse' parameter is set to a true value, channels and
        processes will be reused for waiting computations.

        If the optional 'autoclose' parameter is set to a false value, channels
        will not be closed automatically when they are removed from the exchange
        - by default they are closed when removed.
        """

        self.limit = limit
        self.reuse = reuse
        self.autoclose = autoclose

        # Queued (callable, args, kw) tuples awaiting a free channel.

        self.waiting = []

        # A mapping from read pipe file descriptors to channels.

        self.readables = {}

        # The channels removed during the most recent call to 'ready'.

        self.removed = []
        self.poller = select.poll()
        for channel in channels or []:
            self.add(channel)

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        fileno = channel.read_pipe.fileno()
        self.readables[fileno] = channel
        self.poller.register(fileno, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        # Copy the values so that a real list is returned on all Python
        # versions, as documented.

        return list(self.readables.values())

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (or until
        communication is possible) and return a list of channels which are ready
        to be read from.

        Channels reporting hang-up or error conditions are removed from the
        exchange and recorded in the 'removed' attribute.
        """

        fds = self.poller.poll(timeout)
        readables = []
        self.removed = []

        for fd, status in fds:
            channel = self.readables[fd]
            removed = 0

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)
                self.removed.append(channel)
                removed = 1

            # Record readable channels, except for those which have just been
            # removed and automatically closed.

            if status & select.POLLIN:
                if not (removed and self.autoclose):
                    readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange. Where 'autoclose' is set,
        the channel is also closed and its process waited for.
        """

        fileno = channel.read_pipe.fileno()
        del self.readables[fileno]
        self.poller.unregister(fileno)
        if self.autoclose:
            channel.close()
            channel.wait()

    # Enhanced exchange methods involving channel limits.

    def add_wait(self, channel):

        """
        Add the given 'channel' to the exchange, waiting if the limit on active
        channels would be exceeded by adding the channel.
        """

        self.wait()
        self.add(channel)

    def wait(self):

        """
        Test for the limit on channels, blocking and reading incoming data until
        the number of channels is below the limit.
        """

        # If limited, block until channels have been closed.

        while self.limit is not None and len(self.active()) >= self.limit:
            self.store()

    def _get_waiting(self, channel):

        """
        Get waiting callable and argument information for new processes, given
        the reception of data on the given 'channel'.

        Return a (callable, args, kw) tuple where a new process should be
        started by the caller, or None where there is nothing to start.
        """

        if self.waiting:
            callable, args, kw = self.waiting.pop()

            # Try and reuse existing channels if possible.

            if self.reuse:

                # Re-add the channel - this may update information related to
                # the channel in subclasses.

                self.add(channel)
                channel.send((args, kw))
            else:
                return callable, args, kw

        # Where channels are being reused, but where no processes are waiting
        # any more, send a special value to tell them to quit.

        elif self.reuse:
            channel.send(None)

        return None

    def finish(self):

        """
        Finish the use of the exchange by waiting for all channels to complete.
        """

        while self.active():
            self.store()

    def store(self):

        "For each ready channel, process the incoming data."

        for channel in self.ready():
            self.store_data(channel)
            self.start_waiting(channel)

    def store_data(self, channel):

        """
        Store incoming data from the specified 'channel'. In subclasses of this
        class, such data could be stored using instance attributes.
        """

        raise NotImplementedError("store_data")

    # Support for the convenience methods.

    def _set_waiting(self, callable, args, kw):

        """
        Support process creation by returning whether the given 'callable' has
        been queued for later invocation. Queuing only occurs where a 'limit'
        is defined and has been reached.
        """

        if self.limit is not None and len(self.active()) >= self.limit:
            self.waiting.insert(0, (callable, args, kw))
            return 1
        else:
            return 0

    def _get_channel_for_process(self, channel):

        """
        Support process creation by returning the given 'channel' to the
        created process, and None to the creating process, for which the
        channel is added to this exchange.
        """

        if channel.pid == 0:
            return channel
        else:
            self.add_wait(channel)
            return None

    # Methods for overriding, related to the convenience methods.

    def start_waiting(self, channel):

        """
        Start a waiting process given the reception of data on the given
        'channel'.
        """

        details = self._get_waiting(channel)
        if details is not None:
            callable, args, kw = details
            self.add(start(callable, *args, **kw))

    # Convenience methods.

    def start(self, callable, *args, **kw):

        """
        Create a new process for the given 'callable' using any additional
        arguments provided. Then, monitor the channel created between this
        process and the created process.

        Where the limit on channels has been reached, the invocation is queued
        instead and no process is created by this call.
        """

        if self._set_waiting(callable, args, kw):
            return

        self.add_wait(start(callable, *args, **kw))

    def create(self):

        """
        Create a new process and return the created communications channel to
        the created process. In the creating process, return None - the channel
        receiving data from the created process will be automatically managed by
        this exchange.
        """

        channel = create()
        return self._get_channel_for_process(channel)

    def manage(self, callable):

        """
        Wrap the given 'callable' in an object which can then be called in the
        same way as 'callable', but with new processes and communications
        managed automatically.
        """

        return ManagedCallable(callable, self)

class Persistent:

    """
    A mix-in class providing methods to exchanges for the management of
    persistent communications.
    """

    def start_waiting(self, channel):

        """
        Start a waiting process given the reception of data on the given
        'channel'.
        """

        details = self._get_waiting(channel)
        if details is not None:
            callable, args, kw = details

            # NOTE(review): this reads an 'address' attribute from the channel,
            # which PersistentChannel defines but plain Channel does not.

            self.add(start_persistent(channel.address, callable, *args, **kw))

    def start(self, address, callable, *args, **kw):

        """
        Create a new process, located at the given 'address', for the given
        'callable' using any additional arguments provided.

        Where the limit on channels has been reached, the invocation is queued
        instead. Otherwise, the process is started but the channel returned by
        start_persistent is not added to this exchange by this method; see
        'connect' for adding a channel to a process at a given address.
        """

        if self._set_waiting(callable, args, kw):
            return

        start_persistent(address, callable, *args, **kw)

    def create(self, address):

        """
        Create a new process, located at the given 'address', and return the
        created communications channel to the created process. In the creating
        process, return None - the channel receiving data from the created
        process will be automatically managed by this exchange.
        """

        channel = create_persistent(address)
        return self._get_channel_for_process(channel)

    def manage(self, address, callable):

        """
        Using the given 'address', publish the given 'callable' in an object
        which can then be called in the same way as 'callable', but with new
        processes and communications managed automatically.
        """

        return PersistentCallable(address, callable, self)

    def connect(self, address):

        "Connect to a process which is contactable via the given 'address'."

        channel = connect_persistent(address)
        self.add_wait(channel)

class ManagedCallable:

    "A callable managed by an exchange."

    def __init__(self, callable, exchange):

        """
        Wrap the given 'callable', using the given 'exchange' to monitor the
        channels created for communications between this and the created
        processes. Note that the 'callable' must be parallel-aware (that is,
        have a 'channel' parameter). Use the MakeParallel class to wrap other
        kinds of callable objects.
        """

        self.callable = callable
        self.exchange = exchange

    def __call__(self, *args, **kw):

        "Invoke the callable with the supplied arguments."

        self.exchange.start(self.callable, *args, **kw)

class PersistentCallable:

    "A callable which sets up a persistent communications channel."

    def __init__(self, address, callable, exchange):

        """
        Using the given 'address', wrap the given 'callable', using the given
        'exchange' to monitor the channels created for communications between
        this and the created processes, so that when it is called, a background
        process is started within which the 'callable' will run. Note that the
        'callable' must be parallel-aware (that is, have a 'channel' parameter).
        Use the MakeParallel class to wrap other kinds of callable objects.
        """

        self.callable = callable
        self.exchange = exchange
        self.address = address

    def __call__(self, *args, **kw):

        "Invoke the callable with the supplied arguments."

        self.exchange.start(self.address, self.callable, *args, **kw)

class BackgroundCallable:

    """
    A callable which sets up a persistent communications channel, but is
    unmanaged by an exchange.
    """

    def __init__(self, address, callable):

        """
        Using the given 'address', wrap the given 'callable'. This object can
        then be invoked, but the wrapped callable will be run in a background
        process. Note that the 'callable' must be parallel-aware (that is, have
        a 'channel' parameter). Use the MakeParallel class to wrap other kinds
        of callable objects.
        """

        self.callable = callable
        self.address = address

    def __call__(self, *args, **kw):

        "Invoke the callable with the supplied arguments."

        start_persistent(self.address, self.callable, *args, **kw)

# Abstractions and utilities.

class Map(Exchange):

    "An exchange which can be used like the built-in 'map' function."

    def __init__(self, *args, **kw):
        Exchange.__init__(self, *args, **kw)
        self.init()

    def init(self):

        "Remember the channel addition order to order output."

        # The position in 'results' to be used by the next added channel.

        self.channel_number = 0

        # A mapping from channels to positions in 'results'.

        self.channels = {}
        self.results = []

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        Exchange.add(self, channel)
        self.channels[channel] = self.channel_number
        self.channel_number += 1

    def start(self, callable, *args, **kw):

        """
        Create a new process for the given 'callable' using any additional
        arguments provided. Then, monitor the channel created between this
        process and the created process.
        """

        self.results.append(None) # placeholder
        Exchange.start(self, callable, *args, **kw)

    def create(self):

        """
        Create a new process and return the created communications channel to
        the created process. In the creating process, return None - the channel
        receiving data from the created process will be automatically managed by
        this exchange.
        """

        self.results.append(None) # placeholder
        return Exchange.create(self)

    def __call__(self, callable, sequence):

        """
        Wrap and invoke 'callable' for each element in the 'sequence'. Any
        previously recorded results are discarded. This object is returned,
        providing access to the results.
        """

        if not isinstance(callable, MakeParallel):
            wrapped = MakeParallel(callable)
        else:
            wrapped = callable

        self.init()

        # Start processes for each element in the sequence.

        for i in sequence:
            self.start(wrapped, i)

        # Access to the results occurs through this object.

        return self

    def __getitem__(self, i):

        "Wait for all channels to complete and return result number 'i'."

        self.finish()
        return self.results[i]

    def __iter__(self):

        "Wait for all channels to complete and return a results iterator."

        self.finish()
        return iter(self.results)

    def store_data(self, channel):

        "Accumulate the incoming data, associating results with channels."

        data = channel.receive()
        self.results[self.channels[channel]] = data
        del self.channels[channel]

class Queue(Exchange):

    """
    An exchange acting as a queue, making data from created processes available
    in the order in which it is received.
    """

    def __init__(self, *args, **kw):
        Exchange.__init__(self, *args, **kw)

        # New data is inserted at the front and consumed from the back.

        self.queue = []

    def store_data(self, channel):

        "Accumulate the incoming data in order of arrival."

        data = channel.receive()
        self.queue.insert(0, data)

    def __iter__(self):
        return self

    def next(self):

        """
        Return the next element in the queue, waiting for data from the active
        channels where the queue is empty. Raise StopIteration where the queue
        is empty and no active channels remain.
        """

        if self.queue:
            return self.queue.pop()
        while self.active():
            self.store()
            if self.queue:
                return self.queue.pop()
        else:
            raise StopIteration

    # Support the iterator protocol of Python 3 as well as that of Python 2.

    __next__ = next

class MakeParallel:

    "A wrapper around functions making them able to communicate results."

    def __init__(self, callable):

        """
        Initialise the wrapper with the given 'callable'. This object will then
        be able to accept a 'channel' parameter when invoked, and to forward the
        result of the given 'callable' via the channel provided back to the
        invoking process.
        """

        self.callable = callable

    def __call__(self, channel, *args, **kw):

        "Invoke the callable and return its result via the given 'channel'."

        channel.send(self.callable(*args, **kw))

class MakeReusable(MakeParallel):

    """
    A wrapper around functions making them able to communicate results in a
    reusable fashion.
    """

    def __call__(self, channel, *args, **kw):

        """
        Invoke the callable and return its result via the given 'channel'. Then
        keep receiving (args, kw) tuples from the channel, invoking the callable
        for each of them and sending back the result, until None is received.
        """

        channel.send(self.callable(*args, **kw))
        t = channel.receive()
        while t is not None:
            args, kw = t
            channel.send(self.callable(*args, **kw))
            t = channel.receive()

# Persistent variants.

class PersistentExchange(Persistent, Exchange):

    "An exchange which manages persistent communications."

    pass

class PersistentQueue(Persistent, Queue):

    "A queue which manages persistent communications."

    pass

# Convenience functions.

def BackgroundQueue(address):

    """
    Connect to a process reachable via the given 'address', making the results
    of which accessible via a queue.
    """

    queue = PersistentQueue(limit=1)
    queue.connect(address)
    return queue

def pmap(callable, sequence, limit=None):

    """
    A parallel version of the built-in map function with an optional process
    'limit'. The given 'callable' should not be parallel-aware (that is, have a
    'channel' parameter) since it will be wrapped for parallel communications
    before being invoked.

    Return the processed 'sequence' where each element in the sequence is
    processed by a different process.
    """

    mymap = Map(limit=limit)
    return mymap(callable, sequence)

# Utility functions.

_cpuinfo_fields = "physical id", "core id"

def get_number_of_cores(cpuinfo_path="/proc/cpuinfo"):

    """
    Return the number of distinct, genuine processor cores, found by counting
    the distinct ("physical id", "core id") pairs in the file at the optional
    'cpuinfo_path'. If the platform is not supported by this function (the file
    cannot be opened or read), None is returned.
    """

    try:
        f = open(cpuinfo_path)
        try:
            processors = set()
            processor = [None, None]

            for line in f:
                for i, field in enumerate(_cpuinfo_fields):
                    if line.startswith(field):
                        t = line.split(":")
                        processor[i] = int(t[1].strip())
                        break

                # The line describes neither field: where a new processor
                # section begins, record the details of the previous one.

                else:
                    if line.startswith("processor") and processor[0] is not None:
                        processors.add(tuple(processor))
                        processor = [None, None]

            # Record the details of the final processor section.

            if processor[0] is not None:
                processors.add(tuple(processor))

            return len(processors)

        finally:
            f.close()

    # Opening a missing file raises IOError, not OSError, on Python 2.

    except (IOError, OSError):
        return None

# Low-level functions.

def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process. The channel given to the created
    process has a 'pid' of 0, whereas the channel given to the creating process
    has the 'pid' of the created process.
    """

    parent, child = socket.socketpair()
    for s in [parent, child]:
        s.setblocking(1)

    # Pickled data is a byte stream, so the pipes are opened in binary mode.

    pid = os.fork()
    if pid == 0:
        parent.close()
        return Channel(pid, child.makefile("rb", 0), child.makefile("wb", 0))
    else:
        child.close()
        return Channel(pid, parent.makefile("rb", 0), parent.makefile("wb", 0))

def create_persistent(address):

    """
    Create a new process, returning a persistent communications channel between
    the creating process and the created process. This channel can be
    disconnected from the creating process and connected to another process, and
    thus can be used to collect results from daemon processes.

    In order to be able to reconnect to created processes, the 'address' of the
    communications endpoint for the created process needs to be provided. This
    should be a filename.
    """

    parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    child = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    child.bind(address)

    for s in [parent, child]:
        s.setblocking(1)

    pid = os.fork()
    if pid == 0:
        parent.close()
        return PersistentChannel(pid, child, address)
    else:
        child.close()

        # NOTE(review): the parent socket is not connected to the address
        # here; connect_persistent makes a connected channel instead.

        return Channel(pid, parent.makefile("rb", 0), parent.makefile("wb", 0))

def connect_persistent(address):

    """
    Connect via a persistent channel to an existing created process, reachable
    at the given 'address'. The returned channel has a 'pid' of 0.
    """

    parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    parent.setblocking(1)
    parent.connect(address)
    return Channel(0, parent.makefile("rb", 0), parent.makefile("wb", 0))

def exit(channel):

    """
    Terminate a created process, closing the given 'channel'. The process is
    ended using os._exit with a status of 0.
    """

    channel.close()
    os._exit(0)

def start(callable, *args, **kw):

    """
    Create a new process which shall start running in the given 'callable'.
    Additional arguments to the 'callable' can be given as additional arguments
    to this function.

    Return a communications channel to the creating process. For the created
    process, supply a channel as the 'channel' parameter in the given 'callable'
    so that it may send data back to the creating process.
    """

    channel = create()
    if channel.pid == 0:
        try:
            try:
                callable(channel, *args, **kw)

            # Everything is caught deliberately: the created process must never
            # continue into the code of the creating process, and so the
            # exception value is sent through the channel instead.

            except:
                channel.send(sys.exc_info()[1])
        finally:
            exit(channel)
    else:
        return channel

def start_persistent(address, callable, *args, **kw):

    """
    Create a new process which shall be reachable using the given 'address' and
    which will start running in the given 'callable'. Additional arguments to
    the 'callable' can be given as additional arguments to this function.

    Return a communications channel to the creating process. For the created
    process, supply a channel as the 'channel' parameter in the given 'callable'
    so that it may send data back to the creating process.

    Note that the created process employs a channel which is persistent: it can
    withstand disconnection from the creating process and subsequent connections
    from other processes.
    """

    channel = create_persistent(address)
    if channel.pid == 0:
        try:
            try:

                # Detach within the guarded region so that a failure here
                # cannot let the created process return into the code of the
                # creating process: 'exit' is always reached.

                close_streams()
                callable(channel, *args, **kw)
            except:
                channel.send(sys.exc_info()[1])
        finally:
            exit(channel)
    else:
        return channel

def close_streams():

    """
    Close streams which keep the current process attached to any creating
    processes.
    """

    os.close(sys.stdin.fileno())
    os.close(sys.stdout.fileno())
    os.close(sys.stderr.fileno())

def waitall():

    "Wait for all created processes to terminate."

    # The loop only ends when os.wait raises OSError.

    try:
        while 1:
            os.wait()
    except OSError:
        pass

# vim: tabstop=4 expandtab shiftwidth=4