1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/examples/concurrency-sig/bottles.py Thu Jun 04 22:28:43 2009 +0200
1.3 @@ -0,0 +1,54 @@
1.4 +#!/usr/bin/env python
1.5 +
1.6 +"""
1.7 +Example of concurrency when reading files.
1.8 +
1.9 +See: http://wiki.python.org/moin/Concurrency/99Bottles
1.10 +
1.11 +Based on the generator version.
1.12 +"""
1.13 +
1.14 +import pprocess
1.15 +import time
1.16 +import re
1.17 +
1.18 +def follow(ch, fname):
1.19 + f = file(fname)
1.20 + f.seek(0,2) # go to the end
1.21 + while True:
1.22 + l = f.readline()
1.23 + if not l: # no data
1.24 + time.sleep(.1)
1.25 + else:
1.26 + ch.send(l)
1.27 +
1.28 +def grep(ch, lines, pattern):
1.29 + regex = re.compile(pattern)
1.30 + for l in lines:
1.31 + if regex.match(l):
1.32 + ch.send(l)
1.33 +
1.34 +def printer(lines):
1.35 + for l in lines:
1.36 + print l.strip()
1.37 +
1.38 +def multigrep(ch, pattern):
1.39 + queue = pprocess.Queue(continuous=1)
1.40 + multifollow = queue.manage(follow)
1.41 + multifollow('test') # '/var/log/system.log'
1.42 + multifollow('test2')
1.43 + multifollow('test3')
1.44 +
1.45 + # Handle incoming lines using the specified pattern.
1.46 + grep(ch, queue, pattern)
1.47 +
1.48 +# Permit multiple simultaneous grep activities.
1.49 +queue = pprocess.Queue(continuous=1)
1.50 +multigrep = queue.manage(multigrep)
1.51 +multigrep(".*pants.*")
1.52 +multigrep(".*trousers.*")
1.53 +multigrep(".*shorts.*")
1.54 +
1.55 +p = printer(queue)
1.56 +
1.57 +# vim: tabstop=4 expandtab shiftwidth=4
2.1 --- a/pprocess.py Thu May 21 19:24:06 2009 +0200
2.2 +++ b/pprocess.py Thu Jun 04 22:28:43 2009 +0200
2.3 @@ -238,7 +238,7 @@
2.4 method in order to enable the 'add_wait', 'wait' and 'finish' methods.
2.5 """
2.6
2.7 - def __init__(self, channels=None, limit=None, reuse=0, autoclose=1):
2.8 + def __init__(self, channels=None, limit=None, reuse=0, continuous=0, autoclose=1):
2.9
2.10 """
2.11 Initialise the exchange with an optional list of 'channels'.
2.12 @@ -249,7 +249,13 @@
2.13 this class and define a working 'store_data' method.
2.14
2.15 If the optional 'reuse' parameter is set to a true value, channels and
2.16 - processes will be reused for waiting computations.
2.17 + processes will be reused for waiting computations, but the callable will
2.18 + be invoked for each computation.
2.19 +
2.20 + If the optional 'continuous' parameter is set to a true value, channels
2.21 + and processes will be retained after receiving data sent from such
2.22 + processes, since it will be assumed that they will communicate more
2.23 + data.
2.24
2.25 If the optional 'autoclose' parameter is set to a false value, channels
2.26 will not be closed automatically when they are removed from the exchange
2.27 @@ -259,10 +265,13 @@
2.28 self.limit = limit
2.29 self.reuse = reuse
2.30 self.autoclose = autoclose
2.31 + self.continuous = continuous
2.32 +
2.33 self.waiting = []
2.34 self.readables = {}
2.35 self.removed = []
2.36 self.poller = select.poll()
2.37 +
2.38 for channel in channels or []:
2.39 self.add(channel)
2.40
2.41 @@ -367,6 +376,13 @@
2.42
2.43 self.add(channel)
2.44 channel.send((args, kw))
2.45 +
2.46 + # For continuous channels, no action is taken on the channel or on
2.47 + # new callable information.
2.48 +
2.49 + elif self.continuous:
2.50 + return None
2.51 +
2.52 else:
2.53 return callable, args, kw
2.54