001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.util.Collections;
023import java.util.Map;
024import java.util.Optional;
025import java.util.OptionalLong;
026import java.util.Set;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.atomic.AtomicBoolean;
029import java.util.concurrent.atomic.AtomicInteger;
030
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.Cell;
035import org.apache.hadoop.hbase.CellComparator;
036import org.apache.hadoop.hbase.HDFSBlocksDistribution;
037import org.apache.hadoop.hbase.io.TimeRange;
038import org.apache.hadoop.hbase.io.hfile.BlockType;
039import org.apache.hadoop.hbase.io.hfile.CacheConfig;
040import org.apache.hadoop.hbase.io.hfile.HFile;
041import org.apache.hadoop.hbase.util.BloomFilterFactory;
042import org.apache.hadoop.hbase.util.Bytes;
043
044import org.apache.yetus.audience.InterfaceAudience;
045
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
049import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
050
051
052/**
053 * A Store data file.  Stores usually have one or more of these files.  They
054 * are produced by flushing the memstore to disk.  To
055 * create, instantiate a writer using {@link StoreFileWriter.Builder}
056 * and append data. Be sure to add any metadata before calling close on the
057 * Writer (Use the appendMetadata convenience methods). On close, a StoreFile
058 * is sitting in the Filesystem.  To refer to it, create a StoreFile instance
059 * passing filesystem and path.  To read, call {@link #initReader()}
060 * <p>StoreFiles may also reference store files in another Store.
061 *
062 * The reason for this weird pattern where you use a different instance for the
063 * writer and a reader is that we write once but read a lot more.
064 */
065@InterfaceAudience.Private
066public class HStoreFile implements StoreFile, StoreFileReader.Listener {
067
068  private static final Logger LOG = LoggerFactory.getLogger(HStoreFile.class.getName());
069
070  public static final String STORE_FILE_READER_NO_READAHEAD = "hbase.store.reader.no-readahead";
071
072  private static final boolean DEFAULT_STORE_FILE_READER_NO_READAHEAD = false;
073
074  // Keys for fileinfo values in HFile
075
076  /** Max Sequence ID in FileInfo */
077  public static final byte[] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");
078
079  /** Major compaction flag in FileInfo */
080  public static final byte[] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY");
081
082  /** Minor compaction flag in FileInfo */
083  public static final byte[] EXCLUDE_FROM_MINOR_COMPACTION_KEY =
084      Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION");
085
086  /** Bloom filter Type in FileInfo */
087  public static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE");
088
089  /** Delete Family Count in FileInfo */
090  public static final byte[] DELETE_FAMILY_COUNT = Bytes.toBytes("DELETE_FAMILY_COUNT");
091
092  /** Last Bloom filter key in FileInfo */
093  public static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY");
094
095  /** Key for Timerange information in metadata */
096  public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
097
098  /** Key for timestamp of earliest-put in metadata */
099  public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");
100
101  /** Key for the number of mob cells in metadata */
102  public static final byte[] MOB_CELLS_COUNT = Bytes.toBytes("MOB_CELLS_COUNT");
103
104  /** Meta key set when store file is a result of a bulk load */
105  public static final byte[] BULKLOAD_TASK_KEY = Bytes.toBytes("BULKLOAD_SOURCE_TASK");
106  public static final byte[] BULKLOAD_TIME_KEY = Bytes.toBytes("BULKLOAD_TIMESTAMP");
107
108  /**
109   * Key for skipping resetting sequence id in metadata. For bulk loaded hfiles, the scanner resets
110   * the cell seqId with the latest one, if this metadata is set as true, the reset is skipped.
111   */
112  public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID");
113
114  private final StoreFileInfo fileInfo;
115  private final FileSystem fs;
116
117  // Block cache configuration and reference.
118  private final CacheConfig cacheConf;
119
120  // Counter that is incremented every time a scanner is created on the
121  // store file. It is decremented when the scan on the store file is
122  // done.
123  private final AtomicInteger refCount = new AtomicInteger(0);
124
125  // Set implementation must be of concurrent type
126  @VisibleForTesting
127  final Set<StoreFileReader> streamReaders;
128
129  private final boolean noReadahead;
130
131  private final boolean primaryReplica;
132
133  // Indicates if the file got compacted
134  private volatile boolean compactedAway = false;
135
136  // Keys for metadata stored in backing HFile.
137  // Set when we obtain a Reader.
138  private long sequenceid = -1;
139
140  // max of the MemstoreTS in the KV's in this store
141  // Set when we obtain a Reader.
142  private long maxMemstoreTS = -1;
143
144  // firstKey, lastkey and cellComparator will be set when openReader.
145  private Optional<Cell> firstKey;
146
147  private Optional<Cell> lastKey;
148
149  private CellComparator comparator;
150
151  public CacheConfig getCacheConf() {
152    return cacheConf;
153  }
154
155  @Override
156  public Optional<Cell> getFirstKey() {
157    return firstKey;
158  }
159
160  @Override
161  public Optional<Cell> getLastKey() {
162    return lastKey;
163  }
164
165  @Override
166  public CellComparator getComparator() {
167    return comparator;
168  }
169
170  @Override
171  public long getMaxMemStoreTS() {
172    return maxMemstoreTS;
173  }
174
175  // If true, this file was product of a major compaction.  Its then set
176  // whenever you get a Reader.
177  private AtomicBoolean majorCompaction = null;
178
179  // If true, this file should not be included in minor compactions.
180  // It's set whenever you get a Reader.
181  private boolean excludeFromMinorCompaction = false;
182
183  /**
184   * Map of the metadata entries in the corresponding HFile. Populated when Reader is opened
185   * after which it is not modified again.
186   */
187  private Map<byte[], byte[]> metadataMap;
188
189  // StoreFile.Reader
190  private volatile StoreFileReader reader;
191
192  /**
193   * Bloom filter type specified in column family configuration. Does not
194   * necessarily correspond to the Bloom filter type present in the HFile.
195   */
196  private final BloomType cfBloomType;
197
198  /**
199   * Constructor, loads a reader and it's indices, etc. May allocate a substantial amount of ram
200   * depending on the underlying files (10-20MB?).
201   * @param fs The current file system to use.
202   * @param p The path of the file.
203   * @param conf The current configuration.
204   * @param cacheConf The cache configuration and block cache reference.
205   * @param cfBloomType The bloom type to use for this store file as specified by column family
206   *          configuration. This may or may not be the same as the Bloom filter type actually
207   *          present in the HFile, because column family configuration might change. If this is
208   *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
209   * @param primaryReplica true if this is a store file for primary replica, otherwise false.
210   * @throws IOException
211   */
212  public HStoreFile(FileSystem fs, Path p, Configuration conf, CacheConfig cacheConf,
213      BloomType cfBloomType, boolean primaryReplica) throws IOException {
214    this(fs, new StoreFileInfo(conf, fs, p), conf, cacheConf, cfBloomType, primaryReplica);
215  }
216
217  /**
218   * Constructor, loads a reader and it's indices, etc. May allocate a substantial amount of ram
219   * depending on the underlying files (10-20MB?).
220   * @param fs fs The current file system to use.
221   * @param fileInfo The store file information.
222   * @param conf The current configuration.
223   * @param cacheConf The cache configuration and block cache reference.
224   * @param cfBloomType The bloom type to use for this store file as specified by column
225   *          family configuration. This may or may not be the same as the Bloom filter type
226   *          actually present in the HFile, because column family configuration might change. If
227   *          this is {@link BloomType#NONE}, the existing Bloom filter is ignored.
228   * @param primaryReplica true if this is a store file for primary replica, otherwise false.
229   */
230  public HStoreFile(FileSystem fs, StoreFileInfo fileInfo, Configuration conf, CacheConfig cacheConf,
231      BloomType cfBloomType, boolean primaryReplica) {
232    this.streamReaders = ConcurrentHashMap.newKeySet();
233    this.fs = fs;
234    this.fileInfo = fileInfo;
235    this.cacheConf = cacheConf;
236    this.noReadahead =
237        conf.getBoolean(STORE_FILE_READER_NO_READAHEAD, DEFAULT_STORE_FILE_READER_NO_READAHEAD);
238    if (BloomFilterFactory.isGeneralBloomEnabled(conf)) {
239      this.cfBloomType = cfBloomType;
240    } else {
241      LOG.info("Ignoring bloom filter check for file " + this.getPath() + ": " + "cfBloomType=" +
242          cfBloomType + " (disabled in config)");
243      this.cfBloomType = BloomType.NONE;
244    }
245    this.primaryReplica = primaryReplica;
246  }
247
248  /**
249   * @return the StoreFile object associated to this StoreFile. null if the StoreFile is not a
250   *         reference.
251   */
252  public StoreFileInfo getFileInfo() {
253    return this.fileInfo;
254  }
255
256  @Override
257  public Path getPath() {
258    return this.fileInfo.getPath();
259  }
260
261  @Override
262  public Path getQualifiedPath() {
263    return this.fileInfo.getPath().makeQualified(fs.getUri(), fs.getWorkingDirectory());
264  }
265
266  @Override
267  public boolean isReference() {
268    return this.fileInfo.isReference();
269  }
270
271  @Override
272  public boolean isHFile() {
273    return StoreFileInfo.isHFile(this.fileInfo.getPath());
274  }
275
276  @Override
277  public boolean isMajorCompactionResult() {
278    if (this.majorCompaction == null) {
279      throw new NullPointerException("This has not been set yet");
280    }
281    return this.majorCompaction.get();
282  }
283
284  @Override
285  public boolean excludeFromMinorCompaction() {
286    return this.excludeFromMinorCompaction;
287  }
288
289  @Override
290  public long getMaxSequenceId() {
291    return this.sequenceid;
292  }
293
294  @Override
295  public long getModificationTimeStamp() throws IOException {
296    return getModificationTimestamp();
297  }
298
299  @Override
300  public long getModificationTimestamp() throws IOException {
301    return fileInfo.getModificationTime();
302  }
303
304  /**
305   * Only used by the Striped Compaction Policy
306   * @param key
307   * @return value associated with the metadata key
308   */
309  public byte[] getMetadataValue(byte[] key) {
310    return metadataMap.get(key);
311  }
312
313  @Override
314  public boolean isBulkLoadResult() {
315    boolean bulkLoadedHFile = false;
316    String fileName = this.getPath().getName();
317    int startPos = fileName.indexOf("SeqId_");
318    if (startPos != -1) {
319      bulkLoadedHFile = true;
320    }
321    return bulkLoadedHFile || (metadataMap != null && metadataMap.containsKey(BULKLOAD_TIME_KEY));
322  }
323
324  public boolean isCompactedAway() {
325    return compactedAway;
326  }
327
328  @VisibleForTesting
329  public int getRefCount() {
330    return refCount.get();
331  }
332
333  /**
334   * @return true if the file is still used in reads
335   */
336  public boolean isReferencedInReads() {
337    int rc = refCount.get();
338    assert rc >= 0; // we should not go negative.
339    return rc > 0;
340  }
341
342  @Override
343  public OptionalLong getBulkLoadTimestamp() {
344    byte[] bulkLoadTimestamp = metadataMap.get(BULKLOAD_TIME_KEY);
345    return bulkLoadTimestamp == null ? OptionalLong.empty()
346        : OptionalLong.of(Bytes.toLong(bulkLoadTimestamp));
347  }
348
349  /**
350   * @return the cached value of HDFS blocks distribution. The cached value is calculated when store
351   *         file is opened.
352   */
353  public HDFSBlocksDistribution getHDFSBlockDistribution() {
354    return this.fileInfo.getHDFSBlockDistribution();
355  }
356
357  /**
358   * Opens reader on this store file. Called by Constructor.
359   * @throws IOException
360   * @see #closeStoreFile(boolean)
361   */
362  private void open() throws IOException {
363    if (this.reader != null) {
364      throw new IllegalAccessError("Already open");
365    }
366
367    // Open the StoreFile.Reader
368    this.reader = fileInfo.open(this.fs, this.cacheConf, false, noReadahead ? 0L : -1L,
369      primaryReplica, refCount, true);
370
371    // Load up indices and fileinfo. This also loads Bloom filter type.
372    metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
373
374    // Read in our metadata.
375    byte [] b = metadataMap.get(MAX_SEQ_ID_KEY);
376    if (b != null) {
377      // By convention, if halfhfile, top half has a sequence number > bottom
378      // half. Thats why we add one in below. Its done for case the two halves
379      // are ever merged back together --rare.  Without it, on open of store,
380      // since store files are distinguished by sequence id, the one half would
381      // subsume the other.
382      this.sequenceid = Bytes.toLong(b);
383      if (fileInfo.isTopReference()) {
384        this.sequenceid += 1;
385      }
386    }
387
388    if (isBulkLoadResult()){
389      // generate the sequenceId from the fileName
390      // fileName is of the form <randomName>_SeqId_<id-when-loaded>_
391      String fileName = this.getPath().getName();
392      // Use lastIndexOf() to get the last, most recent bulk load seqId.
393      int startPos = fileName.lastIndexOf("SeqId_");
394      if (startPos != -1) {
395        this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
396            fileName.indexOf('_', startPos + 6)));
397        // Handle reference files as done above.
398        if (fileInfo.isTopReference()) {
399          this.sequenceid += 1;
400        }
401      }
402      // SKIP_RESET_SEQ_ID only works in bulk loaded file.
403      // In mob compaction, the hfile where the cells contain the path of a new mob file is bulk
404      // loaded to hbase, these cells have the same seqIds with the old ones. We do not want
405      // to reset new seqIds for them since this might make a mess of the visibility of cells that
406      // have the same row key but different seqIds.
407      boolean skipResetSeqId = isSkipResetSeqId(metadataMap.get(SKIP_RESET_SEQ_ID));
408      if (skipResetSeqId) {
409        // increase the seqId when it is a bulk loaded file from mob compaction.
410        this.sequenceid += 1;
411      }
412      this.reader.setSkipResetSeqId(skipResetSeqId);
413      this.reader.setBulkLoaded(true);
414    }
415    this.reader.setSequenceID(this.sequenceid);
416
417    b = metadataMap.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
418    if (b != null) {
419      this.maxMemstoreTS = Bytes.toLong(b);
420    }
421
422    b = metadataMap.get(MAJOR_COMPACTION_KEY);
423    if (b != null) {
424      boolean mc = Bytes.toBoolean(b);
425      if (this.majorCompaction == null) {
426        this.majorCompaction = new AtomicBoolean(mc);
427      } else {
428        this.majorCompaction.set(mc);
429      }
430    } else {
431      // Presume it is not major compacted if it doesn't explicity say so
432      // HFileOutputFormat explicitly sets the major compacted key.
433      this.majorCompaction = new AtomicBoolean(false);
434    }
435
436    b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY);
437    this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b));
438
439    BloomType hfileBloomType = reader.getBloomFilterType();
440    if (cfBloomType != BloomType.NONE) {
441      reader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
442      if (hfileBloomType != cfBloomType) {
443        LOG.info("HFile Bloom filter type for "
444            + reader.getHFileReader().getName() + ": " + hfileBloomType
445            + ", but " + cfBloomType + " specified in column family "
446            + "configuration");
447      }
448    } else if (hfileBloomType != BloomType.NONE) {
449      LOG.info("Bloom filter turned off by CF config for "
450          + reader.getHFileReader().getName());
451    }
452
453    // load delete family bloom filter
454    reader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);
455
456    try {
457      byte[] data = metadataMap.get(TIMERANGE_KEY);
458      this.reader.timeRange = data == null ? null : TimeRangeTracker.parseFrom(data).toTimeRange();
459    } catch (IllegalArgumentException e) {
460      LOG.error("Error reading timestamp range data from meta -- " +
461          "proceeding without", e);
462      this.reader.timeRange = null;
463    }
464    // initialize so we can reuse them after reader closed.
465    firstKey = reader.getFirstKey();
466    lastKey = reader.getLastKey();
467    comparator = reader.getComparator();
468  }
469
470  /**
471   * Initialize the reader used for pread.
472   */
473  public void initReader() throws IOException {
474    if (reader == null) {
475      try {
476        open();
477      } catch (Exception e) {
478        try {
479          boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
480          this.closeStoreFile(evictOnClose);
481        } catch (IOException ee) {
482          LOG.warn("failed to close reader", ee);
483        }
484        throw e;
485      }
486    }
487  }
488
489  private StoreFileReader createStreamReader(boolean canUseDropBehind) throws IOException {
490    initReader();
491    StoreFileReader reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind, -1L,
492      primaryReplica, refCount, false);
493    reader.copyFields(this.reader);
494    return reader;
495  }
496
497  /**
498   * Get a scanner which uses pread.
499   * <p>
500   * Must be called after initReader.
501   */
502  public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder,
503      boolean canOptimizeForNonNullColumn) {
504    return getReader().getStoreFileScanner(cacheBlocks, true, false, readPt, scannerOrder,
505      canOptimizeForNonNullColumn);
506  }
507
508  /**
509   * Get a scanner which uses streaming read.
510   * <p>
511   * Must be called after initReader.
512   */
513  public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
514      boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn)
515      throws IOException {
516    StoreFileReader reader = createStreamReader(canUseDropBehind);
517    reader.setListener(this);
518    StoreFileScanner sfScanner = reader.getStoreFileScanner(cacheBlocks, false,
519      isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn);
520    //Add reader once the scanner is created
521    streamReaders.add(reader);
522    return sfScanner;
523  }
524
525  /**
526   * @return Current reader. Must call initReader first else returns null.
527   * @see #initReader()
528   */
529  public StoreFileReader getReader() {
530    return this.reader;
531  }
532
533  /**
534   * @param evictOnClose whether to evict blocks belonging to this file
535   * @throws IOException
536   */
537  public synchronized void closeStoreFile(boolean evictOnClose) throws IOException {
538    if (this.reader != null) {
539      this.reader.close(evictOnClose);
540      this.reader = null;
541    }
542    closeStreamReaders(evictOnClose);
543  }
544
545  public void closeStreamReaders(boolean evictOnClose) throws IOException {
546    synchronized (this) {
547      for (StoreFileReader entry : streamReaders) {
548        //closing the reader will remove itself from streamReaders thanks to the Listener
549        entry.close(evictOnClose);
550      }
551      int size = streamReaders.size();
552      Preconditions.checkState(size == 0,
553          "There are still streamReaders post close: " + size);
554    }
555  }
556
557  /**
558   * Delete this file
559   * @throws IOException
560   */
561  public void deleteStoreFile() throws IOException {
562    boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
563    closeStoreFile(evictOnClose);
564    this.fs.delete(getPath(), true);
565  }
566
567  public void markCompactedAway() {
568    this.compactedAway = true;
569  }
570
571  @Override
572  public String toString() {
573    return this.fileInfo.toString();
574  }
575
576  @Override
577  public String toStringDetailed() {
578    StringBuilder sb = new StringBuilder();
579    sb.append(this.getPath().toString());
580    sb.append(", isReference=").append(isReference());
581    sb.append(", isBulkLoadResult=").append(isBulkLoadResult());
582    if (isBulkLoadResult()) {
583      sb.append(", bulkLoadTS=");
584      OptionalLong bulkLoadTS = getBulkLoadTimestamp();
585      if (bulkLoadTS.isPresent()) {
586        sb.append(bulkLoadTS.getAsLong());
587      } else {
588        sb.append("NotPresent");
589      }
590    } else {
591      sb.append(", seqid=").append(getMaxSequenceId());
592    }
593    sb.append(", majorCompaction=").append(isMajorCompactionResult());
594
595    return sb.toString();
596  }
597
598  /**
599   * Gets whether to skip resetting the sequence id for cells.
600   * @param skipResetSeqId The byte array of boolean.
601   * @return Whether to skip resetting the sequence id.
602   */
603  private boolean isSkipResetSeqId(byte[] skipResetSeqId) {
604    if (skipResetSeqId != null && skipResetSeqId.length == 1) {
605      return Bytes.toBoolean(skipResetSeqId);
606    }
607    return false;
608  }
609
610  @Override
611  public OptionalLong getMinimumTimestamp() {
612    TimeRange tr = getReader().timeRange;
613    return tr != null ? OptionalLong.of(tr.getMin()) : OptionalLong.empty();
614  }
615
616  @Override
617  public OptionalLong getMaximumTimestamp() {
618    TimeRange tr = getReader().timeRange;
619    return tr != null ? OptionalLong.of(tr.getMax()) : OptionalLong.empty();
620  }
621
622  @Override
623  public void storeFileReaderClosed(StoreFileReader reader) {
624    streamReaders.remove(reader);
625  }
626}