# HG changeset patch
# User Paul Boddie
# Date 1252617553 -7200
# Node ID 452b6b9f2864ca96f84e2d27341ab02d8787aeeb
# Parent  dafcd23d5fa9b4db9f9c08bea35ce8d34d96fd92
Introduced opener classes to replace the superfluous position and position
index reader classes whose iterators based on duplicated file descriptors did
not appear to be sufficiently isolated from each other.
Replaced the itermerge implementation with a more generic approach inspired
somewhat by the heapq.merge function from Python 2.6.
Introduced iterator-based merging.

diff -r dafcd23d5fa9 -r 452b6b9f2864 iixr.py
--- a/iixr.py Wed Sep 09 01:18:04 2009 +0200
+++ b/iixr.py Thu Sep 10 23:19:13 2009 +0200
@@ -24,7 +24,6 @@
 from os.path import exists, join
 from os.path import commonprefix # to find common string prefixes
 from bisect import bisect_right # to find terms in the dictionary index
-from bisect import insort_right # to maintain a sorted list of data for merging
 import bz2, zlib # for field compression
 
 from itermerge import itermerge
@@ -152,19 +151,18 @@
 
         shift = 0
         number = 0
-        more = 1
-
-        while more:
-            byte = self.f.read(1)
-            if not byte:
-                raise EOFError
-
-            csd = ord(byte)
-            more = csd & 128 != 0
-            if more:
-                csd &= 127
-            number += (csd << shift)
-            shift += 7
+        read = self.f.read
+
+        try:
+            csd = ord(read(1))
+            while csd & 128:
+                number += ((csd & 127) << shift)
+                shift += 7
+                csd = ord(read(1))
+            else:
+                number += (csd << shift)
+        except TypeError:
+            raise EOFError
 
         return number
 
@@ -195,6 +193,19 @@
 
         return unicode(s, "utf-8")
 
+class FileOpener:
+
+    "Opening files using their filenames."
+
+    def __init__(self, filename):
+        self.filename = filename
+
+    def open(self, mode):
+        return open(self.filename, mode)
+
+    def close(self):
+        pass
+
 # Specific classes for storing term and position information.
 
 class PositionWriter(FileWriter):
@@ -242,39 +253,10 @@
 
         return offset
 
-class PositionReader(FileReader):
+class PositionOpener(FileOpener):
 
     "Reading position information from files."
 
-    def reset(self):
-        self.last_docnum = 0
-
-    def read_positions(self):
-
-        "Read positions, returning a document number and a list of positions."
-
-        # Read the document number delta and add it to the last number.
-
-        self.last_docnum += self.read_number()
-
-        # Read the number of positions.
-
-        npositions = self.read_number()
-
-        # Read the position deltas, adding each previous position to get the
-        # appropriate collection of absolute positions.
-
-        i = 0
-        last = 0
-        positions = []
-
-        while i < npositions:
-            last += self.read_number()
-            positions.append(last)
-            i += 1
-
-        return self.last_docnum, positions
-
     def read_term_positions(self, offset, count):
 
         """
@@ -285,7 +267,7 @@
 
         # Duplicate the file handle.
 
-        f = fdopen(dup(self.f.fileno()), "rb")
+        f = self.open("rb")
         f.seek(offset)
         return PositionIterator(f, count)
 
@@ -324,10 +306,100 @@
 
         return offset
 
-class PositionIndexReader(FileReader):
+class PositionIndexOpener(FileOpener):
 
     "Reading position index information from files."
 
+    def read_term_positions(self, offset, doc_frequency):
+
+        """
+        Read all positions from 'offset', seeking to that position in the file
+        before reading. The number of documents available for reading is limited
+        to 'doc_frequency'.
+        """
+
+        # Duplicate the file handle.
+
+        f = self.open("rb")
+        f.seek(offset)
+        return PositionIndexIterator(f, doc_frequency)
+
+# Iterators for position-related files.
+
+class IteratorBase:
+
+    def __init__(self, count):
+        self.replenish(count)
+
+    def replenish(self, count):
+        self.count = count
+        self.read_documents = 0
+
+    def __len__(self):
+        return self.count
+
+    def sort(self):
+        pass # Stored document positions are already sorted.
+
+    def __iter__(self):
+        return self
+
+class PositionIterator(FileReader, IteratorBase):
+
+    "Iterating over document positions."
+
+    def __init__(self, f, count):
+        FileReader.__init__(self, f)
+        IteratorBase.__init__(self, count)
+
+    def reset(self):
+        self.last_docnum = 0
+
+    def read_positions(self):
+
+        "Read positions, returning a document number and a list of positions."
+
+        # Read the document number delta and add it to the last number.
+
+        self.last_docnum += self.read_number()
+
+        # Read the number of positions.
+
+        npositions = self.read_number()
+
+        # Read the position deltas, adding each previous position to get the
+        # appropriate collection of absolute positions.
+
+        i = 0
+        last = 0
+        positions = []
+
+        while i < npositions:
+            last += self.read_number()
+            positions.append(last)
+            i += 1
+
+        return self.last_docnum, positions
+
+    def next(self):
+
+        "Read positions for a single document."
+
+        if self.read_documents < self.count:
+            self.read_documents += 1
+            return self.read_positions()
+        else:
+            raise StopIteration
+
+class PositionIndexIterator(FileReader, IteratorBase):
+
+    "Iterating over document positions."
+
+    def __init__(self, f, count):
+        FileReader.__init__(self, f)
+        IteratorBase.__init__(self, count)
+        self.section_count = 0
+
     def reset(self):
         self.last_docnum = 0
         self.last_pos_offset = 0
@@ -353,67 +425,6 @@
 
         return self.last_docnum, self.last_pos_offset, count
 
-    def read_term_positions(self, offset, doc_frequency):
-
-        """
-        Read all positions from 'offset', seeking to that position in the file
-        before reading. The number of documents available for reading is limited
-        to 'doc_frequency'.
-        """
-
-        # Duplicate the file handle.
-
-        f = fdopen(dup(self.f.fileno()), "rb")
-        f.seek(offset)
-        return PositionIndexIterator(f, doc_frequency)
-
-# Iterators for position-related files.
-
-class IteratorBase:
-
-    def __init__(self, count):
-        self.replenish(count)
-
-    def replenish(self, count):
-        self.count = count
-        self.read_documents = 0
-
-    def __len__(self):
-        return self.count
-
-    def sort(self):
-        pass # Stored document positions are already sorted.
-
-    def __iter__(self):
-        return self
-
-class PositionIterator(PositionReader, IteratorBase):
-
-    "Iterating over document positions."
-
-    def __init__(self, f, count):
-        PositionReader.__init__(self, f)
-        IteratorBase.__init__(self, count)
-
-    def next(self):
-
-        "Read positions for a single document."
-
-        if self.read_documents < self.count:
-            self.read_documents += 1
-            return self.read_positions()
-        else:
-            raise StopIteration
-
-class PositionIndexIterator(PositionIndexReader, IteratorBase):
-
-    "Iterating over document positions."
-
-    def __init__(self, f, count):
-        PositionIndexReader.__init__(self, f)
-        IteratorBase.__init__(self, count)
-        self.section_count = 0
-
     def next(self):
 
         "Read positions for a single document."
@@ -477,7 +488,7 @@
 
             # Every {interval} entries, write an index entry.
 
-            if count == self.interval:
+            if count % self.interval == 0:
                 io = self.position_index_writer.write_positions(first_docnum, first_offset, self.interval)
 
                 # Remember the first index entry offset.
@@ -487,7 +498,6 @@
 
                 first_offset = None
                 first_docnum = None
-                count = 0
 
                 # Reset the position writer so that position readers accessing
                 # a section start with the correct document number.
@@ -498,14 +508,14 @@
 
         else:
             if first_offset is not None:
-                io = self.position_index_writer.write_positions(first_docnum, first_offset, count)
+                io = self.position_index_writer.write_positions(first_docnum, first_offset, count % self.interval)
 
                 # Remember the first index entry offset.
 
                 if index_offset is None:
                     index_offset = io
 
-        return index_offset, frequency, len(doc_positions)
+        return index_offset, frequency, count
 
     def close(self):
         self.position_writer.close()
@@ -515,9 +525,9 @@
 
     "Reading position dictionaries."
 
-    def __init__(self, position_reader, position_index_reader):
-        self.position_reader = position_reader
-        self.position_index_reader = position_index_reader
+    def __init__(self, position_opener, position_index_opener):
+        self.position_opener = position_opener
+        self.position_index_opener = position_index_opener
 
     def read_term_positions(self, offset, doc_frequency):
 
@@ -526,21 +536,21 @@
         given 'doc_frequency'.
         """
 
-        return PositionDictionaryIterator(self.position_reader,
-            self.position_index_reader, offset, doc_frequency)
+        return PositionDictionaryIterator(self.position_opener,
+            self.position_index_opener, offset, doc_frequency)
 
     def close(self):
-        self.position_reader.close()
-        self.position_index_reader.close()
+        pass
 
 class PositionDictionaryIterator:
 
     "Iteration over position dictionary entries."
 
-    def __init__(self, position_reader, position_index_reader, offset, doc_frequency):
-        self.position_reader = position_reader
+    def __init__(self, position_opener, position_index_opener, offset, doc_frequency):
+        self.position_opener = position_opener
         self.doc_frequency = doc_frequency
-        self.index_iterator = position_index_reader.read_term_positions(offset, doc_frequency)
+        self.index_iterator = position_index_opener.read_term_positions(offset, doc_frequency)
+        self.iterator = None
 
         # Remember the last values.
@@ -555,12 +565,16 @@
         self._next_section()
         self._init_section()
 
+    # Sequence methods.
+
     def __len__(self):
         return self.doc_frequency
 
     def sort(self):
         pass
 
+    # Iterator methods.
+
     def __iter__(self):
         return self
 
@@ -680,7 +694,17 @@
 
         "Initialise the iterator for the section in the position file."
 
-        self.iterator = self.position_reader.read_term_positions(self.pos_offset, self.section_count)
+        if self.iterator is not None:
+            self.iterator.close()
+        self.iterator = self.position_opener.read_term_positions(self.pos_offset, self.section_count)
+
+    def close(self):
+        if self.iterator is not None:
+            self.iterator.close()
+            self.iterator = None
+        if self.index_iterator is not None:
+            self.index_iterator.close()
+            self.index_iterator = None
 
 class TermWriter(FileWriter):
 
@@ -957,6 +981,18 @@
     def _get_positions(self, offset, doc_frequency):
         return self.position_dict_reader.read_term_positions(offset, doc_frequency)
 
+    # Iterator convenience methods.
+
+    def __iter__(self):
+        self.rewind()
+        return self
+
+    def next(self):
+        try:
+            return self.read_term()
+        except EOFError:
+            raise StopIteration
+
     # Sequential access methods.
 
     def rewind(self):
@@ -1213,6 +1249,20 @@
         else:
             self.max_offset = None
 
+    # Iterator convenience methods.
+
+    def __iter__(self):
+        self.rewind()
+        return self
+
+    def next(self):
+        try:
+            return self.read_fields()
+        except EOFError:
+            raise StopIteration
+
+    # Sequential access methods.
+
     def rewind(self):
         self.field_reader.rewind()
 
@@ -1222,6 +1272,8 @@
 
         return self.field_reader.read_fields()
 
+    # Random access methods.
+
     def get_fields(self, docnum):
 
         "Read the fields of the document with the given 'docnum'."
@@ -1283,57 +1335,20 @@
         Merge terms and positions from the readers, sending them to the writer.
         """
 
-        entries = []
-
-        # Get the first entries from the readers.
-
-        for partition, reader in enumerate(self.readers):
-            reader.rewind()
-
-            try:
-                term, frequency, doc_frequency, positions = reader.read_term()
-                insort_right(entries, (term, positions, partition))
-            except EOFError:
-                pass
-
-        # While entries are available, write them out in order, merging where
-        # appropriate.
-
-        while entries:
-            term, doc_positions, partition = entries[0]
-            to_update = [partition]
-
-            nentries = len(entries)
-            i = 1
-
-            # Find other entries for the term.
-
-            while i < nentries:
-                other_term, other_doc_positions, other_partition = entries[i]
-
-                # For such entries, merge the positions.
-
-                if other_term == term:
-                    doc_positions = itermerge(doc_positions, other_doc_positions)
-                    to_update.append(other_partition)
-                    i += 1
-                else:
-                    break
-
-            # Write the combined term details.
-
-            self.writer.write_term_positions(term, doc_positions)
-
-            # Update the entries from the affected readers.
-
-            del entries[:i]
-
-            for partition in to_update:
-                try:
-                    term, frequency, doc_frequency, positions = self.readers[partition].read_term()
-                    insort_right(entries, (term, positions, partition))
-                except EOFError:
-                    pass
+        last_term = None
+        current_readers = []
+
+        for term, frequency, doc_frequency, positions in itermerge(self.readers):
+            if term == last_term:
+                current_readers.append(positions)
+            else:
+                if current_readers:
+                    self.writer.write_term_positions(last_term, itermerge(current_readers))
+                last_term = term
+                current_readers = [positions]
+        else:
+            if current_readers:
+                self.writer.write_term_positions(last_term, itermerge(current_readers))
 
 class FieldDictionaryMerger(Merger):
 
@@ -1345,40 +1360,9 @@
         Merge fields from the readers, sending them to the writer.
         """
 
-        entries = []
-
-        # Get the first entries from the readers.
-
-        for partition, reader in enumerate(self.readers):
-            reader.rewind()
-
-            try:
-                docnum, fields = reader.read_fields()
-                insort_right(entries, (docnum, fields, partition))
-            except EOFError:
-                pass
-
-        # While entries are available, write them out in order, merging where
-        # appropriate. Since fields from one document should only appear in a
-        # single partition, only one partition will be updated at a time.
-
-        while entries:
-            docnum, fields, partition = entries[0]
-
-            # Write the combined term details.
-
+        for docnum, fields in itermerge(self.readers):
             self.writer.write_fields(docnum, fields)
 
-            # Update the entries from the affected readers.
-
-            del entries[0]
-
-            try:
-                docnum, fields = self.readers[partition].read_fields()
-                insort_right(entries, (docnum, fields, partition))
-            except EOFError:
-                pass
-
 # Utility functions.
 
 def get_term_writer(pathname, partition, interval, doc_interval):
@@ -1434,13 +1418,10 @@
     tdif = open(join(pathname, "terms_index-%s" % partition), "rb")
     index_reader = TermIndexReader(tdif)
 
-    tpf = open(join(pathname, "positions-%s" % partition), "rb")
-    positions_reader = PositionReader(tpf)
-
-    tpif = open(join(pathname, "positions_index-%s" % partition), "rb")
-    positions_index_reader = PositionIndexReader(tpif)
-
-    positions_dict_reader = PositionDictionaryReader(positions_reader, positions_index_reader)
+    positions_opener = PositionOpener(join(pathname, "positions-%s" % partition))
+    positions_index_opener = PositionIndexOpener(join(pathname, "positions_index-%s" % partition))
+
+    positions_dict_reader = PositionDictionaryReader(positions_opener, positions_index_opener)
 
     return TermDictionaryReader(info_reader, index_reader, positions_dict_reader)
 
diff -r dafcd23d5fa9 -r 452b6b9f2864 itermerge.py
--- a/itermerge.py Wed Sep 09 01:18:04 2009 +0200
+++ b/itermerge.py Thu Sep 10 23:19:13 2009 +0200
@@ -1,54 +1,51 @@
 #!/usr/bin/env python
 
-def itermerge(seq1, seq2):
+"An iterator merging class similar to heapq.merge in Python 2.6."
 
-    "Merge 'seq1' and 'seq2' to produce an ordered, combined list of results."
+from bisect import insort_right
 
-    results = []
+class itermerge:
 
-    iter1 = iter(seq1)
-    iter2 = iter(seq2)
+    """
+    Merge ordered sequences to produce an ordered, combined sequence of
+    results.
+    """
 
-    t1 = None
-    t2 = None
+    def __init__(self, sequences):
+        self.iters = []
+
+        # Prepare the underlying iterators.
+
+        for seq in sequences:
+            it = iter(seq)
+            next = it.next
+            self._add_next(next)
 
-    t1 = _itermerge_next(iter1)
-    if t1 is None:
-        _itermerge_fill(iter2, results)
-        return results
+    def sort(self):
+        pass # The output should be sorted.
+
+    def __iter__(self):
+        return self
+
+    def _add_next(self, next):
 
-    while 1:
-        if t1 is None:
-            t1 = _itermerge_next(iter1)
-            if t1 is None:
-                results.append(t2)
-                _itermerge_fill(iter2, results)
-                return results
+        """
+        Store the current value for an iterator, alongside the means of
+        getting the next value: the 'next' method.
+        """
+
+        try:
+            insort_right(self.iters, (next(), next))
+        except StopIteration:
+            pass
+
+    def next(self):
+        if self.iters:
+            value, next = self.iters[0]
+            del self.iters[0]
+            self._add_next(next)
+            return value
         else:
-            t2 = _itermerge_next(iter2)
-            if t2 is None:
-                results.append(t1)
-                _itermerge_fill(iter1, results)
-                return results
-
-        if t1 < t2:
-            results.append(t1)
-            t1 = None
-        else:
-            results.append(t2)
-            t2 = None
-
-def _itermerge_next(iter):
-    try:
-        return iter.next()
-    except StopIteration:
-        return None
-
-def _itermerge_fill(iter, results):
-    try:
-        while 1:
-            results.append(iter.next())
-    except StopIteration:
-        pass
+            raise StopIteration
 
 # vim: tabstop=4 expandtab shiftwidth=4
diff -r dafcd23d5fa9 -r 452b6b9f2864 test.py
--- a/test.py Wed Sep 09 01:18:04 2009 +0200
+++ b/test.py Thu Sep 10 23:19:13 2009 +0200
@@ -60,7 +60,7 @@
 w.close()
 
 f = open("testP", "rb")
-r = iixr.PositionReader(f)
+r = iixr.PositionIterator(f, None)
 for doc_positions in all_doc_positions:
     for docnum, positions in doc_positions:
         d, p = r.read_positions()
@@ -97,8 +97,7 @@
     offsets.append((offset, doc_frequency))
 w.close()
 
-f = open("testPI", "rb")
-r = iixr.PositionIndexReader(f)
+r = iixr.PositionIndexOpener("testPI")
 offsets.reverse()
 indexed_positions.reverse()
 for (offset, doc_frequency), term_positions in zip(offsets, indexed_positions):
@@ -122,10 +121,8 @@
     offsets.append((offset, doc_frequency))
 wd.close()
 
-f = open("testP", "rb")
-r = iixr.PositionReader(f)
-f2 = open("testPI", "rb")
-r2 = iixr.PositionIndexReader(f2)
+r = iixr.PositionOpener("testP")
+r2 = iixr.PositionIndexOpener("testPI")
 rd = iixr.PositionDictionaryReader(r, r2)
 offsets.reverse()
 all_doc_positions.reverse()
@@ -293,10 +290,8 @@
 r = iixr.TermReader(f)
 f2 = open("testI", "rb")
 r2 = iixr.TermIndexReader(f2)
-f3 = open("testP", "rb")
-r3 = iixr.PositionReader(f3)
-f4 = open("testPI", "rb")
-r4 = iixr.PositionIndexReader(f4)
+r3 = iixr.PositionOpener("testP")
+r4 = iixr.PositionIndexOpener("testPI")
 rp = iixr.PositionDictionaryReader(r3, r4)
 rd = iixr.TermDictionaryReader(r, r2, rp)
 terms_reversed = terms[:]
@@ -357,10 +352,8 @@
 r = iixr.TermReader(f)
 f2 = open("testI", "rb")
 r2 = iixr.TermIndexReader(f2)
-f3 = open("testP", "rb")
-r3 = iixr.PositionReader(f3)
-f4 = open("testPI", "rb")
-r4 = iixr.PositionIndexReader(f4)
+r3 = iixr.PositionOpener("testP")
+r4 = iixr.PositionIndexOpener("testPI")
 rp = iixr.PositionDictionaryReader(r3, r4)
 rd = iixr.TermDictionaryReader(r, r2, rp)
 terms_reversed = terms_with_positions[:]
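As a quick illustration of the merging behaviour these changes rely on, the
following is a minimal usage sketch (not part of the changeset) of the new
itermerge class; the sample sequences are invented for demonstration, and the
class is assumed to be importable from the itermerge module introduced above.
Given several individually ordered sequences, it yields a single ordered
stream in the style of heapq.merge from Python 2.6:

    #!/usr/bin/env python
    # Minimal sketch: k-way merge of ordered sequences with itermerge.
    # itermerge keeps a sorted list of (current value, fetch method) pairs,
    # always yields the smallest current value, and then replenishes the
    # list from the iterator that value came from.

    from itermerge import itermerge

    sequences = [[1, 4, 9], [2, 3, 10], [5, 6, 7, 8]]
    print list(itermerge(sequences))    # prints [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

The dictionary mergers above use the same idea: TermDictionaryMerger iterates
over itermerge(self.readers), so terms arrive in sorted order and equal terms
from different partitions can be grouped and written out together.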