<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" lang="en-gb">
<head>
  <meta http-equiv="content-type" content="text/html; charset=UTF-8" />
  <title>pprocess - Tutorial</title>
  <link href="styles.css" rel="stylesheet" type="text/css" />
</head>
<body>

<h1>pprocess - Tutorial</h1>

<p>The <code>pprocess</code> module provides several mechanisms for running
Python code concurrently in several processes. The most straightforward way of
making a program parallel-aware - that is, where the program can take
advantage of more than one processor to process data simultaneously - is to
use the <code>pmap</code> function.</p>

<h2>Converting Map-Style Code</h2>

<p>Consider a program using the built-in <code>map</code> function and a
sequence of inputs:</p>

<pre>
    t = time.time()

    # Initialise an array.

    sequence = []
    for i in range(0, N):
        for j in range(0, N):
            sequence.append((i, j))

    # Perform the work.

    results = map(calculate, sequence)

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t</pre>

<p>(This code, together with the necessary <code>import</code> statements and
function definitions, can be found in the <code>examples/simple_map.py</code>
file.)</p>

<p>The principal features of this program are the preparation of an array of
inputs, and the use of the <code>map</code> function to iterate over the
combinations of <code>i</code> and <code>j</code> in that array. Even though
the <code>calculate</code> function could be invoked independently for each
input value, we have to wait for each computation to complete before
initiating a new one. The <code>calculate</code> function may be defined as
follows:</p>

<pre>
def calculate(t):

    "A supposedly time-consuming calculation on 't'."

    i, j = t
    time.sleep(delay)
    return i * N + j
</pre>

<p>In order to reduce the processing time - to speed the code up, in other
words - we can make this code use several processes instead of just one. Here
is the modified code:</p>

<pre>
    t = time.time()

    # Initialise an array.

    sequence = []
    for i in range(0, N):
        for j in range(0, N):
            sequence.append((i, j))

    # Perform the work.

    results = <strong>pprocess.pmap</strong>(calculate, sequence<strong>, limit=limit</strong>)

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t</pre>

<p>(This code, together with the necessary <code>import</code> statements and
function definitions, can be found in the <code>examples/simple_pmap.py</code>
file.)</p>

<p>By replacing the <code>map</code> function with the
<code>pprocess.pmap</code> function, and specifying a limit on the number of
processes to be active at any given time (the value of the <code>limit</code>
variable is defined elsewhere), several calculations can now be performed in
parallel.</p>
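
<p>The same drop-in conversion can be sketched with the standard library,
whose <code>concurrent.futures</code> executors offer a <code>map</code>
replacement with the same calling convention and the same ordering guarantee.
This is only an analogue of <code>pmap</code>, not <code>pprocess</code>
itself: threads stand in for processes purely to keep the sketch
self-contained, <code>max_workers</code> plays the role of the
<code>limit</code> argument, and <code>N</code> and <code>delay</code> are
assumed from the surrounding examples.</p>

```python
# Hedged sketch: a map-style conversion using the standard library as an
# analogue of pprocess.pmap; threads stand in for processes here.
import time
from concurrent.futures import ThreadPoolExecutor

N = 3         # grid size, as in the surrounding examples
delay = 0.05  # simulated work per calculation

def calculate(t):
    "A supposedly time-consuming calculation on 't'."
    i, j = t
    time.sleep(delay)
    return i * N + j

sequence = [(i, j) for i in range(N) for j in range(N)]

# Sequential form: one calculation at a time.
sequential = list(map(calculate, sequence))

# Parallel form: same call shape, and results still arrive in input order.
with ThreadPoolExecutor(max_workers=4) as executor:
    parallel = list(executor.map(calculate, sequence))

assert parallel == sequential == list(range(N * N))
```

<p>The point of the comparison is that only the line performing the work
changes; the surrounding preparation and display code is untouched, just as in
the <code>pmap</code> conversion above.</p>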

<h2>Converting Invocations to Parallel Operations</h2>

<p>Although some programs make natural use of the <code>map</code> function,
others instead invoke a function inside a nested loop. Such programs may also
be converted to parallel form. Consider the following Python code:</p>

<pre>
    t = time.time()

    # Initialise an array.

    results = []

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            results.append(calculate(i, j))

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t</pre>

<p>(This code, together with the necessary <code>import</code> statements and
function definitions, can be found in the <code>examples/simple1.py</code>
file.)</p>

<p>Here, a computation in the <code>calculate</code> function is performed for
each combination of <code>i</code> and <code>j</code> in the nested loop,
returning a result value. However, we must wait for the completion of this
function for each element before moving on to the next one, and this means
that the computations are performed sequentially. Consequently, on a system
with more than one processor, even though we could in principle call
<code>calculate</code> for more than one combination of <code>i</code> and
<code>j</code> and have the computations execute at the same time, the above
program will not take advantage of such capabilities.</p>

<p>We use a slightly modified version of <code>calculate</code> which takes
two parameters instead of one:</p>

<pre>
def calculate(i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j'.
    """

    time.sleep(delay)
    return i * N + j
</pre>

<p>In order to reduce the processing time - to speed the code up, in other
words - we can make this code use several processes instead of just one. Here
is the modified code:</p>

<pre id="simple_managed_map">
    t = time.time()

    # Initialise the results using a map with a limit on the number of
    # channels/processes.

    <strong>results = pprocess.Map(limit=limit)</strong>

    # Wrap the calculate function and manage it.

    <strong>calc = results.manage(pprocess.MakeParallel(calculate))</strong>

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            <strong>calc</strong>(i, j)

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t</pre>

<p>(This code, together with the necessary <code>import</code> statements and
function definitions, can be found in the
<code>examples/simple_managed_map.py</code> file.)</p>

<p>The principal changes in the above code are the use of a
<code>pprocess.Map</code> object to collect the results, and a version of the
<code>calculate</code> function which is managed by the <code>Map</code>
object. The <code>Map</code> object arranges the results of the computations
such that iterating over the object, or accessing it using list operations,
provides the results in the same order as their corresponding inputs.</p>
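
<p>The ordering behaviour just described can be illustrated with a toy
collector. This is only a sketch of the idea, not <code>pprocess</code>'s
actual implementation: results may arrive in any order, tagged with the index
of their input, and are nevertheless read back in input order.</p>

```python
# Toy sketch of input-ordered result collection; not pprocess internals.
import random

class OrderedCollector:
    "Store results by input index so they read back in input order."
    def __init__(self, size):
        self.slots = [None] * size
    def deliver(self, index, value):
        self.slots[index] = value      # arrival order is irrelevant
    def __getitem__(self, s):
        return self.slots[s]

arrivals = [(k, k * 10) for k in range(6)]
random.shuffle(arrivals)               # simulate out-of-order completion

collector = OrderedCollector(6)
for index, value in arrivals:
    collector.deliver(index, value)

assert collector[0:6] == [0, 10, 20, 30, 40, 50]
```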

<h2>Converting Arbitrarily-Ordered Invocations</h2>

<p>In some programs, it is not important to receive the results of
computations in any particular order, usually because either the order of the
results is irrelevant, or because the results carry "positional" information
which allows them to be handled appropriately. Consider the following Python
code:</p>

<pre>
    t = time.time()

    # Initialise an array.

    results = [0] * N * N

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            i2, j2, result = calculate(i, j)
            results[i2*N+j2] = result

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code, together with the necessary <code>import</code> statements and
function definitions, can be found in the <code>examples/simple2.py</code>
file.)</p>

<p>Here, a result array is initialised first, and each computation is
performed sequentially. A significant difference from the previous examples is
the return value of the <code>calculate</code> function: the position details
corresponding to <code>i</code> and <code>j</code> are returned alongside the
result. This is of limited value in the above code, since the order of the
computations and of the reception of results is fixed, and we therefore gain
no benefit from parallelisation in this form of the program.</p>

<p>We can bring the benefits of parallel processing to the above program with
the following code:</p>

<pre>
    t = time.time()

    # Initialise the communications queue with a limit on the number of
    # channels/processes.

    <strong>queue = pprocess.Queue(limit=limit)</strong>

    # Initialise an array.

    results = [0] * N * N

    # Wrap the calculate function and manage it.

    <strong>calc = queue.manage(pprocess.MakeParallel(calculate))</strong>

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            <strong>calc(i, j)</strong>

    # Store the results as they arrive.

    print "Finishing..."
    <strong>for i, j, result in queue:</strong>
        <strong>results[i*N+j] = result</strong>

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code, together with the necessary <code>import</code> statements and
function definitions, can be found in the
<code>examples/simple_managed_queue.py</code> file.)</p>

<p>This revised code employs a <code>pprocess.Queue</code> object whose
purpose is to collect the results of the computations and to make them
available in the order in which they were received. The code collecting the
results has been moved into a separate loop, independent of the original
computation loop, which takes advantage of the "positional" information
emerging from the queue.</p>
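
<p>The essence of this pattern - results arriving in completion order, each
carrying the position details needed to store it - can be sketched with the
standard library. Worker threads and a <code>queue.Queue</code> stand in for
<code>pprocess</code>'s child processes and channels here, purely to keep the
sketch self-contained.</p>

```python
# Sketch of the positional-results pattern using threads and a queue.
import queue
import threading
import time

N = 4
delay = 0.01
q = queue.Queue()

def calculate(i, j):
    "Send position details alongside the result, as in simple2."
    time.sleep(delay)
    q.put((i, j, i * N + j))

workers = [threading.Thread(target=calculate, args=(i, j))
           for i in range(N) for j in range(N)]
for w in workers:
    w.start()
for w in workers:
    w.join()

# Results were queued in completion order; the (i, j) tags let us place
# each one in its proper slot regardless of when it arrived.
results = [0] * N * N
while not q.empty():
    i, j, result = q.get()
    results[i*N+j] = result

assert results == list(range(N * N))
```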

<p>We can take this example further, illustrating some of the mechanisms
employed by <code>pprocess</code>. Instead of collecting results in a queue,
we can define a class containing a method which is called when new results
arrive:</p>

<pre>
class MyExchange(pprocess.Exchange):

    "Parallel convenience class containing the array assignment operation."

    def store_data(self, ch):
        i, j, result = ch.receive()
        self.D[i*N+j] = result
</pre>

<p>This code exposes the channel paradigm which is used throughout
<code>pprocess</code> and which is available to applications, if desired. The
effect of the method is to store a result received through the channel in an
attribute of the object. The following code shows how this class can be used,
with the differences from the previous program highlighted:</p>

<pre>
    t = time.time()

    # Initialise the communications exchange with a limit on the number of
    # channels/processes.

    <strong>exchange = MyExchange(limit=limit)</strong>

    # Initialise an array - it is stored in the exchange to permit automatic
    # assignment of values as the data arrives.

    <strong>results = exchange.D = [0] * N * N</strong>

    # Wrap the calculate function and manage it.

    calc = <strong>exchange</strong>.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            calc(i, j)

    # Wait for the results.

    print "Finishing..."
    <strong>exchange.finish()</strong>

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code, together with the necessary <code>import</code> statements and
function definitions, can be found in the
<code>examples/simple_managed.py</code> file.)</p>

<p>The main visible differences between this and the previous program are the
storage of the result array in the exchange, the removal of the
queue-consumption code from the main program (the act of storing values now
resides in the exchange's <code>store_data</code> method), and the need to
call the <code>finish</code> method on the <code>MyExchange</code> object so
that we do not try to access the results too soon. One underlying benefit not
visible in the above code is that we no longer need to accumulate results in a
queue or other structure before processing them and assigning them to the
correct positions in the result array.</p>

<p>For the curious, we may remove some of the remaining conveniences of the
above program to expose other features of <code>pprocess</code>. First, we
define a slightly modified version of the <code>calculate</code> function:</p>

<pre>
def calculate(ch, i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j', using 'ch' to
    communicate with the parent process.
    """

    time.sleep(delay)
    ch.send((i, j, i * N + j))
</pre>

<p>This function accepts a channel, <code>ch</code>, through which results
will be sent, and through which other values could potentially be received,
although we choose not to do so here. The program using this function is as
follows, with the differences from the previous program highlighted:</p>

<pre>
    t = time.time()

    # Initialise the communications exchange with a limit on the number of
    # channels/processes.

    exchange = MyExchange(limit=limit)

    # Initialise an array - it is stored in the exchange to permit automatic
    # assignment of values as the data arrives.

    results = exchange.D = [0] * N * N

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            <strong>exchange.start(calculate, i, j)</strong>

    # Wait for the results.

    print "Finishing..."
    exchange.finish()

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code, together with the necessary <code>import</code> statements and
function definitions, can be found in the
<code>examples/simple_start.py</code> file.)</p>

<p>Here, we have discarded two conveniences: the wrapping of callables using
<code>MakeParallel</code>, which lets us use functions without providing any
channel parameters, and the management of callables using the
<code>manage</code> method on queues, exchanges, and so on. The
<code>start</code> method still calls the provided callable, but using a
different notation from that employed previously.</p>
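
<p>A channel in this style is essentially a send/receive pipe between parent
and child. The sketch below imitates that shape with
<code>multiprocessing.Pipe</code>, whose connection objects offer a similar
<code>send</code>/<code>recv</code> interface; a worker thread stands in for
the child process to keep the sketch portable, and <code>N</code> is assumed
from the surrounding examples.</p>

```python
# Sketch of the channel idea: the worker sends (i, j, result) back to the
# parent over a pipe, imitating ch.send(...) in the pprocess example.
import threading
from multiprocessing import Pipe

N = 5

def calculate(ch, i, j):
    "Report position details and the result through the channel."
    ch.send((i, j, i * N + j))

parent_end, child_end = Pipe()
worker = threading.Thread(target=calculate, args=(child_end, 2, 3))
worker.start()
received = parent_end.recv()   # blocks until the worker has sent
worker.join()

assert received == (2, 3, 2 * N + 3)
```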

<h2>Converting Inline Computations</h2>

<p>Although many programs employ functions and other useful abstractions which
can be treated as parallelisable units, some programs perform computations
"inline", meaning that the code responsible appears directly within a loop or
related control-flow construct. Consider the following code:</p>

<pre>
    t = time.time()

    # Initialise an array.

    results = [0] * N * N

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            time.sleep(delay)
            results[i*N+j] = i * N + j

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code, together with the necessary <code>import</code> statements and
function definitions, can be found in the <code>examples/simple.py</code>
file.)</p>

<p>To simulate "work", as in the different versions of the
<code>calculate</code> function, we use the <code>time.sleep</code> function.
It does not actually do any work, and will in most cases cause the process to
be descheduled, but it simulates the delay associated with work being done.
This inline work, which must be performed sequentially in the above program,
can be performed in parallel in a somewhat modified version of the
program:</p>

<pre>
    t = time.time()

    # Initialise the results using a map with a limit on the number of
    # channels/processes.

    <strong>results = pprocess.Map(limit=limit)</strong>

    # Perform the work.
    # NOTE: Could use the with statement in the loop to package the
    # NOTE: try...finally functionality.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            <strong>ch = results.create()</strong>
            <strong>if ch:</strong>
                <strong>try: # Calculation work.</strong>

                    time.sleep(delay)
                    <strong>ch.send(i * N + j)</strong>

                <strong>finally: # Important finalisation.</strong>

                    <strong>pprocess.exit(ch)</strong>

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code, together with the necessary <code>import</code> statements and
function definitions, can be found in the
<code>examples/simple_create_map.py</code> file.)</p>

<p>Although seemingly more complicated, the bulk of the changes in this
modified program are focused on obtaining a channel object, <code>ch</code>,
at the point where the computations are performed, and on wrapping the
computation code in a <code>try</code>...<code>finally</code> statement which
ensures that the process associated with the channel exits when the
computation is complete. In order for the results of these computations to be
collected, a <code>pprocess.Map</code> object is used, since it maintains the
results in the same order as the initiation of the computations which produced
them.</p>
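
<p>The <code>create</code>/<code>exit</code> shape above - fork, do the work
in the child, send the result, and make sure the child exits - can be seen in
miniature with a raw <code>os.fork</code> and a pipe. This is a POSIX-only
sketch of the general idea, not <code>pprocess</code>'s implementation.</p>

```python
# POSIX-only sketch: fork a child, send one result over a pipe, and use
# try...finally to guarantee the child exits rather than running on into
# the parent's code, mirroring the pprocess.exit(ch) idiom.
import os

r, w = os.pipe()
pid = os.fork()
if pid == 0:                # child branch, paralleling "if ch:"
    try:
        os.write(w, b"42")  # the "calculation" result
    finally:
        os._exit(0)         # important finalisation
else:                       # parent: collect the result
    os.close(w)
    data = os.read(r, 16)
    os.close(r)
    os.waitpid(pid, 0)

assert data == b"42"
```

<p>Without the <code>finally</code> clause, an exception in the child's work
would let it fall through and continue executing the parent's remaining code,
which is exactly what the finalisation step guards against.</p>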

<h2>Reusing Processes in Parallel Programs</h2>

<p>One notable aspect of the above programs, when parallelised, is that each
invocation of a computation creates a new process in which the computation is
to be performed, regardless of whether existing processes have just finished
producing results and could theoretically have been asked to perform new
computations. In other words, processes are created and destroyed instead of
being reused.</p>

<p>However, we can request that processes be reused for computations by
enabling the <code>reuse</code> feature of exchange-like objects and employing
suitably reusable callables. Consider this modified version of the <a
href="#simple_managed_map">simple_managed_map</a> program:</p>

<pre>
    t = time.time()

    # Initialise the results using a map with a limit on the number of
    # channels/processes.

    results = pprocess.Map(limit=limit<strong>, reuse=1</strong>)

    # Wrap the calculate function and manage it.

    calc = results.manage(pprocess.Make<strong>Reusable</strong>(calculate))

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            calc(i, j)

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code, together with the necessary <code>import</code> statements and
function definitions, can be found in the
<code>examples/simple_manage_map_reusable.py</code> file.)</p>

<p>By indicating that processes and channels shall be reused, and by wrapping
the <code>calculate</code> function with the necessary support, the
computations may be performed in parallel using a pool of processes, instead
of creating a new process for each computation and then discarding it, only to
create another for the next computation.</p>
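
<p>The reuse idea - a fixed pool of long-lived workers, each handling many
computations - can be sketched with threads and queues in place of
<code>pprocess</code>'s processes and channels. This is a sketch of the
concept only, not the library's mechanism.</p>

```python
# Sketch of worker reuse: three long-lived workers consume many tasks,
# instead of one short-lived worker being created per task.
import queue
import threading

N = 4
tasks = queue.Queue()
results = queue.Queue()

def worker():
    "Serve tasks until a sentinel arrives, reusing this one worker."
    while True:
        item = tasks.get()
        if item is None:       # sentinel: no more work
            break
        i, j = item
        results.put((i, j, i * N + j))

pool = [threading.Thread(target=worker) for _ in range(3)]
for w in pool:
    w.start()
for i in range(N):
    for j in range(N):
        tasks.put((i, j))
for _ in pool:                 # one sentinel per worker
    tasks.put(None)
for w in pool:
    w.join()

collected = sorted(results.get() for _ in range(N * N))
assert [r for (_, _, r) in collected] == list(range(N * N))
```

<p>The saving comes from paying the worker start-up cost three times rather
than sixteen, which is precisely what process reuse buys in the
<code>reuse=1</code> version above.</p>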

<h2>Summary</h2>

<p>The following table indicates the features used in converting each
sequential example program to its parallel equivalent:</p>

<table border="1" cellspacing="0" cellpadding="5">
  <thead>
    <tr>
      <th>Sequential Example</th>
      <th>Parallel Example</th>
      <th>Features Used</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td>simple_map</td>
      <td>simple_pmap</td>
      <td>pmap</td>
    </tr>
    <tr>
      <td>simple1</td>
      <td>simple_managed_map</td>
      <td>MakeParallel, Map, manage</td>
    </tr>
    <tr>
      <td rowspan="3">simple2</td>
      <td>simple_managed_queue</td>
      <td>MakeParallel, Queue, manage</td>
    </tr>
    <tr>
      <td>simple_managed</td>
      <td>MakeParallel, Exchange (subclass), manage, finish</td>
    </tr>
    <tr>
      <td>simple_start</td>
      <td>Channel, Exchange (subclass), start, finish</td>
    </tr>
    <tr>
      <td>simple</td>
      <td>simple_create_map</td>
      <td>Channel, Map, create, exit</td>
    </tr>
  </tbody>
</table>

</body>
</html>