#!/usr/bin/env python

"""
High-level classes.

Copyright (C) 2009, 2010, 2011 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with this program. If not, see <http://www.gnu.org/licenses/>.
"""

from iixr.filesystem import *
from iixr.merging import *
from itertools import islice
from os import mkdir # index directory creation
from os.path import exists

# Constants.

TERM_INTERVAL = 100                 # term dictionary writer interval
DOCUMENT_INTERVAL = 100             # term dictionary writer document interval
FIELD_INTERVAL = 100                # field dictionary writer interval
FLUSH_INTERVAL = 10000              # documents buffered before a flush
POSITIONS_FLUSH_INTERVAL = 1000000  # term positions buffered before a flush
OPEN_PARTITIONS = 20                # partitions read at once when merging

# High-level classes.

class Document:

    """
    A container of document information: a document number, a list of
    (identifier, value) field tuples, and a mapping from each term to the list
    of positions at which it was recorded.
    """

    def __init__(self, docnum, fields=None):
        self.docnum = docnum
        self.fields = fields or []
        self.terms = {}

        # Lazily-built mapping of field identifiers to values, derived from
        # 'fields'. It is discarded whenever 'fields' changes.

        self.field_dict = None

    def add_position(self, term, position):

        """
        Add a position entry for the given 'term', indicating the given
        'position'.
        """

        self.terms.setdefault(term, []).append(position)

    def add_field(self, identifier, value):

        "Add a field having the given 'identifier' and 'value'."

        self.fields.append((identifier, unicode(value))) # convert to string
        self.field_dict = None # the cached mapping no longer reflects 'fields'

    def set_fields(self, fields):

        """
        Set the document's 'fields': a list of tuples each containing an integer
        identifier and a string value.
        """

        self.fields = fields
        self.field_dict = None # the cached mapping no longer reflects 'fields'

    def _ensure_dict(self):

        "Build the identifier-to-value mapping from 'fields' if not cached."

        if self.field_dict is None:
            self.field_dict = dict(self.fields)

    def keys(self):

        "Return the identifiers of the document's fields."

        self._ensure_dict()
        return self.field_dict.keys()

    def __getitem__(self, key):

        """
        Return the value of the field with the given 'key' identifier.

        KeyError is raised if no such field exists. Where an identifier occurs
        more than once in 'fields', the last value is returned.
        """

        self._ensure_dict()
        return self.field_dict[key]

class IndexWriter:

    """
    Building term information and writing it to the term and field dictionaries.

    Documents are buffered in memory. Whenever a flush threshold is reached, and
    again when the writer is closed, the buffer is written out as a new term
    dictionary partition and a new field dictionary partition.
    """

    def __init__(self, pathname, interval, doc_interval, field_interval, flush_interval, positions_flush_interval):
        self.pathname = pathname
        self.interval = interval
        self.doc_interval = doc_interval
        self.field_interval = field_interval
        self.flush_interval = flush_interval
        self.positions_flush_interval = positions_flush_interval

        # Choose the next partition identifiers, given the partitions already
        # present in the index directory.

        self.dict_partition = get_next_partition(get_term_partitions(self.pathname))
        self.field_dict_partition = get_next_partition(get_field_partitions(self.pathname))

        # Buffered data: term -> {docnum: positions}, and (docnum, fields) tuples.

        self.terms = {}
        self.docs = []

        # Amounts buffered since the last flush.

        self.doc_counter = 0
        self.position_counter = 0

    def add_document(self, doc):

        """
        Add the given document 'doc', updating the document counter and flushing
        terms and fields if appropriate.

        A flush happens once 'flush_interval' documents or
        'positions_flush_interval' term positions have been buffered. A false
        interval (such as 0 or None) disables the corresponding threshold.
        """

        docnum = doc.docnum

        for term, positions in doc.terms.items():
            self.terms.setdefault(term, {})[docnum] = positions
            self.position_counter += len(positions)

        self.docs.append((docnum, doc.fields))

        self.doc_counter += 1

        documents_due = self.flush_interval and self.doc_counter >= self.flush_interval
        positions_due = self.positions_flush_interval and self.position_counter >= self.positions_flush_interval

        if documents_due or positions_due:
            self.flush_terms()
            self.flush_fields()
            self.doc_counter = 0
            self.position_counter = 0

    def get_term_writer(self):

        "Return a term dictionary writer for the current partition."

        return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval)

    def get_field_writer(self):

        "Return a field dictionary writer for the current partition."

        return get_field_writer(self.pathname, self.field_dict_partition, self.field_interval)

    def flush_terms(self):

        """
        Flush terms into the current term dictionary partition, then clear the
        term buffer and advance to the next term partition.
        """

        # Get the terms in order.

        all_terms = self.terms
        terms = sorted(all_terms)

        dict_writer = self.get_term_writer()

        for term in terms:
            doc_positions = all_terms[term].items()
            dict_writer.write_term_positions(term, doc_positions)

        dict_writer.close()

        self.terms = {}
        self.dict_partition += 1

    def flush_fields(self):

        """
        Flush fields into the current field dictionary partition, then clear the
        document buffer and advance to the next field partition.
        """

        # Get the documents in order.

        self.docs.sort()

        field_dict_writer = self.get_field_writer()

        for docnum, fields in self.docs:
            field_dict_writer.write_fields(docnum, fields)

        field_dict_writer.close()

        self.docs = []
        self.field_dict_partition += 1

    def close(self):

        """
        Flush any buffered terms and fields.

        A partition is also written, possibly empty, when the index has no
        partition of that kind yet (presumably so that readers always have a
        dictionary to open).
        """

        if self.terms or not get_term_partitions(self.pathname):
            self.flush_terms()
        if self.docs or not get_field_partitions(self.pathname):
            self.flush_fields()

class IndexReader:

    "Accessing the term and field dictionaries of the \"merged\" partition."

    def __init__(self, pathname):
        self.dict_reader = get_term_reader(pathname, "merged")
        self.field_dict_reader = get_field_reader(pathname, "merged")

    # Sequential access.

    def read_term(self):

        "Read the next term from the term dictionary."

        return self.dict_reader.read_term()

    def go_to_term(self, term):

        "Move to the given 'term' and return its term and positions information."

        return self.dict_reader._get_term_and_positions(*self.dict_reader.go_to_term(term))

    # Query access.

    def get_terms(self):
        return self.dict_reader.get_terms()

    def find_terms(self, term):
        return self.dict_reader.find_terms(term)

    def find_positions(self, term):
        return self.dict_reader.find_positions(term)

    def find_common_positions(self, terms):
        return self.dict_reader.find_common_positions(terms)

    def get_frequency(self, term):
        return self.dict_reader.get_frequency(term)

    def get_document_frequency(self, term):
        return self.dict_reader.get_document_frequency(term)

    def get_fields(self, docnum):
        return self.field_dict_reader.get_fields(docnum)

    def get_document(self, docnum):

        "Return a Document for 'docnum' populated with its stored fields."

        return Document(docnum, self.get_fields(docnum))

    def close(self):

        "Close the term and field dictionary readers."

        self.dict_reader.close()
        self.field_dict_reader.close()

class Index:

    "An inverted index solution encapsulating the various components."

    def __init__(self, pathname, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, field_interval=FIELD_INTERVAL,
        flush_interval=FLUSH_INTERVAL, positions_flush_interval=POSITIONS_FLUSH_INTERVAL, open_partitions=OPEN_PARTITIONS):

        self.pathname = pathname
        self.interval = interval
        self.doc_interval = doc_interval
        self.field_interval = field_interval
        self.flush_interval = flush_interval
        self.positions_flush_interval = positions_flush_interval
        self.open_partitions = open_partitions
        self.reader = None
        self.writer = None

    def get_writer(self):

        """
        Return a writer, creating the index directory if necessary.

        NOTE(review): any previously returned writer is replaced without being
        closed, so Index.close() will not flush it.
        """

        self._ensure_directory()
        self.writer = IndexWriter(self.pathname, self.interval, self.doc_interval,
            self.field_interval, self.flush_interval, self.positions_flush_interval)
        return self.writer

    def _ensure_directory(self):

        "Create the index directory if it does not exist."

        if not exists(self.pathname):
            mkdir(self.pathname)

    def get_reader(self, partition=0):

        """
        Return a reader for the index, first merging all partitions.

        The 'partition' argument is accepted but unused: the reader always opens
        the "merged" partition.
        """

        # Ensure that only one partition exists.

        self.merge()
        return self._get_reader(partition)

    def _get_reader(self, partition):

        """
        Return a reader for the index.

        OSError is raised if the index directory does not exist.
        """

        if not exists(self.pathname):
            raise OSError("Index path %r does not exist." % self.pathname)

        self.reader = IndexReader(self.pathname)
        return self.reader

    def get_term_partitions(self):

        "Return a set of term partition identifiers."

        return get_term_partitions(self.pathname)

    def get_field_partitions(self):

        "Return a set of field partition identifiers."

        return get_field_partitions(self.pathname)

    def merge(self):

        "Merge/optimise index partitions."

        self._merge_terms()
        self._merge_fields()

    def _merge_dictionaries(self, get_partitions, rename_files, remove_files, get_reader, get_writer, get_merger, intervals):

        """
        Merge term or field dictionaries.

        The partitions reported by 'get_partitions' are merged into a single
        partition named "merged", reading at most 'open_partitions' partitions
        per pass. The callables 'rename_files', 'remove_files', 'get_reader',
        'get_writer' and 'get_merger' select the kind of dictionary.
        'intervals' supplies the extra arguments passed to 'get_writer'.
        """

        partitions = get_partitions()

        # Ensure the correct labelling of a single partition.

        if len(partitions) == 1:
            partition = list(partitions)[0]
            if partition != "merged":
                rename_files(self.pathname, partition, "merged")
            return

        # Merge the partitions.

        old_merged_counter = 0

        while len(partitions) > 1:

            # Move any existing merged partition aside so that it becomes an
            # ordinary input to this pass. Names already in use (left behind,
            # for example, by an interrupted merge) are skipped so that the
            # rename never overwrites another partition.

            if "merged" in partitions:
                while ("old-merged-%d" % old_merged_counter) in partitions:
                    old_merged_counter += 1

                old_merged = "old-merged-%d" % old_merged_counter
                rename_files(self.pathname, "merged", old_merged)
                partitions.remove("merged")
                partitions.add(old_merged)
                old_merged_counter += 1

            # Process only a certain number at once, avoiding resource limits.

            active_partitions = list(islice(partitions, self.open_partitions))

            readers = [get_reader(self.pathname, partition) for partition in active_partitions]

            # Write directly to a dictionary.

            writer = get_writer(self.pathname, "merged", *intervals)
            merger = get_merger(writer, readers)
            merger.merge()
            merger.close()

            # Remove old files.

            for partition in active_partitions:
                remove_files(self.pathname, partition)

            # Acquire the partitions to check their number again.

            partitions = get_partitions()

    def _merge_terms(self):

        "Merge term dictionaries."

        self._merge_dictionaries(self.get_term_partitions, rename_term_files,
            remove_term_files, get_term_reader, get_term_writer,
            TermDictionaryMerger, [self.interval, self.doc_interval])

    def _merge_fields(self):

        "Merge field dictionaries."

        self._merge_dictionaries(self.get_field_partitions, rename_field_files,
            remove_field_files, get_field_reader, get_field_writer,
            FieldDictionaryMerger, [self.field_interval])

    def update(self, other_indexes):

        """
        Copy the content of the 'other_indexes' into this index and merge.

        Every term and field partition of the i-th other index is copied into
        this index's directory, passing "-added-<i>" to the copy helpers
        (presumably as a suffix that keeps the copied partition names distinct).
        The combined partitions are then merged.
        """

        self._ensure_directory()

        for i, index in enumerate(other_indexes):
            for partition in index.get_term_partitions():
                copy_term_files(index.pathname, partition, self.pathname, "-added-%d" % i)
            for partition in index.get_field_partitions():
                copy_field_files(index.pathname, partition, self.pathname, "-added-%d" % i)

        self.merge()

    def close(self):

        "Close any open reader and writer; closing the writer flushes it."

        if self.reader is not None:
            self.reader.close()
            self.reader = None
        if self.writer is not None:
            self.writer.close()
            self.writer = None

# vim: tabstop=4 expandtab shiftwidth=4