1.1 --- a/iixr.py Wed Sep 09 01:18:04 2009 +0200
1.2 +++ b/iixr.py Thu Sep 10 23:19:13 2009 +0200
1.3 @@ -24,7 +24,6 @@
1.4 from os.path import exists, join
1.5 from os.path import commonprefix # to find common string prefixes
1.6 from bisect import bisect_right # to find terms in the dictionary index
1.7 -from bisect import insort_right # to maintain a sorted list of data for merging
1.8 import bz2, zlib # for field compression
1.9 from itermerge import itermerge
1.10
1.11 @@ -152,19 +151,18 @@
1.12
1.13 shift = 0
1.14 number = 0
1.15 - more = 1
1.16 -
1.17 - while more:
1.18 - byte = self.f.read(1)
1.19 - if not byte:
1.20 - raise EOFError
1.21 -
1.22 - csd = ord(byte)
1.23 - more = csd & 128 != 0
1.24 - if more:
1.25 - csd &= 127
1.26 - number += (csd << shift)
1.27 - shift += 7
1.28 + read = self.f.read
1.29 +
1.30 + try:
1.31 + csd = ord(read(1))
1.32 + while csd & 128:
1.33 + number += ((csd & 127) << shift)
1.34 + shift += 7
1.35 + csd = ord(read(1))
1.36 + else:
1.37 + number += (csd << shift)
1.38 + except TypeError:
1.39 + raise EOFError
1.40
1.41 return number
1.42
1.43 @@ -195,6 +193,19 @@
1.44
1.45 return unicode(s, "utf-8")
1.46
1.47 +class FileOpener:
1.48 +
1.49 + "Opening files using their filenames."
1.50 +
1.51 + def __init__(self, filename):
1.52 + self.filename = filename
1.53 +
1.54 + def open(self, mode):
1.55 + return open(self.filename, mode)
1.56 +
1.57 + def close(self):
1.58 + pass
1.59 +
1.60 # Specific classes for storing term and position information.
1.61
1.62 class PositionWriter(FileWriter):
1.63 @@ -242,39 +253,10 @@
1.64
1.65 return offset
1.66
1.67 -class PositionReader(FileReader):
1.68 +class PositionOpener(FileOpener):
1.69
1.70 "Reading position information from files."
1.71
1.72 - def reset(self):
1.73 - self.last_docnum = 0
1.74 -
1.75 - def read_positions(self):
1.76 -
1.77 - "Read positions, returning a document number and a list of positions."
1.78 -
1.79 - # Read the document number delta and add it to the last number.
1.80 -
1.81 - self.last_docnum += self.read_number()
1.82 -
1.83 - # Read the number of positions.
1.84 -
1.85 - npositions = self.read_number()
1.86 -
1.87 - # Read the position deltas, adding each previous position to get the
1.88 - # appropriate collection of absolute positions.
1.89 -
1.90 - i = 0
1.91 - last = 0
1.92 - positions = []
1.93 -
1.94 - while i < npositions:
1.95 - last += self.read_number()
1.96 - positions.append(last)
1.97 - i += 1
1.98 -
1.99 - return self.last_docnum, positions
1.100 -
1.101 def read_term_positions(self, offset, count):
1.102
1.103 """
1.104 @@ -285,7 +267,7 @@
1.105
1.106 # Duplicate the file handle.
1.107
1.108 - f = fdopen(dup(self.f.fileno()), "rb")
1.109 + f = self.open("rb")
1.110 f.seek(offset)
1.111 return PositionIterator(f, count)
1.112
1.113 @@ -324,10 +306,100 @@
1.114
1.115 return offset
1.116
1.117 -class PositionIndexReader(FileReader):
1.118 +class PositionIndexOpener(FileOpener):
1.119
1.120 "Reading position index information from files."
1.121
1.122 + def read_term_positions(self, offset, doc_frequency):
1.123 +
1.124 + """
1.125 + Read all positions from 'offset', seeking to that position in the file
1.126 + before reading. The number of documents available for reading is limited
1.127 + to 'doc_frequency'.
1.128 + """
1.129 +
1.130 + # Open a new file handle for this iterator.
1.131 +
1.132 + f = self.open("rb")
1.133 + f.seek(offset)
1.134 + return PositionIndexIterator(f, doc_frequency)
1.135 +
1.136 +# Iterators for position-related files.
1.137 +
1.138 +class IteratorBase:
1.139 +
1.140 + def __init__(self, count):
1.141 + self.replenish(count)
1.142 +
1.143 + def replenish(self, count):
1.144 + self.count = count
1.145 + self.read_documents = 0
1.146 +
1.147 + def __len__(self):
1.148 + return self.count
1.149 +
1.150 + def sort(self):
1.151 + pass # Stored document positions are already sorted.
1.152 +
1.153 + def __iter__(self):
1.154 + return self
1.155 +
1.156 +class PositionIterator(FileReader, IteratorBase):
1.157 +
1.158 + "Iterating over document positions."
1.159 +
1.160 + def __init__(self, f, count):
1.161 + FileReader.__init__(self, f)
1.162 + IteratorBase.__init__(self, count)
1.163 +
1.164 + def reset(self):
1.165 + self.last_docnum = 0
1.166 +
1.167 + def read_positions(self):
1.168 +
1.169 + "Read positions, returning a document number and a list of positions."
1.170 +
1.171 + # Read the document number delta and add it to the last number.
1.172 +
1.173 + self.last_docnum += self.read_number()
1.174 +
1.175 + # Read the number of positions.
1.176 +
1.177 + npositions = self.read_number()
1.178 +
1.179 + # Read the position deltas, adding each previous position to get the
1.180 + # appropriate collection of absolute positions.
1.181 +
1.182 + i = 0
1.183 + last = 0
1.184 + positions = []
1.185 +
1.186 + while i < npositions:
1.187 + last += self.read_number()
1.188 + positions.append(last)
1.189 + i += 1
1.190 +
1.191 + return self.last_docnum, positions
1.192 +
1.193 + def next(self):
1.194 +
1.195 + "Read positions for a single document."
1.196 +
1.197 + if self.read_documents < self.count:
1.198 + self.read_documents += 1
1.199 + return self.read_positions()
1.200 + else:
1.201 + raise StopIteration
1.202 +
1.203 +class PositionIndexIterator(FileReader, IteratorBase):
1.204 +
1.205 + "Iterating over position index entries."
1.206 +
1.207 + def __init__(self, f, count):
1.208 + FileReader.__init__(self, f)
1.209 + IteratorBase.__init__(self, count)
1.210 + self.section_count = 0
1.211 +
1.212 def reset(self):
1.213 self.last_docnum = 0
1.214 self.last_pos_offset = 0
1.215 @@ -353,67 +425,6 @@
1.216
1.217 return self.last_docnum, self.last_pos_offset, count
1.218
1.219 - def read_term_positions(self, offset, doc_frequency):
1.220 -
1.221 - """
1.222 - Read all positions from 'offset', seeking to that position in the file
1.223 - before reading. The number of documents available for reading is limited
1.224 - to 'doc_frequency'.
1.225 - """
1.226 -
1.227 - # Duplicate the file handle.
1.228 -
1.229 - f = fdopen(dup(self.f.fileno()), "rb")
1.230 - f.seek(offset)
1.231 - return PositionIndexIterator(f, doc_frequency)
1.232 -
1.233 -# Iterators for position-related files.
1.234 -
1.235 -class IteratorBase:
1.236 -
1.237 - def __init__(self, count):
1.238 - self.replenish(count)
1.239 -
1.240 - def replenish(self, count):
1.241 - self.count = count
1.242 - self.read_documents = 0
1.243 -
1.244 - def __len__(self):
1.245 - return self.count
1.246 -
1.247 - def sort(self):
1.248 - pass # Stored document positions are already sorted.
1.249 -
1.250 - def __iter__(self):
1.251 - return self
1.252 -
1.253 -class PositionIterator(PositionReader, IteratorBase):
1.254 -
1.255 - "Iterating over document positions."
1.256 -
1.257 - def __init__(self, f, count):
1.258 - PositionReader.__init__(self, f)
1.259 - IteratorBase.__init__(self, count)
1.260 -
1.261 - def next(self):
1.262 -
1.263 - "Read positions for a single document."
1.264 -
1.265 - if self.read_documents < self.count:
1.266 - self.read_documents += 1
1.267 - return self.read_positions()
1.268 - else:
1.269 - raise StopIteration
1.270 -
1.271 -class PositionIndexIterator(PositionIndexReader, IteratorBase):
1.272 -
1.273 - "Iterating over document positions."
1.274 -
1.275 - def __init__(self, f, count):
1.276 - PositionIndexReader.__init__(self, f)
1.277 - IteratorBase.__init__(self, count)
1.278 - self.section_count = 0
1.279 -
1.280 def next(self):
1.281
1.282 "Read positions for a single document."
1.283 @@ -477,7 +488,7 @@
1.284
1.285 # Every {interval} entries, write an index entry.
1.286
1.287 - if count == self.interval:
1.288 + if count % self.interval == 0:
1.289 io = self.position_index_writer.write_positions(first_docnum, first_offset, self.interval)
1.290
1.291 # Remember the first index entry offset.
1.292 @@ -487,7 +498,6 @@
1.293
1.294 first_offset = None
1.295 first_docnum = None
1.296 - count = 0
1.297
1.298 # Reset the position writer so that position readers accessing
1.299 # a section start with the correct document number.
1.300 @@ -498,14 +508,14 @@
1.301
1.302 else:
1.303 if first_offset is not None:
1.304 - io = self.position_index_writer.write_positions(first_docnum, first_offset, count)
1.305 + io = self.position_index_writer.write_positions(first_docnum, first_offset, count % self.interval)
1.306
1.307 # Remember the first index entry offset.
1.308
1.309 if index_offset is None:
1.310 index_offset = io
1.311
1.312 - return index_offset, frequency, len(doc_positions)
1.313 + return index_offset, frequency, count
1.314
1.315 def close(self):
1.316 self.position_writer.close()
1.317 @@ -515,9 +525,9 @@
1.318
1.319 "Reading position dictionaries."
1.320
1.321 - def __init__(self, position_reader, position_index_reader):
1.322 - self.position_reader = position_reader
1.323 - self.position_index_reader = position_index_reader
1.324 + def __init__(self, position_opener, position_index_opener):
1.325 + self.position_opener = position_opener
1.326 + self.position_index_opener = position_index_opener
1.327
1.328 def read_term_positions(self, offset, doc_frequency):
1.329
1.330 @@ -526,21 +536,21 @@
1.331 given 'doc_frequency'.
1.332 """
1.333
1.334 - return PositionDictionaryIterator(self.position_reader,
1.335 - self.position_index_reader, offset, doc_frequency)
1.336 + return PositionDictionaryIterator(self.position_opener,
1.337 + self.position_index_opener, offset, doc_frequency)
1.338
1.339 def close(self):
1.340 - self.position_reader.close()
1.341 - self.position_index_reader.close()
1.342 + pass
1.343
1.344 class PositionDictionaryIterator:
1.345
1.346 "Iteration over position dictionary entries."
1.347
1.348 - def __init__(self, position_reader, position_index_reader, offset, doc_frequency):
1.349 - self.position_reader = position_reader
1.350 + def __init__(self, position_opener, position_index_opener, offset, doc_frequency):
1.351 + self.position_opener = position_opener
1.352 self.doc_frequency = doc_frequency
1.353 - self.index_iterator = position_index_reader.read_term_positions(offset, doc_frequency)
1.354 + self.index_iterator = position_index_opener.read_term_positions(offset, doc_frequency)
1.355 + self.iterator = None
1.356
1.357 # Remember the last values.
1.358
1.359 @@ -555,12 +565,16 @@
1.360 self._next_section()
1.361 self._init_section()
1.362
1.363 + # Sequence methods.
1.364 +
1.365 def __len__(self):
1.366 return self.doc_frequency
1.367
1.368 def sort(self):
1.369 pass
1.370
1.371 + # Iterator methods.
1.372 +
1.373 def __iter__(self):
1.374 return self
1.375
1.376 @@ -680,7 +694,17 @@
1.377
1.378 "Initialise the iterator for the section in the position file."
1.379
1.380 - self.iterator = self.position_reader.read_term_positions(self.pos_offset, self.section_count)
1.381 + if self.iterator is not None:
1.382 + self.iterator.close()
1.383 + self.iterator = self.position_opener.read_term_positions(self.pos_offset, self.section_count)
1.384 +
1.385 + def close(self):
1.386 + if self.iterator is not None:
1.387 + self.iterator.close()
1.388 + self.iterator = None
1.389 + if self.index_iterator is not None:
1.390 + self.index_iterator.close()
1.391 + self.index_iterator = None
1.392
1.393 class TermWriter(FileWriter):
1.394
1.395 @@ -957,6 +981,18 @@
1.396 def _get_positions(self, offset, doc_frequency):
1.397 return self.position_dict_reader.read_term_positions(offset, doc_frequency)
1.398
1.399 + # Iterator convenience methods.
1.400 +
1.401 + def __iter__(self):
1.402 + self.rewind()
1.403 + return self
1.404 +
1.405 + def next(self):
1.406 + try:
1.407 + return self.read_term()
1.408 + except EOFError:
1.409 + raise StopIteration
1.410 +
1.411 # Sequential access methods.
1.412
1.413 def rewind(self):
1.414 @@ -1213,6 +1249,20 @@
1.415 else:
1.416 self.max_offset = None
1.417
1.418 + # Iterator convenience methods.
1.419 +
1.420 + def __iter__(self):
1.421 + self.rewind()
1.422 + return self
1.423 +
1.424 + def next(self):
1.425 + try:
1.426 + return self.read_fields()
1.427 + except EOFError:
1.428 + raise StopIteration
1.429 +
1.430 + # Sequential access methods.
1.431 +
1.432 def rewind(self):
1.433 self.field_reader.rewind()
1.434
1.435 @@ -1222,6 +1272,8 @@
1.436
1.437 return self.field_reader.read_fields()
1.438
1.439 + # Random access methods.
1.440 +
1.441 def get_fields(self, docnum):
1.442
1.443 "Read the fields of the document with the given 'docnum'."
1.444 @@ -1283,57 +1335,20 @@
1.445 Merge terms and positions from the readers, sending them to the writer.
1.446 """
1.447
1.448 - entries = []
1.449 -
1.450 - # Get the first entries from the readers.
1.451 -
1.452 - for partition, reader in enumerate(self.readers):
1.453 - reader.rewind()
1.454 -
1.455 - try:
1.456 - term, frequency, doc_frequency, positions = reader.read_term()
1.457 - insort_right(entries, (term, positions, partition))
1.458 - except EOFError:
1.459 - pass
1.460 -
1.461 - # While entries are available, write them out in order, merging where
1.462 - # appropriate.
1.463 -
1.464 - while entries:
1.465 - term, doc_positions, partition = entries[0]
1.466 - to_update = [partition]
1.467 -
1.468 - nentries = len(entries)
1.469 - i = 1
1.470 -
1.471 - # Find other entries for the term.
1.472 -
1.473 - while i < nentries:
1.474 - other_term, other_doc_positions, other_partition = entries[i]
1.475 -
1.476 - # For such entries, merge the positions.
1.477 -
1.478 - if other_term == term:
1.479 - doc_positions = itermerge(doc_positions, other_doc_positions)
1.480 - to_update.append(other_partition)
1.481 - i += 1
1.482 - else:
1.483 - break
1.484 -
1.485 - # Write the combined term details.
1.486 -
1.487 - self.writer.write_term_positions(term, doc_positions)
1.488 -
1.489 - # Update the entries from the affected readers.
1.490 -
1.491 - del entries[:i]
1.492 -
1.493 - for partition in to_update:
1.494 - try:
1.495 - term, frequency, doc_frequency, positions = self.readers[partition].read_term()
1.496 - insort_right(entries, (term, positions, partition))
1.497 - except EOFError:
1.498 - pass
1.499 + last_term = None
1.500 + current_readers = []
1.501 +
1.502 + for term, frequency, doc_frequency, positions in itermerge(self.readers):
1.503 + if term == last_term:
1.504 + current_readers.append(positions)
1.505 + else:
1.506 + if current_readers:
1.507 + self.writer.write_term_positions(last_term, itermerge(current_readers))
1.508 + last_term = term
1.509 + current_readers = [positions]
1.510 + else:
1.511 + if current_readers:
1.512 + self.writer.write_term_positions(last_term, itermerge(current_readers))
1.513
1.514 class FieldDictionaryMerger(Merger):
1.515
1.516 @@ -1345,40 +1360,9 @@
1.517 Merge fields from the readers, sending them to the writer.
1.518 """
1.519
1.520 - entries = []
1.521 -
1.522 - # Get the first entries from the readers.
1.523 -
1.524 - for partition, reader in enumerate(self.readers):
1.525 - reader.rewind()
1.526 -
1.527 - try:
1.528 - docnum, fields = reader.read_fields()
1.529 - insort_right(entries, (docnum, fields, partition))
1.530 - except EOFError:
1.531 - pass
1.532 -
1.533 - # While entries are available, write them out in order, merging where
1.534 - # appropriate. Since fields from one document should only appear in a
1.535 - # single partition, only one partition will be updated at a time.
1.536 -
1.537 - while entries:
1.538 - docnum, fields, partition = entries[0]
1.539 -
1.540 - # Write the combined term details.
1.541 -
1.542 + for docnum, fields in itermerge(self.readers):
1.543 self.writer.write_fields(docnum, fields)
1.544
1.545 - # Update the entries from the affected readers.
1.546 -
1.547 - del entries[0]
1.548 -
1.549 - try:
1.550 - docnum, fields = self.readers[partition].read_fields()
1.551 - insort_right(entries, (docnum, fields, partition))
1.552 - except EOFError:
1.553 - pass
1.554 -
1.555 # Utility functions.
1.556
1.557 def get_term_writer(pathname, partition, interval, doc_interval):
1.558 @@ -1434,13 +1418,10 @@
1.559 tdif = open(join(pathname, "terms_index-%s" % partition), "rb")
1.560 index_reader = TermIndexReader(tdif)
1.561
1.562 - tpf = open(join(pathname, "positions-%s" % partition), "rb")
1.563 - positions_reader = PositionReader(tpf)
1.564 -
1.565 - tpif = open(join(pathname, "positions_index-%s" % partition), "rb")
1.566 - positions_index_reader = PositionIndexReader(tpif)
1.567 -
1.568 - positions_dict_reader = PositionDictionaryReader(positions_reader, positions_index_reader)
1.569 + positions_opener = PositionOpener(join(pathname, "positions-%s" % partition))
1.570 + positions_index_opener = PositionIndexOpener(join(pathname, "positions_index-%s" % partition))
1.571 +
1.572 + positions_dict_reader = PositionDictionaryReader(positions_opener, positions_index_opener)
1.573
1.574 return TermDictionaryReader(info_reader, index_reader, positions_dict_reader)
1.575
3.1 --- a/test.py Wed Sep 09 01:18:04 2009 +0200
3.2 +++ b/test.py Thu Sep 10 23:19:13 2009 +0200
3.3 @@ -60,7 +60,7 @@
3.4 w.close()
3.5
3.6 f = open("testP", "rb")
3.7 -r = iixr.PositionReader(f)
3.8 +r = iixr.PositionIterator(f, None)
3.9 for doc_positions in all_doc_positions:
3.10 for docnum, positions in doc_positions:
3.11 d, p = r.read_positions()
3.12 @@ -97,8 +97,7 @@
3.13 offsets.append((offset, doc_frequency))
3.14 w.close()
3.15
3.16 -f = open("testPI", "rb")
3.17 -r = iixr.PositionIndexReader(f)
3.18 +r = iixr.PositionIndexOpener("testPI")
3.19 offsets.reverse()
3.20 indexed_positions.reverse()
3.21 for (offset, doc_frequency), term_positions in zip(offsets, indexed_positions):
3.22 @@ -122,10 +121,8 @@
3.23 offsets.append((offset, doc_frequency))
3.24 wd.close()
3.25
3.26 -f = open("testP", "rb")
3.27 -r = iixr.PositionReader(f)
3.28 -f2 = open("testPI", "rb")
3.29 -r2 = iixr.PositionIndexReader(f2)
3.30 +r = iixr.PositionOpener("testP")
3.31 +r2 = iixr.PositionIndexOpener("testPI")
3.32 rd = iixr.PositionDictionaryReader(r, r2)
3.33 offsets.reverse()
3.34 all_doc_positions.reverse()
3.35 @@ -293,10 +290,8 @@
3.36 r = iixr.TermReader(f)
3.37 f2 = open("testI", "rb")
3.38 r2 = iixr.TermIndexReader(f2)
3.39 -f3 = open("testP", "rb")
3.40 -r3 = iixr.PositionReader(f3)
3.41 -f4 = open("testPI", "rb")
3.42 -r4 = iixr.PositionIndexReader(f4)
3.43 +r3 = iixr.PositionOpener("testP")
3.44 +r4 = iixr.PositionIndexOpener("testPI")
3.45 rp = iixr.PositionDictionaryReader(r3, r4)
3.46 rd = iixr.TermDictionaryReader(r, r2, rp)
3.47 terms_reversed = terms[:]
3.48 @@ -357,10 +352,8 @@
3.49 r = iixr.TermReader(f)
3.50 f2 = open("testI", "rb")
3.51 r2 = iixr.TermIndexReader(f2)
3.52 -f3 = open("testP", "rb")
3.53 -r3 = iixr.PositionReader(f3)
3.54 -f4 = open("testPI", "rb")
3.55 -r4 = iixr.PositionIndexReader(f4)
3.56 +r3 = iixr.PositionOpener("testP")
3.57 +r4 = iixr.PositionIndexOpener("testPI")
3.58 rp = iixr.PositionDictionaryReader(r3, r4)
3.59 rd = iixr.TermDictionaryReader(r, r2, rp)
3.60 terms_reversed = terms_with_positions[:]