# HG changeset patch
# User paulb
# Date 1189975594 0
# Node ID 5e6e212d57e123136602fc1ba751b6eb727d5eb8
# Parent  88f103df4316522322a27f02269370b3fbd4ab67
[project @ 2007-09-16 20:46:34 by paulb]
Renamed tutorial document.

diff -r 88f103df4316 -r 5e6e212d57e1 docs/tutorial.html
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/docs/tutorial.html	Sun Sep 16 20:46:34 2007 +0000
The pprocess module provides a number of mechanisms for running Python code concurrently in several processes. The most straightforward way of making a program parallel-aware - that is, where the program can take advantage of more than one processor to simultaneously process data - is to use the pmap function.
Consider a program using the built-in map function and a sequence of inputs:
  t = time.time()

  # Initialise an array.

  sequence = []
  for i in range(0, N):
      for j in range(0, N):
          sequence.append((i, j))

  # Perform the work.

  results = map(calculate, sequence)

  # Show the results.

  for i in range(0, N):
      for result in results[i*N:i*N+N]:
          print result,
      print

  print "Time taken:", time.time() - t
(This code, in context with import statements and functions, is found in the examples/simple_map.py file.)
The principal features of this program involve the preparation of an array for input purposes, and the use of the map function to iterate over the combinations of i and j in the array. Even if the calculate function could be invoked independently for each input value, we have to wait for each computation to complete before initiating a new one. The calculate function may be defined as follows:
def calculate(t):

    "A supposedly time-consuming calculation on 't'."

    i, j = t
    time.sleep(delay)
    return i * N + j
In order to reduce the processing time - to speed the code up, in other words - we can make this code use several processes instead of just one. Here is the modified code:
  t = time.time()

  # Initialise an array.

  sequence = []
  for i in range(0, N):
      for j in range(0, N):
          sequence.append((i, j))

  # Perform the work.

  results = pprocess.pmap(calculate, sequence, limit=limit)

  # Show the results.

  for i in range(0, N):
      for result in results[i*N:i*N+N]:
          print result,
      print

  print "Time taken:", time.time() - t
(This code, in context with import statements and functions, is found in the examples/simple_pmap.py file.)
By replacing usage of the map function with the pprocess.pmap function, and specifying the limit on the number of processes to be active at any given time (the value of the limit variable is defined elsewhere), several calculations can now be performed in parallel.
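To make this concrete, here is a minimal, self-contained sketch of pmap usage. The values given to N, delay and limit are illustrative assumptions, since the example files define them elsewhere:

import time
import pprocess

N = 10       # grid dimension (assumed value)
delay = 1    # simulated work per calculation, in seconds (assumed value)
limit = 4    # maximum number of active processes (assumed value)

def calculate(t):

    "A supposedly time-consuming calculation on 't'."

    i, j = t
    time.sleep(delay)
    return i * N + j

sequence = [(i, j) for i in range(0, N) for j in range(0, N)]
results = pprocess.pmap(calculate, sequence, limit=limit)

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

With limit processes active at once, the elapsed time should fall from roughly delay * N * N towards delay * N * N / limit.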
Although some programs make natural use of the map function, others may employ an invocation in a nested loop. This may also be converted to a parallel program. Consider the following Python code:
  t = time.time()

  # Initialise an array.

  results = []

  # Perform the work.

  print "Calculating..."
  for i in range(0, N):
      for j in range(0, N):
          results.append(calculate(i, j))

  # Show the results.

  for i in range(0, N):
      for result in results[i*N:i*N+N]:
          print result,
      print

  print "Time taken:", time.time() - t
(This code, in context with import statements and functions, is found in the examples/simple1.py file.)
Here, a computation in the calculate function is performed for each combination of i and j in the nested loop, returning a result value. However, we must wait for the completion of this function for each element before moving on to the next element, and this means that the computations are performed sequentially. Consequently, on a system with more than one processor, even if we could call calculate for more than one combination of i and j and have the computations executing at the same time, the above program will not take advantage of such capabilities.
We use a slightly modified version of calculate which employs two parameters instead of one:
def calculate(i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j'.
    """

    time.sleep(delay)
    return i * N + j
In order to reduce the processing time - to speed the code up, in other words - we can make this code use several processes instead of just one. Here is the modified code:
  t = time.time()

  # Initialise the results using a map with a limit on the number of
  # channels/processes.

  results = pprocess.Map(limit=limit)

  # Wrap the calculate function and manage it.

  calc = results.manage(pprocess.MakeParallel(calculate))

  # Perform the work.

  print "Calculating..."
  for i in range(0, N):
      for j in range(0, N):
          calc(i, j)

  # Show the results.

  for i in range(0, N):
      for result in results[i*N:i*N+N]:
          print result,
      print

  print "Time taken:", time.time() - t
(This code, in context with import statements and functions, is found in the examples/simple_managed_map.py file.)
The principal changes in the above code involve the use of a pprocess.Map object to collect the results, and a version of the calculate function which is managed by the Map object. What the Map object does is to arrange the results of computations such that iterating over the object, or accessing the object using list operations, provides the results in the same order as their corresponding inputs.
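To make the ordering guarantee concrete, the following sketch (an illustration using assumed values and a made-up identify function, not one of the bundled examples) performs computations which finish in an arbitrary order, yet retrieves the results in input order:

import random
import time
import pprocess

limit = 4    # assumed process limit

def identify(n):

    "Return 'n' after a random delay, so that completions are unordered."

    time.sleep(random.random())
    return n

results = pprocess.Map(limit=limit)
work = results.manage(pprocess.MakeParallel(identify))

for n in range(0, 10):
    work(n)

print list(results)    # expected: [0, 1, 2, ..., 9]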
In some programs, it is not important to receive the results of computations in any particular order, usually because either the order of these results is irrelevant, or because the results provide "positional" information which lets them be handled in an appropriate way. Consider the following Python code:
  t = time.time()

  # Initialise an array.

  results = [0] * N * N

  # Perform the work.

  print "Calculating..."
  for i in range(0, N):
      for j in range(0, N):
          i2, j2, result = calculate(i, j)
          results[i2*N+j2] = result

  # Show the results.

  for i in range(0, N):
      for result in results[i*N:i*N+N]:
          print result,
      print

  print "Time taken:", time.time() - t
(This code, in context with import statements and functions, is found in the examples/simple2.py file.)
Here, a result array is initialised first and each computation is performed sequentially. A significant difference from the previous examples is the return value of the calculate function: the position details corresponding to i and j are returned alongside the result. Obviously, this is of limited value in the above code, since the order of the computations and the reception of results is fixed; consequently, we get no benefit from parallelisation in the above example.
We can bring the benefits of parallel processing to the above program with the following code:
  t = time.time()

  # Initialise the communications queue with a limit on the number of
  # channels/processes.

  queue = pprocess.Queue(limit=limit)

  # Initialise an array.

  results = [0] * N * N

  # Wrap the calculate function and manage it.

  calc = queue.manage(pprocess.MakeParallel(calculate))

  # Perform the work.

  print "Calculating..."
  for i in range(0, N):
      for j in range(0, N):
          calc(i, j)

  # Store the results as they arrive.

  print "Finishing..."
  for i, j, result in queue:
      results[i*N+j] = result

  # Show the results.

  for i in range(0, N):
      for result in results[i*N:i*N+N]:
          print result,
      print

  print "Time taken:", time.time() - t
(This code, in context with import statements and functions, is found in the examples/simple_managed_queue.py file.)
This revised code employs a pprocess.Queue object whose purpose is to collect the results of computations and to make them available in the order in which they were received. The code collecting results has been moved into a separate loop, independent of the original computation loop, and takes advantage of the more relevant "positional" information emerging from the queue.
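The contrast with Map can be seen in another small sketch (again using assumed values and a made-up function, not a bundled example): a Queue yields each result as soon as it arrives, and it is the returned positional detail which allows each result to be stored in the right place:

import random
import time
import pprocess

limit = 4    # assumed process limit

def tagged_square(n):

    "Return 'n' alongside its square after a random delay."

    time.sleep(random.random())
    return n, n * n

queue = pprocess.Queue(limit=limit)
calc = queue.manage(pprocess.MakeParallel(tagged_square))

for n in range(0, 10):
    calc(n)

squares = [0] * 10
for n, square in queue:    # arrival order, not submission order
    squares[n] = square

print squares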
We can take this example further, illustrating some of the mechanisms employed by pprocess. Instead of collecting results in a queue, we can define a class containing a method which is called when new results arrive:
class MyExchange(pprocess.Exchange):

    "Parallel convenience class containing the array assignment operation."

    def store_data(self, ch):
        i, j, result = ch.receive()
        self.D[i*N+j] = result
This code exposes the channel paradigm which is used throughout pprocess and is available to applications, if desired. The effect of the method is the storage of a result received through the channel in an attribute of the object. The following code shows how this class can be used, with differences to the previous program illustrated:
  t = time.time()

  # Initialise the communications exchange with a limit on the number of
  # channels/processes.

  exchange = MyExchange(limit=limit)

  # Initialise an array - it is stored in the exchange to permit automatic
  # assignment of values as the data arrives.

  results = exchange.D = [0] * N * N

  # Wrap the calculate function and manage it.

  calc = exchange.manage(pprocess.MakeParallel(calculate))

  # Perform the work.

  print "Calculating..."
  for i in range(0, N):
      for j in range(0, N):
          calc(i, j)

  # Wait for the results.

  print "Finishing..."
  exchange.finish()

  # Show the results.

  for i in range(0, N):
      for result in results[i*N:i*N+N]:
          print result,
      print

  print "Time taken:", time.time() - t
(This code, in context with import statements and functions, is found in the examples/simple_managed.py file.)
The main visible differences between this and the previous program are the storage of the result array in the exchange, the removal of the queue consumption code from the main program (the act of storing values now resides in the exchange's store_data method), and the need to call the finish method on the MyExchange object so that we do not try to access the results too soon. One underlying benefit not visible in the above code is that we no longer need to accumulate results in a queue or other structure so that they may be processed and assigned to the correct positions in the result array.
For the curious, we may remove some of the remaining conveniences of the above program to expose other features of pprocess. First, we define a slightly modified version of the calculate function:
def calculate(ch, i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j', using 'ch' to
    communicate with the parent process.
    """

    time.sleep(delay)
    ch.send((i, j, i * N + j))
This function accepts a channel, ch, through which results will be sent, and through which other values could potentially be received, although we choose not to do so here. The program using this function is as follows, with differences to the previous program illustrated:
  t = time.time()

  # Initialise the communications exchange with a limit on the number of
  # channels/processes.

  exchange = MyExchange(limit=limit)

  # Initialise an array - it is stored in the exchange to permit automatic
  # assignment of values as the data arrives.

  results = exchange.D = [0] * N * N

  # Perform the work.

  print "Calculating..."
  for i in range(0, N):
      for j in range(0, N):
          exchange.start(calculate, i, j)

  # Wait for the results.

  print "Finishing..."
  exchange.finish()

  # Show the results.

  for i in range(0, N):
      for result in results[i*N:i*N+N]:
          print result,
      print

  print "Time taken:", time.time() - t
(This code, in context with import statements and functions, is found in the examples/simple_start.py file.)
Here, we have discarded two conveniences: the wrapping of callables using MakeParallel, which lets us use functions without providing any channel parameters, and the management of callables using the manage method on queues, exchanges, and so on. The start method still calls the provided callable, but using a different notation from that employed previously.
Although many programs employ functions and other useful abstractions which +can be treated as parallelisable units, some programs perform computations +"inline", meaning that the code responsible appears directly within a loop or +related control-flow construct. Consider the following code:
  t = time.time()

  # Initialise an array.

  results = [0] * N * N

  # Perform the work.

  print "Calculating..."
  for i in range(0, N):
      for j in range(0, N):
          time.sleep(delay)
          results[i*N+j] = i * N + j

  # Show the results.

  for i in range(0, N):
      for result in results[i*N:i*N+N]:
          print result,
      print

  print "Time taken:", time.time() - t
(This code, in context with import statements and functions, is found in the examples/simple.py file.)
To simulate "work", as in the different versions of the calculate function, we use the time.sleep function (which does not actually do work, and which will cause a process to be descheduled in most cases, but which simulates the delay associated with work being done). This inline work, which must be performed sequentially in the above program, can be performed in parallel in a somewhat modified version of the program:
  t = time.time()

  # Initialise the results using a map with a limit on the number of
  # channels/processes.

  results = pprocess.Map(limit=limit)

  # Perform the work.
  # NOTE: Could use the with statement in the loop to package the
  # NOTE: try...finally functionality.

  print "Calculating..."
  for i in range(0, N):
      for j in range(0, N):
          ch = results.create()
          if ch:
              try: # Calculation work.

                  time.sleep(delay)
                  ch.send(i * N + j)

              finally: # Important finalisation.

                  pprocess.exit(ch)

  # Show the results.

  for i in range(0, N):
      for result in results[i*N:i*N+N]:
          print result,
      print

  print "Time taken:", time.time() - t
(This code, in context with import statements and functions, is found in the examples/simple_create_map.py file.)
Although seemingly more complicated, the bulk of the changes in this modified program are focused on obtaining a channel object, ch, at the point where the computations are performed, and the wrapping of the computation code in a try...finally statement which ensures that the process associated with the channel exits when the computation is complete. In order for the results of these computations to be collected, a pprocess.Map object is used, since it will maintain the results in the same order as the initiation of the computations which produced them.
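The NOTE comments in the listing suggest packaging the try...finally logic using the with statement. Here is a minimal sketch of that idea; the calculation helper is a hypothetical addition rather than part of pprocess, and it assumes, as the listing implies, that create returns a channel only in the newly created process:

from __future__ import with_statement   # required on Python 2.5
from contextlib import contextmanager
import pprocess

@contextmanager
def calculation(exchange):

    """
    Yield a channel from 'exchange' (a false value in the parent process),
    making sure that any newly created process exits afterwards.
    """

    ch = exchange.create()
    try:
        yield ch
    finally:
        if ch:
            pprocess.exit(ch)

The body of the inner loop would then become a single with calculation(results) as ch: block containing the guarded computation, with the if ch: test retained so that the parent process merely continues with the next iteration.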
One notable aspect of the above programs when parallelised is that each invocation of a computation in parallel creates a new process in which the computation is to be performed, regardless of whether existing processes have just finished producing results and could theoretically have been asked to perform new computations. In other words, processes are created and destroyed instead of being reused.
However, we can request that processes be reused for computations by enabling the reuse feature of exchange-like objects and employing suitable reusable callables. Consider this modified version of the simple_managed_map program:
  t = time.time()

  # Initialise the results using a map with a limit on the number of
  # channels/processes.

  results = pprocess.Map(limit=limit, reuse=1)

  # Wrap the calculate function and manage it.

  calc = results.manage(pprocess.MakeReusable(calculate))

  # Perform the work.

  print "Calculating..."
  for i in range(0, N):
      for j in range(0, N):
          calc(i, j)

  # Show the results.

  for i in range(0, N):
      for result in results[i*N:i*N+N]:
          print result,
      print

  print "Time taken:", time.time() - t
(This code, in context with import statements and functions, is found in the examples/simple_manage_map_reusable.py file.)
By indicating that processes and channels shall be reused, and by wrapping the calculate function with the necessary support, the computations may be performed in parallel using a pool of processes, instead of creating a new process for each computation and then discarding it, only to create a new process for the next computation.
The following table summarises the features used in converting each sequential example program to its parallel counterpart:
Sequential Example    Parallel Example        Features Used
------------------    ----------------        -------------
simple_map            simple_pmap             pmap
simple1               simple_managed_map      MakeParallel, Map, manage
simple2               simple_managed_queue    MakeParallel, Queue, manage
simple2               simple_managed          MakeParallel, Exchange (subclass), manage, finish
simple2               simple_start            Channel, Exchange (subclass), start, finish
simple                simple_create_map       Channel, Map, create, exit