#!/usr/bin/env python

"""
High-level classes.

Copyright (C) 2009, 2010, 2011 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.  See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with this program.  If not, see <http://www.gnu.org/licenses/>.
"""

from iixr.filesystem import *
from itermerge import itermerge
from os import mkdir # index directory creation
from os.path import exists
import operator

# Constants.

FLUSH_INTERVAL = 10000
OPEN_PARTITIONS = 20

# High-level classes.

class Document:

    "A container of document information."

    def __init__(self, docnum):

        "Initialise an empty document having the given 'docnum'."

        self.docnum = docnum

        # Mapping from each term to the list of positions recorded for it.

        self.terms = {}

    def add_position(self, term, position):

        """
        Add a position entry for the given 'term', indicating the given
        'position'.
        """

        self.terms.setdefault(term, []).append(position)

class IndexWriter:

    "Building term information and writing it to the term dictionary."

    def __init__(self, pathname, flush_interval):

        """
        Initialise a writer for the index at 'pathname'. Accumulated terms are
        flushed once 'flush_interval' documents have been added since the last
        flush; a false 'flush_interval' (such as 0 or None) disables this
        automatic flushing.
        """

        self.pathname = pathname
        self.flush_interval = flush_interval

        # Start writing in the partition following the existing ones.

        self.term_partition = get_next_partition(get_term_partitions(self.pathname))

        # Mapping from each term to a mapping from document number to the
        # positions of that term in the document.

        self.terms = {}
        self.doc_counter = 0

    def add_document(self, doc):

        """
        Add the given document 'doc', updating the document counter and flushing
        terms if appropriate.
        """

        docnum = doc.docnum

        for term, positions in doc.terms.items():
            self.terms.setdefault(term, {})[docnum] = positions

        self.doc_counter += 1

        if self.flush_interval and self.doc_counter >= self.flush_interval:
            self.flush_terms()
            self.doc_counter = 0

    def get_term_writer(self):

        "Return a term writer for the current partition."

        return get_term_writer(self.pathname, self.term_partition)

    def flush_terms(self):

        "Flush terms into the current term partition."

        # Write the accumulated terms, always closing the writer afterwards.

        term_writer = self.get_term_writer()
        try:
            term_writer.write_terms(self.terms)
        finally:
            term_writer.close()

        # Start afresh with the next partition.

        self.terms = {}
        self.term_partition += 1

    def close(self):

        """
        Flush any terms which have not yet been written. A flush is also
        performed where no term partitions are found for the index, even if no
        terms are pending.
        """

        if self.terms or not get_term_partitions(self.pathname):
            self.flush_terms()

class IndexReader(itermerge):

    "Accessing the term dictionaries."

    def __init__(self, pathname, get_reader=None, combine=None):

        """
        Initialise a reader over all term partitions of the index at
        'pathname'.

        The optional 'get_reader' is a callable accepting a pathname and a
        partition and returning a reader for that partition; get_term_reader is
        used if it is omitted. The optional 'combine' is a callable accepting
        two values found for the same term and returning the combined value;
        operator.add is used if it is omitted.
        """

        # Get the partitions in order.

        partitions = list(get_term_partitions(pathname))
        partitions.sort()

        # Initialise the underlying term partition readers.

        self.readers = [(get_reader or get_term_reader)(pathname, partition) for partition in partitions]
        self.combine = combine or operator.add

        # Initialise this object as an iterator over the readers.

        itermerge.__init__(self, self.readers)

        # The item read ahead by the previous call to next, if any.

        self.next_value = None

    def get_sizes(self):

        """
        Return the sizes reported by the first partition reader, or (0, 0)
        where there are no readers.
        """

        # Readers must have compatible sizes.

        if self.readers:
            return self.readers[0].get_sizes()
        else:
            return 0, 0

    def next(self):

        """
        Return the next (term, value) pair, combining the values of successive
        items which share the same term. StopIteration is raised by the
        underlying iterator when no items remain.
        """

        if self.next_value is not None:
            term, positions = self.next_value
        else:
            term, positions = itermerge.next(self)

        # Look at the next item to see if it has positions for the current
        # term.

        try:
            t, p = itermerge.next(self)
            while t == term:
                positions = self.combine(positions, p)
                t, p = itermerge.next(self)

            # Remember the item belonging to a different term for next time.

            self.next_value = t, p

        # Where an item could not be fetched, cause future requests to fail.

        except StopIteration:
            self.next_value = None

        return term, positions

    def close(self):

        "Close all the partition readers and forget them."

        for reader in self.readers:
            reader.close()
        self.readers = []

class Index:

    "An inverted index solution encapsulating the various components."

    def __init__(self, pathname, flush_interval=FLUSH_INTERVAL,
        open_partitions=OPEN_PARTITIONS):

        """
        Initialise the index found at 'pathname'. The 'flush_interval' is given
        to any writer created for the index. The 'open_partitions' value is
        only recorded by this class.
        """

        self.pathname = pathname
        self.flush_interval = flush_interval
        self.open_partitions = open_partitions
        self.reader = None
        self.writer = None

    def get_writer(self):

        """
        Return a writer, creating the index directory and the writer itself
        upon first use and returning the same writer thereafter.
        """

        if self.writer is None:
            self._ensure_directory()
            self.writer = IndexWriter(self.pathname, self.flush_interval)
        return self.writer

    def _ensure_directory(self):

        "Create the index directory if it does not already exist."

        if not exists(self.pathname):
            mkdir(self.pathname)

    def get_reader(self, refresh=0):

        """
        Return a reader for the index. Where 'refresh' is set to a true value,
        any existing reader is closed and a new one is opened. OSError is
        raised if a reader must be opened and the index path does not exist.
        """

        if refresh and self.reader is not None:
            self.reader.close()
            self.reader = None

        if self.reader is None:
            if not exists(self.pathname):

                # The call form of raise is valid in both Python 2 and 3.

                raise OSError("Index path %r does not exist." % self.pathname)
            self.reader = IndexReader(self.pathname)
        return self.reader

    def merge(self):

        "Merge the partitions in the index."

        reader = IndexReader(self.pathname, get_term_data_reader, self.merge_data)

        # Nest the clean-up so that the reader is closed even if the writer
        # cannot be obtained or fails when being closed.

        try:
            writer = get_term_writer(self.pathname, "merged")
            try:
                writer.begin(*reader.get_sizes())
                for term, data in reader:
                    writer.write_term_plus_remaining(term, data)
                    writer.end_record()
            finally:
                writer.close()
        finally:
            reader.close()

        # Replace the existing partitions with the merged partition.

        for partition in get_term_partitions(self.pathname):
            remove_term_files(self.pathname, partition)

        rename_term_files(self.pathname, "merged", 0)

    def merge_data(self, a, b):

        """
        Merge 'a' and 'b', modifying the data to permit concatenation.
        """

        # Modify the record to indicate a continuation of the data: the last
        # element contributed by 'a' is set to 1 in the combined data.
        # NOTE: This presumably relies on 'a' never being empty; verify against
        # NOTE: the term data readers.

        c = a + b
        c[len(a) - 1] = 1
        return c

    def close(self):

        """
        Close any reader and writer opened for the index. Closing the writer
        flushes its pending terms, and this is attempted even if closing the
        reader fails.
        """

        try:
            if self.reader is not None:
                self.reader.close()
                self.reader = None
        finally:
            if self.writer is not None:
                self.writer.close()
                self.writer = None

# vim: tabstop=4 expandtab shiftwidth=4