1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/docs/tutorial.html Sun Sep 16 20:46:34 2007 +0000
1.3 @@ -0,0 +1,626 @@
1.4 +<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
1.5 +<html xmlns="http://www.w3.org/1999/xhtml" lang="en-gb">
1.6 +<head>
1.7 + <meta http-equiv="content-type" content="text/html; charset=UTF-8" />
1.8 + <title>pprocess - Tutorial</title>
1.9 + <link href="styles.css" rel="stylesheet" type="text/css" />
1.10 +</head>
1.11 +<body>
1.12 +
1.13 +<h1>pprocess - Tutorial</h1>
1.14 +
1.15 +<p>The <code>pprocess</code> module provides several mechanisms for running
1.16 +Python code concurrently in several processes. The most straightforward way of
1.17 +making a program parallel-aware - that is, where the program can take
1.18 +advantage of more than one processor to simultaneously process data - is to
1.19 +use the <code>pmap</code> function.</p>
1.20 +
1.21 +<h2>Converting Map-Style Code</h2>
1.22 +
1.23 +<p>Consider a program using the built-in <code>map</code> function and a sequence of inputs:</p>
1.24 +
1.25 +<pre>
1.26 + t = time.time()
1.27 +
1.28 + # Initialise an array.
1.29 +
1.30 + sequence = []
1.31 + for i in range(0, N):
1.32 + for j in range(0, N):
1.33 + sequence.append((i, j))
1.34 +
1.35 + # Perform the work.
1.36 +
1.37 + results = map(calculate, sequence)
1.38 +
1.39 + # Show the results.
1.40 +
1.41 + for i in range(0, N):
1.42 + for result in results[i*N:i*N+N]:
1.43 + print result,
1.44 + print
1.45 +
1.46 + print "Time taken:", time.time() - t</pre>
1.47 +
1.48 +<p>(This code in context with <code>import</code> statements and functions is
1.49 +found in the <code>examples/simple_map.py</code> file.)</p>
1.50 +
1.51 +<p>The principal features of this program involve the preparation of an array
1.52 +for input purposes, and the use of the <code>map</code> function to iterate
1.53 +over the combinations of <code>i</code> and <code>j</code> in the array. Even
1.54 +if the <code>calculate</code> function could be invoked independently for each
1.55 +input value, we have to wait for each computation to complete before
1.56 +initiating a new one. The <code>calculate</code> function may be defined as
1.57 +follows:</p>
1.58 +
1.59 +<pre>
1.60 +def calculate(t):
1.61 +
1.62 + "A supposedly time-consuming calculation on 't'."
1.63 +
1.64 + i, j = t
1.65 + time.sleep(delay)
1.66 + return i * N + j
1.67 +</pre>
1.68 +
1.69 +<p>In order to reduce the processing time - to speed the code up, in other
1.70 +words - we can make this code use several processes instead of just one. Here
1.71 +is the modified code:</p>
1.72 +
1.73 +<pre>
1.74 + t = time.time()
1.75 +
1.76 + # Initialise an array.
1.77 +
1.78 + sequence = []
1.79 + for i in range(0, N):
1.80 + for j in range(0, N):
1.81 + sequence.append((i, j))
1.82 +
1.83 + # Perform the work.
1.84 +
1.85 + results = <strong>pprocess.pmap</strong>(calculate, sequence<strong>, limit=limit</strong>)
1.86 +
1.87 + # Show the results.
1.88 +
1.89 + for i in range(0, N):
1.90 + for result in results[i*N:i*N+N]:
1.91 + print result,
1.92 + print
1.93 +
1.94 + print "Time taken:", time.time() - t</pre>
1.95 +
1.96 +<p>(This code in context with <code>import</code> statements and functions is
1.97 +found in the <code>examples/simple_pmap.py</code> file.)</p>
1.98 +
1.99 +<p>By replacing usage of the <code>map</code> function with the
1.100 +<code>pprocess.pmap</code> function, and specifying the limit on the number of
1.101 +processes to be active at any given time (the value of the <code>limit</code>
1.102 +variable is defined elsewhere), several calculations can now be performed in
1.103 +parallel.</p>
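
<p>To make the principle concrete, the behaviour of a parallel map can be
sketched on a POSIX system using only the standard library: fork one child
process per input, let each child send its pickled result back through a
pipe, and read the pipes in input order. The helper names used below
(<code>run_in_child</code>, <code>naive_pmap</code>) are hypothetical, and
this is only an illustration of the idea - it is not how
<code>pprocess.pmap</code> is implemented, and it omits the limit on the
number of concurrent processes:</p>

```python
import os
import pickle
import struct

def run_in_child(func, arg):
    """Fork a child to compute func(arg); return the read end of the pipe
    through which the pickled result will arrive."""
    r, w = os.pipe()
    if os.fork() == 0:              # Child: compute, send, and exit.
        os.close(r)
        try:
            payload = pickle.dumps(func(arg))
            os.write(w, struct.pack(">I", len(payload)) + payload)
        finally:
            os._exit(0)             # Never fall through into the parent's code.
    os.close(w)                     # Parent keeps only the read end.
    return r

def read_result(r):
    "Read one length-prefixed, pickled result from a pipe and close it."
    size = struct.unpack(">I", os.read(r, 4))[0]
    data = b""
    while len(data) < size:
        data += os.read(r, size - len(data))
    os.close(r)
    return pickle.loads(data)

def naive_pmap(func, sequence):
    "Start every computation at once, then collect results in input order."
    readers = [run_in_child(func, arg) for arg in sequence]
    results = [read_result(r) for r in readers]
    for _ in readers:
        os.wait()                   # Reap the finished child processes.
    return results

squares = naive_pmap(lambda n: n * n, [1, 2, 3, 4])
```

<p>Reading the pipes in input order is what preserves the correspondence
between inputs and results, which is exactly the property that
<code>map</code>-style code relies upon.</p>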
1.104 +
1.105 +<h2>Converting Invocations to Parallel Operations</h2>
1.106 +
1.107 +<p>Although some programs make natural use of the <code>map</code> function,
1.108 +others invoke a function from within a nested loop. Such programs can also
1.109 +be converted to parallel programs. Consider the following Python code:</p>
1.110 +
1.111 +<pre>
1.112 + t = time.time()
1.113 +
1.114 + # Initialise an array.
1.115 +
1.116 + results = []
1.117 +
1.118 + # Perform the work.
1.119 +
1.120 + print "Calculating..."
1.121 + for i in range(0, N):
1.122 + for j in range(0, N):
1.123 + results.append(calculate(i, j))
1.124 +
1.125 + # Show the results.
1.126 +
1.127 + for i in range(0, N):
1.128 + for result in results[i*N:i*N+N]:
1.129 + print result,
1.130 + print
1.131 +
1.132 + print "Time taken:", time.time() - t</pre>
1.133 +
1.134 +<p>(This code in context with <code>import</code> statements and functions is
1.135 +found in the <code>examples/simple1.py</code> file.)</p>
1.136 +
1.137 +<p>Here, a computation in the <code>calculate</code> function is performed for
1.138 +each combination of <code>i</code> and <code>j</code> in the nested loop,
1.139 +returning a result value. However, we must wait for the completion of this
1.140 +function for each element before moving on to the next element, and this means
1.141 +that the computations are performed sequentially. Consequently, on a system
1.142 +with more than one processor, even if we could call <code>calculate</code> for
1.143 +more than one combination of <code>i</code> and <code>j</code>
1.144 +and have the computations executing at the same time, the above program will
1.145 +not take advantage of such capabilities.</p>
1.146 +
1.147 +<p>We use a slightly modified version of <code>calculate</code> which employs
1.148 +two parameters instead of one:</p>
1.149 +
1.150 +<pre>
1.151 +def calculate(i, j):
1.152 +
1.153 + """
1.154 + A supposedly time-consuming calculation on 'i' and 'j'.
1.155 + """
1.156 +
1.157 + time.sleep(delay)
1.158 + return i * N + j
1.159 +</pre>
1.160 +
1.161 +<p>In order to reduce the processing time - to speed the code up, in other
1.162 +words - we can make this code use several processes instead of just one. Here
1.163 +is the modified code:</p>
1.164 +
1.165 +<pre id="simple_managed_map">
1.166 + t = time.time()
1.167 +
1.168 + # Initialise the results using a map with a limit on the number of
1.169 + # channels/processes.
1.170 +
1.171 +    <strong>results = pprocess.Map(limit=limit)</strong>
1.172 +
1.173 + # Wrap the calculate function and manage it.
1.174 +
1.175 + <strong>calc = results.manage(pprocess.MakeParallel(calculate))</strong>
1.176 +
1.177 + # Perform the work.
1.178 +
1.179 + print "Calculating..."
1.180 + for i in range(0, N):
1.181 + for j in range(0, N):
1.182 + <strong>calc</strong>(i, j)
1.183 +
1.184 + # Show the results.
1.185 +
1.186 + for i in range(0, N):
1.187 + for result in results[i*N:i*N+N]:
1.188 + print result,
1.189 + print
1.190 +
1.191 + print "Time taken:", time.time() - t</pre>
1.192 +
1.193 +<p>(This code in context with <code>import</code> statements and functions is
1.194 +found in the <code>examples/simple_managed_map.py</code> file.)</p>
1.195 +
1.196 +<p>The principal changes in the above code involve the use of a
1.197 +<code>pprocess.Map</code> object to collect the results, and a version of the
1.198 +<code>calculate</code> function which is managed by the <code>Map</code>
1.199 +object. What the <code>Map</code> object does is to arrange the results of
1.200 +computations such that iterating over the object or accessing the object using
1.201 +list operations provides the results in the same order as their corresponding
1.202 +inputs.</p>
1.203 +
1.204 +<h2>Converting Arbitrarily-Ordered Invocations</h2>
1.205 +
1.206 +<p>In some programs, it is not important to receive the results of
1.207 +computations in any particular order, usually because either the order of
1.208 +these results is irrelevant, or because the results provide "positional"
1.209 +information which lets them be handled in an appropriate way. Consider the
1.210 +following Python code:</p>
1.211 +
1.212 +<pre>
1.213 + t = time.time()
1.214 +
1.215 + # Initialise an array.
1.216 +
1.217 + results = [0] * N * N
1.218 +
1.219 + # Perform the work.
1.220 +
1.221 + print "Calculating..."
1.222 + for i in range(0, N):
1.223 + for j in range(0, N):
1.224 + i2, j2, result = calculate(i, j)
1.225 + results[i2*N+j2] = result
1.226 +
1.227 + # Show the results.
1.228 +
1.229 + for i in range(0, N):
1.230 + for result in results[i*N:i*N+N]:
1.231 + print result,
1.232 + print
1.233 +
1.234 + print "Time taken:", time.time() - t
1.235 +</pre>
1.236 +
1.237 +<p>(This code in context with <code>import</code> statements and functions is
1.238 +found in the <code>examples/simple2.py</code> file.)</p>
1.239 +
1.240 +<p>Here, a result array is initialised first and each computation is performed
1.241 +sequentially. A significant difference to the previous examples is the return
1.242 +value of the <code>calculate</code> function: the position details
1.243 +corresponding to <code>i</code> and <code>j</code> are returned alongside the
1.244 +result. In the above code this positional information is of limited value,
1.245 +since the order of the computations and the reception of results is fixed;
1.246 +it only becomes useful once results can arrive out of order.</p>
1.247 +
1.248 +<p>We can bring the benefits of parallel processing to the above program with
1.249 +the following code:</p>
1.250 +
1.251 +<pre>
1.252 + t = time.time()
1.253 +
1.254 + # Initialise the communications queue with a limit on the number of
1.255 + # channels/processes.
1.256 +
1.257 + <strong>queue = pprocess.Queue(limit=limit)</strong>
1.258 +
1.259 + # Initialise an array.
1.260 +
1.261 + results = [0] * N * N
1.262 +
1.263 + # Wrap the calculate function and manage it.
1.264 +
1.265 + <strong>calc = queue.manage(pprocess.MakeParallel(calculate))</strong>
1.266 +
1.267 + # Perform the work.
1.268 +
1.269 + print "Calculating..."
1.270 + for i in range(0, N):
1.271 + for j in range(0, N):
1.272 + <strong>calc(i, j)</strong>
1.273 +
1.274 + # Store the results as they arrive.
1.275 +
1.276 + print "Finishing..."
1.277 + <strong>for i, j, result in queue:</strong>
1.278 + <strong>results[i*N+j] = result</strong>
1.279 +
1.280 + # Show the results.
1.281 +
1.282 + for i in range(0, N):
1.283 + for result in results[i*N:i*N+N]:
1.284 + print result,
1.285 + print
1.286 +
1.287 + print "Time taken:", time.time() - t
1.288 +</pre>
1.289 +
1.290 +<p>(This code in context with <code>import</code> statements and functions is
1.291 +found in the <code>examples/simple_managed_queue.py</code> file.)</p>
1.292 +
1.293 +<p>This revised code employs a <code>pprocess.Queue</code> object whose
1.294 +purpose is to collect the results of computations and to make them available
1.295 +in the order in which they were received. The code collecting results has been
1.296 +moved into a separate loop independent of the original computation loop and
1.297 +taking advantage of the more relevant "positional" information emerging from
1.298 +the queue.</p>
1.299 +
1.300 +<p>We can take this example further, illustrating some of the mechanisms
1.301 +employed by <code>pprocess</code>. Instead of collecting results in a queue,
1.302 +we can define a class containing a method which is called when new results
1.303 +arrive:</p>
1.304 +
1.305 +<pre>
1.306 +class MyExchange(pprocess.Exchange):
1.307 +
1.308 + "Parallel convenience class containing the array assignment operation."
1.309 +
1.310 + def store_data(self, ch):
1.311 + i, j, result = ch.receive()
1.312 + self.D[i*N+j] = result
1.313 +</pre>
1.314 +
1.315 +<p>This code exposes the channel paradigm which is used throughout
1.316 +<code>pprocess</code> and is available to applications, if desired. The effect
1.317 +of the method is the storage of a result received through the channel in an
1.318 +attribute of the object. The following code shows how this class can be used,
1.319 +with differences to the previous program illustrated:</p>
1.320 +
1.321 +<pre>
1.322 + t = time.time()
1.323 +
1.324 + # Initialise the communications exchange with a limit on the number of
1.325 + # channels/processes.
1.326 +
1.327 + <strong>exchange = MyExchange(limit=limit)</strong>
1.328 +
1.329 + # Initialise an array - it is stored in the exchange to permit automatic
1.330 + # assignment of values as the data arrives.
1.331 +
1.332 + <strong>results = exchange.D = [0] * N * N</strong>
1.333 +
1.334 + # Wrap the calculate function and manage it.
1.335 +
1.336 + calc = <strong>exchange</strong>.manage(pprocess.MakeParallel(calculate))
1.337 +
1.338 + # Perform the work.
1.339 +
1.340 + print "Calculating..."
1.341 + for i in range(0, N):
1.342 + for j in range(0, N):
1.343 + calc(i, j)
1.344 +
1.345 + # Wait for the results.
1.346 +
1.347 + print "Finishing..."
1.348 + <strong>exchange.finish()</strong>
1.349 +
1.350 + # Show the results.
1.351 +
1.352 + for i in range(0, N):
1.353 + for result in results[i*N:i*N+N]:
1.354 + print result,
1.355 + print
1.356 +
1.357 + print "Time taken:", time.time() - t
1.358 +</pre>
1.359 +
1.360 +<p>(This code in context with <code>import</code> statements and functions is
1.361 +found in the <code>examples/simple_managed.py</code> file.)</p>
1.362 +
1.363 +<p>The main visible differences between this and the previous program are the
1.364 +storage of the result array in the exchange, the removal of the queue
1.365 +consumption code from the main program, the relocation of value storage into
1.366 +the exchange's <code>store_data</code> method, and the need to call the
1.367 +<code>finish</code> method on the <code>MyExchange</code> object so that we do
1.368 +not try to access the results too soon. One underlying benefit not visible in
1.369 +the above code is that we no longer need to accumulate results in a queue or
1.370 +other structure so that they may be processed and assigned to the correct
1.371 +positions in the result array.</p>
1.372 +
1.373 +<p>For the curious, we may remove some of the remaining conveniences of the
1.374 +above program to expose other features of <code>pprocess</code>. First, we
1.375 +define a slightly modified version of the <code>calculate</code> function:</p>
1.376 +
1.377 +<pre>
1.378 +def calculate(ch, i, j):
1.379 +
1.380 + """
1.381 + A supposedly time-consuming calculation on 'i' and 'j', using 'ch' to
1.382 + communicate with the parent process.
1.383 + """
1.384 +
1.385 + time.sleep(delay)
1.386 + ch.send((i, j, i * N + j))
1.387 +</pre>
1.388 +
1.389 +<p>This function accepts a channel, <code>ch</code>, through which results
1.390 +will be sent, and through which other values could potentially be received,
1.391 +although we choose not to do so here. The program using this function is as
1.392 +follows, with differences to the previous program illustrated:</p>
1.393 +
1.394 +<pre>
1.395 + t = time.time()
1.396 +
1.397 + # Initialise the communications exchange with a limit on the number of
1.398 + # channels/processes.
1.399 +
1.400 + exchange = MyExchange(limit=limit)
1.401 +
1.402 + # Initialise an array - it is stored in the exchange to permit automatic
1.403 + # assignment of values as the data arrives.
1.404 +
1.405 + results = exchange.D = [0] * N * N
1.406 +
1.407 + # Perform the work.
1.408 +
1.409 + print "Calculating..."
1.410 + for i in range(0, N):
1.411 + for j in range(0, N):
1.412 + <strong>exchange.start(calculate, i, j)</strong>
1.413 +
1.414 + # Wait for the results.
1.415 +
1.416 + print "Finishing..."
1.417 + exchange.finish()
1.418 +
1.419 + # Show the results.
1.420 +
1.421 + for i in range(0, N):
1.422 + for result in results[i*N:i*N+N]:
1.423 + print result,
1.424 + print
1.425 +
1.426 + print "Time taken:", time.time() - t
1.427 +</pre>
1.428 +
1.429 +<p>(This code in context with <code>import</code> statements and functions is
1.430 +found in the <code>examples/simple_start.py</code> file.)</p>
1.431 +
1.432 +<p>Here, we have discarded two conveniences: the wrapping of callables using
1.433 +<code>MakeParallel</code>, which lets us use functions without providing any
1.434 +channel parameters, and the management of callables using the
1.435 +<code>manage</code> method on queues, exchanges, and so on. The
1.436 +<code>start</code> method still calls the provided callable, but using a
1.437 +different notation from that employed previously.</p>
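
<p>For illustration, a toy channel can be assembled from a pair of pipes on
a POSIX system: the parent sends the inputs through the channel, and the
child sends its result back through the same channel, just as the
<code>calculate</code> function above does with <code>ch</code>. The
<code>Channel</code> class and <code>start</code> function below are
hypothetical sketches, not the facilities provided by
<code>pprocess</code>:</p>

```python
import os
import pickle
import struct

class Channel:
    "A toy bidirectional channel built from two pipes (a hypothetical sketch)."

    def __init__(self):
        self.parent_r, self.child_w = os.pipe()   # child -> parent
        self.child_r, self.parent_w = os.pipe()   # parent -> child

    def _write(self, fd, obj):
        payload = pickle.dumps(obj)
        os.write(fd, struct.pack(">I", len(payload)) + payload)

    def _read(self, fd):
        size = struct.unpack(">I", os.read(fd, 4))[0]
        data = b""
        while len(data) < size:
            data += os.read(fd, size - len(data))
        return pickle.loads(data)

    # The side of the channel seen by the parent process:
    def send(self, obj):
        self._write(self.parent_w, obj)
    def receive(self):
        return self._read(self.parent_r)

    # The side of the channel seen by the child process:
    def child_send(self, obj):
        self._write(self.child_w, obj)
    def child_receive(self):
        return self._read(self.child_r)

def start(func):
    "Fork a child which runs func(ch); return the channel for the parent."
    ch = Channel()
    if os.fork() == 0:
        try:
            func(ch)
        finally:
            os._exit(0)             # The child must never return to parent code.
    return ch

N = 10

def calculate(ch):
    "Receive 'i' and 'j' through the channel, then send the result back."
    i, j = ch.child_receive()
    ch.child_send((i, j, i * N + j))

ch = start(calculate)
ch.send((2, 3))
reply = ch.receive()
os.wait()                           # Reap the child process.
```

<p>The exchanges are deadlock-free because each side only reads after the
other has something to send; a production channel would also close the
unused pipe ends in each process.</p>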
1.438 +
1.439 +<h2>Converting Inline Computations</h2>
1.440 +
1.441 +<p>Although many programs employ functions and other useful abstractions which
1.442 +can be treated as parallelisable units, some programs perform computations
1.443 +"inline", meaning that the code responsible appears directly within a loop or
1.444 +related control-flow construct. Consider the following code:</p>
1.445 +
1.446 +<pre>
1.447 + t = time.time()
1.448 +
1.449 + # Initialise an array.
1.450 +
1.451 + results = [0] * N * N
1.452 +
1.453 + # Perform the work.
1.454 +
1.455 + print "Calculating..."
1.456 + for i in range(0, N):
1.457 + for j in range(0, N):
1.458 + time.sleep(delay)
1.459 + results[i*N+j] = i * N + j
1.460 +
1.461 + # Show the results.
1.462 +
1.463 + for i in range(0, N):
1.464 + for result in results[i*N:i*N+N]:
1.465 + print result,
1.466 + print
1.467 +
1.468 + print "Time taken:", time.time() - t
1.469 +</pre>
1.470 +
1.471 +<p>(This code in context with <code>import</code> statements and functions is
1.472 +found in the <code>examples/simple.py</code> file.)</p>
1.473 +
1.474 +<p>To simulate "work", as in the different versions of the
1.475 +<code>calculate</code> function, we use the <code>time.sleep</code> function
1.476 +(which does no actual work, and which will typically cause the process to be
1.477 +descheduled, but which simulates the delay associated with work
1.478 +being done). This inline work, which must be performed sequentially in the
1.479 +above program, can be performed in parallel in a somewhat modified version of
1.480 +the program:</p>
1.481 +
1.482 +<pre>
1.483 + t = time.time()
1.484 +
1.485 + # Initialise the results using a map with a limit on the number of
1.486 + # channels/processes.
1.487 +
1.488 + <strong>results = pprocess.Map(limit=limit)</strong>
1.489 +
1.490 + # Perform the work.
1.491 + # NOTE: Could use the with statement in the loop to package the
1.492 + # NOTE: try...finally functionality.
1.493 +
1.494 + print "Calculating..."
1.495 + for i in range(0, N):
1.496 + for j in range(0, N):
1.497 + <strong>ch = results.create()</strong>
1.498 + <strong>if ch:</strong>
1.499 + <strong>try: # Calculation work.</strong>
1.500 +
1.501 + time.sleep(delay)
1.502 + <strong>ch.send(i * N + j)</strong>
1.503 +
1.504 + <strong>finally: # Important finalisation.</strong>
1.505 +
1.506 + <strong>pprocess.exit(ch)</strong>
1.507 +
1.508 + # Show the results.
1.509 +
1.510 + for i in range(0, N):
1.511 + for result in results[i*N:i*N+N]:
1.512 + print result,
1.513 + print
1.514 +
1.515 + print "Time taken:", time.time() - t
1.516 +</pre>
1.517 +
1.518 +<p>(This code in context with <code>import</code> statements and functions is
1.519 +found in the <code>examples/simple_create_map.py</code> file.)</p>
1.520 +
1.521 +<p>Although this modified program seems more complicated, the bulk of its
1.522 +changes are focused on obtaining a channel object, <code>ch</code>,
1.523 +at the point where the computations are performed, and the wrapping of the
1.524 +computation code in a <code>try</code>...<code>finally</code> statement which
1.525 +ensures that the process associated with the channel exits when the
1.526 +computation is complete. In order for the results of these computations to be
1.527 +collected, a <code>pprocess.Map</code> object is used, since it will maintain
1.528 +the results in the same order as the initiation of the computations which
1.529 +produced them.</p>
1.530 +
1.531 +<h2>Reusing Processes in Parallel Programs</h2>
1.532 +
1.533 +<p>One notable aspect of the above programs when parallelised is that each
1.534 +invocation of a computation in parallel creates a new process in which the
1.535 +computation is to be performed, regardless of whether existing processes had
1.536 +just finished producing results and could theoretically have been asked to
1.537 +perform new computations. In other words, processes were created and destroyed
1.538 +instead of being reused.</p>
1.539 +
1.540 +<p>However, we can request that processes be reused for computations by
1.541 +enabling the <code>reuse</code> feature of exchange-like objects and employing
1.542 +suitable reusable callables. Consider this modified version of the <a
1.543 +href="#simple_managed_map">simple_managed_map</a> program:</p>
1.544 +
1.545 +<pre>
1.546 + t = time.time()
1.547 +
1.548 + # Initialise the results using a map with a limit on the number of
1.549 + # channels/processes.
1.550 +
1.551 + results = pprocess.Map(limit=limit<strong>, reuse=1</strong>)
1.552 +
1.553 + # Wrap the calculate function and manage it.
1.554 +
1.555 + calc = results.manage(pprocess.Make<strong>Reusable</strong>(calculate))
1.556 +
1.557 + # Perform the work.
1.558 +
1.559 + print "Calculating..."
1.560 + for i in range(0, N):
1.561 + for j in range(0, N):
1.562 + calc(i, j)
1.563 +
1.564 + # Show the results.
1.565 +
1.566 + for i in range(0, N):
1.567 + for result in results[i*N:i*N+N]:
1.568 + print result,
1.569 + print
1.570 +
1.571 + print "Time taken:", time.time() - t
1.572 +</pre>
1.573 +
1.574 +<p>(This code in context with <code>import</code> statements and functions is
1.575 +found in the <code>examples/simple_manage_map_reusable.py</code> file.)</p>
1.576 +
1.577 +<p>By indicating that processes and channels shall be reused, and by wrapping
1.578 +the <code>calculate</code> function with the necessary support, the
1.579 +computations may be performed in parallel using a pool of processes instead of
1.580 +creating a new process for each computation and then discarding it, only to
1.581 +create a new process for the next computation.</p>
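
<p>The essence of reuse is that a worker process stays alive between
computations, looping over incoming requests instead of exiting after its
first one. Below is a minimal sketch of this pattern on a POSIX system,
again using raw pipes rather than the real <code>pprocess</code> machinery,
with all helper names hypothetical:</p>

```python
import os
import pickle
import struct

def _write(fd, obj):
    payload = pickle.dumps(obj)
    os.write(fd, struct.pack(">I", len(payload)) + payload)

def _read(fd):
    header = os.read(fd, 4)
    if len(header) < 4:
        return None                   # Writing end closed: no more messages.
    size = struct.unpack(">I", header)[0]
    data = b""
    while len(data) < size:
        data += os.read(fd, size - len(data))
    return pickle.loads(data)

def start_worker(func):
    "Fork one long-lived worker which applies func to every request received."
    task_r, task_w = os.pipe()
    result_r, result_w = os.pipe()
    if os.fork() == 0:                # Worker process.
        os.close(task_w)
        os.close(result_r)
        while True:
            args = _read(task_r)
            if args is None:          # Parent closed the task pipe: finish.
                os._exit(0)
            _write(result_w, (os.getpid(), func(*args)))
    os.close(task_r)
    os.close(result_w)
    return task_w, result_r

N = 10
task_w, result_r = start_worker(lambda i, j: i * N + j)

# Submit several computations to the same process, one after another.
replies = []
for i, j in [(0, 1), (2, 3), (4, 5)]:
    _write(task_w, (i, j))
    replies.append(_read(result_r))

os.close(task_w)                      # Let the worker terminate...
os.wait()                             # ...and reap it.

pids = set(pid for pid, _ in replies)
results = [result for _, result in replies]
```

<p>Every reply carries the same process identifier, confirming that a
single worker served all of the requests instead of a fresh process being
created for each one.</p>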
1.582 +
1.583 +<h2>Summary</h2>
1.584 +
1.585 +<p>The following table indicates the features used in converting each
1.586 +sequential example program to its parallel equivalent:</p>
1.587 +
1.588 +<table border="1" cellspacing="0" cellpadding="5">
1.589 + <thead>
1.590 + <tr>
1.591 + <th>Sequential Example</th>
1.592 + <th>Parallel Example</th>
1.593 + <th>Features Used</th>
1.594 + </tr>
1.595 + </thead>
1.596 + <tbody>
1.597 + <tr>
1.598 + <td>simple_map</td>
1.599 + <td>simple_pmap</td>
1.600 + <td>pmap</td>
1.601 + </tr>
1.602 + <tr>
1.603 + <td>simple1</td>
1.604 + <td>simple_managed_map</td>
1.605 + <td>MakeParallel, Map, manage</td>
1.606 + </tr>
1.607 + <tr>
1.608 + <td rowspan="3">simple2</td>
1.609 + <td>simple_managed_queue</td>
1.610 + <td>MakeParallel, Queue, manage</td>
1.611 + </tr>
1.612 + <tr>
1.613 + <td>simple_managed</td>
1.614 + <td>MakeParallel, Exchange (subclass), manage, finish</td>
1.615 + </tr>
1.616 + <tr>
1.617 + <td>simple_start</td>
1.618 + <td>Channel, Exchange (subclass), start, finish</td>
1.619 + </tr>
1.620 + <tr>
1.621 + <td>simple</td>
1.622 + <td>simple_create_map</td>
1.623 + <td>Channel, Map, create, exit</td>
1.624 + </tr>
1.625 + </tbody>
1.626 +</table>
1.627 +
1.628 +</body>
1.629 +</html>
2.1 --- a/docs/tutorial.xhtml Sun Sep 16 19:38:51 2007 +0000
2.2 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000
2.3 @@ -1,626 +0,0 @@
2.4 -<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
2.5 -<html xmlns="http://www.w3.org/1999/xhtml" lang="en-gb">
2.6 -<head>
2.7 - <meta http-equiv="content-type" content="text/html; charset=UTF-8" />
2.8 - <title>pprocess - Tutorial</title>
2.9 - <link href="styles.css" rel="stylesheet" type="text/css" />
2.10 -</head>
2.11 -<body>
2.12 -
2.13 -<h1>pprocess - Tutorial</h1>
2.14 -
2.15 -<p>The <code>pprocess</code> module provides several mechanisms for running
2.16 -Python code concurrently in several processes. The most straightforward way of
2.17 -making a program parallel-aware - that is, where the program can take
2.18 -advantage of more than one processor to simultaneously process data - is to
2.19 -use the <code>pmap</code> function.</p>
2.20 -
2.21 -<h2>Converting Map-Style Code</h2>
2.22 -
2.23 -<p>Consider a program using the built-in <code>map</code> function and a sequence of inputs:</p>
2.24 -
2.25 -<pre>
2.26 - t = time.time()
2.27 -
2.28 - # Initialise an array.
2.29 -
2.30 - sequence = []
2.31 - for i in range(0, N):
2.32 - for j in range(0, N):
2.33 - sequence.append((i, j))
2.34 -
2.35 - # Perform the work.
2.36 -
2.37 - results = map(calculate, sequence)
2.38 -
2.39 - # Show the results.
2.40 -
2.41 - for i in range(0, N):
2.42 - for result in results[i*N:i*N+N]:
2.43 - print result,
2.44 - print
2.45 -
2.46 - print "Time taken:", time.time() - t</pre>
2.47 -
2.48 -<p>(This code in context with <code>import</code> statements and functions is
2.49 -found in the <code>examples/simple_map.py</code> file.)</p>
2.50 -
2.51 -<p>The principal features of this program involve the preparation of an array
2.52 -for input purposes, and the use of the <code>map</code> function to iterate
2.53 -over the combinations of <code>i</code> and <code>j</code> in the array. Even
2.54 -if the <code>calculate</code> function could be invoked independently for each
2.55 -input value, we have to wait for each computation to complete before
2.56 -initiating a new one. The <code>calculate</code> function may be defined as
2.57 -follows:</p>
2.58 -
2.59 -<pre>
2.60 -def calculate(t):
2.61 -
2.62 - "A supposedly time-consuming calculation on 't'."
2.63 -
2.64 - i, j = t
2.65 - time.sleep(delay)
2.66 - return i * N + j
2.67 -</pre>
2.68 -
2.69 -<p>In order to reduce the processing time - to speed the code up, in other
2.70 -words - we can make this code use several processes instead of just one. Here
2.71 -is the modified code:</p>
2.72 -
2.73 -<pre>
2.74 - t = time.time()
2.75 -
2.76 - # Initialise an array.
2.77 -
2.78 - sequence = []
2.79 - for i in range(0, N):
2.80 - for j in range(0, N):
2.81 - sequence.append((i, j))
2.82 -
2.83 - # Perform the work.
2.84 -
2.85 - results = <strong>pprocess.pmap</strong>(calculate, sequence<strong>, limit=limit</strong>)
2.86 -
2.87 - # Show the results.
2.88 -
2.89 - for i in range(0, N):
2.90 - for result in results[i*N:i*N+N]:
2.91 - print result,
2.92 - print
2.93 -
2.94 - print "Time taken:", time.time() - t</pre>
2.95 -
2.96 -<p>(This code in context with <code>import</code> statements and functions is
2.97 -found in the <code>examples/simple_pmap.py</code> file.)</p>
2.98 -
2.99 -<p>By replacing usage of the <code>map</code> function with the
2.100 -<code>pprocess.pmap</code> function, and specifying the limit on the number of
2.101 -processes to be active at any given time (the value of the <code>limit</code>
2.102 -variable is defined elsewhere), several calculations can now be performed in
2.103 -parallel.</p>
2.104 -
2.105 -<h2>Converting Invocations to Parallel Operations</h2>
2.106 -
2.107 -<p>Although some programs make natural use of the <code>map</code> function,
2.108 -others may employ an invocation in a nested loop. This may also be converted
2.109 -to a parallel program. Consider the following Python code:</p>
2.110 -
2.111 -<pre>
2.112 - t = time.time()
2.113 -
2.114 - # Initialise an array.
2.115 -
2.116 - results = []
2.117 -
2.118 - # Perform the work.
2.119 -
2.120 - print "Calculating..."
2.121 - for i in range(0, N):
2.122 - for j in range(0, N):
2.123 - results.append(calculate(i, j))
2.124 -
2.125 - # Show the results.
2.126 -
2.127 - for i in range(0, N):
2.128 - for result in results[i*N:i*N+N]:
2.129 - print result,
2.130 - print
2.131 -
2.132 - print "Time taken:", time.time() - t</pre>
2.133 -
2.134 -<p>(This code in context with <code>import</code> statements and functions is
2.135 -found in the <code>examples/simple1.py</code> file.)</p>
2.136 -
2.137 -<p>Here, a computation in the <code>calculate</code> function is performed for
2.138 -each combination of <code>i</code> and <code>j</code> in the nested loop,
2.139 -returning a result value. However, we must wait for the completion of this
2.140 -function for each element before moving on to the next element, and this means
2.141 -that the computations are performed sequentially. Consequently, on a system
2.142 -with more than one processor, even if we could call <code>calculate</code> for
2.143 -more than one combination of <code>i</code> and <code>j</code><code></code>
2.144 -and have the computations executing at the same time, the above program will
2.145 -not take advantage of such capabilities.</p>
2.146 -
2.147 -<p>We use a slightly modified version of <code>calculate</code> which employs
2.148 -two parameters instead of one:</p>
2.149 -
2.150 -<pre>
2.151 -def calculate(i, j):
2.152 -
2.153 - """
2.154 - A supposedly time-consuming calculation on 'i' and 'j'.
2.155 - """
2.156 -
2.157 - time.sleep(delay)
2.158 - return i * N + j
2.159 -</pre>
2.160 -
2.161 -<p>In order to reduce the processing time - to speed the code up, in other
2.162 -words - we can make this code use several processes instead of just one. Here
2.163 -is the modified code:</p>
2.164 -
2.165 -<pre id="simple_managed_map">
2.166 - t = time.time()
2.167 -
2.168 - # Initialise the results using a map with a limit on the number of
2.169 - # channels/processes.
2.170 -
2.171 - <strong>results = pprocess.Map(limit=limit)</strong><code></code>
2.172 -
2.173 - # Wrap the calculate function and manage it.
2.174 -
2.175 - <strong>calc = results.manage(pprocess.MakeParallel(calculate))</strong>
2.176 -
2.177 - # Perform the work.
2.178 -
2.179 - print "Calculating..."
2.180 - for i in range(0, N):
2.181 - for j in range(0, N):
2.182 - <strong>calc</strong>(i, j)
2.183 -
2.184 - # Show the results.
2.185 -
2.186 - for i in range(0, N):
2.187 - for result in results[i*N:i*N+N]:
2.188 - print result,
2.189 - print
2.190 -
2.191 - print "Time taken:", time.time() - t</pre>
2.192 -
2.193 -<p>(This code in context with <code>import</code> statements and functions is
2.194 -found in the <code>examples/simple_managed_map.py</code> file.)</p>
2.195 -
2.196 -<p>The principal changes in the above code involve the use of a
2.197 -<code>pprocess.Map</code> object to collect the results, and a version of the
2.198 -<code>calculate</code> function which is managed by the <code>Map</code>
2.199 -object. What the <code>Map</code> object does is to arrange the results of
2.200 -computations such that iterating over the object or accessing the object using
2.201 -list operations provides the results in the same order as their corresponding
2.202 -inputs.</p>
2.203 -
2.204 -<h2>Converting Arbitrarily-Ordered Invocations</h2>
2.205 -
2.206 -<p>In some programs, it is not important to receive the results of
2.207 -computations in any particular order, usually because either the order of
2.208 -these results is irrelevant, or because the results provide "positional"
2.209 -information which let them be handled in an appropriate way. Consider the
2.210 -following Python code:</p>
2.211 -
2.212 -<pre>
2.213 - t = time.time()
2.214 -
2.215 - # Initialise an array.
2.216 -
2.217 - results = [0] * N * N
2.218 -
2.219 - # Perform the work.
2.220 -
2.221 - print "Calculating..."
2.222 - for i in range(0, N):
2.223 - for j in range(0, N):
2.224 - i2, j2, result = calculate(i, j)
2.225 - results[i2*N+j2] = result
2.226 -
2.227 - # Show the results.
2.228 -
2.229 - for i in range(0, N):
2.230 - for result in results[i*N:i*N+N]:
2.231 - print result,
2.232 - print
2.233 -
2.234 - print "Time taken:", time.time() - t
2.235 -</pre>
2.236 -
2.237 -<p>(This code in context with <code>import</code> statements and functions is
2.238 -found in the <code>examples/simple2.py</code> file.)</p>
2.239 -
2.240 -<p>Here, a result array is initialised first and each computation is performed
2.241 -sequentially. A significant difference from the previous examples is the return
2.242 -value of the <code>calculate</code> function: the position details
2.243 -corresponding to <code>i</code> and <code>j</code> are returned alongside the
2.244 -result. Admittedly, this is of limited value in the above code, because the
2.245 -order of computation and of receiving results is fixed; the example as it
2.246 -stands also gains nothing from parallelisation.</p>
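<p>The usefulness of the returned position details is easiest to see in a plain
sequential sketch, independent of <code>pprocess</code> and using a trivial
stand-in for <code>calculate</code>: even when the computations are visited in
a scrambled order, storing each result at <code>i2*N+j2</code> reconstructs the
array exactly.</p>

```python
import random

N = 4

def calculate(i, j):
    # Trivial stand-in for the tutorial's time-consuming computation.
    return i, j, i * N + j

# Visit the input pairs in a deliberately scrambled order.
pairs = [(i, j) for i in range(N) for j in range(N)]
random.shuffle(pairs)

results = [0] * N * N
for i, j in pairs:
    i2, j2, result = calculate(i, j)
    results[i2 * N + j2] = result  # The position details select the slot.

assert results == list(range(N * N))
```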
2.247 -
2.248 -<p>We can bring the benefits of parallel processing to the above program with
2.249 -the following code:</p>
2.250 -
2.251 -<pre>
2.252 - t = time.time()
2.253 -
2.254 - # Initialise the communications queue with a limit on the number of
2.255 - # channels/processes.
2.256 -
2.257 - <strong>queue = pprocess.Queue(limit=limit)</strong>
2.258 -
2.259 - # Initialise an array.
2.260 -
2.261 - results = [0] * N * N
2.262 -
2.263 - # Wrap the calculate function and manage it.
2.264 -
2.265 - <strong>calc = queue.manage(pprocess.MakeParallel(calculate))</strong>
2.266 -
2.267 - # Perform the work.
2.268 -
2.269 - print "Calculating..."
2.270 - for i in range(0, N):
2.271 - for j in range(0, N):
2.272 - <strong>calc(i, j)</strong>
2.273 -
2.274 - # Store the results as they arrive.
2.275 -
2.276 - print "Finishing..."
2.277 - <strong>for i, j, result in queue:</strong>
2.278 - <strong>results[i*N+j] = result</strong>
2.279 -
2.280 - # Show the results.
2.281 -
2.282 - for i in range(0, N):
2.283 - for result in results[i*N:i*N+N]:
2.284 - print result,
2.285 - print
2.286 -
2.287 - print "Time taken:", time.time() - t
2.288 -</pre>
2.289 -
2.290 -<p>(This code in context with <code>import</code> statements and functions is
2.291 -found in the <code>examples/simple_managed_queue.py</code> file.)</p>
2.292 -
2.293 -<p>This revised code employs a <code>pprocess.Queue</code> object whose
2.294 -purpose is to collect the results of computations and to make them available
2.295 -in the order in which they were received. The code collecting results has been
2.296 -moved into a separate loop independent of the original computation loop and
2.297 -taking advantage of the more relevant "positional" information emerging from
2.298 -the queue.</p>
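<p>This fire-everything-then-drain control flow is not specific to
<code>pprocess</code>. As an illustrative analogue (threads from the modern
standard library stand in for pprocess's processes), the same shape can be
written with <code>concurrent.futures</code>: all computations are submitted
first, and each result is stored as it arrives, much like iterating over the
queue.</p>

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import random
import time

N = 4

def calculate(i, j):
    # Random delay, so results complete in an unpredictable order.
    time.sleep(random.uniform(0, 0.01))
    return i, j, i * N + j

results = [0] * N * N
with ThreadPoolExecutor(max_workers=4) as executor:
    # Fire off all the computations first...
    futures = [executor.submit(calculate, i, j)
               for i in range(N) for j in range(N)]
    # ...then store each result as it arrives, using its positional info.
    for future in as_completed(futures):
        i, j, result = future.result()
        results[i * N + j] = result

assert results == list(range(N * N))
```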
2.299 -
2.300 -<p>We can take this example further, illustrating some of the mechanisms
2.301 -employed by <code>pprocess</code>. Instead of collecting results in a queue,
2.302 -we can define a class containing a method which is called when new results
2.303 -arrive:</p>
2.304 -
2.305 -<pre>
2.306 -class MyExchange(pprocess.Exchange):
2.307 -
2.308 - "Parallel convenience class containing the array assignment operation."
2.309 -
2.310 - def store_data(self, ch):
2.311 - i, j, result = ch.receive()
2.312 - self.D[i*N+j] = result
2.313 -</pre>
2.314 -
2.315 -<p>This code exposes the channel paradigm which is used throughout
2.316 -<code>pprocess</code> and is available to applications, if desired. The effect
2.317 -of the method is the storage of a result received through the channel in an
2.318 -attribute of the object. The following code shows how this class can be used,
2.319 -with differences to the previous program illustrated:</p>
2.320 -
2.321 -<pre>
2.322 - t = time.time()
2.323 -
2.324 - # Initialise the communications exchange with a limit on the number of
2.325 - # channels/processes.
2.326 -
2.327 - <strong>exchange = MyExchange(limit=limit)</strong>
2.328 -
2.329 - # Initialise an array - it is stored in the exchange to permit automatic
2.330 - # assignment of values as the data arrives.
2.331 -
2.332 - <strong>results = exchange.D = [0] * N * N</strong>
2.333 -
2.334 - # Wrap the calculate function and manage it.
2.335 -
2.336 - calc = <strong>exchange</strong>.manage(pprocess.MakeParallel(calculate))
2.337 -
2.338 - # Perform the work.
2.339 -
2.340 - print "Calculating..."
2.341 - for i in range(0, N):
2.342 - for j in range(0, N):
2.343 - calc(i, j)
2.344 -
2.345 - # Wait for the results.
2.346 -
2.347 - print "Finishing..."
2.348 - <strong>exchange.finish()</strong>
2.349 -
2.350 - # Show the results.
2.351 -
2.352 - for i in range(0, N):
2.353 - for result in results[i*N:i*N+N]:
2.354 - print result,
2.355 - print
2.356 -
2.357 - print "Time taken:", time.time() - t
2.358 -</pre>
2.359 -
2.360 -<p>(This code in context with <code>import</code> statements and functions is
2.361 -found in the <code>examples/simple_managed.py</code> file.)</p>
2.362 -
2.363 -<p>The main visible differences between this and the previous program are the
2.364 -storage of the result array in the exchange, the removal of the queue
2.365 -consumption code from the main program (storing values now happens in the
2.366 -exchange's <code>store_data</code> method), and the need to call the
2.367 -<code>finish</code> method on the <code>MyExchange</code> object so that we do
2.368 -not try to access the results too soon. One underlying benefit not visible in
2.369 -the above code is that we no longer need to accumulate results in a queue or
2.370 -other structure before assigning them to the correct positions in the result
2.371 -array.</p>
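<p>The idea behind <code>store_data</code> can be sketched without any real
channels or processes: a class holds the array and one method is invoked per
arriving <code>(i, j, result)</code> message. <code>ArrayStore</code> here is a
hypothetical stand-in, not part of <code>pprocess</code>; in the real
<code>MyExchange</code>, the message would be read from a channel.</p>

```python
N = 3

class ArrayStore:
    """Hypothetical sketch of the Exchange idea: one method is invoked
    per arriving result, storing it directly in the held array."""

    def __init__(self, n):
        self.N = n
        self.D = [0] * n * n

    def store_data(self, message):
        # In pprocess, 'message' would instead be received from a channel.
        i, j, result = message
        self.D[i * self.N + j] = result

exchange = ArrayStore(N)
for i in range(N):
    for j in range(N):
        exchange.store_data((i, j, i * N + j))

assert exchange.D == list(range(N * N))
```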
2.372 -
2.373 -<p>For the curious, we may remove some of the remaining conveniences of the
2.374 -above program to expose other features of <code>pprocess</code>. First, we
2.375 -define a slightly modified version of the <code>calculate</code> function:</p>
2.376 -
2.377 -<pre>
2.378 -def calculate(ch, i, j):
2.379 -
2.380 - """
2.381 - A supposedly time-consuming calculation on 'i' and 'j', using 'ch' to
2.382 - communicate with the parent process.
2.383 - """
2.384 -
2.385 - time.sleep(delay)
2.386 - ch.send((i, j, i * N + j))
2.387 -</pre>
2.388 -
2.389 -<p>This function accepts a channel, <code>ch</code>, through which results
2.390 -will be sent, and through which other values could potentially be received,
2.391 -although we choose not to do so here. The program using this function is as
2.392 -follows, with differences to the previous program illustrated:</p>
2.393 -
2.394 -<pre>
2.395 - t = time.time()
2.396 -
2.397 - # Initialise the communications exchange with a limit on the number of
2.398 - # channels/processes.
2.399 -
2.400 - exchange = MyExchange(limit=limit)
2.401 -
2.402 - # Initialise an array - it is stored in the exchange to permit automatic
2.403 - # assignment of values as the data arrives.
2.404 -
2.405 - results = exchange.D = [0] * N * N
2.406 -
2.407 - # Perform the work.
2.408 -
2.409 - print "Calculating..."
2.410 - for i in range(0, N):
2.411 - for j in range(0, N):
2.412 - <strong>exchange.start(calculate, i, j)</strong>
2.413 -
2.414 - # Wait for the results.
2.415 -
2.416 - print "Finishing..."
2.417 - exchange.finish()
2.418 -
2.419 - # Show the results.
2.420 -
2.421 - for i in range(0, N):
2.422 - for result in results[i*N:i*N+N]:
2.423 - print result,
2.424 - print
2.425 -
2.426 - print "Time taken:", time.time() - t
2.427 -</pre>
2.428 -
2.429 -<p>(This code in context with <code>import</code> statements and functions is
2.430 -found in the <code>examples/simple_start.py</code> file.)</p>
2.431 -
2.432 -<p>Here, we have discarded two conveniences: the wrapping of callables using
2.433 -<code>MakeParallel</code>, which lets us use functions without providing any
2.434 -channel parameters, and the management of callables using the
2.435 -<code>manage</code> method on queues, exchanges, and so on. The
2.436 -<code>start</code> method still calls the provided callable, but using a
2.437 -different notation from that employed previously.</p>
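<p>The channel-aware calling convention can be exercised in isolation with a
stand-in channel object. <code>FakeChannel</code> below is hypothetical: a real
<code>pprocess</code> channel would transport the sent values from the child
process back to the parent, but the shape of the callable is the same.</p>

```python
N = 3

class FakeChannel:
    """Hypothetical stand-in: a real pprocess channel would transport
    sent values from the child process back to the parent."""
    def __init__(self):
        self.sent = []
    def send(self, value):
        self.sent.append(value)

def calculate(ch, i, j):
    # Same calling convention as the channel-aware calculate above.
    ch.send((i, j, i * N + j))

ch = FakeChannel()
calculate(ch, 1, 2)

assert ch.sent == [(1, 2, 5)]
```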
2.438 -
2.439 -<h2>Converting Inline Computations</h2>
2.440 -
2.441 -<p>Although many programs employ functions and other useful abstractions which
2.442 -can be treated as parallelisable units, some programs perform computations
2.443 -"inline", meaning that the code responsible appears directly within a loop or
2.444 -related control-flow construct. Consider the following code:</p>
2.445 -
2.446 -<pre>
2.447 - t = time.time()
2.448 -
2.449 - # Initialise an array.
2.450 -
2.451 - results = [0] * N * N
2.452 -
2.453 - # Perform the work.
2.454 -
2.455 - print "Calculating..."
2.456 - for i in range(0, N):
2.457 - for j in range(0, N):
2.458 - time.sleep(delay)
2.459 - results[i*N+j] = i * N + j
2.460 -
2.461 - # Show the results.
2.462 -
2.463 - for i in range(0, N):
2.464 - for result in results[i*N:i*N+N]:
2.465 - print result,
2.466 - print
2.467 -
2.468 - print "Time taken:", time.time() - t
2.469 -</pre>
2.470 -
2.471 -<p>(This code in context with <code>import</code> statements and functions is
2.472 -found in the <code>examples/simple.py</code> file.)</p>
2.473 -
2.474 -<p>To simulate "work", as in the different versions of the
2.475 -<code>calculate</code> function, we use the <code>time.sleep</code> function:
2.476 -it does no actual work, and in most cases it causes the process to be
2.477 -descheduled, but it simulates the delay associated with work being
2.478 -done. This inline work, which must be performed sequentially in the
2.479 -above program, can be performed in parallel in a somewhat modified version of
2.480 -the program:</p>
2.481 -
2.482 -<pre>
2.483 - t = time.time()
2.484 -
2.485 - # Initialise the results using a map with a limit on the number of
2.486 - # channels/processes.
2.487 -
2.488 - <strong>results = pprocess.Map(limit=limit)</strong>
2.489 -
2.490 - # Perform the work.
2.491 - # NOTE: Could use the with statement in the loop to package the
2.492 - # NOTE: try...finally functionality.
2.493 -
2.494 - print "Calculating..."
2.495 - for i in range(0, N):
2.496 - for j in range(0, N):
2.497 - <strong>ch = results.create()</strong>
2.498 - <strong>if ch:</strong>
2.499 - <strong>try: # Calculation work.</strong>
2.500 -
2.501 - time.sleep(delay)
2.502 - <strong>ch.send(i * N + j)</strong>
2.503 -
2.504 - <strong>finally: # Important finalisation.</strong>
2.505 -
2.506 - <strong>pprocess.exit(ch)</strong>
2.507 -
2.508 - # Show the results.
2.509 -
2.510 - for i in range(0, N):
2.511 - for result in results[i*N:i*N+N]:
2.512 - print result,
2.513 - print
2.514 -
2.515 - print "Time taken:", time.time() - t
2.516 -</pre>
2.517 -
2.518 -<p>(This code in context with <code>import</code> statements and functions is
2.519 -found in the <code>examples/simple_create_map.py</code> file.)</p>
2.520 -
2.521 -<p>Although seemingly more complicated, the bulk of the changes in this
2.522 -modified program are focused on obtaining a channel object, <code>ch</code>,
2.523 -at the point where the computations are performed, and the wrapping of the
2.524 -computation code in a <code>try</code>...<code>finally</code> statement which
2.525 -ensures that the process associated with the channel exits when the
2.526 -computation is complete. In order for the results of these computations to be
2.527 -collected, a <code>pprocess.Map</code> object is used, since it will maintain
2.528 -the results in the same order as the initiation of the computations which
2.529 -produced them.</p>
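<p>The importance of the <code>try</code>...<code>finally</code> wrapper can be
demonstrated with stand-ins (<code>FakeChannel</code> and
<code>fake_exit</code> are hypothetical, mimicking the role of
<code>pprocess.exit</code>): finalisation runs whether or not the computation
succeeds, so the parent is never left waiting for a process that failed to
announce its exit.</p>

```python
class FakeChannel:
    """Hypothetical stand-in for a pprocess channel."""
    def __init__(self):
        self.sent = []
        self.exited = False
    def send(self, value):
        self.sent.append(value)

def fake_exit(ch):
    # Stand-in for pprocess.exit(ch): marks the channel's process as done.
    ch.exited = True

def compute(ch, value, fail=False):
    try:
        if fail:
            raise ValueError("simulated failure")
        ch.send(value)
    finally:
        fake_exit(ch)  # Runs on success and on failure alike.

ok = FakeChannel()
compute(ok, 42)

bad = FakeChannel()
try:
    compute(bad, 42, fail=True)
except ValueError:
    pass

assert ok.sent == [42] and ok.exited
assert bad.sent == [] and bad.exited
```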
2.530 -
2.531 -<h2>Reusing Processes in Parallel Programs</h2>
2.532 -
2.533 -<p>One notable aspect of the above programs when parallelised is that each
2.534 -invocation of a computation in parallel creates a new process in which the
2.535 -computation is to be performed, regardless of whether existing processes had
2.536 -just finished producing results and could theoretically have been asked to
2.537 -perform new computations. In other words, processes were created and destroyed
2.538 -instead of being reused.</p>
2.539 -
2.540 -<p>However, we can request that processes be reused for computations by
2.541 -enabling the <code>reuse</code> feature of exchange-like objects and employing
2.542 -suitable reusable callables. Consider this modified version of the <a
2.543 -href="#simple_managed_map">simple_managed_map</a> program:</p>
2.544 -
2.545 -<pre>
2.546 - t = time.time()
2.547 -
2.548 - # Initialise the results using a map with a limit on the number of
2.549 - # channels/processes.
2.550 -
2.551 - results = pprocess.Map(limit=limit<strong>, reuse=1</strong>)
2.552 -
2.553 - # Wrap the calculate function and manage it.
2.554 -
2.555 - calc = results.manage(pprocess.Make<strong>Reusable</strong>(calculate))
2.556 -
2.557 - # Perform the work.
2.558 -
2.559 - print "Calculating..."
2.560 - for i in range(0, N):
2.561 - for j in range(0, N):
2.562 - calc(i, j)
2.563 -
2.564 - # Show the results.
2.565 -
2.566 - for i in range(0, N):
2.567 - for result in results[i*N:i*N+N]:
2.568 - print result,
2.569 - print
2.570 -
2.571 - print "Time taken:", time.time() - t
2.572 -</pre>
2.573 -
2.574 -<p>(This code in context with <code>import</code> statements and functions is
2.575 -found in the <code>examples/simple_managed_map_reusable.py</code> file.)</p>
2.576 -
2.577 -<p>By indicating that processes and channels shall be reused, and by wrapping
2.578 -the <code>calculate</code> function with the necessary support, the
2.579 -computations may be performed in parallel using a pool of processes instead of
2.580 -creating a new process for each computation and then discarding it, only to
2.581 -create a new process for the next computation.</p>
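<p>The effect of reuse can be observed with a worker pool from the modern
standard library. This is only an analogue, not <code>pprocess</code> itself,
and threads stand in for processes so the sketch stays self-contained; the
pooling structure is the same: two workers serve every call and are reused
between calls rather than being created and discarded.</p>

```python
from multiprocessing.pool import ThreadPool
import threading
import time

N = 4

def calculate(args):
    i, j = args
    time.sleep(0.001)  # Simulated work, as in the tutorial.
    # Record which worker served the call, making reuse observable.
    return i, j, threading.get_ident()

# Two workers serve all N * N calls and are reused between calls.
pool = ThreadPool(processes=2)
results = pool.map(calculate, [(i, j) for i in range(N) for j in range(N)])
pool.close()
pool.join()

worker_ids = set(ident for _i, _j, ident in results)
assert len(worker_ids) <= 2
```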
2.582 -
2.583 -<h2>Summary</h2>
2.584 -
2.585 -<p>The following table summarises the features used in converting each
2.586 -sequential example program to its parallel equivalent:</p>
2.587 -
2.588 -<table border="1" cellspacing="0" cellpadding="5">
2.589 - <thead>
2.590 - <tr>
2.591 - <th>Sequential Example</th>
2.592 - <th>Parallel Example</th>
2.593 - <th>Features Used</th>
2.594 - </tr>
2.595 - </thead>
2.596 - <tbody>
2.597 - <tr>
2.598 - <td>simple_map</td>
2.599 - <td>simple_pmap</td>
2.600 - <td>pmap</td>
2.601 - </tr>
2.602 - <tr>
2.603 - <td>simple1</td>
2.604 - <td>simple_managed_map</td>
2.605 - <td>MakeParallel, Map, manage</td>
2.606 - </tr>
2.607 - <tr>
2.608 - <td rowspan="3">simple2</td>
2.609 - <td>simple_managed_queue</td>
2.610 - <td>MakeParallel, Queue, manage</td>
2.611 - </tr>
2.612 - <tr>
2.613 - <td>simple_managed</td>
2.614 - <td>MakeParallel, Exchange (subclass), manage, finish</td>
2.615 - </tr>
2.616 - <tr>
2.617 - <td>simple_start</td>
2.618 - <td>Channel, Exchange (subclass), start, finish</td>
2.619 - </tr>
2.620 - <tr>
2.621 - <td>simple</td>
2.622 - <td>simple_create_map</td>
2.623 - <td>Channel, Map, create, exit</td>
2.624 - </tr>
2.625 - </tbody>
2.626 -</table>
2.627 -
2.628 -</body>
2.629 -</html>