# HG changeset patch # User paulb # Date 1189961648 0 # Node ID 905d8ec730036039aaf30fff4a8eb6d847c4b875 # Parent 2160dc40d9d9efc9d86568349d3b5a4a0cc57e53 [project @ 2007-09-16 16:54:08 by paulb] Added channel/process reuse. diff -r 2160dc40d9d9 -r 905d8ec73003 pprocess.py --- a/pprocess.py Sun Sep 16 00:10:27 2007 +0000 +++ b/pprocess.py Sun Sep 16 16:54:08 2007 +0000 @@ -367,7 +367,7 @@ method in order to enable the 'add_wait', 'wait' and 'finish' methods. """ - def __init__(self, channels=None, limit=None, autoclose=1): + def __init__(self, channels=None, limit=None, reuse=0, autoclose=1): """ Initialise the exchange with an optional list of 'channels'. @@ -377,14 +377,18 @@ and 'finish' methods. To make use of these methods, create a subclass of this class and define a working 'store_data' method. + If the optional 'reuse' parameter is set to a true value, channels and + processes will be reused for waiting computations. + If the optional 'autoclose' parameter is set to a false value, channels will not be closed automatically when they are removed from the exchange - by default they are closed when removed. """ self.limit = limit + self.reuse = reuse + self.autoclose = autoclose self.waiting = [] - self.autoclose = autoclose self.readables = {} self.removed = [] self.poller = select.poll() @@ -471,15 +475,28 @@ while self.limit is not None and len(self.active()) >= self.limit: self.store() - def start_waiting(self): + def start_waiting(self, channel): """ - Start a waiting process. + Start a waiting process given the reception of data on the given + 'channel'. """ if self.waiting: callable, args, kw = self.waiting.pop() - self.add(start(callable, *args, **kw)) + + # Try and reuse existing channels if possible. + + if self.reuse: + channel.send((args, kw)) + else: + self.add(start(callable, *args, **kw)) + + # Where channels are being reused, but where no processes are waiting + # any more, send a special value to tell them to quit. + + elif self.reuse: + channel.send(None) def finish(self): @@ -496,7 +513,7 @@ for channel in self.ready(): self.store_data(channel) - self.start_waiting() + self.start_waiting(channel) def store_data(self, channel):