1.1 --- a/iixr/index.py Sun Feb 13 02:49:55 2011 +0100
1.2 +++ b/iixr/index.py Mon Feb 14 00:44:57 2011 +0100
1.3 @@ -19,14 +19,15 @@
1.4 """
1.5
1.6 from iixr.filesystem import *
1.7 +from iixr.terms import MultipleReader
1.8 from itermerge import itermerge
1.9 from os import mkdir # index discovery
1.10 from os.path import exists
1.11 -import operator
1.12
1.13 # Constants.
1.14
1.15 FLUSH_INTERVAL = 10000
1.16 +INDEX_INTERVAL = 1000
1.17 OPEN_PARTITIONS = 20
1.18
1.19 # High-level classes.
1.20 @@ -104,64 +105,6 @@
1.21 if self.terms or not get_term_partitions(self.pathname):
1.22 self.flush_terms()
1.23
1.24 -class IndexReader(itermerge):
1.25 -
1.26 - "Accessing the term dictionaries."
1.27 -
1.28 - def __init__(self, pathname, get_reader=None, combine=None):
1.29 -
1.30 - # Get the partitions in order.
1.31 -
1.32 - partitions = list(get_term_partitions(pathname))
1.33 - partitions.sort()
1.34 -
1.35 - # Initialise the underlying term partition readers.
1.36 -
1.37 - self.readers = [(get_reader or get_term_reader)(pathname, partition) for partition in partitions]
1.38 - self.combine = combine or operator.add
1.39 -
1.40 - # Initialise this object as an iterator over the readers.
1.41 -
1.42 - itermerge.__init__(self, self.readers)
1.43 - self.next_value = None
1.44 -
1.45 - def get_sizes(self):
1.46 -
1.47 - # Readers must have compatible sizes.
1.48 -
1.49 - if self.readers:
1.50 - return self.readers[0].get_sizes()
1.51 - else:
1.52 - return 0, 0
1.53 -
1.54 - def next(self):
1.55 - if self.next_value is not None:
1.56 - term, positions = self.next_value
1.57 - else:
1.58 - term, positions = itermerge.next(self)
1.59 -
1.60 - # Look at the next item to see if it is has positions for the current
1.61 - # term.
1.62 -
1.63 - try:
1.64 - t, p = itermerge.next(self)
1.65 - while t == term:
1.66 - positions = self.combine(positions, p)
1.67 - t, p = itermerge.next(self)
1.68 - self.next_value = t, p
1.69 -
1.70 - # Where an item could not be fetched, cause future requests to fail.
1.71 -
1.72 - except StopIteration:
1.73 - self.next_value = None
1.74 -
1.75 - return term, positions
1.76 -
1.77 - def close(self):
1.78 - for reader in self.readers:
1.79 - reader.close()
1.80 - self.readers = []
1.81 -
1.82 class Index:
1.83
1.84 "An inverted index solution encapsulating the various components."
1.85 @@ -188,6 +131,19 @@
1.86 if not exists(self.pathname):
1.87 mkdir(self.pathname)
1.88
1.89 + def _get_readers(self, get_reader):
1.90 +
1.91 + "Return a list of underlying readers given the 'get_reader' function."
1.92 +
1.93 + # Get the partitions in order.
1.94 +
1.95 + partitions = list(get_term_partitions(self.pathname))
1.96 + partitions.sort()
1.97 +
1.98 + # Return the readers.
1.99 +
1.100 + return [get_reader(self.pathname, partition) for partition in partitions]
1.101 +
1.102 def get_reader(self, refresh=0):
1.103
1.104 "Return a reader for the index."
1.105 @@ -199,21 +155,45 @@
1.106 if self.reader is None:
1.107 if not exists(self.pathname):
1.108 raise OSError, "Index path %r does not exist." % self.pathname
1.109 - self.reader = IndexReader(self.pathname)
1.110 +
1.111 + # Try and get combined readers.
1.112 +
1.113 + try:
1.114 + readers = self._get_readers(get_combined_term_reader)
1.115 + except IOError:
1.116 + readers = self._get_readers(get_term_reader)
1.117 +
1.118 + self.reader = MultipleReader(readers)
1.119 +
1.120 return self.reader
1.121
1.122 - def merge(self):
1.123 + def merge(self, interval=INDEX_INTERVAL):
1.124
1.125 "Merge the partitions in the index."
1.126
1.127 - reader = IndexReader(self.pathname, get_term_data_reader, self.merge_data)
1.128 + # Get data readers.
1.129 +
1.130 + readers = self._get_readers(get_term_data_reader)
1.131 + reader = MultipleReader(readers, self.merge_data)
1.132 writer = get_term_writer(self.pathname, "merged")
1.133 + index_writer = get_term_index_writer(self.pathname, "merged")
1.134 +
1.135 try:
1.136 writer.begin(*reader.get_sizes())
1.137 + index_writer.begin()
1.138 +
1.139 + i = 0
1.140 for term, data in reader:
1.141 + if i % interval == 0:
1.142 + index_writer.write_term(term, writer.tell())
1.143 + index_writer.end_record()
1.144 +
1.145 writer.write_term_plus_remaining(term, data)
1.146 writer.end_record()
1.147 + i += 1
1.148 +
1.149 finally:
1.150 + index_writer.close()
1.151 writer.close()
1.152 reader.close()
1.153
1.154 @@ -221,6 +201,8 @@
1.155 remove_term_files(self.pathname, partition)
1.156
1.157 rename_term_files(self.pathname, "merged", 0)
1.158 + self.reader = None
1.159 + self.writer = None
1.160
1.161 def merge_data(self, a, b):
1.162