001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.io.UnsupportedEncodingException;
023import java.net.URLEncoder;
024import java.util.Collections;
025import java.util.Map;
026import java.util.Optional;
027import java.util.OptionalLong;
028import java.util.Set;
029import java.util.concurrent.ConcurrentHashMap;
030import java.util.concurrent.atomic.AtomicBoolean;
031import java.util.concurrent.atomic.AtomicInteger;
032
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.hbase.Cell;
037import org.apache.hadoop.hbase.CellComparator;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.HDFSBlocksDistribution;
040import org.apache.hadoop.hbase.io.TimeRange;
041import org.apache.hadoop.hbase.io.hfile.BlockType;
042import org.apache.hadoop.hbase.io.hfile.CacheConfig;
043import org.apache.hadoop.hbase.io.hfile.HFile;
044import org.apache.hadoop.hbase.util.BloomFilterFactory;
045import org.apache.hadoop.hbase.util.Bytes;
046
047import org.apache.yetus.audience.InterfaceAudience;
048
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
052import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
053
054
055/**
056 * A Store data file.  Stores usually have one or more of these files.  They
057 * are produced by flushing the memstore to disk.  To
058 * create, instantiate a writer using {@link StoreFileWriter.Builder}
059 * and append data. Be sure to add any metadata before calling close on the
060 * Writer (Use the appendMetadata convenience methods). On close, a StoreFile
061 * is sitting in the Filesystem.  To refer to it, create a StoreFile instance
062 * passing filesystem and path.  To read, call {@link #initReader()}
063 * <p>StoreFiles may also reference store files in another Store.
064 *
065 * The reason for this weird pattern where you use a different instance for the
066 * writer and a reader is that we write once but read a lot more.
067 */
068@InterfaceAudience.Private
069public class HStoreFile implements StoreFile, StoreFileReader.Listener {
070
071  private static final Logger LOG = LoggerFactory.getLogger(HStoreFile.class.getName());
072
073  public static final String STORE_FILE_READER_NO_READAHEAD = "hbase.store.reader.no-readahead";
074
075  private static final boolean DEFAULT_STORE_FILE_READER_NO_READAHEAD = false;
076
077  // Keys for fileinfo values in HFile
078
079  /** Max Sequence ID in FileInfo */
080  public static final byte[] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");
081
082  /** Major compaction flag in FileInfo */
083  public static final byte[] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY");
084
085  /** Minor compaction flag in FileInfo */
086  public static final byte[] EXCLUDE_FROM_MINOR_COMPACTION_KEY =
087      Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION");
088
089  /** Bloom filter Type in FileInfo */
090  public static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE");
091
092  /** Delete Family Count in FileInfo */
093  public static final byte[] DELETE_FAMILY_COUNT = Bytes.toBytes("DELETE_FAMILY_COUNT");
094
095  /** Last Bloom filter key in FileInfo */
096  public static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY");
097
098  /** Key for Timerange information in metadata */
099  public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
100
101  /** Key for timestamp of earliest-put in metadata */
102  public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");
103
104  /** Key for the number of mob cells in metadata */
105  public static final byte[] MOB_CELLS_COUNT = Bytes.toBytes("MOB_CELLS_COUNT");
106
107  /** Meta key set when store file is a result of a bulk load */
108  public static final byte[] BULKLOAD_TASK_KEY = Bytes.toBytes("BULKLOAD_SOURCE_TASK");
109  public static final byte[] BULKLOAD_TIME_KEY = Bytes.toBytes("BULKLOAD_TIMESTAMP");
110
111  /**
112   * Key for skipping resetting sequence id in metadata. For bulk loaded hfiles, the scanner resets
113   * the cell seqId with the latest one, if this metadata is set as true, the reset is skipped.
114   */
115  public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID");
116
117  private final StoreFileInfo fileInfo;
118  private final FileSystem fs;
119
120  // Block cache configuration and reference.
121  private final CacheConfig cacheConf;
122
123  // Counter that is incremented every time a scanner is created on the
124  // store file. It is decremented when the scan on the store file is
125  // done.
126  private final AtomicInteger refCount = new AtomicInteger(0);
127
128  // Set implementation must be of concurrent type
129  @VisibleForTesting
130  final Set<StoreFileReader> streamReaders;
131
132  private final boolean noReadahead;
133
134  private final boolean primaryReplica;
135
136  // Indicates if the file got compacted
137  private volatile boolean compactedAway = false;
138
139  // Keys for metadata stored in backing HFile.
140  // Set when we obtain a Reader.
141  private long sequenceid = -1;
142
143  // max of the MemstoreTS in the KV's in this store
144  // Set when we obtain a Reader.
145  private long maxMemstoreTS = -1;
146
147  // firstKey, lastkey and cellComparator will be set when openReader.
148  private Optional<Cell> firstKey;
149
150  private Optional<Cell> lastKey;
151
152  private CellComparator comparator;
153
154  public CacheConfig getCacheConf() {
155    return cacheConf;
156  }
157
158  @Override
159  public Optional<Cell> getFirstKey() {
160    return firstKey;
161  }
162
163  @Override
164  public Optional<Cell> getLastKey() {
165    return lastKey;
166  }
167
168  @Override
169  public CellComparator getComparator() {
170    return comparator;
171  }
172
173  @Override
174  public long getMaxMemStoreTS() {
175    return maxMemstoreTS;
176  }
177
178  // If true, this file was product of a major compaction.  Its then set
179  // whenever you get a Reader.
180  private AtomicBoolean majorCompaction = null;
181
182  // If true, this file should not be included in minor compactions.
183  // It's set whenever you get a Reader.
184  private boolean excludeFromMinorCompaction = false;
185
186  /**
187   * Map of the metadata entries in the corresponding HFile. Populated when Reader is opened
188   * after which it is not modified again.
189   */
190  private Map<byte[], byte[]> metadataMap;
191
192  // StoreFile.Reader
193  private volatile StoreFileReader reader;
194
195  /**
196   * Bloom filter type specified in column family configuration. Does not
197   * necessarily correspond to the Bloom filter type present in the HFile.
198   */
199  private final BloomType cfBloomType;
200
201  /**
202   * Constructor, loads a reader and it's indices, etc. May allocate a substantial amount of ram
203   * depending on the underlying files (10-20MB?).
204   * @param fs The current file system to use.
205   * @param p The path of the file.
206   * @param conf The current configuration.
207   * @param cacheConf The cache configuration and block cache reference.
208   * @param cfBloomType The bloom type to use for this store file as specified by column family
209   *          configuration. This may or may not be the same as the Bloom filter type actually
210   *          present in the HFile, because column family configuration might change. If this is
211   *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
212   * @param primaryReplica true if this is a store file for primary replica, otherwise false.
213   * @throws IOException
214   */
215  public HStoreFile(FileSystem fs, Path p, Configuration conf, CacheConfig cacheConf,
216      BloomType cfBloomType, boolean primaryReplica) throws IOException {
217    this(fs, new StoreFileInfo(conf, fs, p), conf, cacheConf, cfBloomType, primaryReplica);
218  }
219
220  /**
221   * Constructor, loads a reader and it's indices, etc. May allocate a substantial amount of ram
222   * depending on the underlying files (10-20MB?).
223   * @param fs fs The current file system to use.
224   * @param fileInfo The store file information.
225   * @param conf The current configuration.
226   * @param cacheConf The cache configuration and block cache reference.
227   * @param cfBloomType The bloom type to use for this store file as specified by column
228   *          family configuration. This may or may not be the same as the Bloom filter type
229   *          actually present in the HFile, because column family configuration might change. If
230   *          this is {@link BloomType#NONE}, the existing Bloom filter is ignored.
231   * @param primaryReplica true if this is a store file for primary replica, otherwise false.
232   */
233  public HStoreFile(FileSystem fs, StoreFileInfo fileInfo, Configuration conf, CacheConfig cacheConf,
234      BloomType cfBloomType, boolean primaryReplica) {
235    this.streamReaders = ConcurrentHashMap.newKeySet();
236    this.fs = fs;
237    this.fileInfo = fileInfo;
238    this.cacheConf = cacheConf;
239    this.noReadahead =
240        conf.getBoolean(STORE_FILE_READER_NO_READAHEAD, DEFAULT_STORE_FILE_READER_NO_READAHEAD);
241    if (BloomFilterFactory.isGeneralBloomEnabled(conf)) {
242      this.cfBloomType = cfBloomType;
243    } else {
244      LOG.info("Ignoring bloom filter check for file " + this.getPath() + ": " + "cfBloomType=" +
245          cfBloomType + " (disabled in config)");
246      this.cfBloomType = BloomType.NONE;
247    }
248    this.primaryReplica = primaryReplica;
249  }
250
251  /**
252   * @return the StoreFile object associated to this StoreFile. null if the StoreFile is not a
253   *         reference.
254   */
255  public StoreFileInfo getFileInfo() {
256    return this.fileInfo;
257  }
258
259  @Override
260  public Path getPath() {
261    return this.fileInfo.getPath();
262  }
263
264  @Override
265  public Path getEncodedPath() {
266    try {
267      return new Path(URLEncoder.encode(fileInfo.getPath().toString(), HConstants.UTF8_ENCODING));
268    } catch (UnsupportedEncodingException ex) {
269      throw new RuntimeException("URLEncoder doesn't support UTF-8", ex);
270    }
271  }
272
273  @Override
274  public Path getQualifiedPath() {
275    return this.fileInfo.getPath().makeQualified(fs.getUri(), fs.getWorkingDirectory());
276  }
277
278  @Override
279  public boolean isReference() {
280    return this.fileInfo.isReference();
281  }
282
283  @Override
284  public boolean isHFile() {
285    return StoreFileInfo.isHFile(this.fileInfo.getPath());
286  }
287
288  @Override
289  public boolean isMajorCompactionResult() {
290    if (this.majorCompaction == null) {
291      throw new NullPointerException("This has not been set yet");
292    }
293    return this.majorCompaction.get();
294  }
295
296  @Override
297  public boolean excludeFromMinorCompaction() {
298    return this.excludeFromMinorCompaction;
299  }
300
301  @Override
302  public long getMaxSequenceId() {
303    return this.sequenceid;
304  }
305
306  @Override
307  public long getModificationTimeStamp() throws IOException {
308    return getModificationTimestamp();
309  }
310
311  @Override
312  public long getModificationTimestamp() throws IOException {
313    return fileInfo.getModificationTime();
314  }
315
316  /**
317   * Only used by the Striped Compaction Policy
318   * @param key
319   * @return value associated with the metadata key
320   */
321  public byte[] getMetadataValue(byte[] key) {
322    return metadataMap.get(key);
323  }
324
325  @Override
326  public boolean isBulkLoadResult() {
327    boolean bulkLoadedHFile = false;
328    String fileName = this.getPath().getName();
329    int startPos = fileName.indexOf("SeqId_");
330    if (startPos != -1) {
331      bulkLoadedHFile = true;
332    }
333    return bulkLoadedHFile || (metadataMap != null && metadataMap.containsKey(BULKLOAD_TIME_KEY));
334  }
335
336  public boolean isCompactedAway() {
337    return compactedAway;
338  }
339
340  @VisibleForTesting
341  public int getRefCount() {
342    return refCount.get();
343  }
344
345  /**
346   * @return true if the file is still used in reads
347   */
348  public boolean isReferencedInReads() {
349    int rc = refCount.get();
350    assert rc >= 0; // we should not go negative.
351    return rc > 0;
352  }
353
354  @Override
355  public OptionalLong getBulkLoadTimestamp() {
356    byte[] bulkLoadTimestamp = metadataMap.get(BULKLOAD_TIME_KEY);
357    return bulkLoadTimestamp == null ? OptionalLong.empty()
358        : OptionalLong.of(Bytes.toLong(bulkLoadTimestamp));
359  }
360
361  /**
362   * @return the cached value of HDFS blocks distribution. The cached value is calculated when store
363   *         file is opened.
364   */
365  public HDFSBlocksDistribution getHDFSBlockDistribution() {
366    return this.fileInfo.getHDFSBlockDistribution();
367  }
368
369  /**
370   * Opens reader on this store file. Called by Constructor.
371   * @throws IOException
372   * @see #closeStoreFile(boolean)
373   */
374  private void open() throws IOException {
375    if (this.reader != null) {
376      throw new IllegalAccessError("Already open");
377    }
378
379    // Open the StoreFile.Reader
380    this.reader = fileInfo.open(this.fs, this.cacheConf, false, noReadahead ? 0L : -1L,
381      primaryReplica, refCount, true);
382
383    // Load up indices and fileinfo. This also loads Bloom filter type.
384    metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
385
386    // Read in our metadata.
387    byte [] b = metadataMap.get(MAX_SEQ_ID_KEY);
388    if (b != null) {
389      // By convention, if halfhfile, top half has a sequence number > bottom
390      // half. Thats why we add one in below. Its done for case the two halves
391      // are ever merged back together --rare.  Without it, on open of store,
392      // since store files are distinguished by sequence id, the one half would
393      // subsume the other.
394      this.sequenceid = Bytes.toLong(b);
395      if (fileInfo.isTopReference()) {
396        this.sequenceid += 1;
397      }
398    }
399
400    if (isBulkLoadResult()){
401      // generate the sequenceId from the fileName
402      // fileName is of the form <randomName>_SeqId_<id-when-loaded>_
403      String fileName = this.getPath().getName();
404      // Use lastIndexOf() to get the last, most recent bulk load seqId.
405      int startPos = fileName.lastIndexOf("SeqId_");
406      if (startPos != -1) {
407        this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
408            fileName.indexOf('_', startPos + 6)));
409        // Handle reference files as done above.
410        if (fileInfo.isTopReference()) {
411          this.sequenceid += 1;
412        }
413      }
414      // SKIP_RESET_SEQ_ID only works in bulk loaded file.
415      // In mob compaction, the hfile where the cells contain the path of a new mob file is bulk
416      // loaded to hbase, these cells have the same seqIds with the old ones. We do not want
417      // to reset new seqIds for them since this might make a mess of the visibility of cells that
418      // have the same row key but different seqIds.
419      boolean skipResetSeqId = isSkipResetSeqId(metadataMap.get(SKIP_RESET_SEQ_ID));
420      if (skipResetSeqId) {
421        // increase the seqId when it is a bulk loaded file from mob compaction.
422        this.sequenceid += 1;
423      }
424      this.reader.setSkipResetSeqId(skipResetSeqId);
425      this.reader.setBulkLoaded(true);
426    }
427    this.reader.setSequenceID(this.sequenceid);
428
429    b = metadataMap.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
430    if (b != null) {
431      this.maxMemstoreTS = Bytes.toLong(b);
432    }
433
434    b = metadataMap.get(MAJOR_COMPACTION_KEY);
435    if (b != null) {
436      boolean mc = Bytes.toBoolean(b);
437      if (this.majorCompaction == null) {
438        this.majorCompaction = new AtomicBoolean(mc);
439      } else {
440        this.majorCompaction.set(mc);
441      }
442    } else {
443      // Presume it is not major compacted if it doesn't explicity say so
444      // HFileOutputFormat explicitly sets the major compacted key.
445      this.majorCompaction = new AtomicBoolean(false);
446    }
447
448    b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY);
449    this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b));
450
451    BloomType hfileBloomType = reader.getBloomFilterType();
452    if (cfBloomType != BloomType.NONE) {
453      reader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
454      if (hfileBloomType != cfBloomType) {
455        LOG.info("HFile Bloom filter type for "
456            + reader.getHFileReader().getName() + ": " + hfileBloomType
457            + ", but " + cfBloomType + " specified in column family "
458            + "configuration");
459      }
460    } else if (hfileBloomType != BloomType.NONE) {
461      LOG.info("Bloom filter turned off by CF config for "
462          + reader.getHFileReader().getName());
463    }
464
465    // load delete family bloom filter
466    reader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);
467
468    try {
469      byte[] data = metadataMap.get(TIMERANGE_KEY);
470      this.reader.timeRange = data == null ? null : TimeRangeTracker.parseFrom(data).toTimeRange();
471    } catch (IllegalArgumentException e) {
472      LOG.error("Error reading timestamp range data from meta -- " +
473          "proceeding without", e);
474      this.reader.timeRange = null;
475    }
476    // initialize so we can reuse them after reader closed.
477    firstKey = reader.getFirstKey();
478    lastKey = reader.getLastKey();
479    comparator = reader.getComparator();
480  }
481
482  /**
483   * Initialize the reader used for pread.
484   */
485  public void initReader() throws IOException {
486    if (reader == null) {
487      try {
488        open();
489      } catch (Exception e) {
490        try {
491          boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
492          this.closeStoreFile(evictOnClose);
493        } catch (IOException ee) {
494          LOG.warn("failed to close reader", ee);
495        }
496        throw e;
497      }
498    }
499  }
500
501  private StoreFileReader createStreamReader(boolean canUseDropBehind) throws IOException {
502    initReader();
503    StoreFileReader reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind, -1L,
504      primaryReplica, refCount, false);
505    reader.copyFields(this.reader);
506    return reader;
507  }
508
509  /**
510   * Get a scanner which uses pread.
511   * <p>
512   * Must be called after initReader.
513   */
514  public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder,
515      boolean canOptimizeForNonNullColumn) {
516    return getReader().getStoreFileScanner(cacheBlocks, true, false, readPt, scannerOrder,
517      canOptimizeForNonNullColumn);
518  }
519
520  /**
521   * Get a scanner which uses streaming read.
522   * <p>
523   * Must be called after initReader.
524   */
525  public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
526      boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn)
527      throws IOException {
528    StoreFileReader reader = createStreamReader(canUseDropBehind);
529    reader.setListener(this);
530    StoreFileScanner sfScanner = reader.getStoreFileScanner(cacheBlocks, false,
531      isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn);
532    //Add reader once the scanner is created
533    streamReaders.add(reader);
534    return sfScanner;
535  }
536
537  /**
538   * @return Current reader. Must call initReader first else returns null.
539   * @see #initReader()
540   */
541  public StoreFileReader getReader() {
542    return this.reader;
543  }
544
545  /**
546   * @param evictOnClose whether to evict blocks belonging to this file
547   * @throws IOException
548   */
549  public synchronized void closeStoreFile(boolean evictOnClose) throws IOException {
550    if (this.reader != null) {
551      this.reader.close(evictOnClose);
552      this.reader = null;
553    }
554    closeStreamReaders(evictOnClose);
555  }
556
557  public void closeStreamReaders(boolean evictOnClose) throws IOException {
558    synchronized (this) {
559      for (StoreFileReader entry : streamReaders) {
560        //closing the reader will remove itself from streamReaders thanks to the Listener
561        entry.close(evictOnClose);
562      }
563      int size = streamReaders.size();
564      Preconditions.checkState(size == 0,
565          "There are still streamReaders post close: " + size);
566    }
567  }
568
569  /**
570   * Delete this file
571   * @throws IOException
572   */
573  public void deleteStoreFile() throws IOException {
574    boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
575    closeStoreFile(evictOnClose);
576    this.fs.delete(getPath(), true);
577  }
578
579  public void markCompactedAway() {
580    this.compactedAway = true;
581  }
582
583  @Override
584  public String toString() {
585    return this.fileInfo.toString();
586  }
587
588  @Override
589  public String toStringDetailed() {
590    StringBuilder sb = new StringBuilder();
591    sb.append(this.getPath().toString());
592    sb.append(", isReference=").append(isReference());
593    sb.append(", isBulkLoadResult=").append(isBulkLoadResult());
594    if (isBulkLoadResult()) {
595      sb.append(", bulkLoadTS=");
596      OptionalLong bulkLoadTS = getBulkLoadTimestamp();
597      if (bulkLoadTS.isPresent()) {
598        sb.append(bulkLoadTS.getAsLong());
599      } else {
600        sb.append("NotPresent");
601      }
602    } else {
603      sb.append(", seqid=").append(getMaxSequenceId());
604    }
605    sb.append(", majorCompaction=").append(isMajorCompactionResult());
606
607    return sb.toString();
608  }
609
610  /**
611   * Gets whether to skip resetting the sequence id for cells.
612   * @param skipResetSeqId The byte array of boolean.
613   * @return Whether to skip resetting the sequence id.
614   */
615  private boolean isSkipResetSeqId(byte[] skipResetSeqId) {
616    if (skipResetSeqId != null && skipResetSeqId.length == 1) {
617      return Bytes.toBoolean(skipResetSeqId);
618    }
619    return false;
620  }
621
622  @Override
623  public OptionalLong getMinimumTimestamp() {
624    TimeRange tr = getReader().timeRange;
625    return tr != null ? OptionalLong.of(tr.getMin()) : OptionalLong.empty();
626  }
627
628  @Override
629  public OptionalLong getMaximumTimestamp() {
630    TimeRange tr = getReader().timeRange;
631    return tr != null ? OptionalLong.of(tr.getMax()) : OptionalLong.empty();
632  }
633
634  @Override
635  public void storeFileReaderClosed(StoreFileReader reader) {
636    streamReaders.remove(reader);
637  }
638}