#!/usr/bin/env python

"""
High-level classes.

Copyright (C) 2009, 2010 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.  See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with this program.  If not, see <http://www.gnu.org/licenses/>.
"""

from iixr.filesystem import *
from iixr.merging import *
from os import listdir, mkdir   # index directory creation and partition discovery
from os.path import exists

# Fall back to the 'sets' module where there is no built-in set type.

try:
    set
except NameError:
    from sets import Set as set

# Constants: the defaults used by the Index constructor.

TERM_INTERVAL = 100
DOCUMENT_INTERVAL = 100
FIELD_INTERVAL = 100
FLUSH_INTERVAL = 10000

# High-level classes.

class Document:

    """
    A container of document information: a document number, a list of
    (identifier, value) field tuples, and a mapping from terms to lists of
    positions.
    """

    def __init__(self, docnum, fields=None):

        """
        Initialise the document with the given 'docnum' and optional list of
        'fields'. Where 'fields' is omitted or empty, a new empty list is used.
        """

        self.docnum = docnum
        self.fields = fields or []
        self.terms = {}

        # A dictionary view of the fields, built on demand by _ensure_dict and
        # discarded whenever the fields are changed through this class.

        self.field_dict = None

    def add_position(self, term, position):

        """
        Add a position entry for the given 'term', indicating the given
        'position'.
        """

        self.terms.setdefault(term, []).append(position)

    def add_field(self, identifier, value):

        """
        Add a field having the given 'identifier' and 'value'. The value is
        stored as a Unicode string.
        """

        self.fields.append((identifier, unicode(value)))

        # Invalidate the cached dictionary so that lookups see the new field.

        self.field_dict = None

    def set_fields(self, fields):

        """
        Set the document's 'fields': a list of tuples each containing an integer
        identifier and a string value.
        """

        self.fields = fields

        # Invalidate the cached dictionary built from the previous fields.

        self.field_dict = None

    def _ensure_dict(self):

        "Build the field dictionary from the field list if not yet available."

        if self.field_dict is None:
            self.field_dict = dict(self.fields)

    def keys(self):

        "Return the identifiers of the fields held by this document."

        self._ensure_dict()
        return self.field_dict.keys()

    def __getitem__(self, key):

        """
        Return the value of the field having the identifier 'key', raising
        KeyError if no such field exists.
        """

        self._ensure_dict()
        return self.field_dict[key]

class IndexWriter:

    """
    Building term information and writing it to the term and field dictionaries.
    """

    def __init__(self, pathname, interval, doc_interval, field_interval, flush_interval):

        """
        Initialise the writer for the index at 'pathname'. The 'interval' and
        'doc_interval' are passed to each term dictionary writer, and the
        'field_interval' to each field dictionary writer. After every
        'flush_interval' documents, the accumulated terms and fields are
        flushed to new partitions; a false value disables automatic flushing.
        """

        self.pathname = pathname
        self.interval = interval
        self.doc_interval = doc_interval
        self.field_interval = field_interval
        self.flush_interval = flush_interval

        # The numbers of the next partitions to be written.

        self.dict_partition = 0
        self.field_dict_partition = 0

        # Pending data: term -> {docnum: positions} and (docnum, fields) pairs.

        self.terms = {}
        self.docs = []

        # The number of documents added since the last automatic flush.

        self.doc_counter = 0

    def add_document(self, doc):

        """
        Add the given document 'doc', updating the document counter and flushing
        terms and fields if appropriate.
        """

        for term, positions in doc.terms.items():
            self.terms.setdefault(term, {})[doc.docnum] = positions

        self.docs.append((doc.docnum, doc.fields))

        self.doc_counter += 1
        if self.flush_interval and self.doc_counter >= self.flush_interval:
            self.flush_terms()
            self.flush_fields()
            self.doc_counter = 0

    def get_term_writer(self):

        "Return a term dictionary writer for the current partition."

        return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval)

    def get_field_writer(self):

        "Return a field dictionary writer for the current partition."

        return get_field_writer(self.pathname, self.field_dict_partition, self.field_interval)

    def flush_terms(self):

        """
        Flush terms into the current term dictionary partition, then discard the
        pending terms and advance to the next partition.
        """

        # Get the terms in order.

        all_terms = self.terms
        terms = all_terms.keys()
        terms.sort()

        dict_writer = self.get_term_writer()

        for term in terms:
            doc_positions = all_terms[term].items()
            dict_writer.write_term_positions(term, doc_positions)

        dict_writer.close()

        self.terms = {}
        self.dict_partition += 1

    def flush_fields(self):

        """
        Flush fields into the current field dictionary partition, then discard
        the pending documents and advance to the next partition.
        """

        # Get the documents in order.

        self.docs.sort()

        field_dict_writer = self.get_field_writer()

        for docnum, fields in self.docs:
            field_dict_writer.write_fields(docnum, fields)

        field_dict_writer.close()

        self.docs = []
        self.field_dict_partition += 1

    def close(self):

        "Flush any pending terms and fields."

        if self.terms:
            self.flush_terms()
        if self.docs:
            self.flush_fields()

class IndexReader:

    "Accessing the term and field dictionaries."

    def __init__(self, pathname):

        """
        Open the "merged" term and field dictionaries of the index found at
        'pathname'.
        """

        self.dict_reader = get_term_reader(pathname, "merged")
        self.field_dict_reader = get_field_reader(pathname, "merged")

    # Term-related queries are delegated to the term dictionary reader.

    def get_terms(self):
        return self.dict_reader.get_terms()

    def find_terms(self, term):
        return self.dict_reader.find_terms(term)

    def find_positions(self, term):
        return self.dict_reader.find_positions(term)

    def find_common_positions(self, terms):
        return self.dict_reader.find_common_positions(terms)

    def get_frequency(self, term):
        return self.dict_reader.get_frequency(term)

    def get_document_frequency(self, term):
        return self.dict_reader.get_document_frequency(term)

    # Field-related queries are delegated to the field dictionary reader.

    def get_fields(self, docnum):
        return self.field_dict_reader.get_fields(docnum)

    def get_document(self, docnum):

        "Return a Document for 'docnum' populated with its stored fields."

        return Document(docnum, self.get_fields(docnum))

    def close(self):

        "Close the term and field dictionary readers."

        self.dict_reader.close()
        self.field_dict_reader.close()

class Index:

    "An inverted index solution encapsulating the various components."

    def __init__(self, pathname, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, field_interval=FIELD_INTERVAL,
        flush_interval=FLUSH_INTERVAL):

        """
        Initialise the index residing at 'pathname'. The 'interval',
        'doc_interval', 'field_interval' and 'flush_interval' are retained for
        the writers created for this index.
        """

        self.pathname = pathname
        self.interval = interval
        self.doc_interval = doc_interval
        self.field_interval = field_interval
        self.flush_interval = flush_interval
        self.reader = None
        self.writer = None

    def get_writer(self):

        """
        Return a new writer for the index, creating the index directory if
        necessary. The writer is remembered so that 'close' can close it.
        """

        self._ensure_directory()
        self.writer = IndexWriter(self.pathname, self.interval, self.doc_interval,
            self.field_interval, self.flush_interval)
        return self.writer

    def _ensure_directory(self):

        "Create the index directory if it does not already exist."

        if not exists(self.pathname):
            mkdir(self.pathname)

    def _check_exists(self):

        "Raise OSError if the index directory does not exist."

        if not exists(self.pathname):
            raise OSError("Index path %r does not exist." % self.pathname)

    def get_reader(self, partition=0):

        """
        Return a reader for the index, merging any partitions first. OSError is
        raised if the index directory does not exist. The 'partition' argument
        is accepted but has no effect on the reader that is returned.
        """

        # Test for the directory before merging, since merging needs to list it.

        self._check_exists()

        # Ensure that only one partition exists.

        self.merge()
        return self._get_reader(partition)

    def _get_reader(self, partition):

        """
        Return a reader for the index without merging, raising OSError if the
        index directory does not exist. The 'partition' argument is unused.
        """

        self._check_exists()

        self.reader = IndexReader(self.pathname)
        return self.reader

    def _get_partitions(self, prefix):

        """
        Return a set of partition identifiers using 'prefix' to identify
        relevant files.
        """

        prefix_length = len(prefix)

        partitions = set()
        for filename in listdir(self.pathname):
            if filename.startswith(prefix):
                partitions.add(filename[prefix_length:])
        return partitions

    def get_term_partitions(self):

        "Return a set of term partition identifiers."

        return self._get_partitions("terms-")

    def get_field_partitions(self):

        "Return a set of field partition identifiers."

        return self._get_partitions("fields-")

    def merge(self):

        "Merge/optimise index partitions."

        self._merge_terms()
        self._merge_fields()

    def _merge_partitions(self, partitions, open_reader, open_writer, merger_class, rename_files, remove_files):

        """
        Reduce the given set of 'partitions' to a single "merged" partition.

        The 'open_reader' callable is invoked with the index pathname and a
        partition identifier, 'open_writer' is invoked without arguments to
        obtain a writer for the "merged" partition, 'merger_class' is invoked
        with the writer and the list of readers, and 'rename_files' and
        'remove_files' respectively rename and remove the files of a partition.

        The 'partitions' set is modified in place.
        """

        if not partitions:
            return

        # A single partition only needs to be given the right name. No reader is
        # opened, so that no files are held open while being renamed.

        if len(partitions) == 1:
            partition = list(partitions)[0]
            if partition != "merged":
                rename_files(self.pathname, partition, "merged")
            return

        # Move any existing merged partition out of the way of the new one.

        if "merged" in partitions:
            rename_files(self.pathname, "merged", "old-merged")
            partitions.remove("merged")
            partitions.add("old-merged")

        readers = []
        for partition in partitions:
            readers.append(open_reader(self.pathname, partition))

        # Write directly to a dictionary.
        # NOTE(review): closing the merger is presumed to close the writer and
        # the readers; confirm against iixr.merging.

        merger = merger_class(open_writer(), readers)
        merger.merge()
        merger.close()

        # Remove old files.

        for partition in partitions:
            remove_files(self.pathname, partition)

    def _merge_terms(self):

        "Merge term dictionaries."

        self._merge_partitions(
            self.get_term_partitions(),
            get_term_reader,
            lambda: get_term_writer(self.pathname, "merged", self.interval, self.doc_interval),
            TermDictionaryMerger,
            rename_term_files,
            remove_term_files
            )

    def _merge_fields(self):

        "Merge field dictionaries."

        self._merge_partitions(
            self.get_field_partitions(),
            get_field_reader,
            lambda: get_field_writer(self.pathname, "merged", self.field_interval),
            FieldDictionaryMerger,
            rename_field_files,
            remove_field_files
            )

    def update(self, other_indexes):

        "Copy the content of the 'other_indexes' into this index and merge."

        self._ensure_directory()

        # Each source index is given its own "-added-N" argument so that files
        # copied from different indexes can be told apart.

        for i, index in enumerate(other_indexes):
            for partition in index.get_term_partitions():
                copy_term_files(index.pathname, partition, self.pathname, "-added-%d" % i)
            for partition in index.get_field_partitions():
                copy_field_files(index.pathname, partition, self.pathname, "-added-%d" % i)

        self.merge()

    def close(self):

        "Close any reader and writer obtained from this index."

        if self.reader is not None:
            self.reader.close()
            self.reader = None
        if self.writer is not None:
            self.writer.close()
            self.writer = None

# vim: tabstop=4 expandtab shiftwidth=4