1.1 --- a/pprocess.py Thu Jun 04 22:28:43 2009 +0200
1.2 +++ b/pprocess.py Fri Jun 05 01:46:07 2009 +0200
1.3 @@ -236,6 +236,10 @@
1.4 A communications exchange that can be used to detect channels which are
1.5 ready to communicate. Subclasses of this class can define the 'store_data'
1.6 method in order to enable the 'add_wait', 'wait' and 'finish' methods.
1.7 +
1.8 + Once exchanges are populated with active channels, use of the principal
1.9 + methods of the exchange typically cause the 'store' method to be invoked,
1.10 + resulting in the processing of any incoming data.
1.11 """
1.12
1.13 def __init__(self, channels=None, limit=None, reuse=0, continuous=0, autoclose=1):
1.14 @@ -275,6 +279,8 @@
1.15 for channel in channels or []:
1.16 self.add(channel)
1.17
1.18 + # Core methods, registering and reporting on channels.
1.19 +
1.20 def add(self, channel):
1.21
1.22 "Add the given 'channel' to the exchange."
1.23 @@ -292,9 +298,9 @@
1.24 def ready(self, timeout=None):
1.25
1.26 """
1.27 - Wait for a period of time specified by the optional 'timeout' (or until
1.28 - communication is possible) and return a list of channels which are ready
1.29 - to be read from.
1.30 + Wait for a period of time specified by the optional 'timeout' in
1.31 + milliseconds (or until communication is possible) and return a list of
1.32 + channels which are ready to be read from.
1.33 """
1.34
1.35 fds = self.poller.poll(timeout)
1.36 @@ -335,6 +341,18 @@
1.37
1.38 # Enhanced exchange methods involving channel limits.
1.39
1.40 + def unfinished(self):
1.41 +
1.42 + "Return whether the exchange still has work scheduled or in progress."
1.43 +
1.44 + return self.active() or self.waiting
1.45 +
1.46 + def busy(self):
1.47 +
1.48 + "Return whether the exchange uses as many channels as it is allowed to."
1.49 +
1.50 + return self.limit is not None and len(self.active()) >= self.limit
1.51 +
1.52 def add_wait(self, channel):
1.53
1.54 """
1.55 @@ -354,9 +372,51 @@
1.56
1.57 # If limited, block until channels have been closed.
1.58
1.59 - while self.limit is not None and len(self.active()) >= self.limit:
1.60 + while self.busy():
1.61 + self.store()
1.62 +
1.63 + def finish(self):
1.64 +
1.65 + """
1.66 + Finish the use of the exchange by waiting for all channels to complete.
1.67 + """
1.68 +
1.69 + while self.unfinished():
1.70 self.store()
1.71
1.72 + def store(self, timeout=None):
1.73 +
1.74 + """
1.75 + For each ready channel, process the incoming data. If the optional
1.76 + 'timeout' parameter (a duration in milliseconds) is specified, wait only
1.77 + for the specified duration if no channels are ready to provide data.
1.78 + """
1.79 +
1.80 + # Either process input from active channels.
1.81 +
1.82 + if self.active():
1.83 + for channel in self.ready(timeout):
1.84 + self.store_data(channel)
1.85 + self.start_waiting(channel)
1.86 +
1.87 + # Or schedule new processes and channels.
1.88 +
1.89 + else:
1.90 + while self.waiting and not self.busy():
1.91 + callable, args, kw = self.waiting.pop()
1.92 + self.start(callable, *args, **kw)
1.93 +
1.94 + def store_data(self, channel):
1.95 +
1.96 + """
1.97 + Store incoming data from the specified 'channel'. In subclasses of this
1.98 + class, such data could be stored using instance attributes.
1.99 + """
1.100 +
1.101 + raise NotImplementedError, "store_data"
1.102 +
1.103 + # Support for the convenience methods.
1.104 +
1.105 def _get_waiting(self, channel):
1.106
1.107 """
1.108 @@ -364,7 +424,12 @@
1.109 the reception of data on the given 'channel'.
1.110 """
1.111
1.112 - if self.waiting:
1.113 + # For continuous channels, no scheduling is requested.
1.114 +
1.115 + if self.waiting and not self.continuous:
1.116 +
1.117 + # Schedule this callable and arguments.
1.118 +
1.119 callable, args, kw = self.waiting.pop()
1.120
1.121 # Try and reuse existing channels if possible.
1.122 @@ -377,12 +442,6 @@
1.123 self.add(channel)
1.124 channel.send((args, kw))
1.125
1.126 - # For continuous channels, no action is taken on the channel or on
1.127 - # new callable information.
1.128 -
1.129 - elif self.continuous:
1.130 - return None
1.131 -
1.132 else:
1.133 return callable, args, kw
1.134
1.135 @@ -394,34 +453,6 @@
1.136
1.137 return None
1.138
1.139 - def finish(self):
1.140 -
1.141 - """
1.142 - Finish the use of the exchange by waiting for all channels to complete.
1.143 - """
1.144 -
1.145 - while self.active():
1.146 - self.store()
1.147 -
1.148 - def store(self):
1.149 -
1.150 - "For each ready channel, process the incoming data."
1.151 -
1.152 - for channel in self.ready():
1.153 - self.store_data(channel)
1.154 - self.start_waiting(channel)
1.155 -
1.156 - def store_data(self, channel):
1.157 -
1.158 - """
1.159 - Store incoming data from the specified 'channel'. In subclasses of this
1.160 - class, such data could be stored using instance attributes.
1.161 - """
1.162 -
1.163 - raise NotImplementedError, "store_data"
1.164 -
1.165 - # Support for the convenience methods.
1.166 -
1.167 def _set_waiting(self, callable, args, kw):
1.168
1.169 """
1.170 @@ -429,7 +460,7 @@
1.171 been queued for later invocation.
1.172 """
1.173
1.174 - if self.limit is not None and len(self.active()) >= self.limit:
1.175 + if self.busy():
1.176 self.waiting.insert(0, (callable, args, kw))
1.177 return 1
1.178 else:
1.179 @@ -724,7 +755,7 @@
1.180 except IndexError:
1.181 pass
1.182
1.183 - while self.active():
1.184 + while self.unfinished():
1.185 self.store()
1.186 try:
1.187 return self._next()
1.188 @@ -742,7 +773,7 @@
1.189 except IndexError:
1.190 pass
1.191
1.192 - while self.active():
1.193 + while self.unfinished():
1.194 self.store()
1.195 try:
1.196 return self._get(i)
1.197 @@ -791,13 +822,20 @@
1.198
1.199 if self.queue:
1.200 return self.queue.pop()
1.201 - while self.active():
1.202 +
1.203 + while self.unfinished():
1.204 self.store()
1.205 if self.queue:
1.206 return self.queue.pop()
1.207 else:
1.208 raise StopIteration
1.209
1.210 + def __len__(self):
1.211 +
1.212 + "Return the current length of the queue."
1.213 +
1.214 + return len(self.queue)
1.215 +
1.216 class MakeParallel:
1.217
1.218 "A wrapper around functions making them able to communicate results."