#!/usr/bin/env python

"""
High-level classes.

Copyright (C) 2009 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.  See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with this program.  If not, see <http://www.gnu.org/licenses/>.
"""

from iixr.filesystem import *
from iixr.merging import *
from os import listdir, mkdir # index and partition discovery
from os.path import exists

try:
    set
except NameError:
    from sets import Set as set

# Provide 'unicode' where the built-in is absent (Python 3), mirroring the
# compatibility shim for 'set' above.

try:
    unicode
except NameError:
    unicode = str

# Constants.

TERM_INTERVAL = 100         # default indexing interval for term dictionaries
DOCUMENT_INTERVAL = 100     # default 'doc_interval' for term dictionaries
FIELD_INTERVAL = 100        # default indexing interval when merging fields
FLUSH_INTERVAL = 10000      # documents added to a writer between flushes

# High-level classes.

class Document:

    "A container of document information."

    def __init__(self, docnum):

        "Initialise an empty document identified by 'docnum'."

        self.docnum = docnum
        self.fields = []    # list of (identifier, value) tuples
        self.terms = {}     # mapping from term to a list of positions

    def add_position(self, term, position):

        """
        Add a position entry for the given 'term', indicating the given
        'position'.
        """

        self.terms.setdefault(term, []).append(position)

    def add_field(self, identifier, value):

        """
        Add a field having the given 'identifier' and 'value'. The value is
        converted to a Unicode string before being stored.
        """

        self.fields.append((identifier, unicode(value))) # convert to string

    def set_fields(self, fields):

        """
        Set the document's 'fields': a list of tuples each containing an integer
        identifier and a string value. Any fields added previously are replaced.
        """

        self.fields = fields

class IndexWriter:

    """
    Building term information and writing it to the term and field dictionaries.

    Terms and fields are accumulated in memory and written out to a new
    numbered partition each time they are flushed.
    """

    def __init__(self, pathname, interval, doc_interval, flush_interval):

        """
        Initialise a writer for the index directory 'pathname' using the given
        indexing 'interval' and 'doc_interval'. Where 'flush_interval' is
        non-zero, data is flushed after that many documents have been added.
        """

        self.pathname = pathname
        self.interval = interval
        self.doc_interval = doc_interval
        self.flush_interval = flush_interval

        # Partition numbers to be used by the next flush.

        self.dict_partition = 0
        self.field_dict_partition = 0

        # Pending data: term -> {docnum: positions} and docnum -> fields.

        self.terms = {}
        self.docs = {}

        # Documents added since the last automatic flush.

        self.doc_counter = 0

    def add_document(self, doc):

        """
        Add the given document 'doc', updating the document counter and flushing
        terms and fields if appropriate.
        """

        for term, positions in doc.terms.items():
            self.terms.setdefault(term, {})[doc.docnum] = positions

        self.docs[doc.docnum] = doc.fields

        self.doc_counter += 1

        # A zero or None flush interval disables automatic flushing.

        if self.flush_interval and self.doc_counter >= self.flush_interval:
            self.flush_terms()
            self.flush_fields()
            self.doc_counter = 0

    def get_term_writer(self):

        "Return a term dictionary writer for the current partition."

        return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval)

    def get_field_writer(self):

        "Return a field dictionary writer for the current partition."

        return get_field_writer(self.pathname, self.field_dict_partition, self.interval)

    def flush_terms(self):

        """
        Flush terms into the current term dictionary partition, then clear the
        pending terms and advance to the next partition number.
        """

        # Get the terms in order.

        all_terms = self.terms
        terms = list(all_terms.keys())
        terms.sort()

        dict_writer = self.get_term_writer()

        # Always release the writer, even if writing a term fails.

        try:
            for term in terms:
                doc_positions = list(all_terms[term].items())
                dict_writer.write_term_positions(term, doc_positions)
        finally:
            dict_writer.close()

        self.terms = {}
        self.dict_partition += 1

    def flush_fields(self):

        """
        Flush fields into the current field dictionary partition, then clear the
        pending documents and advance to the next partition number.
        """

        # Get the documents in order.

        docs = list(self.docs.items())
        docs.sort()

        field_dict_writer = self.get_field_writer()

        # Always release the writer, even if writing a document fails.

        try:
            for docnum, fields in docs:
                field_dict_writer.write_fields(docnum, fields)
        finally:
            field_dict_writer.close()

        self.docs = {}
        self.field_dict_partition += 1

    def close(self):

        "Flush any pending terms and fields."

        if self.terms:
            self.flush_terms()
        if self.docs:
            self.flush_fields()

class IndexReader:

    "Accessing the term and field dictionaries."

    def __init__(self, pathname):

        """
        Open the "merged" term and field dictionary partitions found in the
        index directory 'pathname'.
        """

        self.dict_reader = get_term_reader(pathname, "merged")
        self.field_dict_reader = get_field_reader(pathname, "merged")

    def find_terms(self, term):

        "Delegate to the term dictionary reader's 'find_terms' for 'term'."

        return self.dict_reader.find_terms(term)

    def find_positions(self, term):

        "Delegate to the term dictionary reader's 'find_positions' for 'term'."

        return self.dict_reader.find_positions(term)

    def get_frequency(self, term):

        "Delegate to the term dictionary reader's 'get_frequency' for 'term'."

        return self.dict_reader.get_frequency(term)

    def get_document_frequency(self, term):

        """
        Delegate to the term dictionary reader's 'get_document_frequency' for
        'term'.
        """

        return self.dict_reader.get_document_frequency(term)

    def get_fields(self, docnum):

        "Delegate to the field dictionary reader's 'get_fields' for 'docnum'."

        return self.field_dict_reader.get_fields(docnum)

    def close(self):

        "Close both dictionary readers."

        # Close the field reader even if closing the term reader fails.

        try:
            self.dict_reader.close()
        finally:
            self.field_dict_reader.close()

class Index:

    "An inverted index solution encapsulating the various components."

    def __init__(self, pathname):

        "Initialise the index residing in the directory 'pathname'."

        self.pathname = pathname
        self.reader = None
        self.writer = None

    def get_writer(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, flush_interval=FLUSH_INTERVAL):

        """
        Return a writer, optionally using the given indexing 'interval',
        'doc_interval' and 'flush_interval'. The index directory is created if
        it does not already exist.
        """

        if not exists(self.pathname):
            mkdir(self.pathname)

        self.writer = IndexWriter(self.pathname, interval, doc_interval, flush_interval)
        return self.writer

    def get_reader(self, partition=0):

        """
        Return a reader for the index, merging any partitions first. An OSError
        is raised if the index directory does not exist. The 'partition'
        argument is accepted for compatibility but is not currently used: the
        reader always opens the "merged" partition.
        """

        # Test for the directory before merging: merging lists the directory
        # and would otherwise fail with a less helpful error.

        if not exists(self.pathname):
            raise OSError("Index path %r does not exist." % self.pathname)

        # Ensure that only one partition exists.

        self.merge()
        return self._get_reader(partition)

    def _get_reader(self, partition):

        """
        Return a reader for the index without merging. An OSError is raised if
        the index directory does not exist. The 'partition' argument is not
        currently used.
        """

        if not exists(self.pathname):
            raise OSError("Index path %r does not exist." % self.pathname)

        self.reader = IndexReader(self.pathname)
        return self.reader

    def _get_partitions(self, prefix):

        """
        Return a set of partition identifiers using 'prefix' to identify
        relevant files. Each identifier is the remainder of a matching filename
        after the prefix.
        """

        prefix_length = len(prefix)

        partitions = set()
        for filename in listdir(self.pathname):
            if filename.startswith(prefix):
                partitions.add(filename[prefix_length:])
        return partitions

    def get_term_partitions(self):

        "Return a set of term partition identifiers."

        return self._get_partitions("terms-")

    def get_field_partitions(self):

        "Return a set of field partition identifiers."

        return self._get_partitions("fields-")

    def merge(self):

        "Merge/optimise index partitions."

        self._merge_terms()
        self._merge_fields()

    def _merge_partitions(self, partitions, open_reader, open_writer, merger_class, rename_files, remove_files):

        """
        Reduce the given set of 'partitions' to a single "merged" partition.

        Readers are obtained using 'open_reader' (called with the index
        pathname and a partition identifier), the destination writer using
        'open_writer' (called without arguments, and only if a merge is
        actually required), and the merge is performed by an instance of
        'merger_class'. The 'rename_files' and 'remove_files' callables are
        used to rename and remove the files of a partition.
        """

        readers = []

        for partition in partitions:
            readers.append(open_reader(self.pathname, partition))

        # Write directly to a dictionary.

        if len(readers) > 1:

            # Move any existing merged partition out of the way so that the
            # new merged partition can be written in its place.

            if "merged" in partitions:
                rename_files(self.pathname, "merged", "old-merged")
                partitions.remove("merged")
                partitions.add("old-merged")

            writer = open_writer()
            merger = merger_class(writer, readers)
            merger.merge()

            # NOTE(review): presumably this closes the writer and readers -
            # confirm against iixr.merging.

            merger.close()

            # Remove old files.

            for partition in partitions:
                remove_files(self.pathname, partition)

        elif len(readers) == 1:

            # Nothing needs merging: release the reader, which would otherwise
            # be left open, before renaming the files.

            readers[0].close()

            partition = list(partitions)[0]
            if partition != "merged":
                rename_files(self.pathname, partition, "merged")

    def _merge_terms(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL):

        """
        Merge term dictionaries using the given indexing 'interval' and
        'doc_interval'.
        """

        self._merge_partitions(
            self.get_term_partitions(),
            get_term_reader,
            lambda: get_term_writer(self.pathname, "merged", interval, doc_interval),
            TermDictionaryMerger,
            rename_term_files,
            remove_term_files
            )

    def _merge_fields(self, interval=FIELD_INTERVAL):

        "Merge field dictionaries using the given indexing 'interval'."

        self._merge_partitions(
            self.get_field_partitions(),
            get_field_reader,
            lambda: get_field_writer(self.pathname, "merged", interval),
            FieldDictionaryMerger,
            rename_field_files,
            remove_field_files
            )

    def update(self, other_indexes):

        "Copy the content of the 'other_indexes' into this index and merge."

        # Include the source partition in the destination name so that several
        # partitions from the same index do not overwrite each other.
        # NOTE(review): assumes the final argument of the copy functions is the
        # destination partition name, as with the rename functions - confirm
        # against iixr.filesystem.

        for i, index in enumerate(other_indexes):
            for partition in index.get_term_partitions():
                copy_term_files(index.pathname, partition, self.pathname, "%s-added-%d" % (partition, i))
            for partition in index.get_field_partitions():
                copy_field_files(index.pathname, partition, self.pathname, "%s-added-%d" % (partition, i))

        self.merge()

    def close(self):

        "Close any reader and writer obtained from this index."

        # Close the writer, flushing its pending data, even if closing the
        # reader fails.

        try:
            if self.reader is not None:
                self.reader.close()
                self.reader = None
        finally:
            if self.writer is not None:
                self.writer.close()
                self.writer = None

# vim: tabstop=4 expandtab shiftwidth=4