pprocess

docs/tutorial.html

157:91b47853bf44
2009-06-04 Paul Boddie Added continuous communications support to the Exchange class. Added an example illustrating continuous communications.
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" lang="en-gb">
<head>
  <meta http-equiv="content-type" content="text/html; charset=UTF-8" />
  <title>pprocess - Tutorial</title>
  <link href="styles.css" rel="stylesheet" type="text/css" />
</head>
<body>

<h1>pprocess - Tutorial</h1>

<p>The <code>pprocess</code> module provides several mechanisms for running
Python code concurrently in several processes. The most straightforward way of
making a program parallel-aware - that is, where the program can take
advantage of more than one processor to simultaneously process data - is to
use the <code>pmap</code> function.</p>

<ul>
<li><a href="#pmap">Converting Map-Style Code</a></li>
<li><a href="#Map">Converting Invocations to Parallel Operations</a></li>
<li><a href="#Queue">Converting Arbitrarily-Ordered Invocations</a></li>
<li><a href="#create">Converting Inline Computations</a></li>
<li><a href="#MakeReusable">Reusing Processes in Parallel Programs</a></li>
<li><a href="#BackgroundCallable">Performing Computations in Background Processes</a></li>
<li><a href="#ManagingBackgroundProcesses">Managing Several Background Processes</a></li>
<li><a href="#Summary">Summary</a></li>
</ul>

<p>For a brief summary of each of the features of <code>pprocess</code>, see
the <a href="reference.html">reference document</a>.</p>

<h2 id="pmap">Converting Map-Style Code</h2>

<p>Consider a program using the built-in <code>map</code> function and a sequence of inputs:</p>

<pre>
    t = time.time()

    # Initialise an array.

    sequence = []
    for i in range(0, N):
        for j in range(0, N):
            sequence.append((i, j))

    # Perform the work.

    results = map(calculate, sequence)

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_map.py</code> file.)</p>

<p>The principal features of this program involve the preparation of an array
for input purposes, and the use of the <code>map</code> function to iterate
over the combinations of <code>i</code> and <code>j</code> in the array. Even
if the <code>calculate</code> function could be invoked independently for each
input value, we have to wait for each computation to complete before
initiating a new one. The <code>calculate</code> function may be defined as
follows:</p>

<pre>
def calculate(t):

    "A supposedly time-consuming calculation on 't'."

    i, j = t
    time.sleep(delay)
    return i * N + j
</pre>

<p>In order to reduce the processing time - to speed the code up, in other
words - we can make this code use several processes instead of just one. Here
is the modified code:</p>

<pre>
    t = time.time()

    # Initialise an array.

    sequence = []
    for i in range(0, N):
        for j in range(0, N):
            sequence.append((i, j))

    # Perform the work.

    results = <strong>pprocess.pmap</strong>(calculate, sequence<strong>, limit=limit</strong>)

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_pmap.py</code> file.)</p>

<p>By replacing usage of the <code>map</code> function with the
<code>pprocess.pmap</code> function, and specifying the limit on the number of
processes to be active at any given time (the value of the <code>limit</code>
variable is defined elsewhere), several calculations can now be performed in
parallel.</p>

<h2 id="Map">Converting Invocations to Parallel Operations</h2>

<p>Although some programs make natural use of the <code>map</code> function,
others may employ an invocation in a nested loop. This may also be converted
to a parallel program. Consider the following Python code:</p>

<pre>
    t = time.time()

    # Initialise an array.

    results = []

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            results.append(calculate(i, j))

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple1.py</code> file.)</p>

<p>Here, a computation in the <code>calculate</code> function is performed for
each combination of <code>i</code> and <code>j</code> in the nested loop,
returning a result value.
However, we must wait for the completion of this
function for each element before moving on to the next element, and this means
that the computations are performed sequentially. Consequently, on a system
with more than one processor, even if we could call <code>calculate</code> for
more than one combination of <code>i</code> and <code>j</code>
and have the computations executing at the same time, the above program will
not take advantage of such capabilities.</p>

<p>We use a slightly modified version of <code>calculate</code> which employs
two parameters instead of one:</p>

<pre>
def calculate(i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j'.
    """

    time.sleep(delay)
    return i * N + j
</pre>

<p>In order to reduce the processing time - to speed the code up, in other
words - we can make this code use several processes instead of just one. Here
is the modified code:</p>

<pre id="simple_managed_map">
    t = time.time()

    # Initialise the results using a map with a limit on the number of
    # channels/processes.

    <strong>results = pprocess.Map(limit=limit)</strong>

    # Wrap the calculate function and manage it.

    <strong>calc = results.manage(pprocess.MakeParallel(calculate))</strong>

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            <strong>calc</strong>(i, j)

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed_map.py</code> file.)</p>

<p>The principal changes in the above code involve the use of a
<code>pprocess.Map</code> object to collect the results, and a version of the
<code>calculate</code> function which is managed by the <code>Map</code>
object. What the <code>Map</code> object does is to arrange the results of
computations such that iterating over the object or accessing the object using
list operations provides the results in the same order as their corresponding
inputs.</p>

<h2 id="Queue">Converting Arbitrarily-Ordered Invocations</h2>

<p>In some programs, it is not important to receive the results of
computations in any particular order, usually because either the order of
these results is irrelevant, or because the results provide "positional"
information which lets them be handled in an appropriate way. Consider the
following Python code:</p>

<pre>
    t = time.time()

    # Initialise an array.

    results = [0] * N * N

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            i2, j2, result = calculate(i, j)
            results[i2*N+j2] = result

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple2.py</code> file.)</p>

<p>Here, a result array is initialised first and each computation is performed
sequentially. A significant difference to the previous examples is the return
value of the <code>calculate</code> function: the position details
corresponding to <code>i</code> and <code>j</code> are returned alongside the
result. Obviously, this is of limited value in the above code because the
order of the computations and the reception of results is fixed. However, we
get no benefit from parallelisation in the above example.</p>

<p>We can bring the benefits of parallel processing to the above program with
the following code:</p>

<pre id="simple_managed_queue">
    t = time.time()

    # Initialise the communications queue with a limit on the number of
    # channels/processes.

    <strong>queue = pprocess.Queue(limit=limit)</strong>

    # Initialise an array.

    results = [0] * N * N

    # Wrap the calculate function and manage it.

    <strong>calc = queue.manage(pprocess.MakeParallel(calculate))</strong>

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            <strong>calc(i, j)</strong>

    # Store the results as they arrive.

    print "Finishing..."
    <strong>for i, j, result in queue:</strong>
        <strong>results[i*N+j] = result</strong>

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed_queue.py</code> file.)</p>

<p>This revised code employs a <code>pprocess.Queue</code> object whose
purpose is to collect the results of computations and to make them available
in the order in which they were received. The code collecting results has been
moved into a separate loop independent of the original computation loop and
taking advantage of the more relevant "positional" information emerging from
the queue.</p>

<p>We can take this example further, illustrating some of the mechanisms
employed by <code>pprocess</code>. Instead of collecting results in a queue,
we can define a class containing a method which is called when new results
arrive:</p>

<pre>
class MyExchange(pprocess.Exchange):

    "Parallel convenience class containing the array assignment operation."

    def store_data(self, ch):
        i, j, result = ch.receive()
        self.D[i*N+j] = result
</pre>

<p>This code exposes the channel paradigm which is used throughout
<code>pprocess</code> and is available to applications, if desired. The effect
of the method is the storage of a result received through the channel in an
attribute of the object. The following code shows how this class can be used,
with differences to the previous program illustrated:</p>

<pre>
    t = time.time()

    # Initialise the communications exchange with a limit on the number of
    # channels/processes.

    <strong>exchange = MyExchange(limit=limit)</strong>

    # Initialise an array - it is stored in the exchange to permit automatic
    # assignment of values as the data arrives.

    <strong>results = exchange.D = [0] * N * N</strong>

    # Wrap the calculate function and manage it.

    calc = <strong>exchange</strong>.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            calc(i, j)

    # Wait for the results.

    print "Finishing..."
    <strong>exchange.finish()</strong>

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed.py</code> file.)</p>

<p>The main visible differences between this and the previous program are the
storage of the result array in the exchange, the removal of the queue
consumption code from the main program, placing the act of storing values in
the exchange's <code>store_data</code> method, and the need to call the
<code>finish</code> method on the <code>MyExchange</code> object so that we do
not try to access the results too soon. One underlying benefit not visible in
the above code is that we no longer need to accumulate results in a queue or
other structure so that they may be processed and assigned to the correct
positions in the result array.</p>

<p>For the curious, we may remove some of the remaining conveniences of the
above program to expose other features of <code>pprocess</code>.
First, we
define a slightly modified version of the <code>calculate</code> function:</p>

<pre>
def calculate(ch, i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j', using 'ch' to
    communicate with the parent process.
    """

    time.sleep(delay)
    ch.send((i, j, i * N + j))
</pre>

<p>This function accepts a channel, <code>ch</code>, through which results
will be sent, and through which other values could potentially be received,
although we choose not to do so here. The program using this function is as
follows, with differences to the previous program illustrated:</p>

<pre>
    t = time.time()

    # Initialise the communications exchange with a limit on the number of
    # channels/processes.

    exchange = MyExchange(limit=limit)

    # Initialise an array - it is stored in the exchange to permit automatic
    # assignment of values as the data arrives.

    results = exchange.D = [0] * N * N

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            <strong>exchange.start(calculate, i, j)</strong>

    # Wait for the results.

    print "Finishing..."
    exchange.finish()

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_start.py</code> file.)</p>

<p>Here, we have discarded two conveniences: the wrapping of callables using
<code>MakeParallel</code>, which lets us use functions without providing any
channel parameters, and the management of callables using the
<code>manage</code> method on queues, exchanges, and so on. The
<code>start</code> method still calls the provided callable, but using a
different notation from that employed previously.</p>

<h2 id="create">Converting Inline Computations</h2>

<p>Although many programs employ functions and other useful abstractions which
can be treated as parallelisable units, some programs perform computations
"inline", meaning that the code responsible appears directly within a loop or
related control-flow construct. Consider the following code:</p>

<pre>
    t = time.time()

    # Initialise an array.

    results = [0] * N * N

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            time.sleep(delay)
            results[i*N+j] = i * N + j

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple.py</code> file.)</p>

<p>To simulate "work", as in the different versions of the
<code>calculate</code> function, we use the <code>time.sleep</code> function
(which does not actually do work, and which will cause a process to be
descheduled in most cases, but which simulates the delay associated with work
being done). This inline work, which must be performed sequentially in the
above program, can be performed in parallel in a somewhat modified version of
the program:</p>

<pre>
    t = time.time()

    # Initialise the results using a map with a limit on the number of
    # channels/processes.

    <strong>results = pprocess.Map(limit=limit)</strong>

    # Perform the work.
    # NOTE: Could use the with statement in the loop to package the
    # NOTE: try...finally functionality.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            <strong>ch = results.create()</strong>
            <strong>if ch:</strong>
                <strong>try: # Calculation work.</strong>

                    time.sleep(delay)
                    <strong>ch.send(i * N + j)</strong>

                <strong>finally: # Important finalisation.</strong>

                    <strong>pprocess.exit(ch)</strong>

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_create_map.py</code> file.)</p>

<p>Although seemingly more complicated, the bulk of the changes in this
modified program are focused on obtaining a channel object, <code>ch</code>,
at the point where the computations are performed, and the wrapping of the
computation code in a <code>try</code>...<code>finally</code> statement which
ensures that the process associated with the channel exits when the
computation is complete. In order for the results of these computations to be
collected, a <code>pprocess.Map</code> object is used, since it will maintain
the results in the same order as the initiation of the computations which
produced them.</p>

<h2 id="MakeReusable">Reusing Processes in Parallel Programs</h2>

<p>One notable aspect of the above programs when parallelised is that each
invocation of a computation in parallel creates a new process in which the
computation is to be performed, regardless of whether existing processes had
just finished producing results and could theoretically have been asked to
perform new computations. In other words, processes were created and destroyed
instead of being reused.</p>

<p>However, we can request that processes be reused for computations by
enabling the <code>reuse</code> feature of exchange-like objects and employing
suitable reusable callables.
Consider this modified version of the <a
href="#simple_managed_map">simple_managed_map</a> program:</p>

<pre>
    t = time.time()

    # Initialise the results using a map with a limit on the number of
    # channels/processes.

    results = pprocess.Map(limit=limit<strong>, reuse=1</strong>)

    # Wrap the calculate function and manage it.

    calc = results.manage(pprocess.Make<strong>Reusable</strong>(calculate))

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            calc(i, j)

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_manage_map_reusable.py</code> file.)</p>

<p>By indicating that processes and channels shall be reused, and by wrapping
the <code>calculate</code> function with the necessary support, the
computations may be performed in parallel using a pool of processes instead of
creating a new process for each computation and then discarding it, only to
create a new process for the next computation.</p>

<h2 id="BackgroundCallable">Performing Computations in Background Processes</h2>

<p>Occasionally, it is desirable to initiate time-consuming computations and to
not only leave such processes running in the background, but to be able to detach
the creating process from them completely, potentially terminating the creating
process altogether, and yet also be able to collect the results of the created
processes at a later time, potentially in another completely
different process.
For such situations, we can make use of the <code>BackgroundCallable</code>
class, which converts a parallel-aware callable into a callable which will run
in a background process when invoked.</p>

<p>Consider this excerpt from a modified version of the <a
href="#simple_managed_queue">simple_managed_queue</a> program:</p>

<pre>
<strong>def task():</strong>

    # Initialise the communications queue with a limit on the number of
    # channels/processes.

    queue = pprocess.Queue(limit=limit)

    # Initialise an array.

    results = [0] * N * N

    # Wrap the calculate function and manage it.

    calc = queue.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            calc(i, j)

    # Store the results as they arrive.

    print "Finishing..."
    for i, j, result in queue:
        results[i*N+j] = result

    <strong>return results</strong>
</pre>

<p>Here, we have converted the main program into a function, and instead of
printing out the results, we return the results list from the function.</p>

<p>Now, let us consider the new main program (with the relevant mechanisms
highlighted):</p>

<pre>
    t = time.time()

    if "--reconnect" not in sys.argv:

        # Wrap the computation and manage it.

        <strong>ptask = pprocess.BackgroundCallable("task.socket", pprocess.MakeParallel(task))</strong>

        # Perform the work.

        ptask()

        # Discard the callable.

        del ptask
        print "Discarded the callable."

    if "--start" not in sys.argv:

        # Open a queue and reconnect to the task.

        print "Opening a queue."
        <strong>queue = pprocess.BackgroundQueue("task.socket")</strong>

        # Wait for the results.

        print "Waiting for persistent results"
        for results in queue:
            pass # should only be one element

        # Show the results.

        for i in range(0, N):
            for result in results[i*N:i*N+N]:
                print result,
            print

        print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_background_queue.py</code> file.)</p>

<p>This new main program has two parts: the part which initiates the
computation, and the part which connects to the computation in order to collect
the results. Both parts can be run in the same process, and this should result
in similar behaviour to that of the original
<a href="#simple_managed_queue">simple_managed_queue</a> program.</p>

<p>In the above program, however, we are free to specify <code>--start</code> as
an option when running the program, and the result of this is merely to initiate
the computation in a background process, using <code>BackgroundCallable</code>
to obtain a callable which, when invoked, creates the background process and
runs the computation.
After doing this, the program will then exit, but it will
leave the computation running as a collection of background processes, and a
special file called <code>task.socket</code> will exist in the current working
directory.</p>

<p>When the above program is run using the <code>--reconnect</code> option, an
attempt will be made to reconnect to the background processes already created by
attempting to contact them using the previously created <code>task.socket</code>
special file (which is, in fact, a UNIX-domain socket); this being done using
the <code>BackgroundQueue</code> function which will handle the incoming results
in a fashion similar to that of a <code>Queue</code> object. Since only one
result is returned by the computation (as defined by the <code>return</code>
statement in the <code>task</code> function), we need only expect one element to
be collected by the queue: a list containing all of the results produced in the
computation.</p>

<h2 id="ManagingBackgroundProcesses">Managing Several Background Processes</h2>

<p>In the above example, a single background process was used to manage a number
of other processes, with all of them running in the background. However, it can
be desirable to manage more than one background process.</p>

<p>Consider this excerpt from a modified version of the <a
href="#simple_managed_queue">simple_managed_queue</a> program:</p>

<pre>
<strong>def task(i):</strong>

    # Initialise the communications queue with a limit on the number of
    # channels/processes.

    queue = pprocess.Queue(limit=limit)

    # Initialise an array.

    results = [0] * N

    # Wrap the calculate function and manage it.

    calc = queue.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    <strong>for j in range(0, N):</strong>
        <strong>calc(i, j)</strong>

    # Store the results as they arrive.

    print "Finishing..."
    <strong>for i, j, result in queue:</strong>
        <strong>results[j] = result</strong>

    <strong>return i, results</strong>
</pre>

<p>Just as we see in the previous example, a function called <code>task</code>
has been defined to hold a background computation, and this function returns a
portion of the results. However, unlike the previous example or the original
example, the scope of the results of the computation collected in the function
has been changed: here, only results for calculations involving a certain value
of <code>i</code> are collected, with the particular value of <code>i</code>
returned along with the appropriate portion of the results.</p>

<p>Now, let us consider the new main program (with the relevant mechanisms
highlighted):</p>

<pre>
    t = time.time()

    if "--reconnect" not in sys.argv:

        # Wrap the computation and manage it.

        <strong>ptask = pprocess.MakeParallel(task)</strong>

        <strong>for i in range(0, N):</strong>

            # Make a distinct callable for each part of the computation.

            <strong>ptask_i = pprocess.BackgroundCallable("task-%d.socket" % i, ptask)</strong>

            # Perform the work.

            <strong>ptask_i(i)</strong>

        # Discard the callable.

        del ptask
        print "Discarded the callable."

    if "--start" not in sys.argv:

        # Open a queue and reconnect to the task.

        print "Opening a queue."
        <strong>queue = pprocess.PersistentQueue()</strong>
        <strong>for i in range(0, N):</strong>
            <strong>queue.connect("task-%d.socket" % i)</strong>

        # Initialise an array.

        <strong>results = [0] * N</strong>

        # Wait for the results.

        print "Waiting for persistent results"
        <strong>for i, result in queue:</strong>
            <strong>results[i] = result</strong>

        # Show the results.

        for i in range(0, N):
            <strong>for result in results[i]:</strong>
                print result,
            print

        print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_persistent_queue.py</code> file.)</p>

<p>In the first section, the process of making a parallel-aware callable is as
expected, but instead of then invoking a single background version of such a
callable, as in the previous example, we create a version for each value of
<code>i</code> (using <code>BackgroundCallable</code>) and then invoke each one.
The result of this is a total of <code>N</code> background processes, each
running an invocation of the <code>task</code> function with a distinct value of
<code>i</code> (which in turn performs its own computations), and each
communicating over a UNIX-domain socket with a name of the form
<code>task-<em>i</em>.socket</code>.</p>

<p>In the second section, since we now have more than one background process, we
must find a way to monitor them after reconnecting to them; to achieve this, a
<code>PersistentQueue</code> is created, which acts like a regular
<code>Queue</code> object but is instead focused on handling persistent
communications. Upon connecting the queue to each of the previously created
UNIX-domain sockets, the queue acts like a regular <code>Queue</code> and
exposes received results through an iterator. Here, the principal difference
from previous examples is the structure of results: instead of collecting each
individual value in a flat <code>i</code> by <code>j</code> array, a list is
returned for each value of <code>i</code> and is stored directly in another
list.</p>

<h3>Applications of Background Computations</h3>

<p>Background computations are useful because they provide flexibility in the
way the results can be collected. One area in which they can be useful is Web
programming, where a process handling an incoming HTTP request may need to
initiate a computation but then immediately send output to the Web client - such
as a page indicating that the computation is "in progress" - without having to
wait for the computation or to allocate resources to monitor it. Moreover, in
some Web architectures, notably those employing the Common Gateway Interface
(CGI), it is necessary for a process handling an incoming request to terminate
before its output will be sent to clients.
By using a
<code>BackgroundCallable</code>, a Web server process can initiate a
computation, and then subsequent server processes can be used to reconnect to
the background computation and to wait efficiently for results.</p>

<h2 id="Summary">Summary</h2>

<p>The following table indicates the features used in converting each
sequential example program to its parallel counterpart:</p>

<table border="1" cellspacing="0" cellpadding="5">
  <thead>
    <tr>
      <th>Sequential Example</th>
      <th>Parallel Example</th>
      <th>Features Used</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td>simple_map</td>
      <td>simple_pmap</td>
      <td>pmap</td>
    </tr>
    <tr>
      <td>simple1</td>
      <td>simple_managed_map</td>
      <td>MakeParallel, Map, manage</td>
    </tr>
    <tr>
      <td rowspan="5">simple2</td>
      <td>simple_managed_queue</td>
      <td>MakeParallel, Queue, manage</td>
    </tr>
    <tr>
      <td>simple_managed</td>
      <td>MakeParallel, Exchange (subclass), manage, finish</td>
    </tr>
    <tr>
      <td>simple_start</td>
      <td>Channel, Exchange (subclass), start, finish</td>
    </tr>
    <tr>
      <td>simple_background_queue</td>
      <td>MakeParallel, BackgroundCallable, BackgroundQueue</td>
    </tr>
    <tr>
      <td>simple_persistent_queue</td>
      <td>MakeParallel, BackgroundCallable, PersistentQueue</td>
    </tr>
    <tr>
      <td>simple</td>
      <td>simple_create_map</td>
      <td>Channel, Map, create, exit</td>
    </tr>
  </tbody>
</table>

</body>
</html>