1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/lib/threading.py Tue Jun 12 01:40:10 2012 +0200
1.3 @@ -0,0 +1,825 @@
1.4 +#!/usr/bin/env python
1.5 +
1.6 +"""
1.7 +Thread module emulating a subset of Java's threading model.
1.8 +
1.9 +See docs/COPYING.txt and docs/LICENCE-python.txt for copyright and licensing
1.10 +information for Python standard library modules.
1.11 +"""
1.12 +
1.13 +import sys as _sys
1.14 +
1.15 +try:
1.16 + import thread
1.17 +except ImportError:
1.18 + del _sys.modules[__name__]
1.19 + raise
1.20 +
1.21 +from time import time as _time, sleep as _sleep
1.22 +from traceback import format_exc as _format_exc
1.23 +from collections import deque
1.24 +
1.25 +# Rename some stuff so "from threading import *" is safe
1.26 +__all__ = ['activeCount', 'Condition', 'currentThread', 'enumerate', 'Event',
1.27 + 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
1.28 + 'Timer', 'setprofile', 'settrace', 'local', 'stack_size']
1.29 +
1.30 +_start_new_thread = thread.start_new_thread
1.31 +_allocate_lock = thread.allocate_lock
1.32 +_get_ident = thread.get_ident
1.33 +ThreadError = thread.error
1.34 +del thread
1.35 +
1.36 +
1.37 +# Support for profile and trace hooks
1.38 +
1.39 +_profile_hook = None
1.40 +_trace_hook = None
1.41 +
1.42 +def setprofile(func):
1.43 + global _profile_hook
1.44 + _profile_hook = func
1.45 +
1.46 +def settrace(func):
1.47 + global _trace_hook
1.48 + _trace_hook = func
1.49 +
1.50 +# Synchronization classes
1.51 +
1.52 +Lock = _allocate_lock
1.53 +
1.54 +def RLock(*args, **kwargs):
1.55 + return _RLock(*args, **kwargs)
1.56 +
1.57 +class _RLock:
1.58 +
1.59 + def __init__(self):
1.60 + self.__block = _allocate_lock()
1.61 + self.__owner = None
1.62 + self.__count = 0
1.63 +
1.64 + def __repr__(self):
1.65 + owner = self.__owner
1.66 + return "<%s(%s, %d)>" % (
1.67 + self.__class__.__name__,
1.68 + owner and owner.getName(),
1.69 + self.__count)
1.70 +
1.71 + def acquire(self, blocking=1):
1.72 + me = currentThread()
1.73 + if self.__owner is me:
1.74 + self.__count = self.__count + 1
1.75 + if __debug__:
1.76 + self._note("%s.acquire(%s): recursive success", self, blocking)
1.77 + return 1
1.78 + rc = self.__block.acquire(blocking)
1.79 + if rc:
1.80 + self.__owner = me
1.81 + self.__count = 1
1.82 + if __debug__:
1.83 + self._note("%s.acquire(%s): initial success", self, blocking)
1.84 + else:
1.85 + if __debug__:
1.86 + self._note("%s.acquire(%s): failure", self, blocking)
1.87 + return rc
1.88 +
1.89 + __enter__ = acquire
1.90 +
1.91 + def release(self):
1.92 + if self.__owner is not currentThread():
1.93 + raise RuntimeError("cannot release un-aquired lock")
1.94 + self.__count = count = self.__count - 1
1.95 + if not count:
1.96 + self.__owner = None
1.97 + self.__block.release()
1.98 + if __debug__:
1.99 + self._note("%s.release(): final release", self)
1.100 + else:
1.101 + if __debug__:
1.102 + self._note("%s.release(): non-final release", self)
1.103 +
1.104 + def __exit__(self, t, v, tb):
1.105 + self.release()
1.106 +
1.107 + # Internal methods used by condition variables
1.108 +
1.109 + def _acquire_restore(self, (count, owner)):
1.110 + self.__block.acquire()
1.111 + self.__count = count
1.112 + self.__owner = owner
1.113 + if __debug__:
1.114 + self._note("%s._acquire_restore()", self)
1.115 +
1.116 + def _release_save(self):
1.117 + if __debug__:
1.118 + self._note("%s._release_save()", self)
1.119 + count = self.__count
1.120 + self.__count = 0
1.121 + owner = self.__owner
1.122 + self.__owner = None
1.123 + self.__block.release()
1.124 + return (count, owner)
1.125 +
1.126 + def _is_owned(self):
1.127 + return self.__owner is currentThread()
1.128 +
1.129 +
1.130 +def Condition(*args, **kwargs):
1.131 + return _Condition(*args, **kwargs)
1.132 +
1.133 +class _Condition:
1.134 +
1.135 + def __init__(self, lock=None):
1.136 + if lock is None:
1.137 + lock = RLock()
1.138 + self.__lock = lock
1.139 + # Export the lock's acquire() and release() methods
1.140 + self.acquire = lock.acquire
1.141 + self.release = lock.release
1.142 + # If the lock defines _release_save() and/or _acquire_restore(),
1.143 + # these override the default implementations (which just call
1.144 + # release() and acquire() on the lock). Ditto for _is_owned().
1.145 + try:
1.146 + self._release_save = lock._release_save
1.147 + except AttributeError:
1.148 + pass
1.149 + try:
1.150 + self._acquire_restore = lock._acquire_restore
1.151 + except AttributeError:
1.152 + pass
1.153 + try:
1.154 + self._is_owned = lock._is_owned
1.155 + except AttributeError:
1.156 + pass
1.157 + self.__waiters = []
1.158 +
1.159 + def __enter__(self):
1.160 + return self.__lock.__enter__()
1.161 +
1.162 + def __exit__(self, *args):
1.163 + return self.__lock.__exit__(*args)
1.164 +
1.165 + def __repr__(self):
1.166 + return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
1.167 +
1.168 + def _release_save(self):
1.169 + self.__lock.release() # No state to save
1.170 +
1.171 + def _acquire_restore(self, x):
1.172 + self.__lock.acquire() # Ignore saved state
1.173 +
1.174 + def _is_owned(self):
1.175 + # Return True if lock is owned by currentThread.
1.176 + # This method is called only if __lock doesn't have _is_owned().
1.177 + if self.__lock.acquire(0):
1.178 + self.__lock.release()
1.179 + return False
1.180 + else:
1.181 + return True
1.182 +
1.183 + def wait(self, timeout=None):
1.184 + if not self._is_owned():
1.185 + raise RuntimeError("cannot wait on un-aquired lock")
1.186 + waiter = _allocate_lock()
1.187 + waiter.acquire()
1.188 + self.__waiters.append(waiter)
1.189 + saved_state = self._release_save()
1.190 + try: # restore state no matter what (e.g., KeyboardInterrupt)
1.191 + if timeout is None:
1.192 + waiter.acquire()
1.193 + if __debug__:
1.194 + self._note("%s.wait(): got it", self)
1.195 + else:
1.196 + # Balancing act: We can't afford a pure busy loop, so we
1.197 + # have to sleep; but if we sleep the whole timeout time,
1.198 + # we'll be unresponsive. The scheme here sleeps very
1.199 + # little at first, longer as time goes on, but never longer
1.200 + # than 20 times per second (or the timeout time remaining).
1.201 + endtime = _time() + timeout
1.202 + delay = 0.0005 # 500 us -> initial delay of 1 ms
1.203 + while True:
1.204 + gotit = waiter.acquire(0)
1.205 + if gotit:
1.206 + break
1.207 + remaining = endtime - _time()
1.208 + if remaining <= 0:
1.209 + break
1.210 + delay = min(delay * 2, remaining, .05)
1.211 + _sleep(delay)
1.212 + if not gotit:
1.213 + if __debug__:
1.214 + self._note("%s.wait(%s): timed out", self, timeout)
1.215 + try:
1.216 + self.__waiters.remove(waiter)
1.217 + except ValueError:
1.218 + pass
1.219 + else:
1.220 + if __debug__:
1.221 + self._note("%s.wait(%s): got it", self, timeout)
1.222 + finally:
1.223 + self._acquire_restore(saved_state)
1.224 +
1.225 + def notify(self, n=1):
1.226 + if not self._is_owned():
1.227 + raise RuntimeError("cannot notify on un-aquired lock")
1.228 + __waiters = self.__waiters
1.229 + waiters = __waiters[:n]
1.230 + if not waiters:
1.231 + if __debug__:
1.232 + self._note("%s.notify(): no waiters", self)
1.233 + return
1.234 + self._note("%s.notify(): notifying %d waiter%s", self, n,
1.235 + n!=1 and "s" or "")
1.236 + for waiter in waiters:
1.237 + waiter.release()
1.238 + try:
1.239 + __waiters.remove(waiter)
1.240 + except ValueError:
1.241 + pass
1.242 +
1.243 + def notifyAll(self):
1.244 + self.notify(len(self.__waiters))
1.245 +
1.246 +
1.247 +def Semaphore(*args, **kwargs):
1.248 + return _Semaphore(*args, **kwargs)
1.249 +
1.250 +class _Semaphore:
1.251 +
1.252 + # After Tim Peters' semaphore class, but not quite the same (no maximum)
1.253 +
1.254 + def __init__(self, value=1):
1.255 + if value < 0:
1.256 + raise ValueError("semaphore initial value must be >= 0")
1.257 + self.__cond = Condition(Lock())
1.258 + self.__value = value
1.259 +
1.260 + def acquire(self, blocking=1):
1.261 + rc = False
1.262 + self.__cond.acquire()
1.263 + while self.__value == 0:
1.264 + if not blocking:
1.265 + break
1.266 + if __debug__:
1.267 + self._note("%s.acquire(%s): blocked waiting, value=%s",
1.268 + self, blocking, self.__value)
1.269 + self.__cond.wait()
1.270 + else:
1.271 + self.__value = self.__value - 1
1.272 + if __debug__:
1.273 + self._note("%s.acquire: success, value=%s",
1.274 + self, self.__value)
1.275 + rc = True
1.276 + self.__cond.release()
1.277 + return rc
1.278 +
1.279 + __enter__ = acquire
1.280 +
1.281 + def release(self):
1.282 + self.__cond.acquire()
1.283 + self.__value = self.__value + 1
1.284 + if __debug__:
1.285 + self._note("%s.release: success, value=%s",
1.286 + self, self.__value)
1.287 + self.__cond.notify()
1.288 + self.__cond.release()
1.289 +
1.290 + def __exit__(self, t, v, tb):
1.291 + self.release()
1.292 +
1.293 +
1.294 +def BoundedSemaphore(*args, **kwargs):
1.295 + return _BoundedSemaphore(*args, **kwargs)
1.296 +
1.297 +class _BoundedSemaphore(_Semaphore):
1.298 + """Semaphore that checks that # releases is <= # acquires"""
1.299 + def __init__(self, value=1):
1.300 + _Semaphore.__init__(self, value)
1.301 + self._initial_value = value
1.302 +
1.303 + def release(self):
1.304 + if self._Semaphore__value >= self._initial_value:
1.305 + raise ValueError, "Semaphore released too many times"
1.306 + return _Semaphore.release(self)
1.307 +
1.308 +
1.309 +def Event(*args, **kwargs):
1.310 + return _Event(*args, **kwargs)
1.311 +
1.312 +class _Event:
1.313 +
1.314 + # After Tim Peters' event class (without is_posted())
1.315 +
1.316 + def __init__(self):
1.317 + self.__cond = Condition(Lock())
1.318 + self.__flag = False
1.319 +
1.320 + def isSet(self):
1.321 + return self.__flag
1.322 +
1.323 + def set(self):
1.324 + self.__cond.acquire()
1.325 + try:
1.326 + self.__flag = True
1.327 + self.__cond.notifyAll()
1.328 + finally:
1.329 + self.__cond.release()
1.330 +
1.331 + def clear(self):
1.332 + self.__cond.acquire()
1.333 + try:
1.334 + self.__flag = False
1.335 + finally:
1.336 + self.__cond.release()
1.337 +
1.338 + def wait(self, timeout=None):
1.339 + self.__cond.acquire()
1.340 + try:
1.341 + if not self.__flag:
1.342 + self.__cond.wait(timeout)
1.343 + finally:
1.344 + self.__cond.release()
1.345 +
1.346 +# Helper to generate new thread names
1.347 +_counter = 0
1.348 +def _newname(template="Thread-%d"):
1.349 + global _counter
1.350 + _counter = _counter + 1
1.351 + return template % _counter
1.352 +
1.353 +# Active thread administration
1.354 +_active_limbo_lock = _allocate_lock()
1.355 +_active = {} # maps thread id to Thread object
1.356 +_limbo = {}
1.357 +
1.358 +
1.359 +# Main class for threads
1.360 +
1.361 +class Thread:
1.362 +
1.363 + __initialized = False
1.364 + # Need to store a reference to sys.exc_info for printing
1.365 + # out exceptions when a thread tries to use a global var. during interp.
1.366 + # shutdown and thus raises an exception about trying to perform some
1.367 + # operation on/with a NoneType
1.368 + __exc_info = _sys.exc_info
1.369 +
1.370 + def __init__(self, group=None, target=None, name=None,
1.371 + args=(), kwargs=None):
1.372 + assert group is None, "group argument must be None for now"
1.373 + if kwargs is None:
1.374 + kwargs = {}
1.375 + self.__target = target
1.376 + self.__name = str(name or _newname())
1.377 + self.__args = args
1.378 + self.__kwargs = kwargs
1.379 + self.__daemonic = self._set_daemon()
1.380 + self.__started = False
1.381 + self.__stopped = False
1.382 + self.__block = Condition(Lock())
1.383 + self.__initialized = True
1.384 + # sys.stderr is not stored in the class like
1.385 + # sys.exc_info since it can be changed between instances
1.386 + self.__stderr = _sys.stderr
1.387 +
1.388 + def _set_daemon(self):
1.389 + # Overridden in _MainThread and _DummyThread
1.390 + return currentThread().isDaemon()
1.391 +
1.392 + def __repr__(self):
1.393 + assert self.__initialized, "Thread.__init__() was not called"
1.394 + status = "initial"
1.395 + if self.__started:
1.396 + status = "started"
1.397 + if self.__stopped:
1.398 + status = "stopped"
1.399 + if self.__daemonic:
1.400 + status = status + " daemon"
1.401 + return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)
1.402 +
1.403 + def start(self):
1.404 + if not self.__initialized:
1.405 + raise RuntimeError("thread.__init__() not called")
1.406 + if self.__started:
1.407 + raise RuntimeError("thread already started")
1.408 + if __debug__:
1.409 + self._note("%s.start(): starting thread", self)
1.410 + _active_limbo_lock.acquire()
1.411 + _limbo[self] = self
1.412 + _active_limbo_lock.release()
1.413 + _start_new_thread(self.__bootstrap, ())
1.414 + self.__started = True
1.415 + _sleep(0.000001) # 1 usec, to let the thread run (Solaris hack)
1.416 +
1.417 + def run(self):
1.418 + if self.__target:
1.419 + self.__target(*self.__args, **self.__kwargs)
1.420 +
1.421 + def __bootstrap(self):
1.422 + # Wrapper around the real bootstrap code that ignores
1.423 + # exceptions during interpreter cleanup. Those typically
1.424 + # happen when a daemon thread wakes up at an unfortunate
1.425 + # moment, finds the world around it destroyed, and raises some
1.426 + # random exception *** while trying to report the exception in
1.427 + # __bootstrap_inner() below ***. Those random exceptions
1.428 + # don't help anybody, and they confuse users, so we suppress
1.429 + # them. We suppress them only when it appears that the world
1.430 + # indeed has already been destroyed, so that exceptions in
1.431 + # __bootstrap_inner() during normal business hours are properly
1.432 + # reported. Also, we only suppress them for daemonic threads;
1.433 + # if a non-daemonic encounters this, something else is wrong.
1.434 + try:
1.435 + self.__bootstrap_inner()
1.436 + except:
1.437 + if self.__daemonic and _sys is None:
1.438 + return
1.439 + raise
1.440 +
1.441 + def __bootstrap_inner(self):
1.442 + try:
1.443 + self.__started = True
1.444 + _active_limbo_lock.acquire()
1.445 + _active[_get_ident()] = self
1.446 + del _limbo[self]
1.447 + _active_limbo_lock.release()
1.448 + if __debug__:
1.449 + self._note("%s.__bootstrap(): thread started", self)
1.450 +
1.451 + if _trace_hook:
1.452 + self._note("%s.__bootstrap(): registering trace hook", self)
1.453 + _sys.settrace(_trace_hook)
1.454 + if _profile_hook:
1.455 + self._note("%s.__bootstrap(): registering profile hook", self)
1.456 + _sys.setprofile(_profile_hook)
1.457 +
1.458 + try:
1.459 + self.run()
1.460 + except SystemExit:
1.461 + if __debug__:
1.462 + self._note("%s.__bootstrap(): raised SystemExit", self)
1.463 + except:
1.464 + if __debug__:
1.465 + self._note("%s.__bootstrap(): unhandled exception", self)
1.466 + # If sys.stderr is no more (most likely from interpreter
1.467 + # shutdown) use self.__stderr. Otherwise still use sys (as in
1.468 + # _sys) in case sys.stderr was redefined since the creation of
1.469 + # self.
1.470 + if _sys:
1.471 + _sys.stderr.write("Exception in thread %s:\n%s\n" %
1.472 + (self.getName(), _format_exc()))
1.473 + else:
1.474 + # Do the best job possible w/o a huge amt. of code to
1.475 + # approximate a traceback (code ideas from
1.476 + # Lib/traceback.py)
1.477 + exc_type, exc_value, exc_tb = self.__exc_info()
1.478 + try:
1.479 + print>>self.__stderr, (
1.480 + "Exception in thread " + self.getName() +
1.481 + " (most likely raised during interpreter shutdown):")
1.482 + print>>self.__stderr, (
1.483 + "Traceback (most recent call last):")
1.484 + while exc_tb:
1.485 + print>>self.__stderr, (
1.486 + ' File "%s", line %s, in %s' %
1.487 + (exc_tb.tb_frame.f_code.co_filename,
1.488 + exc_tb.tb_lineno,
1.489 + exc_tb.tb_frame.f_code.co_name))
1.490 + exc_tb = exc_tb.tb_next
1.491 + print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
1.492 + # Make sure that exc_tb gets deleted since it is a memory
1.493 + # hog; deleting everything else is just for thoroughness
1.494 + finally:
1.495 + del exc_type, exc_value, exc_tb
1.496 + else:
1.497 + if __debug__:
1.498 + self._note("%s.__bootstrap(): normal return", self)
1.499 + finally:
1.500 + _active_limbo_lock.acquire()
1.501 + try:
1.502 + self.__stop()
1.503 + try:
1.504 + # We don't call self.__delete() because it also
1.505 + # grabs _active_limbo_lock.
1.506 + del _active[_get_ident()]
1.507 + except:
1.508 + pass
1.509 + finally:
1.510 + _active_limbo_lock.release()
1.511 +
1.512 + def __stop(self):
1.513 + self.__block.acquire()
1.514 + self.__stopped = True
1.515 + self.__block.notifyAll()
1.516 + self.__block.release()
1.517 +
1.518 + def __delete(self):
1.519 + "Remove current thread from the dict of currently running threads."
1.520 +
1.521 + # Notes about running with dummy_thread:
1.522 + #
1.523 + # Must take care to not raise an exception if dummy_thread is being
1.524 + # used (and thus this module is being used as an instance of
1.525 + # dummy_threading). dummy_thread.get_ident() always returns -1 since
1.526 + # there is only one thread if dummy_thread is being used. Thus
1.527 + # len(_active) is always <= 1 here, and any Thread instance created
1.528 + # overwrites the (if any) thread currently registered in _active.
1.529 + #
1.530 + # An instance of _MainThread is always created by 'threading'. This
1.531 + # gets overwritten the instant an instance of Thread is created; both
1.532 + # threads return -1 from dummy_thread.get_ident() and thus have the
1.533 + # same key in the dict. So when the _MainThread instance created by
1.534 + # 'threading' tries to clean itself up when atexit calls this method
1.535 + # it gets a KeyError if another Thread instance was created.
1.536 + #
1.537 + # This all means that KeyError from trying to delete something from
1.538 + # _active if dummy_threading is being used is a red herring. But
1.539 + # since it isn't if dummy_threading is *not* being used then don't
1.540 + # hide the exception.
1.541 +
1.542 + _active_limbo_lock.acquire()
1.543 + try:
1.544 + try:
1.545 + del _active[_get_ident()]
1.546 + except KeyError:
1.547 + if 'dummy_threading' not in _sys.modules:
1.548 + raise
1.549 + finally:
1.550 + _active_limbo_lock.release()
1.551 +
1.552 + def join(self, timeout=None):
1.553 + if not self.__initialized:
1.554 + raise RuntimeError("Thread.__init__() not called")
1.555 + if not self.__started:
1.556 + raise RuntimeError("cannot join thread before it is started")
1.557 + if self is currentThread():
1.558 + raise RuntimeError("cannot join current thread")
1.559 +
1.560 + if __debug__:
1.561 + if not self.__stopped:
1.562 + self._note("%s.join(): waiting until thread stops", self)
1.563 + self.__block.acquire()
1.564 + try:
1.565 + if timeout is None:
1.566 + while not self.__stopped:
1.567 + self.__block.wait()
1.568 + if __debug__:
1.569 + self._note("%s.join(): thread stopped", self)
1.570 + else:
1.571 + deadline = _time() + timeout
1.572 + while not self.__stopped:
1.573 + delay = deadline - _time()
1.574 + if delay <= 0:
1.575 + if __debug__:
1.576 + self._note("%s.join(): timed out", self)
1.577 + break
1.578 + self.__block.wait(delay)
1.579 + else:
1.580 + if __debug__:
1.581 + self._note("%s.join(): thread stopped", self)
1.582 + finally:
1.583 + self.__block.release()
1.584 +
1.585 + def getName(self):
1.586 + assert self.__initialized, "Thread.__init__() not called"
1.587 + return self.__name
1.588 +
1.589 + def setName(self, name):
1.590 + assert self.__initialized, "Thread.__init__() not called"
1.591 + self.__name = str(name)
1.592 +
1.593 + def isAlive(self):
1.594 + assert self.__initialized, "Thread.__init__() not called"
1.595 + return self.__started and not self.__stopped
1.596 +
1.597 + def isDaemon(self):
1.598 + assert self.__initialized, "Thread.__init__() not called"
1.599 + return self.__daemonic
1.600 +
1.601 + def setDaemon(self, daemonic):
1.602 + if not self.__initialized:
1.603 + raise RuntimeError("Thread.__init__() not called")
1.604 + if self.__started:
1.605 + raise RuntimeError("cannot set daemon status of active thread");
1.606 + self.__daemonic = daemonic
1.607 +
1.608 +# The timer class was contributed by Itamar Shtull-Trauring
1.609 +
1.610 +def Timer(*args, **kwargs):
1.611 + return _Timer(*args, **kwargs)
1.612 +
1.613 +class _Timer(Thread):
1.614 + """Call a function after a specified number of seconds:
1.615 +
1.616 + t = Timer(30.0, f, args=[], kwargs={})
1.617 + t.start()
1.618 + t.cancel() # stop the timer's action if it's still waiting
1.619 + """
1.620 +
1.621 + def __init__(self, interval, function, args=[], kwargs={}):
1.622 + Thread.__init__(self)
1.623 + self.interval = interval
1.624 + self.function = function
1.625 + self.args = args
1.626 + self.kwargs = kwargs
1.627 + self.finished = Event()
1.628 +
1.629 + def cancel(self):
1.630 + """Stop the timer if it hasn't finished yet"""
1.631 + self.finished.set()
1.632 +
1.633 + def run(self):
1.634 + self.finished.wait(self.interval)
1.635 + if not self.finished.isSet():
1.636 + self.function(*self.args, **self.kwargs)
1.637 + self.finished.set()
1.638 +
1.639 +# Special thread class to represent the main thread
1.640 +# This is garbage collected through an exit handler
1.641 +
1.642 +class _MainThread(Thread):
1.643 +
1.644 + def __init__(self):
1.645 + Thread.__init__(self, name="MainThread")
1.646 + self._Thread__started = True
1.647 + _active_limbo_lock.acquire()
1.648 + _active[_get_ident()] = self
1.649 + _active_limbo_lock.release()
1.650 +
1.651 + def _set_daemon(self):
1.652 + return False
1.653 +
1.654 + def _exitfunc(self):
1.655 + self._Thread__stop()
1.656 + t = _pickSomeNonDaemonThread()
1.657 + if t:
1.658 + if __debug__:
1.659 + self._note("%s: waiting for other threads", self)
1.660 + while t:
1.661 + t.join()
1.662 + t = _pickSomeNonDaemonThread()
1.663 + if __debug__:
1.664 + self._note("%s: exiting", self)
1.665 + self._Thread__delete()
1.666 +
1.667 +def _pickSomeNonDaemonThread():
1.668 + for t in enumerate():
1.669 + if not t.isDaemon() and t.isAlive():
1.670 + return t
1.671 + return None
1.672 +
1.673 +
1.674 +# Dummy thread class to represent threads not started here.
1.675 +# These aren't garbage collected when they die, nor can they be waited for.
1.676 +# If they invoke anything in threading.py that calls currentThread(), they
1.677 +# leave an entry in the _active dict forever after.
1.678 +# Their purpose is to return *something* from currentThread().
1.679 +# They are marked as daemon threads so we won't wait for them
1.680 +# when we exit (conform previous semantics).
1.681 +
1.682 +class _DummyThread(Thread):
1.683 +
1.684 + def __init__(self):
1.685 + Thread.__init__(self, name=_newname("Dummy-%d"))
1.686 +
1.687 + # Thread.__block consumes an OS-level locking primitive, which
1.688 + # can never be used by a _DummyThread. Since a _DummyThread
1.689 + # instance is immortal, that's bad, so release this resource.
1.690 + del self._Thread__block
1.691 +
1.692 + self._Thread__started = True
1.693 + _active_limbo_lock.acquire()
1.694 + _active[_get_ident()] = self
1.695 + _active_limbo_lock.release()
1.696 +
1.697 + def _set_daemon(self):
1.698 + return True
1.699 +
1.700 + def join(self, timeout=None):
1.701 + assert False, "cannot join a dummy thread"
1.702 +
1.703 +
1.704 +# Global API functions
1.705 +
1.706 +def currentThread():
1.707 + try:
1.708 + return _active[_get_ident()]
1.709 + except KeyError:
1.710 + ##print "currentThread(): no current thread for", _get_ident()
1.711 + return _DummyThread()
1.712 +
1.713 +def activeCount():
1.714 + _active_limbo_lock.acquire()
1.715 + count = len(_active) + len(_limbo)
1.716 + _active_limbo_lock.release()
1.717 + return count
1.718 +
1.719 +def enumerate():
1.720 + _active_limbo_lock.acquire()
1.721 + active = _active.values() + _limbo.values()
1.722 + _active_limbo_lock.release()
1.723 + return active
1.724 +
1.725 +from thread import stack_size
1.726 +
1.727 +# Create the main thread object,
1.728 +# and make it available for the interpreter
1.729 +# (Py_Main) as threading._shutdown.
1.730 +
1.731 +_shutdown = _MainThread()._exitfunc
1.732 +
1.733 +# get thread-local implementation, either from the thread
1.734 +# module, or from the python fallback
1.735 +
1.736 +try:
1.737 + from thread import _local as local
1.738 +except ImportError:
1.739 + from _threading_local import local
1.740 +
1.741 +
1.742 +# Self-test code
1.743 +
1.744 +def _test():
1.745 +
1.746 + class BoundedQueue:
1.747 +
1.748 + def __init__(self, limit):
1.749 + self.mon = RLock()
1.750 + self.rc = Condition(self.mon)
1.751 + self.wc = Condition(self.mon)
1.752 + self.limit = limit
1.753 + self.queue = deque()
1.754 +
1.755 + def put(self, item):
1.756 + self.mon.acquire()
1.757 + while len(self.queue) >= self.limit:
1.758 + self._note("put(%s): queue full", item)
1.759 + self.wc.wait()
1.760 + self.queue.append(item)
1.761 + self._note("put(%s): appended, length now %d",
1.762 + item, len(self.queue))
1.763 + self.rc.notify()
1.764 + self.mon.release()
1.765 +
1.766 + def get(self):
1.767 + self.mon.acquire()
1.768 + while not self.queue:
1.769 + self._note("get(): queue empty")
1.770 + self.rc.wait()
1.771 + item = self.queue.popleft()
1.772 + self._note("get(): got %s, %d left", item, len(self.queue))
1.773 + self.wc.notify()
1.774 + self.mon.release()
1.775 + return item
1.776 +
1.777 + class ProducerThread(Thread):
1.778 +
1.779 + def __init__(self, queue, quota):
1.780 + Thread.__init__(self, name="Producer")
1.781 + self.queue = queue
1.782 + self.quota = quota
1.783 +
1.784 + def run(self):
1.785 + from random import random
1.786 + counter = 0
1.787 + while counter < self.quota:
1.788 + counter = counter + 1
1.789 + self.queue.put("%s.%d" % (self.getName(), counter))
1.790 + _sleep(random() * 0.00001)
1.791 +
1.792 +
1.793 + class ConsumerThread(Thread):
1.794 +
1.795 + def __init__(self, queue, count):
1.796 + Thread.__init__(self, name="Consumer")
1.797 + self.queue = queue
1.798 + self.count = count
1.799 +
1.800 + def run(self):
1.801 + while self.count > 0:
1.802 + item = self.queue.get()
1.803 + print item
1.804 + self.count = self.count - 1
1.805 +
1.806 + NP = 3
1.807 + QL = 4
1.808 + NI = 5
1.809 +
1.810 + Q = BoundedQueue(QL)
1.811 + P = []
1.812 + for i in range(NP):
1.813 + t = ProducerThread(Q, NI)
1.814 + t.setName("Producer-%d" % (i+1))
1.815 + P.append(t)
1.816 + C = ConsumerThread(Q, NI*NP)
1.817 + for t in P:
1.818 + t.start()
1.819 + _sleep(0.000001)
1.820 + C.start()
1.821 + for t in P:
1.822 + t.join()
1.823 + C.join()
1.824 +
1.825 +if __name__ == '__main__':
1.826 + _test()
1.827 +
1.828 +# vim: tabstop=4 expandtab shiftwidth=4