001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.io.UnsupportedEncodingException;
023import java.net.URLEncoder;
024import java.util.Collections;
025import java.util.HashSet;
026import java.util.Map;
027import java.util.Optional;
028import java.util.OptionalLong;
029import java.util.Set;
030import java.util.concurrent.atomic.AtomicBoolean;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.Cell;
035import org.apache.hadoop.hbase.CellComparator;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.HDFSBlocksDistribution;
038import org.apache.hadoop.hbase.io.TimeRange;
039import org.apache.hadoop.hbase.io.hfile.BlockType;
040import org.apache.hadoop.hbase.io.hfile.CacheConfig;
041import org.apache.hadoop.hbase.io.hfile.HFile;
042import org.apache.hadoop.hbase.io.hfile.ReaderContext;
043import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
044import org.apache.hadoop.hbase.util.BloomFilterFactory;
045import org.apache.hadoop.hbase.util.Bytes;
046import org.apache.yetus.audience.InterfaceAudience;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
050import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
051
052/**
053 * A Store data file.  Stores usually have one or more of these files.  They
054 * are produced by flushing the memstore to disk.  To
055 * create, instantiate a writer using {@link StoreFileWriter.Builder}
056 * and append data. Be sure to add any metadata before calling close on the
057 * Writer (Use the appendMetadata convenience methods). On close, a StoreFile
058 * is sitting in the Filesystem.  To refer to it, create a StoreFile instance
059 * passing filesystem and path.  To read, call {@link #initReader()}
060 * <p>StoreFiles may also reference store files in another Store.
061 *
062 * The reason for this weird pattern where you use a different instance for the
063 * writer and a reader is that we write once but read a lot more.
064 */
065@InterfaceAudience.Private
066public class HStoreFile implements StoreFile {
067
068  private static final Logger LOG = LoggerFactory.getLogger(HStoreFile.class.getName());
069
070  // Keys for fileinfo values in HFile
071
072  /** Max Sequence ID in FileInfo */
073  public static final byte[] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");
074
075  /** Major compaction flag in FileInfo */
076  public static final byte[] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY");
077
078  /** Minor compaction flag in FileInfo */
079  public static final byte[] EXCLUDE_FROM_MINOR_COMPACTION_KEY =
080      Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION");
081
082  /**
083   * Key for compaction event which contains the compacted storefiles in FileInfo
084   */
085  public static final byte[] COMPACTION_EVENT_KEY = Bytes.toBytes("COMPACTION_EVENT_KEY");
086
087  /** Bloom filter Type in FileInfo */
088  public static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE");
089
090  /** Bloom filter param in FileInfo */
091  public static final byte[] BLOOM_FILTER_PARAM_KEY = Bytes.toBytes("BLOOM_FILTER_PARAM");
092
093  /** Delete Family Count in FileInfo */
094  public static final byte[] DELETE_FAMILY_COUNT = Bytes.toBytes("DELETE_FAMILY_COUNT");
095
096  /** Last Bloom filter key in FileInfo */
097  public static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY");
098
099  /** Key for Timerange information in metadata */
100  public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
101
102  /** Key for timestamp of earliest-put in metadata */
103  public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");
104
105  /** Key for the number of mob cells in metadata */
106  public static final byte[] MOB_CELLS_COUNT = Bytes.toBytes("MOB_CELLS_COUNT");
107
108  /** Meta key set when store file is a result of a bulk load */
109  public static final byte[] BULKLOAD_TASK_KEY = Bytes.toBytes("BULKLOAD_SOURCE_TASK");
110  public static final byte[] BULKLOAD_TIME_KEY = Bytes.toBytes("BULKLOAD_TIMESTAMP");
111
112  /**
113   * Key for skipping resetting sequence id in metadata. For bulk loaded hfiles, the scanner resets
114   * the cell seqId with the latest one, if this metadata is set as true, the reset is skipped.
115   */
116  public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID");
117
118  private final StoreFileInfo fileInfo;
119
120  // StoreFile.Reader
121  private volatile StoreFileReader initialReader;
122
123  // Block cache configuration and reference.
124  private final CacheConfig cacheConf;
125
126  // Indicates if the file got compacted
127  private volatile boolean compactedAway = false;
128
129  // Keys for metadata stored in backing HFile.
130  // Set when we obtain a Reader.
131  private long sequenceid = -1;
132
133  // max of the MemstoreTS in the KV's in this store
134  // Set when we obtain a Reader.
135  private long maxMemstoreTS = -1;
136
137  // firstKey, lastkey and cellComparator will be set when openReader.
138  private Optional<Cell> firstKey;
139
140  private Optional<Cell> lastKey;
141
142  private CellComparator comparator;
143
144  public CacheConfig getCacheConf() {
145    return this.cacheConf;
146  }
147
148  @Override
149  public Optional<Cell> getFirstKey() {
150    return firstKey;
151  }
152
153  @Override
154  public Optional<Cell> getLastKey() {
155    return lastKey;
156  }
157
158  @Override
159  public CellComparator getComparator() {
160    return comparator;
161  }
162
163  @Override
164  public long getMaxMemStoreTS() {
165    return maxMemstoreTS;
166  }
167
168  // If true, this file was product of a major compaction.  Its then set
169  // whenever you get a Reader.
170  private AtomicBoolean majorCompaction = null;
171
172  // If true, this file should not be included in minor compactions.
173  // It's set whenever you get a Reader.
174  private boolean excludeFromMinorCompaction = false;
175
176  // This file was product of these compacted store files
177  private final Set<String> compactedStoreFiles = new HashSet<>();
178
179  /**
180   * Map of the metadata entries in the corresponding HFile. Populated when Reader is opened
181   * after which it is not modified again.
182   */
183  private Map<byte[], byte[]> metadataMap;
184
185  /**
186   * Bloom filter type specified in column family configuration. Does not
187   * necessarily correspond to the Bloom filter type present in the HFile.
188   */
189  private final BloomType cfBloomType;
190
191  /**
192   * Constructor, loads a reader and it's indices, etc. May allocate a substantial amount of ram
193   * depending on the underlying files (10-20MB?).
194   * @param fs The current file system to use.
195   * @param p The path of the file.
196   * @param conf The current configuration.
197   * @param cacheConf The cache configuration and block cache reference.
198   * @param cfBloomType The bloom type to use for this store file as specified by column family
199   *          configuration. This may or may not be the same as the Bloom filter type actually
200   *          present in the HFile, because column family configuration might change. If this is
201   *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
202   * @param primaryReplica true if this is a store file for primary replica, otherwise false.
203   * @throws IOException
204   */
205  public HStoreFile(FileSystem fs, Path p, Configuration conf, CacheConfig cacheConf,
206      BloomType cfBloomType, boolean primaryReplica) throws IOException {
207    this(new StoreFileInfo(conf, fs, p, primaryReplica), cfBloomType, cacheConf);
208  }
209
210  /**
211   * Constructor, loads a reader and it's indices, etc. May allocate a substantial amount of ram
212   * depending on the underlying files (10-20MB?).
213   * @param fileInfo The store file information.
214   * @param cfBloomType The bloom type to use for this store file as specified by column
215   *          family configuration. This may or may not be the same as the Bloom filter type
216   *          actually present in the HFile, because column family configuration might change. If
217   *          this is {@link BloomType#NONE}, the existing Bloom filter is ignored.
218   * @param cacheConf The cache configuration and block cache reference.
219   */
220  public HStoreFile(StoreFileInfo fileInfo, BloomType cfBloomType, CacheConfig cacheConf) {
221    this.fileInfo = fileInfo;
222    this.cacheConf = cacheConf;
223    if (BloomFilterFactory.isGeneralBloomEnabled(fileInfo.getConf())) {
224      this.cfBloomType = cfBloomType;
225    } else {
226      LOG.info("Ignoring bloom filter check for file " + this.getPath() + ": " + "cfBloomType=" +
227          cfBloomType + " (disabled in config)");
228      this.cfBloomType = BloomType.NONE;
229    }
230  }
231
232  /**
233   * @return the StoreFile object associated to this StoreFile. null if the StoreFile is not a
234   *         reference.
235   */
236  public StoreFileInfo getFileInfo() {
237    return this.fileInfo;
238  }
239
240  @Override
241  public Path getPath() {
242    return this.fileInfo.getPath();
243  }
244
245  @Override
246  public Path getEncodedPath() {
247    try {
248      return new Path(URLEncoder.encode(fileInfo.getPath().toString(), HConstants.UTF8_ENCODING));
249    } catch (UnsupportedEncodingException ex) {
250      throw new RuntimeException("URLEncoder doesn't support UTF-8", ex);
251    }
252  }
253
254  @Override
255  public Path getQualifiedPath() {
256    FileSystem fs = fileInfo.getFileSystem();
257    return this.fileInfo.getPath().makeQualified(fs.getUri(), fs.getWorkingDirectory());
258  }
259
260  @Override
261  public boolean isReference() {
262    return this.fileInfo.isReference();
263  }
264
265  @Override
266  public boolean isHFile() {
267    return StoreFileInfo.isHFile(this.fileInfo.getPath());
268  }
269
270  @Override
271  public boolean isMajorCompactionResult() {
272    if (this.majorCompaction == null) {
273      throw new NullPointerException("This has not been set yet");
274    }
275    return this.majorCompaction.get();
276  }
277
278  @Override
279  public boolean excludeFromMinorCompaction() {
280    return this.excludeFromMinorCompaction;
281  }
282
283  @Override
284  public long getMaxSequenceId() {
285    return this.sequenceid;
286  }
287
288  @Override
289  public long getModificationTimeStamp() throws IOException {
290    return getModificationTimestamp();
291  }
292
293  @Override
294  public long getModificationTimestamp() throws IOException {
295    return fileInfo.getModificationTime();
296  }
297
298  /**
299   * Only used by the Striped Compaction Policy
300   * @param key
301   * @return value associated with the metadata key
302   */
303  public byte[] getMetadataValue(byte[] key) {
304    return metadataMap.get(key);
305  }
306
307  @Override
308  public boolean isBulkLoadResult() {
309    boolean bulkLoadedHFile = false;
310    String fileName = this.getPath().getName();
311    int startPos = fileName.indexOf("SeqId_");
312    if (startPos != -1) {
313      bulkLoadedHFile = true;
314    }
315    return bulkLoadedHFile || (metadataMap != null && metadataMap.containsKey(BULKLOAD_TIME_KEY));
316  }
317
318  public boolean isCompactedAway() {
319    return compactedAway;
320  }
321
322  @VisibleForTesting
323  public int getRefCount() {
324    return fileInfo.refCount.get();
325  }
326
327  /**
328   * @return true if the file is still used in reads
329   */
330  public boolean isReferencedInReads() {
331    int rc = fileInfo.refCount.get();
332    assert rc >= 0; // we should not go negative.
333    return rc > 0;
334  }
335
336  @Override
337  public OptionalLong getBulkLoadTimestamp() {
338    byte[] bulkLoadTimestamp = metadataMap.get(BULKLOAD_TIME_KEY);
339    return bulkLoadTimestamp == null ? OptionalLong.empty()
340        : OptionalLong.of(Bytes.toLong(bulkLoadTimestamp));
341  }
342
343  /**
344   * @return the cached value of HDFS blocks distribution. The cached value is calculated when store
345   *         file is opened.
346   */
347  public HDFSBlocksDistribution getHDFSBlockDistribution() {
348    return this.fileInfo.getHDFSBlockDistribution();
349  }
350
351  /**
352   * Opens reader on this store file. Called by Constructor.
353   * @see #closeStoreFile(boolean)
354   */
355  private void open() throws IOException {
356    fileInfo.initHDFSBlocksDistribution();
357    long readahead = fileInfo.isNoReadahead() ? 0L : -1L;
358    ReaderContext context = fileInfo.createReaderContext(false, readahead, ReaderType.PREAD);
359    fileInfo.initHFileInfo(context);
360    StoreFileReader reader = fileInfo.preStoreFileReaderOpen(context, cacheConf);
361    if (reader == null) {
362      reader = fileInfo.createReader(context, cacheConf);
363      fileInfo.getHFileInfo().initMetaAndIndex(reader.getHFileReader());
364    }
365    this.initialReader = fileInfo.postStoreFileReaderOpen(context, cacheConf, reader);
366    // Load up indices and fileinfo. This also loads Bloom filter type.
367    metadataMap = Collections.unmodifiableMap(initialReader.loadFileInfo());
368
369    // Read in our metadata.
370    byte [] b = metadataMap.get(MAX_SEQ_ID_KEY);
371    if (b != null) {
372      // By convention, if halfhfile, top half has a sequence number > bottom
373      // half. Thats why we add one in below. Its done for case the two halves
374      // are ever merged back together --rare.  Without it, on open of store,
375      // since store files are distinguished by sequence id, the one half would
376      // subsume the other.
377      this.sequenceid = Bytes.toLong(b);
378      if (fileInfo.isTopReference()) {
379        this.sequenceid += 1;
380      }
381    }
382
383    if (isBulkLoadResult()){
384      // generate the sequenceId from the fileName
385      // fileName is of the form <randomName>_SeqId_<id-when-loaded>_
386      String fileName = this.getPath().getName();
387      // Use lastIndexOf() to get the last, most recent bulk load seqId.
388      int startPos = fileName.lastIndexOf("SeqId_");
389      if (startPos != -1) {
390        this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
391            fileName.indexOf('_', startPos + 6)));
392        // Handle reference files as done above.
393        if (fileInfo.isTopReference()) {
394          this.sequenceid += 1;
395        }
396      }
397      // SKIP_RESET_SEQ_ID only works in bulk loaded file.
398      // In mob compaction, the hfile where the cells contain the path of a new mob file is bulk
399      // loaded to hbase, these cells have the same seqIds with the old ones. We do not want
400      // to reset new seqIds for them since this might make a mess of the visibility of cells that
401      // have the same row key but different seqIds.
402      boolean skipResetSeqId = isSkipResetSeqId(metadataMap.get(SKIP_RESET_SEQ_ID));
403      if (skipResetSeqId) {
404        // increase the seqId when it is a bulk loaded file from mob compaction.
405        this.sequenceid += 1;
406      }
407      initialReader.setSkipResetSeqId(skipResetSeqId);
408      initialReader.setBulkLoaded(true);
409    }
410    initialReader.setSequenceID(this.sequenceid);
411
412    b = metadataMap.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
413    if (b != null) {
414      this.maxMemstoreTS = Bytes.toLong(b);
415    }
416
417    b = metadataMap.get(MAJOR_COMPACTION_KEY);
418    if (b != null) {
419      boolean mc = Bytes.toBoolean(b);
420      if (this.majorCompaction == null) {
421        this.majorCompaction = new AtomicBoolean(mc);
422      } else {
423        this.majorCompaction.set(mc);
424      }
425    } else {
426      // Presume it is not major compacted if it doesn't explicity say so
427      // HFileOutputFormat explicitly sets the major compacted key.
428      this.majorCompaction = new AtomicBoolean(false);
429    }
430
431    b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY);
432    this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b));
433
434    BloomType hfileBloomType = initialReader.getBloomFilterType();
435    if (cfBloomType != BloomType.NONE) {
436      initialReader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
437      if (hfileBloomType != cfBloomType) {
438        LOG.debug("HFile Bloom filter type for "
439            + initialReader.getHFileReader().getName() + ": " + hfileBloomType
440            + ", but " + cfBloomType + " specified in column family "
441            + "configuration");
442      }
443    } else if (hfileBloomType != BloomType.NONE) {
444      LOG.info("Bloom filter turned off by CF config for "
445          + initialReader.getHFileReader().getName());
446    }
447
448    // load delete family bloom filter
449    initialReader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);
450
451    try {
452      byte[] data = metadataMap.get(TIMERANGE_KEY);
453      initialReader.timeRange = data == null ? null :
454          TimeRangeTracker.parseFrom(data).toTimeRange();
455    } catch (IllegalArgumentException e) {
456      LOG.error("Error reading timestamp range data from meta -- " +
457          "proceeding without", e);
458      this.initialReader.timeRange = null;
459    }
460
461    try {
462      byte[] data = metadataMap.get(COMPACTION_EVENT_KEY);
463      this.compactedStoreFiles.addAll(ProtobufUtil.toCompactedStoreFiles(data));
464    } catch (IOException e) {
465      LOG.error("Error reading compacted storefiles from meta data", e);
466    }
467
468    // initialize so we can reuse them after reader closed.
469    firstKey = initialReader.getFirstKey();
470    lastKey = initialReader.getLastKey();
471    comparator = initialReader.getComparator();
472  }
473
474  /**
475   * Initialize the reader used for pread.
476   */
477  public void initReader() throws IOException {
478    if (initialReader == null) {
479      synchronized (this) {
480        if (initialReader == null) {
481          try {
482            open();
483          } catch (Exception e) {
484            try {
485              boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
486              this.closeStoreFile(evictOnClose);
487            } catch (IOException ee) {
488              LOG.warn("failed to close reader", ee);
489            }
490            throw e;
491          }
492        }
493      }
494    }
495  }
496
497  private StoreFileReader createStreamReader(boolean canUseDropBehind) throws IOException {
498    initReader();
499    final boolean doDropBehind = canUseDropBehind && cacheConf.shouldDropBehindCompaction();
500    ReaderContext context = fileInfo.createReaderContext(doDropBehind, -1, ReaderType.STREAM);
501    StoreFileReader reader = fileInfo.preStoreFileReaderOpen(context, cacheConf);
502    if (reader == null) {
503      reader = fileInfo.createReader(context, cacheConf);
504      // steam reader need copy stuffs from pread reader
505      reader.copyFields(initialReader);
506    }
507    return fileInfo.postStoreFileReaderOpen(context, cacheConf, reader);
508  }
509
510  /**
511   * Get a scanner which uses pread.
512   * <p>
513   * Must be called after initReader.
514   */
515  public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder,
516      boolean canOptimizeForNonNullColumn) {
517    return getReader().getStoreFileScanner(cacheBlocks, true, false, readPt, scannerOrder,
518      canOptimizeForNonNullColumn);
519  }
520
521  /**
522   * Get a scanner which uses streaming read.
523   * <p>
524   * Must be called after initReader.
525   */
526  public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
527      boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn)
528      throws IOException {
529    return createStreamReader(canUseDropBehind)
530        .getStoreFileScanner(cacheBlocks, false, isCompaction, readPt, scannerOrder,
531            canOptimizeForNonNullColumn);
532  }
533
534  /**
535   * @return Current reader. Must call initReader first else returns null.
536   * @see #initReader()
537   */
538  public StoreFileReader getReader() {
539    return this.initialReader;
540  }
541
542  /**
543   * @param evictOnClose whether to evict blocks belonging to this file
544   * @throws IOException
545   */
546  public synchronized void closeStoreFile(boolean evictOnClose) throws IOException {
547    if (this.initialReader != null) {
548      this.initialReader.close(evictOnClose);
549      this.initialReader = null;
550    }
551  }
552
553  /**
554   * Delete this file
555   * @throws IOException
556   */
557  public void deleteStoreFile() throws IOException {
558    boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
559    closeStoreFile(evictOnClose);
560    this.fileInfo.getFileSystem().delete(getPath(), true);
561  }
562
563  public void markCompactedAway() {
564    this.compactedAway = true;
565  }
566
567  @Override
568  public String toString() {
569    return this.fileInfo.toString();
570  }
571
572  @Override
573  public String toStringDetailed() {
574    StringBuilder sb = new StringBuilder();
575    sb.append(this.getPath().toString());
576    sb.append(", isReference=").append(isReference());
577    sb.append(", isBulkLoadResult=").append(isBulkLoadResult());
578    if (isBulkLoadResult()) {
579      sb.append(", bulkLoadTS=");
580      OptionalLong bulkLoadTS = getBulkLoadTimestamp();
581      if (bulkLoadTS.isPresent()) {
582        sb.append(bulkLoadTS.getAsLong());
583      } else {
584        sb.append("NotPresent");
585      }
586    } else {
587      sb.append(", seqid=").append(getMaxSequenceId());
588    }
589    sb.append(", majorCompaction=").append(isMajorCompactionResult());
590
591    return sb.toString();
592  }
593
594  /**
595   * Gets whether to skip resetting the sequence id for cells.
596   * @param skipResetSeqId The byte array of boolean.
597   * @return Whether to skip resetting the sequence id.
598   */
599  private boolean isSkipResetSeqId(byte[] skipResetSeqId) {
600    if (skipResetSeqId != null && skipResetSeqId.length == 1) {
601      return Bytes.toBoolean(skipResetSeqId);
602    }
603    return false;
604  }
605
606  @Override
607  public OptionalLong getMinimumTimestamp() {
608    TimeRange tr = getReader().timeRange;
609    return tr != null ? OptionalLong.of(tr.getMin()) : OptionalLong.empty();
610  }
611
612  @Override
613  public OptionalLong getMaximumTimestamp() {
614    TimeRange tr = getReader().timeRange;
615    return tr != null ? OptionalLong.of(tr.getMax()) : OptionalLong.empty();
616  }
617
618  Set<String> getCompactedStoreFiles() {
619    return Collections.unmodifiableSet(this.compactedStoreFiles);
620  }
621}