#!/usr/bin/env python

"""
A simple (and sane) text indexing library.

Copyright (C) 2009 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with this program. If not, see <http://www.gnu.org/licenses/>.
"""

from os import dup, fdopen # independent iterator access to files
from os import listdir, mkdir # index and partition discovery
from os import remove, rename # partition manipulation
from os.path import exists, join
from os.path import commonprefix # to find common string prefixes
from bisect import bisect_right # to find terms in the dictionary index
import bz2, zlib # for field compression
from itermerge import itermerge

# Compatibility with Python releases lacking the built-in set type: fall back
# to the sets module.

try:
    set
except NameError:
    from sets import Set as set

# Constants.

# Indexing intervals: one index entry is written for every 'interval' data
# records (see the dictionary writer classes below).

TERM_INTERVAL = 100
DOCUMENT_INTERVAL = 100
FIELD_INTERVAL = 100
FLUSH_INTERVAL = 10000

TERM_FILENAMES = "terms", "terms_index", "positions", "positions_index"
FIELD_FILENAMES = "fields", "fields_index"

# Field compression: a one-character flag stored before each compressed string
# selects the decompressor upon reading.

compressors = [("b", bz2.compress), ("z", zlib.compress)]
decompressors = {"b" : bz2.decompress, "z" : zlib.decompress}

# Utility functions.

def vint(number):

    """
    Encode the non-negative integer 'number' as a variable-length byte string:
    seven bits per byte, least significant group first, with the top bit set
    on every byte except the last (most significant) one.
    """

    # Negative numbers are not supported.

    if number < 0:
        raise ValueError, "Number %r is negative." % number

    # Special case: one byte containing a 7-bit number.

    elif number < 128:
        return chr(number)

    # Write the number from least to most significant digits.

    bytes = []

    while number != 0:
        lsd = number & 127
        number = number >> 7

        # Set the continuation bit on all but the final byte.

        if number != 0:
            lsd |= 128
        bytes.append(chr(lsd))

    return "".join(bytes)

# Foundation classes.

class File:

    "A basic file abstraction."

    def __init__(self, f):
        self.f = f
        self.reset()

    def reset(self):

        "To be used to reset the state of the reader or writer between records."

        pass

    def rewind(self):

        "Seek back to the start of the file and reset per-record state."

        self.f.seek(0)
        self.reset()

    def close(self):

        "Close the underlying file, if not already closed."

        if self.f is not None:
            self.f.close()
            self.f = None

class FileWriter(File):

    "Writing basic data types to files."

    def write_number(self, number):

        "Write 'number' to the file using a variable length encoding."

        self.f.write(vint(number))

    def write_string(self, s, compress=0):

        """
        Write 's' to the file, recording its length and compressing the string
        if 'compress' is set to a true value.
        """

        # Convert Unicode objects to strings.

        if isinstance(s, unicode):
            s = s.encode("utf-8")

        # Compress the string if requested.

        if compress:
            for flag, fn in compressors:
                cs = fn(s)

                # Take the first string shorter than the original.

                if len(cs) < len(s):
                    s = cs
                    break

            # No compressor produced a shorter string: store the data as-is,
            # flagged as uncompressed.

            else:
                flag = "-"

        else:
            flag = ""

        # Write the length of the data before the data itself.

        length = len(s)
        self.f.write(flag + vint(length) + s)

class FileReader(File):

    "Reading basic data types from files."

    def read_number(self):

        "Read a number from the file."

        # Read each byte, adding it to the number.
        shift = 0
        number = 0
        read = self.f.read

        # NOTE(review): at end of file, read(1) returns "" and ord("") raises
        # TypeError in Python 2, which is translated to EOFError here.

        try:
            csd = ord(read(1))
            while csd & 128:
                number += ((csd & 127) << shift)
                shift += 7
                csd = ord(read(1))
            else:
                number += (csd << shift)
        except TypeError:
            raise EOFError

        return number

    def read_string(self, decompress=0):

        """
        Read a string from the file, decompressing the stored data if
        'decompress' is set to a true value.
        """

        # Decompress the data if requested.

        if decompress:
            flag = self.f.read(1)
        else:
            flag = "-"

        length = self.read_number()
        s = self.f.read(length)

        # Perform decompression if applicable.

        if flag != "-":
            fn = decompressors[flag]
            s = fn(s)

        # Convert strings to Unicode objects.

        return unicode(s, "utf-8")

class FileOpener:

    "Opening files using their filenames."

    def __init__(self, filename):
        self.filename = filename

    def open(self, mode):

        "Return a new file object for the stored filename, opened with 'mode'."

        return open(self.filename, mode)

    def close(self):
        pass

# Specific classes for storing term and position information.

class PositionWriter(FileWriter):

    "Writing position information to files."

    def reset(self):
        self.last_docnum = 0

    def write_positions(self, docnum, positions):

        """
        Write for the document 'docnum' the given 'positions'.
        Return the offset of the written record.
        """

        # Document numbers must be written in ascending order so that deltas
        # remain non-negative.

        if docnum < self.last_docnum:
            raise ValueError, "Document number %r is less than previous number %r." % (docnum, self.last_docnum)

        # Record the offset of this record.

        offset = self.f.tell()
        output = []

        # Write the document number delta.

        output.append(vint(docnum - self.last_docnum))

        # Write the number of positions.

        output.append(vint(len(positions)))

        # Make sure that the positions are sorted.

        positions.sort()

        # Write the position deltas.

        last = 0
        for position in positions:
            pos = position - last
            output.append(vint(pos))
            last = position

        # Actually write the data.

        self.f.write("".join(output))

        self.last_docnum = docnum
        return offset

class PositionOpener(FileOpener):

    "Reading position information from files."

    def read_term_positions(self, offset, count):

        """
        Read all positions from 'offset', seeking to that position in the file
        before reading. The number of documents available for reading is limited
        to 'count'.
        """

        # Duplicate the file handle.

        f = self.open("rb")
        f.seek(offset)
        return PositionIterator(f, count)

class PositionIndexWriter(FileWriter):

    "Writing position index information to files."

    def reset(self):
        self.last_docnum = 0
        self.last_pos_offset = 0

    def write_positions(self, docnum, pos_offset, count):

        """
        Write the given 'docnum', 'pos_offset' and document 'count' to the
        position index file.
        """

        # Record the offset of this record.

        offset = self.f.tell()
        output = []

        # Write the document number delta.

        output.append(vint(docnum - self.last_docnum))
        self.last_docnum = docnum

        # Write the position file offset delta.

        output.append(vint(pos_offset - self.last_pos_offset))
        self.last_pos_offset = pos_offset

        # Write the document count.

        output.append(vint(count))

        # Actually write the data.

        self.f.write("".join(output))

        return offset

class PositionIndexOpener(FileOpener):

    "Reading position index information from files."

    def read_term_positions(self, offset, doc_frequency):

        """
        Read all positions from 'offset', seeking to that position in the file
        before reading. The number of documents available for reading is limited
        to 'doc_frequency'.
        """

        # Duplicate the file handle.
        f = self.open("rb")
        f.seek(offset)
        return PositionIndexIterator(f, doc_frequency)

# Iterators for position-related files.

class IteratorBase:

    "Common state for iterators: a record count and a count of records read."

    def __init__(self, count):
        self.replenish(count)

    def replenish(self, count):

        "Reset the iterator to provide 'count' more records."

        self.count = count
        self.read_documents = 0

    def __len__(self):
        return self.count

    def sort(self):
        pass # Stored document positions are already sorted.

    def __iter__(self):
        return self

class PositionIterator(FileReader, IteratorBase):

    "Iterating over document positions."

    def __init__(self, f, count):
        FileReader.__init__(self, f)
        IteratorBase.__init__(self, count)

    def reset(self):
        self.last_docnum = 0

    def read_positions(self):

        "Read positions, returning a document number and a list of positions."

        # Read the document number delta and add it to the last number.

        self.last_docnum += self.read_number()

        # Read the number of positions.

        npositions = self.read_number()

        # Read the position deltas, adding each previous position to get the
        # appropriate collection of absolute positions.

        i = 0
        last = 0
        positions = []

        while i < npositions:
            last += self.read_number()
            positions.append(last)
            i += 1

        return self.last_docnum, positions

    def next(self):

        "Read positions for a single document."

        if self.read_documents < self.count:
            self.read_documents += 1
            return self.read_positions()
        else:
            raise StopIteration

class PositionIndexIterator(FileReader, IteratorBase):

    "Iterating over document positions."
    def __init__(self, f, count):
        FileReader.__init__(self, f)
        IteratorBase.__init__(self, count)
        self.section_count = 0

    def reset(self):
        self.last_docnum = 0
        self.last_pos_offset = 0

    def read_positions(self):

        """
        Read a document number, a position file offset for the position index
        file, and the number of documents in a section of that file.
        """

        # Read the document number delta.

        self.last_docnum += self.read_number()

        # Read the offset delta.

        self.last_pos_offset += self.read_number()

        # Read the document count.

        count = self.read_number()

        return self.last_docnum, self.last_pos_offset, count

    def next(self):

        "Read positions for a single document."

        # Add the documents covered by the previous section before testing
        # whether more sections remain.

        self.read_documents += self.section_count
        if self.read_documents < self.count:
            docnum, pos_offset, self.section_count = t = self.read_positions()
            return t
        else:
            raise StopIteration

class PositionDictionaryWriter:

    "Writing position dictionaries."

    def __init__(self, position_writer, position_index_writer, interval):
        self.position_writer = position_writer
        self.position_index_writer = position_index_writer
        self.interval = interval

    def write_term_positions(self, doc_positions):

        """
        Write all 'doc_positions' - a collection of tuples of the form (document
        number, position list) - to the file.

        Add some records to the index, making dictionary entries.

        Return a tuple containing the offset of the written data, the frequency
        (number of positions), and document frequency (number of documents) for
        the term involved.
        """

        # Reset the writers.

        self.position_writer.reset()
        self.position_index_writer.reset()

        index_offset = None

        # Write the positions.

        frequency = 0
        first_docnum = None
        first_offset = None
        count = 0

        doc_positions.sort()

        for docnum, positions in doc_positions:
            pos_offset = self.position_writer.write_positions(docnum, positions)

            # Retain the first record offset for a subsequent index entry.

            if first_offset is None:
                first_offset = pos_offset
                first_docnum = docnum

            frequency += len(positions)
            count += 1

            # Every {interval} entries, write an index entry.

            if count % self.interval == 0:
                io = self.position_index_writer.write_positions(first_docnum, first_offset, self.interval)

                # Remember the first index entry offset.

                if index_offset is None:
                    index_offset = io

                first_offset = None
                first_docnum = None

                # Reset the position writer so that position readers accessing
                # a section start with the correct document number.

                self.position_writer.reset()

        # Finish writing an index entry for the remaining documents.

        else:
            if first_offset is not None:
                io = self.position_index_writer.write_positions(first_docnum, first_offset, count % self.interval)

                # Remember the first index entry offset.

                if index_offset is None:
                    index_offset = io

        return index_offset, frequency, count

    def close(self):
        self.position_writer.close()
        self.position_index_writer.close()

class PositionDictionaryReader:

    "Reading position dictionaries."

    def __init__(self, position_opener, position_index_opener):
        self.position_opener = position_opener
        self.position_index_opener = position_index_opener

    def read_term_positions(self, offset, doc_frequency):

        """
        Return an iterator for dictionary entries starting at 'offset' with the
        given 'doc_frequency'.
548 """ 549 550 return PositionDictionaryIterator(self.position_opener, 551 self.position_index_opener, offset, doc_frequency) 552 553 def close(self): 554 pass 555 556 class PositionDictionaryIterator: 557 558 "Iteration over position dictionary entries." 559 560 def __init__(self, position_opener, position_index_opener, offset, doc_frequency): 561 self.position_opener = position_opener 562 self.doc_frequency = doc_frequency 563 self.index_iterator = position_index_opener.read_term_positions(offset, doc_frequency) 564 self.iterator = None 565 566 # Remember the last values. 567 568 self.found_docnum, self.found_positions = None, None 569 570 # Maintain state for the next index entry, if read. 571 572 self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None 573 574 # Initialise the current index entry and current position file iterator. 575 576 self._next_section() 577 self._init_section() 578 579 # Sequence methods. 580 581 def __len__(self): 582 return self.doc_frequency 583 584 def sort(self): 585 pass 586 587 # Iterator methods. 588 589 def __iter__(self): 590 return self 591 592 def next(self): 593 594 """ 595 Attempt to get the next document record from the section in the 596 positions file. 597 """ 598 599 # Return any visited but unrequested record. 600 601 if self.found_docnum is not None: 602 t = self.found_docnum, self.found_positions 603 self.found_docnum, self.found_positions = None, None 604 return t 605 606 # Or search for the next record. 607 608 while 1: 609 610 # Either return the next record. 611 612 try: 613 return self.iterator.next() 614 615 # Or, where a section is finished, get the next section and try again. 616 617 except StopIteration: 618 619 # Where a section follows, update the index iterator, but keep 620 # reading using the same file iterator (since the data should 621 # just follow on from the last section). 
622 623 self._next_section() 624 self.iterator.replenish(self.section_count) 625 626 # Reset the state of the iterator to make sure that document 627 # numbers are correct. 628 629 self.iterator.reset() 630 631 def from_document(self, docnum): 632 633 """ 634 Attempt to navigate to a positions entry for the given 'docnum', 635 returning the positions for 'docnum', or None otherwise. 636 """ 637 638 # Return any unrequested document positions. 639 640 if docnum == self.found_docnum: 641 return self.found_positions 642 643 # Read ahead in the index until the next entry refers to a document 644 # later than the desired document. 645 646 try: 647 if self.next_docnum is None: 648 self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next() 649 650 # Read until the next entry is after the desired document number, 651 # or until the end of the results. 652 653 while self.next_docnum <= docnum: 654 self._next_read_section() 655 if self.docnum < docnum: 656 self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next() 657 else: 658 break 659 660 except StopIteration: 661 pass 662 663 # Navigate in the position file to the document. 664 665 self._init_section() 666 667 try: 668 while 1: 669 found_docnum, found_positions = self.iterator.next() 670 671 # Return the desired document positions or None (retaining the 672 # positions for the document immediately after). 673 674 if docnum == found_docnum: 675 return found_positions 676 elif docnum < found_docnum: 677 self.found_docnum, self.found_positions = found_docnum, found_positions 678 return None 679 680 except StopIteration: 681 return None 682 683 # Internal methods. 684 685 def _next_section(self): 686 687 "Attempt to get the next section in the index." 
        # Either read directly from the index, or adopt an entry already read
        # ahead by from_document.

        if self.next_docnum is None:
            self.docnum, self.pos_offset, self.section_count = self.index_iterator.next()
        else:
            self._next_read_section()

    def _next_read_section(self):

        """
        Make the next index entry the current one without reading from the
        index.
        """

        self.docnum, self.pos_offset, self.section_count = self.next_docnum, self.next_pos_offset, self.next_section_count
        self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None

    def _init_section(self):

        "Initialise the iterator for the section in the position file."

        if self.iterator is not None:
            self.iterator.close()
        self.iterator = self.position_opener.read_term_positions(self.pos_offset, self.section_count)

    def close(self):
        if self.iterator is not None:
            self.iterator.close()
            self.iterator = None
        if self.index_iterator is not None:
            self.index_iterator.close()
            self.index_iterator = None

class TermWriter(FileWriter):

    "Writing term information to files."

    def reset(self):
        self.last_term = ""
        self.last_offset = 0

    def write_term(self, term, offset, frequency, doc_frequency):

        """
        Write the given 'term', its position file 'offset', its 'frequency' and
        its 'doc_frequency' (number of documents in which it appears) to the
        term information file. Return the offset after the term information was
        written to the file.
        """

        # Write the prefix length and term suffix (front coding against the
        # previously written term).

        common = len(commonprefix([self.last_term, term]))
        suffix = term[common:]

        self.write_number(common)
        self.write_string(suffix)

        # Write the offset delta.

        self.write_number(offset - self.last_offset)

        # Write the frequency.

        self.write_number(frequency)

        # Write the document frequency.

        self.write_number(doc_frequency)

        self.last_term = term
        self.last_offset = offset

        return self.f.tell()

class TermReader(FileReader):

    "Reading term information from files."

    def reset(self):
        self.last_term = ""
        self.last_offset = 0

    def read_term(self):

        """
        Read a term, its position file offset, its frequency and its document
        frequency from the term information file.
        """

        # Read the prefix length and term suffix.

        common = self.read_number()
        suffix = self.read_string()

        self.last_term = self.last_term[:common] + suffix

        # Read the offset delta.

        self.last_offset += self.read_number()

        # Read the frequency.

        frequency = self.read_number()

        # Read the document frequency.

        doc_frequency = self.read_number()

        return self.last_term, self.last_offset, frequency, doc_frequency

    def go_to_term(self, term, offset, info_offset):

        """
        Seek past the entry for 'term' having 'offset' to 'info_offset'. This
        permits the scanning for later terms from the specified term.
        """

        self.f.seek(info_offset)
        self.last_term = term
        self.last_offset = offset

class TermIndexWriter(TermWriter):

    "Writing term dictionary index details to files."

    def reset(self):
        TermWriter.reset(self)
        self.last_info_offset = 0

    def write_term(self, term, offset, frequency, doc_frequency, info_offset):

        """
        Write the given 'term', its position file 'offset', its 'frequency' and
        its 'doc_frequency' to the term dictionary index file, along with the
        'info_offset' in the term information file.
        """

        TermWriter.write_term(self, term, offset, frequency, doc_frequency)

        # Write the information file offset delta.

        self.write_number(info_offset - self.last_info_offset)
        self.last_info_offset = info_offset

class TermIndexReader(TermReader):

    "Reading term dictionary index details from files."

    def reset(self):
        TermReader.reset(self)
        self.last_info_offset = 0

    def read_term(self):

        """
        Read a term, its position file offset, its frequency, its document
        frequency and a term information file offset from the term dictionary
        index file.
        """

        term, offset, frequency, doc_frequency = TermReader.read_term(self)

        # Read the offset delta.

        self.last_info_offset += self.read_number()

        return term, offset, frequency, doc_frequency, self.last_info_offset

class TermDictionaryWriter:

    "Writing term dictionaries."

    def __init__(self, info_writer, index_writer, position_dict_writer, interval):
        self.info_writer = info_writer
        self.index_writer = index_writer
        self.position_dict_writer = position_dict_writer
        self.interval = interval
        self.entry = 0

    def _write_term(self, term, offset, frequency, doc_frequency):

        """
        Write the given 'term', its position file 'offset', its 'frequency' and
        its 'doc_frequency' (number of documents in which it appears) to the
        term information file. Return the offset after the term information was
        written to the file.
        """

        info_offset = self.info_writer.write_term(term, offset, frequency, doc_frequency)

        # Every {interval} entries, add an index entry.

        if self.entry % self.interval == 0:
            self.index_writer.write_term(term, offset, frequency, doc_frequency, info_offset)

        self.entry += 1

    def write_term_positions(self, term, doc_positions):

        """
        Write the given 'term' and the 'doc_positions' recording the documents
        and positions at which the term is found.
888 """ 889 890 offset, frequency, doc_frequency = self.position_dict_writer.write_term_positions(doc_positions) 891 self._write_term(term, offset, frequency, doc_frequency) 892 893 def close(self): 894 self.info_writer.close() 895 self.index_writer.close() 896 self.position_dict_writer.close() 897 898 class TermDictionaryReader: 899 900 "Reading term dictionaries." 901 902 def __init__(self, info_reader, index_reader, position_dict_reader): 903 self.info_reader = info_reader 904 self.index_reader = index_reader 905 self.position_dict_reader = position_dict_reader 906 907 self.terms = [] 908 try: 909 while 1: 910 self.terms.append(self.index_reader.read_term()) 911 except EOFError: 912 pass 913 914 # Large numbers for ordering purposes. 915 916 if self.terms: 917 self.max_offset = self.terms[-1][1] + 1 918 else: 919 self.max_offset = None 920 921 def _find_closest_entry(self, term): 922 923 """ 924 Find the offsets and frequencies of 'term' from the term dictionary or 925 the closest term starting with the value of 'term'. 926 927 Return the closest index entry consisting of a term, the position file 928 offset, the term frequency, the document frequency, and the term details 929 file offset. 930 """ 931 932 i = bisect_right(self.terms, (term, self.max_offset, 0, 0)) - 1 933 934 # Get the entry position providing the term or one preceding it. 935 # If no entry precedes the requested term, return the very first entry 936 # as the closest. 937 938 if i == -1: 939 return self.terms[0] 940 else: 941 return self.terms[i] 942 943 def _find_closest_term(self, term): 944 945 """ 946 Find the offsets and frequencies of 'term' from the term dictionary or 947 the closest term starting with the value of 'term'. 948 949 Return the closest term (or the term itself), the position file offset, 950 the term frequency, the document frequency, and the term details file 951 offset (or None if the reader is already positioned). 
952 """ 953 954 found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_entry(term) 955 956 # Where the term is found immediately, return the offset and 957 # frequencies. If the term does not appear, return the details of the 958 # closest entry. 959 960 if term <= found_term: 961 return found_term, offset, frequency, doc_frequency, info_offset 962 963 # Otherwise, seek past the index term's entry in the information file 964 # and scan for the desired term. 965 966 else: 967 self.info_reader.go_to_term(found_term, offset, info_offset) 968 try: 969 while term > found_term: 970 found_term, offset, frequency, doc_frequency = self.info_reader.read_term() 971 except EOFError: 972 pass 973 974 return found_term, offset, frequency, doc_frequency, None 975 976 def _find_term(self, term): 977 978 """ 979 Find the position file offset and frequency of 'term' from the term 980 dictionary. 981 """ 982 983 found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_term(term) 984 985 # If the term is found, return the offset and frequencies. 986 987 if term == found_term: 988 return offset, frequency, doc_frequency 989 else: 990 return None 991 992 def _get_positions(self, offset, doc_frequency): 993 return self.position_dict_reader.read_term_positions(offset, doc_frequency) 994 995 # Iterator convenience methods. 996 997 def __iter__(self): 998 self.rewind() 999 return self 1000 1001 def next(self): 1002 try: 1003 return self.read_term() 1004 except EOFError: 1005 raise StopIteration 1006 1007 # Sequential access methods. 1008 1009 def rewind(self): 1010 self.info_reader.rewind() 1011 1012 def read_term(self): 1013 1014 """ 1015 Return the next term, its frequency, its document frequency, and the 1016 documents and positions at which the term is found. 
1017 """ 1018 1019 term, offset, frequency, doc_frequency = self.info_reader.read_term() 1020 positions = self._get_positions(offset, doc_frequency) 1021 return term, frequency, doc_frequency, positions 1022 1023 # Query methods. 1024 1025 def find_terms(self, term): 1026 1027 "Return all terms whose values start with the value of 'term'." 1028 1029 terms = [] 1030 1031 found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_term(term) 1032 1033 # Position the reader, if necessary. 1034 1035 if info_offset is not None: 1036 self.info_reader.go_to_term(found_term, offset, info_offset) 1037 1038 # Read and record terms. 1039 1040 try: 1041 # Add the found term if it starts with the specified term. 1042 1043 while found_term.startswith(term): 1044 terms.append(found_term) 1045 found_term, offset, frequency, doc_frequency = self.info_reader.read_term() 1046 1047 except EOFError: 1048 pass 1049 1050 return terms 1051 1052 def find_positions(self, term): 1053 1054 "Return the documents and positions at which the given 'term' is found." 1055 1056 t = self._find_term(term) 1057 if t is None: 1058 return None 1059 else: 1060 offset, frequency, doc_frequency = t 1061 return self._get_positions(offset, doc_frequency) 1062 1063 def get_frequency(self, term): 1064 1065 "Return the frequency of the given 'term'." 1066 1067 t = self._find_term(term) 1068 if t is None: 1069 return None 1070 else: 1071 offset, frequency, doc_frequency = t 1072 return frequency 1073 1074 def get_document_frequency(self, term): 1075 1076 "Return the document frequency of the given 'term'." 1077 1078 t = self._find_term(term) 1079 if t is None: 1080 return None 1081 else: 1082 offset, frequency, doc_frequency = t 1083 return doc_frequency 1084 1085 def close(self): 1086 self.info_reader.close() 1087 self.index_reader.close() 1088 self.position_dict_reader.close() 1089 1090 # Specific classes for storing document information. 
1091 1092 class FieldWriter(FileWriter): 1093 1094 "Writing field data to files." 1095 1096 def reset(self): 1097 self.last_docnum = 0 1098 1099 def write_fields(self, docnum, fields): 1100 1101 """ 1102 Write for the given 'docnum', a list of 'fields' (integer, string pairs 1103 representing field identifiers and values respectively). 1104 Return the offset at which the fields are stored. 1105 """ 1106 1107 offset = self.f.tell() 1108 1109 # Write the document number delta. 1110 1111 self.write_number(docnum - self.last_docnum) 1112 1113 # Write the number of fields. 1114 1115 self.write_number(len(fields)) 1116 1117 # Write the fields themselves. 1118 1119 for i, field in fields: 1120 self.write_number(i) 1121 self.write_string(field, 1) # compress 1122 1123 self.last_docnum = docnum 1124 return offset 1125 1126 class FieldReader(FileReader): 1127 1128 "Reading field data from files." 1129 1130 def reset(self): 1131 self.last_docnum = 0 1132 1133 def read_fields(self): 1134 1135 """ 1136 Read fields from the file, returning a tuple containing the document 1137 number and a list of field (identifier, value) pairs. 1138 """ 1139 1140 # Read the document number. 1141 1142 self.last_docnum += self.read_number() 1143 1144 # Read the number of fields. 1145 1146 nfields = self.read_number() 1147 1148 # Collect the fields. 1149 1150 fields = [] 1151 i = 0 1152 1153 while i < nfields: 1154 identifier = self.read_number() 1155 value = self.read_string(1) # decompress 1156 fields.append((identifier, value)) 1157 i += 1 1158 1159 return self.last_docnum, fields 1160 1161 def read_document_fields(self, docnum, offset): 1162 1163 """ 1164 Read fields for 'docnum' at the given 'offset'. This permits the 1165 retrieval of details for the specified document, as well as scanning for 1166 later documents. 
1167 """ 1168 1169 self.f.seek(offset) 1170 bad_docnum, fields = self.read_fields() 1171 self.last_docnum = docnum 1172 return docnum, fields 1173 1174 class FieldIndexWriter(FileWriter): 1175 1176 "Writing field index details to files." 1177 1178 def reset(self): 1179 self.last_docnum = 0 1180 self.last_offset = 0 1181 1182 def write_document(self, docnum, offset): 1183 1184 """ 1185 Write for the given 'docnum', the 'offset' at which the fields for the 1186 document are stored in the fields file. 1187 """ 1188 1189 # Write the document number and offset deltas. 1190 1191 self.write_number(docnum - self.last_docnum) 1192 self.write_number(offset - self.last_offset) 1193 1194 self.last_docnum = docnum 1195 self.last_offset = offset 1196 1197 class FieldIndexReader(FileReader): 1198 1199 "Reading field index details from files." 1200 1201 def reset(self): 1202 self.last_docnum = 0 1203 self.last_offset = 0 1204 1205 def read_document(self): 1206 1207 "Read a document number and field file offset." 1208 1209 # Read the document number delta and offset. 1210 1211 self.last_docnum += self.read_number() 1212 self.last_offset += self.read_number() 1213 1214 return self.last_docnum, self.last_offset 1215 1216 class FieldDictionaryWriter: 1217 1218 "Writing field dictionary details." 1219 1220 def __init__(self, field_writer, field_index_writer, interval): 1221 self.field_writer = field_writer 1222 self.field_index_writer = field_index_writer 1223 self.interval = interval 1224 self.entry = 0 1225 1226 def write_fields(self, docnum, fields): 1227 1228 "Write details of the document with the given 'docnum' and 'fields'." 
1229 1230 offset = self.field_writer.write_fields(docnum, fields) 1231 1232 if self.entry % self.interval == 0: 1233 self.field_index_writer.write_document(docnum, offset) 1234 1235 self.entry += 1 1236 1237 def close(self): 1238 self.field_writer.close() 1239 self.field_index_writer.close() 1240 1241 class FieldDictionaryReader: 1242 1243 "Reading field dictionary details." 1244 1245 def __init__(self, field_reader, field_index_reader): 1246 self.field_reader = field_reader 1247 self.field_index_reader = field_index_reader 1248 1249 self.docs = [] 1250 try: 1251 while 1: 1252 self.docs.append(self.field_index_reader.read_document()) 1253 except EOFError: 1254 pass 1255 1256 # Large numbers for ordering purposes. 1257 1258 if self.docs: 1259 self.max_offset = self.docs[-1][1] 1260 else: 1261 self.max_offset = None 1262 1263 # Iterator convenience methods. 1264 1265 def __iter__(self): 1266 self.rewind() 1267 return self 1268 1269 def next(self): 1270 try: 1271 return self.read_fields() 1272 except EOFError: 1273 raise StopIteration 1274 1275 # Sequential access methods. 1276 1277 def rewind(self): 1278 self.field_reader.rewind() 1279 1280 def read_fields(self): 1281 1282 "Return the next document number and fields." 1283 1284 return self.field_reader.read_fields() 1285 1286 # Random access methods. 1287 1288 def get_fields(self, docnum): 1289 1290 "Read the fields of the document with the given 'docnum'." 1291 1292 i = bisect_right(self.docs, (docnum, self.max_offset)) - 1 1293 1294 # Get the entry position providing the term or one preceding it. 1295 1296 if i == -1: 1297 return None 1298 1299 found_docnum, offset = self.docs[i] 1300 1301 # Read from the fields file. 1302 1303 found_docnum, fields = self.field_reader.read_document_fields(found_docnum, offset) 1304 1305 # Scan for the document, if necessary. 

        # Scan forward sequentially from the indexed entry until the document
        # is reached or the file is exhausted. NOTE(review): this moves the
        # shared field reader, so it disturbs any sequential iteration that is
        # in progress on the same reader.

        try:
            while docnum > found_docnum:
                found_docnum, fields = self.field_reader.read_fields()
        except EOFError:
            pass

        # If the document is found, return the fields.

        if docnum == found_docnum:
            return fields
        else:
            return None

    def close(self):

        "Close the underlying fields and fields index files."

        self.field_reader.close()
        self.field_index_reader.close()

# Dictionary merging classes.

class Merger:

    "Merge files."

    def __init__(self, writer, readers):

        "Initialise the merger with the given 'writer' and 'readers'."

        self.writer = writer
        self.readers = readers

    def close(self):

        "Close all the readers and then the writer."

        for reader in self.readers:
            reader.close()
        self.writer.close()

class TermDictionaryMerger(Merger):

    "Merge term and position files."

    def merge(self):

        """
        Merge terms and positions from the readers, sending them to the writer.
        """

        last_term = None
        current_readers = []

        # The merged stream yields terms in sorted order, so identical terms
        # from different partitions arrive consecutively and their position
        # streams can be gathered and merged together.

        for term, frequency, doc_frequency, positions in itermerge(self.readers):
            if term == last_term:
                current_readers.append(positions)
            else:
                if current_readers:
                    self.writer.write_term_positions(last_term, itermerge(current_readers))
                last_term = term
                current_readers = [positions]
        else:

            # The for loop's else clause flushes the final term's positions.

            if current_readers:
                self.writer.write_term_positions(last_term, itermerge(current_readers))

class FieldDictionaryMerger(Merger):

    "Merge field files."

    def merge(self):

        """
        Merge fields from the readers, sending them to the writer.
        """

        for docnum, fields in itermerge(self.readers):
            self.writer.write_fields(docnum, fields)

# Utility functions.

def get_term_writer(pathname, partition, interval, doc_interval):

    """
    Return a term dictionary writer whose files reside under the given
    'pathname' and are labelled with the given 'partition', employing the
    given indexing 'interval' for terms and 'doc_interval' for document
    position records.
    """

    # Open a partition-specific file for writing.

    def _wb(template):
        return open(join(pathname, template % partition), "wb")

    info_writer = TermWriter(_wb("terms-%s"))
    index_writer = TermIndexWriter(_wb("terms_index-%s"))
    positions_writer = PositionWriter(_wb("positions-%s"))
    positions_index_writer = PositionIndexWriter(_wb("positions_index-%s"))

    positions_dict_writer = PositionDictionaryWriter(positions_writer, positions_index_writer, doc_interval)

    return TermDictionaryWriter(info_writer, index_writer, positions_dict_writer, interval)

def get_field_writer(pathname, partition, interval):

    """
    Return a field dictionary writer whose files reside under the given
    'pathname' and are labelled with the given 'partition', employing the
    given indexing 'interval'.
    """

    field_writer = FieldWriter(open(join(pathname, "fields-%s" % partition), "wb"))
    field_index_writer = FieldIndexWriter(open(join(pathname, "fields_index-%s" % partition), "wb"))

    return FieldDictionaryWriter(field_writer, field_index_writer, interval)

def get_term_reader(pathname, partition):

    """
    Return a term dictionary reader whose files reside under the given
    'pathname' and are labelled with the given 'partition'.
    """

    # Open a partition-specific file for reading.

    def _rb(template):
        return open(join(pathname, template % partition), "rb")

    info_reader = TermReader(_rb("terms-%s"))
    index_reader = TermIndexReader(_rb("terms_index-%s"))

    # Position files are opened lazily via openers rather than immediately.

    positions_opener = PositionOpener(join(pathname, "positions-%s" % partition))
    positions_index_opener = PositionIndexOpener(join(pathname, "positions_index-%s" % partition))

    positions_dict_reader = PositionDictionaryReader(positions_opener, positions_index_opener)

    return TermDictionaryReader(info_reader, index_reader, positions_dict_reader)

def get_field_reader(pathname, partition):

    """
    Return a field dictionary reader whose files reside under the given
    'pathname' and are labelled with the given 'partition'.
    """

    field_reader = FieldReader(open(join(pathname, "fields-%s" % partition), "rb"))
    field_index_reader = FieldIndexReader(open(join(pathname, "fields_index-%s" % partition), "rb"))

    return FieldDictionaryReader(field_reader, field_index_reader)

def rename_files(pathname, names, from_partition, to_partition):

    """
    Under 'pathname', rename each file in 'names' labelled with
    'from_partition' so that it is labelled with 'to_partition' instead.
    """

    for name in names:
        old_filename = join(pathname, "%s-%s" % (name, from_partition))
        new_filename = join(pathname, "%s-%s" % (name, to_partition))
        rename(old_filename, new_filename)

def rename_term_files(pathname, from_partition, to_partition):

    "Relabel the term dictionary files from 'from_partition' to 'to_partition'."

    rename_files(pathname, TERM_FILENAMES, from_partition, to_partition)

def rename_field_files(pathname, from_partition, to_partition):

    "Relabel the field dictionary files from 'from_partition' to 'to_partition'."

    rename_files(pathname, FIELD_FILENAMES, from_partition, to_partition)

def remove_files(pathname, names, partition):

    "Under 'pathname', remove each file in 'names' labelled with 'partition'."

    for name in names:
        remove(join(pathname, "%s-%s" % (name, partition)))

def remove_term_files(pathname, partition):

    "Remove the term dictionary files labelled with 'partition'."

    remove_files(pathname, TERM_FILENAMES, partition)

def remove_field_files(pathname, partition):

    "Remove the field dictionary files labelled with 'partition'."

    remove_files(pathname, FIELD_FILENAMES, partition)

# High-level classes.

class Document:

    "A container of document information."

    def __init__(self, docnum):

        "Initialise the document with the given 'docnum'."

        self.docnum = docnum
        self.fields = [] # list of (identifier, value) tuples
        self.terms = {} # mapping from term to a list of positions

    def add_position(self, term, position):

        """
        Add a position entry for the given 'term', indicating the given
        'position'.
        """

        self.terms.setdefault(term, []).append(position)

    def add_field(self, identifier, value):

        "Add a field having the given 'identifier' and 'value'."

        self.fields.append((identifier, unicode(value))) # convert to a Unicode string

    def set_fields(self, fields):

        """
        Set the document's 'fields': a list of tuples each containing an integer
        identifier and a string value.
        """

        self.fields = fields

class IndexWriter:

    """
    Building term information and writing it to the term and field dictionaries.
    """

    def __init__(self, pathname, interval, doc_interval, flush_interval):

        """
        Initialise the writer to store files under the given 'pathname', using
        the given indexing 'interval' and 'doc_interval', and flushing
        automatically after 'flush_interval' documents have been added (no
        automatic flushing occurs if 'flush_interval' is a false value).
        """

        self.pathname = pathname
        self.interval = interval
        self.doc_interval = doc_interval
        self.flush_interval = flush_interval

        # Partition counters, incremented whenever a flush occurs.

        self.dict_partition = 0
        self.field_dict_partition = 0

        # Pending data: term -> {docnum : positions}, and docnum -> fields.

        self.terms = {}
        self.docs = {}

        self.doc_counter = 0

    def add_document(self, doc):

        """
        Add the given document 'doc', updating the document counter and flushing
        terms and fields if appropriate.
        """

        for term, positions in doc.terms.items():
            self.terms.setdefault(term, {})[doc.docnum] = positions

        self.docs[doc.docnum] = doc.fields

        self.doc_counter += 1
        if self.flush_interval and self.doc_counter >= self.flush_interval:
            self.flush_terms()
            self.flush_fields()
            self.doc_counter = 0

    def get_term_writer(self):

        "Return a term dictionary writer for the current partition."

        return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval)

    def get_field_writer(self):

        "Return a field dictionary writer for the current partition."

        return get_field_writer(self.pathname, self.field_dict_partition, self.interval)

    def flush_terms(self):

        "Flush terms into the current term dictionary partition."

        # Get the terms in order.

        all_terms = self.terms
        terms = all_terms.keys()
        terms.sort()

        dict_writer = self.get_term_writer()

        for term in terms:
            doc_positions = all_terms[term].items()
            dict_writer.write_term_positions(term, doc_positions)

        dict_writer.close()

        # Reset the pending terms and start a new partition.

        self.terms = {}
        self.dict_partition += 1

    def flush_fields(self):

        "Flush fields into the current field dictionary partition."

        # Get the documents in order.

        docs = self.docs.items()
        docs.sort()

        field_dict_writer = self.get_field_writer()

        for docnum, fields in docs:
            field_dict_writer.write_fields(docnum, fields)

        field_dict_writer.close()

        # Reset the pending documents and start a new partition.

        self.docs = {}
        self.field_dict_partition += 1

    def close(self):

        "Flush any remaining terms and fields, concluding the writing activity."

        if self.terms:
            self.flush_terms()
        if self.docs:
            self.flush_fields()

class IndexReader:

    "Accessing the term and field dictionaries."

    def __init__(self, pathname):

        "Initialise the reader with the merged dictionaries under 'pathname'."

        self.dict_reader = get_term_reader(pathname, "merged")
        self.field_dict_reader = get_field_reader(pathname, "merged")

    # Methods delegating to the term and field dictionary readers.

    def find_terms(self, term):
        return self.dict_reader.find_terms(term)

    def find_positions(self, term):
        return self.dict_reader.find_positions(term)

    def get_frequency(self, term):
        return self.dict_reader.get_frequency(term)

    def get_document_frequency(self, term):
        return self.dict_reader.get_document_frequency(term)

    def get_fields(self, docnum):
        return self.field_dict_reader.get_fields(docnum)

    def close(self):

        "Close both dictionary readers."

        self.dict_reader.close()
        self.field_dict_reader.close()

class Index:

    "An inverted index solution encapsulating the various components."

    def __init__(self, pathname):

        "Initialise the index with storage under the given 'pathname'."

        self.pathname = pathname
        self.reader = None
        self.writer = None

    def get_writer(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, flush_interval=FLUSH_INTERVAL):

        """
        Return a writer, optionally using the given indexing 'interval',
        'doc_interval' and 'flush_interval'.
        """

        # Create the index directory if it does not already exist.

        if not exists(self.pathname):
            mkdir(self.pathname)

        self.writer = IndexWriter(self.pathname, interval, doc_interval, flush_interval)
        return self.writer

    def get_reader(self, partition=0):

        "Return a reader for the index."

        # Ensure that only one partition exists.

        self.merge()
        return self._get_reader(partition)

    def _get_reader(self, partition):

        "Return a reader for the index."

        # NOTE(review): the 'partition' parameter is currently unused here -
        # the reader always accesses the "merged" partition.

        if not exists(self.pathname):
            raise OSError, "Index path %r does not exist." % self.pathname

        self.reader = IndexReader(self.pathname)
        return self.reader

    def merge(self):

        "Merge/optimise index partitions."
1676 1677 self.merge_terms() 1678 self.merge_fields() 1679 1680 def merge_terms(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL): 1681 1682 """ 1683 Merge term dictionaries using the given indexing 'interval' and 1684 'doc_interval'. 1685 """ 1686 1687 readers = [] 1688 partitions = set() 1689 1690 for filename in listdir(self.pathname): 1691 if filename.startswith("terms-"): # 6 character prefix 1692 partition = filename[6:] 1693 readers.append(get_term_reader(self.pathname, partition)) 1694 partitions.add(partition) 1695 1696 # Write directly to a dictionary. 1697 1698 if len(readers) > 1: 1699 if "merged" in partitions: 1700 rename_term_files(self.pathname, "merged", "old-merged") 1701 partitions.remove("merged") 1702 partitions.add("old-merged") 1703 1704 writer = get_term_writer(self.pathname, "merged", interval, doc_interval) 1705 merger = TermDictionaryMerger(writer, readers) 1706 merger.merge() 1707 merger.close() 1708 1709 # Remove old files. 1710 1711 for partition in partitions: 1712 remove_term_files(self.pathname, partition) 1713 1714 elif len(readers) == 1: 1715 partition = list(partitions)[0] 1716 if partition != "merged": 1717 rename_term_files(self.pathname, partition, "merged") 1718 1719 def merge_fields(self, interval=FIELD_INTERVAL): 1720 1721 "Merge field dictionaries using the given indexing 'interval'." 1722 1723 readers = [] 1724 partitions = set() 1725 1726 for filename in listdir(self.pathname): 1727 if filename.startswith("fields-"): # 7 character prefix 1728 partition = filename[7:] 1729 readers.append(get_field_reader(self.pathname, partition)) 1730 partitions.add(partition) 1731 1732 # Write directly to a dictionary. 
1733 1734 if len(readers) > 1: 1735 if "merged" in partitions: 1736 rename_field_files(self.pathname, "merged", "old-merged") 1737 partitions.remove("merged") 1738 partitions.add("old-merged") 1739 1740 writer = get_field_writer(self.pathname, "merged", interval) 1741 merger = FieldDictionaryMerger(writer, readers) 1742 merger.merge() 1743 merger.close() 1744 1745 # Remove old files. 1746 1747 for partition in partitions: 1748 remove_field_files(self.pathname, partition) 1749 1750 elif len(readers) == 1: 1751 partition = list(partitions)[0] 1752 if partition != "merged": 1753 rename_field_files(self.pathname, partition, "merged") 1754 1755 def close(self): 1756 if self.reader is not None: 1757 self.reader.close() 1758 self.reader = None 1759 if self.writer is not None: 1760 self.writer.close() 1761 self.writer = None 1762 1763 # vim: tabstop=4 expandtab shiftwidth=4