# HG changeset patch # User paulb # Date 1195862415 0 # Node ID 2b077110ab34ad0edb39a594a21a7affc13b2cc8 # Parent d25debefa1da5ed3483030a3d4c5818d9943404e [project @ 2007-11-24 00:00:15 by paulb] Moved the reference material from the module docstring to a separate document. Updated the release number. diff -r d25debefa1da -r 2b077110ab34 PKG-INFO --- a/PKG-INFO Thu Jun 19 21:44:53 2008 +0200 +++ b/PKG-INFO Sat Nov 24 00:00:15 2007 +0000 @@ -1,11 +1,11 @@ Metadata-Version: 1.1 Name: pprocess -Version: 0.3 +Version: 0.3.1 Author: Paul Boddie Author-email: paul at boddie org uk Maintainer: Paul Boddie Maintainer-email: paul at boddie org uk -Download-url: http://www.boddie.org.uk/python/downloads/pprocess-0.3.tar.gz +Download-url: http://www.boddie.org.uk/python/downloads/pprocess-0.3.1.tar.gz Summary: Elementary parallel programming for Python License: LGPL (version 3 or later) Description: The pprocess module provides elementary support for parallel diff -r d25debefa1da -r 2b077110ab34 pprocess.py --- a/pprocess.py Thu Jun 19 21:44:53 2008 +0200 +++ b/pprocess.py Sat Nov 24 00:00:15 2007 +0000 @@ -18,289 +18,9 @@ You should have received a copy of the GNU Lesser General Public License along with this program. If not, see . - --------- - -Thread-style Processing ------------------------ - -To create new processes to run a function or any callable object, specify the -"callable" and any arguments as follows: - -channel = pprocess.start(fn, arg1, arg2, named1=value1, named2=value2) - -This returns a channel which can then be used to communicate with the created -process. Meanwhile, in the created process, the given callable will be invoked -with another channel as its first argument followed by the specified arguments: - -def fn(channel, arg1, arg2, named1, named2): - # Read from and write to the channel. - # Return value is ignored. - ... - -Fork-style Processing ---------------------- - -To create new processes in a similar way to that employed when using os.fork -(ie. the fork system call on various operating systems), use the following -method: - -channel = pprocess.create() -if channel.pid == 0: - # This code is run by the created process. - # Read from and write to the channel to communicate with the - # creating/calling process. - # An explicit exit of the process may be desirable to prevent the process - # from running code which is intended for the creating/calling process. - ... - pprocess.exit(channel) -else: - # This code is run by the creating/calling process. - # Read from and write to the channel to communicate with the created - # process. - ... - -Message Exchanges ------------------ - -When creating many processes, each providing results for the consumption of the -main process, the collection of those results in an efficient fashion can be -problematic: if some processes take longer than others, and if we decide to read -from those processes when they are not ready instead of other processes which -are ready, the whole activity will take much longer than necessary. - -One solution to the problem of knowing when to read from channels is to create -an Exchange object, optionally initialising it with a list of channels through -which data is expected to arrive: - -exchange = pprocess.Exchange() # populate the exchange later -exchange = pprocess.Exchange(channels) # populate the exchange with channels - -We can add channels to the exchange using the add method: - -exchange.add(channel) - -To test whether an exchange is active - that is, whether it is actually -monitoring any channels - we can use the active method which returns all -channels being monitored by the exchange: - -channels = exchange.active() - -We may then check the exchange to see whether any data is ready to be received; -for example: - -for channel in exchange.ready(): - # Read from and write to the channel. - ... - -If we do not wish to wait indefinitely for a list of channels, we can set a -timeout value as an argument to the ready method (as a floating point number -specifying the timeout in seconds, where 0 means a non-blocking poll as stated -in the select module's select function documentation). - -Convenient Message Exchanges ----------------------------- - -A convenient form of message exchanges can be adopted by defining a subclass of -the Exchange class and defining a particular method: - -class MyExchange(pprocess.Exchange): - def store_data(self, channel): - data = channel.receive() - # Do something with data here. - -The exact operations performed on the received data might be as simple as -storing it on an instance attribute. To make use of the exchange, we would -instantiate it as usual: - -exchange = MyExchange() # populate the exchange later -exchange = MyExchange(limit=10) # set a limit for later population - -The exchange can now be used in a simpler fashion than that shown above. We can -add channels as before using the add method, or we can choose to only add -channels if the specified limit of channels is not exceeded: - -exchange.add(channel) # add a channel as normal -exchange.add_wait(channel) # add a channel, waiting if the limit would be - # exceeded - -Or we can request that the exchange create a channel on our behalf: - -channel = exchange.create() - -We can even start processes and monitor channels without ever handling the -channel ourselves: - -exchange.start(fn, arg1, arg2, named1=value1, named2=value2) - -We can explicitly wait for "free space" for channels by calling the wait method, -although the start and add_wait methods make this less interesting: - -exchange.wait() - -Finally, when finishing the computation, we can choose to merely call the finish -method and have the remaining data processed automatically: - -exchange.finish() - -Clearly, this approach is less flexible but more convenient than the raw message -exchange API as described above. However, it permits much simpler and clearer -code. - -Exchanges as Queues -------------------- - -Instead of having to subclass the pprocess.Exchange class and to define the -store_data method, it might be more desirable to let the exchange manage the -communications between created and creating processes and to let the creating -process just consume received data as it arrives, without particular regard for -the order of the received data - perhaps the creating process has its own way of -managing such issues. - -For such situations, the Queue class may be instantiated and channels added to -the queue using the various methods provided: - -queue = pprocess.Queue(limit=10) -channel = queue.create() -if channel: - # Do some computation. - pprocess.exit(channel) - -The results can then be consumed by treating the queue like an iterator: - -for result in queue: - # Capture each result. - -This approach does not, of course, require the direct handling of channels. One -could instead use the start method on the queue to create processes and to -initiate computations (since a queue is merely an enhanced exchange with a -specific implementation of the store_data method). - -Exchanges as Maps ------------------ - -Where the above Queue class appears like an attractive solution for the -management of the results of computations, but where the order of their -consumption by the creating process remains important, the Map class may offer a -suitable way of collecting and accessing results: - -results = pprocess.Map(limit=10) -for value in inputs: - results.start(fn, args) - -The results can then be consumed in an order corresponding to the order of the -computations which produced them: - -for result in results: - # Process each result. - -Internally, the Map object records a particular ordering of channels, ensuring -that the received results can be mapped to this ordering, and that the results -can be made available with this ordering preserved. - -Managed Callables ------------------ - -A further simplification of the above convenient use of message exchanges -involves the creation of callables (eg. functions) which are automatically -monitored by an exchange. We create such a callable by calling the manage method -on an exchange: - -myfn = exchange.manage(fn) - -This callable can then be invoked instead of using the exchange's start method: - -myfn(arg1, arg2, named1=value1, named2=value2) - -The exchange's finish method can be used as usual to process incoming data. - -Making Existing Functions Parallel ----------------------------------- - -In making a program parallel, existing functions which only return results can -be manually modified to accept and use channels to communicate results back to -the main process. However, a simple alternative is to use the MakeParallel class -to provide a wrapper around unmodified functions which will return the results -from those functions in the channels provided. For example: - -fn = pprocess.MakeParallel(originalfn) - -Map-style Processing --------------------- - -In situations where a callable would normally be used in conjunction with the -Python built-in map function, an alternative solution can be adopted by using -the pmap function: - -pprocess.pmap(fn, sequence) - -Here, the sequence would have to contain elements that each contain the required -parameters of the specified callable, fn. Note that the callable does not need -to be a parallel-aware function which has a channel argument: the pmap function -automatically wraps the given callable internally. - -Reusing Processes and Channels ------------------------------- - -So far, all parallel computations have been done with newly-created processes. -However, this can seem somewhat inefficient, especially if processes are being -continually created and destroyed (although if this happens too often, the -amount of work done by each process may be too little, anyway). One solution is -to retain processes after they have done their work and request that they -perform more work for each new parallel task or invocation. To enable the reuse -of processes in this way, a special keyword argument may be specified when -creating Exchange objects (and subclasses such as Map and Queue). For example: - -exchange = MyExchange(limit=10, reuse=1) # reuse up to 10 processes - -Code invoked through such exchanges must be aware of channels and be constructed -in such a way that it does not terminate after sending a result back to the -creating process. Instead, it should repeatedly wait for subsequent sets of -parameters (compatible with those either in the signature of a callable or with -the original values read from the channel). Reusable code is terminated when the -special value of None is sent from the creating process to the created process, -indicating that no more parameters will be sent; this should cause the code to -terminate. - -Making Existing Functions Parallel and Reusable ------------------------------------------------ - -An easier way of making reusable code sections for parallel use is to employ the -MakeReusable class to wrap an existing callable: - -fn = pprocess.MakeReusable(originalfn) - -This wraps the callable in a similar fashion to MakeParallel, but provides the -necessary mechanisms described above for reusable code. - -Signals and Waiting -------------------- - -When created/child processes terminate, one would typically want to be informed -of such conditions using a signal handler. Unfortunately, Python seems to have -issues with restartable reads from file descriptors when interrupted by signals: - -http://mail.python.org/pipermail/python-dev/2002-September/028572.html -http://twistedmatrix.com/bugs/issue733 - -Select and Poll ---------------- - -The exact combination of conditions indicating closed pipes remains relatively -obscure. Here is a message/thread describing them (in the context of another -topic): - -http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html - -It would seem, from using sockets and from studying the asyncore module, that -sockets are more predictable than pipes. - -Notes about poll implementations can be found here: - -http://www.greenend.org.uk/rjk/2001/06/poll.html """ -__version__ = "0.3" +__version__ = "0.3.1" import os import sys