/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * A Store data file.  Stores usually have one or more of these files.  They
 * are produced by flushing the memstore to disk.  To
 * create, instantiate a writer using {@link StoreFileWriter.Builder}
 * and append data. Be sure to add any metadata before calling close on the
 * Writer (use the appendMetadata convenience methods). On close, a StoreFile
 * is sitting in the Filesystem.  To refer to it, create a StoreFile instance
 * passing filesystem and path.  To read, call {@link #initReader()}.
 * <p>StoreFiles may also reference store files in another Store.
 *
 * The reason for this pattern, in which different instances serve as the
 * writer and the reader, is that we write once but read many times.
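 *
 * <p>A minimal write-then-read sketch (variable names such as {@code conf}, {@code cacheConf},
 * {@code fs}, {@code path}, {@code cell} and {@code maxSeqId} are illustrative assumptions,
 * not part of this API):
 * <pre>{@code
 * StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
 *     .withFilePath(path)
 *     .build();
 * writer.append(cell);
 * writer.appendMetadata(maxSeqId, false); // metadata must go in before close
 * writer.close();
 *
 * HStoreFile sf = new HStoreFile(fs, path, conf, cacheConf, BloomType.NONE, true);
 * sf.initReader();
 * StoreFileReader reader = sf.getReader();
 * }</pre>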
 */
@InterfaceAudience.Private
public class HStoreFile implements StoreFile {

  private static final Logger LOG = LoggerFactory.getLogger(HStoreFile.class.getName());

  // Keys for fileinfo values in HFile

  /** Max Sequence ID in FileInfo */
  public static final byte[] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");

  /** Major compaction flag in FileInfo */
  public static final byte[] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY");

  /** Minor compaction flag in FileInfo */
  public static final byte[] EXCLUDE_FROM_MINOR_COMPACTION_KEY =
      Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION");

  /**
   * Key for compaction event which contains the compacted storefiles in FileInfo
   */
  public static final byte[] COMPACTION_EVENT_KEY = Bytes.toBytes("COMPACTION_EVENT_KEY");

  /** Bloom filter Type in FileInfo */
  public static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE");

  /** Bloom filter param in FileInfo */
  public static final byte[] BLOOM_FILTER_PARAM_KEY = Bytes.toBytes("BLOOM_FILTER_PARAM");

  /** Delete Family Count in FileInfo */
  public static final byte[] DELETE_FAMILY_COUNT = Bytes.toBytes("DELETE_FAMILY_COUNT");

  /** Last Bloom filter key in FileInfo */
  public static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY");

  /** Key for Timerange information in metadata */
  public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");

  /** Key for timestamp of earliest-put in metadata */
  public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");

  /** Key for the number of mob cells in metadata */
  public static final byte[] MOB_CELLS_COUNT = Bytes.toBytes("MOB_CELLS_COUNT");

  /** Meta key set when store file is a result of a bulk load */
  public static final byte[] BULKLOAD_TASK_KEY = Bytes.toBytes("BULKLOAD_SOURCE_TASK");
  public static final byte[] BULKLOAD_TIME_KEY = Bytes.toBytes("BULKLOAD_TIMESTAMP");

  /**
   * Key for skipping resetting sequence id in metadata. For bulk loaded hfiles, the scanner
   * resets each cell's seqId to the latest one; if this metadata is set to true, the reset is
   * skipped.
   */
  public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID");

  private final StoreFileInfo fileInfo;

  // StoreFile.Reader
  private volatile StoreFileReader initialReader;

  // Block cache configuration and reference.
  private final CacheConfig cacheConf;

  // Indicates if the file got compacted
  private volatile boolean compactedAway = false;

  // The max sequence id read from the metadata of the backing HFile.
  // Set when we obtain a Reader.
  private long sequenceid = -1;

  // Max of the MemstoreTS in the KVs in this store.
  // Set when we obtain a Reader.
  private long maxMemstoreTS = -1;

  // firstKey, lastKey and comparator are set when the reader is opened.
  private Optional<Cell> firstKey;

  private Optional<Cell> lastKey;

  private CellComparator comparator;

  public CacheConfig getCacheConf() {
    return this.cacheConf;
  }

  @Override
  public Optional<Cell> getFirstKey() {
    return firstKey;
  }

  @Override
  public Optional<Cell> getLastKey() {
    return lastKey;
  }

  @Override
  public CellComparator getComparator() {
    return comparator;
  }

  @Override
  public long getMaxMemStoreTS() {
    return maxMemstoreTS;
  }

  // If true, this file was the product of a major compaction. It's set
  // whenever you get a Reader.
  private AtomicBoolean majorCompaction = null;

  // If true, this file should not be included in minor compactions.
  // It's set whenever you get a Reader.
  private boolean excludeFromMinorCompaction = false;

  // This file was the product of these compacted store files
  private final Set<String> compactedStoreFiles = new HashSet<>();

  /**
   * Map of the metadata entries in the corresponding HFile. Populated when the reader is opened,
   * after which it is not modified again.
   */
  private Map<byte[], byte[]> metadataMap;

  /**
   * Bloom filter type specified in column family configuration. Does not
   * necessarily correspond to the Bloom filter type present in the HFile.
   */
  private final BloomType cfBloomType;

  /**
   * Constructor, loads a reader and its indices, etc. May allocate a substantial amount of RAM
   * depending on the underlying files (10-20MB?).
   * @param fs The current file system to use.
   * @param p The path of the file.
   * @param conf The current configuration.
   * @param cacheConf The cache configuration and block cache reference.
   * @param cfBloomType The bloom type to use for this store file as specified by column family
   *          configuration. This may or may not be the same as the Bloom filter type actually
   *          present in the HFile, because column family configuration might change. If this is
   *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
   * @param primaryReplica true if this is a store file for the primary replica, otherwise false.
   * @throws IOException if the underlying {@link StoreFileInfo} cannot be created
   */
  public HStoreFile(FileSystem fs, Path p, Configuration conf, CacheConfig cacheConf,
      BloomType cfBloomType, boolean primaryReplica) throws IOException {
    this(new StoreFileInfo(conf, fs, p, primaryReplica), cfBloomType, cacheConf);
  }

  /**
   * Constructor, loads a reader and its indices, etc. May allocate a substantial amount of RAM
   * depending on the underlying files (10-20MB?).
   * @param fileInfo The store file information.
   * @param cfBloomType The bloom type to use for this store file as specified by column
   *          family configuration. This may or may not be the same as the Bloom filter type
   *          actually present in the HFile, because column family configuration might change. If
   *          this is {@link BloomType#NONE}, the existing Bloom filter is ignored.
   * @param cacheConf The cache configuration and block cache reference.
   */
  public HStoreFile(StoreFileInfo fileInfo, BloomType cfBloomType, CacheConfig cacheConf) {
    this.fileInfo = fileInfo;
    this.cacheConf = cacheConf;
    if (BloomFilterFactory.isGeneralBloomEnabled(fileInfo.getConf())) {
      this.cfBloomType = cfBloomType;
    } else {
      LOG.info("Ignoring bloom filter check for file " + this.getPath() + ": cfBloomType=" +
          cfBloomType + " (disabled in config)");
      this.cfBloomType = BloomType.NONE;
    }
  }

  /**
   * @return the {@link StoreFileInfo} associated with this StoreFile.
   */
  public StoreFileInfo getFileInfo() {
    return this.fileInfo;
  }

  @Override
  public Path getPath() {
    return this.fileInfo.getPath();
  }

  @Override
  public Path getEncodedPath() {
    try {
      return new Path(URLEncoder.encode(fileInfo.getPath().toString(), HConstants.UTF8_ENCODING));
    } catch (UnsupportedEncodingException ex) {
      throw new RuntimeException("URLEncoder doesn't support UTF-8", ex);
    }
  }

  @Override
  public Path getQualifiedPath() {
    FileSystem fs = fileInfo.getFileSystem();
    return this.fileInfo.getPath().makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  @Override
  public boolean isReference() {
    return this.fileInfo.isReference();
  }

  @Override
  public boolean isHFile() {
    return StoreFileInfo.isHFile(this.fileInfo.getPath());
  }

  @Override
  public boolean isMajorCompactionResult() {
    if (this.majorCompaction == null) {
      throw new NullPointerException("Major compaction state has not been set yet; "
          + "open the reader first via initReader()");
    }
    return this.majorCompaction.get();
  }

  @Override
  public boolean excludeFromMinorCompaction() {
    return this.excludeFromMinorCompaction;
  }

  @Override
  public long getMaxSequenceId() {
    return this.sequenceid;
  }

  @Override
  public long getModificationTimeStamp() throws IOException {
    return getModificationTimestamp();
  }

  @Override
  public long getModificationTimestamp() throws IOException {
    return fileInfo.getModificationTime();
  }

  /**
   * Only used by the Striped Compaction Policy.
   * @param key the metadata key to look up
   * @return value associated with the metadata key
   */
  public byte[] getMetadataValue(byte[] key) {
    return metadataMap.get(key);
  }

  @Override
  public boolean isBulkLoadResult() {
    boolean bulkLoadedHFile = false;
    String fileName = this.getPath().getName();
    int startPos = fileName.indexOf("SeqId_");
    if (startPos != -1) {
      bulkLoadedHFile = true;
    }
    return bulkLoadedHFile || (metadataMap != null && metadataMap.containsKey(BULKLOAD_TIME_KEY));
  }

  public boolean isCompactedAway() {
    return compactedAway;
  }

  public int getRefCount() {
    return fileInfo.refCount.get();
  }

  /**
   * @return true if the file is still used in reads
   */
  public boolean isReferencedInReads() {
    int rc = fileInfo.refCount.get();
    assert rc >= 0; // we should not go negative.
    return rc > 0;
  }

  @Override
  public OptionalLong getBulkLoadTimestamp() {
    byte[] bulkLoadTimestamp = metadataMap.get(BULKLOAD_TIME_KEY);
    return bulkLoadTimestamp == null ? OptionalLong.empty()
        : OptionalLong.of(Bytes.toLong(bulkLoadTimestamp));
  }

  /**
   * @return the cached value of HDFS blocks distribution. The cached value is calculated when
   *         the store file is opened.
   */
  public HDFSBlocksDistribution getHDFSBlockDistribution() {
    return this.fileInfo.getHDFSBlockDistribution();
  }

  /**
   * Opens the reader on this store file. Called by {@link #initReader()}.
   * @see #closeStoreFile(boolean)
   */
  private void open() throws IOException {
    fileInfo.initHDFSBlocksDistribution();
    long readahead = fileInfo.isNoReadahead() ? 0L : -1L;
    ReaderContext context = fileInfo.createReaderContext(false, readahead, ReaderType.PREAD);
    fileInfo.initHFileInfo(context);
    StoreFileReader reader = fileInfo.preStoreFileReaderOpen(context, cacheConf);
    if (reader == null) {
      reader = fileInfo.createReader(context, cacheConf);
      fileInfo.getHFileInfo().initMetaAndIndex(reader.getHFileReader());
    }
    this.initialReader = fileInfo.postStoreFileReaderOpen(context, cacheConf, reader);
    // Load up indices and fileinfo. This also loads the Bloom filter type.
    metadataMap = Collections.unmodifiableMap(initialReader.loadFileInfo());

    // Read in our metadata.
    byte [] b = metadataMap.get(MAX_SEQ_ID_KEY);
    if (b != null) {
      // By convention, if this is a half hfile, the top half has a sequence number greater than
      // the bottom half. That's why we add one below. It's done in case the two halves are ever
      // merged back together (rare). Without it, on open of the store, since store files are
      // distinguished by sequence id, the one half would subsume the other.
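      // For example, a parent file whose MAX_SEQ_ID is 10 yields a bottom half with
      // sequenceid 10 and a top half with sequenceid 11.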
      this.sequenceid = Bytes.toLong(b);
      if (fileInfo.isTopReference()) {
        this.sequenceid += 1;
      }
    }

    if (isBulkLoadResult()) {
      // Generate the sequenceId from the fileName.
      // The fileName is of the form <randomName>_SeqId_<id-when-loaded>_
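      // e.g. a (hypothetical) name "f3a9_SeqId_42_" parses to sequenceid 42.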
      String fileName = this.getPath().getName();
      // Use lastIndexOf() to get the last, most recent bulk load seqId.
      int startPos = fileName.lastIndexOf("SeqId_");
      if (startPos != -1) {
        this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
            fileName.indexOf('_', startPos + 6)));
        // Handle reference files as done above.
        if (fileInfo.isTopReference()) {
          this.sequenceid += 1;
        }
      }
      // SKIP_RESET_SEQ_ID only applies to bulk loaded files.
      // In mob compaction, the hfile whose cells contain the paths of new mob files is bulk
      // loaded into hbase; these cells have the same seqIds as the old ones. We do not want
      // to reset new seqIds for them, since this might make a mess of the visibility of cells
      // that have the same row key but different seqIds.
      boolean skipResetSeqId = isSkipResetSeqId(metadataMap.get(SKIP_RESET_SEQ_ID));
      if (skipResetSeqId) {
        // Increase the seqId when it is a bulk loaded file from mob compaction.
        this.sequenceid += 1;
      }
      initialReader.setSkipResetSeqId(skipResetSeqId);
      initialReader.setBulkLoaded(true);
    }
    initialReader.setSequenceID(this.sequenceid);

    b = metadataMap.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
    if (b != null) {
      this.maxMemstoreTS = Bytes.toLong(b);
    }

    b = metadataMap.get(MAJOR_COMPACTION_KEY);
    if (b != null) {
      boolean mc = Bytes.toBoolean(b);
      if (this.majorCompaction == null) {
        this.majorCompaction = new AtomicBoolean(mc);
      } else {
        this.majorCompaction.set(mc);
      }
    } else {
      // Presume it is not major compacted if it doesn't explicitly say so.
      // HFileOutputFormat explicitly sets the major compacted key.
      this.majorCompaction = new AtomicBoolean(false);
    }

    b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY);
    this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b));

    BloomType hfileBloomType = initialReader.getBloomFilterType();
    if (cfBloomType != BloomType.NONE) {
      initialReader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
      if (hfileBloomType != cfBloomType) {
        LOG.debug("HFile Bloom filter type for "
            + initialReader.getHFileReader().getName() + ": " + hfileBloomType
            + ", but " + cfBloomType + " specified in column family "
            + "configuration");
      }
    } else if (hfileBloomType != BloomType.NONE) {
      LOG.info("Bloom filter turned off by CF config for "
          + initialReader.getHFileReader().getName());
    }

    // Load the delete family bloom filter.
    initialReader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);

    try {
      byte[] data = metadataMap.get(TIMERANGE_KEY);
      initialReader.timeRange = data == null ? null :
          TimeRangeTracker.parseFrom(data).toTimeRange();
    } catch (IllegalArgumentException e) {
      LOG.error("Error reading timestamp range data from meta -- " +
          "proceeding without", e);
      this.initialReader.timeRange = null;
    }

    try {
      byte[] data = metadataMap.get(COMPACTION_EVENT_KEY);
      this.compactedStoreFiles.addAll(ProtobufUtil.toCompactedStoreFiles(data));
    } catch (IOException e) {
      LOG.error("Error reading compacted storefiles from metadata", e);
    }

    // Initialize these so we can reuse them after the reader is closed.
    firstKey = initialReader.getFirstKey();
    lastKey = initialReader.getLastKey();
    comparator = initialReader.getComparator();
  }

  /**
   * Initialize the reader used for pread.
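   * <p>Uses double-checked locking on the volatile {@code initialReader} field, so concurrent
   * callers open the underlying file at most once.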
   */
  public void initReader() throws IOException {
    if (initialReader == null) {
      synchronized (this) {
        if (initialReader == null) {
          try {
            open();
          } catch (Exception e) {
            try {
              boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
              this.closeStoreFile(evictOnClose);
            } catch (IOException ee) {
              LOG.warn("failed to close reader", ee);
            }
            throw e;
          }
        }
      }
    }
  }

  private StoreFileReader createStreamReader(boolean canUseDropBehind) throws IOException {
    initReader();
    final boolean doDropBehind = canUseDropBehind && cacheConf.shouldDropBehindCompaction();
    ReaderContext context = fileInfo.createReaderContext(doDropBehind, -1, ReaderType.STREAM);
    StoreFileReader reader = fileInfo.preStoreFileReaderOpen(context, cacheConf);
    if (reader == null) {
      reader = fileInfo.createReader(context, cacheConf);
      // The stream reader needs to copy fields from the pread reader.
      reader.copyFields(initialReader);
    }
    return fileInfo.postStoreFileReaderOpen(context, cacheConf, reader);
  }

  /**
   * Get a scanner which uses pread.
   * <p>
   * Must be called after initReader.
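   * <p>Pread (positional read) is generally suited to point gets and short scans; for long
   * sequential reads such as compactions, a stream scanner (see {@link #getStreamScanner}) is
   * typically preferred.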
   */
  public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder,
      boolean canOptimizeForNonNullColumn) {
    return getReader().getStoreFileScanner(cacheBlocks, true, false, readPt, scannerOrder,
      canOptimizeForNonNullColumn);
  }

  /**
   * Get a scanner which uses streaming read.
   * <p>
   * Must be called after initReader.
   */
  public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
      boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn)
      throws IOException {
    return createStreamReader(canUseDropBehind)
        .getStoreFileScanner(cacheBlocks, false, isCompaction, readPt, scannerOrder,
            canOptimizeForNonNullColumn);
  }

  /**
   * @return Current reader. Must call {@link #initReader()} first, else this returns null.
   * @see #initReader()
   */
  public StoreFileReader getReader() {
    return this.initialReader;
  }

  /**
   * @param evictOnClose whether to evict blocks belonging to this file
   * @throws IOException if closing the reader fails
   */
  public synchronized void closeStoreFile(boolean evictOnClose) throws IOException {
    if (this.initialReader != null) {
      this.initialReader.close(evictOnClose);
      this.initialReader = null;
    }
  }

  /**
   * Delete this file.
   * @throws IOException if closing the reader or deleting the file fails
   */
  public void deleteStoreFile() throws IOException {
    boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
    closeStoreFile(evictOnClose);
    this.fileInfo.getFileSystem().delete(getPath(), true);
  }

  public void markCompactedAway() {
    this.compactedAway = true;
  }

  @Override
  public String toString() {
    return this.fileInfo.toString();
  }

  @Override
  public String toStringDetailed() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getPath().toString());
    sb.append(", isReference=").append(isReference());
    sb.append(", isBulkLoadResult=").append(isBulkLoadResult());
    if (isBulkLoadResult()) {
      sb.append(", bulkLoadTS=");
      OptionalLong bulkLoadTS = getBulkLoadTimestamp();
      if (bulkLoadTS.isPresent()) {
        sb.append(bulkLoadTS.getAsLong());
      } else {
        sb.append("NotPresent");
      }
    } else {
      sb.append(", seqid=").append(getMaxSequenceId());
    }
    sb.append(", majorCompaction=").append(isMajorCompactionResult());

    return sb.toString();
  }

  /**
   * Gets whether to skip resetting the sequence id for cells.
   * @param skipResetSeqId The serialized boolean value read from file metadata.
   * @return Whether to skip resetting the sequence id.
   */
  private boolean isSkipResetSeqId(byte[] skipResetSeqId) {
    if (skipResetSeqId != null && skipResetSeqId.length == 1) {
      return Bytes.toBoolean(skipResetSeqId);
    }
    return false;
  }

  @Override
  public OptionalLong getMinimumTimestamp() {
    TimeRange tr = getReader().timeRange;
    return tr != null ? OptionalLong.of(tr.getMin()) : OptionalLong.empty();
  }

  @Override
  public OptionalLong getMaximumTimestamp() {
    TimeRange tr = getReader().timeRange;
    return tr != null ? OptionalLong.of(tr.getMax()) : OptionalLong.empty();
  }

  Set<String> getCompactedStoreFiles() {
    return Collections.unmodifiableSet(this.compactedStoreFiles);
  }
}