#!/usr/bin/env python

"""
A simple (and sane) text indexing library.

Copyright (C) 2009 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with this program. If not, see <http://www.gnu.org/licenses/>.
"""

from os import dup, fdopen              # independent iterator access to files
from os import listdir, mkdir           # index and partition discovery
from os import remove, rename           # partition manipulation
from os.path import exists, join
from os.path import commonprefix        # to find common string prefixes
from bisect import bisect_right         # to find terms in the dictionary index
import bz2, zlib                        # for field compression
from itermerge import itermerge

try:
    set
except NameError:
    from sets import Set as set

# Constants.

TERM_INTERVAL = 100
DOCUMENT_INTERVAL = 100
FIELD_INTERVAL = 100
FLUSH_INTERVAL = 10000

TERM_FILENAMES = "terms", "terms_index", "positions", "positions_index"
FIELD_FILENAMES = "fields", "fields_index"

# Compressors are tried in the order given; each flag identifies the matching
# decompressor when the data is read back.

compressors = [("b", bz2.compress), ("z", zlib.compress)]
decompressors = {"b" : bz2.decompress, "z" : zlib.decompress}

# Utility functions.

try:
    from vint import vint as _vint

    def vint(number):

        """
        Return 'number' encoded as a variable-length integer, delegating the
        encoding to the imported 'vint' module. A ValueError is raised for
        negative numbers.
        """

        if not number >= 0:
            raise ValueError("Number %r is negative." % number)

        return _vint(number)

except ImportError:

    def vint(number):

        """
        Return 'number' encoded as a variable-length integer: a string of
        characters each carrying seven bits of the number, least significant
        group first, with the high bit set on every character except the last.
        A ValueError is raised for negative numbers.
        """

        # Negative numbers are not supported.

        if not number >= 0:
            raise ValueError("Number %r is negative." % number)

        # Fast path: the number fits in a single seven-bit character.

        if number < 128:
            return chr(number)

        # Emit seven bits at a time, flagging every group that is followed by
        # another one.

        encoded = []

        while number != 0:
            group = number & 127
            number = number >> 7
            if number != 0:
                group |= 128
            encoded.append(chr(group))

        return "".join(encoded)

# Foundation classes.

class File:

    "A thin wrapper around a file object, shared by readers and writers."

    def __init__(self, f):
        self.f = f
        self.reset()

    def reset(self):

        "Hook for subclasses: clear any state carried between records."

        pass

    def rewind(self):

        "Seek to the start of the file and clear the record state."

        self.seek(0)
        self.reset()

    def seek(self, offset):

        "Hook for readers: move to 'offset'. Does nothing here."

        pass

    def flush(self):

        "Hook for writers: push out pending data. Does nothing here."

        pass

    def close(self):

        "Flush and close the underlying file; later calls have no effect."

        if self.f is None:
            return

        self.flush()
        self.f.close()
        self.f = None

class FileWriter(File):

    "Writing numbers and strings to a file through a small write cache."

    def __init__(self, f):
        File.__init__(self, f)
        self.cache = []
        self.cache_length = 0

    def write_number(self, number):

        "Write 'number' using the variable-length integer encoding."

        self.write(vint(number))

    def write_string(self, s, compress=0):

        """
        Write 's' preceded by its length. Unicode objects are encoded as UTF-8
        first. If 'compress' is true, a one-character flag precedes the length:
        the flag of the first compressor producing something shorter than the
        original data, or "-" if none does and the data is stored as it is.
        """

        if isinstance(s, unicode):
            s = s.encode("utf-8")

        flag = ""

        if compress:

            # Assume no compressor helps until one proves otherwise.

            flag = "-"

            for candidate, compressor in compressors:
                packed = compressor(s)
                if len(packed) < len(s):
                    flag, s = candidate, packed
                    break

        # The length written is that of the data as stored.

        self.write(flag + vint(len(s)) + s)

    # Cache-affected methods.

    def write(self, s):

        "Queue 's' for writing, flushing once 1000 or more characters are queued."

        self.cache.append(s)
        self.cache_length += len(s)

        if self.cache_length >= 1000:
            self.flush()

    def tell(self):

        "Return the file position that the next written data will occupy."

        return self.f.tell() + self.cache_length

    def flush(self):

        "Write all queued data to the underlying file and empty the cache."

        self.f.write("".join(self.cache))
        self.cache = []
        self.cache_length = 0

class FileReader(File):

    "Reading numbers and strings from a file through a small read cache."

    def __init__(self, f):
        File.__init__(self, f)
        self.cache = ""
        self.cache_length = 0

    def read_number(self):

        """
        Read a variable-length integer and return it. EOFError is raised if
        the data runs out before the number is complete.
        """

        read = self.read
        number = 0
        shift = 0

        try:
            # Characters with the high bit set are followed by more data.

            csd = ord(read(1))

            while csd & 128:
                number += ((csd & 127) << shift)
                shift += 7
                csd = ord(read(1))

            # The final character contributes the most significant bits.

            number += (csd << shift)

        # An empty read makes ord fail, signalling the end of the data.

        except TypeError:
            raise EOFError

        return number

    def read_string(self, decompress=0):

        """
        Read a length-prefixed string and return it as a Unicode object decoded
        from UTF-8. If 'decompress' is true, a one-character flag is read before
        the length and, unless it is "-", selects the decompressor applied to
        the stored data.
        """

        flag = "-"

        if decompress:
            flag = self.read(1)

        data = self.read(self.read_number())

        if flag != "-":
            data = decompressors[flag](data)

        return unicode(data, "utf-8")

    # Cache-affected methods.

    def read(self, n):

        """
        Return up to 'n' characters, refilling the cache from the file in
        requests of at least 1000 characters when it holds too little.
        """

        shortfall = n - self.cache_length

        if shortfall > 0:
            chunk = self.f.read(max(shortfall, 1000))
            self.cache += chunk
            self.cache_length += len(chunk)

        result, self.cache = self.cache[:n], self.cache[n:]
        self.cache_length -= len(result)
        return result

    def tell(self):

        "Return the file position of the next character to be read."

        return self.f.tell() - self.cache_length

    def seek(self, offset):

        "Move to 'offset' in the file, discarding any cached data."

        self.f.seek(offset)
        self.cache = ""
        self.cache_length = 0

class FileOpener:

    "Opening a file on demand from its filename."

    def __init__(self, filename):
        self.filename = filename

    def open(self, mode):

        "Return a newly opened file object for the filename using 'mode'."

        return open(self.filename, mode)

    def close(self):

        "Do nothing: the opener itself holds no open file."

        pass

# Specific classes for storing term and position information.

class PositionWriter(FileWriter):

    "Writing position information to files."

    def reset(self):
        self.last_docnum = 0

    def write_positions(self, docnum, positions):

        """
        Write a record for the document 'docnum' holding its 'positions', which
        are sorted in place. Document numbers and positions are stored as
        differences from the preceding value. Return the offset of the record.
        A ValueError is raised if 'docnum' is less than the previous document
        number written since the last reset.
        """

        if docnum < self.last_docnum:
            raise ValueError("Document number %r is less than previous number %r." % (docnum, self.last_docnum))

        # Note where this record starts before anything is written.

        record_offset = self.tell()

        # Deltas only make sense for sorted positions.

        positions.sort()

        deltas = []
        previous = 0

        for position in positions:
            deltas.append(vint(position - previous))
            previous = position

        # Record layout: document number delta, number of positions, then the
        # position deltas.

        header = vint(docnum - self.last_docnum) + vint(len(positions))
        self.write(header + "".join(deltas))

        self.last_docnum = docnum
        return record_offset

class PositionOpener(FileOpener):

    "Reading position information from files."

    def read_term_positions(self, offset, count):

        """
        Return an iterator over position records starting at 'offset' and
        limited to 'count' documents.
        """

        # Each iterator gets a freshly opened file of its own.

        return PositionIterator(self.open("rb"), offset, count)

class PositionIndexWriter(FileWriter):

    "Writing position index information to files."

    def reset(self):
        self.last_docnum = 0
        self.last_pos_offset = 0

    def write_positions(self, docnum, pos_offset, count):

        """
        Write the given 'docnum', 'pos_offset' and document 'count' to the
        position index file, storing the first two as differences from the
        previously written values. Return the offset of the written record.
        """

        record_offset = self.tell()

        # Each delta is encoded before the corresponding state is updated.

        docnum_delta = vint(docnum - self.last_docnum)
        self.last_docnum = docnum

        offset_delta = vint(pos_offset - self.last_pos_offset)
        self.last_pos_offset = pos_offset

        self.write(docnum_delta + offset_delta + vint(count))

        return record_offset

class PositionIndexOpener(FileOpener):

    "Reading position index information from files."

    def read_term_positions(self, offset, doc_frequency):

        """
        Return an iterator over position index entries starting at 'offset'
        and covering at most 'doc_frequency' documents.
        """

        # Each iterator gets a freshly opened file of its own.

        return PositionIndexIterator(self.open("rb"), offset, doc_frequency)

# Iterators for position-related files.

class IteratorBase:

    "Record counting and sequence behaviour shared by the file iterators."

    def __init__(self, count):
        self.replenish(count)

    def replenish(self, count):

        "Make 'count' records available and restart the tally of records read."

        self.count = count
        self.read_documents = 0

    def __len__(self):
        return self.count

    def sort(self):

        "Do nothing: stored document positions are already sorted."

        pass

    def __iter__(self):
        return self

class PositionIterator(FileReader, IteratorBase):

    "Iterating over document positions."

    def __init__(self, f, offset, count):
        FileReader.__init__(self, f)
        IteratorBase.__init__(self, count)
        self.seek(offset)

    def reset(self):
        self.last_docnum = 0

    def read_positions(self):

        """
        Read one record, returning the document number and a list of absolute
        positions rebuilt from the stored differences.
        """

        # The document number is stored relative to the previous record.

        self.last_docnum += self.read_number()

        # A position count precedes the position deltas.

        npositions = self.read_number()

        positions = []
        position = 0

        for _ in range(npositions):
            position += self.read_number()
            positions.append(position)

        return self.last_docnum, positions

    def next(self):

        "Return the next document record or raise StopIteration when the count is used up."

        if self.read_documents < self.count:
            self.read_documents += 1
            return self.read_positions()

        raise StopIteration

class PositionIndexIterator(FileReader, IteratorBase):

    "Iterating over position index entries."

    def __init__(self, f, offset, count):
        FileReader.__init__(self, f)
        IteratorBase.__init__(self, count)
        self.seek(offset)
        self.section_count = 0

    def reset(self):
        self.last_docnum = 0
        self.last_pos_offset = 0

    def read_positions(self):

        """
        Read one index entry, returning a document number, an offset into the
        positions file, and the number of documents in the section found there.
        """

        # Document number and offset are both stored as deltas.

        self.last_docnum += self.read_number()
        self.last_pos_offset += self.read_number()

        # The document count is stored as it is.

        count = self.read_number()

        return self.last_docnum, self.last_pos_offset, count

    def next(self):

        """
        Return the next index entry, or raise StopIteration once the sections
        already returned account for all the documents.
        """

        # Credit the documents of the previously returned section.

        self.read_documents += self.section_count

        if self.read_documents < self.count:
            entry = self.read_positions()
            self.section_count = entry[2]
            return entry

        raise StopIteration

class PositionDictionaryWriter:

    "Writing position dictionaries."

    def __init__(self, position_writer, position_index_writer, interval):
        self.position_writer = position_writer
        self.position_index_writer = position_index_writer
        self.interval = interval

    def write_term_positions(self, doc_positions):

        """
        Write all 'doc_positions' - a collection of (document number, position
        list) tuples, sorted here via its 'sort' method - to the positions
        file, adding an index entry for each section of up to 'interval'
        documents.

        Return a tuple containing the offset of the first index entry written
        (or None if nothing was written), the frequency (total number of
        positions), and the document frequency (number of documents).
        """

        position_writer = self.position_writer
        index_writer = self.position_index_writer
        interval = self.interval

        # Both writers start each term with fresh delta state.

        position_writer.reset()
        index_writer.reset()

        index_offset = None
        frequency = 0
        count = 0

        # Details of the first record in the section being written.

        section_docnum = None
        section_offset = None

        doc_positions.sort()

        for docnum, positions in doc_positions:
            pos_offset = position_writer.write_positions(docnum, positions)

            if section_offset is None:
                section_offset = pos_offset
                section_docnum = docnum

            frequency += len(positions)
            count += 1

            # A full section gets its index entry straight away.

            if count % interval == 0:
                entry_offset = index_writer.write_positions(section_docnum, section_offset, interval)

                if index_offset is None:
                    index_offset = entry_offset

                section_offset = None
                section_docnum = None

                # Restart the document number deltas so that a reader entering
                # at the next section sees correct document numbers.

                position_writer.reset()

        # A trailing, partly filled section still needs an index entry.

        if section_offset is not None:
            entry_offset = index_writer.write_positions(section_docnum, section_offset, count % interval)

            if index_offset is None:
                index_offset = entry_offset

        return index_offset, frequency, count

    def close(self):
        self.position_writer.close()
        self.position_index_writer.close()

class PositionDictionaryReader:

    "Reading position dictionaries."

    def __init__(self, position_opener, position_index_opener):
        self.position_opener = position_opener
        self.position_index_opener = position_index_opener

    def read_term_positions(self, offset, doc_frequency):

        """
        Return an iterator over the document records of a term whose index
        entries start at 'offset' and which has the given 'doc_frequency'.
        """

        return PositionDictionaryIterator(self.position_opener,
            self.position_index_opener, offset, doc_frequency)

    def close(self):
        pass

class PositionDictionaryIterator:

    "Iteration over position dictionary entries."

    def __init__(self, position_opener, position_index_opener, offset, doc_frequency):
        self.position_opener = position_opener
        self.doc_frequency = doc_frequency
        self.index_iterator = position_index_opener.read_term_positions(offset, doc_frequency)
        self.iterator = None

        # A record read past during navigation but not yet handed out.

        self.found_docnum, self.found_positions = None, None

        # An index entry read ahead of the current one, if any.

        self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None

        # Load the first index entry and open the section it describes.

        self._next_section()
        self._init_section()

    # Sequence methods.

    def __len__(self):
        return self.doc_frequency

    def sort(self):
        pass

    # Iterator methods.

    def __iter__(self):
        return self

    def next(self):

        """
        Return the next (document number, positions) record, moving on to the
        following section when the current one is used up. StopIteration is
        raised when the index has no further sections.
        """

        # Hand out any record retained by an earlier navigation.

        if self.found_docnum is not None:
            retained = self.found_docnum, self.found_positions
            self.found_docnum, self.found_positions = None, None
            return retained

        while True:
            try:
                return self.iterator.next()

            except StopIteration:

                # The next section's data follows on directly in the positions
                # file, so the same file iterator is kept and merely given the
                # new section's document count.

                self._next_section()
                self.iterator.replenish(self.section_count)

                # Document number deltas start again in each section.

                self.iterator.reset()

    def from_document(self, docnum):

        """
        Attempt to navigate to a positions entry for the given 'docnum',
        returning the positions for 'docnum', or None otherwise.
        """

        # The retained record may already be the one wanted.

        if docnum == self.found_docnum:
            return self.found_positions

        # Advance through the index while the entry read ahead still starts at
        # or before the desired document.

        try:
            if self.next_docnum is None:
                self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next()

            while self.next_docnum <= docnum:
                self._next_read_section()
                if self.docnum < docnum:
                    self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next()
                else:
                    break

        # Running out of index entries leaves the last section current.

        except StopIteration:
            pass

        # Scan the current section of the positions file from its start.

        self._init_section()

        try:
            while True:
                found_docnum, found_positions = self.iterator.next()

                if docnum == found_docnum:
                    return found_positions

                # Having gone past the desired document, keep the record just
                # read so that it is not lost to later requests.

                elif docnum < found_docnum:
                    self.found_docnum, self.found_positions = found_docnum, found_positions
                    return None

        except StopIteration:
            return None

    # Internal methods.

    def _next_section(self):

        """
        Make the next index entry current, using an entry already read ahead
        if there is one and reading from the index otherwise.
        """

        if self.next_docnum is None:
            self.docnum, self.pos_offset, self.section_count = self.index_iterator.next()
        else:
            self._next_read_section()

    def _next_read_section(self):

        """
        Make the next index entry the current one without reading from the
        index.
        """

        self.docnum, self.pos_offset, self.section_count = self.next_docnum, self.next_pos_offset, self.next_section_count
        self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None

    def _init_section(self):

        """
        Close any existing position file iterator and open a new one at the
        current section.
        """

        if self.iterator is not None:
            self.iterator.close()

        self.iterator = self.position_opener.read_term_positions(self.pos_offset, self.section_count)

    def close(self):

        "Close both underlying iterators; later calls have no effect."

        if self.iterator is not None:
            self.iterator.close()
            self.iterator = None

        if self.index_iterator is not None:
            self.index_iterator.close()
            self.index_iterator = None

class TermWriter(FileWriter):

    "Writing term information to files."

    def reset(self):
        self.last_term = ""
        self.last_offset = 0

    def write_term(self, term, offset, frequency, doc_frequency):

        """
        Write the given 'term', its position file 'offset', its 'frequency' and
        its 'doc_frequency' (number of documents in which it appears) to the
        term information file. Return the offset after the term information was
        written to the file.
        """

        # Only the part of the term differing from the previous term is
        # stored, together with the length of the shared prefix.

        common = len(commonprefix([self.last_term, term]))

        self.write_number(common)
        self.write_string(term[common:])

        # The offset is stored as a delta; the frequencies as they are.

        for number in (offset - self.last_offset, frequency, doc_frequency):
            self.write_number(number)

        self.last_term = term
        self.last_offset = offset

        return self.tell()

class TermReader(FileReader):

    "Reading term information from files."

    def reset(self):
        self.last_term = ""
        self.last_offset = 0

    def read_term(self):

        """
        Read a term, its position file offset, its frequency and its document
        frequency from the term information file.
        """

        # Rebuild the term from the previous term's prefix and the suffix.

        common = self.read_number()
        suffix = self.read_string()
        self.last_term = self.last_term[:common] + suffix

        # The offset is stored as a delta.

        self.last_offset += self.read_number()

        frequency = self.read_number()
        doc_frequency = self.read_number()

        return self.last_term, self.last_offset, frequency, doc_frequency

    def go_to_term(self, term, offset, info_offset):

        """
        Seek past the entry for 'term' having 'offset' to 'info_offset'. This
        permits the scanning for later terms from the specified term.
        """

        self.seek(info_offset)

        # Later entries are stored relative to this term and offset.

        self.last_term = term
        self.last_offset = offset

class TermIndexWriter(TermWriter):

    "Writing term dictionary index details to files."

    def reset(self):
        TermWriter.reset(self)
        self.last_info_offset = 0

    def write_term(self, term, offset, frequency, doc_frequency, info_offset):

        """
        Write the given 'term', its position file 'offset', its 'frequency' and
        its 'doc_frequency' to the term dictionary index file, along with the
        'info_offset' in the term information file.
        """

        TermWriter.write_term(self, term, offset, frequency, doc_frequency)

        # The information file offset follows as a delta.

        self.write_number(info_offset - self.last_info_offset)
        self.last_info_offset = info_offset

class TermIndexReader(TermReader):

    "Reading term dictionary index details from files."

    def reset(self):
        TermReader.reset(self)
        self.last_info_offset = 0

    def read_term(self):

        """
        Read a term, its position file offset, its frequency, its document
        frequency and a term information file offset from the term dictionary
        index file.
        """

        term, offset, frequency, doc_frequency = TermReader.read_term(self)

        # The information file offset is stored as a delta.

        self.last_info_offset += self.read_number()

        return term, offset, frequency, doc_frequency, self.last_info_offset

class TermDictionaryWriter:

    "Writing term dictionaries."

    def __init__(self, info_writer, index_writer, position_dict_writer, interval):
        self.info_writer = info_writer
        self.index_writer = index_writer
        self.position_dict_writer = position_dict_writer
        self.interval = interval
        self.entry = 0

    def _write_term(self, term, offset, frequency, doc_frequency):

        """
        Write the given 'term', its position file 'offset', its 'frequency' and
        its 'doc_frequency' (number of documents in which it appears) to the
        term information file, also adding the term to the index file for every
        'interval' terms written, starting with the first.
        """

        info_offset = self.info_writer.write_term(term, offset, frequency, doc_frequency)

        if self.entry % self.interval == 0:
            self.index_writer.write_term(term, offset, frequency, doc_frequency, info_offset)

        self.entry += 1

    def write_term_positions(self, term, doc_positions):

        """
        Write the given 'term' and the 'doc_positions' recording the documents
        and positions at which the term is found.
        """

        offset, frequency, doc_frequency = self.position_dict_writer.write_term_positions(doc_positions)
        self._write_term(term, offset, frequency, doc_frequency)

    def close(self):
        self.info_writer.close()
        self.index_writer.close()
        self.position_dict_writer.close()

class TermDictionaryReader:

    "Reading term dictionaries."

    def __init__(self, info_reader, index_reader, position_dict_reader):
        self.info_reader = info_reader
        self.index_reader = index_reader
        self.position_dict_reader = position_dict_reader

        # Load the whole index into memory.

        self.terms = []
        read_entry = self.index_reader.read_term

        try:
            while True:
                self.terms.append(read_entry())
        except EOFError:
            pass

        # An offset larger than any in the index, for ordering purposes.

        if self.terms:
            self.max_offset = self.terms[-1][1] + 1
        else:
            self.max_offset = None

    def _find_closest_entry(self, term):

        """
        Return the index entry for 'term' or for the closest term preceding it,
        or the very first entry if no entry precedes 'term'. An entry consists
        of a term, the position file offset, the term frequency, the document
        frequency, and the term details file offset.
        """

        i = bisect_right(self.terms, (term, self.max_offset, 0, 0)) - 1

        # An index of -1 means that nothing precedes the term.

        return self.terms[max(i, 0)]

    def _find_closest_term(self, term):

        """
        Find the offsets and frequencies of 'term' from the term dictionary or
        the closest term starting with the value of 'term'.

        Return the closest term (or the term itself), the position file offset,
        the term frequency, the document frequency, and the term details file
        offset (or None if the reader is already positioned).
        """

        found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_entry(term)

        # The index entry itself is the answer if it is not before the term.

        if term <= found_term:
            return found_term, offset, frequency, doc_frequency, info_offset

        # Otherwise, scan the information file from just after the index
        # term's entry until the term is reached, passed, or the file ends.

        self.info_reader.go_to_term(found_term, offset, info_offset)

        try:
            while term > found_term:
                found_term, offset, frequency, doc_frequency = self.info_reader.read_term()
        except EOFError:
            pass

        return found_term, offset, frequency, doc_frequency, None

    def _find_term(self, term):

        """
        Return the position file offset, frequency and document frequency of
        'term', or None if the term is not in the dictionary.
        """

        found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_term(term)

        if term == found_term:
            return offset, frequency, doc_frequency

        return None

    def _get_positions(self, offset, doc_frequency):
        return self.position_dict_reader.read_term_positions(offset, doc_frequency)

    # Iterator convenience methods.

    def __iter__(self):
        self.rewind()
        return self

    def next(self):
        try:
            return self.read_term()
        except EOFError:
            raise StopIteration

    # Sequential access methods.

    def rewind(self):
        self.info_reader.rewind()

    def read_term(self):

        """
        Return the next term, its frequency, its document frequency, and the
        documents and positions at which the term is found.
        """

        term, offset, frequency, doc_frequency = self.info_reader.read_term()
        return term, frequency, doc_frequency, self._get_positions(offset, doc_frequency)

    # Query methods.

    def find_terms(self, term):

        "Return all terms whose values start with the value of 'term'."

        matches = []

        found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_term(term)

        # An offset is only given when the reader still needs positioning.

        if info_offset is not None:
            self.info_reader.go_to_term(found_term, offset, info_offset)

        # Collect terms for as long as they carry the requested prefix.

        try:
            while found_term.startswith(term):
                matches.append(found_term)
                found_term, offset, frequency, doc_frequency = self.info_reader.read_term()

        except EOFError:
            pass

        return matches

    def find_positions(self, term):

        "Return the documents and positions at which the given 'term' is found."

        details = self._find_term(term)

        if details is None:
            return None

        offset, frequency, doc_frequency = details
        return self._get_positions(offset, doc_frequency)

    def get_frequency(self, term):

        "Return the frequency of the given 'term'."

        details = self._find_term(term)

        if details is None:
            return None

        offset, frequency, doc_frequency = details
        return frequency

    def get_document_frequency(self, term):

        "Return the document frequency of the given 'term'."

        details = self._find_term(term)

        if details is None:
            return None

        offset, frequency, doc_frequency = details
        return doc_frequency

    def close(self):
        self.info_reader.close()
        self.index_reader.close()
        self.position_dict_reader.close()

# Specific classes for storing document information.

class FieldWriter(FileWriter):

    "Writing field data to files."

    def reset(self):
        self.last_docnum = 0

    def write_fields(self, docnum, fields):

        """
        Write for the given 'docnum', a list of 'fields' (integer, string pairs
        representing field identifiers and values respectively).
        Return the offset at which the fields are stored.
        """

        record_offset = self.tell()

        # Record header: document number delta, then the number of fields.

        self.write_number(docnum - self.last_docnum)
        self.write_number(len(fields))

        # Each field is an identifier followed by a compressed value.

        for identifier, value in fields:
            self.write_number(identifier)
            self.write_string(value, 1)

        self.last_docnum = docnum
        return record_offset

class FieldReader(FileReader):

    "Reading field data from files."

    def reset(self):
        self.last_docnum = 0

    def read_fields(self):

        """
        Read fields from the file, returning a tuple containing the document
        number and a list of field (identifier, value) pairs.
        """

        # The document number is stored relative to the previous record.

        self.last_docnum += self.read_number()

        nfields = self.read_number()
        fields = []

        for _ in range(nfields):
            identifier = self.read_number()
            value = self.read_string(1)     # decompress
            fields.append((identifier, value))

        return self.last_docnum, fields

    def read_document_fields(self, docnum, offset):

        """
        Read fields for 'docnum' at the given 'offset'. This permits the
        retrieval of details for the specified document, as well as scanning for
        later documents.
        """

        self.seek(offset)

        # The document number computed while reading is based on whatever
        # record was read before the seek, so it is discarded in favour of the
        # number supplied.

        fields = self.read_fields()[1]
        self.last_docnum = docnum

        return docnum, fields

class FieldIndexWriter(FileWriter):

    "Writing field index details to files."

    def reset(self):
        self.last_docnum = 0
        self.last_offset = 0

    def write_document(self, docnum, offset):

        """
        Write for the given 'docnum', the 'offset' at which the fields for the
        document are stored in the fields file.
        """

        # Both values are stored as deltas from the previous entry.

        docnum_delta = docnum - self.last_docnum
        offset_delta = offset - self.last_offset

        self.write_number(docnum_delta)
        self.write_number(offset_delta)

        self.last_docnum = docnum
        self.last_offset = offset

class FieldIndexReader(FileReader):

    "Reading field index details from files."

    def reset(self):
        self.last_docnum = 0
        self.last_offset = 0

    def read_document(self):

        "Read a document number and field file offset."

        # Both values are stored as deltas from the previous entry.

        self.last_docnum += self.read_number()
        self.last_offset += self.read_number()

        return self.last_docnum, self.last_offset

class FieldDictionaryWriter:

    "Writing field dictionary details."

    def __init__(self, field_writer, field_index_writer, interval):
        self.field_writer = field_writer
        self.field_index_writer = field_index_writer
        self.interval = interval
        self.entry = 0

    def write_fields(self, docnum, fields):

        """
        Write details of the document with the given 'docnum' and 'fields',
        adding the document to the index for every 'interval' documents
        written, starting with the first.
        """

        offset = self.field_writer.write_fields(docnum, fields)

        if self.entry % self.interval == 0:
            self.field_index_writer.write_document(docnum, offset)

        self.entry += 1

    def close(self):
        self.field_writer.close()
        self.field_index_writer.close()

class FieldDictionaryReader:

    "Reading field dictionary details."

    def __init__(self, field_reader, field_index_reader):
        self.field_reader = field_reader
        self.field_index_reader = field_index_reader

        # Load the whole index into memory.

        self.docs = []
        read_entry = self.field_index_reader.read_document

        try:
            while True:
                self.docs.append(read_entry())
        except EOFError:
            pass

        # The largest offset in the index, for ordering purposes.

        if self.docs:
            self.max_offset = self.docs[-1][1]
        else:
            self.max_offset = None

    # Iterator convenience methods.

    def __iter__(self):
        self.rewind()
        return self

    def next(self):
        try:
            return self.read_fields()
        except EOFError:
            raise StopIteration

    # Sequential access methods.

    def rewind(self):
        self.field_reader.rewind()

    def read_fields(self):

        "Return the next document number and fields."

        return self.field_reader.read_fields()

    # Random access methods.

    def get_fields(self, docnum):

        """
        Return the fields of the document with the given 'docnum', or None if
        no such document is found.
        """

        # Find the index entry providing the document or one preceding it.

        i = bisect_right(self.docs, (docnum, self.max_offset)) - 1

        if i == -1:
            return None

        indexed_docnum, offset = self.docs[i]

        # Start reading the fields file at the indexed document.

        found_docnum, fields = self.field_reader.read_document_fields(indexed_docnum, offset)

        # Scan onwards until the document is reached, passed, or the file
        # ends.

        try:
            while docnum > found_docnum:
                found_docnum, fields = self.field_reader.read_fields()
        except EOFError:
            pass

        if docnum == found_docnum:
            return fields

        return None

    def close(self):
        self.field_reader.close()
        self.field_index_reader.close()

# Dictionary merging classes.

class Merger:

    "Merge files."

    def __init__(self, writer, readers):
        self.writer = writer
        self.readers = readers

    def close(self):

        "Close every reader and then the writer."

        for reader in self.readers:
            reader.close()

        self.writer.close()

class TermDictionaryMerger(Merger):

    "Merge term and position files."

    def merge(self):

        """
        Merge terms and positions from the readers, sending them to the writer.
        Position collections belonging to the same term are gathered and
        written together as one merged collection.
        """

        write = self.writer.write_term_positions

        pending_term = None
        pending_positions = []

        for term, frequency, doc_frequency, positions in itermerge(self.readers):
            if term == pending_term:
                pending_positions.append(positions)
                continue

            # A new term has started: write out the one gathered so far.

            if pending_positions:
                write(pending_term, itermerge(pending_positions))

            pending_term = term
            pending_positions = [positions]

        # The final term is still waiting to be written.

        if pending_positions:
            write(pending_term, itermerge(pending_positions))

class FieldDictionaryMerger(Merger):

    "Merge field files."

    def merge(self):

        """
        Merge fields from the readers, sending them to the writer.
        """

        write = self.writer.write_fields

        for docnum, fields in itermerge(self.readers):
            write(docnum, fields)

# Utility functions.

def _open_partition_files(pathname, names, partition, mode):

    """
    Open the files having the given 'names' under 'pathname' for the given
    'partition' using the given 'mode', returning the file objects in the same
    order as 'names'.

    If any file cannot be opened, the files already opened by this call are
    closed before the exception is propagated.
    """

    opened = []

    try:
        for name in names:
            opened.append(open(join(pathname, "%s-%s" % (name, partition)), mode))
    except (IOError, OSError):
        for f in opened:
            f.close()
        raise

    return opened

def get_term_writer(pathname, partition, interval, doc_interval):

    """
    Return a term dictionary writer using files under the given 'pathname'
    labelled according to the given 'partition', using the given indexing
    'interval' for terms and 'doc_interval' for document position records.
    """

    tdf, tdif, tpf, tpif = _open_partition_files(pathname,
        ("terms", "terms_index", "positions", "positions_index"), partition, "wb")

    info_writer = TermWriter(tdf)
    index_writer = TermIndexWriter(tdif)
    positions_writer = PositionWriter(tpf)
    positions_index_writer = PositionIndexWriter(tpif)

    positions_dict_writer = PositionDictionaryWriter(positions_writer, positions_index_writer, doc_interval)

    return TermDictionaryWriter(info_writer, index_writer, positions_dict_writer, interval)

def get_field_writer(pathname, partition, interval):

    """
    Return a field dictionary writer using files under the given 'pathname'
    labelled according to the given 'partition', using the given indexing
    'interval'.
    """

    ff, fif = _open_partition_files(pathname, ("fields", "fields_index"), partition, "wb")

    field_writer = FieldWriter(ff)
    field_index_writer = FieldIndexWriter(fif)

    return FieldDictionaryWriter(field_writer, field_index_writer, interval)

def get_term_reader(pathname, partition):

    """
    Return a term dictionary reader using files under the given 'pathname'
    labelled according to the given 'partition'.
    """

    tdf, tdif = _open_partition_files(pathname, ("terms", "terms_index"), partition, "rb")

    info_reader = TermReader(tdf)
    index_reader = TermIndexReader(tdif)

    # The position files are handed over as pathnames, not as open files.

    positions_opener = PositionOpener(join(pathname, "positions-%s" % partition))
    positions_index_opener = PositionIndexOpener(join(pathname, "positions_index-%s" % partition))

    positions_dict_reader = PositionDictionaryReader(positions_opener, positions_index_opener)

    return TermDictionaryReader(info_reader, index_reader, positions_dict_reader)

def get_field_reader(pathname, partition):

    """
    Return a field dictionary reader using files under the given 'pathname'
    labelled according to the given 'partition'.
    """

    ff, fif = _open_partition_files(pathname, ("fields", "fields_index"), partition, "rb")

    field_reader = FieldReader(ff)
    field_index_reader = FieldIndexReader(fif)

    return FieldDictionaryReader(field_reader, field_index_reader)

def rename_files(pathname, names, from_partition, to_partition):

    """
    Rename the files having the given 'names' under 'pathname' from the
    'from_partition' label to the 'to_partition' label.
    """

    for name in names:
        rename(join(pathname, "%s-%s" % (name, from_partition)), join(pathname, "%s-%s" % (name, to_partition)))

def rename_term_files(pathname, from_partition, to_partition):

    "Rename the term files of 'from_partition' to 'to_partition'."

    rename_files(pathname, TERM_FILENAMES, from_partition, to_partition)

def rename_field_files(pathname, from_partition, to_partition):

    "Rename the field files of 'from_partition' to 'to_partition'."

    rename_files(pathname, FIELD_FILENAMES, from_partition, to_partition)

def remove_files(pathname, names, partition):

    """
    Remove the files having the given 'names' under 'pathname' for the given
    'partition'.
    """

    for name in names:
        remove(join(pathname, "%s-%s" % (name, partition)))

def remove_term_files(pathname, partition):

    "Remove the term files of the given 'partition'."

    remove_files(pathname, TERM_FILENAMES, partition)

def remove_field_files(pathname, partition):

    "Remove the field files of the given 'partition'."

    remove_files(pathname, FIELD_FILENAMES, partition)

# High-level classes.

class Document:

    "A container of document information."

    def __init__(self, docnum):

        "Initialise an empty document having the given 'docnum'."

        self.docnum = docnum
        self.fields = []
        self.terms = {}

    def add_position(self, term, position):

        """
        Add a position entry for the given 'term', indicating the given
        'position'.
        """

        self.terms.setdefault(term, []).append(position)

    def add_field(self, identifier, value):

        "Add a field having the given 'identifier' and 'value'."

        self.fields.append((identifier, unicode(value))) # convert to string

    def set_fields(self, fields):

        """
        Set the document's 'fields': a list of tuples each containing an integer
        identifier and a string value.
        """

        self.fields = fields

class IndexWriter:

    """
    Building term information and writing it to the term and field dictionaries.
    """

    def __init__(self, pathname, interval, doc_interval, flush_interval):

        """
        Initialise a writer storing partitions under the given 'pathname', using
        the given indexing 'interval' and 'doc_interval', and flushing to a new
        partition after every 'flush_interval' documents (a false value
        disables such automatic flushing).
        """

        self.pathname = pathname
        self.interval = interval
        self.doc_interval = doc_interval
        self.flush_interval = flush_interval

        self.dict_partition = 0
        self.field_dict_partition = 0

        self.terms = {}
        self.docs = {}

        self.doc_counter = 0

    def add_document(self, doc):

        """
        Add the given document 'doc', updating the document counter and flushing
        terms and fields if appropriate.
        """

        for term, positions in doc.terms.items():
            self.terms.setdefault(term, {})[doc.docnum] = positions

        self.docs[doc.docnum] = doc.fields

        self.doc_counter += 1
        if self.flush_interval and self.doc_counter >= self.flush_interval:
            self.flush_terms()
            self.flush_fields()
            self.doc_counter = 0

    def get_term_writer(self):

        "Return a term dictionary writer for the current partition."

        return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval)

    def get_field_writer(self):

        "Return a field dictionary writer for the current partition."

        return get_field_writer(self.pathname, self.field_dict_partition, self.interval)

    def flush_terms(self):

        "Flush terms into the current term dictionary partition."

        # Get the terms in order.

        all_terms = self.terms
        terms = all_terms.keys()
        terms.sort()

        dict_writer = self.get_term_writer()

        for term in terms:

            # Supply the documents in ascending document number order, as is
            # done for fields, instead of relying on dictionary ordering.
            # NOTE(review): the position writer is not visible here; it
            # presumably delta-encodes document numbers - confirm.

            doc_positions = all_terms[term].items()
            doc_positions.sort()
            dict_writer.write_term_positions(term, doc_positions)

        dict_writer.close()

        self.terms = {}
        self.dict_partition += 1

    def flush_fields(self):

        "Flush fields into the current field dictionary partition."

        # Get the documents in order.

        docs = self.docs.items()
        docs.sort()

        field_dict_writer = self.get_field_writer()

        for docnum, fields in docs:
            field_dict_writer.write_fields(docnum, fields)

        field_dict_writer.close()

        self.docs = {}
        self.field_dict_partition += 1

    def close(self):

        "Flush any terms and fields not yet written to a partition."

        if self.terms:
            self.flush_terms()
        if self.docs:
            self.flush_fields()

class IndexReader:

    "Accessing the term and field dictionaries."

    def __init__(self, pathname):

        "Open the 'merged' term and field dictionaries under 'pathname'."

        self.dict_reader = get_term_reader(pathname, "merged")
        self.field_dict_reader = get_field_reader(pathname, "merged")

    def find_terms(self, term):
        return self.dict_reader.find_terms(term)

    def find_positions(self, term):
        return self.dict_reader.find_positions(term)

    def get_frequency(self, term):
        return self.dict_reader.get_frequency(term)

    def get_document_frequency(self, term):
        return self.dict_reader.get_document_frequency(term)

    def get_fields(self, docnum):
        return self.field_dict_reader.get_fields(docnum)

    def close(self):

        "Close both dictionary readers."

        # Close the field dictionary even if closing the term dictionary fails.

        try:
            self.dict_reader.close()
        finally:
            self.field_dict_reader.close()

class Index:

    "An inverted index solution encapsulating the various components."

    def __init__(self, pathname):

        "Initialise the index for the directory having the given 'pathname'."

        self.pathname = pathname
        self.reader = None
        self.writer = None

    def get_writer(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, flush_interval=FLUSH_INTERVAL):

        """
        Return a writer, optionally using the given indexing 'interval',
        'doc_interval' and 'flush_interval'. The index directory is created if
        it does not already exist.
        """

        if not exists(self.pathname):
            mkdir(self.pathname)

        self.writer = IndexWriter(self.pathname, interval, doc_interval, flush_interval)
        return self.writer

    def get_reader(self, partition=0):

        """
        Return a reader for the index, merging all partitions first.

        The 'partition' argument is accepted but currently has no effect: the
        reader always opens the 'merged' partition.
        """

        # Ensure that only one partition exists.

        self.merge()
        return self._get_reader(partition)

    def _get_reader(self, partition):

        "Return a reader for the index, raising OSError if its path is absent."

        if not exists(self.pathname):
            raise OSError("Index path %r does not exist." % self.pathname)

        self.reader = IndexReader(self.pathname)
        return self.reader

    def _find_partitions(self, prefix):

        """
        Return the partition labels of the files in the index directory whose
        names start with the given 'prefix', in directory listing order.
        """

        start = len(prefix)
        return [filename[start:] for filename in listdir(self.pathname)
            if filename.startswith(prefix)]

    def merge(self):

        "Merge/optimise index partitions."

        self.merge_terms()
        self.merge_fields()

    def merge_terms(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL):

        """
        Merge term dictionaries using the given indexing 'interval' and
        'doc_interval'.
        """

        partitions = self._find_partitions("terms-")

        if len(partitions) > 1:

            # Move any previous merge result aside before opening readers, so
            # that every reader refers to files by their final names. The term
            # reader hands pathnames (not open files) to its position openers,
            # which could otherwise resolve to the new output files.

            if "merged" in partitions:
                rename_term_files(self.pathname, "merged", "old-merged")
                partitions.remove("merged")
                if "old-merged" not in partitions:
                    partitions.append("old-merged")

            readers = [get_term_reader(self.pathname, partition) for partition in partitions]

            # Write directly to a dictionary.

            writer = get_term_writer(self.pathname, "merged", interval, doc_interval)
            merger = TermDictionaryMerger(writer, readers)

            try:
                merger.merge()
            finally:
                merger.close()

            # Remove old files.

            for partition in partitions:
                remove_term_files(self.pathname, partition)

        elif len(partitions) == 1:

            # A lone partition only needs renaming: no files are opened.

            partition = partitions[0]
            if partition != "merged":
                rename_term_files(self.pathname, partition, "merged")

    def merge_fields(self, interval=FIELD_INTERVAL):

        "Merge field dictionaries using the given indexing 'interval'."

        partitions = self._find_partitions("fields-")

        if len(partitions) > 1:

            # Move any previous merge result aside before opening readers, so
            # that every reader is opened on files under their final names.

            if "merged" in partitions:
                rename_field_files(self.pathname, "merged", "old-merged")
                partitions.remove("merged")
                if "old-merged" not in partitions:
                    partitions.append("old-merged")

            readers = [get_field_reader(self.pathname, partition) for partition in partitions]

            # Write directly to a dictionary.

            writer = get_field_writer(self.pathname, "merged", interval)
            merger = FieldDictionaryMerger(writer, readers)

            try:
                merger.merge()
            finally:
                merger.close()

            # Remove old files.

            for partition in partitions:
                remove_field_files(self.pathname, partition)

        elif len(partitions) == 1:

            # A lone partition only needs renaming: no files are opened.

            partition = partitions[0]
            if partition != "merged":
                rename_field_files(self.pathname, partition, "merged")

    def close(self):

        "Close any reader and writer obtained from this index."

        if self.reader is not None:
            self.reader.close()
            self.reader = None
        if self.writer is not None:
            self.writer.close()
            self.writer = None

# vim: tabstop=4 expandtab shiftwidth=4