<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" lang="en-gb">
<head>
<meta http-equiv="content-type" content="text/html; charset=UTF-8" />
<title>pprocess - Tutorial</title>
<link href="styles.css" rel="stylesheet" type="text/css" />
</head>
<body>

<h1>pprocess - Tutorial</h1>

<p>The <code>pprocess</code> module provides several mechanisms for running
Python code concurrently in several processes. The most straightforward way of
making a program parallel-aware - that is, where the program can take
advantage of more than one processor to simultaneously process data - is to
use the <code>pmap</code> function.</p>

<ul>
<li><a href="#note">A Note on Parallel Processes</a></li>
<li><a href="#pmap">Converting Map-Style Code</a></li>
<li><a href="#Map">Converting Invocations to Parallel Operations</a></li>
<li><a href="#Queue">Converting Arbitrarily-Ordered Invocations</a>
  <ul>
  <li><a href="#Exchange">Replacing Queues with Exchanges</a></li>
  <li><a href="#channel">Using Channels in Callables</a></li>
  </ul>
</li>
<li><a href="#create">Converting Inline Computations</a></li>
<li><a href="#MakeReusable">Reusing Processes in Parallel Programs</a></li>
<li><a href="#continuous">Supporting Continuous Processes in Parallel Programs</a></li>
<li><a href="#BackgroundCallable">Performing Computations in Background Processes</a></li>
<li><a href="#ManagingBackgroundProcesses">Managing Several Background Processes</a></li>
<li><a href="#Summary">Summary</a></li>
</ul>

<p>For a brief summary of each of the features of <code>pprocess</code>, see
the <a href="reference.html">reference document</a>.</p>

<h2 id="note">A Note on Parallel Processes</h2>

<p>The way <code>pprocess</code> uses multiple processes to perform work in
parallel involves the
<code>fork</code> system call, which on modern operating
systems involves what is known as "copy-on-write" semantics. In plain
language, when <code>pprocess</code> creates a new <em>child</em> process to
perform work in parallel with other work that needs to be done, this new
process will be a near-identical copy of the original <em>parent</em> process,
and the running code will be able to access data resident in that parent
process.</p>

<p>However, when a child process modifies data, the parent process will
remain oblivious to the changes. As soon as the child process attempts to
modify the data, it obtains its own separate copy, which is then modified
independently of the original data. Thus, a <em>copy</em> of any data is made
when an attempt is made to <em>write</em> to such data, and the parent's copy
of that data will be left untouched by the activities of the child.</p>

<p>It is therefore essential to note that any data distributed to other
processes, and which will then be modified by those processes, will not appear
to change in the parent process even if the objects employed are mutable. This
is rather different to the behaviour of a normal Python program: passing a
list to a function, for example, mutates that list in such a way that upon
returning from that function the modifications will still be present. For
example:</p>

<pre>
def mutator(l):
    l.append(3)

l = [1, 2]
mutator(l) # l is now [1, 2, 3]
</pre>

<p>In contrast, passing a list to a child process will cause the list to
mutate in the child process, but the parent process will not see the list
change.
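</p>

<p>The copy-on-write behaviour can be observed directly with the underlying
<code>fork</code> system call. The following sketch uses only the standard
library (no <code>pprocess</code>) and sends the child's modified copy back
through a pipe; the parent's own copy remains untouched:</p>

```python
import os
import pickle

data = [1, 2]              # resident in the parent before the fork

r, w = os.pipe()
pid = os.fork()
if pid == 0:
    # Child: modifying the list triggers a private copy of the data.
    data.append(3)
    os.write(w, pickle.dumps(data))
    os._exit(0)

# Parent: wait for the child, then read its explicitly communicated copy.
os.waitpid(pid, 0)
child_view = pickle.loads(os.read(r, 1024))
# data is still [1, 2] here; child_view is [1, 2, 3]
```

<p>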
For example:</p>

<pre>
def mutator(l):
    l.append(3)

results = pprocess.Map()
mutator = results.manage(pprocess.MakeParallel(mutator))

l = [1, 2]
mutator(l) # l is now [1, 2]
</pre>

<p>To communicate changes to data between processes, the modified objects must
be explicitly returned from child processes using the mechanisms described in
this documentation. For example:</p>

<pre>
def mutator(l):
    l.append(3)
    return l # the modified object is explicitly returned

results = pprocess.Map()
mutator = results.manage(pprocess.MakeParallel(mutator))

l = [1, 2]
mutator(l)

all_l = results[:] # there are potentially many results, not just one
l = all_l[0]       # l is now [1, 2, 3], taken from the first result
</pre>

<p>It is perhaps easiest to think of the communications mechanisms as
providing a gateway between processes through which information can be passed,
with the rest of a program's data being private and hidden from the other
processes (even if that data initially resembles what the other processes also
see within themselves).</p>

<h2 id="pmap">Converting Map-Style Code</h2>

<p>Consider a program using the built-in <code>map</code> function and a
sequence of inputs:</p>

<pre>
t = time.time()

# Initialise an array.

sequence = []
for i in range(0, N):
    for j in range(0, N):
        sequence.append((i, j))

# Perform the work.

results = map(calculate, sequence)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_map.py</code> file.)</p>

<p>The principal features of this program involve the preparation of an array
for input purposes, and the use of the <code>map</code> function to iterate
over the combinations of <code>i</code> and <code>j</code> in the array. Even
if the <code>calculate</code> function could be invoked independently for each
input value, we have to wait for each computation to complete before
initiating a new one. The <code>calculate</code> function may be defined as
follows:</p>

<pre>
def calculate(t):

    "A supposedly time-consuming calculation on 't'."

    i, j = t
    time.sleep(delay)
    return i * N + j
</pre>

<p>In order to reduce the processing time - to speed the code up, in other
words - we can make this code use several processes instead of just one. Here
is the modified code:</p>

<pre>
t = time.time()

# Initialise an array.

sequence = []
for i in range(0, N):
    for j in range(0, N):
        sequence.append((i, j))

# Perform the work.

results = <strong>pprocess.pmap</strong>(calculate, sequence<strong>, limit=limit</strong>)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_pmap.py</code> file.)</p>

<p>By replacing usage of the <code>map</code> function with the
<code>pprocess.pmap</code> function, and specifying the limit on the number of
processes to be active at any given time (the value of the <code>limit</code>
variable is defined elsewhere), several calculations can now be performed in
parallel.</p>

<h2 id="Map">Converting Invocations to Parallel Operations</h2>

<p>Although some programs make natural use of the <code>map</code> function,
others may employ an invocation in a nested loop. This may also be converted
to a parallel program. Consider the following Python code:</p>

<pre>
t = time.time()

# Initialise an array.

results = []

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        results.append(calculate(i, j))

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple1.py</code> file.)</p>

<p>Here, a computation in the <code>calculate</code> function is performed for
each combination of <code>i</code> and <code>j</code> in the nested loop,
returning a result value. However, we must wait for the completion of this
function for each element before moving on to the next element, and this means
that the computations are performed sequentially.
Consequently, on a system
with more than one processor, even if we could call <code>calculate</code> for
more than one combination of <code>i</code> and <code>j</code> and have the
computations executing at the same time, the above program will not take
advantage of such capabilities.</p>

<p>We use a slightly modified version of <code>calculate</code> which employs
two parameters instead of one:</p>

<pre>
def calculate(i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j'.
    """

    time.sleep(delay)
    return i * N + j
</pre>

<p>In order to reduce the processing time - to speed the code up, in other
words - we can make this code use several processes instead of just one. Here
is the modified code:</p>

<pre id="simple_managed_map">
t = time.time()

# Initialise the results using a map with a limit on the number of
# channels/processes.

<strong>results = pprocess.Map(limit=limit)</strong>

# Wrap the calculate function and manage it.

<strong>calc = results.manage(pprocess.MakeParallel(calculate))</strong>

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>calc</strong>(i, j)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed_map.py</code> file.)</p>

<p>The principal changes in the above code involve the use of a
<code>pprocess.Map</code> object to collect the results, and a version of the
<code>calculate</code> function which is managed by the <code>Map</code>
object.
What the <code>Map</code> object does is to arrange the results of
computations such that iterating over the object or accessing the object using
list operations provides the results in the same order as their corresponding
inputs.</p>

<h2 id="Queue">Converting Arbitrarily-Ordered Invocations</h2>

<p>In some programs, it is not important to receive the results of
computations in any particular order, usually because either the order of
these results is irrelevant, or because the results provide "positional"
information which lets them be handled in an appropriate way. Consider the
following Python code:</p>

<pre>
t = time.time()

# Initialise an array.

results = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        i2, j2, result = calculate(i, j)
        results[i2*N+j2] = result

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple2.py</code> file.)</p>

<p>Here, a result array is initialised first and each computation is performed
sequentially. A significant difference from the previous examples is the
return value of the <code>calculate</code> function: the position details
corresponding to <code>i</code> and <code>j</code> are returned alongside the
result. Obviously, this is of limited value in the above code because the
order of the computations and the reception of results is fixed.
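</p>

<p>The positional details become valuable as soon as results can arrive in an
arbitrary order, as they do when computations run in parallel. The following
minimal sketch simulates such arrival with a shuffle, showing how the
<code>(i, j, result)</code> tuples allow the result array to be reassembled
correctly whatever the arrival order:</p>

```python
import random

N = 3
results = [0] * N * N

# Simulate results arriving in arbitrary order, as they would from
# parallel processes finishing at different times.
arrivals = [(i, j, i * N + j) for i in range(0, N) for j in range(0, N)]
random.shuffle(arrivals)

# The positional information, not the arrival order, decides placement.
for i, j, result in arrivals:
    results[i*N+j] = result
# results is now [0, 1, 2, 3, 4, 5, 6, 7, 8] regardless of the shuffle
```

<p>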
As written, however, the above example gains no benefit from
parallelisation.</p>

<p>We can bring the benefits of parallel processing to the above program with
the following code:</p>

<pre id="simple_managed_queue">
t = time.time()

# Initialise the communications queue with a limit on the number of
# channels/processes.

<strong>queue = pprocess.Queue(limit=limit)</strong>

# Initialise an array.

results = [0] * N * N

# Wrap the calculate function and manage it.

<strong>calc = queue.manage(pprocess.MakeParallel(calculate))</strong>

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>calc(i, j)</strong>

# Store the results as they arrive.

print "Finishing..."
<strong>for i, j, result in queue:</strong>
    <strong>results[i*N+j] = result</strong>

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed_queue.py</code> file.)</p>

<p>This revised code employs a <code>pprocess.Queue</code> object whose
purpose is to collect the results of computations and to make them available
in the order in which they were received. The code collecting results has been
moved into a separate loop independent of the original computation loop,
taking advantage of the more relevant "positional" information emerging from
the queue.</p>

<h3 id="Exchange">Replacing Queues with Exchanges</h3>

<p>We can take this example further, illustrating some of the mechanisms
employed by <code>pprocess</code>.
Instead of collecting results in a queue,
we can define a class containing a method which is called when new results
arrive:</p>

<pre>
class MyExchange(pprocess.Exchange):

    "Parallel convenience class containing the array assignment operation."

    def store_data(self, ch):
        i, j, result = ch.receive()
        self.D[i*N+j] = result
</pre>

<p>This code exposes the channel paradigm which is used throughout
<code>pprocess</code> and is available to applications, if desired. The effect
of the method is the storage of a result received through the channel in an
attribute of the object. The following code shows how this class can be used,
with differences from the previous program illustrated:</p>

<pre>
t = time.time()

# Initialise the communications exchange with a limit on the number of
# channels/processes.

<strong>exchange = MyExchange(limit=limit)</strong>

# Initialise an array - it is stored in the exchange to permit automatic
# assignment of values as the data arrives.

<strong>results = exchange.D = [0] * N * N</strong>

# Wrap the calculate function and manage it.

calc = <strong>exchange</strong>.manage(pprocess.MakeParallel(calculate))

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        calc(i, j)

# Wait for the results.

print "Finishing..."
<strong>exchange.finish()</strong>

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed.py</code> file.)</p>

<p>The main visible differences between this and the previous program are the
storage of the result array in the exchange, the removal of the queue
consumption code from the main program (the act of storing values now resides
in the exchange's <code>store_data</code> method), and the need to call the
<code>finish</code> method on the <code>MyExchange</code> object so that we do
not try to access the results too soon. One underlying benefit not visible in
the above code is that we no longer need to accumulate results in a queue or
other structure so that they may be processed and assigned to the correct
positions in the result array.</p>

<h3 id="channel">Using Channels in Callables</h3>

<p>For the curious, we may remove some of the remaining conveniences of the
above program to expose other features of <code>pprocess</code>. First, we
define a slightly modified version of the <code>calculate</code> function:</p>

<pre>
def calculate(ch, i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j', using 'ch' to
    communicate with the parent process.
    """

    time.sleep(delay)
    ch.send((i, j, i * N + j))
</pre>

<p>This function accepts a channel, <code>ch</code>, through which results
will be sent, and through which other values could potentially be received,
although we choose not to do so here. The program using this function is as
follows, with differences from the previous program illustrated:</p>

<pre>
t = time.time()

# Initialise the communications exchange with a limit on the number of
# channels/processes.

exchange = MyExchange(limit=limit)

# Initialise an array - it is stored in the exchange to permit automatic
# assignment of values as the data arrives.

results = exchange.D = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>exchange.start(calculate, i, j)</strong>

# Wait for the results.

print "Finishing..."
exchange.finish()

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_start.py</code> file.)</p>

<p>Here, we have discarded two conveniences: the wrapping of callables using
<code>MakeParallel</code>, which lets us use functions without providing any
channel parameters, and the management of callables using the
<code>manage</code> method on queues, exchanges, and so on. The
<code>start</code> method still calls the provided callable, but using a
different notation from that employed previously.</p>

<h2 id="create">Converting Inline Computations</h2>

<p>Although many programs employ functions and other useful abstractions which
can be treated as parallelisable units, some programs perform computations
"inline", meaning that the code responsible appears directly within a loop or
related control-flow construct. Consider the following code:</p>

<pre>
t = time.time()

# Initialise an array.

results = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        time.sleep(delay)
        results[i*N+j] = i * N + j

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple.py</code> file.)</p>

<p>To simulate "work", as in the different versions of the
<code>calculate</code> function, we use the <code>time.sleep</code> function
(which does not actually do work, and which will cause a process to be
descheduled in most cases, but which simulates the delay associated with work
being done). This inline work, which must be performed sequentially in the
above program, can be performed in parallel in a somewhat modified version of
the program:</p>

<pre>
t = time.time()

# Initialise the results using a map with a limit on the number of
# channels/processes.

<strong>results = pprocess.Map(limit=limit)</strong>

# Perform the work.
# NOTE: Could use the with statement in the loop to package the
# NOTE: try...finally functionality.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>ch = results.create()</strong>
        <strong>if ch:</strong>
            <strong>try: # Calculation work.</strong>

                time.sleep(delay)
                <strong>ch.send(i * N + j)</strong>

            <strong>finally: # Important finalisation.</strong>

                <strong>pprocess.exit(ch)</strong>

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_create_map.py</code> file.)</p>

<p>Although seemingly more complicated, the bulk of the changes in this
modified program are focused on obtaining a channel object, <code>ch</code>,
at the point where the computations are performed, and the wrapping of the
computation code in a <code>try</code>...<code>finally</code> statement which
ensures that the process associated with the channel exits when the
computation is complete. In order for the results of these computations to be
collected, a <code>pprocess.Map</code> object is used, since it will maintain
the results in the same order as the initiation of the computations which
produced them.</p>

<h2 id="MakeReusable">Reusing Processes in Parallel Programs</h2>

<p>One notable aspect of the above programs when parallelised is that each
invocation of a computation in parallel creates a new process in which the
computation is to be performed, regardless of whether existing processes had
just finished producing results and could theoretically have been asked to
perform new computations. In other words, processes were created and destroyed
instead of being reused.</p>

<p>However, we can request that processes be reused for computations by
enabling the <code>reuse</code> feature of exchange-like objects and employing
suitable reusable callables. Consider this modified version of the <a
href="#simple_managed_map">simple_managed_map</a> program:</p>

<pre>
t = time.time()

# Initialise the results using a map with a limit on the number of
# channels/processes.

results = pprocess.Map(limit=limit<strong>, reuse=1</strong>)

# Wrap the calculate function and manage it.

calc = results.manage(pprocess.Make<strong>Reusable</strong>(calculate))

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        calc(i, j)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_manage_map_reusable.py</code> file.)</p>

<p>By indicating that processes and channels shall be reused, and by wrapping
the <code>calculate</code> function with the necessary support, the
computations may be performed in parallel using a pool of processes instead of
creating a new process for each computation and then discarding it, only to
create a new process for the next computation.</p>

<h2 id="continuous">Supporting Continuous Processes in Parallel Programs</h2>

<p>Although reusable processes offer the opportunity to invoke a callable over
and over within the same created process, they do not fully support the
potential of the underlying mechanisms in <code>pprocess</code>: created
processes can communicate multiple values to the creating process and can
theoretically run within the same callable forever.</p>

<p>Consider this modified form of the <code>calculate</code> function:</p>

<pre>
def calculate(ch, i):

    """
    A supposedly time-consuming calculation on 'i'.
    """

    for j in range(0, N):
        time.sleep(delay)
        ch.send((i, j, i * N + j))
</pre>

<p>This function accepts a channel <code>ch</code> together with an argument
<code>i</code> corresponding to an entire row of the input array, as opposed
to having two arguments (<code>i</code> and <code>j</code>) corresponding to a
single cell in the input array. In this function, a series of calculations are
performed and a number of values are returned through the channel, without the
function terminating until all values have been returned for the row data.</p>

<p>To use this modified function, a modified version of the
<a href="#simple_managed_queue">simple_managed_queue</a> program is used:</p>

<pre>
t = time.time()

# Initialise the communications queue with a limit on the number of
# channels/processes.

queue = pprocess.Queue(limit=limit<strong>, continuous=1</strong>)

# Initialise an array.

results = [0] * N * N

# Manage the calculate function.

calc = queue.manage(<strong>calculate</strong>)

# Perform the work.

print "Calculating..."
for i in range(0, N):
    <strong>calc(i)</strong>

# Store the results as they arrive.

print "Finishing..."
for i, j, result in queue:
    results[i*N+j] = result

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_continuous_queue.py</code> file.)</p>

<p>Although the inner loop in the work section has been relocated to the
<code>calculate</code> function, the queue still receives outputs from that
function with positional information and a result for the result array.
Thus,
no change is needed for the retrieval of the results: they arrive in the queue
as before.</p>

<h2 id="BackgroundCallable">Performing Computations in Background Processes</h2>

<p>Occasionally, it is desirable to initiate time-consuming computations and
not only to leave such processes running in the background, but to detach the
creating process from them completely - potentially terminating it altogether
- and still to be able to collect the results of the created processes at a
later time, potentially in a completely different process. For such
situations, we can make use of the <code>BackgroundCallable</code> class,
which converts a parallel-aware callable into a callable which will run in a
background process when invoked.</p>

<p>Consider this excerpt from a modified version of the <a
href="#simple_managed_queue">simple_managed_queue</a> program:</p>

<pre>
<strong>def task():</strong>

    # Initialise the communications queue with a limit on the number of
    # channels/processes.

    queue = pprocess.Queue(limit=limit)

    # Initialise an array.

    results = [0] * N * N

    # Wrap the calculate function and manage it.

    calc = queue.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            calc(i, j)

    # Store the results as they arrive.

    print "Finishing..."
    for i, j, result in queue:
        results[i*N+j] = result

    <strong>return results</strong>
</pre>

<p>Here, we have converted the main program into a function, and instead of
printing out the results, we return the results list from the function.</p>

<p>Now, let us consider the new main program (with the relevant mechanisms
highlighted):</p>

<pre>
t = time.time()

if "--reconnect" not in sys.argv:

    # Wrap the computation and manage it.

    <strong>ptask = pprocess.BackgroundCallable("task.socket", pprocess.MakeParallel(task))</strong>

    # Perform the work.

    ptask()

    # Discard the callable.

    del ptask
    print "Discarded the callable."

if "--start" not in sys.argv:

    # Open a queue and reconnect to the task.

    print "Opening a queue."
    <strong>queue = pprocess.BackgroundQueue("task.socket")</strong>

    # Wait for the results.

    print "Waiting for persistent results"
    for results in queue:
        pass # should only be one element

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_background_queue.py</code> file.)</p>

<p>This new main program has two parts: the part which initiates the
computation, and the part which connects to the computation in order to
collect the results.
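</p>

<p>Underlying this detach-and-reconnect arrangement is ordinary UNIX-domain
socket communication. The following rough sketch of the idea uses only the
standard library - <code>BackgroundCallable</code> and
<code>BackgroundQueue</code> take care of the real protocol details in
<code>pprocess</code> - with a background process serving its result on a
socket that a later process can find by path:</p>

```python
import os
import pickle
import socket
import tempfile

socket_path = os.path.join(tempfile.mkdtemp(), "task.socket")

# "Start" part: create a background process serving its result on a socket.
server = socket.socket(socket.AF_UNIX)
server.bind(socket_path)
server.listen(1)

if os.fork() == 0:
    # Background process: compute, then hand the result to whoever connects.
    conn, _ = server.accept()
    conn.sendall(pickle.dumps([i * 3 + j for i in range(3) for j in range(3)]))
    conn.close()
    os._exit(0)

server.close()  # the initiating process could now exit entirely

# "Reconnect" part: any process knowing the socket path can collect the result.
client = socket.socket(socket.AF_UNIX)
client.connect(socket_path)
data = b""
while True:
    chunk = client.recv(4096)
    if not chunk:
        break
    data += chunk
client.close()
results = pickle.loads(data)
os.wait()  # reap the background process
# results is [0, 1, 2, ..., 8]
```

<p>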
Both parts can be run
in the same process, and this should result in similar behaviour to that of
the original <a href="#simple_managed_queue">simple_managed_queue</a>
program.</p>

<p>In the above program, however, we are free to specify <code>--start</code>
as an option when running the program, and the effect of this is merely to
initiate the computation in a background process, using
<code>BackgroundCallable</code> to obtain a callable which, when invoked,
creates the background process and runs the computation. After doing this, the
program will exit, but it will leave the computation running as a collection
of background processes, and a special file called <code>task.socket</code>
will exist in the current working directory.</p>

<p>When the above program is run using the <code>--reconnect</code> option, an
attempt will be made to reconnect to the background processes already created
by contacting them using the previously created <code>task.socket</code>
special file (which is, in fact, a UNIX-domain socket); this is done using the
<code>BackgroundQueue</code> function, which handles the incoming results in a
fashion similar to that of a <code>Queue</code> object. Since only one result
is returned by the computation (as defined by the <code>return</code>
statement in the <code>task</code> function), we need only expect one element
to be collected by the queue: a list containing all of the results produced in
the computation.</p>

<h2 id="ManagingBackgroundProcesses">Managing Several Background Processes</h2>

<p>In the above example, a single background process was used to manage a
number of other processes, with all of them running in the background.
However, it can
be desirable to manage more than one background process.</p>

<p>Consider this excerpt from a modified version of the <a
href="#simple_managed_queue">simple_managed_queue</a> program:</p>

<pre>
<strong>def task(i):</strong>

    # Initialise the communications queue with a limit on the number of
    # channels/processes.

    queue = pprocess.Queue(limit=limit)

    # Initialise an array.

    results = [0] * N

    # Wrap the calculate function and manage it.

    calc = queue.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    <strong>for j in range(0, N):</strong>
        <strong>calc(i, j)</strong>

    # Store the results as they arrive.

    print "Finishing..."
    <strong>for i, j, result in queue:</strong>
        <strong>results[j] = result</strong>

    <strong>return i, results</strong>
</pre>

<p>Just as we saw in the previous example, a function called <code>task</code>
has been defined to hold a background computation, and this function returns a
portion of the results. However, unlike the previous example or the original
example, the scope of the results of the computation collected in the function
has been changed: here, only results for calculations involving a certain
value of <code>i</code> are collected, with the particular value of
<code>i</code> returned along with the appropriate portion of the results.</p>

<p>Now, let us consider the new main program (with the relevant mechanisms
highlighted):</p>

<pre>
t = time.time()

if "--reconnect" not in sys.argv:

    # Wrap the computation and manage it.

    <strong>ptask = pprocess.MakeParallel(task)</strong>

    <strong>for i in range(0, N):</strong>

        # Make a distinct callable for each part of the computation.
944 945 <strong>ptask_i = pprocess.BackgroundCallable("task-%d.socket" % i, ptask)</strong> 946 947 # Perform the work. 948 949 <strong>ptask_i(i)</strong> 950 951 # Discard the callable. 952 953 del ptask 954 print "Discarded the callable." 955 956 if "--start" not in sys.argv: 957 958 # Open a queue and reconnect to the task. 959 960 print "Opening a queue." 961 <strong>queue = pprocess.PersistentQueue()</strong> 962 <strong>for i in range(0, N):</strong> 963 <strong>queue.connect("task-%d.socket" % i)</strong> 964 965 # Initialise an array. 966 967 <strong>results = [0] * N</strong> 968 969 # Wait for the results. 970 971 print "Waiting for persistent results" 972 <strong>for i, result in queue:</strong> 973 <strong>results[i] = result</strong> 974 975 # Show the results. 976 977 for i in range(0, N): 978 <strong>for result in results[i]:</strong> 979 print result, 980 print 981 982 print "Time taken:", time.time() - t 983 </pre> 984 985 <p>(This code in context with <code>import</code> statements and functions is 986 found in the <code>examples/simple_persistent_queue.py</code> file.)</p> 987 988 <p>In the first section, the process of making a parallel-aware callable is as 989 expected, but instead of then invoking a single background version of such a 990 callable, as in the previous example, we create a version for each value of 991 <code>i</code> (using <code>BackgroundCallable</code>) and then invoke each one. 
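</p>

<p>The overall shape of this program, with a number of background processes each
leaving behind its own socket and a later process connecting to all of the
sockets and multiplexing their replies, can be sketched with the standard
library alone. This is a conceptual illustration of what
<code>BackgroundCallable</code> and <code>PersistentQueue</code> arrange
(written for current Python versions; only the <code>task-<em>i</em>.socket</code>
naming convention is taken from the example above), not their actual
implementation:</p>

```python
import os
import selectors
import socket
import tempfile
import time

# Conceptual sketch: N forked "background" processes each serve an
# (i, result) pair through their own "task-%d.socket" file, and the
# collecting process connects to all of the sockets and waits for
# whichever reply arrives next.

N = 4
directory = tempfile.mkdtemp()

for i in range(N):
    if os.fork() == 0:
        # Child i: "compute" i squared and serve it on its own socket.
        server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        server.bind(os.path.join(directory, "task-%d.socket" % i))
        server.listen(1)
        conn, _ = server.accept()
        conn.sendall(("%d:%d" % (i, i * i)).encode())
        conn.close()
        os._exit(0)

# Collector: connect to every socket and multiplex the replies.
selector = selectors.DefaultSelector()
for i in range(N):
    path = os.path.join(directory, "task-%d.socket" % i)
    client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    while True:
        try:
            client.connect(path)
            break
        except (FileNotFoundError, ConnectionRefusedError):
            time.sleep(0.05)              # child may not be listening yet
    selector.register(client, selectors.EVENT_READ)

results = [0] * N
collected = 0
while collected < N:
    for key, _ in selector.select():
        i, value = key.fileobj.recv(64).decode().split(":")
        results[int(i)] = int(value)      # slot each reply by its i
        selector.unregister(key.fileobj)
        key.fileobj.close()
        collected += 1

for _ in range(N):
    os.wait()                             # reap the background processes
```

<p>In the program above, <code>pprocess</code> arranges all of this when each
<code>ptask_i</code> is invoked and when the queue is connected.</p>

<p>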
The result of this is a total of <code>N</code> background processes, each
running an invocation of the <code>task</code> function with a distinct value of
<code>i</code> (and each, in turn, performing its own computations), and each
employing a UNIX-domain socket for communication with a name of the form
<code>task-<em>i</em>.socket</code>.</p>

<p>In the second section, since we now have more than one background process, we
must find a way to monitor them all after reconnecting to them. To achieve this,
a <code>PersistentQueue</code> is created, which acts like a regular
<code>Queue</code> object but is focused on handling persistent
communications. Upon connecting the queue to each of the previously created
UNIX-domain sockets, the queue behaves like a regular <code>Queue</code> and
exposes received results through an iterator. Here, the principal difference
from previous examples is the structure of the results: instead of collecting
each individual value in a flat <code>i</code> by <code>j</code> array, a list
is returned for each value of <code>i</code> and is stored directly in another
list.</p>

<h3>Applications of Background Computations</h3>

<p>Background computations are useful because they provide flexibility in the
way the results can be collected. One area in which they can be useful is Web
programming, where a process handling an incoming HTTP request may need to
initiate a computation but then immediately send output to the Web client, such
as a page indicating that the computation is "in progress", without having to
wait for the computation or to allocate resources to monitor it. Moreover, in
some Web architectures, notably those employing the Common Gateway Interface
(CGI), it is necessary for a process handling an incoming request to terminate
before its output will be sent to clients.
By using a
<code>BackgroundCallable</code>, a Web server process can initiate a
computation, and subsequent server processes can then reconnect to the
background computation and wait efficiently for its results.</p>

<h2 id="Summary">Summary</h2>

<p>The following table indicates the features used in converting each
sequential example program to its parallel counterpart:</p>

<table border="1" cellspacing="0" cellpadding="5">
  <thead>
    <tr>
      <th>Sequential Example</th>
      <th>Parallel Example</th>
      <th>Features Used</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td>simple_map</td>
      <td>simple_pmap</td>
      <td>pmap</td>
    </tr>
    <tr>
      <td>simple1</td>
      <td>simple_managed_map</td>
      <td>MakeParallel, Map, manage</td>
    </tr>
    <tr>
      <td rowspan="6">simple2</td>
      <td>simple_managed_queue</td>
      <td>MakeParallel, Queue, manage</td>
    </tr>
    <tr>
      <td>simple_continuous_queue</td>
      <td>Queue, manage (continuous)</td>
    </tr>
    <tr>
      <td>simple_managed</td>
      <td>MakeParallel, Exchange (subclass), manage, finish</td>
    </tr>
    <tr>
      <td>simple_start</td>
      <td>Channel, Exchange (subclass), start, finish</td>
    </tr>
    <tr>
      <td>simple_background_queue</td>
      <td>MakeParallel, BackgroundCallable, BackgroundQueue</td>
    </tr>
    <tr>
      <td>simple_persistent_queue</td>
      <td>MakeParallel, BackgroundCallable, PersistentQueue</td>
    </tr>
    <tr>
      <td>simple</td>
      <td>simple_create_map</td>
      <td>Channel, Map, create, exit</td>
    </tr>
  </tbody>
</table>

</body>
</html>