/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.TRACKER_IMPL;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for all store file trackers.
 * <p/>
 * Mainly used to hold the common logic for skipping persistence on secondary replicas.
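 * <p/>
 * Concrete trackers normally only implement the {@code do*} hooks declared at the bottom of this
 * class; the public methods here already skip them on secondary replicas. As a rough, purely
 * illustrative sketch (the {@code InMemoryStoreFileTracker} name and its list-backed storage are
 * invented for this javadoc and are not a tracker shipped with HBase; a real subclass must also
 * satisfy the rest of the {@link StoreFileTracker} interface, e.g.
 * {@code requireWritingToTmpDirFirst()}), a subclass could look roughly like:
 *
 * <pre>
 * {@code
 * class InMemoryStoreFileTracker extends StoreFileTrackerBase {
 *
 *   private final List<StoreFileInfo> files = new ArrayList<>();
 *
 *   InMemoryStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
 *     super(conf, isPrimaryReplica, ctx);
 *   }
 *
 *   protected List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) {
 *     return new ArrayList<>(files);
 *   }
 *
 *   protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) {
 *     files.addAll(newFiles);
 *   }
 *
 *   protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
 *     Collection<StoreFileInfo> newFiles) {
 *     files.removeAll(compactedFiles);
 *     files.addAll(newFiles);
 *   }
 *
 *   protected void doSetStoreFiles(Collection<StoreFileInfo> newFiles) {
 *     files.clear();
 *     files.addAll(newFiles);
 *   }
 * }
 * }
 * </pre>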
 */
@InterfaceAudience.Private
abstract class StoreFileTrackerBase implements StoreFileTracker {

  private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerBase.class);

  protected final Configuration conf;

  protected final boolean isPrimaryReplica;

  protected final StoreContext ctx;

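  // Whether the cache-on-write decision has already been logged for this store; used so that the
  // INFO message in createWriter() is emitted only once instead of for every new writer.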
  private volatile boolean cacheOnWriteLogged;

  protected StoreFileTrackerBase(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
    this.conf = conf;
    this.isPrimaryReplica = isPrimaryReplica;
    this.ctx = ctx;
  }

  @Override
  public final List<StoreFileInfo> load() throws IOException {
    return doLoadStoreFiles(!isPrimaryReplica);
  }

  @Override
  public final void add(Collection<StoreFileInfo> newFiles) throws IOException {
    if (isPrimaryReplica) {
      doAddNewStoreFiles(newFiles);
    }
  }

  @Override
  public final void replace(Collection<StoreFileInfo> compactedFiles,
    Collection<StoreFileInfo> newFiles) throws IOException {
    if (isPrimaryReplica) {
      doAddCompactionResults(compactedFiles, newFiles);
    }
  }

  @Override
  public final void set(List<StoreFileInfo> files) throws IOException {
    if (isPrimaryReplica) {
      doSetStoreFiles(files);
    }
  }

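  /**
   * Record the name of this tracker implementation under the {@code TRACKER_IMPL} key in the given
   * table descriptor builder.
   */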
  @Override
  public TableDescriptorBuilder updateWithTrackerConfigs(TableDescriptorBuilder builder) {
    builder.setValue(TRACKER_IMPL, getTrackerName());
    return builder;
  }

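  /** Returns the name used to identify this tracker implementation, as resolved by the factory. */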
  protected final String getTrackerName() {
    return StoreFileTrackerFactory.getStoreFileTrackerName(getClass());
  }

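  /**
   * Build the {@link HFileContext} for a new store file from the column family descriptor and the
   * store configuration, falling back to the default compression algorithm when none is given.
   */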
  private HFileContext createFileContext(Compression.Algorithm compression,
    boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) {
    if (compression == null) {
      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
    }
    ColumnFamilyDescriptor family = ctx.getFamily();
    HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(includeMVCCReadpoint)
      .withIncludesTags(includesTag).withCompression(compression)
      .withCompressTags(family.isCompressTags()).withChecksumType(StoreUtils.getChecksumType(conf))
      .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
      .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
      .withDataBlockEncoding(family.getDataBlockEncoding()).withEncryptionContext(encryptionContext)
      .withCreateTime(EnvironmentEdgeManager.currentTime()).withColumnFamily(family.getName())
      .withTableName(ctx.getTableName().getName()).withCellComparator(ctx.getComparator())
      .withIndexBlockEncoding(family.getIndexBlockEncoding()).build();
    return hFileContext;
  }

  @Override
  public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
    if (!isPrimaryReplica) {
      throw new IllegalStateException("Should not call create writer on secondary replicas");
    }
    // creating new cache config for each new writer
    final CacheConfig cacheConf = ctx.getCacheConf();
    final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
    long totalCompactedFilesSize = params.totalCompactedFilesSize();
    if (params.isCompaction()) {
      // Don't cache data on write on compactions, unless specifically configured to do so
      // Cache only when total file size remains lower than configured threshold
      final boolean cacheCompactedBlocksOnWrite = cacheConf.shouldCacheCompactedBlocksOnWrite();
      // if data blocks are to be cached on write
      // during compaction, we should forcefully
      // cache index and bloom blocks as well
      if (
        cacheCompactedBlocksOnWrite
          && totalCompactedFilesSize <= cacheConf.getCacheCompactedBlocksOnWriteThreshold()
      ) {
        writerCacheConf.enableCacheOnWrite();
        if (!cacheOnWriteLogged) {
          LOG.info("For {} , cacheCompactedBlocksOnWrite is true, hence enabled "
            + "cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this);
          cacheOnWriteLogged = true;
        }
      } else {
        writerCacheConf.setCacheDataOnWrite(false);
        if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
          // checking condition once again for logging
          LOG.debug(
            "For {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted "
              + "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}",
            this, totalCompactedFilesSize, cacheConf.getCacheCompactedBlocksOnWriteThreshold());
        }
      }
    } else {
      final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite();
      if (shouldCacheDataOnWrite) {
        writerCacheConf.enableCacheOnWrite();
        if (!cacheOnWriteLogged) {
          LOG.info("For {} , cacheDataOnWrite is true, hence enabled cacheOnWrite for "
            + "Index blocks and Bloom filter blocks", this);
          cacheOnWriteLogged = true;
        }
      }
    }
    Encryption.Context encryptionContext = ctx.getEncryptionContext();
    HFileContext hFileContext = createFileContext(params.compression(),
      params.includeMVCCReadpoint(), params.includesTag(), encryptionContext);
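    // Trackers that record the store file list somewhere other than the file system layout may not
    // need the usual "write to the region .tmp dir, then rename into place" step, so ask the
    // concrete implementation where the new file should be written.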
    Path outputDir;
    if (requireWritingToTmpDirFirst()) {
      outputDir =
        new Path(ctx.getRegionFileSystem().getTempDir(), ctx.getFamily().getNameAsString());
    } else {
      outputDir = ctx.getFamilyStoreDirectoryPath();
    }
    StoreFileWriter.Builder builder =
      new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem())
        .withOutputDir(outputDir).withBloomType(ctx.getBloomFilterType())
        .withMaxKeyCount(params.maxKeyCount()).withFavoredNodes(ctx.getFavoredNodes())
        .withFileContext(hFileContext).withShouldDropCacheBehind(params.shouldDropBehind())
        .withCompactedFilesSupplier(ctx.getCompactedFilesSupplier())
        .withFileStoragePolicy(params.fileStoragePolicy())
        .withWriterCreationTracker(params.writerCreationTracker());
    return builder.build();
  }

  /**
   * For the primary replica, load is called once when opening a region, and the implementation may
   * choose to do some cleanup work at that point. The {@code readOnly} flag indicates whether such
   * cleanup is allowed. For secondary replicas, {@code readOnly} is always {@code true}.
   */
  protected abstract List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException;

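  /**
   * Record the given new store files in the tracker. Only called for the primary replica, see
   * {@link #add(Collection)}.
   */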
  protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;

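  /**
   * Replace the compacted-away store files with the files produced by the compaction. Only called
   * for the primary replica, see {@link #replace(Collection, Collection)}.
   */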
  protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
    Collection<StoreFileInfo> newFiles) throws IOException;

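  /**
   * Overwrite the tracked store file list with exactly the given files. Only called for the
   * primary replica, see {@link #set(List)}.
   */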
  protected abstract void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException;

}