# HG changeset patch # User paulb # Date 1128010890 0 # Node ID db417142f29dfb0fb3ed5d2cd8e31ca30a265ebd # Parent ab1476a5606eb46ecfa445fcc6ac557a8cc63962 [project @ 2005-09-29 16:21:30 by paulb] Introduced a window filling mechanism so that the exchange always has an optimal number of channels to monitor. Removed the waitall calls since the exchange should now tidy up created processes. diff -r ab1476a5606e -r db417142f29d tests/start_indexer.py --- a/tests/start_indexer.py Thu Sep 29 16:20:06 2005 +0000 +++ b/tests/start_indexer.py Thu Sep 29 16:21:30 2005 +0000 @@ -1,7 +1,8 @@ #!/usr/bin/env python -from parallel import start, Exchange, waitall +from parallel import start, Exchange from Dict import Indexer, Searcher, Parser +import os def apply_parser(channel, filename): print "Parsing", filename @@ -9,31 +10,31 @@ parser.send_entries(channel) print "Done", filename +def fill_window(filenames, i, window_size, exchange): + limit = min(len(filenames), i + window_size - len(exchange.active())) + while i < limit: + channel = start(apply_parser, filenames[i]) + exchange.add(channel) + i = i + 1 + return i + def get_searcher(filenames, window_size): master_index = Indexer() - # Start indexing. + # Start indexing by filling a window with channels. - for i in range(0, len(filenames), window_size): - channels = [] - for filename in filenames[i:i + window_size]: - channels.append(start(apply_parser, filename)) + exchange = Exchange() + i = fill_window(filenames, 0, window_size, exchange) - # Start listening for responses. + # Start listening for responses. - exchange = Exchange(channels) - while exchange.active(): - #print "Waiting for %d channels..." % len(exchange.active()) - for channel in exchange.ready(): - entry = channel.receive() - master_index.add_entry(entry) + while exchange.active(): + print "Waiting for %d channels..." % len(exchange.active()) + for channel in exchange.ready(): + entry = channel.receive() + master_index.add_entry(entry) - # Tidy up as we go. - - for channel in channels: - channel.close() - - waitall() + i = fill_window(filenames, i, window_size, exchange) # Provide a search interface. @@ -54,7 +55,6 @@ # Get a searcher using an index built in parallel. searcher = get_searcher(filenames, window_size) - waitall() # Present a user interface.