001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.storefiletracker; 019 020import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.TRACKER_IMPL; 021 022import java.io.BufferedInputStream; 023import java.io.DataInputStream; 024import java.io.IOException; 025import java.io.InputStream; 026import java.util.Arrays; 027import java.util.Collection; 028import java.util.List; 029import java.util.regex.Matcher; 030import org.apache.commons.io.IOUtils; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FSDataOutputStream; 033import org.apache.hadoop.fs.FileStatus; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.backup.HFileArchiver; 039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 040import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 041import org.apache.hadoop.hbase.io.HFileLink; 042import org.apache.hadoop.hbase.io.Reference; 043import org.apache.hadoop.hbase.io.compress.Compression; 044import org.apache.hadoop.hbase.io.crypto.Encryption; 045import org.apache.hadoop.hbase.io.hfile.CacheConfig; 046import org.apache.hadoop.hbase.io.hfile.HFile; 047import org.apache.hadoop.hbase.io.hfile.HFileContext; 048import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 049import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams; 050import org.apache.hadoop.hbase.regionserver.HStoreFile; 051import org.apache.hadoop.hbase.regionserver.StoreContext; 052import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 053import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 054import org.apache.hadoop.hbase.regionserver.StoreUtils; 055import org.apache.hadoop.hbase.security.access.AbstractReadOnlyController; 056import org.apache.hadoop.hbase.util.CommonFSUtils; 057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 058import org.apache.hadoop.hbase.util.HFileArchiveUtil; 059import org.apache.yetus.audience.InterfaceAudience; 060import org.slf4j.Logger; 061import org.slf4j.LoggerFactory; 062 063import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 064 065/** 066 * Base class for all store file tracker. 067 * <p/> 068 * Mainly used to place the common logic to skip persistent for secondary replicas. 069 */ 070@InterfaceAudience.Private 071abstract class StoreFileTrackerBase implements StoreFileTracker { 072 073 private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerBase.class); 074 075 protected final Configuration conf; 076 077 protected final boolean isPrimaryReplica; 078 079 protected final StoreContext ctx; 080 081 private volatile boolean cacheOnWriteLogged; 082 083 protected StoreFileTrackerBase(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) { 084 this.conf = conf; 085 this.isPrimaryReplica = isPrimaryReplica; 086 this.ctx = ctx; 087 } 088 089 private boolean isReadOnlyEnabled() { 090 return conf.getBoolean(HConstants.HBASE_GLOBAL_READONLY_ENABLED_KEY, 091 HConstants.HBASE_GLOBAL_READONLY_ENABLED_DEFAULT); 092 } 093 094 private boolean isNonWritableTableWhenReadOnlyMode() { 095 return isReadOnlyEnabled() 096 && !AbstractReadOnlyController.isWritableInReadOnlyMode(ctx.getTableName()); 097 } 098 099 @Override 100 public final List<StoreFileInfo> load() throws IOException { 101 return doLoadStoreFiles(!isPrimaryReplica || isNonWritableTableWhenReadOnlyMode()); 102 } 103 104 @Override 105 public final void add(Collection<StoreFileInfo> newFiles) throws IOException { 106 if (isPrimaryReplica && !isNonWritableTableWhenReadOnlyMode()) { 107 doAddNewStoreFiles(newFiles); 108 } 109 } 110 111 @Override 112 public final void replace(Collection<StoreFileInfo> compactedFiles, 113 Collection<StoreFileInfo> newFiles) throws IOException { 114 if (isPrimaryReplica && !isNonWritableTableWhenReadOnlyMode()) { 115 doAddCompactionResults(compactedFiles, newFiles); 116 } 117 } 118 119 @Override 120 public final void set(List<StoreFileInfo> files) throws IOException { 121 if (isPrimaryReplica && !isNonWritableTableWhenReadOnlyMode()) { 122 doSetStoreFiles(files); 123 } 124 } 125 126 @Override 127 public TableDescriptorBuilder updateWithTrackerConfigs(TableDescriptorBuilder builder) { 128 builder.setValue(TRACKER_IMPL, getTrackerName()); 129 return builder; 130 } 131 132 protected final String getTrackerName() { 133 return StoreFileTrackerFactory.getStoreFileTrackerName(getClass()); 134 } 135 136 private HFileContext createFileContext(Compression.Algorithm compression, 137 boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) { 138 if (compression == null) { 139 compression = HFile.DEFAULT_COMPRESSION_ALGORITHM; 140 } 141 ColumnFamilyDescriptor family = ctx.getFamily(); 142 HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(includeMVCCReadpoint) 143 .withIncludesTags(includesTag).withCompression(compression) 144 .withCompressTags(family.isCompressTags()).withChecksumType(StoreUtils.getChecksumType(conf)) 145 .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)) 146 .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true) 147 .withDataBlockEncoding(family.getDataBlockEncoding()).withEncryptionContext(encryptionContext) 148 .withCreateTime(EnvironmentEdgeManager.currentTime()).withColumnFamily(family.getName()) 149 .withTableName(ctx.getTableName().getName()).withCellComparator(ctx.getComparator()) 150 .withIndexBlockEncoding(family.getIndexBlockEncoding()).build(); 151 return hFileContext; 152 } 153 154 @Override 155 public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException { 156 if (!isPrimaryReplica || isNonWritableTableWhenReadOnlyMode()) { 157 throw new IllegalStateException( 158 "Should not call create writer on secondary replicas or in read-only mode"); 159 } 160 // creating new cache config for each new writer 161 final CacheConfig cacheConf = ctx.getCacheConf(); 162 final CacheConfig writerCacheConf = new CacheConfig(cacheConf); 163 long totalCompactedFilesSize = params.totalCompactedFilesSize(); 164 if (params.isCompaction()) { 165 // Don't cache data on write on compactions, unless specifically configured to do so 166 // Cache only when total file size remains lower than configured threshold 167 final boolean cacheCompactedBlocksOnWrite = cacheConf.shouldCacheCompactedBlocksOnWrite(); 168 // if data blocks are to be cached on write 169 // during compaction, we should forcefully 170 // cache index and bloom blocks as well 171 if ( 172 cacheCompactedBlocksOnWrite 173 && totalCompactedFilesSize <= cacheConf.getCacheCompactedBlocksOnWriteThreshold() 174 ) { 175 writerCacheConf.enableCacheOnWrite(); 176 if (!cacheOnWriteLogged) { 177 LOG.info("For {} , cacheCompactedBlocksOnWrite is true, hence enabled " 178 + "cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this); 179 cacheOnWriteLogged = true; 180 } 181 } else { 182 writerCacheConf.setCacheDataOnWrite(false); 183 if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) { 184 // checking condition once again for logging 185 LOG.debug( 186 "For {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted " 187 + "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}", 188 this, totalCompactedFilesSize, cacheConf.getCacheCompactedBlocksOnWriteThreshold()); 189 } 190 } 191 } else { 192 final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite(); 193 if (shouldCacheDataOnWrite) { 194 writerCacheConf.enableCacheOnWrite(); 195 if (!cacheOnWriteLogged) { 196 LOG.info("For {} , cacheDataOnWrite is true, hence enabled cacheOnWrite for " 197 + "Index blocks and Bloom filter blocks", this); 198 cacheOnWriteLogged = true; 199 } 200 } 201 } 202 Encryption.Context encryptionContext = ctx.getEncryptionContext(); 203 HFileContext hFileContext = createFileContext(params.compression(), 204 params.includeMVCCReadpoint(), params.includesTag(), encryptionContext); 205 Path outputDir; 206 if (requireWritingToTmpDirFirst()) { 207 outputDir = 208 new Path(ctx.getRegionFileSystem().getTempDir(), ctx.getFamily().getNameAsString()); 209 } else { 210 outputDir = ctx.getFamilyStoreDirectoryPath(); 211 } 212 StoreFileWriter.Builder builder = 213 new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem()) 214 .withOutputDir(outputDir).withBloomType(ctx.getBloomFilterType()) 215 .withMaxKeyCount(params.maxKeyCount()).withFavoredNodes(ctx.getFavoredNodes()) 216 .withFileContext(hFileContext).withShouldDropCacheBehind(params.shouldDropBehind()) 217 .withCompactedFilesSupplier(ctx.getCompactedFilesSupplier()) 218 .withFileStoragePolicy(params.fileStoragePolicy()) 219 .withWriterCreationTracker(params.writerCreationTracker()) 220 .withMaxVersions(ctx.getMaxVersions()).withNewVersionBehavior(ctx.getNewVersionBehavior()) 221 .withCellComparator(ctx.getComparator()).withIsCompaction(params.isCompaction()); 222 return builder.build(); 223 } 224 225 @Override 226 public Reference createReference(Reference reference, Path path) throws IOException { 227 FSDataOutputStream out = ctx.getRegionFileSystem().getFileSystem().create(path, false); 228 try { 229 out.write(reference.toByteArray()); 230 } finally { 231 out.close(); 232 } 233 return reference; 234 } 235 236 @Override 237 public Reference createAndCommitReference(Reference reference, Path path) throws IOException { 238 return createReference(reference, path); 239 } 240 241 /** 242 * Returns true if the specified family has reference files 243 * @param familyName Column Family Name 244 * @return true if family contains reference files 245 */ 246 public boolean hasReferences() throws IOException { 247 Path storeDir = ctx.getRegionFileSystem().getStoreDir(ctx.getFamily().getNameAsString()); 248 FileStatus[] files = 249 CommonFSUtils.listStatus(ctx.getRegionFileSystem().getFileSystem(), storeDir); 250 if (files != null) { 251 for (FileStatus stat : files) { 252 if (stat.isDirectory()) { 253 continue; 254 } 255 if (StoreFileInfo.isReference(stat.getPath())) { 256 LOG.trace("Reference {}", stat.getPath()); 257 return true; 258 } 259 } 260 } 261 return false; 262 } 263 264 @Override 265 public Reference readReference(final Path p) throws IOException { 266 InputStream in = ctx.getRegionFileSystem().getFileSystem().open(p); 267 try { 268 // I need to be able to move back in the stream if this is not a pb serialization so I can 269 // do the Writable decoding instead. 270 in = in.markSupported() ? in : new BufferedInputStream(in); 271 int pblen = ProtobufUtil.lengthOfPBMagic(); 272 in.mark(pblen); 273 byte[] pbuf = new byte[pblen]; 274 IOUtils.readFully(in, pbuf, 0, pblen); 275 // WATCHOUT! Return in middle of function!!! 276 if (ProtobufUtil.isPBMagicPrefix(pbuf)) { 277 return Reference.convert( 278 org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos.Reference.parseFrom(in)); 279 } 280 // Else presume Writables. Need to reset the stream since it didn't start w/ pb. 281 // We won't bother rewriting thie Reference as a pb since Reference is transitory. 282 in.reset(); 283 Reference r = new Reference(); 284 DataInputStream dis = new DataInputStream(in); 285 // Set in = dis so it gets the close below in the finally on our way out. 286 in = dis; 287 r.readFields(dis); 288 return r; 289 } finally { 290 in.close(); 291 } 292 } 293 294 @Override 295 public StoreFileInfo getStoreFileInfo(Path initialPath, boolean primaryReplica) 296 throws IOException { 297 return getStoreFileInfo(null, initialPath, primaryReplica); 298 } 299 300 @Override 301 public StoreFileInfo getStoreFileInfo(FileStatus fileStatus, Path initialPath, 302 boolean primaryReplica) throws IOException { 303 FileSystem fs = this.ctx.getRegionFileSystem().getFileSystem(); 304 assert fs != null; 305 assert initialPath != null; 306 assert conf != null; 307 Reference reference = null; 308 HFileLink link = null; 309 long createdTimestamp = 0; 310 long size = 0; 311 Path p = initialPath; 312 if (HFileLink.isHFileLink(p)) { 313 // HFileLink 314 reference = null; 315 link = HFileLink.buildFromHFileLinkPattern(conf, p); 316 LOG.trace("{} is a link", p); 317 } else if (StoreFileInfo.isReference(p)) { 318 reference = readReference(p); 319 Path referencePath = StoreFileInfo.getReferredToFile(p); 320 if (HFileLink.isHFileLink(referencePath)) { 321 // HFileLink Reference 322 link = HFileLink.buildFromHFileLinkPattern(conf, referencePath); 323 } else { 324 // Reference 325 link = null; 326 } 327 LOG.trace("{} is a {} reference to {}", p, reference.getFileRegion(), referencePath); 328 } else 329 if (StoreFileInfo.isHFile(p) || StoreFileInfo.isMobFile(p) || StoreFileInfo.isMobRefFile(p)) { 330 // HFile 331 if (fileStatus != null) { 332 createdTimestamp = fileStatus.getModificationTime(); 333 size = fileStatus.getLen(); 334 } else { 335 FileStatus fStatus = fs.getFileStatus(initialPath); 336 createdTimestamp = fStatus.getModificationTime(); 337 size = fStatus.getLen(); 338 } 339 } else { 340 throw new IOException("path=" + p + " doesn't look like a valid StoreFile"); 341 } 342 return new StoreFileInfo(conf, fs, createdTimestamp, initialPath, size, reference, link, 343 isPrimaryReplica); 344 } 345 346 public HFileLink createAndCommitHFileLink(final TableName linkedTable, final String linkedRegion, 347 final String hfileName, final boolean createBackRef) throws IOException { 348 HFileLink hFileLink = createHFileLink(linkedTable, linkedRegion, hfileName, createBackRef); 349 Path path = new Path(ctx.getFamilyStoreDirectoryPath(), 350 HFileLink.createHFileLinkName(linkedTable, linkedRegion, hfileName)); 351 StoreFileInfo storeFileInfo = 352 new StoreFileInfo(conf, this.ctx.getRegionFileSystem().getFileSystem(), path, hFileLink); 353 add(Arrays.asList(storeFileInfo)); 354 return hFileLink; 355 } 356 357 public HFileLink createHFileLink(final TableName linkedTable, final String linkedRegion, 358 final String hfileName, final boolean createBackRef) throws IOException { 359 String name = HFileLink.createHFileLinkName(linkedTable, linkedRegion, hfileName); 360 String refName = HFileLink.createBackReferenceName(ctx.getTableName().toString(), 361 ctx.getRegionInfo().getEncodedName()); 362 363 FileSystem fs = ctx.getRegionFileSystem().getFileSystem(); 364 // Make sure the destination directory exists 365 fs.mkdirs(ctx.getFamilyStoreDirectoryPath()); 366 367 // Make sure the FileLink reference directory exists 368 Path archiveStoreDir = HFileArchiveUtil.getStoreArchivePath(conf, linkedTable, linkedRegion, 369 ctx.getFamily().getNameAsString()); 370 Path backRefPath = null; 371 if (createBackRef) { 372 Path backRefssDir = HFileLink.getBackReferencesDir(archiveStoreDir, hfileName); 373 fs.mkdirs(backRefssDir); 374 375 // Create the reference for the link 376 backRefPath = new Path(backRefssDir, refName); 377 fs.createNewFile(backRefPath); 378 } 379 try { 380 // Create the link 381 if (fs.createNewFile(new Path(ctx.getFamilyStoreDirectoryPath(), name))) { 382 return new HFileLink(new Path(ctx.getFamilyStoreDirectoryPath(), name), backRefPath, null, 383 archiveStoreDir); 384 } 385 } catch (IOException e) { 386 LOG.error("couldn't create the link=" + name + " for " + ctx.getFamilyStoreDirectoryPath(), 387 e); 388 // Revert the reference if the link creation failed 389 if (createBackRef) { 390 fs.delete(backRefPath, false); 391 } 392 throw e; 393 } 394 throw new IOException("File link=" + name + " already exists under " 395 + ctx.getFamilyStoreDirectoryPath() + " folder."); 396 397 } 398 399 public HFileLink createFromHFileLink(final String hfileLinkName, final boolean createBackRef) 400 throws IOException { 401 Matcher hfileLinkMatcher = HFileLink.LINK_NAME_PATTERN.matcher(hfileLinkName); 402 if (hfileLinkMatcher.matches()) { 403 return createHFileLink( 404 TableName.valueOf(hfileLinkMatcher.group(1), hfileLinkMatcher.group(2)), 405 hfileLinkMatcher.group(3), hfileLinkMatcher.group(4), createBackRef); 406 } 407 if (StoreFileInfo.isMobFileLink(hfileLinkName)) { 408 Matcher mobLinkMatcher = HFileLink.REF_OR_HFILE_LINK_PATTERN.matcher(hfileLinkName); 409 if (mobLinkMatcher.matches()) { 410 return createHFileLink(TableName.valueOf(mobLinkMatcher.group(1), mobLinkMatcher.group(2)), 411 mobLinkMatcher.group(3), mobLinkMatcher.group(4), createBackRef); 412 } 413 } 414 throw new IllegalArgumentException(hfileLinkName + " is not a valid HFileLink name!"); 415 } 416 417 @Override 418 public StoreContext getStoreContext() { 419 return ctx; 420 } 421 422 public void removeStoreFiles(List<HStoreFile> storeFiles) throws IOException { 423 archiveStoreFiles(storeFiles); 424 } 425 426 protected void archiveStoreFiles(List<HStoreFile> storeFiles) throws IOException { 427 HFileArchiver.archiveStoreFiles(this.conf, ctx.getRegionFileSystem().getFileSystem(), 428 ctx.getRegionInfo(), ctx.getRegionFileSystem().getTableDir(), ctx.getFamily().getName(), 429 storeFiles); 430 } 431 432 /** 433 * For primary replica, we will call load once when opening a region, and the implementation could 434 * choose to do some cleanup work. So here we use {@code readOnly} to indicate that whether you 435 * are allowed to do the cleanup work. For secondary replicas, we will set {@code readOnly} to 436 * {@code true}. 437 */ 438 protected abstract List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException; 439 440 protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException; 441 442 protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles, 443 Collection<StoreFileInfo> newFiles) throws IOException; 444 445 protected abstract void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException; 446 447}