#!/usr/bin/env python

"""
A simple (and sane) text indexing library.

Copyright (C) 2009 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.  See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with this program.  If not, see <http://www.gnu.org/licenses/>.
"""

from os import dup, fdopen              # independent iterator access to files
from os import listdir, mkdir           # index and partition discovery
from os import remove, rename           # partition manipulation
from os.path import exists, join
from os.path import commonprefix        # to find common string prefixes
from bisect import bisect_right         # to find terms in the dictionary index
import bz2, zlib                        # for field compression
from itermerge import itermerge

try:
    set
except NameError:
    from sets import Set as set

# Constants.

TERM_INTERVAL = 100
DOCUMENT_INTERVAL = 100
FIELD_INTERVAL = 100
FLUSH_INTERVAL = 10000

TERM_FILENAMES = "terms", "terms_index", "positions", "positions_index"
FIELD_FILENAMES = "fields", "fields_index"

# Compressors are tried in order; each flag character is stored before the
# compressed data so that the matching decompressor can be chosen on reading.

compressors = [("b", bz2.compress), ("z", zlib.compress)]
decompressors = {"b" : bz2.decompress, "z" : zlib.decompress}

# Utility functions.

try:
    from vint import vint as _vint

    def vint(number):

        """
        Return 'number' encoded as a variable-length integer string, using the
        imported 'vint' implementation. Raise ValueError for negative numbers.
        """

        if number >= 0:
            return _vint(number)
        else:
            raise ValueError("Number %r is negative." % number)

except ImportError:

    def vint(number):

        """
        Return 'number' encoded as a variable-length integer string: seven bits
        per character, least significant group first, with the top bit of each
        character set when more characters follow. Raise ValueError for negative
        numbers.
        """

        # Negative numbers are not supported.

        if number < 0:
            raise ValueError("Number %r is negative." % number)

        # Special case: one byte containing a 7-bit number.

        if number < 128:
            return chr(number)

        # Write the number from least to most significant digits.

        digits = []

        while number != 0:
            lsd = number & 127
            number = number >> 7
            if number != 0:
                lsd |= 128
            digits.append(chr(lsd))

        return "".join(digits)

# Foundation classes.

class File:

    "A basic file abstraction wrapping the file object 'f'."

    def __init__(self, f):
        self.f = f
        self.reset()

    def reset(self):

        "To be used to reset the state of the reader or writer between records."

        pass

    def rewind(self):

        "Seek to the start of the file and reset the record state."

        self.seek(0)
        self.reset()

    def seek(self, offset):

        "To be defined by readers."

        pass

    def flush(self):

        "To be defined by writers."

        pass

    def close(self):

        "Flush and close the underlying file; further calls do nothing."

        if self.f is not None:
            self.flush()
            self.f.close()
            self.f = None

class FileWriter(File):

    "Writing basic data types to files."

    def __init__(self, f):
        File.__init__(self, f)

        # Pending output, and the total length of the pending strings.

        self.cache = []
        self.cache_length = 0

    def write_number(self, number):

        "Write 'number' to the file using a variable length encoding."

        self.write(vint(number))

    def write_string(self, s, compress=0):

        """
        Write 's' to the file, recording its length and compressing the string
        if 'compress' is set to a true value.

        When 'compress' is set, a one character flag precedes the length: the
        flag of the compressor used, or "-" if the data was stored as given.
        When 'compress' is not set, no flag is written.
        """

        # Convert Unicode objects to strings.

        if isinstance(s, unicode):
            s = s.encode("utf-8")

        # Compress the string if requested, taking the first result which is
        # shorter than the original.

        if compress:
            flag = "-"
            for candidate_flag, fn in compressors:
                cs = fn(s)
                if len(cs) < len(s):
                    flag = candidate_flag
                    s = cs
                    break
        else:
            flag = ""

        # Write the length of the data before the data itself.

        self.write(flag + vint(len(s)) + s)

    # Cache-affected methods.

    def write(self, s):

        """
        Queue 's' for writing, flushing the queue to the file once it holds
        1000 characters or more.
        """

        self.cache.append(s)
        self.cache_length += len(s)
        if self.cache_length >= 1000:
            self.flush()

    def tell(self):

        "Return the file offset at which the next written data will appear."

        return self.f.tell() + self.cache_length

    def flush(self):

        "Write all queued data to the underlying file and empty the queue."

        self.f.write("".join(self.cache))
        self.cache = []
        self.cache_length = 0

class FileReader(File):

    "Reading basic data types from files."

    def __init__(self, f):
        File.__init__(self, f)
        self.reset_cache()

    def reset_cache(self):

        "Discard all cached data."

        # The cache holds the data immediately preceding the position of the
        # underlying file; 'cache_start' is the index of the next unread
        # character within it.

        self.cache = ""
        self.cache_length = 0
        self.cache_start = 0

    def read_number(self):

        """
        Read a variable-length number from the file. Raise EOFError if the data
        ends before the number is complete.
        """

        shift = 0
        number = 0
        read = self.read

        # Read each byte, adding it to the number. At the end of the file,
        # read returns an empty string for which ord raises TypeError.

        try:
            csd = ord(read(1))
            while csd & 128:
                number += ((csd & 127) << shift)
                shift += 7
                csd = ord(read(1))
            number += (csd << shift)
        except TypeError:
            raise EOFError

        return number

    def read_string(self, decompress=0):

        """
        Read a string from the file, decompressing the stored data if
        'decompress' is set to a true value (in which case a flag character is
        expected before the length). Return the string as a Unicode object.
        """

        # Read the compression flag if decompression is requested.

        if decompress:
            flag = self.read(1)
        else:
            flag = "-"

        length = self.read_number()
        s = self.read(length)

        # Perform decompression if applicable.

        if flag != "-":
            fn = decompressors[flag]
            s = fn(s)

        # Convert strings to Unicode objects.

        return unicode(s, "utf-8")

    # Cache-affected methods.

    def read(self, n):

        """
        Read up to 'n' characters, returning fewer only if the file does not
        provide enough data.
        """

        needed = n - (self.cache_length - self.cache_start)

        # Read the needed number of characters, if possible, fetching at least
        # 1000 characters at a time.

        if needed > 0:
            s = self.f.read(max(needed, 1000))
            self.cache += s
            self.cache_length += len(s)

        # Get the end of the requested block.

        next_start = self.cache_start + n
        s = self.cache[self.cache_start:next_start]

        # Reposition the pointer to the cache.

        self._seek_cache(len(s))
        return s

    def tell(self):

        "Return the file offset of the next character to be read."

        return self.f.tell() - self.cache_length + self.cache_start

    def seek(self, offset):

        "Position the reader so that the next read starts at 'offset'."

        delta = offset - self.tell()
        unread = self.cache_length - self.cache_start

        # If seeking forward to data already held, navigate the cache only.
        # The underlying file must stay positioned at the end of the cached
        # data, otherwise 'tell' and later cache refills would be wrong.

        if 0 <= delta < unread:
            self._seek_cache(delta)

        # Otherwise, move the underlying file and discard the cache.

        else:
            self.f.seek(offset)
            self.reset_cache()

    def _seek_cache(self, delta):

        "Advance the position within the cache by 'delta' characters."

        next_start = self.cache_start + delta

        # With all cached data consumed, empty the cache.

        if next_start >= len(self.cache):
            self.reset_cache()

        # If the cache is too big, resize it.

        elif next_start > 1000:
            self.cache = self.cache[next_start:]
            self.cache_length = len(self.cache)
            self.cache_start = 0

        # Otherwise, just reference the next part of the cache.

        else:
            self.cache_start = next_start

class FileOpener:

    "Opening files using their filenames."

    def __init__(self, filename):
        self.filename = filename

    def open(self, mode):

        "Return a new file object for the filename, opened using 'mode'."

        return open(self.filename, mode)

    def close(self):
        pass

# Specific classes for storing term and position information.

class PositionWriter(FileWriter):

    "Writing position information to files."

    def reset(self):
        self.last_docnum = 0

    def write_positions(self, docnum, positions):

        """
        Write for the document 'docnum' the given 'positions'.
        Return the offset of the written record.

        The 'positions' list is sorted in place. ValueError is raised if
        'docnum' is less than the previously written document number.
        """

        if docnum < self.last_docnum:
            raise ValueError("Document number %r is less than previous number %r." % (docnum, self.last_docnum))

        # Record the offset of this record.

        offset = self.tell()

        # Make sure that the positions are sorted.

        positions.sort()

        # Encode the position deltas.

        output = []
        last = 0

        for position in positions:
            output.append(vint(position - last))
            last = position

        # Write the document number delta.
        # Write the number of positions.
        # Then write the positions.

        self.write(vint(docnum - self.last_docnum) + vint(len(positions)) + "".join(output))

        self.last_docnum = docnum
        return offset

class PositionOpener(FileOpener):

    "Reading position information from files."

    def read_term_positions(self, offset, count):

        """
        Read all positions from 'offset', seeking to that position in the file
        before reading. The number of documents available for reading is limited
        to 'count'.
        """

        # Open an independent handle on the file for the iterator.

        f = self.open("rb")
        return PositionIterator(f, offset, count)

class PositionIndexWriter(FileWriter):

    "Writing position index information to files."

    def reset(self):
        self.last_docnum = 0
        self.last_pos_offset = 0

    def write_positions(self, docnum, pos_offset, count):

        """
        Write the given 'docnum', 'pos_offset' and document 'count' to the
        position index file. Return the offset of the written record.
        """

        # Record the offset of this record.

        offset = self.tell()

        # Write the document number delta, the position file offset delta, and
        # the document count.

        output = [
            vint(docnum - self.last_docnum),
            vint(pos_offset - self.last_pos_offset),
            vint(count),
            ]

        self.last_docnum = docnum
        self.last_pos_offset = pos_offset

        # Actually write the data.

        self.write("".join(output))

        return offset

class PositionIndexOpener(FileOpener):

    "Reading position index information from files."

    def read_term_positions(self, offset, doc_frequency):

        """
        Read all index entries from 'offset', seeking to that position in the
        file before reading. The number of documents covered by the entries is
        limited to 'doc_frequency'.
        """

        # Open an independent handle on the file for the iterator.

        f = self.open("rb")
        return PositionIndexIterator(f, offset, doc_frequency)

# Iterators for position-related files.

class IteratorBase:

    "Support for iterators limited to a given number of documents."

    def __init__(self, count):
        self.replenish(count)

    def replenish(self, count):

        "Permit a further 'count' documents to be read."

        self.count = count
        self.read_documents = 0

    def __len__(self):
        return self.count

    def sort(self):
        pass # Stored document positions are already sorted.

    def __iter__(self):
        return self

class PositionIterator(FileReader, IteratorBase):

    "Iterating over document positions."

    def __init__(self, f, offset, count):
        FileReader.__init__(self, f)
        IteratorBase.__init__(self, count)
        self.seek(offset)

    def reset(self):
        self.last_docnum = 0

    def read_positions(self):

        "Read positions, returning a document number and a list of positions."

        # Read the document number delta and add it to the last number.

        self.last_docnum += self.read_number()

        # Read the number of positions.

        npositions = self.read_number()

        # Read the position deltas, adding each previous position to get the
        # appropriate collection of absolute positions.

        i = 0
        last = 0
        positions = []

        while i < npositions:
            last += self.read_number()
            positions.append(last)
            i += 1

        return self.last_docnum, positions

    def next(self):

        "Read positions for a single document."

        if self.read_documents < self.count:
            self.read_documents += 1
            return self.read_positions()
        else:
            raise StopIteration

class PositionIndexIterator(FileReader, IteratorBase):

    "Iterating over position index entries."

    def __init__(self, f, offset, count):
        FileReader.__init__(self, f)
        IteratorBase.__init__(self, count)
        self.seek(offset)

        # The number of documents covered by the entry last returned.

        self.section_count = 0

    def reset(self):
        self.last_docnum = 0
        self.last_pos_offset = 0

    def read_positions(self):

        """
        Read an index entry, returning a document number, an offset into the
        position file, and the number of documents in the section of the
        position file starting at that offset.
        """

        # Read the document number delta.

        self.last_docnum += self.read_number()

        # Read the offset delta.

        self.last_pos_offset += self.read_number()

        # Read the document count.

        count = self.read_number()

        return self.last_docnum, self.last_pos_offset, count

    def next(self):

        "Read the index entry for the next section of documents."

        # Account for the documents covered by the previous entry.

        self.read_documents += self.section_count
        if self.read_documents < self.count:
            docnum, pos_offset, self.section_count = t = self.read_positions()
            return t
        else:
            raise StopIteration

class PositionDictionaryWriter:

    "Writing position dictionaries."

    def __init__(self, position_writer, position_index_writer, interval):
        self.position_writer = position_writer
        self.position_index_writer = position_index_writer
        self.interval = interval

    def write_term_positions(self, doc_positions):

        """
        Write all 'doc_positions' - a collection of tuples of the form (document
        number, position list) - to the file. The collection must provide a
        'sort' method, which is called before writing.

        Add some records to the index, making dictionary entries.

        Return a tuple containing the offset of the first index entry written
        (or None if 'doc_positions' is empty), the frequency (number of
        positions), and document frequency (number of documents) for the term
        involved.
        """

        # Reset the writers.

        self.position_writer.reset()
        self.position_index_writer.reset()

        index_offset = None

        # Write the positions.

        frequency = 0
        first_docnum = None
        first_offset = None
        count = 0

        doc_positions.sort()

        for docnum, positions in doc_positions:
            pos_offset = self.position_writer.write_positions(docnum, positions)

            # Retain the first record offset for a subsequent index entry.

            if first_offset is None:
                first_offset = pos_offset
                first_docnum = docnum

            frequency += len(positions)
            count += 1

            # Every {interval} entries, write an index entry.

            if count % self.interval == 0:
                io = self.position_index_writer.write_positions(first_docnum, first_offset, self.interval)

                # Remember the first index entry offset.

                if index_offset is None:
                    index_offset = io

                first_offset = None
                first_docnum = None

                # Reset the position writer so that position readers accessing
                # a section start with the correct document number.

                self.position_writer.reset()

        # Finish writing an index entry for the remaining documents.

        if first_offset is not None:
            io = self.position_index_writer.write_positions(first_docnum, first_offset, count % self.interval)

            # Remember the first index entry offset.

            if index_offset is None:
                index_offset = io

        return index_offset, frequency, count

    def close(self):
        self.position_writer.close()
        self.position_index_writer.close()

class PositionDictionaryReader:

    "Reading position dictionaries."

    def __init__(self, position_opener, position_index_opener):
        self.position_opener = position_opener
        self.position_index_opener = position_index_opener

    def read_term_positions(self, offset, doc_frequency):

        """
        Return an iterator for dictionary entries starting at 'offset' with the
        given 'doc_frequency'.
        """

        return PositionDictionaryIterator(self.position_opener,
            self.position_index_opener, offset, doc_frequency)

    def close(self):
        pass

class PositionDictionaryIterator:

    "Iteration over position dictionary entries."

    def __init__(self, position_opener, position_index_opener, offset, doc_frequency):
        self.position_opener = position_opener
        self.doc_frequency = doc_frequency
        self.index_iterator = position_index_opener.read_term_positions(offset, doc_frequency)
        self.iterator = None

        # Remember the last values.

        self.found_docnum, self.found_positions = None, None

        # Maintain state for the next index entry, if read.

        self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None

        # Initialise the current index entry and current position file iterator.

        self._next_section()
        self._init_section()

    # Sequence methods.

    def __len__(self):
        return self.doc_frequency

    def sort(self):
        pass

    # Iterator methods.

    def __iter__(self):
        return self

    def next(self):

        """
        Attempt to get the next document record from the section in the
        positions file, returning a (document number, positions) tuple.
        """

        # Return any visited but unrequested record.

        if self.found_docnum is not None:
            t = self.found_docnum, self.found_positions
            self.found_docnum, self.found_positions = None, None
            return t

        # Or search for the next record.

        while 1:

            # Either return the next record.

            try:
                return self.iterator.next()

            # Or, where a section is finished, get the next section and try again.
            # With no more sections, the index iterator raises StopIteration
            # which ends this iteration too.

            except StopIteration:

                # Where a section follows, update the index iterator, but keep
                # reading using the same file iterator (since the data should
                # just follow on from the last section).

                self._next_section()
                self.iterator.replenish(self.section_count)

                # Reset the state of the iterator to make sure that document
                # numbers are correct.

                self.iterator.reset()

    def from_document(self, docnum):

        """
        Attempt to navigate to a positions entry for the given 'docnum',
        returning the positions for 'docnum', or None otherwise.
        """

        # Return any unrequested document positions.

        if docnum == self.found_docnum:
            return self.found_positions

        # Read ahead in the index until the next entry refers to a document
        # later than the desired document.

        try:
            if self.next_docnum is None:
                self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next()

            # Read until the next entry is after the desired document number,
            # or until the end of the results.

            while self.next_docnum <= docnum:
                self._next_read_section()
                if self.docnum < docnum:
                    self.next_docnum, self.next_pos_offset, self.next_section_count = self.index_iterator.next()
                else:
                    break

        except StopIteration:
            pass

        # Navigate in the position file to the document.

        self._init_section()

        try:
            while 1:
                found_docnum, found_positions = self.iterator.next()

                # Return the desired document positions or None (retaining the
                # positions for the document immediately after).

                if docnum == found_docnum:
                    return found_positions
                elif docnum < found_docnum:
                    self.found_docnum, self.found_positions = found_docnum, found_positions
                    return None

        except StopIteration:
            return None

    # Internal methods.

    def _next_section(self):

        "Attempt to get the next section in the index."

        if self.next_docnum is None:
            self.docnum, self.pos_offset, self.section_count = self.index_iterator.next()
        else:
            self._next_read_section()

    def _next_read_section(self):

        """
        Make the next index entry the current one without reading from the
        index.
        """

        self.docnum, self.pos_offset, self.section_count = self.next_docnum, self.next_pos_offset, self.next_section_count
        self.next_docnum, self.next_pos_offset, self.next_section_count = None, None, None

    def _init_section(self):

        "Initialise the iterator for the section in the position file."

        if self.iterator is not None:
            self.iterator.close()
        self.iterator = self.position_opener.read_term_positions(self.pos_offset, self.section_count)

    def close(self):

        "Close the position file and position index iterators."

        if self.iterator is not None:
            self.iterator.close()
            self.iterator = None
        if self.index_iterator is not None:
            self.index_iterator.close()
            self.index_iterator = None

class TermWriter(FileWriter):

    "Writing term information to files."

    def reset(self):
        self.last_term = ""
        self.last_offset = 0

    def write_term(self, term, offset, frequency, doc_frequency):

        """
        Write the given 'term', its position file 'offset', its 'frequency' and
        its 'doc_frequency' (number of documents in which it appears) to the
        term information file. Return the offset after the term information was
        written to the file.
        """

        # Write the prefix length and term suffix: only the part of the term
        # differing from the previous term is stored.

        common = len(commonprefix([self.last_term, term]))
        suffix = term[common:]

        self.write_number(common)
        self.write_string(suffix)

        # Write the offset delta.

        self.write_number(offset - self.last_offset)

        # Write the frequency.

        self.write_number(frequency)

        # Write the document frequency.

        self.write_number(doc_frequency)

        self.last_term = term
        self.last_offset = offset

        return self.tell()

class TermReader(FileReader):

    "Reading term information from files."

    def reset(self):
        self.last_term = ""
        self.last_offset = 0

    def read_term(self):

        """
        Read a term, its position file offset, its frequency and its document
        frequency from the term information file.
        """

        # Read the prefix length and term suffix.

        common = self.read_number()
        suffix = self.read_string()

        self.last_term = self.last_term[:common] + suffix

        # Read the offset delta.

        self.last_offset += self.read_number()

        # Read the frequency.

        frequency = self.read_number()

        # Read the document frequency.

        doc_frequency = self.read_number()

        return self.last_term, self.last_offset, frequency, doc_frequency

    def go_to_term(self, term, offset, info_offset):

        """
        Seek past the entry for 'term' having 'offset' to 'info_offset'. This
        permits the scanning for later terms from the specified term.
        """

        self.seek(info_offset)

        # Later entries are stored relative to this term and offset.

        self.last_term = term
        self.last_offset = offset

class TermIndexWriter(TermWriter):

    "Writing term dictionary index details to files."

    def reset(self):
        TermWriter.reset(self)
        self.last_info_offset = 0

    def write_term(self, term, offset, frequency, doc_frequency, info_offset):

        """
        Write the given 'term', its position file 'offset', its 'frequency' and
        its 'doc_frequency' to the term dictionary index file, along with the
        'info_offset' in the term information file.
        """

        TermWriter.write_term(self, term, offset, frequency, doc_frequency)

        # Write the information file offset delta.

        self.write_number(info_offset - self.last_info_offset)
        self.last_info_offset = info_offset

class TermIndexReader(TermReader):

    "Reading term dictionary index details from files."

    def reset(self):
        TermReader.reset(self)
        self.last_info_offset = 0

    def read_term(self):

        """
        Read a term, its position file offset, its frequency, its document
        frequency and a term information file offset from the term dictionary
        index file.
        """

        term, offset, frequency, doc_frequency = TermReader.read_term(self)

        # Read the offset delta.

        self.last_info_offset += self.read_number()

        return term, offset, frequency, doc_frequency, self.last_info_offset

class TermDictionaryWriter:

    "Writing term dictionaries."

    def __init__(self, info_writer, index_writer, position_dict_writer, interval):
        self.info_writer = info_writer
        self.index_writer = index_writer
        self.position_dict_writer = position_dict_writer
        self.interval = interval
        self.entry = 0

    def _write_term(self, term, offset, frequency, doc_frequency):

        """
        Write the given 'term', its position file 'offset', its 'frequency' and
        its 'doc_frequency' (number of documents in which it appears) to the
        term information file, also adding every term at a multiple of the
        interval (starting with the first) to the index file.
        """

        info_offset = self.info_writer.write_term(term, offset, frequency, doc_frequency)

        if self.entry % self.interval == 0:
            self.index_writer.write_term(term, offset, frequency, doc_frequency, info_offset)

        self.entry += 1

    def write_term_positions(self, term, doc_positions):

        """
        Write the given 'term' and the 'doc_positions' recording the documents
        and positions at which the term is found.
        """

        offset, frequency, doc_frequency = self.position_dict_writer.write_term_positions(doc_positions)
        self._write_term(term, offset, frequency, doc_frequency)

    def close(self):
        self.info_writer.close()
        self.index_writer.close()
        self.position_dict_writer.close()

class TermDictionaryReader:

    "Reading term dictionaries."

    def __init__(self, info_reader, index_reader, position_dict_reader):
        self.info_reader = info_reader
        self.index_reader = index_reader
        self.position_dict_reader = position_dict_reader

        # Load the entire index into memory.

        self.terms = []
        try:
            while 1:
                self.terms.append(self.index_reader.read_term())
        except EOFError:
            pass

        # Large numbers for ordering purposes.

        if self.terms:
            self.max_offset = self.terms[-1][1] + 1
        else:
            self.max_offset = None

    def _find_closest_entry(self, term):

        """
        Find the offsets and frequencies of 'term' from the term dictionary or
        the closest term starting with the value of 'term'.

        Return the closest index entry consisting of a term, the position file
        offset, the term frequency, the document frequency, and the term details
        file offset.
        """

        # The maximum offset makes the search key sort after any index entry
        # for the same term.

        i = bisect_right(self.terms, (term, self.max_offset, 0, 0)) - 1

        # Get the entry position providing the term or one preceding it.
        # If no entry precedes the requested term, return the very first entry
        # as the closest.

        if i == -1:
            return self.terms[0]
        else:
            return self.terms[i]

    def _find_closest_term(self, term):

        """
        Find the offsets and frequencies of 'term' from the term dictionary or
        the closest term starting with the value of 'term'.

        Return the closest term (or the term itself), the position file offset,
        the term frequency, the document frequency, and the term details file
        offset (or None if the reader is already positioned).
        """

        found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_entry(term)

        # Where the term is found immediately, return the offset and
        # frequencies. If the term does not appear, return the details of the
        # closest entry.

        if term <= found_term:
            return found_term, offset, frequency, doc_frequency, info_offset

        # Otherwise, seek past the index term's entry in the information file
        # and scan for the desired term.

        self.info_reader.go_to_term(found_term, offset, info_offset)
        try:
            while term > found_term:
                found_term, offset, frequency, doc_frequency = self.info_reader.read_term()
        except EOFError:
            pass

        return found_term, offset, frequency, doc_frequency, None

    def _find_term(self, term):

        """
        Find the position file offset and frequency of 'term' from the term
        dictionary, returning a tuple of the offset, frequency and document
        frequency, or None if the term is not present.
        """

        found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_term(term)

        # If the term is found, return the offset and frequencies.

        if term == found_term:
            return offset, frequency, doc_frequency
        else:
            return None

    def _get_positions(self, offset, doc_frequency):
        return self.position_dict_reader.read_term_positions(offset, doc_frequency)

    # Iterator convenience methods.

    def __iter__(self):
        self.rewind()
        return self

    def next(self):
        try:
            return self.read_term()
        except EOFError:
            raise StopIteration

    # Sequential access methods.

    def rewind(self):
        self.info_reader.rewind()

    def read_term(self):

        """
        Return the next term, its frequency, its document frequency, and the
        documents and positions at which the term is found.
        """

        term, offset, frequency, doc_frequency = self.info_reader.read_term()
        positions = self._get_positions(offset, doc_frequency)
        return term, frequency, doc_frequency, positions

    # Query methods.

    def find_terms(self, term):

        "Return all terms whose values start with the value of 'term'."

        terms = []

        found_term, offset, frequency, doc_frequency, info_offset = self._find_closest_term(term)

        # Position the reader, if necessary.

        if info_offset is not None:
            self.info_reader.go_to_term(found_term, offset, info_offset)

        # Read and record terms.

        try:
            # Add the found term if it starts with the specified term.

            while found_term.startswith(term):
                terms.append(found_term)
                found_term, offset, frequency, doc_frequency = self.info_reader.read_term()

        except EOFError:
            pass

        return terms

    def find_positions(self, term):

        """
        Return the documents and positions at which the given 'term' is found,
        or None if the term is not present.
        """

        t = self._find_term(term)
        if t is None:
            return None
        else:
            offset, frequency, doc_frequency = t
            return self._get_positions(offset, doc_frequency)

    def get_frequency(self, term):

        "Return the frequency of the given 'term', or None if not present."

        t = self._find_term(term)
        if t is None:
            return None
        else:
            offset, frequency, doc_frequency = t
            return frequency

    def get_document_frequency(self, term):

        """
        Return the document frequency of the given 'term', or None if not
        present.
        """

        t = self._find_term(term)
        if t is None:
            return None
        else:
            offset, frequency, doc_frequency = t
            return doc_frequency

    def close(self):
        self.info_reader.close()
        self.index_reader.close()
        self.position_dict_reader.close()

# Specific classes for storing document information.

class FieldWriter(FileWriter):

    "Writing field data to files."

    def reset(self):
        self.last_docnum = 0

    def write_fields(self, docnum, fields):

        """
        Write for the given 'docnum', a list of 'fields' (integer, string pairs
        representing field identifiers and values respectively).
        Return the offset at which the fields are stored.
        """

        offset = self.tell()

        # Write the document number delta.

        self.write_number(docnum - self.last_docnum)

        # Write the number of fields.

        self.write_number(len(fields))

        # Write the fields themselves.

        for i, field in fields:
            self.write_number(i)
            self.write_string(field, 1) # compress

        self.last_docnum = docnum
        return offset

class FieldReader(FileReader):

    "Reading field data from files."

    def reset(self):
        self.last_docnum = 0

    def read_fields(self):

        """
        Read fields from the file, returning a tuple containing the document
        number and a list of field (identifier, value) pairs.
        """

        # Read the document number.

        self.last_docnum += self.read_number()

        # Read the number of fields.

        nfields = self.read_number()

        # Collect the fields.

        fields = []
        i = 0

        while i < nfields:
            identifier = self.read_number()
            value = self.read_string(1) # decompress
            fields.append((identifier, value))
            i += 1

        return self.last_docnum, fields

    def read_document_fields(self, docnum, offset):

        """
        Read fields for 'docnum' at the given 'offset'. This permits the
        retrieval of details for the specified document, as well as scanning for
        later documents.
        """

        self.seek(offset)

        # The stored document number is a delta, so the number computed when
        # reading is discarded and the given number is used instead.

        bad_docnum, fields = self.read_fields()
        self.last_docnum = docnum
        return docnum, fields

class FieldIndexWriter(FileWriter):

    "Writing field index details to files."

    def reset(self):
        self.last_docnum = 0
        self.last_offset = 0

    def write_document(self, docnum, offset):

        """
        Write for the given 'docnum', the 'offset' at which the fields for the
        document are stored in the fields file.
        """

        # Write the document number and offset deltas.

        self.write_number(docnum - self.last_docnum)
        self.write_number(offset - self.last_offset)

        self.last_docnum = docnum
        self.last_offset = offset

class FieldIndexReader(FileReader):

    "Reading field index details from files."

    def reset(self):
        self.last_docnum = 0
        self.last_offset = 0

    def read_document(self):

        "Read a document number and field file offset."

        # Read the document number delta and offset.

        self.last_docnum += self.read_number()
        self.last_offset += self.read_number()

        return self.last_docnum, self.last_offset

class FieldDictionaryWriter:

    "Writing field dictionary details."

    def __init__(self, field_writer, field_index_writer, interval):
        self.field_writer = field_writer
        self.field_index_writer = field_index_writer
        self.interval = interval
        self.entry = 0

    def write_fields(self, docnum, fields):

        """
        Write details of the document with the given 'docnum' and 'fields',
        also adding every document at a multiple of the interval (starting with
        the first) to the index file.
        """

        offset = self.field_writer.write_fields(docnum, fields)

        if self.entry % self.interval == 0:
            self.field_index_writer.write_document(docnum, offset)

        self.entry += 1

    def close(self):
        self.field_writer.close()
        self.field_index_writer.close()

class FieldDictionaryReader:

    "Reading field dictionary details."

    def __init__(self, field_reader, field_index_reader):
        self.field_reader = field_reader
        self.field_index_reader = field_index_reader

        # Load the entire index into memory.

        self.docs = []
        try:
            while 1:
                self.docs.append(self.field_index_reader.read_document())
        except EOFError:
            pass

        # Large numbers for ordering purposes.

        if self.docs:
            self.max_offset = self.docs[-1][1]
        else:
            self.max_offset = None

    # Iterator convenience methods.

    def __iter__(self):
        self.rewind()
        return self

    def next(self):
        try:
            return self.read_fields()
        except EOFError:
            raise StopIteration

    # Sequential access methods.

    def rewind(self):
        self.field_reader.rewind()

    def read_fields(self):

        "Return the next document number and fields."

        return self.field_reader.read_fields()

    # Random access methods.

    def get_fields(self, docnum):

        """
        Read the fields of the document with the given 'docnum', returning None
        if no such document is found.
        """

        i = bisect_right(self.docs, (docnum, self.max_offset)) - 1

        # Get the entry position providing the document or one preceding it.

        if i == -1:
            return None

        found_docnum, offset = self.docs[i]

        # Read from the fields file.

        found_docnum, fields = self.field_reader.read_document_fields(found_docnum, offset)

        # Scan for the document, if necessary.

        try:
            while docnum > found_docnum:
                found_docnum, fields = self.field_reader.read_fields()
        except EOFError:
            pass

        # If the document is found, return the fields.

        if docnum == found_docnum:
            return fields
        else:
            return None

    def close(self):
        self.field_reader.close()
        self.field_index_reader.close()

# Dictionary merging classes.

class Merger:

    "Merge files."

    def __init__(self, writer, readers):
        self.writer = writer
        self.readers = readers

    def close(self):
        for reader in self.readers:
            reader.close()
        self.writer.close()

class TermDictionaryMerger(Merger):

    "Merge term and position files."

    def merge(self):

        """
        Merge terms and positions from the readers, sending them to the writer.
        """

        last_term = None
        current_readers = []

        for term, frequency, doc_frequency, positions in itermerge(self.readers):

            # Collect the position iterators of consecutive records for the
            # same term.

            if term == last_term:
                current_readers.append(positions)

            # With a new term, write the merged positions of the previous one.

            else:
                if current_readers:
                    self.writer.write_term_positions(last_term, itermerge(current_readers))
                last_term = term
                current_readers = [positions]

        # Write the positions of the final term.

        if current_readers:
            self.writer.write_term_positions(last_term, itermerge(current_readers))

class FieldDictionaryMerger(Merger):

    "Merge field files."

    def merge(self):

        """
        Merge fields from the readers, sending them to the writer.
        """

        for docnum, fields in itermerge(self.readers):
            self.writer.write_fields(docnum, fields)

# Utility functions.

def _partition_path(pathname, template, partition):

    """
    Return the path of the file under 'pathname' whose name is produced by
    substituting 'partition' into the given filename 'template'.
    """

    return join(pathname, template % partition)

def get_term_writer(pathname, partition, interval, doc_interval):

    """
    Return a term dictionary writer using files under the given 'pathname'
    labelled according to the given 'partition', using the given indexing
    'interval' for terms and 'doc_interval' for document position records.
    """

    # Each file is opened for binary writing and wrapped by its writer.

    info_writer = TermWriter(
        open(_partition_path(pathname, "terms-%s", partition), "wb"))
    index_writer = TermIndexWriter(
        open(_partition_path(pathname, "terms_index-%s", partition), "wb"))
    positions_writer = PositionWriter(
        open(_partition_path(pathname, "positions-%s", partition), "wb"))
    positions_index_writer = PositionIndexWriter(
        open(_partition_path(pathname, "positions_index-%s", partition), "wb"))

    # The position writers are combined before being given to the dictionary.

    positions_dict_writer = PositionDictionaryWriter(
        positions_writer, positions_index_writer, doc_interval)

    return TermDictionaryWriter(
        info_writer, index_writer, positions_dict_writer, interval)

def get_field_writer(pathname, partition, interval):

    """
    Return a field dictionary writer using files under the given 'pathname'
    labelled according to the given 'partition', using the given indexing
    'interval'.
    """

    field_writer = FieldWriter(
        open(_partition_path(pathname, "fields-%s", partition), "wb"))
    field_index_writer = FieldIndexWriter(
        open(_partition_path(pathname, "fields_index-%s", partition), "wb"))

    return FieldDictionaryWriter(field_writer, field_index_writer, interval)

def get_term_reader(pathname, partition):

    """
    Return a term dictionary reader using files under the given 'pathname'
    labelled according to the given 'partition'.
    """

    # The term files are opened here for binary reading.

    info_reader = TermReader(
        open(_partition_path(pathname, "terms-%s", partition), "rb"))
    index_reader = TermIndexReader(
        open(_partition_path(pathname, "terms_index-%s", partition), "rb"))

    # The position files are only named here: the openers receive filenames
    # and not open files.

    positions_opener = PositionOpener(
        _partition_path(pathname, "positions-%s", partition))
    positions_index_opener = PositionIndexOpener(
        _partition_path(pathname, "positions_index-%s", partition))

    positions_dict_reader = PositionDictionaryReader(
        positions_opener, positions_index_opener)

    return TermDictionaryReader(info_reader, index_reader, positions_dict_reader)

def get_field_reader(pathname, partition):

    """
    Return a field dictionary reader using files under the given 'pathname'
    labelled according to the given 'partition'.
    """

    field_reader = FieldReader(
        open(_partition_path(pathname, "fields-%s", partition), "rb"))
    field_index_reader = FieldIndexReader(
        open(_partition_path(pathname, "fields_index-%s", partition), "rb"))

    return FieldDictionaryReader(field_reader, field_index_reader)

def rename_files(pathname, names, from_partition, to_partition):

    """
    Rename, under the given 'pathname', the file for each of the given 'names'
    from its 'from_partition' label to its 'to_partition' label.
    """

    for name in names:
        source = join(pathname, "%s-%s" % (name, from_partition))
        target = join(pathname, "%s-%s" % (name, to_partition))
        rename(source, target)

def rename_term_files(pathname, from_partition, to_partition):

    "Rename the term files under 'pathname' between the given partitions."

    rename_files(pathname, TERM_FILENAMES, from_partition, to_partition)

def rename_field_files(pathname, from_partition, to_partition):

    "Rename the field files under 'pathname' between the given partitions."

    rename_files(pathname, FIELD_FILENAMES, from_partition, to_partition)

def remove_files(pathname, names, partition):

    """
    Remove, under the given 'pathname', the file for each of the given 'names'
    labelled with the given 'partition'.
    """

    for name in names:
        target = join(pathname, "%s-%s" % (name, partition))
        remove(target)

def remove_term_files(pathname, partition):

    "Remove the term files under 'pathname' for the given 'partition'."

    remove_files(pathname, TERM_FILENAMES, partition)

def remove_field_files(pathname, partition):

    "Remove the field files under 'pathname' for the given 'partition'."

    remove_files(pathname, FIELD_FILENAMES, partition)

# High-level classes.

class Document:

    "A container of document information."

    def __init__(self, docnum):

        "Initialise an empty document having the given 'docnum'."

        self.docnum = docnum

        # A list of (identifier, value) pairs.

        self.fields = []

        # A mapping from terms to lists of positions.

        self.terms = {}

    def add_position(self, term, position):

        """
        Add a position entry for the given 'term', indicating the given
        'position'.
        """

        self.terms.setdefault(term, []).append(position)

    def add_field(self, identifier, value):

        "Add a field having the given 'identifier' and 'value'."

        self.fields.append((identifier, unicode(value))) # convert to string

    def set_fields(self, fields):

        """
        Set the document's 'fields': a list of tuples each containing an integer
        identifier and a string value.
        """

        self.fields = fields

class IndexWriter:

    """
    Building term information and writing it to the term and field dictionaries.

    Added documents are held in memory and written out as a new pair of term
    and field partitions whenever 'flush_interval' documents have accumulated,
    as well as when the writer is closed.
    """

    def __init__(self, pathname, interval, doc_interval, flush_interval):

        """
        Initialise the writer for the index directory 'pathname', using the
        given indexing 'interval' and 'doc_interval', flushing after every
        'flush_interval' documents. A false 'flush_interval' disables flushing
        when adding documents.
        """

        self.pathname = pathname
        self.interval = interval
        self.doc_interval = doc_interval
        self.flush_interval = flush_interval

        # The labels of the next term and field partitions to be written.

        self.dict_partition = 0
        self.field_dict_partition = 0

        # Pending data: terms map to {docnum : positions} dictionaries, and
        # document numbers map to field lists.

        self.terms = {}
        self.docs = {}

        # The number of documents added since the last automatic flush.

        self.doc_counter = 0

    def add_document(self, doc):

        """
        Add the given document 'doc', updating the document counter and flushing
        terms and fields if appropriate.
        """

        for term, positions in doc.terms.items():
            self.terms.setdefault(term, {})[doc.docnum] = positions

        self.docs[doc.docnum] = doc.fields

        self.doc_counter += 1
        if self.flush_interval and self.doc_counter >= self.flush_interval:
            self.flush_terms()
            self.flush_fields()
            self.doc_counter = 0

    def get_term_writer(self):

        "Return a term dictionary writer for the current partition."

        return get_term_writer(self.pathname, self.dict_partition, self.interval, self.doc_interval)

    def get_field_writer(self):

        "Return a field dictionary writer for the current partition."

        return get_field_writer(self.pathname, self.field_dict_partition, self.interval)

    def flush_terms(self):

        "Flush terms into the current term dictionary partition."

        # Get the terms in order.

        all_terms = self.terms
        terms = list(all_terms.keys())
        terms.sort()

        dict_writer = self.get_term_writer()

        # Close the writer even if writing fails, so that the partition files
        # are not left open.

        try:
            for term in terms:

                # Supply the documents in ascending document number order, not
                # in dictionary order, just as the fields are sorted when
                # flushed.
                # NOTE(review): the partitions are later combined using
                # itermerge, which presumably relies on this ordering - confirm
                # against the position writer.

                doc_positions = list(all_terms[term].items())
                doc_positions.sort()
                dict_writer.write_term_positions(term, doc_positions)
        finally:
            dict_writer.close()

        self.terms = {}
        self.dict_partition += 1

    def flush_fields(self):

        "Flush fields into the current field dictionary partition."

        # Get the documents in order.

        docs = list(self.docs.items())
        docs.sort()

        field_dict_writer = self.get_field_writer()

        # Close the writer even if writing fails, so that the partition files
        # are not left open.

        try:
            for docnum, fields in docs:
                field_dict_writer.write_fields(docnum, fields)
        finally:
            field_dict_writer.close()

        self.docs = {}
        self.field_dict_partition += 1

    def close(self):

        "Flush any pending terms and fields."

        if self.terms:
            self.flush_terms()
        if self.docs:
            self.flush_fields()

class IndexReader:

    "Accessing the term and field dictionaries."

    def __init__(self, pathname):

        """
        Open the term and field dictionaries of the "merged" partition found
        under the given 'pathname'.
        """

        self.dict_reader = get_term_reader(pathname, "merged")
        self.field_dict_reader = get_field_reader(pathname, "merged")

    def find_terms(self, term):

        "Return the result of the term dictionary's 'find_terms' for 'term'."

        return self.dict_reader.find_terms(term)

    def find_positions(self, term):

        "Return the result of the term dictionary's 'find_positions' for 'term'."

        return self.dict_reader.find_positions(term)

    def get_frequency(self, term):

        "Return the frequency of the given 'term'."

        return self.dict_reader.get_frequency(term)

    def get_document_frequency(self, term):

        "Return the document frequency of the given 'term'."

        return self.dict_reader.get_document_frequency(term)

    def get_fields(self, docnum):

        "Return the fields of the document with the given 'docnum'."

        return self.field_dict_reader.get_fields(docnum)

    def close(self):

        "Close the term and field dictionary readers."

        self.dict_reader.close()
        self.field_dict_reader.close()

class Index:

    "An inverted index solution encapsulating the various components."

    def __init__(self, pathname):

        "Initialise the index residing in the directory with the given 'pathname'."

        self.pathname = pathname
        self.reader = None
        self.writer = None

    def get_writer(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL, flush_interval=FLUSH_INTERVAL):

        """
        Return a writer, optionally using the given indexing 'interval',
        'doc_interval' and 'flush_interval'. The index directory is created if
        it does not already exist.
        """

        if not exists(self.pathname):
            mkdir(self.pathname)

        self.writer = IndexWriter(self.pathname, interval, doc_interval, flush_interval)
        return self.writer

    def get_reader(self, partition=0):

        """
        Return a reader for the index, merging any partitions first. The
        'partition' argument is accepted but not used: the reader always uses
        the "merged" partition.

        An OSError is raised if the index directory does not exist.
        """

        # Test for the directory before merging, since merging lists the
        # directory and would otherwise fail with a less helpful error.

        self._check_pathname()

        # Ensure that only one partition exists.

        self.merge()
        return self._get_reader(partition)

    def _get_reader(self, partition):

        """
        Return a reader for the index without merging partitions. The
        'partition' argument is accepted but not used.
        """

        self._check_pathname()

        self.reader = IndexReader(self.pathname)
        return self.reader

    def _check_pathname(self):

        "Raise an OSError if the index directory does not exist."

        if not exists(self.pathname):
            raise OSError("Index path %r does not exist." % self.pathname)

    def _find_partitions(self, prefix):

        """
        Return a list of the partition labels of the files in the index
        directory whose names start with the given 'prefix'.
        """

        start = len(prefix)
        return [filename[start:] for filename in listdir(self.pathname)
            if filename.startswith(prefix)]

    def merge(self):

        "Merge/optimise index partitions."

        self.merge_terms()
        self.merge_fields()

    def merge_terms(self, interval=TERM_INTERVAL, doc_interval=DOCUMENT_INTERVAL):

        """
        Merge term dictionaries using the given indexing 'interval' and
        'doc_interval'. Where only one partition exists, it is merely renamed
        to become the "merged" partition.
        """

        partitions = self._find_partitions("terms-")

        if len(partitions) > 1:

            # Move any existing merged partition out of the way before opening
            # any readers, so that every reader refers to the files under their
            # final names and no open file is renamed.

            if "merged" in partitions:
                rename_term_files(self.pathname, "merged", "old-merged")
                partitions.remove("merged")
                if "old-merged" not in partitions:
                    partitions.append("old-merged")

            readers = [get_term_reader(self.pathname, partition) for partition in partitions]

            # Write directly to a dictionary.

            writer = get_term_writer(self.pathname, "merged", interval, doc_interval)
            merger = TermDictionaryMerger(writer, readers)

            try:
                merger.merge()
            finally:
                merger.close()

            # Remove old files.

            for partition in partitions:
                remove_term_files(self.pathname, partition)

        elif len(partitions) == 1:

            # No readers are opened here: the files are only renamed.

            partition = partitions[0]
            if partition != "merged":
                rename_term_files(self.pathname, partition, "merged")

    def merge_fields(self, interval=FIELD_INTERVAL):

        """
        Merge field dictionaries using the given indexing 'interval'. Where
        only one partition exists, it is merely renamed to become the "merged"
        partition.
        """

        partitions = self._find_partitions("fields-")

        if len(partitions) > 1:

            # Move any existing merged partition out of the way before opening
            # any readers, so that no open file is renamed.

            if "merged" in partitions:
                rename_field_files(self.pathname, "merged", "old-merged")
                partitions.remove("merged")
                if "old-merged" not in partitions:
                    partitions.append("old-merged")

            readers = [get_field_reader(self.pathname, partition) for partition in partitions]

            # Write directly to a dictionary.

            writer = get_field_writer(self.pathname, "merged", interval)
            merger = FieldDictionaryMerger(writer, readers)

            try:
                merger.merge()
            finally:
                merger.close()

            # Remove old files.

            for partition in partitions:
                remove_field_files(self.pathname, partition)

        elif len(partitions) == 1:

            # No readers are opened here: the files are only renamed.

            partition = partitions[0]
            if partition != "merged":
                rename_field_files(self.pathname, partition, "merged")

    def close(self):

        "Close any reader and writer obtained from this index."

        if self.reader is not None:
            self.reader.close()
            self.reader = None
        if self.writer is not None:
            self.writer.close()
            self.writer = None

# vim: tabstop=4 expandtab shiftwidth=4