2.1 --- a/pprocess.py Tue May 13 18:05:42 2008 +0000
2.2 +++ b/pprocess.py Sun Jun 01 14:50:28 2008 +0000
2.3 @@ -57,7 +57,6 @@
2.4 self.pid = pid
2.5 self.read_pipe = read_pipe
2.6 self.write_pipe = write_pipe
2.7 - self.closed = 0
2.8
2.9 def __del__(self):
2.10
2.11 @@ -70,11 +69,13 @@
2.12
2.13 "Explicitly close the channel."
2.14
2.15 - if not self.closed:
2.16 - self.closed = 1
2.17 + if self.read_pipe is not None:
2.18 self.read_pipe.close()
2.19 + self.read_pipe = None
2.20 + if self.write_pipe is not None:
2.21 self.write_pipe.close()
2.22 - #self.wait(os.WNOHANG)
2.23 + self.write_pipe = None
2.24 + #self.wait(os.WNOHANG)
2.25
2.26 def wait(self, options=0):
2.27
2.28 @@ -131,6 +132,83 @@
2.29 finally:
2.30 self._send("OK")
2.31
2.32 +class PersistentChannel(Channel):
2.33 +
2.34 + """
2.35 + A persistent communications channel which can handle peer disconnection,
2.36 + acting as a server, meaning that this channel is associated with a specific
2.37 + address which can be contacted by other processes.
2.38 + """
2.39 +
2.40 + def __init__(self, pid, endpoint, address):
2.41 + Channel.__init__(self, pid, None, None)
2.42 + self.endpoint = endpoint
2.43 + self.address = address
2.44 + self.poller = select.poll()
2.45 + self._ensure_pipes()
2.46 +
2.47 + def close(self):
2.48 + #print "Closing persistent channel"
2.49 + Channel.close(self)
2.50 + try:
2.51 + os.unlink(self.address)
2.52 + except OSError:
2.53 + pass
2.54 +
2.55 + def _ensure_pipes(self):
2.56 +
2.57 + "Ensure that the channel is capable of communicating."
2.58 +
2.59 + if self.read_pipe is None or self.write_pipe is None:
2.60 + #print "Waiting for connection"
2.61 + self.endpoint.listen(1)
2.62 + endpoint, address = self.endpoint.accept()
2.63 + self.read_pipe = endpoint.makefile("r", 0)
2.64 + self.write_pipe = endpoint.makefile("w", 0)
2.65 +
2.66 + # Monitor the write pipe for error conditions.
2.67 +
2.68 + fileno = self.write_pipe.fileno()
2.69 + self.poller.register(fileno, select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR)
2.70 +
2.71 + def _reset_pipes(self):
2.72 + fileno = self.write_pipe.fileno()
2.73 + self.poller.unregister(fileno)
2.74 + self.read_pipe = None
2.75 + self.write_pipe = None
2.76 +
2.77 + def _ensure_communication(self, timeout=None):
2.78 +
2.79 + "Ensure that sending and receiving are possible."
2.80 +
2.81 + while 1:
2.82 + fileno = self.write_pipe.fileno()
2.83 + fds = self.poller.poll(timeout)
2.84 + for fd, status in fds:
2.85 + if fd != fileno:
2.86 + continue
2.87 + if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
2.88 + #print "Broken connection"
2.89 + self._reset_pipes()
2.90 + self._ensure_pipes()
2.91 + break
2.92 + else:
2.93 + return
2.94 +
2.95 + def _send(self, obj):
2.96 +
2.97 + "Send the given object 'obj' through the channel."
2.98 +
2.99 + self._ensure_communication()
2.100 + Channel._send(self, obj)
2.101 +
2.102 + def _receive(self):
2.103 +
2.104 + "Receive an object through the channel, returning the object."
2.105 +
2.106 + self._ensure_communication()
2.107 + return Channel._receive(self)
2.108 +
2.109 # Management of processes and communications.
2.110
2.111 class Exchange:
2.112 @@ -173,8 +251,9 @@
2.113
2.114 "Add the given 'channel' to the exchange."
2.115
2.116 - self.readables[channel.read_pipe.fileno()] = channel
2.117 - self.poller.register(channel.read_pipe.fileno(), select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)
2.118 + fileno = channel.read_pipe.fileno()
2.119 + self.readables[fileno] = channel
2.120 + self.poller.register(fileno, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)
2.121
2.122 def active(self):
2.123
2.124 @@ -249,11 +328,11 @@
2.125 while self.limit is not None and len(self.active()) >= self.limit:
2.126 self.store()
2.127
2.128 - def start_waiting(self, channel):
2.129 + def _get_waiting(self, channel):
2.130
2.131 """
2.132 - Start a waiting process given the reception of data on the given
2.133 - 'channel'.
2.134 + Get waiting callable and argument information for new processes, given
2.135 + the reception of data on the given 'channel'.
2.136 """
2.137
2.138 if self.waiting:
2.139 @@ -269,7 +348,7 @@
2.140 self.add(channel)
2.141 channel.send((args, kw))
2.142 else:
2.143 - self.add(start(callable, *args, **kw))
2.144 + return callable, args, kw
2.145
2.146 # Where channels are being reused, but where no processes are waiting
2.147 # any more, send a special value to tell them to quit.
2.148 @@ -277,6 +356,8 @@
2.149 elif self.reuse:
2.150 channel.send(None)
2.151
2.152 + return None
2.153 +
2.154 def finish(self):
2.155
2.156 """
2.157 @@ -303,18 +384,59 @@
2.158
2.159 raise NotImplementedError, "store_data"
2.160
2.161 + # Support for the convenience methods.
2.162 +
2.163 + def _set_waiting(self, callable, args, kw):
2.164 +
2.165 + """
2.166 + Support process creation by returning whether the given 'callable' has
2.167 + been queued for later invocation.
2.168 + """
2.169 +
2.170 + if self.limit is not None and len(self.active()) >= self.limit:
2.171 + self.waiting.insert(0, (callable, args, kw))
2.172 + return 1
2.173 + else:
2.174 + return 0
2.175 +
2.176 + def _get_channel_for_process(self, channel):
2.177 +
2.178 + """
2.179 + Support process creation by returning the given 'channel' to the
2.180 + created process, and None to the creating process.
2.181 + """
2.182 +
2.183 + if channel.pid == 0:
2.184 + return channel
2.185 + else:
2.186 + self.add_wait(channel)
2.187 + return None
2.188 +
2.189 + # Methods for overriding, related to the convenience methods.
2.190 +
2.191 + def start_waiting(self, channel):
2.192 +
2.193 + """
2.194 + Start a waiting process given the reception of data on the given
2.195 + 'channel'.
2.196 + """
2.197 +
2.198 + details = self._get_waiting(channel)
2.199 + if details is not None:
2.200 + callable, args, kw = details
2.201 + self.add(start(callable, *args, **kw))
2.202 +
2.203 # Convenience methods.
2.204
2.205 def start(self, callable, *args, **kw):
2.206
2.207 """
2.208 - Using pprocess.start, create a new process for the given 'callable'
2.209 - using any additional arguments provided. Then, monitor the channel
2.210 - created between this process and the created process.
2.211 + Create a new process for the given 'callable' using any additional
2.212 + arguments provided. Then, monitor the channel created between this
2.213 + process and the created process.
2.214 """
2.215
2.216 - if self.limit is not None and len(self.active()) >= self.limit:
2.217 - self.waiting.insert(0, (callable, args, kw))
2.218 + if self._set_waiting(callable, args, kw):
2.219 return
2.220
2.221 self.add_wait(start(callable, *args, **kw))
2.222 @@ -322,18 +444,14 @@
2.223 def create(self):
2.224
2.225 """
2.226 - Using pprocess.create, create a new process and return the created
2.227 - communications channel to the created process. In the creating process,
2.228 - return None - the channel receiving data from the created process will
2.229 - be automatically managed by this exchange.
2.230 + Create a new process and return the created communications channel to
2.231 + the created process. In the creating process, return None - the channel
2.232 + receiving data from the created process will be automatically managed by
2.233 + this exchange.
2.234 """
2.235
2.236 channel = create()
2.237 - if channel.pid == 0:
2.238 - return channel
2.239 - else:
2.240 - self.add_wait(channel)
2.241 - return None
2.242 + return self._get_channel_for_process(channel)
2.243
2.244 def manage(self, callable):
2.245
2.246 @@ -345,6 +463,67 @@
2.247
2.248 return ManagedCallable(callable, self)
2.249
2.250 +class Persistent:
2.251 +
2.252 + """
2.253 + A mix-in class providing methods to exchanges for the management of
2.254 + persistent communications.
2.255 + """
2.256 +
2.257 + def start_waiting(self, channel):
2.258 +
2.259 + """
2.260 + Start a waiting process given the reception of data on the given
2.261 + 'channel'.
2.262 + """
2.263 +
2.264 + details = self._get_waiting(channel)
2.265 + if details is not None:
2.266 + callable, args, kw = details
2.267 + self.add(start_persistent(channel.address, callable, *args, **kw))
2.268 +
2.269 + def start(self, address, callable, *args, **kw):
2.270 +
2.271 + """
2.272 + Create a new process, located at the given 'address', for the given
2.273 + 'callable' using any additional arguments provided. The channel is not
2.274 + monitored by this call; use 'connect' with the 'address' to monitor it.
2.275 + """
2.276 +
2.277 + if self._set_waiting(callable, args, kw):
2.278 + return
2.279 +
2.280 + start_persistent(address, callable, *args, **kw)
2.281 +
2.282 + def create(self, address):
2.283 +
2.284 + """
2.285 + Create a new process, located at the given 'address', and return the
2.286 + created communications channel to the created process. In the creating
2.287 + process, return None - the channel receiving data from the created
2.288 + process will be automatically managed by this exchange.
2.289 + """
2.290 +
2.291 + channel = create_persistent(address)
2.292 + return self._get_channel_for_process(channel)
2.293 +
2.294 + def manage(self, address, callable):
2.295 +
2.296 + """
2.297 + Using the given 'address', publish the given 'callable' in an object
2.298 + which can then be called in the same way as 'callable', but with new
2.299 + processes and communications managed automatically.
2.300 + """
2.301 +
2.302 + return PersistentCallable(address, callable, self)
2.303 +
2.304 + def connect(self, address):
2.305 +
2.306 + "Connect to a process which is contactable via the given 'address'."
2.307 +
2.308 + channel = connect_persistent(address)
2.309 + self.add_wait(channel)
2.310 +
2.311 class ManagedCallable:
2.312
2.313 "A callable managed by an exchange."
2.314 @@ -368,6 +547,31 @@
2.315
2.316 self.exchange.start(self.callable, *args, **kw)
2.317
2.318 +class PersistentCallable:
2.319 +
2.320 + "A callable which sets up a persistent communications channel."
2.321 +
2.322 + def __init__(self, address, callable, exchange):
2.323 +
2.324 + """
2.325 + Using the given 'address', wrap the given 'callable', using the given
2.326 + 'exchange' to monitor the channels created for communications between
2.327 + this and the created processes, so that when it is called, a background
2.328 + process is started within which the 'callable' will run. Note that the
2.329 + 'callable' must be parallel-aware (that is, have a 'channel' parameter).
2.330 + Use the MakeParallel class to wrap other kinds of callable objects.
2.331 + """
2.332 +
2.333 + self.callable = callable
2.334 + self.exchange = exchange
2.335 + self.address = address
2.336 +
2.337 + def __call__(self, *args, **kw):
2.338 +
2.339 + "Invoke the callable and discard the result."
2.340 +
2.341 + self.exchange.start(self.address, self.callable, *args, **kw)
2.342 +
2.343 # Abstractions and utilities.
2.344
2.345 class Map(Exchange):
2.346 @@ -397,9 +601,9 @@
2.347 def start(self, callable, *args, **kw):
2.348
2.349 """
2.350 - Using pprocess.start, create a new process for the given 'callable'
2.351 - using any additional arguments provided. Then, monitor the channel
2.352 - created between this process and the created process.
2.353 + Create a new process for the given 'callable' using any additional
2.354 + arguments provided. Then, monitor the channel created between this
2.355 + process and the created process.
2.356 """
2.357
2.358 self.results.append(None) # placeholder
2.359 @@ -408,10 +612,10 @@
2.360 def create(self):
2.361
2.362 """
2.363 - Using pprocess.create, create a new process and return the created
2.364 - communications channel to the created process. In the creating process,
2.365 - return None - the channel receiving data from the created process will
2.366 - be automatically managed by this exchange.
2.367 + Create a new process and return the created communications channel to
2.368 + the created process. In the creating process, return None - the channel
2.369 + receiving data from the created process will be automatically managed by
2.370 + this exchange.
2.371 """
2.372
2.373 self.results.append(None) # placeholder
2.374 @@ -526,6 +730,20 @@
2.375 channel.send(self.callable(*args, **kw))
2.376 t = channel.receive()
2.377
2.378 +# Persistent variants.
2.379 +
2.380 +class PersistentExchange(Persistent, Exchange):
2.381 +
2.382 + "An exchange which manages persistent communications."
2.383 +
2.384 + pass
2.385 +
2.386 +class PersistentQueue(Persistent, Queue):
2.387 +
2.388 + "A queue which manages persistent communications."
2.389 +
2.390 + pass
2.391 +
2.392 # Utility functions.
2.393
2.394 _cpuinfo_fields = "physical id", "core id"
2.395 @@ -584,6 +802,47 @@
2.396 child.close()
2.397 return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))
2.398
2.399 +def create_persistent(address):
2.400 +
2.401 + """
2.402 + Create a new process, returning a persistent communications channel between
2.403 + the creating process and the created process. This channel can be
2.404 + disconnected from the creating process and connected to another process, and
2.405 + thus can be used to collect results from daemon processes.
2.406 +
2.407 + In order to be able to reconnect to created processes, the 'address' of the
2.408 + communications endpoint for the created process needs to be provided. This
2.409 + should be a filename.
2.410 + """
2.411 +
2.412 + parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
2.413 + child = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
2.414 + child.bind(address)
2.415 +
2.416 + for s in [parent, child]:
2.417 + s.setblocking(1)
2.418 +
2.419 + pid = os.fork()
2.420 + if pid == 0:
2.421 + parent.close()
2.422 + return PersistentChannel(pid, child, address)
2.423 + else:
2.424 + child.close()
2.425 + #parent.connect(address)
2.426 + return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))
2.427 +
2.428 +def connect_persistent(address):
2.429 +
2.430 + """
2.431 + Connect via a persistent channel to an existing created process, reachable
2.432 + at the given 'address'.
2.433 + """
2.434 +
2.435 + parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
2.436 + parent.setblocking(1)
2.437 + parent.connect(address)
2.438 + return Channel(0, parent.makefile("r", 0), parent.makefile("w", 0))
2.439 +
2.440 def exit(channel):
2.441
2.442 """
2.443 @@ -618,6 +877,47 @@
2.444 else:
2.445 return channel
2.446
2.447 +def start_persistent(address, callable, *args, **kw):
2.448 +
2.449 + """
2.450 + Create a new process which shall be reachable using the given 'address' and
2.451 + which will start running in the given 'callable'. Additional arguments to
2.452 + the 'callable' can be given as additional arguments to this function.
2.453 +
2.454 + Return a communications channel to the creating process. For the created
2.455 + process, supply a channel as the 'channel' parameter in the given 'callable'
2.456 + so that it may send data back to the creating process.
2.457 +
2.458 + Note that the created process employs a channel which is persistent: it can
2.459 + withstand disconnection from the creating process and subsequent connections
2.460 + from other processes.
2.461 + """
2.462 +
2.463 + channel = create_persistent(address)
2.464 + if channel.pid == 0:
2.465 + close_streams()
2.466 + try:
2.467 + try:
2.468 + callable(channel, *args, **kw)
2.469 + except:
2.470 + exc_type, exc_value, exc_traceback = sys.exc_info()
2.471 + channel.send(exc_value)
2.472 + finally:
2.473 + exit(channel)
2.474 + else:
2.475 + return channel
2.476 +
2.477 +def close_streams():
2.478 +
2.479 + """
2.480 + Close streams which keep the current process attached to any creating
2.481 + processes.
2.482 + """
2.483 +
2.484 + os.close(sys.stdin.fileno())
2.485 + os.close(sys.stdout.fileno())
2.486 + os.close(sys.stderr.fileno())
2.487 +
2.488 def waitall():
2.489
2.490 "Wait for all created processes to terminate."