001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.storefiletracker;
019
020import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.TRACKER_IMPL;
021
022import java.io.IOException;
023import java.util.Collection;
024import java.util.List;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
028import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
029import org.apache.hadoop.hbase.io.compress.Compression;
030import org.apache.hadoop.hbase.io.crypto.Encryption;
031import org.apache.hadoop.hbase.io.hfile.CacheConfig;
032import org.apache.hadoop.hbase.io.hfile.HFile;
033import org.apache.hadoop.hbase.io.hfile.HFileContext;
034import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
035import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
036import org.apache.hadoop.hbase.regionserver.StoreContext;
037import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
038import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
039import org.apache.hadoop.hbase.regionserver.StoreUtils;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045/**
046 * Base class for all store file tracker.
047 * <p/>
048 * Mainly used to place the common logic to skip persistent for secondary replicas.
049 */
050@InterfaceAudience.Private
051abstract class StoreFileTrackerBase implements StoreFileTracker {
052
053  private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerBase.class);
054
055  protected final Configuration conf;
056
057  protected final boolean isPrimaryReplica;
058
059  protected final StoreContext ctx;
060
061  private volatile boolean cacheOnWriteLogged;
062
063  protected StoreFileTrackerBase(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
064    this.conf = conf;
065    this.isPrimaryReplica = isPrimaryReplica;
066    this.ctx = ctx;
067  }
068
069  @Override
070  public final List<StoreFileInfo> load() throws IOException {
071    return doLoadStoreFiles(!isPrimaryReplica);
072  }
073
074  @Override
075  public final void add(Collection<StoreFileInfo> newFiles) throws IOException {
076    if (isPrimaryReplica) {
077      doAddNewStoreFiles(newFiles);
078    }
079  }
080
081  @Override
082  public final void replace(Collection<StoreFileInfo> compactedFiles,
083    Collection<StoreFileInfo> newFiles) throws IOException {
084    if (isPrimaryReplica) {
085      doAddCompactionResults(compactedFiles, newFiles);
086    }
087  }
088
089  @Override
090  public final void set(List<StoreFileInfo> files) throws IOException {
091    if (isPrimaryReplica) {
092      doSetStoreFiles(files);
093    }
094  }
095
096  @Override
097  public TableDescriptorBuilder updateWithTrackerConfigs(TableDescriptorBuilder builder) {
098    builder.setValue(TRACKER_IMPL, getTrackerName());
099    return builder;
100  }
101
102  protected final String getTrackerName() {
103    return StoreFileTrackerFactory.getStoreFileTrackerName(getClass());
104  }
105
106  private HFileContext createFileContext(Compression.Algorithm compression,
107    boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) {
108    if (compression == null) {
109      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
110    }
111    ColumnFamilyDescriptor family = ctx.getFamily();
112    HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(includeMVCCReadpoint)
113      .withIncludesTags(includesTag).withCompression(compression)
114      .withCompressTags(family.isCompressTags()).withChecksumType(StoreUtils.getChecksumType(conf))
115      .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
116      .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
117      .withDataBlockEncoding(family.getDataBlockEncoding()).withEncryptionContext(encryptionContext)
118      .withCreateTime(EnvironmentEdgeManager.currentTime()).withColumnFamily(family.getName())
119      .withTableName(ctx.getTableName().getName()).withCellComparator(ctx.getComparator()).build();
120    return hFileContext;
121  }
122
123  @Override
124  public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
125    if (!isPrimaryReplica) {
126      throw new IllegalStateException("Should not call create writer on secondary replicas");
127    }
128    // creating new cache config for each new writer
129    final CacheConfig cacheConf = ctx.getCacheConf();
130    final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
131    long totalCompactedFilesSize = params.totalCompactedFilesSize();
132    if (params.isCompaction()) {
133      // Don't cache data on write on compactions, unless specifically configured to do so
134      // Cache only when total file size remains lower than configured threshold
135      final boolean cacheCompactedBlocksOnWrite = cacheConf.shouldCacheCompactedBlocksOnWrite();
136      // if data blocks are to be cached on write
137      // during compaction, we should forcefully
138      // cache index and bloom blocks as well
139      if (
140        cacheCompactedBlocksOnWrite
141          && totalCompactedFilesSize <= cacheConf.getCacheCompactedBlocksOnWriteThreshold()
142      ) {
143        writerCacheConf.enableCacheOnWrite();
144        if (!cacheOnWriteLogged) {
145          LOG.info("For {} , cacheCompactedBlocksOnWrite is true, hence enabled "
146            + "cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this);
147          cacheOnWriteLogged = true;
148        }
149      } else {
150        writerCacheConf.setCacheDataOnWrite(false);
151        if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
152          // checking condition once again for logging
153          LOG.debug(
154            "For {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted "
155              + "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}",
156            this, totalCompactedFilesSize, cacheConf.getCacheCompactedBlocksOnWriteThreshold());
157        }
158      }
159    } else {
160      final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite();
161      if (shouldCacheDataOnWrite) {
162        writerCacheConf.enableCacheOnWrite();
163        if (!cacheOnWriteLogged) {
164          LOG.info("For {} , cacheDataOnWrite is true, hence enabled cacheOnWrite for "
165            + "Index blocks and Bloom filter blocks", this);
166          cacheOnWriteLogged = true;
167        }
168      }
169    }
170    Encryption.Context encryptionContext = ctx.getEncryptionContext();
171    HFileContext hFileContext = createFileContext(params.compression(),
172      params.includeMVCCReadpoint(), params.includesTag(), encryptionContext);
173    Path outputDir;
174    if (requireWritingToTmpDirFirst()) {
175      outputDir =
176        new Path(ctx.getRegionFileSystem().getTempDir(), ctx.getFamily().getNameAsString());
177    } else {
178      outputDir = ctx.getFamilyStoreDirectoryPath();
179    }
180    StoreFileWriter.Builder builder =
181      new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem())
182        .withOutputDir(outputDir).withBloomType(ctx.getBloomFilterType())
183        .withMaxKeyCount(params.maxKeyCount()).withFavoredNodes(ctx.getFavoredNodes())
184        .withFileContext(hFileContext).withShouldDropCacheBehind(params.shouldDropBehind())
185        .withCompactedFilesSupplier(ctx.getCompactedFilesSupplier())
186        .withFileStoragePolicy(params.fileStoragePolicy())
187        .withWriterCreationTracker(params.writerCreationTracker());
188    return builder.build();
189  }
190
191  /**
192   * For primary replica, we will call load once when opening a region, and the implementation could
193   * choose to do some cleanup work. So here we use {@code readOnly} to indicate that whether you
194   * are allowed to do the cleanup work. For secondary replicas, we will set {@code readOnly} to
195   * {@code true}.
196   */
197  protected abstract List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException;
198
199  protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;
200
201  protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
202    Collection<StoreFileInfo> newFiles) throws IOException;
203
204  protected abstract void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException;
205
206}