#!/usr/bin/env python

"""
A simple (and sane) text indexing library.

Copyright (C) 2009 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.  See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with this program.  If not, see <http://www.gnu.org/licenses/>.
"""

from os import dup, fdopen              # independent iterator access to files
from os import listdir, mkdir           # index and partition discovery
from os import remove, rename           # partition manipulation
from os.path import exists, join
from os.path import commonprefix        # to find common string prefixes
from bisect import bisect_right         # to find terms in the dictionary index
import bz2, zlib                        # for field compression
from itermerge import itermerge

try:
    set
except NameError:
    from sets import Set as set

# Constants.

TERM_INTERVAL = 100
DOCUMENT_INTERVAL = 100
FIELD_INTERVAL = 100
FLUSH_INTERVAL = 10000

TERM_FILENAMES = "terms", "terms_index", "positions", "positions_index"
FIELD_FILENAMES = "fields", "fields_index"

# Compressors are tried in this order when writing; each stored string is
# prefixed with the one-character flag of the compressor actually used.

compressors = [("b", bz2.compress), ("z", zlib.compress)]
decompressors = {"b" : bz2.decompress, "z" : zlib.decompress}

# Utility functions.

try:
    from vint import vint as _vint

    def vint(number):

        """
        Return 'number' encoded as a variable-length integer using the
        external 'vint' module. Raise ValueError if 'number' is negative.
        """

        if number >= 0:
            return _vint(number)
        else:
            raise ValueError("Number %r is negative." % number)

except ImportError:

    def vint(number):

        """
        Return 'number' encoded as a variable-length integer: seven bits per
        byte, least significant group first, with the top bit set on every
        byte except the last. Raise ValueError if 'number' is negative.
        """

        if number >= 0:

            # Special case: one byte containing a 7-bit number.

            if number < 128:
                return chr(number)

            # Write the number from least to most significant digits.

            digits = []

            while number != 0:
                lsd = number & 127
                number = number >> 7

                # Flag the digit as being followed by a more significant one.

                if number != 0:
                    lsd |= 128
                digits.append(chr(lsd))

            return "".join(digits)

        # Negative numbers are not supported.

        else:
            raise ValueError("Number %r is negative." % number)

# Foundation classes.

class File:

    "A basic file abstraction wrapping the file object 'f'."

    def __init__(self, f):
        self.f = f
        self.reset()

    def reset(self):

        "To be used to reset the state of the reader or writer between records."

        pass

    def rewind(self):

        "Seek to the start of the file and reset any record state."

        self.seek(0)
        self.reset()

    def seek(self, offset):

        "To be defined by readers."

        pass

    def flush(self):

        "To be defined by writers."

        pass

    def close(self):

        """
        Flush and close the underlying file. Calling this more than once is
        harmless. The file is closed even if flushing raises an exception,
        which is then propagated.
        """

        if self.f is not None:
            try:
                self.flush()
            finally:
                self.f.close()
                self.f = None

class FileWriter(File):

    "Writing basic data types to files."

    def __init__(self, f):
        File.__init__(self, f)

        # Pending output fragments and their combined length.

        self.cache = []
        self.cache_length = 0

    def write_number(self, number):

        "Write 'number' to the file using a variable length encoding."

        self.write(vint(number))

    def write_string(self, s, compress=0):

        """
        Write 's' to the file, recording its length and compressing the string
        if 'compress' is set to a true value.

        When 'compress' is set, a one-character flag precedes the length: the
        flag of the first compressor producing a shorter result, or "-" if
        none did and the data is stored as given. Without 'compress', no flag
        is written.
        """

        # Convert Unicode objects to strings.

        if isinstance(s, unicode):
            s = s.encode("utf-8")

        # Compress the string if requested.

        if compress:
            for flag, fn in compressors:
                cs = fn(s)

                # Take the first string shorter than the original.

                if len(cs) < len(s):
                    s = cs
                    break
            else:
                flag = "-"

        else:
            flag = ""

        # Write the length of the data before the data itself.

        length = len(s)
        self.write(flag + vint(length) + s)

    # Cache-affected methods.

    def write(self, s):

        "Queue 's' for output, flushing once 1000 or more bytes are pending."

        self.cache.append(s)
        self.cache_length += len(s)
        if self.cache_length >= 1000:
            self.flush()

    def tell(self):

        "Return the file offset at which the next written data will appear."

        return self.f.tell() + self.cache_length

    def flush(self):

        "Write all pending data to the underlying file."

        self.f.write("".join(self.cache))
        self.cache = []
        self.cache_length = 0

class FileReader(File):

    "Reading basic data types from files."

    def __init__(self, f):
        File.__init__(self, f)

        # Data read ahead from the file but not yet consumed. The underlying
        # file is always positioned immediately after the cached data.

        self.cache = ""
        self.cache_length = 0

    def read_number(self):

        """
        Read a variable-length number from the file. Raise EOFError if the
        file ends before the number is complete.
        """

        # Read each byte, adding it to the number.

        shift = 0
        number = 0
        read = self.read

        try:
            csd = ord(read(1))
            while csd & 128:
                number += ((csd & 127) << shift)
                shift += 7
                csd = ord(read(1))
            else:
                number += (csd << shift)

        # An empty string from 'read' makes 'ord' raise TypeError.

        except TypeError:
            raise EOFError

        return number

    def read_string(self, decompress=0):

        """
        Read a string from the file, decompressing the stored data if
        'decompress' is set to a true value. Return a Unicode object decoded
        from the stored UTF-8 data.
        """

        # Read the compression flag only if one was written.

        if decompress:
            flag = self.read(1)
        else:
            flag = "-"

        length = self.read_number()
        s = self.read(length)

        # Perform decompression if applicable.

        if flag != "-":
            fn = decompressors[flag]
            s = fn(s)

        # Convert strings to Unicode objects.

        return unicode(s, "utf-8")

    # Cache-affected methods.

    def read(self, n):

        """
        Return up to 'n' bytes, refilling the cache from the file in chunks of
        at least 1000 bytes. Fewer than 'n' bytes are returned at the end of
        the file.
        """

        needed = n - self.cache_length
        if needed > 0:
            s = self.f.read(max(needed, 1000))
            self.cache += s
            self.cache_length += len(s)

        s = self.cache[:n]
        self.cache = self.cache[n:]
        self.cache_length = len(self.cache)
        return s

    def tell(self):

        "Return the file offset of the next byte to be returned by 'read'."

        return self.f.tell() - self.cache_length

    def seek(self, offset):

        """
        Position the reader at 'offset'.

        Where 'offset' lies within the data already cached, the cached data
        before it is discarded and the underlying file is left untouched, so
        that the file remains positioned at the end of the cache. Otherwise,
        the underlying file is moved and the cache is emptied.
        """

        cache_end = self.f.tell()
        current = cache_end - self.cache_length

        if current <= offset <= cache_end:

            # Moving the file here would leave it behind the cached data,
            # causing later reads to fetch the same bytes again.

            discarded = offset - current
            self.cache = self.cache[discarded:]
            self.cache_length = len(self.cache)
        else:
            self.f.seek(offset)
            self.cache = ""
            self.cache_length = 0

class FileOpener:

    "Opening files using their filenames."

    def __init__(self, filename):
        self.filename = filename

    def open(self, mode):

        "Return a new file object for the filename, opened using 'mode'."

        return open(self.filename, mode)

    def close(self):
        pass

# Specific classes for storing term and position information.

class PositionWriter(FileWriter):

    "Writing position information to files."

    def reset(self):
        self.last_docnum = 0

    def write_positions(self, docnum, positions):

        """
        Write for the document 'docnum' the given 'positions'.
        Return the offset of the written record.

        The 'positions' collection is sorted in place. ValueError is raised if
        'docnum' is less than the previously written document number.
        """

        if docnum < self.last_docnum:
            raise ValueError("Document number %r is less than previous number %r." % (docnum, self.last_docnum))

        # Record the offset of this record.

        offset = self.tell()

        # Make sure that the positions are sorted.

        positions.sort()

        # Write the position deltas.

        output = []
        last = 0

        for position in positions:
            output.append(vint(position - last))
            last = position

        # Write the document number delta.
        # Write the number of positions.
        # Then write the positions.

        self.write(vint(docnum - self.last_docnum) + vint(len(positions)) + "".join(output))

        self.last_docnum = docnum
        return offset

class PositionOpener(FileOpener):

    "Reading position information from files."

    def read_term_positions(self, offset, count):

        """
        Read all positions from 'offset', seeking to that position in the file
        before reading. The number of documents available for reading is limited
        to 'count'.
        """

        # Open a new file object for use by the returned iterator.

        f = self.open("rb")
        return PositionIterator(f, offset, count)

class PositionIndexWriter(FileWriter):

    "Writing position index information to files."

    def reset(self):
        self.last_docnum = 0
        self.last_pos_offset = 0

    def write_positions(self, docnum, pos_offset, count):

        """
        Write the given 'docnum, 'pos_offset' and document 'count' to the
        position index file. Return the offset of the written index record.
        """

        # Record the offset of this record.

        offset = self.tell()
        output = []

        # Write the document number delta.

        output.append(vint(docnum - self.last_docnum))
        self.last_docnum = docnum

        # Write the position file offset delta.

        output.append(vint(pos_offset - self.last_pos_offset))
        self.last_pos_offset = pos_offset

        # Write the document count.

        output.append(vint(count))

        # Actually write the data.

        self.write("".join(output))

        return offset

class PositionIndexOpener(FileOpener):

    "Reading position index information from files."

    def read_term_positions(self, offset, doc_frequency):

        """
        Read all index entries from 'offset', seeking to that position in the
        file before reading. The number of documents available for reading is
        limited to 'doc_frequency'.
        """

        # Open a new file object for use by the returned iterator.

        f = self.open("rb")
        return PositionIndexIterator(f, offset, doc_frequency)

# Iterators for position-related files.

class IteratorBase:

    "Common state for iterators yielding a limited number of documents."

    def __init__(self, count):
        self.replenish(count)

    def replenish(self, count):

        "Permit a further 'count' documents to be read."

        self.count = count
        self.read_documents = 0

    def __len__(self):
        return self.count

    def sort(self):
        pass # Stored document positions are already sorted.

    def __iter__(self):
        return self

class PositionIterator(FileReader, IteratorBase):

    "Iterating over document positions."

    def __init__(self, f, offset, count):
        FileReader.__init__(self, f)
        IteratorBase.__init__(self, count)
        self.seek(offset)

    def reset(self):
        self.last_docnum = 0

    def read_positions(self):

        "Read positions, returning a document number and a list of positions."

        # Read the document number delta and add it to the last number.

        self.last_docnum += self.read_number()

        # Read the number of positions.

        npositions = self.read_number()

        # Read the position deltas, adding each previous position to get the
        # appropriate collection of absolute positions.

        i = 0
        last = 0
        positions = []

        while i < npositions:
            last += self.read_number()
            positions.append(last)
            i += 1

        return self.last_docnum, positions

    def next(self):

        "Read positions for a single document."

        if self.read_documents < self.count:
            self.read_documents += 1
            return self.read_positions()
        else:
            raise StopIteration

class PositionIndexIterator(FileReader, IteratorBase):

    "Iterating over position index entries, each describing a section."

    def __init__(self, f, offset, count):
        FileReader.__init__(self, f)
        IteratorBase.__init__(self, count)
        self.seek(offset)

        # The number of documents in the most recently read section.

        self.section_count = 0

    def reset(self):
        self.last_docnum = 0
        self.last_pos_offset = 0

    def read_positions(self):

        """
        Read a document number, a position file offset for the position index
        file, and the number of documents in a section of that file.
        """

        # Read the document number delta.

        self.last_docnum += self.read_number()

        # Read the offset delta.

        self.last_pos_offset += self.read_number()

        # Read the document count.

        count = self.read_number()

        return self.last_docnum, self.last_pos_offset, count

    def next(self):

        """
        Read the next index entry, returning a tuple of the form (document
        number, position file offset, section document count).
        """

        # Account for the documents covered by the previously read section.

        self.read_documents += self.section_count
        if self.read_documents < self.count:
            docnum, pos_offset, self.section_count = t = self.read_positions()
            return t
        else:
            raise StopIteration

class PositionDictionaryWriter:

    "Writing position dictionaries."

    def __init__(self, position_writer, position_index_writer, interval):
        self.position_writer = position_writer
        self.position_index_writer = position_index_writer
        self.interval = interval

    def write_term_positions(self, doc_positions):

        """
        Write all 'doc_positions' - a collection of tuples of the form (document
        number, position list) - to the file. The collection is sorted in
        place before being written.

        Add some records to the index, making dictionary entries.

        Return a tuple containing the offset of the first index entry written
        for the term (or None if 'doc_positions' is empty), the frequency
        (number of positions), and document frequency (number of documents)
        for the term involved.
        """

        # Reset the writers.

        self.position_writer.reset()
        self.position_index_writer.reset()

        index_offset = None

        # Write the positions.

        frequency = 0
        first_docnum = None
        first_offset = None
        count = 0

        doc_positions.sort()

        for docnum, positions in doc_positions:
            pos_offset = self.position_writer.write_positions(docnum, positions)

            # Retain the first record offset for a subsequent index entry.

            if first_offset is None:
                first_offset = pos_offset
                first_docnum = docnum

            frequency += len(positions)
            count += 1

            # Every {interval} entries, write an index entry.

            if count % self.interval == 0:
                io = self.position_index_writer.write_positions(first_docnum, first_offset, self.interval)

                # Remember the first index entry offset.

                if index_offset is None:
                    index_offset = io

                first_offset = None
                first_docnum = None

                # Reset the position writer so that position readers accessing
                # a section start with the correct document number.

                self.position_writer.reset()

        # Finish writing an index entry for the remaining documents.

        if first_offset is not None:
            io = self.position_index_writer.write_positions(first_docnum, first_offset, count % self.interval)

            # Remember the first index entry offset.

            if index_offset is None:
                index_offset = io

        return index_offset, frequency, count

    def close(self):
        self.position_writer.close()
        self.position_index_writer.close()

class PositionDictionaryReader:

    "Reading position dictionaries."

    def __init__(self, position_opener, position_index_opener):
        self.position_opener = position_opener
        self.position_index_opener = position_index_opener

    def read_term_positions(self, offset, doc_frequency):

        """
        Return an iterator for dictionary entries starting at 'offset' with the
        given 'doc_frequency'.
        """

        return PositionDictionaryIterator(self.position_opener,
            self.position_index_opener, offset, doc_frequency)

    def close(self):
        pass

class PositionDictionaryIterator:

    "Iteration over position dictionary entries."

    def __init__(self, position_opener, position_index_opener, offset, doc_frequency):
        self.position_opener = position_opener
        self.doc_frequency = doc_frequency
        self.index_iterator = position_index_opener.read_term_positions(offset, doc_frequency)
        self.iterator = None

        # Remember the last values.

        self.found_docnum, self.found_positions = None, None

        # Maintain state for the next index entry, if read.

        self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None

        # Initialise the current index entry and current position file iterator.

        self._next_section()
        self._init_section()

    # Sequence methods.

    def __len__(self):
        return self.doc_frequency

    def sort(self):
        pass

    # Iterator methods.

    def __iter__(self):
        return self

    def next(self):

        """
        Attempt to get the next document record from the section in the
        positions file, returning a (document number, positions) tuple.
        StopIteration is raised when the index provides no further sections.
        """

        # Return any visited but unrequested record.

        if self.found_docnum is not None:
            t = self.found_docnum, self.found_positions
            self.found_docnum, self.found_positions = None, None
            return t

        # Or search for the next record.

        while 1:

            # Either return the next record.

            try:
                return self.iterator.next()

            # Or, where a section is finished, get the next section and try again.

            except StopIteration:

                # Where a section follows, update the index iterator, but keep
                # reading using the same file iterator (since the data should
                # just follow on from the last section).

                self._next_section()
                self.iterator.replenish(self.section_count)

                # Reset the state of the iterator to make sure that document
                # numbers are correct.

                self.iterator.reset()

    def from_document(self, docnum):

        """
        Attempt to navigate to a positions entry for the given 'docnum',
        returning the positions for 'docnum', or None otherwise.
        """

        # Return any unrequested document positions.

        if docnum == self.found_docnum:
            return self.found_positions

        # Read ahead in the index until the next entry refers to a document
        # later than the desired document.

        try:
            if self.next_docnum is None:
                self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next()

            # Read until the next entry is after the desired document number,
            # or until the end of the results.

            while self.next_docnum <= docnum:
                self._next_read_section()
                if self.docnum < docnum:
                    self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next()
                else:
                    break

        except StopIteration:
            pass

        # Navigate in the position file to the document.

        self._init_section()

        try:
            while 1:
                found_docnum, found_positions = self.iterator.next()

                # Return the desired document positions or None (retaining the
                # positions for the document immediately after).

                if docnum == found_docnum:
                    return found_positions
                elif docnum < found_docnum:
                    self.found_docnum, self.found_positions = found_docnum, found_positions
                    return None

        except StopIteration:
            return None

    # Internal methods.

    def _next_section(self):

        """
        Attempt to get the next section in the index, using any entry already
        read ahead in preference to reading from the index.
        """

        if self.next_docnum is None:
            self.docnum, self.pos_offset, self.section_count = self.index_iterator.next()
        else:
            self._next_read_section()

    def _next_read_section(self):

        """
        Make the next index entry the current one without reading from the
        index.
        """

        self.docnum, self.pos_offset, self.section_count = self.next_docnum, self.next_pos_offset, self.next_section_count
        self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None

    def _init_section(self):

        """
        Initialise the iterator for the section in the position file, closing
        any iterator previously in use.
        """

        if self.iterator is not None:
            self.iterator.close()
        self.iterator = self.position_opener.read_term_positions(self.pos_offset, self.section_count)

    def close(self):
        if self.iterator is not None:
            self.iterator.close()
            self.iterator = None
        if self.index_iterator is not None:
            self.index_iterator.close()
            self.index_iterator = None

class TermWriter(FileWriter):

    "Writing term information to files."

    def reset(self):
        self.last_term = ""
        self.last_offset = 0

    def write_term(self, term, offset, frequency, doc_frequency):

        """
        Write the given 'term', its position file 'offset', its 'frequency' and
        its 'doc_frequency' (number of documents in which it appears) to the
        term information file. Return the offset after the term information was
        written to the file.
        """

        # Write the prefix length and term suffix.

        common = len(commonprefix([self.last_term, term]))
        suffix = term[common:]

        self.write_number(common)
        self.write_string(suffix)

        # Write the offset delta.

        self.write_number(offset - self.last_offset)

        # Write the frequency.

        self.write_number(frequency)

        # Write the document frequency.

        self.write_number(doc_frequency)

        self.last_term = term
        self.last_offset = offset

        return self.tell()

class TermReader(FileReader):

    "Reading term information from files."

    def reset(self):
        self.last_term = ""
        self.last_offset = 0

    def read_term(self):

        """
        Read a term, its position file offset, its frequency and its document
        frequency from the term information file.
        """

        # Read the prefix length and term suffix.

        common = self.read_number()
        suffix = self.read_string()

        self.last_term = self.last_term[:common] + suffix

        # Read the offset delta.

        self.last_offset += self.read_number()

        # Read the frequency.

        frequency = self.read_number()

        # Read the document frequency.

        doc_frequency = self.read_number()

        return self.last_term, self.last_offset, frequency, doc_frequency

    def go_to_term(self, term, offset, info_offset):

        """
        Seek past the entry for 'term' having 'offset' to 'info_offset'. This
        permits the scanning for later terms from the specified term.
        """

        self.seek(info_offset)

        # Later entries are encoded relative to this term and offset.

        self.last_term = term
        self.last_offset = offset

class TermIndexWriter(TermWriter):

    "Writing term dictionary index details to files."

    def reset(self):
        TermWriter.reset(self)
        self.last_info_offset = 0

    def write_term(self, term, offset, frequency, doc_frequency, info_offset):

        """
        Write the given 'term', its position file 'offset', its 'frequency' and
        its 'doc_frequency' to the term dictionary index file, along with the
        'info_offset' in the term information file.
        """

        TermWriter.write_term(self, term, offset, frequency, doc_frequency)

        # Write the information file offset delta.

        self.write_number(info_offset - self.last_info_offset)
        self.last_info_offset = info_offset

class TermIndexReader(TermReader):

    "Reading term dictionary index details from files."

    def reset(self):
        TermReader.reset(self)
        self.last_info_offset = 0

    def read_term(self):

        """
        Read a term, its position file offset, its frequency, its document
        frequency and a term information file offset from the term dictionary
        index file.
        """

        term, offset, frequency, doc_frequency = TermReader.read_term(self)

        # Read the offset delta.

        self.last_info_offset += self.read_number()

        return term, offset, frequency, doc_frequency, self.last_info_offset

class TermDictionaryWriter:

    "Writing term dictionaries."

    def __init__(self, info_writer, index_writer, position_dict_writer, interval):
        self.info_writer = info_writer
        self.index_writer = index_writer
        self.position_dict_writer = position_dict_writer
        self.interval = interval

        # The number of terms written so far.

        self.entry = 0

    def _write_term(self, term, offset, frequency, doc_frequency):

        """
        Write the given 'term', its position file 'offset', its 'frequency' and
        its 'doc_frequency' (number of documents in which it appears) to the
        term information file, also adding an index entry for the first term
        and for every {interval} terms thereafter. Nothing is returned.
        """

        info_offset = self.info_writer.write_term(term, offset, frequency, doc_frequency)

        if self.entry % self.interval == 0:
            self.index_writer.write_term(term, offset, frequency, doc_frequency, info_offset)

        self.entry += 1

    def write_term_positions(self, term, doc_positions):

        """
        Write the given 'term' and the 'doc_positions' recording the documents
        and positions at which the term is found.
        """

        offset, frequency, doc_frequency = self.position_dict_writer.write_term_positions(doc_positions)
        self._write_term(term, offset, frequency, doc_frequency)

    def close(self):
        self.info_writer.close()
        self.index_writer.close()
        self.position_dict_writer.close()

class TermDictionaryReader:

    "Reading term dictionaries."

    def __init__(self, info_reader, index_reader, position_dict_reader):
        self.info_reader = info_reader
        self.index_reader = index_reader
        self.position_dict_reader = position_dict_reader

        # Load the entire term index into memory.

        self.terms = []
        try:
            while 1:
                self.terms.append(self.index_reader.read_term())
        except EOFError:
            pass

        # Large numbers for ordering purposes.

        if self.terms:
            self.max_offset = self.terms[-1][1] + 1
        else:
            self.max_offset = None

    def _find_closest_entry(self, term):

        """
        Return the index entry for 'term', or else the index entry most
        closely preceding it, or else the very first index entry. The entry
        consists of a term, the position file offset, the term frequency, the
        document frequency, and the term details file offset.

        IndexError is raised if the index is empty.
        """

        i = bisect_right(self.terms, (term, self.max_offset, 0, 0)) - 1

        # Get the entry position providing the term or one preceding it.
        # If no entry precedes the requested term, return the very first entry
        # as the closest.

        if i == -1:
            return self.terms[0]
        else:
            return self.terms[i]

    def _find_closest_term(self, term):

        """
        Find the offsets and frequencies of 'term' from the term dictionary or
        the closest term starting with the value of 'term'.

        Return the closest term (or the term itself), the position file offset,
        the term frequency, the document frequency, and the term details file
        offset (or None if the reader is already positioned).
        """

        found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_entry(term)

        # Where the term is found immediately, return the offset and
        # frequencies. If the term does not appear, return the details of the
        # closest entry.

        if term <= found_term:
            return found_term, offset, frequency, doc_frequency, info_offset

        # Otherwise, seek past the index term's entry in the information file
        # and scan for the desired term.

        else:
            self.info_reader.go_to_term(found_term, offset, info_offset)
            try:
                while term > found_term:
                    found_term, offset, frequency, doc_frequency = self.info_reader.read_term()
            except EOFError:
                pass

            return found_term, offset, frequency, doc_frequency, None

    def _find_term(self, term):

        """
        Find the position file offset and frequency of 'term' from the term
        dictionary, returning a tuple of the form (offset, frequency, document
        frequency), or None if the term is not present.
        """

        # An empty dictionary cannot provide any term.

        if not self.terms:
            return None

        found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_term(term)

        # If the term is found, return the offset and frequencies.

        if term == found_term:
            return offset, frequency, doc_frequency
        else:
            return None

    def _get_positions(self, offset, doc_frequency):
        return self.position_dict_reader.read_term_positions(offset, doc_frequency)

    # Iterator convenience methods.

    def __iter__(self):
        self.rewind()
        return self

    def next(self):
        try:
            return self.read_term()
        except EOFError:
            raise StopIteration

    # Sequential access methods.

    def rewind(self):
        self.info_reader.rewind()

    def read_term(self):

        """
        Return the next term, its frequency, its document frequency, and the
        documents and positions at which the term is found.
        """

        term, offset, frequency, doc_frequency = self.info_reader.read_term()
        positions = self._get_positions(offset, doc_frequency)
        return term, frequency, doc_frequency, positions

    # Query methods.

    def find_terms(self, term):

        "Return all terms whose values start with the value of 'term'."

        terms = []

        # An empty dictionary cannot provide any terms.

        if not self.terms:
            return terms

        found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_term(term)

        # Position the reader, if necessary.

        if info_offset is not None:
            self.info_reader.go_to_term(found_term, offset, info_offset)

        # Read and record terms.

        try:
            # Add the found term if it starts with the specified term.

            while found_term.startswith(term):
                terms.append(found_term)
                found_term, offset, frequency, doc_frequency = self.info_reader.read_term()

        except EOFError:
            pass

        return terms

    def find_positions(self, term):

        """
        Return the documents and positions at which the given 'term' is found,
        or None if the term is not present.
        """

        t = self._find_term(term)
        if t is None:
            return None
        else:
            offset, frequency, doc_frequency = t
            return self._get_positions(offset, doc_frequency)

    def get_frequency(self, term):

        "Return the frequency of the given 'term', or None if it is not present."

        t = self._find_term(term)
        if t is None:
            return None
        else:
            offset, frequency, doc_frequency = t
            return frequency

    def get_document_frequency(self, term):

        """
        Return the document frequency of the given 'term', or None if it is
        not present.
        """

        t = self._find_term(term)
        if t is None:
            return None
        else:
            offset, frequency, doc_frequency = t
            return doc_frequency

    def close(self):
        self.info_reader.close()
        self.index_reader.close()
        self.position_dict_reader.close()

# Specific classes for storing document information.

class FieldWriter(FileWriter):

    "Storing the fields of documents in the fields file."

    def reset(self):
        self.last_docnum = 0

    def write_fields(self, docnum, fields):

        """
        Store the 'fields' - (identifier, value) pairs with integer
        identifiers and string values - of the document 'docnum'.
        Return the file offset at which the record starts.
        """

        start = self.tell()

        # The record header: document number delta, then the field count.

        delta = docnum - self.last_docnum
        self.write_number(delta)
        self.write_number(len(fields))

        # Each field: the identifier, then the value in compressed form.

        for identifier, value in fields:
            self.write_number(identifier)
            self.write_string(value, compress=1)

        self.last_docnum = docnum
        return start

class FieldReader(FileReader):

    "Retrieving the fields of documents from the fields file."

    def reset(self):
        self.last_docnum = 0

    def read_fields(self):

        """
        Read the next record, returning a (document number, fields) tuple
        where the fields are a list of (identifier, value) pairs.
        """

        # The record header: document number delta, then the field count.

        self.last_docnum += self.read_number()
        nfields = self.read_number()

        # Each field: the identifier, then the value in compressed form.

        fields = [(self.read_number(), self.read_string(1)) for _ in range(nfields)]

        return self.last_docnum, fields

    def read_document_fields(self, docnum, offset):

        """
        Read the record stored at 'offset', which is taken to belong to the
        document 'docnum', returning a (document number, fields) tuple.
        Records for later documents can then be read in sequence.
        """

        self.seek(offset)

        # The number decoded here is not used: 'docnum' is recorded instead.

        fields = self.read_fields()[1]
        self.last_docnum = docnum
        return docnum, fields

class FieldIndexWriter(FileWriter):

    "Storing the locations of document records in the field index file."

    def reset(self):
        self.last_docnum = 0
        self.last_offset = 0

    def write_document(self, docnum, offset):

        """
        Record that the fields of the document 'docnum' are found at 'offset'
        in the fields file.
        """

        # Both values are stored relative to those of the previous entry.

        docnum_delta = docnum - self.last_docnum
        offset_delta = offset - self.last_offset

        self.write_number(docnum_delta)
        self.write_number(offset_delta)

        self.last_docnum, self.last_offset = docnum, offset

class FieldIndexReader(FileReader):

    "Retrieving the locations of document records from the field index file."

    def reset(self):
        self.last_docnum = 0
        self.last_offset = 0

    def read_document(self):

        "Return the next (document number, fields file offset) entry."

        # Both values are stored relative to those of the previous entry.

        docnum_delta = self.read_number()
        self.last_docnum += docnum_delta

        offset_delta = self.read_number()
        self.last_offset += offset_delta

        return self.last_docnum, self.last_offset

class FieldDictionaryWriter:

    "Storing document fields together with an index of their locations."

    def __init__(self, field_writer, field_index_writer, interval):
        self.field_writer = field_writer
        self.field_index_writer = field_index_writer
        self.interval = interval
        self.entry = 0

    def write_fields(self, docnum, fields):

        """
        Store the 'fields' of the document 'docnum', adding an index entry for
        the first document and for every {interval} documents thereafter.
        """

        offset = self.field_writer.write_fields(docnum, fields)

        if not self.entry % self.interval:
            self.field_index_writer.write_document(docnum, offset)

        self.entry += 1

    def close(self):
        self.field_writer.close()
        self.field_index_writer.close()

class FieldDictionaryReader:

    "Retrieving document fields, using an index of their locations."

    def __init__(self, field_reader, field_index_reader):
        self.field_reader = field_reader
        self.field_index_reader = field_index_reader

        # Hold the complete index in memory.

        self.docs = []
        try:
            while True:
                entry = self.field_index_reader.read_document()
                self.docs.append(entry)
        except EOFError:
            pass

        # An offset for use when searching the index.

        if not self.docs:
            self.max_offset = None
        else:
            last_docnum, last_offset = self.docs[-1]
            self.max_offset = last_offset

    # Iterator convenience methods.

    def __iter__(self):
        self.rewind()
        return self

    def next(self):
        try:
            record = self.read_fields()
        except EOFError:
            raise StopIteration
        return record

    # Sequential access methods.

    def rewind(self):
        self.field_reader.rewind()

    def read_fields(self):

        "Return the next (document number, fields) record."

        return self.field_reader.read_fields()

    # Random access methods.

    def get_fields(self, docnum):

        """
        Return the fields of the document 'docnum', or None if no such
        document is stored.
        """

        # Choose the last index entry not exceeding the document number.

        index = bisect_right(self.docs, (docnum, self.max_offset)) - 1

        if index == -1:
            return None

        indexed_docnum, offset = self.docs[index]

        # Start reading at the indexed record, moving on through later records
        # until the document number is reached or passed.

        found_docnum, fields = self.field_reader.read_document_fields(indexed_docnum, offset)

        try:
            while found_docnum < docnum:
                found_docnum, fields = self.field_reader.read_fields()
        except EOFError:
            pass

        # Anything other than an exact match means the document is absent.

        if found_docnum != docnum:
            return None

        return fields

    def close(self):
        self.field_reader.close()
        self.field_index_reader.close()

# Dictionary merging classes.

class Merger:

    "Combining the output of several readers using a single writer."

    def __init__(self, writer, readers):
        self.writer = writer
        self.readers = readers

    def close(self):

        "Close every reader, and then the writer."

        for source in self.readers:
            source.close()
        self.writer.close()

class TermDictionaryMerger(Merger):

    "Combining term and position files."

    def merge(self):

        """
        Send the terms from all readers to the writer, combining the positions
        of any term provided more than once in succession.
        """

        pending_term = None
        pending = []

        for term, _frequency, _doc_frequency, positions in itermerge(self.readers):

            # A different term completes the one being collected.

            if term != pending_term:
                if pending:
                    self.writer.write_term_positions(pending_term, itermerge(pending))
                pending_term = term
                pending = []

            pending.append(positions)

        # Complete the final term.

        if pending:
            self.writer.write_term_positions(pending_term, itermerge(pending))

class FieldDictionaryMerger(Merger):

    "Combining field files."

    def merge(self):

        "Send the document fields from all readers to the writer."

        merged = itermerge(self.readers)

        for docnum, fields in merged:
            self.writer.write_fields(docnum, fields)

# Utility functions.

def get_term_writer(pathname, partition, interval, doc_interval):

    """
    Return a term dictionary writer whose files are created under the given
    'pathname' and labelled with the given 'partition'. Terms are indexed using
    the given 'interval' and document position records using 'doc_interval'.
    """

    # Term information and its index.

    terms_file = open(join(pathname, "terms-%s" % partition), "wb")
    terms = TermWriter(terms_file)

    terms_index_file = open(join(pathname, "terms_index-%s" % partition), "wb")
    terms_index = TermIndexWriter(terms_index_file)

    # Position information and its index.

    positions_file = open(join(pathname, "positions-%s" % partition), "wb")
    positions = PositionWriter(positions_file)

    positions_index_file = open(join(pathname, "positions_index-%s" % partition), "wb")
    positions_index = PositionIndexWriter(positions_index_file)

    positions_dict = PositionDictionaryWriter(positions, positions_index, doc_interval)

    return TermDictionaryWriter(terms, terms_index, positions_dict, interval)

def get_field_writer(pathname, partition, interval):

    """
    Return a field dictionary writer whose files are created under the given
    'pathname' and labelled with the given 'partition', using the given
    indexing 'interval'.
    """

    fields_file = open(join(pathname, "fields-%s" % partition), "wb")
    fields = FieldWriter(fields_file)

    fields_index_file = open(join(pathname, "fields_index-%s" % partition), "wb")
    fields_index = FieldIndexWriter(fields_index_file)

    return FieldDictionaryWriter(fields, fields_index, interval)

def get_term_reader(pathname, partition):

    """
    Return a term dictionary reader for the files found under the given
    'pathname' and labelled with the given 'partition'.
    """

    # Term information and its index are opened here.

    terms_file = open(join(pathname, "terms-%s" % partition), "rb")
    terms = TermReader(terms_file)

    terms_index_file = open(join(pathname, "terms_index-%s" % partition), "rb")
    terms_index = TermIndexReader(terms_index_file)

    # Position files are handed over as openers given only their filenames.

    positions_path = join(pathname, "positions-%s" % partition)
    positions_opener = PositionOpener(positions_path)

    positions_index_path = join(pathname, "positions_index-%s" % partition)
    positions_index_opener = PositionIndexOpener(positions_index_path)

    positions_dict = PositionDictionaryReader(positions_opener, positions_index_opener)

    return TermDictionaryReader(terms, terms_index, positions_dict)

def get_field_reader(pathname, partition):

    """
    Return a field dictionary reader for the files found under the given
    'pathname' and labelled with the given 'partition'.
    """

    fields_file = open(join(pathname, "fields-%s" % partition), "rb")
    fields = FieldReader(fields_file)

    fields_index_file = open(join(pathname, "fields_index-%s" % partition), "rb")
    fields_index = FieldIndexReader(fields_index_file)

    return FieldDictionaryReader(fields, fields_index)

def rename_files(pathname, names, from_partition, to_partition):

    """
    Under the given 'pathname', rename each file having one of the given
    'names' from its 'from_partition' label to its 'to_partition' label.
    """

    for name in names:
        source = join(pathname, "%s-%s" % (name, from_partition))
        target = join(pathname, "%s-%s" % (name, to_partition))
        rename(source, target)

def rename_term_files(pathname, from_partition, to_partition):

    "Rename the term and position files of 'from_partition' to 'to_partition'."

    rename_files(pathname, TERM_FILENAMES, from_partition, to_partition)

def rename_field_files(pathname, from_partition, to_partition):

    "Rename the field files of 'from_partition' to 'to_partition'."

    rename_files(pathname, FIELD_FILENAMES, from_partition, to_partition)

def remove_files(pathname, names, partition):

    """
    Under the given 'pathname', remove each file having one of the given
    'names' and labelled with the given 'partition'.
    """

    for name in names:
        target = join(pathname, "%s-%s" % (name, partition))
        remove(target)

def remove_term_files(pathname, partition):

    "Remove the term and position files of the given 'partition'."

    remove_files(pathname, TERM_FILENAMES, partition)

def remove_field_files(pathname, partition):

    "Remove the field files of the given 'partition'."

    remove_files(pathname, FIELD_FILENAMES, partition)

# High-level classes.

class Document:

    "A container of document information."

    def __init__(self, docnum):
        self.docnum = docnum
        self.fields = []    # (identifier, value) pairs
        self.terms = {}     # term -> list of positions

    def add_position(self, term, position):

        """
        Add a position entry for the given 'term', indicating the given
        'position'.
        """

        self.terms.setdefault(term, []).append(position)

    def add_field(self, identifier, value):

        "Add a field having the given 'identifier' and 'value'."

        self.fields.append((identifier, unicode(value))) # convert to string

    def set_fields(self, fields):

        """
        Set the document's 'fields': a list of tuples each containing an integer
        identifier and a string value.
        """

        self.fields = fields

class IndexWriter:

    """
    Building term information and writing it to the term and field dictionaries.
    """

    def __init__(self, pathname, interval, doc_interval, flush_interval):
        self.pathname = pathname
        self.interval = interval
        self.doc_interval = doc_interval
        self.flush_interval = flush_interval

        # The partitions to be written by the next flush of each kind.

        self.dict_partition = 0
        self.field_dict_partition = 0

        # Information held in memory until the next flush.

        self.terms = {}     # term -> {docnum -> positions}
        self.docs = {}      # docnum -> fields

        # The number of documents added since the last automatic flush.

        self.doc_counter = 0

    def add_document(self, doc):

        """
        Add the given document 'doc', updating the document counter and flushing
        terms and fields if appropriate. No automatic flushing occurs where the
        flush interval is zero or otherwise false.
        """

        for term, positions in doc.terms.items():
            self.terms.setdefault(term, {})[doc.docnum] = positions

        self.docs[doc.docnum] = doc.fields

        self.doc_counter += 1
        if self.flush_interval and self.doc_counter >= self.flush_interval:
            self.flush_terms()
            self.flush_fields()
            self.doc_counter = 0

    def get_term_writer(self):

        "Return a term dictionary writer for the current partition."

        return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval)

    def get_field_writer(self):

        "Return a field dictionary writer for the current partition."

        return get_field_writer(self.pathname, self.field_dict_partition, self.interval)

    def flush_terms(self):

        "Flush terms into the current term dictionary partition."

        # Get the terms in order.

        all_terms = self.terms
        terms = list(all_terms.keys())
        terms.sort()

        dict_writer = self.get_term_writer()

        for term in terms:
            doc_positions = all_terms[term].items()
            dict_writer.write_term_positions(term, doc_positions)

        dict_writer.close()

        # Start afresh with an empty collection and a new partition.

        self.terms = {}
        self.dict_partition += 1

    def flush_fields(self):

        "Flush fields into the current field dictionary partition."

        # Get the documents in order.

        docs = list(self.docs.items())
        docs.sort()

        field_dict_writer = self.get_field_writer()

        for docnum, fields in docs:
            field_dict_writer.write_fields(docnum, fields)

        field_dict_writer.close()

        # Start afresh with an empty collection and a new partition.

        self.docs = {}
        self.field_dict_partition += 1

    def close(self):

        "Flush any terms and fields still held in memory."

        if self.terms:
            self.flush_terms()
        if self.docs:
            self.flush_fields()

class IndexReader:

    "Accessing the term and field dictionaries."

    def __init__(self, pathname):

        # Only the "merged" partition under 'pathname' is consulted.

        self.dict_reader = get_term_reader(pathname, "merged")
        self.field_dict_reader = get_field_reader(pathname, "merged")

    def find_terms(self, term):
        return self.dict_reader.find_terms(term)

    def find_positions(self, term):
        return self.dict_reader.find_positions(term)

    def get_frequency(self, term):
        return self.dict_reader.get_frequency(term)

    def get_document_frequency(self, term):
        return self.dict_reader.get_document_frequency(term)

    def get_fields(self, docnum):
        return self.field_dict_reader.get_fields(docnum)

    def close(self):

        "Close the term dictionary reader and then the field dictionary reader."

        self.dict_reader.close()
        self.field_dict_reader.close()

class Index:

    "An inverted index solution encapsulating the various components."

    def __init__(self, pathname):
        self.pathname = pathname
        self.reader = None
        self.writer = None

    def get_writer(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, flush_interval=FLUSH_INTERVAL):

        """
        Return a writer, optionally using the given indexing 'interval',
        'doc_interval' and 'flush_interval'. The index directory is created if
        it does not already exist.
        """

        if not exists(self.pathname):
            mkdir(self.pathname)

        self.writer = IndexWriter(self.pathname, interval, doc_interval, flush_interval)
        return self.writer

    def get_reader(self, partition=0):

        """
        Return a reader for the index, merging any partitions first. The
        'partition' argument is accepted but not used: the reader always
        consults the merged partition.
        """

        # Ensure that only one partition exists.

        self.merge()
        return self._get_reader(partition)

    def _get_reader(self, partition):

        """
        Return a reader for the index without merging. The 'partition' argument
        is accepted but not used. An OSError is raised if the index directory
        does not exist.
        """

        if not exists(self.pathname):
            raise OSError("Index path %r does not exist." % self.pathname)

        self.reader = IndexReader(self.pathname)
        return self.reader

    def merge(self):

        "Merge/optimise index partitions."

        self.merge_terms()
        self.merge_fields()

    def _find_partitions(self, prefix):

        """
        Return the set of partition labels found in the index directory, these
        being the parts of filenames following the given 'prefix'.
        """

        partitions = set()

        for filename in listdir(self.pathname):
            if filename.startswith(prefix):
                partitions.add(filename[len(prefix):])

        return partitions

    def merge_terms(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL):

        """
        Merge term dictionaries using the given indexing 'interval' and
        'doc_interval'. Afterwards, any term files are labelled "merged".
        """

        partitions = self._find_partitions("terms-")

        # Several partitions are merged into a new "merged" partition.

        if len(partitions) > 1:

            # Move any existing merged partition out of the way before opening
            # it, so that it is only ever opened under its new name.

            if "merged" in partitions:
                rename_term_files(self.pathname, "merged", "old-merged")
                partitions.remove("merged")
                partitions.add("old-merged")

            readers = [get_term_reader(self.pathname, partition) for partition in partitions]
            writer = get_term_writer(self.pathname, "merged", interval, doc_interval)
            merger = TermDictionaryMerger(writer, readers)

            # Close the readers and writer even if merging fails. In that
            # case the old files are left in place.

            try:
                merger.merge()
            finally:
                merger.close()

            # Remove old files.

            for partition in partitions:
                remove_term_files(self.pathname, partition)

        # A single partition only needs its label changing, so no files need
        # to be opened.

        elif len(partitions) == 1:
            partition = list(partitions)[0]
            if partition != "merged":
                rename_term_files(self.pathname, partition, "merged")

    def merge_fields(self, interval=FIELD_INTERVAL):

        """
        Merge field dictionaries using the given indexing 'interval'.
        Afterwards, any field files are labelled "merged".
        """

        partitions = self._find_partitions("fields-")

        # Several partitions are merged into a new "merged" partition.

        if len(partitions) > 1:

            # Move any existing merged partition out of the way before opening
            # it, so that it is only ever opened under its new name.

            if "merged" in partitions:
                rename_field_files(self.pathname, "merged", "old-merged")
                partitions.remove("merged")
                partitions.add("old-merged")

            readers = [get_field_reader(self.pathname, partition) for partition in partitions]
            writer = get_field_writer(self.pathname, "merged", interval)
            merger = FieldDictionaryMerger(writer, readers)

            # Close the readers and writer even if merging fails. In that
            # case the old files are left in place.

            try:
                merger.merge()
            finally:
                merger.close()

            # Remove old files.

            for partition in partitions:
                remove_field_files(self.pathname, partition)

        # A single partition only needs its label changing, so no files need
        # to be opened.

        elif len(partitions) == 1:
            partition = list(partitions)[0]
            if partition != "merged":
                rename_field_files(self.pathname, partition, "merged")

    def close(self):

        "Close any reader and writer obtained from this index."

        if self.reader is not None:
            self.reader.close()
            self.reader = None
        if self.writer is not None:
            self.writer.close()
            self.writer = None

# vim: tabstop=4 expandtab shiftwidth=4