1 #!/usr/bin/env python 2 3 """ 4 A simple (and sane) text indexing library. 5 6 Copyright (C) 2009 Paul Boddie <paul@boddie.org.uk> 7 8 This program is free software; you can redistribute it and/or modify it under 9 the terms of the GNU General Public License as published by the Free Software 10 Foundation; either version 3 of the License, or (at your option) any later 11 version. 12 13 This program is distributed in the hope that it will be useful, but WITHOUT ANY 14 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 15 PARTICULAR PURPOSE. See the GNU General Public License for more details. 16 17 You should have received a copy of the GNU General Public License along 18 with this program. If not, see <http://www.gnu.org/licenses/>. 19 """ 20 21 from os import dup, fdopen # independent iterator access to files 22 from os import listdir, mkdir # index and partition discovery 23 from os import remove, rename # partition manipulation 24 from os.path import exists, join 25 from os.path import commonprefix # to find common string prefixes 26 from bisect import bisect_right # to find terms in the dictionary index 27 from bisect import insort_right # to maintain a sorted list of data for merging 28 import bz2, zlib # for field compression 29 30 try: 31 set 32 except NameError: 33 from sets import Set as set 34 35 # Constants. 36 37 TERM_INTERVAL = 100 38 DOCUMENT_INTERVAL = 100 39 FIELD_INTERVAL = 100 40 FLUSH_INTERVAL = 10000 41 42 TERM_FILENAMES = "terms", "terms_index", "positions", "positions_index" 43 FIELD_FILENAMES = "fields", "fields_index" 44 45 compressors = [("b", bz2.compress), ("z", zlib.compress)] 46 decompressors = {"b" : bz2.decompress, "z" : zlib.decompress} 47 48 # Foundation classes. 49 50 class File: 51 52 "A basic file abstraction." 53 54 def __init__(self, f): 55 self.f = f 56 self.reset() 57 58 def reset(self): 59 60 "To be used to reset the state of the reader or writer between records." 
61 62 pass 63 64 def rewind(self): 65 self.f.seek(0) 66 self.reset() 67 68 def close(self): 69 if self.f is not None: 70 self.f.close() 71 self.f = None 72 73 class FileWriter(File): 74 75 "Writing basic data types to files." 76 77 def write_number(self, number): 78 79 "Write 'number' to the file using a variable length encoding." 80 81 # Negative numbers are not supported. 82 83 if number < 0: 84 raise ValueError, "Number %r is negative." % number 85 86 # Special case: one byte containing a 7-bit number. 87 88 elif number < 128: 89 self.f.write(chr(number)) 90 return 91 92 # Write the number from least to most significant digits. 93 94 bytes = [] 95 96 while number != 0: 97 lsd = number & 127 98 number = number >> 7 99 if number != 0: 100 lsd |= 128 101 bytes.append(chr(lsd)) 102 103 record = "".join(bytes) 104 self.f.write(record) 105 106 def write_string(self, s, compress=0): 107 108 """ 109 Write 's' to the file, recording its length and compressing the string 110 if 'compress' is set to a true value. 111 """ 112 113 # Convert Unicode objects to strings. 114 115 if isinstance(s, unicode): 116 s = s.encode("utf-8") 117 118 # Compress the string if requested. 119 120 if compress: 121 for flag, fn in compressors: 122 cs = fn(s) 123 124 # Take the first string shorter than the original. 125 126 if len(cs) < len(s): 127 s = cs 128 break 129 else: 130 flag = "-" 131 132 # Record whether compression was used. 133 134 self.f.write(flag) 135 136 # Write the length of the data before the data itself. 137 138 length = len(s) 139 self.write_number(length) 140 self.f.write(s) 141 142 class FileReader(File): 143 144 "Reading basic data types from files." 145 146 def read_number(self): 147 148 "Read a number from the file." 149 150 # Read each byte, adding it to the number. 
151 152 shift = 0 153 number = 0 154 more = 1 155 156 while more: 157 byte = self.f.read(1) 158 if not byte: 159 raise EOFError 160 161 csd = ord(byte) 162 more = csd & 128 != 0 163 if more: 164 csd &= 127 165 number += (csd << shift) 166 shift += 7 167 168 return number 169 170 def read_string(self, decompress=0): 171 172 """ 173 Read a string from the file, decompressing the stored data if 174 'decompress' is set to a true value. 175 """ 176 177 # Decompress the data if requested. 178 179 if decompress: 180 flag = self.f.read(1) 181 else: 182 flag = "-" 183 184 length = self.read_number() 185 s = self.f.read(length) 186 187 # Perform decompression if applicable. 188 189 if flag != "-": 190 fn = decompressors[flag] 191 s = fn(s) 192 193 # Convert strings to Unicode objects. 194 195 return unicode(s, "utf-8") 196 197 # Specific classes for storing term and position information. 198 199 class PositionWriter(FileWriter): 200 201 "Writing position information to files." 202 203 def reset(self): 204 self.last_docnum = 0 205 206 def write_positions(self, docnum, positions): 207 208 """ 209 Write for the document 'docnum' the given 'positions'. 210 Return the offset of the written record. 211 """ 212 213 if docnum < self.last_docnum: 214 raise ValueError, "Document number %r is less than previous number %r." % (docnum, self.last_docnum) 215 216 # Record the offset of this record. 217 218 offset = self.f.tell() 219 220 # Write the document number delta. 221 222 self.write_number(docnum - self.last_docnum) 223 224 # Write the number of positions. 225 226 self.write_number(len(positions)) 227 228 # Make sure that the positions are sorted. 229 230 positions.sort() 231 232 # Write the position deltas. 233 234 last = 0 235 for position in positions: 236 pos = position - last 237 self.write_number(pos) 238 last = position 239 240 self.last_docnum = docnum 241 242 return offset 243 244 class PositionReader(FileReader): 245 246 "Reading position information from files." 
247 248 def reset(self): 249 self.last_docnum = 0 250 251 def read_positions(self): 252 253 "Read positions, returning a document number and a list of positions." 254 255 # Read the document number delta and add it to the last number. 256 257 self.last_docnum += self.read_number() 258 259 # Read the number of positions. 260 261 npositions = self.read_number() 262 263 # Read the position deltas, adding each previous position to get the 264 # appropriate collection of absolute positions. 265 266 i = 0 267 last = 0 268 positions = [] 269 270 while i < npositions: 271 last += self.read_number() 272 positions.append(last) 273 i += 1 274 275 return self.last_docnum, positions 276 277 def read_term_positions(self, offset, count): 278 279 """ 280 Read all positions from 'offset', seeking to that position in the file 281 before reading. The number of documents available for reading is limited 282 to 'count'. 283 """ 284 285 # Duplicate the file handle. 286 287 f = fdopen(dup(self.f.fileno()), "rb") 288 f.seek(offset) 289 return PositionIterator(f, count) 290 291 class PositionIndexWriter(FileWriter): 292 293 "Writing position index information to files." 294 295 def reset(self): 296 self.last_docnum = 0 297 self.last_pos_offset = 0 298 299 def write_positions(self, docnum, pos_offset, count): 300 301 """ 302 Write the given 'docnum, 'pos_offset' and document 'count' to the 303 position index file. 304 """ 305 306 # Record the offset of this record. 307 308 offset = self.f.tell() 309 310 # Write the document number delta. 311 312 self.write_number(docnum - self.last_docnum) 313 self.last_docnum = docnum 314 315 # Write the position file offset delta. 316 317 self.write_number(pos_offset - self.last_pos_offset) 318 self.last_pos_offset = pos_offset 319 320 # Write the document count. 321 322 self.write_number(count) 323 324 return offset 325 326 class PositionIndexReader(FileReader): 327 328 "Reading position index information from files." 
329 330 def reset(self): 331 self.last_docnum = 0 332 self.last_pos_offset = 0 333 334 def read_positions(self): 335 336 """ 337 Read a document number, a position file offset for the position index 338 file, and the number of documents in a section of that file. 339 """ 340 341 # Read the document number delta. 342 343 self.last_docnum += self.read_number() 344 345 # Read the offset delta. 346 347 self.last_pos_offset += self.read_number() 348 349 # Read the document count. 350 351 count = self.read_number() 352 353 return self.last_docnum, self.last_pos_offset, count 354 355 def read_term_positions(self, offset, doc_frequency): 356 357 """ 358 Read all positions from 'offset', seeking to that position in the file 359 before reading. The number of documents available for reading is limited 360 to 'doc_frequency'. 361 """ 362 363 # Duplicate the file handle. 364 365 f = fdopen(dup(self.f.fileno()), "rb") 366 f.seek(offset) 367 return PositionIndexIterator(f, doc_frequency) 368 369 # Iterators for position-related files. 370 371 class IteratorBase: 372 373 def __init__(self, count): 374 self.replenish(count) 375 376 def replenish(self, count): 377 self.count = count 378 self.read_documents = 0 379 380 def __len__(self): 381 return self.count 382 383 def sort(self): 384 pass # Stored document positions are already sorted. 385 386 def __iter__(self): 387 return self 388 389 class PositionIterator(PositionReader, IteratorBase): 390 391 "Iterating over document positions." 392 393 def __init__(self, f, count): 394 PositionReader.__init__(self, f) 395 IteratorBase.__init__(self, count) 396 397 def next(self): 398 399 "Read positions for a single document." 400 401 if self.read_documents < self.count: 402 self.read_documents += 1 403 return self.read_positions() 404 else: 405 raise StopIteration 406 407 class PositionIndexIterator(PositionIndexReader, IteratorBase): 408 409 "Iterating over document positions." 
410 411 def __init__(self, f, count): 412 PositionIndexReader.__init__(self, f) 413 IteratorBase.__init__(self, count) 414 self.section_count = 0 415 416 def next(self): 417 418 "Read positions for a single document." 419 420 self.read_documents += self.section_count 421 if self.read_documents < self.count: 422 docnum, pos_offset, self.section_count = t = self.read_positions() 423 return t 424 else: 425 raise StopIteration 426 427 class PositionDictionaryWriter: 428 429 "Writing position dictionaries." 430 431 def __init__(self, position_writer, position_index_writer, interval): 432 self.position_writer = position_writer 433 self.position_index_writer = position_index_writer 434 self.interval = interval 435 436 def write_term_positions(self, doc_positions): 437 438 """ 439 Write all 'doc_positions' - a collection of tuples of the form (document 440 number, position list) - to the file. 441 442 Add some records to the index, making dictionary entries. 443 444 Return a tuple containing the offset of the written data, the frequency 445 (number of positions), and document frequency (number of documents) for 446 the term involved. 447 """ 448 449 # Reset the writers. 450 451 self.position_writer.reset() 452 self.position_index_writer.reset() 453 454 index_offset = None 455 456 # Write the positions. 457 458 frequency = 0 459 first_docnum = None 460 first_offset = None 461 count = 0 462 463 doc_positions.sort() 464 465 for docnum, positions in doc_positions: 466 pos_offset = self.position_writer.write_positions(docnum, positions) 467 468 # Retain the first record offset for a subsequent index entry. 469 470 if first_offset is None: 471 first_offset = pos_offset 472 first_docnum = docnum 473 474 frequency += len(positions) 475 count += 1 476 477 # Every {interval} entries, write an index entry. 478 479 if count == self.interval: 480 io = self.position_index_writer.write_positions(first_docnum, first_offset, self.interval) 481 482 # Remember the first index entry offset. 
483 484 if index_offset is None: 485 index_offset = io 486 487 first_offset = None 488 first_docnum = None 489 count = 0 490 491 # Reset the position writer so that position readers accessing 492 # a section start with the correct document number. 493 494 self.position_writer.reset() 495 496 # Finish writing an index entry for the remaining documents. 497 498 else: 499 if first_offset is not None: 500 io = self.position_index_writer.write_positions(first_docnum, first_offset, count) 501 502 # Remember the first index entry offset. 503 504 if index_offset is None: 505 index_offset = io 506 507 return index_offset, frequency, len(doc_positions) 508 509 def close(self): 510 self.position_writer.close() 511 self.position_index_writer.close() 512 513 class PositionDictionaryReader: 514 515 "Reading position dictionaries." 516 517 def __init__(self, position_reader, position_index_reader): 518 self.position_reader = position_reader 519 self.position_index_reader = position_index_reader 520 521 def read_term_positions(self, offset, doc_frequency): 522 523 """ 524 Return an iterator for dictionary entries starting at 'offset' with the 525 given 'doc_frequency'. 526 """ 527 528 return PositionDictionaryIterator(self.position_reader, 529 self.position_index_reader, offset, doc_frequency) 530 531 def close(self): 532 self.position_reader.close() 533 self.position_index_reader.close() 534 535 class PositionDictionaryIterator: 536 537 "Iteration over position dictionary entries." 538 539 def __init__(self, position_reader, position_index_reader, offset, doc_frequency): 540 self.position_reader = position_reader 541 self.doc_frequency = doc_frequency 542 self.index_iterator = position_index_reader.read_term_positions(offset, doc_frequency) 543 544 # Remember the last values. 545 546 self.found_docnum, self.found_positions = None, None 547 548 # Maintain state for the next index entry, if read. 
549 550 self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None 551 552 # Initialise the current index entry and current position file iterator. 553 554 self._next_section() 555 self._init_section() 556 557 def __len__(self): 558 return self.doc_frequency 559 560 def sort(self): 561 pass 562 563 def __iter__(self): 564 return self 565 566 def next(self): 567 568 """ 569 Attempt to get the next document record from the section in the 570 positions file. 571 """ 572 573 # Return any visited but unrequested record. 574 575 if self.found_docnum is not None: 576 t = self.found_docnum, self.found_positions 577 self.found_docnum, self.found_positions = None, None 578 return t 579 580 # Or search for the next record. 581 582 while 1: 583 584 # Either return the next record. 585 586 try: 587 return self.iterator.next() 588 589 # Or, where a section is finished, get the next section and try again. 590 591 except StopIteration: 592 593 # Where a section follows, update the index iterator, but keep 594 # reading using the same file iterator (since the data should 595 # just follow on from the last section). 596 597 self._next_section() 598 self.iterator.replenish(self.section_count) 599 600 # Reset the state of the iterator to make sure that document 601 # numbers are correct. 602 603 self.iterator.reset() 604 605 def from_document(self, docnum): 606 607 """ 608 Attempt to navigate to a positions entry for the given 'docnum', 609 returning the positions for 'docnum', or None otherwise. 610 """ 611 612 # Return any unrequested document positions. 613 614 if docnum == self.found_docnum: 615 return self.found_positions 616 617 # Read ahead in the index until the next entry refers to a document 618 # later than the desired document. 
619 620 try: 621 if self.next_docnum is None: 622 self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next() 623 624 # Read until the next entry is after the desired document number, 625 # or until the end of the results. 626 627 while self.next_docnum <= docnum: 628 self._next_read_section() 629 if self.docnum < docnum: 630 self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next() 631 else: 632 break 633 634 except StopIteration: 635 pass 636 637 # Navigate in the position file to the document. 638 639 self._init_section() 640 641 try: 642 while 1: 643 found_docnum, found_positions = self.iterator.next() 644 645 # Return the desired document positions or None (retaining the 646 # positions for the document immediately after). 647 648 if docnum == found_docnum: 649 return found_positions 650 elif docnum < found_docnum: 651 self.found_docnum, self.found_positions = found_docnum, found_positions 652 return None 653 654 except StopIteration: 655 return None 656 657 # Internal methods. 658 659 def _next_section(self): 660 661 "Attempt to get the next section in the index." 662 663 if self.next_docnum is None: 664 self.docnum, self.pos_offset, self.section_count = self.index_iterator.next() 665 else: 666 self._next_read_section() 667 668 def _next_read_section(self): 669 670 """ 671 Make the next index entry the current one without reading from the 672 index. 673 """ 674 675 self.docnum, self.pos_offset, self.section_count = self.next_docnum, self.next_pos_offset, self.next_section_count 676 self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None 677 678 def _init_section(self): 679 680 "Initialise the iterator for the section in the position file." 681 682 self.iterator = self.position_reader.read_term_positions(self.pos_offset, self.section_count) 683 684 class TermWriter(FileWriter): 685 686 "Writing term information to files." 
687 688 def reset(self): 689 self.last_term = "" 690 self.last_offset = 0 691 692 def write_term(self, term, offset, frequency, doc_frequency): 693 694 """ 695 Write the given 'term', its position file 'offset', its 'frequency' and 696 its 'doc_frequency' (number of documents in which it appears) to the 697 term information file. Return the offset after the term information was 698 written to the file. 699 """ 700 701 # Write the prefix length and term suffix. 702 703 common = len(commonprefix([self.last_term, term])) 704 suffix = term[common:] 705 706 self.write_number(common) 707 self.write_string(suffix) 708 709 # Write the offset delta. 710 711 self.write_number(offset - self.last_offset) 712 713 # Write the frequency. 714 715 self.write_number(frequency) 716 717 # Write the document frequency. 718 719 self.write_number(doc_frequency) 720 721 self.last_term = term 722 self.last_offset = offset 723 724 return self.f.tell() 725 726 class TermReader(FileReader): 727 728 "Reading term information from files." 729 730 def reset(self): 731 self.last_term = "" 732 self.last_offset = 0 733 734 def read_term(self): 735 736 """ 737 Read a term, its position file offset, its frequency and its document 738 frequency from the term information file. 739 """ 740 741 # Read the prefix length and term suffix. 742 743 common = self.read_number() 744 suffix = self.read_string() 745 746 self.last_term = self.last_term[:common] + suffix 747 748 # Read the offset delta. 749 750 self.last_offset += self.read_number() 751 752 # Read the frequency. 753 754 frequency = self.read_number() 755 756 # Read the document frequency. 757 758 doc_frequency = self.read_number() 759 760 return self.last_term, self.last_offset, frequency, doc_frequency 761 762 def go_to_term(self, term, offset, info_offset): 763 764 """ 765 Seek past the entry for 'term' having 'offset' to 'info_offset'. This 766 permits the scanning for later terms from the specified term. 
767 """ 768 769 self.f.seek(info_offset) 770 self.last_term = term 771 self.last_offset = offset 772 773 class TermIndexWriter(TermWriter): 774 775 "Writing term dictionary index details to files." 776 777 def reset(self): 778 TermWriter.reset(self) 779 self.last_info_offset = 0 780 781 def write_term(self, term, offset, frequency, doc_frequency, info_offset): 782 783 """ 784 Write the given 'term', its position file 'offset', its 'frequency' and 785 its 'doc_frequency' to the term dictionary index file, along with the 786 'info_offset' in the term information file. 787 """ 788 789 TermWriter.write_term(self, term, offset, frequency, doc_frequency) 790 791 # Write the information file offset delta. 792 793 self.write_number(info_offset - self.last_info_offset) 794 self.last_info_offset = info_offset 795 796 class TermIndexReader(TermReader): 797 798 "Reading term dictionary index details from files." 799 800 def reset(self): 801 TermReader.reset(self) 802 self.last_info_offset = 0 803 804 def read_term(self): 805 806 """ 807 Read a term, its position file offset, its frequency, its document 808 frequency and a term information file offset from the term dictionary 809 index file. 810 """ 811 812 term, offset, frequency, doc_frequency = TermReader.read_term(self) 813 814 # Read the offset delta. 815 816 self.last_info_offset += self.read_number() 817 818 return term, offset, frequency, doc_frequency, self.last_info_offset 819 820 class TermDictionaryWriter: 821 822 "Writing term dictionaries." 
823 824 def __init__(self, info_writer, index_writer, position_dict_writer, interval): 825 self.info_writer = info_writer 826 self.index_writer = index_writer 827 self.position_dict_writer = position_dict_writer 828 self.interval = interval 829 self.entry = 0 830 831 def _write_term(self, term, offset, frequency, doc_frequency): 832 833 """ 834 Write the given 'term', its position file 'offset', its 'frequency' and 835 its 'doc_frequency' (number of documents in which it appears) to the 836 term information file. Return the offset after the term information was 837 written to the file. 838 """ 839 840 info_offset = self.info_writer.write_term(term, offset, frequency, doc_frequency) 841 842 if self.entry % self.interval == 0: 843 self.index_writer.write_term(term, offset, frequency, doc_frequency, info_offset) 844 845 self.entry += 1 846 847 def write_term_positions(self, term, doc_positions): 848 849 """ 850 Write the given 'term' and the 'doc_positions' recording the documents 851 and positions at which the term is found. 852 """ 853 854 offset, frequency, doc_frequency = self.position_dict_writer.write_term_positions(doc_positions) 855 self._write_term(term, offset, frequency, doc_frequency) 856 857 def close(self): 858 self.info_writer.close() 859 self.index_writer.close() 860 self.position_dict_writer.close() 861 862 class TermDictionaryReader: 863 864 "Reading term dictionaries." 865 866 def __init__(self, info_reader, index_reader, position_dict_reader): 867 self.info_reader = info_reader 868 self.index_reader = index_reader 869 self.position_dict_reader = position_dict_reader 870 871 self.terms = [] 872 try: 873 while 1: 874 self.terms.append(self.index_reader.read_term()) 875 except EOFError: 876 pass 877 878 # Large numbers for ordering purposes. 
879 880 if self.terms: 881 self.max_offset = self.terms[-1][1] + 1 882 else: 883 self.max_offset = None 884 885 def _find_closest_entry(self, term): 886 887 """ 888 Find the offsets and frequencies of 'term' from the term dictionary or 889 the closest term starting with the value of 'term'. 890 891 Return the closest index entry consisting of a term, the position file 892 offset, the term frequency, the document frequency, and the term details 893 file offset. 894 """ 895 896 i = bisect_right(self.terms, (term, self.max_offset, 0, 0)) - 1 897 898 # Get the entry position providing the term or one preceding it. 899 # If no entry precedes the requested term, return the very first entry 900 # as the closest. 901 902 if i == -1: 903 return self.terms[0] 904 else: 905 return self.terms[i] 906 907 def _find_closest_term(self, term): 908 909 """ 910 Find the offsets and frequencies of 'term' from the term dictionary or 911 the closest term starting with the value of 'term'. 912 913 Return the closest term (or the term itself), the position file offset, 914 the term frequency, the document frequency, and the term details file 915 offset (or None if the reader is already positioned). 916 """ 917 918 found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_entry(term) 919 920 # Where the term is found immediately, return the offset and 921 # frequencies. If the term does not appear, return the details of the 922 # closest entry. 923 924 if term <= found_term: 925 return found_term, offset, frequency, doc_frequency, info_offset 926 927 # Otherwise, seek past the index term's entry in the information file 928 # and scan for the desired term. 
929 930 else: 931 self.info_reader.go_to_term(found_term, offset, info_offset) 932 try: 933 while term > found_term: 934 found_term, offset, frequency, doc_frequency = self.info_reader.read_term() 935 except EOFError: 936 pass 937 938 return found_term, offset, frequency, doc_frequency, None 939 940 def _find_term(self, term): 941 942 """ 943 Find the position file offset and frequency of 'term' from the term 944 dictionary. 945 """ 946 947 found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_term(term) 948 949 # If the term is found, return the offset and frequencies. 950 951 if term == found_term: 952 return offset, frequency, doc_frequency 953 else: 954 return None 955 956 def _get_positions(self, offset, doc_frequency): 957 return self.position_dict_reader.read_term_positions(offset, doc_frequency) 958 959 # Sequential access methods. 960 961 def rewind(self): 962 self.info_reader.rewind() 963 964 def read_term(self): 965 966 """ 967 Return the next term, its frequency, its document frequency, and the 968 documents and positions at which the term is found. 969 """ 970 971 term, offset, frequency, doc_frequency = self.info_reader.read_term() 972 positions = self._get_positions(offset, doc_frequency) 973 return term, frequency, doc_frequency, positions 974 975 # Query methods. 976 977 def find_terms(self, term): 978 979 "Return all terms whose values start with the value of 'term'." 980 981 terms = [] 982 983 found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_term(term) 984 985 # Position the reader, if necessary. 986 987 if info_offset is not None: 988 self.info_reader.go_to_term(found_term, offset, info_offset) 989 990 # Read and record terms. 991 992 try: 993 # Add the found term if it starts with the specified term. 
994 995 while found_term.startswith(term): 996 terms.append(found_term) 997 found_term, offset, frequency, doc_frequency = self.info_reader.read_term() 998 999 except EOFError: 1000 pass 1001 1002 return terms 1003 1004 def find_positions(self, term): 1005 1006 "Return the documents and positions at which the given 'term' is found." 1007 1008 t = self._find_term(term) 1009 if t is None: 1010 return None 1011 else: 1012 offset, frequency, doc_frequency = t 1013 return self._get_positions(offset, doc_frequency) 1014 1015 def get_frequency(self, term): 1016 1017 "Return the frequency of the given 'term'." 1018 1019 t = self._find_term(term) 1020 if t is None: 1021 return None 1022 else: 1023 offset, frequency, doc_frequency = t 1024 return frequency 1025 1026 def get_document_frequency(self, term): 1027 1028 "Return the document frequency of the given 'term'." 1029 1030 t = self._find_term(term) 1031 if t is None: 1032 return None 1033 else: 1034 offset, frequency, doc_frequency = t 1035 return doc_frequency 1036 1037 def close(self): 1038 self.info_reader.close() 1039 self.index_reader.close() 1040 self.position_dict_reader.close() 1041 1042 # Specific classes for storing document information. 1043 1044 class FieldWriter(FileWriter): 1045 1046 "Writing field data to files." 1047 1048 def reset(self): 1049 self.last_docnum = 0 1050 1051 def write_fields(self, docnum, fields): 1052 1053 """ 1054 Write for the given 'docnum', a list of 'fields' (integer, string pairs 1055 representing field identifiers and values respectively). 1056 Return the offset at which the fields are stored. 1057 """ 1058 1059 offset = self.f.tell() 1060 1061 # Write the document number delta. 1062 1063 self.write_number(docnum - self.last_docnum) 1064 1065 # Write the number of fields. 1066 1067 self.write_number(len(fields)) 1068 1069 # Write the fields themselves. 
1070 1071 for i, field in fields: 1072 self.write_number(i) 1073 self.write_string(field, 1) # compress 1074 1075 self.last_docnum = docnum 1076 return offset 1077 1078 class FieldReader(FileReader): 1079 1080 "Reading field data from files." 1081 1082 def reset(self): 1083 self.last_docnum = 0 1084 1085 def read_fields(self): 1086 1087 """ 1088 Read fields from the file, returning a tuple containing the document 1089 number and a list of field (identifier, value) pairs. 1090 """ 1091 1092 # Read the document number. 1093 1094 self.last_docnum += self.read_number() 1095 1096 # Read the number of fields. 1097 1098 nfields = self.read_number() 1099 1100 # Collect the fields. 1101 1102 fields = [] 1103 i = 0 1104 1105 while i < nfields: 1106 identifier = self.read_number() 1107 value = self.read_string(1) # decompress 1108 fields.append((identifier, value)) 1109 i += 1 1110 1111 return self.last_docnum, fields 1112 1113 def read_document_fields(self, docnum, offset): 1114 1115 """ 1116 Read fields for 'docnum' at the given 'offset'. This permits the 1117 retrieval of details for the specified document, as well as scanning for 1118 later documents. 1119 """ 1120 1121 self.f.seek(offset) 1122 bad_docnum, fields = self.read_fields() 1123 self.last_docnum = docnum 1124 return docnum, fields 1125 1126 class FieldIndexWriter(FileWriter): 1127 1128 "Writing field index details to files." 1129 1130 def reset(self): 1131 self.last_docnum = 0 1132 self.last_offset = 0 1133 1134 def write_document(self, docnum, offset): 1135 1136 """ 1137 Write for the given 'docnum', the 'offset' at which the fields for the 1138 document are stored in the fields file. 1139 """ 1140 1141 # Write the document number and offset deltas. 
1142 1143 self.write_number(docnum - self.last_docnum) 1144 self.write_number(offset - self.last_offset) 1145 1146 self.last_docnum = docnum 1147 self.last_offset = offset 1148 1149 class FieldIndexReader(FileReader): 1150 1151 "Reading field index details from files." 1152 1153 def reset(self): 1154 self.last_docnum = 0 1155 self.last_offset = 0 1156 1157 def read_document(self): 1158 1159 "Read a document number and field file offset." 1160 1161 # Read the document number delta and offset. 1162 1163 self.last_docnum += self.read_number() 1164 self.last_offset += self.read_number() 1165 1166 return self.last_docnum, self.last_offset 1167 1168 class FieldDictionaryWriter: 1169 1170 "Writing field dictionary details." 1171 1172 def __init__(self, field_writer, field_index_writer, interval): 1173 self.field_writer = field_writer 1174 self.field_index_writer = field_index_writer 1175 self.interval = interval 1176 self.entry = 0 1177 1178 def write_fields(self, docnum, fields): 1179 1180 "Write details of the document with the given 'docnum' and 'fields'." 1181 1182 offset = self.field_writer.write_fields(docnum, fields) 1183 1184 if self.entry % self.interval == 0: 1185 self.field_index_writer.write_document(docnum, offset) 1186 1187 self.entry += 1 1188 1189 def close(self): 1190 self.field_writer.close() 1191 self.field_index_writer.close() 1192 1193 class FieldDictionaryReader: 1194 1195 "Reading field dictionary details." 1196 1197 def __init__(self, field_reader, field_index_reader): 1198 self.field_reader = field_reader 1199 self.field_index_reader = field_index_reader 1200 1201 self.docs = [] 1202 try: 1203 while 1: 1204 self.docs.append(self.field_index_reader.read_document()) 1205 except EOFError: 1206 pass 1207 1208 # Large numbers for ordering purposes. 
1209 1210 if self.docs: 1211 self.max_offset = self.docs[-1][1] 1212 else: 1213 self.max_offset = None 1214 1215 def rewind(self): 1216 self.field_reader.rewind() 1217 1218 def read_fields(self): 1219 1220 "Return the next document number and fields." 1221 1222 return self.field_reader.read_fields() 1223 1224 def get_fields(self, docnum): 1225 1226 "Read the fields of the document with the given 'docnum'." 1227 1228 i = bisect_right(self.docs, (docnum, self.max_offset)) - 1 1229 1230 # Get the entry position providing the term or one preceding it. 1231 1232 if i == -1: 1233 return None 1234 1235 found_docnum, offset = self.docs[i] 1236 1237 # Read from the fields file. 1238 1239 found_docnum, fields = self.field_reader.read_document_fields(found_docnum, offset) 1240 1241 # Scan for the document, if necessary. 1242 1243 try: 1244 while docnum > found_docnum: 1245 found_docnum, fields = self.field_reader.read_fields() 1246 except EOFError: 1247 pass 1248 1249 # If the document is found, return the fields. 1250 1251 if docnum == found_docnum: 1252 return fields 1253 else: 1254 return None 1255 1256 def close(self): 1257 self.field_reader.close() 1258 self.field_index_reader.close() 1259 1260 # Dictionary merging classes. 1261 1262 class Merger: 1263 1264 "Merge files." 1265 1266 def __init__(self, writer, readers): 1267 self.writer = writer 1268 self.readers = readers 1269 1270 def close(self): 1271 for reader in self.readers: 1272 reader.close() 1273 self.writer.close() 1274 1275 class TermDictionaryMerger(Merger): 1276 1277 "Merge term and position files." 1278 1279 def merge(self): 1280 1281 """ 1282 Merge terms and positions from the readers, sending them to the writer. 1283 """ 1284 1285 entries = [] 1286 1287 # Get the first entries from the readers. 

        # Prime 'entries' with the first term from each partition; 'entries'
        # is kept sorted by term so the smallest term is always at the front.

        for partition, reader in enumerate(self.readers):
            reader.rewind()

            try:
                term, frequency, doc_frequency, positions = reader.read_term()
                insort_right(entries, (term, positions, partition))
            except EOFError:
                pass

        # While entries are available, write them out in order, merging where
        # appropriate.

        while entries:
            term, doc_positions, partition = entries[0]
            to_update = [partition]

            nentries = len(entries)
            i = 1

            # Find other entries for the same term: since 'entries' is sorted,
            # they are adjacent to the first entry.

            while i < nentries:
                other_term, other_doc_positions, other_partition = entries[i]

                # For such entries, merge the positions.

                if other_term == term:
                    doc_positions = self.merge_positions(doc_positions, other_doc_positions)
                    to_update.append(other_partition)
                    i += 1
                else:
                    break

            # Write the combined term details.

            self.writer.write_term_positions(term, doc_positions)

            # Update the entries from the affected readers: each consumed
            # partition contributes its next term, if it has one.

            del entries[:i]

            for partition in to_update:
                try:
                    term, frequency, doc_frequency, positions = self.readers[partition].read_term()
                    insort_right(entries, (term, positions, partition))
                except EOFError:
                    pass

    def merge_positions(self, doc_positions, other_doc_positions):

        """
        Merge 'doc_positions' with 'other_doc_positions' so that common document
        records contain positions from both collections.
        """

        doc_position_dict = dict(doc_positions)

        # Concatenate position lists for documents appearing in both
        # collections; adopt the other collection's records otherwise.

        for docnum, positions in other_doc_positions:
            if doc_position_dict.has_key(docnum):
                doc_position_dict[docnum] += positions
            else:
                doc_position_dict[docnum] = positions

        return doc_position_dict.items()

class FieldDictionaryMerger(Merger):

    "Merge field files."

    def merge(self):

        """
        Merge fields from the readers, sending them to the writer.
        """

        entries = []

        # Get the first entries from the readers, keeping 'entries' sorted
        # by document number.

        for partition, reader in enumerate(self.readers):
            reader.rewind()

            try:
                docnum, fields = reader.read_fields()
                insort_right(entries, (docnum, fields, partition))
            except EOFError:
                pass

        # While entries are available, write them out in order, merging where
        # appropriate.

        while entries:
            docnum, fields, partition = entries[0]
            to_update = [partition]

            nentries = len(entries)
            i = 1

            # Find other entries for the same document number.

            while i < nentries:
                other_docnum, other_fields, other_partition = entries[i]

                # For such entries, combine the field lists.

                if other_docnum == docnum:
                    fields += other_fields
                    to_update.append(other_partition)
                    i += 1
                else:
                    break

            # Write the combined field details.

            self.writer.write_fields(docnum, fields)

            # Update the entries from the affected readers.

            del entries[:i]

            for partition in to_update:
                try:
                    docnum, fields = self.readers[partition].read_fields()
                    insort_right(entries, (docnum, fields, partition))
                except EOFError:
                    pass

# Utility functions.

def get_term_writer(pathname, partition, interval, doc_interval):

    """
    Return a term dictionary writer using files under the given 'pathname'
    labelled according to the given 'partition', using the given indexing
    'interval' for terms and 'doc_interval' for document position records.
    """

    # Terms file and its index.

    tdf = open(join(pathname, "terms-%s" % partition), "wb")
    info_writer = TermWriter(tdf)

    tdif = open(join(pathname, "terms_index-%s" % partition), "wb")
    index_writer = TermIndexWriter(tdif)

    # Positions file and its index.

    tpf = open(join(pathname, "positions-%s" % partition), "wb")
    positions_writer = PositionWriter(tpf)

    tpif = open(join(pathname, "positions_index-%s" % partition), "wb")
    positions_index_writer = PositionIndexWriter(tpif)

    positions_dict_writer = PositionDictionaryWriter(positions_writer, positions_index_writer, doc_interval)

    return TermDictionaryWriter(info_writer, index_writer, positions_dict_writer, interval)

def get_field_writer(pathname, partition, interval):

    """
    Return a field dictionary writer using files under the given 'pathname'
    labelled according to the given 'partition', using the given indexing
    'interval'.
    """

    ff = open(join(pathname, "fields-%s" % partition), "wb")
    field_writer = FieldWriter(ff)

    fif = open(join(pathname, "fields_index-%s" % partition), "wb")
    field_index_writer = FieldIndexWriter(fif)

    return FieldDictionaryWriter(field_writer, field_index_writer, interval)

def get_term_reader(pathname, partition):

    """
    Return a term dictionary reader using files under the given 'pathname'
    labelled according to the given 'partition'.
    """

    # Terms file and its index.

    tdf = open(join(pathname, "terms-%s" % partition), "rb")
    info_reader = TermReader(tdf)

    tdif = open(join(pathname, "terms_index-%s" % partition), "rb")
    index_reader = TermIndexReader(tdif)

    # Positions file and its index.

    tpf = open(join(pathname, "positions-%s" % partition), "rb")
    positions_reader = PositionReader(tpf)

    tpif = open(join(pathname, "positions_index-%s" % partition), "rb")
    positions_index_reader = PositionIndexReader(tpif)

    positions_dict_reader = PositionDictionaryReader(positions_reader, positions_index_reader)

    return TermDictionaryReader(info_reader, index_reader, positions_dict_reader)

def get_field_reader(pathname, partition):

    """
    Return a field dictionary reader using files under the given 'pathname'
    labelled according to the given 'partition'.
    """

    ff = open(join(pathname, "fields-%s" % partition), "rb")
    field_reader = FieldReader(ff)

    fif = open(join(pathname, "fields_index-%s" % partition), "rb")
    field_index_reader = FieldIndexReader(fif)

    return FieldDictionaryReader(field_reader, field_index_reader)

def rename_files(pathname, names, from_partition, to_partition):
    # Rename each file "<name>-<from_partition>" to "<name>-<to_partition>".
    for name in names:
        rename(join(pathname, "%s-%s" % (name, from_partition)), join(pathname, "%s-%s" % (name, to_partition)))

def rename_term_files(pathname, from_partition, to_partition):
    # Rename all term dictionary files between partition labels.
    rename_files(pathname, TERM_FILENAMES, from_partition, to_partition)

def rename_field_files(pathname, from_partition, to_partition):
    # Rename all field dictionary files between partition labels.
    rename_files(pathname, FIELD_FILENAMES, from_partition, to_partition)

def remove_files(pathname, names, partition):
    # Remove each file "<name>-<partition>" under 'pathname'.
    for name in names:
        remove(join(pathname, "%s-%s" % (name, partition)))

def remove_term_files(pathname, partition):
    # Remove all term dictionary files for the given partition.
    remove_files(pathname, TERM_FILENAMES, partition)

def remove_field_files(pathname, partition):
    # Remove all field dictionary files for the given partition.
    remove_files(pathname,
 FIELD_FILENAMES, partition)

# High-level classes.

class Document:

    "A container of document information."

    def __init__(self, docnum):
        # 'docnum' identifies the document; fields and term positions are
        # accumulated via add_field/set_fields and add_position.
        self.docnum = docnum
        self.fields = []
        self.terms = {}

    def add_position(self, term, position):

        """
        Add a position entry for the given 'term', indicating the given
        'position'.
        """

        self.terms.setdefault(term, []).append(position)

    def add_field(self, identifier, value):

        "Add a field having the given 'identifier' and 'value'."

        self.fields.append((identifier, unicode(value))) # convert to a Unicode string

    def set_fields(self, fields):

        """
        Set the document's 'fields': a list of tuples each containing an integer
        identifier and a string value.
        """

        self.fields = fields

class IndexWriter:

    """
    Building term information and writing it to the term and field dictionaries.
    """

    def __init__(self, pathname, interval, doc_interval, flush_interval):
        # 'flush_interval' is the number of documents accumulated in memory
        # before being flushed to a new partition on disk.
        self.pathname = pathname
        self.interval = interval
        self.doc_interval = doc_interval
        self.flush_interval = flush_interval

        # Each flush writes a new numbered partition.

        self.dict_partition = 0
        self.field_dict_partition = 0

        # In-memory accumulators: term -> docnum -> positions, and
        # docnum -> fields.

        self.terms = {}
        self.docs = {}

        self.doc_counter = 0

    def add_document(self, doc):

        """
        Add the given document 'doc', updating the document counter and flushing
        terms and fields if appropriate.
        """

        for term, positions in doc.terms.items():
            self.terms.setdefault(term, {})[doc.docnum] = positions

        self.docs[doc.docnum] = doc.fields

        # Flush accumulated data once enough documents have been added.

        self.doc_counter += 1
        if self.flush_interval and self.doc_counter >= self.flush_interval:
            self.flush_terms()
            self.flush_fields()
            self.doc_counter = 0

    def get_term_writer(self):

        "Return a term dictionary writer for the current partition."

        return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval)

    def get_field_writer(self):

        "Return a field dictionary writer for the current partition."

        return get_field_writer(self.pathname, self.field_dict_partition, self.interval)

    def flush_terms(self):

        "Flush terms into the current term dictionary partition."

        # Get the terms in order.

        terms = self.terms.items()
        terms.sort()

        dict_writer = self.get_term_writer()

        for term, doc_positions in terms:
            doc_positions = doc_positions.items()
            dict_writer.write_term_positions(term, doc_positions)

        dict_writer.close()

        # Discard the flushed terms and advance to the next partition.

        self.terms = {}
        self.dict_partition += 1

    def flush_fields(self):

        "Flush fields into the current field dictionary partition."

        # Get the documents in order.

        docs = self.docs.items()
        docs.sort()

        field_dict_writer = self.get_field_writer()

        for docnum, fields in docs:
            field_dict_writer.write_fields(docnum, fields)

        field_dict_writer.close()

        # Discard the flushed documents and advance to the next partition.

        self.docs = {}
        self.field_dict_partition += 1

    def close(self):
        # Flush any remaining accumulated data before finishing.
        if self.terms:
            self.flush_terms()
        if self.docs:
            self.flush_fields()

class IndexReader:

    "Accessing the term and field dictionaries."

    def __init__(self, pathname):
        # Readers operate on the single "merged" partition under 'pathname'.
        self.dict_reader = get_term_reader(pathname, "merged")
        self.field_dict_reader = get_field_reader(pathname, "merged")

    def find_terms(self, term):
        # Delegate term lookups to the term dictionary reader.
        return self.dict_reader.find_terms(term)

    def find_positions(self, term):
        return self.dict_reader.find_positions(term)

    def get_frequency(self, term):
        return self.dict_reader.get_frequency(term)

    def get_document_frequency(self, term):
        return self.dict_reader.get_document_frequency(term)

    def get_fields(self, docnum):
        # Delegate field lookups to the field dictionary reader.
        return self.field_dict_reader.get_fields(docnum)

    def close(self):
        self.dict_reader.close()
        self.field_dict_reader.close()

class Index:

    "An inverted index solution encapsulating the various components."

    def __init__(self, pathname):
        # 'pathname' is the directory holding all index files.
        self.pathname = pathname
        self.reader = None
        self.writer = None

    def get_writer(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, flush_interval=FLUSH_INTERVAL):

        """
        Return a writer, optionally using the given indexing 'interval',
        'doc_interval' and 'flush_interval'.
        """

        # Create the index directory if necessary.

        if not exists(self.pathname):
            mkdir(self.pathname)

        self.writer = IndexWriter(self.pathname, interval, doc_interval, flush_interval)
        return self.writer

    def get_reader(self, partition=0):

        "Return a reader for the index."

        # Ensure that only one partition exists.

        self.merge()
        return self._get_reader(partition)

    def _get_reader(self, partition):

        "Return a reader for the index."

        if not exists(self.pathname):
            raise OSError, "Index path %r does not exist." % self.pathname

        self.reader = IndexReader(self.pathname)
        return self.reader

    def merge(self):

        "Merge/optimise index partitions."

        self.merge_terms()
        self.merge_fields()

    def merge_terms(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL):

        """
        Merge term dictionaries using the given indexing 'interval' and
        'doc_interval'.
        """

        readers = []
        partitions = set()

        # Discover term dictionary partitions from the filenames.

        for filename in listdir(self.pathname):
            if filename.startswith("terms-"): # 6 character prefix
                partition = filename[6:]
                readers.append(get_term_reader(self.pathname, partition))
                partitions.add(partition)

        # Write directly to a dictionary.

        if len(readers) > 1:

            # Move aside any previous merged dictionary so that the new
            # merged dictionary can take its name.

            if "merged" in partitions:
                rename_term_files(self.pathname, "merged", "old-merged")
                partitions.remove("merged")
                partitions.add("old-merged")

            writer = get_term_writer(self.pathname, "merged", interval, doc_interval)
            merger = TermDictionaryMerger(writer, readers)
            merger.merge()
            merger.close()

            # Remove old files.

            for partition in partitions:
                remove_term_files(self.pathname, partition)

        elif len(readers) == 1:
            # A single partition only needs renaming into place.
            partition = list(partitions)[0]
            if partition != "merged":
                rename_term_files(self.pathname, partition, "merged")

    def merge_fields(self, interval=FIELD_INTERVAL):

        "Merge field dictionaries using the given indexing 'interval'."

        readers = []
        partitions = set()

        # Discover field dictionary partitions from the filenames.

        for filename in listdir(self.pathname):
            if filename.startswith("fields-"): # 7 character prefix
                partition = filename[7:]
                readers.append(get_field_reader(self.pathname, partition))
                partitions.add(partition)

        # Write directly to a dictionary.

        if len(readers) > 1:

            # Move aside any previous merged dictionary so that the new
            # merged dictionary can take its name.

            if "merged" in partitions:
                rename_field_files(self.pathname, "merged", "old-merged")
                partitions.remove("merged")
                partitions.add("old-merged")

            writer = get_field_writer(self.pathname, "merged", interval)
            merger = FieldDictionaryMerger(writer, readers)
            merger.merge()
            merger.close()

            # Remove old files.

            for partition in partitions:
                remove_field_files(self.pathname, partition)

        elif len(readers) == 1:
            # A single partition only needs renaming into place.
            partition = list(partitions)[0]
            if partition != "merged":
                rename_field_files(self.pathname, partition, "merged")

    def close(self):
        # Close any open reader and writer; closing the writer flushes any
        # outstanding terms and fields.
        if self.reader is not None:
            self.reader.close()
            self.reader = None
        if self.writer is not None:
            self.writer.close()
            self.writer = None

# vim: tabstop=4 expandtab shiftwidth=4