#!/usr/bin/env python

"""
High-level classes.

Copyright (C) 2009, 2010 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with this program. If not, see <http://www.gnu.org/licenses/>.
"""

from iixr.filesystem import *
from iixr.merging import *
from itertools import islice
from os import listdir, mkdir # index and partition discovery
from os.path import exists

try:
    set
except NameError:
    from sets import Set as set

# Provide a text type where the 'unicode' built-in is not available.

try:
    unicode
except NameError:
    unicode = str

# Constants.

TERM_INTERVAL = 100
DOCUMENT_INTERVAL = 100
FIELD_INTERVAL = 100
FLUSH_INTERVAL = 10000
OPEN_PARTITIONS = 20

# High-level classes.

class Document:

    "A container of document information."

    def __init__(self, docnum, fields=None):

        """
        Initialise the document with the given 'docnum' and optional 'fields':
        a list of tuples each containing an identifier and a value. An empty or
        missing 'fields' value results in a new, empty list being used.
        """

        self.docnum = docnum
        self.fields = fields or []
        self.terms = {}

        # A mapping built on demand from the fields; None means "not built".

        self.field_dict = None

    def add_position(self, term, position):

        """
        Add a position entry for the given 'term', indicating the given
        'position'.
        """

        self.terms.setdefault(term, []).append(position)

    def add_field(self, identifier, value):

        """
        Add a field having the given 'identifier' and 'value'. The value is
        converted to a text string before being stored.
        """

        self.fields.append((identifier, unicode(value))) # convert to string

        # Discard any mapping built before this field was added.

        self.field_dict = None

    def set_fields(self, fields):

        """
        Set the document's 'fields': a list of tuples each containing an integer
        identifier and a string value.
        """

        self.fields = fields

        # Discard any mapping built from the previous fields.

        self.field_dict = None

    def _ensure_dict(self):

        "Build the identifier-to-value mapping if it is not already present."

        if self.field_dict is None:
            self.field_dict = dict(self.fields)

    def keys(self):

        "Return the field identifiers of this document."

        self._ensure_dict()
        return self.field_dict.keys()

    def __getitem__(self, key):

        """
        Return the value of the field having the identifier 'key', raising
        KeyError if no such field exists. Where several fields share an
        identifier, the last one in the field list is returned.
        """

        self._ensure_dict()
        return self.field_dict[key]

class IndexWriter:

    """
    Building term information and writing it to the term and field dictionaries.
    """

    def __init__(self, pathname, interval, doc_interval, field_interval, flush_interval):

        """
        Initialise the writer for the index at 'pathname'. The 'interval' and
        'doc_interval' are passed to the term dictionary writer, the
        'field_interval' is passed to the field dictionary writer, and the
        'flush_interval' is the number of added documents after which terms and
        fields are flushed (a false value disables such automatic flushing).
        """

        self.pathname = pathname
        self.interval = interval
        self.doc_interval = doc_interval
        self.field_interval = field_interval
        self.flush_interval = flush_interval

        # Partition numbers used for the next flush of terms and of fields.

        self.dict_partition = 0
        self.field_dict_partition = 0

        # Pending information: term -> {docnum: positions}, and a list of
        # (docnum, fields) tuples.

        self.terms = {}
        self.docs = []

        # The number of documents added since the last automatic flush.

        self.doc_counter = 0

    def add_document(self, doc):

        """
        Add the given document 'doc', updating the document counter and flushing
        terms and fields if appropriate.
        """

        for term, positions in doc.terms.items():
            self.terms.setdefault(term, {})[doc.docnum] = positions

        self.docs.append((doc.docnum, doc.fields))

        self.doc_counter += 1
        if self.flush_interval and self.doc_counter >= self.flush_interval:
            self.flush_terms()
            self.flush_fields()
            self.doc_counter = 0

    def get_term_writer(self):

        "Return a term dictionary writer for the current partition."

        return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval)

    def get_field_writer(self):

        "Return a field dictionary writer for the current partition."

        return get_field_writer(self.pathname, self.field_dict_partition, self.field_interval)

    def flush_terms(self):

        """
        Flush terms into the current term dictionary partition, clearing the
        pending terms and advancing to the next partition.
        """

        # Get the terms in order.

        all_terms = self.terms
        terms = list(all_terms)
        terms.sort()

        dict_writer = self.get_term_writer()

        for term in terms:

            # Provide a list of (docnum, positions) tuples for the term.

            doc_positions = list(all_terms[term].items())
            dict_writer.write_term_positions(term, doc_positions)

        dict_writer.close()

        self.terms = {}
        self.dict_partition += 1

    def flush_fields(self):

        """
        Flush fields into the current field dictionary partition, clearing the
        pending documents and advancing to the next partition.
        """

        # Get the documents in order.

        self.docs.sort()

        field_dict_writer = self.get_field_writer()

        for docnum, fields in self.docs:
            field_dict_writer.write_fields(docnum, fields)

        field_dict_writer.close()

        self.docs = []
        self.field_dict_partition += 1

    def close(self):

        "Flush any pending terms and fields."

        if self.terms:
            self.flush_terms()
        if self.docs:
            self.flush_fields()

class IndexReader:

    "Accessing the term and field dictionaries."

    def __init__(self, pathname):

        """
        Open the term and field dictionaries labelled "merged" for the index at
        'pathname'.
        """

        self.dict_reader = get_term_reader(pathname, "merged")
        self.field_dict_reader = get_field_reader(pathname, "merged")

    # Each of the following methods delegates to the term dictionary reader.

    def get_terms(self):
        return self.dict_reader.get_terms()

    def find_terms(self, term):
        return self.dict_reader.find_terms(term)

    def find_positions(self, term):
        return self.dict_reader.find_positions(term)

    def find_common_positions(self, terms):
        return self.dict_reader.find_common_positions(terms)

    def get_frequency(self, term):
        return self.dict_reader.get_frequency(term)

    def get_document_frequency(self, term):
        return self.dict_reader.get_document_frequency(term)

    # The following methods employ the field dictionary reader.

    def get_fields(self, docnum):
        return self.field_dict_reader.get_fields(docnum)

    def get_document(self, docnum):

        """
        Return a Document for 'docnum' populated with the fields obtained from
        the field dictionary reader.
        """

        return Document(docnum, self.get_fields(docnum))

    def close(self):

        """
        Close both dictionary readers. The field dictionary reader is closed
        even if closing the term dictionary reader raises an exception.
        """

        try:
            self.dict_reader.close()
        finally:
            self.field_dict_reader.close()

class Index:

    "An inverted index solution encapsulating the various components."

    def __init__(self, pathname, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, field_interval=FIELD_INTERVAL,
        flush_interval=FLUSH_INTERVAL, open_partitions=OPEN_PARTITIONS):

        """
        Initialise the index residing at 'pathname'. The 'interval',
        'doc_interval', 'field_interval' and 'flush_interval' are passed to any
        writer created, and 'open_partitions' limits the number of partitions
        read together in a single merging pass.
        """

        self.pathname = pathname
        self.interval = interval
        self.doc_interval = doc_interval
        self.field_interval = field_interval
        self.flush_interval = flush_interval
        self.open_partitions = open_partitions
        self.reader = None
        self.writer = None

    def get_writer(self):

        "Return a writer, creating the index directory if it does not exist."

        self._ensure_directory()
        self.writer = IndexWriter(self.pathname, self.interval, self.doc_interval,
            self.field_interval, self.flush_interval)
        return self.writer

    def _ensure_directory(self):

        "Create the index directory if it does not already exist."

        if not exists(self.pathname):
            mkdir(self.pathname)

    def get_reader(self, partition=0):

        """
        Return a reader for the index, merging the partitions beforehand. The
        'partition' is passed on but is not currently used: the reader always
        opens the partition labelled "merged".
        """

        # Ensure that only one partition exists.

        self.merge()
        return self._get_reader(partition)

    def _get_reader(self, partition):

        """
        Return a reader for the index, raising OSError if the index path does
        not exist. The 'partition' is not currently used.
        """

        if not exists(self.pathname):
            raise OSError("Index path %r does not exist." % self.pathname)

        self.reader = IndexReader(self.pathname)
        return self.reader

    def _get_partitions(self, prefix):

        """
        Return a set of partition identifiers using 'prefix' to identify
        relevant files.
        """

        prefix_length = len(prefix)

        partitions = set()
        for filename in listdir(self.pathname):
            if filename.startswith(prefix):

                # The identifier is whatever follows the prefix in the filename.

                partitions.add(filename[prefix_length:])
        return partitions

    def get_term_partitions(self):

        "Return a set of term partition identifiers."

        return self._get_partitions("terms-")

    def get_field_partitions(self):

        "Return a set of field partition identifiers."

        return self._get_partitions("fields-")

    def merge(self):

        "Merge/optimise index partitions."

        self._merge_terms()
        self._merge_fields()

    def _merge_dictionaries(self, get_partitions, rename_files, remove_files, get_reader, get_writer, get_merger, intervals):

        """
        Merge term or field dictionaries. The 'get_partitions' callable provides
        the current partition identifiers; 'rename_files' and 'remove_files'
        relabel and delete the files of a partition; 'get_reader' and
        'get_writer' open a partition for reading and writing; 'get_merger'
        creates a merger from a writer and a list of readers; 'intervals' is a
        sequence of additional arguments for 'get_writer'.
        """

        partitions = get_partitions()

        # Ensure the correct labelling of a single partition.

        if len(partitions) == 1:
            partition = list(partitions)[0]
            if partition != "merged":
                rename_files(self.pathname, partition, "merged")
            return

        # Each pass replaces the partitions it reads with a single "merged"
        # partition, so reading fewer than two at a time would never reduce the
        # number of partitions. A limit of None means "read all partitions".

        limit = self.open_partitions
        if limit is not None and limit < 2:
            limit = 2

        # Merge the partitions.

        old_merged_counter = 0

        while len(partitions) > 1:

            # Move any existing merged partition aside so that it can be read
            # while a new merged partition is written.

            if "merged" in partitions:
                old_merged = "old-merged-%d" % old_merged_counter
                rename_files(self.pathname, "merged", old_merged)
                partitions.remove("merged")
                partitions.add(old_merged)
                old_merged_counter += 1

            # Process only a certain number at once, avoiding resource limits.

            active_partitions = list(islice(partitions, limit))

            readers = []
            for partition in active_partitions:
                readers.append(get_reader(self.pathname, partition))

            # Write directly to a dictionary.

            writer = get_writer(self.pathname, "merged", *intervals)
            merger = get_merger(writer, readers)
            merger.merge()
            merger.close()

            # Remove old files.

            for partition in active_partitions:
                remove_files(self.pathname, partition)

            # Acquire the partitions to check their number again.

            partitions = get_partitions()

    def _merge_terms(self):

        "Merge term dictionaries."

        self._merge_dictionaries(self.get_term_partitions, rename_term_files,
            remove_term_files, get_term_reader, get_term_writer,
            TermDictionaryMerger, [self.interval, self.doc_interval])

    def _merge_fields(self):

        "Merge field dictionaries."

        self._merge_dictionaries(self.get_field_partitions, rename_field_files,
            remove_field_files, get_field_reader, get_field_writer,
            FieldDictionaryMerger, [self.field_interval])

    def update(self, other_indexes):

        "Copy the content of the 'other_indexes' into this index and merge."

        self._ensure_directory()

        for i, index in enumerate(other_indexes):

            # The position of each index provides a distinct suffix for the
            # copied partitions.

            suffix = "-added-%d" % i

            for partition in index.get_term_partitions():
                copy_term_files(index.pathname, partition, self.pathname, suffix)
            for partition in index.get_field_partitions():
                copy_field_files(index.pathname, partition, self.pathname, suffix)

        self.merge()

    def close(self):

        """
        Close any reader and writer obtained from this index. The writer is
        closed even if closing the reader raises an exception.
        """

        try:
            if self.reader is not None:
                self.reader.close()
                self.reader = None
        finally:
            if self.writer is not None:
                self.writer.close()
                self.writer = None

# vim: tabstop=4 expandtab shiftwidth=4