<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" lang="en-gb">
<head>
<meta http-equiv="content-type" content="text/html; charset=UTF-8" />
<title>pprocess - Tutorial</title>
<link href="styles.css" rel="stylesheet" type="text/css" />
</head>
<body>

<h1>pprocess - Tutorial</h1>

<p>The <code>pprocess</code> module provides several mechanisms for running
Python code concurrently in several processes. The most straightforward way of
making a program parallel-aware - that is, where the program can take
advantage of more than one processor to simultaneously process data - is to
use the <code>pmap</code> function.</p>

<ul>
<li><a href="#pmap">Converting Map-Style Code</a></li>
<li><a href="#Map">Converting Invocations to Parallel Operations</a></li>
<li><a href="#Queue">Converting Arbitrarily-Ordered Invocations</a>
<ul>
<li><a href="#Exchange">Replacing Queues with Exchanges</a></li>
<li><a href="#channel">Using Channels in Callables</a></li>
</ul>
</li>
<li><a href="#create">Converting Inline Computations</a></li>
<li><a href="#MakeReusable">Reusing Processes in Parallel Programs</a></li>
<li><a href="#continuous">Supporting Continuous Processes in Parallel Programs</a></li>
<li><a href="#BackgroundCallable">Performing Computations in Background Processes</a></li>
<li><a href="#ManagingBackgroundProcesses">Managing Several Background Processes</a></li>
<li><a href="#Summary">Summary</a></li>
</ul>

<p>For a brief summary of each of the features of <code>pprocess</code>, see
the <a href="reference.html">reference document</a>.</p>

<h2 id="pmap">Converting Map-Style Code</h2>

<p>Consider a program using the built-in <code>map</code> function and a sequence of inputs:</p>

<pre>
t = time.time()

# Initialise an array.

sequence = []
for i in range(0, N):
    for j in range(0, N):
        sequence.append((i, j))

# Perform the work.

results = map(calculate, sequence)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_map.py</code> file.)</p>

<p>The principal features of this program involve the preparation of an array
for input purposes, and the use of the <code>map</code> function to iterate
over the combinations of <code>i</code> and <code>j</code> in the array. Even
if the <code>calculate</code> function could be invoked independently for each
input value, we have to wait for each computation to complete before
initiating a new one. The <code>calculate</code> function may be defined as
follows:</p>

<pre>
def calculate(t):

    "A supposedly time-consuming calculation on 't'."

    i, j = t
    time.sleep(delay)
    return i * N + j
</pre>
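<p>The value returned by <code>calculate</code> is simply the cell's row-major
index in an <code>N</code> x <code>N</code> array, which is why the result
display can recover row <code>i</code> with the slice
<code>results[i*N:i*N+N]</code>. The following standalone sketch checks this
arithmetic, with an arbitrary <code>N</code> and the artificial delay
omitted:</p>

```python
# Standalone check of the row-major indexing used throughout the tutorial.
# N is arbitrary here, and the artificial delay is omitted.
N = 4

def calculate(t):
    i, j = t
    return i * N + j

sequence = [(i, j) for i in range(N) for j in range(N)]
results = [calculate(t) for t in sequence]

# The slice results[i*N:i*N+N] recovers row i of the conceptual N x N array.
assert results[2*N:2*N+N] == [8, 9, 10, 11]
assert results == list(range(N * N))
```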

<p>In order to reduce the processing time - to speed the code up, in other
words - we can make this code use several processes instead of just one. Here
is the modified code:</p>

<pre>
t = time.time()

# Initialise an array.

sequence = []
for i in range(0, N):
    for j in range(0, N):
        sequence.append((i, j))

# Perform the work.

results = <strong>pprocess.pmap</strong>(calculate, sequence<strong>, limit=limit</strong>)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_pmap.py</code> file.)</p>

<p>By replacing usage of the <code>map</code> function with the
<code>pprocess.pmap</code> function, and specifying the limit on the number of
processes to be active at any given time (the value of the <code>limit</code>
variable is defined elsewhere), several calculations can now be performed in
parallel.</p>
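<p>The benefit of the <code>limit</code> parameter can be estimated with some
simple arithmetic: with <code>N * N</code> independent calls, each taking
about <code>delay</code> seconds, the sequential version needs roughly
<code>N * N * delay</code> seconds, whereas running <code>limit</code>
processes at a time needs roughly <code>N * N * delay / limit</code> seconds
plus process management overheads. For example (illustrative figures only, not
measurements):</p>

```python
# Back-of-envelope estimate of the speed-up from running "limit" processes
# at a time; the figures are illustrative, not measurements.
N = 10
delay = 1.0
limit = 4

sequential_time = N * N * delay          # one call after another
parallel_time = sequential_time / limit  # ignoring process overheads

assert sequential_time == 100.0
assert parallel_time == 25.0
```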

<h2 id="Map">Converting Invocations to Parallel Operations</h2>

<p>Although some programs make natural use of the <code>map</code> function,
others may employ an invocation in a nested loop. This may also be converted
to a parallel program. Consider the following Python code:</p>

<pre>
t = time.time()

# Initialise an array.

results = []

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        results.append(calculate(i, j))

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple1.py</code> file.)</p>

<p>Here, a computation in the <code>calculate</code> function is performed for
each combination of <code>i</code> and <code>j</code> in the nested loop,
returning a result value. However, we must wait for the completion of this
function for each element before moving on to the next element, and this means
that the computations are performed sequentially. Consequently, on a system
with more than one processor, even if we could call <code>calculate</code> for
more than one combination of <code>i</code> and <code>j</code>
and have the computations executing at the same time, the above program will
not take advantage of such capabilities.</p>

<p>We use a slightly modified version of <code>calculate</code> which employs
two parameters instead of one:</p>

<pre>
def calculate(i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j'.
    """

    time.sleep(delay)
    return i * N + j
</pre>

<p>In order to reduce the processing time - to speed the code up, in other
words - we can make this code use several processes instead of just one. Here
is the modified code:</p>

<pre id="simple_managed_map">
t = time.time()

# Initialise the results using a map with a limit on the number of
# channels/processes.

<strong>results = pprocess.Map(limit=limit)</strong>

# Wrap the calculate function and manage it.

<strong>calc = results.manage(pprocess.MakeParallel(calculate))</strong>

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>calc</strong>(i, j)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed_map.py</code> file.)</p>

<p>The principal changes in the above code involve the use of a
<code>pprocess.Map</code> object to collect the results, and a version of the
<code>calculate</code> function which is managed by the <code>Map</code>
object. What the <code>Map</code> object does is to arrange the results of
computations such that iterating over the object or accessing the object using
list operations provides the results in the same order as their corresponding
inputs.</p>
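<p>The role of <code>pprocess.MakeParallel</code> can be understood in
isolation: it adapts an ordinary function so that, when run in a created
process, the function's return value is sent over that process's channel
rather than returned. The following single-process sketch illustrates the
idea; <code>RecordingChannel</code> and <code>make_parallel_sketch</code> are
stand-ins for illustration only, not the library's actual classes:</p>

```python
# Conceptual sketch of MakeParallel: adapt f(*args) into g(channel, *args)
# whose result is sent over the channel instead of being returned.
# RecordingChannel is a stand-in; real pprocess channels connect processes.
class RecordingChannel:
    def __init__(self):
        self.sent = []
    def send(self, value):
        self.sent.append(value)

def make_parallel_sketch(fn):
    def wrapped(channel, *args):
        channel.send(fn(*args))
    return wrapped

N = 10

def calculate(i, j):
    return i * N + j

parallel_calculate = make_parallel_sketch(calculate)
ch = RecordingChannel()
parallel_calculate(ch, 2, 3)
assert ch.sent == [23]
```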

<h2 id="Queue">Converting Arbitrarily-Ordered Invocations</h2>

<p>In some programs, it is not important to receive the results of
computations in any particular order, usually because either the order of
these results is irrelevant, or because the results provide "positional"
information which lets them be handled in an appropriate way. Consider the
following Python code:</p>

<pre>
t = time.time()

# Initialise an array.

results = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        i2, j2, result = calculate(i, j)
        results[i2*N+j2] = result

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple2.py</code> file.)</p>

<p>Here, a result array is initialised first and each computation is performed
sequentially. A significant difference from the previous examples is the
return value of the <code>calculate</code> function: the position details
corresponding to <code>i</code> and <code>j</code> are returned alongside the
result. This is of limited value in the above code because the order of the
computations and the reception of results is fixed; consequently, we get no
benefit from parallelisation in the above example.</p>

<p>We can bring the benefits of parallel processing to the above program with
the following code:</p>

<pre id="simple_managed_queue">
t = time.time()

# Initialise the communications queue with a limit on the number of
# channels/processes.

<strong>queue = pprocess.Queue(limit=limit)</strong>

# Initialise an array.

results = [0] * N * N

# Wrap the calculate function and manage it.

<strong>calc = queue.manage(pprocess.MakeParallel(calculate))</strong>

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>calc(i, j)</strong>

# Store the results as they arrive.

print "Finishing..."
<strong>for i, j, result in queue:</strong>
    <strong>results[i*N+j] = result</strong>

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed_queue.py</code> file.)</p>

<p>This revised code employs a <code>pprocess.Queue</code> object whose
purpose is to collect the results of computations and to make them available
in the order in which they were received. The code collecting results has been
moved into a separate loop independent of the original computation loop,
taking advantage of the more relevant "positional" information emerging from
the queue.</p>
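<p>The reassembly loop depends only on the "positional" information carried
with each result, so the order in which results arrive is immaterial. This
standalone sketch demonstrates the principle, with a shuffled list standing in
for the queue of a parallel run:</p>

```python
# Results carrying their own (i, j) positions can be stored correctly no
# matter what order they arrive in; a shuffled list stands in for the queue.
import random

N = 3
expected = [i * N + j for i in range(N) for j in range(N)]

arrivals = [(i, j, i * N + j) for i in range(N) for j in range(N)]
random.shuffle(arrivals)

results = [0] * N * N
for i, j, result in arrivals:
    results[i*N+j] = result

assert results == expected
```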

<h3 id="Exchange">Replacing Queues with Exchanges</h3>

<p>We can take this example further, illustrating some of the mechanisms
employed by <code>pprocess</code>. Instead of collecting results in a queue,
we can define a class containing a method which is called when new results
arrive:</p>

<pre>
class MyExchange(pprocess.Exchange):

    "Parallel convenience class containing the array assignment operation."

    def store_data(self, ch):
        i, j, result = ch.receive()
        self.D[i*N+j] = result
</pre>

<p>This code exposes the channel paradigm which is used throughout
<code>pprocess</code> and is available to applications, if desired. The effect
of the method is the storage of a result received through the channel in an
attribute of the object. The following code shows how this class can be used,
with differences to the previous program illustrated:</p>

<pre>
t = time.time()

# Initialise the communications exchange with a limit on the number of
# channels/processes.

<strong>exchange = MyExchange(limit=limit)</strong>

# Initialise an array - it is stored in the exchange to permit automatic
# assignment of values as the data arrives.

<strong>results = exchange.D = [0] * N * N</strong>

# Wrap the calculate function and manage it.

calc = <strong>exchange</strong>.manage(pprocess.MakeParallel(calculate))

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        calc(i, j)

# Wait for the results.

print "Finishing..."
<strong>exchange.finish()</strong>

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed.py</code> file.)</p>

<p>The main visible differences between this and the previous program are the
storage of the result array in the exchange, the removal of the queue
consumption code from the main program (the act of storing values now resides
in the exchange's <code>store_data</code> method), and the need to call the
<code>finish</code> method on the <code>MyExchange</code> object so that we do
not try to access the results too soon. One underlying benefit not visible in
the above code is that we no longer need to accumulate results in a queue or
other structure so that they may be processed and assigned to the correct
positions in the result array.</p>
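<p>The contract for an <code>Exchange</code> subclass is small:
<code>store_data</code> is invoked with a channel that has data ready, and the
method decides what to do with the received value. This sketch exercises that
contract directly; <code>FakeChannel</code> and <code>MyExchangeSketch</code>
are stand-ins for illustration, not the library's actual classes:</p>

```python
# Sketch of the store_data contract: given a channel with data ready,
# receive the value and store it. FakeChannel is a stand-in for a real
# pprocess channel.
class FakeChannel:
    def __init__(self, value):
        self.value = value
    def receive(self):
        return self.value

N = 3

class MyExchangeSketch:
    def __init__(self):
        self.D = [0] * N * N
    def store_data(self, ch):
        i, j, result = ch.receive()
        self.D[i*N+j] = result

exchange = MyExchangeSketch()
exchange.store_data(FakeChannel((1, 2, 5)))
assert exchange.D[1*N+2] == 5
```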

<h3 id="channel">Using Channels in Callables</h3>

<p>For the curious, we may remove some of the remaining conveniences of the
above program to expose other features of <code>pprocess</code>. First, we
define a slightly modified version of the <code>calculate</code> function:</p>

<pre>
def calculate(ch, i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j', using 'ch' to
    communicate with the parent process.
    """

    time.sleep(delay)
    ch.send((i, j, i * N + j))
</pre>

<p>This function accepts a channel, <code>ch</code>, through which results
will be sent, and through which other values could potentially be received,
although we choose not to do so here. The program using this function is as
follows, with differences to the previous program illustrated:</p>

<pre>
t = time.time()

# Initialise the communications exchange with a limit on the number of
# channels/processes.

exchange = MyExchange(limit=limit)

# Initialise an array - it is stored in the exchange to permit automatic
# assignment of values as the data arrives.

results = exchange.D = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>exchange.start(calculate, i, j)</strong>

# Wait for the results.

print "Finishing..."
exchange.finish()

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_start.py</code> file.)</p>

<p>Here, we have discarded two conveniences: the wrapping of callables using
<code>MakeParallel</code>, which lets us use functions without providing any
channel parameters, and the management of callables using the
<code>manage</code> method on queues, exchanges, and so on. The
<code>start</code> method still calls the provided callable, but using a
different notation from that employed previously.</p>

<h2 id="create">Converting Inline Computations</h2>

<p>Although many programs employ functions and other useful abstractions which
can be treated as parallelisable units, some programs perform computations
"inline", meaning that the code responsible appears directly within a loop or
related control-flow construct. Consider the following code:</p>

<pre>
t = time.time()

# Initialise an array.

results = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        time.sleep(delay)
        results[i*N+j] = i * N + j

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple.py</code> file.)</p>

<p>To simulate "work", as in the different versions of the
<code>calculate</code> function, we use the <code>time.sleep</code> function
(which does not actually do work, and which will cause a process to be
descheduled in most cases, but which simulates the delay associated with work
being done). This inline work, which must be performed sequentially in the
above program, can be performed in parallel in a somewhat modified version of
the program:</p>

<pre>
t = time.time()

# Initialise the results using a map with a limit on the number of
# channels/processes.

<strong>results = pprocess.Map(limit=limit)</strong>

# Perform the work.
# NOTE: Could use the with statement in the loop to package the
# NOTE: try...finally functionality.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>ch = results.create()</strong>
        <strong>if ch:</strong>
            <strong>try: # Calculation work.</strong>

                time.sleep(delay)
                <strong>ch.send(i * N + j)</strong>

            <strong>finally: # Important finalisation.</strong>

                <strong>pprocess.exit(ch)</strong>

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_create_map.py</code> file.)</p>

<p>Although seemingly more complicated, the bulk of the changes in this
modified program are focused on obtaining a channel object, <code>ch</code>,
at the point where the computations are performed, and the wrapping of the
computation code in a <code>try</code>...<code>finally</code> statement which
ensures that the process associated with the channel exits when the
computation is complete. In order for the results of these computations to be
collected, a <code>pprocess.Map</code> object is used, since it will maintain
the results in the same order as the initiation of the computations which
produced them.</p>
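<p>Underneath, <code>create</code> forks the current process, handing the
child a channel back to its parent, and <code>pprocess.exit</code> ensures the
child terminates once its work is done. The plumbing can be sketched directly
with <code>os.fork</code> and a pipe (a bare-bones, POSIX-only illustration;
the library's actual channels additionally handle object serialisation and
multiplexing of many children):</p>

```python
# Bare-bones sketch of the fork-based plumbing behind create()/exit():
# the child performs the "work", writes its result to a pipe and exits;
# the parent reads the result back. POSIX only.
import os
import pickle

r, w = os.pipe()
pid = os.fork()

if pid == 0:
    # Child process: do the "work", send the result, and always exit.
    try:
        os.write(w, pickle.dumps(2 * 21))
    finally:
        os._exit(0)

# Parent process: collect the child's result and reap the child.
os.close(w)
result = pickle.loads(os.read(r, 1024))
os.waitpid(pid, 0)
assert result == 42
```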

<h2 id="MakeReusable">Reusing Processes in Parallel Programs</h2>

<p>One notable aspect of the above programs when parallelised is that each
invocation of a computation in parallel creates a new process in which the
computation is to be performed, regardless of whether existing processes had
just finished producing results and could theoretically have been asked to
perform new computations. In other words, processes were created and destroyed
instead of being reused.</p>

<p>However, we can request that processes be reused for computations by
enabling the <code>reuse</code> feature of exchange-like objects and employing
suitable reusable callables. Consider this modified version of the <a
href="#simple_managed_map">simple_managed_map</a> program:</p>

<pre>
t = time.time()

# Initialise the results using a map with a limit on the number of
# channels/processes.

results = pprocess.Map(limit=limit<strong>, reuse=1</strong>)

# Wrap the calculate function and manage it.

calc = results.manage(pprocess.Make<strong>Reusable</strong>(calculate))

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        calc(i, j)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_manage_map_reusable.py</code> file.)</p>

<p>By indicating that processes and channels shall be reused, and by wrapping
the <code>calculate</code> function with the necessary support, the
computations may be performed in parallel using a pool of processes instead of
creating a new process for each computation and then discarding it, only to
create a new process for the next computation.</p>
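<p>Where <code>MakeParallel</code> yields a callable that performs one
computation and finishes, <code>MakeReusable</code> yields one that stays in
its process, accepting fresh arguments over the channel and sending back a
result for each. The loop at the heart of that idea can be sketched in a
single process; <code>LoopChannel</code> and the sentinel convention here are
illustrative stand-ins and differ from the library's actual protocol:</p>

```python
# Conceptual sketch of a reusable worker: the wrapped function loops,
# receiving argument tuples and sending results, until a sentinel arrives.
# LoopChannel and the None sentinel are illustrative stand-ins only.
class LoopChannel:
    def __init__(self, jobs):
        self.jobs = list(jobs)
        self.sent = []
    def receive(self):
        return self.jobs.pop(0) if self.jobs else None
    def send(self, value):
        self.sent.append(value)

def make_reusable_sketch(fn):
    def worker(channel):
        while True:
            args = channel.receive()
            if args is None:  # sentinel: no more work
                break
            channel.send(fn(*args))
    return worker

N = 10

def calculate(i, j):
    return i * N + j

ch = LoopChannel([(0, 1), (2, 3), (4, 5)])
make_reusable_sketch(calculate)(ch)
assert ch.sent == [1, 23, 45]
```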
paulb@124 | 603 | |
paul@159 | 604 | <h2 id="continuous">Supporting Continuous Processes in Parallel Programs</h2> |
paul@159 | 605 | |
paul@159 | 606 | <p>Although reusable processes offer the opportunity to invoke a callable over |
paul@159 | 607 | and over within the same created process, they do not fully support the |
paul@159 | 608 | potential of the underlying mechanisms in <code>pprocess</code>: created |
paul@159 | 609 | processes can communicate multiple values to the creating process and can |
paul@159 | 610 | theoretically run within the same callable forever.</p> |
paul@159 | 611 | |
paul@159 | 612 | <p>Consider this modified form of the <code>calculate</code> function:</p> |
paul@159 | 613 | |
paul@159 | 614 | <pre> |
paul@159 | 615 | def calculate(ch, i): |
paul@159 | 616 | |
paul@159 | 617 | """ |
paul@159 | 618 | A supposedly time-consuming calculation on 'i'. |
paul@159 | 619 | """ |
paul@159 | 620 | |
paul@159 | 621 | for j in range(0, N): |
paul@159 | 622 | time.sleep(delay) |
paul@159 | 623 | ch.send((i, j, i * N + j)) |
paul@159 | 624 | </pre> |
paul@159 | 625 | |
paul@159 | 626 | <p>This function accepts a channel <code>ch</code> together with an argument |
paul@159 | 627 | <code>i</code> corresponding to an entire row of the input array, as opposed |
paul@159 | 628 | to having two arguments (<code>i</code> and <code>j</code>) corresponding to a |
paul@159 | 629 | single cell in the input array. In this function, a series of calculations are |
paul@159 | 630 | performed and a number of values are returned through the channel, without the |
paul@159 | 631 | function terminating until all values have been returned for the row data.</p> |
paul@159 | 632 | |
paul@159 | 633 | <p>To use this modified function, a modified version of the |
paul@159 | 634 | <a href="#simple_managed_queue">simple_managed_queue</a> program is used:</p> |
paul@159 | 635 | |
paul@159 | 636 | <pre> |
paul@159 | 637 | t = time.time() |
paul@159 | 638 | |
paul@159 | 639 | # Initialise the communications queue with a limit on the number of |
paul@159 | 640 | # channels/processes. |
paul@159 | 641 | |
paul@159 | 642 | queue = pprocess.Queue(limit=limit<strong>, continuous=1</strong>) |
paul@159 | 643 | |
paul@159 | 644 | # Initialise an array. |
paul@159 | 645 | |
paul@159 | 646 | results = [0] * N * N |
paul@159 | 647 | |
paul@159 | 648 | # Manage the calculate function. |
paul@159 | 649 | |
paul@159 | 650 | calc = queue.manage(<strong>calculate</strong>) |
paul@159 | 651 | |
paul@159 | 652 | # Perform the work. |
paul@159 | 653 | |
paul@159 | 654 | print "Calculating..." |
paul@159 | 655 | for i in range(0, N): |
paul@159 | 656 | <strong>calc(i)</strong> |
paul@159 | 657 | |
paul@159 | 658 | # Store the results as they arrive. |
paul@159 | 659 | |
paul@159 | 660 | print "Finishing..." |
paul@159 | 661 | for i, j, result in queue: |
paul@159 | 662 | results[i*N+j] = result |
paul@159 | 663 | |
paul@159 | 664 | # Show the results. |
paul@159 | 665 | |
paul@159 | 666 | for i in range(0, N): |
paul@159 | 667 | for result in results[i*N:i*N+N]: |
paul@159 | 668 | print result, |
paul@159 | 669 | print |
paul@159 | 670 | |
paul@159 | 671 | print "Time taken:", time.time() - t |
paul@159 | 672 | </pre> |
paul@159 | 673 | |
paul@159 | 674 | <p>(This code in context with <code>import</code> statements and functions is |
paul@159 | 675 | found in the <code>examples/simple_continuous_queue.py</code> file.)</p> |
paul@159 | 676 | |
paul@165 | 677 | <p>Although the inner loop in the work section has been relocated to the |
paul@159 | 678 | <code>calculate</code> function, the queue still receives outputs from that |
paul@159 | 679 | function with positional information and a result for the result array. Thus, |
paul@159 | 680 | no change is needed for the retrieval of the results: they arrive in the queue |
paul@159 | 681 | as before.</p> |
paul@159 | 682 | |
paulb@145 | 683 | <h2 id="BackgroundCallable">Performing Computations in Background Processes</h2> |
paulb@145 | 684 | |
paulb@145 | 685 | <p>Occasionally, it is desirable to initiate time-consuming computations and to |
paulb@145 | 686 | not only leave such processes running in the background, but to be able to detach |
paulb@145 | 687 | the creating process from them completely, potentially terminating the creating |
paulb@145 | 688 | process altogether, and yet also be able to collect the results of the created |
paulb@145 | 689 | processes at a later time, potentially in another completely different process. |
paulb@145 | 690 | For such situations, we can make use of the <code>BackgroundCallable</code> |
paulb@145 | 691 | class, which converts a parallel-aware callable into a callable which will run |
paulb@145 | 692 | in a background process when invoked.</p> |
paulb@145 | 693 | |
paulb@145 | 694 | <p>Consider this excerpt from a modified version of the <a |
paulb@145 | 695 | href="#simple_managed_queue">simple_managed_queue</a> program:</p> |
paulb@145 | 696 | |
paulb@145 | 697 | <pre> |
paulb@145 | 698 | <strong>def task():</strong> |
paulb@145 | 699 | |
paulb@145 | 700 | # Initialise the communications queue with a limit on the number of |
paulb@145 | 701 | # channels/processes. |
paulb@145 | 702 | |
paulb@145 | 703 | queue = pprocess.Queue(limit=limit) |
paulb@145 | 704 | |
paulb@145 | 705 | # Initialise an array. |
paulb@145 | 706 | |
paulb@145 | 707 | results = [0] * N * N |
paulb@145 | 708 | |
paulb@145 | 709 | # Wrap the calculate function and manage it. |
paulb@145 | 710 | |
paulb@145 | 711 | calc = queue.manage(pprocess.MakeParallel(calculate)) |
paulb@145 | 712 | |
paulb@145 | 713 | # Perform the work. |
paulb@145 | 714 | |
paulb@145 | 715 | print "Calculating..." |
paulb@145 | 716 | for i in range(0, N): |
paulb@145 | 717 | for j in range(0, N): |
paulb@145 | 718 | calc(i, j) |
paulb@145 | 719 | |
paulb@145 | 720 | # Store the results as they arrive. |
paulb@145 | 721 | |
paulb@145 | 722 | print "Finishing..." |
paulb@145 | 723 | for i, j, result in queue: |
paulb@145 | 724 | results[i*N+j] = result |
paulb@145 | 725 | |
paulb@145 | 726 | <strong>return results</strong> |
paulb@145 | 727 | </pre> |
paulb@145 | 728 | |
paulb@145 | 729 | <p>Here, we have converted the main program into a function, and instead of |
paulb@145 | 730 | printing out the results, we return the results list from the function.</p> |
paulb@145 | 731 | |
paulb@145 | 732 | <p>Now, let us consider the new main program (with the relevant mechanisms |
paulb@145 | 733 | highlighted):</p> |
paulb@145 | 734 | |
paulb@145 | 735 | <pre> |
paulb@145 | 736 | t = time.time() |
paulb@145 | 737 | |
paulb@145 | 738 | if "--reconnect" not in sys.argv: |
paulb@145 | 739 | |
paulb@145 | 740 | # Wrap the computation and manage it. |
paulb@145 | 741 | |
paulb@145 | 742 | <strong>ptask = pprocess.BackgroundCallable("task.socket", pprocess.MakeParallel(task))</strong> |
paulb@145 | 743 | |
paulb@145 | 744 | # Perform the work. |
paulb@145 | 745 | |
paulb@145 | 746 | ptask() |
paulb@145 | 747 | |
paulb@145 | 748 | # Discard the callable. |
paulb@145 | 749 | |
paulb@145 | 750 | del ptask |
paulb@145 | 751 | print "Discarded the callable." |
paulb@145 | 752 | |
paulb@145 | 753 | if "--start" not in sys.argv: |
paulb@145 | 754 | |
paulb@145 | 755 | # Open a queue and reconnect to the task. |
paulb@145 | 756 | |
paulb@145 | 757 | print "Opening a queue." |
paulb@145 | 758 | <strong>queue = pprocess.BackgroundQueue("task.socket")</strong> |
paulb@145 | 759 | |
paulb@145 | 760 | # Wait for the results. |
paulb@145 | 761 | |
paulb@145 | 762 | print "Waiting for persistent results" |
paulb@145 | 763 | for results in queue: |
paulb@145 | 764 | pass # should only be one element |
paulb@145 | 765 | |
paulb@145 | 766 | # Show the results. |
paulb@145 | 767 | |
paulb@145 | 768 | for i in range(0, N): |
paulb@145 | 769 | for result in results[i*N:i*N+N]: |
paulb@145 | 770 | print result, |
paulb@145 | 771 | print |
paulb@145 | 772 | |
paulb@145 | 773 | print "Time taken:", time.time() - t |
paulb@145 | 774 | </pre> |
paulb@145 | 775 | |
paulb@145 | 776 | <p>(This code in context with <code>import</code> statements and functions is |
paulb@145 | 777 | found in the <code>examples/simple_background_queue.py</code> file.)</p> |
paulb@145 | 778 | |
paulb@145 | 779 | <p>This new main program has two parts: the part which initiates the |
paulb@145 | 780 | computation, and the part which connects to the computation in order to collect |
paulb@145 | 781 | the results. Both parts can be run in the same process, and this should result |
paulb@145 | 782 | in similar behaviour to that of the original |
paulb@145 | 783 | <a href="#simple_managed_queue">simple_managed_queue</a> program.</p> |
paulb@145 | 784 | |
paulb@145 | 785 | <p>In the above program, however, we are free to specify <code>--start</code> as |
paulb@145 | 786 | an option when running the program, and the result of this is merely to initiate |
paulb@145 | 787 | the computation in a background process, using <code>BackgroundCallable</code> |
paulb@145 | 788 | to obtain a callable which, when invoked, creates the background process and |
paulb@145 | 789 | runs the computation. After doing this, the program will then exit, but it will |
paulb@145 | 790 | leave the computation running as a collection of background processes, and a |
paulb@145 | 791 | special file called <code>task.socket</code> will exist in the current working |
paulb@145 | 792 | directory.</p> |
paulb@145 | 793 | |
paulb@145 | 794 | <p>When the above program is run using the <code>--reconnect</code> option, an |
paulb@145 | 795 | attempt will be made to reconnect to the background processes already created by |
paulb@145 | 796 | attempting to contact them using the previously created <code>task.socket</code> |
paulb@145 | 797 | special file (which is, in fact, a UNIX-domain socket); this being done using |
paulb@145 | 798 | the <code>BackgroundQueue</code> function which will handle the incoming results |
paulb@145 | 799 | in a fashion similar to that of a <code>Queue</code> object. Since only one |
paulb@145 | 800 | result is returned by the computation (as defined by the <code>return</code> |
paulb@145 | 801 | statement in the <code>task</code> function), we need only expect one element to |
paulb@145 | 802 | be collected by the queue: a list containing all of the results produced in the |
paulb@145 | 803 | computation.</p> |
paulb@145 | 804 | |
paulb@145 | 805 | <h2 id="ManagingBackgroundProcesses">Managing Several Background Processes</h2> |
paulb@145 | 806 | |
paulb@145 | 807 | <p>In the above example, a single background process was used to manage a number |
paulb@145 | 808 | of other processes, with all of them running in the background. However, it can |
paulb@145 | 809 | be desirable to manage more than one background process.</p> |
paulb@145 | 810 | |
paulb@145 | 811 | <p>Consider this excerpt from a modified version of the <a |
paulb@145 | 812 | href="#simple_managed_queue">simple_managed_queue</a> program:</p> |
paulb@145 | 813 | |
paulb@145 | 814 | <pre> |
paulb@145 | 815 | <strong>def task(i):</strong> |
paulb@145 | 816 | |
paulb@145 | 817 | # Initialise the communications queue with a limit on the number of |
paulb@145 | 818 | # channels/processes. |
paulb@145 | 819 | |
paulb@145 | 820 | queue = pprocess.Queue(limit=limit) |
paulb@145 | 821 | |
paulb@145 | 822 | # Initialise an array. |
paulb@145 | 823 | |
paulb@145 | 824 | results = [0] * N |
paulb@145 | 825 | |
paulb@145 | 826 | # Wrap the calculate function and manage it. |
paulb@145 | 827 | |
paulb@145 | 828 | calc = queue.manage(pprocess.MakeParallel(calculate)) |
paulb@145 | 829 | |
paulb@145 | 830 | # Perform the work. |
paulb@145 | 831 | |
paulb@145 | 832 | print "Calculating..." |
paulb@145 | 833 | <strong>for j in range(0, N):</strong> |
paulb@145 | 834 | <strong>calc(i, j)</strong> |
paulb@145 | 835 | |
paulb@145 | 836 | # Store the results as they arrive. |
paulb@145 | 837 | |
paulb@145 | 838 | print "Finishing..." |
paulb@145 | 839 | <strong>for i, j, result in queue:</strong> |
paulb@145 | 840 | <strong>results[j] = result</strong> |
paulb@145 | 841 | |
paulb@145 | 842 | <strong>return i, results</strong> |
paulb@145 | 843 | </pre> |
paulb@145 | 844 | |
paulb@145 | 845 | <p>Just as we see in the previous example, a function called <code>task</code> |
paulb@145 | 846 | has been defined to hold a background computation, and this function returns a |
paulb@145 | 847 | portion of the results. However, unlike the previous example or the original |
paulb@145 | 848 | example, the scope of the results of the computation collected in the function |
paulb@145 | 849 | have been changed: here, only results for calculations involving a certain value |
paulb@145 | 850 | of <code>i</code> are collected, with the particular value of <code>i</code> |
paulb@145 | 851 | returned along with the appropriate portion of the results.</p> |
paulb@145 | 852 | |
paulb@145 | 853 | <p>Now, let us consider the new main program (with the relevant mechanisms |
paulb@145 | 854 | highlighted):</p> |
paulb@145 | 855 | |
paulb@145 | 856 | <pre> |
paulb@145 | 857 | t = time.time() |
paulb@145 | 858 | |
paulb@145 | 859 | if "--reconnect" not in sys.argv: |
paulb@145 | 860 | |
paulb@145 | 861 | # Wrap the computation and manage it. |
paulb@145 | 862 | |
paulb@145 | 863 | <strong>ptask = pprocess.MakeParallel(task)</strong> |
paulb@145 | 864 | |
paulb@145 | 865 | <strong>for i in range(0, N):</strong> |
paulb@145 | 866 | |
paulb@145 | 867 | # Make a distinct callable for each part of the computation. |
paulb@145 | 868 | |
paulb@145 | 869 | <strong>ptask_i = pprocess.BackgroundCallable("task-%d.socket" % i, ptask)</strong> |
paulb@145 | 870 | |
paulb@145 | 871 | # Perform the work. |
paulb@145 | 872 | |
paulb@145 | 873 | <strong>ptask_i(i)</strong> |
paulb@145 | 874 | |
paulb@145 | 875 | # Discard the callable. |
paulb@145 | 876 | |
paulb@145 | 877 | del ptask |
paulb@145 | 878 | print "Discarded the callable." |
paulb@145 | 879 | |
paulb@145 | 880 | if "--start" not in sys.argv: |
paulb@145 | 881 | |
paulb@145 | 882 | # Open a queue and reconnect to the task. |
paulb@145 | 883 | |
paulb@145 | 884 | print "Opening a queue." |
paulb@145 | 885 | <strong>queue = pprocess.PersistentQueue()</strong> |
paulb@145 | 886 | <strong>for i in range(0, N):</strong> |
paulb@145 | 887 | <strong>queue.connect("task-%d.socket" % i)</strong> |
paulb@145 | 888 | |
paulb@145 | 889 | # Initialise an array. |
paulb@145 | 890 | |
paulb@145 | 891 | <strong>results = [0] * N</strong> |
paulb@145 | 892 | |
paulb@145 | 893 | # Wait for the results. |
paulb@145 | 894 | |
paulb@145 | 895 | print "Waiting for persistent results" |
paulb@145 | 896 | <strong>for i, result in queue:</strong> |
paulb@145 | 897 | <strong>results[i] = result</strong> |
paulb@145 | 898 | |
paulb@145 | 899 | # Show the results. |
paulb@145 | 900 | |
paulb@145 | 901 | for i in range(0, N): |
paulb@145 | 902 | <strong>for result in results[i]:</strong> |
paulb@145 | 903 | print result, |
paulb@145 | 904 | print |
paulb@145 | 905 | |
paulb@145 | 906 | print "Time taken:", time.time() - t |
paulb@145 | 907 | </pre> |
paulb@145 | 908 | |
paulb@145 | 909 | <p>(This code in context with <code>import</code> statements and functions is |
paulb@145 | 910 | found in the <code>examples/simple_persistent_queue.py</code> file.)</p> |
paulb@145 | 911 | |
paulb@145 | 912 | <p>In the first section, the process of making a parallel-aware callable is as |
paulb@145 | 913 | expected, but instead of then invoking a single background version of such a |
paulb@145 | 914 | callable, as in the previous example, we create a version for each value of |
paulb@145 | 915 | <code>i</code> (using <code>BackgroundCallable</code>) and then invoke each one. |
paulb@145 | 916 | The result of this is a total of <code>N</code> background processes, each |
paulb@145 | 917 | running an invocation of the <code>task</code> function with a distinct value of |
paulb@145 | 918 | <code>i</code> (which in turn perform computations), and each employing a |
paulb@145 | 919 | UNIX-domain socket for communication with a name of the form |
paulb@145 | 920 | <code>task-<em>i</em>.socket</code>.</p> |
paulb@145 | 921 | |
paulb@145 | 922 | <p>In the second section, since we now have more than one background process, we |
paulb@145 | 923 | must find a way to monitor them after reconnecting to them; to achieve this, a |
paulb@145 | 924 | <code>PersistentQueue</code> is created, which acts like a regular |
paulb@145 | 925 | <code>Queue</code> object but is instead focused on handling persistent |
paulb@145 | 926 | communications. Upon connecting the queue to each of the previously created |
paulb@145 | 927 | UNIX-domain sockets, the queue acts like a regular <code>Queue</code> and |
paulb@145 | 928 | exposes received results through an iterator. Here, the principal difference |
paulb@145 | 929 | from previous examples is the structure of results: instead of collecting each |
paulb@145 | 930 | individual value in a flat <code>i</code> by <code>j</code> array, a list is |
paulb@145 | 931 | returned for each value of <code>i</code> and is stored directly in another |
paulb@145 | 932 | list.</p> |
paulb@145 | 933 | |
paulb@145 | 934 | <h3>Applications of Background Computations</h3> |
paulb@145 | 935 | |
paulb@145 | 936 | <p>Background computations are useful because they provide flexibility in the |
paulb@145 | 937 | way the results can be collected. One area in which they can be useful is Web |
paulb@145 | 938 | programming, where a process handling an incoming HTTP request may need to |
paulb@145 | 939 | initiate a computation but then immediately send output to the Web client - such |
paulb@145 | 940 | as a page indicating that the computation is "in progress" - without having to |
paulb@145 | 941 | wait for the computation or to allocate resources to monitor it. Moreover, in |
paulb@145 | 942 | some Web architectures, notably those employing the Common Gateway Interface |
paulb@145 | 943 | (CGI), it is necessary for a process handling an incoming request to terminate |
paulb@145 | 944 | before its output will be sent to clients. By using a |
paulb@145 | 945 | <code>BackgroundCallable</code>, a Web server process can initiate a |
paulb@145 | 946 | communication, and then subsequent server processes can be used to reconnect to |
paulb@145 | 947 | the background computation and to wait efficiently for results.</p> |
paulb@145 | 948 | |
paulb@145 | 949 | <h2 id="Summary">Summary</h2> |
paulb@124 | 950 | |
paulb@124 | 951 | <p>The following table indicates the features used in converting one |
paulb@124 | 952 | sequential example program to another parallel program:</p> |
paulb@124 | 953 | |
paulb@124 | 954 | <table border="1" cellspacing="0" cellpadding="5"> |
paulb@124 | 955 | <thead> |
paulb@124 | 956 | <tr> |
paulb@124 | 957 | <th>Sequential Example</th> |
paulb@124 | 958 | <th>Parallel Example</th> |
paulb@124 | 959 | <th>Features Used</th> |
paulb@124 | 960 | </tr> |
paulb@124 | 961 | </thead> |
paulb@124 | 962 | <tbody> |
paulb@124 | 963 | <tr> |
paulb@124 | 964 | <td>simple_map</td> |
paulb@124 | 965 | <td>simple_pmap</td> |
paulb@124 | 966 | <td>pmap</td> |
paulb@124 | 967 | </tr> |
paulb@124 | 968 | <tr> |
paulb@124 | 969 | <td>simple1</td> |
paulb@124 | 970 | <td>simple_managed_map</td> |
paulb@124 | 971 | <td>MakeParallel, Map, manage</td> |
paulb@124 | 972 | </tr> |
paulb@124 | 973 | <tr> |
paul@159 | 974 | <td rowspan="6">simple2</td> |
paulb@124 | 975 | <td>simple_managed_queue</td> |
paulb@124 | 976 | <td>MakeParallel, Queue, manage</td> |
paulb@124 | 977 | </tr> |
paulb@124 | 978 | <tr> |
paul@159 | 979 | <td>simple_continuous_queue</td> |
paul@159 | 980 | <td>Queue, manage (continuous)</td> |
paul@159 | 981 | </tr> |
paul@159 | 982 | <tr> |
paulb@124 | 983 | <td>simple_managed</td> |
paulb@124 | 984 | <td>MakeParallel, Exchange (subclass), manage, finish</td> |
paulb@124 | 985 | </tr> |
paulb@124 | 986 | <tr> |
paulb@124 | 987 | <td>simple_start</td> |
paulb@124 | 988 | <td>Channel, Exchange (subclass), start, finish</td> |
paulb@124 | 989 | </tr> |
paulb@124 | 990 | <tr> |
paulb@145 | 991 | <td>simple_background_queue</td> |
paulb@145 | 992 | <td>MakeParallel, BackgroundCallable, BackgroundQueue</td> |
paulb@145 | 993 | </tr> |
paulb@145 | 994 | <tr> |
paulb@145 | 995 | <td>simple_persistent_queue</td> |
paulb@145 | 996 | <td>MakeParallel, BackgroundCallable, PersistentQueue</td> |
paulb@145 | 997 | </tr> |
paulb@145 | 998 | <tr> |
paulb@124 | 999 | <td>simple</td> |
paulb@124 | 1000 | <td>simple_create_map</td> |
paulb@124 | 1001 | <td>Channel, Map, create, exit</td> |
paulb@124 | 1002 | </tr> |
paulb@124 | 1003 | </tbody> |
paulb@124 | 1004 | </table> |
paulb@124 | 1005 | |
paulb@124 | 1006 | </body> |
paulb@124 | 1007 | </html> |