#!/usr/bin/env python

"""
High-level classes.

Copyright (C) 2009 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with this program. If not, see <http://www.gnu.org/licenses/>.
"""

from iixr.filesystem import *
from iixr.merging import *
from os import listdir, mkdir # index and partition discovery
from os.path import exists

try:
    set
except NameError:
    from sets import Set as set

# Constants.

# Default intervals handed to the dictionary writers, and the default number of
# documents an IndexWriter accumulates before flushing a partition.

TERM_INTERVAL = 100
DOCUMENT_INTERVAL = 100
FIELD_INTERVAL = 100
FLUSH_INTERVAL = 10000

# High-level classes.

class Document:

    "A container of document information."

    def __init__(self, docnum):

        "Initialise an empty document identified by 'docnum'."

        self.docnum = docnum
        self.fields = []    # list of (identifier, value) tuples
        self.terms = {}     # mapping of term -> list of positions

    def add_position(self, term, position):

        """
        Add a position entry for the given 'term', indicating the given
        'position'.
        """

        self.terms.setdefault(term, []).append(position)

    def add_field(self, identifier, value):

        """
        Add a field having the given 'identifier' and 'value'. The value is
        stored after conversion using 'unicode'.
        """

        self.fields.append((identifier, unicode(value))) # convert to string

    def set_fields(self, fields):

        """
        Set the document's 'fields': a list of tuples each containing an integer
        identifier and a string value.
        """

        self.fields = fields

class IndexWriter:

    """
    Building term information and writing it to the term and field dictionaries.
    """

    def __init__(self, pathname, interval, doc_interval, flush_interval):

        """
        Initialise a writer storing partitions under 'pathname'. The 'interval'
        is passed to both the term and field dictionary writers, 'doc_interval'
        to the term dictionary writer only. After 'flush_interval' documents
        have been added, the accumulated terms and fields are flushed to new
        partitions; a false value (such as zero or None) disables this automatic
        flushing.
        """

        self.pathname = pathname
        self.interval = interval
        self.doc_interval = doc_interval
        self.flush_interval = flush_interval

        # Numbers of the next term and field partitions to be written.

        self.dict_partition = 0
        self.field_dict_partition = 0

        # Pending data: term -> {docnum -> positions} and docnum -> fields.

        self.terms = {}
        self.docs = {}

        # Documents added since the last automatic flush.

        self.doc_counter = 0

    def add_document(self, doc):

        """
        Add the given document 'doc', updating the document counter and flushing
        terms and fields if appropriate.
        """

        for term, positions in doc.terms.items():
            self.terms.setdefault(term, {})[doc.docnum] = positions

        self.docs[doc.docnum] = doc.fields

        self.doc_counter += 1
        if self.flush_interval and self.doc_counter >= self.flush_interval:
            self.flush_terms()
            self.flush_fields()
            self.doc_counter = 0

    def get_term_writer(self):

        "Return a term dictionary writer for the current partition."

        return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval)

    def get_field_writer(self):

        "Return a field dictionary writer for the current partition."

        return get_field_writer(self.pathname, self.field_dict_partition, self.interval)

    def flush_terms(self):

        "Flush terms into the current term dictionary partition."

        # Get the terms in order. An explicit list is made so that the in-place
        # sort does not depend on what kind of object keys() returns.

        all_terms = self.terms
        terms = list(all_terms.keys())
        terms.sort()

        dict_writer = self.get_term_writer()

        for term in terms:
            doc_positions = list(all_terms[term].items())
            dict_writer.write_term_positions(term, doc_positions)

        dict_writer.close()

        # Start afresh with the next partition.

        self.terms = {}
        self.dict_partition += 1

    def flush_fields(self):

        "Flush fields into the current field dictionary partition."

        # Get the documents in order.

        docs = list(self.docs.items())
        docs.sort()

        field_dict_writer = self.get_field_writer()

        for docnum, fields in docs:
            field_dict_writer.write_fields(docnum, fields)

        field_dict_writer.close()

        # Start afresh with the next partition.

        self.docs = {}
        self.field_dict_partition += 1

    def close(self):

        "Flush any terms and fields which have not yet been written."

        if self.terms:
            self.flush_terms()
        if self.docs:
            self.flush_fields()

class IndexReader:

    "Accessing the term and field dictionaries."

    def __init__(self, pathname):

        """
        Open the term and field dictionaries of the "merged" partition found
        under 'pathname'.
        """

        self.dict_reader = get_term_reader(pathname, "merged")
        self.field_dict_reader = get_field_reader(pathname, "merged")

    def find_terms(self, term):

        "Return the result of the term dictionary's 'find_terms' for 'term'."

        return self.dict_reader.find_terms(term)

    def find_positions(self, term):

        "Return the result of the term dictionary's 'find_positions' for 'term'."

        return self.dict_reader.find_positions(term)

    def find_common_positions(self, term):

        """
        Return the result of the term dictionary's 'find_common_positions' for
        'term'.
        """

        return self.dict_reader.find_common_positions(term)

    def get_frequency(self, term):

        "Return the result of the term dictionary's 'get_frequency' for 'term'."

        return self.dict_reader.get_frequency(term)

    def get_document_frequency(self, term):

        """
        Return the result of the term dictionary's 'get_document_frequency' for
        'term'.
        """

        return self.dict_reader.get_document_frequency(term)

    def get_fields(self, docnum):

        "Return the result of the field dictionary's 'get_fields' for 'docnum'."

        return self.field_dict_reader.get_fields(docnum)

    def close(self):

        "Close the term and field dictionary readers."

        self.dict_reader.close()
        self.field_dict_reader.close()

class Index:

    "An inverted index solution encapsulating the various components."

    def __init__(self, pathname):

        """
        Initialise the index for the directory 'pathname'. Nothing is created or
        opened until a reader or writer is requested.
        """

        self.pathname = pathname
        self.reader = None
        self.writer = None

    def get_writer(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, flush_interval=FLUSH_INTERVAL):

        """
        Return a writer, optionally using the given indexing 'interval',
        'doc_interval' and 'flush_interval'. The index directory is created if
        it does not already exist.
        """

        self._ensure_directory()
        self.writer = IndexWriter(self.pathname, interval, doc_interval, flush_interval)
        return self.writer

    def _ensure_directory(self):

        "Create the index directory if it does not already exist."

        if not exists(self.pathname):
            mkdir(self.pathname)

    def get_reader(self, partition=0):

        """
        Return a reader for the index, merging any partitions first. The
        'partition' argument is accepted but not used here: the reader always
        opens the "merged" partition.
        """

        # Ensure that only one partition exists.

        self.merge()
        return self._get_reader(partition)

    def _get_reader(self, partition):

        """
        Return a reader for the index without merging. An OSError is raised if
        the index directory does not exist. The 'partition' argument is unused.
        """

        if not exists(self.pathname):
            raise OSError("Index path %r does not exist." % self.pathname)

        self.reader = IndexReader(self.pathname)
        return self.reader

    def _get_partitions(self, prefix):

        """
        Return a set of partition identifiers using 'prefix' to identify
        relevant files.
        """

        prefix_length = len(prefix)

        # The identifier is whatever follows the prefix in the filename.

        partitions = set()
        for filename in listdir(self.pathname):
            if filename.startswith(prefix):
                partitions.add(filename[prefix_length:])
        return partitions

    def get_term_partitions(self):

        "Return a set of term partition identifiers."

        return self._get_partitions("terms-")

    def get_field_partitions(self):

        "Return a set of field partition identifiers."

        return self._get_partitions("fields-")

    def merge(self):

        "Merge/optimise index partitions."

        self._merge_terms()
        self._merge_fields()

    def _merge_terms(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL):

        """
        Merge term dictionaries using the given indexing 'interval' and
        'doc_interval'.
        """

        readers = []
        partitions = self.get_term_partitions()

        for partition in partitions:
            readers.append(get_term_reader(self.pathname, partition))

        # Write directly to a dictionary.

        if len(readers) > 1:

            # Move any existing merged partition out of the way so that the new
            # "merged" partition can be written alongside it.

            if "merged" in partitions:
                rename_term_files(self.pathname, "merged", "old-merged")
                partitions.remove("merged")
                partitions.add("old-merged")

            writer = get_term_writer(self.pathname, "merged", interval, doc_interval)
            merger = TermDictionaryMerger(writer, readers)
            merger.merge()
            merger.close()

            # Remove old files.

            for partition in partitions:
                remove_term_files(self.pathname, partition)

        elif len(readers) == 1:

            # Nothing needs merging, so release the reader opened above before
            # touching its files; previously it was left open.

            readers[0].close()

            partition = list(partitions)[0]
            if partition != "merged":
                rename_term_files(self.pathname, partition, "merged")

    def _merge_fields(self, interval=FIELD_INTERVAL):

        "Merge field dictionaries using the given indexing 'interval'."

        readers = []
        partitions = self.get_field_partitions()

        for partition in partitions:
            readers.append(get_field_reader(self.pathname, partition))

        # Write directly to a dictionary.

        if len(readers) > 1:

            # Move any existing merged partition out of the way so that the new
            # "merged" partition can be written alongside it.

            if "merged" in partitions:
                rename_field_files(self.pathname, "merged", "old-merged")
                partitions.remove("merged")
                partitions.add("old-merged")

            writer = get_field_writer(self.pathname, "merged", interval)
            merger = FieldDictionaryMerger(writer, readers)
            merger.merge()
            merger.close()

            # Remove old files.

            for partition in partitions:
                remove_field_files(self.pathname, partition)

        elif len(readers) == 1:

            # Nothing needs merging, so release the reader opened above before
            # touching its files; previously it was left open.

            readers[0].close()

            partition = list(partitions)[0]
            if partition != "merged":
                rename_field_files(self.pathname, partition, "merged")

    def update(self, other_indexes):

        """
        Copy the content of the 'other_indexes' into this index and merge. Each
        of the other indexes must provide 'pathname', 'get_term_partitions' and
        'get_field_partitions'.
        """

        self._ensure_directory()

        # The position of each index in 'other_indexes' is passed on in the form
        # "-added-<n>"; presumably this keeps partitions copied from different
        # indexes distinct (to be confirmed against iixr.filesystem).

        for i, index in enumerate(other_indexes):
            for partition in index.get_term_partitions():
                copy_term_files(index.pathname, partition, self.pathname, "-added-%d" % i)
            for partition in index.get_field_partitions():
                copy_field_files(index.pathname, partition, self.pathname, "-added-%d" % i)

        self.merge()

    def close(self):

        """
        Close any reader and writer obtained from this index. The writer is
        closed even if closing the reader fails, so that pending terms and
        fields still get flushed.
        """

        try:
            if self.reader is not None:
                self.reader.close()
                self.reader = None
        finally:
            if self.writer is not None:
                self.writer.close()
                self.writer = None

# vim: tabstop=4 expandtab shiftwidth=4