/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.InnerStoreCellComparator;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.MetaCellComparator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.FailedArchiveException;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.coprocessor.ReadOnlyConfiguration;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;

/**
 * A Store holds a column family in a Region. It's a memstore and a set of zero or more StoreFiles,
 * which stretch backwards over time.
 * <p>
 * There's no reason to consider append-logging at this level; all logging and locking is handled at
 * the HRegion level. Store just provides services to manage sets of StoreFiles. One of the most
 * important of those services is compaction, where files are aggregated once they pass a
 * configurable threshold.
 * <p>
 * Locking and transactions are handled at a higher level. This API should not be called directly
 * but by an HRegion manager.
 */
@InterfaceAudience.Private
public class HStore
  implements Store, HeapSize, StoreConfigInformation, PropagatingConfigurationObserver {
  public static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
  public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
    "hbase.server.compactchecker.interval.multiplier";
  public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
  public static final String BLOCK_STORAGE_POLICY_KEY = "hbase.hstore.block.storage.policy";
  // "NONE" is not a valid storage policy and means we defer the policy to HDFS
  public static final String DEFAULT_BLOCK_STORAGE_POLICY = "NONE";
  public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
  public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 16;

  // HBASE-24428 : Update compaction priority for recently split daughter regions
  // so as to prioritize their compaction.
  // Any compaction candidate with higher priority than compaction of newly split daughter regions
  // should have priority value < (Integer.MIN_VALUE + 1000)
  private static final int SPLIT_REGION_COMPACTION_PRIORITY = Integer.MIN_VALUE + 1000;

  private static final Logger LOG = LoggerFactory.getLogger(HStore.class);

  protected final MemStore memstore;
  // This stores directory in the filesystem.
  private final HRegion region;
  protected Configuration conf;
  private long lastCompactSize = 0;
  volatile boolean forceMajor = false;
  private AtomicLong storeSize = new AtomicLong();
  private AtomicLong totalUncompressedBytes = new AtomicLong();
  private LongAdder memstoreOnlyRowReadsCount = new LongAdder();
  // rows that have cells from both memstore and files (or only files)
  private LongAdder mixedRowReadsCount = new LongAdder();

  /**
   * Lock specific to archiving compacted store files. This avoids races around the combination of
   * retrieving the list of compacted files and moving them to the archive directory. Since this is
   * usually a background process (other than on close), we don't want to handle this with the store
   * write lock, which would block readers and degrade performance. Locked by:
   * <ul>
   * <li>CompactedHFilesDispatchHandler via closeAndArchiveCompactedFiles()</li>
   * <li>close()</li>
   * </ul>
   */
  final ReentrantLock archiveLock = new ReentrantLock();

  private final boolean verifyBulkLoads;

  /**
   * Use this counter to track concurrent puts. If TRACE-log is enabled and we are over the
   * threshold set by hbase.region.store.parallel.put.print.threshold (default is 50), we will log a
   * message that identifies the Store experiencing this high level of concurrency.
   */
  private final AtomicInteger currentParallelPutCount = new AtomicInteger(0);
  private final int parallelPutCountPrintThreshold;

  private ScanInfo scanInfo;

  // All access must be synchronized.
  // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it.
  private final List<HStoreFile> filesCompacting = Lists.newArrayList();

  // All access must be synchronized.
  private final Set<ChangedReadersObserver> changedReaderObservers =
    Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());

  private HFileDataBlockEncoder dataBlockEncoder;

  final StoreEngine<?, ?, ?, ?> storeEngine;

  private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
  private volatile OffPeakHours offPeakHours;

  private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
  private int flushRetriesNumber;
  private int pauseTime;

  private long blockingFileCount;
  private int compactionCheckMultiplier;

  private AtomicLong flushedCellsCount = new AtomicLong();
  private AtomicLong compactedCellsCount = new AtomicLong();
  private AtomicLong majorCompactedCellsCount = new AtomicLong();
  private AtomicLong flushedCellsSize = new AtomicLong();
  private AtomicLong flushedOutputFileSize = new AtomicLong();
  private AtomicLong compactedCellsSize = new AtomicLong();
  private AtomicLong majorCompactedCellsSize = new AtomicLong();

  private final StoreContext storeContext;

  // Used to track the store files which are currently being written. For compaction, if we want to
  // compact store files [a, b, c] to [d], then here we will record 'd'. And we will also use it to
  // track the store files being written when flushing.
  // Notice that the creation is in the background compaction or flush thread and we will get the
  // files in other threads, so it needs to be thread safe.
  private static final class StoreFileWriterCreationTracker implements Consumer<Path> {

    private final Set<Path> files = Collections.newSetFromMap(new ConcurrentHashMap<>());

    @Override
    public void accept(Path t) {
      files.add(t);
    }

    public Set<Path> get() {
      return Collections.unmodifiableSet(files);
    }
  }

  // We may have multiple compactions running at the same time, and flush can also happen at the
  // same time, so here we need to use a collection, and the collection needs to be thread safe.
  // The implementation of StoreFileWriterCreationTracker is very simple and we are not likely to
  // implement hashCode or equals for it, so here we just use ConcurrentHashMap. Change to
  // IdentityHashMap if later we want to implement hashCode or equals.
  private final Set<StoreFileWriterCreationTracker> storeFileWriterCreationTrackers =
    Collections.newSetFromMap(new ConcurrentHashMap<>());

  // For the SFT implementations where we write a tmp store file first, we do not need to clean up
  // the broken store files under the data directory, which means we do not need to track the store
  // file writer creation. So here we abstract a factory to return different trackers for different
  // SFT implementations.
  private final Supplier<StoreFileWriterCreationTracker> storeFileWriterCreationTrackerFactory;

  private final boolean warmup;

  /**
   * Constructor
   * @param family    HColumnDescriptor for this column
   * @param confParam configuration object; can be null
   */
  protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
    final Configuration confParam, boolean warmup) throws IOException {
    this.conf = StoreUtils.createStoreConfiguration(confParam, region.getTableDescriptor(), family);

    this.region = region;
    this.storeContext = initializeStoreContext(family);

    // Assemble the store's home directory and ensure it exists.
    region.getRegionFileSystem().createStoreDir(family.getNameAsString());

    // set block storage policy for store directory
    String policyName = family.getStoragePolicy();
    if (null == policyName) {
      policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY, DEFAULT_BLOCK_STORAGE_POLICY);
    }
    region.getRegionFileSystem().setStoragePolicy(family.getNameAsString(), policyName.trim());

    this.dataBlockEncoder = new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());

    // used by ScanQueryMatcher
    long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
    LOG.trace("Time to purge deletes set to {}ms in {}", timeToPurgeDeletes, this);
    // Get TTL
    long ttl = determineTTLFromFamily(family);
    // Why not just pass a HColumnDescriptor in here altogether? Even if have
    // to clone it?
    scanInfo =
      new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.storeContext.getComparator());
    this.memstore = getMemstore();

    this.offPeakHours = OffPeakHours.getInstance(conf);

    this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);

    this.blockingFileCount = conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
    this.compactionCheckMultiplier = conf.getInt(COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY,
      DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
    if (this.compactionCheckMultiplier <= 0) {
      LOG.error("Compaction check period multiplier must be positive, setting default: {}",
        DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
      this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
    }

    this.warmup = warmup;
    this.storeEngine = createStoreEngine(this, this.conf, region.getCellComparator());
    storeEngine.initialize(warmup);
    // if we are required to write to the tmp dir first, then we just return null, which indicates
    // that we do not need to track the creation of store file writers, otherwise we return a new
    // StoreFileWriterCreationTracker.
    this.storeFileWriterCreationTrackerFactory = storeEngine.requireWritingToTmpDirFirst()
      ? () -> null
      : () -> new StoreFileWriterCreationTracker();
    refreshStoreSizeAndTotalBytes();

    flushRetriesNumber =
      conf.getInt("hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
    pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
    if (flushRetriesNumber <= 0) {
      throw new IllegalArgumentException(
        "hbase.hstore.flush.retries.number must be > 0, not " + flushRetriesNumber);
    }

    int confPrintThreshold =
      this.conf.getInt("hbase.region.store.parallel.put.print.threshold", 50);
    if (confPrintThreshold < 10) {
      confPrintThreshold = 10;
    }
    this.parallelPutCountPrintThreshold = confPrintThreshold;

    LOG.info(
      "Store={}, memstore type={}, storagePolicy={}, verifyBulkLoads={}, "
        + "parallelPutCountPrintThreshold={}, encoding={}, compression={}",
      this, memstore.getClass().getSimpleName(), policyName, verifyBulkLoads,
      parallelPutCountPrintThreshold, family.getDataBlockEncoding(), family.getCompressionType());
  }

  private StoreContext initializeStoreContext(ColumnFamilyDescriptor family) throws IOException {
    return new StoreContext.Builder().withBlockSize(family.getBlocksize())
      .withEncryptionContext(EncryptionUtil.createEncryptionContext(conf, family))
      .withBloomType(family.getBloomFilterType()).withCacheConfig(createCacheConf(family))
      .withCellComparator(region.getTableDescriptor().isMetaTable() || conf
        .getBoolean(HRegion.USE_META_CELL_COMPARATOR, HRegion.DEFAULT_USE_META_CELL_COMPARATOR)
          ? MetaCellComparator.META_COMPARATOR
          : InnerStoreCellComparator.INNER_STORE_COMPARATOR)
      .withColumnFamilyDescriptor(family).withCompactedFilesSupplier(this::getCompactedFiles)
      .withRegionFileSystem(region.getRegionFileSystem())
      .withFavoredNodesSupplier(this::getFavoredNodes)
      .withFamilyStoreDirectoryPath(
        region.getRegionFileSystem().getStoreDir(family.getNameAsString()))
      .withRegionCoprocessorHost(region.getCoprocessorHost()).build();
  }

  private InetSocketAddress[] getFavoredNodes() {
    InetSocketAddress[] favoredNodes = null;
    if (region.getRegionServerServices() != null) {
      favoredNodes = region.getRegionServerServices()
        .getFavoredNodesForRegion(region.getRegionInfo().getEncodedName());
    }
    return favoredNodes;
  }

  /** Returns the MemStore instance to use in this store. */
  private MemStore getMemstore() {
    MemStore ms = null;
    // Check if in-memory-compaction is configured. Note MemoryCompactionPolicy is an enum!
    MemoryCompactionPolicy inMemoryCompaction = null;
    if (this.getTableName().isSystemTable()) {
      inMemoryCompaction = MemoryCompactionPolicy
        .valueOf(conf.get("hbase.systemtables.compacting.memstore.type", "NONE"));
    } else {
      inMemoryCompaction = getColumnFamilyDescriptor().getInMemoryCompaction();
    }
    if (inMemoryCompaction == null) {
      inMemoryCompaction =
        MemoryCompactionPolicy.valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
          CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT).toUpperCase());
    }

    switch (inMemoryCompaction) {
      case NONE:
        Class<? extends MemStore> memStoreClass =
          conf.getClass(MEMSTORE_CLASS_NAME, DefaultMemStore.class, MemStore.class);
        ms = ReflectionUtils.newInstance(memStoreClass,
          new Object[] { conf, getComparator(), this.getHRegion().getRegionServicesForStores() });
        break;
      default:
        Class<? extends CompactingMemStore> compactingMemStoreClass =
          conf.getClass(MEMSTORE_CLASS_NAME, CompactingMemStore.class, CompactingMemStore.class);
        ms =
          ReflectionUtils.newInstance(compactingMemStoreClass, new Object[] { conf, getComparator(),
            this, this.getHRegion().getRegionServicesForStores(), inMemoryCompaction });
    }
    return ms;
  }
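  // Illustrative sketch, not part of the original file: how the memstore
  // implementation chosen by getMemstore() above can be steered from
  // configuration. The keys are the ones read above; the values shown are
  // examples only.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   // Pin the plain, non-compacting memstore explicitly:
  //   conf.set(HStore.MEMSTORE_CLASS_NAME, DefaultMemStore.class.getName());
  //   // Or enable in-memory compaction for all column families:
  //   conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, "BASIC");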
  /**
   * Creates the cache config.
   * @param family The current column family.
   */
  protected CacheConfig createCacheConf(final ColumnFamilyDescriptor family) {
    CacheConfig cacheConf = new CacheConfig(conf, family, region.getBlockCache(),
      region.getRegionServicesForStores().getByteBuffAllocator());
    LOG.info("Created cacheConfig: {}, for column family {} of region {} ", cacheConf,
      family.getNameAsString(), region.getRegionInfo().getEncodedName());
    return cacheConf;
  }

  /**
   * Creates the store engine configured for the given Store.
   * @param store        The store. An unfortunate dependency needed due to it being passed to
   *                     coprocessors via the compactor.
   * @param conf         Store configuration.
   * @param kvComparator KVComparator for storeFileManager.
   * @return StoreEngine to use.
   */
  protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
    CellComparator kvComparator) throws IOException {
    return StoreEngine.create(store, conf, kvComparator);
  }

  /** Returns TTL in milliseconds of the specified family */
  public static long determineTTLFromFamily(final ColumnFamilyDescriptor family) {
    // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.
    long ttl = family.getTimeToLive();
    if (ttl == HConstants.FOREVER) {
      // Default is unlimited ttl.
      ttl = Long.MAX_VALUE;
    } else if (ttl == -1) {
      ttl = Long.MAX_VALUE;
    } else {
      // Seconds -> milliseconds, adjusted for user data.
      ttl *= 1000;
    }
    return ttl;
  }
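  // Illustrative sketch, not part of the original file: the unit conversion
  // performed by determineTTLFromFamily(), assuming a family configured with a
  // one-day TTL. The descriptor built here is hypothetical.
  //
  //   ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder
  //     .newBuilder(Bytes.toBytes("cf")).setTimeToLive(86400).build();
  //   long ttlMs = HStore.determineTTLFromFamily(cfd); // 86400 * 1000 = 86,400,000 ms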
  StoreContext getStoreContext() {
    return storeContext;
  }

  @Override
  public String getColumnFamilyName() {
    return this.storeContext.getFamily().getNameAsString();
  }

  @Override
  public TableName getTableName() {
    return this.getRegionInfo().getTable();
  }

  @Override
  public FileSystem getFileSystem() {
    return storeContext.getRegionFileSystem().getFileSystem();
  }

  public HRegionFileSystem getRegionFileSystem() {
    return storeContext.getRegionFileSystem();
  }

  /* Implementation of StoreConfigInformation */
  @Override
  public long getStoreFileTtl() {
    // TTL only applies if there's no MIN_VERSIONS setting on the column.
    return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
  }

  @Override
  public long getMemStoreFlushSize() {
    // TODO: Why is this in here? The flushsize of the region rather than the store? St.Ack
    return this.region.memstoreFlushSize;
  }

  @Override
  public MemStoreSize getFlushableSize() {
    return this.memstore.getFlushableSize();
  }

  @Override
  public MemStoreSize getSnapshotSize() {
    return this.memstore.getSnapshotSize();
  }

  @Override
  public long getCompactionCheckMultiplier() {
    return this.compactionCheckMultiplier;
  }

  @Override
  public long getBlockingFileCount() {
    return blockingFileCount;
  }
  /* End implementation of StoreConfigInformation */

  @Override
  public ColumnFamilyDescriptor getColumnFamilyDescriptor() {
    return this.storeContext.getFamily();
  }

  @Override
  public OptionalLong getMaxSequenceId() {
    return StoreUtils.getMaxSequenceIdInList(this.getStorefiles());
  }

  @Override
  public OptionalLong getMaxMemStoreTS() {
    return StoreUtils.getMaxMemStoreTSInList(this.getStorefiles());
  }

  /** Returns the data block encoder */
  public HFileDataBlockEncoder getDataBlockEncoder() {
    return dataBlockEncoder;
  }

  /**
   * Should be used only in tests.
   * @param blockEncoder the block delta encoder to use
   */
  void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
    this.dataBlockEncoder = blockEncoder;
  }

  private void postRefreshStoreFiles() throws IOException {
    // Advance the memstore read point to be at least the new store files' seqIds so that
    // readers might pick it up. This assumes that the store is not getting any writes (otherwise
    // in-flight transactions might be made visible).
    getMaxSequenceId().ifPresent(region.getMVCC()::advanceTo);
    refreshStoreSizeAndTotalBytes();
  }

  @Override
  public void refreshStoreFiles() throws IOException {
    storeEngine.refreshStoreFiles();
    postRefreshStoreFiles();
  }
  /**
   * Replaces the store files that the store has with the given files. Mainly used by secondary
   * region replicas to keep up to date with the primary region files.
   */
  public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
    storeEngine.refreshStoreFiles(newFiles);
    postRefreshStoreFiles();
  }

  /**
   * This message intends to inform the MemStore that the next coming updates are going to be part
   * of replaying edits from the WAL.
   */
  public void startReplayingFromWAL() {
    this.memstore.startReplayingFromWAL();
  }

  /**
   * This message intends to inform the MemStore that the replaying of edits from the WAL is done.
   */
  public void stopReplayingFromWAL() {
    this.memstore.stopReplayingFromWAL();
  }

  /**
   * Adds a value to the memstore
   */
  public void add(final ExtendedCell cell, MemStoreSizing memstoreSizing) {
    storeEngine.readLock();
    try {
      if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
        LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
          this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName());
      }
      this.memstore.add(cell, memstoreSizing);
    } finally {
      storeEngine.readUnlock();
      currentParallelPutCount.decrementAndGet();
    }
  }

  /**
   * Adds the specified values to the memstore
   */
  public void add(final Iterable<ExtendedCell> cells, MemStoreSizing memstoreSizing) {
    storeEngine.readLock();
    try {
      if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
        LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
          this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName());
      }
      memstore.add(cells, memstoreSizing);
    } finally {
      storeEngine.readUnlock();
      currentParallelPutCount.decrementAndGet();
    }
  }

  @Override
  public long timeOfOldestEdit() {
    return memstore.timeOfOldestEdit();
  }

  /** Returns All store files. */
  @Override
  public Collection<HStoreFile> getStorefiles() {
    return this.storeEngine.getStoreFileManager().getStoreFiles();
  }

  @Override
  public Collection<HStoreFile> getCompactedFiles() {
    return this.storeEngine.getStoreFileManager().getCompactedfiles();
  }
  /**
   * This throws a WrongRegionException if the HFile does not fit in this region, or an
   * InvalidHFileException if the HFile is not valid.
   */
  public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
    HFile.Reader reader = null;
    try {
      LOG.info("Validating hfile at " + srcPath + " for inclusion in " + this);
      FileSystem srcFs = srcPath.getFileSystem(conf);
      srcFs.access(srcPath, FsAction.READ_WRITE);
      reader = HFile.createReader(srcFs, srcPath, getCacheConfig(), isPrimaryReplicaStore(), conf);

      Optional<byte[]> firstKey = reader.getFirstRowKey();
      Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
      Optional<ExtendedCell> lk = reader.getLastKey();
      Preconditions.checkState(lk.isPresent(), "Last key can not be null");
      byte[] lastKey = CellUtil.cloneRow(lk.get());

      if (LOG.isDebugEnabled()) {
        LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey.get()) + " last="
          + Bytes.toStringBinary(lastKey));
        LOG.debug("Region bounds: first=" + Bytes.toStringBinary(getRegionInfo().getStartKey())
          + " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
      }

      if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) {
        throw new WrongRegionException("Bulk load file " + srcPath.toString()
          + " does not fit inside region " + this.getRegionInfo().getRegionNameAsString());
      }

      if (
        reader.length()
            > conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE)
      ) {
        LOG.warn("Trying to bulk load hfile " + srcPath + " with size: " + reader.length()
          + " bytes can be problematic as it may lead to oversplitting.");
      }

      if (verifyBulkLoads) {
        long verificationStartTime = EnvironmentEdgeManager.currentTime();
        LOG.info("Full verification started for bulk load hfile: {}", srcPath);
        Cell prevCell = null;
        HFileScanner scanner = reader.getScanner(conf, false, false, false);
        scanner.seekTo();
        do {
          Cell cell = scanner.getCell();
          if (prevCell != null) {
            if (getComparator().compareRows(prevCell, cell) > 0) {
              throw new InvalidHFileException("Previous row is greater than" + " current row: path="
                + srcPath + " previous=" + CellUtil.getCellKeyAsString(prevCell) + " current="
                + CellUtil.getCellKeyAsString(cell));
            }
            if (CellComparator.getInstance().compareFamilies(prevCell, cell) != 0) {
              throw new InvalidHFileException("Previous key had different"
                + " family compared to current key: path=" + srcPath + " previous="
                + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(),
                  prevCell.getFamilyLength())
                + " current=" + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(),
                  cell.getFamilyLength()));
            }
          }
          prevCell = cell;
        } while (scanner.next());
        LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString() + " took "
          + (EnvironmentEdgeManager.currentTime() - verificationStartTime) + " ms");
      }
    } finally {
      if (reader != null) {
        reader.close();
      }
    }
  }
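  // Illustrative sketch, not part of the original file: the calling sequence a
  // bulk load is expected to follow, as driven from HRegion. The path and
  // sequence id here are hypothetical.
  //
  //   Path hfile = new Path("/staging/cf/hfile1");
  //   store.assertBulkLoadHFileOk(hfile); // validates region fit and key order
  //   Pair<Path, Path> p = store.preBulkLoadHFile(hfile.toString(), seqNum);
  //   store.bulkLoadHFile(Bytes.toBytes("cf"), p.getFirst().toString(), p.getSecond());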
  /**
   * This method should only be called from Region. It is assumed that the ranges of values in the
   * HFile fit within the store's assigned region. (assertBulkLoadHFileOk checks this)
   * @param seqNum sequence Id associated with the HFile
   */
  public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
    Path srcPath = new Path(srcPathStr);
    return getRegionFileSystem().bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
  }

  public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException {
    Path srcPath = new Path(srcPathStr);
    try {
      getRegionFileSystem().commitStoreFile(srcPath, dstPath);
    } finally {
      if (this.getCoprocessorHost() != null) {
        this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath);
      }
    }

    LOG.info("Loaded HFile " + srcPath + " into " + this + " as " + dstPath
      + " - updating store file list.");

    HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath);
    bulkLoadHFile(sf);

    LOG.info("Successfully loaded {} into {} (new location: {})", srcPath, this, dstPath);

    return dstPath;
  }

  public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
    HStoreFile sf = storeEngine.createStoreFileAndReader(fileInfo);
    bulkLoadHFile(sf);
  }

  private void bulkLoadHFile(HStoreFile sf) throws IOException {
    StoreFileReader r = sf.getReader();
    this.storeSize.addAndGet(r.length());
    this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
    storeEngine.addStoreFiles(Lists.newArrayList(sf), () -> {
    });
    LOG.info("Loaded HFile " + sf.getFileInfo() + " into " + this);
    if (LOG.isTraceEnabled()) {
      String traceMessage = "BULK LOAD time,size,store size,store files ["
        + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize + ","
        + storeEngine.getStoreFileManager().getStorefileCount() + "]";
      LOG.trace(traceMessage);
    }
  }

  private ImmutableCollection<HStoreFile> closeWithoutLock() throws IOException {
    memstore.close();
    // Clear so metrics doesn't find them.
    ImmutableCollection<HStoreFile> result = storeEngine.getStoreFileManager().clearFiles();
    Collection<HStoreFile> compactedfiles = storeEngine.getStoreFileManager().clearCompactedFiles();
    // clear the compacted files
    if (CollectionUtils.isNotEmpty(compactedfiles)) {
      removeCompactedfiles(compactedfiles,
        getCacheConfig() != null ? getCacheConfig().shouldEvictOnClose() : true);
    }
    if (!result.isEmpty()) {
      // initialize the thread pool for closing store files in parallel.
      ThreadPoolExecutor storeFileCloserThreadPool =
        this.region.getStoreFileOpenAndCloseThreadPool("StoreFileCloser-"
          + this.region.getRegionInfo().getEncodedName() + "-" + this.getColumnFamilyName());

      // close each store file in parallel
      CompletionService<Void> completionService =
        new ExecutorCompletionService<>(storeFileCloserThreadPool);
      for (HStoreFile f : result) {
        completionService.submit(new Callable<Void>() {
          @Override
          public Void call() throws IOException {
            boolean evictOnClose =
              getCacheConfig() != null ? getCacheConfig().shouldEvictOnClose() : true;
            f.closeStoreFile(!warmup && evictOnClose);
            return null;
          }
        });
      }

      IOException ioe = null;
      try {
        for (int i = 0; i < result.size(); i++) {
          try {
            Future<Void> future = completionService.take();
            future.get();
          } catch (InterruptedException e) {
            if (ioe == null) {
              ioe = new InterruptedIOException();
              ioe.initCause(e);
            }
          } catch (ExecutionException e) {
            if (ioe == null) {
              ioe = new IOException(e.getCause());
            }
          }
        }
      } finally {
        storeFileCloserThreadPool.shutdownNow();
      }
      if (ioe != null) {
        throw ioe;
      }
    }
    LOG.trace("Closed {}", this);
    return result;
  }

  /**
   * Close all the readers. We don't need to worry about subsequent requests because the Region
   * holds a write lock that will prevent any more reads or writes.
   * @return the {@link StoreFile StoreFiles} that were previously being used.
   * @throws IOException on failure
   */
  public ImmutableCollection<HStoreFile> close() throws IOException {
    // findbugs can not recognize storeEngine.writeLock is just a lock operation so it will report
    // UL_UNRELEASED_LOCK_EXCEPTION_PATH, so here we have to use two try finally...
    // Change later if findbugs becomes smarter in the future.
    this.archiveLock.lock();
    try {
      this.storeEngine.writeLock();
      try {
        return closeWithoutLock();
      } finally {
        this.storeEngine.writeUnlock();
      }
    } finally {
      this.archiveLock.unlock();
    }
  }
  /**
   * Write out current snapshot. Presumes {@code StoreFlusherImpl.prepare()} has been called
   * previously.
   * @param logCacheFlushId flush sequence number
   * @return The path name of the tmp file to which the store was flushed
   * @throws IOException if exception occurs during process
   */
  protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
    MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
    Consumer<Path> writerCreationTracker) throws IOException {
    // If an exception happens flushing, we let it out without clearing the memstore snapshot.
    // The old snapshot will be returned when we say 'snapshot', the next time flush comes around.
    // Retry after catching exception when flushing, otherwise the server will abort itself.
    StoreFlusher flusher = storeEngine.getStoreFlusher();
    IOException lastException = null;
    for (int i = 0; i < flushRetriesNumber; i++) {
      try {
        List<Path> pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status,
          throughputController, tracker, writerCreationTracker);
        Path lastPathName = null;
        try {
          for (Path pathName : pathNames) {
            lastPathName = pathName;
            storeEngine.validateStoreFile(pathName);
          }
          return pathNames;
        } catch (Exception e) {
          LOG.warn("Failed validating store file {}, retrying num={}", lastPathName, i, e);
          if (e instanceof IOException) {
            lastException = (IOException) e;
          } else {
            lastException = new IOException(e);
          }
        }
      } catch (IOException e) {
        LOG.warn("Failed flushing store file for {}, retrying num={}", this, i, e);
        lastException = e;
      }
      if (lastException != null && i < (flushRetriesNumber - 1)) {
        try {
          Thread.sleep(pauseTime);
        } catch (InterruptedException e) {
          IOException iie = new InterruptedIOException();
          iie.initCause(e);
          throw iie;
        }
      }
    }
    throw lastException;
  }

  public HStoreFile tryCommitRecoveredHFile(Path path) throws IOException {
    LOG.info("Validating recovered hfile at {} for inclusion in store {}", path, this);
    FileSystem srcFs = path.getFileSystem(conf);
    srcFs.access(path, FsAction.READ_WRITE);
    try (HFile.Reader reader =
      HFile.createReader(srcFs, path, getCacheConfig(), isPrimaryReplicaStore(), conf)) {
      Optional<byte[]> firstKey = reader.getFirstRowKey();
      Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
      Optional<ExtendedCell> lk = reader.getLastKey();
      Preconditions.checkState(lk.isPresent(), "Last key can not be null");
      byte[] lastKey = CellUtil.cloneRow(lk.get());
      if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) {
        throw new WrongRegionException("Recovered hfile " + path.toString()
          + " does not fit inside region " + this.getRegionInfo().getRegionNameAsString());
      }
    }

    Path dstPath = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), path);
    HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath);
    StoreFileReader r = sf.getReader();
    this.storeSize.addAndGet(r.length());
    this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());

    storeEngine.addStoreFiles(Lists.newArrayList(sf), () -> {
    });

    LOG.info("Loaded recovered hfile to {}, entries={}, sequenceid={}, filesize={}", sf,
      r.getEntries(), r.getSequenceID(), TraditionalBinaryPrefix.long2String(r.length(), "B", 1));
    return sf;
  }

  private long getTotalSize(Collection<HStoreFile> sfs) {
    return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum();
  }
  private boolean completeFlush(final List<HStoreFile> sfs, long snapshotId) throws IOException {
    // NOTE: we should keep the clearSnapshot method inside the write lock because clearSnapshot
    // may close {@link DefaultMemStore#snapshot}, which may be used by
    // {@link DefaultMemStore#getScanners}.
    storeEngine.addStoreFiles(sfs,
      // NOTE: here we must increase the refCount for storeFiles because we would open the
      // storeFiles and get the StoreFileScanners for them in HStore.notifyChangedReadersObservers.
      // If we don't increase the refCount here, HStore.closeAndArchiveCompactedFiles called by
      // CompactedHFilesDischarger may archive the storeFiles after a concurrent compaction.
      // Because HStore.requestCompaction is under the storeEngine lock, here we increase the
      // refCount under the storeEngine lock as well. See HBASE-27519 for more details.
      snapshotId > 0 ? () -> {
        this.memstore.clearSnapshot(snapshotId);
        HStoreFile.increaseStoreFilesRefeCount(sfs);
      } : () -> {
        HStoreFile.increaseStoreFilesRefeCount(sfs);
      });
    // notify to be called here - only in case of flushes
    try {
      notifyChangedReadersObservers(sfs);
    } finally {
      HStoreFile.decreaseStoreFilesRefeCount(sfs);
    }
    if (LOG.isTraceEnabled()) {
      long totalSize = getTotalSize(sfs);
      String traceMessage = "FLUSH time,count,size,store size,store files ["
        + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize + ","
        + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
      LOG.trace(traceMessage);
    }
    return needsCompaction();
  }

  /**
   * Notify all observers that the set of Readers has changed.
   */
  private void notifyChangedReadersObservers(List<HStoreFile> sfs) throws IOException {
    for (ChangedReadersObserver o : this.changedReaderObservers) {
      List<KeyValueScanner> memStoreScanners;
      this.storeEngine.readLock();
      try {
        memStoreScanners = this.memstore.getScanners(o.getReadPoint());
      } finally {
        this.storeEngine.readUnlock();
      }
      o.updateReaders(sfs, memStoreScanners);
    }
  }

  /**
   * Get all scanners with no filtering based on TTL (that happens further down the line).
   * @param cacheBlocks  cache the blocks or not
   * @param usePread     true to use pread, false if not
   * @param isCompaction true if the scanner is created for compaction
   * @param matcher      the scan query matcher
   * @param startRow     the start row
   * @param stopRow      the stop row
   * @param readPt       the read point of the current scan
   * @return all scanners for this store
   */
  public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet, boolean usePread,
    boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt,
    boolean onlyLatestVersion) throws IOException {
    return getScanners(cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow, false,
      readPt, onlyLatestVersion);
  }
  /**
   * Get all scanners with no filtering based on TTL (that happens further down the line).
   * @param cacheBlocks     cache the blocks or not
   * @param usePread        true to use pread, false if not
   * @param isCompaction    true if the scanner is created for compaction
   * @param matcher         the scan query matcher
   * @param startRow        the start row
   * @param includeStartRow true to include start row, false if not
   * @param stopRow         the stop row
   * @param includeStopRow  true to include stop row, false if not
   * @param readPt          the read point of the current scan
   * @return all scanners for this store
   */
  public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,
    boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow,
    byte[] stopRow, boolean includeStopRow, long readPt, boolean onlyLatestVersion)
    throws IOException {
    Collection<HStoreFile> storeFilesToScan;
    List<KeyValueScanner> memStoreScanners;
    this.storeEngine.readLock();
    try {
      storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
        includeStartRow, stopRow, includeStopRow, onlyLatestVersion);
      memStoreScanners = this.memstore.getScanners(readPt);
      // NOTE: here we must increase the refCount for storeFiles because we would open the
      // storeFiles and get the StoreFileScanners for them. If we don't increase the refCount here,
      // HStore.closeAndArchiveCompactedFiles called by CompactedHFilesDischarger may archive the
      // storeFiles after a concurrent compaction. Because HStore.requestCompaction is under the
      // storeEngine lock, here we increase the refCount under the storeEngine lock as well. See
      // HBASE-27484 for more details.
      HStoreFile.increaseStoreFilesRefeCount(storeFilesToScan);
    } finally {
      this.storeEngine.readUnlock();
    }
    try {
      // First the store file scanners

      // TODO this used to get the store files in descending order,
      // but now we get them in ascending order, which I think is
      // actually more correct, since memstore get put at the end.
      List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(
        storeFilesToScan, cacheBlocks, usePread, isCompaction, false, matcher, readPt);
      List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
      scanners.addAll(sfScanners);
      // Then the memstore scanners
      scanners.addAll(memStoreScanners);
      return scanners;
    } catch (Throwable t) {
      clearAndClose(memStoreScanners);
      throw t instanceof IOException ? (IOException) t : new IOException(t);
    } finally {
      HStoreFile.decreaseStoreFilesRefeCount(storeFilesToScan);
    }
  }

  private static void clearAndClose(List<KeyValueScanner> scanners) {
    if (scanners == null) {
      return;
    }
    for (KeyValueScanner s : scanners) {
      s.close();
    }
    scanners.clear();
  }
  /**
   * Create scanners on the given files and if needed on the memstore with no filtering based on TTL
   * (that happens further down the line).
   * @param files                  the list of files on which the scanners have to be created
   * @param cacheBlocks            cache the blocks or not
   * @param usePread               true to use pread, false if not
   * @param isCompaction           true if the scanner is created for compaction
   * @param matcher                the scan query matcher
   * @param startRow               the start row
   * @param stopRow                the stop row
   * @param readPt                 the read point of the current scan
   * @param includeMemstoreScanner true if memstore has to be included
   * @return scanners on the given files and on the memstore if specified
   */
  public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
    boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
    byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner,
    boolean onlyLatestVersion) throws IOException {
    return getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow,
      false, readPt, includeMemstoreScanner, onlyLatestVersion);
  }

  /**
   * Create scanners on the given files and if needed on the memstore with no filtering based on TTL
   * (that happens further down the line).
   * @param files                  the list of files on which the scanners have to be created
   * @param cacheBlocks            cache the blocks or not
   * @param usePread               true to use pread, false if not
   * @param isCompaction           true if the scanner is created for compaction
   * @param matcher                the scan query matcher
   * @param startRow               the start row
   * @param includeStartRow        true to include start row, false if not
   * @param stopRow                the stop row
   * @param includeStopRow         true to include stop row, false if not
   * @param readPt                 the read point of the current scan
   * @param includeMemstoreScanner true if memstore has to be included
   * @return scanners on the given files and on the memstore if specified
   */
  public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
    boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
    boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
    boolean includeMemstoreScanner, boolean onlyLatestVersion) throws IOException {
    List<KeyValueScanner> memStoreScanners = null;
    if (includeMemstoreScanner) {
      this.storeEngine.readLock();
      try {
        memStoreScanners = this.memstore.getScanners(readPt);
      } finally {
        this.storeEngine.readUnlock();
      }
    }
    try {
      List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(files,
        cacheBlocks, usePread, isCompaction, false, matcher, readPt);
      List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
      scanners.addAll(sfScanners);
      // Then the memstore scanners
      if (memStoreScanners != null) {
        scanners.addAll(memStoreScanners);
      }
      return scanners;
    } catch (Throwable t) {
      clearAndClose(memStoreScanners);
      throw t instanceof IOException ? (IOException) t : new IOException(t);
    }
  }

  /**
   * @param o Observer who wants to know about changes in set of Readers
   */
  public void addChangedReaderObserver(ChangedReadersObserver o) {
    this.changedReaderObservers.add(o);
  }
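  // Illustrative sketch, not part of the original file: a typical full-store
  // read obtains scanners over both store files and the memstore, assuming a
  // read point already acquired from the region's MVCC.
  //
  //   List<KeyValueScanner> scanners = store.getScanners(
  //     true,                             // cacheBlocks
  //     false,                            // usePread
  //     false,                            // isCompaction
  //     null,                             // matcher
  //     HConstants.EMPTY_START_ROW, true, // startRow, includeStartRow
  //     HConstants.EMPTY_END_ROW, false,  // stopRow, includeStopRow
  //     readPt, false);                   // readPt, onlyLatestVersion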
  /**
   * @param o Observer no longer interested in changes in set of Readers.
   */
  public void deleteChangedReaderObserver(ChangedReadersObserver o) {
    // We don't check if observer present; it may not be (legitimately)
    this.changedReaderObservers.remove(o);
  }

  //////////////////////////////////////////////////////////////////////////////
  // Compaction
  //////////////////////////////////////////////////////////////////////////////

  /**
   * Compact the StoreFiles. This method may take some time, so the calling thread must be able to
   * block for long periods.
   * <p>
   * During this time, the Store can work as usual, getting values from StoreFiles and writing new
   * StoreFiles from the memstore. Existing StoreFiles are not destroyed until the new compacted
   * StoreFile is completely written-out to disk.
   * <p>
   * The compactLock prevents multiple simultaneous compactions. The structureLock prevents us from
   * interfering with other write operations.
   * <p>
   * We don't want to hold the structureLock for the whole time, as a compact() can be lengthy and
   * we want to allow cache-flushes during this period.
   * <p>
   * A compaction event should be idempotent, since there is no IO Fencing for the region directory
   * in hdfs. A region server might still try to complete the compaction after it lost the region.
   * That is why the following events are carefully ordered for a compaction:
   * <ol>
   * <li>Compaction writes new files under the region/.tmp directory (compaction output)</li>
   * <li>Compaction atomically moves the temporary file under the region directory</li>
   * <li>Compaction appends a WAL edit containing the compaction input and output files. Forces
   * sync on WAL.</li>
   * <li>Compaction deletes the input files from the region directory.</li>
   * </ol>
   * Failure conditions are handled like this:
   * <ul>
   * <li>If RS fails before 2, compaction won't complete. Even if RS lives on and finishes the
   * compaction later, it will only write the new data file to the region directory. Since we
   * already have this data, this will be idempotent, but we will have a redundant copy of the
   * data.</li>
   * <li>If RS fails between 2 and 3, the region will have a redundant copy of the data. The RS
   * that failed won't be able to finish sync() for WAL because of lease recovery in WAL.</li>
   * <li>If RS fails after 3, the region server that opens the region will pick up the compaction
   * marker from the WAL and replay it by removing the compaction input files. The failed RS can
   * also attempt to delete those files, but the operation will be idempotent.</li>
   * </ul>
   * See HBASE-2231 for details.
   * @param compaction compaction details obtained from requestCompaction()
   * @return Storefile we compacted into or null if we failed or opted out early.
   */
  public List<HStoreFile> compact(CompactionContext compaction,
    ThroughputController throughputController, User user) throws IOException {
    assert compaction != null;
    CompactionRequestImpl cr = compaction.getRequest();
    StoreFileWriterCreationTracker writerCreationTracker =
      storeFileWriterCreationTrackerFactory.get();
    if (writerCreationTracker != null) {
      cr.setWriterCreationTracker(writerCreationTracker);
      storeFileWriterCreationTrackers.add(writerCreationTracker);
    }
    try {
      // Do all sanity checking in here if we have a valid CompactionRequestImpl
      // because we need to clean up after it on the way out in a finally
      // block below
      long compactionStartTime = EnvironmentEdgeManager.currentTime();
      assert compaction.hasSelection();
      Collection<HStoreFile> filesToCompact = cr.getFiles();
      assert !filesToCompact.isEmpty();
      synchronized (filesCompacting) {
        // sanity check: we're compacting files that this store knows about
        // TODO: change this to LOG.error() after more debugging
        Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
      }

      // Ready to go. Have list of files to compact.
      LOG.info("Starting compaction of " + filesToCompact + " into tmpdir="
        + getRegionFileSystem().getTempDir() + ", totalSize="
        + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));

      return doCompaction(cr, filesToCompact, user, compactionStartTime,
        compaction.compact(throughputController, user));
    } finally {
      finishCompactionRequest(cr);
    }
  }

  protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
    Collection<HStoreFile> filesToCompact, User user, long compactionStartTime,
    List<Path> newFiles) throws IOException {
    // Do the steps necessary to complete the compaction.
    setStoragePolicyFromFileName(newFiles);
    List<HStoreFile> sfs = storeEngine.commitStoreFiles(newFiles, true);
    if (this.getCoprocessorHost() != null) {
      for (HStoreFile sf : sfs) {
        getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
      }
    }
    replaceStoreFiles(filesToCompact, sfs, true);

    long outputBytes = getTotalSize(sfs);

    // At this point the store will use new files for all new scanners.
    refreshStoreSizeAndTotalBytes(); // update store size.

    long now = EnvironmentEdgeManager.currentTime();
    if (
      region.getRegionServerServices() != null
        && region.getRegionServerServices().getMetrics() != null
    ) {
      region.getRegionServerServices().getMetrics().updateCompaction(
        region.getTableDescriptor().getTableName().getNameAsString(), cr.isMajor(),
        now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(),
        outputBytes);
    }

    logCompactionEndMessage(cr, sfs, now, compactionStartTime);
    return sfs;
  }

  // Set the correct storage policy from the file name of DTCP.
  // Renaming a file will not change its storage policy.
  private void setStoragePolicyFromFileName(List<Path> newFiles) throws IOException {
    String prefix = HConstants.STORAGE_POLICY_PREFIX;
    for (Path newFile : newFiles) {
      if (newFile.getParent().getName().startsWith(prefix)) {
        CommonFSUtils.setStoragePolicy(getRegionFileSystem().getFileSystem(), newFile,
          newFile.getParent().getName().substring(prefix.length()));
      }
    }
  }
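  // Illustrative sketch, not part of the original file: the compaction entry
  // point as a caller such as CompactSplit might drive it, assuming a
  // throughput controller and user are in scope.
  //
  //   Optional<CompactionContext> context = store.requestCompaction();
  //   if (context.isPresent()) {
  //     store.compact(context.get(), throughputController, user);
  //   }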
  /**
   * Writes the compaction WAL record.
   * @param filesCompacted Files compacted (input).
   * @param newFiles       Files from compaction.
   */
  private void writeCompactionWalRecord(Collection<HStoreFile> filesCompacted,
    Collection<HStoreFile> newFiles) throws IOException {
    if (region.getWAL() == null) {
      return;
    }
    List<Path> inputPaths =
      filesCompacted.stream().map(HStoreFile::getPath).collect(Collectors.toList());
    List<Path> outputPaths =
      newFiles.stream().map(HStoreFile::getPath).collect(Collectors.toList());
    RegionInfo info = this.region.getRegionInfo();
    CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
      getColumnFamilyDescriptor().getName(), inputPaths, outputPaths,
      getRegionFileSystem().getStoreDir(getColumnFamilyDescriptor().getNameAsString()));
    // Fix reaching into Region to get the maxWaitForSeqId.
    // Does this method belong in Region altogether given it is making so many references up there?
    // Could be Region#writeCompactionMarker(compactionDescriptor);
    WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(),
      this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC(),
      region.getRegionReplicationSink().orElse(null));
  }

  @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
      allowedOnPath = ".*/(HStore|TestHStore).java")
  void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result,
    boolean writeCompactionMarker) throws IOException {
    storeEngine.replaceStoreFiles(compactedFiles, result, () -> {
      if (writeCompactionMarker) {
        writeCompactionWalRecord(compactedFiles, result);
      }
    }, () -> {
      synchronized (filesCompacting) {
        filesCompacting.removeAll(compactedFiles);
      }
    });
    // These may be null when the RS is shutting down. The space quota Chores will fix the Region
    // sizes later so it's not super-critical if we miss these.
    RegionServerServices rsServices = region.getRegionServerServices();
    if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) {
      updateSpaceQuotaAfterFileReplacement(
        rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(),
        compactedFiles, result);
    }
  }

  /**
   * Updates the space quota usage for this region, removing the size for files compacted away and
   * adding in the size for new files.
   * @param sizeStore  The object tracking changes in region size for space quotas.
   * @param regionInfo The identifier for the region whose size is being updated.
   * @param oldFiles   Files removed from this store's region.
   * @param newFiles   Files added to this store's region.
   */
  void updateSpaceQuotaAfterFileReplacement(RegionSizeStore sizeStore, RegionInfo regionInfo,
    Collection<HStoreFile> oldFiles, Collection<HStoreFile> newFiles) {
    long delta = 0;
    if (oldFiles != null) {
      for (HStoreFile compactedFile : oldFiles) {
        if (compactedFile.isHFile()) {
          delta -= compactedFile.getReader().length();
        }
      }
    }
    if (newFiles != null) {
      for (HStoreFile newFile : newFiles) {
        if (newFile.isHFile()) {
          delta += newFile.getReader().length();
        }
      }
    }
    sizeStore.incrementRegionSize(regionInfo, delta);
  }
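  // Illustrative note, not part of the original file: the quota delta above is
  // (sum of new HFile lengths) - (sum of compacted HFile lengths). For example,
  // compacting three 10 MB files into one 24 MB file yields
  // delta = 24 - 30 = -6 MB, shrinking the tracked region size.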
* @param sfs Resulting files.
1312 * @param compactionStartTime Start time.
1313 */
1314 private void logCompactionEndMessage(CompactionRequestImpl cr, List<HStoreFile> sfs, long now,
1315 long compactionStartTime) {
1316 StringBuilder message = new StringBuilder("Completed" + (cr.isMajor() ? " major" : "")
1317 + " compaction of " + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "")
1318 + " file(s) in " + this + " of " + this.getRegionInfo().getShortNameToLog() + " into ");
1319 if (sfs.isEmpty()) {
1320 message.append("none, ");
1321 } else {
1322 for (HStoreFile sf : sfs) {
1323 message.append(sf.getPath().getName());
1324 message.append("(size=");
1325 message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1));
1326 message.append("), ");
1327 }
1328 }
1329 message.append("total size for store is ")
1330 .append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1))
1331 .append(". This selection was in queue for ")
1332 .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
1333 .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
1334 .append(" to execute.");
1335 LOG.info(message.toString());
1336 if (LOG.isTraceEnabled()) {
1337 int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
1338 long resultSize = getTotalSize(sfs);
1339 String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
1340 + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
1341 + cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]";
1342 LOG.trace(traceMessage);
1343 }
1344 }
1345
1346 /**
1347 * Call to complete a compaction. It's for the case where we find in the WAL a compaction that was
1348 * not finished. We could find one when recovering a WAL after a regionserver crash. See HBASE-2231.
1349 */
1350 public void replayCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
1351 boolean removeFiles) throws IOException {
1352 LOG.debug("Completing compaction from the WAL marker");
1353 List<String> compactionInputs = compaction.getCompactionInputList();
1354 List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList());
1355
1356 // The Compaction Marker is written after the compaction is completed,
1357 // and the files moved into the region/family folder.
1358 //
1359 // If we crash after the entry is written, we may not have removed the
1360 // input files, but the output file is present.
1361 // (The unremoved input files will be removed by this function.)
1362 //
1363 // If we scan the directory and the file is not present, it can mean that:
1364 // - The file was manually removed by the user
1365 // - The file was removed as a consequence of a subsequent compaction
1366 // So, we can't do anything with the "compaction output list" because those
1367 // files have already been loaded when opening the region (by virtue of
1368 // being in the store's folder) or they may be missing due to a compaction.
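 //
 // A hedged example of the reconciliation below, with hypothetical file names:
 //   marker inputs = [a, b, c], marker outputs = [d], store currently holds [b, c, d].
 //   Then inputStoreFiles = [b, c] (a is already gone), d is dropped from compactionOutputs
 //   because it is already open, so outputStoreFiles = [] and replaceStoreFiles([b, c], [])
 //   completes the interrupted compaction.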
1369
1370 String familyName = this.getColumnFamilyName();
1371 Set<String> inputFiles = new HashSet<>();
1372 for (String compactionInput : compactionInputs) {
1373 Path inputPath = getRegionFileSystem().getStoreFilePath(familyName, compactionInput);
1374 inputFiles.add(inputPath.getName());
1375 }
1376
1377 // some of the input files might already be deleted
1378 List<HStoreFile> inputStoreFiles = new ArrayList<>(compactionInputs.size());
1379 for (HStoreFile sf : this.getStorefiles()) {
1380 if (inputFiles.contains(sf.getPath().getName())) {
1381 inputStoreFiles.add(sf);
1382 }
1383 }
1384
1385 // check whether we need to pick up the new files
1386 List<HStoreFile> outputStoreFiles = new ArrayList<>(compactionOutputs.size());
1387
1388 if (pickCompactionFiles) {
1389 for (HStoreFile sf : this.getStorefiles()) {
1390 compactionOutputs.remove(sf.getPath().getName());
1391 }
1392 for (String compactionOutput : compactionOutputs) {
1393 StoreFileInfo storeFileInfo =
1394 getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), compactionOutput);
1395 HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo);
1396 outputStoreFiles.add(storeFile);
1397 }
1398 }
1399
1400 if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) {
1401 LOG.info("Replaying compaction marker, replacing input files: " + inputStoreFiles
1402 + " with output files : " + outputStoreFiles);
1403 this.replaceStoreFiles(inputStoreFiles, outputStoreFiles, false);
1404 this.refreshStoreSizeAndTotalBytes();
1405 }
1406 }
1407
1408 @Override
1409 public boolean hasReferences() {
1410 // Grab the read lock here, because we need to ensure that we only see the complete
1411 // store file list after an atomic replaceStoreFiles(..) has finished.
1412 this.storeEngine.readLock();
1413 try {
1414 // Merge the current store files with compacted files here due to HBASE-20940.
1415 Collection<HStoreFile> allStoreFiles = new ArrayList<>(getStorefiles());
1416 allStoreFiles.addAll(getCompactedFiles());
1417 return StoreUtils.hasReferences(allStoreFiles);
1418 } finally {
1419 this.storeEngine.readUnlock();
1420 }
1421 }
1422
1423 /**
1424 * Getter for the CompactionProgress object.
1425 * @return CompactionProgress object; can be null
1426 */
1427 public CompactionProgress getCompactionProgress() {
1428 return this.storeEngine.getCompactor().getProgress();
1429 }
1430
1431 @Override
1432 public boolean shouldPerformMajorCompaction() throws IOException {
1433 for (HStoreFile sf : this.storeEngine.getStoreFileManager().getStoreFiles()) {
1434 // TODO: what are these reader checks all over the place?
1435 if (sf.getReader() == null) {
1436 LOG.debug("StoreFile {} has null Reader", sf);
1437 return false;
1438 }
1439 }
1440 return storeEngine.getCompactionPolicy()
1441 .shouldPerformMajorCompaction(this.storeEngine.getStoreFileManager().getStoreFiles());
1442 }
1443
1444 public Optional<CompactionContext> requestCompaction() throws IOException {
1445 return requestCompaction(NO_PRIORITY, CompactionLifeCycleTracker.DUMMY, null);
1446 }
1447
1448 public Optional<CompactionContext> requestCompaction(int priority,
1449 CompactionLifeCycleTracker tracker, User user) throws IOException {
1450 // don't even select for compaction if writes are disabled
1451 if (!this.areWritesEnabled()) {
1452 return Optional.empty();
1453 }
1454 // Before we do compaction, try to get rid of unneeded files to simplify things.
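 // In brief, the selection flow below is: (1) drop expired files via removeUnneededFiles(),
 // (2) let a coprocessor override the selection if it asks to, (3) otherwise have the
 // compaction policy select files under the filesCompacting lock, (4) publish the selection
 // to coprocessors, and (5) enqueue the selected files with an appropriate priority.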
1455 removeUnneededFiles();
1456
1457 final CompactionContext compaction = storeEngine.createCompaction();
1458 CompactionRequestImpl request = null;
1459 this.storeEngine.readLock();
1460 try {
1461 synchronized (filesCompacting) {
1462 // First, see if coprocessor would want to override selection.
1463 if (this.getCoprocessorHost() != null) {
1464 final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
1465 boolean override =
1466 getCoprocessorHost().preCompactSelection(this, candidatesForCoproc, tracker, user);
1467 if (override) {
1468 // Coprocessor is overriding normal file selection.
1469 compaction.forceSelect(new CompactionRequestImpl(candidatesForCoproc));
1470 }
1471 }
1472
1473 // Normal case - coprocessor is not overriding file selection.
1474 if (!compaction.hasSelection()) {
1475 boolean isUserCompaction = priority == Store.PRIORITY_USER;
1476 boolean mayUseOffPeak =
1477 offPeakHours.isOffPeakHour() && offPeakCompactionTracker.compareAndSet(false, true);
1478 try {
1479 compaction.select(this.filesCompacting, isUserCompaction, mayUseOffPeak,
1480 forceMajor && filesCompacting.isEmpty());
1481 } catch (IOException e) {
1482 if (mayUseOffPeak) {
1483 offPeakCompactionTracker.set(false);
1484 }
1485 throw e;
1486 }
1487 assert compaction.hasSelection();
1488 if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
1489 // Compaction policy doesn't want to take advantage of off-peak.
1490 offPeakCompactionTracker.set(false);
1491 }
1492 }
1493 if (this.getCoprocessorHost() != null) {
1494 this.getCoprocessorHost().postCompactSelection(this,
1495 ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker,
1496 compaction.getRequest(), user);
1497 }
1498 // Finally, we have the resulting files list. Check if we have any files at all.
1499 request = compaction.getRequest();
1500 Collection<HStoreFile> selectedFiles = request.getFiles();
1501 if (selectedFiles.isEmpty()) {
1502 return Optional.empty();
1503 }
1504
1505 addToCompactingFiles(selectedFiles);
1506
1507 // If we're enqueuing a major, clear the force flag.
1508 this.forceMajor = this.forceMajor && !request.isMajor();
1509
1510 // Set common request properties.
1511 // Set priority, either override value supplied by caller or from store.
1512 final int compactionPriority =
1513 (priority != Store.NO_PRIORITY) ? priority : getCompactPriority();
1514 request.setPriority(compactionPriority);
1515
1516 if (request.isAfterSplit()) {
1517 // If the store belongs to a recently split daughter region, we had better consider
1518 // it with higher priority in the compaction queue.
1519 // Override priority if it is lower (higher int value) than
1520 // SPLIT_REGION_COMPACTION_PRIORITY
1521 final int splitHousekeepingPriority =
1522 Math.min(compactionPriority, SPLIT_REGION_COMPACTION_PRIORITY);
1523 request.setPriority(splitHousekeepingPriority);
1524 LOG.info(
1525 "Keeping/Overriding Compaction request priority to {} for CF {} since it"
1526 + " belongs to recently split daughter region {}",
1527 splitHousekeepingPriority, this.getColumnFamilyName(),
1528 getRegionInfo().getRegionNameAsString());
1529 }
1530 request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
1531 request.setTracker(tracker);
1532 }
1533 } finally {
1534 this.storeEngine.readUnlock();
1535 }
1536
1537 if (LOG.isDebugEnabled()) {
1538 LOG.debug(this + " is initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
1539 + (request.isAllFiles() ?
" (all files)" : "")); 1540 } 1541 this.region.reportCompactionRequestStart(request.isMajor()); 1542 return Optional.of(compaction); 1543 } 1544 1545 /** Adds the files to compacting files. filesCompacting must be locked. */ 1546 private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) { 1547 if (CollectionUtils.isEmpty(filesToAdd)) { 1548 return; 1549 } 1550 // Check that we do not try to compact the same StoreFile twice. 1551 if (!Collections.disjoint(filesCompacting, filesToAdd)) { 1552 Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting); 1553 } 1554 filesCompacting.addAll(filesToAdd); 1555 Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator()); 1556 } 1557 1558 private void removeUnneededFiles() throws IOException { 1559 if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) { 1560 return; 1561 } 1562 if (getColumnFamilyDescriptor().getMinVersions() > 0) { 1563 LOG.debug("Skipping expired store file removal due to min version of {} being {}", this, 1564 getColumnFamilyDescriptor().getMinVersions()); 1565 return; 1566 } 1567 this.storeEngine.readLock(); 1568 Collection<HStoreFile> delSfs = null; 1569 try { 1570 synchronized (filesCompacting) { 1571 long cfTtl = getStoreFileTtl(); 1572 if (cfTtl != Long.MAX_VALUE) { 1573 delSfs = storeEngine.getStoreFileManager() 1574 .getUnneededFiles(EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting); 1575 addToCompactingFiles(delSfs); 1576 } 1577 } 1578 } finally { 1579 this.storeEngine.readUnlock(); 1580 } 1581 1582 if (CollectionUtils.isEmpty(delSfs)) { 1583 return; 1584 } 1585 1586 Collection<HStoreFile> newFiles = Collections.emptyList(); // No new files. 1587 replaceStoreFiles(delSfs, newFiles, true); 1588 refreshStoreSizeAndTotalBytes(); 1589 LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in " + this 1590 + "; total size is " + TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1)); 1591 } 1592 1593 public void cancelRequestedCompaction(CompactionContext compaction) { 1594 finishCompactionRequest(compaction.getRequest()); 1595 } 1596 1597 private void finishCompactionRequest(CompactionRequestImpl cr) { 1598 this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize()); 1599 if (cr.isOffPeak()) { 1600 offPeakCompactionTracker.set(false); 1601 cr.setOffPeak(false); 1602 } 1603 synchronized (filesCompacting) { 1604 filesCompacting.removeAll(cr.getFiles()); 1605 } 1606 // The tracker could be null, for example, we do not need to track the creation of store file 1607 // writer due to different implementation of SFT, or the compaction is canceled. 1608 if (cr.getWriterCreationTracker() != null) { 1609 storeFileWriterCreationTrackers.remove(cr.getWriterCreationTracker()); 1610 } 1611 } 1612 1613 /** 1614 * Update counts. 1615 */ 1616 protected void refreshStoreSizeAndTotalBytes() throws IOException { 1617 this.storeSize.set(0L); 1618 this.totalUncompressedBytes.set(0L); 1619 for (HStoreFile hsf : this.storeEngine.getStoreFileManager().getStoreFiles()) { 1620 StoreFileReader r = hsf.getReader(); 1621 if (r == null) { 1622 LOG.debug("StoreFile {} has a null Reader", hsf); 1623 continue; 1624 } 1625 this.storeSize.addAndGet(r.length()); 1626 this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); 1627 } 1628 } 1629 1630 /* 1631 * @param wantedVersions How many versions were asked for. 1632 * @return wantedVersions or this families' {@link HConstants#VERSIONS}. 
*/
1634 int versionsToReturn(final int wantedVersions) {
1635 if (wantedVersions <= 0) {
1636 throw new IllegalArgumentException("Number of versions must be > 0");
1637 }
1638 // Make sure we do not return more than maximum versions for this store.
1639 int maxVersions = getColumnFamilyDescriptor().getMaxVersions();
1640 return wantedVersions > maxVersions ? maxVersions : wantedVersions;
1641 }
1642
1643 @Override
1644 public boolean canSplit() {
1645 // Not splittable if we find a reference store file present in the store.
1646 boolean result = !hasReferences();
1647 if (!result) {
1648 LOG.trace("Not splittable; has references: {}", this);
1649 }
1650 return result;
1651 }
1652
1653 /**
1654 * Determines if the Store should be split and, if so, returns the row to split on.
1655 */
1656 public Optional<byte[]> getSplitPoint() {
1657 this.storeEngine.readLock();
1658 try {
1659 // Should already be enforced by the split policy!
1660 assert !this.getRegionInfo().isMetaRegion();
1661 // Not splittable if we find a reference store file present in the store.
1662 if (hasReferences()) {
1663 LOG.trace("Not splittable; has references: {}", this);
1664 return Optional.empty();
1665 }
1666 return this.storeEngine.getStoreFileManager().getSplitPoint();
1667 } catch (IOException e) {
1668 LOG.warn("Failed getting store size for {}", this, e);
1669 } finally {
1670 this.storeEngine.readUnlock();
1671 }
1672 return Optional.empty();
1673 }
1674
1675 @Override
1676 public long getLastCompactSize() {
1677 return this.lastCompactSize;
1678 }
1679
1680 @Override
1681 public long getSize() {
1682 return storeSize.get();
1683 }
1684
1685 public void triggerMajorCompaction() {
1686 this.forceMajor = true;
1687 }
1688
1689 //////////////////////////////////////////////////////////////////////////////
1690 // File administration
1691 //////////////////////////////////////////////////////////////////////////////
1692
1693 /**
1694 * Return a scanner for both the memstore and the HStore files. Assumes we are not in a
1695 * compaction.
1696 * @param scan Scan to apply when scanning the stores
1697 * @param targetCols columns to scan
1698 * @return a scanner over the current key values
1699 * @throws IOException on failure
1700 */
1701 public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols, long readPt)
1702 throws IOException {
1703 storeEngine.readLock();
1704 try {
1705 ScanInfo scanInfo;
1706 if (this.getCoprocessorHost() != null) {
1707 scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this, scan);
1708 } else {
1709 scanInfo = getScanInfo();
1710 }
1711 return createScanner(scan, scanInfo, targetCols, readPt);
1712 } finally {
1713 storeEngine.readUnlock();
1714 }
1715 }
1716
1717 // HMobStore will override this method to return its own implementation.
1718 protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo,
1719 NavigableSet<byte[]> targetCols, long readPt) throws IOException {
1720 return scan.isReversed()
1721 ?
new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt)
1722 : new StoreScanner(this, scanInfo, scan, targetCols, readPt);
1723 }
1724
1725 /**
1726 * Recreates the scanners on the current list of active store file scanners
1727 * @param currentFileScanners the current set of active store file scanners
1728 * @param cacheBlocks cache the blocks or not
1729 * @param usePread use pread or not
1730 * @param isCompaction is the scanner for compaction
1731 * @param matcher the scan query matcher
1732 * @param startRow the scan's start row
1733 * @param includeStartRow should the scan include the start row
1734 * @param stopRow the scan's stop row
1735 * @param includeStopRow should the scan include the stop row
1736 * @param readPt the read point of the current scan
1737 * @param includeMemstoreScanner whether the current scanner should include a memstore scanner
1738 * @return list of scanners recreated on the current Scanners
1739 */
1740 public List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners,
1741 boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
1742 byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
1743 boolean includeMemstoreScanner) throws IOException {
1744 this.storeEngine.readLock();
1745 try {
1746 Map<String, HStoreFile> name2File =
1747 new HashMap<>(getStorefilesCount() + getCompactedFilesCount());
1748 for (HStoreFile file : getStorefiles()) {
1749 name2File.put(file.getFileInfo().getActiveFileName(), file);
1750 }
1751 Collection<HStoreFile> compactedFiles = getCompactedFiles();
1752 for (HStoreFile file : IterableUtils.emptyIfNull(compactedFiles)) {
1753 name2File.put(file.getFileInfo().getActiveFileName(), file);
1754 }
1755 List<HStoreFile> filesToReopen = new ArrayList<>();
1756 for (KeyValueScanner kvs : currentFileScanners) {
1757 assert kvs.isFileScanner();
1758 if (kvs.peek() == null) {
1759 continue;
1760 }
1761 filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
1762 }
1763 if (filesToReopen.isEmpty()) {
1764 return null;
1765 }
1766 return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow,
1767 includeStartRow, stopRow, includeStopRow, readPt, false, false);
1768 } finally {
1769 this.storeEngine.readUnlock();
1770 }
1771 }
1772
1773 @Override
1774 public String toString() {
1775 return this.getRegionInfo().getShortNameToLog() + "/" + this.getColumnFamilyName();
1776 }
1777
1778 @Override
1779 public int getStorefilesCount() {
1780 return this.storeEngine.getStoreFileManager().getStorefileCount();
1781 }
1782
1783 @Override
1784 public int getCompactedFilesCount() {
1785 return this.storeEngine.getStoreFileManager().getCompactedFilesCount();
1786 }
1787
1788 private LongStream getStoreFileAgeStream() {
1789 return this.storeEngine.getStoreFileManager().getStoreFiles().stream().filter(sf -> {
1790 if (sf.getReader() == null) {
1791 LOG.debug("StoreFile {} has a null Reader", sf);
1792 return false;
1793 } else {
1794 return true;
1795 }
1796 }).filter(HStoreFile::isHFile).mapToLong(sf -> sf.getFileInfo().getCreatedTimestamp())
1797 .map(t -> EnvironmentEdgeManager.currentTime() - t);
1798 }
1799
1800 @Override
1801 public OptionalLong getMaxStoreFileAge() {
1802 return getStoreFileAgeStream().max();
1803 }
1804
1805 @Override
1806 public OptionalLong getMinStoreFileAge() {
1807 return getStoreFileAgeStream().min();
1808 }
1809
1810 @Override
1811 public OptionalDouble getAvgStoreFileAge() {
1812 return
getStoreFileAgeStream().average();
1813 }
1814
1815 @Override
1816 public long getNumReferenceFiles() {
1817 return this.storeEngine.getStoreFileManager().getStoreFiles().stream()
1818 .filter(HStoreFile::isReference).count();
1819 }
1820
1821 @Override
1822 public long getNumHFiles() {
1823 return this.storeEngine.getStoreFileManager().getStoreFiles().stream()
1824 .filter(HStoreFile::isHFile).count();
1825 }
1826
1827 @Override
1828 public long getStoreSizeUncompressed() {
1829 return this.totalUncompressedBytes.get();
1830 }
1831
1832 @Override
1833 public long getStorefilesSize() {
1834 // Include all StoreFiles
1835 return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStoreFiles(),
1836 sf -> true);
1837 }
1838
1839 @Override
1840 public long getHFilesSize() {
1841 // Include only StoreFiles which are HFiles
1842 return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStoreFiles(),
1843 HStoreFile::isHFile);
1844 }
1845
1846 private long getStorefilesFieldSize(ToLongFunction<StoreFileReader> f) {
1847 return this.storeEngine.getStoreFileManager().getStoreFiles().stream()
1848 .mapToLong(file -> StoreUtils.getStorefileFieldSize(file, f)).sum();
1849 }
1850
1851 @Override
1852 public long getStorefilesRootLevelIndexSize() {
1853 return getStorefilesFieldSize(StoreFileReader::indexSize);
1854 }
1855
1856 @Override
1857 public long getTotalStaticIndexSize() {
1858 return getStorefilesFieldSize(StoreFileReader::getUncompressedDataIndexSize);
1859 }
1860
1861 @Override
1862 public long getTotalStaticBloomSize() {
1863 return getStorefilesFieldSize(StoreFileReader::getTotalBloomSize);
1864 }
1865
1866 @Override
1867 public MemStoreSize getMemStoreSize() {
1868 return this.memstore.size();
1869 }
1870
1871 @Override
1872 public int getCompactPriority() {
1873 int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();
1874 if (priority == PRIORITY_USER) {
1875 LOG.warn("Compaction priority is USER despite there being no user compaction");
1876 }
1877 return priority;
1878 }
1879
1880 public boolean throttleCompaction(long compactionSize) {
1881 return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize);
1882 }
1883
1884 public HRegion getHRegion() {
1885 return this.region;
1886 }
1887
1888 public RegionCoprocessorHost getCoprocessorHost() {
1889 return this.region.getCoprocessorHost();
1890 }
1891
1892 @Override
1893 public RegionInfo getRegionInfo() {
1894 return getRegionFileSystem().getRegionInfo();
1895 }
1896
1897 @Override
1898 public boolean areWritesEnabled() {
1899 return this.region.areWritesEnabled();
1900 }
1901
1902 @Override
1903 public long getSmallestReadPoint() {
1904 return this.region.getSmallestReadPoint();
1905 }
1906
1907 /**
1908 * Adds or replaces the specified KeyValues.
1909 * <p>
1910 * For each KeyValue specified, if a cell with the same row, family, and qualifier exists in
1911 * MemStore, it will be replaced. Otherwise, it will just be inserted into MemStore.
1912 * <p>
1913 * This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic
1914 * across all of them.
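 * <p>
 * For example, upserting a cell for a (row, family, qualifier) that already has an entry in the
 * MemStore overwrites that entry in place rather than adding another version, which is what
 * makes this suitable for read-modify-write style updates.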
1915 * @param readpoint readpoint below which we can safely remove duplicate KVs
1916 */
1917 public void upsert(Iterable<ExtendedCell> cells, long readpoint, MemStoreSizing memstoreSizing) {
1918 this.storeEngine.readLock();
1919 try {
1920 this.memstore.upsert(cells, readpoint, memstoreSizing);
1921 } finally {
1922 this.storeEngine.readUnlock();
1923 }
1924 }
1925
1926 public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTracker tracker) {
1927 return new StoreFlusherImpl(cacheFlushId, tracker);
1928 }
1929
1930 private final class StoreFlusherImpl implements StoreFlushContext {
1931
1932 private final FlushLifeCycleTracker tracker;
1933 private final StoreFileWriterCreationTracker writerCreationTracker;
1934 private final long cacheFlushSeqNum;
1935 private MemStoreSnapshot snapshot;
1936 private List<Path> tempFiles;
1937 private List<Path> committedFiles;
1938 private long cacheFlushCount;
1939 private long cacheFlushSize;
1940 private long outputFileSize;
1941
1942 private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) {
1943 this.cacheFlushSeqNum = cacheFlushSeqNum;
1944 this.tracker = tracker;
1945 this.writerCreationTracker = storeFileWriterCreationTrackerFactory.get();
1946 }
1947
1948 /**
1949 * This is not thread safe. The caller should have a lock on the region or the store. If
1950 * necessary, the lock can be added with the patch provided in HBASE-10087
1951 */
1952 @Override
1953 public MemStoreSize prepare() {
1954 // Passing the current sequence number of the WAL - to allow bookkeeping in the memstore
1955 this.snapshot = memstore.snapshot();
1956 this.cacheFlushCount = snapshot.getCellsCount();
1957 this.cacheFlushSize = snapshot.getDataSize();
1958 committedFiles = new ArrayList<>(1);
1959 return snapshot.getMemStoreSize();
1960 }
1961
1962 @Override
1963 public void flushCache(MonitoredTask status) throws IOException {
1964 RegionServerServices rsService = region.getRegionServerServices();
1965 ThroughputController throughputController =
1966 rsService == null ? null : rsService.getFlushThroughputController();
1967 // It could be null if we do not need to track the creation of store file writers due to
1968 // a different SFT implementation.
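 // Flushing happens in three phases driven by the region: prepare() snapshots the memstore,
 // flushCache() (here) writes the snapshot out to temporary files, and commit() moves those
 // files into the store and drops the snapshot.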
if (writerCreationTracker != null) {
1970 HStore.this.storeFileWriterCreationTrackers.add(writerCreationTracker);
1971 }
1972 tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController,
1973 tracker, writerCreationTracker);
1974 }
1975
1976 @Override
1977 public boolean commit(MonitoredTask status) throws IOException {
1978 try {
1979 if (CollectionUtils.isEmpty(this.tempFiles)) {
1980 return false;
1981 }
1982 status.setStatus("Flushing " + this + ": reopening flushed file");
1983 List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, false);
1984 for (HStoreFile sf : storeFiles) {
1985 StoreFileReader r = sf.getReader();
1986 if (LOG.isInfoEnabled()) {
1987 LOG.info("Added {}, entries={}, sequenceid={}, filesize={}", sf, r.getEntries(),
1988 cacheFlushSeqNum, TraditionalBinaryPrefix.long2String(r.length(), "", 1));
1989 }
1990 outputFileSize += r.length();
1991 storeSize.addAndGet(r.length());
1992 totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
1993 committedFiles.add(sf.getPath());
1994 }
1995
1996 flushedCellsCount.addAndGet(cacheFlushCount);
1997 flushedCellsSize.addAndGet(cacheFlushSize);
1998 flushedOutputFileSize.addAndGet(outputFileSize);
1999 // call coprocessor after we have done all the accounting above
2000 for (HStoreFile sf : storeFiles) {
2001 if (getCoprocessorHost() != null) {
2002 getCoprocessorHost().postFlush(HStore.this, sf, tracker);
2003 }
2004 }
2005 // Add new file to store files. Clear snapshot too while we have the Store write lock.
2006 return completeFlush(storeFiles, snapshot.getId());
2007 } finally {
2008 if (writerCreationTracker != null) {
2009 HStore.this.storeFileWriterCreationTrackers.remove(writerCreationTracker);
2010 }
2011 }
2012 }
2013
2014 @Override
2015 public long getOutputFileSize() {
2016 return outputFileSize;
2017 }
2018
2019 @Override
2020 public List<Path> getCommittedFiles() {
2021 return committedFiles;
2022 }
2023
2024 /**
2025 * Similar to commit, but called in secondary region replicas for replaying the flush cache from
2026 * primary region. Adds the new files to the store, and drops the snapshot depending on the
2027 * dropMemstoreSnapshot argument.
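 * <p>
 * In outline: each flushed file name is re-opened from the region's file system as a store
 * file, size accounting is updated, and completeFlush(..) then swaps the files in, dropping
 * the prepared snapshot when dropMemstoreSnapshot is true.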
2028 * @param fileNames names of the flushed files 2029 * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot 2030 */ 2031 @Override 2032 public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot) 2033 throws IOException { 2034 List<HStoreFile> storeFiles = new ArrayList<>(fileNames.size()); 2035 for (String file : fileNames) { 2036 // open the file as a store file (hfile link, etc) 2037 StoreFileInfo storeFileInfo = 2038 getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), file); 2039 HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo); 2040 storeFiles.add(storeFile); 2041 HStore.this.storeSize.addAndGet(storeFile.getReader().length()); 2042 HStore.this.totalUncompressedBytes 2043 .addAndGet(storeFile.getReader().getTotalUncompressedBytes()); 2044 if (LOG.isInfoEnabled()) { 2045 LOG.info(this + " added " + storeFile + ", entries=" + storeFile.getReader().getEntries() 2046 + ", sequenceid=" + storeFile.getReader().getSequenceID() + ", filesize=" 2047 + TraditionalBinaryPrefix.long2String(storeFile.getReader().length(), "", 1)); 2048 } 2049 } 2050 2051 long snapshotId = -1; // -1 means do not drop 2052 if (dropMemstoreSnapshot && snapshot != null) { 2053 snapshotId = snapshot.getId(); 2054 } 2055 HStore.this.completeFlush(storeFiles, snapshotId); 2056 } 2057 2058 /** 2059 * Abort the snapshot preparation. Drops the snapshot if any. 2060 */ 2061 @Override 2062 public void abort() throws IOException { 2063 if (snapshot != null) { 2064 HStore.this.completeFlush(Collections.emptyList(), snapshot.getId()); 2065 } 2066 } 2067 } 2068 2069 @Override 2070 public boolean needsCompaction() { 2071 List<HStoreFile> filesCompactingClone = null; 2072 synchronized (filesCompacting) { 2073 filesCompactingClone = Lists.newArrayList(filesCompacting); 2074 } 2075 return this.storeEngine.needsCompaction(filesCompactingClone); 2076 } 2077 2078 /** 2079 * Used for tests. 2080 * @return cache configuration for this Store. 
2081 */
2082 public CacheConfig getCacheConfig() {
2083 return storeContext.getCacheConf();
2084 }
2085
2086 public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HStore.class, false);
2087
2088 public static final long DEEP_OVERHEAD = ClassSize.align(
2089 FIXED_OVERHEAD + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK + ClassSize.CONCURRENT_SKIPLISTMAP
2090 + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT + ScanInfo.FIXED_OVERHEAD);
2091
2092 @Override
2093 public long heapSize() {
2094 MemStoreSize memstoreSize = this.memstore.size();
2095 return DEEP_OVERHEAD + memstoreSize.getHeapSize() + storeContext.heapSize();
2096 }
2097
2098 @Override
2099 public CellComparator getComparator() {
2100 return storeContext.getComparator();
2101 }
2102
2103 public ScanInfo getScanInfo() {
2104 return scanInfo;
2105 }
2106
2107 /**
2108 * Set scan info; used by tests.
2109 * @param scanInfo new scan info to use in tests
2110 */
2111 void setScanInfo(ScanInfo scanInfo) {
2112 this.scanInfo = scanInfo;
2113 }
2114
2115 @Override
2116 public boolean hasTooManyStoreFiles() {
2117 return getStorefilesCount() > this.blockingFileCount;
2118 }
2119
2120 @Override
2121 public long getFlushedCellsCount() {
2122 return flushedCellsCount.get();
2123 }
2124
2125 @Override
2126 public long getFlushedCellsSize() {
2127 return flushedCellsSize.get();
2128 }
2129
2130 @Override
2131 public long getFlushedOutputFileSize() {
2132 return flushedOutputFileSize.get();
2133 }
2134
2135 @Override
2136 public long getCompactedCellsCount() {
2137 return compactedCellsCount.get();
2138 }
2139
2140 @Override
2141 public long getCompactedCellsSize() {
2142 return compactedCellsSize.get();
2143 }
2144
2145 @Override
2146 public long getMajorCompactedCellsCount() {
2147 return majorCompactedCellsCount.get();
2148 }
2149
2150 @Override
2151 public long getMajorCompactedCellsSize() {
2152 return majorCompactedCellsSize.get();
2153 }
2154
2155 public void updateCompactedMetrics(boolean isMajor, CompactionProgress progress) {
2156 if (isMajor) {
2157 majorCompactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
2158 majorCompactedCellsSize.addAndGet(progress.totalCompactedSize);
2159 } else {
2160 compactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
2161 compactedCellsSize.addAndGet(progress.totalCompactedSize);
2162 }
2163 }
2164
2165 /**
2166 * Returns the StoreEngine that is backing this concrete implementation of Store.
2167 * @return the {@link StoreEngine} object used internally inside this HStore object.
2168 */ 2169 public StoreEngine<?, ?, ?, ?> getStoreEngine() { 2170 return this.storeEngine; 2171 } 2172 2173 protected OffPeakHours getOffPeakHours() { 2174 return this.offPeakHours; 2175 } 2176 2177 @Override 2178 public void onConfigurationChange(Configuration conf) { 2179 Configuration storeConf = StoreUtils.createStoreConfiguration(conf, region.getTableDescriptor(), 2180 getColumnFamilyDescriptor()); 2181 this.conf = storeConf; 2182 this.storeEngine.compactionPolicy.setConf(storeConf); 2183 this.offPeakHours = OffPeakHours.getInstance(storeConf); 2184 } 2185 2186 /** 2187 * {@inheritDoc} 2188 */ 2189 @Override 2190 public void registerChildren(ConfigurationManager manager) { 2191 CacheConfig cacheConfig = this.storeContext.getCacheConf(); 2192 if (cacheConfig != null) { 2193 manager.registerObserver(cacheConfig); 2194 } 2195 } 2196 2197 /** 2198 * {@inheritDoc} 2199 */ 2200 @Override 2201 public void deregisterChildren(ConfigurationManager manager) { 2202 // No children to deregister 2203 } 2204 2205 @Override 2206 public double getCompactionPressure() { 2207 return storeEngine.getStoreFileManager().getCompactionPressure(); 2208 } 2209 2210 @Override 2211 public boolean isPrimaryReplicaStore() { 2212 return getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID; 2213 } 2214 2215 /** 2216 * Sets the store up for a region level snapshot operation. 2217 * @see #postSnapshotOperation() 2218 */ 2219 public void preSnapshotOperation() { 2220 archiveLock.lock(); 2221 } 2222 2223 /** 2224 * Perform tasks needed after the completion of snapshot operation. 2225 * @see #preSnapshotOperation() 2226 */ 2227 public void postSnapshotOperation() { 2228 archiveLock.unlock(); 2229 } 2230 2231 /** 2232 * Closes and archives the compacted files under this store 2233 */ 2234 public synchronized void closeAndArchiveCompactedFiles() throws IOException { 2235 // ensure other threads do not attempt to archive the same files on close() 2236 archiveLock.lock(); 2237 try { 2238 storeEngine.readLock(); 2239 Collection<HStoreFile> copyCompactedfiles = null; 2240 try { 2241 Collection<HStoreFile> compactedfiles = 2242 this.getStoreEngine().getStoreFileManager().getCompactedfiles(); 2243 if (CollectionUtils.isNotEmpty(compactedfiles)) { 2244 // Do a copy under read lock 2245 copyCompactedfiles = new ArrayList<>(compactedfiles); 2246 } else { 2247 LOG.trace("No compacted files to archive"); 2248 } 2249 } finally { 2250 storeEngine.readUnlock(); 2251 } 2252 if (CollectionUtils.isNotEmpty(copyCompactedfiles)) { 2253 removeCompactedfiles(copyCompactedfiles, true); 2254 } 2255 } finally { 2256 archiveLock.unlock(); 2257 } 2258 } 2259 2260 /** 2261 * Archives and removes the compacted files 2262 * @param compactedfiles The compacted files in this store that are not active in reads 2263 * @param evictOnClose true if blocks should be evicted from the cache when an HFile reader is 2264 * closed, false if not 2265 */ 2266 private void removeCompactedfiles(Collection<HStoreFile> compactedfiles, boolean evictOnClose) 2267 throws IOException { 2268 final List<HStoreFile> filesToRemove = new ArrayList<>(compactedfiles.size()); 2269 final List<Long> storeFileSizes = new ArrayList<>(compactedfiles.size()); 2270 for (final HStoreFile file : compactedfiles) { 2271 synchronized (file) { 2272 try { 2273 StoreFileReader r = file.getReader(); 2274 if (r == null) { 2275 LOG.debug("The file {} was closed but still not archived", file); 2276 // HACK: Temporarily re-open the reader so we can get the size of the file. 
Ideally,
2277 // we should know the size of an HStoreFile without having to ask the HStoreFileReader
2278 // for that.
2279 long length = getStoreFileSize(file);
2280 filesToRemove.add(file);
2281 storeFileSizes.add(length);
2282 continue;
2283 }
2284
2285 if (file.isCompactedAway() && !file.isReferencedInReads()) {
2286 // Even if deleting fails, we need not bother, as any new scanners won't be
2287 // able to use the compacted file since its status is already compactedAway
2288 LOG.trace("Closing and archiving the file {}", file);
2289 // Copy the file size before closing the reader
2290 final long length = r.length();
2291 r.close(evictOnClose);
2292 // Just close and continue
2293 filesToRemove.add(file);
2294 // Only add the length if we successfully added the file to `filesToRemove`
2295 storeFileSizes.add(length);
2296 } else {
2297 LOG.info("Can't archive compacted file " + file.getPath()
2298 + " because of either isCompactedAway=" + file.isCompactedAway()
2299 + " or file has reference, isReferencedInReads=" + file.isReferencedInReads()
2300 + ", refCount=" + r.getRefCount() + ", skipping for now.");
2301 }
2302 } catch (Exception e) {
2303 LOG.error("Exception while trying to close the compacted store file {}", file.getPath(),
2304 e);
2305 }
2306 }
2307 }
2308 if (this.isPrimaryReplicaStore()) {
2309 // Only the primary region is allowed to move the file to archive.
2310 // The secondary region does not move the files to archive. Any active reads from
2311 // the secondary region will still work because the file as such has active readers on it.
2312 if (!filesToRemove.isEmpty()) {
2313 LOG.debug("Moving the files {} to archive", filesToRemove);
2314 // Only if this is successful should the files be removed
2315 try {
2316 getRegionFileSystem().removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(),
2317 filesToRemove);
2318 } catch (FailedArchiveException fae) {
2319 // Even if archiving some files failed, we still need to clear out any of the
2320 // files which were successfully archived. Otherwise we will receive a
2321 // FileNotFoundException when we attempt to re-archive them in the next go around.
2322 Collection<Path> failedFiles = fae.getFailedFiles();
2323 Iterator<HStoreFile> iter = filesToRemove.iterator();
2324 Iterator<Long> sizeIter = storeFileSizes.iterator();
2325 while (iter.hasNext()) {
2326 sizeIter.next();
2327 if (failedFiles.contains(iter.next().getPath())) {
2328 iter.remove();
2329 sizeIter.remove();
2330 }
2331 }
2332 if (!filesToRemove.isEmpty()) {
2333 clearCompactedfiles(filesToRemove);
2334 }
2335 throw fae;
2336 }
2337 }
2338 }
2339 if (!filesToRemove.isEmpty()) {
2340 // Clear the compactedfiles from the store file manager
2341 clearCompactedfiles(filesToRemove);
2342 // Try to send report of this archival to the Master for updating quota usage faster
2343 reportArchivedFilesForQuota(filesToRemove, storeFileSizes);
2344 }
2345 }
2346
2347 /**
2348 * Computes the length of a store file without succumbing to any errors along the way. If an error
2349 * is encountered, the implementation returns {@code 0} instead of the actual size.
2350 * @param file The file to compute the size of.
2351 * @return The size in bytes of the provided {@code file}.
2352 */
2353 long getStoreFileSize(HStoreFile file) {
2354 long length = 0;
2355 try {
2356 file.initReader();
2357 length = file.getReader().length();
2358 } catch (IOException e) {
2359 LOG.trace("Failed to open reader when trying to compute store file size for {}, ignoring",
2360 file, e);
2361 } finally {
2362 try {
2363 file.closeStoreFile(
2364 file.getCacheConf() != null ? file.getCacheConf().shouldEvictOnClose() : true);
2365 } catch (IOException e) {
2366 LOG.trace("Failed to close reader after computing store file size for {}, ignoring", file,
2367 e);
2368 }
2369 }
2370 return length;
2371 }
2372
2373 public Long preFlushSeqIDEstimation() {
2374 return memstore.preFlushSeqIDEstimation();
2375 }
2376
2377 @Override
2378 public boolean isSloppyMemStore() {
2379 return this.memstore.isSloppy();
2380 }
2381
2382 private void clearCompactedfiles(List<HStoreFile> filesToRemove) throws IOException {
2383 LOG.trace("Clearing the compacted files {} from this store", filesToRemove);
2384 storeEngine.removeCompactedFiles(filesToRemove);
2385 }
2386
2387 void reportArchivedFilesForQuota(List<? extends StoreFile> archivedFiles, List<Long> fileSizes) {
2388 // Sanity check from the caller
2389 if (archivedFiles.size() != fileSizes.size()) {
2390 throw new RuntimeException("Coding error: should never see lists of varying size");
2391 }
2392 RegionServerServices rss = this.region.getRegionServerServices();
2393 if (rss == null) {
2394 return;
2395 }
2396 List<Entry<String, Long>> filesWithSizes = new ArrayList<>(archivedFiles.size());
2397 Iterator<Long> fileSizeIter = fileSizes.iterator();
2398 for (StoreFile storeFile : archivedFiles) {
2399 final long fileSize = fileSizeIter.next();
2400 if (storeFile.isHFile() && fileSize != 0) {
2401 filesWithSizes.add(Maps.immutableEntry(storeFile.getPath().getName(), fileSize));
2402 }
2403 }
2404 if (LOG.isTraceEnabled()) {
2405 LOG.trace("Files archived: " + archivedFiles + ", reporting the following to the Master: "
2406 + filesWithSizes);
2407 }
2408 boolean success = rss.reportFileArchivalForQuotas(getTableName(), filesWithSizes);
2409 if (!success) {
2410 LOG.warn("Failed to report archival of files: " + filesWithSizes);
2411 }
2412 }
2413
2414 @Override
2415 public int getCurrentParallelPutCount() {
2416 return currentParallelPutCount.get();
2417 }
2418
2419 public int getStoreRefCount() {
2420 return this.storeEngine.getStoreFileManager().getStoreFiles().stream()
2421 .filter(sf -> sf.getReader() != null).filter(HStoreFile::isHFile)
2422 .mapToInt(HStoreFile::getRefCount).sum();
2423 }
2424
2425 /** Returns the maximum ref count of a storeFile among all compacted HStore files for the HStore */
2426 public int getMaxCompactedStoreFileRefCount() {
2427 OptionalInt maxCompactedStoreFileRefCount = this.storeEngine.getStoreFileManager()
2428 .getCompactedfiles().stream().filter(sf -> sf.getReader() != null).filter(HStoreFile::isHFile)
2429 .mapToInt(HStoreFile::getRefCount).max();
2430 return maxCompactedStoreFileRefCount.isPresent() ?
maxCompactedStoreFileRefCount.getAsInt() : 0;
2431 }
2432
2433 @Override
2434 public long getMemstoreOnlyRowReadsCount() {
2435 return memstoreOnlyRowReadsCount.sum();
2436 }
2437
2438 @Override
2439 public long getMixedRowReadsCount() {
2440 return mixedRowReadsCount.sum();
2441 }
2442
2443 @Override
2444 public Configuration getReadOnlyConfiguration() {
2445 return new ReadOnlyConfiguration(this.conf);
2446 }
2447
2448 void updateMetricsStore(boolean memstoreRead) {
2449 if (memstoreRead) {
2450 memstoreOnlyRowReadsCount.increment();
2451 } else {
2452 mixedRowReadsCount.increment();
2453 }
2454 }
2455
2456 /**
2457 * Return the storefiles which are currently being written to. Mainly used by
2458 * {@link BrokenStoreFileCleaner} to prevent deleting these files, as they are not present in the
2459 * SFT yet.
2460 */
2461 public Set<Path> getStoreFilesBeingWritten() {
2462 return storeFileWriterCreationTrackers.stream().flatMap(t -> t.get().stream())
2463 .collect(Collectors.toSet());
2464 }
2465
2466 @Override
2467 public long getBloomFilterRequestsCount() {
2468 return storeEngine.getBloomFilterMetrics().getRequestsCount();
2469 }
2470
2471 @Override
2472 public long getBloomFilterNegativeResultsCount() {
2473 return storeEngine.getBloomFilterMetrics().getNegativeResultsCount();
2474 }
2475
2476 @Override
2477 public long getBloomFilterEligibleRequestsCount() {
2478 return storeEngine.getBloomFilterMetrics().getEligibleRequestsCount();
2479 }
2480}
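// A hedged, test-style sketch (setup assumed, in the spirit of TestHStore) of the compaction
// entry points implemented above:
//
//   HStore store = ...; // obtained from an HRegion in a test harness
//   store.triggerMajorCompaction();
//   Optional<CompactionContext> ctx = store.requestCompaction();
//   if (ctx.isPresent()) {
//     List<HStoreFile> newFiles =
//       store.compact(ctx.get(), NoLimitThroughputController.INSTANCE, null);
//   } // on failure or shutdown, release with store.cancelRequestedCompaction(ctx.get())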