# HG changeset patch
# User paulb
# Date 1189971334 0
# Node ID 9abc49db3422ae09d2fad7ae0a68f83577e57e93
# Parent 3c743856195c42e46f0d15f40c0b3556f7d83351
[project @ 2007-09-16 19:35:34 by paulb]
Added MakeReusable and made sure that the initiation of waiting invocations causes reused channels to be re-added (thus letting reusable processes work with the Map class).
Updated documentation and release information.

diff -r 3c743856195c -r 9abc49db3422 README.txt
--- a/README.txt Sun Sep 16 17:58:48 2007 +0000
+++ b/README.txt Sun Sep 16 19:35:34 2007 +0000
@@ -51,6 +51,10 @@
 permitting the conversion of "normal" functions to a form usable in the parallel
 environment.
 
+An additional example not listed above, simple_managed_map_reusable.py,
+employs the MakeReusable class instead of MakeParallel in order to demonstrate
+reusable processes and channels.
+
 The tutorial provides some information about the examples: docs/tutorial.xhtml
 
 Parallel Raytracing with PyGmy
@@ -112,6 +116,9 @@
 automatically managed by the exchange from which they were acquired.
   * Added MakeParallel: a wrapper instantiated around a normal function which
     sends the result of that function over the supplied channel when invoked.
+  * Added MakeReusable: a wrapper like MakeParallel which can be used in
+    conjunction with the newly-added reuse capability of the Exchange class in
+    order to reuse processes and channels.
   * Added a Map class which attempts to emulate the built-in map function, along
     with a pmap function using this class.
   * Added a Queue class which provides a simpler iterator-style interface to
diff -r 3c743856195c -r 9abc49db3422 pprocess.py
--- a/pprocess.py Sun Sep 16 17:58:48 2007 +0000
+++ b/pprocess.py Sun Sep 16 19:35:34 2007 +0000
@@ -21,13 +21,6 @@
 
 --------
 
-The recommended styles of programming using pprocess involve the "Thread-style
-Processing", "Convenient Message Exchanges", "Managed Callables" and "Map-style
-Processing" sections below, although developers may wish to read the "Message
-Exchanges" section for more details of the API concerned, and the "Fork-style
-Processing" section may be of interest to those with experience of large scale
-parallel processing systems.
-
 Thread-style Processing
 -----------------------
 
@@ -183,6 +176,28 @@
 initiate computations (since a queue is merely an enhanced exchange with a
 specific implementation of the store_data method).
 
+Exchanges as Maps
+-----------------
+
+Where the above Queue class appears like an attractive solution for the
+management of the results of computations, but where the order of their
+consumption by the creating process remains important, the Map class may offer a
+suitable way of collecting and accessing results:
+
+results = pprocess.Map(limit=10)
+for value in inputs:
+    results.start(fn, args)
+
+The results can then be consumed in an order corresponding to the order of the
+computations which produced them:
+
+for result in results:
+    # Process each result.
+
+Internally, the Map object records a particular ordering of channels, ensuring
+that the received results can be mapped to this ordering, and that the results
+can be made available with this ordering preserved.
+
 Managed Callables
 -----------------
 
@@ -224,6 +239,40 @@
 to be a parallel-aware function which has a channel argument: the pmap function
 automatically wraps the given callable internally.
 
+Reusing Processes and Channels
+------------------------------
+
+So far, all parallel computations have been done with newly-created processes.
+However, this can seem somewhat inefficient, especially if processes are being
+continually created and destroyed (although if this happens too often, the
+amount of work done by each process may be too little, anyway). One solution is
+to retain processes after they have done their work and request that they
+perform more work for each new parallel task or invocation. To enable the reuse
+of processes in this way, a special keyword argument may be specified when
+creating Exchange objects (and subclasses such as Map and Queue). For example:
+
+exchange = MyExchange(limit=10, reuse=1) # reuse up to 10 processes
+
+Code invoked through such exchanges must be aware of channels and be constructed
+in such a way that it does not terminate after sending a result back to the
+creating process. Instead, it should repeatedly wait for subsequent sets of
+parameters (compatible with those either in the signature of a callable or with
+the original values read from the channel). Reusable code is terminated when the
+special value of None is sent from the creating process to the created process,
+indicating that no more parameters will be sent; this should cause the code to
+terminate.
+
+Making Existing Functions Parallel and Reusable
+-----------------------------------------------
+
+An easier way of making reusable code sections for parallel use is to employ the
+MakeReusable class to wrap an existing callable:
+
+fn = pprocess.MakeReusable(originalfn)
+
+This wraps the callable in a similar fashion to MakeParallel, but provides the
+necessary mechanisms described above for reusable code.
+
 Signals and Waiting
 -------------------
 
@@ -488,6 +537,11 @@
             # Try and reuse existing channels if possible.
 
             if self.reuse:
+
+                # Re-add the channel - this may update information related to
+                # the channel in subclasses.
+
+                self.add(channel)
                 channel.send((args, kw))
             else:
                 self.add(start(callable, *args, **kw))
@@ -729,6 +783,24 @@
 
         channel.send(self.callable(*args, **kw))
 
+class MakeReusable(MakeParallel):
+
+    """
+    A wrapper around functions making them able to communicate results in a
+    reusable fashion.
+    """
+
+    def __call__(self, channel, *args, **kw):
+
+        "Invoke the callable and return its result via the given 'channel'."
+
+        channel.send(self.callable(*args, **kw))
+        t = channel.receive()
+        while t is not None:
+            args, kw = t
+            channel.send(self.callable(*args, **kw))
+            t = channel.receive()
+
 # Utility functions.
 
 def create():
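
Note on the patch: the reuse protocol added in MakeReusable.__call__ (send a result, wait for the next (args, kw) pair, stop when None arrives) can be tried out without pprocess itself. The following is only a minimal sketch under stated assumptions: SketchChannel, ReusableSketch and run_demo are hypothetical names that are not part of pprocess, a thread and two queues stand in for a created process and its channel, and the code is written for Python 3 rather than the Python 2 of the patch.

```python
import queue
import threading


class SketchChannel:
    """Hypothetical in-process stand-in for a channel; not the pprocess API."""

    def __init__(self, inbox, outbox):
        self.inbox = inbox
        self.outbox = outbox

    def send(self, obj):
        self.outbox.put(obj)

    def receive(self):
        return self.inbox.get()


class ReusableSketch:
    """Mirrors the loop in the patch's MakeReusable.__call__ (assumed stand-in)."""

    def __init__(self, callable_):
        self.callable = callable_

    def __call__(self, channel, *args, **kw):
        # Send the first result, then keep serving parameter sets until the
        # creating side sends None to indicate that no more work will follow.
        channel.send(self.callable(*args, **kw))
        t = channel.receive()
        while t is not None:
            args, kw = t
            channel.send(self.callable(*args, **kw))
            t = channel.receive()


def run_demo():
    to_worker, to_creator = queue.Queue(), queue.Queue()
    creator_end = SketchChannel(inbox=to_creator, outbox=to_worker)
    worker_end = SketchChannel(inbox=to_worker, outbox=to_creator)

    # The thread plays the role of the created process.
    worker = threading.Thread(target=ReusableSketch(pow), args=(worker_end, 2, 3))
    worker.start()

    results = [creator_end.receive()]        # result of the initial pow(2, 3)
    for args in [(2, 4), (3, 2)]:
        creator_end.send((args, {}))         # reuse the same worker
        results.append(creator_end.receive())

    creator_end.send(None)                   # ask the worker to terminate
    worker.join()
    return results
```

Calling run_demo() drives one worker through three invocations before shutting it down, which is the behaviour the exchange relies on when it re-adds a reused channel and sends (args, kw) over it.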