# HG changeset patch # User paulb # Date 1189720899 0 # Node ID 643acf9a0427f21f8f7a156144da5ea68ef2da9c # Parent 6a6c183df04aaba014159768139ada0aadaa23d6 [project @ 2007-09-13 22:01:39 by paulb] Improved the Map class so that channels are not held indefinitely in a dictionary - instead, each channel is associated with the position of the input value in the supplied sequence for which it was created, and result values are then placed directly in a pre-allocated sequence using this position number. Fixed the invocation of callables in the Map class, passing each input value as a single parameter (as map does). Made notes in the module docstring about pmap and unwrapped callables. diff -r 6a6c183df04a -r 643acf9a0427 pprocess.py --- a/pprocess.py Wed Sep 12 00:14:45 2007 +0000 +++ b/pprocess.py Thu Sep 13 22:01:39 2007 +0000 @@ -165,6 +165,17 @@ The exchange's finish method can be used as usual to process incoming data. +Making Existing Functions Parallel +---------------------------------- + +In making a program parallel, existing functions which only return results can +be manually modified to accept and use channels to communicate results back to +the main process. However, a simple alternative is to use the MakeParallel class +to provide a wrapper around unmodified functions which will return the results +from those functions in the channels provided. For example: + +fn = pprocess.MakeParallel(originalfn) + Map-style Processing -------------------- @@ -175,18 +186,9 @@ pprocess.pmap(fn, sequence) Here, the sequence would have to contain elements that each contain the required -parameters of the specified callable, fn. - -Making Existing Functions Parallel ----------------------------------- - -In making a program parallel, existing functions which only return results can -be manually modified to accept and use channels to communicate results back to -the main process. However, a simple alternative is to use the MakeParallel class -to provide a wrapper around unmodified functions which will return the results -from those functions in the channels provided. For example: - -fn = pprocess.MakeParallel(originalfn) +parameters of the specified callable, fn. Note that the callable does not need +to be a parallel-aware function which has a channel argument: the pmap function +automatically wraps the given callable internally. Signals and Waiting ------------------- @@ -513,36 +515,41 @@ "Add the given 'channel' to the exchange." Exchange.add(self, channel) - self.channels.append(channel) + self.channels[channel] = self.channel_number + self.channel_number += 1 def __call__(self, callable, sequence): "Wrap and invoke 'callable' for each element in the 'sequence'." - wrapped = MakeParallel(callable) + if not isinstance(callable, MakeParallel): + wrapped = MakeParallel(callable) + else: + wrapped = callable # Remember the channel addition order to order output. - self.channels = [] - self.results = {} + self.channel_number = 0 + self.channels = {} + self.results = [] for i in sequence: - self.start(wrapped, *i) + self.results.append(None) # placeholder + self.start(wrapped, i) self.finish() - # NOTE: Could use a generator instead. + # NOTE: Could return results as they arrive, but we would then need to + # NOTE: return the position of each result in the original sequence. - result = [] - for channel in self.channels: - result.append(self.results[channel]) - return result + return self.results def store_data(self, channel): "Accumulate the incoming data, associating results with channels." data = channel.receive() - self.results[channel] = data + self.results[self.channels[channel]] = data + del self.channels[channel] class MakeParallel: @@ -619,7 +626,8 @@ """ A parallel version of the built-in map function with an optional process - 'limit'. + 'limit'. The given 'callable' need not be parallel-aware since it will be + wrapped for parallel communications before invocation. """ mymap = Map(limit=limit)