# HG changeset patch # User Paul Boddie # Date 1290293776 -3600 # Node ID 6dd92daca068f82954f6de43ea902f474a324fae # Parent 1cccc03f183e23065a05b0ce35d46bf99a7c6c34 Introduced code to handle index merging where a large number of partitions exist, combining the term and field dictionary merging into a common method which is then parameterised for each kind of data. diff -r 1cccc03f183e -r 6dd92daca068 iixr/index.py --- a/iixr/index.py Sat Nov 20 23:54:05 2010 +0100 +++ b/iixr/index.py Sat Nov 20 23:56:16 2010 +0100 @@ -20,6 +20,7 @@ from iixr.filesystem import * from iixr.merging import * +from itertools import islice from os import listdir, mkdir # index and partition discovery from os.path import exists @@ -34,6 +35,7 @@ DOCUMENT_INTERVAL = 100 FIELD_INTERVAL = 100 FLUSH_INTERVAL = 10000 +OPEN_PARTITIONS = 20 # High-level classes. @@ -220,13 +222,14 @@ "An inverted index solution encapsulating the various components." def __init__(self, pathname, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, field_interval=FIELD_INTERVAL, - flush_interval=FLUSH_INTERVAL): + flush_interval=FLUSH_INTERVAL, open_partitions=OPEN_PARTITIONS): self.pathname = pathname self.interval = interval self.doc_interval = doc_interval self.field_interval = field_interval self.flush_interval = flush_interval + self.open_partitions = open_partitions self.reader = None self.writer = None @@ -296,71 +299,71 @@ self._merge_terms() self._merge_fields() - def _merge_terms(self): + def _merge_dictionaries(self, get_partitions, rename_files, remove_files, get_reader, get_writer, get_merger, intervals): + + "Merge term or field dictionaries." - "Merge term dictionaries." + partitions = get_partitions() + + # Ensure the correct labelling of a single partition. - readers = [] - partitions = self.get_term_partitions() + if len(partitions) == 1: + partition = list(partitions)[0] + if partition != "merged": + rename_files(self.pathname, partition, "merged") + return - for partition in partitions: - readers.append(get_term_reader(self.pathname, partition)) + # Merge the partitions. + + old_merged_counter = 0 - # Write directly to a dictionary. + while len(partitions) > 1: - if len(readers) > 1: if "merged" in partitions: - rename_term_files(self.pathname, "merged", "old-merged") + rename_files(self.pathname, "merged", "old-merged-%d" % old_merged_counter) partitions.remove("merged") - partitions.add("old-merged") + partitions.add("old-merged-%d" % old_merged_counter) + old_merged_counter += 1 + + # Process only a certain number at once, avoiding resource limits. + + active_partitions = list(islice(partitions, self.open_partitions)) - writer = get_term_writer(self.pathname, "merged", self.interval, self.doc_interval) - merger = TermDictionaryMerger(writer, readers) + readers = [] + for partition in active_partitions: + readers.append(get_reader(self.pathname, partition)) + + # Write directly to a dictionary. + + writer = get_writer(self.pathname, "merged", *intervals) + merger = get_merger(writer, readers) merger.merge() merger.close() # Remove old files. - for partition in partitions: - remove_term_files(self.pathname, partition) + for partition in active_partitions: + remove_files(self.pathname, partition) + + # Acquire the partitions to check their number again. + + partitions = get_partitions() - elif len(readers) == 1: - partition = list(partitions)[0] - if partition != "merged": - rename_term_files(self.pathname, partition, "merged") + def _merge_terms(self): + + "Merge term dictionaries." + + self._merge_dictionaries(self.get_term_partitions, rename_term_files, + remove_term_files, get_term_reader, get_term_writer, + TermDictionaryMerger, [self.interval, self.doc_interval]) def _merge_fields(self): "Merge field dictionaries." - readers = [] - partitions = self.get_field_partitions() - - for partition in partitions: - readers.append(get_field_reader(self.pathname, partition)) - - # Write directly to a dictionary. - - if len(readers) > 1: - if "merged" in partitions: - rename_field_files(self.pathname, "merged", "old-merged") - partitions.remove("merged") - partitions.add("old-merged") - - writer = get_field_writer(self.pathname, "merged", self.field_interval) - merger = FieldDictionaryMerger(writer, readers) - merger.merge() - merger.close() - - # Remove old files. - - for partition in partitions: - remove_field_files(self.pathname, partition) - - elif len(readers) == 1: - partition = list(partitions)[0] - if partition != "merged": - rename_field_files(self.pathname, partition, "merged") + self._merge_dictionaries(self.get_field_partitions, rename_field_files, + remove_field_files, get_field_reader, get_field_writer, + FieldDictionaryMerger, [self.field_interval]) def update(self, other_indexes):