/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.TRACKER_IMPL;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.List;
import java.util.regex.Matcher;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * Base class for all store file trackers.
 * <p/>
 * Mainly used to place the common logic to skip persistence for secondary replicas.
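 * <p/>
 * Concrete trackers implement the {@code do*} methods below; the public methods in this class
 * already skip the persistence calls for secondary replicas.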
 */
@InterfaceAudience.Private
abstract class StoreFileTrackerBase implements StoreFileTracker {

  private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerBase.class);

  protected final Configuration conf;

  protected final boolean isPrimaryReplica;

  protected final StoreContext ctx;

  private volatile boolean cacheOnWriteLogged;

  protected StoreFileTrackerBase(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
    this.conf = conf;
    this.isPrimaryReplica = isPrimaryReplica;
    this.ctx = ctx;
  }

  @Override
  public final List<StoreFileInfo> load() throws IOException {
    return doLoadStoreFiles(!isPrimaryReplica);
  }

  @Override
  public final void add(Collection<StoreFileInfo> newFiles) throws IOException {
    if (isPrimaryReplica) {
      doAddNewStoreFiles(newFiles);
    }
  }

  @Override
  public final void replace(Collection<StoreFileInfo> compactedFiles,
    Collection<StoreFileInfo> newFiles) throws IOException {
    if (isPrimaryReplica) {
      doAddCompactionResults(compactedFiles, newFiles);
    }
  }

  @Override
  public final void set(List<StoreFileInfo> files) throws IOException {
    if (isPrimaryReplica) {
      doSetStoreFiles(files);
    }
  }

  @Override
  public TableDescriptorBuilder updateWithTrackerConfigs(TableDescriptorBuilder builder) {
    builder.setValue(TRACKER_IMPL, getTrackerName());
    return builder;
  }

  protected final String getTrackerName() {
    return StoreFileTrackerFactory.getStoreFileTrackerName(getClass());
  }

  private HFileContext createFileContext(Compression.Algorithm compression,
    boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) {
    if (compression == null) {
      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
    }
    ColumnFamilyDescriptor family = ctx.getFamily();
    HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(includeMVCCReadpoint)
      .withIncludesTags(includesTag).withCompression(compression)
      .withCompressTags(family.isCompressTags()).withChecksumType(StoreUtils.getChecksumType(conf))
      .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
      .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
      .withDataBlockEncoding(family.getDataBlockEncoding()).withEncryptionContext(encryptionContext)
      .withCreateTime(EnvironmentEdgeManager.currentTime()).withColumnFamily(family.getName())
      .withTableName(ctx.getTableName().getName()).withCellComparator(ctx.getComparator())
      .withIndexBlockEncoding(family.getIndexBlockEncoding()).build();
    return hFileContext;
  }

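  /**
   * Create a writer for flush or compaction output. Only the primary replica is allowed to write
   * store files; the cache-on-write settings of the returned writer are derived from the store's
   * {@link CacheConfig} and the given params.
   */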
  @Override
  public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
    if (!isPrimaryReplica) {
      throw new IllegalStateException("Should not call create writer on secondary replicas");
    }
    // creating new cache config for each new writer
    final CacheConfig cacheConf = ctx.getCacheConf();
    final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
    long totalCompactedFilesSize = params.totalCompactedFilesSize();
    if (params.isCompaction()) {
      // Don't cache data on write on compactions, unless specifically configured to do so
      // Cache only when total file size remains lower than configured threshold
      final boolean cacheCompactedBlocksOnWrite = cacheConf.shouldCacheCompactedBlocksOnWrite();
      // if data blocks are to be cached on write during compaction, we should forcefully
      // cache index and bloom blocks as well
      if (
        cacheCompactedBlocksOnWrite
          && totalCompactedFilesSize <= cacheConf.getCacheCompactedBlocksOnWriteThreshold()
      ) {
        writerCacheConf.enableCacheOnWrite();
        if (!cacheOnWriteLogged) {
          LOG.info("For {}, cacheCompactedBlocksOnWrite is true, hence enabled "
            + "cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this);
          cacheOnWriteLogged = true;
        }
      } else {
        writerCacheConf.setCacheDataOnWrite(false);
        if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
          // checking condition once again for logging
          LOG.debug(
            "For {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted "
              + "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}",
            this, totalCompactedFilesSize, cacheConf.getCacheCompactedBlocksOnWriteThreshold());
        }
      }
    } else {
      final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite();
      if (shouldCacheDataOnWrite) {
        writerCacheConf.enableCacheOnWrite();
        if (!cacheOnWriteLogged) {
          LOG.info("For {}, cacheDataOnWrite is true, hence enabled cacheOnWrite for "
            + "Index blocks and Bloom filter blocks", this);
          cacheOnWriteLogged = true;
        }
      }
    }
    Encryption.Context encryptionContext = ctx.getEncryptionContext();
    HFileContext hFileContext = createFileContext(params.compression(),
      params.includeMVCCReadpoint(), params.includesTag(), encryptionContext);
    Path outputDir;
    if (requireWritingToTmpDirFirst()) {
      outputDir =
        new Path(ctx.getRegionFileSystem().getTempDir(), ctx.getFamily().getNameAsString());
    } else {
      outputDir = ctx.getFamilyStoreDirectoryPath();
    }
    StoreFileWriter.Builder builder =
      new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem())
        .withOutputDir(outputDir).withBloomType(ctx.getBloomFilterType())
        .withMaxKeyCount(params.maxKeyCount()).withFavoredNodes(ctx.getFavoredNodes())
        .withFileContext(hFileContext).withShouldDropCacheBehind(params.shouldDropBehind())
        .withCompactedFilesSupplier(ctx.getCompactedFilesSupplier())
        .withFileStoragePolicy(params.fileStoragePolicy())
        .withWriterCreationTracker(params.writerCreationTracker())
        .withMaxVersions(ctx.getMaxVersions()).withNewVersionBehavior(ctx.getNewVersionBehavior())
        .withCellComparator(ctx.getComparator()).withIsCompaction(params.isCompaction());
    return builder.build();
  }

  @Override
  public Reference createReference(Reference reference, Path path) throws IOException {
    FSDataOutputStream out = ctx.getRegionFileSystem().getFileSystem().create(path, false);
    try {
      out.write(reference.toByteArray());
    } finally {
      out.close();
    }
    return reference;
  }

  /**
   * Returns true if this store has reference files.
   * @return true if the store directory contains at least one reference file
   */
  public boolean hasReferences() throws IOException {
    Path storeDir = ctx.getRegionFileSystem().getStoreDir(ctx.getFamily().getNameAsString());
    FileStatus[] files =
      CommonFSUtils.listStatus(ctx.getRegionFileSystem().getFileSystem(), storeDir);
    if (files != null) {
      for (FileStatus stat : files) {
        if (stat.isDirectory()) {
          continue;
        }
        if (StoreFileInfo.isReference(stat.getPath())) {
          LOG.trace("Reference {}", stat.getPath());
          return true;
        }
      }
    }
    return false;
  }

  @Override
  public Reference readReference(final Path p) throws IOException {
    InputStream in = ctx.getRegionFileSystem().getFileSystem().open(p);
    try {
      // I need to be able to move back in the stream if this is not a pb serialization so I can
      // do the Writable decoding instead.
      in = in.markSupported() ? in : new BufferedInputStream(in);
      int pblen = ProtobufUtil.lengthOfPBMagic();
      in.mark(pblen);
      byte[] pbuf = new byte[pblen];
      IOUtils.readFully(in, pbuf, 0, pblen);
      // WATCHOUT! Return in middle of function!!!
      if (ProtobufUtil.isPBMagicPrefix(pbuf)) {
        return Reference.convert(
          org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos.Reference.parseFrom(in));
      }
      // Else presume Writables. Need to reset the stream since it didn't start w/ pb.
      // We won't bother rewriting the Reference as a pb since Reference is transitory.
      in.reset();
      Reference r = new Reference();
      DataInputStream dis = new DataInputStream(in);
      // Set in = dis so it gets the close below in the finally on our way out.
      in = dis;
      r.readFields(dis);
      return r;
    } finally {
      in.close();
    }
  }

  @Override
  public StoreFileInfo getStoreFileInfo(Path initialPath, boolean primaryReplica)
    throws IOException {
    return getStoreFileInfo(null, initialPath, primaryReplica);
  }

  @Override
  public StoreFileInfo getStoreFileInfo(FileStatus fileStatus, Path initialPath,
    boolean primaryReplica) throws IOException {
    FileSystem fs = this.ctx.getRegionFileSystem().getFileSystem();
    assert fs != null;
    assert initialPath != null;
    assert conf != null;
    Reference reference = null;
    HFileLink link = null;
    long createdTimestamp = 0;
    long size = 0;
    Path p = initialPath;
    if (HFileLink.isHFileLink(p)) {
      // HFileLink
      reference = null;
      link = HFileLink.buildFromHFileLinkPattern(conf, p);
      LOG.trace("{} is a link", p);
    } else if (StoreFileInfo.isReference(p)) {
      reference = readReference(p);
      Path referencePath = StoreFileInfo.getReferredToFile(p);
      if (HFileLink.isHFileLink(referencePath)) {
        // HFileLink Reference
        link = HFileLink.buildFromHFileLinkPattern(conf, referencePath);
      } else {
        // Reference
        link = null;
      }
      LOG.trace("{} is a {} reference to {}", p, reference.getFileRegion(), referencePath);
    } else
      if (StoreFileInfo.isHFile(p) || StoreFileInfo.isMobFile(p) || StoreFileInfo.isMobRefFile(p)) {
        // HFile
        if (fileStatus != null) {
          createdTimestamp = fileStatus.getModificationTime();
          size = fileStatus.getLen();
        } else {
          FileStatus fStatus = fs.getFileStatus(initialPath);
          createdTimestamp = fStatus.getModificationTime();
          size = fStatus.getLen();
        }
      } else {
        throw new IOException("path=" + p + " doesn't look like a valid StoreFile");
      }
    return new StoreFileInfo(conf, fs, createdTimestamp, initialPath, size, reference, link,
      isPrimaryReplica);
  }

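  /**
   * Create a new HFileLink in this store's directory, pointing at the given hfile of the linked
   * table/region, optionally creating a back reference file under the archive directory.
   * @return the name of the created link file
   */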
  public String createHFileLink(final TableName linkedTable, final String linkedRegion,
    final String hfileName, final boolean createBackRef) throws IOException {
    String name = HFileLink.createHFileLinkName(linkedTable, linkedRegion, hfileName);
    String refName = HFileLink.createBackReferenceName(ctx.getTableName().toString(),
      ctx.getRegionInfo().getEncodedName());

    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
    // Make sure the destination directory exists
    fs.mkdirs(ctx.getFamilyStoreDirectoryPath());

    // Make sure the FileLink reference directory exists
    Path archiveStoreDir = HFileArchiveUtil.getStoreArchivePath(conf, linkedTable, linkedRegion,
      ctx.getFamily().getNameAsString());
    Path backRefPath = null;
    if (createBackRef) {
      Path backRefsDir = HFileLink.getBackReferencesDir(archiveStoreDir, hfileName);
      fs.mkdirs(backRefsDir);

      // Create the reference for the link
      backRefPath = new Path(backRefsDir, refName);
      fs.createNewFile(backRefPath);
    }
    try {
      // Create the link
      if (fs.createNewFile(new Path(ctx.getFamilyStoreDirectoryPath(), name))) {
        return name;
      }
    } catch (IOException e) {
      LOG.error("couldn't create the link=" + name + " for " + ctx.getFamilyStoreDirectoryPath(),
        e);
      // Revert the reference if the link creation failed
      if (createBackRef) {
        fs.delete(backRefPath, false);
      }
      throw e;
    }
    throw new IOException("File link=" + name + " already exists under "
      + ctx.getFamilyStoreDirectoryPath() + " folder.");
  }

  public String createFromHFileLink(final String hfileLinkName, final boolean createBackRef)
    throws IOException {
    Matcher m = HFileLink.LINK_NAME_PATTERN.matcher(hfileLinkName);
    if (!m.matches()) {
      throw new IllegalArgumentException(hfileLinkName + " is not a valid HFileLink name!");
    }
    return createHFileLink(TableName.valueOf(m.group(1), m.group(2)), m.group(3), m.group(4),
      createBackRef);
  }

  public void removeStoreFiles(List<HStoreFile> storeFiles) throws IOException {
    archiveStoreFiles(storeFiles);
  }

  protected void archiveStoreFiles(List<HStoreFile> storeFiles) throws IOException {
    HFileArchiver.archiveStoreFiles(this.conf, ctx.getRegionFileSystem().getFileSystem(),
      ctx.getRegionInfo(), ctx.getRegionFileSystem().getTableDir(), ctx.getFamily().getName(),
      storeFiles);
  }

  /**
   * For the primary replica, we will call load once when opening a region, and the implementation
   * could choose to do some cleanup work. So here we use {@code readOnly} to indicate whether you
   * are allowed to do the cleanup work. For secondary replicas, we will set {@code readOnly} to
   * {@code true}.
   */
  protected abstract List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException;

  /**
   * Record newly created store files, e.g. flush outputs, in the tracker.
   */
  protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;

  /**
   * Replace the compacted input files with the compaction output files in the tracker.
   */
  protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
    Collection<StoreFileInfo> newFiles) throws IOException;

  /**
   * Replace the whole tracked store file list with the given one.
   */
  protected abstract void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException;

}