#!/usr/bin/env python

"""
A simple parallel processing API for Python, inspired somewhat by the thread
module, slightly less by pypar, and slightly less still by pypvm.

To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:

channel = start(fn, arg1, arg2, named1=value1, named2=value2)

This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:

def fn(channel, arg1, arg2, named1, named2):
    # Read from and write to the channel.
    # Return value is ignored.
    ...

To create new processes in a similar way to that employed when using os.fork
(ie. the fork system call on various operating systems), use the following
method:

channel = create()
if channel.pid == 0:
    # This code is run by the created process.
    # Read from and write to the channel to communicate with the
    # creating/calling process.
    # An explicit exit of the process may be desirable to prevent the process
    # from running code which is intended for the creating/calling process.
    ...
else:
    # This code is run by the creating/calling process.
    # Read from and write to the channel to communicate with the created
    # process.
    ...
"""

import os
import sys

try:
    import cPickle as pickle
except ImportError:
    import pickle

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.

        Both pipes must be file objects opened in binary mode, since pickled
        data is written to and read from them.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe

    def send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)

        # Flush immediately so that the receiving process is not left waiting
        # on data still sitting in this process's buffer.

        self.write_pipe.flush()

    def receive(self):

        """
        Receive an object through the channel, returning the object.

        If the received object is an Exception instance, it is raised instead
        of being returned. EOFError is raised by the unpickler if the sending
        end of the pipe has been closed and no further data is available.
        """

        # NOTE: unpickling executes code chosen by the sender; the pipes must
        # NOTE: only ever be connected to trusted processes.

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        return obj

def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process.

    In the created process, the returned channel has a 'pid' of 0; in the
    creating process, the channel's 'pid' is that of the created process.
    """

    parent_read_fd, child_write_fd = os.pipe()
    child_read_fd, parent_write_fd = os.pipe()

    try:
        pid = os.fork()
    except OSError:

        # No process was created: release the pipes before reporting the
        # failure to the caller.

        for fd in (parent_read_fd, child_write_fd, child_read_fd, parent_write_fd):
            os.close(fd)
        raise

    # Each process closes the pipe ends belonging to the other process.
    # Otherwise, a reader would never see end-of-file when its peer exits,
    # because it would itself still hold the write end open.

    if pid == 0:
        os.close(parent_read_fd)
        os.close(parent_write_fd)
        read_fd, write_fd = child_read_fd, child_write_fd
    else:
        os.close(child_read_fd)
        os.close(child_write_fd)
        read_fd, write_fd = parent_read_fd, parent_write_fd

    # Pickled data is binary, so the pipes are opened in binary mode.

    return Channel(pid, os.fdopen(read_fd, "rb"), os.fdopen(write_fd, "wb"))

def _exit_child(status=0):

    """
    Terminate the current (created) process immediately with the given
    'status', after flushing the standard output streams.
    """

    for stream in (sys.stdout, sys.stderr):
        try:
            stream.flush()
        except Exception:

            # Best effort only: a missing or broken stream must not prevent
            # the process from exiting.

            pass

    # os._exit is used instead of sys.exit because SystemExit could be
    # intercepted by exception handlers inherited from the creating process,
    # causing the created process to continue running the creator's code.

    os._exit(status)

def start(callable, *args, **kwargs):

    """
    Create a new process which shall start running in the given 'callable'.
    Return a communications channel to the creating process, and supply such a
    channel to the created process as the 'channel' parameter in the given
    'callable'. Additional arguments to the 'callable' can be given as
    additional arguments to this function.

    Any exception raised by the 'callable' is sent through the channel to the
    creating process. The created process always exits with status 0 once the
    'callable' has finished; it never returns from this function.
    """

    channel = create()
    if channel.pid != 0:
        return channel

    # Only the created process reaches this point.

    try:
        try:
            callable(channel, *args, **kwargs)
        except:

            # Deliberately catch everything: this is the top-level boundary
            # of the created process, and the failure is reported to the
            # creating process rather than handled here.

            channel.send(sys.exc_info()[1])
    finally:
        _exit_child(0)

# vim: tabstop=4 expandtab shiftwidth=4