1 #!/usr/bin/env python 2 3 """ 4 A simple (and sane) text indexing library. 5 6 Copyright (C) 2009 Paul Boddie <paul@boddie.org.uk> 7 8 This program is free software; you can redistribute it and/or modify it under 9 the terms of the GNU General Public License as published by the Free Software 10 Foundation; either version 3 of the License, or (at your option) any later 11 version. 12 13 This program is distributed in the hope that it will be useful, but WITHOUT ANY 14 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 15 PARTICULAR PURPOSE. See the GNU General Public License for more details. 16 17 You should have received a copy of the GNU General Public License along 18 with this program. If not, see <http://www.gnu.org/licenses/>. 19 """ 20 21 from os import dup, fdopen # independent iterator access to files 22 from os import listdir, mkdir # index and partition discovery 23 from os import remove, rename # partition manipulation 24 from os.path import exists, join 25 from os.path import commonprefix # to find common string prefixes 26 from bisect import bisect_right # to find terms in the dictionary index 27 from bisect import insort_right # to maintain a sorted list of data for merging 28 import bz2, zlib # for field compression 29 30 # Constants. 31 32 TERM_INTERVAL = 100 33 DOCUMENT_INTERVAL = 100 34 FIELD_INTERVAL = 100 35 FLUSH_INTERVAL = 1000000 36 37 TERM_FILENAMES = "terms", "terms_index", "positions", "positions_index" 38 FIELD_FILENAMES = "fields", "fields_index" 39 40 compressors = [("b", bz2.compress), ("z", zlib.compress)] 41 decompressors = {"b" : bz2.decompress, "z" : zlib.decompress} 42 43 # Foundation classes. 44 45 class File: 46 47 "A basic file abstraction." 48 49 def __init__(self, f): 50 self.f = f 51 self.reset() 52 53 def reset(self): 54 55 "To be used to reset the state of the reader or writer between records." 56 57 pass 58 59 def rewind(self): 60 self.f.seek(0) 61 self.reset() 62 63 def close(self): 64 if self.f is not None: 65 self.f.close() 66 self.f = None 67 68 class FileWriter(File): 69 70 "Writing basic data types to files." 71 72 def write_number(self, number): 73 74 "Write 'number' to the file using a variable length encoding." 75 76 # Negative numbers are not supported. 77 78 if number < 0: 79 raise ValueError, "Number %r is negative." % number 80 81 # Special case: one byte containing zero. 82 83 elif number == 0: 84 self.f.write(chr(0)) 85 return 86 87 # Write the number from least to most significant digits. 88 89 bytes = [] 90 91 while number != 0: 92 lsd = number & 127 93 number = number >> 7 94 if number != 0: 95 lsd |= 128 96 bytes.append(chr(lsd)) 97 98 record = "".join(bytes) 99 self.f.write(record) 100 101 def write_string(self, s, compress=0): 102 103 """ 104 Write 's' to the file, recording its length and compressing the string 105 if 'compress' is set to a true value. 106 """ 107 108 # Convert Unicode objects to strings. 109 110 if isinstance(s, unicode): 111 s = s.encode("utf-8") 112 113 # Compress the string if requested. 114 115 if compress: 116 for flag, fn in compressors: 117 cs = fn(s) 118 119 # Take the first string shorter than the original. 120 121 if len(cs) < len(s): 122 s = cs 123 break 124 else: 125 flag = "-" 126 127 # Record whether compression was used. 128 129 self.f.write(flag) 130 131 # Write the length of the data before the data itself. 132 133 length = len(s) 134 self.write_number(length) 135 self.f.write(s) 136 137 class FileReader(File): 138 139 "Reading basic data types from files." 140 141 def read_number(self): 142 143 "Read a number from the file." 144 145 # Read each byte, adding it to the number. 146 147 shift = 0 148 number = 0 149 more = 1 150 151 while more: 152 byte = self.f.read(1) 153 if not byte: 154 raise EOFError 155 156 csd = ord(byte) 157 more = csd & 128 != 0 158 if more: 159 csd &= 127 160 number += (csd << shift) 161 shift += 7 162 163 return number 164 165 def read_string(self, decompress=0): 166 167 """ 168 Read a string from the file, decompressing the stored data if 169 'decompress' is set to a true value. 170 """ 171 172 # Decompress the data if requested. 173 174 if decompress: 175 flag = self.f.read(1) 176 else: 177 flag = "-" 178 179 length = self.read_number() 180 s = self.f.read(length) 181 182 # Perform decompression if applicable. 183 184 if flag != "-": 185 fn = decompressors[flag] 186 s = fn(s) 187 188 # Convert strings to Unicode objects. 189 190 return unicode(s, "utf-8") 191 192 # Specific classes for storing term and position information. 193 194 class PositionWriter(FileWriter): 195 196 "Writing position information to files." 197 198 def reset(self): 199 self.last_docnum = 0 200 201 def write_positions(self, docnum, positions): 202 203 """ 204 Write for the document 'docnum' the given 'positions'. 205 Return the offset of the written record. 206 """ 207 208 if docnum < self.last_docnum: 209 raise ValueError, "Document number %r is less than previous number %r." % (docnum, self.last_docnum) 210 211 # Record the offset of this record. 212 213 offset = self.f.tell() 214 215 # Write the document number delta. 216 217 self.write_number(docnum - self.last_docnum) 218 219 # Write the number of positions. 220 221 self.write_number(len(positions)) 222 223 # Make sure that the positions are sorted. 224 225 positions.sort() 226 227 # Write the position deltas. 228 229 last = 0 230 for position in positions: 231 pos = position - last 232 self.write_number(pos) 233 last = position 234 235 self.last_docnum = docnum 236 237 return offset 238 239 class PositionReader(FileReader): 240 241 "Reading position information from files." 242 243 def reset(self): 244 self.last_docnum = 0 245 246 def read_positions(self): 247 248 "Read positions, returning a document number and a list of positions." 249 250 # Read the document number delta and add it to the last number. 251 252 self.last_docnum += self.read_number() 253 254 # Read the number of positions. 255 256 npositions = self.read_number() 257 258 # Read the position deltas, adding each previous position to get the 259 # appropriate collection of absolute positions. 260 261 i = 0 262 last = 0 263 positions = [] 264 265 while i < npositions: 266 last += self.read_number() 267 positions.append(last) 268 i += 1 269 270 return self.last_docnum, positions 271 272 def read_term_positions(self, offset, count): 273 274 """ 275 Read all positions from 'offset', seeking to that position in the file 276 before reading. The number of documents available for reading is limited 277 to 'count'. 278 """ 279 280 self.reset() 281 282 # Duplicate the file handle. 283 284 f = fdopen(dup(self.f.fileno()), "rb") 285 f.seek(offset) 286 return PositionIterator(f, count) 287 288 class IteratorBase: 289 290 def __init__(self, count): 291 self.replenish(count) 292 293 def replenish(self, count): 294 self.count = count 295 self.read_documents = 0 296 297 def __len__(self): 298 return self.count 299 300 def sort(self): 301 pass # Stored document positions are already sorted. 302 303 def __iter__(self): 304 return self 305 306 class PositionIterator(PositionReader, IteratorBase): 307 308 "Iterating over document positions." 309 310 def __init__(self, f, count): 311 PositionReader.__init__(self, f) 312 IteratorBase.__init__(self, count) 313 314 def next(self): 315 316 "Read positions for a single document." 317 318 if self.read_documents < self.count: 319 self.read_documents += 1 320 return self.read_positions() 321 else: 322 raise StopIteration 323 324 class PositionIndexWriter(FileWriter): 325 326 "Writing position index information to files." 327 328 def reset(self): 329 self.last_docnum = 0 330 self.last_pos_offset = 0 331 332 def write_positions(self, docnum, pos_offset, count): 333 334 """ 335 Write the given 'docnum, 'pos_offset' and document 'count' to the 336 position index file. 337 """ 338 339 # Record the offset of this record. 340 341 offset = self.f.tell() 342 343 # Write the document number delta. 344 345 self.write_number(docnum - self.last_docnum) 346 self.last_docnum = docnum 347 348 # Write the position file offset delta. 349 350 self.write_number(pos_offset - self.last_pos_offset) 351 self.last_pos_offset = pos_offset 352 353 # Write the document count. 354 355 self.write_number(count) 356 357 return offset 358 359 class PositionIndexReader(FileReader): 360 361 "Reading position index information from files." 362 363 def reset(self): 364 self.last_docnum = 0 365 self.last_pos_offset = 0 366 367 def read_positions(self): 368 369 """ 370 Read a document number, a position file offset for the position index 371 file, and the number of documents in a section of that file. 372 """ 373 374 # Read the document number delta. 375 376 self.last_docnum += self.read_number() 377 378 # Read the offset delta. 379 380 self.last_pos_offset += self.read_number() 381 382 # Read the document count. 383 384 count = self.read_number() 385 386 return self.last_docnum, self.last_pos_offset, count 387 388 def read_term_positions(self, offset, doc_frequency): 389 390 """ 391 Read all positions from 'offset', seeking to that position in the file 392 before reading. The number of documents available for reading is limited 393 to 'doc_frequency'. 394 """ 395 396 # NOTE: This is almost a duplication of PositionReader.read_term_positions. 397 398 self.reset() 399 400 # Duplicate the file handle. 401 402 f = fdopen(dup(self.f.fileno()), "rb") 403 f.seek(offset) 404 return PositionIndexIterator(f, doc_frequency) 405 406 class PositionIndexIterator(PositionIndexReader, IteratorBase): 407 408 "Iterating over document positions." 409 410 def __init__(self, f, count): 411 PositionIndexReader.__init__(self, f) 412 IteratorBase.__init__(self, count) 413 self.section_count = 0 414 415 def next(self): 416 417 "Read positions for a single document." 418 419 self.read_documents += self.section_count 420 if self.read_documents < self.count: 421 docnum, pos_offset, self.section_count = t = self.read_positions() 422 return t 423 else: 424 raise StopIteration 425 426 class PositionDictionaryWriter: 427 428 "Writing position dictionaries." 429 430 def __init__(self, position_writer, position_index_writer, interval): 431 self.position_writer = position_writer 432 self.position_index_writer = position_index_writer 433 self.interval = interval 434 435 def write_term_positions(self, doc_positions): 436 437 """ 438 Write all 'doc_positions' - a collection of tuples of the form (document 439 number, position list) - to the file. 440 441 Add some records to the index, making dictionary entries. 442 443 Return a tuple containing the offset of the written data, the frequency 444 (number of positions), and document frequency (number of documents) for 445 the term involved. 446 """ 447 448 # Reset the writers. 449 450 self.position_writer.reset() 451 self.position_index_writer.reset() 452 453 index_offset = None 454 455 # Write the positions. 456 457 frequency = 0 458 first_docnum = None 459 first_offset = None 460 count = 0 461 462 doc_positions.sort() 463 464 for docnum, positions in doc_positions: 465 pos_offset = self.position_writer.write_positions(docnum, positions) 466 467 # Retain the first record offset for a subsequent index entry. 468 469 if first_offset is None: 470 first_offset = pos_offset 471 first_docnum = docnum 472 473 frequency += len(positions) 474 count += 1 475 476 # Every {interval} entries, write an index entry. 477 478 if count == self.interval: 479 io = self.position_index_writer.write_positions(first_docnum, first_offset, self.interval) 480 481 # Remember the first index entry offset. 482 483 if index_offset is None: 484 index_offset = io 485 486 first_offset = None 487 first_docnum = None 488 count = 0 489 490 # Finish writing an index entry for the remaining documents. 491 492 else: 493 if first_offset is not None: 494 io = self.position_index_writer.write_positions(first_docnum, first_offset, count) 495 496 # Remember the first index entry offset. 497 498 if index_offset is None: 499 index_offset = io 500 501 return index_offset, frequency, len(doc_positions) 502 503 def close(self): 504 self.position_writer.close() 505 self.position_index_writer.close() 506 507 class PositionDictionaryReader: 508 509 "Reading position dictionaries." 510 511 def __init__(self, position_reader, position_index_reader): 512 self.position_reader = position_reader 513 self.position_index_reader = position_index_reader 514 515 def read_term_positions(self, offset, doc_frequency): 516 517 """ 518 Return an iterator for dictionary entries starting at 'offset' with the 519 given 'doc_frequency'. 520 """ 521 522 return PositionDictionaryIterator(self.position_reader, 523 self.position_index_reader, offset, doc_frequency) 524 525 def close(self): 526 self.position_reader.close() 527 self.position_index_reader.close() 528 529 class PositionDictionaryIterator: 530 531 "Iteration over position dictionary entries." 532 533 def __init__(self, position_reader, position_index_reader, offset, doc_frequency): 534 self.position_reader = position_reader 535 self.doc_frequency = doc_frequency 536 537 self.index_iterator = position_index_reader.read_term_positions(offset, doc_frequency) 538 self.next_section() 539 self.init_section() 540 541 def __len__(self): 542 return self.doc_frequency 543 544 def sort(self): 545 pass 546 547 def __iter__(self): 548 return self 549 550 def next(self): 551 552 # Attempt to get the next document record from the section in the positions file. 553 554 while 1: 555 556 # Either return the next record. 557 558 try: 559 return self.iterator.next() 560 561 # Or, where a section is finished, get the next section and try again. 562 563 except StopIteration: 564 565 # Where a section follows, update the index iterator, but keep 566 # reading using the same file iterator (since the data should 567 # just follow on from the last section). 568 569 self.next_section() 570 self.iterator.replenish(self.section_count) 571 572 def next_section(self): 573 self.docnum, self.pos_offset, self.section_count = self.index_iterator.next() 574 575 def init_section(self): 576 self.iterator = self.position_reader.read_term_positions(self.pos_offset, self.section_count) 577 578 class TermWriter(FileWriter): 579 580 "Writing term information to files." 581 582 def reset(self): 583 self.last_term = "" 584 self.last_offset = 0 585 586 def write_term(self, term, offset, frequency, doc_frequency): 587 588 """ 589 Write the given 'term', its position file 'offset', its 'frequency' and 590 its 'doc_frequency' (number of documents in which it appears) to the 591 term information file. Return the offset after the term information was 592 written to the file. 593 """ 594 595 # Write the prefix length and term suffix. 596 597 common = len(commonprefix([self.last_term, term])) 598 suffix = term[common:] 599 600 self.write_number(common) 601 self.write_string(suffix) 602 603 # Write the offset delta. 604 605 self.write_number(offset - self.last_offset) 606 607 # Write the frequency. 608 609 self.write_number(frequency) 610 611 # Write the document frequency. 612 613 self.write_number(doc_frequency) 614 615 self.last_term = term 616 self.last_offset = offset 617 618 return self.f.tell() 619 620 class TermReader(FileReader): 621 622 "Reading term information from files." 623 624 def reset(self): 625 self.last_term = "" 626 self.last_offset = 0 627 628 def read_term(self): 629 630 """ 631 Read a term, its position file offset, its frequency and its document 632 frequence from the term information file. 633 """ 634 635 # Read the prefix length and term suffix. 636 637 common = self.read_number() 638 suffix = self.read_string() 639 640 self.last_term = self.last_term[:common] + suffix 641 642 # Read the offset delta. 643 644 self.last_offset += self.read_number() 645 646 # Read the frequency. 647 648 frequency = self.read_number() 649 650 # Read the document frequency. 651 652 doc_frequency = self.read_number() 653 654 return self.last_term, self.last_offset, frequency, doc_frequency 655 656 def go_to_term(self, term, offset, info_offset): 657 658 """ 659 Seek past the entry for 'term' having 'offset' to 'info_offset'. This 660 permits the scanning for later terms from the specified term. 661 """ 662 663 self.f.seek(info_offset) 664 self.last_term = term 665 self.last_offset = offset 666 667 class TermIndexWriter(TermWriter): 668 669 "Writing term dictionary index details to files." 670 671 def reset(self): 672 TermWriter.reset(self) 673 self.last_info_offset = 0 674 675 def write_term(self, term, offset, frequency, doc_frequency, info_offset): 676 677 """ 678 Write the given 'term', its position file 'offset', its 'frequency' and 679 its 'doc_frequency' to the term dictionary index file, along with the 680 'info_offset' in the term information file. 681 """ 682 683 TermWriter.write_term(self, term, offset, frequency, doc_frequency) 684 685 # Write the information file offset delta. 686 687 self.write_number(info_offset - self.last_info_offset) 688 self.last_info_offset = info_offset 689 690 class TermIndexReader(TermReader): 691 692 "Reading term dictionary index details from files." 693 694 def reset(self): 695 TermReader.reset(self) 696 self.last_info_offset = 0 697 698 def read_term(self): 699 700 """ 701 Read a term, its position file offset, its frequency, its document 702 frequency and a term information file offset from the term dictionary 703 index file. 704 """ 705 706 term, offset, frequency, doc_frequency = TermReader.read_term(self) 707 708 # Read the offset delta. 709 710 self.last_info_offset += self.read_number() 711 712 return term, offset, frequency, doc_frequency, self.last_info_offset 713 714 class TermDictionaryWriter: 715 716 "Writing term dictionaries." 717 718 def __init__(self, info_writer, index_writer, position_dict_writer, interval): 719 self.info_writer = info_writer 720 self.index_writer = index_writer 721 self.position_dict_writer = position_dict_writer 722 self.interval = interval 723 self.entry = 0 724 725 def _write_term(self, term, offset, frequency, doc_frequency): 726 727 """ 728 Write the given 'term', its position file 'offset', its 'frequency' and 729 its 'doc_frequency' (number of documents in which it appears) to the 730 term information file. Return the offset after the term information was 731 written to the file. 732 """ 733 734 info_offset = self.info_writer.write_term(term, offset, frequency, doc_frequency) 735 736 if self.entry % self.interval == 0: 737 self.index_writer.write_term(term, offset, frequency, doc_frequency, info_offset) 738 739 self.entry += 1 740 741 def write_term_positions(self, term, doc_positions): 742 743 """ 744 Write the given 'term' and the 'doc_positions' recording the documents 745 and positions at which the term is found. 746 """ 747 748 offset, frequency, doc_frequency = self.position_dict_writer.write_term_positions(doc_positions) 749 self._write_term(term, offset, frequency, doc_frequency) 750 751 def close(self): 752 self.info_writer.close() 753 self.index_writer.close() 754 self.position_dict_writer.close() 755 756 class TermDictionaryReader: 757 758 "Reading term dictionaries." 759 760 def __init__(self, info_reader, index_reader, position_reader): 761 self.info_reader = info_reader 762 self.index_reader = index_reader 763 self.position_reader = position_reader 764 765 self.terms = [] 766 try: 767 while 1: 768 self.terms.append(self.index_reader.read_term()) 769 except EOFError: 770 pass 771 772 # Large numbers for ordering purposes. 773 774 self.max_offset = self.terms[-1][1] + 1 775 776 def _find_term(self, term): 777 778 """ 779 Find the position file offset and frequency of 'term' from the term 780 dictionary. 781 """ 782 783 i = bisect_right(self.terms, (term, self.max_offset, 0, 0)) - 1 784 785 # Get the entry position providing the term or one preceding it. 786 787 if i == -1: 788 return None 789 790 found_term, offset, frequency, doc_frequency, info_offset = self.terms[i] 791 792 # Where the term is found immediately, return the offset and 793 # frequencies. 794 795 if term == found_term: 796 return offset, frequency, doc_frequency 797 798 # Otherwise, seek past the index term's entry in the information file 799 # and scan for the desired term. 800 801 else: 802 self.info_reader.go_to_term(found_term, offset, info_offset) 803 try: 804 while term > found_term: 805 found_term, offset, frequency, doc_frequency = self.info_reader.read_term() 806 except EOFError: 807 pass 808 809 # If the term is found, return the offset and frequencies. 810 811 if term == found_term: 812 return offset, frequency, doc_frequency 813 else: 814 return None 815 816 def rewind(self): 817 self.info_reader.rewind() 818 819 def _get_positions(self, offset, doc_frequency): 820 return self.position_reader.read_term_positions(offset, doc_frequency) 821 822 def read_term(self): 823 824 """ 825 Return the next term, its frequency, its document frequency, and the 826 documents and positions at which the term is found. 827 """ 828 829 term, offset, frequency, doc_frequency = self.info_reader.read_term() 830 positions = self._get_positions(offset, doc_frequency) 831 return term, frequency, doc_frequency, positions 832 833 def find_positions(self, term): 834 835 "Return the documents and positions at which the given 'term' is found." 836 837 t = self._find_term(term) 838 if t is None: 839 return None 840 else: 841 offset, frequency, doc_frequency = t 842 return self._get_positions(offset, doc_frequency) 843 844 def get_frequency(self, term): 845 846 "Return the frequency of the given 'term'." 847 848 t = self._find_term(term) 849 if t is None: 850 return None 851 else: 852 offset, frequency, doc_frequency = t 853 return frequency 854 855 def get_document_frequency(self, term): 856 857 "Return the document frequency of the given 'term'." 858 859 t = self._find_term(term) 860 if t is None: 861 return None 862 else: 863 offset, frequency, doc_frequency = t 864 return doc_frequency 865 866 def close(self): 867 self.info_reader.close() 868 self.index_reader.close() 869 self.position_reader.close() 870 871 # Specific classes for storing document information. 872 873 class FieldWriter(FileWriter): 874 875 "Writing field data to files." 876 877 def reset(self): 878 self.last_docnum = 0 879 880 def write_fields(self, docnum, fields): 881 882 """ 883 Write for the given 'docnum', a list of 'fields' (integer, string pairs 884 representing field identifiers and values respectively). 885 Return the offset at which the fields are stored. 886 """ 887 888 offset = self.f.tell() 889 890 # Write the document number delta. 891 892 self.write_number(docnum - self.last_docnum) 893 894 # Write the number of fields. 895 896 self.write_number(len(fields)) 897 898 # Write the fields themselves. 899 900 for i, field in fields: 901 self.write_number(i) 902 self.write_string(field, 1) # compress 903 904 self.last_docnum = docnum 905 return offset 906 907 class FieldReader(FileReader): 908 909 "Reading field data from files." 910 911 def reset(self): 912 self.last_docnum = 0 913 914 def read_fields(self): 915 916 """ 917 Read fields from the file, returning a tuple containing the document 918 number and a list of field (identifier, value) pairs. 919 """ 920 921 # Read the document number. 922 923 self.last_docnum += self.read_number() 924 925 # Read the number of fields. 926 927 nfields = self.read_number() 928 929 # Collect the fields. 930 931 fields = [] 932 i = 0 933 934 while i < nfields: 935 identifier = self.read_number() 936 value = self.read_string(1) # decompress 937 fields.append((identifier, value)) 938 i += 1 939 940 return self.last_docnum, fields 941 942 def read_document_fields(self, docnum, offset): 943 944 """ 945 Read fields for 'docnum' at the given 'offset'. This permits the 946 retrieval of details for the specified document, as well as scanning for 947 later documents. 948 """ 949 950 self.f.seek(offset) 951 bad_docnum, fields = self.read_fields() 952 self.last_docnum = docnum 953 return docnum, fields 954 955 class FieldIndexWriter(FileWriter): 956 957 "Writing field index details to files." 958 959 def reset(self): 960 self.last_docnum = 0 961 self.last_offset = 0 962 963 def write_document(self, docnum, offset): 964 965 """ 966 Write for the given 'docnum', the 'offset' at which the fields for the 967 document are stored in the fields file. 968 """ 969 970 # Write the document number and offset deltas. 971 972 self.write_number(docnum - self.last_docnum) 973 self.write_number(offset - self.last_offset) 974 975 self.last_docnum = docnum 976 self.last_offset = offset 977 978 class FieldIndexReader(FileReader): 979 980 "Reading field index details from files." 981 982 def reset(self): 983 self.last_docnum = 0 984 self.last_offset = 0 985 986 def read_document(self): 987 988 "Read a document number and field file offset." 989 990 # Read the document number delta and offset. 991 992 self.last_docnum += self.read_number() 993 self.last_offset += self.read_number() 994 995 return self.last_docnum, self.last_offset 996 997 class FieldDictionaryWriter: 998 999 "Writing field dictionary details." 1000 1001 def __init__(self, field_writer, field_index_writer, interval): 1002 self.field_writer = field_writer 1003 self.field_index_writer = field_index_writer 1004 self.interval = interval 1005 self.entry = 0 1006 1007 def write_fields(self, docnum, fields): 1008 1009 "Write details of the document with the given 'docnum' and 'fields'." 1010 1011 offset = self.field_writer.write_fields(docnum, fields) 1012 1013 if self.entry % self.interval == 0: 1014 self.field_index_writer.write_document(docnum, offset) 1015 1016 self.entry += 1 1017 1018 def close(self): 1019 self.field_writer.close() 1020 self.field_index_writer.close() 1021 1022 class FieldDictionaryReader: 1023 1024 "Reading field dictionary details." 1025 1026 def __init__(self, field_reader, field_index_reader): 1027 self.field_reader = field_reader 1028 self.field_index_reader = field_index_reader 1029 1030 self.docs = [] 1031 try: 1032 while 1: 1033 self.docs.append(self.field_index_reader.read_document()) 1034 except EOFError: 1035 pass 1036 1037 # Large numbers for ordering purposes. 1038 1039 self.max_offset = self.docs[-1][1] 1040 1041 def rewind(self): 1042 self.field_reader.rewind() 1043 1044 def read_fields(self): 1045 1046 "Return the next document number and fields." 1047 1048 return self.field_reader.read_fields() 1049 1050 def get_fields(self, docnum): 1051 1052 "Read the fields of the document with the given 'docnum'." 1053 1054 i = bisect_right(self.docs, (docnum, self.max_offset)) - 1 1055 1056 # Get the entry position providing the term or one preceding it. 1057 1058 if i == -1: 1059 return None 1060 1061 found_docnum, offset = self.docs[i] 1062 1063 # Read from the fields file. 1064 1065 found_docnum, fields = self.field_reader.read_document_fields(found_docnum, offset) 1066 1067 # Scan for the document, if necessary. 1068 1069 try: 1070 while docnum > found_docnum: 1071 found_docnum, fields = self.field_reader.read_fields() 1072 except EOFError: 1073 pass 1074 1075 # If the document is found, return the fields. 1076 1077 if docnum == found_docnum: 1078 return fields 1079 else: 1080 return None 1081 1082 def close(self): 1083 self.field_reader.close() 1084 self.field_index_reader.close() 1085 1086 # Dictionary merging classes. 1087 1088 class Merger: 1089 1090 "Merge files." 1091 1092 def __init__(self, writer, readers): 1093 self.writer = writer 1094 self.readers = readers 1095 1096 def close(self): 1097 for reader in self.readers: 1098 reader.close() 1099 self.writer.close() 1100 1101 class TermDictionaryMerger(Merger): 1102 1103 "Merge term and position files." 1104 1105 def merge(self): 1106 1107 """ 1108 Merge terms and positions from the readers, sending them to the writer. 1109 """ 1110 1111 entries = [] 1112 1113 # Get the first entries from the readers. 1114 1115 for partition, reader in enumerate(self.readers): 1116 reader.rewind() 1117 1118 try: 1119 term, frequency, doc_frequency, positions = reader.read_term() 1120 insort_right(entries, (term, positions, partition)) 1121 except EOFError: 1122 pass 1123 1124 # While entries are available, write them out in order, merging where 1125 # appropriate. 1126 1127 while entries: 1128 term, doc_positions, partition = entries[0] 1129 to_update = [partition] 1130 1131 nentries = len(entries) 1132 i = 1 1133 1134 # Find other entries for the term. 1135 1136 while i < nentries: 1137 other_term, other_doc_positions, other_partition = entries[i] 1138 1139 # For such entries, merge the positions. 1140 1141 if other_term == term: 1142 doc_positions = self.merge_positions(doc_positions, other_doc_positions) 1143 to_update.append(other_partition) 1144 i += 1 1145 else: 1146 break 1147 1148 # Write the combined term details. 1149 1150 self.writer.write_term_positions(term, doc_positions) 1151 1152 # Update the entries from the affected readers. 1153 1154 del entries[:i] 1155 1156 for partition in to_update: 1157 try: 1158 term, frequency, doc_frequency, positions = self.readers[partition].read_term() 1159 insort_right(entries, (term, positions, partition)) 1160 except EOFError: 1161 pass 1162 1163 def merge_positions(self, doc_positions, other_doc_positions): 1164 1165 """ 1166 Merge 'doc_positions' with 'other_doc_positions' so that common document 1167 records contain positions from both collections. 1168 """ 1169 1170 doc_position_dict = dict(doc_positions) 1171 1172 for docnum, positions in other_doc_positions: 1173 if doc_position_dict.has_key(docnum): 1174 doc_position_dict[docnum] += positions 1175 else: 1176 doc_position_dict[docnum] = positions 1177 1178 return doc_position_dict.items() 1179 1180 class FieldDictionaryMerger(Merger): 1181 1182 "Merge field files." 1183 1184 def merge(self): 1185 1186 """ 1187 Merge fields from the readers, sending them to the writer. 1188 """ 1189 1190 entries = [] 1191 1192 # Get the first entries from the readers. 1193 1194 for partition, reader in enumerate(self.readers): 1195 reader.rewind() 1196 1197 try: 1198 docnum, fields = reader.read_fields() 1199 insort_right(entries, (docnum, fields, partition)) 1200 except EOFError: 1201 pass 1202 1203 # While entries are available, write them out in order, merging where 1204 # appropriate. 1205 1206 while entries: 1207 docnum, fields, partition = entries[0] 1208 to_update = [partition] 1209 1210 nentries = len(entries) 1211 i = 1 1212 1213 # Find other entries for the term. 1214 1215 while i < nentries: 1216 other_docnum, other_fields, other_partition = entries[i] 1217 1218 # For such entries, merge the positions. 1219 1220 if other_docnum == docnum: 1221 fields += other_fields 1222 to_update.append(other_partition) 1223 i += 1 1224 else: 1225 break 1226 1227 # Write the combined term details. 1228 1229 self.writer.write_fields(docnum, fields) 1230 1231 # Update the entries from the affected readers. 1232 1233 del entries[:i] 1234 1235 for partition in to_update: 1236 try: 1237 docnum, fields = self.readers[partition].read_fields() 1238 insort_right(entries, (docnum, fields, partition)) 1239 except EOFError: 1240 pass 1241 1242 # Utility functions. 1243 1244 def get_term_writer(pathname, partition, interval, doc_interval): 1245 1246 """ 1247 Return a term dictionary writer using files under the given 'pathname' 1248 labelled according to the given 'partition', using the given indexing 1249 'interval' for terms and 'doc_interval' for document position records. 1250 """ 1251 1252 tdf = open(join(pathname, "terms-%s" % partition), "wb") 1253 info_writer = TermWriter(tdf) 1254 1255 tdif = open(join(pathname, "terms_index-%s" % partition), "wb") 1256 index_writer = TermIndexWriter(tdif) 1257 1258 tpf = open(join(pathname, "positions-%s" % partition), "wb") 1259 positions_writer = PositionWriter(tpf) 1260 1261 tpif = open(join(pathname, "positions_index-%s" % partition), "wb") 1262 positions_index_writer = PositionIndexWriter(tpif) 1263 1264 positions_dict_writer = PositionDictionaryWriter(positions_writer, positions_index_writer, doc_interval) 1265 1266 return TermDictionaryWriter(info_writer, index_writer, positions_dict_writer, interval) 1267 1268 def get_field_writer(pathname, partition, interval): 1269 1270 """ 1271 Return a field dictionary writer using files under the given 'pathname' 1272 labelled according to the given 'partition', using the given indexing 1273 'interval'. 1274 """ 1275 1276 ff = open(join(pathname, "fields-%s" % partition), "wb") 1277 field_writer = FieldWriter(ff) 1278 1279 fif = open(join(pathname, "fields_index-%s" % partition), "wb") 1280 field_index_writer = FieldIndexWriter(fif) 1281 1282 return FieldDictionaryWriter(field_writer, field_index_writer, interval) 1283 1284 def get_term_reader(pathname, partition): 1285 1286 """ 1287 Return a term dictionary reader using files under the given 'pathname' 1288 labelled according to the given 'partition'. 1289 """ 1290 1291 tdf = open(join(pathname, "terms-%s" % partition), "rb") 1292 info_reader = TermReader(tdf) 1293 1294 tdif = open(join(pathname, "terms_index-%s" % partition), "rb") 1295 index_reader = TermIndexReader(tdif) 1296 1297 tpf = open(join(pathname, "positions-%s" % partition), "rb") 1298 positions_reader = PositionReader(tpf) 1299 1300 tpif = open(join(pathname, "positions_index-%s" % partition), "rb") 1301 positions_index_reader = PositionIndexReader(tpif) 1302 1303 positions_dict_reader = PositionDictionaryReader(positions_reader, positions_index_reader) 1304 1305 return TermDictionaryReader(info_reader, index_reader, positions_dict_reader) 1306 1307 def get_field_reader(pathname, partition): 1308 1309 """ 1310 Return a field dictionary reader using files under the given 'pathname' 1311 labelled according to the given 'partition'. 1312 """ 1313 1314 ff = open(join(pathname, "fields-%s" % partition), "rb") 1315 field_reader = FieldReader(ff) 1316 1317 fif = open(join(pathname, "fields_index-%s" % partition), "rb") 1318 field_index_reader = FieldIndexReader(fif) 1319 1320 return FieldDictionaryReader(field_reader, field_index_reader) 1321 1322 def rename_files(pathname, names, from_partition, to_partition): 1323 for name in names: 1324 rename(join(pathname, "%s-%s" % (name, from_partition)), join(pathname, "%s-%s" % (name, to_partition))) 1325 1326 def rename_term_files(pathname, from_partition, to_partition): 1327 rename_files(pathname, TERM_FILENAMES, from_partition, to_partition) 1328 1329 def rename_field_files(pathname, from_partition, to_partition): 1330 rename_files(pathname, FIELD_FILENAMES, from_partition, to_partition) 1331 1332 def remove_files(pathname, names, partition): 1333 for name in names: 1334 remove(join(pathname, "%s-%s" % (name, partition))) 1335 1336 def remove_term_files(pathname, partition): 1337 remove_files(pathname, TERM_FILENAMES, partition) 1338 1339 def remove_field_files(pathname, partition): 1340 remove_files(pathname, FIELD_FILENAMES, partition) 1341 1342 # High-level classes. 1343 1344 class IndexWriter: 1345 1346 """ 1347 Building term information and writing it to the term and field dictionaries. 1348 """ 1349 1350 def __init__(self, pathname, interval, doc_interval, flush_interval): 1351 self.pathname = pathname 1352 self.interval = interval 1353 self.doc_interval = doc_interval 1354 self.flush_interval = flush_interval 1355 1356 self.dict_partition = 0 1357 self.field_dict_partition = 0 1358 1359 self.terms = {} 1360 self.docs = {} 1361 1362 self.position_counter = 0 1363 self.field_counter = 0 1364 1365 def add_position(self, term, docnum, position): 1366 1367 """ 1368 Add a position entry for the given 'term' in the document with the given 1369 'docnum', indicating the given 'position'. 1370 """ 1371 1372 if not self.terms.has_key(term): 1373 doc_positions = self.terms[term] = {} 1374 else: 1375 doc_positions = self.terms[term] 1376 1377 if not doc_positions.has_key(docnum): 1378 doc = doc_positions[docnum] = [] 1379 else: 1380 doc = doc_positions[docnum] 1381 1382 doc.append(position) 1383 1384 self.position_counter += 1 1385 if self.flush_interval and self.position_counter >= self.flush_interval: 1386 self.flush_terms() 1387 self.position_counter = 0 1388 1389 def add_field(self, docnum, identifier, value): 1390 1391 """ 1392 Add for the document with the given 'docnum' a field having the given 1393 'identifier' and 'value'. 1394 """ 1395 1396 if not self.docs.has_key(docnum): 1397 doc_fields = self.docs[docnum] = [] 1398 else: 1399 doc_fields = self.docs[docnum] 1400 1401 doc_fields.append((identifier, value)) 1402 1403 self.field_counter += 1 1404 if self.flush_interval and self.field_counter >= self.flush_interval: 1405 self.flush_fields() 1406 self.field_counter = 0 1407 1408 def get_term_writer(self): 1409 1410 "Return a term dictionary writer for the current partition." 1411 1412 return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval) 1413 1414 def get_field_writer(self): 1415 1416 "Return a field dictionary writer for the current partition." 1417 1418 return get_field_writer(self.pathname, self.field_dict_partition, self.interval) 1419 1420 def flush_terms(self): 1421 1422 "Flush terms into the current term dictionary partition." 1423 1424 # Get the terms in order. 1425 1426 terms = self.terms.items() 1427 terms.sort() 1428 1429 dict_writer = self.get_term_writer() 1430 1431 for term, doc_positions in terms: 1432 doc_positions = doc_positions.items() 1433 dict_writer.write_term_positions(term, doc_positions) 1434 1435 dict_writer.close() 1436 1437 self.terms = {} 1438 self.dict_partition += 1 1439 1440 def flush_fields(self): 1441 1442 "Flush fields into the current term dictionary partition." 1443 1444 # Get the documents in order. 1445 1446 docs = self.docs.items() 1447 docs.sort() 1448 1449 field_dict_writer = self.get_field_writer() 1450 1451 for docnum, fields in docs: 1452 field_dict_writer.write_fields(docnum, fields) 1453 1454 field_dict_writer.close() 1455 1456 self.docs = {} 1457 self.field_dict_partition += 1 1458 1459 def close(self): 1460 if self.terms: 1461 self.flush_terms() 1462 if self.docs: 1463 self.flush_fields() 1464 1465 class IndexReader: 1466 1467 "Accessing the term and field dictionaries." 1468 1469 def __init__(self, pathname): 1470 self.dict_reader = get_term_reader(pathname, "merged") 1471 self.field_dict_reader = get_field_reader(pathname, "merged") 1472 1473 def find_positions(self, term): 1474 return self.dict_reader.find_positions(term) 1475 1476 def get_frequency(self, term): 1477 return self.dict_reader.get_frequency(term) 1478 1479 def get_fields(self, docnum): 1480 return self.field_dict_reader.get_fields(docnum) 1481 1482 def close(self): 1483 self.dict_reader.close() 1484 self.field_dict_reader.close() 1485 1486 class Index: 1487 1488 "An inverted index solution encapsulating the various components." 1489 1490 def __init__(self, pathname): 1491 self.pathname = pathname 1492 self.reader = None 1493 self.writer = None 1494 1495 def get_writer(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, flush_interval=FLUSH_INTERVAL): 1496 1497 """ 1498 Return a writer, optionally using the given indexing 'interval', 1499 'doc_interval' and 'flush_interval'. 1500 """ 1501 1502 if not exists(self.pathname): 1503 mkdir(self.pathname) 1504 1505 self.writer = IndexWriter(self.pathname, interval, doc_interval, flush_interval) 1506 return self.writer 1507 1508 def get_reader(self, partition=0): 1509 1510 "Return a reader for the index." 1511 1512 # Ensure that only one partition exists. 1513 1514 self.merge_terms() 1515 self.merge_fields() 1516 1517 return self._get_reader(partition) 1518 1519 def _get_reader(self, partition): 1520 1521 "Return a reader for the index." 1522 1523 if not exists(self.pathname): 1524 raise OSError, "Index path %r does not exist." % self.pathname 1525 1526 self.reader = IndexReader(self.pathname) 1527 return self.reader 1528 1529 def merge_terms(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL): 1530 1531 """ 1532 Merge term dictionaries using the given indexing 'interval' and 1533 'doc_interval'. 1534 """ 1535 1536 readers = [] 1537 partitions = [] 1538 1539 for filename in listdir(self.pathname): 1540 if filename.startswith("terms-"): # 6 character prefix 1541 partition = filename[6:] 1542 readers.append(get_term_reader(self.pathname, partition)) 1543 partitions.append(partition) 1544 1545 # Write directly to a dictionary. 1546 1547 if len(readers) > 1: 1548 writer = get_term_writer(self.pathname, "merged", interval, doc_interval) 1549 merger = TermDictionaryMerger(writer, readers) 1550 merger.merge() 1551 merger.close() 1552 1553 # Remove old files. 1554 1555 for partition in partitions: 1556 remove_term_files(self.pathname, partition) 1557 1558 elif len(readers) == 1 and partitions[0] != "merged": 1559 rename_term_files(self.pathname, partitions[0], "merged") 1560 1561 def merge_fields(self, interval=FIELD_INTERVAL): 1562 1563 "Merge field dictionaries using the given indexing 'interval'." 1564 1565 readers = [] 1566 partitions = [] 1567 1568 for filename in listdir(self.pathname): 1569 if filename.startswith("fields-"): # 7 character prefix 1570 partition = filename[7:] 1571 readers.append(get_field_reader(self.pathname, partition)) 1572 partitions.append(partition) 1573 1574 # Write directly to a dictionary. 1575 1576 if len(readers) > 1: 1577 writer = get_field_writer(self.pathname, "merged", interval) 1578 merger = FieldDictionaryMerger(writer, readers) 1579 merger.merge() 1580 merger.close() 1581 1582 # Remove old files. 1583 1584 for partition in partitions: 1585 remove_field_files(self.pathname, partition) 1586 1587 elif len(readers) == 1 and partitions[0] != "merged": 1588 rename_field_files(self.pathname, partitions[0], "merged") 1589 1590 def close(self): 1591 if self.reader is not None: 1592 self.reader.close() 1593 self.reader = None 1594 if self.writer is not None: 1595 self.writer.close() 1596 self.writer = None 1597 1598 # vim: tabstop=4 expandtab shiftwidth=4