#!/usr/bin/env python

"""
A simple (and sane) text indexing library.

Copyright (C) 2009 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with this program. If not, see <http://www.gnu.org/licenses/>.
"""

from os import dup, fdopen             # independent iterator access to files
from os import listdir, mkdir          # index and partition discovery
from os import remove, rename          # partition manipulation
from os.path import exists, join
from os.path import commonprefix       # to find common string prefixes
from bisect import bisect_right        # to find terms in the dictionary index
import bz2, zlib                       # for field compression
from itermerge import itermerge

try:
    set
except NameError:
    from sets import Set as set

# Constants.

TERM_INTERVAL = 100
DOCUMENT_INTERVAL = 100
FIELD_INTERVAL = 100
FLUSH_INTERVAL = 10000

TERM_FILENAMES = "terms", "terms_index", "positions", "positions_index"
FIELD_FILENAMES = "fields", "fields_index"

# Available compression schemes. Each stored string is tagged with the flag of
# the compressor used, or "-" when stored uncompressed.

compressors = [("b", bz2.compress), ("z", zlib.compress)]
decompressors = {"b" : bz2.decompress, "z" : zlib.decompress}

# Foundation classes.

class File:

    "A basic file abstraction."

    def __init__(self, f):
        self.f = f
        self.reset()

    def reset(self):

        "To be used to reset the state of the reader or writer between records."

        pass

    def rewind(self):

        "Seek to the start of the file and reset any per-record state."

        self.f.seek(0)
        self.reset()

    def close(self):

        "Close the underlying file, if still open."

        if self.f is not None:
            self.f.close()
            self.f = None

class FileWriter(File):

    "Writing basic data types to files."

    def write_number(self, number):

        """
        Write 'number' to the file using a variable length encoding: seven bits
        per byte, least significant group first, with the top bit set on every
        byte that is followed by another one.

        Raise ValueError if 'number' is negative.
        """

        # Negative numbers are not supported.

        if number < 0:
            raise ValueError("Number %r is negative." % number)

        # Special case: one byte containing a 7-bit number.

        elif number < 128:
            self.f.write(chr(number))
            return

        # Write the number from least to most significant digits.

        bytes = []

        while number != 0:
            lsd = number & 127
            number = number >> 7

            # Set the continuation bit on every group that is followed by
            # another one.

            if number != 0:
                lsd |= 128
            bytes.append(chr(lsd))

        record = "".join(bytes)
        self.f.write(record)

    def write_string(self, s, compress=0):

        """
        Write 's' to the file, recording its length and compressing the string
        if 'compress' is set to a true value.
        """

        # Convert Unicode objects to strings.

        if isinstance(s, unicode):
            s = s.encode("utf-8")

        # Compress the string if requested.

        if compress:
            for flag, fn in compressors:
                cs = fn(s)

                # Take the first string shorter than the original.

                if len(cs) < len(s):
                    s = cs
                    break

            # Where no compressor produced a shorter string, store the string
            # verbatim. (Previously the last compressor's flag could be left
            # recorded against uncompressed data, corrupting the record.)

            else:
                flag = "-"

        # Compression not requested: store the string verbatim.

        else:
            flag = "-"

        # Record whether compression was used.

        self.f.write(flag)

        # Write the length of the data before the data itself.

        length = len(s)
        self.write_number(length)
        self.f.write(s)

class FileReader(File):

    "Reading basic data types from files."

    def read_number(self):

        """
        Read a variable length number from the file, returning it as an
        integer. Raise EOFError at the end of the file.
        """

        # Read each byte, adding it to the number.

        shift = 0
        number = 0
        read = self.f.read

        try:
            csd = ord(read(1))

            # Bytes with the top bit set carry seven bits each and are
            # followed by further bytes; the final byte has the top bit unset.

            while csd & 128:
                number += ((csd & 127) << shift)
                shift += 7
                csd = ord(read(1))
            else:
                number += (csd << shift)

        # At the end of the file, read(1) returns "" and ord raises TypeError.

        except TypeError:
            raise EOFError

        return number

    def read_string(self, decompress=0):

        """
        Read a string from the file, decompressing the stored data if
        'decompress' is set to a true value. Return the string as a Unicode
        object.
        """

        # Read the compression flag if the data may be compressed.

        if decompress:
            flag = self.f.read(1)
        else:
            flag = "-"

        length = self.read_number()
        s = self.f.read(length)

        # Perform decompression if applicable.

        if flag != "-":
            fn = decompressors[flag]
            s = fn(s)

        # Convert strings to Unicode objects.

        return unicode(s, "utf-8")

class FileOpener:

    "Opening files using their filenames."

    def __init__(self, filename):
        self.filename = filename

    def open(self, mode):

        "Open the file with the given 'mode', returning a file object."

        return open(self.filename, mode)

    def close(self):

        "Openers hold no open files themselves."

        pass

# Specific classes for storing term and position information.

class PositionWriter(FileWriter):

    "Writing position information to files."

    def reset(self):
        # Document numbers are stored as deltas from the previous record.
        self.last_docnum = 0

    def write_positions(self, docnum, positions):

        """
        Write for the document 'docnum' the given 'positions'.
        Return the offset of the written record.

        Raise ValueError if 'docnum' precedes the previously written document
        number, since delta encoding requires ascending document numbers.
        """

        if docnum < self.last_docnum:
            raise ValueError("Document number %r is less than previous number %r." % (docnum, self.last_docnum))

        # Record the offset of this record.

        offset = self.f.tell()

        # Write the document number delta.

        self.write_number(docnum - self.last_docnum)

        # Write the number of positions.

        self.write_number(len(positions))

        # Make sure that the positions are sorted.

        positions.sort()

        # Write the position deltas.

        last = 0
        for position in positions:
            pos = position - last
            self.write_number(pos)
            last = position

        self.last_docnum = docnum

        return offset

class PositionOpener(FileOpener):

    "Reading position information from files."

    def read_term_positions(self, offset, count):

        """
        Read all positions from 'offset', seeking to that position in the file
        before reading. The number of documents available for reading is limited
        to 'count'.
        """

        # Open a separate file handle for independent iteration.

        f = self.open("rb")
        f.seek(offset)
        return PositionIterator(f, count)

class PositionIndexWriter(FileWriter):

    "Writing position index information to files."

    def reset(self):
        # Both document numbers and position file offsets are delta-encoded.
        self.last_docnum = 0
        self.last_pos_offset = 0

    def write_positions(self, docnum, pos_offset, count):

        """
        Write the given 'docnum', 'pos_offset' and document 'count' to the
        position index file. Return the offset of the written record.
        """

        # Record the offset of this record.

        offset = self.f.tell()

        # Write the document number delta.

        self.write_number(docnum - self.last_docnum)
        self.last_docnum = docnum

        # Write the position file offset delta.

        self.write_number(pos_offset - self.last_pos_offset)
        self.last_pos_offset = pos_offset

        # Write the document count.

        self.write_number(count)

        return offset

class PositionIndexOpener(FileOpener):

    "Reading position index information from files."

    def read_term_positions(self, offset, doc_frequency):

        """
        Read all positions from 'offset', seeking to that position in the file
        before reading. The number of documents available for reading is limited
        to 'doc_frequency'.
        """

        # Open a separate file handle for independent iteration.

        f = self.open("rb")
        f.seek(offset)
        return PositionIndexIterator(f, doc_frequency)

# Iterators for position-related files.
328 329 class IteratorBase: 330 331 def __init__(self, count): 332 self.replenish(count) 333 334 def replenish(self, count): 335 self.count = count 336 self.read_documents = 0 337 338 def __len__(self): 339 return self.count 340 341 def sort(self): 342 pass # Stored document positions are already sorted. 343 344 def __iter__(self): 345 return self 346 347 class PositionIterator(FileReader, IteratorBase): 348 349 "Iterating over document positions." 350 351 def __init__(self, f, count): 352 FileReader.__init__(self, f) 353 IteratorBase.__init__(self, count) 354 355 def reset(self): 356 self.last_docnum = 0 357 358 def read_positions(self): 359 360 "Read positions, returning a document number and a list of positions." 361 362 # Read the document number delta and add it to the last number. 363 364 self.last_docnum += self.read_number() 365 366 # Read the number of positions. 367 368 npositions = self.read_number() 369 370 # Read the position deltas, adding each previous position to get the 371 # appropriate collection of absolute positions. 372 373 i = 0 374 last = 0 375 positions = [] 376 377 while i < npositions: 378 last += self.read_number() 379 positions.append(last) 380 i += 1 381 382 return self.last_docnum, positions 383 384 def next(self): 385 386 "Read positions for a single document." 387 388 if self.read_documents < self.count: 389 self.read_documents += 1 390 return self.read_positions() 391 else: 392 raise StopIteration 393 394 class PositionIndexIterator(FileReader, IteratorBase): 395 396 "Iterating over document positions." 397 398 def __init__(self, f, count): 399 FileReader.__init__(self, f) 400 IteratorBase.__init__(self, count) 401 self.section_count = 0 402 403 def reset(self): 404 self.last_docnum = 0 405 self.last_pos_offset = 0 406 407 def read_positions(self): 408 409 """ 410 Read a document number, a position file offset for the position index 411 file, and the number of documents in a section of that file. 
412 """ 413 414 # Read the document number delta. 415 416 self.last_docnum += self.read_number() 417 418 # Read the offset delta. 419 420 self.last_pos_offset += self.read_number() 421 422 # Read the document count. 423 424 count = self.read_number() 425 426 return self.last_docnum, self.last_pos_offset, count 427 428 def next(self): 429 430 "Read positions for a single document." 431 432 self.read_documents += self.section_count 433 if self.read_documents < self.count: 434 docnum, pos_offset, self.section_count = t = self.read_positions() 435 return t 436 else: 437 raise StopIteration 438 439 class PositionDictionaryWriter: 440 441 "Writing position dictionaries." 442 443 def __init__(self, position_writer, position_index_writer, interval): 444 self.position_writer = position_writer 445 self.position_index_writer = position_index_writer 446 self.interval = interval 447 448 def write_term_positions(self, doc_positions): 449 450 """ 451 Write all 'doc_positions' - a collection of tuples of the form (document 452 number, position list) - to the file. 453 454 Add some records to the index, making dictionary entries. 455 456 Return a tuple containing the offset of the written data, the frequency 457 (number of positions), and document frequency (number of documents) for 458 the term involved. 459 """ 460 461 # Reset the writers. 462 463 self.position_writer.reset() 464 self.position_index_writer.reset() 465 466 index_offset = None 467 468 # Write the positions. 469 470 frequency = 0 471 first_docnum = None 472 first_offset = None 473 count = 0 474 475 doc_positions.sort() 476 477 for docnum, positions in doc_positions: 478 pos_offset = self.position_writer.write_positions(docnum, positions) 479 480 # Retain the first record offset for a subsequent index entry. 481 482 if first_offset is None: 483 first_offset = pos_offset 484 first_docnum = docnum 485 486 frequency += len(positions) 487 count += 1 488 489 # Every {interval} entries, write an index entry. 
490 491 if count % self.interval == 0: 492 io = self.position_index_writer.write_positions(first_docnum, first_offset, self.interval) 493 494 # Remember the first index entry offset. 495 496 if index_offset is None: 497 index_offset = io 498 499 first_offset = None 500 first_docnum = None 501 502 # Reset the position writer so that position readers accessing 503 # a section start with the correct document number. 504 505 self.position_writer.reset() 506 507 # Finish writing an index entry for the remaining documents. 508 509 else: 510 if first_offset is not None: 511 io = self.position_index_writer.write_positions(first_docnum, first_offset, count % self.interval) 512 513 # Remember the first index entry offset. 514 515 if index_offset is None: 516 index_offset = io 517 518 return index_offset, frequency, count 519 520 def close(self): 521 self.position_writer.close() 522 self.position_index_writer.close() 523 524 class PositionDictionaryReader: 525 526 "Reading position dictionaries." 527 528 def __init__(self, position_opener, position_index_opener): 529 self.position_opener = position_opener 530 self.position_index_opener = position_index_opener 531 532 def read_term_positions(self, offset, doc_frequency): 533 534 """ 535 Return an iterator for dictionary entries starting at 'offset' with the 536 given 'doc_frequency'. 537 """ 538 539 return PositionDictionaryIterator(self.position_opener, 540 self.position_index_opener, offset, doc_frequency) 541 542 def close(self): 543 pass 544 545 class PositionDictionaryIterator: 546 547 "Iteration over position dictionary entries." 548 549 def __init__(self, position_opener, position_index_opener, offset, doc_frequency): 550 self.position_opener = position_opener 551 self.doc_frequency = doc_frequency 552 self.index_iterator = position_index_opener.read_term_positions(offset, doc_frequency) 553 self.iterator = None 554 555 # Remember the last values. 
556 557 self.found_docnum, self.found_positions = None, None 558 559 # Maintain state for the next index entry, if read. 560 561 self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None 562 563 # Initialise the current index entry and current position file iterator. 564 565 self._next_section() 566 self._init_section() 567 568 # Sequence methods. 569 570 def __len__(self): 571 return self.doc_frequency 572 573 def sort(self): 574 pass 575 576 # Iterator methods. 577 578 def __iter__(self): 579 return self 580 581 def next(self): 582 583 """ 584 Attempt to get the next document record from the section in the 585 positions file. 586 """ 587 588 # Return any visited but unrequested record. 589 590 if self.found_docnum is not None: 591 t = self.found_docnum, self.found_positions 592 self.found_docnum, self.found_positions = None, None 593 return t 594 595 # Or search for the next record. 596 597 while 1: 598 599 # Either return the next record. 600 601 try: 602 return self.iterator.next() 603 604 # Or, where a section is finished, get the next section and try again. 605 606 except StopIteration: 607 608 # Where a section follows, update the index iterator, but keep 609 # reading using the same file iterator (since the data should 610 # just follow on from the last section). 611 612 self._next_section() 613 self.iterator.replenish(self.section_count) 614 615 # Reset the state of the iterator to make sure that document 616 # numbers are correct. 617 618 self.iterator.reset() 619 620 def from_document(self, docnum): 621 622 """ 623 Attempt to navigate to a positions entry for the given 'docnum', 624 returning the positions for 'docnum', or None otherwise. 625 """ 626 627 # Return any unrequested document positions. 628 629 if docnum == self.found_docnum: 630 return self.found_positions 631 632 # Read ahead in the index until the next entry refers to a document 633 # later than the desired document. 
634 635 try: 636 if self.next_docnum is None: 637 self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next() 638 639 # Read until the next entry is after the desired document number, 640 # or until the end of the results. 641 642 while self.next_docnum <= docnum: 643 self._next_read_section() 644 if self.docnum < docnum: 645 self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next() 646 else: 647 break 648 649 except StopIteration: 650 pass 651 652 # Navigate in the position file to the document. 653 654 self._init_section() 655 656 try: 657 while 1: 658 found_docnum, found_positions = self.iterator.next() 659 660 # Return the desired document positions or None (retaining the 661 # positions for the document immediately after). 662 663 if docnum == found_docnum: 664 return found_positions 665 elif docnum < found_docnum: 666 self.found_docnum, self.found_positions = found_docnum, found_positions 667 return None 668 669 except StopIteration: 670 return None 671 672 # Internal methods. 673 674 def _next_section(self): 675 676 "Attempt to get the next section in the index." 677 678 if self.next_docnum is None: 679 self.docnum, self.pos_offset, self.section_count = self.index_iterator.next() 680 else: 681 self._next_read_section() 682 683 def _next_read_section(self): 684 685 """ 686 Make the next index entry the current one without reading from the 687 index. 688 """ 689 690 self.docnum, self.pos_offset, self.section_count = self.next_docnum, self.next_pos_offset, self.next_section_count 691 self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None 692 693 def _init_section(self): 694 695 "Initialise the iterator for the section in the position file." 
696 697 if self.iterator is not None: 698 self.iterator.close() 699 self.iterator = self.position_opener.read_term_positions(self.pos_offset, self.section_count) 700 701 def close(self): 702 if self.iterator is not None: 703 self.iterator.close() 704 self.iterator = None 705 if self.index_iterator is not None: 706 self.index_iterator.close() 707 self.index_iterator = None 708 709 class TermWriter(FileWriter): 710 711 "Writing term information to files." 712 713 def reset(self): 714 self.last_term = "" 715 self.last_offset = 0 716 717 def write_term(self, term, offset, frequency, doc_frequency): 718 719 """ 720 Write the given 'term', its position file 'offset', its 'frequency' and 721 its 'doc_frequency' (number of documents in which it appears) to the 722 term information file. Return the offset after the term information was 723 written to the file. 724 """ 725 726 # Write the prefix length and term suffix. 727 728 common = len(commonprefix([self.last_term, term])) 729 suffix = term[common:] 730 731 self.write_number(common) 732 self.write_string(suffix) 733 734 # Write the offset delta. 735 736 self.write_number(offset - self.last_offset) 737 738 # Write the frequency. 739 740 self.write_number(frequency) 741 742 # Write the document frequency. 743 744 self.write_number(doc_frequency) 745 746 self.last_term = term 747 self.last_offset = offset 748 749 return self.f.tell() 750 751 class TermReader(FileReader): 752 753 "Reading term information from files." 754 755 def reset(self): 756 self.last_term = "" 757 self.last_offset = 0 758 759 def read_term(self): 760 761 """ 762 Read a term, its position file offset, its frequency and its document 763 frequency from the term information file. 764 """ 765 766 # Read the prefix length and term suffix. 767 768 common = self.read_number() 769 suffix = self.read_string() 770 771 self.last_term = self.last_term[:common] + suffix 772 773 # Read the offset delta. 
774 775 self.last_offset += self.read_number() 776 777 # Read the frequency. 778 779 frequency = self.read_number() 780 781 # Read the document frequency. 782 783 doc_frequency = self.read_number() 784 785 return self.last_term, self.last_offset, frequency, doc_frequency 786 787 def go_to_term(self, term, offset, info_offset): 788 789 """ 790 Seek past the entry for 'term' having 'offset' to 'info_offset'. This 791 permits the scanning for later terms from the specified term. 792 """ 793 794 self.f.seek(info_offset) 795 self.last_term = term 796 self.last_offset = offset 797 798 class TermIndexWriter(TermWriter): 799 800 "Writing term dictionary index details to files." 801 802 def reset(self): 803 TermWriter.reset(self) 804 self.last_info_offset = 0 805 806 def write_term(self, term, offset, frequency, doc_frequency, info_offset): 807 808 """ 809 Write the given 'term', its position file 'offset', its 'frequency' and 810 its 'doc_frequency' to the term dictionary index file, along with the 811 'info_offset' in the term information file. 812 """ 813 814 TermWriter.write_term(self, term, offset, frequency, doc_frequency) 815 816 # Write the information file offset delta. 817 818 self.write_number(info_offset - self.last_info_offset) 819 self.last_info_offset = info_offset 820 821 class TermIndexReader(TermReader): 822 823 "Reading term dictionary index details from files." 824 825 def reset(self): 826 TermReader.reset(self) 827 self.last_info_offset = 0 828 829 def read_term(self): 830 831 """ 832 Read a term, its position file offset, its frequency, its document 833 frequency and a term information file offset from the term dictionary 834 index file. 835 """ 836 837 term, offset, frequency, doc_frequency = TermReader.read_term(self) 838 839 # Read the offset delta. 840 841 self.last_info_offset += self.read_number() 842 843 return term, offset, frequency, doc_frequency, self.last_info_offset 844 845 class TermDictionaryWriter: 846 847 "Writing term dictionaries." 
848 849 def __init__(self, info_writer, index_writer, position_dict_writer, interval): 850 self.info_writer = info_writer 851 self.index_writer = index_writer 852 self.position_dict_writer = position_dict_writer 853 self.interval = interval 854 self.entry = 0 855 856 def _write_term(self, term, offset, frequency, doc_frequency): 857 858 """ 859 Write the given 'term', its position file 'offset', its 'frequency' and 860 its 'doc_frequency' (number of documents in which it appears) to the 861 term information file. Return the offset after the term information was 862 written to the file. 863 """ 864 865 info_offset = self.info_writer.write_term(term, offset, frequency, doc_frequency) 866 867 if self.entry % self.interval == 0: 868 self.index_writer.write_term(term, offset, frequency, doc_frequency, info_offset) 869 870 self.entry += 1 871 872 def write_term_positions(self, term, doc_positions): 873 874 """ 875 Write the given 'term' and the 'doc_positions' recording the documents 876 and positions at which the term is found. 877 """ 878 879 offset, frequency, doc_frequency = self.position_dict_writer.write_term_positions(doc_positions) 880 self._write_term(term, offset, frequency, doc_frequency) 881 882 def close(self): 883 self.info_writer.close() 884 self.index_writer.close() 885 self.position_dict_writer.close() 886 887 class TermDictionaryReader: 888 889 "Reading term dictionaries." 890 891 def __init__(self, info_reader, index_reader, position_dict_reader): 892 self.info_reader = info_reader 893 self.index_reader = index_reader 894 self.position_dict_reader = position_dict_reader 895 896 self.terms = [] 897 try: 898 while 1: 899 self.terms.append(self.index_reader.read_term()) 900 except EOFError: 901 pass 902 903 # Large numbers for ordering purposes. 
904 905 if self.terms: 906 self.max_offset = self.terms[-1][1] + 1 907 else: 908 self.max_offset = None 909 910 def _find_closest_entry(self, term): 911 912 """ 913 Find the offsets and frequencies of 'term' from the term dictionary or 914 the closest term starting with the value of 'term'. 915 916 Return the closest index entry consisting of a term, the position file 917 offset, the term frequency, the document frequency, and the term details 918 file offset. 919 """ 920 921 i = bisect_right(self.terms, (term, self.max_offset, 0, 0)) - 1 922 923 # Get the entry position providing the term or one preceding it. 924 # If no entry precedes the requested term, return the very first entry 925 # as the closest. 926 927 if i == -1: 928 return self.terms[0] 929 else: 930 return self.terms[i] 931 932 def _find_closest_term(self, term): 933 934 """ 935 Find the offsets and frequencies of 'term' from the term dictionary or 936 the closest term starting with the value of 'term'. 937 938 Return the closest term (or the term itself), the position file offset, 939 the term frequency, the document frequency, and the term details file 940 offset (or None if the reader is already positioned). 941 """ 942 943 found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_entry(term) 944 945 # Where the term is found immediately, return the offset and 946 # frequencies. If the term does not appear, return the details of the 947 # closest entry. 948 949 if term <= found_term: 950 return found_term, offset, frequency, doc_frequency, info_offset 951 952 # Otherwise, seek past the index term's entry in the information file 953 # and scan for the desired term. 
954 955 else: 956 self.info_reader.go_to_term(found_term, offset, info_offset) 957 try: 958 while term > found_term: 959 found_term, offset, frequency, doc_frequency = self.info_reader.read_term() 960 except EOFError: 961 pass 962 963 return found_term, offset, frequency, doc_frequency, None 964 965 def _find_term(self, term): 966 967 """ 968 Find the position file offset and frequency of 'term' from the term 969 dictionary. 970 """ 971 972 found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_term(term) 973 974 # If the term is found, return the offset and frequencies. 975 976 if term == found_term: 977 return offset, frequency, doc_frequency 978 else: 979 return None 980 981 def _get_positions(self, offset, doc_frequency): 982 return self.position_dict_reader.read_term_positions(offset, doc_frequency) 983 984 # Iterator convenience methods. 985 986 def __iter__(self): 987 self.rewind() 988 return self 989 990 def next(self): 991 try: 992 return self.read_term() 993 except EOFError: 994 raise StopIteration 995 996 # Sequential access methods. 997 998 def rewind(self): 999 self.info_reader.rewind() 1000 1001 def read_term(self): 1002 1003 """ 1004 Return the next term, its frequency, its document frequency, and the 1005 documents and positions at which the term is found. 1006 """ 1007 1008 term, offset, frequency, doc_frequency = self.info_reader.read_term() 1009 positions = self._get_positions(offset, doc_frequency) 1010 return term, frequency, doc_frequency, positions 1011 1012 # Query methods. 1013 1014 def find_terms(self, term): 1015 1016 "Return all terms whose values start with the value of 'term'." 1017 1018 terms = [] 1019 1020 found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_term(term) 1021 1022 # Position the reader, if necessary. 1023 1024 if info_offset is not None: 1025 self.info_reader.go_to_term(found_term, offset, info_offset) 1026 1027 # Read and record terms. 
1028 1029 try: 1030 # Add the found term if it starts with the specified term. 1031 1032 while found_term.startswith(term): 1033 terms.append(found_term) 1034 found_term, offset, frequency, doc_frequency = self.info_reader.read_term() 1035 1036 except EOFError: 1037 pass 1038 1039 return terms 1040 1041 def find_positions(self, term): 1042 1043 "Return the documents and positions at which the given 'term' is found." 1044 1045 t = self._find_term(term) 1046 if t is None: 1047 return None 1048 else: 1049 offset, frequency, doc_frequency = t 1050 return self._get_positions(offset, doc_frequency) 1051 1052 def get_frequency(self, term): 1053 1054 "Return the frequency of the given 'term'." 1055 1056 t = self._find_term(term) 1057 if t is None: 1058 return None 1059 else: 1060 offset, frequency, doc_frequency = t 1061 return frequency 1062 1063 def get_document_frequency(self, term): 1064 1065 "Return the document frequency of the given 'term'." 1066 1067 t = self._find_term(term) 1068 if t is None: 1069 return None 1070 else: 1071 offset, frequency, doc_frequency = t 1072 return doc_frequency 1073 1074 def close(self): 1075 self.info_reader.close() 1076 self.index_reader.close() 1077 self.position_dict_reader.close() 1078 1079 # Specific classes for storing document information. 1080 1081 class FieldWriter(FileWriter): 1082 1083 "Writing field data to files." 1084 1085 def reset(self): 1086 self.last_docnum = 0 1087 1088 def write_fields(self, docnum, fields): 1089 1090 """ 1091 Write for the given 'docnum', a list of 'fields' (integer, string pairs 1092 representing field identifiers and values respectively). 1093 Return the offset at which the fields are stored. 1094 """ 1095 1096 offset = self.f.tell() 1097 1098 # Write the document number delta. 1099 1100 self.write_number(docnum - self.last_docnum) 1101 1102 # Write the number of fields. 1103 1104 self.write_number(len(fields)) 1105 1106 # Write the fields themselves. 
        # Write each field identifier followed by its compressed value.

        for i, field in fields:
            self.write_number(i)
            self.write_string(field, 1) # compress

        self.last_docnum = docnum
        return offset

class FieldReader(FileReader):

    "Reading field data from files."

    def reset(self):

        # Delta decoding state: document numbers are stored as deltas from the
        # previously read document number.

        self.last_docnum = 0

    def read_fields(self):

        """
        Read fields from the file, returning a tuple containing the document
        number and a list of field (identifier, value) pairs.
        """

        # Read the document number (stored as a delta from the previous one).

        self.last_docnum += self.read_number()

        # Read the number of fields.

        nfields = self.read_number()

        # Collect the fields.

        fields = []
        i = 0

        while i < nfields:
            identifier = self.read_number()
            value = self.read_string(1) # decompress
            fields.append((identifier, value))
            i += 1

        return self.last_docnum, fields

    def read_document_fields(self, docnum, offset):

        """
        Read fields for 'docnum' at the given 'offset'. This permits the
        retrieval of details for the specified document, as well as scanning for
        later documents.
        """

        self.f.seek(offset)

        # After seeking to an arbitrary offset, the delta-decoded document
        # number is meaningless, so it is discarded in favour of the known
        # 'docnum', which also resets the state for subsequent reads.

        bad_docnum, fields = self.read_fields()
        self.last_docnum = docnum
        return docnum, fields

class FieldIndexWriter(FileWriter):

    "Writing field index details to files."

    def reset(self):

        # Delta encoding state.

        self.last_docnum = 0
        self.last_offset = 0

    def write_document(self, docnum, offset):

        """
        Write for the given 'docnum', the 'offset' at which the fields for the
        document are stored in the fields file.
        """

        # Write the document number and offset deltas.

        self.write_number(docnum - self.last_docnum)
        self.write_number(offset - self.last_offset)

        self.last_docnum = docnum
        self.last_offset = offset

class FieldIndexReader(FileReader):

    "Reading field index details from files."

    def reset(self):

        # Delta decoding state.

        self.last_docnum = 0
        self.last_offset = 0

    def read_document(self):

        "Read a document number and field file offset."

        # Read the document number delta and offset delta, accumulating them
        # onto the previously read values.

        self.last_docnum += self.read_number()
        self.last_offset += self.read_number()

        return self.last_docnum, self.last_offset

class FieldDictionaryWriter:

    "Writing field dictionary details."

    def __init__(self, field_writer, field_index_writer, interval):

        """
        Initialise the writer with the given 'field_writer' and
        'field_index_writer', recording an index entry for every 'interval'
        documents written.
        """

        self.field_writer = field_writer
        self.field_index_writer = field_index_writer
        self.interval = interval
        self.entry = 0

    def write_fields(self, docnum, fields):

        "Write details of the document with the given 'docnum' and 'fields'."

        offset = self.field_writer.write_fields(docnum, fields)

        # Only every 'interval'-th document receives an index entry.

        if self.entry % self.interval == 0:
            self.field_index_writer.write_document(docnum, offset)

        self.entry += 1

    def close(self):
        self.field_writer.close()
        self.field_index_writer.close()

class FieldDictionaryReader:

    "Reading field dictionary details."

    def __init__(self, field_reader, field_index_reader):

        """
        Initialise the reader with the given 'field_reader' and
        'field_index_reader', loading the entire field index into memory.
        """

        self.field_reader = field_reader
        self.field_index_reader = field_index_reader

        # Read all (docnum, offset) index entries up front.

        self.docs = []
        try:
            while 1:
                self.docs.append(self.field_index_reader.read_document())
        except EOFError:
            pass

        # Large numbers for ordering purposes.
        # The greatest known offset is paired with a requested document number
        # when bisecting so that an exact match sorts after its index entry.

        if self.docs:
            self.max_offset = self.docs[-1][1]
        else:
            self.max_offset = None

    # Iterator convenience methods.

    def __iter__(self):
        self.rewind()
        return self

    def next(self):
        try:
            return self.read_fields()
        except EOFError:
            raise StopIteration

    # Sequential access methods.

    def rewind(self):
        self.field_reader.rewind()

    def read_fields(self):

        "Return the next document number and fields."

        return self.field_reader.read_fields()

    # Random access methods.

    def get_fields(self, docnum):

        """
        Read the fields of the document with the given 'docnum', returning the
        field list, or None if the document is not present.
        """

        i = bisect_right(self.docs, (docnum, self.max_offset)) - 1

        # Get the entry position providing the term or one preceding it.

        if i == -1:
            return None

        found_docnum, offset = self.docs[i]

        # Read from the fields file.

        found_docnum, fields = self.field_reader.read_document_fields(found_docnum, offset)

        # Scan for the document, if necessary.

        try:
            while docnum > found_docnum:
                found_docnum, fields = self.field_reader.read_fields()
        except EOFError:
            pass

        # If the document is found, return the fields.

        if docnum == found_docnum:
            return fields
        else:
            return None

    def close(self):
        self.field_reader.close()
        self.field_index_reader.close()

# Dictionary merging classes.

class Merger:

    "Merge files."

    def __init__(self, writer, readers):

        "Initialise the merger with the target 'writer' and source 'readers'."

        self.writer = writer
        self.readers = readers

    def close(self):
        for reader in self.readers:
            reader.close()
        self.writer.close()

class TermDictionaryMerger(Merger):

    "Merge term and position files."

    def merge(self):

        """
        Merge terms and positions from the readers, sending them to the writer.
        """

        last_term = None
        current_readers = []

        # The readers are combined in sorted order; runs of identical terms
        # from different readers have their position streams merged together.

        for term, frequency, doc_frequency, positions in itermerge(self.readers):
            if term == last_term:
                current_readers.append(positions)
            else:
                if current_readers:
                    self.writer.write_term_positions(last_term, itermerge(current_readers))
                last_term = term
                current_readers = [positions]
        else:

            # for/else: flush the final pending term once the loop is
            # exhausted (the else clause always runs since there is no break).

            if current_readers:
                self.writer.write_term_positions(last_term, itermerge(current_readers))

class FieldDictionaryMerger(Merger):

    "Merge field files."

    def merge(self):

        """
        Merge fields from the readers, sending them to the writer.
        """

        for docnum, fields in itermerge(self.readers):
            self.writer.write_fields(docnum, fields)

# Utility functions.

def get_term_writer(pathname, partition, interval, doc_interval):

    """
    Return a term dictionary writer using files under the given 'pathname'
    labelled according to the given 'partition', using the given indexing
    'interval' for terms and 'doc_interval' for document position records.
    """

    tdf = open(join(pathname, "terms-%s" % partition), "wb")
    info_writer = TermWriter(tdf)

    tdif = open(join(pathname, "terms_index-%s" % partition), "wb")
    index_writer = TermIndexWriter(tdif)

    tpf = open(join(pathname, "positions-%s" % partition), "wb")
    positions_writer = PositionWriter(tpf)

    tpif = open(join(pathname, "positions_index-%s" % partition), "wb")
    positions_index_writer = PositionIndexWriter(tpif)

    positions_dict_writer = PositionDictionaryWriter(positions_writer, positions_index_writer, doc_interval)

    return TermDictionaryWriter(info_writer, index_writer, positions_dict_writer, interval)

def get_field_writer(pathname, partition, interval):

    """
    Return a field dictionary writer using files under the given 'pathname'
    labelled according to the given 'partition', using the given indexing
    'interval'.
    """

    ff = open(join(pathname, "fields-%s" % partition), "wb")
    field_writer = FieldWriter(ff)

    fif = open(join(pathname, "fields_index-%s" % partition), "wb")
    field_index_writer = FieldIndexWriter(fif)

    return FieldDictionaryWriter(field_writer, field_index_writer, interval)

def get_term_reader(pathname, partition):

    """
    Return a term dictionary reader using files under the given 'pathname'
    labelled according to the given 'partition'.
    """

    tdf = open(join(pathname, "terms-%s" % partition), "rb")
    info_reader = TermReader(tdf)

    tdif = open(join(pathname, "terms_index-%s" % partition), "rb")
    index_reader = TermIndexReader(tdif)

    # NOTE(review): the openers receive filenames rather than open files —
    # presumably the position files are opened on demand; confirm against
    # PositionOpener.

    positions_opener = PositionOpener(join(pathname, "positions-%s" % partition))
    positions_index_opener = PositionIndexOpener(join(pathname, "positions_index-%s" % partition))

    positions_dict_reader = PositionDictionaryReader(positions_opener, positions_index_opener)

    return TermDictionaryReader(info_reader, index_reader, positions_dict_reader)

def get_field_reader(pathname, partition):

    """
    Return a field dictionary reader using files under the given 'pathname'
    labelled according to the given 'partition'.
    """

    ff = open(join(pathname, "fields-%s" % partition), "rb")
    field_reader = FieldReader(ff)

    fif = open(join(pathname, "fields_index-%s" % partition), "rb")
    field_index_reader = FieldIndexReader(fif)

    return FieldDictionaryReader(field_reader, field_index_reader)

def rename_files(pathname, names, from_partition, to_partition):

    """
    Rename each file under 'pathname' given by 'names' from the labelling of
    'from_partition' to that of 'to_partition'.
    """

    for name in names:
        rename(join(pathname, "%s-%s" % (name, from_partition)), join(pathname, "%s-%s" % (name, to_partition)))

def rename_term_files(pathname, from_partition, to_partition):
    rename_files(pathname, TERM_FILENAMES, from_partition, to_partition)

def rename_field_files(pathname, from_partition, to_partition):
    rename_files(pathname, FIELD_FILENAMES, from_partition, to_partition)

def remove_files(pathname, names, partition):

    "Remove each file under 'pathname' given by 'names' for 'partition'."

    for name in names:
        remove(join(pathname, "%s-%s" % (name, partition)))

def remove_term_files(pathname, partition):
    remove_files(pathname, TERM_FILENAMES, partition)

def remove_field_files(pathname, partition):
    remove_files(pathname, FIELD_FILENAMES, partition)

# High-level classes.
1464 1465 class Document: 1466 1467 "A container of document information." 1468 1469 def __init__(self, docnum): 1470 self.docnum = docnum 1471 self.fields = [] 1472 self.terms = {} 1473 1474 def add_position(self, term, position): 1475 1476 """ 1477 Add a position entry for the given 'term', indicating the given 1478 'position'. 1479 """ 1480 1481 self.terms.setdefault(term, []).append(position) 1482 1483 def add_field(self, identifier, value): 1484 1485 "Add a field having the given 'identifier' and 'value'." 1486 1487 self.fields.append((identifier, unicode(value))) # convert to string 1488 1489 def set_fields(self, fields): 1490 1491 """ 1492 Set the document's 'fields': a list of tuples each containing an integer 1493 identifier and a string value. 1494 """ 1495 1496 self.fields = fields 1497 1498 class IndexWriter: 1499 1500 """ 1501 Building term information and writing it to the term and field dictionaries. 1502 """ 1503 1504 def __init__(self, pathname, interval, doc_interval, flush_interval): 1505 self.pathname = pathname 1506 self.interval = interval 1507 self.doc_interval = doc_interval 1508 self.flush_interval = flush_interval 1509 1510 self.dict_partition = 0 1511 self.field_dict_partition = 0 1512 1513 self.terms = {} 1514 self.docs = {} 1515 1516 self.doc_counter = 0 1517 1518 def add_document(self, doc): 1519 1520 """ 1521 Add the given document 'doc', updating the document counter and flushing 1522 terms and fields if appropriate. 1523 """ 1524 1525 for term, positions in doc.terms.items(): 1526 self.terms.setdefault(term, {})[doc.docnum] = positions 1527 1528 self.docs[doc.docnum] = doc.fields 1529 1530 self.doc_counter += 1 1531 if self.flush_interval and self.doc_counter >= self.flush_interval: 1532 self.flush_terms() 1533 self.flush_fields() 1534 self.doc_counter = 0 1535 1536 def get_term_writer(self): 1537 1538 "Return a term dictionary writer for the current partition." 
1539 1540 return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval) 1541 1542 def get_field_writer(self): 1543 1544 "Return a field dictionary writer for the current partition." 1545 1546 return get_field_writer(self.pathname, self.field_dict_partition, self.interval) 1547 1548 def flush_terms(self): 1549 1550 "Flush terms into the current term dictionary partition." 1551 1552 # Get the terms in order. 1553 1554 all_terms = self.terms 1555 terms = all_terms.keys() 1556 terms.sort() 1557 1558 dict_writer = self.get_term_writer() 1559 1560 for term in terms: 1561 doc_positions = all_terms[term].items() 1562 dict_writer.write_term_positions(term, doc_positions) 1563 1564 dict_writer.close() 1565 1566 self.terms = {} 1567 self.dict_partition += 1 1568 1569 def flush_fields(self): 1570 1571 "Flush fields into the current term dictionary partition." 1572 1573 # Get the documents in order. 1574 1575 docs = self.docs.items() 1576 docs.sort() 1577 1578 field_dict_writer = self.get_field_writer() 1579 1580 for docnum, fields in docs: 1581 field_dict_writer.write_fields(docnum, fields) 1582 1583 field_dict_writer.close() 1584 1585 self.docs = {} 1586 self.field_dict_partition += 1 1587 1588 def close(self): 1589 if self.terms: 1590 self.flush_terms() 1591 if self.docs: 1592 self.flush_fields() 1593 1594 class IndexReader: 1595 1596 "Accessing the term and field dictionaries." 
1597 1598 def __init__(self, pathname): 1599 self.dict_reader = get_term_reader(pathname, "merged") 1600 self.field_dict_reader = get_field_reader(pathname, "merged") 1601 1602 def find_terms(self, term): 1603 return self.dict_reader.find_terms(term) 1604 1605 def find_positions(self, term): 1606 return self.dict_reader.find_positions(term) 1607 1608 def get_frequency(self, term): 1609 return self.dict_reader.get_frequency(term) 1610 1611 def get_document_frequency(self, term): 1612 return self.dict_reader.get_document_frequency(term) 1613 1614 def get_fields(self, docnum): 1615 return self.field_dict_reader.get_fields(docnum) 1616 1617 def close(self): 1618 self.dict_reader.close() 1619 self.field_dict_reader.close() 1620 1621 class Index: 1622 1623 "An inverted index solution encapsulating the various components." 1624 1625 def __init__(self, pathname): 1626 self.pathname = pathname 1627 self.reader = None 1628 self.writer = None 1629 1630 def get_writer(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, flush_interval=FLUSH_INTERVAL): 1631 1632 """ 1633 Return a writer, optionally using the given indexing 'interval', 1634 'doc_interval' and 'flush_interval'. 1635 """ 1636 1637 if not exists(self.pathname): 1638 mkdir(self.pathname) 1639 1640 self.writer = IndexWriter(self.pathname, interval, doc_interval, flush_interval) 1641 return self.writer 1642 1643 def get_reader(self, partition=0): 1644 1645 "Return a reader for the index." 1646 1647 # Ensure that only one partition exists. 1648 1649 self.merge() 1650 return self._get_reader(partition) 1651 1652 def _get_reader(self, partition): 1653 1654 "Return a reader for the index." 1655 1656 if not exists(self.pathname): 1657 raise OSError, "Index path %r does not exist." % self.pathname 1658 1659 self.reader = IndexReader(self.pathname) 1660 return self.reader 1661 1662 def merge(self): 1663 1664 "Merge/optimise index partitions." 
1665 1666 self.merge_terms() 1667 self.merge_fields() 1668 1669 def merge_terms(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL): 1670 1671 """ 1672 Merge term dictionaries using the given indexing 'interval' and 1673 'doc_interval'. 1674 """ 1675 1676 readers = [] 1677 partitions = set() 1678 1679 for filename in listdir(self.pathname): 1680 if filename.startswith("terms-"): # 6 character prefix 1681 partition = filename[6:] 1682 readers.append(get_term_reader(self.pathname, partition)) 1683 partitions.add(partition) 1684 1685 # Write directly to a dictionary. 1686 1687 if len(readers) > 1: 1688 if "merged" in partitions: 1689 rename_term_files(self.pathname, "merged", "old-merged") 1690 partitions.remove("merged") 1691 partitions.add("old-merged") 1692 1693 writer = get_term_writer(self.pathname, "merged", interval, doc_interval) 1694 merger = TermDictionaryMerger(writer, readers) 1695 merger.merge() 1696 merger.close() 1697 1698 # Remove old files. 1699 1700 for partition in partitions: 1701 remove_term_files(self.pathname, partition) 1702 1703 elif len(readers) == 1: 1704 partition = list(partitions)[0] 1705 if partition != "merged": 1706 rename_term_files(self.pathname, partition, "merged") 1707 1708 def merge_fields(self, interval=FIELD_INTERVAL): 1709 1710 "Merge field dictionaries using the given indexing 'interval'." 1711 1712 readers = [] 1713 partitions = set() 1714 1715 for filename in listdir(self.pathname): 1716 if filename.startswith("fields-"): # 7 character prefix 1717 partition = filename[7:] 1718 readers.append(get_field_reader(self.pathname, partition)) 1719 partitions.add(partition) 1720 1721 # Write directly to a dictionary. 
1722 1723 if len(readers) > 1: 1724 if "merged" in partitions: 1725 rename_field_files(self.pathname, "merged", "old-merged") 1726 partitions.remove("merged") 1727 partitions.add("old-merged") 1728 1729 writer = get_field_writer(self.pathname, "merged", interval) 1730 merger = FieldDictionaryMerger(writer, readers) 1731 merger.merge() 1732 merger.close() 1733 1734 # Remove old files. 1735 1736 for partition in partitions: 1737 remove_field_files(self.pathname, partition) 1738 1739 elif len(readers) == 1: 1740 partition = list(partitions)[0] 1741 if partition != "merged": 1742 rename_field_files(self.pathname, partition, "merged") 1743 1744 def close(self): 1745 if self.reader is not None: 1746 self.reader.close() 1747 self.reader = None 1748 if self.writer is not None: 1749 self.writer.close() 1750 self.writer = None 1751 1752 # vim: tabstop=4 expandtab shiftwidth=4