001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.net.InetSocketAddress; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.Collections; 027import java.util.HashMap; 028import java.util.HashSet; 029import java.util.Iterator; 030import java.util.List; 031import java.util.Map; 032import java.util.Map.Entry; 033import java.util.NavigableSet; 034import java.util.Optional; 035import java.util.OptionalDouble; 036import java.util.OptionalInt; 037import java.util.OptionalLong; 038import java.util.Set; 039import java.util.concurrent.Callable; 040import java.util.concurrent.CompletionService; 041import java.util.concurrent.ConcurrentHashMap; 042import java.util.concurrent.ExecutionException; 043import java.util.concurrent.ExecutorCompletionService; 044import java.util.concurrent.Future; 045import java.util.concurrent.ThreadPoolExecutor; 046import java.util.concurrent.atomic.AtomicBoolean; 047import java.util.concurrent.atomic.AtomicInteger; 048import java.util.concurrent.atomic.AtomicLong; 049import java.util.concurrent.atomic.LongAdder; 050import java.util.concurrent.locks.ReentrantLock; 051import java.util.function.Consumer; 052import java.util.function.Supplier; 053import java.util.function.ToLongFunction; 054import java.util.stream.Collectors; 055import java.util.stream.LongStream; 056import org.apache.hadoop.conf.Configuration; 057import org.apache.hadoop.fs.FileSystem; 058import org.apache.hadoop.fs.Path; 059import org.apache.hadoop.fs.permission.FsAction; 060import org.apache.hadoop.hbase.Cell; 061import org.apache.hadoop.hbase.CellComparator; 062import org.apache.hadoop.hbase.CellUtil; 063import org.apache.hadoop.hbase.HConstants; 064import org.apache.hadoop.hbase.MemoryCompactionPolicy; 065import org.apache.hadoop.hbase.TableName; 066import org.apache.hadoop.hbase.backup.FailedArchiveException; 067import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 068import org.apache.hadoop.hbase.client.RegionInfo; 069import org.apache.hadoop.hbase.client.Scan; 070import org.apache.hadoop.hbase.conf.ConfigurationManager; 071import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; 072import org.apache.hadoop.hbase.coprocessor.ReadOnlyConfiguration; 073import org.apache.hadoop.hbase.io.HeapSize; 074import org.apache.hadoop.hbase.io.hfile.CacheConfig; 075import org.apache.hadoop.hbase.io.hfile.HFile; 076import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; 077import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl; 078import 
org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;

/**
 * A Store holds a column family in a Region. It's a memstore and a set of zero or more StoreFiles,
 * which stretch backwards over time.
 * <p>
 * There's no reason to consider append-logging at this level; all logging and locking are handled
 * at the HRegion level. Store just provides services to manage sets of StoreFiles. One of the most
 * important of those services is compaction, where files are aggregated once they pass a
 * configurable threshold.
 * <p>
 * Locking and transactions are handled at a higher level. This API should not be called directly
 * but by an HRegion manager.
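 * <p>
 * Illustrative sketch (hypothetical caller, simplified wiring) of how an HRegion-level component
 * typically drives a store using methods defined below:
 *
 * <pre>
 * // write path: buffer an edit in the memstore under the store engine's read lock
 * store.add(cell, memStoreSizing);
 * // read path: scanners over the store files plus the memstore for a given read point
 * List&lt;KeyValueScanner&gt; scanners =
 *   store.getScanners(true, false, false, null, startRow, true, stopRow, false, readPt);
 * // flushes and compactions are scheduled by the region / compaction threads, not by clients
 * </pre>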
 */
@InterfaceAudience.Private
public class HStore
  implements Store, HeapSize, StoreConfigInformation, PropagatingConfigurationObserver {
  public static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
  public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
    "hbase.server.compactchecker.interval.multiplier";
  public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
  public static final String BLOCK_STORAGE_POLICY_KEY = "hbase.hstore.block.storage.policy";
  // "NONE" is not a valid storage policy and means we defer the policy to HDFS
  public static final String DEFAULT_BLOCK_STORAGE_POLICY = "NONE";
  public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
  public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 16;

  // HBASE-24428 : Update compaction priority for recently split daughter regions
  // so as to prioritize their compaction.
  // Any compaction candidate with higher priority than compaction of newly split daughter regions
  // should have priority value < (Integer.MIN_VALUE + 1000)
  private static final int SPLIT_REGION_COMPACTION_PRIORITY = Integer.MIN_VALUE + 1000;

  private static final Logger LOG = LoggerFactory.getLogger(HStore.class);

  protected final MemStore memstore;
  // This stores directory in the filesystem.
  private final HRegion region;
  protected Configuration conf;
  private long lastCompactSize = 0;
  volatile boolean forceMajor = false;
  private AtomicLong storeSize = new AtomicLong();
  private AtomicLong totalUncompressedBytes = new AtomicLong();
  private LongAdder memstoreOnlyRowReadsCount = new LongAdder();
  // rows that have cells from both memstore and files (or only files)
  private LongAdder mixedRowReadsCount = new LongAdder();

  /**
   * Lock specific to archiving compacted store files. This avoids races around the combination of
   * retrieving the list of compacted files and moving them to the archive directory. Since this is
   * usually a background process (other than on close), we don't want to handle this with the
   * store write lock, which would block readers and degrade performance.
   * <p>
   * Locked by:
   * <ul>
   * <li>CompactedHFilesDispatchHandler via closeAndArchiveCompactedFiles()</li>
   * <li>close()</li>
   * </ul>
   */
  final ReentrantLock archiveLock = new ReentrantLock();

  private final boolean verifyBulkLoads;

  /**
   * Use this counter to track concurrent puts. If TRACE logging is enabled and we are over the
   * threshold set by hbase.region.store.parallel.put.print.threshold (default is 50), we will log
   * a message identifying the Store experiencing this high level of concurrency.
   */
  private final AtomicInteger currentParallelPutCount = new AtomicInteger(0);
  private final int parallelPutCountPrintThreshold;

  private ScanInfo scanInfo;

  // All access must be synchronized.
  // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it.
  private final List<HStoreFile> filesCompacting = Lists.newArrayList();

  // All access must be synchronized.
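  // Observers are registered via addChangedReaderObserver() and notified through
  // notifyChangedReadersObservers() once a flush installs new store files.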
  private final Set<ChangedReadersObserver> changedReaderObservers =
    Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());

  private HFileDataBlockEncoder dataBlockEncoder;

  final StoreEngine<?, ?, ?, ?> storeEngine;

  private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
  private volatile OffPeakHours offPeakHours;

  private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
  private int flushRetriesNumber;
  private int pauseTime;

  private long blockingFileCount;
  private int compactionCheckMultiplier;

  private AtomicLong flushedCellsCount = new AtomicLong();
  private AtomicLong compactedCellsCount = new AtomicLong();
  private AtomicLong majorCompactedCellsCount = new AtomicLong();
  private AtomicLong flushedCellsSize = new AtomicLong();
  private AtomicLong flushedOutputFileSize = new AtomicLong();
  private AtomicLong compactedCellsSize = new AtomicLong();
  private AtomicLong majorCompactedCellsSize = new AtomicLong();

  private final StoreContext storeContext;

  // Used to track the store files which are currently being written. For compaction, if we
  // compact store files [a, b, c] into [d], we record 'd' here. It is also used to track the
  // store files being written during a flush.
  // Notice that the files are created in the background compaction or flush thread and are read
  // from other threads, so the tracker needs to be thread safe.
  private static final class StoreFileWriterCreationTracker implements Consumer<Path> {

    private final Set<Path> files = Collections.newSetFromMap(new ConcurrentHashMap<>());

    @Override
    public void accept(Path t) {
      files.add(t);
    }

    public Set<Path> get() {
      return Collections.unmodifiableSet(files);
    }
  }

  // We may have multiple compactions running at the same time, and a flush can also happen at the
  // same time, so here we need a collection, and the collection needs to be thread safe.
  // The implementation of StoreFileWriterCreationTracker is very simple and we are not likely to
  // implement hashCode or equals for it, so here we just use ConcurrentHashMap. Change to an
  // identity-based map later if we ever implement hashCode or equals.
  private final Set<StoreFileWriterCreationTracker> storeFileWriterCreationTrackers =
    Collections.newSetFromMap(new ConcurrentHashMap<>());

  // For SFT implementations which write a tmp store file first, we do not need to clean up broken
  // store files under the data directory, which means we do not need to track store file writer
  // creation. So here we abstract a factory to return different trackers for different SFT
  // implementations.
  private final Supplier<StoreFileWriterCreationTracker> storeFileWriterCreationTrackerFactory;

  private final boolean warmup;

  /**
   * Constructor
   * @param family    the ColumnFamilyDescriptor for this column family
   * @param confParam configuration object for this store. Can be null.
   */
  protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
    final Configuration confParam, boolean warmup) throws IOException {
    this.conf = StoreUtils.createStoreConfiguration(confParam, region.getTableDescriptor(), family);

    this.region = region;
    this.storeContext = initializeStoreContext(family);

    // Assemble the store's home directory and ensure it exists.
    region.getRegionFileSystem().createStoreDir(family.getNameAsString());

    // set block storage policy for store directory
    String policyName = family.getStoragePolicy();
    if (null == policyName) {
      policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY, DEFAULT_BLOCK_STORAGE_POLICY);
    }
    region.getRegionFileSystem().setStoragePolicy(family.getNameAsString(), policyName.trim());

    this.dataBlockEncoder = new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());

    // used by ScanQueryMatcher
    long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
    LOG.trace("Time to purge deletes set to {}ms in {}", timeToPurgeDeletes, this);
    // Get TTL
    long ttl = determineTTLFromFamily(family);
    // Why not just pass a HColumnDescriptor in here altogether? Even if we have
    // to clone it?
    scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, region.getCellComparator());
    this.memstore = getMemstore();

    this.offPeakHours = OffPeakHours.getInstance(conf);

    this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);

    this.blockingFileCount = conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
    this.compactionCheckMultiplier = conf.getInt(COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY,
      DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
    if (this.compactionCheckMultiplier <= 0) {
      LOG.error("Compaction check period multiplier must be positive, setting default: {}",
        DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
      this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
    }

    this.warmup = warmup;
    this.storeEngine = createStoreEngine(this, this.conf, region.getCellComparator());
    storeEngine.initialize(warmup);
    // If the engine requires writing to a tmp dir first, the factory just returns null, which
    // indicates that we do not need to track the creation of store file writers; otherwise it
    // returns a new StoreFileWriterCreationTracker.
    this.storeFileWriterCreationTrackerFactory = storeEngine.requireWritingToTmpDirFirst()
      ?
() -> null 303 : () -> new StoreFileWriterCreationTracker(); 304 refreshStoreSizeAndTotalBytes(); 305 306 flushRetriesNumber = 307 conf.getInt("hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER); 308 pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE); 309 if (flushRetriesNumber <= 0) { 310 throw new IllegalArgumentException( 311 "hbase.hstore.flush.retries.number must be > 0, not " + flushRetriesNumber); 312 } 313 314 int confPrintThreshold = 315 this.conf.getInt("hbase.region.store.parallel.put.print.threshold", 50); 316 if (confPrintThreshold < 10) { 317 confPrintThreshold = 10; 318 } 319 this.parallelPutCountPrintThreshold = confPrintThreshold; 320 321 LOG.info( 322 "Store={}, memstore type={}, storagePolicy={}, verifyBulkLoads={}, " 323 + "parallelPutCountPrintThreshold={}, encoding={}, compression={}", 324 this, memstore.getClass().getSimpleName(), policyName, verifyBulkLoads, 325 parallelPutCountPrintThreshold, family.getDataBlockEncoding(), family.getCompressionType()); 326 } 327 328 private StoreContext initializeStoreContext(ColumnFamilyDescriptor family) throws IOException { 329 return new StoreContext.Builder().withBlockSize(family.getBlocksize()) 330 .withEncryptionContext(EncryptionUtil.createEncryptionContext(conf, family)) 331 .withBloomType(family.getBloomFilterType()).withCacheConfig(createCacheConf(family)) 332 .withCellComparator(region.getCellComparator()).withColumnFamilyDescriptor(family) 333 .withCompactedFilesSupplier(this::getCompactedFiles) 334 .withRegionFileSystem(region.getRegionFileSystem()) 335 .withFavoredNodesSupplier(this::getFavoredNodes) 336 .withFamilyStoreDirectoryPath( 337 region.getRegionFileSystem().getStoreDir(family.getNameAsString())) 338 .withRegionCoprocessorHost(region.getCoprocessorHost()).build(); 339 } 340 341 private InetSocketAddress[] getFavoredNodes() { 342 InetSocketAddress[] favoredNodes = null; 343 if (region.getRegionServerServices() != null) { 344 favoredNodes = region.getRegionServerServices() 345 .getFavoredNodesForRegion(region.getRegionInfo().getEncodedName()); 346 } 347 return favoredNodes; 348 } 349 350 /** Returns MemStore Instance to use in this store. */ 351 private MemStore getMemstore() { 352 MemStore ms = null; 353 // Check if in-memory-compaction configured. Note MemoryCompactionPolicy is an enum! 354 MemoryCompactionPolicy inMemoryCompaction = null; 355 if (this.getTableName().isSystemTable()) { 356 inMemoryCompaction = MemoryCompactionPolicy 357 .valueOf(conf.get("hbase.systemtables.compacting.memstore.type", "NONE")); 358 } else { 359 inMemoryCompaction = getColumnFamilyDescriptor().getInMemoryCompaction(); 360 } 361 if (inMemoryCompaction == null) { 362 inMemoryCompaction = 363 MemoryCompactionPolicy.valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 364 CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT).toUpperCase()); 365 } 366 367 switch (inMemoryCompaction) { 368 case NONE: 369 Class<? extends MemStore> memStoreClass = 370 conf.getClass(MEMSTORE_CLASS_NAME, DefaultMemStore.class, MemStore.class); 371 ms = ReflectionUtils.newInstance(memStoreClass, 372 new Object[] { conf, getComparator(), this.getHRegion().getRegionServicesForStores() }); 373 break; 374 default: 375 Class<? 
extends CompactingMemStore> compactingMemStoreClass =
          conf.getClass(MEMSTORE_CLASS_NAME, CompactingMemStore.class, CompactingMemStore.class);
        ms =
          ReflectionUtils.newInstance(compactingMemStoreClass, new Object[] { conf, getComparator(),
            this, this.getHRegion().getRegionServicesForStores(), inMemoryCompaction });
    }
    return ms;
  }

  /**
   * Creates the cache config.
   * @param family The current column family.
   */
  protected CacheConfig createCacheConf(final ColumnFamilyDescriptor family) {
    CacheConfig cacheConf = new CacheConfig(conf, family, region.getBlockCache(),
      region.getRegionServicesForStores().getByteBuffAllocator());
    LOG.info("Created cacheConfig: {}, for column family {} of region {} ", cacheConf,
      family.getNameAsString(), region.getRegionInfo().getEncodedName());
    return cacheConf;
  }

  /**
   * Creates the store engine configured for the given Store.
   * @param store        The store. An unfortunate dependency needed due to it being passed to
   *                     coprocessors via the compactor.
   * @param conf         Store configuration.
   * @param kvComparator KVComparator for storeFileManager.
   * @return StoreEngine to use.
   */
  protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
    CellComparator kvComparator) throws IOException {
    return StoreEngine.create(store, conf, kvComparator);
  }

  /** Returns TTL in milliseconds of the specified family (the descriptor stores TTL in seconds). */
  public static long determineTTLFromFamily(final ColumnFamilyDescriptor family) {
    // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.
    long ttl = family.getTimeToLive();
    if (ttl == HConstants.FOREVER) {
      // Default is unlimited ttl.
      ttl = Long.MAX_VALUE;
    } else if (ttl == -1) {
      ttl = Long.MAX_VALUE;
    } else {
      // Second -> ms adjust for user data
      ttl *= 1000;
    }
    return ttl;
  }

  StoreContext getStoreContext() {
    return storeContext;
  }

  @Override
  public String getColumnFamilyName() {
    return this.storeContext.getFamily().getNameAsString();
  }

  @Override
  public TableName getTableName() {
    return this.getRegionInfo().getTable();
  }

  @Override
  public FileSystem getFileSystem() {
    return storeContext.getRegionFileSystem().getFileSystem();
  }

  public HRegionFileSystem getRegionFileSystem() {
    return storeContext.getRegionFileSystem();
  }

  /* Implementation of StoreConfigInformation */
  @Override
  public long getStoreFileTtl() {
    // TTL only applies if there's no MIN_VERSIONs setting on the column.
    return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
  }

  @Override
  public long getMemStoreFlushSize() {
    // TODO: Why is this in here? The flushsize of the region rather than the store?
St.Ack 458 return this.region.memstoreFlushSize; 459 } 460 461 @Override 462 public MemStoreSize getFlushableSize() { 463 return this.memstore.getFlushableSize(); 464 } 465 466 @Override 467 public MemStoreSize getSnapshotSize() { 468 return this.memstore.getSnapshotSize(); 469 } 470 471 @Override 472 public long getCompactionCheckMultiplier() { 473 return this.compactionCheckMultiplier; 474 } 475 476 @Override 477 public long getBlockingFileCount() { 478 return blockingFileCount; 479 } 480 /* End implementation of StoreConfigInformation */ 481 482 @Override 483 public ColumnFamilyDescriptor getColumnFamilyDescriptor() { 484 return this.storeContext.getFamily(); 485 } 486 487 @Override 488 public OptionalLong getMaxSequenceId() { 489 return StoreUtils.getMaxSequenceIdInList(this.getStorefiles()); 490 } 491 492 @Override 493 public OptionalLong getMaxMemStoreTS() { 494 return StoreUtils.getMaxMemStoreTSInList(this.getStorefiles()); 495 } 496 497 /** Returns the data block encoder */ 498 public HFileDataBlockEncoder getDataBlockEncoder() { 499 return dataBlockEncoder; 500 } 501 502 /** 503 * Should be used only in tests. 504 * @param blockEncoder the block delta encoder to use 505 */ 506 void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) { 507 this.dataBlockEncoder = blockEncoder; 508 } 509 510 private void postRefreshStoreFiles() throws IOException { 511 // Advance the memstore read point to be at least the new store files seqIds so that 512 // readers might pick it up. This assumes that the store is not getting any writes (otherwise 513 // in-flight transactions might be made visible) 514 getMaxSequenceId().ifPresent(region.getMVCC()::advanceTo); 515 refreshStoreSizeAndTotalBytes(); 516 } 517 518 @Override 519 public void refreshStoreFiles() throws IOException { 520 storeEngine.refreshStoreFiles(); 521 postRefreshStoreFiles(); 522 } 523 524 /** 525 * Replaces the store files that the store has with the given files. Mainly used by secondary 526 * region replicas to keep up to date with the primary region files. 
527 */ 528 public void refreshStoreFiles(Collection<String> newFiles) throws IOException { 529 storeEngine.refreshStoreFiles(newFiles); 530 postRefreshStoreFiles(); 531 } 532 533 /** 534 * This message intends to inform the MemStore that next coming updates are going to be part of 535 * the replaying edits from WAL 536 */ 537 public void startReplayingFromWAL() { 538 this.memstore.startReplayingFromWAL(); 539 } 540 541 /** 542 * This message intends to inform the MemStore that the replaying edits from WAL are done 543 */ 544 public void stopReplayingFromWAL() { 545 this.memstore.stopReplayingFromWAL(); 546 } 547 548 /** 549 * Adds a value to the memstore 550 */ 551 public void add(final Cell cell, MemStoreSizing memstoreSizing) { 552 storeEngine.readLock(); 553 try { 554 if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) { 555 LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!", 556 this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName()); 557 } 558 this.memstore.add(cell, memstoreSizing); 559 } finally { 560 storeEngine.readUnlock(); 561 currentParallelPutCount.decrementAndGet(); 562 } 563 } 564 565 /** 566 * Adds the specified value to the memstore 567 */ 568 public void add(final Iterable<Cell> cells, MemStoreSizing memstoreSizing) { 569 storeEngine.readLock(); 570 try { 571 if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) { 572 LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!", 573 this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName()); 574 } 575 memstore.add(cells, memstoreSizing); 576 } finally { 577 storeEngine.readUnlock(); 578 currentParallelPutCount.decrementAndGet(); 579 } 580 } 581 582 @Override 583 public long timeOfOldestEdit() { 584 return memstore.timeOfOldestEdit(); 585 } 586 587 /** Returns All store files. */ 588 @Override 589 public Collection<HStoreFile> getStorefiles() { 590 return this.storeEngine.getStoreFileManager().getStorefiles(); 591 } 592 593 @Override 594 public Collection<HStoreFile> getCompactedFiles() { 595 return this.storeEngine.getStoreFileManager().getCompactedfiles(); 596 } 597 598 /** 599 * This throws a WrongRegionException if the HFile does not fit in this region, or an 600 * InvalidHFileException if the HFile is not valid. 
601 */ 602 public void assertBulkLoadHFileOk(Path srcPath) throws IOException { 603 HFile.Reader reader = null; 604 try { 605 LOG.info("Validating hfile at " + srcPath + " for inclusion in " + this); 606 FileSystem srcFs = srcPath.getFileSystem(conf); 607 srcFs.access(srcPath, FsAction.READ_WRITE); 608 reader = HFile.createReader(srcFs, srcPath, getCacheConfig(), isPrimaryReplicaStore(), conf); 609 610 Optional<byte[]> firstKey = reader.getFirstRowKey(); 611 Preconditions.checkState(firstKey.isPresent(), "First key can not be null"); 612 Optional<Cell> lk = reader.getLastKey(); 613 Preconditions.checkState(lk.isPresent(), "Last key can not be null"); 614 byte[] lastKey = CellUtil.cloneRow(lk.get()); 615 616 if (LOG.isDebugEnabled()) { 617 LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey.get()) + " last=" 618 + Bytes.toStringBinary(lastKey)); 619 LOG.debug("Region bounds: first=" + Bytes.toStringBinary(getRegionInfo().getStartKey()) 620 + " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey())); 621 } 622 623 if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) { 624 throw new WrongRegionException("Bulk load file " + srcPath.toString() 625 + " does not fit inside region " + this.getRegionInfo().getRegionNameAsString()); 626 } 627 628 if ( 629 reader.length() 630 > conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE) 631 ) { 632 LOG.warn("Trying to bulk load hfile " + srcPath + " with size: " + reader.length() 633 + " bytes can be problematic as it may lead to oversplitting."); 634 } 635 636 if (verifyBulkLoads) { 637 long verificationStartTime = EnvironmentEdgeManager.currentTime(); 638 LOG.info("Full verification started for bulk load hfile: {}", srcPath); 639 Cell prevCell = null; 640 HFileScanner scanner = reader.getScanner(conf, false, false, false); 641 scanner.seekTo(); 642 do { 643 Cell cell = scanner.getCell(); 644 if (prevCell != null) { 645 if (getComparator().compareRows(prevCell, cell) > 0) { 646 throw new InvalidHFileException("Previous row is greater than" + " current row: path=" 647 + srcPath + " previous=" + CellUtil.getCellKeyAsString(prevCell) + " current=" 648 + CellUtil.getCellKeyAsString(cell)); 649 } 650 if (CellComparator.getInstance().compareFamilies(prevCell, cell) != 0) { 651 throw new InvalidHFileException("Previous key had different" 652 + " family compared to current key: path=" + srcPath + " previous=" 653 + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(), 654 prevCell.getFamilyLength()) 655 + " current=" + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(), 656 cell.getFamilyLength())); 657 } 658 } 659 prevCell = cell; 660 } while (scanner.next()); 661 LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString() + " took " 662 + (EnvironmentEdgeManager.currentTime() - verificationStartTime) + " ms"); 663 } 664 } finally { 665 if (reader != null) { 666 reader.close(); 667 } 668 } 669 } 670 671 /** 672 * This method should only be called from Region. It is assumed that the ranges of values in the 673 * HFile fit within the stores assigned region. 
(assertBulkLoadHFileOk checks this) 674 * @param seqNum sequence Id associated with the HFile 675 */ 676 public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException { 677 Path srcPath = new Path(srcPathStr); 678 return getRegionFileSystem().bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum); 679 } 680 681 public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException { 682 Path srcPath = new Path(srcPathStr); 683 try { 684 getRegionFileSystem().commitStoreFile(srcPath, dstPath); 685 } finally { 686 if (this.getCoprocessorHost() != null) { 687 this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath); 688 } 689 } 690 691 LOG.info("Loaded HFile " + srcPath + " into " + this + " as " + dstPath 692 + " - updating store file list."); 693 694 HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath); 695 bulkLoadHFile(sf); 696 697 LOG.info("Successfully loaded {} into {} (new location: {})", srcPath, this, dstPath); 698 699 return dstPath; 700 } 701 702 public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException { 703 HStoreFile sf = storeEngine.createStoreFileAndReader(fileInfo); 704 bulkLoadHFile(sf); 705 } 706 707 private void bulkLoadHFile(HStoreFile sf) throws IOException { 708 StoreFileReader r = sf.getReader(); 709 this.storeSize.addAndGet(r.length()); 710 this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); 711 storeEngine.addStoreFiles(Lists.newArrayList(sf), () -> { 712 }); 713 LOG.info("Loaded HFile " + sf.getFileInfo() + " into " + this); 714 if (LOG.isTraceEnabled()) { 715 String traceMessage = "BULK LOAD time,size,store size,store files [" 716 + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize + "," 717 + storeEngine.getStoreFileManager().getStorefileCount() + "]"; 718 LOG.trace(traceMessage); 719 } 720 } 721 722 private ImmutableCollection<HStoreFile> closeWithoutLock() throws IOException { 723 // Clear so metrics doesn't find them. 724 ImmutableCollection<HStoreFile> result = storeEngine.getStoreFileManager().clearFiles(); 725 Collection<HStoreFile> compactedfiles = storeEngine.getStoreFileManager().clearCompactedFiles(); 726 // clear the compacted files 727 if (CollectionUtils.isNotEmpty(compactedfiles)) { 728 removeCompactedfiles(compactedfiles, 729 getCacheConfig() != null ? getCacheConfig().shouldEvictOnClose() : true); 730 } 731 if (!result.isEmpty()) { 732 // initialize the thread pool for closing store files in parallel. 733 ThreadPoolExecutor storeFileCloserThreadPool = 734 this.region.getStoreFileOpenAndCloseThreadPool("StoreFileCloser-" 735 + this.region.getRegionInfo().getEncodedName() + "-" + this.getColumnFamilyName()); 736 737 // close each store file in parallel 738 CompletionService<Void> completionService = 739 new ExecutorCompletionService<>(storeFileCloserThreadPool); 740 for (HStoreFile f : result) { 741 completionService.submit(new Callable<Void>() { 742 @Override 743 public Void call() throws IOException { 744 boolean evictOnClose = 745 getCacheConfig() != null ? 
getCacheConfig().shouldEvictOnClose() : true; 746 f.closeStoreFile(!warmup && evictOnClose); 747 return null; 748 } 749 }); 750 } 751 752 IOException ioe = null; 753 try { 754 for (int i = 0; i < result.size(); i++) { 755 try { 756 Future<Void> future = completionService.take(); 757 future.get(); 758 } catch (InterruptedException e) { 759 if (ioe == null) { 760 ioe = new InterruptedIOException(); 761 ioe.initCause(e); 762 } 763 } catch (ExecutionException e) { 764 if (ioe == null) { 765 ioe = new IOException(e.getCause()); 766 } 767 } 768 } 769 } finally { 770 storeFileCloserThreadPool.shutdownNow(); 771 } 772 if (ioe != null) { 773 throw ioe; 774 } 775 } 776 LOG.trace("Closed {}", this); 777 return result; 778 } 779 780 /** 781 * Close all the readers We don't need to worry about subsequent requests because the Region holds 782 * a write lock that will prevent any more reads or writes. 783 * @return the {@link StoreFile StoreFiles} that were previously being used. 784 * @throws IOException on failure 785 */ 786 public ImmutableCollection<HStoreFile> close() throws IOException { 787 // findbugs can not recognize storeEngine.writeLock is just a lock operation so it will report 788 // UL_UNRELEASED_LOCK_EXCEPTION_PATH, so here we have to use two try finally... 789 // Change later if findbugs becomes smarter in the future. 790 this.archiveLock.lock(); 791 try { 792 this.storeEngine.writeLock(); 793 try { 794 return closeWithoutLock(); 795 } finally { 796 this.storeEngine.writeUnlock(); 797 } 798 } finally { 799 this.archiveLock.unlock(); 800 } 801 } 802 803 /** 804 * Write out current snapshot. Presumes {@code StoreFlusherImpl.prepare()} has been called 805 * previously. 806 * @param logCacheFlushId flush sequence number 807 * @return The path name of the tmp file to which the store was flushed 808 * @throws IOException if exception occurs during process 809 */ 810 protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot, 811 MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker, 812 Consumer<Path> writerCreationTracker) throws IOException { 813 // If an exception happens flushing, we let it out without clearing 814 // the memstore snapshot. The old snapshot will be returned when we say 815 // 'snapshot', the next time flush comes around. 
816 // Retry after catching exception when flushing, otherwise server will abort 817 // itself 818 StoreFlusher flusher = storeEngine.getStoreFlusher(); 819 IOException lastException = null; 820 for (int i = 0; i < flushRetriesNumber; i++) { 821 try { 822 List<Path> pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status, 823 throughputController, tracker, writerCreationTracker); 824 Path lastPathName = null; 825 try { 826 for (Path pathName : pathNames) { 827 lastPathName = pathName; 828 storeEngine.validateStoreFile(pathName); 829 } 830 return pathNames; 831 } catch (Exception e) { 832 LOG.warn("Failed validating store file {}, retrying num={}", lastPathName, i, e); 833 if (e instanceof IOException) { 834 lastException = (IOException) e; 835 } else { 836 lastException = new IOException(e); 837 } 838 } 839 } catch (IOException e) { 840 LOG.warn("Failed flushing store file for {}, retrying num={}", this, i, e); 841 lastException = e; 842 } 843 if (lastException != null && i < (flushRetriesNumber - 1)) { 844 try { 845 Thread.sleep(pauseTime); 846 } catch (InterruptedException e) { 847 IOException iie = new InterruptedIOException(); 848 iie.initCause(e); 849 throw iie; 850 } 851 } 852 } 853 throw lastException; 854 } 855 856 public HStoreFile tryCommitRecoveredHFile(Path path) throws IOException { 857 LOG.info("Validating recovered hfile at {} for inclusion in store {}", path, this); 858 FileSystem srcFs = path.getFileSystem(conf); 859 srcFs.access(path, FsAction.READ_WRITE); 860 try (HFile.Reader reader = 861 HFile.createReader(srcFs, path, getCacheConfig(), isPrimaryReplicaStore(), conf)) { 862 Optional<byte[]> firstKey = reader.getFirstRowKey(); 863 Preconditions.checkState(firstKey.isPresent(), "First key can not be null"); 864 Optional<Cell> lk = reader.getLastKey(); 865 Preconditions.checkState(lk.isPresent(), "Last key can not be null"); 866 byte[] lastKey = CellUtil.cloneRow(lk.get()); 867 if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) { 868 throw new WrongRegionException("Recovered hfile " + path.toString() 869 + " does not fit inside region " + this.getRegionInfo().getRegionNameAsString()); 870 } 871 } 872 873 Path dstPath = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), path); 874 HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath); 875 StoreFileReader r = sf.getReader(); 876 this.storeSize.addAndGet(r.length()); 877 this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); 878 879 storeEngine.addStoreFiles(Lists.newArrayList(sf), () -> { 880 }); 881 882 LOG.info("Loaded recovered hfile to {}, entries={}, sequenceid={}, filesize={}", sf, 883 r.getEntries(), r.getSequenceID(), TraditionalBinaryPrefix.long2String(r.length(), "B", 1)); 884 return sf; 885 } 886 887 private long getTotalSize(Collection<HStoreFile> sfs) { 888 return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum(); 889 } 890 891 private boolean completeFlush(final List<HStoreFile> sfs, long snapshotId) throws IOException { 892 // NOTE:we should keep clearSnapshot method inside the write lock because clearSnapshot may 893 // close {@link DefaultMemStore#snapshot}, which may be used by 894 // {@link DefaultMemStore#getScanners}. 895 storeEngine.addStoreFiles(sfs, 896 // NOTE: here we must increase the refCount for storeFiles because we would open the 897 // storeFiles and get the StoreFileScanners for them in HStore.notifyChangedReadersObservers. 
898 // If we don't increase the refCount here, HStore.closeAndArchiveCompactedFiles called by 899 // CompactedHFilesDischarger may archive the storeFiles after a concurrent compaction.Because 900 // HStore.requestCompaction is under storeEngine lock, so here we increase the refCount under 901 // storeEngine lock. see HBASE-27519 for more details. 902 snapshotId > 0 ? () -> { 903 this.memstore.clearSnapshot(snapshotId); 904 HStoreFile.increaseStoreFilesRefeCount(sfs); 905 } : () -> { 906 HStoreFile.increaseStoreFilesRefeCount(sfs); 907 }); 908 // notify to be called here - only in case of flushes 909 try { 910 notifyChangedReadersObservers(sfs); 911 } finally { 912 HStoreFile.decreaseStoreFilesRefeCount(sfs); 913 } 914 if (LOG.isTraceEnabled()) { 915 long totalSize = getTotalSize(sfs); 916 String traceMessage = "FLUSH time,count,size,store size,store files [" 917 + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize + "," 918 + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]"; 919 LOG.trace(traceMessage); 920 } 921 return needsCompaction(); 922 } 923 924 /** 925 * Notify all observers that set of Readers has changed. 926 */ 927 private void notifyChangedReadersObservers(List<HStoreFile> sfs) throws IOException { 928 for (ChangedReadersObserver o : this.changedReaderObservers) { 929 List<KeyValueScanner> memStoreScanners; 930 this.storeEngine.readLock(); 931 try { 932 memStoreScanners = this.memstore.getScanners(o.getReadPoint()); 933 } finally { 934 this.storeEngine.readUnlock(); 935 } 936 o.updateReaders(sfs, memStoreScanners); 937 } 938 } 939 940 /** 941 * Get all scanners with no filtering based on TTL (that happens further down the line). 942 * @param cacheBlocks cache the blocks or not 943 * @param usePread true to use pread, false if not 944 * @param isCompaction true if the scanner is created for compaction 945 * @param matcher the scan query matcher 946 * @param startRow the start row 947 * @param stopRow the stop row 948 * @param readPt the read point of the current scan 949 * @return all scanners for this store 950 */ 951 public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet, boolean usePread, 952 boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt) 953 throws IOException { 954 return getScanners(cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow, false, 955 readPt); 956 } 957 958 /** 959 * Get all scanners with no filtering based on TTL (that happens further down the line). 
960 * @param cacheBlocks cache the blocks or not 961 * @param usePread true to use pread, false if not 962 * @param isCompaction true if the scanner is created for compaction 963 * @param matcher the scan query matcher 964 * @param startRow the start row 965 * @param includeStartRow true to include start row, false if not 966 * @param stopRow the stop row 967 * @param includeStopRow true to include stop row, false if not 968 * @param readPt the read point of the current scan 969 * @return all scanners for this store 970 */ 971 public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread, 972 boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow, 973 byte[] stopRow, boolean includeStopRow, long readPt) throws IOException { 974 Collection<HStoreFile> storeFilesToScan; 975 List<KeyValueScanner> memStoreScanners; 976 this.storeEngine.readLock(); 977 try { 978 storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow, 979 includeStartRow, stopRow, includeStopRow); 980 memStoreScanners = this.memstore.getScanners(readPt); 981 // NOTE: here we must increase the refCount for storeFiles because we would open the 982 // storeFiles and get the StoreFileScanners for them.If we don't increase the refCount here, 983 // HStore.closeAndArchiveCompactedFiles called by CompactedHFilesDischarger may archive the 984 // storeFiles after a concurrent compaction.Because HStore.requestCompaction is under 985 // storeEngine lock, so here we increase the refCount under storeEngine lock. see HBASE-27484 986 // for more details. 987 HStoreFile.increaseStoreFilesRefeCount(storeFilesToScan); 988 } finally { 989 this.storeEngine.readUnlock(); 990 } 991 try { 992 // First the store file scanners 993 994 // TODO this used to get the store files in descending order, 995 // but now we get them in ascending order, which I think is 996 // actually more correct, since memstore get put at the end. 997 List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles( 998 storeFilesToScan, cacheBlocks, usePread, isCompaction, false, matcher, readPt); 999 List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1); 1000 scanners.addAll(sfScanners); 1001 // Then the memstore scanners 1002 scanners.addAll(memStoreScanners); 1003 return scanners; 1004 } catch (Throwable t) { 1005 clearAndClose(memStoreScanners); 1006 throw t instanceof IOException ? (IOException) t : new IOException(t); 1007 } finally { 1008 HStoreFile.decreaseStoreFilesRefeCount(storeFilesToScan); 1009 } 1010 } 1011 1012 private static void clearAndClose(List<KeyValueScanner> scanners) { 1013 if (scanners == null) { 1014 return; 1015 } 1016 for (KeyValueScanner s : scanners) { 1017 s.close(); 1018 } 1019 scanners.clear(); 1020 } 1021 1022 /** 1023 * Create scanners on the given files and if needed on the memstore with no filtering based on TTL 1024 * (that happens further down the line). 
1025 * @param files the list of files on which the scanners has to be created 1026 * @param cacheBlocks cache the blocks or not 1027 * @param usePread true to use pread, false if not 1028 * @param isCompaction true if the scanner is created for compaction 1029 * @param matcher the scan query matcher 1030 * @param startRow the start row 1031 * @param stopRow the stop row 1032 * @param readPt the read point of the current scan 1033 * @param includeMemstoreScanner true if memstore has to be included 1034 * @return scanners on the given files and on the memstore if specified 1035 */ 1036 public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks, 1037 boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, 1038 byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) 1039 throws IOException { 1040 return getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow, 1041 false, readPt, includeMemstoreScanner); 1042 } 1043 1044 /** 1045 * Create scanners on the given files and if needed on the memstore with no filtering based on TTL 1046 * (that happens further down the line). 1047 * @param files the list of files on which the scanners has to be created 1048 * @param cacheBlocks ache the blocks or not 1049 * @param usePread true to use pread, false if not 1050 * @param isCompaction true if the scanner is created for compaction 1051 * @param matcher the scan query matcher 1052 * @param startRow the start row 1053 * @param includeStartRow true to include start row, false if not 1054 * @param stopRow the stop row 1055 * @param includeStopRow true to include stop row, false if not 1056 * @param readPt the read point of the current scan 1057 * @param includeMemstoreScanner true if memstore has to be included 1058 * @return scanners on the given files and on the memstore if specified 1059 */ 1060 public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks, 1061 boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, 1062 boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, 1063 boolean includeMemstoreScanner) throws IOException { 1064 List<KeyValueScanner> memStoreScanners = null; 1065 if (includeMemstoreScanner) { 1066 this.storeEngine.readLock(); 1067 try { 1068 memStoreScanners = this.memstore.getScanners(readPt); 1069 } finally { 1070 this.storeEngine.readUnlock(); 1071 } 1072 } 1073 try { 1074 List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(files, 1075 cacheBlocks, usePread, isCompaction, false, matcher, readPt); 1076 List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1); 1077 scanners.addAll(sfScanners); 1078 // Then the memstore scanners 1079 if (memStoreScanners != null) { 1080 scanners.addAll(memStoreScanners); 1081 } 1082 return scanners; 1083 } catch (Throwable t) { 1084 clearAndClose(memStoreScanners); 1085 throw t instanceof IOException ? (IOException) t : new IOException(t); 1086 } 1087 } 1088 1089 /** 1090 * @param o Observer who wants to know about changes in set of Readers 1091 */ 1092 public void addChangedReaderObserver(ChangedReadersObserver o) { 1093 this.changedReaderObservers.add(o); 1094 } 1095 1096 /** 1097 * @param o Observer no longer interested in changes in set of Readers. 
 */
  public void deleteChangedReaderObserver(ChangedReadersObserver o) {
    // We don't check if observer present; it may not be (legitimately)
    this.changedReaderObservers.remove(o);
  }

  //////////////////////////////////////////////////////////////////////////////
  // Compaction
  //////////////////////////////////////////////////////////////////////////////

  /**
   * Compact the StoreFiles. This method may take some time, so the calling thread must be able to
   * block for long periods.
   * <p>
   * During this time, the Store can work as usual, getting values from StoreFiles and writing new
   * StoreFiles from the memstore. Existing StoreFiles are not destroyed until the new compacted
   * StoreFile is completely written out to disk.
   * <p>
   * The compactLock prevents multiple simultaneous compactions. The structureLock prevents us from
   * interfering with other write operations.
   * <p>
   * We don't want to hold the structureLock for the whole time, as a compact() can be lengthy and
   * we want to allow cache-flushes during this period.
   * <p>
   * Compaction events should be idempotent, since there is no IO fencing for the region directory
   * in HDFS. A region server might still try to complete the compaction after it lost the region.
   * That is why the following events are carefully ordered for a compaction:
   * <ol>
   * <li>Compaction writes new files under the region/.tmp directory (compaction output).</li>
   * <li>Compaction atomically moves the temporary file under the region directory.</li>
   * <li>Compaction appends a WAL edit containing the compaction input and output files, and forces
   * a sync on the WAL.</li>
   * <li>Compaction deletes the input files from the region directory.</li>
   * </ol>
   * Failure conditions are handled like this:
   * <ul>
   * <li>If the RS fails before 2, the compaction won't complete. Even if the RS lives on and
   * finishes the compaction later, it will only write the new data file to the region directory.
   * Since we already have this data, this will be idempotent, but we will have a redundant copy of
   * the data.</li>
   * <li>If the RS fails between 2 and 3, the region will have a redundant copy of the data. The RS
   * that failed won't be able to finish sync() for the WAL because of lease recovery on the
   * WAL.</li>
   * <li>If the RS fails after 3, the region server that opens the region will pick up the
   * compaction marker from the WAL and replay it by removing the compaction input files. The
   * failed RS can also attempt to delete those files, but the operation will be idempotent.</li>
   * </ul>
   * See HBASE-2231 for details.
   * @param compaction compaction details obtained from requestCompaction()
   * @return Storefile we compacted into or null if we failed or opted out early.
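   * <p>
   * Hypothetical caller sketch (in practice the region server's compaction threads drive this):
   *
   * <pre>
   * Optional&lt;CompactionContext&gt; context = store.requestCompaction();
   * if (context.isPresent()) {
   *   List&lt;HStoreFile&gt; newFiles = store.compact(context.get(), throughputController, user);
   * }
   * </pre>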
1139 */ 1140 public List<HStoreFile> compact(CompactionContext compaction, 1141 ThroughputController throughputController, User user) throws IOException { 1142 assert compaction != null; 1143 CompactionRequestImpl cr = compaction.getRequest(); 1144 StoreFileWriterCreationTracker writerCreationTracker = 1145 storeFileWriterCreationTrackerFactory.get(); 1146 if (writerCreationTracker != null) { 1147 cr.setWriterCreationTracker(writerCreationTracker); 1148 storeFileWriterCreationTrackers.add(writerCreationTracker); 1149 } 1150 try { 1151 // Do all sanity checking in here if we have a valid CompactionRequestImpl 1152 // because we need to clean up after it on the way out in a finally 1153 // block below 1154 long compactionStartTime = EnvironmentEdgeManager.currentTime(); 1155 assert compaction.hasSelection(); 1156 Collection<HStoreFile> filesToCompact = cr.getFiles(); 1157 assert !filesToCompact.isEmpty(); 1158 synchronized (filesCompacting) { 1159 // sanity check: we're compacting files that this store knows about 1160 // TODO: change this to LOG.error() after more debugging 1161 Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact)); 1162 } 1163 1164 // Ready to go. Have list of files to compact. 1165 LOG.info("Starting compaction of " + filesToCompact + " into tmpdir=" 1166 + getRegionFileSystem().getTempDir() + ", totalSize=" 1167 + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1)); 1168 1169 return doCompaction(cr, filesToCompact, user, compactionStartTime, 1170 compaction.compact(throughputController, user)); 1171 } finally { 1172 finishCompactionRequest(cr); 1173 } 1174 } 1175 1176 protected List<HStoreFile> doCompaction(CompactionRequestImpl cr, 1177 Collection<HStoreFile> filesToCompact, User user, long compactionStartTime, List<Path> newFiles) 1178 throws IOException { 1179 // Do the steps necessary to complete the compaction. 1180 setStoragePolicyFromFileName(newFiles); 1181 List<HStoreFile> sfs = storeEngine.commitStoreFiles(newFiles, true); 1182 if (this.getCoprocessorHost() != null) { 1183 for (HStoreFile sf : sfs) { 1184 getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user); 1185 } 1186 } 1187 replaceStoreFiles(filesToCompact, sfs, true); 1188 1189 long outputBytes = getTotalSize(sfs); 1190 1191 // At this point the store will use new files for all new scanners. 1192 refreshStoreSizeAndTotalBytes(); // update store size. 1193 1194 long now = EnvironmentEdgeManager.currentTime(); 1195 if ( 1196 region.getRegionServerServices() != null 1197 && region.getRegionServerServices().getMetrics() != null 1198 ) { 1199 region.getRegionServerServices().getMetrics().updateCompaction( 1200 region.getTableDescriptor().getTableName().getNameAsString(), cr.isMajor(), 1201 now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(), 1202 outputBytes); 1203 1204 } 1205 1206 logCompactionEndMessage(cr, sfs, now, compactionStartTime); 1207 return sfs; 1208 } 1209 1210 // Set correct storage policy from the file name of DTCP. 1211 // Rename file will not change the storage policy. 1212 private void setStoragePolicyFromFileName(List<Path> newFiles) throws IOException { 1213 String prefix = HConstants.STORAGE_POLICY_PREFIX; 1214 for (Path newFile : newFiles) { 1215 if (newFile.getParent().getName().startsWith(prefix)) { 1216 CommonFSUtils.setStoragePolicy(getRegionFileSystem().getFileSystem(), newFile, 1217 newFile.getParent().getName().substring(prefix.length())); 1218 } 1219 } 1220 } 1221 1222 /** 1223 * Writes the compaction WAL record. 
1224 * @param filesCompacted Files compacted (input). 1225 * @param newFiles Files from compaction. 1226 */ 1227 private void writeCompactionWalRecord(Collection<HStoreFile> filesCompacted, 1228 Collection<HStoreFile> newFiles) throws IOException { 1229 if (region.getWAL() == null) { 1230 return; 1231 } 1232 List<Path> inputPaths = 1233 filesCompacted.stream().map(HStoreFile::getPath).collect(Collectors.toList()); 1234 List<Path> outputPaths = 1235 newFiles.stream().map(HStoreFile::getPath).collect(Collectors.toList()); 1236 RegionInfo info = this.region.getRegionInfo(); 1237 CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info, 1238 getColumnFamilyDescriptor().getName(), inputPaths, outputPaths, 1239 getRegionFileSystem().getStoreDir(getColumnFamilyDescriptor().getNameAsString())); 1240 // Fix reaching into Region to get the maxWaitForSeqId. 1241 // Does this method belong in Region altogether given it is making so many references up there? 1242 // Could be Region#writeCompactionMarker(compactionDescriptor); 1243 WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(), 1244 this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC(), 1245 region.getRegionReplicationSink().orElse(null)); 1246 } 1247 1248 @RestrictedApi(explanation = "Should only be called in TestHStore", link = "", 1249 allowedOnPath = ".*/(HStore|TestHStore).java") 1250 void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result, 1251 boolean writeCompactionMarker) throws IOException { 1252 storeEngine.replaceStoreFiles(compactedFiles, result, () -> { 1253 if (writeCompactionMarker) { 1254 writeCompactionWalRecord(compactedFiles, result); 1255 } 1256 }, () -> { 1257 synchronized (filesCompacting) { 1258 filesCompacting.removeAll(compactedFiles); 1259 } 1260 }); 1261 // These may be null when the RS is shutting down. The space quota Chores will fix the Region 1262 // sizes later so it's not super-critical if we miss these. 1263 RegionServerServices rsServices = region.getRegionServerServices(); 1264 if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) { 1265 updateSpaceQuotaAfterFileReplacement( 1266 rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(), 1267 compactedFiles, result); 1268 } 1269 } 1270 1271 /** 1272 * Updates the space quota usage for this region, removing the size for files compacted away and 1273 * adding in the size for new files. 1274 * @param sizeStore The object tracking changes in region size for space quotas. 1275 * @param regionInfo The identifier for the region whose size is being updated. 1276 * @param oldFiles Files removed from this store's region. 1277 * @param newFiles Files added to this store's region. 1278 */ 1279 void updateSpaceQuotaAfterFileReplacement(RegionSizeStore sizeStore, RegionInfo regionInfo, 1280 Collection<HStoreFile> oldFiles, Collection<HStoreFile> newFiles) { 1281 long delta = 0; 1282 if (oldFiles != null) { 1283 for (HStoreFile compactedFile : oldFiles) { 1284 if (compactedFile.isHFile()) { 1285 delta -= compactedFile.getReader().length(); 1286 } 1287 } 1288 } 1289 if (newFiles != null) { 1290 for (HStoreFile newFile : newFiles) { 1291 if (newFile.isHFile()) { 1292 delta += newFile.getReader().length(); 1293 } 1294 } 1295 } 1296 sizeStore.incrementRegionSize(regionInfo, delta); 1297 } 1298 1299 /** 1300 * Log a very elaborate compaction completion message. 1301 * @param cr Request. 
   * @param sfs Resulting files.
   * @param compactionStartTime Start time.
   */
  private void logCompactionEndMessage(CompactionRequestImpl cr, List<HStoreFile> sfs, long now,
    long compactionStartTime) {
    StringBuilder message = new StringBuilder("Completed" + (cr.isMajor() ? " major" : "")
      + " compaction of " + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "")
      + " file(s) in " + this + " of " + this.getRegionInfo().getShortNameToLog() + " into ");
    if (sfs.isEmpty()) {
      message.append("none, ");
    } else {
      for (HStoreFile sf : sfs) {
        message.append(sf.getPath().getName());
        message.append("(size=");
        message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1));
        message.append("), ");
      }
    }
    message.append("total size for store is ")
      .append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1))
      .append(". This selection was in queue for ")
      .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
      .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
      .append(" to execute.");
    LOG.info(message.toString());
    if (LOG.isTraceEnabled()) {
      int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
      long resultSize = getTotalSize(sfs);
      String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
        + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
        + cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]";
      LOG.trace(traceMessage);
    }
  }

  /**
   * Call to complete a compaction. It's for the case where we find in the WAL a compaction that
   * was not finished. We could find one recovering a WAL after a regionserver crash. See
   * HBASE-2231.
   */
  public void replayCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
    boolean removeFiles) throws IOException {
    LOG.debug("Completing compaction from the WAL marker");
    List<String> compactionInputs = compaction.getCompactionInputList();
    List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList());

    // The Compaction Marker is written after the compaction is completed,
    // and the files moved into the region/family folder.
    //
    // If we crash after the entry is written, we may not have removed the
    // input files, but the output file is present.
    // (The unremoved input files will be removed by this function)
    //
    // If we scan the directory and the file is not present, it can mean that:
    // - The file was manually removed by the user
    // - The file was removed as a consequence of a subsequent compaction
    // so, we can't do anything with the "compaction output list" because those
    // files have already been loaded when opening the region (by virtue of
    // being in the store's folder) or they may be missing due to a compaction.
1360 1361 String familyName = this.getColumnFamilyName(); 1362 Set<String> inputFiles = new HashSet<>(); 1363 for (String compactionInput : compactionInputs) { 1364 Path inputPath = getRegionFileSystem().getStoreFilePath(familyName, compactionInput); 1365 inputFiles.add(inputPath.getName()); 1366 } 1367 1368 // some of the input files might already be deleted 1369 List<HStoreFile> inputStoreFiles = new ArrayList<>(compactionInputs.size()); 1370 for (HStoreFile sf : this.getStorefiles()) { 1371 if (inputFiles.contains(sf.getPath().getName())) { 1372 inputStoreFiles.add(sf); 1373 } 1374 } 1375 1376 // check whether we need to pick up the new files 1377 List<HStoreFile> outputStoreFiles = new ArrayList<>(compactionOutputs.size()); 1378 1379 if (pickCompactionFiles) { 1380 for (HStoreFile sf : this.getStorefiles()) { 1381 compactionOutputs.remove(sf.getPath().getName()); 1382 } 1383 for (String compactionOutput : compactionOutputs) { 1384 StoreFileInfo storeFileInfo = 1385 getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), compactionOutput); 1386 HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo); 1387 outputStoreFiles.add(storeFile); 1388 } 1389 } 1390 1391 if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) { 1392 LOG.info("Replaying compaction marker, replacing input files: " + inputStoreFiles 1393 + " with output files : " + outputStoreFiles); 1394 this.replaceStoreFiles(inputStoreFiles, outputStoreFiles, false); 1395 this.refreshStoreSizeAndTotalBytes(); 1396 } 1397 } 1398 1399 @Override 1400 public boolean hasReferences() { 1401 // Grab the read lock here, because we need to ensure that: only when the atomic 1402 // replaceStoreFiles(..) finished, we can get all the complete store file list. 1403 this.storeEngine.readLock(); 1404 try { 1405 // Merge the current store files with compacted files here due to HBASE-20940. 1406 Collection<HStoreFile> allStoreFiles = new ArrayList<>(getStorefiles()); 1407 allStoreFiles.addAll(getCompactedFiles()); 1408 return StoreUtils.hasReferences(allStoreFiles); 1409 } finally { 1410 this.storeEngine.readUnlock(); 1411 } 1412 } 1413 1414 /** 1415 * getter for CompactionProgress object 1416 * @return CompactionProgress object; can be null 1417 */ 1418 public CompactionProgress getCompactionProgress() { 1419 return this.storeEngine.getCompactor().getProgress(); 1420 } 1421 1422 @Override 1423 public boolean shouldPerformMajorCompaction() throws IOException { 1424 for (HStoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) { 1425 // TODO: what are these reader checks all over the place? 1426 if (sf.getReader() == null) { 1427 LOG.debug("StoreFile {} has null Reader", sf); 1428 return false; 1429 } 1430 } 1431 return storeEngine.getCompactionPolicy() 1432 .shouldPerformMajorCompaction(this.storeEngine.getStoreFileManager().getStorefiles()); 1433 } 1434 1435 public Optional<CompactionContext> requestCompaction() throws IOException { 1436 return requestCompaction(NO_PRIORITY, CompactionLifeCycleTracker.DUMMY, null); 1437 } 1438 1439 public Optional<CompactionContext> requestCompaction(int priority, 1440 CompactionLifeCycleTracker tracker, User user) throws IOException { 1441 // don't even select for compaction if writes are disabled 1442 if (!this.areWritesEnabled()) { 1443 return Optional.empty(); 1444 } 1445 // Before we do compaction, try to get rid of unneeded files to simplify things. 
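    // Here, "unneeded" files are store files whose cells have all expired under the column
    // family TTL; see removeUnneededFiles() below.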
1446 removeUnneededFiles(); 1447 1448 final CompactionContext compaction = storeEngine.createCompaction(); 1449 CompactionRequestImpl request = null; 1450 this.storeEngine.readLock(); 1451 try { 1452 synchronized (filesCompacting) { 1453 // First, see if coprocessor would want to override selection. 1454 if (this.getCoprocessorHost() != null) { 1455 final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting); 1456 boolean override = 1457 getCoprocessorHost().preCompactSelection(this, candidatesForCoproc, tracker, user); 1458 if (override) { 1459 // Coprocessor is overriding normal file selection. 1460 compaction.forceSelect(new CompactionRequestImpl(candidatesForCoproc)); 1461 } 1462 } 1463 1464 // Normal case - coprocessor is not overriding file selection. 1465 if (!compaction.hasSelection()) { 1466 boolean isUserCompaction = priority == Store.PRIORITY_USER; 1467 boolean mayUseOffPeak = 1468 offPeakHours.isOffPeakHour() && offPeakCompactionTracker.compareAndSet(false, true); 1469 try { 1470 compaction.select(this.filesCompacting, isUserCompaction, mayUseOffPeak, 1471 forceMajor && filesCompacting.isEmpty()); 1472 } catch (IOException e) { 1473 if (mayUseOffPeak) { 1474 offPeakCompactionTracker.set(false); 1475 } 1476 throw e; 1477 } 1478 assert compaction.hasSelection(); 1479 if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) { 1480 // Compaction policy doesn't want to take advantage of off-peak. 1481 offPeakCompactionTracker.set(false); 1482 } 1483 } 1484 if (this.getCoprocessorHost() != null) { 1485 this.getCoprocessorHost().postCompactSelection(this, 1486 ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker, 1487 compaction.getRequest(), user); 1488 } 1489 // Finally, we have the resulting files list. Check if we have any files at all. 1490 request = compaction.getRequest(); 1491 Collection<HStoreFile> selectedFiles = request.getFiles(); 1492 if (selectedFiles.isEmpty()) { 1493 return Optional.empty(); 1494 } 1495 1496 addToCompactingFiles(selectedFiles); 1497 1498 // If we're enqueuing a major, clear the force flag. 1499 this.forceMajor = this.forceMajor && !request.isMajor(); 1500 1501 // Set common request properties. 1502 // Set priority: either the override value supplied by the caller or the store's compact priority. 1503 final int compactionPriority = 1504 (priority != Store.NO_PRIORITY) ? priority : getCompactPriority(); 1505 request.setPriority(compactionPriority); 1506 1507 if (request.isAfterSplit()) { 1508 // If the store belongs to recently split daughter regions, it is better to give them 1509 // higher priority in the compaction queue. 1510 // Override priority if it is lower (higher int value) than 1511 // SPLIT_REGION_COMPACTION_PRIORITY 1512 final int splitHousekeepingPriority = 1513 Math.min(compactionPriority, SPLIT_REGION_COMPACTION_PRIORITY); 1514 request.setPriority(splitHousekeepingPriority); 1515 LOG.info( 1516 "Keeping/Overriding Compaction request priority to {} for CF {} since it" 1517 + " belongs to recently split daughter region {}", 1518 splitHousekeepingPriority, this.getColumnFamilyName(), 1519 getRegionInfo().getRegionNameAsString()); 1520 } 1521 request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName()); 1522 request.setTracker(tracker); 1523 } 1524 } finally { 1525 this.storeEngine.readUnlock(); 1526 } 1527 1528 if (LOG.isDebugEnabled()) { 1529 LOG.debug(this + " is initiating " + (request.isMajor() ? "major" : "minor") + " compaction" 1530 + (request.isAllFiles() ?
" (all files)" : "")); 1531 } 1532 this.region.reportCompactionRequestStart(request.isMajor()); 1533 return Optional.of(compaction); 1534 } 1535 1536 /** Adds the files to compacting files. filesCompacting must be locked. */ 1537 private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) { 1538 if (CollectionUtils.isEmpty(filesToAdd)) { 1539 return; 1540 } 1541 // Check that we do not try to compact the same StoreFile twice. 1542 if (!Collections.disjoint(filesCompacting, filesToAdd)) { 1543 Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting); 1544 } 1545 filesCompacting.addAll(filesToAdd); 1546 Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator()); 1547 } 1548 1549 private void removeUnneededFiles() throws IOException { 1550 if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) { 1551 return; 1552 } 1553 if (getColumnFamilyDescriptor().getMinVersions() > 0) { 1554 LOG.debug("Skipping expired store file removal due to min version of {} being {}", this, 1555 getColumnFamilyDescriptor().getMinVersions()); 1556 return; 1557 } 1558 this.storeEngine.readLock(); 1559 Collection<HStoreFile> delSfs = null; 1560 try { 1561 synchronized (filesCompacting) { 1562 long cfTtl = getStoreFileTtl(); 1563 if (cfTtl != Long.MAX_VALUE) { 1564 delSfs = storeEngine.getStoreFileManager() 1565 .getUnneededFiles(EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting); 1566 addToCompactingFiles(delSfs); 1567 } 1568 } 1569 } finally { 1570 this.storeEngine.readUnlock(); 1571 } 1572 1573 if (CollectionUtils.isEmpty(delSfs)) { 1574 return; 1575 } 1576 1577 Collection<HStoreFile> newFiles = Collections.emptyList(); // No new files. 1578 replaceStoreFiles(delSfs, newFiles, true); 1579 refreshStoreSizeAndTotalBytes(); 1580 LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in " + this 1581 + "; total size is " + TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1)); 1582 } 1583 1584 public void cancelRequestedCompaction(CompactionContext compaction) { 1585 finishCompactionRequest(compaction.getRequest()); 1586 } 1587 1588 private void finishCompactionRequest(CompactionRequestImpl cr) { 1589 this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize()); 1590 if (cr.isOffPeak()) { 1591 offPeakCompactionTracker.set(false); 1592 cr.setOffPeak(false); 1593 } 1594 synchronized (filesCompacting) { 1595 filesCompacting.removeAll(cr.getFiles()); 1596 } 1597 // The tracker could be null, for example, we do not need to track the creation of store file 1598 // writer due to different implementation of SFT, or the compaction is canceled. 1599 if (cr.getWriterCreationTracker() != null) { 1600 storeFileWriterCreationTrackers.remove(cr.getWriterCreationTracker()); 1601 } 1602 } 1603 1604 /** 1605 * Update counts. 1606 */ 1607 protected void refreshStoreSizeAndTotalBytes() throws IOException { 1608 this.storeSize.set(0L); 1609 this.totalUncompressedBytes.set(0L); 1610 for (HStoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) { 1611 StoreFileReader r = hsf.getReader(); 1612 if (r == null) { 1613 LOG.warn("StoreFile {} has a null Reader", hsf); 1614 continue; 1615 } 1616 this.storeSize.addAndGet(r.length()); 1617 this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); 1618 } 1619 } 1620 1621 /* 1622 * @param wantedVersions How many versions were asked for. 1623 * @return wantedVersions or this families' {@link HConstants#VERSIONS}. 
1624 */ 1625 int versionsToReturn(final int wantedVersions) { 1626 if (wantedVersions <= 0) { 1627 throw new IllegalArgumentException("Number of versions must be > 0"); 1628 } 1629 // Make sure we do not return more than maximum versions for this store. 1630 int maxVersions = getColumnFamilyDescriptor().getMaxVersions(); 1631 return wantedVersions > maxVersions ? maxVersions : wantedVersions; 1632 } 1633 1634 @Override 1635 public boolean canSplit() { 1636 // Not split-able if we find a reference store file present in the store. 1637 boolean result = !hasReferences(); 1638 if (!result) { 1639 LOG.trace("Not splittable; has references: {}", this); 1640 } 1641 return result; 1642 } 1643 1644 /** 1645 * Determines if Store should be split. 1646 */ 1647 public Optional<byte[]> getSplitPoint() { 1648 this.storeEngine.readLock(); 1649 try { 1650 // Should already be enforced by the split policy! 1651 assert !this.getRegionInfo().isMetaRegion(); 1652 // Not split-able if we find a reference store file present in the store. 1653 if (hasReferences()) { 1654 LOG.trace("Not splittable; has references: {}", this); 1655 return Optional.empty(); 1656 } 1657 return this.storeEngine.getStoreFileManager().getSplitPoint(); 1658 } catch (IOException e) { 1659 LOG.warn("Failed getting store size for {}", this, e); 1660 } finally { 1661 this.storeEngine.readUnlock(); 1662 } 1663 return Optional.empty(); 1664 } 1665 1666 @Override 1667 public long getLastCompactSize() { 1668 return this.lastCompactSize; 1669 } 1670 1671 @Override 1672 public long getSize() { 1673 return storeSize.get(); 1674 } 1675 1676 public void triggerMajorCompaction() { 1677 this.forceMajor = true; 1678 } 1679 1680 ////////////////////////////////////////////////////////////////////////////// 1681 // File administration 1682 ////////////////////////////////////////////////////////////////////////////// 1683 1684 /** 1685 * Return a scanner for both the memstore and the HStore files. Assumes we are not in a 1686 * compaction. 1687 * @param scan Scan to apply when scanning the stores 1688 * @param targetCols columns to scan 1689 * @return a scanner over the current key values 1690 * @throws IOException on failure 1691 */ 1692 public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols, long readPt) 1693 throws IOException { 1694 storeEngine.readLock(); 1695 try { 1696 ScanInfo scanInfo; 1697 if (this.getCoprocessorHost() != null) { 1698 scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this, scan); 1699 } else { 1700 scanInfo = getScanInfo(); 1701 } 1702 return createScanner(scan, scanInfo, targetCols, readPt); 1703 } finally { 1704 storeEngine.readUnlock(); 1705 } 1706 } 1707 1708 // HMobStore will override this method to return its own implementation. 1709 protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo, 1710 NavigableSet<byte[]> targetCols, long readPt) throws IOException { 1711 return scan.isReversed() 1712 ? 
new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt) 1713 : new StoreScanner(this, scanInfo, scan, targetCols, readPt); 1714 } 1715 1716 /** 1717 * Recreates the scanners on the current list of active store file scanners 1718 * @param currentFileScanners the current set of active store file scanners 1719 * @param cacheBlocks cache the blocks or not 1720 * @param usePread use pread or not 1721 * @param isCompaction is the scanner for compaction 1722 * @param matcher the scan query matcher 1723 * @param startRow the scan's start row 1724 * @param includeStartRow should the scan include the start row 1725 * @param stopRow the scan's stop row 1726 * @param includeStopRow should the scan include the stop row 1727 * @param readPt the read point of the current scan 1728 * @param includeMemstoreScanner whether the current scanner should include a memstore scanner 1729 * @return list of scanners recreated from the current scanners 1730 */ 1731 public List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners, 1732 boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, 1733 byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, 1734 boolean includeMemstoreScanner) throws IOException { 1735 this.storeEngine.readLock(); 1736 try { 1737 Map<String, HStoreFile> name2File = 1738 new HashMap<>(getStorefilesCount() + getCompactedFilesCount()); 1739 for (HStoreFile file : getStorefiles()) { 1740 name2File.put(file.getFileInfo().getActiveFileName(), file); 1741 } 1742 Collection<HStoreFile> compactedFiles = getCompactedFiles(); 1743 for (HStoreFile file : IterableUtils.emptyIfNull(compactedFiles)) { 1744 name2File.put(file.getFileInfo().getActiveFileName(), file); 1745 } 1746 List<HStoreFile> filesToReopen = new ArrayList<>(); 1747 for (KeyValueScanner kvs : currentFileScanners) { 1748 assert kvs.isFileScanner(); 1749 if (kvs.peek() == null) { 1750 continue; 1751 } 1752 filesToReopen.add(name2File.get(kvs.getFilePath().getName())); 1753 } 1754 if (filesToReopen.isEmpty()) { 1755 return null; 1756 } 1757 return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow, 1758 includeStartRow, stopRow, includeStopRow, readPt, false); 1759 } finally { 1760 this.storeEngine.readUnlock(); 1761 } 1762 } 1763 1764 @Override 1765 public String toString() { 1766 return this.getRegionInfo().getShortNameToLog() + "/" + this.getColumnFamilyName(); 1767 } 1768 1769 @Override 1770 public int getStorefilesCount() { 1771 return this.storeEngine.getStoreFileManager().getStorefileCount(); 1772 } 1773 1774 @Override 1775 public int getCompactedFilesCount() { 1776 return this.storeEngine.getStoreFileManager().getCompactedFilesCount(); 1777 } 1778 1779 private LongStream getStoreFileAgeStream() { 1780 return this.storeEngine.getStoreFileManager().getStorefiles().stream().filter(sf -> { 1781 if (sf.getReader() == null) { 1782 LOG.warn("StoreFile {} has a null Reader", sf); 1783 return false; 1784 } else { 1785 return true; 1786 } 1787 }).filter(HStoreFile::isHFile).mapToLong(sf -> sf.getFileInfo().getCreatedTimestamp()) 1788 .map(t -> EnvironmentEdgeManager.currentTime() - t); 1789 } 1790 1791 @Override 1792 public OptionalLong getMaxStoreFileAge() { 1793 return getStoreFileAgeStream().max(); 1794 } 1795 1796 @Override 1797 public OptionalLong getMinStoreFileAge() { 1798 return getStoreFileAgeStream().min(); 1799 } 1800 1801 @Override 1802 public OptionalDouble getAvgStoreFileAge() { 1803 return
getStoreFileAgeStream().average(); 1804 } 1805 1806 @Override 1807 public long getNumReferenceFiles() { 1808 return this.storeEngine.getStoreFileManager().getStorefiles().stream() 1809 .filter(HStoreFile::isReference).count(); 1810 } 1811 1812 @Override 1813 public long getNumHFiles() { 1814 return this.storeEngine.getStoreFileManager().getStorefiles().stream() 1815 .filter(HStoreFile::isHFile).count(); 1816 } 1817 1818 @Override 1819 public long getStoreSizeUncompressed() { 1820 return this.totalUncompressedBytes.get(); 1821 } 1822 1823 @Override 1824 public long getStorefilesSize() { 1825 // Include all StoreFiles 1826 return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(), 1827 sf -> true); 1828 } 1829 1830 @Override 1831 public long getHFilesSize() { 1832 // Include only StoreFiles which are HFiles 1833 return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(), 1834 HStoreFile::isHFile); 1835 } 1836 1837 private long getStorefilesFieldSize(ToLongFunction<StoreFileReader> f) { 1838 return this.storeEngine.getStoreFileManager().getStorefiles().stream() 1839 .mapToLong(file -> StoreUtils.getStorefileFieldSize(file, f)).sum(); 1840 } 1841 1842 @Override 1843 public long getStorefilesRootLevelIndexSize() { 1844 return getStorefilesFieldSize(StoreFileReader::indexSize); 1845 } 1846 1847 @Override 1848 public long getTotalStaticIndexSize() { 1849 return getStorefilesFieldSize(StoreFileReader::getUncompressedDataIndexSize); 1850 } 1851 1852 @Override 1853 public long getTotalStaticBloomSize() { 1854 return getStorefilesFieldSize(StoreFileReader::getTotalBloomSize); 1855 } 1856 1857 @Override 1858 public MemStoreSize getMemStoreSize() { 1859 return this.memstore.size(); 1860 } 1861 1862 @Override 1863 public int getCompactPriority() { 1864 int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority(); 1865 if (priority == PRIORITY_USER) { 1866 LOG.warn("Compaction priority is USER despite there being no user compaction"); 1867 } 1868 return priority; 1869 } 1870 1871 public boolean throttleCompaction(long compactionSize) { 1872 return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize); 1873 } 1874 1875 public HRegion getHRegion() { 1876 return this.region; 1877 } 1878 1879 public RegionCoprocessorHost getCoprocessorHost() { 1880 return this.region.getCoprocessorHost(); 1881 } 1882 1883 @Override 1884 public RegionInfo getRegionInfo() { 1885 return getRegionFileSystem().getRegionInfo(); 1886 } 1887 1888 @Override 1889 public boolean areWritesEnabled() { 1890 return this.region.areWritesEnabled(); 1891 } 1892 1893 @Override 1894 public long getSmallestReadPoint() { 1895 return this.region.getSmallestReadPoint(); 1896 } 1897 1898 /** 1899 * Adds or replaces the specified KeyValues. 1900 * <p> 1901 * For each KeyValue specified, if a cell with the same row, family, and qualifier exists in 1902 * MemStore, it will be replaced. Otherwise, it will just be inserted to MemStore. 1903 * <p> 1904 * This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic 1905 * across all of them. 
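 * <p>
 * A minimal illustrative call (hypothetical caller and variable names):
 *
 * <pre>{@code
 * store.upsert(cellsForThisStore, store.getSmallestReadPoint(), memstoreSizing);
 * }</pre>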
1906 * @param readpoint readpoint below which we can safely remove duplicate KVs 1907 */ 1908 public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing) { 1909 this.storeEngine.readLock(); 1910 try { 1911 this.memstore.upsert(cells, readpoint, memstoreSizing); 1912 } finally { 1913 this.storeEngine.readUnlock(); 1914 } 1915 } 1916 1917 public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTracker tracker) { 1918 return new StoreFlusherImpl(cacheFlushId, tracker); 1919 } 1920 1921 private final class StoreFlusherImpl implements StoreFlushContext { 1922 1923 private final FlushLifeCycleTracker tracker; 1924 private final StoreFileWriterCreationTracker writerCreationTracker; 1925 private final long cacheFlushSeqNum; 1926 private MemStoreSnapshot snapshot; 1927 private List<Path> tempFiles; 1928 private List<Path> committedFiles; 1929 private long cacheFlushCount; 1930 private long cacheFlushSize; 1931 private long outputFileSize; 1932 1933 private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) { 1934 this.cacheFlushSeqNum = cacheFlushSeqNum; 1935 this.tracker = tracker; 1936 this.writerCreationTracker = storeFileWriterCreationTrackerFactory.get(); 1937 } 1938 1939 /** 1940 * This is not thread safe. The caller should have a lock on the region or the store. If 1941 * necessary, the lock can be added with the patch provided in HBASE-10087 1942 */ 1943 @Override 1944 public MemStoreSize prepare() { 1945 // passing the current sequence number of the wal - to allow bookkeeping in the memstore 1946 this.snapshot = memstore.snapshot(); 1947 this.cacheFlushCount = snapshot.getCellsCount(); 1948 this.cacheFlushSize = snapshot.getDataSize(); 1949 committedFiles = new ArrayList<>(1); 1950 return snapshot.getMemStoreSize(); 1951 } 1952 1953 @Override 1954 public void flushCache(MonitoredTask status) throws IOException { 1955 RegionServerServices rsService = region.getRegionServerServices(); 1956 ThroughputController throughputController = 1957 rsService == null ? null : rsService.getFlushThroughputController(); 1958 // it could be null if we do not need to track the creation of store file writer due to 1959 // different SFT implementation. 
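      // When a tracker is present, registering it here makes the in-flight flush output files
      // visible via getStoreFilesBeingWritten(), so the BrokenStoreFileCleaner will not remove
      // them while the flush is still running.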
1960 if (writerCreationTracker != null) { 1961 HStore.this.storeFileWriterCreationTrackers.add(writerCreationTracker); 1962 } 1963 tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController, 1964 tracker, writerCreationTracker); 1965 } 1966 1967 @Override 1968 public boolean commit(MonitoredTask status) throws IOException { 1969 try { 1970 if (CollectionUtils.isEmpty(this.tempFiles)) { 1971 return false; 1972 } 1973 status.setStatus("Flushing " + this + ": reopening flushed file"); 1974 List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, false); 1975 for (HStoreFile sf : storeFiles) { 1976 StoreFileReader r = sf.getReader(); 1977 if (LOG.isInfoEnabled()) { 1978 LOG.info("Added {}, entries={}, sequenceid={}, filesize={}", sf, r.getEntries(), 1979 cacheFlushSeqNum, TraditionalBinaryPrefix.long2String(r.length(), "", 1)); 1980 } 1981 outputFileSize += r.length(); 1982 storeSize.addAndGet(r.length()); 1983 totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); 1984 committedFiles.add(sf.getPath()); 1985 } 1986 1987 flushedCellsCount.addAndGet(cacheFlushCount); 1988 flushedCellsSize.addAndGet(cacheFlushSize); 1989 flushedOutputFileSize.addAndGet(outputFileSize); 1990 // call coprocessor after we have done all the accounting above 1991 for (HStoreFile sf : storeFiles) { 1992 if (getCoprocessorHost() != null) { 1993 getCoprocessorHost().postFlush(HStore.this, sf, tracker); 1994 } 1995 } 1996 // Add new file to store files. Clear snapshot too while we have the Store write lock. 1997 return completeFlush(storeFiles, snapshot.getId()); 1998 } finally { 1999 if (writerCreationTracker != null) { 2000 HStore.this.storeFileWriterCreationTrackers.remove(writerCreationTracker); 2001 } 2002 } 2003 } 2004 2005 @Override 2006 public long getOutputFileSize() { 2007 return outputFileSize; 2008 } 2009 2010 @Override 2011 public List<Path> getCommittedFiles() { 2012 return committedFiles; 2013 } 2014 2015 /** 2016 * Similar to commit, but called in secondary region replicas for replaying the flush cache from 2017 * primary region. Adds the new files to the store, and drops the snapshot depending on 2018 * dropMemstoreSnapshot argument. 
2019 * @param fileNames names of the flushed files 2020 * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot 2021 */ 2022 @Override 2023 public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot) 2024 throws IOException { 2025 List<HStoreFile> storeFiles = new ArrayList<>(fileNames.size()); 2026 for (String file : fileNames) { 2027 // open the file as a store file (hfile link, etc) 2028 StoreFileInfo storeFileInfo = 2029 getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), file); 2030 HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo); 2031 storeFiles.add(storeFile); 2032 HStore.this.storeSize.addAndGet(storeFile.getReader().length()); 2033 HStore.this.totalUncompressedBytes 2034 .addAndGet(storeFile.getReader().getTotalUncompressedBytes()); 2035 if (LOG.isInfoEnabled()) { 2036 LOG.info(this + " added " + storeFile + ", entries=" + storeFile.getReader().getEntries() 2037 + ", sequenceid=" + storeFile.getReader().getSequenceID() + ", filesize=" 2038 + TraditionalBinaryPrefix.long2String(storeFile.getReader().length(), "", 1)); 2039 } 2040 } 2041 2042 long snapshotId = -1; // -1 means do not drop 2043 if (dropMemstoreSnapshot && snapshot != null) { 2044 snapshotId = snapshot.getId(); 2045 } 2046 HStore.this.completeFlush(storeFiles, snapshotId); 2047 } 2048 2049 /** 2050 * Abort the snapshot preparation. Drops the snapshot if any. 2051 */ 2052 @Override 2053 public void abort() throws IOException { 2054 if (snapshot != null) { 2055 HStore.this.completeFlush(Collections.emptyList(), snapshot.getId()); 2056 } 2057 } 2058 } 2059 2060 @Override 2061 public boolean needsCompaction() { 2062 List<HStoreFile> filesCompactingClone = null; 2063 synchronized (filesCompacting) { 2064 filesCompactingClone = Lists.newArrayList(filesCompacting); 2065 } 2066 return this.storeEngine.needsCompaction(filesCompactingClone); 2067 } 2068 2069 /** 2070 * Used for tests. 2071 * @return cache configuration for this Store. 
2072 */ 2073 public CacheConfig getCacheConfig() { 2074 return storeContext.getCacheConf(); 2075 } 2076 2077 public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HStore.class, false); 2078 2079 public static final long DEEP_OVERHEAD = ClassSize.align( 2080 FIXED_OVERHEAD + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK + ClassSize.CONCURRENT_SKIPLISTMAP 2081 + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT + ScanInfo.FIXED_OVERHEAD); 2082 2083 @Override 2084 public long heapSize() { 2085 MemStoreSize memstoreSize = this.memstore.size(); 2086 return DEEP_OVERHEAD + memstoreSize.getHeapSize() + storeContext.heapSize(); 2087 } 2088 2089 @Override 2090 public CellComparator getComparator() { 2091 return storeContext.getComparator(); 2092 } 2093 2094 public ScanInfo getScanInfo() { 2095 return scanInfo; 2096 } 2097 2098 /** 2099 * Set scan info, used by test 2100 * @param scanInfo new scan info to use for test 2101 */ 2102 void setScanInfo(ScanInfo scanInfo) { 2103 this.scanInfo = scanInfo; 2104 } 2105 2106 @Override 2107 public boolean hasTooManyStoreFiles() { 2108 return getStorefilesCount() > this.blockingFileCount; 2109 } 2110 2111 @Override 2112 public long getFlushedCellsCount() { 2113 return flushedCellsCount.get(); 2114 } 2115 2116 @Override 2117 public long getFlushedCellsSize() { 2118 return flushedCellsSize.get(); 2119 } 2120 2121 @Override 2122 public long getFlushedOutputFileSize() { 2123 return flushedOutputFileSize.get(); 2124 } 2125 2126 @Override 2127 public long getCompactedCellsCount() { 2128 return compactedCellsCount.get(); 2129 } 2130 2131 @Override 2132 public long getCompactedCellsSize() { 2133 return compactedCellsSize.get(); 2134 } 2135 2136 @Override 2137 public long getMajorCompactedCellsCount() { 2138 return majorCompactedCellsCount.get(); 2139 } 2140 2141 @Override 2142 public long getMajorCompactedCellsSize() { 2143 return majorCompactedCellsSize.get(); 2144 } 2145 2146 public void updateCompactedMetrics(boolean isMajor, CompactionProgress progress) { 2147 if (isMajor) { 2148 majorCompactedCellsCount.addAndGet(progress.getTotalCompactingKVs()); 2149 majorCompactedCellsSize.addAndGet(progress.totalCompactedSize); 2150 } else { 2151 compactedCellsCount.addAndGet(progress.getTotalCompactingKVs()); 2152 compactedCellsSize.addAndGet(progress.totalCompactedSize); 2153 } 2154 } 2155 2156 /** 2157 * Returns the StoreEngine that is backing this concrete implementation of Store. 2158 * @return Returns the {@link StoreEngine} object used internally inside this HStore object. 
2159 */ 2160 public StoreEngine<?, ?, ?, ?> getStoreEngine() { 2161 return this.storeEngine; 2162 } 2163 2164 protected OffPeakHours getOffPeakHours() { 2165 return this.offPeakHours; 2166 } 2167 2168 @Override 2169 public void onConfigurationChange(Configuration conf) { 2170 Configuration storeConf = StoreUtils.createStoreConfiguration(conf, region.getTableDescriptor(), 2171 getColumnFamilyDescriptor()); 2172 this.conf = storeConf; 2173 this.storeEngine.compactionPolicy.setConf(storeConf); 2174 this.offPeakHours = OffPeakHours.getInstance(storeConf); 2175 } 2176 2177 /** 2178 * {@inheritDoc} 2179 */ 2180 @Override 2181 public void registerChildren(ConfigurationManager manager) { 2182 // No children to register 2183 } 2184 2185 /** 2186 * {@inheritDoc} 2187 */ 2188 @Override 2189 public void deregisterChildren(ConfigurationManager manager) { 2190 // No children to deregister 2191 } 2192 2193 @Override 2194 public double getCompactionPressure() { 2195 return storeEngine.getStoreFileManager().getCompactionPressure(); 2196 } 2197 2198 @Override 2199 public boolean isPrimaryReplicaStore() { 2200 return getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID; 2201 } 2202 2203 /** 2204 * Sets the store up for a region level snapshot operation. 2205 * @see #postSnapshotOperation() 2206 */ 2207 public void preSnapshotOperation() { 2208 archiveLock.lock(); 2209 } 2210 2211 /** 2212 * Perform tasks needed after the completion of snapshot operation. 2213 * @see #preSnapshotOperation() 2214 */ 2215 public void postSnapshotOperation() { 2216 archiveLock.unlock(); 2217 } 2218 2219 /** 2220 * Closes and archives the compacted files under this store 2221 */ 2222 public synchronized void closeAndArchiveCompactedFiles() throws IOException { 2223 // ensure other threads do not attempt to archive the same files on close() 2224 archiveLock.lock(); 2225 try { 2226 storeEngine.readLock(); 2227 Collection<HStoreFile> copyCompactedfiles = null; 2228 try { 2229 Collection<HStoreFile> compactedfiles = 2230 this.getStoreEngine().getStoreFileManager().getCompactedfiles(); 2231 if (CollectionUtils.isNotEmpty(compactedfiles)) { 2232 // Do a copy under read lock 2233 copyCompactedfiles = new ArrayList<>(compactedfiles); 2234 } else { 2235 LOG.trace("No compacted files to archive"); 2236 } 2237 } finally { 2238 storeEngine.readUnlock(); 2239 } 2240 if (CollectionUtils.isNotEmpty(copyCompactedfiles)) { 2241 removeCompactedfiles(copyCompactedfiles, true); 2242 } 2243 } finally { 2244 archiveLock.unlock(); 2245 } 2246 } 2247 2248 /** 2249 * Archives and removes the compacted files 2250 * @param compactedfiles The compacted files in this store that are not active in reads 2251 * @param evictOnClose true if blocks should be evicted from the cache when an HFile reader is 2252 * closed, false if not 2253 */ 2254 private void removeCompactedfiles(Collection<HStoreFile> compactedfiles, boolean evictOnClose) 2255 throws IOException { 2256 final List<HStoreFile> filesToRemove = new ArrayList<>(compactedfiles.size()); 2257 final List<Long> storeFileSizes = new ArrayList<>(compactedfiles.size()); 2258 for (final HStoreFile file : compactedfiles) { 2259 synchronized (file) { 2260 try { 2261 StoreFileReader r = file.getReader(); 2262 if (r == null) { 2263 LOG.debug("The file {} was closed but still not archived", file); 2264 // HACK: Temporarily re-open the reader so we can get the size of the file. Ideally, 2265 // we should know the size of an HStoreFile without having to ask the HStoreFileReader 2266 // for that. 
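          // getStoreFileSize() below briefly re-opens the store file reader just to read the
          // length, then closes it again.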
2267 long length = getStoreFileSize(file); 2268 filesToRemove.add(file); 2269 storeFileSizes.add(length); 2270 continue; 2271 } 2272 2273 if (file.isCompactedAway() && !file.isReferencedInReads()) { 2274 // Even if deleting fails we need not bother as any new scanners won't be 2275 // able to use the compacted file as the status is already compactedAway 2276 LOG.trace("Closing and archiving the file {}", file); 2277 // Copy the file size before closing the reader 2278 final long length = r.length(); 2279 r.close(evictOnClose); 2280 // Just close and return 2281 filesToRemove.add(file); 2282 // Only add the length if we successfully added the file to `filesToRemove` 2283 storeFileSizes.add(length); 2284 } else { 2285 LOG.info("Can't archive compacted file " + file.getPath() 2286 + " because of either isCompactedAway=" + file.isCompactedAway() 2287 + " or file has reference, isReferencedInReads=" + file.isReferencedInReads() 2288 + ", refCount=" + r.getRefCount() + ", skipping for now."); 2289 } 2290 } catch (Exception e) { 2291 LOG.error("Exception while trying to close the compacted store file {}", file.getPath(), 2292 e); 2293 } 2294 } 2295 } 2296 if (this.isPrimaryReplicaStore()) { 2297 // Only the primary region is allowed to move the file to archive. 2298 // The secondary region does not move the files to archive. Any active reads from 2299 // the secondary region will still work because the file as such has active readers on it. 2300 if (!filesToRemove.isEmpty()) { 2301 LOG.debug("Moving the files {} to archive", filesToRemove); 2302 // Only if this is successful it has to be removed 2303 try { 2304 getRegionFileSystem().removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(), 2305 filesToRemove); 2306 } catch (FailedArchiveException fae) { 2307 // Even if archiving some files failed, we still need to clear out any of the 2308 // files which were successfully archived. Otherwise we will receive a 2309 // FileNotFoundException when we attempt to re-archive them in the next go around. 2310 Collection<Path> failedFiles = fae.getFailedFiles(); 2311 Iterator<HStoreFile> iter = filesToRemove.iterator(); 2312 Iterator<Long> sizeIter = storeFileSizes.iterator(); 2313 while (iter.hasNext()) { 2314 sizeIter.next(); 2315 if (failedFiles.contains(iter.next().getPath())) { 2316 iter.remove(); 2317 sizeIter.remove(); 2318 } 2319 } 2320 if (!filesToRemove.isEmpty()) { 2321 clearCompactedfiles(filesToRemove); 2322 } 2323 throw fae; 2324 } 2325 } 2326 } 2327 if (!filesToRemove.isEmpty()) { 2328 // Clear the compactedfiles from the store file manager 2329 clearCompactedfiles(filesToRemove); 2330 // Try to send report of this archival to the Master for updating quota usage faster 2331 reportArchivedFilesForQuota(filesToRemove, storeFileSizes); 2332 } 2333 } 2334 2335 /** 2336 * Computes the length of a store file without succumbing to any errors along the way. If an error 2337 * is encountered, the implementation returns {@code 0} instead of the actual size. 2338 * @param file The file to compute the size of. 2339 * @return The size in bytes of the provided {@code file}. 2340 */ 2341 long getStoreFileSize(HStoreFile file) { 2342 long length = 0; 2343 try { 2344 file.initReader(); 2345 length = file.getReader().length(); 2346 } catch (IOException e) { 2347 LOG.trace("Failed to open reader when trying to compute store file size for {}, ignoring", 2348 file, e); 2349 } finally { 2350 try { 2351 file.closeStoreFile( 2352 file.getCacheConf() != null ? 
file.getCacheConf().shouldEvictOnClose() : true); 2353 } catch (IOException e) { 2354 LOG.trace("Failed to close reader after computing store file size for {}, ignoring", file, 2355 e); 2356 } 2357 } 2358 return length; 2359 } 2360 2361 public Long preFlushSeqIDEstimation() { 2362 return memstore.preFlushSeqIDEstimation(); 2363 } 2364 2365 @Override 2366 public boolean isSloppyMemStore() { 2367 return this.memstore.isSloppy(); 2368 } 2369 2370 private void clearCompactedfiles(List<HStoreFile> filesToRemove) throws IOException { 2371 LOG.trace("Clearing the compacted files {} from this store", filesToRemove); 2372 storeEngine.removeCompactedFiles(filesToRemove); 2373 } 2374 2375 void reportArchivedFilesForQuota(List<? extends StoreFile> archivedFiles, List<Long> fileSizes) { 2376 // Sanity check from the caller 2377 if (archivedFiles.size() != fileSizes.size()) { 2378 throw new RuntimeException("Coding error: should never see lists of varying size"); 2379 } 2380 RegionServerServices rss = this.region.getRegionServerServices(); 2381 if (rss == null) { 2382 return; 2383 } 2384 List<Entry<String, Long>> filesWithSizes = new ArrayList<>(archivedFiles.size()); 2385 Iterator<Long> fileSizeIter = fileSizes.iterator(); 2386 for (StoreFile storeFile : archivedFiles) { 2387 final long fileSize = fileSizeIter.next(); 2388 if (storeFile.isHFile() && fileSize != 0) { 2389 filesWithSizes.add(Maps.immutableEntry(storeFile.getPath().getName(), fileSize)); 2390 } 2391 } 2392 if (LOG.isTraceEnabled()) { 2393 LOG.trace("Files archived: " + archivedFiles + ", reporting the following to the Master: " 2394 + filesWithSizes); 2395 } 2396 boolean success = rss.reportFileArchivalForQuotas(getTableName(), filesWithSizes); 2397 if (!success) { 2398 LOG.warn("Failed to report archival of files: " + filesWithSizes); 2399 } 2400 } 2401 2402 @Override 2403 public int getCurrentParallelPutCount() { 2404 return currentParallelPutCount.get(); 2405 } 2406 2407 public int getStoreRefCount() { 2408 return this.storeEngine.getStoreFileManager().getStorefiles().stream() 2409 .filter(sf -> sf.getReader() != null).filter(HStoreFile::isHFile) 2410 .mapToInt(HStoreFile::getRefCount).sum(); 2411 } 2412 2413 /** Returns the maximum ref count of any store file among all compacted store files for this HStore */ 2414 public int getMaxCompactedStoreFileRefCount() { 2415 OptionalInt maxCompactedStoreFileRefCount = this.storeEngine.getStoreFileManager() 2416 .getCompactedfiles().stream().filter(sf -> sf.getReader() != null).filter(HStoreFile::isHFile) 2417 .mapToInt(HStoreFile::getRefCount).max(); 2418 return maxCompactedStoreFileRefCount.isPresent() ? maxCompactedStoreFileRefCount.getAsInt() : 0; 2419 } 2420 2421 @Override 2422 public long getMemstoreOnlyRowReadsCount() { 2423 return memstoreOnlyRowReadsCount.sum(); 2424 } 2425 2426 @Override 2427 public long getMixedRowReadsCount() { 2428 return mixedRowReadsCount.sum(); 2429 } 2430 2431 @Override 2432 public Configuration getReadOnlyConfiguration() { 2433 return new ReadOnlyConfiguration(this.conf); 2434 } 2435 2436 void updateMetricsStore(boolean memstoreRead) { 2437 if (memstoreRead) { 2438 memstoreOnlyRowReadsCount.increment(); 2439 } else { 2440 mixedRowReadsCount.increment(); 2441 } 2442 } 2443 2444 /** 2445 * Return the storefiles which are currently being written to. Mainly used by 2446 * {@link BrokenStoreFileCleaner} to prevent deleting these files as they are not present in 2447 * SFT yet.
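 * <p>
 * A minimal sketch of the intended guard (hypothetical caller and variable):
 *
 * <pre>{@code
 * // skip files that are still being written
 * if (store.getStoreFilesBeingWritten().contains(candidatePath)) {
 *   return;
 * }
 * }</pre>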
2448 */ 2449 public Set<Path> getStoreFilesBeingWritten() { 2450 return storeFileWriterCreationTrackers.stream().flatMap(t -> t.get().stream()) 2451 .collect(Collectors.toSet()); 2452 } 2453 2454 @Override 2455 public long getBloomFilterRequestsCount() { 2456 return storeEngine.getBloomFilterMetrics().getRequestsCount(); 2457 } 2458 2459 @Override 2460 public long getBloomFilterNegativeResultsCount() { 2461 return storeEngine.getBloomFilterMetrics().getNegativeResultsCount(); 2462 } 2463 2464 @Override 2465 public long getBloomFilterEligibleRequestsCount() { 2466 return storeEngine.getBloomFilterMetrics().getEligibleRequestsCount(); 2467 } 2468}