1.1 --- a/pprocess.py Sun Jun 01 14:50:46 2008 +0000
1.2 +++ b/pprocess.py Sun Jun 01 23:41:22 2008 +0000
1.3 @@ -20,7 +20,7 @@
1.4 with this program. If not, see <http://www.gnu.org/licenses/>.
1.5 """
1.6
1.7 -__version__ = "0.3.2"
1.8 +__version__ = "0.4"
1.9
1.10 import os
1.11 import sys
1.12 @@ -145,10 +145,17 @@
1.13 self.endpoint = endpoint
1.14 self.address = address
1.15 self.poller = select.poll()
1.16 - self._ensure_pipes()
1.17 +
1.18 + # Listen for connections before this process is interested in
1.19 + # communicating. It is not desirable to wait for connections at this
1.20 + # point because this will block the process.
1.21 +
1.22 + self.endpoint.listen(1)
1.23
1.24 def close(self):
1.25 - #print "Closing persistent channel"
1.26 +
1.27 + "Close the persistent channel and remove the socket file."
1.28 +
1.29 Channel.close(self)
1.30 try:
1.31 os.unlink(self.address)
1.32 @@ -160,8 +167,9 @@
1.33 "Ensure that the channel is capable of communicating."
1.34
1.35 if self.read_pipe is None or self.write_pipe is None:
1.36 - #print "Waiting for connection"
1.37 - self.endpoint.listen(1)
1.38 +
1.39 + # Accept any incoming connections.
1.40 +
1.41 endpoint, address = self.endpoint.accept()
1.42 self.read_pipe = endpoint.makefile("r", 0)
1.43 self.write_pipe = endpoint.makefile("w", 0)
1.44 @@ -172,25 +180,31 @@
1.45 self.poller.register(fileno, select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR)
1.46
1.47 def _reset_pipes(self):
1.48 +
1.49 + "Discard the existing connection."
1.50 +
1.51 fileno = self.write_pipe.fileno()
1.52 self.poller.unregister(fileno)
1.53 self.read_pipe = None
1.54 self.write_pipe = None
1.55 + self.endpoint.listen(1)
1.56
1.57 def _ensure_communication(self, timeout=None):
1.58
1.59 "Ensure that sending and receiving are possible."
1.60
1.61 while 1:
1.62 + self._ensure_pipes()
1.63 fileno = self.write_pipe.fileno()
1.64 fds = self.poller.poll(timeout)
1.65 for fd, status in fds:
1.66 if fd != fileno:
1.67 continue
1.68 if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
1.69 - #print "Broken connection"
1.70 +
1.71 + # Broken connection: discard it and start all over again.
1.72 +
1.73 self._reset_pipes()
1.74 - self._ensure_pipes()
1.75 break
1.76 else:
1.77 return
1.78 @@ -298,8 +312,9 @@
1.79 Remove the given 'channel' from the exchange.
1.80 """
1.81
1.82 - del self.readables[channel.read_pipe.fileno()]
1.83 - self.poller.unregister(channel.read_pipe.fileno())
1.84 + fileno = channel.read_pipe.fileno()
1.85 + del self.readables[fileno]
1.86 + self.poller.unregister(fileno)
1.87 if self.autoclose:
1.88 channel.close()
1.89 channel.wait()
1.90 @@ -568,10 +583,36 @@
1.91
1.92 def __call__(self, *args, **kw):
1.93
1.94 - "Invoke the callable and discard the result."
1.95 + "Invoke the callable with the supplied arguments."
1.96
1.97 self.exchange.start(self.address, self.callable, *args, **kw)
1.98
1.99 +class BackgroundCallable:
1.100 +
1.101 + """
1.102 + A callable which sets up a persistent communications channel, but is
1.103 + unmanaged by an exchange.
1.104 + """
1.105 +
1.106 + def __init__(self, address, callable):
1.107 +
1.108 + """
1.109 + Using the given 'address', wrap the given 'callable'. This object can
1.110 + then be invoked, but the wrapped callable will be run in a background
1.111 + process. Note that the 'callable' must be parallel-aware (that is, have
1.112 + a 'channel' parameter). Use the MakeParallel class to wrap other kinds
1.113 + of callable objects.
1.114 + """
1.115 +
1.116 + self.callable = callable
1.117 + self.address = address
1.118 +
1.119 + def __call__(self, *args, **kw):
1.120 +
1.121 + "Invoke the callable with the supplied arguments."
1.122 +
1.123 + start_persistent(self.address, self.callable, *args, **kw)
1.124 +
1.125 # Abstractions and utilities.
1.126
1.127 class Map(Exchange):
1.128 @@ -744,6 +785,34 @@
1.129
1.130 pass
1.131
1.132 +# Convenience functions.
1.133 +
1.134 +def BackgroundQueue(address):
1.135 +
1.136 + """
1.137 + Connect to a process reachable via the given 'address', making the results
1.138 + of which accessible via a queue.
1.139 + """
1.140 +
1.141 + queue = PersistentQueue(limit=1)
1.142 + queue.connect(address)
1.143 + return queue
1.144 +
1.145 +def pmap(callable, sequence, limit=None):
1.146 +
1.147 + """
1.148 + A parallel version of the built-in map function with an optional process
1.149 + 'limit'. The given 'callable' should not be parallel-aware (that is, have a
1.150 + 'channel' parameter) since it will be wrapped for parallel communications
1.151 + before being invoked.
1.152 +
1.153 + Return the processed 'sequence' where each element in the sequence is
1.154 + processed by a different process.
1.155 + """
1.156 +
1.157 + mymap = Map(limit=limit)
1.158 + return mymap(callable, sequence)
1.159 +
1.160 # Utility functions.
1.161
1.162 _cpuinfo_fields = "physical id", "core id"
1.163 @@ -783,6 +852,8 @@
1.164 except OSError:
1.165 return None
1.166
1.167 +# Low-level functions.
1.168 +
1.169 def create():
1.170
1.171 """
1.172 @@ -928,19 +999,4 @@
1.173 except OSError:
1.174 pass
1.175
1.176 -def pmap(callable, sequence, limit=None):
1.177 -
1.178 - """
1.179 - A parallel version of the built-in map function with an optional process
1.180 - 'limit'. The given 'callable' should not be parallel-aware (that is, have a
1.181 - 'channel' parameter) since it will be wrapped for parallel communications
1.182 - before being invoked.
1.183 -
1.184 - Return the processed 'sequence' where each element in the sequence is
1.185 - processed by a different process.
1.186 - """
1.187 -
1.188 - mymap = Map(limit=limit)
1.189 - return mymap(callable, sequence)
1.190 -
1.191 # vim: tabstop=4 expandtab shiftwidth=4