<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" lang="en-gb">
<head>
<meta http-equiv="content-type" content="text/html; charset=UTF-8" />
<title>pprocess - Tutorial</title>
<link href="styles.css" rel="stylesheet" type="text/css" />
</head>
<body>

<h1>pprocess - Tutorial</h1>

<p>The <code>pprocess</code> module provides several mechanisms for running
Python code concurrently in several processes. The most straightforward way of
making a program parallel-aware - that is, where the program can take
advantage of more than one processor to simultaneously process data - is to
use the <code>pmap</code> function.</p>

<ul>
<li><a href="#note">A Note on Parallel Processes</a></li>
<li><a href="#pmap">Converting Map-Style Code</a></li>
<li><a href="#Map">Converting Invocations to Parallel Operations</a></li>
<li><a href="#Queue">Converting Arbitrarily-Ordered Invocations</a>
  <ul>
  <li><a href="#Exchange">Replacing Queues with Exchanges</a></li>
  <li><a href="#channel">Using Channels in Callables</a></li>
  </ul>
</li>
<li><a href="#create">Converting Inline Computations</a></li>
<li><a href="#MakeReusable">Reusing Processes in Parallel Programs</a></li>
<li><a href="#continuous">Supporting Continuous Processes in Parallel Programs</a></li>
<li><a href="#BackgroundCallable">Performing Computations in Background Processes</a></li>
<li><a href="#ManagingBackgroundProcesses">Managing Several Background Processes</a></li>
<li><a href="#Summary">Summary</a></li>
</ul>

<p>For a brief summary of each of the features of <code>pprocess</code>, see
the <a href="reference.html">reference document</a>.</p>

<h2 id="note">A Note on Parallel Processes</h2>

<p>The way <code>pprocess</code> uses multiple processes to perform work in
parallel involves the <code>fork</code> system call, which on modern operating
systems involves what is known as "copy-on-write" semantics. In plain language,
when <code>pprocess</code> creates a new <em>child</em> process to perform work
in parallel with other work that needs to be done, this new process will be a
near-identical copy of the original <em>parent</em> process, and the running
code will be able to access data resident in that parent process.</p>

<p>However, when a child process modifies data, instead of changing that data
in such a way that the parent process can see the modifications, the parent
process will, in fact, remain oblivious to such changes. What happens is that
as soon as the child process attempts to modify the data, it obtains its own
separate copy which is then modified independently of the original data. Thus,
a <em>copy</em> of any data is made when an attempt is made to <em>write</em>
to such data. Meanwhile, the parent's copy of that data will be left untouched
by the activities of the child.</p>

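<p>This behaviour can be demonstrated directly with the underlying system
call. The following sketch uses <code>os.fork</code> from the standard
library rather than <code>pprocess</code> itself, and is for illustration
only (it assumes a POSIX platform): the child mutates its own copy of a list
while the parent's copy stays intact.</p>

```python
import os

def fork_and_mutate():
    "Fork a child which appends to a list; the parent's copy is unaffected."
    data = [1, 2]
    pid = os.fork()
    if pid == 0:
        # Child process: this append only touches the child's
        # copy-on-write pages, not the parent's memory.
        data.append(3)
        os._exit(0 if data == [1, 2, 3] else 1)
    # Parent process: wait for the child, then inspect the original list.
    _, status = os.waitpid(pid, 0)
    assert os.WEXITSTATUS(status) == 0  # the child did see its own change
    return data

data = fork_and_mutate()  # data is still [1, 2] in the parent
```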
<p>It is therefore essential to note that any data distributed to other
processes, and which will then be modified by those processes, will not appear
to change in the parent process even if the objects employed are mutable. This
is rather different to the behaviour of a normal Python program: a function
which mutates a list passed to it, for example, changes that list in such a
way that the modifications will still be present upon returning from that
function. For example:</p>

<pre>
def mutator(l):
    l.append(3)

l = [1, 2]
mutator(l) # l is now [1, 2, 3]
</pre>

<p>In contrast, passing a list to a child process will cause the list to
mutate in the child process, but the parent process will not see the list
change. For example:</p>

<pre>
def mutator(l):
    l.append(3)

results = pprocess.Map()
mutator = results.manage(pprocess.MakeParallel(mutator))

l = [1, 2]
mutator(l) # l is now [1, 2]
</pre>

<p>To communicate changes to data between processes, the modified objects must
be explicitly returned from child processes using the mechanisms described in
this documentation. For example:</p>

<pre>
def mutator(l):
    l.append(3)
    return l # the modified object is explicitly returned

results = pprocess.Map()
mutator = results.manage(pprocess.MakeParallel(mutator))

l = [1, 2]
mutator(l)

all_l = results[:] # there are potentially many results, not just one
l = all_l[0] # l is now [1, 2, 3], taken from the first result
</pre>

<p>It is perhaps easiest to think of the communications mechanisms as
providing a gateway between processes through which information can be passed,
with the rest of a program's data being private and hidden from the other
processes (even if that data initially resembles what the other processes also
see within themselves).</p>

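<p>As an illustration of such a gateway, the following sketch uses a plain
pipe and <code>os.fork</code> from the standard library (not
<code>pprocess</code> itself) to pass a pickled result from a child back to
its parent; this is essentially the kind of work the communications
mechanisms perform on the program's behalf:</p>

```python
import os
import pickle

def compute_in_child(func, arg):
    "Run func(arg) in a forked child, returning the result through a pipe."
    r, w = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: the write end of the pipe is the only gateway back
        # to the parent; everything else remains private to the child.
        os.close(r)
        os.write(w, pickle.dumps(func(arg)))
        os.close(w)
        os._exit(0)
    # Parent: read the pickled result sent through the gateway.
    os.close(w)
    chunks = []
    while True:
        chunk = os.read(r, 4096)
        if not chunk:
            break
        chunks.append(chunk)
    os.close(r)
    os.waitpid(pid, 0)
    return pickle.loads(b"".join(chunks))

doubled = compute_in_child(lambda xs: [x * 2 for x in xs], [1, 2, 3])
```

Note that only the pickled result crosses the pipe; the function itself is
inherited by the child through the fork.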
<h2 id="pmap">Converting Map-Style Code</h2>

<p>Consider a program using the built-in <code>map</code> function and a sequence of inputs:</p>

<pre>
t = time.time()

# Initialise an array.

sequence = []
for i in range(0, N):
    for j in range(0, N):
        sequence.append((i, j))

# Perform the work.

results = map(calculate, sequence)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_map.py</code> file.)</p>

<p>The principal features of this program involve the preparation of an array
for input purposes, and the use of the <code>map</code> function to iterate
over the combinations of <code>i</code> and <code>j</code> in the array. Even
if the <code>calculate</code> function could be invoked independently for each
input value, we have to wait for each computation to complete before
initiating a new one. The <code>calculate</code> function may be defined as
follows:</p>

<pre>
def calculate(t):

    "A supposedly time-consuming calculation on 't'."

    i, j = t
    time.sleep(delay)
    return i * N + j
</pre>

<p>In order to reduce the processing time - to speed the code up, in other
words - we can make this code use several processes instead of just one. Here
is the modified code:</p>

<pre>
t = time.time()

# Initialise an array.

sequence = []
for i in range(0, N):
    for j in range(0, N):
        sequence.append((i, j))

# Perform the work.

results = <strong>pprocess.pmap</strong>(calculate, sequence<strong>, limit=limit</strong>)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_pmap.py</code> file.)</p>

<p>By replacing usage of the <code>map</code> function with the
<code>pprocess.pmap</code> function, and specifying the limit on the number of
processes to be active at any given time (the value of the <code>limit</code>
variable is defined elsewhere), several calculations can now be performed in
parallel.</p>

<h2 id="Map">Converting Invocations to Parallel Operations</h2>

<p>Although some programs make natural use of the <code>map</code> function,
others may employ an invocation in a nested loop. This may also be converted
to a parallel program. Consider the following Python code:</p>

<pre>
t = time.time()

# Initialise an array.

results = []

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        results.append(calculate(i, j))

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple1.py</code> file.)</p>

<p>Here, a computation in the <code>calculate</code> function is performed for
each combination of <code>i</code> and <code>j</code> in the nested loop,
returning a result value. However, we must wait for the completion of this
function for each element before moving on to the next element, and this means
that the computations are performed sequentially. Consequently, on a system
with more than one processor, even if we could call <code>calculate</code> for
more than one combination of <code>i</code> and <code>j</code>
and have the computations executing at the same time, the above program will
not take advantage of such capabilities.</p>

<p>We use a slightly modified version of <code>calculate</code> which employs
two parameters instead of one:</p>

<pre>
def calculate(i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j'.
    """

    time.sleep(delay)
    return i * N + j
</pre>

<p>In order to reduce the processing time - to speed the code up, in other
words - we can make this code use several processes instead of just one. Here
is the modified code:</p>

<pre id="simple_managed_map">
t = time.time()

# Initialise the results using a map with a limit on the number of
# channels/processes.

<strong>results = pprocess.Map(limit=limit)</strong>

# Wrap the calculate function and manage it.

<strong>calc = results.manage(pprocess.MakeParallel(calculate))</strong>

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>calc</strong>(i, j)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed_map.py</code> file.)</p>

<p>The principal changes in the above code involve the use of a
<code>pprocess.Map</code> object to collect the results, and a version of the
<code>calculate</code> function which is managed by the <code>Map</code>
object. What the <code>Map</code> object does is to arrange the results of
computations such that iterating over the object or accessing the object using
list operations provides the results in the same order as their corresponding
inputs.</p>

<h2 id="Queue">Converting Arbitrarily-Ordered Invocations</h2>

<p>In some programs, it is not important to receive the results of
computations in any particular order, usually because either the order of
these results is irrelevant, or because the results provide "positional"
information which lets them be handled in an appropriate way. Consider the
following Python code:</p>

<pre>
t = time.time()

# Initialise an array.

results = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        i2, j2, result = calculate(i, j)
        results[i2*N+j2] = result

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple2.py</code> file.)</p>

<p>Here, a result array is initialised first and each computation is performed
sequentially. A significant difference to the previous examples is the return
value of the <code>calculate</code> function: the position details
corresponding to <code>i</code> and <code>j</code> are returned alongside the
result. Obviously, this is of limited value in the above code because the
order of the computations and the reception of results is fixed. Moreover, we
get no benefit from parallelisation in the above example.</p>

<p>We can bring the benefits of parallel processing to the above program with
the following code:</p>

<pre id="simple_managed_queue">
t = time.time()

# Initialise the communications queue with a limit on the number of
# channels/processes.

<strong>queue = pprocess.Queue(limit=limit)</strong>

# Initialise an array.

results = [0] * N * N

# Wrap the calculate function and manage it.

<strong>calc = queue.manage(pprocess.MakeParallel(calculate))</strong>

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>calc(i, j)</strong>

# Store the results as they arrive.

print "Finishing..."
<strong>for i, j, result in queue:</strong>
    <strong>results[i*N+j] = result</strong>

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed_queue.py</code> file.)</p>

<p>This revised code employs a <code>pprocess.Queue</code> object whose
purpose is to collect the results of computations and to make them available
in the order in which they were received. The code collecting results has been
moved into a separate loop independent of the original computation loop and
taking advantage of the more relevant "positional" information emerging from
the queue.</p>

<h3 id="Exchange">Replacing Queues with Exchanges</h3>

<p>We can take this example further, illustrating some of the mechanisms
employed by <code>pprocess</code>. Instead of collecting results in a queue,
we can define a class containing a method which is called when new results
arrive:</p>

<pre>
class MyExchange(pprocess.Exchange):

    "Parallel convenience class containing the array assignment operation."

    def store_data(self, ch):
        i, j, result = ch.receive()
        self.D[i*N+j] = result
</pre>

<p>This code exposes the channel paradigm which is used throughout
<code>pprocess</code> and is available to applications, if desired. The effect
of the method is the storage of a result received through the channel in an
attribute of the object. The following code shows how this class can be used,
with differences to the previous program illustrated:</p>

<pre>
t = time.time()

# Initialise the communications exchange with a limit on the number of
# channels/processes.

<strong>exchange = MyExchange(limit=limit)</strong>

# Initialise an array - it is stored in the exchange to permit automatic
# assignment of values as the data arrives.

<strong>results = exchange.D = [0] * N * N</strong>

# Wrap the calculate function and manage it.

calc = <strong>exchange</strong>.manage(pprocess.MakeParallel(calculate))

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        calc(i, j)

# Wait for the results.

print "Finishing..."
<strong>exchange.finish()</strong>

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed.py</code> file.)</p>

<p>The main visible differences between this and the previous program are the
storage of the result array in the exchange, the removal of the queue
consumption code from the main program, placing the act of storing values in
the exchange's <code>store_data</code> method, and the need to call the
<code>finish</code> method on the <code>MyExchange</code> object so that we do
not try to access the results too soon. One underlying benefit not visible in
the above code is that we no longer need to accumulate results in a queue or
other structure so that they may be processed and assigned to the correct
positions in the result array.</p>

<h3 id="channel">Using Channels in Callables</h3>

<p>For the curious, we may remove some of the remaining conveniences of the
above program to expose other features of <code>pprocess</code>. First, we
define a slightly modified version of the <code>calculate</code> function:</p>

<pre>
def calculate(ch, i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j', using 'ch' to
    communicate with the parent process.
    """

    time.sleep(delay)
    ch.send((i, j, i * N + j))
</pre>

<p>This function accepts a channel, <code>ch</code>, through which results
will be sent, and through which other values could potentially be received,
although we choose not to do so here. The program using this function is as
follows, with differences to the previous program illustrated:</p>

<pre>
t = time.time()

# Initialise the communications exchange with a limit on the number of
# channels/processes.

exchange = MyExchange(limit=limit)

# Initialise an array - it is stored in the exchange to permit automatic
# assignment of values as the data arrives.

results = exchange.D = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>exchange.start(calculate, i, j)</strong>

# Wait for the results.

print "Finishing..."
exchange.finish()

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_start.py</code> file.)</p>

<p>Here, we have discarded two conveniences: the wrapping of callables using
<code>MakeParallel</code>, which lets us use functions without providing any
channel parameters, and the management of callables using the
<code>manage</code> method on queues, exchanges, and so on. The
<code>start</code> method still calls the provided callable, but using a
different notation from that employed previously.</p>

paulb@145 | 536 | <h2 id="create">Converting Inline Computations</h2> |
paulb@124 | 537 | |
paulb@124 | 538 | <p>Although many programs employ functions and other useful abstractions which |
paulb@124 | 539 | can be treated as parallelisable units, some programs perform computations |
paulb@124 | 540 | "inline", meaning that the code responsible appears directly within a loop or |
paulb@124 | 541 | related control-flow construct. Consider the following code:</p> |
paulb@124 | 542 | |
paulb@124 | 543 | <pre> |
paulb@124 | 544 | t = time.time() |
paulb@124 | 545 | |
paulb@124 | 546 | # Initialise an array. |
paulb@124 | 547 | |
paulb@124 | 548 | results = [0] * N * N |
paulb@124 | 549 | |
paulb@124 | 550 | # Perform the work. |
paulb@124 | 551 | |
paulb@124 | 552 | print "Calculating..." |
paulb@124 | 553 | for i in range(0, N): |
paulb@124 | 554 | for j in range(0, N): |
paulb@124 | 555 | time.sleep(delay) |
paulb@124 | 556 | results[i*N+j] = i * N + j |
paulb@124 | 557 | |
paulb@124 | 558 | # Show the results. |
paulb@124 | 559 | |
paulb@124 | 560 | for i in range(0, N): |
paulb@124 | 561 | for result in results[i*N:i*N+N]: |
paulb@124 | 562 | print result, |
paulb@124 | 563 | print |
paulb@124 | 564 | |
paulb@124 | 565 | print "Time taken:", time.time() - t |
paulb@124 | 566 | </pre> |
paulb@124 | 567 | |
paulb@124 | 568 | <p>(This code in context with <code>import</code> statements and functions is |
paulb@124 | 569 | found in the <code>examples/simple.py</code> file.)</p> |
paulb@124 | 570 | |
paulb@124 | 571 | <p>To simulate "work", as in the different versions of the |
paulb@124 | 572 | <code>calculate</code> function, we use the <code>time.sleep</code> function |
paulb@124 | 573 | (which does not actually do work, and which will cause a process to be |
paulb@124 | 574 | descheduled in most cases, but which simulates the delay associated with work |
paulb@124 | 575 | being done). This inline work, which must be performed sequentially in the |
paulb@124 | 576 | above program, can be performed in parallel in a somewhat modified version of |
paulb@124 | 577 | the program:</p> |
paulb@124 | 578 | |
paulb@124 | 579 | <pre> |
paulb@124 | 580 | t = time.time() |
paulb@124 | 581 | |
paulb@124 | 582 | # Initialise the results using a map with a limit on the number of |
paulb@124 | 583 | # channels/processes. |
paulb@124 | 584 | |
paulb@124 | 585 | <strong>results = pprocess.Map(limit=limit)</strong> |
paulb@124 | 586 | |
paulb@124 | 587 | # Perform the work. |
paulb@124 | 588 | # NOTE: Could use the with statement in the loop to package the |
paulb@124 | 589 | # NOTE: try...finally functionality. |
paulb@124 | 590 | |
paulb@124 | 591 | print "Calculating..." |
paulb@124 | 592 | for i in range(0, N): |
paulb@124 | 593 | for j in range(0, N): |
paulb@124 | 594 | <strong>ch = results.create()</strong> |
paulb@124 | 595 | <strong>if ch:</strong> |
paulb@124 | 596 | <strong>try: # Calculation work.</strong> |
paulb@124 | 597 | |
paulb@124 | 598 | time.sleep(delay) |
paulb@124 | 599 | <strong>ch.send(i * N + j)</strong> |
paulb@124 | 600 | |
paulb@124 | 601 | <strong>finally: # Important finalisation.</strong> |
paulb@124 | 602 | |
paulb@124 | 603 | <strong>pprocess.exit(ch)</strong> |
paulb@124 | 604 | |
paulb@124 | 605 | # Show the results. |
paulb@124 | 606 | |
paulb@124 | 607 | for i in range(0, N): |
paulb@124 | 608 | for result in results[i*N:i*N+N]: |
paulb@124 | 609 | print result, |
paulb@124 | 610 | print |
paulb@124 | 611 | |
paulb@124 | 612 | print "Time taken:", time.time() - t |
paulb@124 | 613 | </pre> |
paulb@124 | 614 | |
paulb@124 | 615 | <p>(This code in context with <code>import</code> statements and functions is |
paulb@124 | 616 | found in the <code>examples/simple_create_map.py</code> file.)</p> |
paulb@124 | 617 | |
<p>Although seemingly more complicated, the bulk of the changes in this
modified program concern obtaining a channel object, <code>ch</code>, at the
point where the computations are performed, and wrapping the computation code
in a <code>try</code>...<code>finally</code> statement which ensures that the
process associated with the channel exits when the computation is complete.
To collect the results of these computations, a <code>pprocess.Map</code>
object is used, since it maintains the results in the same order as the
initiation of the computations which produced them.</p>

<h2 id="MakeReusable">Reusing Processes in Parallel Programs</h2>

<p>One notable aspect of the above programs, when parallelised, is that each
invocation of a computation creates a new process in which that computation is
performed, regardless of whether existing processes have just finished
producing results and could have been asked to perform new computations. In
other words, processes are created and destroyed instead of being reused.</p>

<p>However, we can request that processes be reused for computations by
enabling the <code>reuse</code> feature of exchange-like objects and employing
suitable reusable callables. Consider this modified version of the <a
href="#simple_managed_map">simple_managed_map</a> program:</p>

<pre>
t = time.time()

# Initialise the results using a map with a limit on the number of
# channels/processes.

results = pprocess.Map(limit=limit<strong>, reuse=1</strong>)

# Wrap the calculate function and manage it.

calc = results.manage(pprocess.Make<strong>Reusable</strong>(calculate))

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        calc(i, j)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_manage_map_reusable.py</code> file.)</p>

<p>By indicating that processes and channels shall be reused, and by wrapping
the <code>calculate</code> function with the necessary support, the
computations may be performed in parallel using a pool of processes instead of
creating a new process for each computation and then discarding it, only to
create a new process for the next computation.</p>
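
<p>The pooling effect just described can be sketched with the standard
library's <code>multiprocessing</code> module. This is a rough modern-Python
analogue of process reuse, not <code>pprocess</code>'s own API; the
<code>calculate</code> stand-in and the pool size are illustrative
assumptions:</p>

```python
from multiprocessing import Pool

def calculate(args):
    # Stand-in for the tutorial's time-consuming calculation on (i, j).
    i, j = args
    return i * 10 + j

if __name__ == "__main__":
    N = 3
    # Two worker processes are created once and then reused for all nine
    # calculations, instead of one process being created per calculation.
    with Pool(processes=2) as pool:
        results = pool.map(calculate,
                           [(i, j) for i in range(N) for j in range(N)])
    print(results)  # → [0, 1, 2, 10, 11, 12, 20, 21, 22]
```

<p>As with <code>pprocess.Map</code>, the results come back in the order in
which the calculations were initiated, even if the pool completes them out of
order.</p>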

<h2 id="continuous">Supporting Continuous Processes in Parallel Programs</h2>

<p>Although reusable processes offer the opportunity to invoke a callable over
and over within the same created process, they do not fully exploit the
potential of the underlying mechanisms in <code>pprocess</code>: created
processes can communicate multiple values to the creating process and can, in
principle, run within the same callable indefinitely.</p>

<p>Consider this modified form of the <code>calculate</code> function:</p>

<pre>
def calculate(ch, i):

    """
    A supposedly time-consuming calculation on 'i'.
    """

    for j in range(0, N):
        time.sleep(delay)
        ch.send((i, j, i * N + j))
</pre>

<p>This function accepts a channel <code>ch</code> together with an argument
<code>i</code> corresponding to an entire row of the input array, as opposed
to having two arguments (<code>i</code> and <code>j</code>) corresponding to a
single cell in the input array. In this function, a series of calculations are
performed and a number of values are returned through the channel, without the
function terminating until all values have been returned for the row data.</p>
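
<p>The shape of such a continuous callable can be sketched with the standard
library's <code>multiprocessing.Pipe</code>, which stands in for the channel
here; this is a stdlib analogue with illustrative values, not
<code>pprocess</code>'s channel API:</p>

```python
import time
from multiprocessing import Pipe, Process

N, delay = 3, 0.01

def calculate(ch, i):
    # Send one (i, j, value) tuple per cell of row i through the channel,
    # terminating only once the whole row has been produced.
    for j in range(N):
        time.sleep(delay)
        ch.send((i, j, i * N + j))
    ch.close()

if __name__ == "__main__":
    parent_end, child_end = Pipe()
    worker = Process(target=calculate, args=(child_end, 1))
    worker.start()
    child_end.close()  # close the parent's copy so recv() can see end-of-stream
    row = []
    try:
        while True:
            row.append(parent_end.recv())
    except EOFError:  # raised once the child has closed its end
        pass
    worker.join()
    print(row)  # → [(1, 0, 3), (1, 1, 4), (1, 2, 5)]
```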

<p>To use this modified function, a modified version of the
<a href="#simple_managed_queue">simple_managed_queue</a> program is used:</p>

<pre>
t = time.time()

# Initialise the communications queue with a limit on the number of
# channels/processes.

queue = pprocess.Queue(limit=limit<strong>, continuous=1</strong>)

# Initialise an array.

results = [0] * N * N

# Manage the calculate function.

calc = queue.manage(<strong>calculate</strong>)

# Perform the work.

print "Calculating..."
for i in range(0, N):
    <strong>calc(i)</strong>

# Store the results as they arrive.

print "Finishing..."
for i, j, result in queue:
    results[i*N+j] = result

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_continuous_queue.py</code> file.)</p>

<p>Although the inner loop in the work section has been relocated to the
<code>calculate</code> function, the queue still receives outputs from that
function with positional information and a result for the result array. Thus,
no change is needed for the retrieval of the results: they arrive in the queue
as before.</p>

<h2 id="BackgroundCallable">Performing Computations in Background Processes</h2>

<p>Occasionally, it is desirable not only to initiate time-consuming
computations and leave them running in the background, but also to detach the
creating process from them completely (potentially terminating it altogether),
and yet still be able to collect the results of the created processes at a
later time, potentially in a completely different process. For such
situations, we can make use of the <code>BackgroundCallable</code> class,
which converts a parallel-aware callable into a callable which will run in a
background process when invoked.</p>

<p>Consider this excerpt from a modified version of the <a
href="#simple_managed_queue">simple_managed_queue</a> program:</p>

<pre>
<strong>def task():</strong>

    # Initialise the communications queue with a limit on the number of
    # channels/processes.

    queue = pprocess.Queue(limit=limit)

    # Initialise an array.

    results = [0] * N * N

    # Wrap the calculate function and manage it.

    calc = queue.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            calc(i, j)

    # Store the results as they arrive.

    print "Finishing..."
    for i, j, result in queue:
        results[i*N+j] = result

    <strong>return results</strong>
</pre>

<p>Here, we have converted the main program into a function, and instead of
printing out the results, we return the results list from the function.</p>

<p>Now, let us consider the new main program (with the relevant mechanisms
highlighted):</p>

<pre>
t = time.time()

if "--reconnect" not in sys.argv:

    # Wrap the computation and manage it.

    <strong>ptask = pprocess.BackgroundCallable("task.socket", pprocess.MakeParallel(task))</strong>

    # Perform the work.

    ptask()

    # Discard the callable.

    del ptask
    print "Discarded the callable."

if "--start" not in sys.argv:

    # Open a queue and reconnect to the task.

    print "Opening a queue."
    <strong>queue = pprocess.BackgroundQueue("task.socket")</strong>

    # Wait for the results.

    print "Waiting for persistent results"
    for results in queue:
        pass # should only be one element

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_background_queue.py</code> file.)</p>

<p>This new main program has two parts: the part which initiates the
computation, and the part which connects to the computation in order to collect
the results. Both parts can be run in the same process, and this should result
in similar behaviour to that of the original
<a href="#simple_managed_queue">simple_managed_queue</a> program.</p>

<p>In the above program, however, we are free to specify <code>--start</code>
as an option when running the program. The effect of this is merely to
initiate the computation in a background process, using
<code>BackgroundCallable</code> to obtain a callable which, when invoked,
creates the background process and runs the computation. The program then
exits, leaving the computation running as a collection of background
processes, with a special file called <code>task.socket</code> remaining in
the current working directory.</p>

<p>When the above program is run using the <code>--reconnect</code> option, an
attempt is made to reconnect to the background processes already created,
contacting them through the previously created <code>task.socket</code>
special file (which is, in fact, a UNIX-domain socket). This is done using the
<code>BackgroundQueue</code> function, which handles the incoming results in a
fashion similar to that of a <code>Queue</code> object. Since only one result
is returned by the computation (as defined by the <code>return</code>
statement in the <code>task</code> function), we need only expect one element
to be collected by the queue: a list containing all of the results produced in
the computation.</p>
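
<p>The underlying mechanism, a background process serving its result over a
UNIX-domain socket to whichever process connects later, can be sketched with
the standard library. The socket path, the <code>task</code> stand-in and the
helper names here are illustrative assumptions, not <code>pprocess</code>'s
API:</p>

```python
import os
import pickle
import socket
import tempfile
import time
from multiprocessing import Process

def task(n):
    # Stand-in for the tutorial's time-consuming computation.
    return [i * i for i in range(n)]

def serve_result(sock_path, func, args):
    # Runs in the background process: compute, then serve the pickled
    # result to the first client that connects to the socket.
    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    server.bind(sock_path)
    server.listen(1)
    result = func(*args)
    conn, _ = server.accept()
    conn.sendall(pickle.dumps(result))
    conn.close()
    server.close()

def collect_result(sock_path):
    # Runs in the collecting process, which only needs the socket's name:
    # connect (retrying until the server has bound) and read the result.
    client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    while True:
        try:
            client.connect(sock_path)
            break
        except (FileNotFoundError, ConnectionRefusedError):
            time.sleep(0.05)
    chunks = []
    while True:
        data = client.recv(4096)
        if not data:
            break
        chunks.append(data)
    client.close()
    return pickle.loads(b"".join(chunks))

if __name__ == "__main__":
    sock_path = os.path.join(tempfile.mkdtemp(), "task.socket")
    worker = Process(target=serve_result, args=(sock_path, task, (4,)))
    worker.start()
    print(collect_result(sock_path))  # → [0, 1, 4, 9]
    worker.join()
```

<p>Since <code>collect_result</code> identifies the computation only by the
socket's filename, it could equally well run in a different process started
after the initiating one has exited.</p>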

<h2 id="ManagingBackgroundProcesses">Managing Several Background Processes</h2>

<p>In the above example, a single background process was used to manage a number
of other processes, with all of them running in the background. However, it can
be desirable to manage more than one background process.</p>

<p>Consider this excerpt from a modified version of the <a
href="#simple_managed_queue">simple_managed_queue</a> program:</p>

<pre>
<strong>def task(i):</strong>

    # Initialise the communications queue with a limit on the number of
    # channels/processes.

    queue = pprocess.Queue(limit=limit)

    # Initialise an array.

    results = [0] * N

    # Wrap the calculate function and manage it.

    calc = queue.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    <strong>for j in range(0, N):</strong>
        <strong>calc(i, j)</strong>

    # Store the results as they arrive.

    print "Finishing..."
    <strong>for i, j, result in queue:</strong>
        <strong>results[j] = result</strong>

    <strong>return i, results</strong>
</pre>

<p>Just as in the previous example, a function called <code>task</code> has
been defined to hold a background computation, and this function returns a
portion of the results. However, unlike the previous example or the original
example, the scope of the results collected in the function has been changed:
here, only results for calculations involving a certain value of
<code>i</code> are collected, with that value of <code>i</code> returned along
with the appropriate portion of the results.</p>

<p>Now, let us consider the new main program (with the relevant mechanisms
highlighted):</p>

<pre>
t = time.time()

if "--reconnect" not in sys.argv:

    # Wrap the computation and manage it.

    <strong>ptask = pprocess.MakeParallel(task)</strong>

    <strong>for i in range(0, N):</strong>

        # Make a distinct callable for each part of the computation.

        <strong>ptask_i = pprocess.BackgroundCallable("task-%d.socket" % i, ptask)</strong>

        # Perform the work.

        <strong>ptask_i(i)</strong>

    # Discard the callable.

    del ptask
    print "Discarded the callable."

if "--start" not in sys.argv:

    # Open a queue and reconnect to the task.

    print "Opening a queue."
    <strong>queue = pprocess.PersistentQueue()</strong>
    <strong>for i in range(0, N):</strong>
        <strong>queue.connect("task-%d.socket" % i)</strong>

    # Initialise an array.

    <strong>results = [0] * N</strong>

    # Wait for the results.

    print "Waiting for persistent results"
    <strong>for i, result in queue:</strong>
        <strong>results[i] = result</strong>

    # Show the results.

    for i in range(0, N):
        <strong>for result in results[i]:</strong>
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_persistent_queue.py</code> file.)</p>

<p>In the first section, the process of making a parallel-aware callable is as
expected, but instead of then invoking a single background version of such a
callable, as in the previous example, we create a version for each value of
<code>i</code> (using <code>BackgroundCallable</code>) and then invoke each one.
The result of this is a total of <code>N</code> background processes, each
running an invocation of the <code>task</code> function with a distinct value of
<code>i</code> (which in turn perform computations), and each employing a
UNIX-domain socket for communication with a name of the form
<code>task-<em>i</em>.socket</code>.</p>

<p>In the second section, since we now have more than one background process, we
must find a way to monitor them after reconnecting to them; to achieve this, a
<code>PersistentQueue</code> is created, which acts like a regular
<code>Queue</code> object but is instead focused on handling persistent
communications. Upon connecting the queue to each of the previously created
UNIX-domain sockets, the queue acts like a regular <code>Queue</code> and
exposes received results through an iterator. Here, the principal difference
from previous examples is the structure of results: instead of collecting each
individual value in a flat <code>i</code> by <code>j</code> array, a list is
returned for each value of <code>i</code> and is stored directly in another
list.</p>
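
<p>The one-socket-per-task arrangement can be sketched with the standard
library as follows. Unlike <code>PersistentQueue</code>, which multiplexes
over all of its connections, this illustrative sketch simply drains each
socket in turn; all names and values are assumptions rather than
<code>pprocess</code>'s API:</p>

```python
import os
import pickle
import socket
import tempfile
import time
from multiprocessing import Process

N = 3

def task(i):
    # Compute one row of results, tagged with its row number.
    return i, [i * N + j for j in range(N)]

def serve(sock_path, i):
    # Background process: serve the pickled (i, row) pair on its own socket.
    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    server.bind(sock_path)
    server.listen(1)
    conn, _ = server.accept()
    conn.sendall(pickle.dumps(task(i)))
    conn.close()
    server.close()

def connect(sock_path):
    # Collector: connect (retrying until the server has bound) and read
    # back one pickled result.
    client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    while True:
        try:
            client.connect(sock_path)
            break
        except (FileNotFoundError, ConnectionRefusedError):
            time.sleep(0.05)
    chunks = []
    while True:
        data = client.recv(4096)
        if not data:
            break
        chunks.append(data)
    client.close()
    return pickle.loads(b"".join(chunks))

if __name__ == "__main__":
    base = tempfile.mkdtemp()
    paths = [os.path.join(base, "task-%d.socket" % i) for i in range(N)]
    workers = [Process(target=serve, args=(p, i)) for i, p in enumerate(paths)]
    for w in workers:
        w.start()
    results = [0] * N
    for p in paths:
        i, row = connect(p)
        results[i] = row
    for w in workers:
        w.join()
    print(results)  # → [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
```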

<h3>Applications of Background Computations</h3>

<p>Background computations are useful because they provide flexibility in the
way the results can be collected. One area in which they can be useful is Web
programming, where a process handling an incoming HTTP request may need to
initiate a computation but then immediately send output to the Web client,
such as a page indicating that the computation is "in progress", without
having to wait for the computation or to allocate resources to monitor it.
Moreover, in some Web architectures, notably those employing the Common
Gateway Interface (CGI), it is necessary for a process handling an incoming
request to terminate before its output will be sent to clients. By using a
<code>BackgroundCallable</code>, a Web server process can initiate a
computation, and subsequent server processes can then reconnect to the
background computation and wait efficiently for results.</p>

<h2 id="Summary">Summary</h2>

<p>The following table indicates the features used in converting each
sequential example program to its parallel equivalent:</p>

<table border="1" cellspacing="0" cellpadding="5">
  <thead>
    <tr>
      <th>Sequential Example</th>
      <th>Parallel Example</th>
      <th>Features Used</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td>simple_map</td>
      <td>simple_pmap</td>
      <td>pmap</td>
    </tr>
    <tr>
      <td>simple1</td>
      <td>simple_managed_map</td>
      <td>MakeParallel, Map, manage</td>
    </tr>
    <tr>
      <td rowspan="6">simple2</td>
      <td>simple_managed_queue</td>
      <td>MakeParallel, Queue, manage</td>
    </tr>
    <tr>
      <td>simple_continuous_queue</td>
      <td>Queue, manage (continuous)</td>
    </tr>
    <tr>
      <td>simple_managed</td>
      <td>MakeParallel, Exchange (subclass), manage, finish</td>
    </tr>
    <tr>
      <td>simple_start</td>
      <td>Channel, Exchange (subclass), start, finish</td>
    </tr>
    <tr>
      <td>simple_background_queue</td>
      <td>MakeParallel, BackgroundCallable, BackgroundQueue</td>
    </tr>
    <tr>
      <td>simple_persistent_queue</td>
      <td>MakeParallel, BackgroundCallable, PersistentQueue</td>
    </tr>
    <tr>
      <td>simple</td>
      <td>simple_create_map</td>
      <td>Channel, Map, create, exit</td>
    </tr>
  </tbody>
</table>

</body>
</html>