# HG changeset patch # User Paul Boddie # Date 1244147323 -7200 # Node ID 91b47853bf44e4a33e0a93e6344a5aa3a681d778 # Parent a560bcdaca9601ef45c495c7c139d043c2bed497 Added continuous communications support to the Exchange class. Added an example illustrating continuous communications. diff -r a560bcdaca96 -r 91b47853bf44 examples/concurrency-sig/bottles.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/examples/concurrency-sig/bottles.py Thu Jun 04 22:28:43 2009 +0200 @@ -0,0 +1,54 @@ +#!/usr/bin/env python + +""" +Example of concurrency when reading files. + +See: http://wiki.python.org/moin/Concurrency/99Bottles + +Based on the generator version. +""" + +import pprocess +import time +import re + +def follow(ch, fname): + f = file(fname) + f.seek(0,2) # go to the end + while True: + l = f.readline() + if not l: # no data + time.sleep(.1) + else: + ch.send(l) + +def grep(ch, lines, pattern): + regex = re.compile(pattern) + for l in lines: + if regex.match(l): + ch.send(l) + +def printer(lines): + for l in lines: + print l.strip() + +def multigrep(ch, pattern): + queue = pprocess.Queue(continuous=1) + multifollow = queue.manage(follow) + multifollow('test') # '/var/log/system.log' + multifollow('test2') + multifollow('test3') + + # Handle incoming lines using the specified pattern. + grep(ch, queue, pattern) + +# Permit multiple simultaneous grep activities. +queue = pprocess.Queue(continuous=1) +multigrep = queue.manage(multigrep) +multigrep(".*pants.*") +multigrep(".*trousers.*") +multigrep(".*shorts.*") + +p = printer(queue) + +# vim: tabstop=4 expandtab shiftwidth=4 diff -r a560bcdaca96 -r 91b47853bf44 pprocess.py --- a/pprocess.py Thu May 21 19:24:06 2009 +0200 +++ b/pprocess.py Thu Jun 04 22:28:43 2009 +0200 @@ -238,7 +238,7 @@ method in order to enable the 'add_wait', 'wait' and 'finish' methods. """ - def __init__(self, channels=None, limit=None, reuse=0, autoclose=1): + def __init__(self, channels=None, limit=None, reuse=0, continuous=0, autoclose=1): """ Initialise the exchange with an optional list of 'channels'. @@ -249,7 +249,13 @@ this class and define a working 'store_data' method. If the optional 'reuse' parameter is set to a true value, channels and - processes will be reused for waiting computations. + processes will be reused for waiting computations, but the callable will + be invoked for each computation. + + If the optional 'continuous' parameter is set to a true value, channels + and processes will be retained after receiving data sent from such + processes, since it will be assumed that they will communicate more + data. If the optional 'autoclose' parameter is set to a false value, channels will not be closed automatically when they are removed from the exchange @@ -259,10 +265,13 @@ self.limit = limit self.reuse = reuse self.autoclose = autoclose + self.continuous = continuous + self.waiting = [] self.readables = {} self.removed = [] self.poller = select.poll() + for channel in channels or []: self.add(channel) @@ -367,6 +376,13 @@ self.add(channel) channel.send((args, kw)) + + # For continuous channels, no action is taken on the channel or on + # new callable information. + + elif self.continuous: + return None + else: return callable, args, kw