1.1 --- a/iixr/files.py Sun Feb 13 02:49:55 2011 +0100
1.2 +++ b/iixr/files.py Mon Feb 14 00:44:57 2011 +0100
1.3 @@ -49,7 +49,6 @@
1.4
1.5 def seek(self, offset):
1.6 self.f.seek(offset)
1.7 - self.reset()
1.8
1.9 def rewind(self):
1.10 self.seek(self.data_start)
2.1 --- a/iixr/filesystem.py Sun Feb 13 02:49:55 2011 +0100
2.2 +++ b/iixr/filesystem.py Mon Feb 14 00:44:57 2011 +0100
2.3 @@ -30,7 +30,7 @@
2.4
2.5 # Constants.
2.6
2.7 -TERM_FILENAMES = "terms",
2.8 +TERM_FILENAMES = "terms", "term-index"
2.9
2.10 # Utility functions.
2.11
2.12 @@ -63,6 +63,10 @@
2.13 def get_next_partition(partitions):
2.14 return max(partitions or [-1]) + 1
2.15
2.16 +def get_writer(pathname, name, partition, cls):
2.17 + f = open(join(pathname, "%s-%s" % (name, partition)), "wb")
2.18 + return cls(f)
2.19 +
2.20 def get_term_writer(pathname, partition):
2.21
2.22 """
2.23 @@ -70,8 +74,16 @@
2.24 according to the given 'partition'.
2.25 """
2.26
2.27 - f = open(join(pathname, "terms-%s" % partition), "wb")
2.28 - return TermWriter(f)
2.29 + return get_writer(pathname, "terms", partition, TermWriter)
2.30 +
2.31 +def get_term_index_writer(pathname, partition):
2.32 +
2.33 + """
2.34 + Return a term index writer using files under the given 'pathname' labelled
2.35 + according to the given 'partition'.
2.36 + """
2.37 +
2.38 + return get_writer(pathname, "term-index", partition, TermIndexWriter)
2.39
2.40 def get_reader(pathname, name, partition, cls):
2.41 f = open(join(pathname, "%s-%s" % (name, partition)), "rb")
2.42 @@ -95,11 +107,32 @@
2.43
2.44 return get_reader(pathname, "terms", partition, TermDataIterator)
2.45
2.46 +def get_term_index_reader(pathname, partition):
2.47 +
2.48 + """
2.49 + Return a term index reader using files under the given 'pathname' labelled
2.50 + according to the given 'partition'.
2.51 + """
2.52 +
2.53 + return get_reader(pathname, "term-index", partition, TermIndexIterator)
2.54 +
2.55 +def get_combined_term_reader(pathname, partition):
2.56 +
2.57 + """
2.58 + Return a combined term reader using files under the given 'pathname'
2.59 + labelled according to the given 'partition'.
2.60 + """
2.61 +
2.62 + return CombinedIterator(get_term_reader(pathname, partition), get_term_index_reader(pathname, partition))
2.63 +
2.64 # Renaming.
2.65
2.66 def rename_files(pathname, names, from_partition, to_partition):
2.67 for name in names:
2.68 - rename(join(pathname, "%s-%s" % (name, from_partition)), join(pathname, "%s-%s" % (name, to_partition)))
2.69 + try:
2.70 + rename(join(pathname, "%s-%s" % (name, from_partition)), join(pathname, "%s-%s" % (name, to_partition)))
2.71 + except OSError:
2.72 + pass
2.73
2.74 def rename_term_files(pathname, from_partition, to_partition):
2.75 rename_files(pathname, TERM_FILENAMES, from_partition, to_partition)
2.76 @@ -108,7 +141,10 @@
2.77
2.78 def remove_files(pathname, names, partition):
2.79 for name in names:
2.80 - remove(join(pathname, "%s-%s" % (name, partition)))
2.81 + try:
2.82 + remove(join(pathname, "%s-%s" % (name, partition)))
2.83 + except OSError:
2.84 + pass
2.85
2.86 def remove_term_files(pathname, partition):
2.87 remove_files(pathname, TERM_FILENAMES, partition)
2.88 @@ -118,7 +154,10 @@
2.89 def copy_files(source, names, partition, destination, suffix):
2.90 for name in names:
2.91 filename = "%s-%s" % (name, partition)
2.92 - copy(join(source, filename), join(destination, filename + suffix))
2.93 + try:
2.94 + copy(join(source, filename), join(destination, filename + suffix))
2.95 + except OSError:
2.96 + pass
2.97
2.98 def copy_term_files(source, partition, destination, suffix):
2.99 copy_files(source, TERM_FILENAMES, partition, destination, suffix)
3.1 --- a/iixr/index.py Sun Feb 13 02:49:55 2011 +0100
3.2 +++ b/iixr/index.py Mon Feb 14 00:44:57 2011 +0100
3.3 @@ -19,14 +19,15 @@
3.4 """
3.5
3.6 from iixr.filesystem import *
3.7 +from iixr.terms import MultipleReader
3.8 from itermerge import itermerge
3.9 from os import mkdir # index discovery
3.10 from os.path import exists
3.11 -import operator
3.12
3.13 # Constants.
3.14
3.15 FLUSH_INTERVAL = 10000
3.16 +INDEX_INTERVAL = 1000
3.17 OPEN_PARTITIONS = 20
3.18
3.19 # High-level classes.
3.20 @@ -104,64 +105,6 @@
3.21 if self.terms or not get_term_partitions(self.pathname):
3.22 self.flush_terms()
3.23
3.24 -class IndexReader(itermerge):
3.25 -
3.26 - "Accessing the term dictionaries."
3.27 -
3.28 - def __init__(self, pathname, get_reader=None, combine=None):
3.29 -
3.30 - # Get the partitions in order.
3.31 -
3.32 - partitions = list(get_term_partitions(pathname))
3.33 - partitions.sort()
3.34 -
3.35 - # Initialise the underlying term partition readers.
3.36 -
3.37 - self.readers = [(get_reader or get_term_reader)(pathname, partition) for partition in partitions]
3.38 - self.combine = combine or operator.add
3.39 -
3.40 - # Initialise this object as an iterator over the readers.
3.41 -
3.42 - itermerge.__init__(self, self.readers)
3.43 - self.next_value = None
3.44 -
3.45 - def get_sizes(self):
3.46 -
3.47 - # Readers must have compatible sizes.
3.48 -
3.49 - if self.readers:
3.50 - return self.readers[0].get_sizes()
3.51 - else:
3.52 - return 0, 0
3.53 -
3.54 - def next(self):
3.55 - if self.next_value is not None:
3.56 - term, positions = self.next_value
3.57 - else:
3.58 - term, positions = itermerge.next(self)
3.59 -
3.60 - # Look at the next item to see if it is has positions for the current
3.61 - # term.
3.62 -
3.63 - try:
3.64 - t, p = itermerge.next(self)
3.65 - while t == term:
3.66 - positions = self.combine(positions, p)
3.67 - t, p = itermerge.next(self)
3.68 - self.next_value = t, p
3.69 -
3.70 - # Where an item could not be fetched, cause future requests to fail.
3.71 -
3.72 - except StopIteration:
3.73 - self.next_value = None
3.74 -
3.75 - return term, positions
3.76 -
3.77 - def close(self):
3.78 - for reader in self.readers:
3.79 - reader.close()
3.80 - self.readers = []
3.81 -
3.82 class Index:
3.83
3.84 "An inverted index solution encapsulating the various components."
3.85 @@ -188,6 +131,19 @@
3.86 if not exists(self.pathname):
3.87 mkdir(self.pathname)
3.88
3.89 + def _get_readers(self, get_reader):
3.90 +
3.91 + "Return a list of underlying readers given the 'get_reader' function."
3.92 +
3.93 + # Get the partitions in order.
3.94 +
3.95 + partitions = list(get_term_partitions(self.pathname))
3.96 + partitions.sort()
3.97 +
3.98 + # Return the readers.
3.99 +
3.100 + return [get_reader(self.pathname, partition) for partition in partitions]
3.101 +
3.102 def get_reader(self, refresh=0):
3.103
3.104 "Return a reader for the index."
3.105 @@ -199,21 +155,45 @@
3.106 if self.reader is None:
3.107 if not exists(self.pathname):
3.108 raise OSError, "Index path %r does not exist." % self.pathname
3.109 - self.reader = IndexReader(self.pathname)
3.110 +
3.111 + # Try and get combined readers.
3.112 +
3.113 + try:
3.114 + readers = self._get_readers(get_combined_term_reader)
3.115 + except IOError:
3.116 + readers = self._get_readers(get_term_reader)
3.117 +
3.118 + self.reader = MultipleReader(readers)
3.119 +
3.120 return self.reader
3.121
3.122 - def merge(self):
3.123 + def merge(self, interval=INDEX_INTERVAL):
3.124
3.125 "Merge the partitions in the index."
3.126
3.127 - reader = IndexReader(self.pathname, get_term_data_reader, self.merge_data)
3.128 + # Get data readers.
3.129 +
3.130 + readers = self._get_readers(get_term_data_reader)
3.131 + reader = MultipleReader(readers, self.merge_data)
3.132 writer = get_term_writer(self.pathname, "merged")
3.133 + index_writer = get_term_index_writer(self.pathname, "merged")
3.134 +
3.135 try:
3.136 writer.begin(*reader.get_sizes())
3.137 + index_writer.begin()
3.138 +
3.139 + i = 0
3.140 for term, data in reader:
3.141 + if i % interval == 0:
3.142 + index_writer.write_term(term, writer.tell())
3.143 + index_writer.end_record()
3.144 +
3.145 writer.write_term_plus_remaining(term, data)
3.146 writer.end_record()
3.147 + i += 1
3.148 +
3.149 finally:
3.150 + index_writer.close()
3.151 writer.close()
3.152 reader.close()
3.153
3.154 @@ -221,6 +201,8 @@
3.155 remove_term_files(self.pathname, partition)
3.156
3.157 rename_term_files(self.pathname, "merged", 0)
3.158 + self.reader = None
3.159 + self.writer = None
3.160
3.161 def merge_data(self, a, b):
3.162
4.1 --- a/iixr/terms.py Sun Feb 13 02:49:55 2011 +0100
4.2 +++ b/iixr/terms.py Mon Feb 14 00:44:57 2011 +0100
4.3 @@ -20,8 +20,10 @@
4.4
4.5 from iixr.data import *
4.6 from iixr.files import *
4.7 -from iixr.phrases import PhraseIterator
4.8 +from itermerge import itermerge
4.9 from os.path import commonprefix # to find common string prefixes
4.10 +from bisect import bisect_right, insort_right
4.11 +import operator
4.12
4.13 class TermWriter(FileWriter):
4.14
4.15 @@ -251,13 +253,91 @@
4.16
4.17 return doc_positions
4.18
4.19 -class TermIterator(TermReader):
4.20 +# Indexes covering the information files.
4.21 +
4.22 +class TermIndexWriter(FileWriter):
4.23 +
4.24 + "Writing term index information to files."
4.25 +
4.26 + def begin(self):
4.27 +
4.28 + "Begin writing to the file."
4.29 +
4.30 + self.data_start = self.tell()
4.31 + self.last_term = ""
4.32 + self.last_offset = 0
4.33 +
4.34 + def write_term(self, term, offset):
4.35 +
4.36 + "Write the given 'term' and 'offset'."
4.37 +
4.38 + if term <= self.last_term:
4.39 + raise ValueError, "Term %r precedes the previous term %r." % (term, self.last_term)
4.40 +
4.41 + # Write the prefix length and term suffix.
4.42 +
4.43 + common = len(commonprefix([self.last_term, term]))
4.44 + suffix = term[common:]
4.45 +
4.46 + self.write_number(common)
4.47 + self.write_string(suffix)
4.48 +
4.49 + # Write the offset delta.
4.50 +
4.51 + self.write_number(offset - self.last_offset)
4.52 +
4.53 + self.last_term = term
4.54 + self.last_offset = offset
4.55 +
4.56 +class TermIndexReader(FileReader):
4.57
4.58 - "An iterator over terms and positions read from a file."
4.59 + "Reading term index information to files."
4.60 +
4.61 + def begin(self):
4.62 +
4.63 + "Begin reading from the file."
4.64 +
4.65 + self.data_start = self.tell()
4.66 + self.last_term = ""
4.67 + self.last_offset = 0
4.68 +
4.69 + def read_term(self):
4.70 +
4.71 + "Read a term and an offset from the file."
4.72 +
4.73 + # Read the prefix length and term suffix.
4.74 +
4.75 + common = self.read_number()
4.76 + suffix = self.read_string()
4.77 +
4.78 + self.last_term = self.last_term[:common] + suffix
4.79 +
4.80 + # Read the offset delta.
4.81 +
4.82 + self.last_offset += self.read_number()
4.83 + return self.last_term, self.last_offset
4.84 +
4.85 +# Iterator support classes.
4.86 +
4.87 +class Iterator:
4.88 +
4.89 + "Common iterator support."
4.90 +
4.91 + def go_to_term(self, term):
4.92 + t, dp = self.next()
4.93 + while t < term:
4.94 + t, dp = self.next()
4.95 + return t, dp
4.96
4.97 def __iter__(self):
4.98 return self
4.99
4.100 +# External reading classes.
4.101 +
4.102 +class TermIterator(TermReader, Iterator):
4.103 +
4.104 + "An iterator over terms and positions read from a file."
4.105 +
4.106 def next(self):
4.107 try:
4.108 self.begin_record()
4.109 @@ -265,7 +345,7 @@
4.110 except EOFError:
4.111 raise StopIteration
4.112
4.113 -class TermDataIterator(TermReader):
4.114 +class TermDataIterator(TermReader, Iterator):
4.115
4.116 "An iterator over terms and unprocessed document positions data."
4.117
4.118 @@ -279,4 +359,148 @@
4.119 except EOFError:
4.120 raise StopIteration
4.121
4.122 +class TermIndexIterator(TermIndexReader):
4.123 +
4.124 + "An iterator over terms and offsets read from a file."
4.125 +
4.126 + def __iter__(self):
4.127 + return self
4.128 +
4.129 + def next(self):
4.130 + try:
4.131 + self.begin_record()
4.132 + return self.read_term()
4.133 + except EOFError:
4.134 + raise StopIteration
4.135 +
4.136 +class CombinedIterator:
4.137 +
4.138 + "An iterator providing index and information file access."
4.139 +
4.140 + def __init__(self, reader, index_reader):
4.141 + self.reader = reader
4.142 + self.index_reader = index_reader
4.143 + self.records = list(index_reader)
4.144 +
4.145 + def go_to_term(self, term):
4.146 +
4.147 + # Get the record providing a term less than or equal to the requested
4.148 + # term, getting the first entry if no such records exist.
4.149 +
4.150 + i = max(0, bisect_right(self.records, (term, None)) - 1)
4.151 + t, offset = self.records[i]
4.152 +
4.153 + # Seek to the corresponding record in the information file.
4.154 +
4.155 + self.reader.seek(offset)
4.156 +
4.157 + # Where the found term is equal or greater, just read the positions for
4.158 + # the index entry.
4.159 +
4.160 + if t >= term:
4.161 +
4.162 + # Skip the term information, overwrite the reader's state, and get
4.163 + # the positions.
4.164 +
4.165 + self.reader.begin_record()
4.166 + self.reader.read_term_only()
4.167 + self.reader.last_term = t
4.168 +
4.169 + return t, self.reader.read_positions()
4.170 +
4.171 + # Where the found term is less, use the information file to find the
4.172 + # term or the one after.
4.173 +
4.174 + else:
4.175 +
4.176 + # Overwrite the reader's state, then scan for the term.
4.177 +
4.178 + self.reader.last_term = t
4.179 + t, dp = self.reader.next()
4.180 + while t < term:
4.181 + t, dp = self.reader.next()
4.182 +
4.183 + return t, dp
4.184 +
4.185 + def __iter__(self):
4.186 + return self
4.187 +
4.188 + def next(self):
4.189 + return self.reader.next()
4.190 +
4.191 + def close(self):
4.192 + if self.reader is not None:
4.193 + self.reader.close()
4.194 + self.reader = None
4.195 + if self.index_reader is not None:
4.196 + self.index_reader.close()
4.197 + self.index_reader = None
4.198 +
4.199 +class MultipleReader(itermerge):
4.200 +
4.201 + "Accessing many term readers at once."
4.202 +
4.203 + def __init__(self, readers, combine=None):
4.204 +
4.205 + """
4.206 + Initialise a master index reader using underlying 'readers' and a
4.207 + 'combine' function which knows how to combine position information from
4.208 + different sources.
4.209 + """
4.210 +
4.211 + self.readers = readers
4.212 + self.combine = combine or operator.add
4.213 +
4.214 + # Initialise this object as an iterator over the readers.
4.215 +
4.216 + itermerge.__init__(self, self.readers)
4.217 + self.next_value = None
4.218 +
4.219 + def get_sizes(self):
4.220 +
4.221 + # Readers must have compatible sizes.
4.222 +
4.223 + if self.readers:
4.224 + return self.readers[0].get_sizes()
4.225 + else:
4.226 + return 0, 0
4.227 +
4.228 + def go_to_term(self, term):
4.229 + self.iters = []
4.230 + for reader in self.readers:
4.231 + try:
4.232 + insort_right(self.iters, (reader.go_to_term(term), reader.next))
4.233 + except StopIteration:
4.234 + pass
4.235 + self.next_value = None
4.236 + return self.next()
4.237 +
4.238 + def next(self):
4.239 + if self.next_value is not None:
4.240 + term, positions = self.next_value
4.241 + else:
4.242 + term, positions = itermerge.next(self)
4.243 +
4.244 + # Look at the next item to see if it is has positions for the current
4.245 + # term.
4.246 +
4.247 + try:
4.248 + t, p = itermerge.next(self)
4.249 + while t == term:
4.250 + positions = self.combine(positions, p)
4.251 + t, p = itermerge.next(self)
4.252 + self.next_value = t, p
4.253 +
4.254 + # Where an item could not be fetched, cause future requests to fail.
4.255 +
4.256 + except StopIteration:
4.257 + self.next_value = None
4.258 +
4.259 + return term, positions
4.260 +
4.261 + def close(self):
4.262 + for reader in self.readers:
4.263 + reader.close()
4.264 + self.readers = []
4.265 +
4.266 # vim: tabstop=4 expandtab shiftwidth=4
5.1 --- a/itermerge.py Sun Feb 13 02:49:55 2011 +0100
5.2 +++ b/itermerge.py Mon Feb 14 00:44:57 2011 +0100
5.3 @@ -74,8 +74,11 @@
5.4 def next(self):
5.5 if self.iters:
5.6 value, next = self.iters[0]
5.7 - del self.iters[0]
5.8 - self._add_next(next)
5.9 + if len(self.iters) > 1:
5.10 + del self.iters[0]
5.11 + self._add_next(next)
5.12 + else:
5.13 + self.iters[0] = next(), next
5.14 return value
5.15 else:
5.16 raise StopIteration
6.1 --- a/test.py Sun Feb 13 02:49:55 2011 +0100
6.2 +++ b/test.py Mon Feb 14 00:44:57 2011 +0100
6.3 @@ -200,7 +200,7 @@
6.4 print "- Test merge."
6.5
6.6 l1 = list(index.get_reader())
6.7 -index.merge()
6.8 +index.merge(3)
6.9 l2 = list(index.get_reader(1))
6.10
6.11 for (t1, dp1), (t2, dp2) in zip(l1, l2):