1.1 --- a/README.txt Thu Jun 04 22:28:43 2009 +0200
1.2 +++ b/README.txt Fri Jun 05 01:46:07 2009 +0200
1.3 @@ -75,6 +75,14 @@
1.4
1.5 PYTHONPATH=. python examples/simple_managed_map_reusable.py
1.6
1.7 +Continuous Process Communications
1.8 +---------------------------------
1.9 +
1.10 +Another example not listed above, simple_continuous_queue.py, employs
1.11 +continuous communications to monitor output from created processes:
1.12 +
1.13 +PYTHONPATH=. python examples/simple_continuous_queue.py
1.14 +
1.15 Persistent Processes
1.16 --------------------
1.17
1.18 @@ -104,6 +112,12 @@
1.19 (This should produce a file called test.tif - a TIFF file containing a
1.20 raytraced scene image.)
1.21
1.22 +Examples from the Concurrency SIG
1.23 +---------------------------------
1.24 +
1.25 +PYTHONPATH=. python examples/concurrency-sig/bottles.py
1.26 +PYTHONPATH=. python examples/concurrency-sig/bottles_heartbeat.py
1.27 +
1.28 Test Programs
1.29 -------------
1.30
1.31 @@ -157,6 +171,8 @@
1.32 from completed parts of the sequence of inputs, also adding an iteration
1.33 interface.
1.34 * Added an example, simple_pmap_iter.py, to demonstrate iteration over maps.
1.35 + * Added proper support in the Exchange class for continuous communications
1.36 + between processes.
1.37
1.38 New in pprocess 0.4 (Changes since pprocess 0.3.1)
1.39 --------------------------------------------------
2.1 --- a/docs/reference.html Thu Jun 04 22:28:43 2009 +0200
2.2 +++ b/docs/reference.html Fri Jun 05 01:46:07 2009 +0200
2.3 @@ -333,6 +333,32 @@
2.4 <p>This wraps the callable in a similar fashion to <code>MakeParallel</code>, but
2.5 provides the necessary mechanisms described above for reusable code.</p>
2.6
2.7 +<h2 id="Continuous">Continuous Processes and Channels</h2>
2.8 +
2.9 +<p>Much of the usage of exchanges so far has concentrated on processes which
2.10 +are created, whose callables are invoked, and then, once those callables have
2.11 +returned, they are invoked again either in the same process (when reused) or
2.12 +in a new process (when not reused). However, the underlying mechanisms
2.13 +actually support processes whose callables not only receive input at the start
2.14 +of their execution and send output at the end of their execution, but may
2.15 +provide output on a continuous basis (similar to iterator or generator
2.16 +objects).</p>
2.17 +
2.18 +<p>To enable support for continuous communications between processes, a
2.19 +keyword argument must be specified when creating an <code>Exchange</code>
2.20 +instance (or an instance of a subclass of <code>Exchange</code> such as
2.21 +<code>Map</code> or <code>Queue</code>):</p>
2.22 +
2.23 +<pre>
2.24 +exchange = MyExchange(limit=10, continuous=1) # support up to 10 processes
2.25 +</pre>
2.26 +
2.27 +<p>Code invoked in this mode of communication must be aware of channels, since
2.28 +it will need to explicitly send data via a channel to the creating process,
2.29 +instead of terminating and sending data only once (as would be done
2.30 +automatically using convenience classes such as
2.31 +<code>MakeParallel</code>).</p>
2.32 +
2.33 <h2 id="BackgroundCallable">Background Processes and Callables</h2>
2.34
2.35 <p>So far, all parallel computations have involved created processes which
3.1 --- a/docs/tutorial.html Thu Jun 04 22:28:43 2009 +0200
3.2 +++ b/docs/tutorial.html Fri Jun 05 01:46:07 2009 +0200
3.3 @@ -21,6 +21,7 @@
3.4 <li><a href="#Queue">Converting Arbitrarily-Ordered Invocations</a></li>
3.5 <li><a href="#create">Converting Inline Computations</a></li>
3.6 <li><a href="#MakeReusable">Reusing Processes in Parallel Programs</a></li>
3.7 +<li><a href="#continuous">Supporting Continuous Processes in Parallel Programs</a></li>
3.8 <li><a href="#BackgroundCallable">Performing Computations in Background Processes</a></li>
3.9 <li><a href="#ManagingBackgroundProcesses">Managing Several Background Processes</a></li>
3.10 <li><a href="#Summary">Summary</a></li>
3.11 @@ -713,6 +714,10 @@
3.12 be collected by the queue: a list containing all of the results produced in the
3.13 computation.</p>
3.14
3.15 +<h2 id="continuous">Supporting Continuous Processes in Parallel Programs</h2>
3.16 +
3.17 +<p><strong>To be written.</strong></p>
3.18 +
3.19 <h2 id="ManagingBackgroundProcesses">Managing Several Background Processes</h2>
3.20
3.21 <p>In the above example, a single background process was used to manage a number
4.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
4.2 +++ b/examples/concurrency-sig/bottles_heartbeat.py Fri Jun 05 01:46:07 2009 +0200
4.3 @@ -0,0 +1,59 @@
4.4 +#!/usr/bin/env python
4.5 +
4.6 +"""
4.7 +Example of concurrency when reading files.
4.8 +
4.9 +See: http://wiki.python.org/moin/Concurrency/99Bottles
4.10 +
4.11 +Based on the generator version.
4.12 +"""
4.13 +
4.14 +import pprocess
4.15 +import time
4.16 +import re
4.17 +
4.18 +def follow(ch, fname):
4.19 + f = file(fname)
4.20 + f.seek(0,2) # go to the end
4.21 + while True:
4.22 + l = f.readline()
4.23 + if not l: # no data
4.24 + time.sleep(.1)
4.25 + else:
4.26 + ch.send(l)
4.27 +
4.28 +def grep(ch, lines, pattern):
4.29 + regex = re.compile(pattern)
4.30 + for l in lines:
4.31 + if regex.match(l):
4.32 + ch.send(l)
4.33 +
4.34 +def printer(lines, delay=5000):
4.35 + while 1:
4.36 + lines.store(delay)
4.37 + if len(lines) > 0:
4.38 + l = lines.next()
4.39 + print l.strip()
4.40 + else:
4.41 + print "No input received for %d ms." % delay
4.42 +
4.43 +def multigrep(ch, pattern):
4.44 + queue = pprocess.Queue(continuous=1)
4.45 + multifollow = queue.manage(follow)
4.46 + multifollow('test') # '/var/log/system.log'
4.47 + multifollow('test2')
4.48 + multifollow('test3')
4.49 +
4.50 + # Handle incoming lines using the specified pattern.
4.51 + grep(ch, queue, pattern)
4.52 +
4.53 +# Permit multiple simultaneous grep activities.
4.54 +queue = pprocess.Queue(continuous=1)
4.55 +multigrep = queue.manage(multigrep)
4.56 +multigrep(".*pants.*")
4.57 +multigrep(".*trousers.*")
4.58 +multigrep(".*shorts.*")
4.59 +
4.60 +p = printer(queue)
4.61 +
4.62 +# vim: tabstop=4 expandtab shiftwidth=4
5.1 --- a/examples/simple_managed_queue.py Thu Jun 04 22:28:43 2009 +0200
5.2 +++ b/examples/simple_managed_queue.py Fri Jun 05 01:46:07 2009 +0200
5.3 @@ -14,7 +14,7 @@
5.4 limit = 10
5.5 delay = 1
5.6
5.7 -# Work function and monitoring class.
5.8 +# Work function.
5.9
5.10 def calculate(i, j):
5.11
6.1 --- a/pprocess.py Thu Jun 04 22:28:43 2009 +0200
6.2 +++ b/pprocess.py Fri Jun 05 01:46:07 2009 +0200
6.3 @@ -236,6 +236,10 @@
6.4 A communications exchange that can be used to detect channels which are
6.5 ready to communicate. Subclasses of this class can define the 'store_data'
6.6 method in order to enable the 'add_wait', 'wait' and 'finish' methods.
6.7 +
6.8 + Once exchanges are populated with active channels, use of the principal
6.9 + methods of the exchange typically causes the 'store' method to be invoked,
6.10 + resulting in the processing of any incoming data.
6.11 """
6.12
6.13 def __init__(self, channels=None, limit=None, reuse=0, continuous=0, autoclose=1):
6.14 @@ -275,6 +279,8 @@
6.15 for channel in channels or []:
6.16 self.add(channel)
6.17
6.18 + # Core methods, registering and reporting on channels.
6.19 +
6.20 def add(self, channel):
6.21
6.22 "Add the given 'channel' to the exchange."
6.23 @@ -292,9 +298,9 @@
6.24 def ready(self, timeout=None):
6.25
6.26 """
6.27 - Wait for a period of time specified by the optional 'timeout' (or until
6.28 - communication is possible) and return a list of channels which are ready
6.29 - to be read from.
6.30 + Wait for a period of time specified by the optional 'timeout' in
6.31 + milliseconds (or until communication is possible) and return a list of
6.32 + channels which are ready to be read from.
6.33 """
6.34
6.35 fds = self.poller.poll(timeout)
6.36 @@ -335,6 +341,18 @@
6.37
6.38 # Enhanced exchange methods involving channel limits.
6.39
6.40 + def unfinished(self):
6.41 +
6.42 + "Return whether the exchange still has work scheduled or in progress."
6.43 +
6.44 + return self.active() or self.waiting
6.45 +
6.46 + def busy(self):
6.47 +
6.48 + "Return whether the exchange uses as many channels as it is allowed to."
6.49 +
6.50 + return self.limit is not None and len(self.active()) >= self.limit
6.51 +
6.52 def add_wait(self, channel):
6.53
6.54 """
6.55 @@ -354,9 +372,51 @@
6.56
6.57 # If limited, block until channels have been closed.
6.58
6.59 - while self.limit is not None and len(self.active()) >= self.limit:
6.60 + while self.busy():
6.61 + self.store()
6.62 +
6.63 + def finish(self):
6.64 +
6.65 + """
6.66 + Finish the use of the exchange by waiting for all channels to complete.
6.67 + """
6.68 +
6.69 + while self.unfinished():
6.70 self.store()
6.71
6.72 + def store(self, timeout=None):
6.73 +
6.74 + """
6.75 + For each ready channel, process the incoming data. If the optional
6.76 + 'timeout' parameter (a duration in milliseconds) is specified, wait only
6.77 + for the specified duration if no channels are ready to provide data.
6.78 + """
6.79 +
6.80 + # Either process input from active channels.
6.81 +
6.82 + if self.active():
6.83 + for channel in self.ready(timeout):
6.84 + self.store_data(channel)
6.85 + self.start_waiting(channel)
6.86 +
6.87 + # Or schedule new processes and channels.
6.88 +
6.89 + else:
6.90 + while self.waiting and not self.busy():
6.91 + callable, args, kw = self.waiting.pop()
6.92 + self.start(callable, *args, **kw)
6.93 +
6.94 + def store_data(self, channel):
6.95 +
6.96 + """
6.97 + Store incoming data from the specified 'channel'. In subclasses of this
6.98 + class, such data could be stored using instance attributes.
6.99 + """
6.100 +
6.101 + raise NotImplementedError, "store_data"
6.102 +
6.103 + # Support for the convenience methods.
6.104 +
6.105 def _get_waiting(self, channel):
6.106
6.107 """
6.108 @@ -364,7 +424,12 @@
6.109 the reception of data on the given 'channel'.
6.110 """
6.111
6.112 - if self.waiting:
6.113 + # For continuous channels, no scheduling is requested.
6.114 +
6.115 + if self.waiting and not self.continuous:
6.116 +
6.117 + # Schedule this callable and arguments.
6.118 +
6.119 callable, args, kw = self.waiting.pop()
6.120
6.121 # Try and reuse existing channels if possible.
6.122 @@ -377,12 +442,6 @@
6.123 self.add(channel)
6.124 channel.send((args, kw))
6.125
6.126 - # For continuous channels, no action is taken on the channel or on
6.127 - # new callable information.
6.128 -
6.129 - elif self.continuous:
6.130 - return None
6.131 -
6.132 else:
6.133 return callable, args, kw
6.134
6.135 @@ -394,34 +453,6 @@
6.136
6.137 return None
6.138
6.139 - def finish(self):
6.140 -
6.141 - """
6.142 - Finish the use of the exchange by waiting for all channels to complete.
6.143 - """
6.144 -
6.145 - while self.active():
6.146 - self.store()
6.147 -
6.148 - def store(self):
6.149 -
6.150 - "For each ready channel, process the incoming data."
6.151 -
6.152 - for channel in self.ready():
6.153 - self.store_data(channel)
6.154 - self.start_waiting(channel)
6.155 -
6.156 - def store_data(self, channel):
6.157 -
6.158 - """
6.159 - Store incoming data from the specified 'channel'. In subclasses of this
6.160 - class, such data could be stored using instance attributes.
6.161 - """
6.162 -
6.163 - raise NotImplementedError, "store_data"
6.164 -
6.165 - # Support for the convenience methods.
6.166 -
6.167 def _set_waiting(self, callable, args, kw):
6.168
6.169 """
6.170 @@ -429,7 +460,7 @@
6.171 been queued for later invocation.
6.172 """
6.173
6.174 - if self.limit is not None and len(self.active()) >= self.limit:
6.175 + if self.busy():
6.176 self.waiting.insert(0, (callable, args, kw))
6.177 return 1
6.178 else:
6.179 @@ -724,7 +755,7 @@
6.180 except IndexError:
6.181 pass
6.182
6.183 - while self.active():
6.184 + while self.unfinished():
6.185 self.store()
6.186 try:
6.187 return self._next()
6.188 @@ -742,7 +773,7 @@
6.189 except IndexError:
6.190 pass
6.191
6.192 - while self.active():
6.193 + while self.unfinished():
6.194 self.store()
6.195 try:
6.196 return self._get(i)
6.197 @@ -791,13 +822,20 @@
6.198
6.199 if self.queue:
6.200 return self.queue.pop()
6.201 - while self.active():
6.202 +
6.203 + while self.unfinished():
6.204 self.store()
6.205 if self.queue:
6.206 return self.queue.pop()
6.207 else:
6.208 raise StopIteration
6.209
6.210 + def __len__(self):
6.211 +
6.212 + "Return the current length of the queue."
6.213 +
6.214 + return len(self.queue)
6.215 +
6.216 class MakeParallel:
6.217
6.218 "A wrapper around functions making them able to communicate results."