1.1 --- a/pprocess.py Sun Sep 16 17:58:48 2007 +0000
1.2 +++ b/pprocess.py Sun Sep 16 19:35:34 2007 +0000
1.3 @@ -21,13 +21,6 @@
1.4
1.5 --------
1.6
1.7 -The recommended styles of programming using pprocess involve the "Thread-style
1.8 -Processing", "Convenient Message Exchanges", "Managed Callables" and "Map-style
1.9 -Processing" sections below, although developers may wish to read the "Message
1.10 -Exchanges" section for more details of the API concerned, and the "Fork-style
1.11 -Processing" section may be of interest to those with experience of large scale
1.12 -parallel processing systems.
1.13 -
1.14 Thread-style Processing
1.15 -----------------------
1.16
1.17 @@ -183,6 +176,28 @@
1.18 initiate computations (since a queue is merely an enhanced exchange with a
1.19 specific implementation of the store_data method).
1.20
1.21 +Exchanges as Maps
1.22 +-----------------
1.23 +
1.24 +Where the above Queue class appears to be an attractive solution for the
1.25 +management of the results of computations, but where the order of their
1.26 +consumption by the creating process remains important, the Map class may offer a
1.27 +suitable way of collecting and accessing results:
1.28 +
1.29 +results = pprocess.Map(limit=10)
1.30 +for value in inputs:
1.31 + results.start(fn, value)
1.32 +
1.33 +The results can then be consumed in an order corresponding to the order of the
1.34 +computations which produced them:
1.35 +
1.36 +for result in results:
1.37 + # Process each result.
1.38 +
1.39 +Internally, the Map object records a particular ordering of channels, ensuring
1.40 +that the received results can be mapped to this ordering, and that the results
1.41 +can be made available with this ordering preserved.
1.42 +
1.43 Managed Callables
1.44 -----------------
1.45
1.46 @@ -224,6 +239,40 @@
1.47 to be a parallel-aware function which has a channel argument: the pmap function
1.48 automatically wraps the given callable internally.
1.49
1.50 +Reusing Processes and Channels
1.51 +------------------------------
1.52 +
1.53 +So far, all parallel computations have been done with newly-created processes.
1.54 +However, this can seem somewhat inefficient, especially if processes are being
1.55 +continually created and destroyed (although if this happens too often, the
1.56 +amount of work done by each process may be too little, anyway). One solution is
1.57 +to retain processes after they have done their work and request that they
1.58 +perform more work for each new parallel task or invocation. To enable the reuse
1.59 +of processes in this way, a special keyword argument may be specified when
1.60 +creating Exchange objects (and subclasses such as Map and Queue). For example:
1.61 +
1.62 +exchange = MyExchange(limit=10, reuse=1) # reuse up to 10 processes
1.63 +
1.64 +Code invoked through such exchanges must be aware of channels and be constructed
1.65 +in such a way that it does not terminate after sending a result back to the
1.66 +creating process. Instead, it should repeatedly wait for subsequent sets of
1.67 +parameters (compatible with those either in the signature of a callable or with
1.68 +the original values read from the channel). Reusable code should terminate when
1.69 +it receives the special value None from the creating process instead of a set
1.70 +of parameters: this value indicates that no more parameters will be sent, and
1.71 +the code should then stop waiting and return.
1.72 +
1.73 +Making Existing Functions Parallel and Reusable
1.74 +-----------------------------------------------
1.75 +
1.76 +An easier way of making reusable code sections for parallel use is to employ the
1.77 +MakeReusable class to wrap an existing callable:
1.78 +
1.79 +fn = pprocess.MakeReusable(originalfn)
1.80 +
1.81 +This wraps the callable in a similar fashion to MakeParallel, but provides the
1.82 +necessary mechanisms described above for reusable code.
1.83 +
1.84 Signals and Waiting
1.85 -------------------
1.86
1.87 @@ -488,6 +537,11 @@
1.88 # Try and reuse existing channels if possible.
1.89
1.90 if self.reuse:
1.91 +
1.92 + # Re-add the channel - this may update information related to
1.93 + # the channel in subclasses.
1.94 +
1.95 + self.add(channel)
1.96 channel.send((args, kw))
1.97 else:
1.98 self.add(start(callable, *args, **kw))
1.99 @@ -729,6 +783,24 @@
1.100
1.101 channel.send(self.callable(*args, **kw))
1.102
1.103 +class MakeReusable(MakeParallel):
1.104 +
1.105 + """
1.106 + A wrapper around functions which sends each result over a channel and then
1.107 + waits for further (args, kw) parameters, stopping when None is received.
1.108 + """
1.109 +
1.110 + def __call__(self, channel, *args, **kw):
1.111 +
1.112 + "Invoke the callable, sending each result via 'channel', until None arrives."
1.113 +
1.114 + channel.send(self.callable(*args, **kw))
1.115 + t = channel.receive()
1.116 + while t is not None: # None signals that no more parameters will be sent
1.117 + args, kw = t
1.118 + channel.send(self.callable(*args, **kw))
1.119 + t = channel.receive()
1.120 +
1.121 # Utility functions.
1.122
1.123 def create():