# HG changeset patch
# User paulb
# Date 1195862418 0
# Node ID 7e614f0cccb6d1e67a47f7553e130dc912566f2c
# Parent 2b077110ab34ad0edb39a594a21a7affc13b2cc8
[project @ 2007-11-24 00:00:18 by paulb]
Moved the reference material from the module docstring to a separate document.

diff -r 2b077110ab34 -r 7e614f0cccb6 docs/reference.html
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/docs/reference.html Sat Nov 24 00:00:18 2007 +0000
@@ -0,0 +1,367 @@

pprocess - Reference

+ +

The pprocess module defines a simple parallel processing API for Python, inspired somewhat by the thread module, slightly less by pypar, and slightly less still by pypvm.

+ +

This document complements the tutorial by providing an overview of the different styles of parallel programming supported by the module. For an introduction, and to get a clearer idea of the most suitable styles for your own programs, consult the tutorial.

+ + + +

Thread-style Processing

+ +

To create a new process to run a function or any other callable object, specify the callable and any arguments as follows:

+ +
+channel = pprocess.start(fn, arg1, arg2, named1=value1, named2=value2)
+
+ +

This returns a channel which can then be used to communicate with the created process. Meanwhile, in the created process, the given callable will be invoked with another channel as its first argument, followed by the specified arguments:

+ +
+def fn(channel, arg1, arg2, named1, named2):
+    # Read from and write to the channel.
+    # Return value is ignored.
+    ...
+
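The calling convention can be sketched with the standard library alone. The following is not pprocess's own implementation: `PickleChannel`, `sketch_start` and `worker` are hypothetical names, the `send` method is assumed as the counterpart of `receive`, and the use of a socket pair carrying pickled objects is an assumption made purely for illustration on a POSIX system where `os.fork` is available.

```python
import os
import pickle
import socket


class PickleChannel:
    """Hypothetical channel: pickled objects over one end of a socket pair."""

    def __init__(self, sock, pid=0):
        self.pid = pid
        self.sock = sock
        self.stream = sock.makefile("rwb")

    def send(self, obj):
        pickle.dump(obj, self.stream)
        self.stream.flush()

    def receive(self):
        return pickle.load(self.stream)

    def close(self):
        self.stream.close()
        self.sock.close()


def sketch_start(fn, *args, **kw):
    """Run fn(channel, *args, **kw) in a forked process; return the caller's channel."""
    parent_sock, child_sock = socket.socketpair()
    pid = os.fork()
    if pid == 0:
        parent_sock.close()
        channel = PickleChannel(child_sock)
        try:
            fn(channel, *args, **kw)    # the return value is ignored
        finally:
            channel.close()
            os._exit(0)                 # never fall through into the caller's code
    child_sock.close()
    return PickleChannel(parent_sock, pid)


def worker(channel, arg1, arg2, named1=None):
    # The channel arrives first, followed by the caller's arguments.
    channel.send((arg1 + arg2, named1))


def run_start_demo():
    channel = sketch_start(worker, 1, 2, named1="x")
    result = channel.receive()
    channel.close()
    os.waitpid(channel.pid, 0)          # reap the created process
    return result
```

Calling `run_start_demo()` starts one process, reads the single value it sends back and waits for it to exit.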
+ +

Fork-style Processing

+ +

To create new processes in a similar way to that employed when using os.fork (i.e. the fork system call on various operating systems), use the following method:

+ +
+channel = pprocess.create()
+if channel.pid == 0:
+    # This code is run by the created process.
+    # Read from and write to the channel to communicate with the
+    # creating/calling process.
+    # An explicit exit of the process may be desirable to prevent the process
+    # from running code which is intended for the creating/calling process.
+    ...
+    pprocess.exit(channel)
+else:
+    # This code is run by the creating/calling process.
+    # Read from and write to the channel to communicate with the created
+    # process.
+    ...
+
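For comparison, the same pattern can be sketched directly on top of `os.fork` and a pair of pipes. This is a minimal illustration under stated assumptions, not the module's code: `PipeChannel`, `sketch_create` and `run_fork_demo` are hypothetical names, and pickling objects over pipes is an assumption about how a channel might carry data.

```python
import os
import pickle


class PipeChannel:
    """Hypothetical channel built from one pipe in each direction."""

    def __init__(self, pid, read_fd, write_fd):
        self.pid = pid
        self.reader = os.fdopen(read_fd, "rb")
        self.writer = os.fdopen(write_fd, "wb")

    def send(self, obj):
        pickle.dump(obj, self.writer)
        self.writer.flush()

    def receive(self):
        return pickle.load(self.reader)

    def close(self):
        self.reader.close()
        self.writer.close()


def sketch_create():
    """Fork and return a channel in both processes; pid is 0 in the created one."""
    parent_read, child_write = os.pipe()
    child_read, parent_write = os.pipe()
    pid = os.fork()
    if pid == 0:
        os.close(parent_read)
        os.close(parent_write)
        return PipeChannel(0, child_read, child_write)
    os.close(child_read)
    os.close(child_write)
    return PipeChannel(pid, parent_read, parent_write)


def run_fork_demo():
    channel = sketch_create()
    if channel.pid == 0:
        # Created process: read a request, reply, then exit explicitly so that
        # none of the creating process's code is run here.
        number = channel.receive()
        channel.send(number * 2)
        channel.close()
        os._exit(0)
    # Creating process: send work, read the reply, then reap the child.
    channel.send(21)
    result = channel.receive()
    channel.close()
    os.waitpid(channel.pid, 0)
    return result
```

The explicit `os._exit(0)` plays the role that `pprocess.exit(channel)` plays in the example above: it stops the created process before it can reach code meant for its creator.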
+ +

Message Exchanges

+ +

When many processes are created, each providing results for consumption by the main process, collecting those results efficiently can be problematic: if some processes take longer than others, and if we read from processes which are not yet ready instead of from those which are, the whole activity will take much longer than necessary.

+ +

One solution to the problem of knowing when to read from channels is to create an Exchange object, optionally initialising it with a list of channels through which data is expected to arrive:

+ +
+exchange = pprocess.Exchange()           # populate the exchange later
+exchange = pprocess.Exchange(channels)   # populate the exchange with channels
+
+ +

We can add channels to the exchange using the add method:

+ +
+exchange.add(channel)
+
+ +

To test whether an exchange is active - that is, whether it is actually monitoring any channels - we can use the active method, which returns all channels being monitored by the exchange:

+ +
+channels = exchange.active()
+
+ +

We may then check the exchange to see whether any data is ready to be received; for example:

+ +
+for channel in exchange.ready():
+    # Read from and write to the channel.
+    ...
+
+ +

If we do not wish to wait indefinitely for a list of channels, we can pass a timeout value as an argument to the ready method (as a floating point number specifying the timeout in seconds, where 0 means a non-blocking poll, as stated in the documentation for the select module's select function).
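The timeout semantics referred to here can be seen with `select.select` itself. The sketch below uses plain pipes in a single process as stand-ins for channels; `ready_descriptors` and `run_ready_demo` are hypothetical names, and nothing here is claimed about how the ready method is implemented internally.

```python
import os
import select


def ready_descriptors(read_fds, timeout=None):
    """Return the descriptors with data waiting; None blocks, 0 polls."""
    readable, _, _ = select.select(read_fds, [], [], timeout)
    return readable


def run_ready_demo():
    # Three pipes stand in for three channels; only the second has data.
    pipes = [os.pipe() for _ in range(3)]
    read_fds = [r for r, _ in pipes]
    os.write(pipes[1][1], b"result")

    polled = ready_descriptors(read_fds, timeout=0)     # non-blocking poll
    indexes = [read_fds.index(fd) for fd in polled]
    data = os.read(polled[0], 1024)

    # With the data consumed, a short timed wait expires and returns nothing.
    after = ready_descriptors(read_fds, timeout=0.05)

    for r, w in pipes:
        os.close(r)
        os.close(w)
    return indexes, data, after
```

Only the descriptor with pending data is reported, so a reader never blocks on a channel whose process is still busy.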

+ +

Convenient Message Exchanges

+ +

A convenient form of message exchange can be adopted by defining a subclass of the Exchange class and defining a particular method:

+ +
+class MyExchange(pprocess.Exchange):
+    def store_data(self, channel):
+        data = channel.receive()
+        # Do something with data here.
+
+ +

The exact operations performed on the received data might be as simple as storing it on an instance attribute. To make use of the exchange, we would instantiate it as usual:

+ +
+exchange = MyExchange()         # populate the exchange later
+exchange = MyExchange(limit=10) # set a limit for later population
+
+ +

The exchange can now be used in a simpler fashion than that shown above. We can add channels as before using the add method, or we can choose to add channels only if the specified limit of channels would not be exceeded:

+ +
+exchange.add(channel)           # add a channel as normal
+exchange.add_wait(channel)      # add a channel, waiting if the limit would be
+                                # exceeded
+
+ +

Or we can request that the exchange create a channel on our behalf:

+ +
+channel = exchange.create()
+
+ +

We can even start processes and monitor channels without ever handling the channel ourselves:

+ +
+exchange.start(fn, arg1, arg2, named1=value1, named2=value2)
+
+ +

We can explicitly wait for "free space" for channels by calling the wait method, although the start and add_wait methods make this less interesting:

+ +
+exchange.wait()
+
+ +

Finally, when finishing the computation, we can simply call the finish method and have the remaining data processed automatically:

+ +
+exchange.finish()
+
+ +

Clearly, this approach is less flexible than the raw message exchange API described above. However, it is more convenient and permits much simpler and clearer code.
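The division of labour between the exchange and its store_data method can be sketched as follows. This is an illustration under stated assumptions rather than the module's implementation: `ReadChannel`, `SketchExchange`, `CollectingExchange` and `square` are hypothetical names, each process sends exactly one pickled result over a pipe, and `select.select` is assumed as the monitoring mechanism.

```python
import os
import pickle
import select


class ReadChannel:
    """Hypothetical receive-only channel wrapping the read end of a pipe."""

    def __init__(self, pid, read_fd):
        self.pid = pid
        self.reader = os.fdopen(read_fd, "rb")

    def fileno(self):
        return self.reader.fileno()

    def receive(self):
        return pickle.load(self.reader)


class SketchExchange:
    """Hypothetical exchange: monitors channels, hands ready ones to store_data."""

    def __init__(self):
        self.channels = []

    def start(self, fn, *args):
        # Fork a process which pickles fn(*args) into a pipe, then monitor it.
        read_fd, write_fd = os.pipe()
        pid = os.fork()
        if pid == 0:
            try:
                os.close(read_fd)
                with os.fdopen(write_fd, "wb") as writer:
                    pickle.dump(fn(*args), writer)
            finally:
                os._exit(0)
        os.close(write_fd)
        self.channels.append(ReadChannel(pid, read_fd))

    def finish(self):
        # Keep dispatching until every monitored channel has delivered a result.
        while self.channels:
            readable, _, _ = select.select(self.channels, [], [])
            for channel in readable:
                self.store_data(channel)
                self.channels.remove(channel)
                channel.reader.close()
                os.waitpid(channel.pid, 0)

    def store_data(self, channel):
        raise NotImplementedError


class CollectingExchange(SketchExchange):
    def __init__(self):
        super().__init__()
        self.results = []

    def store_data(self, channel):
        data = channel.receive()
        self.results.append(data)       # store it on an instance attribute


def square(x):
    return x * x


def run_exchange_demo():
    exchange = CollectingExchange()
    for n in (1, 2, 3):
        exchange.start(square, n)
    exchange.finish()
    return sorted(exchange.results)
```

The subclass contains only the code that decides what to do with each result; creating processes, waiting and clean-up all stay in the base class, which is where the simplification comes from. The results are sorted in the demo because they may arrive in any order.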

+ +

Exchanges as Queues

+ +

Instead of having to subclass the pprocess.Exchange class and to define the store_data method, it might be more desirable to let the exchange manage the communications between created and creating processes and to let the creating process just consume received data as it arrives, without particular regard for the order of the received data - perhaps the creating process has its own way of managing such issues.

+ +

For such situations, the Queue class may be instantiated and channels added to the queue using the various methods provided:

+ +
+queue = pprocess.Queue(limit=10)
+channel = queue.create()
+if channel:
+    # Do some computation.
+    pprocess.exit(channel)
+
+ +

The results can then be consumed by treating the queue like an iterator:

+ +
+for result in queue:
+    # Capture each result.
+    ...
+
+ +

This approach does not, of course, require the direct handling of channels. One could instead use the start method on the queue to create processes and to initiate computations (since a queue is merely an enhanced exchange with a specific implementation of the store_data method).

+ +

Exchanges as Maps

+ +

Where the Queue class described above appears to be an attractive solution for the management of the results of computations, but where the order of their consumption by the creating process remains important, the Map class may offer a suitable way of collecting and accessing results:

+ +
+results = pprocess.Map(limit=10)
+for value in inputs:
+    results.start(fn, value)
+
+ +

The results can then be consumed in an order corresponding to the order of the computations which produced them:

+ +
+for result in results:
+    # Process each result.
+    ...
+
+ +

Internally, the Map object records a particular ordering of channels, ensuring that the received results can be mapped to this ordering, and that the results can be made available with this ordering preserved.
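The idea of recording an ordering at start time and mapping arrivals back onto it can be sketched as follows. The names `ordered_results` and `slow_label` are hypothetical, and the pipe-and-pickle transport is an assumption for illustration; only the bookkeeping of positions is the point.

```python
import os
import pickle
import select
import time


def ordered_results(fn, inputs):
    """Run fn on each input in its own process; return results in input order."""
    positions = {}                      # read descriptor -> (position, child pid)
    for position, value in enumerate(inputs):
        read_fd, write_fd = os.pipe()
        pid = os.fork()
        if pid == 0:
            try:
                os.close(read_fd)
                with os.fdopen(write_fd, "wb") as writer:
                    pickle.dump(fn(value), writer)
            finally:
                os._exit(0)
        os.close(write_fd)
        positions[read_fd] = (position, pid)

    results = [None] * len(positions)
    while positions:
        readable, _, _ = select.select(list(positions), [], [])
        for fd in readable:
            position, pid = positions.pop(fd)
            with os.fdopen(fd, "rb") as reader:
                # The slot was fixed when the process was started, so the
                # arrival order does not affect the final ordering.
                results[position] = pickle.load(reader)
            os.waitpid(pid, 0)
    return results


def slow_label(delay):
    time.sleep(delay)                   # longer delays finish later
    return "slept %.1f" % delay
```

Here the first input takes the longest to compute, yet its result still occupies the first position in the returned list.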

+ +

Managed Callables

+ +

A further simplification of the above convenient use of message exchanges involves the creation of callables (e.g. functions) which are automatically monitored by an exchange. We create such a callable by calling the manage method on an exchange:

+ +
+myfn = exchange.manage(fn)
+
+ +

This callable can then be invoked instead of using the exchange's start method:

+ +
+myfn(arg1, arg2, named1=value1, named2=value2)
+
+ +

The exchange's finish method can be used as usual to process incoming data.

+ +

Making Existing Functions Parallel

+ +

In making a program parallel, existing functions which only return results can be manually modified to accept and use channels to communicate results back to the main process. However, a simpler alternative is to use the MakeParallel class to provide a wrapper around unmodified functions which will send the results from those functions through the channels provided. For example:

+ +
+fn = pprocess.MakeParallel(originalfn)
+
+ +
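What such a wrapper has to do can be sketched in a few lines. `SketchMakeParallel`, `RecordingChannel` and `add` are hypothetical names used only for illustration, and the channel's `send` method is assumed as the counterpart of the `receive` method shown earlier; the real class may differ in detail.

```python
class SketchMakeParallel:
    """Hypothetical wrapper adapting a plain function to the channel-first form."""

    def __init__(self, fn):
        self.fn = fn

    def __call__(self, channel, *args, **kw):
        # Call the unmodified function and send its result through the channel.
        channel.send(self.fn(*args, **kw))


class RecordingChannel:
    """Test double standing in for a real channel; it only records what is sent."""

    def __init__(self):
        self.sent = []

    def send(self, obj):
        self.sent.append(obj)


def add(a, b=0):
    return a + b


channel = RecordingChannel()
parallel_add = SketchMakeParallel(add)
parallel_add(channel, 2, b=3)           # add itself never sees the channel
```

The original function stays free of any knowledge of channels, so it can still be called directly elsewhere in the program.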

Map-style Processing

+ +

In situations where a callable would normally be used in conjunction with the Python built-in map function, an alternative solution can be adopted by using the pmap function:

+ +
+pprocess.pmap(fn, sequence)
+
+ +

Here, each element of the sequence must contain the parameters required by the specified callable, fn. Note that the callable does not need to be a parallel-aware function which has a channel argument: the pmap function automatically wraps the given callable internally.

+ +

Reusing Processes and Channels

+ +

So far, all parallel computations have been done with newly-created processes. However, this can seem somewhat inefficient, especially if processes are being continually created and destroyed (although if this happens too often, the amount of work done by each process may be too little, anyway). One solution is to retain processes after they have done their work and request that they perform more work for each new parallel task or invocation. To enable the reuse of processes in this way, the reuse keyword argument may be specified when creating Exchange instances (and instances of subclasses such as Map and Queue). For example:

+ +
+exchange = MyExchange(limit=10, reuse=1) # reuse up to 10 processes
+
+ +

Code invoked through such exchanges must be aware of channels and be constructed in such a way that it does not terminate after sending a result back to the creating process. Instead, it should repeatedly wait for subsequent sets of parameters (compatible with those either in the signature of a callable or with the original values read from the channel). Such code is told to stop when the creating process sends the special value None to the created process, indicating that no more parameters will be sent; on receiving it, the code should terminate.
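The shape of such a loop can be sketched without creating any processes. `ScriptedChannel` and `reusable_worker` are hypothetical names, the channel's `send` method is assumed, and representing each set of parameters as a tuple of numbers is an assumption made only to keep the example small.

```python
from collections import deque


class ScriptedChannel:
    """Test double: replays queued parameter sets and records the replies."""

    def __init__(self, incoming):
        self.incoming = deque(incoming)
        self.sent = []

    def receive(self):
        return self.incoming.popleft()

    def send(self, obj):
        self.sent.append(obj)


def reusable_worker(channel, first_args):
    """Serve parameter sets until the creating process sends None."""
    args = first_args
    while args is not None:
        channel.send(sum(args))         # send a result, but do not terminate
        args = channel.receive()        # wait for the next set of parameters


channel = ScriptedChannel([(3, 4), (5, 6), None])
reusable_worker(channel, (1, 2))
```

The worker handles its initial parameters and then two further sets before the None value ends the loop, so one process serves three invocations.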

+ +

Making Existing Functions Parallel and Reusable

+ +

An easier way of making reusable code sections for parallel use is to employ the MakeReusable class to wrap an existing callable:

+ +
+fn = pprocess.MakeReusable(originalfn)
+
+ +

This wraps the callable in a similar fashion to MakeParallel, but provides the necessary mechanisms described above for reusable code.

+ +

Implementation Notes

+ +

Signals and Waiting

+ +

When created/child processes terminate, one would typically want to be informed of such conditions using a signal handler. Unfortunately, Python seems to have issues with restartable reads from file descriptors when interrupted by signals:

+ + + +

Select and Poll

+ +

The exact combination of conditions indicating closed pipes remains relatively obscure. Here is a message/thread describing them (in the context of another topic):

+ + + +

It would seem, from using sockets and from studying the asyncore module, that sockets are more predictable than pipes.

+ +

Notes about poll implementations can be found here:

+ + + + +