1.1 --- a/pprocess.py Sun Sep 16 00:10:27 2007 +0000
1.2 +++ b/pprocess.py Sun Sep 16 16:54:08 2007 +0000
1.3 @@ -367,7 +367,7 @@
1.4 method in order to enable the 'add_wait', 'wait' and 'finish' methods.
1.5 """
1.6
1.7 - def __init__(self, channels=None, limit=None, autoclose=1):
1.8 + def __init__(self, channels=None, limit=None, reuse=0, autoclose=1):
1.9
1.10 """
1.11 Initialise the exchange with an optional list of 'channels'.
1.12 @@ -377,14 +377,18 @@
1.13 and 'finish' methods. To make use of these methods, create a subclass of
1.14 this class and define a working 'store_data' method.
1.15
1.16 + If the optional 'reuse' parameter is set to a true value, channels and
1.17 + processes will be reused for waiting computations.
1.18 +
1.19 If the optional 'autoclose' parameter is set to a false value, channels
1.20 will not be closed automatically when they are removed from the exchange
1.21 - by default they are closed when removed.
1.22 """
1.23
1.24 self.limit = limit
1.25 + self.reuse = reuse
1.26 + self.autoclose = autoclose
1.27 self.waiting = []
1.28 - self.autoclose = autoclose
1.29 self.readables = {}
1.30 self.removed = []
1.31 self.poller = select.poll()
1.32 @@ -471,15 +475,28 @@
1.33 while self.limit is not None and len(self.active()) >= self.limit:
1.34 self.store()
1.35
1.36 - def start_waiting(self):
1.37 + def start_waiting(self, channel):
1.38
1.39 """
1.40 - Start a waiting process.
1.41 + Start a waiting process given the reception of data on the given
1.42 + 'channel'.
1.43 """
1.44
1.45 if self.waiting:
1.46 callable, args, kw = self.waiting.pop()
1.47 - self.add(start(callable, *args, **kw))
1.48 +
1.49 + # Try and reuse existing channels if possible.
1.50 +
1.51 + if self.reuse:
1.52 + channel.send((args, kw))
1.53 + else:
1.54 + self.add(start(callable, *args, **kw))
1.55 +
1.56 + # Where channels are being reused, but where no processes are waiting
1.57 + # any more, send a special value to tell them to quit.
1.58 +
1.59 + elif self.reuse:
1.60 + channel.send(None)
1.61
1.62 def finish(self):
1.63
1.64 @@ -496,7 +513,7 @@
1.65
1.66 for channel in self.ready():
1.67 self.store_data(channel)
1.68 - self.start_waiting()
1.69 + self.start_waiting(channel)
1.70
1.71 def store_data(self, channel):
1.72