157:91b47853bf44
2009-06-04 Paul Boddie Added continuous communications support to the Exchange class. Added an example illustrating continuous communications.
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" lang="en-gb">
<head>
  <meta http-equiv="content-type" content="text/html; charset=UTF-8" />
  <title>pprocess - Reference</title>
  <link href="styles.css" rel="stylesheet" type="text/css" />
</head>
<body>

<h1>pprocess - Reference</h1>

<p>The <code>pprocess</code> module defines a simple parallel processing API
for Python, inspired somewhat by the <code>thread</code> module, slightly less
by <a href="http://datamining.anu.edu.au/~ole/pypar/">pypar</a>, and slightly
less still by <a href="http://pypvm.sourceforge.net/">pypvm</a>.</p>

<p>This document complements the <a href="tutorial.html">tutorial</a> by
providing an overview of the different styles of parallel programming supported
by the module. For an introduction and in order to get a clearer idea of the
most suitable styles for your own programs, consult the
<a href="tutorial.html">tutorial</a>.</p>

<ul>
<li><a href="#Thread">Thread-style Processing</a></li>
<li><a href="#Fork">Fork-style Processing</a></li>
<li><a href="#Exchanges">Message Exchanges</a></li>
<li><a href="#Convenient">Convenient Message Exchanges</a></li>
<li><a href="#Queues">Exchanges as Queues</a></li>
<li><a href="#Maps">Exchanges as Maps</a></li>
<li><a href="#Managed">Managed Callables</a></li>
<li><a href="#MakeParallel">Making Existing Functions Parallel</a></li>
<li><a href="#Map">Map-style Processing</a></li>
<li><a href="#Reusing">Reusing Processes and Channels</a></li>
<li><a href="#MakeReusable">Making Existing Functions Parallel and Reusable</a></li>
<li><a href="#BackgroundCallable">Background Processes and Callables</a></li>
<li><a href="#PersistentQueue">Background and Persistent Queues</a></li>
<li><a href="#Implementation">Implementation Notes</a></li>
</ul>

<h2 id="Thread">Thread-style Processing</h2>

<p>To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:</p>

<pre>
channel = pprocess.start(fn, arg1, arg2, named1=value1, named2=value2)
</pre>

<p>This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:</p>

<pre>
def fn(channel, arg1, arg2, named1, named2):
    # Read from and write to the channel.
    # Return value is ignored.
    ...
</pre>

<h2 id="Fork">Fork-style Processing</h2>

<p>To create new processes in a similar way to that employed when using <code>os.fork</code>
(ie. the <code>fork</code> system call on various operating systems), use the following
method:</p>

<pre>
channel = pprocess.create()
if channel.pid == 0:
    # This code is run by the created process.
    # Read from and write to the channel to communicate with the
    # creating/calling process.
    # An explicit exit of the process may be desirable to prevent the process
    # from running code which is intended for the creating/calling process.
    ...
    pprocess.exit(channel)
else:
    # This code is run by the creating/calling process.
    # Read from and write to the channel to communicate with the created
    # process.
    ...
</pre>

<h2 id="Exchanges">Message Exchanges</h2>

<p>When creating many processes, each providing results for the consumption of the
main process, the collection of those results in an efficient fashion can be
problematic: if some processes take longer than others, and if we decide to read
from those processes when they are not ready instead of other processes which
are ready, the whole activity will take much longer than necessary.</p>

<p>One solution to the problem of knowing when to read from channels is to create
an <code>Exchange</code> object, optionally initialising it with a list of channels
through which data is expected to arrive:</p>

<pre>
exchange = pprocess.Exchange()           # populate the exchange later
exchange = pprocess.Exchange(channels)   # populate the exchange with channels
</pre>

<p>We can add channels to the exchange using the <code>add</code> method:</p>

<pre>
exchange.add(channel)
</pre>

<p>To test whether an exchange is active - that is, whether it is actually
monitoring any channels - we can use the <code>active</code> method which
returns all channels being monitored by the exchange:</p>

<pre>
channels = exchange.active()
</pre>

<p>We may then check the exchange to see whether any data is ready to be received;
for example:</p>

<pre>
for channel in exchange.ready():
    # Read from and write to the channel.
    ...
</pre>

<p>If we do not wish to wait indefinitely for any channel to become ready, we
can pass a timeout value as an argument to the <code>ready</code> method (a
floating point number specifying the timeout in seconds, where <code>0</code>
means a non-blocking poll, as stated in the documentation for the
<code>select</code> module's <code>select</code> function).</p>

<h2 id="Convenient">Convenient Message Exchanges</h2>

<p>A more convenient form of message exchange can be adopted by defining a
subclass of the <code>Exchange</code> class which implements a particular
method, <code>store_data</code>:</p>

<pre>
class MyExchange(pprocess.Exchange):
    def store_data(self, channel):
        data = channel.receive()
        # Do something with data here.
</pre>

<p>The exact operations performed on the received data might be as simple as
storing it on an instance attribute. To make use of the exchange, we would
instantiate it as usual:</p>

<pre>
exchange = MyExchange()         # populate the exchange later
exchange = MyExchange(limit=10) # set a limit for later population
</pre>

<p>The exchange can now be used in a simpler fashion than that shown above.
We can add channels as before using the <code>add</code> method, or we can use
the <code>add_wait</code> method, which only adds a channel once doing so would
not exceed the specified limit of channels:</p>

<pre>
exchange.add(channel)           # add a channel as normal
exchange.add_wait(channel)      # add a channel, waiting if the limit would be
                                # exceeded
</pre>

<p>Alternatively, we can request that the exchange create a channel on our behalf:</p>

<pre>
channel = exchange.create()
</pre>

<p>We can even start processes and monitor channels without ever handling the
channel ourselves:</p>

<pre>
exchange.start(fn, arg1, arg2, named1=value1, named2=value2)
</pre>

<p>We can explicitly wait for "free space" for channels by calling the
<code>wait</code> method, although the <code>start</code> and <code>add_wait</code>
methods make this largely unnecessary:</p>

<pre>
exchange.wait()
</pre>

<p>Finally, when finishing the computation, we can simply call the
<code>finish</code> method to have the remaining data processed automatically:</p>

<pre>
exchange.finish()
</pre>

<p>Clearly, this approach is less flexible than the raw message exchange API
described above.
However, it permits much simpler and clearer code.</p>

<h2 id="Queues">Exchanges as Queues</h2>

<p>Instead of having to subclass the <code>pprocess.Exchange</code> class and
to define the <code>store_data</code> method, it might be more desirable to let
the exchange manage the communications between created and creating processes
and to let the creating process just consume received data as it arrives,
without particular regard for the order of the received data - perhaps the
creating process has its own way of managing such issues.</p>

<p>For such situations, the <code>Queue</code> class may be instantiated and
channels added to the queue using the various methods provided:</p>

<pre>
queue = pprocess.Queue(limit=10)
channel = queue.create()
if channel:
    # This code is run by the created process.
    # Do some computation, sending any result back using the channel.
    pprocess.exit(channel)
</pre>

<p>The results can then be consumed by treating the queue like an iterator:</p>

<pre>
for result in queue:
    # Capture each result.
    ...
</pre>

<p>This approach does not, of course, require the direct handling of channels.
One could instead use the <code>start</code> method on the queue to create
processes and to initiate computations (since a queue is merely an enhanced
exchange with a specific implementation of the <code>store_data</code>
method).</p>

<h2 id="Maps">Exchanges as Maps</h2>

<p>Where the above <code>Queue</code> class appears to be an attractive solution
for the management of the results of computations, but where the order of their
consumption by the creating process remains important, the <code>Map</code>
class may offer a suitable way of collecting and accessing results:</p>

<pre>
results = pprocess.Map(limit=10)
for value in inputs:
    results.start(fn, value)
</pre>

<p>The results can then be consumed in an order corresponding to the order of the
computations which produced them:</p>

<pre>
for result in results:
    # Process each result.
    ...
</pre>

<p>Internally, the <code>Map</code> object records a particular ordering of
channels, ensuring that the received results can be mapped to this ordering,
and that the results can be made available with this ordering preserved.</p>

<h2 id="Managed">Managed Callables</h2>

<p>A further simplification of the above convenient use of message exchanges
involves the creation of callables (eg. functions) which are automatically
monitored by an exchange.
We create such a callable by calling the
<code>manage</code> method on an exchange:</p>

<pre>
myfn = exchange.manage(fn)
</pre>

<p>This callable can then be invoked instead of using the exchange's
<code>start</code> method:</p>

<pre>
myfn(arg1, arg2, named1=value1, named2=value2)
</pre>

<p>The exchange's <code>finish</code> method can be used as usual to process
incoming data.</p>

<h2 id="MakeParallel">Making Existing Functions Parallel</h2>

<p>In making a program parallel, existing functions which only return results can
be manually modified to accept and use channels to communicate results back to
the main process. However, a simpler alternative is to use the <code>MakeParallel</code>
class to provide a wrapper around unmodified functions which sends the results
of those functions back through the channels provided. For example:</p>

<pre>
fn = pprocess.MakeParallel(originalfn)
</pre>

<h2 id="Map">Map-style Processing</h2>

<p>In situations where a callable would normally be used in conjunction with the
Python built-in <code>map</code> function, an alternative solution can be adopted by using
the <code>pmap</code> function:</p>

<pre>
results = pprocess.pmap(fn, sequence)
</pre>

<p>Here, the sequence would have to contain elements that each contain the
required parameters of the specified callable, <code>fn</code>. Note that the
callable does not need to be a parallel-aware function which has a
<code>channel</code> argument: the <code>pmap</code> function automatically
wraps the given callable internally.</p>

<h2 id="Reusing">Reusing Processes and Channels</h2>

<p>So far, all parallel computations have been done with newly-created
processes.
However, this can seem somewhat inefficient, especially if processes
are being continually created and destroyed (although if this happens too
often, the amount of work done by each process may be too little, anyway). One
solution is to retain processes after they have done their work and request
that they perform more work for each new parallel task or invocation. To enable
the reuse of processes in this way, the special <code>reuse</code> keyword
argument may be specified when creating <code>Exchange</code> instances (and
instances of subclasses such as <code>Map</code> and <code>Queue</code>). For
example:</p>

<pre>
exchange = MyExchange(limit=10, reuse=1) # reuse up to 10 processes
</pre>

<p>Code invoked through such exchanges must be aware of channels and be
constructed in such a way that it does not terminate after sending a result
back to the creating process. Instead, it should repeatedly wait for subsequent
sets of parameters (compatible either with the signature of the callable or
with the original values read from the channel).
Reusable code should terminate when the special value
<code>None</code> is sent from the creating process to the created process,
since this indicates that no more parameters will be sent.</p>

<h2 id="MakeReusable">Making Existing Functions Parallel and Reusable</h2>

<p>An easier way of making reusable code sections for parallel use is to employ the
<code>MakeReusable</code> class to wrap an existing callable:</p>

<pre>
fn = pprocess.MakeReusable(originalfn)
</pre>

<p>This wraps the callable in a similar fashion to <code>MakeParallel</code>, but
provides the necessary mechanisms described above for reusable code.</p>

<h2 id="BackgroundCallable">Background Processes and Callables</h2>

<p>So far, all parallel computations have involved created processes which
depend on the existence of the creating process to collect results and to
communicate with them, preventing the creating process from terminating, even
if the created processes actually perform work and potentially create output
which need not concern the process which created them. In order to separate
creating and created processes, the concept of a background process (also
known as a daemon process) is introduced.</p>

<p>The <code>BackgroundCallable</code> class acts somewhat like the
<code>manage</code> method on exchange-based objects, although no exchange is
immediately involved, and instances of <code>BackgroundCallable</code> provide
wrappers around existing parallel-aware callables which can then be invoked in
order to initiate a background computation in a created process.
For example:</p>

<pre>
backgroundfn = pprocess.BackgroundCallable(address, fn)
</pre>

<p>This wraps the supplied callable (which can itself be the result of using
<code>MakeParallel</code>), with the resulting wrapper lending itself to
invocation like any other function. One distinguishing feature is the
<code>address</code>: in order to contact the background process after
invocation to (amongst other things) receive any result, a specific address
must be given to define the contact point between the created process and any
processes seeking to connect to it. Since these "persistent" communications
employ special files (specifically UNIX-domain sockets), the address must be a
suitable filename.</p>

<h2 id="PersistentQueue">Background and Persistent Queues</h2>

<p>Background processes employing persistent communications require adaptations
of the facilities described in the sections above. For a single background
process, the <code>BackgroundQueue</code> function is sufficient to create a
queue-like object which can monitor the communications channel between the
connecting process and a background process.
For example:</p>

<pre>
queue = pprocess.BackgroundQueue(address)
</pre>

<p>This code will cause the process reachable via the given <code>address</code>
to be contacted and any results made available via the created queue-like
object.</p>

<p>Where many background processes have been created, a single
<code>PersistentQueue</code> object can monitor their communications by being
connected to them all, as in the following example:</p>

<pre>
queue = pprocess.PersistentQueue()
for address in addresses:
    queue.connect(address)
</pre>

<p>Here, the queue monitors all previously created processes whose addresses
reside in the <code>addresses</code> sequence. Upon iterating over the queue,
results will be taken from whichever process happens to have data available,
in no particular pre-defined order.</p>

<h2 id="Implementation">Implementation Notes</h2>

<h3>Signals and Waiting</h3>

<p>When created/child processes terminate, one would typically want to be informed
of such conditions using a signal handler (for example, one handling
<code>SIGCHLD</code>). Unfortunately, Python seems to have issues with
restartable reads from file descriptors when interrupted by signals:</p>

<ul>
<li><a href="http://mail.python.org/pipermail/python-dev/2002-September/028572.html">http://mail.python.org/pipermail/python-dev/2002-September/028572.html</a></li>
<li><a href="http://twistedmatrix.com/bugs/issue733">http://twistedmatrix.com/bugs/issue733</a></li>
</ul>

<h3>Select and Poll</h3>

<p>The exact combination of conditions indicating closed pipes remains relatively
obscure.
Here is a message/thread describing them (in the context of another
topic):</p>

<ul>
<li><a href="http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html">http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html</a></li>
</ul>

<p>It would seem, from using sockets and from studying the <code>asyncore</code>
module, that sockets are more predictable than pipes.</p>

<p>Notes about <code>poll</code> implementations can be found here:</p>

<ul>
<li><a href="http://www.greenend.org.uk/rjk/2001/06/poll.html">http://www.greenend.org.uk/rjk/2001/06/poll.html</a></li>
</ul>

</body>
</html>