001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.storefiletracker;
019
020import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.TRACKER_IMPL;
021
022import java.io.IOException;
023import java.util.Collection;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
027import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
028import org.apache.hadoop.hbase.io.compress.Compression;
029import org.apache.hadoop.hbase.io.crypto.Encryption;
030import org.apache.hadoop.hbase.io.hfile.CacheConfig;
031import org.apache.hadoop.hbase.io.hfile.HFile;
032import org.apache.hadoop.hbase.io.hfile.HFileContext;
033import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
034import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
035import org.apache.hadoop.hbase.regionserver.StoreContext;
036import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
037import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
038import org.apache.hadoop.hbase.regionserver.StoreUtils;
039import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
045 * Base class for all store file tracker.
046 * <p/>
047 * Mainly used to place the common logic to skip persistent for secondary replicas.
048 */
049@InterfaceAudience.Private
050abstract class StoreFileTrackerBase implements StoreFileTracker {
051
052  private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerBase.class);
053
054  protected final Configuration conf;
055
056  protected final boolean isPrimaryReplica;
057
058  protected final StoreContext ctx;
059
060  private volatile boolean cacheOnWriteLogged;
061
062  protected StoreFileTrackerBase(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
063    this.conf = conf;
064    this.isPrimaryReplica = isPrimaryReplica;
065    this.ctx = ctx;
066  }
067
068  @Override
069  public final void add(Collection<StoreFileInfo> newFiles) throws IOException {
070    if (isPrimaryReplica) {
071      doAddNewStoreFiles(newFiles);
072    }
073  }
074
075  @Override
076  public final void replace(Collection<StoreFileInfo> compactedFiles,
077    Collection<StoreFileInfo> newFiles) throws IOException {
078    if (isPrimaryReplica) {
079      doAddCompactionResults(compactedFiles, newFiles);
080    }
081  }
082
083  @Override
084  public TableDescriptorBuilder updateWithTrackerConfigs(TableDescriptorBuilder builder) {
085    builder.setValue(TRACKER_IMPL, getTrackerName());
086    return builder;
087  }
088
089  protected final String getTrackerName() {
090    return StoreFileTrackerFactory.getStoreFileTrackerName(getClass());
091  }
092
093  private HFileContext createFileContext(Compression.Algorithm compression,
094    boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) {
095    if (compression == null) {
096      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
097    }
098    ColumnFamilyDescriptor family = ctx.getFamily();
099    HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(includeMVCCReadpoint)
100      .withIncludesTags(includesTag).withCompression(compression)
101      .withCompressTags(family.isCompressTags()).withChecksumType(StoreUtils.getChecksumType(conf))
102      .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
103      .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
104      .withDataBlockEncoding(family.getDataBlockEncoding()).withEncryptionContext(encryptionContext)
105      .withCreateTime(EnvironmentEdgeManager.currentTime()).withColumnFamily(family.getName())
106      .withTableName(ctx.getTableName().getName()).withCellComparator(ctx.getComparator()).build();
107    return hFileContext;
108  }
109
110  @Override
111  public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
112    if (!isPrimaryReplica) {
113      throw new IllegalStateException("Should not call create writer on secondary replicas");
114    }
115    // creating new cache config for each new writer
116    final CacheConfig cacheConf = ctx.getCacheConf();
117    final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
118    long totalCompactedFilesSize = params.totalCompactedFilesSize();
119    if (params.isCompaction()) {
120      // Don't cache data on write on compactions, unless specifically configured to do so
121      // Cache only when total file size remains lower than configured threshold
122      final boolean cacheCompactedBlocksOnWrite = cacheConf.shouldCacheCompactedBlocksOnWrite();
123      // if data blocks are to be cached on write
124      // during compaction, we should forcefully
125      // cache index and bloom blocks as well
126      if (
127        cacheCompactedBlocksOnWrite
128          && totalCompactedFilesSize <= cacheConf.getCacheCompactedBlocksOnWriteThreshold()
129      ) {
130        writerCacheConf.enableCacheOnWrite();
131        if (!cacheOnWriteLogged) {
132          LOG.info("For {} , cacheCompactedBlocksOnWrite is true, hence enabled "
133            + "cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this);
134          cacheOnWriteLogged = true;
135        }
136      } else {
137        writerCacheConf.setCacheDataOnWrite(false);
138        if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
139          // checking condition once again for logging
140          LOG.debug(
141            "For {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted "
142              + "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}",
143            this, totalCompactedFilesSize, cacheConf.getCacheCompactedBlocksOnWriteThreshold());
144        }
145      }
146    } else {
147      final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite();
148      if (shouldCacheDataOnWrite) {
149        writerCacheConf.enableCacheOnWrite();
150        if (!cacheOnWriteLogged) {
151          LOG.info("For {} , cacheDataOnWrite is true, hence enabled cacheOnWrite for "
152            + "Index blocks and Bloom filter blocks", this);
153          cacheOnWriteLogged = true;
154        }
155      }
156    }
157    Encryption.Context encryptionContext = ctx.getEncryptionContext();
158    HFileContext hFileContext = createFileContext(params.compression(),
159      params.includeMVCCReadpoint(), params.includesTag(), encryptionContext);
160    Path outputDir;
161    if (requireWritingToTmpDirFirst()) {
162      outputDir =
163        new Path(ctx.getRegionFileSystem().getTempDir(), ctx.getFamily().getNameAsString());
164    } else {
165      outputDir = ctx.getFamilyStoreDirectoryPath();
166    }
167    StoreFileWriter.Builder builder =
168      new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem())
169        .withOutputDir(outputDir).withBloomType(ctx.getBloomFilterType())
170        .withMaxKeyCount(params.maxKeyCount()).withFavoredNodes(ctx.getFavoredNodes())
171        .withFileContext(hFileContext).withShouldDropCacheBehind(params.shouldDropBehind())
172        .withCompactedFilesSupplier(ctx.getCompactedFilesSupplier())
173        .withFileStoragePolicy(params.fileStoragePolicy())
174        .withWriterCreationTracker(params.writerCreationTracker());
175    return builder.build();
176  }
177
178  protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;
179
180  protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
181    Collection<StoreFileInfo> newFiles) throws IOException;
182}