<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" lang="en-gb">
<head>
<meta http-equiv="content-type" content="text/html; charset=UTF-8" />
<title>pprocess - Tutorial</title>
<link href="styles.css" rel="stylesheet" type="text/css" />
</head>
<body>

<h1>pprocess - Tutorial</h1>

<p>The <code>pprocess</code> module provides several mechanisms for running
Python code concurrently in several processes. The most straightforward way of
making a program parallel-aware - that is, where the program can take
advantage of more than one processor to simultaneously process data - is to
use the <code>pmap</code> function.</p>

<h2>Converting Map-Style Code</h2>

<p>Consider a program using the built-in <code>map</code> function and a sequence of inputs:</p>

<pre>
t = time.time()

# Initialise an array.

sequence = []
for i in range(0, N):
    for j in range(0, N):
        sequence.append((i, j))

# Perform the work.

results = map(calculate, sequence)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_map.py</code> file.)</p>

<p>The principal features of this program are the preparation of an array of
input values and the use of the <code>map</code> function to apply
<code>calculate</code> to each combination of <code>i</code> and
<code>j</code> in that array. Even if the <code>calculate</code> function
could be invoked independently for each input value, we have to wait for each
computation to complete before initiating a new one. The
<code>calculate</code> function may be defined as follows:</p>

<pre>
def calculate(t):

    "A supposedly time-consuming calculation on 't'."

    i, j = t
    time.sleep(delay)
    return i * N + j
</pre>

<p>In order to reduce the processing time - to speed the code up, in other
words - we can make this code use several processes instead of just one. Here
is the modified code:</p>

<pre>
t = time.time()

# Initialise an array.

sequence = []
for i in range(0, N):
    for j in range(0, N):
        sequence.append((i, j))

# Perform the work.

results = <strong>pprocess.pmap</strong>(calculate, sequence<strong>, limit=limit</strong>)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_pmap.py</code> file.)</p>

<p>By replacing usage of the <code>map</code> function with the
<code>pprocess.pmap</code> function, and specifying the limit on the number of
processes to be active at any given time (the value of the <code>limit</code>
variable is defined elsewhere), several calculations can now be performed in
parallel.</p>

<h2>Converting Invocations to Parallel Operations</h2>

<p>Although some programs make natural use of the <code>map</code> function,
others may employ an invocation in a nested loop. This may also be converted
to a parallel program. Consider the following Python code:</p>

<pre>
t = time.time()

# Initialise an array.

results = []

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        results.append(calculate(i, j))

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple1.py</code> file.)</p>

<p>Here, a computation in the <code>calculate</code> function is performed for
each combination of <code>i</code> and <code>j</code> in the nested loop,
returning a result value. However, we must wait for this function to complete
for each element before moving on to the next one, meaning that the
computations are performed sequentially. Consequently, on a system with more
than one processor, even if we could call <code>calculate</code> for more than
one combination of <code>i</code> and <code>j</code> and have the computations
execute at the same time, the above program will not take advantage of such
capabilities.</p>

<p>We use a slightly modified version of <code>calculate</code> which employs
two parameters instead of one:</p>

<pre>
def calculate(i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j'.
    """

    time.sleep(delay)
    return i * N + j
</pre>

<p>In order to reduce the processing time - to speed the code up, in other
words - we can make this code use several processes instead of just one. Here
is the modified code:</p>

<pre>
t = time.time()

# Initialise the results using a map with a limit on the number of
# channels/processes.

<strong>results = pprocess.Map(limit=limit)</strong>

# Wrap the calculate function and manage it.

<strong>calc = results.manage(pprocess.MakeParallel(calculate))</strong>

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>calc</strong>(i, j)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_manage_map.py</code> file.)</p>

<p>The principal changes in the above code involve the use of a
<code>pprocess.Map</code> object to collect the results, and a version of the
<code>calculate</code> function which is managed by the <code>Map</code>
object. The <code>Map</code> object arranges the results of the computations
so that iterating over the object, or accessing it using list operations,
provides the results in the same order as their corresponding inputs.</p>
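<p>To make the control flow concrete: the managed callable returns nothing
useful to the caller; results are read back from the <code>Map</code>
afterwards. The following toy, single-process stand-in (a hypothetical
<code>ToyMap</code>, not <code>pprocess</code>'s implementation) mimics just
that contract - the real <code>Map</code> runs the wrapped function in a
child process instead:</p>

```python
class ToyMap:
    # A toy, single-process stand-in for pprocess.Map: the contract
    # mimicked here is that results are exposed, via list operations,
    # in the same order as the calls which produced them.
    def __init__(self):
        self.results = []

    def manage(self, func):
        def managed(*args):
            self.results.append(func(*args))  # caller gets nothing back
        return managed

    def __getitem__(self, key):
        return self.results[key]

N = 3
m = ToyMap()
calc = m.manage(lambda i, j: i * N + j)

for i in range(N):
    for j in range(N):
        calc(i, j)      # returns None; the result lands in m

print(m[0:N])           # first "row" of results: [0, 1, 2]
```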

<h2>Converting Arbitrarily-Ordered Invocations</h2>

<p>In some programs, it is not important to receive the results of
computations in any particular order, usually because either the order of the
results is irrelevant, or because the results carry "positional" information
which lets them be handled in an appropriate way. Consider the following
Python code:</p>

<pre>
t = time.time()

# Initialise an array.

results = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        i2, j2, result = calculate(i, j)
        results[i2*N+j2] = result

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple2.py</code> file.)</p>

<p>Here, a result array is initialised first and each computation is performed
sequentially. A significant difference from the previous examples is the
return value of the <code>calculate</code> function: the position details
corresponding to <code>i</code> and <code>j</code> are returned alongside the
result. This is of limited value in the above code because the order of the
computations and the reception of results is fixed; as written, the program
also gains nothing from parallelisation.</p>

<p>We can bring the benefits of parallel processing to the above program with
the following code:</p>

<pre>
t = time.time()

# Initialise the communications queue with a limit on the number of
# channels/processes.

<strong>queue = pprocess.Queue(limit=limit)</strong>

# Initialise an array.

results = [0] * N * N

# Wrap the calculate function and manage it.

<strong>calc = queue.manage(pprocess.MakeParallel(calculate))</strong>

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>calc(i, j)</strong>

# Store the results as they arrive.

print "Finishing..."
<strong>for i, j, result in queue:</strong>
    <strong>results[i*N+j] = result</strong>

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed_queue.py</code> file.)</p>

<p>This revised code employs a <code>pprocess.Queue</code> object whose
purpose is to collect the results of computations and to make them available
in the order in which they were received. The code collecting results has been
moved into a separate loop, independent of the original computation loop,
which takes advantage of the "positional" information emerging from the
queue.</p>

<p>We can take this example further, illustrating some of the mechanisms
employed by <code>pprocess</code>. Instead of collecting results in a queue,
we can define a class containing a method which is called when new results
arrive:</p>

<pre>
class MyExchange(pprocess.Exchange):

    "Parallel convenience class containing the array assignment operation."

    def store_data(self, ch):
        i, j, result = ch.receive()
        self.D[i*N+j] = result
</pre>

<p>This code exposes the channel paradigm which is used throughout
<code>pprocess</code> and is available to applications, if desired. The effect
of the method is the storage of a result received through the channel in an
attribute of the object. The following code shows how this class can be used,
with differences to the previous program illustrated:</p>

<pre>
t = time.time()

# Initialise the communications exchange with a limit on the number of
# channels/processes.

<strong>exchange = MyExchange(limit=limit)</strong>

# Initialise an array - it is stored in the exchange to permit automatic
# assignment of values as the data arrives.

<strong>results = exchange.D = [0] * N * N</strong>

# Wrap the calculate function and manage it.

calc = <strong>exchange</strong>.manage(pprocess.MakeParallel(calculate))

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        calc(i, j)

# Wait for the results.

print "Finishing..."
<strong>exchange.finish()</strong>

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed.py</code> file.)</p>

<p>The main visible differences between this and the previous program are the
storage of the result array in the exchange, the removal of the queue
consumption code from the main program (the act of storing values now resides
in the exchange's <code>store_data</code> method), and the need to call the
<code>finish</code> method on the <code>MyExchange</code> object so that we do
not try to access the results too soon. One underlying benefit not visible in
the above code is that we no longer need to accumulate results in a queue or
other structure so that they may be processed and assigned to the correct
positions in the result array.</p>

<p>For the curious, we may remove some of the remaining conveniences of the
above program to expose other features of <code>pprocess</code>. First, we
define a slightly modified version of the <code>calculate</code> function:</p>

<pre>
def calculate(ch, i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j', using 'ch' to
    communicate with the parent process.
    """

    time.sleep(delay)
    ch.send((i, j, i * N + j))
</pre>

<p>This function accepts a channel, <code>ch</code>, through which results
will be sent, and through which other values could potentially be received,
although we choose not to do so here. The program using this function is as
follows, with differences to the previous program illustrated:</p>

<pre>
t = time.time()

# Initialise the communications exchange with a limit on the number of
# channels/processes.

exchange = MyExchange(limit=limit)

# Initialise an array - it is stored in the exchange to permit automatic
# assignment of values as the data arrives.

results = exchange.D = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>exchange.start(calculate, i, j)</strong>

# Wait for the results.

print "Finishing..."
exchange.finish()

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_start.py</code> file.)</p>

<p>Here, we have discarded two conveniences: the wrapping of callables using
<code>MakeParallel</code>, which lets us use functions without providing any
channel parameters, and the management of callables using the
<code>manage</code> method on queues, exchanges, and so on. The
<code>start</code> method still calls the provided callable, but using a
different notation from that employed previously.</p>
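<p>Underneath <code>start</code>, <code>pprocess</code> forks a child process
and hands the callable a channel connected back to the parent. On POSIX
systems, the essence of that channel paradigm can be sketched with nothing but
the standard library. This is a simplified, hypothetical analogue
(<code>pprocess</code>'s own channel class differs in detail), using
<code>os.fork</code>, a pipe, and pickled, length-prefixed messages:</p>

```python
import os
import pickle
import struct

def start(func, *args):
    # Fork a child that runs func(ch, *args); return the parent's end
    # of a one-way pipe on which the child's messages arrive.
    r, w = os.pipe()
    pid = os.fork()
    if pid == 0:                        # child process
        os.close(r)
        func(os.fdopen(w, "wb"), *args)
        os._exit(0)
    os.close(w)                         # parent process
    return os.fdopen(r, "rb")

def send(ch, obj):
    data = pickle.dumps(obj)
    ch.write(struct.pack(">I", len(data)))  # 4-byte length prefix
    ch.write(data)
    ch.flush()

def receive(ch):
    (length,) = struct.unpack(">I", ch.read(4))
    return pickle.loads(ch.read(length))

N = 10

def calculate(ch, i, j):
    # The channel-accepting form of calculate from above.
    send(ch, (i, j, i * N + j))

ch = start(calculate, 2, 3)
result = receive(ch)
os.wait()                               # reap the child
print(result)                           # (2, 3, 23)
```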

<h2>Converting Inline Computations</h2>

<p>Although many programs employ functions and other useful abstractions which
can be treated as parallelisable units, some programs perform computations
"inline", meaning that the code responsible appears directly within a loop or
related control-flow construct. Consider the following code:</p>

<pre>
t = time.time()

# Initialise an array.

results = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        time.sleep(delay)
        results[i*N+j] = i * N + j

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple.py</code> file.)</p>

<p>To simulate "work", as in the different versions of the
<code>calculate</code> function, we use the <code>time.sleep</code> function.
It does not actually do any work, and will cause a process to be descheduled
in most cases, but it simulates the delay associated with work being done.
This inline work, which must be performed sequentially in the above program,
can be performed in parallel in a somewhat modified version of the
program:</p>

<pre>
t = time.time()

# Initialise the results using a map with a limit on the number of
# channels/processes.

<strong>results = pprocess.Map(limit=limit)</strong>

# Perform the work.
# NOTE: Could use the with statement in the loop to package the
# NOTE: try...finally functionality.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>ch = results.create()</strong>
        <strong>if ch:</strong>
            <strong>try: # Calculation work.</strong>

                time.sleep(delay)
                <strong>ch.send(i * N + j)</strong>

            <strong>finally: # Important finalisation.</strong>

                <strong>pprocess.exit(ch)</strong>

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_create_map.py</code> file.)</p>

<p>Although seemingly more complicated, the bulk of the changes in this
modified program are focused on obtaining a channel object, <code>ch</code>,
at the point where the computations are performed, and the wrapping of the
computation code in a <code>try</code>...<code>finally</code> statement which
ensures that the process associated with the channel exits when the
computation is complete. In order for the results of these computations to be
collected, a <code>pprocess.Map</code> object is used, since it will maintain
the results in the same order as the initiation of the computations which
produced them.</p>

</body>
</html>