# HG changeset patch
# User Paul Boddie
We can take this example further, illustrating some of the mechanisms
employed by pprocess
. Instead of collecting results in a queue,
we can define a class containing a method which is called when new results
@@ -382,6 +389,8 @@
other structure so that they may be processed and assigned to the correct
positions in the result array.
For the curious, we may remove some of the remaining conveniences of the
above program to expose other features of pprocess
. First, we
define a slightly modified version of the calculate
function:
Although reusable processes offer the opportunity to invoke a callable over
+and over within the same created process, they do not fully support the
+potential of the underlying mechanisms in pprocess
: created
+processes can communicate multiple values to the creating process and can
+theoretically run within the same callable forever.
Consider this modified form of the calculate
function:
+def calculate(ch, i): + + """ + A supposedly time-consuming calculation on 'i'. + """ + + for j in range(0, N): + time.sleep(delay) + ch.send((i, j, i * N + j)) ++ +
This function accepts a channel ch
together with an argument
+i
corresponding to an entire row of the input array, as opposed
+to having two arguments (i
and j
) corresponding to a
+single cell in the input array. In this function, a series of calculations are
+performed and a number of values are returned through the channel, without the
+function terminating until all values have been returned for the row data.
To use this modified function, a modified version of the +simple_managed_queue program is used:
+ ++ t = time.time() + + # Initialise the communications queue with a limit on the number of + # channels/processes. + + queue = pprocess.Queue(limit=limit, continuous=1) + + # Initialise an array. + + results = [0] * N * N + + # Manage the calculate function. + + calc = queue.manage(calculate) + + # Perform the work. + + print "Calculating..." + for i in range(0, N): + calc(i) + + # Store the results as they arrive. + + print "Finishing..." + for i, j, result in queue: + results[i*N+j] = result + + # Show the results. + + for i in range(0, N): + for result in results[i*N:i*N+N]: + print result, + print + + print "Time taken:", time.time() - t ++ +
(This code in context with import
statements and functions is
+found in the examples/simple_continuous_queue.py
file.)
Although the inner loop in the work section relocated to the
+calculate
function, the queue still receives outputs from that
+function with positional information and a result for the result array. Thus,
+no change is needed for the retrieval of the results: they arrive in the queue
+as before.
Occasionally, it is desirable to initiate time-consuming computations and to @@ -714,10 +802,6 @@ be collected by the queue: a list containing all of the results produced in the computation.
-To be written.
-In the above example, a single background process was used to manage a number @@ -887,11 +971,15 @@