/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.InnerStoreCellComparator;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.MetaCellComparator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.FailedArchiveException;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.coprocessor.ReadOnlyConfiguration;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;

/**
 * A Store holds a column family in a Region. It's a memstore and a set of zero or more StoreFiles,
 * which stretch backwards over time.
 * <p>
 * There's no reason to consider append-logging at this level; all logging and locking is handled
 * at the HRegion level. Store just provides services to manage sets of StoreFiles. One of the most
 * important of those services is compaction services where files are aggregated once they pass a
 * configurable threshold.
 * <p>
 * Locking and transactions are handled at a higher level. This API should not be called directly
 * but by an HRegion manager.
 */
@InterfaceAudience.Private
public class HStore
  implements Store, HeapSize, StoreConfigInformation, PropagatingConfigurationObserver {
  public static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
  public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
    "hbase.server.compactchecker.interval.multiplier";
  public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
  public static final String BLOCK_STORAGE_POLICY_KEY = "hbase.hstore.block.storage.policy";
  // "NONE" is not a valid storage policy and means we defer the policy to HDFS
  public static final String DEFAULT_BLOCK_STORAGE_POLICY = "NONE";
  public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
  public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 16;

  // HBASE-24428 : Update compaction priority for recently split daughter regions
  // so as to prioritize their compaction.
  // Any compaction candidate with higher priority than compaction of newly split daughter regions
  // should have priority value < (Integer.MIN_VALUE + 1000)
  private static final int SPLIT_REGION_COMPACTION_PRIORITY = Integer.MIN_VALUE + 1000;

  private static final Logger LOG = LoggerFactory.getLogger(HStore.class);

  protected final MemStore memstore;
  // This stores directory in the filesystem.
  private final HRegion region;
  protected Configuration conf;
  private long lastCompactSize = 0;
  volatile boolean forceMajor = false;
  private AtomicLong storeSize = new AtomicLong();
  private AtomicLong totalUncompressedBytes = new AtomicLong();
  private LongAdder memstoreOnlyRowReadsCount = new LongAdder();
  // rows that have cells from both memstore and files (or only files)
  private LongAdder mixedRowReadsCount = new LongAdder();

  /**
   * Lock specific to archiving compacted store files. This avoids races around the combination of
   * retrieving the list of compacted files and moving them to the archive directory. Since this is
   * usually a background process (other than on close), we don't want to handle this with the
   * store write lock, which would block readers and degrade performance. Locked by:
   *   - CompactedHFilesDispatchHandler via closeAndArchiveCompactedFiles()
   *   - close()
   */
  final ReentrantLock archiveLock = new ReentrantLock();

  private final boolean verifyBulkLoads;

  /**
   * Use this counter to track concurrent puts. If TRACE logging is enabled and we are over the
   * threshold set by hbase.region.store.parallel.put.print.threshold (default is 50), we will log
   * a message that identifies the Store experiencing this high level of concurrency.
   */
  private final AtomicInteger currentParallelPutCount = new AtomicInteger(0);
  private final int parallelPutCountPrintThreshold;

  private ScanInfo scanInfo;

  // All access must be synchronized.
  // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it.
  private final List<HStoreFile> filesCompacting = Lists.newArrayList();

  // All access must be synchronized.
  private final Set<ChangedReadersObserver> changedReaderObservers =
    Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());

  private HFileDataBlockEncoder dataBlockEncoder;

  final StoreEngine<?, ?, ?, ?> storeEngine;

  private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
  private volatile OffPeakHours offPeakHours;

  private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
  private int flushRetriesNumber;
  private int pauseTime;

  private long blockingFileCount;
  private int compactionCheckMultiplier;

  private AtomicLong flushedCellsCount = new AtomicLong();
  private AtomicLong compactedCellsCount = new AtomicLong();
  private AtomicLong majorCompactedCellsCount = new AtomicLong();
  private AtomicLong flushedCellsSize = new AtomicLong();
  private AtomicLong flushedOutputFileSize = new AtomicLong();
  private AtomicLong compactedCellsSize = new AtomicLong();
  private AtomicLong majorCompactedCellsSize = new AtomicLong();

  private final StoreContext storeContext;

  // Used to track the store files which are currently being written. For compaction, if we want
  // to compact store files [a, b, c] to [d], then here we will record 'd'. And we will also use
  // it to track the store files being written when flushing.
  // Notice that the creation is in the background compaction or flush thread and we will get the
  // files in other threads, so it needs to be thread safe.
  private static final class StoreFileWriterCreationTracker implements Consumer<Path> {

    private final Set<Path> files = Collections.newSetFromMap(new ConcurrentHashMap<>());

    @Override
    public void accept(Path t) {
      files.add(t);
    }

    public Set<Path> get() {
      return Collections.unmodifiableSet(files);
    }
  }

  // We may have multiple compactions running at the same time, and flush can also happen at the
  // same time, so here we need to use a collection, and the collection needs to be thread safe.
  // The implementation of StoreFileWriterCreationTracker is very simple and we are not likely to
  // implement hashCode or equals for it, so here we just use ConcurrentHashMap. Change to
  // IdentityHashMap later if we want to implement hashCode or equals.
  private final Set<StoreFileWriterCreationTracker> storeFileWriterCreationTrackers =
    Collections.newSetFromMap(new ConcurrentHashMap<>());

  // For SFT implementations which write a tmp store file first, we do not need to clean up the
  // broken store files under the data directory, which means we do not need to track the store
  // file writer creation. So here we abstract a factory to return different trackers for
  // different SFT implementations.
  private final Supplier<StoreFileWriterCreationTracker> storeFileWriterCreationTrackerFactory;

  private final boolean warmup;

  /**
   * Constructor
   * @param family    ColumnFamilyDescriptor for this column family
   * @param confParam configuration object for this store; can be null
   */
  protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
    final Configuration confParam, boolean warmup) throws IOException {
    this.conf = StoreUtils.createStoreConfiguration(confParam, region.getTableDescriptor(), family);

    this.region = region;
    this.storeContext = initializeStoreContext(family);

    // Assemble the store's home directory and ensure it exists.
    region.getRegionFileSystem().createStoreDir(family.getNameAsString());

    // set block storage policy for store directory
    String policyName = family.getStoragePolicy();
    if (null == policyName) {
      policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY, DEFAULT_BLOCK_STORAGE_POLICY);
    }
    region.getRegionFileSystem().setStoragePolicy(family.getNameAsString(), policyName.trim());

    this.dataBlockEncoder = new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());

    // used by ScanQueryMatcher
    long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
    LOG.trace("Time to purge deletes set to {}ms in {}", timeToPurgeDeletes, this);
    // Get TTL
    long ttl = determineTTLFromFamily(family);
    // Why not just pass a HColumnDescriptor in here altogether? Even if we have to clone it?
    scanInfo =
      new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.storeContext.getComparator());
    this.memstore = getMemstore();

    this.offPeakHours = OffPeakHours.getInstance(conf);

    this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);

    this.blockingFileCount = conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
    this.compactionCheckMultiplier = conf.getInt(COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY,
      DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
    if (this.compactionCheckMultiplier <= 0) {
      LOG.error("Compaction check period multiplier must be positive, setting default: {}",
        DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
      this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
    }

    this.warmup = warmup;
    this.storeEngine = createStoreEngine(this, this.conf, region.getCellComparator());
    storeEngine.initialize(warmup);
    // If we are required to write to the tmp dir first, we just return null, which indicates that
    // we do not need to track the creation of store file writers; otherwise we return a new
    // StoreFileWriterCreationTracker.
    this.storeFileWriterCreationTrackerFactory = storeEngine.requireWritingToTmpDirFirst()
      ? () -> null
      : () -> new StoreFileWriterCreationTracker();
    refreshStoreSizeAndTotalBytes();

    flushRetriesNumber =
      conf.getInt("hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
    pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
    if (flushRetriesNumber <= 0) {
      throw new IllegalArgumentException(
        "hbase.hstore.flush.retries.number must be > 0, not " + flushRetriesNumber);
    }

    int confPrintThreshold =
      this.conf.getInt("hbase.region.store.parallel.put.print.threshold", 50);
    if (confPrintThreshold < 10) {
      confPrintThreshold = 10;
    }
    this.parallelPutCountPrintThreshold = confPrintThreshold;

    LOG.info(
      "Store={}, memstore type={}, storagePolicy={}, verifyBulkLoads={}, "
        + "parallelPutCountPrintThreshold={}, encoding={}, compression={}",
      this, memstore.getClass().getSimpleName(), policyName, verifyBulkLoads,
      parallelPutCountPrintThreshold, family.getDataBlockEncoding(), family.getCompressionType());
  }

  private StoreContext initializeStoreContext(ColumnFamilyDescriptor family) throws IOException {
    return new StoreContext.Builder().withBlockSize(family.getBlocksize())
      .withEncryptionContext(EncryptionUtil.createEncryptionContext(conf, family))
      .withBloomType(family.getBloomFilterType()).withCacheConfig(createCacheConf(family))
      .withCellComparator(region.getTableDescriptor().isMetaTable() || conf
        .getBoolean(HRegion.USE_META_CELL_COMPARATOR, HRegion.DEFAULT_USE_META_CELL_COMPARATOR)
          ? MetaCellComparator.META_COMPARATOR
          : InnerStoreCellComparator.INNER_STORE_COMPARATOR)
      .withColumnFamilyDescriptor(family).withCompactedFilesSupplier(this::getCompactedFiles)
      .withRegionFileSystem(region.getRegionFileSystem())
      .withFavoredNodesSupplier(this::getFavoredNodes)
      .withFamilyStoreDirectoryPath(
        region.getRegionFileSystem().getStoreDir(family.getNameAsString()))
      .withRegionCoprocessorHost(region.getCoprocessorHost()).build();
  }

  private InetSocketAddress[] getFavoredNodes() {
    InetSocketAddress[] favoredNodes = null;
    if (region.getRegionServerServices() != null) {
      favoredNodes = region.getRegionServerServices()
        .getFavoredNodesForRegion(region.getRegionInfo().getEncodedName());
    }
    return favoredNodes;
  }

  /** Returns MemStore Instance to use in this store. */
  private MemStore getMemstore() {
    MemStore ms = null;
    // Check if in-memory-compaction configured. Note MemoryCompactionPolicy is an enum!
    MemoryCompactionPolicy inMemoryCompaction = null;
    if (this.getTableName().isSystemTable()) {
      inMemoryCompaction = MemoryCompactionPolicy
        .valueOf(conf.get("hbase.systemtables.compacting.memstore.type", "NONE").toUpperCase());
    } else {
      inMemoryCompaction = getColumnFamilyDescriptor().getInMemoryCompaction();
    }
    if (inMemoryCompaction == null) {
      inMemoryCompaction =
        MemoryCompactionPolicy.valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
          CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT).toUpperCase());
    }

    switch (inMemoryCompaction) {
      case NONE:
        Class<? extends MemStore> memStoreClass =
          conf.getClass(MEMSTORE_CLASS_NAME, DefaultMemStore.class, MemStore.class);
        ms = ReflectionUtils.newInstance(memStoreClass,
          new Object[] { conf, getComparator(), this.getHRegion().getRegionServicesForStores() });
        break;
      default:
        Class<? extends CompactingMemStore> compactingMemStoreClass =
          conf.getClass(MEMSTORE_CLASS_NAME, CompactingMemStore.class, CompactingMemStore.class);
        ms =
          ReflectionUtils.newInstance(compactingMemStoreClass, new Object[] { conf, getComparator(),
            this, this.getHRegion().getRegionServicesForStores(), inMemoryCompaction });
    }
    return ms;
  }
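
  // Illustrative configuration notes (examples only, not recommendations): setting
  // hbase.systemtables.compacting.memstore.type=BASIC would give system tables a
  // CompactingMemStore using the BASIC policy, while hbase.regionserver.memstore.class
  // (MEMSTORE_CLASS_NAME) may point at a custom MemStore (or CompactingMemStore) subclass,
  // which getMemstore() above instantiates reflectively.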

  /**
   * Creates the cache config.
   * @param family The current column family.
   */
  protected CacheConfig createCacheConf(final ColumnFamilyDescriptor family) {
    CacheConfig cacheConf = new CacheConfig(conf, family, region.getBlockCache(),
      region.getRegionServicesForStores().getByteBuffAllocator());
    LOG.info("Created cacheConfig: {}, for column family {} of region {} ", cacheConf,
      family.getNameAsString(), region.getRegionInfo().getEncodedName());
    return cacheConf;
  }

  /**
   * Creates the store engine configured for the given Store.
   * @param store        The store. An unfortunate dependency needed due to it being passed to
   *                     coprocessors via the compactor.
   * @param conf         Store configuration.
   * @param kvComparator KVComparator for storeFileManager.
   * @return StoreEngine to use.
   */
  protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
    CellComparator kvComparator) throws IOException {
    return StoreEngine.create(store, conf, kvComparator);
  }

  /**
   * Returns the TTL, in milliseconds, for data in the specified family (the family TTL itself is
   * configured in seconds).
   */
  public static long determineTTLFromFamily(final ColumnFamilyDescriptor family) {
    // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.
    long ttl = family.getTimeToLive();
    if (ttl == HConstants.FOREVER) {
      // Default is unlimited ttl.
      ttl = Long.MAX_VALUE;
    } else if (ttl == -1) {
      ttl = Long.MAX_VALUE;
    } else {
      // Second -> ms adjust for user data
      ttl *= 1000;
    }
    return ttl;
  }

  public StoreContext getStoreContext() {
    return storeContext;
  }

  @Override
  public String getColumnFamilyName() {
    return this.storeContext.getFamily().getNameAsString();
  }

  @Override
  public TableName getTableName() {
    return this.getRegionInfo().getTable();
  }

  @Override
  public FileSystem getFileSystem() {
    return storeContext.getRegionFileSystem().getFileSystem();
  }

  public HRegionFileSystem getRegionFileSystem() {
    return storeContext.getRegionFileSystem();
  }

  /* Implementation of StoreConfigInformation */
  @Override
  public long getStoreFileTtl() {
    // TTL only applies if there's no MIN_VERSIONs setting on the column.
    return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
  }
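
  // Example: a family TTL of 86400 (one day, specified in seconds) becomes 86400000 ms via
  // determineTTLFromFamily(), and getStoreFileTtl() above reports that value only while
  // MIN_VERSIONS is 0; with MIN_VERSIONS > 0 it reports Long.MAX_VALUE, since expired cells
  // may still have to be retained.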

  @Override
  public long getMemStoreFlushSize() {
    // TODO: Why is this in here? The flushsize of the region rather than the store? St.Ack
    return this.region.memstoreFlushSize;
  }

  @Override
  public MemStoreSize getFlushableSize() {
    return this.memstore.getFlushableSize();
  }

  @Override
  public MemStoreSize getSnapshotSize() {
    return this.memstore.getSnapshotSize();
  }

  @Override
  public long getCompactionCheckMultiplier() {
    return this.compactionCheckMultiplier;
  }

  @Override
  public long getBlockingFileCount() {
    return blockingFileCount;
  }
  /* End implementation of StoreConfigInformation */

  @Override
  public ColumnFamilyDescriptor getColumnFamilyDescriptor() {
    return this.storeContext.getFamily();
  }

  @Override
  public OptionalLong getMaxSequenceId() {
    return StoreUtils.getMaxSequenceIdInList(this.getStorefiles());
  }

  @Override
  public OptionalLong getMaxMemStoreTS() {
    return StoreUtils.getMaxMemStoreTSInList(this.getStorefiles());
  }

  /** Returns the data block encoder */
  public HFileDataBlockEncoder getDataBlockEncoder() {
    return dataBlockEncoder;
  }

  /**
   * Should be used only in tests.
   * @param blockEncoder the block delta encoder to use
   */
  void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
    this.dataBlockEncoder = blockEncoder;
  }

  private void postRefreshStoreFiles() throws IOException {
    // Advance the memstore read point to be at least the new store files' seqIds so that
    // readers can pick them up. This assumes that the store is not getting any writes (otherwise
    // in-flight transactions might be made visible).
    getMaxSequenceId().ifPresent(region.getMVCC()::advanceTo);
    refreshStoreSizeAndTotalBytes();
  }

  @Override
  public void refreshStoreFiles() throws IOException {
    storeEngine.refreshStoreFiles();
    postRefreshStoreFiles();
  }

  /**
   * Replaces the store files that the store has with the given files. Mainly used by secondary
   * region replicas to keep up to date with the primary region files.
   */
  public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
    storeEngine.refreshStoreFiles(newFiles);
    postRefreshStoreFiles();
  }

  /**
   * This message intends to inform the MemStore that the upcoming updates are going to be part of
   * replaying edits from the WAL.
   */
  public void startReplayingFromWAL() {
    this.memstore.startReplayingFromWAL();
  }

  /**
   * This message intends to inform the MemStore that replaying edits from the WAL is done.
   */
  public void stopReplayingFromWAL() {
    this.memstore.stopReplayingFromWAL();
  }

  /**
   * Adds a value to the memstore
   */
  public void add(final ExtendedCell cell, MemStoreSizing memstoreSizing) {
    storeEngine.readLock();
    try {
      if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
        LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
          this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName());
      }
      this.memstore.add(cell, memstoreSizing);
    } finally {
      storeEngine.readUnlock();
      currentParallelPutCount.decrementAndGet();
    }
  }

  /**
   * Adds the specified values to the memstore
   */
  public void add(final Iterable<ExtendedCell> cells, MemStoreSizing memstoreSizing) {
    storeEngine.readLock();
    try {
      if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
        LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
          this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName());
      }
      memstore.add(cells, memstoreSizing);
    } finally {
      storeEngine.readUnlock();
      currentParallelPutCount.decrementAndGet();
    }
  }

  @Override
  public long timeOfOldestEdit() {
    return memstore.timeOfOldestEdit();
  }

  /** Returns All store files. */
  @Override
  public Collection<HStoreFile> getStorefiles() {
    return this.storeEngine.getStoreFileManager().getStoreFiles();
  }

  @Override
  public Collection<HStoreFile> getCompactedFiles() {
    return this.storeEngine.getStoreFileManager().getCompactedfiles();
  }

  /**
   * This throws a WrongRegionException if the HFile does not fit in this region, or an
   * InvalidHFileException if the HFile is not valid.
   */
  public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
    HFile.Reader reader = null;
    try {
      LOG.info("Validating hfile at " + srcPath + " for inclusion in " + this);
      FileSystem srcFs = srcPath.getFileSystem(conf);
      srcFs.access(srcPath, FsAction.READ_WRITE);
      reader = HFile.createReader(srcFs, srcPath, getCacheConfig(), isPrimaryReplicaStore(), conf);

      Optional<byte[]> firstKey = reader.getFirstRowKey();
      Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
      Optional<ExtendedCell> lk = reader.getLastKey();
      Preconditions.checkState(lk.isPresent(), "Last key can not be null");
      byte[] lastKey = CellUtil.cloneRow(lk.get());

      if (LOG.isDebugEnabled()) {
        LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey.get()) + " last="
          + Bytes.toStringBinary(lastKey));
        LOG.debug("Region bounds: first=" + Bytes.toStringBinary(getRegionInfo().getStartKey())
          + " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
      }

      if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) {
        throw new WrongRegionException("Bulk load file " + srcPath.toString()
          + " does not fit inside region " + this.getRegionInfo().getRegionNameAsString());
      }

      if (
        reader.length()
            > conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE)
      ) {
        LOG.warn("Trying to bulk load hfile " + srcPath + " with size: " + reader.length()
          + " bytes can be problematic as it may lead to oversplitting.");
      }

      if (verifyBulkLoads) {
        long verificationStartTime = EnvironmentEdgeManager.currentTime();
        LOG.info("Full verification started for bulk load hfile: {}", srcPath);
        Cell prevCell = null;
        HFileScanner scanner = reader.getScanner(conf, false, false, false);
        scanner.seekTo();
        do {
          Cell cell = scanner.getCell();
          if (prevCell != null) {
            if (getComparator().compareRows(prevCell, cell) > 0) {
              throw new InvalidHFileException("Previous row is greater than" + " current row: path="
                + srcPath + " previous=" + CellUtil.getCellKeyAsString(prevCell) + " current="
                + CellUtil.getCellKeyAsString(cell));
            }
            if (CellComparator.getInstance().compareFamilies(prevCell, cell) != 0) {
              throw new InvalidHFileException("Previous key had different"
                + " family compared to current key: path=" + srcPath + " previous="
                + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(),
                  prevCell.getFamilyLength())
                + " current=" + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(),
                  cell.getFamilyLength()));
            }
          }
          prevCell = cell;
        } while (scanner.next());
        LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString() + " took "
          + (EnvironmentEdgeManager.currentTime() - verificationStartTime) + " ms");
      }
    } finally {
      if (reader != null) {
        reader.close();
      }
    }
  }
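
  // Bulk-load flow as implemented in this class: assertBulkLoadHFileOk() above validates the
  // candidate file, preBulkLoadHFile() below stages it through the region filesystem, and
  // bulkLoadHFile() commits it and registers the resulting HStoreFile with the store engine.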

  /**
   * This method should only be called from Region. It is assumed that the ranges of values in the
   * HFile fit within the store's assigned region. (assertBulkLoadHFileOk checks this)
   * @param seqNum sequence Id associated with the HFile
   */
  public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
    Path srcPath = new Path(srcPathStr);
    return getRegionFileSystem().bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
  }

  public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException {
    Path srcPath = new Path(srcPathStr);
    try {
      getRegionFileSystem().commitStoreFile(srcPath, dstPath);
    } finally {
      if (this.getCoprocessorHost() != null) {
        this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath);
      }
    }

    LOG.info("Loaded HFile " + srcPath + " into " + this + " as " + dstPath
      + " - updating store file list.");

    HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath);
    bulkLoadHFile(sf);

    LOG.info("Successfully loaded {} into {} (new location: {})", srcPath, this, dstPath);

    return dstPath;
  }

  public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
    HStoreFile sf = storeEngine.createStoreFileAndReader(fileInfo);
    bulkLoadHFile(sf);
  }

  private void bulkLoadHFile(HStoreFile sf) throws IOException {
    StoreFileReader r = sf.getReader();
    this.storeSize.addAndGet(r.length());
    this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
    storeEngine.addStoreFiles(Lists.newArrayList(sf), () -> {
    });
    LOG.info("Loaded HFile " + sf.getFileInfo() + " into " + this);
    if (LOG.isTraceEnabled()) {
      String traceMessage = "BULK LOAD time,size,store size,store files ["
        + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize + ","
        + storeEngine.getStoreFileManager().getStorefileCount() + "]";
      LOG.trace(traceMessage);
    }
  }

  private ImmutableCollection<HStoreFile> closeWithoutLock() throws IOException {
    memstore.close();
    // Clear so metrics don't find them.
    ImmutableCollection<HStoreFile> result = storeEngine.getStoreFileManager().clearFiles();
    Collection<HStoreFile> compactedfiles = storeEngine.getStoreFileManager().clearCompactedFiles();
    // clear the compacted files
    if (CollectionUtils.isNotEmpty(compactedfiles)) {
      removeCompactedfiles(compactedfiles,
        getCacheConfig() != null ? getCacheConfig().shouldEvictOnClose() : true);
    }
    if (!result.isEmpty()) {
      // initialize the thread pool for closing store files in parallel.
      ThreadPoolExecutor storeFileCloserThreadPool =
        this.region.getStoreFileOpenAndCloseThreadPool("StoreFileCloser-"
          + this.region.getRegionInfo().getEncodedName() + "-" + this.getColumnFamilyName());

      // close each store file in parallel
      CompletionService<Void> completionService =
        new ExecutorCompletionService<>(storeFileCloserThreadPool);
      for (HStoreFile f : result) {
        completionService.submit(new Callable<Void>() {
          @Override
          public Void call() throws IOException {
            boolean evictOnClose =
              getCacheConfig() != null ? getCacheConfig().shouldEvictOnClose() : true;
            f.closeStoreFile(!warmup && evictOnClose);
            return null;
          }
        });
      }

      IOException ioe = null;
      try {
        for (int i = 0; i < result.size(); i++) {
          try {
            Future<Void> future = completionService.take();
            future.get();
          } catch (InterruptedException e) {
            if (ioe == null) {
              ioe = new InterruptedIOException();
              ioe.initCause(e);
            }
          } catch (ExecutionException e) {
            if (ioe == null) {
              ioe = new IOException(e.getCause());
            }
          }
        }
      } finally {
        storeFileCloserThreadPool.shutdownNow();
      }
      if (ioe != null) {
        throw ioe;
      }
    }
    LOG.trace("Closed {}", this);
    return result;
  }

  /**
   * Close all the readers. We don't need to worry about subsequent requests because the Region
   * holds a write lock that will prevent any more reads or writes.
   * @return the {@link StoreFile StoreFiles} that were previously being used.
   * @throws IOException on failure
   */
  public ImmutableCollection<HStoreFile> close() throws IOException {
    // findbugs cannot recognize that storeEngine.writeLock is just a lock operation, so it will
    // report UL_UNRELEASED_LOCK_EXCEPTION_PATH; here we have to use two try finally...
    // Change later if findbugs becomes smarter in the future.
    this.archiveLock.lock();
    try {
      this.storeEngine.writeLock();
      try {
        return closeWithoutLock();
      } finally {
        this.storeEngine.writeUnlock();
      }
    } finally {
      this.archiveLock.unlock();
    }
  }

  /**
   * Write out current snapshot. Presumes {@code StoreFlusherImpl.prepare()} has been called
   * previously.
   * @param logCacheFlushId flush sequence number
   * @return The path names of the tmp files to which the store was flushed
   * @throws IOException if exception occurs during process
   */
  protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
    MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
    Consumer<Path> writerCreationTracker) throws IOException {
    // If an exception happens flushing, we let it out without clearing
    // the memstore snapshot. The old snapshot will be returned when we say
    // 'snapshot', the next time flush comes around.
    // Retry after catching exception when flushing, otherwise the server will abort itself
    StoreFlusher flusher = storeEngine.getStoreFlusher();
    IOException lastException = null;
    for (int i = 0; i < flushRetriesNumber; i++) {
      try {
        List<Path> pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status,
          throughputController, tracker, writerCreationTracker);
        Path lastPathName = null;
        try {
          for (Path pathName : pathNames) {
            lastPathName = pathName;
            storeEngine.validateStoreFile(pathName, false);
          }
          return pathNames;
        } catch (Exception e) {
          LOG.warn("Failed validating store file {}, retrying num={}", lastPathName, i, e);
          if (e instanceof IOException) {
            lastException = (IOException) e;
          } else {
            lastException = new IOException(e);
          }
        }
      } catch (IOException e) {
        LOG.warn("Failed flushing store file for {}, retrying num={}", this, i, e);
        lastException = e;
      }
      if (lastException != null && i < (flushRetriesNumber - 1)) {
        try {
          Thread.sleep(pauseTime);
        } catch (InterruptedException e) {
          IOException iie = new InterruptedIOException();
          iie.initCause(e);
          throw iie;
        }
      }
    }
    throw lastException;
  }
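
  // Note on flushCache() above: a failed flush or a failed store file validation is retried up
  // to hbase.hstore.flush.retries.number times (default 10), sleeping hbase.server.pause ms
  // between attempts; if every attempt fails, the last IOException is rethrown and the region
  // server flush path reacts to it, typically by aborting, as the comment above notes.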

  public HStoreFile tryCommitRecoveredHFile(Path path) throws IOException {
    LOG.info("Validating recovered hfile at {} for inclusion in store {}", path, this);
    FileSystem srcFs = path.getFileSystem(conf);
    srcFs.access(path, FsAction.READ_WRITE);
    try (HFile.Reader reader =
      HFile.createReader(srcFs, path, getCacheConfig(), isPrimaryReplicaStore(), conf)) {
      Optional<byte[]> firstKey = reader.getFirstRowKey();
      Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
      Optional<ExtendedCell> lk = reader.getLastKey();
      Preconditions.checkState(lk.isPresent(), "Last key can not be null");
      byte[] lastKey = CellUtil.cloneRow(lk.get());
      if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) {
        throw new WrongRegionException("Recovered hfile " + path.toString()
          + " does not fit inside region " + this.getRegionInfo().getRegionNameAsString());
      }
    }

    Path dstPath = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), path);
    HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath);
    StoreFileReader r = sf.getReader();
    this.storeSize.addAndGet(r.length());
    this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());

    storeEngine.addStoreFiles(Lists.newArrayList(sf), () -> {
    });

    LOG.info("Loaded recovered hfile to {}, entries={}, sequenceid={}, filesize={}", sf,
      r.getEntries(), r.getSequenceID(), TraditionalBinaryPrefix.long2String(r.length(), "B", 1));
    return sf;
  }

  private long getTotalSize(Collection<HStoreFile> sfs) {
    return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum();
  }

  private boolean completeFlush(final List<HStoreFile> sfs, long snapshotId) throws IOException {
    // NOTE: we should keep the clearSnapshot method inside the write lock because clearSnapshot
    // may close {@link DefaultMemStore#snapshot}, which may be used by
    // {@link DefaultMemStore#getScanners}.
    storeEngine.addStoreFiles(sfs,
      // NOTE: here we must increase the refCount for storeFiles because we would open the
      // storeFiles and get the StoreFileScanners for them in HStore.notifyChangedReadersObservers.
      // If we don't increase the refCount here, HStore.closeAndArchiveCompactedFiles called by
      // CompactedHFilesDischarger may archive the storeFiles after a concurrent compaction.
      // Because HStore.requestCompaction is under the storeEngine lock, we increase the refCount
      // under the storeEngine lock here as well. See HBASE-27519 for more details.
      snapshotId > 0 ? () -> {
        this.memstore.clearSnapshot(snapshotId);
        HStoreFile.increaseStoreFilesRefeCount(sfs);
      } : () -> {
        HStoreFile.increaseStoreFilesRefeCount(sfs);
      });
    // notify to be called here - only in case of flushes
    try {
      notifyChangedReadersObservers(sfs);
    } finally {
      HStoreFile.decreaseStoreFilesRefeCount(sfs);
    }
    if (LOG.isTraceEnabled()) {
      long totalSize = getTotalSize(sfs);
      String traceMessage = "FLUSH time,count,size,store size,store files ["
        + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize + ","
        + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
      LOG.trace(traceMessage);
    }
    return needsCompaction();
  }

  /**
   * Notify all observers that set of Readers has changed.
   */
  private void notifyChangedReadersObservers(List<HStoreFile> sfs) throws IOException {
    for (ChangedReadersObserver o : this.changedReaderObservers) {
      List<KeyValueScanner> memStoreScanners;
      this.storeEngine.readLock();
      try {
        memStoreScanners = this.memstore.getScanners(o.getReadPoint());
      } finally {
        this.storeEngine.readUnlock();
      }
      o.updateReaders(sfs, memStoreScanners);
    }
  }

  /**
   * Get all scanners with no filtering based on TTL (that happens further down the line).
   * @param cacheBlocks  cache the blocks or not
   * @param usePread     true to use pread, false if not
   * @param isCompaction true if the scanner is created for compaction
   * @param matcher      the scan query matcher
   * @param startRow     the start row
   * @param stopRow      the stop row
   * @param readPt       the read point of the current scan
   * @return all scanners for this store
   */
  public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet, boolean usePread,
    boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt,
    boolean onlyLatestVersion) throws IOException {
    return getScanners(cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow, false,
      readPt, onlyLatestVersion);
  }
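
  // The overload above does not forward its isGet flag; it delegates to the full overload below
  // with an inclusive start row (includeStartRow=true) and an exclusive stop row
  // (includeStopRow=false).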

  /**
   * Get all scanners with no filtering based on TTL (that happens further down the line).
   * @param cacheBlocks     cache the blocks or not
   * @param usePread        true to use pread, false if not
   * @param isCompaction    true if the scanner is created for compaction
   * @param matcher         the scan query matcher
   * @param startRow        the start row
   * @param includeStartRow true to include start row, false if not
   * @param stopRow         the stop row
   * @param includeStopRow  true to include stop row, false if not
   * @param readPt          the read point of the current scan
   * @return all scanners for this store
   */
  public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,
    boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow,
    byte[] stopRow, boolean includeStopRow, long readPt, boolean onlyLatestVersion)
    throws IOException {
    Collection<HStoreFile> storeFilesToScan;
    List<KeyValueScanner> memStoreScanners;
    this.storeEngine.readLock();
    try {
      storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
        includeStartRow, stopRow, includeStopRow, onlyLatestVersion);
      memStoreScanners = this.memstore.getScanners(readPt);
      // NOTE: here we must increase the refCount for storeFiles because we would open the
      // storeFiles and get the StoreFileScanners for them. If we don't increase the refCount here,
      // HStore.closeAndArchiveCompactedFiles called by CompactedHFilesDischarger may archive the
      // storeFiles after a concurrent compaction. Because HStore.requestCompaction is under the
      // storeEngine lock, we increase the refCount under the storeEngine lock here as well. See
      // HBASE-27484 for more details.
      HStoreFile.increaseStoreFilesRefeCount(storeFilesToScan);
    } finally {
      this.storeEngine.readUnlock();
    }
    try {
      // First the store file scanners

      // TODO this used to get the store files in descending order,
      // but now we get them in ascending order, which I think is
      // actually more correct, since memstore get put at the end.
      List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(
        storeFilesToScan, cacheBlocks, usePread, isCompaction, false, matcher, readPt);
      List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
      scanners.addAll(sfScanners);
      // Then the memstore scanners
      scanners.addAll(memStoreScanners);
      return scanners;
    } catch (Throwable t) {
      clearAndClose(memStoreScanners);
      throw t instanceof IOException ? (IOException) t : new IOException(t);
    } finally {
      HStoreFile.decreaseStoreFilesRefeCount(storeFilesToScan);
    }
  }

  private static void clearAndClose(List<KeyValueScanner> scanners) {
    if (scanners == null) {
      return;
    }
    for (KeyValueScanner s : scanners) {
      s.close();
    }
    scanners.clear();
  }

  /**
   * Create scanners on the given files and if needed on the memstore with no filtering based on
   * TTL (that happens further down the line).
   * @param files                  the list of files on which the scanners have to be created
   * @param cacheBlocks            cache the blocks or not
   * @param usePread               true to use pread, false if not
   * @param isCompaction           true if the scanner is created for compaction
   * @param matcher                the scan query matcher
   * @param startRow               the start row
   * @param stopRow                the stop row
   * @param readPt                 the read point of the current scan
   * @param includeMemstoreScanner true if memstore has to be included
   * @return scanners on the given files and on the memstore if specified
   */
  public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
    boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
    byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner,
    boolean onlyLatestVersion) throws IOException {
    return getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow,
      false, readPt, includeMemstoreScanner, onlyLatestVersion);
  }

  /**
   * Create scanners on the given files and if needed on the memstore with no filtering based on
   * TTL (that happens further down the line).
   * @param files                  the list of files on which the scanners have to be created
   * @param cacheBlocks            cache the blocks or not
   * @param usePread               true to use pread, false if not
   * @param isCompaction           true if the scanner is created for compaction
   * @param matcher                the scan query matcher
   * @param startRow               the start row
   * @param includeStartRow        true to include start row, false if not
   * @param stopRow                the stop row
   * @param includeStopRow         true to include stop row, false if not
   * @param readPt                 the read point of the current scan
   * @param includeMemstoreScanner true if memstore has to be included
   * @return scanners on the given files and on the memstore if specified
   */
  public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
    boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
    boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
    boolean includeMemstoreScanner, boolean onlyLatestVersion) throws IOException {
    List<KeyValueScanner> memStoreScanners = null;
    if (includeMemstoreScanner) {
      this.storeEngine.readLock();
      try {
        memStoreScanners = this.memstore.getScanners(readPt);
      } finally {
        this.storeEngine.readUnlock();
      }
    }
    try {
      List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(files,
        cacheBlocks, usePread, isCompaction, false, matcher, readPt);
      List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
      scanners.addAll(sfScanners);
      // Then the memstore scanners
      if (memStoreScanners != null) {
        scanners.addAll(memStoreScanners);
      }
      return scanners;
    } catch (Throwable t) {
      clearAndClose(memStoreScanners);
      throw t instanceof IOException ? (IOException) t : new IOException(t);
    }
  }

  /**
   * @param o Observer who wants to know about changes in set of Readers
   */
  public void addChangedReaderObserver(ChangedReadersObserver o) {
    this.changedReaderObservers.add(o);
  }

  /**
   * @param o Observer no longer interested in changes in set of Readers.
   */
  public void deleteChangedReaderObserver(ChangedReadersObserver o) {
    // We don't check if observer present; it may not be (legitimately)
    this.changedReaderObservers.remove(o);
  }

  //////////////////////////////////////////////////////////////////////////////
  // Compaction
  //////////////////////////////////////////////////////////////////////////////

  /**
   * Compact the StoreFiles. This method may take some time, so the calling thread must be able to
   * block for long periods.
   * <p>
   * During this time, the Store can work as usual, getting values from StoreFiles and writing new
   * StoreFiles from the MemStore. Existing StoreFiles are not destroyed until the new compacted
   * StoreFile is completely written-out to disk.
   * <p>
   * The compactLock prevents multiple simultaneous compactions. The structureLock prevents us from
   * interfering with other write operations.
   * <p>
   * We don't want to hold the structureLock for the whole time, as a compact() can be lengthy and
   * we want to allow cache-flushes during this period.
   * <p>
   * Compaction event should be idempotent, since there is no IO Fencing for the region directory
   * in hdfs. A region server might still try to complete the compaction after it lost the region.
   * That is why the following events are carefully ordered for a compaction:
   * <ol>
   * <li>Compaction writes new files under region/.tmp directory (compaction output)</li>
   * <li>Compaction atomically moves the temporary file under region directory</li>
   * <li>Compaction appends a WAL edit containing the compaction input and output files. Forces
   * sync on WAL.</li>
   * <li>Compaction deletes the input files from the region directory.</li>
   * </ol>
   * Failure conditions are handled like this:
   * <ul>
   * <li>If RS fails before 2, compaction won't complete. Even if RS lives on and finishes the
   * compaction later, it will only write the new data file to the region directory. Since we
   * already have this data, this will be idempotent, but we will have a redundant copy of the
   * data.</li>
   * <li>If RS fails between 2 and 3, the region will have a redundant copy of the data. The RS
   * that failed won't be able to finish sync() for WAL because of lease recovery in WAL.</li>
   * <li>If RS fails after 3, the region server who opens the region will pick up the compaction
   * marker from the WAL and replay it by removing the compaction input files. Failed RS can also
   * attempt to delete those files, but the operation will be idempotent</li>
   * </ul>
   * See HBASE-2231 for details.
   * @param compaction compaction details obtained from requestCompaction()
   * @return The storefiles that we compacted into or null if we failed or opted out early.
   */
  public List<HStoreFile> compact(CompactionContext compaction,
    ThroughputController throughputController, User user) throws IOException {
    assert compaction != null;
    CompactionRequestImpl cr = compaction.getRequest();
    StoreFileWriterCreationTracker writerCreationTracker =
      storeFileWriterCreationTrackerFactory.get();
    if (writerCreationTracker != null) {
      cr.setWriterCreationTracker(writerCreationTracker);
      storeFileWriterCreationTrackers.add(writerCreationTracker);
    }
    try {
      // Do all sanity checking in here if we have a valid CompactionRequestImpl
      // because we need to clean up after it on the way out in a finally
      // block below
      long compactionStartTime = EnvironmentEdgeManager.currentTime();
      assert compaction.hasSelection();
      Collection<HStoreFile> filesToCompact = cr.getFiles();
      assert !filesToCompact.isEmpty();
      synchronized (filesCompacting) {
        // sanity check: we're compacting files that this store knows about
        // TODO: change this to LOG.error() after more debugging
        Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
      }

      // Ready to go. Have list of files to compact.
      LOG.info("Starting compaction of " + filesToCompact + " into tmpdir="
        + getRegionFileSystem().getTempDir() + ", totalSize="
        + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));

      return doCompaction(cr, filesToCompact, user, compactionStartTime,
        compaction.compact(throughputController, user));
    } finally {
      finishCompactionRequest(cr);
    }
  }

  protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
    Collection<HStoreFile> filesToCompact, User user, long compactionStartTime, List<Path> newFiles)
    throws IOException {
    // Do the steps necessary to complete the compaction.
    setStoragePolicyFromFileName(newFiles);
    List<HStoreFile> sfs = storeEngine.commitStoreFiles(newFiles, true, true);
    if (this.getCoprocessorHost() != null) {
      for (HStoreFile sf : sfs) {
        getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
      }
    }
    replaceStoreFiles(filesToCompact, sfs, true);

    long outputBytes = getTotalSize(sfs);

    // At this point the store will use new files for all new scanners.
    refreshStoreSizeAndTotalBytes(); // update store size.

    long now = EnvironmentEdgeManager.currentTime();
    if (
      region.getRegionServerServices() != null
        && region.getRegionServerServices().getMetrics() != null
    ) {
      region.getRegionServerServices().getMetrics().updateCompaction(
        region.getTableDescriptor().getTableName().getNameAsString(), cr.isMajor(),
        now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(),
        outputBytes);

    }

    logCompactionEndMessage(cr, sfs, now, compactionStartTime);
    return sfs;
  }

  // Set the correct storage policy from the file name of DTCP (date tiered compaction) output.
  // Renaming the file will not change the storage policy.
  private void setStoragePolicyFromFileName(List<Path> newFiles) throws IOException {
    String prefix = HConstants.STORAGE_POLICY_PREFIX;
    for (Path newFile : newFiles) {
      if (newFile.getParent().getName().startsWith(prefix)) {
        CommonFSUtils.setStoragePolicy(getRegionFileSystem().getFileSystem(), newFile,
          newFile.getParent().getName().substring(prefix.length()));
      }
    }
  }
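
  // Illustrative example (an assumption, not taken from this file): date tiered compaction may
  // write its output under a tmp directory named HConstants.STORAGE_POLICY_PREFIX + "ALL_SSD";
  // the method above then sets the ALL_SSD policy explicitly on that file, so the policy is kept
  // across the later rename into the store directory.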

  /**
   * Writes the compaction WAL record.
   * @param filesCompacted Files compacted (input).
   * @param newFiles       Files from compaction.
   */
  private void writeCompactionWalRecord(Collection<HStoreFile> filesCompacted,
    Collection<HStoreFile> newFiles) throws IOException {
    if (region.getWAL() == null) {
      return;
    }
    List<Path> inputPaths =
      filesCompacted.stream().map(HStoreFile::getPath).collect(Collectors.toList());
    List<Path> outputPaths =
      newFiles.stream().map(HStoreFile::getPath).collect(Collectors.toList());
    RegionInfo info = this.region.getRegionInfo();
    CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
      getColumnFamilyDescriptor().getName(), inputPaths, outputPaths,
      getRegionFileSystem().getStoreDir(getColumnFamilyDescriptor().getNameAsString()));
    // Fix reaching into Region to get the maxWaitForSeqId.
    // Does this method belong in Region altogether given it is making so many references up there?
    // Could be Region#writeCompactionMarker(compactionDescriptor);
    WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(),
      this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC(),
      region.getRegionReplicationSink().orElse(null));
  }

  @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
      allowedOnPath = ".*/(HStore|TestHStore).java")
  void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result,
    boolean writeCompactionMarker) throws IOException {
    storeEngine.replaceStoreFiles(compactedFiles, result, () -> {
      if (writeCompactionMarker) {
        writeCompactionWalRecord(compactedFiles, result);
      }
    }, () -> {
      synchronized (filesCompacting) {
        filesCompacting.removeAll(compactedFiles);
      }
    });
    // These may be null when the RS is shutting down. The space quota Chores will fix the Region
    // sizes later so it's not super-critical if we miss these.
    RegionServerServices rsServices = region.getRegionServerServices();
    if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) {
      updateSpaceQuotaAfterFileReplacement(
        rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(),
        compactedFiles, result);
    }
  }

  /**
   * Updates the space quota usage for this region, removing the size for files compacted away and
   * adding in the size for new files.
   * @param sizeStore  The object tracking changes in region size for space quotas.
   * @param regionInfo The identifier for the region whose size is being updated.
   * @param oldFiles   Files removed from this store's region.
   * @param newFiles   Files added to this store's region.
   */
  void updateSpaceQuotaAfterFileReplacement(RegionSizeStore sizeStore, RegionInfo regionInfo,
    Collection<HStoreFile> oldFiles, Collection<HStoreFile> newFiles) {
    long delta = 0;
    if (oldFiles != null) {
      for (HStoreFile compactedFile : oldFiles) {
        if (compactedFile.isHFile()) {
          delta -= compactedFile.getReader().length();
        }
      }
    }
    if (newFiles != null) {
      for (HStoreFile newFile : newFiles) {
        if (newFile.isHFile()) {
          delta += newFile.getReader().length();
        }
      }
    }
    sizeStore.incrementRegionSize(regionInfo, delta);
  }
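
  // Worked example: compacting three 100 MB HFiles into one 120 MB HFile calls the method above
  // with delta = -(3 * 100 MB) + 120 MB = -180 MB, shrinking the region's reported quota size.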
   */
  void updateSpaceQuotaAfterFileReplacement(RegionSizeStore sizeStore, RegionInfo regionInfo,
    Collection<HStoreFile> oldFiles, Collection<HStoreFile> newFiles) {
    long delta = 0;
    if (oldFiles != null) {
      for (HStoreFile compactedFile : oldFiles) {
        if (compactedFile.isHFile()) {
          delta -= compactedFile.getReader().length();
        }
      }
    }
    if (newFiles != null) {
      for (HStoreFile newFile : newFiles) {
        if (newFile.isHFile()) {
          delta += newFile.getReader().length();
        }
      }
    }
    sizeStore.incrementRegionSize(regionInfo, delta);
  }

  /**
   * Log a very elaborate compaction completion message.
   * @param cr Request.
   * @param sfs Resulting files.
   * @param now Current time, used as the compaction end time.
   * @param compactionStartTime Start time.
   */
  private void logCompactionEndMessage(CompactionRequestImpl cr, List<HStoreFile> sfs, long now,
    long compactionStartTime) {
    StringBuilder message = new StringBuilder("Completed" + (cr.isMajor() ? " major" : "")
      + " compaction of " + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "")
      + " file(s) in " + this + " of " + this.getRegionInfo().getShortNameToLog() + " into ");
    if (sfs.isEmpty()) {
      message.append("none, ");
    } else {
      for (HStoreFile sf : sfs) {
        message.append(sf.getPath().getName());
        message.append("(size=");
        message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1));
        message.append("), ");
      }
    }
    message.append("total size for store is ")
      .append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1))
      .append(". This selection was in queue for ")
      .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
      .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
      .append(" to execute.");
    LOG.info(message.toString());
    if (LOG.isTraceEnabled()) {
      int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
      long resultSize = getTotalSize(sfs);
      String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
        + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
        + cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]";
      LOG.trace(traceMessage);
    }
  }

  /**
   * Call to complete a compaction. It's for the case where we find in the WAL a compaction that
   * was not finished. We could find one when recovering a WAL after a regionserver crash. See
   * HBASE-2231.
   */
  public void replayCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
    boolean removeFiles) throws IOException {
    LOG.debug("Completing compaction from the WAL marker");
    List<String> compactionInputs = compaction.getCompactionInputList();
    List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList());

    // The Compaction Marker is written after the compaction is completed,
    // and the files moved into the region/family folder.
    //
    // If we crash after the entry is written, we may not have removed the
    // input files, but the output file is present.
1371 // (The unremoved input files will be removed by this function) 1372 // 1373 // If we scan the directory and the file is not present, it can mean that: 1374 // - The file was manually removed by the user 1375 // - The file was removed as consequence of subsequent compaction 1376 // so, we can't do anything with the "compaction output list" because those 1377 // files have already been loaded when opening the region (by virtue of 1378 // being in the store's folder) or they may be missing due to a compaction. 1379 1380 String familyName = this.getColumnFamilyName(); 1381 Set<String> inputFiles = new HashSet<>(); 1382 for (String compactionInput : compactionInputs) { 1383 Path inputPath = getRegionFileSystem().getStoreFilePath(familyName, compactionInput); 1384 inputFiles.add(inputPath.getName()); 1385 } 1386 1387 // some of the input files might already be deleted 1388 List<HStoreFile> inputStoreFiles = new ArrayList<>(compactionInputs.size()); 1389 for (HStoreFile sf : this.getStorefiles()) { 1390 if (inputFiles.contains(sf.getPath().getName())) { 1391 inputStoreFiles.add(sf); 1392 } 1393 } 1394 1395 // check whether we need to pick up the new files 1396 List<HStoreFile> outputStoreFiles = new ArrayList<>(compactionOutputs.size()); 1397 1398 if (pickCompactionFiles) { 1399 for (HStoreFile sf : this.getStorefiles()) { 1400 compactionOutputs.remove(sf.getPath().getName()); 1401 } 1402 for (String compactionOutput : compactionOutputs) { 1403 StoreFileTracker sft = StoreFileTrackerFactory.create(conf, false, storeContext); 1404 StoreFileInfo storeFileInfo = 1405 getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), compactionOutput, sft); 1406 HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo); 1407 outputStoreFiles.add(storeFile); 1408 } 1409 } 1410 1411 if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) { 1412 LOG.info("Replaying compaction marker, replacing input files: " + inputStoreFiles 1413 + " with output files : " + outputStoreFiles); 1414 this.replaceStoreFiles(inputStoreFiles, outputStoreFiles, false); 1415 this.refreshStoreSizeAndTotalBytes(); 1416 } 1417 } 1418 1419 @Override 1420 public boolean hasReferences() { 1421 // Grab the read lock here, because we need to ensure that: only when the atomic 1422 // replaceStoreFiles(..) finished, we can get all the complete store file list. 1423 this.storeEngine.readLock(); 1424 try { 1425 // Merge the current store files with compacted files here due to HBASE-20940. 1426 Collection<HStoreFile> allStoreFiles = new ArrayList<>(getStorefiles()); 1427 allStoreFiles.addAll(getCompactedFiles()); 1428 return StoreUtils.hasReferences(allStoreFiles); 1429 } finally { 1430 this.storeEngine.readUnlock(); 1431 } 1432 } 1433 1434 /** 1435 * getter for CompactionProgress object 1436 * @return CompactionProgress object; can be null 1437 */ 1438 public CompactionProgress getCompactionProgress() { 1439 return this.storeEngine.getCompactor().getProgress(); 1440 } 1441 1442 @Override 1443 public boolean shouldPerformMajorCompaction() throws IOException { 1444 for (HStoreFile sf : this.storeEngine.getStoreFileManager().getStoreFiles()) { 1445 // TODO: what are these reader checks all over the place? 
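      // A null reader usually means the file is still being opened or is being closed/archived;
      // such a file cannot be examined, so err on the side of not requesting a major compaction.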
      if (sf.getReader() == null) {
        LOG.debug("StoreFile {} has null Reader", sf);
        return false;
      }
    }
    return storeEngine.getCompactionPolicy()
      .shouldPerformMajorCompaction(this.storeEngine.getStoreFileManager().getStoreFiles());
  }

  public Optional<CompactionContext> requestCompaction() throws IOException {
    return requestCompaction(NO_PRIORITY, CompactionLifeCycleTracker.DUMMY, null);
  }

  public Optional<CompactionContext> requestCompaction(int priority,
    CompactionLifeCycleTracker tracker, User user) throws IOException {
    // don't even select for compaction if writes are disabled
    if (!this.areWritesEnabled()) {
      return Optional.empty();
    }
    // Before we do compaction, try to get rid of unneeded files to simplify things.
    removeUnneededFiles();

    final CompactionContext compaction = storeEngine.createCompaction();
    CompactionRequestImpl request = null;
    this.storeEngine.readLock();
    try {
      synchronized (filesCompacting) {
        // First, see if coprocessor would want to override selection.
        if (this.getCoprocessorHost() != null) {
          final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
          boolean override =
            getCoprocessorHost().preCompactSelection(this, candidatesForCoproc, tracker, user);
          if (override) {
            // Coprocessor is overriding normal file selection.
            compaction.forceSelect(new CompactionRequestImpl(candidatesForCoproc));
          }
        }

        // Normal case - coprocessor is not overriding file selection.
        if (!compaction.hasSelection()) {
          boolean isUserCompaction = priority == Store.PRIORITY_USER;
          boolean mayUseOffPeak =
            offPeakHours.isOffPeakHour() && offPeakCompactionTracker.compareAndSet(false, true);
          try {
            compaction.select(this.filesCompacting, isUserCompaction, mayUseOffPeak,
              forceMajor && filesCompacting.isEmpty());
          } catch (IOException e) {
            if (mayUseOffPeak) {
              offPeakCompactionTracker.set(false);
            }
            throw e;
          }
          assert compaction.hasSelection();
          if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
            // Compaction policy doesn't want to take advantage of off-peak.
            offPeakCompactionTracker.set(false);
          }
        }
        if (this.getCoprocessorHost() != null) {
          this.getCoprocessorHost().postCompactSelection(this,
            ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker,
            compaction.getRequest(), user);
        }
        // Finally, we have the resulting files list. Check if we have any files at all.
        request = compaction.getRequest();
        Collection<HStoreFile> selectedFiles = request.getFiles();
        if (selectedFiles.isEmpty()) {
          return Optional.empty();
        }

        addToCompactingFiles(selectedFiles);

        // If we're enqueuing a major, clear the force flag.
        this.forceMajor = this.forceMajor && !request.isMajor();

        // Set common request properties.
        // Set priority, either override value supplied by caller or from store.
        final int compactionPriority =
          (priority != Store.NO_PRIORITY) ? priority : getCompactPriority();
        request.setPriority(compactionPriority);

        if (request.isAfterSplit()) {
          // If the store belongs to recently split daughter regions, it is better to consider
          // them with higher priority in the compaction queue.
1530 // Override priority if it is lower (higher int value) than 1531 // SPLIT_REGION_COMPACTION_PRIORITY 1532 final int splitHousekeepingPriority = 1533 Math.min(compactionPriority, SPLIT_REGION_COMPACTION_PRIORITY); 1534 request.setPriority(splitHousekeepingPriority); 1535 LOG.info( 1536 "Keeping/Overriding Compaction request priority to {} for CF {} since it" 1537 + " belongs to recently split daughter region {}", 1538 splitHousekeepingPriority, this.getColumnFamilyName(), 1539 getRegionInfo().getRegionNameAsString()); 1540 } 1541 request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName()); 1542 request.setTracker(tracker); 1543 } 1544 } finally { 1545 this.storeEngine.readUnlock(); 1546 } 1547 1548 if (LOG.isDebugEnabled()) { 1549 LOG.debug(this + " is initiating " + (request.isMajor() ? "major" : "minor") + " compaction" 1550 + (request.isAllFiles() ? " (all files)" : "")); 1551 } 1552 this.region.reportCompactionRequestStart(request.isMajor()); 1553 return Optional.of(compaction); 1554 } 1555 1556 /** Adds the files to compacting files. filesCompacting must be locked. */ 1557 private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) { 1558 if (CollectionUtils.isEmpty(filesToAdd)) { 1559 return; 1560 } 1561 // Check that we do not try to compact the same StoreFile twice. 1562 if (!Collections.disjoint(filesCompacting, filesToAdd)) { 1563 Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting); 1564 } 1565 filesCompacting.addAll(filesToAdd); 1566 Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator()); 1567 } 1568 1569 private void removeUnneededFiles() throws IOException { 1570 if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) { 1571 return; 1572 } 1573 if (getColumnFamilyDescriptor().getMinVersions() > 0) { 1574 LOG.debug("Skipping expired store file removal due to min version of {} being {}", this, 1575 getColumnFamilyDescriptor().getMinVersions()); 1576 return; 1577 } 1578 this.storeEngine.readLock(); 1579 Collection<HStoreFile> delSfs = null; 1580 try { 1581 synchronized (filesCompacting) { 1582 long cfTtl = getStoreFileTtl(); 1583 if (cfTtl != Long.MAX_VALUE) { 1584 delSfs = storeEngine.getStoreFileManager() 1585 .getUnneededFiles(EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting); 1586 addToCompactingFiles(delSfs); 1587 } 1588 } 1589 } finally { 1590 this.storeEngine.readUnlock(); 1591 } 1592 1593 if (CollectionUtils.isEmpty(delSfs)) { 1594 return; 1595 } 1596 1597 Collection<HStoreFile> newFiles = Collections.emptyList(); // No new files. 
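    // Expired files are removed through the same path as a compaction: they are replaced by an
    // empty output list and a compaction marker is written to the WAL (the 'true' argument), so
    // the removal is recorded and recovered just like a normal compaction.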
    replaceStoreFiles(delSfs, newFiles, true);
    refreshStoreSizeAndTotalBytes();
    LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in " + this
      + "; total size is " + TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1));
  }

  public void cancelRequestedCompaction(CompactionContext compaction) {
    finishCompactionRequest(compaction.getRequest());
  }

  private void finishCompactionRequest(CompactionRequestImpl cr) {
    this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
    if (cr.isOffPeak()) {
      offPeakCompactionTracker.set(false);
      cr.setOffPeak(false);
    }
    synchronized (filesCompacting) {
      filesCompacting.removeAll(cr.getFiles());
    }
    // The tracker could be null, for example when we do not need to track the creation of store
    // file writers because of the SFT implementation in use, or when the compaction was canceled.
    if (cr.getWriterCreationTracker() != null) {
      storeFileWriterCreationTrackers.remove(cr.getWriterCreationTracker());
    }
  }

  /**
   * Update the store size and total uncompressed bytes from the current set of store files.
   */
  protected void refreshStoreSizeAndTotalBytes() throws IOException {
    this.storeSize.set(0L);
    this.totalUncompressedBytes.set(0L);
    for (HStoreFile hsf : this.storeEngine.getStoreFileManager().getStoreFiles()) {
      StoreFileReader r = hsf.getReader();
      if (r == null) {
        LOG.debug("StoreFile {} has a null Reader", hsf);
        continue;
      }
      this.storeSize.addAndGet(r.length());
      this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
    }
  }

  /*
   * @param wantedVersions How many versions were asked for.
   * @return wantedVersions or this family's {@link HConstants#VERSIONS}.
   */
  int versionsToReturn(final int wantedVersions) {
    if (wantedVersions <= 0) {
      throw new IllegalArgumentException("Number of versions must be > 0");
    }
    // Make sure we do not return more than maximum versions for this store.
    int maxVersions = getColumnFamilyDescriptor().getMaxVersions();
    return wantedVersions > maxVersions ? maxVersions : wantedVersions;
  }

  @Override
  public boolean canSplit() {
    // Not split-able if we find a reference store file present in the store.
    boolean result = !hasReferences();
    if (!result) {
      LOG.trace("Not splittable; has references: {}", this);
    }
    return result;
  }

  /**
   * Determines if the Store should be split and, if so, returns the row to split on.
   */
  public Optional<byte[]> getSplitPoint() {
    this.storeEngine.readLock();
    try {
      // Should already be enforced by the split policy!
      assert !this.getRegionInfo().isMetaRegion();
      // Not split-able if we find a reference store file present in the store.
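      // (Reference files are the half-file pointers created by a previous split; they must be
      // rewritten into real HFiles by compaction before the region can be split again.)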
1673 if (hasReferences()) { 1674 LOG.trace("Not splittable; has references: {}", this); 1675 return Optional.empty(); 1676 } 1677 return this.storeEngine.getStoreFileManager().getSplitPoint(); 1678 } catch (IOException e) { 1679 LOG.warn("Failed getting store size for {}", this, e); 1680 } finally { 1681 this.storeEngine.readUnlock(); 1682 } 1683 return Optional.empty(); 1684 } 1685 1686 @Override 1687 public long getLastCompactSize() { 1688 return this.lastCompactSize; 1689 } 1690 1691 @Override 1692 public long getSize() { 1693 return storeSize.get(); 1694 } 1695 1696 public void triggerMajorCompaction() { 1697 this.forceMajor = true; 1698 } 1699 1700 ////////////////////////////////////////////////////////////////////////////// 1701 // File administration 1702 ////////////////////////////////////////////////////////////////////////////// 1703 1704 /** 1705 * Return a scanner for both the memstore and the HStore files. Assumes we are not in a 1706 * compaction. 1707 * @param scan Scan to apply when scanning the stores 1708 * @param targetCols columns to scan 1709 * @return a scanner over the current key values 1710 * @throws IOException on failure 1711 */ 1712 public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols, long readPt) 1713 throws IOException { 1714 storeEngine.readLock(); 1715 try { 1716 ScanInfo scanInfo; 1717 if (this.getCoprocessorHost() != null) { 1718 scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this, scan); 1719 } else { 1720 scanInfo = getScanInfo(); 1721 } 1722 return createScanner(scan, scanInfo, targetCols, readPt); 1723 } finally { 1724 storeEngine.readUnlock(); 1725 } 1726 } 1727 1728 // HMobStore will override this method to return its own implementation. 1729 protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo, 1730 NavigableSet<byte[]> targetCols, long readPt) throws IOException { 1731 return scan.isReversed() 1732 ? 
      new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt)
      : new StoreScanner(this, scanInfo, scan, targetCols, readPt);
  }

  /**
   * Recreates the scanners on the current list of active store file scanners.
   * @param currentFileScanners the current set of active store file scanners
   * @param cacheBlocks cache the blocks or not
   * @param usePread use pread or not
   * @param isCompaction is the scanner for compaction
   * @param matcher the scan query matcher
   * @param startRow the scan's start row
   * @param includeStartRow should the scan include the start row
   * @param stopRow the scan's stop row
   * @param includeStopRow should the scan include the stop row
   * @param readPt the read point of the current scan
   * @param includeMemstoreScanner whether the current scanner should include a memstore scanner
   * @return list of scanners recreated from the current set of active scanners
   */
  public List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners,
    boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
    byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
    boolean includeMemstoreScanner) throws IOException {
    this.storeEngine.readLock();
    try {
      Map<String, HStoreFile> name2File =
        new HashMap<>(getStorefilesCount() + getCompactedFilesCount());
      for (HStoreFile file : getStorefiles()) {
        name2File.put(file.getFileInfo().getActiveFileName(), file);
      }
      Collection<HStoreFile> compactedFiles = getCompactedFiles();
      for (HStoreFile file : IterableUtils.emptyIfNull(compactedFiles)) {
        name2File.put(file.getFileInfo().getActiveFileName(), file);
      }
      List<HStoreFile> filesToReopen = new ArrayList<>();
      for (KeyValueScanner kvs : currentFileScanners) {
        assert kvs.isFileScanner();
        if (kvs.peek() == null) {
          continue;
        }
        filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
      }
      if (filesToReopen.isEmpty()) {
        return null;
      }
      return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow,
        includeStartRow, stopRow, includeStopRow, readPt, false, false);
    } finally {
      this.storeEngine.readUnlock();
    }
  }

  @Override
  public String toString() {
    return this.getRegionInfo().getShortNameToLog() + "/" + this.getColumnFamilyName();
  }

  @Override
  public int getStorefilesCount() {
    return this.storeEngine.getStoreFileManager().getStorefileCount();
  }

  @Override
  public int getCompactedFilesCount() {
    return this.storeEngine.getStoreFileManager().getCompactedFilesCount();
  }

  private LongStream getStoreFileAgeStream() {
    return this.storeEngine.getStoreFileManager().getStoreFiles().stream().filter(sf -> {
      if (sf.getReader() == null) {
        LOG.debug("StoreFile {} has a null Reader", sf);
        return false;
      } else {
        return true;
      }
    }).filter(HStoreFile::isHFile).mapToLong(sf -> sf.getFileInfo().getCreatedTimestamp())
      .map(t -> EnvironmentEdgeManager.currentTime() - t);
  }

  @Override
  public OptionalLong getMaxStoreFileAge() {
    return getStoreFileAgeStream().max();
  }

  @Override
  public OptionalLong getMinStoreFileAge() {
    return getStoreFileAgeStream().min();
  }

  @Override
  public OptionalDouble getAvgStoreFileAge() {
    return
getStoreFileAgeStream().average(); 1824 } 1825 1826 @Override 1827 public long getNumReferenceFiles() { 1828 return this.storeEngine.getStoreFileManager().getStoreFiles().stream() 1829 .filter(HStoreFile::isReference).count(); 1830 } 1831 1832 @Override 1833 public long getNumHFiles() { 1834 return this.storeEngine.getStoreFileManager().getStoreFiles().stream() 1835 .filter(HStoreFile::isHFile).count(); 1836 } 1837 1838 @Override 1839 public long getStoreSizeUncompressed() { 1840 return this.totalUncompressedBytes.get(); 1841 } 1842 1843 @Override 1844 public long getStorefilesSize() { 1845 // Include all StoreFiles 1846 return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStoreFiles(), 1847 sf -> true); 1848 } 1849 1850 @Override 1851 public long getHFilesSize() { 1852 // Include only StoreFiles which are HFiles 1853 return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStoreFiles(), 1854 HStoreFile::isHFile); 1855 } 1856 1857 private long getStorefilesFieldSize(ToLongFunction<StoreFileReader> f) { 1858 return this.storeEngine.getStoreFileManager().getStoreFiles().stream() 1859 .mapToLong(file -> StoreUtils.getStorefileFieldSize(file, f)).sum(); 1860 } 1861 1862 @Override 1863 public long getStorefilesRootLevelIndexSize() { 1864 return getStorefilesFieldSize(StoreFileReader::indexSize); 1865 } 1866 1867 @Override 1868 public long getTotalStaticIndexSize() { 1869 return getStorefilesFieldSize(StoreFileReader::getUncompressedDataIndexSize); 1870 } 1871 1872 @Override 1873 public long getTotalStaticBloomSize() { 1874 return getStorefilesFieldSize(StoreFileReader::getTotalBloomSize); 1875 } 1876 1877 @Override 1878 public MemStoreSize getMemStoreSize() { 1879 return this.memstore.size(); 1880 } 1881 1882 @Override 1883 public int getCompactPriority() { 1884 int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority(); 1885 if (priority == PRIORITY_USER) { 1886 LOG.warn("Compaction priority is USER despite there being no user compaction"); 1887 } 1888 return priority; 1889 } 1890 1891 public boolean throttleCompaction(long compactionSize) { 1892 return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize); 1893 } 1894 1895 public HRegion getHRegion() { 1896 return this.region; 1897 } 1898 1899 public RegionCoprocessorHost getCoprocessorHost() { 1900 return this.region.getCoprocessorHost(); 1901 } 1902 1903 @Override 1904 public RegionInfo getRegionInfo() { 1905 return getRegionFileSystem().getRegionInfo(); 1906 } 1907 1908 @Override 1909 public boolean areWritesEnabled() { 1910 return this.region.areWritesEnabled(); 1911 } 1912 1913 @Override 1914 public long getSmallestReadPoint() { 1915 return this.region.getSmallestReadPoint(); 1916 } 1917 1918 /** 1919 * Adds or replaces the specified KeyValues. 1920 * <p> 1921 * For each KeyValue specified, if a cell with the same row, family, and qualifier exists in 1922 * MemStore, it will be replaced. Otherwise, it will just be inserted to MemStore. 1923 * <p> 1924 * This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic 1925 * across all of them. 
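   * <p>
   * A minimal usage sketch (illustrative only; the sizing object and the cell collection are
   * assumptions here, real callers pass in the state of the ongoing mutation):
   *
   * <pre>{@code
   * // Accumulates the MemStore size delta caused by the upsert.
   * MemStoreSizing sizing = new NonThreadSafeMemStoreSizing();
   * store.upsert(updatedCells, store.getSmallestReadPoint(), sizing);
   * }</pre>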
1926 * @param readpoint readpoint below which we can safely remove duplicate KVs 1927 */ 1928 public void upsert(Iterable<ExtendedCell> cells, long readpoint, MemStoreSizing memstoreSizing) { 1929 this.storeEngine.readLock(); 1930 try { 1931 this.memstore.upsert(cells, readpoint, memstoreSizing); 1932 } finally { 1933 this.storeEngine.readUnlock(); 1934 } 1935 } 1936 1937 public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTracker tracker) { 1938 return new StoreFlusherImpl(cacheFlushId, tracker); 1939 } 1940 1941 private final class StoreFlusherImpl implements StoreFlushContext { 1942 1943 private final FlushLifeCycleTracker tracker; 1944 private final StoreFileWriterCreationTracker writerCreationTracker; 1945 private final long cacheFlushSeqNum; 1946 private MemStoreSnapshot snapshot; 1947 private List<Path> tempFiles; 1948 private List<Path> committedFiles; 1949 private long cacheFlushCount; 1950 private long cacheFlushSize; 1951 private long outputFileSize; 1952 1953 private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) { 1954 this.cacheFlushSeqNum = cacheFlushSeqNum; 1955 this.tracker = tracker; 1956 this.writerCreationTracker = storeFileWriterCreationTrackerFactory.get(); 1957 } 1958 1959 /** 1960 * This is not thread safe. The caller should have a lock on the region or the store. If 1961 * necessary, the lock can be added with the patch provided in HBASE-10087 1962 */ 1963 @Override 1964 public MemStoreSize prepare() { 1965 // passing the current sequence number of the wal - to allow bookkeeping in the memstore 1966 this.snapshot = memstore.snapshot(); 1967 this.cacheFlushCount = snapshot.getCellsCount(); 1968 this.cacheFlushSize = snapshot.getDataSize(); 1969 committedFiles = new ArrayList<>(1); 1970 return snapshot.getMemStoreSize(); 1971 } 1972 1973 @Override 1974 public void flushCache(MonitoredTask status) throws IOException { 1975 RegionServerServices rsService = region.getRegionServerServices(); 1976 ThroughputController throughputController = 1977 rsService == null ? null : rsService.getFlushThroughputController(); 1978 // it could be null if we do not need to track the creation of store file writer due to 1979 // different SFT implementation. 
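      // When a tracker is present it records every store file writer opened by this flush, so
      // that getStoreFilesBeingWritten() (consulted by the BrokenStoreFileCleaner) does not treat
      // the half-written files as deletable.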
1980 if (writerCreationTracker != null) { 1981 HStore.this.storeFileWriterCreationTrackers.add(writerCreationTracker); 1982 } 1983 tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController, 1984 tracker, writerCreationTracker); 1985 } 1986 1987 @Override 1988 public boolean commit(MonitoredTask status) throws IOException { 1989 try { 1990 if (CollectionUtils.isEmpty(this.tempFiles)) { 1991 return false; 1992 } 1993 status.setStatus("Flushing " + this + ": reopening flushed file"); 1994 List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, false, false); 1995 for (HStoreFile sf : storeFiles) { 1996 StoreFileReader r = sf.getReader(); 1997 if (LOG.isInfoEnabled()) { 1998 LOG.info("Added {}, entries={}, sequenceid={}, filesize={}", sf, r.getEntries(), 1999 cacheFlushSeqNum, TraditionalBinaryPrefix.long2String(r.length(), "", 1)); 2000 } 2001 outputFileSize += r.length(); 2002 storeSize.addAndGet(r.length()); 2003 totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); 2004 committedFiles.add(sf.getPath()); 2005 } 2006 2007 flushedCellsCount.addAndGet(cacheFlushCount); 2008 flushedCellsSize.addAndGet(cacheFlushSize); 2009 flushedOutputFileSize.addAndGet(outputFileSize); 2010 // call coprocessor after we have done all the accounting above 2011 for (HStoreFile sf : storeFiles) { 2012 if (getCoprocessorHost() != null) { 2013 getCoprocessorHost().postFlush(HStore.this, sf, tracker); 2014 } 2015 } 2016 // Add new file to store files. Clear snapshot too while we have the Store write lock. 2017 return completeFlush(storeFiles, snapshot.getId()); 2018 } finally { 2019 if (writerCreationTracker != null) { 2020 HStore.this.storeFileWriterCreationTrackers.remove(writerCreationTracker); 2021 } 2022 } 2023 } 2024 2025 @Override 2026 public long getOutputFileSize() { 2027 return outputFileSize; 2028 } 2029 2030 @Override 2031 public List<Path> getCommittedFiles() { 2032 return committedFiles; 2033 } 2034 2035 /** 2036 * Similar to commit, but called in secondary region replicas for replaying the flush cache from 2037 * primary region. Adds the new files to the store, and drops the snapshot depending on 2038 * dropMemstoreSnapshot argument. 
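     * <p>
     * The files were already written by the primary region, so they are only opened here (through
     * the configured store file tracker) and added to this replica's view of the store; nothing is
     * rewritten locally.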
2039 * @param fileNames names of the flushed files 2040 * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot 2041 */ 2042 @Override 2043 public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot) 2044 throws IOException { 2045 List<HStoreFile> storeFiles = new ArrayList<>(fileNames.size()); 2046 for (String file : fileNames) { 2047 // open the file as a store file (hfile link, etc) 2048 StoreFileTracker sft = StoreFileTrackerFactory.create(conf, false, storeContext); 2049 StoreFileInfo storeFileInfo = 2050 getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), file, sft); 2051 HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo); 2052 storeFiles.add(storeFile); 2053 HStore.this.storeSize.addAndGet(storeFile.getReader().length()); 2054 HStore.this.totalUncompressedBytes 2055 .addAndGet(storeFile.getReader().getTotalUncompressedBytes()); 2056 if (LOG.isInfoEnabled()) { 2057 LOG.info(this + " added " + storeFile + ", entries=" + storeFile.getReader().getEntries() 2058 + ", sequenceid=" + storeFile.getReader().getSequenceID() + ", filesize=" 2059 + TraditionalBinaryPrefix.long2String(storeFile.getReader().length(), "", 1)); 2060 } 2061 } 2062 2063 long snapshotId = -1; // -1 means do not drop 2064 if (dropMemstoreSnapshot && snapshot != null) { 2065 snapshotId = snapshot.getId(); 2066 } 2067 HStore.this.completeFlush(storeFiles, snapshotId); 2068 } 2069 2070 /** 2071 * Abort the snapshot preparation. Drops the snapshot if any. 2072 */ 2073 @Override 2074 public void abort() throws IOException { 2075 if (snapshot != null) { 2076 HStore.this.completeFlush(Collections.emptyList(), snapshot.getId()); 2077 } 2078 } 2079 } 2080 2081 @Override 2082 public boolean needsCompaction() { 2083 List<HStoreFile> filesCompactingClone = null; 2084 synchronized (filesCompacting) { 2085 filesCompactingClone = Lists.newArrayList(filesCompacting); 2086 } 2087 return this.storeEngine.needsCompaction(filesCompactingClone); 2088 } 2089 2090 /** 2091 * Used for tests. 2092 * @return cache configuration for this Store. 
2093 */ 2094 public CacheConfig getCacheConfig() { 2095 return storeContext.getCacheConf(); 2096 } 2097 2098 public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HStore.class, false); 2099 2100 public static final long DEEP_OVERHEAD = ClassSize.align( 2101 FIXED_OVERHEAD + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK + ClassSize.CONCURRENT_SKIPLISTMAP 2102 + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT + ScanInfo.FIXED_OVERHEAD); 2103 2104 @Override 2105 public long heapSize() { 2106 MemStoreSize memstoreSize = this.memstore.size(); 2107 return DEEP_OVERHEAD + memstoreSize.getHeapSize() + storeContext.heapSize(); 2108 } 2109 2110 @Override 2111 public CellComparator getComparator() { 2112 return storeContext.getComparator(); 2113 } 2114 2115 public ScanInfo getScanInfo() { 2116 return scanInfo; 2117 } 2118 2119 /** 2120 * Set scan info, used by test 2121 * @param scanInfo new scan info to use for test 2122 */ 2123 void setScanInfo(ScanInfo scanInfo) { 2124 this.scanInfo = scanInfo; 2125 } 2126 2127 @Override 2128 public boolean hasTooManyStoreFiles() { 2129 return getStorefilesCount() > this.blockingFileCount; 2130 } 2131 2132 @Override 2133 public long getFlushedCellsCount() { 2134 return flushedCellsCount.get(); 2135 } 2136 2137 @Override 2138 public long getFlushedCellsSize() { 2139 return flushedCellsSize.get(); 2140 } 2141 2142 @Override 2143 public long getFlushedOutputFileSize() { 2144 return flushedOutputFileSize.get(); 2145 } 2146 2147 @Override 2148 public long getCompactedCellsCount() { 2149 return compactedCellsCount.get(); 2150 } 2151 2152 @Override 2153 public long getCompactedCellsSize() { 2154 return compactedCellsSize.get(); 2155 } 2156 2157 @Override 2158 public long getMajorCompactedCellsCount() { 2159 return majorCompactedCellsCount.get(); 2160 } 2161 2162 @Override 2163 public long getMajorCompactedCellsSize() { 2164 return majorCompactedCellsSize.get(); 2165 } 2166 2167 public void updateCompactedMetrics(boolean isMajor, CompactionProgress progress) { 2168 if (isMajor) { 2169 majorCompactedCellsCount.addAndGet(progress.getTotalCompactingKVs()); 2170 majorCompactedCellsSize.addAndGet(progress.totalCompactedSize); 2171 } else { 2172 compactedCellsCount.addAndGet(progress.getTotalCompactingKVs()); 2173 compactedCellsSize.addAndGet(progress.totalCompactedSize); 2174 } 2175 } 2176 2177 /** 2178 * Returns the StoreEngine that is backing this concrete implementation of Store. 2179 * @return Returns the {@link StoreEngine} object used internally inside this HStore object. 
2180 */ 2181 public StoreEngine<?, ?, ?, ?> getStoreEngine() { 2182 return this.storeEngine; 2183 } 2184 2185 protected OffPeakHours getOffPeakHours() { 2186 return this.offPeakHours; 2187 } 2188 2189 @Override 2190 public void onConfigurationChange(Configuration conf) { 2191 Configuration storeConf = StoreUtils.createStoreConfiguration(conf, region.getTableDescriptor(), 2192 getColumnFamilyDescriptor()); 2193 this.conf = storeConf; 2194 this.storeEngine.compactionPolicy.setConf(storeConf); 2195 this.offPeakHours = OffPeakHours.getInstance(storeConf); 2196 } 2197 2198 /** 2199 * {@inheritDoc} 2200 */ 2201 @Override 2202 public void registerChildren(ConfigurationManager manager) { 2203 CacheConfig cacheConfig = this.storeContext.getCacheConf(); 2204 if (cacheConfig != null) { 2205 manager.registerObserver(cacheConfig); 2206 } 2207 } 2208 2209 /** 2210 * {@inheritDoc} 2211 */ 2212 @Override 2213 public void deregisterChildren(ConfigurationManager manager) { 2214 // No children to deregister 2215 } 2216 2217 @Override 2218 public double getCompactionPressure() { 2219 return storeEngine.getStoreFileManager().getCompactionPressure(); 2220 } 2221 2222 @Override 2223 public boolean isPrimaryReplicaStore() { 2224 return getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID; 2225 } 2226 2227 /** 2228 * Sets the store up for a region level snapshot operation. 2229 * @see #postSnapshotOperation() 2230 */ 2231 public void preSnapshotOperation() { 2232 archiveLock.lock(); 2233 } 2234 2235 /** 2236 * Perform tasks needed after the completion of snapshot operation. 2237 * @see #preSnapshotOperation() 2238 */ 2239 public void postSnapshotOperation() { 2240 archiveLock.unlock(); 2241 } 2242 2243 /** 2244 * Closes and archives the compacted files under this store 2245 */ 2246 public synchronized void closeAndArchiveCompactedFiles() throws IOException { 2247 // ensure other threads do not attempt to archive the same files on close() 2248 archiveLock.lock(); 2249 try { 2250 storeEngine.readLock(); 2251 Collection<HStoreFile> copyCompactedfiles = null; 2252 try { 2253 Collection<HStoreFile> compactedfiles = 2254 this.getStoreEngine().getStoreFileManager().getCompactedfiles(); 2255 if (CollectionUtils.isNotEmpty(compactedfiles)) { 2256 // Do a copy under read lock 2257 copyCompactedfiles = new ArrayList<>(compactedfiles); 2258 } else { 2259 LOG.trace("No compacted files to archive"); 2260 } 2261 } finally { 2262 storeEngine.readUnlock(); 2263 } 2264 if (CollectionUtils.isNotEmpty(copyCompactedfiles)) { 2265 removeCompactedfiles(copyCompactedfiles, true); 2266 } 2267 } finally { 2268 archiveLock.unlock(); 2269 } 2270 } 2271 2272 /** 2273 * Archives and removes the compacted files 2274 * @param compactedfiles The compacted files in this store that are not active in reads 2275 * @param evictOnClose true if blocks should be evicted from the cache when an HFile reader is 2276 * closed, false if not 2277 */ 2278 private void removeCompactedfiles(Collection<HStoreFile> compactedfiles, boolean evictOnClose) 2279 throws IOException { 2280 final List<HStoreFile> filesToRemove = new ArrayList<>(compactedfiles.size()); 2281 final List<Long> storeFileSizes = new ArrayList<>(compactedfiles.size()); 2282 for (final HStoreFile file : compactedfiles) { 2283 synchronized (file) { 2284 try { 2285 StoreFileReader r = file.getReader(); 2286 if (r == null) { 2287 LOG.debug("The file {} was closed but still not archived", file); 2288 // HACK: Temporarily re-open the reader so we can get the size of the file. 
Ideally, 2289 // we should know the size of an HStoreFile without having to ask the HStoreFileReader 2290 // for that. 2291 long length = getStoreFileSize(file); 2292 filesToRemove.add(file); 2293 storeFileSizes.add(length); 2294 continue; 2295 } 2296 2297 if (file.isCompactedAway() && !file.isReferencedInReads()) { 2298 // Even if deleting fails we need not bother as any new scanners won't be 2299 // able to use the compacted file as the status is already compactedAway 2300 LOG.trace("Closing and archiving the file {}", file); 2301 // Copy the file size before closing the reader 2302 final long length = r.length(); 2303 r.close(evictOnClose); 2304 // Just close and return 2305 filesToRemove.add(file); 2306 // Only add the length if we successfully added the file to `filesToRemove` 2307 storeFileSizes.add(length); 2308 } else { 2309 LOG.info("Can't archive compacted file " + file.getPath() 2310 + " because of either isCompactedAway=" + file.isCompactedAway() 2311 + " or file has reference, isReferencedInReads=" + file.isReferencedInReads() 2312 + ", refCount=" + r.getRefCount() + ", skipping for now."); 2313 } 2314 } catch (Exception e) { 2315 LOG.error("Exception while trying to close the compacted store file {}", file.getPath(), 2316 e); 2317 } 2318 } 2319 } 2320 if (this.isPrimaryReplicaStore()) { 2321 // Only the primary region is allowed to move the file to archive. 2322 // The secondary region does not move the files to archive. Any active reads from 2323 // the secondary region will still work because the file as such has active readers on it. 2324 if (!filesToRemove.isEmpty()) { 2325 LOG.debug("Moving the files {} to archive", filesToRemove); 2326 // Only if this is successful it has to be removed 2327 try { 2328 getRegionFileSystem().removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(), 2329 filesToRemove); 2330 } catch (FailedArchiveException fae) { 2331 // Even if archiving some files failed, we still need to clear out any of the 2332 // files which were successfully archived. Otherwise we will receive a 2333 // FileNotFoundException when we attempt to re-archive them in the next go around. 2334 Collection<Path> failedFiles = fae.getFailedFiles(); 2335 Iterator<HStoreFile> iter = filesToRemove.iterator(); 2336 Iterator<Long> sizeIter = storeFileSizes.iterator(); 2337 while (iter.hasNext()) { 2338 sizeIter.next(); 2339 if (failedFiles.contains(iter.next().getPath())) { 2340 iter.remove(); 2341 sizeIter.remove(); 2342 } 2343 } 2344 if (!filesToRemove.isEmpty()) { 2345 clearCompactedfiles(filesToRemove); 2346 } 2347 throw fae; 2348 } 2349 } 2350 } 2351 if (!filesToRemove.isEmpty()) { 2352 // Clear the compactedfiles from the store file manager 2353 clearCompactedfiles(filesToRemove); 2354 // Try to send report of this archival to the Master for updating quota usage faster 2355 reportArchivedFilesForQuota(filesToRemove, storeFileSizes); 2356 } 2357 } 2358 2359 /** 2360 * Computes the length of a store file without succumbing to any errors along the way. If an error 2361 * is encountered, the implementation returns {@code 0} instead of the actual size. 2362 * @param file The file to compute the size of. 2363 * @return The size in bytes of the provided {@code file}. 
2364 */ 2365 long getStoreFileSize(HStoreFile file) { 2366 long length = 0; 2367 try { 2368 file.initReader(); 2369 length = file.getReader().length(); 2370 } catch (IOException e) { 2371 LOG.trace("Failed to open reader when trying to compute store file size for {}, ignoring", 2372 file, e); 2373 } finally { 2374 try { 2375 file.closeStoreFile( 2376 file.getCacheConf() != null ? file.getCacheConf().shouldEvictOnClose() : true); 2377 } catch (IOException e) { 2378 LOG.trace("Failed to close reader after computing store file size for {}, ignoring", file, 2379 e); 2380 } 2381 } 2382 return length; 2383 } 2384 2385 public Long preFlushSeqIDEstimation() { 2386 return memstore.preFlushSeqIDEstimation(); 2387 } 2388 2389 @Override 2390 public boolean isSloppyMemStore() { 2391 return this.memstore.isSloppy(); 2392 } 2393 2394 private void clearCompactedfiles(List<HStoreFile> filesToRemove) throws IOException { 2395 LOG.trace("Clearing the compacted file {} from this store", filesToRemove); 2396 storeEngine.removeCompactedFiles(filesToRemove); 2397 } 2398 2399 void reportArchivedFilesForQuota(List<? extends StoreFile> archivedFiles, List<Long> fileSizes) { 2400 // Sanity check from the caller 2401 if (archivedFiles.size() != fileSizes.size()) { 2402 throw new RuntimeException("Coding error: should never see lists of varying size"); 2403 } 2404 RegionServerServices rss = this.region.getRegionServerServices(); 2405 if (rss == null) { 2406 return; 2407 } 2408 List<Entry<String, Long>> filesWithSizes = new ArrayList<>(archivedFiles.size()); 2409 Iterator<Long> fileSizeIter = fileSizes.iterator(); 2410 for (StoreFile storeFile : archivedFiles) { 2411 final long fileSize = fileSizeIter.next(); 2412 if (storeFile.isHFile() && fileSize != 0) { 2413 filesWithSizes.add(Maps.immutableEntry(storeFile.getPath().getName(), fileSize)); 2414 } 2415 } 2416 if (LOG.isTraceEnabled()) { 2417 LOG.trace("Files archived: " + archivedFiles + ", reporting the following to the Master: " 2418 + filesWithSizes); 2419 } 2420 boolean success = rss.reportFileArchivalForQuotas(getTableName(), filesWithSizes); 2421 if (!success) { 2422 LOG.warn("Failed to report archival of files: " + filesWithSizes); 2423 } 2424 } 2425 2426 @Override 2427 public int getCurrentParallelPutCount() { 2428 return currentParallelPutCount.get(); 2429 } 2430 2431 public int getStoreRefCount() { 2432 return this.storeEngine.getStoreFileManager().getStoreFiles().stream() 2433 .filter(sf -> sf.getReader() != null).filter(HStoreFile::isHFile) 2434 .mapToInt(HStoreFile::getRefCount).sum(); 2435 } 2436 2437 /** Returns get maximum ref count of storeFile among all compacted HStore Files for the HStore */ 2438 public int getMaxCompactedStoreFileRefCount() { 2439 OptionalInt maxCompactedStoreFileRefCount = this.storeEngine.getStoreFileManager() 2440 .getCompactedfiles().stream().filter(sf -> sf.getReader() != null).filter(HStoreFile::isHFile) 2441 .mapToInt(HStoreFile::getRefCount).max(); 2442 return maxCompactedStoreFileRefCount.isPresent() ? 
      maxCompactedStoreFileRefCount.getAsInt() : 0;
  }

  @Override
  public long getMemstoreOnlyRowReadsCount() {
    return memstoreOnlyRowReadsCount.sum();
  }

  @Override
  public long getMixedRowReadsCount() {
    return mixedRowReadsCount.sum();
  }

  @Override
  public Configuration getReadOnlyConfiguration() {
    return new ReadOnlyConfiguration(this.conf);
  }

  void updateMetricsStore(boolean memstoreRead) {
    if (memstoreRead) {
      memstoreOnlyRowReadsCount.increment();
    } else {
      mixedRowReadsCount.increment();
    }
  }

  /**
   * Return the storefiles which are currently being written to. Mainly used by
   * {@link BrokenStoreFileCleaner} to prevent deleting these files, as they are not present in the
   * SFT yet.
   */
  public Set<Path> getStoreFilesBeingWritten() {
    return storeFileWriterCreationTrackers.stream().flatMap(t -> t.get().stream())
      .collect(Collectors.toSet());
  }

  @Override
  public long getBloomFilterRequestsCount() {
    return storeEngine.getBloomFilterMetrics().getRequestsCount();
  }

  @Override
  public long getBloomFilterNegativeResultsCount() {
    return storeEngine.getBloomFilterMetrics().getNegativeResultsCount();
  }

  @Override
  public long getBloomFilterEligibleRequestsCount() {
    return storeEngine.getBloomFilterMetrics().getEligibleRequestsCount();
  }
}