# HG changeset patch
# User Paul Boddie This wraps the callable in a similar fashion to Much of the usage of exchanges so far has concentrated on processes which
+are created, whose callables are invoked, and then, once those callables have
+returned, either they are invoked again in the same process (when reused) or
+in a new process (when not reused). However, the underlying mechanisms
+actually support processes whose callables not only receive input at the start
+of their execution and send output at the end of their execution, but may
+provide output on a continuous basis (similar to iterator or generator
+objects). To enable support for continuous communications between processes, a
+keyword argument must be specified when creating an Code invoked in this mode of communication must be aware of channels, since
+it will need to explicitly send data via a channel to the creating process,
+instead of terminating and sending data only once (as would be done
+automatically using convenience classes such as
+ So far, all parallel computations have involved created processes which
diff -r 91b47853bf44 -r 6a69009a657a docs/tutorial.html
--- a/docs/tutorial.html Thu Jun 04 22:28:43 2009 +0200
+++ b/docs/tutorial.html Fri Jun 05 01:46:07 2009 +0200
@@ -21,6 +21,7 @@
MakeParallel
, but
provides the necessary mechanisms described above for reusable code.Continuous Processes and Channels
+
+Exchange
+instance (or an instance of a subclass of Exchange
such as
+Map
or Queue
):
+exchange = MyExchange(limit=10, continuous=1) # support up to 10 processes
+
+
+MakeParallel
).Background Processes and Callables
To be written.
+In the above example, a single background process was used to manage a number diff -r 91b47853bf44 -r 6a69009a657a examples/concurrency-sig/bottles_heartbeat.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/examples/concurrency-sig/bottles_heartbeat.py Fri Jun 05 01:46:07 2009 +0200 @@ -0,0 +1,59 @@ +#!/usr/bin/env python + +""" +Example of concurrency when reading files. + +See: http://wiki.python.org/moin/Concurrency/99Bottles + +Based on the generator version. +""" + +import pprocess +import time +import re + +def follow(ch, fname): + f = file(fname) + f.seek(0,2) # go to the end + while True: + l = f.readline() + if not l: # no data + time.sleep(.1) + else: + ch.send(l) + +def grep(ch, lines, pattern): + regex = re.compile(pattern) + for l in lines: + if regex.match(l): + ch.send(l) + +def printer(lines, delay=5000): + while 1: + lines.store(delay) + if len(lines) > 0: + l = lines.next() + print l.strip() + else: + print "No input received for %d ms." % delay + +def multigrep(ch, pattern): + queue = pprocess.Queue(continuous=1) + multifollow = queue.manage(follow) + multifollow('test') # '/var/log/system.log' + multifollow('test2') + multifollow('test3') + + # Handle incoming lines using the specified pattern. + grep(ch, queue, pattern) + +# Permit multiple simultaneous grep activities. +queue = pprocess.Queue(continuous=1) +multigrep = queue.manage(multigrep) +multigrep(".*pants.*") +multigrep(".*trousers.*") +multigrep(".*shorts.*") + +p = printer(queue) + +# vim: tabstop=4 expandtab shiftwidth=4 diff -r 91b47853bf44 -r 6a69009a657a examples/simple_managed_queue.py --- a/examples/simple_managed_queue.py Thu Jun 04 22:28:43 2009 +0200 +++ b/examples/simple_managed_queue.py Fri Jun 05 01:46:07 2009 +0200 @@ -14,7 +14,7 @@ limit = 10 delay = 1 -# Work function and monitoring class. +# Work function. def calculate(i, j): diff -r 91b47853bf44 -r 6a69009a657a pprocess.py --- a/pprocess.py Thu Jun 04 22:28:43 2009 +0200 +++ b/pprocess.py Fri Jun 05 01:46:07 2009 +0200 @@ -236,6 +236,10 @@ A communications exchange that can be used to detect channels which are ready to communicate. Subclasses of this class can define the 'store_data' method in order to enable the 'add_wait', 'wait' and 'finish' methods. + + Once exchanges are populated with active channels, use of the principal + methods of the exchange typically cause the 'store' method to be invoked, + resulting in the processing of any incoming data. """ def __init__(self, channels=None, limit=None, reuse=0, continuous=0, autoclose=1): @@ -275,6 +279,8 @@ for channel in channels or []: self.add(channel) + # Core methods, registering and reporting on channels. + def add(self, channel): "Add the given 'channel' to the exchange." @@ -292,9 +298,9 @@ def ready(self, timeout=None): """ - Wait for a period of time specified by the optional 'timeout' (or until - communication is possible) and return a list of channels which are ready - to be read from. + Wait for a period of time specified by the optional 'timeout' in + milliseconds (or until communication is possible) and return a list of + channels which are ready to be read from. """ fds = self.poller.poll(timeout) @@ -335,6 +341,18 @@ # Enhanced exchange methods involving channel limits. + def unfinished(self): + + "Return whether the exchange still has work scheduled or in progress." + + return self.active() or self.waiting + + def busy(self): + + "Return whether the exchange uses as many channels as it is allowed to." + + return self.limit is not None and len(self.active()) >= self.limit + def add_wait(self, channel): """ @@ -354,9 +372,51 @@ # If limited, block until channels have been closed. - while self.limit is not None and len(self.active()) >= self.limit: + while self.busy(): + self.store() + + def finish(self): + + """ + Finish the use of the exchange by waiting for all channels to complete. + """ + + while self.unfinished(): self.store() + def store(self, timeout=None): + + """ + For each ready channel, process the incoming data. If the optional + 'timeout' parameter (a duration in milliseconds) is specified, wait only + for the specified duration if no channels are ready to provide data. + """ + + # Either process input from active channels. + + if self.active(): + for channel in self.ready(timeout): + self.store_data(channel) + self.start_waiting(channel) + + # Or schedule new processes and channels. + + else: + while self.waiting and not self.busy(): + callable, args, kw = self.waiting.pop() + self.start(callable, *args, **kw) + + def store_data(self, channel): + + """ + Store incoming data from the specified 'channel'. In subclasses of this + class, such data could be stored using instance attributes. + """ + + raise NotImplementedError, "store_data" + + # Support for the convenience methods. + def _get_waiting(self, channel): """ @@ -364,7 +424,12 @@ the reception of data on the given 'channel'. """ - if self.waiting: + # For continuous channels, no scheduling is requested. + + if self.waiting and not self.continuous: + + # Schedule this callable and arguments. + callable, args, kw = self.waiting.pop() # Try and reuse existing channels if possible. @@ -377,12 +442,6 @@ self.add(channel) channel.send((args, kw)) - # For continuous channels, no action is taken on the channel or on - # new callable information. - - elif self.continuous: - return None - else: return callable, args, kw @@ -394,34 +453,6 @@ return None - def finish(self): - - """ - Finish the use of the exchange by waiting for all channels to complete. - """ - - while self.active(): - self.store() - - def store(self): - - "For each ready channel, process the incoming data." - - for channel in self.ready(): - self.store_data(channel) - self.start_waiting(channel) - - def store_data(self, channel): - - """ - Store incoming data from the specified 'channel'. In subclasses of this - class, such data could be stored using instance attributes. - """ - - raise NotImplementedError, "store_data" - - # Support for the convenience methods. - def _set_waiting(self, callable, args, kw): """ @@ -429,7 +460,7 @@ been queued for later invocation. """ - if self.limit is not None and len(self.active()) >= self.limit: + if self.busy(): self.waiting.insert(0, (callable, args, kw)) return 1 else: @@ -724,7 +755,7 @@ except IndexError: pass - while self.active(): + while self.unfinished(): self.store() try: return self._next() @@ -742,7 +773,7 @@ except IndexError: pass - while self.active(): + while self.unfinished(): self.store() try: return self._get(i) @@ -791,13 +822,20 @@ if self.queue: return self.queue.pop() - while self.active(): + + while self.unfinished(): self.store() if self.queue: return self.queue.pop() else: raise StopIteration + def __len__(self): + + "Return the current length of the queue." + + return len(self.queue) + class MakeParallel: "A wrapper around functions making them able to communicate results."