1 #!/usr/bin/env python 2 3 """ 4 A simple (and sane) text indexing library. 5 6 Copyright (C) 2009 Paul Boddie <paul@boddie.org.uk> 7 8 This program is free software; you can redistribute it and/or modify it under 9 the terms of the GNU General Public License as published by the Free Software 10 Foundation; either version 3 of the License, or (at your option) any later 11 version. 12 13 This program is distributed in the hope that it will be useful, but WITHOUT ANY 14 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 15 PARTICULAR PURPOSE. See the GNU General Public License for more details. 16 17 You should have received a copy of the GNU General Public License along 18 with this program. If not, see <http://www.gnu.org/licenses/>. 19 """ 20 21 from os import dup, fdopen # independent iterator access to files 22 from os import listdir, mkdir # index and partition discovery 23 from os import remove, rename # partition manipulation 24 from os.path import exists, join 25 from os.path import commonprefix # to find common string prefixes 26 from bisect import bisect_right # to find terms in the dictionary index 27 from bisect import insort_right # to maintain a sorted list of data for merging 28 import bz2, zlib # for field compression 29 30 try: 31 set 32 except NameError: 33 from sets import Set as set 34 35 # Constants. 36 37 TERM_INTERVAL = 100 38 DOCUMENT_INTERVAL = 100 39 FIELD_INTERVAL = 100 40 FLUSH_INTERVAL = 1000000 41 42 TERM_FILENAMES = "terms", "terms_index", "positions", "positions_index" 43 FIELD_FILENAMES = "fields", "fields_index" 44 45 compressors = [("b", bz2.compress), ("z", zlib.compress)] 46 decompressors = {"b" : bz2.decompress, "z" : zlib.decompress} 47 48 # Foundation classes. 49 50 class File: 51 52 "A basic file abstraction." 53 54 def __init__(self, f): 55 self.f = f 56 self.reset() 57 58 def reset(self): 59 60 "To be used to reset the state of the reader or writer between records." 
        pass

    def rewind(self):

        # Seek back to the start of the file and clear per-record state.

        self.f.seek(0)
        self.reset()

    def close(self):

        # Idempotent: a second close() is a no-op because self.f becomes None.

        if self.f is not None:
            self.f.close()
            self.f = None

class FileWriter(File):

    "Writing basic data types to files."

    def write_number(self, number):

        "Write 'number' to the file using a variable length encoding."

        # Negative numbers are not supported.

        if number < 0:
            raise ValueError, "Number %r is negative." % number

        # Special case: one byte containing zero.

        elif number == 0:
            self.f.write(chr(0))
            return

        # Write the number from least to most significant digits.
        # Each byte holds 7 payload bits; the high bit marks "more bytes
        # follow".

        bytes = []

        while number != 0:
            lsd = number & 127
            number = number >> 7
            if number != 0:
                lsd |= 128
            bytes.append(chr(lsd))

        record = "".join(bytes)
        self.f.write(record)

    def write_string(self, s, compress=0):

        """
        Write 's' to the file, recording its length and compressing the string
        if 'compress' is set to a true value.
        """

        # Convert Unicode objects to strings.

        if isinstance(s, unicode):
            s = s.encode("utf-8")

        # Compress the string if requested.

        if compress:
            for flag, fn in compressors:
                cs = fn(s)

                # Take the first string shorter than the original.

                if len(cs) < len(s):
                    s = cs
                    break

        # NOTE(review): if compression is requested but no compressor actually
        # shortens 's', 'flag' retains the last compressor's flag while 's'
        # stays uncompressed, so a reader would attempt a bogus decompression.
        # The original indentation of this 'else' (for-else vs. if-else) was
        # lost in extraction — verify against the upstream source.

        else:
            flag = "-"

        # Record whether compression was used.

        self.f.write(flag)

        # Write the length of the data before the data itself.

        length = len(s)
        self.write_number(length)
        self.f.write(s)

class FileReader(File):

    "Reading basic data types from files."

    def read_number(self):

        "Read a number from the file."

        # Read each byte, adding it to the number.
        # Decode the variable-length encoding produced by
        # FileWriter.write_number: 7 payload bits per byte, least significant
        # first, high bit set on all but the final byte.

        shift = 0
        number = 0
        more = 1

        while more:
            byte = self.f.read(1)
            if not byte:
                raise EOFError

            csd = ord(byte)
            more = csd & 128 != 0
            if more:
                csd &= 127
            number += (csd << shift)
            shift += 7

        return number

    def read_string(self, decompress=0):

        """
        Read a string from the file, decompressing the stored data if
        'decompress' is set to a true value.
        """

        # Decompress the data if requested.

        if decompress:
            flag = self.f.read(1)
        else:
            flag = "-"

        length = self.read_number()
        s = self.f.read(length)

        # Perform decompression if applicable.

        if flag != "-":
            fn = decompressors[flag]
            s = fn(s)

        # Convert strings to Unicode objects.

        return unicode(s, "utf-8")

# Specific classes for storing term and position information.

class PositionWriter(FileWriter):

    "Writing position information to files."

    def reset(self):

        # Document numbers are stored as deltas from the previous record.

        self.last_docnum = 0

    def write_sorted_positions(self, docnum, positions):

        """
        Write for the document 'docnum' the given 'positions'.
        Return the offset of the written record.
        """

        # Delta encoding requires non-decreasing document numbers.

        if docnum < self.last_docnum:
            raise ValueError, "Document number %r is less than previous number %r." % (docnum, self.last_docnum)

        # Record the offset of this record.

        offset = self.f.tell()

        # Write the document number delta.

        self.write_number(docnum - self.last_docnum)

        # Write the number of positions.

        self.write_number(len(positions))

        # Write the position deltas.

        last = 0
        for position in positions:
            pos = position - last
            self.write_number(pos)
            last = position

        self.last_docnum = docnum

        return offset

    def write_positions(self, docnum, positions):

        """
        Write for the document 'docnum' the given 'positions'.
        Return the offset of the written record.
        """

        # Make sure that the positions are sorted.
        # NOTE: this sorts the caller's list in place.

        positions.sort()
        return self.write_sorted_positions(docnum, positions)

class PositionReader(FileReader):

    "Reading position information from files."

    def reset(self):
        self.last_docnum = 0

    def read_positions(self):

        "Read positions, returning a document number and a list of positions."

        # Read the document number delta and add it to the last number.

        self.last_docnum += self.read_number()

        # Read the number of positions.

        npositions = self.read_number()

        # Read the position deltas, adding each previous position to get the
        # appropriate collection of absolute positions.

        i = 0
        last = 0
        positions = []

        while i < npositions:
            last += self.read_number()
            positions.append(last)
            i += 1

        return self.last_docnum, positions

    def read_term_positions(self, offset, count):

        """
        Read all positions from 'offset', seeking to that position in the file
        before reading. The number of documents available for reading is limited
        to 'count'.
        """

        # Duplicate the file handle so the returned iterator reads
        # independently of this reader's own file position.

        f = fdopen(dup(self.f.fileno()), "rb")
        f.seek(offset)
        return PositionIterator(f, count)

class PositionIndexWriter(FileWriter):

    "Writing position index information to files."

    def reset(self):
        self.last_docnum = 0
        self.last_pos_offset = 0

    def write_positions(self, docnum, pos_offset, count):

        """
        Write the given 'docnum, 'pos_offset' and document 'count' to the
        position index file.
        """

        # Record the offset of this record.

        offset = self.f.tell()

        # Write the document number delta.

        self.write_number(docnum - self.last_docnum)
        self.last_docnum = docnum

        # Write the position file offset delta.
        self.write_number(pos_offset - self.last_pos_offset)
        self.last_pos_offset = pos_offset

        # Write the document count.

        self.write_number(count)

        return offset

class PositionIndexReader(FileReader):

    "Reading position index information from files."

    def reset(self):
        self.last_docnum = 0
        self.last_pos_offset = 0

    def read_positions(self):

        """
        Read a document number, a position file offset for the position index
        file, and the number of documents in a section of that file.
        """

        # Read the document number delta.

        self.last_docnum += self.read_number()

        # Read the offset delta.

        self.last_pos_offset += self.read_number()

        # Read the document count.

        count = self.read_number()

        return self.last_docnum, self.last_pos_offset, count

    def read_term_positions(self, offset, doc_frequency):

        """
        Read all positions from 'offset', seeking to that position in the file
        before reading. The number of documents available for reading is limited
        to 'doc_frequency'.
        """

        # Duplicate the file handle so the returned iterator reads
        # independently of this reader's own file position.

        f = fdopen(dup(self.f.fileno()), "rb")
        f.seek(offset)
        return PositionIndexIterator(f, doc_frequency)

# Iterators for position-related files.

class IteratorBase:

    """
    Common counting state for the position-related iterators: 'count' records
    how many documents may be read, 'read_documents' how many have been.
    """

    def __init__(self, count):
        self.replenish(count)

    def replenish(self, count):

        # Reset the limit and the progress counter.

        self.count = count
        self.read_documents = 0

    def __len__(self):
        return self.count

    def sort(self):
        pass # Stored document positions are already sorted.

    def __iter__(self):
        return self

class PositionIterator(PositionReader, IteratorBase):

    "Iterating over document positions."

    def __init__(self, f, count):
        PositionReader.__init__(self, f)
        IteratorBase.__init__(self, count)

    def next(self):

        "Read positions for a single document."
        if self.read_documents < self.count:
            self.read_documents += 1
            return self.read_positions()
        else:
            raise StopIteration

class PositionIndexIterator(PositionIndexReader, IteratorBase):

    "Iterating over document positions."

    def __init__(self, f, count):
        PositionIndexReader.__init__(self, f)
        IteratorBase.__init__(self, count)

        # Number of documents covered by the most recent index entry.

        self.section_count = 0

    def next(self):

        """
        Read the next index entry, returning a (document number, position file
        offset, section document count) tuple.
        """

        # Advance past the documents covered by the previous entry before
        # testing the limit.

        self.read_documents += self.section_count
        if self.read_documents < self.count:
            docnum, pos_offset, self.section_count = t = self.read_positions()
            return t
        else:
            raise StopIteration

class PositionDictionaryWriter:

    "Writing position dictionaries."

    def __init__(self, position_writer, position_index_writer, interval):

        # 'interval' is the number of documents per index section.

        self.position_writer = position_writer
        self.position_index_writer = position_index_writer
        self.interval = interval

    def write_sorted_term_positions(self, doc_positions):

        """
        Write all 'doc_positions' - a collection of tuples of the form (document
        number, position list) - to the file.

        Add some records to the index, making dictionary entries.

        Return a tuple containing the offset of the written data, the frequency
        (number of positions), and document frequency (number of documents) for
        the term involved.
        """

        # Reset the writers.

        self.position_writer.reset()
        self.position_index_writer.reset()

        index_offset = None

        # Write the positions.

        frequency = 0
        first_docnum = None
        first_offset = None
        count = 0

        for docnum, positions in doc_positions:
            pos_offset = self.position_writer.write_positions(docnum, positions)

            # Retain the first record offset for a subsequent index entry.
            if first_offset is None:
                first_offset = pos_offset
                first_docnum = docnum

            frequency += len(positions)
            count += 1

            # Every {interval} entries, write an index entry.

            if count == self.interval:
                io = self.position_index_writer.write_positions(first_docnum, first_offset, self.interval)

                # Remember the first index entry offset.

                if index_offset is None:
                    index_offset = io

                first_offset = None
                first_docnum = None
                count = 0

                # Reset the position writer so that position readers accessing
                # a section start with the correct document number.

                self.position_writer.reset()

        # Finish writing an index entry for the remaining documents.
        # (This is a for-else: it runs when the loop completes normally.)

        else:
            if first_offset is not None:
                io = self.position_index_writer.write_positions(first_docnum, first_offset, count)

                # Remember the first index entry offset.

                if index_offset is None:
                    index_offset = io

        return index_offset, frequency, len(doc_positions)

    def write_term_positions(self, doc_positions):

        """
        Write all 'doc_positions' - a collection of tuples of the form (document
        number, position list) - to the file.

        Add some records to the index, making dictionary entries.

        Return a tuple containing the offset of the written data, the frequency
        (number of positions), and document frequency (number of documents) for
        the term involved.
        """

        # NOTE: sorts the caller's collection in place.

        doc_positions.sort()
        return self.write_sorted_term_positions(doc_positions)

    def close(self):
        self.position_writer.close()
        self.position_index_writer.close()

class PositionDictionaryReader:

    "Reading position dictionaries."
    def __init__(self, position_reader, position_index_reader):
        self.position_reader = position_reader
        self.position_index_reader = position_index_reader

    def read_term_positions(self, offset, doc_frequency):

        """
        Return an iterator for dictionary entries starting at 'offset' with the
        given 'doc_frequency'.
        """

        return PositionDictionaryIterator(self.position_reader,
            self.position_index_reader, offset, doc_frequency)

    def close(self):
        self.position_reader.close()
        self.position_index_reader.close()

class PositionDictionaryIterator:

    "Iteration over position dictionary entries."

    def __init__(self, position_reader, position_index_reader, offset, doc_frequency):
        self.position_reader = position_reader
        self.doc_frequency = doc_frequency
        self.index_iterator = position_index_reader.read_term_positions(offset, doc_frequency)

        # Remember the last values.

        self.found_docnum, self.found_positions = None, None

        # Maintain state for the next index entry, if read.

        self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None

        # Initialise the current index entry and current position file iterator.

        self._next_section()
        self._init_section()

    def __len__(self):
        return self.doc_frequency

    def sort(self):
        pass

    def __iter__(self):
        return self

    def next(self):

        """
        Attempt to get the next document record from the section in the
        positions file.
        """

        # Return any visited but unrequested record.

        if self.found_docnum is not None:
            t = self.found_docnum, self.found_positions
            self.found_docnum, self.found_positions = None, None
            return t

        # Or search for the next record.

        while 1:

            # Either return the next record.
            try:
                return self.iterator.next()

            # Or, where a section is finished, get the next section and try again.

            except StopIteration:

                # Where a section follows, update the index iterator, but keep
                # reading using the same file iterator (since the data should
                # just follow on from the last section).
                # _next_section itself raises StopIteration when the index is
                # exhausted, which ends this iterator.

                self._next_section()
                self.iterator.replenish(self.section_count)

                # Reset the state of the iterator to make sure that document
                # numbers are correct.

                self.iterator.reset()

    def from_document(self, docnum):

        """
        Attempt to navigate to a positions entry for the given 'docnum',
        returning the positions for 'docnum', or None otherwise.
        """

        # Return any unrequested document positions.

        if docnum == self.found_docnum:
            return self.found_positions

        # Read ahead in the index until the next entry refers to a document
        # later than the desired document.

        try:
            if self.next_docnum is None:
                self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next()

            # Read until the next entry is after the desired document number,
            # or until the end of the results.

            while self.next_docnum <= docnum:
                self._next_read_section()
                if self.docnum < docnum:
                    self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next()
                else:
                    break

        except StopIteration:
            pass

        # Navigate in the position file to the document.

        self._init_section()

        try:
            while 1:
                found_docnum, found_positions = self.iterator.next()

                # Return the desired document positions or None (retaining the
                # positions for the document immediately after).
                if docnum == found_docnum:
                    return found_positions
                elif docnum < found_docnum:

                    # Overshot: remember this record so that a subsequent
                    # next() or from_document() call can return it.

                    self.found_docnum, self.found_positions = found_docnum, found_positions
                    return None

        except StopIteration:
            return None

    # Internal methods.

    def _next_section(self):

        "Attempt to get the next section in the index."

        if self.next_docnum is None:
            self.docnum, self.pos_offset, self.section_count = self.index_iterator.next()
        else:
            self._next_read_section()

    def _next_read_section(self):

        """
        Make the next index entry the current one without reading from the
        index.
        """

        self.docnum, self.pos_offset, self.section_count = self.next_docnum, self.next_pos_offset, self.next_section_count
        self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None

    def _init_section(self):

        "Initialise the iterator for the section in the position file."

        self.iterator = self.position_reader.read_term_positions(self.pos_offset, self.section_count)

class TermWriter(FileWriter):

    "Writing term information to files."

    def reset(self):

        # Terms are front-coded against the previous term; offsets are deltas.

        self.last_term = ""
        self.last_offset = 0

    def write_term(self, term, offset, frequency, doc_frequency):

        """
        Write the given 'term', its position file 'offset', its 'frequency' and
        its 'doc_frequency' (number of documents in which it appears) to the
        term information file. Return the offset after the term information was
        written to the file.
        """

        # Write the prefix length and term suffix (front coding).

        common = len(commonprefix([self.last_term, term]))
        suffix = term[common:]

        self.write_number(common)
        self.write_string(suffix)

        # Write the offset delta.

        self.write_number(offset - self.last_offset)

        # Write the frequency.

        self.write_number(frequency)

        # Write the document frequency.
        self.write_number(doc_frequency)

        self.last_term = term
        self.last_offset = offset

        return self.f.tell()

class TermReader(FileReader):

    "Reading term information from files."

    def reset(self):
        self.last_term = ""
        self.last_offset = 0

    def read_term(self):

        """
        Read a term, its position file offset, its frequency and its document
        frequency from the term information file.
        """

        # Read the prefix length and term suffix, reversing the front coding
        # applied by TermWriter.

        common = self.read_number()
        suffix = self.read_string()

        self.last_term = self.last_term[:common] + suffix

        # Read the offset delta.

        self.last_offset += self.read_number()

        # Read the frequency.

        frequency = self.read_number()

        # Read the document frequency.

        doc_frequency = self.read_number()

        return self.last_term, self.last_offset, frequency, doc_frequency

    def go_to_term(self, term, offset, info_offset):

        """
        Seek past the entry for 'term' having 'offset' to 'info_offset'. This
        permits the scanning for later terms from the specified term.
        """

        # Prime the delta/front-coding state as if 'term' had just been read.

        self.f.seek(info_offset)
        self.last_term = term
        self.last_offset = offset

class TermIndexWriter(TermWriter):

    "Writing term dictionary index details to files."

    def reset(self):
        TermWriter.reset(self)
        self.last_info_offset = 0

    def write_term(self, term, offset, frequency, doc_frequency, info_offset):

        """
        Write the given 'term', its position file 'offset', its 'frequency' and
        its 'doc_frequency' to the term dictionary index file, along with the
        'info_offset' in the term information file.
        """

        TermWriter.write_term(self, term, offset, frequency, doc_frequency)

        # Write the information file offset delta.
        self.write_number(info_offset - self.last_info_offset)
        self.last_info_offset = info_offset

class TermIndexReader(TermReader):

    "Reading term dictionary index details from files."

    def reset(self):
        TermReader.reset(self)
        self.last_info_offset = 0

    def read_term(self):

        """
        Read a term, its position file offset, its frequency, its document
        frequency and a term information file offset from the term dictionary
        index file.
        """

        term, offset, frequency, doc_frequency = TermReader.read_term(self)

        # Read the offset delta.

        self.last_info_offset += self.read_number()

        return term, offset, frequency, doc_frequency, self.last_info_offset

class TermDictionaryWriter:

    "Writing term dictionaries."

    def __init__(self, info_writer, index_writer, position_dict_writer, interval):

        # 'interval' controls how often terms are copied into the index.

        self.info_writer = info_writer
        self.index_writer = index_writer
        self.position_dict_writer = position_dict_writer
        self.interval = interval
        self.entry = 0

    def _write_term(self, term, offset, frequency, doc_frequency):

        """
        Write the given 'term', its position file 'offset', its 'frequency' and
        its 'doc_frequency' (number of documents in which it appears) to the
        term information file. Return the offset after the term information was
        written to the file.
        """

        info_offset = self.info_writer.write_term(term, offset, frequency, doc_frequency)

        # Every 'interval' entries, duplicate the term in the index.

        if self.entry % self.interval == 0:
            self.index_writer.write_term(term, offset, frequency, doc_frequency, info_offset)

        self.entry += 1

    def write_term_positions(self, term, doc_positions):

        """
        Write the given 'term' and the 'doc_positions' recording the documents
        and positions at which the term is found.
        """

        offset, frequency, doc_frequency = self.position_dict_writer.write_term_positions(doc_positions)
        self._write_term(term, offset, frequency, doc_frequency)

    def close(self):
        self.info_writer.close()
        self.index_writer.close()
        self.position_dict_writer.close()

class TermDictionaryReader:

    "Reading term dictionaries."

    def __init__(self, info_reader, index_reader, position_dict_reader):
        self.info_reader = info_reader
        self.index_reader = index_reader
        self.position_dict_reader = position_dict_reader

        # Load the entire term index into memory for bisection.

        self.terms = []
        try:
            while 1:
                self.terms.append(self.index_reader.read_term())
        except EOFError:
            pass

        # Large numbers for ordering purposes.
        # NOTE(review): self.terms[-1] raises IndexError when the index file is
        # empty — an empty dictionary is apparently not expected here.

        self.max_offset = self.terms[-1][1] + 1

    def _find_closest_entry(self, term):

        """
        Find the offsets and frequencies of 'term' from the term dictionary or
        the closest term starting with the value of 'term'.

        Return the closest index entry consisting of a term, the position file
        offset, the term frequency, the document frequency, and the term details
        file offset.
        """

        i = bisect_right(self.terms, (term, self.max_offset, 0, 0)) - 1

        # Get the entry position providing the term or one preceding it.
        # If no entry precedes the requested term, return the very first entry
        # as the closest.

        if i == -1:
            return self.terms[0]
        else:
            return self.terms[i]

    def _find_closest_term(self, term):

        """
        Find the offsets and frequencies of 'term' from the term dictionary or
        the closest term starting with the value of 'term'.

        Return the closest term (or the term itself), the position file offset,
        the term frequency, the document frequency, and the term details file
        offset (or None if the reader is already positioned).
        """

        found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_entry(term)

        # Where the term is found immediately, return the offset and
        # frequencies. If the term does not appear, return the details of the
        # closest entry.

        if term <= found_term:
            return found_term, offset, frequency, doc_frequency, info_offset

        # Otherwise, seek past the index term's entry in the information file
        # and scan for the desired term.

        else:
            self.info_reader.go_to_term(found_term, offset, info_offset)
            try:
                while term > found_term:
                    found_term, offset, frequency, doc_frequency = self.info_reader.read_term()
            except EOFError:
                pass

            return found_term, offset, frequency, doc_frequency, None

    def _find_term(self, term):

        """
        Find the position file offset and frequency of 'term' from the term
        dictionary.
        """

        found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_term(term)

        # If the term is found, return the offset and frequencies.

        if term == found_term:
            return offset, frequency, doc_frequency
        else:
            return None

    def _get_positions(self, offset, doc_frequency):
        return self.position_dict_reader.read_term_positions(offset, doc_frequency)

    # Sequential access methods.

    def rewind(self):
        self.info_reader.rewind()

    def read_term(self):

        """
        Return the next term, its frequency, its document frequency, and the
        documents and positions at which the term is found.
        """

        term, offset, frequency, doc_frequency = self.info_reader.read_term()
        positions = self._get_positions(offset, doc_frequency)
        return term, frequency, doc_frequency, positions

    # Query methods.

    def find_terms(self, term):

        "Return all terms whose values start with the value of 'term'."
        terms = []

        found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_term(term)

        # Position the reader, if necessary.

        if info_offset is not None:
            self.info_reader.go_to_term(found_term, offset, info_offset)

        # Read and record terms.

        try:
            # Add the found term if it starts with the specified term.

            while found_term.startswith(term):
                terms.append(found_term)
                found_term, offset, frequency, doc_frequency = self.info_reader.read_term()

        except EOFError:
            pass

        return terms

    def find_positions(self, term):

        "Return the documents and positions at which the given 'term' is found."

        t = self._find_term(term)
        if t is None:
            return None
        else:
            offset, frequency, doc_frequency = t
            return self._get_positions(offset, doc_frequency)

    def get_frequency(self, term):

        "Return the frequency of the given 'term'."

        t = self._find_term(term)
        if t is None:
            return None
        else:
            offset, frequency, doc_frequency = t
            return frequency

    def get_document_frequency(self, term):

        "Return the document frequency of the given 'term'."

        t = self._find_term(term)
        if t is None:
            return None
        else:
            offset, frequency, doc_frequency = t
            return doc_frequency

    def close(self):
        self.info_reader.close()
        self.index_reader.close()
        self.position_dict_reader.close()

# Specific classes for storing document information.

class FieldWriter(FileWriter):

    "Writing field data to files."

    def reset(self):
        self.last_docnum = 0

    def write_fields(self, docnum, fields):

        """
        Write for the given 'docnum', a list of 'fields' (integer, string pairs
        representing field identifiers and values respectively).
        Return the offset at which the fields are stored.
        """

        offset = self.f.tell()

        # Write the document number delta.

        self.write_number(docnum - self.last_docnum)

        # Write the number of fields.

        self.write_number(len(fields))

        # Write the fields themselves.

        for i, field in fields:
            self.write_number(i)
            self.write_string(field, 1) # compress

        self.last_docnum = docnum
        return offset

class FieldReader(FileReader):

    "Reading field data from files."

    def reset(self):
        self.last_docnum = 0

    def read_fields(self):

        """
        Read fields from the file, returning a tuple containing the document
        number and a list of field (identifier, value) pairs.
        """

        # Read the document number.

        self.last_docnum += self.read_number()

        # Read the number of fields.

        nfields = self.read_number()

        # Collect the fields.

        fields = []
        i = 0

        while i < nfields:
            identifier = self.read_number()
            value = self.read_string(1) # decompress
            fields.append((identifier, value))
            i += 1

        return self.last_docnum, fields

    def read_document_fields(self, docnum, offset):

        """
        Read fields for 'docnum' at the given 'offset'. This permits the
        retrieval of details for the specified document, as well as scanning for
        later documents.
        """

        self.f.seek(offset)

        # The document number decoded from the file is discarded: after an
        # arbitrary seek the delta-decoded value is meaningless, so the
        # caller-supplied 'docnum' is trusted instead.

        bad_docnum, fields = self.read_fields()
        self.last_docnum = docnum
        return docnum, fields

class FieldIndexWriter(FileWriter):

    "Writing field index details to files."

    def reset(self):
        self.last_docnum = 0
        self.last_offset = 0

    def write_document(self, docnum, offset):

        """
        Write for the given 'docnum', the 'offset' at which the fields for the
        document are stored in the fields file.
        """

        # Write the document number and offset deltas.
        self.write_number(docnum - self.last_docnum)
        self.write_number(offset - self.last_offset)

        self.last_docnum = docnum
        self.last_offset = offset

class FieldIndexReader(FileReader):

    "Reading field index details from files."

    def reset(self):
        self.last_docnum = 0
        self.last_offset = 0

    def read_document(self):

        "Read a document number and field file offset."

        # Read the document number delta and offset.

        self.last_docnum += self.read_number()
        self.last_offset += self.read_number()

        return self.last_docnum, self.last_offset

class FieldDictionaryWriter:

    "Writing field dictionary details."

    def __init__(self, field_writer, field_index_writer, interval):

        # 'interval' controls how often documents are copied into the index.

        self.field_writer = field_writer
        self.field_index_writer = field_index_writer
        self.interval = interval
        self.entry = 0

    def write_fields(self, docnum, fields):

        "Write details of the document with the given 'docnum' and 'fields'."

        offset = self.field_writer.write_fields(docnum, fields)

        if self.entry % self.interval == 0:
            self.field_index_writer.write_document(docnum, offset)

        self.entry += 1

    def close(self):
        self.field_writer.close()
        self.field_index_writer.close()

class FieldDictionaryReader:

    "Reading field dictionary details."

    def __init__(self, field_reader, field_index_reader):
        self.field_reader = field_reader
        self.field_index_reader = field_index_reader

        # Load the entire field index into memory for bisection.

        self.docs = []
        try:
            while 1:
                self.docs.append(self.field_index_reader.read_document())
        except EOFError:
            pass

        # Large numbers for ordering purposes.
        # NOTE(review): unlike TermDictionaryReader, no "+ 1" is applied here,
        # and self.docs[-1] raises IndexError on an empty index — verify both
        # against the intended bisection semantics.

        self.max_offset = self.docs[-1][1]

    def rewind(self):
        self.field_reader.rewind()

    def read_fields(self):

        "Return the next document number and fields."
1237 1238 return self.field_reader.read_fields() 1239 1240 def get_fields(self, docnum): 1241 1242 "Read the fields of the document with the given 'docnum'." 1243 1244 i = bisect_right(self.docs, (docnum, self.max_offset)) - 1 1245 1246 # Get the entry position providing the term or one preceding it. 1247 1248 if i == -1: 1249 return None 1250 1251 found_docnum, offset = self.docs[i] 1252 1253 # Read from the fields file. 1254 1255 found_docnum, fields = self.field_reader.read_document_fields(found_docnum, offset) 1256 1257 # Scan for the document, if necessary. 1258 1259 try: 1260 while docnum > found_docnum: 1261 found_docnum, fields = self.field_reader.read_fields() 1262 except EOFError: 1263 pass 1264 1265 # If the document is found, return the fields. 1266 1267 if docnum == found_docnum: 1268 return fields 1269 else: 1270 return None 1271 1272 def close(self): 1273 self.field_reader.close() 1274 self.field_index_reader.close() 1275 1276 # Dictionary merging classes. 1277 1278 class Merger: 1279 1280 "Merge files." 1281 1282 def __init__(self, writer, readers): 1283 self.writer = writer 1284 self.readers = readers 1285 1286 def close(self): 1287 for reader in self.readers: 1288 reader.close() 1289 self.writer.close() 1290 1291 class TermDictionaryMerger(Merger): 1292 1293 "Merge term and position files." 1294 1295 def merge(self): 1296 1297 """ 1298 Merge terms and positions from the readers, sending them to the writer. 1299 """ 1300 1301 entries = [] 1302 1303 # Get the first entries from the readers. 1304 1305 for partition, reader in enumerate(self.readers): 1306 reader.rewind() 1307 1308 try: 1309 term, frequency, doc_frequency, positions = reader.read_term() 1310 insort_right(entries, (term, positions, partition)) 1311 except EOFError: 1312 pass 1313 1314 # While entries are available, write them out in order, merging where 1315 # appropriate. 
1316 1317 while entries: 1318 term, doc_positions, partition = entries[0] 1319 to_update = [partition] 1320 1321 nentries = len(entries) 1322 i = 1 1323 1324 # Find other entries for the term. 1325 1326 while i < nentries: 1327 other_term, other_doc_positions, other_partition = entries[i] 1328 1329 # For such entries, merge the positions. 1330 1331 if other_term == term: 1332 doc_positions = self.merge_positions(doc_positions, other_doc_positions) 1333 to_update.append(other_partition) 1334 i += 1 1335 else: 1336 break 1337 1338 # Write the combined term details. 1339 1340 self.writer.write_term_positions(term, doc_positions) 1341 1342 # Update the entries from the affected readers. 1343 1344 del entries[:i] 1345 1346 for partition in to_update: 1347 try: 1348 term, frequency, doc_frequency, positions = self.readers[partition].read_term() 1349 insort_right(entries, (term, positions, partition)) 1350 except EOFError: 1351 pass 1352 1353 def merge_positions(self, doc_positions, other_doc_positions): 1354 1355 """ 1356 Merge 'doc_positions' with 'other_doc_positions' so that common document 1357 records contain positions from both collections. 1358 """ 1359 1360 doc_position_dict = dict(doc_positions) 1361 1362 for docnum, positions in other_doc_positions: 1363 if doc_position_dict.has_key(docnum): 1364 doc_position_dict[docnum] += positions 1365 else: 1366 doc_position_dict[docnum] = positions 1367 1368 return doc_position_dict.items() 1369 1370 class FieldDictionaryMerger(Merger): 1371 1372 "Merge field files." 1373 1374 def merge(self): 1375 1376 """ 1377 Merge fields from the readers, sending them to the writer. 1378 """ 1379 1380 entries = [] 1381 1382 # Get the first entries from the readers. 
1383 1384 for partition, reader in enumerate(self.readers): 1385 reader.rewind() 1386 1387 try: 1388 docnum, fields = reader.read_fields() 1389 insort_right(entries, (docnum, fields, partition)) 1390 except EOFError: 1391 pass 1392 1393 # While entries are available, write them out in order, merging where 1394 # appropriate. 1395 1396 while entries: 1397 docnum, fields, partition = entries[0] 1398 to_update = [partition] 1399 1400 nentries = len(entries) 1401 i = 1 1402 1403 # Find other entries for the term. 1404 1405 while i < nentries: 1406 other_docnum, other_fields, other_partition = entries[i] 1407 1408 # For such entries, merge the positions. 1409 1410 if other_docnum == docnum: 1411 fields += other_fields 1412 to_update.append(other_partition) 1413 i += 1 1414 else: 1415 break 1416 1417 # Write the combined term details. 1418 1419 self.writer.write_fields(docnum, fields) 1420 1421 # Update the entries from the affected readers. 1422 1423 del entries[:i] 1424 1425 for partition in to_update: 1426 try: 1427 docnum, fields = self.readers[partition].read_fields() 1428 insort_right(entries, (docnum, fields, partition)) 1429 except EOFError: 1430 pass 1431 1432 # Utility functions. 1433 1434 def get_term_writer(pathname, partition, interval, doc_interval): 1435 1436 """ 1437 Return a term dictionary writer using files under the given 'pathname' 1438 labelled according to the given 'partition', using the given indexing 1439 'interval' for terms and 'doc_interval' for document position records. 
1440 """ 1441 1442 tdf = open(join(pathname, "terms-%s" % partition), "wb") 1443 info_writer = TermWriter(tdf) 1444 1445 tdif = open(join(pathname, "terms_index-%s" % partition), "wb") 1446 index_writer = TermIndexWriter(tdif) 1447 1448 tpf = open(join(pathname, "positions-%s" % partition), "wb") 1449 positions_writer = PositionWriter(tpf) 1450 1451 tpif = open(join(pathname, "positions_index-%s" % partition), "wb") 1452 positions_index_writer = PositionIndexWriter(tpif) 1453 1454 positions_dict_writer = PositionDictionaryWriter(positions_writer, positions_index_writer, doc_interval) 1455 1456 return TermDictionaryWriter(info_writer, index_writer, positions_dict_writer, interval) 1457 1458 def get_field_writer(pathname, partition, interval): 1459 1460 """ 1461 Return a field dictionary writer using files under the given 'pathname' 1462 labelled according to the given 'partition', using the given indexing 1463 'interval'. 1464 """ 1465 1466 ff = open(join(pathname, "fields-%s" % partition), "wb") 1467 field_writer = FieldWriter(ff) 1468 1469 fif = open(join(pathname, "fields_index-%s" % partition), "wb") 1470 field_index_writer = FieldIndexWriter(fif) 1471 1472 return FieldDictionaryWriter(field_writer, field_index_writer, interval) 1473 1474 def get_term_reader(pathname, partition): 1475 1476 """ 1477 Return a term dictionary reader using files under the given 'pathname' 1478 labelled according to the given 'partition'. 
1479 """ 1480 1481 tdf = open(join(pathname, "terms-%s" % partition), "rb") 1482 info_reader = TermReader(tdf) 1483 1484 tdif = open(join(pathname, "terms_index-%s" % partition), "rb") 1485 index_reader = TermIndexReader(tdif) 1486 1487 tpf = open(join(pathname, "positions-%s" % partition), "rb") 1488 positions_reader = PositionReader(tpf) 1489 1490 tpif = open(join(pathname, "positions_index-%s" % partition), "rb") 1491 positions_index_reader = PositionIndexReader(tpif) 1492 1493 positions_dict_reader = PositionDictionaryReader(positions_reader, positions_index_reader) 1494 1495 return TermDictionaryReader(info_reader, index_reader, positions_dict_reader) 1496 1497 def get_field_reader(pathname, partition): 1498 1499 """ 1500 Return a field dictionary reader using files under the given 'pathname' 1501 labelled according to the given 'partition'. 1502 """ 1503 1504 ff = open(join(pathname, "fields-%s" % partition), "rb") 1505 field_reader = FieldReader(ff) 1506 1507 fif = open(join(pathname, "fields_index-%s" % partition), "rb") 1508 field_index_reader = FieldIndexReader(fif) 1509 1510 return FieldDictionaryReader(field_reader, field_index_reader) 1511 1512 def rename_files(pathname, names, from_partition, to_partition): 1513 for name in names: 1514 rename(join(pathname, "%s-%s" % (name, from_partition)), join(pathname, "%s-%s" % (name, to_partition))) 1515 1516 def rename_term_files(pathname, from_partition, to_partition): 1517 rename_files(pathname, TERM_FILENAMES, from_partition, to_partition) 1518 1519 def rename_field_files(pathname, from_partition, to_partition): 1520 rename_files(pathname, FIELD_FILENAMES, from_partition, to_partition) 1521 1522 def remove_files(pathname, names, partition): 1523 for name in names: 1524 remove(join(pathname, "%s-%s" % (name, partition))) 1525 1526 def remove_term_files(pathname, partition): 1527 remove_files(pathname, TERM_FILENAMES, partition) 1528 1529 def remove_field_files(pathname, partition): 1530 remove_files(pathname, 
FIELD_FILENAMES, partition) 1531 1532 # High-level classes. 1533 1534 class IndexWriter: 1535 1536 """ 1537 Building term information and writing it to the term and field dictionaries. 1538 """ 1539 1540 def __init__(self, pathname, interval, doc_interval, flush_interval): 1541 self.pathname = pathname 1542 self.interval = interval 1543 self.doc_interval = doc_interval 1544 self.flush_interval = flush_interval 1545 1546 self.dict_partition = 0 1547 self.field_dict_partition = 0 1548 1549 self.terms = {} 1550 self.docs = {} 1551 1552 self.position_counter = 0 1553 self.field_counter = 0 1554 1555 def add_position(self, term, docnum, position): 1556 1557 """ 1558 Add a position entry for the given 'term' in the document with the given 1559 'docnum', indicating the given 'position'. 1560 """ 1561 1562 if not self.terms.has_key(term): 1563 doc_positions = self.terms[term] = {} 1564 else: 1565 doc_positions = self.terms[term] 1566 1567 if not doc_positions.has_key(docnum): 1568 doc = doc_positions[docnum] = [] 1569 else: 1570 doc = doc_positions[docnum] 1571 1572 doc.append(position) 1573 1574 self.position_counter += 1 1575 if self.flush_interval and self.position_counter >= self.flush_interval: 1576 self.flush_terms() 1577 self.position_counter = 0 1578 1579 def add_field(self, docnum, identifier, value): 1580 1581 """ 1582 Add for the document with the given 'docnum' a field having the given 1583 'identifier' and 'value'. 
1584 """ 1585 1586 if not self.docs.has_key(docnum): 1587 doc_fields = self.docs[docnum] = [] 1588 else: 1589 doc_fields = self.docs[docnum] 1590 1591 doc_fields.append((identifier, unicode(value))) # convert to string 1592 1593 self.field_counter += 1 1594 if self.flush_interval and self.field_counter >= self.flush_interval: 1595 self.flush_fields() 1596 self.field_counter = 0 1597 1598 def set_fields(self, docnum, fields): 1599 1600 """ 1601 Add for the document with the given 'docnum' the given 'fields': a list 1602 of tuples each containing an integer identifier and a string value. 1603 """ 1604 1605 self.docs[docnum] = fields 1606 1607 self.field_counter += len(fields) 1608 if self.flush_interval and self.field_counter >= self.flush_interval: 1609 self.flush_fields() 1610 self.field_counter = 0 1611 1612 def get_term_writer(self): 1613 1614 "Return a term dictionary writer for the current partition." 1615 1616 return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval) 1617 1618 def get_field_writer(self): 1619 1620 "Return a field dictionary writer for the current partition." 1621 1622 return get_field_writer(self.pathname, self.field_dict_partition, self.interval) 1623 1624 def flush_terms(self): 1625 1626 "Flush terms into the current term dictionary partition." 1627 1628 # Get the terms in order. 1629 1630 terms = self.terms.items() 1631 terms.sort() 1632 1633 dict_writer = self.get_term_writer() 1634 1635 for term, doc_positions in terms: 1636 doc_positions = doc_positions.items() 1637 dict_writer.write_term_positions(term, doc_positions) 1638 1639 dict_writer.close() 1640 1641 self.terms = {} 1642 self.dict_partition += 1 1643 1644 def flush_fields(self): 1645 1646 "Flush fields into the current term dictionary partition." 1647 1648 # Get the documents in order. 
1649 1650 docs = self.docs.items() 1651 docs.sort() 1652 1653 field_dict_writer = self.get_field_writer() 1654 1655 for docnum, fields in docs: 1656 field_dict_writer.write_fields(docnum, fields) 1657 1658 field_dict_writer.close() 1659 1660 self.docs = {} 1661 self.field_dict_partition += 1 1662 1663 def close(self): 1664 if self.terms: 1665 self.flush_terms() 1666 if self.docs: 1667 self.flush_fields() 1668 1669 class IndexReader: 1670 1671 "Accessing the term and field dictionaries." 1672 1673 def __init__(self, pathname): 1674 self.dict_reader = get_term_reader(pathname, "merged") 1675 self.field_dict_reader = get_field_reader(pathname, "merged") 1676 1677 def find_terms(self, term): 1678 return self.dict_reader.find_terms(term) 1679 1680 def find_positions(self, term): 1681 return self.dict_reader.find_positions(term) 1682 1683 def get_frequency(self, term): 1684 return self.dict_reader.get_frequency(term) 1685 1686 def get_document_frequency(self, term): 1687 return self.dict_reader.get_document_frequency(term) 1688 1689 def get_fields(self, docnum): 1690 return self.field_dict_reader.get_fields(docnum) 1691 1692 def close(self): 1693 self.dict_reader.close() 1694 self.field_dict_reader.close() 1695 1696 class Index: 1697 1698 "An inverted index solution encapsulating the various components." 1699 1700 def __init__(self, pathname): 1701 self.pathname = pathname 1702 self.reader = None 1703 self.writer = None 1704 1705 def get_writer(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, flush_interval=FLUSH_INTERVAL): 1706 1707 """ 1708 Return a writer, optionally using the given indexing 'interval', 1709 'doc_interval' and 'flush_interval'. 1710 """ 1711 1712 if not exists(self.pathname): 1713 mkdir(self.pathname) 1714 1715 self.writer = IndexWriter(self.pathname, interval, doc_interval, flush_interval) 1716 return self.writer 1717 1718 def get_reader(self, partition=0): 1719 1720 "Return a reader for the index." 
1721 1722 # Ensure that only one partition exists. 1723 1724 self.merge() 1725 return self._get_reader(partition) 1726 1727 def _get_reader(self, partition): 1728 1729 "Return a reader for the index." 1730 1731 if not exists(self.pathname): 1732 raise OSError, "Index path %r does not exist." % self.pathname 1733 1734 self.reader = IndexReader(self.pathname) 1735 return self.reader 1736 1737 def merge(self): 1738 1739 "Merge/optimise index partitions." 1740 1741 self.merge_terms() 1742 self.merge_fields() 1743 1744 def merge_terms(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL): 1745 1746 """ 1747 Merge term dictionaries using the given indexing 'interval' and 1748 'doc_interval'. 1749 """ 1750 1751 readers = [] 1752 partitions = set() 1753 1754 for filename in listdir(self.pathname): 1755 if filename.startswith("terms-"): # 6 character prefix 1756 partition = filename[6:] 1757 readers.append(get_term_reader(self.pathname, partition)) 1758 partitions.add(partition) 1759 1760 # Write directly to a dictionary. 1761 1762 if len(readers) > 1: 1763 if "merged" in partitions: 1764 rename_term_files(self.pathname, "merged", "old-merged") 1765 partitions.remove("merged") 1766 partitions.add("old-merged") 1767 1768 writer = get_term_writer(self.pathname, "merged", interval, doc_interval) 1769 merger = TermDictionaryMerger(writer, readers) 1770 merger.merge() 1771 merger.close() 1772 1773 # Remove old files. 1774 1775 for partition in partitions: 1776 remove_term_files(self.pathname, partition) 1777 1778 elif len(readers) == 1: 1779 partition = list(partitions)[0] 1780 if partition != "merged": 1781 rename_term_files(self.pathname, partition, "merged") 1782 1783 def merge_fields(self, interval=FIELD_INTERVAL): 1784 1785 "Merge field dictionaries using the given indexing 'interval'." 
1786 1787 readers = [] 1788 partitions = set() 1789 1790 for filename in listdir(self.pathname): 1791 if filename.startswith("fields-"): # 7 character prefix 1792 partition = filename[7:] 1793 readers.append(get_field_reader(self.pathname, partition)) 1794 partitions.add(partition) 1795 1796 # Write directly to a dictionary. 1797 1798 if len(readers) > 1: 1799 if "merged" in partitions: 1800 rename_field_files(self.pathname, "merged", "old-merged") 1801 partitions.remove("merged") 1802 partitions.add("old-merged") 1803 1804 writer = get_field_writer(self.pathname, "merged", interval) 1805 merger = FieldDictionaryMerger(writer, readers) 1806 merger.merge() 1807 merger.close() 1808 1809 # Remove old files. 1810 1811 for partition in partitions: 1812 remove_field_files(self.pathname, partition) 1813 1814 elif len(readers) == 1: 1815 partition = list(partitions)[0] 1816 if partition != "merged": 1817 rename_field_files(self.pathname, partition, "merged") 1818 1819 def close(self): 1820 if self.reader is not None: 1821 self.reader.close() 1822 self.reader = None 1823 if self.writer is not None: 1824 self.writer.close() 1825 self.writer = None 1826 1827 # vim: tabstop=4 expandtab shiftwidth=4