1 #!/usr/bin/env python 2 3 """ 4 A simple (and sane) text indexing library. 5 6 Copyright (C) 2009 Paul Boddie <paul@boddie.org.uk> 7 8 This program is free software; you can redistribute it and/or modify it under 9 the terms of the GNU General Public License as published by the Free Software 10 Foundation; either version 3 of the License, or (at your option) any later 11 version. 12 13 This program is distributed in the hope that it will be useful, but WITHOUT ANY 14 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 15 PARTICULAR PURPOSE. See the GNU General Public License for more details. 16 17 You should have received a copy of the GNU General Public License along 18 with this program. If not, see <http://www.gnu.org/licenses/>. 19 """ 20 21 from os import dup, fdopen # independent iterator access to files 22 from os import listdir, mkdir # index and partition discovery 23 from os import remove, rename # partition manipulation 24 from os.path import exists, join 25 from os.path import commonprefix # to find common string prefixes 26 from bisect import bisect_right # to find terms in the dictionary index 27 import bz2, zlib # for field compression 28 from itermerge import itermerge 29 30 try: 31 set 32 except NameError: 33 from sets import Set as set 34 35 # Constants. 36 37 TERM_INTERVAL = 100 38 DOCUMENT_INTERVAL = 100 39 FIELD_INTERVAL = 100 40 FLUSH_INTERVAL = 10000 41 42 TERM_FILENAMES = "terms", "terms_index", "positions", "positions_index" 43 FIELD_FILENAMES = "fields", "fields_index" 44 45 compressors = [("b", bz2.compress), ("z", zlib.compress)] 46 decompressors = {"b" : bz2.decompress, "z" : zlib.decompress} 47 48 # Utility functions. 49 50 def vint(number): 51 52 "Write 'number' as a variable-length integer." 53 54 if number >= 0: 55 56 # Special case: one byte containing a 7-bit number. 57 58 if number < 128: 59 return chr(number) 60 61 # Write the number from least to most significant digits. 62 63 bytes = [] 64 65 while number != 0: 66 lsd = number & 127 67 number = number >> 7 68 if number != 0: 69 lsd |= 128 70 bytes.append(chr(lsd)) 71 72 return "".join(bytes) 73 74 # Negative numbers are not supported. 75 76 else: 77 raise ValueError, "Number %r is negative." % number 78 79 # Foundation classes. 80 81 class File: 82 83 "A basic file abstraction." 84 85 def __init__(self, f): 86 self.f = f 87 self.reset() 88 89 def reset(self): 90 91 "To be used to reset the state of the reader or writer between records." 92 93 pass 94 95 def rewind(self): 96 self.f.seek(0) 97 self.reset() 98 99 def close(self): 100 if self.f is not None: 101 self.f.close() 102 self.f = None 103 104 class FileWriter(File): 105 106 "Writing basic data types to files." 107 108 def write_number(self, number): 109 110 "Write 'number' to the file using a variable length encoding." 111 112 self.f.write(vint(number)) 113 114 def write_string(self, s, compress=0): 115 116 """ 117 Write 's' to the file, recording its length and compressing the string 118 if 'compress' is set to a true value. 119 """ 120 121 # Convert Unicode objects to strings. 122 123 if isinstance(s, unicode): 124 s = s.encode("utf-8") 125 126 # Compress the string if requested. 127 128 if compress: 129 for flag, fn in compressors: 130 cs = fn(s) 131 132 # Take the first string shorter than the original. 133 134 if len(cs) < len(s): 135 s = cs 136 break 137 else: 138 flag = "-" 139 140 else: 141 flag = "" 142 143 # Write the length of the data before the data itself. 144 145 length = len(s) 146 self.f.write(flag + vint(length) + s) 147 148 class FileReader(File): 149 150 "Reading basic data types from files." 151 152 def read_number(self): 153 154 "Read a number from the file." 155 156 # Read each byte, adding it to the number. 157 158 shift = 0 159 number = 0 160 read = self.f.read 161 162 try: 163 csd = ord(read(1)) 164 while csd & 128: 165 number += ((csd & 127) << shift) 166 shift += 7 167 csd = ord(read(1)) 168 else: 169 number += (csd << shift) 170 except TypeError: 171 raise EOFError 172 173 return number 174 175 def read_string(self, decompress=0): 176 177 """ 178 Read a string from the file, decompressing the stored data if 179 'decompress' is set to a true value. 180 """ 181 182 # Decompress the data if requested. 183 184 if decompress: 185 flag = self.f.read(1) 186 else: 187 flag = "-" 188 189 length = self.read_number() 190 s = self.f.read(length) 191 192 # Perform decompression if applicable. 193 194 if flag != "-": 195 fn = decompressors[flag] 196 s = fn(s) 197 198 # Convert strings to Unicode objects. 199 200 return unicode(s, "utf-8") 201 202 class FileOpener: 203 204 "Opening files using their filenames." 205 206 def __init__(self, filename): 207 self.filename = filename 208 209 def open(self, mode): 210 return open(self.filename, mode) 211 212 def close(self): 213 pass 214 215 # Specific classes for storing term and position information. 216 217 class PositionWriter(FileWriter): 218 219 "Writing position information to files." 220 221 def reset(self): 222 self.last_docnum = 0 223 224 def write_positions(self, docnum, positions): 225 226 """ 227 Write for the document 'docnum' the given 'positions'. 228 Return the offset of the written record. 229 """ 230 231 if docnum < self.last_docnum: 232 raise ValueError, "Document number %r is less than previous number %r." % (docnum, self.last_docnum) 233 234 # Record the offset of this record. 235 236 offset = self.f.tell() 237 238 # Make sure that the positions are sorted. 239 240 positions.sort() 241 242 # Write the position deltas. 243 244 output = [] 245 last = 0 246 247 for position in positions: 248 output.append(vint(position - last)) 249 last = position 250 251 # Write the document number delta. 252 # Write the number of positions. 253 # Then write the positions. 254 255 self.f.write(vint(docnum - self.last_docnum) + vint(len(positions)) + "".join(output)) 256 257 self.last_docnum = docnum 258 return offset 259 260 class PositionOpener(FileOpener): 261 262 "Reading position information from files." 263 264 def read_term_positions(self, offset, count): 265 266 """ 267 Read all positions from 'offset', seeking to that position in the file 268 before reading. The number of documents available for reading is limited 269 to 'count'. 270 """ 271 272 # Duplicate the file handle. 273 274 f = self.open("rb") 275 f.seek(offset) 276 return PositionIterator(f, count) 277 278 class PositionIndexWriter(FileWriter): 279 280 "Writing position index information to files." 281 282 def reset(self): 283 self.last_docnum = 0 284 self.last_pos_offset = 0 285 286 def write_positions(self, docnum, pos_offset, count): 287 288 """ 289 Write the given 'docnum, 'pos_offset' and document 'count' to the 290 position index file. 291 """ 292 293 # Record the offset of this record. 294 295 offset = self.f.tell() 296 output = [] 297 298 # Write the document number delta. 299 300 output.append(vint(docnum - self.last_docnum)) 301 self.last_docnum = docnum 302 303 # Write the position file offset delta. 304 305 output.append(vint(pos_offset - self.last_pos_offset)) 306 self.last_pos_offset = pos_offset 307 308 # Write the document count. 309 310 output.append(vint(count)) 311 312 # Actually write the data. 313 314 self.f.write("".join(output)) 315 316 return offset 317 318 class PositionIndexOpener(FileOpener): 319 320 "Reading position index information from files." 321 322 def read_term_positions(self, offset, doc_frequency): 323 324 """ 325 Read all positions from 'offset', seeking to that position in the file 326 before reading. The number of documents available for reading is limited 327 to 'doc_frequency'. 328 """ 329 330 # Duplicate the file handle. 331 332 f = self.open("rb") 333 f.seek(offset) 334 return PositionIndexIterator(f, doc_frequency) 335 336 # Iterators for position-related files. 337 338 class IteratorBase: 339 340 def __init__(self, count): 341 self.replenish(count) 342 343 def replenish(self, count): 344 self.count = count 345 self.read_documents = 0 346 347 def __len__(self): 348 return self.count 349 350 def sort(self): 351 pass # Stored document positions are already sorted. 352 353 def __iter__(self): 354 return self 355 356 class PositionIterator(FileReader, IteratorBase): 357 358 "Iterating over document positions." 359 360 def __init__(self, f, count): 361 FileReader.__init__(self, f) 362 IteratorBase.__init__(self, count) 363 364 def reset(self): 365 self.last_docnum = 0 366 367 def read_positions(self): 368 369 "Read positions, returning a document number and a list of positions." 370 371 # Read the document number delta and add it to the last number. 372 373 self.last_docnum += self.read_number() 374 375 # Read the number of positions. 376 377 npositions = self.read_number() 378 379 # Read the position deltas, adding each previous position to get the 380 # appropriate collection of absolute positions. 381 382 i = 0 383 last = 0 384 positions = [] 385 386 while i < npositions: 387 last += self.read_number() 388 positions.append(last) 389 i += 1 390 391 return self.last_docnum, positions 392 393 def next(self): 394 395 "Read positions for a single document." 396 397 if self.read_documents < self.count: 398 self.read_documents += 1 399 return self.read_positions() 400 else: 401 raise StopIteration 402 403 class PositionIndexIterator(FileReader, IteratorBase): 404 405 "Iterating over document positions." 406 407 def __init__(self, f, count): 408 FileReader.__init__(self, f) 409 IteratorBase.__init__(self, count) 410 self.section_count = 0 411 412 def reset(self): 413 self.last_docnum = 0 414 self.last_pos_offset = 0 415 416 def read_positions(self): 417 418 """ 419 Read a document number, a position file offset for the position index 420 file, and the number of documents in a section of that file. 421 """ 422 423 # Read the document number delta. 424 425 self.last_docnum += self.read_number() 426 427 # Read the offset delta. 428 429 self.last_pos_offset += self.read_number() 430 431 # Read the document count. 432 433 count = self.read_number() 434 435 return self.last_docnum, self.last_pos_offset, count 436 437 def next(self): 438 439 "Read positions for a single document." 440 441 self.read_documents += self.section_count 442 if self.read_documents < self.count: 443 docnum, pos_offset, self.section_count = t = self.read_positions() 444 return t 445 else: 446 raise StopIteration 447 448 class PositionDictionaryWriter: 449 450 "Writing position dictionaries." 451 452 def __init__(self, position_writer, position_index_writer, interval): 453 self.position_writer = position_writer 454 self.position_index_writer = position_index_writer 455 self.interval = interval 456 457 def write_term_positions(self, doc_positions): 458 459 """ 460 Write all 'doc_positions' - a collection of tuples of the form (document 461 number, position list) - to the file. 462 463 Add some records to the index, making dictionary entries. 464 465 Return a tuple containing the offset of the written data, the frequency 466 (number of positions), and document frequency (number of documents) for 467 the term involved. 468 """ 469 470 # Reset the writers. 471 472 self.position_writer.reset() 473 self.position_index_writer.reset() 474 475 index_offset = None 476 477 # Write the positions. 478 479 frequency = 0 480 first_docnum = None 481 first_offset = None 482 count = 0 483 484 doc_positions.sort() 485 486 for docnum, positions in doc_positions: 487 pos_offset = self.position_writer.write_positions(docnum, positions) 488 489 # Retain the first record offset for a subsequent index entry. 490 491 if first_offset is None: 492 first_offset = pos_offset 493 first_docnum = docnum 494 495 frequency += len(positions) 496 count += 1 497 498 # Every {interval} entries, write an index entry. 499 500 if count % self.interval == 0: 501 io = self.position_index_writer.write_positions(first_docnum, first_offset, self.interval) 502 503 # Remember the first index entry offset. 504 505 if index_offset is None: 506 index_offset = io 507 508 first_offset = None 509 first_docnum = None 510 511 # Reset the position writer so that position readers accessing 512 # a section start with the correct document number. 513 514 self.position_writer.reset() 515 516 # Finish writing an index entry for the remaining documents. 517 518 else: 519 if first_offset is not None: 520 io = self.position_index_writer.write_positions(first_docnum, first_offset, count % self.interval) 521 522 # Remember the first index entry offset. 523 524 if index_offset is None: 525 index_offset = io 526 527 return index_offset, frequency, count 528 529 def close(self): 530 self.position_writer.close() 531 self.position_index_writer.close() 532 533 class PositionDictionaryReader: 534 535 "Reading position dictionaries." 536 537 def __init__(self, position_opener, position_index_opener): 538 self.position_opener = position_opener 539 self.position_index_opener = position_index_opener 540 541 def read_term_positions(self, offset, doc_frequency): 542 543 """ 544 Return an iterator for dictionary entries starting at 'offset' with the 545 given 'doc_frequency'. 546 """ 547 548 return PositionDictionaryIterator(self.position_opener, 549 self.position_index_opener, offset, doc_frequency) 550 551 def close(self): 552 pass 553 554 class PositionDictionaryIterator: 555 556 "Iteration over position dictionary entries." 557 558 def __init__(self, position_opener, position_index_opener, offset, doc_frequency): 559 self.position_opener = position_opener 560 self.doc_frequency = doc_frequency 561 self.index_iterator = position_index_opener.read_term_positions(offset, doc_frequency) 562 self.iterator = None 563 564 # Remember the last values. 565 566 self.found_docnum, self.found_positions = None, None 567 568 # Maintain state for the next index entry, if read. 569 570 self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None 571 572 # Initialise the current index entry and current position file iterator. 573 574 self._next_section() 575 self._init_section() 576 577 # Sequence methods. 578 579 def __len__(self): 580 return self.doc_frequency 581 582 def sort(self): 583 pass 584 585 # Iterator methods. 586 587 def __iter__(self): 588 return self 589 590 def next(self): 591 592 """ 593 Attempt to get the next document record from the section in the 594 positions file. 595 """ 596 597 # Return any visited but unrequested record. 598 599 if self.found_docnum is not None: 600 t = self.found_docnum, self.found_positions 601 self.found_docnum, self.found_positions = None, None 602 return t 603 604 # Or search for the next record. 605 606 while 1: 607 608 # Either return the next record. 609 610 try: 611 return self.iterator.next() 612 613 # Or, where a section is finished, get the next section and try again. 614 615 except StopIteration: 616 617 # Where a section follows, update the index iterator, but keep 618 # reading using the same file iterator (since the data should 619 # just follow on from the last section). 620 621 self._next_section() 622 self.iterator.replenish(self.section_count) 623 624 # Reset the state of the iterator to make sure that document 625 # numbers are correct. 626 627 self.iterator.reset() 628 629 def from_document(self, docnum): 630 631 """ 632 Attempt to navigate to a positions entry for the given 'docnum', 633 returning the positions for 'docnum', or None otherwise. 634 """ 635 636 # Return any unrequested document positions. 637 638 if docnum == self.found_docnum: 639 return self.found_positions 640 641 # Read ahead in the index until the next entry refers to a document 642 # later than the desired document. 643 644 try: 645 if self.next_docnum is None: 646 self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next() 647 648 # Read until the next entry is after the desired document number, 649 # or until the end of the results. 650 651 while self.next_docnum <= docnum: 652 self._next_read_section() 653 if self.docnum < docnum: 654 self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next() 655 else: 656 break 657 658 except StopIteration: 659 pass 660 661 # Navigate in the position file to the document. 662 663 self._init_section() 664 665 try: 666 while 1: 667 found_docnum, found_positions = self.iterator.next() 668 669 # Return the desired document positions or None (retaining the 670 # positions for the document immediately after). 671 672 if docnum == found_docnum: 673 return found_positions 674 elif docnum < found_docnum: 675 self.found_docnum, self.found_positions = found_docnum, found_positions 676 return None 677 678 except StopIteration: 679 return None 680 681 # Internal methods. 682 683 def _next_section(self): 684 685 "Attempt to get the next section in the index." 686 687 if self.next_docnum is None: 688 self.docnum, self.pos_offset, self.section_count = self.index_iterator.next() 689 else: 690 self._next_read_section() 691 692 def _next_read_section(self): 693 694 """ 695 Make the next index entry the current one without reading from the 696 index. 697 """ 698 699 self.docnum, self.pos_offset, self.section_count = self.next_docnum, self.next_pos_offset, self.next_section_count 700 self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None 701 702 def _init_section(self): 703 704 "Initialise the iterator for the section in the position file." 705 706 if self.iterator is not None: 707 self.iterator.close() 708 self.iterator = self.position_opener.read_term_positions(self.pos_offset, self.section_count) 709 710 def close(self): 711 if self.iterator is not None: 712 self.iterator.close() 713 self.iterator = None 714 if self.index_iterator is not None: 715 self.index_iterator.close() 716 self.index_iterator = None 717 718 class TermWriter(FileWriter): 719 720 "Writing term information to files." 721 722 def reset(self): 723 self.last_term = "" 724 self.last_offset = 0 725 726 def write_term(self, term, offset, frequency, doc_frequency): 727 728 """ 729 Write the given 'term', its position file 'offset', its 'frequency' and 730 its 'doc_frequency' (number of documents in which it appears) to the 731 term information file. Return the offset after the term information was 732 written to the file. 733 """ 734 735 # Write the prefix length and term suffix. 736 737 common = len(commonprefix([self.last_term, term])) 738 suffix = term[common:] 739 740 self.write_number(common) 741 self.write_string(suffix) 742 743 # Write the offset delta. 744 745 self.write_number(offset - self.last_offset) 746 747 # Write the frequency. 748 749 self.write_number(frequency) 750 751 # Write the document frequency. 752 753 self.write_number(doc_frequency) 754 755 self.last_term = term 756 self.last_offset = offset 757 758 return self.f.tell() 759 760 class TermReader(FileReader): 761 762 "Reading term information from files." 763 764 def reset(self): 765 self.last_term = "" 766 self.last_offset = 0 767 768 def read_term(self): 769 770 """ 771 Read a term, its position file offset, its frequency and its document 772 frequency from the term information file. 773 """ 774 775 # Read the prefix length and term suffix. 776 777 common = self.read_number() 778 suffix = self.read_string() 779 780 self.last_term = self.last_term[:common] + suffix 781 782 # Read the offset delta. 783 784 self.last_offset += self.read_number() 785 786 # Read the frequency. 787 788 frequency = self.read_number() 789 790 # Read the document frequency. 791 792 doc_frequency = self.read_number() 793 794 return self.last_term, self.last_offset, frequency, doc_frequency 795 796 def go_to_term(self, term, offset, info_offset): 797 798 """ 799 Seek past the entry for 'term' having 'offset' to 'info_offset'. This 800 permits the scanning for later terms from the specified term. 801 """ 802 803 self.f.seek(info_offset) 804 self.last_term = term 805 self.last_offset = offset 806 807 class TermIndexWriter(TermWriter): 808 809 "Writing term dictionary index details to files." 810 811 def reset(self): 812 TermWriter.reset(self) 813 self.last_info_offset = 0 814 815 def write_term(self, term, offset, frequency, doc_frequency, info_offset): 816 817 """ 818 Write the given 'term', its position file 'offset', its 'frequency' and 819 its 'doc_frequency' to the term dictionary index file, along with the 820 'info_offset' in the term information file. 821 """ 822 823 TermWriter.write_term(self, term, offset, frequency, doc_frequency) 824 825 # Write the information file offset delta. 826 827 self.write_number(info_offset - self.last_info_offset) 828 self.last_info_offset = info_offset 829 830 class TermIndexReader(TermReader): 831 832 "Reading term dictionary index details from files." 833 834 def reset(self): 835 TermReader.reset(self) 836 self.last_info_offset = 0 837 838 def read_term(self): 839 840 """ 841 Read a term, its position file offset, its frequency, its document 842 frequency and a term information file offset from the term dictionary 843 index file. 844 """ 845 846 term, offset, frequency, doc_frequency = TermReader.read_term(self) 847 848 # Read the offset delta. 849 850 self.last_info_offset += self.read_number() 851 852 return term, offset, frequency, doc_frequency, self.last_info_offset 853 854 class TermDictionaryWriter: 855 856 "Writing term dictionaries." 857 858 def __init__(self, info_writer, index_writer, position_dict_writer, interval): 859 self.info_writer = info_writer 860 self.index_writer = index_writer 861 self.position_dict_writer = position_dict_writer 862 self.interval = interval 863 self.entry = 0 864 865 def _write_term(self, term, offset, frequency, doc_frequency): 866 867 """ 868 Write the given 'term', its position file 'offset', its 'frequency' and 869 its 'doc_frequency' (number of documents in which it appears) to the 870 term information file. Return the offset after the term information was 871 written to the file. 872 """ 873 874 info_offset = self.info_writer.write_term(term, offset, frequency, doc_frequency) 875 876 if self.entry % self.interval == 0: 877 self.index_writer.write_term(term, offset, frequency, doc_frequency, info_offset) 878 879 self.entry += 1 880 881 def write_term_positions(self, term, doc_positions): 882 883 """ 884 Write the given 'term' and the 'doc_positions' recording the documents 885 and positions at which the term is found. 886 """ 887 888 offset, frequency, doc_frequency = self.position_dict_writer.write_term_positions(doc_positions) 889 self._write_term(term, offset, frequency, doc_frequency) 890 891 def close(self): 892 self.info_writer.close() 893 self.index_writer.close() 894 self.position_dict_writer.close() 895 896 class TermDictionaryReader: 897 898 "Reading term dictionaries." 899 900 def __init__(self, info_reader, index_reader, position_dict_reader): 901 self.info_reader = info_reader 902 self.index_reader = index_reader 903 self.position_dict_reader = position_dict_reader 904 905 self.terms = [] 906 try: 907 while 1: 908 self.terms.append(self.index_reader.read_term()) 909 except EOFError: 910 pass 911 912 # Large numbers for ordering purposes. 913 914 if self.terms: 915 self.max_offset = self.terms[-1][1] + 1 916 else: 917 self.max_offset = None 918 919 def _find_closest_entry(self, term): 920 921 """ 922 Find the offsets and frequencies of 'term' from the term dictionary or 923 the closest term starting with the value of 'term'. 924 925 Return the closest index entry consisting of a term, the position file 926 offset, the term frequency, the document frequency, and the term details 927 file offset. 928 """ 929 930 i = bisect_right(self.terms, (term, self.max_offset, 0, 0)) - 1 931 932 # Get the entry position providing the term or one preceding it. 933 # If no entry precedes the requested term, return the very first entry 934 # as the closest. 935 936 if i == -1: 937 return self.terms[0] 938 else: 939 return self.terms[i] 940 941 def _find_closest_term(self, term): 942 943 """ 944 Find the offsets and frequencies of 'term' from the term dictionary or 945 the closest term starting with the value of 'term'. 946 947 Return the closest term (or the term itself), the position file offset, 948 the term frequency, the document frequency, and the term details file 949 offset (or None if the reader is already positioned). 950 """ 951 952 found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_entry(term) 953 954 # Where the term is found immediately, return the offset and 955 # frequencies. If the term does not appear, return the details of the 956 # closest entry. 957 958 if term <= found_term: 959 return found_term, offset, frequency, doc_frequency, info_offset 960 961 # Otherwise, seek past the index term's entry in the information file 962 # and scan for the desired term. 963 964 else: 965 self.info_reader.go_to_term(found_term, offset, info_offset) 966 try: 967 while term > found_term: 968 found_term, offset, frequency, doc_frequency = self.info_reader.read_term() 969 except EOFError: 970 pass 971 972 return found_term, offset, frequency, doc_frequency, None 973 974 def _find_term(self, term): 975 976 """ 977 Find the position file offset and frequency of 'term' from the term 978 dictionary. 979 """ 980 981 found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_term(term) 982 983 # If the term is found, return the offset and frequencies. 984 985 if term == found_term: 986 return offset, frequency, doc_frequency 987 else: 988 return None 989 990 def _get_positions(self, offset, doc_frequency): 991 return self.position_dict_reader.read_term_positions(offset, doc_frequency) 992 993 # Iterator convenience methods. 994 995 def __iter__(self): 996 self.rewind() 997 return self 998 999 def next(self): 1000 try: 1001 return self.read_term() 1002 except EOFError: 1003 raise StopIteration 1004 1005 # Sequential access methods. 1006 1007 def rewind(self): 1008 self.info_reader.rewind() 1009 1010 def read_term(self): 1011 1012 """ 1013 Return the next term, its frequency, its document frequency, and the 1014 documents and positions at which the term is found. 1015 """ 1016 1017 term, offset, frequency, doc_frequency = self.info_reader.read_term() 1018 positions = self._get_positions(offset, doc_frequency) 1019 return term, frequency, doc_frequency, positions 1020 1021 # Query methods. 1022 1023 def find_terms(self, term): 1024 1025 "Return all terms whose values start with the value of 'term'." 1026 1027 terms = [] 1028 1029 found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_term(term) 1030 1031 # Position the reader, if necessary. 1032 1033 if info_offset is not None: 1034 self.info_reader.go_to_term(found_term, offset, info_offset) 1035 1036 # Read and record terms. 1037 1038 try: 1039 # Add the found term if it starts with the specified term. 1040 1041 while found_term.startswith(term): 1042 terms.append(found_term) 1043 found_term, offset, frequency, doc_frequency = self.info_reader.read_term() 1044 1045 except EOFError: 1046 pass 1047 1048 return terms 1049 1050 def find_positions(self, term): 1051 1052 "Return the documents and positions at which the given 'term' is found." 1053 1054 t = self._find_term(term) 1055 if t is None: 1056 return None 1057 else: 1058 offset, frequency, doc_frequency = t 1059 return self._get_positions(offset, doc_frequency) 1060 1061 def get_frequency(self, term): 1062 1063 "Return the frequency of the given 'term'." 1064 1065 t = self._find_term(term) 1066 if t is None: 1067 return None 1068 else: 1069 offset, frequency, doc_frequency = t 1070 return frequency 1071 1072 def get_document_frequency(self, term): 1073 1074 "Return the document frequency of the given 'term'." 1075 1076 t = self._find_term(term) 1077 if t is None: 1078 return None 1079 else: 1080 offset, frequency, doc_frequency = t 1081 return doc_frequency 1082 1083 def close(self): 1084 self.info_reader.close() 1085 self.index_reader.close() 1086 self.position_dict_reader.close() 1087 1088 # Specific classes for storing document information. 1089 1090 class FieldWriter(FileWriter): 1091 1092 "Writing field data to files." 1093 1094 def reset(self): 1095 self.last_docnum = 0 1096 1097 def write_fields(self, docnum, fields): 1098 1099 """ 1100 Write for the given 'docnum', a list of 'fields' (integer, string pairs 1101 representing field identifiers and values respectively). 1102 Return the offset at which the fields are stored. 1103 """ 1104 1105 offset = self.f.tell() 1106 1107 # Write the document number delta. 1108 1109 self.write_number(docnum - self.last_docnum) 1110 1111 # Write the number of fields. 1112 1113 self.write_number(len(fields)) 1114 1115 # Write the fields themselves. 1116 1117 for i, field in fields: 1118 self.write_number(i) 1119 self.write_string(field, 1) # compress 1120 1121 self.last_docnum = docnum 1122 return offset 1123 1124 class FieldReader(FileReader): 1125 1126 "Reading field data from files." 1127 1128 def reset(self): 1129 self.last_docnum = 0 1130 1131 def read_fields(self): 1132 1133 """ 1134 Read fields from the file, returning a tuple containing the document 1135 number and a list of field (identifier, value) pairs. 1136 """ 1137 1138 # Read the document number. 1139 1140 self.last_docnum += self.read_number() 1141 1142 # Read the number of fields. 1143 1144 nfields = self.read_number() 1145 1146 # Collect the fields. 1147 1148 fields = [] 1149 i = 0 1150 1151 while i < nfields: 1152 identifier = self.read_number() 1153 value = self.read_string(1) # decompress 1154 fields.append((identifier, value)) 1155 i += 1 1156 1157 return self.last_docnum, fields 1158 1159 def read_document_fields(self, docnum, offset): 1160 1161 """ 1162 Read fields for 'docnum' at the given 'offset'. This permits the 1163 retrieval of details for the specified document, as well as scanning for 1164 later documents. 1165 """ 1166 1167 self.f.seek(offset) 1168 bad_docnum, fields = self.read_fields() 1169 self.last_docnum = docnum 1170 return docnum, fields 1171 1172 class FieldIndexWriter(FileWriter): 1173 1174 "Writing field index details to files." 1175 1176 def reset(self): 1177 self.last_docnum = 0 1178 self.last_offset = 0 1179 1180 def write_document(self, docnum, offset): 1181 1182 """ 1183 Write for the given 'docnum', the 'offset' at which the fields for the 1184 document are stored in the fields file. 1185 """ 1186 1187 # Write the document number and offset deltas. 1188 1189 self.write_number(docnum - self.last_docnum) 1190 self.write_number(offset - self.last_offset) 1191 1192 self.last_docnum = docnum 1193 self.last_offset = offset 1194 1195 class FieldIndexReader(FileReader): 1196 1197 "Reading field index details from files." 1198 1199 def reset(self): 1200 self.last_docnum = 0 1201 self.last_offset = 0 1202 1203 def read_document(self): 1204 1205 "Read a document number and field file offset." 1206 1207 # Read the document number delta and offset. 1208 1209 self.last_docnum += self.read_number() 1210 self.last_offset += self.read_number() 1211 1212 return self.last_docnum, self.last_offset 1213 1214 class FieldDictionaryWriter: 1215 1216 "Writing field dictionary details." 1217 1218 def __init__(self, field_writer, field_index_writer, interval): 1219 self.field_writer = field_writer 1220 self.field_index_writer = field_index_writer 1221 self.interval = interval 1222 self.entry = 0 1223 1224 def write_fields(self, docnum, fields): 1225 1226 "Write details of the document with the given 'docnum' and 'fields'." 1227 1228 offset = self.field_writer.write_fields(docnum, fields) 1229 1230 if self.entry % self.interval == 0: 1231 self.field_index_writer.write_document(docnum, offset) 1232 1233 self.entry += 1 1234 1235 def close(self): 1236 self.field_writer.close() 1237 self.field_index_writer.close() 1238 1239 class FieldDictionaryReader: 1240 1241 "Reading field dictionary details." 1242 1243 def __init__(self, field_reader, field_index_reader): 1244 self.field_reader = field_reader 1245 self.field_index_reader = field_index_reader 1246 1247 self.docs = [] 1248 try: 1249 while 1: 1250 self.docs.append(self.field_index_reader.read_document()) 1251 except EOFError: 1252 pass 1253 1254 # Large numbers for ordering purposes. 1255 1256 if self.docs: 1257 self.max_offset = self.docs[-1][1] 1258 else: 1259 self.max_offset = None 1260 1261 # Iterator convenience methods. 1262 1263 def __iter__(self): 1264 self.rewind() 1265 return self 1266 1267 def next(self): 1268 try: 1269 return self.read_fields() 1270 except EOFError: 1271 raise StopIteration 1272 1273 # Sequential access methods. 1274 1275 def rewind(self): 1276 self.field_reader.rewind() 1277 1278 def read_fields(self): 1279 1280 "Return the next document number and fields." 1281 1282 return self.field_reader.read_fields() 1283 1284 # Random access methods. 1285 1286 def get_fields(self, docnum): 1287 1288 "Read the fields of the document with the given 'docnum'." 1289 1290 i = bisect_right(self.docs, (docnum, self.max_offset)) - 1 1291 1292 # Get the entry position providing the term or one preceding it. 1293 1294 if i == -1: 1295 return None 1296 1297 found_docnum, offset = self.docs[i] 1298 1299 # Read from the fields file. 1300 1301 found_docnum, fields = self.field_reader.read_document_fields(found_docnum, offset) 1302 1303 # Scan for the document, if necessary. 1304 1305 try: 1306 while docnum > found_docnum: 1307 found_docnum, fields = self.field_reader.read_fields() 1308 except EOFError: 1309 pass 1310 1311 # If the document is found, return the fields. 1312 1313 if docnum == found_docnum: 1314 return fields 1315 else: 1316 return None 1317 1318 def close(self): 1319 self.field_reader.close() 1320 self.field_index_reader.close() 1321 1322 # Dictionary merging classes. 1323 1324 class Merger: 1325 1326 "Merge files." 1327 1328 def __init__(self, writer, readers): 1329 self.writer = writer 1330 self.readers = readers 1331 1332 def close(self): 1333 for reader in self.readers: 1334 reader.close() 1335 self.writer.close() 1336 1337 class TermDictionaryMerger(Merger): 1338 1339 "Merge term and position files." 1340 1341 def merge(self): 1342 1343 """ 1344 Merge terms and positions from the readers, sending them to the writer. 1345 """ 1346 1347 last_term = None 1348 current_readers = [] 1349 1350 for term, frequency, doc_frequency, positions in itermerge(self.readers): 1351 if term == last_term: 1352 current_readers.append(positions) 1353 else: 1354 if current_readers: 1355 self.writer.write_term_positions(last_term, itermerge(current_readers)) 1356 last_term = term 1357 current_readers = [positions] 1358 else: 1359 if current_readers: 1360 self.writer.write_term_positions(last_term, itermerge(current_readers)) 1361 1362 class FieldDictionaryMerger(Merger): 1363 1364 "Merge field files." 1365 1366 def merge(self): 1367 1368 """ 1369 Merge fields from the readers, sending them to the writer. 1370 """ 1371 1372 for docnum, fields in itermerge(self.readers): 1373 self.writer.write_fields(docnum, fields) 1374 1375 # Utility functions. 1376 1377 def get_term_writer(pathname, partition, interval, doc_interval): 1378 1379 """ 1380 Return a term dictionary writer using files under the given 'pathname' 1381 labelled according to the given 'partition', using the given indexing 1382 'interval' for terms and 'doc_interval' for document position records. 1383 """ 1384 1385 tdf = open(join(pathname, "terms-%s" % partition), "wb") 1386 info_writer = TermWriter(tdf) 1387 1388 tdif = open(join(pathname, "terms_index-%s" % partition), "wb") 1389 index_writer = TermIndexWriter(tdif) 1390 1391 tpf = open(join(pathname, "positions-%s" % partition), "wb") 1392 positions_writer = PositionWriter(tpf) 1393 1394 tpif = open(join(pathname, "positions_index-%s" % partition), "wb") 1395 positions_index_writer = PositionIndexWriter(tpif) 1396 1397 positions_dict_writer = PositionDictionaryWriter(positions_writer, positions_index_writer, doc_interval) 1398 1399 return TermDictionaryWriter(info_writer, index_writer, positions_dict_writer, interval) 1400 1401 def get_field_writer(pathname, partition, interval): 1402 1403 """ 1404 Return a field dictionary writer using files under the given 'pathname' 1405 labelled according to the given 'partition', using the given indexing 1406 'interval'. 1407 """ 1408 1409 ff = open(join(pathname, "fields-%s" % partition), "wb") 1410 field_writer = FieldWriter(ff) 1411 1412 fif = open(join(pathname, "fields_index-%s" % partition), "wb") 1413 field_index_writer = FieldIndexWriter(fif) 1414 1415 return FieldDictionaryWriter(field_writer, field_index_writer, interval) 1416 1417 def get_term_reader(pathname, partition): 1418 1419 """ 1420 Return a term dictionary reader using files under the given 'pathname' 1421 labelled according to the given 'partition'. 1422 """ 1423 1424 tdf = open(join(pathname, "terms-%s" % partition), "rb") 1425 info_reader = TermReader(tdf) 1426 1427 tdif = open(join(pathname, "terms_index-%s" % partition), "rb") 1428 index_reader = TermIndexReader(tdif) 1429 1430 positions_opener = PositionOpener(join(pathname, "positions-%s" % partition)) 1431 positions_index_opener = PositionIndexOpener(join(pathname, "positions_index-%s" % partition)) 1432 1433 positions_dict_reader = PositionDictionaryReader(positions_opener, positions_index_opener) 1434 1435 return TermDictionaryReader(info_reader, index_reader, positions_dict_reader) 1436 1437 def get_field_reader(pathname, partition): 1438 1439 """ 1440 Return a field dictionary reader using files under the given 'pathname' 1441 labelled according to the given 'partition'. 1442 """ 1443 1444 ff = open(join(pathname, "fields-%s" % partition), "rb") 1445 field_reader = FieldReader(ff) 1446 1447 fif = open(join(pathname, "fields_index-%s" % partition), "rb") 1448 field_index_reader = FieldIndexReader(fif) 1449 1450 return FieldDictionaryReader(field_reader, field_index_reader) 1451 1452 def rename_files(pathname, names, from_partition, to_partition): 1453 for name in names: 1454 rename(join(pathname, "%s-%s" % (name, from_partition)), join(pathname, "%s-%s" % (name, to_partition))) 1455 1456 def rename_term_files(pathname, from_partition, to_partition): 1457 rename_files(pathname, TERM_FILENAMES, from_partition, to_partition) 1458 1459 def rename_field_files(pathname, from_partition, to_partition): 1460 rename_files(pathname, FIELD_FILENAMES, from_partition, to_partition) 1461 1462 def remove_files(pathname, names, partition): 1463 for name in names: 1464 remove(join(pathname, "%s-%s" % (name, partition))) 1465 1466 def remove_term_files(pathname, partition): 1467 remove_files(pathname, TERM_FILENAMES, partition) 1468 1469 def remove_field_files(pathname, partition): 1470 remove_files(pathname, FIELD_FILENAMES, partition) 1471 1472 # High-level classes. 1473 1474 class Document: 1475 1476 "A container of document information." 1477 1478 def __init__(self, docnum): 1479 self.docnum = docnum 1480 self.fields = [] 1481 self.terms = {} 1482 1483 def add_position(self, term, position): 1484 1485 """ 1486 Add a position entry for the given 'term', indicating the given 1487 'position'. 1488 """ 1489 1490 self.terms.setdefault(term, []).append(position) 1491 1492 def add_field(self, identifier, value): 1493 1494 "Add a field having the given 'identifier' and 'value'." 1495 1496 self.fields.append((identifier, unicode(value))) # convert to string 1497 1498 def set_fields(self, fields): 1499 1500 """ 1501 Set the document's 'fields': a list of tuples each containing an integer 1502 identifier and a string value. 1503 """ 1504 1505 self.fields = fields 1506 1507 class IndexWriter: 1508 1509 """ 1510 Building term information and writing it to the term and field dictionaries. 1511 """ 1512 1513 def __init__(self, pathname, interval, doc_interval, flush_interval): 1514 self.pathname = pathname 1515 self.interval = interval 1516 self.doc_interval = doc_interval 1517 self.flush_interval = flush_interval 1518 1519 self.dict_partition = 0 1520 self.field_dict_partition = 0 1521 1522 self.terms = {} 1523 self.docs = {} 1524 1525 self.doc_counter = 0 1526 1527 def add_document(self, doc): 1528 1529 """ 1530 Add the given document 'doc', updating the document counter and flushing 1531 terms and fields if appropriate. 1532 """ 1533 1534 for term, positions in doc.terms.items(): 1535 self.terms.setdefault(term, {})[doc.docnum] = positions 1536 1537 self.docs[doc.docnum] = doc.fields 1538 1539 self.doc_counter += 1 1540 if self.flush_interval and self.doc_counter >= self.flush_interval: 1541 self.flush_terms() 1542 self.flush_fields() 1543 self.doc_counter = 0 1544 1545 def get_term_writer(self): 1546 1547 "Return a term dictionary writer for the current partition." 1548 1549 return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval) 1550 1551 def get_field_writer(self): 1552 1553 "Return a field dictionary writer for the current partition." 1554 1555 return get_field_writer(self.pathname, self.field_dict_partition, self.interval) 1556 1557 def flush_terms(self): 1558 1559 "Flush terms into the current term dictionary partition." 1560 1561 # Get the terms in order. 1562 1563 all_terms = self.terms 1564 terms = all_terms.keys() 1565 terms.sort() 1566 1567 dict_writer = self.get_term_writer() 1568 1569 for term in terms: 1570 doc_positions = all_terms[term].items() 1571 dict_writer.write_term_positions(term, doc_positions) 1572 1573 dict_writer.close() 1574 1575 self.terms = {} 1576 self.dict_partition += 1 1577 1578 def flush_fields(self): 1579 1580 "Flush fields into the current term dictionary partition." 1581 1582 # Get the documents in order. 1583 1584 docs = self.docs.items() 1585 docs.sort() 1586 1587 field_dict_writer = self.get_field_writer() 1588 1589 for docnum, fields in docs: 1590 field_dict_writer.write_fields(docnum, fields) 1591 1592 field_dict_writer.close() 1593 1594 self.docs = {} 1595 self.field_dict_partition += 1 1596 1597 def close(self): 1598 if self.terms: 1599 self.flush_terms() 1600 if self.docs: 1601 self.flush_fields() 1602 1603 class IndexReader: 1604 1605 "Accessing the term and field dictionaries." 1606 1607 def __init__(self, pathname): 1608 self.dict_reader = get_term_reader(pathname, "merged") 1609 self.field_dict_reader = get_field_reader(pathname, "merged") 1610 1611 def find_terms(self, term): 1612 return self.dict_reader.find_terms(term) 1613 1614 def find_positions(self, term): 1615 return self.dict_reader.find_positions(term) 1616 1617 def get_frequency(self, term): 1618 return self.dict_reader.get_frequency(term) 1619 1620 def get_document_frequency(self, term): 1621 return self.dict_reader.get_document_frequency(term) 1622 1623 def get_fields(self, docnum): 1624 return self.field_dict_reader.get_fields(docnum) 1625 1626 def close(self): 1627 self.dict_reader.close() 1628 self.field_dict_reader.close() 1629 1630 class Index: 1631 1632 "An inverted index solution encapsulating the various components." 1633 1634 def __init__(self, pathname): 1635 self.pathname = pathname 1636 self.reader = None 1637 self.writer = None 1638 1639 def get_writer(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, flush_interval=FLUSH_INTERVAL): 1640 1641 """ 1642 Return a writer, optionally using the given indexing 'interval', 1643 'doc_interval' and 'flush_interval'. 1644 """ 1645 1646 if not exists(self.pathname): 1647 mkdir(self.pathname) 1648 1649 self.writer = IndexWriter(self.pathname, interval, doc_interval, flush_interval) 1650 return self.writer 1651 1652 def get_reader(self, partition=0): 1653 1654 "Return a reader for the index." 1655 1656 # Ensure that only one partition exists. 1657 1658 self.merge() 1659 return self._get_reader(partition) 1660 1661 def _get_reader(self, partition): 1662 1663 "Return a reader for the index." 1664 1665 if not exists(self.pathname): 1666 raise OSError, "Index path %r does not exist." % self.pathname 1667 1668 self.reader = IndexReader(self.pathname) 1669 return self.reader 1670 1671 def merge(self): 1672 1673 "Merge/optimise index partitions." 1674 1675 self.merge_terms() 1676 self.merge_fields() 1677 1678 def merge_terms(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL): 1679 1680 """ 1681 Merge term dictionaries using the given indexing 'interval' and 1682 'doc_interval'. 1683 """ 1684 1685 readers = [] 1686 partitions = set() 1687 1688 for filename in listdir(self.pathname): 1689 if filename.startswith("terms-"): # 6 character prefix 1690 partition = filename[6:] 1691 readers.append(get_term_reader(self.pathname, partition)) 1692 partitions.add(partition) 1693 1694 # Write directly to a dictionary. 1695 1696 if len(readers) > 1: 1697 if "merged" in partitions: 1698 rename_term_files(self.pathname, "merged", "old-merged") 1699 partitions.remove("merged") 1700 partitions.add("old-merged") 1701 1702 writer = get_term_writer(self.pathname, "merged", interval, doc_interval) 1703 merger = TermDictionaryMerger(writer, readers) 1704 merger.merge() 1705 merger.close() 1706 1707 # Remove old files. 1708 1709 for partition in partitions: 1710 remove_term_files(self.pathname, partition) 1711 1712 elif len(readers) == 1: 1713 partition = list(partitions)[0] 1714 if partition != "merged": 1715 rename_term_files(self.pathname, partition, "merged") 1716 1717 def merge_fields(self, interval=FIELD_INTERVAL): 1718 1719 "Merge field dictionaries using the given indexing 'interval'." 1720 1721 readers = [] 1722 partitions = set() 1723 1724 for filename in listdir(self.pathname): 1725 if filename.startswith("fields-"): # 7 character prefix 1726 partition = filename[7:] 1727 readers.append(get_field_reader(self.pathname, partition)) 1728 partitions.add(partition) 1729 1730 # Write directly to a dictionary. 1731 1732 if len(readers) > 1: 1733 if "merged" in partitions: 1734 rename_field_files(self.pathname, "merged", "old-merged") 1735 partitions.remove("merged") 1736 partitions.add("old-merged") 1737 1738 writer = get_field_writer(self.pathname, "merged", interval) 1739 merger = FieldDictionaryMerger(writer, readers) 1740 merger.merge() 1741 merger.close() 1742 1743 # Remove old files. 1744 1745 for partition in partitions: 1746 remove_field_files(self.pathname, partition) 1747 1748 elif len(readers) == 1: 1749 partition = list(partitions)[0] 1750 if partition != "merged": 1751 rename_field_files(self.pathname, partition, "merged") 1752 1753 def close(self): 1754 if self.reader is not None: 1755 self.reader.close() 1756 self.reader = None 1757 if self.writer is not None: 1758 self.writer.close() 1759 self.writer = None 1760 1761 # vim: tabstop=4 expandtab shiftwidth=4