#!/usr/bin/env python

"""
High-level classes.

Copyright (C) 2009, 2010 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with this program. If not, see <http://www.gnu.org/licenses/>.
"""

from iixr.filesystem import *
from iixr.merging import *
from itertools import islice
from os import mkdir # index directory creation
from os.path import exists

# Constants.
# These are the defaults for the corresponding Index constructor parameters.

TERM_INTERVAL = 100
DOCUMENT_INTERVAL = 100
FIELD_INTERVAL = 100
FLUSH_INTERVAL = 10000      # documents added before the writer flushes
OPEN_PARTITIONS = 20        # partitions merged together in a single pass

# High-level classes.

class Document:

    "A container of document information."

    def __init__(self, docnum, fields=None):

        """
        Initialise a document with the given 'docnum' and optional 'fields': a
        list of (identifier, value) tuples. Where 'fields' is omitted or empty,
        a new empty list is used.
        """

        self.docnum = docnum
        self.fields = fields or []
        self.terms = {}             # term -> list of positions
        self.field_dict = None      # lazily built mapping; see _ensure_dict

    def add_position(self, term, position):

        """
        Add a position entry for the given 'term', indicating the given
        'position'.
        """

        self.terms.setdefault(term, []).append(position)

    def add_field(self, identifier, value):

        "Add a field having the given 'identifier' and 'value'."

        self.fields.append((identifier, unicode(value))) # convert to string

        # Discard the cached mapping so that lookups see the new field.

        self.field_dict = None

    def set_fields(self, fields):

        """
        Set the document's 'fields': a list of tuples each containing an integer
        identifier and a string value.
        """

        self.fields = fields

        # Discard the cached mapping since it describes the previous fields.

        self.field_dict = None

    def _ensure_dict(self):

        "Build the identifier-to-value mapping from the fields if necessary."

        if self.field_dict is None:
            self.field_dict = dict(self.fields)

    def keys(self):

        "Return the identifiers of the document's fields."

        self._ensure_dict()
        return self.field_dict.keys()

    def __getitem__(self, key):

        """
        Return the value of the field having the identifier 'key', raising
        KeyError if no such field exists.
        """

        self._ensure_dict()
        return self.field_dict[key]

class IndexWriter:

    """
    Building term information and writing it to the term and field dictionaries.
    """

    def __init__(self, pathname, interval, doc_interval, field_interval, flush_interval):

        """
        Initialise the writer for the index at 'pathname'. The 'interval' and
        'doc_interval' are passed to term dictionary writers, 'field_interval'
        to field dictionary writers, and 'flush_interval' is the number of
        added documents after which terms and fields are flushed (a false
        value disables automatic flushing).
        """

        self.pathname = pathname
        self.interval = interval
        self.doc_interval = doc_interval
        self.field_interval = field_interval
        self.flush_interval = flush_interval

        # Start writing after any partitions already present.

        self.dict_partition = get_next_partition(get_term_partitions(self.pathname))
        self.field_dict_partition = get_next_partition(get_field_partitions(self.pathname))

        self.terms = {}     # term -> {docnum: positions}
        self.docs = []      # (docnum, fields) tuples

        self.doc_counter = 0

    def add_document(self, doc):

        """
        Add the given document 'doc', updating the document counter and flushing
        terms and fields if appropriate.
        """

        for term, positions in doc.terms.items():
            self.terms.setdefault(term, {})[doc.docnum] = positions

        self.docs.append((doc.docnum, doc.fields))

        self.doc_counter += 1
        if self.flush_interval and self.doc_counter >= self.flush_interval:
            self.flush_terms()
            self.flush_fields()
            self.doc_counter = 0

    def get_term_writer(self):

        "Return a term dictionary writer for the current partition."

        return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval)

    def get_field_writer(self):

        "Return a field dictionary writer for the current partition."

        return get_field_writer(self.pathname, self.field_dict_partition, self.field_interval)

    def flush_terms(self):

        "Flush terms into the current term dictionary partition."

        all_terms = self.terms

        dict_writer = self.get_term_writer()

        # Write the terms in order.

        for term in sorted(all_terms):
            doc_positions = all_terms[term].items()
            dict_writer.write_term_positions(term, doc_positions)

        dict_writer.close()

        # Start afresh with the next partition.

        self.terms = {}
        self.dict_partition += 1

    def flush_fields(self):

        "Flush fields into the current field dictionary partition."

        # Get the documents in order.

        self.docs.sort()

        field_dict_writer = self.get_field_writer()

        for docnum, fields in self.docs:
            field_dict_writer.write_fields(docnum, fields)

        field_dict_writer.close()

        # Start afresh with the next partition.

        self.docs = []
        self.field_dict_partition += 1

    def close(self):

        """
        Flush any pending terms and fields. A flush is also performed where no
        partitions exist yet, so that closing always leaves a partition of each
        kind in the index.
        """

        if self.terms or not get_term_partitions(self.pathname):
            self.flush_terms()
        if self.docs or not get_field_partitions(self.pathname):
            self.flush_fields()

class IndexReader:

    "Accessing the term and field dictionaries."

    def __init__(self, pathname):

        "Open the 'merged' term and field partitions of the index at 'pathname'."

        self.dict_reader = get_term_reader(pathname, "merged")
        self.field_dict_reader = get_field_reader(pathname, "merged")

    # Sequential access.

    def read_term(self):

        "Return the result of reading the next term from the term dictionary."

        return self.dict_reader.read_term()

    def go_to_term(self, term):

        """
        Seek to the given 'term' in the term dictionary, returning the term and
        position details obtained from the dictionary reader for the result.
        """

        return self.dict_reader._get_term_and_positions(*self.dict_reader.go_to_term(term))

    # Query access.
    # Each method below delegates directly to the underlying dictionary reader.

    def get_terms(self):
        return self.dict_reader.get_terms()

    def find_terms(self, term):
        return self.dict_reader.find_terms(term)

    def find_positions(self, term):
        return self.dict_reader.find_positions(term)

    def find_common_positions(self, terms):
        return self.dict_reader.find_common_positions(terms)

    def get_frequency(self, term):
        return self.dict_reader.get_frequency(term)

    def get_document_frequency(self, term):
        return self.dict_reader.get_document_frequency(term)

    def get_fields(self, docnum):
        return self.field_dict_reader.get_fields(docnum)

    def get_document(self, docnum):

        "Return a Document for 'docnum' populated with its stored fields."

        return Document(docnum, self.get_fields(docnum))

    def close(self):

        "Close the term and field dictionary readers."

        self.dict_reader.close()
        self.field_dict_reader.close()

class Index:

    "An inverted index solution encapsulating the various components."

    def __init__(self, pathname, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, field_interval=FIELD_INTERVAL,
        flush_interval=FLUSH_INTERVAL, open_partitions=OPEN_PARTITIONS):

        """
        Initialise an index residing at 'pathname'. The interval parameters are
        passed on to the writers created for the index, and 'open_partitions'
        limits the number of partitions merged together in a single pass.
        """

        self.pathname = pathname
        self.interval = interval
        self.doc_interval = doc_interval
        self.field_interval = field_interval
        self.flush_interval = flush_interval
        self.open_partitions = open_partitions
        self.reader = None
        self.writer = None

    def get_writer(self):

        "Return a writer, creating the index directory if necessary."

        self._ensure_directory()
        self.writer = IndexWriter(self.pathname, self.interval, self.doc_interval,
            self.field_interval, self.flush_interval)
        return self.writer

    def _ensure_directory(self):

        "Create the index directory if it does not already exist."

        if not exists(self.pathname):
            mkdir(self.pathname)

    def get_reader(self, partition=0):

        """
        Return a reader for the index, merging the index partitions first.
        The 'partition' argument is passed on but is not currently used to
        select what is read.
        """

        # Ensure that only one partition exists.

        self.merge()
        return self._get_reader(partition)

    def _get_reader(self, partition):

        """
        Return a reader for the index, raising OSError if the index path does
        not exist. The 'partition' argument is currently unused: the reader
        always opens the 'merged' partition.
        """

        if not exists(self.pathname):
            raise OSError("Index path %r does not exist." % self.pathname)

        self.reader = IndexReader(self.pathname)
        return self.reader

    def get_term_partitions(self):

        "Return a set of term partition identifiers."

        return get_term_partitions(self.pathname)

    def get_field_partitions(self):

        "Return a set of field partition identifiers."

        return get_field_partitions(self.pathname)

    def merge(self):

        "Merge/optimise index partitions."

        self._merge_terms()
        self._merge_fields()

    def _merge_dictionaries(self, get_partitions, rename_files, remove_files, get_reader, get_writer, get_merger, intervals):

        """
        Merge term or field dictionaries.

        The 'get_partitions' callable returns the current partition identifiers;
        'rename_files' and 'remove_files' rename and remove the files of a
        partition; 'get_reader' and 'get_writer' open a partition for reading
        and writing; 'get_merger' creates a merger from a writer and a list of
        readers; 'intervals' provides the extra arguments for 'get_writer'.
        """

        partitions = get_partitions()

        # Ensure the correct labelling of a single partition.

        if len(partitions) == 1:
            partition = list(partitions)[0]
            if partition != "merged":
                rename_files(self.pathname, partition, "merged")
            return

        # Merge at least two partitions in each pass: merging fewer would never
        # reduce the number of partitions and the loop below would not end.
        # A limit of None still means that all partitions are merged at once.

        limit = self.open_partitions
        if limit is not None and limit < 2:
            limit = 2

        # Merge the partitions.

        old_merged_counter = 0

        while len(partitions) > 1:

            # Move any existing merged partition aside so that it can be read
            # while a new merged partition is written.

            if "merged" in partitions:
                old_merged = "old-merged-%d" % old_merged_counter
                rename_files(self.pathname, "merged", old_merged)
                partitions.remove("merged")
                partitions.add(old_merged)
                old_merged_counter += 1

            # Process only a certain number at once, avoiding resource limits.

            active_partitions = list(islice(partitions, limit))

            readers = [get_reader(self.pathname, partition) for partition in active_partitions]

            # Write directly to a dictionary.

            writer = get_writer(self.pathname, "merged", *intervals)
            merger = get_merger(writer, readers)

            # Close the merger even if merging fails; the old files are then
            # left in place since the exception propagates before removal.

            try:
                merger.merge()
            finally:
                merger.close()

            # Remove old files.

            for partition in active_partitions:
                remove_files(self.pathname, partition)

            # Acquire the partitions to check their number again.

            partitions = get_partitions()

    def _merge_terms(self):

        "Merge term dictionaries."

        self._merge_dictionaries(self.get_term_partitions, rename_term_files,
            remove_term_files, get_term_reader, get_term_writer,
            TermDictionaryMerger, [self.interval, self.doc_interval])

    def _merge_fields(self):

        "Merge field dictionaries."

        self._merge_dictionaries(self.get_field_partitions, rename_field_files,
            remove_field_files, get_field_reader, get_field_writer,
            FieldDictionaryMerger, [self.field_interval])

    def update(self, other_indexes):

        "Copy the content of the 'other_indexes' into this index and merge."

        self._ensure_directory()

        # Label the copied partitions using the position of their index in
        # 'other_indexes' (presumably keeping partitions from different
        # indexes distinct; see copy_term_files and copy_field_files).

        for i, index in enumerate(other_indexes):
            for partition in index.get_term_partitions():
                copy_term_files(index.pathname, partition, self.pathname, "-added-%d" % i)
            for partition in index.get_field_partitions():
                copy_field_files(index.pathname, partition, self.pathname, "-added-%d" % i)

        self.merge()

    def close(self):

        "Close and forget any reader and writer obtained from this index."

        if self.reader is not None:
            self.reader.close()
            self.reader = None
        if self.writer is not None:
            self.writer.close()
            self.writer = None

# vim: tabstop=4 expandtab shiftwidth=4