#!/usr/bin/env python

"""
A simple (and sane) text indexing library.

Copyright (C) 2009 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with this program. If not, see <http://www.gnu.org/licenses/>.
"""

from os import dup, fdopen # independent iterator access to files
from os import listdir, mkdir # index and partition discovery
from os import remove, rename # partition manipulation
from os.path import exists, join
from os.path import commonprefix # to find common string prefixes
from bisect import bisect_right # to find terms in the dictionary index
import bz2, zlib # for field compression
from itermerge import itermerge

try:
    set
except NameError:
    from sets import Set as set

# Constants.

TERM_INTERVAL = 100
DOCUMENT_INTERVAL = 100
FIELD_INTERVAL = 100
FLUSH_INTERVAL = 10000

TERM_FILENAMES = "terms", "terms_index", "positions", "positions_index"
FIELD_FILENAMES = "fields", "fields_index"

# Compressors are tried in order by FileWriter.write_string; the one-character
# flag is stored before the data so that FileReader.read_string can select the
# matching decompressor.

compressors = [("b", bz2.compress), ("z", zlib.compress)]
decompressors = {"b" : bz2.decompress, "z" : zlib.decompress}

# Utility functions.

try:
    # NOTE(review): the optional 'vint' extension is presumably a faster
    # implementation of the pure-Python encoding below - confirm.

    from vint import vint as _vint

    def vint(number):

        """
        Write 'number' as a variable-length integer, returning the encoded
        string. Raise ValueError if 'number' is negative.
        """

        if number >= 0:
            return _vint(number)
        else:
            raise ValueError("Number %r is negative." % number)

except ImportError:

    def vint(number):

        """
        Write 'number' as a variable-length integer, returning the encoded
        string: 7-bit groups, least significant first, with the high bit set
        on every byte except the last. Raise ValueError if 'number' is
        negative.
        """

        if number >= 0:

            # Special case: one byte containing a 7-bit number.

            if number < 128:
                return chr(number)

            # Write the number from least to most significant digits.

            bytes = []

            while number != 0:
                lsd = number & 127
                number = number >> 7
                if number != 0:
                    lsd |= 128 # more bytes follow
                bytes.append(chr(lsd))

            return "".join(bytes)

        # Negative numbers are not supported.

        else:
            raise ValueError("Number %r is negative." % number)

# Foundation classes.

class File:

    "A basic file abstraction wrapping the file object 'f'."

    def __init__(self, f):
        self.f = f
        self.reset()

    def reset(self):

        "To be used to reset the state of the reader or writer between records."

        pass

    def rewind(self):

        "Seek to the start of the file and reset any record state."

        self.f.seek(0)
        self.reset()

    def close(self):

        "Close the underlying file, if still open."

        if self.f is not None:
            self.f.close()
            self.f = None

class FileWriter(File):

    "Writing basic data types to files."

    def write_number(self, number):

        "Write 'number' to the file using a variable length encoding."

        self.f.write(vint(number))

    def write_string(self, s, compress=0):

        """
        Write 's' to the file, recording its length and compressing the string
        if 'compress' is set to a true value.

        When 'compress' is set, a one-character flag precedes the length: the
        flag of the first compressor producing shorter output, or "-" if none
        did. Without 'compress', no flag is written.
        """

        # Convert Unicode objects to strings.

        if isinstance(s, unicode):
            s = s.encode("utf-8")

        # Compress the string if requested.

        if compress:
            for flag, fn in compressors:
                cs = fn(s)

                # Take the first string shorter than the original.

                if len(cs) < len(s):
                    s = cs
                    break
            else:
                flag = "-"

        else:
            flag = ""

        # Write the length of the data before the data itself.

        length = len(s)
        self.f.write(flag + vint(length) + s)

class FileReader(File):

    "Reading basic data types from files."

    def read_number(self):

        """
        Read a variable-length number from the file, raising EOFError if the
        file ends before the number is complete.
        """

        # Read each byte, adding it to the number.

        shift = 0
        number = 0
        read = self.f.read

        try:
            csd = ord(read(1))
            while csd & 128:
                number += ((csd & 127) << shift)
                shift += 7
                csd = ord(read(1))
            else:
                number += (csd << shift)

        # ord("") raises TypeError at the end of the file.

        except TypeError:
            raise EOFError

        return number

    def read_string(self, decompress=0):

        """
        Read a string from the file, decompressing the stored data if
        'decompress' is set to a true value. 'decompress' must match the
        'compress' setting used when the string was written, since it
        determines whether a flag character is expected. Return a Unicode
        object decoded from UTF-8.
        """

        # Read the compression flag if one was written.

        if decompress:
            flag = self.f.read(1)
        else:
            flag = "-"

        length = self.read_number()
        s = self.f.read(length)

        # Perform decompression if applicable.

        if flag != "-":
            fn = decompressors[flag]
            s = fn(s)

        # Convert strings to Unicode objects.

        return unicode(s, "utf-8")

class FileOpener:

    "Opening files using their filenames."

    def __init__(self, filename):
        self.filename = filename

    def open(self, mode):

        "Return a new file object for the filename opened with 'mode'."

        return open(self.filename, mode)

    def close(self):
        pass

# Specific classes for storing term and position information.

class PositionWriter(FileWriter):

    "Writing position information to files."

    def reset(self):
        self.last_docnum = 0

    def write_positions(self, docnum, positions):

        """
        Write for the document 'docnum' the given 'positions'.
        Return the offset of the written record.

        Raise ValueError if 'docnum' is less than the previously written
        document number. Note that 'positions' is sorted in place.
        """

        if docnum < self.last_docnum:
            raise ValueError("Document number %r is less than previous number %r." % (docnum, self.last_docnum))

        # Record the offset of this record.

        offset = self.f.tell()

        # Make sure that the positions are sorted.

        positions.sort()

        # Write the position deltas.

        output = []
        last = 0

        for position in positions:
            output.append(vint(position - last))
            last = position

        # Write the document number delta.
        # Write the number of positions.
        # Then write the positions.

        self.f.write(vint(docnum - self.last_docnum) + vint(len(positions)) + "".join(output))

        self.last_docnum = docnum
        return offset

class PositionOpener(FileOpener):

    "Reading position information from files."

    def read_term_positions(self, offset, count):

        """
        Read all positions from 'offset', seeking to that position in the file
        before reading. The number of documents available for reading is limited
        to 'count'.
        """

        # Open a new file handle so that the returned iterator has its own
        # file position, independent of any other reader.

        f = self.open("rb")
        f.seek(offset)
        return PositionIterator(f, count)

class PositionIndexWriter(FileWriter):

    "Writing position index information to files."

    def reset(self):
        self.last_docnum = 0
        self.last_pos_offset = 0

    def write_positions(self, docnum, pos_offset, count):

        """
        Write the given 'docnum', 'pos_offset' and document 'count' to the
        position index file. Return the offset of the written record.
        """

        # Record the offset of this record.

        offset = self.f.tell()
        output = []

        # Write the document number delta.

        output.append(vint(docnum - self.last_docnum))
        self.last_docnum = docnum

        # Write the position file offset delta.

        output.append(vint(pos_offset - self.last_pos_offset))
        self.last_pos_offset = pos_offset

        # Write the document count.

        output.append(vint(count))

        # Actually write the data.

        self.f.write("".join(output))

        return offset

class PositionIndexOpener(FileOpener):

    "Reading position index information from files."

    def read_term_positions(self, offset, doc_frequency):

        """
        Read all position index entries from 'offset', seeking to that position
        in the file before reading. The number of documents covered by the
        entries read is limited to 'doc_frequency'.
        """

        # Open a new file handle so that the returned iterator has its own
        # file position, independent of any other reader.

        f = self.open("rb")
        f.seek(offset)
        return PositionIndexIterator(f, doc_frequency)

# Iterators for position-related files.

class IteratorBase:

    "Common state for iterators limited to reading 'count' documents."

    def __init__(self, count):
        self.replenish(count)

    def replenish(self, count):

        "Permit a further 'count' documents to be read."

        self.count = count
        self.read_documents = 0

    def __len__(self):
        return self.count

    def sort(self):
        pass # Stored document positions are already sorted.

    def __iter__(self):
        return self

class PositionIterator(FileReader, IteratorBase):

    "Iterating over document positions."

    def __init__(self, f, count):
        FileReader.__init__(self, f)
        IteratorBase.__init__(self, count)

    def reset(self):
        self.last_docnum = 0

    def read_positions(self):

        "Read positions, returning a document number and a list of positions."

        # Read the document number delta and add it to the last number.

        self.last_docnum += self.read_number()

        # Read the number of positions.

        npositions = self.read_number()

        # Read the position deltas, adding each previous position to get the
        # appropriate collection of absolute positions.

        i = 0
        last = 0
        positions = []

        while i < npositions:
            last += self.read_number()
            positions.append(last)
            i += 1

        return self.last_docnum, positions

    def next(self):

        """
        Read positions for a single document, returning a (document number,
        positions) tuple, or raising StopIteration once 'count' documents have
        been read.
        """

        if self.read_documents < self.count:
            self.read_documents += 1
            return self.read_positions()
        else:
            raise StopIteration

class PositionIndexIterator(FileReader, IteratorBase):

    """
    Iterating over position index entries, each describing a section of the
    positions file.
    """

    def __init__(self, f, count):
        FileReader.__init__(self, f)
        IteratorBase.__init__(self, count)
        self.section_count = 0

    def reset(self):
        self.last_docnum = 0
        self.last_pos_offset = 0

    def read_positions(self):

        """
        Read a document number, a position file offset for the position index
        file, and the number of documents in a section of that file.
        """

        # Read the document number delta.

        self.last_docnum += self.read_number()

        # Read the offset delta.

        self.last_pos_offset += self.read_number()

        # Read the document count.

        count = self.read_number()

        return self.last_docnum, self.last_pos_offset, count

    def next(self):

        """
        Read the next index entry as a (first document number, position file
        offset, section document count) tuple. The documents of the previously
        returned section are counted first; StopIteration is raised once all
        'count' documents are covered.
        """

        self.read_documents += self.section_count
        if self.read_documents < self.count:
            docnum, pos_offset, self.section_count = t = self.read_positions()
            return t
        else:
            raise StopIteration

class PositionDictionaryWriter:

    "Writing position dictionaries."

    def __init__(self, position_writer, position_index_writer, interval):
        self.position_writer = position_writer
        self.position_index_writer = position_index_writer
        self.interval = interval

    def write_term_positions(self, doc_positions):

        """
        Write all 'doc_positions' - a collection of tuples of the form (document
        number, position list) - to the file. 'doc_positions' must provide a
        sort method.

        Add some records to the index, making dictionary entries: one index
        entry for every 'interval' documents, plus one for any remainder.

        Return a tuple containing the offset of the first position index entry
        written (None if no documents were given), the frequency (number of
        positions), and document frequency (number of documents) for the term
        involved.
        """

        # Reset the writers.

        self.position_writer.reset()
        self.position_index_writer.reset()

        index_offset = None

        # Write the positions.

        frequency = 0
        first_docnum = None
        first_offset = None
        count = 0

        doc_positions.sort()

        for docnum, positions in doc_positions:
            pos_offset = self.position_writer.write_positions(docnum, positions)

            # Retain the first record offset for a subsequent index entry.

            if first_offset is None:
                first_offset = pos_offset
                first_docnum = docnum

            frequency += len(positions)
            count += 1

            # Every {interval} entries, write an index entry.

            if count % self.interval == 0:
                io = self.position_index_writer.write_positions(first_docnum, first_offset, self.interval)

                # Remember the first index entry offset.

                if index_offset is None:
                    index_offset = io

                first_offset = None
                first_docnum = None

                # Reset the position writer so that position readers accessing
                # a section start with the correct document number.

                self.position_writer.reset()

        # Finish writing an index entry for the remaining documents.

        else:
            if first_offset is not None:
                io = self.position_index_writer.write_positions(first_docnum, first_offset, count % self.interval)

                # Remember the first index entry offset.

                if index_offset is None:
                    index_offset = io

        return index_offset, frequency, count

    def close(self):
        self.position_writer.close()
        self.position_index_writer.close()

class PositionDictionaryReader:

    "Reading position dictionaries."

    def __init__(self, position_opener, position_index_opener):
        self.position_opener = position_opener
        self.position_index_opener = position_index_opener

    def read_term_positions(self, offset, doc_frequency):

        """
        Return an iterator for dictionary entries starting at 'offset' in the
        position index file with the given 'doc_frequency'.
        """

        return PositionDictionaryIterator(self.position_opener,
            self.position_index_opener, offset, doc_frequency)

    def close(self):
        pass

class PositionDictionaryIterator:

    """
    Iteration over position dictionary entries for a single term, using the
    position index to locate sections of the positions file.
    """

    def __init__(self, position_opener, position_index_opener, offset, doc_frequency):
        self.position_opener = position_opener
        self.doc_frequency = doc_frequency
        self.index_iterator = position_index_opener.read_term_positions(offset, doc_frequency)
        self.iterator = None

        # Remember the last values: a record read past by from_document but
        # not yet returned by next.

        self.found_docnum, self.found_positions = None, None

        # Maintain state for the next index entry, if read.

        self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None

        # Initialise the current index entry and current position file iterator.

        self._next_section()
        self._init_section()

    # Sequence methods.

    def __len__(self):
        return self.doc_frequency

    def sort(self):
        pass

    # Iterator methods.

    def __iter__(self):
        return self

    def next(self):

        """
        Attempt to get the next document record from the section in the
        positions file, returning a (document number, positions) tuple.
        StopIteration is raised (via the index iterator) when no sections
        remain.
        """

        # Return any visited but unrequested record.

        if self.found_docnum is not None:
            t = self.found_docnum, self.found_positions
            self.found_docnum, self.found_positions = None, None
            return t

        # Or search for the next record.

        while 1:

            # Either return the next record.

            try:
                return self.iterator.next()

            # Or, where a section is finished, get the next section and try again.

            except StopIteration:

                # Where a section follows, update the index iterator, but keep
                # reading using the same file iterator (since the data should
                # just follow on from the last section).

                self._next_section()
                self.iterator.replenish(self.section_count)

                # Reset the state of the iterator to make sure that document
                # numbers are correct.

                self.iterator.reset()

    def from_document(self, docnum):

        """
        Attempt to navigate to a positions entry for the given 'docnum',
        returning the positions for 'docnum', or None otherwise.
        """

        # Return any unrequested document positions.

        if docnum == self.found_docnum:
            return self.found_positions

        # Read ahead in the index until the next entry refers to a document
        # later than the desired document.

        try:
            if self.next_docnum is None:
                self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next()

            # Read until the next entry is after the desired document number,
            # or until the end of the results.

            while self.next_docnum <= docnum:
                self._next_read_section()
                if self.docnum < docnum:
                    self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next()
                else:
                    break

        except StopIteration:
            pass

        # Scanning restarts at the start of the current section, so any record
        # retained by an earlier call is now stale and must not be returned by
        # next (it could be out of order relative to 'docnum').

        self.found_docnum, self.found_positions = None, None

        # Navigate in the position file to the document.

        self._init_section()

        try:
            while 1:
                found_docnum, found_positions = self.iterator.next()

                # Return the desired document positions or None (retaining the
                # positions for the document immediately after).

                if docnum == found_docnum:
                    return found_positions
                elif docnum < found_docnum:
                    self.found_docnum, self.found_positions = found_docnum, found_positions
                    return None

        except StopIteration:
            return None

    # Internal methods.

    def _next_section(self):

        """
        Attempt to get the next section in the index, raising StopIteration if
        no sections remain.
        """

        if self.next_docnum is None:
            self.docnum, self.pos_offset, self.section_count = self.index_iterator.next()
        else:
            self._next_read_section()

    def _next_read_section(self):

        """
        Make the next index entry the current one without reading from the
        index.
        """

        self.docnum, self.pos_offset, self.section_count = self.next_docnum, self.next_pos_offset, self.next_section_count
        self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None

    def _init_section(self):

        "Initialise the iterator for the section in the position file."

        if self.iterator is not None:
            self.iterator.close()
        self.iterator = self.position_opener.read_term_positions(self.pos_offset, self.section_count)

    def close(self):
        if self.iterator is not None:
            self.iterator.close()
            self.iterator = None
        if self.index_iterator is not None:
            self.index_iterator.close()
            self.index_iterator = None

class TermWriter(FileWriter):

    "Writing term information to files."

    def reset(self):
        self.last_term = ""
        self.last_offset = 0

    def write_term(self, term, offset, frequency, doc_frequency):

        """
        Write the given 'term', its position file 'offset', its 'frequency' and
        its 'doc_frequency' (number of documents in which it appears) to the
        term information file. Return the offset after the term information was
        written to the file.
        """

        # Write the prefix length and term suffix (front coding against the
        # previous term).

        common = len(commonprefix([self.last_term, term]))
        suffix = term[common:]

        self.write_number(common)
        self.write_string(suffix)

        # Write the offset delta.

        self.write_number(offset - self.last_offset)

        # Write the frequency.

        self.write_number(frequency)

        # Write the document frequency.

        self.write_number(doc_frequency)

        self.last_term = term
        self.last_offset = offset

        return self.f.tell()

class TermReader(FileReader):

    "Reading term information from files."

    def reset(self):
        self.last_term = ""
        self.last_offset = 0

    def read_term(self):

        """
        Read a term, its position file offset, its frequency and its document
        frequency from the term information file.
        """

        # Read the prefix length and term suffix.

        common = self.read_number()
        suffix = self.read_string()

        self.last_term = self.last_term[:common] + suffix

        # Read the offset delta.

        self.last_offset += self.read_number()

        # Read the frequency.

        frequency = self.read_number()

        # Read the document frequency.

        doc_frequency = self.read_number()

        return self.last_term, self.last_offset, frequency, doc_frequency

    def go_to_term(self, term, offset, info_offset):

        """
        Seek past the entry for 'term' having 'offset' to 'info_offset'. This
        permits the scanning for later terms from the specified term.
        """

        self.f.seek(info_offset)
        self.last_term = term
        self.last_offset = offset

class TermIndexWriter(TermWriter):

    "Writing term dictionary index details to files."

    def reset(self):
        TermWriter.reset(self)
        self.last_info_offset = 0

    def write_term(self, term, offset, frequency, doc_frequency, info_offset):

        """
        Write the given 'term', its position file 'offset', its 'frequency' and
        its 'doc_frequency' to the term dictionary index file, along with the
        'info_offset' in the term information file.
        """

        TermWriter.write_term(self, term, offset, frequency, doc_frequency)

        # Write the information file offset delta.

        self.write_number(info_offset - self.last_info_offset)
        self.last_info_offset = info_offset

class TermIndexReader(TermReader):

    "Reading term dictionary index details from files."

    def reset(self):
        TermReader.reset(self)
        self.last_info_offset = 0

    def read_term(self):

        """
        Read a term, its position file offset, its frequency, its document
        frequency and a term information file offset from the term dictionary
        index file.
        """

        term, offset, frequency, doc_frequency = TermReader.read_term(self)

        # Read the offset delta.

        self.last_info_offset += self.read_number()

        return term, offset, frequency, doc_frequency, self.last_info_offset

class TermDictionaryWriter:

    "Writing term dictionaries."

    def __init__(self, info_writer, index_writer, position_dict_writer, interval):
        self.info_writer = info_writer
        self.index_writer = index_writer
        self.position_dict_writer = position_dict_writer
        self.interval = interval
        self.entry = 0

    def _write_term(self, term, offset, frequency, doc_frequency):

        """
        Write the given 'term', its position file 'offset', its 'frequency' and
        its 'doc_frequency' (number of documents in which it appears) to the
        term information file. Every 'interval' terms (starting with the first),
        also write an index entry recording the offset just after the term's
        information entry.
        """

        info_offset = self.info_writer.write_term(term, offset, frequency, doc_frequency)

        if self.entry % self.interval == 0:
            self.index_writer.write_term(term, offset, frequency, doc_frequency, info_offset)

        self.entry += 1

    def write_term_positions(self, term, doc_positions):

        """
        Write the given 'term' and the 'doc_positions' recording the documents
        and positions at which the term is found.
        """

        offset, frequency, doc_frequency = self.position_dict_writer.write_term_positions(doc_positions)
        self._write_term(term, offset, frequency, doc_frequency)

    def close(self):
        self.info_writer.close()
        self.index_writer.close()
        self.position_dict_writer.close()

class TermDictionaryReader:

    "Reading term dictionaries."

    def __init__(self, info_reader, index_reader, position_dict_reader):
        self.info_reader = info_reader
        self.index_reader = index_reader
        self.position_dict_reader = position_dict_reader

        # Load the whole term index into memory.

        self.terms = []
        try:
            while 1:
                self.terms.append(self.index_reader.read_term())
        except EOFError:
            pass

        # Large numbers for ordering purposes: an offset greater than any in
        # the index, so that bisection places a search key after an equal term.

        if self.terms:
            self.max_offset = self.terms[-1][1] + 1
        else:
            self.max_offset = None

    def _find_closest_entry(self, term):

        """
        Find the offsets and frequencies of 'term' from the term dictionary or
        the closest term starting with the value of 'term'.

        Return the closest index entry consisting of a term, the position file
        offset, the term frequency, the document frequency, and the term details
        file offset. Return None if the index is empty (the writer always
        indexes the first term, so an empty index means an empty dictionary).
        """

        if not self.terms:
            return None

        i = bisect_right(self.terms, (term, self.max_offset, 0, 0)) - 1

        # Get the entry position providing the term or one preceding it.
        # If no entry precedes the requested term, return the very first entry
        # as the closest.

        if i == -1:
            return self.terms[0]
        else:
            return self.terms[i]

    def _find_closest_term(self, term):

        """
        Find the offsets and frequencies of 'term' from the term dictionary or
        the closest term starting with the value of 'term'.

        Return the closest term (or the term itself), the position file offset,
        the term frequency, the document frequency, and the term details file
        offset (or None if the reader is already positioned). Return None if
        the dictionary is empty.
        """

        entry = self._find_closest_entry(term)
        if entry is None:
            return None

        found_term, offset, frequency, doc_frequency, info_offset = entry

        # Where the term is found immediately, return the offset and
        # frequencies. If the term does not appear, return the details of the
        # closest entry.

        if term <= found_term:
            return found_term, offset, frequency, doc_frequency, info_offset

        # Otherwise, seek past the index term's entry in the information file
        # and scan for the desired term.

        else:
            self.info_reader.go_to_term(found_term, offset, info_offset)
            try:
                while term > found_term:
                    found_term, offset, frequency, doc_frequency = self.info_reader.read_term()
            except EOFError:
                pass

            return found_term, offset, frequency, doc_frequency, None

    def _find_term(self, term):

        """
        Find the position file offset and frequency of 'term' from the term
        dictionary, returning (offset, frequency, document frequency) or None
        if the term is not present.
        """

        t = self._find_closest_term(term)
        if t is None:
            return None

        found_term, offset, frequency, doc_frequency, info_offset = t

        # If the term is found, return the offset and frequencies.

        if term == found_term:
            return offset, frequency, doc_frequency
        else:
            return None

    def _get_positions(self, offset, doc_frequency):
        return self.position_dict_reader.read_term_positions(offset, doc_frequency)

    # Iterator convenience methods.

    def __iter__(self):
        self.rewind()
        return self

    def next(self):
        try:
            return self.read_term()
        except EOFError:
            raise StopIteration

    # Sequential access methods.

    def rewind(self):
        self.info_reader.rewind()

    def read_term(self):

        """
        Return the next term, its frequency, its document frequency, and the
        documents and positions at which the term is found.
        """

        term, offset, frequency, doc_frequency = self.info_reader.read_term()
        positions = self._get_positions(offset, doc_frequency)
        return term, frequency, doc_frequency, positions

    # Query methods.

    def find_terms(self, term):

        "Return all terms whose values start with the value of 'term'."

        terms = []

        t = self._find_closest_term(term)
        if t is None:
            return terms

        found_term, offset, frequency, doc_frequency, info_offset = t

        # Position the reader, if necessary.

        if info_offset is not None:
            self.info_reader.go_to_term(found_term, offset, info_offset)

        # Read and record terms.

        try:
            # Add the found term if it starts with the specified term.

            while found_term.startswith(term):
                terms.append(found_term)
                found_term, offset, frequency, doc_frequency = self.info_reader.read_term()

        except EOFError:
            pass

        return terms

    def find_positions(self, term):

        "Return the documents and positions at which the given 'term' is found."

        t = self._find_term(term)
        if t is None:
            return None
        else:
            offset, frequency, doc_frequency = t
            return self._get_positions(offset, doc_frequency)

    def get_frequency(self, term):

        "Return the frequency of the given 'term'."

        t = self._find_term(term)
        if t is None:
            return None
        else:
            offset, frequency, doc_frequency = t
            return frequency

    def get_document_frequency(self, term):

        "Return the document frequency of the given 'term'."

        t = self._find_term(term)
        if t is None:
            return None
        else:
            offset, frequency, doc_frequency = t
            return doc_frequency

    def close(self):
        self.info_reader.close()
        self.index_reader.close()
        self.position_dict_reader.close()

# Specific classes for storing document information.
class FieldWriter(FileWriter):

    "Writing field data to files."

    def reset(self):
        self.last_docnum = 0

    def write_fields(self, docnum, fields):

        """
        Write for the given 'docnum', a list of 'fields' (integer, string pairs
        representing field identifiers and values respectively).
        Return the offset at which the fields are stored.
        """

        offset = self.f.tell()

        # Write the document number delta.

        self.write_number(docnum - self.last_docnum)

        # Write the number of fields.

        self.write_number(len(fields))

        # Write the fields themselves.

        for i, field in fields:
            self.write_number(i)
            self.write_string(field, 1) # compress

        self.last_docnum = docnum
        return offset

class FieldReader(FileReader):

    "Reading field data from files."

    def reset(self):
        self.last_docnum = 0

    def read_fields(self):

        """
        Read fields from the file, returning a tuple containing the document
        number and a list of field (identifier, value) pairs.
        """

        # Read the document number.

        self.last_docnum += self.read_number()

        # Read the number of fields.

        nfields = self.read_number()

        # Collect the fields.

        fields = []
        i = 0

        while i < nfields:
            identifier = self.read_number()
            value = self.read_string(1) # decompress
            fields.append((identifier, value))
            i += 1

        return self.last_docnum, fields

    def read_document_fields(self, docnum, offset):

        """
        Read fields for 'docnum' at the given 'offset'. This permits the
        retrieval of details for the specified document, as well as scanning for
        later documents.
        """

        self.f.seek(offset)

        # The stored docnum is a delta from whatever document preceded this
        # record, which the reader has not necessarily seen, so the computed
        # number is discarded and replaced with the known 'docnum'.

        bad_docnum, fields = self.read_fields()
        self.last_docnum = docnum
        return docnum, fields

class FieldIndexWriter(FileWriter):

    "Writing field index details to files."

    def reset(self):
        self.last_docnum = 0
        self.last_offset = 0

    def write_document(self, docnum, offset):

        """
        Write for the given 'docnum', the 'offset' at which the fields for the
        document are stored in the fields file.
        """

        # Write the document number and offset deltas.

        self.write_number(docnum - self.last_docnum)
        self.write_number(offset - self.last_offset)

        self.last_docnum = docnum
        self.last_offset = offset

class FieldIndexReader(FileReader):

    "Reading field index details from files."

    def reset(self):
        self.last_docnum = 0
        self.last_offset = 0

    def read_document(self):

        "Read a document number and field file offset."

        # Read the document number delta and offset.

        self.last_docnum += self.read_number()
        self.last_offset += self.read_number()

        return self.last_docnum, self.last_offset

class FieldDictionaryWriter:

    "Writing field dictionary details."

    def __init__(self, field_writer, field_index_writer, interval):
        self.field_writer = field_writer
        self.field_index_writer = field_index_writer
        self.interval = interval
        self.entry = 0

    def write_fields(self, docnum, fields):

        """
        Write details of the document with the given 'docnum' and 'fields'.
        Every 'interval' documents (starting with the first), also write an
        index entry recording the document's offset in the fields file.
        """

        offset = self.field_writer.write_fields(docnum, fields)

        if self.entry % self.interval == 0:
            self.field_index_writer.write_document(docnum, offset)

        self.entry += 1

    def close(self):
        self.field_writer.close()
        self.field_index_writer.close()

class FieldDictionaryReader:

    "Reading field dictionary details."

    def __init__(self, field_reader, field_index_reader):
        self.field_reader = field_reader
        self.field_index_reader = field_index_reader

        # Load the whole field index into memory.

        self.docs = []
        try:
            while 1:
                self.docs.append(self.field_index_reader.read_document())
        except EOFError:
            pass

        # Large numbers for ordering purposes: no index offset exceeds this,
        # and bisect_right places a key after an equal entry.

        if self.docs:
            self.max_offset = self.docs[-1][1]
        else:
            self.max_offset = None

    # Iterator convenience methods.

    def __iter__(self):
        self.rewind()
        return self

    def next(self):
        try:
            return self.read_fields()
        except EOFError:
            raise StopIteration

    # Sequential access methods.

    def rewind(self):
        self.field_reader.rewind()

    def read_fields(self):

        "Return the next document number and fields."

        return self.field_reader.read_fields()

    # Random access methods.

    def get_fields(self, docnum):

        """
        Read the fields of the document with the given 'docnum', returning None
        if the document is not present.
        """

        i = bisect_right(self.docs, (docnum, self.max_offset)) - 1

        # Get the entry position providing the document or one preceding it.

        if i == -1:
            return None

        found_docnum, offset = self.docs[i]

        # Read from the fields file.

        found_docnum, fields = self.field_reader.read_document_fields(found_docnum, offset)

        # Scan for the document, if necessary.

        try:
            while docnum > found_docnum:
                found_docnum, fields = self.field_reader.read_fields()
        except EOFError:
            pass

        # If the document is found, return the fields.

        if docnum == found_docnum:
            return fields
        else:
            return None

    def close(self):
        self.field_reader.close()
        self.field_index_reader.close()

# Dictionary merging classes.

class Merger:

    "Merge files."

    def __init__(self, writer, readers):
        self.writer = writer
        self.readers = readers

    def close(self):
        for reader in self.readers:
            reader.close()
        self.writer.close()

class TermDictionaryMerger(Merger):

    "Merge term and position files."

    def merge(self):

        """
        Merge terms and positions from the readers, sending them to the writer.
        Position iterators for equal terms from different readers are merged
        into a single entry for that term.
        """

        last_term = None
        current_readers = []

        for term, frequency, doc_frequency, positions in itermerge(self.readers):
            if term == last_term:
                current_readers.append(positions)
            else:
                if current_readers:
                    self._write_term(last_term, current_readers)
                last_term = term
                current_readers = [positions]

        # Write the final term, if any.

        if current_readers:
            self._write_term(last_term, current_readers)

    def _write_term(self, term, position_iterators):

        """
        Write the merged 'position_iterators' for 'term', then close each
        iterator so that the files it opened are released immediately rather
        than whenever the iterator is garbage-collected.
        """

        try:
            self.writer.write_term_positions(term, itermerge(position_iterators))
        finally:
            for positions in position_iterators:

                # Guard against position collections lacking a close method.

                close = getattr(positions, "close", None)
                if close is not None:
                    close()

class FieldDictionaryMerger(Merger):

    "Merge field files."

    def merge(self):

        """
        Merge fields from the readers, sending them to the writer.
        """

        for docnum, fields in itermerge(self.readers):
            self.writer.write_fields(docnum, fields)

# Utility functions.
def _get_filename(pathname, name, partition):

    "Return the path of the 'name' file for 'partition' under 'pathname'."

    return join(pathname, "%s-%s" % (name, partition))

def _open_files(pathname, names, partition, mode):

    """
    Open the files with the given 'names' for the given 'partition' under
    'pathname' using 'mode', returning a list of file objects in the order of
    'names'. If any file cannot be opened, the files already opened are closed
    before the error propagates, so no handles are leaked.
    """

    files = []
    opened_all = 0
    try:
        for name in names:
            files.append(open(_get_filename(pathname, name, partition), mode))
        opened_all = 1
    finally:
        if not opened_all:
            for f in files:
                f.close()

    return files

def get_term_writer(pathname, partition, interval, doc_interval):

    """
    Return a term dictionary writer using files under the given 'pathname'
    labelled according to the given 'partition', using the given indexing
    'interval' for terms and 'doc_interval' for document position records.
    """

    tdf, tdif, tpf, tpif = _open_files(pathname,
        ("terms", "terms_index", "positions", "positions_index"), partition, "wb")

    info_writer = TermWriter(tdf)
    index_writer = TermIndexWriter(tdif)
    positions_writer = PositionWriter(tpf)
    positions_index_writer = PositionIndexWriter(tpif)

    positions_dict_writer = PositionDictionaryWriter(positions_writer, positions_index_writer, doc_interval)

    return TermDictionaryWriter(info_writer, index_writer, positions_dict_writer, interval)

def get_field_writer(pathname, partition, interval):

    """
    Return a field dictionary writer using files under the given 'pathname'
    labelled according to the given 'partition', using the given indexing
    'interval'.
    """

    ff, fif = _open_files(pathname, ("fields", "fields_index"), partition, "wb")

    field_writer = FieldWriter(ff)
    field_index_writer = FieldIndexWriter(fif)

    return FieldDictionaryWriter(field_writer, field_index_writer, interval)

def get_term_reader(pathname, partition):

    """
    Return a term dictionary reader using files under the given 'pathname'
    labelled according to the given 'partition'.
    """

    tdf, tdif = _open_files(pathname, ("terms", "terms_index"), partition, "rb")

    info_reader = TermReader(tdf)
    index_reader = TermIndexReader(tdif)

    # The position files are handed over by name rather than opened here.

    positions_opener = PositionOpener(_get_filename(pathname, "positions", partition))
    positions_index_opener = PositionIndexOpener(_get_filename(pathname, "positions_index", partition))

    positions_dict_reader = PositionDictionaryReader(positions_opener, positions_index_opener)

    return TermDictionaryReader(info_reader, index_reader, positions_dict_reader)

def get_field_reader(pathname, partition):

    """
    Return a field dictionary reader using files under the given 'pathname'
    labelled according to the given 'partition'.
    """

    ff, fif = _open_files(pathname, ("fields", "fields_index"), partition, "rb")

    field_reader = FieldReader(ff)
    field_index_reader = FieldIndexReader(fif)

    return FieldDictionaryReader(field_reader, field_index_reader)

def rename_files(pathname, names, from_partition, to_partition):

    "Rename each of the 'names' files from 'from_partition' to 'to_partition'."

    for name in names:
        rename(_get_filename(pathname, name, from_partition), _get_filename(pathname, name, to_partition))

def rename_term_files(pathname, from_partition, to_partition):
    rename_files(pathname, TERM_FILENAMES, from_partition, to_partition)

def rename_field_files(pathname, from_partition, to_partition):
    rename_files(pathname, FIELD_FILENAMES, from_partition, to_partition)

def remove_files(pathname, names, partition):

    "Remove each of the 'names' files belonging to 'partition'."

    for name in names:
        remove(_get_filename(pathname, name, partition))

def remove_term_files(pathname, partition):
    remove_files(pathname, TERM_FILENAMES, partition)

def remove_field_files(pathname, partition):
    remove_files(pathname, FIELD_FILENAMES, partition)

# High-level classes.
class Document:

    "A container of document information."

    def __init__(self, docnum):
        self.docnum = docnum
        self.fields = []    # list of (identifier, value) tuples
        self.terms = {}     # maps each term to a list of positions

    def add_position(self, term, position):

        """
        Add a position entry for the given 'term', indicating the given
        'position'. Positions are kept in the order in which they are added.
        """

        self.terms.setdefault(term, []).append(position)

    def add_field(self, identifier, value):

        """
        Add a field having the given 'identifier' and 'value', converting the
        value to a Unicode string.
        """

        self.fields.append((identifier, unicode(value)))

    def set_fields(self, fields):

        """
        Set the document's 'fields': a list of tuples each containing an integer
        identifier and a string value. The list replaces any fields already
        present and is stored as given.
        """

        self.fields = fields

class IndexWriter:

    """
    Building term information and writing it to the term and field dictionaries.

    Terms and fields are accumulated in memory and written out as a new
    numbered partition each time they are flushed.
    """

    def __init__(self, pathname, interval, doc_interval, flush_interval):
        self.pathname = pathname
        self.interval = interval
        self.doc_interval = doc_interval
        self.flush_interval = flush_interval

        # Numbers of the next term and field dictionary partitions to write.

        self.dict_partition = 0
        self.field_dict_partition = 0

        # Pending data: term -> {docnum : positions} and docnum -> fields.

        self.terms = {}
        self.docs = {}

        # Number of documents added since the last flush.

        self.doc_counter = 0

    def add_document(self, doc):

        """
        Add the given document 'doc', updating the document counter and flushing
        terms and fields once 'flush_interval' documents have been added (a
        false 'flush_interval' disables automatic flushing).
        """

        for term, positions in doc.terms.items():
            self.terms.setdefault(term, {})[doc.docnum] = positions

        self.docs[doc.docnum] = doc.fields

        self.doc_counter += 1
        if self.flush_interval and self.doc_counter >= self.flush_interval:
            self.flush_terms()
            self.flush_fields()
            self.doc_counter = 0

    def get_term_writer(self):

        "Return a term dictionary writer for the current partition."

        return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval)

    def get_field_writer(self):

        "Return a field dictionary writer for the current partition."

        # NOTE(review): the term indexing interval is used for fields as well.

        return get_field_writer(self.pathname, self.field_dict_partition, self.interval)

    def flush_terms(self):

        "Flush terms into the current term dictionary partition."

        # Get the terms in order.

        all_terms = self.terms
        terms = list(all_terms.keys())
        terms.sort()

        dict_writer = self.get_term_writer()

        for term in terms:

            # Present each term's documents in ascending document number order,
            # as flush_fields does for fields; dictionary order is arbitrary.
            # NOTE(review): the term writer presumably relies on this order
            # (merging feeds it itermerge output) - confirm.

            doc_positions = list(all_terms[term].items())
            doc_positions.sort()
            dict_writer.write_term_positions(term, doc_positions)

        dict_writer.close()

        self.terms = {}
        self.dict_partition += 1

    def flush_fields(self):

        "Flush fields into the current field dictionary partition."

        # Get the documents in order.

        docs = list(self.docs.items())
        docs.sort()

        field_dict_writer = self.get_field_writer()

        for docnum, fields in docs:
            field_dict_writer.write_fields(docnum, fields)

        field_dict_writer.close()

        self.docs = {}
        self.field_dict_partition += 1

    def close(self):

        "Flush any pending terms and fields."

        if self.terms:
            self.flush_terms()
        if self.docs:
            self.flush_fields()

class IndexReader:

    "Accessing the term and field dictionaries."

    def __init__(self, pathname):

        # Readers are always opened on the "merged" partition.

        self.dict_reader = get_term_reader(pathname, "merged")
        self.field_dict_reader = get_field_reader(pathname, "merged")

    def find_terms(self, term):
        return self.dict_reader.find_terms(term)

    def find_positions(self, term):
        return self.dict_reader.find_positions(term)

    def get_frequency(self, term):
        return self.dict_reader.get_frequency(term)

    def get_document_frequency(self, term):
        return self.dict_reader.get_document_frequency(term)

    def get_fields(self, docnum):
        return self.field_dict_reader.get_fields(docnum)

    def close(self):

        "Close the term reader and, even if that fails, the field reader."

        try:
            self.dict_reader.close()
        finally:
            self.field_dict_reader.close()

class Index:

    "An inverted index solution encapsulating the various components."

    def __init__(self, pathname):
        self.pathname = pathname
        self.reader = None
        self.writer = None

    def get_writer(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, flush_interval=FLUSH_INTERVAL):

        """
        Return a writer, optionally using the given indexing 'interval',
        'doc_interval' and 'flush_interval'.

        The index directory is created if necessary. New partitions are
        numbered after any numbered partitions left by earlier, unmerged
        writers, so that those partitions are not overwritten.
        """

        if not exists(self.pathname):
            mkdir(self.pathname)

        self.writer = IndexWriter(self.pathname, interval, doc_interval, flush_interval)

        # IndexWriter numbers its partitions from zero, and partition files are
        # opened for writing with truncation, so skip past existing ones.

        self.writer.dict_partition = self._get_next_partition("terms-")
        self.writer.field_dict_partition = self._get_next_partition("fields-")

        return self.writer

    def get_reader(self, partition=0):

        """
        Return a reader for the index, first merging all partitions so that
        only the "merged" partition remains. The 'partition' argument is
        accepted for compatibility but is not used.
        """

        # Ensure that only one partition exists.

        self.merge()
        return self._get_reader(partition)

    def _get_reader(self, partition):

        """
        Return a reader for the "merged" partition of the index, raising
        OSError if the index path does not exist. 'partition' is ignored.
        """

        if not exists(self.pathname):
            raise OSError("Index path %r does not exist." % self.pathname)

        self.reader = IndexReader(self.pathname)
        return self.reader

    def _get_partitions(self, prefix):

        """
        Return a list of the partition labels of files under the index path
        whose names start with 'prefix', in directory listing order.
        """

        partitions = []
        for filename in listdir(self.pathname):
            if filename.startswith(prefix):
                partitions.append(filename[len(prefix):])
        return partitions

    def _get_next_partition(self, prefix):

        """
        Return the number following the highest numeric partition found for
        files starting with 'prefix', or 0 if there is none.
        """

        numbers = [int(p) for p in self._get_partitions(prefix) if p.isdigit()]
        if numbers:
            return max(numbers) + 1
        return 0

    def merge(self):

        """
        Merge/optimise index partitions, leaving at most one term and one field
        partition, each named "merged".
        """

        self.merge_terms()
        self.merge_fields()

    def merge_terms(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL):

        """
        Merge term dictionaries using the given indexing 'interval' and
        'doc_interval'.

        Several partitions (including any existing "merged" one, renamed to
        "old-merged") are merged into a new "merged" partition and then removed.
        A single partition is simply renamed to "merged".
        """

        partitions = self._get_partitions("terms-")

        # Write directly to a dictionary.

        if len(partitions) > 1:

            # Readers are opened before the existing "merged" partition (if
            # any) is renamed.

            readers = []
            for partition in partitions:
                readers.append(get_term_reader(self.pathname, partition))

            if "merged" in partitions:
                rename_term_files(self.pathname, "merged", "old-merged")
                partitions.remove("merged")
                partitions.append("old-merged")

            writer = get_term_writer(self.pathname, "merged", interval, doc_interval)
            merger = TermDictionaryMerger(writer, readers)
            merger.merge()
            merger.close()

            # Remove old files.

            for partition in partitions:
                remove_term_files(self.pathname, partition)

        # A single partition needs no reader: it is only renamed.

        elif len(partitions) == 1:
            partition = partitions[0]
            if partition != "merged":
                rename_term_files(self.pathname, partition, "merged")

    def merge_fields(self, interval=FIELD_INTERVAL):

        """
        Merge field dictionaries using the given indexing 'interval'.

        Several partitions (including any existing "merged" one, renamed to
        "old-merged") are merged into a new "merged" partition and then removed.
        A single partition is simply renamed to "merged".
        """

        partitions = self._get_partitions("fields-")

        # Write directly to a dictionary.

        if len(partitions) > 1:

            # Readers are opened before the existing "merged" partition (if
            # any) is renamed.

            readers = []
            for partition in partitions:
                readers.append(get_field_reader(self.pathname, partition))

            if "merged" in partitions:
                rename_field_files(self.pathname, "merged", "old-merged")
                partitions.remove("merged")
                partitions.append("old-merged")

            writer = get_field_writer(self.pathname, "merged", interval)
            merger = FieldDictionaryMerger(writer, readers)
            merger.merge()
            merger.close()

            # Remove old files.

            for partition in partitions:
                remove_field_files(self.pathname, partition)

        # A single partition needs no reader: it is only renamed.

        elif len(partitions) == 1:
            partition = partitions[0]
            if partition != "merged":
                rename_field_files(self.pathname, partition, "merged")

    def close(self):

        """
        Close any reader and writer obtained from this index. The writer, which
        flushes pending data, is closed even if closing the reader fails.
        """

        try:
            if self.reader is not None:
                self.reader.close()
                self.reader = None
        finally:
            if self.writer is not None:
                self.writer.close()
                self.writer = None

# vim: tabstop=4 expandtab shiftwidth=4