1.1 --- a/iixr/index.py Sat Feb 12 01:23:58 2011 +0100
1.2 +++ b/iixr/index.py Sun Feb 13 02:49:55 2011 +0100
1.3 @@ -19,18 +19,14 @@
1.4 """
1.5
1.6 from iixr.filesystem import *
1.7 -from iixr.merging import *
1.8 -from itertools import islice
1.9 +from itermerge import itermerge
1.10 from os import mkdir # index discovery
1.11 from os.path import exists
1.12 +import operator
1.13
1.14 # Constants.
1.15
1.16 -TERM_INTERVAL = 100
1.17 -DOCUMENT_INTERVAL = 100
1.18 -FIELD_INTERVAL = 100
1.19 FLUSH_INTERVAL = 10000
1.20 -POSITIONS_FLUSH_INTERVAL = 1000000
1.21 OPEN_PARTITIONS = 20
1.22
1.23 # High-level classes.
1.24 @@ -39,11 +35,9 @@
1.25
1.26 "A container of document information."
1.27
1.28 - def __init__(self, docnum, fields=None):
1.29 + def __init__(self, docnum):
1.30 self.docnum = docnum
1.31 - self.fields = fields or []
1.32 self.terms = {}
1.33 - self.field_dict = None
1.34
1.35 def add_position(self, term, position):
1.36
1.37 @@ -54,55 +48,18 @@
1.38
1.39 self.terms.setdefault(term, []).append(position)
1.40
1.41 - def add_field(self, identifier, value):
1.42 -
1.43 - "Add a field having the given 'identifier' and 'value'."
1.44 -
1.45 - self.fields.append((identifier, unicode(value))) # convert to string
1.46 -
1.47 - def set_fields(self, fields):
1.48 -
1.49 - """
1.50 - Set the document's 'fields': a list of tuples each containing an integer
1.51 - identifier and a string value.
1.52 - """
1.53 -
1.54 - self.fields = fields
1.55 -
1.56 - def _ensure_dict(self):
1.57 - if self.field_dict is None:
1.58 - self.field_dict = dict(self.fields)
1.59 -
1.60 - def keys(self):
1.61 - self._ensure_dict()
1.62 - return self.field_dict.keys()
1.63 -
1.64 - def __getitem__(self, key):
1.65 - self._ensure_dict()
1.66 - return self.field_dict[key]
1.67 -
1.68 class IndexWriter:
1.69
1.70 - """
1.71 - Building term information and writing it to the term and field dictionaries.
1.72 - """
1.73 + "Building term information and writing it to the term dictionary."
1.74
1.75 - def __init__(self, pathname, interval, doc_interval, field_interval, flush_interval, positions_flush_interval):
1.76 + def __init__(self, pathname, flush_interval):
1.77 self.pathname = pathname
1.78 - self.interval = interval
1.79 - self.doc_interval = doc_interval
1.80 - self.field_interval = field_interval
1.81 self.flush_interval = flush_interval
1.82 - self.positions_flush_interval = positions_flush_interval
1.83
1.84 - self.dict_partition = get_next_partition(get_term_partitions(self.pathname))
1.85 - self.field_dict_partition = get_next_partition(get_field_partitions(self.pathname))
1.86 + self.term_partition = get_next_partition(get_term_partitions(self.pathname))
1.87
1.88 self.terms = {}
1.89 - self.docs = []
1.90 -
1.91 self.doc_counter = 0
1.92 - self.position_counter = 0
1.93
1.94 def add_document(self, doc):
1.95
1.96 @@ -115,134 +72,105 @@
1.97
1.98 for term, positions in doc.terms.items():
1.99 self.terms.setdefault(term, {})[docnum] = positions
1.100 - self.position_counter += len(positions)
1.101 -
1.102 - self.docs.append((docnum, doc.fields))
1.103
1.104 self.doc_counter += 1
1.105
1.106 - if self.flush_interval and self.doc_counter >= self.flush_interval or \
1.107 - self.positions_flush_interval and self.position_counter >= self.positions_flush_interval:
1.108 -
1.109 + if self.flush_interval and self.doc_counter >= self.flush_interval:
1.110 self.flush_terms()
1.111 - self.flush_fields()
1.112 self.doc_counter = 0
1.113 - self.position_counter = 0
1.114
1.115 def get_term_writer(self):
1.116
1.117 - "Return a term dictionary writer for the current partition."
1.118 -
1.119 - return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval)
1.120 + "Return a term writer for the current partition."
1.121
1.122 - def get_field_writer(self):
1.123 -
1.124 - "Return a field dictionary writer for the current partition."
1.125 -
1.126 - return get_field_writer(self.pathname, self.field_dict_partition, self.field_interval)
1.127 + return get_term_writer(self.pathname, self.term_partition)
1.128
1.129 def flush_terms(self):
1.130
1.131 - "Flush terms into the current term dictionary partition."
1.132 + "Flush terms into the current term partition."
1.133
1.134 # Get the terms in order.
1.135
1.136 - all_terms = self.terms
1.137 - terms = all_terms.keys()
1.138 - terms.sort()
1.139 -
1.140 - dict_writer = self.get_term_writer()
1.141 -
1.142 - for term in terms:
1.143 - doc_positions = all_terms[term].items()
1.144 - dict_writer.write_term_positions(term, doc_positions)
1.145 -
1.146 - dict_writer.close()
1.147 + term_writer = self.get_term_writer()
1.148 + try:
1.149 + term_writer.write_terms(self.terms)
1.150 + finally:
1.151 + term_writer.close()
1.152
1.153 self.terms = {}
1.154 - self.dict_partition += 1
1.155 -
1.156 - def flush_fields(self):
1.157 -
1.158 - "Flush fields into the current term dictionary partition."
1.159 -
1.160 - # Get the documents in order.
1.161 -
1.162 - self.docs.sort()
1.163 -
1.164 - field_dict_writer = self.get_field_writer()
1.165 - for docnum, fields in self.docs:
1.166 - field_dict_writer.write_fields(docnum, fields)
1.167 - field_dict_writer.close()
1.168 -
1.169 - self.docs = []
1.170 - self.field_dict_partition += 1
1.171 + self.term_partition += 1
1.172
1.173 def close(self):
1.174 if self.terms or not get_term_partitions(self.pathname):
1.175 self.flush_terms()
1.176 - if self.docs or not get_field_partitions(self.pathname):
1.177 - self.flush_fields()
1.178 +
1.179 +class IndexReader(itermerge):
1.180 +
1.181 + "Accessing the term dictionaries."
1.182
1.183 -class IndexReader:
1.184 + def __init__(self, pathname, get_reader=None, combine=None):
1.185
1.186 - "Accessing the term and field dictionaries."
1.187 + # Get the partitions in order.
1.188 +
1.189 + partitions = list(get_term_partitions(pathname))
1.190 + partitions.sort()
1.191
1.192 - def __init__(self, pathname):
1.193 - self.dict_reader = get_term_reader(pathname, "merged")
1.194 - self.field_dict_reader = get_field_reader(pathname, "merged")
1.195 + # Initialise the underlying term partition readers.
1.196
1.197 - # Sequential access.
1.198 + self.readers = [(get_reader or get_term_reader)(pathname, partition) for partition in partitions]
1.199 + self.combine = combine or operator.add
1.200 +
1.201 + # Initialise this object as an iterator over the readers.
1.202
1.203 - def read_term(self):
1.204 - return self.dict_reader.read_term()
1.205 + itermerge.__init__(self, self.readers)
1.206 + self.next_value = None
1.207
1.208 - def go_to_term(self, term):
1.209 - return self.dict_reader._get_term_and_positions(*self.dict_reader.go_to_term(term))
1.210 + def get_sizes(self):
1.211
1.212 - # Query access.
1.213 + # Readers must have compatible sizes.
1.214
1.215 - def get_terms(self):
1.216 - return self.dict_reader.get_terms()
1.217 -
1.218 - def find_terms(self, term):
1.219 - return self.dict_reader.find_terms(term)
1.220 + if self.readers:
1.221 + return self.readers[0].get_sizes()
1.222 + else:
1.223 + return 0, 0
1.224
1.225 - def find_positions(self, term):
1.226 - return self.dict_reader.find_positions(term)
1.227 + def next(self):
1.228 + if self.next_value is not None:
1.229 + term, positions = self.next_value
1.230 + else:
1.231 + term, positions = itermerge.next(self)
1.232
1.233 - def find_common_positions(self, terms):
1.234 - return self.dict_reader.find_common_positions(terms)
1.235 + # Look at the next item to see if it is has positions for the current
1.236 + # term.
1.237
1.238 - def get_frequency(self, term):
1.239 - return self.dict_reader.get_frequency(term)
1.240 -
1.241 - def get_document_frequency(self, term):
1.242 - return self.dict_reader.get_document_frequency(term)
1.243 + try:
1.244 + t, p = itermerge.next(self)
1.245 + while t == term:
1.246 + positions = self.combine(positions, p)
1.247 + t, p = itermerge.next(self)
1.248 + self.next_value = t, p
1.249
1.250 - def get_fields(self, docnum):
1.251 - return self.field_dict_reader.get_fields(docnum)
1.252 + # Where an item could not be fetched, cause future requests to fail.
1.253
1.254 - def get_document(self, docnum):
1.255 - return Document(docnum, self.get_fields(docnum))
1.256 + except StopIteration:
1.257 + self.next_value = None
1.258 +
1.259 + return term, positions
1.260
1.261 def close(self):
1.262 - self.dict_reader.close()
1.263 - self.field_dict_reader.close()
1.264 + for reader in self.readers:
1.265 + reader.close()
1.266 + self.readers = []
1.267
1.268 class Index:
1.269
1.270 "An inverted index solution encapsulating the various components."
1.271
1.272 - def __init__(self, pathname, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, field_interval=FIELD_INTERVAL,
1.273 - flush_interval=FLUSH_INTERVAL, positions_flush_interval=POSITIONS_FLUSH_INTERVAL, open_partitions=OPEN_PARTITIONS):
1.274 + def __init__(self, pathname, flush_interval=FLUSH_INTERVAL,
1.275 + open_partitions=OPEN_PARTITIONS):
1.276
1.277 self.pathname = pathname
1.278 - self.interval = interval
1.279 - self.doc_interval = doc_interval
1.280 - self.field_interval = field_interval
1.281 self.flush_interval = flush_interval
1.282 - self.positions_flush_interval = positions_flush_interval
1.283 self.open_partitions = open_partitions
1.284 self.reader = None
1.285 self.writer = None
1.286 @@ -251,132 +179,60 @@
1.287
1.288 "Return a writer."
1.289
1.290 - self._ensure_directory()
1.291 - self.writer = IndexWriter(self.pathname, self.interval, self.doc_interval,
1.292 - self.field_interval, self.flush_interval, self.positions_flush_interval)
1.293 + if self.writer is None:
1.294 + self._ensure_directory()
1.295 + self.writer = IndexWriter(self.pathname, self.flush_interval)
1.296 return self.writer
1.297
1.298 def _ensure_directory(self):
1.299 if not exists(self.pathname):
1.300 mkdir(self.pathname)
1.301
1.302 - def get_reader(self, partition=0):
1.303 -
1.304 - "Return a reader for the index."
1.305 -
1.306 - # Ensure that only one partition exists.
1.307 -
1.308 - self.merge()
1.309 - return self._get_reader(partition)
1.310 -
1.311 - def _get_reader(self, partition):
1.312 + def get_reader(self, refresh=0):
1.313
1.314 "Return a reader for the index."
1.315
1.316 - if not exists(self.pathname):
1.317 - raise OSError, "Index path %r does not exist." % self.pathname
1.318 -
1.319 - self.reader = IndexReader(self.pathname)
1.320 - return self.reader
1.321 -
1.322 - def get_term_partitions(self):
1.323 + if refresh and self.reader is not None:
1.324 + self.reader.close()
1.325 + self.reader = None
1.326
1.327 - "Return a set of term partition identifiers."
1.328 -
1.329 - return get_term_partitions(self.pathname)
1.330 -
1.331 - def get_field_partitions(self):
1.332 -
1.333 - "Return a set of field partition identifiers."
1.334 -
1.335 - return get_field_partitions(self.pathname)
1.336 + if self.reader is None:
1.337 + if not exists(self.pathname):
1.338 + raise OSError, "Index path %r does not exist." % self.pathname
1.339 + self.reader = IndexReader(self.pathname)
1.340 + return self.reader
1.341
1.342 def merge(self):
1.343
1.344 - "Merge/optimise index partitions."
1.345 -
1.346 - self._merge_terms()
1.347 - self._merge_fields()
1.348 -
1.349 - def _merge_dictionaries(self, get_partitions, rename_files, remove_files, get_reader, get_writer, get_merger, intervals):
1.350 -
1.351 - "Merge term or field dictionaries."
1.352 -
1.353 - partitions = get_partitions()
1.354 -
1.355 - # Ensure the correct labelling of a single partition.
1.356 -
1.357 - if len(partitions) == 1:
1.358 - partition = list(partitions)[0]
1.359 - if partition != "merged":
1.360 - rename_files(self.pathname, partition, "merged")
1.361 - return
1.362 + "Merge the partitions in the index."
1.363
1.364 - # Merge the partitions.
1.365 -
1.366 - old_merged_counter = 0
1.367 -
1.368 - while len(partitions) > 1:
1.369 -
1.370 - if "merged" in partitions:
1.371 - rename_files(self.pathname, "merged", "old-merged-%d" % old_merged_counter)
1.372 - partitions.remove("merged")
1.373 - partitions.add("old-merged-%d" % old_merged_counter)
1.374 - old_merged_counter += 1
1.375 -
1.376 - # Process only a certain number at once, avoiding resource limits.
1.377 -
1.378 - active_partitions = list(islice(partitions, self.open_partitions))
1.379 -
1.380 - readers = []
1.381 - for partition in active_partitions:
1.382 - readers.append(get_reader(self.pathname, partition))
1.383 -
1.384 - # Write directly to a dictionary.
1.385 + reader = IndexReader(self.pathname, get_term_data_reader, self.merge_data)
1.386 + writer = get_term_writer(self.pathname, "merged")
1.387 + try:
1.388 + writer.begin(*reader.get_sizes())
1.389 + for term, data in reader:
1.390 + writer.write_term_plus_remaining(term, data)
1.391 + writer.end_record()
1.392 + finally:
1.393 + writer.close()
1.394 + reader.close()
1.395
1.396 - writer = get_writer(self.pathname, "merged", *intervals)
1.397 - merger = get_merger(writer, readers)
1.398 - merger.merge()
1.399 - merger.close()
1.400 -
1.401 - # Remove old files.
1.402 -
1.403 - for partition in active_partitions:
1.404 - remove_files(self.pathname, partition)
1.405 + for partition in get_term_partitions(self.pathname):
1.406 + remove_term_files(self.pathname, partition)
1.407
1.408 - # Acquire the partitions to check their number again.
1.409 -
1.410 - partitions = get_partitions()
1.411 -
1.412 - def _merge_terms(self):
1.413 + rename_term_files(self.pathname, "merged", 0)
1.414
1.415 - "Merge term dictionaries."
1.416 -
1.417 - self._merge_dictionaries(self.get_term_partitions, rename_term_files,
1.418 - remove_term_files, get_term_reader, get_term_writer,
1.419 - TermDictionaryMerger, [self.interval, self.doc_interval])
1.420 + def merge_data(self, a, b):
1.421
1.422 - def _merge_fields(self):
1.423 -
1.424 - "Merge field dictionaries."
1.425 -
1.426 - self._merge_dictionaries(self.get_field_partitions, rename_field_files,
1.427 - remove_field_files, get_field_reader, get_field_writer,
1.428 - FieldDictionaryMerger, [self.field_interval])
1.429 -
1.430 - def update(self, other_indexes):
1.431 + """
1.432 + Merge 'a' and 'b', modifying the data to permit concatenation.
1.433 + """
1.434
1.435 - "Copy the content of the 'other_indexes' into this index and merge."
1.436 -
1.437 - self._ensure_directory()
1.438 + # Modify the record to indicate a continuation of the data.
1.439
1.440 - for i, index in enumerate(other_indexes):
1.441 - for partition in index.get_term_partitions():
1.442 - copy_term_files(index.pathname, partition, self.pathname, "-added-%d" % i)
1.443 - for partition in index.get_field_partitions():
1.444 - copy_field_files(index.pathname, partition, self.pathname, "-added-%d" % i)
1.445 -
1.446 - self.merge()
1.447 + c = a + b
1.448 + c[len(a) - 1] = 1
1.449 + return c
1.450
1.451 def close(self):
1.452 if self.reader is not None: