paul@543 | 1 | #!/usr/bin/env python |
paul@543 | 2 | |
paul@543 | 3 | """ |
paul@543 | 4 | Thread module emulating a subset of Java's threading model. |
paul@543 | 5 | |
paul@543 | 6 | See docs/COPYING.txt and docs/LICENCE-python.txt for copyright and licensing |
paul@543 | 7 | information for Python standard library modules. |
paul@543 | 8 | """ |
paul@543 | 9 | |
paul@543 | 10 | import sys as _sys |
paul@543 | 11 | |
paul@543 | 12 | try: |
paul@543 | 13 | import thread |
paul@543 | 14 | except ImportError: |
paul@543 | 15 | del _sys.modules[__name__] |
paul@543 | 16 | raise |
paul@543 | 17 | |
paul@543 | 18 | from time import time as _time, sleep as _sleep |
paul@543 | 19 | from traceback import format_exc as _format_exc |
paul@543 | 20 | from collections import deque |
paul@543 | 21 | |
paul@543 | 22 | # Rename some stuff so "from threading import *" is safe |
paul@543 | 23 | __all__ = ['activeCount', 'Condition', 'currentThread', 'enumerate', 'Event', |
paul@543 | 24 | 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', |
paul@543 | 25 | 'Timer', 'setprofile', 'settrace', 'local', 'stack_size'] |
paul@543 | 26 | |
paul@543 | 27 | _start_new_thread = thread.start_new_thread |
paul@543 | 28 | _allocate_lock = thread.allocate_lock |
paul@543 | 29 | _get_ident = thread.get_ident |
paul@543 | 30 | ThreadError = thread.error |
paul@543 | 31 | del thread |
paul@543 | 32 | |
paul@543 | 33 | |
paul@543 | 34 | # Support for profile and trace hooks |
paul@543 | 35 | |
paul@543 | 36 | _profile_hook = None |
paul@543 | 37 | _trace_hook = None |
paul@543 | 38 | |
def setprofile(func):
    """Remember func as the profile hook for threads started by this module.

    The value is stored in the module-level _profile_hook; a thread started
    through Thread.start() passes it to sys.setprofile() before run() is
    called, provided the hook is true.
    """
    global _profile_hook
    _profile_hook = func
paul@543 | 42 | |
def settrace(func):
    """Remember func as the trace hook for threads started by this module.

    The value is stored in the module-level _trace_hook; a thread started
    through Thread.start() passes it to sys.settrace() before run() is
    called, provided the hook is true.
    """
    global _trace_hook
    _trace_hook = func
paul@543 | 46 | |
paul@543 | 47 | # Synchronization classes |
paul@543 | 48 | |
paul@543 | 49 | Lock = _allocate_lock |
paul@543 | 50 | |
def RLock(*args, **kwargs):
    """Factory function: build and return a new _RLock (reentrant lock)."""
    lock = _RLock(*args, **kwargs)
    return lock
paul@543 | 53 | |
class _RLock:
    """Reentrant lock built on a primitive lock.

    The thread that owns the lock may acquire it again without blocking; a
    counter records the nesting depth and the underlying primitive lock is
    only released when release() has been called once per acquire().
    """

    def __init__(self):
        self.__block = _allocate_lock()     # the underlying primitive lock
        self.__owner = None                 # Thread object holding the lock
        self.__count = 0                    # nesting depth of the owner

    def __repr__(self):
        owner = self.__owner
        return "<%s(%s, %d)>" % (
            self.__class__.__name__,
            owner and owner.getName(),
            self.__count)

    def acquire(self, blocking=1):
        """Acquire the lock, returning a true value on success.

        A nested acquire by the owning thread always succeeds and returns 1.
        Otherwise the result of acquiring the underlying lock with the given
        ``blocking`` argument is returned.
        """
        me = currentThread()
        if self.__owner is me:
            self.__count = self.__count + 1
            return 1
        rc = self.__block.acquire(blocking)
        if rc:
            self.__owner = me
            self.__count = 1
        return rc

    __enter__ = acquire

    def release(self):
        """Undo one acquire(); free the lock when the nesting depth hits zero.

        Raises RuntimeError when the calling thread does not own the lock.
        """
        if self.__owner is not currentThread():
            raise RuntimeError("cannot release un-aquired lock")
        self.__count = count = self.__count - 1
        if not count:
            self.__owner = None
            self.__block.release()

    def __exit__(self, t, v, tb):
        self.release()

    # Internal methods used by condition variables

    def _acquire_restore(self, state):
        """Re-acquire the lock and reinstate a (count, owner) pair.

        ``state`` is the tuple previously returned by _release_save().
        """
        # Unpack before blocking so a malformed state fails without leaving
        # the underlying lock held.
        count, owner = state
        self.__block.acquire()
        self.__count = count
        self.__owner = owner

    def _release_save(self):
        """Release the lock completely and return its (count, owner) state."""
        count = self.__count
        self.__count = 0
        owner = self.__owner
        self.__owner = None
        self.__block.release()
        return (count, owner)

    def _is_owned(self):
        """Return True when the calling thread owns the lock."""
        return self.__owner is currentThread()
paul@543 | 125 | |
paul@543 | 126 | |
def Condition(*args, **kwargs):
    """Factory function: build and return a new _Condition variable."""
    condition = _Condition(*args, **kwargs)
    return condition
paul@543 | 129 | |
class _Condition:
    """Condition variable associated with a lock.

    Threads call wait() while holding the lock to sleep until another thread
    holding the same lock calls notify() or notifyAll().  When no lock is
    supplied a new RLock is created.
    """

    def __init__(self, lock=None):
        if lock is None:
            lock = RLock()
        self.__lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        # If the lock defines _release_save() and/or _acquire_restore(),
        # these override the default implementations (which just call
        # release() and acquire() on the lock).  Ditto for _is_owned().
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        # One locked primitive lock per thread currently blocked in wait()
        self.__waiters = []

    def __enter__(self):
        return self.__lock.__enter__()

    def __exit__(self, *args):
        return self.__lock.__exit__(*args)

    def __repr__(self):
        return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))

    def _release_save(self):
        self.__lock.release()           # No state to save

    def _acquire_restore(self, x):
        self.__lock.acquire()           # Ignore saved state

    def _is_owned(self):
        # Return True if lock is owned by currentThread.
        # This method is called only if __lock doesn't have _is_owned().
        if self.__lock.acquire(0):
            self.__lock.release()
            return False
        else:
            return True

    def wait(self, timeout=None):
        """Release the lock and block until notified or until timeout expires.

        ``timeout`` is a number of seconds, or None to wait indefinitely.
        The lock is re-acquired before returning, whatever happens.
        Raises RuntimeError when the lock is not held on entry.
        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-aquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self.__waiters.append(waiter)
        saved_state = self._release_save()
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                # notify() releases the waiter lock, which unblocks us
                waiter.acquire()
            else:
                # Balancing act:  We can't afford a pure busy loop, so we
                # have to sleep; but if we sleep the whole timeout time,
                # we'll be unresponsive.  The scheme here sleeps very
                # little at first, longer as time goes on, but never longer
                # than 0.05 seconds (or the timeout time remaining).
                endtime = _time() + timeout
                delay = 0.0005  # 500 us -> initial delay of 1 ms
                while True:
                    gotit = waiter.acquire(0)
                    if gotit:
                        break
                    remaining = endtime - _time()
                    if remaining <= 0:
                        break
                    delay = min(delay * 2, remaining, .05)
                    _sleep(delay)
                if not gotit:
                    # Timed out: withdraw from the waiter list unless a
                    # notify() has removed us in the meantime.
                    try:
                        self.__waiters.remove(waiter)
                    except ValueError:
                        pass
        finally:
            self._acquire_restore(saved_state)

    def notify(self, n=1):
        """Wake up at most n threads blocked in wait().

        Raises RuntimeError when the lock is not held by the caller.
        """
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-aquired lock")
        pending = self.__waiters
        # Iterate over a copy of the first n entries because the list is
        # modified while waking the waiters.
        for waiter in pending[:n]:
            waiter.release()
            try:
                pending.remove(waiter)
            except ValueError:
                pass

    def notifyAll(self):
        """Wake up every thread currently blocked in wait()."""
        self.notify(len(self.__waiters))
paul@543 | 242 | |
paul@543 | 243 | |
def Semaphore(*args, **kwargs):
    """Factory function: build and return a new _Semaphore."""
    semaphore = _Semaphore(*args, **kwargs)
    return semaphore
paul@543 | 246 | |
class _Semaphore:
    """Counting semaphore.

    acquire() decrements the counter, blocking while it is zero; release()
    increments it and wakes one waiting thread.
    """

    # After Tim Peters' semaphore class, but not quite the same (no maximum)

    def __init__(self, value=1):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self.__cond = Condition(Lock())
        self.__value = value

    def acquire(self, blocking=1):
        """Decrement the counter and return True.

        While the counter is zero the call blocks, unless ``blocking`` is
        false, in which case False is returned immediately.
        """
        rc = False
        self.__cond.acquire()
        try:
            while self.__value == 0:
                if not blocking:
                    break
                self.__cond.wait()
            else:
                # Only reached when the loop ended without "break", i.e. the
                # counter is non-zero.
                self.__value = self.__value - 1
                rc = True
        finally:
            # Release even if wait() is interrupted, so the semaphore stays
            # usable by other threads.
            self.__cond.release()
        return rc

    __enter__ = acquire

    def release(self):
        """Increment the counter and wake up one waiting thread."""
        self.__cond.acquire()
        try:
            self.__value = self.__value + 1
            self.__cond.notify()
        finally:
            self.__cond.release()

    def __exit__(self, t, v, tb):
        self.release()
paul@543 | 289 | |
paul@543 | 290 | |
def BoundedSemaphore(*args, **kwargs):
    """Factory function: build and return a new _BoundedSemaphore."""
    semaphore = _BoundedSemaphore(*args, **kwargs)
    return semaphore
paul@543 | 293 | |
class _BoundedSemaphore(_Semaphore):
    """Semaphore that checks that # releases is <= # acquires"""

    def __init__(self, value=1):
        _Semaphore.__init__(self, value)
        # Upper bound that the counter must never exceed
        self._initial_value = value

    def release(self):
        """Release the semaphore.

        Raises ValueError when the counter is already at its initial value,
        i.e. when there is no matching acquire().
        """
        if self._Semaphore__value >= self._initial_value:
            raise ValueError("Semaphore released too many times")
        return _Semaphore.release(self)
paul@543 | 304 | |
paul@543 | 305 | |
def Event(*args, **kwargs):
    """Factory function: build and return a new _Event."""
    event = _Event(*args, **kwargs)
    return event
paul@543 | 308 | |
class _Event:
    """Boolean flag that threads can wait on.

    set() makes the flag true and wakes all waiting threads, clear() resets
    it, and wait() blocks while the flag is false.
    """

    # After Tim Peters' event class (without is_posted())

    def __init__(self):
        self.__cond = Condition(Lock())
        self.__flag = False

    def isSet(self):
        """Return the current value of the flag."""
        return self.__flag

    def set(self):
        """Set the flag to true and wake up every waiting thread."""
        cond = self.__cond
        cond.acquire()
        try:
            self.__flag = True
            cond.notifyAll()
        finally:
            cond.release()

    def clear(self):
        """Reset the flag to false."""
        cond = self.__cond
        cond.acquire()
        try:
            self.__flag = False
        finally:
            cond.release()

    def wait(self, timeout=None):
        """Block until the flag is true or ``timeout`` seconds have passed.

        Returns immediately when the flag is already true.  Always returns
        None; use isSet() afterwards to find out whether the flag was set.
        """
        cond = self.__cond
        cond.acquire()
        try:
            if self.__flag:
                return
            cond.wait(timeout)
        finally:
            cond.release()
paul@543 | 342 | |
paul@543 | 343 | # Helper to generate new thread names |
_counter = 0

def _newname(template="Thread-%d"):
    """Return ``template`` filled in with the next value of a global counter.

    The counter is shared by all templates and is incremented on every call.
    """
    global _counter
    _counter += 1
    return template % _counter
paul@543 | 349 | |
paul@543 | 350 | # Active thread administration |
paul@543 | 351 | _active_limbo_lock = _allocate_lock() |
paul@543 | 352 | _active = {} # maps thread id to Thread object |
paul@543 | 353 | _limbo = {} |
paul@543 | 354 | |
paul@543 | 355 | |
paul@543 | 356 | # Main class for threads |
paul@543 | 357 | |
class Thread:
    """A thread of control.

    The activity is either the ``target`` callable given to the constructor
    or an overridden run() method.  start() runs the activity in a new
    low-level thread and join() waits for it to terminate.
    """

    __initialized = False
    # Need to store a reference to sys.exc_info for printing
    # out exceptions when a thread tries to use a global var. during interp.
    # shutdown and thus raises an exception about trying to perform some
    # operation on/with a NoneType
    __exc_info = _sys.exc_info

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None):
        """Prepare a thread; nothing runs until start() is called.

        group  -- must be None
        target -- callable invoked by run(), or None
        name   -- thread name; a "Thread-N" name is generated when omitted
        args   -- positional arguments for target
        kwargs -- keyword arguments for target (a new dict when None)
        """
        assert group is None, "group argument must be None for now"
        if kwargs is None:
            kwargs = {}
        self.__target = target
        self.__name = str(name or _newname())
        self.__args = args
        self.__kwargs = kwargs
        self.__daemonic = self._set_daemon()
        self.__started = False
        self.__stopped = False
        # Condition used by join() to wait for __stopped to become true
        self.__block = Condition(Lock())
        self.__initialized = True
        # sys.stderr is not stored in the class like
        # sys.exc_info since it can be changed between instances
        self.__stderr = _sys.stderr

    def _set_daemon(self):
        # Overridden in _MainThread and _DummyThread
        return currentThread().isDaemon()

    def __repr__(self):
        assert self.__initialized, "Thread.__init__() was not called"
        status = "initial"
        if self.__started:
            status = "started"
        if self.__stopped:
            status = "stopped"
        if self.__daemonic:
            status = status + " daemon"
        return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)

    def start(self):
        """Start the thread's activity in a new low-level thread.

        Raises RuntimeError when __init__() was not called or the thread has
        already been started.  An error raised while creating the low-level
        thread is propagated after the thread is removed from limbo again.
        """
        if not self.__initialized:
            raise RuntimeError("thread.__init__() not called")
        if self.__started:
            raise RuntimeError("thread already started")
        _active_limbo_lock.acquire()
        try:
            _limbo[self] = self
        finally:
            _active_limbo_lock.release()
        try:
            _start_new_thread(self.__bootstrap, ())
        except Exception:
            # The bootstrap code never runs, so nothing else would remove
            # the entry and the thread would be counted as alive forever.
            _active_limbo_lock.acquire()
            try:
                del _limbo[self]
            finally:
                _active_limbo_lock.release()
            raise
        self.__started = True
        _sleep(0.000001)    # 1 usec, to let the thread run (Solaris hack)

    def run(self):
        """Invoke the target callable, if any, with the stored arguments."""
        if self.__target:
            self.__target(*self.__args, **self.__kwargs)

    def __bootstrap(self):
        # Wrapper around the real bootstrap code that ignores
        # exceptions during interpreter cleanup.  Those typically
        # happen when a daemon thread wakes up at an unfortunate
        # moment, finds the world around it destroyed, and raises some
        # random exception *** while trying to report the exception in
        # __bootstrap_inner() below ***.  Those random exceptions
        # don't help anybody, and they confuse users, so we suppress
        # them.  We suppress them only when it appears that the world
        # indeed has already been destroyed, so that exceptions in
        # __bootstrap_inner() during normal business hours are properly
        # reported.  Also, we only suppress them for daemonic threads;
        # if a non-daemonic encounters this, something else is wrong.
        try:
            self.__bootstrap_inner()
        except:
            if self.__daemonic and _sys is None:
                return
            raise

    def __bootstrap_inner(self):
        # Runs in the new thread: register it as active, install the
        # trace/profile hooks, call run(), report any unhandled exception and
        # finally mark the thread as stopped.
        try:
            self.__started = True
            _active_limbo_lock.acquire()
            try:
                _active[_get_ident()] = self
                del _limbo[self]
            finally:
                # Without this, a failure above would leave the lock held
                # and the cleanup code below would block forever.
                _active_limbo_lock.release()

            if _trace_hook:
                _sys.settrace(_trace_hook)
            if _profile_hook:
                _sys.setprofile(_profile_hook)

            try:
                self.run()
            except SystemExit:
                pass
            except:
                # If sys.stderr is no more (most likely from interpreter
                # shutdown) use self.__stderr.  Otherwise still use sys (as in
                # _sys) in case sys.stderr was redefined since the creation of
                # self.
                if _sys:
                    _sys.stderr.write("Exception in thread %s:\n%s\n" %
                                      (self.getName(), _format_exc()))
                else:
                    # Do the best job possible w/o a huge amt. of code to
                    # approximate a traceback (code ideas from
                    # Lib/traceback.py)
                    exc_type, exc_value, exc_tb = self.__exc_info()
                    try:
                        write = self.__stderr.write
                        write(
                            "Exception in thread " + self.getName() +
                            " (most likely raised during interpreter shutdown):"
                            + "\n")
                        write("Traceback (most recent call last):" + "\n")
                        while exc_tb:
                            write(
                                '  File "%s", line %s, in %s' %
                                (exc_tb.tb_frame.f_code.co_filename,
                                 exc_tb.tb_lineno,
                                 exc_tb.tb_frame.f_code.co_name) + "\n")
                            exc_tb = exc_tb.tb_next
                        write(("%s: %s" % (exc_type, exc_value)) + "\n")
                    # Make sure that exc_tb gets deleted since it is a memory
                    # hog; deleting everything else is just for thoroughness
                    finally:
                        del exc_type, exc_value, exc_tb
        finally:
            _active_limbo_lock.acquire()
            try:
                self.__stop()
                try:
                    # We don't call self.__delete() because it also
                    # grabs _active_limbo_lock.
                    del _active[_get_ident()]
                except:
                    # Deliberately best-effort: this may run while the
                    # interpreter is being torn down.
                    pass
            finally:
                _active_limbo_lock.release()

    def __stop(self):
        # Mark the thread as stopped and wake every thread blocked in join()
        self.__block.acquire()
        try:
            self.__stopped = True
            self.__block.notifyAll()
        finally:
            self.__block.release()

    def __delete(self):
        "Remove current thread from the dict of currently running threads."

        # Notes about running with dummy_thread:
        #
        # Must take care to not raise an exception if dummy_thread is being
        # used (and thus this module is being used as an instance of
        # dummy_threading).  dummy_thread.get_ident() always returns -1 since
        # there is only one thread if dummy_thread is being used.  Thus
        # len(_active) is always <= 1 here, and any Thread instance created
        # overwrites the (if any) thread currently registered in _active.
        #
        # An instance of _MainThread is always created by 'threading'.  This
        # gets overwritten the instant an instance of Thread is created; both
        # threads return -1 from dummy_thread.get_ident() and thus have the
        # same key in the dict.  So when the _MainThread instance created by
        # 'threading' tries to clean itself up when atexit calls this method
        # it gets a KeyError if another Thread instance was created.
        #
        # This all means that KeyError from trying to delete something from
        # _active if dummy_threading is being used is a red herring.  But
        # since it isn't if dummy_threading is *not* being used then don't
        # hide the exception.

        _active_limbo_lock.acquire()
        try:
            try:
                del _active[_get_ident()]
            except KeyError:
                if 'dummy_threading' not in _sys.modules:
                    raise
        finally:
            _active_limbo_lock.release()

    def join(self, timeout=None):
        """Wait until the thread stops or ``timeout`` seconds have passed.

        With timeout None the call blocks until the thread has stopped.
        Always returns None; use isAlive() to detect a timeout.  Raises
        RuntimeError when __init__() was not called, the thread has not been
        started, or a thread attempts to join itself.
        """
        if not self.__initialized:
            raise RuntimeError("Thread.__init__() not called")
        if not self.__started:
            raise RuntimeError("cannot join thread before it is started")
        if self is currentThread():
            raise RuntimeError("cannot join current thread")

        self.__block.acquire()
        try:
            if timeout is None:
                while not self.__stopped:
                    self.__block.wait()
            else:
                deadline = _time() + timeout
                while not self.__stopped:
                    delay = deadline - _time()
                    if delay <= 0:
                        break
                    self.__block.wait(delay)
        finally:
            self.__block.release()

    def getName(self):
        """Return the thread's name."""
        assert self.__initialized, "Thread.__init__() not called"
        return self.__name

    def setName(self, name):
        """Set the thread's name to str(name)."""
        assert self.__initialized, "Thread.__init__() not called"
        self.__name = str(name)

    def isAlive(self):
        """Return a true value while the thread is started and not stopped."""
        assert self.__initialized, "Thread.__init__() not called"
        return self.__started and not self.__stopped

    def isDaemon(self):
        """Return the thread's daemon flag."""
        assert self.__initialized, "Thread.__init__() not called"
        return self.__daemonic

    def setDaemon(self, daemonic):
        """Set the daemon flag; only allowed before the thread is started.

        Raises RuntimeError when __init__() was not called or the thread has
        already been started.
        """
        if not self.__initialized:
            raise RuntimeError("Thread.__init__() not called")
        if self.__started:
            raise RuntimeError("cannot set daemon status of active thread")
        self.__daemonic = daemonic
paul@543 | 604 | |
paul@543 | 605 | # The timer class was contributed by Itamar Shtull-Trauring |
paul@543 | 606 | |
def Timer(*args, **kwargs):
    """Factory function: build and return a new _Timer thread."""
    timer = _Timer(*args, **kwargs)
    return timer
paul@543 | 609 | |
class _Timer(Thread):
    """Call a function after a specified number of seconds:

    t = Timer(30.0, f, args=[], kwargs={})
    t.start()
    t.cancel() # stop the timer's action if it's still waiting
    """

    def __init__(self, interval, function, args=None, kwargs=None):
        """Set up the timer.

        interval -- number of seconds to wait before calling function
        function -- the callable to invoke
        args     -- positional arguments for function (default: new list)
        kwargs   -- keyword arguments for function (default: new dict)
        """
        Thread.__init__(self)
        # Create fresh containers per instance; default values written in
        # the signature would be shared by every timer.
        if args is None:
            args = []
        if kwargs is None:
            kwargs = {}
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet"""
        self.finished.set()

    def run(self):
        """Wait for the interval, then call the function unless cancelled."""
        self.finished.wait(self.interval)
        if not self.finished.isSet():
            self.function(*self.args, **self.kwargs)
        self.finished.set()
paul@543 | 635 | |
paul@543 | 636 | # Special thread class to represent the main thread |
paul@543 | 637 | # This is garbage collected through an exit handler |
paul@543 | 638 | |
class _MainThread(Thread):
    """Thread object representing the thread that creates it.

    The instance marks itself as started and registers itself in _active
    under the creating thread's identifier.
    """

    def __init__(self):
        Thread.__init__(self, name="MainThread")
        self._Thread__started = True
        _active_limbo_lock.acquire()
        try:
            _active[_get_ident()] = self
        finally:
            _active_limbo_lock.release()

    def _set_daemon(self):
        return False

    def _exitfunc(self):
        """Mark this thread as stopped and wait for the remaining threads.

        Joins alive non-daemon threads one at a time until none is left,
        then removes the calling thread's entry from _active.
        """
        self._Thread__stop()
        t = _pickSomeNonDaemonThread()
        while t:
            t.join()
            t = _pickSomeNonDaemonThread()
        self._Thread__delete()
paul@543 | 663 | |
def _pickSomeNonDaemonThread():
    """Return an alive, non-daemon thread from enumerate(), or None."""
    for candidate in enumerate():
        if candidate.isDaemon():
            continue
        if candidate.isAlive():
            return candidate
    return None
paul@543 | 669 | |
paul@543 | 670 | |
paul@543 | 671 | # Dummy thread class to represent threads not started here. |
paul@543 | 672 | # These aren't garbage collected when they die, nor can they be waited for. |
paul@543 | 673 | # If they invoke anything in threading.py that calls currentThread(), they |
paul@543 | 674 | # leave an entry in the _active dict forever after. |
paul@543 | 675 | # Their purpose is to return *something* from currentThread(). |
paul@543 | 676 | # They are marked as daemon threads so we won't wait for them |
paul@543 | 677 | # when we exit (conform previous semantics). |
paul@543 | 678 | |
class _DummyThread(Thread):
    """Stand-in Thread object created by currentThread() for unknown threads.

    Instances are named "Dummy-N", report themselves as daemon threads, are
    marked as started on creation and cannot be joined.
    """

    def __init__(self):
        Thread.__init__(self, name=_newname("Dummy-%d"))

        # Thread.__block consumes an OS-level locking primitive, which
        # can never be used by a _DummyThread.  Since a _DummyThread
        # instance is immortal, that's bad, so release this resource.
        del self._Thread__block

        self._Thread__started = True
        ident = _get_ident()
        _active_limbo_lock.acquire()
        _active[ident] = self
        _active_limbo_lock.release()

    def _set_daemon(self):
        return True

    def join(self, timeout=None):
        """Joining a dummy thread is not supported."""
        assert False, "cannot join a dummy thread"
paul@543 | 699 | |
paul@543 | 700 | |
paul@543 | 701 | # Global API functions |
paul@543 | 702 | |
def currentThread():
    """Return the Thread object registered for the calling thread.

    When the caller's identifier is not in _active, a new _DummyThread is
    created (which registers itself) and returned instead.
    """
    try:
        thread = _active[_get_ident()]
    except KeyError:
        thread = _DummyThread()
    return thread
paul@543 | 709 | |
def activeCount():
    """Return the number of Thread objects in _active and _limbo combined."""
    _active_limbo_lock.acquire()
    running = len(_active)
    starting = len(_limbo)
    _active_limbo_lock.release()
    return running + starting
paul@543 | 715 | |
def enumerate():
    """Return a new list of the Thread objects in _active and _limbo."""
    _active_limbo_lock.acquire()
    threads = list(_active.values())
    threads.extend(_limbo.values())
    _active_limbo_lock.release()
    return threads
paul@543 | 721 | |
paul@543 | 722 | from thread import stack_size |
paul@543 | 723 | |
paul@543 | 724 | # Create the main thread object, |
paul@543 | 725 | # and make it available for the interpreter |
paul@543 | 726 | # (Py_Main) as threading._shutdown. |
paul@543 | 727 | |
paul@543 | 728 | _shutdown = _MainThread()._exitfunc |
paul@543 | 729 | |
paul@543 | 730 | # get thread-local implementation, either from the thread |
paul@543 | 731 | # module, or from the python fallback |
paul@543 | 732 | |
paul@543 | 733 | try: |
paul@543 | 734 | from thread import _local as local |
paul@543 | 735 | except ImportError: |
paul@543 | 736 | from _threading_local import local |
paul@543 | 737 | |
paul@543 | 738 | |
paul@543 | 739 | # Self-test code |
paul@543 | 740 | |
def _test():
    """Self-test: producers and one consumer sharing a bounded queue.

    Three producer threads each put five items into a queue limited to four
    entries; a single consumer thread takes all fifteen items and prints
    them.  The function returns once every thread has been joined.
    """

    class BoundedQueue:
        """FIFO queue with a size limit, guarded by one monitor lock."""

        def __init__(self, limit):
            self.mon = RLock()
            self.rc = Condition(self.mon)   # signalled when an item arrives
            self.wc = Condition(self.mon)   # signalled when space is freed
            self.limit = limit
            self.queue = deque()

        def put(self, item):
            self.mon.acquire()
            try:
                while len(self.queue) >= self.limit:
                    self.wc.wait()
                self.queue.append(item)
                self.rc.notify()
            finally:
                self.mon.release()

        def get(self):
            self.mon.acquire()
            try:
                while not self.queue:
                    self.rc.wait()
                item = self.queue.popleft()
                self.wc.notify()
            finally:
                self.mon.release()
            return item

    class ProducerThread(Thread):

        def __init__(self, queue, quota):
            Thread.__init__(self, name="Producer")
            self.queue = queue
            self.quota = quota

        def run(self):
            from random import random
            counter = 0
            while counter < self.quota:
                counter = counter + 1
                self.queue.put("%s.%d" % (self.getName(), counter))
                _sleep(random() * 0.00001)

    class ConsumerThread(Thread):

        def __init__(self, queue, count):
            Thread.__init__(self, name="Consumer")
            self.queue = queue
            self.count = count

        def run(self):
            while self.count > 0:
                item = self.queue.get()
                print(item)
                self.count = self.count - 1

    NP = 3      # number of producers
    QL = 4      # queue limit
    NI = 5      # number of items per producer

    Q = BoundedQueue(QL)
    P = []
    for i in range(NP):
        t = ProducerThread(Q, NI)
        t.setName("Producer-%d" % (i+1))
        P.append(t)
    C = ConsumerThread(Q, NI*NP)
    for t in P:
        t.start()
        _sleep(0.000001)
    C.start()
    for t in P:
        t.join()
    C.join()
paul@543 | 821 | |
paul@543 | 822 | if __name__ == '__main__': |
paul@543 | 823 | _test() |
paul@543 | 824 | |
paul@543 | 825 | # vim: tabstop=4 expandtab shiftwidth=4 |