1 #!/usr/bin/env python 2 3 """ 4 A simple (and sane) text indexing library. 5 6 Copyright (C) 2009 Paul Boddie <paul@boddie.org.uk> 7 8 This program is free software; you can redistribute it and/or modify it under 9 the terms of the GNU General Public License as published by the Free Software 10 Foundation; either version 3 of the License, or (at your option) any later 11 version. 12 13 This program is distributed in the hope that it will be useful, but WITHOUT ANY 14 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 15 PARTICULAR PURPOSE. See the GNU General Public License for more details. 16 17 You should have received a copy of the GNU General Public License along 18 with this program. If not, see <http://www.gnu.org/licenses/>. 19 """ 20 21 from os import dup, fdopen # independent iterator access to files 22 from os import listdir, mkdir # index and partition discovery 23 from os import remove, rename # partition manipulation 24 from os.path import exists, join 25 from os.path import commonprefix # to find common string prefixes 26 from bisect import bisect_right # to find terms in the dictionary index 27 from bisect import insort_right # to maintain a sorted list of data for merging 28 import bz2, zlib # for field compression 29 30 try: 31 set 32 except NameError: 33 from sets import Set as set 34 35 # Constants. 36 37 TERM_INTERVAL = 100 38 DOCUMENT_INTERVAL = 100 39 FIELD_INTERVAL = 100 40 FLUSH_INTERVAL = 1000000 41 42 TERM_FILENAMES = "terms", "terms_index", "positions", "positions_index" 43 FIELD_FILENAMES = "fields", "fields_index" 44 45 compressors = [("b", bz2.compress), ("z", zlib.compress)] 46 decompressors = {"b" : bz2.decompress, "z" : zlib.decompress} 47 48 # Foundation classes. 49 50 class File: 51 52 "A basic file abstraction." 53 54 def __init__(self, f): 55 self.f = f 56 self.reset() 57 58 def reset(self): 59 60 "To be used to reset the state of the reader or writer between records." 61 62 pass 63 64 def rewind(self): 65 self.f.seek(0) 66 self.reset() 67 68 def close(self): 69 if self.f is not None: 70 self.f.close() 71 self.f = None 72 73 class FileWriter(File): 74 75 "Writing basic data types to files." 76 77 def write_number(self, number): 78 79 "Write 'number' to the file using a variable length encoding." 80 81 # Negative numbers are not supported. 82 83 if number < 0: 84 raise ValueError, "Number %r is negative." % number 85 86 # Special case: one byte containing zero. 87 88 elif number == 0: 89 self.f.write(chr(0)) 90 return 91 92 # Write the number from least to most significant digits. 93 94 bytes = [] 95 96 while number != 0: 97 lsd = number & 127 98 number = number >> 7 99 if number != 0: 100 lsd |= 128 101 bytes.append(chr(lsd)) 102 103 record = "".join(bytes) 104 self.f.write(record) 105 106 def write_string(self, s, compress=0): 107 108 """ 109 Write 's' to the file, recording its length and compressing the string 110 if 'compress' is set to a true value. 111 """ 112 113 # Convert Unicode objects to strings. 114 115 if isinstance(s, unicode): 116 s = s.encode("utf-8") 117 118 # Compress the string if requested. 119 120 if compress: 121 for flag, fn in compressors: 122 cs = fn(s) 123 124 # Take the first string shorter than the original. 125 126 if len(cs) < len(s): 127 s = cs 128 break 129 else: 130 flag = "-" 131 132 # Record whether compression was used. 133 134 self.f.write(flag) 135 136 # Write the length of the data before the data itself. 137 138 length = len(s) 139 self.write_number(length) 140 self.f.write(s) 141 142 class FileReader(File): 143 144 "Reading basic data types from files." 145 146 def read_number(self): 147 148 "Read a number from the file." 149 150 # Read each byte, adding it to the number. 151 152 shift = 0 153 number = 0 154 more = 1 155 156 while more: 157 byte = self.f.read(1) 158 if not byte: 159 raise EOFError 160 161 csd = ord(byte) 162 more = csd & 128 != 0 163 if more: 164 csd &= 127 165 number += (csd << shift) 166 shift += 7 167 168 return number 169 170 def read_string(self, decompress=0): 171 172 """ 173 Read a string from the file, decompressing the stored data if 174 'decompress' is set to a true value. 175 """ 176 177 # Decompress the data if requested. 178 179 if decompress: 180 flag = self.f.read(1) 181 else: 182 flag = "-" 183 184 length = self.read_number() 185 s = self.f.read(length) 186 187 # Perform decompression if applicable. 188 189 if flag != "-": 190 fn = decompressors[flag] 191 s = fn(s) 192 193 # Convert strings to Unicode objects. 194 195 return unicode(s, "utf-8") 196 197 # Specific classes for storing term and position information. 198 199 class PositionWriter(FileWriter): 200 201 "Writing position information to files." 202 203 def reset(self): 204 self.last_docnum = 0 205 206 def write_positions(self, docnum, positions): 207 208 """ 209 Write for the document 'docnum' the given 'positions'. 210 Return the offset of the written record. 211 """ 212 213 if docnum < self.last_docnum: 214 raise ValueError, "Document number %r is less than previous number %r." % (docnum, self.last_docnum) 215 216 # Record the offset of this record. 217 218 offset = self.f.tell() 219 220 # Write the document number delta. 221 222 self.write_number(docnum - self.last_docnum) 223 224 # Write the number of positions. 225 226 self.write_number(len(positions)) 227 228 # Make sure that the positions are sorted. 229 230 positions.sort() 231 232 # Write the position deltas. 233 234 last = 0 235 for position in positions: 236 pos = position - last 237 self.write_number(pos) 238 last = position 239 240 self.last_docnum = docnum 241 242 return offset 243 244 class PositionReader(FileReader): 245 246 "Reading position information from files." 247 248 def reset(self): 249 self.last_docnum = 0 250 251 def read_positions(self): 252 253 "Read positions, returning a document number and a list of positions." 254 255 # Read the document number delta and add it to the last number. 256 257 self.last_docnum += self.read_number() 258 259 # Read the number of positions. 260 261 npositions = self.read_number() 262 263 # Read the position deltas, adding each previous position to get the 264 # appropriate collection of absolute positions. 265 266 i = 0 267 last = 0 268 positions = [] 269 270 while i < npositions: 271 last += self.read_number() 272 positions.append(last) 273 i += 1 274 275 return self.last_docnum, positions 276 277 def read_term_positions(self, offset, count): 278 279 """ 280 Read all positions from 'offset', seeking to that position in the file 281 before reading. The number of documents available for reading is limited 282 to 'count'. 283 """ 284 285 # Duplicate the file handle. 286 287 f = fdopen(dup(self.f.fileno()), "rb") 288 f.seek(offset) 289 return PositionIterator(f, count) 290 291 class PositionIndexWriter(FileWriter): 292 293 "Writing position index information to files." 294 295 def reset(self): 296 self.last_docnum = 0 297 self.last_pos_offset = 0 298 299 def write_positions(self, docnum, pos_offset, count): 300 301 """ 302 Write the given 'docnum, 'pos_offset' and document 'count' to the 303 position index file. 304 """ 305 306 # Record the offset of this record. 307 308 offset = self.f.tell() 309 310 # Write the document number delta. 311 312 self.write_number(docnum - self.last_docnum) 313 self.last_docnum = docnum 314 315 # Write the position file offset delta. 316 317 self.write_number(pos_offset - self.last_pos_offset) 318 self.last_pos_offset = pos_offset 319 320 # Write the document count. 321 322 self.write_number(count) 323 324 return offset 325 326 class PositionIndexReader(FileReader): 327 328 "Reading position index information from files." 329 330 def reset(self): 331 self.last_docnum = 0 332 self.last_pos_offset = 0 333 334 def read_positions(self): 335 336 """ 337 Read a document number, a position file offset for the position index 338 file, and the number of documents in a section of that file. 339 """ 340 341 # Read the document number delta. 342 343 self.last_docnum += self.read_number() 344 345 # Read the offset delta. 346 347 self.last_pos_offset += self.read_number() 348 349 # Read the document count. 350 351 count = self.read_number() 352 353 return self.last_docnum, self.last_pos_offset, count 354 355 def read_term_positions(self, offset, doc_frequency): 356 357 """ 358 Read all positions from 'offset', seeking to that position in the file 359 before reading. The number of documents available for reading is limited 360 to 'doc_frequency'. 361 """ 362 363 # Duplicate the file handle. 364 365 f = fdopen(dup(self.f.fileno()), "rb") 366 f.seek(offset) 367 return PositionIndexIterator(f, doc_frequency) 368 369 # Iterators for position-related files. 370 371 class IteratorBase: 372 373 def __init__(self, count): 374 self.replenish(count) 375 376 def replenish(self, count): 377 self.count = count 378 self.read_documents = 0 379 380 def __len__(self): 381 return self.count 382 383 def sort(self): 384 pass # Stored document positions are already sorted. 385 386 def __iter__(self): 387 return self 388 389 class PositionIterator(PositionReader, IteratorBase): 390 391 "Iterating over document positions." 392 393 def __init__(self, f, count): 394 PositionReader.__init__(self, f) 395 IteratorBase.__init__(self, count) 396 397 def next(self): 398 399 "Read positions for a single document." 400 401 if self.read_documents < self.count: 402 self.read_documents += 1 403 return self.read_positions() 404 else: 405 raise StopIteration 406 407 class PositionIndexIterator(PositionIndexReader, IteratorBase): 408 409 "Iterating over document positions." 410 411 def __init__(self, f, count): 412 PositionIndexReader.__init__(self, f) 413 IteratorBase.__init__(self, count) 414 self.section_count = 0 415 416 def next(self): 417 418 "Read positions for a single document." 419 420 self.read_documents += self.section_count 421 if self.read_documents < self.count: 422 docnum, pos_offset, self.section_count = t = self.read_positions() 423 return t 424 else: 425 raise StopIteration 426 427 class PositionDictionaryWriter: 428 429 "Writing position dictionaries." 430 431 def __init__(self, position_writer, position_index_writer, interval): 432 self.position_writer = position_writer 433 self.position_index_writer = position_index_writer 434 self.interval = interval 435 436 def write_term_positions(self, doc_positions): 437 438 """ 439 Write all 'doc_positions' - a collection of tuples of the form (document 440 number, position list) - to the file. 441 442 Add some records to the index, making dictionary entries. 443 444 Return a tuple containing the offset of the written data, the frequency 445 (number of positions), and document frequency (number of documents) for 446 the term involved. 447 """ 448 449 # Reset the writers. 450 451 self.position_writer.reset() 452 self.position_index_writer.reset() 453 454 index_offset = None 455 456 # Write the positions. 457 458 frequency = 0 459 first_docnum = None 460 first_offset = None 461 count = 0 462 463 doc_positions.sort() 464 465 for docnum, positions in doc_positions: 466 pos_offset = self.position_writer.write_positions(docnum, positions) 467 468 # Retain the first record offset for a subsequent index entry. 469 470 if first_offset is None: 471 first_offset = pos_offset 472 first_docnum = docnum 473 474 frequency += len(positions) 475 count += 1 476 477 # Every {interval} entries, write an index entry. 478 479 if count == self.interval: 480 io = self.position_index_writer.write_positions(first_docnum, first_offset, self.interval) 481 482 # Remember the first index entry offset. 483 484 if index_offset is None: 485 index_offset = io 486 487 first_offset = None 488 first_docnum = None 489 count = 0 490 491 # Reset the position writer so that position readers accessing 492 # a section start with the correct document number. 493 494 self.position_writer.reset() 495 496 # Finish writing an index entry for the remaining documents. 497 498 else: 499 if first_offset is not None: 500 io = self.position_index_writer.write_positions(first_docnum, first_offset, count) 501 502 # Remember the first index entry offset. 503 504 if index_offset is None: 505 index_offset = io 506 507 return index_offset, frequency, len(doc_positions) 508 509 def close(self): 510 self.position_writer.close() 511 self.position_index_writer.close() 512 513 class PositionDictionaryReader: 514 515 "Reading position dictionaries." 516 517 def __init__(self, position_reader, position_index_reader): 518 self.position_reader = position_reader 519 self.position_index_reader = position_index_reader 520 521 def read_term_positions(self, offset, doc_frequency): 522 523 """ 524 Return an iterator for dictionary entries starting at 'offset' with the 525 given 'doc_frequency'. 526 """ 527 528 return PositionDictionaryIterator(self.position_reader, 529 self.position_index_reader, offset, doc_frequency) 530 531 def close(self): 532 self.position_reader.close() 533 self.position_index_reader.close() 534 535 class PositionDictionaryIterator: 536 537 "Iteration over position dictionary entries." 538 539 def __init__(self, position_reader, position_index_reader, offset, doc_frequency): 540 self.position_reader = position_reader 541 self.doc_frequency = doc_frequency 542 self.index_iterator = position_index_reader.read_term_positions(offset, doc_frequency) 543 544 # Remember the last values. 545 546 self.found_docnum, self.found_positions = None, None 547 548 # Maintain state for the next index entry, if read. 549 550 self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None 551 552 # Initialise the current index entry and current position file iterator. 553 554 self._next_section() 555 self._init_section() 556 557 def __len__(self): 558 return self.doc_frequency 559 560 def sort(self): 561 pass 562 563 def __iter__(self): 564 return self 565 566 def next(self): 567 568 """ 569 Attempt to get the next document record from the section in the 570 positions file. 571 """ 572 573 # Return any visited but unrequested record. 574 575 if self.found_docnum is not None: 576 t = self.found_docnum, self.found_positions 577 self.found_docnum, self.found_positions = None, None 578 return t 579 580 # Or search for the next record. 581 582 while 1: 583 584 # Either return the next record. 585 586 try: 587 return self.iterator.next() 588 589 # Or, where a section is finished, get the next section and try again. 590 591 except StopIteration: 592 593 # Where a section follows, update the index iterator, but keep 594 # reading using the same file iterator (since the data should 595 # just follow on from the last section). 596 597 self._next_section() 598 self.iterator.replenish(self.section_count) 599 600 # Reset the state of the iterator to make sure that document 601 # numbers are correct. 602 603 self.iterator.reset() 604 605 def from_document(self, docnum): 606 607 """ 608 Attempt to navigate to a positions entry for the given 'docnum', 609 returning the positions for 'docnum', or None otherwise. 610 """ 611 612 # Return any unrequested document positions. 613 614 if docnum == self.found_docnum: 615 return self.found_positions 616 617 # Read ahead in the index until the next entry refers to a document 618 # later than the desired document. 619 620 try: 621 if self.next_docnum is None: 622 self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next() 623 624 # Read until the next entry is after the desired document number, 625 # or until the end of the results. 626 627 while self.next_docnum <= docnum: 628 self._next_read_section() 629 if self.docnum < docnum: 630 self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next() 631 else: 632 break 633 634 except StopIteration: 635 pass 636 637 # Navigate in the position file to the document. 638 639 self._init_section() 640 641 try: 642 while 1: 643 found_docnum, found_positions = self.iterator.next() 644 645 # Return the desired document positions or those immediately 646 # after. 647 648 if docnum == found_docnum: 649 return found_positions 650 elif docnum < found_docnum: 651 self.found_docnum, self.found_positions = found_docnum, found_positions 652 return None 653 654 except StopIteration: 655 return None 656 657 # Internal methods. 658 659 def _next_section(self): 660 661 "Attempt to get the next section in the index." 662 663 if self.next_docnum is None: 664 self.docnum, self.pos_offset, self.section_count = self.index_iterator.next() 665 else: 666 self._next_read_section() 667 668 def _next_read_section(self): 669 670 """ 671 Make the next index entry the current one without reading from the 672 index. 673 """ 674 675 self.docnum, self.pos_offset, self.section_count = self.next_docnum, self.next_pos_offset, self.next_section_count 676 self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None 677 678 def _init_section(self): 679 680 "Initialise the iterator for the section in the position file." 681 682 self.iterator = self.position_reader.read_term_positions(self.pos_offset, self.section_count) 683 684 class TermWriter(FileWriter): 685 686 "Writing term information to files." 687 688 def reset(self): 689 self.last_term = "" 690 self.last_offset = 0 691 692 def write_term(self, term, offset, frequency, doc_frequency): 693 694 """ 695 Write the given 'term', its position file 'offset', its 'frequency' and 696 its 'doc_frequency' (number of documents in which it appears) to the 697 term information file. Return the offset after the term information was 698 written to the file. 699 """ 700 701 # Write the prefix length and term suffix. 702 703 common = len(commonprefix([self.last_term, term])) 704 suffix = term[common:] 705 706 self.write_number(common) 707 self.write_string(suffix) 708 709 # Write the offset delta. 710 711 self.write_number(offset - self.last_offset) 712 713 # Write the frequency. 714 715 self.write_number(frequency) 716 717 # Write the document frequency. 718 719 self.write_number(doc_frequency) 720 721 self.last_term = term 722 self.last_offset = offset 723 724 return self.f.tell() 725 726 class TermReader(FileReader): 727 728 "Reading term information from files." 729 730 def reset(self): 731 self.last_term = "" 732 self.last_offset = 0 733 734 def read_term(self): 735 736 """ 737 Read a term, its position file offset, its frequency and its document 738 frequence from the term information file. 739 """ 740 741 # Read the prefix length and term suffix. 742 743 common = self.read_number() 744 suffix = self.read_string() 745 746 self.last_term = self.last_term[:common] + suffix 747 748 # Read the offset delta. 749 750 self.last_offset += self.read_number() 751 752 # Read the frequency. 753 754 frequency = self.read_number() 755 756 # Read the document frequency. 757 758 doc_frequency = self.read_number() 759 760 return self.last_term, self.last_offset, frequency, doc_frequency 761 762 def go_to_term(self, term, offset, info_offset): 763 764 """ 765 Seek past the entry for 'term' having 'offset' to 'info_offset'. This 766 permits the scanning for later terms from the specified term. 767 """ 768 769 self.f.seek(info_offset) 770 self.last_term = term 771 self.last_offset = offset 772 773 class TermIndexWriter(TermWriter): 774 775 "Writing term dictionary index details to files." 776 777 def reset(self): 778 TermWriter.reset(self) 779 self.last_info_offset = 0 780 781 def write_term(self, term, offset, frequency, doc_frequency, info_offset): 782 783 """ 784 Write the given 'term', its position file 'offset', its 'frequency' and 785 its 'doc_frequency' to the term dictionary index file, along with the 786 'info_offset' in the term information file. 787 """ 788 789 TermWriter.write_term(self, term, offset, frequency, doc_frequency) 790 791 # Write the information file offset delta. 792 793 self.write_number(info_offset - self.last_info_offset) 794 self.last_info_offset = info_offset 795 796 class TermIndexReader(TermReader): 797 798 "Reading term dictionary index details from files." 799 800 def reset(self): 801 TermReader.reset(self) 802 self.last_info_offset = 0 803 804 def read_term(self): 805 806 """ 807 Read a term, its position file offset, its frequency, its document 808 frequency and a term information file offset from the term dictionary 809 index file. 810 """ 811 812 term, offset, frequency, doc_frequency = TermReader.read_term(self) 813 814 # Read the offset delta. 815 816 self.last_info_offset += self.read_number() 817 818 return term, offset, frequency, doc_frequency, self.last_info_offset 819 820 class TermDictionaryWriter: 821 822 "Writing term dictionaries." 823 824 def __init__(self, info_writer, index_writer, position_dict_writer, interval): 825 self.info_writer = info_writer 826 self.index_writer = index_writer 827 self.position_dict_writer = position_dict_writer 828 self.interval = interval 829 self.entry = 0 830 831 def _write_term(self, term, offset, frequency, doc_frequency): 832 833 """ 834 Write the given 'term', its position file 'offset', its 'frequency' and 835 its 'doc_frequency' (number of documents in which it appears) to the 836 term information file. Return the offset after the term information was 837 written to the file. 838 """ 839 840 info_offset = self.info_writer.write_term(term, offset, frequency, doc_frequency) 841 842 if self.entry % self.interval == 0: 843 self.index_writer.write_term(term, offset, frequency, doc_frequency, info_offset) 844 845 self.entry += 1 846 847 def write_term_positions(self, term, doc_positions): 848 849 """ 850 Write the given 'term' and the 'doc_positions' recording the documents 851 and positions at which the term is found. 852 """ 853 854 offset, frequency, doc_frequency = self.position_dict_writer.write_term_positions(doc_positions) 855 self._write_term(term, offset, frequency, doc_frequency) 856 857 def close(self): 858 self.info_writer.close() 859 self.index_writer.close() 860 self.position_dict_writer.close() 861 862 class TermDictionaryReader: 863 864 "Reading term dictionaries." 865 866 def __init__(self, info_reader, index_reader, position_dict_reader): 867 self.info_reader = info_reader 868 self.index_reader = index_reader 869 self.position_dict_reader = position_dict_reader 870 871 self.terms = [] 872 try: 873 while 1: 874 self.terms.append(self.index_reader.read_term()) 875 except EOFError: 876 pass 877 878 # Large numbers for ordering purposes. 879 880 self.max_offset = self.terms[-1][1] + 1 881 882 def _find_term(self, term): 883 884 """ 885 Find the position file offset and frequency of 'term' from the term 886 dictionary. 887 """ 888 889 i = bisect_right(self.terms, (term, self.max_offset, 0, 0)) - 1 890 891 # Get the entry position providing the term or one preceding it. 892 893 if i == -1: 894 return None 895 896 found_term, offset, frequency, doc_frequency, info_offset = self.terms[i] 897 898 # Where the term is found immediately, return the offset and 899 # frequencies. 900 901 if term == found_term: 902 return offset, frequency, doc_frequency 903 904 # Otherwise, seek past the index term's entry in the information file 905 # and scan for the desired term. 906 907 else: 908 self.info_reader.go_to_term(found_term, offset, info_offset) 909 try: 910 while term > found_term: 911 found_term, offset, frequency, doc_frequency = self.info_reader.read_term() 912 except EOFError: 913 pass 914 915 # If the term is found, return the offset and frequencies. 916 917 if term == found_term: 918 return offset, frequency, doc_frequency 919 else: 920 return None 921 922 def rewind(self): 923 self.info_reader.rewind() 924 925 def _get_positions(self, offset, doc_frequency): 926 return self.position_dict_reader.read_term_positions(offset, doc_frequency) 927 928 def read_term(self): 929 930 """ 931 Return the next term, its frequency, its document frequency, and the 932 documents and positions at which the term is found. 933 """ 934 935 term, offset, frequency, doc_frequency = self.info_reader.read_term() 936 positions = self._get_positions(offset, doc_frequency) 937 return term, frequency, doc_frequency, positions 938 939 def find_positions(self, term): 940 941 "Return the documents and positions at which the given 'term' is found." 942 943 t = self._find_term(term) 944 if t is None: 945 return None 946 else: 947 offset, frequency, doc_frequency = t 948 return self._get_positions(offset, doc_frequency) 949 950 def get_frequency(self, term): 951 952 "Return the frequency of the given 'term'." 953 954 t = self._find_term(term) 955 if t is None: 956 return None 957 else: 958 offset, frequency, doc_frequency = t 959 return frequency 960 961 def get_document_frequency(self, term): 962 963 "Return the document frequency of the given 'term'." 964 965 t = self._find_term(term) 966 if t is None: 967 return None 968 else: 969 offset, frequency, doc_frequency = t 970 return doc_frequency 971 972 def close(self): 973 self.info_reader.close() 974 self.index_reader.close() 975 self.position_dict_reader.close() 976 977 # Specific classes for storing document information. 978 979 class FieldWriter(FileWriter): 980 981 "Writing field data to files." 982 983 def reset(self): 984 self.last_docnum = 0 985 986 def write_fields(self, docnum, fields): 987 988 """ 989 Write for the given 'docnum', a list of 'fields' (integer, string pairs 990 representing field identifiers and values respectively). 991 Return the offset at which the fields are stored. 992 """ 993 994 offset = self.f.tell() 995 996 # Write the document number delta. 997 998 self.write_number(docnum - self.last_docnum) 999 1000 # Write the number of fields. 1001 1002 self.write_number(len(fields)) 1003 1004 # Write the fields themselves. 1005 1006 for i, field in fields: 1007 self.write_number(i) 1008 self.write_string(field, 1) # compress 1009 1010 self.last_docnum = docnum 1011 return offset 1012 1013 class FieldReader(FileReader): 1014 1015 "Reading field data from files." 1016 1017 def reset(self): 1018 self.last_docnum = 0 1019 1020 def read_fields(self): 1021 1022 """ 1023 Read fields from the file, returning a tuple containing the document 1024 number and a list of field (identifier, value) pairs. 1025 """ 1026 1027 # Read the document number. 1028 1029 self.last_docnum += self.read_number() 1030 1031 # Read the number of fields. 1032 1033 nfields = self.read_number() 1034 1035 # Collect the fields. 1036 1037 fields = [] 1038 i = 0 1039 1040 while i < nfields: 1041 identifier = self.read_number() 1042 value = self.read_string(1) # decompress 1043 fields.append((identifier, value)) 1044 i += 1 1045 1046 return self.last_docnum, fields 1047 1048 def read_document_fields(self, docnum, offset): 1049 1050 """ 1051 Read fields for 'docnum' at the given 'offset'. This permits the 1052 retrieval of details for the specified document, as well as scanning for 1053 later documents. 1054 """ 1055 1056 self.f.seek(offset) 1057 bad_docnum, fields = self.read_fields() 1058 self.last_docnum = docnum 1059 return docnum, fields 1060 1061 class FieldIndexWriter(FileWriter): 1062 1063 "Writing field index details to files." 1064 1065 def reset(self): 1066 self.last_docnum = 0 1067 self.last_offset = 0 1068 1069 def write_document(self, docnum, offset): 1070 1071 """ 1072 Write for the given 'docnum', the 'offset' at which the fields for the 1073 document are stored in the fields file. 1074 """ 1075 1076 # Write the document number and offset deltas. 1077 1078 self.write_number(docnum - self.last_docnum) 1079 self.write_number(offset - self.last_offset) 1080 1081 self.last_docnum = docnum 1082 self.last_offset = offset 1083 1084 class FieldIndexReader(FileReader): 1085 1086 "Reading field index details from files." 1087 1088 def reset(self): 1089 self.last_docnum = 0 1090 self.last_offset = 0 1091 1092 def read_document(self): 1093 1094 "Read a document number and field file offset." 1095 1096 # Read the document number delta and offset. 1097 1098 self.last_docnum += self.read_number() 1099 self.last_offset += self.read_number() 1100 1101 return self.last_docnum, self.last_offset 1102 1103 class FieldDictionaryWriter: 1104 1105 "Writing field dictionary details." 1106 1107 def __init__(self, field_writer, field_index_writer, interval): 1108 self.field_writer = field_writer 1109 self.field_index_writer = field_index_writer 1110 self.interval = interval 1111 self.entry = 0 1112 1113 def write_fields(self, docnum, fields): 1114 1115 "Write details of the document with the given 'docnum' and 'fields'." 1116 1117 offset = self.field_writer.write_fields(docnum, fields) 1118 1119 if self.entry % self.interval == 0: 1120 self.field_index_writer.write_document(docnum, offset) 1121 1122 self.entry += 1 1123 1124 def close(self): 1125 self.field_writer.close() 1126 self.field_index_writer.close() 1127 1128 class FieldDictionaryReader: 1129 1130 "Reading field dictionary details." 1131 1132 def __init__(self, field_reader, field_index_reader): 1133 self.field_reader = field_reader 1134 self.field_index_reader = field_index_reader 1135 1136 self.docs = [] 1137 try: 1138 while 1: 1139 self.docs.append(self.field_index_reader.read_document()) 1140 except EOFError: 1141 pass 1142 1143 # Large numbers for ordering purposes. 1144 1145 self.max_offset = self.docs[-1][1] 1146 1147 def rewind(self): 1148 self.field_reader.rewind() 1149 1150 def read_fields(self): 1151 1152 "Return the next document number and fields." 1153 1154 return self.field_reader.read_fields() 1155 1156 def get_fields(self, docnum): 1157 1158 "Read the fields of the document with the given 'docnum'." 1159 1160 i = bisect_right(self.docs, (docnum, self.max_offset)) - 1 1161 1162 # Get the entry position providing the term or one preceding it. 1163 1164 if i == -1: 1165 return None 1166 1167 found_docnum, offset = self.docs[i] 1168 1169 # Read from the fields file. 1170 1171 found_docnum, fields = self.field_reader.read_document_fields(found_docnum, offset) 1172 1173 # Scan for the document, if necessary. 1174 1175 try: 1176 while docnum > found_docnum: 1177 found_docnum, fields = self.field_reader.read_fields() 1178 except EOFError: 1179 pass 1180 1181 # If the document is found, return the fields. 1182 1183 if docnum == found_docnum: 1184 return fields 1185 else: 1186 return None 1187 1188 def close(self): 1189 self.field_reader.close() 1190 self.field_index_reader.close() 1191 1192 # Dictionary merging classes. 1193 1194 class Merger: 1195 1196 "Merge files." 1197 1198 def __init__(self, writer, readers): 1199 self.writer = writer 1200 self.readers = readers 1201 1202 def close(self): 1203 for reader in self.readers: 1204 reader.close() 1205 self.writer.close() 1206 1207 class TermDictionaryMerger(Merger): 1208 1209 "Merge term and position files." 1210 1211 def merge(self): 1212 1213 """ 1214 Merge terms and positions from the readers, sending them to the writer. 1215 """ 1216 1217 entries = [] 1218 1219 # Get the first entries from the readers. 1220 1221 for partition, reader in enumerate(self.readers): 1222 reader.rewind() 1223 1224 try: 1225 term, frequency, doc_frequency, positions = reader.read_term() 1226 insort_right(entries, (term, positions, partition)) 1227 except EOFError: 1228 pass 1229 1230 # While entries are available, write them out in order, merging where 1231 # appropriate. 1232 1233 while entries: 1234 term, doc_positions, partition = entries[0] 1235 to_update = [partition] 1236 1237 nentries = len(entries) 1238 i = 1 1239 1240 # Find other entries for the term. 1241 1242 while i < nentries: 1243 other_term, other_doc_positions, other_partition = entries[i] 1244 1245 # For such entries, merge the positions. 1246 1247 if other_term == term: 1248 doc_positions = self.merge_positions(doc_positions, other_doc_positions) 1249 to_update.append(other_partition) 1250 i += 1 1251 else: 1252 break 1253 1254 # Write the combined term details. 1255 1256 self.writer.write_term_positions(term, doc_positions) 1257 1258 # Update the entries from the affected readers. 1259 1260 del entries[:i] 1261 1262 for partition in to_update: 1263 try: 1264 term, frequency, doc_frequency, positions = self.readers[partition].read_term() 1265 insort_right(entries, (term, positions, partition)) 1266 except EOFError: 1267 pass 1268 1269 def merge_positions(self, doc_positions, other_doc_positions): 1270 1271 """ 1272 Merge 'doc_positions' with 'other_doc_positions' so that common document 1273 records contain positions from both collections. 1274 """ 1275 1276 doc_position_dict = dict(doc_positions) 1277 1278 for docnum, positions in other_doc_positions: 1279 if doc_position_dict.has_key(docnum): 1280 doc_position_dict[docnum] += positions 1281 else: 1282 doc_position_dict[docnum] = positions 1283 1284 return doc_position_dict.items() 1285 1286 class FieldDictionaryMerger(Merger): 1287 1288 "Merge field files." 1289 1290 def merge(self): 1291 1292 """ 1293 Merge fields from the readers, sending them to the writer. 1294 """ 1295 1296 entries = [] 1297 1298 # Get the first entries from the readers. 1299 1300 for partition, reader in enumerate(self.readers): 1301 reader.rewind() 1302 1303 try: 1304 docnum, fields = reader.read_fields() 1305 insort_right(entries, (docnum, fields, partition)) 1306 except EOFError: 1307 pass 1308 1309 # While entries are available, write them out in order, merging where 1310 # appropriate. 1311 1312 while entries: 1313 docnum, fields, partition = entries[0] 1314 to_update = [partition] 1315 1316 nentries = len(entries) 1317 i = 1 1318 1319 # Find other entries for the term. 1320 1321 while i < nentries: 1322 other_docnum, other_fields, other_partition = entries[i] 1323 1324 # For such entries, merge the positions. 1325 1326 if other_docnum == docnum: 1327 fields += other_fields 1328 to_update.append(other_partition) 1329 i += 1 1330 else: 1331 break 1332 1333 # Write the combined term details. 1334 1335 self.writer.write_fields(docnum, fields) 1336 1337 # Update the entries from the affected readers. 1338 1339 del entries[:i] 1340 1341 for partition in to_update: 1342 try: 1343 docnum, fields = self.readers[partition].read_fields() 1344 insort_right(entries, (docnum, fields, partition)) 1345 except EOFError: 1346 pass 1347 1348 # Utility functions. 1349 1350 def get_term_writer(pathname, partition, interval, doc_interval): 1351 1352 """ 1353 Return a term dictionary writer using files under the given 'pathname' 1354 labelled according to the given 'partition', using the given indexing 1355 'interval' for terms and 'doc_interval' for document position records. 1356 """ 1357 1358 tdf = open(join(pathname, "terms-%s" % partition), "wb") 1359 info_writer = TermWriter(tdf) 1360 1361 tdif = open(join(pathname, "terms_index-%s" % partition), "wb") 1362 index_writer = TermIndexWriter(tdif) 1363 1364 tpf = open(join(pathname, "positions-%s" % partition), "wb") 1365 positions_writer = PositionWriter(tpf) 1366 1367 tpif = open(join(pathname, "positions_index-%s" % partition), "wb") 1368 positions_index_writer = PositionIndexWriter(tpif) 1369 1370 positions_dict_writer = PositionDictionaryWriter(positions_writer, positions_index_writer, doc_interval) 1371 1372 return TermDictionaryWriter(info_writer, index_writer, positions_dict_writer, interval) 1373 1374 def get_field_writer(pathname, partition, interval): 1375 1376 """ 1377 Return a field dictionary writer using files under the given 'pathname' 1378 labelled according to the given 'partition', using the given indexing 1379 'interval'. 1380 """ 1381 1382 ff = open(join(pathname, "fields-%s" % partition), "wb") 1383 field_writer = FieldWriter(ff) 1384 1385 fif = open(join(pathname, "fields_index-%s" % partition), "wb") 1386 field_index_writer = FieldIndexWriter(fif) 1387 1388 return FieldDictionaryWriter(field_writer, field_index_writer, interval) 1389 1390 def get_term_reader(pathname, partition): 1391 1392 """ 1393 Return a term dictionary reader using files under the given 'pathname' 1394 labelled according to the given 'partition'. 1395 """ 1396 1397 tdf = open(join(pathname, "terms-%s" % partition), "rb") 1398 info_reader = TermReader(tdf) 1399 1400 tdif = open(join(pathname, "terms_index-%s" % partition), "rb") 1401 index_reader = TermIndexReader(tdif) 1402 1403 tpf = open(join(pathname, "positions-%s" % partition), "rb") 1404 positions_reader = PositionReader(tpf) 1405 1406 tpif = open(join(pathname, "positions_index-%s" % partition), "rb") 1407 positions_index_reader = PositionIndexReader(tpif) 1408 1409 positions_dict_reader = PositionDictionaryReader(positions_reader, positions_index_reader) 1410 1411 return TermDictionaryReader(info_reader, index_reader, positions_dict_reader) 1412 1413 def get_field_reader(pathname, partition): 1414 1415 """ 1416 Return a field dictionary reader using files under the given 'pathname' 1417 labelled according to the given 'partition'. 1418 """ 1419 1420 ff = open(join(pathname, "fields-%s" % partition), "rb") 1421 field_reader = FieldReader(ff) 1422 1423 fif = open(join(pathname, "fields_index-%s" % partition), "rb") 1424 field_index_reader = FieldIndexReader(fif) 1425 1426 return FieldDictionaryReader(field_reader, field_index_reader) 1427 1428 def rename_files(pathname, names, from_partition, to_partition): 1429 for name in names: 1430 rename(join(pathname, "%s-%s" % (name, from_partition)), join(pathname, "%s-%s" % (name, to_partition))) 1431 1432 def rename_term_files(pathname, from_partition, to_partition): 1433 rename_files(pathname, TERM_FILENAMES, from_partition, to_partition) 1434 1435 def rename_field_files(pathname, from_partition, to_partition): 1436 rename_files(pathname, FIELD_FILENAMES, from_partition, to_partition) 1437 1438 def remove_files(pathname, names, partition): 1439 for name in names: 1440 remove(join(pathname, "%s-%s" % (name, partition))) 1441 1442 def remove_term_files(pathname, partition): 1443 remove_files(pathname, TERM_FILENAMES, partition) 1444 1445 def remove_field_files(pathname, partition): 1446 remove_files(pathname, FIELD_FILENAMES, partition) 1447 1448 # High-level classes. 1449 1450 class IndexWriter: 1451 1452 """ 1453 Building term information and writing it to the term and field dictionaries. 1454 """ 1455 1456 def __init__(self, pathname, interval, doc_interval, flush_interval): 1457 self.pathname = pathname 1458 self.interval = interval 1459 self.doc_interval = doc_interval 1460 self.flush_interval = flush_interval 1461 1462 self.dict_partition = 0 1463 self.field_dict_partition = 0 1464 1465 self.terms = {} 1466 self.docs = {} 1467 1468 self.position_counter = 0 1469 self.field_counter = 0 1470 1471 def add_position(self, term, docnum, position): 1472 1473 """ 1474 Add a position entry for the given 'term' in the document with the given 1475 'docnum', indicating the given 'position'. 1476 """ 1477 1478 if not self.terms.has_key(term): 1479 doc_positions = self.terms[term] = {} 1480 else: 1481 doc_positions = self.terms[term] 1482 1483 if not doc_positions.has_key(docnum): 1484 doc = doc_positions[docnum] = [] 1485 else: 1486 doc = doc_positions[docnum] 1487 1488 doc.append(position) 1489 1490 self.position_counter += 1 1491 if self.flush_interval and self.position_counter >= self.flush_interval: 1492 self.flush_terms() 1493 self.position_counter = 0 1494 1495 def add_field(self, docnum, identifier, value): 1496 1497 """ 1498 Add for the document with the given 'docnum' a field having the given 1499 'identifier' and 'value'. 1500 """ 1501 1502 if not self.docs.has_key(docnum): 1503 doc_fields = self.docs[docnum] = [] 1504 else: 1505 doc_fields = self.docs[docnum] 1506 1507 doc_fields.append((identifier, value)) 1508 1509 self.field_counter += 1 1510 if self.flush_interval and self.field_counter >= self.flush_interval: 1511 self.flush_fields() 1512 self.field_counter = 0 1513 1514 def get_term_writer(self): 1515 1516 "Return a term dictionary writer for the current partition." 1517 1518 return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval) 1519 1520 def get_field_writer(self): 1521 1522 "Return a field dictionary writer for the current partition." 1523 1524 return get_field_writer(self.pathname, self.field_dict_partition, self.interval) 1525 1526 def flush_terms(self): 1527 1528 "Flush terms into the current term dictionary partition." 1529 1530 # Get the terms in order. 1531 1532 terms = self.terms.items() 1533 terms.sort() 1534 1535 dict_writer = self.get_term_writer() 1536 1537 for term, doc_positions in terms: 1538 doc_positions = doc_positions.items() 1539 dict_writer.write_term_positions(term, doc_positions) 1540 1541 dict_writer.close() 1542 1543 self.terms = {} 1544 self.dict_partition += 1 1545 1546 def flush_fields(self): 1547 1548 "Flush fields into the current term dictionary partition." 1549 1550 # Get the documents in order. 1551 1552 docs = self.docs.items() 1553 docs.sort() 1554 1555 field_dict_writer = self.get_field_writer() 1556 1557 for docnum, fields in docs: 1558 field_dict_writer.write_fields(docnum, fields) 1559 1560 field_dict_writer.close() 1561 1562 self.docs = {} 1563 self.field_dict_partition += 1 1564 1565 def close(self): 1566 if self.terms: 1567 self.flush_terms() 1568 if self.docs: 1569 self.flush_fields() 1570 1571 class IndexReader: 1572 1573 "Accessing the term and field dictionaries." 1574 1575 def __init__(self, pathname): 1576 self.dict_reader = get_term_reader(pathname, "merged") 1577 self.field_dict_reader = get_field_reader(pathname, "merged") 1578 1579 def find_positions(self, term): 1580 return self.dict_reader.find_positions(term) 1581 1582 def get_frequency(self, term): 1583 return self.dict_reader.get_frequency(term) 1584 1585 def get_document_frequency(self, term): 1586 return self.dict_reader.get_document_frequency(term) 1587 1588 def get_fields(self, docnum): 1589 return self.field_dict_reader.get_fields(docnum) 1590 1591 def close(self): 1592 self.dict_reader.close() 1593 self.field_dict_reader.close() 1594 1595 class Index: 1596 1597 "An inverted index solution encapsulating the various components." 1598 1599 def __init__(self, pathname): 1600 self.pathname = pathname 1601 self.reader = None 1602 self.writer = None 1603 1604 def get_writer(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, flush_interval=FLUSH_INTERVAL): 1605 1606 """ 1607 Return a writer, optionally using the given indexing 'interval', 1608 'doc_interval' and 'flush_interval'. 1609 """ 1610 1611 if not exists(self.pathname): 1612 mkdir(self.pathname) 1613 1614 self.writer = IndexWriter(self.pathname, interval, doc_interval, flush_interval) 1615 return self.writer 1616 1617 def get_reader(self, partition=0): 1618 1619 "Return a reader for the index." 1620 1621 # Ensure that only one partition exists. 1622 1623 self.merge_terms() 1624 self.merge_fields() 1625 1626 return self._get_reader(partition) 1627 1628 def _get_reader(self, partition): 1629 1630 "Return a reader for the index." 1631 1632 if not exists(self.pathname): 1633 raise OSError, "Index path %r does not exist." % self.pathname 1634 1635 self.reader = IndexReader(self.pathname) 1636 return self.reader 1637 1638 def merge_terms(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL): 1639 1640 """ 1641 Merge term dictionaries using the given indexing 'interval' and 1642 'doc_interval'. 1643 """ 1644 1645 readers = [] 1646 partitions = set() 1647 1648 for filename in listdir(self.pathname): 1649 if filename.startswith("terms-"): # 6 character prefix 1650 partition = filename[6:] 1651 readers.append(get_term_reader(self.pathname, partition)) 1652 partitions.add(partition) 1653 1654 # Write directly to a dictionary. 1655 1656 if len(readers) > 1: 1657 if "merged" in partitions: 1658 rename_term_files(self.pathname, "merged", "old-merged") 1659 partitions.remove("merged") 1660 partitions.add("old-merged") 1661 1662 writer = get_term_writer(self.pathname, "merged", interval, doc_interval) 1663 merger = TermDictionaryMerger(writer, readers) 1664 merger.merge() 1665 merger.close() 1666 1667 # Remove old files. 1668 1669 for partition in partitions: 1670 remove_term_files(self.pathname, partition) 1671 1672 elif len(readers) == 1: 1673 partition = list(partitions)[0] 1674 if partition != "merged": 1675 rename_term_files(self.pathname, partition, "merged") 1676 1677 def merge_fields(self, interval=FIELD_INTERVAL): 1678 1679 "Merge field dictionaries using the given indexing 'interval'." 1680 1681 readers = [] 1682 partitions = set() 1683 1684 for filename in listdir(self.pathname): 1685 if filename.startswith("fields-"): # 7 character prefix 1686 partition = filename[7:] 1687 readers.append(get_field_reader(self.pathname, partition)) 1688 partitions.add(partition) 1689 1690 # Write directly to a dictionary. 1691 1692 if len(readers) > 1: 1693 if "merged" in partitions: 1694 rename_field_files(self.pathname, "merged", "old-merged") 1695 partitions.remove("merged") 1696 partitions.add("old-merged") 1697 1698 writer = get_field_writer(self.pathname, "merged", interval) 1699 merger = FieldDictionaryMerger(writer, readers) 1700 merger.merge() 1701 merger.close() 1702 1703 # Remove old files. 1704 1705 for partition in partitions: 1706 remove_field_files(self.pathname, partition) 1707 1708 elif len(readers) == 1: 1709 partition = list(partitions)[0] 1710 if partition != "merged": 1711 rename_field_files(self.pathname, partition, "merged") 1712 1713 def close(self): 1714 if self.reader is not None: 1715 self.reader.close() 1716 self.reader = None 1717 if self.writer is not None: 1718 self.writer.close() 1719 self.writer = None 1720 1721 # vim: tabstop=4 expandtab shiftwidth=4