#!/usr/bin/env python

"""
High-level classes.

Copyright (C) 2009, 2010, 2011 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with this program. If not, see <http://www.gnu.org/licenses/>.
"""

from iixr.filesystem import *
from iixr.terms import MultipleReader
from itermerge import itermerge
from os import mkdir # index discovery
from os.path import exists

# Constants.

FLUSH_INTERVAL = 10000
INDEX_INTERVAL = 1000
OPEN_PARTITIONS = 20

# High-level classes.

class Document:

    "A container of document information."

    def __init__(self, docnum):

        "Initialise the document with the given 'docnum' and no terms."

        self.docnum = docnum

        # Mapping from each term to the list of positions recorded for it.

        self.terms = {}

    def add_position(self, term, position):

        """
        Add a position entry for the given 'term', indicating the given
        'position'.
        """

        self.terms.setdefault(term, []).append(position)

class IndexWriter:

    "Building term information and writing it to the term dictionary."

    def __init__(self, pathname, flush_interval):

        """
        Initialise the writer for the index at 'pathname'. A true
        'flush_interval' gives the number of added documents after which the
        accumulated terms are flushed; a false value disables such automatic
        flushing.
        """

        self.pathname = pathname
        self.flush_interval = flush_interval

        # The partition to be written by the next flush.

        self.term_partition = get_next_partition(get_term_partitions(self.pathname))

        # Mapping from each term to a mapping from document number to
        # positions.

        self.terms = {}
        self.doc_counter = 0

    def add_document(self, doc):

        """
        Add the given document 'doc', updating the document counter and flushing
        terms if the flush interval has been reached.
        """

        docnum = doc.docnum

        for term, positions in doc.terms.items():
            self.terms.setdefault(term, {})[docnum] = positions

        self.doc_counter += 1

        if self.flush_interval and self.doc_counter >= self.flush_interval:
            self.flush_terms()
            self.doc_counter = 0

    def get_term_writer(self):

        "Return a term writer for the current partition."

        return get_term_writer(self.pathname, self.term_partition)

    def flush_terms(self):

        """
        Flush terms into the current term partition, then start accumulating
        terms afresh for the next partition.
        """

        # Always close the term writer, even if writing fails.

        term_writer = self.get_term_writer()
        try:
            term_writer.write_terms(self.terms)
        finally:
            term_writer.close()

        self.terms = {}
        self.term_partition += 1

    def close(self):

        """
        Flush any accumulated terms. A flush is also performed when no term
        partitions are found for the index, even if no terms are pending.
        """

        if self.terms or not get_term_partitions(self.pathname):
            self.flush_terms()

class Index:

    "An inverted index solution encapsulating the various components."

    def __init__(self, pathname, flush_interval=FLUSH_INTERVAL,
        open_partitions=OPEN_PARTITIONS):

        """
        Initialise the index residing at 'pathname'. The 'flush_interval' is
        passed on to any writer created for the index; 'open_partitions' is
        recorded on the instance.
        """

        self.pathname = pathname
        self.flush_interval = flush_interval
        self.open_partitions = open_partitions

        # The reader and writer are created on demand and cached.

        self.reader = None
        self.writer = None

    def get_writer(self):

        """
        Return a writer, creating the index directory and the writer itself
        upon first use.
        """

        if self.writer is None:
            self._ensure_directory()
            self.writer = IndexWriter(self.pathname, self.flush_interval)
        return self.writer

    def _ensure_directory(self):

        "Create the index directory if it does not already exist."

        # Attempt the creation and tolerate an existing path, avoiding the
        # race between testing for the path and creating it.

        try:
            mkdir(self.pathname)
        except OSError:
            if not exists(self.pathname):
                raise

    def _get_readers(self, get_reader):

        "Return a list of underlying readers given the 'get_reader' function."

        # Get the partitions in order.

        partitions = sorted(get_term_partitions(self.pathname))

        # Return the readers.

        return [get_reader(self.pathname, partition) for partition in partitions]

    def get_reader(self, refresh=0):

        """
        Return a reader for the index, reusing any cached reader unless
        'refresh' is true, in which case the cached reader is closed and a new
        one is created.

        An OSError is raised if the index path does not exist.
        """

        if refresh and self.reader is not None:
            self.reader.close()
            self.reader = None

        if self.reader is None:
            if not exists(self.pathname):
                raise OSError("Index path %r does not exist." % self.pathname)

            # Try and get combined readers, falling back to plain term readers
            # if these cannot be opened.

            try:
                readers = self._get_readers(get_combined_term_reader)
            except IOError:
                readers = self._get_readers(get_term_reader)

            self.reader = MultipleReader(readers)

        return self.reader

    def merge(self, interval=INDEX_INTERVAL):

        """
        Merge the partitions in the index, writing a term index entry for every
        'interval' terms. Upon success, the existing partitions are removed and
        the merged files are renamed to become partition 0.

        Any cached writer is closed first so that its pending terms take part in
        the merge, and any cached reader is closed before the partition files
        are removed.
        """

        # Flush pending terms instead of discarding them with the writer.

        if self.writer is not None:
            self.writer.close()
            self.writer = None

        # Get data readers.

        readers = self._get_readers(get_term_data_reader)
        reader = MultipleReader(readers, self.merge_data)

        # Nest the clean-up so that each resource is closed even if opening or
        # closing another one fails.

        try:
            writer = get_term_writer(self.pathname, "merged")
            try:
                index_writer = get_term_index_writer(self.pathname, "merged")
                try:
                    writer.begin(*reader.get_sizes())
                    index_writer.begin()

                    for i, (term, data) in enumerate(reader):

                        # Record the location of every 'interval'th term.

                        if i % interval == 0:
                            index_writer.write_term(term, writer.tell())
                            index_writer.end_record()

                        writer.write_term_plus_remaining(term, data)
                        writer.end_record()

                finally:
                    index_writer.close()
            finally:
                writer.close()
        finally:
            reader.close()

        # Release the cached reader before the files behind it are removed.

        if self.reader is not None:
            self.reader.close()
            self.reader = None

        for partition in get_term_partitions(self.pathname):
            remove_term_files(self.pathname, partition)

        rename_term_files(self.pathname, "merged", 0)

    def merge_data(self, a, b):

        """
        Merge 'a' and 'b', modifying the data to permit concatenation.
        """

        # Modify the record to indicate a continuation of the data: the final
        # element originating from 'a' is set to 1.
        # NOTE(review): assumes 'a' is non-empty; with an empty 'a' the index
        # is -1 and the final element of 'b' would be changed - confirm.

        c = a + b
        c[len(a) - 1] = 1
        return c

    def close(self):

        "Close and discard any cached reader and writer."

        # Make sure that the writer is still closed, and thus flushed, if
        # closing the reader fails.

        try:
            if self.reader is not None:
                self.reader.close()
                self.reader = None
        finally:
            if self.writer is not None:
                self.writer.close()
                self.writer = None

# vim: tabstop=4 expandtab shiftwidth=4