#!/usr/bin/env python

"""
A simple (and sane) text indexing library.

Copyright (C) 2009 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with this program. If not, see <http://www.gnu.org/licenses/>.
"""

from os import listdir, mkdir # index and partition discovery
from os import remove, rename # partition manipulation
from os.path import exists, join
from os.path import commonprefix # to find common string prefixes
from bisect import bisect_right # to find terms in the dictionary index
from bisect import insort_right # to maintain a sorted list of data for merging
import bz2, zlib # for field compression

# Constants.

# Default indexing interval: every INTERVAL-th dictionary entry written is also
# recorded in the corresponding index file.

INTERVAL = 100

# Default number of added positions (or fields) after which an IndexWriter
# flushes its in-memory data to a new partition.

FLUSH_INTERVAL = 1000000

# Compression methods tried, in order, for compressed strings. Each method is
# identified by a one-byte flag stored before the data, "-" meaning that no
# compression was applied. The flags are byte strings so that they compare
# equal to data read from binary files on Python 2 (2.6+) and Python 3 alike.

compressors = [(b"b", bz2.compress), (b"z", zlib.compress)]
decompressors = {b"b" : bz2.decompress, b"z" : zlib.decompress}

# Foundation classes.

class File:

    "A basic file abstraction wrapping the file object 'f'."

    def __init__(self, f):
        self.f = f
        self.reset()

    def reset(self):

        "To be used to reset the state of the reader or writer between records."

        pass

    def rewind(self):

        "Seek to the start of the file and reset the reader or writer state."

        self.f.seek(0)
        self.reset()

    def close(self):

        "Close the underlying file if it is still open; later calls do nothing."

        if self.f is not None:
            self.f.close()
            self.f = None

class FileWriter(File):

    "Writing basic data types to files."

    def write_number(self, number):

        """
        Write the non-negative integer 'number' to the file using a variable
        length encoding: seven bits per byte, least significant group first,
        with the high bit set on every byte except the last. Zero is written as
        a single zero byte. Raise ValueError if 'number' is negative.
        """

        if number < 0:
            raise ValueError("Number %r is negative." % number)

        # Collect the encoded bytes so that they are written in one call.

        digits = bytearray()

        while True:
            lsd = number & 127
            number >>= 7
            if number:
                lsd |= 128 # more bytes follow
            digits.append(lsd)
            if not number:
                break

        self.f.write(bytes(digits))

    def write_string(self, s, compress=0):

        """
        Write 's' to the file, recording its length and compressing the string
        if 'compress' is set to a true value. Text is stored UTF-8 encoded;
        byte strings are stored unchanged. When compressing, a one-byte flag
        identifying the method used (or "-" for none) precedes the length.
        """

        # Convert text to UTF-8 encoded bytes.

        if not isinstance(s, bytes):
            s = s.encode("utf-8")

        if compress:

            # Take the first compressed form shorter than the original.

            for flag, fn in compressors:
                cs = fn(s)
                if len(cs) < len(s):
                    s = cs
                    break
            else:
                flag = b"-"

            # Record whether compression was used.

            self.f.write(flag)

        # Write the length of the data before the data itself.

        self.write_number(len(s))
        self.f.write(s)

class FileReader(File):

    "Reading basic data types from files."

    def read_number(self):

        """
        Read a number written by FileWriter.write_number, raising EOFError if
        the file ends before the number is complete.
        """

        shift = 0
        number = 0

        while True:
            byte = self.f.read(1)
            if not byte:
                raise EOFError

            csd = ord(byte)
            number += (csd & 127) << shift
            shift += 7

            # A clear high bit marks the final byte of the number.

            if not csd & 128:
                return number

    def read_string(self, decompress=0):

        """
        Read a string written by FileWriter.write_string and return it as text
        decoded from UTF-8. 'decompress' must be a true value exactly when the
        string was written with compression requested, since the compression
        flag byte is only stored in that case.
        """

        if decompress:
            flag = self.f.read(1)
        else:
            flag = b"-"

        length = self.read_number()
        s = self.f.read(length)

        # Undo any compression recorded by the flag.

        if flag != b"-":
            s = decompressors[flag](s)

        return s.decode("utf-8")

# Specific classes for storing term and position information.

class PositionWriter(FileWriter):

    "Writing position information to files."

    def reset(self):
        self.last_docnum = 0

    def write_positions(self, docnum, positions):

        """
        Write for the document 'docnum' the given 'positions'. Document numbers
        must not decrease between calls (since the last reset), otherwise
        ValueError is raised. Positions are written in sorted order without
        modifying the caller's list.
        """

        if docnum < self.last_docnum:
            raise ValueError("Document number %r is less than previous number %r." % (docnum, self.last_docnum))

        # Write the document number delta and the number of positions.

        self.write_number(docnum - self.last_docnum)
        self.write_number(len(positions))

        # Write the position deltas.

        last = 0
        for position in sorted(positions):
            self.write_number(position - last)
            last = position

        self.last_docnum = docnum

    def write_term_positions(self, doc_positions):

        """
        Write all 'doc_positions' - a collection of tuples of the form (document
        number, position list) - to the file, returning a tuple containing the
        offset at which they were stored together with the frequency (number of
        positions) for the term involved. Records are written in document
        number order; the caller's collection is not modified.
        """

        # Reset the writer and record the current file offset.

        self.reset()
        offset = self.f.tell()

        # Write the number of documents, then each document's positions.

        doc_positions = sorted(doc_positions)
        self.write_number(len(doc_positions))

        frequency = 0
        for docnum, positions in doc_positions:
            self.write_positions(docnum, positions)
            frequency += len(positions)

        return offset, frequency

class PositionReader(FileReader):

    "Reading position information from files."

    def reset(self):
        self.last_docnum = 0

    def read_positions(self):

        "Read positions, returning a document number and a list of positions."

        # Document numbers and positions are stored as deltas.

        self.last_docnum += self.read_number()
        npositions = self.read_number()

        last = 0
        positions = []
        for i in range(npositions):
            last += self.read_number()
            positions.append(last)

        return self.last_docnum, positions

    def read_term_positions(self, offset):

        """
        Read all (document number, positions) records from 'offset', seeking to
        that position in the file before reading.
        """

        self.reset()
        self.f.seek(offset)

        ndocuments = self.read_number()
        return [self.read_positions() for i in range(ndocuments)]

class TermWriter(FileWriter):

    "Writing term information to files."

    def reset(self):
        self.last_term = ""
        self.last_offset = 0

    def write_term(self, term, offset, frequency):

        """
        Write the given 'term', its position file 'offset', and its 'frequency'
        to the term information file. Return the offset after the term
        information was written to the file. Byte string terms are treated as
        UTF-8 encoded text.
        """

        # Work in characters rather than bytes so that the shared prefix can
        # never split a multi-byte UTF-8 sequence, since the reader decodes the
        # stored suffix on its own.

        if isinstance(term, bytes):
            term = term.decode("utf-8")

        # Write the prefix length and term suffix.

        common = len(commonprefix([self.last_term, term]))
        self.write_number(common)
        self.write_string(term[common:])

        # Write the offset delta and the frequency.

        self.write_number(offset - self.last_offset)
        self.write_number(frequency)

        self.last_term = term
        self.last_offset = offset

        return self.f.tell()

class TermReader(FileReader):

    "Reading term information from files."

    def reset(self):
        self.last_term = ""
        self.last_offset = 0

    def read_term(self):

        """
        Read a term, its position file offset, and its frequency from the term
        information file.
        """

        # Rebuild the term from the shared prefix and the stored suffix.

        common = self.read_number()
        suffix = self.read_string()
        self.last_term = self.last_term[:common] + suffix

        # Read the offset delta and the frequency.

        self.last_offset += self.read_number()
        frequency = self.read_number()

        return self.last_term, self.last_offset, frequency

    def go_to_term(self, term, offset, info_offset):

        """
        Seek past the entry for 'term' having 'offset' to 'info_offset'. This
        permits the scanning for later terms from the specified term.
        """

        self.f.seek(info_offset)
        self.last_term = term
        self.last_offset = offset

class TermIndexWriter(TermWriter):

    "Writing term dictionary index details to files."

    def reset(self):
        TermWriter.reset(self)
        self.last_info_offset = 0

    def write_term(self, term, offset, frequency, info_offset):

        """
        Write the given 'term', its position file 'offset', and its 'frequency'
        to the term dictionary index file, along with the 'info_offset' in the
        term information file.
        """

        TermWriter.write_term(self, term, offset, frequency)

        # Write the information file offset delta.

        self.write_number(info_offset - self.last_info_offset)
        self.last_info_offset = info_offset

class TermIndexReader(TermReader):

    "Reading term dictionary index details from files."

    def reset(self):
        TermReader.reset(self)
        self.last_info_offset = 0

    def read_term(self):

        """
        Read a term, its position file offset, its frequency, and its term
        information file offset from the term dictionary index file.
        """

        term, offset, frequency = TermReader.read_term(self)

        # Read the information file offset delta.

        self.last_info_offset += self.read_number()

        return term, offset, frequency, self.last_info_offset

class TermDictionaryWriter:

    """
    Writing term dictionaries: term information, a sparse index recording
    every 'interval'-th term, and the positions of each term.
    """

    def __init__(self, info_writer, index_writer, position_writer, interval):
        self.info_writer = info_writer
        self.index_writer = index_writer
        self.position_writer = position_writer
        self.interval = interval
        self.entry = 0

    def _write_term(self, term, offset, frequency):

        """
        Write the given 'term', its position file 'offset', and its 'frequency'
        to the term information file and optionally to the index, making a
        dictionary entry.
        """

        info_offset = self.info_writer.write_term(term, offset, frequency)

        if self.entry % self.interval == 0:
            self.index_writer.write_term(term, offset, frequency, info_offset)

        self.entry += 1

    def write_term_positions(self, term, doc_positions):

        """
        Write the given 'term' and the 'doc_positions' recording the documents
        and positions at which the term is found.
        """

        offset, frequency = self.position_writer.write_term_positions(doc_positions)
        self._write_term(term, offset, frequency)

    def close(self):
        self.info_writer.close()
        self.index_writer.close()
        self.position_writer.close()

class TermDictionaryReader:

    "Reading term dictionaries."

    def __init__(self, info_reader, index_reader, position_reader):
        self.info_reader = info_reader
        self.index_reader = index_reader
        self.position_reader = position_reader

        # Load all index entries into memory.

        self.terms = []
        try:
            while 1:
                self.terms.append(self.index_reader.read_term())
        except EOFError:
            pass

        # An offset beyond every stored offset, so that a search key for a term
        # sorts after the index entry for that same term. An empty index is
        # permitted and simply finds nothing.

        if self.terms:
            self.max_offset = self.terms[-1][1] + 1
        else:
            self.max_offset = 0

    def _find_term(self, term):

        """
        Find the position file offset and frequency of 'term' from the term
        dictionary, returning None if the term is not present.
        """

        # Get the index entry providing the term or one preceding it.

        i = bisect_right(self.terms, (term, self.max_offset, 0, 0)) - 1

        if i == -1:
            return None

        found_term, offset, frequency, info_offset = self.terms[i]

        # Where the term is found immediately, return the offset.

        if term == found_term:
            return offset, frequency

        # Otherwise, seek past the index term's entry in the information file
        # and scan for the desired term.

        self.info_reader.go_to_term(found_term, offset, info_offset)
        try:
            while term > found_term:
                found_term, offset, frequency = self.info_reader.read_term()
        except EOFError:
            pass

        if term == found_term:
            return offset, frequency
        else:
            return None

    def rewind(self):
        self.info_reader.rewind()

    def _get_positions(self, offset):
        return self.position_reader.read_term_positions(offset)

    def read_term(self):

        """
        Return the next term, its frequency and the documents and positions at
        which the term is found.
        """

        term, offset, frequency = self.info_reader.read_term()
        positions = self._get_positions(offset)
        return term, frequency, positions

    def find_positions(self, term):

        """
        Return the documents and positions at which the given 'term' is found,
        or None if the term is not present.
        """

        t = self._find_term(term)
        if t is None:
            return None

        offset, frequency = t
        return self._get_positions(offset)

    def get_frequency(self, term):

        "Return the frequency of the given 'term', or None if it is not present."

        t = self._find_term(term)
        if t is None:
            return None

        offset, frequency = t
        return frequency

    def close(self):
        self.info_reader.close()
        self.index_reader.close()
        self.position_reader.close()

# Specific classes for storing document information.

class FieldWriter(FileWriter):

    "Writing field data to files."

    def reset(self):
        self.last_docnum = 0

    def write_fields(self, docnum, fields):

        """
        Write for the given 'docnum', a list of 'fields' (integer, string pairs
        representing field identifiers and values respectively).
        Return the offset at which the fields are stored.
        """

        offset = self.f.tell()

        # Write the document number delta and the number of fields.

        self.write_number(docnum - self.last_docnum)
        self.write_number(len(fields))

        # Write the fields themselves, compressing the values.

        for identifier, value in fields:
            self.write_number(identifier)
            self.write_string(value, 1)

        self.last_docnum = docnum
        return offset

class FieldReader(FileReader):

    "Reading field data from files."

    def reset(self):
        self.last_docnum = 0

    def read_fields(self):

        """
        Read fields from the file, returning a tuple containing the document
        number and a list of field (identifier, value) pairs.
        """

        # Read the document number delta and the number of fields.

        self.last_docnum += self.read_number()
        nfields = self.read_number()

        fields = []
        for i in range(nfields):
            identifier = self.read_number()
            value = self.read_string(1) # decompress
            fields.append((identifier, value))

        return self.last_docnum, fields

    def read_document_fields(self, docnum, offset):

        """
        Read fields for 'docnum' at the given 'offset'. This permits the
        retrieval of details for the specified document, as well as scanning for
        later documents.
        """

        self.f.seek(offset)

        # The stored document number is a delta from a record that was not
        # read, so the number computed from it is discarded in favour of the
        # supplied 'docnum'.

        stale_docnum, fields = self.read_fields()
        self.last_docnum = docnum
        return docnum, fields

class FieldIndexWriter(FileWriter):

    "Writing field index details to files."

    def reset(self):
        self.last_docnum = 0
        self.last_offset = 0

    def write_document(self, docnum, offset):

        """
        Write for the given 'docnum', the 'offset' at which the fields for the
        document are stored in the fields file.
        """

        # Write the document number and offset deltas.

        self.write_number(docnum - self.last_docnum)
        self.write_number(offset - self.last_offset)

        self.last_docnum = docnum
        self.last_offset = offset

class FieldIndexReader(FileReader):

    "Reading field index details from files."

    def reset(self):
        self.last_docnum = 0
        self.last_offset = 0

    def read_document(self):

        "Read a document number and field file offset."

        # Read the document number and offset deltas.

        self.last_docnum += self.read_number()
        self.last_offset += self.read_number()

        return self.last_docnum, self.last_offset

class FieldDictionaryWriter:

    """
    Writing field dictionary details: the fields of each document and a sparse
    index recording every 'interval'-th document.
    """

    def __init__(self, field_writer, field_index_writer, interval):
        self.field_writer = field_writer
        self.field_index_writer = field_index_writer
        self.interval = interval
        self.entry = 0

    def write_fields(self, docnum, fields):

        "Write details of the document with the given 'docnum' and 'fields'."

        offset = self.field_writer.write_fields(docnum, fields)

        if self.entry % self.interval == 0:
            self.field_index_writer.write_document(docnum, offset)

        self.entry += 1

    def close(self):
        self.field_writer.close()
        self.field_index_writer.close()

class FieldDictionaryReader:

    "Reading field dictionary details."

    def __init__(self, field_reader, field_index_reader):
        self.field_reader = field_reader
        self.field_index_reader = field_index_reader

        # Load all index entries into memory.

        self.docs = []
        try:
            while 1:
                self.docs.append(self.field_index_reader.read_document())
        except EOFError:
            pass

        # The largest stored offset, so that a search key for a document sorts
        # no earlier than the index entry for that same document. An empty
        # index is permitted and simply finds nothing.

        if self.docs:
            self.max_offset = self.docs[-1][1]
        else:
            self.max_offset = 0

    def rewind(self):
        self.field_reader.rewind()

    def read_fields(self):

        "Return the next document number and fields."

        return self.field_reader.read_fields()

    def get_fields(self, docnum):

        """
        Read the fields of the document with the given 'docnum', returning None
        if the document is not present.
        """

        # Get the index entry providing the document or one preceding it.

        i = bisect_right(self.docs, (docnum, self.max_offset)) - 1

        if i == -1:
            return None

        found_docnum, offset = self.docs[i]

        # Read from the fields file, scanning for the document if necessary.

        found_docnum, fields = self.field_reader.read_document_fields(found_docnum, offset)

        try:
            while docnum > found_docnum:
                found_docnum, fields = self.field_reader.read_fields()
        except EOFError:
            pass

        if docnum == found_docnum:
            return fields
        else:
            return None

    def close(self):
        self.field_reader.close()
        self.field_index_reader.close()

# Dictionary merging classes.

class Merger:

    """
    Merge files: entries are read in key order from several 'readers' and sent
    to a single 'writer', with entries sharing a key combined into one entry.
    Subclasses define _read_entry, _combine and _write_entry.
    """

    def __init__(self, writer, readers):
        self.writer = writer
        self.readers = readers

    def merge(self):

        "Merge the entries from the readers, sending them to the writer."

        # Pending entries, at most one per reader, kept sorted as
        # (key, value, partition) tuples.

        entries = []

        for partition, reader in enumerate(self.readers):
            reader.rewind()
            self._add_next_entry(entries, partition)

        while entries:
            key, value, partition = entries[0]
            to_update = [partition]

            # Combine the values of any other entries having the same key.

            i = 1
            while i < len(entries) and entries[i][0] == key:
                value = self._combine(value, entries[i][1])
                to_update.append(entries[i][2])
                i += 1

            self._write_entry(key, value)

            # Replace the consumed entries with the next entries from the
            # affected readers.

            del entries[:i]

            for partition in to_update:
                self._add_next_entry(entries, partition)

    def _add_next_entry(self, entries, partition):

        """
        Insert into the sorted 'entries' the next entry from the reader for
        'partition', doing nothing if that reader is exhausted.
        """

        try:
            key, value = self._read_entry(self.readers[partition])
        except EOFError:
            return

        insort_right(entries, (key, value, partition))

    def _read_entry(self, reader):

        "Return the next (key, value) pair from 'reader', raising EOFError at the end."

        raise NotImplementedError

    def _combine(self, value, other_value):

        "Return the combination of two values stored under the same key."

        raise NotImplementedError

    def _write_entry(self, key, value):

        "Send a (possibly combined) entry to the writer."

        raise NotImplementedError

    def close(self):
        for reader in self.readers:
            reader.close()
        self.writer.close()

class TermDictionaryMerger(Merger):

    "Merge term and position files."

    def _read_entry(self, reader):
        term, frequency, doc_positions = reader.read_term()
        return term, doc_positions

    def _combine(self, doc_positions, other_doc_positions):
        return self.merge_positions(doc_positions, other_doc_positions)

    def _write_entry(self, term, doc_positions):
        self.writer.write_term_positions(term, doc_positions)

    def merge_positions(self, doc_positions, other_doc_positions):

        """
        Merge 'doc_positions' with 'other_doc_positions' so that common document
        records contain positions from both collections. Return a list of
        (document number, positions) tuples in no particular order.
        """

        doc_position_dict = dict(doc_positions)

        for docnum, positions in other_doc_positions:
            if docnum in doc_position_dict:
                doc_position_dict[docnum] = doc_position_dict[docnum] + positions
            else:
                doc_position_dict[docnum] = positions

        return list(doc_position_dict.items())

class FieldDictionaryMerger(Merger):

    "Merge field files, concatenating the fields of documents found in several files."

    def _read_entry(self, reader):
        return reader.read_fields()

    def _combine(self, fields, other_fields):
        return fields + other_fields

    def _write_entry(self, docnum, fields):
        self.writer.write_fields(docnum, fields)

# Utility functions.

def get_term_writer(pathname, partition, interval):

    """
    Return a term dictionary writer using files under the given 'pathname'
    labelled according to the given 'partition', using the given indexing
    'interval'.
    """

    tdf = open(join(pathname, "terms-%s" % partition), "wb")
    info_writer = TermWriter(tdf)

    tdif = open(join(pathname, "terms_index-%s" % partition), "wb")
    index_writer = TermIndexWriter(tdif)

    tpf = open(join(pathname, "positions-%s" % partition), "wb")
    positions_writer = PositionWriter(tpf)

    return TermDictionaryWriter(info_writer, index_writer, positions_writer, interval)

def get_field_writer(pathname, partition, interval):

    """
    Return a field dictionary writer using files under the given 'pathname'
    labelled according to the given 'partition', using the given indexing
    'interval'.
    """

    ff = open(join(pathname, "fields-%s" % partition), "wb")
    field_writer = FieldWriter(ff)

    fif = open(join(pathname, "fields_index-%s" % partition), "wb")
    field_index_writer = FieldIndexWriter(fif)

    return FieldDictionaryWriter(field_writer, field_index_writer, interval)

def get_term_reader(pathname, partition):

    """
    Return a term dictionary reader using files under the given 'pathname'
    labelled according to the given 'partition'.
    """

    tdf = open(join(pathname, "terms-%s" % partition), "rb")
    info_reader = TermReader(tdf)

    tdif = open(join(pathname, "terms_index-%s" % partition), "rb")
    index_reader = TermIndexReader(tdif)

    tpf = open(join(pathname, "positions-%s" % partition), "rb")
    positions_reader = PositionReader(tpf)

    return TermDictionaryReader(info_reader, index_reader, positions_reader)

def get_field_reader(pathname, partition):

    """
    Return a field dictionary reader using files under the given 'pathname'
    labelled according to the given 'partition'.
    """

    ff = open(join(pathname, "fields-%s" % partition), "rb")
    field_reader = FieldReader(ff)

    fif = open(join(pathname, "fields_index-%s" % partition), "rb")
    field_index_reader = FieldIndexReader(fif)

    return FieldDictionaryReader(field_reader, field_index_reader)

def rename_files(pathname, names, from_partition, to_partition):

    "Rename each '<name>-<from_partition>' file under 'pathname' to '<name>-<to_partition>'."

    for name in names:
        rename(join(pathname, "%s-%s" % (name, from_partition)), join(pathname, "%s-%s" % (name, to_partition)))

def rename_term_files(pathname, from_partition, to_partition):
    rename_files(pathname, ("terms", "terms_index", "positions"), from_partition, to_partition)

def rename_field_files(pathname, from_partition, to_partition):
    rename_files(pathname, ("fields", "fields_index"), from_partition, to_partition)

def remove_files(pathname, names, partition):

    "Remove each '<name>-<partition>' file under 'pathname'."

    for name in names:
        remove(join(pathname, "%s-%s" % (name, partition)))

def remove_term_files(pathname, partition):
    remove_files(pathname, ("terms", "terms_index", "positions"), partition)

def remove_field_files(pathname, partition):
    remove_files(pathname, ("fields", "fields_index"), partition)

# High-level classes.
class IndexWriter:

    """
    Building term information and writing it to the term and field dictionaries.
    """

    def __init__(self, pathname, interval, flush_interval):
        self.pathname = pathname
        self.interval = interval
        self.flush_interval = flush_interval

        # Continue numbering after any partitions left by earlier writers so
        # that their unmerged data is not overwritten.

        self.dict_partition = self._next_partition("terms-")
        self.field_dict_partition = self._next_partition("fields-")

        self.terms = {}
        self.docs = {}

        self.position_counter = 0
        self.field_counter = 0

    def _next_partition(self, prefix):

        """
        Return the number following the highest numbered partition whose files
        are named '<prefix><number>' under the index path, or 0 if there is none.
        """

        if not exists(self.pathname):
            return 0

        start = len(prefix)
        numbers = [int(filename[start:]) for filename in listdir(self.pathname)
                   if filename.startswith(prefix) and filename[start:].isdigit()]

        return max(numbers) + 1 if numbers else 0

    def add_position(self, term, docnum, position):

        """
        Add a position entry for the given 'term' in the document with the given
        'docnum', indicating the given 'position'. Terms are flushed to a new
        partition once 'flush_interval' positions have been added.
        """

        doc_positions = self.terms.setdefault(term, {})
        doc_positions.setdefault(docnum, []).append(position)

        self.position_counter += 1
        if self.flush_interval and self.position_counter >= self.flush_interval:
            self.flush_terms()
            self.position_counter = 0

    def add_field(self, docnum, identifier, value):

        """
        Add for the document with the given 'docnum' a field having the given
        'identifier' and 'value'. Fields are flushed to a new partition once
        'flush_interval' fields have been added.
        """

        self.docs.setdefault(docnum, []).append((identifier, value))

        self.field_counter += 1
        if self.flush_interval and self.field_counter >= self.flush_interval:
            self.flush_fields()
            self.field_counter = 0

    def get_term_writer(self):

        "Return a term dictionary writer for the current partition."

        return get_term_writer(self.pathname, self.dict_partition, self.interval)

    def get_field_writer(self):

        "Return a field dictionary writer for the current partition."

        return get_field_writer(self.pathname, self.field_dict_partition, self.interval)

    def flush_terms(self):

        "Flush terms, in term order, into the current term dictionary partition."

        dict_writer = self.get_term_writer()
        try:
            for term in sorted(self.terms):
                dict_writer.write_term_positions(term, list(self.terms[term].items()))
        finally:
            dict_writer.close()

        self.terms = {}
        self.dict_partition += 1

    def flush_fields(self):

        "Flush fields, in document order, into the current field dictionary partition."

        field_dict_writer = self.get_field_writer()
        try:
            for docnum in sorted(self.docs):
                field_dict_writer.write_fields(docnum, self.docs[docnum])
        finally:
            field_dict_writer.close()

        self.docs = {}
        self.field_dict_partition += 1

    def close(self):

        "Flush any terms and fields not yet written."

        if self.terms:
            self.flush_terms()
        if self.docs:
            self.flush_fields()

class IndexReader:

    "Accessing the term and field dictionaries of the merged partition."

    def __init__(self, pathname):
        self.dict_reader = get_term_reader(pathname, "merged")
        self.field_dict_reader = get_field_reader(pathname, "merged")

    def find_positions(self, term):
        return self.dict_reader.find_positions(term)

    def get_frequency(self, term):
        return self.dict_reader.get_frequency(term)

    def get_fields(self, docnum):
        return self.field_dict_reader.get_fields(docnum)

    def close(self):
        self.dict_reader.close()
        self.field_dict_reader.close()

class Index:

    "An inverted index solution encapsulating the various components."

    # Partition label used for the output of a merge in progress. It is
    # renamed to "merged" once the merge completes, so that an existing
    # "merged" partition can safely be one of the merge inputs.

    MERGING_PARTITION = "merging"

    def __init__(self, pathname):
        self.pathname = pathname
        self.reader = None
        self.writer = None

    def get_writer(self, interval=INTERVAL, flush_interval=FLUSH_INTERVAL):

        """
        Return a writer, optionally using the given indexing 'interval' and
        'flush_interval'.
        """

        if not exists(self.pathname):
            mkdir(self.pathname)

        self.writer = IndexWriter(self.pathname, interval, flush_interval)
        return self.writer

    def get_reader(self, partition=0):

        """
        Return a reader for the index, first merging all partitions into a
        single "merged" partition. 'partition' is accepted for compatibility
        but is not used.
        """

        if not exists(self.pathname):
            raise OSError("Index path %r does not exist." % self.pathname)

        # Ensure that only one partition exists.

        self.merge_terms()
        self.merge_fields()

        return self._get_reader(partition)

    def _get_reader(self, partition):

        "Return a reader for the index."

        if not exists(self.pathname):
            raise OSError("Index path %r does not exist." % self.pathname)

        self.reader = IndexReader(self.pathname)
        return self.reader

    def merge_terms(self, interval=INTERVAL):

        "Merge term dictionaries using the given indexing 'interval'."

        self._merge_partitions("terms", get_term_reader, get_term_writer,
            TermDictionaryMerger, rename_term_files, remove_term_files, interval)

    def merge_fields(self, interval=INTERVAL):

        "Merge field dictionaries using the given indexing 'interval'."

        self._merge_partitions("fields", get_field_reader, get_field_writer,
            FieldDictionaryMerger, rename_field_files, remove_field_files, interval)

    def _merge_partitions(self, name, reader_fn, writer_fn, merger_class,
        rename_fn, remove_fn, interval):

        """
        Merge every partition whose files are named '<name>-<partition>' into
        the "merged" partition. Readers, writers, the merger and file renaming
        and removal are provided by 'reader_fn', 'writer_fn', 'merger_class',
        'rename_fn' and 'remove_fn'; the merged dictionary uses 'interval'.
        """

        prefix = name + "-"
        start = len(prefix)

        # Discover partitions, ignoring the output of any interrupted merge.

        partitions = [filename[start:] for filename in listdir(self.pathname)
                      if filename.startswith(prefix)
                      and filename[start:] != self.MERGING_PARTITION]

        if len(partitions) > 1:

            # Write to a temporary partition so that an existing "merged"
            # partition is neither truncated while being read nor removed
            # after being rewritten.

            readers = [reader_fn(self.pathname, partition) for partition in partitions]
            writer = writer_fn(self.pathname, self.MERGING_PARTITION, interval)
            merger = merger_class(writer, readers)
            try:
                merger.merge()
            finally:
                merger.close()

            # Replace the old partitions with the newly merged one.

            for partition in partitions:
                remove_fn(self.pathname, partition)

            rename_fn(self.pathname, self.MERGING_PARTITION, "merged")

        elif len(partitions) == 1 and partitions[0] != "merged":
            rename_fn(self.pathname, partitions[0], "merged")

    def close(self):
        if self.reader is not None:
            self.reader.close()
            self.reader = None
        if self.writer is not None:
            self.writer.close()
            self.writer = None

# vim: tabstop=4 expandtab shiftwidth=4