001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.net.InetSocketAddress; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.Collections; 027import java.util.HashMap; 028import java.util.HashSet; 029import java.util.Iterator; 030import java.util.List; 031import java.util.Map; 032import java.util.Map.Entry; 033import java.util.NavigableSet; 034import java.util.Optional; 035import java.util.OptionalDouble; 036import java.util.OptionalInt; 037import java.util.OptionalLong; 038import java.util.Set; 039import java.util.concurrent.Callable; 040import java.util.concurrent.CompletionService; 041import java.util.concurrent.ConcurrentHashMap; 042import java.util.concurrent.ExecutionException; 043import java.util.concurrent.ExecutorCompletionService; 044import java.util.concurrent.Future; 045import java.util.concurrent.ThreadPoolExecutor; 046import java.util.concurrent.atomic.AtomicBoolean; 047import java.util.concurrent.atomic.AtomicInteger; 048import java.util.concurrent.atomic.AtomicLong; 049import java.util.concurrent.atomic.LongAdder; 050import java.util.concurrent.locks.ReentrantLock; 051import java.util.function.Consumer; 052import java.util.function.Supplier; 053import java.util.function.ToLongFunction; 054import java.util.stream.Collectors; 055import java.util.stream.LongStream; 056import org.apache.hadoop.conf.Configuration; 057import org.apache.hadoop.fs.FileSystem; 058import org.apache.hadoop.fs.Path; 059import org.apache.hadoop.fs.permission.FsAction; 060import org.apache.hadoop.hbase.Cell; 061import org.apache.hadoop.hbase.CellComparator; 062import org.apache.hadoop.hbase.CellUtil; 063import org.apache.hadoop.hbase.HConstants; 064import org.apache.hadoop.hbase.MemoryCompactionPolicy; 065import org.apache.hadoop.hbase.TableName; 066import org.apache.hadoop.hbase.backup.FailedArchiveException; 067import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 068import org.apache.hadoop.hbase.client.RegionInfo; 069import org.apache.hadoop.hbase.client.Scan; 070import org.apache.hadoop.hbase.conf.ConfigurationManager; 071import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; 072import org.apache.hadoop.hbase.coprocessor.ReadOnlyConfiguration; 073import org.apache.hadoop.hbase.io.HeapSize; 074import org.apache.hadoop.hbase.io.hfile.CacheConfig; 075import org.apache.hadoop.hbase.io.hfile.HFile; 076import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; 077import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl; 078import 
org.apache.hadoop.hbase.io.hfile.HFileScanner;
079import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
080import org.apache.hadoop.hbase.monitoring.MonitoredTask;
081import org.apache.hadoop.hbase.quotas.RegionSizeStore;
082import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
083import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
084import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
085import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
086import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
087import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
088import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
089import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
090import org.apache.hadoop.hbase.security.EncryptionUtil;
091import org.apache.hadoop.hbase.security.User;
092import org.apache.hadoop.hbase.util.Bytes;
093import org.apache.hadoop.hbase.util.ClassSize;
094import org.apache.hadoop.hbase.util.CommonFSUtils;
095import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
096import org.apache.hadoop.hbase.util.Pair;
097import org.apache.hadoop.hbase.util.ReflectionUtils;
098import org.apache.hadoop.util.StringUtils;
099import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
100import org.apache.yetus.audience.InterfaceAudience;
101import org.slf4j.Logger;
102import org.slf4j.LoggerFactory;
103
104import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
105import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
106import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
107import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
108import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
109import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
110import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
111
112import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
114
115/**
116 * A Store holds a column family in a Region. It's a memstore and a set of zero or more StoreFiles,
117 * which stretch backwards over time.
118 * <p>
119 * There's no reason to consider append-logging at this level; all logging and locking are handled
120 * at the HRegion level. Store just provides services to manage sets of StoreFiles. One of the most
121 * important of those services is compaction, where files are aggregated once they pass a
122 * configurable threshold.
123 * <p>
124 * Locking and transactions are handled at a higher level. This API should not be called directly
125 * but by an HRegion manager.
126 */
127@InterfaceAudience.Private
128public class HStore
129 implements Store, HeapSize, StoreConfigInformation, PropagatingConfigurationObserver {
130 public static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
131 public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
132 "hbase.server.compactchecker.interval.multiplier";
133 public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
134 public static final String BLOCK_STORAGE_POLICY_KEY = "hbase.hstore.block.storage.policy";
135 // "NONE" is not a valid storage policy and means we defer the policy to HDFS
136 public static final String DEFAULT_BLOCK_STORAGE_POLICY = "NONE";
137 public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
138 public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 16;
139
140 // HBASE-24428 : Update compaction priority for recently split daughter regions
141 // so as to prioritize their compaction.
142 // Any compaction candidate with higher priority than compaction of newly split daughter regions
143 // should have priority value < (Integer.MIN_VALUE + 1000)
144 private static final int SPLIT_REGION_COMPACTION_PRIORITY = Integer.MIN_VALUE + 1000;
145
146 private static final Logger LOG = LoggerFactory.getLogger(HStore.class);
147
148 protected final MemStore memstore;
149 // This stores directory in the filesystem.
150 private final HRegion region;
151 protected Configuration conf;
152 private long lastCompactSize = 0;
153 volatile boolean forceMajor = false;
154 private AtomicLong storeSize = new AtomicLong();
155 private AtomicLong totalUncompressedBytes = new AtomicLong();
156 private LongAdder memstoreOnlyRowReadsCount = new LongAdder();
157 // rows that have cells from both memstore and files (or only files)
158 private LongAdder mixedRowReadsCount = new LongAdder();
159
160 /**
161 * Lock specific to archiving compacted store files. This avoids races around the combination of
162 * retrieving the list of compacted files and moving them to the archive directory. Since this is
163 * usually a background process (other than on close), we don't want to handle this with the store
164 * write lock, which would block readers and degrade performance. Locked by
165 * CompactedHFilesDispatchHandler via closeAndArchiveCompactedFiles() and by close().
166 */
167 final ReentrantLock archiveLock = new ReentrantLock();
168
169 private final boolean verifyBulkLoads;
170
171 /**
172 * Use this counter to track concurrent puts. If TRACE-level logging is enabled and we are over
173 * the threshold set by hbase.region.store.parallel.put.print.threshold (default is 50), we will
174 * log a message that identifies the Store experiencing this high level of concurrency.
175 */
176 private final AtomicInteger currentParallelPutCount = new AtomicInteger(0);
177 private final int parallelPutCountPrintThreshold;
178
179 private ScanInfo scanInfo;
180
181 // All access must be synchronized.
182 // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it.
183 private final List<HStoreFile> filesCompacting = Lists.newArrayList();
184
185 // All access must be synchronized.
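 // Observers are added via addChangedReaderObserver() and notified from
 // notifyChangedReadersObservers() after a flush commits new store files.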
186 private final Set<ChangedReadersObserver> changedReaderObservers =
187 Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());
188
189 private HFileDataBlockEncoder dataBlockEncoder;
190
191 final StoreEngine<?, ?, ?, ?> storeEngine;
192
193 private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
194 private volatile OffPeakHours offPeakHours;
195
196 private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
197 private int flushRetriesNumber;
198 private int pauseTime;
199
200 private long blockingFileCount;
201 private int compactionCheckMultiplier;
202
203 private AtomicLong flushedCellsCount = new AtomicLong();
204 private AtomicLong compactedCellsCount = new AtomicLong();
205 private AtomicLong majorCompactedCellsCount = new AtomicLong();
206 private AtomicLong flushedCellsSize = new AtomicLong();
207 private AtomicLong flushedOutputFileSize = new AtomicLong();
208 private AtomicLong compactedCellsSize = new AtomicLong();
209 private AtomicLong majorCompactedCellsSize = new AtomicLong();
210
211 private final StoreContext storeContext;
212
213 // Used to track the store files which are currently being written. For compaction, if we want to
214 // compact store files [a, b, c] to [d], then here we will record 'd'. And we will also use it to
215 // track the store files being written when flushing.
216 // Notice that the creation happens in the background compaction or flush thread and we will read
217 // the files from other threads, so it needs to be thread safe.
218 private static final class StoreFileWriterCreationTracker implements Consumer<Path> {
219
220 private final Set<Path> files = Collections.newSetFromMap(new ConcurrentHashMap<>());
221
222 @Override
223 public void accept(Path t) {
224 files.add(t);
225 }
226
227 public Set<Path> get() {
228 return Collections.unmodifiableSet(files);
229 }
230 }
231
232 // We may have multiple compactions running at the same time, and a flush can also happen at the
233 // same time, so here we need to use a collection, and the collection needs to be thread safe.
234 // The implementation of StoreFileWriterCreationTracker is very simple and we are not likely to
235 // implement hashCode or equals for it, so here we just use ConcurrentHashMap. Change to
236 // IdentityHashMap if we later implement hashCode or equals.
237 private final Set<StoreFileWriterCreationTracker> storeFileWriterCreationTrackers =
238 Collections.newSetFromMap(new ConcurrentHashMap<>());
239
240 // For SFT implementations where we write a tmp store file first, we do not need to clean up
241 // broken store files under the data directory, which means we do not need to track store
242 // file writer creation. So here we abstract a factory to return different trackers for different
243 // SFT implementations.
244 private final Supplier<StoreFileWriterCreationTracker> storeFileWriterCreationTrackerFactory;
245
246 /**
247 * Constructor
248 * @param family ColumnFamilyDescriptor for this column family
249 * @param confParam configuration object. Can be null.
250 */
251 protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
252 final Configuration confParam, boolean warmup) throws IOException {
253 this.conf = StoreUtils.createStoreConfiguration(confParam, region.getTableDescriptor(), family);
254
255 this.region = region;
256 this.storeContext = initializeStoreContext(family);
257
258 // Assemble the store's home directory and ensure it exists.
259 region.getRegionFileSystem().createStoreDir(family.getNameAsString());
260
261 // set block storage policy for store directory
262 String policyName = family.getStoragePolicy();
263 if (null == policyName) {
264 policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY, DEFAULT_BLOCK_STORAGE_POLICY);
265 }
266 region.getRegionFileSystem().setStoragePolicy(family.getNameAsString(), policyName.trim());
267
268 this.dataBlockEncoder = new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());
269
270 // used by ScanQueryMatcher
271 long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
272 LOG.trace("Time to purge deletes set to {}ms in {}", timeToPurgeDeletes, this);
273 // Get TTL
274 long ttl = determineTTLFromFamily(family);
275 // Why not just pass a HColumnDescriptor in here altogether? Even if we have
276 // to clone it?
277 scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, region.getCellComparator());
278 this.memstore = getMemstore();
279
280 this.offPeakHours = OffPeakHours.getInstance(conf);
281
282 this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
283
284 this.blockingFileCount = conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
285 this.compactionCheckMultiplier = conf.getInt(COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY,
286 DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
287 if (this.compactionCheckMultiplier <= 0) {
288 LOG.error("Compaction check period multiplier must be positive, setting default: {}",
289 DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
290 this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
291 }
292
293 this.storeEngine = createStoreEngine(this, this.conf, region.getCellComparator());
294 storeEngine.initialize(warmup);
295 // If writing to a tmp dir first is required, we just return null, which indicates that we do
296 // not need to track store file writer creation; otherwise we return a new
297 // StoreFileWriterCreationTracker.
298 this.storeFileWriterCreationTrackerFactory = storeEngine.requireWritingToTmpDirFirst()
299 ?
() -> null 300 : () -> new StoreFileWriterCreationTracker(); 301 refreshStoreSizeAndTotalBytes(); 302 303 flushRetriesNumber = 304 conf.getInt("hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER); 305 pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE); 306 if (flushRetriesNumber <= 0) { 307 throw new IllegalArgumentException( 308 "hbase.hstore.flush.retries.number must be > 0, not " + flushRetriesNumber); 309 } 310 311 int confPrintThreshold = 312 this.conf.getInt("hbase.region.store.parallel.put.print.threshold", 50); 313 if (confPrintThreshold < 10) { 314 confPrintThreshold = 10; 315 } 316 this.parallelPutCountPrintThreshold = confPrintThreshold; 317 318 LOG.info( 319 "Store={}, memstore type={}, storagePolicy={}, verifyBulkLoads={}, " 320 + "parallelPutCountPrintThreshold={}, encoding={}, compression={}", 321 this, memstore.getClass().getSimpleName(), policyName, verifyBulkLoads, 322 parallelPutCountPrintThreshold, family.getDataBlockEncoding(), family.getCompressionType()); 323 } 324 325 private StoreContext initializeStoreContext(ColumnFamilyDescriptor family) throws IOException { 326 return new StoreContext.Builder().withBlockSize(family.getBlocksize()) 327 .withEncryptionContext(EncryptionUtil.createEncryptionContext(conf, family)) 328 .withBloomType(family.getBloomFilterType()).withCacheConfig(createCacheConf(family)) 329 .withCellComparator(region.getCellComparator()).withColumnFamilyDescriptor(family) 330 .withCompactedFilesSupplier(this::getCompactedFiles) 331 .withRegionFileSystem(region.getRegionFileSystem()) 332 .withFavoredNodesSupplier(this::getFavoredNodes) 333 .withFamilyStoreDirectoryPath( 334 region.getRegionFileSystem().getStoreDir(family.getNameAsString())) 335 .withRegionCoprocessorHost(region.getCoprocessorHost()).build(); 336 } 337 338 private InetSocketAddress[] getFavoredNodes() { 339 InetSocketAddress[] favoredNodes = null; 340 if (region.getRegionServerServices() != null) { 341 favoredNodes = region.getRegionServerServices() 342 .getFavoredNodesForRegion(region.getRegionInfo().getEncodedName()); 343 } 344 return favoredNodes; 345 } 346 347 /** Returns MemStore Instance to use in this store. */ 348 private MemStore getMemstore() { 349 MemStore ms = null; 350 // Check if in-memory-compaction configured. Note MemoryCompactionPolicy is an enum! 351 MemoryCompactionPolicy inMemoryCompaction = null; 352 if (this.getTableName().isSystemTable()) { 353 inMemoryCompaction = MemoryCompactionPolicy 354 .valueOf(conf.get("hbase.systemtables.compacting.memstore.type", "NONE").toUpperCase()); 355 } else { 356 inMemoryCompaction = getColumnFamilyDescriptor().getInMemoryCompaction(); 357 } 358 if (inMemoryCompaction == null) { 359 inMemoryCompaction = 360 MemoryCompactionPolicy.valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 361 CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT).toUpperCase()); 362 } 363 364 switch (inMemoryCompaction) { 365 case NONE: 366 Class<? extends MemStore> memStoreClass = 367 conf.getClass(MEMSTORE_CLASS_NAME, DefaultMemStore.class, MemStore.class); 368 ms = ReflectionUtils.newInstance(memStoreClass, 369 new Object[] { conf, getComparator(), this.getHRegion().getRegionServicesForStores() }); 370 break; 371 default: 372 Class<? 
extends CompactingMemStore> compactingMemStoreClass = 373 conf.getClass(MEMSTORE_CLASS_NAME, CompactingMemStore.class, CompactingMemStore.class); 374 ms = 375 ReflectionUtils.newInstance(compactingMemStoreClass, new Object[] { conf, getComparator(), 376 this, this.getHRegion().getRegionServicesForStores(), inMemoryCompaction }); 377 } 378 return ms; 379 } 380 381 /** 382 * Creates the cache config. 383 * @param family The current column family. 384 */ 385 protected CacheConfig createCacheConf(final ColumnFamilyDescriptor family) { 386 CacheConfig cacheConf = new CacheConfig(conf, family, region.getBlockCache(), 387 region.getRegionServicesForStores().getByteBuffAllocator()); 388 LOG.info("Created cacheConfig: {}, for column family {} of region {} ", cacheConf, 389 family.getNameAsString(), region.getRegionInfo().getEncodedName()); 390 return cacheConf; 391 } 392 393 /** 394 * Creates the store engine configured for the given Store. 395 * @param store The store. An unfortunate dependency needed due to it being passed to 396 * coprocessors via the compactor. 397 * @param conf Store configuration. 398 * @param kvComparator KVComparator for storeFileManager. 399 * @return StoreEngine to use. 400 */ 401 protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf, 402 CellComparator kvComparator) throws IOException { 403 return StoreEngine.create(store, conf, kvComparator); 404 } 405 406 /** Returns TTL in seconds of the specified family */ 407 public static long determineTTLFromFamily(final ColumnFamilyDescriptor family) { 408 // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds. 409 long ttl = family.getTimeToLive(); 410 if (ttl == HConstants.FOREVER) { 411 // Default is unlimited ttl. 412 ttl = Long.MAX_VALUE; 413 } else if (ttl == -1) { 414 ttl = Long.MAX_VALUE; 415 } else { 416 // Second -> ms adjust for user data 417 ttl *= 1000; 418 } 419 return ttl; 420 } 421 422 StoreContext getStoreContext() { 423 return storeContext; 424 } 425 426 @Override 427 public String getColumnFamilyName() { 428 return this.storeContext.getFamily().getNameAsString(); 429 } 430 431 @Override 432 public TableName getTableName() { 433 return this.getRegionInfo().getTable(); 434 } 435 436 @Override 437 public FileSystem getFileSystem() { 438 return storeContext.getRegionFileSystem().getFileSystem(); 439 } 440 441 public HRegionFileSystem getRegionFileSystem() { 442 return storeContext.getRegionFileSystem(); 443 } 444 445 /* Implementation of StoreConfigInformation */ 446 @Override 447 public long getStoreFileTtl() { 448 // TTL only applies if there's no MIN_VERSIONs setting on the column. 449 return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE; 450 } 451 452 @Override 453 public long getMemStoreFlushSize() { 454 // TODO: Why is this in here? The flushsize of the region rather than the store? 
St.Ack 455 return this.region.memstoreFlushSize; 456 } 457 458 @Override 459 public MemStoreSize getFlushableSize() { 460 return this.memstore.getFlushableSize(); 461 } 462 463 @Override 464 public MemStoreSize getSnapshotSize() { 465 return this.memstore.getSnapshotSize(); 466 } 467 468 @Override 469 public long getCompactionCheckMultiplier() { 470 return this.compactionCheckMultiplier; 471 } 472 473 @Override 474 public long getBlockingFileCount() { 475 return blockingFileCount; 476 } 477 /* End implementation of StoreConfigInformation */ 478 479 @Override 480 public ColumnFamilyDescriptor getColumnFamilyDescriptor() { 481 return this.storeContext.getFamily(); 482 } 483 484 @Override 485 public OptionalLong getMaxSequenceId() { 486 return StoreUtils.getMaxSequenceIdInList(this.getStorefiles()); 487 } 488 489 @Override 490 public OptionalLong getMaxMemStoreTS() { 491 return StoreUtils.getMaxMemStoreTSInList(this.getStorefiles()); 492 } 493 494 /** Returns the data block encoder */ 495 public HFileDataBlockEncoder getDataBlockEncoder() { 496 return dataBlockEncoder; 497 } 498 499 /** 500 * Should be used only in tests. 501 * @param blockEncoder the block delta encoder to use 502 */ 503 void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) { 504 this.dataBlockEncoder = blockEncoder; 505 } 506 507 private void postRefreshStoreFiles() throws IOException { 508 // Advance the memstore read point to be at least the new store files seqIds so that 509 // readers might pick it up. This assumes that the store is not getting any writes (otherwise 510 // in-flight transactions might be made visible) 511 getMaxSequenceId().ifPresent(region.getMVCC()::advanceTo); 512 refreshStoreSizeAndTotalBytes(); 513 } 514 515 @Override 516 public void refreshStoreFiles() throws IOException { 517 storeEngine.refreshStoreFiles(); 518 postRefreshStoreFiles(); 519 } 520 521 /** 522 * Replaces the store files that the store has with the given files. Mainly used by secondary 523 * region replicas to keep up to date with the primary region files. 
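 * After refreshing the file list, postRefreshStoreFiles() advances the MVCC read point to the new
 * files' maximum sequence id and recomputes the store size.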
524 */ 525 public void refreshStoreFiles(Collection<String> newFiles) throws IOException { 526 storeEngine.refreshStoreFiles(newFiles); 527 postRefreshStoreFiles(); 528 } 529 530 /** 531 * This message intends to inform the MemStore that next coming updates are going to be part of 532 * the replaying edits from WAL 533 */ 534 public void startReplayingFromWAL() { 535 this.memstore.startReplayingFromWAL(); 536 } 537 538 /** 539 * This message intends to inform the MemStore that the replaying edits from WAL are done 540 */ 541 public void stopReplayingFromWAL() { 542 this.memstore.stopReplayingFromWAL(); 543 } 544 545 /** 546 * Adds a value to the memstore 547 */ 548 public void add(final Cell cell, MemStoreSizing memstoreSizing) { 549 storeEngine.readLock(); 550 try { 551 if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) { 552 LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!", 553 this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName()); 554 } 555 this.memstore.add(cell, memstoreSizing); 556 } finally { 557 storeEngine.readUnlock(); 558 currentParallelPutCount.decrementAndGet(); 559 } 560 } 561 562 /** 563 * Adds the specified value to the memstore 564 */ 565 public void add(final Iterable<Cell> cells, MemStoreSizing memstoreSizing) { 566 storeEngine.readLock(); 567 try { 568 if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) { 569 LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!", 570 this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName()); 571 } 572 memstore.add(cells, memstoreSizing); 573 } finally { 574 storeEngine.readUnlock(); 575 currentParallelPutCount.decrementAndGet(); 576 } 577 } 578 579 @Override 580 public long timeOfOldestEdit() { 581 return memstore.timeOfOldestEdit(); 582 } 583 584 /** Returns All store files. */ 585 @Override 586 public Collection<HStoreFile> getStorefiles() { 587 return this.storeEngine.getStoreFileManager().getStorefiles(); 588 } 589 590 @Override 591 public Collection<HStoreFile> getCompactedFiles() { 592 return this.storeEngine.getStoreFileManager().getCompactedfiles(); 593 } 594 595 /** 596 * This throws a WrongRegionException if the HFile does not fit in this region, or an 597 * InvalidHFileException if the HFile is not valid. 
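 * The check compares the HFile's first and last row keys against the region boundaries and, if
 * hbase.hstore.bulkload.verify is set, scans every cell to verify row ordering and column family.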
598 */ 599 public void assertBulkLoadHFileOk(Path srcPath) throws IOException { 600 HFile.Reader reader = null; 601 try { 602 LOG.info("Validating hfile at " + srcPath + " for inclusion in " + this); 603 FileSystem srcFs = srcPath.getFileSystem(conf); 604 srcFs.access(srcPath, FsAction.READ_WRITE); 605 reader = HFile.createReader(srcFs, srcPath, getCacheConfig(), isPrimaryReplicaStore(), conf); 606 607 Optional<byte[]> firstKey = reader.getFirstRowKey(); 608 Preconditions.checkState(firstKey.isPresent(), "First key can not be null"); 609 Optional<Cell> lk = reader.getLastKey(); 610 Preconditions.checkState(lk.isPresent(), "Last key can not be null"); 611 byte[] lastKey = CellUtil.cloneRow(lk.get()); 612 613 if (LOG.isDebugEnabled()) { 614 LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey.get()) + " last=" 615 + Bytes.toStringBinary(lastKey)); 616 LOG.debug("Region bounds: first=" + Bytes.toStringBinary(getRegionInfo().getStartKey()) 617 + " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey())); 618 } 619 620 if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) { 621 throw new WrongRegionException("Bulk load file " + srcPath.toString() 622 + " does not fit inside region " + this.getRegionInfo().getRegionNameAsString()); 623 } 624 625 if ( 626 reader.length() 627 > conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE) 628 ) { 629 LOG.warn("Trying to bulk load hfile " + srcPath + " with size: " + reader.length() 630 + " bytes can be problematic as it may lead to oversplitting."); 631 } 632 633 if (verifyBulkLoads) { 634 long verificationStartTime = EnvironmentEdgeManager.currentTime(); 635 LOG.info("Full verification started for bulk load hfile: {}", srcPath); 636 Cell prevCell = null; 637 HFileScanner scanner = reader.getScanner(conf, false, false, false); 638 scanner.seekTo(); 639 do { 640 Cell cell = scanner.getCell(); 641 if (prevCell != null) { 642 if (getComparator().compareRows(prevCell, cell) > 0) { 643 throw new InvalidHFileException("Previous row is greater than" + " current row: path=" 644 + srcPath + " previous=" + CellUtil.getCellKeyAsString(prevCell) + " current=" 645 + CellUtil.getCellKeyAsString(cell)); 646 } 647 if (CellComparator.getInstance().compareFamilies(prevCell, cell) != 0) { 648 throw new InvalidHFileException("Previous key had different" 649 + " family compared to current key: path=" + srcPath + " previous=" 650 + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(), 651 prevCell.getFamilyLength()) 652 + " current=" + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(), 653 cell.getFamilyLength())); 654 } 655 } 656 prevCell = cell; 657 } while (scanner.next()); 658 LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString() + " took " 659 + (EnvironmentEdgeManager.currentTime() - verificationStartTime) + " ms"); 660 } 661 } finally { 662 if (reader != null) { 663 reader.close(); 664 } 665 } 666 } 667 668 /** 669 * This method should only be called from Region. It is assumed that the ranges of values in the 670 * HFile fit within the stores assigned region. 
(assertBulkLoadHFileOk checks this) 671 * @param seqNum sequence Id associated with the HFile 672 */ 673 public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException { 674 Path srcPath = new Path(srcPathStr); 675 return getRegionFileSystem().bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum); 676 } 677 678 public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException { 679 Path srcPath = new Path(srcPathStr); 680 try { 681 getRegionFileSystem().commitStoreFile(srcPath, dstPath); 682 } finally { 683 if (this.getCoprocessorHost() != null) { 684 this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath); 685 } 686 } 687 688 LOG.info("Loaded HFile " + srcPath + " into " + this + " as " + dstPath 689 + " - updating store file list."); 690 691 HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath); 692 bulkLoadHFile(sf); 693 694 LOG.info("Successfully loaded {} into {} (new location: {})", srcPath, this, dstPath); 695 696 return dstPath; 697 } 698 699 public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException { 700 HStoreFile sf = storeEngine.createStoreFileAndReader(fileInfo); 701 bulkLoadHFile(sf); 702 } 703 704 private void bulkLoadHFile(HStoreFile sf) throws IOException { 705 StoreFileReader r = sf.getReader(); 706 this.storeSize.addAndGet(r.length()); 707 this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); 708 storeEngine.addStoreFiles(Lists.newArrayList(sf), () -> { 709 }); 710 LOG.info("Loaded HFile " + sf.getFileInfo() + " into " + this); 711 if (LOG.isTraceEnabled()) { 712 String traceMessage = "BULK LOAD time,size,store size,store files [" 713 + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize + "," 714 + storeEngine.getStoreFileManager().getStorefileCount() + "]"; 715 LOG.trace(traceMessage); 716 } 717 } 718 719 private ImmutableCollection<HStoreFile> closeWithoutLock() throws IOException { 720 // Clear so metrics doesn't find them. 721 ImmutableCollection<HStoreFile> result = storeEngine.getStoreFileManager().clearFiles(); 722 Collection<HStoreFile> compactedfiles = storeEngine.getStoreFileManager().clearCompactedFiles(); 723 // clear the compacted files 724 if (CollectionUtils.isNotEmpty(compactedfiles)) { 725 removeCompactedfiles(compactedfiles, 726 getCacheConfig() != null ? getCacheConfig().shouldEvictOnClose() : true); 727 } 728 if (!result.isEmpty()) { 729 // initialize the thread pool for closing store files in parallel. 730 ThreadPoolExecutor storeFileCloserThreadPool = 731 this.region.getStoreFileOpenAndCloseThreadPool("StoreFileCloser-" 732 + this.region.getRegionInfo().getEncodedName() + "-" + this.getColumnFamilyName()); 733 734 // close each store file in parallel 735 CompletionService<Void> completionService = 736 new ExecutorCompletionService<>(storeFileCloserThreadPool); 737 for (HStoreFile f : result) { 738 completionService.submit(new Callable<Void>() { 739 @Override 740 public Void call() throws IOException { 741 boolean evictOnClose = 742 getCacheConfig() != null ? 
getCacheConfig().shouldEvictOnClose() : true; 743 f.closeStoreFile(evictOnClose); 744 return null; 745 } 746 }); 747 } 748 749 IOException ioe = null; 750 try { 751 for (int i = 0; i < result.size(); i++) { 752 try { 753 Future<Void> future = completionService.take(); 754 future.get(); 755 } catch (InterruptedException e) { 756 if (ioe == null) { 757 ioe = new InterruptedIOException(); 758 ioe.initCause(e); 759 } 760 } catch (ExecutionException e) { 761 if (ioe == null) { 762 ioe = new IOException(e.getCause()); 763 } 764 } 765 } 766 } finally { 767 storeFileCloserThreadPool.shutdownNow(); 768 } 769 if (ioe != null) { 770 throw ioe; 771 } 772 } 773 LOG.trace("Closed {}", this); 774 return result; 775 } 776 777 /** 778 * Close all the readers We don't need to worry about subsequent requests because the Region holds 779 * a write lock that will prevent any more reads or writes. 780 * @return the {@link StoreFile StoreFiles} that were previously being used. 781 * @throws IOException on failure 782 */ 783 public ImmutableCollection<HStoreFile> close() throws IOException { 784 // findbugs can not recognize storeEngine.writeLock is just a lock operation so it will report 785 // UL_UNRELEASED_LOCK_EXCEPTION_PATH, so here we have to use two try finally... 786 // Change later if findbugs becomes smarter in the future. 787 this.archiveLock.lock(); 788 try { 789 this.storeEngine.writeLock(); 790 try { 791 return closeWithoutLock(); 792 } finally { 793 this.storeEngine.writeUnlock(); 794 } 795 } finally { 796 this.archiveLock.unlock(); 797 } 798 } 799 800 /** 801 * Write out current snapshot. Presumes {@code StoreFlusherImpl.prepare()} has been called 802 * previously. 803 * @param logCacheFlushId flush sequence number 804 * @return The path name of the tmp file to which the store was flushed 805 * @throws IOException if exception occurs during process 806 */ 807 protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot, 808 MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker, 809 Consumer<Path> writerCreationTracker) throws IOException { 810 // If an exception happens flushing, we let it out without clearing 811 // the memstore snapshot. The old snapshot will be returned when we say 812 // 'snapshot', the next time flush comes around. 
813 // Retry after catching exception when flushing, otherwise server will abort 814 // itself 815 StoreFlusher flusher = storeEngine.getStoreFlusher(); 816 IOException lastException = null; 817 for (int i = 0; i < flushRetriesNumber; i++) { 818 try { 819 List<Path> pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status, 820 throughputController, tracker, writerCreationTracker); 821 Path lastPathName = null; 822 try { 823 for (Path pathName : pathNames) { 824 lastPathName = pathName; 825 storeEngine.validateStoreFile(pathName); 826 } 827 return pathNames; 828 } catch (Exception e) { 829 LOG.warn("Failed validating store file {}, retrying num={}", lastPathName, i, e); 830 if (e instanceof IOException) { 831 lastException = (IOException) e; 832 } else { 833 lastException = new IOException(e); 834 } 835 } 836 } catch (IOException e) { 837 LOG.warn("Failed flushing store file for {}, retrying num={}", this, i, e); 838 lastException = e; 839 } 840 if (lastException != null && i < (flushRetriesNumber - 1)) { 841 try { 842 Thread.sleep(pauseTime); 843 } catch (InterruptedException e) { 844 IOException iie = new InterruptedIOException(); 845 iie.initCause(e); 846 throw iie; 847 } 848 } 849 } 850 throw lastException; 851 } 852 853 public HStoreFile tryCommitRecoveredHFile(Path path) throws IOException { 854 LOG.info("Validating recovered hfile at {} for inclusion in store {}", path, this); 855 FileSystem srcFs = path.getFileSystem(conf); 856 srcFs.access(path, FsAction.READ_WRITE); 857 try (HFile.Reader reader = 858 HFile.createReader(srcFs, path, getCacheConfig(), isPrimaryReplicaStore(), conf)) { 859 Optional<byte[]> firstKey = reader.getFirstRowKey(); 860 Preconditions.checkState(firstKey.isPresent(), "First key can not be null"); 861 Optional<Cell> lk = reader.getLastKey(); 862 Preconditions.checkState(lk.isPresent(), "Last key can not be null"); 863 byte[] lastKey = CellUtil.cloneRow(lk.get()); 864 if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) { 865 throw new WrongRegionException("Recovered hfile " + path.toString() 866 + " does not fit inside region " + this.getRegionInfo().getRegionNameAsString()); 867 } 868 } 869 870 Path dstPath = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), path); 871 HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath); 872 StoreFileReader r = sf.getReader(); 873 this.storeSize.addAndGet(r.length()); 874 this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); 875 876 storeEngine.addStoreFiles(Lists.newArrayList(sf), () -> { 877 }); 878 879 LOG.info("Loaded recovered hfile to {}, entries={}, sequenceid={}, filesize={}", sf, 880 r.getEntries(), r.getSequenceID(), TraditionalBinaryPrefix.long2String(r.length(), "B", 1)); 881 return sf; 882 } 883 884 private long getTotalSize(Collection<HStoreFile> sfs) { 885 return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum(); 886 } 887 888 private boolean completeFlush(List<HStoreFile> sfs, long snapshotId) throws IOException { 889 // NOTE:we should keep clearSnapshot method inside the write lock because clearSnapshot may 890 // close {@link DefaultMemStore#snapshot}, which may be used by 891 // {@link DefaultMemStore#getScanners}. 892 storeEngine.addStoreFiles(sfs, 893 snapshotId > 0 ? 
() -> this.memstore.clearSnapshot(snapshotId) : () -> { 894 }); 895 // notify to be called here - only in case of flushes 896 notifyChangedReadersObservers(sfs); 897 if (LOG.isTraceEnabled()) { 898 long totalSize = getTotalSize(sfs); 899 String traceMessage = "FLUSH time,count,size,store size,store files [" 900 + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize + "," 901 + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]"; 902 LOG.trace(traceMessage); 903 } 904 return needsCompaction(); 905 } 906 907 /** 908 * Notify all observers that set of Readers has changed. 909 */ 910 private void notifyChangedReadersObservers(List<HStoreFile> sfs) throws IOException { 911 for (ChangedReadersObserver o : this.changedReaderObservers) { 912 List<KeyValueScanner> memStoreScanners; 913 this.storeEngine.readLock(); 914 try { 915 memStoreScanners = this.memstore.getScanners(o.getReadPoint()); 916 } finally { 917 this.storeEngine.readUnlock(); 918 } 919 o.updateReaders(sfs, memStoreScanners); 920 } 921 } 922 923 /** 924 * Get all scanners with no filtering based on TTL (that happens further down the line). 925 * @param cacheBlocks cache the blocks or not 926 * @param usePread true to use pread, false if not 927 * @param isCompaction true if the scanner is created for compaction 928 * @param matcher the scan query matcher 929 * @param startRow the start row 930 * @param stopRow the stop row 931 * @param readPt the read point of the current scan 932 * @return all scanners for this store 933 */ 934 public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet, boolean usePread, 935 boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt) 936 throws IOException { 937 return getScanners(cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow, false, 938 readPt); 939 } 940 941 /** 942 * Get all scanners with no filtering based on TTL (that happens further down the line). 943 * @param cacheBlocks cache the blocks or not 944 * @param usePread true to use pread, false if not 945 * @param isCompaction true if the scanner is created for compaction 946 * @param matcher the scan query matcher 947 * @param startRow the start row 948 * @param includeStartRow true to include start row, false if not 949 * @param stopRow the stop row 950 * @param includeStopRow true to include stop row, false if not 951 * @param readPt the read point of the current scan 952 * @return all scanners for this store 953 */ 954 public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread, 955 boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow, 956 byte[] stopRow, boolean includeStopRow, long readPt) throws IOException { 957 Collection<HStoreFile> storeFilesToScan; 958 List<KeyValueScanner> memStoreScanners; 959 this.storeEngine.readLock(); 960 try { 961 storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow, 962 includeStartRow, stopRow, includeStopRow); 963 memStoreScanners = this.memstore.getScanners(readPt); 964 } finally { 965 this.storeEngine.readUnlock(); 966 } 967 968 try { 969 // First the store file scanners 970 971 // TODO this used to get the store files in descending order, 972 // but now we get them in ascending order, which I think is 973 // actually more correct, since memstore get put at the end. 
974 List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles( 975 storeFilesToScan, cacheBlocks, usePread, isCompaction, false, matcher, readPt); 976 List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1); 977 scanners.addAll(sfScanners); 978 // Then the memstore scanners 979 scanners.addAll(memStoreScanners); 980 return scanners; 981 } catch (Throwable t) { 982 clearAndClose(memStoreScanners); 983 throw t instanceof IOException ? (IOException) t : new IOException(t); 984 } 985 } 986 987 private static void clearAndClose(List<KeyValueScanner> scanners) { 988 if (scanners == null) { 989 return; 990 } 991 for (KeyValueScanner s : scanners) { 992 s.close(); 993 } 994 scanners.clear(); 995 } 996 997 /** 998 * Create scanners on the given files and if needed on the memstore with no filtering based on TTL 999 * (that happens further down the line). 1000 * @param files the list of files on which the scanners has to be created 1001 * @param cacheBlocks cache the blocks or not 1002 * @param usePread true to use pread, false if not 1003 * @param isCompaction true if the scanner is created for compaction 1004 * @param matcher the scan query matcher 1005 * @param startRow the start row 1006 * @param stopRow the stop row 1007 * @param readPt the read point of the current scan 1008 * @param includeMemstoreScanner true if memstore has to be included 1009 * @return scanners on the given files and on the memstore if specified 1010 */ 1011 public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks, 1012 boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, 1013 byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) 1014 throws IOException { 1015 return getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow, 1016 false, readPt, includeMemstoreScanner); 1017 } 1018 1019 /** 1020 * Create scanners on the given files and if needed on the memstore with no filtering based on TTL 1021 * (that happens further down the line). 
1022 * @param files the list of files on which the scanners have to be created
1023 * @param cacheBlocks cache the blocks or not
1024 * @param usePread true to use pread, false if not
1025 * @param isCompaction true if the scanner is created for compaction
1026 * @param matcher the scan query matcher
1027 * @param startRow the start row
1028 * @param includeStartRow true to include start row, false if not
1029 * @param stopRow the stop row
1030 * @param includeStopRow true to include stop row, false if not
1031 * @param readPt the read point of the current scan
1032 * @param includeMemstoreScanner true if memstore has to be included
1033 * @return scanners on the given files and on the memstore if specified
1034 */
1035 public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
1036 boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1037 boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
1038 boolean includeMemstoreScanner) throws IOException {
1039 List<KeyValueScanner> memStoreScanners = null;
1040 if (includeMemstoreScanner) {
1041 this.storeEngine.readLock();
1042 try {
1043 memStoreScanners = this.memstore.getScanners(readPt);
1044 } finally {
1045 this.storeEngine.readUnlock();
1046 }
1047 }
1048 try {
1049 List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(files,
1050 cacheBlocks, usePread, isCompaction, false, matcher, readPt);
1051 List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
1052 scanners.addAll(sfScanners);
1053 // Then the memstore scanners
1054 if (memStoreScanners != null) {
1055 scanners.addAll(memStoreScanners);
1056 }
1057 return scanners;
1058 } catch (Throwable t) {
1059 clearAndClose(memStoreScanners);
1060 throw t instanceof IOException ? (IOException) t : new IOException(t);
1061 }
1062 }
1063
1064 /**
1065 * @param o Observer who wants to know about changes in the set of Readers
1066 */
1067 public void addChangedReaderObserver(ChangedReadersObserver o) {
1068 this.changedReaderObservers.add(o);
1069 }
1070
1071 /**
1072 * @param o Observer no longer interested in changes in the set of Readers.
1073 */
1074 public void deleteChangedReaderObserver(ChangedReadersObserver o) {
1075 // We don't check if the observer is present; it may not be (legitimately)
1076 this.changedReaderObservers.remove(o);
1077 }
1078
1079 //////////////////////////////////////////////////////////////////////////////
1080 // Compaction
1081 //////////////////////////////////////////////////////////////////////////////
1082
1083 /**
1084 * Compact the StoreFiles. This method may take some time, so the calling thread must be able to
1085 * block for long periods.
1086 * <p>
1087 * During this time, the Store can work as usual, getting values from StoreFiles and writing new
1088 * StoreFiles from the memstore. Existing StoreFiles are not destroyed until the new compacted
1089 * StoreFile is completely written out to disk.
1090 * <p>
1091 * The compactLock prevents multiple simultaneous compactions. The structureLock prevents us from
1092 * interfering with other write operations.
1093 * <p>
1094 * We don't want to hold the structureLock for the whole time, as a compact() can be lengthy and
1095 * we want to allow cache-flushes during this period.
1096 * <p>
1097 * A compaction event should be idempotent, since there is no IO Fencing for the region directory
1098 * in hdfs. A region server might still try to complete the compaction after it lost the region. That
1099 * is why the following events are carefully ordered for a compaction: 1. Compaction writes new
1100 * files under the region/.tmp directory (compaction output) 2. Compaction atomically moves the
1101 * temporary file under the region directory 3. Compaction appends a WAL edit containing the
1102 * compaction input and output files. Forces sync on WAL. 4. Compaction deletes the input files
1103 * from the region directory. Failure conditions are handled like this: - If RS fails before 2,
1104 * compaction won't complete. Even if RS lives on and finishes the compaction later, it will only
1105 * write the new data file to the region directory. Since we already have this data, this will be
1106 * idempotent but we will have a redundant copy of the data. - If RS fails between 2 and 3, the
1107 * region will have a redundant copy of the data. The RS that failed won't be able to finish
1108 * sync() for WAL because of lease recovery in WAL. - If RS fails after 3, the region
1109 * server that opens the region will pick up the compaction marker from the WAL and replay it
1110 * by removing the compaction input files. The failed RS can also attempt to delete those files,
1111 * but the operation will be idempotent. See HBASE-2231 for details.
1112 * @param compaction compaction details obtained from requestCompaction()
1113 * @return Storefiles we compacted into or null if we failed or opted out early.
1114 */
1115 public List<HStoreFile> compact(CompactionContext compaction,
1116 ThroughputController throughputController, User user) throws IOException {
1117 assert compaction != null;
1118 CompactionRequestImpl cr = compaction.getRequest();
1119 StoreFileWriterCreationTracker writerCreationTracker =
1120 storeFileWriterCreationTrackerFactory.get();
1121 if (writerCreationTracker != null) {
1122 cr.setWriterCreationTracker(writerCreationTracker);
1123 storeFileWriterCreationTrackers.add(writerCreationTracker);
1124 }
1125 try {
1126 // Do all sanity checking in here if we have a valid CompactionRequestImpl
1127 // because we need to clean up after it on the way out in a finally
1128 // block below
1129 long compactionStartTime = EnvironmentEdgeManager.currentTime();
1130 assert compaction.hasSelection();
1131 Collection<HStoreFile> filesToCompact = cr.getFiles();
1132 assert !filesToCompact.isEmpty();
1133 synchronized (filesCompacting) {
1134 // sanity check: we're compacting files that this store knows about
1135 // TODO: change this to LOG.error() after more debugging
1136 Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
1137 }
1138
1139 // Ready to go. Have list of files to compact.
1140 LOG.info("Starting compaction of " + filesToCompact + " into tmpdir="
1141 + getRegionFileSystem().getTempDir() + ", totalSize="
1142 + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));
1143
1144 return doCompaction(cr, filesToCompact, user, compactionStartTime,
1145 compaction.compact(throughputController, user));
1146 } finally {
1147 finishCompactionRequest(cr);
1148 }
1149 }
1150
1151 protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
1152 Collection<HStoreFile> filesToCompact, User user, long compactionStartTime, List<Path> newFiles)
1153 throws IOException {
1154 // Do the steps necessary to complete the compaction.
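 // Commit the new files, run the postCompact coprocessor hook, swap the compacted files out of
 // the store file manager, then refresh the store size and update compaction metrics.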
1155 setStoragePolicyFromFileName(newFiles); 1156 List<HStoreFile> sfs = storeEngine.commitStoreFiles(newFiles, true); 1157 if (this.getCoprocessorHost() != null) { 1158 for (HStoreFile sf : sfs) { 1159 getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user); 1160 } 1161 } 1162 replaceStoreFiles(filesToCompact, sfs, true); 1163 1164 long outputBytes = getTotalSize(sfs); 1165 1166 // At this point the store will use new files for all new scanners. 1167 refreshStoreSizeAndTotalBytes(); // update store size. 1168 1169 long now = EnvironmentEdgeManager.currentTime(); 1170 if ( 1171 region.getRegionServerServices() != null 1172 && region.getRegionServerServices().getMetrics() != null 1173 ) { 1174 region.getRegionServerServices().getMetrics().updateCompaction( 1175 region.getTableDescriptor().getTableName().getNameAsString(), cr.isMajor(), 1176 now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(), 1177 outputBytes); 1178 1179 } 1180 1181 logCompactionEndMessage(cr, sfs, now, compactionStartTime); 1182 return sfs; 1183 } 1184 1185 // Set correct storage policy from the file name of DTCP. 1186 // Rename file will not change the storage policy. 1187 private void setStoragePolicyFromFileName(List<Path> newFiles) throws IOException { 1188 String prefix = HConstants.STORAGE_POLICY_PREFIX; 1189 for (Path newFile : newFiles) { 1190 if (newFile.getParent().getName().startsWith(prefix)) { 1191 CommonFSUtils.setStoragePolicy(getRegionFileSystem().getFileSystem(), newFile, 1192 newFile.getParent().getName().substring(prefix.length())); 1193 } 1194 } 1195 } 1196 1197 /** 1198 * Writes the compaction WAL record. 1199 * @param filesCompacted Files compacted (input). 1200 * @param newFiles Files from compaction. 1201 */ 1202 private void writeCompactionWalRecord(Collection<HStoreFile> filesCompacted, 1203 Collection<HStoreFile> newFiles) throws IOException { 1204 if (region.getWAL() == null) { 1205 return; 1206 } 1207 List<Path> inputPaths = 1208 filesCompacted.stream().map(HStoreFile::getPath).collect(Collectors.toList()); 1209 List<Path> outputPaths = 1210 newFiles.stream().map(HStoreFile::getPath).collect(Collectors.toList()); 1211 RegionInfo info = this.region.getRegionInfo(); 1212 CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info, 1213 getColumnFamilyDescriptor().getName(), inputPaths, outputPaths, 1214 getRegionFileSystem().getStoreDir(getColumnFamilyDescriptor().getNameAsString())); 1215 // Fix reaching into Region to get the maxWaitForSeqId. 1216 // Does this method belong in Region altogether given it is making so many references up there? 1217 // Could be Region#writeCompactionMarker(compactionDescriptor); 1218 WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(), 1219 this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC()); 1220 } 1221 1222 @RestrictedApi(explanation = "Should only be called in TestHStore", link = "", 1223 allowedOnPath = ".*/(HStore|TestHStore).java") 1224 void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result, 1225 boolean writeCompactionMarker) throws IOException { 1226 storeEngine.replaceStoreFiles(compactedFiles, result, () -> { 1227 if (writeCompactionMarker) { 1228 writeCompactionWalRecord(compactedFiles, result); 1229 } 1230 }, () -> { 1231 synchronized (filesCompacting) { 1232 filesCompacting.removeAll(compactedFiles); 1233 } 1234 }); 1235 // These may be null when the RS is shutting down. 
The space quota Chores will fix the Region 1236 // sizes later so it's not super-critical if we miss these. 1237 RegionServerServices rsServices = region.getRegionServerServices(); 1238 if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) { 1239 updateSpaceQuotaAfterFileReplacement( 1240 rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(), 1241 compactedFiles, result); 1242 } 1243 } 1244 1245 /** 1246 * Updates the space quota usage for this region, removing the size for files compacted away and 1247 * adding in the size for new files. 1248 * @param sizeStore The object tracking changes in region size for space quotas. 1249 * @param regionInfo The identifier for the region whose size is being updated. 1250 * @param oldFiles Files removed from this store's region. 1251 * @param newFiles Files added to this store's region. 1252 */ 1253 void updateSpaceQuotaAfterFileReplacement(RegionSizeStore sizeStore, RegionInfo regionInfo, 1254 Collection<HStoreFile> oldFiles, Collection<HStoreFile> newFiles) { 1255 long delta = 0; 1256 if (oldFiles != null) { 1257 for (HStoreFile compactedFile : oldFiles) { 1258 if (compactedFile.isHFile()) { 1259 delta -= compactedFile.getReader().length(); 1260 } 1261 } 1262 } 1263 if (newFiles != null) { 1264 for (HStoreFile newFile : newFiles) { 1265 if (newFile.isHFile()) { 1266 delta += newFile.getReader().length(); 1267 } 1268 } 1269 } 1270 sizeStore.incrementRegionSize(regionInfo, delta); 1271 } 1272 1273 /** 1274 * Log a very elaborate compaction completion message. 1275 * @param cr Request. 1276 * @param sfs Resulting files. 1277 * @param compactionStartTime Start time. 1278 */ 1279 private void logCompactionEndMessage(CompactionRequestImpl cr, List<HStoreFile> sfs, long now, 1280 long compactionStartTime) { 1281 StringBuilder message = new StringBuilder("Completed" + (cr.isMajor() ? " major" : "") 1282 + " compaction of " + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "") 1283 + " file(s) in " + this + " of " + this.getRegionInfo().getShortNameToLog() + " into "); 1284 if (sfs.isEmpty()) { 1285 message.append("none, "); 1286 } else { 1287 for (HStoreFile sf : sfs) { 1288 message.append(sf.getPath().getName()); 1289 message.append("(size="); 1290 message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1)); 1291 message.append("), "); 1292 } 1293 } 1294 message.append("total size for store is ") 1295 .append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1)) 1296 .append(". This selection was in queue for ") 1297 .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime())) 1298 .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime)) 1299 .append(" to execute."); 1300 LOG.info(message.toString()); 1301 if (LOG.isTraceEnabled()) { 1302 int fileCount = storeEngine.getStoreFileManager().getStorefileCount(); 1303 long resultSize = getTotalSize(sfs); 1304 String traceMessage = "COMPACTION start,end,size out,files in,files out,store size," 1305 + "store files [" + compactionStartTime + "," + now + "," + resultSize + "," 1306 + cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]"; 1307 LOG.trace(traceMessage); 1308 } 1309 } 1310 1311 /** 1312 * Call to complete a compaction. Its for the case where we find in the WAL a compaction that was 1313 * not finished. We could find one recovering a WAL after a regionserver crash. See HBASE-2231. 
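 * Replaying the marker removes any compaction input files that are still present in the store and,
 * when pickCompactionFiles is set, also picks up compaction output files that have not been loaded.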
   */
  public void replayCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
    boolean removeFiles) throws IOException {
    LOG.debug("Completing compaction from the WAL marker");
    List<String> compactionInputs = compaction.getCompactionInputList();
    List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList());

    // The Compaction Marker is written after the compaction is completed,
    // and the files moved into the region/family folder.
    //
    // If we crash after the entry is written, we may not have removed the
    // input files, but the output file is present.
    // (The unremoved input files will be removed by this function.)
    //
    // If we scan the directory and the file is not present, it can mean that:
    //   - The file was manually removed by the user
    //   - The file was removed as a consequence of a subsequent compaction
    // so we can't do anything with the "compaction output list" because those
    // files have already been loaded when opening the region (by virtue of
    // being in the store's folder) or they may be missing due to a compaction.

    String familyName = this.getColumnFamilyName();
    Set<String> inputFiles = new HashSet<>();
    for (String compactionInput : compactionInputs) {
      Path inputPath = getRegionFileSystem().getStoreFilePath(familyName, compactionInput);
      inputFiles.add(inputPath.getName());
    }

    // some of the input files might already be deleted
    List<HStoreFile> inputStoreFiles = new ArrayList<>(compactionInputs.size());
    for (HStoreFile sf : this.getStorefiles()) {
      if (inputFiles.contains(sf.getPath().getName())) {
        inputStoreFiles.add(sf);
      }
    }

    // check whether we need to pick up the new files
    List<HStoreFile> outputStoreFiles = new ArrayList<>(compactionOutputs.size());

    if (pickCompactionFiles) {
      for (HStoreFile sf : this.getStorefiles()) {
        compactionOutputs.remove(sf.getPath().getName());
      }
      for (String compactionOutput : compactionOutputs) {
        StoreFileInfo storeFileInfo =
          getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), compactionOutput);
        HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo);
        outputStoreFiles.add(storeFile);
      }
    }

    if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) {
      LOG.info("Replaying compaction marker, replacing input files: " + inputStoreFiles
        + " with output files : " + outputStoreFiles);
      this.replaceStoreFiles(inputStoreFiles, outputStoreFiles, false);
      this.refreshStoreSizeAndTotalBytes();
    }
  }

  @Override
  public boolean hasReferences() {
    // Grab the read lock here to ensure we only see the complete store file list after an atomic
    // replaceStoreFiles(..) call has finished.
    this.storeEngine.readLock();
    try {
      // Merge the current store files with compacted files here due to HBASE-20940.
      Collection<HStoreFile> allStoreFiles = new ArrayList<>(getStorefiles());
      allStoreFiles.addAll(getCompactedFiles());
      return StoreUtils.hasReferences(allStoreFiles);
    } finally {
      this.storeEngine.readUnlock();
    }
  }

  /**
   * Getter for the CompactionProgress object.
   * @return CompactionProgress object; can be null
   */
  public CompactionProgress getCompactionProgress() {
    return this.storeEngine.getCompactor().getProgress();
  }

  @Override
  public boolean shouldPerformMajorCompaction() throws IOException {
    for (HStoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) {
      // TODO: what are these reader checks all over the place?
      if (sf.getReader() == null) {
        LOG.debug("StoreFile {} has null Reader", sf);
        return false;
      }
    }
    return storeEngine.getCompactionPolicy()
      .shouldPerformMajorCompaction(this.storeEngine.getStoreFileManager().getStorefiles());
  }

  public Optional<CompactionContext> requestCompaction() throws IOException {
    return requestCompaction(NO_PRIORITY, CompactionLifeCycleTracker.DUMMY, null);
  }

  public Optional<CompactionContext> requestCompaction(int priority,
    CompactionLifeCycleTracker tracker, User user) throws IOException {
    // don't even select for compaction if writes are disabled
    if (!this.areWritesEnabled()) {
      return Optional.empty();
    }
    // Before we do compaction, try to get rid of unneeded files to simplify things.
    removeUnneededFiles();

    final CompactionContext compaction = storeEngine.createCompaction();
    CompactionRequestImpl request = null;
    this.storeEngine.readLock();
    try {
      synchronized (filesCompacting) {
        // First, see if coprocessor would want to override selection.
        if (this.getCoprocessorHost() != null) {
          final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
          boolean override =
            getCoprocessorHost().preCompactSelection(this, candidatesForCoproc, tracker, user);
          if (override) {
            // Coprocessor is overriding normal file selection.
            compaction.forceSelect(new CompactionRequestImpl(candidatesForCoproc));
          }
        }

        // Normal case - coprocessor is not overriding file selection.
        if (!compaction.hasSelection()) {
          boolean isUserCompaction = priority == Store.PRIORITY_USER;
          boolean mayUseOffPeak =
            offPeakHours.isOffPeakHour() && offPeakCompactionTracker.compareAndSet(false, true);
          try {
            compaction.select(this.filesCompacting, isUserCompaction, mayUseOffPeak,
              forceMajor && filesCompacting.isEmpty());
          } catch (IOException e) {
            if (mayUseOffPeak) {
              offPeakCompactionTracker.set(false);
            }
            throw e;
          }
          assert compaction.hasSelection();
          if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
            // Compaction policy doesn't want to take advantage of off-peak.
            offPeakCompactionTracker.set(false);
          }
        }
        if (this.getCoprocessorHost() != null) {
          this.getCoprocessorHost().postCompactSelection(this,
            ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker,
            compaction.getRequest(), user);
        }
        // Finally, we have the resulting files list. Check if we have any files at all.
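        // For illustration only: once this method returns a CompactionContext, a caller (for
        // example the region server's compaction threads) typically executes it with something
        // roughly like
        //   store.requestCompaction(priority, tracker, user)
        //     .ifPresent(c -> store.compact(c, throughputController, user));
        // where the ThroughputController instance is supplied by the caller; this is a sketch,
        // not the exact call site.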
        request = compaction.getRequest();
        Collection<HStoreFile> selectedFiles = request.getFiles();
        if (selectedFiles.isEmpty()) {
          return Optional.empty();
        }

        addToCompactingFiles(selectedFiles);

        // If we're enqueuing a major, clear the force flag.
        this.forceMajor = this.forceMajor && !request.isMajor();

        // Set common request properties.
        // Set priority, either override value supplied by caller or from store.
        final int compactionPriority =
          (priority != Store.NO_PRIORITY) ? priority : getCompactPriority();
        request.setPriority(compactionPriority);

        if (request.isAfterSplit()) {
          // If the store belongs to recently split daughter regions, we should consider them
          // with a higher priority in the compaction queue.
          // Override priority if it is lower (higher int value) than
          // SPLIT_REGION_COMPACTION_PRIORITY.
          final int splitHousekeepingPriority =
            Math.min(compactionPriority, SPLIT_REGION_COMPACTION_PRIORITY);
          request.setPriority(splitHousekeepingPriority);
          LOG.info(
            "Keeping/Overriding Compaction request priority to {} for CF {} since it"
              + " belongs to recently split daughter region {}",
            splitHousekeepingPriority, this.getColumnFamilyName(),
            getRegionInfo().getRegionNameAsString());
        }
        request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
        request.setTracker(tracker);
      }
    } finally {
      this.storeEngine.readUnlock();
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug(this + " is initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
        + (request.isAllFiles() ? " (all files)" : ""));
    }
    this.region.reportCompactionRequestStart(request.isMajor());
    return Optional.of(compaction);
  }

  /** Adds the files to compacting files. filesCompacting must be locked. */
  private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) {
    if (CollectionUtils.isEmpty(filesToAdd)) {
      return;
    }
    // Check that we do not try to compact the same StoreFile twice.
    if (!Collections.disjoint(filesCompacting, filesToAdd)) {
      Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
    }
    filesCompacting.addAll(filesToAdd);
    Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator());
  }

  private void removeUnneededFiles() throws IOException {
    if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) {
      return;
    }
    if (getColumnFamilyDescriptor().getMinVersions() > 0) {
      LOG.debug("Skipping expired store file removal due to min version of {} being {}", this,
        getColumnFamilyDescriptor().getMinVersions());
      return;
    }
    this.storeEngine.readLock();
    Collection<HStoreFile> delSfs = null;
    try {
      synchronized (filesCompacting) {
        long cfTtl = getStoreFileTtl();
        if (cfTtl != Long.MAX_VALUE) {
          delSfs = storeEngine.getStoreFileManager()
            .getUnneededFiles(EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting);
          addToCompactingFiles(delSfs);
        }
      }
    } finally {
      this.storeEngine.readUnlock();
    }

    if (CollectionUtils.isEmpty(delSfs)) {
      return;
    }

    Collection<HStoreFile> newFiles = Collections.emptyList(); // No new files.
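    // Replacing the expired files with an empty output list below acts like a compaction that
    // produced no output: the expired files are dropped from the store and a compaction marker is
    // recorded in the WAL, without rewriting any data.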
    replaceStoreFiles(delSfs, newFiles, true);
    refreshStoreSizeAndTotalBytes();
    LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in " + this
      + "; total size is " + TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1));
  }

  public void cancelRequestedCompaction(CompactionContext compaction) {
    finishCompactionRequest(compaction.getRequest());
  }

  protected void finishCompactionRequest(CompactionRequestImpl cr) {
    this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
    if (cr.isOffPeak()) {
      offPeakCompactionTracker.set(false);
      cr.setOffPeak(false);
    }
    synchronized (filesCompacting) {
      filesCompacting.removeAll(cr.getFiles());
    }
    // The tracker could be null, for example when we do not need to track the creation of store
    // file writers due to a different SFT implementation, or when the compaction is canceled.
    if (cr.getWriterCreationTracker() != null) {
      storeFileWriterCreationTrackers.remove(cr.getWriterCreationTracker());
    }
  }

  /**
   * Update the store size and total uncompressed byte counts from the current store files.
   */
  protected void refreshStoreSizeAndTotalBytes() throws IOException {
    this.storeSize.set(0L);
    this.totalUncompressedBytes.set(0L);
    for (HStoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
      StoreFileReader r = hsf.getReader();
      if (r == null) {
        LOG.warn("StoreFile {} has a null Reader", hsf);
        continue;
      }
      this.storeSize.addAndGet(r.length());
      this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
    }
  }

  /*
   * @param wantedVersions How many versions were asked for.
   * @return wantedVersions or this family's {@link HConstants#VERSIONS}.
   */
  int versionsToReturn(final int wantedVersions) {
    if (wantedVersions <= 0) {
      throw new IllegalArgumentException("Number of versions must be > 0");
    }
    // Make sure we do not return more than maximum versions for this store.
    int maxVersions = getColumnFamilyDescriptor().getMaxVersions();
    return wantedVersions > maxVersions ? maxVersions : wantedVersions;
  }

  @Override
  public boolean canSplit() {
    // Not split-able if we find a reference store file present in the store.
    boolean result = !hasReferences();
    if (!result) {
      LOG.trace("Not splittable; has references: {}", this);
    }
    return result;
  }

  /**
   * Determines if the Store should be split and, if so, at which point.
   */
  public Optional<byte[]> getSplitPoint() {
    this.storeEngine.readLock();
    try {
      // Should already be enforced by the split policy!
      assert !this.getRegionInfo().isMetaRegion();
      // Not split-able if we find a reference store file present in the store.
1627 if (hasReferences()) { 1628 LOG.trace("Not splittable; has references: {}", this); 1629 return Optional.empty(); 1630 } 1631 return this.storeEngine.getStoreFileManager().getSplitPoint(); 1632 } catch (IOException e) { 1633 LOG.warn("Failed getting store size for {}", this, e); 1634 } finally { 1635 this.storeEngine.readUnlock(); 1636 } 1637 return Optional.empty(); 1638 } 1639 1640 @Override 1641 public long getLastCompactSize() { 1642 return this.lastCompactSize; 1643 } 1644 1645 @Override 1646 public long getSize() { 1647 return storeSize.get(); 1648 } 1649 1650 public void triggerMajorCompaction() { 1651 this.forceMajor = true; 1652 } 1653 1654 ////////////////////////////////////////////////////////////////////////////// 1655 // File administration 1656 ////////////////////////////////////////////////////////////////////////////// 1657 1658 /** 1659 * Return a scanner for both the memstore and the HStore files. Assumes we are not in a 1660 * compaction. 1661 * @param scan Scan to apply when scanning the stores 1662 * @param targetCols columns to scan 1663 * @return a scanner over the current key values 1664 * @throws IOException on failure 1665 */ 1666 public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols, long readPt) 1667 throws IOException { 1668 storeEngine.readLock(); 1669 try { 1670 ScanInfo scanInfo; 1671 if (this.getCoprocessorHost() != null) { 1672 scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this, scan); 1673 } else { 1674 scanInfo = getScanInfo(); 1675 } 1676 return createScanner(scan, scanInfo, targetCols, readPt); 1677 } finally { 1678 storeEngine.readUnlock(); 1679 } 1680 } 1681 1682 // HMobStore will override this method to return its own implementation. 1683 protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo, 1684 NavigableSet<byte[]> targetCols, long readPt) throws IOException { 1685 return scan.isReversed() 1686 ? 
      new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt)
      : new StoreScanner(this, scanInfo, scan, targetCols, readPt);
  }

  /**
   * Recreates the scanners on the current list of active store file scanners.
   * @param currentFileScanners the current set of active store file scanners
   * @param cacheBlocks cache the blocks or not
   * @param usePread use pread or not
   * @param isCompaction is the scanner for compaction
   * @param matcher the scan query matcher
   * @param startRow the scan's start row
   * @param includeStartRow should the scan include the start row
   * @param stopRow the scan's stop row
   * @param includeStopRow should the scan include the stop row
   * @param readPt the read point of the current scan
   * @param includeMemstoreScanner whether the current scanner should include a memstore scanner
   * @return list of scanners recreated on the current scanners
   */
  public List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners,
    boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
    byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
    boolean includeMemstoreScanner) throws IOException {
    this.storeEngine.readLock();
    try {
      Map<String, HStoreFile> name2File =
        new HashMap<>(getStorefilesCount() + getCompactedFilesCount());
      for (HStoreFile file : getStorefiles()) {
        name2File.put(file.getFileInfo().getActiveFileName(), file);
      }
      Collection<HStoreFile> compactedFiles = getCompactedFiles();
      for (HStoreFile file : IterableUtils.emptyIfNull(compactedFiles)) {
        name2File.put(file.getFileInfo().getActiveFileName(), file);
      }
      List<HStoreFile> filesToReopen = new ArrayList<>();
      for (KeyValueScanner kvs : currentFileScanners) {
        assert kvs.isFileScanner();
        if (kvs.peek() == null) {
          continue;
        }
        filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
      }
      if (filesToReopen.isEmpty()) {
        return null;
      }
      return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow,
        includeStartRow, stopRow, includeStopRow, readPt, false);
    } finally {
      this.storeEngine.readUnlock();
    }
  }

  @Override
  public String toString() {
    return this.getRegionInfo().getShortNameToLog() + "/" + this.getColumnFamilyName();
  }

  @Override
  public int getStorefilesCount() {
    return this.storeEngine.getStoreFileManager().getStorefileCount();
  }

  @Override
  public int getCompactedFilesCount() {
    return this.storeEngine.getStoreFileManager().getCompactedFilesCount();
  }

  private LongStream getStoreFileAgeStream() {
    return this.storeEngine.getStoreFileManager().getStorefiles().stream().filter(sf -> {
      if (sf.getReader() == null) {
        LOG.warn("StoreFile {} has a null Reader", sf);
        return false;
      } else {
        return true;
      }
    }).filter(HStoreFile::isHFile).mapToLong(sf -> sf.getFileInfo().getCreatedTimestamp())
      .map(t -> EnvironmentEdgeManager.currentTime() - t);
  }

  @Override
  public OptionalLong getMaxStoreFileAge() {
    return getStoreFileAgeStream().max();
  }

  @Override
  public OptionalLong getMinStoreFileAge() {
    return getStoreFileAgeStream().min();
  }

  @Override
  public OptionalDouble getAvgStoreFileAge() {
    return
getStoreFileAgeStream().average(); 1778 } 1779 1780 @Override 1781 public long getNumReferenceFiles() { 1782 return this.storeEngine.getStoreFileManager().getStorefiles().stream() 1783 .filter(HStoreFile::isReference).count(); 1784 } 1785 1786 @Override 1787 public long getNumHFiles() { 1788 return this.storeEngine.getStoreFileManager().getStorefiles().stream() 1789 .filter(HStoreFile::isHFile).count(); 1790 } 1791 1792 @Override 1793 public long getStoreSizeUncompressed() { 1794 return this.totalUncompressedBytes.get(); 1795 } 1796 1797 @Override 1798 public long getStorefilesSize() { 1799 // Include all StoreFiles 1800 return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(), 1801 sf -> true); 1802 } 1803 1804 @Override 1805 public long getHFilesSize() { 1806 // Include only StoreFiles which are HFiles 1807 return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(), 1808 HStoreFile::isHFile); 1809 } 1810 1811 private long getStorefilesFieldSize(ToLongFunction<StoreFileReader> f) { 1812 return this.storeEngine.getStoreFileManager().getStorefiles().stream() 1813 .mapToLong(file -> StoreUtils.getStorefileFieldSize(file, f)).sum(); 1814 } 1815 1816 @Override 1817 public long getStorefilesRootLevelIndexSize() { 1818 return getStorefilesFieldSize(StoreFileReader::indexSize); 1819 } 1820 1821 @Override 1822 public long getTotalStaticIndexSize() { 1823 return getStorefilesFieldSize(StoreFileReader::getUncompressedDataIndexSize); 1824 } 1825 1826 @Override 1827 public long getTotalStaticBloomSize() { 1828 return getStorefilesFieldSize(StoreFileReader::getTotalBloomSize); 1829 } 1830 1831 @Override 1832 public MemStoreSize getMemStoreSize() { 1833 return this.memstore.size(); 1834 } 1835 1836 @Override 1837 public int getCompactPriority() { 1838 int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority(); 1839 if (priority == PRIORITY_USER) { 1840 LOG.warn("Compaction priority is USER despite there being no user compaction"); 1841 } 1842 return priority; 1843 } 1844 1845 public boolean throttleCompaction(long compactionSize) { 1846 return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize); 1847 } 1848 1849 public HRegion getHRegion() { 1850 return this.region; 1851 } 1852 1853 public RegionCoprocessorHost getCoprocessorHost() { 1854 return this.region.getCoprocessorHost(); 1855 } 1856 1857 @Override 1858 public RegionInfo getRegionInfo() { 1859 return getRegionFileSystem().getRegionInfo(); 1860 } 1861 1862 @Override 1863 public boolean areWritesEnabled() { 1864 return this.region.areWritesEnabled(); 1865 } 1866 1867 @Override 1868 public long getSmallestReadPoint() { 1869 return this.region.getSmallestReadPoint(); 1870 } 1871 1872 /** 1873 * Adds or replaces the specified KeyValues. 1874 * <p> 1875 * For each KeyValue specified, if a cell with the same row, family, and qualifier exists in 1876 * MemStore, it will be replaced. Otherwise, it will just be inserted to MemStore. 1877 * <p> 1878 * This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic 1879 * across all of them. 
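   * <p>
   * A minimal usage sketch (assuming the caller already holds the necessary region-level locks
   * and has determined a safe read point; the sizing implementation shown is just one option):
   *
   * <pre>{@code
   * MemStoreSizing sizing = new NonThreadSafeMemStoreSizing();
   * store.upsert(cells, smallestReadPoint, sizing);
   * }</pre>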
1880 * @param readpoint readpoint below which we can safely remove duplicate KVs 1881 */ 1882 public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing) 1883 throws IOException { 1884 this.storeEngine.readLock(); 1885 try { 1886 this.memstore.upsert(cells, readpoint, memstoreSizing); 1887 } finally { 1888 this.storeEngine.readUnlock(); 1889 } 1890 } 1891 1892 public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTracker tracker) { 1893 return new StoreFlusherImpl(cacheFlushId, tracker); 1894 } 1895 1896 private final class StoreFlusherImpl implements StoreFlushContext { 1897 1898 private final FlushLifeCycleTracker tracker; 1899 private final StoreFileWriterCreationTracker writerCreationTracker; 1900 private final long cacheFlushSeqNum; 1901 private MemStoreSnapshot snapshot; 1902 private List<Path> tempFiles; 1903 private List<Path> committedFiles; 1904 private long cacheFlushCount; 1905 private long cacheFlushSize; 1906 private long outputFileSize; 1907 1908 private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) { 1909 this.cacheFlushSeqNum = cacheFlushSeqNum; 1910 this.tracker = tracker; 1911 this.writerCreationTracker = storeFileWriterCreationTrackerFactory.get(); 1912 } 1913 1914 /** 1915 * This is not thread safe. The caller should have a lock on the region or the store. If 1916 * necessary, the lock can be added with the patch provided in HBASE-10087 1917 */ 1918 @Override 1919 public MemStoreSize prepare() { 1920 // passing the current sequence number of the wal - to allow bookkeeping in the memstore 1921 this.snapshot = memstore.snapshot(); 1922 this.cacheFlushCount = snapshot.getCellsCount(); 1923 this.cacheFlushSize = snapshot.getDataSize(); 1924 committedFiles = new ArrayList<>(1); 1925 return snapshot.getMemStoreSize(); 1926 } 1927 1928 @Override 1929 public void flushCache(MonitoredTask status) throws IOException { 1930 RegionServerServices rsService = region.getRegionServerServices(); 1931 ThroughputController throughputController = 1932 rsService == null ? null : rsService.getFlushThroughputController(); 1933 // it could be null if we do not need to track the creation of store file writer due to 1934 // different SFT implementation. 
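      // (When present, the tracker records the paths of the store files this flush writes; those
      // paths back getStoreFilesBeingWritten(), which BrokenStoreFileCleaner consults so that
      // in-flight files are not deleted.)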
1935 if (writerCreationTracker != null) { 1936 HStore.this.storeFileWriterCreationTrackers.add(writerCreationTracker); 1937 } 1938 tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController, 1939 tracker, writerCreationTracker); 1940 } 1941 1942 @Override 1943 public boolean commit(MonitoredTask status) throws IOException { 1944 try { 1945 if (CollectionUtils.isEmpty(this.tempFiles)) { 1946 return false; 1947 } 1948 status.setStatus("Flushing " + this + ": reopening flushed file"); 1949 List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, false); 1950 for (HStoreFile sf : storeFiles) { 1951 StoreFileReader r = sf.getReader(); 1952 if (LOG.isInfoEnabled()) { 1953 LOG.info("Added {}, entries={}, sequenceid={}, filesize={}", sf, r.getEntries(), 1954 cacheFlushSeqNum, TraditionalBinaryPrefix.long2String(r.length(), "", 1)); 1955 } 1956 outputFileSize += r.length(); 1957 storeSize.addAndGet(r.length()); 1958 totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); 1959 committedFiles.add(sf.getPath()); 1960 } 1961 1962 flushedCellsCount.addAndGet(cacheFlushCount); 1963 flushedCellsSize.addAndGet(cacheFlushSize); 1964 flushedOutputFileSize.addAndGet(outputFileSize); 1965 // call coprocessor after we have done all the accounting above 1966 for (HStoreFile sf : storeFiles) { 1967 if (getCoprocessorHost() != null) { 1968 getCoprocessorHost().postFlush(HStore.this, sf, tracker); 1969 } 1970 } 1971 // Add new file to store files. Clear snapshot too while we have the Store write lock. 1972 return completeFlush(storeFiles, snapshot.getId()); 1973 } finally { 1974 if (writerCreationTracker != null) { 1975 HStore.this.storeFileWriterCreationTrackers.remove(writerCreationTracker); 1976 } 1977 } 1978 } 1979 1980 @Override 1981 public long getOutputFileSize() { 1982 return outputFileSize; 1983 } 1984 1985 @Override 1986 public List<Path> getCommittedFiles() { 1987 return committedFiles; 1988 } 1989 1990 /** 1991 * Similar to commit, but called in secondary region replicas for replaying the flush cache from 1992 * primary region. Adds the new files to the store, and drops the snapshot depending on 1993 * dropMemstoreSnapshot argument. 
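     * <p>
     * For illustration, a secondary replica replaying a flush event could drive this roughly as
     * in the sketch below (variable names are assumptions, not the actual replay code):
     *
     * <pre>{@code
     * StoreFlushContext ctx = store.createFlushContext(flushSeqId, FlushLifeCycleTracker.DUMMY);
     * ctx.replayFlush(flushedFileNames, true); // also drop the prepared memstore snapshot
     * }</pre>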
1994 * @param fileNames names of the flushed files 1995 * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot 1996 */ 1997 @Override 1998 public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot) 1999 throws IOException { 2000 List<HStoreFile> storeFiles = new ArrayList<>(fileNames.size()); 2001 for (String file : fileNames) { 2002 // open the file as a store file (hfile link, etc) 2003 StoreFileInfo storeFileInfo = 2004 getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), file); 2005 HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo); 2006 storeFiles.add(storeFile); 2007 HStore.this.storeSize.addAndGet(storeFile.getReader().length()); 2008 HStore.this.totalUncompressedBytes 2009 .addAndGet(storeFile.getReader().getTotalUncompressedBytes()); 2010 if (LOG.isInfoEnabled()) { 2011 LOG.info(this + " added " + storeFile + ", entries=" + storeFile.getReader().getEntries() 2012 + ", sequenceid=" + storeFile.getReader().getSequenceID() + ", filesize=" 2013 + TraditionalBinaryPrefix.long2String(storeFile.getReader().length(), "", 1)); 2014 } 2015 } 2016 2017 long snapshotId = -1; // -1 means do not drop 2018 if (dropMemstoreSnapshot && snapshot != null) { 2019 snapshotId = snapshot.getId(); 2020 } 2021 HStore.this.completeFlush(storeFiles, snapshotId); 2022 } 2023 2024 /** 2025 * Abort the snapshot preparation. Drops the snapshot if any. 2026 */ 2027 @Override 2028 public void abort() throws IOException { 2029 if (snapshot != null) { 2030 HStore.this.completeFlush(Collections.emptyList(), snapshot.getId()); 2031 } 2032 } 2033 } 2034 2035 @Override 2036 public boolean needsCompaction() { 2037 List<HStoreFile> filesCompactingClone = null; 2038 synchronized (filesCompacting) { 2039 filesCompactingClone = Lists.newArrayList(filesCompacting); 2040 } 2041 return this.storeEngine.needsCompaction(filesCompactingClone); 2042 } 2043 2044 /** 2045 * Used for tests. 2046 * @return cache configuration for this Store. 
2047 */ 2048 public CacheConfig getCacheConfig() { 2049 return storeContext.getCacheConf(); 2050 } 2051 2052 public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HStore.class, false); 2053 2054 public static final long DEEP_OVERHEAD = ClassSize.align( 2055 FIXED_OVERHEAD + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK + ClassSize.CONCURRENT_SKIPLISTMAP 2056 + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT + ScanInfo.FIXED_OVERHEAD); 2057 2058 @Override 2059 public long heapSize() { 2060 MemStoreSize memstoreSize = this.memstore.size(); 2061 return DEEP_OVERHEAD + memstoreSize.getHeapSize() + storeContext.heapSize(); 2062 } 2063 2064 @Override 2065 public CellComparator getComparator() { 2066 return storeContext.getComparator(); 2067 } 2068 2069 public ScanInfo getScanInfo() { 2070 return scanInfo; 2071 } 2072 2073 /** 2074 * Set scan info, used by test 2075 * @param scanInfo new scan info to use for test 2076 */ 2077 void setScanInfo(ScanInfo scanInfo) { 2078 this.scanInfo = scanInfo; 2079 } 2080 2081 @Override 2082 public boolean hasTooManyStoreFiles() { 2083 return getStorefilesCount() > this.blockingFileCount; 2084 } 2085 2086 @Override 2087 public long getFlushedCellsCount() { 2088 return flushedCellsCount.get(); 2089 } 2090 2091 @Override 2092 public long getFlushedCellsSize() { 2093 return flushedCellsSize.get(); 2094 } 2095 2096 @Override 2097 public long getFlushedOutputFileSize() { 2098 return flushedOutputFileSize.get(); 2099 } 2100 2101 @Override 2102 public long getCompactedCellsCount() { 2103 return compactedCellsCount.get(); 2104 } 2105 2106 @Override 2107 public long getCompactedCellsSize() { 2108 return compactedCellsSize.get(); 2109 } 2110 2111 @Override 2112 public long getMajorCompactedCellsCount() { 2113 return majorCompactedCellsCount.get(); 2114 } 2115 2116 @Override 2117 public long getMajorCompactedCellsSize() { 2118 return majorCompactedCellsSize.get(); 2119 } 2120 2121 public void updateCompactedMetrics(boolean isMajor, CompactionProgress progress) { 2122 if (isMajor) { 2123 majorCompactedCellsCount.addAndGet(progress.getTotalCompactingKVs()); 2124 majorCompactedCellsSize.addAndGet(progress.totalCompactedSize); 2125 } else { 2126 compactedCellsCount.addAndGet(progress.getTotalCompactingKVs()); 2127 compactedCellsSize.addAndGet(progress.totalCompactedSize); 2128 } 2129 } 2130 2131 /** 2132 * Returns the StoreEngine that is backing this concrete implementation of Store. 2133 * @return Returns the {@link StoreEngine} object used internally inside this HStore object. 
2134 */ 2135 public StoreEngine<?, ?, ?, ?> getStoreEngine() { 2136 return this.storeEngine; 2137 } 2138 2139 protected OffPeakHours getOffPeakHours() { 2140 return this.offPeakHours; 2141 } 2142 2143 @Override 2144 public void onConfigurationChange(Configuration conf) { 2145 Configuration storeConf = StoreUtils.createStoreConfiguration(conf, region.getTableDescriptor(), 2146 getColumnFamilyDescriptor()); 2147 this.conf = storeConf; 2148 this.storeEngine.compactionPolicy.setConf(storeConf); 2149 this.offPeakHours = OffPeakHours.getInstance(storeConf); 2150 } 2151 2152 /** 2153 * {@inheritDoc} 2154 */ 2155 @Override 2156 public void registerChildren(ConfigurationManager manager) { 2157 // No children to register 2158 } 2159 2160 /** 2161 * {@inheritDoc} 2162 */ 2163 @Override 2164 public void deregisterChildren(ConfigurationManager manager) { 2165 // No children to deregister 2166 } 2167 2168 @Override 2169 public double getCompactionPressure() { 2170 return storeEngine.getStoreFileManager().getCompactionPressure(); 2171 } 2172 2173 @Override 2174 public boolean isPrimaryReplicaStore() { 2175 return getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID; 2176 } 2177 2178 /** 2179 * Sets the store up for a region level snapshot operation. 2180 * @see #postSnapshotOperation() 2181 */ 2182 public void preSnapshotOperation() { 2183 archiveLock.lock(); 2184 } 2185 2186 /** 2187 * Perform tasks needed after the completion of snapshot operation. 2188 * @see #preSnapshotOperation() 2189 */ 2190 public void postSnapshotOperation() { 2191 archiveLock.unlock(); 2192 } 2193 2194 /** 2195 * Closes and archives the compacted files under this store 2196 */ 2197 public synchronized void closeAndArchiveCompactedFiles() throws IOException { 2198 // ensure other threads do not attempt to archive the same files on close() 2199 archiveLock.lock(); 2200 try { 2201 storeEngine.readLock(); 2202 Collection<HStoreFile> copyCompactedfiles = null; 2203 try { 2204 Collection<HStoreFile> compactedfiles = 2205 this.getStoreEngine().getStoreFileManager().getCompactedfiles(); 2206 if (CollectionUtils.isNotEmpty(compactedfiles)) { 2207 // Do a copy under read lock 2208 copyCompactedfiles = new ArrayList<>(compactedfiles); 2209 } else { 2210 LOG.trace("No compacted files to archive"); 2211 } 2212 } finally { 2213 storeEngine.readUnlock(); 2214 } 2215 if (CollectionUtils.isNotEmpty(copyCompactedfiles)) { 2216 removeCompactedfiles(copyCompactedfiles, true); 2217 } 2218 } finally { 2219 archiveLock.unlock(); 2220 } 2221 } 2222 2223 /** 2224 * Archives and removes the compacted files 2225 * @param compactedfiles The compacted files in this store that are not active in reads 2226 * @param evictOnClose true if blocks should be evicted from the cache when an HFile reader is 2227 * closed, false if not 2228 */ 2229 private void removeCompactedfiles(Collection<HStoreFile> compactedfiles, boolean evictOnClose) 2230 throws IOException { 2231 final List<HStoreFile> filesToRemove = new ArrayList<>(compactedfiles.size()); 2232 final List<Long> storeFileSizes = new ArrayList<>(compactedfiles.size()); 2233 for (final HStoreFile file : compactedfiles) { 2234 synchronized (file) { 2235 try { 2236 StoreFileReader r = file.getReader(); 2237 if (r == null) { 2238 LOG.debug("The file {} was closed but still not archived", file); 2239 // HACK: Temporarily re-open the reader so we can get the size of the file. Ideally, 2240 // we should know the size of an HStoreFile without having to ask the HStoreFileReader 2241 // for that. 
2242 long length = getStoreFileSize(file); 2243 filesToRemove.add(file); 2244 storeFileSizes.add(length); 2245 continue; 2246 } 2247 2248 if (file.isCompactedAway() && !file.isReferencedInReads()) { 2249 // Even if deleting fails we need not bother as any new scanners won't be 2250 // able to use the compacted file as the status is already compactedAway 2251 LOG.trace("Closing and archiving the file {}", file); 2252 // Copy the file size before closing the reader 2253 final long length = r.length(); 2254 r.close(evictOnClose); 2255 // Just close and return 2256 filesToRemove.add(file); 2257 // Only add the length if we successfully added the file to `filesToRemove` 2258 storeFileSizes.add(length); 2259 } else { 2260 LOG.info("Can't archive compacted file " + file.getPath() 2261 + " because of either isCompactedAway=" + file.isCompactedAway() 2262 + " or file has reference, isReferencedInReads=" + file.isReferencedInReads() 2263 + ", refCount=" + r.getRefCount() + ", skipping for now."); 2264 } 2265 } catch (Exception e) { 2266 LOG.error("Exception while trying to close the compacted store file {}", file.getPath(), 2267 e); 2268 } 2269 } 2270 } 2271 if (this.isPrimaryReplicaStore()) { 2272 // Only the primary region is allowed to move the file to archive. 2273 // The secondary region does not move the files to archive. Any active reads from 2274 // the secondary region will still work because the file as such has active readers on it. 2275 if (!filesToRemove.isEmpty()) { 2276 LOG.debug("Moving the files {} to archive", filesToRemove); 2277 // Only if this is successful it has to be removed 2278 try { 2279 getRegionFileSystem().removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(), 2280 filesToRemove); 2281 } catch (FailedArchiveException fae) { 2282 // Even if archiving some files failed, we still need to clear out any of the 2283 // files which were successfully archived. Otherwise we will receive a 2284 // FileNotFoundException when we attempt to re-archive them in the next go around. 2285 Collection<Path> failedFiles = fae.getFailedFiles(); 2286 Iterator<HStoreFile> iter = filesToRemove.iterator(); 2287 Iterator<Long> sizeIter = storeFileSizes.iterator(); 2288 while (iter.hasNext()) { 2289 sizeIter.next(); 2290 if (failedFiles.contains(iter.next().getPath())) { 2291 iter.remove(); 2292 sizeIter.remove(); 2293 } 2294 } 2295 if (!filesToRemove.isEmpty()) { 2296 clearCompactedfiles(filesToRemove); 2297 } 2298 throw fae; 2299 } 2300 } 2301 } 2302 if (!filesToRemove.isEmpty()) { 2303 // Clear the compactedfiles from the store file manager 2304 clearCompactedfiles(filesToRemove); 2305 // Try to send report of this archival to the Master for updating quota usage faster 2306 reportArchivedFilesForQuota(filesToRemove, storeFileSizes); 2307 } 2308 } 2309 2310 /** 2311 * Computes the length of a store file without succumbing to any errors along the way. If an error 2312 * is encountered, the implementation returns {@code 0} instead of the actual size. 2313 * @param file The file to compute the size of. 2314 * @return The size in bytes of the provided {@code file}. 2315 */ 2316 long getStoreFileSize(HStoreFile file) { 2317 long length = 0; 2318 try { 2319 file.initReader(); 2320 length = file.getReader().length(); 2321 } catch (IOException e) { 2322 LOG.trace("Failed to open reader when trying to compute store file size for {}, ignoring", 2323 file, e); 2324 } finally { 2325 try { 2326 file.closeStoreFile( 2327 file.getCacheConf() != null ? 
          file.getCacheConf().shouldEvictOnClose() : true);
      } catch (IOException e) {
        LOG.trace("Failed to close reader after computing store file size for {}, ignoring", file,
          e);
      }
    }
    return length;
  }

  public Long preFlushSeqIDEstimation() {
    return memstore.preFlushSeqIDEstimation();
  }

  @Override
  public boolean isSloppyMemStore() {
    return this.memstore.isSloppy();
  }

  private void clearCompactedfiles(List<HStoreFile> filesToRemove) throws IOException {
    LOG.trace("Clearing the compacted file {} from this store", filesToRemove);
    storeEngine.removeCompactedFiles(filesToRemove);
  }

  @Override
  public int getCurrentParallelPutCount() {
    return currentParallelPutCount.get();
  }

  public int getStoreRefCount() {
    return this.storeEngine.getStoreFileManager().getStorefiles().stream()
      .filter(sf -> sf.getReader() != null).filter(HStoreFile::isHFile)
      .mapToInt(HStoreFile::getRefCount).sum();
  }

  /** Returns the maximum ref count of a storeFile among all compacted HStore files for the HStore */
  public int getMaxCompactedStoreFileRefCount() {
    OptionalInt maxCompactedStoreFileRefCount = this.storeEngine.getStoreFileManager()
      .getCompactedfiles().stream().filter(sf -> sf.getReader() != null).filter(HStoreFile::isHFile)
      .mapToInt(HStoreFile::getRefCount).max();
    return maxCompactedStoreFileRefCount.isPresent() ? maxCompactedStoreFileRefCount.getAsInt() : 0;
  }

  void reportArchivedFilesForQuota(List<? extends StoreFile> archivedFiles, List<Long> fileSizes) {
    // Sanity check from the caller
    if (archivedFiles.size() != fileSizes.size()) {
      throw new RuntimeException("Coding error: should never see lists of varying size");
    }
    RegionServerServices rss = this.region.getRegionServerServices();
    if (rss == null) {
      return;
    }
    List<Entry<String, Long>> filesWithSizes = new ArrayList<>(archivedFiles.size());
    Iterator<Long> fileSizeIter = fileSizes.iterator();
    for (StoreFile storeFile : archivedFiles) {
      final long fileSize = fileSizeIter.next();
      if (storeFile.isHFile() && fileSize != 0) {
        filesWithSizes.add(Maps.immutableEntry(storeFile.getPath().getName(), fileSize));
      }
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("Files archived: " + archivedFiles + ", reporting the following to the Master: "
        + filesWithSizes);
    }
    boolean success = rss.reportFileArchivalForQuotas(getTableName(), filesWithSizes);
    if (!success) {
      LOG.warn("Failed to report archival of files: " + filesWithSizes);
    }
  }

  @Override
  public long getMemstoreOnlyRowReadsCount() {
    return memstoreOnlyRowReadsCount.sum();
  }

  @Override
  public long getMixedRowReadsCount() {
    return mixedRowReadsCount.sum();
  }

  @Override
  public Configuration getReadOnlyConfiguration() {
    return new ReadOnlyConfiguration(this.conf);
  }

  void updateMetricsStore(boolean memstoreRead) {
    if (memstoreRead) {
      memstoreOnlyRowReadsCount.increment();
    } else {
      mixedRowReadsCount.increment();
    }
  }

  /**
   * Return the storefiles which are currently being written to. Mainly used by
   * {@link BrokenStoreFileCleaner} to prevent deleting these files as they are not present in SFT
   * yet.
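   * <p>
   * For illustration, a cleaner-style caller could skip in-flight files roughly like this (a
   * sketch, not the actual BrokenStoreFileCleaner logic):
   *
   * <pre>{@code
   * Set<Path> inFlight = store.getStoreFilesBeingWritten();
   * if (inFlight.contains(candidateFile)) {
   *   return; // never delete a file that is still being written
   * }
   * }</pre>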
2423 */ 2424 public Set<Path> getStoreFilesBeingWritten() { 2425 return storeFileWriterCreationTrackers.stream().flatMap(t -> t.get().stream()) 2426 .collect(Collectors.toSet()); 2427 } 2428 2429 @Override 2430 public long getBloomFilterRequestsCount() { 2431 return storeEngine.getBloomFilterMetrics().getRequestsCount(); 2432 } 2433 2434 @Override 2435 public long getBloomFilterNegativeResultsCount() { 2436 return storeEngine.getBloomFilterMetrics().getNegativeResultsCount(); 2437 } 2438 2439 @Override 2440 public long getBloomFilterEligibleRequestsCount() { 2441 return storeEngine.getBloomFilterMetrics().getEligibleRequestsCount(); 2442 } 2443}