Lichen

lib/threading.py

286:ea96958037c6
2016-11-30 Paul Boddie Implemented the zip function.
     1 #!/usr/bin/env python     2      3 """     4 Thread module emulating a subset of Java's threading model.     5      6 See docs/COPYING.txt and docs/LICENCE-python.txt for copyright and licensing     7 information for Python standard library modules.     8 """     9     10 import sys as _sys    11     12 try:    13     import thread    14 except ImportError:    15     del _sys.modules[__name__]    16     raise    17     18 from time import time as _time, sleep as _sleep    19 from traceback import format_exc as _format_exc    20 from collections import deque    21     22 # Rename some stuff so "from threading import *" is safe    23 __all__ = ['activeCount', 'Condition', 'currentThread', 'enumerate', 'Event',    24            'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',    25            'Timer', 'setprofile', 'settrace', 'local', 'stack_size']    26     27 _start_new_thread = thread.start_new_thread    28 _allocate_lock = thread.allocate_lock    29 _get_ident = thread.get_ident    30 ThreadError = thread.error    31 del thread    32     33     34 # Support for profile and trace hooks    35     36 _profile_hook = None    37 _trace_hook = None    38     39 def setprofile(func):    40     global _profile_hook    41     _profile_hook = func    42     43 def settrace(func):    44     global _trace_hook    45     _trace_hook = func    46     47 # Synchronization classes    48     49 Lock = _allocate_lock    50     51 def RLock(*args, **kwargs):    52     return _RLock(*args, **kwargs)    53     54 class _RLock:    55     56     def __init__(self):    57         self.__block = _allocate_lock()    58         self.__owner = None    59         self.__count = 0    60     61     def __repr__(self):    62         owner = self.__owner    63         return "<%s(%s, %d)>" % (    64                 self.__class__.__name__,    65                 owner and owner.getName(),    66                 self.__count)    67     68     def acquire(self, blocking=1):    69         me = currentThread()    70         if self.__owner is me:    71             self.__count = self.__count + 1    72             if __debug__:    73                 self._note("%s.acquire(%s): recursive success", self, blocking)    74             return 1    75         rc = self.__block.acquire(blocking)    76         if rc:    77             self.__owner = me    78             self.__count = 1    79             if __debug__:    80                 self._note("%s.acquire(%s): initial success", self, blocking)    81         else:    82             if __debug__:    83                 self._note("%s.acquire(%s): failure", self, blocking)    84         return rc    85     86     __enter__ = acquire    87     88     def release(self):    89         if self.__owner is not currentThread():    90             raise RuntimeError("cannot release un-aquired lock")    91         self.__count = count = self.__count - 1    92         if not count:    93             self.__owner = None    94             self.__block.release()    95             if __debug__:    96                 self._note("%s.release(): final release", self)    97         else:    98             if __debug__:    99                 self._note("%s.release(): non-final release", self)   100    101     def __exit__(self, t, v, tb):   102         self.release()   103    104     # Internal methods used by condition variables   105    106     def _acquire_restore(self, (count, owner)):   107         self.__block.acquire()   108         self.__count = count   109         self.__owner = owner   110         if __debug__:   111             self._note("%s._acquire_restore()", self)   112    113     def _release_save(self):   114         if __debug__:   115             self._note("%s._release_save()", self)   116         count = self.__count   117         self.__count = 0   118         owner = self.__owner   119         self.__owner = None   120         self.__block.release()   121         return (count, owner)   122    123     def _is_owned(self):   124         return self.__owner is currentThread()   125    126    127 def Condition(*args, **kwargs):   128     return _Condition(*args, **kwargs)   129    130 class _Condition:   131    132     def __init__(self, lock=None):   133         if lock is None:   134             lock = RLock()   135         self.__lock = lock   136         # Export the lock's acquire() and release() methods   137         self.acquire = lock.acquire   138         self.release = lock.release   139         # If the lock defines _release_save() and/or _acquire_restore(),   140         # these override the default implementations (which just call   141         # release() and acquire() on the lock).  Ditto for _is_owned().   142         try:   143             self._release_save = lock._release_save   144         except AttributeError:   145             pass   146         try:   147             self._acquire_restore = lock._acquire_restore   148         except AttributeError:   149             pass   150         try:   151             self._is_owned = lock._is_owned   152         except AttributeError:   153             pass   154         self.__waiters = []   155    156     def __enter__(self):   157         return self.__lock.__enter__()   158    159     def __exit__(self, *args):   160         return self.__lock.__exit__(*args)   161    162     def __repr__(self):   163         return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))   164    165     def _release_save(self):   166         self.__lock.release()           # No state to save   167    168     def _acquire_restore(self, x):   169         self.__lock.acquire()           # Ignore saved state   170    171     def _is_owned(self):   172         # Return True if lock is owned by currentThread.   173         # This method is called only if __lock doesn't have _is_owned().   174         if self.__lock.acquire(0):   175             self.__lock.release()   176             return False   177         else:   178             return True   179    180     def wait(self, timeout=None):   181         if not self._is_owned():   182             raise RuntimeError("cannot wait on un-aquired lock")   183         waiter = _allocate_lock()   184         waiter.acquire()   185         self.__waiters.append(waiter)   186         saved_state = self._release_save()   187         try:    # restore state no matter what (e.g., KeyboardInterrupt)   188             if timeout is None:   189                 waiter.acquire()   190                 if __debug__:   191                     self._note("%s.wait(): got it", self)   192             else:   193                 # Balancing act:  We can't afford a pure busy loop, so we   194                 # have to sleep; but if we sleep the whole timeout time,   195                 # we'll be unresponsive.  The scheme here sleeps very   196                 # little at first, longer as time goes on, but never longer   197                 # than 20 times per second (or the timeout time remaining).   198                 endtime = _time() + timeout   199                 delay = 0.0005 # 500 us -> initial delay of 1 ms   200                 while True:   201                     gotit = waiter.acquire(0)   202                     if gotit:   203                         break   204                     remaining = endtime - _time()   205                     if remaining <= 0:   206                         break   207                     delay = min(delay * 2, remaining, .05)   208                     _sleep(delay)   209                 if not gotit:   210                     if __debug__:   211                         self._note("%s.wait(%s): timed out", self, timeout)   212                     try:   213                         self.__waiters.remove(waiter)   214                     except ValueError:   215                         pass   216                 else:   217                     if __debug__:   218                         self._note("%s.wait(%s): got it", self, timeout)   219         finally:   220             self._acquire_restore(saved_state)   221    222     def notify(self, n=1):   223         if not self._is_owned():   224             raise RuntimeError("cannot notify on un-aquired lock")   225         __waiters = self.__waiters   226         waiters = __waiters[:n]   227         if not waiters:   228             if __debug__:   229                 self._note("%s.notify(): no waiters", self)   230             return   231         self._note("%s.notify(): notifying %d waiter%s", self, n,   232                    n!=1 and "s" or "")   233         for waiter in waiters:   234             waiter.release()   235             try:   236                 __waiters.remove(waiter)   237             except ValueError:   238                 pass   239    240     def notifyAll(self):   241         self.notify(len(self.__waiters))   242    243    244 def Semaphore(*args, **kwargs):   245     return _Semaphore(*args, **kwargs)   246    247 class _Semaphore:   248    249     # After Tim Peters' semaphore class, but not quite the same (no maximum)   250    251     def __init__(self, value=1):   252         if value < 0:   253             raise ValueError("semaphore initial value must be >= 0")   254         self.__cond = Condition(Lock())   255         self.__value = value   256    257     def acquire(self, blocking=1):   258         rc = False   259         self.__cond.acquire()   260         while self.__value == 0:   261             if not blocking:   262                 break   263             if __debug__:   264                 self._note("%s.acquire(%s): blocked waiting, value=%s",   265                            self, blocking, self.__value)   266             self.__cond.wait()   267         else:   268             self.__value = self.__value - 1   269             if __debug__:   270                 self._note("%s.acquire: success, value=%s",   271                            self, self.__value)   272             rc = True   273         self.__cond.release()   274         return rc   275    276     __enter__ = acquire   277    278     def release(self):   279         self.__cond.acquire()   280         self.__value = self.__value + 1   281         if __debug__:   282             self._note("%s.release: success, value=%s",   283                        self, self.__value)   284         self.__cond.notify()   285         self.__cond.release()   286    287     def __exit__(self, t, v, tb):   288         self.release()   289    290    291 def BoundedSemaphore(*args, **kwargs):   292     return _BoundedSemaphore(*args, **kwargs)   293    294 class _BoundedSemaphore(_Semaphore):   295     """Semaphore that checks that # releases is <= # acquires"""   296     def __init__(self, value=1):   297         _Semaphore.__init__(self, value)   298         self._initial_value = value   299    300     def release(self):   301         if self._Semaphore__value >= self._initial_value:   302             raise ValueError, "Semaphore released too many times"   303         return _Semaphore.release(self)   304    305    306 def Event(*args, **kwargs):   307     return _Event(*args, **kwargs)   308    309 class _Event:   310    311     # After Tim Peters' event class (without is_posted())   312    313     def __init__(self):   314         self.__cond = Condition(Lock())   315         self.__flag = False   316    317     def isSet(self):   318         return self.__flag   319    320     def set(self):   321         self.__cond.acquire()   322         try:   323             self.__flag = True   324             self.__cond.notifyAll()   325         finally:   326             self.__cond.release()   327    328     def clear(self):   329         self.__cond.acquire()   330         try:   331             self.__flag = False   332         finally:   333             self.__cond.release()   334    335     def wait(self, timeout=None):   336         self.__cond.acquire()   337         try:   338             if not self.__flag:   339                 self.__cond.wait(timeout)   340         finally:   341             self.__cond.release()   342    343 # Helper to generate new thread names   344 _counter = 0   345 def _newname(template="Thread-%d"):   346     global _counter   347     _counter = _counter + 1   348     return template % _counter   349    350 # Active thread administration   351 _active_limbo_lock = _allocate_lock()   352 _active = {}    # maps thread id to Thread object   353 _limbo = {}   354    355    356 # Main class for threads   357    358 class Thread:   359    360     __initialized = False   361     # Need to store a reference to sys.exc_info for printing   362     # out exceptions when a thread tries to use a global var. during interp.   363     # shutdown and thus raises an exception about trying to perform some   364     # operation on/with a NoneType   365     __exc_info = _sys.exc_info   366    367     def __init__(self, group=None, target=None, name=None,   368                  args=(), kwargs=None):   369         assert group is None, "group argument must be None for now"   370         if kwargs is None:   371             kwargs = {}   372         self.__target = target   373         self.__name = str(name or _newname())   374         self.__args = args   375         self.__kwargs = kwargs   376         self.__daemonic = self._set_daemon()   377         self.__started = False   378         self.__stopped = False   379         self.__block = Condition(Lock())   380         self.__initialized = True   381         # sys.stderr is not stored in the class like   382         # sys.exc_info since it can be changed between instances   383         self.__stderr = _sys.stderr   384    385     def _set_daemon(self):   386         # Overridden in _MainThread and _DummyThread   387         return currentThread().isDaemon()   388    389     def __repr__(self):   390         assert self.__initialized, "Thread.__init__() was not called"   391         status = "initial"   392         if self.__started:   393             status = "started"   394         if self.__stopped:   395             status = "stopped"   396         if self.__daemonic:   397             status = status + " daemon"   398         return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)   399    400     def start(self):   401         if not self.__initialized:   402             raise RuntimeError("thread.__init__() not called")   403         if self.__started:   404             raise RuntimeError("thread already started")   405         if __debug__:   406             self._note("%s.start(): starting thread", self)   407         _active_limbo_lock.acquire()   408         _limbo[self] = self   409         _active_limbo_lock.release()   410         _start_new_thread(self.__bootstrap, ())   411         self.__started = True   412         _sleep(0.000001)    # 1 usec, to let the thread run (Solaris hack)   413    414     def run(self):   415         if self.__target:   416             self.__target(*self.__args, **self.__kwargs)   417    418     def __bootstrap(self):   419         # Wrapper around the real bootstrap code that ignores   420         # exceptions during interpreter cleanup.  Those typically   421         # happen when a daemon thread wakes up at an unfortunate   422         # moment, finds the world around it destroyed, and raises some   423         # random exception *** while trying to report the exception in   424         # __bootstrap_inner() below ***.  Those random exceptions   425         # don't help anybody, and they confuse users, so we suppress   426         # them.  We suppress them only when it appears that the world   427         # indeed has already been destroyed, so that exceptions in   428         # __bootstrap_inner() during normal business hours are properly   429         # reported.  Also, we only suppress them for daemonic threads;   430         # if a non-daemonic encounters this, something else is wrong.   431         try:   432             self.__bootstrap_inner()   433         except:   434             if self.__daemonic and _sys is None:   435                 return   436             raise   437    438     def __bootstrap_inner(self):   439         try:   440             self.__started = True   441             _active_limbo_lock.acquire()   442             _active[_get_ident()] = self   443             del _limbo[self]   444             _active_limbo_lock.release()   445             if __debug__:   446                 self._note("%s.__bootstrap(): thread started", self)   447    448             if _trace_hook:   449                 self._note("%s.__bootstrap(): registering trace hook", self)   450                 _sys.settrace(_trace_hook)   451             if _profile_hook:   452                 self._note("%s.__bootstrap(): registering profile hook", self)   453                 _sys.setprofile(_profile_hook)   454    455             try:   456                 self.run()   457             except SystemExit:   458                 if __debug__:   459                     self._note("%s.__bootstrap(): raised SystemExit", self)   460             except:   461                 if __debug__:   462                     self._note("%s.__bootstrap(): unhandled exception", self)   463                 # If sys.stderr is no more (most likely from interpreter   464                 # shutdown) use self.__stderr.  Otherwise still use sys (as in   465                 # _sys) in case sys.stderr was redefined since the creation of   466                 # self.   467                 if _sys:   468                     _sys.stderr.write("Exception in thread %s:\n%s\n" %   469                                       (self.getName(), _format_exc()))   470                 else:   471                     # Do the best job possible w/o a huge amt. of code to   472                     # approximate a traceback (code ideas from   473                     # Lib/traceback.py)   474                     exc_type, exc_value, exc_tb = self.__exc_info()   475                     try:   476                         print>>self.__stderr, (   477                             "Exception in thread " + self.getName() +   478                             " (most likely raised during interpreter shutdown):")   479                         print>>self.__stderr, (   480                             "Traceback (most recent call last):")   481                         while exc_tb:   482                             print>>self.__stderr, (   483                                 '  File "%s", line %s, in %s' %   484                                 (exc_tb.tb_frame.f_code.co_filename,   485                                     exc_tb.tb_lineno,   486                                     exc_tb.tb_frame.f_code.co_name))   487                             exc_tb = exc_tb.tb_next   488                         print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))   489                     # Make sure that exc_tb gets deleted since it is a memory   490                     # hog; deleting everything else is just for thoroughness   491                     finally:   492                         del exc_type, exc_value, exc_tb   493             else:   494                 if __debug__:   495                     self._note("%s.__bootstrap(): normal return", self)   496         finally:   497             _active_limbo_lock.acquire()   498             try:   499                 self.__stop()   500                 try:   501                     # We don't call self.__delete() because it also   502                     # grabs _active_limbo_lock.   503                     del _active[_get_ident()]   504                 except:   505                     pass   506             finally:   507                 _active_limbo_lock.release()   508    509     def __stop(self):   510         self.__block.acquire()   511         self.__stopped = True   512         self.__block.notifyAll()   513         self.__block.release()   514    515     def __delete(self):   516         "Remove current thread from the dict of currently running threads."   517    518         # Notes about running with dummy_thread:   519         #   520         # Must take care to not raise an exception if dummy_thread is being   521         # used (and thus this module is being used as an instance of   522         # dummy_threading).  dummy_thread.get_ident() always returns -1 since   523         # there is only one thread if dummy_thread is being used.  Thus   524         # len(_active) is always <= 1 here, and any Thread instance created   525         # overwrites the (if any) thread currently registered in _active.   526         #   527         # An instance of _MainThread is always created by 'threading'.  This   528         # gets overwritten the instant an instance of Thread is created; both   529         # threads return -1 from dummy_thread.get_ident() and thus have the   530         # same key in the dict.  So when the _MainThread instance created by   531         # 'threading' tries to clean itself up when atexit calls this method   532         # it gets a KeyError if another Thread instance was created.   533         #   534         # This all means that KeyError from trying to delete something from   535         # _active if dummy_threading is being used is a red herring.  But   536         # since it isn't if dummy_threading is *not* being used then don't   537         # hide the exception.   538    539         _active_limbo_lock.acquire()   540         try:   541             try:   542                 del _active[_get_ident()]   543             except KeyError:   544                 if 'dummy_threading' not in _sys.modules:   545                     raise   546         finally:   547             _active_limbo_lock.release()   548    549     def join(self, timeout=None):   550         if not self.__initialized:   551             raise RuntimeError("Thread.__init__() not called")   552         if not self.__started:   553             raise RuntimeError("cannot join thread before it is started")   554         if self is currentThread():   555             raise RuntimeError("cannot join current thread")   556    557         if __debug__:   558             if not self.__stopped:   559                 self._note("%s.join(): waiting until thread stops", self)   560         self.__block.acquire()   561         try:   562             if timeout is None:   563                 while not self.__stopped:   564                     self.__block.wait()   565                 if __debug__:   566                     self._note("%s.join(): thread stopped", self)   567             else:   568                 deadline = _time() + timeout   569                 while not self.__stopped:   570                     delay = deadline - _time()   571                     if delay <= 0:   572                         if __debug__:   573                             self._note("%s.join(): timed out", self)   574                         break   575                     self.__block.wait(delay)   576                 else:   577                     if __debug__:   578                         self._note("%s.join(): thread stopped", self)   579         finally:   580             self.__block.release()   581    582     def getName(self):   583         assert self.__initialized, "Thread.__init__() not called"   584         return self.__name   585    586     def setName(self, name):   587         assert self.__initialized, "Thread.__init__() not called"   588         self.__name = str(name)   589    590     def isAlive(self):   591         assert self.__initialized, "Thread.__init__() not called"   592         return self.__started and not self.__stopped   593    594     def isDaemon(self):   595         assert self.__initialized, "Thread.__init__() not called"   596         return self.__daemonic   597    598     def setDaemon(self, daemonic):   599         if not self.__initialized:   600             raise RuntimeError("Thread.__init__() not called")   601         if self.__started:   602             raise RuntimeError("cannot set daemon status of active thread");   603         self.__daemonic = daemonic   604    605 # The timer class was contributed by Itamar Shtull-Trauring   606    607 def Timer(*args, **kwargs):   608     return _Timer(*args, **kwargs)   609    610 class _Timer(Thread):   611     """Call a function after a specified number of seconds:   612    613     t = Timer(30.0, f, args=[], kwargs={})   614     t.start()   615     t.cancel() # stop the timer's action if it's still waiting   616     """   617    618     def __init__(self, interval, function, args=[], kwargs={}):   619         Thread.__init__(self)   620         self.interval = interval   621         self.function = function   622         self.args = args   623         self.kwargs = kwargs   624         self.finished = Event()   625    626     def cancel(self):   627         """Stop the timer if it hasn't finished yet"""   628         self.finished.set()   629    630     def run(self):   631         self.finished.wait(self.interval)   632         if not self.finished.isSet():   633             self.function(*self.args, **self.kwargs)   634         self.finished.set()   635    636 # Special thread class to represent the main thread   637 # This is garbage collected through an exit handler   638    639 class _MainThread(Thread):   640    641     def __init__(self):   642         Thread.__init__(self, name="MainThread")   643         self._Thread__started = True   644         _active_limbo_lock.acquire()   645         _active[_get_ident()] = self   646         _active_limbo_lock.release()   647    648     def _set_daemon(self):   649         return False   650    651     def _exitfunc(self):   652         self._Thread__stop()   653         t = _pickSomeNonDaemonThread()   654         if t:   655             if __debug__:   656                 self._note("%s: waiting for other threads", self)   657         while t:   658             t.join()   659             t = _pickSomeNonDaemonThread()   660         if __debug__:   661             self._note("%s: exiting", self)   662         self._Thread__delete()   663    664 def _pickSomeNonDaemonThread():   665     for t in enumerate():   666         if not t.isDaemon() and t.isAlive():   667             return t   668     return None   669    670    671 # Dummy thread class to represent threads not started here.   672 # These aren't garbage collected when they die, nor can they be waited for.   673 # If they invoke anything in threading.py that calls currentThread(), they   674 # leave an entry in the _active dict forever after.   675 # Their purpose is to return *something* from currentThread().   676 # They are marked as daemon threads so we won't wait for them   677 # when we exit (conform previous semantics).   678    679 class _DummyThread(Thread):   680    681     def __init__(self):   682         Thread.__init__(self, name=_newname("Dummy-%d"))   683    684         # Thread.__block consumes an OS-level locking primitive, which   685         # can never be used by a _DummyThread.  Since a _DummyThread   686         # instance is immortal, that's bad, so release this resource.   687         del self._Thread__block   688    689         self._Thread__started = True   690         _active_limbo_lock.acquire()   691         _active[_get_ident()] = self   692         _active_limbo_lock.release()   693    694     def _set_daemon(self):   695         return True   696    697     def join(self, timeout=None):   698         assert False, "cannot join a dummy thread"   699    700    701 # Global API functions   702    703 def currentThread():   704     try:   705         return _active[_get_ident()]   706     except KeyError:   707         ##print "currentThread(): no current thread for", _get_ident()   708         return _DummyThread()   709    710 def activeCount():   711     _active_limbo_lock.acquire()   712     count = len(_active) + len(_limbo)   713     _active_limbo_lock.release()   714     return count   715    716 def enumerate():   717     _active_limbo_lock.acquire()   718     active = _active.values() + _limbo.values()   719     _active_limbo_lock.release()   720     return active   721    722 from thread import stack_size   723    724 # Create the main thread object,   725 # and make it available for the interpreter   726 # (Py_Main) as threading._shutdown.   727    728 _shutdown = _MainThread()._exitfunc   729    730 # get thread-local implementation, either from the thread   731 # module, or from the python fallback   732    733 try:   734     from thread import _local as local   735 except ImportError:   736     from _threading_local import local   737    738    739 # Self-test code   740    741 def _test():   742    743     class BoundedQueue:   744    745         def __init__(self, limit):   746             self.mon = RLock()   747             self.rc = Condition(self.mon)   748             self.wc = Condition(self.mon)   749             self.limit = limit   750             self.queue = deque()   751    752         def put(self, item):   753             self.mon.acquire()   754             while len(self.queue) >= self.limit:   755                 self._note("put(%s): queue full", item)   756                 self.wc.wait()   757             self.queue.append(item)   758             self._note("put(%s): appended, length now %d",   759                        item, len(self.queue))   760             self.rc.notify()   761             self.mon.release()   762    763         def get(self):   764             self.mon.acquire()   765             while not self.queue:   766                 self._note("get(): queue empty")   767                 self.rc.wait()   768             item = self.queue.popleft()   769             self._note("get(): got %s, %d left", item, len(self.queue))   770             self.wc.notify()   771             self.mon.release()   772             return item   773    774     class ProducerThread(Thread):   775    776         def __init__(self, queue, quota):   777             Thread.__init__(self, name="Producer")   778             self.queue = queue   779             self.quota = quota   780    781         def run(self):   782             from random import random   783             counter = 0   784             while counter < self.quota:   785                 counter = counter + 1   786                 self.queue.put("%s.%d" % (self.getName(), counter))   787                 _sleep(random() * 0.00001)   788    789    790     class ConsumerThread(Thread):   791    792         def __init__(self, queue, count):   793             Thread.__init__(self, name="Consumer")   794             self.queue = queue   795             self.count = count   796    797         def run(self):   798             while self.count > 0:   799                 item = self.queue.get()   800                 print item   801                 self.count = self.count - 1   802    803     NP = 3   804     QL = 4   805     NI = 5   806    807     Q = BoundedQueue(QL)   808     P = []   809     for i in range(NP):   810         t = ProducerThread(Q, NI)   811         t.setName("Producer-%d" % (i+1))   812         P.append(t)   813     C = ConsumerThread(Q, NI*NP)   814     for t in P:   815         t.start()   816         _sleep(0.000001)   817     C.start()   818     for t in P:   819         t.join()   820     C.join()   821    822 if __name__ == '__main__':   823     _test()   824    825 # vim: tabstop=4 expandtab shiftwidth=4