# HG changeset patch # User Paul Boddie # Date 1297640697 -3600 # Node ID cf159c14882af2b676ca9c5d6580f62940e97573 # Parent 6542c54d115b72f837b31d24a08ba25c3bdd21f2 Moved the IndexReader to the terms module, renaming it to MultipleReader. Introduced information file indexing in order to support go_to_term methods which can seek to terms or nearest following neighbours. Removed a redundant reset method invocation from the files module. Added an optimisation in itermerge where only one iterator is being managed. diff -r 6542c54d115b -r cf159c14882a iixr/files.py --- a/iixr/files.py Sun Feb 13 02:49:55 2011 +0100 +++ b/iixr/files.py Mon Feb 14 00:44:57 2011 +0100 @@ -49,7 +49,6 @@ def seek(self, offset): self.f.seek(offset) - self.reset() def rewind(self): self.seek(self.data_start) diff -r 6542c54d115b -r cf159c14882a iixr/filesystem.py --- a/iixr/filesystem.py Sun Feb 13 02:49:55 2011 +0100 +++ b/iixr/filesystem.py Mon Feb 14 00:44:57 2011 +0100 @@ -30,7 +30,7 @@ # Constants. -TERM_FILENAMES = "terms", +TERM_FILENAMES = "terms", "term-index" # Utility functions. @@ -63,6 +63,10 @@ def get_next_partition(partitions): return max(partitions or [-1]) + 1 +def get_writer(pathname, name, partition, cls): + f = open(join(pathname, "%s-%s" % (name, partition)), "wb") + return cls(f) + def get_term_writer(pathname, partition): """ @@ -70,8 +74,16 @@ according to the given 'partition'. """ - f = open(join(pathname, "terms-%s" % partition), "wb") - return TermWriter(f) + return get_writer(pathname, "terms", partition, TermWriter) + +def get_term_index_writer(pathname, partition): + + """ + Return a term index writer using files under the given 'pathname' labelled + according to the given 'partition'. + """ + + return get_writer(pathname, "term-index", partition, TermIndexWriter) def get_reader(pathname, name, partition, cls): f = open(join(pathname, "%s-%s" % (name, partition)), "rb") @@ -95,11 +107,32 @@ return get_reader(pathname, "terms", partition, TermDataIterator) +def get_term_index_reader(pathname, partition): + + """ + Return a term index reader using files under the given 'pathname' labelled + according to the given 'partition'. + """ + + return get_reader(pathname, "term-index", partition, TermIndexIterator) + +def get_combined_term_reader(pathname, partition): + + """ + Return a combined term reader using files under the given 'pathname' + labelled according to the given 'partition'. + """ + + return CombinedIterator(get_term_reader(pathname, partition), get_term_index_reader(pathname, partition)) + # Renaming. def rename_files(pathname, names, from_partition, to_partition): for name in names: - rename(join(pathname, "%s-%s" % (name, from_partition)), join(pathname, "%s-%s" % (name, to_partition))) + try: + rename(join(pathname, "%s-%s" % (name, from_partition)), join(pathname, "%s-%s" % (name, to_partition))) + except OSError: + pass def rename_term_files(pathname, from_partition, to_partition): rename_files(pathname, TERM_FILENAMES, from_partition, to_partition) @@ -108,7 +141,10 @@ def remove_files(pathname, names, partition): for name in names: - remove(join(pathname, "%s-%s" % (name, partition))) + try: + remove(join(pathname, "%s-%s" % (name, partition))) + except OSError: + pass def remove_term_files(pathname, partition): remove_files(pathname, TERM_FILENAMES, partition) @@ -118,7 +154,10 @@ def copy_files(source, names, partition, destination, suffix): for name in names: filename = "%s-%s" % (name, partition) - copy(join(source, filename), join(destination, filename + suffix)) + try: + copy(join(source, filename), join(destination, filename + suffix)) + except OSError: + pass def copy_term_files(source, partition, destination, suffix): copy_files(source, TERM_FILENAMES, partition, destination, suffix) diff -r 6542c54d115b -r cf159c14882a iixr/index.py --- a/iixr/index.py Sun Feb 13 02:49:55 2011 +0100 +++ b/iixr/index.py Mon Feb 14 00:44:57 2011 +0100 @@ -19,14 +19,15 @@ """ from iixr.filesystem import * +from iixr.terms import MultipleReader from itermerge import itermerge from os import mkdir # index discovery from os.path import exists -import operator # Constants. FLUSH_INTERVAL = 10000 +INDEX_INTERVAL = 1000 OPEN_PARTITIONS = 20 # High-level classes. @@ -104,64 +105,6 @@ if self.terms or not get_term_partitions(self.pathname): self.flush_terms() -class IndexReader(itermerge): - - "Accessing the term dictionaries." - - def __init__(self, pathname, get_reader=None, combine=None): - - # Get the partitions in order. - - partitions = list(get_term_partitions(pathname)) - partitions.sort() - - # Initialise the underlying term partition readers. - - self.readers = [(get_reader or get_term_reader)(pathname, partition) for partition in partitions] - self.combine = combine or operator.add - - # Initialise this object as an iterator over the readers. - - itermerge.__init__(self, self.readers) - self.next_value = None - - def get_sizes(self): - - # Readers must have compatible sizes. - - if self.readers: - return self.readers[0].get_sizes() - else: - return 0, 0 - - def next(self): - if self.next_value is not None: - term, positions = self.next_value - else: - term, positions = itermerge.next(self) - - # Look at the next item to see if it is has positions for the current - # term. - - try: - t, p = itermerge.next(self) - while t == term: - positions = self.combine(positions, p) - t, p = itermerge.next(self) - self.next_value = t, p - - # Where an item could not be fetched, cause future requests to fail. - - except StopIteration: - self.next_value = None - - return term, positions - - def close(self): - for reader in self.readers: - reader.close() - self.readers = [] - class Index: "An inverted index solution encapsulating the various components." @@ -188,6 +131,19 @@ if not exists(self.pathname): mkdir(self.pathname) + def _get_readers(self, get_reader): + + "Return a list of underlying readers given the 'get_reader' function." + + # Get the partitions in order. + + partitions = list(get_term_partitions(self.pathname)) + partitions.sort() + + # Return the readers. + + return [get_reader(self.pathname, partition) for partition in partitions] + def get_reader(self, refresh=0): "Return a reader for the index." @@ -199,21 +155,45 @@ if self.reader is None: if not exists(self.pathname): raise OSError, "Index path %r does not exist." % self.pathname - self.reader = IndexReader(self.pathname) + + # Try and get combined readers. + + try: + readers = self._get_readers(get_combined_term_reader) + except IOError: + readers = self._get_readers(get_term_reader) + + self.reader = MultipleReader(readers) + return self.reader - def merge(self): + def merge(self, interval=INDEX_INTERVAL): "Merge the partitions in the index." - reader = IndexReader(self.pathname, get_term_data_reader, self.merge_data) + # Get data readers. + + readers = self._get_readers(get_term_data_reader) + reader = MultipleReader(readers, self.merge_data) writer = get_term_writer(self.pathname, "merged") + index_writer = get_term_index_writer(self.pathname, "merged") + try: writer.begin(*reader.get_sizes()) + index_writer.begin() + + i = 0 for term, data in reader: + if i % interval == 0: + index_writer.write_term(term, writer.tell()) + index_writer.end_record() + writer.write_term_plus_remaining(term, data) writer.end_record() + i += 1 + finally: + index_writer.close() writer.close() reader.close() @@ -221,6 +201,8 @@ remove_term_files(self.pathname, partition) rename_term_files(self.pathname, "merged", 0) + self.reader = None + self.writer = None def merge_data(self, a, b): diff -r 6542c54d115b -r cf159c14882a iixr/terms.py --- a/iixr/terms.py Sun Feb 13 02:49:55 2011 +0100 +++ b/iixr/terms.py Mon Feb 14 00:44:57 2011 +0100 @@ -20,8 +20,10 @@ from iixr.data import * from iixr.files import * -from iixr.phrases import PhraseIterator +from itermerge import itermerge from os.path import commonprefix # to find common string prefixes +from bisect import bisect_right, insort_right +import operator class TermWriter(FileWriter): @@ -251,13 +253,91 @@ return doc_positions -class TermIterator(TermReader): +# Indexes covering the information files. + +class TermIndexWriter(FileWriter): + + "Writing term index information to files." + + def begin(self): + + "Begin writing to the file." + + self.data_start = self.tell() + self.last_term = "" + self.last_offset = 0 + + def write_term(self, term, offset): + + "Write the given 'term' and 'offset'." + + if term <= self.last_term: + raise ValueError, "Term %r precedes the previous term %r." % (term, self.last_term) + + # Write the prefix length and term suffix. + + common = len(commonprefix([self.last_term, term])) + suffix = term[common:] + + self.write_number(common) + self.write_string(suffix) + + # Write the offset delta. + + self.write_number(offset - self.last_offset) + + self.last_term = term + self.last_offset = offset + +class TermIndexReader(FileReader): - "An iterator over terms and positions read from a file." + "Reading term index information to files." + + def begin(self): + + "Begin reading from the file." + + self.data_start = self.tell() + self.last_term = "" + self.last_offset = 0 + + def read_term(self): + + "Read a term and an offset from the file." + + # Read the prefix length and term suffix. + + common = self.read_number() + suffix = self.read_string() + + self.last_term = self.last_term[:common] + suffix + + # Read the offset delta. + + self.last_offset += self.read_number() + return self.last_term, self.last_offset + +# Iterator support classes. + +class Iterator: + + "Common iterator support." + + def go_to_term(self, term): + t, dp = self.next() + while t < term: + t, dp = self.next() + return t, dp def __iter__(self): return self +# External reading classes. + +class TermIterator(TermReader, Iterator): + + "An iterator over terms and positions read from a file." + def next(self): try: self.begin_record() @@ -265,7 +345,7 @@ except EOFError: raise StopIteration -class TermDataIterator(TermReader): +class TermDataIterator(TermReader, Iterator): "An iterator over terms and unprocessed document positions data." @@ -279,4 +359,148 @@ except EOFError: raise StopIteration +class TermIndexIterator(TermIndexReader): + + "An iterator over terms and offsets read from a file." + + def __iter__(self): + return self + + def next(self): + try: + self.begin_record() + return self.read_term() + except EOFError: + raise StopIteration + +class CombinedIterator: + + "An iterator providing index and information file access." + + def __init__(self, reader, index_reader): + self.reader = reader + self.index_reader = index_reader + self.records = list(index_reader) + + def go_to_term(self, term): + + # Get the record providing a term less than or equal to the requested + # term, getting the first entry if no such records exist. + + i = max(0, bisect_right(self.records, (term, None)) - 1) + t, offset = self.records[i] + + # Seek to the corresponding record in the information file. + + self.reader.seek(offset) + + # Where the found term is equal or greater, just read the positions for + # the index entry. + + if t >= term: + + # Skip the term information, overwrite the reader's state, and get + # the positions. + + self.reader.begin_record() + self.reader.read_term_only() + self.reader.last_term = t + + return t, self.reader.read_positions() + + # Where the found term is less, use the information file to find the + # term or the one after. + + else: + + # Overwrite the reader's state, then scan for the term. + + self.reader.last_term = t + t, dp = self.reader.next() + while t < term: + t, dp = self.reader.next() + + return t, dp + + def __iter__(self): + return self + + def next(self): + return self.reader.next() + + def close(self): + if self.reader is not None: + self.reader.close() + self.reader = None + if self.index_reader is not None: + self.index_reader.close() + self.index_reader = None + +class MultipleReader(itermerge): + + "Accessing many term readers at once." + + def __init__(self, readers, combine=None): + + """ + Initialise a master index reader using underlying 'readers' and a + 'combine' function which knows how to combine position information from + different sources. + """ + + self.readers = readers + self.combine = combine or operator.add + + # Initialise this object as an iterator over the readers. + + itermerge.__init__(self, self.readers) + self.next_value = None + + def get_sizes(self): + + # Readers must have compatible sizes. + + if self.readers: + return self.readers[0].get_sizes() + else: + return 0, 0 + + def go_to_term(self, term): + self.iters = [] + for reader in self.readers: + try: + insort_right(self.iters, (reader.go_to_term(term), reader.next)) + except StopIteration: + pass + self.next_value = None + return self.next() + + def next(self): + if self.next_value is not None: + term, positions = self.next_value + else: + term, positions = itermerge.next(self) + + # Look at the next item to see if it is has positions for the current + # term. + + try: + t, p = itermerge.next(self) + while t == term: + positions = self.combine(positions, p) + t, p = itermerge.next(self) + self.next_value = t, p + + # Where an item could not be fetched, cause future requests to fail. + + except StopIteration: + self.next_value = None + + return term, positions + + def close(self): + for reader in self.readers: + reader.close() + self.readers = [] + # vim: tabstop=4 expandtab shiftwidth=4 diff -r 6542c54d115b -r cf159c14882a itermerge.py --- a/itermerge.py Sun Feb 13 02:49:55 2011 +0100 +++ b/itermerge.py Mon Feb 14 00:44:57 2011 +0100 @@ -74,8 +74,11 @@ def next(self): if self.iters: value, next = self.iters[0] - del self.iters[0] - self._add_next(next) + if len(self.iters) > 1: + del self.iters[0] + self._add_next(next) + else: + self.iters[0] = next(), next return value else: raise StopIteration diff -r 6542c54d115b -r cf159c14882a test.py --- a/test.py Sun Feb 13 02:49:55 2011 +0100 +++ b/test.py Mon Feb 14 00:44:57 2011 +0100 @@ -200,7 +200,7 @@ print "- Test merge." l1 = list(index.get_reader()) -index.merge() +index.merge(3) l2 = list(index.get_reader(1)) for (t1, dp1), (t2, dp2) in zip(l1, l2):