1.1 --- a/iixr/index.py Sat Nov 20 23:54:05 2010 +0100
1.2 +++ b/iixr/index.py Sat Nov 20 23:56:16 2010 +0100
1.3 @@ -20,6 +20,7 @@
1.4
1.5 from iixr.filesystem import *
1.6 from iixr.merging import *
1.7 +from itertools import islice
1.8 from os import listdir, mkdir # index and partition discovery
1.9 from os.path import exists
1.10
1.11 @@ -34,6 +35,7 @@
1.12 DOCUMENT_INTERVAL = 100
1.13 FIELD_INTERVAL = 100
1.14 FLUSH_INTERVAL = 10000
1.15 +OPEN_PARTITIONS = 20
1.16
1.17 # High-level classes.
1.18
1.19 @@ -220,13 +222,14 @@
1.20 "An inverted index solution encapsulating the various components."
1.21
1.22 def __init__(self, pathname, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, field_interval=FIELD_INTERVAL,
1.23 - flush_interval=FLUSH_INTERVAL):
1.24 + flush_interval=FLUSH_INTERVAL, open_partitions=OPEN_PARTITIONS):
1.25
1.26 self.pathname = pathname
1.27 self.interval = interval
1.28 self.doc_interval = doc_interval
1.29 self.field_interval = field_interval
1.30 self.flush_interval = flush_interval
1.31 + self.open_partitions = open_partitions
1.32 self.reader = None
1.33 self.writer = None
1.34
1.35 @@ -296,71 +299,71 @@
1.36 self._merge_terms()
1.37 self._merge_fields()
1.38
1.39 - def _merge_terms(self):
1.40 + def _merge_dictionaries(self, get_partitions, rename_files, remove_files, get_reader, get_writer, get_merger, intervals):
1.41 +
1.42 + "Merge term or field dictionaries."
1.43
1.44 - "Merge term dictionaries."
1.45 + partitions = get_partitions()
1.46 +
1.47 + # Ensure the correct labelling of a single partition.
1.48
1.49 - readers = []
1.50 - partitions = self.get_term_partitions()
1.51 + if len(partitions) == 1:
1.52 + partition = list(partitions)[0]
1.53 + if partition != "merged":
1.54 + rename_files(self.pathname, partition, "merged")
1.55 + return
1.56
1.57 - for partition in partitions:
1.58 - readers.append(get_term_reader(self.pathname, partition))
1.59 + # Merge the partitions.
1.60 +
1.61 + old_merged_counter = 0
1.62
1.63 - # Write directly to a dictionary.
1.64 + while len(partitions) > 1:
1.65
1.66 - if len(readers) > 1:
1.67 if "merged" in partitions:
1.68 - rename_term_files(self.pathname, "merged", "old-merged")
1.69 + rename_files(self.pathname, "merged", "old-merged-%d" % old_merged_counter)
1.70 partitions.remove("merged")
1.71 - partitions.add("old-merged")
1.72 + partitions.add("old-merged-%d" % old_merged_counter)
1.73 + old_merged_counter += 1
1.74 +
1.75 + # Process only a certain number at once, avoiding resource limits.
1.76 +
1.77 + active_partitions = list(islice(partitions, self.open_partitions))
1.78
1.79 - writer = get_term_writer(self.pathname, "merged", self.interval, self.doc_interval)
1.80 - merger = TermDictionaryMerger(writer, readers)
1.81 + readers = []
1.82 + for partition in active_partitions:
1.83 + readers.append(get_reader(self.pathname, partition))
1.84 +
1.85 + # Write directly to a dictionary.
1.86 +
1.87 + writer = get_writer(self.pathname, "merged", *intervals)
1.88 + merger = get_merger(writer, readers)
1.89 merger.merge()
1.90 merger.close()
1.91
1.92 # Remove old files.
1.93
1.94 - for partition in partitions:
1.95 - remove_term_files(self.pathname, partition)
1.96 + for partition in active_partitions:
1.97 + remove_files(self.pathname, partition)
1.98 +
1.99 + # Acquire the partitions to check their number again.
1.100 +
1.101 + partitions = get_partitions()
1.102
1.103 - elif len(readers) == 1:
1.104 - partition = list(partitions)[0]
1.105 - if partition != "merged":
1.106 - rename_term_files(self.pathname, partition, "merged")
1.107 + def _merge_terms(self):
1.108 +
1.109 + "Merge term dictionaries."
1.110 +
1.111 + self._merge_dictionaries(self.get_term_partitions, rename_term_files,
1.112 + remove_term_files, get_term_reader, get_term_writer,
1.113 + TermDictionaryMerger, [self.interval, self.doc_interval])
1.114
1.115 def _merge_fields(self):
1.116
1.117 "Merge field dictionaries."
1.118
1.119 - readers = []
1.120 - partitions = self.get_field_partitions()
1.121 -
1.122 - for partition in partitions:
1.123 - readers.append(get_field_reader(self.pathname, partition))
1.124 -
1.125 - # Write directly to a dictionary.
1.126 -
1.127 - if len(readers) > 1:
1.128 - if "merged" in partitions:
1.129 - rename_field_files(self.pathname, "merged", "old-merged")
1.130 - partitions.remove("merged")
1.131 - partitions.add("old-merged")
1.132 -
1.133 - writer = get_field_writer(self.pathname, "merged", self.field_interval)
1.134 - merger = FieldDictionaryMerger(writer, readers)
1.135 - merger.merge()
1.136 - merger.close()
1.137 -
1.138 - # Remove old files.
1.139 -
1.140 - for partition in partitions:
1.141 - remove_field_files(self.pathname, partition)
1.142 -
1.143 - elif len(readers) == 1:
1.144 - partition = list(partitions)[0]
1.145 - if partition != "merged":
1.146 - rename_field_files(self.pathname, partition, "merged")
1.147 + self._merge_dictionaries(self.get_field_partitions, rename_field_files,
1.148 + remove_field_files, get_field_reader, get_field_writer,
1.149 + FieldDictionaryMerger, [self.field_interval])
1.150
1.151 def update(self, other_indexes):
1.152