1 #!/usr/bin/env python 2 3 """ 4 A simple (and sane) text indexing library. 5 6 Copyright (C) 2009 Paul Boddie <paul@boddie.org.uk> 7 8 This program is free software; you can redistribute it and/or modify it under 9 the terms of the GNU General Public License as published by the Free Software 10 Foundation; either version 3 of the License, or (at your option) any later 11 version. 12 13 This program is distributed in the hope that it will be useful, but WITHOUT ANY 14 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 15 PARTICULAR PURPOSE. See the GNU General Public License for more details. 16 17 You should have received a copy of the GNU General Public License along 18 with this program. If not, see <http://www.gnu.org/licenses/>. 19 """ 20 21 from os import dup, fdopen # independent iterator access to files 22 from os import listdir, mkdir # index and partition discovery 23 from os import remove, rename # partition manipulation 24 from os.path import exists, join 25 from os.path import commonprefix # to find common string prefixes 26 from bisect import bisect_right # to find terms in the dictionary index 27 from bisect import insort_right # to maintain a sorted list of data for merging 28 import bz2, zlib # for field compression 29 30 try: 31 set 32 except NameError: 33 from sets import Set as set 34 35 # Constants. 36 37 TERM_INTERVAL = 100 38 DOCUMENT_INTERVAL = 100 39 FIELD_INTERVAL = 100 40 FLUSH_INTERVAL = 100000 41 42 TERM_FILENAMES = "terms", "terms_index", "positions", "positions_index" 43 FIELD_FILENAMES = "fields", "fields_index" 44 45 compressors = [("b", bz2.compress), ("z", zlib.compress)] 46 decompressors = {"b" : bz2.decompress, "z" : zlib.decompress} 47 48 # Foundation classes. 49 50 class File: 51 52 "A basic file abstraction." 53 54 def __init__(self, f): 55 self.f = f 56 self.reset() 57 58 def reset(self): 59 60 "To be used to reset the state of the reader or writer between records." 61 62 pass 63 64 def rewind(self): 65 self.f.seek(0) 66 self.reset() 67 68 def close(self): 69 if self.f is not None: 70 self.f.close() 71 self.f = None 72 73 class FileWriter(File): 74 75 "Writing basic data types to files." 76 77 def write_number(self, number): 78 79 "Write 'number' to the file using a variable length encoding." 80 81 # Negative numbers are not supported. 82 83 if number < 0: 84 raise ValueError, "Number %r is negative." % number 85 86 # Special case: one byte containing a 7-bit number. 87 88 elif number < 128: 89 self.f.write(chr(number)) 90 return 91 92 # Write the number from least to most significant digits. 93 94 bytes = [] 95 96 while number != 0: 97 lsd = number & 127 98 number = number >> 7 99 if number != 0: 100 lsd |= 128 101 bytes.append(chr(lsd)) 102 103 record = "".join(bytes) 104 self.f.write(record) 105 106 def write_string(self, s, compress=0): 107 108 """ 109 Write 's' to the file, recording its length and compressing the string 110 if 'compress' is set to a true value. 111 """ 112 113 # Convert Unicode objects to strings. 114 115 if isinstance(s, unicode): 116 s = s.encode("utf-8") 117 118 # Compress the string if requested. 119 120 if compress: 121 for flag, fn in compressors: 122 cs = fn(s) 123 124 # Take the first string shorter than the original. 125 126 if len(cs) < len(s): 127 s = cs 128 break 129 else: 130 flag = "-" 131 132 # Record whether compression was used. 133 134 self.f.write(flag) 135 136 # Write the length of the data before the data itself. 137 138 length = len(s) 139 self.write_number(length) 140 self.f.write(s) 141 142 class FileReader(File): 143 144 "Reading basic data types from files." 145 146 def read_number(self): 147 148 "Read a number from the file." 149 150 # Read each byte, adding it to the number. 151 152 shift = 0 153 number = 0 154 more = 1 155 156 while more: 157 byte = self.f.read(1) 158 if not byte: 159 raise EOFError 160 161 csd = ord(byte) 162 more = csd & 128 != 0 163 if more: 164 csd &= 127 165 number += (csd << shift) 166 shift += 7 167 168 return number 169 170 def read_string(self, decompress=0): 171 172 """ 173 Read a string from the file, decompressing the stored data if 174 'decompress' is set to a true value. 175 """ 176 177 # Decompress the data if requested. 178 179 if decompress: 180 flag = self.f.read(1) 181 else: 182 flag = "-" 183 184 length = self.read_number() 185 s = self.f.read(length) 186 187 # Perform decompression if applicable. 188 189 if flag != "-": 190 fn = decompressors[flag] 191 s = fn(s) 192 193 # Convert strings to Unicode objects. 194 195 return unicode(s, "utf-8") 196 197 # Specific classes for storing term and position information. 198 199 class PositionWriter(FileWriter): 200 201 "Writing position information to files." 202 203 def reset(self): 204 self.last_docnum = 0 205 206 def write_positions(self, docnum, positions): 207 208 """ 209 Write for the document 'docnum' the given 'positions'. 210 Return the offset of the written record. 211 """ 212 213 if docnum < self.last_docnum: 214 raise ValueError, "Document number %r is less than previous number %r." % (docnum, self.last_docnum) 215 216 # Record the offset of this record. 217 218 offset = self.f.tell() 219 220 # Write the document number delta. 221 222 self.write_number(docnum - self.last_docnum) 223 224 # Write the number of positions. 225 226 self.write_number(len(positions)) 227 228 # Make sure that the positions are sorted. 229 230 positions.sort() 231 232 # Write the position deltas. 233 234 last = 0 235 for position in positions: 236 pos = position - last 237 self.write_number(pos) 238 last = position 239 240 self.last_docnum = docnum 241 242 return offset 243 244 class PositionReader(FileReader): 245 246 "Reading position information from files." 247 248 def reset(self): 249 self.last_docnum = 0 250 251 def read_positions(self): 252 253 "Read positions, returning a document number and a list of positions." 254 255 # Read the document number delta and add it to the last number. 256 257 self.last_docnum += self.read_number() 258 259 # Read the number of positions. 260 261 npositions = self.read_number() 262 263 # Read the position deltas, adding each previous position to get the 264 # appropriate collection of absolute positions. 265 266 i = 0 267 last = 0 268 positions = [] 269 270 while i < npositions: 271 last += self.read_number() 272 positions.append(last) 273 i += 1 274 275 return self.last_docnum, positions 276 277 def read_term_positions(self, offset, count): 278 279 """ 280 Read all positions from 'offset', seeking to that position in the file 281 before reading. The number of documents available for reading is limited 282 to 'count'. 283 """ 284 285 # Duplicate the file handle. 286 287 f = fdopen(dup(self.f.fileno()), "rb") 288 f.seek(offset) 289 return PositionIterator(f, count) 290 291 class PositionIndexWriter(FileWriter): 292 293 "Writing position index information to files." 294 295 def reset(self): 296 self.last_docnum = 0 297 self.last_pos_offset = 0 298 299 def write_positions(self, docnum, pos_offset, count): 300 301 """ 302 Write the given 'docnum, 'pos_offset' and document 'count' to the 303 position index file. 304 """ 305 306 # Record the offset of this record. 307 308 offset = self.f.tell() 309 310 # Write the document number delta. 311 312 self.write_number(docnum - self.last_docnum) 313 self.last_docnum = docnum 314 315 # Write the position file offset delta. 316 317 self.write_number(pos_offset - self.last_pos_offset) 318 self.last_pos_offset = pos_offset 319 320 # Write the document count. 321 322 self.write_number(count) 323 324 return offset 325 326 class PositionIndexReader(FileReader): 327 328 "Reading position index information from files." 329 330 def reset(self): 331 self.last_docnum = 0 332 self.last_pos_offset = 0 333 334 def read_positions(self): 335 336 """ 337 Read a document number, a position file offset for the position index 338 file, and the number of documents in a section of that file. 339 """ 340 341 # Read the document number delta. 342 343 self.last_docnum += self.read_number() 344 345 # Read the offset delta. 346 347 self.last_pos_offset += self.read_number() 348 349 # Read the document count. 350 351 count = self.read_number() 352 353 return self.last_docnum, self.last_pos_offset, count 354 355 def read_term_positions(self, offset, doc_frequency): 356 357 """ 358 Read all positions from 'offset', seeking to that position in the file 359 before reading. The number of documents available for reading is limited 360 to 'doc_frequency'. 361 """ 362 363 # Duplicate the file handle. 364 365 f = fdopen(dup(self.f.fileno()), "rb") 366 f.seek(offset) 367 return PositionIndexIterator(f, doc_frequency) 368 369 # Iterators for position-related files. 370 371 class IteratorBase: 372 373 def __init__(self, count): 374 self.replenish(count) 375 376 def replenish(self, count): 377 self.count = count 378 self.read_documents = 0 379 380 def __len__(self): 381 return self.count 382 383 def sort(self): 384 pass # Stored document positions are already sorted. 385 386 def __iter__(self): 387 return self 388 389 class PositionIterator(PositionReader, IteratorBase): 390 391 "Iterating over document positions." 392 393 def __init__(self, f, count): 394 PositionReader.__init__(self, f) 395 IteratorBase.__init__(self, count) 396 397 def next(self): 398 399 "Read positions for a single document." 400 401 if self.read_documents < self.count: 402 self.read_documents += 1 403 return self.read_positions() 404 else: 405 raise StopIteration 406 407 class PositionIndexIterator(PositionIndexReader, IteratorBase): 408 409 "Iterating over document positions." 410 411 def __init__(self, f, count): 412 PositionIndexReader.__init__(self, f) 413 IteratorBase.__init__(self, count) 414 self.section_count = 0 415 416 def next(self): 417 418 "Read positions for a single document." 419 420 self.read_documents += self.section_count 421 if self.read_documents < self.count: 422 docnum, pos_offset, self.section_count = t = self.read_positions() 423 return t 424 else: 425 raise StopIteration 426 427 class PositionDictionaryWriter: 428 429 "Writing position dictionaries." 430 431 def __init__(self, position_writer, position_index_writer, interval): 432 self.position_writer = position_writer 433 self.position_index_writer = position_index_writer 434 self.interval = interval 435 436 def write_term_positions(self, doc_positions): 437 438 """ 439 Write all 'doc_positions' - a collection of tuples of the form (document 440 number, position list) - to the file. 441 442 Add some records to the index, making dictionary entries. 443 444 Return a tuple containing the offset of the written data, the frequency 445 (number of positions), and document frequency (number of documents) for 446 the term involved. 447 """ 448 449 # Reset the writers. 450 451 self.position_writer.reset() 452 self.position_index_writer.reset() 453 454 index_offset = None 455 456 # Write the positions. 457 458 frequency = 0 459 first_docnum = None 460 first_offset = None 461 count = 0 462 463 doc_positions.sort() 464 465 for docnum, positions in doc_positions: 466 pos_offset = self.position_writer.write_positions(docnum, positions) 467 468 # Retain the first record offset for a subsequent index entry. 469 470 if first_offset is None: 471 first_offset = pos_offset 472 first_docnum = docnum 473 474 frequency += len(positions) 475 count += 1 476 477 # Every {interval} entries, write an index entry. 478 479 if count == self.interval: 480 io = self.position_index_writer.write_positions(first_docnum, first_offset, self.interval) 481 482 # Remember the first index entry offset. 483 484 if index_offset is None: 485 index_offset = io 486 487 first_offset = None 488 first_docnum = None 489 count = 0 490 491 # Reset the position writer so that position readers accessing 492 # a section start with the correct document number. 493 494 self.position_writer.reset() 495 496 # Finish writing an index entry for the remaining documents. 497 498 else: 499 if first_offset is not None: 500 io = self.position_index_writer.write_positions(first_docnum, first_offset, count) 501 502 # Remember the first index entry offset. 503 504 if index_offset is None: 505 index_offset = io 506 507 return index_offset, frequency, len(doc_positions) 508 509 def close(self): 510 self.position_writer.close() 511 self.position_index_writer.close() 512 513 class PositionDictionaryReader: 514 515 "Reading position dictionaries." 516 517 def __init__(self, position_reader, position_index_reader): 518 self.position_reader = position_reader 519 self.position_index_reader = position_index_reader 520 521 def read_term_positions(self, offset, doc_frequency): 522 523 """ 524 Return an iterator for dictionary entries starting at 'offset' with the 525 given 'doc_frequency'. 526 """ 527 528 return PositionDictionaryIterator(self.position_reader, 529 self.position_index_reader, offset, doc_frequency) 530 531 def close(self): 532 self.position_reader.close() 533 self.position_index_reader.close() 534 535 class PositionDictionaryIterator: 536 537 "Iteration over position dictionary entries." 538 539 def __init__(self, position_reader, position_index_reader, offset, doc_frequency): 540 self.position_reader = position_reader 541 self.doc_frequency = doc_frequency 542 self.index_iterator = position_index_reader.read_term_positions(offset, doc_frequency) 543 544 # Remember the last values. 545 546 self.found_docnum, self.found_positions = None, None 547 548 # Maintain state for the next index entry, if read. 549 550 self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None 551 552 # Initialise the current index entry and current position file iterator. 553 554 self._next_section() 555 self._init_section() 556 557 def __len__(self): 558 return self.doc_frequency 559 560 def sort(self): 561 pass 562 563 def __iter__(self): 564 return self 565 566 def next(self): 567 568 """ 569 Attempt to get the next document record from the section in the 570 positions file. 571 """ 572 573 # Return any visited but unrequested record. 574 575 if self.found_docnum is not None: 576 t = self.found_docnum, self.found_positions 577 self.found_docnum, self.found_positions = None, None 578 return t 579 580 # Or search for the next record. 581 582 while 1: 583 584 # Either return the next record. 585 586 try: 587 return self.iterator.next() 588 589 # Or, where a section is finished, get the next section and try again. 590 591 except StopIteration: 592 593 # Where a section follows, update the index iterator, but keep 594 # reading using the same file iterator (since the data should 595 # just follow on from the last section). 596 597 self._next_section() 598 self.iterator.replenish(self.section_count) 599 600 # Reset the state of the iterator to make sure that document 601 # numbers are correct. 602 603 self.iterator.reset() 604 605 def from_document(self, docnum): 606 607 """ 608 Attempt to navigate to a positions entry for the given 'docnum', 609 returning the positions for 'docnum', or None otherwise. 610 """ 611 612 # Return any unrequested document positions. 613 614 if docnum == self.found_docnum: 615 return self.found_positions 616 617 # Read ahead in the index until the next entry refers to a document 618 # later than the desired document. 619 620 try: 621 if self.next_docnum is None: 622 self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next() 623 624 # Read until the next entry is after the desired document number, 625 # or until the end of the results. 626 627 while self.next_docnum <= docnum: 628 self._next_read_section() 629 if self.docnum < docnum: 630 self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next() 631 else: 632 break 633 634 except StopIteration: 635 pass 636 637 # Navigate in the position file to the document. 638 639 self._init_section() 640 641 try: 642 while 1: 643 found_docnum, found_positions = self.iterator.next() 644 645 # Return the desired document positions or None (retaining the 646 # positions for the document immediately after). 647 648 if docnum == found_docnum: 649 return found_positions 650 elif docnum < found_docnum: 651 self.found_docnum, self.found_positions = found_docnum, found_positions 652 return None 653 654 except StopIteration: 655 return None 656 657 # Internal methods. 658 659 def _next_section(self): 660 661 "Attempt to get the next section in the index." 662 663 if self.next_docnum is None: 664 self.docnum, self.pos_offset, self.section_count = self.index_iterator.next() 665 else: 666 self._next_read_section() 667 668 def _next_read_section(self): 669 670 """ 671 Make the next index entry the current one without reading from the 672 index. 673 """ 674 675 self.docnum, self.pos_offset, self.section_count = self.next_docnum, self.next_pos_offset, self.next_section_count 676 self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None 677 678 def _init_section(self): 679 680 "Initialise the iterator for the section in the position file." 681 682 self.iterator = self.position_reader.read_term_positions(self.pos_offset, self.section_count) 683 684 class TermWriter(FileWriter): 685 686 "Writing term information to files." 687 688 def reset(self): 689 self.last_term = "" 690 self.last_offset = 0 691 692 def write_term(self, term, offset, frequency, doc_frequency): 693 694 """ 695 Write the given 'term', its position file 'offset', its 'frequency' and 696 its 'doc_frequency' (number of documents in which it appears) to the 697 term information file. Return the offset after the term information was 698 written to the file. 699 """ 700 701 # Write the prefix length and term suffix. 702 703 common = len(commonprefix([self.last_term, term])) 704 suffix = term[common:] 705 706 self.write_number(common) 707 self.write_string(suffix) 708 709 # Write the offset delta. 710 711 self.write_number(offset - self.last_offset) 712 713 # Write the frequency. 714 715 self.write_number(frequency) 716 717 # Write the document frequency. 718 719 self.write_number(doc_frequency) 720 721 self.last_term = term 722 self.last_offset = offset 723 724 return self.f.tell() 725 726 class TermReader(FileReader): 727 728 "Reading term information from files." 729 730 def reset(self): 731 self.last_term = "" 732 self.last_offset = 0 733 734 def read_term(self): 735 736 """ 737 Read a term, its position file offset, its frequency and its document 738 frequency from the term information file. 739 """ 740 741 # Read the prefix length and term suffix. 742 743 common = self.read_number() 744 suffix = self.read_string() 745 746 self.last_term = self.last_term[:common] + suffix 747 748 # Read the offset delta. 749 750 self.last_offset += self.read_number() 751 752 # Read the frequency. 753 754 frequency = self.read_number() 755 756 # Read the document frequency. 757 758 doc_frequency = self.read_number() 759 760 return self.last_term, self.last_offset, frequency, doc_frequency 761 762 def go_to_term(self, term, offset, info_offset): 763 764 """ 765 Seek past the entry for 'term' having 'offset' to 'info_offset'. This 766 permits the scanning for later terms from the specified term. 767 """ 768 769 self.f.seek(info_offset) 770 self.last_term = term 771 self.last_offset = offset 772 773 class TermIndexWriter(TermWriter): 774 775 "Writing term dictionary index details to files." 776 777 def reset(self): 778 TermWriter.reset(self) 779 self.last_info_offset = 0 780 781 def write_term(self, term, offset, frequency, doc_frequency, info_offset): 782 783 """ 784 Write the given 'term', its position file 'offset', its 'frequency' and 785 its 'doc_frequency' to the term dictionary index file, along with the 786 'info_offset' in the term information file. 787 """ 788 789 TermWriter.write_term(self, term, offset, frequency, doc_frequency) 790 791 # Write the information file offset delta. 792 793 self.write_number(info_offset - self.last_info_offset) 794 self.last_info_offset = info_offset 795 796 class TermIndexReader(TermReader): 797 798 "Reading term dictionary index details from files." 799 800 def reset(self): 801 TermReader.reset(self) 802 self.last_info_offset = 0 803 804 def read_term(self): 805 806 """ 807 Read a term, its position file offset, its frequency, its document 808 frequency and a term information file offset from the term dictionary 809 index file. 810 """ 811 812 term, offset, frequency, doc_frequency = TermReader.read_term(self) 813 814 # Read the offset delta. 815 816 self.last_info_offset += self.read_number() 817 818 return term, offset, frequency, doc_frequency, self.last_info_offset 819 820 class TermDictionaryWriter: 821 822 "Writing term dictionaries." 823 824 def __init__(self, info_writer, index_writer, position_dict_writer, interval): 825 self.info_writer = info_writer 826 self.index_writer = index_writer 827 self.position_dict_writer = position_dict_writer 828 self.interval = interval 829 self.entry = 0 830 831 def _write_term(self, term, offset, frequency, doc_frequency): 832 833 """ 834 Write the given 'term', its position file 'offset', its 'frequency' and 835 its 'doc_frequency' (number of documents in which it appears) to the 836 term information file. Return the offset after the term information was 837 written to the file. 838 """ 839 840 info_offset = self.info_writer.write_term(term, offset, frequency, doc_frequency) 841 842 if self.entry % self.interval == 0: 843 self.index_writer.write_term(term, offset, frequency, doc_frequency, info_offset) 844 845 self.entry += 1 846 847 def write_term_positions(self, term, doc_positions): 848 849 """ 850 Write the given 'term' and the 'doc_positions' recording the documents 851 and positions at which the term is found. 852 """ 853 854 offset, frequency, doc_frequency = self.position_dict_writer.write_term_positions(doc_positions) 855 self._write_term(term, offset, frequency, doc_frequency) 856 857 def close(self): 858 self.info_writer.close() 859 self.index_writer.close() 860 self.position_dict_writer.close() 861 862 class TermDictionaryReader: 863 864 "Reading term dictionaries." 865 866 def __init__(self, info_reader, index_reader, position_dict_reader): 867 self.info_reader = info_reader 868 self.index_reader = index_reader 869 self.position_dict_reader = position_dict_reader 870 871 self.terms = [] 872 try: 873 while 1: 874 self.terms.append(self.index_reader.read_term()) 875 except EOFError: 876 pass 877 878 # Large numbers for ordering purposes. 879 880 self.max_offset = self.terms[-1][1] + 1 881 882 def _find_closest_entry(self, term): 883 884 """ 885 Find the offsets and frequencies of 'term' from the term dictionary or 886 the closest term starting with the value of 'term'. 887 888 Return the closest index entry consisting of a term, the position file 889 offset, the term frequency, the document frequency, and the term details 890 file offset. 891 """ 892 893 i = bisect_right(self.terms, (term, self.max_offset, 0, 0)) - 1 894 895 # Get the entry position providing the term or one preceding it. 896 # If no entry precedes the requested term, return the very first entry 897 # as the closest. 898 899 if i == -1: 900 return self.terms[0] 901 else: 902 return self.terms[i] 903 904 def _find_closest_term(self, term): 905 906 """ 907 Find the offsets and frequencies of 'term' from the term dictionary or 908 the closest term starting with the value of 'term'. 909 910 Return the closest term (or the term itself), the position file offset, 911 the term frequency, the document frequency, and the term details file 912 offset (or None if the reader is already positioned). 913 """ 914 915 found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_entry(term) 916 917 # Where the term is found immediately, return the offset and 918 # frequencies. If the term does not appear, return the details of the 919 # closest entry. 920 921 if term <= found_term: 922 return found_term, offset, frequency, doc_frequency, info_offset 923 924 # Otherwise, seek past the index term's entry in the information file 925 # and scan for the desired term. 926 927 else: 928 self.info_reader.go_to_term(found_term, offset, info_offset) 929 try: 930 while term > found_term: 931 found_term, offset, frequency, doc_frequency = self.info_reader.read_term() 932 except EOFError: 933 pass 934 935 return found_term, offset, frequency, doc_frequency, None 936 937 def _find_term(self, term): 938 939 """ 940 Find the position file offset and frequency of 'term' from the term 941 dictionary. 942 """ 943 944 found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_term(term) 945 946 # If the term is found, return the offset and frequencies. 947 948 if term == found_term: 949 return offset, frequency, doc_frequency 950 else: 951 return None 952 953 def _get_positions(self, offset, doc_frequency): 954 return self.position_dict_reader.read_term_positions(offset, doc_frequency) 955 956 # Sequential access methods. 957 958 def rewind(self): 959 self.info_reader.rewind() 960 961 def read_term(self): 962 963 """ 964 Return the next term, its frequency, its document frequency, and the 965 documents and positions at which the term is found. 966 """ 967 968 term, offset, frequency, doc_frequency = self.info_reader.read_term() 969 positions = self._get_positions(offset, doc_frequency) 970 return term, frequency, doc_frequency, positions 971 972 # Query methods. 973 974 def find_terms(self, term): 975 976 "Return all terms whose values start with the value of 'term'." 977 978 terms = [] 979 980 found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_term(term) 981 982 # Position the reader, if necessary. 983 984 if info_offset is not None: 985 self.info_reader.go_to_term(found_term, offset, info_offset) 986 987 # Read and record terms. 988 989 try: 990 # Add the found term if it starts with the specified term. 991 992 while found_term.startswith(term): 993 terms.append(found_term) 994 found_term, offset, frequency, doc_frequency = self.info_reader.read_term() 995 996 except EOFError: 997 pass 998 999 return terms 1000 1001 def find_positions(self, term): 1002 1003 "Return the documents and positions at which the given 'term' is found." 1004 1005 t = self._find_term(term) 1006 if t is None: 1007 return None 1008 else: 1009 offset, frequency, doc_frequency = t 1010 return self._get_positions(offset, doc_frequency) 1011 1012 def get_frequency(self, term): 1013 1014 "Return the frequency of the given 'term'." 1015 1016 t = self._find_term(term) 1017 if t is None: 1018 return None 1019 else: 1020 offset, frequency, doc_frequency = t 1021 return frequency 1022 1023 def get_document_frequency(self, term): 1024 1025 "Return the document frequency of the given 'term'." 1026 1027 t = self._find_term(term) 1028 if t is None: 1029 return None 1030 else: 1031 offset, frequency, doc_frequency = t 1032 return doc_frequency 1033 1034 def close(self): 1035 self.info_reader.close() 1036 self.index_reader.close() 1037 self.position_dict_reader.close() 1038 1039 # Specific classes for storing document information. 1040 1041 class FieldWriter(FileWriter): 1042 1043 "Writing field data to files." 1044 1045 def reset(self): 1046 self.last_docnum = 0 1047 1048 def write_fields(self, docnum, fields): 1049 1050 """ 1051 Write for the given 'docnum', a list of 'fields' (integer, string pairs 1052 representing field identifiers and values respectively). 1053 Return the offset at which the fields are stored. 1054 """ 1055 1056 offset = self.f.tell() 1057 1058 # Write the document number delta. 1059 1060 self.write_number(docnum - self.last_docnum) 1061 1062 # Write the number of fields. 1063 1064 self.write_number(len(fields)) 1065 1066 # Write the fields themselves. 1067 1068 for i, field in fields: 1069 self.write_number(i) 1070 self.write_string(field, 1) # compress 1071 1072 self.last_docnum = docnum 1073 return offset 1074 1075 class FieldReader(FileReader): 1076 1077 "Reading field data from files." 1078 1079 def reset(self): 1080 self.last_docnum = 0 1081 1082 def read_fields(self): 1083 1084 """ 1085 Read fields from the file, returning a tuple containing the document 1086 number and a list of field (identifier, value) pairs. 1087 """ 1088 1089 # Read the document number. 1090 1091 self.last_docnum += self.read_number() 1092 1093 # Read the number of fields. 1094 1095 nfields = self.read_number() 1096 1097 # Collect the fields. 1098 1099 fields = [] 1100 i = 0 1101 1102 while i < nfields: 1103 identifier = self.read_number() 1104 value = self.read_string(1) # decompress 1105 fields.append((identifier, value)) 1106 i += 1 1107 1108 return self.last_docnum, fields 1109 1110 def read_document_fields(self, docnum, offset): 1111 1112 """ 1113 Read fields for 'docnum' at the given 'offset'. This permits the 1114 retrieval of details for the specified document, as well as scanning for 1115 later documents. 1116 """ 1117 1118 self.f.seek(offset) 1119 bad_docnum, fields = self.read_fields() 1120 self.last_docnum = docnum 1121 return docnum, fields 1122 1123 class FieldIndexWriter(FileWriter): 1124 1125 "Writing field index details to files." 1126 1127 def reset(self): 1128 self.last_docnum = 0 1129 self.last_offset = 0 1130 1131 def write_document(self, docnum, offset): 1132 1133 """ 1134 Write for the given 'docnum', the 'offset' at which the fields for the 1135 document are stored in the fields file. 1136 """ 1137 1138 # Write the document number and offset deltas. 1139 1140 self.write_number(docnum - self.last_docnum) 1141 self.write_number(offset - self.last_offset) 1142 1143 self.last_docnum = docnum 1144 self.last_offset = offset 1145 1146 class FieldIndexReader(FileReader): 1147 1148 "Reading field index details from files." 1149 1150 def reset(self): 1151 self.last_docnum = 0 1152 self.last_offset = 0 1153 1154 def read_document(self): 1155 1156 "Read a document number and field file offset." 1157 1158 # Read the document number delta and offset. 1159 1160 self.last_docnum += self.read_number() 1161 self.last_offset += self.read_number() 1162 1163 return self.last_docnum, self.last_offset 1164 1165 class FieldDictionaryWriter: 1166 1167 "Writing field dictionary details." 1168 1169 def __init__(self, field_writer, field_index_writer, interval): 1170 self.field_writer = field_writer 1171 self.field_index_writer = field_index_writer 1172 self.interval = interval 1173 self.entry = 0 1174 1175 def write_fields(self, docnum, fields): 1176 1177 "Write details of the document with the given 'docnum' and 'fields'." 1178 1179 offset = self.field_writer.write_fields(docnum, fields) 1180 1181 if self.entry % self.interval == 0: 1182 self.field_index_writer.write_document(docnum, offset) 1183 1184 self.entry += 1 1185 1186 def close(self): 1187 self.field_writer.close() 1188 self.field_index_writer.close() 1189 1190 class FieldDictionaryReader: 1191 1192 "Reading field dictionary details." 1193 1194 def __init__(self, field_reader, field_index_reader): 1195 self.field_reader = field_reader 1196 self.field_index_reader = field_index_reader 1197 1198 self.docs = [] 1199 try: 1200 while 1: 1201 self.docs.append(self.field_index_reader.read_document()) 1202 except EOFError: 1203 pass 1204 1205 # Large numbers for ordering purposes. 1206 1207 self.max_offset = self.docs[-1][1] 1208 1209 def rewind(self): 1210 self.field_reader.rewind() 1211 1212 def read_fields(self): 1213 1214 "Return the next document number and fields." 1215 1216 return self.field_reader.read_fields() 1217 1218 def get_fields(self, docnum): 1219 1220 "Read the fields of the document with the given 'docnum'." 1221 1222 i = bisect_right(self.docs, (docnum, self.max_offset)) - 1 1223 1224 # Get the entry position providing the term or one preceding it. 1225 1226 if i == -1: 1227 return None 1228 1229 found_docnum, offset = self.docs[i] 1230 1231 # Read from the fields file. 1232 1233 found_docnum, fields = self.field_reader.read_document_fields(found_docnum, offset) 1234 1235 # Scan for the document, if necessary. 1236 1237 try: 1238 while docnum > found_docnum: 1239 found_docnum, fields = self.field_reader.read_fields() 1240 except EOFError: 1241 pass 1242 1243 # If the document is found, return the fields. 1244 1245 if docnum == found_docnum: 1246 return fields 1247 else: 1248 return None 1249 1250 def close(self): 1251 self.field_reader.close() 1252 self.field_index_reader.close() 1253 1254 # Dictionary merging classes. 1255 1256 class Merger: 1257 1258 "Merge files." 1259 1260 def __init__(self, writer, readers): 1261 self.writer = writer 1262 self.readers = readers 1263 1264 def close(self): 1265 for reader in self.readers: 1266 reader.close() 1267 self.writer.close() 1268 1269 class TermDictionaryMerger(Merger): 1270 1271 "Merge term and position files." 1272 1273 def merge(self): 1274 1275 """ 1276 Merge terms and positions from the readers, sending them to the writer. 1277 """ 1278 1279 entries = [] 1280 1281 # Get the first entries from the readers. 1282 1283 for partition, reader in enumerate(self.readers): 1284 reader.rewind() 1285 1286 try: 1287 term, frequency, doc_frequency, positions = reader.read_term() 1288 insort_right(entries, (term, positions, partition)) 1289 except EOFError: 1290 pass 1291 1292 # While entries are available, write them out in order, merging where 1293 # appropriate. 1294 1295 while entries: 1296 term, doc_positions, partition = entries[0] 1297 to_update = [partition] 1298 1299 nentries = len(entries) 1300 i = 1 1301 1302 # Find other entries for the term. 1303 1304 while i < nentries: 1305 other_term, other_doc_positions, other_partition = entries[i] 1306 1307 # For such entries, merge the positions. 1308 1309 if other_term == term: 1310 doc_positions = self.merge_positions(doc_positions, other_doc_positions) 1311 to_update.append(other_partition) 1312 i += 1 1313 else: 1314 break 1315 1316 # Write the combined term details. 1317 1318 self.writer.write_term_positions(term, doc_positions) 1319 1320 # Update the entries from the affected readers. 1321 1322 del entries[:i] 1323 1324 for partition in to_update: 1325 try: 1326 term, frequency, doc_frequency, positions = self.readers[partition].read_term() 1327 insort_right(entries, (term, positions, partition)) 1328 except EOFError: 1329 pass 1330 1331 def merge_positions(self, doc_positions, other_doc_positions): 1332 1333 """ 1334 Merge 'doc_positions' with 'other_doc_positions' so that common document 1335 records contain positions from both collections. 1336 """ 1337 1338 doc_position_dict = dict(doc_positions) 1339 1340 for docnum, positions in other_doc_positions: 1341 if doc_position_dict.has_key(docnum): 1342 doc_position_dict[docnum] += positions 1343 else: 1344 doc_position_dict[docnum] = positions 1345 1346 return doc_position_dict.items() 1347 1348 class FieldDictionaryMerger(Merger): 1349 1350 "Merge field files." 1351 1352 def merge(self): 1353 1354 """ 1355 Merge fields from the readers, sending them to the writer. 1356 """ 1357 1358 entries = [] 1359 1360 # Get the first entries from the readers. 1361 1362 for partition, reader in enumerate(self.readers): 1363 reader.rewind() 1364 1365 try: 1366 docnum, fields = reader.read_fields() 1367 insort_right(entries, (docnum, fields, partition)) 1368 except EOFError: 1369 pass 1370 1371 # While entries are available, write them out in order, merging where 1372 # appropriate. 1373 1374 while entries: 1375 docnum, fields, partition = entries[0] 1376 to_update = [partition] 1377 1378 nentries = len(entries) 1379 i = 1 1380 1381 # Find other entries for the term. 1382 1383 while i < nentries: 1384 other_docnum, other_fields, other_partition = entries[i] 1385 1386 # For such entries, merge the positions. 1387 1388 if other_docnum == docnum: 1389 fields += other_fields 1390 to_update.append(other_partition) 1391 i += 1 1392 else: 1393 break 1394 1395 # Write the combined term details. 1396 1397 self.writer.write_fields(docnum, fields) 1398 1399 # Update the entries from the affected readers. 1400 1401 del entries[:i] 1402 1403 for partition in to_update: 1404 try: 1405 docnum, fields = self.readers[partition].read_fields() 1406 insort_right(entries, (docnum, fields, partition)) 1407 except EOFError: 1408 pass 1409 1410 # Utility functions. 1411 1412 def get_term_writer(pathname, partition, interval, doc_interval): 1413 1414 """ 1415 Return a term dictionary writer using files under the given 'pathname' 1416 labelled according to the given 'partition', using the given indexing 1417 'interval' for terms and 'doc_interval' for document position records. 1418 """ 1419 1420 tdf = open(join(pathname, "terms-%s" % partition), "wb") 1421 info_writer = TermWriter(tdf) 1422 1423 tdif = open(join(pathname, "terms_index-%s" % partition), "wb") 1424 index_writer = TermIndexWriter(tdif) 1425 1426 tpf = open(join(pathname, "positions-%s" % partition), "wb") 1427 positions_writer = PositionWriter(tpf) 1428 1429 tpif = open(join(pathname, "positions_index-%s" % partition), "wb") 1430 positions_index_writer = PositionIndexWriter(tpif) 1431 1432 positions_dict_writer = PositionDictionaryWriter(positions_writer, positions_index_writer, doc_interval) 1433 1434 return TermDictionaryWriter(info_writer, index_writer, positions_dict_writer, interval) 1435 1436 def get_field_writer(pathname, partition, interval): 1437 1438 """ 1439 Return a field dictionary writer using files under the given 'pathname' 1440 labelled according to the given 'partition', using the given indexing 1441 'interval'. 1442 """ 1443 1444 ff = open(join(pathname, "fields-%s" % partition), "wb") 1445 field_writer = FieldWriter(ff) 1446 1447 fif = open(join(pathname, "fields_index-%s" % partition), "wb") 1448 field_index_writer = FieldIndexWriter(fif) 1449 1450 return FieldDictionaryWriter(field_writer, field_index_writer, interval) 1451 1452 def get_term_reader(pathname, partition): 1453 1454 """ 1455 Return a term dictionary reader using files under the given 'pathname' 1456 labelled according to the given 'partition'. 1457 """ 1458 1459 tdf = open(join(pathname, "terms-%s" % partition), "rb") 1460 info_reader = TermReader(tdf) 1461 1462 tdif = open(join(pathname, "terms_index-%s" % partition), "rb") 1463 index_reader = TermIndexReader(tdif) 1464 1465 tpf = open(join(pathname, "positions-%s" % partition), "rb") 1466 positions_reader = PositionReader(tpf) 1467 1468 tpif = open(join(pathname, "positions_index-%s" % partition), "rb") 1469 positions_index_reader = PositionIndexReader(tpif) 1470 1471 positions_dict_reader = PositionDictionaryReader(positions_reader, positions_index_reader) 1472 1473 return TermDictionaryReader(info_reader, index_reader, positions_dict_reader) 1474 1475 def get_field_reader(pathname, partition): 1476 1477 """ 1478 Return a field dictionary reader using files under the given 'pathname' 1479 labelled according to the given 'partition'. 1480 """ 1481 1482 ff = open(join(pathname, "fields-%s" % partition), "rb") 1483 field_reader = FieldReader(ff) 1484 1485 fif = open(join(pathname, "fields_index-%s" % partition), "rb") 1486 field_index_reader = FieldIndexReader(fif) 1487 1488 return FieldDictionaryReader(field_reader, field_index_reader) 1489 1490 def rename_files(pathname, names, from_partition, to_partition): 1491 for name in names: 1492 rename(join(pathname, "%s-%s" % (name, from_partition)), join(pathname, "%s-%s" % (name, to_partition))) 1493 1494 def rename_term_files(pathname, from_partition, to_partition): 1495 rename_files(pathname, TERM_FILENAMES, from_partition, to_partition) 1496 1497 def rename_field_files(pathname, from_partition, to_partition): 1498 rename_files(pathname, FIELD_FILENAMES, from_partition, to_partition) 1499 1500 def remove_files(pathname, names, partition): 1501 for name in names: 1502 remove(join(pathname, "%s-%s" % (name, partition))) 1503 1504 def remove_term_files(pathname, partition): 1505 remove_files(pathname, TERM_FILENAMES, partition) 1506 1507 def remove_field_files(pathname, partition): 1508 remove_files(pathname, FIELD_FILENAMES, partition) 1509 1510 # High-level classes. 1511 1512 class IndexWriter: 1513 1514 """ 1515 Building term information and writing it to the term and field dictionaries. 1516 """ 1517 1518 def __init__(self, pathname, interval, doc_interval, flush_interval): 1519 self.pathname = pathname 1520 self.interval = interval 1521 self.doc_interval = doc_interval 1522 self.flush_interval = flush_interval 1523 1524 self.dict_partition = 0 1525 self.field_dict_partition = 0 1526 1527 self.terms = {} 1528 self.docs = {} 1529 1530 self.doc_counter = 0 1531 1532 def add_position(self, term, docnum, position): 1533 1534 """ 1535 Add a position entry for the given 'term' in the document with the given 1536 'docnum', indicating the given 'position'. 1537 """ 1538 1539 if not self.terms.has_key(term): 1540 doc_positions = self.terms[term] = {} 1541 else: 1542 doc_positions = self.terms[term] 1543 1544 if not doc_positions.has_key(docnum): 1545 doc = doc_positions[docnum] = [] 1546 else: 1547 doc = doc_positions[docnum] 1548 1549 doc.append(position) 1550 1551 def add_field(self, docnum, identifier, value): 1552 1553 """ 1554 Add for the document with the given 'docnum' a field having the given 1555 'identifier' and 'value'. 1556 """ 1557 1558 if not self.docs.has_key(docnum): 1559 doc_fields = self.docs[docnum] = [] 1560 else: 1561 doc_fields = self.docs[docnum] 1562 1563 doc_fields.append((identifier, unicode(value))) # convert to string 1564 1565 def set_fields(self, docnum, fields): 1566 1567 """ 1568 Add for the document with the given 'docnum' the given 'fields': a list 1569 of tuples each containing an integer identifier and a string value. 1570 """ 1571 1572 self.docs[docnum] = fields 1573 1574 def commit_document(self): 1575 1576 "Update the document counter, flushing terms and fields if appropriate." 1577 1578 self.doc_counter += 1 1579 if self.flush_interval and self.doc_counter >= self.flush_interval: 1580 self.flush_terms() 1581 self.flush_fields() 1582 self.doc_counter = 0 1583 1584 def get_term_writer(self): 1585 1586 "Return a term dictionary writer for the current partition." 1587 1588 return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval) 1589 1590 def get_field_writer(self): 1591 1592 "Return a field dictionary writer for the current partition." 1593 1594 return get_field_writer(self.pathname, self.field_dict_partition, self.interval) 1595 1596 def flush_terms(self): 1597 1598 "Flush terms into the current term dictionary partition." 1599 1600 # Get the terms in order. 1601 1602 terms = self.terms.items() 1603 terms.sort() 1604 1605 dict_writer = self.get_term_writer() 1606 1607 for term, doc_positions in terms: 1608 doc_positions = doc_positions.items() 1609 dict_writer.write_term_positions(term, doc_positions) 1610 1611 dict_writer.close() 1612 1613 self.terms = {} 1614 self.dict_partition += 1 1615 1616 def flush_fields(self): 1617 1618 "Flush fields into the current term dictionary partition." 1619 1620 # Get the documents in order. 1621 1622 docs = self.docs.items() 1623 docs.sort() 1624 1625 field_dict_writer = self.get_field_writer() 1626 1627 for docnum, fields in docs: 1628 field_dict_writer.write_fields(docnum, fields) 1629 1630 field_dict_writer.close() 1631 1632 self.docs = {} 1633 self.field_dict_partition += 1 1634 1635 def close(self): 1636 if self.terms: 1637 self.flush_terms() 1638 if self.docs: 1639 self.flush_fields() 1640 1641 class IndexReader: 1642 1643 "Accessing the term and field dictionaries." 1644 1645 def __init__(self, pathname): 1646 self.dict_reader = get_term_reader(pathname, "merged") 1647 self.field_dict_reader = get_field_reader(pathname, "merged") 1648 1649 def find_terms(self, term): 1650 return self.dict_reader.find_terms(term) 1651 1652 def find_positions(self, term): 1653 return self.dict_reader.find_positions(term) 1654 1655 def get_frequency(self, term): 1656 return self.dict_reader.get_frequency(term) 1657 1658 def get_document_frequency(self, term): 1659 return self.dict_reader.get_document_frequency(term) 1660 1661 def get_fields(self, docnum): 1662 return self.field_dict_reader.get_fields(docnum) 1663 1664 def close(self): 1665 self.dict_reader.close() 1666 self.field_dict_reader.close() 1667 1668 class Index: 1669 1670 "An inverted index solution encapsulating the various components." 1671 1672 def __init__(self, pathname): 1673 self.pathname = pathname 1674 self.reader = None 1675 self.writer = None 1676 1677 def get_writer(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, flush_interval=FLUSH_INTERVAL): 1678 1679 """ 1680 Return a writer, optionally using the given indexing 'interval', 1681 'doc_interval' and 'flush_interval'. 1682 """ 1683 1684 if not exists(self.pathname): 1685 mkdir(self.pathname) 1686 1687 self.writer = IndexWriter(self.pathname, interval, doc_interval, flush_interval) 1688 return self.writer 1689 1690 def get_reader(self, partition=0): 1691 1692 "Return a reader for the index." 1693 1694 # Ensure that only one partition exists. 1695 1696 self.merge() 1697 return self._get_reader(partition) 1698 1699 def _get_reader(self, partition): 1700 1701 "Return a reader for the index." 1702 1703 if not exists(self.pathname): 1704 raise OSError, "Index path %r does not exist." % self.pathname 1705 1706 self.reader = IndexReader(self.pathname) 1707 return self.reader 1708 1709 def merge(self): 1710 1711 "Merge/optimise index partitions." 1712 1713 self.merge_terms() 1714 self.merge_fields() 1715 1716 def merge_terms(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL): 1717 1718 """ 1719 Merge term dictionaries using the given indexing 'interval' and 1720 'doc_interval'. 1721 """ 1722 1723 readers = [] 1724 partitions = set() 1725 1726 for filename in listdir(self.pathname): 1727 if filename.startswith("terms-"): # 6 character prefix 1728 partition = filename[6:] 1729 readers.append(get_term_reader(self.pathname, partition)) 1730 partitions.add(partition) 1731 1732 # Write directly to a dictionary. 1733 1734 if len(readers) > 1: 1735 if "merged" in partitions: 1736 rename_term_files(self.pathname, "merged", "old-merged") 1737 partitions.remove("merged") 1738 partitions.add("old-merged") 1739 1740 writer = get_term_writer(self.pathname, "merged", interval, doc_interval) 1741 merger = TermDictionaryMerger(writer, readers) 1742 merger.merge() 1743 merger.close() 1744 1745 # Remove old files. 1746 1747 for partition in partitions: 1748 remove_term_files(self.pathname, partition) 1749 1750 elif len(readers) == 1: 1751 partition = list(partitions)[0] 1752 if partition != "merged": 1753 rename_term_files(self.pathname, partition, "merged") 1754 1755 def merge_fields(self, interval=FIELD_INTERVAL): 1756 1757 "Merge field dictionaries using the given indexing 'interval'." 1758 1759 readers = [] 1760 partitions = set() 1761 1762 for filename in listdir(self.pathname): 1763 if filename.startswith("fields-"): # 7 character prefix 1764 partition = filename[7:] 1765 readers.append(get_field_reader(self.pathname, partition)) 1766 partitions.add(partition) 1767 1768 # Write directly to a dictionary. 1769 1770 if len(readers) > 1: 1771 if "merged" in partitions: 1772 rename_field_files(self.pathname, "merged", "old-merged") 1773 partitions.remove("merged") 1774 partitions.add("old-merged") 1775 1776 writer = get_field_writer(self.pathname, "merged", interval) 1777 merger = FieldDictionaryMerger(writer, readers) 1778 merger.merge() 1779 merger.close() 1780 1781 # Remove old files. 1782 1783 for partition in partitions: 1784 remove_field_files(self.pathname, partition) 1785 1786 elif len(readers) == 1: 1787 partition = list(partitions)[0] 1788 if partition != "merged": 1789 rename_field_files(self.pathname, partition, "merged") 1790 1791 def close(self): 1792 if self.reader is not None: 1793 self.reader.close() 1794 self.reader = None 1795 if self.writer is not None: 1796 self.writer.close() 1797 self.writer = None 1798 1799 # vim: tabstop=4 expandtab shiftwidth=4