1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/parallel.py Mon Sep 26 18:25:58 2005 +0000
1.3 @@ -0,0 +1,120 @@
1.4 +#!/usr/bin/env python
1.5 +
1.6 +"""
1.7 +A simple parallel processing API for Python, inspired somewhat by the thread
1.8 +module, slightly less by pypar, and slightly less still by pypvm.
1.9 +
1.10 +To create new processes to run a function or any callable object, specify the
1.11 +"callable" and any arguments as follows:
1.12 +
1.13 +channel = start(fn, arg1, arg2, named1=value1, named2=value2)
1.14 +
1.15 +This returns a channel which can then be used to communicate with the created
1.16 +process. Meanwhile, in the created process, the given callable will be invoked
1.17 +with another channel as its first argument followed by the specified arguments:
1.18 +
1.19 +def fn(channel, arg1, arg2, named1, named2):
1.20 + # Read to and write from the channel.
1.21 + # Return value is ignored.
1.22 + ...
1.23 +
1.24 +To create new processes in a similar way to that employed when using os.fork
1.25 +(ie. the fork system call on various operating systems), use the following
1.26 +method:
1.27 +
1.28 +channel = create()
1.29 +if channel.pid == 0:
1.30 + # This code is run by the created process.
1.31 + # Read to and write from the channel to communicate with the
1.32 + # creating/calling process.
1.33 + # An explicit exit of the process may be desirable to prevent the process
1.34 + # from running code which is intended for the creating/calling process.
1.35 + ...
1.36 +else:
1.37 + # This code is run by the creating/calling process.
1.38 + # Read to and write from the channel to communicate with the created
1.39 + # process.
1.40 + ...
1.41 +"""
1.42 +
1.43 +import os
1.44 +import sys
1.45 +
1.46 +try:
1.47 + import cPickle as pickle
1.48 +except ImportError:
1.49 + import pickle
1.50 +
1.51 +class Channel:
1.52 +
1.53 + "A communications channel."
1.54 +
1.55 + def __init__(self, pid, read_pipe, write_pipe):
1.56 +
1.57 + """
1.58 + Initialise the channel with a process identifier 'pid', a 'read_pipe'
1.59 + from which messages will be received, and a 'write_pipe' into which
1.60 + messages will be sent.
1.61 + """
1.62 +
1.63 + self.pid = pid
1.64 + self.read_pipe = read_pipe
1.65 + self.write_pipe = write_pipe
1.66 +
1.67 + def send(self, obj):
1.68 +
1.69 + "Send the given object 'obj' through the channel."
1.70 +
1.71 + pickle.dump(obj, self.write_pipe)
1.72 + self.write_pipe.flush()
1.73 +
1.74 + def receive(self):
1.75 +
1.76 + "Receive an object through the channel, returning the object."
1.77 +
1.78 + obj = pickle.load(self.read_pipe)
1.79 + if isinstance(obj, Exception):
1.80 + raise obj
1.81 + else:
1.82 + return obj
1.83 +
1.84 +def create():
1.85 +
1.86 + """
1.87 + Create a new process, returning a communications channel to both the
1.88 + creating process and the created process.
1.89 + """
1.90 +
1.91 + parent_read_fd, child_write_fd = os.pipe()
1.92 + child_read_fd, parent_write_fd = os.pipe()
1.93 +
1.94 + pid = os.fork()
1.95 + if pid == 0:
1.96 + return Channel(pid, os.fdopen(child_read_fd, "r"), os.fdopen(child_write_fd, "w"))
1.97 + else:
1.98 + return Channel(pid, os.fdopen(parent_read_fd, "r"), os.fdopen(parent_write_fd, "w"))
1.99 +
1.100 +def start(callable, *args, **kwargs):
1.101 +
1.102 + """
1.103 + Create a new process which shall start running in the given 'callable'.
1.104 + Return a communications channel to the creating process, and supply such a
1.105 + channel to the created process as the 'channel' parameter in the given
1.106 + 'callable'. Additional arguments to the 'callable' can be given as
1.107 + additional arguments to this function.
1.108 + """
1.109 +
1.110 + channel = create()
1.111 + if channel.pid == 0:
1.112 + try:
1.113 + try:
1.114 + callable(channel, *args, **kwargs)
1.115 + except:
1.116 + exc_type, exc_value, exc_traceback = sys.exc_info()
1.117 + channel.send(exc_value)
1.118 + finally:
1.119 + sys.exit(0)
1.120 + else:
1.121 + return channel
1.122 +
1.123 +# vim: tabstop=4 expandtab shiftwidth=4
2.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
2.2 +++ b/tests/create_loop.py Mon Sep 26 18:25:58 2005 +0000
2.3 @@ -0,0 +1,18 @@
2.4 +#!/usr/bin/env python
2.5 +
2.6 +from parallel import create
2.7 +
2.8 +limit = 100
2.9 +channel = create()
2.10 +if channel.pid == 0:
2.11 + i = channel.receive()
2.12 + while i < limit:
2.13 + print i
2.14 + i = channel.receive()
2.15 + channel.send("Done")
2.16 +else:
2.17 + for i in range(0, limit + 1):
2.18 + channel.send(i)
2.19 + print channel.receive()
2.20 +
2.21 +# vim: tabstop=4 expandtab shiftwidth=4
3.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
3.2 +++ b/tests/start_loop.py Mon Sep 26 18:25:58 2005 +0000
3.3 @@ -0,0 +1,20 @@
3.4 +#!/usr/bin/env python
3.5 +
3.6 +from parallel import start
3.7 +
3.8 +def loop(channel, limit):
3.9 + print "loop to", limit
3.10 + i = channel.receive()
3.11 + while i < limit:
3.12 + print i
3.13 + i = channel.receive()
3.14 + channel.send("Done")
3.15 +
3.16 +if __name__ == "__main__":
3.17 + limit = 100
3.18 + channel = start(loop, limit)
3.19 + for i in range(0, limit + 1):
3.20 + channel.send(i)
3.21 + print channel.receive()
3.22 +
3.23 +# vim: tabstop=4 expandtab shiftwidth=4