1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/docs/tutorial.html Sun Sep 16 20:46:34 2007 +0000
1.3 @@ -0,0 +1,626 @@
1.4 +<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
1.5 +<html xmlns="http://www.w3.org/1999/xhtml" lang="en-gb">
1.6 +<head>
1.7 + <meta http-equiv="content-type" content="text/html; charset=UTF-8" />
1.8 + <title>pprocess - Tutorial</title>
1.9 + <link href="styles.css" rel="stylesheet" type="text/css" />
1.10 +</head>
1.11 +<body>
1.12 +
1.13 +<h1>pprocess - Tutorial</h1>
1.14 +
1.15 +<p>The <code>pprocess</code> module provides several mechanisms for running
1.16 +Python code concurrently in several processes. The most straightforward way of
1.17 +making a program parallel-aware - that is, where the program can take
1.18 +advantage of more than one processor to simultaneously process data - is to
1.19 +use the <code>pmap</code> function.</p>
1.20 +
1.21 +<h2>Converting Map-Style Code</h2>
1.22 +
1.23 +<p>Consider a program using the built-in <code>map</code> function and a sequence of inputs:</p>
1.24 +
1.25 +<pre>
1.26 + t = time.time()
1.27 +
1.28 + # Initialise an array.
1.29 +
1.30 + sequence = []
1.31 + for i in range(0, N):
1.32 + for j in range(0, N):
1.33 + sequence.append((i, j))
1.34 +
1.35 + # Perform the work.
1.36 +
1.37 + results = map(calculate, sequence)
1.38 +
1.39 + # Show the results.
1.40 +
1.41 + for i in range(0, N):
1.42 + for result in results[i*N:i*N+N]:
1.43 + print result,
1.44 + print
1.45 +
1.46 + print "Time taken:", time.time() - t</pre>
1.47 +
1.48 +<p>(This code in context with <code>import</code> statements and functions is
1.49 +found in the <code>examples/simple_map.py</code> file.)</p>
1.50 +
1.51 +<p>The principal features of this program involve the preparation of an array
1.52 +for input purposes, and the use of the <code>map</code> function to iterate
1.53 +over the combinations of <code>i</code> and <code>j</code> in the array. Even
1.54 +if the <code>calculate</code> function could be invoked independently for each
1.55 +input value, we have to wait for each computation to complete before
1.56 +initiating a new one. The <code>calculate</code> function may be defined as
1.57 +follows:</p>
1.58 +
1.59 +<pre>
1.60 +def calculate(t):
1.61 +
1.62 + "A supposedly time-consuming calculation on 't'."
1.63 +
1.64 + i, j = t
1.65 + time.sleep(delay)
1.66 + return i * N + j
1.67 +</pre>
1.68 +
1.69 +<p>In order to reduce the processing time - to speed the code up, in other
1.70 +words - we can make this code use several processes instead of just one. Here
1.71 +is the modified code:</p>
1.72 +
1.73 +<pre>
1.74 + t = time.time()
1.75 +
1.76 + # Initialise an array.
1.77 +
1.78 + sequence = []
1.79 + for i in range(0, N):
1.80 + for j in range(0, N):
1.81 + sequence.append((i, j))
1.82 +
1.83 + # Perform the work.
1.84 +
1.85 + results = <strong>pprocess.pmap</strong>(calculate, sequence<strong>, limit=limit</strong>)
1.86 +
1.87 + # Show the results.
1.88 +
1.89 + for i in range(0, N):
1.90 + for result in results[i*N:i*N+N]:
1.91 + print result,
1.92 + print
1.93 +
1.94 + print "Time taken:", time.time() - t</pre>
1.95 +
1.96 +<p>(This code in context with <code>import</code> statements and functions is
1.97 +found in the <code>examples/simple_pmap.py</code> file.)</p>
1.98 +
1.99 +<p>By replacing usage of the <code>map</code> function with the
1.100 +<code>pprocess.pmap</code> function, and specifying the limit on the number of
1.101 +processes to be active at any given time (the value of the <code>limit</code>
1.102 +variable is defined elsewhere), several calculations can now be performed in
1.103 +parallel.</p>
1.104 +
1.105 +<h2>Converting Invocations to Parallel Operations</h2>
1.106 +
1.107 +<p>Although some programs make natural use of the <code>map</code> function,
1.108 +others may employ an invocation in a nested loop. This may also be converted
1.109 +to a parallel program. Consider the following Python code:</p>
1.110 +
1.111 +<pre>
1.112 + t = time.time()
1.113 +
1.114 + # Initialise an array.
1.115 +
1.116 + results = []
1.117 +
1.118 + # Perform the work.
1.119 +
1.120 + print "Calculating..."
1.121 + for i in range(0, N):
1.122 + for j in range(0, N):
1.123 + results.append(calculate(i, j))
1.124 +
1.125 + # Show the results.
1.126 +
1.127 + for i in range(0, N):
1.128 + for result in results[i*N:i*N+N]:
1.129 + print result,
1.130 + print
1.131 +
1.132 + print "Time taken:", time.time() - t</pre>
1.133 +
1.134 +<p>(This code in context with <code>import</code> statements and functions is
1.135 +found in the <code>examples/simple1.py</code> file.)</p>
1.136 +
1.137 +<p>Here, a computation in the <code>calculate</code> function is performed for
1.138 +each combination of <code>i</code> and <code>j</code> in the nested loop,
1.139 +returning a result value. However, we must wait for the completion of this
1.140 +function for each element before moving on to the next element, and this means
1.141 +that the computations are performed sequentially. Consequently, on a system
1.142 +with more than one processor, even if we could call <code>calculate</code> for
1.143 +more than one combination of <code>i</code> and <code>j</code><code></code>
1.144 +and have the computations executing at the same time, the above program will
1.145 +not take advantage of such capabilities.</p>
1.146 +
1.147 +<p>We use a slightly modified version of <code>calculate</code> which employs
1.148 +two parameters instead of one:</p>
1.149 +
1.150 +<pre>
1.151 +def calculate(i, j):
1.152 +
1.153 + """
1.154 + A supposedly time-consuming calculation on 'i' and 'j'.
1.155 + """
1.156 +
1.157 + time.sleep(delay)
1.158 + return i * N + j
1.159 +</pre>
1.160 +
1.161 +<p>In order to reduce the processing time - to speed the code up, in other
1.162 +words - we can make this code use several processes instead of just one. Here
1.163 +is the modified code:</p>
1.164 +
1.165 +<pre id="simple_managed_map">
1.166 + t = time.time()
1.167 +
1.168 + # Initialise the results using a map with a limit on the number of
1.169 + # channels/processes.
1.170 +
1.171 + <strong>results = pprocess.Map(limit=limit)</strong><code></code>
1.172 +
1.173 + # Wrap the calculate function and manage it.
1.174 +
1.175 + <strong>calc = results.manage(pprocess.MakeParallel(calculate))</strong>
1.176 +
1.177 + # Perform the work.
1.178 +
1.179 + print "Calculating..."
1.180 + for i in range(0, N):
1.181 + for j in range(0, N):
1.182 + <strong>calc</strong>(i, j)
1.183 +
1.184 + # Show the results.
1.185 +
1.186 + for i in range(0, N):
1.187 + for result in results[i*N:i*N+N]:
1.188 + print result,
1.189 + print
1.190 +
1.191 + print "Time taken:", time.time() - t</pre>
1.192 +
1.193 +<p>(This code in context with <code>import</code> statements and functions is
1.194 +found in the <code>examples/simple_managed_map.py</code> file.)</p>
1.195 +
1.196 +<p>The principal changes in the above code involve the use of a
1.197 +<code>pprocess.Map</code> object to collect the results, and a version of the
1.198 +<code>calculate</code> function which is managed by the <code>Map</code>
1.199 +object. What the <code>Map</code> object does is to arrange the results of
1.200 +computations such that iterating over the object or accessing the object using
1.201 +list operations provides the results in the same order as their corresponding
1.202 +inputs.</p>
1.203 +
1.204 +<h2>Converting Arbitrarily-Ordered Invocations</h2>
1.205 +
1.206 +<p>In some programs, it is not important to receive the results of
1.207 +computations in any particular order, usually because either the order of
1.208 +these results is irrelevant, or because the results provide "positional"
1.209 +information which let them be handled in an appropriate way. Consider the
1.210 +following Python code:</p>
1.211 +
1.212 +<pre>
1.213 + t = time.time()
1.214 +
1.215 + # Initialise an array.
1.216 +
1.217 + results = [0] * N * N
1.218 +
1.219 + # Perform the work.
1.220 +
1.221 + print "Calculating..."
1.222 + for i in range(0, N):
1.223 + for j in range(0, N):
1.224 + i2, j2, result = calculate(i, j)
1.225 + results[i2*N+j2] = result
1.226 +
1.227 + # Show the results.
1.228 +
1.229 + for i in range(0, N):
1.230 + for result in results[i*N:i*N+N]:
1.231 + print result,
1.232 + print
1.233 +
1.234 + print "Time taken:", time.time() - t
1.235 +</pre>
1.236 +
1.237 +<p>(This code in context with <code>import</code> statements and functions is
1.238 +found in the <code>examples/simple2.py</code> file.)</p>
1.239 +
1.240 +<p>Here, a result array is initialised first and each computation is performed
1.241 +sequentially. A significant difference to the previous examples is the return
1.242 +value of the <code>calculate</code> function: the position details
1.243 +corresponding to <code>i</code> and <code>j</code> are returned alongside the
1.244 +result. Obviously, this is of limited value in the above code because the
1.245 +order of the computations and the reception of results is fixed. However, we
1.246 +get no benefit from parallelisation in the above example.</p>
1.247 +
1.248 +<p>We can bring the benefits of parallel processing to the above program with
1.249 +the following code:</p>
1.250 +
1.251 +<pre>
1.252 + t = time.time()
1.253 +
1.254 + # Initialise the communications queue with a limit on the number of
1.255 + # channels/processes.
1.256 +
1.257 + <strong>queue = pprocess.Queue(limit=limit)</strong>
1.258 +
1.259 + # Initialise an array.
1.260 +
1.261 + results = [0] * N * N
1.262 +
1.263 + # Wrap the calculate function and manage it.
1.264 +
1.265 + <strong>calc = queue.manage(pprocess.MakeParallel(calculate))</strong>
1.266 +
1.267 + # Perform the work.
1.268 +
1.269 + print "Calculating..."
1.270 + for i in range(0, N):
1.271 + for j in range(0, N):
1.272 + <strong>calc(i, j)</strong>
1.273 +
1.274 + # Store the results as they arrive.
1.275 +
1.276 + print "Finishing..."
1.277 + <strong>for i, j, result in queue:</strong>
1.278 + <strong>results[i*N+j] = result</strong>
1.279 +
1.280 + # Show the results.
1.281 +
1.282 + for i in range(0, N):
1.283 + for result in results[i*N:i*N+N]:
1.284 + print result,
1.285 + print
1.286 +
1.287 + print "Time taken:", time.time() - t
1.288 +</pre>
1.289 +
1.290 +<p>(This code in context with <code>import</code> statements and functions is
1.291 +found in the <code>examples/simple_managed_queue.py</code> file.)</p>
1.292 +
1.293 +<p>This revised code employs a <code>pprocess.Queue</code> object whose
1.294 +purpose is to collect the results of computations and to make them available
1.295 +in the order in which they were received. The code collecting results has been
1.296 +moved into a separate loop independent of the original computation loop and
1.297 +taking advantage of the more relevant "positional" information emerging from
1.298 +the queue.</p>
1.299 +
1.300 +<p>We can take this example further, illustrating some of the mechanisms
1.301 +employed by <code>pprocess</code>. Instead of collecting results in a queue,
1.302 +we can define a class containing a method which is called when new results
1.303 +arrive:</p>
1.304 +
1.305 +<pre>
1.306 +class MyExchange(pprocess.Exchange):
1.307 +
1.308 + "Parallel convenience class containing the array assignment operation."
1.309 +
1.310 + def store_data(self, ch):
1.311 + i, j, result = ch.receive()
1.312 + self.D[i*N+j] = result
1.313 +</pre>
1.314 +
1.315 +<p>This code exposes the channel paradigm which is used throughout
1.316 +<code>pprocess</code> and is available to applications, if desired. The effect
1.317 +of the method is the storage of a result received through the channel in an
1.318 +attribute of the object. The following code shows how this class can be used,
1.319 +with differences to the previous program illustrated:</p>
1.320 +
1.321 +<pre>
1.322 + t = time.time()
1.323 +
1.324 + # Initialise the communications exchange with a limit on the number of
1.325 + # channels/processes.
1.326 +
1.327 + <strong>exchange = MyExchange(limit=limit)</strong>
1.328 +
1.329 + # Initialise an array - it is stored in the exchange to permit automatic
1.330 + # assignment of values as the data arrives.
1.331 +
1.332 + <strong>results = exchange.D = [0] * N * N</strong>
1.333 +
1.334 + # Wrap the calculate function and manage it.
1.335 +
1.336 + calc = <strong>exchange</strong>.manage(pprocess.MakeParallel(calculate))
1.337 +
1.338 + # Perform the work.
1.339 +
1.340 + print "Calculating..."
1.341 + for i in range(0, N):
1.342 + for j in range(0, N):
1.343 + calc(i, j)
1.344 +
1.345 + # Wait for the results.
1.346 +
1.347 + print "Finishing..."
1.348 + <strong>exchange.finish()</strong>
1.349 +
1.350 + # Show the results.
1.351 +
1.352 + for i in range(0, N):
1.353 + for result in results[i*N:i*N+N]:
1.354 + print result,
1.355 + print
1.356 +
1.357 + print "Time taken:", time.time() - t
1.358 +</pre>
1.359 +
1.360 +<p>(This code in context with <code>import</code> statements and functions is
1.361 +found in the <code>examples/simple_managed.py</code> file.)</p>
1.362 +
1.363 +<p>The main visible differences between this and the previous program are the
1.364 +storage of the result array in the exchange, the removal of the queue
1.365 +consumption code from the main program, placing the act of storing values in
1.366 +the exchange's <code>store_data</code> method, and the need to call the
1.367 +<code>finish</code> method on the <code>MyExchange</code> object so that we do
1.368 +not try and access the results too soon. One underlying benefit not visible in
1.369 +the above code is that we no longer need to accumulate results in a queue or
1.370 +other structure so that they may be processed and assigned to the correct
1.371 +positions in the result array.</p>
1.372 +
1.373 +<p>For the curious, we may remove some of the remaining conveniences of the
1.374 +above program to expose other features of <code>pprocess</code>. First, we
1.375 +define a slightly modified version of the <code>calculate</code> function:</p>
1.376 +
1.377 +<pre>
1.378 +def calculate(ch, i, j):
1.379 +
1.380 + """
1.381 + A supposedly time-consuming calculation on 'i' and 'j', using 'ch' to
1.382 + communicate with the parent process.
1.383 + """
1.384 +
1.385 + time.sleep(delay)
1.386 + ch.send((i, j, i * N + j))
1.387 +</pre>
1.388 +
1.389 +<p>This function accepts a channel, <code>ch</code>, through which results
1.390 +will be sent, and through which other values could potentially be received,
1.391 +although we choose not to do so here. The program using this function is as
1.392 +follows, with differences to the previous program illustrated:</p>
1.393 +
1.394 +<pre>
1.395 + t = time.time()
1.396 +
1.397 + # Initialise the communications exchange with a limit on the number of
1.398 + # channels/processes.
1.399 +
1.400 + exchange = MyExchange(limit=limit)
1.401 +
1.402 + # Initialise an array - it is stored in the exchange to permit automatic
1.403 + # assignment of values as the data arrives.
1.404 +
1.405 + results = exchange.D = [0] * N * N
1.406 +
1.407 + # Perform the work.
1.408 +
1.409 + print "Calculating..."
1.410 + for i in range(0, N):
1.411 + for j in range(0, N):
1.412 + <strong>exchange.start(calculate, i, j)</strong>
1.413 +
1.414 + # Wait for the results.
1.415 +
1.416 + print "Finishing..."
1.417 + exchange.finish()
1.418 +
1.419 + # Show the results.
1.420 +
1.421 + for i in range(0, N):
1.422 + for result in results[i*N:i*N+N]:
1.423 + print result,
1.424 + print
1.425 +
1.426 + print "Time taken:", time.time() - t
1.427 +</pre>
1.428 +
1.429 +<p>(This code in context with <code>import</code> statements and functions is
1.430 +found in the <code>examples/simple_start.py</code> file.)</p>
1.431 +
1.432 +<p>Here, we have discarded two conveniences: the wrapping of callables using
1.433 +<code>MakeParallel</code>, which lets us use functions without providing any
1.434 +channel parameters, and the management of callables using the
1.435 +<code>manage</code> method on queues, exchanges, and so on. The
1.436 +<code>start</code> method still calls the provided callable, but using a
1.437 +different notation from that employed previously.</p>
1.438 +
1.439 +<h2>Converting Inline Computations</h2>
1.440 +
1.441 +<p>Although many programs employ functions and other useful abstractions which
1.442 +can be treated as parallelisable units, some programs perform computations
1.443 +"inline", meaning that the code responsible appears directly within a loop or
1.444 +related control-flow construct. Consider the following code:</p>
1.445 +
1.446 +<pre>
1.447 + t = time.time()
1.448 +
1.449 + # Initialise an array.
1.450 +
1.451 + results = [0] * N * N
1.452 +
1.453 + # Perform the work.
1.454 +
1.455 + print "Calculating..."
1.456 + for i in range(0, N):
1.457 + for j in range(0, N):
1.458 + time.sleep(delay)
1.459 + results[i*N+j] = i * N + j
1.460 +
1.461 + # Show the results.
1.462 +
1.463 + for i in range(0, N):
1.464 + for result in results[i*N:i*N+N]:
1.465 + print result,
1.466 + print
1.467 +
1.468 + print "Time taken:", time.time() - t
1.469 +</pre>
1.470 +
1.471 +<p>(This code in context with <code>import</code> statements and functions is
1.472 +found in the <code>examples/simple.py</code> file.)</p>
1.473 +
1.474 +<p>To simulate "work", as in the different versions of the
1.475 +<code>calculate</code> function, we use the <code>time.sleep</code> function
1.476 +(which does not actually do work, and which will cause a process to be
1.477 +descheduled in most cases, but which simulates the delay associated with work
1.478 +being done). This inline work, which must be performed sequentially in the
1.479 +above program, can be performed in parallel in a somewhat modified version of
1.480 +the program:</p>
1.481 +
1.482 +<pre>
1.483 + t = time.time()
1.484 +
1.485 + # Initialise the results using a map with a limit on the number of
1.486 + # channels/processes.
1.487 +
1.488 + <strong>results = pprocess.Map(limit=limit)</strong>
1.489 +
1.490 + # Perform the work.
1.491 + # NOTE: Could use the with statement in the loop to package the
1.492 + # NOTE: try...finally functionality.
1.493 +
1.494 + print "Calculating..."
1.495 + for i in range(0, N):
1.496 + for j in range(0, N):
1.497 + <strong>ch = results.create()</strong>
1.498 + <strong>if ch:</strong>
1.499 + <strong>try: # Calculation work.</strong>
1.500 +
1.501 + time.sleep(delay)
1.502 + <strong>ch.send(i * N + j)</strong>
1.503 +
1.504 + <strong>finally: # Important finalisation.</strong>
1.505 +
1.506 + <strong>pprocess.exit(ch)</strong>
1.507 +
1.508 + # Show the results.
1.509 +
1.510 + for i in range(0, N):
1.511 + for result in results[i*N:i*N+N]:
1.512 + print result,
1.513 + print
1.514 +
1.515 + print "Time taken:", time.time() - t
1.516 +</pre>
1.517 +
1.518 +<p>(This code in context with <code>import</code> statements and functions is
1.519 +found in the <code>examples/simple_create_map.py</code> file.)</p>
1.520 +
1.521 +<p>Although seemingly more complicated, the bulk of the changes in this
1.522 +modified program are focused on obtaining a channel object, <code>ch</code>,
1.523 +at the point where the computations are performed, and the wrapping of the
1.524 +computation code in a <code>try</code>...<code>finally</code> statement which
1.525 +ensures that the process associated with the channel exits when the
1.526 +computation is complete. In order for the results of these computations to be
1.527 +collected, a <code>pprocess.Map</code> object is used, since it will maintain
1.528 +the results in the same order as the initiation of the computations which
1.529 +produced them.</p>
1.530 +
1.531 +<h2>Reusing Processes in Parallel Programs</h2>
1.532 +
1.533 +<p>One notable aspect of the above programs when parallelised is that each
1.534 +invocation of a computation in parallel creates a new process in which the
1.535 +computation is to be performed, regardless of whether existing processes had
1.536 +just finished producing results and could theoretically have been asked to
1.537 +perform new computations. In other words, processes were created and destroyed
1.538 +instead of being reused.</p>
1.539 +
1.540 +<p>However, we can request that processes be reused for computations by
1.541 +enabling the <code>reuse</code> feature of exchange-like objects and employing
1.542 +suitable reusable callables. Consider this modified version of the <a
1.543 +href="#simple_managed_map">simple_managed_map</a> program:</p>
1.544 +
1.545 +<pre>
1.546 + t = time.time()
1.547 +
1.548 + # Initialise the results using a map with a limit on the number of
1.549 + # channels/processes.
1.550 +
1.551 + results = pprocess.Map(limit=limit<strong>, reuse=1</strong>)
1.552 +
1.553 + # Wrap the calculate function and manage it.
1.554 +
1.555 + calc = results.manage(pprocess.Make<strong>Reusable</strong>(calculate))
1.556 +
1.557 + # Perform the work.
1.558 +
1.559 + print "Calculating..."
1.560 + for i in range(0, N):
1.561 + for j in range(0, N):
1.562 + calc(i, j)
1.563 +
1.564 + # Show the results.
1.565 +
1.566 + for i in range(0, N):
1.567 + for result in results[i*N:i*N+N]:
1.568 + print result,
1.569 + print
1.570 +
1.571 + print "Time taken:", time.time() - t
1.572 +</pre>
1.573 +
1.574 +<p>(This code in context with <code>import</code> statements and functions is
1.575 +found in the <code>examples/simple_manage_map_reusable.py</code> file.)</p>
1.576 +
1.577 +<p>By indicating that processes and channels shall be reused, and by wrapping
1.578 +the <code>calculate</code> function with the necessary support, the
1.579 +computations may be performed in parallel using a pool of processes instead of
1.580 +creating a new process for each computation and then discarding it, only to
1.581 +create a new process for the next computation.</p>
1.582 +
1.583 +<h2>Summary</h2>
1.584 +
1.585 +<p>The following table indicates the features used in converting one
1.586 +sequential example program to another parallel program:</p>
1.587 +
1.588 +<table border="1" cellspacing="0" cellpadding="5">
1.589 + <thead>
1.590 + <tr>
1.591 + <th>Sequential Example</th>
1.592 + <th>Parallel Example</th>
1.593 + <th>Features Used</th>
1.594 + </tr>
1.595 + </thead>
1.596 + <tbody>
1.597 + <tr>
1.598 + <td>simple_map</td>
1.599 + <td>simple_pmap</td>
1.600 + <td>pmap</td>
1.601 + </tr>
1.602 + <tr>
1.603 + <td>simple1</td>
1.604 + <td>simple_managed_map</td>
1.605 + <td>MakeParallel, Map, manage</td>
1.606 + </tr>
1.607 + <tr>
1.608 + <td rowspan="3">simple2</td>
1.609 + <td>simple_managed_queue</td>
1.610 + <td>MakeParallel, Queue, manage</td>
1.611 + </tr>
1.612 + <tr>
1.613 + <td>simple_managed</td>
1.614 + <td>MakeParallel, Exchange (subclass), manage, finish</td>
1.615 + </tr>
1.616 + <tr>
1.617 + <td>simple_start</td>
1.618 + <td>Channel, Exchange (subclass), start, finish</td>
1.619 + </tr>
1.620 + <tr>
1.621 + <td>simple</td>
1.622 + <td>simple_create_map</td>
1.623 + <td>Channel, Map, create, exit</td>
1.624 + </tr>
1.625 + </tbody>
1.626 +</table>
1.627 +
1.628 +</body>
1.629 +</html>