#!/usr/bin/env python

"""
High-level classes.

Copyright (C) 2009 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with this program. If not, see <http://www.gnu.org/licenses/>.
"""

from iixr.filesystem import *
from iixr.merging import *
from os import listdir, mkdir # index and partition discovery
from os.path import exists

try:
    set
except NameError:
    from sets import Set as set

# Constants.

TERM_INTERVAL = 100
DOCUMENT_INTERVAL = 100
FIELD_INTERVAL = 100
FLUSH_INTERVAL = 10000

# High-level classes.

class Document:

    "A container of document information."

    def __init__(self, docnum, fields=None):

        """
        Initialise the document with the given 'docnum' and optional 'fields'
        (a list of (identifier, value) tuples).
        """

        self.docnum = docnum
        self.fields = fields or []

        # Mapping from term to a list of positions.

        self.terms = {}

        # Lazily built mapping from field identifier to value; None means
        # "not built yet" or "invalidated by a change to the fields".

        self.field_dict = None

    def add_position(self, term, position):

        """
        Add a position entry for the given 'term', indicating the given
        'position'.
        """

        self.terms.setdefault(term, []).append(position)

    def add_field(self, identifier, value):

        "Add a field having the given 'identifier' and 'value'."

        self.fields.append((identifier, unicode(value))) # convert to string

        # Discard any cached dictionary so that keys() and __getitem__ see
        # the new field.

        self.field_dict = None

    def set_fields(self, fields):

        """
        Set the document's 'fields': a list of tuples each containing an integer
        identifier and a string value.
        """

        self.fields = fields

        # The cached dictionary described the previous fields.

        self.field_dict = None

    def _ensure_dict(self):

        "Build the field dictionary from the field list if it is not cached."

        if self.field_dict is None:
            self.field_dict = dict(self.fields)

    def keys(self):

        "Return the identifiers of the document's fields."

        self._ensure_dict()
        return self.field_dict.keys()

    def __getitem__(self, key):

        """
        Return the value of the field having the identifier 'key', raising
        KeyError if there is no such field.
        """

        self._ensure_dict()
        return self.field_dict[key]

class IndexWriter:

    """
    Building term information and writing it to the term and field dictionaries.
    """

    def __init__(self, pathname, interval, doc_interval, flush_interval, field_interval=None):

        """
        Initialise the writer for the index directory 'pathname' using the
        given term 'interval', 'doc_interval' and 'flush_interval' (the number
        of added documents after which accumulated data is flushed; a false
        value disables automatic flushing).

        The optional 'field_interval' is passed to the field dictionary writer;
        if omitted, 'interval' is used for field dictionaries as well.
        """

        self.pathname = pathname
        self.interval = interval
        self.doc_interval = doc_interval
        self.flush_interval = flush_interval

        if field_interval is None:
            field_interval = interval
        self.field_interval = field_interval

        # Numbers of the next term and field partitions to be written.

        self.dict_partition = 0
        self.field_dict_partition = 0

        # Accumulated data: term -> {docnum: positions}, and a list of
        # (docnum, fields) tuples.

        self.terms = {}
        self.docs = []

        # Number of documents added since the last automatic flush.

        self.doc_counter = 0

    def add_document(self, doc):

        """
        Add the given document 'doc', updating the document counter and flushing
        terms and fields if appropriate.
        """

        for term, positions in doc.terms.items():
            self.terms.setdefault(term, {})[doc.docnum] = positions

        self.docs.append((doc.docnum, doc.fields))

        self.doc_counter += 1
        if self.flush_interval and self.doc_counter >= self.flush_interval:
            self.flush_terms()
            self.flush_fields()
            self.doc_counter = 0

    def get_term_writer(self):

        "Return a term dictionary writer for the current partition."

        return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval)

    def get_field_writer(self):

        "Return a field dictionary writer for the current partition."

        # Use the field interval here, as is done when field dictionaries are
        # merged, rather than the term interval.

        return get_field_writer(self.pathname, self.field_dict_partition, self.field_interval)

    def flush_terms(self):

        "Flush terms into the current term dictionary partition."

        # Get the terms in order.

        all_terms = self.terms
        terms = all_terms.keys()
        terms.sort()

        dict_writer = self.get_term_writer()

        for term in terms:
            doc_positions = all_terms[term].items()
            dict_writer.write_term_positions(term, doc_positions)

        dict_writer.close()

        # Start accumulating afresh for the next partition.

        self.terms = {}
        self.dict_partition += 1

    def flush_fields(self):

        "Flush fields into the current field dictionary partition."

        # Get the documents in order.

        self.docs.sort()

        field_dict_writer = self.get_field_writer()

        for docnum, fields in self.docs:
            field_dict_writer.write_fields(docnum, fields)

        field_dict_writer.close()

        # Start accumulating afresh for the next partition.

        self.docs = []
        self.field_dict_partition += 1

    def close(self):

        "Flush any terms and fields which have not yet been written."

        if self.terms:
            self.flush_terms()
        if self.docs:
            self.flush_fields()

class IndexReader:

    "Accessing the term and field dictionaries."

    def __init__(self, pathname):

        """
        Open the "merged" term and field dictionaries found in the index
        directory 'pathname'.
        """

        self.dict_reader = get_term_reader(pathname, "merged")
        self.field_dict_reader = get_field_reader(pathname, "merged")

    # The following methods delegate to the term dictionary reader.

    def find_terms(self, term):
        return self.dict_reader.find_terms(term)

    def find_positions(self, term):
        return self.dict_reader.find_positions(term)

    def find_common_positions(self, term):
        return self.dict_reader.find_common_positions(term)

    def get_frequency(self, term):
        return self.dict_reader.get_frequency(term)

    def get_document_frequency(self, term):
        return self.dict_reader.get_document_frequency(term)

    # The following methods use the field dictionary reader.

    def get_fields(self, docnum):
        return self.field_dict_reader.get_fields(docnum)

    def get_document(self, docnum):

        """
        Return a Document for 'docnum' populated with the fields obtained from
        the field dictionary reader.
        """

        return Document(docnum, self.get_fields(docnum))

    def close(self):

        "Close the term and field dictionary readers."

        self.dict_reader.close()
        self.field_dict_reader.close()

class Index:

    "An inverted index solution encapsulating the various components."

    def __init__(self, pathname, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, field_interval=FIELD_INTERVAL,
        flush_interval=FLUSH_INTERVAL):

        """
        Initialise the index residing in the directory 'pathname' with the
        given 'interval', 'doc_interval', 'field_interval' and 'flush_interval'
        settings which are passed on to the writers created by this object.
        """

        self.pathname = pathname
        self.interval = interval
        self.doc_interval = doc_interval
        self.field_interval = field_interval
        self.flush_interval = flush_interval
        self.reader = None
        self.writer = None

    def get_writer(self):

        "Return a writer."

        self._ensure_directory()
        self.writer = IndexWriter(self.pathname, self.interval, self.doc_interval, self.flush_interval,
            self.field_interval)
        return self.writer

    def _ensure_directory(self):

        "Create the index directory if it does not already exist."

        if not exists(self.pathname):
            mkdir(self.pathname)

    def get_reader(self, partition=0):

        "Return a reader for the index."

        # Ensure that only one partition exists.

        self.merge()
        return self._get_reader(partition)

    def _get_reader(self, partition):

        """
        Return a reader for the index, raising OSError if the index directory
        does not exist. The 'partition' argument is currently not used: the
        reader always opens the "merged" partition.
        """

        if not exists(self.pathname):
            raise OSError("Index path %r does not exist." % self.pathname)

        self.reader = IndexReader(self.pathname)
        return self.reader

    def _get_partitions(self, prefix):

        """
        Return a set of partition identifiers using 'prefix' to identify
        relevant files.
        """

        prefix_length = len(prefix)

        partitions = set()
        for filename in listdir(self.pathname):
            if filename.startswith(prefix):

                # The identifier is whatever follows the prefix.

                partitions.add(filename[prefix_length:])
        return partitions

    def get_term_partitions(self):

        "Return a set of term partition identifiers."

        return self._get_partitions("terms-")

    def get_field_partitions(self):

        "Return a set of field partition identifiers."

        return self._get_partitions("fields-")

    def merge(self):

        "Merge/optimise index partitions."

        self._merge_terms()
        self._merge_fields()

    def _merge_terms(self):

        """
        Merge term dictionaries, leaving a single partition called "merged" if
        any term partitions exist.
        """

        partitions = self.get_term_partitions()

        if len(partitions) > 1:

            # Open readers only when a merge is really needed so that no
            # reader is left open on files which are merely renamed.

            readers = []
            for partition in partitions:
                readers.append(get_term_reader(self.pathname, partition))

            # Move any existing merged partition out of the way of the new one.

            if "merged" in partitions:
                rename_term_files(self.pathname, "merged", "old-merged")
                partitions.remove("merged")
                partitions.add("old-merged")

            # Write directly to a dictionary.

            writer = get_term_writer(self.pathname, "merged", self.interval, self.doc_interval)
            merger = TermDictionaryMerger(writer, readers)
            merger.merge()

            # NOTE(review): presumably this closes the writer and the readers;
            # confirm in iixr.merging.

            merger.close()

            # Remove old files.

            for partition in partitions:
                remove_term_files(self.pathname, partition)

        elif len(partitions) == 1:
            partition = list(partitions)[0]
            if partition != "merged":
                rename_term_files(self.pathname, partition, "merged")

    def _merge_fields(self):

        """
        Merge field dictionaries, leaving a single partition called "merged" if
        any field partitions exist.
        """

        partitions = self.get_field_partitions()

        if len(partitions) > 1:

            # Open readers only when a merge is really needed so that no
            # reader is left open on files which are merely renamed.

            readers = []
            for partition in partitions:
                readers.append(get_field_reader(self.pathname, partition))

            # Move any existing merged partition out of the way of the new one.

            if "merged" in partitions:
                rename_field_files(self.pathname, "merged", "old-merged")
                partitions.remove("merged")
                partitions.add("old-merged")

            # Write directly to a dictionary.

            writer = get_field_writer(self.pathname, "merged", self.field_interval)
            merger = FieldDictionaryMerger(writer, readers)
            merger.merge()

            # NOTE(review): presumably this closes the writer and the readers;
            # confirm in iixr.merging.

            merger.close()

            # Remove old files.

            for partition in partitions:
                remove_field_files(self.pathname, partition)

        elif len(partitions) == 1:
            partition = list(partitions)[0]
            if partition != "merged":
                rename_field_files(self.pathname, partition, "merged")

    def update(self, other_indexes):

        "Copy the content of the 'other_indexes' into this index and merge."

        self._ensure_directory()

        # The position of each index in 'other_indexes' is used to build the
        # "-added-N" string given to the copy functions for that index.

        for i, index in enumerate(other_indexes):
            for partition in index.get_term_partitions():
                copy_term_files(index.pathname, partition, self.pathname, "-added-%d" % i)
            for partition in index.get_field_partitions():
                copy_field_files(index.pathname, partition, self.pathname, "-added-%d" % i)

        self.merge()

    def close(self):

        "Close and forget any reader and writer obtained from this index."

        if self.reader is not None:
            self.reader.close()
            self.reader = None
        if self.writer is not None:
            self.writer.close()
            self.writer = None

# vim: tabstop=4 expandtab shiftwidth=4