<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" lang="en-gb">
<head>
<meta http-equiv="content-type" content="text/html; charset=UTF-8" />
<title>pprocess - Tutorial</title>
<link href="styles.css" rel="stylesheet" type="text/css" />
</head>
<body>

<h1>pprocess - Tutorial</h1>

<p>The <code>pprocess</code> module provides several mechanisms for running
Python code concurrently in several processes. The most straightforward way of
making a program parallel-aware - that is, where the program can take
advantage of more than one processor to simultaneously process data - is to
use the <code>pmap</code> function.</p>

<h2>Converting Map-Style Code</h2>

<p>Consider a program using the built-in <code>map</code> function and a
sequence of inputs:</p>

<pre>
t = time.time()

# Initialise an array.

sequence = []
for i in range(0, N):
    for j in range(0, N):
        sequence.append((i, j))

# Perform the work.

results = map(calculate, sequence)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_map.py</code> file.)</p>

<p>The principal features of this program involve the preparation of an array
for input purposes, and the use of the <code>map</code> function to iterate
over the combinations of <code>i</code> and <code>j</code> in the array. Even
if the <code>calculate</code> function could be invoked independently for each
input value, we have to wait for each computation to complete before
initiating a new one.
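</p>

<p>The cost of this waiting is easy to demonstrate with a toy version of the
pattern: because each call blocks until it returns, the total running time
grows in proportion to the number of inputs. The following sketch (plain
Python 3, with made-up values for <code>N</code> and <code>delay</code>)
mirrors the structure of the example above:</p>

```python
import time

N = 4
delay = 0.01

def calculate(t):
    # Simulated work, as in the tutorial's examples: sleep for a while,
    # then derive a value from the input coordinates.
    i, j = t
    time.sleep(delay)
    return i * N + j

sequence = [(i, j) for i in range(N) for j in range(N)]

start = time.time()
results = list(map(calculate, sequence))
elapsed = time.time() - start

# Sequential execution: the elapsed time is at least N * N * delay,
# because each of the N * N calls sleeps for 'delay' seconds in turn.
print(results)
print(elapsed >= N * N * delay)
```

<p>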
The <code>calculate</code> function may be defined as follows:</p>

<pre>
def calculate(t):

    "A supposedly time-consuming calculation on 't'."

    i, j = t
    time.sleep(delay)
    return i * N + j
</pre>

<p>In order to reduce the processing time - to speed the code up, in other
words - we can make this code use several processes instead of just one. Here
is the modified code:</p>

<pre>
t = time.time()

# Initialise an array.

sequence = []
for i in range(0, N):
    for j in range(0, N):
        sequence.append((i, j))

# Perform the work.

results = <strong>pprocess.pmap</strong>(calculate, sequence<strong>, limit=limit</strong>)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_pmap.py</code> file.)</p>

<p>By replacing usage of the <code>map</code> function with the
<code>pprocess.pmap</code> function, and specifying the limit on the number of
processes to be active at any given time (the value of the <code>limit</code>
variable is defined elsewhere), several calculations can now be performed in
parallel.</p>

<h2>Converting Invocations to Parallel Operations</h2>

<p>Although some programs make natural use of the <code>map</code> function,
others may employ an invocation in a nested loop. This may also be converted
to a parallel program. Consider the following Python code:</p>

<pre>
t = time.time()

# Initialise an array.

results = []

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        results.append(calculate(i, j))

# Show the results.
for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple1.py</code> file.)</p>

<p>Here, a computation in the <code>calculate</code> function is performed for
each combination of <code>i</code> and <code>j</code> in the nested loop,
returning a result value. However, we must wait for the completion of this
function for each element before moving on to the next element, and this means
that the computations are performed sequentially. Consequently, on a system
with more than one processor, even if we could call <code>calculate</code> for
more than one combination of <code>i</code> and <code>j</code> and have the
computations executing at the same time, the above program will not take
advantage of such capabilities.</p>

<p>We use a slightly modified version of <code>calculate</code> which employs
two parameters instead of one:</p>

<pre>
def calculate(i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j'.
    """

    time.sleep(delay)
    return i * N + j
</pre>

<p>In order to reduce the processing time - to speed the code up, in other
words - we can make this code use several processes instead of just one. Here
is the modified code:</p>

<pre id="simple_managed_map">
t = time.time()

# Initialise the results using a map with a limit on the number of
# channels/processes.

<strong>results = pprocess.Map(limit=limit)</strong>

# Wrap the calculate function and manage it.

<strong>calc = results.manage(pprocess.MakeParallel(calculate))</strong>

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>calc</strong>(i, j)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed_map.py</code> file.)</p>

<p>The principal changes in the above code involve the use of a
<code>pprocess.Map</code> object to collect the results, and a version of the
<code>calculate</code> function which is managed by the <code>Map</code>
object. The <code>Map</code> object arranges the results of the computations
such that iterating over the object, or accessing it using list operations,
provides the results in the same order as their corresponding inputs.</p>

<h2>Converting Arbitrarily-Ordered Invocations</h2>

<p>In some programs, it is not important to receive the results of
computations in any particular order, usually because either the order of
these results is irrelevant, or because the results provide "positional"
information which lets them be handled in an appropriate way. Consider the
following Python code:</p>

<pre>
t = time.time()

# Initialise an array.

results = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        i2, j2, result = calculate(i, j)
        results[i2*N+j2] = result

# Show the results.
for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple2.py</code> file.)</p>

<p>Here, a result array is initialised first and each computation is performed
sequentially. A significant difference from the previous examples is the
return value of the <code>calculate</code> function: the position details
corresponding to <code>i</code> and <code>j</code> are returned alongside the
result. Obviously, this is of limited value in the above code because the
order of the computations and the reception of results is fixed; moreover, we
get no benefit from parallelisation in the above example.</p>

<p>We can bring the benefits of parallel processing to the above program with
the following code:</p>

<pre>
t = time.time()

# Initialise the communications queue with a limit on the number of
# channels/processes.

<strong>queue = pprocess.Queue(limit=limit)</strong>

# Initialise an array.

results = [0] * N * N

# Wrap the calculate function and manage it.

<strong>calc = queue.manage(pprocess.MakeParallel(calculate))</strong>

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>calc(i, j)</strong>

# Store the results as they arrive.

print "Finishing..."
<strong>for i, j, result in queue:</strong>
    <strong>results[i*N+j] = result</strong>

# Show the results.
for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed_queue.py</code> file.)</p>

<p>This revised code employs a <code>pprocess.Queue</code> object whose
purpose is to collect the results of computations and to make them available
in the order in which they were received. The code collecting results has been
moved into a separate loop independent of the original computation loop and
taking advantage of the more relevant "positional" information emerging from
the queue.</p>

<p>We can take this example further, illustrating some of the mechanisms
employed by <code>pprocess</code>. Instead of collecting results in a queue,
we can define a class containing a method which is called when new results
arrive:</p>

<pre>
class MyExchange(pprocess.Exchange):

    "Parallel convenience class containing the array assignment operation."

    def store_data(self, ch):
        i, j, result = ch.receive()
        self.D[i*N+j] = result
</pre>

<p>This code exposes the channel paradigm which is used throughout
<code>pprocess</code> and is available to applications, if desired. The effect
of the method is the storage of a result received through the channel in an
attribute of the object. The following code shows how this class can be used,
with differences to the previous program illustrated:</p>

<pre>
t = time.time()

# Initialise the communications exchange with a limit on the number of
# channels/processes.

<strong>exchange = MyExchange(limit=limit)</strong>

# Initialise an array - it is stored in the exchange to permit automatic
# assignment of values as the data arrives.
<strong>results = exchange.D = [0] * N * N</strong>

# Wrap the calculate function and manage it.

calc = <strong>exchange</strong>.manage(pprocess.MakeParallel(calculate))

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        calc(i, j)

# Wait for the results.

print "Finishing..."
<strong>exchange.finish()</strong>

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed.py</code> file.)</p>

<p>The main visible differences between this and the previous program are the
storage of the result array in the exchange, the removal of the queue
consumption code from the main program (the act of storing values now resides
in the exchange's <code>store_data</code> method), and the need to call the
<code>finish</code> method on the <code>MyExchange</code> object so that we do
not try to access the results too soon. One underlying benefit not visible in
the above code is that we no longer need to accumulate results in a queue or
other structure so that they may be processed and assigned to the correct
positions in the result array.</p>

<p>For the curious, we may remove some of the remaining conveniences of the
above program to expose other features of <code>pprocess</code>. First, we
define a slightly modified version of the <code>calculate</code> function:</p>

<pre>
def calculate(ch, i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j', using 'ch' to
    communicate with the parent process.
    """

    time.sleep(delay)
    ch.send((i, j, i * N + j))
</pre>

<p>This function accepts a channel, <code>ch</code>, through which results
will be sent, and through which other values could potentially be received,
although we choose not to do so here. The program using this function is as
follows, with differences to the previous program illustrated:</p>

<pre>
t = time.time()

# Initialise the communications exchange with a limit on the number of
# channels/processes.

exchange = MyExchange(limit=limit)

# Initialise an array - it is stored in the exchange to permit automatic
# assignment of values as the data arrives.

results = exchange.D = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>exchange.start(calculate, i, j)</strong>

# Wait for the results.

print "Finishing..."
exchange.finish()

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_start.py</code> file.)</p>

<p>Here, we have discarded two conveniences: the wrapping of callables using
<code>MakeParallel</code>, which lets us use functions without providing any
channel parameters, and the management of callables using the
<code>manage</code> method on queues, exchanges, and so on.
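</p>

<p>The shape of this channel-based style can be sketched in plain Python 3
using a <code>queue.Queue</code> standing in for the channel and threads
standing in for child processes. This is an illustration of the pattern only,
not of <code>pprocess</code> itself, which uses separate processes and its own
channel objects:</p>

```python
import queue
import threading
import time

N = 4
delay = 0.01

def calculate(ch, i, j):
    # 'ch' plays the role of the channel: the result is sent through it,
    # together with its positional information, rather than returned.
    time.sleep(delay)
    ch.put((i, j, i * N + j))

ch = queue.Queue()
workers = []
for i in range(N):
    for j in range(N):
        worker = threading.Thread(target=calculate, args=(ch, i, j))
        worker.start()
        workers.append(worker)

# Receive one message per started computation, storing each result
# according to the positions sent alongside it.
results = [0] * N * N
for _ in range(N * N):
    i, j, result = ch.get()
    results[i * N + j] = result

for worker in workers:
    worker.join()

print(results)
```

<p>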
The <code>start</code> method still calls the provided callable, but using a
different notation from that employed previously.</p>

<h2>Converting Inline Computations</h2>

<p>Although many programs employ functions and other useful abstractions which
can be treated as parallelisable units, some programs perform computations
"inline", meaning that the code responsible appears directly within a loop or
related control-flow construct. Consider the following code:</p>

<pre>
t = time.time()

# Initialise an array.

results = [0] * N * N

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        time.sleep(delay)
        results[i*N+j] = i * N + j

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple.py</code> file.)</p>

<p>To simulate "work", as in the different versions of the
<code>calculate</code> function, we use the <code>time.sleep</code> function
(which does not actually do work, and which will cause a process to be
descheduled in most cases, but which simulates the delay associated with work
being done). This inline work, which must be performed sequentially in the
above program, can be performed in parallel in a somewhat modified version of
the program:</p>

<pre>
t = time.time()

# Initialise the results using a map with a limit on the number of
# channels/processes.

<strong>results = pprocess.Map(limit=limit)</strong>

# Perform the work.
# NOTE: Could use the with statement in the loop to package the
# NOTE: try...finally functionality.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        <strong>ch = results.create()</strong>
        <strong>if ch:</strong>
            <strong>try: # Calculation work.</strong>

                time.sleep(delay)
                <strong>ch.send(i * N + j)</strong>

            <strong>finally: # Important finalisation.</strong>

                <strong>pprocess.exit(ch)</strong>

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_create_map.py</code> file.)</p>

<p>Although seemingly more complicated, the bulk of the changes in this
modified program are focused on obtaining a channel object, <code>ch</code>,
at the point where the computations are performed, and on wrapping the
computation code in a <code>try</code>...<code>finally</code> statement which
ensures that the process associated with the channel exits when the
computation is complete. In order for the results of these computations to be
collected, a <code>pprocess.Map</code> object is used, since it will maintain
the results in the same order as the initiation of the computations which
produced them.</p>

<h2>Reusing Processes in Parallel Programs</h2>

<p>One notable aspect of the above programs when parallelised is that each
invocation of a computation in parallel creates a new process in which the
computation is to be performed, regardless of whether existing processes had
just finished producing results and could theoretically have been asked to
perform new computations.
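</p>

<p>The effect of a worker pool can be sketched in plain Python 3 with the
standard library's <code>concurrent.futures</code> module (threads stand in
for processes here, purely for illustration): a fixed set of workers serves
all of the submitted tasks, each worker being reused rather than recreated.
The <code>limit</code> and <code>delay</code> values below are made up:</p>

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

delay = 0.01
limit = 4
used = set()

def calculate(t):
    # Record which worker handled this task, then simulate some work.
    used.add(threading.get_ident())
    time.sleep(delay)
    return t

# A pool of 'limit' workers serves all sixteen tasks; results keep
# their input order, as with a map.
with ThreadPoolExecutor(max_workers=limit) as executor:
    results = list(executor.map(calculate, range(16)))

print(results)
print(len(used))   # at most 'limit' distinct workers were used
```

<p>In the parallelised programs above, no such pool was involved: one process
served exactly one computation.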
In other words, processes were created and destroyed
instead of being reused.</p>

<p>However, we can request that processes be reused for computations by
enabling the <code>reuse</code> feature of exchange-like objects and employing
suitably reusable callables. Consider this modified version of the <a
href="#simple_managed_map">simple_managed_map</a> program:</p>

<pre>
t = time.time()

# Initialise the results using a map with a limit on the number of
# channels/processes.

results = pprocess.Map(limit=limit<strong>, reuse=1</strong>)

# Wrap the calculate function and manage it.

calc = results.manage(pprocess.Make<strong>Reusable</strong>(calculate))

# Perform the work.

print "Calculating..."
for i in range(0, N):
    for j in range(0, N):
        calc(i, j)

# Show the results.

for i in range(0, N):
    for result in results[i*N:i*N+N]:
        print result,
    print

print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_manage_map_reusable.py</code> file.)</p>

<p>By indicating that processes and channels shall be reused, and by wrapping
the <code>calculate</code> function with the necessary support, the
computations may be performed in parallel using a pool of processes, instead
of creating a new process for each computation and then discarding it, only to
create a new process for the next computation.</p>

<h2>Summary</h2>

<p>The following table indicates the features used in converting each
sequential example program to its parallel equivalent:</p>

<table border="1" cellspacing="0" cellpadding="5">
  <thead>
    <tr>
      <th>Sequential Example</th>
      <th>Parallel Example</th>
      <th>Features Used</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td>simple_map</td>
      <td>simple_pmap</td>
      <td>pmap</td>
    </tr>
    <tr>
      <td>simple1</td>
      <td>simple_managed_map</td>
      <td>MakeParallel, Map, manage</td>
    </tr>
    <tr>
      <td rowspan="3">simple2</td>
      <td>simple_managed_queue</td>
      <td>MakeParallel, Queue, manage</td>
    </tr>
    <tr>
      <td>simple_managed</td>
      <td>MakeParallel, Exchange (subclass), manage, finish</td>
    </tr>
    <tr>
      <td>simple_start</td>
      <td>Channel, Exchange (subclass), start, finish</td>
    </tr>
    <tr>
      <td>simple</td>
      <td>simple_create_map</td>
      <td>Channel, Map, create, exit</td>
    </tr>
  </tbody>
</table>

</body>
</html>