1.1 --- a/pprocess.py Sat Sep 15 15:57:34 2007 +0000
1.2 +++ b/pprocess.py Sat Sep 15 19:41:09 2007 +0000
1.3 @@ -578,6 +578,18 @@
1.4
1.5 "An exchange which can be used like the built-in 'map' function."
1.6
1.7 + def __init__(self, *args, **kw):
1.8 + Exchange.__init__(self, *args, **kw)
1.9 + self.init()
1.10 +
1.11 + def init(self):
1.12 +
1.13 + "Remember the channel addition order to order output."
1.14 +
1.15 + self.channel_number = 0
1.16 + self.channels = {}
1.17 + self.results = []
1.18 +
1.19 def add(self, channel):
1.20
1.21 "Add the given 'channel' to the exchange."
1.22 @@ -586,6 +598,17 @@
1.23 self.channels[channel] = self.channel_number
1.24 self.channel_number += 1
1.25
1.26 + def start(self, callable, *args, **kw):
1.27 +
1.28 + """
1.29 + Using pprocess.start, create a new process for the given 'callable'
1.30 + using any additional arguments provided. Then, monitor the channel
1.31 + created between this process and the created process.
1.32 + """
1.33 +
1.34 + self.results.append(None) # placeholder
1.35 + Exchange.start(self, callable, *args, **kw)
1.36 +
1.37 def __call__(self, callable, sequence):
1.38
1.39 "Wrap and invoke 'callable' for each element in the 'sequence'."
1.40 @@ -595,21 +618,24 @@
1.41 else:
1.42 wrapped = callable
1.43
1.44 - # Remember the channel addition order to order output.
1.45 + self.init()
1.46
1.47 - self.channel_number = 0
1.48 - self.channels = {}
1.49 - self.results = []
1.50 + # Start processes for each element in the sequence.
1.51
1.52 for i in sequence:
1.53 - self.results.append(None) # placeholder
1.54 self.start(wrapped, i)
1.55 - self.finish()
1.56 +
1.57 + # Access to the results occurs through this object.
1.58 +
1.59 + return self
1.60
1.61 - # NOTE: Could return results as they arrive, but we would then need to
1.62 - # NOTE: return the position of each result in the original sequence.
1.63 + def __getitem__(self, i):
1.64 + self.finish()
1.65 + return self.results[i]
1.66
1.67 - return self.results
1.68 + def __iter__(self):
1.69 + self.finish()
1.70 + return iter(self.results)
1.71
1.72 def store_data(self, channel):
1.73