<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" lang="en-gb">
<head>
<meta http-equiv="content-type" content="text/html; charset=UTF-8" />
<title>pprocess - Reference</title>
<link href="styles.css" rel="stylesheet" type="text/css" />
</head>
<body>

<h1>pprocess - Reference</h1>

<p>The <code>pprocess</code> module defines a simple parallel processing API
for Python, inspired somewhat by the <code>thread</code> module, slightly less
by <a href="http://datamining.anu.edu.au/~ole/pypar/">pypar</a>, and slightly
less still by <a href="http://pypvm.sourceforge.net/">pypvm</a>.</p>

<p>This document complements the <a href="tutorial.html">tutorial</a> by
providing an overview of the different styles of parallel programming supported
by the module. For an introduction and in order to get a clearer idea of the
most suitable styles for your own programs, consult the
<a href="tutorial.html">tutorial</a>.</p>

<ul>
<li><a href="#Thread">Thread-style Processing</a></li>
<li><a href="#Fork">Fork-style Processing</a></li>
<li><a href="#Exchanges">Message Exchanges</a></li>
<li><a href="#Convenient">Convenient Message Exchanges</a></li>
<li><a href="#Queues">Exchanges as Queues</a></li>
<li><a href="#Maps">Exchanges as Maps</a></li>
<li><a href="#Managed">Managed Callables</a></li>
<li><a href="#MakeParallel">Making Existing Functions Parallel</a></li>
<li><a href="#Map">Map-style Processing</a></li>
<li><a href="#Reusing">Reusing Processes and Channels</a></li>
<li><a href="#MakeReusable">Making Existing Functions Parallel and Reusable</a></li>
<li><a href="#BackgroundCallable">Background Processes and Callables</a></li>
<li><a href="#PersistentQueue">Background and Persistent Queues</a></li>
<li><a href="#Implementation">Implementation Notes</a></li>
</ul>

<h2 id="Thread">Thread-style Processing</h2>

<p>To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:</p>

<pre>
channel = pprocess.start(fn, arg1, arg2, named1=value1, named2=value2)
</pre>

<p>This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:</p>

<pre>
def fn(channel, arg1, arg2, named1, named2):
    # Read from and write to the channel.
    # Return value is ignored.
    ...
</pre>

<h2 id="Fork">Fork-style Processing</h2>

<p>To create new processes in a similar way to that employed when using <code>os.fork</code>
(ie. the <code>fork</code> system call on various operating systems), use the following
method:</p>

<pre>
channel = pprocess.create()
if channel.pid == 0:
    # This code is run by the created process.
    # Read from and write to the channel to communicate with the
    # creating/calling process.
    # An explicit exit of the process may be desirable to prevent the process
    # from running code which is intended for the creating/calling process.
    ...
    pprocess.exit(channel)
else:
    # This code is run by the creating/calling process.
    # Read from and write to the channel to communicate with the created
    # process.
    ...
</pre>
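
<p>As a minimal, self-contained sketch of the thread-style interface (the
<code>add</code> function here is an illustrative example, not part of the
module), the creating process starts a single computation and reads its result
back over the returned channel:</p>

<pre>
import pprocess

def add(channel, x, y):
    # A parallel-aware callable: send the result back to the creating process.
    channel.send(x + y)

channel = pprocess.start(add, 3, 4)
result = channel.receive()  # result is now 7
</pre>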

<h2 id="Exchanges">Message Exchanges</h2>

<p>When creating many processes, each providing results for the consumption of the
main process, the collection of those results in an efficient fashion can be
problematic: if some processes take longer than others, and we read from those
processes when they are not ready instead of from other processes which are
ready, the whole activity will take much longer than necessary.</p>

<p>One solution to the problem of knowing when to read from channels is to create
an <code>Exchange</code> object, optionally initialising it with a list of channels
through which data is expected to arrive:</p>

<pre>
exchange = pprocess.Exchange()         # populate the exchange later
exchange = pprocess.Exchange(channels) # populate the exchange with channels
</pre>

<p>We can add channels to the exchange using the <code>add</code> method:</p>

<pre>
exchange.add(channel)
</pre>

<p>To test whether an exchange is active - that is, whether it is actually
monitoring any channels - we can use the <code>active</code> method, which
returns all channels being monitored by the exchange:</p>

<pre>
channels = exchange.active()
</pre>

<p>We may then check the exchange to see whether any data is ready to be received;
for example:</p>

<pre>
for channel in exchange.ready():
    # Read from and write to the channel.
    ...
</pre>

<p>If we do not wish to wait indefinitely for a list of channels, we can set a
timeout value as an argument to the <code>ready</code> method (as a floating
point number specifying the timeout in seconds, where <code>0</code> means a
non-blocking poll as stated in the <code>select</code> module's <code>select</code>
function documentation).</p>

<h2 id="Convenient">Convenient Message Exchanges</h2>

<p>A convenient form of message exchanges can be adopted by defining a subclass of
the <code>Exchange</code> class and defining a particular method:</p>

<pre>
class MyExchange(pprocess.Exchange):
    def store_data(self, channel):
        data = channel.receive()
        # Do something with data here.
</pre>

<p>The exact operations performed on the received data might be as simple as
storing it on an instance attribute. To make use of the exchange, we would
instantiate it as usual:</p>

<pre>
exchange = MyExchange()         # populate the exchange later
exchange = MyExchange(limit=10) # set a limit for later population
</pre>

<p>The exchange can now be used in a simpler fashion than that shown above. We can
add channels as before using the <code>add</code> method, or we can choose to
add channels only if the specified limit of channels would not be exceeded:</p>

<pre>
exchange.add(channel)      # add a channel as normal
exchange.add_wait(channel) # add a channel, waiting if the limit would be
                           # exceeded
</pre>

<p>Or we can request that the exchange create a channel on our behalf:</p>

<pre>
channel = exchange.create()
</pre>

<p>We can even start processes and monitor channels without ever handling the
channel ourselves:</p>

<pre>
exchange.start(fn, arg1, arg2, named1=value1, named2=value2)
</pre>

<p>We can explicitly wait for "free space" for channels by calling the
<code>wait</code> method, although the <code>start</code> and <code>add_wait</code>
methods make this less interesting:</p>

<pre>
exchange.wait()
</pre>

<p>Finally, when finishing the computation, we can choose to merely call the
<code>finish</code> method and have the remaining data processed automatically:</p>

<pre>
exchange.finish()
</pre>

<p>Clearly, this approach is less flexible but more convenient than the raw message
exchange API as described above. However, it permits much simpler and clearer
code.</p>
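
<p>As a sketch of how these pieces fit together (the worker function
<code>fn</code> and the <code>results</code> attribute are illustrative choices
rather than part of the API), such a subclass can collect one value from each of
a number of started processes:</p>

<pre>
import pprocess

class MyExchange(pprocess.Exchange):
    def store_data(self, channel):
        # Called automatically for each channel with data ready to be read.
        self.results.append(channel.receive())

def fn(channel, i):
    # A parallel-aware callable sending a single result.
    channel.send(i * i)

exchange = MyExchange(limit=4)
exchange.results = []
for i in range(10):
    exchange.start(fn, i)
exchange.finish()
# exchange.results now holds the ten values in the order they arrived.
</pre>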

<h2 id="Queues">Exchanges as Queues</h2>

<p>Instead of having to subclass the <code>pprocess.Exchange</code> class and
to define the <code>store_data</code> method, it might be more desirable to let
the exchange manage the communications between created and creating processes
and to let the creating process just consume received data as it arrives,
without particular regard for the order of the received data - perhaps the
creating process has its own way of managing such issues.</p>

<p>For such situations, the <code>Queue</code> class may be instantiated and
channels added to the queue using the various methods provided:</p>

<pre>
queue = pprocess.Queue(limit=10)
channel = queue.create()
if channel:
    # Do some computation.
    pprocess.exit(channel)
</pre>

<p>The results can then be consumed by treating the queue like an iterator:</p>

<pre>
for result in queue:
    # Capture each result.
    ...
</pre>

<p>This approach does not, of course, require the direct handling of channels.
One could instead use the <code>start</code> method on the queue to create
processes and to initiate computations (since a queue is merely an enhanced
exchange with a specific implementation of the <code>store_data</code>
method).</p>

<h2 id="Maps">Exchanges as Maps</h2>

<p>Where the above <code>Queue</code> class appears to be an attractive solution
for the management of the results of computations, but where the order of their
consumption by the creating process remains important, the <code>Map</code>
class may offer a suitable way of collecting and accessing results:</p>

<pre>
results = pprocess.Map(limit=10)
for value in inputs:
    results.start(fn, value)
</pre>

<p>The results can then be consumed in an order corresponding to the order of the
computations which produced them:</p>

<pre>
for result in results:
    # Process each result.
    ...
</pre>

<p>Internally, the <code>Map</code> object records a particular ordering of
channels, ensuring that the received results can be mapped to this ordering,
and that the results can be made available with this ordering preserved.</p>
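
<p>The following sketch (with an illustrative parallel-aware function
<code>fn</code>) shows the effect of the preserved ordering: results are
yielded in the order in which the computations were started, even if the
created processes finish in a different order:</p>

<pre>
import pprocess

def fn(channel, i):
    # A parallel-aware callable sending a single result.
    channel.send(i * i)

results = pprocess.Map(limit=4)
for i in range(10):
    results.start(fn, i)

squares = list(results)
# squares is [0, 1, 4, 9, ...], matching the order of the start calls.
</pre>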

<h2 id="Managed">Managed Callables</h2>

<p>A further simplification of the above convenient use of message exchanges
involves the creation of callables (eg. functions) which are automatically
monitored by an exchange. We create such a callable by calling the
<code>manage</code> method on an exchange:</p>

<pre>
myfn = exchange.manage(fn)
</pre>

<p>This callable can then be invoked instead of using the exchange's
<code>start</code> method:</p>

<pre>
myfn(arg1, arg2, named1=value1, named2=value2)
</pre>

<p>The exchange's <code>finish</code> method can be used as usual to process
incoming data.</p>

<h2 id="MakeParallel">Making Existing Functions Parallel</h2>

<p>In making a program parallel, existing functions which only return results can
be manually modified to accept and use channels to communicate results back to
the main process. However, a simple alternative is to use the <code>MakeParallel</code>
class to provide a wrapper around unmodified functions which sends the results
from those functions over the channels provided. For example:</p>

<pre>
fn = pprocess.MakeParallel(originalfn)
</pre>

<h2 id="Map">Map-style Processing</h2>

<p>In situations where a callable would normally be used in conjunction with the
Python built-in <code>map</code> function, an alternative solution can be adopted by using
the <code>pmap</code> function:</p>

<pre>
pprocess.pmap(fn, sequence)
</pre>

<p>Here, each element of the sequence must supply the parameters required by the
specified callable, <code>fn</code>. Note that the callable does not need to be
a parallel-aware function which has a <code>channel</code> argument: the
<code>pmap</code> function automatically wraps the given callable
internally.</p>
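
<p>Combining the facilities above, an ordinary function can be wrapped with
<code>MakeParallel</code> and managed by a <code>Queue</code>, after which it
is called like any other function; the <code>add</code> function and the other
names in this sketch are illustrative assumptions, not part of the API:</p>

<pre>
import pprocess

def add(x, y):
    # An ordinary function: no channel parameter is involved.
    return x + y

queue = pprocess.Queue(limit=4)
parallel_add = queue.manage(pprocess.MakeParallel(add))

for x, y in [(1, 2), (3, 4), (5, 6)]:
    parallel_add(x, y)

sums = list(queue)  # 3, 7 and 11, though not necessarily in that order
</pre>

<p>The <code>pmap</code> function described above packages a similar pattern up
for the common case of applying a callable to the elements of a single
sequence.</p>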
Reusable code is terminated 320 when the special value of <code>None</code> is sent from the creating process 321 to the created process, indicating that no more parameters will be sent; this 322 should cause the code to terminate.</p> 323 324 <h2 id="MakeReusable">Making Existing Functions Parallel and Reusable</h2> 325 326 <p>An easier way of making reusable code sections for parallel use is to employ the 327 <code>MakeReusable</code> class to wrap an existing callable:</p> 328 329 <pre> 330 fn = pprocess.MakeReusable(originalfn) 331 </pre> 332 333 <p>This wraps the callable in a similar fashion to <code>MakeParallel</code>, but 334 provides the necessary mechanisms described above for reusable code.</p> 335 336 <h2 id="BackgroundCallable">Background Processes and Callables</h2> 337 338 <p>So far, all parallel computations have involved created processes which 339 depend on the existence of the created process to collect results and to 340 communicate with these created processes, preventing the created process from 341 terminating, even if the created processes actually perform work and potentially 342 create output which need not concern the process which created them. In order to 343 separate creating and created processes, the concept of a background process 344 (also known as a daemon process) is introduced.</p> 345 346 <p>The <code>BackgroundCallable</code> class acts somewhat like the 347 <code>manage</code> method on exchange-based objects, although no exchange is 348 immediately involved, and instances of <code>BackgroundCallable</code> provide 349 wrappers around existing parallel-aware callables which then be invoked in order 350 to initiate a background computation in a created process. For example:</p> 351 352 <pre> 353 backgroundfn = pprocess.BackgroundCallable(address, fn) 354 </pre> 355 356 <p>This wraps the supplied callable (which can itself be the result of using 357 <code>MakeParallel</code>), with the resulting wrapper lending itself to 358 invocation like any other function. One distinguishing feature is that of the 359 <code>address</code>: in order to contact the background process after 360 invocation to (amongst other things) receive any result, a specific address 361 must be given to define the contact point between the created process and any 362 processes seeking to connect to it. Since these "persistent" communications 363 employ special files (specifically UNIX-domain sockets), the address must be a 364 suitable filename.</p> 365 366 <h2 id="PersistentQueue">Background and Persistent Queues</h2> 367 368 <p>Background processes employing persistent communications require adaptations 369 of the facilities described in the sections above. For a single background 370 process, the <code>BackgroundQueue</code> function is sufficient to create a 371 queue-like object which can monitor the communications channel between the 372 connecting process and a background process. 

<h2 id="BackgroundCallable">Background Processes and Callables</h2>

<p>So far, all parallel computations have involved created processes which
depend on the creating process to collect their results and to communicate with
them. This prevents the creating process from terminating, even if the created
processes perform work and produce output which need not concern the process
which created them. In order to separate creating and created processes, the
concept of a background process (also known as a daemon process) is
introduced.</p>

<p>The <code>BackgroundCallable</code> class acts somewhat like the
<code>manage</code> method on exchange-based objects, although no exchange is
immediately involved. Instances of <code>BackgroundCallable</code> provide
wrappers around existing parallel-aware callables which can then be invoked in
order to initiate a background computation in a created process. For example:</p>

<pre>
backgroundfn = pprocess.BackgroundCallable(address, fn)
</pre>

<p>This wraps the supplied callable (which can itself be the result of using
<code>MakeParallel</code>), with the resulting wrapper lending itself to
invocation like any other function. One distinguishing feature is that of the
<code>address</code>: in order to contact the background process after
invocation to (amongst other things) receive any result, a specific address
must be given to define the contact point between the created process and any
processes seeking to connect to it. Since these "persistent" communications
employ special files (specifically UNIX-domain sockets), the address must be a
suitable filename.</p>

<h2 id="PersistentQueue">Background and Persistent Queues</h2>

<p>Background processes employing persistent communications require adaptations
of the facilities described in the sections above. For a single background
process, the <code>BackgroundQueue</code> function is sufficient to create a
queue-like object which can monitor the communications channel between the
connecting process and a background process. For example:</p>

<pre>
queue = pprocess.BackgroundQueue(address)
</pre>

<p>This code will cause the process reachable via the given <code>address</code>
to be contacted and any results made available via the created queue-like
object.</p>

<p>Where many background processes have been created, a single
<code>PersistentQueue</code> object can monitor their communications by being
connected to them all, as in the following example:</p>

<pre>
queue = pprocess.PersistentQueue()
for address in addresses:
    queue.connect(address)
</pre>

<p>Here, the queue monitors all previously created processes whose addresses
reside in the <code>addresses</code> sequence. Upon iterating over the queue,
results will be taken from whichever process happens to have data available,
in no particular pre-defined order.</p>

<h2 id="Implementation">Implementation Notes</h2>

<h3>Signals and Waiting</h3>

<p>When created/child processes terminate, one would typically want to be informed
of such conditions using a signal handler. Unfortunately, Python seems to have
issues with restartable reads from file descriptors when interrupted by signals:</p>

<ul>
<li><a href="http://mail.python.org/pipermail/python-dev/2002-September/028572.html">http://mail.python.org/pipermail/python-dev/2002-September/028572.html</a></li>
<li><a href="http://twistedmatrix.com/bugs/issue733">http://twistedmatrix.com/bugs/issue733</a></li>
</ul>

<h3>Select and Poll</h3>

<p>The exact combination of conditions indicating closed pipes remains relatively
obscure. Here is a message/thread describing them (in the context of another
topic):</p>

<ul>
<li><a href="http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html">http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html</a></li>
</ul>

<p>It would seem, from using sockets and from studying the <code>asyncore</code>
module, that sockets are more predictable than pipes.</p>

<p>Notes about <code>poll</code> implementations can be found here:</p>

<ul>
<li><a href="http://www.greenend.org.uk/rjk/2001/06/poll.html">http://www.greenend.org.uk/rjk/2001/06/poll.html</a></li>
</ul>

</body>
</html>