# HG changeset patch
# User paulb
# Date 1195862418 0
# Node ID 7e614f0cccb6d1e67a47f7553e130dc912566f2c
# Parent  2b077110ab34ad0edb39a594a21a7affc13b2cc8
[project @ 2007-11-24 00:00:18 by paulb] Moved the reference material from the module docstring to a separate document.

diff -r 2b077110ab34 -r 7e614f0cccb6 docs/reference.html
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/docs/reference.html   Sat Nov 24 00:00:18 2007 +0000
@@ -0,0 +1,367 @@
The pprocess module defines a simple parallel processing API for Python, inspired somewhat by the thread module, slightly less by pypar, and slightly less still by pypvm.

This document complements the tutorial by providing an overview of the different styles of parallel programming supported by the module. For an introduction, and in order to get a clearer idea of the most suitable styles for your own programs, consult the tutorial.
To create new processes to run a function or any callable object, specify the "callable" and any arguments as follows:

channel = pprocess.start(fn, arg1, arg2, named1=value1, named2=value2)
This returns a channel which can then be used to communicate with the created process. Meanwhile, in the created process, the given callable will be invoked with another channel as its first argument, followed by the specified arguments:

def fn(channel, arg1, arg2, named1, named2):
    # Read from and write to the channel.
    # Return value is ignored.
    ...
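As a concrete sketch of this style, the function below communicates its result over the channel instead of returning it. The send and receive method names follow the channel usage shown elsewhere in this document; the FakeChannel class is a hypothetical stand-in, included only so that the sketch also runs where pprocess is not installed.

```python
def add(channel, x, y):
    # Runs in the created process: send the result over the channel
    # (the return value would be ignored).
    channel.send(x + y)

try:
    import pprocess
    channel = pprocess.start(add, 3, 4)
    result = channel.receive()  # blocks until the created process sends
except ImportError:
    # Hypothetical in-process substitute for a channel, used only to let
    # this sketch run without pprocess installed.
    class FakeChannel:
        def __init__(self):
            self.items = []
        def send(self, obj):
            self.items.append(obj)
        def receive(self):
            return self.items.pop(0)
    channel = FakeChannel()
    add(channel, 3, 4)
    result = channel.receive()
```

Here, result holds the value sent back by the created process.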
To create new processes in a similar way to that employed when using os.fork (ie. the fork system call on various operating systems), use the following function:

channel = pprocess.create()
if channel.pid == 0:
    # This code is run by the created process.
    # Read from and write to the channel to communicate with the
    # creating/calling process.
    # An explicit exit of the process may be desirable to prevent the process
    # from running code which is intended for the creating/calling process.
    ...
    pprocess.exit(channel)
else:
    # This code is run by the creating/calling process.
    # Read from and write to the channel to communicate with the created
    # process.
    ...
When creating many processes, each providing results for the consumption of the main process, the collection of those results in an efficient fashion can be problematic: if some processes take longer than others, and if we decide to read from those processes when they are not ready instead of other processes which are ready, the whole activity will take much longer than necessary.

One solution to the problem of knowing when to read from channels is to create an Exchange object, optionally initialising it with a list of channels through which data is expected to arrive:

exchange = pprocess.Exchange()          # populate the exchange later
exchange = pprocess.Exchange(channels)  # populate the exchange with channels
We can add channels to the exchange using the add method:

exchange.add(channel)
To test whether an exchange is active - that is, whether it is actually monitoring any channels - we can use the active method, which returns all channels being monitored by the exchange:

channels = exchange.active()
We may then check the exchange to see whether any data is ready to be received; for example:

for channel in exchange.ready():
    # Read from and write to the channel.
    ...
If we do not wish to wait indefinitely for a list of channels, we can set a timeout value as an argument to the ready method (as a floating point number specifying the timeout in seconds, where 0 means a non-blocking poll, as stated in the select module's select function documentation).
A convenient form of message exchanges can be adopted by defining a subclass of the Exchange class and defining a particular method:

class MyExchange(pprocess.Exchange):
    def store_data(self, channel):
        data = channel.receive()
        # Do something with data here.
The exact operations performed on the received data might be as simple as storing it on an instance attribute. To make use of the exchange, we would instantiate it as usual:

exchange = MyExchange()          # populate the exchange later
exchange = MyExchange(limit=10)  # set a limit for later population
The exchange can now be used in a simpler fashion than that shown above. We can add channels as before using the add method, or we can choose to add channels only if the specified limit of channels is not exceeded:

exchange.add(channel)       # add a channel as normal
exchange.add_wait(channel)  # add a channel, waiting if the limit would be
                            # exceeded
Or we can request that the exchange create a channel on our behalf:
channel = exchange.create()
We can even start processes and monitor channels without ever handling the channel ourselves:

exchange.start(fn, arg1, arg2, named1=value1, named2=value2)
We can explicitly wait for "free space" for channels by calling the wait method, although the start and add_wait methods make this less interesting:

exchange.wait()
Finally, when finishing the computation, we can choose to merely call the finish method and have the remaining data processed automatically:

exchange.finish()
Clearly, this approach is less flexible than the raw message exchange API described above, but it is more convenient and permits much simpler and clearer code.

Instead of having to subclass the pprocess.Exchange class and to define the store_data method, it might be more desirable to let the exchange manage the communications between created and creating processes, and to let the creating process just consume received data as it arrives, without particular regard for the order of the received data - perhaps the creating process has its own way of managing such issues.

For such situations, the Queue class may be instantiated and channels added to the queue using the various methods provided:

queue = pprocess.Queue(limit=10)
channel = queue.create()
if channel:
    # Do some computation.
    pprocess.exit(channel)
The results can then be consumed by treating the queue like an iterator:

for result in queue:
    # Capture each result.
This approach does not, of course, require the direct handling of channels. One could instead use the start method on the queue to create processes and to initiate computations (since a queue is merely an enhanced exchange with a specific implementation of the store_data method).
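Putting these pieces together, the following sketch starts computations through the queue using the start method together with the MakeParallel wrapper described later in this document, so that a plain function can be used. Since results arrive in completion order rather than submission order, they are sorted before use; the sequential branch is a fallback for environments without pprocess.

```python
def square(x):
    return x * x

try:
    import pprocess
    queue = pprocess.Queue(limit=2)
    fn = pprocess.MakeParallel(square)
    for value in [1, 2, 3]:
        queue.start(fn, value)
    # Iterating over the queue yields each result as it arrives; arrival
    # order is not guaranteed, so sort before comparing.
    results = sorted(queue)
except ImportError:
    # Sequential stand-in when pprocess is not installed.
    results = sorted(square(x) for x in [1, 2, 3])
```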
Where the above Queue class appears to be an attractive solution for the management of the results of computations, but where the order of their consumption by the creating process remains important, the Map class may offer a suitable way of collecting and accessing results:

results = pprocess.Map(limit=10)
for value in inputs:
    results.start(fn, value)
The results can then be consumed in an order corresponding to the order of the computations which produced them:

for result in results:
    # Process each result.
Internally, the Map object records a particular ordering of channels, ensuring that the received results can be mapped to this ordering, and that the results can be made available with this ordering preserved.
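This ordering guarantee can be sketched as follows: the inputs are submitted in one order, and the results come back in exactly that order, regardless of which created process finishes first. The MakeParallel wrapper described later in this document adapts the plain function; the sequential branch is a fallback for environments without pprocess.

```python
def double(x):
    return x * 2

values = [3, 1, 2]
try:
    import pprocess
    results = pprocess.Map(limit=2)
    fn = pprocess.MakeParallel(double)
    for value in values:
        results.start(fn, value)
    # Iteration yields results in submission order, not completion order.
    collected = list(results)
except ImportError:
    # Sequential stand-in when pprocess is not installed.
    collected = [double(v) for v in values]
```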
A further simplification of the above convenient use of message exchanges involves the creation of callables (eg. functions) which are automatically monitored by an exchange. We create such a callable by calling the manage method on an exchange:

myfn = exchange.manage(fn)
This callable can then be invoked instead of using the exchange's start method:

myfn(arg1, arg2, named1=value1, named2=value2)
The exchange's finish method can be used as usual to process incoming data.
In making a program parallel, existing functions which only return results can be manually modified to accept and use channels to communicate results back to the main process. However, a simple alternative is to use the MakeParallel class to provide a wrapper around unmodified functions, sending the results from those functions over the channels provided. For example:

fn = pprocess.MakeParallel(originalfn)
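The idea behind MakeParallel can be illustrated with a rough functional equivalent - this is an illustration of the technique, not pprocess's actual implementation: the wrapper accepts a channel as its first argument, calls the unmodified function, and sends the return value over that channel.

```python
def make_parallel(fn):
    # Illustrative equivalent of the MakeParallel idea: adapt a plain
    # function to the channel-first calling convention used by created
    # processes.
    def wrapped(channel, *args, **kw):
        channel.send(fn(*args, **kw))
    return wrapped

# Exercising the wrapper with a hypothetical in-process channel:
class FakeChannel:
    def __init__(self):
        self.items = []
    def send(self, obj):
        self.items.append(obj)
    def receive(self):
        return self.items.pop(0)

channel = FakeChannel()
make_parallel(lambda x, y: x * y)(channel, 6, 7)
result = channel.receive()
```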
In situations where a callable would normally be used in conjunction with the Python built-in map function, an alternative solution can be adopted by using the pmap function:

pprocess.pmap(fn, sequence)
Here, the sequence would have to contain elements that each contain the required parameters of the specified callable, fn. Note that the callable does not need to be a parallel-aware function which has a channel argument: the pmap function automatically wraps the given callable internally.
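For example, squaring a sequence of numbers with pmap should correspond to the behaviour of the built-in map, with each input matched to its output. A hedged sketch, with a sequential fallback for environments without pprocess:

```python
def square(x):
    return x * x

inputs = [1, 2, 3, 4]
try:
    import pprocess
    # pmap distributes the calls over created processes while preserving
    # the correspondence between inputs and outputs.
    outputs = list(pprocess.pmap(square, inputs))
except ImportError:
    # Sequential stand-in when pprocess is not installed.
    outputs = list(map(square, inputs))
```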
So far, all parallel computations have been done with newly-created processes. However, this can seem somewhat inefficient, especially if processes are being continually created and destroyed (although if this happens too often, the amount of work done by each process may be too little, anyway). One solution is to retain processes after they have done their work and request that they perform more work for each new parallel task or invocation.

To enable the reuse of processes in this way, a special keyword argument may be specified when creating Exchange instances (and instances of subclasses such as Map and Queue). For example:

exchange = MyExchange(limit=10, reuse=1)  # reuse up to 10 processes
Code invoked through such exchanges must be aware of channels and be constructed in such a way that it does not terminate after sending a result back to the creating process. Instead, it should repeatedly wait for subsequent sets of parameters (compatible either with those in the signature of the callable or with the original values read from the channel). Reusable code terminates when the special value None is sent from the creating process to the created process, indicating that no more parameters will be sent.
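The protocol described above can be demonstrated with a hand-written reusable callable: after sending each result it waits for the next set of parameters, and a received None ends the loop. The FakeChannel class below is a hypothetical in-process stand-in, used only so that the protocol can be exercised without creating processes.

```python
def reusable(channel, x):
    # Keep serving requests until the creating process sends None.
    while x is not None:
        channel.send(x * x)
        x = channel.receive()

# Hypothetical in-process channel: receive() pops queued parameter values.
class FakeChannel:
    def __init__(self, incoming):
        self.incoming = list(incoming)
        self.outgoing = []
    def send(self, obj):
        self.outgoing.append(obj)
    def receive(self):
        return self.incoming.pop(0)

channel = FakeChannel([5, 6, None])  # two further requests, then termination
reusable(channel, 4)                 # initial parameter, as for pprocess.start
results = channel.outgoing
```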
An easier way of making reusable code sections for parallel use is to employ the MakeReusable class to wrap an existing callable:

fn = pprocess.MakeReusable(originalfn)
This wraps the callable in a similar fashion to MakeParallel, but provides the necessary mechanisms described above for reusable code.
When created/child processes terminate, one would typically want to be informed of such conditions using a signal handler. Unfortunately, Python seems to have issues with restartable reads from file descriptors when interrupted by signals:

The exact combination of conditions indicating closed pipes remains relatively obscure. Here is a message/thread describing them (in the context of another topic):

It would seem, from using sockets and from studying the asyncore module, that sockets are more predictable than pipes.
Notes about poll implementations can be found here:
implementations can be found here: