<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" lang="en-gb">
<head>
<meta http-equiv="content-type" content="text/html; charset=UTF-8" />
<title>pprocess - Tutorial</title>
<link href="styles.css" rel="stylesheet" type="text/css" />
</head>
<body>

<h1>pprocess - Tutorial</h1>

<p>The <code>pprocess</code> module provides several mechanisms for running
Python code concurrently in several processes. The most straightforward way of
making a program parallel-aware - that is, where the program can take
advantage of more than one processor to simultaneously process data - is to
use the <code>pmap</code> function.</p>

<ul>
<li><a href="#pmap">Converting Map-Style Code</a></li>
<li><a href="#Map">Converting Invocations to Parallel Operations</a></li>
<li><a href="#Queue">Converting Arbitrarily-Ordered Invocations</a>
<ul>
<li><a href="#Exchange">Replacing Queues with Exchanges</a></li>
<li><a href="#channel">Using Channels in Callables</a></li>
</ul>
</li>
<li><a href="#create">Converting Inline Computations</a></li>
<li><a href="#MakeReusable">Reusing Processes in Parallel Programs</a></li>
<li><a href="#continuous">Supporting Continuous Processes in Parallel Programs</a></li>
<li><a href="#BackgroundCallable">Performing Computations in Background Processes</a></li>
<li><a href="#ManagingBackgroundProcesses">Managing Several Background Processes</a></li>
<li><a href="#Summary">Summary</a></li>
</ul>

<p>For a brief summary of each of the features of <code>pprocess</code>, see
the <a href="reference.html">reference document</a>.</p>

<h2 id="pmap">Converting Map-Style Code</h2>

<p>Consider a program using the built-in <code>map</code> function and a
sequence of inputs:</p>

<pre>
t = time.time()

# Initialise an array.

sequence = []
for i in range(0, N):
    for j in range(0, N):
        sequence.append((i, j))

# Perform the work.

results = map(calculate, sequence)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_map.py</code> file.)</p>

<p>The principal features of this program involve the preparation of an array
for input purposes, and the use of the <code>map</code> function to iterate
over the combinations of <code>i</code> and <code>j</code> in the array. Even
if the <code>calculate</code> function could be invoked independently for each
input value, we have to wait for each computation to complete before
initiating a new one. The <code>calculate</code> function may be defined as
follows:</p>

<pre>
def calculate(t):

    "A supposedly time-consuming calculation on 't'."

    i, j = t
    time.sleep(delay)
    return i * N + j
</pre>

<p>In order to reduce the processing time - to speed the code up, in other
words - we can make this code use several processes instead of just one. Here
is the modified code:</p>

<pre>
t = time.time()

# Initialise an array.

sequence = []
for i in range(0, N):
    for j in range(0, N):
        sequence.append((i, j))

# Perform the work.

results = <strong>pprocess.pmap</strong>(calculate, sequence<strong>, limit=limit</strong>)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_pmap.py</code> file.)</p>

<p>By replacing usage of the <code>map</code> function with the
<code>pprocess.pmap</code> function, and specifying the limit on the number of
processes to be active at any given time (the value of the <code>limit</code>
variable is defined elsewhere), several calculations can now be performed in
parallel.</p>

<h2 id="Map">Converting Invocations to Parallel Operations</h2>

<p>Although some programs make natural use of the <code>map</code> function,
others may employ an invocation in a nested loop. This may also be converted
to a parallel program. Consider the following Python code:</p>

<pre>
t = time.time()

# Initialise an array.

results = []

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        results.append(calculate(i, j))

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple1.py</code> file.)</p>

<p>Here, a computation in the <code>calculate</code> function is performed for
each combination of <code>i</code> and <code>j</code> in the nested loop,
returning a result value. However, we must wait for the completion of this
function for each element before moving on to the next element, and this means
that the computations are performed sequentially.
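</p>

<p>(As an aside, the ordered map-style parallelisation provided by
<code>pprocess.pmap</code> above can be compared with the pools offered by the
standard library's <code>multiprocessing</code> module. The following is a
Python 3 sketch, independent of <code>pprocess</code>; the values of
<code>N</code>, <code>delay</code> and the pool size are purely
illustrative.)</p>

```python
import time
from multiprocessing import get_context

N = 4
delay = 0.1

def calculate(t):
    "A supposedly time-consuming calculation on 't'."
    i, j = t
    time.sleep(delay)
    return i * N + j

sequence = [(i, j) for i in range(N) for j in range(N)]

# The "fork" start method (available on Unix) lets the worker processes
# inherit the definitions above without re-importing this module.
with get_context("fork").Pool(processes=4) as pool:
    results = pool.map(calculate, sequence)  # ordered, like pmap
```

<p>Like <code>pmap</code>, <code>Pool.map</code> delivers the results in
input order, regardless of the order in which they were completed.</p>

<p>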
Consequently, on a system
with more than one processor, even if we could call <code>calculate</code> for
more than one combination of <code>i</code> and <code>j</code>
and have the computations executing at the same time, the above program will
not take advantage of such capabilities.</p>

<p>We use a slightly modified version of <code>calculate</code> which employs
two parameters instead of one:</p>

<pre>
def calculate(i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j'.
    """

    time.sleep(delay)
    return i * N + j
</pre>

<p>In order to reduce the processing time - to speed the code up, in other
words - we can make this code use several processes instead of just one. Here
is the modified code:</p>

<pre id="simple_managed_map">
t = time.time()

# Initialise the results using a map with a limit on the number of
# channels/processes.

<strong>results = pprocess.Map(limit=limit)</strong>

# Wrap the calculate function and manage it.

<strong>calc = results.manage(pprocess.MakeParallel(calculate))</strong>

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>calc</strong>(i, j)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed_map.py</code> file.)</p>

<p>The principal changes in the above code involve the use of a
<code>pprocess.Map</code> object to collect the results, and a version of the
<code>calculate</code> function which is managed by the <code>Map</code>
object.
What the <code>Map</code> object does is to arrange the results of
computations such that iterating over the object or accessing the object using
list operations provides the results in the same order as their corresponding
inputs.</p>

<h2 id="Queue">Converting Arbitrarily-Ordered Invocations</h2>

<p>In some programs, it is not important to receive the results of
computations in any particular order, usually because either the order of
these results is irrelevant, or because the results provide "positional"
information which lets them be handled in an appropriate way. Consider the
following Python code:</p>

<pre>
t = time.time()

# Initialise an array.

results = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        i2, j2, result = calculate(i, j)
        results[i2*N+j2] = result

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple2.py</code> file.)</p>

<p>Here, a result array is initialised first and each computation is performed
sequentially. A significant difference to the previous examples is the return
value of the <code>calculate</code> function: the position details
corresponding to <code>i</code> and <code>j</code> are returned alongside the
result. Obviously, this is of limited value in the above code because the
order of the computations and the reception of results is fixed.
Moreover, we
get no benefit from parallelisation in the above example.</p>

<p>We can bring the benefits of parallel processing to the above program with
the following code:</p>

<pre id="simple_managed_queue">
t = time.time()

# Initialise the communications queue with a limit on the number of
# channels/processes.

<strong>queue = pprocess.Queue(limit=limit)</strong>

# Initialise an array.

results = [0] * N * N

# Wrap the calculate function and manage it.

<strong>calc = queue.manage(pprocess.MakeParallel(calculate))</strong>

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>calc(i, j)</strong>

# Store the results as they arrive.

print "Finishing..."
<strong>for i, j, result in queue:</strong>
    <strong>results[i*N+j] = result</strong>

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed_queue.py</code> file.)</p>

<p>This revised code employs a <code>pprocess.Queue</code> object whose
purpose is to collect the results of computations and to make them available
in the order in which they were received. The code collecting results has been
moved into a separate loop, independent of the original computation loop,
which takes advantage of the more relevant "positional" information emerging
from the queue.</p>

<h3 id="Exchange">Replacing Queues with Exchanges</h3>

<p>We can take this example further, illustrating some of the mechanisms
employed by <code>pprocess</code>.
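</p>

<p>(Before doing so, note that the consume-in-arrival-order pattern used
above can likewise be sketched with the standard library, whose
<code>imap_unordered</code> yields each result as soon as it is ready. This is
a Python 3 sketch, independent of <code>pprocess</code>; the values of
<code>N</code>, <code>delay</code> and the pool size are illustrative.)</p>

```python
import time
from multiprocessing import get_context

N = 4
delay = 0.1

def calculate(t):
    "Return the position details alongside the result, as above."
    i, j = t
    time.sleep(delay)
    return i, j, i * N + j

sequence = [(i, j) for i in range(N) for j in range(N)]
results = [0] * N * N

# imap_unordered yields results in completion order, not input order;
# the positional information says where each one belongs.
with get_context("fork").Pool(processes=4) as pool:
    for i, j, result in pool.imap_unordered(calculate, sequence):
        results[i * N + j] = result
```

<p>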
Instead of collecting results in a queue,
we can define a class containing a method which is called when new results
arrive:</p>

<pre>
class MyExchange(pprocess.Exchange):

    "Parallel convenience class containing the array assignment operation."

    def store_data(self, ch):
        i, j, result = ch.receive()
        self.D[i*N+j] = result
</pre>

<p>This code exposes the channel paradigm which is used throughout
<code>pprocess</code> and is available to applications, if desired. The effect
of the method is the storage of a result received through the channel in an
attribute of the object. The following code shows how this class can be used,
with differences to the previous program illustrated:</p>

<pre>
t = time.time()

# Initialise the communications exchange with a limit on the number of
# channels/processes.

<strong>exchange = MyExchange(limit=limit)</strong>

# Initialise an array - it is stored in the exchange to permit automatic
# assignment of values as the data arrives.

<strong>results = exchange.D = [0] * N * N</strong>

# Wrap the calculate function and manage it.

calc = <strong>exchange</strong>.manage(pprocess.MakeParallel(calculate))

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        calc(i, j)

# Wait for the results.

print "Finishing..."
<strong>exchange.finish()</strong>

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed.py</code> file.)</p>

<p>The main visible differences between this and the previous program are the
storage of the result array in the exchange, the removal of the queue
consumption code from the main program, placing the act of storing values in
the exchange's <code>store_data</code> method, and the need to call the
<code>finish</code> method on the <code>MyExchange</code> object so that we do
not try to access the results too soon. One underlying benefit not visible in
the above code is that we no longer need to accumulate results in a queue or
other structure so that they may be processed and assigned to the correct
positions in the result array.</p>

<h3 id="channel">Using Channels in Callables</h3>

<p>For the curious, we may remove some of the remaining conveniences of the
above program to expose other features of <code>pprocess</code>. First, we
define a slightly modified version of the <code>calculate</code> function:</p>

<pre>
def calculate(ch, i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j', using 'ch' to
    communicate with the parent process.
    """

    time.sleep(delay)
    ch.send((i, j, i * N + j))
</pre>

<p>This function accepts a channel, <code>ch</code>, through which results
will be sent, and through which other values could potentially be received,
although we choose not to do so here. The program using this function is as
follows, with differences to the previous program illustrated:</p>

<pre>
t = time.time()

# Initialise the communications exchange with a limit on the number of
# channels/processes.

exchange = MyExchange(limit=limit)

# Initialise an array - it is stored in the exchange to permit automatic
# assignment of values as the data arrives.

results = exchange.D = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>exchange.start(calculate, i, j)</strong>

# Wait for the results.

print "Finishing..."
exchange.finish()

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_start.py</code> file.)</p>

<p>Here, we have discarded two conveniences: the wrapping of callables using
<code>MakeParallel</code>, which lets us use functions without providing any
channel parameters, and the management of callables using the
<code>manage</code> method on queues, exchanges, and so on. The
<code>start</code> method still calls the provided callable, but using a
different notation from that employed previously.</p>

<h2 id="create">Converting Inline Computations</h2>

<p>Although many programs employ functions and other useful abstractions which
can be treated as parallelisable units, some programs perform computations
"inline", meaning that the code responsible appears directly within a loop or
related control-flow construct. Consider the following code:</p>

<pre>
t = time.time()

# Initialise an array.

results = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        time.sleep(delay)
        results[i*N+j] = i * N + j

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple.py</code> file.)</p>

<p>To simulate "work", as in the different versions of the
<code>calculate</code> function, we use the <code>time.sleep</code> function
(which does not actually do work, and which will cause a process to be
descheduled in most cases, but which simulates the delay associated with work
being done). This inline work, which must be performed sequentially in the
above program, can be performed in parallel in a somewhat modified version of
the program:</p>

<pre>
t = time.time()

# Initialise the results using a map with a limit on the number of
# channels/processes.

<strong>results = pprocess.Map(limit=limit)</strong>

# Perform the work.
# NOTE: Could use the with statement in the loop to package the
# NOTE: try...finally functionality.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>ch = results.create()</strong>
        <strong>if ch:</strong>
            <strong>try: # Calculation work.</strong>

                time.sleep(delay)
                <strong>ch.send(i * N + j)</strong>

            <strong>finally: # Important finalisation.</strong>

                <strong>pprocess.exit(ch)</strong>

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_create_map.py</code> file.)</p>

<p>Although seemingly more complicated, the bulk of the changes in this
modified program are focused on obtaining a channel object, <code>ch</code>,
at the point where the computations are performed, and the wrapping of the
computation code in a <code>try</code>...<code>finally</code> statement which
ensures that the process associated with the channel exits when the
computation is complete. In order for the results of these computations to be
collected, a <code>pprocess.Map</code> object is used, since it will maintain
the results in the same order as the initiation of the computations which
produced them.</p>

<h2 id="MakeReusable">Reusing Processes in Parallel Programs</h2>

<p>One notable aspect of the above programs when parallelised is that each
invocation of a computation in parallel creates a new process in which the
computation is to be performed, regardless of whether existing processes had
just finished producing results and could theoretically have been asked to
perform new computations. In other words, processes were created and destroyed
instead of being reused.</p>

<p>However, we can request that processes be reused for computations by
enabling the <code>reuse</code> feature of exchange-like objects and employing
suitable reusable callables. Consider this modified version of the <a
href="#simple_managed_map">simple_managed_map</a> program:</p>

<pre>
t = time.time()

# Initialise the results using a map with a limit on the number of
# channels/processes.

results = pprocess.Map(limit=limit<strong>, reuse=1</strong>)

# Wrap the calculate function and manage it.

calc = results.manage(pprocess.Make<strong>Reusable</strong>(calculate))

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        calc(i, j)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_manage_map_reusable.py</code> file.)</p>

<p>By indicating that processes and channels shall be reused, and by wrapping
the <code>calculate</code> function with the necessary support, the
computations may be performed in parallel using a pool of processes instead of
creating a new process for each computation and then discarding it, only to
create a new process for the next computation.</p>

<h2 id="continuous">Supporting Continuous Processes in Parallel Programs</h2>

<p>Although reusable processes offer the opportunity to invoke a callable over
and over within the same created process, they do not fully support the
potential of the underlying mechanisms in <code>pprocess</code>: created
processes can communicate multiple values to the creating process and can
theoretically run within the same callable forever.</p>

<p>Consider this modified form of the <code>calculate</code> function:</p>

<pre>
def calculate(ch, i):

    """
    A supposedly time-consuming calculation on 'i'.
    """

    for j in range(0, N):
        time.sleep(delay)
        ch.send((i, j, i * N + j))
</pre>

<p>This function accepts a channel <code>ch</code> together with an argument
<code>i</code> corresponding to an entire row of the input array, as opposed
to having two arguments (<code>i</code> and <code>j</code>) corresponding to a
single cell in the input array. In this function, a series of calculations are
performed and a number of values are returned through the channel, without the
function terminating until all values have been returned for the row data.</p>

<p>To use this modified function, a modified version of the
<a href="#simple_managed_queue">simple_managed_queue</a> program is used:</p>

<pre>
t = time.time()

# Initialise the communications queue with a limit on the number of
# channels/processes.

queue = pprocess.Queue(limit=limit<strong>, continuous=1</strong>)

# Initialise an array.

results = [0] * N * N

# Manage the calculate function.

calc = queue.manage(<strong>calculate</strong>)

# Perform the work.

print "Calculating..."
for i in range(0, N):
    <strong>calc(i)</strong>

# Store the results as they arrive.

print "Finishing..."
for i, j, result in queue:
    results[i*N+j] = result

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_continuous_queue.py</code> file.)</p>

<p>Although the inner loop in the work section has been relocated to the
<code>calculate</code> function, the queue still receives outputs from that
function with positional information and a result for the result array.
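</p>

<p>(This one-call, many-results shape can be sketched with a standard-library
queue - a Python 3 sketch for Unix systems, independent of
<code>pprocess</code> - in which each worker sends one value per column of
its row, followed by a sentinel when the row is complete.)</p>

```python
import time
from multiprocessing import Process, Queue

N = 4
delay = 0.01

def calculate(q, i):
    "Send a result for every column of row 'i', then a sentinel."
    for j in range(N):
        time.sleep(delay)
        q.put((i, j, i * N + j))
    q.put(None)  # this row is finished

queue = Queue()
workers = [Process(target=calculate, args=(queue, i)) for i in range(N)]
for w in workers:
    w.start()

results = [0] * N * N
finished = 0
while finished < N:
    item = queue.get()
    if item is None:
        finished += 1
    else:
        i, j, result = item
        results[i * N + j] = result

for w in workers:
    w.join()
```

<p>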
Thus,
no change is needed for the retrieval of the results: they arrive in the queue
as before.</p>

<h2 id="BackgroundCallable">Performing Computations in Background Processes</h2>

<p>Occasionally, it is desirable to initiate time-consuming computations and
not only to leave such processes running in the background, but also to detach
the creating process from them completely, potentially terminating the
creating process altogether, and yet still be able to collect the results of
the created processes at a later time, potentially in a completely different
process. For such situations, we can make use of the
<code>BackgroundCallable</code> class, which converts a parallel-aware
callable into a callable which will run in a background process when
invoked.</p>

<p>Consider this excerpt from a modified version of the <a
href="#simple_managed_queue">simple_managed_queue</a> program:</p>

<pre>
<strong>def task():</strong>

    # Initialise the communications queue with a limit on the number of
    # channels/processes.

    queue = pprocess.Queue(limit=limit)

    # Initialise an array.

    results = [0] * N * N

    # Wrap the calculate function and manage it.

    calc = queue.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            calc(i, j)

    # Store the results as they arrive.

    print "Finishing..."
    for i, j, result in queue:
        results[i*N+j] = result

    <strong>return results</strong>
</pre>

<p>Here, we have converted the main program into a function, and instead of
printing out the results, we return the results list from the function.</p>

<p>Now, let us consider the new main program (with the relevant mechanisms
highlighted):</p>

<pre>
t = time.time()

if "--reconnect" not in sys.argv:

    # Wrap the computation and manage it.

    <strong>ptask = pprocess.BackgroundCallable("task.socket", pprocess.MakeParallel(task))</strong>

    # Perform the work.

    ptask()

    # Discard the callable.

    del ptask
    print "Discarded the callable."

if "--start" not in sys.argv:

    # Open a queue and reconnect to the task.

    print "Opening a queue."
    <strong>queue = pprocess.BackgroundQueue("task.socket")</strong>

    # Wait for the results.

    print "Waiting for persistent results"
    for results in queue:
        pass # should only be one element

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_background_queue.py</code> file.)</p>

<p>This new main program has two parts: the part which initiates the
computation, and the part which connects to the computation in order to
collect the results.
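</p>

<p>(The mechanism underneath - a background process serving its result
through a UNIX-domain socket that a later process can connect to - can be
sketched with plain standard-library sockets. This Python 3 sketch for Unix
systems is not the <code>pprocess</code> implementation; the socket path and
the computation are stand-ins.)</p>

```python
import os
import pickle
import socket
import tempfile
import time
from multiprocessing import Process

ADDRESS = os.path.join(tempfile.mkdtemp(), "task.socket")

def background_task():
    "Serve the result of a computation on a UNIX-domain socket."
    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    server.bind(ADDRESS)
    server.listen(1)
    result = sum(range(10))          # stand-in for the real work
    connection, _ = server.accept()
    connection.sendall(pickle.dumps(result))
    connection.close()
    server.close()

p = Process(target=background_task)
p.start()

# A collecting process - possibly started much later - reconnects
# through the socket file and retrieves the result.
while True:
    client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        client.connect(ADDRESS)
        break
    except (FileNotFoundError, ConnectionRefusedError):
        client.close()
        time.sleep(0.01)
result = pickle.loads(client.recv(4096))
client.close()
p.join()
```

<p>The socket file outlives the creating process, which is what allows a
completely separate process to perform the collection step.</p>

<p>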
Both parts can be run
in the same process, and this should result in similar behaviour to that of
the original <a href="#simple_managed_queue">simple_managed_queue</a>
program.</p>

<p>In the above program, however, we are free to specify <code>--start</code>
as an option when running the program, and the result of this is merely to
initiate the computation in a background process, using
<code>BackgroundCallable</code> to obtain a callable which, when invoked,
creates the background process and runs the computation. After doing this, the
program will then exit, but it will leave the computation running as a
collection of background processes, and a special file called
<code>task.socket</code> will exist in the current working directory.</p>

<p>When the above program is run using the <code>--reconnect</code> option, an
attempt will be made to reconnect to the background processes already created
by attempting to contact them using the previously created
<code>task.socket</code> special file (which is, in fact, a UNIX-domain
socket); this being done using the <code>BackgroundQueue</code> function which
will handle the incoming results in a fashion similar to that of a
<code>Queue</code> object. Since only one result is returned by the
computation (as defined by the <code>return</code> statement in the
<code>task</code> function), we need only expect one element to be collected
by the queue: a list containing all of the results produced in the
computation.</p>

<h2 id="ManagingBackgroundProcesses">Managing Several Background Processes</h2>

<p>In the above example, a single background process was used to manage a
number of other processes, with all of them running in the background.
However, it can
be desirable to manage more than one background process.</p>

<p>Consider this excerpt from a modified version of the <a
href="#simple_managed_queue">simple_managed_queue</a> program:</p>

<pre>
<strong>def task(i):</strong>

    # Initialise the communications queue with a limit on the number of
    # channels/processes.

    queue = pprocess.Queue(limit=limit)

    # Initialise an array.

    results = [0] * N

    # Wrap the calculate function and manage it.

    calc = queue.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    <strong>for j in range(0, N):</strong>
        <strong>calc(i, j)</strong>

    # Store the results as they arrive.

    print "Finishing..."
    <strong>for i, j, result in queue:</strong>
        <strong>results[j] = result</strong>

    <strong>return i, results</strong>
</pre>

<p>Just as we see in the previous example, a function called <code>task</code>
has been defined to hold a background computation, and this function returns a
portion of the results. However, unlike the previous example or the original
example, the scope of the results of the computation collected in the function
has been changed: here, only results for calculations involving a certain
value of <code>i</code> are collected, with the particular value of
<code>i</code> returned along with the appropriate portion of the results.</p>

<p>Now, let us consider the new main program (with the relevant mechanisms
highlighted):</p>

<pre>
t = time.time()

if "--reconnect" not in sys.argv:

    # Wrap the computation and manage it.

    <strong>ptask = pprocess.MakeParallel(task)</strong>

    <strong>for i in range(0, N):</strong>

        # Make a distinct callable for each part of the computation.

        <strong>ptask_i = pprocess.BackgroundCallable("task-%d.socket" % i, ptask)</strong>

        # Perform the work.

        <strong>ptask_i(i)</strong>

    # Discard the callable.

    del ptask
    print "Discarded the callable."

if "--start" not in sys.argv:

    # Open a queue and reconnect to the task.

    print "Opening a queue."
    <strong>queue = pprocess.PersistentQueue()</strong>
    <strong>for i in range(0, N):</strong>
        <strong>queue.connect("task-%d.socket" % i)</strong>

    # Initialise an array.

    <strong>results = [0] * N</strong>

    # Wait for the results.

    print "Waiting for persistent results"
    <strong>for i, result in queue:</strong>
        <strong>results[i] = result</strong>

    # Show the results.

    for i in range(0, N):
        <strong>for result in results[i]:</strong>
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_persistent_queue.py</code> file.)</p>

<p>In the first section, the process of making a parallel-aware callable is as
expected, but instead of then invoking a single background version of such a
callable, as in the previous example, we create a version for each value of
<code>i</code> (using <code>BackgroundCallable</code>) and then invoke each
one.
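</p>

<p>(Waiting on several child processes at once, as is needed here, can be
sketched with the standard library's
<code>multiprocessing.connection.wait</code>. This is a Python 3 sketch for
Unix systems, independent of <code>pprocess</code>; the names and values are
illustrative, and each child computes and returns one row, tagged with its row
number.)</p>

```python
from multiprocessing import Pipe, Process
from multiprocessing.connection import wait

N = 4

def task(conn, i):
    "Compute one row and send it back, tagged with its row number."
    conn.send((i, [i * N + j for j in range(N)]))
    conn.close()

readers = []
for i in range(N):
    r, w = Pipe(duplex=False)
    Process(target=task, args=(w, i)).start()
    w.close()          # keep only the child's copy of the write end
    readers.append(r)

results = [0] * N
pending = list(readers)
while pending:
    # wait() reports whichever connections have data, in any order.
    for r in wait(pending):
        i, row = r.recv()
        results[i] = row
        pending.remove(r)
```

<p>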
The result of this is a total of <code>N</code> background processes, each
running an invocation of the <code>task</code> function with a distinct value
of <code>i</code> (which in turn perform computations), and each employing a
UNIX-domain socket for communication with a name of the form
<code>task-<em>i</em>.socket</code>.</p>

<p>In the second section, since we now have more than one background process,
we must find a way to monitor them after reconnecting to them; to achieve
this, a <code>PersistentQueue</code> is created, which acts like a regular
<code>Queue</code> object but is instead focused on handling persistent
communications. Upon connecting the queue to each of the previously created
UNIX-domain sockets, the queue acts like a regular <code>Queue</code> and
exposes received results through an iterator. Here, the principal difference
from previous examples is the structure of results: instead of collecting each
individual value in a flat <code>i</code> by <code>j</code> array, a list is
returned for each value of <code>i</code> and is stored directly in another
list.</p>

<h3>Applications of Background Computations</h3>

<p>Background computations are useful because they provide flexibility in the
way the results can be collected. One area in which they can be useful is Web
programming, where a process handling an incoming HTTP request may need to
initiate a computation but then immediately send output to the Web client -
such as a page indicating that the computation is "in progress" - without
having to wait for the computation or to allocate resources to monitor it.
Moreover, in some Web architectures, notably those employing the Common
Gateway Interface (CGI), it is necessary for a process handling an incoming
request to terminate before its output will be sent to clients.
By using a
<code>BackgroundCallable</code>, a Web server process can initiate a
computation, and then subsequent server processes can be used to reconnect to
the background computation and to wait efficiently for results.</p>

<h2 id="Summary">Summary</h2>

<p>The following table indicates the features used in converting each
sequential example program to its parallel counterpart:</p>

<table border="1" cellspacing="0" cellpadding="5">
  <thead>
    <tr>
      <th>Sequential Example</th>
      <th>Parallel Example</th>
      <th>Features Used</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td>simple_map</td>
      <td>simple_pmap</td>
      <td>pmap</td>
    </tr>
    <tr>
      <td>simple1</td>
      <td>simple_managed_map</td>
      <td>MakeParallel, Map, manage</td>
    </tr>
    <tr>
      <td rowspan="6">simple2</td>
      <td>simple_managed_queue</td>
      <td>MakeParallel, Queue, manage</td>
    </tr>
    <tr>
      <td>simple_continuous_queue</td>
      <td>Queue, manage (continuous)</td>
    </tr>
    <tr>
      <td>simple_managed</td>
      <td>MakeParallel, Exchange (subclass), manage, finish</td>
    </tr>
    <tr>
      <td>simple_start</td>
      <td>Channel, Exchange (subclass), start, finish</td>
    </tr>
    <tr>
      <td>simple_background_queue</td>
      <td>MakeParallel, BackgroundCallable, BackgroundQueue</td>
    </tr>
    <tr>
      <td>simple_persistent_queue</td>
      <td>MakeParallel, BackgroundCallable, PersistentQueue</td>
    </tr>
    <tr>
      <td>simple</td>
      <td>simple_create_map</td>
      <td>Channel, Map, create, exit</td>
    </tr>
  </tbody>
</table>

</body>
</html>