001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.net.InetSocketAddress; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.Collections; 027import java.util.HashMap; 028import java.util.HashSet; 029import java.util.Iterator; 030import java.util.List; 031import java.util.Map; 032import java.util.Map.Entry; 033import java.util.NavigableSet; 034import java.util.Optional; 035import java.util.OptionalDouble; 036import java.util.OptionalInt; 037import java.util.OptionalLong; 038import java.util.Set; 039import java.util.concurrent.Callable; 040import java.util.concurrent.CompletionService; 041import java.util.concurrent.ConcurrentHashMap; 042import java.util.concurrent.ExecutionException; 043import java.util.concurrent.ExecutorCompletionService; 044import java.util.concurrent.Future; 045import java.util.concurrent.ThreadPoolExecutor; 046import java.util.concurrent.atomic.AtomicBoolean; 047import java.util.concurrent.atomic.AtomicInteger; 048import java.util.concurrent.atomic.AtomicLong; 049import java.util.concurrent.atomic.LongAdder; 050import java.util.concurrent.locks.ReentrantLock; 051import java.util.function.Consumer; 052import java.util.function.Supplier; 053import java.util.function.ToLongFunction; 054import java.util.stream.Collectors; 055import java.util.stream.LongStream; 056import org.apache.hadoop.conf.Configuration; 057import org.apache.hadoop.fs.FileSystem; 058import org.apache.hadoop.fs.Path; 059import org.apache.hadoop.fs.permission.FsAction; 060import org.apache.hadoop.hbase.Cell; 061import org.apache.hadoop.hbase.CellComparator; 062import org.apache.hadoop.hbase.CellUtil; 063import org.apache.hadoop.hbase.ExtendedCell; 064import org.apache.hadoop.hbase.HConstants; 065import org.apache.hadoop.hbase.InnerStoreCellComparator; 066import org.apache.hadoop.hbase.MemoryCompactionPolicy; 067import org.apache.hadoop.hbase.MetaCellComparator; 068import org.apache.hadoop.hbase.TableName; 069import org.apache.hadoop.hbase.backup.FailedArchiveException; 070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 071import org.apache.hadoop.hbase.client.RegionInfo; 072import org.apache.hadoop.hbase.client.Scan; 073import org.apache.hadoop.hbase.conf.ConfigKey; 074import org.apache.hadoop.hbase.conf.ConfigurationManager; 075import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; 076import org.apache.hadoop.hbase.coprocessor.ReadOnlyConfiguration; 077import org.apache.hadoop.hbase.io.HeapSize; 078import org.apache.hadoop.hbase.io.hfile.CacheConfig; 079import 
org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.SecurityUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;

/**
 * A Store holds a column family in a Region. It's a memstore and a set of zero or more StoreFiles,
 * which stretch backwards over time.
 * <p>
 * There's no reason to consider append-logging at this level; all logging and locking is handled
 * at the HRegion level. Store just provides services to manage sets of StoreFiles. One of the most
 * important of those services is compaction, where files are aggregated once they pass a
 * configurable threshold.
 * <p>
 * Locking and transactions are handled at a higher level. This API should not be called directly
 * but by an HRegion manager.
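 * <p>
 * A minimal, illustrative sketch of that interaction (the real call sites live in HRegion and its
 * flush/compaction machinery; {@code cell} and {@code sizing} below are assumed local variables,
 * not part of this class's API):
 *
 * <pre>
 * HStore store = region.getStore(family); // the owning region hands out its stores
 * store.add(cell, sizing);                // edits land in the store's memstore
 * store.refreshStoreFiles();              // e.g. a secondary replica picking up new files
 * </pre>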
 */
@InterfaceAudience.Private
public class HStore
  implements Store, HeapSize, StoreConfigInformation, PropagatingConfigurationObserver {
  public static final String MEMSTORE_CLASS_NAME =
    ConfigKey.CLASS("hbase.regionserver.memstore.class", MemStore.class);
  public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
    ConfigKey.INT("hbase.server.compactchecker.interval.multiplier");
  public static final String BLOCKING_STOREFILES_KEY =
    ConfigKey.INT("hbase.hstore.blockingStoreFiles");
  public static final String BLOCK_STORAGE_POLICY_KEY = "hbase.hstore.block.storage.policy";
  // "NONE" is not a valid storage policy and means we defer the policy to HDFS
  public static final String DEFAULT_BLOCK_STORAGE_POLICY = "NONE";
  public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
  public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 16;

  // HBASE-24428 : Update compaction priority for recently split daughter regions
  // so as to prioritize their compaction.
  // Any compaction candidate with higher priority than compaction of newly split daughter regions
  // should have priority value < (Integer.MIN_VALUE + 1000)
  private static final int SPLIT_REGION_COMPACTION_PRIORITY = Integer.MIN_VALUE + 1000;

  private static final Logger LOG = LoggerFactory.getLogger(HStore.class);

  protected final MemStore memstore;
  // This stores directory in the filesystem.
  private final HRegion region;
  protected Configuration conf;
  private long lastCompactSize = 0;
  volatile boolean forceMajor = false;
  private AtomicLong storeSize = new AtomicLong();
  private AtomicLong totalUncompressedBytes = new AtomicLong();
  private LongAdder memstoreOnlyRowReadsCount = new LongAdder();
  // rows that have cells from both memstore and files (or only files)
  private LongAdder mixedRowReadsCount = new LongAdder();

  /**
   * Lock specific to archiving compacted store files. This avoids races around the combination of
   * retrieving the list of compacted files and moving them to the archive directory. Since this is
   * usually a background process (other than on close), we don't want to handle this with the
   * store write lock, which would block readers and degrade performance. Locked by: -
   * CompactedHFilesDispatchHandler via closeAndArchiveCompactedFiles() - close()
   */
  final ReentrantLock archiveLock = new ReentrantLock();

  private final boolean verifyBulkLoads;

  /**
   * Use this counter to track concurrent puts. If TRACE-log is enabled and we are over the
   * threshold set by hbase.region.store.parallel.put.print.threshold (default is 50), we will log
   * a message identifying the Store experiencing this high level of concurrency.
   */
  private final AtomicInteger currentParallelPutCount = new AtomicInteger(0);
  private final int parallelPutCountPrintThreshold;

  private ScanInfo scanInfo;

  // All access must be synchronized.
  // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it.
  private final List<HStoreFile> filesCompacting = Lists.newArrayList();

  // All access must be synchronized.
  private final Set<ChangedReadersObserver> changedReaderObservers =
    Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());

  private HFileDataBlockEncoder dataBlockEncoder;

  final StoreEngine<?, ?, ?, ?> storeEngine;

  private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
  private volatile OffPeakHours offPeakHours;

  private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
  private int flushRetriesNumber;
  private int pauseTime;

  private long blockingFileCount;
  private int compactionCheckMultiplier;

  private AtomicLong flushedCellsCount = new AtomicLong();
  private AtomicLong compactedCellsCount = new AtomicLong();
  private AtomicLong majorCompactedCellsCount = new AtomicLong();
  private AtomicLong flushedCellsSize = new AtomicLong();
  private AtomicLong flushedOutputFileSize = new AtomicLong();
  private AtomicLong compactedCellsSize = new AtomicLong();
  private AtomicLong majorCompactedCellsSize = new AtomicLong();

  private final StoreContext storeContext;

  // Used to track the store files which are currently being written. For compaction, if we want
  // to compact store files [a, b, c] to [d], then here we will record 'd'. We also use it to
  // track the store files being written when flushing.
  // Notice that the creation happens in the background compaction or flush thread while the files
  // are read from other threads, so it needs to be thread safe.
  private static final class StoreFileWriterCreationTracker implements Consumer<Path> {

    private final Set<Path> files = Collections.newSetFromMap(new ConcurrentHashMap<>());

    @Override
    public void accept(Path t) {
      files.add(t);
    }

    public Set<Path> get() {
      return Collections.unmodifiableSet(files);
    }
  }

  // We may have multiple compactions running at the same time, and a flush can also happen at the
  // same time, so here we need to use a collection, and the collection needs to be thread safe.
  // The implementation of StoreFileWriterCreationTracker is very simple and we are not likely to
  // implement hashCode or equals for it, so here we just use ConcurrentHashMap. Change to
  // IdentityHashMap later if we ever implement hashCode or equals.
  private final Set<StoreFileWriterCreationTracker> storeFileWriterCreationTrackers =
    Collections.newSetFromMap(new ConcurrentHashMap<>());

  // For SFT implementations that write to a tmp store file first, we do not need to clean up
  // broken store files under the data directory, which means we do not need to track store file
  // writer creation. So here we abstract a factory to return different trackers for different
  // SFT implementations.
  private final Supplier<StoreFileWriterCreationTracker> storeFileWriterCreationTrackerFactory;

  private final boolean warmup;

  /**
   * Constructor
   * @param family    ColumnFamilyDescriptor for this column family
   * @param confParam configuration object for this store; can be null
   */
  protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
    final Configuration confParam, boolean warmup) throws IOException {
    this.conf = StoreUtils.createStoreConfiguration(confParam, region.getTableDescriptor(), family);

    this.region = region;
    this.storeContext = initializeStoreContext(family);

    // Assemble the store's home directory and ensure it exists.
269 region.getRegionFileSystem().createStoreDir(family.getNameAsString()); 270 271 // set block storage policy for store directory 272 String policyName = family.getStoragePolicy(); 273 if (null == policyName) { 274 policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY, DEFAULT_BLOCK_STORAGE_POLICY); 275 } 276 region.getRegionFileSystem().setStoragePolicy(family.getNameAsString(), policyName.trim()); 277 278 this.dataBlockEncoder = new HFileDataBlockEncoderImpl(family.getDataBlockEncoding()); 279 280 // used by ScanQueryMatcher 281 long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0); 282 LOG.trace("Time to purge deletes set to {}ms in {}", timeToPurgeDeletes, this); 283 // Get TTL 284 long ttl = determineTTLFromFamily(family); 285 // Why not just pass a HColumnDescriptor in here altogether? Even if have 286 // to clone it? 287 scanInfo = 288 new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.storeContext.getComparator()); 289 this.memstore = getMemstore(); 290 291 this.offPeakHours = OffPeakHours.getInstance(conf); 292 293 this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false); 294 295 this.blockingFileCount = conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT); 296 this.compactionCheckMultiplier = conf.getInt(COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, 297 DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER); 298 if (this.compactionCheckMultiplier <= 0) { 299 LOG.error("Compaction check period multiplier must be positive, setting default: {}", 300 DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER); 301 this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER; 302 } 303 304 this.warmup = warmup; 305 this.storeEngine = createStoreEngine(this, this.conf, region.getCellComparator()); 306 storeEngine.initialize(warmup); 307 // if require writing to tmp dir first, then we just return null, which indicate that we do not 308 // need to track the creation of store file writer, otherwise we return a new 309 // StoreFileWriterCreationTracker. 310 this.storeFileWriterCreationTrackerFactory = storeEngine.requireWritingToTmpDirFirst() 311 ? 
() -> null 312 : () -> new StoreFileWriterCreationTracker(); 313 refreshStoreSizeAndTotalBytes(); 314 315 flushRetriesNumber = 316 conf.getInt("hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER); 317 pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE); 318 if (flushRetriesNumber <= 0) { 319 throw new IllegalArgumentException( 320 "hbase.hstore.flush.retries.number must be > 0, not " + flushRetriesNumber); 321 } 322 323 int confPrintThreshold = 324 this.conf.getInt("hbase.region.store.parallel.put.print.threshold", 50); 325 if (confPrintThreshold < 10) { 326 confPrintThreshold = 10; 327 } 328 this.parallelPutCountPrintThreshold = confPrintThreshold; 329 330 LOG.info( 331 "Store={}, memstore type={}, storagePolicy={}, verifyBulkLoads={}, " 332 + "parallelPutCountPrintThreshold={}, encoding={}, compression={}", 333 this, memstore.getClass().getSimpleName(), policyName, verifyBulkLoads, 334 parallelPutCountPrintThreshold, family.getDataBlockEncoding(), family.getCompressionType()); 335 } 336 337 private StoreContext initializeStoreContext(ColumnFamilyDescriptor family) throws IOException { 338 return new StoreContext.Builder().withBlockSize(family.getBlocksize()) 339 .withEncryptionContext(SecurityUtil.createEncryptionContext(conf, region.getTableDescriptor(), 340 family, region.getManagedKeyDataCache(), region.getSystemKeyCache())) 341 .withBloomType(family.getBloomFilterType()).withCacheConfig(createCacheConf(family)) 342 .withCellComparator(region.getTableDescriptor().isMetaTable() || conf 343 .getBoolean(HRegion.USE_META_CELL_COMPARATOR, HRegion.DEFAULT_USE_META_CELL_COMPARATOR) 344 ? MetaCellComparator.META_COMPARATOR 345 : InnerStoreCellComparator.INNER_STORE_COMPARATOR) 346 .withColumnFamilyDescriptor(family).withCompactedFilesSupplier(this::getCompactedFiles) 347 .withRegionFileSystem(region.getRegionFileSystem()) 348 .withFavoredNodesSupplier(this::getFavoredNodes) 349 .withFamilyStoreDirectoryPath( 350 region.getRegionFileSystem().getStoreDir(family.getNameAsString())) 351 .withRegionCoprocessorHost(region.getCoprocessorHost()).build(); 352 } 353 354 private InetSocketAddress[] getFavoredNodes() { 355 InetSocketAddress[] favoredNodes = null; 356 if (region.getRegionServerServices() != null) { 357 favoredNodes = region.getRegionServerServices() 358 .getFavoredNodesForRegion(region.getRegionInfo().getEncodedName()); 359 } 360 return favoredNodes; 361 } 362 363 /** Returns MemStore Instance to use in this store. */ 364 private MemStore getMemstore() { 365 MemStore ms = null; 366 // Check if in-memory-compaction configured. Note MemoryCompactionPolicy is an enum! 367 MemoryCompactionPolicy inMemoryCompaction = null; 368 if (this.getTableName().isSystemTable()) { 369 inMemoryCompaction = MemoryCompactionPolicy 370 .valueOf(conf.get("hbase.systemtables.compacting.memstore.type", "NONE").toUpperCase()); 371 } else { 372 inMemoryCompaction = getColumnFamilyDescriptor().getInMemoryCompaction(); 373 } 374 if (inMemoryCompaction == null) { 375 inMemoryCompaction = 376 MemoryCompactionPolicy.valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 377 CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT).toUpperCase()); 378 } 379 380 switch (inMemoryCompaction) { 381 case NONE: 382 Class<? 
extends MemStore> memStoreClass = 383 conf.getClass(MEMSTORE_CLASS_NAME, DefaultMemStore.class, MemStore.class); 384 ms = ReflectionUtils.newInstance(memStoreClass, 385 new Object[] { conf, getComparator(), this.getHRegion().getRegionServicesForStores() }); 386 break; 387 default: 388 Class<? extends CompactingMemStore> compactingMemStoreClass = 389 conf.getClass(MEMSTORE_CLASS_NAME, CompactingMemStore.class, CompactingMemStore.class); 390 ms = 391 ReflectionUtils.newInstance(compactingMemStoreClass, new Object[] { conf, getComparator(), 392 this, this.getHRegion().getRegionServicesForStores(), inMemoryCompaction }); 393 } 394 return ms; 395 } 396 397 /** 398 * Creates the cache config. 399 * @param family The current column family. 400 */ 401 protected CacheConfig createCacheConf(final ColumnFamilyDescriptor family) { 402 CacheConfig cacheConf = new CacheConfig(conf, family, region.getBlockCache(), 403 region.getRegionServicesForStores().getByteBuffAllocator()); 404 LOG.info("Created cacheConfig: {}, for column family {} of region {} ", cacheConf, 405 family.getNameAsString(), region.getRegionInfo().getEncodedName()); 406 return cacheConf; 407 } 408 409 /** 410 * Creates the store engine configured for the given Store. 411 * @param store The store. An unfortunate dependency needed due to it being passed to 412 * coprocessors via the compactor. 413 * @param conf Store configuration. 414 * @param kvComparator KVComparator for storeFileManager. 415 * @return StoreEngine to use. 416 */ 417 protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf, 418 CellComparator kvComparator) throws IOException { 419 return StoreEngine.create(store, conf, kvComparator); 420 } 421 422 /** Returns TTL in seconds of the specified family */ 423 public static long determineTTLFromFamily(final ColumnFamilyDescriptor family) { 424 // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds. 425 long ttl = family.getTimeToLive(); 426 if (ttl == HConstants.FOREVER) { 427 // Default is unlimited ttl. 428 ttl = Long.MAX_VALUE; 429 } else if (ttl == -1) { 430 ttl = Long.MAX_VALUE; 431 } else { 432 // Second -> ms adjust for user data 433 ttl *= 1000; 434 } 435 return ttl; 436 } 437 438 public StoreContext getStoreContext() { 439 return storeContext; 440 } 441 442 @Override 443 public String getColumnFamilyName() { 444 return this.storeContext.getFamily().getNameAsString(); 445 } 446 447 @Override 448 public TableName getTableName() { 449 return this.getRegionInfo().getTable(); 450 } 451 452 @Override 453 public FileSystem getFileSystem() { 454 return storeContext.getRegionFileSystem().getFileSystem(); 455 } 456 457 public HRegionFileSystem getRegionFileSystem() { 458 return storeContext.getRegionFileSystem(); 459 } 460 461 /* Implementation of StoreConfigInformation */ 462 @Override 463 public long getStoreFileTtl() { 464 // TTL only applies if there's no MIN_VERSIONs setting on the column. 465 return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE; 466 } 467 468 @Override 469 public long getMemStoreFlushSize() { 470 // TODO: Why is this in here? The flushsize of the region rather than the store? 
St.Ack 471 return this.region.memstoreFlushSize; 472 } 473 474 @Override 475 public MemStoreSize getFlushableSize() { 476 return this.memstore.getFlushableSize(); 477 } 478 479 @Override 480 public MemStoreSize getSnapshotSize() { 481 return this.memstore.getSnapshotSize(); 482 } 483 484 @Override 485 public long getCompactionCheckMultiplier() { 486 return this.compactionCheckMultiplier; 487 } 488 489 @Override 490 public long getBlockingFileCount() { 491 return blockingFileCount; 492 } 493 /* End implementation of StoreConfigInformation */ 494 495 @Override 496 public ColumnFamilyDescriptor getColumnFamilyDescriptor() { 497 return this.storeContext.getFamily(); 498 } 499 500 @Override 501 public OptionalLong getMaxSequenceId() { 502 return StoreUtils.getMaxSequenceIdInList(this.getStorefiles()); 503 } 504 505 @Override 506 public OptionalLong getMaxMemStoreTS() { 507 return StoreUtils.getMaxMemStoreTSInList(this.getStorefiles()); 508 } 509 510 /** Returns the data block encoder */ 511 public HFileDataBlockEncoder getDataBlockEncoder() { 512 return dataBlockEncoder; 513 } 514 515 /** 516 * Should be used only in tests. 517 * @param blockEncoder the block delta encoder to use 518 */ 519 void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) { 520 this.dataBlockEncoder = blockEncoder; 521 } 522 523 private void postRefreshStoreFiles() throws IOException { 524 // Advance the memstore read point to be at least the new store files seqIds so that 525 // readers might pick it up. This assumes that the store is not getting any writes (otherwise 526 // in-flight transactions might be made visible) 527 getMaxSequenceId().ifPresent(region.getMVCC()::advanceTo); 528 refreshStoreSizeAndTotalBytes(); 529 } 530 531 @Override 532 public void refreshStoreFiles() throws IOException { 533 storeEngine.refreshStoreFiles(); 534 postRefreshStoreFiles(); 535 } 536 537 /** 538 * Replaces the store files that the store has with the given files. Mainly used by secondary 539 * region replicas to keep up to date with the primary region files. 
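 * Unlike the no-argument {@link #refreshStoreFiles()}, the caller supplies the exact set of store
 * file names that should back this store after the refresh.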
540 */ 541 public void refreshStoreFiles(Collection<String> newFiles) throws IOException { 542 storeEngine.refreshStoreFiles(newFiles); 543 postRefreshStoreFiles(); 544 } 545 546 /** 547 * This message intends to inform the MemStore that next coming updates are going to be part of 548 * the replaying edits from WAL 549 */ 550 public void startReplayingFromWAL() { 551 this.memstore.startReplayingFromWAL(); 552 } 553 554 /** 555 * This message intends to inform the MemStore that the replaying edits from WAL are done 556 */ 557 public void stopReplayingFromWAL() { 558 this.memstore.stopReplayingFromWAL(); 559 } 560 561 /** 562 * Adds a value to the memstore 563 */ 564 public void add(final ExtendedCell cell, MemStoreSizing memstoreSizing) { 565 storeEngine.readLock(); 566 try { 567 if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) { 568 LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!", 569 this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName()); 570 } 571 this.memstore.add(cell, memstoreSizing); 572 } finally { 573 storeEngine.readUnlock(); 574 currentParallelPutCount.decrementAndGet(); 575 } 576 } 577 578 /** 579 * Adds the specified value to the memstore 580 */ 581 public void add(final Iterable<ExtendedCell> cells, MemStoreSizing memstoreSizing) { 582 storeEngine.readLock(); 583 try { 584 if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) { 585 LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!", 586 this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName()); 587 } 588 memstore.add(cells, memstoreSizing); 589 } finally { 590 storeEngine.readUnlock(); 591 currentParallelPutCount.decrementAndGet(); 592 } 593 } 594 595 @Override 596 public long timeOfOldestEdit() { 597 return memstore.timeOfOldestEdit(); 598 } 599 600 /** Returns All store files. */ 601 @Override 602 public Collection<HStoreFile> getStorefiles() { 603 return this.storeEngine.getStoreFileManager().getStoreFiles(); 604 } 605 606 @Override 607 public Collection<HStoreFile> getCompactedFiles() { 608 return this.storeEngine.getStoreFileManager().getCompactedfiles(); 609 } 610 611 /** 612 * This throws a WrongRegionException if the HFile does not fit in this region, or an 613 * InvalidHFileException if the HFile is not valid. 
614 */ 615 public void assertBulkLoadHFileOk(Path srcPath) throws IOException { 616 HFile.Reader reader = null; 617 try { 618 LOG.info("Validating hfile at " + srcPath + " for inclusion in " + this); 619 FileSystem srcFs = srcPath.getFileSystem(conf); 620 srcFs.access(srcPath, FsAction.READ_WRITE); 621 reader = HFile.createReader(srcFs, srcPath, getCacheConfig(), isPrimaryReplicaStore(), conf); 622 623 Optional<byte[]> firstKey = reader.getFirstRowKey(); 624 Preconditions.checkState(firstKey.isPresent(), "First key can not be null"); 625 Optional<ExtendedCell> lk = reader.getLastKey(); 626 Preconditions.checkState(lk.isPresent(), "Last key can not be null"); 627 byte[] lastKey = CellUtil.cloneRow(lk.get()); 628 629 if (LOG.isDebugEnabled()) { 630 LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey.get()) + " last=" 631 + Bytes.toStringBinary(lastKey)); 632 LOG.debug("Region bounds: first=" + Bytes.toStringBinary(getRegionInfo().getStartKey()) 633 + " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey())); 634 } 635 636 if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) { 637 throw new WrongRegionException("Bulk load file " + srcPath.toString() 638 + " does not fit inside region " + this.getRegionInfo().getRegionNameAsString()); 639 } 640 641 if ( 642 reader.length() 643 > conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE) 644 ) { 645 LOG.warn("Trying to bulk load hfile " + srcPath + " with size: " + reader.length() 646 + " bytes can be problematic as it may lead to oversplitting."); 647 } 648 649 if (verifyBulkLoads) { 650 long verificationStartTime = EnvironmentEdgeManager.currentTime(); 651 LOG.info("Full verification started for bulk load hfile: {}", srcPath); 652 Cell prevCell = null; 653 HFileScanner scanner = reader.getScanner(conf, false, false, false); 654 scanner.seekTo(); 655 do { 656 Cell cell = scanner.getCell(); 657 if (prevCell != null) { 658 if (getComparator().compareRows(prevCell, cell) > 0) { 659 throw new InvalidHFileException("Previous row is greater than" + " current row: path=" 660 + srcPath + " previous=" + CellUtil.getCellKeyAsString(prevCell) + " current=" 661 + CellUtil.getCellKeyAsString(cell)); 662 } 663 if (CellComparator.getInstance().compareFamilies(prevCell, cell) != 0) { 664 throw new InvalidHFileException("Previous key had different" 665 + " family compared to current key: path=" + srcPath + " previous=" 666 + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(), 667 prevCell.getFamilyLength()) 668 + " current=" + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(), 669 cell.getFamilyLength())); 670 } 671 } 672 prevCell = cell; 673 } while (scanner.next()); 674 LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString() + " took " 675 + (EnvironmentEdgeManager.currentTime() - verificationStartTime) + " ms"); 676 } 677 } finally { 678 if (reader != null) { 679 reader.close(); 680 } 681 } 682 } 683 684 /** 685 * This method should only be called from Region. It is assumed that the ranges of values in the 686 * HFile fit within the stores assigned region. 
(assertBulkLoadHFileOk checks this) 687 * @param seqNum sequence Id associated with the HFile 688 */ 689 public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException { 690 Path srcPath = new Path(srcPathStr); 691 return getRegionFileSystem().bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum); 692 } 693 694 public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException { 695 Path srcPath = new Path(srcPathStr); 696 try { 697 getRegionFileSystem().commitStoreFile(srcPath, dstPath); 698 } finally { 699 if (this.getCoprocessorHost() != null) { 700 this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath); 701 } 702 } 703 704 LOG.info("Loaded HFile " + srcPath + " into " + this + " as " + dstPath 705 + " - updating store file list."); 706 707 HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath); 708 bulkLoadHFile(sf); 709 710 LOG.info("Successfully loaded {} into {} (new location: {})", srcPath, this, dstPath); 711 712 return dstPath; 713 } 714 715 public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException { 716 HStoreFile sf = storeEngine.createStoreFileAndReader(fileInfo); 717 bulkLoadHFile(sf); 718 } 719 720 private void bulkLoadHFile(HStoreFile sf) throws IOException { 721 StoreFileReader r = sf.getReader(); 722 this.storeSize.addAndGet(r.length()); 723 this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); 724 storeEngine.addStoreFiles(Lists.newArrayList(sf), () -> { 725 }); 726 LOG.info("Loaded HFile " + sf.getFileInfo() + " into " + this); 727 if (LOG.isTraceEnabled()) { 728 String traceMessage = "BULK LOAD time,size,store size,store files [" 729 + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize + "," 730 + storeEngine.getStoreFileManager().getStorefileCount() + "]"; 731 LOG.trace(traceMessage); 732 } 733 } 734 735 private ImmutableCollection<HStoreFile> closeWithoutLock() throws IOException { 736 memstore.close(); 737 // Clear so metrics doesn't find them. 738 ImmutableCollection<HStoreFile> result = storeEngine.getStoreFileManager().clearFiles(); 739 Collection<HStoreFile> compactedfiles = storeEngine.getStoreFileManager().clearCompactedFiles(); 740 // clear the compacted files 741 if (CollectionUtils.isNotEmpty(compactedfiles)) { 742 removeCompactedfiles(compactedfiles, 743 getCacheConfig() != null ? getCacheConfig().shouldEvictOnClose() : true); 744 } 745 if (!result.isEmpty()) { 746 // initialize the thread pool for closing store files in parallel. 747 ThreadPoolExecutor storeFileCloserThreadPool = 748 this.region.getStoreFileOpenAndCloseThreadPool("StoreFileCloser-" 749 + this.region.getRegionInfo().getEncodedName() + "-" + this.getColumnFamilyName()); 750 751 // close each store file in parallel 752 CompletionService<Void> completionService = 753 new ExecutorCompletionService<>(storeFileCloserThreadPool); 754 for (HStoreFile f : result) { 755 completionService.submit(new Callable<Void>() { 756 @Override 757 public Void call() throws IOException { 758 boolean evictOnClose = 759 getCacheConfig() != null ? 
getCacheConfig().shouldEvictOnClose() : true; 760 f.closeStoreFile(!warmup && evictOnClose); 761 return null; 762 } 763 }); 764 } 765 766 IOException ioe = null; 767 try { 768 for (int i = 0; i < result.size(); i++) { 769 try { 770 Future<Void> future = completionService.take(); 771 future.get(); 772 } catch (InterruptedException e) { 773 if (ioe == null) { 774 ioe = new InterruptedIOException(); 775 ioe.initCause(e); 776 } 777 } catch (ExecutionException e) { 778 if (ioe == null) { 779 ioe = new IOException(e.getCause()); 780 } 781 } 782 } 783 } finally { 784 storeFileCloserThreadPool.shutdownNow(); 785 } 786 if (ioe != null) { 787 throw ioe; 788 } 789 } 790 LOG.trace("Closed {}", this); 791 return result; 792 } 793 794 /** 795 * Close all the readers We don't need to worry about subsequent requests because the Region holds 796 * a write lock that will prevent any more reads or writes. 797 * @return the {@link StoreFile StoreFiles} that were previously being used. 798 * @throws IOException on failure 799 */ 800 public ImmutableCollection<HStoreFile> close() throws IOException { 801 // findbugs can not recognize storeEngine.writeLock is just a lock operation so it will report 802 // UL_UNRELEASED_LOCK_EXCEPTION_PATH, so here we have to use two try finally... 803 // Change later if findbugs becomes smarter in the future. 804 this.archiveLock.lock(); 805 try { 806 this.storeEngine.writeLock(); 807 try { 808 return closeWithoutLock(); 809 } finally { 810 this.storeEngine.writeUnlock(); 811 } 812 } finally { 813 this.archiveLock.unlock(); 814 } 815 } 816 817 /** 818 * Write out current snapshot. Presumes {@code StoreFlusherImpl.prepare()} has been called 819 * previously. 820 * @param logCacheFlushId flush sequence number 821 * @return The path name of the tmp file to which the store was flushed 822 * @throws IOException if exception occurs during process 823 */ 824 protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot, 825 MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker, 826 Consumer<Path> writerCreationTracker) throws IOException { 827 // If an exception happens flushing, we let it out without clearing 828 // the memstore snapshot. The old snapshot will be returned when we say 829 // 'snapshot', the next time flush comes around. 
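    // (The snapshot itself is only cleared later, in completeFlush(), after the flushed files
    // have been committed and added to the store.)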
830 // Retry after catching exception when flushing, otherwise server will abort 831 // itself 832 StoreFlusher flusher = storeEngine.getStoreFlusher(); 833 IOException lastException = null; 834 for (int i = 0; i < flushRetriesNumber; i++) { 835 try { 836 List<Path> pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status, 837 throughputController, tracker, writerCreationTracker); 838 Path lastPathName = null; 839 try { 840 for (Path pathName : pathNames) { 841 lastPathName = pathName; 842 storeEngine.validateStoreFile(pathName, false); 843 } 844 return pathNames; 845 } catch (Exception e) { 846 LOG.warn("Failed validating store file {}, retrying num={}", lastPathName, i, e); 847 if (e instanceof IOException) { 848 lastException = (IOException) e; 849 } else { 850 lastException = new IOException(e); 851 } 852 } 853 } catch (IOException e) { 854 LOG.warn("Failed flushing store file for {}, retrying num={}", this, i, e); 855 lastException = e; 856 } 857 if (lastException != null && i < (flushRetriesNumber - 1)) { 858 try { 859 Thread.sleep(pauseTime); 860 } catch (InterruptedException e) { 861 IOException iie = new InterruptedIOException(); 862 iie.initCause(e); 863 throw iie; 864 } 865 } 866 } 867 throw lastException; 868 } 869 870 public HStoreFile tryCommitRecoveredHFile(Path path) throws IOException { 871 LOG.info("Validating recovered hfile at {} for inclusion in store {}", path, this); 872 FileSystem srcFs = path.getFileSystem(conf); 873 srcFs.access(path, FsAction.READ_WRITE); 874 try (HFile.Reader reader = 875 HFile.createReader(srcFs, path, getCacheConfig(), isPrimaryReplicaStore(), conf)) { 876 Optional<byte[]> firstKey = reader.getFirstRowKey(); 877 Preconditions.checkState(firstKey.isPresent(), "First key can not be null"); 878 Optional<ExtendedCell> lk = reader.getLastKey(); 879 Preconditions.checkState(lk.isPresent(), "Last key can not be null"); 880 byte[] lastKey = CellUtil.cloneRow(lk.get()); 881 if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) { 882 throw new WrongRegionException("Recovered hfile " + path.toString() 883 + " does not fit inside region " + this.getRegionInfo().getRegionNameAsString()); 884 } 885 } 886 887 Path dstPath = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), path); 888 HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath); 889 StoreFileReader r = sf.getReader(); 890 this.storeSize.addAndGet(r.length()); 891 this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); 892 893 storeEngine.addStoreFiles(Lists.newArrayList(sf), () -> { 894 }); 895 896 LOG.info("Loaded recovered hfile to {}, entries={}, sequenceid={}, filesize={}", sf, 897 r.getEntries(), r.getSequenceID(), TraditionalBinaryPrefix.long2String(r.length(), "B", 1)); 898 return sf; 899 } 900 901 private long getTotalSize(Collection<HStoreFile> sfs) { 902 return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum(); 903 } 904 905 private boolean completeFlush(final List<HStoreFile> sfs, long snapshotId) throws IOException { 906 // NOTE:we should keep clearSnapshot method inside the write lock because clearSnapshot may 907 // close {@link DefaultMemStore#snapshot}, which may be used by 908 // {@link DefaultMemStore#getScanners}. 909 storeEngine.addStoreFiles(sfs, 910 // NOTE: here we must increase the refCount for storeFiles because we would open the 911 // storeFiles and get the StoreFileScanners for them in HStore.notifyChangedReadersObservers. 
912 // If we don't increase the refCount here, HStore.closeAndArchiveCompactedFiles called by 913 // CompactedHFilesDischarger may archive the storeFiles after a concurrent compaction.Because 914 // HStore.requestCompaction is under storeEngine lock, so here we increase the refCount under 915 // storeEngine lock. see HBASE-27519 for more details. 916 snapshotId > 0 ? () -> { 917 this.memstore.clearSnapshot(snapshotId); 918 HStoreFile.increaseStoreFilesRefeCount(sfs); 919 } : () -> { 920 HStoreFile.increaseStoreFilesRefeCount(sfs); 921 }); 922 // notify to be called here - only in case of flushes 923 try { 924 notifyChangedReadersObservers(sfs); 925 } finally { 926 HStoreFile.decreaseStoreFilesRefeCount(sfs); 927 } 928 if (LOG.isTraceEnabled()) { 929 long totalSize = getTotalSize(sfs); 930 String traceMessage = "FLUSH time,count,size,store size,store files [" 931 + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize + "," 932 + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]"; 933 LOG.trace(traceMessage); 934 } 935 return needsCompaction(); 936 } 937 938 /** 939 * Notify all observers that set of Readers has changed. 940 */ 941 private void notifyChangedReadersObservers(List<HStoreFile> sfs) throws IOException { 942 for (ChangedReadersObserver o : this.changedReaderObservers) { 943 List<KeyValueScanner> memStoreScanners; 944 this.storeEngine.readLock(); 945 try { 946 memStoreScanners = this.memstore.getScanners(o.getReadPoint()); 947 } finally { 948 this.storeEngine.readUnlock(); 949 } 950 o.updateReaders(sfs, memStoreScanners); 951 } 952 } 953 954 /** 955 * Get all scanners with no filtering based on TTL (that happens further down the line). 956 * @param cacheBlocks cache the blocks or not 957 * @param usePread true to use pread, false if not 958 * @param isCompaction true if the scanner is created for compaction 959 * @param matcher the scan query matcher 960 * @param startRow the start row 961 * @param stopRow the stop row 962 * @param readPt the read point of the current scan 963 * @return all scanners for this store 964 */ 965 public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet, boolean usePread, 966 boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt, 967 boolean onlyLatestVersion) throws IOException { 968 return getScanners(cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow, false, 969 readPt, onlyLatestVersion); 970 } 971 972 /** 973 * Get all scanners with no filtering based on TTL (that happens further down the line). 
974 * @param cacheBlocks cache the blocks or not 975 * @param usePread true to use pread, false if not 976 * @param isCompaction true if the scanner is created for compaction 977 * @param matcher the scan query matcher 978 * @param startRow the start row 979 * @param includeStartRow true to include start row, false if not 980 * @param stopRow the stop row 981 * @param includeStopRow true to include stop row, false if not 982 * @param readPt the read point of the current scan 983 * @return all scanners for this store 984 */ 985 public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread, 986 boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow, 987 byte[] stopRow, boolean includeStopRow, long readPt, boolean onlyLatestVersion) 988 throws IOException { 989 Collection<HStoreFile> storeFilesToScan; 990 List<KeyValueScanner> memStoreScanners; 991 this.storeEngine.readLock(); 992 try { 993 storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow, 994 includeStartRow, stopRow, includeStopRow, onlyLatestVersion); 995 memStoreScanners = this.memstore.getScanners(readPt); 996 // NOTE: here we must increase the refCount for storeFiles because we would open the 997 // storeFiles and get the StoreFileScanners for them.If we don't increase the refCount here, 998 // HStore.closeAndArchiveCompactedFiles called by CompactedHFilesDischarger may archive the 999 // storeFiles after a concurrent compaction.Because HStore.requestCompaction is under 1000 // storeEngine lock, so here we increase the refCount under storeEngine lock. see HBASE-27484 1001 // for more details. 1002 HStoreFile.increaseStoreFilesRefeCount(storeFilesToScan); 1003 } finally { 1004 this.storeEngine.readUnlock(); 1005 } 1006 try { 1007 // First the store file scanners 1008 1009 // TODO this used to get the store files in descending order, 1010 // but now we get them in ascending order, which I think is 1011 // actually more correct, since memstore get put at the end. 1012 List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles( 1013 storeFilesToScan, cacheBlocks, usePread, isCompaction, false, matcher, readPt); 1014 List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1); 1015 scanners.addAll(sfScanners); 1016 // Then the memstore scanners 1017 scanners.addAll(memStoreScanners); 1018 return scanners; 1019 } catch (Throwable t) { 1020 clearAndClose(memStoreScanners); 1021 throw t instanceof IOException ? (IOException) t : new IOException(t); 1022 } finally { 1023 HStoreFile.decreaseStoreFilesRefeCount(storeFilesToScan); 1024 } 1025 } 1026 1027 private static void clearAndClose(List<KeyValueScanner> scanners) { 1028 if (scanners == null) { 1029 return; 1030 } 1031 for (KeyValueScanner s : scanners) { 1032 s.close(); 1033 } 1034 scanners.clear(); 1035 } 1036 1037 /** 1038 * Create scanners on the given files and if needed on the memstore with no filtering based on TTL 1039 * (that happens further down the line). 
   * @param files                  the list of files on which the scanners have to be created
   * @param cacheBlocks            cache the blocks or not
   * @param usePread               true to use pread, false if not
   * @param isCompaction           true if the scanner is created for compaction
   * @param matcher                the scan query matcher
   * @param startRow               the start row
   * @param stopRow                the stop row
   * @param readPt                 the read point of the current scan
   * @param includeMemstoreScanner true if memstore has to be included
   * @return scanners on the given files and on the memstore if specified
   */
  public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
    boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
    byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner,
    boolean onlyLatestVersion) throws IOException {
    return getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow,
      false, readPt, includeMemstoreScanner, onlyLatestVersion);
  }

  /**
   * Create scanners on the given files and if needed on the memstore with no filtering based on
   * TTL (that happens further down the line).
   * @param files                  the list of files on which the scanners have to be created
   * @param cacheBlocks            cache the blocks or not
   * @param usePread               true to use pread, false if not
   * @param isCompaction           true if the scanner is created for compaction
   * @param matcher                the scan query matcher
   * @param startRow               the start row
   * @param includeStartRow        true to include start row, false if not
   * @param stopRow                the stop row
   * @param includeStopRow         true to include stop row, false if not
   * @param readPt                 the read point of the current scan
   * @param includeMemstoreScanner true if memstore has to be included
   * @return scanners on the given files and on the memstore if specified
   */
  public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
    boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
    boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
    boolean includeMemstoreScanner, boolean onlyLatestVersion) throws IOException {
    List<KeyValueScanner> memStoreScanners = null;
    if (includeMemstoreScanner) {
      this.storeEngine.readLock();
      try {
        memStoreScanners = this.memstore.getScanners(readPt);
      } finally {
        this.storeEngine.readUnlock();
      }
    }
    try {
      List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(files,
        cacheBlocks, usePread, isCompaction, false, matcher, readPt);
      List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
      scanners.addAll(sfScanners);
      // Then the memstore scanners
      if (memStoreScanners != null) {
        scanners.addAll(memStoreScanners);
      }
      return scanners;
    } catch (Throwable t) {
      clearAndClose(memStoreScanners);
      throw t instanceof IOException ? (IOException) t : new IOException(t);
    }
  }

  /**
   * @param o Observer who wants to know about changes in set of Readers
   */
  public void addChangedReaderObserver(ChangedReadersObserver o) {
    this.changedReaderObservers.add(o);
  }

  /**
   * @param o Observer no longer interested in changes in set of Readers.
1113 */ 1114 public void deleteChangedReaderObserver(ChangedReadersObserver o) { 1115 // We don't check if observer present; it may not be (legitimately) 1116 this.changedReaderObservers.remove(o); 1117 } 1118 1119 ////////////////////////////////////////////////////////////////////////////// 1120 // Compaction 1121 ////////////////////////////////////////////////////////////////////////////// 1122 1123 /** 1124 * Compact the StoreFiles. This method may take some time, so the calling thread must be able to 1125 * block for long periods. 1126 * <p> 1127 * During this time, the Store can work as usual, getting values from StoreFiles and writing new 1128 * StoreFiles from the MemStore. Existing StoreFiles are not destroyed until the new compacted 1129 * StoreFile is completely written-out to disk. 1130 * <p> 1131 * The compactLock prevents multiple simultaneous compactions. The structureLock prevents us from 1132 * interfering with other write operations. 1133 * <p> 1134 * We don't want to hold the structureLock for the whole time, as a compact() can be lengthy and 1135 * we want to allow cache-flushes during this period. 1136 * <p> 1137 * Compaction event should be idempotent, since there is no IO Fencing for the region directory in 1138 * hdfs. A region server might still try to complete the compaction after it lost the region. That 1139 * is why the following events are carefully ordered for a compaction: 1140 * <ol> 1141 * <li>Compaction writes new files under region/.tmp directory (compaction output)</li> 1142 * <li>Compaction atomically moves the temporary file under region directory</li> 1143 * <li>Compaction appends a WAL edit containing the compaction input and output files. Forces sync 1144 * on WAL.</li> 1145 * <li>Compaction deletes the input files from the region directory.</li> 1146 * </ol> 1147 * Failure conditions are handled like this: 1148 * <ul> 1149 * <li>If RS fails before 2, compaction won't complete. Even if RS lives on and finishes the 1150 * compaction later, it will only write the new data file to the region directory. Since we 1151 * already have this data, this will be idempotent, but we will have a redundant copy of the 1152 * data.</li> 1153 * <li>If RS fails between 2 and 3, the region will have a redundant copy of the data. The RS that 1154 * failed won't be able to finish sync() for WAL because of lease recovery in WAL.</li> 1155 * <li>If RS fails after 3, the region server who opens the region will pick up the compaction 1156 * marker from the WAL and replay it by removing the compaction input files. Failed RS can also 1157 * attempt to delete those files, but the operation will be idempotent</li> 1158 * </ul> 1159 * See HBASE-2231 for details. 1160 * @param compaction compaction details obtained from requestCompaction() 1161 * @return The storefiles that we compacted into or null if we failed or opted out early. 
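 * <p>
 * An illustrative pairing with {@code requestCompaction()} (simplified; the production flow goes
 * through the CompactSplit thread and HRegion rather than calling this directly):
 *
 * <pre>
 * Optional&lt;CompactionContext&gt; ctx = store.requestCompaction();
 * if (ctx.isPresent()) {
 *   store.compact(ctx.get(), throughputController, user);
 * }
 * </pre>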
1162 */ 1163 public List<HStoreFile> compact(CompactionContext compaction, 1164 ThroughputController throughputController, User user) throws IOException { 1165 assert compaction != null; 1166 CompactionRequestImpl cr = compaction.getRequest(); 1167 StoreFileWriterCreationTracker writerCreationTracker = 1168 storeFileWriterCreationTrackerFactory.get(); 1169 if (writerCreationTracker != null) { 1170 cr.setWriterCreationTracker(writerCreationTracker); 1171 storeFileWriterCreationTrackers.add(writerCreationTracker); 1172 } 1173 try { 1174 // Do all sanity checking in here if we have a valid CompactionRequestImpl 1175 // because we need to clean up after it on the way out in a finally 1176 // block below 1177 long compactionStartTime = EnvironmentEdgeManager.currentTime(); 1178 assert compaction.hasSelection(); 1179 Collection<HStoreFile> filesToCompact = cr.getFiles(); 1180 assert !filesToCompact.isEmpty(); 1181 synchronized (filesCompacting) { 1182 // sanity check: we're compacting files that this store knows about 1183 // TODO: change this to LOG.error() after more debugging 1184 Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact)); 1185 } 1186 1187 // Ready to go. Have list of files to compact. 1188 LOG.info("Starting compaction of " + filesToCompact + " into tmpdir=" 1189 + getRegionFileSystem().getTempDir() + ", totalSize=" 1190 + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1)); 1191 1192 return doCompaction(cr, filesToCompact, user, compactionStartTime, 1193 compaction.compact(throughputController, user)); 1194 } finally { 1195 finishCompactionRequest(cr); 1196 } 1197 } 1198 1199 protected List<HStoreFile> doCompaction(CompactionRequestImpl cr, 1200 Collection<HStoreFile> filesToCompact, User user, long compactionStartTime, List<Path> newFiles) 1201 throws IOException { 1202 // Do the steps necessary to complete the compaction. 1203 setStoragePolicyFromFileName(newFiles); 1204 List<HStoreFile> sfs = storeEngine.commitStoreFiles(newFiles, true, true); 1205 if (this.getCoprocessorHost() != null) { 1206 for (HStoreFile sf : sfs) { 1207 getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user); 1208 } 1209 } 1210 replaceStoreFiles(filesToCompact, sfs, true); 1211 1212 long outputBytes = getTotalSize(sfs); 1213 1214 // At this point the store will use new files for all new scanners. 1215 refreshStoreSizeAndTotalBytes(); // update store size. 1216 1217 long now = EnvironmentEdgeManager.currentTime(); 1218 if ( 1219 region.getRegionServerServices() != null 1220 && region.getRegionServerServices().getMetrics() != null 1221 ) { 1222 region.getRegionServerServices().getMetrics().updateCompaction( 1223 region.getTableDescriptor().getTableName().getNameAsString(), cr.isMajor(), 1224 now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(), 1225 outputBytes); 1226 1227 } 1228 1229 logCompactionEndMessage(cr, sfs, now, compactionStartTime); 1230 return sfs; 1231 } 1232 1233 // Set correct storage policy from the file name of DTCP. 1234 // Rename file will not change the storage policy. 
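  // ("DTCP" presumably refers to date tiered compaction, whose output is written under a
  // temporary directory whose name carries the desired storage policy.)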
1235 private void setStoragePolicyFromFileName(List<Path> newFiles) throws IOException { 1236 String prefix = HConstants.STORAGE_POLICY_PREFIX; 1237 for (Path newFile : newFiles) { 1238 if (newFile.getParent().getName().startsWith(prefix)) { 1239 CommonFSUtils.setStoragePolicy(getRegionFileSystem().getFileSystem(), newFile, 1240 newFile.getParent().getName().substring(prefix.length())); 1241 } 1242 } 1243 } 1244 1245 /** 1246 * Writes the compaction WAL record. 1247 * @param filesCompacted Files compacted (input). 1248 * @param newFiles Files from compaction. 1249 */ 1250 private void writeCompactionWalRecord(Collection<HStoreFile> filesCompacted, 1251 Collection<HStoreFile> newFiles) throws IOException { 1252 if (region.getWAL() == null) { 1253 return; 1254 } 1255 List<Path> inputPaths = 1256 filesCompacted.stream().map(HStoreFile::getPath).collect(Collectors.toList()); 1257 List<Path> outputPaths = 1258 newFiles.stream().map(HStoreFile::getPath).collect(Collectors.toList()); 1259 RegionInfo info = this.region.getRegionInfo(); 1260 CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info, 1261 getColumnFamilyDescriptor().getName(), inputPaths, outputPaths, 1262 getRegionFileSystem().getStoreDir(getColumnFamilyDescriptor().getNameAsString())); 1263 // Fix reaching into Region to get the maxWaitForSeqId. 1264 // Does this method belong in Region altogether given it is making so many references up there? 1265 // Could be Region#writeCompactionMarker(compactionDescriptor); 1266 WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(), 1267 this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC(), 1268 region.getRegionReplicationSink().orElse(null)); 1269 } 1270 1271 @RestrictedApi(explanation = "Should only be called in TestHStore", link = "", 1272 allowedOnPath = ".*/(HStore|TestHStore).java") 1273 void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result, 1274 boolean writeCompactionMarker) throws IOException { 1275 storeEngine.replaceStoreFiles(compactedFiles, result, () -> { 1276 if (writeCompactionMarker) { 1277 writeCompactionWalRecord(compactedFiles, result); 1278 } 1279 }, () -> { 1280 synchronized (filesCompacting) { 1281 filesCompacting.removeAll(compactedFiles); 1282 } 1283 }); 1284 // These may be null when the RS is shutting down. The space quota Chores will fix the Region 1285 // sizes later so it's not super-critical if we miss these. 1286 RegionServerServices rsServices = region.getRegionServerServices(); 1287 if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) { 1288 updateSpaceQuotaAfterFileReplacement( 1289 rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(), 1290 compactedFiles, result); 1291 } 1292 } 1293 1294 /** 1295 * Updates the space quota usage for this region, removing the size for files compacted away and 1296 * adding in the size for new files. 1297 * @param sizeStore The object tracking changes in region size for space quotas. 1298 * @param regionInfo The identifier for the region whose size is being updated. 1299 * @param oldFiles Files removed from this store's region. 1300 * @param newFiles Files added to this store's region. 
1301 */
1302 void updateSpaceQuotaAfterFileReplacement(RegionSizeStore sizeStore, RegionInfo regionInfo,
1303 Collection<HStoreFile> oldFiles, Collection<HStoreFile> newFiles) {
1304 long delta = 0;
1305 if (oldFiles != null) {
1306 for (HStoreFile compactedFile : oldFiles) {
1307 if (compactedFile.isHFile()) {
1308 delta -= compactedFile.getReader().length();
1309 }
1310 }
1311 }
1312 if (newFiles != null) {
1313 for (HStoreFile newFile : newFiles) {
1314 if (newFile.isHFile()) {
1315 delta += newFile.getReader().length();
1316 }
1317 }
1318 }
1319 sizeStore.incrementRegionSize(regionInfo, delta);
1320 }
1321
1322 /**
1323 * Log a very elaborate compaction completion message.
1324 * @param cr Request.
1325 * @param sfs Resulting files.
1326 * @param compactionStartTime Start time.
1327 */
1328 private void logCompactionEndMessage(CompactionRequestImpl cr, List<HStoreFile> sfs, long now,
1329 long compactionStartTime) {
1330 StringBuilder message = new StringBuilder("Completed" + (cr.isMajor() ? " major" : "")
1331 + " compaction of " + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "")
1332 + " file(s) in " + this + " of " + this.getRegionInfo().getShortNameToLog() + " into ");
1333 if (sfs.isEmpty()) {
1334 message.append("none, ");
1335 } else {
1336 for (HStoreFile sf : sfs) {
1337 message.append(sf.getPath().getName());
1338 message.append("(size=");
1339 message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1));
1340 message.append("), ");
1341 }
1342 }
1343 message.append("total size for store is ")
1344 .append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1))
1345 .append(". This selection was in queue for ")
1346 .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
1347 .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
1348 .append(" to execute.");
1349 LOG.info(message.toString());
1350 if (LOG.isTraceEnabled()) {
1351 int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
1352 long resultSize = getTotalSize(sfs);
1353 String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
1354 + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
1355 + cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]";
1356 LOG.trace(traceMessage);
1357 }
1358 }
1359
1360 /**
1361 * Call to complete a compaction. It's for the case where we find a compaction in the WAL that was
1362 * not finished. We could find one when recovering a WAL after a regionserver crash. See HBASE-2231.
1363 */
1364 public void replayCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
1365 boolean removeFiles) throws IOException {
1366 LOG.debug("Completing compaction from the WAL marker");
1367 List<String> compactionInputs = compaction.getCompactionInputList();
1368 List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList());
1369
1370 // The Compaction Marker is written after the compaction is completed,
1371 // and the files moved into the region/family folder.
1372 //
1373 // If we crash after the entry is written, we may not have removed the
1374 // input files, but the output file is present.
1375 // (The unremoved input files will be removed by this function) 1376 // 1377 // If we scan the directory and the file is not present, it can mean that: 1378 // - The file was manually removed by the user 1379 // - The file was removed as consequence of subsequent compaction 1380 // so, we can't do anything with the "compaction output list" because those 1381 // files have already been loaded when opening the region (by virtue of 1382 // being in the store's folder) or they may be missing due to a compaction. 1383 1384 String familyName = this.getColumnFamilyName(); 1385 Set<String> inputFiles = new HashSet<>(); 1386 for (String compactionInput : compactionInputs) { 1387 Path inputPath = getRegionFileSystem().getStoreFilePath(familyName, compactionInput); 1388 inputFiles.add(inputPath.getName()); 1389 } 1390 1391 // some of the input files might already be deleted 1392 List<HStoreFile> inputStoreFiles = new ArrayList<>(compactionInputs.size()); 1393 for (HStoreFile sf : this.getStorefiles()) { 1394 if (inputFiles.contains(sf.getPath().getName())) { 1395 inputStoreFiles.add(sf); 1396 } 1397 } 1398 1399 // check whether we need to pick up the new files 1400 List<HStoreFile> outputStoreFiles = new ArrayList<>(compactionOutputs.size()); 1401 1402 if (pickCompactionFiles) { 1403 for (HStoreFile sf : this.getStorefiles()) { 1404 compactionOutputs.remove(sf.getPath().getName()); 1405 } 1406 for (String compactionOutput : compactionOutputs) { 1407 StoreFileTracker sft = StoreFileTrackerFactory.create(conf, false, storeContext); 1408 StoreFileInfo storeFileInfo = 1409 getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), compactionOutput, sft); 1410 HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo); 1411 outputStoreFiles.add(storeFile); 1412 } 1413 } 1414 1415 if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) { 1416 LOG.info("Replaying compaction marker, replacing input files: " + inputStoreFiles 1417 + " with output files : " + outputStoreFiles); 1418 this.replaceStoreFiles(inputStoreFiles, outputStoreFiles, false); 1419 this.refreshStoreSizeAndTotalBytes(); 1420 } 1421 } 1422 1423 @Override 1424 public boolean hasReferences() { 1425 // Grab the read lock here, because we need to ensure that: only when the atomic 1426 // replaceStoreFiles(..) finished, we can get all the complete store file list. 1427 this.storeEngine.readLock(); 1428 try { 1429 // Merge the current store files with compacted files here due to HBASE-20940. 1430 Collection<HStoreFile> allStoreFiles = new ArrayList<>(getStorefiles()); 1431 allStoreFiles.addAll(getCompactedFiles()); 1432 return StoreUtils.hasReferences(allStoreFiles); 1433 } finally { 1434 this.storeEngine.readUnlock(); 1435 } 1436 } 1437 1438 /** 1439 * getter for CompactionProgress object 1440 * @return CompactionProgress object; can be null 1441 */ 1442 public CompactionProgress getCompactionProgress() { 1443 return this.storeEngine.getCompactor().getProgress(); 1444 } 1445 1446 @Override 1447 public boolean shouldPerformMajorCompaction() throws IOException { 1448 for (HStoreFile sf : this.storeEngine.getStoreFileManager().getStoreFiles()) { 1449 // TODO: what are these reader checks all over the place? 
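// A store file without an open reader cannot be inspected (it may not have finished opening,
// or may already have been closed), so treat the store as not ready for a major compaction
// rather than guess.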
1450 if (sf.getReader() == null) { 1451 LOG.debug("StoreFile {} has null Reader", sf); 1452 return false; 1453 } 1454 } 1455 return storeEngine.getCompactionPolicy() 1456 .shouldPerformMajorCompaction(this.storeEngine.getStoreFileManager().getStoreFiles()); 1457 } 1458 1459 public Optional<CompactionContext> requestCompaction() throws IOException { 1460 return requestCompaction(NO_PRIORITY, CompactionLifeCycleTracker.DUMMY, null); 1461 } 1462 1463 public Optional<CompactionContext> requestCompaction(int priority, 1464 CompactionLifeCycleTracker tracker, User user) throws IOException { 1465 // don't even select for compaction if writes are disabled 1466 if (!this.areWritesEnabled()) { 1467 return Optional.empty(); 1468 } 1469 // Before we do compaction, try to get rid of unneeded files to simplify things. 1470 removeUnneededFiles(); 1471 1472 final CompactionContext compaction = storeEngine.createCompaction(); 1473 CompactionRequestImpl request = null; 1474 this.storeEngine.readLock(); 1475 try { 1476 synchronized (filesCompacting) { 1477 // First, see if coprocessor would want to override selection. 1478 if (this.getCoprocessorHost() != null) { 1479 final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting); 1480 boolean override = 1481 getCoprocessorHost().preCompactSelection(this, candidatesForCoproc, tracker, user); 1482 if (override) { 1483 // Coprocessor is overriding normal file selection. 1484 compaction.forceSelect(new CompactionRequestImpl(candidatesForCoproc)); 1485 } 1486 } 1487 1488 // Normal case - coprocessor is not overriding file selection. 1489 if (!compaction.hasSelection()) { 1490 boolean isUserCompaction = priority == Store.PRIORITY_USER; 1491 boolean mayUseOffPeak = 1492 offPeakHours.isOffPeakHour() && offPeakCompactionTracker.compareAndSet(false, true); 1493 try { 1494 compaction.select(this.filesCompacting, isUserCompaction, mayUseOffPeak, 1495 forceMajor && filesCompacting.isEmpty()); 1496 } catch (IOException e) { 1497 if (mayUseOffPeak) { 1498 offPeakCompactionTracker.set(false); 1499 } 1500 throw e; 1501 } 1502 assert compaction.hasSelection(); 1503 if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) { 1504 // Compaction policy doesn't want to take advantage of off-peak. 1505 offPeakCompactionTracker.set(false); 1506 } 1507 } 1508 if (this.getCoprocessorHost() != null) { 1509 this.getCoprocessorHost().postCompactSelection(this, 1510 ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker, 1511 compaction.getRequest(), user); 1512 } 1513 // Finally, we have the resulting files list. Check if we have any files at all. 1514 request = compaction.getRequest(); 1515 Collection<HStoreFile> selectedFiles = request.getFiles(); 1516 if (selectedFiles.isEmpty()) { 1517 return Optional.empty(); 1518 } 1519 1520 addToCompactingFiles(selectedFiles); 1521 1522 // If we're enqueuing a major, clear the force flag. 1523 this.forceMajor = this.forceMajor && !request.isMajor(); 1524 1525 // Set common request properties. 1526 // Set priority, either override value supplied by caller or from store. 1527 final int compactionPriority = 1528 (priority != Store.NO_PRIORITY) ? priority : getCompactPriority(); 1529 request.setPriority(compactionPriority); 1530 1531 if (request.isAfterSplit()) { 1532 // If the store belongs to recently splitted daughter regions, better we consider 1533 // them with the higher priority in the compaction queue. 
1534 // Override priority if it is lower (higher int value) than 1535 // SPLIT_REGION_COMPACTION_PRIORITY 1536 final int splitHousekeepingPriority = 1537 Math.min(compactionPriority, SPLIT_REGION_COMPACTION_PRIORITY); 1538 request.setPriority(splitHousekeepingPriority); 1539 LOG.info( 1540 "Keeping/Overriding Compaction request priority to {} for CF {} since it" 1541 + " belongs to recently split daughter region {}", 1542 splitHousekeepingPriority, this.getColumnFamilyName(), 1543 getRegionInfo().getRegionNameAsString()); 1544 } 1545 request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName()); 1546 request.setTracker(tracker); 1547 } 1548 } finally { 1549 this.storeEngine.readUnlock(); 1550 } 1551 1552 if (LOG.isDebugEnabled()) { 1553 LOG.debug(this + " is initiating " + (request.isMajor() ? "major" : "minor") + " compaction" 1554 + (request.isAllFiles() ? " (all files)" : "")); 1555 } 1556 this.region.reportCompactionRequestStart(request.isMajor()); 1557 return Optional.of(compaction); 1558 } 1559 1560 /** Adds the files to compacting files. filesCompacting must be locked. */ 1561 private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) { 1562 if (CollectionUtils.isEmpty(filesToAdd)) { 1563 return; 1564 } 1565 // Check that we do not try to compact the same StoreFile twice. 1566 if (!Collections.disjoint(filesCompacting, filesToAdd)) { 1567 Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting); 1568 } 1569 filesCompacting.addAll(filesToAdd); 1570 Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator()); 1571 } 1572 1573 private void removeUnneededFiles() throws IOException { 1574 if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) { 1575 return; 1576 } 1577 if (getColumnFamilyDescriptor().getMinVersions() > 0) { 1578 LOG.debug("Skipping expired store file removal due to min version of {} being {}", this, 1579 getColumnFamilyDescriptor().getMinVersions()); 1580 return; 1581 } 1582 this.storeEngine.readLock(); 1583 Collection<HStoreFile> delSfs = null; 1584 try { 1585 synchronized (filesCompacting) { 1586 long cfTtl = getStoreFileTtl(); 1587 if (cfTtl != Long.MAX_VALUE) { 1588 delSfs = storeEngine.getStoreFileManager() 1589 .getUnneededFiles(EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting); 1590 addToCompactingFiles(delSfs); 1591 } 1592 } 1593 } finally { 1594 this.storeEngine.readUnlock(); 1595 } 1596 1597 if (CollectionUtils.isEmpty(delSfs)) { 1598 return; 1599 } 1600 1601 Collection<HStoreFile> newFiles = Collections.emptyList(); // No new files. 
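// Expired-file removal reuses the compaction bookkeeping: the expired files are swapped for an
// empty output set via replaceStoreFiles(..) below, which also writes the compaction WAL marker
// so the removal can be replayed.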
1602 replaceStoreFiles(delSfs, newFiles, true); 1603 refreshStoreSizeAndTotalBytes(); 1604 LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in " + this 1605 + "; total size is " + TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1)); 1606 } 1607 1608 public void cancelRequestedCompaction(CompactionContext compaction) { 1609 finishCompactionRequest(compaction.getRequest()); 1610 } 1611 1612 private void finishCompactionRequest(CompactionRequestImpl cr) { 1613 this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize()); 1614 if (cr.isOffPeak()) { 1615 offPeakCompactionTracker.set(false); 1616 cr.setOffPeak(false); 1617 } 1618 synchronized (filesCompacting) { 1619 filesCompacting.removeAll(cr.getFiles()); 1620 } 1621 // The tracker could be null, for example, we do not need to track the creation of store file 1622 // writer due to different implementation of SFT, or the compaction is canceled. 1623 if (cr.getWriterCreationTracker() != null) { 1624 storeFileWriterCreationTrackers.remove(cr.getWriterCreationTracker()); 1625 } 1626 } 1627 1628 /** 1629 * Update counts. 1630 */ 1631 protected void refreshStoreSizeAndTotalBytes() throws IOException { 1632 this.storeSize.set(0L); 1633 this.totalUncompressedBytes.set(0L); 1634 for (HStoreFile hsf : this.storeEngine.getStoreFileManager().getStoreFiles()) { 1635 StoreFileReader r = hsf.getReader(); 1636 if (r == null) { 1637 LOG.debug("StoreFile {} has a null Reader", hsf); 1638 continue; 1639 } 1640 this.storeSize.addAndGet(r.length()); 1641 this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); 1642 } 1643 } 1644 1645 /* 1646 * @param wantedVersions How many versions were asked for. 1647 * @return wantedVersions or this families' {@link HConstants#VERSIONS}. 1648 */ 1649 int versionsToReturn(final int wantedVersions) { 1650 if (wantedVersions <= 0) { 1651 throw new IllegalArgumentException("Number of versions must be > 0"); 1652 } 1653 // Make sure we do not return more than maximum versions for this store. 1654 int maxVersions = getColumnFamilyDescriptor().getMaxVersions(); 1655 return wantedVersions > maxVersions ? maxVersions : wantedVersions; 1656 } 1657 1658 @Override 1659 public boolean canSplit() { 1660 // Not split-able if we find a reference store file present in the store. 1661 boolean result = !hasReferences(); 1662 if (!result) { 1663 LOG.trace("Not splittable; has references: {}", this); 1664 } 1665 return result; 1666 } 1667 1668 /** 1669 * Determines if Store should be split. 1670 */ 1671 public Optional<byte[]> getSplitPoint() { 1672 this.storeEngine.readLock(); 1673 try { 1674 // Should already be enforced by the split policy! 1675 assert !this.getRegionInfo().isMetaRegion(); 1676 // Not split-able if we find a reference store file present in the store. 
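// Reference files are created at region split time and point into the parent region's store
// files; until they have been compacted away, the daughter region cannot be split again.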
1677 if (hasReferences()) { 1678 LOG.trace("Not splittable; has references: {}", this); 1679 return Optional.empty(); 1680 } 1681 return this.storeEngine.getStoreFileManager().getSplitPoint(); 1682 } catch (IOException e) { 1683 LOG.warn("Failed getting store size for {}", this, e); 1684 } finally { 1685 this.storeEngine.readUnlock(); 1686 } 1687 return Optional.empty(); 1688 } 1689 1690 @Override 1691 public long getLastCompactSize() { 1692 return this.lastCompactSize; 1693 } 1694 1695 @Override 1696 public long getSize() { 1697 return storeSize.get(); 1698 } 1699 1700 public void triggerMajorCompaction() { 1701 this.forceMajor = true; 1702 } 1703 1704 ////////////////////////////////////////////////////////////////////////////// 1705 // File administration 1706 ////////////////////////////////////////////////////////////////////////////// 1707 1708 /** 1709 * Return a scanner for both the memstore and the HStore files. Assumes we are not in a 1710 * compaction. 1711 * @param scan Scan to apply when scanning the stores 1712 * @param targetCols columns to scan 1713 * @return a scanner over the current key values 1714 * @throws IOException on failure 1715 */ 1716 public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols, long readPt) 1717 throws IOException { 1718 storeEngine.readLock(); 1719 try { 1720 ScanInfo scanInfo; 1721 if (this.getCoprocessorHost() != null) { 1722 scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this, scan); 1723 } else { 1724 scanInfo = getScanInfo(); 1725 } 1726 return createScanner(scan, scanInfo, targetCols, readPt); 1727 } finally { 1728 storeEngine.readUnlock(); 1729 } 1730 } 1731 1732 // HMobStore will override this method to return its own implementation. 1733 protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo, 1734 NavigableSet<byte[]> targetCols, long readPt) throws IOException { 1735 return scan.isReversed() 1736 ? 
new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt)
1737 : new StoreScanner(this, scanInfo, scan, targetCols, readPt);
1738 }
1739
1740 /**
1741 * Recreates the scanners for the current list of active store file scanners
1742 * @param currentFileScanners the current set of active store file scanners
1743 * @param cacheBlocks cache the blocks or not
1744 * @param usePread use pread or not
1745 * @param isCompaction is the scanner for compaction
1746 * @param matcher the scan query matcher
1747 * @param startRow the scan's start row
1748 * @param includeStartRow should the scan include the start row
1749 * @param stopRow the scan's stop row
1750 * @param includeStopRow should the scan include the stop row
1751 * @param readPt the read point of the current scan
1752 * @param includeMemstoreScanner whether the recreated scanners should include a memstore scanner
1753 * @return list of scanners recreated from the current set of scanners
1754 */
1755 public List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners,
1756 boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
1757 byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
1758 boolean includeMemstoreScanner) throws IOException {
1759 this.storeEngine.readLock();
1760 try {
1761 Map<String, HStoreFile> name2File =
1762 new HashMap<>(getStorefilesCount() + getCompactedFilesCount());
1763 for (HStoreFile file : getStorefiles()) {
1764 name2File.put(file.getFileInfo().getActiveFileName(), file);
1765 }
1766 Collection<HStoreFile> compactedFiles = getCompactedFiles();
1767 for (HStoreFile file : IterableUtils.emptyIfNull(compactedFiles)) {
1768 name2File.put(file.getFileInfo().getActiveFileName(), file);
1769 }
1770 List<HStoreFile> filesToReopen = new ArrayList<>();
1771 for (KeyValueScanner kvs : currentFileScanners) {
1772 assert kvs.isFileScanner();
1773 if (kvs.peek() == null) {
1774 continue;
1775 }
1776 filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
1777 }
1778 if (filesToReopen.isEmpty()) {
1779 return null;
1780 }
1781 return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow,
1782 includeStartRow, stopRow, includeStopRow, readPt, false, false);
1783 } finally {
1784 this.storeEngine.readUnlock();
1785 }
1786 }
1787
1788 @Override
1789 public String toString() {
1790 return this.getRegionInfo().getShortNameToLog() + "/" + this.getColumnFamilyName();
1791 }
1792
1793 @Override
1794 public int getStorefilesCount() {
1795 return this.storeEngine.getStoreFileManager().getStorefileCount();
1796 }
1797
1798 @Override
1799 public int getCompactedFilesCount() {
1800 return this.storeEngine.getStoreFileManager().getCompactedFilesCount();
1801 }
1802
1803 private LongStream getStoreFileAgeStream() {
1804 return this.storeEngine.getStoreFileManager().getStoreFiles().stream().filter(sf -> {
1805 if (sf.getReader() == null) {
1806 LOG.debug("StoreFile {} has a null Reader", sf);
1807 return false;
1808 } else {
1809 return true;
1810 }
1811 }).filter(HStoreFile::isHFile).mapToLong(sf -> sf.getFileInfo().getCreatedTimestamp())
1812 .map(t -> EnvironmentEdgeManager.currentTime() - t);
1813 }
1814
1815 @Override
1816 public OptionalLong getMaxStoreFileAge() {
1817 return getStoreFileAgeStream().max();
1818 }
1819
1820 @Override
1821 public OptionalLong getMinStoreFileAge() {
1822 return getStoreFileAgeStream().min();
1823 }
1824
1825 @Override
1826 public OptionalDouble getAvgStoreFileAge() {
1827 return
getStoreFileAgeStream().average(); 1828 } 1829 1830 @Override 1831 public long getNumReferenceFiles() { 1832 return this.storeEngine.getStoreFileManager().getStoreFiles().stream() 1833 .filter(HStoreFile::isReference).count(); 1834 } 1835 1836 @Override 1837 public long getNumHFiles() { 1838 return this.storeEngine.getStoreFileManager().getStoreFiles().stream() 1839 .filter(HStoreFile::isHFile).count(); 1840 } 1841 1842 @Override 1843 public long getStoreSizeUncompressed() { 1844 return this.totalUncompressedBytes.get(); 1845 } 1846 1847 @Override 1848 public long getStorefilesSize() { 1849 // Include all StoreFiles 1850 return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStoreFiles(), 1851 sf -> true); 1852 } 1853 1854 @Override 1855 public long getHFilesSize() { 1856 // Include only StoreFiles which are HFiles 1857 return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStoreFiles(), 1858 HStoreFile::isHFile); 1859 } 1860 1861 private long getStorefilesFieldSize(ToLongFunction<StoreFileReader> f) { 1862 return this.storeEngine.getStoreFileManager().getStoreFiles().stream() 1863 .mapToLong(file -> StoreUtils.getStorefileFieldSize(file, f)).sum(); 1864 } 1865 1866 @Override 1867 public long getStorefilesRootLevelIndexSize() { 1868 return getStorefilesFieldSize(StoreFileReader::indexSize); 1869 } 1870 1871 @Override 1872 public long getTotalStaticIndexSize() { 1873 return getStorefilesFieldSize(StoreFileReader::getUncompressedDataIndexSize); 1874 } 1875 1876 @Override 1877 public long getTotalStaticBloomSize() { 1878 return getStorefilesFieldSize(StoreFileReader::getTotalBloomSize); 1879 } 1880 1881 @Override 1882 public MemStoreSize getMemStoreSize() { 1883 return this.memstore.size(); 1884 } 1885 1886 @Override 1887 public int getCompactPriority() { 1888 int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority(); 1889 if (priority == PRIORITY_USER) { 1890 LOG.warn("Compaction priority is USER despite there being no user compaction"); 1891 } 1892 return priority; 1893 } 1894 1895 public boolean throttleCompaction(long compactionSize) { 1896 return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize); 1897 } 1898 1899 public HRegion getHRegion() { 1900 return this.region; 1901 } 1902 1903 public RegionCoprocessorHost getCoprocessorHost() { 1904 return this.region.getCoprocessorHost(); 1905 } 1906 1907 @Override 1908 public RegionInfo getRegionInfo() { 1909 return getRegionFileSystem().getRegionInfo(); 1910 } 1911 1912 @Override 1913 public boolean areWritesEnabled() { 1914 return this.region.areWritesEnabled(); 1915 } 1916 1917 @Override 1918 public long getSmallestReadPoint() { 1919 return this.region.getSmallestReadPoint(); 1920 } 1921 1922 /** 1923 * Adds or replaces the specified KeyValues. 1924 * <p> 1925 * For each KeyValue specified, if a cell with the same row, family, and qualifier exists in 1926 * MemStore, it will be replaced. Otherwise, it will just be inserted to MemStore. 1927 * <p> 1928 * This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic 1929 * across all of them. 
1930 * @param readpoint readpoint below which we can safely remove duplicate KVs 1931 */ 1932 public void upsert(Iterable<ExtendedCell> cells, long readpoint, MemStoreSizing memstoreSizing) { 1933 this.storeEngine.readLock(); 1934 try { 1935 this.memstore.upsert(cells, readpoint, memstoreSizing); 1936 } finally { 1937 this.storeEngine.readUnlock(); 1938 } 1939 } 1940 1941 public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTracker tracker) { 1942 return new StoreFlusherImpl(cacheFlushId, tracker); 1943 } 1944 1945 private final class StoreFlusherImpl implements StoreFlushContext { 1946 1947 private final FlushLifeCycleTracker tracker; 1948 private final StoreFileWriterCreationTracker writerCreationTracker; 1949 private final long cacheFlushSeqNum; 1950 private MemStoreSnapshot snapshot; 1951 private List<Path> tempFiles; 1952 private List<Path> committedFiles; 1953 private long cacheFlushCount; 1954 private long cacheFlushSize; 1955 private long outputFileSize; 1956 1957 private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) { 1958 this.cacheFlushSeqNum = cacheFlushSeqNum; 1959 this.tracker = tracker; 1960 this.writerCreationTracker = storeFileWriterCreationTrackerFactory.get(); 1961 } 1962 1963 /** 1964 * This is not thread safe. The caller should have a lock on the region or the store. If 1965 * necessary, the lock can be added with the patch provided in HBASE-10087 1966 */ 1967 @Override 1968 public MemStoreSize prepare() { 1969 // passing the current sequence number of the wal - to allow bookkeeping in the memstore 1970 this.snapshot = memstore.snapshot(); 1971 this.cacheFlushCount = snapshot.getCellsCount(); 1972 this.cacheFlushSize = snapshot.getDataSize(); 1973 committedFiles = new ArrayList<>(1); 1974 return snapshot.getMemStoreSize(); 1975 } 1976 1977 @Override 1978 public void flushCache(MonitoredTask status) throws IOException { 1979 RegionServerServices rsService = region.getRegionServerServices(); 1980 ThroughputController throughputController = 1981 rsService == null ? null : rsService.getFlushThroughputController(); 1982 // it could be null if we do not need to track the creation of store file writer due to 1983 // different SFT implementation. 
1984 if (writerCreationTracker != null) { 1985 HStore.this.storeFileWriterCreationTrackers.add(writerCreationTracker); 1986 } 1987 tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController, 1988 tracker, writerCreationTracker); 1989 } 1990 1991 @Override 1992 public boolean commit(MonitoredTask status) throws IOException { 1993 try { 1994 if (CollectionUtils.isEmpty(this.tempFiles)) { 1995 return false; 1996 } 1997 status.setStatus("Flushing " + this + ": reopening flushed file"); 1998 List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, false, false); 1999 for (HStoreFile sf : storeFiles) { 2000 StoreFileReader r = sf.getReader(); 2001 if (LOG.isInfoEnabled()) { 2002 LOG.info("Added {}, entries={}, sequenceid={}, filesize={}", sf, r.getEntries(), 2003 cacheFlushSeqNum, TraditionalBinaryPrefix.long2String(r.length(), "", 1)); 2004 } 2005 outputFileSize += r.length(); 2006 storeSize.addAndGet(r.length()); 2007 totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); 2008 committedFiles.add(sf.getPath()); 2009 } 2010 2011 flushedCellsCount.addAndGet(cacheFlushCount); 2012 flushedCellsSize.addAndGet(cacheFlushSize); 2013 flushedOutputFileSize.addAndGet(outputFileSize); 2014 // call coprocessor after we have done all the accounting above 2015 for (HStoreFile sf : storeFiles) { 2016 if (getCoprocessorHost() != null) { 2017 getCoprocessorHost().postFlush(HStore.this, sf, tracker); 2018 } 2019 } 2020 // Add new file to store files. Clear snapshot too while we have the Store write lock. 2021 return completeFlush(storeFiles, snapshot.getId()); 2022 } finally { 2023 if (writerCreationTracker != null) { 2024 HStore.this.storeFileWriterCreationTrackers.remove(writerCreationTracker); 2025 } 2026 } 2027 } 2028 2029 @Override 2030 public long getOutputFileSize() { 2031 return outputFileSize; 2032 } 2033 2034 @Override 2035 public List<Path> getCommittedFiles() { 2036 return committedFiles; 2037 } 2038 2039 /** 2040 * Similar to commit, but called in secondary region replicas for replaying the flush cache from 2041 * primary region. Adds the new files to the store, and drops the snapshot depending on 2042 * dropMemstoreSnapshot argument. 
2043 * @param fileNames names of the flushed files 2044 * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot 2045 */ 2046 @Override 2047 public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot) 2048 throws IOException { 2049 List<HStoreFile> storeFiles = new ArrayList<>(fileNames.size()); 2050 for (String file : fileNames) { 2051 // open the file as a store file (hfile link, etc) 2052 StoreFileTracker sft = StoreFileTrackerFactory.create(conf, false, storeContext); 2053 StoreFileInfo storeFileInfo = 2054 getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), file, sft); 2055 HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo); 2056 storeFiles.add(storeFile); 2057 HStore.this.storeSize.addAndGet(storeFile.getReader().length()); 2058 HStore.this.totalUncompressedBytes 2059 .addAndGet(storeFile.getReader().getTotalUncompressedBytes()); 2060 if (LOG.isInfoEnabled()) { 2061 LOG.info(this + " added " + storeFile + ", entries=" + storeFile.getReader().getEntries() 2062 + ", sequenceid=" + storeFile.getReader().getSequenceID() + ", filesize=" 2063 + TraditionalBinaryPrefix.long2String(storeFile.getReader().length(), "", 1)); 2064 } 2065 } 2066 2067 long snapshotId = -1; // -1 means do not drop 2068 if (dropMemstoreSnapshot && snapshot != null) { 2069 snapshotId = snapshot.getId(); 2070 } 2071 HStore.this.completeFlush(storeFiles, snapshotId); 2072 } 2073 2074 /** 2075 * Abort the snapshot preparation. Drops the snapshot if any. 2076 */ 2077 @Override 2078 public void abort() throws IOException { 2079 if (snapshot != null) { 2080 HStore.this.completeFlush(Collections.emptyList(), snapshot.getId()); 2081 } 2082 } 2083 } 2084 2085 @Override 2086 public boolean needsCompaction() { 2087 List<HStoreFile> filesCompactingClone = null; 2088 synchronized (filesCompacting) { 2089 filesCompactingClone = Lists.newArrayList(filesCompacting); 2090 } 2091 return this.storeEngine.needsCompaction(filesCompactingClone); 2092 } 2093 2094 /** 2095 * Used for tests. 2096 * @return cache configuration for this Store. 
2097 */ 2098 public CacheConfig getCacheConfig() { 2099 return storeContext.getCacheConf(); 2100 } 2101 2102 public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HStore.class, false); 2103 2104 public static final long DEEP_OVERHEAD = ClassSize.align( 2105 FIXED_OVERHEAD + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK + ClassSize.CONCURRENT_SKIPLISTMAP 2106 + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT + ScanInfo.FIXED_OVERHEAD); 2107 2108 @Override 2109 public long heapSize() { 2110 MemStoreSize memstoreSize = this.memstore.size(); 2111 return DEEP_OVERHEAD + memstoreSize.getHeapSize() + storeContext.heapSize(); 2112 } 2113 2114 @Override 2115 public CellComparator getComparator() { 2116 return storeContext.getComparator(); 2117 } 2118 2119 public ScanInfo getScanInfo() { 2120 return scanInfo; 2121 } 2122 2123 /** 2124 * Set scan info, used by test 2125 * @param scanInfo new scan info to use for test 2126 */ 2127 void setScanInfo(ScanInfo scanInfo) { 2128 this.scanInfo = scanInfo; 2129 } 2130 2131 @Override 2132 public boolean hasTooManyStoreFiles() { 2133 return getStorefilesCount() > this.blockingFileCount; 2134 } 2135 2136 @Override 2137 public long getFlushedCellsCount() { 2138 return flushedCellsCount.get(); 2139 } 2140 2141 @Override 2142 public long getFlushedCellsSize() { 2143 return flushedCellsSize.get(); 2144 } 2145 2146 @Override 2147 public long getFlushedOutputFileSize() { 2148 return flushedOutputFileSize.get(); 2149 } 2150 2151 @Override 2152 public long getCompactedCellsCount() { 2153 return compactedCellsCount.get(); 2154 } 2155 2156 @Override 2157 public long getCompactedCellsSize() { 2158 return compactedCellsSize.get(); 2159 } 2160 2161 @Override 2162 public long getMajorCompactedCellsCount() { 2163 return majorCompactedCellsCount.get(); 2164 } 2165 2166 @Override 2167 public long getMajorCompactedCellsSize() { 2168 return majorCompactedCellsSize.get(); 2169 } 2170 2171 public void updateCompactedMetrics(boolean isMajor, CompactionProgress progress) { 2172 if (isMajor) { 2173 majorCompactedCellsCount.addAndGet(progress.getTotalCompactingKVs()); 2174 majorCompactedCellsSize.addAndGet(progress.totalCompactedSize); 2175 } else { 2176 compactedCellsCount.addAndGet(progress.getTotalCompactingKVs()); 2177 compactedCellsSize.addAndGet(progress.totalCompactedSize); 2178 } 2179 } 2180 2181 /** 2182 * Returns the StoreEngine that is backing this concrete implementation of Store. 2183 * @return Returns the {@link StoreEngine} object used internally inside this HStore object. 
2184 */ 2185 public StoreEngine<?, ?, ?, ?> getStoreEngine() { 2186 return this.storeEngine; 2187 } 2188 2189 protected OffPeakHours getOffPeakHours() { 2190 return this.offPeakHours; 2191 } 2192 2193 @Override 2194 public void onConfigurationChange(Configuration conf) { 2195 Configuration storeConf = StoreUtils.createStoreConfiguration(conf, region.getTableDescriptor(), 2196 getColumnFamilyDescriptor()); 2197 this.conf = storeConf; 2198 this.storeEngine.compactionPolicy.setConf(storeConf); 2199 this.offPeakHours = OffPeakHours.getInstance(storeConf); 2200 } 2201 2202 /** 2203 * {@inheritDoc} 2204 */ 2205 @Override 2206 public void registerChildren(ConfigurationManager manager) { 2207 CacheConfig cacheConfig = this.storeContext.getCacheConf(); 2208 if (cacheConfig != null) { 2209 manager.registerObserver(cacheConfig); 2210 } 2211 } 2212 2213 /** 2214 * {@inheritDoc} 2215 */ 2216 @Override 2217 public void deregisterChildren(ConfigurationManager manager) { 2218 // No children to deregister 2219 } 2220 2221 @Override 2222 public double getCompactionPressure() { 2223 return storeEngine.getStoreFileManager().getCompactionPressure(); 2224 } 2225 2226 @Override 2227 public boolean isPrimaryReplicaStore() { 2228 return getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID; 2229 } 2230 2231 /** 2232 * Sets the store up for a region level snapshot operation. 2233 * @see #postSnapshotOperation() 2234 */ 2235 public void preSnapshotOperation() { 2236 archiveLock.lock(); 2237 } 2238 2239 /** 2240 * Perform tasks needed after the completion of snapshot operation. 2241 * @see #preSnapshotOperation() 2242 */ 2243 public void postSnapshotOperation() { 2244 archiveLock.unlock(); 2245 } 2246 2247 /** 2248 * Closes and archives the compacted files under this store 2249 */ 2250 public synchronized void closeAndArchiveCompactedFiles() throws IOException { 2251 // ensure other threads do not attempt to archive the same files on close() 2252 archiveLock.lock(); 2253 try { 2254 storeEngine.readLock(); 2255 Collection<HStoreFile> copyCompactedfiles = null; 2256 try { 2257 Collection<HStoreFile> compactedfiles = 2258 this.getStoreEngine().getStoreFileManager().getCompactedfiles(); 2259 if (CollectionUtils.isNotEmpty(compactedfiles)) { 2260 // Do a copy under read lock 2261 copyCompactedfiles = new ArrayList<>(compactedfiles); 2262 } else { 2263 LOG.trace("No compacted files to archive"); 2264 } 2265 } finally { 2266 storeEngine.readUnlock(); 2267 } 2268 if (CollectionUtils.isNotEmpty(copyCompactedfiles)) { 2269 removeCompactedfiles(copyCompactedfiles, true); 2270 } 2271 } finally { 2272 archiveLock.unlock(); 2273 } 2274 } 2275 2276 /** 2277 * Archives and removes the compacted files 2278 * @param compactedfiles The compacted files in this store that are not active in reads 2279 * @param evictOnClose true if blocks should be evicted from the cache when an HFile reader is 2280 * closed, false if not 2281 */ 2282 private void removeCompactedfiles(Collection<HStoreFile> compactedfiles, boolean evictOnClose) 2283 throws IOException { 2284 final List<HStoreFile> filesToRemove = new ArrayList<>(compactedfiles.size()); 2285 final List<Long> storeFileSizes = new ArrayList<>(compactedfiles.size()); 2286 for (final HStoreFile file : compactedfiles) { 2287 synchronized (file) { 2288 try { 2289 StoreFileReader r = file.getReader(); 2290 if (r == null) { 2291 LOG.debug("The file {} was closed but still not archived", file); 2292 // HACK: Temporarily re-open the reader so we can get the size of the file. 
Ideally, 2293 // we should know the size of an HStoreFile without having to ask the HStoreFileReader 2294 // for that. 2295 long length = getStoreFileSize(file); 2296 filesToRemove.add(file); 2297 storeFileSizes.add(length); 2298 continue; 2299 } 2300 2301 if (file.isCompactedAway() && !file.isReferencedInReads()) { 2302 // Even if deleting fails we need not bother as any new scanners won't be 2303 // able to use the compacted file as the status is already compactedAway 2304 LOG.trace("Closing and archiving the file {}", file); 2305 // Copy the file size before closing the reader 2306 final long length = r.length(); 2307 r.close(evictOnClose); 2308 // Just close and return 2309 filesToRemove.add(file); 2310 // Only add the length if we successfully added the file to `filesToRemove` 2311 storeFileSizes.add(length); 2312 } else { 2313 LOG.info("Can't archive compacted file " + file.getPath() 2314 + " because of either isCompactedAway=" + file.isCompactedAway() 2315 + " or file has reference, isReferencedInReads=" + file.isReferencedInReads() 2316 + ", refCount=" + r.getRefCount() + ", skipping for now."); 2317 } 2318 } catch (Exception e) { 2319 LOG.error("Exception while trying to close the compacted store file {}", file.getPath(), 2320 e); 2321 } 2322 } 2323 } 2324 if (this.isPrimaryReplicaStore()) { 2325 // Only the primary region is allowed to move the file to archive. 2326 // The secondary region does not move the files to archive. Any active reads from 2327 // the secondary region will still work because the file as such has active readers on it. 2328 if (!filesToRemove.isEmpty()) { 2329 LOG.debug("Moving the files {} to archive", filesToRemove); 2330 // Only if this is successful it has to be removed 2331 try { 2332 StoreFileTracker storeFileTracker = 2333 StoreFileTrackerFactory.create(conf, isPrimaryReplicaStore(), storeContext); 2334 storeFileTracker.removeStoreFiles(filesToRemove); 2335 } catch (FailedArchiveException fae) { 2336 // Even if archiving some files failed, we still need to clear out any of the 2337 // files which were successfully archived. Otherwise we will receive a 2338 // FileNotFoundException when we attempt to re-archive them in the next go around. 2339 Collection<Path> failedFiles = fae.getFailedFiles(); 2340 Iterator<HStoreFile> iter = filesToRemove.iterator(); 2341 Iterator<Long> sizeIter = storeFileSizes.iterator(); 2342 while (iter.hasNext()) { 2343 sizeIter.next(); 2344 if (failedFiles.contains(iter.next().getPath())) { 2345 iter.remove(); 2346 sizeIter.remove(); 2347 } 2348 } 2349 if (!filesToRemove.isEmpty()) { 2350 clearCompactedfiles(filesToRemove); 2351 } 2352 throw fae; 2353 } 2354 } 2355 } 2356 if (!filesToRemove.isEmpty()) { 2357 // Clear the compactedfiles from the store file manager 2358 clearCompactedfiles(filesToRemove); 2359 // Try to send report of this archival to the Master for updating quota usage faster 2360 reportArchivedFilesForQuota(filesToRemove, storeFileSizes); 2361 } 2362 } 2363 2364 /** 2365 * Computes the length of a store file without succumbing to any errors along the way. If an error 2366 * is encountered, the implementation returns {@code 0} instead of the actual size. 2367 * @param file The file to compute the size of. 2368 * @return The size in bytes of the provided {@code file}. 
2369 */
2370 long getStoreFileSize(HStoreFile file) {
2371 long length = 0;
2372 try {
2373 file.initReader();
2374 length = file.getReader().length();
2375 } catch (IOException e) {
2376 LOG.trace("Failed to open reader when trying to compute store file size for {}, ignoring",
2377 file, e);
2378 } finally {
2379 try {
2380 file.closeStoreFile(
2381 file.getCacheConf() != null ? file.getCacheConf().shouldEvictOnClose() : true);
2382 } catch (IOException e) {
2383 LOG.trace("Failed to close reader after computing store file size for {}, ignoring", file,
2384 e);
2385 }
2386 }
2387 return length;
2388 }
2389
2390 public Long preFlushSeqIDEstimation() {
2391 return memstore.preFlushSeqIDEstimation();
2392 }
2393
2394 @Override
2395 public boolean isSloppyMemStore() {
2396 return this.memstore.isSloppy();
2397 }
2398
2399 private void clearCompactedfiles(List<HStoreFile> filesToRemove) throws IOException {
2400 LOG.trace("Clearing the compacted file {} from this store", filesToRemove);
2401 storeEngine.removeCompactedFiles(filesToRemove);
2402 }
2403
2404 void reportArchivedFilesForQuota(List<? extends StoreFile> archivedFiles, List<Long> fileSizes) {
2405 // Sanity check from the caller
2406 if (archivedFiles.size() != fileSizes.size()) {
2407 throw new RuntimeException("Coding error: should never see lists of varying size");
2408 }
2409 RegionServerServices rss = this.region.getRegionServerServices();
2410 if (rss == null) {
2411 return;
2412 }
2413 List<Entry<String, Long>> filesWithSizes = new ArrayList<>(archivedFiles.size());
2414 Iterator<Long> fileSizeIter = fileSizes.iterator();
2415 for (StoreFile storeFile : archivedFiles) {
2416 final long fileSize = fileSizeIter.next();
2417 if (storeFile.isHFile() && fileSize != 0) {
2418 filesWithSizes.add(Maps.immutableEntry(storeFile.getPath().getName(), fileSize));
2419 }
2420 }
2421 if (LOG.isTraceEnabled()) {
2422 LOG.trace("Files archived: " + archivedFiles + ", reporting the following to the Master: "
2423 + filesWithSizes);
2424 }
2425 boolean success = rss.reportFileArchivalForQuotas(getTableName(), filesWithSizes);
2426 if (!success) {
2427 LOG.warn("Failed to report archival of files: " + filesWithSizes);
2428 }
2429 }
2430
2431 @Override
2432 public int getCurrentParallelPutCount() {
2433 return currentParallelPutCount.get();
2434 }
2435
2436 public int getStoreRefCount() {
2437 return this.storeEngine.getStoreFileManager().getStoreFiles().stream()
2438 .filter(sf -> sf.getReader() != null).filter(HStoreFile::isHFile)
2439 .mapToInt(HStoreFile::getRefCount).sum();
2440 }
2441
2442 /** Returns the maximum ref count of any storeFile among all compacted HStore files for this HStore */
2443 public int getMaxCompactedStoreFileRefCount() {
2444 OptionalInt maxCompactedStoreFileRefCount = this.storeEngine.getStoreFileManager()
2445 .getCompactedfiles().stream().filter(sf -> sf.getReader() != null).filter(HStoreFile::isHFile)
2446 .mapToInt(HStoreFile::getRefCount).max();
2447 return maxCompactedStoreFileRefCount.isPresent() ?
maxCompactedStoreFileRefCount.getAsInt() : 0;
2448 }
2449
2450 @Override
2451 public long getMemstoreOnlyRowReadsCount() {
2452 return memstoreOnlyRowReadsCount.sum();
2453 }
2454
2455 @Override
2456 public long getMixedRowReadsCount() {
2457 return mixedRowReadsCount.sum();
2458 }
2459
2460 @Override
2461 public Configuration getReadOnlyConfiguration() {
2462 return new ReadOnlyConfiguration(this.conf);
2463 }
2464
2465 void updateMetricsStore(boolean memstoreRead) {
2466 if (memstoreRead) {
2467 memstoreOnlyRowReadsCount.increment();
2468 } else {
2469 mixedRowReadsCount.increment();
2470 }
2471 }
2472
2473 /**
2474 * Return the storefiles which are currently being written to. Mainly used by
2475 * {@link BrokenStoreFileCleaner} to prevent deleting these files, as they are not yet present in
2476 * the SFT.
2477 */
2478 public Set<Path> getStoreFilesBeingWritten() {
2479 return storeFileWriterCreationTrackers.stream().flatMap(t -> t.get().stream())
2480 .collect(Collectors.toSet());
2481 }
2482
2483 @Override
2484 public long getBloomFilterRequestsCount() {
2485 return storeEngine.getBloomFilterMetrics().getRequestsCount();
2486 }
2487
2488 @Override
2489 public long getBloomFilterNegativeResultsCount() {
2490 return storeEngine.getBloomFilterMetrics().getNegativeResultsCount();
2491 }
2492
2493 @Override
2494 public long getBloomFilterEligibleRequestsCount() {
2495 return storeEngine.getBloomFilterMetrics().getEligibleRequestsCount();
2496 }
2497}
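// Illustrative usage sketch (not part of the original class): how a caller might drive a single
// compaction cycle for one HStore using the public methods above. The variables "store",
// "throughputController", "user" and the scheduling decision "shouldRun" are hypothetical.
//
//   Optional<CompactionContext> compaction = store.requestCompaction();
//   if (compaction.isPresent()) {
//     if (shouldRun) {
//       // compact() commits the new files, replaces the old ones and cleans up the request
//       // in its own finally block.
//       List<HStoreFile> newFiles =
//         store.compact(compaction.get(), throughputController, user);
//     } else {
//       // A selected but never executed request must be cancelled so that its files are
//       // released from the compacting set.
//       store.cancelRequestedCompaction(compaction.get());
//     }
//   }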