1.1 --- a/parallel.py Thu Jun 19 21:42:02 2008 +0200
1.2 +++ b/parallel.py Mon Sep 26 21:58:04 2005 +0000
1.3 @@ -4,6 +4,9 @@
1.4 A simple parallel processing API for Python, inspired somewhat by the thread
1.5 module, slightly less by pypar, and slightly less still by pypvm.
1.6
1.7 +Thread-style Processing
1.8 +-----------------------
1.9 +
1.10 To create new processes to run a function or any callable object, specify the
1.11 "callable" and any arguments as follows:
1.12
1.13 @@ -14,10 +17,13 @@
1.14 with another channel as its first argument followed by the specified arguments:
1.15
1.16 def fn(channel, arg1, arg2, named1, named2):
1.17 - # Read to and write from the channel.
1.18 + # Read from and write to the channel.
1.19 # Return value is ignored.
1.20 ...
1.21
1.22 +Fork-style Processing
1.23 +---------------------
1.24 +
1.25 To create new processes in a similar way to that employed when using os.fork
1.26 (ie. the fork system call on various operating systems), use the following
1.27 method:
1.28 @@ -25,20 +31,49 @@
1.29 channel = create()
1.30 if channel.pid == 0:
1.31 # This code is run by the created process.
1.32 - # Read to and write from the channel to communicate with the
1.33 + # Read from and write to the channel to communicate with the
1.34 # creating/calling process.
1.35 # An explicit exit of the process may be desirable to prevent the process
1.36 # from running code which is intended for the creating/calling process.
1.37 ...
1.38 else:
1.39 # This code is run by the creating/calling process.
1.40 - # Read to and write from the channel to communicate with the created
1.41 + # Read from and write to the channel to communicate with the created
1.42 # process.
1.43 ...
1.44 +
1.45 +Message Exchanges
1.46 +-----------------
1.47 +
1.48 +When creating many processes, each providing results for the consumption of the
1.49 +main process, the collection of those results in an efficient fashion can be
1.50 +problematic: if some processes take longer than others, and if we decide to read
1.51 +from those processes when they are not ready instead of other processes which
1.52 +are ready, the whole activity will take much longer than necessary.
1.53 +
1.54 +One solution to the problem of knowing when to read from channels is to create
1.55 +an Exchange object, initialising it with a list of channels through which data
1.56 +is expected to arrive:
1.57 +
1.58 +exchange = Exchange(channels)
1.59 +
1.60 +We may then check the exchange to see whether any data is ready to be received;
1.61 +for example:
1.62 +
1.63 +for channel in exchange.ready():
1.64 + # Read from and write to the channel.
1.65 + ...
1.66 +
1.67 +If we do not wish to wait indefinitely for channels to become ready, we can
1.68 +pass a timeout to the ready method (as a floating point number specifying the
1.69 +timeout in seconds, where 0 means a non-blocking poll as stated in the select
1.70 +module's select function documentation).
1.71 """
1.72
1.73 import os
1.74 import sys
1.75 +from select import select
1.76 +from signal import signal, SIGCHLD
1.77
1.78 try:
1.79 import cPickle as pickle
1.80 @@ -78,6 +113,38 @@
1.81 else:
1.82 return obj
1.83
1.84 +class Exchange:
1.85 +
1.86 + """
1.87 + A communications exchange that can be used to detect channels which are
1.88 + ready to communicate.
1.89 + """
1.90 +
1.91 + def __init__(self, channels):
1.92 +
1.93 + "Initialise the exchange with the given 'channels'."
1.94 +
1.95 + self.readables = {}
1.96 + for channel in channels:
1.97 + self.readables[channel.read_pipe] = channel
1.98 +
1.99 + def ready(self, timeout=None):
1.100 +
1.101 + """
1.102 + Wait until at least one channel is ready to be read from, or until the
1.103 + optional 'timeout' (in seconds) expires; without a 'timeout', wait
1.104 + indefinitely. Return a list of the ready channels (empty on timeout).
1.105 + """
1.106 +
1.107 + if timeout is not None:
1.108 + t = select(self.readables.keys(), [], [], timeout)
1.109 + else:
1.110 + t = select(self.readables.keys(), [], [])
1.111 +
1.112 + readable_fds, writable_fds, exceptional_fds = t
1.113 + readable = [self.readables[fd] for fd in readable_fds]
1.114 + return readable
1.115 +
1.116 def create():
1.117
1.118 """
1.119 @@ -117,4 +184,11 @@
1.120 else:
1.121 return channel
1.122
1.123 +# Define and install a SIGCHLD handler which reaps a terminated child process.
1.124 +
1.125 +def handler(number, frame):
1.126 + os.wait()
1.127 +
1.128 +signal(SIGCHLD, handler)
1.129 +
1.130 # vim: tabstop=4 expandtab shiftwidth=4