pprocess

tests/start_indexer.py

149:eeaa043dbfb1
2008-06-04 paulb [project @ 2008-06-04 22:13:31 by paulb] Added a link to the reference document.
     1 #!/usr/bin/env python     2      3 "A simple text indexing activity."     4      5 from pprocess import start, Exchange     6 from Dict import Indexer, Searcher, Parser     7 import os     8      9 def apply_parser(channel, filenames, delay=None):    10     11     """    12     Apply the parser, returning results through the given 'channel', processing    13     the files with the given 'filenames', and imposing a optional 'delay' in the    14     parser to simulate heavy processing.    15     """    16     17     #print "Parsing", filenames    18     parser = Parser(filenames, delay=delay)    19     parser.send_entries(channel)    20     #print "Done", filenames    21     22 def fill_window(filenames, i, window_size, exchange, slice_size, delay=None):    23     24     """    25     Fill a "window" of channels using the given 'filenames', starting from index    26     'i' in that list, choosing a number of channels limited to 'window_size' and    27     adding those channels to the given 'exchange'. Each channel will be    28     employed by a parser which will be given a number of filenames limited to    29     'slice_size', and which will be requested to wait periodically if specified    30     by the optional 'delay'.    31     """    32     33     number = 0    34     limit = len(filenames)    35     active = len(exchange.active())    36     while i < limit and number < window_size - active:    37         j = i + slice_size    38         channel = start(apply_parser, filenames[i:j], delay)    39         exchange.add(channel)    40         i = j    41         number += 1    42     return i    43     44 def get_searcher(filenames, window_size, slice_size, delay=None):    45     46     """    47     Get a searcher object, providing searching on the contents of the given    48     'filenames', employing a 'window_size' and 'slice_size' as parameters to    49     respectively control the number of monitored channels in the exchange, and    50     the number of filenames given to each created process. The optional 'delay'    51     is used to simulate heavy processing in each created process.    52     """    53     54     master_index = Indexer()    55     56     # Start indexing by filling a window with channels.    57     58     exchange = Exchange()    59     i = fill_window(filenames, 0, window_size, exchange, slice_size, delay)    60     61     # Start listening for responses.    62     63     while exchange.active():    64         #print "(%d)" % len(exchange.active()),    65         for channel in exchange.ready():    66             entry = channel.receive()    67             master_index.add_entry(entry)    68     69         i = fill_window(filenames, i, window_size, exchange, slice_size, delay)    70     71     # Provide a search interface.    72     73     #print    74     return Searcher(master_index.get_index())    75     76 if __name__ == "__main__":    77     import sys, os    78     79     # Get the parameters.    80     81     try:    82         directory = sys.argv[1]    83     except IndexError:    84         print "Please specify a directory where text files reside."    85         print "To investigate other performance factors, you can also specify"    86         print "a window size (eg. 1, 5, 10, ...) indicating the number of"    87         print "channels/processes being used, a slice size (eg. 5, 10, ...)"    88         print "indicating the number of filenames given to each created process"    89         print "and a time delay to simulate heavy processing in each created"    90         print "process (eg. 0.5, 1, ...)."    91         sys.exit(1)    92     93     if len(sys.argv) > 2:    94         window_size = int(sys.argv[2])    95     else:    96         window_size = 4    97     98     if len(sys.argv) > 3:    99         slice_size = int(sys.argv[3])   100     else:   101         slice_size = 5   102    103     if len(sys.argv) > 4:   104         delay = float(sys.argv[4])   105     else:   106         delay = None   107    108     # Build a list of filenames.   109    110     filenames = [os.path.join(directory, filename) for filename in os.listdir(directory)]   111     filenames = [filename for filename in filenames if os.path.isfile(filename)]   112    113     # Get a searcher using an index built in parallel.   114    115     searcher = get_searcher(filenames, window_size, slice_size, delay)   116    117     # Present a user interface.   118    119     if "--noprompt" not in sys.argv:   120         while 1:   121             print "Pattern:",   122             pattern = raw_input()   123             print searcher.find(pattern)   124    125 # vim: tabstop=4 expandtab shiftwidth=4