#!/usr/bin/env python

"""
A simple (and sane) text indexing library.

Copyright (C) 2009 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with this program. If not, see <http://www.gnu.org/licenses/>.
"""

from os import dup, fdopen # independent iterator access to files
from os import listdir, mkdir # index and partition discovery
from os import remove, rename # partition manipulation
from os.path import exists, join
from os.path import commonprefix # to find common string prefixes
from bisect import bisect_right # to find terms in the dictionary index
import bz2, zlib # for field compression
from itermerge import itermerge

try:
    set
except NameError:
    from sets import Set as set

# Constants.

TERM_INTERVAL = 100
DOCUMENT_INTERVAL = 100
FIELD_INTERVAL = 100
FLUSH_INTERVAL = 10000

TERM_FILENAMES = "terms", "terms_index", "positions", "positions_index"
FIELD_FILENAMES = "fields", "fields_index"

compressors = [("b", bz2.compress), ("z", zlib.compress)]
decompressors = {"b" : bz2.decompress, "z" : zlib.decompress}

# Utility functions.

try:
    from vint import vint
except ImportError:

    def vint(number):

        "Write 'number' as a variable-length integer."

        if number >= 0:

            # Special case: one byte containing a 7-bit number.

            if number < 128:
                return chr(number)

            # Write the number from least to most significant digits.

            bytes = []

            while number != 0:
                lsd = number & 127
                number = number >> 7
                if number != 0:
                    lsd |= 128
                bytes.append(chr(lsd))

            return "".join(bytes)

        # Negative numbers are not supported.

        else:
            # NOTE: parenthesised raise replaces the Python-2-only
            # "raise E, arg" syntax; behaviour is identical.
            raise ValueError("Number %r is negative." % number)

# Foundation classes.

class File:

    "A basic file abstraction."

    def __init__(self, f):
        self.f = f
        self.reset()

    def reset(self):

        "To be used to reset the state of the reader or writer between records."

        pass

    def rewind(self):
        self.f.seek(0)
        self.reset()

    def close(self):
        if self.f is not None:
            self.f.close()
            self.f = None

class FileWriter(File):

    "Writing basic data types to files."

    def write_number(self, number):

        "Write 'number' to the file using a variable length encoding."

        self.f.write(vint(number))

    def write_string(self, s, compress=0):

        """
        Write 's' to the file, recording its length and compressing the string
        if 'compress' is set to a true value.
        """

        # Convert Unicode objects to strings.

        if isinstance(s, unicode):
            s = s.encode("utf-8")

        # Compress the string if requested.

        if compress:
            for flag, fn in compressors:
                cs = fn(s)

                # Take the first string shorter than the original.

                if len(cs) < len(s):
                    s = cs
                    break
            else:
                flag = "-"

        else:
            flag = ""

        # Write the length of the data before the data itself.

        length = len(s)
        self.f.write(flag + vint(length) + s)

class FileReader(File):

    "Reading basic data types from files."

    def read_number(self):

        "Read a number from the file."

        # Read each byte, adding it to the number.

        shift = 0
        number = 0
        read = self.f.read

        # At end-of-file, read(1) returns an empty string and ord raises
        # TypeError (Python 2); translate that into EOFError for callers.

        try:
            csd = ord(read(1))
            while csd & 128:
                number += ((csd & 127) << shift)
                shift += 7
                csd = ord(read(1))
            else:
                number += (csd << shift)
        except TypeError:
            raise EOFError

        return number

    def read_string(self, decompress=0):

        """
        Read a string from the file, decompressing the stored data if
        'decompress' is set to a true value.
        """

        # Decompress the data if requested.

        if decompress:
            flag = self.f.read(1)
        else:
            flag = "-"

        length = self.read_number()
        s = self.f.read(length)

        # Perform decompression if applicable.

        if flag != "-":
            fn = decompressors[flag]
            s = fn(s)

        # Convert strings to Unicode objects.

        return unicode(s, "utf-8")

class FileOpener:

    "Opening files using their filenames."

    def __init__(self, filename):
        self.filename = filename

    def open(self, mode):
        return open(self.filename, mode)

    def close(self):
        pass

# Specific classes for storing term and position information.

class PositionWriter(FileWriter):

    "Writing position information to files."

    def reset(self):
        self.last_docnum = 0

    def write_positions(self, docnum, positions):

        """
        Write for the document 'docnum' the given 'positions'.
        Return the offset of the written record.
        """

        if docnum < self.last_docnum:
            raise ValueError("Document number %r is less than previous number %r." % (docnum, self.last_docnum))

        # Record the offset of this record.

        offset = self.f.tell()

        # Make sure that the positions are sorted.

        positions.sort()

        # Write the position deltas.

        output = []
        last = 0

        for position in positions:
            output.append(vint(position - last))
            last = position

        # Write the document number delta.
        # Write the number of positions.
        # Then write the positions.

        self.f.write(vint(docnum - self.last_docnum) + vint(len(positions)) + "".join(output))

        self.last_docnum = docnum
        return offset

class PositionOpener(FileOpener):

    "Reading position information from files."

    def read_term_positions(self, offset, count):

        """
        Read all positions from 'offset', seeking to that position in the file
        before reading. The number of documents available for reading is limited
        to 'count'.
        """

        # Duplicate the file handle.

        f = self.open("rb")
        f.seek(offset)
        return PositionIterator(f, count)

class PositionIndexWriter(FileWriter):

    "Writing position index information to files."

    def reset(self):
        self.last_docnum = 0
        self.last_pos_offset = 0

    def write_positions(self, docnum, pos_offset, count):

        """
        Write the given 'docnum', 'pos_offset' and document 'count' to the
        position index file.
        """

        # Record the offset of this record.

        offset = self.f.tell()
        output = []

        # Write the document number delta.

        output.append(vint(docnum - self.last_docnum))
        self.last_docnum = docnum

        # Write the position file offset delta.

        output.append(vint(pos_offset - self.last_pos_offset))
        self.last_pos_offset = pos_offset

        # Write the document count.

        output.append(vint(count))

        # Actually write the data.

        self.f.write("".join(output))

        return offset

class PositionIndexOpener(FileOpener):

    "Reading position index information from files."

    def read_term_positions(self, offset, doc_frequency):

        """
        Read all positions from 'offset', seeking to that position in the file
        before reading. The number of documents available for reading is limited
        to 'doc_frequency'.
        """

        # Duplicate the file handle.

        f = self.open("rb")
        f.seek(offset)
        return PositionIndexIterator(f, doc_frequency)

# Iterators for position-related files.

class IteratorBase:

    def __init__(self, count):
        self.replenish(count)

    def replenish(self, count):
        self.count = count
        self.read_documents = 0

    def __len__(self):
        return self.count

    def sort(self):
        pass # Stored document positions are already sorted.

    def __iter__(self):
        return self

class PositionIterator(FileReader, IteratorBase):

    "Iterating over document positions."

    def __init__(self, f, count):
        FileReader.__init__(self, f)
        IteratorBase.__init__(self, count)

    def reset(self):
        self.last_docnum = 0

    def read_positions(self):

        "Read positions, returning a document number and a list of positions."

        # Read the document number delta and add it to the last number.

        self.last_docnum += self.read_number()

        # Read the number of positions.

        npositions = self.read_number()

        # Read the position deltas, adding each previous position to get the
        # appropriate collection of absolute positions.

        i = 0
        last = 0
        positions = []

        while i < npositions:
            last += self.read_number()
            positions.append(last)
            i += 1

        return self.last_docnum, positions

    def next(self):

        "Read positions for a single document."

        if self.read_documents < self.count:
            self.read_documents += 1
            return self.read_positions()
        else:
            raise StopIteration

class PositionIndexIterator(FileReader, IteratorBase):

    "Iterating over document positions."

    def __init__(self, f, count):
        FileReader.__init__(self, f)
        IteratorBase.__init__(self, count)
        self.section_count = 0

    def reset(self):
        self.last_docnum = 0
        self.last_pos_offset = 0

    def read_positions(self):

        """
        Read a document number, a position file offset for the position index
        file, and the number of documents in a section of that file.
        """

        # Read the document number delta.

        self.last_docnum += self.read_number()

        # Read the offset delta.

        self.last_pos_offset += self.read_number()

        # Read the document count.

        count = self.read_number()

        return self.last_docnum, self.last_pos_offset, count

    def next(self):

        "Read positions for a single document."

        self.read_documents += self.section_count
        if self.read_documents < self.count:
            docnum, pos_offset, self.section_count = t = self.read_positions()
            return t
        else:
            raise StopIteration

class PositionDictionaryWriter:

    "Writing position dictionaries."

    def __init__(self, position_writer, position_index_writer, interval):
        self.position_writer = position_writer
        self.position_index_writer = position_index_writer
        self.interval = interval

    def write_term_positions(self, doc_positions):

        """
        Write all 'doc_positions' - a collection of tuples of the form (document
        number, position list) - to the file.

        Add some records to the index, making dictionary entries.

        Return a tuple containing the offset of the written data, the frequency
        (number of positions), and document frequency (number of documents) for
        the term involved.
        """

        # Reset the writers.

        self.position_writer.reset()
        self.position_index_writer.reset()

        index_offset = None

        # Write the positions.

        frequency = 0
        first_docnum = None
        first_offset = None
        count = 0

        doc_positions.sort()

        for docnum, positions in doc_positions:
            pos_offset = self.position_writer.write_positions(docnum, positions)

            # Retain the first record offset for a subsequent index entry.

            if first_offset is None:
                first_offset = pos_offset
                first_docnum = docnum

            frequency += len(positions)
            count += 1

            # Every {interval} entries, write an index entry.

            if count % self.interval == 0:
                io = self.position_index_writer.write_positions(first_docnum, first_offset, self.interval)

                # Remember the first index entry offset.

                if index_offset is None:
                    index_offset = io

                first_offset = None
                first_docnum = None

                # Reset the position writer so that position readers accessing
                # a section start with the correct document number.

                self.position_writer.reset()

        # Finish writing an index entry for the remaining documents.

        else:
            if first_offset is not None:
                io = self.position_index_writer.write_positions(first_docnum, first_offset, count % self.interval)

                # Remember the first index entry offset.

                if index_offset is None:
                    index_offset = io

        return index_offset, frequency, count

    def close(self):
        self.position_writer.close()
        self.position_index_writer.close()

class PositionDictionaryReader:

    "Reading position dictionaries."

    def __init__(self, position_opener, position_index_opener):
        self.position_opener = position_opener
        self.position_index_opener = position_index_opener

    def read_term_positions(self, offset, doc_frequency):

        """
        Return an iterator for dictionary entries starting at 'offset' with the
        given 'doc_frequency'.
        """

        return PositionDictionaryIterator(self.position_opener,
            self.position_index_opener, offset, doc_frequency)

    def close(self):
        pass

class PositionDictionaryIterator:

    "Iteration over position dictionary entries."

    def __init__(self, position_opener, position_index_opener, offset, doc_frequency):
        self.position_opener = position_opener
        self.doc_frequency = doc_frequency
        self.index_iterator = position_index_opener.read_term_positions(offset, doc_frequency)
        self.iterator = None

        # Remember the last values.

        self.found_docnum, self.found_positions = None, None

        # Maintain state for the next index entry, if read.

        self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None

        # Initialise the current index entry and current position file iterator.

        self._next_section()
        self._init_section()

    # Sequence methods.

    def __len__(self):
        return self.doc_frequency

    def sort(self):
        pass

    # Iterator methods.

    def __iter__(self):
        return self

    def next(self):

        """
        Attempt to get the next document record from the section in the
        positions file.
        """

        # Return any visited but unrequested record.

        if self.found_docnum is not None:
            t = self.found_docnum, self.found_positions
            self.found_docnum, self.found_positions = None, None
            return t

        # Or search for the next record.

        while 1:

            # Either return the next record.

            try:
                return self.iterator.next()

            # Or, where a section is finished, get the next section and try again.

            except StopIteration:

                # Where a section follows, update the index iterator, but keep
                # reading using the same file iterator (since the data should
                # just follow on from the last section).

                self._next_section()
                self.iterator.replenish(self.section_count)

                # Reset the state of the iterator to make sure that document
                # numbers are correct.

                self.iterator.reset()

    def from_document(self, docnum):

        """
        Attempt to navigate to a positions entry for the given 'docnum',
        returning the positions for 'docnum', or None otherwise.
        """

        # Return any unrequested document positions.

        if docnum == self.found_docnum:
            return self.found_positions

        # Read ahead in the index until the next entry refers to a document
        # later than the desired document.

        try:
            if self.next_docnum is None:
                self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next()

            # Read until the next entry is after the desired document number,
            # or until the end of the results.

            while self.next_docnum <= docnum:
                self._next_read_section()
                if self.docnum < docnum:
                    self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next()
                else:
                    break

        except StopIteration:
            pass

        # Navigate in the position file to the document.

        self._init_section()

        try:
            while 1:
                found_docnum, found_positions = self.iterator.next()

                # Return the desired document positions or None (retaining the
                # positions for the document immediately after).

                if docnum == found_docnum:
                    return found_positions
                elif docnum < found_docnum:
                    self.found_docnum, self.found_positions = found_docnum, found_positions
                    return None

        except StopIteration:
            return None

    # Internal methods.

    def _next_section(self):

        "Attempt to get the next section in the index."

        if self.next_docnum is None:
            self.docnum, self.pos_offset, self.section_count = self.index_iterator.next()
        else:
            self._next_read_section()

    def _next_read_section(self):

        """
        Make the next index entry the current one without reading from the
        index.
        """

        self.docnum, self.pos_offset, self.section_count = self.next_docnum, self.next_pos_offset, self.next_section_count
        self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None

    def _init_section(self):

        "Initialise the iterator for the section in the position file."

        if self.iterator is not None:
            self.iterator.close()
        self.iterator = self.position_opener.read_term_positions(self.pos_offset, self.section_count)

    def close(self):
        if self.iterator is not None:
            self.iterator.close()
            self.iterator = None
        if self.index_iterator is not None:
            self.index_iterator.close()
            self.index_iterator = None

class TermWriter(FileWriter):

    "Writing term information to files."

    def reset(self):
        self.last_term = ""
        self.last_offset = 0

    def write_term(self, term, offset, frequency, doc_frequency):

        """
        Write the given 'term', its position file 'offset', its 'frequency' and
        its 'doc_frequency' (number of documents in which it appears) to the
        term information file. Return the offset after the term information was
        written to the file.
        """

        # Write the prefix length and term suffix.

        common = len(commonprefix([self.last_term, term]))
        suffix = term[common:]

        self.write_number(common)
        self.write_string(suffix)

        # Write the offset delta.

        self.write_number(offset - self.last_offset)

        # Write the frequency.

        self.write_number(frequency)

        # Write the document frequency.

        self.write_number(doc_frequency)

        self.last_term = term
        self.last_offset = offset

        return self.f.tell()

class TermReader(FileReader):

    "Reading term information from files."

    def reset(self):
        self.last_term = ""
        self.last_offset = 0

    def read_term(self):

        """
        Read a term, its position file offset, its frequency and its document
        frequency from the term information file.
        """

        # Read the prefix length and term suffix.

        common = self.read_number()
        suffix = self.read_string()

        self.last_term = self.last_term[:common] + suffix

        # Read the offset delta.

        self.last_offset += self.read_number()

        # Read the frequency.

        frequency = self.read_number()

        # Read the document frequency.

        doc_frequency = self.read_number()

        return self.last_term, self.last_offset, frequency, doc_frequency

    def go_to_term(self, term, offset, info_offset):

        """
        Seek past the entry for 'term' having 'offset' to 'info_offset'. This
        permits the scanning for later terms from the specified term.
        """

        self.f.seek(info_offset)
        self.last_term = term
        self.last_offset = offset

class TermIndexWriter(TermWriter):

    "Writing term dictionary index details to files."

    def reset(self):
        TermWriter.reset(self)
        self.last_info_offset = 0

    def write_term(self, term, offset, frequency, doc_frequency, info_offset):

        """
        Write the given 'term', its position file 'offset', its 'frequency' and
        its 'doc_frequency' to the term dictionary index file, along with the
        'info_offset' in the term information file.
        """

        TermWriter.write_term(self, term, offset, frequency, doc_frequency)

        # Write the information file offset delta.

        self.write_number(info_offset - self.last_info_offset)
        self.last_info_offset = info_offset

class TermIndexReader(TermReader):

    "Reading term dictionary index details from files."

    def reset(self):
        TermReader.reset(self)
        self.last_info_offset = 0

    def read_term(self):

        """
        Read a term, its position file offset, its frequency, its document
        frequency and a term information file offset from the term dictionary
        index file.
        """

        term, offset, frequency, doc_frequency = TermReader.read_term(self)

        # Read the offset delta.

        self.last_info_offset += self.read_number()

        return term, offset, frequency, doc_frequency, self.last_info_offset

class TermDictionaryWriter:

    "Writing term dictionaries."

    def __init__(self, info_writer, index_writer, position_dict_writer, interval):
        self.info_writer = info_writer
        self.index_writer = index_writer
        self.position_dict_writer = position_dict_writer
        self.interval = interval
        self.entry = 0

    def _write_term(self, term, offset, frequency, doc_frequency):

        """
        Write the given 'term', its position file 'offset', its 'frequency' and
        its 'doc_frequency' (number of documents in which it appears) to the
        term information file. Return the offset after the term information was
        written to the file.
        """

        info_offset = self.info_writer.write_term(term, offset, frequency, doc_frequency)

        if self.entry % self.interval == 0:
            self.index_writer.write_term(term, offset, frequency, doc_frequency, info_offset)

        self.entry += 1

    def write_term_positions(self, term, doc_positions):

        """
        Write the given 'term' and the 'doc_positions' recording the documents
        and positions at which the term is found.
        """

        offset, frequency, doc_frequency = self.position_dict_writer.write_term_positions(doc_positions)
        self._write_term(term, offset, frequency, doc_frequency)

    def close(self):
        self.info_writer.close()
        self.index_writer.close()
        self.position_dict_writer.close()

class TermDictionaryReader:

    "Reading term dictionaries."

    def __init__(self, info_reader, index_reader, position_dict_reader):
        self.info_reader = info_reader
        self.index_reader = index_reader
        self.position_dict_reader = position_dict_reader

        self.terms = []
        try:
            while 1:
                self.terms.append(self.index_reader.read_term())
        except EOFError:
            pass

        # Large numbers for ordering purposes.

        if self.terms:
            self.max_offset = self.terms[-1][1] + 1
        else:
            self.max_offset = None

    def _find_closest_entry(self, term):

        """
        Find the offsets and frequencies of 'term' from the term dictionary or
        the closest term starting with the value of 'term'.

        Return the closest index entry consisting of a term, the position file
        offset, the term frequency, the document frequency, and the term details
        file offset.
        """

        i = bisect_right(self.terms, (term, self.max_offset, 0, 0)) - 1

        # Get the entry position providing the term or one preceding it.
        # If no entry precedes the requested term, return the very first entry
        # as the closest.

        if i == -1:
            return self.terms[0]
        else:
            return self.terms[i]

    def _find_closest_term(self, term):

        """
        Find the offsets and frequencies of 'term' from the term dictionary or
        the closest term starting with the value of 'term'.

        Return the closest term (or the term itself), the position file offset,
        the term frequency, the document frequency, and the term details file
        offset (or None if the reader is already positioned).
        """

        found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_entry(term)

        # Where the term is found immediately, return the offset and
        # frequencies. If the term does not appear, return the details of the
        # closest entry.

        if term <= found_term:
            return found_term, offset, frequency, doc_frequency, info_offset

        # Otherwise, seek past the index term's entry in the information file
        # and scan for the desired term.

        else:
            self.info_reader.go_to_term(found_term, offset, info_offset)
            try:
                while term > found_term:
                    found_term, offset, frequency, doc_frequency = self.info_reader.read_term()
            except EOFError:
                pass

            return found_term, offset, frequency, doc_frequency, None

    def _find_term(self, term):

        """
        Find the position file offset and frequency of 'term' from the term
        dictionary.
        """

        found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_term(term)

        # If the term is found, return the offset and frequencies.

        if term == found_term:
            return offset, frequency, doc_frequency
        else:
            return None

    def _get_positions(self, offset, doc_frequency):
        return self.position_dict_reader.read_term_positions(offset, doc_frequency)

    # Iterator convenience methods.

    def __iter__(self):
        self.rewind()
        return self

    def next(self):
        try:
            return self.read_term()
        except EOFError:
            raise StopIteration

    # Sequential access methods.

    def rewind(self):
        self.info_reader.rewind()

    def read_term(self):

        """
        Return the next term, its frequency, its document frequency, and the
        documents and positions at which the term is found.
        """

        term, offset, frequency, doc_frequency = self.info_reader.read_term()
        positions = self._get_positions(offset, doc_frequency)
        return term, frequency, doc_frequency, positions

    # Query methods.

    def find_terms(self, term):

        "Return all terms whose values start with the value of 'term'."

        terms = []

        found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_term(term)

        # Position the reader, if necessary.

        if info_offset is not None:
            self.info_reader.go_to_term(found_term, offset, info_offset)

        # Read and record terms.

        try:
            # Add the found term if it starts with the specified term.

            while found_term.startswith(term):
                terms.append(found_term)
                found_term, offset, frequency, doc_frequency = self.info_reader.read_term()

        except EOFError:
            pass

        return terms

    def find_positions(self, term):

        "Return the documents and positions at which the given 'term' is found."

        t = self._find_term(term)
        if t is None:
            return None
        else:
            offset, frequency, doc_frequency = t
            return self._get_positions(offset, doc_frequency)

    def get_frequency(self, term):

        "Return the frequency of the given 'term'."

        t = self._find_term(term)
        if t is None:
            return None
        else:
            offset, frequency, doc_frequency = t
            return frequency

    def get_document_frequency(self, term):

        "Return the document frequency of the given 'term'."

        t = self._find_term(term)
        if t is None:
            return None
        else:
            offset, frequency, doc_frequency = t
            return doc_frequency

    def close(self):
        self.info_reader.close()
        self.index_reader.close()
        self.position_dict_reader.close()

# Specific classes for storing document information.
class FieldWriter(FileWriter):

    "Writing field data to files."

    def reset(self):
        self.last_docnum = 0

    def write_fields(self, docnum, fields):

        """
        Write for the given 'docnum', a list of 'fields' (integer, string pairs
        representing field identifiers and values respectively).
        Return the offset at which the fields are stored.
        """

        offset = self.f.tell()

        # Write the document number delta.

        self.write_number(docnum - self.last_docnum)

        # Write the number of fields.

        self.write_number(len(fields))

        # Write the fields themselves.

        for i, field in fields:
            self.write_number(i)
            self.write_string(field, 1) # compress

        self.last_docnum = docnum
        return offset

class FieldReader(FileReader):

    "Reading field data from files."

    def reset(self):
        self.last_docnum = 0

    def read_fields(self):

        """
        Read fields from the file, returning a tuple containing the document
        number and a list of field (identifier, value) pairs.
        """

        # Read the document number.

        self.last_docnum += self.read_number()

        # Read the number of fields.

        nfields = self.read_number()

        # Collect the fields.

        fields = []
        i = 0

        while i < nfields:
            identifier = self.read_number()
            value = self.read_string(1) # decompress
            fields.append((identifier, value))
            i += 1

        return self.last_docnum, fields

    def read_document_fields(self, docnum, offset):

        """
        Read fields for 'docnum' at the given 'offset'. This permits the
        retrieval of details for the specified document, as well as scanning for
        later documents.
        """

        self.f.seek(offset)

        # The stored delta-encoded number is meaningless after a seek, so the
        # caller-supplied document number replaces it.

        bad_docnum, fields = self.read_fields()
        self.last_docnum = docnum
        return docnum, fields

class FieldIndexWriter(FileWriter):

    "Writing field index details to files."

    def reset(self):
        self.last_docnum = 0
        self.last_offset = 0

    def write_document(self, docnum, offset):

        """
        Write for the given 'docnum', the 'offset' at which the fields for the
        document are stored in the fields file.
        """

        # Write the document number and offset deltas.

        self.write_number(docnum - self.last_docnum)
        self.write_number(offset - self.last_offset)

        self.last_docnum = docnum
        self.last_offset = offset

class FieldIndexReader(FileReader):

    "Reading field index details from files."

    def reset(self):
        self.last_docnum = 0
        self.last_offset = 0

    def read_document(self):

        "Read a document number and field file offset."

        # Read the document number delta and offset.

        self.last_docnum += self.read_number()
        self.last_offset += self.read_number()

        return self.last_docnum, self.last_offset

class FieldDictionaryWriter:

    "Writing field dictionary details."

    def __init__(self, field_writer, field_index_writer, interval):
        self.field_writer = field_writer
        self.field_index_writer = field_index_writer
        self.interval = interval
        self.entry = 0

    def write_fields(self, docnum, fields):

        "Write details of the document with the given 'docnum' and 'fields'."

        offset = self.field_writer.write_fields(docnum, fields)

        # Only every {interval}-th document gets an index entry.

        if self.entry % self.interval == 0:
            self.field_index_writer.write_document(docnum, offset)

        self.entry += 1

    def close(self):
        self.field_writer.close()
        self.field_index_writer.close()

class FieldDictionaryReader:

    "Reading field dictionary details."

    def __init__(self, field_reader, field_index_reader):
        self.field_reader = field_reader
        self.field_index_reader = field_index_reader

        # Load the whole (sparse) index into memory for bisection.

        self.docs = []
        try:
            while 1:
                self.docs.append(self.field_index_reader.read_document())
        except EOFError:
            pass

        # Large numbers for ordering purposes.

        if self.docs:
            self.max_offset = self.docs[-1][1]
        else:
            self.max_offset = None

    # Iterator convenience methods.

    def __iter__(self):
        self.rewind()
        return self

    def next(self):
        try:
            return self.read_fields()
        except EOFError:
            raise StopIteration

    # Sequential access methods.

    def rewind(self):
        self.field_reader.rewind()

    def read_fields(self):

        "Return the next document number and fields."

        return self.field_reader.read_fields()

    # Random access methods.

    def get_fields(self, docnum):

        "Read the fields of the document with the given 'docnum'."

        i = bisect_right(self.docs, (docnum, self.max_offset)) - 1

        # Get the entry position providing the term or one preceding it.

        if i == -1:
            return None

        found_docnum, offset = self.docs[i]

        # Read from the fields file.

        found_docnum, fields = self.field_reader.read_document_fields(found_docnum, offset)

        # Scan for the document, if necessary.

        try:
            while docnum > found_docnum:
                found_docnum, fields = self.field_reader.read_fields()
        except EOFError:
            pass

        # If the document is found, return the fields.

        if docnum == found_docnum:
            return fields
        else:
            return None

    def close(self):
        self.field_reader.close()
        self.field_index_reader.close()

# Dictionary merging classes.

class Merger:

    "Merge files."

    def __init__(self, writer, readers):
        self.writer = writer
        self.readers = readers

    def close(self):
        for reader in self.readers:
            reader.close()
        self.writer.close()

class TermDictionaryMerger(Merger):

    "Merge term and position files."

    def merge(self):

        """
        Merge terms and positions from the readers, sending them to the writer.
        """

        last_term = None
        current_readers = []

        # Group the merged streams by term, combining the position streams of
        # equal terms into a single merged stream for the writer.

        for term, frequency, doc_frequency, positions in itermerge(self.readers):
            if term == last_term:
                current_readers.append(positions)
            else:
                if current_readers:
                    self.writer.write_term_positions(last_term, itermerge(current_readers))
                last_term = term
                current_readers = [positions]

        # Flush the final group once the merged input is exhausted.

        else:
            if current_readers:
                self.writer.write_term_positions(last_term, itermerge(current_readers))

class FieldDictionaryMerger(Merger):

    "Merge field files."

    def merge(self):

        """
        Merge fields from the readers, sending them to the writer.
        """

        for docnum, fields in itermerge(self.readers):
            self.writer.write_fields(docnum, fields)

# Utility functions.
def _partition_filename(pathname, name, partition):

    "Return the path of the file called 'name' for 'partition' under 'pathname'."

    return join(pathname, "%s-%s" % (name, partition))

def get_term_writer(pathname, partition, interval, doc_interval):

    """
    Return a term dictionary writer whose files reside under 'pathname' and
    are labelled with 'partition'.  Term index records are produced every
    'interval' terms and document position records every 'doc_interval'
    documents.
    """

    info_writer = TermWriter(
        open(_partition_filename(pathname, "terms", partition), "wb"))
    index_writer = TermIndexWriter(
        open(_partition_filename(pathname, "terms_index", partition), "wb"))

    positions_writer = PositionWriter(
        open(_partition_filename(pathname, "positions", partition), "wb"))
    positions_index_writer = PositionIndexWriter(
        open(_partition_filename(pathname, "positions_index", partition), "wb"))

    positions_dict_writer = PositionDictionaryWriter(
        positions_writer, positions_index_writer, doc_interval)

    return TermDictionaryWriter(
        info_writer, index_writer, positions_dict_writer, interval)

def get_field_writer(pathname, partition, interval):

    """
    Return a field dictionary writer whose files reside under 'pathname' and
    are labelled with 'partition', producing index records every 'interval'
    documents.
    """

    field_writer = FieldWriter(
        open(_partition_filename(pathname, "fields", partition), "wb"))
    field_index_writer = FieldIndexWriter(
        open(_partition_filename(pathname, "fields_index", partition), "wb"))

    return FieldDictionaryWriter(field_writer, field_index_writer, interval)

def get_term_reader(pathname, partition):

    """
    Return a term dictionary reader for the files under 'pathname' labelled
    with 'partition'.
    """

    info_reader = TermReader(
        open(_partition_filename(pathname, "terms", partition), "rb"))
    index_reader = TermIndexReader(
        open(_partition_filename(pathname, "terms_index", partition), "rb"))

    # Position files are opened on demand, so only openers are created here.

    positions_dict_reader = PositionDictionaryReader(
        PositionOpener(_partition_filename(pathname, "positions", partition)),
        PositionIndexOpener(_partition_filename(pathname, "positions_index", partition)))

    return TermDictionaryReader(info_reader, index_reader, positions_dict_reader)

def get_field_reader(pathname, partition):

    """
    Return a field dictionary reader for the files under 'pathname' labelled
    with 'partition'.
    """

    field_reader = FieldReader(
        open(_partition_filename(pathname, "fields", partition), "rb"))
    field_index_reader = FieldIndexReader(
        open(_partition_filename(pathname, "fields_index", partition), "rb"))

    return FieldDictionaryReader(field_reader, field_index_reader)

def rename_files(pathname, names, from_partition, to_partition):

    "Under 'pathname', move each of 'names' from 'from_partition' to 'to_partition'."

    for name in names:
        rename(_partition_filename(pathname, name, from_partition),
               _partition_filename(pathname, name, to_partition))

def rename_term_files(pathname, from_partition, to_partition):

    "Move the term dictionary files between the given partitions."

    rename_files(pathname, TERM_FILENAMES, from_partition, to_partition)

def rename_field_files(pathname, from_partition, to_partition):

    "Move the field dictionary files between the given partitions."

    rename_files(pathname, FIELD_FILENAMES, from_partition, to_partition)

def remove_files(pathname, names, partition):

    "Under 'pathname', delete each of 'names' belonging to 'partition'."

    for name in names:
        remove(_partition_filename(pathname, name, partition))

def remove_term_files(pathname, partition):

    "Delete the term dictionary files for 'partition'."

    remove_files(pathname, TERM_FILENAMES, partition)

def remove_field_files(pathname, partition):

    "Delete the field dictionary files for 'partition'."

    remove_files(pathname, FIELD_FILENAMES, partition)

# High-level classes.
class Document:

    "A container of document information."

    def __init__(self, docnum):

        "Initialise the document with the given 'docnum'."

        self.docnum = docnum
        self.fields = [] # list of (identifier, value) tuples
        self.terms = {} # mapping from term to list of positions

    def add_position(self, term, position):

        """
        Add a position entry for the given 'term', indicating the given
        'position'.
        """

        self.terms.setdefault(term, []).append(position)

    def add_field(self, identifier, value):

        "Add a field having the given 'identifier' and 'value'."

        self.fields.append((identifier, unicode(value))) # coerce to unicode text

    def set_fields(self, fields):

        """
        Set the document's 'fields': a list of tuples each containing an integer
        identifier and a string value.
        """

        self.fields = fields

class IndexWriter:

    """
    Building term information and writing it to the term and field dictionaries.
    """

    def __init__(self, pathname, interval, doc_interval, flush_interval):

        """
        Initialise the writer to use the directory at 'pathname', employing
        the given indexing 'interval' and 'doc_interval', and automatically
        flushing accumulated documents every 'flush_interval' additions
        (never, if 'flush_interval' is zero or None).
        """

        self.pathname = pathname
        self.interval = interval
        self.doc_interval = doc_interval
        self.flush_interval = flush_interval

        # Each flush produces a new numbered partition for terms and fields.

        self.dict_partition = 0
        self.field_dict_partition = 0

        # In-memory accumulators: term -> docnum -> positions, and
        # docnum -> fields.

        self.terms = {}
        self.docs = {}

        self.doc_counter = 0

    def add_document(self, doc):

        """
        Add the given document 'doc', updating the document counter and flushing
        terms and fields if appropriate.
        """

        for term, positions in doc.terms.items():
            self.terms.setdefault(term, {})[doc.docnum] = positions

        self.docs[doc.docnum] = doc.fields

        self.doc_counter += 1
        if self.flush_interval and self.doc_counter >= self.flush_interval:
            self.flush_terms()
            self.flush_fields()
            self.doc_counter = 0

    def get_term_writer(self):

        "Return a term dictionary writer for the current partition."

        return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval)

    def get_field_writer(self):

        "Return a field dictionary writer for the current partition."

        return get_field_writer(self.pathname, self.field_dict_partition, self.interval)

    def flush_terms(self):

        "Flush terms into the current term dictionary partition."

        # Get the terms in order.

        all_terms = self.terms
        terms = all_terms.keys()
        terms.sort()

        dict_writer = self.get_term_writer()

        for term in terms:
            doc_positions = all_terms[term].items()
            dict_writer.write_term_positions(term, doc_positions)

        dict_writer.close()

        # Start a fresh accumulator and partition for subsequent terms.

        self.terms = {}
        self.dict_partition += 1

    def flush_fields(self):

        "Flush fields into the current term dictionary partition."

        # Get the documents in order.

        docs = self.docs.items()
        docs.sort()

        field_dict_writer = self.get_field_writer()

        for docnum, fields in docs:
            field_dict_writer.write_fields(docnum, fields)

        field_dict_writer.close()

        # Start a fresh accumulator and partition for subsequent documents.

        self.docs = {}
        self.field_dict_partition += 1

    def close(self):

        # Flush any accumulated data not yet written to a partition.

        if self.terms:
            self.flush_terms()
        if self.docs:
            self.flush_fields()

class IndexReader:

    "Accessing the term and field dictionaries."

    def __init__(self, pathname):

        # Readers operate on the single "merged" partition, produced by
        # Index.merge before a reader is handed out.

        self.dict_reader = get_term_reader(pathname, "merged")
        self.field_dict_reader = get_field_reader(pathname, "merged")

    # Delegation methods: semantics are those of TermDictionaryReader and
    # FieldDictionaryReader respectively.

    def find_terms(self, term):
        return self.dict_reader.find_terms(term)

    def find_positions(self, term):
        return self.dict_reader.find_positions(term)

    def get_frequency(self, term):
        return self.dict_reader.get_frequency(term)

    def get_document_frequency(self, term):
        return self.dict_reader.get_document_frequency(term)

    def get_fields(self, docnum):
        return self.field_dict_reader.get_fields(docnum)

    def close(self):
        self.dict_reader.close()
        self.field_dict_reader.close()

class Index:

    "An inverted index solution encapsulating the various components."

    def __init__(self, pathname):

        "Initialise the index with the given directory 'pathname'."

        self.pathname = pathname
        self.reader = None
        self.writer = None

    def get_writer(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, flush_interval=FLUSH_INTERVAL):

        """
        Return a writer, optionally using the given indexing 'interval',
        'doc_interval' and 'flush_interval'.
        """

        # Create the index directory on first use.

        if not exists(self.pathname):
            mkdir(self.pathname)

        self.writer = IndexWriter(self.pathname, interval, doc_interval, flush_interval)
        return self.writer

    def get_reader(self, partition=0):

        "Return a reader for the index."

        # Ensure that only one partition exists.

        self.merge()
        return self._get_reader(partition)

    def _get_reader(self, partition):

        "Return a reader for the index."

        # NOTE(review): 'partition' is accepted but unused here - IndexReader
        # always opens the "merged" partition; confirm whether the parameter
        # is vestigial.

        if not exists(self.pathname):
            raise OSError, "Index path %r does not exist." % self.pathname

        self.reader = IndexReader(self.pathname)
        return self.reader

    def merge(self):

        "Merge/optimise index partitions."

        self.merge_terms()
        self.merge_fields()

    def merge_terms(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL):

        """
        Merge term dictionaries using the given indexing 'interval' and
        'doc_interval'.
        """

        readers = []
        partitions = set()

        # Discover all term partitions present in the index directory.
        # (Directory listing happens before any renaming below.)

        for filename in listdir(self.pathname):
            if filename.startswith("terms-"): # 6 character prefix
                partition = filename[6:]
                readers.append(get_term_reader(self.pathname, partition))
                partitions.add(partition)

        # Write directly to a dictionary.

        if len(readers) > 1:

            # Set aside any previous merge output; the already-open readers
            # keep their file handles across the rename.

            if "merged" in partitions:
                rename_term_files(self.pathname, "merged", "old-merged")
                partitions.remove("merged")
                partitions.add("old-merged")

            writer = get_term_writer(self.pathname, "merged", interval, doc_interval)
            merger = TermDictionaryMerger(writer, readers)
            merger.merge()
            merger.close()

            # Remove old files.

            for partition in partitions:
                remove_term_files(self.pathname, partition)

        # A single partition need only be renamed into place.
        # NOTE(review): in this branch the opened reader is never closed;
        # confirm whether the handle leak is acceptable here.

        elif len(readers) == 1:
            partition = list(partitions)[0]
            if partition != "merged":
                rename_term_files(self.pathname, partition, "merged")

    def merge_fields(self, interval=FIELD_INTERVAL):

        "Merge field dictionaries using the given indexing 'interval'."

        readers = []
        partitions = set()

        # Discover all field partitions present in the index directory.
        # (Directory listing happens before any renaming below.)

        for filename in listdir(self.pathname):
            if filename.startswith("fields-"): # 7 character prefix
                partition = filename[7:]
                readers.append(get_field_reader(self.pathname, partition))
                partitions.add(partition)

        # Write directly to a dictionary.

        if len(readers) > 1:

            # Set aside any previous merge output; the already-open readers
            # keep their file handles across the rename.

            if "merged" in partitions:
                rename_field_files(self.pathname, "merged", "old-merged")
                partitions.remove("merged")
                partitions.add("old-merged")

            writer = get_field_writer(self.pathname, "merged", interval)
            merger = FieldDictionaryMerger(writer, readers)
            merger.merge()
            merger.close()

            # Remove old files.

            for partition in partitions:
                remove_field_files(self.pathname, partition)

        # A single partition need only be renamed into place.
        # NOTE(review): in this branch the opened reader is never closed;
        # confirm whether the handle leak is acceptable here.

        elif len(readers) == 1:
            partition = list(partitions)[0]
            if partition != "merged":
                rename_field_files(self.pathname, partition, "merged")

    def close(self):

        "Close any open reader and writer."

        if self.reader is not None:
            self.reader.close()
            self.reader = None
        if self.writer is not None:
            self.writer.close()
            self.writer = None

# vim: tabstop=4 expandtab shiftwidth=4