# HG changeset patch
# User paulb
# Date 1189900025 0
# Node ID 7022b1c3ec9e8325af06d34f7c2ef33f962945cd
# Parent  761f6fca5e2edcc5e560315c033e4823ff69529d
[project @ 2007-09-15 23:47:05 by paulb] Covered most of the examples.

diff -r 761f6fca5e2e -r 7022b1c3ec9e docs/tutorial.xhtml
--- a/docs/tutorial.xhtml	Sat Sep 15 23:46:55 2007 +0000
+++ b/docs/tutorial.xhtml	Sat Sep 15 23:47:05 2007 +0000
@@ -3,17 +3,17 @@
 pprocess - Tutorial
+

pprocess - Tutorial


+

The pprocess module provides several mechanisms for running Python code concurrently in several processes. The most straightforward way of making a program parallel-aware - that is, where the program can take advantage of more than one processor to simultaneously process data - is to use the pmap function.

Converting Map-Style Code

@@ -42,16 +42,30 @@
     print "Time taken:", time.time() - t


+

(This code in context with import statements and functions is found in the examples/simple_map.py file.)

+ +

The principal features of this program involve the preparation of an array for input purposes, and the use of the map function to iterate over the combinations of i and j in the array. Even if the calculate function could be invoked independently for each input value, we have to wait for each computation to complete before initiating a new one. The calculate function may be defined as follows:


+
+def calculate(t):
+
+    "A supposedly time-consuming calculation on 't'."
 

+    i, j = t
+    time.sleep(delay)
+    return i * N + j
+
+ +

In order to reduce the processing time - to speed the code up, in other words - we can make this code use several processes instead of just one. Here is the modified code:

     t = time.time()
@@ -76,15 +90,20 @@
 
     print "Time taken:", time.time() - t

+

(This code in context with import statements and functions is found in the examples/simple_pmap.py file.)


+

By replacing usage of the map function with the pprocess.pmap function, and specifying the limit on the number of processes to be active at any given time (the value of the limit variable is defined elsewhere), several calculations can now be performed in parallel.
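To illustrate the contract that pmap offers, here is a rough sketch using only the standard library. This is not pprocess itself - it uses threads rather than child processes, via concurrent.futures - but it shows the same two essentials: a cap on the number of simultaneous workers, and results delivered in input order.

```python
import time
from concurrent.futures import ThreadPoolExecutor

N = 4        # grid size, as in the tutorial's examples
delay = 0.05 # simulated cost of each calculation
limit = 4    # maximum number of simultaneous workers

def calculate(t):
    "A supposedly time-consuming calculation on 't'."
    i, j = t
    time.sleep(delay)
    return i * N + j

inputs = [(i, j) for i in range(N) for j in range(N)]

# Like pprocess.pmap, Executor.map caps concurrency at 'limit' and
# yields results in the same order as the inputs were supplied.
with ThreadPoolExecutor(max_workers=limit) as pool:
    results = list(pool.map(calculate, inputs))
```

Because time.sleep releases the interpreter lock, even this thread-based analogue finishes in roughly delay * ceil(N*N / limit) seconds rather than delay * N * N.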

Converting Invocations to Parallel Operations


+

Although some programs make natural use of the map function, others may employ an invocation in a nested loop. This may also be converted to a parallel program. Consider the following Python code:

     t = time.time()
@@ -109,23 +128,41 @@
 
     print "Time taken:", time.time() - t

+

(This code in context with import statements and functions is found in the examples/simple1.py file.)

+ +

Here, a computation in the calculate function is performed for each combination of i and j in the nested loop, returning a result value. However, we must wait for the completion of this function for each element before moving on to the next element, and this means that the computations are performed sequentially. Consequently, on a system with more than one processor, even if we could call calculate for more than one combination of i and j and have the computations execute at the same time, the above program will not take advantage of such capabilities.


+

We use a slightly modified version of calculate which employs two parameters instead of one:

+ +
+def calculate(i, j):
 

+ """ + A supposedly time-consuming calculation on 'i' and 'j'. + """ + + time.sleep(delay) + return i * N + j +
+ +

In order to reduce the processing time - to speed the code up, in other words - we can make this code use several processes instead of just one. Here is the modified code:

     t = time.time()
 
-    # Initialise the results using map with a limit on the number of
+    # Initialise the results using a map with a limit on the number of
     # channels/processes.
 
     results = pprocess.Map(limit=limit)
@@ -150,12 +187,343 @@
 
     print "Time taken:", time.time() - t

+

(This code in context with import statements and functions is found in the examples/simple_manage_map.py file.)

+ +

The principal changes in the above code involve the use of a pprocess.Map object to collect the results, and a version of the calculate function which is managed by the Map object. What the Map object does is to arrange the results of computations such that iterating over the object or accessing the object using list operations provides the results in the same order as their corresponding inputs.
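The order-preserving behaviour described above can be sketched without pprocess at all. The following illustrative class (threads instead of processes, and far cruder than pprocess.Map) shows the idea: managed calls return immediately, and later iteration yields the results in call order, however the computations actually finished.

```python
import threading
import time

N = 3
delay = 0.02

class OrderedResults:

    "An illustrative, thread-based stand-in for pprocess.Map's ordering."

    def __init__(self):
        self.threads = []
        self.slots = []

    def manage(self, func):
        def call(*args):
            slot = [None]
            self.slots.append(slot)
            t = threading.Thread(
                target=lambda: slot.__setitem__(0, func(*args)))
            t.start()
            self.threads.append(t)
        return call

    def __iter__(self):
        # Wait for everything, then yield results in call order.
        for t in self.threads:
            t.join()
        return iter(s[0] for s in self.slots)

def calculate(i, j):
    time.sleep(delay * ((i + j) % 2))  # deliberately uneven durations
    return i * N + j

results = OrderedResults()
calc = results.manage(calculate)
for i in range(N):
    for j in range(N):
        calc(i, j)

ordered = list(results)
```

Even though some calls finish before earlier ones, the slot reserved at call time guarantees each result lands in its input's position.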

+ +

Converting Arbitrarily-Ordered Invocations

+ +

In some programs, it is not important to receive the results of computations in any particular order, usually because either the order of these results is irrelevant, or because the results provide "positional" information which lets them be handled in an appropriate way. Consider the following Python code:

+ +
+    t = time.time()
+
+    # Initialise an array.
+
+    results = [0] * N * N
+
+    # Perform the work.
+
+    print "Calculating..."
+    for i in range(0, N):
+        for j in range(0, N):
+            i2, j2, result = calculate(i, j)
+            results[i2*N+j2] = result
+
+    # Show the results.
+
+    for i in range(0, N):
+        for result in results[i*N:i*N+N]:
+            print result,
+        print
+
+    print "Time taken:", time.time() - t
+
+ +

(This code in context with import statements and functions is found in the examples/simple2.py file.)

+ +

Here, a result array is initialised first and each computation is performed sequentially. A significant difference from the previous examples is the return value of the calculate function: the position details corresponding to i and j are returned alongside the result. Obviously, this is of limited value in the above code because the order of the computations and the reception of results is fixed; as written, therefore, the program gains no benefit from parallelisation.

+ +

We can bring the benefits of parallel processing to the above program with the following code:

+ +
+    t = time.time()
+
+    # Initialise the communications queue with a limit on the number of
+    # channels/processes.
+
+    queue = pprocess.Queue(limit=limit)
+
+    # Initialise an array.
+
+    results = [0] * N * N
+
+    # Wrap the calculate function and manage it.
+
+    calc = queue.manage(pprocess.MakeParallel(calculate))
+
+    # Perform the work.
+
+    print "Calculating..."
+    for i in range(0, N):
+        for j in range(0, N):
+            calc(i, j)
+
+    # Store the results as they arrive.
+
+    print "Finishing..."
+    for i, j, result in queue:
+        results[i*N+j] = result
+
+    # Show the results.
+
+    for i in range(0, N):
+        for result in results[i*N:i*N+N]:
+            print result,
+        print
+
+    print "Time taken:", time.time() - t
+
+ +

(This code in context with import statements and functions is found in the examples/simple_managed_queue.py file.)

+ +

This revised code employs a pprocess.Queue object whose purpose is to collect the results of computations and to make them available in the order in which they were received. The code collecting results has been moved into a separate loop, independent of the original computation loop, which takes advantage of the more relevant "positional" information emerging from the queue.
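The arrival-order pattern can be mimicked with the standard library's queue module. In this sketch (threads rather than pprocess's channel-backed processes), results are consumed in whatever order they arrive, and the positional information carried with each one restores the intended layout.

```python
import queue
import threading
import time

N = 3
delay = 0.02
q = queue.Queue()

def calculate(i, j):
    "Compute a value and send it with its position, as in the tutorial."
    time.sleep(delay * ((i + j) % 3))
    q.put((i, j, i * N + j))

# Start all the computations without waiting for any to finish.
threads = [threading.Thread(target=calculate, args=(i, j))
           for i in range(N) for j in range(N)]
for t in threads:
    t.start()

# Collect results in arrival order; the positional information
# restores each one to its proper place in the array.
results = [0] * N * N
for _ in range(N * N):
    i, j, result = q.get()
    results[i * N + j] = result

for t in threads:
    t.join()
```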

+ +

We can take this example further, illustrating some of the mechanisms employed by pprocess. Instead of collecting results in a queue, we can define a class containing a method which is called when new results arrive:

+ +
+class MyExchange(pprocess.Exchange):
+
+    "Parallel convenience class containing the array assignment operation."
+
+    def store_data(self, ch):
+        i, j, result = ch.receive()
+        self.D[i*N+j] = result
+
+ +

This code exposes the channel paradigm which is used throughout pprocess and is available to applications, if desired. The effect of the method is the storage of a result received through the channel in an attribute of the object. The following code shows how this class can be used, with differences to the previous program illustrated:

+ +
+    t = time.time()
+
+    # Initialise the communications exchange with a limit on the number of
+    # channels/processes.
+
+    exchange = MyExchange(limit=limit)
+
+    # Initialise an array - it is stored in the exchange to permit automatic
+    # assignment of values as the data arrives.
+
+    results = exchange.D = [0] * N * N
+
+    # Wrap the calculate function and manage it.
+
+    calc = exchange.manage(pprocess.MakeParallel(calculate))
+
+    # Perform the work.
+
+    print "Calculating..."
+    for i in range(0, N):
+        for j in range(0, N):
+            calc(i, j)
+
+    # Wait for the results.
+
+    print "Finishing..."
+    exchange.finish()
+
+    # Show the results.
+
+    for i in range(0, N):
+        for result in results[i*N:i*N+N]:
+            print result,
+        print
+
+    print "Time taken:", time.time() - t
+

+

(This code in context with import statements and functions is found in the examples/simple_managed.py file.)

+ +

The main visible differences between this and the previous program are the storage of the result array in the exchange, the removal of the queue consumption code from the main program, placing the act of storing values in the exchange's store_data method, and the need to call the finish method on the MyExchange object so that we do not try to access the results too soon. One underlying benefit not visible in the above code is that we no longer need to accumulate results in a queue or other structure so that they may be processed and assigned to the correct positions in the result array.
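The shape of the exchange pattern - a store_data-style callback placing each result directly into an array held by the exchange, and a finish call that blocks until all work is done - can be sketched with threads. This stand-in is illustrative only (the real store_data receives a channel and calls ch.receive(); here the result tuple is passed directly):

```python
import threading
import time

N = 3
delay = 0.02

class SketchExchange:

    "An illustrative, thread-based stand-in for the exchange idea."

    def __init__(self):
        self.threads = []

    def store_data(self, data):
        # In pprocess this method receives a channel and calls
        # ch.receive(); here the result tuple arrives directly.
        i, j, result = data
        self.D[i * N + j] = result

    def manage(self, func):
        def call(*args):
            t = threading.Thread(
                target=lambda: self.store_data(func(*args)))
            t.start()
            self.threads.append(t)
        return call

    def finish(self):
        # Wait for every outstanding computation before reading results.
        for t in self.threads:
            t.join()

def calculate(i, j):
    time.sleep(delay)
    return i, j, i * N + j

exchange = SketchExchange()
results = exchange.D = [0] * N * N
calc = exchange.manage(calculate)
for i in range(N):
    for j in range(N):
        calc(i, j)
exchange.finish()
```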

+ +

For the curious, we may remove some of the remaining conveniences of the above program to expose other features of pprocess. First, we define a slightly modified version of the calculate function:

+ +
+def calculate(ch, i, j):
+
+    """
+    A supposedly time-consuming calculation on 'i' and 'j', using 'ch' to
+    communicate with the parent process.
+    """
+
+    time.sleep(delay)
+    ch.send((i, j, i * N + j))
+
+ +

This function accepts a channel, ch, through which results will be sent, and through which other values could potentially be received, although we choose not to do so here. The program using this function is as follows, with differences to the previous program illustrated:

+ +
+    t = time.time()
+
+    # Initialise the communications exchange with a limit on the number of
+    # channels/processes.
+
+    exchange = MyExchange(limit=limit)
+
+    # Initialise an array - it is stored in the exchange to permit automatic
+    # assignment of values as the data arrives.
+
+    results = exchange.D = [0] * N * N
+
+    # Perform the work.
+
+    print "Calculating..."
+    for i in range(0, N):
+        for j in range(0, N):
+            exchange.start(calculate, i, j)
+
+    # Wait for the results.
+
+    print "Finishing..."
+    exchange.finish()
+
+    # Show the results.
+
+    for i in range(0, N):
+        for result in results[i*N:i*N+N]:
+            print result,
+        print
+
+    print "Time taken:", time.time() - t
+
+ +

(This code in context with import statements and functions is found in the examples/simple_start.py file.)

+ +

Here, we have discarded two conveniences: the wrapping of callables using MakeParallel, which lets us use functions without providing any channel parameters, and the management of callables using the manage method on queues, exchanges, and so on. The start method still calls the provided callable, but using a different notation from that employed previously.
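The essence of the MakeParallel convenience can be sketched in a few lines of plain Python: a wrapper takes an ordinary function and produces one with the channel-first signature that start expects. (This is an illustration of the idea, not pprocess's implementation; the RecordingChannel is a hypothetical stand-in for a real channel.)

```python
def make_parallel(func):
    "Wrap an ordinary function so its result is sent over a channel."
    def wrapped(ch, *args):
        ch.send(func(*args))
    return wrapped

class RecordingChannel:

    "A trivial stand-in for a channel: it simply records what is sent."

    def __init__(self):
        self.sent = []

    def send(self, value):
        self.sent.append(value)

N = 10

def calculate(i, j):
    return i * N + j

# 'calculate' knows nothing about channels; the wrapper supplies one.
calc = make_parallel(calculate)
ch = RecordingChannel()
calc(ch, 2, 3)
```

This separation is what lets the same calculate function serve both the sequential and the parallel versions of a program.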

+ +

Converting Inline Computations

+ +

Although many programs employ functions and other useful abstractions which can be treated as parallelisable units, some programs perform computations "inline", meaning that the code responsible appears directly within a loop or related control-flow construct. Consider the following code:

+ +
+    t = time.time()
+
+    # Initialise an array.
+
+    results = [0] * N * N
+
+    # Perform the work.
+
+    print "Calculating..."
+    for i in range(0, N):
+        for j in range(0, N):
+            time.sleep(delay)
+            results[i*N+j] = i * N + j
+
+    # Show the results.
+
+    for i in range(0, N):
+        for result in results[i*N:i*N+N]:
+            print result,
+        print
+
+    print "Time taken:", time.time() - t
+
+ +

(This code in context with import statements and functions is found in the examples/simple.py file.)

+ +

To simulate "work", as in the different versions of the calculate function, we use the time.sleep function (which does not actually do work, and which will cause a process to be descheduled in most cases, but which simulates the delay associated with work being done). This inline work, which must be performed sequentially in the above program, can be performed in parallel in a somewhat modified version of the program:

+ +
+    t = time.time()
+
+    # Initialise the results using a map with a limit on the number of
+    # channels/processes.
+
+    results = pprocess.Map(limit=limit)
+
+    # Perform the work.
+    # NOTE: Could use the with statement in the loop to package the
+    # NOTE: try...finally functionality.
+
+    print "Calculating..."
+    for i in range(0, N):
+        for j in range(0, N):
+            ch = results.create()
+            if ch:
+                try: # Calculation work.
+
+                    time.sleep(delay)
+                    ch.send(i * N + j)
+
+                finally: # Important finalisation.
+
+                    pprocess.exit(ch)
+
+    # Show the results.
+
+    for i in range(0, N):
+        for result in results[i*N:i*N+N]:
+            print result,
+        print
+
+    print "Time taken:", time.time() - t
+
+ +

(This code in context with import statements and functions is found in the examples/simple_create_map.py file.)

+ +

Although it seems more complicated, the bulk of the changes in this modified program are focused on obtaining a channel object, ch, at the point where the computations are performed, and on wrapping the computation code in a try...finally statement which ensures that the process associated with the channel exits when the computation is complete. In order for the results of these computations to be collected, a pprocess.Map object is used, since it will maintain the results in the same order as the initiation of the computations which produced them.
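For a sense of what create and pprocess.exit amount to underneath, here is a bare-bones, POSIX-only sketch using os.fork and a pipe directly. This illustrates the general fork-plus-channel mechanism, not pprocess's actual implementation, which provides much richer channels able to carry arbitrary Python objects.

```python
import os

def create():
    "Fork a child connected to the parent by a pipe."
    r, w = os.pipe()
    pid = os.fork()
    if pid == 0:        # child: keep only the write end
        os.close(r)
        return 0, w
    os.close(w)         # parent: keep only the read end
    return pid, r

results = []
for value in (10, 20, 30):
    pid, fd = create()
    if pid == 0:
        try:            # calculation work
            os.write(fd, b"%d" % value)
        finally:        # important finalisation, like pprocess.exit(ch)
            os._exit(0)
    else:
        results.append(int(os.read(fd, 16)))
        os.close(fd)
        os.waitpid(pid, 0)
```

The finally clause plays the same role as in the tutorial's program: whatever happens during the calculation, the child process is guaranteed to terminate rather than fall through into the parent's remaining code.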