1.1 --- a/iixr.py Sat Aug 29 22:12:25 2009 +0200
1.2 +++ b/iixr.py Sun Aug 30 03:10:20 2009 +0200
1.3 @@ -18,15 +18,17 @@
1.4 with this program. If not, see <http://www.gnu.org/licenses/>.
1.5 """
1.6
1.7 -from os import mkdir # to determine whether to create indexes
1.8 +from os import listdir, mkdir # index and partition discovery
1.9 from os.path import exists, join
1.10 from os.path import commonprefix # to find common string prefixes
1.11 from bisect import bisect_right # to find terms in the dictionary index
1.12 +from bisect import insort_right # to maintain a sorted list of data for merging
1.13 import bz2, zlib # for field compression
1.14
1.15 # Constants.
1.16
1.17 INTERVAL = 100
1.18 +FLUSH_INTERVAL = 1000000
1.19
1.20 compressors = [("b", bz2.compress), ("z", zlib.compress)]
1.21 decompressors = {"b" : bz2.decompress, "z" : zlib.decompress}
1.22 @@ -42,8 +44,14 @@
1.23 self.reset()
1.24
1.25 def reset(self):
1.26 +
1.27 + "To be used to reset the state of the reader or writer between records."
1.28 +
1.29 pass
1.30
1.31 + def rewind(self):
1.32 + self.f.seek(0)
1.33 +
1.34 def close(self):
1.35 if self.f is not None:
1.36 self.f.close()
1.37 @@ -211,7 +219,7 @@
1.38
1.39 self.last_docnum = docnum
1.40
1.41 - def write_all_positions(self, doc_positions):
1.42 + def write_term_positions(self, doc_positions):
1.43
1.44 """
1.45 Write all 'doc_positions' - a collection of tuples of the form (document
1.46 @@ -272,7 +280,7 @@
1.47
1.48 return self.last_docnum, positions
1.49
1.50 - def read_all_positions(self, offset):
1.51 + def read_term_positions(self, offset):
1.52
1.53 """
1.54 Read all positions from 'offset', seeking to that position in the file
1.55 @@ -347,6 +355,10 @@
1.56 self.last_term = ""
1.57 self.last_offset = 0
1.58
1.59 + def rewind(self):
1.60 + self.reset()
1.61 + FileReader.rewind(self)
1.62 +
1.63 def read_term(self):
1.64
1.65 """
1.66 @@ -461,7 +473,7 @@
1.67 and positions at which the term is found.
1.68 """
1.69
1.70 - offset, frequency = self.position_writer.write_all_positions(doc_positions)
1.71 + offset, frequency = self.position_writer.write_term_positions(doc_positions)
1.72 self._write_term(term, offset, frequency)
1.73
1.74 def close(self):
1.75 @@ -529,6 +541,23 @@
1.76 else:
1.77 return None
1.78
1.79 + def rewind(self):
1.80 + self.info_reader.rewind()
1.81 +
1.82 + def _get_positions(self, offset):
1.83 + return self.position_reader.read_term_positions(offset)
1.84 +
1.85 + def read_term(self):
1.86 +
1.87 + """
1.88 + Return the next term, its frequency and the documents and positions at
1.89 + which the term is found.
1.90 + """
1.91 +
1.92 + term, offset, frequency = self.info_reader.read_term()
1.93 + positions = self._get_positions(offset)
1.94 + return term, frequency, positions
1.95 +
1.96 def find_positions(self, term):
1.97
1.98 "Return the documents and positions at which the given 'term' is found."
1.99 @@ -538,7 +567,7 @@
1.100 return None
1.101 else:
1.102 offset, frequency = t
1.103 - return self.position_reader.read_all_positions(offset)
1.104 + return self._get_positions(offset)
1.105
1.106 def get_frequency(self, term):
1.107
1.108 @@ -635,7 +664,7 @@
1.109 bad_docnum, fields = self.read_fields()
1.110 self.last_docnum = docnum
1.111 return docnum, fields
1.112 -
1.113 +
1.114 class FieldIndexWriter(FileWriter):
1.115
1.116 "Writing field index details to files."
1.117 @@ -758,6 +787,70 @@
1.118 self.field_reader.close()
1.119 self.field_index_reader.close()
1.120
1.121 +# Dictionary merging classes.
1.122 +
1.123 +class TermDictionaryMerger:
1.124 +
1.125 + "Merge the term dictionaries of several readers using a single writer."
1.126 +
1.127 + def __init__(self, writer, readers):
1.128 + self.writer = writer
1.129 + self.readers = readers
1.130 +
1.131 + def merge(self):
1.132 + entries = []
1.133 +
1.134 + # Get the first entries from the readers.
1.135 +
1.136 + for partition, reader in enumerate(self.readers):
1.137 + reader.rewind()
1.138 +
1.139 + try:
1.140 + term, frequency, positions = reader.read_term()
1.141 + insort_right(entries, (term, positions, partition))
1.142 + except EOFError:
1.143 + pass
1.144 +
1.145 + # While entries are available, write them out in order, merging where
1.146 + # appropriate.
1.147 +
1.148 + while entries:
1.149 + term, doc_positions, partition = entries[0]
1.150 + to_update = [partition]
1.151 +
1.152 + nentries = len(entries)
1.153 + i = 1
1.154 +
1.155 + # Find other entries for the term.
1.156 +
1.157 + while i < nentries:
1.158 + other_term, other_doc_positions, other_partition = entries[i]
1.159 +
1.160 + # For such entries, merge the positions.
1.161 +
1.162 + if other_term == term:
1.163 + doc_positions += other_doc_positions
1.164 + to_update.append(other_partition)
1.165 + i += 1
1.166 + else:
1.167 + break
1.168 +
1.169 + # Write the combined term details.
1.170 +
1.171 + doc_positions.sort()
1.172 + self.writer.write_term_positions(term, doc_positions)
1.173 +
1.174 + # Update the entries from the affected readers.
1.175 +
1.176 + del entries[:i]
1.177 +
1.178 + for partition in to_update:
1.179 + try:
1.180 + term, frequency, positions = self.readers[partition].read_term()
1.181 + insort_right(entries, (term, positions, partition))
1.182 + except EOFError:
1.183 + pass
1.184 +
1.185 # High-level classes.
1.186
1.187 class IndexWriter:
1.188 @@ -766,12 +859,20 @@
1.189 Building term information and writing it to the term and field dictionaries.
1.190 """
1.191
1.192 - def __init__(self, dict_writer, field_dict_writer):
1.193 - self.dict_writer = dict_writer
1.194 - self.field_dict_writer = field_dict_writer
1.195 + def __init__(self, pathname, interval, flush_interval):
1.196 + self.pathname = pathname
1.197 + self.interval = interval
1.198 + self.flush_interval = flush_interval
1.199 +
1.200 + self.dict_partition = 0
1.201 + self.field_dict_partition = 0
1.202 +
1.203 self.terms = {}
1.204 self.docs = {}
1.205
1.206 + self.position_counter = 0
1.207 + self.field_counter = 0
1.208 +
1.209 def add_position(self, term, docnum, position):
1.210
1.211 """
1.212 @@ -791,6 +892,10 @@
1.213
1.214 doc.append(position)
1.215
1.216 + self.position_counter += 1
1.217 + if self.flush_interval and self.position_counter >= self.flush_interval:
1.218 + self.flush_terms()
1.219 +
1.220 def add_fields(self, docnum, fields):
1.221
1.222 "Add for the document with the given 'docnum' a list of 'fields'."
1.223 @@ -800,41 +905,112 @@
1.224 else:
1.225 self.docs[docnum] += fields
1.226
1.227 - def close(self):
1.228 - if self.dict_writer is None:
1.229 - return
1.230 + self.field_counter += len(fields)
1.231 + if self.flush_interval and self.field_counter >= self.flush_interval:
1.232 + self.flush_fields()
1.233 +
1.234 + def get_term_writer(self):
1.235 +
1.236 + "Return a term dictionary writer for the current partition."
1.237 +
1.238 + tdf = open(join(self.pathname, "terms-%d" % self.dict_partition), "wb")
1.239 + info_writer = TermWriter(tdf)
1.240 +
1.241 + tdif = open(join(self.pathname, "index-%d" % self.dict_partition), "wb")
1.242 + index_writer = TermIndexWriter(tdif)
1.243 +
1.244 + tpf = open(join(self.pathname, "positions-%d" % self.dict_partition), "wb")
1.245 + positions_writer = PositionWriter(tpf)
1.246 +
1.247 + return TermDictionaryWriter(info_writer, index_writer, positions_writer, self.interval)
1.248 +
1.249 + def get_field_writer(self):
1.250 +
1.251 + "Return a field dictionary writer for the current partition."
1.252 +
1.253 + ff = open(join(self.pathname, "fields-%d" % self.field_dict_partition), "wb")
1.254 + field_writer = FieldWriter(ff)
1.255 +
1.256 + fif = open(join(self.pathname, "fields_index-%d" % self.field_dict_partition), "wb")
1.257 + field_index_writer = FieldIndexWriter(fif)
1.258 +
1.259 + return FieldDictionaryWriter(field_writer, field_index_writer, self.interval)
1.260 +
1.261 + def flush_terms(self):
1.262 +
1.263 + "Flush terms into the current term dictionary partition."
1.264
1.265 # Get the terms in order.
1.266
1.267 terms = self.terms.items()
1.268 terms.sort()
1.269
1.270 + dict_writer = self.get_term_writer()
1.271 +
1.272 for term, doc_positions in terms:
1.273 doc_positions = doc_positions.items()
1.274 doc_positions.sort()
1.275 - self.dict_writer.write_term_positions(term, doc_positions)
1.276 + dict_writer.write_term_positions(term, doc_positions)
1.277 +
1.278 + dict_writer.close()
1.279
1.280 - self.dict_writer.close()
1.281 - self.dict_writer = None
1.282 + self.terms = {}
1.283 + self.dict_partition += 1
1.284 +
1.285 + def flush_fields(self):
1.286 +
1.287 + "Flush fields into the current field dictionary partition."
1.288
1.289 # Get the documents in order.
1.290
1.291 docs = self.docs.items()
1.292 docs.sort()
1.293
1.294 + field_dict_writer = self.get_field_writer()
1.295 +
1.296 for docnum, fields in docs:
1.297 - self.field_dict_writer.write_fields(docnum, fields)
1.298 + field_dict_writer.write_fields(docnum, fields)
1.299 +
1.300 + field_dict_writer.close()
1.301
1.302 - self.field_dict_writer.close()
1.303 - self.field_dict_writer = None
1.304 + self.docs = {}
1.305 + self.field_dict_partition += 1
1.306 +
1.307 + def close(self):
1.308 + if self.terms:
1.309 + self.flush_terms()
1.310 + if self.docs:
1.311 + self.flush_fields()
1.312
1.313 class IndexReader:
1.314
1.315 "Accessing the term and field dictionaries."
1.316
1.317 - def __init__(self, dict_reader, field_dict_reader):
1.318 - self.dict_reader = dict_reader
1.319 - self.field_dict_reader = field_dict_reader
1.320 + def __init__(self, pathname, partition=0):
1.321 + self.pathname = pathname
1.322 + self.dict_reader = self.get_term_reader(partition)
1.323 + self.field_dict_reader = self.get_field_reader(partition)
1.324 +
1.325 + def get_term_reader(self, partition):
1.326 + tdf = open(join(self.pathname, "terms-%d" % partition), "rb")
1.327 + info_reader = TermReader(tdf)
1.328 +
1.329 + tdif = open(join(self.pathname, "index-%d" % partition), "rb")
1.330 + index_reader = TermIndexReader(tdif)
1.331 +
1.332 + tpf = open(join(self.pathname, "positions-%d" % partition), "rb")
1.333 + positions_reader = PositionReader(tpf)
1.334 +
1.335 + return TermDictionaryReader(info_reader, index_reader, positions_reader)
1.336 +
1.337 + def get_field_reader(self, partition):
1.338 + ff = open(join(self.pathname, "fields-%d" % partition), "rb")
1.339 + field_reader = FieldReader(ff)
1.340 +
1.341 + fif = open(join(self.pathname, "fields_index-%d" % partition), "rb")
1.342 + field_index_reader = FieldIndexReader(fif)
1.343 +
1.344 + return FieldDictionaryReader(field_reader, field_index_reader)
1.345
1.346 def find_positions(self, term):
1.347 return self.dict_reader.find_positions(term)
1.348 @@ -858,63 +1034,41 @@
1.349 self.reader = None
1.350 self.writer = None
1.351
1.352 - def get_writer(self, interval=INTERVAL):
1.353 + def get_writer(self, interval=INTERVAL, flush_interval=FLUSH_INTERVAL):
1.354
1.355 - "Return a writer, optionally using the given indexing 'interval'."
1.356 + """
1.357 + Return a writer, optionally using the given indexing 'interval' and
1.358 + 'flush_interval'.
1.359 + """
1.360
1.361 if not exists(self.pathname):
1.362 mkdir(self.pathname)
1.363
1.364 - tdf = open(join(self.pathname, "terms"), "wb")
1.365 - info_writer = TermWriter(tdf)
1.366 -
1.367 - tdif = open(join(self.pathname, "index"), "wb")
1.368 - index_writer = TermIndexWriter(tdif)
1.369 -
1.370 - tpf = open(join(self.pathname, "positions"), "wb")
1.371 - positions_writer = PositionWriter(tpf)
1.372 -
1.373 - dict_writer = TermDictionaryWriter(info_writer, index_writer, positions_writer, interval)
1.374 -
1.375 - ff = open(join(self.pathname, "fields"), "wb")
1.376 - field_writer = FieldWriter(ff)
1.377 -
1.378 - fif = open(join(self.pathname, "fields_index"), "wb")
1.379 - field_index_writer = FieldIndexWriter(fif)
1.380 -
1.381 - field_dict_writer = FieldDictionaryWriter(field_writer, field_index_writer, interval)
1.382 -
1.383 - self.writer = IndexWriter(dict_writer, field_dict_writer)
1.384 + self.writer = IndexWriter(self.pathname, interval, flush_interval)
1.385 return self.writer
1.386
1.387 - def get_reader(self):
1.388 + def get_reader(self, partition=0):
1.389
1.390 "Return a reader for the index."
1.391
1.392 if not exists(self.pathname):
1.393 raise OSError, "Index path %r does not exist." % self.pathname
1.394
1.395 - tdf = open(join(self.pathname, "terms"), "rb")
1.396 - info_reader = TermReader(tdf)
1.397 + self.reader = IndexReader(self.pathname, partition)
1.398 + return self.reader
1.399
1.400 - tdif = open(join(self.pathname, "index"), "rb")
1.401 - index_reader = TermIndexReader(tdif)
1.402 + def merge_terms(self):
1.403
1.404 - tpf = open(join(self.pathname, "positions"), "rb")
1.405 - positions_reader = PositionReader(tpf)
1.406 -
1.407 - dict_reader = TermDictionaryReader(info_reader, index_reader, positions_reader)
1.408 + "Merge term dictionaries."
1.409
1.410 - ff = open(join(self.pathname, "fields"), "rb")
1.411 - field_reader = FieldReader(ff)
1.412 + readers = []
1.413
1.414 - fif = open(join(self.pathname, "fields_index"), "rb")
1.415 - field_index_reader = FieldIndexReader(fif)
1.416 + for filename in listdir(self.pathname):
1.417 + if filename.startswith("terms-"): # 6 character prefix
1.418 + partition = int(filename[6:])
1.419 + readers.append(self.get_reader(partition))
1.420
1.421 - field_dict_reader = FieldDictionaryReader(field_reader, field_index_reader)
1.422 -
1.423 - self.reader = IndexReader(dict_reader, field_dict_reader)
1.424 - return self.reader
1.425 + # NOTE: Make a distinct new writer/index.
1.426
1.427 def close(self):
1.428 if self.reader is not None: