1 #!/usr/bin/env python 2 3 """ 4 A simple parallel processing API for Python, inspired somewhat by the thread 5 module, slightly less by pypar, and slightly less still by pypvm. 6 7 Copyright (C) 2005, 2006, 2007 Paul Boddie <paul@boddie.org.uk> 8 9 This program is free software; you can redistribute it and/or modify it under 10 the terms of the GNU Lesser General Public License as published by the Free 11 Software Foundation; either version 3 of the License, or (at your option) any 12 later version. 13 14 This program is distributed in the hope that it will be useful, but WITHOUT 15 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 16 FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 17 details. 18 19 You should have received a copy of the GNU Lesser General Public License along 20 with this program. If not, see <http://www.gnu.org/licenses/>. 21 22 -------- 23 24 The recommended styles of programming using pprocess involve the "Thread-style 25 Processing", "Convenient Message Exchanges", "Managed Callables" and "Map-style 26 Processing" sections below, although developers may wish to read the "Message 27 Exchanges" section for more details of the API concerned, and the "Fork-style 28 Processing" section may be of interest to those with experience of large scale 29 parallel processing systems. 30 31 Thread-style Processing 32 ----------------------- 33 34 To create new processes to run a function or any callable object, specify the 35 "callable" and any arguments as follows: 36 37 channel = pprocess.start(fn, arg1, arg2, named1=value1, named2=value2) 38 39 This returns a channel which can then be used to communicate with the created 40 process. Meanwhile, in the created process, the given callable will be invoked 41 with another channel as its first argument followed by the specified arguments: 42 43 def fn(channel, arg1, arg2, named1, named2): 44 # Read from and write to the channel. 45 # Return value is ignored. 46 ... 47 48 Fork-style Processing 49 --------------------- 50 51 To create new processes in a similar way to that employed when using os.fork 52 (ie. the fork system call on various operating systems), use the following 53 method: 54 55 channel = pprocess.create() 56 if channel.pid == 0: 57 # This code is run by the created process. 58 # Read from and write to the channel to communicate with the 59 # creating/calling process. 60 # An explicit exit of the process may be desirable to prevent the process 61 # from running code which is intended for the creating/calling process. 62 ... 63 pprocess.exit(channel) 64 else: 65 # This code is run by the creating/calling process. 66 # Read from and write to the channel to communicate with the created 67 # process. 68 ... 69 70 Message Exchanges 71 ----------------- 72 73 When creating many processes, each providing results for the consumption of the 74 main process, the collection of those results in an efficient fashion can be 75 problematic: if some processes take longer than others, and if we decide to read 76 from those processes when they are not ready instead of other processes which 77 are ready, the whole activity will take much longer than necessary. 78 79 One solution to the problem of knowing when to read from channels is to create 80 an Exchange object, optionally initialising it with a list of channels through 81 which data is expected to arrive: 82 83 exchange = pprocess.Exchange() # populate the exchange later 84 exchange = pprocess.Exchange(channels) # populate the exchange with channels 85 86 We can add channels to the exchange using the add method: 87 88 exchange.add(channel) 89 90 To test whether an exchange is active - that is, whether it is actually 91 monitoring any channels - we can use the active method which returns all 92 channels being monitored by the exchange: 93 94 channels = exchange.active() 95 96 We may then check the exchange to see whether any data is ready to be received; 97 for example: 98 99 for channel in exchange.ready(): 100 # Read from and write to the channel. 101 ... 102 103 If we do not wish to wait indefinitely for a list of channels, we can set a 104 timeout value as an argument to the ready method (as a floating point number 105 specifying the timeout in seconds, where 0 means a non-blocking poll as stated 106 in the select module's select function documentation). 107 108 Convenient Message Exchanges 109 ---------------------------- 110 111 A convenient form of message exchanges can be adopted by defining a subclass of 112 the Exchange class and defining a particular method: 113 114 class MyExchange(pprocess.Exchange): 115 def store_data(self, channel): 116 data = channel.receive() 117 # Do something with data here. 118 119 The exact operations performed on the received data might be as simple as 120 storing it on an instance attribute. To make use of the exchange, we would 121 instantiate it as usual: 122 123 exchange = MyExchange() # populate the exchange later 124 exchange = MyExchange(limit=10) # set a limit for later population 125 126 The exchange can now be used in a simpler fashion than that shown above. We can 127 add channels as before using the add method, or we can choose to only add 128 channels if the specified limit of channels is not exceeded: 129 130 exchange.add(channel) # add a channel as normal 131 exchange.add_wait(channel) # add a channel, waiting if the limit would be 132 # exceeded 133 134 Or we can request that the exchange create a channel on our behalf: 135 136 channel = exchange.create() 137 138 We can even start processes and monitor channels without ever handling the 139 channel ourselves: 140 141 exchange.start(fn, arg1, arg2, named1=value1, named2=value2) 142 143 We can explicitly wait for "free space" for channels by calling the wait method, 144 although the start and add_wait methods make this less interesting: 145 146 exchange.wait() 147 148 Finally, when finishing the computation, we can choose to merely call the finish 149 method and have the remaining data processed automatically: 150 151 exchange.finish() 152 153 Clearly, this approach is less flexible but more convenient than the raw message 154 exchange API as described above. However, it permits much simpler and clearer 155 code. 156 157 Exchanges as Queues 158 ------------------- 159 160 Instead of having to subclass the pprocess.Exchange class and to define the 161 store_data method, it might be more desirable to let the exchange manage the 162 communications between created and creating processes and to let the creating 163 process just consume received data as it arrives, without particular regard for 164 the order of the received data - perhaps the creating process has its own way of 165 managing such issues. 166 167 For such situations, the Queue class may be instantiated and channels added to 168 the queue using the various methods provided: 169 170 queue = pprocess.Queue(limit=10) 171 channel = queue.create() 172 if channel: 173 # Do some computation. 174 pprocess.exit(channel) 175 176 The results can then be consumed by treating the queue like an iterator: 177 178 for result in queue: 179 # Capture each result. 180 181 This approach does not, of course, require the direct handling of channels. One 182 could instead use the start method on the queue to create processes and to 183 initiate computations (since a queue is merely an enhanced exchange with a 184 specific implementation of the store_data method). 185 186 Managed Callables 187 ----------------- 188 189 A further simplification of the above convenient use of message exchanges 190 involves the creation of callables (eg. functions) which are automatically 191 monitored by an exchange. We create such a callable by calling the manage method 192 on an exchange: 193 194 myfn = exchange.manage(fn) 195 196 This callable can then be invoked instead of using the exchange's start method: 197 198 myfn(arg1, arg2, named1=value1, named2=value2) 199 200 The exchange's finish method can be used as usual to process incoming data. 201 202 Making Existing Functions Parallel 203 ---------------------------------- 204 205 In making a program parallel, existing functions which only return results can 206 be manually modified to accept and use channels to communicate results back to 207 the main process. However, a simple alternative is to use the MakeParallel class 208 to provide a wrapper around unmodified functions which will return the results 209 from those functions in the channels provided. For example: 210 211 fn = pprocess.MakeParallel(originalfn) 212 213 Map-style Processing 214 -------------------- 215 216 In situations where a callable would normally be used in conjunction with the 217 Python built-in map function, an alternative solution can be adopted by using 218 the pmap function: 219 220 pprocess.pmap(fn, sequence) 221 222 Here, the sequence would have to contain elements that each contain the required 223 parameters of the specified callable, fn. Note that the callable does not need 224 to be a parallel-aware function which has a channel argument: the pmap function 225 automatically wraps the given callable internally. 226 227 Signals and Waiting 228 ------------------- 229 230 When created/child processes terminate, one would typically want to be informed 231 of such conditions using a signal handler. Unfortunately, Python seems to have 232 issues with restartable reads from file descriptors when interrupted by signals: 233 234 http://mail.python.org/pipermail/python-dev/2002-September/028572.html 235 http://twistedmatrix.com/bugs/issue733 236 237 Select and Poll 238 --------------- 239 240 The exact combination of conditions indicating closed pipes remains relatively 241 obscure. Here is a message/thread describing them (in the context of another 242 topic): 243 244 http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html 245 246 It would seem, from using sockets and from studying the asyncore module, that 247 sockets are more predictable than pipes. 248 249 Notes about poll implementations can be found here: 250 251 http://www.greenend.org.uk/rjk/2001/06/poll.html 252 """ 253 254 __version__ = "0.2.6" 255 256 import os 257 import sys 258 import select 259 import socket 260 261 try: 262 import cPickle as pickle 263 except ImportError: 264 import pickle 265 266 # Communications. 267 268 class AcknowledgementError(Exception): 269 pass 270 271 class Channel: 272 273 "A communications channel." 274 275 def __init__(self, pid, read_pipe, write_pipe): 276 277 """ 278 Initialise the channel with a process identifier 'pid', a 'read_pipe' 279 from which messages will be received, and a 'write_pipe' into which 280 messages will be sent. 281 """ 282 283 self.pid = pid 284 self.read_pipe = read_pipe 285 self.write_pipe = write_pipe 286 self.closed = 0 287 288 def __del__(self): 289 290 # Since signals don't work well with I/O, we close pipes and wait for 291 # created processes upon finalisation. 292 293 self.close() 294 295 def close(self): 296 297 "Explicitly close the channel." 298 299 if not self.closed: 300 self.closed = 1 301 self.read_pipe.close() 302 self.write_pipe.close() 303 #self.wait(os.WNOHANG) 304 305 def wait(self, options=0): 306 307 "Wait for the created process, if any, to exit." 308 309 if self.pid != 0: 310 try: 311 os.waitpid(self.pid, options) 312 except OSError: 313 pass 314 315 def _send(self, obj): 316 317 "Send the given object 'obj' through the channel." 318 319 pickle.dump(obj, self.write_pipe) 320 self.write_pipe.flush() 321 322 def send(self, obj): 323 324 """ 325 Send the given object 'obj' through the channel. Then wait for an 326 acknowledgement. (The acknowledgement makes the caller wait, thus 327 preventing processes from exiting and disrupting the communications 328 channel and losing data.) 329 """ 330 331 self._send(obj) 332 if self._receive() != "OK": 333 raise AcknowledgementError, obj 334 335 def _receive(self): 336 337 "Receive an object through the channel, returning the object." 338 339 obj = pickle.load(self.read_pipe) 340 if isinstance(obj, Exception): 341 raise obj 342 else: 343 return obj 344 345 def receive(self): 346 347 """ 348 Receive an object through the channel, returning the object. Send an 349 acknowledgement of receipt. (The acknowledgement makes the sender wait, 350 thus preventing processes from exiting and disrupting the communications 351 channel and losing data.) 352 """ 353 354 try: 355 obj = self._receive() 356 return obj 357 finally: 358 self._send("OK") 359 360 # Management of processes and communications. 361 362 class Exchange: 363 364 """ 365 A communications exchange that can be used to detect channels which are 366 ready to communicate. Subclasses of this class can define the 'store_data' 367 method in order to enable the 'add_wait', 'wait' and 'finish' methods. 368 """ 369 370 def __init__(self, channels=None, limit=None, autoclose=1): 371 372 """ 373 Initialise the exchange with an optional list of 'channels'. 374 375 If the optional 'limit' is specified, restrictions on the addition of 376 new channels can be enforced and observed through the 'add_wait', 'wait' 377 and 'finish' methods. To make use of these methods, create a subclass of 378 this class and define a working 'store_data' method. 379 380 If the optional 'autoclose' parameter is set to a false value, channels 381 will not be closed automatically when they are removed from the exchange 382 - by default they are closed when removed. 383 """ 384 385 self.limit = limit 386 self.autoclose = autoclose 387 self.readables = {} 388 self.removed = [] 389 self.poller = select.poll() 390 for channel in channels or []: 391 self.add(channel) 392 393 def add(self, channel): 394 395 "Add the given 'channel' to the exchange." 396 397 self.readables[channel.read_pipe.fileno()] = channel 398 self.poller.register(channel.read_pipe.fileno(), select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR) 399 400 def active(self): 401 402 "Return a list of active channels." 403 404 return self.readables.values() 405 406 def ready(self, timeout=None): 407 408 """ 409 Wait for a period of time specified by the optional 'timeout' (or until 410 communication is possible) and return a list of channels which are ready 411 to be read from. 412 """ 413 414 fds = self.poller.poll(timeout) 415 readables = [] 416 self.removed = [] 417 418 for fd, status in fds: 419 channel = self.readables[fd] 420 removed = 0 421 422 # Remove ended/error channels. 423 424 if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR): 425 self.remove(channel) 426 self.removed.append(channel) 427 removed = 1 428 429 # Record readable channels. 430 431 if status & select.POLLIN: 432 if not (removed and self.autoclose): 433 readables.append(channel) 434 435 return readables 436 437 def remove(self, channel): 438 439 """ 440 Remove the given 'channel' from the exchange. 441 """ 442 443 del self.readables[channel.read_pipe.fileno()] 444 self.poller.unregister(channel.read_pipe.fileno()) 445 if self.autoclose: 446 channel.close() 447 channel.wait() 448 449 # Enhanced exchange methods involving channel limits. 450 451 def add_wait(self, channel): 452 453 """ 454 Add the given 'channel' to the exchange, waiting if the limit on active 455 channels would be exceeded by adding the channel. 456 """ 457 458 self.wait() 459 self.add(channel) 460 461 def wait(self): 462 463 """ 464 Test for the limit on channels, blocking and reading incoming data until 465 the number of channels is below the limit. 466 """ 467 468 # If limited, block until channels have been closed. 469 470 while self.limit is not None and len(self.active()) >= self.limit: 471 self.store() 472 473 def finish(self): 474 475 """ 476 Finish the use of the exchange by waiting for all channels to complete. 477 """ 478 479 while self.active(): 480 self.store() 481 482 def store(self): 483 484 "For each ready channel, process the incoming data." 485 486 for channel in self.ready(): 487 self.store_data(channel) 488 489 def store_data(self, channel): 490 491 """ 492 Store incoming data from the specified 'channel'. In subclasses of this 493 class, such data could be stored using instance attributes. 494 """ 495 496 raise NotImplementedError, "store_data" 497 498 # Convenience methods. 499 500 def start(self, callable, *args, **kw): 501 502 """ 503 Using pprocess.start, create a new process for the given 'callable' 504 using any additional arguments provided. Then, monitor the channel 505 created between this process and the created process. 506 """ 507 508 self.add_wait(start(callable, *args, **kw)) 509 510 def create(self): 511 512 """ 513 Using pprocess.create, create a new process and return the created 514 communications channel to the created process. In the creating process, 515 return None - the channel receiving data from the created process will 516 be automatically managed by this exchange. 517 """ 518 519 channel = create() 520 if channel.pid == 0: 521 return channel 522 else: 523 self.add_wait(channel) 524 return None 525 526 def manage(self, callable): 527 528 """ 529 Wrap the given 'callable' in an object which can then be called in the 530 same way as 'callable', but with new processes and communications 531 managed automatically. 532 """ 533 534 return ManagedCallable(callable, self) 535 536 class ManagedCallable: 537 538 "A callable managed by an exchange." 539 540 def __init__(self, callable, exchange): 541 542 """ 543 Wrap the given 'callable', using the given 'exchange' to monitor the 544 channels created for communications between this and the created 545 processes. Note that the 'callable' must be parallel-aware (that is, 546 have a 'channel' parameter). Use the MakeParallel class to wrap other 547 kinds of callable objects. 548 """ 549 550 self.callable = callable 551 self.exchange = exchange 552 553 def __call__(self, *args, **kw): 554 555 "Invoke the callable with the supplied arguments." 556 557 self.exchange.start(self.callable, *args, **kw) 558 559 # Abstractions and utilities. 560 561 class Map(Exchange): 562 563 "An exchange which can be used like the built-in 'map' function." 564 565 def add(self, channel): 566 567 "Add the given 'channel' to the exchange." 568 569 Exchange.add(self, channel) 570 self.channels[channel] = self.channel_number 571 self.channel_number += 1 572 573 def __call__(self, callable, sequence): 574 575 "Wrap and invoke 'callable' for each element in the 'sequence'." 576 577 if not isinstance(callable, MakeParallel): 578 wrapped = MakeParallel(callable) 579 else: 580 wrapped = callable 581 582 # Remember the channel addition order to order output. 583 584 self.channel_number = 0 585 self.channels = {} 586 self.results = [] 587 588 for i in sequence: 589 self.results.append(None) # placeholder 590 self.start(wrapped, i) 591 self.finish() 592 593 # NOTE: Could return results as they arrive, but we would then need to 594 # NOTE: return the position of each result in the original sequence. 595 596 return self.results 597 598 def store_data(self, channel): 599 600 "Accumulate the incoming data, associating results with channels." 601 602 data = channel.receive() 603 self.results[self.channels[channel]] = data 604 del self.channels[channel] 605 606 class Queue(Exchange): 607 608 """ 609 An exchange acting as a queue, making data from created processes available 610 in the order in which it is received. 611 """ 612 613 def __init__(self, *args, **kw): 614 Exchange.__init__(self, *args, **kw) 615 self.queue = [] 616 617 def store_data(self, channel): 618 619 "Accumulate the incoming data, associating results with channels." 620 621 data = channel.receive() 622 self.queue.insert(0, data) 623 624 def __iter__(self): 625 return self 626 627 def next(self): 628 629 "Return the next element in the queue." 630 631 if self.queue: 632 return self.queue.pop() 633 while self.active(): 634 self.store() 635 if self.queue: 636 return self.queue.pop() 637 else: 638 raise StopIteration 639 640 class MakeParallel: 641 642 "A wrapper around functions making them able to communicate results." 643 644 def __init__(self, callable): 645 646 """ 647 Initialise the wrapper with the given 'callable'. This object will then 648 be able to accept a 'channel' parameter when invoked, and to forward the 649 result of the given 'callable' via the channel provided back to the 650 invoking process. 651 """ 652 653 self.callable = callable 654 655 def __call__(self, channel, *args, **kw): 656 657 "Invoke the callable and return its result via the given 'channel'." 658 659 channel.send(self.callable(*args, **kw)) 660 661 # Utility functions. 662 663 def create(): 664 665 """ 666 Create a new process, returning a communications channel to both the 667 creating process and the created process. 668 """ 669 670 parent, child = socket.socketpair() 671 for s in [parent, child]: 672 s.setblocking(1) 673 674 pid = os.fork() 675 if pid == 0: 676 parent.close() 677 return Channel(pid, child.makefile("r", 0), child.makefile("w", 0)) 678 else: 679 child.close() 680 return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0)) 681 682 def exit(channel): 683 684 """ 685 Terminate a created process, closing the given 'channel'. 686 """ 687 688 channel.close() 689 os._exit(0) 690 691 def start(callable, *args, **kw): 692 693 """ 694 Create a new process which shall start running in the given 'callable'. 695 Additional arguments to the 'callable' can be given as additional arguments 696 to this function. 697 698 Return a communications channel to the creating process. For the created 699 process, supply a channel as the 'channel' parameter in the given 'callable' 700 so that it may send data back to the creating process. 701 """ 702 703 channel = create() 704 if channel.pid == 0: 705 try: 706 try: 707 callable(channel, *args, **kw) 708 except: 709 exc_type, exc_value, exc_traceback = sys.exc_info() 710 channel.send(exc_value) 711 finally: 712 pprocess.exit(channel) 713 else: 714 return channel 715 716 def waitall(): 717 718 "Wait for all created processes to terminate." 719 720 try: 721 while 1: 722 os.wait() 723 except OSError: 724 pass 725 726 def pmap(callable, sequence, limit=None): 727 728 """ 729 A parallel version of the built-in map function with an optional process 730 'limit'. The given 'callable' should not be parallel-aware (that is, have a 731 'channel' parameter) since it will be wrapped for parallel communications 732 before being invoked. 733 734 Return the processed 'sequence' where each element in the sequence is 735 processed by a different process. 736 """ 737 738 mymap = Map(limit=limit) 739 return mymap(callable, sequence) 740 741 # vim: tabstop=4 expandtab shiftwidth=4