1 #!/usr/bin/env python 2 3 """ 4 Thread module emulating a subset of Java's threading model. 5 6 See docs/COPYING.txt and docs/LICENCE-python.txt for copyright and licensing 7 information for Python standard library modules. 8 """ 9 10 import sys as _sys 11 12 try: 13 import thread 14 except ImportError: 15 del _sys.modules[__name__] 16 raise 17 18 from time import time as _time, sleep as _sleep 19 from traceback import format_exc as _format_exc 20 from collections import deque 21 22 # Rename some stuff so "from threading import *" is safe 23 __all__ = ['activeCount', 'Condition', 'currentThread', 'enumerate', 'Event', 24 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', 25 'Timer', 'setprofile', 'settrace', 'local', 'stack_size'] 26 27 _start_new_thread = thread.start_new_thread 28 _allocate_lock = thread.allocate_lock 29 _get_ident = thread.get_ident 30 ThreadError = thread.error 31 del thread 32 33 34 # Support for profile and trace hooks 35 36 _profile_hook = None 37 _trace_hook = None 38 39 def setprofile(func): 40 global _profile_hook 41 _profile_hook = func 42 43 def settrace(func): 44 global _trace_hook 45 _trace_hook = func 46 47 # Synchronization classes 48 49 Lock = _allocate_lock 50 51 def RLock(*args, **kwargs): 52 return _RLock(*args, **kwargs) 53 54 class _RLock: 55 56 def __init__(self): 57 self.__block = _allocate_lock() 58 self.__owner = None 59 self.__count = 0 60 61 def __repr__(self): 62 owner = self.__owner 63 return "<%s(%s, %d)>" % ( 64 self.__class__.__name__, 65 owner and owner.getName(), 66 self.__count) 67 68 def acquire(self, blocking=1): 69 me = currentThread() 70 if self.__owner is me: 71 self.__count = self.__count + 1 72 if __debug__: 73 self._note("%s.acquire(%s): recursive success", self, blocking) 74 return 1 75 rc = self.__block.acquire(blocking) 76 if rc: 77 self.__owner = me 78 self.__count = 1 79 if __debug__: 80 self._note("%s.acquire(%s): initial success", self, blocking) 81 else: 82 if __debug__: 83 self._note("%s.acquire(%s): failure", self, blocking) 84 return rc 85 86 __enter__ = acquire 87 88 def release(self): 89 if self.__owner is not currentThread(): 90 raise RuntimeError("cannot release un-aquired lock") 91 self.__count = count = self.__count - 1 92 if not count: 93 self.__owner = None 94 self.__block.release() 95 if __debug__: 96 self._note("%s.release(): final release", self) 97 else: 98 if __debug__: 99 self._note("%s.release(): non-final release", self) 100 101 def __exit__(self, t, v, tb): 102 self.release() 103 104 # Internal methods used by condition variables 105 106 def _acquire_restore(self, (count, owner)): 107 self.__block.acquire() 108 self.__count = count 109 self.__owner = owner 110 if __debug__: 111 self._note("%s._acquire_restore()", self) 112 113 def _release_save(self): 114 if __debug__: 115 self._note("%s._release_save()", self) 116 count = self.__count 117 self.__count = 0 118 owner = self.__owner 119 self.__owner = None 120 self.__block.release() 121 return (count, owner) 122 123 def _is_owned(self): 124 return self.__owner is currentThread() 125 126 127 def Condition(*args, **kwargs): 128 return _Condition(*args, **kwargs) 129 130 class _Condition: 131 132 def __init__(self, lock=None): 133 if lock is None: 134 lock = RLock() 135 self.__lock = lock 136 # Export the lock's acquire() and release() methods 137 self.acquire = lock.acquire 138 self.release = lock.release 139 # If the lock defines _release_save() and/or _acquire_restore(), 140 # these override the default implementations (which just call 141 # release() and acquire() on the lock). Ditto for _is_owned(). 142 try: 143 self._release_save = lock._release_save 144 except AttributeError: 145 pass 146 try: 147 self._acquire_restore = lock._acquire_restore 148 except AttributeError: 149 pass 150 try: 151 self._is_owned = lock._is_owned 152 except AttributeError: 153 pass 154 self.__waiters = [] 155 156 def __enter__(self): 157 return self.__lock.__enter__() 158 159 def __exit__(self, *args): 160 return self.__lock.__exit__(*args) 161 162 def __repr__(self): 163 return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters)) 164 165 def _release_save(self): 166 self.__lock.release() # No state to save 167 168 def _acquire_restore(self, x): 169 self.__lock.acquire() # Ignore saved state 170 171 def _is_owned(self): 172 # Return True if lock is owned by currentThread. 173 # This method is called only if __lock doesn't have _is_owned(). 174 if self.__lock.acquire(0): 175 self.__lock.release() 176 return False 177 else: 178 return True 179 180 def wait(self, timeout=None): 181 if not self._is_owned(): 182 raise RuntimeError("cannot wait on un-aquired lock") 183 waiter = _allocate_lock() 184 waiter.acquire() 185 self.__waiters.append(waiter) 186 saved_state = self._release_save() 187 try: # restore state no matter what (e.g., KeyboardInterrupt) 188 if timeout is None: 189 waiter.acquire() 190 if __debug__: 191 self._note("%s.wait(): got it", self) 192 else: 193 # Balancing act: We can't afford a pure busy loop, so we 194 # have to sleep; but if we sleep the whole timeout time, 195 # we'll be unresponsive. The scheme here sleeps very 196 # little at first, longer as time goes on, but never longer 197 # than 20 times per second (or the timeout time remaining). 198 endtime = _time() + timeout 199 delay = 0.0005 # 500 us -> initial delay of 1 ms 200 while True: 201 gotit = waiter.acquire(0) 202 if gotit: 203 break 204 remaining = endtime - _time() 205 if remaining <= 0: 206 break 207 delay = min(delay * 2, remaining, .05) 208 _sleep(delay) 209 if not gotit: 210 if __debug__: 211 self._note("%s.wait(%s): timed out", self, timeout) 212 try: 213 self.__waiters.remove(waiter) 214 except ValueError: 215 pass 216 else: 217 if __debug__: 218 self._note("%s.wait(%s): got it", self, timeout) 219 finally: 220 self._acquire_restore(saved_state) 221 222 def notify(self, n=1): 223 if not self._is_owned(): 224 raise RuntimeError("cannot notify on un-aquired lock") 225 __waiters = self.__waiters 226 waiters = __waiters[:n] 227 if not waiters: 228 if __debug__: 229 self._note("%s.notify(): no waiters", self) 230 return 231 self._note("%s.notify(): notifying %d waiter%s", self, n, 232 n!=1 and "s" or "") 233 for waiter in waiters: 234 waiter.release() 235 try: 236 __waiters.remove(waiter) 237 except ValueError: 238 pass 239 240 def notifyAll(self): 241 self.notify(len(self.__waiters)) 242 243 244 def Semaphore(*args, **kwargs): 245 return _Semaphore(*args, **kwargs) 246 247 class _Semaphore: 248 249 # After Tim Peters' semaphore class, but not quite the same (no maximum) 250 251 def __init__(self, value=1): 252 if value < 0: 253 raise ValueError("semaphore initial value must be >= 0") 254 self.__cond = Condition(Lock()) 255 self.__value = value 256 257 def acquire(self, blocking=1): 258 rc = False 259 self.__cond.acquire() 260 while self.__value == 0: 261 if not blocking: 262 break 263 if __debug__: 264 self._note("%s.acquire(%s): blocked waiting, value=%s", 265 self, blocking, self.__value) 266 self.__cond.wait() 267 else: 268 self.__value = self.__value - 1 269 if __debug__: 270 self._note("%s.acquire: success, value=%s", 271 self, self.__value) 272 rc = True 273 self.__cond.release() 274 return rc 275 276 __enter__ = acquire 277 278 def release(self): 279 self.__cond.acquire() 280 self.__value = self.__value + 1 281 if __debug__: 282 self._note("%s.release: success, value=%s", 283 self, self.__value) 284 self.__cond.notify() 285 self.__cond.release() 286 287 def __exit__(self, t, v, tb): 288 self.release() 289 290 291 def BoundedSemaphore(*args, **kwargs): 292 return _BoundedSemaphore(*args, **kwargs) 293 294 class _BoundedSemaphore(_Semaphore): 295 """Semaphore that checks that # releases is <= # acquires""" 296 def __init__(self, value=1): 297 _Semaphore.__init__(self, value) 298 self._initial_value = value 299 300 def release(self): 301 if self._Semaphore__value >= self._initial_value: 302 raise ValueError, "Semaphore released too many times" 303 return _Semaphore.release(self) 304 305 306 def Event(*args, **kwargs): 307 return _Event(*args, **kwargs) 308 309 class _Event: 310 311 # After Tim Peters' event class (without is_posted()) 312 313 def __init__(self): 314 self.__cond = Condition(Lock()) 315 self.__flag = False 316 317 def isSet(self): 318 return self.__flag 319 320 def set(self): 321 self.__cond.acquire() 322 try: 323 self.__flag = True 324 self.__cond.notifyAll() 325 finally: 326 self.__cond.release() 327 328 def clear(self): 329 self.__cond.acquire() 330 try: 331 self.__flag = False 332 finally: 333 self.__cond.release() 334 335 def wait(self, timeout=None): 336 self.__cond.acquire() 337 try: 338 if not self.__flag: 339 self.__cond.wait(timeout) 340 finally: 341 self.__cond.release() 342 343 # Helper to generate new thread names 344 _counter = 0 345 def _newname(template="Thread-%d"): 346 global _counter 347 _counter = _counter + 1 348 return template % _counter 349 350 # Active thread administration 351 _active_limbo_lock = _allocate_lock() 352 _active = {} # maps thread id to Thread object 353 _limbo = {} 354 355 356 # Main class for threads 357 358 class Thread: 359 360 __initialized = False 361 # Need to store a reference to sys.exc_info for printing 362 # out exceptions when a thread tries to use a global var. during interp. 363 # shutdown and thus raises an exception about trying to perform some 364 # operation on/with a NoneType 365 __exc_info = _sys.exc_info 366 367 def __init__(self, group=None, target=None, name=None, 368 args=(), kwargs=None): 369 assert group is None, "group argument must be None for now" 370 if kwargs is None: 371 kwargs = {} 372 self.__target = target 373 self.__name = str(name or _newname()) 374 self.__args = args 375 self.__kwargs = kwargs 376 self.__daemonic = self._set_daemon() 377 self.__started = False 378 self.__stopped = False 379 self.__block = Condition(Lock()) 380 self.__initialized = True 381 # sys.stderr is not stored in the class like 382 # sys.exc_info since it can be changed between instances 383 self.__stderr = _sys.stderr 384 385 def _set_daemon(self): 386 # Overridden in _MainThread and _DummyThread 387 return currentThread().isDaemon() 388 389 def __repr__(self): 390 assert self.__initialized, "Thread.__init__() was not called" 391 status = "initial" 392 if self.__started: 393 status = "started" 394 if self.__stopped: 395 status = "stopped" 396 if self.__daemonic: 397 status = status + " daemon" 398 return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status) 399 400 def start(self): 401 if not self.__initialized: 402 raise RuntimeError("thread.__init__() not called") 403 if self.__started: 404 raise RuntimeError("thread already started") 405 if __debug__: 406 self._note("%s.start(): starting thread", self) 407 _active_limbo_lock.acquire() 408 _limbo[self] = self 409 _active_limbo_lock.release() 410 _start_new_thread(self.__bootstrap, ()) 411 self.__started = True 412 _sleep(0.000001) # 1 usec, to let the thread run (Solaris hack) 413 414 def run(self): 415 if self.__target: 416 self.__target(*self.__args, **self.__kwargs) 417 418 def __bootstrap(self): 419 # Wrapper around the real bootstrap code that ignores 420 # exceptions during interpreter cleanup. Those typically 421 # happen when a daemon thread wakes up at an unfortunate 422 # moment, finds the world around it destroyed, and raises some 423 # random exception *** while trying to report the exception in 424 # __bootstrap_inner() below ***. Those random exceptions 425 # don't help anybody, and they confuse users, so we suppress 426 # them. We suppress them only when it appears that the world 427 # indeed has already been destroyed, so that exceptions in 428 # __bootstrap_inner() during normal business hours are properly 429 # reported. Also, we only suppress them for daemonic threads; 430 # if a non-daemonic encounters this, something else is wrong. 431 try: 432 self.__bootstrap_inner() 433 except: 434 if self.__daemonic and _sys is None: 435 return 436 raise 437 438 def __bootstrap_inner(self): 439 try: 440 self.__started = True 441 _active_limbo_lock.acquire() 442 _active[_get_ident()] = self 443 del _limbo[self] 444 _active_limbo_lock.release() 445 if __debug__: 446 self._note("%s.__bootstrap(): thread started", self) 447 448 if _trace_hook: 449 self._note("%s.__bootstrap(): registering trace hook", self) 450 _sys.settrace(_trace_hook) 451 if _profile_hook: 452 self._note("%s.__bootstrap(): registering profile hook", self) 453 _sys.setprofile(_profile_hook) 454 455 try: 456 self.run() 457 except SystemExit: 458 if __debug__: 459 self._note("%s.__bootstrap(): raised SystemExit", self) 460 except: 461 if __debug__: 462 self._note("%s.__bootstrap(): unhandled exception", self) 463 # If sys.stderr is no more (most likely from interpreter 464 # shutdown) use self.__stderr. Otherwise still use sys (as in 465 # _sys) in case sys.stderr was redefined since the creation of 466 # self. 467 if _sys: 468 _sys.stderr.write("Exception in thread %s:\n%s\n" % 469 (self.getName(), _format_exc())) 470 else: 471 # Do the best job possible w/o a huge amt. of code to 472 # approximate a traceback (code ideas from 473 # Lib/traceback.py) 474 exc_type, exc_value, exc_tb = self.__exc_info() 475 try: 476 print>>self.__stderr, ( 477 "Exception in thread " + self.getName() + 478 " (most likely raised during interpreter shutdown):") 479 print>>self.__stderr, ( 480 "Traceback (most recent call last):") 481 while exc_tb: 482 print>>self.__stderr, ( 483 ' File "%s", line %s, in %s' % 484 (exc_tb.tb_frame.f_code.co_filename, 485 exc_tb.tb_lineno, 486 exc_tb.tb_frame.f_code.co_name)) 487 exc_tb = exc_tb.tb_next 488 print>>self.__stderr, ("%s: %s" % (exc_type, exc_value)) 489 # Make sure that exc_tb gets deleted since it is a memory 490 # hog; deleting everything else is just for thoroughness 491 finally: 492 del exc_type, exc_value, exc_tb 493 else: 494 if __debug__: 495 self._note("%s.__bootstrap(): normal return", self) 496 finally: 497 _active_limbo_lock.acquire() 498 try: 499 self.__stop() 500 try: 501 # We don't call self.__delete() because it also 502 # grabs _active_limbo_lock. 503 del _active[_get_ident()] 504 except: 505 pass 506 finally: 507 _active_limbo_lock.release() 508 509 def __stop(self): 510 self.__block.acquire() 511 self.__stopped = True 512 self.__block.notifyAll() 513 self.__block.release() 514 515 def __delete(self): 516 "Remove current thread from the dict of currently running threads." 517 518 # Notes about running with dummy_thread: 519 # 520 # Must take care to not raise an exception if dummy_thread is being 521 # used (and thus this module is being used as an instance of 522 # dummy_threading). dummy_thread.get_ident() always returns -1 since 523 # there is only one thread if dummy_thread is being used. Thus 524 # len(_active) is always <= 1 here, and any Thread instance created 525 # overwrites the (if any) thread currently registered in _active. 526 # 527 # An instance of _MainThread is always created by 'threading'. This 528 # gets overwritten the instant an instance of Thread is created; both 529 # threads return -1 from dummy_thread.get_ident() and thus have the 530 # same key in the dict. So when the _MainThread instance created by 531 # 'threading' tries to clean itself up when atexit calls this method 532 # it gets a KeyError if another Thread instance was created. 533 # 534 # This all means that KeyError from trying to delete something from 535 # _active if dummy_threading is being used is a red herring. But 536 # since it isn't if dummy_threading is *not* being used then don't 537 # hide the exception. 538 539 _active_limbo_lock.acquire() 540 try: 541 try: 542 del _active[_get_ident()] 543 except KeyError: 544 if 'dummy_threading' not in _sys.modules: 545 raise 546 finally: 547 _active_limbo_lock.release() 548 549 def join(self, timeout=None): 550 if not self.__initialized: 551 raise RuntimeError("Thread.__init__() not called") 552 if not self.__started: 553 raise RuntimeError("cannot join thread before it is started") 554 if self is currentThread(): 555 raise RuntimeError("cannot join current thread") 556 557 if __debug__: 558 if not self.__stopped: 559 self._note("%s.join(): waiting until thread stops", self) 560 self.__block.acquire() 561 try: 562 if timeout is None: 563 while not self.__stopped: 564 self.__block.wait() 565 if __debug__: 566 self._note("%s.join(): thread stopped", self) 567 else: 568 deadline = _time() + timeout 569 while not self.__stopped: 570 delay = deadline - _time() 571 if delay <= 0: 572 if __debug__: 573 self._note("%s.join(): timed out", self) 574 break 575 self.__block.wait(delay) 576 else: 577 if __debug__: 578 self._note("%s.join(): thread stopped", self) 579 finally: 580 self.__block.release() 581 582 def getName(self): 583 assert self.__initialized, "Thread.__init__() not called" 584 return self.__name 585 586 def setName(self, name): 587 assert self.__initialized, "Thread.__init__() not called" 588 self.__name = str(name) 589 590 def isAlive(self): 591 assert self.__initialized, "Thread.__init__() not called" 592 return self.__started and not self.__stopped 593 594 def isDaemon(self): 595 assert self.__initialized, "Thread.__init__() not called" 596 return self.__daemonic 597 598 def setDaemon(self, daemonic): 599 if not self.__initialized: 600 raise RuntimeError("Thread.__init__() not called") 601 if self.__started: 602 raise RuntimeError("cannot set daemon status of active thread"); 603 self.__daemonic = daemonic 604 605 # The timer class was contributed by Itamar Shtull-Trauring 606 607 def Timer(*args, **kwargs): 608 return _Timer(*args, **kwargs) 609 610 class _Timer(Thread): 611 """Call a function after a specified number of seconds: 612 613 t = Timer(30.0, f, args=[], kwargs={}) 614 t.start() 615 t.cancel() # stop the timer's action if it's still waiting 616 """ 617 618 def __init__(self, interval, function, args=[], kwargs={}): 619 Thread.__init__(self) 620 self.interval = interval 621 self.function = function 622 self.args = args 623 self.kwargs = kwargs 624 self.finished = Event() 625 626 def cancel(self): 627 """Stop the timer if it hasn't finished yet""" 628 self.finished.set() 629 630 def run(self): 631 self.finished.wait(self.interval) 632 if not self.finished.isSet(): 633 self.function(*self.args, **self.kwargs) 634 self.finished.set() 635 636 # Special thread class to represent the main thread 637 # This is garbage collected through an exit handler 638 639 class _MainThread(Thread): 640 641 def __init__(self): 642 Thread.__init__(self, name="MainThread") 643 self._Thread__started = True 644 _active_limbo_lock.acquire() 645 _active[_get_ident()] = self 646 _active_limbo_lock.release() 647 648 def _set_daemon(self): 649 return False 650 651 def _exitfunc(self): 652 self._Thread__stop() 653 t = _pickSomeNonDaemonThread() 654 if t: 655 if __debug__: 656 self._note("%s: waiting for other threads", self) 657 while t: 658 t.join() 659 t = _pickSomeNonDaemonThread() 660 if __debug__: 661 self._note("%s: exiting", self) 662 self._Thread__delete() 663 664 def _pickSomeNonDaemonThread(): 665 for t in enumerate(): 666 if not t.isDaemon() and t.isAlive(): 667 return t 668 return None 669 670 671 # Dummy thread class to represent threads not started here. 672 # These aren't garbage collected when they die, nor can they be waited for. 673 # If they invoke anything in threading.py that calls currentThread(), they 674 # leave an entry in the _active dict forever after. 675 # Their purpose is to return *something* from currentThread(). 676 # They are marked as daemon threads so we won't wait for them 677 # when we exit (conform previous semantics). 678 679 class _DummyThread(Thread): 680 681 def __init__(self): 682 Thread.__init__(self, name=_newname("Dummy-%d")) 683 684 # Thread.__block consumes an OS-level locking primitive, which 685 # can never be used by a _DummyThread. Since a _DummyThread 686 # instance is immortal, that's bad, so release this resource. 687 del self._Thread__block 688 689 self._Thread__started = True 690 _active_limbo_lock.acquire() 691 _active[_get_ident()] = self 692 _active_limbo_lock.release() 693 694 def _set_daemon(self): 695 return True 696 697 def join(self, timeout=None): 698 assert False, "cannot join a dummy thread" 699 700 701 # Global API functions 702 703 def currentThread(): 704 try: 705 return _active[_get_ident()] 706 except KeyError: 707 ##print "currentThread(): no current thread for", _get_ident() 708 return _DummyThread() 709 710 def activeCount(): 711 _active_limbo_lock.acquire() 712 count = len(_active) + len(_limbo) 713 _active_limbo_lock.release() 714 return count 715 716 def enumerate(): 717 _active_limbo_lock.acquire() 718 active = _active.values() + _limbo.values() 719 _active_limbo_lock.release() 720 return active 721 722 from thread import stack_size 723 724 # Create the main thread object, 725 # and make it available for the interpreter 726 # (Py_Main) as threading._shutdown. 727 728 _shutdown = _MainThread()._exitfunc 729 730 # get thread-local implementation, either from the thread 731 # module, or from the python fallback 732 733 try: 734 from thread import _local as local 735 except ImportError: 736 from _threading_local import local 737 738 739 # Self-test code 740 741 def _test(): 742 743 class BoundedQueue: 744 745 def __init__(self, limit): 746 self.mon = RLock() 747 self.rc = Condition(self.mon) 748 self.wc = Condition(self.mon) 749 self.limit = limit 750 self.queue = deque() 751 752 def put(self, item): 753 self.mon.acquire() 754 while len(self.queue) >= self.limit: 755 self._note("put(%s): queue full", item) 756 self.wc.wait() 757 self.queue.append(item) 758 self._note("put(%s): appended, length now %d", 759 item, len(self.queue)) 760 self.rc.notify() 761 self.mon.release() 762 763 def get(self): 764 self.mon.acquire() 765 while not self.queue: 766 self._note("get(): queue empty") 767 self.rc.wait() 768 item = self.queue.popleft() 769 self._note("get(): got %s, %d left", item, len(self.queue)) 770 self.wc.notify() 771 self.mon.release() 772 return item 773 774 class ProducerThread(Thread): 775 776 def __init__(self, queue, quota): 777 Thread.__init__(self, name="Producer") 778 self.queue = queue 779 self.quota = quota 780 781 def run(self): 782 from random import random 783 counter = 0 784 while counter < self.quota: 785 counter = counter + 1 786 self.queue.put("%s.%d" % (self.getName(), counter)) 787 _sleep(random() * 0.00001) 788 789 790 class ConsumerThread(Thread): 791 792 def __init__(self, queue, count): 793 Thread.__init__(self, name="Consumer") 794 self.queue = queue 795 self.count = count 796 797 def run(self): 798 while self.count > 0: 799 item = self.queue.get() 800 print item 801 self.count = self.count - 1 802 803 NP = 3 804 QL = 4 805 NI = 5 806 807 Q = BoundedQueue(QL) 808 P = [] 809 for i in range(NP): 810 t = ProducerThread(Q, NI) 811 t.setName("Producer-%d" % (i+1)) 812 P.append(t) 813 C = ConsumerThread(Q, NI*NP) 814 for t in P: 815 t.start() 816 _sleep(0.000001) 817 C.start() 818 for t in P: 819 t.join() 820 C.join() 821 822 if __name__ == '__main__': 823 _test() 824 825 # vim: tabstop=4 expandtab shiftwidth=4