#!/usr/bin/env python

"""
High-level classes.

Copyright (C) 2009 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with this program. If not, see <http://www.gnu.org/licenses/>.
"""

from iixr.filesystem import *
from iixr.merging import *
from os import listdir, mkdir # index and partition discovery
from os.path import exists

try:
    set
except NameError:
    from sets import Set as set

# Constants.

TERM_INTERVAL = 100
DOCUMENT_INTERVAL = 100
FIELD_INTERVAL = 100
FLUSH_INTERVAL = 10000

# High-level classes.

class Document:

    "A container of document information."

    def __init__(self, docnum, fields=None):

        """
        Initialise the document with the given 'docnum' and optional list of
        'fields' (tuples of identifier and value). Term positions start out
        empty and are added using 'add_position'.
        """

        self.docnum = docnum
        self.fields = fields or []
        self.terms = {}

        # Lazily built mapping from field identifier to value.

        self.field_dict = None

    def add_position(self, term, position):

        """
        Add a position entry for the given 'term', indicating the given
        'position'.
        """

        self.terms.setdefault(term, []).append(position)

    def add_field(self, identifier, value):

        "Add a field having the given 'identifier' and 'value'."

        self.fields.append((identifier, unicode(value))) # convert to string

        # Discard any cached mapping so that lookups see the new field.

        self.field_dict = None

    def set_fields(self, fields):

        """
        Set the document's 'fields': a list of tuples each containing an integer
        identifier and a string value.
        """

        self.fields = fields

        # Discard any cached mapping built from the previous fields.

        self.field_dict = None

    def _ensure_dict(self):

        """
        Build the identifier-to-value mapping from the fields if it is not
        already available. Where an identifier occurs more than once, the last
        value in the field list is the one retained.
        """

        if self.field_dict is None:
            self.field_dict = dict(self.fields)

    def keys(self):

        "Return the field identifiers of this document."

        self._ensure_dict()
        return self.field_dict.keys()

    def __getitem__(self, key):

        """
        Return the value of the field having the identifier 'key', raising
        KeyError if no such field exists.
        """

        self._ensure_dict()
        return self.field_dict[key]

class IndexWriter:

    """
    Building term information and writing it to the term and field dictionaries.
    """

    def __init__(self, pathname, interval, doc_interval, field_interval, flush_interval):

        """
        Initialise the writer for the index at 'pathname'. The 'interval' and
        'doc_interval' are passed to the term dictionary writers, the
        'field_interval' to the field dictionary writers, and 'flush_interval'
        is the number of added documents after which terms and fields are
        flushed to new partitions (a false value disables automatic flushing).
        """

        self.pathname = pathname
        self.interval = interval
        self.doc_interval = doc_interval
        self.field_interval = field_interval
        self.flush_interval = flush_interval

        # Identifiers of the next partitions to be written.

        self.dict_partition = 0
        self.field_dict_partition = 0

        # Unflushed content: term -> {docnum: positions}, and a list of
        # (docnum, fields) tuples.

        self.terms = {}
        self.docs = []

        # The number of documents added since the last automatic flush.

        self.doc_counter = 0

    def add_document(self, doc):

        """
        Add the given document 'doc', updating the document counter and flushing
        terms and fields if appropriate.
        """

        for term, positions in doc.terms.items():
            self.terms.setdefault(term, {})[doc.docnum] = positions

        self.docs.append((doc.docnum, doc.fields))

        self.doc_counter += 1
        if self.flush_interval and self.doc_counter >= self.flush_interval:
            self.flush_terms()
            self.flush_fields()
            self.doc_counter = 0

    def get_term_writer(self):

        "Return a term dictionary writer for the current partition."

        return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval)

    def get_field_writer(self):

        "Return a field dictionary writer for the current partition."

        return get_field_writer(self.pathname, self.field_dict_partition, self.field_interval)

    def flush_terms(self):

        "Flush terms into the current term dictionary partition."

        # Get the terms in order.

        all_terms = self.terms
        terms = list(all_terms.keys())
        terms.sort()

        dict_writer = self.get_term_writer()

        for term in terms:
            doc_positions = list(all_terms[term].items())
            dict_writer.write_term_positions(term, doc_positions)

        dict_writer.close()

        # Start afresh with the next partition.

        self.terms = {}
        self.dict_partition += 1

    def flush_fields(self):

        "Flush fields into the current field dictionary partition."

        # Get the documents in order.

        self.docs.sort()

        field_dict_writer = self.get_field_writer()

        for docnum, fields in self.docs:
            field_dict_writer.write_fields(docnum, fields)

        field_dict_writer.close()

        # Start afresh with the next partition.

        self.docs = []
        self.field_dict_partition += 1

    def close(self):

        "Flush any remaining terms and fields."

        if self.terms:
            self.flush_terms()
        if self.docs:
            self.flush_fields()

class IndexReader:

    "Accessing the term and field dictionaries."

    def __init__(self, pathname):

        """
        Open the "merged" term and field dictionaries found at 'pathname'.
        """

        self.dict_reader = get_term_reader(pathname, "merged")

        # Do not leave the term reader open if the field reader is unavailable.

        try:
            self.field_dict_reader = get_field_reader(pathname, "merged")
        except Exception:
            self.dict_reader.close()
            raise

    def find_terms(self, term):

        "Delegate to the term dictionary reader's 'find_terms' for 'term'."

        return self.dict_reader.find_terms(term)

    def find_positions(self, term):

        "Delegate to the term dictionary reader's 'find_positions' for 'term'."

        return self.dict_reader.find_positions(term)

    def find_common_positions(self, terms):

        """
        Delegate to the term dictionary reader's 'find_common_positions' for
        the given 'terms'.
        """

        return self.dict_reader.find_common_positions(terms)

    def get_frequency(self, term):

        "Delegate to the term dictionary reader's 'get_frequency' for 'term'."

        return self.dict_reader.get_frequency(term)

    def get_document_frequency(self, term):

        """
        Delegate to the term dictionary reader's 'get_document_frequency' for
        'term'.
        """

        return self.dict_reader.get_document_frequency(term)

    def get_fields(self, docnum):

        "Return the fields stored for the document with the given 'docnum'."

        return self.field_dict_reader.get_fields(docnum)

    def get_document(self, docnum):

        """
        Return a Document for the given 'docnum' populated with its stored
        fields.
        """

        return Document(docnum, self.get_fields(docnum))

    def close(self):

        "Close both dictionary readers."

        # Close the field reader even if closing the term reader fails.

        try:
            self.dict_reader.close()
        finally:
            self.field_dict_reader.close()

class Index:

    "An inverted index solution encapsulating the various components."

    def __init__(self, pathname, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, field_interval=FIELD_INTERVAL,
        flush_interval=FLUSH_INTERVAL):

        """
        Initialise the index residing at 'pathname'. The 'interval',
        'doc_interval', 'field_interval' and 'flush_interval' are passed on to
        the writers created for this index.
        """

        self.pathname = pathname
        self.interval = interval
        self.doc_interval = doc_interval
        self.field_interval = field_interval
        self.flush_interval = flush_interval
        self.reader = None
        self.writer = None

    def get_writer(self):

        "Return a writer."

        self._ensure_directory()
        self.writer = IndexWriter(self.pathname, self.interval, self.doc_interval,
            self.field_interval, self.flush_interval)
        return self.writer

    def _ensure_directory(self):

        "Create the index directory if it does not already exist."

        if not exists(self.pathname):
            mkdir(self.pathname)

    def _check_exists(self):

        "Raise OSError if the index directory does not exist."

        if not exists(self.pathname):
            raise OSError("Index path %r does not exist." % self.pathname)

    def get_reader(self, partition=0):

        """
        Return a reader for the index, merging any partitions beforehand. The
        'partition' parameter is currently unused and is retained for
        compatibility. OSError is raised if the index directory does not exist.
        """

        # Test for the directory before merging, since merging lists the
        # directory contents.

        self._check_exists()

        # Ensure that only one partition exists.

        self.merge()
        return self._get_reader(partition)

    def _get_reader(self, partition):

        """
        Return a reader for the index without merging. The 'partition'
        parameter is currently unused. OSError is raised if the index directory
        does not exist.
        """

        self._check_exists()

        self.reader = IndexReader(self.pathname)
        return self.reader

    def _get_partitions(self, prefix):

        """
        Return a set of partition identifiers using 'prefix' to identify
        relevant files.
        """

        prefix_length = len(prefix)

        partitions = set()
        for filename in listdir(self.pathname):
            if filename.startswith(prefix):
                partitions.add(filename[prefix_length:])
        return partitions

    def get_term_partitions(self):

        "Return a set of term partition identifiers."

        return self._get_partitions("terms-")

    def get_field_partitions(self):

        "Return a set of field partition identifiers."

        return self._get_partitions("fields-")

    def merge(self):

        "Merge/optimise index partitions."

        self._merge_terms()
        self._merge_fields()

    def _merge_terms(self):

        """
        Merge term dictionaries: several partitions are combined into a single
        "merged" partition and then removed, whereas a lone partition is merely
        renamed to "merged" if it does not already have that name.
        """

        partitions = self.get_term_partitions()

        if len(partitions) > 1:

            # Open the readers before renaming anything so that an existing
            # merged partition is opened under its current name.

            readers = []
            for partition in partitions:
                readers.append(get_term_reader(self.pathname, partition))

            # Move any existing merged partition out of the way of the writer.

            if "merged" in partitions:
                rename_term_files(self.pathname, "merged", "old-merged")
                partitions.remove("merged")
                partitions.add("old-merged")

            # Write directly to a dictionary.

            writer = get_term_writer(self.pathname, "merged", self.interval, self.doc_interval)
            merger = TermDictionaryMerger(writer, readers)
            merger.merge()
            merger.close()

            # Remove old files.

            for partition in partitions:
                remove_term_files(self.pathname, partition)

        elif len(partitions) == 1:

            # A lone partition needs no reader: it is only renamed.

            partition = list(partitions)[0]
            if partition != "merged":
                rename_term_files(self.pathname, partition, "merged")

    def _merge_fields(self):

        """
        Merge field dictionaries: several partitions are combined into a single
        "merged" partition and then removed, whereas a lone partition is merely
        renamed to "merged" if it does not already have that name.
        """

        partitions = self.get_field_partitions()

        if len(partitions) > 1:

            # Open the readers before renaming anything so that an existing
            # merged partition is opened under its current name.

            readers = []
            for partition in partitions:
                readers.append(get_field_reader(self.pathname, partition))

            # Move any existing merged partition out of the way of the writer.

            if "merged" in partitions:
                rename_field_files(self.pathname, "merged", "old-merged")
                partitions.remove("merged")
                partitions.add("old-merged")

            # Write directly to a dictionary.

            writer = get_field_writer(self.pathname, "merged", self.field_interval)
            merger = FieldDictionaryMerger(writer, readers)
            merger.merge()
            merger.close()

            # Remove old files.

            for partition in partitions:
                remove_field_files(self.pathname, partition)

        elif len(partitions) == 1:

            # A lone partition needs no reader: it is only renamed.

            partition = list(partitions)[0]
            if partition != "merged":
                rename_field_files(self.pathname, partition, "merged")

    def update(self, other_indexes):

        "Copy the content of the 'other_indexes' into this index and merge."

        self._ensure_directory()

        # NOTE(review): every partition of a given index is copied using the
        # same "-added-%d" argument; presumably the copy functions combine it
        # with the partition identifier to form distinct names - confirm
        # against iixr.filesystem.

        for i, index in enumerate(other_indexes):
            for partition in index.get_term_partitions():
                copy_term_files(index.pathname, partition, self.pathname, "-added-%d" % i)
            for partition in index.get_field_partitions():
                copy_field_files(index.pathname, partition, self.pathname, "-added-%d" % i)

        self.merge()

    def close(self):

        "Close any reader and writer obtained from this index."

        # Close the writer, which flushes pending content, even if closing the
        # reader fails.

        try:
            if self.reader is not None:
                self.reader.close()
                self.reader = None
        finally:
            if self.writer is not None:
                self.writer.close()
                self.writer = None

# vim: tabstop=4 expandtab shiftwidth=4