1 #!/usr/bin/env python 2 3 """ 4 A simple (and sane) text indexing library. 5 6 Copyright (C) 2009 Paul Boddie <paul@boddie.org.uk> 7 8 This program is free software; you can redistribute it and/or modify it under 9 the terms of the GNU General Public License as published by the Free Software 10 Foundation; either version 3 of the License, or (at your option) any later 11 version. 12 13 This program is distributed in the hope that it will be useful, but WITHOUT ANY 14 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 15 PARTICULAR PURPOSE. See the GNU General Public License for more details. 16 17 You should have received a copy of the GNU General Public License along 18 with this program. If not, see <http://www.gnu.org/licenses/>. 19 """ 20 21 from os import dup, fdopen # independent iterator access to files 22 from os import listdir, mkdir # index and partition discovery 23 from os import remove, rename # partition manipulation 24 from os.path import exists, join 25 from os.path import commonprefix # to find common string prefixes 26 from bisect import bisect_right # to find terms in the dictionary index 27 from bisect import insort_right # to maintain a sorted list of data for merging 28 import bz2, zlib # for field compression 29 30 try: 31 set 32 except NameError: 33 from sets import Set as set 34 35 # Constants. 36 37 TERM_INTERVAL = 100 38 DOCUMENT_INTERVAL = 100 39 FIELD_INTERVAL = 100 40 FLUSH_INTERVAL = 10000 41 42 TERM_FILENAMES = "terms", "terms_index", "positions", "positions_index" 43 FIELD_FILENAMES = "fields", "fields_index" 44 45 compressors = [("b", bz2.compress), ("z", zlib.compress)] 46 decompressors = {"b" : bz2.decompress, "z" : zlib.decompress} 47 48 # Foundation classes. 49 50 class File: 51 52 "A basic file abstraction." 53 54 def __init__(self, f): 55 self.f = f 56 self.reset() 57 58 def reset(self): 59 60 "To be used to reset the state of the reader or writer between records." 61 62 pass 63 64 def rewind(self): 65 self.f.seek(0) 66 self.reset() 67 68 def close(self): 69 if self.f is not None: 70 self.f.close() 71 self.f = None 72 73 class FileWriter(File): 74 75 "Writing basic data types to files." 76 77 def write_number(self, number): 78 79 "Write 'number' to the file using a variable length encoding." 80 81 # Negative numbers are not supported. 82 83 if number < 0: 84 raise ValueError, "Number %r is negative." % number 85 86 # Special case: one byte containing a 7-bit number. 87 88 elif number < 128: 89 self.f.write(chr(number)) 90 return 91 92 # Write the number from least to most significant digits. 93 94 bytes = [] 95 96 while number != 0: 97 lsd = number & 127 98 number = number >> 7 99 if number != 0: 100 lsd |= 128 101 bytes.append(chr(lsd)) 102 103 record = "".join(bytes) 104 self.f.write(record) 105 106 def write_string(self, s, compress=0): 107 108 """ 109 Write 's' to the file, recording its length and compressing the string 110 if 'compress' is set to a true value. 111 """ 112 113 # Convert Unicode objects to strings. 114 115 if isinstance(s, unicode): 116 s = s.encode("utf-8") 117 118 # Compress the string if requested. 119 120 if compress: 121 for flag, fn in compressors: 122 cs = fn(s) 123 124 # Take the first string shorter than the original. 125 126 if len(cs) < len(s): 127 s = cs 128 break 129 else: 130 flag = "-" 131 132 # Record whether compression was used. 133 134 self.f.write(flag) 135 136 # Write the length of the data before the data itself. 137 138 length = len(s) 139 self.write_number(length) 140 self.f.write(s) 141 142 class FileReader(File): 143 144 "Reading basic data types from files." 145 146 def read_number(self): 147 148 "Read a number from the file." 149 150 # Read each byte, adding it to the number. 151 152 shift = 0 153 number = 0 154 more = 1 155 156 while more: 157 byte = self.f.read(1) 158 if not byte: 159 raise EOFError 160 161 csd = ord(byte) 162 more = csd & 128 != 0 163 if more: 164 csd &= 127 165 number += (csd << shift) 166 shift += 7 167 168 return number 169 170 def read_string(self, decompress=0): 171 172 """ 173 Read a string from the file, decompressing the stored data if 174 'decompress' is set to a true value. 175 """ 176 177 # Decompress the data if requested. 178 179 if decompress: 180 flag = self.f.read(1) 181 else: 182 flag = "-" 183 184 length = self.read_number() 185 s = self.f.read(length) 186 187 # Perform decompression if applicable. 188 189 if flag != "-": 190 fn = decompressors[flag] 191 s = fn(s) 192 193 # Convert strings to Unicode objects. 194 195 return unicode(s, "utf-8") 196 197 # Specific classes for storing term and position information. 198 199 class PositionWriter(FileWriter): 200 201 "Writing position information to files." 202 203 def reset(self): 204 self.last_docnum = 0 205 206 def write_positions(self, docnum, positions): 207 208 """ 209 Write for the document 'docnum' the given 'positions'. 210 Return the offset of the written record. 211 """ 212 213 if docnum < self.last_docnum: 214 raise ValueError, "Document number %r is less than previous number %r." % (docnum, self.last_docnum) 215 216 # Record the offset of this record. 217 218 offset = self.f.tell() 219 220 # Write the document number delta. 221 222 self.write_number(docnum - self.last_docnum) 223 224 # Write the number of positions. 225 226 self.write_number(len(positions)) 227 228 # Make sure that the positions are sorted. 229 230 positions.sort() 231 232 # Write the position deltas. 233 234 last = 0 235 for position in positions: 236 pos = position - last 237 self.write_number(pos) 238 last = position 239 240 self.last_docnum = docnum 241 242 return offset 243 244 class PositionReader(FileReader): 245 246 "Reading position information from files." 247 248 def reset(self): 249 self.last_docnum = 0 250 251 def read_positions(self): 252 253 "Read positions, returning a document number and a list of positions." 254 255 # Read the document number delta and add it to the last number. 256 257 self.last_docnum += self.read_number() 258 259 # Read the number of positions. 260 261 npositions = self.read_number() 262 263 # Read the position deltas, adding each previous position to get the 264 # appropriate collection of absolute positions. 265 266 i = 0 267 last = 0 268 positions = [] 269 270 while i < npositions: 271 last += self.read_number() 272 positions.append(last) 273 i += 1 274 275 return self.last_docnum, positions 276 277 def read_term_positions(self, offset, count): 278 279 """ 280 Read all positions from 'offset', seeking to that position in the file 281 before reading. The number of documents available for reading is limited 282 to 'count'. 283 """ 284 285 # Duplicate the file handle. 286 287 f = fdopen(dup(self.f.fileno()), "rb") 288 f.seek(offset) 289 return PositionIterator(f, count) 290 291 class PositionIndexWriter(FileWriter): 292 293 "Writing position index information to files." 294 295 def reset(self): 296 self.last_docnum = 0 297 self.last_pos_offset = 0 298 299 def write_positions(self, docnum, pos_offset, count): 300 301 """ 302 Write the given 'docnum, 'pos_offset' and document 'count' to the 303 position index file. 304 """ 305 306 # Record the offset of this record. 307 308 offset = self.f.tell() 309 310 # Write the document number delta. 311 312 self.write_number(docnum - self.last_docnum) 313 self.last_docnum = docnum 314 315 # Write the position file offset delta. 316 317 self.write_number(pos_offset - self.last_pos_offset) 318 self.last_pos_offset = pos_offset 319 320 # Write the document count. 321 322 self.write_number(count) 323 324 return offset 325 326 class PositionIndexReader(FileReader): 327 328 "Reading position index information from files." 329 330 def reset(self): 331 self.last_docnum = 0 332 self.last_pos_offset = 0 333 334 def read_positions(self): 335 336 """ 337 Read a document number, a position file offset for the position index 338 file, and the number of documents in a section of that file. 339 """ 340 341 # Read the document number delta. 342 343 self.last_docnum += self.read_number() 344 345 # Read the offset delta. 346 347 self.last_pos_offset += self.read_number() 348 349 # Read the document count. 350 351 count = self.read_number() 352 353 return self.last_docnum, self.last_pos_offset, count 354 355 def read_term_positions(self, offset, doc_frequency): 356 357 """ 358 Read all positions from 'offset', seeking to that position in the file 359 before reading. The number of documents available for reading is limited 360 to 'doc_frequency'. 361 """ 362 363 # Duplicate the file handle. 364 365 f = fdopen(dup(self.f.fileno()), "rb") 366 f.seek(offset) 367 return PositionIndexIterator(f, doc_frequency) 368 369 # Iterators for position-related files. 370 371 class IteratorBase: 372 373 def __init__(self, count): 374 self.replenish(count) 375 376 def replenish(self, count): 377 self.count = count 378 self.read_documents = 0 379 380 def __len__(self): 381 return self.count 382 383 def sort(self): 384 pass # Stored document positions are already sorted. 385 386 def __iter__(self): 387 return self 388 389 class PositionIterator(PositionReader, IteratorBase): 390 391 "Iterating over document positions." 392 393 def __init__(self, f, count): 394 PositionReader.__init__(self, f) 395 IteratorBase.__init__(self, count) 396 397 def next(self): 398 399 "Read positions for a single document." 400 401 if self.read_documents < self.count: 402 self.read_documents += 1 403 return self.read_positions() 404 else: 405 raise StopIteration 406 407 class PositionIndexIterator(PositionIndexReader, IteratorBase): 408 409 "Iterating over document positions." 410 411 def __init__(self, f, count): 412 PositionIndexReader.__init__(self, f) 413 IteratorBase.__init__(self, count) 414 self.section_count = 0 415 416 def next(self): 417 418 "Read positions for a single document." 419 420 self.read_documents += self.section_count 421 if self.read_documents < self.count: 422 docnum, pos_offset, self.section_count = t = self.read_positions() 423 return t 424 else: 425 raise StopIteration 426 427 class PositionDictionaryWriter: 428 429 "Writing position dictionaries." 430 431 def __init__(self, position_writer, position_index_writer, interval): 432 self.position_writer = position_writer 433 self.position_index_writer = position_index_writer 434 self.interval = interval 435 436 def write_term_positions(self, doc_positions): 437 438 """ 439 Write all 'doc_positions' - a collection of tuples of the form (document 440 number, position list) - to the file. 441 442 Add some records to the index, making dictionary entries. 443 444 Return a tuple containing the offset of the written data, the frequency 445 (number of positions), and document frequency (number of documents) for 446 the term involved. 447 """ 448 449 # Reset the writers. 450 451 self.position_writer.reset() 452 self.position_index_writer.reset() 453 454 index_offset = None 455 456 # Write the positions. 457 458 frequency = 0 459 first_docnum = None 460 first_offset = None 461 count = 0 462 463 doc_positions.sort() 464 465 for docnum, positions in doc_positions: 466 pos_offset = self.position_writer.write_positions(docnum, positions) 467 468 # Retain the first record offset for a subsequent index entry. 469 470 if first_offset is None: 471 first_offset = pos_offset 472 first_docnum = docnum 473 474 frequency += len(positions) 475 count += 1 476 477 # Every {interval} entries, write an index entry. 478 479 if count == self.interval: 480 io = self.position_index_writer.write_positions(first_docnum, first_offset, self.interval) 481 482 # Remember the first index entry offset. 483 484 if index_offset is None: 485 index_offset = io 486 487 first_offset = None 488 first_docnum = None 489 count = 0 490 491 # Reset the position writer so that position readers accessing 492 # a section start with the correct document number. 493 494 self.position_writer.reset() 495 496 # Finish writing an index entry for the remaining documents. 497 498 else: 499 if first_offset is not None: 500 io = self.position_index_writer.write_positions(first_docnum, first_offset, count) 501 502 # Remember the first index entry offset. 503 504 if index_offset is None: 505 index_offset = io 506 507 return index_offset, frequency, len(doc_positions) 508 509 def close(self): 510 self.position_writer.close() 511 self.position_index_writer.close() 512 513 class PositionDictionaryReader: 514 515 "Reading position dictionaries." 516 517 def __init__(self, position_reader, position_index_reader): 518 self.position_reader = position_reader 519 self.position_index_reader = position_index_reader 520 521 def read_term_positions(self, offset, doc_frequency): 522 523 """ 524 Return an iterator for dictionary entries starting at 'offset' with the 525 given 'doc_frequency'. 526 """ 527 528 return PositionDictionaryIterator(self.position_reader, 529 self.position_index_reader, offset, doc_frequency) 530 531 def close(self): 532 self.position_reader.close() 533 self.position_index_reader.close() 534 535 class PositionDictionaryIterator: 536 537 "Iteration over position dictionary entries." 538 539 def __init__(self, position_reader, position_index_reader, offset, doc_frequency): 540 self.position_reader = position_reader 541 self.doc_frequency = doc_frequency 542 self.index_iterator = position_index_reader.read_term_positions(offset, doc_frequency) 543 544 # Remember the last values. 545 546 self.found_docnum, self.found_positions = None, None 547 548 # Maintain state for the next index entry, if read. 549 550 self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None 551 552 # Initialise the current index entry and current position file iterator. 553 554 self._next_section() 555 self._init_section() 556 557 def __len__(self): 558 return self.doc_frequency 559 560 def sort(self): 561 pass 562 563 def __iter__(self): 564 return self 565 566 def next(self): 567 568 """ 569 Attempt to get the next document record from the section in the 570 positions file. 571 """ 572 573 # Return any visited but unrequested record. 574 575 if self.found_docnum is not None: 576 t = self.found_docnum, self.found_positions 577 self.found_docnum, self.found_positions = None, None 578 return t 579 580 # Or search for the next record. 581 582 while 1: 583 584 # Either return the next record. 585 586 try: 587 return self.iterator.next() 588 589 # Or, where a section is finished, get the next section and try again. 590 591 except StopIteration: 592 593 # Where a section follows, update the index iterator, but keep 594 # reading using the same file iterator (since the data should 595 # just follow on from the last section). 596 597 self._next_section() 598 self.iterator.replenish(self.section_count) 599 600 # Reset the state of the iterator to make sure that document 601 # numbers are correct. 602 603 self.iterator.reset() 604 605 def from_document(self, docnum): 606 607 """ 608 Attempt to navigate to a positions entry for the given 'docnum', 609 returning the positions for 'docnum', or None otherwise. 610 """ 611 612 # Return any unrequested document positions. 613 614 if docnum == self.found_docnum: 615 return self.found_positions 616 617 # Read ahead in the index until the next entry refers to a document 618 # later than the desired document. 619 620 try: 621 if self.next_docnum is None: 622 self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next() 623 624 # Read until the next entry is after the desired document number, 625 # or until the end of the results. 626 627 while self.next_docnum <= docnum: 628 self._next_read_section() 629 if self.docnum < docnum: 630 self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next() 631 else: 632 break 633 634 except StopIteration: 635 pass 636 637 # Navigate in the position file to the document. 638 639 self._init_section() 640 641 try: 642 while 1: 643 found_docnum, found_positions = self.iterator.next() 644 645 # Return the desired document positions or None (retaining the 646 # positions for the document immediately after). 647 648 if docnum == found_docnum: 649 return found_positions 650 elif docnum < found_docnum: 651 self.found_docnum, self.found_positions = found_docnum, found_positions 652 return None 653 654 except StopIteration: 655 return None 656 657 # Internal methods. 658 659 def _next_section(self): 660 661 "Attempt to get the next section in the index." 662 663 if self.next_docnum is None: 664 self.docnum, self.pos_offset, self.section_count = self.index_iterator.next() 665 else: 666 self._next_read_section() 667 668 def _next_read_section(self): 669 670 """ 671 Make the next index entry the current one without reading from the 672 index. 673 """ 674 675 self.docnum, self.pos_offset, self.section_count = self.next_docnum, self.next_pos_offset, self.next_section_count 676 self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None 677 678 def _init_section(self): 679 680 "Initialise the iterator for the section in the position file." 681 682 self.iterator = self.position_reader.read_term_positions(self.pos_offset, self.section_count) 683 684 class TermWriter(FileWriter): 685 686 "Writing term information to files." 687 688 def reset(self): 689 self.last_term = "" 690 self.last_offset = 0 691 692 def write_term(self, term, offset, frequency, doc_frequency): 693 694 """ 695 Write the given 'term', its position file 'offset', its 'frequency' and 696 its 'doc_frequency' (number of documents in which it appears) to the 697 term information file. Return the offset after the term information was 698 written to the file. 699 """ 700 701 # Write the prefix length and term suffix. 702 703 common = len(commonprefix([self.last_term, term])) 704 suffix = term[common:] 705 706 self.write_number(common) 707 self.write_string(suffix) 708 709 # Write the offset delta. 710 711 self.write_number(offset - self.last_offset) 712 713 # Write the frequency. 714 715 self.write_number(frequency) 716 717 # Write the document frequency. 718 719 self.write_number(doc_frequency) 720 721 self.last_term = term 722 self.last_offset = offset 723 724 return self.f.tell() 725 726 class TermReader(FileReader): 727 728 "Reading term information from files." 729 730 def reset(self): 731 self.last_term = "" 732 self.last_offset = 0 733 734 def read_term(self): 735 736 """ 737 Read a term, its position file offset, its frequency and its document 738 frequency from the term information file. 739 """ 740 741 # Read the prefix length and term suffix. 742 743 common = self.read_number() 744 suffix = self.read_string() 745 746 self.last_term = self.last_term[:common] + suffix 747 748 # Read the offset delta. 749 750 self.last_offset += self.read_number() 751 752 # Read the frequency. 753 754 frequency = self.read_number() 755 756 # Read the document frequency. 757 758 doc_frequency = self.read_number() 759 760 return self.last_term, self.last_offset, frequency, doc_frequency 761 762 def go_to_term(self, term, offset, info_offset): 763 764 """ 765 Seek past the entry for 'term' having 'offset' to 'info_offset'. This 766 permits the scanning for later terms from the specified term. 767 """ 768 769 self.f.seek(info_offset) 770 self.last_term = term 771 self.last_offset = offset 772 773 class TermIndexWriter(TermWriter): 774 775 "Writing term dictionary index details to files." 776 777 def reset(self): 778 TermWriter.reset(self) 779 self.last_info_offset = 0 780 781 def write_term(self, term, offset, frequency, doc_frequency, info_offset): 782 783 """ 784 Write the given 'term', its position file 'offset', its 'frequency' and 785 its 'doc_frequency' to the term dictionary index file, along with the 786 'info_offset' in the term information file. 787 """ 788 789 TermWriter.write_term(self, term, offset, frequency, doc_frequency) 790 791 # Write the information file offset delta. 792 793 self.write_number(info_offset - self.last_info_offset) 794 self.last_info_offset = info_offset 795 796 class TermIndexReader(TermReader): 797 798 "Reading term dictionary index details from files." 799 800 def reset(self): 801 TermReader.reset(self) 802 self.last_info_offset = 0 803 804 def read_term(self): 805 806 """ 807 Read a term, its position file offset, its frequency, its document 808 frequency and a term information file offset from the term dictionary 809 index file. 810 """ 811 812 term, offset, frequency, doc_frequency = TermReader.read_term(self) 813 814 # Read the offset delta. 815 816 self.last_info_offset += self.read_number() 817 818 return term, offset, frequency, doc_frequency, self.last_info_offset 819 820 class TermDictionaryWriter: 821 822 "Writing term dictionaries." 823 824 def __init__(self, info_writer, index_writer, position_dict_writer, interval): 825 self.info_writer = info_writer 826 self.index_writer = index_writer 827 self.position_dict_writer = position_dict_writer 828 self.interval = interval 829 self.entry = 0 830 831 def _write_term(self, term, offset, frequency, doc_frequency): 832 833 """ 834 Write the given 'term', its position file 'offset', its 'frequency' and 835 its 'doc_frequency' (number of documents in which it appears) to the 836 term information file. Return the offset after the term information was 837 written to the file. 838 """ 839 840 info_offset = self.info_writer.write_term(term, offset, frequency, doc_frequency) 841 842 if self.entry % self.interval == 0: 843 self.index_writer.write_term(term, offset, frequency, doc_frequency, info_offset) 844 845 self.entry += 1 846 847 def write_term_positions(self, term, doc_positions): 848 849 """ 850 Write the given 'term' and the 'doc_positions' recording the documents 851 and positions at which the term is found. 852 """ 853 854 offset, frequency, doc_frequency = self.position_dict_writer.write_term_positions(doc_positions) 855 self._write_term(term, offset, frequency, doc_frequency) 856 857 def close(self): 858 self.info_writer.close() 859 self.index_writer.close() 860 self.position_dict_writer.close() 861 862 class TermDictionaryReader: 863 864 "Reading term dictionaries." 865 866 def __init__(self, info_reader, index_reader, position_dict_reader): 867 self.info_reader = info_reader 868 self.index_reader = index_reader 869 self.position_dict_reader = position_dict_reader 870 871 self.terms = [] 872 try: 873 while 1: 874 self.terms.append(self.index_reader.read_term()) 875 except EOFError: 876 pass 877 878 # Large numbers for ordering purposes. 879 880 if self.terms: 881 self.max_offset = self.terms[-1][1] + 1 882 else: 883 self.max_offset = None 884 885 def _find_closest_entry(self, term): 886 887 """ 888 Find the offsets and frequencies of 'term' from the term dictionary or 889 the closest term starting with the value of 'term'. 890 891 Return the closest index entry consisting of a term, the position file 892 offset, the term frequency, the document frequency, and the term details 893 file offset. 894 """ 895 896 i = bisect_right(self.terms, (term, self.max_offset, 0, 0)) - 1 897 898 # Get the entry position providing the term or one preceding it. 899 # If no entry precedes the requested term, return the very first entry 900 # as the closest. 901 902 if i == -1: 903 return self.terms[0] 904 else: 905 return self.terms[i] 906 907 def _find_closest_term(self, term): 908 909 """ 910 Find the offsets and frequencies of 'term' from the term dictionary or 911 the closest term starting with the value of 'term'. 912 913 Return the closest term (or the term itself), the position file offset, 914 the term frequency, the document frequency, and the term details file 915 offset (or None if the reader is already positioned). 916 """ 917 918 found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_entry(term) 919 920 # Where the term is found immediately, return the offset and 921 # frequencies. If the term does not appear, return the details of the 922 # closest entry. 923 924 if term <= found_term: 925 return found_term, offset, frequency, doc_frequency, info_offset 926 927 # Otherwise, seek past the index term's entry in the information file 928 # and scan for the desired term. 929 930 else: 931 self.info_reader.go_to_term(found_term, offset, info_offset) 932 try: 933 while term > found_term: 934 found_term, offset, frequency, doc_frequency = self.info_reader.read_term() 935 except EOFError: 936 pass 937 938 return found_term, offset, frequency, doc_frequency, None 939 940 def _find_term(self, term): 941 942 """ 943 Find the position file offset and frequency of 'term' from the term 944 dictionary. 945 """ 946 947 found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_term(term) 948 949 # If the term is found, return the offset and frequencies. 950 951 if term == found_term: 952 return offset, frequency, doc_frequency 953 else: 954 return None 955 956 def _get_positions(self, offset, doc_frequency): 957 return self.position_dict_reader.read_term_positions(offset, doc_frequency) 958 959 # Sequential access methods. 960 961 def rewind(self): 962 self.info_reader.rewind() 963 964 def read_term(self): 965 966 """ 967 Return the next term, its frequency, its document frequency, and the 968 documents and positions at which the term is found. 969 """ 970 971 term, offset, frequency, doc_frequency = self.info_reader.read_term() 972 positions = self._get_positions(offset, doc_frequency) 973 return term, frequency, doc_frequency, positions 974 975 # Query methods. 976 977 def find_terms(self, term): 978 979 "Return all terms whose values start with the value of 'term'." 980 981 terms = [] 982 983 found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_term(term) 984 985 # Position the reader, if necessary. 986 987 if info_offset is not None: 988 self.info_reader.go_to_term(found_term, offset, info_offset) 989 990 # Read and record terms. 991 992 try: 993 # Add the found term if it starts with the specified term. 994 995 while found_term.startswith(term): 996 terms.append(found_term) 997 found_term, offset, frequency, doc_frequency = self.info_reader.read_term() 998 999 except EOFError: 1000 pass 1001 1002 return terms 1003 1004 def find_positions(self, term): 1005 1006 "Return the documents and positions at which the given 'term' is found." 1007 1008 t = self._find_term(term) 1009 if t is None: 1010 return None 1011 else: 1012 offset, frequency, doc_frequency = t 1013 return self._get_positions(offset, doc_frequency) 1014 1015 def get_frequency(self, term): 1016 1017 "Return the frequency of the given 'term'." 1018 1019 t = self._find_term(term) 1020 if t is None: 1021 return None 1022 else: 1023 offset, frequency, doc_frequency = t 1024 return frequency 1025 1026 def get_document_frequency(self, term): 1027 1028 "Return the document frequency of the given 'term'." 1029 1030 t = self._find_term(term) 1031 if t is None: 1032 return None 1033 else: 1034 offset, frequency, doc_frequency = t 1035 return doc_frequency 1036 1037 def close(self): 1038 self.info_reader.close() 1039 self.index_reader.close() 1040 self.position_dict_reader.close() 1041 1042 # Specific classes for storing document information. 1043 1044 class FieldWriter(FileWriter): 1045 1046 "Writing field data to files." 1047 1048 def reset(self): 1049 self.last_docnum = 0 1050 1051 def write_fields(self, docnum, fields): 1052 1053 """ 1054 Write for the given 'docnum', a list of 'fields' (integer, string pairs 1055 representing field identifiers and values respectively). 1056 Return the offset at which the fields are stored. 1057 """ 1058 1059 offset = self.f.tell() 1060 1061 # Write the document number delta. 1062 1063 self.write_number(docnum - self.last_docnum) 1064 1065 # Write the number of fields. 1066 1067 self.write_number(len(fields)) 1068 1069 # Write the fields themselves. 1070 1071 for i, field in fields: 1072 self.write_number(i) 1073 self.write_string(field, 1) # compress 1074 1075 self.last_docnum = docnum 1076 return offset 1077 1078 class FieldReader(FileReader): 1079 1080 "Reading field data from files." 1081 1082 def reset(self): 1083 self.last_docnum = 0 1084 1085 def read_fields(self): 1086 1087 """ 1088 Read fields from the file, returning a tuple containing the document 1089 number and a list of field (identifier, value) pairs. 1090 """ 1091 1092 # Read the document number. 1093 1094 self.last_docnum += self.read_number() 1095 1096 # Read the number of fields. 1097 1098 nfields = self.read_number() 1099 1100 # Collect the fields. 1101 1102 fields = [] 1103 i = 0 1104 1105 while i < nfields: 1106 identifier = self.read_number() 1107 value = self.read_string(1) # decompress 1108 fields.append((identifier, value)) 1109 i += 1 1110 1111 return self.last_docnum, fields 1112 1113 def read_document_fields(self, docnum, offset): 1114 1115 """ 1116 Read fields for 'docnum' at the given 'offset'. This permits the 1117 retrieval of details for the specified document, as well as scanning for 1118 later documents. 1119 """ 1120 1121 self.f.seek(offset) 1122 bad_docnum, fields = self.read_fields() 1123 self.last_docnum = docnum 1124 return docnum, fields 1125 1126 class FieldIndexWriter(FileWriter): 1127 1128 "Writing field index details to files." 1129 1130 def reset(self): 1131 self.last_docnum = 0 1132 self.last_offset = 0 1133 1134 def write_document(self, docnum, offset): 1135 1136 """ 1137 Write for the given 'docnum', the 'offset' at which the fields for the 1138 document are stored in the fields file. 1139 """ 1140 1141 # Write the document number and offset deltas. 1142 1143 self.write_number(docnum - self.last_docnum) 1144 self.write_number(offset - self.last_offset) 1145 1146 self.last_docnum = docnum 1147 self.last_offset = offset 1148 1149 class FieldIndexReader(FileReader): 1150 1151 "Reading field index details from files." 1152 1153 def reset(self): 1154 self.last_docnum = 0 1155 self.last_offset = 0 1156 1157 def read_document(self): 1158 1159 "Read a document number and field file offset." 1160 1161 # Read the document number delta and offset. 1162 1163 self.last_docnum += self.read_number() 1164 self.last_offset += self.read_number() 1165 1166 return self.last_docnum, self.last_offset 1167 1168 class FieldDictionaryWriter: 1169 1170 "Writing field dictionary details." 1171 1172 def __init__(self, field_writer, field_index_writer, interval): 1173 self.field_writer = field_writer 1174 self.field_index_writer = field_index_writer 1175 self.interval = interval 1176 self.entry = 0 1177 1178 def write_fields(self, docnum, fields): 1179 1180 "Write details of the document with the given 'docnum' and 'fields'." 1181 1182 offset = self.field_writer.write_fields(docnum, fields) 1183 1184 if self.entry % self.interval == 0: 1185 self.field_index_writer.write_document(docnum, offset) 1186 1187 self.entry += 1 1188 1189 def close(self): 1190 self.field_writer.close() 1191 self.field_index_writer.close() 1192 1193 class FieldDictionaryReader: 1194 1195 "Reading field dictionary details." 1196 1197 def __init__(self, field_reader, field_index_reader): 1198 self.field_reader = field_reader 1199 self.field_index_reader = field_index_reader 1200 1201 self.docs = [] 1202 try: 1203 while 1: 1204 self.docs.append(self.field_index_reader.read_document()) 1205 except EOFError: 1206 pass 1207 1208 # Large numbers for ordering purposes. 1209 1210 if self.docs: 1211 self.max_offset = self.docs[-1][1] 1212 else: 1213 self.max_offset = None 1214 1215 def rewind(self): 1216 self.field_reader.rewind() 1217 1218 def read_fields(self): 1219 1220 "Return the next document number and fields." 1221 1222 return self.field_reader.read_fields() 1223 1224 def get_fields(self, docnum): 1225 1226 "Read the fields of the document with the given 'docnum'." 1227 1228 i = bisect_right(self.docs, (docnum, self.max_offset)) - 1 1229 1230 # Get the entry position providing the term or one preceding it. 1231 1232 if i == -1: 1233 return None 1234 1235 found_docnum, offset = self.docs[i] 1236 1237 # Read from the fields file. 1238 1239 found_docnum, fields = self.field_reader.read_document_fields(found_docnum, offset) 1240 1241 # Scan for the document, if necessary. 1242 1243 try: 1244 while docnum > found_docnum: 1245 found_docnum, fields = self.field_reader.read_fields() 1246 except EOFError: 1247 pass 1248 1249 # If the document is found, return the fields. 1250 1251 if docnum == found_docnum: 1252 return fields 1253 else: 1254 return None 1255 1256 def close(self): 1257 self.field_reader.close() 1258 self.field_index_reader.close() 1259 1260 # Dictionary merging classes. 1261 1262 class Merger: 1263 1264 "Merge files." 1265 1266 def __init__(self, writer, readers): 1267 self.writer = writer 1268 self.readers = readers 1269 1270 def close(self): 1271 for reader in self.readers: 1272 reader.close() 1273 self.writer.close() 1274 1275 class TermDictionaryMerger(Merger): 1276 1277 "Merge term and position files." 1278 1279 def merge(self): 1280 1281 """ 1282 Merge terms and positions from the readers, sending them to the writer. 1283 """ 1284 1285 entries = [] 1286 1287 # Get the first entries from the readers. 1288 1289 for partition, reader in enumerate(self.readers): 1290 reader.rewind() 1291 1292 try: 1293 term, frequency, doc_frequency, positions = reader.read_term() 1294 insort_right(entries, (term, positions, partition)) 1295 except EOFError: 1296 pass 1297 1298 # While entries are available, write them out in order, merging where 1299 # appropriate. 1300 1301 while entries: 1302 term, doc_positions, partition = entries[0] 1303 to_update = [partition] 1304 1305 nentries = len(entries) 1306 i = 1 1307 1308 # Find other entries for the term. 1309 1310 while i < nentries: 1311 other_term, other_doc_positions, other_partition = entries[i] 1312 1313 # For such entries, merge the positions. 1314 # Since document positions should only appear in a single 1315 # partition, a simple update should be sufficient. 1316 1317 if other_term == term: 1318 doc_positions.update(other_doc_positions) 1319 to_update.append(other_partition) 1320 i += 1 1321 else: 1322 break 1323 1324 # Write the combined term details. 1325 1326 self.writer.write_term_positions(term, doc_positions) 1327 1328 # Update the entries from the affected readers. 1329 1330 del entries[:i] 1331 1332 for partition in to_update: 1333 try: 1334 term, frequency, doc_frequency, positions = self.readers[partition].read_term() 1335 insort_right(entries, (term, positions, partition)) 1336 except EOFError: 1337 pass 1338 1339 class FieldDictionaryMerger(Merger): 1340 1341 "Merge field files." 1342 1343 def merge(self): 1344 1345 """ 1346 Merge fields from the readers, sending them to the writer. 1347 """ 1348 1349 entries = [] 1350 1351 # Get the first entries from the readers. 1352 1353 for partition, reader in enumerate(self.readers): 1354 reader.rewind() 1355 1356 try: 1357 docnum, fields = reader.read_fields() 1358 insort_right(entries, (docnum, fields, partition)) 1359 except EOFError: 1360 pass 1361 1362 # While entries are available, write them out in order, merging where 1363 # appropriate. Since fields from one document should only appear in a 1364 # single partition, only one partition will be updated at a time. 1365 1366 while entries: 1367 docnum, fields, partition = entries[0] 1368 1369 # Write the combined term details. 1370 1371 self.writer.write_fields(docnum, fields) 1372 1373 # Update the entries from the affected readers. 1374 1375 del entries[0] 1376 1377 try: 1378 docnum, fields = self.readers[partition].read_fields() 1379 insort_right(entries, (docnum, fields, partition)) 1380 except EOFError: 1381 pass 1382 1383 # Utility functions. 1384 1385 def get_term_writer(pathname, partition, interval, doc_interval): 1386 1387 """ 1388 Return a term dictionary writer using files under the given 'pathname' 1389 labelled according to the given 'partition', using the given indexing 1390 'interval' for terms and 'doc_interval' for document position records. 1391 """ 1392 1393 tdf = open(join(pathname, "terms-%s" % partition), "wb") 1394 info_writer = TermWriter(tdf) 1395 1396 tdif = open(join(pathname, "terms_index-%s" % partition), "wb") 1397 index_writer = TermIndexWriter(tdif) 1398 1399 tpf = open(join(pathname, "positions-%s" % partition), "wb") 1400 positions_writer = PositionWriter(tpf) 1401 1402 tpif = open(join(pathname, "positions_index-%s" % partition), "wb") 1403 positions_index_writer = PositionIndexWriter(tpif) 1404 1405 positions_dict_writer = PositionDictionaryWriter(positions_writer, positions_index_writer, doc_interval) 1406 1407 return TermDictionaryWriter(info_writer, index_writer, positions_dict_writer, interval) 1408 1409 def get_field_writer(pathname, partition, interval): 1410 1411 """ 1412 Return a field dictionary writer using files under the given 'pathname' 1413 labelled according to the given 'partition', using the given indexing 1414 'interval'. 1415 """ 1416 1417 ff = open(join(pathname, "fields-%s" % partition), "wb") 1418 field_writer = FieldWriter(ff) 1419 1420 fif = open(join(pathname, "fields_index-%s" % partition), "wb") 1421 field_index_writer = FieldIndexWriter(fif) 1422 1423 return FieldDictionaryWriter(field_writer, field_index_writer, interval) 1424 1425 def get_term_reader(pathname, partition): 1426 1427 """ 1428 Return a term dictionary reader using files under the given 'pathname' 1429 labelled according to the given 'partition'. 1430 """ 1431 1432 tdf = open(join(pathname, "terms-%s" % partition), "rb") 1433 info_reader = TermReader(tdf) 1434 1435 tdif = open(join(pathname, "terms_index-%s" % partition), "rb") 1436 index_reader = TermIndexReader(tdif) 1437 1438 tpf = open(join(pathname, "positions-%s" % partition), "rb") 1439 positions_reader = PositionReader(tpf) 1440 1441 tpif = open(join(pathname, "positions_index-%s" % partition), "rb") 1442 positions_index_reader = PositionIndexReader(tpif) 1443 1444 positions_dict_reader = PositionDictionaryReader(positions_reader, positions_index_reader) 1445 1446 return TermDictionaryReader(info_reader, index_reader, positions_dict_reader) 1447 1448 def get_field_reader(pathname, partition): 1449 1450 """ 1451 Return a field dictionary reader using files under the given 'pathname' 1452 labelled according to the given 'partition'. 1453 """ 1454 1455 ff = open(join(pathname, "fields-%s" % partition), "rb") 1456 field_reader = FieldReader(ff) 1457 1458 fif = open(join(pathname, "fields_index-%s" % partition), "rb") 1459 field_index_reader = FieldIndexReader(fif) 1460 1461 return FieldDictionaryReader(field_reader, field_index_reader) 1462 1463 def rename_files(pathname, names, from_partition, to_partition): 1464 for name in names: 1465 rename(join(pathname, "%s-%s" % (name, from_partition)), join(pathname, "%s-%s" % (name, to_partition))) 1466 1467 def rename_term_files(pathname, from_partition, to_partition): 1468 rename_files(pathname, TERM_FILENAMES, from_partition, to_partition) 1469 1470 def rename_field_files(pathname, from_partition, to_partition): 1471 rename_files(pathname, FIELD_FILENAMES, from_partition, to_partition) 1472 1473 def remove_files(pathname, names, partition): 1474 for name in names: 1475 remove(join(pathname, "%s-%s" % (name, partition))) 1476 1477 def remove_term_files(pathname, partition): 1478 remove_files(pathname, TERM_FILENAMES, partition) 1479 1480 def remove_field_files(pathname, partition): 1481 remove_files(pathname, FIELD_FILENAMES, partition) 1482 1483 # High-level classes. 1484 1485 class Document: 1486 1487 "A container of document information." 1488 1489 def __init__(self, docnum): 1490 self.docnum = docnum 1491 self.fields = [] 1492 self.terms = {} 1493 1494 def add_position(self, term, position): 1495 1496 """ 1497 Add a position entry for the given 'term', indicating the given 1498 'position'. 1499 """ 1500 1501 self.terms.setdefault(term, []).append(position) 1502 1503 def add_field(self, identifier, value): 1504 1505 "Add a field having the given 'identifier' and 'value'." 1506 1507 self.fields.append((identifier, unicode(value))) # convert to string 1508 1509 def set_fields(self, fields): 1510 1511 """ 1512 Set the document's 'fields': a list of tuples each containing an integer 1513 identifier and a string value. 1514 """ 1515 1516 self.fields = fields 1517 1518 class IndexWriter: 1519 1520 """ 1521 Building term information and writing it to the term and field dictionaries. 1522 """ 1523 1524 def __init__(self, pathname, interval, doc_interval, flush_interval): 1525 self.pathname = pathname 1526 self.interval = interval 1527 self.doc_interval = doc_interval 1528 self.flush_interval = flush_interval 1529 1530 self.dict_partition = 0 1531 self.field_dict_partition = 0 1532 1533 self.terms = {} 1534 self.docs = {} 1535 1536 self.doc_counter = 0 1537 1538 def add_document(self, doc): 1539 1540 """ 1541 Add the given document 'doc', updating the document counter and flushing 1542 terms and fields if appropriate. 1543 """ 1544 1545 for term, positions in doc.terms.items(): 1546 self.terms.setdefault(term, {})[doc.docnum] = positions 1547 1548 self.docs[doc.docnum] = doc.fields 1549 1550 self.doc_counter += 1 1551 if self.flush_interval and self.doc_counter >= self.flush_interval: 1552 self.flush_terms() 1553 self.flush_fields() 1554 self.doc_counter = 0 1555 1556 def get_term_writer(self): 1557 1558 "Return a term dictionary writer for the current partition." 1559 1560 return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval) 1561 1562 def get_field_writer(self): 1563 1564 "Return a field dictionary writer for the current partition." 1565 1566 return get_field_writer(self.pathname, self.field_dict_partition, self.interval) 1567 1568 def flush_terms(self): 1569 1570 "Flush terms into the current term dictionary partition." 1571 1572 # Get the terms in order. 1573 1574 all_terms = self.terms 1575 terms = all_terms.keys() 1576 terms.sort() 1577 1578 dict_writer = self.get_term_writer() 1579 1580 for term in terms: 1581 doc_positions = all_terms[term].items() 1582 dict_writer.write_term_positions(term, doc_positions) 1583 1584 dict_writer.close() 1585 1586 self.terms = {} 1587 self.dict_partition += 1 1588 1589 def flush_fields(self): 1590 1591 "Flush fields into the current term dictionary partition." 1592 1593 # Get the documents in order. 1594 1595 docs = self.docs.items() 1596 docs.sort() 1597 1598 field_dict_writer = self.get_field_writer() 1599 1600 for docnum, fields in docs: 1601 field_dict_writer.write_fields(docnum, fields) 1602 1603 field_dict_writer.close() 1604 1605 self.docs = {} 1606 self.field_dict_partition += 1 1607 1608 def close(self): 1609 if self.terms: 1610 self.flush_terms() 1611 if self.docs: 1612 self.flush_fields() 1613 1614 class IndexReader: 1615 1616 "Accessing the term and field dictionaries." 1617 1618 def __init__(self, pathname): 1619 self.dict_reader = get_term_reader(pathname, "merged") 1620 self.field_dict_reader = get_field_reader(pathname, "merged") 1621 1622 def find_terms(self, term): 1623 return self.dict_reader.find_terms(term) 1624 1625 def find_positions(self, term): 1626 return self.dict_reader.find_positions(term) 1627 1628 def get_frequency(self, term): 1629 return self.dict_reader.get_frequency(term) 1630 1631 def get_document_frequency(self, term): 1632 return self.dict_reader.get_document_frequency(term) 1633 1634 def get_fields(self, docnum): 1635 return self.field_dict_reader.get_fields(docnum) 1636 1637 def close(self): 1638 self.dict_reader.close() 1639 self.field_dict_reader.close() 1640 1641 class Index: 1642 1643 "An inverted index solution encapsulating the various components." 1644 1645 def __init__(self, pathname): 1646 self.pathname = pathname 1647 self.reader = None 1648 self.writer = None 1649 1650 def get_writer(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, flush_interval=FLUSH_INTERVAL): 1651 1652 """ 1653 Return a writer, optionally using the given indexing 'interval', 1654 'doc_interval' and 'flush_interval'. 1655 """ 1656 1657 if not exists(self.pathname): 1658 mkdir(self.pathname) 1659 1660 self.writer = IndexWriter(self.pathname, interval, doc_interval, flush_interval) 1661 return self.writer 1662 1663 def get_reader(self, partition=0): 1664 1665 "Return a reader for the index." 1666 1667 # Ensure that only one partition exists. 1668 1669 self.merge() 1670 return self._get_reader(partition) 1671 1672 def _get_reader(self, partition): 1673 1674 "Return a reader for the index." 1675 1676 if not exists(self.pathname): 1677 raise OSError, "Index path %r does not exist." % self.pathname 1678 1679 self.reader = IndexReader(self.pathname) 1680 return self.reader 1681 1682 def merge(self): 1683 1684 "Merge/optimise index partitions." 1685 1686 self.merge_terms() 1687 self.merge_fields() 1688 1689 def merge_terms(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL): 1690 1691 """ 1692 Merge term dictionaries using the given indexing 'interval' and 1693 'doc_interval'. 1694 """ 1695 1696 readers = [] 1697 partitions = set() 1698 1699 for filename in listdir(self.pathname): 1700 if filename.startswith("terms-"): # 6 character prefix 1701 partition = filename[6:] 1702 readers.append(get_term_reader(self.pathname, partition)) 1703 partitions.add(partition) 1704 1705 # Write directly to a dictionary. 1706 1707 if len(readers) > 1: 1708 if "merged" in partitions: 1709 rename_term_files(self.pathname, "merged", "old-merged") 1710 partitions.remove("merged") 1711 partitions.add("old-merged") 1712 1713 writer = get_term_writer(self.pathname, "merged", interval, doc_interval) 1714 merger = TermDictionaryMerger(writer, readers) 1715 merger.merge() 1716 merger.close() 1717 1718 # Remove old files. 1719 1720 for partition in partitions: 1721 remove_term_files(self.pathname, partition) 1722 1723 elif len(readers) == 1: 1724 partition = list(partitions)[0] 1725 if partition != "merged": 1726 rename_term_files(self.pathname, partition, "merged") 1727 1728 def merge_fields(self, interval=FIELD_INTERVAL): 1729 1730 "Merge field dictionaries using the given indexing 'interval'." 1731 1732 readers = [] 1733 partitions = set() 1734 1735 for filename in listdir(self.pathname): 1736 if filename.startswith("fields-"): # 7 character prefix 1737 partition = filename[7:] 1738 readers.append(get_field_reader(self.pathname, partition)) 1739 partitions.add(partition) 1740 1741 # Write directly to a dictionary. 1742 1743 if len(readers) > 1: 1744 if "merged" in partitions: 1745 rename_field_files(self.pathname, "merged", "old-merged") 1746 partitions.remove("merged") 1747 partitions.add("old-merged") 1748 1749 writer = get_field_writer(self.pathname, "merged", interval) 1750 merger = FieldDictionaryMerger(writer, readers) 1751 merger.merge() 1752 merger.close() 1753 1754 # Remove old files. 1755 1756 for partition in partitions: 1757 remove_field_files(self.pathname, partition) 1758 1759 elif len(readers) == 1: 1760 partition = list(partitions)[0] 1761 if partition != "merged": 1762 rename_field_files(self.pathname, partition, "merged") 1763 1764 def close(self): 1765 if self.reader is not None: 1766 self.reader.close() 1767 self.reader = None 1768 if self.writer is not None: 1769 self.writer.close() 1770 self.writer = None 1771 1772 # vim: tabstop=4 expandtab shiftwidth=4