pprocess

Changeset

173:45aa2c728c7a
21 months ago Paul Boddie raw files shortlog changelog graph Attempt to handle temporary resource unavailability by queuing processes, sleeping if no processes have been created already.
pprocess.py (file)
     1.1 --- a/pprocess.py	Tue Oct 20 23:16:46 2015 +0200
     1.2 +++ b/pprocess.py	Tue Oct 04 00:16:09 2016 +0200
     1.3 @@ -4,7 +4,8 @@
     1.4  A simple parallel processing API for Python, inspired somewhat by the thread
     1.5  module, slightly less by pypar, and slightly less still by pypvm.
     1.6  
     1.7 -Copyright (C) 2005, 2006, 2007, 2008, 2009, 2013 Paul Boddie <paul@boddie.org.uk>
     1.8 +Copyright (C) 2005, 2006, 2007, 2008, 2009, 2013, 2014,
     1.9 +              2016 Paul Boddie <paul@boddie.org.uk>
    1.10  Copyright (C) 2013 Yaroslav Halchenko <debian@onerussian.com>
    1.11  
    1.12  This program is free software; you can redistribute it and/or modify it under
    1.13 @@ -28,7 +29,9 @@
    1.14  import select
    1.15  import socket
    1.16  import platform
    1.17 +import errno
    1.18  
    1.19 +from time import sleep
    1.20  from warnings import warn
    1.21  
    1.22  try:
    1.23 @@ -402,16 +405,22 @@
    1.24                  try:
    1.25                      self.store_data(channel)
    1.26                      self.start_waiting(channel)
    1.27 -                except IOError, exc:
    1.28 +                except (IOError, OSError), exc:
    1.29                      self.remove(channel)
    1.30 -                    warn("Removed channel %r due to IOError: %s" % (channel, exc))
    1.31 +                    warn("Removed channel %r due to exception: %s" % (channel, exc))
    1.32  
    1.33          # Or schedule new processes and channels.
    1.34  
    1.35          else:
    1.36              while self.waiting and not self.busy():
    1.37 -                callable, args, kw = self.waiting.pop()
    1.38 -                self.start(callable, *args, **kw)
    1.39 +                details = self.waiting.pop()
    1.40 +
    1.41 +                # Stop actively scheduling if resources are exhausted.
    1.42 +
    1.43 +                if not self.start_new_waiting(details):
    1.44 +                    if not self.active():
    1.45 +                        sleep(1)
    1.46 +                    break
    1.47  
    1.48      def store_data(self, channel):
    1.49  
    1.50 @@ -449,6 +458,8 @@
    1.51                  self.add(channel)
    1.52                  channel.send((args, kw))
    1.53  
    1.54 +            # Return the details for a new channel.
    1.55 +
    1.56              else:
    1.57                  return callable, args, kw
    1.58  
    1.59 @@ -496,9 +507,31 @@
    1.60          """
    1.61  
    1.62          details = self._get_waiting(channel)
    1.63 +
    1.64          if details is not None:
    1.65 -            callable, args, kw = details
    1.66 -            self.add(start(callable, *args, **kw))
    1.67 +            self.start_new_waiting(details)
    1.68 +
    1.69 +    def start_new_waiting(self, details):
    1.70 +
    1.71 +        """
    1.72 +        Start a waiting process with the given 'details', obtaining a new
    1.73 +        channel.
    1.74 +        """
    1.75 +
    1.76 +        callable, args, kw = details
    1.77 +        channel = self._start(callable, *args, **kw)
    1.78 +
    1.79 +        # Monitor any newly-created process.
    1.80 +
    1.81 +        if channel:
    1.82 +            self.add(channel)
    1.83 +            return True
    1.84 +
    1.85 +        # Push the details back onto the end of the waiting list.
    1.86 +
    1.87 +        else:
    1.88 +            self.waiting.append(details)
    1.89 +            return False
    1.90  
    1.91      # Convenience methods.
    1.92  
    1.93 @@ -511,9 +544,21 @@
    1.94          """
    1.95  
    1.96          if self._set_waiting(callable, args, kw):
    1.97 -            return
    1.98 +            return False
    1.99 +
   1.100 +        channel = self._start(callable, *args, **kw)
   1.101 +
   1.102 +        # Monitor any newly-created process.
   1.103  
   1.104 -        self.add_wait(start(callable, *args, **kw))
   1.105 +        if channel:
   1.106 +            self.add_wait(channel)
   1.107 +            return True
   1.108 +
   1.109 +        # Otherwise, add the details to the waiting list unconditionally.
   1.110 +
   1.111 +        else:
   1.112 +            self.waiting.insert(0, (callable, args, kw))
   1.113 +            return False
   1.114  
   1.115      def create(self):
   1.116  
   1.117 @@ -537,6 +582,22 @@
   1.118  
   1.119          return ManagedCallable(callable, self)
   1.120  
   1.121 +    def _start(self, callable, *args, **kw):
   1.122 +
   1.123 +        """
   1.124 +        Create a new process for the given 'callable' using any additional
   1.125 +        arguments provided. Return any successfully created channel or None if
   1.126 +        no process could be created at the present time.
   1.127 +        """
   1.128 +
   1.129 +        try:
   1.130 +            return start(callable, *args, **kw)
   1.131 +        except OSError, exc:
   1.132 +            if exc.errno != errno.EAGAIN:
   1.133 +                raise
   1.134 +            else:
   1.135 +                return None
   1.136 +
   1.137  class Persistent:
   1.138  
   1.139      """