1.1 --- a/pprocess.py Wed Sep 12 00:14:45 2007 +0000
1.2 +++ b/pprocess.py Thu Sep 13 22:01:39 2007 +0000
1.3 @@ -165,6 +165,17 @@
1.4
1.5 The exchange's finish method can be used as usual to process incoming data.
1.6
1.7 +Making Existing Functions Parallel
1.8 +----------------------------------
1.9 +
1.10 +In making a program parallel, existing functions which only return results can
1.11 +be manually modified to accept and use channels to communicate results back to
1.12 +the main process. However, a simple alternative is to use the MakeParallel class
1.13 +to provide a wrapper around unmodified functions which will return the results
1.14 +from those functions in the channels provided. For example:
1.15 +
1.16 +fn = pprocess.MakeParallel(originalfn)
1.17 +
1.18 Map-style Processing
1.19 --------------------
1.20
1.21 @@ -175,18 +186,9 @@
1.22 pprocess.pmap(fn, sequence)
1.23
1.24 Here, the sequence would have to contain elements that each contain the required
1.25 -parameters of the specified callable, fn.
1.26 -
1.27 -Making Existing Functions Parallel
1.28 -----------------------------------
1.29 -
1.30 -In making a program parallel, existing functions which only return results can
1.31 -be manually modified to accept and use channels to communicate results back to
1.32 -the main process. However, a simple alternative is to use the MakeParallel class
1.33 -to provide a wrapper around unmodified functions which will return the results
1.34 -from those functions in the channels provided. For example:
1.35 -
1.36 -fn = pprocess.MakeParallel(originalfn)
1.37 +parameters of the specified callable, fn. Note that the callable does not need
1.38 +to be a parallel-aware function which has a channel argument: the pmap function
1.39 +automatically wraps the given callable internally.
1.40
1.41 Signals and Waiting
1.42 -------------------
1.43 @@ -513,36 +515,41 @@
1.44 "Add the given 'channel' to the exchange."
1.45
1.46 Exchange.add(self, channel)
1.47 - self.channels.append(channel)
1.48 + self.channels[channel] = self.channel_number
1.49 + self.channel_number += 1
1.50
1.51 def __call__(self, callable, sequence):
1.52
1.53 "Wrap and invoke 'callable' for each element in the 'sequence'."
1.54
1.55 - wrapped = MakeParallel(callable)
1.56 + if not isinstance(callable, MakeParallel):
1.57 + wrapped = MakeParallel(callable)
1.58 + else:
1.59 + wrapped = callable
1.60
1.61 # Remember the channel addition order to order output.
1.62
1.63 - self.channels = []
1.64 - self.results = {}
1.65 + self.channel_number = 0
1.66 + self.channels = {}
1.67 + self.results = []
1.68
1.69 for i in sequence:
1.70 - self.start(wrapped, *i)
1.71 + self.results.append(None) # placeholder
1.72 + self.start(wrapped, i)
1.73 self.finish()
1.74
1.75 - # NOTE: Could use a generator instead.
1.76 + # NOTE: Could return results as they arrive, but we would then need to
1.77 + # NOTE: return the position of each result in the original sequence.
1.78
1.79 - result = []
1.80 - for channel in self.channels:
1.81 - result.append(self.results[channel])
1.82 - return result
1.83 + return self.results
1.84
1.85 def store_data(self, channel):
1.86
1.87 "Accumulate the incoming data, associating results with channels."
1.88
1.89 data = channel.receive()
1.90 - self.results[channel] = data
1.91 + self.results[self.channels[channel]] = data
1.92 + del self.channels[channel]
1.93
1.94 class MakeParallel:
1.95
1.96 @@ -619,7 +626,8 @@
1.97
1.98 """
1.99 A parallel version of the built-in map function with an optional process
1.100 - 'limit'.
1.101 + 'limit'. The given 'callable' need not be parallel-aware since it will be
1.102 + wrapped for parallel communications before invocation.
1.103 """
1.104
1.105 mymap = Map(limit=limit)