#!/usr/bin/env python

"""
A simple (and sane) text indexing library.

Copyright (C) 2009 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with this program. If not, see <http://www.gnu.org/licenses/>.
"""

from os import listdir, mkdir # index and partition discovery
from os import remove, rename # partition manipulation
from os.path import exists, join
from os.path import commonprefix # to find common string prefixes
from bisect import bisect_right # to find terms in the dictionary index
from bisect import insort_right # to maintain a sorted list of data for merging
import bz2, zlib # for field compression

# Constants.

# Default indexing interval: one dictionary index entry is written for every
# INTERVAL term (or document) entries.

INTERVAL = 100

# Default number of positions (or fields) buffered by an index writer before
# they are flushed to a new partition.

FLUSH_INTERVAL = 1000000

# Compressors are tried in order; the flag character of the one used is stored
# with the data so that the matching decompressor can be found when reading.

compressors = [("b", bz2.compress), ("z", zlib.compress)]
decompressors = {"b" : bz2.decompress, "z" : zlib.decompress}

# Foundation classes.

class File:

    "A basic file abstraction."

    def __init__(self, f):
        self.f = f
        self.reset()

    def reset(self):

        "To be used to reset the state of the reader or writer between records."

        pass

    def rewind(self):

        "Seek back to the start of the file and reset any record state."

        self.f.seek(0)
        self.reset()

    def close(self):

        "Close the underlying file if it is still open."

        if self.f is not None:
            self.f.close()
            self.f = None

class FileWriter(File):

    "Writing basic data types to files."

    def write_number(self, number):

        """
        Write 'number' to the file using a variable length encoding: seven bits
        per byte, least significant group first, with the high bit of a byte
        set when further bytes follow. Raise ValueError for negative numbers.
        """

        # Negative numbers are not supported.

        if number < 0:
            raise ValueError("Number %r is negative." % number)

        # Special case: one byte containing zero.

        if number == 0:
            self.f.write(chr(0))
            return

        # Write the number from least to most significant digits.

        digits = []

        while number != 0:
            lsd = number & 127
            number = number >> 7
            if number != 0:
                lsd |= 128
            digits.append(chr(lsd))

        self.f.write("".join(digits))

    def write_string(self, s, compress=0):

        """
        Write 's' to the file, recording its length and compressing the string
        if 'compress' is set to a true value. Unicode objects are stored as
        UTF-8. When compressing, a one-character flag is written first: the
        flag of the first compressor whose output is shorter than the input,
        or "-" when no compressor helped and the data is stored unchanged.
        """

        # Convert Unicode objects to strings.

        if isinstance(s, unicode):
            s = s.encode("utf-8")

        # Compress the string if requested.

        if compress:
            for flag, fn in compressors:
                cs = fn(s)

                # Take the first string shorter than the original.

                if len(cs) < len(s):
                    s = cs
                    break
            else:
                flag = "-"

            # Record whether compression was used.

            self.f.write(flag)

        # Write the length of the data before the data itself.

        self.write_number(len(s))
        self.f.write(s)

class FileReader(File):

    "Reading basic data types from files."

    def read_number(self):

        """
        Read a number written by FileWriter.write_number. Raise EOFError if the
        file ends before the number is complete.
        """

        # Read each byte, adding its seven data bits to the number.

        shift = 0
        number = 0
        more = 1

        while more:
            byte = self.f.read(1)
            if not byte:
                raise EOFError

            csd = ord(byte)
            more = (csd & 128) != 0
            if more:
                csd &= 127
            number += (csd << shift)
            shift += 7

        return number

    def read_string(self, decompress=0):

        """
        Read a string from the file, decompressing the stored data if
        'decompress' is set to a true value, and return it as a Unicode object
        decoded from UTF-8. 'decompress' must match the 'compress' setting used
        when the string was written, since only then is a flag present.
        """

        # A compression flag is only present for strings written compressed.

        if decompress:
            flag = self.f.read(1)
        else:
            flag = "-"

        length = self.read_number()
        s = self.f.read(length)

        # Perform decompression if applicable.

        if flag != "-":
            fn = decompressors[flag]
            s = fn(s)

        # Convert strings to Unicode objects.

        return unicode(s, "utf-8")

# Specific classes for storing term and position information.

class PositionWriter(FileWriter):

    "Writing position information to files."

    def reset(self):
        self.last_docnum = 0

    def write_positions(self, docnum, positions):

        """
        Write for the document 'docnum' the given 'positions'. Document numbers
        are stored as deltas and must therefore be written in non-decreasing
        order; ValueError is raised otherwise. 'positions' is not modified.
        """

        if docnum < self.last_docnum:
            raise ValueError("Document number %r is less than previous number %r." % (docnum, self.last_docnum))

        # Write the document number delta.

        self.write_number(docnum - self.last_docnum)

        # Write the number of positions.

        self.write_number(len(positions))

        # Write the position deltas. These require sorted positions; a sorted
        # copy is used so that the caller's list is left untouched.

        last = 0
        for position in sorted(positions):
            self.write_number(position - last)
            last = position

        self.last_docnum = docnum

    def write_term_positions(self, doc_positions):

        """
        Write all 'doc_positions' - a collection of tuples of the form (document
        number, position list) - to the file, returning a tuple containing the
        offset at which they were stored together with the frequency (number of
        positions) for the term involved.
        """

        # Reset the writer and record the current file offset.

        self.reset()
        offset = self.f.tell()

        # Write the number of documents.

        self.write_number(len(doc_positions))

        # Write the positions.

        frequency = 0

        for docnum, positions in doc_positions:
            self.write_positions(docnum, positions)
            frequency += len(positions)

        return offset, frequency

class PositionReader(FileReader):

    "Reading position information from files."

    def reset(self):
        self.last_docnum = 0

    def read_positions(self):

        "Read positions, returning a document number and a list of positions."

        # Read the document number delta and add it to the last number.

        self.last_docnum += self.read_number()

        # Read the number of positions.

        npositions = self.read_number()

        # Read the position deltas, adding each previous position to get the
        # appropriate collection of absolute positions.

        last = 0
        positions = []

        for i in range(npositions):
            last += self.read_number()
            positions.append(last)

        return self.last_docnum, positions

    def read_term_positions(self, offset):

        """
        Read all positions from 'offset', seeking to that position in the file
        before reading. Return a list of (document number, positions) tuples.
        """

        self.reset()
        self.f.seek(offset)

        # Read the number of documents.

        ndocuments = self.read_number()

        # Read all records.

        doc_positions = []

        for i in range(ndocuments):
            doc_positions.append(self.read_positions())

        return doc_positions

class TermWriter(FileWriter):

    "Writing term information to files."

    def reset(self):
        self.last_term = ""
        self.last_offset = 0

    def write_term(self, term, offset, frequency):

        """
        Write the given 'term', its position file 'offset', and its 'frequency'
        to the term information file. Return the offset after the term
        information was written to the file.
        """

        # Write the length of the prefix shared with the previous term, then
        # the remaining suffix of this term.

        common = len(commonprefix([self.last_term, term]))
        suffix = term[common:]

        self.write_number(common)
        self.write_string(suffix)

        # Write the offset delta.

        self.write_number(offset - self.last_offset)

        # Write the frequency.

        self.write_number(frequency)

        self.last_term = term
        self.last_offset = offset

        return self.f.tell()

class TermReader(FileReader):

    "Reading term information from files."

    def reset(self):
        self.last_term = ""
        self.last_offset = 0

    def read_term(self):

        """
        Read a term, its position file offset, and its frequency from the term
        information file, returning them as a tuple.
        """

        # Read the prefix length and term suffix, rebuilding the term from the
        # previously read term.

        common = self.read_number()
        suffix = self.read_string()

        self.last_term = self.last_term[:common] + suffix

        # Read the offset delta.

        self.last_offset += self.read_number()

        # Read the frequency.

        frequency = self.read_number()

        return self.last_term, self.last_offset, frequency

    def go_to_term(self, term, offset, info_offset):

        """
        Seek past the entry for 'term' having 'offset' to 'info_offset'. This
        permits the scanning for later terms from the specified term.
        """

        self.f.seek(info_offset)
        self.last_term = term
        self.last_offset = offset

class TermIndexWriter(TermWriter):

    "Writing term dictionary index details to files."

    def reset(self):
        TermWriter.reset(self)
        self.last_info_offset = 0

    def write_term(self, term, offset, frequency, info_offset):

        """
        Write the given 'term', its position file 'offset', and its 'frequency'
        to the term dictionary index file, along with the 'info_offset' in the
        term information file.
        """

        TermWriter.write_term(self, term, offset, frequency)

        # Write the information file offset delta.

        self.write_number(info_offset - self.last_info_offset)
        self.last_info_offset = info_offset

class TermIndexReader(TermReader):

    "Reading term dictionary index details from files."

    def reset(self):
        TermReader.reset(self)
        self.last_info_offset = 0

    def read_term(self):

        """
        Read a term, its position file offset, its frequency, and its term
        information file offset from the term dictionary index file.
        """

        term, offset, frequency = TermReader.read_term(self)

        # Read the information file offset delta.

        self.last_info_offset += self.read_number()

        return term, offset, frequency, self.last_info_offset

class TermDictionaryWriter:

    "Writing term dictionaries."

    def __init__(self, info_writer, index_writer, position_writer, interval):
        self.info_writer = info_writer
        self.index_writer = index_writer
        self.position_writer = position_writer
        self.interval = interval
        self.entry = 0

    def _write_term(self, term, offset, frequency):

        """
        Write the given 'term', its position file 'offset', and its 'frequency'
        to the term information file and optionally to the index, making a
        dictionary entry.
        """

        info_offset = self.info_writer.write_term(term, offset, frequency)

        # Every 'interval'-th entry (starting with the first) is also indexed,
        # recording where scanning may resume in the information file.

        if self.entry % self.interval == 0:
            self.index_writer.write_term(term, offset, frequency, info_offset)

        self.entry += 1

    def write_term_positions(self, term, doc_positions):

        """
        Write the given 'term' and the 'doc_positions' recording the documents
        and positions at which the term is found.
        """

        offset, frequency = self.position_writer.write_term_positions(doc_positions)
        self._write_term(term, offset, frequency)

    def close(self):
        self.info_writer.close()
        self.index_writer.close()
        self.position_writer.close()

class TermDictionaryReader:

    "Reading term dictionaries."

    def __init__(self, info_reader, index_reader, position_reader):
        self.info_reader = info_reader
        self.index_reader = index_reader
        self.position_reader = position_reader

        # Load the whole (sparse) index into memory.

        self.terms = []
        try:
            while 1:
                self.terms.append(self.index_reader.read_term())
        except EOFError:
            pass

        # A position file offset beyond any indexed one, so that a search key
        # (term, max_offset, ...) sorts after an index entry for the same term.
        # An empty index has nothing to compare against.

        if self.terms:
            self.max_offset = self.terms[-1][1] + 1
        else:
            self.max_offset = 0

    def _find_term(self, term):

        """
        Find the position file offset and frequency of 'term' from the term
        dictionary, returning an (offset, frequency) tuple or None if the term
        is absent. Scanning moves the information reader's file position.
        """

        # Find the last index entry whose term is not greater than 'term'.

        i = bisect_right(self.terms, (term, self.max_offset, 0, 0)) - 1

        if i == -1:
            return None

        found_term, offset, frequency, info_offset = self.terms[i]

        # Where the term is found immediately, return the offset.

        if term == found_term:
            return offset, frequency

        # Otherwise, seek past the index term's entry in the information file
        # and scan for the desired term.

        self.info_reader.go_to_term(found_term, offset, info_offset)
        try:
            while term > found_term:
                found_term, offset, frequency = self.info_reader.read_term()
        except EOFError:
            pass

        # If the term is found, return the offset and frequency.

        if term == found_term:
            return offset, frequency
        else:
            return None

    def rewind(self):
        self.info_reader.rewind()

    def _get_positions(self, offset):
        return self.position_reader.read_term_positions(offset)

    def read_term(self):

        """
        Return the next term, its frequency and the documents and positions at
        which the term is found. Raise EOFError when no terms remain.
        """

        term, offset, frequency = self.info_reader.read_term()
        positions = self._get_positions(offset)
        return term, frequency, positions

    def find_positions(self, term):

        """
        Return the documents and positions at which the given 'term' is found,
        or None if the term is not in the dictionary.
        """

        t = self._find_term(term)
        if t is None:
            return None
        offset, frequency = t
        return self._get_positions(offset)

    def get_frequency(self, term):

        "Return the frequency of the given 'term', or None if it is absent."

        t = self._find_term(term)
        if t is None:
            return None
        offset, frequency = t
        return frequency

    def close(self):
        self.info_reader.close()
        self.index_reader.close()
        self.position_reader.close()

# Specific classes for storing document information.

class FieldWriter(FileWriter):

    "Writing field data to files."

    def reset(self):
        self.last_docnum = 0

    def write_fields(self, docnum, fields):

        """
        Write for the given 'docnum', a list of 'fields' (integer, string pairs
        representing field identifiers and values respectively).
        Return the offset at which the fields are stored.
        """

        offset = self.f.tell()

        # Write the document number delta.

        self.write_number(docnum - self.last_docnum)

        # Write the number of fields.

        self.write_number(len(fields))

        # Write the fields themselves, compressing the values.

        for i, field in fields:
            self.write_number(i)
            self.write_string(field, 1) # compress

        self.last_docnum = docnum
        return offset

class FieldReader(FileReader):

    "Reading field data from files."

    def reset(self):
        self.last_docnum = 0

    def read_fields(self):

        """
        Read fields from the file, returning a tuple containing the document
        number and a list of field (identifier, value) pairs.
        """

        # Read the document number delta.

        self.last_docnum += self.read_number()

        # Read the number of fields.

        nfields = self.read_number()

        # Collect the fields.

        fields = []

        for i in range(nfields):
            identifier = self.read_number()
            value = self.read_string(1) # decompress
            fields.append((identifier, value))

        return self.last_docnum, fields

    def read_document_fields(self, docnum, offset):

        """
        Read fields for 'docnum' at the given 'offset'. This permits the
        retrieval of details for the specified document, as well as scanning for
        later documents.
        """

        self.f.seek(offset)

        # The stored document number is a delta from the preceding record,
        # which is not known here, so the number computed by read_fields is
        # discarded in favour of 'docnum' (taken from the field index).

        bad_docnum, fields = self.read_fields()
        self.last_docnum = docnum
        return docnum, fields

class FieldIndexWriter(FileWriter):

    "Writing field index details to files."

    def reset(self):
        self.last_docnum = 0
        self.last_offset = 0

    def write_document(self, docnum, offset):

        """
        Write for the given 'docnum', the 'offset' at which the fields for the
        document are stored in the fields file.
        """

        # Write the document number and offset deltas.

        self.write_number(docnum - self.last_docnum)
        self.write_number(offset - self.last_offset)

        self.last_docnum = docnum
        self.last_offset = offset

class FieldIndexReader(FileReader):

    "Reading field index details from files."

    def reset(self):
        self.last_docnum = 0
        self.last_offset = 0

    def read_document(self):

        "Read a document number and field file offset."

        # Read the document number delta and offset delta.

        self.last_docnum += self.read_number()
        self.last_offset += self.read_number()

        return self.last_docnum, self.last_offset

class FieldDictionaryWriter:

    "Writing field dictionary details."

    def __init__(self, field_writer, field_index_writer, interval):
        self.field_writer = field_writer
        self.field_index_writer = field_index_writer
        self.interval = interval
        self.entry = 0

    def write_fields(self, docnum, fields):

        "Write details of the document with the given 'docnum' and 'fields'."

        offset = self.field_writer.write_fields(docnum, fields)

        # Every 'interval'-th document (starting with the first) is indexed.

        if self.entry % self.interval == 0:
            self.field_index_writer.write_document(docnum, offset)

        self.entry += 1

    def close(self):
        self.field_writer.close()
        self.field_index_writer.close()

class FieldDictionaryReader:

    "Reading field dictionary details."

    def __init__(self, field_reader, field_index_reader):
        self.field_reader = field_reader
        self.field_index_reader = field_index_reader

        # Load the whole (sparse) index into memory.

        self.docs = []
        try:
            while 1:
                self.docs.append(self.field_index_reader.read_document())
        except EOFError:
            pass

        # An offset beyond any indexed one, so that a search key
        # (docnum, max_offset) sorts after an index entry for the same document.
        # An empty index has nothing to compare against.

        if self.docs:
            self.max_offset = self.docs[-1][1] + 1
        else:
            self.max_offset = 0

    def rewind(self):
        self.field_reader.rewind()

    def read_fields(self):

        "Return the next document number and fields."

        return self.field_reader.read_fields()

    def get_fields(self, docnum):

        """
        Read the fields of the document with the given 'docnum', returning None
        if the document is not present.
        """

        # Find the last index entry whose document is not after 'docnum'.

        i = bisect_right(self.docs, (docnum, self.max_offset)) - 1

        if i == -1:
            return None

        found_docnum, offset = self.docs[i]

        # Read from the fields file.

        found_docnum, fields = self.field_reader.read_document_fields(found_docnum, offset)

        # Scan for the document, if necessary.

        try:
            while docnum > found_docnum:
                found_docnum, fields = self.field_reader.read_fields()
        except EOFError:
            pass

        # If the document is found, return the fields.

        if docnum == found_docnum:
            return fields
        else:
            return None

    def close(self):
        self.field_reader.close()
        self.field_index_reader.close()

# Dictionary merging classes.

class Merger:

    "Merge files."

    def __init__(self, writer, readers):
        self.writer = writer
        self.readers = readers

    def close(self):
        for reader in self.readers:
            reader.close()
        self.writer.close()

class TermDictionaryMerger(Merger):

    "Merge term and position files."

    def _add_next(self, entries, partition):

        """
        Read the next term from the reader for 'partition' and insert it into
        the sorted 'entries' list. An exhausted reader adds nothing.
        """

        try:
            term, frequency, doc_positions = self.readers[partition].read_term()
        except EOFError:
            return

        # (term, partition) is unique, so position lists are never compared.

        insort_right(entries, (term, partition, doc_positions))

    def merge(self):

        """
        Merge terms and positions from the readers, sending them to the writer.
        """

        entries = []

        # Get the first entries from the readers.

        for partition, reader in enumerate(self.readers):
            reader.rewind()
            self._add_next(entries, partition)

        # While entries are available, write them out in order, merging where
        # appropriate.

        while entries:
            term, partition, doc_positions = entries[0]
            to_update = [partition]

            # Merge the positions of other entries for the same term. Entries
            # are sorted by term, so these immediately follow the first.

            i = 1
            while i < len(entries) and entries[i][0] == term:
                other_term, other_partition, other_doc_positions = entries[i]
                doc_positions = self.merge_positions(doc_positions, other_doc_positions)
                to_update.append(other_partition)
                i += 1

            # Write the combined term details.

            self.writer.write_term_positions(term, doc_positions)

            # Update the entries from the affected readers.

            del entries[:i]

            for partition in to_update:
                self._add_next(entries, partition)

    def merge_positions(self, doc_positions, other_doc_positions):

        """
        Merge 'doc_positions' with 'other_doc_positions' so that common document
        records contain positions from both collections. Return a list of
        (document number, positions) tuples sorted by document number, as
        required by PositionWriter. The inputs are not modified.
        """

        merged = {}

        for docnum, positions in doc_positions:
            merged[docnum] = list(positions)

        for docnum, positions in other_doc_positions:
            if docnum in merged:
                merged[docnum].extend(positions)
            else:
                merged[docnum] = list(positions)

        return sorted(merged.items())

class FieldDictionaryMerger(Merger):

    "Merge field files."

    def _add_next(self, entries, partition):

        """
        Read the next document from the reader for 'partition' and insert it
        into the sorted 'entries' list. An exhausted reader adds nothing.
        """

        try:
            docnum, fields = self.readers[partition].read_fields()
        except EOFError:
            return

        # (docnum, partition) is unique, so field lists are never compared.

        insort_right(entries, (docnum, partition, fields))

    def merge(self):

        """
        Merge fields from the readers, sending them to the writer.
        """

        entries = []

        # Get the first entries from the readers.

        for partition, reader in enumerate(self.readers):
            reader.rewind()
            self._add_next(entries, partition)

        # While entries are available, write them out in order, merging where
        # appropriate.

        while entries:
            docnum, partition, fields = entries[0]
            to_update = [partition]

            # Combine the fields of other entries for the same document. Entries
            # are sorted by document number, so these immediately follow.

            i = 1
            while i < len(entries) and entries[i][0] == docnum:
                other_docnum, other_partition, other_fields = entries[i]
                fields = fields + other_fields
                to_update.append(other_partition)
                i += 1

            # Write the combined document details.

            self.writer.write_fields(docnum, fields)

            # Update the entries from the affected readers.

            del entries[:i]

            for partition in to_update:
                self._add_next(entries, partition)

# Utility functions.

def get_term_writer(pathname, partition, interval):

    """
    Return a term dictionary writer using files under the given 'pathname'
    labelled according to the given 'partition', using the given indexing
    'interval'.
    """

    tdf = open(join(pathname, "terms-%s" % partition), "wb")
    info_writer = TermWriter(tdf)

    tdif = open(join(pathname, "terms_index-%s" % partition), "wb")
    index_writer = TermIndexWriter(tdif)

    tpf = open(join(pathname, "positions-%s" % partition), "wb")
    positions_writer = PositionWriter(tpf)

    return TermDictionaryWriter(info_writer, index_writer, positions_writer, interval)

def get_field_writer(pathname, partition, interval):

    """
    Return a field dictionary writer using files under the given 'pathname'
    labelled according to the given 'partition', using the given indexing
    'interval'.
    """

    ff = open(join(pathname, "fields-%s" % partition), "wb")
    field_writer = FieldWriter(ff)

    fif = open(join(pathname, "fields_index-%s" % partition), "wb")
    field_index_writer = FieldIndexWriter(fif)

    return FieldDictionaryWriter(field_writer, field_index_writer, interval)

def get_term_reader(pathname, partition):

    """
    Return a term dictionary reader using files under the given 'pathname'
    labelled according to the given 'partition'.
    """

    tdf = open(join(pathname, "terms-%s" % partition), "rb")
    info_reader = TermReader(tdf)

    tdif = open(join(pathname, "terms_index-%s" % partition), "rb")
    index_reader = TermIndexReader(tdif)

    tpf = open(join(pathname, "positions-%s" % partition), "rb")
    positions_reader = PositionReader(tpf)

    return TermDictionaryReader(info_reader, index_reader, positions_reader)

def get_field_reader(pathname, partition):

    """
    Return a field dictionary reader using files under the given 'pathname'
    labelled according to the given 'partition'.
    """

    ff = open(join(pathname, "fields-%s" % partition), "rb")
    field_reader = FieldReader(ff)

    fif = open(join(pathname, "fields_index-%s" % partition), "rb")
    field_index_reader = FieldIndexReader(fif)

    return FieldDictionaryReader(field_reader, field_index_reader)

def rename_files(pathname, names, from_partition, to_partition):

    "Rename each of the 'names' files in 'pathname' between partition labels."

    for name in names:
        rename(join(pathname, "%s-%s" % (name, from_partition)), join(pathname, "%s-%s" % (name, to_partition)))

def rename_term_files(pathname, from_partition, to_partition):
    rename_files(pathname, ("terms", "terms_index", "positions"), from_partition, to_partition)

def rename_field_files(pathname, from_partition, to_partition):
    rename_files(pathname, ("fields", "fields_index"), from_partition, to_partition)

def remove_files(pathname, names, partition):

    "Remove each of the 'names' files in 'pathname' for the given 'partition'."

    for name in names:
        remove(join(pathname, "%s-%s" % (name, partition)))

def remove_term_files(pathname, partition):
    remove_files(pathname, ("terms", "terms_index", "positions"), partition)

def remove_field_files(pathname, partition):
    remove_files(pathname, ("fields", "fields_index"), partition)

# High-level classes.
# Label of the temporary partition written while merging. The result is
# renamed to "merged" only after the merge inputs have been removed.

MERGING_PARTITION = "merging"

def _next_partition(pathname, prefix):

    """
    Return the first unused numeric partition number among the files in
    'pathname' whose names start with 'prefix' (for example "terms-"). Return 0
    if 'pathname' does not exist or holds no numeric partitions.
    """

    if not exists(pathname):
        return 0

    last = -1
    for filename in listdir(pathname):
        if filename.startswith(prefix):
            label = filename[len(prefix):]
            if label.isdigit():
                last = max(last, int(label))

    return last + 1

class IndexWriter:

    """
    Building term information and writing it to the term and field dictionaries.
    """

    def __init__(self, pathname, interval, flush_interval):
        self.pathname = pathname
        self.interval = interval
        self.flush_interval = flush_interval

        # Continue numbering after any existing partitions so that flushing
        # never overwrites files from an earlier writer that are not yet merged.

        self.dict_partition = _next_partition(pathname, "terms-")
        self.field_dict_partition = _next_partition(pathname, "fields-")

        self.terms = {}
        self.docs = {}

        self.position_counter = 0
        self.field_counter = 0

    def add_position(self, term, docnum, position):

        """
        Add a position entry for the given 'term' in the document with the given
        'docnum', indicating the given 'position'. Buffered terms are flushed to
        a new partition once 'flush_interval' positions have been added.
        """

        doc_positions = self.terms.setdefault(term, {})
        doc_positions.setdefault(docnum, []).append(position)

        self.position_counter += 1
        if self.flush_interval and self.position_counter >= self.flush_interval:
            self.flush_terms()
            self.position_counter = 0

    def add_field(self, docnum, identifier, value):

        """
        Add for the document with the given 'docnum' a field having the given
        'identifier' and 'value'. Buffered fields are flushed to a new partition
        once 'flush_interval' fields have been added.
        """

        self.docs.setdefault(docnum, []).append((identifier, value))

        self.field_counter += 1
        if self.flush_interval and self.field_counter >= self.flush_interval:
            self.flush_fields()
            self.field_counter = 0

    def get_term_writer(self):

        "Return a term dictionary writer for the current partition."

        return get_term_writer(self.pathname, self.dict_partition, self.interval)

    def get_field_writer(self):

        "Return a field dictionary writer for the current partition."

        return get_field_writer(self.pathname, self.field_dict_partition, self.interval)

    def flush_terms(self):

        "Flush terms into the current term dictionary partition."

        dict_writer = self.get_term_writer()

        # Terms and their documents must be written in sorted order.

        try:
            for term, doc_positions in sorted(self.terms.items()):
                dict_writer.write_term_positions(term, sorted(doc_positions.items()))
        finally:
            dict_writer.close()

        self.terms = {}
        self.dict_partition += 1

    def flush_fields(self):

        "Flush fields into the current field dictionary partition."

        field_dict_writer = self.get_field_writer()

        # Documents must be written in order of document number.

        try:
            for docnum, fields in sorted(self.docs.items()):
                field_dict_writer.write_fields(docnum, fields)
        finally:
            field_dict_writer.close()

        self.docs = {}
        self.field_dict_partition += 1

    def close(self):

        "Flush any buffered terms and fields."

        if self.terms:
            self.flush_terms()
        if self.docs:
            self.flush_fields()

class IndexReader:

    "Accessing the term and field dictionaries."

    def __init__(self, pathname):
        self.dict_reader = get_term_reader(pathname, "merged")
        self.field_dict_reader = get_field_reader(pathname, "merged")

    def find_positions(self, term):

        "Return the (document, positions) records for 'term', or None."

        return self.dict_reader.find_positions(term)

    def get_frequency(self, term):

        "Return the frequency of 'term', or None."

        return self.dict_reader.get_frequency(term)

    def get_fields(self, docnum):

        "Return the fields of the document 'docnum', or None."

        return self.field_dict_reader.get_fields(docnum)

    def close(self):
        self.dict_reader.close()
        self.field_dict_reader.close()

class Index:

    "An inverted index solution encapsulating the various components."

    def __init__(self, pathname):
        self.pathname = pathname
        self.reader = None
        self.writer = None

    def get_writer(self, interval=INTERVAL, flush_interval=FLUSH_INTERVAL):

        """
        Return a writer, optionally using the given indexing 'interval' and
        'flush_interval', creating the index directory if necessary.
        """

        if not exists(self.pathname):
            mkdir(self.pathname)

        self.writer = IndexWriter(self.pathname, interval, flush_interval)
        return self.writer

    def get_reader(self, partition=0):

        """
        Return a reader for the index after merging all partitions into the
        "merged" partition. The 'partition' argument is currently unused: the
        reader always accesses the merged partition.
        """

        if not exists(self.pathname):
            raise OSError("Index path %r does not exist." % self.pathname)

        # Ensure that only one partition exists.

        self.merge_terms()
        self.merge_fields()

        return self._get_reader(partition)

    def _get_reader(self, partition):

        "Return a reader for the index."

        if not exists(self.pathname):
            raise OSError("Index path %r does not exist." % self.pathname)

        self.reader = IndexReader(self.pathname)
        return self.reader

    def _merge_partitions(self, prefix, open_reader, open_writer, merger_class,
        remove_partition, rename_partition, interval):

        """
        Merge every partition whose main file name starts with 'prefix' into the
        "merged" partition, using the given reader/writer factories, merger
        class, file removal and renaming functions, and indexing 'interval'.
        """

        readers = []
        partitions = []

        for filename in listdir(self.pathname):
            if not filename.startswith(prefix):
                continue

            partition = filename[len(prefix):]

            # Leftover output from an interrupted merge is ignored; it would be
            # truncated when the merge output is opened for writing below.

            if partition == MERGING_PARTITION:
                continue

            readers.append(open_reader(self.pathname, partition))
            partitions.append(partition)

        if len(readers) > 1:

            # Write to a temporary partition: "merged" may be one of the inputs,
            # and opening it for writing would truncate it while it is still
            # being read (and the removal below would delete the new output).

            writer = open_writer(self.pathname, MERGING_PARTITION, interval)
            merger = merger_class(writer, readers)
            try:
                merger.merge()
            finally:
                merger.close()

            # Remove old files, then install the new merged partition.

            for partition in partitions:
                remove_partition(self.pathname, partition)

            rename_partition(self.pathname, MERGING_PARTITION, "merged")

        else:

            # Nothing to merge: release any reader opened during discovery.

            for reader in readers:
                reader.close()

            if len(readers) == 1 and partitions[0] != "merged":
                rename_partition(self.pathname, partitions[0], "merged")

    def merge_terms(self, interval=INTERVAL):

        "Merge term dictionaries using the given indexing 'interval'."

        self._merge_partitions("terms-", get_term_reader, get_term_writer,
            TermDictionaryMerger, remove_term_files, rename_term_files, interval)

    def merge_fields(self, interval=INTERVAL):

        "Merge field dictionaries using the given indexing 'interval'."

        self._merge_partitions("fields-", get_field_reader, get_field_writer,
            FieldDictionaryMerger, remove_field_files, rename_field_files, interval)

    def close(self):
        if self.reader is not None:
            self.reader.close()
            self.reader = None
        if self.writer is not None:
            self.writer.close()
            self.writer = None

# vim: tabstop=4 expandtab shiftwidth=4