/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.InnerStoreCellComparator;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.MetaCellComparator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.FailedArchiveException;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.coprocessor.ReadOnlyConfiguration;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;

/**
 * A Store holds a column family in a Region. It's a memstore and a set of zero or more StoreFiles,
 * which stretch backwards over time.
 * <p>
 * There's no reason to consider append-logging at this level; all logging and locking is handled
 * at the HRegion level. Store just provides services to manage sets of StoreFiles. One of the most
 * important of those services is compaction, where files are aggregated once they pass a
 * configurable threshold.
 * <p>
 * Locking and transactions are handled at a higher level. This API should not be called directly
 * but by an HRegion manager.
 */
@InterfaceAudience.Private
public class HStore
  implements Store, HeapSize, StoreConfigInformation, PropagatingConfigurationObserver {
  public static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
  public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
    "hbase.server.compactchecker.interval.multiplier";
  public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
  public static final String BLOCK_STORAGE_POLICY_KEY = "hbase.hstore.block.storage.policy";
  // "NONE" is not a valid storage policy and means we defer the policy to HDFS
  public static final String DEFAULT_BLOCK_STORAGE_POLICY = "NONE";
  public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
  public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 16;

  // HBASE-24428 : Update compaction priority for recently split daughter regions
  // so as to prioritize their compaction.
  // Any compaction candidate with higher priority than compaction of newly split daughter regions
  // should have priority value < (Integer.MIN_VALUE + 1000)
  private static final int SPLIT_REGION_COMPACTION_PRIORITY = Integer.MIN_VALUE + 1000;

  private static final Logger LOG = LoggerFactory.getLogger(HStore.class);

  protected final MemStore memstore;
  // The HRegion this store belongs to.
  private final HRegion region;
  protected Configuration conf;
  private long lastCompactSize = 0;
  volatile boolean forceMajor = false;
  private AtomicLong storeSize = new AtomicLong();
  private AtomicLong totalUncompressedBytes = new AtomicLong();
  private LongAdder memstoreOnlyRowReadsCount = new LongAdder();
  // rows that have cells from both memstore and files (or only files)
  private LongAdder mixedRowReadsCount = new LongAdder();

  /**
   * Lock specific to archiving compacted store files. This avoids races around the combination of
   * retrieving the list of compacted files and moving them to the archive directory. Since this is
   * usually a background process (other than on close), we don't want to handle this with the
   * store write lock, which would block readers and degrade performance.
   * <p>
   * Locked by:
   * <ul>
   * <li>CompactedHFilesDispatchHandler via closeAndArchiveCompactedFiles()</li>
   * <li>close()</li>
   * </ul>
   */
  final ReentrantLock archiveLock = new ReentrantLock();

  private final boolean verifyBulkLoads;

  /**
   * Use this counter to track concurrent puts. If TRACE-level logging is enabled and we are over
   * the threshold set by hbase.region.store.parallel.put.print.threshold (default 50), we will
   * log a message that identifies the Store experiencing this high level of concurrency.
   */
  private final AtomicInteger currentParallelPutCount = new AtomicInteger(0);
  private final int parallelPutCountPrintThreshold;

  private ScanInfo scanInfo;

  // All access must be synchronized.
  // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it.
  private final List<HStoreFile> filesCompacting = Lists.newArrayList();

  // All access must be synchronized.
  private final Set<ChangedReadersObserver> changedReaderObservers =
    Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());

  private HFileDataBlockEncoder dataBlockEncoder;

  final StoreEngine<?, ?, ?, ?> storeEngine;

  private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
  private volatile OffPeakHours offPeakHours;

  private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
  private int flushRetriesNumber;
  private int pauseTime;

  private long blockingFileCount;
  private int compactionCheckMultiplier;

  private AtomicLong flushedCellsCount = new AtomicLong();
  private AtomicLong compactedCellsCount = new AtomicLong();
  private AtomicLong majorCompactedCellsCount = new AtomicLong();
  private AtomicLong flushedCellsSize = new AtomicLong();
  private AtomicLong flushedOutputFileSize = new AtomicLong();
  private AtomicLong compactedCellsSize = new AtomicLong();
  private AtomicLong majorCompactedCellsSize = new AtomicLong();

  private final StoreContext storeContext;

  // Used to track the store files which are currently being written. For compaction, if we want
  // to compact store files [a, b, c] to [d], then here we will record 'd'. And we will also use
  // it to track the store files being written when flushing.
  // Notice that the creation is in the background compaction or flush thread and we will get the
  // files in another thread, so it needs to be thread safe.
  private static final class StoreFileWriterCreationTracker implements Consumer<Path> {

    private final Set<Path> files = Collections.newSetFromMap(new ConcurrentHashMap<>());

    @Override
    public void accept(Path t) {
      files.add(t);
    }

    public Set<Path> get() {
      return Collections.unmodifiableSet(files);
    }
  }

  // We may have multiple compactions running at the same time, and flush can also happen at the
  // same time, so here we need to use a collection, and the collection needs to be thread safe.
  // The implementation of StoreFileWriterCreationTracker is very simple and we are not likely to
  // implement hashCode or equals for it, so here we just use ConcurrentHashMap. Change to
  // IdentityHashMap if later we want to implement hashCode or equals.
  private final Set<StoreFileWriterCreationTracker> storeFileWriterCreationTrackers =
    Collections.newSetFromMap(new ConcurrentHashMap<>());

  // For SFT implementations where we write a tmp store file first, we do not need to clean up
  // the broken store files under the data directory, which means we do not need to track the
  // store file writer creation. So here we abstract a factory to return different trackers for
  // different SFT implementations.
  private final Supplier<StoreFileWriterCreationTracker> storeFileWriterCreationTrackerFactory;

  private final boolean warmup;

  /**
   * Constructor
   * @param family    ColumnFamilyDescriptor for this column family
   * @param confParam configuration object for this store; can be null
   */
  protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
    final Configuration confParam, boolean warmup) throws IOException {
    this.conf = StoreUtils.createStoreConfiguration(confParam, region.getTableDescriptor(), family);

    this.region = region;
    this.storeContext = initializeStoreContext(family);

    // Assemble the store's home directory and ensure it exists.
    region.getRegionFileSystem().createStoreDir(family.getNameAsString());
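
    // Illustrative note (not part of the original source): the per-family storage policy,
    // when set on the column family, wins over the site-wide default. A cluster-wide
    // fallback can be configured in hbase-site.xml, e.g.
    //
    //   <property>
    //     <name>hbase.hstore.block.storage.policy</name>
    //     <value>ONE_SSD</value>
    //   </property>
    //
    // where ONE_SSD is one of the HDFS storage policies (HOT, WARM, COLD, ONE_SSD,
    // ALL_SSD, ...); the default "NONE" defers the decision to HDFS.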

    // set block storage policy for store directory
    String policyName = family.getStoragePolicy();
    if (null == policyName) {
      policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY, DEFAULT_BLOCK_STORAGE_POLICY);
    }
    region.getRegionFileSystem().setStoragePolicy(family.getNameAsString(), policyName.trim());

    this.dataBlockEncoder = new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());

    // used by ScanQueryMatcher
    long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
    LOG.trace("Time to purge deletes set to {}ms in {}", timeToPurgeDeletes, this);
    // Get TTL
    long ttl = determineTTLFromFamily(family);
    // Why not just pass a HColumnDescriptor in here altogether? Even if we have
    // to clone it?
    scanInfo =
      new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.storeContext.getComparator());
    this.memstore = getMemstore();

    this.offPeakHours = OffPeakHours.getInstance(conf);

    this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);

    this.blockingFileCount = conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
    this.compactionCheckMultiplier = conf.getInt(COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY,
      DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
    if (this.compactionCheckMultiplier <= 0) {
      LOG.error("Compaction check period multiplier must be positive, setting default: {}",
        DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
      this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
    }

    this.warmup = warmup;
    this.storeEngine = createStoreEngine(this, this.conf, region.getCellComparator());
    storeEngine.initialize(warmup);
    // if we require writing to the tmp dir first, then we just return null, which indicates that
    // we do not need to track the creation of store file writers, otherwise we return a new
    // StoreFileWriterCreationTracker.
    this.storeFileWriterCreationTrackerFactory = storeEngine.requireWritingToTmpDirFirst()
      ? () -> null
      : () -> new StoreFileWriterCreationTracker();
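
    // Context (illustrative, not from the original source): with a store file tracker
    // that writes under region/.tmp, broken output is cleaned up wholesale and no
    // tracker is needed; a tracker is only handed out for implementations that write
    // straight into the family directory, where a crashed writer would otherwise leave
    // an orphan file behind.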
    refreshStoreSizeAndTotalBytes();

    flushRetriesNumber =
      conf.getInt("hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
    pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
    if (flushRetriesNumber <= 0) {
      throw new IllegalArgumentException(
        "hbase.hstore.flush.retries.number must be > 0, not " + flushRetriesNumber);
    }

    int confPrintThreshold =
      this.conf.getInt("hbase.region.store.parallel.put.print.threshold", 50);
    if (confPrintThreshold < 10) {
      confPrintThreshold = 10;
    }
    this.parallelPutCountPrintThreshold = confPrintThreshold;

    LOG.info(
      "Store={}, memstore type={}, storagePolicy={}, verifyBulkLoads={}, "
        + "parallelPutCountPrintThreshold={}, encoding={}, compression={}",
      this, memstore.getClass().getSimpleName(), policyName, verifyBulkLoads,
      parallelPutCountPrintThreshold, family.getDataBlockEncoding(), family.getCompressionType());
  }

  private StoreContext initializeStoreContext(ColumnFamilyDescriptor family) throws IOException {
    return new StoreContext.Builder().withBlockSize(family.getBlocksize())
      .withEncryptionContext(EncryptionUtil.createEncryptionContext(conf, family))
      .withBloomType(family.getBloomFilterType()).withCacheConfig(createCacheConf(family))
      .withCellComparator(region.getTableDescriptor().isMetaTable() || conf
        .getBoolean(HRegion.USE_META_CELL_COMPARATOR, HRegion.DEFAULT_USE_META_CELL_COMPARATOR)
          ? MetaCellComparator.META_COMPARATOR
          : InnerStoreCellComparator.INNER_STORE_COMPARATOR)
      .withColumnFamilyDescriptor(family).withCompactedFilesSupplier(this::getCompactedFiles)
      .withRegionFileSystem(region.getRegionFileSystem())
      .withFavoredNodesSupplier(this::getFavoredNodes)
      .withFamilyStoreDirectoryPath(
        region.getRegionFileSystem().getStoreDir(family.getNameAsString()))
      .withRegionCoprocessorHost(region.getCoprocessorHost()).build();
  }

  private InetSocketAddress[] getFavoredNodes() {
    InetSocketAddress[] favoredNodes = null;
    if (region.getRegionServerServices() != null) {
      favoredNodes = region.getRegionServerServices()
        .getFavoredNodesForRegion(region.getRegionInfo().getEncodedName());
    }
    return favoredNodes;
  }
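
  // Illustrative sketch (not part of the original source): the memstore implementation
  // chosen by getMemstore() below can be steered per family, e.g.
  //
  //   ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf"))
  //     .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build();
  //
  // NONE selects DefaultMemStore (or whatever class hbase.regionserver.memstore.class
  // names); any other policy selects a CompactingMemStore.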

  /** Returns MemStore Instance to use in this store. */
  private MemStore getMemstore() {
    MemStore ms = null;
    // Check if in-memory-compaction is configured. Note MemoryCompactionPolicy is an enum!
    MemoryCompactionPolicy inMemoryCompaction = null;
    if (this.getTableName().isSystemTable()) {
      inMemoryCompaction = MemoryCompactionPolicy
        .valueOf(conf.get("hbase.systemtables.compacting.memstore.type", "NONE"));
    } else {
      inMemoryCompaction = getColumnFamilyDescriptor().getInMemoryCompaction();
    }
    if (inMemoryCompaction == null) {
      inMemoryCompaction =
        MemoryCompactionPolicy.valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
          CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT).toUpperCase());
    }

    switch (inMemoryCompaction) {
      case NONE:
        Class<? extends MemStore> memStoreClass =
          conf.getClass(MEMSTORE_CLASS_NAME, DefaultMemStore.class, MemStore.class);
        ms = ReflectionUtils.newInstance(memStoreClass,
          new Object[] { conf, getComparator(), this.getHRegion().getRegionServicesForStores() });
        break;
      default:
        Class<? extends CompactingMemStore> compactingMemStoreClass =
          conf.getClass(MEMSTORE_CLASS_NAME, CompactingMemStore.class, CompactingMemStore.class);
        ms =
          ReflectionUtils.newInstance(compactingMemStoreClass, new Object[] { conf,
            getComparator(), this, this.getHRegion().getRegionServicesForStores(),
            inMemoryCompaction });
    }
    return ms;
  }

  /**
   * Creates the cache config.
   * @param family The current column family.
   */
  protected CacheConfig createCacheConf(final ColumnFamilyDescriptor family) {
    CacheConfig cacheConf = new CacheConfig(conf, family, region.getBlockCache(),
      region.getRegionServicesForStores().getByteBuffAllocator());
    LOG.info("Created cacheConfig: {}, for column family {} of region {} ", cacheConf,
      family.getNameAsString(), region.getRegionInfo().getEncodedName());
    return cacheConf;
  }

  /**
   * Creates the store engine configured for the given Store.
   * @param store        The store. An unfortunate dependency needed due to it being passed to
   *                     coprocessors via the compactor.
   * @param conf         Store configuration.
   * @param kvComparator KVComparator for storeFileManager.
   * @return StoreEngine to use.
   */
  protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
    CellComparator kvComparator) throws IOException {
    return StoreEngine.create(store, conf, kvComparator);
  }

  /** Returns TTL in milliseconds of the specified family */
  public static long determineTTLFromFamily(final ColumnFamilyDescriptor family) {
    // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.
    long ttl = family.getTimeToLive();
    if (ttl == HConstants.FOREVER) {
      // Default is unlimited ttl.
      ttl = Long.MAX_VALUE;
    } else if (ttl == -1) {
      ttl = Long.MAX_VALUE;
    } else {
      // Second -> ms adjust for user data
      ttl *= 1000;
    }
    return ttl;
  }
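
  // Worked example (illustrative, not from the original source): a family built with
  //
  //   ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf")).setTimeToLive(86400).build()
  //
  // yields determineTTLFromFamily(...) == 86400L * 1000, i.e. one day in milliseconds,
  // while the default HConstants.FOREVER yields Long.MAX_VALUE, i.e. cells never expire.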

  StoreContext getStoreContext() {
    return storeContext;
  }

  @Override
  public String getColumnFamilyName() {
    return this.storeContext.getFamily().getNameAsString();
  }

  @Override
  public TableName getTableName() {
    return this.getRegionInfo().getTable();
  }

  @Override
  public FileSystem getFileSystem() {
    return storeContext.getRegionFileSystem().getFileSystem();
  }

  public HRegionFileSystem getRegionFileSystem() {
    return storeContext.getRegionFileSystem();
  }

  /* Implementation of StoreConfigInformation */
  @Override
  public long getStoreFileTtl() {
    // TTL only applies if there's no MIN_VERSIONs setting on the column.
    return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
  }

  @Override
  public long getMemStoreFlushSize() {
    // TODO: Why is this in here? The flushsize of the region rather than the store? St.Ack
    return this.region.memstoreFlushSize;
  }

  @Override
  public MemStoreSize getFlushableSize() {
    return this.memstore.getFlushableSize();
  }

  @Override
  public MemStoreSize getSnapshotSize() {
    return this.memstore.getSnapshotSize();
  }

  @Override
  public long getCompactionCheckMultiplier() {
    return this.compactionCheckMultiplier;
  }

  @Override
  public long getBlockingFileCount() {
    return blockingFileCount;
  }
  /* End implementation of StoreConfigInformation */

  @Override
  public ColumnFamilyDescriptor getColumnFamilyDescriptor() {
    return this.storeContext.getFamily();
  }

  @Override
  public OptionalLong getMaxSequenceId() {
    return StoreUtils.getMaxSequenceIdInList(this.getStorefiles());
  }

  @Override
  public OptionalLong getMaxMemStoreTS() {
    return StoreUtils.getMaxMemStoreTSInList(this.getStorefiles());
  }

  /** Returns the data block encoder */
  public HFileDataBlockEncoder getDataBlockEncoder() {
    return dataBlockEncoder;
  }

  /**
   * Should be used only in tests.
   * @param blockEncoder the block delta encoder to use
   */
  void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
    this.dataBlockEncoder = blockEncoder;
  }

  private void postRefreshStoreFiles() throws IOException {
    // Advance the memstore read point to be at least the new store files' seqIds so that
    // readers might pick it up. This assumes that the store is not getting any writes (otherwise
    // in-flight transactions might be made visible)
    getMaxSequenceId().ifPresent(region.getMVCC()::advanceTo);
    refreshStoreSizeAndTotalBytes();
  }

  @Override
  public void refreshStoreFiles() throws IOException {
    storeEngine.refreshStoreFiles();
    postRefreshStoreFiles();
  }

  /**
   * Replaces the store files that the store has with the given files. Mainly used by secondary
   * region replicas to keep up to date with the primary region files.
   */
  public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
    storeEngine.refreshStoreFiles(newFiles);
    postRefreshStoreFiles();
  }

  /**
   * This message intends to inform the MemStore that the next coming updates are going to be
   * part of the replaying edits from the WAL
   */
  public void startReplayingFromWAL() {
    this.memstore.startReplayingFromWAL();
  }

  /**
   * This message intends to inform the MemStore that the replaying edits from the WAL are done
   */
  public void stopReplayingFromWAL() {
    this.memstore.stopReplayingFromWAL();
  }

  /**
   * Adds a value to the memstore
   */
  public void add(final Cell cell, MemStoreSizing memstoreSizing) {
    storeEngine.readLock();
    try {
      if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
        LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
          this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName());
      }
      this.memstore.add(cell, memstoreSizing);
    } finally {
      storeEngine.readUnlock();
      currentParallelPutCount.decrementAndGet();
    }
  }

  /**
   * Adds the specified cells to the memstore
   */
  public void add(final Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
    storeEngine.readLock();
    try {
      if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
        LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
          this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName());
      }
      memstore.add(cells, memstoreSizing);
    } finally {
      storeEngine.readUnlock();
      currentParallelPutCount.decrementAndGet();
    }
  }
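
  // Illustrative note (not from the original source): with the default threshold of 50
  // (floored at 10 in the constructor), the "too busy" line above is emitted only while
  // more than 50 callers are concurrently inside add() for this store, and only when
  // TRACE logging is enabled for HStore.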

  @Override
  public long timeOfOldestEdit() {
    return memstore.timeOfOldestEdit();
  }

  /** Returns All store files. */
  @Override
  public Collection<HStoreFile> getStorefiles() {
    return this.storeEngine.getStoreFileManager().getStoreFiles();
  }

  @Override
  public Collection<HStoreFile> getCompactedFiles() {
    return this.storeEngine.getStoreFileManager().getCompactedfiles();
  }

  /**
   * This throws a WrongRegionException if the HFile does not fit in this region, or an
   * InvalidHFileException if the HFile is not valid.
   */
  public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
    HFile.Reader reader = null;
    try {
      LOG.info("Validating hfile at " + srcPath + " for inclusion in " + this);
      FileSystem srcFs = srcPath.getFileSystem(conf);
      srcFs.access(srcPath, FsAction.READ_WRITE);
      reader = HFile.createReader(srcFs, srcPath, getCacheConfig(), isPrimaryReplicaStore(), conf);

      Optional<byte[]> firstKey = reader.getFirstRowKey();
      Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
      Optional<Cell> lk = reader.getLastKey();
      Preconditions.checkState(lk.isPresent(), "Last key can not be null");
      byte[] lastKey = CellUtil.cloneRow(lk.get());

      if (LOG.isDebugEnabled()) {
        LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey.get()) + " last="
          + Bytes.toStringBinary(lastKey));
        LOG.debug("Region bounds: first=" + Bytes.toStringBinary(getRegionInfo().getStartKey())
          + " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
      }

      if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) {
        throw new WrongRegionException("Bulk load file " + srcPath.toString()
          + " does not fit inside region " + this.getRegionInfo().getRegionNameAsString());
      }

      if (
        reader.length()
            > conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE)
      ) {
        LOG.warn("Trying to bulk load hfile " + srcPath + " with size: " + reader.length()
          + " bytes can be problematic as it may lead to oversplitting.");
      }

      if (verifyBulkLoads) {
        long verificationStartTime = EnvironmentEdgeManager.currentTime();
        LOG.info("Full verification started for bulk load hfile: {}", srcPath);
        Cell prevCell = null;
        HFileScanner scanner = reader.getScanner(conf, false, false, false);
        scanner.seekTo();
        do {
          Cell cell = scanner.getCell();
          if (prevCell != null) {
            if (getComparator().compareRows(prevCell, cell) > 0) {
              throw new InvalidHFileException("Previous row is greater than"
                + " current row: path=" + srcPath + " previous="
                + CellUtil.getCellKeyAsString(prevCell) + " current="
                + CellUtil.getCellKeyAsString(cell));
            }
            if (CellComparator.getInstance().compareFamilies(prevCell, cell) != 0) {
              throw new InvalidHFileException("Previous key had different"
                + " family compared to current key: path=" + srcPath + " previous="
                + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(),
                  prevCell.getFamilyLength())
                + " current=" + Bytes.toStringBinary(cell.getFamilyArray(),
                  cell.getFamilyOffset(), cell.getFamilyLength()));
            }
          }
          prevCell = cell;
        } while (scanner.next());
        LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString()
          + " took " + (EnvironmentEdgeManager.currentTime() - verificationStartTime) + " ms");
      }
    } finally {
      if (reader != null) {
        reader.close();
      }
    }
  }
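
  // For context (illustrative, not from the original source): bulk loads normally enter
  // through the client-side tool, which eventually drives the per-store validation
  // above, e.g.
  //
  //   BulkLoadHFiles.create(conf).bulkLoad(TableName.valueOf("t"), new Path("/staged/hfiles"));
  //
  // Full key-order verification only runs when hbase.hstore.bulkload.verify is true.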

  /**
   * This method should only be called from Region. It is assumed that the ranges of values in the
   * HFile fit within the store's assigned region. (assertBulkLoadHFileOk checks this)
   * @param seqNum sequence Id associated with the HFile
   */
  public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
    Path srcPath = new Path(srcPathStr);
    return getRegionFileSystem().bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
  }

  public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException {
    Path srcPath = new Path(srcPathStr);
    try {
      getRegionFileSystem().commitStoreFile(srcPath, dstPath);
    } finally {
      if (this.getCoprocessorHost() != null) {
        this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath);
      }
    }

    LOG.info("Loaded HFile " + srcPath + " into " + this + " as " + dstPath
      + " - updating store file list.");

    HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath);
    bulkLoadHFile(sf);

    LOG.info("Successfully loaded {} into {} (new location: {})", srcPath, this, dstPath);

    return dstPath;
  }

  public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
    HStoreFile sf = storeEngine.createStoreFileAndReader(fileInfo);
    bulkLoadHFile(sf);
  }

  private void bulkLoadHFile(HStoreFile sf) throws IOException {
    StoreFileReader r = sf.getReader();
    this.storeSize.addAndGet(r.length());
    this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
    storeEngine.addStoreFiles(Lists.newArrayList(sf), () -> {
    });
    LOG.info("Loaded HFile " + sf.getFileInfo() + " into " + this);
    if (LOG.isTraceEnabled()) {
      String traceMessage = "BULK LOAD time,size,store size,store files ["
        + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize + ","
        + storeEngine.getStoreFileManager().getStorefileCount() + "]";
      LOG.trace(traceMessage);
    }
  }

  private ImmutableCollection<HStoreFile> closeWithoutLock() throws IOException {
    memstore.close();
    // Clear so the metrics system doesn't find them.
    ImmutableCollection<HStoreFile> result = storeEngine.getStoreFileManager().clearFiles();
    Collection<HStoreFile> compactedfiles = storeEngine.getStoreFileManager().clearCompactedFiles();
    // clear the compacted files
    if (CollectionUtils.isNotEmpty(compactedfiles)) {
      removeCompactedfiles(compactedfiles,
        getCacheConfig() != null ? getCacheConfig().shouldEvictOnClose() : true);
    }
    if (!result.isEmpty()) {
      // initialize the thread pool for closing store files in parallel.
      ThreadPoolExecutor storeFileCloserThreadPool =
        this.region.getStoreFileOpenAndCloseThreadPool("StoreFileCloser-"
          + this.region.getRegionInfo().getEncodedName() + "-" + this.getColumnFamilyName());

      // close each store file in parallel
      CompletionService<Void> completionService =
        new ExecutorCompletionService<>(storeFileCloserThreadPool);
      for (HStoreFile f : result) {
        completionService.submit(new Callable<Void>() {
          @Override
          public Void call() throws IOException {
            boolean evictOnClose =
              getCacheConfig() != null ? getCacheConfig().shouldEvictOnClose() : true;
            f.closeStoreFile(!warmup && evictOnClose);
            return null;
          }
        });
      }

      IOException ioe = null;
      try {
        for (int i = 0; i < result.size(); i++) {
          try {
            Future<Void> future = completionService.take();
            future.get();
          } catch (InterruptedException e) {
            if (ioe == null) {
              ioe = new InterruptedIOException();
              ioe.initCause(e);
            }
          } catch (ExecutionException e) {
            if (ioe == null) {
              ioe = new IOException(e.getCause());
            }
          }
        }
      } finally {
        storeFileCloserThreadPool.shutdownNow();
      }
      if (ioe != null) {
        throw ioe;
      }
    }
    LOG.trace("Closed {}", this);
    return result;
  }

  /**
   * Close all the readers. We don't need to worry about subsequent requests because the Region
   * holds a write lock that will prevent any more reads or writes.
   * @return the {@link StoreFile StoreFiles} that were previously being used.
   * @throws IOException on failure
   */
  public ImmutableCollection<HStoreFile> close() throws IOException {
    // findbugs can not recognize storeEngine.writeLock is just a lock operation so it will report
    // UL_UNRELEASED_LOCK_EXCEPTION_PATH, so here we have to use two try finally...
    // Change later if findbugs becomes smarter in the future.
    this.archiveLock.lock();
    try {
      this.storeEngine.writeLock();
      try {
        return closeWithoutLock();
      } finally {
        this.storeEngine.writeUnlock();
      }
    } finally {
      this.archiveLock.unlock();
    }
  }

  /**
   * Write out current snapshot. Presumes {@code StoreFlusherImpl.prepare()} has been called
   * previously.
   * @param logCacheFlushId flush sequence number
   * @return The path name of the tmp file to which the store was flushed
   * @throws IOException if exception occurs during process
   */
  protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
    MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
    Consumer<Path> writerCreationTracker) throws IOException {
    // If an exception happens flushing, we let it out without clearing the memstore snapshot.
    // The old snapshot will be returned when we say 'snapshot', the next time flush comes around.
    // Retry after catching an exception when flushing, otherwise the server will abort itself.
    StoreFlusher flusher = storeEngine.getStoreFlusher();
    IOException lastException = null;
    for (int i = 0; i < flushRetriesNumber; i++) {
      try {
        List<Path> pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status,
          throughputController, tracker, writerCreationTracker);
        Path lastPathName = null;
        try {
          for (Path pathName : pathNames) {
            lastPathName = pathName;
            storeEngine.validateStoreFile(pathName);
          }
          return pathNames;
        } catch (Exception e) {
          LOG.warn("Failed validating store file {}, retrying num={}", lastPathName, i, e);
          if (e instanceof IOException) {
            lastException = (IOException) e;
          } else {
            lastException = new IOException(e);
          }
        }
      } catch (IOException e) {
        LOG.warn("Failed flushing store file for {}, retrying num={}", this, i, e);
        lastException = e;
      }
      if (lastException != null && i < (flushRetriesNumber - 1)) {
        try {
          Thread.sleep(pauseTime);
        } catch (InterruptedException e) {
          IOException iie = new InterruptedIOException();
          iie.initCause(e);
          throw iie;
        }
      }
    }
    throw lastException;
  }
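
  // Worked example (illustrative, not from the original source): with the defaults
  // hbase.hstore.flush.retries.number=10 and hbase.server.pause=1000 ms, a persistently
  // failing flush makes 10 attempts with 9 one-second pauses in between (roughly 9 s of
  // sleep in total) before the last IOException is rethrown to the caller.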

  public HStoreFile tryCommitRecoveredHFile(Path path) throws IOException {
    LOG.info("Validating recovered hfile at {} for inclusion in store {}", path, this);
    FileSystem srcFs = path.getFileSystem(conf);
    srcFs.access(path, FsAction.READ_WRITE);
    try (HFile.Reader reader =
      HFile.createReader(srcFs, path, getCacheConfig(), isPrimaryReplicaStore(), conf)) {
      Optional<byte[]> firstKey = reader.getFirstRowKey();
      Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
      Optional<Cell> lk = reader.getLastKey();
      Preconditions.checkState(lk.isPresent(), "Last key can not be null");
      byte[] lastKey = CellUtil.cloneRow(lk.get());
      if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) {
        throw new WrongRegionException("Recovered hfile " + path.toString()
          + " does not fit inside region " + this.getRegionInfo().getRegionNameAsString());
      }
    }

    Path dstPath = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), path);
    HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath);
    StoreFileReader r = sf.getReader();
    this.storeSize.addAndGet(r.length());
    this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());

    storeEngine.addStoreFiles(Lists.newArrayList(sf), () -> {
    });

    LOG.info("Loaded recovered hfile to {}, entries={}, sequenceid={}, filesize={}", sf,
      r.getEntries(), r.getSequenceID(), TraditionalBinaryPrefix.long2String(r.length(), "B", 1));
    return sf;
  }

  private long getTotalSize(Collection<HStoreFile> sfs) {
    return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum();
  }

  private boolean completeFlush(final List<HStoreFile> sfs, long snapshotId) throws IOException {
    // NOTE: we should keep the clearSnapshot method inside the write lock because clearSnapshot
    // may close {@link DefaultMemStore#snapshot}, which may be used by
    // {@link DefaultMemStore#getScanners}.
    storeEngine.addStoreFiles(sfs,
      // NOTE: here we must increase the refCount for storeFiles because we would open the
      // storeFiles and get the StoreFileScanners for them in HStore.notifyChangedReadersObservers.
      // If we don't increase the refCount here, HStore.closeAndArchiveCompactedFiles called by
      // CompactedHFilesDischarger may archive the storeFiles after a concurrent compaction.
      // Because HStore.requestCompaction runs under the storeEngine lock, we increase the
      // refCount under the storeEngine lock here. See HBASE-27519 for more details.
      snapshotId > 0 ? () -> {
        this.memstore.clearSnapshot(snapshotId);
        HStoreFile.increaseStoreFilesRefeCount(sfs);
      } : () -> {
        HStoreFile.increaseStoreFilesRefeCount(sfs);
      });
    // notify to be called here - only in case of flushes
    try {
      notifyChangedReadersObservers(sfs);
    } finally {
      HStoreFile.decreaseStoreFilesRefeCount(sfs);
    }
    if (LOG.isTraceEnabled()) {
      long totalSize = getTotalSize(sfs);
      String traceMessage = "FLUSH time,count,size,store size,store files ["
        + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize + ","
        + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
      LOG.trace(traceMessage);
    }
    return needsCompaction();
  }

  /**
   * Notify all observers that the set of Readers has changed.
   */
  private void notifyChangedReadersObservers(List<HStoreFile> sfs) throws IOException {
    for (ChangedReadersObserver o : this.changedReaderObservers) {
      List<KeyValueScanner> memStoreScanners;
      this.storeEngine.readLock();
      try {
        memStoreScanners = this.memstore.getScanners(o.getReadPoint());
      } finally {
        this.storeEngine.readUnlock();
      }
      o.updateReaders(sfs, memStoreScanners);
    }
  }

  /**
   * Get all scanners with no filtering based on TTL (that happens further down the line).
   * @param cacheBlocks  cache the blocks or not
   * @param usePread     true to use pread, false if not
   * @param isCompaction true if the scanner is created for compaction
   * @param matcher      the scan query matcher
   * @param startRow     the start row
   * @param stopRow      the stop row
   * @param readPt       the read point of the current scan
   * @return all scanners for this store
   */
  public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet, boolean usePread,
    boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt,
    boolean onlyLatestVersion) throws IOException {
    return getScanners(cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow,
      false, readPt, onlyLatestVersion);
  }

  /**
   * Get all scanners with no filtering based on TTL (that happens further down the line).
   * @param cacheBlocks     cache the blocks or not
   * @param usePread        true to use pread, false if not
   * @param isCompaction    true if the scanner is created for compaction
   * @param matcher         the scan query matcher
   * @param startRow        the start row
   * @param includeStartRow true to include start row, false if not
   * @param stopRow         the stop row
   * @param includeStopRow  true to include stop row, false if not
   * @param readPt          the read point of the current scan
   * @return all scanners for this store
   */
  public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,
    boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow,
    byte[] stopRow, boolean includeStopRow, long readPt, boolean onlyLatestVersion)
    throws IOException {
    Collection<HStoreFile> storeFilesToScan;
    List<KeyValueScanner> memStoreScanners;
    this.storeEngine.readLock();
    try {
      storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
        includeStartRow, stopRow, includeStopRow, onlyLatestVersion);
      memStoreScanners = this.memstore.getScanners(readPt);
      // NOTE: here we must increase the refCount for storeFiles because we would open the
      // storeFiles and get the StoreFileScanners for them. If we don't increase the refCount
      // here, HStore.closeAndArchiveCompactedFiles called by CompactedHFilesDischarger may
      // archive the storeFiles after a concurrent compaction. Because HStore.requestCompaction
      // runs under the storeEngine lock, we increase the refCount under the storeEngine lock
      // here. See HBASE-27484 for more details.
      HStoreFile.increaseStoreFilesRefeCount(storeFilesToScan);
    } finally {
      this.storeEngine.readUnlock();
    }
    try {
      // First the store file scanners

      // TODO this used to get the store files in descending order,
      // but now we get them in ascending order, which I think is
      // actually more correct, since memstore get put at the end.
      List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(
        storeFilesToScan, cacheBlocks, usePread, isCompaction, false, matcher, readPt);
      List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
      scanners.addAll(sfScanners);
      // Then the memstore scanners
      scanners.addAll(memStoreScanners);
      return scanners;
    } catch (Throwable t) {
      clearAndClose(memStoreScanners);
      throw t instanceof IOException ? (IOException) t : new IOException(t);
    } finally {
      HStoreFile.decreaseStoreFilesRefeCount(storeFilesToScan);
    }
  }

  private static void clearAndClose(List<KeyValueScanner> scanners) {
    if (scanners == null) {
      return;
    }
    for (KeyValueScanner s : scanners) {
      s.close();
    }
    scanners.clear();
  }

  /**
   * Create scanners on the given files and if needed on the memstore with no filtering based on
   * TTL (that happens further down the line).
   * @param files                  the list of files on which the scanners have to be created
   * @param cacheBlocks            cache the blocks or not
   * @param usePread               true to use pread, false if not
   * @param isCompaction           true if the scanner is created for compaction
   * @param matcher                the scan query matcher
   * @param startRow               the start row
   * @param stopRow                the stop row
   * @param readPt                 the read point of the current scan
   * @param includeMemstoreScanner true if memstore has to be included
   * @return scanners on the given files and on the memstore if specified
   */
  public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
    boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
    byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner,
    boolean onlyLatestVersion) throws IOException {
    return getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
      stopRow, false, readPt, includeMemstoreScanner, onlyLatestVersion);
  }

  /**
   * Create scanners on the given files and if needed on the memstore with no filtering based on
   * TTL (that happens further down the line).
   * @param files                  the list of files on which the scanners have to be created
   * @param cacheBlocks            cache the blocks or not
   * @param usePread               true to use pread, false if not
   * @param isCompaction           true if the scanner is created for compaction
   * @param matcher                the scan query matcher
   * @param startRow               the start row
   * @param includeStartRow        true to include start row, false if not
   * @param stopRow                the stop row
   * @param includeStopRow         true to include stop row, false if not
   * @param readPt                 the read point of the current scan
   * @param includeMemstoreScanner true if memstore has to be included
   * @return scanners on the given files and on the memstore if specified
   */
  public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
    boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
    boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
    boolean includeMemstoreScanner, boolean onlyLatestVersion) throws IOException {
    List<KeyValueScanner> memStoreScanners = null;
    if (includeMemstoreScanner) {
      this.storeEngine.readLock();
      try {
        memStoreScanners = this.memstore.getScanners(readPt);
      } finally {
        this.storeEngine.readUnlock();
      }
    }
    try {
      List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(files,
        cacheBlocks, usePread, isCompaction, false, matcher, readPt);
      List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
      scanners.addAll(sfScanners);
      // Then the memstore scanners
      if (memStoreScanners != null) {
        scanners.addAll(memStoreScanners);
      }
      return scanners;
    } catch (Throwable t) {
      clearAndClose(memStoreScanners);
      throw t instanceof IOException ? (IOException) t : new IOException(t);
    }
  }

  /**
   * @param o Observer who wants to know about changes in set of Readers
   */
  public void addChangedReaderObserver(ChangedReadersObserver o) {
    this.changedReaderObservers.add(o);
  }
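
  // Illustrative usage (not from the original source): a long-lived scanner such as
  // StoreScanner registers itself so a flush can swap the new HFile into its view via
  // notifyChangedReadersObservers(...) above, roughly:
  //
  //   store.addChangedReaderObserver(observer);
  //   try { /* scan */ } finally { store.deleteChangedReaderObserver(observer); }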

  /**
   * @param o Observer no longer interested in changes in set of Readers.
   */
  public void deleteChangedReaderObserver(ChangedReadersObserver o) {
    // We don't check if the observer is present; it may not be (legitimately)
    this.changedReaderObservers.remove(o);
  }

  //////////////////////////////////////////////////////////////////////////////
  // Compaction
  //////////////////////////////////////////////////////////////////////////////
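
  // A worked example of the crash-safe ordering documented on compact() below
  // (file names are illustrative, not from the original source):
  //   1. write the compaction output to region/.tmp/abc123
  //   2. atomically move it to region/cf/abc123
  //   3. append a WAL marker { input: [f1, f2, f3], output: [abc123] } and force a sync
  //   4. delete region/cf/f1, f2 and f3
  // A crash before step 3 merely leaves a redundant copy of the data; a crash after
  // step 3 lets whichever server reopens the region replay the marker and delete
  // f1..f3, which stays idempotent even if the failed server also tries to delete them.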

  /**
   * Compact the StoreFiles. This method may take some time, so the calling thread must be able to
   * block for long periods.
   * <p>
   * During this time, the Store can work as usual, getting values from StoreFiles and writing new
   * StoreFiles from the memstore. Existing StoreFiles are not destroyed until the new compacted
   * StoreFile is completely written-out to disk.
   * <p>
   * The compactLock prevents multiple simultaneous compactions. The structureLock prevents us
   * from interfering with other write operations.
   * <p>
   * We don't want to hold the structureLock for the whole time, as a compact() can be lengthy and
   * we want to allow cache-flushes during this period.
   * <p>
   * Compaction events should be idempotent, since there is no IO Fencing for the region directory
   * in hdfs. A region server might still try to complete the compaction after it lost the region.
   * That is why the following events are carefully ordered for a compaction:
   * <ol>
   * <li>Compaction writes new files under the region/.tmp directory (compaction output).</li>
   * <li>Compaction atomically moves the temporary file under the region directory.</li>
   * <li>Compaction appends a WAL edit containing the compaction input and output files. Forces
   * sync on WAL.</li>
   * <li>Compaction deletes the input files from the region directory.</li>
   * </ol>
   * Failure conditions are handled like this:
   * <ul>
   * <li>If the RS fails before 2, the compaction won't complete. Even if the RS lives on and
   * finishes the compaction later, it will only write the new data file to the region directory.
   * Since we already have this data, this will be idempotent, but we will have a redundant copy
   * of the data.</li>
   * <li>If the RS fails between 2 and 3, the region will have a redundant copy of the data. The
   * RS that failed won't be able to finish sync() for the WAL because of lease recovery in the
   * WAL.</li>
   * <li>If the RS fails after 3, the region server who opens the region will pick up the
   * compaction marker from the WAL and replay it by removing the compaction input files. The
   * failed RS can also attempt to delete those files, but the operation will be idempotent.</li>
   * </ul>
   * See HBASE-2231 for details.
   * @param compaction compaction details obtained from requestCompaction()
   * @return Storefile we compacted into or null if we failed or opted out early.
   */
  public List<HStoreFile> compact(CompactionContext compaction,
    ThroughputController throughputController, User user) throws IOException {
    assert compaction != null;
    CompactionRequestImpl cr = compaction.getRequest();
    StoreFileWriterCreationTracker writerCreationTracker =
      storeFileWriterCreationTrackerFactory.get();
    if (writerCreationTracker != null) {
      cr.setWriterCreationTracker(writerCreationTracker);
      storeFileWriterCreationTrackers.add(writerCreationTracker);
    }
    try {
      // Do all sanity checking in here if we have a valid CompactionRequestImpl
      // because we need to clean up after it on the way out in a finally
      // block below
      long compactionStartTime = EnvironmentEdgeManager.currentTime();
      assert compaction.hasSelection();
      Collection<HStoreFile> filesToCompact = cr.getFiles();
      assert !filesToCompact.isEmpty();
      synchronized (filesCompacting) {
        // sanity check: we're compacting files that this store knows about
        // TODO: change this to LOG.error() after more debugging
        Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
      }

      // Ready to go. Have list of files to compact.
      LOG.info("Starting compaction of " + filesToCompact + " into tmpdir="
        + getRegionFileSystem().getTempDir() + ", totalSize="
        + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));

      return doCompaction(cr, filesToCompact, user, compactionStartTime,
        compaction.compact(throughputController, user));
    } finally {
      finishCompactionRequest(cr);
    }
  }

  protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
    Collection<HStoreFile> filesToCompact, User user, long compactionStartTime,
    List<Path> newFiles) throws IOException {
    // Do the steps necessary to complete the compaction.
    setStoragePolicyFromFileName(newFiles);
    List<HStoreFile> sfs = storeEngine.commitStoreFiles(newFiles, true);
    if (this.getCoprocessorHost() != null) {
      for (HStoreFile sf : sfs) {
        getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
      }
    }
    replaceStoreFiles(filesToCompact, sfs, true);

    long outputBytes = getTotalSize(sfs);

    // At this point the store will use new files for all new scanners.
    refreshStoreSizeAndTotalBytes(); // update store size.

    long now = EnvironmentEdgeManager.currentTime();
    if (
      region.getRegionServerServices() != null
        && region.getRegionServerServices().getMetrics() != null
    ) {
      region.getRegionServerServices().getMetrics().updateCompaction(
        region.getTableDescriptor().getTableName().getNameAsString(), cr.isMajor(),
        now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(),
        outputBytes);
    }

    logCompactionEndMessage(cr, sfs, now, compactionStartTime);
    return sfs;
  }

  // Set the correct storage policy from the file name of DTCP (date tiered compaction policy).
  // Renaming a file will not change the storage policy.
  private void setStoragePolicyFromFileName(List<Path> newFiles) throws IOException {
    String prefix = HConstants.STORAGE_POLICY_PREFIX;
    for (Path newFile : newFiles) {
      if (newFile.getParent().getName().startsWith(prefix)) {
        CommonFSUtils.setStoragePolicy(getRegionFileSystem().getFileSystem(), newFile,
          newFile.getParent().getName().substring(prefix.length()));
      }
    }
  }

  /**
   * Writes the compaction WAL record.
   * @param filesCompacted Files compacted (input).
   * @param newFiles       Files from compaction.
   */
  private void writeCompactionWalRecord(Collection<HStoreFile> filesCompacted,
    Collection<HStoreFile> newFiles) throws IOException {
    if (region.getWAL() == null) {
      return;
    }
    List<Path> inputPaths =
      filesCompacted.stream().map(HStoreFile::getPath).collect(Collectors.toList());
    List<Path> outputPaths =
      newFiles.stream().map(HStoreFile::getPath).collect(Collectors.toList());
    RegionInfo info = this.region.getRegionInfo();
    CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
      getColumnFamilyDescriptor().getName(), inputPaths, outputPaths,
      getRegionFileSystem().getStoreDir(getColumnFamilyDescriptor().getNameAsString()));
    // Fix reaching into Region to get the maxWaitForSeqId.
    // Does this method belong in Region altogether given it is making so many references up
    // there? Could be Region#writeCompactionMarker(compactionDescriptor);
    WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(),
      this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC(),
      region.getRegionReplicationSink().orElse(null));
  }

  @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
      allowedOnPath = ".*/(HStore|TestHStore).java")
  void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result,
    boolean writeCompactionMarker) throws IOException {
    storeEngine.replaceStoreFiles(compactedFiles, result, () -> {
      if (writeCompactionMarker) {
        writeCompactionWalRecord(compactedFiles, result);
      }
    }, () -> {
      synchronized (filesCompacting) {
        filesCompacting.removeAll(compactedFiles);
      }
    });
    // These may be null when the RS is shutting down. The space quota Chores will fix the Region
    // sizes later so it's not super-critical if we miss these.
    RegionServerServices rsServices = region.getRegionServerServices();
    if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) {
      updateSpaceQuotaAfterFileReplacement(
        rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(),
        compactedFiles, result);
    }
  }

  /**
   * Updates the space quota usage for this region, removing the size for files compacted away
   * and adding in the size for new files.
   * @param sizeStore  The object tracking changes in region size for space quotas.
   * @param regionInfo The identifier for the region whose size is being updated.
   * @param oldFiles   Files removed from this store's region.
   * @param newFiles   Files added to this store's region.
   */
  void updateSpaceQuotaAfterFileReplacement(RegionSizeStore sizeStore, RegionInfo regionInfo,
    Collection<HStoreFile> oldFiles, Collection<HStoreFile> newFiles) {
    long delta = 0;
    if (oldFiles != null) {
      for (HStoreFile compactedFile : oldFiles) {
        if (compactedFile.isHFile()) {
          delta -= compactedFile.getReader().length();
        }
      }
    }
    if (newFiles != null) {
      for (HStoreFile newFile : newFiles) {
        if (newFile.isHFile()) {
          delta += newFile.getReader().length();
        }
      }
    }
    sizeStore.incrementRegionSize(regionInfo, delta);
  }
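
  // Worked example (illustrative numbers, not from the original source): compacting two
  // HFiles of 100 MB and 50 MB into a single 120 MB file reports
  // delta = 120 - (100 + 50) = -30 MB to the RegionSizeStore, shrinking the region's
  // accounted size by the space the compaction reclaimed.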
  /**
   * Logs a detailed compaction completion message.
   * @param cr                  Request.
   * @param sfs                 Resulting files.
   * @param now                 Time the compaction finished.
   * @param compactionStartTime Start time.
   */
  private void logCompactionEndMessage(CompactionRequestImpl cr, List<HStoreFile> sfs, long now,
    long compactionStartTime) {
    StringBuilder message = new StringBuilder("Completed" + (cr.isMajor() ? " major" : "")
      + " compaction of " + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "")
      + " file(s) in " + this + " of " + this.getRegionInfo().getShortNameToLog() + " into ");
    if (sfs.isEmpty()) {
      message.append("none, ");
    } else {
      for (HStoreFile sf : sfs) {
        message.append(sf.getPath().getName());
        message.append("(size=");
        message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1));
        message.append("), ");
      }
    }
    message.append("total size for store is ")
      .append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1))
      .append(". This selection was in queue for ")
      .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
      .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
      .append(" to execute.");
    LOG.info(message.toString());
    if (LOG.isTraceEnabled()) {
      int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
      long resultSize = getTotalSize(sfs);
      String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
        + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
        + cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]";
      LOG.trace(traceMessage);
    }
  }
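  /*
   * For illustration, the INFO line assembled above reads roughly like the following; every
   * name, count and size here is invented, and the exact rendering of durations comes from
   * StringUtils.formatTimeDiff:
   *
   *   Completed major compaction of 3 (all) file(s) in info/region1 into
   *   a1b2c3(size=12.3 M), total size for store is 12.3 M. This selection was in queue for
   *   2sec, and took 15sec to execute.
   */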
  /**
   * Call to complete a compaction. It's for the case where we find in the WAL a compaction that
   * was not finished. We could find one recovering a WAL after a regionserver crash. See
   * HBASE-2231.
   */
  public void replayCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
    boolean removeFiles) throws IOException {
    LOG.debug("Completing compaction from the WAL marker");
    List<String> compactionInputs = compaction.getCompactionInputList();
    List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList());

    // The Compaction Marker is written after the compaction is completed,
    // and the files moved into the region/family folder.
    //
    // If we crash after the entry is written, we may not have removed the
    // input files, but the output file is present.
    // (The unremoved input files will be removed by this function)
    //
    // If we scan the directory and the file is not present, it can mean that:
    //   - The file was manually removed by the user
    //   - The file was removed as a consequence of a subsequent compaction
    // so, we can't do anything with the "compaction output list" because those
    // files have already been loaded when opening the region (by virtue of
    // being in the store's folder) or they may be missing due to a compaction.

    String familyName = this.getColumnFamilyName();
    Set<String> inputFiles = new HashSet<>();
    for (String compactionInput : compactionInputs) {
      Path inputPath = getRegionFileSystem().getStoreFilePath(familyName, compactionInput);
      inputFiles.add(inputPath.getName());
    }

    // some of the input files might already be deleted
    List<HStoreFile> inputStoreFiles = new ArrayList<>(compactionInputs.size());
    for (HStoreFile sf : this.getStorefiles()) {
      if (inputFiles.contains(sf.getPath().getName())) {
        inputStoreFiles.add(sf);
      }
    }

    // check whether we need to pick up the new files
    List<HStoreFile> outputStoreFiles = new ArrayList<>(compactionOutputs.size());

    if (pickCompactionFiles) {
      for (HStoreFile sf : this.getStorefiles()) {
        compactionOutputs.remove(sf.getPath().getName());
      }
      for (String compactionOutput : compactionOutputs) {
        StoreFileInfo storeFileInfo =
          getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), compactionOutput);
        HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo);
        outputStoreFiles.add(storeFile);
      }
    }

    if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) {
      LOG.info("Replaying compaction marker, replacing input files: " + inputStoreFiles
        + " with output files: " + outputStoreFiles);
      this.replaceStoreFiles(inputStoreFiles, outputStoreFiles, false);
      this.refreshStoreSizeAndTotalBytes();
    }
  }

  @Override
  public boolean hasReferences() {
    // Grab the read lock here, because we need to ensure that only after the atomic
    // replaceStoreFiles(..) has finished can we get the complete store file list.
    this.storeEngine.readLock();
    try {
      // Merge the current store files with compacted files here due to HBASE-20940.
      Collection<HStoreFile> allStoreFiles = new ArrayList<>(getStorefiles());
      allStoreFiles.addAll(getCompactedFiles());
      return StoreUtils.hasReferences(allStoreFiles);
    } finally {
      this.storeEngine.readUnlock();
    }
  }

  /**
   * Getter for the CompactionProgress object.
   * @return CompactionProgress object; can be null
   */
  public CompactionProgress getCompactionProgress() {
    return this.storeEngine.getCompactor().getProgress();
  }

  @Override
  public boolean shouldPerformMajorCompaction() throws IOException {
    for (HStoreFile sf : this.storeEngine.getStoreFileManager().getStoreFiles()) {
      // TODO: what are these reader checks all over the place?
      if (sf.getReader() == null) {
        LOG.debug("StoreFile {} has null Reader", sf);
        return false;
      }
    }
    return storeEngine.getCompactionPolicy()
      .shouldPerformMajorCompaction(this.storeEngine.getStoreFileManager().getStoreFiles());
  }

  public Optional<CompactionContext> requestCompaction() throws IOException {
    return requestCompaction(NO_PRIORITY, CompactionLifeCycleTracker.DUMMY, null);
  }
  public Optional<CompactionContext> requestCompaction(int priority,
    CompactionLifeCycleTracker tracker, User user) throws IOException {
    // don't even select for compaction if writes are disabled
    if (!this.areWritesEnabled()) {
      return Optional.empty();
    }
    // Before we do compaction, try to get rid of unneeded files to simplify things.
    removeUnneededFiles();

    final CompactionContext compaction = storeEngine.createCompaction();
    CompactionRequestImpl request = null;
    this.storeEngine.readLock();
    try {
      synchronized (filesCompacting) {
        // First, see if coprocessor would want to override selection.
        if (this.getCoprocessorHost() != null) {
          final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
          boolean override =
            getCoprocessorHost().preCompactSelection(this, candidatesForCoproc, tracker, user);
          if (override) {
            // Coprocessor is overriding normal file selection.
            compaction.forceSelect(new CompactionRequestImpl(candidatesForCoproc));
          }
        }

        // Normal case - coprocessor is not overriding file selection.
        if (!compaction.hasSelection()) {
          boolean isUserCompaction = priority == Store.PRIORITY_USER;
          boolean mayUseOffPeak =
            offPeakHours.isOffPeakHour() && offPeakCompactionTracker.compareAndSet(false, true);
          try {
            compaction.select(this.filesCompacting, isUserCompaction, mayUseOffPeak,
              forceMajor && filesCompacting.isEmpty());
          } catch (IOException e) {
            if (mayUseOffPeak) {
              offPeakCompactionTracker.set(false);
            }
            throw e;
          }
          assert compaction.hasSelection();
          if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
            // Compaction policy doesn't want to take advantage of off-peak.
            offPeakCompactionTracker.set(false);
          }
        }
        if (this.getCoprocessorHost() != null) {
          this.getCoprocessorHost().postCompactSelection(this,
            ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker,
            compaction.getRequest(), user);
        }
        // Finally, we have the resulting files list. Check if we have any files at all.
        request = compaction.getRequest();
        Collection<HStoreFile> selectedFiles = request.getFiles();
        if (selectedFiles.isEmpty()) {
          return Optional.empty();
        }

        addToCompactingFiles(selectedFiles);

        // If we're enqueuing a major, clear the force flag.
        this.forceMajor = this.forceMajor && !request.isMajor();

        // Set common request properties.
        // Set priority, either override value supplied by caller or from store.
        final int compactionPriority =
          (priority != Store.NO_PRIORITY) ? priority : getCompactPriority();
        request.setPriority(compactionPriority);

        if (request.isAfterSplit()) {
          // If the store belongs to recently split daughter regions, we should consider them
          // with a higher priority in the compaction queue.
          // Override the priority if it is lower (a higher int value) than
          // SPLIT_REGION_COMPACTION_PRIORITY.
          final int splitHousekeepingPriority =
            Math.min(compactionPriority, SPLIT_REGION_COMPACTION_PRIORITY);
          request.setPriority(splitHousekeepingPriority);
          LOG.info(
            "Keeping/Overriding Compaction request priority to {} for CF {} since it"
              + " belongs to recently split daughter region {}",
            splitHousekeepingPriority, this.getColumnFamilyName(),
            getRegionInfo().getRegionNameAsString());
        }
        request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
        request.setTracker(tracker);
      }
    } finally {
      this.storeEngine.readUnlock();
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug(this + " is initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
        + (request.isAllFiles() ? " (all files)" : ""));
    }
    this.region.reportCompactionRequestStart(request.isMajor());
    return Optional.of(compaction);
  }
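  /*
   * A hedged sketch of how priority interacts with selection: lower int values mean higher
   * priority in the compaction queue. A caller wanting a user-driven selection might invoke,
   * assuming it has `store`, `tracker` and `user` in scope:
   *
   *   Optional<CompactionContext> ctx =
   *     store.requestCompaction(Store.PRIORITY_USER, tracker, user);
   *
   * Passing Store.NO_PRIORITY instead lets the store derive the priority from
   * getCompactPriority(), and stores of recently split daughter regions are clamped to at most
   * SPLIT_REGION_COMPACTION_PRIORITY, as in the method above.
   */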
" (all files)" : "")); 1539 } 1540 this.region.reportCompactionRequestStart(request.isMajor()); 1541 return Optional.of(compaction); 1542 } 1543 1544 /** Adds the files to compacting files. filesCompacting must be locked. */ 1545 private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) { 1546 if (CollectionUtils.isEmpty(filesToAdd)) { 1547 return; 1548 } 1549 // Check that we do not try to compact the same StoreFile twice. 1550 if (!Collections.disjoint(filesCompacting, filesToAdd)) { 1551 Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting); 1552 } 1553 filesCompacting.addAll(filesToAdd); 1554 Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator()); 1555 } 1556 1557 private void removeUnneededFiles() throws IOException { 1558 if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) { 1559 return; 1560 } 1561 if (getColumnFamilyDescriptor().getMinVersions() > 0) { 1562 LOG.debug("Skipping expired store file removal due to min version of {} being {}", this, 1563 getColumnFamilyDescriptor().getMinVersions()); 1564 return; 1565 } 1566 this.storeEngine.readLock(); 1567 Collection<HStoreFile> delSfs = null; 1568 try { 1569 synchronized (filesCompacting) { 1570 long cfTtl = getStoreFileTtl(); 1571 if (cfTtl != Long.MAX_VALUE) { 1572 delSfs = storeEngine.getStoreFileManager() 1573 .getUnneededFiles(EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting); 1574 addToCompactingFiles(delSfs); 1575 } 1576 } 1577 } finally { 1578 this.storeEngine.readUnlock(); 1579 } 1580 1581 if (CollectionUtils.isEmpty(delSfs)) { 1582 return; 1583 } 1584 1585 Collection<HStoreFile> newFiles = Collections.emptyList(); // No new files. 1586 replaceStoreFiles(delSfs, newFiles, true); 1587 refreshStoreSizeAndTotalBytes(); 1588 LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in " + this 1589 + "; total size is " + TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1)); 1590 } 1591 1592 public void cancelRequestedCompaction(CompactionContext compaction) { 1593 finishCompactionRequest(compaction.getRequest()); 1594 } 1595 1596 private void finishCompactionRequest(CompactionRequestImpl cr) { 1597 this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize()); 1598 if (cr.isOffPeak()) { 1599 offPeakCompactionTracker.set(false); 1600 cr.setOffPeak(false); 1601 } 1602 synchronized (filesCompacting) { 1603 filesCompacting.removeAll(cr.getFiles()); 1604 } 1605 // The tracker could be null, for example, we do not need to track the creation of store file 1606 // writer due to different implementation of SFT, or the compaction is canceled. 1607 if (cr.getWriterCreationTracker() != null) { 1608 storeFileWriterCreationTrackers.remove(cr.getWriterCreationTracker()); 1609 } 1610 } 1611 1612 /** 1613 * Update counts. 1614 */ 1615 protected void refreshStoreSizeAndTotalBytes() throws IOException { 1616 this.storeSize.set(0L); 1617 this.totalUncompressedBytes.set(0L); 1618 for (HStoreFile hsf : this.storeEngine.getStoreFileManager().getStoreFiles()) { 1619 StoreFileReader r = hsf.getReader(); 1620 if (r == null) { 1621 LOG.debug("StoreFile {} has a null Reader", hsf); 1622 continue; 1623 } 1624 this.storeSize.addAndGet(r.length()); 1625 this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); 1626 } 1627 } 1628 1629 /* 1630 * @param wantedVersions How many versions were asked for. 1631 * @return wantedVersions or this families' {@link HConstants#VERSIONS}. 
  /*
   * @param wantedVersions How many versions were asked for.
   * @return wantedVersions or this family's {@link HConstants#VERSIONS}.
   */
  int versionsToReturn(final int wantedVersions) {
    if (wantedVersions <= 0) {
      throw new IllegalArgumentException("Number of versions must be > 0");
    }
    // Make sure we do not return more than maximum versions for this store.
    int maxVersions = getColumnFamilyDescriptor().getMaxVersions();
    return wantedVersions > maxVersions ? maxVersions : wantedVersions;
  }

  @Override
  public boolean canSplit() {
    // Not split-able if we find a reference store file present in the store.
    boolean result = !hasReferences();
    if (!result) {
      LOG.trace("Not splittable; has references: {}", this);
    }
    return result;
  }

  /**
   * Determines whether the store should be split and, if so, returns the split point.
   */
  public Optional<byte[]> getSplitPoint() {
    this.storeEngine.readLock();
    try {
      // Should already be enforced by the split policy!
      assert !this.getRegionInfo().isMetaRegion();
      // Not split-able if we find a reference store file present in the store.
      if (hasReferences()) {
        LOG.trace("Not splittable; has references: {}", this);
        return Optional.empty();
      }
      return this.storeEngine.getStoreFileManager().getSplitPoint();
    } catch (IOException e) {
      LOG.warn("Failed getting store size for {}", this, e);
    } finally {
      this.storeEngine.readUnlock();
    }
    return Optional.empty();
  }

  @Override
  public long getLastCompactSize() {
    return this.lastCompactSize;
  }

  @Override
  public long getSize() {
    return storeSize.get();
  }

  public void triggerMajorCompaction() {
    this.forceMajor = true;
  }

  //////////////////////////////////////////////////////////////////////////////
  // File administration
  //////////////////////////////////////////////////////////////////////////////

  /**
   * Return a scanner for both the memstore and the HStore files. Assumes we are not in a
   * compaction.
   * @param scan       Scan to apply when scanning the stores
   * @param targetCols columns to scan
   * @param readPt     the read point to use
   * @return a scanner over the current key values
   * @throws IOException on failure
   */
  public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols, long readPt)
    throws IOException {
    storeEngine.readLock();
    try {
      ScanInfo scanInfo;
      if (this.getCoprocessorHost() != null) {
        scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this, scan);
      } else {
        scanInfo = getScanInfo();
      }
      return createScanner(scan, scanInfo, targetCols, readPt);
    } finally {
      storeEngine.readUnlock();
    }
  }
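  /*
   * A minimal usage sketch (illustrative; `store` and `readPt` are assumed to come from the
   * caller, typically the enclosing HRegion): open a scanner over this store restricted to a
   * single qualifier.
   *
   *   NavigableSet<byte[]> cols = new TreeSet<>(Bytes.BYTES_COMPARATOR);
   *   cols.add(Bytes.toBytes("q1"));
   *   Scan scan = new Scan().withStartRow(Bytes.toBytes("row0"));
   *   KeyValueScanner scanner = store.getScanner(scan, cols, readPt);
   *
   * Reversed scans take the same path; createScanner() below picks a ReversedStoreScanner when
   * scan.isReversed() is true.
   */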
  // HMobStore will override this method to return its own implementation.
  protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo,
    NavigableSet<byte[]> targetCols, long readPt) throws IOException {
    return scan.isReversed()
      ? new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt)
      : new StoreScanner(this, scanInfo, scan, targetCols, readPt);
  }

  /**
   * Recreates the scanners on the current list of active store file scanners.
   * @param currentFileScanners    the current set of active store file scanners
   * @param cacheBlocks            cache the blocks or not
   * @param usePread               use pread or not
   * @param isCompaction           is the scanner for compaction
   * @param matcher                the scan query matcher
   * @param startRow               the scan's start row
   * @param includeStartRow        should the scan include the start row
   * @param stopRow                the scan's stop row
   * @param includeStopRow         should the scan include the stop row
   * @param readPt                 the read point of the current scan
   * @param includeMemstoreScanner whether the scanners should include a memstore scanner
   * @return list of scanners recreated on the current Scanners
   */
  public List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners,
    boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
    byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
    boolean includeMemstoreScanner) throws IOException {
    this.storeEngine.readLock();
    try {
      Map<String, HStoreFile> name2File =
        new HashMap<>(getStorefilesCount() + getCompactedFilesCount());
      for (HStoreFile file : getStorefiles()) {
        name2File.put(file.getFileInfo().getActiveFileName(), file);
      }
      Collection<HStoreFile> compactedFiles = getCompactedFiles();
      for (HStoreFile file : IterableUtils.emptyIfNull(compactedFiles)) {
        name2File.put(file.getFileInfo().getActiveFileName(), file);
      }
      List<HStoreFile> filesToReopen = new ArrayList<>();
      for (KeyValueScanner kvs : currentFileScanners) {
        assert kvs.isFileScanner();
        if (kvs.peek() == null) {
          continue;
        }
        filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
      }
      if (filesToReopen.isEmpty()) {
        return null;
      }
      return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow,
        includeStartRow, stopRow, includeStopRow, readPt, false, false);
    } finally {
      this.storeEngine.readUnlock();
    }
  }

  @Override
  public String toString() {
    return this.getRegionInfo().getShortNameToLog() + "/" + this.getColumnFamilyName();
  }

  @Override
  public int getStorefilesCount() {
    return this.storeEngine.getStoreFileManager().getStorefileCount();
  }

  @Override
  public int getCompactedFilesCount() {
    return this.storeEngine.getStoreFileManager().getCompactedFilesCount();
  }

  private LongStream getStoreFileAgeStream() {
    return this.storeEngine.getStoreFileManager().getStoreFiles().stream().filter(sf -> {
      if (sf.getReader() == null) {
        LOG.debug("StoreFile {} has a null Reader", sf);
        return false;
      } else {
        return true;
      }
    }).filter(HStoreFile::isHFile).mapToLong(sf -> sf.getFileInfo().getCreatedTimestamp())
      .map(t -> EnvironmentEdgeManager.currentTime() - t);
  }

  @Override
  public OptionalLong getMaxStoreFileAge() {
    return getStoreFileAgeStream().max();
  }

  @Override
  public OptionalLong getMinStoreFileAge() {
    return getStoreFileAgeStream().min();
  }

  @Override
  public OptionalDouble getAvgStoreFileAge() {
    return getStoreFileAgeStream().average();
  }
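  /*
   * The age metrics above all derive from the same stream: age = now - createdTimestamp (both in
   * milliseconds), with non-HFile entries and files without an open reader filtered out. As a
   * hedged sketch, a caller wanting the age in seconds might do:
   *
   *   OptionalLong maxAgeMs = store.getMaxStoreFileAge();
   *   long maxAgeSec = maxAgeMs.isPresent() ? maxAgeMs.getAsLong() / 1000 : 0;
   *
   * The OptionalLong/OptionalDouble results are empty when the store has no eligible HFiles.
   */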
  @Override
  public long getNumReferenceFiles() {
    return this.storeEngine.getStoreFileManager().getStoreFiles().stream()
      .filter(HStoreFile::isReference).count();
  }

  @Override
  public long getNumHFiles() {
    return this.storeEngine.getStoreFileManager().getStoreFiles().stream()
      .filter(HStoreFile::isHFile).count();
  }

  @Override
  public long getStoreSizeUncompressed() {
    return this.totalUncompressedBytes.get();
  }

  @Override
  public long getStorefilesSize() {
    // Include all StoreFiles
    return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStoreFiles(),
      sf -> true);
  }

  @Override
  public long getHFilesSize() {
    // Include only StoreFiles which are HFiles
    return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStoreFiles(),
      HStoreFile::isHFile);
  }

  private long getStorefilesFieldSize(ToLongFunction<StoreFileReader> f) {
    return this.storeEngine.getStoreFileManager().getStoreFiles().stream()
      .mapToLong(file -> StoreUtils.getStorefileFieldSize(file, f)).sum();
  }

  @Override
  public long getStorefilesRootLevelIndexSize() {
    return getStorefilesFieldSize(StoreFileReader::indexSize);
  }

  @Override
  public long getTotalStaticIndexSize() {
    return getStorefilesFieldSize(StoreFileReader::getUncompressedDataIndexSize);
  }

  @Override
  public long getTotalStaticBloomSize() {
    return getStorefilesFieldSize(StoreFileReader::getTotalBloomSize);
  }

  @Override
  public MemStoreSize getMemStoreSize() {
    return this.memstore.size();
  }

  @Override
  public int getCompactPriority() {
    int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();
    if (priority == PRIORITY_USER) {
      LOG.warn("Compaction priority is USER despite there being no user compaction");
    }
    return priority;
  }

  public boolean throttleCompaction(long compactionSize) {
    return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize);
  }

  public HRegion getHRegion() {
    return this.region;
  }

  public RegionCoprocessorHost getCoprocessorHost() {
    return this.region.getCoprocessorHost();
  }

  @Override
  public RegionInfo getRegionInfo() {
    return getRegionFileSystem().getRegionInfo();
  }

  @Override
  public boolean areWritesEnabled() {
    return this.region.areWritesEnabled();
  }

  @Override
  public long getSmallestReadPoint() {
    return this.region.getSmallestReadPoint();
  }
  /**
   * Adds or replaces the specified KeyValues.
   * <p>
   * For each KeyValue specified, if a cell with the same row, family, and qualifier exists in
   * MemStore, it will be replaced. Otherwise, it will just be inserted into MemStore.
   * <p>
   * This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic
   * across all of them.
   * @param readpoint readpoint below which we can safely remove duplicate KVs
   */
  public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing) {
    this.storeEngine.readLock();
    try {
      this.memstore.upsert(cells, readpoint, memstoreSizing);
    } finally {
      this.storeEngine.readUnlock();
    }
  }

  public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTracker tracker) {
    return new StoreFlusherImpl(cacheFlushId, tracker);
  }
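  /*
   * A hedged sketch of the flush lifecycle implemented by the context below (the caller is
   * normally HRegion, which also holds the appropriate locks; `store`, `seqId`, `tracker` and
   * `status` are assumed to be in scope):
   *
   *   StoreFlushContext ctx = store.createFlushContext(seqId, tracker);
   *   ctx.prepare();            // snapshot the memstore
   *   ctx.flushCache(status);   // write the snapshot to temp files
   *   ctx.commit(status);       // move files into the store and drop the snapshot
   *
   * On failure after prepare(), abort() drops the snapshot instead of committing.
   */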
  private final class StoreFlusherImpl implements StoreFlushContext {

    private final FlushLifeCycleTracker tracker;
    private final StoreFileWriterCreationTracker writerCreationTracker;
    private final long cacheFlushSeqNum;
    private MemStoreSnapshot snapshot;
    private List<Path> tempFiles;
    private List<Path> committedFiles;
    private long cacheFlushCount;
    private long cacheFlushSize;
    private long outputFileSize;

    private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) {
      this.cacheFlushSeqNum = cacheFlushSeqNum;
      this.tracker = tracker;
      this.writerCreationTracker = storeFileWriterCreationTrackerFactory.get();
    }

    /**
     * This is not thread safe. The caller should have a lock on the region or the store. If
     * necessary, the lock can be added with the patch provided in HBASE-10087.
     */
    @Override
    public MemStoreSize prepare() {
      // passing the current sequence number of the wal - to allow bookkeeping in the memstore
      this.snapshot = memstore.snapshot();
      this.cacheFlushCount = snapshot.getCellsCount();
      this.cacheFlushSize = snapshot.getDataSize();
      committedFiles = new ArrayList<>(1);
      return snapshot.getMemStoreSize();
    }

    @Override
    public void flushCache(MonitoredTask status) throws IOException {
      RegionServerServices rsService = region.getRegionServerServices();
      ThroughputController throughputController =
        rsService == null ? null : rsService.getFlushThroughputController();
      // it could be null if we do not need to track the creation of store file writer due to
      // different SFT implementation.
      if (writerCreationTracker != null) {
        HStore.this.storeFileWriterCreationTrackers.add(writerCreationTracker);
      }
      tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController,
        tracker, writerCreationTracker);
    }

    @Override
    public boolean commit(MonitoredTask status) throws IOException {
      try {
        if (CollectionUtils.isEmpty(this.tempFiles)) {
          return false;
        }
        status.setStatus("Flushing " + this + ": reopening flushed file");
        List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, false);
        for (HStoreFile sf : storeFiles) {
          StoreFileReader r = sf.getReader();
          if (LOG.isInfoEnabled()) {
            LOG.info("Added {}, entries={}, sequenceid={}, filesize={}", sf, r.getEntries(),
              cacheFlushSeqNum, TraditionalBinaryPrefix.long2String(r.length(), "", 1));
          }
          outputFileSize += r.length();
          storeSize.addAndGet(r.length());
          totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
          committedFiles.add(sf.getPath());
        }

        flushedCellsCount.addAndGet(cacheFlushCount);
        flushedCellsSize.addAndGet(cacheFlushSize);
        flushedOutputFileSize.addAndGet(outputFileSize);
        // call coprocessor after we have done all the accounting above
        for (HStoreFile sf : storeFiles) {
          if (getCoprocessorHost() != null) {
            getCoprocessorHost().postFlush(HStore.this, sf, tracker);
          }
        }
        // Add new file to store files. Clear snapshot too while we have the Store write lock.
        return completeFlush(storeFiles, snapshot.getId());
      } finally {
        if (writerCreationTracker != null) {
          HStore.this.storeFileWriterCreationTrackers.remove(writerCreationTracker);
        }
      }
    }

    @Override
    public long getOutputFileSize() {
      return outputFileSize;
    }

    @Override
    public List<Path> getCommittedFiles() {
      return committedFiles;
    }
    /**
     * Similar to commit, but called in secondary region replicas for replaying the flush cache
     * from the primary region. Adds the new files to the store, and drops the snapshot depending
     * on the dropMemstoreSnapshot argument.
     * @param fileNames            names of the flushed files
     * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot
     */
    @Override
    public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
      throws IOException {
      List<HStoreFile> storeFiles = new ArrayList<>(fileNames.size());
      for (String file : fileNames) {
        // open the file as a store file (hfile link, etc)
        StoreFileInfo storeFileInfo =
          getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), file);
        HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo);
        storeFiles.add(storeFile);
        HStore.this.storeSize.addAndGet(storeFile.getReader().length());
        HStore.this.totalUncompressedBytes
          .addAndGet(storeFile.getReader().getTotalUncompressedBytes());
        if (LOG.isInfoEnabled()) {
          LOG.info(this + " added " + storeFile + ", entries=" + storeFile.getReader().getEntries()
            + ", sequenceid=" + storeFile.getReader().getSequenceID() + ", filesize="
            + TraditionalBinaryPrefix.long2String(storeFile.getReader().length(), "", 1));
        }
      }

      long snapshotId = -1; // -1 means do not drop
      if (dropMemstoreSnapshot && snapshot != null) {
        snapshotId = snapshot.getId();
      }
      HStore.this.completeFlush(storeFiles, snapshotId);
    }

    /**
     * Abort the snapshot preparation. Drops the snapshot if any.
     */
    @Override
    public void abort() throws IOException {
      if (snapshot != null) {
        HStore.this.completeFlush(Collections.emptyList(), snapshot.getId());
      }
    }
  }

  @Override
  public boolean needsCompaction() {
    List<HStoreFile> filesCompactingClone = null;
    synchronized (filesCompacting) {
      filesCompactingClone = Lists.newArrayList(filesCompacting);
    }
    return this.storeEngine.needsCompaction(filesCompactingClone);
  }
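  /*
   * Design note with a hedged sketch: needsCompaction() clones filesCompacting under its monitor
   * so the store engine can evaluate its policy without holding the lock. A periodic chore might
   * use it like this (`store` assumed in scope):
   *
   *   if (store.needsCompaction()) {
   *     Optional<CompactionContext> ctx = store.requestCompaction();
   *     // hand ctx off to the region server's compaction machinery, if a selection was made
   *   }
   */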
  /**
   * Used for tests.
   * @return cache configuration for this Store.
   */
  public CacheConfig getCacheConfig() {
    return storeContext.getCacheConf();
  }

  public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HStore.class, false);

  public static final long DEEP_OVERHEAD = ClassSize.align(
    FIXED_OVERHEAD + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK + ClassSize.CONCURRENT_SKIPLISTMAP
      + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT + ScanInfo.FIXED_OVERHEAD);

  @Override
  public long heapSize() {
    MemStoreSize memstoreSize = this.memstore.size();
    return DEEP_OVERHEAD + memstoreSize.getHeapSize() + storeContext.heapSize();
  }

  @Override
  public CellComparator getComparator() {
    return storeContext.getComparator();
  }

  public ScanInfo getScanInfo() {
    return scanInfo;
  }

  /**
   * Set scan info, used by test.
   * @param scanInfo new scan info to use for test
   */
  void setScanInfo(ScanInfo scanInfo) {
    this.scanInfo = scanInfo;
  }

  @Override
  public boolean hasTooManyStoreFiles() {
    return getStorefilesCount() > this.blockingFileCount;
  }

  @Override
  public long getFlushedCellsCount() {
    return flushedCellsCount.get();
  }

  @Override
  public long getFlushedCellsSize() {
    return flushedCellsSize.get();
  }

  @Override
  public long getFlushedOutputFileSize() {
    return flushedOutputFileSize.get();
  }

  @Override
  public long getCompactedCellsCount() {
    return compactedCellsCount.get();
  }

  @Override
  public long getCompactedCellsSize() {
    return compactedCellsSize.get();
  }

  @Override
  public long getMajorCompactedCellsCount() {
    return majorCompactedCellsCount.get();
  }

  @Override
  public long getMajorCompactedCellsSize() {
    return majorCompactedCellsSize.get();
  }

  public void updateCompactedMetrics(boolean isMajor, CompactionProgress progress) {
    if (isMajor) {
      majorCompactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
      majorCompactedCellsSize.addAndGet(progress.totalCompactedSize);
    } else {
      compactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
      compactedCellsSize.addAndGet(progress.totalCompactedSize);
    }
  }
  /**
   * Returns the StoreEngine that is backing this concrete implementation of Store.
   * @return the {@link StoreEngine} object used internally inside this HStore object.
   */
  public StoreEngine<?, ?, ?, ?> getStoreEngine() {
    return this.storeEngine;
  }

  protected OffPeakHours getOffPeakHours() {
    return this.offPeakHours;
  }

  @Override
  public void onConfigurationChange(Configuration conf) {
    Configuration storeConf = StoreUtils.createStoreConfiguration(conf,
      region.getTableDescriptor(), getColumnFamilyDescriptor());
    this.conf = storeConf;
    this.storeEngine.compactionPolicy.setConf(storeConf);
    this.offPeakHours = OffPeakHours.getInstance(storeConf);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void registerChildren(ConfigurationManager manager) {
    CacheConfig cacheConfig = this.storeContext.getCacheConf();
    if (cacheConfig != null) {
      manager.registerObserver(cacheConfig);
    }
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void deregisterChildren(ConfigurationManager manager) {
    // No children to deregister
  }

  @Override
  public double getCompactionPressure() {
    return storeEngine.getStoreFileManager().getCompactionPressure();
  }

  @Override
  public boolean isPrimaryReplicaStore() {
    return getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID;
  }

  /**
   * Sets the store up for a region level snapshot operation.
   * @see #postSnapshotOperation()
   */
  public void preSnapshotOperation() {
    archiveLock.lock();
  }

  /**
   * Perform tasks needed after the completion of snapshot operation.
   * @see #preSnapshotOperation()
   */
  public void postSnapshotOperation() {
    archiveLock.unlock();
  }

  /**
   * Closes and archives the compacted files under this store.
   */
  public synchronized void closeAndArchiveCompactedFiles() throws IOException {
    // ensure other threads do not attempt to archive the same files on close()
    archiveLock.lock();
    try {
      storeEngine.readLock();
      Collection<HStoreFile> copyCompactedfiles = null;
      try {
        Collection<HStoreFile> compactedfiles =
          this.getStoreEngine().getStoreFileManager().getCompactedfiles();
        if (CollectionUtils.isNotEmpty(compactedfiles)) {
          // Do a copy under read lock
          copyCompactedfiles = new ArrayList<>(compactedfiles);
        } else {
          LOG.trace("No compacted files to archive");
        }
      } finally {
        storeEngine.readUnlock();
      }
      if (CollectionUtils.isNotEmpty(copyCompactedfiles)) {
        removeCompactedfiles(copyCompactedfiles, true);
      }
    } finally {
      archiveLock.unlock();
    }
  }
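  /*
   * preSnapshotOperation()/postSnapshotOperation() form a lock pair around archiveLock, so a
   * snapshot cannot race with the archival above. A hedged usage sketch (the caller is assumed
   * to be region-level snapshot code with `store` in scope):
   *
   *   store.preSnapshotOperation();
   *   try {
   *     // take the region-level snapshot of this store's files
   *   } finally {
   *     store.postSnapshotOperation();
   *   }
   */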
  /**
   * Archives and removes the compacted files.
   * @param compactedfiles The compacted files in this store that are not active in reads
   * @param evictOnClose   true if blocks should be evicted from the cache when an HFile reader is
   *                       closed, false if not
   */
  private void removeCompactedfiles(Collection<HStoreFile> compactedfiles, boolean evictOnClose)
    throws IOException {
    final List<HStoreFile> filesToRemove = new ArrayList<>(compactedfiles.size());
    final List<Long> storeFileSizes = new ArrayList<>(compactedfiles.size());
    for (final HStoreFile file : compactedfiles) {
      synchronized (file) {
        try {
          StoreFileReader r = file.getReader();
          if (r == null) {
            LOG.debug("The file {} was closed but still not archived", file);
            // HACK: Temporarily re-open the reader so we can get the size of the file. Ideally,
            // we should know the size of an HStoreFile without having to ask the HStoreFileReader
            // for that.
            long length = getStoreFileSize(file);
            filesToRemove.add(file);
            storeFileSizes.add(length);
            continue;
          }

          if (file.isCompactedAway() && !file.isReferencedInReads()) {
            // Even if deleting fails we need not bother as any new scanners won't be
            // able to use the compacted file as the status is already compactedAway
            LOG.trace("Closing and archiving the file {}", file);
            // Copy the file size before closing the reader
            final long length = r.length();
            r.close(evictOnClose);
            // Just close and return
            filesToRemove.add(file);
            // Only add the length if we successfully added the file to `filesToRemove`
            storeFileSizes.add(length);
          } else {
            LOG.info("Can't archive compacted file " + file.getPath()
              + " because of either isCompactedAway=" + file.isCompactedAway()
              + " or file has reference, isReferencedInReads=" + file.isReferencedInReads()
              + ", refCount=" + r.getRefCount() + ", skipping for now.");
          }
        } catch (Exception e) {
          LOG.error("Exception while trying to close the compacted store file {}", file.getPath(),
            e);
        }
      }
    }
    if (this.isPrimaryReplicaStore()) {
      // Only the primary region is allowed to move the file to archive.
      // The secondary region does not move the files to archive. Any active reads from
      // the secondary region will still work because the file as such has active readers on it.
      if (!filesToRemove.isEmpty()) {
        LOG.debug("Moving the files {} to archive", filesToRemove);
        // Only if this is successful it has to be removed
        try {
          getRegionFileSystem()
            .removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(), filesToRemove);
        } catch (FailedArchiveException fae) {
          // Even if archiving some files failed, we still need to clear out any of the
          // files which were successfully archived. Otherwise we will receive a
          // FileNotFoundException when we attempt to re-archive them in the next go around.
          Collection<Path> failedFiles = fae.getFailedFiles();
          Iterator<HStoreFile> iter = filesToRemove.iterator();
          Iterator<Long> sizeIter = storeFileSizes.iterator();
          while (iter.hasNext()) {
            sizeIter.next();
            if (failedFiles.contains(iter.next().getPath())) {
              iter.remove();
              sizeIter.remove();
            }
          }
          if (!filesToRemove.isEmpty()) {
            clearCompactedfiles(filesToRemove);
          }
          throw fae;
        }
      }
    }
    if (!filesToRemove.isEmpty()) {
      // Clear the compactedfiles from the store file manager
      clearCompactedfiles(filesToRemove);
      // Try to send report of this archival to the Master for updating quota usage faster
      reportArchivedFilesForQuota(filesToRemove, storeFileSizes);
    }
  }
  /**
   * Computes the length of a store file without succumbing to any errors along the way. If an
   * error is encountered, the implementation returns {@code 0} instead of the actual size.
   * @param file The file to compute the size of.
   * @return The size in bytes of the provided {@code file}.
   */
  long getStoreFileSize(HStoreFile file) {
    long length = 0;
    try {
      file.initReader();
      length = file.getReader().length();
    } catch (IOException e) {
      LOG.trace("Failed to open reader when trying to compute store file size for {}, ignoring",
        file, e);
    } finally {
      try {
        file.closeStoreFile(
          file.getCacheConf() != null ? file.getCacheConf().shouldEvictOnClose() : true);
      } catch (IOException e) {
        LOG.trace("Failed to close reader after computing store file size for {}, ignoring", file,
          e);
      }
    }
    return length;
  }

  public Long preFlushSeqIDEstimation() {
    return memstore.preFlushSeqIDEstimation();
  }

  @Override
  public boolean isSloppyMemStore() {
    return this.memstore.isSloppy();
  }

  private void clearCompactedfiles(List<HStoreFile> filesToRemove) throws IOException {
    LOG.trace("Clearing the compacted files {} from this store", filesToRemove);
    storeEngine.removeCompactedFiles(filesToRemove);
  }

  void reportArchivedFilesForQuota(List<? extends StoreFile> archivedFiles, List<Long> fileSizes) {
    // Sanity check from the caller
    if (archivedFiles.size() != fileSizes.size()) {
      throw new RuntimeException("Coding error: should never see lists of varying size");
    }
    RegionServerServices rss = this.region.getRegionServerServices();
    if (rss == null) {
      return;
    }
    List<Entry<String, Long>> filesWithSizes = new ArrayList<>(archivedFiles.size());
    Iterator<Long> fileSizeIter = fileSizes.iterator();
    for (StoreFile storeFile : archivedFiles) {
      final long fileSize = fileSizeIter.next();
      if (storeFile.isHFile() && fileSize != 0) {
        filesWithSizes.add(Maps.immutableEntry(storeFile.getPath().getName(), fileSize));
      }
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("Files archived: " + archivedFiles + ", reporting the following to the Master: "
        + filesWithSizes);
    }
    boolean success = rss.reportFileArchivalForQuotas(getTableName(), filesWithSizes);
    if (!success) {
      LOG.warn("Failed to report archival of files: " + filesWithSizes);
    }
  }
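  /*
   * reportArchivedFilesForQuota() relies on archivedFiles and fileSizes being parallel lists:
   * index i of one describes index i of the other. A hedged sketch of a well-formed call, with
   * invented file handles and sizes:
   *
   *   List<HStoreFile> archived = Arrays.asList(fileA, fileB);
   *   List<Long> sizes = Arrays.asList(1024L, 2048L);
   *   store.reportArchivedFilesForQuota(archived, sizes);
   *
   * Entries whose size is 0 (a failed size computation, see getStoreFileSize above) are skipped
   * when reporting to the Master.
   */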
  @Override
  public int getCurrentParallelPutCount() {
    return currentParallelPutCount.get();
  }

  public int getStoreRefCount() {
    return this.storeEngine.getStoreFileManager().getStoreFiles().stream()
      .filter(sf -> sf.getReader() != null).filter(HStoreFile::isHFile)
      .mapToInt(HStoreFile::getRefCount).sum();
  }

  /** Returns the maximum ref count of a storeFile among all compacted HStore files for this HStore */
  public int getMaxCompactedStoreFileRefCount() {
    OptionalInt maxCompactedStoreFileRefCount = this.storeEngine.getStoreFileManager()
      .getCompactedfiles().stream().filter(sf -> sf.getReader() != null)
      .filter(HStoreFile::isHFile).mapToInt(HStoreFile::getRefCount).max();
    return maxCompactedStoreFileRefCount.isPresent()
      ? maxCompactedStoreFileRefCount.getAsInt()
      : 0;
  }

  @Override
  public long getMemstoreOnlyRowReadsCount() {
    return memstoreOnlyRowReadsCount.sum();
  }

  @Override
  public long getMixedRowReadsCount() {
    return mixedRowReadsCount.sum();
  }

  @Override
  public Configuration getReadOnlyConfiguration() {
    return new ReadOnlyConfiguration(this.conf);
  }

  void updateMetricsStore(boolean memstoreRead) {
    if (memstoreRead) {
      memstoreOnlyRowReadsCount.increment();
    } else {
      mixedRowReadsCount.increment();
    }
  }

  /**
   * Return the storefiles which are currently being written to. Mainly used by
   * {@link BrokenStoreFileCleaner} to prevent deleting these files, as they are not yet present
   * in the SFT.
   */
  public Set<Path> getStoreFilesBeingWritten() {
    return storeFileWriterCreationTrackers.stream().flatMap(t -> t.get().stream())
      .collect(Collectors.toSet());
  }

  @Override
  public long getBloomFilterRequestsCount() {
    return storeEngine.getBloomFilterMetrics().getRequestsCount();
  }

  @Override
  public long getBloomFilterNegativeResultsCount() {
    return storeEngine.getBloomFilterMetrics().getNegativeResultsCount();
  }

  @Override
  public long getBloomFilterEligibleRequestsCount() {
    return storeEngine.getBloomFilterMetrics().getEligibleRequestsCount();
  }
}