1.1 --- a/pprocess.py Tue Oct 20 23:16:46 2015 +0200
1.2 +++ b/pprocess.py Tue Oct 04 00:16:09 2016 +0200
1.3 @@ -4,7 +4,8 @@
1.4 A simple parallel processing API for Python, inspired somewhat by the thread
1.5 module, slightly less by pypar, and slightly less still by pypvm.
1.6
1.7 -Copyright (C) 2005, 2006, 2007, 2008, 2009, 2013 Paul Boddie <paul@boddie.org.uk>
1.8 +Copyright (C) 2005, 2006, 2007, 2008, 2009, 2013, 2014,
1.9 + 2016 Paul Boddie <paul@boddie.org.uk>
1.10 Copyright (C) 2013 Yaroslav Halchenko <debian@onerussian.com>
1.11
1.12 This program is free software; you can redistribute it and/or modify it under
1.13 @@ -28,7 +29,9 @@
1.14 import select
1.15 import socket
1.16 import platform
1.17 +import errno
1.18
1.19 +from time import sleep
1.20 from warnings import warn
1.21
1.22 try:
1.23 @@ -402,16 +405,22 @@
1.24 try:
1.25 self.store_data(channel)
1.26 self.start_waiting(channel)
1.27 - except IOError, exc:
1.28 + except (IOError, OSError), exc:
1.29 self.remove(channel)
1.30 - warn("Removed channel %r due to IOError: %s" % (channel, exc))
1.31 + warn("Removed channel %r due to exception: %s" % (channel, exc))
1.32
1.33 # Or schedule new processes and channels.
1.34
1.35 else:
1.36 while self.waiting and not self.busy():
1.37 - callable, args, kw = self.waiting.pop()
1.38 - self.start(callable, *args, **kw)
1.39 + details = self.waiting.pop()
1.40 +
1.41 + # Stop actively scheduling if resources are exhausted.
1.42 +
1.43 + if not self.start_new_waiting(details):
1.44 + if not self.active():
1.45 + sleep(1)
1.46 + break
1.47
1.48 def store_data(self, channel):
1.49
1.50 @@ -449,6 +458,8 @@
1.51 self.add(channel)
1.52 channel.send((args, kw))
1.53
1.54 + # Return the details for a new channel.
1.55 +
1.56 else:
1.57 return callable, args, kw
1.58
1.59 @@ -496,9 +507,31 @@
1.60 """
1.61
1.62 details = self._get_waiting(channel)
1.63 +
1.64 if details is not None:
1.65 - callable, args, kw = details
1.66 - self.add(start(callable, *args, **kw))
1.67 + self.start_new_waiting(details)
1.68 +
1.69 + def start_new_waiting(self, details):
1.70 +
1.71 + """
1.72 + Start a waiting process with the given 'details', obtaining a new
1.73 + channel.
1.74 + """
1.75 +
1.76 + callable, args, kw = details
1.77 + channel = self._start(callable, *args, **kw)
1.78 +
1.79 + # Monitor any newly-created process.
1.80 +
1.81 + if channel:
1.82 + self.add(channel)
1.83 + return True
1.84 +
1.85 + # Push the details back onto the end of the waiting list.
1.86 +
1.87 + else:
1.88 + self.waiting.append(details)
1.89 + return False
1.90
1.91 # Convenience methods.
1.92
1.93 @@ -511,9 +544,21 @@
1.94 """
1.95
1.96 if self._set_waiting(callable, args, kw):
1.97 - return
1.98 + return False
1.99 +
1.100 + channel = self._start(callable, *args, **kw)
1.101 +
1.102 + # Monitor any newly-created process.
1.103
1.104 - self.add_wait(start(callable, *args, **kw))
1.105 + if channel:
1.106 + self.add_wait(channel)
1.107 + return True
1.108 +
1.109 + # Otherwise, add the details to the waiting list unconditionally.
1.110 +
1.111 + else:
1.112 + self.waiting.insert(0, (callable, args, kw))
1.113 + return False
1.114
1.115 def create(self):
1.116
1.117 @@ -537,6 +582,22 @@
1.118
1.119 return ManagedCallable(callable, self)
1.120
1.121 + def _start(self, callable, *args, **kw):
1.122 +
1.123 + """
1.124 + Create a new process for the given 'callable' using any additional
1.125 + arguments provided. Return any successfully created channel or None if
1.126 + no process could be created at the present time.
1.127 + """
1.128 +
1.129 + try:
1.130 + return start(callable, *args, **kw)
1.131 + except OSError, exc:
1.132 + if exc.errno != errno.EAGAIN:
1.133 + raise
1.134 + else:
1.135 + return None
1.136 +
1.137 class Persistent:
1.138
1.139 """