# HG changeset patch # User Paul Boddie # Date 1252966558 -7200 # Node ID 8d41e2d063e91164d0d0ea9b187030c70e617a8a # Parent fad9698e2c46a12bb0e924adb588fb75874b0bb6 Removed old module. diff -r fad9698e2c46 -r 8d41e2d063e9 iixr.py --- a/iixr.py Tue Sep 15 00:15:11 2009 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,1876 +0,0 @@ -#!/usr/bin/env python - -""" -A simple (and sane) text indexing library. - -Copyright (C) 2009 Paul Boddie - -This program is free software; you can redistribute it and/or modify it under -the terms of the GNU General Public License as published by the Free Software -Foundation; either version 3 of the License, or (at your option) any later -version. - -This program is distributed in the hope that it will be useful, but WITHOUT ANY -WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A -PARTICULAR PURPOSE. See the GNU General Public License for more details. - -You should have received a copy of the GNU General Public License along -with this program. If not, see . -""" - -from os import dup, fdopen # independent iterator access to files -from os import listdir, mkdir # index and partition discovery -from os import remove, rename # partition manipulation -from os.path import exists, join -from os.path import commonprefix # to find common string prefixes -from bisect import bisect_right # to find terms in the dictionary index -import bz2, zlib # for field compression -from itermerge import itermerge - -try: - set -except NameError: - from sets import Set as set - -# Constants. - -TERM_INTERVAL = 100 -DOCUMENT_INTERVAL = 100 -FIELD_INTERVAL = 100 -FLUSH_INTERVAL = 10000 - -WRITE_CACHE_SIZE = 100000 -READ_CACHE_SIZE = 10000 -READ_CACHE_RESIZE = 5000 - -TERM_FILENAMES = "terms", "terms_index", "positions", "positions_index" -FIELD_FILENAMES = "fields", "fields_index" - -compressors = [("b", bz2.compress), ("z", zlib.compress)] -decompressors = {"b" : bz2.decompress, "z" : zlib.decompress} - -# Utility functions. - -try: - from vint import vint as _vint - - def vint(number): - - "Write 'number' as a variable-length integer." - - if number >= 0: - return _vint(number) - else: - raise ValueError, "Number %r is negative." % number - -except ImportError: - - def vint(number): - - "Write 'number' as a variable-length integer." - - if number >= 0: - - # Special case: one byte containing a 7-bit number. - - if number < 128: - return chr(number) - - # Write the number from least to most significant digits. - - bytes = [] - - while number != 0: - lsd = number & 127 - number = number >> 7 - if number != 0: - lsd |= 128 - bytes.append(chr(lsd)) - - return "".join(bytes) - - # Negative numbers are not supported. - - else: - raise ValueError, "Number %r is negative." % number - -# Foundation classes. - -class File: - - "A basic file abstraction." - - def __init__(self, f): - self.f = f - self.reset() - - def reset(self): - - "To be used to reset the state of the reader or writer between records." - - pass - - def rewind(self): - self.seek(0) - self.reset() - - def seek(self, offset): - - "To be defined by readers." - - pass - - def flush(self): - - "To be defined by writers." - - pass - - def close(self): - if self.f is not None: - self.flush() - self.f.close() - self.f = None - -class FileWriter(File): - - "Writing basic data types to files." - - def __init__(self, f): - File.__init__(self, f) - self.cache = [] - self.cache_length = 0 - - def write_number(self, number): - - "Write 'number' to the file using a variable length encoding." 
- - self.write(vint(number)) - - def write_string(self, s, compress=0): - - """ - Write 's' to the file, recording its length and compressing the string - if 'compress' is set to a true value. - """ - - # Convert Unicode objects to strings. - - if isinstance(s, unicode): - s = s.encode("utf-8") - - # Compress the string if requested. - - if compress: - for flag, fn in compressors: - cs = fn(s) - - # Take the first string shorter than the original. - - if len(cs) < len(s): - s = cs - break - else: - flag = "-" - - else: - flag = "" - - # Write the length of the data before the data itself. - - length = len(s) - self.write(flag + vint(length) + s) - - # Cache-affected methods. - - def write(self, s): - self.cache.append(s) - self.cache_length += len(s) - if self.cache_length >= WRITE_CACHE_SIZE: - self.flush() - - def tell(self): - return self.f.tell() + self.cache_length - - def flush(self): - self.f.write("".join(self.cache)) - self.cache = [] - self.cache_length = 0 - -class FileReader(File): - - "Reading basic data types from files." - - def __init__(self, f): - File.__init__(self, f) - self.reset_cache() - - def reset_cache(self): - self.cache = "" - self.cache_length = 0 - self.cache_start = 0 - - def read_number(self): - - "Read a number from the file." - - # Read each byte, adding it to the number. - - shift = 0 - number = 0 - read = self.read - - try: - csd = ord(read(1)) - while csd & 128: - number += ((csd & 127) << shift) - shift += 7 - csd = ord(read(1)) - else: - number += (csd << shift) - except TypeError: - raise EOFError - - return number - - def read_string(self, decompress=0): - - """ - Read a string from the file, decompressing the stored data if - 'decompress' is set to a true value. - """ - - # Decompress the data if requested. - - if decompress: - flag = self.read(1) - else: - flag = "-" - - length = self.read_number() - s = self.read(length) - - # Perform decompression if applicable. - - if flag != "-": - fn = decompressors[flag] - s = fn(s) - - # Convert strings to Unicode objects. - - return unicode(s, "utf-8") - - # Cache-affected methods. - - def read(self, n): - needed = n - (self.cache_length - self.cache_start) - - # Read the needed number of characters, if possible. - - if needed > 0: - s = self.f.read(max(needed, READ_CACHE_SIZE)) - self.cache += s - self.cache_length += len(s) - - # Get the end of the requested block. - - next_start = self.cache_start + n - s = self.cache[self.cache_start:next_start] - - # Reposition the pointer to the cache. - - self._seek_cache(len(s)) - return s - - def tell(self): - return self.f.tell() - self.cache_length + self.cache_start - - def seek(self, offset): - current = self.tell() - self.f.seek(offset) - - # If seeking forward, attempt to navigate the cache. - - if offset >= current: - self._seek_cache(offset - current) - else: - self.reset_cache() - - def _seek_cache(self, delta): - next_start = self.cache_start + delta - - if next_start > 0 and next_start >= len(self.cache): - self.reset_cache() - - # If the cache is too big, resize it. - - elif next_start > READ_CACHE_RESIZE: - self.cache = self.cache[next_start:] - self.cache_length = len(self.cache) - self.cache_start = 0 - - # Otherwise, just reference the next part of the cache. - - else: - self.cache_start = next_start - -class FileOpener: - - "Opening files using their filenames." 
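The vint() helper near the top of the file and FileReader.read_number() above implement the usual seven-bits-per-byte variable-length integer encoding, with the high bit of each byte acting as a continuation flag. A minimal round-trip sketch of that scheme, written as standalone Python 3 (the module itself targets Python 2, and these helper names are illustrative rather than taken from the file):

def encode_vint(number):
    # 7 data bits per byte; the high bit is set on every byte except the last.
    if number < 0:
        raise ValueError("negative numbers are not supported")
    out = bytearray()
    while True:
        lsd = number & 127
        number >>= 7
        if number:
            out.append(lsd | 128)   # continuation bit: more bytes follow
        else:
            out.append(lsd)
            return bytes(out)

def decode_vint(data):
    # Reverse of encode_vint: accumulate 7-bit groups, least significant first.
    number, shift = 0, 0
    for byte in data:
        number += (byte & 127) << shift
        if not byte & 128:
            break
        shift += 7
    return number

assert all(decode_vint(encode_vint(n)) == n for n in (0, 1, 127, 128, 300, 10**9))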
- - def __init__(self, filename): - self.filename = filename - - def open(self, mode): - return open(self.filename, mode) - - def close(self): - pass - -# Specific classes for storing term and position information. - -class PositionWriter(FileWriter): - - "Writing position information to files." - - def reset(self): - self.last_docnum = 0 - - def write_positions(self, docnum, positions): - - """ - Write for the document 'docnum' the given 'positions'. - Return the offset of the written record. - """ - - if docnum < self.last_docnum: - raise ValueError, "Document number %r is less than previous number %r." % (docnum, self.last_docnum) - - # Record the offset of this record. - - offset = self.tell() - - # Make sure that the positions are sorted. - - positions.sort() - - # Write the position deltas. - - output = [] - last = 0 - - for position in positions: - output.append(vint(position - last)) - last = position - - # Write the document number delta. - # Write the number of positions. - # Then write the positions. - - self.write(vint(docnum - self.last_docnum) + vint(len(positions)) + "".join(output)) - - self.last_docnum = docnum - return offset - -class PositionOpener(FileOpener): - - "Reading position information from files." - - def read_term_positions(self, offset, count): - - """ - Read all positions from 'offset', seeking to that position in the file - before reading. The number of documents available for reading is limited - to 'count'. - """ - - # Duplicate the file handle. - - f = self.open("rb") - return PositionIterator(f, offset, count) - -class PositionIndexWriter(FileWriter): - - "Writing position index information to files." - - def reset(self): - self.last_docnum = 0 - self.last_pos_offset = 0 - - def write_positions(self, docnum, pos_offset, count): - - """ - Write the given 'docnum, 'pos_offset' and document 'count' to the - position index file. - """ - - # Record the offset of this record. - - offset = self.tell() - output = [] - - # Write the document number delta. - - output.append(vint(docnum - self.last_docnum)) - self.last_docnum = docnum - - # Write the position file offset delta. - - output.append(vint(pos_offset - self.last_pos_offset)) - self.last_pos_offset = pos_offset - - # Write the document count. - - output.append(vint(count)) - - # Actually write the data. - - self.write("".join(output)) - - return offset - -class PositionIndexOpener(FileOpener): - - "Reading position index information from files." - - def read_term_positions(self, offset, doc_frequency): - - """ - Read all positions from 'offset', seeking to that position in the file - before reading. The number of documents available for reading is limited - to 'doc_frequency'. - """ - - # Duplicate the file handle. - - f = self.open("rb") - return PositionIndexIterator(f, offset, doc_frequency) - -# Iterators for position-related files. - -class IteratorBase: - - def __init__(self, count): - self.replenish(count) - - def replenish(self, count): - self.count = count - self.read_documents = 0 - - def __len__(self): - return self.count - - def sort(self): - pass # Stored document positions are already sorted. - - def __iter__(self): - return self - -class PositionIterator(FileReader, IteratorBase): - - "Iterating over document positions." 
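PositionWriter.write_positions() above stores one record per document: the delta from the previous document number, the number of positions, and then the sorted positions themselves as deltas, all as variable-length integers. A rough standalone sketch of that record layout (Python 3, illustrative names; the real method also tracks file offsets and a write cache):

def encode_vint(n):
    # Restates the seven-bit varint encoder sketched above.
    out = bytearray()
    while True:
        out.append((n & 127) | (128 if n >> 7 else 0))
        n >>= 7
        if not n:
            return bytes(out)

def encode_postings_record(docnum, last_docnum, positions):
    # One record per document: delta-coded document number, position count,
    # then the sorted positions, each stored as a delta from the previous one.
    positions = sorted(positions)
    parts = [encode_vint(docnum - last_docnum), encode_vint(len(positions))]
    last = 0
    for position in positions:
        parts.append(encode_vint(position - last))
        last = position
    return b"".join(parts)

# A term occurring in documents 3 and 10 of the same partition.
record_one = encode_postings_record(3, 0, [5, 9, 42])
record_two = encode_postings_record(10, 3, [7])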
- - def __init__(self, f, offset, count): - FileReader.__init__(self, f) - IteratorBase.__init__(self, count) - self.seek(offset) - - def reset(self): - self.last_docnum = 0 - - def read_positions(self): - - "Read positions, returning a document number and a list of positions." - - # Read the document number delta and add it to the last number. - - self.last_docnum += self.read_number() - - # Read the number of positions. - - npositions = self.read_number() - - # Read the position deltas, adding each previous position to get the - # appropriate collection of absolute positions. - - i = 0 - last = 0 - positions = [] - - while i < npositions: - last += self.read_number() - positions.append(last) - i += 1 - - return self.last_docnum, positions - - def next(self): - - "Read positions for a single document." - - if self.read_documents < self.count: - self.read_documents += 1 - return self.read_positions() - else: - raise StopIteration - -class PositionIndexIterator(FileReader, IteratorBase): - - "Iterating over document positions." - - def __init__(self, f, offset, count): - FileReader.__init__(self, f) - IteratorBase.__init__(self, count) - self.seek(offset) - self.section_count = 0 - - def reset(self): - self.last_docnum = 0 - self.last_pos_offset = 0 - - def read_positions(self): - - """ - Read a document number, a position file offset for the position index - file, and the number of documents in a section of that file. - """ - - # Read the document number delta. - - self.last_docnum += self.read_number() - - # Read the offset delta. - - self.last_pos_offset += self.read_number() - - # Read the document count. - - count = self.read_number() - - return self.last_docnum, self.last_pos_offset, count - - def next(self): - - "Read positions for a single document." - - self.read_documents += self.section_count - if self.read_documents < self.count: - docnum, pos_offset, self.section_count = t = self.read_positions() - return t - else: - raise StopIteration - -class PositionDictionaryWriter: - - "Writing position dictionaries." - - def __init__(self, position_writer, position_index_writer, interval): - self.position_writer = position_writer - self.position_index_writer = position_index_writer - self.interval = interval - - def write_term_positions(self, doc_positions): - - """ - Write all 'doc_positions' - a collection of tuples of the form (document - number, position list) - to the file. - - Add some records to the index, making dictionary entries. - - Return a tuple containing the offset of the written data, the frequency - (number of positions), and document frequency (number of documents) for - the term involved. - """ - - # Reset the writers. - - self.position_writer.reset() - self.position_index_writer.reset() - - index_offset = None - - # Write the positions. - - frequency = 0 - first_docnum = None - first_offset = None - count = 0 - - doc_positions.sort() - - for docnum, positions in doc_positions: - pos_offset = self.position_writer.write_positions(docnum, positions) - - # Retain the first record offset for a subsequent index entry. - - if first_offset is None: - first_offset = pos_offset - first_docnum = docnum - - frequency += len(positions) - count += 1 - - # Every {interval} entries, write an index entry. - - if count % self.interval == 0: - io = self.position_index_writer.write_positions(first_docnum, first_offset, self.interval) - - # Remember the first index entry offset. 
- - if index_offset is None: - index_offset = io - - first_offset = None - first_docnum = None - - # Reset the position writer so that position readers accessing - # a section start with the correct document number. - - self.position_writer.reset() - - # Finish writing an index entry for the remaining documents. - - else: - if first_offset is not None: - io = self.position_index_writer.write_positions(first_docnum, first_offset, count % self.interval) - - # Remember the first index entry offset. - - if index_offset is None: - index_offset = io - - return index_offset, frequency, count - - def close(self): - self.position_writer.close() - self.position_index_writer.close() - -class PositionDictionaryReader: - - "Reading position dictionaries." - - def __init__(self, position_opener, position_index_opener): - self.position_opener = position_opener - self.position_index_opener = position_index_opener - - def read_term_positions(self, offset, doc_frequency): - - """ - Return an iterator for dictionary entries starting at 'offset' with the - given 'doc_frequency'. - """ - - return PositionDictionaryIterator(self.position_opener, - self.position_index_opener, offset, doc_frequency) - - def close(self): - pass - -class PositionDictionaryIterator: - - "Iteration over position dictionary entries." - - def __init__(self, position_opener, position_index_opener, offset, doc_frequency): - self.position_opener = position_opener - self.doc_frequency = doc_frequency - self.index_iterator = position_index_opener.read_term_positions(offset, doc_frequency) - self.iterator = None - - # Remember the last values. - - self.found_docnum, self.found_positions = None, None - - # Maintain state for the next index entry, if read. - - self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None - - # Initialise the current index entry and current position file iterator. - - self._next_section() - self._init_section() - - # Sequence methods. - - def __len__(self): - return self.doc_frequency - - def sort(self): - pass - - # Iterator methods. - - def __iter__(self): - return self - - def next(self): - - """ - Attempt to get the next document record from the section in the - positions file. - """ - - # Return any visited but unrequested record. - - if self.found_docnum is not None: - t = self.found_docnum, self.found_positions - self.found_docnum, self.found_positions = None, None - return t - - # Or search for the next record. - - while 1: - - # Either return the next record. - - try: - return self.iterator.next() - - # Or, where a section is finished, get the next section and try again. - - except StopIteration: - - # Where a section follows, update the index iterator, but keep - # reading using the same file iterator (since the data should - # just follow on from the last section). - - self._next_section() - self.iterator.replenish(self.section_count) - - # Reset the state of the iterator to make sure that document - # numbers are correct. - - self.iterator.reset() - - def from_document(self, docnum): - - """ - Attempt to navigate to a positions entry for the given 'docnum', - returning the positions for 'docnum', or None otherwise. - """ - - # Return any unrequested document positions. - - if docnum == self.found_docnum: - return self.found_positions - - # Read ahead in the index until the next entry refers to a document - # later than the desired document. 
- - try: - if self.next_docnum is None: - self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next() - - # Read until the next entry is after the desired document number, - # or until the end of the results. - - while self.next_docnum <= docnum: - self._next_read_section() - if self.docnum < docnum: - self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next() - else: - break - - except StopIteration: - pass - - # Navigate in the position file to the document. - - self._init_section() - - try: - while 1: - found_docnum, found_positions = self.iterator.next() - - # Return the desired document positions or None (retaining the - # positions for the document immediately after). - - if docnum == found_docnum: - return found_positions - elif docnum < found_docnum: - self.found_docnum, self.found_positions = found_docnum, found_positions - return None - - except StopIteration: - return None - - # Internal methods. - - def _next_section(self): - - "Attempt to get the next section in the index." - - if self.next_docnum is None: - self.docnum, self.pos_offset, self.section_count = self.index_iterator.next() - else: - self._next_read_section() - - def _next_read_section(self): - - """ - Make the next index entry the current one without reading from the - index. - """ - - self.docnum, self.pos_offset, self.section_count = self.next_docnum, self.next_pos_offset, self.next_section_count - self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None - - def _init_section(self): - - "Initialise the iterator for the section in the position file." - - if self.iterator is not None: - self.iterator.close() - self.iterator = self.position_opener.read_term_positions(self.pos_offset, self.section_count) - - def close(self): - if self.iterator is not None: - self.iterator.close() - self.iterator = None - if self.index_iterator is not None: - self.index_iterator.close() - self.index_iterator = None - -class TermWriter(FileWriter): - - "Writing term information to files." - - def reset(self): - self.last_term = "" - self.last_offset = 0 - - def write_term(self, term, offset, frequency, doc_frequency): - - """ - Write the given 'term', its position file 'offset', its 'frequency' and - its 'doc_frequency' (number of documents in which it appears) to the - term information file. Return the offset after the term information was - written to the file. - """ - - # Write the prefix length and term suffix. - - common = len(commonprefix([self.last_term, term])) - suffix = term[common:] - - self.write_number(common) - self.write_string(suffix) - - # Write the offset delta. - - self.write_number(offset - self.last_offset) - - # Write the frequency. - - self.write_number(frequency) - - # Write the document frequency. - - self.write_number(doc_frequency) - - self.last_term = term - self.last_offset = offset - - return self.tell() - -class TermReader(FileReader): - - "Reading term information from files." - - def reset(self): - self.last_term = "" - self.last_offset = 0 - - def read_term(self): - - """ - Read a term, its position file offset, its frequency and its document - frequency from the term information file. - """ - - # Read the prefix length and term suffix. - - common = self.read_number() - suffix = self.read_string() - - self.last_term = self.last_term[:common] + suffix - - # Read the offset delta. - - self.last_offset += self.read_number() - - # Read the frequency. 
- - frequency = self.read_number() - - # Read the document frequency. - - doc_frequency = self.read_number() - - return self.last_term, self.last_offset, frequency, doc_frequency - - def go_to_term(self, term, offset, info_offset): - - """ - Seek past the entry for 'term' having 'offset' to 'info_offset'. This - permits the scanning for later terms from the specified term. - """ - - self.seek(info_offset) - self.last_term = term - self.last_offset = offset - -class TermIndexWriter(TermWriter): - - "Writing term dictionary index details to files." - - def reset(self): - TermWriter.reset(self) - self.last_info_offset = 0 - - def write_term(self, term, offset, frequency, doc_frequency, info_offset): - - """ - Write the given 'term', its position file 'offset', its 'frequency' and - its 'doc_frequency' to the term dictionary index file, along with the - 'info_offset' in the term information file. - """ - - TermWriter.write_term(self, term, offset, frequency, doc_frequency) - - # Write the information file offset delta. - - self.write_number(info_offset - self.last_info_offset) - self.last_info_offset = info_offset - -class TermIndexReader(TermReader): - - "Reading term dictionary index details from files." - - def reset(self): - TermReader.reset(self) - self.last_info_offset = 0 - - def read_term(self): - - """ - Read a term, its position file offset, its frequency, its document - frequency and a term information file offset from the term dictionary - index file. - """ - - term, offset, frequency, doc_frequency = TermReader.read_term(self) - - # Read the offset delta. - - self.last_info_offset += self.read_number() - - return term, offset, frequency, doc_frequency, self.last_info_offset - -class TermDictionaryWriter: - - "Writing term dictionaries." - - def __init__(self, info_writer, index_writer, position_dict_writer, interval): - self.info_writer = info_writer - self.index_writer = index_writer - self.position_dict_writer = position_dict_writer - self.interval = interval - self.entry = 0 - - def _write_term(self, term, offset, frequency, doc_frequency): - - """ - Write the given 'term', its position file 'offset', its 'frequency' and - its 'doc_frequency' (number of documents in which it appears) to the - term information file. Return the offset after the term information was - written to the file. - """ - - info_offset = self.info_writer.write_term(term, offset, frequency, doc_frequency) - - if self.entry % self.interval == 0: - self.index_writer.write_term(term, offset, frequency, doc_frequency, info_offset) - - self.entry += 1 - - def write_term_positions(self, term, doc_positions): - - """ - Write the given 'term' and the 'doc_positions' recording the documents - and positions at which the term is found. - """ - - offset, frequency, doc_frequency = self.position_dict_writer.write_term_positions(doc_positions) - self._write_term(term, offset, frequency, doc_frequency) - - def close(self): - self.info_writer.close() - self.index_writer.close() - self.position_dict_writer.close() - -class TermDictionaryReader: - - "Reading term dictionaries." - - def __init__(self, info_reader, index_reader, position_dict_reader): - self.info_reader = info_reader - self.index_reader = index_reader - self.position_dict_reader = position_dict_reader - - self.terms = [] - try: - while 1: - self.terms.append(self.index_reader.read_term()) - except EOFError: - pass - - # Large numbers for ordering purposes. 
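TermWriter.write_term() and TermReader.read_term() above front-code the sorted term list: each entry records only the length of the prefix shared with the previous term plus the remaining suffix, alongside delta-coded offsets and the term's frequencies. A small sketch of just the prefix compression, assuming sorted input (Python 3, hypothetical function names):

from os.path import commonprefix

def front_code(terms):
    # Sorted terms share long prefixes: store (shared-prefix length, suffix) pairs.
    last = ""
    for term in terms:
        common = len(commonprefix([last, term]))
        yield common, term[common:]
        last = term

def front_decode(pairs):
    # Rebuild each term from the previous one plus the stored suffix.
    last = ""
    for common, suffix in pairs:
        last = last[:common] + suffix
        yield last

terms = ["index", "indexer", "indexing", "inverted"]
assert list(front_decode(front_code(terms))) == terms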
- - if self.terms: - self.max_offset = self.terms[-1][1] + 1 - else: - self.max_offset = None - - def _find_closest_entry(self, term): - - """ - Find the offsets and frequencies of 'term' from the term dictionary or - the closest term starting with the value of 'term'. - - Return the closest index entry consisting of a term, the position file - offset, the term frequency, the document frequency, and the term details - file offset. - """ - - i = bisect_right(self.terms, (term, self.max_offset, 0, 0)) - 1 - - # Get the entry position providing the term or one preceding it. - # If no entry precedes the requested term, return the very first entry - # as the closest. - - if i == -1: - return self.terms[0] - else: - return self.terms[i] - - def _find_closest_term(self, term): - - """ - Find the offsets and frequencies of 'term' from the term dictionary or - the closest term starting with the value of 'term'. - - Return the closest term (or the term itself), the position file offset, - the term frequency, the document frequency, and the term details file - offset (or None if the reader is already positioned). - """ - - found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_entry(term) - - # Where the term is found immediately, return the offset and - # frequencies. If the term does not appear, return the details of the - # closest entry. - - if term <= found_term: - return found_term, offset, frequency, doc_frequency, info_offset - - # Otherwise, seek past the index term's entry in the information file - # and scan for the desired term. - - else: - self.info_reader.go_to_term(found_term, offset, info_offset) - try: - while term > found_term: - found_term, offset, frequency, doc_frequency = self.info_reader.read_term() - except EOFError: - pass - - return found_term, offset, frequency, doc_frequency, None - - def _find_term(self, term): - - """ - Find the position file offset and frequency of 'term' from the term - dictionary. - """ - - found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_term(term) - - # If the term is found, return the offset and frequencies. - - if term == found_term: - return offset, frequency, doc_frequency - else: - return None - - def _get_positions(self, offset, doc_frequency): - return self.position_dict_reader.read_term_positions(offset, doc_frequency) - - # Iterator convenience methods. - - def __iter__(self): - self.rewind() - return self - - def next(self): - try: - return self.read_term() - except EOFError: - raise StopIteration - - # Sequential access methods. - - def rewind(self): - self.info_reader.rewind() - - def read_term(self): - - """ - Return the next term, its frequency, its document frequency, and the - documents and positions at which the term is found. - """ - - term, offset, frequency, doc_frequency = self.info_reader.read_term() - positions = self._get_positions(offset, doc_frequency) - return term, frequency, doc_frequency, positions - - # Query methods. - - def find_terms(self, term): - - "Return all terms whose values start with the value of 'term'." - - terms = [] - - found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_term(term) - - # Position the reader, if necessary. - - if info_offset is not None: - self.info_reader.go_to_term(found_term, offset, info_offset) - - # Read and record terms. - - try: - # Add the found term if it starts with the specified term. 
- - while found_term.startswith(term): - terms.append(found_term) - found_term, offset, frequency, doc_frequency = self.info_reader.read_term() - - except EOFError: - pass - - return terms - - def find_positions(self, term): - - "Return the documents and positions at which the given 'term' is found." - - t = self._find_term(term) - if t is None: - return None - else: - offset, frequency, doc_frequency = t - return self._get_positions(offset, doc_frequency) - - def get_frequency(self, term): - - "Return the frequency of the given 'term'." - - t = self._find_term(term) - if t is None: - return None - else: - offset, frequency, doc_frequency = t - return frequency - - def get_document_frequency(self, term): - - "Return the document frequency of the given 'term'." - - t = self._find_term(term) - if t is None: - return None - else: - offset, frequency, doc_frequency = t - return doc_frequency - - def close(self): - self.info_reader.close() - self.index_reader.close() - self.position_dict_reader.close() - -# Specific classes for storing document information. - -class FieldWriter(FileWriter): - - "Writing field data to files." - - def reset(self): - self.last_docnum = 0 - - def write_fields(self, docnum, fields): - - """ - Write for the given 'docnum', a list of 'fields' (integer, string pairs - representing field identifiers and values respectively). - Return the offset at which the fields are stored. - """ - - offset = self.tell() - - # Write the document number delta. - - self.write_number(docnum - self.last_docnum) - - # Write the number of fields. - - self.write_number(len(fields)) - - # Write the fields themselves. - - for i, field in fields: - self.write_number(i) - self.write_string(field, 1) # compress - - self.last_docnum = docnum - return offset - -class FieldReader(FileReader): - - "Reading field data from files." - - def reset(self): - self.last_docnum = 0 - - def read_fields(self): - - """ - Read fields from the file, returning a tuple containing the document - number and a list of field (identifier, value) pairs. - """ - - # Read the document number. - - self.last_docnum += self.read_number() - - # Read the number of fields. - - nfields = self.read_number() - - # Collect the fields. - - fields = [] - i = 0 - - while i < nfields: - identifier = self.read_number() - value = self.read_string(1) # decompress - fields.append((identifier, value)) - i += 1 - - return self.last_docnum, fields - - def read_document_fields(self, docnum, offset): - - """ - Read fields for 'docnum' at the given 'offset'. This permits the - retrieval of details for the specified document, as well as scanning for - later documents. - """ - - self.seek(offset) - bad_docnum, fields = self.read_fields() - self.last_docnum = docnum - return docnum, fields - -class FieldIndexWriter(FileWriter): - - "Writing field index details to files." - - def reset(self): - self.last_docnum = 0 - self.last_offset = 0 - - def write_document(self, docnum, offset): - - """ - Write for the given 'docnum', the 'offset' at which the fields for the - document are stored in the fields file. - """ - - # Write the document number and offset deltas. - - self.write_number(docnum - self.last_docnum) - self.write_number(offset - self.last_offset) - - self.last_docnum = docnum - self.last_offset = offset - -class FieldIndexReader(FileReader): - - "Reading field index details from files." - - def reset(self): - self.last_docnum = 0 - self.last_offset = 0 - - def read_document(self): - - "Read a document number and field file offset." 
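Field values reach the file through write_string(..., compress=1), shown earlier, which tries bz2 and zlib in turn, keeps the first result that is shorter than the raw text, and records a one-byte flag ('b', 'z' or '-') so the reader knows how to reverse it. A standalone sketch of that choice (Python 3; the module additionally prefixes the stored data with its vint-encoded length):

import bz2, zlib

COMPRESSORS = [(b"b", bz2.compress), (b"z", zlib.compress)]
DECOMPRESSORS = {b"b": bz2.decompress, b"z": zlib.decompress}

def pack_field(value):
    # Keep the first compressed form that is actually shorter; otherwise store raw.
    data = value.encode("utf-8")
    for flag, fn in COMPRESSORS:
        compressed = fn(data)
        if len(compressed) < len(data):
            return flag + compressed
    return b"-" + data

def unpack_field(blob):
    flag, data = blob[:1], blob[1:]
    if flag != b"-":
        data = DECOMPRESSORS[flag](data)
    return data.decode("utf-8")

assert unpack_field(pack_field("short")) == "short"
assert unpack_field(pack_field("long " * 100)) == "long " * 100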
- - # Read the document number delta and offset. - - self.last_docnum += self.read_number() - self.last_offset += self.read_number() - - return self.last_docnum, self.last_offset - -class FieldDictionaryWriter: - - "Writing field dictionary details." - - def __init__(self, field_writer, field_index_writer, interval): - self.field_writer = field_writer - self.field_index_writer = field_index_writer - self.interval = interval - self.entry = 0 - - def write_fields(self, docnum, fields): - - "Write details of the document with the given 'docnum' and 'fields'." - - offset = self.field_writer.write_fields(docnum, fields) - - if self.entry % self.interval == 0: - self.field_index_writer.write_document(docnum, offset) - - self.entry += 1 - - def close(self): - self.field_writer.close() - self.field_index_writer.close() - -class FieldDictionaryReader: - - "Reading field dictionary details." - - def __init__(self, field_reader, field_index_reader): - self.field_reader = field_reader - self.field_index_reader = field_index_reader - - self.docs = [] - try: - while 1: - self.docs.append(self.field_index_reader.read_document()) - except EOFError: - pass - - # Large numbers for ordering purposes. - - if self.docs: - self.max_offset = self.docs[-1][1] - else: - self.max_offset = None - - # Iterator convenience methods. - - def __iter__(self): - self.rewind() - return self - - def next(self): - try: - return self.read_fields() - except EOFError: - raise StopIteration - - # Sequential access methods. - - def rewind(self): - self.field_reader.rewind() - - def read_fields(self): - - "Return the next document number and fields." - - return self.field_reader.read_fields() - - # Random access methods. - - def get_fields(self, docnum): - - "Read the fields of the document with the given 'docnum'." - - i = bisect_right(self.docs, (docnum, self.max_offset)) - 1 - - # Get the entry position providing the term or one preceding it. - - if i == -1: - return None - - found_docnum, offset = self.docs[i] - - # Read from the fields file. - - found_docnum, fields = self.field_reader.read_document_fields(found_docnum, offset) - - # Scan for the document, if necessary. - - try: - while docnum > found_docnum: - found_docnum, fields = self.field_reader.read_fields() - except EOFError: - pass - - # If the document is found, return the fields. - - if docnum == found_docnum: - return fields - else: - return None - - def close(self): - self.field_reader.close() - self.field_index_reader.close() - -# Dictionary merging classes. - -class Merger: - - "Merge files." - - def __init__(self, writer, readers): - self.writer = writer - self.readers = readers - - def close(self): - for reader in self.readers: - reader.close() - self.writer.close() - -class TermDictionaryMerger(Merger): - - "Merge term and position files." - - def merge(self): - - """ - Merge terms and positions from the readers, sending them to the writer. - """ - - last_term = None - current_readers = [] - - for term, frequency, doc_frequency, positions in itermerge(self.readers): - if term == last_term: - current_readers.append(positions) - else: - if current_readers: - self.writer.write_term_positions(last_term, itermerge(current_readers)) - last_term = term - current_readers = [positions] - else: - if current_readers: - self.writer.write_term_positions(last_term, itermerge(current_readers)) - -class FieldDictionaryMerger(Merger): - - "Merge field files." - - def merge(self): - - """ - Merge fields from the readers, sending them to the writer. 
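TermDictionaryMerger above pulls (term, frequency, doc_frequency, positions) tuples from several sorted partition readers, merges them into one ordered stream via the external itermerge helper, and combines the position streams of entries that share a term. itermerge itself is not part of this patch; for already-sorted inputs the standard-library heapq.merge behaves the same way, as in this simplified sketch using toy (term, payload) pairs:

from heapq import merge

def merge_sorted_partitions(readers):
    # Group consecutive entries for the same term from several sorted readers.
    last_term, grouped = None, []
    for term, payload in merge(*readers):
        if term == last_term:
            grouped.append(payload)
        else:
            if grouped:
                yield last_term, grouped
            last_term, grouped = term, [payload]
    if grouped:
        yield last_term, grouped

partition_a = [("cat", [1]), ("dog", [2])]
partition_b = [("cat", [5]), ("fish", [7])]
print(list(merge_sorted_partitions([partition_a, partition_b])))
# [('cat', [[1], [5]]), ('dog', [[2]]), ('fish', [[7]])]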
- """ - - for docnum, fields in itermerge(self.readers): - self.writer.write_fields(docnum, fields) - -# Utility functions. - -def get_term_writer(pathname, partition, interval, doc_interval): - - """ - Return a term dictionary writer using files under the given 'pathname' - labelled according to the given 'partition', using the given indexing - 'interval' for terms and 'doc_interval' for document position records. - """ - - tdf = open(join(pathname, "terms-%s" % partition), "wb") - info_writer = TermWriter(tdf) - - tdif = open(join(pathname, "terms_index-%s" % partition), "wb") - index_writer = TermIndexWriter(tdif) - - tpf = open(join(pathname, "positions-%s" % partition), "wb") - positions_writer = PositionWriter(tpf) - - tpif = open(join(pathname, "positions_index-%s" % partition), "wb") - positions_index_writer = PositionIndexWriter(tpif) - - positions_dict_writer = PositionDictionaryWriter(positions_writer, positions_index_writer, doc_interval) - - return TermDictionaryWriter(info_writer, index_writer, positions_dict_writer, interval) - -def get_field_writer(pathname, partition, interval): - - """ - Return a field dictionary writer using files under the given 'pathname' - labelled according to the given 'partition', using the given indexing - 'interval'. - """ - - ff = open(join(pathname, "fields-%s" % partition), "wb") - field_writer = FieldWriter(ff) - - fif = open(join(pathname, "fields_index-%s" % partition), "wb") - field_index_writer = FieldIndexWriter(fif) - - return FieldDictionaryWriter(field_writer, field_index_writer, interval) - -def get_term_reader(pathname, partition): - - """ - Return a term dictionary reader using files under the given 'pathname' - labelled according to the given 'partition'. - """ - - tdf = open(join(pathname, "terms-%s" % partition), "rb") - info_reader = TermReader(tdf) - - tdif = open(join(pathname, "terms_index-%s" % partition), "rb") - index_reader = TermIndexReader(tdif) - - positions_opener = PositionOpener(join(pathname, "positions-%s" % partition)) - positions_index_opener = PositionIndexOpener(join(pathname, "positions_index-%s" % partition)) - - positions_dict_reader = PositionDictionaryReader(positions_opener, positions_index_opener) - - return TermDictionaryReader(info_reader, index_reader, positions_dict_reader) - -def get_field_reader(pathname, partition): - - """ - Return a field dictionary reader using files under the given 'pathname' - labelled according to the given 'partition'. 
- """ - - ff = open(join(pathname, "fields-%s" % partition), "rb") - field_reader = FieldReader(ff) - - fif = open(join(pathname, "fields_index-%s" % partition), "rb") - field_index_reader = FieldIndexReader(fif) - - return FieldDictionaryReader(field_reader, field_index_reader) - -def rename_files(pathname, names, from_partition, to_partition): - for name in names: - rename(join(pathname, "%s-%s" % (name, from_partition)), join(pathname, "%s-%s" % (name, to_partition))) - -def rename_term_files(pathname, from_partition, to_partition): - rename_files(pathname, TERM_FILENAMES, from_partition, to_partition) - -def rename_field_files(pathname, from_partition, to_partition): - rename_files(pathname, FIELD_FILENAMES, from_partition, to_partition) - -def remove_files(pathname, names, partition): - for name in names: - remove(join(pathname, "%s-%s" % (name, partition))) - -def remove_term_files(pathname, partition): - remove_files(pathname, TERM_FILENAMES, partition) - -def remove_field_files(pathname, partition): - remove_files(pathname, FIELD_FILENAMES, partition) - -# High-level classes. - -class Document: - - "A container of document information." - - def __init__(self, docnum): - self.docnum = docnum - self.fields = [] - self.terms = {} - - def add_position(self, term, position): - - """ - Add a position entry for the given 'term', indicating the given - 'position'. - """ - - self.terms.setdefault(term, []).append(position) - - def add_field(self, identifier, value): - - "Add a field having the given 'identifier' and 'value'." - - self.fields.append((identifier, unicode(value))) # convert to string - - def set_fields(self, fields): - - """ - Set the document's 'fields': a list of tuples each containing an integer - identifier and a string value. - """ - - self.fields = fields - -class IndexWriter: - - """ - Building term information and writing it to the term and field dictionaries. - """ - - def __init__(self, pathname, interval, doc_interval, flush_interval): - self.pathname = pathname - self.interval = interval - self.doc_interval = doc_interval - self.flush_interval = flush_interval - - self.dict_partition = 0 - self.field_dict_partition = 0 - - self.terms = {} - self.docs = {} - - self.doc_counter = 0 - - def add_document(self, doc): - - """ - Add the given document 'doc', updating the document counter and flushing - terms and fields if appropriate. - """ - - for term, positions in doc.terms.items(): - self.terms.setdefault(term, {})[doc.docnum] = positions - - self.docs[doc.docnum] = doc.fields - - self.doc_counter += 1 - if self.flush_interval and self.doc_counter >= self.flush_interval: - self.flush_terms() - self.flush_fields() - self.doc_counter = 0 - - def get_term_writer(self): - - "Return a term dictionary writer for the current partition." - - return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval) - - def get_field_writer(self): - - "Return a field dictionary writer for the current partition." - - return get_field_writer(self.pathname, self.field_dict_partition, self.interval) - - def flush_terms(self): - - "Flush terms into the current term dictionary partition." - - # Get the terms in order. 
- - all_terms = self.terms - terms = all_terms.keys() - terms.sort() - - dict_writer = self.get_term_writer() - - for term in terms: - doc_positions = all_terms[term].items() - dict_writer.write_term_positions(term, doc_positions) - - dict_writer.close() - - self.terms = {} - self.dict_partition += 1 - - def flush_fields(self): - - "Flush fields into the current term dictionary partition." - - # Get the documents in order. - - docs = self.docs.items() - docs.sort() - - field_dict_writer = self.get_field_writer() - - for docnum, fields in docs: - field_dict_writer.write_fields(docnum, fields) - - field_dict_writer.close() - - self.docs = {} - self.field_dict_partition += 1 - - def close(self): - if self.terms: - self.flush_terms() - if self.docs: - self.flush_fields() - -class IndexReader: - - "Accessing the term and field dictionaries." - - def __init__(self, pathname): - self.dict_reader = get_term_reader(pathname, "merged") - self.field_dict_reader = get_field_reader(pathname, "merged") - - def find_terms(self, term): - return self.dict_reader.find_terms(term) - - def find_positions(self, term): - return self.dict_reader.find_positions(term) - - def get_frequency(self, term): - return self.dict_reader.get_frequency(term) - - def get_document_frequency(self, term): - return self.dict_reader.get_document_frequency(term) - - def get_fields(self, docnum): - return self.field_dict_reader.get_fields(docnum) - - def close(self): - self.dict_reader.close() - self.field_dict_reader.close() - -class Index: - - "An inverted index solution encapsulating the various components." - - def __init__(self, pathname): - self.pathname = pathname - self.reader = None - self.writer = None - - def get_writer(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, flush_interval=FLUSH_INTERVAL): - - """ - Return a writer, optionally using the given indexing 'interval', - 'doc_interval' and 'flush_interval'. - """ - - if not exists(self.pathname): - mkdir(self.pathname) - - self.writer = IndexWriter(self.pathname, interval, doc_interval, flush_interval) - return self.writer - - def get_reader(self, partition=0): - - "Return a reader for the index." - - # Ensure that only one partition exists. - - self.merge() - return self._get_reader(partition) - - def _get_reader(self, partition): - - "Return a reader for the index." - - if not exists(self.pathname): - raise OSError, "Index path %r does not exist." % self.pathname - - self.reader = IndexReader(self.pathname) - return self.reader - - def merge(self): - - "Merge/optimise index partitions." - - self.merge_terms() - self.merge_fields() - - def merge_terms(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL): - - """ - Merge term dictionaries using the given indexing 'interval' and - 'doc_interval'. - """ - - readers = [] - partitions = set() - - for filename in listdir(self.pathname): - if filename.startswith("terms-"): # 6 character prefix - partition = filename[6:] - readers.append(get_term_reader(self.pathname, partition)) - partitions.add(partition) - - # Write directly to a dictionary. - - if len(readers) > 1: - if "merged" in partitions: - rename_term_files(self.pathname, "merged", "old-merged") - partitions.remove("merged") - partitions.add("old-merged") - - writer = get_term_writer(self.pathname, "merged", interval, doc_interval) - merger = TermDictionaryMerger(writer, readers) - merger.merge() - merger.close() - - # Remove old files. 
-
-            for partition in partitions:
-                remove_term_files(self.pathname, partition)
-
-        elif len(readers) == 1:
-            partition = list(partitions)[0]
-            if partition != "merged":
-                rename_term_files(self.pathname, partition, "merged")
-
-    def merge_fields(self, interval=FIELD_INTERVAL):
-
-        "Merge field dictionaries using the given indexing 'interval'."
-
-        readers = []
-        partitions = set()
-
-        for filename in listdir(self.pathname):
-            if filename.startswith("fields-"): # 7 character prefix
-                partition = filename[7:]
-                readers.append(get_field_reader(self.pathname, partition))
-                partitions.add(partition)
-
-        # Write directly to a dictionary.
-
-        if len(readers) > 1:
-            if "merged" in partitions:
-                rename_field_files(self.pathname, "merged", "old-merged")
-                partitions.remove("merged")
-                partitions.add("old-merged")
-
-            writer = get_field_writer(self.pathname, "merged", interval)
-            merger = FieldDictionaryMerger(writer, readers)
-            merger.merge()
-            merger.close()
-
-            # Remove old files.
-
-            for partition in partitions:
-                remove_field_files(self.pathname, partition)
-
-        elif len(readers) == 1:
-            partition = list(partitions)[0]
-            if partition != "merged":
-                rename_field_files(self.pathname, partition, "merged")
-
-    def close(self):
-        if self.reader is not None:
-            self.reader.close()
-            self.reader = None
-        if self.writer is not None:
-            self.writer.close()
-            self.writer = None
-
-# vim: tabstop=4 expandtab shiftwidth=4
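For reference, the module removed by this changeset was driven through the Index class defined at its end. A usage sketch of that API as it stood before removal (the index path and the field identifier are invented for illustration; the module itself requires Python 2):

from iixr import Index, Document

index = Index("/tmp/iixr-example")          # hypothetical index directory

# Index one document with a stored field and two term positions.
writer = index.get_writer()
doc = Document(1)
doc.add_field(0, u"A simple text indexing library")  # field 0: title (made up)
doc.add_position(u"indexing", 3)
doc.add_position(u"library", 4)
writer.add_document(doc)
index.close()                               # flushes any buffered terms and fields

# Reading merges outstanding partitions, then opens the merged dictionaries.
reader = index.get_reader()
matches = reader.find_terms(u"index")       # terms starting with "index"
postings = reader.find_positions(u"library")
for docnum, positions in postings:
    fields = reader.get_fields(docnum)      # e.g. [(0, u"A simple text indexing library")]
index.close()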