#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

Copyright (C) 2005, 2006, 2007 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the Free
Software Foundation; either version 3 of the License, or (at your option) any
later version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
details.

You should have received a copy of the GNU Lesser General Public License along
with this program. If not, see <http://www.gnu.org/licenses/>.

--------

The recommended styles of programming using pprocess involve the "Thread-style
Processing", "Convenient Message Exchanges", "Managed Callables" and "Map-style
Processing" sections below, although developers may wish to read the "Message
Exchanges" section for more details of the API concerned, and the "Fork-style
Processing" section may be of interest to those with experience of large scale
parallel processing systems.

Thread-style Processing
-----------------------

To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:

  channel = pprocess.start(fn, arg1, arg2, named1=value1, named2=value2)

This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:

  def fn(channel, arg1, arg2, named1, named2):
      # Read from and write to the channel.
      # Return value is ignored.
      ...

Fork-style Processing
---------------------

To create new processes in a similar way to that employed when using os.fork
(i.e. the fork system call on various operating systems), use the following
method:

  channel = pprocess.create()
  if channel.pid == 0:
      # This code is run by the created process.
      # Read from and write to the channel to communicate with the
      # creating/calling process.
      # An explicit exit of the process may be desirable to prevent the process
      # from running code which is intended for the creating/calling process.
      ...
  else:
      # This code is run by the creating/calling process.
      # Read from and write to the channel to communicate with the created
      # process.
      ...

Message Exchanges
-----------------

When creating many processes, each providing results for the consumption of the
main process, the collection of those results in an efficient fashion can be
problematic: if some processes take longer than others, and if we decide to read
from those processes when they are not ready instead of other processes which
are ready, the whole activity will take much longer than necessary.

One solution to the problem of knowing when to read from channels is to create
an Exchange object, optionally initialising it with a list of channels through
which data is expected to arrive:

  exchange = pprocess.Exchange()          # populate the exchange later
  exchange = pprocess.Exchange(channels)  # populate the exchange with channels

We can add channels to the exchange using the add method:

  exchange.add(channel)

To test whether an exchange is active - that is, whether it is actually
monitoring any channels - we can use the active method which returns all
channels being monitored by the exchange:

  channels = exchange.active()

We may then check the exchange to see whether any data is ready to be received;
for example:

  for channel in exchange.ready():
      # Read from and write to the channel.
      ...

If we do not wish to wait indefinitely for a list of channels, we can set a
timeout value as an argument to the ready method. The timeout is given in
milliseconds, where 0 means a non-blocking poll and None (the default) means
waiting indefinitely, as described for the poll method of poll objects in the
select module's documentation.

Convenient Message Exchanges
----------------------------

A convenient form of message exchanges can be adopted by defining a subclass of
the Exchange class and defining a particular method:

  class MyExchange(pprocess.Exchange):
      def store_data(self, channel):
          data = channel.receive()
          # Do something with data here.

The exact operations performed on the received data might be as simple as
storing it on an instance attribute. To make use of the exchange, we would
instantiate it as usual:

  exchange = MyExchange()          # populate the exchange later
  exchange = MyExchange(limit=10)  # set a limit for later population

The exchange can now be used in a simpler fashion than that shown above. We can
add channels as before using the add method, or we can choose to only add
channels if the specified limit of channels is not exceeded:

  exchange.add(channel)       # add a channel as normal
  exchange.add_wait(channel)  # add a channel, waiting if the limit would be
                              # exceeded

We can even start processes and monitor channels without ever handling the
channel ourselves:

  exchange.start(fn, arg1, arg2, named1=value1, named2=value2)

We can explicitly wait for "free space" for channels by calling the wait method,
although the start and add_wait methods make this less interesting:

  exchange.wait()

Finally, when finishing the computation, we can choose to merely call the finish
method and have the remaining data processed automatically:

  exchange.finish()

Clearly, this approach is less flexible but more convenient than the raw message
exchange API as described above. However, it permits much simpler and clearer
code.

Managed Callables
-----------------

A further simplification of the above convenient use of message exchanges
involves the creation of callables (e.g. functions) which are automatically
monitored by an exchange. We create such a callable by calling the manage method
on an exchange:

  myfn = exchange.manage(fn)

This callable can then be invoked instead of using the exchange's start method:

  myfn(arg1, arg2, named1=value1, named2=value2)

The exchange's finish method can be used as usual to process incoming data.

Making Existing Functions Parallel
----------------------------------

In making a program parallel, existing functions which only return results can
be manually modified to accept and use channels to communicate results back to
the main process. However, a simple alternative is to use the MakeParallel class
to provide a wrapper around unmodified functions which will return the results
from those functions in the channels provided. For example:

  fn = pprocess.MakeParallel(originalfn)

The wrapped function accepts a channel as its first argument and sends the
return value of originalfn through that channel.

Map-style Processing
--------------------

In situations where a callable would normally be used in conjunction with the
Python built-in map function, an alternative solution can be adopted by using
the pmap function:

  pprocess.pmap(fn, sequence)
  pprocess.pmap(fn, sequence, limit=4)  # at most 4 processes at a time

Here, each element of the sequence is passed as the single argument to the
specified callable, fn; elements are not unpacked into several arguments, so a
callable needing several parameters must accept them as one value (such as a
tuple). The results are returned as a list in the order of the sequence. Note
that the callable does not need to be a parallel-aware function which has a
channel argument: the pmap function automatically wraps the given callable
internally.

Signals and Waiting
-------------------

When created/child processes terminate, one would typically want to be informed
of such conditions using a signal handler. Unfortunately, Python seems to have
issues with restartable reads from file descriptors when interrupted by signals:

  http://mail.python.org/pipermail/python-dev/2002-September/028572.html
  http://twistedmatrix.com/bugs/issue733

pprocess therefore does not use signal handlers. Instead, an exchange waits for
a created process when it removes (and automatically closes) that process's
channel, and the waitall function can be used to wait for any remaining
created processes.

Select and Poll
---------------

The exact combination of conditions indicating closed pipes remains relatively
obscure. Here is a message/thread describing them (in the context of another
topic):

  http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html

It would seem, from using sockets and from studying the asyncore module, that
sockets are more predictable than pipes.

Notes about poll implementations can be found here:

  http://www.greenend.org.uk/rjk/2001/06/poll.html
"""

__version__ = "0.2.6"

import os
import sys
import select
import socket

try:
    import cPickle as pickle
except ImportError:
    import pickle

# Communications.

class AcknowledgementError(Exception):

    "Raised when a sent object is not acknowledged with the expected reply."

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.

        Channels made by the create function have a 'pid' of 0 in the created
        process and the created process's identifier in the creating process.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe
        self.closed = 0

    def __del__(self):

        # Since signals don't work well with I/O, pipes are closed upon
        # finalisation. Created processes are not waited for here: that is
        # done by Exchange.remove (when autoclose is enabled) or by waitall.

        self.close()

    def close(self):

        "Explicitly close the channel. Closing a closed channel does nothing."

        if self.closed:
            return
        self.closed = 1
        try:
            self.read_pipe.close()
        finally:
            # Close the writing end even if closing the reading end failed.
            self.write_pipe.close()

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit, passing 'options' (for
        example, os.WNOHANG) to os.waitpid. Nothing is done for a channel whose
        'pid' is 0, since there is no created process to wait for.
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                # The process may already have been waited for (for example,
                # by waitall), leaving nothing to wait for here.
                pass

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)
        # The write pipe may be buffered: push the message to the other end.
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)

        Raises AcknowledgementError if the reply is not the expected
        acknowledgement.
        """

        self._send(obj)
        if self._receive() != "OK":
            raise AcknowledgementError(obj)

    def _receive(self):

        """
        Receive an object through the channel, returning the object. If the
        received object is an exception, it is raised instead of returned.
        """

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)

        The acknowledgement is sent even when the received object is an
        exception (which is then raised), since the sender still waits for it.
        """

        try:
            return self._receive()
        finally:
            self._send("OK")

# Management of processes and communications.

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate. Subclasses of this class can define the 'store_data'
    method in order to enable the 'add_wait', 'wait' and 'finish' methods.
    """

    def __init__(self, channels=None, limit=None, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'.

        If the optional 'limit' is specified, restrictions on the addition of
        new channels can be enforced and observed through the 'add_wait', 'wait'
        and 'finish' methods. To make use of these methods, create a subclass of
        this class and define a working 'store_data' method.

        If the optional 'autoclose' parameter is set to a false value, channels
        will not be closed automatically when they are removed from the exchange
        - by default they are closed when removed.
        """

        self.limit = limit
        self.autoclose = autoclose
        self.readables = {}     # read file descriptor -> channel
        self.removed = []       # channels removed by the latest 'ready' call
        self.poller = select.poll()
        for channel in channels or []:
            self.add(channel)

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        fd = channel.read_pipe.fileno()
        self.readables[fd] = channel
        self.poller.register(fd, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        # A real list, so that callers may remove channels while iterating.
        return list(self.readables.values())

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' in
        milliseconds (or until communication is possible) and return a list of
        channels which are ready to be read from. A 'timeout' of 0 polls without
        blocking; None (the default) waits indefinitely.

        Channels reporting a hang-up or error condition are removed from the
        exchange and recorded in the 'removed' attribute.
        """

        # select.poll objects take their timeout in milliseconds.
        fds = self.poller.poll(timeout)
        readables = []
        self.removed = []

        for fd, status in fds:
            channel = self.readables[fd]
            removed = 0

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)
                self.removed.append(channel)
                removed = 1

            # Record readable channels, except those just closed by removal,
            # which can no longer be read from.

            if status & select.POLLIN:
                if not (removed and self.autoclose):
                    readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange. If the exchange closes
        channels automatically (the default), the channel is closed and its
        created process waited for.
        """

        # Obtain the descriptor before any closing makes it unavailable.
        fd = channel.read_pipe.fileno()
        del self.readables[fd]
        self.poller.unregister(fd)
        if self.autoclose:
            channel.close()
            channel.wait()

    # Enhanced exchange methods involving channel limits.

    def add_wait(self, channel):

        """
        Add the given 'channel' to the exchange, waiting if the limit on active
        channels would be exceeded by adding the channel.
        """

        self.wait()
        self.add(channel)

    def wait(self):

        """
        Test for the limit on channels, blocking and reading incoming data until
        the number of channels is below the limit.
        """

        # If limited, block until channels have been closed.

        while self.limit is not None and len(self.active()) >= self.limit:
            self.store()

    def finish(self):

        """
        Finish the use of the exchange by waiting for all channels to complete.
        """

        while self.active():
            self.store()

    def store(self):

        "For each ready channel, process the incoming data."

        for channel in self.ready():
            self.store_data(channel)

    def store_data(self, channel):

        """
        Store incoming data from the specified 'channel'. In subclasses of this
        class, such data could be stored using instance attributes.
        """

        raise NotImplementedError("store_data")

    # Convenience methods.

    def start(self, callable, *args, **kw):

        """
        Using pprocess.start, create a new process for the given 'callable'
        using any additional arguments provided. Then, monitor the channel
        created between this process and the created process.
        """

        # 'start' here is the module-level function, not this method.
        self.add_wait(start(callable, *args, **kw))

    def manage(self, callable):

        """
        Wrap the given 'callable' in an object which can then be called in the
        same way as 'callable', but with new processes and communications
        managed automatically.
        """

        return ManagedCallable(callable, self)

class ManagedCallable:

    "A callable managed by an exchange."

    def __init__(self, callable, exchange):

        """
        Wrap the given 'callable', using the given 'exchange' to monitor the
        channels created for communications between this and the created
        processes.
        """

        self.callable = callable
        self.exchange = exchange

    def __call__(self, *args, **kw):

        "Invoke the callable with the supplied arguments."

        self.exchange.start(self.callable, *args, **kw)

# Abstractions and utilities.

class Map(Exchange):

    "An exchange which can be used like the built-in 'map' function."

    def add(self, channel):

        """
        Add the given 'channel' to the exchange, recording its position in the
        order of addition so that its result can be placed in sequence order.
        """

        Exchange.add(self, channel)
        self.channels[channel] = self.channel_number
        self.channel_number += 1

    def __call__(self, callable, sequence):

        """
        Wrap and invoke 'callable' for each element in the 'sequence', passing
        each element as the single argument. Return the list of results in the
        order of the 'sequence'.
        """

        if isinstance(callable, MakeParallel):
            wrapped = callable
        else:
            wrapped = MakeParallel(callable)

        # Remember the channel addition order to order output.

        self.channel_number = 0
        self.channels = {}
        self.results = []

        for i in sequence:
            self.results.append(None) # placeholder
            self.start(wrapped, i)
        self.finish()

        # NOTE: Could return results as they arrive, but we would then need to
        # NOTE: return the position of each result in the original sequence.

        return self.results

    def store_data(self, channel):

        "Accumulate the incoming data, associating results with channels."

        data = channel.receive()
        self.results[self.channels[channel]] = data
        del self.channels[channel]

class MakeParallel:

    "A wrapper around functions making them able to communicate results."

    def __init__(self, callable):

        "Initialise the wrapper with the given 'callable'."

        self.callable = callable

    def __call__(self, channel, *args, **kw):

        "Invoke the callable and return its result via the given 'channel'."

        channel.send(self.callable(*args, **kw))

# Utility functions.

def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process. The channel's 'pid' attribute is
    0 in the created process and the created process's identifier in the
    creating process.
    """

    parent, child = socket.socketpair()
    for s in [parent, child]:
        s.setblocking(1)

    try:
        pid = os.fork()
    except OSError:
        # Do not leak the socket pair if no process could be created.
        parent.close()
        child.close()
        raise

    if pid == 0:
        parent.close()
        return _socket_channel(pid, child)
    else:
        child.close()
        return _socket_channel(pid, parent)

def _socket_channel(pid, sock):

    """
    Return a channel for 'pid' which reads from and writes to 'sock' through
    binary file objects.
    """

    channel = Channel(pid, sock.makefile("rb"), sock.makefile("wb"))

    # The file objects keep the connection open; closing the socket object now
    # means that closing both file objects (as Channel.close does) releases it.

    sock.close()
    return channel

def start(callable, *args, **kw):

    """
    Create a new process which shall start running in the given 'callable'.
    Return a communications channel to the creating process, and supply such a
    channel to the created process as the 'channel' parameter in the given
    'callable'. Additional arguments to the 'callable' can be given as
    additional arguments to this function.
    """

    channel = create()
    if channel.pid != 0:
        return channel

    # In the created process: run the callable, report any exception to the
    # creating process, and never return into the caller's code.

    try:
        try:
            callable(channel, *args, **kw)
        except:
            # Deliberately broad: this is the process boundary, so every
            # exception is sent to the creating process rather than escaping.
            channel.send(sys.exc_info()[1])
    finally:
        channel.close()
        os._exit(0)

def waitall():

    "Wait for all created processes to terminate."

    try:
        while 1:
            os.wait()
    except OSError:
        # No created processes remain to be waited for.
        pass

def pmap(callable, sequence, limit=None):

    """
    A parallel version of the built-in map function with an optional process
    'limit'. The given 'callable' need not be parallel-aware since it will be
    wrapped for parallel communications before invocation. Each element of the
    'sequence' is passed as the single argument to the 'callable'.
    """

    mymap = Map(limit=limit)
    return mymap(callable, sequence)

# vim: tabstop=4 expandtab shiftwidth=4