1 #!/usr/bin/env python 2 3 """ 4 A simple (and sane) text indexing library. 5 6 Copyright (C) 2009 Paul Boddie <paul@boddie.org.uk> 7 8 This program is free software; you can redistribute it and/or modify it under 9 the terms of the GNU General Public License as published by the Free Software 10 Foundation; either version 3 of the License, or (at your option) any later 11 version. 12 13 This program is distributed in the hope that it will be useful, but WITHOUT ANY 14 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 15 PARTICULAR PURPOSE. See the GNU General Public License for more details. 16 17 You should have received a copy of the GNU General Public License along 18 with this program. If not, see <http://www.gnu.org/licenses/>. 19 """ 20 21 from os import dup, fdopen # independent iterator access to files 22 from os import listdir, mkdir # index and partition discovery 23 from os import remove, rename # partition manipulation 24 from os.path import exists, join 25 from os.path import commonprefix # to find common string prefixes 26 from bisect import bisect_right # to find terms in the dictionary index 27 from bisect import insort_right # to maintain a sorted list of data for merging 28 import bz2, zlib # for field compression 29 from itermerge import itermerge 30 31 try: 32 set 33 except NameError: 34 from sets import Set as set 35 36 # Constants. 37 38 TERM_INTERVAL = 100 39 DOCUMENT_INTERVAL = 100 40 FIELD_INTERVAL = 100 41 FLUSH_INTERVAL = 10000 42 43 TERM_FILENAMES = "terms", "terms_index", "positions", "positions_index" 44 FIELD_FILENAMES = "fields", "fields_index" 45 46 compressors = [("b", bz2.compress), ("z", zlib.compress)] 47 decompressors = {"b" : bz2.decompress, "z" : zlib.decompress} 48 49 # Foundation classes. 50 51 class File: 52 53 "A basic file abstraction." 
    def __init__(self, f):

        "Initialise the abstraction with the file object 'f'."

        self.f = f
        self.reset()

    def reset(self):

        "To be used to reset the state of the reader or writer between records."

        pass

    def rewind(self):

        "Seek to the start of the file, resetting the record state."

        self.f.seek(0)
        self.reset()

    def close(self):

        "Close the file, if not already closed."

        if self.f is not None:
            self.f.close()
            self.f = None

class FileWriter(File):

    "Writing basic data types to files."

    def write_number(self, number):

        "Write 'number' to the file using a variable length encoding."

        # Negative numbers are not supported.

        if number < 0:
            raise ValueError, "Number %r is negative." % number

        # Special case: one byte containing a 7-bit number.

        elif number < 128:
            self.f.write(chr(number))
            return

        # Write the number from least to most significant digits: seven bits
        # per byte, with the top bit set on every byte except the last.

        bytes = []

        while number != 0:
            lsd = number & 127
            number = number >> 7
            if number != 0:
                lsd |= 128
            bytes.append(chr(lsd))

        record = "".join(bytes)
        self.f.write(record)

    def write_string(self, s, compress=0):

        """
        Write 's' to the file, recording its length and compressing the string
        if 'compress' is set to a true value.
        """

        # Convert Unicode objects to strings.

        if isinstance(s, unicode):
            s = s.encode("utf-8")

        # Compress the string if requested.

        if compress:
            for flag, fn in compressors:
                cs = fn(s)

                # Take the first string shorter than the original.

                if len(cs) < len(s):
                    s = cs
                    break

            # Where no compressor helped, record the data as uncompressed.

            else:
                flag = "-"

            # Record whether compression was used.
            # NOTE(review): the flag byte is only written in compress mode;
            # read_string is symmetric, reading it only in decompress mode.

            self.f.write(flag)

        # Write the length of the data before the data itself.

        length = len(s)
        self.write_number(length)
        self.f.write(s)

class FileReader(File):

    "Reading basic data types from files."

    def read_number(self):

        "Read a number from the file."

        # Read each byte, adding it to the number.
        shift = 0
        number = 0
        more = 1

        while more:
            byte = self.f.read(1)
            if not byte:
                raise EOFError

            # The top bit indicates that more digit bytes follow; the low
            # seven bits hold the current digit (see write_number).

            csd = ord(byte)
            more = csd & 128 != 0
            if more:
                csd &= 127
            number += (csd << shift)
            shift += 7

        return number

    def read_string(self, decompress=0):

        """
        Read a string from the file, decompressing the stored data if
        'decompress' is set to a true value.
        """

        # Read the compression flag, if present.

        if decompress:
            flag = self.f.read(1)
        else:
            flag = "-"

        length = self.read_number()
        s = self.f.read(length)

        # Perform decompression if applicable.

        if flag != "-":
            fn = decompressors[flag]
            s = fn(s)

        # Convert strings to Unicode objects.

        return unicode(s, "utf-8")

# Specific classes for storing term and position information.

class PositionWriter(FileWriter):

    "Writing position information to files."

    def reset(self):
        self.last_docnum = 0

    def write_positions(self, docnum, positions):

        """
        Write for the document 'docnum' the given 'positions'.
        Return the offset of the written record.
        """

        if docnum < self.last_docnum:
            raise ValueError, "Document number %r is less than previous number %r." % (docnum, self.last_docnum)

        # Record the offset of this record.

        offset = self.f.tell()

        # Write the document number delta.

        self.write_number(docnum - self.last_docnum)

        # Write the number of positions.

        self.write_number(len(positions))

        # Make sure that the positions are sorted.
        # NOTE(review): this mutates the caller's list in place.

        positions.sort()

        # Write the position deltas.

        last = 0
        for position in positions:
            pos = position - last
            self.write_number(pos)
            last = position

        self.last_docnum = docnum

        return offset

class PositionReader(FileReader):

    "Reading position information from files."
    def reset(self):
        self.last_docnum = 0

    def read_positions(self):

        "Read positions, returning a document number and a list of positions."

        # Read the document number delta and add it to the last number.

        self.last_docnum += self.read_number()

        # Read the number of positions.

        npositions = self.read_number()

        # Read the position deltas, adding each previous position to get the
        # appropriate collection of absolute positions.

        i = 0
        last = 0
        positions = []

        while i < npositions:
            last += self.read_number()
            positions.append(last)
            i += 1

        return self.last_docnum, positions

    def read_term_positions(self, offset, count):

        """
        Read all positions from 'offset', seeking to that position in the file
        before reading. The number of documents available for reading is limited
        to 'count'.
        """

        # Duplicate the file handle so that the returned iterator reads
        # independently of this reader's own file position.

        f = fdopen(dup(self.f.fileno()), "rb")
        f.seek(offset)
        return PositionIterator(f, count)

class PositionIndexWriter(FileWriter):

    "Writing position index information to files."

    def reset(self):
        self.last_docnum = 0
        self.last_pos_offset = 0

    def write_positions(self, docnum, pos_offset, count):

        """
        Write the given 'docnum', 'pos_offset' and document 'count' to the
        position index file.
        """

        # Record the offset of this record.

        offset = self.f.tell()

        # Write the document number delta.

        self.write_number(docnum - self.last_docnum)
        self.last_docnum = docnum

        # Write the position file offset delta.

        self.write_number(pos_offset - self.last_pos_offset)
        self.last_pos_offset = pos_offset

        # Write the document count.

        self.write_number(count)

        return offset

class PositionIndexReader(FileReader):

    "Reading position index information from files."
    def reset(self):
        self.last_docnum = 0
        self.last_pos_offset = 0

    def read_positions(self):

        """
        Read a document number, a position file offset for the position index
        file, and the number of documents in a section of that file.
        """

        # Read the document number delta.

        self.last_docnum += self.read_number()

        # Read the offset delta.

        self.last_pos_offset += self.read_number()

        # Read the document count.

        count = self.read_number()

        return self.last_docnum, self.last_pos_offset, count

    def read_term_positions(self, offset, doc_frequency):

        """
        Read all positions from 'offset', seeking to that position in the file
        before reading. The number of documents available for reading is limited
        to 'doc_frequency'.
        """

        # Duplicate the file handle for independent iteration.

        f = fdopen(dup(self.f.fileno()), "rb")
        f.seek(offset)
        return PositionIndexIterator(f, doc_frequency)

# Iterators for position-related files.

class IteratorBase:

    "Support for iterating over a fixed number of documents."

    def __init__(self, count):

        "Initialise the iterator to provide at most 'count' documents."

        self.replenish(count)

    def replenish(self, count):

        "Make 'count' documents available, resetting the read document count."

        self.count = count
        self.read_documents = 0

    def __len__(self):

        "Return the total number of documents covered by the iterator."

        return self.count

    def sort(self):
        pass # Stored document positions are already sorted.

    def __iter__(self):
        return self

class PositionIterator(PositionReader, IteratorBase):

    "Iterating over document positions."

    def __init__(self, f, count):
        PositionReader.__init__(self, f)
        IteratorBase.__init__(self, count)

    def next(self):

        "Read positions for a single document."

        if self.read_documents < self.count:
            self.read_documents += 1
            return self.read_positions()
        else:
            raise StopIteration

class PositionIndexIterator(PositionIndexReader, IteratorBase):

    "Iterating over document positions."
    def __init__(self, f, count):
        PositionIndexReader.__init__(self, f)
        IteratorBase.__init__(self, count)
        self.section_count = 0

    def next(self):

        """
        Read the next index entry, covering a section of documents, as a
        (document number, position file offset, section count) tuple.
        """

        # Account for the documents covered by the previous section before
        # deciding whether any index entries remain.

        self.read_documents += self.section_count
        if self.read_documents < self.count:
            docnum, pos_offset, self.section_count = t = self.read_positions()
            return t
        else:
            raise StopIteration

class PositionDictionaryWriter:

    "Writing position dictionaries."

    def __init__(self, position_writer, position_index_writer, interval):

        """
        Initialise the writer with a 'position_writer', a
        'position_index_writer' and an indexing 'interval' giving the number of
        documents written to the position file for each index entry.
        """

        self.position_writer = position_writer
        self.position_index_writer = position_index_writer
        self.interval = interval

    def write_term_positions(self, doc_positions):

        """
        Write all 'doc_positions' - a collection of tuples of the form (document
        number, position list) - to the file.

        Add some records to the index, making dictionary entries.

        Return a tuple containing the offset of the written data, the frequency
        (number of positions), and document frequency (number of documents) for
        the term involved.
        """

        # Reset the writers.

        self.position_writer.reset()
        self.position_index_writer.reset()

        index_offset = None

        # Write the positions.

        frequency = 0
        first_docnum = None
        first_offset = None
        count = 0

        # NOTE(review): sorts the caller's collection in place.

        doc_positions.sort()

        for docnum, positions in doc_positions:
            pos_offset = self.position_writer.write_positions(docnum, positions)

            # Retain the first record offset for a subsequent index entry.

            if first_offset is None:
                first_offset = pos_offset
                first_docnum = docnum

            frequency += len(positions)
            count += 1

            # Every {interval} entries, write an index entry.

            if count == self.interval:
                io = self.position_index_writer.write_positions(first_docnum, first_offset, self.interval)

                # Remember the first index entry offset.
                if index_offset is None:
                    index_offset = io

                first_offset = None
                first_docnum = None
                count = 0

                # Reset the position writer so that position readers accessing
                # a section start with the correct document number.

                self.position_writer.reset()

        # Finish writing an index entry for the remaining documents.
        # (A for-else clause: the loop body contains no break, so this always
        # runs once the documents are exhausted.)

        else:
            if first_offset is not None:
                io = self.position_index_writer.write_positions(first_docnum, first_offset, count)

                # Remember the first index entry offset.

                if index_offset is None:
                    index_offset = io

        return index_offset, frequency, len(doc_positions)

    def close(self):

        "Close the underlying writers."

        self.position_writer.close()
        self.position_index_writer.close()

class PositionDictionaryReader:

    "Reading position dictionaries."

    def __init__(self, position_reader, position_index_reader):

        "Initialise the reader with a 'position_reader' and 'position_index_reader'."

        self.position_reader = position_reader
        self.position_index_reader = position_index_reader

    def read_term_positions(self, offset, doc_frequency):

        """
        Return an iterator for dictionary entries starting at 'offset' with the
        given 'doc_frequency'.
        """

        return PositionDictionaryIterator(self.position_reader,
            self.position_index_reader, offset, doc_frequency)

    def close(self):

        "Close the underlying readers."

        self.position_reader.close()
        self.position_index_reader.close()

class PositionDictionaryIterator:

    "Iteration over position dictionary entries."

    def __init__(self, position_reader, position_index_reader, offset, doc_frequency):
        self.position_reader = position_reader
        self.doc_frequency = doc_frequency
        self.index_iterator = position_index_reader.read_term_positions(offset, doc_frequency)

        # Remember the last values.

        self.found_docnum, self.found_positions = None, None

        # Maintain state for the next index entry, if read.
        self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None

        # Initialise the current index entry and current position file iterator.

        self._next_section()
        self._init_section()

    def __len__(self):

        "Return the number of documents providing positions for the term."

        return self.doc_frequency

    def sort(self):
        pass

    def __iter__(self):
        return self

    def next(self):

        """
        Attempt to get the next document record from the section in the
        positions file.
        """

        # Return any visited but unrequested record.

        if self.found_docnum is not None:
            t = self.found_docnum, self.found_positions
            self.found_docnum, self.found_positions = None, None
            return t

        # Or search for the next record.

        while 1:

            # Either return the next record.

            try:
                return self.iterator.next()

            # Or, where a section is finished, get the next section and try again.

            except StopIteration:

                # Where a section follows, update the index iterator, but keep
                # reading using the same file iterator (since the data should
                # just follow on from the last section).

                self._next_section()
                self.iterator.replenish(self.section_count)

                # Reset the state of the iterator to make sure that document
                # numbers are correct.

                self.iterator.reset()

    def from_document(self, docnum):

        """
        Attempt to navigate to a positions entry for the given 'docnum',
        returning the positions for 'docnum', or None otherwise.
        """

        # Return any unrequested document positions.

        if docnum == self.found_docnum:
            return self.found_positions

        # Read ahead in the index until the next entry refers to a document
        # later than the desired document.
        try:
            if self.next_docnum is None:
                self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next()

            # Read until the next entry is after the desired document number,
            # or until the end of the results.

            while self.next_docnum <= docnum:
                self._next_read_section()
                if self.docnum < docnum:
                    self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next()
                else:
                    break

        except StopIteration:
            pass

        # Navigate in the position file to the document.

        self._init_section()

        try:
            while 1:
                found_docnum, found_positions = self.iterator.next()

                # Return the desired document positions or None (retaining the
                # positions for the document immediately after).

                if docnum == found_docnum:
                    return found_positions
                elif docnum < found_docnum:
                    self.found_docnum, self.found_positions = found_docnum, found_positions
                    return None

        except StopIteration:
            return None

    # Internal methods.

    def _next_section(self):

        "Attempt to get the next section in the index."

        if self.next_docnum is None:
            self.docnum, self.pos_offset, self.section_count = self.index_iterator.next()
        else:
            self._next_read_section()

    def _next_read_section(self):

        """
        Make the next index entry the current one without reading from the
        index.
        """

        self.docnum, self.pos_offset, self.section_count = self.next_docnum, self.next_pos_offset, self.next_section_count
        self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None

    def _init_section(self):

        "Initialise the iterator for the section in the position file."

        self.iterator = self.position_reader.read_term_positions(self.pos_offset, self.section_count)

class TermWriter(FileWriter):

    "Writing term information to files."
    def reset(self):
        self.last_term = ""
        self.last_offset = 0

    def write_term(self, term, offset, frequency, doc_frequency):

        """
        Write the given 'term', its position file 'offset', its 'frequency' and
        its 'doc_frequency' (number of documents in which it appears) to the
        term information file. Return the offset after the term information was
        written to the file.
        """

        # Write the prefix length and term suffix (front coding against the
        # previously written term).

        common = len(commonprefix([self.last_term, term]))
        suffix = term[common:]

        self.write_number(common)
        self.write_string(suffix)

        # Write the offset delta.

        self.write_number(offset - self.last_offset)

        # Write the frequency.

        self.write_number(frequency)

        # Write the document frequency.

        self.write_number(doc_frequency)

        self.last_term = term
        self.last_offset = offset

        return self.f.tell()

class TermReader(FileReader):

    "Reading term information from files."

    def reset(self):
        self.last_term = ""
        self.last_offset = 0

    def read_term(self):

        """
        Read a term, its position file offset, its frequency and its document
        frequency from the term information file.
        """

        # Read the prefix length and term suffix.

        common = self.read_number()
        suffix = self.read_string()

        self.last_term = self.last_term[:common] + suffix

        # Read the offset delta.

        self.last_offset += self.read_number()

        # Read the frequency.

        frequency = self.read_number()

        # Read the document frequency.

        doc_frequency = self.read_number()

        return self.last_term, self.last_offset, frequency, doc_frequency

    def go_to_term(self, term, offset, info_offset):

        """
        Seek past the entry for 'term' having 'offset' to 'info_offset'. This
        permits the scanning for later terms from the specified term.
        """

        self.f.seek(info_offset)
        self.last_term = term
        self.last_offset = offset

class TermIndexWriter(TermWriter):

    "Writing term dictionary index details to files."

    def reset(self):
        TermWriter.reset(self)
        self.last_info_offset = 0

    def write_term(self, term, offset, frequency, doc_frequency, info_offset):

        """
        Write the given 'term', its position file 'offset', its 'frequency' and
        its 'doc_frequency' to the term dictionary index file, along with the
        'info_offset' in the term information file.
        """

        TermWriter.write_term(self, term, offset, frequency, doc_frequency)

        # Write the information file offset delta.

        self.write_number(info_offset - self.last_info_offset)
        self.last_info_offset = info_offset

class TermIndexReader(TermReader):

    "Reading term dictionary index details from files."

    def reset(self):
        TermReader.reset(self)
        self.last_info_offset = 0

    def read_term(self):

        """
        Read a term, its position file offset, its frequency, its document
        frequency and a term information file offset from the term dictionary
        index file.
        """

        term, offset, frequency, doc_frequency = TermReader.read_term(self)

        # Read the offset delta.

        self.last_info_offset += self.read_number()

        return term, offset, frequency, doc_frequency, self.last_info_offset

class TermDictionaryWriter:

    "Writing term dictionaries."
    def __init__(self, info_writer, index_writer, position_dict_writer, interval):

        """
        Initialise the writer with an 'info_writer' (term information), an
        'index_writer' (term dictionary index), a 'position_dict_writer' and an
        indexing 'interval' giving the number of terms per index entry.
        """

        self.info_writer = info_writer
        self.index_writer = index_writer
        self.position_dict_writer = position_dict_writer
        self.interval = interval
        self.entry = 0

    def _write_term(self, term, offset, frequency, doc_frequency):

        """
        Write the given 'term', its position file 'offset', its 'frequency' and
        its 'doc_frequency' (number of documents in which it appears) to the
        term information file, adding an index entry for every {interval}
        terms written.
        """

        info_offset = self.info_writer.write_term(term, offset, frequency, doc_frequency)

        if self.entry % self.interval == 0:
            self.index_writer.write_term(term, offset, frequency, doc_frequency, info_offset)

        self.entry += 1

    def write_term_positions(self, term, doc_positions):

        """
        Write the given 'term' and the 'doc_positions' recording the documents
        and positions at which the term is found.
        """

        offset, frequency, doc_frequency = self.position_dict_writer.write_term_positions(doc_positions)
        self._write_term(term, offset, frequency, doc_frequency)

    def close(self):

        "Close the underlying writers."

        self.info_writer.close()
        self.index_writer.close()
        self.position_dict_writer.close()

class TermDictionaryReader:

    "Reading term dictionaries."

    def __init__(self, info_reader, index_reader, position_dict_reader):

        """
        Initialise the reader with an 'info_reader', 'index_reader' and
        'position_dict_reader', loading the entire term index into memory for
        bisection.
        """

        self.info_reader = info_reader
        self.index_reader = index_reader
        self.position_dict_reader = position_dict_reader

        self.terms = []
        try:
            while 1:
                self.terms.append(self.index_reader.read_term())
        except EOFError:
            pass

        # Large numbers for ordering purposes.
        # An offset beyond any stored offset, so that bisection keys compare
        # after every real entry with the same term.

        if self.terms:
            self.max_offset = self.terms[-1][1] + 1
        else:
            self.max_offset = None

    def _find_closest_entry(self, term):

        """
        Find the offsets and frequencies of 'term' from the term dictionary or
        the closest term starting with the value of 'term'.

        Return the closest index entry consisting of a term, the position file
        offset, the term frequency, the document frequency, and the term details
        file offset.
        """

        i = bisect_right(self.terms, (term, self.max_offset, 0, 0)) - 1

        # Get the entry position providing the term or one preceding it.
        # If no entry precedes the requested term, return the very first entry
        # as the closest.

        if i == -1:
            return self.terms[0]
        else:
            return self.terms[i]

    def _find_closest_term(self, term):

        """
        Find the offsets and frequencies of 'term' from the term dictionary or
        the closest term starting with the value of 'term'.

        Return the closest term (or the term itself), the position file offset,
        the term frequency, the document frequency, and the term details file
        offset (or None if the reader is already positioned).
        """

        found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_entry(term)

        # Where the term is found immediately, return the offset and
        # frequencies. If the term does not appear, return the details of the
        # closest entry.

        if term <= found_term:
            return found_term, offset, frequency, doc_frequency, info_offset

        # Otherwise, seek past the index term's entry in the information file
        # and scan for the desired term.
        else:
            self.info_reader.go_to_term(found_term, offset, info_offset)
            try:
                while term > found_term:
                    found_term, offset, frequency, doc_frequency = self.info_reader.read_term()
            except EOFError:
                pass

            return found_term, offset, frequency, doc_frequency, None

    def _find_term(self, term):

        """
        Find the position file offset and frequency of 'term' from the term
        dictionary, returning None if the term is not present.
        """

        found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_term(term)

        # If the term is found, return the offset and frequencies.

        if term == found_term:
            return offset, frequency, doc_frequency
        else:
            return None

    def _get_positions(self, offset, doc_frequency):
        return self.position_dict_reader.read_term_positions(offset, doc_frequency)

    # Sequential access methods.

    def rewind(self):
        self.info_reader.rewind()

    def read_term(self):

        """
        Return the next term, its frequency, its document frequency, and the
        documents and positions at which the term is found.
        """

        term, offset, frequency, doc_frequency = self.info_reader.read_term()
        positions = self._get_positions(offset, doc_frequency)
        return term, frequency, doc_frequency, positions

    # Query methods.

    def find_terms(self, term):

        "Return all terms whose values start with the value of 'term'."

        terms = []

        found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_term(term)

        # Position the reader, if necessary.

        if info_offset is not None:
            self.info_reader.go_to_term(found_term, offset, info_offset)

        # Read and record terms.

        try:
            # Add the found term if it starts with the specified term.
            while found_term.startswith(term):
                terms.append(found_term)
                found_term, offset, frequency, doc_frequency = self.info_reader.read_term()

        except EOFError:
            pass

        return terms

    def find_positions(self, term):

        "Return the documents and positions at which the given 'term' is found."

        t = self._find_term(term)
        if t is None:
            return None
        else:
            offset, frequency, doc_frequency = t
            return self._get_positions(offset, doc_frequency)

    def get_frequency(self, term):

        "Return the frequency of the given 'term'."

        t = self._find_term(term)
        if t is None:
            return None
        else:
            offset, frequency, doc_frequency = t
            return frequency

    def get_document_frequency(self, term):

        "Return the document frequency of the given 'term'."

        t = self._find_term(term)
        if t is None:
            return None
        else:
            offset, frequency, doc_frequency = t
            return doc_frequency

    def close(self):

        "Close the underlying readers."

        self.info_reader.close()
        self.index_reader.close()
        self.position_dict_reader.close()

# Specific classes for storing document information.

class FieldWriter(FileWriter):

    "Writing field data to files."

    def reset(self):
        self.last_docnum = 0

    def write_fields(self, docnum, fields):

        """
        Write for the given 'docnum', a list of 'fields' (integer, string pairs
        representing field identifiers and values respectively).
        Return the offset at which the fields are stored.
        """

        offset = self.f.tell()

        # Write the document number delta.

        self.write_number(docnum - self.last_docnum)

        # Write the number of fields.

        self.write_number(len(fields))

        # Write the fields themselves.
        for i, field in fields:
            self.write_number(i)
            self.write_string(field, 1) # compress

        self.last_docnum = docnum
        return offset

class FieldReader(FileReader):

    "Reading field data from files."

    def reset(self):
        self.last_docnum = 0

    def read_fields(self):

        """
        Read fields from the file, returning a tuple containing the document
        number and a list of field (identifier, value) pairs.
        """

        # Read the document number.

        self.last_docnum += self.read_number()

        # Read the number of fields.

        nfields = self.read_number()

        # Collect the fields.

        fields = []
        i = 0

        while i < nfields:
            identifier = self.read_number()
            value = self.read_string(1) # decompress
            fields.append((identifier, value))
            i += 1

        return self.last_docnum, fields

    def read_document_fields(self, docnum, offset):

        """
        Read fields for 'docnum' at the given 'offset'. This permits the
        retrieval of details for the specified document, as well as scanning for
        later documents.
        """

        self.f.seek(offset)

        # The document number decoded from the delta is meaningless after an
        # arbitrary seek, so the caller-supplied 'docnum' is used instead.

        bad_docnum, fields = self.read_fields()
        self.last_docnum = docnum
        return docnum, fields

class FieldIndexWriter(FileWriter):

    "Writing field index details to files."

    def reset(self):
        self.last_docnum = 0
        self.last_offset = 0

    def write_document(self, docnum, offset):

        """
        Write for the given 'docnum', the 'offset' at which the fields for the
        document are stored in the fields file.
        """

        # Write the document number and offset deltas.
        self.write_number(docnum - self.last_docnum)
        self.write_number(offset - self.last_offset)

        self.last_docnum = docnum
        self.last_offset = offset

class FieldIndexReader(FileReader):

    "Reading field index details from files."

    def reset(self):
        self.last_docnum = 0
        self.last_offset = 0

    def read_document(self):

        "Read a document number and field file offset."

        # Read the document number delta and offset.

        self.last_docnum += self.read_number()
        self.last_offset += self.read_number()

        return self.last_docnum, self.last_offset

class FieldDictionaryWriter:

    "Writing field dictionary details."

    def __init__(self, field_writer, field_index_writer, interval):

        """
        Initialise the writer with a 'field_writer', a 'field_index_writer' and
        an indexing 'interval' giving the number of documents per index entry.
        """

        self.field_writer = field_writer
        self.field_index_writer = field_index_writer
        self.interval = interval
        self.entry = 0

    def write_fields(self, docnum, fields):

        "Write details of the document with the given 'docnum' and 'fields'."

        offset = self.field_writer.write_fields(docnum, fields)

        if self.entry % self.interval == 0:
            self.field_index_writer.write_document(docnum, offset)

        self.entry += 1

    def close(self):

        "Close the underlying writers."

        self.field_writer.close()
        self.field_index_writer.close()

class FieldDictionaryReader:

    "Reading field dictionary details."

    def __init__(self, field_reader, field_index_reader):

        """
        Initialise the reader with a 'field_reader' and 'field_index_reader',
        loading the entire field index into memory for bisection.
        """

        self.field_reader = field_reader
        self.field_index_reader = field_index_reader

        self.docs = []
        try:
            while 1:
                self.docs.append(self.field_index_reader.read_document())
        except EOFError:
            pass

        # Large numbers for ordering purposes.
        if self.docs:
            self.max_offset = self.docs[-1][1]
        else:
            self.max_offset = None

    def rewind(self):

        "Rewind the field reader for sequential access."

        self.field_reader.rewind()

    def read_fields(self):

        "Return the next document number and fields."

        return self.field_reader.read_fields()

    def get_fields(self, docnum):

        "Read the fields of the document with the given 'docnum'."

        i = bisect_right(self.docs, (docnum, self.max_offset)) - 1

        # Get the entry position providing the document or one preceding it.

        if i == -1:
            return None

        found_docnum, offset = self.docs[i]

        # Read from the fields file.

        found_docnum, fields = self.field_reader.read_document_fields(found_docnum, offset)

        # Scan for the document, if necessary.

        try:
            while docnum > found_docnum:
                found_docnum, fields = self.field_reader.read_fields()
        except EOFError:
            pass

        # If the document is found, return the fields.

        if docnum == found_docnum:
            return fields
        else:
            return None

    def close(self):

        "Close the underlying readers."

        self.field_reader.close()
        self.field_index_reader.close()

# Dictionary merging classes.

class Merger:

    "Merge files."

    def __init__(self, writer, readers):

        "Initialise the merger with a 'writer' and a collection of 'readers'."

        self.writer = writer
        self.readers = readers

    def close(self):

        "Close the readers and the writer."

        for reader in self.readers:
            reader.close()
        self.writer.close()

class TermDictionaryMerger(Merger):

    "Merge term and position files."

    def merge(self):

        """
        Merge terms and positions from the readers, sending them to the writer.
        """

        entries = []

        # Get the first entries from the readers.
1289 1290 for partition, reader in enumerate(self.readers): 1291 reader.rewind() 1292 1293 try: 1294 term, frequency, doc_frequency, positions = reader.read_term() 1295 insort_right(entries, (term, positions, partition)) 1296 except EOFError: 1297 pass 1298 1299 # While entries are available, write them out in order, merging where 1300 # appropriate. 1301 1302 while entries: 1303 term, doc_positions, partition = entries[0] 1304 to_update = [partition] 1305 1306 nentries = len(entries) 1307 i = 1 1308 1309 # Find other entries for the term. 1310 1311 while i < nentries: 1312 other_term, other_doc_positions, other_partition = entries[i] 1313 1314 # For such entries, merge the positions. 1315 1316 if other_term == term: 1317 doc_positions = itermerge(doc_positions, other_doc_positions) 1318 to_update.append(other_partition) 1319 i += 1 1320 else: 1321 break 1322 1323 # Write the combined term details. 1324 1325 self.writer.write_term_positions(term, doc_positions) 1326 1327 # Update the entries from the affected readers. 1328 1329 del entries[:i] 1330 1331 for partition in to_update: 1332 try: 1333 term, frequency, doc_frequency, positions = self.readers[partition].read_term() 1334 insort_right(entries, (term, positions, partition)) 1335 except EOFError: 1336 pass 1337 1338 class FieldDictionaryMerger(Merger): 1339 1340 "Merge field files." 1341 1342 def merge(self): 1343 1344 """ 1345 Merge fields from the readers, sending them to the writer. 1346 """ 1347 1348 entries = [] 1349 1350 # Get the first entries from the readers. 1351 1352 for partition, reader in enumerate(self.readers): 1353 reader.rewind() 1354 1355 try: 1356 docnum, fields = reader.read_fields() 1357 insort_right(entries, (docnum, fields, partition)) 1358 except EOFError: 1359 pass 1360 1361 # While entries are available, write them out in order, merging where 1362 # appropriate. 
Since fields from one document should only appear in a 1363 # single partition, only one partition will be updated at a time. 1364 1365 while entries: 1366 docnum, fields, partition = entries[0] 1367 1368 # Write the combined term details. 1369 1370 self.writer.write_fields(docnum, fields) 1371 1372 # Update the entries from the affected readers. 1373 1374 del entries[0] 1375 1376 try: 1377 docnum, fields = self.readers[partition].read_fields() 1378 insort_right(entries, (docnum, fields, partition)) 1379 except EOFError: 1380 pass 1381 1382 # Utility functions. 1383 1384 def get_term_writer(pathname, partition, interval, doc_interval): 1385 1386 """ 1387 Return a term dictionary writer using files under the given 'pathname' 1388 labelled according to the given 'partition', using the given indexing 1389 'interval' for terms and 'doc_interval' for document position records. 1390 """ 1391 1392 tdf = open(join(pathname, "terms-%s" % partition), "wb") 1393 info_writer = TermWriter(tdf) 1394 1395 tdif = open(join(pathname, "terms_index-%s" % partition), "wb") 1396 index_writer = TermIndexWriter(tdif) 1397 1398 tpf = open(join(pathname, "positions-%s" % partition), "wb") 1399 positions_writer = PositionWriter(tpf) 1400 1401 tpif = open(join(pathname, "positions_index-%s" % partition), "wb") 1402 positions_index_writer = PositionIndexWriter(tpif) 1403 1404 positions_dict_writer = PositionDictionaryWriter(positions_writer, positions_index_writer, doc_interval) 1405 1406 return TermDictionaryWriter(info_writer, index_writer, positions_dict_writer, interval) 1407 1408 def get_field_writer(pathname, partition, interval): 1409 1410 """ 1411 Return a field dictionary writer using files under the given 'pathname' 1412 labelled according to the given 'partition', using the given indexing 1413 'interval'. 
1414 """ 1415 1416 ff = open(join(pathname, "fields-%s" % partition), "wb") 1417 field_writer = FieldWriter(ff) 1418 1419 fif = open(join(pathname, "fields_index-%s" % partition), "wb") 1420 field_index_writer = FieldIndexWriter(fif) 1421 1422 return FieldDictionaryWriter(field_writer, field_index_writer, interval) 1423 1424 def get_term_reader(pathname, partition): 1425 1426 """ 1427 Return a term dictionary reader using files under the given 'pathname' 1428 labelled according to the given 'partition'. 1429 """ 1430 1431 tdf = open(join(pathname, "terms-%s" % partition), "rb") 1432 info_reader = TermReader(tdf) 1433 1434 tdif = open(join(pathname, "terms_index-%s" % partition), "rb") 1435 index_reader = TermIndexReader(tdif) 1436 1437 tpf = open(join(pathname, "positions-%s" % partition), "rb") 1438 positions_reader = PositionReader(tpf) 1439 1440 tpif = open(join(pathname, "positions_index-%s" % partition), "rb") 1441 positions_index_reader = PositionIndexReader(tpif) 1442 1443 positions_dict_reader = PositionDictionaryReader(positions_reader, positions_index_reader) 1444 1445 return TermDictionaryReader(info_reader, index_reader, positions_dict_reader) 1446 1447 def get_field_reader(pathname, partition): 1448 1449 """ 1450 Return a field dictionary reader using files under the given 'pathname' 1451 labelled according to the given 'partition'. 
1452 """ 1453 1454 ff = open(join(pathname, "fields-%s" % partition), "rb") 1455 field_reader = FieldReader(ff) 1456 1457 fif = open(join(pathname, "fields_index-%s" % partition), "rb") 1458 field_index_reader = FieldIndexReader(fif) 1459 1460 return FieldDictionaryReader(field_reader, field_index_reader) 1461 1462 def rename_files(pathname, names, from_partition, to_partition): 1463 for name in names: 1464 rename(join(pathname, "%s-%s" % (name, from_partition)), join(pathname, "%s-%s" % (name, to_partition))) 1465 1466 def rename_term_files(pathname, from_partition, to_partition): 1467 rename_files(pathname, TERM_FILENAMES, from_partition, to_partition) 1468 1469 def rename_field_files(pathname, from_partition, to_partition): 1470 rename_files(pathname, FIELD_FILENAMES, from_partition, to_partition) 1471 1472 def remove_files(pathname, names, partition): 1473 for name in names: 1474 remove(join(pathname, "%s-%s" % (name, partition))) 1475 1476 def remove_term_files(pathname, partition): 1477 remove_files(pathname, TERM_FILENAMES, partition) 1478 1479 def remove_field_files(pathname, partition): 1480 remove_files(pathname, FIELD_FILENAMES, partition) 1481 1482 # High-level classes. 1483 1484 class Document: 1485 1486 "A container of document information." 1487 1488 def __init__(self, docnum): 1489 self.docnum = docnum 1490 self.fields = [] 1491 self.terms = {} 1492 1493 def add_position(self, term, position): 1494 1495 """ 1496 Add a position entry for the given 'term', indicating the given 1497 'position'. 1498 """ 1499 1500 self.terms.setdefault(term, []).append(position) 1501 1502 def add_field(self, identifier, value): 1503 1504 "Add a field having the given 'identifier' and 'value'." 1505 1506 self.fields.append((identifier, unicode(value))) # convert to string 1507 1508 def set_fields(self, fields): 1509 1510 """ 1511 Set the document's 'fields': a list of tuples each containing an integer 1512 identifier and a string value. 
1513 """ 1514 1515 self.fields = fields 1516 1517 class IndexWriter: 1518 1519 """ 1520 Building term information and writing it to the term and field dictionaries. 1521 """ 1522 1523 def __init__(self, pathname, interval, doc_interval, flush_interval): 1524 self.pathname = pathname 1525 self.interval = interval 1526 self.doc_interval = doc_interval 1527 self.flush_interval = flush_interval 1528 1529 self.dict_partition = 0 1530 self.field_dict_partition = 0 1531 1532 self.terms = {} 1533 self.docs = {} 1534 1535 self.doc_counter = 0 1536 1537 def add_document(self, doc): 1538 1539 """ 1540 Add the given document 'doc', updating the document counter and flushing 1541 terms and fields if appropriate. 1542 """ 1543 1544 for term, positions in doc.terms.items(): 1545 self.terms.setdefault(term, {})[doc.docnum] = positions 1546 1547 self.docs[doc.docnum] = doc.fields 1548 1549 self.doc_counter += 1 1550 if self.flush_interval and self.doc_counter >= self.flush_interval: 1551 self.flush_terms() 1552 self.flush_fields() 1553 self.doc_counter = 0 1554 1555 def get_term_writer(self): 1556 1557 "Return a term dictionary writer for the current partition." 1558 1559 return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval) 1560 1561 def get_field_writer(self): 1562 1563 "Return a field dictionary writer for the current partition." 1564 1565 return get_field_writer(self.pathname, self.field_dict_partition, self.interval) 1566 1567 def flush_terms(self): 1568 1569 "Flush terms into the current term dictionary partition." 1570 1571 # Get the terms in order. 
1572 1573 all_terms = self.terms 1574 terms = all_terms.keys() 1575 terms.sort() 1576 1577 dict_writer = self.get_term_writer() 1578 1579 for term in terms: 1580 doc_positions = all_terms[term].items() 1581 dict_writer.write_term_positions(term, doc_positions) 1582 1583 dict_writer.close() 1584 1585 self.terms = {} 1586 self.dict_partition += 1 1587 1588 def flush_fields(self): 1589 1590 "Flush fields into the current term dictionary partition." 1591 1592 # Get the documents in order. 1593 1594 docs = self.docs.items() 1595 docs.sort() 1596 1597 field_dict_writer = self.get_field_writer() 1598 1599 for docnum, fields in docs: 1600 field_dict_writer.write_fields(docnum, fields) 1601 1602 field_dict_writer.close() 1603 1604 self.docs = {} 1605 self.field_dict_partition += 1 1606 1607 def close(self): 1608 if self.terms: 1609 self.flush_terms() 1610 if self.docs: 1611 self.flush_fields() 1612 1613 class IndexReader: 1614 1615 "Accessing the term and field dictionaries." 1616 1617 def __init__(self, pathname): 1618 self.dict_reader = get_term_reader(pathname, "merged") 1619 self.field_dict_reader = get_field_reader(pathname, "merged") 1620 1621 def find_terms(self, term): 1622 return self.dict_reader.find_terms(term) 1623 1624 def find_positions(self, term): 1625 return self.dict_reader.find_positions(term) 1626 1627 def get_frequency(self, term): 1628 return self.dict_reader.get_frequency(term) 1629 1630 def get_document_frequency(self, term): 1631 return self.dict_reader.get_document_frequency(term) 1632 1633 def get_fields(self, docnum): 1634 return self.field_dict_reader.get_fields(docnum) 1635 1636 def close(self): 1637 self.dict_reader.close() 1638 self.field_dict_reader.close() 1639 1640 class Index: 1641 1642 "An inverted index solution encapsulating the various components." 
1643 1644 def __init__(self, pathname): 1645 self.pathname = pathname 1646 self.reader = None 1647 self.writer = None 1648 1649 def get_writer(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, flush_interval=FLUSH_INTERVAL): 1650 1651 """ 1652 Return a writer, optionally using the given indexing 'interval', 1653 'doc_interval' and 'flush_interval'. 1654 """ 1655 1656 if not exists(self.pathname): 1657 mkdir(self.pathname) 1658 1659 self.writer = IndexWriter(self.pathname, interval, doc_interval, flush_interval) 1660 return self.writer 1661 1662 def get_reader(self, partition=0): 1663 1664 "Return a reader for the index." 1665 1666 # Ensure that only one partition exists. 1667 1668 self.merge() 1669 return self._get_reader(partition) 1670 1671 def _get_reader(self, partition): 1672 1673 "Return a reader for the index." 1674 1675 if not exists(self.pathname): 1676 raise OSError, "Index path %r does not exist." % self.pathname 1677 1678 self.reader = IndexReader(self.pathname) 1679 return self.reader 1680 1681 def merge(self): 1682 1683 "Merge/optimise index partitions." 1684 1685 self.merge_terms() 1686 self.merge_fields() 1687 1688 def merge_terms(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL): 1689 1690 """ 1691 Merge term dictionaries using the given indexing 'interval' and 1692 'doc_interval'. 1693 """ 1694 1695 readers = [] 1696 partitions = set() 1697 1698 for filename in listdir(self.pathname): 1699 if filename.startswith("terms-"): # 6 character prefix 1700 partition = filename[6:] 1701 readers.append(get_term_reader(self.pathname, partition)) 1702 partitions.add(partition) 1703 1704 # Write directly to a dictionary. 
1705 1706 if len(readers) > 1: 1707 if "merged" in partitions: 1708 rename_term_files(self.pathname, "merged", "old-merged") 1709 partitions.remove("merged") 1710 partitions.add("old-merged") 1711 1712 writer = get_term_writer(self.pathname, "merged", interval, doc_interval) 1713 merger = TermDictionaryMerger(writer, readers) 1714 merger.merge() 1715 merger.close() 1716 1717 # Remove old files. 1718 1719 for partition in partitions: 1720 remove_term_files(self.pathname, partition) 1721 1722 elif len(readers) == 1: 1723 partition = list(partitions)[0] 1724 if partition != "merged": 1725 rename_term_files(self.pathname, partition, "merged") 1726 1727 def merge_fields(self, interval=FIELD_INTERVAL): 1728 1729 "Merge field dictionaries using the given indexing 'interval'." 1730 1731 readers = [] 1732 partitions = set() 1733 1734 for filename in listdir(self.pathname): 1735 if filename.startswith("fields-"): # 7 character prefix 1736 partition = filename[7:] 1737 readers.append(get_field_reader(self.pathname, partition)) 1738 partitions.add(partition) 1739 1740 # Write directly to a dictionary. 1741 1742 if len(readers) > 1: 1743 if "merged" in partitions: 1744 rename_field_files(self.pathname, "merged", "old-merged") 1745 partitions.remove("merged") 1746 partitions.add("old-merged") 1747 1748 writer = get_field_writer(self.pathname, "merged", interval) 1749 merger = FieldDictionaryMerger(writer, readers) 1750 merger.merge() 1751 merger.close() 1752 1753 # Remove old files. 1754 1755 for partition in partitions: 1756 remove_field_files(self.pathname, partition) 1757 1758 elif len(readers) == 1: 1759 partition = list(partitions)[0] 1760 if partition != "merged": 1761 rename_field_files(self.pathname, partition, "merged") 1762 1763 def close(self): 1764 if self.reader is not None: 1765 self.reader.close() 1766 self.reader = None 1767 if self.writer is not None: 1768 self.writer.close() 1769 self.writer = None 1770 1771 # vim: tabstop=4 expandtab shiftwidth=4