# HG changeset patch
# User Paul Boddie
# Date 1244159167 -7200
# Node ID 6a69009a657ab0599b4dc0085f0c4045b0018951
# Parent 91b47853bf44e4a33e0a93e6344a5aa3a681d778
Added support for continuous communications in the Exchange class, tidying
various methods and adding new methods for monitoring the state of active
channels and waiting tasks.
Added a timeout parameter to the Exchange.store method.
Added an example of timeouts and continuous communications.
Updated release notes and some of the documentation.

diff -r 91b47853bf44 -r 6a69009a657a README.txt
--- a/README.txt Thu Jun 04 22:28:43 2009 +0200
+++ b/README.txt Fri Jun 05 01:46:07 2009 +0200
@@ -75,6 +75,14 @@

 PYTHONPATH=. python examples/simple_managed_map_reusable.py

+Continuous Process Communications
+---------------------------------
+
+Another example not listed above, simple_continuous_queue.py, employs
+continuous communications to monitor output from created processes:
+
+PYTHONPATH=. python examples/simple_continuous_queue.py
+
 Persistent Processes
 --------------------

@@ -104,6 +112,12 @@
 (This should produce a file called test.tif - a TIFF file containing a
 raytraced scene image.)

+Examples from the Concurrency SIG
+---------------------------------
+
+PYTHONPATH=. python examples/concurrency-sig/bottles.py
+PYTHONPATH=. python examples/concurrency-sig/bottles_heartbeat.py
+
 Test Programs
 -------------

@@ -157,6 +171,8 @@
     from completed parts of the sequence of inputs, also adding an iteration
     interface.
   * Added an example, simple_pmap_iter.py, to demonstrate iteration over maps.
+  * Added proper support in the Exchange class for continuous communications
+    between processes.

 New in pprocess 0.4 (Changes since pprocess 0.3.1)
 --------------------------------------------------
diff -r 91b47853bf44 -r 6a69009a657a docs/reference.html
--- a/docs/reference.html Thu Jun 04 22:28:43 2009 +0200
+++ b/docs/reference.html Fri Jun 05 01:46:07 2009 +0200
@@ -333,6 +333,32 @@

This wraps the callable in a similar fashion to MakeParallel, but provides the necessary mechanisms described above for reusable code.

+

Continuous Processes and Channels

+
+Much of the usage of exchanges so far has concentrated on processes which
+are created, whose callables are invoked, and then, once those callables have
+returned, either they are invoked again in the same process (when reused) or
+in a new process (when not reused). However, the underlying mechanisms
+actually support processes whose callables not only receive input at the start
+of their execution and send output at the end of their execution, but may
+provide output on a continuous basis (similar to iterator or generator
+objects).

+
+To enable support for continuous communications between processes, a
+keyword argument must be specified when creating an Exchange
+instance (or an instance of a subclass of Exchange such as
+Map or Queue):

+ +
+exchange = MyExchange(limit=10, continuous=1) # support up to 10 processes
+
+
+Code invoked in this mode of communication must be aware of channels, since
+it will need to explicitly send data via a channel to the creating process,
+instead of terminating and sending data only once (as would be done
+automatically using convenience classes such as
+MakeParallel).

+

Background Processes and Callables

So far, all parallel computations have involved created processes which
diff -r 91b47853bf44 -r 6a69009a657a docs/tutorial.html
--- a/docs/tutorial.html Thu Jun 04 22:28:43 2009 +0200
+++ b/docs/tutorial.html Fri Jun 05 01:46:07 2009 +0200
@@ -21,6 +21,7 @@

  • Converting Arbitrarily-Ordered Invocations
  • Converting Inline Computations
  • Reusing Processes in Parallel Programs
+ • Supporting Continuous Processes in Parallel Programs
  • Performing Computations in Background Processes
  • Managing Several Background Processes
  • Summary
@@ -713,6 +714,10 @@
be collected by the queue: a list containing all of the results produced in the computation.

    +

    Supporting Continuous Processes in Parallel Programs

    + +

    To be written.

    +

    Managing Several Background Processes

    In the above example, a single background process was used to manage a number
diff -r 91b47853bf44 -r 6a69009a657a examples/concurrency-sig/bottles_heartbeat.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/examples/concurrency-sig/bottles_heartbeat.py Fri Jun 05 01:46:07 2009 +0200
@@ -0,0 +1,59 @@
+#!/usr/bin/env python
+
+"""
+Example of concurrency when reading files.
+
+See: http://wiki.python.org/moin/Concurrency/99Bottles
+
+Based on the generator version.
+"""
+
+import pprocess
+import time
+import re
+
+def follow(ch, fname):
+    f = file(fname)
+    f.seek(0,2) # go to the end
+    while True:
+        l = f.readline()
+        if not l: # no data
+            time.sleep(.1)
+        else:
+            ch.send(l)
+
+def grep(ch, lines, pattern):
+    regex = re.compile(pattern)
+    for l in lines:
+        if regex.match(l):
+            ch.send(l)
+
+def printer(lines, delay=5000):
+    while 1:
+        lines.store(delay)
+        if len(lines) > 0:
+            l = lines.next()
+            print l.strip()
+        else:
+            print "No input received for %d ms." % delay
+
+def multigrep(ch, pattern):
+    queue = pprocess.Queue(continuous=1)
+    multifollow = queue.manage(follow)
+    multifollow('test') # '/var/log/system.log'
+    multifollow('test2')
+    multifollow('test3')
+
+    # Handle incoming lines using the specified pattern.
+    grep(ch, queue, pattern)
+
+# Permit multiple simultaneous grep activities.
+queue = pprocess.Queue(continuous=1)
+multigrep = queue.manage(multigrep)
+multigrep(".*pants.*")
+multigrep(".*trousers.*")
+multigrep(".*shorts.*")
+
+p = printer(queue)
+
+# vim: tabstop=4 expandtab shiftwidth=4
diff -r 91b47853bf44 -r 6a69009a657a examples/simple_managed_queue.py
--- a/examples/simple_managed_queue.py Thu Jun 04 22:28:43 2009 +0200
+++ b/examples/simple_managed_queue.py Fri Jun 05 01:46:07 2009 +0200
@@ -14,7 +14,7 @@
 limit = 10
 delay = 1

-# Work function and monitoring class.
+# Work function.
 def calculate(i, j):

diff -r 91b47853bf44 -r 6a69009a657a pprocess.py
--- a/pprocess.py Thu Jun 04 22:28:43 2009 +0200
+++ b/pprocess.py Fri Jun 05 01:46:07 2009 +0200
@@ -236,6 +236,10 @@
     A communications exchange that can be used to detect channels which are
     ready to communicate. Subclasses of this class can define the 'store_data'
     method in order to enable the 'add_wait', 'wait' and 'finish' methods.
+
+    Once exchanges are populated with active channels, use of the principal
+    methods of the exchange typically cause the 'store' method to be invoked,
+    resulting in the processing of any incoming data.
     """

     def __init__(self, channels=None, limit=None, reuse=0, continuous=0, autoclose=1):
@@ -275,6 +279,8 @@
         for channel in channels or []:
             self.add(channel)

+    # Core methods, registering and reporting on channels.
+
     def add(self, channel):

         "Add the given 'channel' to the exchange."
@@ -292,9 +298,9 @@
     def ready(self, timeout=None):

         """
-        Wait for a period of time specified by the optional 'timeout' (or until
-        communication is possible) and return a list of channels which are ready
-        to be read from.
+        Wait for a period of time specified by the optional 'timeout' in
+        milliseconds (or until communication is possible) and return a list of
+        channels which are ready to be read from.
         """

         fds = self.poller.poll(timeout)
@@ -335,6 +341,18 @@

     # Enhanced exchange methods involving channel limits.

+    def unfinished(self):
+
+        "Return whether the exchange still has work scheduled or in progress."
+
+        return self.active() or self.waiting
+
+    def busy(self):
+
+        "Return whether the exchange uses as many channels as it is allowed to."
+
+        return self.limit is not None and len(self.active()) >= self.limit
+
     def add_wait(self, channel):

         """
@@ -354,9 +372,51 @@

         # If limited, block until channels have been closed.
-        while self.limit is not None and len(self.active()) >= self.limit:
+        while self.busy():
+            self.store()
+
+    def finish(self):
+
+        """
+        Finish the use of the exchange by waiting for all channels to complete.
+        """
+
+        while self.unfinished():
             self.store()

+    def store(self, timeout=None):
+
+        """
+        For each ready channel, process the incoming data. If the optional
+        'timeout' parameter (a duration in milliseconds) is specified, wait only
+        for the specified duration if no channels are ready to provide data.
+        """
+
+        # Either process input from active channels.
+
+        if self.active():
+            for channel in self.ready(timeout):
+                self.store_data(channel)
+                self.start_waiting(channel)
+
+        # Or schedule new processes and channels.
+
+        else:
+            while self.waiting and not self.busy():
+                callable, args, kw = self.waiting.pop()
+                self.start(callable, *args, **kw)
+
+    def store_data(self, channel):
+
+        """
+        Store incoming data from the specified 'channel'. In subclasses of this
+        class, such data could be stored using instance attributes.
+        """
+
+        raise NotImplementedError, "store_data"
+
+    # Support for the convenience methods.
+
     def _get_waiting(self, channel):

         """
@@ -364,7 +424,12 @@
         the reception of data on the given 'channel'.
         """

-        if self.waiting:
+        # For continuous channels, no scheduling is requested.
+
+        if self.waiting and not self.continuous:
+
+            # Schedule this callable and arguments.
+
             callable, args, kw = self.waiting.pop()

             # Try and reuse existing channels if possible.
@@ -377,12 +442,6 @@
                 self.add(channel)
                 channel.send((args, kw))

-            # For continuous channels, no action is taken on the channel or on
-            # new callable information.
-
-            elif self.continuous:
-                return None
-
             else:
                 return callable, args, kw

@@ -394,34 +453,6 @@

         return None

-    def finish(self):
-
-        """
-        Finish the use of the exchange by waiting for all channels to complete.
-        """
-
-        while self.active():
-            self.store()
-
-    def store(self):
-
-        "For each ready channel, process the incoming data."
-
-        for channel in self.ready():
-            self.store_data(channel)
-            self.start_waiting(channel)
-
-    def store_data(self, channel):
-
-        """
-        Store incoming data from the specified 'channel'. In subclasses of this
-        class, such data could be stored using instance attributes.
-        """
-
-        raise NotImplementedError, "store_data"
-
-    # Support for the convenience methods.
-
     def _set_waiting(self, callable, args, kw):

         """
@@ -429,7 +460,7 @@
         been queued for later invocation.
         """

-        if self.limit is not None and len(self.active()) >= self.limit:
+        if self.busy():
             self.waiting.insert(0, (callable, args, kw))
             return 1
         else:
@@ -724,7 +755,7 @@
         except IndexError:
             pass

-        while self.active():
+        while self.unfinished():
             self.store()
             try:
                 return self._next()
@@ -742,7 +773,7 @@
         except IndexError:
             pass

-        while self.active():
+        while self.unfinished():
             self.store()
             try:
                 return self._get(i)
@@ -791,13 +822,20 @@

         if self.queue:
             return self.queue.pop()
-        while self.active():
+
+        while self.unfinished():
             self.store()
             if self.queue:
                 return self.queue.pop()
         else:
             raise StopIteration

+    def __len__(self):
+
+        "Return the current length of the queue."
+
+        return len(self.queue)
+
 class MakeParallel:

     "A wrapper around functions making them able to communicate results."
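
The scheduling change in pprocess.py can be illustrated outside pprocess. What follows is a minimal Python 3 sketch, not pprocess code: ToyExchange, its running list and the square function are hypothetical stand-ins. Tasks complete as soon as store is called instead of being polled over channels, and the start_waiting step of the real store method is left out. Under those assumptions it shows the state that unfinished() is meant to cover: tasks still queued while nothing is active, where a loop on active() alone would stop early.

```python
class ToyExchange:
    """Hypothetical in-memory model of the busy/unfinished/store logic."""

    def __init__(self, limit=None):
        self.limit = limit
        self.running = []   # stands in for the active channels
        self.waiting = []   # queued (callable, args, kw) tuples
        self.results = []

    def active(self):
        return self.running

    def unfinished(self):
        # Work is outstanding if anything is running or still queued.
        return bool(self.active() or self.waiting)

    def busy(self):
        # Only an exchange with a limit can ever be busy.
        return self.limit is not None and len(self.active()) >= self.limit

    def start(self, func, *args, **kw):
        # Queue the task when at the limit, otherwise mark it as running.
        if self.busy():
            self.waiting.insert(0, (func, args, kw))
        else:
            self.running.append((func, args, kw))

    def store(self):
        if self.active():
            # Assumption: every running task is ready at once; the real
            # code polls channels, optionally with a timeout.
            finished, self.running = self.running, []
            for func, args, kw in finished:
                self.results.append(func(*args, **kw))
        else:
            # Nothing is running, so schedule queued work up to the limit.
            while self.waiting and not self.busy():
                func, args, kw = self.waiting.pop()
                self.start(func, *args, **kw)

    def finish(self):
        while self.unfinished():
            self.store()


def square(n):
    return n * n


exchange = ToyExchange(limit=2)
for n in range(5):
    exchange.start(square, n)

exchange.store()
# Nothing is active now, yet three tasks are still waiting.
print(len(exchange.active()), len(exchange.waiting))  # prints: 0 3

exchange.finish()
print(exchange.results)  # prints: [0, 1, 4, 9, 16]
```

In this toy model, replacing the condition in finish with a test on active() alone would return after the first store call and leave three tasks unscheduled.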