#!/usr/bin/env python

"""
A simple (and sane) text indexing library.

Copyright (C) 2009 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with this program. If not, see <http://www.gnu.org/licenses/>.
"""

from os import listdir, mkdir # index and partition discovery
from os import remove, rename # partition manipulation
from os.path import exists, join
from os.path import commonprefix # to find common string prefixes
from bisect import bisect_right # to find terms in the dictionary index
from bisect import insort_right # to maintain a sorted list of data for merging
import bz2, zlib # for field compression

# Constants.

INTERVAL = 100
FLUSH_INTERVAL = 1000000

# Name of the temporary partition written while merging. It is renamed to
# "merged" only after the merge has completed and the old partitions have been
# removed, so an existing "merged" partition is never truncated while it is
# still being read.

MERGING_PARTITION = "merging"

# Compression flags are single bytes stored before compressed field values.
# The b"" literals are ordinary strings on Python 2 and bytes on Python 3.

compressors = [(b"b", bz2.compress), (b"z", zlib.compress)]
decompressors = {b"b" : bz2.decompress, b"z" : zlib.decompress}

# Foundation classes.

class File:

    "A basic file abstraction."

    def __init__(self, f):
        self.f = f
        self.reset()

    def reset(self):

        "To be used to reset the state of the reader or writer between records."

        pass

    def rewind(self):

        "Seek to the start of the file and reset the record state."

        self.f.seek(0)
        self.reset()

    def close(self):

        "Close the underlying file if it is still open; repeated calls do nothing."

        if self.f is not None:
            self.f.close()
            self.f = None

class FileWriter(File):

    "Writing basic data types to files."

    def write_number(self, number):

        """
        Write 'number' to the file using a variable length encoding: seven bits
        per byte, least significant group first, with the high bit set on every
        byte except the last. Raise ValueError if 'number' is negative.
        """

        # Negative numbers are not supported.

        if number < 0:
            raise ValueError("Number %r is negative." % number)

        # Special case: one byte containing zero.

        if number == 0:
            self.f.write(b"\x00")
            return

        # Write the number from least to most significant digits. A bytearray
        # is used so that the output is a byte string on Python 2 and 3 alike.

        digits = bytearray()

        while number != 0:
            lsd = number & 127
            number >>= 7
            if number != 0:
                lsd |= 128
            digits.append(lsd)

        self.f.write(bytes(digits))

    def write_string(self, s, compress=0):

        """
        Write 's' to the file, recording its length and compressing the string
        if 'compress' is set to a true value. Text (unicode) strings are stored
        as UTF-8; byte strings are stored unchanged.

        When compressing, a one-byte flag precedes the length: the flag of the
        first compressor producing shorter data, or "-" if none does.
        """

        # Convert text to UTF-8 bytes. On Python 2 'bytes' is 'str', so only
        # unicode objects are encoded.

        if not isinstance(s, bytes):
            s = s.encode("utf-8")

        # Compress the string if requested.

        if compress:
            for flag, fn in compressors:
                cs = fn(s)

                # Take the first string shorter than the original.

                if len(cs) < len(s):
                    s = cs
                    break

            # No compressor produced shorter data: store it uncompressed.

            else:
                flag = b"-"

            # Record whether compression was used.

            self.f.write(flag)

        # Write the length of the data before the data itself.

        self.write_number(len(s))
        self.f.write(s)

class FileReader(File):

    "Reading basic data types from files."

    def read_number(self):

        """
        Read a number written by FileWriter.write_number. Raise EOFError if the
        file ends before a complete number has been read.
        """

        # Read each byte, adding it to the number.

        shift = 0
        number = 0
        more = 1

        while more:
            byte = self.f.read(1)
            if not byte:
                raise EOFError

            csd = ord(byte)
            more = csd & 128 != 0
            if more:
                csd &= 127
            number += (csd << shift)
            shift += 7

        return number

    def read_string(self, decompress=0):

        """
        Read a string from the file, decompressing the stored data if
        'decompress' is set to a true value (meaning that a compression flag
        was written before the data). Return the data decoded from UTF-8.
        """

        # Only compressed records carry a flag.

        if decompress:
            flag = self.f.read(1)
        else:
            flag = b"-"

        length = self.read_number()
        s = self.f.read(length)

        # Perform decompression if applicable.

        if flag != b"-":
            fn = decompressors[flag]
            s = fn(s)

        # Convert bytes to text (unicode on Python 2, str on Python 3).

        return s.decode("utf-8")

# Specific classes for storing term and position information.

class PositionWriter(FileWriter):

    "Writing position information to files."

    def reset(self):
        self.last_docnum = 0

    def write_positions(self, docnum, positions):

        """
        Write for the document 'docnum' the given 'positions' (sorted in place).
        Raise ValueError if 'docnum' is less than the previous document number.
        """

        if docnum < self.last_docnum:
            raise ValueError("Document number %r is less than previous number %r." % (docnum, self.last_docnum))

        # Write the document number delta.

        self.write_number(docnum - self.last_docnum)

        # Write the number of positions.

        self.write_number(len(positions))

        # Make sure that the positions are sorted.

        positions.sort()

        # Write the position deltas.

        last = 0
        for position in positions:
            self.write_number(position - last)
            last = position

        self.last_docnum = docnum

    def write_term_positions(self, doc_positions):

        """
        Write all 'doc_positions' - a collection of tuples of the form (document
        number, position list) - to the file, returning a tuple containing the
        offset at which they were stored together with the frequency (number of
        positions) for the term involved.
        """

        # Reset the writer and record the current file offset.

        self.reset()
        offset = self.f.tell()

        # Write the number of documents.

        self.write_number(len(doc_positions))
        doc_positions.sort()

        # Write the positions.

        frequency = 0

        for docnum, positions in doc_positions:
            self.write_positions(docnum, positions)
            frequency += len(positions)

        return offset, frequency

class PositionReader(FileReader):

    "Reading position information from files."

    def reset(self):
        self.last_docnum = 0

    def read_positions(self):

        "Read positions, returning a document number and a list of positions."

        # Read the document number delta and add it to the last number.

        self.last_docnum += self.read_number()

        # Read the number of positions.

        npositions = self.read_number()

        # Read the position deltas, adding each previous position to get the
        # appropriate collection of absolute positions.

        last = 0
        positions = []

        for i in range(npositions):
            last += self.read_number()
            positions.append(last)

        return self.last_docnum, positions

    def read_term_positions(self, offset):

        """
        Read all positions from 'offset', seeking to that position in the file
        before reading. The returned iterator shares this reader's file, so it
        must be consumed before the file is repositioned again.
        """

        self.reset()
        self.f.seek(offset)

        # Could duplicate the file handle using...
        # fdopen(dup(self.f.fileno()), "rb")

        return PositionIterator(self.f)

class PositionIterator(PositionReader):

    "Iterating over document positions."

    def __init__(self, f):
        PositionReader.__init__(self, f)

        # Read the number of documents.

        self.ndocuments = self.read_number()
        self.read_documents = 0

    def __len__(self):
        return self.ndocuments

    def sort(self):

        "Stored document positions are already sorted."

        pass

    def __iter__(self):
        return self

    def next(self):

        "Read positions for a single document."

        if self.read_documents < self.ndocuments:
            self.read_documents += 1
            return self.read_positions()
        else:
            raise StopIteration

    # Python 3 name for the iterator protocol method.

    __next__ = next

class TermWriter(FileWriter):

    "Writing term information to files."

    def reset(self):
        self.last_term = ""
        self.last_offset = 0

    def write_term(self, term, offset, frequency):

        """
        Write the given 'term', its position file 'offset', and its 'frequency'
        to the term information file. Return the offset after the term
        information was written to the file.
        """

        # Write the prefix length and term suffix.

        common = len(commonprefix([self.last_term, term]))
        suffix = term[common:]

        self.write_number(common)
        self.write_string(suffix)

        # Write the offset delta.

        self.write_number(offset - self.last_offset)

        # Write the frequency.

        self.write_number(frequency)

        self.last_term = term
        self.last_offset = offset

        return self.f.tell()

class TermReader(FileReader):

    "Reading term information from files."

    def reset(self):
        self.last_term = ""
        self.last_offset = 0

    def read_term(self):

        """
        Read a term, its position file offset, and its frequency from the term
        information file.
        """

        # Read the prefix length and term suffix.

        common = self.read_number()
        suffix = self.read_string()

        self.last_term = self.last_term[:common] + suffix

        # Read the offset delta.

        self.last_offset += self.read_number()

        # Read the frequency.

        frequency = self.read_number()

        return self.last_term, self.last_offset, frequency

    def go_to_term(self, term, offset, info_offset):

        """
        Seek past the entry for 'term' having 'offset' to 'info_offset'. This
        permits the scanning for later terms from the specified term.
        """

        self.f.seek(info_offset)
        self.last_term = term
        self.last_offset = offset

class TermIndexWriter(TermWriter):

    "Writing term dictionary index details to files."

    def reset(self):
        TermWriter.reset(self)
        self.last_info_offset = 0

    def write_term(self, term, offset, frequency, info_offset):

        """
        Write the given 'term', its position file 'offset', and its 'frequency'
        to the term dictionary index file, along with the 'info_offset' in the
        term information file.
        """

        TermWriter.write_term(self, term, offset, frequency)

        # Write the information file offset delta.

        self.write_number(info_offset - self.last_info_offset)
        self.last_info_offset = info_offset

class TermIndexReader(TermReader):

    "Reading term dictionary index details from files."

    def reset(self):
        TermReader.reset(self)
        self.last_info_offset = 0

    def read_term(self):

        """
        Read a term, its position file offset, its frequency, and its term
        information file offset from the term dictionary index file.
        """

        term, offset, frequency = TermReader.read_term(self)

        # Read the offset delta.

        self.last_info_offset += self.read_number()

        return term, offset, frequency, self.last_info_offset

class TermDictionaryWriter:

    "Writing term dictionaries."

    def __init__(self, info_writer, index_writer, position_writer, interval):
        self.info_writer = info_writer
        self.index_writer = index_writer
        self.position_writer = position_writer
        self.interval = interval
        self.entry = 0

    def _write_term(self, term, offset, frequency):

        """
        Write the given 'term', its position file 'offset', and its 'frequency'
        to the term information file and, for every 'interval'-th term, to the
        index, making a dictionary entry.
        """

        info_offset = self.info_writer.write_term(term, offset, frequency)

        if self.entry % self.interval == 0:
            self.index_writer.write_term(term, offset, frequency, info_offset)

        self.entry += 1

    def write_term_positions(self, term, doc_positions):

        """
        Write the given 'term' and the 'doc_positions' recording the documents
        and positions at which the term is found.
        """

        offset, frequency = self.position_writer.write_term_positions(doc_positions)
        self._write_term(term, offset, frequency)

    def close(self):
        self.info_writer.close()
        self.index_writer.close()
        self.position_writer.close()

class TermDictionaryReader:

    "Reading term dictionaries."

    def __init__(self, info_reader, index_reader, position_reader):
        self.info_reader = info_reader
        self.index_reader = index_reader
        self.position_reader = position_reader

        self.terms = []
        try:
            while 1:
                self.terms.append(self.index_reader.read_term())
        except EOFError:
            pass

        # An offset larger than any stored one, so that a probe tuple sorts
        # after every index entry with the same term. Offsets are written in
        # increasing order, so the last entry holds the largest. An empty
        # index has no entries to order against.

        if self.terms:
            self.max_offset = self.terms[-1][1] + 1
        else:
            self.max_offset = 0

    def _find_term(self, term):

        """
        Find the position file offset and frequency of 'term' from the term
        dictionary, returning None if the term is not present.
        """

        i = bisect_right(self.terms, (term, self.max_offset, 0, 0)) - 1

        # Get the entry position providing the term or one preceding it.

        if i == -1:
            return None

        found_term, offset, frequency, info_offset = self.terms[i]

        # Where the term is found immediately, return the offset.

        if term == found_term:
            return offset, frequency

        # Otherwise, seek past the index term's entry in the information file
        # and scan for the desired term.

        self.info_reader.go_to_term(found_term, offset, info_offset)
        try:
            while term > found_term:
                found_term, offset, frequency = self.info_reader.read_term()
        except EOFError:
            pass

        # If the term is found, return the offset and frequency.

        if term == found_term:
            return offset, frequency
        else:
            return None

    def rewind(self):
        self.info_reader.rewind()

    def _get_positions(self, offset):
        return self.position_reader.read_term_positions(offset)

    def read_term(self):

        """
        Return the next term, its frequency and the documents and positions at
        which the term is found.
        """

        term, offset, frequency = self.info_reader.read_term()
        positions = self._get_positions(offset)
        return term, frequency, positions

    def find_positions(self, term):

        "Return the documents and positions at which the given 'term' is found."

        t = self._find_term(term)
        if t is None:
            return None
        else:
            offset, frequency = t
            return self._get_positions(offset)

    def get_frequency(self, term):

        "Return the frequency of the given 'term'."

        t = self._find_term(term)
        if t is None:
            return None
        else:
            offset, frequency = t
            return frequency

    def close(self):
        self.info_reader.close()
        self.index_reader.close()
        self.position_reader.close()

# Specific classes for storing document information.

class FieldWriter(FileWriter):

    "Writing field data to files."

    def reset(self):
        self.last_docnum = 0

    def write_fields(self, docnum, fields):

        """
        Write for the given 'docnum', a list of 'fields' (integer, string pairs
        representing field identifiers and values respectively).
        Return the offset at which the fields are stored.
        """

        offset = self.f.tell()

        # Write the document number delta.

        self.write_number(docnum - self.last_docnum)

        # Write the number of fields.

        self.write_number(len(fields))

        # Write the fields themselves.

        for i, field in fields:
            self.write_number(i)
            self.write_string(field, 1) # compress

        self.last_docnum = docnum
        return offset

class FieldReader(FileReader):

    "Reading field data from files."

    def reset(self):
        self.last_docnum = 0

    def read_fields(self):

        """
        Read fields from the file, returning a tuple containing the document
        number and a list of field (identifier, value) pairs.
        """

        # Read the document number.

        self.last_docnum += self.read_number()

        # Read the number of fields.

        nfields = self.read_number()

        # Collect the fields.

        fields = []

        for i in range(nfields):
            identifier = self.read_number()
            value = self.read_string(1) # decompress
            fields.append((identifier, value))

        return self.last_docnum, fields

    def read_document_fields(self, docnum, offset):

        """
        Read fields for 'docnum' at the given 'offset'. This permits the
        retrieval of details for the specified document, as well as scanning for
        later documents.
        """

        self.f.seek(offset)

        # The stored document number delta is relative to the preceding record,
        # not to the reader's current state, so the computed number is wrong
        # and is replaced by the known 'docnum'.

        unused_docnum, fields = self.read_fields()
        self.last_docnum = docnum
        return docnum, fields

class FieldIndexWriter(FileWriter):

    "Writing field index details to files."

    def reset(self):
        self.last_docnum = 0
        self.last_offset = 0

    def write_document(self, docnum, offset):

        """
        Write for the given 'docnum', the 'offset' at which the fields for the
        document are stored in the fields file.
        """

        # Write the document number and offset deltas.

        self.write_number(docnum - self.last_docnum)
        self.write_number(offset - self.last_offset)

        self.last_docnum = docnum
        self.last_offset = offset

class FieldIndexReader(FileReader):

    "Reading field index details from files."

    def reset(self):
        self.last_docnum = 0
        self.last_offset = 0

    def read_document(self):

        "Read a document number and field file offset."

        # Read the document number delta and offset.

        self.last_docnum += self.read_number()
        self.last_offset += self.read_number()

        return self.last_docnum, self.last_offset

class FieldDictionaryWriter:

    "Writing field dictionary details."

    def __init__(self, field_writer, field_index_writer, interval):
        self.field_writer = field_writer
        self.field_index_writer = field_index_writer
        self.interval = interval
        self.entry = 0

    def write_fields(self, docnum, fields):

        """
        Write details of the document with the given 'docnum' and 'fields',
        indexing every 'interval'-th document.
        """

        offset = self.field_writer.write_fields(docnum, fields)

        if self.entry % self.interval == 0:
            self.field_index_writer.write_document(docnum, offset)

        self.entry += 1

    def close(self):
        self.field_writer.close()
        self.field_index_writer.close()

class FieldDictionaryReader:

    "Reading field dictionary details."

    def __init__(self, field_reader, field_index_reader):
        self.field_reader = field_reader
        self.field_index_reader = field_index_reader

        self.docs = []
        try:
            while 1:
                self.docs.append(self.field_index_reader.read_document())
        except EOFError:
            pass

        # The largest stored offset (offsets increase), used so that a probe
        # tuple sorts at or after every index entry for the same document. An
        # empty index has no entries to order against.

        if self.docs:
            self.max_offset = self.docs[-1][1]
        else:
            self.max_offset = 0

    def rewind(self):
        self.field_reader.rewind()

    def read_fields(self):

        "Return the next document number and fields."

        return self.field_reader.read_fields()

    def get_fields(self, docnum):

        """
        Read the fields of the document with the given 'docnum', returning None
        if the document is not present.
        """

        i = bisect_right(self.docs, (docnum, self.max_offset)) - 1

        # Get the entry position providing the document or one preceding it.

        if i == -1:
            return None

        found_docnum, offset = self.docs[i]

        # Read from the fields file.

        found_docnum, fields = self.field_reader.read_document_fields(found_docnum, offset)

        # Scan for the document, if necessary.

        try:
            while docnum > found_docnum:
                found_docnum, fields = self.field_reader.read_fields()
        except EOFError:
            pass

        # If the document is found, return the fields.

        if docnum == found_docnum:
            return fields
        else:
            return None

    def close(self):
        self.field_reader.close()
        self.field_index_reader.close()

# Dictionary merging classes.

class Merger:

    "Merge files."

    def __init__(self, writer, readers):
        self.writer = writer
        self.readers = readers

    def close(self):
        for reader in self.readers:
            reader.close()
        self.writer.close()

class TermDictionaryMerger(Merger):

    "Merge term and position files."

    def merge(self):

        """
        Merge terms and positions from the readers, sending them to the writer.
        """

        # Entries are (term, partition, positions) tuples kept in sorted order.
        # Each partition has at most one entry at a time, so ties on the term
        # are broken by the partition and position iterators are never
        # compared with each other.

        entries = []

        # Get the first entries from the readers.

        for partition, reader in enumerate(self.readers):
            reader.rewind()

            try:
                term, frequency, positions = reader.read_term()
            except EOFError:
                pass
            else:
                insort_right(entries, (term, partition, positions))

        # While entries are available, write them out in order, merging where
        # appropriate.

        while entries:
            term, partition, doc_positions = entries[0]
            to_update = [partition]

            nentries = len(entries)
            i = 1

            # Find other entries for the term.

            while i < nentries:
                other_term, other_partition, other_doc_positions = entries[i]

                # For such entries, merge the positions.

                if other_term == term:
                    doc_positions = self.merge_positions(doc_positions, other_doc_positions)
                    to_update.append(other_partition)
                    i += 1
                else:
                    break

            # Write the combined term details.

            self.writer.write_term_positions(term, doc_positions)

            # Update the entries from the affected readers.

            del entries[:i]

            for partition in to_update:
                try:
                    term, frequency, positions = self.readers[partition].read_term()
                except EOFError:
                    pass
                else:
                    insort_right(entries, (term, partition, positions))

    def merge_positions(self, doc_positions, other_doc_positions):

        """
        Merge 'doc_positions' with 'other_doc_positions' so that common document
        records contain positions from both collections. Return a list of
        (document number, position list) tuples in no particular order.
        """

        doc_position_dict = dict(doc_positions)

        for docnum, positions in other_doc_positions:
            if docnum in doc_position_dict:
                doc_position_dict[docnum] += positions
            else:
                doc_position_dict[docnum] = positions

        # A list is needed: the writer calls len() and sort() on the result.

        return list(doc_position_dict.items())

class FieldDictionaryMerger(Merger):

    "Merge field files."

    def merge(self):

        """
        Merge fields from the readers, sending them to the writer. Fields for
        the same document from several readers are concatenated in reader
        order.
        """

        # Entries are (docnum, partition, fields) tuples kept in sorted order.

        entries = []

        # Get the first entries from the readers.

        for partition, reader in enumerate(self.readers):
            reader.rewind()

            try:
                docnum, fields = reader.read_fields()
            except EOFError:
                pass
            else:
                insort_right(entries, (docnum, partition, fields))

        # While entries are available, write them out in order, merging where
        # appropriate.

        while entries:
            docnum, partition, fields = entries[0]
            to_update = [partition]

            nentries = len(entries)
            i = 1

            # Find other entries for the document.

            while i < nentries:
                other_docnum, other_partition, other_fields = entries[i]

                # For such entries, combine the fields.

                if other_docnum == docnum:
                    fields = fields + other_fields
                    to_update.append(other_partition)
                    i += 1
                else:
                    break

            # Write the combined document details.

            self.writer.write_fields(docnum, fields)

            # Update the entries from the affected readers.

            del entries[:i]

            for partition in to_update:
                try:
                    docnum, fields = self.readers[partition].read_fields()
                except EOFError:
                    pass
                else:
                    insort_right(entries, (docnum, partition, fields))

# Utility functions.

def get_term_writer(pathname, partition, interval):

    """
    Return a term dictionary writer using files under the given 'pathname'
    labelled according to the given 'partition', using the given indexing
    'interval'.
    """

    tdf = open(join(pathname, "terms-%s" % partition), "wb")
    info_writer = TermWriter(tdf)

    tdif = open(join(pathname, "terms_index-%s" % partition), "wb")
    index_writer = TermIndexWriter(tdif)

    tpf = open(join(pathname, "positions-%s" % partition), "wb")
    positions_writer = PositionWriter(tpf)

    return TermDictionaryWriter(info_writer, index_writer, positions_writer, interval)

def get_field_writer(pathname, partition, interval):

    """
    Return a field dictionary writer using files under the given 'pathname'
    labelled according to the given 'partition', using the given indexing
    'interval'.
    """

    ff = open(join(pathname, "fields-%s" % partition), "wb")
    field_writer = FieldWriter(ff)

    fif = open(join(pathname, "fields_index-%s" % partition), "wb")
    field_index_writer = FieldIndexWriter(fif)

    return FieldDictionaryWriter(field_writer, field_index_writer, interval)

def get_term_reader(pathname, partition):

    """
    Return a term dictionary reader using files under the given 'pathname'
    labelled according to the given 'partition'.
    """

    tdf = open(join(pathname, "terms-%s" % partition), "rb")
    info_reader = TermReader(tdf)

    tdif = open(join(pathname, "terms_index-%s" % partition), "rb")
    index_reader = TermIndexReader(tdif)

    tpf = open(join(pathname, "positions-%s" % partition), "rb")
    positions_reader = PositionReader(tpf)

    return TermDictionaryReader(info_reader, index_reader, positions_reader)

def get_field_reader(pathname, partition):

    """
    Return a field dictionary reader using files under the given 'pathname'
    labelled according to the given 'partition'.
    """

    ff = open(join(pathname, "fields-%s" % partition), "rb")
    field_reader = FieldReader(ff)

    fif = open(join(pathname, "fields_index-%s" % partition), "rb")
    field_index_reader = FieldIndexReader(fif)

    return FieldDictionaryReader(field_reader, field_index_reader)

def rename_files(pathname, names, from_partition, to_partition):
    for name in names:
        rename(join(pathname, "%s-%s" % (name, from_partition)), join(pathname, "%s-%s" % (name, to_partition)))

def rename_term_files(pathname, from_partition, to_partition):
    rename_files(pathname, ("terms", "terms_index", "positions"), from_partition, to_partition)

def rename_field_files(pathname, from_partition, to_partition):
    rename_files(pathname, ("fields", "fields_index"), from_partition, to_partition)

def remove_files(pathname, names, partition):
    for name in names:
        remove(join(pathname, "%s-%s" % (name, partition)))

def remove_term_files(pathname, partition):
    remove_files(pathname, ("terms", "terms_index", "positions"), partition)

def remove_field_files(pathname, partition):
    remove_files(pathname, ("fields", "fields_index"), partition)

def get_next_partition(pathname, prefix):

    """
    Return the number for the next numbered partition whose files under
    'pathname' have names starting with 'prefix' (for example, "terms-"): one
    more than the highest existing numbered partition, or 0 if there is none or
    'pathname' does not exist. Non-numeric partitions such as "merged" are
    ignored.
    """

    if not exists(pathname):
        return 0

    numbers = []
    for filename in listdir(pathname):
        if filename.startswith(prefix):
            suffix = filename[len(prefix):]
            if suffix.isdigit():
                numbers.append(int(suffix))

    if numbers:
        return max(numbers) + 1
    else:
        return 0

# High-level classes.

class IndexWriter:

    """
    Building term information and writing it to the term and field dictionaries.
    """

    def __init__(self, pathname, interval, flush_interval):
        self.pathname = pathname
        self.interval = interval
        self.flush_interval = flush_interval

        # Continue numbering after any partitions already on disk, so that a
        # new writing session before a merge does not overwrite them.

        self.dict_partition = get_next_partition(pathname, "terms-")
        self.field_dict_partition = get_next_partition(pathname, "fields-")

        self.terms = {}
        self.docs = {}

        self.position_counter = 0
        self.field_counter = 0

    def add_position(self, term, docnum, position):

        """
        Add a position entry for the given 'term' in the document with the given
        'docnum', indicating the given 'position'. Flush the terms to a new
        partition once 'flush_interval' positions have been added.
        """

        doc_positions = self.terms.get(term)
        if doc_positions is None:
            doc_positions = self.terms[term] = {}

        doc = doc_positions.get(docnum)
        if doc is None:
            doc = doc_positions[docnum] = []

        doc.append(position)

        self.position_counter += 1
        if self.flush_interval and self.position_counter >= self.flush_interval:
            self.flush_terms()
            self.position_counter = 0

    def add_field(self, docnum, identifier, value):

        """
        Add for the document with the given 'docnum' a field having the given
        'identifier' and 'value'. Flush the fields to a new partition once
        'flush_interval' fields have been added.
        """

        doc_fields = self.docs.get(docnum)
        if doc_fields is None:
            doc_fields = self.docs[docnum] = []

        doc_fields.append((identifier, value))

        self.field_counter += 1
        if self.flush_interval and self.field_counter >= self.flush_interval:
            self.flush_fields()
            self.field_counter = 0

    def get_term_writer(self):

        "Return a term dictionary writer for the current partition."

        return get_term_writer(self.pathname, self.dict_partition, self.interval)

    def get_field_writer(self):

        "Return a field dictionary writer for the current partition."

        return get_field_writer(self.pathname, self.field_dict_partition, self.interval)

    def flush_terms(self):

        "Flush terms into the current term dictionary partition."

        dict_writer = self.get_term_writer()

        # Write the terms in order.

        for term, doc_positions in sorted(self.terms.items()):
            dict_writer.write_term_positions(term, list(doc_positions.items()))

        dict_writer.close()

        self.terms = {}
        self.dict_partition += 1

    def flush_fields(self):

        "Flush fields into the current field dictionary partition."

        field_dict_writer = self.get_field_writer()

        # Write the documents in order.

        for docnum, fields in sorted(self.docs.items()):
            field_dict_writer.write_fields(docnum, fields)

        field_dict_writer.close()

        self.docs = {}
        self.field_dict_partition += 1

    def close(self):

        "Flush any terms and fields not yet written."

        if self.terms:
            self.flush_terms()
        if self.docs:
            self.flush_fields()

class IndexReader:

    "Accessing the term and field dictionaries."

    def __init__(self, pathname):
        self.dict_reader = get_term_reader(pathname, "merged")
        self.field_dict_reader = get_field_reader(pathname, "merged")

    def find_positions(self, term):
        return self.dict_reader.find_positions(term)

    def get_frequency(self, term):
        return self.dict_reader.get_frequency(term)

    def get_fields(self, docnum):
        return self.field_dict_reader.get_fields(docnum)

    def close(self):
        self.dict_reader.close()
        self.field_dict_reader.close()

class Index:

    "An inverted index solution encapsulating the various components."

    def __init__(self, pathname):
        self.pathname = pathname
        self.reader = None
        self.writer = None

    def get_writer(self, interval=INTERVAL, flush_interval=FLUSH_INTERVAL):

        """
        Return a writer, optionally using the given indexing 'interval' and
        'flush_interval'.
        """

        if not exists(self.pathname):
            mkdir(self.pathname)

        self.writer = IndexWriter(self.pathname, interval, flush_interval)
        return self.writer

    def get_reader(self, partition=0):

        "Return a reader for the index."

        # Ensure that only one partition exists.

        self.merge_terms()
        self.merge_fields()

        return self._get_reader(partition)

    def _get_reader(self, partition):

        "Return a reader for the index."

        if not exists(self.pathname):
            raise OSError("Index path %r does not exist." % self.pathname)

        self.reader = IndexReader(self.pathname)
        return self.reader

    def _merge(self, prefix, get_reader_fn, get_writer_fn, merger_class,
               remove_fn, rename_fn, interval):

        """
        Combine all partitions whose primary file name starts with 'prefix' into
        the "merged" partition, using the given reader/writer factories, merger
        class and file removal/renaming functions.

        The merge output is written to a temporary partition and renamed only
        after the old partitions (possibly including an earlier "merged" one)
        have been closed and removed.
        """

        partitions = []

        for filename in listdir(self.pathname):
            if filename.startswith(prefix):
                partition = filename[len(prefix):]

                # Ignore output left by an interrupted merge; it is overwritten.

                if partition != MERGING_PARTITION:
                    partitions.append(partition)

        if len(partitions) > 1:
            readers = [get_reader_fn(self.pathname, partition) for partition in partitions]
            writer = get_writer_fn(self.pathname, MERGING_PARTITION, interval)
            merger = merger_class(writer, readers)
            try:
                merger.merge()
            finally:
                merger.close()

            # Remove old files, then install the merged output.

            for partition in partitions:
                remove_fn(self.pathname, partition)

            rename_fn(self.pathname, MERGING_PARTITION, "merged")

        elif len(partitions) == 1 and partitions[0] != "merged":
            rename_fn(self.pathname, partitions[0], "merged")

    def merge_terms(self, interval=INTERVAL):

        "Merge term dictionaries using the given indexing 'interval'."

        self._merge("terms-", get_term_reader, get_term_writer,
                    TermDictionaryMerger, remove_term_files, rename_term_files,
                    interval)

    def merge_fields(self, interval=INTERVAL):

        "Merge field dictionaries using the given indexing 'interval'."

        self._merge("fields-", get_field_reader, get_field_writer,
                    FieldDictionaryMerger, remove_field_files, rename_field_files,
                    interval)

    def close(self):
        if self.reader is not None:
            self.reader.close()
            self.reader = None
        if self.writer is not None:
            self.writer.close()
            self.writer = None

# vim: tabstop=4 expandtab shiftwidth=4