#!/usr/bin/env python

"""
High-level classes.

Copyright (C) 2009, 2010 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.  See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with this program.  If not, see <http://www.gnu.org/licenses/>.
"""

from iixr.filesystem import *
from iixr.merging import *
from itertools import islice
from os import mkdir            # creation of the index directory
from os.path import exists

# Text type used when storing field values: 'unicode' where it exists,
# otherwise the built-in 'str'.

try:
    unicode
except NameError:
    unicode = str

# Constants: defaults for the Index constructor.

TERM_INTERVAL = 100         # 'interval' passed to the term dictionary writer
DOCUMENT_INTERVAL = 100     # 'doc_interval' passed to the term dictionary writer
FIELD_INTERVAL = 100        # 'field_interval' passed to the field dictionary writer
FLUSH_INTERVAL = 10000      # documents added to a writer between flushes
OPEN_PARTITIONS = 20        # partitions read at once when merging

# High-level classes.

class Document:

    "A container of document information."

    def __init__(self, docnum, fields=None):

        """
        Initialise the document with the given 'docnum' and optional 'fields'
        (a list of (identifier, value) tuples). A non-empty 'fields' list is
        used directly, not copied.
        """

        self.docnum = docnum
        self.fields = fields or []
        self.terms = {}

        # Lazily built mapping from field identifier to value.

        self.field_dict = None

    def add_position(self, term, position):

        """
        Add a position entry for the given 'term', indicating the given
        'position'.
        """

        self.terms.setdefault(term, []).append(position)

    def add_field(self, identifier, value):

        "Add a field having the given 'identifier' and 'value'."

        # Values are stored in their text form.

        self.fields.append((identifier, unicode(value)))

        # Discard any cached mapping so that lookups see the new field.

        self.field_dict = None

    def set_fields(self, fields):

        """
        Set the document's 'fields': a list of tuples each containing an integer
        identifier and a string value.
        """

        self.fields = fields

        # Discard any cached mapping built from the previous fields.

        self.field_dict = None

    def _ensure_dict(self):

        "Build the identifier-to-value mapping if it is not already present."

        if self.field_dict is None:
            self.field_dict = dict(self.fields)

    def keys(self):

        "Return the identifiers of the document's fields."

        self._ensure_dict()
        return self.field_dict.keys()

    def __getitem__(self, key):

        """
        Return the value of the field having the identifier 'key', raising
        KeyError if no such field exists.
        """

        self._ensure_dict()
        return self.field_dict[key]

class IndexWriter:

    """
    Building term information and writing it to the term and field dictionaries.
    """

    def __init__(self, pathname, interval, doc_interval, field_interval, flush_interval):

        """
        Initialise the writer for the index at 'pathname'. The 'interval' and
        'doc_interval' are passed to term dictionary writers, 'field_interval'
        to field dictionary writers, and 'flush_interval' is the number of
        added documents after which terms and fields are flushed (a false value
        disables automatic flushing).
        """

        self.pathname = pathname
        self.interval = interval
        self.doc_interval = doc_interval
        self.field_interval = field_interval
        self.flush_interval = flush_interval

        # Start writing after any partitions already present.

        self.dict_partition = get_next_partition(get_term_partitions(self.pathname))
        self.field_dict_partition = get_next_partition(get_field_partitions(self.pathname))

        # Pending data: term -> {docnum: positions}, and (docnum, fields) pairs.

        self.terms = {}
        self.docs = []

        self.doc_counter = 0

    def add_document(self, doc):

        """
        Add the given document 'doc', updating the document counter and flushing
        terms and fields if appropriate.
        """

        for term, positions in doc.terms.items():
            self.terms.setdefault(term, {})[doc.docnum] = positions

        self.docs.append((doc.docnum, doc.fields))

        self.doc_counter += 1
        if self.flush_interval and self.doc_counter >= self.flush_interval:
            self.flush_terms()
            self.flush_fields()
            self.doc_counter = 0

    def get_term_writer(self):

        "Return a term dictionary writer for the current partition."

        return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval)

    def get_field_writer(self):

        "Return a field dictionary writer for the current partition."

        return get_field_writer(self.pathname, self.field_dict_partition, self.field_interval)

    def flush_terms(self):

        "Flush terms into the current term dictionary partition."

        all_terms = self.terms
        dict_writer = self.get_term_writer()

        # Write the terms in order.

        for term in sorted(all_terms):
            doc_positions = list(all_terms[term].items())
            dict_writer.write_term_positions(term, doc_positions)

        dict_writer.close()

        # Start afresh with the next partition.

        self.terms = {}
        self.dict_partition += 1

    def flush_fields(self):

        "Flush fields into the current field dictionary partition."

        # Get the documents in order.

        self.docs.sort()

        field_dict_writer = self.get_field_writer()

        for docnum, fields in self.docs:
            field_dict_writer.write_fields(docnum, fields)

        field_dict_writer.close()

        # Start afresh with the next partition.

        self.docs = []
        self.field_dict_partition += 1

    def close(self):

        """
        Flush any pending terms and fields. A partition is also written, even
        with nothing pending, where no partition of that kind exists yet.
        """

        if self.terms or not get_term_partitions(self.pathname):
            self.flush_terms()
        if self.docs or not get_field_partitions(self.pathname):
            self.flush_fields()

class IndexReader:

    "Accessing the term and field dictionaries."

    def __init__(self, pathname):

        """
        Open the "merged" term and field dictionaries of the index at
        'pathname'.
        """

        self.dict_reader = get_term_reader(pathname, "merged")
        self.field_dict_reader = get_field_reader(pathname, "merged")

    # Term dictionary queries, delegated to the term dictionary reader.

    def get_terms(self):
        return self.dict_reader.get_terms()

    def find_terms(self, term):
        return self.dict_reader.find_terms(term)

    def find_positions(self, term):
        return self.dict_reader.find_positions(term)

    def find_common_positions(self, terms):
        return self.dict_reader.find_common_positions(terms)

    def get_frequency(self, term):
        return self.dict_reader.get_frequency(term)

    def get_document_frequency(self, term):
        return self.dict_reader.get_document_frequency(term)

    # Field dictionary queries, delegated to the field dictionary reader.

    def get_fields(self, docnum):
        return self.field_dict_reader.get_fields(docnum)

    def get_document(self, docnum):

        "Return a Document for 'docnum' populated with its stored fields."

        return Document(docnum, self.get_fields(docnum))

    def close(self):

        "Close both dictionary readers."

        # Close the field reader even if closing the term reader fails.

        try:
            self.dict_reader.close()
        finally:
            self.field_dict_reader.close()

class Index:

    "An inverted index solution encapsulating the various components."

    def __init__(self, pathname, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, field_interval=FIELD_INTERVAL,
        flush_interval=FLUSH_INTERVAL, open_partitions=OPEN_PARTITIONS):

        """
        Initialise the index residing at 'pathname'. The interval parameters are
        passed on to writers; 'open_partitions' limits the number of partitions
        read at once when merging.
        """

        self.pathname = pathname
        self.interval = interval
        self.doc_interval = doc_interval
        self.field_interval = field_interval
        self.flush_interval = flush_interval
        self.open_partitions = open_partitions
        self.reader = None
        self.writer = None

    def get_writer(self):

        "Return a writer, creating the index directory if necessary."

        self._ensure_directory()
        self.writer = IndexWriter(self.pathname, self.interval, self.doc_interval,
            self.field_interval, self.flush_interval)
        return self.writer

    def _ensure_directory(self):

        "Create the index directory if it does not exist."

        if not exists(self.pathname):
            mkdir(self.pathname)

    def get_reader(self, partition=0):

        """
        Return a reader for the index, merging partitions first. The
        'partition' parameter is accepted but currently has no effect.
        """

        # Ensure that only one partition exists.

        self.merge()
        return self._get_reader(partition)

    def _get_reader(self, partition):

        """
        Return a reader for the index without merging, raising OSError if the
        index directory does not exist. The 'partition' parameter is currently
        unused.
        """

        if not exists(self.pathname):
            raise OSError("Index path %r does not exist." % self.pathname)

        self.reader = IndexReader(self.pathname)
        return self.reader

    def get_term_partitions(self):

        "Return a set of term partition identifiers."

        return get_term_partitions(self.pathname)

    def get_field_partitions(self):

        "Return a set of field partition identifiers."

        return get_field_partitions(self.pathname)

    def merge(self):

        "Merge/optimise index partitions."

        self._merge_terms()
        self._merge_fields()

    def _merge_dictionaries(self, get_partitions, rename_files, remove_files, get_reader, get_writer, get_merger, intervals):

        """
        Merge term or field dictionaries until a single partition labelled
        "merged" remains. The callables obtain the partition identifiers, rename
        and remove partition files, and provide the readers, writer and merger;
        'intervals' holds the extra arguments for the writer.
        """

        partitions = get_partitions()

        # Ensure the correct labelling of a single partition.

        if len(partitions) == 1:
            partition = list(partitions)[0]
            if partition != "merged":
                rename_files(self.pathname, partition, "merged")
            return

        # Each pass must consume at least two partitions: it replaces those it
        # reads with one "merged" partition, so a smaller limit would never
        # reduce the number of partitions and the loop would not terminate.
        # A limit of None imposes no restriction.

        limit = self.open_partitions
        if limit is not None and limit < 2:
            limit = 2

        # Merge the partitions.

        old_merged_counter = 0

        while len(partitions) > 1:

            # Move any existing merged partition aside, since "merged" is the
            # label of the partition about to be written.

            if "merged" in partitions:
                old_merged = "old-merged-%d" % old_merged_counter
                rename_files(self.pathname, "merged", old_merged)
                partitions.remove("merged")
                partitions.add(old_merged)
                old_merged_counter += 1

            # Process only a certain number at once, avoiding resource limits.

            active_partitions = list(islice(partitions, limit))

            readers = []
            for partition in active_partitions:
                readers.append(get_reader(self.pathname, partition))

            # Write directly to a dictionary.

            writer = get_writer(self.pathname, "merged", *intervals)
            merger = get_merger(writer, readers)
            merger.merge()
            merger.close()

            # Remove old files.

            for partition in active_partitions:
                remove_files(self.pathname, partition)

            # Acquire the partitions to check their number again.

            partitions = get_partitions()

    def _merge_terms(self):

        "Merge term dictionaries."

        self._merge_dictionaries(self.get_term_partitions, rename_term_files,
            remove_term_files, get_term_reader, get_term_writer,
            TermDictionaryMerger, [self.interval, self.doc_interval])

    def _merge_fields(self):

        "Merge field dictionaries."

        self._merge_dictionaries(self.get_field_partitions, rename_field_files,
            remove_field_files, get_field_reader, get_field_writer,
            FieldDictionaryMerger, [self.field_interval])

    def update(self, other_indexes):

        "Copy the content of the 'other_indexes' into this index and merge."

        self._ensure_directory()

        # The position of each index in 'other_indexes' is used in the suffix
        # given when copying its partition files.

        for i, index in enumerate(other_indexes):
            for partition in index.get_term_partitions():
                copy_term_files(index.pathname, partition, self.pathname, "-added-%d" % i)
            for partition in index.get_field_partitions():
                copy_field_files(index.pathname, partition, self.pathname, "-added-%d" % i)

        self.merge()

    def close(self):

        "Close any reader and writer obtained from this index."

        # Close the writer even if closing the reader fails, so that pending
        # documents are still flushed.

        try:
            if self.reader is not None:
                self.reader.close()
                self.reader = None
        finally:
            if self.writer is not None:
                self.writer.close()
                self.writer = None

# vim: tabstop=4 expandtab shiftwidth=4