# HG changeset patch # User paulb # Date 1212363682 0 # Node ID f79af4b941b88a873f90262e5d82e8a38f9296d9 # Parent 7ffca924cff4746943b2d92cc48f5b1ec722c0d8 [project @ 2008-06-01 23:41:22 by paulb] Updated release information. Fixed connection management in PersistentChannel. Added convenience classes and functions for background processes. diff -r 7ffca924cff4 -r f79af4b941b8 pprocess.py --- a/pprocess.py Sun Jun 01 14:50:46 2008 +0000 +++ b/pprocess.py Sun Jun 01 23:41:22 2008 +0000 @@ -20,7 +20,7 @@ with this program. If not, see . """ -__version__ = "0.3.2" +__version__ = "0.4" import os import sys @@ -145,10 +145,17 @@ self.endpoint = endpoint self.address = address self.poller = select.poll() - self._ensure_pipes() + + # Listen for connections before this process is interested in + # communicating. It is not desirable to wait for connections at this + # point because this will block the process. + + self.endpoint.listen(1) def close(self): - #print "Closing persistent channel" + + "Close the persistent channel and remove the socket file." + Channel.close(self) try: os.unlink(self.address) @@ -160,8 +167,9 @@ "Ensure that the channel is capable of communicating." if self.read_pipe is None or self.write_pipe is None: - #print "Waiting for connection" - self.endpoint.listen(1) + + # Accept any incoming connections. + endpoint, address = self.endpoint.accept() self.read_pipe = endpoint.makefile("r", 0) self.write_pipe = endpoint.makefile("w", 0) @@ -172,25 +180,31 @@ self.poller.register(fileno, select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR) def _reset_pipes(self): + + "Discard the existing connection." + fileno = self.write_pipe.fileno() self.poller.unregister(fileno) self.read_pipe = None self.write_pipe = None + self.endpoint.listen(1) def _ensure_communication(self, timeout=None): "Ensure that sending and receiving are possible." while 1: + self._ensure_pipes() fileno = self.write_pipe.fileno() fds = self.poller.poll(timeout) for fd, status in fds: if fd != fileno: continue if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR): - #print "Broken connection" + + # Broken connection: discard it and start all over again. + self._reset_pipes() - self._ensure_pipes() break else: return @@ -298,8 +312,9 @@ Remove the given 'channel' from the exchange. """ - del self.readables[channel.read_pipe.fileno()] - self.poller.unregister(channel.read_pipe.fileno()) + fileno = channel.read_pipe.fileno() + del self.readables[fileno] + self.poller.unregister(fileno) if self.autoclose: channel.close() channel.wait() @@ -568,10 +583,36 @@ def __call__(self, *args, **kw): - "Invoke the callable and discard the result." + "Invoke the callable with the supplied arguments." self.exchange.start(self.address, self.callable, *args, **kw) +class BackgroundCallable: + + """ + A callable which sets up a persistent communications channel, but is + unmanaged by an exchange. + """ + + def __init__(self, address, callable): + + """ + Using the given 'address', wrap the given 'callable'. This object can + then be invoked, but the wrapped callable will be run in a background + process. Note that the 'callable' must be parallel-aware (that is, have + a 'channel' parameter). Use the MakeParallel class to wrap other kinds + of callable objects. + """ + + self.callable = callable + self.address = address + + def __call__(self, *args, **kw): + + "Invoke the callable with the supplied arguments." + + start_persistent(self.address, self.callable, *args, **kw) + # Abstractions and utilities. class Map(Exchange): @@ -744,6 +785,34 @@ pass +# Convenience functions. + +def BackgroundQueue(address): + + """ + Connect to a process reachable via the given 'address', making the results + of which accessible via a queue. + """ + + queue = PersistentQueue(limit=1) + queue.connect(address) + return queue + +def pmap(callable, sequence, limit=None): + + """ + A parallel version of the built-in map function with an optional process + 'limit'. The given 'callable' should not be parallel-aware (that is, have a + 'channel' parameter) since it will be wrapped for parallel communications + before being invoked. + + Return the processed 'sequence' where each element in the sequence is + processed by a different process. + """ + + mymap = Map(limit=limit) + return mymap(callable, sequence) + # Utility functions. _cpuinfo_fields = "physical id", "core id" @@ -783,6 +852,8 @@ except OSError: return None +# Low-level functions. + def create(): """ @@ -928,19 +999,4 @@ except OSError: pass -def pmap(callable, sequence, limit=None): - - """ - A parallel version of the built-in map function with an optional process - 'limit'. The given 'callable' should not be parallel-aware (that is, have a - 'channel' parameter) since it will be wrapped for parallel communications - before being invoked. - - Return the processed 'sequence' where each element in the sequence is - processed by a different process. - """ - - mymap = Map(limit=limit) - return mymap(callable, sequence) - # vim: tabstop=4 expandtab shiftwidth=4