pprocess

docs/tutorial.html

2007-11-24 paulb [project @ 2007-11-24 00:08:29 by paulb] Updated release notes.
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" lang="en-gb">
<head>
  <meta http-equiv="content-type" content="text/html; charset=UTF-8" />
  <title>pprocess - Tutorial</title>
  <link href="styles.css" rel="stylesheet" type="text/css" />
</head>
<body>

<h1>pprocess - Tutorial</h1>

<p>The <code>pprocess</code> module provides several mechanisms for running
Python code concurrently in several processes. The most straightforward way of
making a program parallel-aware - that is, able to take advantage of more than
one processor to process data simultaneously - is to use the
<code>pmap</code> function.</p>

<h2>Converting Map-Style Code</h2>

<p>Consider a program using the built-in <code>map</code> function and a
sequence of inputs:</p>

<pre>
    t = time.time()

    # Initialise an array.

    sequence = []
    for i in range(0, N):
        for j in range(0, N):
            sequence.append((i, j))

    # Perform the work.

    results = map(calculate, sequence)

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_map.py</code> file.)</p>

<p>The principal features of this program involve the preparation of an array
of inputs, and the use of the <code>map</code> function to iterate over the
combinations of <code>i</code> and <code>j</code> in the array.
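</p>

<p>To make the data flow concrete, here is a condensed version of the same
computation with the delay removed (a small illustrative value of
<code>N</code> is assumed here; in the example file it is defined
elsewhere):</p>

<pre>
N = 3

def calculate(t):
    # As in the example, but without the time.sleep call.
    i, j = t
    return i * N + j

sequence = []
for i in range(0, N):
    for j in range(0, N):
        sequence.append((i, j))

results = list(map(calculate, sequence))
# results holds i * N + j for each combination, in input order:
# [0, 1, 2, 3, 4, 5, 6, 7, 8]
</pre>

<p>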
Even
if the <code>calculate</code> function could be invoked independently for each
input value, we have to wait for each computation to complete before
initiating a new one. The <code>calculate</code> function may be defined as
follows:</p>

<pre>
def calculate(t):

    "A supposedly time-consuming calculation on 't'."

    i, j = t
    time.sleep(delay)
    return i * N + j
</pre>

<p>In order to reduce the processing time - to speed the code up, in other
words - we can make this code use several processes instead of just one. Here
is the modified code:</p>

<pre>
    t = time.time()

    # Initialise an array.

    sequence = []
    for i in range(0, N):
        for j in range(0, N):
            sequence.append((i, j))

    # Perform the work.

    results = <strong>pprocess.pmap</strong>(calculate, sequence<strong>, limit=limit</strong>)

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_pmap.py</code> file.)</p>

<p>By replacing usage of the <code>map</code> function with the
<code>pprocess.pmap</code> function, and specifying the limit on the number of
processes to be active at any given time (the value of the <code>limit</code>
variable is defined elsewhere), several calculations can now be performed in
parallel.</p>

<h2>Converting Invocations to Parallel Operations</h2>

<p>Although some programs make natural use of the <code>map</code> function,
others may employ an invocation in a nested loop. This may also be converted
to a parallel program. Consider the following Python code:</p>

<pre>
    t = time.time()

    # Initialise an array.

    results = []

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            results.append(calculate(i, j))

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple1.py</code> file.)</p>

<p>Here, a computation in the <code>calculate</code> function is performed for
each combination of <code>i</code> and <code>j</code> in the nested loop,
returning a result value.
However, we must wait for the completion of this
function for each element before moving on to the next element, which means
that the computations are performed sequentially. Consequently, on a system
with more than one processor, even if we could call <code>calculate</code> for
more than one combination of <code>i</code> and <code>j</code>
and have the computations executing at the same time, the above program will
not take advantage of such capabilities.</p>

<p>We use a slightly modified version of <code>calculate</code> which employs
two parameters instead of one:</p>

<pre>
def calculate(i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j'.
    """

    time.sleep(delay)
    return i * N + j
</pre>

<p>In order to reduce the processing time - to speed the code up, in other
words - we can make this code use several processes instead of just one. Here
is the modified code:</p>

<pre id="simple_managed_map">
    t = time.time()

    # Initialise the results using a map with a limit on the number of
    # channels/processes.

    <strong>results = pprocess.Map(limit=limit)</strong>

    # Wrap the calculate function and manage it.

    <strong>calc = results.manage(pprocess.MakeParallel(calculate))</strong>

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            <strong>calc</strong>(i, j)

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed_map.py</code> file.)</p>

<p>The principal changes in the above code involve the use of a
<code>pprocess.Map</code> object to collect the results, and a version of the
<code>calculate</code> function which is managed by the <code>Map</code>
object. The <code>Map</code> object arranges the results of the computations
so that iterating over the object, or accessing it using list operations,
provides the results in the same order as their corresponding inputs.</p>

<h2>Converting Arbitrarily-Ordered Invocations</h2>

<p>In some programs, it is not important to receive the results of
computations in any particular order, usually because either the order of
these results is irrelevant, or because the results provide "positional"
information which lets them be handled in an appropriate way. Consider the
following Python code:</p>

<pre>
    t = time.time()

    # Initialise an array.

    results = [0] * N * N

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            i2, j2, result = calculate(i, j)
            results[i2*N+j2] = result

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple2.py</code> file.)</p>

<p>Here, a result array is initialised first and each computation is performed
sequentially. A significant difference from the previous examples is the
return value of the <code>calculate</code> function: the position details
corresponding to <code>i</code> and <code>j</code> are returned alongside the
result. Obviously, this is of limited value in the above code because the
order of the computations and the reception of results is fixed. Moreover, we
get no benefit from parallelisation in the above example.</p>

<p>We can bring the benefits of parallel processing to the above program with
the following code:</p>

<pre>
    t = time.time()

    # Initialise the communications queue with a limit on the number of
    # channels/processes.

    <strong>queue = pprocess.Queue(limit=limit)</strong>

    # Initialise an array.

    results = [0] * N * N

    # Wrap the calculate function and manage it.

    <strong>calc = queue.manage(pprocess.MakeParallel(calculate))</strong>

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            <strong>calc(i, j)</strong>

    # Store the results as they arrive.

    print "Finishing..."
    <strong>for i, j, result in queue:</strong>
        <strong>results[i*N+j] = result</strong>

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed_queue.py</code> file.)</p>

<p>This revised code employs a <code>pprocess.Queue</code> object whose
purpose is to collect the results of computations and to make them available
in the order in which they were received. The code collecting results has been
moved into a separate loop, independent of the original computation loop,
which takes advantage of the more relevant "positional" information emerging
from the queue.</p>

<p>We can take this example further, illustrating some of the mechanisms
employed by <code>pprocess</code>. Instead of collecting results in a queue,
we can define a class containing a method which is called when new results
arrive:</p>

<pre>
class MyExchange(pprocess.Exchange):

    "Parallel convenience class containing the array assignment operation."

    def store_data(self, ch):
        i, j, result = ch.receive()
        self.D[i*N+j] = result
</pre>

<p>This code exposes the channel paradigm which is used throughout
<code>pprocess</code> and is available to applications, if desired. The effect
of the method is the storage of a result received through the channel in an
attribute of the object. The following code shows how this class can be used,
with differences from the previous program highlighted:</p>

<pre>
    t = time.time()

    # Initialise the communications exchange with a limit on the number of
    # channels/processes.

    <strong>exchange = MyExchange(limit=limit)</strong>

    # Initialise an array - it is stored in the exchange to permit automatic
    # assignment of values as the data arrives.

    <strong>results = exchange.D = [0] * N * N</strong>

    # Wrap the calculate function and manage it.

    calc = <strong>exchange</strong>.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            calc(i, j)

    # Wait for the results.

    print "Finishing..."
    <strong>exchange.finish()</strong>

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed.py</code> file.)</p>

<p>The main visible differences between this and the previous program are the
storage of the result array in the exchange, the removal of the queue
consumption code from the main program (the act of storing values now resides
in the exchange's <code>store_data</code> method), and the need to call the
<code>finish</code> method on the <code>MyExchange</code> object so that we do
not try to access the results too soon. One underlying benefit not visible in
the above code is that we no longer need to accumulate results in a queue or
other structure so that they may be processed and assigned to the correct
positions in the result array.</p>

<p>For the curious, we may remove some of the remaining conveniences of the
above program to expose other features of <code>pprocess</code>.
First, we
define a slightly modified version of the <code>calculate</code> function:</p>

<pre>
def calculate(ch, i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j', using 'ch' to
    communicate with the parent process.
    """

    time.sleep(delay)
    ch.send((i, j, i * N + j))
</pre>

<p>This function accepts a channel, <code>ch</code>, through which results
will be sent, and through which other values could potentially be received,
although we choose not to do so here. The program using this function is as
follows, with differences from the previous program highlighted:</p>

<pre>
    t = time.time()

    # Initialise the communications exchange with a limit on the number of
    # channels/processes.

    exchange = MyExchange(limit=limit)

    # Initialise an array - it is stored in the exchange to permit automatic
    # assignment of values as the data arrives.

    results = exchange.D = [0] * N * N

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            <strong>exchange.start(calculate, i, j)</strong>

    # Wait for the results.

    print "Finishing..."
    exchange.finish()

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_start.py</code> file.)</p>

<p>Here, we have discarded two conveniences: the wrapping of callables using
<code>MakeParallel</code>, which lets us use functions without providing any
channel parameters, and the management of callables using the
<code>manage</code> method on queues, exchanges, and so on. The
<code>start</code> method still calls the provided callable, but using a
different notation from that employed previously.</p>

<h2>Converting Inline Computations</h2>

<p>Although many programs employ functions and other useful abstractions which
can be treated as parallelisable units, some programs perform computations
"inline", meaning that the code responsible appears directly within a loop or
related control-flow construct. Consider the following code:</p>

<pre>
    t = time.time()

    # Initialise an array.

    results = [0] * N * N

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            time.sleep(delay)
            results[i*N+j] = i * N + j

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple.py</code> file.)</p>

<p>To simulate "work", as in the different versions of the
<code>calculate</code> function, we use the <code>time.sleep</code> function
(which does not actually do any work, and which will in most cases cause a
process to be descheduled, but which simulates the delay associated with work
being done). This inline work, which must be performed sequentially in the
above program, can be performed in parallel in a somewhat modified version of
the program:</p>

<pre>
    t = time.time()

    # Initialise the results using a map with a limit on the number of
    # channels/processes.

    <strong>results = pprocess.Map(limit=limit)</strong>

    # Perform the work.
    # NOTE: Could use the with statement in the loop to package the
    # NOTE: try...finally functionality.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            <strong>ch = results.create()</strong>
            <strong>if ch:</strong>
                <strong>try: # Calculation work.</strong>

                    time.sleep(delay)
                    <strong>ch.send(i * N + j)</strong>

                <strong>finally: # Important finalisation.</strong>

                    <strong>pprocess.exit(ch)</strong>

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_create_map.py</code> file.)</p>

<p>Although seemingly more complicated, the bulk of the changes in this
modified program are focused on obtaining a channel object, <code>ch</code>,
at the point where the computations are performed, and on wrapping the
computation code in a <code>try</code>...<code>finally</code> statement which
ensures that the process associated with the channel exits when the
computation is complete. In order for the results of these computations to be
collected, a <code>pprocess.Map</code> object is used, since it will maintain
the results in the same order as the initiation of the computations which
produced them.</p>

<h2>Reusing Processes in Parallel Programs</h2>

<p>One notable aspect of the above programs, when parallelised, is that each
invocation of a computation in parallel creates a new process in which the
computation is to be performed, regardless of whether existing processes had
just finished producing results and could theoretically have been asked to
perform new computations. In other words, processes were created and destroyed
instead of being reused.</p>

<p>However, we can request that processes be reused for computations by
enabling the <code>reuse</code> feature of exchange-like objects and employing
suitable reusable callables.
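</p>

<p>The underlying pattern can be pictured without any real processes at all:
instead of performing one computation and terminating, a reusable worker
consumes inputs repeatedly until told to stop. The following single-process
sketch (purely illustrative - it is not the <code>pprocess</code>
implementation) shows the shape of such a loop, using <code>None</code> as a
stop sentinel and an assumed value of 10 for <code>N</code>:</p>

<pre>
def reusable_worker(inputs, N=10):
    # The worker lives across many computations, collecting a result
    # for each input until the sentinel arrives.
    results = []
    for item in inputs:
        if item is None:  # sentinel: no more work
            break
        i, j = item
        results.append(i * N + j)
    return results

# One worker handles several computations in succession.
outputs = reusable_worker([(0, 1), (2, 3), None])
# outputs is [1, 23]
</pre>

<p>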
Consider this modified version of the <a
href="#simple_managed_map">simple_managed_map</a> program:</p>

<pre>
    t = time.time()

    # Initialise the results using a map with a limit on the number of
    # channels/processes.

    results = pprocess.Map(limit=limit<strong>, reuse=1</strong>)

    # Wrap the calculate function and manage it.

    calc = results.manage(pprocess.Make<strong>Reusable</strong>(calculate))

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            calc(i, j)

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_manage_map_reusable.py</code> file.)</p>

<p>By indicating that processes and channels shall be reused, and by wrapping
the <code>calculate</code> function with the necessary support, the
computations may be performed in parallel using a pool of processes, instead
of creating a new process for each computation and then discarding it, only to
create a new process for the next computation.</p>

<h2>Summary</h2>

<p>The following table indicates the features used in converting each
sequential example program to its parallel equivalent:</p>

<table border="1" cellspacing="0" cellpadding="5">
  <thead>
    <tr>
      <th>Sequential Example</th>
      <th>Parallel Example</th>
      <th>Features Used</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td>simple_map</td>
      <td>simple_pmap</td>
      <td>pmap</td>
    </tr>
    <tr>
      <td>simple1</td>
      <td>simple_managed_map</td>
      <td>MakeParallel, Map, manage</td>
    </tr>
    <tr>
      <td rowspan="3">simple2</td>
      <td>simple_managed_queue</td>
      <td>MakeParallel, Queue, manage</td>
    </tr>
    <tr>
      <td>simple_managed</td>
      <td>MakeParallel, Exchange (subclass), manage, finish</td>
    </tr>
    <tr>
      <td>simple_start</td>
      <td>Channel, Exchange (subclass), start, finish</td>
    </tr>
    <tr>
      <td>simple</td>
      <td>simple_create_map</td>
      <td>Channel, Map, create, exit</td>
    </tr>
  </tbody>
</table>

</body>
</html>