1.1 --- a/pprocess.py Tue May 13 18:05:42 2008 +0000
1.2 +++ b/pprocess.py Sun Jun 01 14:50:28 2008 +0000
1.3 @@ -57,7 +57,6 @@
1.4 self.pid = pid
1.5 self.read_pipe = read_pipe
1.6 self.write_pipe = write_pipe
1.7 - self.closed = 0
1.8
1.9 def __del__(self):
1.10
1.11 @@ -70,11 +69,13 @@
1.12
1.13 "Explicitly close the channel."
1.14
1.15 - if not self.closed:
1.16 - self.closed = 1
1.17 + if self.read_pipe is not None:
1.18 self.read_pipe.close()
1.19 + self.read_pipe = None
1.20 + if self.write_pipe is not None:
1.21 self.write_pipe.close()
1.22 - #self.wait(os.WNOHANG)
1.23 + self.write_pipe = None
1.24 + #self.wait(os.WNOHANG)
1.25
1.26 def wait(self, options=0):
1.27
1.28 @@ -131,6 +132,83 @@
1.29 finally:
1.30 self._send("OK")
1.31
1.32 +class PersistentChannel(Channel):
1.33 +
1.34 + """
1.35 + A persistent communications channel which can handle peer disconnection,
1.36 + acting as a server, meaning that this channel is associated with a specific
1.37 + address which can be contacted by other processes.
1.38 + """
1.39 +
1.40 + def __init__(self, pid, endpoint, address):
1.41 + Channel.__init__(self, pid, None, None)
1.42 + self.endpoint = endpoint
1.43 + self.address = address
1.44 + self.poller = select.poll()
1.45 + self._ensure_pipes()
1.46 +
1.47 + def close(self):
1.48 + #print "Closing persistent channel"
1.49 + Channel.close(self)
1.50 + try:
1.51 + os.unlink(self.address)
1.52 + except OSError:
1.53 + pass
1.54 +
1.55 + def _ensure_pipes(self):
1.56 +
1.57 + "Ensure that the channel is capable of communicating."
1.58 +
1.59 + if self.read_pipe is None or self.write_pipe is None:
1.60 + #print "Waiting for connection"
1.61 + self.endpoint.listen(1)
1.62 + endpoint, address = self.endpoint.accept()
1.63 + self.read_pipe = endpoint.makefile("r", 0)
1.64 + self.write_pipe = endpoint.makefile("w", 0)
1.65 +
1.66 + # Monitor the write pipe for error conditions.
1.67 +
1.68 + fileno = self.write_pipe.fileno()
1.69 + self.poller.register(fileno, select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR)
1.70 +
1.71 + def _reset_pipes(self):
1.72 + fileno = self.write_pipe.fileno()
1.73 + self.poller.unregister(fileno)
1.74 + self.read_pipe = None
1.75 + self.write_pipe = None
1.76 +
1.77 + def _ensure_communication(self, timeout=None):
1.78 +
1.79 + "Ensure that sending and receiving are possible."
1.80 +
1.81 + while 1:
1.82 + fileno = self.write_pipe.fileno()
1.83 + fds = self.poller.poll(timeout)
1.84 + for fd, status in fds:
1.85 + if fd != fileno:
1.86 + continue
1.87 + if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
1.88 + #print "Broken connection"
1.89 + self._reset_pipes()
1.90 + self._ensure_pipes()
1.91 + break
1.92 + else:
1.93 + return
1.94 +
1.95 + def _send(self, obj):
1.96 +
1.97 + "Send the given object 'obj' through the channel."
1.98 +
1.99 + self._ensure_communication()
1.100 + Channel._send(self, obj)
1.101 +
1.102 + def _receive(self):
1.103 +
1.104 + "Receive an object through the channel, returning the object."
1.105 +
1.106 + self._ensure_communication()
1.107 + return Channel._receive(self)
1.108 +
1.109 # Management of processes and communications.
1.110
1.111 class Exchange:
1.112 @@ -173,8 +251,9 @@
1.113
1.114 "Add the given 'channel' to the exchange."
1.115
1.116 - self.readables[channel.read_pipe.fileno()] = channel
1.117 - self.poller.register(channel.read_pipe.fileno(), select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)
1.118 + fileno = channel.read_pipe.fileno()
1.119 + self.readables[fileno] = channel
1.120 + self.poller.register(fileno, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)
1.121
1.122 def active(self):
1.123
1.124 @@ -249,11 +328,11 @@
1.125 while self.limit is not None and len(self.active()) >= self.limit:
1.126 self.store()
1.127
1.128 - def start_waiting(self, channel):
1.129 + def _get_waiting(self, channel):
1.130
1.131 """
1.132 - Start a waiting process given the reception of data on the given
1.133 - 'channel'.
1.134 + Get waiting callable and argument information for new processes, given
1.135 + the reception of data on the given 'channel'.
1.136 """
1.137
1.138 if self.waiting:
1.139 @@ -269,7 +348,7 @@
1.140 self.add(channel)
1.141 channel.send((args, kw))
1.142 else:
1.143 - self.add(start(callable, *args, **kw))
1.144 + return callable, args, kw
1.145
1.146 # Where channels are being reused, but where no processes are waiting
1.147 # any more, send a special value to tell them to quit.
1.148 @@ -277,6 +356,8 @@
1.149 elif self.reuse:
1.150 channel.send(None)
1.151
1.152 + return None
1.153 +
1.154 def finish(self):
1.155
1.156 """
1.157 @@ -303,18 +384,59 @@
1.158
1.159 raise NotImplementedError, "store_data"
1.160
1.161 + # Support for the convenience methods.
1.162 +
1.163 + def _set_waiting(self, callable, args, kw):
1.164 +
1.165 + """
1.166 + Support process creation by returning whether the given 'callable' has
1.167 + been queued for later invocation.
1.168 + """
1.169 +
1.170 + if self.limit is not None and len(self.active()) >= self.limit:
1.171 + self.waiting.insert(0, (callable, args, kw))
1.172 + return 1
1.173 + else:
1.174 + return 0
1.175 +
1.176 + def _get_channel_for_process(self, channel):
1.177 +
1.178 + """
1.179 + Support process creation by returning the given 'channel' to the
1.180 + created process, and None to the creating process.
1.181 + """
1.182 +
1.183 + if channel.pid == 0:
1.184 + return channel
1.185 + else:
1.186 + self.add_wait(channel)
1.187 + return None
1.188 +
1.189 + # Methods for overriding, related to the convenience methods.
1.190 +
1.191 + def start_waiting(self, channel):
1.192 +
1.193 + """
1.194 + Start a waiting process given the reception of data on the given
1.195 + 'channel'.
1.196 + """
1.197 +
1.198 + details = self._get_waiting(channel)
1.199 + if details is not None:
1.200 + callable, args, kw = details
1.201 + self.add(start(callable, *args, **kw))
1.202 +
1.203 # Convenience methods.
1.204
1.205 def start(self, callable, *args, **kw):
1.206
1.207 """
1.208 - Using pprocess.start, create a new process for the given 'callable'
1.209 - using any additional arguments provided. Then, monitor the channel
1.210 - created between this process and the created process.
1.211 + Create a new process for the given 'callable' using any additional
1.212 + arguments provided. Then, monitor the channel created between this
1.213 + process and the created process.
1.214 """
1.215
1.216 - if self.limit is not None and len(self.active()) >= self.limit:
1.217 - self.waiting.insert(0, (callable, args, kw))
1.218 + if self._set_waiting(callable, args, kw):
1.219 return
1.220
1.221 self.add_wait(start(callable, *args, **kw))
1.222 @@ -322,18 +444,14 @@
1.223 def create(self):
1.224
1.225 """
1.226 - Using pprocess.create, create a new process and return the created
1.227 - communications channel to the created process. In the creating process,
1.228 - return None - the channel receiving data from the created process will
1.229 - be automatically managed by this exchange.
1.230 + Create a new process and return the created communications channel to
1.231 + the created process. In the creating process, return None - the channel
1.232 + receiving data from the created process will be automatically managed by
1.233 + this exchange.
1.234 """
1.235
1.236 channel = create()
1.237 - if channel.pid == 0:
1.238 - return channel
1.239 - else:
1.240 - self.add_wait(channel)
1.241 - return None
1.242 + return self._get_channel_for_process(channel)
1.243
1.244 def manage(self, callable):
1.245
1.246 @@ -345,6 +463,67 @@
1.247
1.248 return ManagedCallable(callable, self)
1.249
1.250 +class Persistent:
1.251 +
1.252 + """
1.253 + A mix-in class providing methods to exchanges for the management of
1.254 + persistent communications.
1.255 + """
1.256 +
1.257 + def start_waiting(self, channel):
1.258 +
1.259 + """
1.260 + Start a waiting process given the reception of data on the given
1.261 + 'channel'.
1.262 + """
1.263 +
1.264 + details = self._get_waiting(channel)
1.265 + if details is not None:
1.266 + callable, args, kw = details
1.267 + self.add(start_persistent(channel.address, callable, *args, **kw))
1.268 +
1.269 + def start(self, address, callable, *args, **kw):
1.270 +
1.271 + """
1.272 + Create a new process, located at the given 'address', for the given
1.273 + 'callable' using any additional arguments provided. The channel is not
1.274 + monitored here; use the connect method with the 'address' to monitor it.
1.275 + """
1.276 +
1.277 + if self._set_waiting(callable, args, kw):
1.278 + return
1.279 +
1.280 + start_persistent(address, callable, *args, **kw)
1.281 +
1.282 + def create(self, address):
1.283 +
1.284 + """
1.285 + Create a new process, located at the given 'address', and return the
1.286 + created communications channel to the created process. In the creating
1.287 + process, return None - the channel receiving data from the created
1.288 + process will be automatically managed by this exchange.
1.289 + """
1.290 +
1.291 + channel = create_persistent(address)
1.292 + return self._get_channel_for_process(channel)
1.293 +
1.294 + def manage(self, address, callable):
1.295 +
1.296 + """
1.297 + Using the given 'address', publish the given 'callable' in an object
1.298 + which can then be called in the same way as 'callable', but with new
1.299 + processes and communications managed automatically.
1.300 + """
1.301 +
1.302 + return PersistentCallable(address, callable, self)
1.303 +
1.304 + def connect(self, address):
1.305 +
1.306 + "Connect to a process which is contactable via the given 'address'."
1.307 +
1.308 + channel = connect_persistent(address)
1.309 + self.add_wait(channel)
1.310 +
1.311 class ManagedCallable:
1.312
1.313 "A callable managed by an exchange."
1.314 @@ -368,6 +547,31 @@
1.315
1.316 self.exchange.start(self.callable, *args, **kw)
1.317
1.318 +class PersistentCallable:
1.319 +
1.320 + "A callable which sets up a persistent communications channel."
1.321 +
1.322 + def __init__(self, address, callable, exchange):
1.323 +
1.324 + """
1.325 + Using the given 'address', wrap the given 'callable', using the given
1.326 + 'exchange' to monitor the channels created for communications between
1.327 + this and the created processes, so that when it is called, a background
1.328 + process is started within which the 'callable' will run. Note that the
1.329 + 'callable' must be parallel-aware (that is, have a 'channel' parameter).
1.330 + Use the MakeParallel class to wrap other kinds of callable objects.
1.331 + """
1.332 +
1.333 + self.callable = callable
1.334 + self.exchange = exchange
1.335 + self.address = address
1.336 +
1.337 + def __call__(self, *args, **kw):
1.338 +
1.339 + "Invoke the callable and discard the result."
1.340 +
1.341 + self.exchange.start(self.address, self.callable, *args, **kw)
1.342 +
1.343 # Abstractions and utilities.
1.344
1.345 class Map(Exchange):
1.346 @@ -397,9 +601,9 @@
1.347 def start(self, callable, *args, **kw):
1.348
1.349 """
1.350 - Using pprocess.start, create a new process for the given 'callable'
1.351 - using any additional arguments provided. Then, monitor the channel
1.352 - created between this process and the created process.
1.353 + Create a new process for the given 'callable' using any additional
1.354 + arguments provided. Then, monitor the channel created between this
1.355 + process and the created process.
1.356 """
1.357
1.358 self.results.append(None) # placeholder
1.359 @@ -408,10 +612,10 @@
1.360 def create(self):
1.361
1.362 """
1.363 - Using pprocess.create, create a new process and return the created
1.364 - communications channel to the created process. In the creating process,
1.365 - return None - the channel receiving data from the created process will
1.366 - be automatically managed by this exchange.
1.367 + Create a new process and return the created communications channel to
1.368 + the created process. In the creating process, return None - the channel
1.369 + receiving data from the created process will be automatically managed by
1.370 + this exchange.
1.371 """
1.372
1.373 self.results.append(None) # placeholder
1.374 @@ -526,6 +730,20 @@
1.375 channel.send(self.callable(*args, **kw))
1.376 t = channel.receive()
1.377
1.378 +# Persistent variants.
1.379 +
1.380 +class PersistentExchange(Persistent, Exchange):
1.381 +
1.382 + "An exchange which manages persistent communications."
1.383 +
1.384 + pass
1.385 +
1.386 +class PersistentQueue(Persistent, Queue):
1.387 +
1.388 + "A queue which manages persistent communications."
1.389 +
1.390 + pass
1.391 +
1.392 # Utility functions.
1.393
1.394 _cpuinfo_fields = "physical id", "core id"
1.395 @@ -584,6 +802,47 @@
1.396 child.close()
1.397 return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))
1.398
1.399 +def create_persistent(address):
1.400 +
1.401 + """
1.402 + Create a new process, returning a persistent communications channel between
1.403 + the creating process and the created process. This channel can be
1.404 + disconnected from the creating process and connected to another process, and
1.405 + thus can be used to collect results from daemon processes.
1.406 +
1.407 + In order to be able to reconnect to created processes, the 'address' of the
1.408 + communications endpoint for the created process needs to be provided. This
1.409 + should be a filename.
1.410 + """
1.411 +
1.412 + parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1.413 + child = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1.414 + child.bind(address)
1.415 +
1.416 + for s in [parent, child]:
1.417 + s.setblocking(1)
1.418 +
1.419 + pid = os.fork()
1.420 + if pid == 0:
1.421 + parent.close()
1.422 + return PersistentChannel(pid, child, address)
1.423 + else:
1.424 + child.close()
1.425 + #parent.connect(address)
1.426 + return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))
1.427 +
1.428 +def connect_persistent(address):
1.429 +
1.430 + """
1.431 + Connect via a persistent channel to an existing created process, reachable
1.432 + at the given 'address'.
1.433 + """
1.434 +
1.435 + parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1.436 + parent.setblocking(1)
1.437 + parent.connect(address)
1.438 + return Channel(0, parent.makefile("r", 0), parent.makefile("w", 0))
1.439 +
1.440 def exit(channel):
1.441
1.442 """
1.443 @@ -618,6 +877,47 @@
1.444 else:
1.445 return channel
1.446
1.447 +def start_persistent(address, callable, *args, **kw):
1.448 +
1.449 + """
1.450 + Create a new process which shall be reachable using the given 'address' and
1.451 + which will start running in the given 'callable'. Additional arguments to
1.452 + the 'callable' can be given as additional arguments to this function.
1.453 +
1.454 + Return a communications channel to the creating process. For the created
1.455 + process, supply a channel as the 'channel' parameter in the given 'callable'
1.456 + so that it may send data back to the creating process.
1.457 +
1.458 + Note that the created process employs a channel which is persistent: it can
1.459 + withstand disconnection from the creating process and subsequent connections
1.460 + from other processes.
1.461 + """
1.462 +
1.463 + channel = create_persistent(address)
1.464 + if channel.pid == 0:
1.465 + close_streams()
1.466 + try:
1.467 + try:
1.468 + callable(channel, *args, **kw)
1.469 + except:
1.470 + exc_type, exc_value, exc_traceback = sys.exc_info()
1.471 + channel.send(exc_value)
1.472 + finally:
1.473 + exit(channel)
1.474 + else:
1.475 + return channel
1.476 +
1.477 +def close_streams():
1.478 +
1.479 + """
1.480 + Close streams which keep the current process attached to any creating
1.481 + processes.
1.482 + """
1.483 +
1.484 + os.close(sys.stdin.fileno())
1.485 + os.close(sys.stdout.fileno())
1.486 + os.close(sys.stderr.fileno())
1.487 +
1.488 def waitall():
1.489
1.490 "Wait for all created processes to terminate."