# HG changeset patch # User paulb # Date 1189975594 0 # Node ID 5e6e212d57e123136602fc1ba751b6eb727d5eb8 # Parent 88f103df4316522322a27f02269370b3fbd4ab67 [project @ 2007-09-16 20:46:34 by paulb] Renamed tutorial document. diff -r 88f103df4316 -r 5e6e212d57e1 docs/tutorial.html --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/docs/tutorial.html Sun Sep 16 20:46:34 2007 +0000 @@ -0,0 +1,626 @@ + + + + + pprocess - Tutorial + + + + +

pprocess - Tutorial

+ +

The pprocess module provides several mechanisms for running +Python code concurrently in several processes. The most straightforward way of +making a program parallel-aware - that is, where the program can take +advantage of more than one processor to simultaneously process data - is to +use the pmap function.

+ +

Converting Map-Style Code

+ +

Consider a program using the built-in map function and a sequence of inputs:

+ +
+    t = time.time()
+
+    # Initialise an array.
+
+    sequence = []
+    for i in range(0, N):
+        for j in range(0, N):
+            sequence.append((i, j))
+
+    # Perform the work.
+
+    results = map(calculate, sequence)
+
+    # Show the results.
+
+    for i in range(0, N):
+        for result in results[i*N:i*N+N]:
+            print result,
+        print
+
+    print "Time taken:", time.time() - t
+ +

(This code in context with import statements and functions is +found in the examples/simple_map.py file.)

+ +

The principal features of this program involve the preparation of an array +for input purposes, and the use of the map function to iterate +over the combinations of i and j in the array. Even +if the calculate function could be invoked independently for each +input value, we have to wait for each computation to complete before +initiating a new one. The calculate function may be defined as +follows:

+ +
+def calculate(t):
+
+    "A supposedly time-consuming calculation on 't'."
+
+    i, j = t
+    time.sleep(delay)
+    return i * N + j
+
+ +

In order to reduce the processing time - to speed the code up, in other +words - we can make this code use several processes instead of just one. Here +is the modified code:

+ +
+    t = time.time()
+
+    # Initialise an array.
+
+    sequence = []
+    for i in range(0, N):
+        for j in range(0, N):
+            sequence.append((i, j))
+
+    # Perform the work.
+
+    results = pprocess.pmap(calculate, sequence, limit=limit)
+
+    # Show the results.
+
+    for i in range(0, N):
+        for result in results[i*N:i*N+N]:
+            print result,
+        print
+
+    print "Time taken:", time.time() - t
+ +

(This code in context with import statements and functions is +found in the examples/simple_pmap.py file.)

+ +

By replacing usage of the map function with the +pprocess.pmap function, and specifying the limit on the number of +processes to be active at any given time (the value of the limit +variable is defined elsewhere), several calculations can now be performed in +parallel.

+ +

Converting Invocations to Parallel Operations

+ +

Although some programs make natural use of the map function, +others may employ an invocation in a nested loop. This may also be converted +to a parallel program. Consider the following Python code:

+ +
+    t = time.time()
+
+    # Initialise an array.
+
+    results = []
+
+    # Perform the work.
+
+    print "Calculating..."
+    for i in range(0, N):
+        for j in range(0, N):
+            results.append(calculate(i, j))
+
+    # Show the results.
+
+    for i in range(0, N):
+        for result in results[i*N:i*N+N]:
+            print result,
+        print
+
+    print "Time taken:", time.time() - t
+ +

(This code in context with import statements and functions is +found in the examples/simple1.py file.)

+ +

Here, a computation in the calculate function is performed for +each combination of i and j in the nested loop, +returning a result value. However, we must wait for the completion of this +function for each element before moving on to the next element, and this means +that the computations are performed sequentially. Consequently, on a system +with more than one processor, even if we could call calculate for +more than one combination of i and j +and have the computations executing at the same time, the above program will +not take advantage of such capabilities.

+ +

We use a slightly modified version of calculate which employs +two parameters instead of one:

+ +
+def calculate(i, j):
+
+    """
+    A supposedly time-consuming calculation on 'i' and 'j'.
+    """
+
+    time.sleep(delay)
+    return i * N + j
+
+ +

In order to reduce the processing time - to speed the code up, in other +words - we can make this code use several processes instead of just one. Here +is the modified code:

+ +
+    t = time.time()
+
+    # Initialise the results using a map with a limit on the number of
+    # channels/processes.
+
+    results = pprocess.Map(limit=limit)
+
+    # Wrap the calculate function and manage it.
+
+    calc = results.manage(pprocess.MakeParallel(calculate))
+
+    # Perform the work.
+
+    print "Calculating..."
+    for i in range(0, N):
+        for j in range(0, N):
+            calc(i, j)
+
+    # Show the results.
+
+    for i in range(0, N):
+        for result in results[i*N:i*N+N]:
+            print result,
+        print
+
+    print "Time taken:", time.time() - t
+ +

(This code in context with import statements and functions is +found in the examples/simple_managed_map.py file.)

+ +

The principal changes in the above code involve the use of a +pprocess.Map object to collect the results, and a version of the +calculate function which is managed by the Map +object. What the Map object does is to arrange the results of +computations such that iterating over the object or accessing the object using +list operations provides the results in the same order as their corresponding +inputs.

+ +

Converting Arbitrarily-Ordered Invocations

+ +

In some programs, it is not important to receive the results of +computations in any particular order, usually because either the order of +these results is irrelevant, or because the results provide "positional" +information which let them be handled in an appropriate way. Consider the +following Python code:

+ +
+    t = time.time()
+
+    # Initialise an array.
+
+    results = [0] * N * N
+
+    # Perform the work.
+
+    print "Calculating..."
+    for i in range(0, N):
+        for j in range(0, N):
+            i2, j2, result = calculate(i, j)
+            results[i2*N+j2] = result
+
+    # Show the results.
+
+    for i in range(0, N):
+        for result in results[i*N:i*N+N]:
+            print result,
+        print
+
+    print "Time taken:", time.time() - t
+
+ +

(This code in context with import statements and functions is +found in the examples/simple2.py file.)

+ +

Here, a result array is initialised first and each computation is performed +sequentially. A significant difference to the previous examples is the return +value of the calculate function: the position details +corresponding to i and j are returned alongside the +result. Obviously, this is of limited value in the above code because the +order of the computations and the reception of results is fixed. However, we +get no benefit from parallelisation in the above example.

+ +

We can bring the benefits of parallel processing to the above program with +the following code:

+ +
+    t = time.time()
+
+    # Initialise the communications queue with a limit on the number of
+    # channels/processes.
+
+    queue = pprocess.Queue(limit=limit)
+
+    # Initialise an array.
+
+    results = [0] * N * N
+
+    # Wrap the calculate function and manage it.
+
+    calc = queue.manage(pprocess.MakeParallel(calculate))
+
+    # Perform the work.
+
+    print "Calculating..."
+    for i in range(0, N):
+        for j in range(0, N):
+            calc(i, j)
+
+    # Store the results as they arrive.
+
+    print "Finishing..."
+    for i, j, result in queue:
+        results[i*N+j] = result
+
+    # Show the results.
+
+    for i in range(0, N):
+        for result in results[i*N:i*N+N]:
+            print result,
+        print
+
+    print "Time taken:", time.time() - t
+
+ +

(This code in context with import statements and functions is +found in the examples/simple_managed_queue.py file.)

+ +

This revised code employs a pprocess.Queue object whose +purpose is to collect the results of computations and to make them available +in the order in which they were received. The code collecting results has been +moved into a separate loop independent of the original computation loop and +taking advantage of the more relevant "positional" information emerging from +the queue.

+ +

We can take this example further, illustrating some of the mechanisms +employed by pprocess. Instead of collecting results in a queue, +we can define a class containing a method which is called when new results +arrive:

+ +
+class MyExchange(pprocess.Exchange):
+
+    "Parallel convenience class containing the array assignment operation."
+
+    def store_data(self, ch):
+        i, j, result = ch.receive()
+        self.D[i*N+j] = result
+
+ +

This code exposes the channel paradigm which is used throughout +pprocess and is available to applications, if desired. The effect +of the method is the storage of a result received through the channel in an +attribute of the object. The following code shows how this class can be used, +with differences to the previous program illustrated:

+ +
+    t = time.time()
+
+    # Initialise the communications exchange with a limit on the number of
+    # channels/processes.
+
+    exchange = MyExchange(limit=limit)
+
+    # Initialise an array - it is stored in the exchange to permit automatic
+    # assignment of values as the data arrives.
+
+    results = exchange.D = [0] * N * N
+
+    # Wrap the calculate function and manage it.
+
+    calc = exchange.manage(pprocess.MakeParallel(calculate))
+
+    # Perform the work.
+
+    print "Calculating..."
+    for i in range(0, N):
+        for j in range(0, N):
+            calc(i, j)
+
+    # Wait for the results.
+
+    print "Finishing..."
+    exchange.finish()
+
+    # Show the results.
+
+    for i in range(0, N):
+        for result in results[i*N:i*N+N]:
+            print result,
+        print
+
+    print "Time taken:", time.time() - t
+
+ +

(This code in context with import statements and functions is +found in the examples/simple_managed.py file.)

+ +

The main visible differences between this and the previous program are the +storage of the result array in the exchange, the removal of the queue +consumption code from the main program, placing the act of storing values in +the exchange's store_data method, and the need to call the +finish method on the MyExchange object so that we do +not try and access the results too soon. One underlying benefit not visible in +the above code is that we no longer need to accumulate results in a queue or +other structure so that they may be processed and assigned to the correct +positions in the result array.

+ +

For the curious, we may remove some of the remaining conveniences of the +above program to expose other features of pprocess. First, we +define a slightly modified version of the calculate function:

+ +
+def calculate(ch, i, j):
+
+    """
+    A supposedly time-consuming calculation on 'i' and 'j', using 'ch' to
+    communicate with the parent process.
+    """
+
+    time.sleep(delay)
+    ch.send((i, j, i * N + j))
+
+ +

This function accepts a channel, ch, through which results +will be sent, and through which other values could potentially be received, +although we choose not to do so here. The program using this function is as +follows, with differences to the previous program illustrated:

+ +
+    t = time.time()
+
+    # Initialise the communications exchange with a limit on the number of
+    # channels/processes.
+
+    exchange = MyExchange(limit=limit)
+
+    # Initialise an array - it is stored in the exchange to permit automatic
+    # assignment of values as the data arrives.
+
+    results = exchange.D = [0] * N * N
+
+    # Perform the work.
+
+    print "Calculating..."
+    for i in range(0, N):
+        for j in range(0, N):
+            exchange.start(calculate, i, j)
+
+    # Wait for the results.
+
+    print "Finishing..."
+    exchange.finish()
+
+    # Show the results.
+
+    for i in range(0, N):
+        for result in results[i*N:i*N+N]:
+            print result,
+        print
+
+    print "Time taken:", time.time() - t
+
+ +

(This code in context with import statements and functions is +found in the examples/simple_start.py file.)

+ +

Here, we have discarded two conveniences: the wrapping of callables using +MakeParallel, which lets us use functions without providing any +channel parameters, and the management of callables using the +manage method on queues, exchanges, and so on. The +start method still calls the provided callable, but using a +different notation from that employed previously.

+ +

Converting Inline Computations

+ +

Although many programs employ functions and other useful abstractions which +can be treated as parallelisable units, some programs perform computations +"inline", meaning that the code responsible appears directly within a loop or +related control-flow construct. Consider the following code:

+ +
+    t = time.time()
+
+    # Initialise an array.
+
+    results = [0] * N * N
+
+    # Perform the work.
+
+    print "Calculating..."
+    for i in range(0, N):
+        for j in range(0, N):
+            time.sleep(delay)
+            results[i*N+j] = i * N + j
+
+    # Show the results.
+
+    for i in range(0, N):
+        for result in results[i*N:i*N+N]:
+            print result,
+        print
+
+    print "Time taken:", time.time() - t
+
+ +

(This code in context with import statements and functions is +found in the examples/simple.py file.)

+ +

To simulate "work", as in the different versions of the +calculate function, we use the time.sleep function +(which does not actually do work, and which will cause a process to be +descheduled in most cases, but which simulates the delay associated with work +being done). This inline work, which must be performed sequentially in the +above program, can be performed in parallel in a somewhat modified version of +the program:

+ +
+    t = time.time()
+
+    # Initialise the results using a map with a limit on the number of
+    # channels/processes.
+
+    results = pprocess.Map(limit=limit)
+
+    # Perform the work.
+    # NOTE: Could use the with statement in the loop to package the
+    # NOTE: try...finally functionality.
+
+    print "Calculating..."
+    for i in range(0, N):
+        for j in range(0, N):
+            ch = results.create()
+            if ch:
+                try: # Calculation work.
+
+                    time.sleep(delay)
+                    ch.send(i * N + j)
+
+                finally: # Important finalisation.
+
+                    pprocess.exit(ch)
+
+    # Show the results.
+
+    for i in range(0, N):
+        for result in results[i*N:i*N+N]:
+            print result,
+        print
+
+    print "Time taken:", time.time() - t
+
+ +

(This code in context with import statements and functions is +found in the examples/simple_create_map.py file.)

+ +

Although seemingly more complicated, the bulk of the changes in this +modified program are focused on obtaining a channel object, ch, +at the point where the computations are performed, and the wrapping of the +computation code in a try...finally statement which +ensures that the process associated with the channel exits when the +computation is complete. In order for the results of these computations to be +collected, a pprocess.Map object is used, since it will maintain +the results in the same order as the initiation of the computations which +produced them.

+ +

Reusing Processes in Parallel Programs

+ +

One notable aspect of the above programs when parallelised is that each +invocation of a computation in parallel creates a new process in which the +computation is to be performed, regardless of whether existing processes had +just finished producing results and could theoretically have been asked to +perform new computations. In other words, processes were created and destroyed +instead of being reused.

+ +

However, we can request that processes be reused for computations by +enabling the reuse feature of exchange-like objects and employing +suitable reusable callables. Consider this modified version of the simple_managed_map program:

+ +
+    t = time.time()
+
+    # Initialise the results using a map with a limit on the number of
+    # channels/processes.
+
+    results = pprocess.Map(limit=limit, reuse=1)
+
+    # Wrap the calculate function and manage it.
+
+    calc = results.manage(pprocess.MakeReusable(calculate))
+
+    # Perform the work.
+
+    print "Calculating..."
+    for i in range(0, N):
+        for j in range(0, N):
+            calc(i, j)
+
+    # Show the results.
+
+    for i in range(0, N):
+        for result in results[i*N:i*N+N]:
+            print result,
+        print
+
+    print "Time taken:", time.time() - t
+
+ +

(This code in context with import statements and functions is +found in the examples/simple_manage_map_reusable.py file.)

+ +

By indicating that processes and channels shall be reused, and by wrapping +the calculate function with the necessary support, the +computations may be performed in parallel using a pool of processes instead of +creating a new process for each computation and then discarding it, only to +create a new process for the next computation.

+ +

Summary

+ +

The following table indicates the features used in converting one +sequential example program to another parallel program:

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Sequential ExampleParallel ExampleFeatures Used
simple_mapsimple_pmappmap
simple1simple_managed_mapMakeParallel, Map, manage
simple2simple_managed_queueMakeParallel, Queue, manage
simple_managedMakeParallel, Exchange (subclass), manage, finish
simple_startChannel, Exchange (subclass), start, finish
simplesimple_create_mapChannel, Map, create, exit
+ + + diff -r 88f103df4316 -r 5e6e212d57e1 docs/tutorial.xhtml --- a/docs/tutorial.xhtml Sun Sep 16 19:38:51 2007 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,626 +0,0 @@ - - - - - pprocess - Tutorial - - - - -

pprocess - Tutorial

- -

The pprocess module provides several mechanisms for running -Python code concurrently in several processes. The most straightforward way of -making a program parallel-aware - that is, where the program can take -advantage of more than one processor to simultaneously process data - is to -use the pmap function.

- -

Converting Map-Style Code

- -

Consider a program using the built-in map function and a sequence of inputs:

- -
-    t = time.time()
-
-    # Initialise an array.
-
-    sequence = []
-    for i in range(0, N):
-        for j in range(0, N):
-            sequence.append((i, j))
-
-    # Perform the work.
-
-    results = map(calculate, sequence)
-
-    # Show the results.
-
-    for i in range(0, N):
-        for result in results[i*N:i*N+N]:
-            print result,
-        print
-
-    print "Time taken:", time.time() - t
- -

(This code in context with import statements and functions is -found in the examples/simple_map.py file.)

- -

The principal features of this program involve the preparation of an array -for input purposes, and the use of the map function to iterate -over the combinations of i and j in the array. Even -if the calculate function could be invoked independently for each -input value, we have to wait for each computation to complete before -initiating a new one. The calculate function may be defined as -follows:

- -
-def calculate(t):
-
-    "A supposedly time-consuming calculation on 't'."
-
-    i, j = t
-    time.sleep(delay)
-    return i * N + j
-
- -

In order to reduce the processing time - to speed the code up, in other -words - we can make this code use several processes instead of just one. Here -is the modified code:

- -
-    t = time.time()
-
-    # Initialise an array.
-
-    sequence = []
-    for i in range(0, N):
-        for j in range(0, N):
-            sequence.append((i, j))
-
-    # Perform the work.
-
-    results = pprocess.pmap(calculate, sequence, limit=limit)
-
-    # Show the results.
-
-    for i in range(0, N):
-        for result in results[i*N:i*N+N]:
-            print result,
-        print
-
-    print "Time taken:", time.time() - t
- -

(This code in context with import statements and functions is -found in the examples/simple_pmap.py file.)

- -

By replacing usage of the map function with the -pprocess.pmap function, and specifying the limit on the number of -processes to be active at any given time (the value of the limit -variable is defined elsewhere), several calculations can now be performed in -parallel.

- -

Converting Invocations to Parallel Operations

- -

Although some programs make natural use of the map function, -others may employ an invocation in a nested loop. This may also be converted -to a parallel program. Consider the following Python code:

- -
-    t = time.time()
-
-    # Initialise an array.
-
-    results = []
-
-    # Perform the work.
-
-    print "Calculating..."
-    for i in range(0, N):
-        for j in range(0, N):
-            results.append(calculate(i, j))
-
-    # Show the results.
-
-    for i in range(0, N):
-        for result in results[i*N:i*N+N]:
-            print result,
-        print
-
-    print "Time taken:", time.time() - t
- -

(This code in context with import statements and functions is -found in the examples/simple1.py file.)

- -

Here, a computation in the calculate function is performed for -each combination of i and j in the nested loop, -returning a result value. However, we must wait for the completion of this -function for each element before moving on to the next element, and this means -that the computations are performed sequentially. Consequently, on a system -with more than one processor, even if we could call calculate for -more than one combination of i and j -and have the computations executing at the same time, the above program will -not take advantage of such capabilities.

- -

We use a slightly modified version of calculate which employs -two parameters instead of one:

- -
-def calculate(i, j):
-
-    """
-    A supposedly time-consuming calculation on 'i' and 'j'.
-    """
-
-    time.sleep(delay)
-    return i * N + j
-
- -

In order to reduce the processing time - to speed the code up, in other -words - we can make this code use several processes instead of just one. Here -is the modified code:

- -
-    t = time.time()
-
-    # Initialise the results using a map with a limit on the number of
-    # channels/processes.
-
-    results = pprocess.Map(limit=limit)
-
-    # Wrap the calculate function and manage it.
-
-    calc = results.manage(pprocess.MakeParallel(calculate))
-
-    # Perform the work.
-
-    print "Calculating..."
-    for i in range(0, N):
-        for j in range(0, N):
-            calc(i, j)
-
-    # Show the results.
-
-    for i in range(0, N):
-        for result in results[i*N:i*N+N]:
-            print result,
-        print
-
-    print "Time taken:", time.time() - t
- -

(This code in context with import statements and functions is -found in the examples/simple_managed_map.py file.)

- -

The principal changes in the above code involve the use of a -pprocess.Map object to collect the results, and a version of the -calculate function which is managed by the Map -object. What the Map object does is to arrange the results of -computations such that iterating over the object or accessing the object using -list operations provides the results in the same order as their corresponding -inputs.

- -

Converting Arbitrarily-Ordered Invocations

- -

In some programs, it is not important to receive the results of -computations in any particular order, usually because either the order of -these results is irrelevant, or because the results provide "positional" -information which let them be handled in an appropriate way. Consider the -following Python code:

- -
-    t = time.time()
-
-    # Initialise an array.
-
-    results = [0] * N * N
-
-    # Perform the work.
-
-    print "Calculating..."
-    for i in range(0, N):
-        for j in range(0, N):
-            i2, j2, result = calculate(i, j)
-            results[i2*N+j2] = result
-
-    # Show the results.
-
-    for i in range(0, N):
-        for result in results[i*N:i*N+N]:
-            print result,
-        print
-
-    print "Time taken:", time.time() - t
-
- -

(This code in context with import statements and functions is -found in the examples/simple2.py file.)

- -

Here, a result array is initialised first and each computation is performed -sequentially. A significant difference to the previous examples is the return -value of the calculate function: the position details -corresponding to i and j are returned alongside the -result. Obviously, this is of limited value in the above code because the -order of the computations and the reception of results is fixed. However, we -get no benefit from parallelisation in the above example.

- -

We can bring the benefits of parallel processing to the above program with -the following code:

- -
-    t = time.time()
-
-    # Initialise the communications queue with a limit on the number of
-    # channels/processes.
-
-    queue = pprocess.Queue(limit=limit)
-
-    # Initialise an array.
-
-    results = [0] * N * N
-
-    # Wrap the calculate function and manage it.
-
-    calc = queue.manage(pprocess.MakeParallel(calculate))
-
-    # Perform the work.
-
-    print "Calculating..."
-    for i in range(0, N):
-        for j in range(0, N):
-            calc(i, j)
-
-    # Store the results as they arrive.
-
-    print "Finishing..."
-    for i, j, result in queue:
-        results[i*N+j] = result
-
-    # Show the results.
-
-    for i in range(0, N):
-        for result in results[i*N:i*N+N]:
-            print result,
-        print
-
-    print "Time taken:", time.time() - t
-
- -

(This code in context with import statements and functions is -found in the examples/simple_managed_queue.py file.)

- -

This revised code employs a pprocess.Queue object whose -purpose is to collect the results of computations and to make them available -in the order in which they were received. The code collecting results has been -moved into a separate loop independent of the original computation loop and -taking advantage of the more relevant "positional" information emerging from -the queue.

- -

We can take this example further, illustrating some of the mechanisms -employed by pprocess. Instead of collecting results in a queue, -we can define a class containing a method which is called when new results -arrive:

- -
-class MyExchange(pprocess.Exchange):
-
-    "Parallel convenience class containing the array assignment operation."
-
-    def store_data(self, ch):
-        i, j, result = ch.receive()
-        self.D[i*N+j] = result
-
- -

This code exposes the channel paradigm which is used throughout -pprocess and is available to applications, if desired. The effect -of the method is the storage of a result received through the channel in an -attribute of the object. The following code shows how this class can be used, -with differences to the previous program illustrated:

- -
-    t = time.time()
-
-    # Initialise the communications exchange with a limit on the number of
-    # channels/processes.
-
-    exchange = MyExchange(limit=limit)
-
-    # Initialise an array - it is stored in the exchange to permit automatic
-    # assignment of values as the data arrives.
-
-    results = exchange.D = [0] * N * N
-
-    # Wrap the calculate function and manage it.
-
-    calc = exchange.manage(pprocess.MakeParallel(calculate))
-
-    # Perform the work.
-
-    print "Calculating..."
-    for i in range(0, N):
-        for j in range(0, N):
-            calc(i, j)
-
-    # Wait for the results.
-
-    print "Finishing..."
-    exchange.finish()
-
-    # Show the results.
-
-    for i in range(0, N):
-        for result in results[i*N:i*N+N]:
-            print result,
-        print
-
-    print "Time taken:", time.time() - t
-
- -

(This code in context with import statements and functions is -found in the examples/simple_managed.py file.)

- -

The main visible differences between this and the previous program are the -storage of the result array in the exchange, the removal of the queue -consumption code from the main program, placing the act of storing values in -the exchange's store_data method, and the need to call the -finish method on the MyExchange object so that we do -not try and access the results too soon. One underlying benefit not visible in -the above code is that we no longer need to accumulate results in a queue or -other structure so that they may be processed and assigned to the correct -positions in the result array.

- -

For the curious, we may remove some of the remaining conveniences of the -above program to expose other features of pprocess. First, we -define a slightly modified version of the calculate function:

- -
-def calculate(ch, i, j):
-
-    """
-    A supposedly time-consuming calculation on 'i' and 'j', using 'ch' to
-    communicate with the parent process.
-    """
-
-    time.sleep(delay)
-    ch.send((i, j, i * N + j))
-
- -

This function accepts a channel, ch, through which results -will be sent, and through which other values could potentially be received, -although we choose not to do so here. The program using this function is as -follows, with differences to the previous program illustrated:

- -
-    t = time.time()
-
-    # Initialise the communications exchange with a limit on the number of
-    # channels/processes.
-
-    exchange = MyExchange(limit=limit)
-
-    # Initialise an array - it is stored in the exchange to permit automatic
-    # assignment of values as the data arrives.
-
-    results = exchange.D = [0] * N * N
-
-    # Perform the work.
-
-    print "Calculating..."
-    for i in range(0, N):
-        for j in range(0, N):
-            exchange.start(calculate, i, j)
-
-    # Wait for the results.
-
-    print "Finishing..."
-    exchange.finish()
-
-    # Show the results.
-
-    for i in range(0, N):
-        for result in results[i*N:i*N+N]:
-            print result,
-        print
-
-    print "Time taken:", time.time() - t
-
- -

(This code in context with import statements and functions is -found in the examples/simple_start.py file.)

- -

Here, we have discarded two conveniences: the wrapping of callables using -MakeParallel, which lets us use functions without providing any -channel parameters, and the management of callables using the -manage method on queues, exchanges, and so on. The -start method still calls the provided callable, but using a -different notation from that employed previously.

- -

Converting Inline Computations

- -

Although many programs employ functions and other useful abstractions which -can be treated as parallelisable units, some programs perform computations -"inline", meaning that the code responsible appears directly within a loop or -related control-flow construct. Consider the following code:

- -
-    t = time.time()
-
-    # Initialise an array.
-
-    results = [0] * N * N
-
-    # Perform the work.
-
-    print "Calculating..."
-    for i in range(0, N):
-        for j in range(0, N):
-            time.sleep(delay)
-            results[i*N+j] = i * N + j
-
-    # Show the results.
-
-    for i in range(0, N):
-        for result in results[i*N:i*N+N]:
-            print result,
-        print
-
-    print "Time taken:", time.time() - t
-
- -

(This code in context with import statements and functions is -found in the examples/simple.py file.)

- -

To simulate "work", as in the different versions of the -calculate function, we use the time.sleep function -(which does not actually do work, and which will cause a process to be -descheduled in most cases, but which simulates the delay associated with work -being done). This inline work, which must be performed sequentially in the -above program, can be performed in parallel in a somewhat modified version of -the program:

- -
-    t = time.time()
-
-    # Initialise the results using a map with a limit on the number of
-    # channels/processes.
-
-    results = pprocess.Map(limit=limit)
-
-    # Perform the work.
-    # NOTE: Could use the with statement in the loop to package the
-    # NOTE: try...finally functionality.
-
-    print "Calculating..."
-    for i in range(0, N):
-        for j in range(0, N):
-            ch = results.create()
-            if ch:
-                try: # Calculation work.
-
-                    time.sleep(delay)
-                    ch.send(i * N + j)
-
-                finally: # Important finalisation.
-
-                    pprocess.exit(ch)
-
-    # Show the results.
-
-    for i in range(0, N):
-        for result in results[i*N:i*N+N]:
-            print result,
-        print
-
-    print "Time taken:", time.time() - t
-
- -

(This code in context with import statements and functions is -found in the examples/simple_create_map.py file.)

- -

Although seemingly more complicated, the bulk of the changes in this -modified program are focused on obtaining a channel object, ch, -at the point where the computations are performed, and the wrapping of the -computation code in a try...finally statement which -ensures that the process associated with the channel exits when the -computation is complete. In order for the results of these computations to be -collected, a pprocess.Map object is used, since it will maintain -the results in the same order as the initiation of the computations which -produced them.

- -

Reusing Processes in Parallel Programs

- -

One notable aspect of the above programs when parallelised is that each -invocation of a computation in parallel creates a new process in which the -computation is to be performed, regardless of whether existing processes had -just finished producing results and could theoretically have been asked to -perform new computations. In other words, processes were created and destroyed -instead of being reused.

- -

However, we can request that processes be reused for computations by -enabling the reuse feature of exchange-like objects and employing -suitable reusable callables. Consider this modified version of the simple_managed_map program:

- -
-    t = time.time()
-
-    # Initialise the results using a map with a limit on the number of
-    # channels/processes.
-
-    results = pprocess.Map(limit=limit, reuse=1)
-
-    # Wrap the calculate function and manage it.
-
-    calc = results.manage(pprocess.MakeReusable(calculate))
-
-    # Perform the work.
-
-    print "Calculating..."
-    for i in range(0, N):
-        for j in range(0, N):
-            calc(i, j)
-
-    # Show the results.
-
-    for i in range(0, N):
-        for result in results[i*N:i*N+N]:
-            print result,
-        print
-
-    print "Time taken:", time.time() - t
-
- -

(This code in context with import statements and functions is -found in the examples/simple_manage_map_reusable.py file.)

- -

By indicating that processes and channels shall be reused, and by wrapping -the calculate function with the necessary support, the -computations may be performed in parallel using a pool of processes instead of -creating a new process for each computation and then discarding it, only to -create a new process for the next computation.

- -

Summary

- -

The following table indicates the features used in converting one -sequential example program to another parallel program:

- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Sequential ExampleParallel ExampleFeatures Used
simple_mapsimple_pmappmap
simple1simple_managed_mapMakeParallel, Map, manage
simple2simple_managed_queueMakeParallel, Queue, manage
simple_managedMakeParallel, Exchange (subclass), manage, finish
simple_startChannel, Exchange (subclass), start, finish
simplesimple_create_mapChannel, Map, create, exit
- - -