/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * A Store data file. Stores usually have one or more of these files. They
 * are produced by flushing the memstore to disk. To create, instantiate a
 * writer using {@link StoreFileWriter.Builder} and append data. Be sure to
 * add any metadata before calling close on the Writer (use the
 * appendMetadata convenience methods). On close, a StoreFile is sitting in
 * the Filesystem. To refer to it, create a StoreFile instance passing
 * filesystem and path. To read, call {@link #initReader()}.
 * <p>StoreFiles may also reference store files in another Store.
 *
 * The reason for this split between writer and reader instances is that a
 * store file is written once but read many times.
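 *
 * A minimal write-then-read sketch (illustrative only: {@code conf}, {@code cacheConf},
 * {@code fs}, {@code path} and {@code maxSequenceId} are assumed to already exist):
 * <pre>
 * StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
 *     .withFilePath(path)
 *     .build();
 * // ... append cells ...
 * writer.appendMetadata(maxSequenceId, false); // add metadata before close
 * writer.close();
 * HStoreFile sf = new HStoreFile(fs, path, conf, cacheConf, BloomType.NONE, true);
 * sf.initReader();
 * StoreFileReader reader = sf.getReader();
 * </pre>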
 */
@InterfaceAudience.Private
public class HStoreFile implements StoreFile {

  private static final Logger LOG = LoggerFactory.getLogger(HStoreFile.class.getName());

  // Keys for fileinfo values in HFile

  /** Max Sequence ID in FileInfo */
  public static final byte[] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");

  /** Major compaction flag in FileInfo */
  public static final byte[] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY");

  /** Minor compaction flag in FileInfo */
  public static final byte[] EXCLUDE_FROM_MINOR_COMPACTION_KEY =
      Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION");

  /**
   * Key for compaction event which contains the compacted storefiles in FileInfo
   */
  public static final byte[] COMPACTION_EVENT_KEY = Bytes.toBytes("COMPACTION_EVENT_KEY");

  /** Bloom filter Type in FileInfo */
  public static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE");

  /** Bloom filter param in FileInfo */
  public static final byte[] BLOOM_FILTER_PARAM_KEY = Bytes.toBytes("BLOOM_FILTER_PARAM");

  /** Delete Family Count in FileInfo */
  public static final byte[] DELETE_FAMILY_COUNT = Bytes.toBytes("DELETE_FAMILY_COUNT");

  /** Last Bloom filter key in FileInfo */
  public static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY");

  /** Key for Timerange information in metadata */
  public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");

  /** Key for timestamp of earliest-put in metadata */
  public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");

  /** Key for the number of mob cells in metadata */
  public static final byte[] MOB_CELLS_COUNT = Bytes.toBytes("MOB_CELLS_COUNT");

  /** Meta key set when store file is a result of a bulk load */
  public static final byte[] BULKLOAD_TASK_KEY = Bytes.toBytes("BULKLOAD_SOURCE_TASK");
  public static final byte[] BULKLOAD_TIME_KEY = Bytes.toBytes("BULKLOAD_TIMESTAMP");

  /**
   * Key for skipping resetting sequence id in metadata. For bulk loaded hfiles, the scanner
   * normally resets the cell seqId to the latest one; if this metadata is set to true, the reset
   * is skipped.
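   * <p>
   * For example (illustrative; assumes a {@link StoreFileWriter} named {@code writer}):
   * <pre>
   * writer.appendFileInfo(SKIP_RESET_SEQ_ID, Bytes.toBytes(true));
   * </pre>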
   */
  public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID");

  private final StoreFileInfo fileInfo;

  // StoreFile.Reader
  private volatile StoreFileReader initialReader;

  // Block cache configuration and reference.
  private final CacheConfig cacheConf;

  // Indicates if the file got compacted
  private volatile boolean compactedAway = false;

  // Keys for metadata stored in backing HFile.
  // Set when we obtain a Reader.
  private long sequenceid = -1;

  // Max of the MemstoreTS in the KVs in this store file.
  // Set when we obtain a Reader.
  private long maxMemstoreTS = -1;

  // firstKey, lastKey and comparator will be set when the reader is opened.
  private Optional<Cell> firstKey;

  private Optional<Cell> lastKey;

  private CellComparator comparator;

  public CacheConfig getCacheConf() {
    return this.cacheConf;
  }

  @Override
  public Optional<Cell> getFirstKey() {
    return firstKey;
  }

  @Override
  public Optional<Cell> getLastKey() {
    return lastKey;
  }

  @Override
  public CellComparator getComparator() {
    return comparator;
  }

  @Override
  public long getMaxMemStoreTS() {
    return maxMemstoreTS;
  }

  // If true, this file was the product of a major compaction. It is set
  // whenever you get a Reader.
  private AtomicBoolean majorCompaction = null;

  // If true, this file should not be included in minor compactions.
  // It's set whenever you get a Reader.
  private boolean excludeFromMinorCompaction = false;

  // The store files that were compacted to produce this file.
  private final Set<String> compactedStoreFiles = new HashSet<>();

  /**
   * Map of the metadata entries in the corresponding HFile. Populated when Reader is opened
   * after which it is not modified again.
   */
  private Map<byte[], byte[]> metadataMap;

  /**
   * Bloom filter type specified in column family configuration. Does not
   * necessarily correspond to the Bloom filter type present in the HFile.
   */
  private final BloomType cfBloomType;

  /**
   * Constructor, loads a reader and its indices, etc. May allocate a substantial amount of RAM
   * depending on the underlying files (10-20MB?).
   * @param fs The current file system to use.
   * @param p The path of the file.
   * @param conf The current configuration.
   * @param cacheConf The cache configuration and block cache reference.
   * @param cfBloomType The bloom type to use for this store file as specified by column family
   *          configuration. This may or may not be the same as the Bloom filter type actually
   *          present in the HFile, because column family configuration might change. If this is
   *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
   * @param primaryReplica true if this is a store file for primary replica, otherwise false.
   * @throws IOException
   */
  public HStoreFile(FileSystem fs, Path p, Configuration conf, CacheConfig cacheConf,
      BloomType cfBloomType, boolean primaryReplica) throws IOException {
    this(new StoreFileInfo(conf, fs, p, primaryReplica), cfBloomType, cacheConf);
  }

  /**
   * Constructor, loads a reader and its indices, etc. May allocate a substantial amount of RAM
   * depending on the underlying files (10-20MB?).
   * @param fileInfo The store file information.
   * @param cfBloomType The bloom type to use for this store file as specified by column
   *          family configuration. This may or may not be the same as the Bloom filter type
   *          actually present in the HFile, because column family configuration might change. If
   *          this is {@link BloomType#NONE}, the existing Bloom filter is ignored.
   * @param cacheConf The cache configuration and block cache reference.
   */
  public HStoreFile(StoreFileInfo fileInfo, BloomType cfBloomType, CacheConfig cacheConf) {
    this.fileInfo = fileInfo;
    this.cacheConf = cacheConf;
    if (BloomFilterFactory.isGeneralBloomEnabled(fileInfo.getConf())) {
      this.cfBloomType = cfBloomType;
    } else {
      LOG.info("Ignoring bloom filter check for file " + this.getPath() + ": cfBloomType=" +
          cfBloomType + " (disabled in config)");
      this.cfBloomType = BloomType.NONE;
    }
  }

  /**
   * @return the {@link StoreFileInfo} associated with this StoreFile.
   */
  public StoreFileInfo getFileInfo() {
    return this.fileInfo;
  }

  @Override
  public Path getPath() {
    return this.fileInfo.getPath();
  }

  @Override
  public Path getEncodedPath() {
    try {
      return new Path(URLEncoder.encode(fileInfo.getPath().toString(), HConstants.UTF8_ENCODING));
    } catch (UnsupportedEncodingException ex) {
      throw new RuntimeException("URLEncoder doesn't support UTF-8", ex);
    }
  }

  @Override
  public Path getQualifiedPath() {
    FileSystem fs = fileInfo.getFileSystem();
    return this.fileInfo.getPath().makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  @Override
  public boolean isReference() {
    return this.fileInfo.isReference();
  }

  @Override
  public boolean isHFile() {
    return StoreFileInfo.isHFile(this.fileInfo.getPath());
  }

  @Override
  public boolean isMajorCompactionResult() {
    Preconditions.checkState(this.majorCompaction != null,
        "Major compaction has not been set yet");
    return this.majorCompaction.get();
  }

  @Override
  public boolean excludeFromMinorCompaction() {
    return this.excludeFromMinorCompaction;
  }

  @Override
  public long getMaxSequenceId() {
    return this.sequenceid;
  }

  @Override
  public long getModificationTimestamp() throws IOException {
    return fileInfo.getModificationTime();
  }

  /**
   * Only used by the Striped Compaction Policy.
   * @param key the metadata key
   * @return value associated with the metadata key
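   * <p>
   * For example (illustrative; a {@code storeFile} instance is assumed), reading back the time
   * range serialized at write time:
   * <pre>
   * byte[] timeRangeBytes = storeFile.getMetadataValue(TIMERANGE_KEY);
   * </pre>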
   */
  public byte[] getMetadataValue(byte[] key) {
    return metadataMap.get(key);
  }

  @Override
  public boolean isBulkLoadResult() {
    boolean bulkLoadedHFile = this.getPath().getName().contains("SeqId_");
    return bulkLoadedHFile || (metadataMap != null && metadataMap.containsKey(BULKLOAD_TIME_KEY));
  }

  public boolean isCompactedAway() {
    return compactedAway;
  }

  @VisibleForTesting
  public int getRefCount() {
    return fileInfo.refCount.get();
  }

  /**
   * @return true if the file is still used in reads
   */
  public boolean isReferencedInReads() {
    int rc = fileInfo.refCount.get();
    assert rc >= 0; // we should not go negative.
    return rc > 0;
  }

  @Override
  public OptionalLong getBulkLoadTimestamp() {
    byte[] bulkLoadTimestamp = metadataMap.get(BULKLOAD_TIME_KEY);
    return bulkLoadTimestamp == null ? OptionalLong.empty()
        : OptionalLong.of(Bytes.toLong(bulkLoadTimestamp));
  }

  /**
   * @return the cached value of HDFS blocks distribution. The cached value is calculated when store
   *         file is opened.
   */
  public HDFSBlocksDistribution getHDFSBlockDistribution() {
    return this.fileInfo.getHDFSBlockDistribution();
  }

  /**
   * Opens the reader on this store file. Called from {@link #initReader()}.
   * @see #closeStoreFile(boolean)
   */
  private void open() throws IOException {
    fileInfo.initHDFSBlocksDistribution();
    long readahead = fileInfo.isNoReadahead() ? 0L : -1L;
    ReaderContext context = fileInfo.createReaderContext(false, readahead, ReaderType.PREAD);
    fileInfo.initHFileInfo(context);
    StoreFileReader reader = fileInfo.preStoreFileReaderOpen(context, cacheConf);
    if (reader == null) {
      reader = fileInfo.createReader(context, cacheConf);
      fileInfo.getHFileInfo().initMetaAndIndex(reader.getHFileReader());
    }
    this.initialReader = fileInfo.postStoreFileReaderOpen(context, cacheConf, reader);
    // Load up indices and fileinfo. This also loads the Bloom filter type.
    metadataMap = Collections.unmodifiableMap(initialReader.loadFileInfo());

    // Read in our metadata.
    byte[] b = metadataMap.get(MAX_SEQ_ID_KEY);
    if (b != null) {
      // By convention, if this is a half HFile, the top half has a sequence number greater than
      // the bottom half's. That's why we add one below. It's done in case the two halves are ever
      // merged back together (rare). Without it, on open of a store, since store files are
      // distinguished by sequence id, the one half would subsume the other.
      this.sequenceid = Bytes.toLong(b);
      if (fileInfo.isTopReference()) {
        this.sequenceid += 1;
      }
    }

    if (isBulkLoadResult()) {
      // Generate the sequenceId from the fileName.
      // fileName is of the form <randomName>_SeqId_<id-when-loaded>_
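      // e.g. a file named "f2a3b4_SeqId_4_" was assigned sequence id 4 when it was bulk loaded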
      String fileName = this.getPath().getName();
      // Use lastIndexOf() to get the last, most recent bulk load seqId.
      int startPos = fileName.lastIndexOf("SeqId_");
      if (startPos != -1) {
        this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
            fileName.indexOf('_', startPos + 6)));
        // Handle reference files as done above.
        if (fileInfo.isTopReference()) {
          this.sequenceid += 1;
        }
      }
      // SKIP_RESET_SEQ_ID only applies to bulk loaded files.
      // In mob compaction, the hfile where the cells contain the path of a new mob file is bulk
      // loaded to hbase; these cells have the same seqIds as the old ones. We do not want
      // to reset new seqIds for them since this might make a mess of the visibility of cells that
      // have the same row key but different seqIds.
      boolean skipResetSeqId = isSkipResetSeqId(metadataMap.get(SKIP_RESET_SEQ_ID));
      if (skipResetSeqId) {
        // Increase the seqId when it is a bulk loaded file from mob compaction.
        this.sequenceid += 1;
      }
      initialReader.setSkipResetSeqId(skipResetSeqId);
      initialReader.setBulkLoaded(true);
    }
    initialReader.setSequenceID(this.sequenceid);

    b = metadataMap.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
    if (b != null) {
      this.maxMemstoreTS = Bytes.toLong(b);
    }

    b = metadataMap.get(MAJOR_COMPACTION_KEY);
    if (b != null) {
      boolean mc = Bytes.toBoolean(b);
      if (this.majorCompaction == null) {
        this.majorCompaction = new AtomicBoolean(mc);
      } else {
        this.majorCompaction.set(mc);
      }
    } else {
      // Presume it is not major compacted if it doesn't explicitly say so.
      // HFileOutputFormat explicitly sets the major compacted key.
      this.majorCompaction = new AtomicBoolean(false);
    }

    b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY);
    this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b));

    BloomType hfileBloomType = initialReader.getBloomFilterType();
    if (cfBloomType != BloomType.NONE) {
      initialReader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
      if (hfileBloomType != cfBloomType) {
        LOG.info("HFile Bloom filter type for "
            + initialReader.getHFileReader().getName() + ": " + hfileBloomType
            + ", but " + cfBloomType + " specified in column family "
            + "configuration");
      }
    } else if (hfileBloomType != BloomType.NONE) {
      LOG.info("Bloom filter turned off by CF config for "
          + initialReader.getHFileReader().getName());
    }

    // load delete family bloom filter
    initialReader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);

    try {
      byte[] data = metadataMap.get(TIMERANGE_KEY);
      initialReader.timeRange = data == null ? null :
          TimeRangeTracker.parseFrom(data).toTimeRange();
    } catch (IllegalArgumentException e) {
      LOG.error("Error reading timestamp range data from meta -- " +
          "proceeding without", e);
      this.initialReader.timeRange = null;
    }

    try {
      byte[] data = metadataMap.get(COMPACTION_EVENT_KEY);
      this.compactedStoreFiles.addAll(ProtobufUtil.toCompactedStoreFiles(data));
    } catch (IOException e) {
      LOG.error("Error reading compacted storefiles from meta data", e);
    }

    // Initialize the first/last keys and comparator so we can reuse them after the reader is
    // closed.
    firstKey = initialReader.getFirstKey();
    lastKey = initialReader.getLastKey();
    comparator = initialReader.getComparator();
  }

  /**
   * Initialize the reader used for pread.
   */
  public void initReader() throws IOException {
    if (initialReader == null) {
      synchronized (this) {
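        // Double-checked locking: re-check under the lock, since another thread
        // may have opened the reader between our first check and acquiring the lock.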
        if (initialReader == null) {
          try {
            open();
          } catch (Exception e) {
            try {
              boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
              this.closeStoreFile(evictOnClose);
            } catch (IOException ee) {
              LOG.warn("failed to close reader", ee);
            }
            throw e;
          }
        }
      }
    }
  }

  private StoreFileReader createStreamReader(boolean canUseDropBehind) throws IOException {
    initReader();
    final boolean doDropBehind = canUseDropBehind && cacheConf.shouldDropBehindCompaction();
    ReaderContext context = fileInfo.createReaderContext(doDropBehind, -1, ReaderType.STREAM);
    StoreFileReader reader = fileInfo.preStoreFileReaderOpen(context, cacheConf);
    if (reader == null) {
      reader = fileInfo.createReader(context, cacheConf);
      // The stream reader needs to copy some fields from the pread reader.
      reader.copyFields(initialReader);
    }
    return fileInfo.postStoreFileReaderOpen(context, cacheConf, reader);
  }

  /**
   * Get a scanner which uses pread.
   * <p>
   * Must be called after initReader.
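   * <p>
   * Illustrative usage (variable names assumed), after {@code initReader()} has been called:
   * <pre>
   * StoreFileScanner scanner = sf.getPreadScanner(true, readPt, 0, true);
   * </pre>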
   */
  public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder,
      boolean canOptimizeForNonNullColumn) {
    return getReader().getStoreFileScanner(cacheBlocks, true, false, readPt, scannerOrder,
      canOptimizeForNonNullColumn);
  }

  /**
   * Get a scanner which uses streaming read.
   * <p>
   * Must be called after initReader.
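   * <p>
   * Illustrative usage (variable names assumed), e.g. for a compaction read:
   * <pre>
   * StoreFileScanner scanner =
   *     sf.getStreamScanner(true, false, true, readPt, 0, false);
   * </pre>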
   */
  public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
      boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn)
      throws IOException {
    return createStreamReader(canUseDropBehind)
        .getStoreFileScanner(cacheBlocks, false, isCompaction, readPt, scannerOrder,
            canOptimizeForNonNullColumn);
  }

  /**
   * @return the current reader. Must call {@link #initReader()} first; otherwise returns null.
   * @see #initReader()
   */
  public StoreFileReader getReader() {
    return this.initialReader;
  }

  /**
   * @param evictOnClose whether to evict blocks belonging to this file
   * @throws IOException
   */
  public synchronized void closeStoreFile(boolean evictOnClose) throws IOException {
    if (this.initialReader != null) {
      this.initialReader.close(evictOnClose);
      this.initialReader = null;
    }
  }

  /**
   * Delete this file.
   * @throws IOException
   */
  public void deleteStoreFile() throws IOException {
    boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
    closeStoreFile(evictOnClose);
    this.fileInfo.getFileSystem().delete(getPath(), true);
  }

  public void markCompactedAway() {
    this.compactedAway = true;
  }

  @Override
  public String toString() {
    return this.fileInfo.toString();
  }

  @Override
  public String toStringDetailed() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getPath().toString());
    sb.append(", isReference=").append(isReference());
    sb.append(", isBulkLoadResult=").append(isBulkLoadResult());
    if (isBulkLoadResult()) {
      sb.append(", bulkLoadTS=");
      OptionalLong bulkLoadTS = getBulkLoadTimestamp();
      if (bulkLoadTS.isPresent()) {
        sb.append(bulkLoadTS.getAsLong());
      } else {
        sb.append("NotPresent");
      }
    } else {
      sb.append(", seqid=").append(getMaxSequenceId());
    }
    sb.append(", majorCompaction=").append(isMajorCompactionResult());

    return sb.toString();
  }

  /**
   * Gets whether to skip resetting the sequence id for cells.
   * @param skipResetSeqId The serialized boolean value; if true, resetting the sequence id is
   *          skipped.
   * @return Whether to skip resetting the sequence id.
   */
  private boolean isSkipResetSeqId(byte[] skipResetSeqId) {
    if (skipResetSeqId != null && skipResetSeqId.length == 1) {
      return Bytes.toBoolean(skipResetSeqId);
    }
    return false;
  }

  @Override
  public OptionalLong getMinimumTimestamp() {
    TimeRange tr = getReader().timeRange;
    return tr != null ? OptionalLong.of(tr.getMin()) : OptionalLong.empty();
  }

  @Override
  public OptionalLong getMaximumTimestamp() {
    TimeRange tr = getReader().timeRange;
    return tr != null ? OptionalLong.of(tr.getMax()) : OptionalLong.empty();
  }

  Set<String> getCompactedStoreFiles() {
    return Collections.unmodifiableSet(this.compactedStoreFiles);
  }
}