# HG changeset patch # User Paul Boddie # Date 1297297153 -3600 # Node ID 218fd6522891327ee0e6121b560c1f9c659d6ef9 # Parent fc0e9882717b26d5f9e39fe6def1c055dcbe7bfb Introduced size declarations for sequences employed by readers and writers. Added a begin method which is used to acquire sequence size information in readers and to set such information in writers. Changed readers to call the begin method upon initialisation and for readers and writers to not call reset upon initialisation, although this could potentially change for readers in future. Changed the itermerge iterator to permit access to the first element in an iteration sequence in order to support data sampling for sequence size information. Added a sizeof function to determine sequence/value lengths. Added some potentially useful sequence conversion functions. diff -r fc0e9882717b -r 218fd6522891 iixr/data.py --- a/iixr/data.py Tue Feb 08 00:08:27 2011 +0100 +++ b/iixr/data.py Thu Feb 10 01:19:13 2011 +0100 @@ -35,14 +35,23 @@ last = current i += 1 +def op_seq_monotonic(x, y, op): + return tuple([op(a, b) for a, b in zip(x, y)]) + def add_seq_monotonic(x, y): return op_seq_monotonic(x, y, operator.add) def sub_seq_monotonic(x, y): return op_seq_monotonic(x, y, operator.sub) -def op_seq_monotonic(x, y, op): - return tuple([op(a, b) for a, b in zip(x, y)]) +def op_first_monotonic(x, y, op): + return (op(x[0], y[0]),) + tuple(zip(x[1:], y[1:])) + +def add_first_monotonic(x, y): + return op_first_monotonic(x, y, operator.add) + +def sub_first_monotonic(x, y): + return op_first_monotonic(x, y, operator.sub) def add_seq(x, y): length = min(len(x), len(y)) @@ -72,6 +81,9 @@ def is_sequence(value): return isinstance(value, (list, tuple)) +def sizeof(value): + return is_sequence(value) and len(value) or 0 + def get_monotonic_adder(value): return is_sequence(value) and add_seq_monotonic or operator.add @@ -176,26 +188,28 @@ # Sequence serialisation. -def sequence_to_array(value, bytes): +def sequence_to_array(value, size, bytes): - "Write the given sequence 'value' to 'bytes'." + "Write the given sequence 'value' with the given 'size' to 'bytes'." - size = is_sequence(value) and len(value) or 0 - vint_to_array(size, bytes) if size: - for a in value: - vint_to_array(a, bytes) + i = 0 + limit = min(len(value), size) + while i < limit: + vint_to_array(value[i], bytes) + i += 1 + while i < size: + vint_to_array(0, bytes) else: vint_to_array(value, bytes) -def sequence_from_array(bytes, start=0): +def sequence_from_array(bytes, size, start=0): """ - Read a sequence from 'bytes', returning the sequence and the first position - after the sequence. + Read a sequence from 'bytes' having the given 'size', returning the sequence + and the first position after the sequence. """ - size, start = vint_from_array_start(bytes, start) if size: j = 0 value = [] diff -r fc0e9882717b -r 218fd6522891 iixr/fields.py --- a/iixr/fields.py Tue Feb 08 00:08:27 2011 +0100 +++ b/iixr/fields.py Thu Feb 10 01:19:13 2011 +0100 @@ -28,6 +28,12 @@ "Writing field data to files." + def begin(self, docnum_size): + self.write_number(docnum_size) + self.end_record() + self.docnum_size = docnum_size + self.data_start = self.tell() + def reset(self): self.end_record() self.last_docnum = None @@ -50,7 +56,7 @@ # Write the document number. - self.write_sequence_value(docnum_seq) + self.write_sequence_value(docnum_seq, self.docnum_size) # Write the number of fields. @@ -68,6 +74,14 @@ "Reading field data from files." + def begin(self): + self.begin_record() + try: + self.docnum_size = self.read_number() + except EOFError: + self.docnum_size = 0 # NOTE: No fields! + self.data_start = self.tell() + def reset(self): self.last_docnum = None self.adder = None @@ -82,7 +96,7 @@ # Read the document number. - docnum = self.read_sequence_value() + docnum = self.read_sequence_value(self.docnum_size) if self.last_docnum is not None: self.last_docnum = self.adder(docnum, self.last_docnum) @@ -120,14 +134,12 @@ self.last_docnum = docnum return docnum, fields -class FieldIndexWriter(FileWriter): +class FieldIndexWriter(FieldWriter): "Writing field index details to files." def reset(self): - self.end_record() - self.last_docnum = None - self.subtractor = None + FieldWriter.reset(self) self.last_offset = 0 def write_document(self, docnum, offset): @@ -147,7 +159,7 @@ # Write the document number. - self.write_sequence_value(docnum_seq) + self.write_sequence_value(docnum_seq, self.docnum_size) # Write the offset delta. @@ -156,15 +168,13 @@ self.last_docnum = docnum self.last_offset = offset -class FieldIndexReader(FileReader): +class FieldIndexReader(FieldReader): "Reading field index details from files." def reset(self): - self.last_docnum = None - self.adder = None + FieldReader.reset(self) self.last_offset = 0 - self.begin_record() def read_document(self): @@ -172,7 +182,7 @@ # Read the document number. - docnum = self.read_sequence_value() + docnum = self.read_sequence_value(self.docnum_size) if self.last_docnum is not None: self.last_docnum = self.adder(docnum, self.last_docnum) @@ -198,7 +208,13 @@ def write_fields(self, docnum, fields): - "Write details of the document with the given 'docnum' and 'fields'." + "Write details of the given 'docnum' and 'fields'." + + if self.entry == 0: + docnum_size = sizeof(docnum) + self.field_writer.begin(docnum_size) + self.field_index_writer.begin(docnum_size) + self.field_index_writer.reset() if self.entry % self.interval == 0: self.field_writer.reset() @@ -221,9 +237,13 @@ def __init__(self, field_reader, field_index_reader): self.field_reader = field_reader self.field_index_reader = field_index_reader - self.entry = 0 + + self.field_reader.reset() + self.field_index_reader.reset() self.cache = {} + + self.entry = 0 self.docs = [] try: while 1: diff -r fc0e9882717b -r 218fd6522891 iixr/files.py --- a/iixr/files.py Tue Feb 08 00:08:27 2011 +0100 +++ b/iixr/files.py Thu Feb 10 01:19:13 2011 +0100 @@ -32,7 +32,7 @@ self.f = f self.data = array('B') # master buffer self.record = array('B') # record buffer - self.reset() + self.data_start = 0 def reset(self): @@ -45,7 +45,7 @@ self.reset() def rewind(self): - self.f.seek(0) + self.f.seek(self.data_start) self.reset() def close(self): @@ -57,17 +57,24 @@ "Writing basic data types to files." + def __init__(self, f): + File.__init__(self, f) + self.written = 0 + def tell(self): # NOTE: Will not be accurate within the current record. - return self.f.tell() + len(self.data) + return self.written def begin_record(self): pass def end_record(self): if self.record: - vint_to_array(len(self.record), self.data) - self.data += self.record + length = len(self.record) + size = vint(length) + self.f.write(size) + self.record.tofile(self.f) + self.written += len(size) + length self.record = array('B') def write_number(self, number): @@ -116,21 +123,21 @@ length = len(s) self.record.fromstring("".join([flag, vint(length), s])) - def write_sequence_value(self, value): - sequence_to_array(value, self.record) + def write_sequence_value(self, value, size): + sequence_to_array(value, size, self.record) - def write_sequence_values(self, values): + def write_sequence_values(self, values, size): vint_to_array(len(values), self.record) for value in values: - self.write_sequence_value(value) + self.write_sequence_value(value, size) - def write_delta_sequence(self, values): + def write_delta_sequence(self, values, size): convert_sequence(values, get_subtractor(values[0])) - self.write_sequence_values(values) + self.write_sequence_values(values, size) - def write_monotonic_sequence(self, values): + def write_monotonic_sequence(self, values, size): convert_sequence(values, get_monotonic_subtractor(values[0])) - self.write_sequence_values(values) + self.write_sequence_values(values, size) def flush(self): if self.f is not None: @@ -146,6 +153,20 @@ "Reading basic data types from files." + def __init__(self, f): + File.__init__(self, f) + self.begin() + + def tell(self): + # NOTE: Will not be accurate within the current record. + return self.f.tell() + + def begin(self): + + "Initialise file-wide parameters." + + pass + def begin_record(self): self.record = array('B') self.start = 0 @@ -185,6 +206,14 @@ n, self.start = vint_from_array_start(self.record, self.start) return n + def read_numbers(self, n): + l = [] + i = 0 + while i < n: + l.append(self.read_number()) + i += 1 + return l + def read_string(self, decompress=0): """ @@ -214,26 +243,26 @@ return unicode(s, "utf-8") - def read_sequence_value(self): - value, self.start = sequence_from_array(self.record, self.start) + def read_sequence_value(self, size): + value, self.start = sequence_from_array(self.record, size, self.start) return value - def read_sequences(self): + def read_sequences(self, size): values = [] length = self.read_number() i = 0 while i < length: - values.append(self.read_sequence_value()) + values.append(self.read_sequence_value(size)) i += 1 return values - def read_delta_sequence(self): - values = self.read_sequences() + def read_delta_sequence(self, size): + values = self.read_sequences(size) convert_sequence(values, get_adder(values[0])) return values - def read_monotonic_sequence(self): - values = self.read_sequences() + def read_monotonic_sequence(self, size): + values = self.read_sequences(size) convert_sequence(values, get_monotonic_adder(values[0])) return values diff -r fc0e9882717b -r 218fd6522891 iixr/index.py --- a/iixr/index.py Tue Feb 08 00:08:27 2011 +0100 +++ b/iixr/index.py Thu Feb 10 01:19:13 2011 +0100 @@ -171,10 +171,8 @@ self.docs.sort() field_dict_writer = self.get_field_writer() - for docnum, fields in self.docs: field_dict_writer.write_fields(docnum, fields) - field_dict_writer.close() self.docs = [] diff -r fc0e9882717b -r 218fd6522891 iixr/positions.py --- a/iixr/positions.py Tue Feb 08 00:08:27 2011 +0100 +++ b/iixr/positions.py Thu Feb 10 01:19:13 2011 +0100 @@ -25,6 +25,13 @@ "Writing position information to files." + def begin(self, docnum_size, position_size): + self.write_numbers((docnum_size, position_size)) + self.end_record() + self.data_start = self.tell() + self.docnum_size = docnum_size + self.position_size = position_size + def reset(self): self.end_record() self.last_docnum = None @@ -57,8 +64,8 @@ self.subtractor = get_subtractor(docnum) docnum_seq = docnum - self.write_sequence_value(docnum_seq) - self.write_monotonic_sequence(positions) + self.write_sequence_value(docnum_seq, self.docnum_size) + self.write_monotonic_sequence(positions, self.position_size) self.last_docnum = docnum @@ -66,6 +73,14 @@ "Reading position information within term-specific regions of a file." + def begin(self): + self.begin_record() + try: + self.docnum_size, self.position_size = self.read_numbers(2) + except EOFError: + self.docnum_size, self.position_size = 0, 0 # NOTE: No positions! + self.data_start = self.tell() + def reset(self): self.last_docnum = None self.adder = None @@ -79,7 +94,7 @@ # Read the document number. - docnum = self.read_sequence_value() + docnum = self.read_sequence_value(self.docnum_size) # Calculate an ongoing delta. @@ -92,18 +107,19 @@ self.adder = get_adder(docnum) self.last_docnum = docnum - positions = self.read_monotonic_sequence() + positions = self.read_monotonic_sequence(self.position_size) return self.last_docnum, positions -class PositionIndexWriter(FileWriter): +class PositionIndexWriter(PositionWriter): "Writing position index information to files." + def begin(self, docnum_size): + PositionWriter.begin(self, docnum_size, 0) + def reset(self): - self.end_record() - self.last_docnum = None - self.subtractor = None + PositionWriter.reset(self) self.last_pos_offset = 0 def write_positions(self, docnum, pos_offset, count): @@ -121,22 +137,20 @@ self.subtractor = get_subtractor(docnum) docnum_seq = docnum - self.write_sequence_value(docnum_seq) + self.write_sequence_value(docnum_seq, self.docnum_size) self.write_number(pos_offset - self.last_pos_offset) self.write_number(count) self.last_docnum = docnum self.last_pos_offset = pos_offset -class PositionIndexReader(FileReader): +class PositionIndexReader(PositionReader): "Reading position index information within term-specific regions of a file." def reset(self): - self.last_docnum = None - self.adder = None + PositionReader.reset(self) self.last_pos_offset = 0 - self.begin_record() def read_positions(self): @@ -147,7 +161,7 @@ # Read the document number. - docnum = self.read_sequence_value() + docnum = self.read_sequence_value(self.docnum_size) if self.last_docnum is not None: self.last_docnum = self.adder(docnum, self.last_docnum) @@ -295,28 +309,38 @@ the term involved. """ - # Reset the writers. - - self.position_writer.reset() - self.position_index_writer.reset() - - # Remember the first index entry offset. - - index_offset = self.position_index_writer.tell() - # Write the positions. frequency = 0 count = 0 if doc_positions: + doc_positions.sort() + + # Look ahead at the first document record. + # NOTE: Any iterator would need to support this. + + first_docnum, first_positions = doc_positions[0] + first_position = first_positions[0] + + # Write out size details. + + docnum_size, position_size = sizeof(first_docnum), sizeof(first_position) + self.position_writer.begin(docnum_size, position_size) + self.position_index_writer.begin(docnum_size) + + # Reset the writers. + + self.position_writer.reset() + self.position_index_writer.reset() + + # Remember the first index entry offset. + + index_offset = self.position_index_writer.tell() # Retain the first record offset for a subsequent index entry. first_offset = self.position_writer.tell() - first_docnum = None - - doc_positions.sort() for docnum, positions in doc_positions: if first_docnum is None: diff -r fc0e9882717b -r 218fd6522891 iixr/terms.py --- a/iixr/terms.py Tue Feb 08 00:08:27 2011 +0100 +++ b/iixr/terms.py Thu Feb 10 01:19:13 2011 +0100 @@ -172,6 +172,8 @@ self.interval = interval self.entry = 0 + self.index_writer.reset() + def _write_term(self, term, offset, frequency, doc_frequency): """ @@ -216,8 +218,11 @@ self.info_reader = info_reader self.index_reader = index_reader self.position_dict_reader = position_dict_reader + + self.info_reader.reset() + self.index_reader.reset() + self.entry = 0 - self.terms = [] try: while 1: diff -r fc0e9882717b -r 218fd6522891 itermerge.py --- a/itermerge.py Tue Feb 08 00:08:27 2011 +0100 +++ b/itermerge.py Thu Feb 10 01:19:13 2011 +0100 @@ -29,6 +29,7 @@ def __init__(self, sequences): self.iters = [] + self.first = None # Prepare the underlying iterators. @@ -43,6 +44,15 @@ next = iterator.next self._add_next(next) + def __getitem__(self, i): + if i == 0: + if self.first is None: + value, next = self.iters[0] + self.first = value + return self.first + else: + raise IndexError, "Index %d cannot be accessed in this iterator." % i + def sort(self): pass # The output should be sorted. diff -r fc0e9882717b -r 218fd6522891 test.py --- a/test.py Tue Feb 08 00:08:27 2011 +0100 +++ b/test.py Thu Feb 10 01:19:13 2011 +0100 @@ -52,14 +52,14 @@ f = open("testMS", "wb") w = FileWriter(f) w.begin_record() -w.write_monotonic_sequence(tuples) +w.write_monotonic_sequence(tuples, 2) w.end_record() w.close() f = open("testMS", "rb") r = FileReader(f) r.begin_record() -for t, t2 in zip(r.read_monotonic_sequence(), tuples): +for t, t2 in zip(r.read_monotonic_sequence(2), tuples): print t == t2, t, t2 r.end_record() r.close() @@ -69,14 +69,14 @@ f = open("testNMS", "wb") w = FileWriter(f) w.begin_record() -w.write_delta_sequence(tuples2) +w.write_delta_sequence(tuples2, 2) w.end_record() w.close() f = open("testNMS", "rb") r = FileReader(f) r.begin_record() -for t, t2 in zip(r.read_delta_sequence(), tuples2): +for t, t2 in zip(r.read_delta_sequence(2), tuples2): print t == t2, t, t2 r.end_record() r.close() @@ -99,20 +99,21 @@ f = open("testP", "wb") w = PositionWriter(f) +w.begin(0, 0) for doc_positions in all_doc_positions: + w.reset() for docnum, positions in doc_positions: w.write_positions(docnum, positions) - w.reset() w.close() f = open("testP", "rb") r = PositionReader(f) for doc_positions in all_doc_positions: + r.reset() for docnum, positions in doc_positions: d, p = r.read_positions() print docnum == d, docnum, d print positions == p, positions, p - r.reset() r.close() all_doc_positions_seq = [ @@ -131,20 +132,21 @@ f = open("testP2", "wb") w = PositionWriter(f) +w.begin(2, 2) for doc_positions in all_doc_positions_seq: + w.reset() for docnum, positions in doc_positions: w.write_positions(docnum, positions) - w.reset() w.close() f = open("testP2", "rb") r = PositionReader(f) for doc_positions in all_doc_positions_seq: + r.reset() for docnum, positions in doc_positions: d, p = r.read_positions() print docnum == d, docnum, d print positions == p, positions, p - r.reset() r.close() print "- Test position index files." @@ -163,6 +165,7 @@ offsets = [] f = open("testPI", "wb") w = PositionIndexWriter(f) +w.begin(0) for term_positions in indexed_positions: offset = None doc_frequency = 0 @@ -224,12 +227,15 @@ f = open("testF", "wb") w = FieldWriter(f) +w.begin(0) +w.reset() for docnum, fields in doc_fields: w.write_fields(docnum, list(enumerate(fields))) w.close() f = open("testF", "rb") r = FieldReader(f) +r.reset() for docnum, fields in doc_fields: dn, df = r.read_fields() print docnum == dn, docnum, dn @@ -246,12 +252,15 @@ f = open("testFI", "wb") w = FieldIndexWriter(f) +w.begin(0) +w.reset() for docnum, offset in indexed_docs: w.write_document(docnum, offset) w.close() f = open("testFI", "rb") r = FieldIndexReader(f) +r.reset() for docnum, offset in indexed_docs: dn, o = r.read_document() print docnum == dn, docnum, dn @@ -306,12 +315,14 @@ f = open("test", "wb") w = TermWriter(f) +w.reset() for term, offset, frequency, doc_frequency in terms: w.write_term(term, offset, frequency, doc_frequency) w.close() f = open("test", "rb") r = TermReader(f) +r.reset() for term, offset, frequency, doc_frequency in terms: t, o, fr, df = r.read_term() print term == t, term, t @@ -334,12 +345,14 @@ f = open("test", "wb") w = TermIndexWriter(f) +w.reset() for term, offset, frequency, doc_frequency, info_offset in indexed_terms: w.write_term(term, offset, frequency, doc_frequency, info_offset) w.close() f = open("test", "rb") r = TermIndexReader(f) +r.reset() for term, offset, frequency, doc_frequency, info_offset in indexed_terms: t, o, fr, df, i = r.read_term() print term == t, term, t