1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.DataInput;
22 import java.io.FileNotFoundException;
23 import java.io.IOException;
24 import java.nio.ByteBuffer;
25 import java.util.Arrays;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.Comparator;
29 import java.util.Map;
30 import java.util.SortedSet;
31 import java.util.UUID;
32 import java.util.concurrent.atomic.AtomicBoolean;
33
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.classification.InterfaceAudience;
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.fs.FSDataInputStream;
39 import org.apache.hadoop.fs.FileSystem;
40 import org.apache.hadoop.fs.Path;
41 import org.apache.hadoop.hbase.HConstants;
42 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
43 import org.apache.hadoop.hbase.KeyValue;
44 import org.apache.hadoop.hbase.KeyValue.KVComparator;
45 import org.apache.hadoop.hbase.KeyValue.MetaKeyComparator;
46 import org.apache.hadoop.hbase.client.Scan;
47 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
48 import org.apache.hadoop.hbase.io.compress.Compression;
49 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
50 import org.apache.hadoop.hbase.io.hfile.BlockType;
51 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
52 import org.apache.hadoop.hbase.io.hfile.HFile;
53 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
54 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
55 import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
56 import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
57 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
58 import org.apache.hadoop.hbase.util.BloomFilter;
59 import org.apache.hadoop.hbase.util.BloomFilterFactory;
60 import org.apache.hadoop.hbase.util.BloomFilterWriter;
61 import org.apache.hadoop.hbase.util.Bytes;
62 import org.apache.hadoop.hbase.util.ChecksumType;
63 import org.apache.hadoop.hbase.util.Writables;
64 import org.apache.hadoop.io.RawComparator;
65 import org.apache.hadoop.io.WritableUtils;
66
67 import com.google.common.base.Function;
68 import com.google.common.base.Preconditions;
69 import com.google.common.collect.ImmutableList;
70 import com.google.common.collect.Ordering;
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85 @InterfaceAudience.LimitedPrivate("Coprocessor")
86 public class StoreFile {
87 static final Log LOG = LogFactory.getLog(StoreFile.class.getName());
88
89
90
91
92 public static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");
93
94
95 public static final byte[] MAJOR_COMPACTION_KEY =
96 Bytes.toBytes("MAJOR_COMPACTION_KEY");
97
98
99 public static final byte[] EXCLUDE_FROM_MINOR_COMPACTION_KEY =
100 Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION");
101
102
103 public static final byte[] BLOOM_FILTER_TYPE_KEY =
104 Bytes.toBytes("BLOOM_FILTER_TYPE");
105
106
107 public static final byte[] DELETE_FAMILY_COUNT =
108 Bytes.toBytes("DELETE_FAMILY_COUNT");
109
110
111 private static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY");
112
113
114 public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
115
116
117 public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");
118
119
120
121 public static final int DEFAULT_BLOCKSIZE_SMALL = 8 * 1024;
122
123 private final StoreFileInfo fileInfo;
124 private final FileSystem fs;
125
126
127 private final CacheConfig cacheConf;
128
129
130 private final HFileDataBlockEncoder dataBlockEncoder;
131
132
133
134 private long sequenceid = -1;
135
136
137
138 private long maxMemstoreTS = -1;
139
140 public long getMaxMemstoreTS() {
141 return maxMemstoreTS;
142 }
143
144 public void setMaxMemstoreTS(long maxMemstoreTS) {
145 this.maxMemstoreTS = maxMemstoreTS;
146 }
147
148
149
150 private AtomicBoolean majorCompaction = null;
151
152
153
154 private boolean excludeFromMinorCompaction = false;
155
156
157 public static final byte[] BULKLOAD_TASK_KEY =
158 Bytes.toBytes("BULKLOAD_SOURCE_TASK");
159 public static final byte[] BULKLOAD_TIME_KEY =
160 Bytes.toBytes("BULKLOAD_TIMESTAMP");
161
162
163
164
165 private Map<byte[], byte[]> metadataMap;
166
167
168 private volatile Reader reader;
169
170
171
172
173
174 private final BloomType cfBloomType;
175
176
177 private long modificationTimeStamp = 0L;
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
/**
 * Creates a store file for the given path, deriving its {@link StoreFileInfo}
 * from {@code conf}, {@code fs} and {@code p}.
 *
 * @param fs filesystem the file lives on
 * @param p path of the file
 * @param conf configuration
 * @param cacheConf block cache configuration
 * @param cfBloomType Bloom type configured on the column family
 * @param dataBlockEncoder data block encoder, or null for no encoding
 * @throws IOException if the file info cannot be built
 */
public StoreFile(final FileSystem fs, final Path p, final Configuration conf,
    final CacheConfig cacheConf, final BloomType cfBloomType,
    final HFileDataBlockEncoder dataBlockEncoder) throws IOException {
  this(fs, new StoreFileInfo(conf, fs, p), conf, cacheConf,
      cfBloomType, dataBlockEncoder);
}
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218 public StoreFile(final FileSystem fs, final StoreFileInfo fileInfo, final Configuration conf,
219 final CacheConfig cacheConf, final BloomType cfBloomType,
220 final HFileDataBlockEncoder dataBlockEncoder) throws IOException {
221 this.fs = fs;
222 this.fileInfo = fileInfo;
223 this.cacheConf = cacheConf;
224 this.dataBlockEncoder =
225 dataBlockEncoder == null ? NoOpDataBlockEncoder.INSTANCE
226 : dataBlockEncoder;
227
228 if (BloomFilterFactory.isGeneralBloomEnabled(conf)) {
229 this.cfBloomType = cfBloomType;
230 } else {
231 LOG.info("Ignoring bloom filter check for file " + this.getPath() + ": " +
232 "cfBloomType=" + cfBloomType + " (disabled in config)");
233 this.cfBloomType = BloomType.NONE;
234 }
235
236
237 this.modificationTimeStamp = fileInfo.getModificationTime();
238 }
239
240
241
242
243 public Path getPath() {
244 return this.fileInfo.getPath();
245 }
246
247
248
249
250
251 public boolean isReference() {
252 return this.fileInfo.isReference();
253 }
254
255
256
257
258 public boolean isMajorCompaction() {
259 if (this.majorCompaction == null) {
260 throw new NullPointerException("This has not been set yet");
261 }
262 return this.majorCompaction.get();
263 }
264
265
266
267
268 public boolean excludeFromMinorCompaction() {
269 return this.excludeFromMinorCompaction;
270 }
271
272
273
274
275 public long getMaxSequenceId() {
276 return this.sequenceid;
277 }
278
279 public long getModificationTimeStamp() {
280 return modificationTimeStamp;
281 }
282
283
284
285
286
287
288
289
290
291 public static long getMaxMemstoreTSInList(Collection<StoreFile> sfs) {
292 long max = 0;
293 for (StoreFile sf : sfs) {
294 if (!sf.isBulkLoadResult()) {
295 max = Math.max(max, sf.getMaxMemstoreTS());
296 }
297 }
298 return max;
299 }
300
301
302
303
304
305
306
307
308
309
310
311 public static long getMaxSequenceIdInList(Collection<StoreFile> sfs,
312 boolean includeBulkLoadedFiles) {
313 long max = 0;
314 for (StoreFile sf : sfs) {
315 if (includeBulkLoadedFiles || !sf.isBulkLoadResult()) {
316 max = Math.max(max, sf.getMaxSequenceId());
317 }
318 }
319 return max;
320 }
321
322
323
324
325
326 boolean isBulkLoadResult() {
327 return metadataMap.containsKey(BULKLOAD_TIME_KEY);
328 }
329
330
331
332
333 public long getBulkLoadTimestamp() {
334 return Bytes.toLong(metadataMap.get(BULKLOAD_TIME_KEY));
335 }
336
337
338
339
340
341 public HDFSBlocksDistribution getHDFSBlockDistribution() {
342 return this.fileInfo.getHDFSBlockDistribution();
343 }
344
345
346
347
348
349
350
351 private Reader open() throws IOException {
352 if (this.reader != null) {
353 throw new IllegalAccessError("Already open");
354 }
355
356
357 this.reader = fileInfo.open(this.fs, this.cacheConf, dataBlockEncoder.getEncodingInCache());
358
359
360 metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
361
362
363 byte [] b = metadataMap.get(MAX_SEQ_ID_KEY);
364 if (b != null) {
365
366
367
368
369
370 this.sequenceid = Bytes.toLong(b);
371 if (fileInfo.isTopReference()) {
372 this.sequenceid += 1;
373 }
374 }
375
376 if (isBulkLoadResult()){
377
378
379 String fileName = this.getPath().getName();
380 int startPos = fileName.indexOf("SeqId_");
381 if (startPos != -1) {
382 this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
383 fileName.indexOf('_', startPos + 6)));
384
385 if (fileInfo.isTopReference()) {
386 this.sequenceid += 1;
387 }
388 }
389 }
390 this.reader.setSequenceID(this.sequenceid);
391
392 b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
393 if (b != null) {
394 this.maxMemstoreTS = Bytes.toLong(b);
395 }
396
397 b = metadataMap.get(MAJOR_COMPACTION_KEY);
398 if (b != null) {
399 boolean mc = Bytes.toBoolean(b);
400 if (this.majorCompaction == null) {
401 this.majorCompaction = new AtomicBoolean(mc);
402 } else {
403 this.majorCompaction.set(mc);
404 }
405 } else {
406
407
408 this.majorCompaction = new AtomicBoolean(false);
409 }
410
411 b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY);
412 this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b));
413
414 BloomType hfileBloomType = reader.getBloomFilterType();
415 if (cfBloomType != BloomType.NONE) {
416 reader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
417 if (hfileBloomType != cfBloomType) {
418 LOG.info("HFile Bloom filter type for "
419 + reader.getHFileReader().getName() + ": " + hfileBloomType
420 + ", but " + cfBloomType + " specified in column family "
421 + "configuration");
422 }
423 } else if (hfileBloomType != BloomType.NONE) {
424 LOG.info("Bloom filter turned off by CF config for "
425 + reader.getHFileReader().getName());
426 }
427
428
429 reader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);
430
431 try {
432 byte [] timerangeBytes = metadataMap.get(TIMERANGE_KEY);
433 if (timerangeBytes != null) {
434 this.reader.timeRangeTracker = new TimeRangeTracker();
435 Writables.copyWritable(timerangeBytes, this.reader.timeRangeTracker);
436 }
437 } catch (IllegalArgumentException e) {
438 LOG.error("Error reading timestamp range data from meta -- " +
439 "proceeding without", e);
440 this.reader.timeRangeTracker = null;
441 }
442 return this.reader;
443 }
444
445
446
447
448
449 public Reader createReader() throws IOException {
450 if (this.reader == null) {
451 try {
452 this.reader = open();
453 } catch (IOException e) {
454 try {
455 this.closeReader(true);
456 } catch (IOException ee) {
457 }
458 throw e;
459 }
460
461 }
462 return this.reader;
463 }
464
465
466
467
468
469 public Reader getReader() {
470 return this.reader;
471 }
472
473
474
475
476
477 public synchronized void closeReader(boolean evictOnClose)
478 throws IOException {
479 if (this.reader != null) {
480 this.reader.close(evictOnClose);
481 this.reader = null;
482 }
483 }
484
485
486
487
488
489 public void deleteReader() throws IOException {
490 closeReader(true);
491 this.fs.delete(getPath(), true);
492 }
493
494 @Override
495 public String toString() {
496 return this.fileInfo.toString();
497 }
498
499
500
501
502 public String toStringDetailed() {
503 StringBuilder sb = new StringBuilder();
504 sb.append(this.getPath().toString());
505 sb.append(", isReference=").append(isReference());
506 sb.append(", isBulkLoadResult=").append(isBulkLoadResult());
507 if (isBulkLoadResult()) {
508 sb.append(", bulkLoadTS=").append(getBulkLoadTimestamp());
509 } else {
510 sb.append(", seqid=").append(getMaxSequenceId());
511 }
512 sb.append(", majorCompaction=").append(isMajorCompaction());
513
514 return sb.toString();
515 }
516
517 public static class WriterBuilder {
518 private final Configuration conf;
519 private final CacheConfig cacheConf;
520 private final FileSystem fs;
521 private final int blockSize;
522
523 private Compression.Algorithm compressAlgo =
524 HFile.DEFAULT_COMPRESSION_ALGORITHM;
525 private HFileDataBlockEncoder dataBlockEncoder =
526 NoOpDataBlockEncoder.INSTANCE;
527 private KeyValue.KVComparator comparator = KeyValue.COMPARATOR;
528 private BloomType bloomType = BloomType.NONE;
529 private long maxKeyCount = 0;
530 private Path dir;
531 private Path filePath;
532 private ChecksumType checksumType = HFile.DEFAULT_CHECKSUM_TYPE;
533 private int bytesPerChecksum = HFile.DEFAULT_BYTES_PER_CHECKSUM;
534 private boolean includeMVCCReadpoint = true;
535
536 public WriterBuilder(Configuration conf, CacheConfig cacheConf,
537 FileSystem fs, int blockSize) {
538 this.conf = conf;
539 this.cacheConf = cacheConf;
540 this.fs = fs;
541 this.blockSize = blockSize;
542 }
543
544
545
546
547
548
549
550
551 public WriterBuilder withOutputDir(Path dir) {
552 Preconditions.checkNotNull(dir);
553 this.dir = dir;
554 return this;
555 }
556
557
558
559
560
561
562 public WriterBuilder withFilePath(Path filePath) {
563 Preconditions.checkNotNull(filePath);
564 this.filePath = filePath;
565 return this;
566 }
567
568 public WriterBuilder withCompression(Compression.Algorithm compressAlgo) {
569 Preconditions.checkNotNull(compressAlgo);
570 this.compressAlgo = compressAlgo;
571 return this;
572 }
573
574 public WriterBuilder withDataBlockEncoder(HFileDataBlockEncoder encoder) {
575 Preconditions.checkNotNull(encoder);
576 this.dataBlockEncoder = encoder;
577 return this;
578 }
579
580 public WriterBuilder withComparator(KeyValue.KVComparator comparator) {
581 Preconditions.checkNotNull(comparator);
582 this.comparator = comparator;
583 return this;
584 }
585
586 public WriterBuilder withBloomType(BloomType bloomType) {
587 Preconditions.checkNotNull(bloomType);
588 this.bloomType = bloomType;
589 return this;
590 }
591
592
593
594
595
596 public WriterBuilder withMaxKeyCount(long maxKeyCount) {
597 this.maxKeyCount = maxKeyCount;
598 return this;
599 }
600
601
602
603
604
605 public WriterBuilder withChecksumType(ChecksumType checksumType) {
606 this.checksumType = checksumType;
607 return this;
608 }
609
610
611
612
613
614 public WriterBuilder withBytesPerChecksum(int bytesPerChecksum) {
615 this.bytesPerChecksum = bytesPerChecksum;
616 return this;
617 }
618
619
620
621
622
623 public WriterBuilder includeMVCCReadpoint(boolean includeMVCCReadpoint) {
624 this.includeMVCCReadpoint = includeMVCCReadpoint;
625 return this;
626 }
627
628
629
630
631
632
633 public Writer build() throws IOException {
634 if ((dir == null ? 0 : 1) + (filePath == null ? 0 : 1) != 1) {
635 throw new IllegalArgumentException("Either specify parent directory " +
636 "or file path");
637 }
638
639 if (dir == null) {
640 dir = filePath.getParent();
641 }
642
643 if (!fs.exists(dir)) {
644 fs.mkdirs(dir);
645 }
646
647 if (filePath == null) {
648 filePath = getUniqueFile(fs, dir);
649 if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
650 bloomType = BloomType.NONE;
651 }
652 }
653
654 if (compressAlgo == null) {
655 compressAlgo = HFile.DEFAULT_COMPRESSION_ALGORITHM;
656 }
657 if (comparator == null) {
658 comparator = KeyValue.COMPARATOR;
659 }
660 return new Writer(fs, filePath, blockSize, compressAlgo, dataBlockEncoder,
661 conf, cacheConf, comparator, bloomType, maxKeyCount, checksumType,
662 bytesPerChecksum, includeMVCCReadpoint);
663 }
664 }
665
666
667
668
669
670
671 public static Path getUniqueFile(final FileSystem fs, final Path dir)
672 throws IOException {
673 if (!fs.getFileStatus(dir).isDir()) {
674 throw new IOException("Expecting " + dir.toString() +
675 " to be a directory");
676 }
677 return new Path(dir, UUID.randomUUID().toString().replaceAll("-", ""));
678 }
679
680 public Long getMinimumTimestamp() {
681 return (getReader().timeRangeTracker == null) ?
682 null :
683 getReader().timeRangeTracker.minimumTimestamp;
684 }
685
686
687
688
689
690
691 byte[] getFileSplitPoint(KVComparator comparator) throws IOException {
692 if (this.reader == null) {
693 LOG.warn("Storefile " + this + " Reader is null; cannot get split point");
694 return null;
695 }
696
697
698
699 byte [] midkey = this.reader.midkey();
700 if (midkey != null) {
701 KeyValue mk = KeyValue.createKeyValueFromKey(midkey, 0, midkey.length);
702 byte [] fk = this.reader.getFirstKey();
703 KeyValue firstKey = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
704 byte [] lk = this.reader.getLastKey();
705 KeyValue lastKey = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
706
707 if (comparator.compareRows(mk, firstKey) == 0 || comparator.compareRows(mk, lastKey) == 0) {
708 if (LOG.isDebugEnabled()) {
709 LOG.debug("cannot split because midkey is the same as first or last row");
710 }
711 return null;
712 }
713 return mk.getRow();
714 }
715 return null;
716 }
717
718
719
720
721
722 public static class Writer implements Compactor.CellSink {
723 private final BloomFilterWriter generalBloomFilterWriter;
724 private final BloomFilterWriter deleteFamilyBloomFilterWriter;
725 private final BloomType bloomType;
726 private byte[] lastBloomKey;
727 private int lastBloomKeyOffset, lastBloomKeyLen;
728 private KVComparator kvComparator;
729 private KeyValue lastKv = null;
730 private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
731 private KeyValue lastDeleteFamilyKV = null;
732 private long deleteFamilyCnt = 0;
733
734 protected HFileDataBlockEncoder dataBlockEncoder;
735
736
737 protected ChecksumType checksumType;
738
739
740 protected int bytesPerChecksum;
741
742 TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
743
744
745
746
747
748
749 boolean isTimeRangeTrackerSet = false;
750
751 protected HFile.Writer writer;
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769 private Writer(FileSystem fs, Path path, int blocksize,
770 Compression.Algorithm compress,
771 HFileDataBlockEncoder dataBlockEncoder, final Configuration conf,
772 CacheConfig cacheConf,
773 final KVComparator comparator, BloomType bloomType, long maxKeys,
774 final ChecksumType checksumType, final int bytesPerChecksum,
775 final boolean includeMVCCReadpoint) throws IOException {
776 this.dataBlockEncoder = dataBlockEncoder != null ?
777 dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
778 writer = HFile.getWriterFactory(conf, cacheConf)
779 .withPath(fs, path)
780 .withBlockSize(blocksize)
781 .withCompression(compress)
782 .withDataBlockEncoder(this.dataBlockEncoder)
783 .withComparator(comparator.getRawComparator())
784 .withChecksumType(checksumType)
785 .withBytesPerChecksum(bytesPerChecksum)
786 .includeMVCCReadpoint(includeMVCCReadpoint)
787 .create();
788
789 this.kvComparator = comparator;
790
791 generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(
792 conf, cacheConf, bloomType,
793 (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
794
795 if (generalBloomFilterWriter != null) {
796 this.bloomType = bloomType;
797 LOG.info("Bloom filter type for " + path + ": " + this.bloomType + ", "
798 + generalBloomFilterWriter.getClass().getSimpleName());
799 } else {
800
801 this.bloomType = BloomType.NONE;
802 }
803
804
805
806 if (this.bloomType != BloomType.ROWCOL) {
807 this.deleteFamilyBloomFilterWriter = BloomFilterFactory
808 .createDeleteBloomAtWrite(conf, cacheConf,
809 (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
810 } else {
811 deleteFamilyBloomFilterWriter = null;
812 }
813 if (deleteFamilyBloomFilterWriter != null) {
814 LOG.info("Delete Family Bloom filter type for " + path + ": "
815 + deleteFamilyBloomFilterWriter.getClass().getSimpleName());
816 }
817 this.checksumType = checksumType;
818 this.bytesPerChecksum = bytesPerChecksum;
819 }
820
821
822
823
824
825
826
827
828 public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
829 throws IOException {
830 writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
831 writer.appendFileInfo(MAJOR_COMPACTION_KEY,
832 Bytes.toBytes(majorCompaction));
833 appendTrackedTimestampsToMetadata();
834 }
835
836
837
838
839 public void appendTrackedTimestampsToMetadata() throws IOException {
840 appendFileInfo(TIMERANGE_KEY,WritableUtils.toByteArray(timeRangeTracker));
841 appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
842 }
843
844
845
846
847
848 public void setTimeRangeTracker(final TimeRangeTracker trt) {
849 this.timeRangeTracker = trt;
850 isTimeRangeTrackerSet = true;
851 }
852
853
854
855
856
857
858
859
860 public void trackTimestamps(final KeyValue kv) {
861 if (KeyValue.Type.Put.getCode() == kv.getType()) {
862 earliestPutTs = Math.min(earliestPutTs, kv.getTimestamp());
863 }
864 if (!isTimeRangeTrackerSet) {
865 timeRangeTracker.includeTimestamp(kv);
866 }
867 }
868
869 private void appendGeneralBloomfilter(final KeyValue kv) throws IOException {
870 if (this.generalBloomFilterWriter != null) {
871
872 boolean newKey = true;
873 if (this.lastKv != null) {
874 switch(bloomType) {
875 case ROW:
876 newKey = ! kvComparator.matchingRows(kv, lastKv);
877 break;
878 case ROWCOL:
879 newKey = ! kvComparator.matchingRowColumn(kv, lastKv);
880 break;
881 case NONE:
882 newKey = false;
883 break;
884 default:
885 throw new IOException("Invalid Bloom filter type: " + bloomType +
886 " (ROW or ROWCOL expected)");
887 }
888 }
889 if (newKey) {
890
891
892
893
894
895
896
897
898 byte[] bloomKey;
899 int bloomKeyOffset, bloomKeyLen;
900
901 switch (bloomType) {
902 case ROW:
903 bloomKey = kv.getBuffer();
904 bloomKeyOffset = kv.getRowOffset();
905 bloomKeyLen = kv.getRowLength();
906 break;
907 case ROWCOL:
908
909
910
911 bloomKey = generalBloomFilterWriter.createBloomKey(kv.getBuffer(),
912 kv.getRowOffset(), kv.getRowLength(), kv.getBuffer(),
913 kv.getQualifierOffset(), kv.getQualifierLength());
914 bloomKeyOffset = 0;
915 bloomKeyLen = bloomKey.length;
916 break;
917 default:
918 throw new IOException("Invalid Bloom filter type: " + bloomType +
919 " (ROW or ROWCOL expected)");
920 }
921 generalBloomFilterWriter.add(bloomKey, bloomKeyOffset, bloomKeyLen);
922 if (lastBloomKey != null
923 && generalBloomFilterWriter.getComparator().compare(bloomKey,
924 bloomKeyOffset, bloomKeyLen, lastBloomKey,
925 lastBloomKeyOffset, lastBloomKeyLen) <= 0) {
926 throw new IOException("Non-increasing Bloom keys: "
927 + Bytes.toStringBinary(bloomKey, bloomKeyOffset, bloomKeyLen)
928 + " after "
929 + Bytes.toStringBinary(lastBloomKey, lastBloomKeyOffset,
930 lastBloomKeyLen));
931 }
932 lastBloomKey = bloomKey;
933 lastBloomKeyOffset = bloomKeyOffset;
934 lastBloomKeyLen = bloomKeyLen;
935 this.lastKv = kv;
936 }
937 }
938 }
939
940 private void appendDeleteFamilyBloomFilter(final KeyValue kv)
941 throws IOException {
942 if (!kv.isDeleteFamily()) {
943 return;
944 }
945
946
947 deleteFamilyCnt++;
948 if (null != this.deleteFamilyBloomFilterWriter) {
949 boolean newKey = true;
950 if (lastDeleteFamilyKV != null) {
951 newKey = !kvComparator.matchingRows(kv, lastDeleteFamilyKV);
952 }
953 if (newKey) {
954 this.deleteFamilyBloomFilterWriter.add(kv.getBuffer(),
955 kv.getRowOffset(), kv.getRowLength());
956 this.lastDeleteFamilyKV = kv;
957 }
958 }
959 }
960
961 public void append(final KeyValue kv) throws IOException {
962 appendGeneralBloomfilter(kv);
963 appendDeleteFamilyBloomFilter(kv);
964 writer.append(kv);
965 trackTimestamps(kv);
966 }
967
968 public Path getPath() {
969 return this.writer.getPath();
970 }
971
972 boolean hasGeneralBloom() {
973 return this.generalBloomFilterWriter != null;
974 }
975
976
977
978
979
980
981 BloomFilterWriter getGeneralBloomWriter() {
982 return generalBloomFilterWriter;
983 }
984
985 private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException {
986 boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0);
987 if (haveBloom) {
988 bfw.compactBloom();
989 }
990 return haveBloom;
991 }
992
993 private boolean closeGeneralBloomFilter() throws IOException {
994 boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter);
995
996
997 if (hasGeneralBloom) {
998 writer.addGeneralBloomFilter(generalBloomFilterWriter);
999 writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY,
1000 Bytes.toBytes(bloomType.toString()));
1001 if (lastBloomKey != null) {
1002 writer.appendFileInfo(LAST_BLOOM_KEY, Arrays.copyOfRange(
1003 lastBloomKey, lastBloomKeyOffset, lastBloomKeyOffset
1004 + lastBloomKeyLen));
1005 }
1006 }
1007 return hasGeneralBloom;
1008 }
1009
1010 private boolean closeDeleteFamilyBloomFilter() throws IOException {
1011 boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter);
1012
1013
1014 if (hasDeleteFamilyBloom) {
1015 writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter);
1016 }
1017
1018
1019
1020 writer.appendFileInfo(DELETE_FAMILY_COUNT,
1021 Bytes.toBytes(this.deleteFamilyCnt));
1022
1023 return hasDeleteFamilyBloom;
1024 }
1025
1026 public void close() throws IOException {
1027 boolean hasGeneralBloom = this.closeGeneralBloomFilter();
1028 boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();
1029
1030 writer.close();
1031
1032
1033
1034 StoreFile.LOG.info((hasGeneralBloom ? "" : "NO ") + "General Bloom and "
1035 + (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily"
1036 + " was added to HFile (" + getPath() + ") ");
1037
1038 }
1039
1040 public void appendFileInfo(byte[] key, byte[] value) throws IOException {
1041 writer.appendFileInfo(key, value);
1042 }
1043
1044
1045
1046 HFile.Writer getHFileWriter() {
1047 return writer;
1048 }
1049 }
1050
1051
1052
1053
1054 public static class Reader {
1055 static final Log LOG = LogFactory.getLog(Reader.class.getName());
1056
1057 protected BloomFilter generalBloomFilter = null;
1058 protected BloomFilter deleteFamilyBloomFilter = null;
1059 protected BloomType bloomFilterType;
1060 private final HFile.Reader reader;
1061 protected TimeRangeTracker timeRangeTracker = null;
1062 protected long sequenceID = -1;
1063 private byte[] lastBloomKey;
1064 private long deleteFamilyCnt = -1;
1065
/**
 * Opens an HFile reader for {@code path}.
 *
 * @param fs filesystem holding the file
 * @param path file to read
 * @param cacheConf block cache configuration
 * @param preferredEncodingInCache data block encoding preferred for cached blocks
 * @throws IOException if the file cannot be opened
 */
public Reader(FileSystem fs, Path path, CacheConfig cacheConf,
    DataBlockEncoding preferredEncodingInCache) throws IOException {
  this.reader = HFile.createReaderWithEncoding(fs, path, cacheConf,
      preferredEncodingInCache);
  this.bloomFilterType = BloomType.NONE;
}
1072
/**
 * Opens an HFile reader over an already-open input stream.
 *
 * @param fs filesystem holding the file
 * @param path file path
 * @param in stream wrapper to read from
 * @param size file size in bytes
 * @param cacheConf block cache configuration
 * @param preferredEncodingInCache data block encoding preferred for cached blocks
 * @throws IOException if the file cannot be opened
 */
public Reader(FileSystem fs, Path path, FSDataInputStreamWrapper in, long size,
    CacheConfig cacheConf, DataBlockEncoding preferredEncodingInCache) throws IOException {
  this.reader = HFile.createReaderWithEncoding(fs, path, in, size,
      cacheConf, preferredEncodingInCache);
  this.bloomFilterType = BloomType.NONE;
}
1079
1080
1081
1082
/**
 * Creates a reader with no underlying HFile. Note that {@code bloomFilterType}
 * stays null here, unlike in the public constructors.
 */
Reader() {
  this.reader = null;
}
1086
1087 public RawComparator<byte []> getComparator() {
1088 return reader.getComparator();
1089 }
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099 public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
1100 boolean pread) {
1101 return getStoreFileScanner(cacheBlocks, pread, false);
1102 }
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112 public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
1113 boolean pread,
1114 boolean isCompaction) {
1115 return new StoreFileScanner(this,
1116 getScanner(cacheBlocks, pread,
1117 isCompaction), !isCompaction);
1118 }
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129 @Deprecated
1130 public HFileScanner getScanner(boolean cacheBlocks, boolean pread) {
1131 return getScanner(cacheBlocks, pread, false);
1132 }
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147 @Deprecated
1148 public HFileScanner getScanner(boolean cacheBlocks, boolean pread,
1149 boolean isCompaction) {
1150 return reader.getScanner(cacheBlocks, pread, isCompaction);
1151 }
1152
1153 public void close(boolean evictOnClose) throws IOException {
1154 reader.close(evictOnClose);
1155 }
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165 boolean passesTimerangeFilter(Scan scan, long oldestUnexpiredTS) {
1166 if (timeRangeTracker == null) {
1167 return true;
1168 } else {
1169 return timeRangeTracker.includesTimeRange(scan.getTimeRange()) &&
1170 timeRangeTracker.getMaximumTimestamp() >= oldestUnexpiredTS;
1171 }
1172 }
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190 boolean passesBloomFilter(Scan scan,
1191 final SortedSet<byte[]> columns) {
1192
1193
1194 if (!scan.isGetScan()) {
1195 return true;
1196 }
1197
1198 byte[] row = scan.getStartRow();
1199 switch (this.bloomFilterType) {
1200 case ROW:
1201 return passesGeneralBloomFilter(row, 0, row.length, null, 0, 0);
1202
1203 case ROWCOL:
1204 if (columns != null && columns.size() == 1) {
1205 byte[] column = columns.first();
1206 return passesGeneralBloomFilter(row, 0, row.length, column, 0,
1207 column.length);
1208 }
1209
1210
1211
1212 return true;
1213
1214 default:
1215 return true;
1216 }
1217 }
1218
1219 public boolean passesDeleteFamilyBloomFilter(byte[] row, int rowOffset,
1220 int rowLen) {
1221
1222
1223 BloomFilter bloomFilter = this.deleteFamilyBloomFilter;
1224
1225
1226 if (reader.getTrailer().getEntryCount() == 0 || deleteFamilyCnt == 0) {
1227 return false;
1228 }
1229
1230 if (bloomFilter == null) {
1231 return true;
1232 }
1233
1234 try {
1235 if (!bloomFilter.supportsAutoLoading()) {
1236 return true;
1237 }
1238 return bloomFilter.contains(row, rowOffset, rowLen, null);
1239 } catch (IllegalArgumentException e) {
1240 LOG.error("Bad Delete Family bloom filter data -- proceeding without",
1241 e);
1242 setDeleteFamilyBloomFilterFaulty();
1243 }
1244
1245 return true;
1246 }
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260 public boolean passesGeneralBloomFilter(byte[] row, int rowOffset,
1261 int rowLen, byte[] col, int colOffset, int colLen) {
1262
1263
1264 BloomFilter bloomFilter = this.generalBloomFilter;
1265 if (bloomFilter == null) {
1266 return true;
1267 }
1268
1269 byte[] key;
1270 switch (bloomFilterType) {
1271 case ROW:
1272 if (col != null) {
1273 throw new RuntimeException("Row-only Bloom filter called with " +
1274 "column specified");
1275 }
1276 if (rowOffset != 0 || rowLen != row.length) {
1277 throw new AssertionError("For row-only Bloom filters the row "
1278 + "must occupy the whole array");
1279 }
1280 key = row;
1281 break;
1282
1283 case ROWCOL:
1284 key = bloomFilter.createBloomKey(row, rowOffset, rowLen, col,
1285 colOffset, colLen);
1286 break;
1287
1288 default:
1289 return true;
1290 }
1291
1292
1293 if (reader.getTrailer().getEntryCount() == 0)
1294 return false;
1295
1296 try {
1297 boolean shouldCheckBloom;
1298 ByteBuffer bloom;
1299 if (bloomFilter.supportsAutoLoading()) {
1300 bloom = null;
1301 shouldCheckBloom = true;
1302 } else {
1303 bloom = reader.getMetaBlock(HFile.BLOOM_FILTER_DATA_KEY,
1304 true);
1305 shouldCheckBloom = bloom != null;
1306 }
1307
1308 if (shouldCheckBloom) {
1309 boolean exists;
1310
1311
1312
1313
1314 boolean keyIsAfterLast = lastBloomKey != null
1315 && bloomFilter.getComparator().compare(key, lastBloomKey) > 0;
1316
1317 if (bloomFilterType == BloomType.ROWCOL) {
1318
1319
1320
1321
1322 byte[] rowBloomKey = bloomFilter.createBloomKey(row, 0, row.length,
1323 null, 0, 0);
1324
1325 if (keyIsAfterLast
1326 && bloomFilter.getComparator().compare(rowBloomKey,
1327 lastBloomKey) > 0) {
1328 exists = false;
1329 } else {
1330 exists =
1331 bloomFilter.contains(key, 0, key.length, bloom) ||
1332 bloomFilter.contains(rowBloomKey, 0, rowBloomKey.length,
1333 bloom);
1334 }
1335 } else {
1336 exists = !keyIsAfterLast
1337 && bloomFilter.contains(key, 0, key.length, bloom);
1338 }
1339
1340 return exists;
1341 }
1342 } catch (IOException e) {
1343 LOG.error("Error reading bloom filter data -- proceeding without",
1344 e);
1345 setGeneralBloomFilterFaulty();
1346 } catch (IllegalArgumentException e) {
1347 LOG.error("Bad bloom filter data -- proceeding without", e);
1348 setGeneralBloomFilterFaulty();
1349 }
1350
1351 return true;
1352 }
1353
1354
1355
1356
1357
1358
1359 public boolean passesKeyRangeFilter(Scan scan) {
1360 if (this.getFirstKey() == null || this.getLastKey() == null) {
1361
1362 return false;
1363 }
1364 if (Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW)
1365 && Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
1366 return true;
1367 }
1368 KeyValue startKeyValue = KeyValue.createFirstOnRow(scan.getStartRow());
1369 KeyValue stopKeyValue = KeyValue.createLastOnRow(scan.getStopRow());
1370 boolean nonOverLapping = (getComparator().compare(this.getFirstKey(),
1371 stopKeyValue.getKey()) > 0 && !Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))
1372 || getComparator().compare(this.getLastKey(), startKeyValue.getKey()) < 0;
1373 return !nonOverLapping;
1374 }
1375
1376 public Map<byte[], byte[]> loadFileInfo() throws IOException {
1377 Map<byte [], byte []> fi = reader.loadFileInfo();
1378
1379 byte[] b = fi.get(BLOOM_FILTER_TYPE_KEY);
1380 if (b != null) {
1381 bloomFilterType = BloomType.valueOf(Bytes.toString(b));
1382 }
1383
1384 lastBloomKey = fi.get(LAST_BLOOM_KEY);
1385 byte[] cnt = fi.get(DELETE_FAMILY_COUNT);
1386 if (cnt != null) {
1387 deleteFamilyCnt = Bytes.toLong(cnt);
1388 }
1389
1390 return fi;
1391 }
1392
1393 public void loadBloomfilter() {
1394 this.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
1395 this.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);
1396 }
1397
1398 private void loadBloomfilter(BlockType blockType) {
1399 try {
1400 if (blockType == BlockType.GENERAL_BLOOM_META) {
1401 if (this.generalBloomFilter != null)
1402 return;
1403
1404 DataInput bloomMeta = reader.getGeneralBloomFilterMetadata();
1405 if (bloomMeta != null) {
1406
1407 if (bloomFilterType == BloomType.NONE) {
1408 throw new IOException(
1409 "valid bloom filter type not found in FileInfo");
1410 } else {
1411 generalBloomFilter = BloomFilterFactory.createFromMeta(bloomMeta,
1412 reader);
1413 LOG.info("Loaded " + bloomFilterType.toString() + " ("
1414 + generalBloomFilter.getClass().getSimpleName()
1415 + ") metadata for " + reader.getName());
1416 }
1417 }
1418 } else if (blockType == BlockType.DELETE_FAMILY_BLOOM_META) {
1419 if (this.deleteFamilyBloomFilter != null)
1420 return;
1421
1422 DataInput bloomMeta = reader.getDeleteBloomFilterMetadata();
1423 if (bloomMeta != null) {
1424 deleteFamilyBloomFilter = BloomFilterFactory.createFromMeta(
1425 bloomMeta, reader);
1426 LOG.info("Loaded Delete Family Bloom ("
1427 + deleteFamilyBloomFilter.getClass().getSimpleName()
1428 + ") metadata for " + reader.getName());
1429 }
1430 } else {
1431 throw new RuntimeException("Block Type: " + blockType.toString()
1432 + "is not supported for Bloom filter");
1433 }
1434 } catch (IOException e) {
1435 LOG.error("Error reading bloom filter meta for " + blockType
1436 + " -- proceeding without", e);
1437 setBloomFilterFaulty(blockType);
1438 } catch (IllegalArgumentException e) {
1439 LOG.error("Bad bloom filter meta " + blockType
1440 + " -- proceeding without", e);
1441 setBloomFilterFaulty(blockType);
1442 }
1443 }
1444
1445 private void setBloomFilterFaulty(BlockType blockType) {
1446 if (blockType == BlockType.GENERAL_BLOOM_META) {
1447 setGeneralBloomFilterFaulty();
1448 } else if (blockType == BlockType.DELETE_FAMILY_BLOOM_META) {
1449 setDeleteFamilyBloomFilterFaulty();
1450 }
1451 }
1452
1453
1454
1455
1456
1457
1458
1459
1460 public long getFilterEntries() {
1461 return generalBloomFilter != null ? generalBloomFilter.getKeyCount()
1462 : reader.getEntries();
1463 }
1464
1465 public void setGeneralBloomFilterFaulty() {
1466 generalBloomFilter = null;
1467 }
1468
1469 public void setDeleteFamilyBloomFilterFaulty() {
1470 this.deleteFamilyBloomFilter = null;
1471 }
1472
1473 public byte[] getLastKey() {
1474 return reader.getLastKey();
1475 }
1476
1477 public byte[] midkey() throws IOException {
1478 return reader.midkey();
1479 }
1480
1481 public long length() {
1482 return reader.length();
1483 }
1484
1485 public long getTotalUncompressedBytes() {
1486 return reader.getTrailer().getTotalUncompressedBytes();
1487 }
1488
1489 public long getEntries() {
1490 return reader.getEntries();
1491 }
1492
1493 public long getDeleteFamilyCnt() {
1494 return deleteFamilyCnt;
1495 }
1496
1497 public byte[] getFirstKey() {
1498 return reader.getFirstKey();
1499 }
1500
1501 public long indexSize() {
1502 return reader.indexSize();
1503 }
1504
1505 public BloomType getBloomFilterType() {
1506 return this.bloomFilterType;
1507 }
1508
1509 public long getSequenceID() {
1510 return sequenceID;
1511 }
1512
1513 public void setSequenceID(long sequenceID) {
1514 this.sequenceID = sequenceID;
1515 }
1516
1517 BloomFilter getGeneralBloomFilter() {
1518 return generalBloomFilter;
1519 }
1520
1521 long getUncompressedDataIndexSize() {
1522 return reader.getTrailer().getUncompressedDataIndexSize();
1523 }
1524
1525 public long getTotalBloomSize() {
1526 if (generalBloomFilter == null)
1527 return 0;
1528 return generalBloomFilter.getByteSize();
1529 }
1530
1531 public int getHFileVersion() {
1532 return reader.getTrailer().getMajorVersion();
1533 }
1534
1535 public int getHFileMinorVersion() {
1536 return reader.getTrailer().getMinorVersion();
1537 }
1538
1539 public HFile.Reader getHFileReader() {
1540 return reader;
1541 }
1542
1543 void disableBloomFilterForTesting() {
1544 generalBloomFilter = null;
1545 this.deleteFamilyBloomFilter = null;
1546 }
1547
1548 public long getMaxTimestamp() {
1549 return timeRangeTracker == null ? Long.MAX_VALUE : timeRangeTracker.maximumTimestamp;
1550 }
1551 }
1552
1553
1554
1555
1556 public abstract static class Comparators {
1557
1558
1559
1560
1561
1562
1563
1564
1565 public static final Comparator<StoreFile> SEQ_ID =
1566 Ordering.compound(ImmutableList.of(
1567 Ordering.natural().onResultOf(new GetSeqId()),
1568 Ordering.natural().onResultOf(new GetFileSize()).reverse(),
1569 Ordering.natural().onResultOf(new GetBulkTime()),
1570 Ordering.natural().onResultOf(new GetPathName())
1571 ));
1572
1573 private static class GetSeqId implements Function<StoreFile, Long> {
1574 @Override
1575 public Long apply(StoreFile sf) {
1576 return sf.getMaxSequenceId();
1577 }
1578 }
1579
1580 private static class GetFileSize implements Function<StoreFile, Long> {
1581 @Override
1582 public Long apply(StoreFile sf) {
1583 return sf.getReader().length();
1584 }
1585 }
1586
1587 private static class GetBulkTime implements Function<StoreFile, Long> {
1588 @Override
1589 public Long apply(StoreFile sf) {
1590 if (!sf.isBulkLoadResult()) return Long.MAX_VALUE;
1591 return sf.getBulkLoadTimestamp();
1592 }
1593 }
1594
1595 private static class GetPathName implements Function<StoreFile, String> {
1596 @Override
1597 public String apply(StoreFile sf) {
1598 return sf.getPath().getName();
1599 }
1600 }
1601 }
1602 }