# HG changeset patch
# User paulb
# Date 1189900025 0
# Node ID 7022b1c3ec9e8325af06d34f7c2ef33f962945cd
# Parent  761f6fca5e2edcc5e560315c033e4823ff69529d
[project @ 2007-09-15 23:47:05 by paulb]
Covered most of the examples.

diff -r 761f6fca5e2e -r 7022b1c3ec9e docs/tutorial.xhtml
--- a/docs/tutorial.xhtml  Sat Sep 15 23:46:55 2007 +0000
+++ b/docs/tutorial.xhtml  Sat Sep 15 23:47:05 2007 +0000
The pprocess module provides several mechanisms for running Python code
concurrently in several processes. The most straightforward way of making a
program parallel-aware - that is, where the program can take advantage of more
than one processor to simultaneously process data - is to use the pmap
function.
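The map-based program that the following note and discussion refer to
(examples/simple_map.py) is not itself included in this hunk. Purely as a
hypothetical sketch - assuming the input array preparation described below,
the N and delay values used throughout the examples, a sequence name
introduced here for illustration, and the calculate function defined a little
further down - such a program might look like this:

    # Hypothetical reconstruction - not part of the changeset.

    t = time.time()

    # Prepare an array of (i, j) combinations as the input sequence.

    sequence = []
    for i in range(0, N):
        for j in range(0, N):
            sequence.append((i, j))

    # Perform the work sequentially using the built-in map function.

    print "Calculating..."
    results = map(calculate, sequence)

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t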
(This code in context with import statements and functions is found in the
examples/simple_map.py file.)
The principal features of this program involve the preparation of an array for
input purposes, and the use of the map function to iterate over the
combinations of i and j in the array. Even if the calculate function could be
invoked independently for each input value, we have to wait for each
computation to complete before initiating a new one. The calculate function
may be defined as follows:
    def calculate(t):

        "A supposedly time-consuming calculation on 't'."

        i, j = t
        time.sleep(delay)
        return i * N + j
In order to reduce the processing time - to speed the code up, in other
words - we can make this code use several processes instead of just one. Here
is the modified code:
    t = time.time()

    ...

    print "Time taken:", time.time() - t
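The body of this program (the "..." above) is elided in the hunk. Judging from
the surrounding discussion, it presumably prepares the input array as before
and replaces the map call with pprocess.pmap; the following is only a
hypothetical sketch, assuming pprocess.pmap accepts the callable, the input
sequence and a limit on the number of concurrent processes, as the discussion
below describes:

    # Hypothetical reconstruction of the elided body.

    # Prepare an array of (i, j) combinations as the input sequence.

    sequence = []
    for i in range(0, N):
        for j in range(0, N):
            sequence.append((i, j))

    # Perform the work in parallel, limiting the number of active processes.

    print "Calculating..."
    results = pprocess.pmap(calculate, sequence, limit=limit)

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print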
(This code in context with import statements and functions is found in the
examples/simple_pmap.py file.)
By replacing usage of the map function with the pprocess.pmap function, and
specifying the limit on the number of processes to be active at any given time
(the value of the limit variable is defined elsewhere), we can now perform
several calculations in parallel.
Although some programs make natural use of the map function, others may employ
an invocation in a nested loop. This may also be converted to a parallel
program. Consider the following Python code:
    t = time.time()

    ...

    print "Time taken:", time.time() - t
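Again, the body of this program is elided in the hunk. From the description
below, it performs the calculation inside a nested loop; a hypothetical sketch
of the elided part, assuming the single-argument calculate shown earlier and a
plain list for the results, might be:

    # Hypothetical reconstruction of the elided body.

    # Initialise a list for the results.

    results = []

    # Perform the work, one combination at a time.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            results.append(calculate((i, j)))

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print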
(This code in context with import statements and functions is found in the
examples/simple1.py file.)
Here, a computation in the calculate function is performed for each
combination of i and j in the nested loop, returning a result value. However,
we must wait for the completion of this function for each element before
moving on to the next element, and this means that the computations are
performed sequentially. Consequently, on a system with more than one
processor, even if we could call calculate for more than one combination of i
and j and have the computations executing at the same time, the above program
will not take advantage of such capabilities.
We use a slightly modified version of calculate which employs two parameters
instead of one:
    def calculate(i, j):

        """
        A supposedly time-consuming calculation on 'i' and 'j'.
        """

        time.sleep(delay)
        return i * N + j
In order to reduce the processing time - to speed the code up, in other
words - we can make this code use several processes instead of just one. Here
is the modified code:
    t = time.time()

    # Initialise the results using a map with a limit on the number of
    # channels/processes.

    results = pprocess.Map(limit=limit)

    ...

    print "Time taken:", time.time() - t
(This code in context with import statements and functions is found in the
examples/simple_manage_map.py file.)
The principal changes in the above code involve the use of a pprocess.Map
object to collect the results, and a version of the calculate function which
is managed by the Map object. What the Map object does is to arrange the
results of computations such that iterating over the object or accessing the
object using list operations provides the results in the same order as their
corresponding inputs.
In some programs, it is not important to receive the results of computations
in any particular order, usually because either the order of these results is
irrelevant, or because the results provide "positional" information which lets
them be handled in an appropriate way. Consider the following Python code:
    t = time.time()

    # Initialise an array.

    results = [0] * N * N

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            i2, j2, result = calculate(i, j)
            results[i2*N+j2] = result

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
(This code in context with import statements and functions is found in the
examples/simple2.py file.)
Here, a result array is initialised first and each computation is performed
sequentially. A significant difference to the previous examples is the return
value of the calculate function: the position details corresponding to i and j
are returned alongside the result. Obviously, this is of limited value in the
above code because the order of the computations and the reception of results
is fixed. Moreover, we get no benefit from parallelisation in the above
example.
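The variant of calculate used in this example is not reproduced in the hunk.
Judging from the description above, and from the channel-based version later
in this tutorial which sends (i, j, i * N + j), it presumably looks something
like this:

    # Hypothetical reconstruction - not part of the changeset.

    def calculate(i, j):

        """
        A supposedly time-consuming calculation on 'i' and 'j', returning the
        position details along with the result.
        """

        time.sleep(delay)
        return i, j, i * N + j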
We can bring the benefits of parallel processing to the above program with the
following code:
    t = time.time()

    # Initialise the communications queue with a limit on the number of
    # channels/processes.

    queue = pprocess.Queue(limit=limit)

    # Initialise an array.

    results = [0] * N * N

    # Wrap the calculate function and manage it.

    calc = queue.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            calc(i, j)

    # Store the results as they arrive.

    print "Finishing..."
    for i, j, result in queue:
        results[i*N+j] = result

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
(This code in context with import statements and functions is found in the
examples/simple_managed_queue.py file.)
This revised code employs a pprocess.Queue object whose purpose is to collect
the results of computations and to make them available in the order in which
they were received. The code collecting results has been moved into a separate
loop independent of the original computation loop and taking advantage of the
more relevant "positional" information emerging from the queue.
We can take this example further, illustrating some of the mechanisms employed
by pprocess. Instead of collecting results in a queue, we can define a class
containing a method which is called when new results arrive:
    class MyExchange(pprocess.Exchange):

        "Parallel convenience class containing the array assignment operation."

        def store_data(self, ch):
            i, j, result = ch.receive()
            self.D[i*N+j] = result
This code exposes the channel paradigm which is used throughout pprocess and
is available to applications, if desired. The effect of the method is the
storage of a result received through the channel in an attribute of the
object. The following code shows how this class can be used, with differences
to the previous program illustrated:
    t = time.time()

    # Initialise the communications exchange with a limit on the number of
    # channels/processes.

    exchange = MyExchange(limit=limit)

    # Initialise an array - it is stored in the exchange to permit automatic
    # assignment of values as the data arrives.

    results = exchange.D = [0] * N * N

    # Wrap the calculate function and manage it.

    calc = exchange.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            calc(i, j)

    # Wait for the results.

    print "Finishing..."
    exchange.finish()

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
(This code in context with import statements and functions is found in the
examples/simple_managed.py file.)
The main visible differences between this and the previous program are the
storage of the result array in the exchange, the removal of the queue
consumption code from the main program, placing the act of storing values in
the exchange's store_data method, and the need to call the finish method on
the MyExchange object so that we do not try to access the results too soon.
One underlying benefit not visible in the above code is that we no longer need
to accumulate results in a queue or other structure so that they may be
processed and assigned to the correct positions in the result array.
For the curious, we may remove some of the remaining conveniences of the above
program to expose other features of pprocess. First, we define a slightly
modified version of the calculate function:
    def calculate(ch, i, j):

        """
        A supposedly time-consuming calculation on 'i' and 'j', using 'ch' to
        communicate with the parent process.
        """

        time.sleep(delay)
        ch.send((i, j, i * N + j))
This function accepts a channel, ch, through which results will be sent, and
through which other values could potentially be received, although we choose
not to do so here. The program using this function is as follows, with
differences to the previous program illustrated:
    t = time.time()

    # Initialise the communications exchange with a limit on the number of
    # channels/processes.

    exchange = MyExchange(limit=limit)

    # Initialise an array - it is stored in the exchange to permit automatic
    # assignment of values as the data arrives.

    results = exchange.D = [0] * N * N

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            exchange.start(calculate, i, j)

    # Wait for the results.

    print "Finishing..."
    exchange.finish()

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
(This code in context with import statements and functions is found in the
examples/simple_start.py file.)
Here, we have discarded two conveniences: the wrapping of callables using
MakeParallel, which lets us use functions without providing any channel
parameters, and the management of callables using the manage method on queues,
exchanges, and so on. The start method still calls the provided callable, but
using a different notation from that employed previously.
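To put the two notations side by side (this is only a summary of lines already
shown in the examples above, not additional API):

    # With the conveniences: a plain function (no channel parameter) is
    # wrapped and managed, and the channel is handled behind the scenes.

    calc = exchange.manage(pprocess.MakeParallel(calculate))
    calc(i, j)

    # Without the conveniences: the callable must accept a channel as its
    # first parameter and is launched directly via the start method.

    exchange.start(calculate, i, j)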
Although many programs employ functions and other useful abstractions which
can be treated as parallelisable units, some programs perform computations
"inline", meaning that the code responsible appears directly within a loop or
related control-flow construct. Consider the following code:
    t = time.time()

    # Initialise an array.

    results = [0] * N * N

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            time.sleep(delay)
            results[i*N+j] = i * N + j

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
(This code in context with import statements and functions is found in the
examples/simple.py file.)
To simulate "work", as in the different versions of the calculate function, we
use the time.sleep function (which does not actually do work, and which will
cause a process to be descheduled in most cases, but which simulates the delay
associated with work being done). This inline work, which must be performed
sequentially in the above program, can be performed in parallel in a somewhat
modified version of the program:
    t = time.time()

    # Initialise the results using a map with a limit on the number of
    # channels/processes.

    results = pprocess.Map(limit=limit)

    # Perform the work.
    # NOTE: Could use the with statement in the loop to package the
    # NOTE: try...finally functionality.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            ch = results.create()
            if ch:
                try: # Calculation work.

                    time.sleep(delay)
                    ch.send(i * N + j)

                finally: # Important finalisation.

                    pprocess.exit(ch)

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
(This code in context with import statements and functions is found in the
examples/simple_create_map.py file.)
Although seemingly more complicated, the bulk of the changes in this modified
program are focused on obtaining a channel object, ch, at the point where the
computations are performed, and the wrapping of the computation code in a
try...finally statement which ensures that the process associated with the
channel exits when the computation is complete. In order for the results of
these computations to be collected, a pprocess.Map object is used, since it
will maintain the results in the same order as the initiation of the
computations which produced them.