/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * A Store data file.  Stores usually have one or more of these files.  They
 * are produced by flushing the memstore to disk.  To create, instantiate a
 * writer using {@link StoreFileWriter.Builder} and append data.  Be sure to
 * add any metadata before calling close on the Writer (use the appendMetadata
 * convenience methods).  On close, a StoreFile is sitting in the Filesystem.
 * To refer to it, create a StoreFile instance passing filesystem and path.
 * To read, call {@link #initReader()}.
 * <p>StoreFiles may also reference store files in another Store.
 * <p>The reason for this pattern, where different instances are used for the
 * writer and the reader, is that we write once but read many times.
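 * <p>A minimal write-then-read sketch (variable names such as {@code path}, {@code cell} and
 * {@code maxSequenceId} are illustrative, not part of the API):
 * <pre>{@code
 * StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
 *     .withFilePath(path)
 *     .build();
 * try {
 *   writer.append(cell);
 *   // record the max sequence id; false = not the result of a major compaction
 *   writer.appendMetadata(maxSequenceId, false);
 * } finally {
 *   writer.close();
 * }
 * HStoreFile sf = new HStoreFile(fs, path, conf, cacheConf, BloomType.NONE, true);
 * sf.initReader();
 * StoreFileReader reader = sf.getReader();
 * }</pre>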
 */
@InterfaceAudience.Private
public class HStoreFile implements StoreFile {

  private static final Logger LOG = LoggerFactory.getLogger(HStoreFile.class.getName());

  public static final String STORE_FILE_READER_NO_READAHEAD = "hbase.store.reader.no-readahead";

  private static final boolean DEFAULT_STORE_FILE_READER_NO_READAHEAD = false;

  // Keys for fileinfo values in HFile

  /** Max Sequence ID in FileInfo */
  public static final byte[] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");

  /** Major compaction flag in FileInfo */
  public static final byte[] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY");

  /** Exclude-from-minor-compaction flag in FileInfo */
  public static final byte[] EXCLUDE_FROM_MINOR_COMPACTION_KEY =
      Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION");

  /**
   * Key for compaction event which contains the compacted storefiles in FileInfo
   */
  public static final byte[] COMPACTION_EVENT_KEY = Bytes.toBytes("COMPACTION_EVENT_KEY");

  /** Bloom filter Type in FileInfo */
  public static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE");

  /** Bloom filter param in FileInfo */
  public static final byte[] BLOOM_FILTER_PARAM_KEY = Bytes.toBytes("BLOOM_FILTER_PARAM");

  /** Delete Family Count in FileInfo */
  public static final byte[] DELETE_FAMILY_COUNT = Bytes.toBytes("DELETE_FAMILY_COUNT");

  /** Last Bloom filter key in FileInfo */
  public static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY");

  /** Key for Timerange information in metadata */
  public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");

  /** Key for timestamp of earliest-put in metadata */
  public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");

  /** Key for the number of mob cells in metadata */
  public static final byte[] MOB_CELLS_COUNT = Bytes.toBytes("MOB_CELLS_COUNT");

  /** Meta key set when store file is a result of a bulk load */
  public static final byte[] BULKLOAD_TASK_KEY = Bytes.toBytes("BULKLOAD_SOURCE_TASK");
  public static final byte[] BULKLOAD_TIME_KEY = Bytes.toBytes("BULKLOAD_TIMESTAMP");

  /**
   * Key for skipping resetting sequence id in metadata. For bulk loaded hfiles, the scanner
   * resets the cell seqId with the latest one; if this metadata is set to true, the reset is
   * skipped.
   */
  public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID");

  private final StoreFileInfo fileInfo;
  private final FileSystem fs;

  // Block cache configuration and reference.
  private final CacheConfig cacheConf;

  // Counter that is incremented every time a scanner is created on the
  // store file. It is decremented when the scan on the store file is
  // done.
  private final AtomicInteger refCount = new AtomicInteger(0);

  private final boolean noReadahead;

  private final boolean primaryReplica;

  // Indicates if the file got compacted
  private volatile boolean compactedAway = false;

  // Max sequence id, read from the metadata of the backing HFile.
  // Set when we obtain a Reader.
  private long sequenceid = -1;

  // Max of the MemstoreTS in the KVs in this store file.
  // Set when we obtain a Reader.
  private long maxMemstoreTS = -1;

  // firstKey, lastKey and comparator are set when the reader is opened.
  private Optional<Cell> firstKey;

  private Optional<Cell> lastKey;

  private CellComparator comparator;

  public CacheConfig getCacheConf() {
    return cacheConf;
  }

  @Override
  public Optional<Cell> getFirstKey() {
    return firstKey;
  }

  @Override
  public Optional<Cell> getLastKey() {
    return lastKey;
  }

  @Override
  public CellComparator getComparator() {
    return comparator;
  }

  @Override
  public long getMaxMemStoreTS() {
    return maxMemstoreTS;
  }

  // If true, this file was the product of a major compaction.
  // It is set whenever we obtain a Reader.
  private AtomicBoolean majorCompaction = null;

  // If true, this file should not be included in minor compactions.
  // It's set whenever you get a Reader.
  private boolean excludeFromMinorCompaction = false;

  // This file is the product of compacting the following store files.
  private final Set<String> compactedStoreFiles = new HashSet<>();

  /**
   * Map of the metadata entries in the corresponding HFile. Populated when the Reader is opened,
   * after which it is not modified again.
   */
  private Map<byte[], byte[]> metadataMap;

  // StoreFile.Reader
  private volatile StoreFileReader reader;

  /**
   * Bloom filter type specified in column family configuration. Does not
   * necessarily correspond to the Bloom filter type present in the HFile.
   */
  private final BloomType cfBloomType;

  /**
   * Constructor.  The reader and its indices are loaded lazily via {@link #initReader()}, which
   * may allocate a substantial amount of RAM depending on the underlying files (10-20MB?).
   * @param fs The current file system to use.
   * @param p The path of the file.
   * @param conf The current configuration.
   * @param cacheConf The cache configuration and block cache reference.
   * @param cfBloomType The bloom type to use for this store file as specified by column family
   *          configuration. This may or may not be the same as the Bloom filter type actually
   *          present in the HFile, because column family configuration might change. If this is
   *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
   * @param primaryReplica true if this is a store file for primary replica, otherwise false.
   * @throws IOException if the {@link StoreFileInfo} for the given path cannot be read
   */
  public HStoreFile(FileSystem fs, Path p, Configuration conf, CacheConfig cacheConf,
      BloomType cfBloomType, boolean primaryReplica) throws IOException {
    this(fs, new StoreFileInfo(conf, fs, p), conf, cacheConf, cfBloomType, primaryReplica);
  }

  /**
   * Constructor.  The reader and its indices are loaded lazily via {@link #initReader()}, which
   * may allocate a substantial amount of RAM depending on the underlying files (10-20MB?).
   * @param fs The current file system to use.
   * @param fileInfo The store file information.
   * @param conf The current configuration.
   * @param cacheConf The cache configuration and block cache reference.
   * @param cfBloomType The bloom type to use for this store file as specified by column
   *          family configuration. This may or may not be the same as the Bloom filter type
   *          actually present in the HFile, because column family configuration might change. If
   *          this is {@link BloomType#NONE}, the existing Bloom filter is ignored.
   * @param primaryReplica true if this is a store file for primary replica, otherwise false.
   */
  public HStoreFile(FileSystem fs, StoreFileInfo fileInfo, Configuration conf, CacheConfig cacheConf,
      BloomType cfBloomType, boolean primaryReplica) {
    this.fs = fs;
    this.fileInfo = fileInfo;
    this.cacheConf = cacheConf;
    this.noReadahead =
        conf.getBoolean(STORE_FILE_READER_NO_READAHEAD, DEFAULT_STORE_FILE_READER_NO_READAHEAD);
    if (BloomFilterFactory.isGeneralBloomEnabled(conf)) {
      this.cfBloomType = cfBloomType;
    } else {
      LOG.info("Ignoring bloom filter check for file " + this.getPath() + ": cfBloomType=" +
          cfBloomType + " (disabled in config)");
      this.cfBloomType = BloomType.NONE;
    }
    this.primaryReplica = primaryReplica;
  }

  /**
   * @return the {@link StoreFileInfo} describing this StoreFile.
   */
  public StoreFileInfo getFileInfo() {
    return this.fileInfo;
  }

  @Override
  public Path getPath() {
    return this.fileInfo.getPath();
  }

  @Override
  public Path getEncodedPath() {
    try {
      return new Path(URLEncoder.encode(fileInfo.getPath().toString(), HConstants.UTF8_ENCODING));
    } catch (UnsupportedEncodingException ex) {
      throw new RuntimeException("URLEncoder doesn't support UTF-8", ex);
    }
  }

  @Override
  public Path getQualifiedPath() {
    return this.fileInfo.getPath().makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  @Override
  public boolean isReference() {
    return this.fileInfo.isReference();
  }

  @Override
  public boolean isHFile() {
    return StoreFileInfo.isHFile(this.fileInfo.getPath());
  }

  @Override
  public boolean isMajorCompactionResult() {
    if (this.majorCompaction == null) {
      throw new NullPointerException("majorCompaction has not been set yet, open the Reader first");
    }
    return this.majorCompaction.get();
  }

  @Override
  public boolean excludeFromMinorCompaction() {
    return this.excludeFromMinorCompaction;
  }

  @Override
  public long getMaxSequenceId() {
    return this.sequenceid;
  }

  @Override
  public long getModificationTimeStamp() throws IOException {
    return getModificationTimestamp();
  }

  @Override
  public long getModificationTimestamp() throws IOException {
    return fileInfo.getModificationTime();
  }

  /**
   * Returns the value for the given metadata key from the backing HFile's file info.
   * Only used by the Striped Compaction Policy.
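   * <p>For example, after {@link #initReader()} has been called, the max sequence id recorded by
   * the writer can be read back with:
   * <pre>{@code
   * byte[] b = sf.getMetadataValue(HStoreFile.MAX_SEQ_ID_KEY);
   * long maxSeqId = b == null ? -1 : Bytes.toLong(b);
   * }</pre>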
   * @param key the file info key, e.g. {@link #MAX_SEQ_ID_KEY}
   * @return the value associated with the metadata key, or null if the key is not present
   */
  public byte[] getMetadataValue(byte[] key) {
    return metadataMap.get(key);
  }

  @Override
  public boolean isBulkLoadResult() {
    boolean bulkLoadedHFile = this.getPath().getName().contains("SeqId_");
    return bulkLoadedHFile || (metadataMap != null && metadataMap.containsKey(BULKLOAD_TIME_KEY));
  }

  public boolean isCompactedAway() {
    return compactedAway;
  }

  @VisibleForTesting
  public int getRefCount() {
    return refCount.get();
  }

  /**
   * @return true if the file is still used in reads
   */
  public boolean isReferencedInReads() {
    int rc = refCount.get();
    assert rc >= 0; // we should not go negative.
    return rc > 0;
  }

  @Override
  public OptionalLong getBulkLoadTimestamp() {
    byte[] bulkLoadTimestamp = metadataMap.get(BULKLOAD_TIME_KEY);
    return bulkLoadTimestamp == null ? OptionalLong.empty()
        : OptionalLong.of(Bytes.toLong(bulkLoadTimestamp));
  }

  /**
   * @return the cached value of HDFS blocks distribution. The cached value is calculated when store
   *         file is opened.
   */
  public HDFSBlocksDistribution getHDFSBlockDistribution() {
    return this.fileInfo.getHDFSBlockDistribution();
  }

  /**
   * Opens reader on this store file. Called by {@link #initReader()}.
   * @see #closeStoreFile(boolean)
   */
  private void open() throws IOException {
    if (this.reader != null) {
      throw new IllegalAccessError("Already open");
    }

    // Open the StoreFile.Reader
    this.reader = fileInfo.open(this.fs, this.cacheConf, false, noReadahead ? 0L : -1L,
      primaryReplica, refCount, true);

    // Load up indices and fileinfo. This also loads Bloom filter type.
    metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());

    // Read in our metadata.
    byte[] b = metadataMap.get(MAX_SEQ_ID_KEY);
    if (b != null) {
      // By convention, if this is a half hfile, the top half has a sequence number greater than
      // the bottom half's. That's why we add one below. It's done in case the two halves are
      // ever merged back together (rare). Without it, on open of the store, since store files
      // are distinguished by sequence id, one half would subsume the other.
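      // For example, if the parent file's MAX_SEQ_ID_KEY is 10, the bottom half opens with
      // sequence id 10 while the top half opens with 11.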
      this.sequenceid = Bytes.toLong(b);
      if (fileInfo.isTopReference()) {
        this.sequenceid += 1;
      }
    }

    if (isBulkLoadResult()) {
      // Generate the sequenceId from the fileName.
      // fileName is of the form <randomName>_SeqId_<id-when-loaded>_
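      // e.g. a file named "0a52c7cbd8b14e34a0e9b34bd0dff117_SeqId_4_" yields sequence id 4.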
      String fileName = this.getPath().getName();
      // Use lastIndexOf() to get the last, most recent bulk load seqId.
      int startPos = fileName.lastIndexOf("SeqId_");
      if (startPos != -1) {
        this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
            fileName.indexOf('_', startPos + 6)));
        // Handle reference files as done above.
        if (fileInfo.isTopReference()) {
          this.sequenceid += 1;
        }
      }
      // SKIP_RESET_SEQ_ID only works for bulk loaded files.
      // In mob compaction, the hfile whose cells contain the paths of new mob files is bulk
      // loaded into hbase; these cells carry the same seqIds as the old ones. We do not want to
      // reset new seqIds for them, since doing so might break the visibility of cells that have
      // the same row key but different seqIds.
      boolean skipResetSeqId = isSkipResetSeqId(metadataMap.get(SKIP_RESET_SEQ_ID));
      if (skipResetSeqId) {
        // increase the seqId when it is a bulk loaded file from mob compaction.
        this.sequenceid += 1;
      }
      this.reader.setSkipResetSeqId(skipResetSeqId);
      this.reader.setBulkLoaded(true);
    }
    this.reader.setSequenceID(this.sequenceid);

    b = metadataMap.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
    if (b != null) {
      this.maxMemstoreTS = Bytes.toLong(b);
    }

    b = metadataMap.get(MAJOR_COMPACTION_KEY);
    if (b != null) {
      boolean mc = Bytes.toBoolean(b);
      if (this.majorCompaction == null) {
        this.majorCompaction = new AtomicBoolean(mc);
      } else {
        this.majorCompaction.set(mc);
      }
    } else {
      // Presume it is not major compacted if it doesn't explicitly say so
      // HFileOutputFormat explicitly sets the major compacted key.
      this.majorCompaction = new AtomicBoolean(false);
    }

    b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY);
    this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b));

    BloomType hfileBloomType = reader.getBloomFilterType();
    if (cfBloomType != BloomType.NONE) {
      reader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
      if (hfileBloomType != cfBloomType) {
        LOG.info("HFile Bloom filter type for "
            + reader.getHFileReader().getName() + ": " + hfileBloomType
            + ", but " + cfBloomType + " specified in column family "
            + "configuration");
      }
    } else if (hfileBloomType != BloomType.NONE) {
      LOG.info("Bloom filter turned off by CF config for "
          + reader.getHFileReader().getName());
    }

    // load delete family bloom filter
    reader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);

    try {
      byte[] data = metadataMap.get(TIMERANGE_KEY);
      this.reader.timeRange = data == null ? null : TimeRangeTracker.parseFrom(data).toTimeRange();
    } catch (IllegalArgumentException e) {
      LOG.error("Error reading timestamp range data from meta -- " +
          "proceeding without", e);
      this.reader.timeRange = null;
    }

    try {
      byte[] data = metadataMap.get(COMPACTION_EVENT_KEY);
      this.compactedStoreFiles.addAll(ProtobufUtil.toCompactedStoreFiles(data));
    } catch (IOException e) {
      LOG.error("Error reading compacted storefiles from meta data", e);
    }

    // Initialize these so we can reuse them after the reader is closed.
    firstKey = reader.getFirstKey();
    lastKey = reader.getLastKey();
    comparator = reader.getComparator();
  }

  /**
   * Initialize the reader used for pread.
   */
  public void initReader() throws IOException {
    if (reader == null) {
      try {
        open();
      } catch (Exception e) {
        try {
          boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
          this.closeStoreFile(evictOnClose);
        } catch (IOException ee) {
          LOG.warn("failed to close reader", ee);
        }
        throw e;
      }
    }
  }

  private StoreFileReader createStreamReader(boolean canUseDropBehind) throws IOException {
    initReader();
    StoreFileReader reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind, -1L,
      primaryReplica, refCount, false);
    reader.copyFields(this.reader);
    return reader;
  }

  /**
   * Get a scanner which uses pread.
   * <p>
   * Must be called after initReader.
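   * <p>A minimal sketch (the argument values here are illustrative; a read point of
   * {@code Long.MAX_VALUE} reads regardless of the MVCC read point):
   * <pre>{@code
   * sf.initReader();
   * StoreFileScanner scanner = sf.getPreadScanner(true, Long.MAX_VALUE, 0, false);
   * }</pre>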
   */
  public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder,
      boolean canOptimizeForNonNullColumn) {
    return getReader().getStoreFileScanner(cacheBlocks, true, false, readPt, scannerOrder,
      canOptimizeForNonNullColumn);
  }

  /**
   * Get a scanner which uses streaming read.
   * <p>
   * Must be called after initReader.
   */
  public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
      boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn)
      throws IOException {
    return createStreamReader(canUseDropBehind)
        .getStoreFileScanner(cacheBlocks, false, isCompaction, readPt, scannerOrder,
            canOptimizeForNonNullColumn);
  }

  /**
   * @return the current reader, or null if {@link #initReader()} has not been called yet.
   * @see #initReader()
   */
  public StoreFileReader getReader() {
    return this.reader;
  }

  /**
   * Close the reader for this store file.
   * @param evictOnClose whether to evict blocks belonging to this file
   * @throws IOException if closing the underlying reader fails
   */
  public synchronized void closeStoreFile(boolean evictOnClose) throws IOException {
    if (this.reader != null) {
      this.reader.close(evictOnClose);
      this.reader = null;
    }
  }

  /**
   * Delete this file.
   * @throws IOException if closing the reader or deleting the file fails
   */
  public void deleteStoreFile() throws IOException {
    boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
    closeStoreFile(evictOnClose);
    this.fs.delete(getPath(), true);
  }

  public void markCompactedAway() {
    this.compactedAway = true;
  }

  @Override
  public String toString() {
    return this.fileInfo.toString();
  }

  @Override
  public String toStringDetailed() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getPath().toString());
    sb.append(", isReference=").append(isReference());
    sb.append(", isBulkLoadResult=").append(isBulkLoadResult());
    if (isBulkLoadResult()) {
      sb.append(", bulkLoadTS=");
      OptionalLong bulkLoadTS = getBulkLoadTimestamp();
      if (bulkLoadTS.isPresent()) {
        sb.append(bulkLoadTS.getAsLong());
      } else {
        sb.append("NotPresent");
      }
    } else {
      sb.append(", seqid=").append(getMaxSequenceId());
    }
    sb.append(", majorCompaction=").append(isMajorCompactionResult());

    return sb.toString();
  }

  /**
   * Gets whether to skip resetting the sequence id for cells.
   * @param skipResetSeqId the serialized boolean value read from the file info
   * @return Whether to skip resetting the sequence id.
   */
  private boolean isSkipResetSeqId(byte[] skipResetSeqId) {
    if (skipResetSeqId != null && skipResetSeqId.length == 1) {
      return Bytes.toBoolean(skipResetSeqId);
    }
    return false;
  }

  @Override
  public OptionalLong getMinimumTimestamp() {
    TimeRange tr = getReader().timeRange;
    return tr != null ? OptionalLong.of(tr.getMin()) : OptionalLong.empty();
  }

  @Override
  public OptionalLong getMaximumTimestamp() {
    TimeRange tr = getReader().timeRange;
    return tr != null ? OptionalLong.of(tr.getMax()) : OptionalLong.empty();
  }

  Set<String> getCompactedStoreFiles() {
    return Collections.unmodifiableSet(this.compactedStoreFiles);
  }
}