/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.TRACKER_IMPL;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * Base class for all store file trackers.
 * <p/>
 * Mainly used to place the common logic to skip persistence for secondary replicas.
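 * <p/>
 * Subclasses provide the actual persistence through the {@code doXXX} hooks
 * ({@link #doLoadStoreFiles(boolean)}, {@link #doAddNewStoreFiles(Collection)},
 * {@link #doAddCompactionResults(Collection, Collection)} and {@link #doSetStoreFiles(Collection)}).
 * The entry points {@link #add(Collection)}, {@link #replace(Collection, Collection)} and
 * {@link #set(List)} only invoke those hooks on the primary replica, while {@link #load()} passes a
 * read-only flag for secondary replicas.
 * <p/>
 * A minimal, illustrative sketch of how a tracker implementation is recorded on a table descriptor,
 * mirroring what {@link #updateWithTrackerConfigs(TableDescriptorBuilder)} does. Here
 * {@code tableName} and {@code trackerName} are placeholders; the tracker name is whatever
 * {@code StoreFileTrackerFactory.getStoreFileTrackerName} returns for the chosen implementation:
 *
 * <pre>{@code
 * TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
 * builder.setValue(StoreFileTrackerFactory.TRACKER_IMPL, trackerName);
 * }</pre>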
 */
@InterfaceAudience.Private
abstract class StoreFileTrackerBase implements StoreFileTracker {

  private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerBase.class);

  protected final Configuration conf;

  protected final boolean isPrimaryReplica;

  protected final StoreContext ctx;

  private volatile boolean cacheOnWriteLogged;

  protected StoreFileTrackerBase(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
    this.conf = conf;
    this.isPrimaryReplica = isPrimaryReplica;
    this.ctx = ctx;
  }

  @Override
  public final List<StoreFileInfo> load() throws IOException {
    return doLoadStoreFiles(!isPrimaryReplica);
  }

  @Override
  public final void add(Collection<StoreFileInfo> newFiles) throws IOException {
    if (isPrimaryReplica) {
      doAddNewStoreFiles(newFiles);
    }
  }

  @Override
  public final void replace(Collection<StoreFileInfo> compactedFiles,
    Collection<StoreFileInfo> newFiles) throws IOException {
    if (isPrimaryReplica) {
      doAddCompactionResults(compactedFiles, newFiles);
    }
  }

  @Override
  public final void set(List<StoreFileInfo> files) throws IOException {
    if (isPrimaryReplica) {
      doSetStoreFiles(files);
    }
  }

  @Override
  public TableDescriptorBuilder updateWithTrackerConfigs(TableDescriptorBuilder builder) {
    builder.setValue(TRACKER_IMPL, getTrackerName());
    return builder;
  }

  protected final String getTrackerName() {
    return StoreFileTrackerFactory.getStoreFileTrackerName(getClass());
  }

  private HFileContext createFileContext(Compression.Algorithm compression,
    boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) {
    if (compression == null) {
      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
    }
    ColumnFamilyDescriptor family = ctx.getFamily();
    HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(includeMVCCReadpoint)
      .withIncludesTags(includesTag).withCompression(compression)
      .withCompressTags(family.isCompressTags()).withChecksumType(StoreUtils.getChecksumType(conf))
      .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
      .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
      .withDataBlockEncoding(family.getDataBlockEncoding()).withEncryptionContext(encryptionContext)
      .withCreateTime(EnvironmentEdgeManager.currentTime()).withColumnFamily(family.getName())
      .withTableName(ctx.getTableName().getName()).withCellComparator(ctx.getComparator())
      .withIndexBlockEncoding(family.getIndexBlockEncoding()).build();
    return hFileContext;
  }

  @Override
  public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
    if (!isPrimaryReplica) {
      throw new IllegalStateException("Should not call create writer on secondary replicas");
    }
    // creating new cache config for each new writer
    final CacheConfig cacheConf = ctx.getCacheConf();
    final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
    long totalCompactedFilesSize = params.totalCompactedFilesSize();
    if (params.isCompaction()) {
      // Don't cache data on write on compactions, unless specifically configured to do so
      // Cache only when total file size remains lower than configured threshold
      final boolean cacheCompactedBlocksOnWrite = cacheConf.shouldCacheCompactedBlocksOnWrite();
      // if data blocks are to be cached on write
      // during compaction, we should forcefully
      // cache index and bloom blocks as well
      if (
        cacheCompactedBlocksOnWrite
          && totalCompactedFilesSize <= cacheConf.getCacheCompactedBlocksOnWriteThreshold()
      ) {
        writerCacheConf.enableCacheOnWrite();
        if (!cacheOnWriteLogged) {
          LOG.info("For {} , cacheCompactedBlocksOnWrite is true, hence enabled "
            + "cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this);
          cacheOnWriteLogged = true;
        }
      } else {
        writerCacheConf.setCacheDataOnWrite(false);
        if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
          // checking condition once again for logging
          LOG.debug(
            "For {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted "
              + "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}",
            this, totalCompactedFilesSize, cacheConf.getCacheCompactedBlocksOnWriteThreshold());
        }
      }
    } else {
      final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite();
      if (shouldCacheDataOnWrite) {
        writerCacheConf.enableCacheOnWrite();
        if (!cacheOnWriteLogged) {
          LOG.info("For {} , cacheDataOnWrite is true, hence enabled cacheOnWrite for "
            + "Index blocks and Bloom filter blocks", this);
          cacheOnWriteLogged = true;
        }
      }
    }
    Encryption.Context encryptionContext = ctx.getEncryptionContext();
    HFileContext hFileContext = createFileContext(params.compression(),
      params.includeMVCCReadpoint(), params.includesTag(), encryptionContext);
    Path outputDir;
    if (requireWritingToTmpDirFirst()) {
      outputDir =
        new Path(ctx.getRegionFileSystem().getTempDir(), ctx.getFamily().getNameAsString());
    } else {
      outputDir = ctx.getFamilyStoreDirectoryPath();
    }
    StoreFileWriter.Builder builder =
      new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem())
        .withOutputDir(outputDir).withBloomType(ctx.getBloomFilterType())
        .withMaxKeyCount(params.maxKeyCount()).withFavoredNodes(ctx.getFavoredNodes())
        .withFileContext(hFileContext).withShouldDropCacheBehind(params.shouldDropBehind())
        .withCompactedFilesSupplier(ctx.getCompactedFilesSupplier())
        .withFileStoragePolicy(params.fileStoragePolicy())
        .withWriterCreationTracker(params.writerCreationTracker())
        .withMaxVersions(ctx.getMaxVersions()).withNewVersionBehavior(ctx.getNewVersionBehavior())
        .withCellComparator(ctx.getComparator()).withIsCompaction(params.isCompaction());
    return builder.build();
  }

  @Override
  public Reference createReference(Reference reference, Path path) throws IOException {
    FSDataOutputStream out = ctx.getRegionFileSystem().getFileSystem().create(path, false);
    try {
      out.write(reference.toByteArray());
    } finally {
      out.close();
    }
    return reference;
  }

  /**
   * Returns true if this tracker's column family has reference files.
   * @return true if the family contains reference files
   */
  public boolean hasReferences() throws IOException {
    Path storeDir = ctx.getRegionFileSystem().getStoreDir(ctx.getFamily().getNameAsString());
    FileStatus[] files =
      CommonFSUtils.listStatus(ctx.getRegionFileSystem().getFileSystem(), storeDir);
    if (files != null) {
      for (FileStatus stat : files) {
        if (stat.isDirectory()) {
          continue;
        }
        if (StoreFileInfo.isReference(stat.getPath())) {
          LOG.trace("Reference {}", stat.getPath());
          return true;
        }
      }
    }
    return false;
  }

  @Override
  public Reference readReference(final Path p) throws IOException {
    InputStream in = ctx.getRegionFileSystem().getFileSystem().open(p);
    try {
      // I need to be able to move back in the stream if this is not a pb serialization so I can
      // do the Writable decoding instead.
      in = in.markSupported() ? in : new BufferedInputStream(in);
      int pblen = ProtobufUtil.lengthOfPBMagic();
      in.mark(pblen);
      byte[] pbuf = new byte[pblen];
      IOUtils.readFully(in, pbuf, 0, pblen);
      // WATCHOUT! Return in middle of function!!!
      if (ProtobufUtil.isPBMagicPrefix(pbuf)) {
        return Reference.convert(
          org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos.Reference.parseFrom(in));
      }
      // Else presume Writables. Need to reset the stream since it didn't start w/ pb.
      // We won't bother rewriting the Reference as a pb since Reference is transitory.
      in.reset();
      Reference r = new Reference();
      DataInputStream dis = new DataInputStream(in);
      // Set in = dis so it gets the close below in the finally on our way out.
      in = dis;
      r.readFields(dis);
      return r;
    } finally {
      in.close();
    }
  }

  @Override
  public StoreFileInfo getStoreFileInfo(Path initialPath, boolean primaryReplica)
    throws IOException {
    return getStoreFileInfo(null, initialPath, primaryReplica);
  }

  @Override
  public StoreFileInfo getStoreFileInfo(FileStatus fileStatus, Path initialPath,
    boolean primaryReplica) throws IOException {
    FileSystem fs = this.ctx.getRegionFileSystem().getFileSystem();
    assert fs != null;
    assert initialPath != null;
    assert conf != null;
    Reference reference = null;
    HFileLink link = null;
    long createdTimestamp = 0;
    long size = 0;
    Path p = initialPath;
    if (HFileLink.isHFileLink(p)) {
      // HFileLink
      reference = null;
      link = HFileLink.buildFromHFileLinkPattern(conf, p);
      LOG.trace("{} is a link", p);
    } else if (StoreFileInfo.isReference(p)) {
      reference = readReference(p);
      Path referencePath = StoreFileInfo.getReferredToFile(p);
      if (HFileLink.isHFileLink(referencePath)) {
        // HFileLink Reference
        link = HFileLink.buildFromHFileLinkPattern(conf, referencePath);
      } else {
        // Reference
        link = null;
      }
      LOG.trace("{} is a {} reference to {}", p, reference.getFileRegion(), referencePath);
    } else
      if (StoreFileInfo.isHFile(p) || StoreFileInfo.isMobFile(p) || StoreFileInfo.isMobRefFile(p)) {
        // HFile
        if (fileStatus != null) {
          createdTimestamp = fileStatus.getModificationTime();
          size = fileStatus.getLen();
        } else {
          FileStatus fStatus = fs.getFileStatus(initialPath);
          createdTimestamp = fStatus.getModificationTime();
          size = fStatus.getLen();
        }
      } else {
        throw new IOException("path=" + p + " doesn't look like a valid StoreFile");
      }
    return new StoreFileInfo(conf, fs, createdTimestamp, initialPath, size, reference, link,
      isPrimaryReplica);
  }

  /**
   * For the primary replica, we will call load once when opening a region, and the implementation
   * could choose to do some cleanup work. So here we use {@code readOnly} to indicate whether you
   * are allowed to do the cleanup work. For secondary replicas, we will set {@code readOnly} to
   * {@code true}.
   */
  protected abstract List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException;

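  /**
   * Persist newly added store files, e.g. after a flush or a bulk load. Only called on the primary
   * replica, see {@link #add(Collection)}.
   */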
  protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;

  /**
   * Persist the result of a compaction: the {@code compactedFiles} are replaced by {@code newFiles}
   * in the tracked store file list. Only called on the primary replica, see
   * {@link #replace(Collection, Collection)}.
   */
  protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
    Collection<StoreFileInfo> newFiles) throws IOException;

  /**
   * Replace the whole tracked store file list with {@code files}. Only called on the primary
   * replica, see {@link #set(List)}.
   */
  protected abstract void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException;
}