/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.TRACKER_IMPL;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.regex.Matcher;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * Base class for all store file trackers.
 * <p/>
 * Mainly used to hold the common logic that skips persistence for secondary replicas.
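 * <p/>
 * Subclasses implement only the {@code doXXX} hooks; the final methods in this class decide
 * whether a hook runs at all, based on {@code isPrimaryReplica}. As an illustrative sketch only,
 * not a shipped tracker, a trivial subclass could look like:
 *
 * <pre>
 * class NoopStoreFileTracker extends StoreFileTrackerBase {
 *   NoopStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
 *     super(conf, isPrimaryReplica, ctx);
 *   }
 *
 *   protected List&lt;StoreFileInfo&gt; doLoadStoreFiles(boolean readOnly) {
 *     return Collections.emptyList(); // nothing persisted, nothing to load
 *   }
 *
 *   // doAddNewStoreFiles, doAddCompactionResults and doSetStoreFiles elided; a real tracker
 *   // would persist the file lists here.
 * }
 * </pre>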
 */
@InterfaceAudience.Private
abstract class StoreFileTrackerBase implements StoreFileTracker {

  private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerBase.class);

  protected final Configuration conf;

  protected final boolean isPrimaryReplica;

  protected final StoreContext ctx;

  private volatile boolean cacheOnWriteLogged;

  protected StoreFileTrackerBase(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
    this.conf = conf;
    this.isPrimaryReplica = isPrimaryReplica;
    this.ctx = ctx;
  }

  @Override
  public final List<StoreFileInfo> load() throws IOException {
    return doLoadStoreFiles(!isPrimaryReplica);
  }

  @Override
  public final void add(Collection<StoreFileInfo> newFiles) throws IOException {
    if (isPrimaryReplica) {
      doAddNewStoreFiles(newFiles);
    }
  }

  @Override
  public final void replace(Collection<StoreFileInfo> compactedFiles,
    Collection<StoreFileInfo> newFiles) throws IOException {
    if (isPrimaryReplica) {
      doAddCompactionResults(compactedFiles, newFiles);
    }
  }

  @Override
  public final void set(List<StoreFileInfo> files) throws IOException {
    if (isPrimaryReplica) {
      doSetStoreFiles(files);
    }
  }

  @Override
  public TableDescriptorBuilder updateWithTrackerConfigs(TableDescriptorBuilder builder) {
    builder.setValue(TRACKER_IMPL, getTrackerName());
    return builder;
  }
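
  // Illustrative usage of updateWithTrackerConfigs (the call site below is hypothetical and only
  // sketches the contract): when a table is created, the active tracker records its
  // implementation name in the table descriptor, so the same tracker keeps being used for the
  // lifetime of the table.
  //
  //   TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
  //   tracker.updateWithTrackerConfigs(builder); // persists TRACKER_IMPL -> tracker name
  //   TableDescriptor htd = builder.build();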

  protected final String getTrackerName() {
    return StoreFileTrackerFactory.getStoreFileTrackerName(getClass());
  }

  private HFileContext createFileContext(Compression.Algorithm compression,
    boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) {
    if (compression == null) {
      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
    }
    ColumnFamilyDescriptor family = ctx.getFamily();
    HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(includeMVCCReadpoint)
      .withIncludesTags(includesTag).withCompression(compression)
      .withCompressTags(family.isCompressTags()).withChecksumType(StoreUtils.getChecksumType(conf))
      .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
      .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
      .withDataBlockEncoding(family.getDataBlockEncoding()).withEncryptionContext(encryptionContext)
      .withCreateTime(EnvironmentEdgeManager.currentTime()).withColumnFamily(family.getName())
      .withTableName(ctx.getTableName().getName()).withCellComparator(ctx.getComparator())
      .withIndexBlockEncoding(family.getIndexBlockEncoding()).build();
    return hFileContext;
  }

  @Override
  public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
    if (!isPrimaryReplica) {
      throw new IllegalStateException("Should not call create writer on secondary replicas");
    }
    // creating new cache config for each new writer
    final CacheConfig cacheConf = ctx.getCacheConf();
    final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
    long totalCompactedFilesSize = params.totalCompactedFilesSize();
    if (params.isCompaction()) {
      // Don't cache data on write on compactions, unless specifically configured to do so.
      // Cache only when total file size remains lower than configured threshold.
      final boolean cacheCompactedBlocksOnWrite = cacheConf.shouldCacheCompactedBlocksOnWrite();
      // if data blocks are to be cached on write during compaction, we should forcefully
      // cache index and bloom blocks as well
      if (
        cacheCompactedBlocksOnWrite
          && totalCompactedFilesSize <= cacheConf.getCacheCompactedBlocksOnWriteThreshold()
      ) {
        writerCacheConf.enableCacheOnWrite();
        if (!cacheOnWriteLogged) {
          LOG.info("For {} , cacheCompactedBlocksOnWrite is true, hence enabled "
            + "cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this);
          cacheOnWriteLogged = true;
        }
      } else {
        writerCacheConf.setCacheDataOnWrite(false);
        if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
          // checking condition once again for logging
          LOG.debug(
            "For {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted "
              + "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}",
            this, totalCompactedFilesSize, cacheConf.getCacheCompactedBlocksOnWriteThreshold());
        }
      }
    } else {
      final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite();
      if (shouldCacheDataOnWrite) {
        writerCacheConf.enableCacheOnWrite();
        if (!cacheOnWriteLogged) {
          LOG.info("For {} , cacheDataOnWrite is true, hence enabled cacheOnWrite for "
            + "Index blocks and Bloom filter blocks", this);
          cacheOnWriteLogged = true;
        }
      }
    }
    Encryption.Context encryptionContext = ctx.getEncryptionContext();
    HFileContext hFileContext = createFileContext(params.compression(),
      params.includeMVCCReadpoint(), params.includesTag(), encryptionContext);
    Path outputDir;
    if (requireWritingToTmpDirFirst()) {
      outputDir =
        new Path(ctx.getRegionFileSystem().getTempDir(), ctx.getFamily().getNameAsString());
    } else {
      outputDir = ctx.getFamilyStoreDirectoryPath();
    }
    StoreFileWriter.Builder builder =
      new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem())
        .withOutputDir(outputDir).withBloomType(ctx.getBloomFilterType())
        .withMaxKeyCount(params.maxKeyCount()).withFavoredNodes(ctx.getFavoredNodes())
        .withFileContext(hFileContext).withShouldDropCacheBehind(params.shouldDropBehind())
        .withCompactedFilesSupplier(ctx.getCompactedFilesSupplier())
        .withFileStoragePolicy(params.fileStoragePolicy())
        .withWriterCreationTracker(params.writerCreationTracker())
        .withMaxVersions(ctx.getMaxVersions()).withNewVersionBehavior(ctx.getNewVersionBehavior())
        .withCellComparator(ctx.getComparator()).withIsCompaction(params.isCompaction());
    return builder.build();
  }
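
  // Illustrative call site for createWriter (hypothetical, for orientation only): flushes and
  // compactions obtain writers through the tracker rather than opening files directly, so the
  // tracker controls whether output lands in the family's .tmp directory or the store directory.
  //
  //   StoreFileWriter writer = tracker.createWriter(CreateStoreFileWriterParams.create()
  //     .maxKeyCount(cellCount).compression(compression).isCompaction(false)
  //     .includeMVCCReadpoint(true).includesTag(false).shouldDropBehind(false));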

  @Override
  public Reference createReference(Reference reference, Path path) throws IOException {
    try (FSDataOutputStream out = ctx.getRegionFileSystem().getFileSystem().create(path, false)) {
      out.write(reference.toByteArray());
    }
    return reference;
  }

  @Override
  public Reference createAndCommitReference(Reference reference, Path path) throws IOException {
    return createReference(reference, path);
  }

  /**
   * Returns true if the column family of this store contains reference files.
   * @return true if the family contains reference files
   */
  public boolean hasReferences() throws IOException {
    Path storeDir = ctx.getRegionFileSystem().getStoreDir(ctx.getFamily().getNameAsString());
    FileStatus[] files =
      CommonFSUtils.listStatus(ctx.getRegionFileSystem().getFileSystem(), storeDir);
    if (files != null) {
      for (FileStatus stat : files) {
        if (stat.isDirectory()) {
          continue;
        }
        if (StoreFileInfo.isReference(stat.getPath())) {
          LOG.trace("Reference {}", stat.getPath());
          return true;
        }
      }
    }
    return false;
  }

  @Override
  public Reference readReference(final Path p) throws IOException {
    InputStream in = ctx.getRegionFileSystem().getFileSystem().open(p);
    try {
      // We need to be able to move back in the stream if this is not a pb serialization so we
      // can do the Writable decoding instead.
      in = in.markSupported() ? in : new BufferedInputStream(in);
      int pblen = ProtobufUtil.lengthOfPBMagic();
      in.mark(pblen);
      byte[] pbuf = new byte[pblen];
      IOUtils.readFully(in, pbuf, 0, pblen);
      // WATCHOUT! Return in middle of function!!!
      if (ProtobufUtil.isPBMagicPrefix(pbuf)) {
        return Reference.convert(
          org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos.Reference.parseFrom(in));
      }
      // Else presume Writables. Need to reset the stream since it didn't start w/ pb.
      // We won't bother rewriting the Reference as a pb since Reference is transitory.
      in.reset();
      Reference r = new Reference();
      DataInputStream dis = new DataInputStream(in);
      // Set in = dis so it gets the close below in the finally on our way out.
      in = dis;
      r.readFields(dis);
      return r;
    } finally {
      in.close();
    }
  }

  @Override
  public StoreFileInfo getStoreFileInfo(Path initialPath, boolean primaryReplica)
    throws IOException {
    return getStoreFileInfo(null, initialPath, primaryReplica);
  }

  @Override
  public StoreFileInfo getStoreFileInfo(FileStatus fileStatus, Path initialPath,
    boolean primaryReplica) throws IOException {
    FileSystem fs = this.ctx.getRegionFileSystem().getFileSystem();
    assert fs != null;
    assert initialPath != null;
    assert conf != null;
    Reference reference = null;
    HFileLink link = null;
    long createdTimestamp = 0;
    long size = 0;
    Path p = initialPath;
    if (HFileLink.isHFileLink(p)) {
      // HFileLink
      reference = null;
      link = HFileLink.buildFromHFileLinkPattern(conf, p);
      LOG.trace("{} is a link", p);
    } else if (StoreFileInfo.isReference(p)) {
      reference = readReference(p);
      Path referencePath = StoreFileInfo.getReferredToFile(p);
      if (HFileLink.isHFileLink(referencePath)) {
        // HFileLink Reference
        link = HFileLink.buildFromHFileLinkPattern(conf, referencePath);
      } else {
        // Reference
        link = null;
      }
      LOG.trace("{} is a {} reference to {}", p, reference.getFileRegion(), referencePath);
    } else if (
      StoreFileInfo.isHFile(p) || StoreFileInfo.isMobFile(p) || StoreFileInfo.isMobRefFile(p)
    ) {
      // HFile
      if (fileStatus != null) {
        createdTimestamp = fileStatus.getModificationTime();
        size = fileStatus.getLen();
      } else {
        FileStatus fStatus = fs.getFileStatus(initialPath);
        createdTimestamp = fStatus.getModificationTime();
        size = fStatus.getLen();
      }
    } else {
      throw new IOException("path=" + p + " doesn't look like a valid StoreFile");
    }
    return new StoreFileInfo(conf, fs, createdTimestamp, initialPath, size, reference, link,
      isPrimaryReplica);
  }
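
  // For orientation, the three path shapes getStoreFileInfo distinguishes (names below are
  // schematic placeholders, not real files):
  //   HFileLink:   <table>=<encodedRegion>-<hfileName>   link to an hfile in another region
  //   Reference:   <hfileName>.<parentEncodedRegion>     half-file reference created by a split
  //   Plain HFile: <hfileName>                           regular store file in this family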

  public HFileLink createAndCommitHFileLink(final TableName linkedTable, final String linkedRegion,
    final String hfileName, final boolean createBackRef) throws IOException {
    HFileLink hFileLink = createHFileLink(linkedTable, linkedRegion, hfileName, createBackRef);
    Path path = new Path(ctx.getFamilyStoreDirectoryPath(),
      HFileLink.createHFileLinkName(linkedTable, linkedRegion, hfileName));
    StoreFileInfo storeFileInfo =
      new StoreFileInfo(conf, this.ctx.getRegionFileSystem().getFileSystem(), path, hFileLink);
    add(Arrays.asList(storeFileInfo));
    return hFileLink;
  }

  public HFileLink createHFileLink(final TableName linkedTable, final String linkedRegion,
    final String hfileName, final boolean createBackRef) throws IOException {
    String name = HFileLink.createHFileLinkName(linkedTable, linkedRegion, hfileName);
    String refName = HFileLink.createBackReferenceName(ctx.getTableName().toString(),
      ctx.getRegionInfo().getEncodedName());

    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
    // Make sure the destination directory exists
    fs.mkdirs(ctx.getFamilyStoreDirectoryPath());

    // Make sure the FileLink back-reference directory exists
    Path archiveStoreDir = HFileArchiveUtil.getStoreArchivePath(conf, linkedTable, linkedRegion,
      ctx.getFamily().getNameAsString());
    Path backRefPath = null;
    if (createBackRef) {
      Path backRefsDir = HFileLink.getBackReferencesDir(archiveStoreDir, hfileName);
      fs.mkdirs(backRefsDir);

      // Create the back reference for the link
      backRefPath = new Path(backRefsDir, refName);
      fs.createNewFile(backRefPath);
    }
    try {
      // Create the link
      if (fs.createNewFile(new Path(ctx.getFamilyStoreDirectoryPath(), name))) {
        return new HFileLink(new Path(ctx.getFamilyStoreDirectoryPath(), name), backRefPath, null,
          archiveStoreDir);
      }
    } catch (IOException e) {
      LOG.error("couldn't create the link=" + name + " for " + ctx.getFamilyStoreDirectoryPath(),
        e);
      // Revert the back reference if the link creation failed
      if (createBackRef) {
        fs.delete(backRefPath, false);
      }
      throw e;
    }
    throw new IOException("File link=" + name + " already exists under "
      + ctx.getFamilyStoreDirectoryPath() + " folder.");
  }

  public HFileLink createFromHFileLink(final String hfileLinkName, final boolean createBackRef)
    throws IOException {
    Matcher hfileLinkMatcher = HFileLink.LINK_NAME_PATTERN.matcher(hfileLinkName);
    if (hfileLinkMatcher.matches()) {
      return createHFileLink(
        TableName.valueOf(hfileLinkMatcher.group(1), hfileLinkMatcher.group(2)),
        hfileLinkMatcher.group(3), hfileLinkMatcher.group(4), createBackRef);
    }
    if (StoreFileInfo.isMobFileLink(hfileLinkName)) {
      Matcher mobLinkMatcher = HFileLink.REF_OR_HFILE_LINK_PATTERN.matcher(hfileLinkName);
      if (mobLinkMatcher.matches()) {
        return createHFileLink(TableName.valueOf(mobLinkMatcher.group(1), mobLinkMatcher.group(2)),
          mobLinkMatcher.group(3), mobLinkMatcher.group(4), createBackRef);
      }
    }
    throw new IllegalArgumentException(hfileLinkName + " is not a valid HFileLink name!");
  }

  @Override
  public StoreContext getStoreContext() {
    return ctx;
  }

  public void removeStoreFiles(List<HStoreFile> storeFiles) throws IOException {
    archiveStoreFiles(storeFiles);
  }

  protected void archiveStoreFiles(List<HStoreFile> storeFiles) throws IOException {
    HFileArchiver.archiveStoreFiles(this.conf, ctx.getRegionFileSystem().getFileSystem(),
      ctx.getRegionInfo(), ctx.getRegionFileSystem().getTableDir(), ctx.getFamily().getName(),
      storeFiles);
  }
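
  // Note on semantics (the layout below is the usual default, shown only as an illustration):
  // archiving moves store files out of the region directory instead of deleting them, e.g.
  //   <rootdir>/data/<ns>/<table>/<region>/<family>/<hfile>
  //     -> <rootdir>/archive/data/<ns>/<table>/<region>/<family>/<hfile>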

  /**
   * For the primary replica, we will call load once when opening a region, and the implementation
   * can choose to do some cleanup work. So here we use {@code readOnly} to indicate whether you
   * are allowed to do the cleanup work. For secondary replicas, we will set {@code readOnly} to
   * {@code true}.
   */
  protected abstract List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException;

  protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;

  protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
    Collection<StoreFileInfo> newFiles) throws IOException;

  protected abstract void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException;

}