1.1 --- a/README.txt Sun Sep 16 17:58:48 2007 +0000
1.2 +++ b/README.txt Sun Sep 16 19:35:34 2007 +0000
1.3 @@ -51,6 +51,10 @@
1.4 permitting the conversion of "normal" functions to a form usable in the
1.5 parallel environment.
1.6
1.7 +An additional example not listed above, simple_managed_map_reusable.py,
1.8 +employs the MakeReusable class instead of MakeParallel in order to demonstrate
1.9 +reusable processes and channels.
1.10 +
1.11 The tutorial provides some information about the examples: docs/tutorial.xhtml
1.12
1.13 Parallel Raytracing with PyGmy
1.14 @@ -112,6 +116,9 @@
1.15 automatically managed by the exchange from which they were acquired.
1.16 * Added MakeParallel: a wrapper instantiated around a normal function which
1.17 sends the result of that function over the supplied channel when invoked.
1.18 + * Added MakeReusable: a wrapper like MakeParallel which can be used in
1.19 + conjunction with the newly-added reuse capability of the Exchange class in
1.20 + order to reuse processes and channels.
1.21 * Added a Map class which attempts to emulate the built-in map function,
1.22 along with a pmap function using this class.
1.23 * Added a Queue class which provides a simpler iterator-style interface to
2.1 --- a/pprocess.py Sun Sep 16 17:58:48 2007 +0000
2.2 +++ b/pprocess.py Sun Sep 16 19:35:34 2007 +0000
2.3 @@ -21,13 +21,6 @@
2.4
2.5 --------
2.6
2.7 -The recommended styles of programming using pprocess involve the "Thread-style
2.8 -Processing", "Convenient Message Exchanges", "Managed Callables" and "Map-style
2.9 -Processing" sections below, although developers may wish to read the "Message
2.10 -Exchanges" section for more details of the API concerned, and the "Fork-style
2.11 -Processing" section may be of interest to those with experience of large scale
2.12 -parallel processing systems.
2.13 -
2.14 Thread-style Processing
2.15 -----------------------
2.16
2.17 @@ -183,6 +176,28 @@
2.18 initiate computations (since a queue is merely an enhanced exchange with a
2.19 specific implementation of the store_data method).
2.20
2.21 +Exchanges as Maps
2.22 +-----------------
2.23 +
2.24 +Where the above Queue class appears like an attractive solution for the
2.25 +management of the results of computations, but where the order of their
2.26 +consumption by the creating process remains important, the Map class may offer a
2.27 +suitable way of collecting and accessing results:
2.28 +
2.29 +results = pprocess.Map(limit=10)
2.30 +for value in inputs:
2.31 + results.start(fn, args)
2.32 +
2.33 +The results can then be consumed in an order corresponding to the order of the
2.34 +computations which produced them:
2.35 +
2.36 +for result in results:
2.37 + # Process each result.
2.38 +
2.39 +Internally, the Map object records a particular ordering of channels, ensuring
2.40 +that the received results can be mapped to this ordering, and that the results
2.41 +can be made available with this ordering preserved.
2.42 +
2.43 Managed Callables
2.44 -----------------
2.45
2.46 @@ -224,6 +239,40 @@
2.47 to be a parallel-aware function which has a channel argument: the pmap function
2.48 automatically wraps the given callable internally.
2.49
2.50 +Reusing Processes and Channels
2.51 +------------------------------
2.52 +
2.53 +So far, all parallel computations have been done with newly-created processes.
2.54 +However, this can seem somewhat inefficient, especially if processes are being
2.55 +continually created and destroyed (although if this happens too often, the
2.56 +amount of work done by each process may be too little, anyway). One solution is
2.57 +to retain processes after they have done their work and request that they
2.58 +perform more work for each new parallel task or invocation. To enable the reuse
2.59 +of processes in this way, a special keyword argument may be specified when
2.60 +creating Exchange objects (and subclasses such as Map and Queue). For example:
2.61 +
2.62 +exchange = MyExchange(limit=10, reuse=1) # reuse up to 10 processes
2.63 +
2.64 +Code invoked through such exchanges must be aware of channels and be constructed
2.65 +in such a way that it does not terminate after sending a result back to the
2.66 +creating process. Instead, it should repeatedly wait for subsequent sets of
2.67 +parameters (compatible with those either in the signature of a callable or with
2.68 +the original values read from the channel). Reusable code is terminated when the
2.69 +special value of None is sent from the creating process to the created process,
2.70 +indicating that no more parameters will be sent; this should cause the code to
2.71 +terminate.
2.72 +
2.73 +Making Existing Functions Parallel and Reusable
2.74 +-----------------------------------------------
2.75 +
2.76 +An easier way of making reusable code sections for parallel use is to employ the
2.77 +MakeReusable class to wrap an existing callable:
2.78 +
2.79 +fn = pprocess.MakeReusable(originalfn)
2.80 +
2.81 +This wraps the callable in a similar fashion to MakeParallel, but provides the
2.82 +necessary mechanisms described above for reusable code.
2.83 +
2.84 Signals and Waiting
2.85 -------------------
2.86
2.87 @@ -488,6 +537,11 @@
2.88 # Try and reuse existing channels if possible.
2.89
2.90 if self.reuse:
2.91 +
2.92 + # Re-add the channel - this may update information related to
2.93 + # the channel in subclasses.
2.94 +
2.95 + self.add(channel)
2.96 channel.send((args, kw))
2.97 else:
2.98 self.add(start(callable, *args, **kw))
2.99 @@ -729,6 +783,24 @@
2.100
2.101 channel.send(self.callable(*args, **kw))
2.102
2.103 +class MakeReusable(MakeParallel):
2.104 +
2.105 + """
2.106 + A wrapper around functions making them able to communicate results in a
2.107 + reusable fashion.
2.108 + """
2.109 +
2.110 + def __call__(self, channel, *args, **kw):
2.111 +
2.112 + "Invoke the callable and return its result via the given 'channel'."
2.113 +
2.114 + channel.send(self.callable(*args, **kw))
2.115 + t = channel.receive()
2.116 + while t is not None:
2.117 + args, kw = t
2.118 + channel.send(self.callable(*args, **kw))
2.119 + t = channel.receive()
2.120 +
2.121 # Utility functions.
2.122
2.123 def create():