1 #!/usr/bin/env python 2 3 """ 4 A simple (and sane) text indexing library. 5 6 Copyright (C) 2009 Paul Boddie <paul@boddie.org.uk> 7 8 This program is free software; you can redistribute it and/or modify it under 9 the terms of the GNU General Public License as published by the Free Software 10 Foundation; either version 3 of the License, or (at your option) any later 11 version. 12 13 This program is distributed in the hope that it will be useful, but WITHOUT ANY 14 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 15 PARTICULAR PURPOSE. See the GNU General Public License for more details. 16 17 You should have received a copy of the GNU General Public License along 18 with this program. If not, see <http://www.gnu.org/licenses/>. 19 """ 20 21 from os import dup, fdopen # independent iterator access to files 22 from os import listdir, mkdir # index and partition discovery 23 from os import remove, rename # partition manipulation 24 from os.path import exists, join 25 from os.path import commonprefix # to find common string prefixes 26 from bisect import bisect_right # to find terms in the dictionary index 27 from bisect import insort_right # to maintain a sorted list of data for merging 28 import bz2, zlib # for field compression 29 30 # Constants. 31 32 INTERVAL = 100 33 FLUSH_INTERVAL = 1000000 34 35 compressors = [("b", bz2.compress), ("z", zlib.compress)] 36 decompressors = {"b" : bz2.decompress, "z" : zlib.decompress} 37 38 # Foundation classes. 39 40 class File: 41 42 "A basic file abstraction." 43 44 def __init__(self, f): 45 self.f = f 46 self.reset() 47 48 def reset(self): 49 50 "To be used to reset the state of the reader or writer between records." 51 52 pass 53 54 def rewind(self): 55 self.f.seek(0) 56 self.reset() 57 58 def close(self): 59 if self.f is not None: 60 self.f.close() 61 self.f = None 62 63 class FileWriter(File): 64 65 "Writing basic data types to files." 66 67 def write_number(self, number): 68 69 "Write 'number' to the file using a variable length encoding." 70 71 # Negative numbers are not supported. 72 73 if number < 0: 74 raise ValueError, "Number %r is negative." % number 75 76 # Special case: one byte containing zero. 77 78 elif number == 0: 79 self.f.write(chr(0)) 80 return 81 82 # Write the number from least to most significant digits. 83 84 bytes = [] 85 86 while number != 0: 87 lsd = number & 127 88 number = number >> 7 89 if number != 0: 90 lsd |= 128 91 bytes.append(chr(lsd)) 92 93 record = "".join(bytes) 94 self.f.write(record) 95 96 def write_string(self, s, compress=0): 97 98 """ 99 Write 's' to the file, recording its length and compressing the string 100 if 'compress' is set to a true value. 101 """ 102 103 # Convert Unicode objects to strings. 104 105 if isinstance(s, unicode): 106 s = s.encode("utf-8") 107 108 # Compress the string if requested. 109 110 if compress: 111 for flag, fn in compressors: 112 cs = fn(s) 113 114 # Take the first string shorter than the original. 115 116 if len(cs) < len(s): 117 s = cs 118 break 119 else: 120 flag = "-" 121 122 # Record whether compression was used. 123 124 self.f.write(flag) 125 126 # Write the length of the data before the data itself. 127 128 length = len(s) 129 self.write_number(length) 130 self.f.write(s) 131 132 class FileReader(File): 133 134 "Reading basic data types from files." 135 136 def read_number(self): 137 138 "Read a number from the file." 139 140 # Read each byte, adding it to the number. 141 142 shift = 0 143 number = 0 144 more = 1 145 146 while more: 147 byte = self.f.read(1) 148 if not byte: 149 raise EOFError 150 151 csd = ord(byte) 152 more = csd & 128 != 0 153 if more: 154 csd &= 127 155 number += (csd << shift) 156 shift += 7 157 158 return number 159 160 def read_string(self, decompress=0): 161 162 """ 163 Read a string from the file, decompressing the stored data if 164 'decompress' is set to a true value. 165 """ 166 167 # Decompress the data if requested. 168 169 if decompress: 170 flag = self.f.read(1) 171 else: 172 flag = "-" 173 174 length = self.read_number() 175 s = self.f.read(length) 176 177 # Perform decompression if applicable. 178 179 if flag != "-": 180 fn = decompressors[flag] 181 s = fn(s) 182 183 # Convert strings to Unicode objects. 184 185 return unicode(s, "utf-8") 186 187 # Specific classes for storing term and position information. 188 189 class PositionWriter(FileWriter): 190 191 "Writing position information to files." 192 193 def reset(self): 194 self.last_docnum = 0 195 196 def write_positions(self, docnum, positions): 197 198 """ 199 Write for the document 'docnum' the given 'positions'. 200 Return the offset of the written record. 201 """ 202 203 if docnum < self.last_docnum: 204 raise ValueError, "Document number %r is less than previous number %r." % (docnum, self.last_docnum) 205 206 # Record the offset of this record. 207 208 offset = self.f.tell() 209 210 # Write the document number delta. 211 212 self.write_number(docnum - self.last_docnum) 213 214 # Write the number of positions. 215 216 self.write_number(len(positions)) 217 218 # Make sure that the positions are sorted. 219 220 positions.sort() 221 222 # Write the position deltas. 223 224 last = 0 225 for position in positions: 226 pos = position - last 227 self.write_number(pos) 228 last = position 229 230 self.last_docnum = docnum 231 232 return offset 233 234 class PositionReader(FileReader): 235 236 "Reading position information from files." 237 238 def reset(self): 239 self.last_docnum = 0 240 241 def read_positions(self): 242 243 "Read positions, returning a document number and a list of positions." 244 245 # Read the document number delta and add it to the last number. 246 247 self.last_docnum += self.read_number() 248 249 # Read the number of positions. 250 251 npositions = self.read_number() 252 253 # Read the position deltas, adding each previous position to get the 254 # appropriate collection of absolute positions. 255 256 i = 0 257 last = 0 258 positions = [] 259 260 while i < npositions: 261 last += self.read_number() 262 positions.append(last) 263 i += 1 264 265 return self.last_docnum, positions 266 267 def read_term_positions(self, offset, count): 268 269 """ 270 Read all positions from 'offset', seeking to that position in the file 271 before reading. The number of documents available for reading is limited 272 to 'count'. 273 """ 274 275 self.reset() 276 277 # Duplicate the file handle. 278 279 f = fdopen(dup(self.f.fileno()), "rb") 280 f.seek(offset) 281 return PositionIterator(f, count) 282 283 class IteratorBase: 284 285 def __init__(self, count): 286 self.replenish(count) 287 288 def replenish(self, count): 289 self.count = count 290 self.read_documents = 0 291 292 def __len__(self): 293 return self.count 294 295 def sort(self): 296 pass # Stored document positions are already sorted. 297 298 def __iter__(self): 299 return self 300 301 class PositionIterator(PositionReader, IteratorBase): 302 303 "Iterating over document positions." 304 305 def __init__(self, f, count): 306 PositionReader.__init__(self, f) 307 IteratorBase.__init__(self, count) 308 309 def next(self): 310 311 "Read positions for a single document." 312 313 if self.read_documents < self.count: 314 self.read_documents += 1 315 return self.read_positions() 316 else: 317 raise StopIteration 318 319 class PositionIndexWriter(FileWriter): 320 321 "Writing position index information to files." 322 323 def reset(self): 324 self.last_docnum = 0 325 self.last_pos_offset = 0 326 327 def write_positions(self, docnum, pos_offset, count): 328 329 """ 330 Write the given 'docnum, 'pos_offset' and document 'count' to the 331 position index file. 332 """ 333 334 # Record the offset of this record. 335 336 offset = self.f.tell() 337 338 # Write the document number delta. 339 340 self.write_number(docnum - self.last_docnum) 341 self.last_docnum = docnum 342 343 # Write the position file offset delta. 344 345 self.write_number(pos_offset - self.last_pos_offset) 346 self.last_pos_offset = pos_offset 347 348 # Write the document count. 349 350 self.write_number(count) 351 352 return offset 353 354 class PositionIndexReader(FileReader): 355 356 "Reading position index information from files." 357 358 def reset(self): 359 self.last_docnum = 0 360 self.last_pos_offset = 0 361 362 def read_positions(self): 363 364 """ 365 Read a document number, a position file offset for the position index 366 file, and the number of documents in a section of that file. 367 """ 368 369 # Read the document number delta. 370 371 self.last_docnum += self.read_number() 372 373 # Read the offset delta. 374 375 self.last_pos_offset += self.read_number() 376 377 # Read the document count. 378 379 count = self.read_number() 380 381 return self.last_docnum, self.last_pos_offset, count 382 383 def read_term_positions(self, offset, doc_frequency): 384 385 """ 386 Read all positions from 'offset', seeking to that position in the file 387 before reading. The number of documents available for reading is limited 388 to 'doc_frequency'. 389 """ 390 391 # NOTE: This is almost a duplication of PositionReader.read_term_positions. 392 393 self.reset() 394 395 # Duplicate the file handle. 396 397 f = fdopen(dup(self.f.fileno()), "rb") 398 f.seek(offset) 399 return PositionIndexIterator(f, doc_frequency) 400 401 class PositionIndexIterator(PositionIndexReader, IteratorBase): 402 403 "Iterating over document positions." 404 405 def __init__(self, f, count): 406 PositionIndexReader.__init__(self, f) 407 IteratorBase.__init__(self, count) 408 self.section_count = 0 409 410 def next(self): 411 412 "Read positions for a single document." 413 414 self.read_documents += self.section_count 415 if self.read_documents < self.count: 416 docnum, pos_offset, self.section_count = t = self.read_positions() 417 return t 418 else: 419 raise StopIteration 420 421 class PositionDictionaryWriter: 422 423 "Writing position dictionaries." 424 425 def __init__(self, position_writer, position_index_writer, interval): 426 self.position_writer = position_writer 427 self.position_index_writer = position_index_writer 428 self.interval = interval 429 430 def write_term_positions(self, doc_positions): 431 432 """ 433 Write all 'doc_positions' - a collection of tuples of the form (document 434 number, position list) - to the file. 435 436 Add some records to the index, making dictionary entries. 437 438 Return a tuple containing the offset of the written data, the frequency 439 (number of positions), and document frequency (number of documents) for 440 the term involved. 441 """ 442 443 # Reset the writer. 444 445 self.position_writer.reset() 446 index_offset = None 447 448 # Write the positions. 449 450 frequency = 0 451 first_offset = None 452 count = 0 453 454 doc_positions.sort() 455 456 for docnum, positions in doc_positions: 457 pos_offset = self.position_writer.write_positions(docnum, positions) 458 459 # Retain the first record offset for a subsequent index entry. 460 461 if first_offset is None: 462 first_offset = pos_offset 463 464 frequency += len(positions) 465 466 # Every {interval} entries, write an index entry. 467 468 if count == self.interval: 469 io = self.position_index_writer.write_positions(docnum, first_offset, self.interval) 470 471 # Remember the first index entry offset. 472 473 if index_offset is None: 474 index_offset = io 475 476 first_offset = None 477 count = 0 478 479 count += 1 480 481 # Finish writing an index entry for the remaining documents. 482 483 else: 484 if first_offset is not None: 485 io = self.position_index_writer.write_positions(docnum, first_offset, count) 486 487 # Remember the first index entry offset. 488 489 if index_offset is None: 490 index_offset = io 491 492 return index_offset, frequency, len(doc_positions) 493 494 def close(self): 495 self.position_writer.close() 496 self.position_index_writer.close() 497 498 class PositionDictionaryReader: 499 500 "Reading position dictionaries." 501 502 def __init__(self, position_reader, position_index_reader): 503 self.position_reader = position_reader 504 self.position_index_reader = position_index_reader 505 506 def read_term_positions(self, offset, doc_frequency): 507 508 """ 509 Return an iterator for dictionary entries starting at 'offset' with the 510 given 'doc_frequency'. 511 """ 512 513 return PositionDictionaryIterator(self.position_reader, 514 self.position_index_reader, offset, doc_frequency) 515 516 def close(self): 517 self.position_reader.close() 518 self.position_index_reader.close() 519 520 class PositionDictionaryIterator: 521 522 "Iteration over position dictionary entries." 523 524 def __init__(self, position_reader, position_index_reader, offset, doc_frequency): 525 self.position_reader = position_reader 526 527 self.index_iterator = position_index_reader.read_term_positions(offset, doc_frequency) 528 self.next_section() 529 self.init_section() 530 531 def __iter__(self): 532 return self 533 534 def next(self): 535 536 # Attempt to get the next document record from the section in the positions file. 537 538 while 1: 539 540 # Either return the next record. 541 542 try: 543 return self.iterator.next() 544 545 # Or, where a section is finished, get the next section and try again. 546 547 except StopIteration: 548 self.next_section() 549 self.iterator.replenish(self.section_count) 550 551 def next_section(self): 552 self.docnum, self.pos_offset, self.section_count = self.index_iterator.read_positions() 553 554 def init_section(self): 555 self.iterator = self.position_reader.read_term_positions(self.pos_offset, self.section_count) 556 557 class TermWriter(FileWriter): 558 559 "Writing term information to files." 560 561 def reset(self): 562 self.last_term = "" 563 self.last_offset = 0 564 565 def write_term(self, term, offset, frequency, doc_frequency): 566 567 """ 568 Write the given 'term', its position file 'offset', its 'frequency' and 569 its 'doc_frequency' (number of documents in which it appears) to the 570 term information file. Return the offset after the term information was 571 written to the file. 572 """ 573 574 # Write the prefix length and term suffix. 575 576 common = len(commonprefix([self.last_term, term])) 577 suffix = term[common:] 578 579 self.write_number(common) 580 self.write_string(suffix) 581 582 # Write the offset delta. 583 584 self.write_number(offset - self.last_offset) 585 586 # Write the frequency. 587 588 self.write_number(frequency) 589 590 # Write the document frequency. 591 592 self.write_number(doc_frequency) 593 594 self.last_term = term 595 self.last_offset = offset 596 597 return self.f.tell() 598 599 class TermReader(FileReader): 600 601 "Reading term information from files." 602 603 def reset(self): 604 self.last_term = "" 605 self.last_offset = 0 606 607 def read_term(self): 608 609 """ 610 Read a term, its position file offset, its frequency and its document 611 frequence from the term information file. 612 """ 613 614 # Read the prefix length and term suffix. 615 616 common = self.read_number() 617 suffix = self.read_string() 618 619 self.last_term = self.last_term[:common] + suffix 620 621 # Read the offset delta. 622 623 self.last_offset += self.read_number() 624 625 # Read the frequency. 626 627 frequency = self.read_number() 628 629 # Read the document frequency. 630 631 doc_frequency = self.read_number() 632 633 return self.last_term, self.last_offset, frequency, doc_frequency 634 635 def go_to_term(self, term, offset, info_offset): 636 637 """ 638 Seek past the entry for 'term' having 'offset' to 'info_offset'. This 639 permits the scanning for later terms from the specified term. 640 """ 641 642 self.f.seek(info_offset) 643 self.last_term = term 644 self.last_offset = offset 645 646 class TermIndexWriter(TermWriter): 647 648 "Writing term dictionary index details to files." 649 650 def reset(self): 651 TermWriter.reset(self) 652 self.last_info_offset = 0 653 654 def write_term(self, term, offset, frequency, doc_frequency, info_offset): 655 656 """ 657 Write the given 'term', its position file 'offset', its 'frequency' and 658 its 'doc_frequency' to the term dictionary index file, along with the 659 'info_offset' in the term information file. 660 """ 661 662 TermWriter.write_term(self, term, offset, frequency, doc_frequency) 663 664 # Write the information file offset delta. 665 666 self.write_number(info_offset - self.last_info_offset) 667 self.last_info_offset = info_offset 668 669 class TermIndexReader(TermReader): 670 671 "Reading term dictionary index details from files." 672 673 def reset(self): 674 TermReader.reset(self) 675 self.last_info_offset = 0 676 677 def read_term(self): 678 679 """ 680 Read a term, its position file offset, its frequency, its document 681 frequency and a term information file offset from the term dictionary 682 index file. 683 """ 684 685 term, offset, frequency, doc_frequency = TermReader.read_term(self) 686 687 # Read the offset delta. 688 689 self.last_info_offset += self.read_number() 690 691 return term, offset, frequency, doc_frequency, self.last_info_offset 692 693 class TermDictionaryWriter: 694 695 "Writing term dictionaries." 696 697 def __init__(self, info_writer, index_writer, position_dict_writer, interval): 698 self.info_writer = info_writer 699 self.index_writer = index_writer 700 self.position_dict_writer = position_dict_writer 701 self.interval = interval 702 self.entry = 0 703 704 def _write_term(self, term, offset, frequency, doc_frequency): 705 706 """ 707 Write the given 'term', its position file 'offset', its 'frequency' and 708 its 'doc_frequency' (number of documents in which it appears) to the 709 term information file. Return the offset after the term information was 710 written to the file. 711 """ 712 713 info_offset = self.info_writer.write_term(term, offset, frequency, doc_frequency) 714 715 if self.entry % self.interval == 0: 716 self.index_writer.write_term(term, offset, frequency, doc_frequency, info_offset) 717 718 self.entry += 1 719 720 def write_term_positions(self, term, doc_positions): 721 722 """ 723 Write the given 'term' and the 'doc_positions' recording the documents 724 and positions at which the term is found. 725 """ 726 727 offset, frequency, doc_frequency = self.position_dict_writer.write_term_positions(doc_positions) 728 self._write_term(term, offset, frequency, doc_frequency) 729 730 def close(self): 731 self.info_writer.close() 732 self.index_writer.close() 733 self.position_dict_writer.close() 734 735 class TermDictionaryReader: 736 737 "Reading term dictionaries." 738 739 def __init__(self, info_reader, index_reader, position_reader): 740 self.info_reader = info_reader 741 self.index_reader = index_reader 742 self.position_reader = position_reader 743 744 self.terms = [] 745 try: 746 while 1: 747 self.terms.append(self.index_reader.read_term()) 748 except EOFError: 749 pass 750 751 # Large numbers for ordering purposes. 752 753 self.max_offset = self.terms[-1][1] + 1 754 755 def _find_term(self, term): 756 757 """ 758 Find the position file offset and frequency of 'term' from the term 759 dictionary. 760 """ 761 762 i = bisect_right(self.terms, (term, self.max_offset, 0, 0)) - 1 763 764 # Get the entry position providing the term or one preceding it. 765 766 if i == -1: 767 return None 768 769 found_term, offset, frequency, doc_frequency, info_offset = self.terms[i] 770 771 # Where the term is found immediately, return the offset and 772 # frequencies. 773 774 if term == found_term: 775 return offset, frequency, doc_frequency 776 777 # Otherwise, seek past the index term's entry in the information file 778 # and scan for the desired term. 779 780 else: 781 self.info_reader.go_to_term(found_term, offset, info_offset) 782 try: 783 while term > found_term: 784 found_term, offset, frequency, doc_frequency = self.info_reader.read_term() 785 except EOFError: 786 pass 787 788 # If the term is found, return the offset and frequencies. 789 790 if term == found_term: 791 return offset, frequency, doc_frequency 792 else: 793 return None 794 795 def rewind(self): 796 self.info_reader.rewind() 797 798 def _get_positions(self, offset, doc_frequency): 799 return self.position_reader.read_term_positions(offset, doc_frequency) 800 801 def read_term(self): 802 803 """ 804 Return the next term, its frequency, its document frequency, and the 805 documents and positions at which the term is found. 806 """ 807 808 term, offset, frequency, doc_frequency = self.info_reader.read_term() 809 positions = self._get_positions(offset, doc_frequency) 810 return term, frequency, doc_frequency, positions 811 812 def find_positions(self, term): 813 814 "Return the documents and positions at which the given 'term' is found." 815 816 t = self._find_term(term) 817 if t is None: 818 return None 819 else: 820 offset, frequency, doc_frequency = t 821 return self._get_positions(offset, doc_frequency) 822 823 def get_frequency(self, term): 824 825 "Return the frequency of the given 'term'." 826 827 t = self._find_term(term) 828 if t is None: 829 return None 830 else: 831 offset, frequency, doc_frequency = t 832 return frequency 833 834 def get_document_frequency(self, term): 835 836 "Return the document frequency of the given 'term'." 837 838 t = self._find_term(term) 839 if t is None: 840 return None 841 else: 842 offset, frequency, doc_frequency = t 843 return doc_frequency 844 845 def close(self): 846 self.info_reader.close() 847 self.index_reader.close() 848 self.position_reader.close() 849 850 # Specific classes for storing document information. 851 852 class FieldWriter(FileWriter): 853 854 "Writing field data to files." 855 856 def reset(self): 857 self.last_docnum = 0 858 859 def write_fields(self, docnum, fields): 860 861 """ 862 Write for the given 'docnum', a list of 'fields' (integer, string pairs 863 representing field identifiers and values respectively). 864 Return the offset at which the fields are stored. 865 """ 866 867 offset = self.f.tell() 868 869 # Write the document number delta. 870 871 self.write_number(docnum - self.last_docnum) 872 873 # Write the number of fields. 874 875 self.write_number(len(fields)) 876 877 # Write the fields themselves. 878 879 for i, field in fields: 880 self.write_number(i) 881 self.write_string(field, 1) # compress 882 883 self.last_docnum = docnum 884 return offset 885 886 class FieldReader(FileReader): 887 888 "Reading field data from files." 889 890 def reset(self): 891 self.last_docnum = 0 892 893 def read_fields(self): 894 895 """ 896 Read fields from the file, returning a tuple containing the document 897 number and a list of field (identifier, value) pairs. 898 """ 899 900 # Read the document number. 901 902 self.last_docnum += self.read_number() 903 904 # Read the number of fields. 905 906 nfields = self.read_number() 907 908 # Collect the fields. 909 910 fields = [] 911 i = 0 912 913 while i < nfields: 914 identifier = self.read_number() 915 value = self.read_string(1) # decompress 916 fields.append((identifier, value)) 917 i += 1 918 919 return self.last_docnum, fields 920 921 def read_document_fields(self, docnum, offset): 922 923 """ 924 Read fields for 'docnum' at the given 'offset'. This permits the 925 retrieval of details for the specified document, as well as scanning for 926 later documents. 927 """ 928 929 self.f.seek(offset) 930 bad_docnum, fields = self.read_fields() 931 self.last_docnum = docnum 932 return docnum, fields 933 934 class FieldIndexWriter(FileWriter): 935 936 "Writing field index details to files." 937 938 def reset(self): 939 self.last_docnum = 0 940 self.last_offset = 0 941 942 def write_document(self, docnum, offset): 943 944 """ 945 Write for the given 'docnum', the 'offset' at which the fields for the 946 document are stored in the fields file. 947 """ 948 949 # Write the document number and offset deltas. 950 951 self.write_number(docnum - self.last_docnum) 952 self.write_number(offset - self.last_offset) 953 954 self.last_docnum = docnum 955 self.last_offset = offset 956 957 class FieldIndexReader(FileReader): 958 959 "Reading field index details from files." 960 961 def reset(self): 962 self.last_docnum = 0 963 self.last_offset = 0 964 965 def read_document(self): 966 967 "Read a document number and field file offset." 968 969 # Read the document number delta and offset. 970 971 self.last_docnum += self.read_number() 972 self.last_offset += self.read_number() 973 974 return self.last_docnum, self.last_offset 975 976 class FieldDictionaryWriter: 977 978 "Writing field dictionary details." 979 980 def __init__(self, field_writer, field_index_writer, interval): 981 self.field_writer = field_writer 982 self.field_index_writer = field_index_writer 983 self.interval = interval 984 self.entry = 0 985 986 def write_fields(self, docnum, fields): 987 988 "Write details of the document with the given 'docnum' and 'fields'." 989 990 offset = self.field_writer.write_fields(docnum, fields) 991 992 if self.entry % self.interval == 0: 993 self.field_index_writer.write_document(docnum, offset) 994 995 self.entry += 1 996 997 def close(self): 998 self.field_writer.close() 999 self.field_index_writer.close() 1000 1001 class FieldDictionaryReader: 1002 1003 "Reading field dictionary details." 1004 1005 def __init__(self, field_reader, field_index_reader): 1006 self.field_reader = field_reader 1007 self.field_index_reader = field_index_reader 1008 1009 self.docs = [] 1010 try: 1011 while 1: 1012 self.docs.append(self.field_index_reader.read_document()) 1013 except EOFError: 1014 pass 1015 1016 # Large numbers for ordering purposes. 1017 1018 self.max_offset = self.docs[-1][1] 1019 1020 def rewind(self): 1021 self.field_reader.rewind() 1022 1023 def read_fields(self): 1024 1025 "Return the next document number and fields." 1026 1027 return self.field_reader.read_fields() 1028 1029 def get_fields(self, docnum): 1030 1031 "Read the fields of the document with the given 'docnum'." 1032 1033 i = bisect_right(self.docs, (docnum, self.max_offset)) - 1 1034 1035 # Get the entry position providing the term or one preceding it. 1036 1037 if i == -1: 1038 return None 1039 1040 found_docnum, offset = self.docs[i] 1041 1042 # Read from the fields file. 1043 1044 found_docnum, fields = self.field_reader.read_document_fields(found_docnum, offset) 1045 1046 # Scan for the document, if necessary. 1047 1048 try: 1049 while docnum > found_docnum: 1050 found_docnum, fields = self.field_reader.read_fields() 1051 except EOFError: 1052 pass 1053 1054 # If the document is found, return the fields. 1055 1056 if docnum == found_docnum: 1057 return fields 1058 else: 1059 return None 1060 1061 def close(self): 1062 self.field_reader.close() 1063 self.field_index_reader.close() 1064 1065 # Dictionary merging classes. 1066 1067 class Merger: 1068 1069 "Merge files." 1070 1071 def __init__(self, writer, readers): 1072 self.writer = writer 1073 self.readers = readers 1074 1075 def close(self): 1076 for reader in self.readers: 1077 reader.close() 1078 self.writer.close() 1079 1080 class TermDictionaryMerger(Merger): 1081 1082 "Merge term and position files." 1083 1084 def merge(self): 1085 1086 """ 1087 Merge terms and positions from the readers, sending them to the writer. 1088 """ 1089 1090 entries = [] 1091 1092 # Get the first entries from the readers. 1093 1094 for partition, reader in enumerate(self.readers): 1095 reader.rewind() 1096 1097 try: 1098 term, frequency, doc_frequency, positions = reader.read_term() 1099 insort_right(entries, (term, positions, partition)) 1100 except EOFError: 1101 pass 1102 1103 # While entries are available, write them out in order, merging where 1104 # appropriate. 1105 1106 while entries: 1107 term, doc_positions, partition = entries[0] 1108 to_update = [partition] 1109 1110 nentries = len(entries) 1111 i = 1 1112 1113 # Find other entries for the term. 1114 1115 while i < nentries: 1116 other_term, other_doc_positions, other_partition = entries[i] 1117 1118 # For such entries, merge the positions. 1119 1120 if other_term == term: 1121 doc_positions = self.merge_positions(doc_positions, other_doc_positions) 1122 to_update.append(other_partition) 1123 i += 1 1124 else: 1125 break 1126 1127 # Write the combined term details. 1128 1129 self.writer.write_term_positions(term, doc_positions) 1130 1131 # Update the entries from the affected readers. 1132 1133 del entries[:i] 1134 1135 for partition in to_update: 1136 try: 1137 term, frequency, doc_frequency, positions = self.readers[partition].read_term() 1138 insort_right(entries, (term, positions, partition)) 1139 except EOFError: 1140 pass 1141 1142 def merge_positions(self, doc_positions, other_doc_positions): 1143 1144 """ 1145 Merge 'doc_positions' with 'other_doc_positions' so that common document 1146 records contain positions from both collections. 1147 """ 1148 1149 doc_position_dict = dict(doc_positions) 1150 1151 for docnum, positions in other_doc_positions: 1152 if doc_position_dict.has_key(docnum): 1153 doc_position_dict[docnum] += positions 1154 else: 1155 doc_position_dict[docnum] = positions 1156 1157 return doc_position_dict.items() 1158 1159 class FieldDictionaryMerger(Merger): 1160 1161 "Merge field files." 1162 1163 def merge(self): 1164 1165 """ 1166 Merge fields from the readers, sending them to the writer. 1167 """ 1168 1169 entries = [] 1170 1171 # Get the first entries from the readers. 1172 1173 for partition, reader in enumerate(self.readers): 1174 reader.rewind() 1175 1176 try: 1177 docnum, fields = reader.read_fields() 1178 insort_right(entries, (docnum, fields, partition)) 1179 except EOFError: 1180 pass 1181 1182 # While entries are available, write them out in order, merging where 1183 # appropriate. 1184 1185 while entries: 1186 docnum, fields, partition = entries[0] 1187 to_update = [partition] 1188 1189 nentries = len(entries) 1190 i = 1 1191 1192 # Find other entries for the term. 1193 1194 while i < nentries: 1195 other_docnum, other_fields, other_partition = entries[i] 1196 1197 # For such entries, merge the positions. 1198 1199 if other_docnum == docnum: 1200 fields += other_fields 1201 to_update.append(other_partition) 1202 i += 1 1203 else: 1204 break 1205 1206 # Write the combined term details. 1207 1208 self.writer.write_fields(docnum, fields) 1209 1210 # Update the entries from the affected readers. 1211 1212 del entries[:i] 1213 1214 for partition in to_update: 1215 try: 1216 docnum, fields = self.readers[partition].read_fields() 1217 insort_right(entries, (docnum, fields, partition)) 1218 except EOFError: 1219 pass 1220 1221 # Utility functions. 1222 1223 def get_term_writer(pathname, partition, interval, doc_interval): 1224 1225 """ 1226 Return a term dictionary writer using files under the given 'pathname' 1227 labelled according to the given 'partition', using the given indexing 1228 'interval' for terms and 'doc_interval' for document position records. 1229 """ 1230 1231 tdf = open(join(pathname, "terms-%s" % partition), "wb") 1232 info_writer = TermWriter(tdf) 1233 1234 tdif = open(join(pathname, "terms_index-%s" % partition), "wb") 1235 index_writer = TermIndexWriter(tdif) 1236 1237 tpf = open(join(pathname, "positions-%s" % partition), "wb") 1238 positions_writer = PositionWriter(tpf) 1239 1240 tpif = open(join(pathname, "positions_index-%s" % partition), "wb") 1241 positions_index_writer = PositionIndexWriter(tpif) 1242 1243 positions_dict_writer = PositionDictionaryWriter(positions_writer, positions_index_writer, doc_interval) 1244 1245 return TermDictionaryWriter(info_writer, index_writer, positions_dict_writer, interval) 1246 1247 def get_field_writer(pathname, partition, interval): 1248 1249 """ 1250 Return a field dictionary writer using files under the given 'pathname' 1251 labelled according to the given 'partition', using the given indexing 1252 'interval'. 1253 """ 1254 1255 ff = open(join(pathname, "fields-%s" % partition), "wb") 1256 field_writer = FieldWriter(ff) 1257 1258 fif = open(join(pathname, "fields_index-%s" % partition), "wb") 1259 field_index_writer = FieldIndexWriter(fif) 1260 1261 return FieldDictionaryWriter(field_writer, field_index_writer, interval) 1262 1263 def get_term_reader(pathname, partition): 1264 1265 """ 1266 Return a term dictionary reader using files under the given 'pathname' 1267 labelled according to the given 'partition'. 1268 """ 1269 1270 tdf = open(join(pathname, "terms-%s" % partition), "rb") 1271 info_reader = TermReader(tdf) 1272 1273 tdif = open(join(pathname, "terms_index-%s" % partition), "rb") 1274 index_reader = TermIndexReader(tdif) 1275 1276 tpf = open(join(pathname, "positions-%s" % partition), "rb") 1277 positions_reader = PositionReader(tpf) 1278 1279 tpif = open(join(pathname, "positions_index-%s" % partition), "rb") 1280 positions_index_reader = PositionIndexReader(tpif) 1281 1282 positions_dict_reader = PositionDictionaryReader(positions_reader, positions_index_reader) 1283 1284 return TermDictionaryReader(info_reader, index_reader, positions_dict_reader) 1285 1286 def get_field_reader(pathname, partition): 1287 1288 """ 1289 Return a field dictionary reader using files under the given 'pathname' 1290 labelled according to the given 'partition'. 1291 """ 1292 1293 ff = open(join(pathname, "fields-%s" % partition), "rb") 1294 field_reader = FieldReader(ff) 1295 1296 fif = open(join(pathname, "fields_index-%s" % partition), "rb") 1297 field_index_reader = FieldIndexReader(fif) 1298 1299 return FieldDictionaryReader(field_reader, field_index_reader) 1300 1301 def rename_files(pathname, names, from_partition, to_partition): 1302 for name in names: 1303 rename(join(pathname, "%s-%s" % (name, from_partition)), join(pathname, "%s-%s" % (name, to_partition))) 1304 1305 def rename_term_files(pathname, from_partition, to_partition): 1306 rename_files(pathname, ("terms", "terms_index", "positions"), from_partition, to_partition) 1307 1308 def rename_field_files(pathname, from_partition, to_partition): 1309 rename_files(pathname, ("fields", "fields_index"), from_partition, to_partition) 1310 1311 def remove_files(pathname, names, partition): 1312 for name in names: 1313 remove(join(pathname, "%s-%s" % (name, partition))) 1314 1315 def remove_term_files(pathname, partition): 1316 remove_files(pathname, ("terms", "terms_index", "positions"), partition) 1317 1318 def remove_field_files(pathname, partition): 1319 remove_files(pathname, ("fields", "fields_index"), partition) 1320 1321 # High-level classes. 1322 1323 class IndexWriter: 1324 1325 """ 1326 Building term information and writing it to the term and field dictionaries. 1327 """ 1328 1329 def __init__(self, pathname, interval, flush_interval): 1330 self.pathname = pathname 1331 self.interval = interval 1332 self.flush_interval = flush_interval 1333 1334 self.dict_partition = 0 1335 self.field_dict_partition = 0 1336 1337 self.terms = {} 1338 self.docs = {} 1339 1340 self.position_counter = 0 1341 self.field_counter = 0 1342 1343 def add_position(self, term, docnum, position): 1344 1345 """ 1346 Add a position entry for the given 'term' in the document with the given 1347 'docnum', indicating the given 'position'. 1348 """ 1349 1350 if not self.terms.has_key(term): 1351 doc_positions = self.terms[term] = {} 1352 else: 1353 doc_positions = self.terms[term] 1354 1355 if not doc_positions.has_key(docnum): 1356 doc = doc_positions[docnum] = [] 1357 else: 1358 doc = doc_positions[docnum] 1359 1360 doc.append(position) 1361 1362 self.position_counter += 1 1363 if self.flush_interval and self.position_counter >= self.flush_interval: 1364 self.flush_terms() 1365 self.position_counter = 0 1366 1367 def add_field(self, docnum, identifier, value): 1368 1369 """ 1370 Add for the document with the given 'docnum' a field having the given 1371 'identifier' and 'value'. 1372 """ 1373 1374 if not self.docs.has_key(docnum): 1375 doc_fields = self.docs[docnum] = [] 1376 else: 1377 doc_fields = self.docs[docnum] 1378 1379 doc_fields.append((identifier, value)) 1380 1381 self.field_counter += 1 1382 if self.flush_interval and self.field_counter >= self.flush_interval: 1383 self.flush_fields() 1384 self.field_counter = 0 1385 1386 def get_term_writer(self): 1387 1388 "Return a term dictionary writer for the current partition." 1389 1390 return get_term_writer(self.pathname, self.dict_partition, self.interval) 1391 1392 def get_field_writer(self): 1393 1394 "Return a field dictionary writer for the current partition." 1395 1396 return get_field_writer(self.pathname, self.field_dict_partition, self.interval) 1397 1398 def flush_terms(self): 1399 1400 "Flush terms into the current term dictionary partition." 1401 1402 # Get the terms in order. 1403 1404 terms = self.terms.items() 1405 terms.sort() 1406 1407 dict_writer = self.get_term_writer() 1408 1409 for term, doc_positions in terms: 1410 doc_positions = doc_positions.items() 1411 dict_writer.write_term_positions(term, doc_positions) 1412 1413 dict_writer.close() 1414 1415 self.terms = {} 1416 self.dict_partition += 1 1417 1418 def flush_fields(self): 1419 1420 "Flush fields into the current term dictionary partition." 1421 1422 # Get the documents in order. 1423 1424 docs = self.docs.items() 1425 docs.sort() 1426 1427 field_dict_writer = self.get_field_writer() 1428 1429 for docnum, fields in docs: 1430 field_dict_writer.write_fields(docnum, fields) 1431 1432 field_dict_writer.close() 1433 1434 self.docs = {} 1435 self.field_dict_partition += 1 1436 1437 def close(self): 1438 if self.terms: 1439 self.flush_terms() 1440 if self.docs: 1441 self.flush_fields() 1442 1443 class IndexReader: 1444 1445 "Accessing the term and field dictionaries." 1446 1447 def __init__(self, pathname): 1448 self.dict_reader = get_term_reader(pathname, "merged") 1449 self.field_dict_reader = get_field_reader(pathname, "merged") 1450 1451 def find_positions(self, term): 1452 return self.dict_reader.find_positions(term) 1453 1454 def get_frequency(self, term): 1455 return self.dict_reader.get_frequency(term) 1456 1457 def get_fields(self, docnum): 1458 return self.field_dict_reader.get_fields(docnum) 1459 1460 def close(self): 1461 self.dict_reader.close() 1462 self.field_dict_reader.close() 1463 1464 class Index: 1465 1466 "An inverted index solution encapsulating the various components." 1467 1468 def __init__(self, pathname): 1469 self.pathname = pathname 1470 self.reader = None 1471 self.writer = None 1472 1473 def get_writer(self, interval=INTERVAL, flush_interval=FLUSH_INTERVAL): 1474 1475 """ 1476 Return a writer, optionally using the given indexing 'interval' and 1477 'flush_interval'. 1478 """ 1479 1480 if not exists(self.pathname): 1481 mkdir(self.pathname) 1482 1483 self.writer = IndexWriter(self.pathname, interval, flush_interval) 1484 return self.writer 1485 1486 def get_reader(self, partition=0): 1487 1488 "Return a reader for the index." 1489 1490 # Ensure that only one partition exists. 1491 1492 self.merge_terms() 1493 self.merge_fields() 1494 1495 return self._get_reader(partition) 1496 1497 def _get_reader(self, partition): 1498 1499 "Return a reader for the index." 1500 1501 if not exists(self.pathname): 1502 raise OSError, "Index path %r does not exist." % self.pathname 1503 1504 self.reader = IndexReader(self.pathname) 1505 return self.reader 1506 1507 def merge_terms(self, interval=INTERVAL): 1508 1509 "Merge term dictionaries using the given indexing 'interval'." 1510 1511 readers = [] 1512 partitions = [] 1513 1514 for filename in listdir(self.pathname): 1515 if filename.startswith("terms-"): # 6 character prefix 1516 partition = filename[6:] 1517 readers.append(get_term_reader(self.pathname, partition)) 1518 partitions.append(partition) 1519 1520 # Write directly to a dictionary. 1521 1522 if len(readers) > 1: 1523 writer = get_term_writer(self.pathname, "merged", interval) 1524 merger = TermDictionaryMerger(writer, readers) 1525 merger.merge() 1526 merger.close() 1527 1528 # Remove old files. 1529 1530 for partition in partitions: 1531 remove_term_files(self.pathname, partition) 1532 1533 elif len(readers) == 1 and partitions[0] != "merged": 1534 rename_term_files(self.pathname, partitions[0], "merged") 1535 1536 def merge_fields(self, interval=INTERVAL): 1537 1538 "Merge field dictionaries using the given indexing 'interval'." 1539 1540 readers = [] 1541 partitions = [] 1542 1543 for filename in listdir(self.pathname): 1544 if filename.startswith("fields-"): # 7 character prefix 1545 partition = filename[7:] 1546 readers.append(get_field_reader(self.pathname, partition)) 1547 partitions.append(partition) 1548 1549 # Write directly to a dictionary. 1550 1551 if len(readers) > 1: 1552 writer = get_field_writer(self.pathname, "merged", interval) 1553 merger = FieldDictionaryMerger(writer, readers) 1554 merger.merge() 1555 merger.close() 1556 1557 # Remove old files. 1558 1559 for partition in partitions: 1560 remove_field_files(self.pathname, partition) 1561 1562 elif len(readers) == 1 and partitions[0] != "merged": 1563 rename_field_files(self.pathname, partitions[0], "merged") 1564 1565 def close(self): 1566 if self.reader is not None: 1567 self.reader.close() 1568 self.reader = None 1569 if self.writer is not None: 1570 self.writer.close() 1571 self.writer = None 1572 1573 # vim: tabstop=4 expandtab shiftwidth=4