1.1 --- a/iixr.py Sun Aug 30 03:10:20 2009 +0200
1.2 +++ b/iixr.py Sun Aug 30 21:29:10 2009 +0200
1.3 @@ -51,6 +51,7 @@
1.4
1.5 def rewind(self):
1.6 self.f.seek(0)
1.7 + self.reset()
1.8
1.9 def close(self):
1.10 if self.f is not None:
1.11 @@ -321,11 +322,6 @@
1.12 information was written to the file.
1.13 """
1.14
1.15 - # Too long terms are not currently supported.
1.16 -
1.17 - if len(term) > 255:
1.18 - raise ValueError, "Term %r is too long." % term
1.19 -
1.20 # Write the prefix length and term suffix.
1.21
1.22 common = len(commonprefix([self.last_term, term]))
1.23 @@ -355,10 +351,6 @@
1.24 self.last_term = ""
1.25 self.last_offset = 0
1.26
1.27 - def rewind(self):
1.28 - self.reset()
1.29 - FileReader.rewind(self)
1.30 -
1.31 def read_term(self):
1.32
1.33 """
1.34 @@ -597,8 +589,9 @@
1.35 def write_fields(self, docnum, fields):
1.36
1.37 """
1.38 - Write for the given 'docnum', a list of 'fields' (strings representing
1.39 - field values). Return the offset at which the fields are stored.
1.40 + Write for the given 'docnum', a list of 'fields' (integer, string pairs
1.41 + representing field identifiers and values respectively).
1.42 + Return the offset at which the fields are stored.
1.43 """
1.44
1.45 offset = self.f.tell()
1.46 @@ -613,7 +606,8 @@
1.47
1.48 # Write the fields themselves.
1.49
1.50 - for field in fields:
1.51 + for i, field in fields:
1.52 + self.write_number(i)
1.53 self.write_string(field, 1) # compress
1.54
1.55 self.last_docnum = docnum
1.56 @@ -630,7 +624,7 @@
1.57
1.58 """
1.59 Read fields from the file, returning a tuple containing the document
1.60 - number and a list of field values.
1.61 + number and a list of field (identifier, value) pairs.
1.62 """
1.63
1.64 # Read the document number.
1.65 @@ -647,7 +641,9 @@
1.66 i = 0
1.67
1.68 while i < nfields:
1.69 - fields.append(self.read_string(1)) # decompress
1.70 + identifier = self.read_number()
1.71 + value = self.read_string(1) # decompress
1.72 + fields.append((identifier, value))
1.73 i += 1
1.74
1.75 return self.last_docnum, fields
1.76 @@ -751,7 +747,16 @@
1.77
1.78 self.max_offset = self.docs[-1][1]
1.79
1.80 - def read_fields(self, docnum):
1.81 + def rewind(self):
1.82 + self.field_reader.rewind()
1.83 +
1.84 + def read_fields(self):
1.85 +
1.86 + "Return the next document number and fields."
1.87 +
1.88 + return self.field_reader.read_fields()
1.89 +
1.90 + def get_fields(self, docnum):
1.91
1.92 "Read the fields of the document with the given 'docnum'."
1.93
1.94 @@ -789,15 +794,29 @@
1.95
1.96 # Dictionary merging classes.
1.97
1.98 -class TermDictionaryMerger:
1.99 +class Merger:
1.100
1.101 - "Merge position files."
1.102 + "Merge files."
1.103
1.104 def __init__(self, writer, readers):
1.105 self.writer = writer
1.106 self.readers = readers
1.107
1.108 + def close(self):
1.109 + for reader in self.readers:
1.110 + reader.close()
1.111 + self.writer.close()
1.112 +
1.113 +class TermDictionaryMerger(Merger):
1.114 +
1.115 + "Merge term and position files."
1.116 +
1.117 def merge(self):
1.118 +
1.119 + """
1.120 + Merge terms and positions from the readers, sending them to the writer.
1.121 + """
1.122 +
1.123 entries = []
1.124
1.125 # Get the first entries from the readers.
1.126 @@ -829,7 +848,7 @@
1.127 # For such entries, merge the positions.
1.128
1.129 if other_term == term:
1.130 - doc_positions += other_doc_positions
1.131 + self.merge_positions(doc_positions, other_doc_positions)
1.132 to_update.append(other_partition)
1.133 i += 1
1.134 else:
1.135 @@ -837,7 +856,6 @@
1.136
1.137 # Write the combined term details.
1.138
1.139 - doc_positions.sort()
1.140 self.writer.write_term_positions(term, doc_positions)
1.141
1.142 # Update the entries from the affected readers.
1.143 @@ -851,6 +869,124 @@
1.144 except EOFError:
1.145 pass
1.146
1.147 + def merge_positions(self, doc_positions, other_doc_positions):
1.148 +
1.149 + """
1.150 + Merge 'doc_positions' with 'other_doc_positions' so that common document
1.151 + records contain positions from both collections.
1.152 + """
1.153 +
1.154 + doc_position_dict = dict(doc_positions)
1.155 +
1.156 + for docnum, positions in other_doc_positions:
1.157 + if doc_position_dict.has_key(docnum):
1.158 + doc_position_dict[docnum] += positions
1.159 + doc_position_dict[docnum].sort()
1.160 + else:
1.161 + doc_position_dict[docnum] = positions
1.162 +
1.163 + doc_positions = doc_position_dict.items()
1.164 + return doc_positions
1.165 +
1.166 +class FieldDictionaryMerger(Merger):
1.167 +
1.168 + "Merge field files."
1.169 +
1.170 + def merge(self):
1.171 +
1.172 + """
1.173 + Merge fields from the readers, sending them to the writer.
1.174 + """
1.175 +
1.176 + entries = []
1.177 +
1.178 + # Get the first entries from the readers.
1.179 +
1.180 + for partition, reader in enumerate(self.readers):
1.181 + reader.rewind()
1.182 +
1.183 + try:
1.184 + docnum, fields = reader.read_fields()
1.185 + insort_right(entries, (docnum, fields, partition))
1.186 + except EOFError:
1.187 + pass
1.188 +
1.189 + # While entries are available, write them out in order, merging where
1.190 + # appropriate.
1.191 +
1.192 + while entries:
1.193 + docnum, fields, partition = entries[0]
1.194 + to_update = [partition]
1.195 +
1.196 + nentries = len(entries)
1.197 + i = 1
1.198 +
1.199 + # Find other entries for the term.
1.200 +
1.201 + while i < nentries:
1.202 + other_docnum, other_fields, other_partition = entries[i]
1.203 +
1.204 + # For such entries, merge the positions.
1.205 +
1.206 + if other_term == term:
1.207 + fields += other_fields
1.208 + to_update.append(other_partition)
1.209 + i += 1
1.210 + else:
1.211 + break
1.212 +
1.213 + # Write the combined term details.
1.214 +
1.215 + self.writer.write_fields(docnum, fields)
1.216 +
1.217 + # Update the entries from the affected readers.
1.218 +
1.219 + del entries[:i]
1.220 +
1.221 + for partition in to_update:
1.222 + try:
1.223 + docnum, fields = self_readers[partition].read_fields()
1.224 + insort_right(entries, (docnum, fields, partition))
1.225 + except EOFError:
1.226 + pass
1.227 +
1.228 +# Utility functions.
1.229 +
1.230 +def get_term_writer(pathname, partition, interval):
1.231 +
1.232 + """
1.233 + Return a term dictionary writer using files under the given 'pathname'
1.234 + labelled according to the given 'partition', using the given indexing
1.235 + 'interval'.
1.236 + """
1.237 +
1.238 + tdf = open(join(pathname, "terms-%s" % partition), "wb")
1.239 + info_writer = TermWriter(tdf)
1.240 +
1.241 + tdif = open(join(pathname, "index-%s" % partition), "wb")
1.242 + index_writer = TermIndexWriter(tdif)
1.243 +
1.244 + tpf = open(join(pathname, "positions-%s" % partition), "wb")
1.245 + positions_writer = PositionWriter(tpf)
1.246 +
1.247 + return TermDictionaryWriter(info_writer, index_writer, positions_writer, interval)
1.248 +
1.249 +def get_field_writer(pathname, partition, interval):
1.250 +
1.251 + """
1.252 + Return a field dictionary writer using files under the given 'pathname'
1.253 + labelled according to the given 'partition', using the given indexing
1.254 + 'interval'.
1.255 + """
1.256 +
1.257 + ff = open(join(pathname, "fields-%s" % partition), "wb")
1.258 + field_writer = FieldWriter(ff)
1.259 +
1.260 + fif = open(join(pathname, "fields_index-%s" % partition), "wb")
1.261 + field_index_writer = FieldIndexWriter(fif)
1.262 +
1.263 + return FieldDictionaryWriter(field_writer, field_index_writer, interval)
1.264 +
1.265 # High-level classes.
1.266
1.267 class IndexWriter:
1.268 @@ -893,48 +1029,38 @@
1.269 doc.append(position)
1.270
1.271 self.position_counter += 1
1.272 - if self.flush_threshold and self.position_counter >= self.flush_threshold:
1.273 + if self.flush_interval and self.position_counter >= self.flush_interval:
1.274 self.flush_terms()
1.275
1.276 - def add_fields(self, docnum, fields):
1.277 + def add_field(self, docnum, identifier, value):
1.278
1.279 - "Add for the document with the given 'docnum' a list of 'fields'."
1.280 + """
1.281 + Add for the document with the given 'docnum' a field having the given
1.282 + 'identifier' and 'value'.
1.283 + """
1.284
1.285 if not self.docs.has_key(docnum):
1.286 - doc_fields = self.docs[docnum] = fields
1.287 + doc_fields = self.docs[docnum] = []
1.288 else:
1.289 - self.docs[docnum] += fields
1.290 + doc_fields = self.docs[docnum]
1.291
1.292 - self.field_counter += len(fields)
1.293 - if self.flush_threshold and self.field_counter >= self.flush_threshold:
1.294 + doc_fields.append((identifier, value))
1.295 +
1.296 + self.field_counter += 1
1.297 + if self.flush_interval and self.field_counter >= self.flush_interval:
1.298 self.flush_fields()
1.299
1.300 def get_term_writer(self):
1.301
1.302 "Return a term dictionary writer for the current partition."
1.303
1.304 - tdf = open(join(self.pathname, "terms-%d" % self.dict_partition), "wb")
1.305 - info_writer = TermWriter(tdf)
1.306 -
1.307 - tdif = open(join(self.pathname, "index-%d" % self.dict_partition), "wb")
1.308 - index_writer = TermIndexWriter(tdif)
1.309 -
1.310 - tpf = open(join(self.pathname, "positions-%d" % self.dict_partition), "wb")
1.311 - positions_writer = PositionWriter(tpf)
1.312 -
1.313 - return TermDictionaryWriter(info_writer, index_writer, positions_writer, self.interval)
1.314 + return get_term_writer(self.pathname, self.dict_partition, self.interval)
1.315
1.316 def get_field_writer(self):
1.317
1.318 "Return a field dictionary writer for the current partition."
1.319
1.320 - ff = open(join(self.pathname, "fields-%d" % self.field_dict_partition), "wb")
1.321 - field_writer = FieldWriter(ff)
1.322 -
1.323 - fif = open(join(self.pathname, "fields_index-%d" % self.field_dict_partition), "wb")
1.324 - field_index_writer = FieldIndexWriter(fif)
1.325 -
1.326 - return FieldDictionaryWriter(field_writer, field_index_writer, self.interval)
1.327 + return get_field_writer(self.pathname, self.field_dict_partition, self.interval)
1.328
1.329 def flush_terms(self):
1.330
1.331 @@ -992,22 +1118,22 @@
1.332 self.field_dict_reader = self.get_field_reader(partition)
1.333
1.334 def get_term_reader(self, partition):
1.335 - tdf = open(join(self.pathname, "terms-%d" % partition), "rb")
1.336 + tdf = open(join(self.pathname, "terms-%s" % partition), "rb")
1.337 info_reader = TermReader(tdf)
1.338
1.339 - tdif = open(join(self.pathname, "index-%d" % partition), "rb")
1.340 + tdif = open(join(self.pathname, "index-%s" % partition), "rb")
1.341 index_reader = TermIndexReader(tdif)
1.342
1.343 - tpf = open(join(self.pathname, "positions-%d" % partition), "rb")
1.344 + tpf = open(join(self.pathname, "positions-%s" % partition), "rb")
1.345 positions_reader = PositionReader(tpf)
1.346
1.347 return TermDictionaryReader(info_reader, index_reader, positions_reader)
1.348
1.349 def get_field_reader(self, partition):
1.350 - ff = open(join(self.pathname, "fields-%d" % partition), "rb")
1.351 + ff = open(join(self.pathname, "fields-%s" % partition), "rb")
1.352 field_reader = FieldReader(ff)
1.353
1.354 - fif = open(join(self.pathname, "fields_index-%d" % partition), "rb")
1.355 + fif = open(join(self.pathname, "fields_index-%s" % partition), "rb")
1.356 field_index_reader = FieldIndexReader(fif)
1.357
1.358 return FieldDictionaryReader(field_reader, field_index_reader)
1.359 @@ -1019,7 +1145,7 @@
1.360 return self.dict_reader.get_frequency(term)
1.361
1.362 def get_fields(self, docnum):
1.363 - return self.field_dict_reader.read_fields(docnum)
1.364 + return self.field_dict_reader.get_fields(docnum)
1.365
1.366 def close(self):
1.367 self.dict_reader.close()
1.368 @@ -1057,9 +1183,9 @@
1.369 self.reader = IndexReader(self.pathname, partition)
1.370 return self.reader
1.371
1.372 - def merge_terms(self):
1.373 + def merge_terms(self, interval=INTERVAL):
1.374
1.375 - "Merge term dictionaries."
1.376 + "Merge term dictionaries using the given indexing 'interval'."
1.377
1.378 readers = []
1.379
1.380 @@ -1068,7 +1194,11 @@
1.381 partition = int(filename[6:])
1.382 readers.append(self.get_reader(partition))
1.383
1.384 - # NOTE: Make a distinct new writer/index.
1.385 + writer = get_writer(self.pathname, "new", interval)
1.386 +
1.387 + merger = TermDictionaryMerger(writer, readers)
1.388 + merger.merge()
1.389 + merger.close()
1.390
1.391 def close(self):
1.392 if self.reader is not None:
2.1 --- a/test.py Sun Aug 30 03:10:20 2009 +0200
2.2 +++ b/test.py Sun Aug 30 21:29:10 2009 +0200
2.3 @@ -82,7 +82,7 @@
2.4 f = open("testF", "wb")
2.5 w = iixr.FieldWriter(f)
2.6 for docnum, fields in doc_fields:
2.7 - w.write_fields(docnum, fields)
2.8 + w.write_fields(docnum, list(enumerate(fields)))
2.9 w.close()
2.10
2.11 f = open("testF", "rb")
2.12 @@ -90,7 +90,7 @@
2.13 for docnum, fields in doc_fields:
2.14 dn, df = r.read_fields()
2.15 print docnum == dn, docnum, dn
2.16 - print fields == df, fields, df
2.17 + print list(enumerate(fields)) == df, list(enumerate(fields)), df
2.18 r.close()
2.19
2.20 # Test field index files.
2.21 @@ -123,7 +123,7 @@
2.22 w2 = iixr.FieldIndexWriter(f2)
2.23 wd = iixr.FieldDictionaryWriter(w, w2, 3)
2.24 for docnum, fields in doc_fields:
2.25 - wd.write_fields(docnum, fields)
2.26 + wd.write_fields(docnum, list(enumerate(fields)))
2.27 wd.close()
2.28
2.29 f = open("testF", "rb")
2.30 @@ -134,11 +134,19 @@
2.31 doc_fields_reversed = doc_fields[:]
2.32 doc_fields_reversed.reverse()
2.33 for docnum, fields in doc_fields_reversed:
2.34 - df = rd.read_fields(docnum)
2.35 - print fields == df, fields, df
2.36 + df = rd.get_fields(docnum)
2.37 + print list(enumerate(fields)) == df, list(enumerate(fields)), df
2.38 for docnum in (13579, 246810):
2.39 - df = rd.read_fields(docnum)
2.40 + df = rd.get_fields(docnum)
2.41 print df is None, df
2.42 +
2.43 +# (Test sequential access.)
2.44 +
2.45 +rd.rewind()
2.46 +for docnum, fields in doc_fields:
2.47 + dn, df = rd.read_fields()
2.48 + print docnum == dn, docnum, dn
2.49 + print list(enumerate(fields)) == df, list(enumerate(fields)), df
2.50 rd.close()
2.51
2.52 # Test terms.
2.53 @@ -297,7 +305,7 @@
2.54 for docnum, text in docs:
2.55 for position, term in enumerate(text.split()):
2.56 wi.add_position(term, docnum, position)
2.57 - wi.add_fields(docnum, [text])
2.58 + wi.add_field(docnum, 123, text)
2.59 wi.close()
2.60
2.61 rd = index.get_reader()
2.62 @@ -308,7 +316,7 @@
2.63 print frequency == fr, frequency, fr
2.64 for docnum, text in docs:
2.65 df = rd.get_fields(docnum)
2.66 - print text == df[0], text, df[0]
2.67 + print (123, text) == df[0], (123, text), df[0]
2.68 index.close()
2.69
2.70 # vim: tabstop=4 expandtab shiftwidth=4