iixr

Changeset

97:cf159c14882a
2011-02-14 Paul Boddie raw files shortlog changelog graph Moved the IndexReader to the terms module, renaming it to MultipleReader. Introduced information file indexing in order to support go_to_term methods which can seek to terms or nearest following neighbours. Removed a redundant reset method invocation from the files module. Added an optimisation in itermerge where only one iterator is being managed.
iixr/files.py (file) iixr/filesystem.py (file) iixr/index.py (file) iixr/terms.py (file) itermerge.py (file) test.py (file)
     1.1 --- a/iixr/files.py	Sun Feb 13 02:49:55 2011 +0100
     1.2 +++ b/iixr/files.py	Mon Feb 14 00:44:57 2011 +0100
     1.3 @@ -49,7 +49,6 @@
     1.4  
     1.5      def seek(self, offset):
     1.6          self.f.seek(offset)
     1.7 -        self.reset()
     1.8  
     1.9      def rewind(self):
    1.10          self.seek(self.data_start)
     2.1 --- a/iixr/filesystem.py	Sun Feb 13 02:49:55 2011 +0100
     2.2 +++ b/iixr/filesystem.py	Mon Feb 14 00:44:57 2011 +0100
     2.3 @@ -30,7 +30,7 @@
     2.4  
     2.5  # Constants.
     2.6  
     2.7 -TERM_FILENAMES    = "terms",
     2.8 +TERM_FILENAMES    = "terms", "term-index"
     2.9  
    2.10  # Utility functions.
    2.11  
    2.12 @@ -63,6 +63,10 @@
    2.13  def get_next_partition(partitions):
    2.14      return max(partitions or [-1]) + 1
    2.15  
    2.16 +def get_writer(pathname, name, partition, cls):
    2.17 +    f = open(join(pathname, "%s-%s" % (name, partition)), "wb")
    2.18 +    return cls(f)
    2.19 +
    2.20  def get_term_writer(pathname, partition):
    2.21  
    2.22      """
    2.23 @@ -70,8 +74,16 @@
    2.24      according to the given 'partition'.
    2.25      """
    2.26  
    2.27 -    f = open(join(pathname, "terms-%s" % partition), "wb")
    2.28 -    return TermWriter(f)
    2.29 +    return get_writer(pathname, "terms", partition, TermWriter)
    2.30 +
    2.31 +def get_term_index_writer(pathname, partition):
    2.32 +
    2.33 +    """
    2.34 +    Return a term index writer using files under the given 'pathname' labelled
    2.35 +    according to the given 'partition'.
    2.36 +    """
    2.37 +
    2.38 +    return get_writer(pathname, "term-index", partition, TermIndexWriter)
    2.39  
    2.40  def get_reader(pathname, name, partition, cls):
    2.41      f = open(join(pathname, "%s-%s" % (name, partition)), "rb")
    2.42 @@ -95,11 +107,32 @@
    2.43  
    2.44      return get_reader(pathname, "terms", partition, TermDataIterator)
    2.45  
    2.46 +def get_term_index_reader(pathname, partition):
    2.47 +
    2.48 +    """
    2.49 +    Return a term index reader using files under the given 'pathname' labelled
    2.50 +    according to the given 'partition'.
    2.51 +    """
    2.52 +
    2.53 +    return get_reader(pathname, "term-index", partition, TermIndexIterator)
    2.54 +
    2.55 +def get_combined_term_reader(pathname, partition):
    2.56 +
    2.57 +    """
    2.58 +    Return a combined term reader using files under the given 'pathname'
    2.59 +    labelled according to the given 'partition'.
    2.60 +    """
    2.61 +
    2.62 +    return CombinedIterator(get_term_reader(pathname, partition), get_term_index_reader(pathname, partition))
    2.63 +
    2.64  # Renaming.
    2.65  
    2.66  def rename_files(pathname, names, from_partition, to_partition):
    2.67      for name in names:
    2.68 -        rename(join(pathname, "%s-%s" % (name, from_partition)), join(pathname, "%s-%s" % (name, to_partition)))
    2.69 +        try:
    2.70 +            rename(join(pathname, "%s-%s" % (name, from_partition)), join(pathname, "%s-%s" % (name, to_partition)))
    2.71 +        except OSError:
    2.72 +            pass
    2.73  
    2.74  def rename_term_files(pathname, from_partition, to_partition):
    2.75      rename_files(pathname, TERM_FILENAMES, from_partition, to_partition)
    2.76 @@ -108,7 +141,10 @@
    2.77  
    2.78  def remove_files(pathname, names, partition):
    2.79      for name in names:
    2.80 -        remove(join(pathname, "%s-%s" % (name, partition)))
    2.81 +        try:
    2.82 +            remove(join(pathname, "%s-%s" % (name, partition)))
    2.83 +        except OSError:
    2.84 +            pass
    2.85  
    2.86  def remove_term_files(pathname, partition):
    2.87      remove_files(pathname, TERM_FILENAMES, partition)
    2.88 @@ -118,7 +154,10 @@
    2.89  def copy_files(source, names, partition, destination, suffix):
    2.90      for name in names:
    2.91          filename = "%s-%s" % (name, partition)
    2.92 -        copy(join(source, filename), join(destination, filename + suffix))
    2.93 +        try:
    2.94 +            copy(join(source, filename), join(destination, filename + suffix))
    2.95 +        except OSError:
    2.96 +            pass
    2.97  
    2.98  def copy_term_files(source, partition, destination, suffix):
    2.99      copy_files(source, TERM_FILENAMES, partition, destination, suffix)
     3.1 --- a/iixr/index.py	Sun Feb 13 02:49:55 2011 +0100
     3.2 +++ b/iixr/index.py	Mon Feb 14 00:44:57 2011 +0100
     3.3 @@ -19,14 +19,15 @@
     3.4  """
     3.5  
     3.6  from iixr.filesystem import *
     3.7 +from iixr.terms import MultipleReader
     3.8  from itermerge import itermerge
     3.9  from os import mkdir    # index discovery
    3.10  from os.path import exists
    3.11 -import operator
    3.12  
    3.13  # Constants.
    3.14  
    3.15  FLUSH_INTERVAL    = 10000
    3.16 +INDEX_INTERVAL    = 1000
    3.17  OPEN_PARTITIONS   = 20
    3.18  
    3.19  # High-level classes.
    3.20 @@ -104,64 +105,6 @@
    3.21          if self.terms or not get_term_partitions(self.pathname):
    3.22              self.flush_terms()
    3.23  
    3.24 -class IndexReader(itermerge):
    3.25 -
    3.26 -    "Accessing the term dictionaries."
    3.27 -
    3.28 -    def __init__(self, pathname, get_reader=None, combine=None):
    3.29 -
    3.30 -        # Get the partitions in order.
    3.31 -
    3.32 -        partitions = list(get_term_partitions(pathname))
    3.33 -        partitions.sort()
    3.34 -
    3.35 -        # Initialise the underlying term partition readers.
    3.36 -
    3.37 -        self.readers = [(get_reader or get_term_reader)(pathname, partition) for partition in partitions]
    3.38 -        self.combine = combine or operator.add
    3.39 -
    3.40 -        # Initialise this object as an iterator over the readers.
    3.41 -
    3.42 -        itermerge.__init__(self, self.readers)
    3.43 -        self.next_value = None
    3.44 -
    3.45 -    def get_sizes(self):
    3.46 -
    3.47 -        # Readers must have compatible sizes.
    3.48 -
    3.49 -        if self.readers:
    3.50 -            return self.readers[0].get_sizes()
    3.51 -        else:
    3.52 -            return 0, 0
    3.53 -
    3.54 -    def next(self):
    3.55 -        if self.next_value is not None:
    3.56 -            term, positions = self.next_value
    3.57 -        else:
    3.58 -            term, positions = itermerge.next(self)
    3.59 -
    3.60 -        # Look at the next item to see if it is has positions for the current
    3.61 -        # term.
    3.62 -
    3.63 -        try:
    3.64 -            t, p = itermerge.next(self)
    3.65 -            while t == term:
    3.66 -                positions = self.combine(positions, p)
    3.67 -                t, p = itermerge.next(self)
    3.68 -            self.next_value = t, p
    3.69 -
    3.70 -        # Where an item could not be fetched, cause future requests to fail.
    3.71 -
    3.72 -        except StopIteration:
    3.73 -            self.next_value = None
    3.74 -
    3.75 -        return term, positions
    3.76 -
    3.77 -    def close(self):
    3.78 -        for reader in self.readers:
    3.79 -            reader.close()
    3.80 -        self.readers = []
    3.81 -
    3.82  class Index:
    3.83  
    3.84      "An inverted index solution encapsulating the various components."
    3.85 @@ -188,6 +131,19 @@
    3.86          if not exists(self.pathname):
    3.87              mkdir(self.pathname)
    3.88  
    3.89 +    def _get_readers(self, get_reader):
    3.90 +
    3.91 +        "Return a list of underlying readers given the 'get_reader' function."
    3.92 +
    3.93 +        # Get the partitions in order.
    3.94 +
    3.95 +        partitions = list(get_term_partitions(self.pathname))
    3.96 +        partitions.sort()
    3.97 +
    3.98 +        # Return the readers.
    3.99 +
   3.100 +        return [get_reader(self.pathname, partition) for partition in partitions]
   3.101 +
   3.102      def get_reader(self, refresh=0):
   3.103  
   3.104          "Return a reader for the index."
   3.105 @@ -199,21 +155,45 @@
   3.106          if self.reader is None:
   3.107              if not exists(self.pathname):
   3.108                  raise OSError, "Index path %r does not exist." % self.pathname
   3.109 -            self.reader = IndexReader(self.pathname)
   3.110 +
   3.111 +            # Try and get combined readers.
   3.112 +
   3.113 +            try:
   3.114 +                readers = self._get_readers(get_combined_term_reader)
   3.115 +            except IOError:
   3.116 +                readers = self._get_readers(get_term_reader)
   3.117 +
   3.118 +            self.reader = MultipleReader(readers)
   3.119 +
   3.120          return self.reader
   3.121  
   3.122 -    def merge(self):
   3.123 +    def merge(self, interval=INDEX_INTERVAL):
   3.124  
   3.125          "Merge the partitions in the index."
   3.126  
   3.127 -        reader = IndexReader(self.pathname, get_term_data_reader, self.merge_data)
   3.128 +        # Get data readers.
   3.129 +
   3.130 +        readers = self._get_readers(get_term_data_reader)
   3.131 +        reader = MultipleReader(readers, self.merge_data)
   3.132          writer = get_term_writer(self.pathname, "merged")
   3.133 +        index_writer = get_term_index_writer(self.pathname, "merged")
   3.134 +
   3.135          try:
   3.136              writer.begin(*reader.get_sizes())
   3.137 +            index_writer.begin()
   3.138 +
   3.139 +            i = 0
   3.140              for term, data in reader:
   3.141 +                if i % interval == 0:
   3.142 +                    index_writer.write_term(term, writer.tell())
   3.143 +                    index_writer.end_record()
   3.144 +
   3.145                  writer.write_term_plus_remaining(term, data)
   3.146                  writer.end_record()
   3.147 +                i += 1
   3.148 +
   3.149          finally:
   3.150 +            index_writer.close()
   3.151              writer.close()
   3.152              reader.close()
   3.153  
   3.154 @@ -221,6 +201,8 @@
   3.155              remove_term_files(self.pathname, partition)
   3.156  
   3.157          rename_term_files(self.pathname, "merged", 0)
   3.158 +        self.reader = None
   3.159 +        self.writer = None
   3.160  
   3.161      def merge_data(self, a, b):
   3.162  
     4.1 --- a/iixr/terms.py	Sun Feb 13 02:49:55 2011 +0100
     4.2 +++ b/iixr/terms.py	Mon Feb 14 00:44:57 2011 +0100
     4.3 @@ -20,8 +20,10 @@
     4.4  
     4.5  from iixr.data import *
     4.6  from iixr.files import *
     4.7 -from iixr.phrases import PhraseIterator
     4.8 +from itermerge import itermerge
     4.9  from os.path import commonprefix # to find common string prefixes
    4.10 +from bisect import bisect_right, insort_right
    4.11 +import operator
    4.12  
    4.13  class TermWriter(FileWriter):
    4.14  
    4.15 @@ -251,13 +253,91 @@
    4.16  
    4.17          return doc_positions
    4.18  
    4.19 -class TermIterator(TermReader):
    4.20 +# Indexes covering the information files.
    4.21 +
    4.22 +class TermIndexWriter(FileWriter):
    4.23 +
    4.24 +    "Writing term index information to files."
    4.25 +
    4.26 +    def begin(self):
    4.27 +
    4.28 +        "Begin writing to the file."
    4.29 +
    4.30 +        self.data_start = self.tell()
    4.31 +        self.last_term = ""
    4.32 +        self.last_offset = 0
    4.33 +
    4.34 +    def write_term(self, term, offset):
    4.35 +
    4.36 +        "Write the given 'term' and 'offset'."
    4.37 +
    4.38 +        if term <= self.last_term:
    4.39 +            raise ValueError, "Term %r precedes the previous term %r." % (term, self.last_term)
    4.40 +
    4.41 +        # Write the prefix length and term suffix.
    4.42 +
    4.43 +        common = len(commonprefix([self.last_term, term]))
    4.44 +        suffix = term[common:]
    4.45 +
    4.46 +        self.write_number(common)
    4.47 +        self.write_string(suffix)
    4.48 +
    4.49 +        # Write the offset delta.
    4.50 +
    4.51 +        self.write_number(offset - self.last_offset)
    4.52 +
    4.53 +        self.last_term = term
    4.54 +        self.last_offset = offset
    4.55 +
    4.56 +class TermIndexReader(FileReader):
    4.57  
    4.58 -    "An iterator over terms and positions read from a file."
    4.59 +    "Reading term index information to files."
    4.60 +
    4.61 +    def begin(self):
    4.62 +
    4.63 +        "Begin reading from the file."
    4.64 +
    4.65 +        self.data_start = self.tell()
    4.66 +        self.last_term = ""
    4.67 +        self.last_offset = 0
    4.68 +
    4.69 +    def read_term(self):
    4.70 +
    4.71 +        "Read a term and an offset from the file."
    4.72 +
    4.73 +        # Read the prefix length and term suffix.
    4.74 +
    4.75 +        common = self.read_number()
    4.76 +        suffix = self.read_string()
    4.77 +
    4.78 +        self.last_term = self.last_term[:common] + suffix
    4.79 +
    4.80 +        # Read the offset delta.
    4.81 +
    4.82 +        self.last_offset += self.read_number()
    4.83 +        return self.last_term, self.last_offset
    4.84 +
    4.85 +# Iterator support classes.
    4.86 +
    4.87 +class Iterator:
    4.88 +
    4.89 +    "Common iterator support."
    4.90 +
    4.91 +    def go_to_term(self, term):
    4.92 +        t, dp = self.next()
    4.93 +        while t < term:
    4.94 +            t, dp = self.next()
    4.95 +        return t, dp
    4.96  
    4.97      def __iter__(self):
    4.98          return self
    4.99  
   4.100 +# External reading classes.
   4.101 +
   4.102 +class TermIterator(TermReader, Iterator):
   4.103 +
   4.104 +    "An iterator over terms and positions read from a file."
   4.105 +
   4.106      def next(self):
   4.107          try:
   4.108              self.begin_record()
   4.109 @@ -265,7 +345,7 @@
   4.110          except EOFError:
   4.111              raise StopIteration
   4.112  
   4.113 -class TermDataIterator(TermReader):
   4.114 +class TermDataIterator(TermReader, Iterator):
   4.115  
   4.116      "An iterator over terms and unprocessed document positions data."
   4.117  
   4.118 @@ -279,4 +359,148 @@
   4.119          except EOFError:
   4.120              raise StopIteration
   4.121  
   4.122 +class TermIndexIterator(TermIndexReader):
   4.123 +
   4.124 +    "An iterator over terms and offsets read from a file."
   4.125 +
   4.126 +    def __iter__(self):
   4.127 +        return self
   4.128 +
   4.129 +    def next(self):
   4.130 +        try:
   4.131 +            self.begin_record()
   4.132 +            return self.read_term()
   4.133 +        except EOFError:
   4.134 +            raise StopIteration
   4.135 +
   4.136 +class CombinedIterator:
   4.137 +
   4.138 +    "An iterator providing index and information file access."
   4.139 +
   4.140 +    def __init__(self, reader, index_reader):
   4.141 +        self.reader = reader
   4.142 +        self.index_reader = index_reader
   4.143 +        self.records = list(index_reader)
   4.144 +
   4.145 +    def go_to_term(self, term):
   4.146 +
   4.147 +        # Get the record providing a term less than or equal to the requested
   4.148 +        # term, getting the first entry if no such records exist.
   4.149 +
   4.150 +        i = max(0, bisect_right(self.records, (term, None)) - 1)
   4.151 +        t, offset = self.records[i]
   4.152 +
   4.153 +        # Seek to the corresponding record in the information file.
   4.154 +
   4.155 +        self.reader.seek(offset)
   4.156 +
   4.157 +        # Where the found term is equal or greater, just read the positions for
   4.158 +        # the index entry.
   4.159 +
   4.160 +        if t >= term:
   4.161 +
   4.162 +            # Skip the term information, overwrite the reader's state, and get
   4.163 +            # the positions.
   4.164 +
   4.165 +            self.reader.begin_record()
   4.166 +            self.reader.read_term_only()
   4.167 +            self.reader.last_term = t
   4.168 +
   4.169 +            return t, self.reader.read_positions()
   4.170 +
   4.171 +        # Where the found term is less, use the information file to find the
   4.172 +        # term or the one after.
   4.173 +
   4.174 +        else:
   4.175 +
   4.176 +            # Overwrite the reader's state, then scan for the term.
   4.177 +
   4.178 +            self.reader.last_term = t
   4.179 +            t, dp = self.reader.next()
   4.180 +            while t < term:
   4.181 +                t, dp = self.reader.next()
   4.182 +
   4.183 +            return t, dp
   4.184 +
   4.185 +    def __iter__(self):
   4.186 +        return self
   4.187 +
   4.188 +    def next(self):
   4.189 +        return self.reader.next()
   4.190 +
   4.191 +    def close(self):
   4.192 +        if self.reader is not None:
   4.193 +            self.reader.close()
   4.194 +            self.reader = None
   4.195 +        if self.index_reader is not None:
   4.196 +            self.index_reader.close()
   4.197 +            self.index_reader = None
   4.198 +
   4.199 +class MultipleReader(itermerge):
   4.200 +
   4.201 +    "Accessing many term readers at once."
   4.202 +
   4.203 +    def __init__(self, readers, combine=None):
   4.204 +
   4.205 +        """
   4.206 +        Initialise a master index reader using underlying 'readers' and a
   4.207 +        'combine' function which knows how to combine position information from
   4.208 +        different sources.
   4.209 +        """
   4.210 +
   4.211 +        self.readers = readers
   4.212 +        self.combine = combine or operator.add
   4.213 +
   4.214 +        # Initialise this object as an iterator over the readers.
   4.215 +
   4.216 +        itermerge.__init__(self, self.readers)
   4.217 +        self.next_value = None
   4.218 +
   4.219 +    def get_sizes(self):
   4.220 +
   4.221 +        # Readers must have compatible sizes.
   4.222 +
   4.223 +        if self.readers:
   4.224 +            return self.readers[0].get_sizes()
   4.225 +        else:
   4.226 +            return 0, 0
   4.227 +
   4.228 +    def go_to_term(self, term):
   4.229 +        self.iters = []
   4.230 +        for reader in self.readers:
   4.231 +            try:
   4.232 +                insort_right(self.iters, (reader.go_to_term(term), reader.next))
   4.233 +            except StopIteration:
   4.234 +                pass
   4.235 +        self.next_value = None
   4.236 +        return self.next()
   4.237 +
   4.238 +    def next(self):
   4.239 +        if self.next_value is not None:
   4.240 +            term, positions = self.next_value
   4.241 +        else:
   4.242 +            term, positions = itermerge.next(self)
   4.243 +
   4.244 +        # Look at the next item to see if it is has positions for the current
   4.245 +        # term.
   4.246 +
   4.247 +        try:
   4.248 +            t, p = itermerge.next(self)
   4.249 +            while t == term:
   4.250 +                positions = self.combine(positions, p)
   4.251 +                t, p = itermerge.next(self)
   4.252 +            self.next_value = t, p
   4.253 +
   4.254 +        # Where an item could not be fetched, cause future requests to fail.
   4.255 +
   4.256 +        except StopIteration:
   4.257 +            self.next_value = None
   4.258 +
   4.259 +        return term, positions
   4.260 +
   4.261 +    def close(self):
   4.262 +        for reader in self.readers:
   4.263 +            reader.close()
   4.264 +        self.readers = []
   4.265 +
   4.266  # vim: tabstop=4 expandtab shiftwidth=4
     5.1 --- a/itermerge.py	Sun Feb 13 02:49:55 2011 +0100
     5.2 +++ b/itermerge.py	Mon Feb 14 00:44:57 2011 +0100
     5.3 @@ -74,8 +74,11 @@
     5.4      def next(self):
     5.5          if self.iters:
     5.6              value, next = self.iters[0]
     5.7 -            del self.iters[0]
     5.8 -            self._add_next(next)
     5.9 +            if len(self.iters) > 1:
    5.10 +                del self.iters[0]
    5.11 +                self._add_next(next)
    5.12 +            else:
    5.13 +                self.iters[0] = next(), next
    5.14              return value
    5.15          else:
    5.16              raise StopIteration
     6.1 --- a/test.py	Sun Feb 13 02:49:55 2011 +0100
     6.2 +++ b/test.py	Mon Feb 14 00:44:57 2011 +0100
     6.3 @@ -200,7 +200,7 @@
     6.4  print "- Test merge."
     6.5  
     6.6  l1 = list(index.get_reader())
     6.7 -index.merge()
     6.8 +index.merge(3)
     6.9  l2 = list(index.get_reader(1))
    6.10  
    6.11  for (t1, dp1), (t2, dp2) in zip(l1, l2):