#!/usr/bin/env python

"""
High-level classes.

Copyright (C) 2009, 2010, 2011 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with this program. If not, see <http://www.gnu.org/licenses/>.
"""

from iixr.filesystem import *
from iixr.merging import *
from itertools import islice
from os import mkdir # index discovery
from os.path import exists

# Constants.
# Default tuning parameters used by Index.__init__.

TERM_INTERVAL = 100
DOCUMENT_INTERVAL = 100
FIELD_INTERVAL = 100
FLUSH_INTERVAL = 10000              # documents buffered before a flush
POSITIONS_FLUSH_INTERVAL = 1000000  # positions buffered before a flush
OPEN_PARTITIONS = 20                # partitions merged per pass

# High-level classes.

class Document:

    "A container of document information."

    def __init__(self, docnum, fields=None):

        """
        Initialise a document with the given 'docnum' and optional 'fields'
        (a list of (identifier, value) tuples).
        """

        self.docnum = docnum
        self.fields = fields or []
        self.terms = {}         # term -> list of positions, in insertion order
        self.field_dict = None  # lazily built from self.fields by _ensure_dict

    def add_position(self, term, position):

        """
        Add a position entry for the given 'term', indicating the given
        'position'.
        """

        self.terms.setdefault(term, []).append(position)

    def add_field(self, identifier, value):

        "Add a field having the given 'identifier' and 'value'."

        self.fields.append((identifier, unicode(value))) # convert to string

        # Invalidate the cached lookup so keys()/__getitem__ see the new field.

        self.field_dict = None

    def set_fields(self, fields):

        """
        Set the document's 'fields': a list of tuples each containing an integer
        identifier and a string value.
        """

        self.fields = fields

        # Invalidate the cached lookup built from the previous field list.

        self.field_dict = None

    def _ensure_dict(self):

        """
        Build the identifier -> value lookup from the field list if it is not
        already cached. Where an identifier occurs more than once, the last
        value wins (plain dict construction).
        """

        if self.field_dict is None:
            self.field_dict = dict(self.fields)

    def keys(self):

        "Return the identifiers of the document's fields."

        self._ensure_dict()
        return self.field_dict.keys()

    def __getitem__(self, key):

        "Return the value of the field with the given identifier 'key'."

        self._ensure_dict()
        return self.field_dict[key]

class IndexWriter:

    """
    Building term information and writing it to the term and field dictionaries.
    """

    def __init__(self, pathname, interval, doc_interval, field_interval, flush_interval, positions_flush_interval):

        """
        Initialise a writer for the index at 'pathname'. The 'interval' and
        'doc_interval' values are passed to the term dictionary writer, and
        'field_interval' to the field dictionary writer. Buffered data is
        flushed once 'flush_interval' documents or 'positions_flush_interval'
        positions have been added; a false value disables that trigger.
        """

        self.pathname = pathname
        self.interval = interval
        self.doc_interval = doc_interval
        self.field_interval = field_interval
        self.flush_interval = flush_interval
        self.positions_flush_interval = positions_flush_interval

        # Partition numbers are chosen from what is already on disk.

        self.dict_partition = get_next_partition(get_term_partitions(self.pathname))
        self.field_dict_partition = get_next_partition(get_field_partitions(self.pathname))

        self.terms = {}  # term -> {docnum: positions}
        self.docs = []   # (docnum, fields) tuples awaiting a flush

        self.doc_counter = 0
        self.position_counter = 0

    def add_document(self, doc):

        """
        Add the given document 'doc', updating the document counter and flushing
        terms and fields if appropriate.
        """

        docnum = doc.docnum

        for term, positions in doc.terms.items():
            self.terms.setdefault(term, {})[docnum] = positions
            self.position_counter += len(positions)

        self.docs.append((docnum, doc.fields))

        self.doc_counter += 1

        if (self.flush_interval and self.doc_counter >= self.flush_interval) or \
            (self.positions_flush_interval and self.position_counter >= self.positions_flush_interval):

            self.flush_terms()
            self.flush_fields()
            self.doc_counter = 0
            self.position_counter = 0

    def get_term_writer(self):

        "Return a term dictionary writer for the current partition."

        return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval)

    def get_field_writer(self):

        "Return a field dictionary writer for the current partition."

        return get_field_writer(self.pathname, self.field_dict_partition, self.field_interval)

    def flush_terms(self):

        """
        Flush terms into the current term dictionary partition, then advance to
        the next partition.
        """

        all_terms = self.terms
        dict_writer = self.get_term_writer()

        # Terms are written in sorted order. Each term's (docnum, positions)
        # entries are also ordered by document number, matching the ordering
        # that flush_fields uses for documents.

        for term in sorted(all_terms):
            doc_positions = sorted(all_terms[term].items())
            dict_writer.write_term_positions(term, doc_positions)

        dict_writer.close()

        self.terms = {}
        self.dict_partition += 1

    def flush_fields(self):

        """
        Flush fields into the current field dictionary partition, then advance
        to the next partition.
        """

        # Get the documents in order.

        self.docs.sort()

        field_dict_writer = self.get_field_writer()
        for docnum, fields in self.docs:
            field_dict_writer.write_fields(docnum, fields)
        field_dict_writer.close()

        self.docs = []
        self.field_dict_partition += 1

    def close(self):

        """
        Flush any buffered terms and fields. A partition is also written when
        none exists on disk yet (presumably so that a reader always finds at
        least one dictionary).
        """

        if self.terms or not get_term_partitions(self.pathname):
            self.flush_terms()
        if self.docs or not get_field_partitions(self.pathname):
            self.flush_fields()

class IndexReader:

    "Accessing the term and field dictionaries."

    def __init__(self, pathname):

        "Open the \"merged\" term and field dictionaries in 'pathname'."

        self.dict_reader = get_term_reader(pathname, "merged")

        # Do not leak the term reader if the field reader cannot be opened.

        try:
            self.field_dict_reader = get_field_reader(pathname, "merged")
        except Exception:
            self.dict_reader.close()
            raise

    # Sequential access.

    def read_term(self):
        return self.dict_reader.read_term()

    def go_to_term(self, term):
        return self.dict_reader._get_term_and_positions(*self.dict_reader.go_to_term(term))

    # Query access.

    def get_terms(self):
        return self.dict_reader.get_terms()

    def find_terms(self, term):
        return self.dict_reader.find_terms(term)

    def find_positions(self, term):
        return self.dict_reader.find_positions(term)

    def find_common_positions(self, terms):
        return self.dict_reader.find_common_positions(terms)

    def get_frequency(self, term):
        return self.dict_reader.get_frequency(term)

    def get_document_frequency(self, term):
        return self.dict_reader.get_document_frequency(term)

    def get_fields(self, docnum):
        return self.field_dict_reader.get_fields(docnum)

    def get_document(self, docnum):

        "Return a Document for 'docnum' populated with its stored fields."

        return Document(docnum, self.get_fields(docnum))

    def close(self):

        "Close both dictionary readers."

        self.dict_reader.close()
        self.field_dict_reader.close()

class Index:

    "An inverted index solution encapsulating the various components."

    def __init__(self, pathname, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, field_interval=FIELD_INTERVAL,
        flush_interval=FLUSH_INTERVAL, positions_flush_interval=POSITIONS_FLUSH_INTERVAL, open_partitions=OPEN_PARTITIONS):

        """
        Initialise an index stored in the directory 'pathname'. The interval
        and flush settings are passed to writers; 'open_partitions' limits
        how many partitions are merged in one pass (None means all).
        """

        self.pathname = pathname
        self.interval = interval
        self.doc_interval = doc_interval
        self.field_interval = field_interval
        self.flush_interval = flush_interval
        self.positions_flush_interval = positions_flush_interval
        self.open_partitions = open_partitions
        self.reader = None
        self.writer = None

    def get_writer(self):

        "Return a writer, closing (and thus flushing) any previous writer first."

        self._ensure_directory()

        # Each writer computes its starting partition numbers from the
        # partitions currently on disk, so two unflushed writers would be given
        # the same partitions. Flushing the earlier writer first avoids this.

        if self.writer is not None:
            self.writer.close()
            self.writer = None

        self.writer = IndexWriter(self.pathname, self.interval, self.doc_interval,
            self.field_interval, self.flush_interval, self.positions_flush_interval)
        return self.writer

    def _ensure_directory(self):

        "Create the index directory if it does not exist."

        if not exists(self.pathname):
            mkdir(self.pathname)

    def get_reader(self, partition=0):

        """
        Return a reader for the index. All partitions are merged first, so the
        reader always uses the "merged" partition; 'partition' is currently
        ignored.
        """

        # Merging renames and removes the "merged" files, so release any
        # reader still holding them.

        if self.reader is not None:
            self.reader.close()
            self.reader = None

        # Ensure that only one partition exists.

        self.merge()
        return self._get_reader(partition)

    def _get_reader(self, partition):

        """
        Return a reader for the index without merging.

        Raises OSError if the index directory does not exist.
        """

        if not exists(self.pathname):
            raise OSError("Index path %r does not exist." % self.pathname)

        self.reader = IndexReader(self.pathname)
        return self.reader

    def get_term_partitions(self):

        "Return a set of term partition identifiers."

        return get_term_partitions(self.pathname)

    def get_field_partitions(self):

        "Return a set of field partition identifiers."

        return get_field_partitions(self.pathname)

    def merge(self):

        "Merge/optimise index partitions."

        self._merge_terms()
        self._merge_fields()

    def _merge_dictionaries(self, get_partitions, rename_files, remove_files, get_reader, get_writer, get_merger, intervals):

        """
        Merge term or field dictionaries until at most one partition, named
        "merged", remains.

        'get_partitions' returns the current set of partition identifiers;
        'rename_files' and 'remove_files' manipulate a partition's files;
        'get_reader', 'get_writer' and 'get_merger' construct the merge
        components; 'intervals' are extra arguments for 'get_writer'.
        """

        partitions = get_partitions()

        # Ensure the correct labelling of a single partition.

        if len(partitions) == 1:
            partition = list(partitions)[0]
            if partition != "merged":
                rename_files(self.pathname, partition, "merged")
            return

        # Each pass must consume at least two partitions. Otherwise the pass
        # replaces one partition with one new "merged" partition and the loop
        # never terminates. None keeps its meaning of "no limit" for islice.

        limit = self.open_partitions
        if limit is not None and limit < 2:
            limit = 2

        # Merge the partitions.

        old_merged_counter = 0

        while len(partitions) > 1:

            if "merged" in partitions:

                # Pick an unused name so that a leftover "old-merged-N"
                # partition (e.g. from an interrupted merge) is not clobbered.

                old_name = "old-merged-%d" % old_merged_counter
                while old_name in partitions:
                    old_merged_counter += 1
                    old_name = "old-merged-%d" % old_merged_counter

                rename_files(self.pathname, "merged", old_name)
                partitions.remove("merged")
                partitions.add(old_name)
                old_merged_counter += 1

            # Process only a certain number at once, avoiding resource limits.

            active_partitions = list(islice(partitions, limit))

            readers = [get_reader(self.pathname, partition) for partition in active_partitions]

            # Write directly to a dictionary.

            writer = get_writer(self.pathname, "merged", *intervals)
            merger = get_merger(writer, readers)
            merger.merge()
            merger.close()

            # Remove old files.

            for partition in active_partitions:
                remove_files(self.pathname, partition)

            # Acquire the partitions to check their number again.

            partitions = get_partitions()

    def _merge_terms(self):

        "Merge term dictionaries."

        self._merge_dictionaries(self.get_term_partitions, rename_term_files,
            remove_term_files, get_term_reader, get_term_writer,
            TermDictionaryMerger, [self.interval, self.doc_interval])

    def _merge_fields(self):

        "Merge field dictionaries."

        self._merge_dictionaries(self.get_field_partitions, rename_field_files,
            remove_field_files, get_field_reader, get_field_writer,
            FieldDictionaryMerger, [self.field_interval])

    def update(self, other_indexes):

        "Copy the content of the 'other_indexes' into this index and merge."

        self._ensure_directory()

        # The target name includes both the source partition and the index
        # position. With a name based only on the index position, an index
        # holding several partitions would copy them all onto the same target.
        # NOTE(review): assumes copy_*_files takes the full target partition
        # name as its last argument; confirm against iixr.filesystem.

        for i, index in enumerate(other_indexes):
            for partition in index.get_term_partitions():
                copy_term_files(index.pathname, partition, self.pathname, "%s-added-%d" % (partition, i))
            for partition in index.get_field_partitions():
                copy_field_files(index.pathname, partition, self.pathname, "%s-added-%d" % (partition, i))

        self.merge()

    def close(self):

        "Close the current reader and writer, flushing the writer's data."

        if self.reader is not None:
            self.reader.close()
            self.reader = None
        if self.writer is not None:
            self.writer.close()
            self.writer = None

# vim: tabstop=4 expandtab shiftwidth=4