# HG changeset patch
# User paulb
# Date 1212446623 0
# Node ID 614aa092ba4e7bcb0608e7c752fe2ab0d2eb8e4d
# Parent c9753c628f21c86c33195a04cf172ac264f13933
[project @ 2008-06-02 22:43:43 by paulb] Added details of the persistent and background features and examples.

diff -r c9753c628f21 -r 614aa092ba4e docs/reference.html
--- a/docs/reference.html	Mon Jun 02 22:43:11 2008 +0000
+++ b/docs/reference.html	Mon Jun 02 22:43:43 2008 +0000
@@ -32,6 +32,8 @@
This wraps the callable in a similar fashion to MakeParallel
, but
provides the necessary mechanisms described above for reusable code.
So far, all parallel computations have involved created processes which depend on the existence of the creating process: it collects their results and communicates with them, and this prevents it from terminating, even if the created processes actually perform work and potentially produce output which need not concern the process which created them. In order to separate creating and created processes, the concept of a background process (also known as a daemon process) is introduced.
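To make this separation concrete, the detachment of a created process from its creator can be sketched with standard library primitives alone. This is a modern-Python illustration of the general daemon technique (double fork plus a new session), not pprocess's internals; the `result_path` location is invented for the example:

```python
import os
import tempfile
import time

# Illustrative sketch: separate a background (daemon) process from its
# creator using the classic double-fork technique, then collect its
# output later from an agreed location (here, an invented file path).
result_path = os.path.join(tempfile.mkdtemp(), "result.txt")

pid = os.fork()
if pid == 0:
    os.setsid()              # detach from the creating process's session
    if os.fork() != 0:
        os._exit(0)          # first child exits; grandchild is the daemon
    tmp_path = result_path + ".tmp"
    with open(tmp_path, "w") as f:
        f.write(str(6 * 7))  # stand-in for real work
    os.rename(tmp_path, result_path)  # publish the result atomically
    os._exit(0)

os.waitpid(pid, 0)           # only the short-lived first child is waited for

# The creating process could terminate here; any process that knows the
# agreed location can pick up the daemon's output later.
while not os.path.exists(result_path):
    time.sleep(0.05)
with open(result_path) as f:
    value = int(f.read())
print(value)
```

The atomic rename ensures a collecting process never observes a half-written result, which is the same concern the library's socket-based handshake addresses.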
+ +The BackgroundCallable
class acts somewhat like the
+manage
method on exchange-based objects, although no exchange is
+immediately involved, and instances of BackgroundCallable
provide
wrappers around existing parallel-aware callables which can then be invoked in order
+to initiate a background computation in a created process. For example:
backgroundfn = pprocess.BackgroundCallable(address, fn)
This wraps the supplied callable (which can itself be the result of using
+MakeParallel
), with the resulting wrapper lending itself to
+invocation like any other function. One distinguishing feature is that of the
+address
: in order to contact the background process after
+invocation to (amongst other things) receive any result, a specific address
+must be given to define the contact point between the created process and any
+processes seeking to connect to it. Since these "persistent" communications
+employ special files (specifically UNIX-domain sockets), the address must be a
+suitable filename.
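The mechanism behind such filename-based addresses can be sketched with the standard library alone. The following is an illustrative sketch (in modern Python, not pprocess's actual implementation) of a created process serving its result at a UNIX-domain socket whose address is a filename:

```python
import os
import socket
import tempfile
import time

# Illustrative sketch: a created process binds a UNIX-domain socket at a
# filename-based address and serves its result to whichever process
# connects later. The address here is an invented temporary path.
address = os.path.join(tempfile.mkdtemp(), "task.socket")

pid = os.fork()
if pid == 0:
    # Created process: bind the contact point, compute, serve the result.
    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    server.bind(address)     # the address must be a suitable filename
    server.listen(1)
    result = sum(n * n for n in range(10))  # stand-in computation
    conn, _ = server.accept()
    conn.sendall(str(result).encode())
    conn.close()
    os._exit(0)

# Any process knowing the address can connect and collect the result;
# retry briefly until the created process has bound the socket.
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
for _ in range(100):
    try:
        client.connect(address)
        break
    except (FileNotFoundError, ConnectionRefusedError):
        time.sleep(0.05)
collected = int(client.recv(1024).decode())
client.close()
os.waitpid(pid, 0)
print(collected)
```

Because the contact point is just a file in the filesystem, the connecting process need not be the process that performed the invocation.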
Background processes employing persistent communications require adaptations
+of the facilities described in the sections above. For a single background
+process, the BackgroundQueue
function is sufficient to create a
+queue-like object which can monitor the communications channel between the
+connecting process and a background process. For example:
queue = pprocess.BackgroundQueue(address)
This code will cause the process reachable via the given address
+to be contacted and any results made available via the created queue-like
+object.
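The behaviour of such a queue-like object can be sketched as follows. `ToyBackgroundQueue`, the framing protocol, and the producer below are all invented for illustration (pprocess's real `BackgroundQueue` has its own wire format); the sketch only shows the idea of connecting to an address and yielding results as they arrive:

```python
import os
import pickle
import socket
import struct
import tempfile
import time

# A toy queue-like reader (an invented illustration, not pprocess's actual
# BackgroundQueue): it connects to the given address and yields each
# length-prefixed, pickled result received over the UNIX-domain socket.
class ToyBackgroundQueue:
    def __init__(self, address):
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        while True:
            try:
                self.sock.connect(address)
                break
            except (FileNotFoundError, ConnectionRefusedError):
                time.sleep(0.05)

    def _recv_exact(self, n):
        data = b""
        while len(data) < n:
            chunk = self.sock.recv(n - len(data))
            if not chunk:
                break
            data += chunk
        return data

    def __iter__(self):
        while True:
            header = self._recv_exact(4)
            if len(header) < 4:
                return               # producer closed the connection
            (length,) = struct.unpack("!I", header)
            yield pickle.loads(self._recv_exact(length))

address = os.path.join(tempfile.mkdtemp(), "task.socket")

pid = os.fork()
if pid == 0:
    # Stand-in background producer: serve three results, then exit.
    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    server.bind(address)
    server.listen(1)
    conn, _ = server.accept()
    for value in [1, 4, 9]:
        payload = pickle.dumps(value)
        conn.sendall(struct.pack("!I", len(payload)) + payload)
    conn.close()
    os._exit(0)

results = list(ToyBackgroundQueue(address))
os.waitpid(pid, 0)
print(results)
```

Iteration ends when the background process closes its end of the channel, mirroring how a queue is exhausted once all results have been delivered.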
Where many background processes have been created, a single
+PersistentQueue
object can monitor their communications by being
+connected to them all, as in the following example:
queue = pprocess.PersistentQueue()
for address in addresses:
    queue.connect(address)
Here, the queue monitors all previously created processes whose addresses
+reside in the addresses
sequence. Upon iterating over the queue,
+results will be taken from whichever process happens to have data available in
+no particular pre-defined order.
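Monitoring many connections and yielding from whichever has data first is classic I/O multiplexing. The sketch below (the `toy_persistent_queue` function, the addresses, and the one-line-per-result protocol are all invented for illustration, and each producer's single small message is assumed to arrive in one read) shows the idea using `select`:

```python
import os
import select
import socket
import tempfile
import time

# Illustrative multiplexer (not pprocess's PersistentQueue): connect to
# several addresses and yield one newline-terminated result from whichever
# connection has data ready, in no pre-defined order.
def toy_persistent_queue(addresses):
    socks = []
    for address in addresses:
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        while True:
            try:
                s.connect(address)
                break
            except (FileNotFoundError, ConnectionRefusedError):
                time.sleep(0.05)
        socks.append(s)
    while socks:
        ready, _, _ = select.select(socks, [], [])
        for s in ready:
            data = s.recv(1024)
            if not data:
                socks.remove(s)  # this producer has finished
                s.close()
            else:
                yield data.decode().strip()

base = tempfile.mkdtemp()
addresses = [os.path.join(base, "task-%d.socket" % i) for i in range(3)]
pids = []
for i, address in enumerate(addresses):
    pid = os.fork()
    if pid == 0:
        # Stand-in background process: serve one result, then exit.
        server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        server.bind(address)
        server.listen(1)
        conn, _ = server.accept()
        conn.sendall(("result-%d\n" % i).encode())
        conn.close()
        os._exit(0)
    pids.append(pid)

collected = sorted(toy_persistent_queue(addresses))
for pid in pids:
    os.waitpid(pid, 0)
print(collected)
```

Sorting is only applied at the end because, as with the real queue, arrival order across the connections is not defined.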
pmap
function.
-Consider a program using the built-in map
function and a sequence of inputs:
Although some programs make natural use of the map
function,
others may employ an invocation in a nested loop. This may also be converted
@@ -198,7 +209,7 @@
list operations provides the results in the same order as their corresponding
inputs.
In some programs, it is not important to receive the results of computations in any particular order, usually because either the order of @@ -245,7 +256,7 @@
We can bring the benefits of parallel processing to the above program with the following code:
-+t = time.time() # Initialise the communications queue with a limit on the number of @@ -433,7 +444,7 @@start
method still calls the provided callable, but using a different notation from that employed previously. -Converting Inline Computations
+Converting Inline Computations
Although many programs employ functions and other useful abstractions which can be treated as parallelisable units, some programs perform computations @@ -525,7 +536,7 @@ the results in the same order as the initiation of the computations which produced them.
-Reusing Processes in Parallel Programs
+Reusing Processes in Parallel Programs
One notable aspect of the above programs when parallelised is that each invocation of a computation in parallel creates a new process in which the @@ -577,7 +588,273 @@ creating a new process for each computation and then discarding it, only to create a new process for the next computation.
-Summary
+Performing Computations in Background Processes
Occasionally, it is desirable not only to initiate time-consuming computations and leave them running in the background, but also to detach the creating process from them completely, potentially terminating the creating process altogether, and yet still be able to collect the results of the created processes at a later time, potentially in a completely different process. For such situations, we can make use of the
+ +BackgroundCallable
class, which converts a parallel-aware callable into a callable which will run in a background process when invoked.

Consider this excerpt from a modified version of the simple_managed_queue program:
def task():

    # Initialise the communications queue with a limit on the number of
    # channels/processes.

    queue = pprocess.Queue(limit=limit)

    # Initialise an array.

    results = [0] * N * N

    # Wrap the calculate function and manage it.

    calc = queue.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            calc(i, j)

    # Store the results as they arrive.

    print "Finishing..."
    for i, j, result in queue:
        results[i*N+j] = result

    return results

Here, we have converted the main program into a function, and instead of printing out the results, we return the results list from the function.
+ +Now, let us consider the new main program (with the relevant mechanisms +highlighted):
t = time.time()

if "--reconnect" not in sys.argv:

    # Wrap the computation and manage it.

    ptask = pprocess.BackgroundCallable("task.socket", pprocess.MakeParallel(task))

    # Perform the work.

    ptask()

    # Discard the callable.

    del ptask
    print "Discarded the callable."

if "--start" not in sys.argv:

    # Open a queue and reconnect to the task.

    print "Opening a queue."
    queue = pprocess.BackgroundQueue("task.socket")

    # Wait for the results.

    print "Waiting for persistent results"
    for results in queue:
        pass # should only be one element

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t

(This code in context with
+ +import
statements and functions is found in the examples/simple_background_queue.py file.)

This new main program has two parts: the part which initiates the computation, and the part which connects to the computation in order to collect the results. Both parts can be run in the same process, and this should result in similar behaviour to that of the original simple_managed_queue program.
+ +In the above program, however, we are free to specify
+ +--start
as an option when running the program, and the result of this is merely to initiate the computation in a background process, using BackgroundCallable to obtain a callable which, when invoked, creates the background process and runs the computation. After doing this, the program will then exit, but it will leave the computation running as a collection of background processes, and a special file called task.socket will exist in the current working directory.

When the above program is run using the
+ +--reconnect
option, an attempt will be made to reconnect to the background processes already created by attempting to contact them using the previously created task.socket special file (which is, in fact, a UNIX-domain socket); this being done using the BackgroundQueue function, which will handle the incoming results in a fashion similar to that of a Queue object. Since only one result is returned by the computation (as defined by the return statement in the task function), we need only expect one element to be collected by the queue: a list containing all of the results produced in the computation.

Managing Several Background Processes
+ +In the above example, a single background process was used to manage a number +of other processes, with all of them running in the background. However, it can +be desirable to manage more than one background process.
+ +Consider this excerpt from a modified version of the simple_managed_queue program:
def task(i):

    # Initialise the communications queue with a limit on the number of
    # channels/processes.

    queue = pprocess.Queue(limit=limit)

    # Initialise an array.

    results = [0] * N

    # Wrap the calculate function and manage it.

    calc = queue.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    for j in range(0, N):
        calc(i, j)

    # Store the results as they arrive.

    print "Finishing..."
    for i, j, result in queue:
        results[j] = result

    return i, results

Just as we see in the previous example, a function called
task has been defined to hold a background computation, and this function returns a portion of the results. However, unlike the previous example or the original example, the scope of the results of the computation collected in the function has been changed: here, only results for calculations involving a certain value of i are collected, with the particular value of i returned along with the appropriate portion of the results.

Now, let us consider the new main program (with the relevant mechanisms highlighted):
t = time.time()

if "--reconnect" not in sys.argv:

    # Wrap the computation and manage it.

    ptask = pprocess.MakeParallel(task)

    for i in range(0, N):

        # Make a distinct callable for each part of the computation.

        ptask_i = pprocess.BackgroundCallable("task-%d.socket" % i, ptask)

        # Perform the work.

        ptask_i(i)

    # Discard the callable.

    del ptask
    print "Discarded the callable."

if "--start" not in sys.argv:

    # Open a queue and reconnect to the task.

    print "Opening a queue."
    queue = pprocess.PersistentQueue()
    for i in range(0, N):
        queue.connect("task-%d.socket" % i)

    # Initialise an array.

    results = [0] * N

    # Wait for the results.

    print "Waiting for persistent results"
    for i, result in queue:
        results[i] = result

    # Show the results.

    for i in range(0, N):
        for result in results[i]:
            print result,
        print

    print "Time taken:", time.time() - t

(This code in context with
+ +import
statements and functions is found in the examples/simple_persistent_queue.py file.)

In the first section, the process of making a parallel-aware callable is as expected, but instead of then invoking a single background version of such a callable, as in the previous example, we create a version for each value of
i (using BackgroundCallable) and then invoke each one. The result of this is a total of N background processes, each running an invocation of the task function with a distinct value of i (which in turn performs computations), and each employing a UNIX-domain socket for communication with a name of the form task-i.socket
.

In the second section, since we now have more than one background process, we must find a way to monitor them after reconnecting to them; to achieve this, a PersistentQueue is created, which acts like a regular Queue object but is instead focused on handling persistent communications. Upon connecting the queue to each of the previously created UNIX-domain sockets, the queue acts like a regular Queue and exposes received results through an iterator. Here, the principal difference from previous examples is the structure of results: instead of collecting each individual value in a flat i by j array, a list is returned for each value of i and is stored directly in another list.

Applications of Background Computations
Background computations are useful because they provide flexibility in the way the results can be collected. One area in which they can be useful is Web programming, where a process handling an incoming HTTP request may need to initiate a computation but then immediately send output to the Web client - such as a page indicating that the computation is "in progress" - without having to wait for the computation or to allocate resources to monitor it. Moreover, in some Web architectures, notably those employing the Common Gateway Interface (CGI), it is necessary for a process handling an incoming request to terminate before its output will be sent to clients. By using a BackgroundCallable, a Web server process can initiate a computation, and then subsequent server processes can be used to reconnect to the background computation and to wait efficiently for results.

Summary
The following table indicates the features used in converting one sequential example program to another parallel program:
@@ -602,7 +879,7 @@
MakeParallel, Map, manage

@@ -615,6 +892,14 @@
simple2
simple_managed_queue
MakeParallel, Queue, manage
Channel, Exchange (subclass), start, finish
+simple_background_queue
+MakeParallel, BackgroundCallable, BackgroundQueue
+simple_persistent_queue
+MakeParallel, BackgroundCallable, PersistentQueue
simple
simple_create_map
Channel, Map, create, exit