pprocess

Changeset

142:f79af4b941b8
2008-06-01 paulb raw files shortlog changelog graph [project @ 2008-06-01 23:41:22 by paulb] Updated release information. Fixed connection management in PersistentChannel. Added convenience classes and functions for background processes.
pprocess.py (file)
     1.1 --- a/pprocess.py	Sun Jun 01 14:50:46 2008 +0000
     1.2 +++ b/pprocess.py	Sun Jun 01 23:41:22 2008 +0000
     1.3 @@ -20,7 +20,7 @@
     1.4  with this program.  If not, see <http://www.gnu.org/licenses/>.
     1.5  """
     1.6  
     1.7 -__version__ = "0.3.2"
     1.8 +__version__ = "0.4"
     1.9  
    1.10  import os
    1.11  import sys
    1.12 @@ -145,10 +145,17 @@
    1.13          self.endpoint = endpoint
    1.14          self.address = address
    1.15          self.poller = select.poll()
    1.16 -        self._ensure_pipes()
    1.17 +
    1.18 +        # Listen for connections before this process is interested in
    1.19 +        # communicating. It is not desirable to wait for connections at this
    1.20 +        # point because this will block the process.
    1.21 +
    1.22 +        self.endpoint.listen(1)
    1.23  
    1.24      def close(self):
    1.25 -        #print "Closing persistent channel"
    1.26 +
    1.27 +        "Close the persistent channel and remove the socket file."
    1.28 +
    1.29          Channel.close(self)
    1.30          try:
    1.31              os.unlink(self.address)
    1.32 @@ -160,8 +167,9 @@
    1.33          "Ensure that the channel is capable of communicating."
    1.34  
    1.35          if self.read_pipe is None or self.write_pipe is None:
    1.36 -            #print "Waiting for connection"
    1.37 -            self.endpoint.listen(1)
    1.38 +
    1.39 +            # Accept any incoming connections.
    1.40 +
    1.41              endpoint, address = self.endpoint.accept()
    1.42              self.read_pipe = endpoint.makefile("r", 0)
    1.43              self.write_pipe = endpoint.makefile("w", 0)
    1.44 @@ -172,25 +180,31 @@
    1.45              self.poller.register(fileno, select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR)
    1.46  
    1.47      def _reset_pipes(self):
    1.48 +
    1.49 +        "Discard the existing connection."
    1.50 +
    1.51          fileno = self.write_pipe.fileno()
    1.52          self.poller.unregister(fileno)
    1.53          self.read_pipe = None
    1.54          self.write_pipe = None
    1.55 +        self.endpoint.listen(1)
    1.56  
    1.57      def _ensure_communication(self, timeout=None):
    1.58  
    1.59          "Ensure that sending and receiving are possible."
    1.60  
    1.61          while 1:
    1.62 +            self._ensure_pipes()
    1.63              fileno = self.write_pipe.fileno()
    1.64              fds = self.poller.poll(timeout)
    1.65              for fd, status in fds:
    1.66                  if fd != fileno:
    1.67                      continue
    1.68                  if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
    1.69 -                    #print "Broken connection"
    1.70 +
    1.71 +                    # Broken connection: discard it and start all over again.
    1.72 +
    1.73                      self._reset_pipes()
    1.74 -                    self._ensure_pipes()
    1.75                      break
    1.76              else:
    1.77                  return
    1.78 @@ -298,8 +312,9 @@
    1.79          Remove the given 'channel' from the exchange.
    1.80          """
    1.81  
    1.82 -        del self.readables[channel.read_pipe.fileno()]
    1.83 -        self.poller.unregister(channel.read_pipe.fileno())
    1.84 +        fileno = channel.read_pipe.fileno()
    1.85 +        del self.readables[fileno]
    1.86 +        self.poller.unregister(fileno)
    1.87          if self.autoclose:
    1.88              channel.close()
    1.89              channel.wait()
    1.90 @@ -568,10 +583,36 @@
    1.91  
    1.92      def __call__(self, *args, **kw):
    1.93  
    1.94 -        "Invoke the callable and discard the result."
    1.95 +        "Invoke the callable with the supplied arguments."
    1.96  
    1.97          self.exchange.start(self.address, self.callable, *args, **kw)
    1.98  
    1.99 +class BackgroundCallable:
   1.100 +
   1.101 +    """
   1.102 +    A callable which sets up a persistent communications channel, but is
   1.103 +    unmanaged by an exchange.
   1.104 +    """
   1.105 +
   1.106 +    def __init__(self, address, callable):
   1.107 +
   1.108 +        """
   1.109 +        Using the given 'address', wrap the given 'callable'. This object can
   1.110 +        then be invoked, but the wrapped callable will be run in a background
   1.111 +        process. Note that the 'callable' must be parallel-aware (that is, have
   1.112 +        a 'channel' parameter). Use the MakeParallel class to wrap other kinds
   1.113 +        of callable objects.
   1.114 +        """
   1.115 +
   1.116 +        self.callable = callable
   1.117 +        self.address = address
   1.118 +
   1.119 +    def __call__(self, *args, **kw):
   1.120 +
   1.121 +        "Invoke the callable with the supplied arguments."
   1.122 +
   1.123 +        start_persistent(self.address, self.callable, *args, **kw)
   1.124 +
   1.125  # Abstractions and utilities.
   1.126  
   1.127  class Map(Exchange):
   1.128 @@ -744,6 +785,34 @@
   1.129  
   1.130      pass
   1.131  
   1.132 +# Convenience functions.
   1.133 +
   1.134 +def BackgroundQueue(address):
   1.135 +
   1.136 +    """
   1.137 +    Connect to a process reachable via the given 'address', making the results
   1.138 +    of which accessible via a queue.
   1.139 +    """
   1.140 +
   1.141 +    queue = PersistentQueue(limit=1)
   1.142 +    queue.connect(address)
   1.143 +    return queue
   1.144 +
   1.145 +def pmap(callable, sequence, limit=None):
   1.146 +
   1.147 +    """
   1.148 +    A parallel version of the built-in map function with an optional process
   1.149 +    'limit'. The given 'callable' should not be parallel-aware (that is, have a
   1.150 +    'channel' parameter) since it will be wrapped for parallel communications
   1.151 +    before being invoked.
   1.152 +
   1.153 +    Return the processed 'sequence' where each element in the sequence is
   1.154 +    processed by a different process.
   1.155 +    """
   1.156 +
   1.157 +    mymap = Map(limit=limit)
   1.158 +    return mymap(callable, sequence)
   1.159 +
   1.160  # Utility functions.
   1.161  
   1.162  _cpuinfo_fields = "physical id", "core id"
   1.163 @@ -783,6 +852,8 @@
   1.164      except OSError:
   1.165          return None
   1.166  
   1.167 +# Low-level functions.
   1.168 +
   1.169  def create():
   1.170  
   1.171      """
   1.172 @@ -928,19 +999,4 @@
   1.173      except OSError:
   1.174          pass
   1.175  
   1.176 -def pmap(callable, sequence, limit=None):
   1.177 -
   1.178 -    """
   1.179 -    A parallel version of the built-in map function with an optional process
   1.180 -    'limit'. The given 'callable' should not be parallel-aware (that is, have a
   1.181 -    'channel' parameter) since it will be wrapped for parallel communications
   1.182 -    before being invoked.
   1.183 -
   1.184 -    Return the processed 'sequence' where each element in the sequence is
   1.185 -    processed by a different process.
   1.186 -    """
   1.187 -
   1.188 -    mymap = Map(limit=limit)
   1.189 -    return mymap(callable, sequence)
   1.190 -
   1.191  # vim: tabstop=4 expandtab shiftwidth=4