# HG changeset patch # User paulb # Date 1189885269 0 # Node ID 650cd5af09a5d2386d606cbff5c62cde6c8c50f3 # Parent 4044f325e3e7074be5e43b5df43b8cdc248a1232 [project @ 2007-09-15 19:41:09 by paulb] Made the Map class more usable as a result collector with the usual Exchange methods and managed callables, rather than being called itself and directly providing a result. Added iterator and list-style access through Map objects, rather than returning the results list to a caller. diff -r 4044f325e3e7 -r 650cd5af09a5 pprocess.py --- a/pprocess.py Sat Sep 15 15:57:34 2007 +0000 +++ b/pprocess.py Sat Sep 15 19:41:09 2007 +0000 @@ -578,6 +578,18 @@ "An exchange which can be used like the built-in 'map' function." + def __init__(self, *args, **kw): + Exchange.__init__(self, *args, **kw) + self.init() + + def init(self): + + "Remember the channel addition order to order output." + + self.channel_number = 0 + self.channels = {} + self.results = [] + def add(self, channel): "Add the given 'channel' to the exchange." @@ -586,6 +598,17 @@ self.channels[channel] = self.channel_number self.channel_number += 1 + def start(self, callable, *args, **kw): + + """ + Using pprocess.start, create a new process for the given 'callable' + using any additional arguments provided. Then, monitor the channel + created between this process and the created process. + """ + + self.results.append(None) # placeholder + Exchange.start(self, callable, *args, **kw) + def __call__(self, callable, sequence): "Wrap and invoke 'callable' for each element in the 'sequence'." @@ -595,21 +618,24 @@ else: wrapped = callable - # Remember the channel addition order to order output. + self.init() - self.channel_number = 0 - self.channels = {} - self.results = [] + # Start processes for each element in the sequence. for i in sequence: - self.results.append(None) # placeholder self.start(wrapped, i) - self.finish() + + # Access to the results occurs through this object. + + return self - # NOTE: Could return results as they arrive, but we would then need to - # NOTE: return the position of each result in the original sequence. + def __getitem__(self, i): + self.finish() + return self.results[i] - return self.results + def __iter__(self): + self.finish() + return iter(self.results) def store_data(self, channel):