/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.TRACKER_IMPL;

import java.io.IOException;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for all store file trackers.
 * <p/>
 * Mainly used to place the common logic to skip persisting for secondary replicas.
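 * <p/>
 * Secondary replicas never own the persisted store file list: {@link #add(Collection)} and
 * {@link #replace(Collection, Collection)} are no-ops for them, and
 * {@link #createWriter(CreateStoreFileWriterParams)} rejects the call outright, since only the
 * primary replica is allowed to mutate store files.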
 */
@InterfaceAudience.Private
abstract class StoreFileTrackerBase implements StoreFileTracker {

  private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerBase.class);

  protected final Configuration conf;

  protected final boolean isPrimaryReplica;

  protected final StoreContext ctx;

  private volatile boolean cacheOnWriteLogged;

  protected StoreFileTrackerBase(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
    this.conf = conf;
    this.isPrimaryReplica = isPrimaryReplica;
    this.ctx = ctx;
  }

  @Override
  public final void add(Collection<StoreFileInfo> newFiles) throws IOException {
    if (isPrimaryReplica) {
      doAddNewStoreFiles(newFiles);
    }
  }

  @Override
  public final void replace(Collection<StoreFileInfo> compactedFiles,
    Collection<StoreFileInfo> newFiles) throws IOException {
    if (isPrimaryReplica) {
      doAddCompactionResults(compactedFiles, newFiles);
    }
  }

  @Override
  public TableDescriptorBuilder updateWithTrackerConfigs(TableDescriptorBuilder builder) {
    builder.setValue(TRACKER_IMPL, getTrackerName());
    return builder;
  }

  protected final String getTrackerName() {
    return StoreFileTrackerFactory.getStoreFileTrackerName(getClass());
  }

  private HFileContext createFileContext(Compression.Algorithm compression,
    boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) {
    if (compression == null) {
      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
    }
    ColumnFamilyDescriptor family = ctx.getFamily();
    HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(includeMVCCReadpoint)
      .withIncludesTags(includesTag).withCompression(compression)
      .withCompressTags(family.isCompressTags()).withChecksumType(StoreUtils.getChecksumType(conf))
      .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
      .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
      .withDataBlockEncoding(family.getDataBlockEncoding()).withEncryptionContext(encryptionContext)
      .withCreateTime(EnvironmentEdgeManager.currentTime()).withColumnFamily(family.getName())
      .withTableName(ctx.getTableName().getName()).withCellComparator(ctx.getComparator()).build();
    return hFileContext;
  }

  @Override
  public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
    if (!isPrimaryReplica) {
      throw new IllegalStateException("Should not call create writer on secondary replicas");
    }
    // creating new cache config for each new writer
    final CacheConfig cacheConf = ctx.getCacheConf();
    final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
    long totalCompactedFilesSize = params.totalCompactedFilesSize();
    if (params.isCompaction()) {
      // Don't cache data on write on compactions, unless specifically configured to do so
      // Cache only when total file size remains lower than configured threshold
      final boolean cacheCompactedBlocksOnWrite = cacheConf.shouldCacheCompactedBlocksOnWrite();
      // if data blocks are to be cached on write during compaction, we should forcefully
      // cache index and bloom blocks as well
      if (
        cacheCompactedBlocksOnWrite
          && totalCompactedFilesSize <= cacheConf.getCacheCompactedBlocksOnWriteThreshold()
      ) {
        writerCacheConf.enableCacheOnWrite();
        if (!cacheOnWriteLogged) {
          LOG.info("For {}, cacheCompactedBlocksOnWrite is true, hence enabled "
            + "cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this);
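          // remember that we already logged; the volatile flag keeps this INFO message
          // to once per store rather than once per writer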
          cacheOnWriteLogged = true;
        }
      } else {
        writerCacheConf.setCacheDataOnWrite(false);
        if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
          // checking condition once again for logging
          LOG.debug(
            "For {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted "
              + "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}",
            this, totalCompactedFilesSize, cacheConf.getCacheCompactedBlocksOnWriteThreshold());
        }
      }
    } else {
      final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite();
      if (shouldCacheDataOnWrite) {
        writerCacheConf.enableCacheOnWrite();
        if (!cacheOnWriteLogged) {
          LOG.info("For {}, cacheDataOnWrite is true, hence enabled cacheOnWrite for "
            + "Index blocks and Bloom filter blocks", this);
          cacheOnWriteLogged = true;
        }
      }
    }
    Encryption.Context encryptionContext = ctx.getEncryptionContext();
    HFileContext hFileContext = createFileContext(params.compression(),
      params.includeMVCCReadpoint(), params.includesTag(), encryptionContext);
    Path outputDir;
    if (requireWritingToTmpDirFirst()) {
      outputDir =
        new Path(ctx.getRegionFileSystem().getTempDir(), ctx.getFamily().getNameAsString());
    } else {
      outputDir = ctx.getFamilyStoreDirectoryPath();
    }
    StoreFileWriter.Builder builder =
      new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem())
        .withOutputDir(outputDir).withBloomType(ctx.getBloomFilterType())
        .withMaxKeyCount(params.maxKeyCount()).withFavoredNodes(ctx.getFavoredNodes())
        .withFileContext(hFileContext).withShouldDropCacheBehind(params.shouldDropBehind())
        .withCompactedFilesSupplier(ctx.getCompactedFilesSupplier())
        .withFileStoragePolicy(params.fileStoragePolicy())
        .withWriterCreationTracker(params.writerCreationTracker());
    return builder.build();
  }

  protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;

  protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
    Collection<StoreFileInfo> newFiles) throws IOException;
}