# HG changeset patch # User Paul Boddie # Date 1251594620 -7200 # Node ID 0ba1bf2fa5632cb2bcf603f9db4b976e548fcd83 # Parent 3d86f5cb01c10b27660683dff34a45a88af6a287 Introduced index "partitions", sequential access to term dictionaries, and some support for merging partitions. diff -r 3d86f5cb01c1 -r 0ba1bf2fa563 iixr.py --- a/iixr.py Sat Aug 29 22:12:25 2009 +0200 +++ b/iixr.py Sun Aug 30 03:10:20 2009 +0200 @@ -18,15 +18,17 @@ with this program. If not, see . """ -from os import mkdir # to determine whether to create indexes +from os import listdir, mkdir # index and partition discovery from os.path import exists, join from os.path import commonprefix # to find common string prefixes from bisect import bisect_right # to find terms in the dictionary index +from bisect import insort_right # to maintain a sorted list of data for merging import bz2, zlib # for field compression # Constants. INTERVAL = 100 +FLUSH_INTERVAL = 1000000 compressors = [("b", bz2.compress), ("z", zlib.compress)] decompressors = {"b" : bz2.decompress, "z" : zlib.decompress} @@ -42,8 +44,14 @@ self.reset() def reset(self): + + "To be used to reset the state of the reader or writer between records." + pass + def rewind(self): + self.f.seek(0) + def close(self): if self.f is not None: self.f.close() @@ -211,7 +219,7 @@ self.last_docnum = docnum - def write_all_positions(self, doc_positions): + def write_term_positions(self, doc_positions): """ Write all 'doc_positions' - a collection of tuples of the form (document @@ -272,7 +280,7 @@ return self.last_docnum, positions - def read_all_positions(self, offset): + def read_term_positions(self, offset): """ Read all positions from 'offset', seeking to that position in the file @@ -347,6 +355,10 @@ self.last_term = "" self.last_offset = 0 + def rewind(self): + self.reset() + FileReader.rewind(self) + def read_term(self): """ @@ -461,7 +473,7 @@ and positions at which the term is found. """ - offset, frequency = self.position_writer.write_all_positions(doc_positions) + offset, frequency = self.position_writer.write_term_positions(doc_positions) self._write_term(term, offset, frequency) def close(self): @@ -529,6 +541,23 @@ else: return None + def rewind(self): + self.info_reader.rewind() + + def _get_positions(self, offset): + return self.position_reader.read_term_positions(offset) + + def read_term(self): + + """ + Return the next term, its frequency and the documents and positions at + which the term is found. + """ + + term, offset, frequency = self.info_reader.read_term() + positions = self._get_positions(offset) + return term, frequency, positions + def find_positions(self, term): "Return the documents and positions at which the given 'term' is found." @@ -538,7 +567,7 @@ return None else: offset, frequency = t - return self.position_reader.read_all_positions(offset) + return self._get_positions(offset) def get_frequency(self, term): @@ -635,7 +664,7 @@ bad_docnum, fields = self.read_fields() self.last_docnum = docnum return docnum, fields - + class FieldIndexWriter(FileWriter): "Writing field index details to files." @@ -758,6 +787,70 @@ self.field_reader.close() self.field_index_reader.close() +# Dictionary merging classes. + +class TermDictionaryMerger: + + "Merge position files." + + def __init__(self, writer, readers): + self.writer = writer + self.readers = readers + + def merge(self): + entries = [] + + # Get the first entries from the readers. + + for partition, reader in enumerate(self.readers): + reader.rewind() + + try: + term, frequency, positions = reader.read_term() + insort_right(entries, (term, positions, partition)) + except EOFError: + pass + + # While entries are available, write them out in order, merging where + # appropriate. + + while entries: + term, doc_positions, partition = entries[0] + to_update = [partition] + + nentries = len(entries) + i = 1 + + # Find other entries for the term. + + while i < nentries: + other_term, other_doc_positions, other_partition = entries[i] + + # For such entries, merge the positions. + + if other_term == term: + doc_positions += other_doc_positions + to_update.append(other_partition) + i += 1 + else: + break + + # Write the combined term details. + + doc_positions.sort() + self.writer.write_term_positions(term, doc_positions) + + # Update the entries from the affected readers. + + del entries[:i] + + for partition in to_update: + try: + term, frequency, positions = self_readers[partition].read_term() + insort_right(entries, (term, positions, partition)) + except EOFError: + pass + # High-level classes. class IndexWriter: @@ -766,12 +859,20 @@ Building term information and writing it to the term and field dictionaries. """ - def __init__(self, dict_writer, field_dict_writer): - self.dict_writer = dict_writer - self.field_dict_writer = field_dict_writer + def __init__(self, pathname, interval, flush_interval): + self.pathname = pathname + self.interval = interval + self.flush_interval = flush_interval + + self.dict_partition = 0 + self.field_dict_partition = 0 + self.terms = {} self.docs = {} + self.position_counter = 0 + self.field_counter = 0 + def add_position(self, term, docnum, position): """ @@ -791,6 +892,10 @@ doc.append(position) + self.position_counter += 1 + if self.flush_threshold and self.position_counter >= self.flush_threshold: + self.flush_terms() + def add_fields(self, docnum, fields): "Add for the document with the given 'docnum' a list of 'fields'." @@ -800,41 +905,112 @@ else: self.docs[docnum] += fields - def close(self): - if self.dict_writer is None: - return + self.field_counter += len(fields) + if self.flush_threshold and self.field_counter >= self.flush_threshold: + self.flush_fields() + + def get_term_writer(self): + + "Return a term dictionary writer for the current partition." + + tdf = open(join(self.pathname, "terms-%d" % self.dict_partition), "wb") + info_writer = TermWriter(tdf) + + tdif = open(join(self.pathname, "index-%d" % self.dict_partition), "wb") + index_writer = TermIndexWriter(tdif) + + tpf = open(join(self.pathname, "positions-%d" % self.dict_partition), "wb") + positions_writer = PositionWriter(tpf) + + return TermDictionaryWriter(info_writer, index_writer, positions_writer, self.interval) + + def get_field_writer(self): + + "Return a field dictionary writer for the current partition." + + ff = open(join(self.pathname, "fields-%d" % self.field_dict_partition), "wb") + field_writer = FieldWriter(ff) + + fif = open(join(self.pathname, "fields_index-%d" % self.field_dict_partition), "wb") + field_index_writer = FieldIndexWriter(fif) + + return FieldDictionaryWriter(field_writer, field_index_writer, self.interval) + + def flush_terms(self): + + "Flush terms into the current term dictionary partition." # Get the terms in order. terms = self.terms.items() terms.sort() + dict_writer = self.get_term_writer() + for term, doc_positions in terms: doc_positions = doc_positions.items() doc_positions.sort() - self.dict_writer.write_term_positions(term, doc_positions) + dict_writer.write_term_positions(term, doc_positions) + + dict_writer.close() - self.dict_writer.close() - self.dict_writer = None + self.terms = {} + self.dict_partition += 1 + + def flush_fields(self): + + "Flush fields into the current term dictionary partition." # Get the documents in order. docs = self.docs.items() docs.sort() + field_dict_writer = self.get_field_writer() + for docnum, fields in docs: - self.field_dict_writer.write_fields(docnum, fields) + field_dict_writer.write_fields(docnum, fields) + + field_dict_writer.close() - self.field_dict_writer.close() - self.field_dict_writer = None + self.docs = {} + self.field_dict_partition += 1 + + def close(self): + if self.terms: + self.flush_terms() + if self.docs: + self.flush_fields() class IndexReader: "Accessing the term and field dictionaries." - def __init__(self, dict_reader, field_dict_reader): - self.dict_reader = dict_reader - self.field_dict_reader = field_dict_reader + def __init__(self, pathname, partition=0): + self.pathname = pathname + self.dict_reader = self.get_term_reader(partition) + self.field_dict_reader = self.get_field_reader(partition) + + def get_term_reader(self, partition): + tdf = open(join(self.pathname, "terms-%d" % partition), "rb") + info_reader = TermReader(tdf) + + tdif = open(join(self.pathname, "index-%d" % partition), "rb") + index_reader = TermIndexReader(tdif) + + tpf = open(join(self.pathname, "positions-%d" % partition), "rb") + positions_reader = PositionReader(tpf) + + return TermDictionaryReader(info_reader, index_reader, positions_reader) + + def get_field_reader(self, partition): + ff = open(join(self.pathname, "fields-%d" % partition), "rb") + field_reader = FieldReader(ff) + + fif = open(join(self.pathname, "fields_index-%d" % partition), "rb") + field_index_reader = FieldIndexReader(fif) + + return FieldDictionaryReader(field_reader, field_index_reader) def find_positions(self, term): return self.dict_reader.find_positions(term) @@ -858,63 +1034,41 @@ self.reader = None self.writer = None - def get_writer(self, interval=INTERVAL): + def get_writer(self, interval=INTERVAL, flush_interval=FLUSH_INTERVAL): - "Return a writer, optionally using the given indexing 'interval'." + """ + Return a writer, optionally using the given indexing 'interval' and + 'flush_interval'. + """ if not exists(self.pathname): mkdir(self.pathname) - tdf = open(join(self.pathname, "terms"), "wb") - info_writer = TermWriter(tdf) - - tdif = open(join(self.pathname, "index"), "wb") - index_writer = TermIndexWriter(tdif) - - tpf = open(join(self.pathname, "positions"), "wb") - positions_writer = PositionWriter(tpf) - - dict_writer = TermDictionaryWriter(info_writer, index_writer, positions_writer, interval) - - ff = open(join(self.pathname, "fields"), "wb") - field_writer = FieldWriter(ff) - - fif = open(join(self.pathname, "fields_index"), "wb") - field_index_writer = FieldIndexWriter(fif) - - field_dict_writer = FieldDictionaryWriter(field_writer, field_index_writer, interval) - - self.writer = IndexWriter(dict_writer, field_dict_writer) + self.writer = IndexWriter(self.pathname, interval, flush_interval) return self.writer - def get_reader(self): + def get_reader(self, partition=0): "Return a reader for the index." if not exists(self.pathname): raise OSError, "Index path %r does not exist." % self.pathname - tdf = open(join(self.pathname, "terms"), "rb") - info_reader = TermReader(tdf) + self.reader = IndexReader(self.pathname, partition) + return self.reader - tdif = open(join(self.pathname, "index"), "rb") - index_reader = TermIndexReader(tdif) + def merge_terms(self): - tpf = open(join(self.pathname, "positions"), "rb") - positions_reader = PositionReader(tpf) - - dict_reader = TermDictionaryReader(info_reader, index_reader, positions_reader) + "Merge term dictionaries." - ff = open(join(self.pathname, "fields"), "rb") - field_reader = FieldReader(ff) + readers = [] - fif = open(join(self.pathname, "fields_index"), "rb") - field_index_reader = FieldIndexReader(fif) + for filename in os.listdir(self.pathname): + if filename.startswith("terms-"): # 6 character prefix + partition = int(filename[6:]) + readers.append(self.get_reader(partition)) - field_dict_reader = FieldDictionaryReader(field_reader, field_index_reader) - - self.reader = IndexReader(dict_reader, field_dict_reader) - return self.reader + # NOTE: Make a distinct new writer/index. def close(self): if self.reader is not None: diff -r 3d86f5cb01c1 -r 0ba1bf2fa563 test.py --- a/test.py Sat Aug 29 22:12:25 2009 +0200 +++ b/test.py Sun Aug 30 03:10:20 2009 +0200 @@ -54,7 +54,7 @@ w = iixr.PositionWriter(f) offsets = [] for doc_positions in all_doc_positions: - offset, frequency = w.write_all_positions(doc_positions) + offset, frequency = w.write_term_positions(doc_positions) offsets.append(offset) w.close() @@ -63,7 +63,7 @@ offsets.reverse() all_doc_positions.reverse() for offset, doc_positions in zip(offsets, all_doc_positions): - dp = r.read_all_positions(offset) + dp = r.read_term_positions(offset) print doc_positions == dp, doc_positions, dp r.close() @@ -264,6 +264,14 @@ for term in ("dog", "dingo"): dp = rd.find_positions(term) print dp is None, dp + +# (Test sequential access.) + +rd.rewind() +for term, doc_positions in terms_with_positions: + t, fr, dp = rd.read_term() + print term == t, term, t + print doc_positions == dp, doc_positions, dp rd.close() # Test high-level index operations.