<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" lang="en-gb">
<head>
<meta http-equiv="content-type" content="text/html; charset=UTF-8" />
<title>pprocess - Tutorial</title>
<link href="styles.css" rel="stylesheet" type="text/css" />
</head>
<body>

<h1>pprocess - Tutorial</h1>

<p>The <code>pprocess</code> module provides several mechanisms for running
Python code concurrently in several processes. The most straightforward way of
making a program parallel-aware - that is, where the program can take
advantage of more than one processor to simultaneously process data - is to
use the <code>pmap</code> function.</p>

<ul>
<li><a href="#pmap">Converting Map-Style Code</a></li>
<li><a href="#Map">Converting Invocations to Parallel Operations</a></li>
<li><a href="#Queue">Converting Arbitrarily-Ordered Invocations</a></li>
<li><a href="#create">Converting Inline Computations</a></li>
<li><a href="#MakeReusable">Reusing Processes in Parallel Programs</a></li>
<li><a href="#BackgroundCallable">Performing Computations in Background Processes</a></li>
<li><a href="#ManagingBackgroundProcesses">Managing Several Background Processes</a></li>
<li><a href="#Summary">Summary</a></li>
</ul>

<h2 id="pmap">Converting Map-Style Code</h2>

<p>Consider a program using the built-in <code>map</code> function and a
sequence of inputs:</p>

<pre>
t = time.time()

# Initialise an array.

sequence = []
for i in range(0, N):
    for j in range(0, N):
        sequence.append((i, j))

# Perform the work.

results = map(calculate, sequence)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_map.py</code> file.)</p>

<p>The principal features of this program involve the preparation of an array
for input purposes, and the use of the <code>map</code> function to iterate
over the combinations of <code>i</code> and <code>j</code> in the array. Even
if the <code>calculate</code> function could be invoked independently for each
input value, we have to wait for each computation to complete before
initiating a new one. The <code>calculate</code> function may be defined as
follows:</p>

<pre>
def calculate(t):

    "A supposedly time-consuming calculation on 't'."

    i, j = t
    time.sleep(delay)
    return i * N + j
</pre>

<p>In order to reduce the processing time - to speed the code up, in other
words - we can make this code use several processes instead of just one. Here
is the modified code:</p>
<pre>
t = time.time()

# Initialise an array.

sequence = []
for i in range(0, N):
    for j in range(0, N):
        sequence.append((i, j))

# Perform the work.

results = <strong>pprocess.pmap</strong>(calculate, sequence<strong>, limit=limit</strong>)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_pmap.py</code> file.)</p>

<p>By replacing usage of the <code>map</code> function with the
<code>pprocess.pmap</code> function, and specifying the limit on the number of
processes to be active at any given time (the value of the <code>limit</code>
variable is defined elsewhere), several calculations can now be performed in
parallel.</p>
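<p>To see the whole conversion at a glance, here is a minimal, self-contained
sketch of the same approach; the values chosen for <code>N</code>,
<code>delay</code> and <code>limit</code> are purely illustrative and not
prescribed by <code>pprocess</code>:</p>

<pre>
import time
import pprocess

N = 10      # illustrative problem size
delay = 1   # simulated work per calculation, in seconds
limit = 4   # maximum number of simultaneously active processes

def calculate(t):

    "A supposedly time-consuming calculation on 't'."

    i, j = t
    time.sleep(delay)
    return i * N + j

sequence = [(i, j) for i in range(0, N) for j in range(0, N)]
results = pprocess.pmap(calculate, sequence, limit=limit)

for result in results:
    print result,
</pre>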
<h2 id="Map">Converting Invocations to Parallel Operations</h2>

<p>Although some programs make natural use of the <code>map</code> function,
others may employ an invocation in a nested loop. This may also be converted
to a parallel program. Consider the following Python code:</p>

<pre>
t = time.time()

# Initialise an array.

results = []

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        results.append(calculate(i, j))

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple1.py</code> file.)</p>

<p>Here, a computation in the <code>calculate</code> function is performed for
each combination of <code>i</code> and <code>j</code> in the nested loop,
returning a result value. However, we must wait for the completion of this
function for each element before moving on to the next element, and this means
that the computations are performed sequentially. Consequently, on a system
with more than one processor, even if we could call <code>calculate</code> for
more than one combination of <code>i</code> and <code>j</code> and have the
computations executing at the same time, the above program will not take
advantage of such capabilities.</p>

<p>We use a slightly modified version of <code>calculate</code> which employs
two parameters instead of one:</p>

<pre>
def calculate(i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j'.
    """

    time.sleep(delay)
    return i * N + j
</pre>

<p>In order to reduce the processing time - to speed the code up, in other
words - we can make this code use several processes instead of just one. Here
is the modified code:</p>

<pre id="simple_managed_map">
t = time.time()

# Initialise the results using a map with a limit on the number of
# channels/processes.

<strong>results = pprocess.Map(limit=limit)</strong>

# Wrap the calculate function and manage it.

<strong>calc = results.manage(pprocess.MakeParallel(calculate))</strong>

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>calc</strong>(i, j)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed_map.py</code> file.)</p>

<p>The principal changes in the above code involve the use of a
<code>pprocess.Map</code> object to collect the results, and a version of the
<code>calculate</code> function which is managed by the <code>Map</code>
object. What the <code>Map</code> object does is to arrange the results of
computations such that iterating over the object or accessing the object using
list operations provides the results in the same order as their corresponding
inputs.</p>
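<p>Because the <code>Map</code> object preserves the order of the inputs, the
slicing used above is only one way of consuming the results: plain iteration
yields them in the order in which <code>calc</code> was invoked, regardless of
which processes finished first. A hypothetical fragment, reusing the
<code>results</code> object from the above program:</p>

<pre>
# Iterate over the collected results in input order, recovering each
# (i, j) combination from the position of its result.

for k, result in enumerate(results):
    i, j = divmod(k, N)
    print i, j, result
</pre>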
<h2 id="Queue">Converting Arbitrarily-Ordered Invocations</h2>

<p>In some programs, it is not important to receive the results of
computations in any particular order, usually because either the order of
these results is irrelevant, or because the results provide "positional"
information which lets them be handled in an appropriate way. Consider the
following Python code:</p>

<pre>
t = time.time()

# Initialise an array.

results = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        i2, j2, result = calculate(i, j)
        results[i2*N+j2] = result

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple2.py</code> file.)</p>

<p>Here, a result array is initialised first and each computation is performed
sequentially. A significant difference from the previous examples is the
return value of the <code>calculate</code> function: the position details
corresponding to <code>i</code> and <code>j</code> are returned alongside the
result. This is of limited value in the above code, since the order of the
computations and the reception of results is fixed, and the example gains
nothing from parallelisation; the positional details only become genuinely
useful when, as below, results may arrive in an arbitrary order.</p>

<p>We can bring the benefits of parallel processing to the above program with
the following code:</p>

<pre id="simple_managed_queue">
t = time.time()

# Initialise the communications queue with a limit on the number of
# channels/processes.

<strong>queue = pprocess.Queue(limit=limit)</strong>

# Initialise an array.

results = [0] * N * N

# Wrap the calculate function and manage it.

<strong>calc = queue.manage(pprocess.MakeParallel(calculate))</strong>

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>calc(i, j)</strong>

# Store the results as they arrive.

print "Finishing..."
<strong>for i, j, result in queue:</strong>
    <strong>results[i*N+j] = result</strong>

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed_queue.py</code> file.)</p>

<p>This revised code employs a <code>pprocess.Queue</code> object whose
purpose is to collect the results of computations and to make them available
in the order in which they were received. The code collecting results has been
moved into a separate loop independent of the original computation loop,
taking advantage of the "positional" information emerging from the queue.</p>

<p>We can take this example further, illustrating some of the mechanisms
employed by <code>pprocess</code>. Instead of collecting results in a queue,
we can define a class containing a method which is called when new results
arrive:</p>

<pre>
class MyExchange(pprocess.Exchange):

    "Parallel convenience class containing the array assignment operation."

    def store_data(self, ch):
        i, j, result = ch.receive()
        self.D[i*N+j] = result
</pre>

<p>This code exposes the channel paradigm which is used throughout
<code>pprocess</code> and is available to applications, if desired. The effect
of the method is the storage of a result received through the channel in an
attribute of the object. The following code shows how this class can be used,
with differences to the previous program illustrated:</p>

<pre>
t = time.time()

# Initialise the communications exchange with a limit on the number of
# channels/processes.

<strong>exchange = MyExchange(limit=limit)</strong>

# Initialise an array - it is stored in the exchange to permit automatic
# assignment of values as the data arrives.

<strong>results = exchange.D = [0] * N * N</strong>

# Wrap the calculate function and manage it.

calc = <strong>exchange</strong>.manage(pprocess.MakeParallel(calculate))

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        calc(i, j)

# Wait for the results.

print "Finishing..."
<strong>exchange.finish()</strong>

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed.py</code> file.)</p>

<p>The main visible differences between this and the previous program are the
storage of the result array in the exchange, the removal of the queue
consumption code from the main program (the act of storing values now resides
in the exchange's <code>store_data</code> method), and the need to call the
<code>finish</code> method on the <code>MyExchange</code> object so that we do
not try to access the results too soon. One underlying benefit not visible in
the above code is that we no longer need to accumulate results in a queue or
other structure so that they may be processed and assigned to the correct
positions in the result array.</p>
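<p>The <code>store_data</code> method need not merely assign into an array. As
a further illustration of the channel paradigm, the following hypothetical
subclass accumulates a running total of the results instead; only the
subclassing of <code>pprocess.Exchange</code> and the use of
<code>ch.receive</code> are taken from the mechanisms described above:</p>

<pre>
class SumExchange(pprocess.Exchange):

    "A hypothetical convenience class totalling results as they arrive."

    total = 0 # class-level default, avoiding the need to override __init__

    def store_data(self, ch):

        # Each channel delivers the (i, j, result) tuple sent by the managed
        # calculate function; only the result itself is used here.

        i, j, result = ch.receive()
        self.total += result
</pre>

<p>After <code>finish</code> has been called on such an object, its
<code>total</code> attribute would hold the sum of all the results.</p>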
<p>For the curious, we may remove some of the remaining conveniences of the
above program to expose other features of <code>pprocess</code>. First, we
define a slightly modified version of the <code>calculate</code> function:</p>

<pre>
def calculate(ch, i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j', using 'ch' to
    communicate with the parent process.
    """

    time.sleep(delay)
    ch.send((i, j, i * N + j))
</pre>

<p>This function accepts a channel, <code>ch</code>, through which results
will be sent, and through which other values could potentially be received,
although we choose not to do so here. The program using this function is as
follows, with differences to the previous program illustrated:</p>

<pre>
t = time.time()

# Initialise the communications exchange with a limit on the number of
# channels/processes.

exchange = MyExchange(limit=limit)

# Initialise an array - it is stored in the exchange to permit automatic
# assignment of values as the data arrives.

results = exchange.D = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>exchange.start(calculate, i, j)</strong>

# Wait for the results.

print "Finishing..."
exchange.finish()

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_start.py</code> file.)</p>

<p>Here, we have discarded two conveniences: the wrapping of callables using
<code>MakeParallel</code>, which lets us use functions without providing any
channel parameters, and the management of callables using the
<code>manage</code> method on queues, exchanges, and so on. The
<code>start</code> method still calls the provided callable, but using a
different notation from that employed previously.</p>
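<p>The relationship between the channel-aware <code>calculate</code> above and
the plain two-parameter version suggests what <code>MakeParallel</code> does
on our behalf. The following is only a rough sketch of the idea, not the
actual <code>pprocess</code> implementation:</p>

<pre>
def make_parallel(callable):

    """
    Return a function accepting a channel as its first parameter which sends
    the result of invoking 'callable' over that channel, in the way that
    'start' and the management machinery expect.
    """

    def wrapped(ch, *args, **kw):
        ch.send(callable(*args, **kw))

    return wrapped
</pre>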
<h2 id="create">Converting Inline Computations</h2>

<p>Although many programs employ functions and other useful abstractions which
can be treated as parallelisable units, some programs perform computations
"inline", meaning that the code responsible appears directly within a loop or
related control-flow construct. Consider the following code:</p>

<pre>
t = time.time()

# Initialise an array.

results = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        time.sleep(delay)
        results[i*N+j] = i * N + j

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple.py</code> file.)</p>

<p>To simulate "work", as in the different versions of the
<code>calculate</code> function, we use the <code>time.sleep</code> function
(which does not actually do work, and which will cause a process to be
descheduled in most cases, but which simulates the delay associated with work
being done). This inline work, which must be performed sequentially in the
above program, can be performed in parallel in a somewhat modified version of
the program:</p>

<pre>
t = time.time()

# Initialise the results using a map with a limit on the number of
# channels/processes.

<strong>results = pprocess.Map(limit=limit)</strong>

# Perform the work.
# NOTE: Could use the with statement in the loop to package the
# NOTE: try...finally functionality.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>ch = results.create()</strong>
        <strong>if ch:</strong>
            <strong>try: # Calculation work.</strong>

                time.sleep(delay)
                <strong>ch.send(i * N + j)</strong>

            <strong>finally: # Important finalisation.</strong>

                <strong>pprocess.exit(ch)</strong>

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_create_map.py</code> file.)</p>

<p>Although seemingly more complicated, the bulk of the changes in this
modified program are focused on obtaining a channel object, <code>ch</code>,
at the point where the computations are performed, and the wrapping of the
computation code in a <code>try</code>...<code>finally</code> statement which
ensures that the process associated with the channel exits when the
computation is complete. In order for the results of these computations to be
collected, a <code>pprocess.Map</code> object is used, since it will maintain
the results in the same order as the initiation of the computations which
produced them.</p>
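<p>The NOTE comments in the above listing suggest packaging the
<code>try</code>...<code>finally</code> logic using the <code>with</code>
statement. A hypothetical helper along these lines, assuming Python 2.5 or
later, might look like this:</p>

<pre>
from __future__ import with_statement # only needed in Python 2.5

from contextlib import contextmanager

@contextmanager
def computation(exchange):

    """
    A hypothetical context manager yielding a channel for a single
    computation and ensuring that the created process always exits.
    """

    ch = exchange.create()
    try:
        yield ch
    finally:
        # Only the created process obtains a channel and must exit here;
        # the parent process just continues with its loop.
        if ch:
            pprocess.exit(ch)

# The body of the nested loops above would then become:
#
#     with computation(results) as ch:
#         if ch:
#             time.sleep(delay)
#             ch.send(i * N + j)
</pre>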
<h2 id="MakeReusable">Reusing Processes in Parallel Programs</h2>

<p>One notable aspect of the above programs when parallelised is that each
invocation of a computation in parallel creates a new process in which the
computation is to be performed, regardless of whether existing processes had
just finished producing results and could theoretically have been asked to
perform new computations. In other words, processes were created and destroyed
instead of being reused.</p>

<p>However, we can request that processes be reused for computations by
enabling the <code>reuse</code> feature of exchange-like objects and employing
suitable reusable callables. Consider this modified version of the <a
href="#simple_managed_map">simple_managed_map</a> program:</p>

<pre>
t = time.time()

# Initialise the results using a map with a limit on the number of
# channels/processes.

results = pprocess.Map(limit=limit<strong>, reuse=1</strong>)

# Wrap the calculate function and manage it.

calc = results.manage(pprocess.Make<strong>Reusable</strong>(calculate))

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        calc(i, j)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed_map_reusable.py</code> file.)</p>

<p>By indicating that processes and channels shall be reused, and by wrapping
the <code>calculate</code> function with the necessary support, the
computations may be performed in parallel using a pool of processes instead of
creating a new process for each computation and then discarding it, only to
create a new process for the next computation.</p>

<h2 id="BackgroundCallable">Performing Computations in Background Processes</h2>

<p>Occasionally, it is desirable not only to initiate time-consuming
computations and to leave them running in the background, but also to detach
the creating process from them completely, potentially terminating it
altogether, while remaining able to collect the results at a later time,
potentially in a completely different process. For such situations, we can
make use of the <code>BackgroundCallable</code> class, which converts a
parallel-aware callable into a callable which will run in a background process
when invoked.</p>

<p>Consider this excerpt from a modified version of the <a
href="#simple_managed_queue">simple_managed_queue</a> program:</p>

<pre>
<strong>def task():</strong>

    # Initialise the communications queue with a limit on the number of
    # channels/processes.

    queue = pprocess.Queue(limit=limit)

    # Initialise an array.

    results = [0] * N * N

    # Wrap the calculate function and manage it.

    calc = queue.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            calc(i, j)

    # Store the results as they arrive.

    print "Finishing..."
    for i, j, result in queue:
        results[i*N+j] = result

    <strong>return results</strong>
</pre>

<p>Here, we have converted the main program into a function, and instead of
printing out the results, we return the results list from the function.</p>

<p>Now, let us consider the new main program (with the relevant mechanisms
highlighted):</p>

<pre>
t = time.time()

if "--reconnect" not in sys.argv:

    # Wrap the computation and manage it.

    <strong>ptask = pprocess.BackgroundCallable("task.socket", pprocess.MakeParallel(task))</strong>

    # Perform the work.

    ptask()

    # Discard the callable.

    del ptask
    print "Discarded the callable."

if "--start" not in sys.argv:

    # Open a queue and reconnect to the task.

    print "Opening a queue."
    <strong>queue = pprocess.BackgroundQueue("task.socket")</strong>

    # Wait for the results.

    print "Waiting for persistent results"
    for results in queue:
        pass # should only be one element

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_background_queue.py</code> file.)</p>

<p>This new main program has two parts: the part which initiates the
computation, and the part which connects to the computation in order to collect
the results. Both parts can be run in the same process, and this should result
in similar behaviour to that of the original
<a href="#simple_managed_queue">simple_managed_queue</a> program.</p>

<p>In the above program, however, we are free to specify <code>--start</code> as
an option when running the program, and the result of this is merely to initiate
the computation in a background process, using <code>BackgroundCallable</code>
to obtain a callable which, when invoked, creates the background process and
runs the computation. After doing this, the program will then exit, but it will
leave the computation running as a collection of background processes, and a
special file called <code>task.socket</code> will exist in the current working
directory.</p>

<p>When the above program is run using the <code>--reconnect</code> option, an
attempt will be made to reconnect to the background processes already created by
attempting to contact them using the previously created <code>task.socket</code>
special file (which is, in fact, a UNIX-domain socket); this is done using
the <code>BackgroundQueue</code> function, which will handle the incoming results
in a fashion similar to that of a <code>Queue</code> object. Since only one
result is returned by the computation (as defined by the <code>return</code>
statement in the <code>task</code> function), we need only expect one element to
be collected by the queue: a list containing all of the results produced in the
computation.</p>
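<p>In other words, the program might first be run with the <code>--start</code>
option and allowed to exit, leaving the computation running in the background;
some time later - potentially from a completely different process - running it
again with the <code>--reconnect</code> option would collect the accumulated
results through <code>task.socket</code>.</p>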
<h2 id="ManagingBackgroundProcesses">Managing Several Background Processes</h2>

<p>In the above example, a single background process was used to manage a number
of other processes, with all of them running in the background. However, it can
be desirable to manage more than one background process.</p>

<p>Consider this excerpt from a modified version of the <a
href="#simple_managed_queue">simple_managed_queue</a> program:</p>

<pre>
<strong>def task(i):</strong>

    # Initialise the communications queue with a limit on the number of
    # channels/processes.

    queue = pprocess.Queue(limit=limit)

    # Initialise an array.

    results = [0] * N

    # Wrap the calculate function and manage it.

    calc = queue.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    <strong>for j in range(0, N):</strong>
        <strong>calc(i, j)</strong>

    # Store the results as they arrive.

    print "Finishing..."
    <strong>for i, j, result in queue:</strong>
        <strong>results[j] = result</strong>

    <strong>return i, results</strong>
</pre>

<p>Just as we see in the previous example, a function called <code>task</code>
has been defined to hold a background computation, and this function returns a
portion of the results. However, unlike the previous example or the original
example, the scope of the results of the computation collected in the function
has been changed: here, only results for calculations involving a certain value
of <code>i</code> are collected, with the particular value of <code>i</code>
returned along with the appropriate portion of the results.</p>
<p>Now, let us consider the new main program (with the relevant mechanisms
highlighted):</p>

<pre>
t = time.time()

if "--reconnect" not in sys.argv:

    # Wrap the computation and manage it.

    <strong>ptask = pprocess.MakeParallel(task)</strong>

    <strong>for i in range(0, N):</strong>

        # Make a distinct callable for each part of the computation.

        <strong>ptask_i = pprocess.BackgroundCallable("task-%d.socket" % i, ptask)</strong>

        # Perform the work.

        <strong>ptask_i(i)</strong>

    # Discard the callable.

    del ptask
    print "Discarded the callable."

if "--start" not in sys.argv:

    # Open a queue and reconnect to the task.

    print "Opening a queue."
    <strong>queue = pprocess.PersistentQueue()</strong>
    <strong>for i in range(0, N):</strong>
        <strong>queue.connect("task-%d.socket" % i)</strong>

    # Initialise an array.

    <strong>results = [0] * N</strong>

    # Wait for the results.

    print "Waiting for persistent results"
    <strong>for i, result in queue:</strong>
        <strong>results[i] = result</strong>

    # Show the results.

    for i in range(0, N):
        <strong>for result in results[i]:</strong>
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_persistent_queue.py</code> file.)</p>

<p>In the first section, the process of making a parallel-aware callable is as
expected, but instead of then invoking a single background version of such a
callable, as in the previous example, we create a version for each value of
<code>i</code> (using <code>BackgroundCallable</code>) and then invoke each one.
The result of this is a total of <code>N</code> background processes, each
running an invocation of the <code>task</code> function with a distinct value of
<code>i</code> (which in turn perform computations), and each employing a
UNIX-domain socket for communication with a name of the form
<code>task-<em>i</em>.socket</code>.</p>

<p>In the second section, since we now have more than one background process, we
must find a way to monitor them after reconnecting to them; to achieve this, a
<code>PersistentQueue</code> is created, which acts like a regular
<code>Queue</code> object but is instead focused on handling persistent
communications. Upon connecting the queue to each of the previously created
UNIX-domain sockets, the queue acts like a regular <code>Queue</code> and
exposes received results through an iterator. Here, the principal difference
from previous examples is the structure of results: instead of collecting each
individual value in a flat <code>i</code> by <code>j</code> array, a list is
returned for each value of <code>i</code> and is stored directly in another
list.</p>
<h3>Applications of Background Computations</h3>

<p>Background computations are useful because they provide flexibility in the
way the results can be collected. One area in which they can be useful is Web
programming, where a process handling an incoming HTTP request may need to
initiate a computation but then immediately send output to the Web client - such
as a page indicating that the computation is "in progress" - without having to
wait for the computation or to allocate resources to monitor it. Moreover, in
some Web architectures, notably those employing the Common Gateway Interface
(CGI), it is necessary for a process handling an incoming request to terminate
before its output will be sent to clients. By using a
<code>BackgroundCallable</code>, a Web server process can initiate a
computation, and then subsequent server processes can be used to reconnect to
the background computation and to wait efficiently for results.</p>

<h2 id="Summary">Summary</h2>

<p>The following table indicates the features used in converting each
sequential example program to its parallel equivalent:</p>

<table border="1" cellspacing="0" cellpadding="5">
  <thead>
    <tr>
      <th>Sequential Example</th>
      <th>Parallel Example</th>
      <th>Features Used</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td>simple_map</td>
      <td>simple_pmap</td>
      <td>pmap</td>
    </tr>
    <tr>
      <td>simple1</td>
      <td>simple_managed_map</td>
      <td>MakeParallel, Map, manage</td>
    </tr>
    <tr>
      <td rowspan="5">simple2</td>
      <td>simple_managed_queue</td>
      <td>MakeParallel, Queue, manage</td>
    </tr>
    <tr>
      <td>simple_managed</td>
      <td>MakeParallel, Exchange (subclass), manage, finish</td>
    </tr>
    <tr>
      <td>simple_start</td>
      <td>Channel, Exchange (subclass), start, finish</td>
    </tr>
    <tr>
      <td>simple_background_queue</td>
      <td>MakeParallel, BackgroundCallable, BackgroundQueue</td>
    </tr>
    <tr>
      <td>simple_persistent_queue</td>
      <td>MakeParallel, BackgroundCallable, PersistentQueue</td>
    </tr>
    <tr>
      <td>simple</td>
      <td>simple_create_map</td>
      <td>Channel, Map, create, exit</td>
    </tr>
  </tbody>
</table>

</body>
</html>