/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * A Store data file. Stores usually have one or more of these files. They
 * are produced by flushing the memstore to disk. To create one, instantiate
 * a writer using {@link StoreFileWriter.Builder} and append data. Be sure to
 * add any metadata before calling close on the Writer (use the appendMetadata
 * convenience methods). On close, a StoreFile is sitting in the Filesystem.
 * To refer to it, create a StoreFile instance passing filesystem and path.
 * To read, call {@link #initReader()}.
 * <p>StoreFiles may also reference store files in another Store.
 *
 * The reason for this split between writer and reader instances is that we
 * write once but read many times.
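 *
 * <p>A minimal usage sketch (the {@code fs}, {@code path}, {@code conf} and
 * {@code cacheConf} objects are assumed to exist; this is illustrative, not a
 * canonical recipe):
 * <pre>{@code
 * HStoreFile sf = new HStoreFile(fs, path, conf, cacheConf, BloomType.NONE, true);
 * sf.initReader();                          // open the pread reader
 * StoreFileReader reader = sf.getReader();
 * // ... scan via reader ...
 * sf.closeStoreFile(true);                  // evict cached blocks on close
 * }</pre>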
 */
@InterfaceAudience.Private
public class HStoreFile implements StoreFile {

  private static final Logger LOG = LoggerFactory.getLogger(HStoreFile.class.getName());

  // Keys for fileinfo values in HFile
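  // (A sketch of how to inspect these offline: assuming the stock HFile pretty-printer
  // tool is available, "hbase hfile -m -f <path-to-hfile>" prints this metadata map.)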

  /** Max Sequence ID in FileInfo */
  public static final byte[] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");

  /** Major compaction flag in FileInfo */
  public static final byte[] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY");

  /** Exclude-from-minor-compaction flag in FileInfo */
  public static final byte[] EXCLUDE_FROM_MINOR_COMPACTION_KEY =
      Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION");

  /**
   * Key for compaction event which contains the compacted storefiles in FileInfo
   */
  public static final byte[] COMPACTION_EVENT_KEY = Bytes.toBytes("COMPACTION_EVENT_KEY");

  /** Bloom filter Type in FileInfo */
  public static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE");

  /** Bloom filter param in FileInfo */
  public static final byte[] BLOOM_FILTER_PARAM_KEY = Bytes.toBytes("BLOOM_FILTER_PARAM");

  /** Delete Family Count in FileInfo */
  public static final byte[] DELETE_FAMILY_COUNT = Bytes.toBytes("DELETE_FAMILY_COUNT");

  /** Last Bloom filter key in FileInfo */
  public static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY");

  /** Key for Timerange information in metadata */
  public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");

  /** Key for timestamp of earliest-put in metadata */
  public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");

  /** Key for the number of mob cells in metadata */
  public static final byte[] MOB_CELLS_COUNT = Bytes.toBytes("MOB_CELLS_COUNT");

  /** Null data */
  public static final byte[] NULL_VALUE = new byte[] {0};

  /** Key for the list of MOB file references */
  public static final byte[] MOB_FILE_REFS = Bytes.toBytes("MOB_FILE_REFS");

  /** Meta key set when store file is a result of a bulk load */
  public static final byte[] BULKLOAD_TASK_KEY = Bytes.toBytes("BULKLOAD_SOURCE_TASK");
  public static final byte[] BULKLOAD_TIME_KEY = Bytes.toBytes("BULKLOAD_TIMESTAMP");

  /**
   * Key for skipping resetting sequence id in metadata. For bulk loaded hfiles, the scanner
   * resets the cell seqId with the latest one. If this metadata is set to true, the reset is
   * skipped.
   */
  public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID");

  private final StoreFileInfo fileInfo;

  // StoreFile.Reader
  private volatile StoreFileReader initialReader;

  // Block cache configuration and reference.
  private final CacheConfig cacheConf;

  // Indicates if the file got compacted
  private volatile boolean compactedAway = false;

  // Max sequence id read from the metadata of the backing HFile.
  // Set when we obtain a Reader.
  private long sequenceid = -1;

  // Max of the MemstoreTS in the KVs in this store file.
  // Set when we obtain a Reader.
  private long maxMemstoreTS = -1;

  // firstKey, lastKey and comparator are set when the reader is opened.
  private Optional<Cell> firstKey;

  private Optional<Cell> lastKey;

  private CellComparator comparator;

  public CacheConfig getCacheConf() {
    return this.cacheConf;
  }

  @Override
  public Optional<Cell> getFirstKey() {
    return firstKey;
  }

  @Override
  public Optional<Cell> getLastKey() {
    return lastKey;
  }

  @Override
  public CellComparator getComparator() {
    return comparator;
  }

  @Override
  public long getMaxMemStoreTS() {
    return maxMemstoreTS;
  }

  // If true, this file was a product of a major compaction.
  // It's set whenever you get a Reader.
  private AtomicBoolean majorCompaction = null;

  // If true, this file should not be included in minor compactions.
  // It's set whenever you get a Reader.
  private boolean excludeFromMinorCompaction = false;

  // The store files that were compacted to produce this file.
  private final Set<String> compactedStoreFiles = new HashSet<>();

  /**
   * Map of the metadata entries in the corresponding HFile. Populated when Reader is opened
   * after which it is not modified again.
   */
  private Map<byte[], byte[]> metadataMap;

  /**
   * Bloom filter type specified in column family configuration. Does not
   * necessarily correspond to the Bloom filter type present in the HFile.
   */
  private final BloomType cfBloomType;

  /**
   * Constructor. Note that the reader is opened lazily: loading it and its indices via
   * {@link #initReader()} may allocate a substantial amount of RAM depending on the underlying
   * files (10-20MB?).
   * @param fs The current file system to use.
   * @param p The path of the file.
   * @param conf The current configuration.
   * @param cacheConf The cache configuration and block cache reference.
   * @param cfBloomType The bloom type to use for this store file as specified by column family
   *          configuration. This may or may not be the same as the Bloom filter type actually
   *          present in the HFile, because column family configuration might change. If this is
   *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
   * @param primaryReplica true if this is a store file for primary replica, otherwise false.
   * @throws IOException if the store file information cannot be read
   */
  public HStoreFile(FileSystem fs, Path p, Configuration conf, CacheConfig cacheConf,
      BloomType cfBloomType, boolean primaryReplica) throws IOException {
    this(new StoreFileInfo(conf, fs, p, primaryReplica), cfBloomType, cacheConf);
  }

  /**
   * Constructor. Note that the reader is opened lazily: loading it and its indices via
   * {@link #initReader()} may allocate a substantial amount of RAM depending on the underlying
   * files (10-20MB?).
   * @param fileInfo The store file information.
   * @param cfBloomType The bloom type to use for this store file as specified by column
   *          family configuration. This may or may not be the same as the Bloom filter type
   *          actually present in the HFile, because column family configuration might change. If
   *          this is {@link BloomType#NONE}, the existing Bloom filter is ignored.
   * @param cacheConf The cache configuration and block cache reference.
   */
  public HStoreFile(StoreFileInfo fileInfo, BloomType cfBloomType, CacheConfig cacheConf) {
    this.fileInfo = fileInfo;
    this.cacheConf = cacheConf;
    if (BloomFilterFactory.isGeneralBloomEnabled(fileInfo.getConf())) {
      this.cfBloomType = cfBloomType;
    } else {
      LOG.info("Ignoring bloom filter check for file " + this.getPath() + ": cfBloomType=" +
          cfBloomType + " (disabled in config)");
      this.cfBloomType = BloomType.NONE;
    }
  }

  /**
   * @return the {@link StoreFileInfo} associated with this StoreFile.
   */
  public StoreFileInfo getFileInfo() {
    return this.fileInfo;
  }

  @Override
  public Path getPath() {
    return this.fileInfo.getPath();
  }

  @Override
  public Path getEncodedPath() {
    try {
      return new Path(URLEncoder.encode(fileInfo.getPath().toString(), HConstants.UTF8_ENCODING));
    } catch (UnsupportedEncodingException ex) {
      throw new RuntimeException("URLEncoder doesn't support UTF-8", ex);
    }
  }

  @Override
  public Path getQualifiedPath() {
    FileSystem fs = fileInfo.getFileSystem();
    return this.fileInfo.getPath().makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  @Override
  public boolean isReference() {
    return this.fileInfo.isReference();
  }

  @Override
  public boolean isHFile() {
    return StoreFileInfo.isHFile(this.fileInfo.getPath());
  }

  @Override
  public boolean isMajorCompactionResult() {
    Preconditions.checkState(this.majorCompaction != null,
        "Major compaction has not been set yet");
    return this.majorCompaction.get();
  }

  @Override
  public boolean excludeFromMinorCompaction() {
    return this.excludeFromMinorCompaction;
  }

  @Override
  public long getMaxSequenceId() {
    return this.sequenceid;
  }

  @Override
  public long getModificationTimestamp() throws IOException {
    return fileInfo.getModificationTime();
  }

  /**
   * @param key to look up
   * @return value associated with the metadata key
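   * <p>A sketch (assumes {@link #initReader()} has run so the metadata map is populated):
   * <pre>{@code
   * byte[] b = storeFile.getMetadataValue(HStoreFile.MAX_SEQ_ID_KEY);
   * long maxSeqId = b == null ? -1 : Bytes.toLong(b);
   * }</pre>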
   */
  public byte[] getMetadataValue(byte[] key) {
    return metadataMap.get(key);
  }

  @Override
  public boolean isBulkLoadResult() {
    boolean bulkLoadedHFile = this.getPath().getName().contains("SeqId_");
    return bulkLoadedHFile || (metadataMap != null && metadataMap.containsKey(BULKLOAD_TIME_KEY));
  }

  public boolean isCompactedAway() {
    return compactedAway;
  }

  @VisibleForTesting
  public int getRefCount() {
    return fileInfo.refCount.get();
  }

  /**
   * @return true if the file is still used in reads
   */
  public boolean isReferencedInReads() {
    int rc = fileInfo.refCount.get();
    assert rc >= 0; // we should not go negative.
    return rc > 0;
  }

  @Override
  public OptionalLong getBulkLoadTimestamp() {
    byte[] bulkLoadTimestamp = metadataMap.get(BULKLOAD_TIME_KEY);
    return bulkLoadTimestamp == null ? OptionalLong.empty()
        : OptionalLong.of(Bytes.toLong(bulkLoadTimestamp));
  }

  /**
   * @return the cached value of HDFS blocks distribution. The cached value is calculated when
   *         the store file is opened.
   */
  public HDFSBlocksDistribution getHDFSBlockDistribution() {
    return this.fileInfo.getHDFSBlockDistribution();
  }

  /**
   * Opens the reader on this store file. Called from {@link #initReader()}.
   * @see #closeStoreFile(boolean)
   */
  private void open() throws IOException {
    fileInfo.initHDFSBlocksDistribution();
    long readahead = fileInfo.isNoReadahead() ? 0L : -1L;
    ReaderContext context = fileInfo.createReaderContext(false, readahead, ReaderType.PREAD);
    fileInfo.initHFileInfo(context);
    StoreFileReader reader = fileInfo.preStoreFileReaderOpen(context, cacheConf);
    if (reader == null) {
      reader = fileInfo.createReader(context, cacheConf);
      fileInfo.getHFileInfo().initMetaAndIndex(reader.getHFileReader());
    }
    this.initialReader = fileInfo.postStoreFileReaderOpen(context, cacheConf, reader);
    // Load up indices and fileinfo. This also loads Bloom filter type.
    metadataMap = Collections.unmodifiableMap(initialReader.loadFileInfo());

    // Read in our metadata.
    byte[] b = metadataMap.get(MAX_SEQ_ID_KEY);
    if (b != null) {
      // By convention, if this is a half hfile, the top half has a sequence number greater than
      // the bottom half. That's why we add one below. It's done in case the two halves are ever
      // merged back together (rare). Without it, on open of a store, since store files are
      // distinguished by sequence id, the one half would subsume the other.
      this.sequenceid = Bytes.toLong(b);
      if (fileInfo.isTopReference()) {
        this.sequenceid += 1;
      }
    }

    if (isBulkLoadResult()) {
      // Generate the sequenceId from the fileName.
      // fileName is of the form <randomName>_SeqId_<id-when-loaded>_
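      // For example, a (hypothetical) name "abc123_SeqId_205_" yields sequence id 205.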
      String fileName = this.getPath().getName();
      // Use lastIndexOf() to get the last, most recent bulk load seqId.
      int startPos = fileName.lastIndexOf("SeqId_");
      if (startPos != -1) {
        this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
            fileName.indexOf('_', startPos + 6)));
        // Handle reference files as done above.
        if (fileInfo.isTopReference()) {
          this.sequenceid += 1;
        }
      }
      // SKIP_RESET_SEQ_ID only applies to bulk loaded files.
      // In mob compaction, the hfile whose cells contain the path of a new mob file is bulk
      // loaded into hbase, and these cells carry the same seqIds as the old ones. We do not want
      // to reset new seqIds for them since this might make a mess of the visibility of cells that
      // have the same row key but different seqIds.
      boolean skipResetSeqId = isSkipResetSeqId(metadataMap.get(SKIP_RESET_SEQ_ID));
      if (skipResetSeqId) {
        // Increase the seqId when it is a bulk loaded file from mob compaction.
        this.sequenceid += 1;
      }
      initialReader.setSkipResetSeqId(skipResetSeqId);
      initialReader.setBulkLoaded(true);
    }
    initialReader.setSequenceID(this.sequenceid);

    b = metadataMap.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
    if (b != null) {
      this.maxMemstoreTS = Bytes.toLong(b);
    }

    b = metadataMap.get(MAJOR_COMPACTION_KEY);
    if (b != null) {
      boolean mc = Bytes.toBoolean(b);
      if (this.majorCompaction == null) {
        this.majorCompaction = new AtomicBoolean(mc);
      } else {
        this.majorCompaction.set(mc);
      }
    } else {
      // Presume it is not major compacted if it doesn't explicitly say so.
      // HFileOutputFormat explicitly sets the major compacted key.
      this.majorCompaction = new AtomicBoolean(false);
    }

    b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY);
    this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b));

    BloomType hfileBloomType = initialReader.getBloomFilterType();
    if (cfBloomType != BloomType.NONE) {
      initialReader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
      if (hfileBloomType != cfBloomType) {
        LOG.debug("HFile Bloom filter type for "
            + initialReader.getHFileReader().getName() + ": " + hfileBloomType
            + ", but " + cfBloomType + " specified in column family "
            + "configuration");
      }
    } else if (hfileBloomType != BloomType.NONE) {
      LOG.info("Bloom filter turned off by CF config for "
          + initialReader.getHFileReader().getName());
    }

    // load delete family bloom filter
    initialReader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);

    try {
      byte[] data = metadataMap.get(TIMERANGE_KEY);
      initialReader.timeRange = data == null ? null :
          TimeRangeTracker.parseFrom(data).toTimeRange();
    } catch (IllegalArgumentException e) {
      LOG.error("Error reading timestamp range data from meta -- " +
          "proceeding without", e);
      this.initialReader.timeRange = null;
    }

    try {
      byte[] data = metadataMap.get(COMPACTION_EVENT_KEY);
      this.compactedStoreFiles.addAll(ProtobufUtil.toCompactedStoreFiles(data));
    } catch (IOException e) {
      LOG.error("Error reading compacted storefiles from meta data", e);
    }

    // Initialize these so we can reuse them after the reader is closed.
    firstKey = initialReader.getFirstKey();
    lastKey = initialReader.getLastKey();
    comparator = initialReader.getComparator();
  }

  /**
   * Initialize the reader used for pread.
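   * <p>Safe to call more than once; only the first call opens the underlying file. A typical
   * sequence (sketch):
   * <pre>{@code
   * storeFile.initReader();
   * StoreFileReader reader = storeFile.getReader(); // non-null once initReader() succeeds
   * }</pre>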
   */
  public void initReader() throws IOException {
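    // Double-checked locking: only the first caller pays the cost of open().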
    if (initialReader == null) {
      synchronized (this) {
        if (initialReader == null) {
          try {
            open();
          } catch (Exception e) {
            try {
              boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
              this.closeStoreFile(evictOnClose);
            } catch (IOException ee) {
              LOG.warn("failed to close reader", ee);
            }
            throw e;
          }
        }
      }
    }
  }

  private StoreFileReader createStreamReader(boolean canUseDropBehind) throws IOException {
    initReader();
    final boolean doDropBehind = canUseDropBehind && cacheConf.shouldDropBehindCompaction();
    ReaderContext context = fileInfo.createReaderContext(doDropBehind, -1, ReaderType.STREAM);
    StoreFileReader reader = fileInfo.preStoreFileReaderOpen(context, cacheConf);
    if (reader == null) {
      reader = fileInfo.createReader(context, cacheConf);
      // The stream reader needs to copy some fields from the pread reader.
      reader.copyFields(initialReader);
    }
    return fileInfo.postStoreFileReaderOpen(context, cacheConf, reader);
  }

  /**
   * Get a scanner which uses pread.
   * <p>
   * Must be called after initReader.
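   * <p>A sketch (parameter values here are illustrative only):
   * <pre>{@code
   * storeFile.initReader();
   * StoreFileScanner s = storeFile.getPreadScanner(true, Long.MAX_VALUE, 0, false);
   * }</pre>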
   */
  public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder,
      boolean canOptimizeForNonNullColumn) {
    return getReader().getStoreFileScanner(cacheBlocks, true, false, readPt, scannerOrder,
      canOptimizeForNonNullColumn);
  }

  /**
   * Get a scanner which uses streaming read.
   * <p>
   * Must be called after initReader.
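   * <p>A sketch (parameter values are illustrative, and {@code readPt} is an assumed variable;
   * stream reads are typically used for compactions and other long scans):
   * <pre>{@code
   * StoreFileScanner s = storeFile.getStreamScanner(false, false, true, readPt, 0, false);
   * }</pre>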
   */
  public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
      boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn)
      throws IOException {
    return createStreamReader(canUseDropBehind)
        .getStoreFileScanner(cacheBlocks, false, isCompaction, readPt, scannerOrder,
            canOptimizeForNonNullColumn);
  }

  /**
   * @return Current reader. Must call {@link #initReader()} first, else this returns null.
   * @see #initReader()
   */
  public StoreFileReader getReader() {
    return this.initialReader;
  }

  /**
   * @param evictOnClose whether to evict blocks belonging to this file
   * @throws IOException
   */
  public synchronized void closeStoreFile(boolean evictOnClose) throws IOException {
    if (this.initialReader != null) {
      this.initialReader.close(evictOnClose);
      this.initialReader = null;
    }
  }

  /**
   * Delete this file
   * @throws IOException
   */
  public void deleteStoreFile() throws IOException {
    boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
    closeStoreFile(evictOnClose);
    this.fileInfo.getFileSystem().delete(getPath(), true);
  }

  public void markCompactedAway() {
    this.compactedAway = true;
  }

  @Override
  public String toString() {
    return this.fileInfo.toString();
  }

  @Override
  public String toStringDetailed() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getPath().toString());
    sb.append(", isReference=").append(isReference());
    sb.append(", isBulkLoadResult=").append(isBulkLoadResult());
    if (isBulkLoadResult()) {
      sb.append(", bulkLoadTS=");
      OptionalLong bulkLoadTS = getBulkLoadTimestamp();
      if (bulkLoadTS.isPresent()) {
        sb.append(bulkLoadTS.getAsLong());
      } else {
        sb.append("NotPresent");
      }
    } else {
      sb.append(", seqid=").append(getMaxSequenceId());
    }
    sb.append(", majorCompaction=").append(isMajorCompactionResult());

    return sb.toString();
  }

  /**
   * Gets whether to skip resetting the sequence id for cells.
   * @param skipResetSeqId A single-byte array encoding a boolean.
   * @return Whether to skip resetting the sequence id.
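   * <p>For example, {@code isSkipResetSeqId(Bytes.toBytes(true))} returns true, while
   * {@code null} or a multi-byte array returns false.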
   */
  private boolean isSkipResetSeqId(byte[] skipResetSeqId) {
    if (skipResetSeqId != null && skipResetSeqId.length == 1) {
      return Bytes.toBoolean(skipResetSeqId);
    }
    return false;
  }

  @Override
  public OptionalLong getMinimumTimestamp() {
    TimeRange tr = getReader().timeRange;
    return tr != null ? OptionalLong.of(tr.getMin()) : OptionalLong.empty();
  }

  @Override
  public OptionalLong getMaximumTimestamp() {
    TimeRange tr = getReader().timeRange;
    return tr != null ? OptionalLong.of(tr.getMax()) : OptionalLong.empty();
  }

  Set<String> getCompactedStoreFiles() {
    return Collections.unmodifiableSet(this.compactedStoreFiles);
  }
}