<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" lang="en-gb">
<head>
<meta http-equiv="content-type" content="text/html; charset=UTF-8" />
<title>pprocess - Tutorial</title>
<link href="styles.css" rel="stylesheet" type="text/css" />
</head>
<body>

<h1>pprocess - Tutorial</h1>

<p>The <code>pprocess</code> module provides several mechanisms for running
Python code concurrently in several processes. The most straightforward way of
making a program parallel-aware - that is, where the program can take
advantage of more than one processor to simultaneously process data - is to
use the <code>pmap</code> function.</p>

<ul>
<li><a href="#pmap">Converting Map-Style Code</a></li>
<li><a href="#Map">Converting Invocations to Parallel Operations</a></li>
<li><a href="#Queue">Converting Arbitrarily-Ordered Invocations</a></li>
<li><a href="#create">Converting Inline Computations</a></li>
<li><a href="#MakeReusable">Reusing Processes in Parallel Programs</a></li>
<li><a href="#BackgroundCallable">Performing Computations in Background Processes</a></li>
<li><a href="#ManagingBackgroundProcesses">Managing Several Background Processes</a></li>
<li><a href="#Summary">Summary</a></li>
</ul>

<p>For a brief summary of each of the features of <code>pprocess</code>, see
the <a href="reference.html">reference document</a>.</p>

<h2 id="pmap">Converting Map-Style Code</h2>

<p>Consider a program using the built-in <code>map</code> function and a
sequence of inputs:</p>

<pre>
t = time.time()

# Initialise an array.

sequence = []
for i in range(0, N):
    for j in range(0, N):
        sequence.append((i, j))

# Perform the work.

results = map(calculate, sequence)

# Show the results.
for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_map.py</code> file.)</p>

<p>The principal features of this program involve the preparation of an array
for input purposes, and the use of the <code>map</code> function to iterate
over the combinations of <code>i</code> and <code>j</code> in the array. Even
if the <code>calculate</code> function could be invoked independently for each
input value, we have to wait for each computation to complete before
initiating a new one. The <code>calculate</code> function may be defined as
follows:</p>

<pre>
def calculate(t):

    "A supposedly time-consuming calculation on 't'."

    i, j = t
    time.sleep(delay)
    return i * N + j
</pre>

<p>In order to reduce the processing time - to speed the code up, in other
words - we can make this code use several processes instead of just one. Here
is the modified code:</p>

<pre>
t = time.time()

# Initialise an array.

sequence = []
for i in range(0, N):
    for j in range(0, N):
        sequence.append((i, j))

# Perform the work.

results = <strong>pprocess.pmap</strong>(calculate, sequence<strong>, limit=limit</strong>)

# Show the results.
for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_pmap.py</code> file.)</p>

<p>By replacing usage of the <code>map</code> function with the
<code>pprocess.pmap</code> function, and specifying the limit on the number of
processes to be active at any given time (the value of the <code>limit</code>
variable is defined elsewhere), several calculations can now be performed in
parallel.</p>

<h2 id="Map">Converting Invocations to Parallel Operations</h2>

<p>Although some programs make natural use of the <code>map</code> function,
others may employ an invocation in a nested loop. This may also be converted
to a parallel program. Consider the following Python code:</p>

<pre>
t = time.time()

# Initialise an array.

results = []

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        results.append(calculate(i, j))

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple1.py</code> file.)</p>

<p>Here, a computation in the <code>calculate</code> function is performed for
each combination of <code>i</code> and <code>j</code> in the nested loop,
returning a result value. However, we must wait for the completion of this
function for each element before moving on to the next element, and this means
that the computations are performed sequentially.
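</p>

<p>The cost of this sequential execution is easy to estimate: with N * N
calculations, each sleeping for <code>delay</code> seconds, the loop needs
roughly N * N * delay seconds in total, whereas spreading the work over a
number of processes could, ignoring process creation and communication
overheads, approach that figure divided by the number of processes. A
back-of-envelope illustration with invented values for <code>N</code>,
<code>delay</code> and the process limit:</p>

<pre>
N = 10        # grid dimension, as in the examples
delay = 1.0   # seconds of simulated work per calculation
limit = 4     # number of processes allowed to run at once

sequential_estimate = N * N * delay          # calculations run one at a time
parallel_estimate = N * N * delay / limit    # ideal case, overheads ignored
</pre>

<p>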
Consequently, on a system
with more than one processor, even if we could call <code>calculate</code> for
more than one combination of <code>i</code> and <code>j</code>
and have the computations executing at the same time, the above program will
not take advantage of such capabilities.</p>

<p>We use a slightly modified version of <code>calculate</code> which employs
two parameters instead of one:</p>

<pre>
def calculate(i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j'.
    """

    time.sleep(delay)
    return i * N + j
</pre>

<p>In order to reduce the processing time - to speed the code up, in other
words - we can make this code use several processes instead of just one. Here
is the modified code:</p>

<pre id="simple_managed_map">
t = time.time()

# Initialise the results using a map with a limit on the number of
# channels/processes.

<strong>results = pprocess.Map(limit=limit)</strong>

# Wrap the calculate function and manage it.

<strong>calc = results.manage(pprocess.MakeParallel(calculate))</strong>

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>calc</strong>(i, j)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed_map.py</code> file.)</p>

<p>The principal changes in the above code involve the use of a
<code>pprocess.Map</code> object to collect the results, and a version of the
<code>calculate</code> function which is managed by the <code>Map</code>
object.
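</p>

<p>The effect of <code>MakeParallel</code> can be pictured with a small
sketch: it wraps an ordinary function as one which accepts a communications
channel and sends the function's return value over that channel, so that the
wrapped function need know nothing about channels itself. (This is an
illustrative model in plain Python, not the actual <code>pprocess</code>
implementation; the names <code>make_parallel</code>, <code>FakeChannel</code>
and <code>add</code> are invented for this sketch.)</p>

<pre>
def make_parallel(func):

    "A model of MakeParallel: send the result of 'func' over a channel."

    def wrapper(channel, *args):
        channel.send(func(*args))
    return wrapper

class FakeChannel:

    "A stand-in for a pprocess channel, recording what is sent."

    def __init__(self):
        self.sent = []
    def send(self, value):
        self.sent.append(value)

def add(i, j):
    return i + j

channel = FakeChannel()
calc = make_parallel(add)
calc(channel, 2, 3)      # channel.sent is now [5]
</pre>

<p>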
What the <code>Map</code> object does is to arrange the results of
computations such that iterating over the object or accessing the object using
list operations provides the results in the same order as their corresponding
inputs.</p>

<h2 id="Queue">Converting Arbitrarily-Ordered Invocations</h2>

<p>In some programs, it is not important to receive the results of
computations in any particular order, usually because either the order of
these results is irrelevant, or because the results provide "positional"
information which lets them be handled in an appropriate way. Consider the
following Python code:</p>

<pre>
t = time.time()

# Initialise an array.

results = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        i2, j2, result = calculate(i, j)
        results[i2*N+j2] = result

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple2.py</code> file.)</p>

<p>Here, a result array is initialised first and each computation is performed
sequentially. A significant difference to the previous examples is the return
value of the <code>calculate</code> function: the position details
corresponding to <code>i</code> and <code>j</code> are returned alongside the
result. Obviously, this is of limited value in the above code because the
order of the computations and the reception of results is fixed.
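</p>

<p>The value of such positional details becomes apparent as soon as results
can arrive out of order: each incoming <code>(i, j, result)</code> triple
carries enough information to be slotted into the right place in the result
array, whatever the order of arrival. A self-contained illustration in plain
Python, with invented arrival data and no <code>pprocess</code> involved:</p>

<pre>
N = 3
results = [0] * N * N

# Results arriving in an arbitrary order, each carrying its position.

arrivals = [(2, 1, 7), (0, 0, 0), (1, 2, 5), (0, 2, 2)]
for i, j, result in arrivals:
    results[i*N+j] = result
</pre>

<p>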
Moreover, as it
stands, the above example gains no benefit from parallelisation.</p>

<p>We can bring the benefits of parallel processing to the above program with
the following code:</p>

<pre id="simple_managed_queue">
t = time.time()

# Initialise the communications queue with a limit on the number of
# channels/processes.

<strong>queue = pprocess.Queue(limit=limit)</strong>

# Initialise an array.

results = [0] * N * N

# Wrap the calculate function and manage it.

<strong>calc = queue.manage(pprocess.MakeParallel(calculate))</strong>

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>calc(i, j)</strong>

# Store the results as they arrive.

print "Finishing..."
<strong>for i, j, result in queue:</strong>
    <strong>results[i*N+j] = result</strong>

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed_queue.py</code> file.)</p>

<p>This revised code employs a <code>pprocess.Queue</code> object whose
purpose is to collect the results of computations and to make them available
in the order in which they were received. The code collecting results has been
moved into a separate loop independent of the original computation loop and
taking advantage of the more relevant "positional" information emerging from
the queue.</p>

<p>We can take this example further, illustrating some of the mechanisms
employed by <code>pprocess</code>.
Instead of collecting results in a queue,
we can define a class containing a method which is called when new results
arrive:</p>

<pre>
class MyExchange(pprocess.Exchange):

    "Parallel convenience class containing the array assignment operation."

    def store_data(self, ch):
        i, j, result = ch.receive()
        self.D[i*N+j] = result
</pre>

<p>This code exposes the channel paradigm which is used throughout
<code>pprocess</code> and is available to applications, if desired. The effect
of the method is the storage of a result received through the channel in an
attribute of the object. The following code shows how this class can be used,
with differences to the previous program illustrated:</p>

<pre>
t = time.time()

# Initialise the communications exchange with a limit on the number of
# channels/processes.

<strong>exchange = MyExchange(limit=limit)</strong>

# Initialise an array - it is stored in the exchange to permit automatic
# assignment of values as the data arrives.

<strong>results = exchange.D = [0] * N * N</strong>

# Wrap the calculate function and manage it.

calc = <strong>exchange</strong>.manage(pprocess.MakeParallel(calculate))

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        calc(i, j)

# Wait for the results.

print "Finishing..."
<strong>exchange.finish()</strong>

# Show the results.
for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed.py</code> file.)</p>

<p>The main visible differences between this and the previous program are the
storage of the result array in the exchange, the removal of the queue
consumption code from the main program, placing the act of storing values in
the exchange's <code>store_data</code> method, and the need to call the
<code>finish</code> method on the <code>MyExchange</code> object so that we do
not try to access the results too soon. One underlying benefit not visible in
the above code is that we no longer need to accumulate results in a queue or
other structure so that they may be processed and assigned to the correct
positions in the result array.</p>

<p>For the curious, we may remove some of the remaining conveniences of the
above program to expose other features of <code>pprocess</code>. First, we
define a slightly modified version of the <code>calculate</code> function:</p>

<pre>
def calculate(ch, i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j', using 'ch' to
    communicate with the parent process.
    """

    time.sleep(delay)
    ch.send((i, j, i * N + j))
</pre>

<p>This function accepts a channel, <code>ch</code>, through which results
will be sent, and through which other values could potentially be received,
although we choose not to do so here. The program using this function is as
follows, with differences to the previous program illustrated:</p>

<pre>
t = time.time()

# Initialise the communications exchange with a limit on the number of
# channels/processes.
exchange = MyExchange(limit=limit)

# Initialise an array - it is stored in the exchange to permit automatic
# assignment of values as the data arrives.

results = exchange.D = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>exchange.start(calculate, i, j)</strong>

# Wait for the results.

print "Finishing..."
exchange.finish()

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_start.py</code> file.)</p>

<p>Here, we have discarded two conveniences: the wrapping of callables using
<code>MakeParallel</code>, which lets us use functions without providing any
channel parameters, and the management of callables using the
<code>manage</code> method on queues, exchanges, and so on. The
<code>start</code> method still calls the provided callable, but using a
different notation from that employed previously.</p>

<h2 id="create">Converting Inline Computations</h2>

<p>Although many programs employ functions and other useful abstractions which
can be treated as parallelisable units, some programs perform computations
"inline", meaning that the code responsible appears directly within a loop or
related control-flow construct. Consider the following code:</p>

<pre>
t = time.time()

# Initialise an array.

results = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        time.sleep(delay)
        results[i*N+j] = i * N + j

# Show the results.
for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple.py</code> file.)</p>

<p>To simulate "work", as in the different versions of the
<code>calculate</code> function, we use the <code>time.sleep</code> function
(which does not actually do work, and which will cause a process to be
descheduled in most cases, but which simulates the delay associated with work
being done). This inline work, which must be performed sequentially in the
above program, can be performed in parallel in a somewhat modified version of
the program:</p>

<pre>
t = time.time()

# Initialise the results using a map with a limit on the number of
# channels/processes.

<strong>results = pprocess.Map(limit=limit)</strong>

# Perform the work.
# NOTE: Could use the with statement in the loop to package the
# NOTE: try...finally functionality.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>ch = results.create()</strong>
        <strong>if ch:</strong>
            <strong>try: # Calculation work.</strong>

                time.sleep(delay)
                <strong>ch.send(i * N + j)</strong>

            <strong>finally: # Important finalisation.</strong>

                <strong>pprocess.exit(ch)</strong>

# Show the results.
for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_create_map.py</code> file.)</p>

<p>Although seemingly more complicated, the bulk of the changes in this
modified program are focused on obtaining a channel object, <code>ch</code>,
at the point where the computations are performed, and the wrapping of the
computation code in a <code>try</code>...<code>finally</code> statement which
ensures that the process associated with the channel exits when the
computation is complete. In order for the results of these computations to be
collected, a <code>pprocess.Map</code> object is used, since it will maintain
the results in the same order as the initiation of the computations which
produced them.</p>

<h2 id="MakeReusable">Reusing Processes in Parallel Programs</h2>

<p>One notable aspect of the above programs when parallelised is that each
invocation of a computation in parallel creates a new process in which the
computation is to be performed, regardless of whether existing processes had
just finished producing results and could theoretically have been asked to
perform new computations. In other words, processes were created and destroyed
instead of being reused.</p>

<p>However, we can request that processes be reused for computations by
enabling the <code>reuse</code> feature of exchange-like objects and employing
suitable reusable callables. Consider this modified version of the <a
href="#simple_managed_map">simple_managed_map</a> program:</p>

<pre>
t = time.time()

# Initialise the results using a map with a limit on the number of
# channels/processes.
results = pprocess.Map(limit=limit<strong>, reuse=1</strong>)

# Wrap the calculate function and manage it.

calc = results.manage(pprocess.Make<strong>Reusable</strong>(calculate))

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        calc(i, j)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_manage_map_reusable.py</code> file.)</p>

<p>By indicating that processes and channels shall be reused, and by wrapping
the <code>calculate</code> function with the necessary support, the
computations may be performed in parallel using a pool of processes instead of
creating a new process for each computation and then discarding it, only to
create a new process for the next computation.</p>

<h2 id="BackgroundCallable">Performing Computations in Background Processes</h2>

<p>Occasionally, it is desirable to initiate time-consuming computations and
not only to leave such processes running in the background, but to detach the
creating process from them completely - potentially terminating it altogether
- and yet still be able to collect the results of the created processes at a
later time, potentially in a completely different process.
For such situations, we can make use of the <code>BackgroundCallable</code>
class, which converts a parallel-aware callable into a callable which will run
in a background process when invoked.</p>

<p>Consider this excerpt from a modified version of the <a
href="#simple_managed_queue">simple_managed_queue</a> program:</p>

<pre>
<strong>def task():</strong>

    # Initialise the communications queue with a limit on the number of
    # channels/processes.

    queue = pprocess.Queue(limit=limit)

    # Initialise an array.

    results = [0] * N * N

    # Wrap the calculate function and manage it.

    calc = queue.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            calc(i, j)

    # Store the results as they arrive.

    print "Finishing..."
    for i, j, result in queue:
        results[i*N+j] = result

    <strong>return results</strong>
</pre>

<p>Here, we have converted the main program into a function, and instead of
printing out the results, we return the results list from the function.</p>

<p>Now, let us consider the new main program (with the relevant mechanisms
highlighted):</p>

<pre>
t = time.time()

if "--reconnect" not in sys.argv:

    # Wrap the computation and manage it.

    <strong>ptask = pprocess.BackgroundCallable("task.socket", pprocess.MakeParallel(task))</strong>

    # Perform the work.

    ptask()

    # Discard the callable.

    del ptask
    print "Discarded the callable."

if "--start" not in sys.argv:

    # Open a queue and reconnect to the task.

    print "Opening a queue."
    <strong>queue = pprocess.BackgroundQueue("task.socket")</strong>

    # Wait for the results.
    print "Waiting for persistent results"
    for results in queue:
        pass # should only be one element

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_background_queue.py</code> file.)</p>

<p>This new main program has two parts: the part which initiates the
computation, and the part which connects to the computation in order to collect
the results. Both parts can be run in the same process, and this should result
in similar behaviour to that of the original
<a href="#simple_managed_queue">simple_managed_queue</a> program.</p>

<p>In the above program, however, we are free to specify <code>--start</code> as
an option when running the program, and the result of this is merely to initiate
the computation in a background process, using <code>BackgroundCallable</code>
to obtain a callable which, when invoked, creates the background process and
runs the computation. After doing this, the program will then exit, but it will
leave the computation running as a collection of background processes, and a
special file called <code>task.socket</code> will exist in the current working
directory.</p>

<p>When the above program is run using the <code>--reconnect</code> option, it
will attempt to reconnect to the background processes already created,
contacting them through the previously created <code>task.socket</code>
special file (which is, in fact, a UNIX-domain socket); this is done using
the <code>BackgroundQueue</code> function, which handles the incoming results
in a fashion similar to that of a <code>Queue</code> object.
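</p>

<p>The special file itself is an ordinary operating system facility: a
UNIX-domain socket appears in the filesystem when a process binds to a chosen
path, and any later process can connect to it knowing only that path. The
following self-contained sketch shows this underlying mechanism using the
standard <code>socket</code> module directly - it has nothing to do with
<code>pprocess</code>'s own protocol, and the name <code>demo.socket</code> is
invented for the illustration:</p>

<pre>
import socket, threading, os

path = "demo.socket"
if os.path.exists(path):
    os.remove(path)

server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server.bind(path)            # creates the special file
server.listen(1)

def serve():
    connection, _ = server.accept()
    connection.sendall("a result".encode("utf-8"))
    connection.close()

thread = threading.Thread(target=serve)
thread.start()

# A "later" process would reconnect using only the filename.

client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
client.connect(path)
received = client.recv(1024).decode("utf-8")
client.close()
thread.join()
server.close()
os.remove(path)
</pre>

<p>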
Since only one
result is returned by the computation (as defined by the <code>return</code>
statement in the <code>task</code> function), we need only expect one element to
be collected by the queue: a list containing all of the results produced in the
computation.</p>

<h2 id="ManagingBackgroundProcesses">Managing Several Background Processes</h2>

<p>In the above example, a single background process was used to manage a number
of other processes, with all of them running in the background. However, it can
be desirable to manage more than one background process.</p>

<p>Consider this excerpt from a modified version of the <a
href="#simple_managed_queue">simple_managed_queue</a> program:</p>

<pre>
<strong>def task(i):</strong>

    # Initialise the communications queue with a limit on the number of
    # channels/processes.

    queue = pprocess.Queue(limit=limit)

    # Initialise an array.

    results = [0] * N

    # Wrap the calculate function and manage it.

    calc = queue.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    <strong>for j in range(0, N):</strong>
        <strong>calc(i, j)</strong>

    # Store the results as they arrive.

    print "Finishing..."
    <strong>for i, j, result in queue:</strong>
        <strong>results[j] = result</strong>

    <strong>return i, results</strong>
</pre>

<p>Just as we see in the previous example, a function called <code>task</code>
has been defined to hold a background computation, and this function returns a
portion of the results.
However, unlike the previous example or the original
example, the scope of the results collected in the function
has been changed: here, only results for calculations involving a certain value
of <code>i</code> are collected, with the particular value of <code>i</code>
returned along with the appropriate portion of the results.</p>

<p>Now, let us consider the new main program (with the relevant mechanisms
highlighted):</p>

<pre>
t = time.time()

if "--reconnect" not in sys.argv:

    # Wrap the computation and manage it.

    <strong>ptask = pprocess.MakeParallel(task)</strong>

    <strong>for i in range(0, N):</strong>

        # Make a distinct callable for each part of the computation.

        <strong>ptask_i = pprocess.BackgroundCallable("task-%d.socket" % i, ptask)</strong>

        # Perform the work.

        <strong>ptask_i(i)</strong>

    # Discard the callable.

    del ptask
    print "Discarded the callable."

if "--start" not in sys.argv:

    # Open a queue and reconnect to the task.

    print "Opening a queue."
    <strong>queue = pprocess.PersistentQueue()</strong>
    <strong>for i in range(0, N):</strong>
        <strong>queue.connect("task-%d.socket" % i)</strong>

    # Initialise an array.

    <strong>results = [0] * N</strong>

    # Wait for the results.

    print "Waiting for persistent results"
    <strong>for i, result in queue:</strong>
        <strong>results[i] = result</strong>

    # Show the results.
    for i in range(0, N):
        <strong>for result in results[i]:</strong>
            print result,
        print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_persistent_queue.py</code> file.)</p>

<p>In the first section, the process of making a parallel-aware callable is as
expected, but instead of then invoking a single background version of such a
callable, as in the previous example, we create a version for each value of
<code>i</code> (using <code>BackgroundCallable</code>) and then invoke each one.
The result of this is a total of <code>N</code> background processes, each
running an invocation of the <code>task</code> function with a distinct value of
<code>i</code> (which in turn perform computations), and each employing a
UNIX-domain socket for communication with a name of the form
<code>task-<em>i</em>.socket</code>.</p>

<p>In the second section, since we now have more than one background process, we
must find a way to monitor them after reconnecting to them; to achieve this, a
<code>PersistentQueue</code> is created, which acts like a regular
<code>Queue</code> object but is instead focused on handling persistent
communications. Upon connecting the queue to each of the previously created
UNIX-domain sockets, the queue acts like a regular <code>Queue</code> and
exposes received results through an iterator.
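</p>

<p>This connect-then-iterate behaviour can be modelled with plain Python - a
toy stand-in for <code>PersistentQueue</code> with invented data, which simply
drains each connected source in turn, whereas the real queue delivers results
from its connections as they actually arrive:</p>

<pre>
class ToyPersistentQueue:

    "A toy model of connecting several sources and iterating over them."

    def __init__(self):
        self.sources = []

    def connect(self, source):
        self.sources.append(source)

    def __iter__(self):
        for source in self.sources:
            for item in source:
                yield item

queue = ToyPersistentQueue()
queue.connect([(1, [3, 4])])     # results from the task with i == 1
queue.connect([(0, [1, 2])])     # results from the task with i == 0

results = [0] * 2
for i, result in queue:
    results[i] = result          # results is now [[1, 2], [3, 4]]
</pre>

<p>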
Here, the principal difference
from previous examples is the structure of results: instead of collecting each
individual value in a flat <code>i</code> by <code>j</code> array, a list is
returned for each value of <code>i</code> and is stored directly in another
list.</p>

<h3>Applications of Background Computations</h3>

<p>Background computations are useful because they provide flexibility in the
way the results can be collected. One area in which they can be useful is Web
programming, where a process handling an incoming HTTP request may need to
initiate a computation but then immediately send output to the Web client - such
as a page indicating that the computation is "in progress" - without having to
wait for the computation or to allocate resources to monitor it. Moreover, in
some Web architectures, notably those employing the Common Gateway Interface
(CGI), it is necessary for a process handling an incoming request to terminate
before its output will be sent to clients.
By using a
<code>BackgroundCallable</code>, a Web server process can initiate a
computation, and then subsequent server processes can be used to reconnect to
the background computation and to wait efficiently for results.</p>

<h2 id="Summary">Summary</h2>

<p>The following table indicates the features used in converting each
sequential example program to its parallel equivalent:</p>

<table border="1" cellspacing="0" cellpadding="5">
  <thead>
    <tr>
      <th>Sequential Example</th>
      <th>Parallel Example</th>
      <th>Features Used</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td>simple_map</td>
      <td>simple_pmap</td>
      <td>pmap</td>
    </tr>
    <tr>
      <td>simple1</td>
      <td>simple_managed_map</td>
      <td>MakeParallel, Map, manage</td>
    </tr>
    <tr>
      <td rowspan="5">simple2</td>
      <td>simple_managed_queue</td>
      <td>MakeParallel, Queue, manage</td>
    </tr>
    <tr>
      <td>simple_managed</td>
      <td>MakeParallel, Exchange (subclass), manage, finish</td>
    </tr>
    <tr>
      <td>simple_start</td>
      <td>Channel, Exchange (subclass), start, finish</td>
    </tr>
    <tr>
      <td>simple_background_queue</td>
      <td>MakeParallel, BackgroundCallable, BackgroundQueue</td>
    </tr>
    <tr>
      <td>simple_persistent_queue</td>
      <td>MakeParallel, BackgroundCallable, PersistentQueue</td>
    </tr>
    <tr>
      <td>simple</td>
      <td>simple_create_map</td>
      <td>Channel, Map, create, exit</td>
    </tr>
  </tbody>
</table>

</body>
</html>