# HG changeset patch # User Paul Boddie # Date 1251660550 -7200 # Node ID eafc8a8a84ffc3504b91e9b06f6d3b7797674631 # Parent 0ba1bf2fa5632cb2bcf603f9db4b976e548fcd83 Made the rewind method for FileReader use the reset method. Added field identifiers, permitting sensible merging behaviour for field dictionaries. Changed the field writing method in the field dictionary writer from add_fields to add_field, employing an explicit field identifier. Added sequential field reading support in the field dictionary reader. Added more support for merging dictionaries. Created utility functions for opening term and field dictionary writers. Changed filename construction to permit strings as partition identifiers. diff -r 0ba1bf2fa563 -r eafc8a8a84ff iixr.py --- a/iixr.py Sun Aug 30 03:10:20 2009 +0200 +++ b/iixr.py Sun Aug 30 21:29:10 2009 +0200 @@ -51,6 +51,7 @@ def rewind(self): self.f.seek(0) + self.reset() def close(self): if self.f is not None: @@ -321,11 +322,6 @@ information was written to the file. """ - # Too long terms are not currently supported. - - if len(term) > 255: - raise ValueError, "Term %r is too long." % term - # Write the prefix length and term suffix. common = len(commonprefix([self.last_term, term])) @@ -355,10 +351,6 @@ self.last_term = "" self.last_offset = 0 - def rewind(self): - self.reset() - FileReader.rewind(self) - def read_term(self): """ @@ -597,8 +589,9 @@ def write_fields(self, docnum, fields): """ - Write for the given 'docnum', a list of 'fields' (strings representing - field values). Return the offset at which the fields are stored. + Write for the given 'docnum', a list of 'fields' (integer, string pairs + representing field identifiers and values respectively). + Return the offset at which the fields are stored. """ offset = self.f.tell() @@ -613,7 +606,8 @@ # Write the fields themselves. - for field in fields: + for i, field in fields: + self.write_number(i) self.write_string(field, 1) # compress self.last_docnum = docnum @@ -630,7 +624,7 @@ """ Read fields from the file, returning a tuple containing the document - number and a list of field values. + number and a list of field (identifier, value) pairs. """ # Read the document number. @@ -647,7 +641,9 @@ i = 0 while i < nfields: - fields.append(self.read_string(1)) # decompress + identifier = self.read_number() + value = self.read_string(1) # decompress + fields.append((identifier, value)) i += 1 return self.last_docnum, fields @@ -751,7 +747,16 @@ self.max_offset = self.docs[-1][1] - def read_fields(self, docnum): + def rewind(self): + self.field_reader.rewind() + + def read_fields(self): + + "Return the next document number and fields." + + return self.field_reader.read_fields() + + def get_fields(self, docnum): "Read the fields of the document with the given 'docnum'." @@ -789,15 +794,29 @@ # Dictionary merging classes. -class TermDictionaryMerger: +class Merger: - "Merge position files." + "Merge files." def __init__(self, writer, readers): self.writer = writer self.readers = readers + def close(self): + for reader in self.readers: + reader.close() + self.writer.close() + +class TermDictionaryMerger(Merger): + + "Merge term and position files." + def merge(self): + + """ + Merge terms and positions from the readers, sending them to the writer. + """ + entries = [] # Get the first entries from the readers. @@ -829,7 +848,7 @@ # For such entries, merge the positions. if other_term == term: - doc_positions += other_doc_positions + self.merge_positions(doc_positions, other_doc_positions) to_update.append(other_partition) i += 1 else: @@ -837,7 +856,6 @@ # Write the combined term details. - doc_positions.sort() self.writer.write_term_positions(term, doc_positions) # Update the entries from the affected readers. @@ -851,6 +869,124 @@ except EOFError: pass + def merge_positions(self, doc_positions, other_doc_positions): + + """ + Merge 'doc_positions' with 'other_doc_positions' so that common document + records contain positions from both collections. + """ + + doc_position_dict = dict(doc_positions) + + for docnum, positions in other_doc_positions: + if doc_position_dict.has_key(docnum): + doc_position_dict[docnum] += positions + doc_position_dict[docnum].sort() + else: + doc_position_dict[docnum] = positions + + doc_positions = doc_position_dict.items() + return doc_positions + +class FieldDictionaryMerger(Merger): + + "Merge field files." + + def merge(self): + + """ + Merge fields from the readers, sending them to the writer. + """ + + entries = [] + + # Get the first entries from the readers. + + for partition, reader in enumerate(self.readers): + reader.rewind() + + try: + docnum, fields = reader.read_fields() + insort_right(entries, (docnum, fields, partition)) + except EOFError: + pass + + # While entries are available, write them out in order, merging where + # appropriate. + + while entries: + docnum, fields, partition = entries[0] + to_update = [partition] + + nentries = len(entries) + i = 1 + + # Find other entries for the term. + + while i < nentries: + other_docnum, other_fields, other_partition = entries[i] + + # For such entries, merge the positions. + + if other_term == term: + fields += other_fields + to_update.append(other_partition) + i += 1 + else: + break + + # Write the combined term details. + + self.writer.write_fields(docnum, fields) + + # Update the entries from the affected readers. + + del entries[:i] + + for partition in to_update: + try: + docnum, fields = self_readers[partition].read_fields() + insort_right(entries, (docnum, fields, partition)) + except EOFError: + pass + +# Utility functions. + +def get_term_writer(pathname, partition, interval): + + """ + Return a term dictionary writer using files under the given 'pathname' + labelled according to the given 'partition', using the given indexing + 'interval'. + """ + + tdf = open(join(pathname, "terms-%s" % partition), "wb") + info_writer = TermWriter(tdf) + + tdif = open(join(pathname, "index-%s" % partition), "wb") + index_writer = TermIndexWriter(tdif) + + tpf = open(join(pathname, "positions-%s" % partition), "wb") + positions_writer = PositionWriter(tpf) + + return TermDictionaryWriter(info_writer, index_writer, positions_writer, interval) + +def get_field_writer(pathname, partition, interval): + + """ + Return a field dictionary writer using files under the given 'pathname' + labelled according to the given 'partition', using the given indexing + 'interval'. + """ + + ff = open(join(pathname, "fields-%s" % partition), "wb") + field_writer = FieldWriter(ff) + + fif = open(join(pathname, "fields_index-%s" % partition), "wb") + field_index_writer = FieldIndexWriter(fif) + + return FieldDictionaryWriter(field_writer, field_index_writer, interval) + # High-level classes. class IndexWriter: @@ -893,48 +1029,38 @@ doc.append(position) self.position_counter += 1 - if self.flush_threshold and self.position_counter >= self.flush_threshold: + if self.flush_interval and self.position_counter >= self.flush_interval: self.flush_terms() - def add_fields(self, docnum, fields): + def add_field(self, docnum, identifier, value): - "Add for the document with the given 'docnum' a list of 'fields'." + """ + Add for the document with the given 'docnum' a field having the given + 'identifier' and 'value'. + """ if not self.docs.has_key(docnum): - doc_fields = self.docs[docnum] = fields + doc_fields = self.docs[docnum] = [] else: - self.docs[docnum] += fields + doc_fields = self.docs[docnum] - self.field_counter += len(fields) - if self.flush_threshold and self.field_counter >= self.flush_threshold: + doc_fields.append((identifier, value)) + + self.field_counter += 1 + if self.flush_interval and self.field_counter >= self.flush_interval: self.flush_fields() def get_term_writer(self): "Return a term dictionary writer for the current partition." - tdf = open(join(self.pathname, "terms-%d" % self.dict_partition), "wb") - info_writer = TermWriter(tdf) - - tdif = open(join(self.pathname, "index-%d" % self.dict_partition), "wb") - index_writer = TermIndexWriter(tdif) - - tpf = open(join(self.pathname, "positions-%d" % self.dict_partition), "wb") - positions_writer = PositionWriter(tpf) - - return TermDictionaryWriter(info_writer, index_writer, positions_writer, self.interval) + return get_term_writer(self.pathname, self.dict_partition, self.interval) def get_field_writer(self): "Return a field dictionary writer for the current partition." - ff = open(join(self.pathname, "fields-%d" % self.field_dict_partition), "wb") - field_writer = FieldWriter(ff) - - fif = open(join(self.pathname, "fields_index-%d" % self.field_dict_partition), "wb") - field_index_writer = FieldIndexWriter(fif) - - return FieldDictionaryWriter(field_writer, field_index_writer, self.interval) + return get_field_writer(self.pathname, self.field_dict_partition, self.interval) def flush_terms(self): @@ -992,22 +1118,22 @@ self.field_dict_reader = self.get_field_reader(partition) def get_term_reader(self, partition): - tdf = open(join(self.pathname, "terms-%d" % partition), "rb") + tdf = open(join(self.pathname, "terms-%s" % partition), "rb") info_reader = TermReader(tdf) - tdif = open(join(self.pathname, "index-%d" % partition), "rb") + tdif = open(join(self.pathname, "index-%s" % partition), "rb") index_reader = TermIndexReader(tdif) - tpf = open(join(self.pathname, "positions-%d" % partition), "rb") + tpf = open(join(self.pathname, "positions-%s" % partition), "rb") positions_reader = PositionReader(tpf) return TermDictionaryReader(info_reader, index_reader, positions_reader) def get_field_reader(self, partition): - ff = open(join(self.pathname, "fields-%d" % partition), "rb") + ff = open(join(self.pathname, "fields-%s" % partition), "rb") field_reader = FieldReader(ff) - fif = open(join(self.pathname, "fields_index-%d" % partition), "rb") + fif = open(join(self.pathname, "fields_index-%s" % partition), "rb") field_index_reader = FieldIndexReader(fif) return FieldDictionaryReader(field_reader, field_index_reader) @@ -1019,7 +1145,7 @@ return self.dict_reader.get_frequency(term) def get_fields(self, docnum): - return self.field_dict_reader.read_fields(docnum) + return self.field_dict_reader.get_fields(docnum) def close(self): self.dict_reader.close() @@ -1057,9 +1183,9 @@ self.reader = IndexReader(self.pathname, partition) return self.reader - def merge_terms(self): + def merge_terms(self, interval=INTERVAL): - "Merge term dictionaries." + "Merge term dictionaries using the given indexing 'interval'." readers = [] @@ -1068,7 +1194,11 @@ partition = int(filename[6:]) readers.append(self.get_reader(partition)) - # NOTE: Make a distinct new writer/index. + writer = get_writer(self.pathname, "new", interval) + + merger = TermDictionaryMerger(writer, readers) + merger.merge() + merger.close() def close(self): if self.reader is not None: diff -r 0ba1bf2fa563 -r eafc8a8a84ff test.py --- a/test.py Sun Aug 30 03:10:20 2009 +0200 +++ b/test.py Sun Aug 30 21:29:10 2009 +0200 @@ -82,7 +82,7 @@ f = open("testF", "wb") w = iixr.FieldWriter(f) for docnum, fields in doc_fields: - w.write_fields(docnum, fields) + w.write_fields(docnum, list(enumerate(fields))) w.close() f = open("testF", "rb") @@ -90,7 +90,7 @@ for docnum, fields in doc_fields: dn, df = r.read_fields() print docnum == dn, docnum, dn - print fields == df, fields, df + print list(enumerate(fields)) == df, list(enumerate(fields)), df r.close() # Test field index files. @@ -123,7 +123,7 @@ w2 = iixr.FieldIndexWriter(f2) wd = iixr.FieldDictionaryWriter(w, w2, 3) for docnum, fields in doc_fields: - wd.write_fields(docnum, fields) + wd.write_fields(docnum, list(enumerate(fields))) wd.close() f = open("testF", "rb") @@ -134,11 +134,19 @@ doc_fields_reversed = doc_fields[:] doc_fields_reversed.reverse() for docnum, fields in doc_fields_reversed: - df = rd.read_fields(docnum) - print fields == df, fields, df + df = rd.get_fields(docnum) + print list(enumerate(fields)) == df, list(enumerate(fields)), df for docnum in (13579, 246810): - df = rd.read_fields(docnum) + df = rd.get_fields(docnum) print df is None, df + +# (Test sequential access.) + +rd.rewind() +for docnum, fields in doc_fields: + dn, df = rd.read_fields() + print docnum == dn, docnum, dn + print list(enumerate(fields)) == df, list(enumerate(fields)), df rd.close() # Test terms. @@ -297,7 +305,7 @@ for docnum, text in docs: for position, term in enumerate(text.split()): wi.add_position(term, docnum, position) - wi.add_fields(docnum, [text]) + wi.add_field(docnum, 123, text) wi.close() rd = index.get_reader() @@ -308,7 +316,7 @@ print frequency == fr, frequency, fr for docnum, text in docs: df = rd.get_fields(docnum) - print text == df[0], text, df[0] + print (123, text) == df[0], (123, text), df[0] index.close() # vim: tabstop=4 expandtab shiftwidth=4