# HG changeset patch # User Paul Boddie # Date 1251666632 -7200 # Node ID 51cf2f14287979f89ab9ecc856c71fb0962b5830 # Parent eafc8a8a84ffc3504b91e9b06f6d3b7797674631 Fixed term discovery in the term dictionary index within the _find_term method. Fixed various typing errors in the merging functionality. Introduced merging into the process of obtaining a reader from top-level Index instances: only merged dictionaries should be accessed. Introduced the renaming of single partitions for reading and the deletion of partitions after merging. Renamed "index-N" to "terms_index-N" for term dictionary index files. Moved dictionary reader acquisition to utility functions, and changed merging methods to use such readers directly. Introduced merging into the test program. diff -r eafc8a8a84ff -r 51cf2f142879 iixr.py --- a/iixr.py Sun Aug 30 21:29:10 2009 +0200 +++ b/iixr.py Sun Aug 30 23:10:32 2009 +0200 @@ -19,6 +19,7 @@ """ from os import listdir, mkdir # index and partition discovery +from os import remove, rename # partition manipulation from os.path import exists, join from os.path import commonprefix # to find common string prefixes from bisect import bisect_right # to find terms in the dictionary index @@ -491,8 +492,7 @@ # Large numbers for ordering purposes. - self.max_offset = self.terms[-1][1] - self.max_info_offset = self.terms[-1][2] + self.max_offset = self.terms[-1][1] + 1 def _find_term(self, term): @@ -501,7 +501,7 @@ dictionary. """ - i = bisect_right(self.terms, (term, self.max_offset, self.max_info_offset)) - 1 + i = bisect_right(self.terms, (term, self.max_offset, 0, 0)) - 1 # Get the entry position providing the term or one preceding it. @@ -848,7 +848,7 @@ # For such entries, merge the positions. if other_term == term: - self.merge_positions(doc_positions, other_doc_positions) + doc_positions = self.merge_positions(doc_positions, other_doc_positions) to_update.append(other_partition) i += 1 else: @@ -864,7 +864,7 @@ for partition in to_update: try: - term, frequency, positions = self_readers[partition].read_term() + term, frequency, positions = self.readers[partition].read_term() insort_right(entries, (term, positions, partition)) except EOFError: pass @@ -881,12 +881,10 @@ for docnum, positions in other_doc_positions: if doc_position_dict.has_key(docnum): doc_position_dict[docnum] += positions - doc_position_dict[docnum].sort() else: doc_position_dict[docnum] = positions - doc_positions = doc_position_dict.items() - return doc_positions + return doc_position_dict.items() class FieldDictionaryMerger(Merger): @@ -945,7 +943,7 @@ for partition in to_update: try: - docnum, fields = self_readers[partition].read_fields() + docnum, fields = self.readers[partition].read_fields() insort_right(entries, (docnum, fields, partition)) except EOFError: pass @@ -963,7 +961,7 @@ tdf = open(join(pathname, "terms-%s" % partition), "wb") info_writer = TermWriter(tdf) - tdif = open(join(pathname, "index-%s" % partition), "wb") + tdif = open(join(pathname, "terms_index-%s" % partition), "wb") index_writer = TermIndexWriter(tdif) tpf = open(join(pathname, "positions-%s" % partition), "wb") @@ -987,6 +985,59 @@ return FieldDictionaryWriter(field_writer, field_index_writer, interval) +def get_term_reader(pathname, partition): + + """ + Return a term dictionary reader using files under the given 'pathname' + labelled according to the given 'partition'. + """ + + tdf = open(join(pathname, "terms-%s" % partition), "rb") + info_reader = TermReader(tdf) + + tdif = open(join(pathname, "terms_index-%s" % partition), "rb") + index_reader = TermIndexReader(tdif) + + tpf = open(join(pathname, "positions-%s" % partition), "rb") + positions_reader = PositionReader(tpf) + + return TermDictionaryReader(info_reader, index_reader, positions_reader) + +def get_field_reader(pathname, partition): + + """ + Return a field dictionary reader using files under the given 'pathname' + labelled according to the given 'partition'. + """ + + ff = open(join(pathname, "fields-%s" % partition), "rb") + field_reader = FieldReader(ff) + + fif = open(join(pathname, "fields_index-%s" % partition), "rb") + field_index_reader = FieldIndexReader(fif) + + return FieldDictionaryReader(field_reader, field_index_reader) + +def rename_files(pathname, names, from_partition, to_partition): + for name in names: + rename(join(pathname, "%s-%s" % (name, from_partition)), join(pathname, "%s-%s" % (name, to_partition))) + +def rename_term_files(pathname, from_partition, to_partition): + rename_files(pathname, ("terms", "terms_index", "positions"), from_partition, to_partition) + +def rename_field_files(pathname, from_partition, to_partition): + rename_files(pathname, ("fields", "fields_index"), from_partition, to_partition) + +def remove_files(pathname, names, partition): + for name in names: + remove(join(pathname, "%s-%s" % (name, partition))) + +def remove_term_files(pathname, partition): + remove_files(pathname, ("terms", "terms_index", "positions"), partition) + +def remove_field_files(pathname, partition): + remove_files(pathname, ("fields", "fields_index"), partition) + # High-level classes. class IndexWriter: @@ -1112,31 +1163,9 @@ "Accessing the term and field dictionaries." - def __init__(self, pathname, partition=0): - self.pathname = pathname - self.dict_reader = self.get_term_reader(partition) - self.field_dict_reader = self.get_field_reader(partition) - - def get_term_reader(self, partition): - tdf = open(join(self.pathname, "terms-%s" % partition), "rb") - info_reader = TermReader(tdf) - - tdif = open(join(self.pathname, "index-%s" % partition), "rb") - index_reader = TermIndexReader(tdif) - - tpf = open(join(self.pathname, "positions-%s" % partition), "rb") - positions_reader = PositionReader(tpf) - - return TermDictionaryReader(info_reader, index_reader, positions_reader) - - def get_field_reader(self, partition): - ff = open(join(self.pathname, "fields-%s" % partition), "rb") - field_reader = FieldReader(ff) - - fif = open(join(self.pathname, "fields_index-%s" % partition), "rb") - field_index_reader = FieldIndexReader(fif) - - return FieldDictionaryReader(field_reader, field_index_reader) + def __init__(self, pathname): + self.dict_reader = get_term_reader(pathname, "merged") + self.field_dict_reader = get_field_reader(pathname, "merged") def find_positions(self, term): return self.dict_reader.find_positions(term) @@ -1177,10 +1206,21 @@ "Return a reader for the index." + # Ensure that only one partition exists. + + self.merge_terms() + self.merge_fields() + + return self._get_reader(partition) + + def _get_reader(self, partition): + + "Return a reader for the index." + if not exists(self.pathname): raise OSError, "Index path %r does not exist." % self.pathname - self.reader = IndexReader(self.pathname, partition) + self.reader = IndexReader(self.pathname) return self.reader def merge_terms(self, interval=INTERVAL): @@ -1188,17 +1228,58 @@ "Merge term dictionaries using the given indexing 'interval'." readers = [] + partitions = [] - for filename in os.listdir(self.pathname): + for filename in listdir(self.pathname): if filename.startswith("terms-"): # 6 character prefix - partition = int(filename[6:]) - readers.append(self.get_reader(partition)) + partition = filename[6:] + readers.append(get_term_reader(self.pathname, partition)) + partitions.append(partition) + + # Write directly to a dictionary. + + if len(readers) > 1: + writer = get_term_writer(self.pathname, "merged", interval) + merger = TermDictionaryMerger(writer, readers) + merger.merge() + merger.close() + + # Remove old files. + + for partition in partitions: + remove_term_files(self.pathname, partition) + + elif len(readers) == 1 and partitions[0] != "merged": + rename_term_files(self.pathname, partitions[0], "merged") + + def merge_fields(self, interval=INTERVAL): - writer = get_writer(self.pathname, "new", interval) + "Merge field dictionaries using the given indexing 'interval'." + + readers = [] + partitions = [] + + for filename in listdir(self.pathname): + if filename.startswith("fields-"): # 7 character prefix + partition = filename[7:] + readers.append(get_field_reader(self.pathname, partition)) + partitions.append(partition) + + # Write directly to a dictionary. - merger = TermDictionaryMerger(writer, readers) - merger.merge() - merger.close() + if len(readers) > 1: + writer = get_field_writer(self.pathname, "merged", interval) + merger = FieldDictionaryMerger(writer, readers) + merger.merge() + merger.close() + + # Remove old files. + + for partition in partitions: + remove_field_files(self.pathname, partition) + + elif len(readers) == 1 and partitions[0] != "merged": + rename_field_files(self.pathname, partitions[0], "merged") def close(self): if self.reader is not None: diff -r eafc8a8a84ff -r 51cf2f142879 test.py --- a/test.py Sun Aug 30 21:29:10 2009 +0200 +++ b/test.py Sun Aug 30 23:10:32 2009 +0200 @@ -282,7 +282,7 @@ print doc_positions == dp, doc_positions, dp rd.close() -# Test high-level index operations. +# Test high-level index operations (including merging). docs = [ (1, "The cat sat on the mat"), @@ -301,7 +301,7 @@ ] index = iixr.Index("test_index") -wi = index.get_writer(3) +wi = index.get_writer(3, 6) for docnum, text in docs: for position, term in enumerate(text.split()): wi.add_position(term, docnum, position)