1.1 --- a/pprocess.py Sun May 17 23:29:54 2009 +0200
1.2 +++ b/pprocess.py Thu May 21 19:24:06 2009 +0200
1.3 @@ -38,6 +38,10 @@
1.4 except NameError:
1.5 from sets import Set as set
1.6
1.7 +# Special values.
1.8 +
1.9 +class Undefined: pass
1.10 +
1.11 # Communications.
1.12
1.13 class AcknowledgementError(Exception):
1.14 @@ -631,6 +635,7 @@
1.15 self.channel_number = 0
1.16 self.channels = {}
1.17 self.results = []
1.18 + self.current_index = 0
1.19
1.20 def add(self, channel):
1.21
1.22 @@ -648,7 +653,7 @@
1.23 process and the created process.
1.24 """
1.25
1.26 - self.results.append(None) # placeholder
1.27 + self.results.append(Undefined) # placeholder
1.28 Exchange.start(self, callable, *args, **kw)
1.29
1.30 def create(self):
1.31 @@ -660,7 +665,7 @@
1.32 this exchange.
1.33 """
1.34
1.35 - self.results.append(None) # placeholder
1.36 + self.results.append(Undefined) # placeholder
1.37 return Exchange.create(self)
1.38
1.39 def __call__(self, callable, sequence):
1.40 @@ -683,14 +688,6 @@
1.41
1.42 return self
1.43
1.44 - def __getitem__(self, i):
1.45 - self.finish()
1.46 - return self.results[i]
1.47 -
1.48 - def __iter__(self):
1.49 - self.finish()
1.50 - return iter(self.results)
1.51 -
1.52 def store_data(self, channel):
1.53
1.54 "Accumulate the incoming data, associating results with channels."
1.55 @@ -699,6 +696,58 @@
1.56 self.results[self.channels[channel]] = data
1.57 del self.channels[channel]
1.58
1.59 + def __iter__(self):
1.60 + return self
1.61 +
1.62 + def next(self):
1.63 +
1.64 + "Return the next element in the map."
1.65 +
1.66 + try:
1.67 + return self._next()
1.68 + except IndexError:
1.69 + pass
1.70 +
1.71 + while self.active():
1.72 + self.store()
1.73 + try:
1.74 + return self._next()
1.75 + except IndexError:
1.76 + pass
1.77 + else:
1.78 + raise StopIteration
1.79 +
1.80 + def __getitem__(self, i):
1.81 +
1.82 + "Return element 'i' from the map."
1.83 +
1.84 + try:
1.85 + return self._get(i)
1.86 + except IndexError:
1.87 + pass
1.88 +
1.89 + while self.active():
1.90 + self.store()
1.91 + try:
1.92 + return self._get(i)
1.93 + except IndexError:
1.94 + pass
1.95 + else:
1.96 + raise IndexError, i
1.97 +
1.98 + # Helper methods for the above access methods.
1.99 +
1.100 + def _next(self):
1.101 + result = self._get(self.current_index)
1.102 + self.current_index += 1
1.103 + return result
1.104 +
1.105 + def _get(self, i):
1.106 + result = self.results[i]
1.107 + if result is Undefined or isinstance(i, slice) and Undefined in result:
1.108 + raise IndexError, i
1.109 + return result
1.110 +
1.111 class Queue(Exchange):
1.112
1.113 """