# HG changeset patch # User paulb # Date 1189815083 0 # Node ID 36f4afc7748a127b82f4dc622d89fe77a796c830 # Parent d78115b759e5b54523d88b9a0ff50b99662748fb [project @ 2007-09-15 00:11:23 by paulb] Added a create method on exchanges, an exit function on the module, and a Queue class for more convenient result consumption. diff -r d78115b759e5 -r 36f4afc7748a pprocess.py --- a/pprocess.py Fri Sep 14 23:16:24 2007 +0000 +++ b/pprocess.py Sat Sep 15 00:11:23 2007 +0000 @@ -52,7 +52,7 @@ (ie. the fork system call on various operating systems), use the following method: -channel = create() +channel = pprocess.create() if channel.pid == 0: # This code is run by the created process. # Read from and write to the channel to communicate with the @@ -60,6 +60,7 @@ # An explicit exit of the process may be desirable to prevent the process # from running code which is intended for the creating/calling process. ... + pprocess.exit(channel) else: # This code is run by the creating/calling process. # Read from and write to the channel to communicate with the created @@ -130,6 +131,10 @@ exchange.add_wait(channel) # add a channel, waiting if the limit would be # exceeded +Or we can request that the exchange create a channel on our behalf: + +channel = exchange.create() + We can even start processes and monitor channels without ever handling the channel ourselves: @@ -149,6 +154,35 @@ exchange API as described above. However, it permits much simpler and clearer code. +Exchanges as Queues +------------------- + +Instead of having to subclass the pprocess.Exchange class and to define the +store_data method, it might be more desirable to let the exchange manage the +communications between created and creating processes and to let the creating +process just consume received data as it arrives, without particular regard for +the order of the received data - perhaps the creating process has its own way of +managing such issues. + +For such situations, the Queue class may be instantiated and channels added to +the queue using the various methods provided: + +queue = pprocess.Queue(limit=10) +channel = queue.create() +if channel: + # Do some computation. + pprocess.exit(channel) + +The results can then be consumed by treating the queue like an iterator: + +for result in queue: + # Capture each result. + +This approach does not, of course, require the direct handling of channels. One +could instead use the start method on the queue to create processes and to +initiate computations (since a queue is merely an enhanced exchange with a +specific implementation of the store_data method). + Managed Callables ----------------- @@ -473,6 +507,22 @@ self.add_wait(start(callable, *args, **kw)) + def create(self): + + """ + Using pprocess.create, create a new process and return the created + communications channel to the created process. In the creating process, + return None - the channel receiving data from the created process will + be automatically managed by this exchange. + """ + + channel = create() + if channel.pid == 0: + return channel + else: + self.add_wait(channel) + return None + def manage(self, callable): """ @@ -553,6 +603,40 @@ self.results[self.channels[channel]] = data del self.channels[channel] +class Queue(Exchange): + + """ + An exchange acting as a queue, making data from created processes available + in the order in which it is received. + """ + + def __init__(self, *args, **kw): + Exchange.__init__(self, *args, **kw) + self.queue = [] + + def store_data(self, channel): + + "Accumulate the incoming data, associating results with channels." + + data = channel.receive() + self.queue.insert(0, data) + + def __iter__(self): + return self + + def next(self): + + "Return the next element in the queue." + + if self.queue: + return self.queue.pop() + while self.active(): + self.store() + if self.queue: + return self.queue.pop() + else: + raise StopIteration + class MakeParallel: "A wrapper around functions making them able to communicate results." @@ -595,6 +679,15 @@ child.close() return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0)) +def exit(channel): + + """ + Terminate a created process, closing the given 'channel'. + """ + + channel.close() + os._exit(0) + def start(callable, *args, **kw): """ @@ -616,8 +709,7 @@ exc_type, exc_value, exc_traceback = sys.exc_info() channel.send(exc_value) finally: - channel.close() - os._exit(0) + pprocess.exit(channel) else: return channel