#!/usr/bin/env python

"""
A simple (and sane) text indexing library.

Copyright (C) 2009 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with this program. If not, see <http://www.gnu.org/licenses/>.
"""

from os import dup, fdopen # independent iterator access to files
from os import listdir, mkdir # index and partition discovery
from os import remove, rename # partition manipulation
from os.path import exists, join
from os.path import commonprefix # to find common string prefixes
from bisect import bisect_right # to find terms in the dictionary index
from bisect import insort_right # to maintain a sorted list of data for merging
import bz2, zlib # for field compression

# Compatibility with pre-2.4 Python: fall back to the sets module when the
# built-in set type is unavailable.

try:
    set
except NameError:
    from sets import Set as set

# Constants.

TERM_INTERVAL = 100
DOCUMENT_INTERVAL = 100
FIELD_INTERVAL = 100
FLUSH_INTERVAL = 1000000

TERM_FILENAMES = "terms", "terms_index", "positions", "positions_index"
FIELD_FILENAMES = "fields", "fields_index"

# Compression methods, tried in order when writing. Each stored string is
# tagged with a one-character flag identifying the method used, "-" meaning
# no compression.

compressors = [("b", bz2.compress), ("z", zlib.compress)]
decompressors = {"b" : bz2.decompress, "z" : zlib.decompress}

# Foundation classes.

class File:

    "A basic file abstraction."

    def __init__(self, f):

        # 'f' is an open file object; subclasses keep delta-encoding state
        # which is initialised via reset.

        self.f = f
        self.reset()

    def reset(self):

        "To be used to reset the state of the reader or writer between records."

        pass

    def rewind(self):

        "Return to the start of the file and reset any per-record state."

        self.f.seek(0)
        self.reset()

    def close(self):

        "Close the underlying file, at most once."

        if self.f is not None:
            self.f.close()
            self.f = None

class FileWriter(File):

    "Writing basic data types to files."

    def write_number(self, number):

        "Write 'number' to the file using a variable length encoding."

        # Negative numbers are not supported.

        if number < 0:
            raise ValueError, "Number %r is negative." % number

        # Special case: one byte containing zero.

        elif number == 0:
            self.f.write(chr(0))
            return

        # Write the number from least to most significant digits. Each byte
        # carries seven payload bits; the high bit is a continuation marker
        # set on every byte except the last.

        bytes = []

        while number != 0:
            lsd = number & 127
            number = number >> 7
            if number != 0:
                lsd |= 128
            bytes.append(chr(lsd))

        record = "".join(bytes)
        self.f.write(record)

    def write_string(self, s, compress=0):

        """
        Write 's' to the file, recording its length and compressing the string
        if 'compress' is set to a true value.
        """

        # Convert Unicode objects to strings.

        if isinstance(s, unicode):
            s = s.encode("utf-8")

        # Compress the string if requested.

        if compress:
            for flag, fn in compressors:
                cs = fn(s)

                # Take the first string shorter than the original.

                if len(cs) < len(s):
                    s = cs
                    break

            # Loop exhausted: no compressor produced a shorter string.

            else:
                flag = "-"

            # Record whether compression was used. The flag byte is only
            # present when 'compress' was requested, mirroring read_string.

            self.f.write(flag)

        # Write the length of the data before the data itself.

        length = len(s)
        self.write_number(length)
        self.f.write(s)

class FileReader(File):

    "Reading basic data types from files."

    def read_number(self):

        "Read a number from the file."

        # Read each byte, adding it to the number.
        shift = 0
        number = 0
        more = 1

        while more:
            byte = self.f.read(1)

            # End of file: there is no (further) record.

            if not byte:
                raise EOFError

            csd = ord(byte)

            # The high bit indicates that more digits follow.

            more = csd & 128 != 0
            if more:
                csd &= 127
            number += (csd << shift)
            shift += 7

        return number

    def read_string(self, decompress=0):

        """
        Read a string from the file, decompressing the stored data if
        'decompress' is set to a true value.
        """

        # Read the compression flag byte only if the writer stored one.

        if decompress:
            flag = self.f.read(1)
        else:
            flag = "-"

        length = self.read_number()
        s = self.f.read(length)

        # Perform decompression if applicable.

        if flag != "-":
            fn = decompressors[flag]
            s = fn(s)

        # Convert strings to Unicode objects.

        return unicode(s, "utf-8")

# Specific classes for storing term and position information.

class PositionWriter(FileWriter):

    "Writing position information to files."

    def reset(self):

        # Document numbers are delta-encoded against the previous record.

        self.last_docnum = 0

    def write_positions(self, docnum, positions):

        """
        Write for the document 'docnum' the given 'positions'.
        Return the offset of the written record.
        """

        # Records must be written in ascending document number order for the
        # delta encoding to be valid.

        if docnum < self.last_docnum:
            raise ValueError, "Document number %r is less than previous number %r." % (docnum, self.last_docnum)

        # Record the offset of this record.

        offset = self.f.tell()

        # Write the document number delta.

        self.write_number(docnum - self.last_docnum)

        # Write the number of positions.

        self.write_number(len(positions))

        # Make sure that the positions are sorted.

        positions.sort()

        # Write the position deltas.

        last = 0
        for position in positions:
            pos = position - last
            self.write_number(pos)
            last = position

        self.last_docnum = docnum

        return offset

class PositionReader(FileReader):

    "Reading position information from files."

    def reset(self):

        # Mirror of PositionWriter.reset: delta-decoding state.

        self.last_docnum = 0

    def read_positions(self):

        "Read positions, returning a document number and a list of positions."

        # Read the document number delta and add it to the last number.

        self.last_docnum += self.read_number()

        # Read the number of positions.

        npositions = self.read_number()

        # Read the position deltas, adding each previous position to get the
        # appropriate collection of absolute positions.

        i = 0
        last = 0
        positions = []

        while i < npositions:
            last += self.read_number()
            positions.append(last)
            i += 1

        return self.last_docnum, positions

    def read_term_positions(self, offset, count):

        """
        Read all positions from 'offset', seeking to that position in the file
        before reading. The number of documents available for reading is limited
        to 'count'.
        """

        self.reset()

        # Duplicate the file handle so that the returned iterator reads
        # independently of this reader's own file position.

        f = fdopen(dup(self.f.fileno()), "rb")
        f.seek(offset)
        return PositionIterator(f, count)

class PositionIndexWriter(FileWriter):

    "Writing position index information to files."

    def reset(self):

        # Both document numbers and position file offsets are delta-encoded.

        self.last_docnum = 0
        self.last_pos_offset = 0

    def write_positions(self, docnum, pos_offset, count):

        """
        Write the given 'docnum', 'pos_offset' and document 'count' to the
        position index file. Return the offset of the written record.
        """

        # Record the offset of this record.

        offset = self.f.tell()

        # Write the document number delta.

        self.write_number(docnum - self.last_docnum)
        self.last_docnum = docnum

        # Write the position file offset delta.

        self.write_number(pos_offset - self.last_pos_offset)
        self.last_pos_offset = pos_offset

        # Write the document count.

        self.write_number(count)

        return offset

class PositionIndexReader(FileReader):

    "Reading position index information from files."
    def reset(self):

        # Mirror of PositionIndexWriter.reset: delta-decoding state.

        self.last_docnum = 0
        self.last_pos_offset = 0

    def read_positions(self):

        """
        Read a document number, a position file offset for the position index
        file, and the number of documents in a section of that file.
        """

        # Read the document number delta.

        self.last_docnum += self.read_number()

        # Read the offset delta.

        self.last_pos_offset += self.read_number()

        # Read the document count.

        count = self.read_number()

        return self.last_docnum, self.last_pos_offset, count

    def read_term_positions(self, offset, doc_frequency):

        """
        Read all positions from 'offset', seeking to that position in the file
        before reading. The number of documents available for reading is limited
        to 'doc_frequency'.
        """

        # NOTE: This is almost a duplication of PositionReader.read_term_positions.

        self.reset()

        # Duplicate the file handle so that the returned iterator reads
        # independently of this reader's own file position.

        f = fdopen(dup(self.f.fileno()), "rb")
        f.seek(offset)
        return PositionIndexIterator(f, doc_frequency)

# Iterators for position-related files.

class IteratorBase:

    "Common counting state for the position file iterators."

    def __init__(self, count):
        self.replenish(count)

    def replenish(self, count):

        # Make 'count' more records available and reset the consumed counter.

        self.count = count
        self.read_documents = 0

    def __len__(self):
        return self.count

    def sort(self):
        pass # Stored document positions are already sorted.

    def __iter__(self):
        return self

class PositionIterator(PositionReader, IteratorBase):

    "Iterating over document positions."

    def __init__(self, f, count):
        PositionReader.__init__(self, f)
        IteratorBase.__init__(self, count)

    def next(self):

        "Read positions for a single document."

        if self.read_documents < self.count:
            self.read_documents += 1
            return self.read_positions()
        else:
            raise StopIteration

class PositionIndexIterator(PositionIndexReader, IteratorBase):

    "Iterating over position index entries (one per section of documents)."

    def __init__(self, f, count):
        PositionIndexReader.__init__(self, f)
        IteratorBase.__init__(self, count)
        self.section_count = 0

    def next(self):

        "Read a single index entry covering a section of documents."

        # Account for the documents covered by the previous entry before
        # testing whether any remain.

        self.read_documents += self.section_count
        if self.read_documents < self.count:
            docnum, pos_offset, self.section_count = t = self.read_positions()
            return t
        else:
            raise StopIteration

class PositionDictionaryWriter:

    "Writing position dictionaries."

    def __init__(self, position_writer, position_index_writer, interval):

        # 'interval' is the number of documents per index section.

        self.position_writer = position_writer
        self.position_index_writer = position_index_writer
        self.interval = interval

    def write_term_positions(self, doc_positions):

        """
        Write all 'doc_positions' - a collection of tuples of the form (document
        number, position list) - to the file.

        Add some records to the index, making dictionary entries.

        Return a tuple containing the offset of the written data, the frequency
        (number of positions), and document frequency (number of documents) for
        the term involved.
        """

        # Reset the writers.

        self.position_writer.reset()
        self.position_index_writer.reset()

        index_offset = None

        # Write the positions.

        frequency = 0
        first_docnum = None
        first_offset = None
        count = 0

        doc_positions.sort()

        for docnum, positions in doc_positions:
            pos_offset = self.position_writer.write_positions(docnum, positions)

            # Retain the first record offset for a subsequent index entry.
            if first_offset is None:
                first_offset = pos_offset
                first_docnum = docnum

            frequency += len(positions)
            count += 1

            # Every {interval} entries, write an index entry.

            if count == self.interval:
                io = self.position_index_writer.write_positions(first_docnum, first_offset, self.interval)

                # Remember the first index entry offset.

                if index_offset is None:
                    index_offset = io

                first_offset = None
                first_docnum = None
                count = 0

        # Finish writing an index entry for the remaining documents.
        # NOTE(review): this is a for-else; with no break in the loop it
        # always runs after the loop completes.

        else:
            if first_offset is not None:
                io = self.position_index_writer.write_positions(first_docnum, first_offset, count)

                # Remember the first index entry offset.

                if index_offset is None:
                    index_offset = io

        return index_offset, frequency, len(doc_positions)

    def close(self):
        self.position_writer.close()
        self.position_index_writer.close()

class PositionDictionaryReader:

    "Reading position dictionaries."

    def __init__(self, position_reader, position_index_reader):
        self.position_reader = position_reader
        self.position_index_reader = position_index_reader

    def read_term_positions(self, offset, doc_frequency):

        """
        Return an iterator for dictionary entries starting at 'offset' with the
        given 'doc_frequency'.
        """

        return PositionDictionaryIterator(self.position_reader,
            self.position_index_reader, offset, doc_frequency)

    def close(self):
        self.position_reader.close()
        self.position_index_reader.close()

class PositionDictionaryIterator:

    "Iteration over position dictionary entries."

    def __init__(self, position_reader, position_index_reader, offset, doc_frequency):
        self.position_reader = position_reader
        self.doc_frequency = doc_frequency
        self.index_iterator = position_index_reader.read_term_positions(offset, doc_frequency)

        # Maintain state for the next index entry, if read: a buffered
        # look-ahead entry used by __getitem__.

        self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None

        # Initialise the current index entry and current position file iterator.

        self._next_section()
        self._init_section()

    def __len__(self):
        return self.doc_frequency

    def sort(self):
        pass

    def __iter__(self):
        return self

    def next(self):

        """
        Attempt to get the next document record from the section in the
        positions file.
        """

        while 1:

            # Either return the next record.

            try:
                return self.iterator.next()

            # Or, where a section is finished, get the next section and try again.

            except StopIteration:

                # Where a section follows, update the index iterator, but keep
                # reading using the same file iterator (since the data should
                # just follow on from the last section).

                self._next_section()
                self.iterator.replenish(self.section_count)

    def __getitem__(self, docnum):

        """
        Attempt to navigate to a positions entry for the given 'docnum',
        returning the positions, if present, or None otherwise.
        """

        # Read ahead in the index until the next entry refers to a document
        # later than the desired document.

        try:
            if self.next_docnum is None:
                self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next()

            while self.next_docnum < docnum:
                self._next_read_section()
                self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next()

        except StopIteration:
            pass

        # Navigate in the position file to the document.
        self._init_section()

        # Scan the section for the document, stopping as soon as a later
        # document is seen (records are in ascending document order).

        try:
            while 1:
                found_docnum, positions = self.iterator.next()
                if docnum == found_docnum:
                    return positions
                elif docnum < found_docnum:
                    return None
        except StopIteration:
            return None

    # Internal methods.

    def _next_section(self):

        "Attempt to get the next section in the index."

        # Either read a fresh entry, or consume the buffered look-ahead entry
        # left by __getitem__.

        if self.next_docnum is None:
            self.docnum, self.pos_offset, self.section_count = self.index_iterator.next()
        else:
            self._next_read_section()
            self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None

    def _next_read_section(self):

        """
        Make the next index entry the current one without reading from the
        index.
        """

        self.docnum, self.pos_offset, self.section_count = self.next_docnum, self.next_pos_offset, self.next_section_count

    def _init_section(self):

        "Initialise the iterator for the section in the position file."

        self.iterator = self.position_reader.read_term_positions(self.pos_offset, self.section_count)

class TermWriter(FileWriter):

    "Writing term information to files."

    def reset(self):

        # Terms are prefix-compressed and offsets delta-encoded against the
        # previous record.

        self.last_term = ""
        self.last_offset = 0

    def write_term(self, term, offset, frequency, doc_frequency):

        """
        Write the given 'term', its position file 'offset', its 'frequency' and
        its 'doc_frequency' (number of documents in which it appears) to the
        term information file. Return the offset after the term information was
        written to the file.
        """

        # Write the prefix length and term suffix.

        common = len(commonprefix([self.last_term, term]))
        suffix = term[common:]

        self.write_number(common)
        self.write_string(suffix)

        # Write the offset delta.

        self.write_number(offset - self.last_offset)

        # Write the frequency.

        self.write_number(frequency)

        # Write the document frequency.

        self.write_number(doc_frequency)

        self.last_term = term
        self.last_offset = offset

        return self.f.tell()

class TermReader(FileReader):

    "Reading term information from files."

    def reset(self):

        # Mirror of TermWriter.reset.

        self.last_term = ""
        self.last_offset = 0

    def read_term(self):

        """
        Read a term, its position file offset, its frequency and its document
        frequency from the term information file.
        """

        # Read the prefix length and term suffix.

        common = self.read_number()
        suffix = self.read_string()

        self.last_term = self.last_term[:common] + suffix

        # Read the offset delta.

        self.last_offset += self.read_number()

        # Read the frequency.

        frequency = self.read_number()

        # Read the document frequency.

        doc_frequency = self.read_number()

        return self.last_term, self.last_offset, frequency, doc_frequency

    def go_to_term(self, term, offset, info_offset):

        """
        Seek past the entry for 'term' having 'offset' to 'info_offset'. This
        permits the scanning for later terms from the specified term.
        """

        self.f.seek(info_offset)
        self.last_term = term
        self.last_offset = offset

class TermIndexWriter(TermWriter):

    "Writing term dictionary index details to files."

    def reset(self):
        TermWriter.reset(self)
        self.last_info_offset = 0

    def write_term(self, term, offset, frequency, doc_frequency, info_offset):

        """
        Write the given 'term', its position file 'offset', its 'frequency' and
        its 'doc_frequency' to the term dictionary index file, along with the
        'info_offset' in the term information file.
        """

        TermWriter.write_term(self, term, offset, frequency, doc_frequency)

        # Write the information file offset delta.
        self.write_number(info_offset - self.last_info_offset)
        self.last_info_offset = info_offset

class TermIndexReader(TermReader):

    "Reading term dictionary index details from files."

    def reset(self):
        TermReader.reset(self)
        self.last_info_offset = 0

    def read_term(self):

        """
        Read a term, its position file offset, its frequency, its document
        frequency and a term information file offset from the term dictionary
        index file.
        """

        term, offset, frequency, doc_frequency = TermReader.read_term(self)

        # Read the offset delta.

        self.last_info_offset += self.read_number()

        return term, offset, frequency, doc_frequency, self.last_info_offset

class TermDictionaryWriter:

    "Writing term dictionaries."

    def __init__(self, info_writer, index_writer, position_dict_writer, interval):

        # 'interval' is the number of terms between index entries.

        self.info_writer = info_writer
        self.index_writer = index_writer
        self.position_dict_writer = position_dict_writer
        self.interval = interval
        self.entry = 0

    def _write_term(self, term, offset, frequency, doc_frequency):

        """
        Write the given 'term', its position file 'offset', its 'frequency' and
        its 'doc_frequency' (number of documents in which it appears) to the
        term information file, and to the index file for every {interval}-th
        entry.
        """

        info_offset = self.info_writer.write_term(term, offset, frequency, doc_frequency)

        if self.entry % self.interval == 0:
            self.index_writer.write_term(term, offset, frequency, doc_frequency, info_offset)

        self.entry += 1

    def write_term_positions(self, term, doc_positions):

        """
        Write the given 'term' and the 'doc_positions' recording the documents
        and positions at which the term is found.
        """

        offset, frequency, doc_frequency = self.position_dict_writer.write_term_positions(doc_positions)
        self._write_term(term, offset, frequency, doc_frequency)

    def close(self):
        self.info_writer.close()
        self.index_writer.close()
        self.position_dict_writer.close()

class TermDictionaryReader:

    "Reading term dictionaries."

    def __init__(self, info_reader, index_reader, position_reader):
        self.info_reader = info_reader
        self.index_reader = index_reader
        self.position_reader = position_reader

        # Load the entire term index into memory for bisection.

        self.terms = []
        try:
            while 1:
                self.terms.append(self.index_reader.read_term())
        except EOFError:
            pass

        # Large numbers for ordering purposes.

        self.max_offset = self.terms[-1][1] + 1

    def _find_term(self, term):

        """
        Find the position file offset and frequency of 'term' from the term
        dictionary.
        """

        # The probe tuple sorts after any index entry for 'term' itself, so
        # the preceding entry is the term or its closest predecessor.

        i = bisect_right(self.terms, (term, self.max_offset, 0, 0)) - 1

        # Get the entry position providing the term or one preceding it.

        if i == -1:
            return None

        found_term, offset, frequency, doc_frequency, info_offset = self.terms[i]

        # Where the term is found immediately, return the offset and
        # frequencies.

        if term == found_term:
            return offset, frequency, doc_frequency

        # Otherwise, seek past the index term's entry in the information file
        # and scan for the desired term.

        else:
            self.info_reader.go_to_term(found_term, offset, info_offset)
            try:
                while term > found_term:
                    found_term, offset, frequency, doc_frequency = self.info_reader.read_term()
            except EOFError:
                pass

            # If the term is found, return the offset and frequencies.
            if term == found_term:
                return offset, frequency, doc_frequency
            else:
                return None

    def rewind(self):
        self.info_reader.rewind()

    def _get_positions(self, offset, doc_frequency):
        return self.position_reader.read_term_positions(offset, doc_frequency)

    def read_term(self):

        """
        Return the next term, its frequency, its document frequency, and the
        documents and positions at which the term is found.
        """

        term, offset, frequency, doc_frequency = self.info_reader.read_term()
        positions = self._get_positions(offset, doc_frequency)
        return term, frequency, doc_frequency, positions

    def find_positions(self, term):

        "Return the documents and positions at which the given 'term' is found."

        t = self._find_term(term)
        if t is None:
            return None
        else:
            offset, frequency, doc_frequency = t
            return self._get_positions(offset, doc_frequency)

    def get_frequency(self, term):

        "Return the frequency of the given 'term'."

        t = self._find_term(term)
        if t is None:
            return None
        else:
            offset, frequency, doc_frequency = t
            return frequency

    def get_document_frequency(self, term):

        "Return the document frequency of the given 'term'."

        t = self._find_term(term)
        if t is None:
            return None
        else:
            offset, frequency, doc_frequency = t
            return doc_frequency

    def close(self):
        self.info_reader.close()
        self.index_reader.close()
        self.position_reader.close()

# Specific classes for storing document information.

class FieldWriter(FileWriter):

    "Writing field data to files."

    def reset(self):

        # Document numbers are delta-encoded against the previous record.

        self.last_docnum = 0

    def write_fields(self, docnum, fields):

        """
        Write for the given 'docnum', a list of 'fields' (integer, string pairs
        representing field identifiers and values respectively).
        Return the offset at which the fields are stored.
        """

        offset = self.f.tell()

        # Write the document number delta.

        self.write_number(docnum - self.last_docnum)

        # Write the number of fields.

        self.write_number(len(fields))

        # Write the fields themselves.

        for i, field in fields:
            self.write_number(i)
            self.write_string(field, 1) # compress

        self.last_docnum = docnum
        return offset

class FieldReader(FileReader):

    "Reading field data from files."

    def reset(self):

        # Mirror of FieldWriter.reset.

        self.last_docnum = 0

    def read_fields(self):

        """
        Read fields from the file, returning a tuple containing the document
        number and a list of field (identifier, value) pairs.
        """

        # Read the document number.

        self.last_docnum += self.read_number()

        # Read the number of fields.

        nfields = self.read_number()

        # Collect the fields.

        fields = []
        i = 0

        while i < nfields:
            identifier = self.read_number()
            value = self.read_string(1) # decompress
            fields.append((identifier, value))
            i += 1

        return self.last_docnum, fields

    def read_document_fields(self, docnum, offset):

        """
        Read fields for 'docnum' at the given 'offset'. This permits the
        retrieval of details for the specified document, as well as scanning for
        later documents.
        """

        self.f.seek(offset)

        # NOTE(review): the document number decoded from the record is
        # discarded and the caller-supplied 'docnum' trusted instead; the
        # delta state is then re-based on 'docnum' so later reads stay
        # consistent — confirm callers always pass the matching docnum.

        bad_docnum, fields = self.read_fields()
        self.last_docnum = docnum
        return docnum, fields

class FieldIndexWriter(FileWriter):

    "Writing field index details to files."

    def reset(self):

        # Both document numbers and field file offsets are delta-encoded.

        self.last_docnum = 0
        self.last_offset = 0

    def write_document(self, docnum, offset):

        """
        Write for the given 'docnum', the 'offset' at which the fields for the
        document are stored in the fields file.
        """

        # Write the document number and offset deltas.
        self.write_number(docnum - self.last_docnum)
        self.write_number(offset - self.last_offset)

        self.last_docnum = docnum
        self.last_offset = offset

class FieldIndexReader(FileReader):

    "Reading field index details from files."

    def reset(self):

        # Mirror of FieldIndexWriter.reset.

        self.last_docnum = 0
        self.last_offset = 0

    def read_document(self):

        "Read a document number and field file offset."

        # Read the document number delta and offset.

        self.last_docnum += self.read_number()
        self.last_offset += self.read_number()

        return self.last_docnum, self.last_offset

class FieldDictionaryWriter:

    "Writing field dictionary details."

    def __init__(self, field_writer, field_index_writer, interval):

        # 'interval' is the number of documents between index entries.

        self.field_writer = field_writer
        self.field_index_writer = field_index_writer
        self.interval = interval
        self.entry = 0

    def write_fields(self, docnum, fields):

        "Write details of the document with the given 'docnum' and 'fields'."

        offset = self.field_writer.write_fields(docnum, fields)

        if self.entry % self.interval == 0:
            self.field_index_writer.write_document(docnum, offset)

        self.entry += 1

    def close(self):
        self.field_writer.close()
        self.field_index_writer.close()

class FieldDictionaryReader:

    "Reading field dictionary details."

    def __init__(self, field_reader, field_index_reader):
        self.field_reader = field_reader
        self.field_index_reader = field_index_reader

        # Load the entire field index into memory for bisection.

        self.docs = []
        try:
            while 1:
                self.docs.append(self.field_index_reader.read_document())
        except EOFError:
            pass

        # Large numbers for ordering purposes.
        # NOTE(review): TermDictionaryReader adds 1 to its maximum offset for
        # the same purpose; offsets increase with document numbers so this
        # appears harmless, but the inconsistency is worth confirming.

        self.max_offset = self.docs[-1][1]

    def rewind(self):
        self.field_reader.rewind()

    def read_fields(self):

        "Return the next document number and fields."

        return self.field_reader.read_fields()

    def get_fields(self, docnum):

        "Read the fields of the document with the given 'docnum'."

        i = bisect_right(self.docs, (docnum, self.max_offset)) - 1

        # Get the entry position providing the document or one preceding it.

        if i == -1:
            return None

        found_docnum, offset = self.docs[i]

        # Read from the fields file.

        found_docnum, fields = self.field_reader.read_document_fields(found_docnum, offset)

        # Scan for the document, if necessary.

        try:
            while docnum > found_docnum:
                found_docnum, fields = self.field_reader.read_fields()
        except EOFError:
            pass

        # If the document is found, return the fields.

        if docnum == found_docnum:
            return fields
        else:
            return None

    def close(self):
        self.field_reader.close()
        self.field_index_reader.close()

# Dictionary merging classes.

class Merger:

    "Merge files."

    def __init__(self, writer, readers):
        self.writer = writer
        self.readers = readers

    def close(self):
        for reader in self.readers:
            reader.close()
        self.writer.close()

class TermDictionaryMerger(Merger):

    "Merge term and position files."

    def merge(self):

        """
        Merge terms and positions from the readers, sending them to the writer.
        """

        entries = []

        # Get the first entries from the readers.

        for partition, reader in enumerate(self.readers):
            reader.rewind()

            try:
                term, frequency, doc_frequency, positions = reader.read_term()
                insort_right(entries, (term, positions, partition))
            except EOFError:
                pass

        # While entries are available, write them out in order, merging where
        # appropriate.
        while entries:
            term, doc_positions, partition = entries[0]
            to_update = [partition]

            nentries = len(entries)
            i = 1

            # Find other entries for the term. Since 'entries' is sorted,
            # duplicates of the lowest term are adjacent.

            while i < nentries:
                other_term, other_doc_positions, other_partition = entries[i]

                # For such entries, merge the positions.

                if other_term == term:
                    doc_positions = self.merge_positions(doc_positions, other_doc_positions)
                    to_update.append(other_partition)
                    i += 1
                else:
                    break

            # Write the combined term details.

            self.writer.write_term_positions(term, doc_positions)

            # Update the entries from the affected readers.

            del entries[:i]

            for partition in to_update:
                try:
                    term, frequency, doc_frequency, positions = self.readers[partition].read_term()
                    insort_right(entries, (term, positions, partition))
                except EOFError:
                    pass

    def merge_positions(self, doc_positions, other_doc_positions):

        """
        Merge 'doc_positions' with 'other_doc_positions' so that common document
        records contain positions from both collections.
        """

        doc_position_dict = dict(doc_positions)

        for docnum, positions in other_doc_positions:
            if doc_position_dict.has_key(docnum):
                doc_position_dict[docnum] += positions
            else:
                doc_position_dict[docnum] = positions

        return doc_position_dict.items()

class FieldDictionaryMerger(Merger):

    "Merge field files."

    def merge(self):

        """
        Merge fields from the readers, sending them to the writer.
        """

        entries = []

        # Get the first entries from the readers.

        for partition, reader in enumerate(self.readers):
            reader.rewind()

            try:
                docnum, fields = reader.read_fields()
                insort_right(entries, (docnum, fields, partition))
            except EOFError:
                pass

        # While entries are available, write them out in order, merging where
        # appropriate.

        while entries:
            docnum, fields, partition = entries[0]
            to_update = [partition]

            nentries = len(entries)
            i = 1

            # Find other entries for the same document.

            while i < nentries:
                other_docnum, other_fields, other_partition = entries[i]

                # For such entries, merge the fields.

                if other_docnum == docnum:
                    fields += other_fields
                    to_update.append(other_partition)
                    i += 1
                else:
                    break

            # Write the combined field details.

            self.writer.write_fields(docnum, fields)

            # Update the entries from the affected readers.

            del entries[:i]

            for partition in to_update:
                try:
                    docnum, fields = self.readers[partition].read_fields()
                    insort_right(entries, (docnum, fields, partition))
                except EOFError:
                    pass

# Utility functions.

def get_term_writer(pathname, partition, interval, doc_interval):

    """
    Return a term dictionary writer using files under the given 'pathname'
    labelled according to the given 'partition', using the given indexing
    'interval' for terms and 'doc_interval' for document position records.
    """

    tdf = open(join(pathname, "terms-%s" % partition), "wb")
    info_writer = TermWriter(tdf)

    tdif = open(join(pathname, "terms_index-%s" % partition), "wb")
    index_writer = TermIndexWriter(tdif)

    tpf = open(join(pathname, "positions-%s" % partition), "wb")
    positions_writer = PositionWriter(tpf)

    tpif = open(join(pathname, "positions_index-%s" % partition), "wb")
    positions_index_writer = PositionIndexWriter(tpif)

    positions_dict_writer = PositionDictionaryWriter(positions_writer, positions_index_writer, doc_interval)

    return TermDictionaryWriter(info_writer, index_writer, positions_dict_writer, interval)

def get_field_writer(pathname, partition, interval):

    """
    Return a field dictionary writer using files under the given 'pathname'
    labelled according to the given 'partition', using the given indexing
    'interval'.
    """

    ff = open(join(pathname, "fields-%s" % partition), "wb")
    field_writer = FieldWriter(ff)

    fif = open(join(pathname, "fields_index-%s" % partition), "wb")
    field_index_writer = FieldIndexWriter(fif)

    return FieldDictionaryWriter(field_writer, field_index_writer, interval)

def get_term_reader(pathname, partition):

    """
    Return a term dictionary reader using files under the given 'pathname'
    labelled according to the given 'partition'.
1361 """ 1362 1363 tdf = open(join(pathname, "terms-%s" % partition), "rb") 1364 info_reader = TermReader(tdf) 1365 1366 tdif = open(join(pathname, "terms_index-%s" % partition), "rb") 1367 index_reader = TermIndexReader(tdif) 1368 1369 tpf = open(join(pathname, "positions-%s" % partition), "rb") 1370 positions_reader = PositionReader(tpf) 1371 1372 tpif = open(join(pathname, "positions_index-%s" % partition), "rb") 1373 positions_index_reader = PositionIndexReader(tpif) 1374 1375 positions_dict_reader = PositionDictionaryReader(positions_reader, positions_index_reader) 1376 1377 return TermDictionaryReader(info_reader, index_reader, positions_dict_reader) 1378 1379 def get_field_reader(pathname, partition): 1380 1381 """ 1382 Return a field dictionary reader using files under the given 'pathname' 1383 labelled according to the given 'partition'. 1384 """ 1385 1386 ff = open(join(pathname, "fields-%s" % partition), "rb") 1387 field_reader = FieldReader(ff) 1388 1389 fif = open(join(pathname, "fields_index-%s" % partition), "rb") 1390 field_index_reader = FieldIndexReader(fif) 1391 1392 return FieldDictionaryReader(field_reader, field_index_reader) 1393 1394 def rename_files(pathname, names, from_partition, to_partition): 1395 for name in names: 1396 rename(join(pathname, "%s-%s" % (name, from_partition)), join(pathname, "%s-%s" % (name, to_partition))) 1397 1398 def rename_term_files(pathname, from_partition, to_partition): 1399 rename_files(pathname, TERM_FILENAMES, from_partition, to_partition) 1400 1401 def rename_field_files(pathname, from_partition, to_partition): 1402 rename_files(pathname, FIELD_FILENAMES, from_partition, to_partition) 1403 1404 def remove_files(pathname, names, partition): 1405 for name in names: 1406 remove(join(pathname, "%s-%s" % (name, partition))) 1407 1408 def remove_term_files(pathname, partition): 1409 remove_files(pathname, TERM_FILENAMES, partition) 1410 1411 def remove_field_files(pathname, partition): 1412 remove_files(pathname, 
FIELD_FILENAMES, partition) 1413 1414 # High-level classes. 1415 1416 class IndexWriter: 1417 1418 """ 1419 Building term information and writing it to the term and field dictionaries. 1420 """ 1421 1422 def __init__(self, pathname, interval, doc_interval, flush_interval): 1423 self.pathname = pathname 1424 self.interval = interval 1425 self.doc_interval = doc_interval 1426 self.flush_interval = flush_interval 1427 1428 self.dict_partition = 0 1429 self.field_dict_partition = 0 1430 1431 self.terms = {} 1432 self.docs = {} 1433 1434 self.position_counter = 0 1435 self.field_counter = 0 1436 1437 def add_position(self, term, docnum, position): 1438 1439 """ 1440 Add a position entry for the given 'term' in the document with the given 1441 'docnum', indicating the given 'position'. 1442 """ 1443 1444 if not self.terms.has_key(term): 1445 doc_positions = self.terms[term] = {} 1446 else: 1447 doc_positions = self.terms[term] 1448 1449 if not doc_positions.has_key(docnum): 1450 doc = doc_positions[docnum] = [] 1451 else: 1452 doc = doc_positions[docnum] 1453 1454 doc.append(position) 1455 1456 self.position_counter += 1 1457 if self.flush_interval and self.position_counter >= self.flush_interval: 1458 self.flush_terms() 1459 self.position_counter = 0 1460 1461 def add_field(self, docnum, identifier, value): 1462 1463 """ 1464 Add for the document with the given 'docnum' a field having the given 1465 'identifier' and 'value'. 1466 """ 1467 1468 if not self.docs.has_key(docnum): 1469 doc_fields = self.docs[docnum] = [] 1470 else: 1471 doc_fields = self.docs[docnum] 1472 1473 doc_fields.append((identifier, value)) 1474 1475 self.field_counter += 1 1476 if self.flush_interval and self.field_counter >= self.flush_interval: 1477 self.flush_fields() 1478 self.field_counter = 0 1479 1480 def get_term_writer(self): 1481 1482 "Return a term dictionary writer for the current partition." 
1483 1484 return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval) 1485 1486 def get_field_writer(self): 1487 1488 "Return a field dictionary writer for the current partition." 1489 1490 return get_field_writer(self.pathname, self.field_dict_partition, self.interval) 1491 1492 def flush_terms(self): 1493 1494 "Flush terms into the current term dictionary partition." 1495 1496 # Get the terms in order. 1497 1498 terms = self.terms.items() 1499 terms.sort() 1500 1501 dict_writer = self.get_term_writer() 1502 1503 for term, doc_positions in terms: 1504 doc_positions = doc_positions.items() 1505 dict_writer.write_term_positions(term, doc_positions) 1506 1507 dict_writer.close() 1508 1509 self.terms = {} 1510 self.dict_partition += 1 1511 1512 def flush_fields(self): 1513 1514 "Flush fields into the current term dictionary partition." 1515 1516 # Get the documents in order. 1517 1518 docs = self.docs.items() 1519 docs.sort() 1520 1521 field_dict_writer = self.get_field_writer() 1522 1523 for docnum, fields in docs: 1524 field_dict_writer.write_fields(docnum, fields) 1525 1526 field_dict_writer.close() 1527 1528 self.docs = {} 1529 self.field_dict_partition += 1 1530 1531 def close(self): 1532 if self.terms: 1533 self.flush_terms() 1534 if self.docs: 1535 self.flush_fields() 1536 1537 class IndexReader: 1538 1539 "Accessing the term and field dictionaries." 
1540 1541 def __init__(self, pathname): 1542 self.dict_reader = get_term_reader(pathname, "merged") 1543 self.field_dict_reader = get_field_reader(pathname, "merged") 1544 1545 def find_positions(self, term): 1546 return self.dict_reader.find_positions(term) 1547 1548 def get_frequency(self, term): 1549 return self.dict_reader.get_frequency(term) 1550 1551 def get_fields(self, docnum): 1552 return self.field_dict_reader.get_fields(docnum) 1553 1554 def close(self): 1555 self.dict_reader.close() 1556 self.field_dict_reader.close() 1557 1558 class Index: 1559 1560 "An inverted index solution encapsulating the various components." 1561 1562 def __init__(self, pathname): 1563 self.pathname = pathname 1564 self.reader = None 1565 self.writer = None 1566 1567 def get_writer(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, flush_interval=FLUSH_INTERVAL): 1568 1569 """ 1570 Return a writer, optionally using the given indexing 'interval', 1571 'doc_interval' and 'flush_interval'. 1572 """ 1573 1574 if not exists(self.pathname): 1575 mkdir(self.pathname) 1576 1577 self.writer = IndexWriter(self.pathname, interval, doc_interval, flush_interval) 1578 return self.writer 1579 1580 def get_reader(self, partition=0): 1581 1582 "Return a reader for the index." 1583 1584 # Ensure that only one partition exists. 1585 1586 self.merge_terms() 1587 self.merge_fields() 1588 1589 return self._get_reader(partition) 1590 1591 def _get_reader(self, partition): 1592 1593 "Return a reader for the index." 1594 1595 if not exists(self.pathname): 1596 raise OSError, "Index path %r does not exist." % self.pathname 1597 1598 self.reader = IndexReader(self.pathname) 1599 return self.reader 1600 1601 def merge_terms(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL): 1602 1603 """ 1604 Merge term dictionaries using the given indexing 'interval' and 1605 'doc_interval'. 
1606 """ 1607 1608 readers = [] 1609 partitions = set() 1610 1611 for filename in listdir(self.pathname): 1612 if filename.startswith("terms-"): # 6 character prefix 1613 partition = filename[6:] 1614 readers.append(get_term_reader(self.pathname, partition)) 1615 partitions.add(partition) 1616 1617 # Write directly to a dictionary. 1618 1619 if len(readers) > 1: 1620 if "merged" in partitions: 1621 rename_term_files(self.pathname, "merged", "old-merged") 1622 partitions.remove("merged") 1623 partitions.add("old-merged") 1624 1625 writer = get_term_writer(self.pathname, "merged", interval, doc_interval) 1626 merger = TermDictionaryMerger(writer, readers) 1627 merger.merge() 1628 merger.close() 1629 1630 # Remove old files. 1631 1632 for partition in partitions: 1633 remove_term_files(self.pathname, partition) 1634 1635 elif len(readers) == 1: 1636 partition = list(partitions)[0] 1637 if partition != "merged": 1638 rename_term_files(self.pathname, partition, "merged") 1639 1640 def merge_fields(self, interval=FIELD_INTERVAL): 1641 1642 "Merge field dictionaries using the given indexing 'interval'." 1643 1644 readers = [] 1645 partitions = set() 1646 1647 for filename in listdir(self.pathname): 1648 if filename.startswith("fields-"): # 7 character prefix 1649 partition = filename[7:] 1650 readers.append(get_field_reader(self.pathname, partition)) 1651 partitions.add(partition) 1652 1653 # Write directly to a dictionary. 1654 1655 if len(readers) > 1: 1656 if "merged" in partitions: 1657 rename_field_files(self.pathname, "merged", "old-merged") 1658 partitions.remove("merged") 1659 partitions.add("old-merged") 1660 1661 writer = get_field_writer(self.pathname, "merged", interval) 1662 merger = FieldDictionaryMerger(writer, readers) 1663 merger.merge() 1664 merger.close() 1665 1666 # Remove old files. 
1667 1668 for partition in partitions: 1669 remove_field_files(self.pathname, partition) 1670 1671 elif len(readers) == 1: 1672 partition = list(partitions)[0] 1673 if partition != "merged": 1674 rename_field_files(self.pathname, partition, "merged") 1675 1676 def close(self): 1677 if self.reader is not None: 1678 self.reader.close() 1679 self.reader = None 1680 if self.writer is not None: 1681 self.writer.close() 1682 self.writer = None 1683 1684 # vim: tabstop=4 expandtab shiftwidth=4