# HG changeset patch
# User paulb
# Date 1128015053 0
# Node ID 758590a372127012404422bd6eda415b84f24333
# Parent c0e6ca347c6b3dbfd3ccfadcc92669fd91c0ae8d
[project @ 2005-09-29 17:30:53 by paulb]
Changed the indexing demonstration to accept various other arguments, with the
first argument being the directory containing files to index. Made the code
observe these new parameters, using them in conjunction with the new Indexer
functionality.

Changed the Indexer to process many files and to impose a delay after each
file if specified.

diff -r c0e6ca347c6b -r 758590a37212 tests/Dict.py
--- a/tests/Dict.py	Thu Sep 29 17:28:37 2005 +0000
+++ b/tests/Dict.py	Thu Sep 29 17:30:53 2005 +0000
@@ -3,17 +3,19 @@
 "A simple file indexer."
 
 import codecs
+import time
 
 class Parser:
-    def __init__(self, dict_location, encoding=None):
-        self.dict_location = dict_location
+    def __init__(self, filenames, encoding=None, delay=None):
+        self.filenames = filenames
         self.encoding = encoding
+        self.delay = delay
 
-    def _get_file_content(self):
+    def _get_file_content(self, filename):
         if self.encoding is None:
-            f = open(self.dict_location)
+            f = open(filename)
         else:
-            f = codecs.open(self.dict_location, encoding=self.encoding)
+            f = codecs.open(filename, encoding=self.encoding)
         s = f.read()
         f.close()
         return s
@@ -22,15 +24,21 @@
 
         "Send word entries from the file."
 
-        tokens = self._get_file_content().split()
-        index = {}
+        for filename in self.filenames:
+            tokens = self._get_file_content(filename).split()
+            index = {}
 
-        words = []
-        for token in tokens:
-            token = self._strip(token)
-            if token not in words:
-                channel.send((token, self.dict_location))
-                words.append(token)
+            words = []
+            for token in tokens:
+                token = self._strip(token)
+                if token not in words:
+                    channel.send((token, filename))
+                    words.append(token)
+
+            # Introduce a delay to simulate hard work.
+
+            if self.delay:
+                time.sleep(self.delay)
 
     def _strip(self, token):
 
@@ -56,9 +64,9 @@
 
     def add_entry(self, entry):
 
-        "Add the given word 'entry' (token, dict_location) to the index."
+        "Add the given word 'entry' (token, filename) to the index."
 
-        token, dict_location = entry
+        token, filename = entry
 
         if not token:
             return
@@ -71,7 +79,7 @@
         if not words.has_key(token):
             words[token] = []
 
-        words[token].append(dict_location)
+        words[token].append(filename)
 
 class Searcher:
     def __init__(self, index):
diff -r c0e6ca347c6b -r 758590a37212 tests/start_indexer.py
--- a/tests/start_indexer.py	Thu Sep 29 17:28:37 2005 +0000
+++ b/tests/start_indexer.py	Thu Sep 29 17:30:53 2005 +0000
@@ -1,30 +1,62 @@
 #!/usr/bin/env python
 
+"A simple text indexing activity."
+
 from parallel import start, Exchange
 from Dict import Indexer, Searcher, Parser
 import os
 
-def apply_parser(channel, filename):
-    #print "Parsing", filename
-    parser = Parser(filename)
+def apply_parser(channel, filenames, delay=None):
+
+    """
+    Apply the parser, returning results through the given 'channel', processing
+    the files with the given 'filenames', and imposing an optional 'delay' in the
+    parser to simulate heavy processing.
+    """
+
+    #print "Parsing", filenames
+    parser = Parser(filenames, delay=delay)
     parser.send_entries(channel)
-    #print "Done", filename
+    #print "Done", filenames
+
+def fill_window(filenames, i, window_size, exchange, slice_size, delay=None):
 
-def fill_window(filenames, i, window_size, exchange):
-    limit = min(len(filenames), i + window_size - len(exchange.active()))
-    while i < limit:
-        channel = start(apply_parser, filenames[i])
+    """
+    Fill a "window" of channels using the given 'filenames', starting from index
+    'i' in that list, choosing a number of channels limited to 'window_size' and
+    adding those channels to the given 'exchange'. Each channel will be
+    employed by a parser which will be given a number of filenames limited to
+    'slice_size', and which will be requested to wait periodically if specified
+    by the optional 'delay'.
+    """
+
+    number = 0
+    limit = len(filenames)
+    active = len(exchange.active())
+    while i < limit and number < window_size - active:
+        j = i + slice_size
+        channel = start(apply_parser, filenames[i:j], delay)
         exchange.add(channel)
-        i = i + 1
+        i = j
+        number += 1
     return i
 
-def get_searcher(filenames, window_size):
+def get_searcher(filenames, window_size, slice_size, delay=None):
+
+    """
+    Get a searcher object, providing searching on the contents of the given
+    'filenames', employing a 'window_size' and 'slice_size' as parameters to
+    respectively control the number of monitored channels in the exchange, and
+    the number of filenames given to each created process. The optional 'delay'
+    is used to simulate heavy processing in each created process.
+    """
+
     master_index = Indexer()
 
     # Start indexing by filling a window with channels.
 
     exchange = Exchange()
-    i = fill_window(filenames, 0, window_size, exchange)
+    i = fill_window(filenames, 0, window_size, exchange, slice_size, delay)
 
     # Start listening for responses.
 
@@ -34,7 +66,7 @@
             entry = channel.receive()
             master_index.add_entry(entry)
 
-            i = fill_window(filenames, i, window_size, exchange)
+            i = fill_window(filenames, i, window_size, exchange, slice_size, delay)
 
     # Provide a search interface.
 
@@ -46,7 +78,32 @@
 
     # Get the parameters.
 
-    window_size, directory = int(sys.argv[1]), sys.argv[2]
+    try:
+        directory = sys.argv[1]
+    except IndexError:
+        print "Please specify a directory where text files reside."
+        print "To investigate other performance factors, you can also specify"
+        print "a window size (eg. 1, 5, 10, ...) indicating the number of"
+        print "channels/processes being used, a slice size (eg. 5, 10, ...)"
+        print "indicating the number of filenames given to each created process"
+        print "and a time delay to simulate heavy processing in each created"
+        print "process (eg. 0.5, 1, ...)."
+        sys.exit(1)
+
+    if len(sys.argv) > 2:
+        window_size = int(sys.argv[2])
+    else:
+        window_size = 4
+
+    if len(sys.argv) > 3:
+        slice_size = int(sys.argv[3])
+    else:
+        slice_size = 5
+
+    if len(sys.argv) > 4:
+        delay = float(sys.argv[4])
+    else:
+        delay = None
 
     # Build a list of filenames.
 
@@ -55,7 +112,7 @@
 
     # Get a searcher using an index built in parallel.
 
-    searcher = get_searcher(filenames, window_size)
+    searcher = get_searcher(filenames, window_size, slice_size, delay)
 
     # Present a user interface.
 
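
Note on the revised interface: the script now takes the directory as its first
argument, with the window size, slice size and delay falling back to 4, 5 and
no delay when omitted. The sketch below shows the equivalent direct call to
get_searcher; the example directory path and the listdir-based filename list
are assumptions, since the patch does not show the line that builds
'filenames', and it assumes the script's argument handling sits under a
__main__ guard as its indentation suggests.

# Usage sketch only (hypothetical paths and values). Running the script itself
# would look something like:
#
#   python tests/start_indexer.py /tmp/texts 10 5 0.5
#
import os
from start_indexer import get_searcher    # assumes tests/ is on sys.path

directory = "/tmp/texts"                   # hypothetical directory of text files
filenames = [os.path.join(directory, f) for f in os.listdir(directory)]

# Index the files using up to 10 channels, 5 filenames per channel, and a
# half-second pause after each file to simulate heavy processing.
searcher = get_searcher(filenames, window_size=10, slice_size=5, delay=0.5)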
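The core of the change is the windowed dispatch in fill_window: each new
channel takes the next slice of at most 'slice_size' filenames, and no more
than 'window_size' channels (minus those already active) are started per call.
The following dry run reproduces only that arithmetic; 'start' and the Exchange
are replaced by a plain list so it runs without the parallel module, and the
filenames are invented.

# Dry run of the fill_window slicing arithmetic: no processes are started, the
# slices are merely collected so the windowing behaviour can be inspected.

def fill_window_dry_run(filenames, i, window_size, slice_size, active=0):
    slices = []
    number = 0
    limit = len(filenames)
    while i < limit and number < window_size - active:
        j = i + slice_size
        slices.append(filenames[i:j])   # real code: start(apply_parser, filenames[i:j], delay)
        i = j
        number += 1
    return i, slices

names = ["file%02d.txt" % n for n in range(1, 13)]    # 12 hypothetical filenames
i, slices = fill_window_dry_run(names, 0, window_size=4, slice_size=5)
# slices now holds three batches of 5, 5 and 2 filenames, and i == 15, so a
# later call sees nothing left to dispatch even though the window allowed a
# fourth channel.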
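On the Dict.py side, a Parser now walks a list of filenames and can sleep after
each one. The sketch below shows how the pieces fit together outside the
parallel framework: the RecordingChannel class and the temporary files are
inventions for illustration, and only Parser, Indexer, send_entries and
add_entry come from the patched module (which, like the rest of the code, is
Python 2).

import os, tempfile
from Dict import Parser, Indexer    # assumes tests/ is on sys.path

class RecordingChannel:
    "Stand-in for a parallel-module channel: just collect (token, filename) entries."
    def __init__(self):
        self.entries = []
    def send(self, entry):
        self.entries.append(entry)

# Create two small files to index.
tmpdir = tempfile.mkdtemp()
filenames = []
for name, text in [("a.txt", "parallel indexing demo"), ("b.txt", "indexing many files")]:
    path = os.path.join(tmpdir, name)
    f = open(path, "w")
    f.write(text)
    f.close()
    filenames.append(path)

# Parse both files, pausing 0.1s after each one to simulate hard work.
channel = RecordingChannel()
Parser(filenames, delay=0.1).send_entries(channel)

# Feed the collected entries to the indexer, as get_searcher does for real channels.
indexer = Indexer()
for entry in channel.entries:
    indexer.add_entry(entry)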