#!/usr/bin/env python

"A simple text indexing activity."

from parallel import start, Exchange
from Dict import Indexer, Searcher, Parser
import os

def apply_parser(channel, filenames, delay=None):

    """
    Apply the parser, returning results through the given 'channel',
    processing the files with the given 'filenames', and imposing an optional
    'delay' in the parser to simulate heavy processing.
    """

    #print "Parsing", filenames
    parser = Parser(filenames, delay=delay)
    parser.send_entries(channel)
    #print "Done", filenames

def fill_window(filenames, i, window_size, exchange, slice_size, delay=None):

    """
    Fill a "window" of channels using the given 'filenames', starting from
    index 'i' in that list, choosing a number of channels limited to
    'window_size' and adding those channels to the given 'exchange'. Each
    channel will be employed by a parser which will be given a number of
    filenames limited to 'slice_size', and which will be requested to wait
    periodically if specified by the optional 'delay'.

    Return the index of the first filename which has not yet been given to a
    parser. A ValueError is raised if 'slice_size' is less than 1, since the
    index could then never advance through the list.
    """

    if slice_size < 1:
        raise ValueError("slice_size must be at least 1, not %r" % (slice_size,))

    limit = len(filenames)

    # Channels which are already active count against the window, so only the
    # remaining vacancies are filled.

    vacancies = window_size - len(exchange.active())
    number = 0

    while i < limit and number < vacancies:
        j = i + slice_size
        channel = start(apply_parser, filenames[i:j], delay)
        exchange.add(channel)
        i = j
        number += 1

    return i

def get_searcher(filenames, window_size, slice_size, delay=None):

    """
    Get a searcher object, providing searching on the contents of the given
    'filenames', employing a 'window_size' and 'slice_size' as parameters to
    respectively control the number of monitored channels in the exchange, and
    the number of filenames given to each created process. The optional 'delay'
    is used to simulate heavy processing in each created process.

    A ValueError is raised if 'window_size' or 'slice_size' is less than 1,
    since no files could then be indexed.
    """

    # Without at least one channel nothing would ever be parsed and an empty
    # index would be returned without any indication of a problem.

    if window_size < 1:
        raise ValueError("window_size must be at least 1, not %r" % (window_size,))

    master_index = Indexer()

    # Start indexing by filling a window with channels.

    exchange = Exchange()
    i = fill_window(filenames, 0, window_size, exchange, slice_size, delay)

    # Start listening for responses.

    while exchange.active():
        #print "(%d)" % len(exchange.active()),
        for channel in exchange.ready():
            entry = channel.receive()
            master_index.add_entry(entry)

        # Top up the window with channels for any remaining filenames.

        i = fill_window(filenames, i, window_size, exchange, slice_size, delay)

    # Provide a search interface.

    #print
    return Searcher(master_index.get_index())

if __name__ == "__main__":
    import sys

    # Separate the --noprompt option from the positional parameters so that
    # the option may appear anywhere without being mistaken for a number.

    no_prompt = "--noprompt" in sys.argv
    args = [arg for arg in sys.argv[1:] if arg != "--noprompt"]

    # Get the parameters.

    try:
        directory = args[0]
    except IndexError:
        print("Please specify a directory where text files reside.")
        print("To investigate other performance factors, you can also specify")
        print("a window size (eg. 1, 5, 10, ...) indicating the number of")
        print("channels/processes being used, a slice size (eg. 5, 10, ...)")
        print("indicating the number of filenames given to each created process")
        print("and a time delay to simulate heavy processing in each created")
        print("process (eg. 0.5, 1, ...).")
        sys.exit(1)

    if len(args) > 1:
        window_size = int(args[1])
    else:
        window_size = 4

    if len(args) > 2:
        slice_size = int(args[2])
    else:
        slice_size = 5

    if len(args) > 3:
        delay = float(args[3])
    else:
        delay = None

    # Reject sizes which would prevent any indexing from taking place.

    if window_size < 1 or slice_size < 1:
        print("The window size and slice size must both be at least 1.")
        sys.exit(1)

    # Build a list of filenames.

    filenames = [os.path.join(directory, filename) for filename in os.listdir(directory)]
    filenames = [filename for filename in filenames if os.path.isfile(filename)]

    # Get a searcher using an index built in parallel.

    searcher = get_searcher(filenames, window_size, slice_size, delay)

    # Present a user interface.

    if not no_prompt:
        while 1:
            sys.stdout.write("Pattern: ")
            sys.stdout.flush()

            # Leave the loop quietly on end-of-input or an interrupt instead
            # of terminating with a traceback.

            try:
                pattern = raw_input()
            except (EOFError, KeyboardInterrupt):
                print("")
                break

            print(searcher.find(pattern))

# vim: tabstop=4 expandtab shiftwidth=4