# HG changeset patch # User paulb # Date 1212331828 0 # Node ID 60e9dd6158393aee60a49dcf6cb47728ffa9ab08 # Parent 49b5b0a3fa6144911955c17e70f5d541684868ea [project @ 2008-06-01 14:50:28 by paulb] Added support for persistent/background processes, which involves a certain amount of reorganisation to support different styles of process creation and interprocess communication. diff -r 49b5b0a3fa61 -r 60e9dd615839 README.txt --- a/README.txt Tue May 13 18:05:42 2008 +0000 +++ b/README.txt Sun Jun 01 14:50:28 2008 +0000 @@ -7,6 +7,19 @@ systems with multiple CPUs or multicore CPUs, processes should take advantage of as many CPUs or cores as the operating system permits. +Tutorial +-------- + +The tutorial provides some information about the examples described below. +See the docs/tutorial.xhtml file in the distribution for more details. + +Reference +--------- + +A description of the different mechanisms provided by the pprocess module can +be found in the reference document. See the docs/reference.xhtml file in the +distribution for more details. + Quick Start ----------- @@ -51,11 +64,28 @@ permitting the conversion of "normal" functions to a form usable in the parallel environment. +Reusable Processes +------------------ + An additional example not listed above, simple_managed_map_reusable.py, employs the MakeReusable class instead of MakeParallel in order to demonstrate -reusable processes and channels. +reusable processes and channels: + +PYTHONPATH=. python examples/simple_managed_map_reusable.py + +Persistent Processes +-------------------- -The tutorial provides some information about the examples: docs/tutorial.xhtml +A number of persistent variants of some of the above examples employ a +persistent or background process which can be started by one process and +contacted later by another in order to collect the results of a computation. +For example: + +PYTHONPATH=. python examples/simple_persistent_managed.py --start +PYTHONPATH=. python examples/simple_persistent_managed.py --reconnect + +PYTHONPATH=. python examples/simple_persistent_managed_queue.py --start +PYTHONPATH=. python examples/simple_persistent_managed_queue.py --reconnect Parallel Raytracing with PyGmy ------------------------------ @@ -115,6 +145,7 @@ New in pprocess 0.3.2 (Changes since pprocess 0.3.1) ---------------------------------------------------- + * Added support for persistent/background processes. * Added a utility function to detect and return the number of processor cores available. * Added missing documentation stylesheet. diff -r 49b5b0a3fa61 -r 60e9dd615839 pprocess.py --- a/pprocess.py Tue May 13 18:05:42 2008 +0000 +++ b/pprocess.py Sun Jun 01 14:50:28 2008 +0000 @@ -57,7 +57,6 @@ self.pid = pid self.read_pipe = read_pipe self.write_pipe = write_pipe - self.closed = 0 def __del__(self): @@ -70,11 +69,13 @@ "Explicitly close the channel." - if not self.closed: - self.closed = 1 + if self.read_pipe is not None: self.read_pipe.close() + self.read_pipe = None + if self.write_pipe is not None: self.write_pipe.close() - #self.wait(os.WNOHANG) + self.write_pipe = None + #self.wait(os.WNOHANG) def wait(self, options=0): @@ -131,6 +132,83 @@ finally: self._send("OK") +class PersistentChannel(Channel): + + """ + A persistent communications channel which can handle peer disconnection, + acting as a server, meaning that this channel is associated with a specific + address which can be contacted by other processes. + """ + + def __init__(self, pid, endpoint, address): + Channel.__init__(self, pid, None, None) + self.endpoint = endpoint + self.address = address + self.poller = select.poll() + self._ensure_pipes() + + def close(self): + #print "Closing persistent channel" + Channel.close(self) + try: + os.unlink(self.address) + except OSError: + pass + + def _ensure_pipes(self): + + "Ensure that the channel is capable of communicating." + + if self.read_pipe is None or self.write_pipe is None: + #print "Waiting for connection" + self.endpoint.listen(1) + endpoint, address = self.endpoint.accept() + self.read_pipe = endpoint.makefile("r", 0) + self.write_pipe = endpoint.makefile("w", 0) + + # Monitor the write pipe for error conditions. + + fileno = self.write_pipe.fileno() + self.poller.register(fileno, select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR) + + def _reset_pipes(self): + fileno = self.write_pipe.fileno() + self.poller.unregister(fileno) + self.read_pipe = None + self.write_pipe = None + + def _ensure_communication(self, timeout=None): + + "Ensure that sending and receiving are possible." + + while 1: + fileno = self.write_pipe.fileno() + fds = self.poller.poll(timeout) + for fd, status in fds: + if fd != fileno: + continue + if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR): + #print "Broken connection" + self._reset_pipes() + self._ensure_pipes() + break + else: + return + + def _send(self, obj): + + "Send the given object 'obj' through the channel." + + self._ensure_communication() + Channel._send(self, obj) + + def _receive(self): + + "Receive an object through the channel, returning the object." + + self._ensure_communication() + return Channel._receive(self) + # Management of processes and communications. class Exchange: @@ -173,8 +251,9 @@ "Add the given 'channel' to the exchange." - self.readables[channel.read_pipe.fileno()] = channel - self.poller.register(channel.read_pipe.fileno(), select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR) + fileno = channel.read_pipe.fileno() + self.readables[fileno] = channel + self.poller.register(fileno, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR) def active(self): @@ -249,11 +328,11 @@ while self.limit is not None and len(self.active()) >= self.limit: self.store() - def start_waiting(self, channel): + def _get_waiting(self, channel): """ - Start a waiting process given the reception of data on the given - 'channel'. + Get waiting callable and argument information for new processes, given + the reception of data on the given 'channel'. """ if self.waiting: @@ -269,7 +348,7 @@ self.add(channel) channel.send((args, kw)) else: - self.add(start(callable, *args, **kw)) + return callable, args, kw # Where channels are being reused, but where no processes are waiting # any more, send a special value to tell them to quit. @@ -277,6 +356,8 @@ elif self.reuse: channel.send(None) + return None + def finish(self): """ @@ -303,18 +384,59 @@ raise NotImplementedError, "store_data" + # Support for the convenience methods. + + def _set_waiting(self, callable, args, kw): + + """ + Support process creation by returning whether the given 'callable' has + been queued for later invocation. + """ + + if self.limit is not None and len(self.active()) >= self.limit: + self.waiting.insert(0, (callable, args, kw)) + return 1 + else: + return 0 + + def _get_channel_for_process(self, channel): + + """ + Support process creation by returning the given 'channel' to the + creating process, and None to the created process. + """ + + if channel.pid == 0: + return channel + else: + self.add_wait(channel) + return None + + # Methods for overriding, related to the convenience methods. + + def start_waiting(self, channel): + + """ + Start a waiting process given the reception of data on the given + 'channel'. + """ + + details = self._get_waiting(channel) + if details is not None: + callable, args, kw = details + self.add(start(callable, *args, **kw)) + # Convenience methods. def start(self, callable, *args, **kw): """ - Using pprocess.start, create a new process for the given 'callable' - using any additional arguments provided. Then, monitor the channel - created between this process and the created process. + Create a new process for the given 'callable' using any additional + arguments provided. Then, monitor the channel created between this + process and the created process. """ - if self.limit is not None and len(self.active()) >= self.limit: - self.waiting.insert(0, (callable, args, kw)) + if self._set_waiting(callable, args, kw): return self.add_wait(start(callable, *args, **kw)) @@ -322,18 +444,14 @@ def create(self): """ - Using pprocess.create, create a new process and return the created - communications channel to the created process. In the creating process, - return None - the channel receiving data from the created process will - be automatically managed by this exchange. + Create a new process and return the created communications channel to + the created process. In the creating process, return None - the channel + receiving data from the created process will be automatically managed by + this exchange. """ channel = create() - if channel.pid == 0: - return channel - else: - self.add_wait(channel) - return None + return self._get_channel_for_process(channel) def manage(self, callable): @@ -345,6 +463,67 @@ return ManagedCallable(callable, self) +class Persistent: + + """ + A mix-in class providing methods to exchanges for the management of + persistent communications. + """ + + def start_waiting(self, channel): + + """ + Start a waiting process given the reception of data on the given + 'channel'. + """ + + details = self._get_waiting(channel) + if details is not None: + callable, args, kw = details + self.add(start_persistent(channel.address, callable, *args, **kw)) + + def start(self, address, callable, *args, **kw): + + """ + Create a new process, located at the given 'address', for the given + 'callable' using any additional arguments provided. Then, monitor the + channel created between this process and the created process. + """ + + if self._set_waiting(callable, args, kw): + return + + start_persistent(address, callable, *args, **kw) + + def create(self, address): + + """ + Create a new process, located at the given 'address', and return the + created communications channel to the created process. In the creating + process, return None - the channel receiving data from the created + process will be automatically managed by this exchange. + """ + + channel = create_persistent(address) + return self._get_channel_for_process(channel) + + def manage(self, address, callable): + + """ + Using the given 'address', publish the given 'callable' in an object + which can then be called in the same way as 'callable', but with new + processes and communications managed automatically. + """ + + return PersistentCallable(address, callable, self) + + def connect(self, address): + + "Connect to a process which is contactable via the given 'address'." + + channel = connect_persistent(address) + self.add_wait(channel) + class ManagedCallable: "A callable managed by an exchange." @@ -368,6 +547,31 @@ self.exchange.start(self.callable, *args, **kw) +class PersistentCallable: + + "A callable which sets up a persistent communications channel." + + def __init__(self, address, callable, exchange): + + """ + Using the given 'address', wrap the given 'callable', using the given + 'exchange' to monitor the channels created for communications between + this and the created processes, so that when it is called, a background + process is started within which the 'callable' will run. Note that the + 'callable' must be parallel-aware (that is, have a 'channel' parameter). + Use the MakeParallel class to wrap other kinds of callable objects. + """ + + self.callable = callable + self.exchange = exchange + self.address = address + + def __call__(self, *args, **kw): + + "Invoke the callable and discard the result." + + self.exchange.start(self.address, self.callable, *args, **kw) + # Abstractions and utilities. class Map(Exchange): @@ -397,9 +601,9 @@ def start(self, callable, *args, **kw): """ - Using pprocess.start, create a new process for the given 'callable' - using any additional arguments provided. Then, monitor the channel - created between this process and the created process. + Create a new process for the given 'callable' using any additional + arguments provided. Then, monitor the channel created between this + process and the created process. """ self.results.append(None) # placeholder @@ -408,10 +612,10 @@ def create(self): """ - Using pprocess.create, create a new process and return the created - communications channel to the created process. In the creating process, - return None - the channel receiving data from the created process will - be automatically managed by this exchange. + Create a new process and return the created communications channel to + the created process. In the creating process, return None - the channel + receiving data from the created process will be automatically managed by + this exchange. """ self.results.append(None) # placeholder @@ -526,6 +730,20 @@ channel.send(self.callable(*args, **kw)) t = channel.receive() +# Persistent variants. + +class PersistentExchange(Persistent, Exchange): + + "An exchange which manages persistent communications." + + pass + +class PersistentQueue(Persistent, Queue): + + "A queue which manages persistent communications." + + pass + # Utility functions. _cpuinfo_fields = "physical id", "core id" @@ -584,6 +802,47 @@ child.close() return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0)) +def create_persistent(address): + + """ + Create a new process, returning a persistent communications channel between + the creating process and the created process. This channel can be + disconnected from the creating process and connected to another process, and + thus can be used to collect results from daemon processes. + + In order to be able to reconnect to created processes, the 'address' of the + communications endpoint for the created process needs to be provided. This + should be a filename. + """ + + parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + child = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + child.bind(address) + + for s in [parent, child]: + s.setblocking(1) + + pid = os.fork() + if pid == 0: + parent.close() + return PersistentChannel(pid, child, address) + else: + child.close() + #parent.connect(address) + return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0)) + +def connect_persistent(address): + + """ + Connect via a persistent channel to an existing created process, reachable + at the given 'address'. + """ + + parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + parent.setblocking(1) + parent.connect(address) + return Channel(0, parent.makefile("r", 0), parent.makefile("w", 0)) + def exit(channel): """ @@ -618,6 +877,47 @@ else: return channel +def start_persistent(address, callable, *args, **kw): + + """ + Create a new process which shall be reachable using the given 'address' and + which will start running in the given 'callable'. Additional arguments to + the 'callable' can be given as additional arguments to this function. + + Return a communications channel to the creating process. For the created + process, supply a channel as the 'channel' parameter in the given 'callable' + so that it may send data back to the creating process. + + Note that the created process employs a channel which is persistent: it can + withstand disconnection from the creating process and subsequent connections + from other processes. + """ + + channel = create_persistent(address) + if channel.pid == 0: + close_streams() + try: + try: + callable(channel, *args, **kw) + except: + exc_type, exc_value, exc_traceback = sys.exc_info() + channel.send(exc_value) + finally: + exit(channel) + else: + return channel + +def close_streams(): + + """ + Close streams which keep the current process attached to any creating + processes. + """ + + os.close(sys.stdin.fileno()) + os.close(sys.stdout.fileno()) + os.close(sys.stderr.fileno()) + def waitall(): "Wait for all created processes to terminate."