<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" lang="en-gb">
<head>
<meta http-equiv="content-type" content="text/html; charset=UTF-8" />
<title>pprocess - Tutorial</title>
<link href="styles.css" rel="stylesheet" type="text/css" />
</head>
<body>

<h1>pprocess - Tutorial</h1>

<p>The <code>pprocess</code> module provides a number of mechanisms for running
Python code concurrently in several processes. The most straightforward way of
making a program parallel-aware - that is, where the program can take
advantage of more than one processor to process data simultaneously - is to
use the <code>pmap</code> function.</p>

<ul>
<li><a href="#pmap">Converting Map-Style Code</a></li>
<li><a href="#Map">Converting Invocations to Parallel Operations</a></li>
<li><a href="#Queue">Converting Arbitrarily-Ordered Invocations</a></li>
<li><a href="#create">Converting Inline Computations</a></li>
<li><a href="#MakeReusable">Reusing Processes in Parallel Programs</a></li>
<li><a href="#continuous">Supporting Continuous Processes in Parallel Programs</a></li>
<li><a href="#BackgroundCallable">Performing Computations in Background Processes</a></li>
<li><a href="#ManagingBackgroundProcesses">Managing Several Background Processes</a></li>
<li><a href="#Summary">Summary</a></li>
</ul>

<p>For a brief summary of each of the features of <code>pprocess</code>, see
the <a href="reference.html">reference document</a>.</p>

<h2 id="pmap">Converting Map-Style Code</h2>

<p>Consider a program using the built-in <code>map</code> function and a sequence of inputs:</p>

<pre>
t = time.time()

# Initialise an array.

sequence = []
for i in range(0, N):
    for j in range(0, N):
        sequence.append((i, j))

# Perform the work.

results = map(calculate, sequence)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_map.py</code> file.)</p>

<p>The principal features of this program involve the preparation of an array
for input purposes, and the use of the <code>map</code> function to iterate
over the combinations of <code>i</code> and <code>j</code> in the array. Even
if the <code>calculate</code> function could be invoked independently for each
input value, we have to wait for each computation to complete before
initiating a new one. The <code>calculate</code> function may be defined as
follows:</p>

<pre>
def calculate(t):

    "A supposedly time-consuming calculation on 't'."

    i, j = t
    time.sleep(delay)
    return i * N + j
</pre>

<p>In order to reduce the processing time - to speed the code up, in other
words - we can make this code use several processes instead of just one. Here
is the modified code:</p>

<pre>
t = time.time()

# Initialise an array.

sequence = []
for i in range(0, N):
    for j in range(0, N):
        sequence.append((i, j))

# Perform the work.

results = <strong>pprocess.pmap</strong>(calculate, sequence<strong>, limit=limit</strong>)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_pmap.py</code> file.)</p>

<p>By replacing usage of the <code>map</code> function with the
<code>pprocess.pmap</code> function, and specifying the limit on the number of
processes to be active at any given time (the value of the <code>limit</code>
variable is defined elsewhere), several calculations can now be performed in
parallel.</p>
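<p>As a minimal, self-contained sketch of the same idea (the
<code>double</code> function, the input range and the delay below are purely
illustrative and not part of the examples), the following code applies a slow
function to a list of numbers using at most four processes, with the results
corresponding to the inputs in order, just as with the built-in
<code>map</code>:</p>

<pre>
import time
import pprocess

def double(n):

    "A deliberately slow doubling of 'n'."

    time.sleep(0.5)
    return n * 2

# Perform at most four computations at any one time.

for result in pprocess.pmap(double, range(10), limit=4):
    print result,
print
</pre>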
<h2 id="Map">Converting Invocations to Parallel Operations</h2>

<p>Although some programs make natural use of the <code>map</code> function,
others may employ an invocation in a nested loop. This may also be converted
to a parallel program. Consider the following Python code:</p>

<pre>
t = time.time()

# Initialise an array.

results = []

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        results.append(calculate(i, j))

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple1.py</code> file.)</p>

<p>Here, a computation in the <code>calculate</code> function is performed for
each combination of <code>i</code> and <code>j</code> in the nested loop,
returning a result value. However, we must wait for the completion of this
function for each element before moving on to the next element, and this means
that the computations are performed sequentially. Consequently, on a system
with more than one processor, even if we could call <code>calculate</code> for
more than one combination of <code>i</code> and <code>j</code>
and have the computations executing at the same time, the above program will
not take advantage of such capabilities.</p>

<p>We use a slightly modified version of <code>calculate</code> which employs
two parameters instead of one:</p>

<pre>
def calculate(i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j'.
    """

    time.sleep(delay)
    return i * N + j
</pre>

<p>In order to reduce the processing time - to speed the code up, in other
words - we can make this code use several processes instead of just one. Here
is the modified code:</p>

<pre id="simple_managed_map">
t = time.time()

# Initialise the results using a map with a limit on the number of
# channels/processes.

<strong>results = pprocess.Map(limit=limit)</strong>

# Wrap the calculate function and manage it.

<strong>calc = results.manage(pprocess.MakeParallel(calculate))</strong>

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>calc</strong>(i, j)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed_map.py</code> file.)</p>

<p>The principal changes in the above code involve the use of a
<code>pprocess.Map</code> object to collect the results, and a version of the
<code>calculate</code> function which is managed by the <code>Map</code>
object. What the <code>Map</code> object does is to arrange the results of
computations such that iterating over the object or accessing the object using
list operations provides the results in the same order as their corresponding
inputs.</p>
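<p>The ordering guarantee can be seen in a small, self-contained sketch (the
<code>slow_identity</code> function and its delays are purely illustrative):
even though the earlier calls take longer to complete, iterating over the
<code>Map</code> object still yields the results in the order in which the
calls were made.</p>

<pre>
import time
import pprocess

def slow_identity(n):

    "Return 'n' after a delay which is longer for smaller values."

    time.sleep((10 - n) * 0.1)
    return n

results = pprocess.Map(limit=4)
slow = results.manage(pprocess.MakeParallel(slow_identity))

for n in range(10):
    slow(n)

# Prints 0 1 2 3 4 5 6 7 8 9 regardless of completion order.

for result in results:
    print result,
print
</pre>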
<h2 id="Queue">Converting Arbitrarily-Ordered Invocations</h2>

<p>In some programs, it is not important to receive the results of
computations in any particular order, usually because either the order of
these results is irrelevant, or because the results provide "positional"
information which lets them be handled in an appropriate way. Consider the
following Python code:</p>

<pre>
t = time.time()

# Initialise an array.

results = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        i2, j2, result = calculate(i, j)
        results[i2*N+j2] = result

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple2.py</code> file.)</p>

<p>Here, a result array is initialised first and each computation is performed
sequentially. A significant difference to the previous examples is the return
value of the <code>calculate</code> function: the position details
corresponding to <code>i</code> and <code>j</code> are returned alongside the
result. Obviously, this is of limited value in the above code because the
order of the computations and the reception of results is fixed. Moreover, we
gain no benefit from parallelisation in the above example.</p>

<p>We can bring the benefits of parallel processing to the above program with
the following code:</p>

<pre id="simple_managed_queue">
t = time.time()

# Initialise the communications queue with a limit on the number of
# channels/processes.

<strong>queue = pprocess.Queue(limit=limit)</strong>

# Initialise an array.

results = [0] * N * N

# Wrap the calculate function and manage it.

<strong>calc = queue.manage(pprocess.MakeParallel(calculate))</strong>

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>calc(i, j)</strong>

# Store the results as they arrive.

print "Finishing..."
<strong>for i, j, result in queue:</strong>
    <strong>results[i*N+j] = result</strong>

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed_queue.py</code> file.)</p>

<p>This revised code employs a <code>pprocess.Queue</code> object whose
purpose is to collect the results of computations and to make them available
in the order in which they were received. The code collecting results has been
moved into a separate loop independent of the original computation loop,
taking advantage of the more relevant "positional" information emerging from
the queue.</p>
<p>We can take this example further, illustrating some of the mechanisms
employed by <code>pprocess</code>. Instead of collecting results in a queue,
we can define a class containing a method which is called when new results
arrive:</p>

<pre>
class MyExchange(pprocess.Exchange):

    "Parallel convenience class containing the array assignment operation."

    def store_data(self, ch):
        i, j, result = ch.receive()
        self.D[i*N+j] = result
</pre>

<p>This code exposes the channel paradigm which is used throughout
<code>pprocess</code> and is available to applications, if desired. The effect
of the method is the storage of a result received through the channel in an
attribute of the object. The following code shows how this class can be used,
with differences to the previous program illustrated:</p>

<pre>
t = time.time()

# Initialise the communications exchange with a limit on the number of
# channels/processes.

<strong>exchange = MyExchange(limit=limit)</strong>

# Initialise an array - it is stored in the exchange to permit automatic
# assignment of values as the data arrives.

<strong>results = exchange.D = [0] * N * N</strong>

# Wrap the calculate function and manage it.

calc = <strong>exchange</strong>.manage(pprocess.MakeParallel(calculate))

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        calc(i, j)

# Wait for the results.

print "Finishing..."
<strong>exchange.finish()</strong>

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed.py</code> file.)</p>

<p>The main visible differences between this and the previous program are the
storage of the result array in the exchange, the relocation of the queue
consumption code from the main program into the exchange's
<code>store_data</code> method, and the need to call the
<code>finish</code> method on the <code>MyExchange</code> object so that we do
not try to access the results too soon. One underlying benefit not visible in
the above code is that we no longer need to accumulate results in a queue or
other structure so that they may be processed and assigned to the correct
positions in the result array.</p>
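<p>Because <code>store_data</code> receives each incoming result itself, an
<code>Exchange</code> subclass is free to do something other than fill in an
array. As an illustrative aside (the <code>SummingExchange</code> class below
is hypothetical and not part of <code>pprocess</code>), the results could just
as easily be accumulated into a running total as they arrive:</p>

<pre>
class SummingExchange(pprocess.Exchange):

    "Parallel convenience class maintaining a running total of the results."

    def store_data(self, ch):
        i, j, result = ch.receive()
        self.total += result

exchange = SummingExchange(limit=limit)
exchange.total = 0
calc = exchange.manage(pprocess.MakeParallel(calculate))

for i in range(0, N):
    for j in range(0, N):
        calc(i, j)

exchange.finish()
print "Total:", exchange.total
</pre>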
<p>For the curious, we may remove some of the remaining conveniences of the
<code>simple_managed</code> program to expose other features of
<code>pprocess</code>. First, we define a slightly modified version of the
<code>calculate</code> function:</p>

<pre>
def calculate(ch, i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j', using 'ch' to
    communicate with the parent process.
    """

    time.sleep(delay)
    ch.send((i, j, i * N + j))
</pre>

<p>This function accepts a channel, <code>ch</code>, through which results
will be sent, and through which other values could potentially be received,
although we choose not to do so here. The program using this function is as
follows, with differences to the previous program illustrated:</p>

<pre>
t = time.time()

# Initialise the communications exchange with a limit on the number of
# channels/processes.

exchange = MyExchange(limit=limit)

# Initialise an array - it is stored in the exchange to permit automatic
# assignment of values as the data arrives.

results = exchange.D = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>exchange.start(calculate, i, j)</strong>

# Wait for the results.

print "Finishing..."
exchange.finish()

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_start.py</code> file.)</p>

<p>Here, we have discarded two conveniences: the wrapping of callables using
<code>MakeParallel</code>, which lets us use functions without providing any
channel parameters, and the management of callables using the
<code>manage</code> method on queues, exchanges, and so on. The
<code>start</code> method still calls the provided callable, but using a
different notation from that employed previously.</p>

<h2 id="create">Converting Inline Computations</h2>

<p>Although many programs employ functions and other useful abstractions which
can be treated as parallelisable units, some programs perform computations
"inline", meaning that the code responsible appears directly within a loop or
related control-flow construct. Consider the following code:</p>

<pre>
t = time.time()

# Initialise an array.

results = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        time.sleep(delay)
        results[i*N+j] = i * N + j

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple.py</code> file.)</p>

<p>To simulate "work", as in the different versions of the
<code>calculate</code> function, we use the <code>time.sleep</code> function
(which does not actually do work, and which will cause a process to be
descheduled in most cases, but which simulates the delay associated with work
being done). This inline work, which must be performed sequentially in the
above program, can be performed in parallel in a somewhat modified version of
the program:</p>
<pre>
t = time.time()

# Initialise the results using a map with a limit on the number of
# channels/processes.

<strong>results = pprocess.Map(limit=limit)</strong>

# Perform the work.
# NOTE: Could use the with statement in the loop to package the
# NOTE: try...finally functionality.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>ch = results.create()</strong>
        <strong>if ch:</strong>
            <strong>try: # Calculation work.</strong>

                time.sleep(delay)
                <strong>ch.send(i * N + j)</strong>

            <strong>finally: # Important finalisation.</strong>

                <strong>pprocess.exit(ch)</strong>

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_create_map.py</code> file.)</p>

<p>Although seemingly more complicated, the bulk of the changes in this
modified program are focused on obtaining a channel object, <code>ch</code>,
at the point where the computations are performed, and the wrapping of the
computation code in a <code>try</code>...<code>finally</code> statement which
ensures that the process associated with the channel exits when the
computation is complete. In order for the results of these computations to be
collected, a <code>pprocess.Map</code> object is used, since it will maintain
the results in the same order as the initiation of the computations which
produced them.</p>
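<p>The NOTE comments in the above program suggest that the
<code>try</code>...<code>finally</code> handling could be packaged using the
<code>with</code> statement. A possible sketch of such a helper is shown below;
the <code>computation</code> context manager is purely illustrative and not
part of <code>pprocess</code>, it reuses the names from the program above, and
it assumes a Python version providing the <code>with</code> statement and
<code>contextlib</code>:</p>

<pre>
from contextlib import contextmanager

@contextmanager
def computation(exchange_like):

    """
    Create a channel from 'exchange_like' (such as a pprocess.Map object),
    yielding the channel in the created process and a false value in the
    parent process, and ensuring that any created process exits afterwards.
    """

    ch = exchange_like.create()
    try:
        yield ch
    finally:
        if ch:
            pprocess.exit(ch)

# The inner loop of the above program could then read as follows:

for i in range(0, N):
    for j in range(0, N):
        with computation(results) as ch:
            if ch:
                time.sleep(delay)
                ch.send(i * N + j)
</pre>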
<h2 id="MakeReusable">Reusing Processes in Parallel Programs</h2>

<p>One notable aspect of the above programs when parallelised is that each
invocation of a computation in parallel creates a new process in which the
computation is to be performed, regardless of whether existing processes had
just finished producing results and could theoretically have been asked to
perform new computations. In other words, processes were created and destroyed
instead of being reused.</p>

<p>However, we can request that processes be reused for computations by
enabling the <code>reuse</code> feature of exchange-like objects and employing
suitable reusable callables. Consider this modified version of the <a
href="#simple_managed_map">simple_managed_map</a> program:</p>

<pre>
t = time.time()

# Initialise the results using a map with a limit on the number of
# channels/processes.

results = pprocess.Map(limit=limit<strong>, reuse=1</strong>)

# Wrap the calculate function and manage it.

calc = results.manage(pprocess.Make<strong>Reusable</strong>(calculate))

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        calc(i, j)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_manage_map_reusable.py</code> file.)</p>

<p>By indicating that processes and channels shall be reused, and by wrapping
the <code>calculate</code> function with the necessary support, the
computations may be performed in parallel using a pool of processes instead of
creating a new process for each computation and then discarding it, only to
create a new process for the next computation.</p>

<h2 id="BackgroundCallable">Performing Computations in Background Processes</h2>

<p>Occasionally, it is desirable to initiate time-consuming computations and
not only to leave such processes running in the background, but also to detach
the creating process from them completely - potentially terminating the
creating process altogether - and yet still be able to collect the results of
the created processes at a later time, potentially in a completely different
process. For such situations, we can make use of the
<code>BackgroundCallable</code> class, which converts a parallel-aware
callable into a callable which will run in a background process when
invoked.</p>

<p>Consider this excerpt from a modified version of the <a
href="#simple_managed_queue">simple_managed_queue</a> program:</p>

<pre>
<strong>def task():</strong>

    # Initialise the communications queue with a limit on the number of
    # channels/processes.

    queue = pprocess.Queue(limit=limit)

    # Initialise an array.

    results = [0] * N * N

    # Wrap the calculate function and manage it.

    calc = queue.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            calc(i, j)

    # Store the results as they arrive.

    print "Finishing..."
    for i, j, result in queue:
        results[i*N+j] = result

    <strong>return results</strong>
</pre>

<p>Here, we have converted the main program into a function, and instead of
printing out the results, we return the results list from the function.</p>

<p>Now, let us consider the new main program (with the relevant mechanisms
highlighted):</p>

<pre>
t = time.time()

if "--reconnect" not in sys.argv:

    # Wrap the computation and manage it.

    <strong>ptask = pprocess.BackgroundCallable("task.socket", pprocess.MakeParallel(task))</strong>

    # Perform the work.

    ptask()

    # Discard the callable.

    del ptask
    print "Discarded the callable."

if "--start" not in sys.argv:

    # Open a queue and reconnect to the task.

    print "Opening a queue."
    <strong>queue = pprocess.BackgroundQueue("task.socket")</strong>

    # Wait for the results.

    print "Waiting for persistent results"
    for results in queue:
        pass # should only be one element

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_background_queue.py</code> file.)</p>

<p>This new main program has two parts: the part which initiates the
computation, and the part which connects to the computation in order to collect
the results. Both parts can be run in the same process, and this should result
in similar behaviour to that of the original
<a href="#simple_managed_queue">simple_managed_queue</a> program.</p>

<p>In the above program, however, we are free to specify <code>--start</code> as
an option when running the program, and the result of this is merely to initiate
the computation in a background process, using <code>BackgroundCallable</code>
to obtain a callable which, when invoked, creates the background process and
runs the computation. After doing this, the program exits, but it leaves the
computation running as a collection of background processes, and a special file
called <code>task.socket</code> will exist in the current working
directory.</p>

<p>When the above program is run using the <code>--reconnect</code> option, an
attempt will be made to reconnect to the background processes already created,
contacting them using the previously created <code>task.socket</code> special
file (which is, in fact, a UNIX-domain socket). This is done using the
<code>BackgroundQueue</code> function, which will handle the incoming results
in a fashion similar to that of a <code>Queue</code> object. Since only one
result is returned by the computation (as defined by the <code>return</code>
statement in the <code>task</code> function), we need only expect one element to
be collected by the queue: a list containing all of the results produced in the
computation.</p>
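<p>For example, assuming the example is run directly with the Python
interpreter from the <code>examples</code> directory, invoking
<code>python simple_background_queue.py --start</code> merely starts the
computation and exits, leaving <code>task.socket</code> behind; invoking
<code>python simple_background_queue.py --reconnect</code> later - even from a
different shell session, provided the working directory is the same - collects
and displays the results.</p>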
<h2 id="continuous">Supporting Continuous Processes in Parallel Programs</h2>

<p><strong>To be written.</strong></p>

<h2 id="ManagingBackgroundProcesses">Managing Several Background Processes</h2>

<p>In the above example, a single background process was used to manage a number
of other processes, with all of them running in the background. However, it can
be desirable to manage more than one background process.</p>

<p>Consider this excerpt from a modified version of the <a
href="#simple_managed_queue">simple_managed_queue</a> program:</p>

<pre>
<strong>def task(i):</strong>

    # Initialise the communications queue with a limit on the number of
    # channels/processes.

    queue = pprocess.Queue(limit=limit)

    # Initialise an array.

    results = [0] * N

    # Wrap the calculate function and manage it.

    calc = queue.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    <strong>for j in range(0, N):</strong>
        <strong>calc(i, j)</strong>

    # Store the results as they arrive.

    print "Finishing..."
    <strong>for i, j, result in queue:</strong>
        <strong>results[j] = result</strong>

    <strong>return i, results</strong>
</pre>
<p>Just as we see in the previous example, a function called <code>task</code>
has been defined to hold a background computation, and this function returns a
portion of the results. However, unlike the previous example or the original
example, the scope of the results collected in the function has been changed:
here, only results for calculations involving a certain value of
<code>i</code> are collected, with the particular value of <code>i</code>
returned along with the appropriate portion of the results.</p>

<p>Now, let us consider the new main program (with the relevant mechanisms
highlighted):</p>

<pre>
t = time.time()

if "--reconnect" not in sys.argv:

    # Wrap the computation and manage it.

    <strong>ptask = pprocess.MakeParallel(task)</strong>

    <strong>for i in range(0, N):</strong>

        # Make a distinct callable for each part of the computation.

        <strong>ptask_i = pprocess.BackgroundCallable("task-%d.socket" % i, ptask)</strong>

        # Perform the work.

        <strong>ptask_i(i)</strong>

    # Discard the callable.

    del ptask
    print "Discarded the callable."

if "--start" not in sys.argv:

    # Open a queue and reconnect to the task.

    print "Opening a queue."
    <strong>queue = pprocess.PersistentQueue()</strong>
    <strong>for i in range(0, N):</strong>
        <strong>queue.connect("task-%d.socket" % i)</strong>

    # Initialise an array.

    <strong>results = [0] * N</strong>

    # Wait for the results.

    print "Waiting for persistent results"
    <strong>for i, result in queue:</strong>
        <strong>results[i] = result</strong>

    # Show the results.

    for i in range(0, N):
        <strong>for result in results[i]:</strong>
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_persistent_queue.py</code> file.)</p>

<p>In the first section, the process of making a parallel-aware callable is as
expected, but instead of then invoking a single background version of such a
callable, as in the previous example, we create a version for each value of
<code>i</code> (using <code>BackgroundCallable</code>) and then invoke each one.
The result of this is a total of <code>N</code> background processes, each
running an invocation of the <code>task</code> function with a distinct value of
<code>i</code> (which in turn perform computations), and each employing a
UNIX-domain socket for communication with a name of the form
<code>task-<em>i</em>.socket</code>.</p>

<p>In the second section, since we now have more than one background process, we
must find a way to monitor them after reconnecting to them; to achieve this, a
<code>PersistentQueue</code> is created, which acts like a regular
<code>Queue</code> object but is instead focused on handling persistent
communications. Upon connecting the queue to each of the previously created
UNIX-domain sockets, the queue behaves like a regular <code>Queue</code>,
exposing received results through an iterator.
Here, the principal difference
from previous examples is the structure of results: instead of collecting each
individual value in a flat <code>i</code> by <code>j</code> array, a list is
returned for each value of <code>i</code> and is stored directly in another
list.</p>

<h3>Applications of Background Computations</h3>

<p>Background computations are useful because they provide flexibility in the
way the results can be collected. One area in which they can be useful is Web
programming, where a process handling an incoming HTTP request may need to
initiate a computation but then immediately send output to the Web client - such
as a page indicating that the computation is "in progress" - without having to
wait for the computation or to allocate resources to monitor it. Moreover, in
some Web architectures, notably those employing the Common Gateway Interface
(CGI), it is necessary for a process handling an incoming request to terminate
before its output will be sent to clients. By using a
<code>BackgroundCallable</code>, a Web server process can initiate a
computation, and then subsequent server processes can be used to reconnect to
the background computation and to wait efficiently for results.</p>

<h2 id="Summary">Summary</h2>

<p>The following table indicates the features used in converting each
sequential example program into its parallel counterpart:</p>

<table border="1" cellspacing="0" cellpadding="5">
  <thead>
    <tr>
      <th>Sequential Example</th>
      <th>Parallel Example</th>
      <th>Features Used</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td>simple_map</td>
      <td>simple_pmap</td>
      <td>pmap</td>
    </tr>
    <tr>
      <td>simple1</td>
      <td>simple_managed_map</td>
      <td>MakeParallel, Map, manage</td>
    </tr>
    <tr>
      <td rowspan="5">simple2</td>
      <td>simple_managed_queue</td>
      <td>MakeParallel, Queue, manage</td>
    </tr>
    <tr>
      <td>simple_managed</td>
      <td>MakeParallel, Exchange (subclass), manage, finish</td>
    </tr>
    <tr>
      <td>simple_start</td>
      <td>Channel, Exchange (subclass), start, finish</td>
    </tr>
    <tr>
      <td>simple_background_queue</td>
      <td>MakeParallel, BackgroundCallable, BackgroundQueue</td>
    </tr>
    <tr>
      <td>simple_persistent_queue</td>
      <td>MakeParallel, BackgroundCallable, PersistentQueue</td>
    </tr>
    <tr>
      <td>simple</td>
      <td>simple_create_map</td>
      <td>Channel, Map, create, exit</td>
    </tr>
  </tbody>
</table>

</body>
</html>