pprocess

Changeset

173:45aa2c728c7a
2016-10-04 Paul Boddie raw files shortlog changelog graph Attempt to handle temporary resource unavailability by queuing processes, sleeping if no processes have been created already.
pprocess.py (file)
     1.1 --- a/pprocess.py	Tue Oct 20 23:16:46 2015 +0200
     1.2 +++ b/pprocess.py	Tue Oct 04 00:16:09 2016 +0200
     1.3 @@ -4,7 +4,8 @@
     1.4  A simple parallel processing API for Python, inspired somewhat by the thread
     1.5  module, slightly less by pypar, and slightly less still by pypvm.
     1.6  
     1.7 -Copyright (C) 2005, 2006, 2007, 2008, 2009, 2013 Paul Boddie <paul@boddie.org.uk>
     1.8 +Copyright (C) 2005, 2006, 2007, 2008, 2009, 2013, 2014,
     1.9 +              2016 Paul Boddie <paul@boddie.org.uk>
    1.10  Copyright (C) 2013 Yaroslav Halchenko <debian@onerussian.com>
    1.11  
    1.12  This program is free software; you can redistribute it and/or modify it under
    1.13 @@ -28,7 +29,9 @@
    1.14  import select
    1.15  import socket
    1.16  import platform
    1.17 +import errno
    1.18  
    1.19 +from time import sleep
    1.20  from warnings import warn
    1.21  
    1.22  try:
    1.23 @@ -402,16 +405,22 @@
    1.24                  try:
    1.25                      self.store_data(channel)
    1.26                      self.start_waiting(channel)
    1.27 -                except IOError, exc:
    1.28 +                except (IOError, OSError), exc:
    1.29                      self.remove(channel)
    1.30 -                    warn("Removed channel %r due to IOError: %s" % (channel, exc))
    1.31 +                    warn("Removed channel %r due to exception: %s" % (channel, exc))
    1.32  
    1.33          # Or schedule new processes and channels.
    1.34  
    1.35          else:
    1.36              while self.waiting and not self.busy():
    1.37 -                callable, args, kw = self.waiting.pop()
    1.38 -                self.start(callable, *args, **kw)
    1.39 +                details = self.waiting.pop()
    1.40 +
    1.41 +                # Stop actively scheduling if resources are exhausted.
    1.42 +
    1.43 +                if not self.start_new_waiting(details):
    1.44 +                    if not self.active():
    1.45 +                        sleep(1)
    1.46 +                    break
    1.47  
    1.48      def store_data(self, channel):
    1.49  
    1.50 @@ -449,6 +458,8 @@
    1.51                  self.add(channel)
    1.52                  channel.send((args, kw))
    1.53  
    1.54 +            # Return the details for a new channel.
    1.55 +
    1.56              else:
    1.57                  return callable, args, kw
    1.58  
    1.59 @@ -496,9 +507,31 @@
    1.60          """
    1.61  
    1.62          details = self._get_waiting(channel)
    1.63 +
    1.64          if details is not None:
    1.65 -            callable, args, kw = details
    1.66 -            self.add(start(callable, *args, **kw))
    1.67 +            self.start_new_waiting(details)
    1.68 +
    1.69 +    def start_new_waiting(self, details):
    1.70 +
    1.71 +        """
    1.72 +        Start a waiting process with the given 'details', obtaining a new
    1.73 +        channel.
    1.74 +        """
    1.75 +
    1.76 +        callable, args, kw = details
    1.77 +        channel = self._start(callable, *args, **kw)
    1.78 +
    1.79 +        # Monitor any newly-created process.
    1.80 +
    1.81 +        if channel:
    1.82 +            self.add(channel)
    1.83 +            return True
    1.84 +
    1.85 +        # Push the details back onto the end of the waiting list.
    1.86 +
    1.87 +        else:
    1.88 +            self.waiting.append(details)
    1.89 +            return False
    1.90  
    1.91      # Convenience methods.
    1.92  
    1.93 @@ -511,9 +544,21 @@
    1.94          """
    1.95  
    1.96          if self._set_waiting(callable, args, kw):
    1.97 -            return
    1.98 +            return False
    1.99 +
   1.100 +        channel = self._start(callable, *args, **kw)
   1.101 +
   1.102 +        # Monitor any newly-created process.
   1.103  
   1.104 -        self.add_wait(start(callable, *args, **kw))
   1.105 +        if channel:
   1.106 +            self.add_wait(channel)
   1.107 +            return True
   1.108 +
   1.109 +        # Otherwise, add the details to the waiting list unconditionally.
   1.110 +
   1.111 +        else:
   1.112 +            self.waiting.insert(0, (callable, args, kw))
   1.113 +            return False
   1.114  
   1.115      def create(self):
   1.116  
   1.117 @@ -537,6 +582,22 @@
   1.118  
   1.119          return ManagedCallable(callable, self)
   1.120  
   1.121 +    def _start(self, callable, *args, **kw):
   1.122 +
   1.123 +        """
   1.124 +        Create a new process for the given 'callable' using any additional
   1.125 +        arguments provided. Return any successfully created channel or None if
   1.126 +        no process could be created at the present time.
   1.127 +        """
   1.128 +
   1.129 +        try:
   1.130 +            return start(callable, *args, **kw)
   1.131 +        except OSError, exc:
   1.132 +            if exc.errno != errno.EAGAIN:
   1.133 +                raise
   1.134 +            else:
   1.135 +                return None
   1.136 +
   1.137  class Persistent:
   1.138  
   1.139      """