1 #!/usr/bin/env python 2 3 """ 4 A simple (and sane) text indexing library. 5 6 Copyright (C) 2009 Paul Boddie <paul@boddie.org.uk> 7 8 This program is free software; you can redistribute it and/or modify it under 9 the terms of the GNU General Public License as published by the Free Software 10 Foundation; either version 3 of the License, or (at your option) any later 11 version. 12 13 This program is distributed in the hope that it will be useful, but WITHOUT ANY 14 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 15 PARTICULAR PURPOSE. See the GNU General Public License for more details. 16 17 You should have received a copy of the GNU General Public License along 18 with this program. If not, see <http://www.gnu.org/licenses/>. 19 """ 20 21 from os import dup, fdopen # independent iterator access to files 22 from os import listdir, mkdir # index and partition discovery 23 from os import remove, rename # partition manipulation 24 from os.path import exists, join 25 from os.path import commonprefix # to find common string prefixes 26 from bisect import bisect_right # to find terms in the dictionary index 27 import bz2, zlib # for field compression 28 from itermerge import itermerge 29 30 try: 31 set 32 except NameError: 33 from sets import Set as set 34 35 # Constants. 36 37 TERM_INTERVAL = 100 38 DOCUMENT_INTERVAL = 100 39 FIELD_INTERVAL = 100 40 FLUSH_INTERVAL = 10000 41 42 WRITE_CACHE_SIZE = 100000 43 READ_CACHE_SIZE = 10000 44 READ_CACHE_RESIZE = 5000 45 46 TERM_FILENAMES = "terms", "terms_index", "positions", "positions_index" 47 FIELD_FILENAMES = "fields", "fields_index" 48 49 compressors = [("b", bz2.compress), ("z", zlib.compress)] 50 decompressors = {"b" : bz2.decompress, "z" : zlib.decompress} 51 52 # Utility functions. 53 54 try: 55 from vint import vint as _vint 56 57 def vint(number): 58 59 "Write 'number' as a variable-length integer." 
60 61 if number >= 0: 62 return _vint(number) 63 else: 64 raise ValueError, "Number %r is negative." % number 65 66 except ImportError: 67 68 def vint(number): 69 70 "Write 'number' as a variable-length integer." 71 72 if number >= 0: 73 74 # Special case: one byte containing a 7-bit number. 75 76 if number < 128: 77 return chr(number) 78 79 # Write the number from least to most significant digits. 80 81 bytes = [] 82 83 while number != 0: 84 lsd = number & 127 85 number = number >> 7 86 if number != 0: 87 lsd |= 128 88 bytes.append(chr(lsd)) 89 90 return "".join(bytes) 91 92 # Negative numbers are not supported. 93 94 else: 95 raise ValueError, "Number %r is negative." % number 96 97 # Foundation classes. 98 99 class File: 100 101 "A basic file abstraction." 102 103 def __init__(self, f): 104 self.f = f 105 self.reset() 106 107 def reset(self): 108 109 "To be used to reset the state of the reader or writer between records." 110 111 pass 112 113 def rewind(self): 114 self.seek(0) 115 self.reset() 116 117 def seek(self, offset): 118 119 "To be defined by readers." 120 121 pass 122 123 def flush(self): 124 125 "To be defined by writers." 126 127 pass 128 129 def close(self): 130 if self.f is not None: 131 self.flush() 132 self.f.close() 133 self.f = None 134 135 class FileWriter(File): 136 137 "Writing basic data types to files." 138 139 def __init__(self, f): 140 File.__init__(self, f) 141 self.cache = [] 142 self.cache_length = 0 143 144 def write_number(self, number): 145 146 "Write 'number' to the file using a variable length encoding." 147 148 self.write(vint(number)) 149 150 def write_string(self, s, compress=0): 151 152 """ 153 Write 's' to the file, recording its length and compressing the string 154 if 'compress' is set to a true value. 155 """ 156 157 # Convert Unicode objects to strings. 158 159 if isinstance(s, unicode): 160 s = s.encode("utf-8") 161 162 # Compress the string if requested. 
163 164 if compress: 165 for flag, fn in compressors: 166 cs = fn(s) 167 168 # Take the first string shorter than the original. 169 170 if len(cs) < len(s): 171 s = cs 172 break 173 else: 174 flag = "-" 175 176 else: 177 flag = "" 178 179 # Write the length of the data before the data itself. 180 181 length = len(s) 182 self.write(flag + vint(length) + s) 183 184 # Cache-affected methods. 185 186 def write(self, s): 187 self.cache.append(s) 188 self.cache_length += len(s) 189 if self.cache_length >= WRITE_CACHE_SIZE: 190 self.flush() 191 192 def tell(self): 193 return self.f.tell() + self.cache_length 194 195 def flush(self): 196 self.f.write("".join(self.cache)) 197 self.cache = [] 198 self.cache_length = 0 199 200 class FileReader(File): 201 202 "Reading basic data types from files." 203 204 def __init__(self, f): 205 File.__init__(self, f) 206 self.reset_cache() 207 208 def reset_cache(self): 209 self.cache = "" 210 self.cache_length = 0 211 self.cache_start = 0 212 213 def read_number(self): 214 215 "Read a number from the file." 216 217 # Read each byte, adding it to the number. 218 219 shift = 0 220 number = 0 221 read = self.read 222 223 try: 224 csd = ord(read(1)) 225 while csd & 128: 226 number += ((csd & 127) << shift) 227 shift += 7 228 csd = ord(read(1)) 229 else: 230 number += (csd << shift) 231 except TypeError: 232 raise EOFError 233 234 return number 235 236 def read_string(self, decompress=0): 237 238 """ 239 Read a string from the file, decompressing the stored data if 240 'decompress' is set to a true value. 241 """ 242 243 # Decompress the data if requested. 244 245 if decompress: 246 flag = self.read(1) 247 else: 248 flag = "-" 249 250 length = self.read_number() 251 s = self.read(length) 252 253 # Perform decompression if applicable. 254 255 if flag != "-": 256 fn = decompressors[flag] 257 s = fn(s) 258 259 # Convert strings to Unicode objects. 260 261 return unicode(s, "utf-8") 262 263 # Cache-affected methods. 
264 265 def read(self, n): 266 needed = n - (self.cache_length - self.cache_start) 267 268 # Read the needed number of characters, if possible. 269 270 if needed > 0: 271 s = self.f.read(max(needed, READ_CACHE_SIZE)) 272 self.cache += s 273 self.cache_length += len(s) 274 275 # Get the end of the requested block. 276 277 next_start = self.cache_start + n 278 s = self.cache[self.cache_start:next_start] 279 280 # Reposition the pointer to the cache. 281 282 self._seek_cache(len(s)) 283 return s 284 285 def tell(self): 286 return self.f.tell() - self.cache_length + self.cache_start 287 288 def seek(self, offset): 289 current = self.tell() 290 self.f.seek(offset) 291 292 # If seeking forward, attempt to navigate the cache. 293 294 if offset >= current: 295 self._seek_cache(offset - current) 296 else: 297 self.reset_cache() 298 299 def _seek_cache(self, delta): 300 next_start = self.cache_start + delta 301 302 if next_start > 0 and next_start >= len(self.cache): 303 self.reset_cache() 304 305 # If the cache is too big, resize it. 306 307 elif next_start > READ_CACHE_RESIZE: 308 self.cache = self.cache[next_start:] 309 self.cache_length = len(self.cache) 310 self.cache_start = 0 311 312 # Otherwise, just reference the next part of the cache. 313 314 else: 315 self.cache_start = next_start 316 317 class FileOpener: 318 319 "Opening files using their filenames." 320 321 def __init__(self, filename): 322 self.filename = filename 323 324 def open(self, mode): 325 return open(self.filename, mode) 326 327 def close(self): 328 pass 329 330 # Specific classes for storing term and position information. 331 332 class PositionWriter(FileWriter): 333 334 "Writing position information to files." 335 336 def reset(self): 337 self.last_docnum = 0 338 339 def write_positions(self, docnum, positions): 340 341 """ 342 Write for the document 'docnum' the given 'positions'. 343 Return the offset of the written record. 
344 """ 345 346 if docnum < self.last_docnum: 347 raise ValueError, "Document number %r is less than previous number %r." % (docnum, self.last_docnum) 348 349 # Record the offset of this record. 350 351 offset = self.tell() 352 353 # Make sure that the positions are sorted. 354 355 positions.sort() 356 357 # Write the position deltas. 358 359 output = [] 360 last = 0 361 362 for position in positions: 363 output.append(vint(position - last)) 364 last = position 365 366 # Write the document number delta. 367 # Write the number of positions. 368 # Then write the positions. 369 370 self.write(vint(docnum - self.last_docnum) + vint(len(positions)) + "".join(output)) 371 372 self.last_docnum = docnum 373 return offset 374 375 class PositionOpener(FileOpener): 376 377 "Reading position information from files." 378 379 def read_term_positions(self, offset, count): 380 381 """ 382 Read all positions from 'offset', seeking to that position in the file 383 before reading. The number of documents available for reading is limited 384 to 'count'. 385 """ 386 387 # Duplicate the file handle. 388 389 f = self.open("rb") 390 return PositionIterator(f, offset, count) 391 392 class PositionIndexWriter(FileWriter): 393 394 "Writing position index information to files." 395 396 def reset(self): 397 self.last_docnum = 0 398 self.last_pos_offset = 0 399 400 def write_positions(self, docnum, pos_offset, count): 401 402 """ 403 Write the given 'docnum, 'pos_offset' and document 'count' to the 404 position index file. 405 """ 406 407 # Record the offset of this record. 408 409 offset = self.tell() 410 output = [] 411 412 # Write the document number delta. 413 414 output.append(vint(docnum - self.last_docnum)) 415 self.last_docnum = docnum 416 417 # Write the position file offset delta. 418 419 output.append(vint(pos_offset - self.last_pos_offset)) 420 self.last_pos_offset = pos_offset 421 422 # Write the document count. 423 424 output.append(vint(count)) 425 426 # Actually write the data. 
427 428 self.write("".join(output)) 429 430 return offset 431 432 class PositionIndexOpener(FileOpener): 433 434 "Reading position index information from files." 435 436 def read_term_positions(self, offset, doc_frequency): 437 438 """ 439 Read all positions from 'offset', seeking to that position in the file 440 before reading. The number of documents available for reading is limited 441 to 'doc_frequency'. 442 """ 443 444 # Duplicate the file handle. 445 446 f = self.open("rb") 447 return PositionIndexIterator(f, offset, doc_frequency) 448 449 # Iterators for position-related files. 450 451 class IteratorBase: 452 453 def __init__(self, count): 454 self.replenish(count) 455 456 def replenish(self, count): 457 self.count = count 458 self.read_documents = 0 459 460 def __len__(self): 461 return self.count 462 463 def sort(self): 464 pass # Stored document positions are already sorted. 465 466 def __iter__(self): 467 return self 468 469 class PositionIterator(FileReader, IteratorBase): 470 471 "Iterating over document positions." 472 473 def __init__(self, f, offset, count): 474 FileReader.__init__(self, f) 475 IteratorBase.__init__(self, count) 476 self.seek(offset) 477 478 def reset(self): 479 self.last_docnum = 0 480 481 def read_positions(self): 482 483 "Read positions, returning a document number and a list of positions." 484 485 # Read the document number delta and add it to the last number. 486 487 self.last_docnum += self.read_number() 488 489 # Read the number of positions. 490 491 npositions = self.read_number() 492 493 # Read the position deltas, adding each previous position to get the 494 # appropriate collection of absolute positions. 495 496 i = 0 497 last = 0 498 positions = [] 499 500 while i < npositions: 501 last += self.read_number() 502 positions.append(last) 503 i += 1 504 505 return self.last_docnum, positions 506 507 def next(self): 508 509 "Read positions for a single document." 
510 511 if self.read_documents < self.count: 512 self.read_documents += 1 513 return self.read_positions() 514 else: 515 raise StopIteration 516 517 class PositionIndexIterator(FileReader, IteratorBase): 518 519 "Iterating over document positions." 520 521 def __init__(self, f, offset, count): 522 FileReader.__init__(self, f) 523 IteratorBase.__init__(self, count) 524 self.seek(offset) 525 self.section_count = 0 526 527 def reset(self): 528 self.last_docnum = 0 529 self.last_pos_offset = 0 530 531 def read_positions(self): 532 533 """ 534 Read a document number, a position file offset for the position index 535 file, and the number of documents in a section of that file. 536 """ 537 538 # Read the document number delta. 539 540 self.last_docnum += self.read_number() 541 542 # Read the offset delta. 543 544 self.last_pos_offset += self.read_number() 545 546 # Read the document count. 547 548 count = self.read_number() 549 550 return self.last_docnum, self.last_pos_offset, count 551 552 def next(self): 553 554 "Read positions for a single document." 555 556 self.read_documents += self.section_count 557 if self.read_documents < self.count: 558 docnum, pos_offset, self.section_count = t = self.read_positions() 559 return t 560 else: 561 raise StopIteration 562 563 class PositionDictionaryWriter: 564 565 "Writing position dictionaries." 566 567 def __init__(self, position_writer, position_index_writer, interval): 568 self.position_writer = position_writer 569 self.position_index_writer = position_index_writer 570 self.interval = interval 571 572 def write_term_positions(self, doc_positions): 573 574 """ 575 Write all 'doc_positions' - a collection of tuples of the form (document 576 number, position list) - to the file. 577 578 Add some records to the index, making dictionary entries. 579 580 Return a tuple containing the offset of the written data, the frequency 581 (number of positions), and document frequency (number of documents) for 582 the term involved. 
583 """ 584 585 # Reset the writers. 586 587 self.position_writer.reset() 588 self.position_index_writer.reset() 589 590 index_offset = None 591 592 # Write the positions. 593 594 frequency = 0 595 first_docnum = None 596 first_offset = None 597 count = 0 598 599 doc_positions.sort() 600 601 for docnum, positions in doc_positions: 602 pos_offset = self.position_writer.write_positions(docnum, positions) 603 604 # Retain the first record offset for a subsequent index entry. 605 606 if first_offset is None: 607 first_offset = pos_offset 608 first_docnum = docnum 609 610 frequency += len(positions) 611 count += 1 612 613 # Every {interval} entries, write an index entry. 614 615 if count % self.interval == 0: 616 io = self.position_index_writer.write_positions(first_docnum, first_offset, self.interval) 617 618 # Remember the first index entry offset. 619 620 if index_offset is None: 621 index_offset = io 622 623 first_offset = None 624 first_docnum = None 625 626 # Reset the position writer so that position readers accessing 627 # a section start with the correct document number. 628 629 self.position_writer.reset() 630 631 # Finish writing an index entry for the remaining documents. 632 633 else: 634 if first_offset is not None: 635 io = self.position_index_writer.write_positions(first_docnum, first_offset, count % self.interval) 636 637 # Remember the first index entry offset. 638 639 if index_offset is None: 640 index_offset = io 641 642 return index_offset, frequency, count 643 644 def close(self): 645 self.position_writer.close() 646 self.position_index_writer.close() 647 648 class PositionDictionaryReader: 649 650 "Reading position dictionaries." 
651 652 def __init__(self, position_opener, position_index_opener): 653 self.position_opener = position_opener 654 self.position_index_opener = position_index_opener 655 656 def read_term_positions(self, offset, doc_frequency): 657 658 """ 659 Return an iterator for dictionary entries starting at 'offset' with the 660 given 'doc_frequency'. 661 """ 662 663 return PositionDictionaryIterator(self.position_opener, 664 self.position_index_opener, offset, doc_frequency) 665 666 def close(self): 667 pass 668 669 class PositionDictionaryIterator: 670 671 "Iteration over position dictionary entries." 672 673 def __init__(self, position_opener, position_index_opener, offset, doc_frequency): 674 self.position_opener = position_opener 675 self.doc_frequency = doc_frequency 676 self.index_iterator = position_index_opener.read_term_positions(offset, doc_frequency) 677 self.iterator = None 678 679 # Remember the last values. 680 681 self.found_docnum, self.found_positions = None, None 682 683 # Maintain state for the next index entry, if read. 684 685 self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None 686 687 # Initialise the current index entry and current position file iterator. 688 689 self._next_section() 690 self._init_section() 691 692 # Sequence methods. 693 694 def __len__(self): 695 return self.doc_frequency 696 697 def sort(self): 698 pass 699 700 # Iterator methods. 701 702 def __iter__(self): 703 return self 704 705 def next(self): 706 707 """ 708 Attempt to get the next document record from the section in the 709 positions file. 710 """ 711 712 # Return any visited but unrequested record. 713 714 if self.found_docnum is not None: 715 t = self.found_docnum, self.found_positions 716 self.found_docnum, self.found_positions = None, None 717 return t 718 719 # Or search for the next record. 720 721 while 1: 722 723 # Either return the next record. 
724 725 try: 726 return self.iterator.next() 727 728 # Or, where a section is finished, get the next section and try again. 729 730 except StopIteration: 731 732 # Where a section follows, update the index iterator, but keep 733 # reading using the same file iterator (since the data should 734 # just follow on from the last section). 735 736 self._next_section() 737 self.iterator.replenish(self.section_count) 738 739 # Reset the state of the iterator to make sure that document 740 # numbers are correct. 741 742 self.iterator.reset() 743 744 def from_document(self, docnum): 745 746 """ 747 Attempt to navigate to a positions entry for the given 'docnum', 748 returning the positions for 'docnum', or None otherwise. 749 """ 750 751 # Return any unrequested document positions. 752 753 if docnum == self.found_docnum: 754 return self.found_positions 755 756 # Read ahead in the index until the next entry refers to a document 757 # later than the desired document. 758 759 try: 760 if self.next_docnum is None: 761 self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next() 762 763 # Read until the next entry is after the desired document number, 764 # or until the end of the results. 765 766 while self.next_docnum <= docnum: 767 self._next_read_section() 768 if self.docnum < docnum: 769 self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next() 770 else: 771 break 772 773 except StopIteration: 774 pass 775 776 # Navigate in the position file to the document. 777 778 self._init_section() 779 780 try: 781 while 1: 782 found_docnum, found_positions = self.iterator.next() 783 784 # Return the desired document positions or None (retaining the 785 # positions for the document immediately after). 
786 787 if docnum == found_docnum: 788 return found_positions 789 elif docnum < found_docnum: 790 self.found_docnum, self.found_positions = found_docnum, found_positions 791 return None 792 793 except StopIteration: 794 return None 795 796 # Internal methods. 797 798 def _next_section(self): 799 800 "Attempt to get the next section in the index." 801 802 if self.next_docnum is None: 803 self.docnum, self.pos_offset, self.section_count = self.index_iterator.next() 804 else: 805 self._next_read_section() 806 807 def _next_read_section(self): 808 809 """ 810 Make the next index entry the current one without reading from the 811 index. 812 """ 813 814 self.docnum, self.pos_offset, self.section_count = self.next_docnum, self.next_pos_offset, self.next_section_count 815 self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None 816 817 def _init_section(self): 818 819 "Initialise the iterator for the section in the position file." 820 821 if self.iterator is not None: 822 self.iterator.close() 823 self.iterator = self.position_opener.read_term_positions(self.pos_offset, self.section_count) 824 825 def close(self): 826 if self.iterator is not None: 827 self.iterator.close() 828 self.iterator = None 829 if self.index_iterator is not None: 830 self.index_iterator.close() 831 self.index_iterator = None 832 833 class TermWriter(FileWriter): 834 835 "Writing term information to files." 836 837 def reset(self): 838 self.last_term = "" 839 self.last_offset = 0 840 841 def write_term(self, term, offset, frequency, doc_frequency): 842 843 """ 844 Write the given 'term', its position file 'offset', its 'frequency' and 845 its 'doc_frequency' (number of documents in which it appears) to the 846 term information file. Return the offset after the term information was 847 written to the file. 848 """ 849 850 # Write the prefix length and term suffix. 
851 852 common = len(commonprefix([self.last_term, term])) 853 suffix = term[common:] 854 855 self.write_number(common) 856 self.write_string(suffix) 857 858 # Write the offset delta. 859 860 self.write_number(offset - self.last_offset) 861 862 # Write the frequency. 863 864 self.write_number(frequency) 865 866 # Write the document frequency. 867 868 self.write_number(doc_frequency) 869 870 self.last_term = term 871 self.last_offset = offset 872 873 return self.tell() 874 875 class TermReader(FileReader): 876 877 "Reading term information from files." 878 879 def reset(self): 880 self.last_term = "" 881 self.last_offset = 0 882 883 def read_term(self): 884 885 """ 886 Read a term, its position file offset, its frequency and its document 887 frequency from the term information file. 888 """ 889 890 # Read the prefix length and term suffix. 891 892 common = self.read_number() 893 suffix = self.read_string() 894 895 self.last_term = self.last_term[:common] + suffix 896 897 # Read the offset delta. 898 899 self.last_offset += self.read_number() 900 901 # Read the frequency. 902 903 frequency = self.read_number() 904 905 # Read the document frequency. 906 907 doc_frequency = self.read_number() 908 909 return self.last_term, self.last_offset, frequency, doc_frequency 910 911 def go_to_term(self, term, offset, info_offset): 912 913 """ 914 Seek past the entry for 'term' having 'offset' to 'info_offset'. This 915 permits the scanning for later terms from the specified term. 916 """ 917 918 self.seek(info_offset) 919 self.last_term = term 920 self.last_offset = offset 921 922 class TermIndexWriter(TermWriter): 923 924 "Writing term dictionary index details to files." 
925 926 def reset(self): 927 TermWriter.reset(self) 928 self.last_info_offset = 0 929 930 def write_term(self, term, offset, frequency, doc_frequency, info_offset): 931 932 """ 933 Write the given 'term', its position file 'offset', its 'frequency' and 934 its 'doc_frequency' to the term dictionary index file, along with the 935 'info_offset' in the term information file. 936 """ 937 938 TermWriter.write_term(self, term, offset, frequency, doc_frequency) 939 940 # Write the information file offset delta. 941 942 self.write_number(info_offset - self.last_info_offset) 943 self.last_info_offset = info_offset 944 945 class TermIndexReader(TermReader): 946 947 "Reading term dictionary index details from files." 948 949 def reset(self): 950 TermReader.reset(self) 951 self.last_info_offset = 0 952 953 def read_term(self): 954 955 """ 956 Read a term, its position file offset, its frequency, its document 957 frequency and a term information file offset from the term dictionary 958 index file. 959 """ 960 961 term, offset, frequency, doc_frequency = TermReader.read_term(self) 962 963 # Read the offset delta. 964 965 self.last_info_offset += self.read_number() 966 967 return term, offset, frequency, doc_frequency, self.last_info_offset 968 969 class TermDictionaryWriter: 970 971 "Writing term dictionaries." 972 973 def __init__(self, info_writer, index_writer, position_dict_writer, interval): 974 self.info_writer = info_writer 975 self.index_writer = index_writer 976 self.position_dict_writer = position_dict_writer 977 self.interval = interval 978 self.entry = 0 979 980 def _write_term(self, term, offset, frequency, doc_frequency): 981 982 """ 983 Write the given 'term', its position file 'offset', its 'frequency' and 984 its 'doc_frequency' (number of documents in which it appears) to the 985 term information file. Return the offset after the term information was 986 written to the file. 
987 """ 988 989 info_offset = self.info_writer.write_term(term, offset, frequency, doc_frequency) 990 991 if self.entry % self.interval == 0: 992 self.index_writer.write_term(term, offset, frequency, doc_frequency, info_offset) 993 994 self.entry += 1 995 996 def write_term_positions(self, term, doc_positions): 997 998 """ 999 Write the given 'term' and the 'doc_positions' recording the documents 1000 and positions at which the term is found. 1001 """ 1002 1003 offset, frequency, doc_frequency = self.position_dict_writer.write_term_positions(doc_positions) 1004 self._write_term(term, offset, frequency, doc_frequency) 1005 1006 def close(self): 1007 self.info_writer.close() 1008 self.index_writer.close() 1009 self.position_dict_writer.close() 1010 1011 class TermDictionaryReader: 1012 1013 "Reading term dictionaries." 1014 1015 def __init__(self, info_reader, index_reader, position_dict_reader): 1016 self.info_reader = info_reader 1017 self.index_reader = index_reader 1018 self.position_dict_reader = position_dict_reader 1019 1020 self.terms = [] 1021 try: 1022 while 1: 1023 self.terms.append(self.index_reader.read_term()) 1024 except EOFError: 1025 pass 1026 1027 # Large numbers for ordering purposes. 1028 1029 if self.terms: 1030 self.max_offset = self.terms[-1][1] + 1 1031 else: 1032 self.max_offset = None 1033 1034 def _find_closest_entry(self, term): 1035 1036 """ 1037 Find the offsets and frequencies of 'term' from the term dictionary or 1038 the closest term starting with the value of 'term'. 1039 1040 Return the closest index entry consisting of a term, the position file 1041 offset, the term frequency, the document frequency, and the term details 1042 file offset. 1043 """ 1044 1045 i = bisect_right(self.terms, (term, self.max_offset, 0, 0)) - 1 1046 1047 # Get the entry position providing the term or one preceding it. 1048 # If no entry precedes the requested term, return the very first entry 1049 # as the closest. 
1050 1051 if i == -1: 1052 return self.terms[0] 1053 else: 1054 return self.terms[i] 1055 1056 def _find_closest_term(self, term): 1057 1058 """ 1059 Find the offsets and frequencies of 'term' from the term dictionary or 1060 the closest term starting with the value of 'term'. 1061 1062 Return the closest term (or the term itself), the position file offset, 1063 the term frequency, the document frequency, and the term details file 1064 offset (or None if the reader is already positioned). 1065 """ 1066 1067 found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_entry(term) 1068 1069 # Where the term is found immediately, return the offset and 1070 # frequencies. If the term does not appear, return the details of the 1071 # closest entry. 1072 1073 if term <= found_term: 1074 return found_term, offset, frequency, doc_frequency, info_offset 1075 1076 # Otherwise, seek past the index term's entry in the information file 1077 # and scan for the desired term. 1078 1079 else: 1080 self.info_reader.go_to_term(found_term, offset, info_offset) 1081 try: 1082 while term > found_term: 1083 found_term, offset, frequency, doc_frequency = self.info_reader.read_term() 1084 except EOFError: 1085 pass 1086 1087 return found_term, offset, frequency, doc_frequency, None 1088 1089 def _find_term(self, term): 1090 1091 """ 1092 Find the position file offset and frequency of 'term' from the term 1093 dictionary. 1094 """ 1095 1096 found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_term(term) 1097 1098 # If the term is found, return the offset and frequencies. 1099 1100 if term == found_term: 1101 return offset, frequency, doc_frequency 1102 else: 1103 return None 1104 1105 def _get_positions(self, offset, doc_frequency): 1106 return self.position_dict_reader.read_term_positions(offset, doc_frequency) 1107 1108 # Iterator convenience methods. 
    # --- Continuation of the enclosing term dictionary reader class, whose
    # --- header appears earlier in the file.

    # Iterator convenience methods (Python 2 iterator protocol).

    def __iter__(self):
        # Restart sequential access so iteration always begins at the first term.
        self.rewind()
        return self

    def next(self):
        # Translate the reader's end-of-file condition into end-of-iteration.
        try:
            return self.read_term()
        except EOFError:
            raise StopIteration

    # Sequential access methods.

    def rewind(self):
        # Reposition the underlying term information reader at the start.
        self.info_reader.rewind()

    def read_term(self):

        """
        Return the next term, its frequency, its document frequency, and the
        documents and positions at which the term is found.
        """

        term, offset, frequency, doc_frequency = self.info_reader.read_term()
        positions = self._get_positions(offset, doc_frequency)
        return term, frequency, doc_frequency, positions

    # Query methods.

    def find_terms(self, term):

        "Return all terms whose values start with the value of 'term'."

        terms = []

        # Locate the closest term at or after 'term' using the in-memory index.

        found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_term(term)

        # Position the reader, if necessary.

        if info_offset is not None:
            self.info_reader.go_to_term(found_term, offset, info_offset)

        # Read and record terms; EOFError merely ends the scan.

        try:
            # Add the found term if it starts with the specified term.

            while found_term.startswith(term):
                terms.append(found_term)
                found_term, offset, frequency, doc_frequency = self.info_reader.read_term()

        except EOFError:
            pass

        return terms

    def find_positions(self, term):

        "Return the documents and positions at which the given 'term' is found."

        t = self._find_term(term)
        if t is None:
            return None
        else:
            offset, frequency, doc_frequency = t
            return self._get_positions(offset, doc_frequency)

    def get_frequency(self, term):

        "Return the frequency of the given 'term', or None if absent."

        t = self._find_term(term)
        if t is None:
            return None
        else:
            offset, frequency, doc_frequency = t
            return frequency

    def get_document_frequency(self, term):

        "Return the document frequency of the given 'term', or None if absent."

        t = self._find_term(term)
        if t is None:
            return None
        else:
            offset, frequency, doc_frequency = t
            return doc_frequency

    def close(self):

        "Close all underlying readers owned by this dictionary reader."

        self.info_reader.close()
        self.index_reader.close()
        self.position_dict_reader.close()

# Specific classes for storing document information.

class FieldWriter(FileWriter):

    "Writing field data to files."

    def reset(self):
        # Document numbers are delta-encoded: remember the last one written.
        self.last_docnum = 0

    def write_fields(self, docnum, fields):

        """
        Write for the given 'docnum', a list of 'fields' (integer, string pairs
        representing field identifiers and values respectively).
        Return the offset at which the fields are stored.
        """

        offset = self.tell()

        # Write the document number delta.

        self.write_number(docnum - self.last_docnum)

        # Write the number of fields.

        self.write_number(len(fields))

        # Write the fields themselves.

        for i, field in fields:
            self.write_number(i)
            self.write_string(field, 1) # compress

        self.last_docnum = docnum
        return offset

class FieldReader(FileReader):

    "Reading field data from files."

    def reset(self):
        # Mirror of FieldWriter.reset: base for delta-decoded document numbers.
        self.last_docnum = 0

    def read_fields(self):

        """
        Read fields from the file, returning a tuple containing the document
        number and a list of field (identifier, value) pairs.
        """

        # Read the document number (stored as a delta from the previous one).

        self.last_docnum += self.read_number()

        # Read the number of fields.

        nfields = self.read_number()

        # Collect the fields.

        fields = []
        i = 0

        while i < nfields:
            identifier = self.read_number()
            value = self.read_string(1) # decompress
            fields.append((identifier, value))
            i += 1

        return self.last_docnum, fields

    def read_document_fields(self, docnum, offset):

        """
        Read fields for 'docnum' at the given 'offset'. This permits the
        retrieval of details for the specified document, as well as scanning for
        later documents.
        """

        self.seek(offset)
        # After a seek, the delta-decoded number is meaningless; the
        # caller-supplied 'docnum' is authoritative, hence 'bad_docnum'.
        bad_docnum, fields = self.read_fields()
        self.last_docnum = docnum
        return docnum, fields

class FieldIndexWriter(FileWriter):

    "Writing field index details to files."

    def reset(self):
        # Both document numbers and file offsets are delta-encoded.
        self.last_docnum = 0
        self.last_offset = 0

    def write_document(self, docnum, offset):

        """
        Write for the given 'docnum', the 'offset' at which the fields for the
        document are stored in the fields file.
        """

        # Write the document number and offset deltas.

        self.write_number(docnum - self.last_docnum)
        self.write_number(offset - self.last_offset)

        self.last_docnum = docnum
        self.last_offset = offset

class FieldIndexReader(FileReader):

    "Reading field index details from files."

    def reset(self):
        # Mirror of FieldIndexWriter.reset: bases for delta decoding.
        self.last_docnum = 0
        self.last_offset = 0

    def read_document(self):

        "Read a document number and field file offset."

        # Read the document number delta and offset delta, accumulating both.

        self.last_docnum += self.read_number()
        self.last_offset += self.read_number()

        return self.last_docnum, self.last_offset

class FieldDictionaryWriter:

    "Writing field dictionary details."

    def __init__(self, field_writer, field_index_writer, interval):
        self.field_writer = field_writer
        self.field_index_writer = field_index_writer
        self.interval = interval
        # Running count of documents written, used for index sampling.
        self.entry = 0

    def write_fields(self, docnum, fields):

        "Write details of the document with the given 'docnum' and 'fields'."

        offset = self.field_writer.write_fields(docnum, fields)

        # Only every 'interval'-th document receives an index entry.

        if self.entry % self.interval == 0:
            self.field_index_writer.write_document(docnum, offset)

        self.entry += 1

    def close(self):

        "Close both underlying writers."

        self.field_writer.close()
        self.field_index_writer.close()

class FieldDictionaryReader:

    "Reading field dictionary details."

    def __init__(self, field_reader, field_index_reader):
        self.field_reader = field_reader
        self.field_index_reader = field_index_reader

        # Load the entire field index into memory for bisection in get_fields.

        self.docs = []
        try:
            while 1:
                self.docs.append(self.field_index_reader.read_document())
        except EOFError:
            pass

        # Large numbers for ordering purposes.

        if self.docs:
            self.max_offset = self.docs[-1][1]
        else:
            self.max_offset = None

    # Iterator convenience methods (Python 2 iterator protocol).

    def __iter__(self):
        self.rewind()
        return self

    def next(self):
        try:
            return self.read_fields()
        except EOFError:
            raise StopIteration

    # Sequential access methods.

    def rewind(self):
        self.field_reader.rewind()

    def read_fields(self):

        "Return the next document number and fields."

        return self.field_reader.read_fields()

    # Random access methods.

    def get_fields(self, docnum):

        "Read the fields of the document with the given 'docnum'."

        # Bisect with max_offset so an exact docnum match is found to the left.

        i = bisect_right(self.docs, (docnum, self.max_offset)) - 1

        # Get the entry position providing the term or one preceding it.

        if i == -1:
            return None

        found_docnum, offset = self.docs[i]

        # Read from the fields file.

        found_docnum, fields = self.field_reader.read_document_fields(found_docnum, offset)

        # Scan for the document, if necessary, since the index is sparse.

        try:
            while docnum > found_docnum:
                found_docnum, fields = self.field_reader.read_fields()
        except EOFError:
            pass

        # If the document is found, return the fields.

        if docnum == found_docnum:
            return fields
        else:
            return None

    def close(self):

        "Close both underlying readers."

        self.field_reader.close()
        self.field_index_reader.close()

# Dictionary merging classes.

class Merger:

    "Merge files."

    def __init__(self, writer, readers):
        self.writer = writer
        self.readers = readers

    def close(self):

        "Close all readers, then the writer."

        for reader in self.readers:
            reader.close()
        self.writer.close()

class TermDictionaryMerger(Merger):

    "Merge term and position files."

    def merge(self):

        """
        Merge terms and positions from the readers, sending them to the writer.
        """

        last_term = None
        current_readers = []

        # Readers yield terms in sorted order; group equal terms together and
        # emit each group's merged positions once the term changes.

        for term, frequency, doc_frequency, positions in itermerge(self.readers):
            if term == last_term:
                current_readers.append(positions)
            else:
                if current_readers:
                    self.writer.write_term_positions(last_term, itermerge(current_readers))
                last_term = term
                current_readers = [positions]
        else:
            # for/else: after the loop completes, flush the final group.
            if current_readers:
                self.writer.write_term_positions(last_term, itermerge(current_readers))

class FieldDictionaryMerger(Merger):

    "Merge field files."

    def merge(self):

        """
        Merge fields from the readers, sending them to the writer.
        """

        for docnum, fields in itermerge(self.readers):
            self.writer.write_fields(docnum, fields)

# Utility functions.
def get_term_writer(pathname, partition, interval, doc_interval):

    """
    Return a term dictionary writer using files under the given 'pathname'
    labelled according to the given 'partition', using the given indexing
    'interval' for terms and 'doc_interval' for document position records.
    """

    info_writer = TermWriter(
        open(join(pathname, "terms-%s" % partition), "wb"))
    index_writer = TermIndexWriter(
        open(join(pathname, "terms_index-%s" % partition), "wb"))
    positions_writer = PositionWriter(
        open(join(pathname, "positions-%s" % partition), "wb"))
    positions_index_writer = PositionIndexWriter(
        open(join(pathname, "positions_index-%s" % partition), "wb"))

    # Positions and their index are driven by a single dictionary writer.

    positions_dict_writer = PositionDictionaryWriter(
        positions_writer, positions_index_writer, doc_interval)

    return TermDictionaryWriter(
        info_writer, index_writer, positions_dict_writer, interval)

def get_field_writer(pathname, partition, interval):

    """
    Return a field dictionary writer using files under the given 'pathname'
    labelled according to the given 'partition', using the given indexing
    'interval'.
    """

    field_writer = FieldWriter(
        open(join(pathname, "fields-%s" % partition), "wb"))
    field_index_writer = FieldIndexWriter(
        open(join(pathname, "fields_index-%s" % partition), "wb"))

    return FieldDictionaryWriter(field_writer, field_index_writer, interval)

def get_term_reader(pathname, partition):

    """
    Return a term dictionary reader using files under the given 'pathname'
    labelled according to the given 'partition'.
    """

    info_reader = TermReader(
        open(join(pathname, "terms-%s" % partition), "rb"))
    index_reader = TermIndexReader(
        open(join(pathname, "terms_index-%s" % partition), "rb"))

    # Position files are opened lazily via openers rather than immediately.

    positions_opener = PositionOpener(
        join(pathname, "positions-%s" % partition))
    positions_index_opener = PositionIndexOpener(
        join(pathname, "positions_index-%s" % partition))

    positions_dict_reader = PositionDictionaryReader(
        positions_opener, positions_index_opener)

    return TermDictionaryReader(info_reader, index_reader, positions_dict_reader)

def get_field_reader(pathname, partition):

    """
    Return a field dictionary reader using files under the given 'pathname'
    labelled according to the given 'partition'.
    """

    field_reader = FieldReader(
        open(join(pathname, "fields-%s" % partition), "rb"))
    field_index_reader = FieldIndexReader(
        open(join(pathname, "fields_index-%s" % partition), "rb"))

    return FieldDictionaryReader(field_reader, field_index_reader)

def rename_files(pathname, names, from_partition, to_partition):

    """
    Under 'pathname', rename each file in 'names' from its 'from_partition'
    label to the 'to_partition' label.
    """

    for name in names:
        source = join(pathname, "%s-%s" % (name, from_partition))
        target = join(pathname, "%s-%s" % (name, to_partition))
        rename(source, target)

def rename_term_files(pathname, from_partition, to_partition):

    "Rename all term dictionary files between the given partition labels."

    rename_files(pathname, TERM_FILENAMES, from_partition, to_partition)

def rename_field_files(pathname, from_partition, to_partition):

    "Rename all field dictionary files between the given partition labels."

    rename_files(pathname, FIELD_FILENAMES, from_partition, to_partition)

def remove_files(pathname, names, partition):

    "Under 'pathname', remove each file in 'names' labelled with 'partition'."

    for name in names:
        remove(join(pathname, "%s-%s" % (name, partition)))

def remove_term_files(pathname, partition):

    "Remove all term dictionary files labelled with 'partition'."

    remove_files(pathname, TERM_FILENAMES, partition)

def remove_field_files(pathname, partition):

    "Remove all field dictionary files labelled with 'partition'."

    remove_files(pathname, FIELD_FILENAMES, partition)

# High-level classes.
# High-level classes.

class Document:

    "A container of document information."

    def __init__(self, docnum):
        self.docnum = docnum
        self.fields = []        # list of (identifier, value) pairs
        self.terms = {}         # mapping from term to list of positions

    def add_position(self, term, position):

        """
        Add a position entry for the given 'term', indicating the given
        'position'.
        """

        self.terms.setdefault(term, []).append(position)

    def add_field(self, identifier, value):

        "Add a field having the given 'identifier' and 'value'."

        self.fields.append((identifier, unicode(value))) # convert to a Unicode string

    def set_fields(self, fields):

        """
        Set the document's 'fields': a list of tuples each containing an integer
        identifier and a string value.
        """

        self.fields = fields

class IndexWriter:

    """
    Building term information and writing it to the term and field dictionaries.
    """

    def __init__(self, pathname, interval, doc_interval, flush_interval):
        self.pathname = pathname
        self.interval = interval
        self.doc_interval = doc_interval
        self.flush_interval = flush_interval

        # Partition counters: each flush writes a fresh partition on disk.

        self.dict_partition = 0
        self.field_dict_partition = 0

        # In-memory accumulators, flushed to disk periodically.

        self.terms = {}
        self.docs = {}

        self.doc_counter = 0

    def add_document(self, doc):

        """
        Add the given document 'doc', updating the document counter and flushing
        terms and fields if appropriate.
        """

        for term, positions in doc.terms.items():
            self.terms.setdefault(term, {})[doc.docnum] = positions

        self.docs[doc.docnum] = doc.fields

        self.doc_counter += 1

        # A zero/None flush_interval disables automatic flushing entirely.

        if self.flush_interval and self.doc_counter >= self.flush_interval:
            self.flush_terms()
            self.flush_fields()
            self.doc_counter = 0

    def get_term_writer(self):

        "Return a term dictionary writer for the current partition."

        return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval)

    def get_field_writer(self):

        "Return a field dictionary writer for the current partition."

        return get_field_writer(self.pathname, self.field_dict_partition, self.interval)

    def flush_terms(self):

        "Flush terms into the current term dictionary partition."

        # Get the terms in order.

        all_terms = self.terms
        terms = sorted(all_terms.keys())

        dict_writer = self.get_term_writer()

        # Ensure the partition files are closed even if a write fails.

        try:
            for term in terms:
                doc_positions = all_terms[term].items()
                dict_writer.write_term_positions(term, doc_positions)
        finally:
            dict_writer.close()

        self.terms = {}
        self.dict_partition += 1

    def flush_fields(self):

        "Flush fields into the current field dictionary partition."

        # Get the documents in order.

        docs = sorted(self.docs.items())

        field_dict_writer = self.get_field_writer()

        # Ensure the partition files are closed even if a write fails.

        try:
            for docnum, fields in docs:
                field_dict_writer.write_fields(docnum, fields)
        finally:
            field_dict_writer.close()

        self.docs = {}
        self.field_dict_partition += 1

    def close(self):

        "Flush any remaining accumulated terms and fields."

        if self.terms:
            self.flush_terms()
        if self.docs:
            self.flush_fields()

class IndexReader:

    "Accessing the term and field dictionaries."

    def __init__(self, pathname):
        # Readers operate on the single "merged" partition produced by Index.merge.
        self.dict_reader = get_term_reader(pathname, "merged")
        self.field_dict_reader = get_field_reader(pathname, "merged")

    def find_terms(self, term):

        "Return all terms whose values start with the value of 'term'."

        return self.dict_reader.find_terms(term)

    def find_positions(self, term):

        "Return the documents and positions at which the given 'term' is found."

        return self.dict_reader.find_positions(term)

    def get_frequency(self, term):

        "Return the frequency of the given 'term'."

        return self.dict_reader.get_frequency(term)

    def get_document_frequency(self, term):

        "Return the document frequency of the given 'term'."

        return self.dict_reader.get_document_frequency(term)

    def get_fields(self, docnum):

        "Return the fields of the document with the given 'docnum'."

        return self.field_dict_reader.get_fields(docnum)

    def close(self):

        "Close both dictionary readers."

        self.dict_reader.close()
        self.field_dict_reader.close()

class Index:

    "An inverted index solution encapsulating the various components."

    def __init__(self, pathname):
        self.pathname = pathname
        self.reader = None
        self.writer = None

    def get_writer(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, flush_interval=FLUSH_INTERVAL):

        """
        Return a writer, optionally using the given indexing 'interval',
        'doc_interval' and 'flush_interval'.
        """

        if not exists(self.pathname):
            mkdir(self.pathname)

        self.writer = IndexWriter(self.pathname, interval, doc_interval, flush_interval)
        return self.writer

    def get_reader(self, partition=0):

        "Return a reader for the index."

        # Ensure that only one partition exists.

        self.merge()
        return self._get_reader(partition)

    def _get_reader(self, partition):

        "Return a reader for the index."

        # NOTE(review): 'partition' is currently unused - IndexReader always
        # opens the "merged" partition. Retained for interface compatibility.

        if not exists(self.pathname):
            raise OSError("Index path %r does not exist." % self.pathname)

        self.reader = IndexReader(self.pathname)
        return self.reader

    def merge(self):

        "Merge/optimise index partitions."

        self.merge_terms()
        self.merge_fields()

    def merge_terms(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL):

        """
        Merge term dictionaries using the given indexing 'interval' and
        'doc_interval'.
        """

        readers = []
        partitions = set()

        for filename in listdir(self.pathname):
            if filename.startswith("terms-"): # 6 character prefix
                partition = filename[6:]
                readers.append(get_term_reader(self.pathname, partition))
                partitions.add(partition)

        # Write directly to a dictionary.

        if len(readers) > 1:

            # Preserve any previous merged partition until merging succeeds.

            if "merged" in partitions:
                rename_term_files(self.pathname, "merged", "old-merged")
                partitions.remove("merged")
                partitions.add("old-merged")

            writer = get_term_writer(self.pathname, "merged", interval, doc_interval)
            merger = TermDictionaryMerger(writer, readers)
            merger.merge()
            merger.close()

            # Remove old files.

            for partition in partitions:
                remove_term_files(self.pathname, partition)

        elif len(readers) == 1:

            # Close the single reader before touching its files: leaving it
            # open leaks file handles and can make the rename fail on
            # platforms with mandatory file locking.

            readers[0].close()

            partition = list(partitions)[0]
            if partition != "merged":
                rename_term_files(self.pathname, partition, "merged")

    def merge_fields(self, interval=FIELD_INTERVAL):

        "Merge field dictionaries using the given indexing 'interval'."

        readers = []
        partitions = set()

        for filename in listdir(self.pathname):
            if filename.startswith("fields-"): # 7 character prefix
                partition = filename[7:]
                readers.append(get_field_reader(self.pathname, partition))
                partitions.add(partition)

        # Write directly to a dictionary.

        if len(readers) > 1:

            # Preserve any previous merged partition until merging succeeds.

            if "merged" in partitions:
                rename_field_files(self.pathname, "merged", "old-merged")
                partitions.remove("merged")
                partitions.add("old-merged")

            writer = get_field_writer(self.pathname, "merged", interval)
            merger = FieldDictionaryMerger(writer, readers)
            merger.merge()
            merger.close()

            # Remove old files.

            for partition in partitions:
                remove_field_files(self.pathname, partition)

        elif len(readers) == 1:

            # Close the single reader before touching its files (see
            # merge_terms for the rationale).

            readers[0].close()

            partition = list(partitions)[0]
            if partition != "merged":
                rename_field_files(self.pathname, partition, "merged")

    def close(self):

        "Close any open reader and writer."

        if self.reader is not None:
            self.reader.close()
            self.reader = None
        if self.writer is not None:
            self.writer.close()
            self.writer = None

# vim: tabstop=4 expandtab shiftwidth=4