<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" lang="en-gb">
<head>
  <meta http-equiv="content-type" content="text/html; charset=UTF-8" />
  <title>pprocess - Reference</title>
  <link href="styles.css" rel="stylesheet" type="text/css" />
</head>
<body>

<h1>pprocess - Reference</h1>

<p>The <code>pprocess</code> module defines a simple parallel processing API
for Python, inspired somewhat by the <code>thread</code> module, slightly less
by <a href="http://datamining.anu.edu.au/~ole/pypar/">pypar</a>, and slightly
less still by <a href="http://pypvm.sourceforge.net/">pypvm</a>.</p>

<p>This document complements the <a href="tutorial.html">tutorial</a> by
providing an overview of the different styles of parallel programming supported
by the module. For an introduction and in order to get a clearer idea of the
most suitable styles for your own programs, consult the
<a href="tutorial.html">tutorial</a>.</p>

<ul>
<li><a href="#Thread">Thread-style Processing</a></li>
<li><a href="#Fork">Fork-style Processing</a></li>
<li><a href="#Exchanges">Message Exchanges</a></li>
<li><a href="#Convenient">Convenient Message Exchanges</a></li>
<li><a href="#Queues">Exchanges as Queues</a></li>
<li><a href="#Maps">Exchanges as Maps</a></li>
<li><a href="#Managed">Managed Callables</a></li>
<li><a href="#MakeParallel">Making Existing Functions Parallel</a></li>
<li><a href="#Map">Map-style Processing</a></li>
<li><a href="#Reusing">Reusing Processes and Channels</a></li>
<li><a href="#MakeReusable">Making Existing Functions Parallel and Reusable</a></li>
<li><a href="#Continuous">Continuous Processes and Channels</a></li>
<li><a href="#BackgroundCallable">Background Processes and Callables</a></li>
<li><a href="#PersistentQueue">Background and Persistent Queues</a></li>
<li><a href="#Implementation">Implementation Notes</a></li>
</ul>

<h2 id="Thread">Thread-style Processing</h2>

<p>To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:</p>

<pre>
channel = pprocess.start(fn, arg1, arg2, named1=value1, named2=value2)
</pre>

<p>This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:</p>

<pre>
def fn(channel, arg1, arg2, named1, named2):
    # Read from and write to the channel.
    # Return value is ignored.
    ...
</pre>

<h2 id="Fork">Fork-style Processing</h2>

<p>To create new processes in a similar way to that employed when using <code>os.fork</code>
(ie. the <code>fork</code> system call on various operating systems), use the following
method:</p>

<pre>
channel = pprocess.create()
if channel.pid == 0:
    # This code is run by the created process.
    # Read from and write to the channel to communicate with the
    # creating/calling process.
    # An explicit exit of the process may be desirable to prevent the process
    # from running code which is intended for the creating/calling process.
    ...
    pprocess.exit(channel)
else:
    # This code is run by the creating/calling process.
    # Read from and write to the channel to communicate with the created
    # process.
    ...
</pre>

<h2 id="Exchanges">Message Exchanges</h2>

<p>When creating many processes, each providing results for the consumption of the
main process, the collection of those results in an efficient fashion can be
problematic: if some processes take longer than others, and if we decide to read
from those processes when they are not ready instead of other processes which
are ready, the whole activity will take much longer than necessary.</p>

<p>One solution to the problem of knowing when to read from channels is to create
an <code>Exchange</code> object, optionally initialising it with a list of channels
through which data is expected to arrive:</p>

<pre>
exchange = pprocess.Exchange()           # populate the exchange later
exchange = pprocess.Exchange(channels)   # populate the exchange with channels
</pre>

<p>We can add channels to the exchange using the <code>add</code> method:</p>

<pre>
exchange.add(channel)
</pre>

<p>To test whether an exchange is active - that is, whether it is actually
monitoring any channels - we can use the <code>active</code> method which
returns all channels being monitored by the exchange:</p>

<pre>
channels = exchange.active()
</pre>

<p>We may then check the exchange to see whether any data is ready to be received;
for example:</p>

<pre>
for channel in exchange.ready():
    # Read from and write to the channel.
    ...
</pre>

<p>If we do not wish to wait indefinitely for a list of channels, we can set a
timeout value as an argument to the <code>ready</code> method (as a floating
point number specifying the timeout in seconds, where <code>0</code> means a
non-blocking poll as stated in the <code>select</code> module's <code>select</code>
function documentation).</p>

<h2 id="Convenient">Convenient Message Exchanges</h2>

<p>A convenient form of message exchanges can be adopted by defining a subclass of
the <code>Exchange</code> class and defining a particular method:</p>

<pre>
class MyExchange(pprocess.Exchange):
    def store_data(self, channel):
        data = channel.receive()
        # Do something with data here.
</pre>

<p>The exact operations performed on the received data might be as simple as
storing it on an instance attribute. To make use of the exchange, we would
instantiate it as usual:</p>

<pre>
exchange = MyExchange()         # populate the exchange later
exchange = MyExchange(limit=10) # set a limit for later population
</pre>

<p>The exchange can now be used in a simpler fashion than that shown above.
We can add channels as before using the <code>add</code> method, or we can choose to only
add channels if the specified limit of channels is not exceeded:</p>

<pre>
exchange.add(channel)       # add a channel as normal
exchange.add_wait(channel)  # add a channel, waiting if the limit would be
                            # exceeded
</pre>

<p>Or we can request that the exchange create a channel on our behalf:</p>

<pre>
channel = exchange.create()
</pre>

<p>We can even start processes and monitor channels without ever handling the
channel ourselves:</p>

<pre>
exchange.start(fn, arg1, arg2, named1=value1, named2=value2)
</pre>

<p>We can explicitly wait for "free space" for channels by calling the
<code>wait</code> method, although the <code>start</code> and <code>add_wait</code>
methods make this less interesting:</p>

<pre>
exchange.wait()
</pre>

<p>Finally, when finishing the computation, we can choose to merely call the
<code>finish</code> method and have the remaining data processed automatically:</p>

<pre>
exchange.finish()
</pre>

<p>Clearly, this approach is less flexible but more convenient than the raw message
exchange API as described above.
However, it permits much simpler and clearer
code.</p>

<h2 id="Queues">Exchanges as Queues</h2>

<p>Instead of having to subclass the <code>pprocess.Exchange</code> class and
to define the <code>store_data</code> method, it might be more desirable to let
the exchange manage the communications between created and creating processes
and to let the creating process just consume received data as it arrives,
without particular regard for the order of the received data - perhaps the
creating process has its own way of managing such issues.</p>

<p>For such situations, the <code>Queue</code> class may be instantiated and
channels added to the queue using the various methods provided:</p>

<pre>
queue = pprocess.Queue(limit=10)
channel = queue.create()
if channel:
    # Do some computation.
    pprocess.exit(channel)
</pre>

<p>The results can then be consumed by treating the queue like an iterator:</p>

<pre>
for result in queue:
    # Capture each result.
    ...
</pre>

<p>This approach does not, of course, require the direct handling of channels.
One could instead use the <code>start</code> method on the queue to create
processes and to initiate computations (since a queue is merely an enhanced
exchange with a specific implementation of the <code>store_data</code>
method).</p>

<h2 id="Maps">Exchanges as Maps</h2>

<p>Where the above <code>Queue</code> class appears like an attractive solution
for the management of the results of computations, but where the order of their
consumption by the creating process remains important, the <code>Map</code>
class may offer a suitable way of collecting and accessing results:</p>

<pre>
results = pprocess.Map(limit=10)
for value in inputs:
    results.start(fn, args)
</pre>

<p>The results can then be consumed in an order corresponding to the order of the
computations which produced them:</p>

<pre>
for result in results:
    # Process each result.
    ...
</pre>

<p>Internally, the <code>Map</code> object records a particular ordering of
channels, ensuring that the received results can be mapped to this ordering,
and that the results can be made available with this ordering preserved.</p>

<h2 id="Managed">Managed Callables</h2>

<p>A further simplification of the above convenient use of message exchanges
involves the creation of callables (eg. functions) which are automatically
monitored by an exchange.
We create such a callable by calling the
<code>manage</code> method on an exchange:</p>

<pre>
myfn = exchange.manage(fn)
</pre>

<p>This callable can then be invoked instead of using the exchange's
<code>start</code> method:</p>

<pre>
myfn(arg1, arg2, named1=value1, named2=value2)
</pre>

<p>The exchange's <code>finish</code> method can be used as usual to process
incoming data.</p>

<h2 id="MakeParallel">Making Existing Functions Parallel</h2>

<p>In making a program parallel, existing functions which only return results can
be manually modified to accept and use channels to communicate results back to
the main process. However, a simple alternative is to use the <code>MakeParallel</code>
class to provide a wrapper around unmodified functions which will return the results
from those functions in the channels provided. For example:</p>

<pre>
fn = pprocess.MakeParallel(originalfn)
</pre>

<h2 id="Map">Map-style Processing</h2>

<p>In situations where a callable would normally be used in conjunction with the
Python built-in <code>map</code> function, an alternative solution can be adopted by using
the <code>pmap</code> function:</p>

<pre>
pprocess.pmap(fn, sequence)
</pre>

<p>Here, the sequence would have to contain elements that each contain the
required parameters of the specified callable, <code>fn</code>. Note that the
callable does not need to be a parallel-aware function which has a
<code>channel</code> argument: the <code>pmap</code> function automatically
wraps the given callable internally.</p>

<h2 id="Reusing">Reusing Processes and Channels</h2>

<p>So far, all parallel computations have been done with newly-created
processes.
However, this can seem somewhat inefficient, especially if processes
are being continually created and destroyed (although if this happens too
often, the amount of work done by each process may be too little, anyway). One
solution is to retain processes after they have done their work and request
that they perform more work for each new parallel task or invocation. To enable
the reuse of processes in this way, a special keyword argument may be specified
when creating <code>Exchange</code> instances (and instances of subclasses such
as <code>Map</code> and <code>Queue</code>). For example:</p>

<pre>
exchange = MyExchange(limit=10, reuse=1) # reuse up to 10 processes
</pre>

<p>Code invoked through such exchanges must be aware of channels and be
constructed in such a way that it does not terminate after sending a result
back to the creating process. Instead, it should repeatedly wait for subsequent
sets of parameters (compatible with those either in the signature of a callable
or with the original values read from the channel).
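</p>

<p>The wait-for-parameters loop can be sketched without <code>pprocess</code>
itself. The following stand-in is only an illustration and not the module's
implementation: it assumes a Unix-like system providing <code>os.fork</code>,
uses a pair of pipes carrying pickled values in place of a channel, and treats
<code>None</code> as the signal to stop. The names <code>reusable_worker</code>
and <code>run_reused</code> are hypothetical.</p>

<pre>
import os
import pickle


def reusable_worker(read_file, write_file, fn):
    """Serve parameter sets one after another until None arrives."""
    while True:
        args = pickle.load(read_file)
        if args is None:
            # No more parameters will be sent: stop serving.
            break
        pickle.dump(fn(*args), write_file)
        write_file.flush()


def run_reused(fn, parameter_sets):
    """Evaluate fn for each parameter set in a single created process."""
    to_child_r, to_child_w = os.pipe()
    to_parent_r, to_parent_w = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Created process: keep only its own ends of the two pipes.
        try:
            os.close(to_child_w)
            os.close(to_parent_r)
            with os.fdopen(to_child_r, "rb") as r, os.fdopen(to_parent_w, "wb") as w:
                reusable_worker(r, w, fn)
        finally:
            # Never fall through into code meant for the creating process.
            os._exit(0)
    # Creating process: send each parameter set and read back its result.
    os.close(to_child_r)
    os.close(to_parent_w)
    results = []
    with os.fdopen(to_child_w, "wb") as w, os.fdopen(to_parent_r, "rb") as r:
        for args in parameter_sets:
            pickle.dump(args, w)
            w.flush()
            results.append(pickle.load(r))
        # Tell the created process that its work is done.
        pickle.dump(None, w)
        w.flush()
    os.waitpid(pid, 0)
    return results
</pre>

<p>With this sketch, a call such as <code>run_reused(pow, [(2, 3), (3, 2)])</code>
evaluates both parameter sets in the same created process rather than creating
a new process for each.</p>

<p>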
Reusable code is terminated
when the special value of <code>None</code> is sent from the creating process
to the created process, indicating that no more parameters will be sent; this
should cause the code to terminate.</p>

<h2 id="MakeReusable">Making Existing Functions Parallel and Reusable</h2>

<p>An easier way of making reusable code sections for parallel use is to employ the
<code>MakeReusable</code> class to wrap an existing callable:</p>

<pre>
fn = pprocess.MakeReusable(originalfn)
</pre>

<p>This wraps the callable in a similar fashion to <code>MakeParallel</code>, but
provides the necessary mechanisms described above for reusable code.</p>

<h2 id="Continuous">Continuous Processes and Channels</h2>

<p>Much of the usage of exchanges so far has concentrated on processes which
are created and whose callables, once they have returned, are invoked again
either in the same process (when reused) or in a new process (when not reused).
However, the underlying mechanisms
actually support processes whose callables not only receive input at the start
of their execution and send output at the end of their execution, but may
provide output on a continuous basis (similar to iterator or generator
objects).</p>

<p>To enable support for continuous communications between processes, a
keyword argument must be specified when creating an <code>Exchange</code>
instance (or an instance of a subclass of <code>Exchange</code> such as
<code>Map</code> or <code>Queue</code>):</p>

<pre>
exchange = MyExchange(limit=10, continuous=1) # support up to 10 processes
</pre>

<p>Code invoked in this mode of communication must be aware of channels, since
it will need to explicitly send data via a channel to the creating process,
instead of terminating and sending data only once (as would be done
automatically using convenience classes such as
<code>MakeParallel</code>).</p>

<h2 id="BackgroundCallable">Background Processes and Callables</h2>

<p>So far, all parallel computations have involved created processes which
depend on the existence of the creating process to collect results and to
communicate with these created processes, preventing the creating process from
terminating, even if the created processes actually perform work and potentially
create output which need not concern the process which created them.
In order to
separate creating and created processes, the concept of a background process
(also known as a daemon process) is introduced.</p>

<p>The <code>BackgroundCallable</code> class acts somewhat like the
<code>manage</code> method on exchange-based objects, although no exchange is
immediately involved, and instances of <code>BackgroundCallable</code> provide
wrappers around existing parallel-aware callables which can then be invoked in order
to initiate a background computation in a created process. For example:</p>

<pre>
backgroundfn = pprocess.BackgroundCallable(address, fn)
</pre>

<p>This wraps the supplied callable (which can itself be the result of using
<code>MakeParallel</code>), with the resulting wrapper lending itself to
invocation like any other function. One distinguishing feature is that of the
<code>address</code>: in order to contact the background process after
invocation to (amongst other things) receive any result, a specific address
must be given to define the contact point between the created process and any
processes seeking to connect to it. Since these "persistent" communications
employ special files (specifically UNIX-domain sockets), the address must be a
suitable filename.</p>

<h2 id="PersistentQueue">Background and Persistent Queues</h2>

<p>Background processes employing persistent communications require adaptations
of the facilities described in the sections above. For a single background
process, the <code>BackgroundQueue</code> function is sufficient to create a
queue-like object which can monitor the communications channel between the
connecting process and a background process.
For example:</p>

<pre>
queue = pprocess.BackgroundQueue(address)
</pre>

<p>This code will cause the process reachable via the given <code>address</code>
to be contacted and any results made available via the created queue-like
object.</p>

<p>Where many background processes have been created, a single
<code>PersistentQueue</code> object can monitor their communications by being
connected to them all, as in the following example:</p>

<pre>
queue = pprocess.PersistentQueue()
for address in addresses:
    queue.connect(address)
</pre>

<p>Here, the queue monitors all previously created processes whose addresses
reside in the <code>addresses</code> sequence. Upon iterating over the queue,
results will be taken from whichever process happens to have data available in
no particular pre-defined order.</p>

<h2 id="Implementation">Implementation Notes</h2>

<h3>Signals and Waiting</h3>

<p>When created/child processes terminate, one would typically want to be informed
of such conditions using a signal handler. Unfortunately, Python seems to have
issues with restartable reads from file descriptors when interrupted by signals:</p>

<ul>
<li><a href="http://mail.python.org/pipermail/python-dev/2002-September/028572.html">http://mail.python.org/pipermail/python-dev/2002-September/028572.html</a></li>
<li><a href="http://twistedmatrix.com/bugs/issue733">http://twistedmatrix.com/bugs/issue733</a></li>
</ul>

<h3>Select and Poll</h3>

<p>The exact combination of conditions indicating closed pipes remains relatively
obscure.
Here is a message/thread describing them (in the context of another
topic):</p>

<ul>
<li><a href="http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html">http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html</a></li>
</ul>

<p>It would seem, from using sockets and from studying the <code>asyncore</code>
module, that sockets are more predictable than pipes.</p>

<p>Notes about <code>poll</code> implementations can be found here:</p>

<ul>
<li><a href="http://www.greenend.org.uk/rjk/2001/06/poll.html">http://www.greenend.org.uk/rjk/2001/06/poll.html</a></li>
</ul>

</body>
</html>