/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.InnerStoreCellComparator;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.MetaCellComparator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.FailedArchiveException;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.conf.ConfigKey;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.coprocessor.ReadOnlyConfiguration;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.SecurityUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;

/**
 * A Store holds a column family in a Region. It's a memstore and a set of zero or more StoreFiles,
 * which stretch backwards over time.
 * <p>
 * There's no reason to consider append-logging at this level; all logging and locking is handled
 * at the HRegion level. Store just provides services to manage sets of StoreFiles. One of the most
 * important of those services is compaction, where files are aggregated once they pass a
 * configurable threshold.
 * <p>
 * Locking and transactions are handled at a higher level. This API should not be called directly
 * but by an HRegion manager.
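 * <p>
 * Hypothetical access sketch (this is not a supported public API; callers normally go through
 * HRegion, and the surrounding variables below are assumptions):
 *
 * <pre>
 * HStore store = region.getStore(familyName); // obtain the store for one column family
 * long storefileCount = store.getStorefiles().size();
 * </pre>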
 */
@InterfaceAudience.Private
public class HStore
  implements Store, HeapSize, StoreConfigInformation, PropagatingConfigurationObserver {
  public static final String MEMSTORE_CLASS_NAME =
    ConfigKey.CLASS("hbase.regionserver.memstore.class", MemStore.class);
  public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
    ConfigKey.INT("hbase.server.compactchecker.interval.multiplier");
  public static final String BLOCKING_STOREFILES_KEY =
    ConfigKey.INT("hbase.hstore.blockingStoreFiles");
  public static final String BLOCK_STORAGE_POLICY_KEY = "hbase.hstore.block.storage.policy";
  // "NONE" is not a valid storage policy and means we defer the policy to HDFS
  public static final String DEFAULT_BLOCK_STORAGE_POLICY = "NONE";
  public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
  public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 16;

  // HBASE-24428 : Update compaction priority for recently split daughter regions
  // so as to prioritize their compaction.
  // Any compaction candidate with higher priority than compaction of newly split daughter regions
  // should have priority value < (Integer.MIN_VALUE + 1000)
  private static final int SPLIT_REGION_COMPACTION_PRIORITY = Integer.MIN_VALUE + 1000;

  private static final Logger LOG = LoggerFactory.getLogger(HStore.class);

  protected final MemStore memstore;
  // This stores directory in the filesystem.
  private final HRegion region;
  protected Configuration conf;
  private long lastCompactSize = 0;
  volatile boolean forceMajor = false;
  private AtomicLong storeSize = new AtomicLong();
  private AtomicLong totalUncompressedBytes = new AtomicLong();
  private LongAdder memstoreOnlyRowReadsCount = new LongAdder();
  // rows that have cells from both memstore and files (or only files)
  private LongAdder mixedRowReadsCount = new LongAdder();

  /**
   * Lock specific to archiving compacted store files. This avoids races around the combination of
   * retrieving the list of compacted files and moving them to the archive directory. Since this is
   * usually a background process (other than on close), we don't want to handle this with the
   * store write lock, which would block readers and degrade performance.
   * Locked by:
   * - CompactedHFilesDispatchHandler via closeAndArchiveCompactedFiles()
   * - close()
   */
  final ReentrantLock archiveLock = new ReentrantLock();

  private final boolean verifyBulkLoads;

  /**
   * Use this counter to track concurrent puts. If TRACE-level logging is enabled and we go over
   * the threshold set by hbase.region.store.parallel.put.print.threshold (default 50), we log a
   * message identifying the Store experiencing this high level of concurrency.
   */
  private final AtomicInteger currentParallelPutCount = new AtomicInteger(0);
  private final int parallelPutCountPrintThreshold;

  private ScanInfo scanInfo;

  // All access must be synchronized.
  // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it.
  private final List<HStoreFile> filesCompacting = Lists.newArrayList();

  // All access must be synchronized.
  private final Set<ChangedReadersObserver> changedReaderObservers =
    Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());

  private HFileDataBlockEncoder dataBlockEncoder;

  final StoreEngine<?, ?, ?, ?> storeEngine;

  private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
  private volatile OffPeakHours offPeakHours;

  private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
  private int flushRetriesNumber;
  private int pauseTime;

  private long blockingFileCount;
  private int compactionCheckMultiplier;

  private AtomicLong flushedCellsCount = new AtomicLong();
  private AtomicLong compactedCellsCount = new AtomicLong();
  private AtomicLong majorCompactedCellsCount = new AtomicLong();
  private AtomicLong flushedCellsSize = new AtomicLong();
  private AtomicLong flushedOutputFileSize = new AtomicLong();
  private AtomicLong compactedCellsSize = new AtomicLong();
  private AtomicLong majorCompactedCellsSize = new AtomicLong();

  private final StoreContext storeContext;

  // Used to track the store files which are currently being written. For compaction, if we want
  // to compact store files [a, b, c] to [d], then here we will record 'd'. And we will also use
  // it to track the store files being written when flushing.
  // Notice that the creation happens in the background compaction or flush thread, and we will
  // read the files from another thread, so it needs to be thread safe.
  private static final class StoreFileWriterCreationTracker implements Consumer<Path> {

    private final Set<Path> files = Collections.newSetFromMap(new ConcurrentHashMap<>());

    @Override
    public void accept(Path t) {
      files.add(t);
    }

    public Set<Path> get() {
      return Collections.unmodifiableSet(files);
    }
  }

  // We may have multiple compactions running at the same time, and a flush can also happen at the
  // same time, so here we need a collection, and the collection needs to be thread safe.
  // The implementation of StoreFileWriterCreationTracker is very simple and we are not likely to
  // implement hashCode or equals for it, so here we just use ConcurrentHashMap. Change to
  // IdentityHashMap if later we want to implement hashCode or equals.
  private final Set<StoreFileWriterCreationTracker> storeFileWriterCreationTrackers =
    Collections.newSetFromMap(new ConcurrentHashMap<>());

  // For an SFT implementation which writes tmp store files first, we do not need to clean up
  // broken store files under the data directory, which means we do not need to track store file
  // writer creation. So here we abstract a factory to return different trackers for different
  // SFT implementations.
  private final Supplier<StoreFileWriterCreationTracker> storeFileWriterCreationTrackerFactory;

  private final boolean warmup;

  /**
   * Constructor
   * @param family    HColumnDescriptor for this column
   * @param confParam configuration object for this store; can be null
   */
  protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
    final Configuration confParam, boolean warmup) throws IOException {
    this.conf = StoreUtils.createStoreConfiguration(confParam, region.getTableDescriptor(), family);

    this.region = region;
    this.storeContext = initializeStoreContext(family);

    // Assemble the store's home directory and ensure it exists.
    region.getRegionFileSystem().createStoreDir(family.getNameAsString());

    // set block storage policy for store directory
    String policyName = family.getStoragePolicy();
    if (null == policyName) {
      policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY, DEFAULT_BLOCK_STORAGE_POLICY);
    }
    region.getRegionFileSystem().setStoragePolicy(family.getNameAsString(), policyName.trim());

    this.dataBlockEncoder = new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());

    // used by ScanQueryMatcher
    long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
    LOG.trace("Time to purge deletes set to {}ms in {}", timeToPurgeDeletes, this);
    // Get TTL
    long ttl = determineTTLFromFamily(family);
    // Why not just pass a HColumnDescriptor in here altogether? Even if we have to clone it?
    scanInfo =
      new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.storeContext.getComparator());
    this.memstore = getMemstore();

    this.offPeakHours = OffPeakHours.getInstance(conf);

    this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);

    this.blockingFileCount = conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
    this.compactionCheckMultiplier = conf.getInt(COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY,
      DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
    if (this.compactionCheckMultiplier <= 0) {
      LOG.error("Compaction check period multiplier must be positive, setting default: {}",
        DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
      this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
    }

    this.warmup = warmup;
    this.storeEngine = createStoreEngine(this, this.conf, region.getCellComparator());
    storeEngine.initialize(warmup);
    // if we are required to write to the tmp dir first, we just return null, which indicates that
    // we do not need to track the creation of store file writers; otherwise we return a new
    // StoreFileWriterCreationTracker.
    this.storeFileWriterCreationTrackerFactory = storeEngine.requireWritingToTmpDirFirst()
      ? () -> null
      : () -> new StoreFileWriterCreationTracker();
    refreshStoreSizeAndTotalBytes();

    flushRetriesNumber =
      conf.getInt("hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
    pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
    if (flushRetriesNumber <= 0) {
      throw new IllegalArgumentException(
        "hbase.hstore.flush.retries.number must be > 0, not " + flushRetriesNumber);
    }

    int confPrintThreshold =
      this.conf.getInt("hbase.region.store.parallel.put.print.threshold", 50);
    if (confPrintThreshold < 10) {
      confPrintThreshold = 10;
    }
    this.parallelPutCountPrintThreshold = confPrintThreshold;

    LOG.info(
      "Store={}, memstore type={}, storagePolicy={}, verifyBulkLoads={}, "
        + "parallelPutCountPrintThreshold={}, encoding={}, compression={}",
      this, memstore.getClass().getSimpleName(), policyName, verifyBulkLoads,
      parallelPutCountPrintThreshold, family.getDataBlockEncoding(), family.getCompressionType());
  }
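
  // Illustrative sketch (values are hypothetical): the tuning knobs read by the constructor above
  // could be set on the Configuration handed to the store, e.g. from a test:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 32); // default 16
  //   conf.setInt("hbase.hstore.flush.retries.number", 5); // default 10, must be > 0
  //   conf.setInt("hbase.region.store.parallel.put.print.threshold", 100); // floored at 10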

  private StoreContext initializeStoreContext(ColumnFamilyDescriptor family) throws IOException {
    return new StoreContext.Builder().withBlockSize(family.getBlocksize())
      .withEncryptionContext(SecurityUtil.createEncryptionContext(conf, region.getTableDescriptor(),
        family, region.getManagedKeyDataCache(), region.getSystemKeyCache()))
      .withBloomType(family.getBloomFilterType()).withCacheConfig(createCacheConf(family))
      .withCellComparator(region.getTableDescriptor().isMetaTable() || conf
        .getBoolean(HRegion.USE_META_CELL_COMPARATOR, HRegion.DEFAULT_USE_META_CELL_COMPARATOR)
          ? MetaCellComparator.META_COMPARATOR
          : InnerStoreCellComparator.INNER_STORE_COMPARATOR)
      .withColumnFamilyDescriptor(family).withCompactedFilesSupplier(this::getCompactedFiles)
      .withRegionFileSystem(region.getRegionFileSystem())
      .withFavoredNodesSupplier(this::getFavoredNodes)
      .withFamilyStoreDirectoryPath(
        region.getRegionFileSystem().getStoreDir(family.getNameAsString()))
      .withRegionCoprocessorHost(region.getCoprocessorHost()).build();
  }
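
  /** Returns the favored-nodes hint for this store's region, or null when none is available. */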
  private InetSocketAddress[] getFavoredNodes() {
    InetSocketAddress[] favoredNodes = null;
    if (region.getRegionServerServices() != null) {
      favoredNodes = region.getRegionServerServices()
        .getFavoredNodesForRegion(region.getRegionInfo().getEncodedName());
    }
    return favoredNodes;
  }

  /** Returns the MemStore instance to use in this store. */
  private MemStore getMemstore() {
    MemStore ms = null;
    // Check if in-memory-compaction is configured. Note MemoryCompactionPolicy is an enum!
    MemoryCompactionPolicy inMemoryCompaction = null;
    if (this.getTableName().isSystemTable()) {
      inMemoryCompaction = MemoryCompactionPolicy
        .valueOf(conf.get("hbase.systemtables.compacting.memstore.type", "NONE").toUpperCase());
    } else {
      inMemoryCompaction = getColumnFamilyDescriptor().getInMemoryCompaction();
    }
    if (inMemoryCompaction == null) {
      inMemoryCompaction =
        MemoryCompactionPolicy.valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
          CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT).toUpperCase());
    }

    switch (inMemoryCompaction) {
      case NONE:
        Class<? extends MemStore> memStoreClass =
          conf.getClass(MEMSTORE_CLASS_NAME, DefaultMemStore.class, MemStore.class);
        ms = ReflectionUtils.newInstance(memStoreClass,
          new Object[] { conf, getComparator(), this.getHRegion().getRegionServicesForStores() });
        break;
      default:
        Class<? extends CompactingMemStore> compactingMemStoreClass =
          conf.getClass(MEMSTORE_CLASS_NAME, CompactingMemStore.class, CompactingMemStore.class);
        ms =
          ReflectionUtils.newInstance(compactingMemStoreClass, new Object[] { conf, getComparator(),
            this, this.getHRegion().getRegionServicesForStores(), inMemoryCompaction });
    }
    return ms;
  }

  /**
   * Creates the cache config.
   * @param family The current column family.
   */
  protected CacheConfig createCacheConf(final ColumnFamilyDescriptor family) {
    CacheConfig cacheConf = new CacheConfig(conf, family, region.getBlockCache(),
      region.getRegionServicesForStores().getByteBuffAllocator());
    LOG.info("Created cacheConfig: {}, for column family {} of region {} ", cacheConf,
      family.getNameAsString(), region.getRegionInfo().getEncodedName());
    return cacheConf;
  }

  /**
   * Creates the store engine configured for the given Store.
   * @param store        The store. An unfortunate dependency needed due to it being passed to
   *                     coprocessors via the compactor.
   * @param conf         Store configuration.
   * @param kvComparator KVComparator for storeFileManager.
   * @return StoreEngine to use.
   */
  protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
    CellComparator kvComparator) throws IOException {
    return StoreEngine.create(store, conf, kvComparator);
  }

  /** Returns the TTL, in milliseconds, of the cells of the specified family. */
  public static long determineTTLFromFamily(final ColumnFamilyDescriptor family) {
    // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.
    long ttl = family.getTimeToLive();
    if (ttl == HConstants.FOREVER) {
      // Default is unlimited ttl.
      ttl = Long.MAX_VALUE;
    } else if (ttl == -1) {
      ttl = Long.MAX_VALUE;
    } else {
      // Second -> ms adjust for user data
      ttl *= 1000;
    }
    return ttl;
  }
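
  // Worked example (hypothetical descriptor, shown for illustration): a family built with
  // setTimeToLive(86400), i.e. one day in seconds, yields 86_400_000 ms here, while FOREVER and
  // -1 both map to Long.MAX_VALUE (no time-based expiry):
  //
  //   ColumnFamilyDescriptor cfd =
  //     ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setTimeToLive(86400).build();
  //   long ttlMs = HStore.determineTTLFromFamily(cfd); // -> 86400000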

  public StoreContext getStoreContext() {
    return storeContext;
  }

  @Override
  public String getColumnFamilyName() {
    return this.storeContext.getFamily().getNameAsString();
  }

  @Override
  public TableName getTableName() {
    return this.getRegionInfo().getTable();
  }

  @Override
  public FileSystem getFileSystem() {
    return storeContext.getRegionFileSystem().getFileSystem();
  }

  public HRegionFileSystem getRegionFileSystem() {
    return storeContext.getRegionFileSystem();
  }

  /* Implementation of StoreConfigInformation */
  @Override
  public long getStoreFileTtl() {
    // TTL only applies if there's no MIN_VERSIONS setting on the column.
    return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
  }

  @Override
  public long getMemStoreFlushSize() {
    // TODO: Why is this in here? The flushsize of the region rather than the store? St.Ack
    return this.region.memstoreFlushSize;
  }

  @Override
  public MemStoreSize getFlushableSize() {
    return this.memstore.getFlushableSize();
  }

  @Override
  public MemStoreSize getSnapshotSize() {
    return this.memstore.getSnapshotSize();
  }

  @Override
  public long getCompactionCheckMultiplier() {
    return this.compactionCheckMultiplier;
  }

  @Override
  public long getBlockingFileCount() {
    return blockingFileCount;
  }
  /* End implementation of StoreConfigInformation */

  @Override
  public ColumnFamilyDescriptor getColumnFamilyDescriptor() {
    return this.storeContext.getFamily();
  }

  @Override
  public OptionalLong getMaxSequenceId() {
    return StoreUtils.getMaxSequenceIdInList(this.getStorefiles());
  }

  @Override
  public OptionalLong getMaxMemStoreTS() {
    return StoreUtils.getMaxMemStoreTSInList(this.getStorefiles());
  }

  /** Returns the data block encoder */
  public HFileDataBlockEncoder getDataBlockEncoder() {
    return dataBlockEncoder;
  }

  /**
   * Should be used only in tests.
   * @param blockEncoder the block delta encoder to use
   */
  void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
    this.dataBlockEncoder = blockEncoder;
  }

  private void postRefreshStoreFiles() throws IOException {
    // Advance the memstore read point to be at least the new store files' seqIds so that
    // readers might pick it up. This assumes that the store is not getting any writes (otherwise
    // in-flight transactions might be made visible)
    getMaxSequenceId().ifPresent(region.getMVCC()::advanceTo);
    refreshStoreSizeAndTotalBytes();
  }

  @Override
  public void refreshStoreFiles() throws IOException {
    storeEngine.refreshStoreFiles();
    postRefreshStoreFiles();
  }

  /**
   * Replaces the store files that the store has with the given files. Mainly used by secondary
   * region replicas to keep up to date with the primary region files.
   */
  public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
    storeEngine.refreshStoreFiles(newFiles);
    postRefreshStoreFiles();
  }

  /**
   * This message intends to inform the MemStore that the upcoming updates are going to be part of
   * replaying edits from the WAL.
   */
  public void startReplayingFromWAL() {
    this.memstore.startReplayingFromWAL();
  }

  /**
   * This message intends to inform the MemStore that the replaying of edits from the WAL is done.
   */
  public void stopReplayingFromWAL() {
    this.memstore.stopReplayingFromWAL();
  }

  /**
   * Adds a value to the memstore
   */
  public void add(final ExtendedCell cell, MemStoreSizing memstoreSizing) {
    storeEngine.readLock();
    try {
      if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
        LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
          this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName());
      }
      this.memstore.add(cell, memstoreSizing);
    } finally {
      storeEngine.readUnlock();
      currentParallelPutCount.decrementAndGet();
    }
  }

  /**
   * Adds the specified cells to the memstore
   */
  public void add(final Iterable<ExtendedCell> cells, MemStoreSizing memstoreSizing) {
    storeEngine.readLock();
    try {
      if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
        LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
          this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName());
      }
      memstore.add(cells, memstoreSizing);
    } finally {
      storeEngine.readUnlock();
      currentParallelPutCount.decrementAndGet();
    }
  }

  @Override
  public long timeOfOldestEdit() {
    return memstore.timeOfOldestEdit();
  }

  /** Returns all store files. */
  @Override
  public Collection<HStoreFile> getStorefiles() {
    return this.storeEngine.getStoreFileManager().getStoreFiles();
  }

  @Override
  public Collection<HStoreFile> getCompactedFiles() {
    return this.storeEngine.getStoreFileManager().getCompactedfiles();
  }

  /**
   * This throws a WrongRegionException if the HFile does not fit in this region, or an
   * InvalidHFileException if the HFile is not valid.
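   * <p>
   * Hypothetical caller sketch (the staging path below is illustrative only):
   *
   * <pre>
   * Path hfile = new Path("/staging/f/loadme.hfile");
   * store.assertBulkLoadHFileOk(hfile); // throws on range or format problems
   * </pre>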
   */
  public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
    HFile.Reader reader = null;
    try {
      LOG.info("Validating hfile at " + srcPath + " for inclusion in " + this);
      FileSystem srcFs = srcPath.getFileSystem(conf);
      srcFs.access(srcPath, FsAction.READ_WRITE);
      reader = HFile.createReader(srcFs, srcPath, getCacheConfig(), isPrimaryReplicaStore(), conf);

      Optional<byte[]> firstKey = reader.getFirstRowKey();
      Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
      Optional<ExtendedCell> lk = reader.getLastKey();
      Preconditions.checkState(lk.isPresent(), "Last key can not be null");
      byte[] lastKey = CellUtil.cloneRow(lk.get());

      if (LOG.isDebugEnabled()) {
        LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey.get()) + " last="
          + Bytes.toStringBinary(lastKey));
        LOG.debug("Region bounds: first=" + Bytes.toStringBinary(getRegionInfo().getStartKey())
          + " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
      }

      if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) {
        throw new WrongRegionException("Bulk load file " + srcPath.toString()
          + " does not fit inside region " + this.getRegionInfo().getRegionNameAsString());
      }

      if (
        reader.length()
            > conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE)
      ) {
        LOG.warn("Trying to bulk load hfile " + srcPath + " with size: " + reader.length()
          + " bytes can be problematic as it may lead to oversplitting.");
      }

      if (verifyBulkLoads) {
        long verificationStartTime = EnvironmentEdgeManager.currentTime();
        LOG.info("Full verification started for bulk load hfile: {}", srcPath);
        Cell prevCell = null;
        HFileScanner scanner = reader.getScanner(conf, false, false, false);
        scanner.seekTo();
        do {
          Cell cell = scanner.getCell();
          if (prevCell != null) {
            if (getComparator().compareRows(prevCell, cell) > 0) {
              throw new InvalidHFileException("Previous row is greater than" + " current row: path="
                + srcPath + " previous=" + CellUtil.getCellKeyAsString(prevCell) + " current="
                + CellUtil.getCellKeyAsString(cell));
            }
            if (CellComparator.getInstance().compareFamilies(prevCell, cell) != 0) {
              throw new InvalidHFileException("Previous key had different"
                + " family compared to current key: path=" + srcPath + " previous="
                + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(),
                  prevCell.getFamilyLength())
                + " current=" + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(),
                  cell.getFamilyLength()));
            }
          }
          prevCell = cell;
        } while (scanner.next());
        LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString() + " took "
          + (EnvironmentEdgeManager.currentTime() - verificationStartTime) + " ms");
      }
    } finally {
      if (reader != null) {
        reader.close();
      }
    }
  }

  /**
   * This method should only be called from Region. It is assumed that the ranges of values in the
   * HFile fit within the store's assigned region.
   * (assertBulkLoadHFileOk checks this)
   * @param seqNum sequence Id associated with the HFile
   */
  public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
    Path srcPath = new Path(srcPathStr);
    return getRegionFileSystem().bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
  }

  public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException {
    Path srcPath = new Path(srcPathStr);
    try {
      getRegionFileSystem().commitStoreFile(srcPath, dstPath);
    } finally {
      if (this.getCoprocessorHost() != null) {
        this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath);
      }
    }

    LOG.info("Loaded HFile " + srcPath + " into " + this + " as " + dstPath
      + " - updating store file list.");

    HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath);
    bulkLoadHFile(sf);

    LOG.info("Successfully loaded {} into {} (new location: {})", srcPath, this, dstPath);

    return dstPath;
  }

  public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
    HStoreFile sf = storeEngine.createStoreFileAndReader(fileInfo);
    bulkLoadHFile(sf);
  }

  private void bulkLoadHFile(HStoreFile sf) throws IOException {
    StoreFileReader r = sf.getReader();
    this.storeSize.addAndGet(r.length());
    this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
    storeEngine.addStoreFiles(Lists.newArrayList(sf), () -> {
    });
    LOG.info("Loaded HFile " + sf.getFileInfo() + " into " + this);
    if (LOG.isTraceEnabled()) {
      String traceMessage = "BULK LOAD time,size,store size,store files ["
        + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize + ","
        + storeEngine.getStoreFileManager().getStorefileCount() + "]";
      LOG.trace(traceMessage);
    }
  }

  private ImmutableCollection<HStoreFile> closeWithoutLock() throws IOException {
    memstore.close();
    // Clear so metrics doesn't find them.
    ImmutableCollection<HStoreFile> result = storeEngine.getStoreFileManager().clearFiles();
    Collection<HStoreFile> compactedfiles = storeEngine.getStoreFileManager().clearCompactedFiles();
    // clear the compacted files
    if (CollectionUtils.isNotEmpty(compactedfiles)) {
      removeCompactedfiles(compactedfiles,
        getCacheConfig() != null ? getCacheConfig().shouldEvictOnClose() : true);
    }
    if (!result.isEmpty()) {
      // initialize the thread pool for closing store files in parallel.
      ThreadPoolExecutor storeFileCloserThreadPool =
        this.region.getStoreFileOpenAndCloseThreadPool("StoreFileCloser-"
          + this.region.getRegionInfo().getEncodedName() + "-" + this.getColumnFamilyName());

      // close each store file in parallel
      CompletionService<Void> completionService =
        new ExecutorCompletionService<>(storeFileCloserThreadPool);
      for (HStoreFile f : result) {
        completionService.submit(new Callable<Void>() {
          @Override
          public Void call() throws IOException {
            boolean evictOnClose =
              getCacheConfig() != null ? getCacheConfig().shouldEvictOnClose() : true;
            f.closeStoreFile(!warmup && evictOnClose);
            return null;
          }
        });
      }

      IOException ioe = null;
      try {
        for (int i = 0; i < result.size(); i++) {
          try {
            Future<Void> future = completionService.take();
            future.get();
          } catch (InterruptedException e) {
            if (ioe == null) {
              ioe = new InterruptedIOException();
              ioe.initCause(e);
            }
          } catch (ExecutionException e) {
            if (ioe == null) {
              ioe = new IOException(e.getCause());
            }
          }
        }
      } finally {
        storeFileCloserThreadPool.shutdownNow();
      }
      if (ioe != null) {
        throw ioe;
      }
    }
    LOG.trace("Closed {}", this);
    return result;
  }

  /**
   * Close all the readers. We don't need to worry about subsequent requests because the Region
   * holds a write lock that will prevent any more reads or writes.
   * @return the {@link StoreFile StoreFiles} that were previously being used.
   * @throws IOException on failure
   */
  public ImmutableCollection<HStoreFile> close() throws IOException {
    // findbugs can not recognize storeEngine.writeLock is just a lock operation so it will report
    // UL_UNRELEASED_LOCK_EXCEPTION_PATH, so here we have to use two try finally...
    // Change later if findbugs becomes smarter in the future.
    this.archiveLock.lock();
    try {
      this.storeEngine.writeLock();
      try {
        return closeWithoutLock();
      } finally {
        this.storeEngine.writeUnlock();
      }
    } finally {
      this.archiveLock.unlock();
    }
  }

  /**
   * Write out current snapshot. Presumes {@code StoreFlusherImpl.prepare()} has been called
   * previously.
   * @param logCacheFlushId flush sequence number
   * @return The path name of the tmp file to which the store was flushed
   * @throws IOException if exception occurs during process
   */
  protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
    MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
    Consumer<Path> writerCreationTracker) throws IOException {
    // If an exception happens flushing, we let it out without clearing
    // the memstore snapshot. The old snapshot will be returned when we say
    // 'snapshot', the next time flush comes around.
    // Retry after catching exception when flushing, otherwise server will abort
    // itself
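    // The loop below makes up to flushRetriesNumber attempts, sleeping pauseTime ms between
    // attempts, and rethrows the last IOException once the retries are exhausted.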
    StoreFlusher flusher = storeEngine.getStoreFlusher();
    IOException lastException = null;
    for (int i = 0; i < flushRetriesNumber; i++) {
      try {
        List<Path> pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status,
          throughputController, tracker, writerCreationTracker);
        Path lastPathName = null;
        try {
          for (Path pathName : pathNames) {
            lastPathName = pathName;
            storeEngine.validateStoreFile(pathName, false);
          }
          return pathNames;
        } catch (Exception e) {
          LOG.warn("Failed validating store file {}, retrying num={}", lastPathName, i, e);
          if (e instanceof IOException) {
            lastException = (IOException) e;
          } else {
            lastException = new IOException(e);
          }
        }
      } catch (IOException e) {
        LOG.warn("Failed flushing store file for {}, retrying num={}", this, i, e);
        lastException = e;
      }
      if (lastException != null && i < (flushRetriesNumber - 1)) {
        try {
          Thread.sleep(pauseTime);
        } catch (InterruptedException e) {
          IOException iie = new InterruptedIOException();
          iie.initCause(e);
          throw iie;
        }
      }
    }
    throw lastException;
  }

  public HStoreFile tryCommitRecoveredHFile(Path path) throws IOException {
    LOG.info("Validating recovered hfile at {} for inclusion in store {}", path, this);
    FileSystem srcFs = path.getFileSystem(conf);
    srcFs.access(path, FsAction.READ_WRITE);
    try (HFile.Reader reader =
      HFile.createReader(srcFs, path, getCacheConfig(), isPrimaryReplicaStore(), conf)) {
      Optional<byte[]> firstKey = reader.getFirstRowKey();
      Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
      Optional<ExtendedCell> lk = reader.getLastKey();
      Preconditions.checkState(lk.isPresent(), "Last key can not be null");
      byte[] lastKey = CellUtil.cloneRow(lk.get());
      if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) {
        throw new WrongRegionException("Recovered hfile " + path.toString()
          + " does not fit inside region " + this.getRegionInfo().getRegionNameAsString());
      }
    }

    Path dstPath = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), path);
    HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath);
    StoreFileReader r = sf.getReader();
    this.storeSize.addAndGet(r.length());
    this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());

    storeEngine.addStoreFiles(Lists.newArrayList(sf), () -> {
    });

    LOG.info("Loaded recovered hfile to {}, entries={}, sequenceid={}, filesize={}", sf,
      r.getEntries(), r.getSequenceID(), TraditionalBinaryPrefix.long2String(r.length(), "B", 1));
    return sf;
  }
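
  /** Returns the aggregate on-disk (reader) length, in bytes, of the given store files. */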
  private long getTotalSize(Collection<HStoreFile> sfs) {
    return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum();
  }

  private boolean completeFlush(final List<HStoreFile> sfs, long snapshotId) throws IOException {
    // NOTE: we should keep the clearSnapshot call inside the write lock because clearSnapshot may
    // close {@link DefaultMemStore#snapshot}, which may be used by
    // {@link DefaultMemStore#getScanners}.
    storeEngine.addStoreFiles(sfs,
      // NOTE: here we must increase the refCount for storeFiles because we would open the
      // storeFiles and get the StoreFileScanners for them in HStore.notifyChangedReadersObservers.
      // If we don't increase the refCount here, HStore.closeAndArchiveCompactedFiles called by
      // CompactedHFilesDischarger may archive the storeFiles after a concurrent compaction.
      // Because HStore.requestCompaction is under the storeEngine lock, we also increase the
      // refCount under the storeEngine lock. See HBASE-27519 for more details.
      snapshotId > 0 ? () -> {
        this.memstore.clearSnapshot(snapshotId);
        HStoreFile.increaseStoreFilesRefeCount(sfs);
      } : () -> {
        HStoreFile.increaseStoreFilesRefeCount(sfs);
      });
    // notify to be called here - only in case of flushes
    try {
      notifyChangedReadersObservers(sfs);
    } finally {
      HStoreFile.decreaseStoreFilesRefeCount(sfs);
    }
    if (LOG.isTraceEnabled()) {
      long totalSize = getTotalSize(sfs);
      String traceMessage = "FLUSH time,count,size,store size,store files ["
        + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize + ","
        + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
      LOG.trace(traceMessage);
    }
    return needsCompaction();
  }

  /**
   * Notify all observers that the set of Readers has changed.
   */
  private void notifyChangedReadersObservers(List<HStoreFile> sfs) throws IOException {
    for (ChangedReadersObserver o : this.changedReaderObservers) {
      List<KeyValueScanner> memStoreScanners;
      this.storeEngine.readLock();
      try {
        memStoreScanners = this.memstore.getScanners(o.getReadPoint());
      } finally {
        this.storeEngine.readUnlock();
      }
      o.updateReaders(sfs, memStoreScanners);
    }
  }

  /**
   * Get all scanners with no filtering based on TTL (that happens further down the line).
   * @param cacheBlocks  cache the blocks or not
   * @param usePread     true to use pread, false if not
   * @param isCompaction true if the scanner is created for compaction
   * @param matcher      the scan query matcher
   * @param startRow     the start row
   * @param stopRow      the stop row
   * @param readPt       the read point of the current scan
   * @return all scanners for this store
   */
  public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet, boolean usePread,
    boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt,
    boolean onlyLatestVersion) throws IOException {
    return getScanners(cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow, false,
      readPt, onlyLatestVersion);
  }

  /**
   * Get all scanners with no filtering based on TTL (that happens further down the line).
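   * <p>
   * Hypothetical sketch of a whole-store scan at a given read point ({@code readPt} is assumed to
   * come from the caller's MVCC handling):
   *
   * <pre>
   * List&lt;KeyValueScanner&gt; scanners = store.getScanners(true, false, false, null,
   *   HConstants.EMPTY_START_ROW, true, HConstants.EMPTY_END_ROW, false, readPt, false);
   * </pre>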
   * @param cacheBlocks     cache the blocks or not
   * @param usePread        true to use pread, false if not
   * @param isCompaction    true if the scanner is created for compaction
   * @param matcher         the scan query matcher
   * @param startRow        the start row
   * @param includeStartRow true to include start row, false if not
   * @param stopRow         the stop row
   * @param includeStopRow  true to include stop row, false if not
   * @param readPt          the read point of the current scan
   * @return all scanners for this store
   */
  public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,
    boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow,
    byte[] stopRow, boolean includeStopRow, long readPt, boolean onlyLatestVersion)
    throws IOException {
    Collection<HStoreFile> storeFilesToScan;
    List<KeyValueScanner> memStoreScanners;
    this.storeEngine.readLock();
    try {
      storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
        includeStartRow, stopRow, includeStopRow, onlyLatestVersion);
      memStoreScanners = this.memstore.getScanners(readPt);
      // NOTE: here we must increase the refCount for storeFiles because we would open the
      // storeFiles and get the StoreFileScanners for them. If we don't increase the refCount
      // here, HStore.closeAndArchiveCompactedFiles called by CompactedHFilesDischarger may
      // archive the storeFiles after a concurrent compaction. Because HStore.requestCompaction
      // is under the storeEngine lock, we also increase the refCount under the storeEngine lock.
      // See HBASE-27484 for more details.
      HStoreFile.increaseStoreFilesRefeCount(storeFilesToScan);
    } finally {
      this.storeEngine.readUnlock();
    }
    try {
      // First the store file scanners

      // TODO this used to get the store files in descending order,
      // but now we get them in ascending order, which I think is
      // actually more correct, since memstore get put at the end.
      List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(
        storeFilesToScan, cacheBlocks, usePread, isCompaction, false, matcher, readPt);
      List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
      scanners.addAll(sfScanners);
      // Then the memstore scanners
      scanners.addAll(memStoreScanners);
      return scanners;
    } catch (Throwable t) {
      clearAndClose(memStoreScanners);
      throw t instanceof IOException ? (IOException) t : new IOException(t);
    } finally {
      HStoreFile.decreaseStoreFilesRefeCount(storeFilesToScan);
    }
  }
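
  /** Closes every scanner in the given list and then clears it; a null list is a no-op. */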
  private static void clearAndClose(List<KeyValueScanner> scanners) {
    if (scanners == null) {
      return;
    }
    for (KeyValueScanner s : scanners) {
      s.close();
    }
    scanners.clear();
  }

  /**
   * Create scanners on the given files and if needed on the memstore with no filtering based on
   * TTL (that happens further down the line).
   * @param files                  the list of files on which the scanners have to be created
   * @param cacheBlocks            cache the blocks or not
   * @param usePread               true to use pread, false if not
   * @param isCompaction           true if the scanner is created for compaction
   * @param matcher                the scan query matcher
   * @param startRow               the start row
   * @param stopRow                the stop row
   * @param readPt                 the read point of the current scan
   * @param includeMemstoreScanner true if memstore has to be included
   * @return scanners on the given files and on the memstore if specified
   */
  public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
    boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
    byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner,
    boolean onlyLatestVersion) throws IOException {
    return getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow,
      false, readPt, includeMemstoreScanner, onlyLatestVersion);
  }

  /**
   * Create scanners on the given files and if needed on the memstore with no filtering based on
   * TTL (that happens further down the line).
   * @param files                  the list of files on which the scanners have to be created
   * @param cacheBlocks            cache the blocks or not
   * @param usePread               true to use pread, false if not
   * @param isCompaction           true if the scanner is created for compaction
   * @param matcher                the scan query matcher
   * @param startRow               the start row
   * @param includeStartRow        true to include start row, false if not
   * @param stopRow                the stop row
   * @param includeStopRow         true to include stop row, false if not
   * @param readPt                 the read point of the current scan
   * @param includeMemstoreScanner true if memstore has to be included
   * @return scanners on the given files and on the memstore if specified
   */
  public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
    boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
    boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
    boolean includeMemstoreScanner, boolean onlyLatestVersion) throws IOException {
    List<KeyValueScanner> memStoreScanners = null;
    if (includeMemstoreScanner) {
      this.storeEngine.readLock();
      try {
        memStoreScanners = this.memstore.getScanners(readPt);
      } finally {
        this.storeEngine.readUnlock();
      }
    }
    try {
      List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(files,
        cacheBlocks, usePread, isCompaction, false, matcher, readPt);
      List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
      scanners.addAll(sfScanners);
      // Then the memstore scanners
      if (memStoreScanners != null) {
        scanners.addAll(memStoreScanners);
      }
      return scanners;
    } catch (Throwable t) {
      clearAndClose(memStoreScanners);
      throw t instanceof IOException ? (IOException) t : new IOException(t);
    }
  }

  /**
   * @param o Observer who wants to know about changes in set of Readers
   */
  public void addChangedReaderObserver(ChangedReadersObserver o) {
    this.changedReaderObservers.add(o);
  }

  /**
   * @param o Observer no longer interested in changes in set of Readers.
   */
  public void deleteChangedReaderObserver(ChangedReadersObserver o) {
    // We don't check if the observer is present; it may not be (legitimately)
    this.changedReaderObservers.remove(o);
  }

  //////////////////////////////////////////////////////////////////////////////
  // Compaction
  //////////////////////////////////////////////////////////////////////////////

  /**
   * Compact the StoreFiles. This method may take some time, so the calling thread must be able to
   * block for long periods.
   * <p>
   * During this time, the Store can work as usual, getting values from StoreFiles and writing new
   * StoreFiles from the MemStore. Existing StoreFiles are not destroyed until the new compacted
   * StoreFile is completely written-out to disk.
   * <p>
   * The compactLock prevents multiple simultaneous compactions. The structureLock prevents us
   * from interfering with other write operations.
   * <p>
   * We don't want to hold the structureLock for the whole time, as a compact() can be lengthy and
   * we want to allow cache-flushes during this period.
   * <p>
   * Compaction events should be idempotent, since there is no IO fencing for the region directory
   * in HDFS. A region server might still try to complete the compaction after it lost the region.
   * That is why the following events are carefully ordered for a compaction:
   * <ol>
   * <li>Compaction writes new files under region/.tmp directory (compaction output)</li>
   * <li>Compaction atomically moves the temporary file under region directory</li>
   * <li>Compaction appends a WAL edit containing the compaction input and output files. Forces
   * sync on WAL.</li>
   * <li>Compaction deletes the input files from the region directory.</li>
   * </ol>
   * Failure conditions are handled like this:
   * <ul>
   * <li>If RS fails before 2, compaction won't complete. Even if RS lives on and finishes the
   * compaction later, it will only write the new data file to the region directory. Since we
   * already have this data, this will be idempotent, but we will have a redundant copy of the
   * data.</li>
   * <li>If RS fails between 2 and 3, the region will have a redundant copy of the data. The RS
   * that failed won't be able to finish sync() for WAL because of lease recovery in WAL.</li>
   * <li>If RS fails after 3, the region server who opens the region will pick up the compaction
   * marker from the WAL and replay it by removing the compaction input files. Failed RS can also
   * attempt to delete those files, but the operation will be idempotent</li>
   * </ul>
   * See HBASE-2231 for details.
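   * <p>
   * Hypothetical invocation sketch (real callers go through the compaction chore; the names below
   * are assumptions about the surrounding code):
   *
   * <pre>
   * Optional&lt;CompactionContext&gt; ctx = store.requestCompaction();
   * if (ctx.isPresent()) {
   *   store.compact(ctx.get(), throughputController, user);
   * }
   * </pre>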
   * @param compaction compaction details obtained from requestCompaction()
   * @return The storefiles that we compacted into or null if we failed or opted out early.
   */
  public List<HStoreFile> compact(CompactionContext compaction,
    ThroughputController throughputController, User user) throws IOException {
    assert compaction != null;
    CompactionRequestImpl cr = compaction.getRequest();
    StoreFileWriterCreationTracker writerCreationTracker =
      storeFileWriterCreationTrackerFactory.get();
    if (writerCreationTracker != null) {
      cr.setWriterCreationTracker(writerCreationTracker);
      storeFileWriterCreationTrackers.add(writerCreationTracker);
    }
    try {
      // Do all sanity checking in here if we have a valid CompactionRequestImpl
      // because we need to clean up after it on the way out in a finally
      // block below
      long compactionStartTime = EnvironmentEdgeManager.currentTime();
      assert compaction.hasSelection();
      Collection<HStoreFile> filesToCompact = cr.getFiles();
      assert !filesToCompact.isEmpty();
      synchronized (filesCompacting) {
        // sanity check: we're compacting files that this store knows about
        // TODO: change this to LOG.error() after more debugging
        Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
      }

      // Ready to go. Have list of files to compact.
      LOG.info("Starting compaction of " + filesToCompact + " into tmpdir="
        + getRegionFileSystem().getTempDir() + ", totalSize="
        + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));

      return doCompaction(cr, filesToCompact, user, compactionStartTime,
        compaction.compact(throughputController, user));
    } finally {
      finishCompactionRequest(cr);
    }
  }

  protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
    Collection<HStoreFile> filesToCompact, User user, long compactionStartTime,
    List<Path> newFiles) throws IOException {
    // Do the steps necessary to complete the compaction.
    setStoragePolicyFromFileName(newFiles);
    List<HStoreFile> sfs = storeEngine.commitStoreFiles(newFiles, true, true);
    if (this.getCoprocessorHost() != null) {
      for (HStoreFile sf : sfs) {
        getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
      }
    }
    replaceStoreFiles(filesToCompact, sfs, true);

    long outputBytes = getTotalSize(sfs);

    // At this point the store will use new files for all new scanners.
    refreshStoreSizeAndTotalBytes(); // update store size.

    long now = EnvironmentEdgeManager.currentTime();
    if (
      region.getRegionServerServices() != null
        && region.getRegionServerServices().getMetrics() != null
    ) {
      region.getRegionServerServices().getMetrics().updateCompaction(
        region.getTableDescriptor().getTableName().getNameAsString(), cr.isMajor(),
        now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(),
        outputBytes);

    }

    logCompactionEndMessage(cr, sfs, now, compactionStartTime);
    return sfs;
  }

  // Set the correct storage policy from the file name of DTCP (date-tiered compaction policy)
  // output. Renaming the file will not change the storage policy.
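  // Example (hypothetical layout): an output written under a directory named
  // "<STORAGE_POLICY_PREFIX>HOT" gets the "HOT" policy re-applied below after the rename into
  // the family directory.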
  private void setStoragePolicyFromFileName(List<Path> newFiles) throws IOException {
    String prefix = HConstants.STORAGE_POLICY_PREFIX;
    for (Path newFile : newFiles) {
      if (newFile.getParent().getName().startsWith(prefix)) {
        CommonFSUtils.setStoragePolicy(getRegionFileSystem().getFileSystem(), newFile,
          newFile.getParent().getName().substring(prefix.length()));
      }
    }
  }

  /**
   * Writes the compaction WAL record.
   * @param filesCompacted Files compacted (input).
   * @param newFiles       Files from compaction.
   */
  private void writeCompactionWalRecord(Collection<HStoreFile> filesCompacted,
    Collection<HStoreFile> newFiles) throws IOException {
    if (region.getWAL() == null) {
      return;
    }
    List<Path> inputPaths =
      filesCompacted.stream().map(HStoreFile::getPath).collect(Collectors.toList());
    List<Path> outputPaths =
      newFiles.stream().map(HStoreFile::getPath).collect(Collectors.toList());
    RegionInfo info = this.region.getRegionInfo();
    CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
      getColumnFamilyDescriptor().getName(), inputPaths, outputPaths,
      getRegionFileSystem().getStoreDir(getColumnFamilyDescriptor().getNameAsString()));
    // Fix reaching into Region to get the maxWaitForSeqId.
    // Does this method belong in Region altogether given it is making so many references up
    // there? Could be Region#writeCompactionMarker(compactionDescriptor);
    WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(),
      this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC(),
      region.getRegionReplicationSink().orElse(null));
  }

  @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
      allowedOnPath = ".*/(HStore|TestHStore).java")
  void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result,
    boolean writeCompactionMarker) throws IOException {
    storeEngine.replaceStoreFiles(compactedFiles, result, () -> {
      if (writeCompactionMarker) {
        writeCompactionWalRecord(compactedFiles, result);
      }
    }, () -> {
      synchronized (filesCompacting) {
        filesCompacting.removeAll(compactedFiles);
      }
    });
    // These may be null when the RS is shutting down. The space quota Chores will fix the Region
    // sizes later so it's not super-critical if we miss these.
    RegionServerServices rsServices = region.getRegionServerServices();
    if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) {
      updateSpaceQuotaAfterFileReplacement(
        rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(),
        compactedFiles, result);
    }
  }

  /**
   * Updates the space quota usage for this region, removing the size for files compacted away and
   * adding in the size for new files.
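   * <p>
   * The adjustment is applied as a single signed delta: the summed reader lengths of
   * {@code newFiles} minus those of {@code oldFiles}; reference and link files (non-HFiles) are
   * skipped.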
   */
  void updateSpaceQuotaAfterFileReplacement(RegionSizeStore sizeStore, RegionInfo regionInfo,
    Collection<HStoreFile> oldFiles, Collection<HStoreFile> newFiles) {
    long delta = 0;
    if (oldFiles != null) {
      for (HStoreFile compactedFile : oldFiles) {
        if (compactedFile.isHFile()) {
          delta -= compactedFile.getReader().length();
        }
      }
    }
    if (newFiles != null) {
      for (HStoreFile newFile : newFiles) {
        if (newFile.isHFile()) {
          delta += newFile.getReader().length();
        }
      }
    }
    sizeStore.incrementRegionSize(regionInfo, delta);
  }

  /**
   * Log a very elaborate compaction completion message.
   * @param cr                  Request.
   * @param sfs                 Resulting files.
   * @param now                 Current time, used to compute how long the compaction took.
   * @param compactionStartTime Start time.
   */
  private void logCompactionEndMessage(CompactionRequestImpl cr, List<HStoreFile> sfs, long now,
    long compactionStartTime) {
    StringBuilder message = new StringBuilder("Completed" + (cr.isMajor() ? " major" : "")
      + " compaction of " + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "")
      + " file(s) in " + this + " of " + this.getRegionInfo().getShortNameToLog() + " into ");
    if (sfs.isEmpty()) {
      message.append("none, ");
    } else {
      for (HStoreFile sf : sfs) {
        message.append(sf.getPath().getName());
        message.append("(size=");
        message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1));
        message.append("), ");
      }
    }
    message.append("total size for store is ")
      .append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1))
      .append(". This selection was in queue for ")
      .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
      .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
      .append(" to execute.");
    LOG.info(message.toString());
    if (LOG.isTraceEnabled()) {
      int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
      long resultSize = getTotalSize(sfs);
      String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
        + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
        + cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]";
      LOG.trace(traceMessage);
    }
  }

  /**
   * Call to complete a compaction. It's for the case where we find in the WAL a compaction that
   * was not finished. We could find one when recovering a WAL after a regionserver crash. See
   * HBASE-2231.
   */
  public void replayCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
    boolean removeFiles) throws IOException {
    LOG.debug("Completing compaction from the WAL marker");
    List<String> compactionInputs = compaction.getCompactionInputList();
    List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList());

    // The compaction marker is written after the compaction is completed
    // and the files moved into the region/family folder.
    //
    // If we crash after the entry is written, we may not have removed the
    // input files, but the output file is present.
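    // For example (illustrative): a marker listing inputs {a, b} and output {c} can be replayed
    // with all three files still present; the replay below replaces a and b with c, so only c
    // stays live.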
    // (The unremoved input files will be removed by this function.)
    //
    // If we scan the directory and the file is not present, it can mean that:
    // - The file was manually removed by the user
    // - The file was removed as a consequence of a subsequent compaction
    // So we can't do anything with the "compaction output list" because those
    // files have already been loaded when opening the region (by virtue of
    // being in the store's folder) or they may be missing due to a compaction.

    String familyName = this.getColumnFamilyName();
    Set<String> inputFiles = new HashSet<>();
    for (String compactionInput : compactionInputs) {
      Path inputPath = getRegionFileSystem().getStoreFilePath(familyName, compactionInput);
      inputFiles.add(inputPath.getName());
    }

    // some of the input files might already be deleted
    List<HStoreFile> inputStoreFiles = new ArrayList<>(compactionInputs.size());
    for (HStoreFile sf : this.getStorefiles()) {
      if (inputFiles.contains(sf.getPath().getName())) {
        inputStoreFiles.add(sf);
      } else if (
        !isPrimaryReplicaStore() && sf.getFileInfo().isLink()
          && inputFiles.contains(HFileLink.getReferencedHFileName(sf.getPath().getName()))
      ) {
        inputStoreFiles.add(sf);
      }
    }

    // check whether we need to pick up the new files
    List<HStoreFile> outputStoreFiles = new ArrayList<>(compactionOutputs.size());

    if (pickCompactionFiles) {
      for (HStoreFile sf : this.getStorefiles()) {
        compactionOutputs.remove(sf.getPath().getName());
      }
      for (String compactionOutput : compactionOutputs) {
        StoreFileTracker sft =
          StoreFileTrackerFactory.create(conf, isPrimaryReplicaStore(), storeContext);
        StoreFileInfo storeFileInfo =
          getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), compactionOutput, sft);
        HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo);
        outputStoreFiles.add(storeFile);
      }
    }

    if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) {
      LOG.info("Replaying compaction marker, replacing input files: " + inputStoreFiles
        + " with output files: " + outputStoreFiles);
      this.replaceStoreFiles(inputStoreFiles, outputStoreFiles, false);
      this.refreshStoreSizeAndTotalBytes();
    }
  }

  @Override
  public boolean hasReferences() {
    // Grab the read lock here, because we need to ensure that only after the atomic
    // replaceStoreFiles(..) call has finished can we get the complete store file list.
    this.storeEngine.readLock();
    try {
      // Merge the current store files with compacted files here due to HBASE-20940.
      Collection<HStoreFile> allStoreFiles = new ArrayList<>(getStorefiles());
      allStoreFiles.addAll(getCompactedFiles());
      return StoreUtils.hasReferences(allStoreFiles);
    } finally {
      this.storeEngine.readUnlock();
    }
  }

  /**
   * Getter for the CompactionProgress object.
   * @return CompactionProgress object; can be null
   */
  public CompactionProgress getCompactionProgress() {
    return this.storeEngine.getCompactor().getProgress();
  }

  @Override
  public boolean shouldPerformMajorCompaction() throws IOException {
    for (HStoreFile sf : this.storeEngine.getStoreFileManager().getStoreFiles()) {
      // TODO: what are these reader checks all over the place?
      if (sf.getReader() == null) {
        LOG.debug("StoreFile {} has null Reader", sf);
        return false;
      }
    }
    return storeEngine.getCompactionPolicy()
      .shouldPerformMajorCompaction(this.storeEngine.getStoreFileManager().getStoreFiles());
  }

  public Optional<CompactionContext> requestCompaction() throws IOException {
    return requestCompaction(NO_PRIORITY, CompactionLifeCycleTracker.DUMMY, null);
  }

  public Optional<CompactionContext> requestCompaction(int priority,
    CompactionLifeCycleTracker tracker, User user) throws IOException {
    // don't even select for compaction if writes are disabled
    if (!this.areWritesEnabled()) {
      return Optional.empty();
    }
    // Before we do compaction, try to get rid of unneeded files to simplify things.
    removeUnneededFiles();

    final CompactionContext compaction = storeEngine.createCompaction();
    CompactionRequestImpl request = null;
    this.storeEngine.readLock();
    try {
      synchronized (filesCompacting) {
        // First, see if coprocessor would want to override selection.
        if (this.getCoprocessorHost() != null) {
          final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
          boolean override =
            getCoprocessorHost().preCompactSelection(this, candidatesForCoproc, tracker, user);
          if (override) {
            // Coprocessor is overriding normal file selection.
            compaction.forceSelect(new CompactionRequestImpl(candidatesForCoproc));
          }
        }

        // Normal case - coprocessor is not overriding file selection.
        if (!compaction.hasSelection()) {
          boolean isUserCompaction = priority == Store.PRIORITY_USER;
          boolean mayUseOffPeak =
            offPeakHours.isOffPeakHour() && offPeakCompactionTracker.compareAndSet(false, true);
          try {
            compaction.select(this.filesCompacting, isUserCompaction, mayUseOffPeak,
              forceMajor && filesCompacting.isEmpty());
          } catch (IOException e) {
            if (mayUseOffPeak) {
              offPeakCompactionTracker.set(false);
            }
            throw e;
          }
          assert compaction.hasSelection();
          if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
            // Compaction policy doesn't want to take advantage of off-peak.
            offPeakCompactionTracker.set(false);
          }
        }
        if (this.getCoprocessorHost() != null) {
          this.getCoprocessorHost().postCompactSelection(this,
            ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker,
            compaction.getRequest(), user);
        }
        // Finally, we have the resulting files list. Check if we have any files at all.
        request = compaction.getRequest();
        Collection<HStoreFile> selectedFiles = request.getFiles();
        if (selectedFiles.isEmpty()) {
          return Optional.empty();
        }

        addToCompactingFiles(selectedFiles);

        // If we're enqueuing a major, clear the force flag.
        this.forceMajor = this.forceMajor && !request.isMajor();

        // Set common request properties.
        // Set priority, either override value supplied by caller or from store.
        final int compactionPriority =
          (priority != Store.NO_PRIORITY) ? priority : getCompactPriority();
        request.setPriority(compactionPriority);

        if (request.isAfterSplit()) {
          // If the store belongs to recently split daughter regions, we should consider
          // them with higher priority in the compaction queue.
          // Override priority if it is lower (higher int value) than
          // SPLIT_REGION_COMPACTION_PRIORITY
          final int splitHousekeepingPriority =
            Math.min(compactionPriority, SPLIT_REGION_COMPACTION_PRIORITY);
          request.setPriority(splitHousekeepingPriority);
          LOG.info(
            "Keeping/Overriding Compaction request priority to {} for CF {} since it"
              + " belongs to recently split daughter region {}",
            splitHousekeepingPriority, this.getColumnFamilyName(),
            getRegionInfo().getRegionNameAsString());
        }
        request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
        request.setTracker(tracker);
      }
    } finally {
      this.storeEngine.readUnlock();
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug(this + " is initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
        + (request.isAllFiles() ? " (all files)" : ""));
    }
    this.region.reportCompactionRequestStart(request.isMajor());
    return Optional.of(compaction);
  }

  /** Adds the files to compacting files. filesCompacting must be locked. */
  private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) {
    if (CollectionUtils.isEmpty(filesToAdd)) {
      return;
    }
    // Check that we do not try to compact the same StoreFile twice.
    if (!Collections.disjoint(filesCompacting, filesToAdd)) {
      Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
    }
    filesCompacting.addAll(filesToAdd);
    Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator());
  }

  private void removeUnneededFiles() throws IOException {
    if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) {
      return;
    }
    if (getColumnFamilyDescriptor().getMinVersions() > 0) {
      LOG.debug("Skipping expired store file removal due to min version of {} being {}", this,
        getColumnFamilyDescriptor().getMinVersions());
      return;
    }
    this.storeEngine.readLock();
    Collection<HStoreFile> delSfs = null;
    try {
      synchronized (filesCompacting) {
        long cfTtl = getStoreFileTtl();
        if (cfTtl != Long.MAX_VALUE) {
          delSfs = storeEngine.getStoreFileManager()
            .getUnneededFiles(EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting);
          addToCompactingFiles(delSfs);
        }
      }
    } finally {
      this.storeEngine.readUnlock();
    }

    if (CollectionUtils.isEmpty(delSfs)) {
      return;
    }

    Collection<HStoreFile> newFiles = Collections.emptyList(); // No new files.
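    // Note (added for clarity): expired-file removal is modeled as a compaction with an empty
    // output set; the same replace/WAL-marker path as a real compaction runs below, just with no
    // new files.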
    replaceStoreFiles(delSfs, newFiles, true);
    refreshStoreSizeAndTotalBytes();
    LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in " + this
      + "; total size is " + TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1));
  }

  public void cancelRequestedCompaction(CompactionContext compaction) {
    finishCompactionRequest(compaction.getRequest());
  }

  private void finishCompactionRequest(CompactionRequestImpl cr) {
    this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
    if (cr.isOffPeak()) {
      offPeakCompactionTracker.set(false);
      cr.setOffPeak(false);
    }
    synchronized (filesCompacting) {
      filesCompacting.removeAll(cr.getFiles());
    }
    // The tracker could be null, for example, when we do not need to track the creation of store
    // file writers due to a different implementation of SFT, or when the compaction is canceled.
    if (cr.getWriterCreationTracker() != null) {
      storeFileWriterCreationTrackers.remove(cr.getWriterCreationTracker());
    }
  }

  /**
   * Update the store size and total uncompressed byte counts from the current store files.
   */
  protected void refreshStoreSizeAndTotalBytes() throws IOException {
    this.storeSize.set(0L);
    this.totalUncompressedBytes.set(0L);
    for (HStoreFile hsf : this.storeEngine.getStoreFileManager().getStoreFiles()) {
      StoreFileReader r = hsf.getReader();
      if (r == null) {
        LOG.debug("StoreFile {} has a null Reader", hsf);
        continue;
      }
      this.storeSize.addAndGet(r.length());
      this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
    }
  }

  /*
   * @param wantedVersions How many versions were asked for.
   * @return wantedVersions or this family's {@link HConstants#VERSIONS}.
   */
  int versionsToReturn(final int wantedVersions) {
    if (wantedVersions <= 0) {
      throw new IllegalArgumentException("Number of versions must be > 0");
    }
    // Make sure we do not return more than the maximum versions for this store.
    int maxVersions = getColumnFamilyDescriptor().getMaxVersions();
    return wantedVersions > maxVersions ? maxVersions : wantedVersions;
  }

  @Override
  public boolean canSplit() {
    // Not split-able if we find a reference store file present in the store.
    boolean result = !hasReferences();
    if (!result) {
      LOG.trace("Not splittable; has references: {}", this);
    }
    return result;
  }

  /**
   * Determines if this store should be split.
   */
  public Optional<byte[]> getSplitPoint() {
    this.storeEngine.readLock();
    try {
      // Should already be enforced by the split policy!
      assert !this.getRegionInfo().isMetaRegion();
      // Not split-able if we find a reference store file present in the store.
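      // (Added note: reference files are the half-file pointers that a split leaves in the
      // daughter regions; until a compaction rewrites them into real HFiles, the region cannot
      // be split again.)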
      if (hasReferences()) {
        LOG.trace("Not splittable; has references: {}", this);
        return Optional.empty();
      }
      return this.storeEngine.getStoreFileManager().getSplitPoint();
    } catch (IOException e) {
      LOG.warn("Failed getting store size for {}", this, e);
    } finally {
      this.storeEngine.readUnlock();
    }
    return Optional.empty();
  }

  @Override
  public long getLastCompactSize() {
    return this.lastCompactSize;
  }

  @Override
  public long getSize() {
    return storeSize.get();
  }

  public void triggerMajorCompaction() {
    this.forceMajor = true;
  }

  //////////////////////////////////////////////////////////////////////////////
  // File administration
  //////////////////////////////////////////////////////////////////////////////

  /**
   * Return a scanner for both the memstore and the HStore files. Assumes we are not in a
   * compaction.
   * @param scan       Scan to apply when scanning the stores
   * @param targetCols columns to scan
   * @param readPt     the read point to use for the scan
   * @return a scanner over the current key values
   * @throws IOException on failure
   */
  public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols, long readPt)
    throws IOException {
    storeEngine.readLock();
    try {
      ScanInfo scanInfo;
      if (this.getCoprocessorHost() != null) {
        scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this, scan);
      } else {
        scanInfo = getScanInfo();
      }
      return createScanner(scan, scanInfo, targetCols, readPt);
    } finally {
      storeEngine.readUnlock();
    }
  }

  // HMobStore will override this method to return its own implementation.
  protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo,
    NavigableSet<byte[]> targetCols, long readPt) throws IOException {
    return scan.isReversed()
      ? new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt)
      : new StoreScanner(this, scanInfo, scan, targetCols, readPt);
  }

  /**
   * Recreates the scanners on the current list of active store file scanners.
   * @param currentFileScanners    the current set of active store file scanners
   * @param cacheBlocks            cache the blocks or not
   * @param usePread               use pread or not
   * @param isCompaction           is the scanner for compaction
   * @param matcher                the scan query matcher
   * @param startRow               the scan's start row
   * @param includeStartRow        should the scan include the start row
   * @param stopRow                the scan's stop row
   * @param includeStopRow         should the scan include the stop row
   * @param readPt                 the read point of the current scan
   * @param includeMemstoreScanner whether the current scanner should include a memstore scanner
   * @return list of scanners recreated on the current Scanners, or {@code null} if there is
   *         nothing to reopen
   */
  public List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners,
    boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
    byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
    boolean includeMemstoreScanner) throws IOException {
    this.storeEngine.readLock();
    try {
      Map<String, HStoreFile> name2File =
        new HashMap<>(getStorefilesCount() + getCompactedFilesCount());
      for (HStoreFile file : getStorefiles()) {
        name2File.put(file.getFileInfo().getActiveFileName(), file);
      }
      Collection<HStoreFile> compactedFiles = getCompactedFiles();
      for (HStoreFile file : IterableUtils.emptyIfNull(compactedFiles)) {
        name2File.put(file.getFileInfo().getActiveFileName(), file);
      }
      List<HStoreFile> filesToReopen = new ArrayList<>();
      for (KeyValueScanner kvs : currentFileScanners) {
        assert kvs.isFileScanner();
        if (kvs.peek() == null) {
          continue;
        }
        filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
      }
      if (filesToReopen.isEmpty()) {
        return null;
      }
      return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow,
        includeStartRow, stopRow, includeStopRow, readPt, false, false);
    } finally {
      this.storeEngine.readUnlock();
    }
  }

  @Override
  public String toString() {
    return this.getRegionInfo().getShortNameToLog() + "/" + this.getColumnFamilyName();
  }

  @Override
  public int getStorefilesCount() {
    return this.storeEngine.getStoreFileManager().getStorefileCount();
  }

  @Override
  public int getCompactedFilesCount() {
    return this.storeEngine.getStoreFileManager().getCompactedFilesCount();
  }

  private LongStream getStoreFileAgeStream() {
    return this.storeEngine.getStoreFileManager().getStoreFiles().stream().filter(sf -> {
      if (sf.getReader() == null) {
        LOG.debug("StoreFile {} has a null Reader", sf);
        return false;
      } else {
        return true;
      }
    }).filter(HStoreFile::isHFile).mapToLong(sf -> sf.getFileInfo().getCreatedTimestamp())
      .map(t -> EnvironmentEdgeManager.currentTime() - t);
  }

  @Override
  public OptionalLong getMaxStoreFileAge() {
    return getStoreFileAgeStream().max();
  }

  @Override
  public OptionalLong getMinStoreFileAge() {
    return getStoreFileAgeStream().min();
  }

  @Override
  public OptionalDouble getAvgStoreFileAge() {
    return getStoreFileAgeStream().average();
  }

  @Override
  public long getNumReferenceFiles() {
    return this.storeEngine.getStoreFileManager().getStoreFiles().stream()
      .filter(HStoreFile::isReference).count();
  }

  @Override
  public long getNumHFiles() {
    return this.storeEngine.getStoreFileManager().getStoreFiles().stream()
      .filter(HStoreFile::isHFile).count();
  }

  @Override
  public long getStoreSizeUncompressed() {
    return this.totalUncompressedBytes.get();
  }

  @Override
  public long getStorefilesSize() {
    // Include all StoreFiles
    return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStoreFiles(),
      sf -> true);
  }

  @Override
  public long getHFilesSize() {
    // Include only StoreFiles which are HFiles
    return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStoreFiles(),
      HStoreFile::isHFile);
  }

  private long getStorefilesFieldSize(ToLongFunction<StoreFileReader> f) {
    return this.storeEngine.getStoreFileManager().getStoreFiles().stream()
      .mapToLong(file -> StoreUtils.getStorefileFieldSize(file, f)).sum();
  }

  @Override
  public long getStorefilesRootLevelIndexSize() {
    return getStorefilesFieldSize(StoreFileReader::indexSize);
  }

  @Override
  public long getTotalStaticIndexSize() {
    return getStorefilesFieldSize(StoreFileReader::getUncompressedDataIndexSize);
  }

  @Override
  public long getTotalStaticBloomSize() {
    return getStorefilesFieldSize(StoreFileReader::getTotalBloomSize);
  }

  @Override
  public MemStoreSize getMemStoreSize() {
    return this.memstore.size();
  }

  @Override
  public int getCompactPriority() {
    int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();
    if (priority == PRIORITY_USER) {
      LOG.warn("Compaction priority is USER despite there being no user compaction");
    }
    return priority;
  }

  public boolean throttleCompaction(long compactionSize) {
    return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize);
  }

  public HRegion getHRegion() {
    return this.region;
  }

  public RegionCoprocessorHost getCoprocessorHost() {
    return this.region.getCoprocessorHost();
  }

  @Override
  public RegionInfo getRegionInfo() {
    return getRegionFileSystem().getRegionInfo();
  }

  @Override
  public boolean areWritesEnabled() {
    return this.region.areWritesEnabled();
  }

  @Override
  public long getSmallestReadPoint() {
    return this.region.getSmallestReadPoint();
  }

  /**
   * Adds or replaces the specified KeyValues.
   * <p>
   * For each KeyValue specified, if a cell with the same row, family, and qualifier exists in
   * MemStore, it will be replaced. Otherwise, it will just be inserted into MemStore.
   * <p>
   * This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic
   * across all of them.
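   * <p>
   * A minimal usage sketch (illustrative only; assumes {@code store} and {@code cells} already
   * exist, and uses {@code NonThreadSafeMemStoreSizing} as one available {@link MemStoreSizing}
   * implementation):
   *
   * <pre>
   * MemStoreSizing sizing = new NonThreadSafeMemStoreSizing();
   * store.upsert(cells, store.getSmallestReadPoint(), sizing);
   * // sizing now reflects the memstore size delta caused by the upsert
   * </pre>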
   * @param readpoint readpoint below which we can safely remove duplicate KVs
   */
  public void upsert(Iterable<ExtendedCell> cells, long readpoint, MemStoreSizing memstoreSizing) {
    this.storeEngine.readLock();
    try {
      this.memstore.upsert(cells, readpoint, memstoreSizing);
    } finally {
      this.storeEngine.readUnlock();
    }
  }

  public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTracker tracker) {
    return new StoreFlusherImpl(cacheFlushId, tracker);
  }

  private final class StoreFlusherImpl implements StoreFlushContext {

    private final FlushLifeCycleTracker tracker;
    private final StoreFileWriterCreationTracker writerCreationTracker;
    private final long cacheFlushSeqNum;
    private MemStoreSnapshot snapshot;
    private List<Path> tempFiles;
    private List<Path> committedFiles;
    private long cacheFlushCount;
    private long cacheFlushSize;
    private long outputFileSize;

    private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) {
      this.cacheFlushSeqNum = cacheFlushSeqNum;
      this.tracker = tracker;
      this.writerCreationTracker = storeFileWriterCreationTrackerFactory.get();
    }

    /**
     * This is not thread safe. The caller should have a lock on the region or the store. If
     * necessary, the lock can be added with the patch provided in HBASE-10087.
     */
    @Override
    public MemStoreSize prepare() {
      // passing the current sequence number of the wal - to allow bookkeeping in the memstore
      this.snapshot = memstore.snapshot();
      this.cacheFlushCount = snapshot.getCellsCount();
      this.cacheFlushSize = snapshot.getDataSize();
      committedFiles = new ArrayList<>(1);
      return snapshot.getMemStoreSize();
    }

    @Override
    public void flushCache(MonitoredTask status) throws IOException {
      RegionServerServices rsService = region.getRegionServerServices();
      ThroughputController throughputController =
        rsService == null ? null : rsService.getFlushThroughputController();
      // It could be null if we do not need to track the creation of store file writers due to a
      // different SFT implementation.
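      // (Added note: registering the tracker lets getStoreFilesBeingWritten() expose in-flight
      // flush outputs, e.g. so that BrokenStoreFileCleaner does not delete files that are still
      // being written.)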
      if (writerCreationTracker != null) {
        HStore.this.storeFileWriterCreationTrackers.add(writerCreationTracker);
      }
      tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController,
        tracker, writerCreationTracker);
    }

    @Override
    public boolean commit(MonitoredTask status) throws IOException {
      try {
        if (CollectionUtils.isEmpty(this.tempFiles)) {
          return false;
        }
        status.setStatus("Flushing " + this + ": reopening flushed file");
        List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, false, false);
        for (HStoreFile sf : storeFiles) {
          StoreFileReader r = sf.getReader();
          if (LOG.isInfoEnabled()) {
            LOG.info("Added {}, entries={}, sequenceid={}, filesize={}", sf, r.getEntries(),
              cacheFlushSeqNum, TraditionalBinaryPrefix.long2String(r.length(), "", 1));
          }
          outputFileSize += r.length();
          storeSize.addAndGet(r.length());
          totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
          committedFiles.add(sf.getPath());
        }

        flushedCellsCount.addAndGet(cacheFlushCount);
        flushedCellsSize.addAndGet(cacheFlushSize);
        flushedOutputFileSize.addAndGet(outputFileSize);
        // call coprocessor after we have done all the accounting above
        for (HStoreFile sf : storeFiles) {
          if (getCoprocessorHost() != null) {
            getCoprocessorHost().postFlush(HStore.this, sf, tracker);
          }
        }
        // Add new file to store files. Clear snapshot too while we have the Store write lock.
        return completeFlush(storeFiles, snapshot.getId());
      } finally {
        if (writerCreationTracker != null) {
          HStore.this.storeFileWriterCreationTrackers.remove(writerCreationTracker);
        }
      }
    }

    @Override
    public long getOutputFileSize() {
      return outputFileSize;
    }

    @Override
    public List<Path> getCommittedFiles() {
      return committedFiles;
    }

    /**
     * Similar to commit, but called in secondary region replicas for replaying the flush cache
     * from the primary region. Adds the new files to the store, and drops the snapshot depending
     * on the dropMemstoreSnapshot argument.
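     * <p>
     * Illustrative walk-through (added for clarity): a flush marker naming files {@code f1} and
     * {@code f2} causes both to be opened through the configured {@link StoreFileTracker} and
     * added to this store; if {@code dropMemstoreSnapshot} is true, the prepared snapshot is then
     * dropped so reads move off the memstore snapshot and onto the new files.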
     * @param fileNames            names of the flushed files
     * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot
     */
    @Override
    public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
      throws IOException {
      List<HStoreFile> storeFiles = new ArrayList<>(fileNames.size());
      for (String file : fileNames) {
        // open the file as a store file (hfile link, etc)
        StoreFileTracker sft =
          StoreFileTrackerFactory.create(conf, isPrimaryReplicaStore(), storeContext);
        StoreFileInfo storeFileInfo =
          getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), file, sft);
        HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo);
        storeFiles.add(storeFile);
        HStore.this.storeSize.addAndGet(storeFile.getReader().length());
        HStore.this.totalUncompressedBytes
          .addAndGet(storeFile.getReader().getTotalUncompressedBytes());
        if (LOG.isInfoEnabled()) {
          LOG.info(this + " added " + storeFile + ", entries=" + storeFile.getReader().getEntries()
            + ", sequenceid=" + storeFile.getReader().getSequenceID() + ", filesize="
            + TraditionalBinaryPrefix.long2String(storeFile.getReader().length(), "", 1));
        }
      }

      long snapshotId = -1; // -1 means do not drop
      if (dropMemstoreSnapshot && snapshot != null) {
        snapshotId = snapshot.getId();
      }
      HStore.this.completeFlush(storeFiles, snapshotId);
    }

    /**
     * Abort the snapshot preparation. Drops the snapshot if any.
     */
    @Override
    public void abort() throws IOException {
      if (snapshot != null) {
        HStore.this.completeFlush(Collections.emptyList(), snapshot.getId());
      }
    }
  }

  @Override
  public boolean needsCompaction() {
    List<HStoreFile> filesCompactingClone = null;
    synchronized (filesCompacting) {
      filesCompactingClone = Lists.newArrayList(filesCompacting);
    }
    return this.storeEngine.needsCompaction(filesCompactingClone);
  }

  /**
   * Used for tests.
   * @return cache configuration for this Store.
   */
  public CacheConfig getCacheConfig() {
    return storeContext.getCacheConf();
  }

  public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HStore.class, false);

  public static final long DEEP_OVERHEAD = ClassSize.align(
    FIXED_OVERHEAD + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK + ClassSize.CONCURRENT_SKIPLISTMAP
      + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT + ScanInfo.FIXED_OVERHEAD);

  @Override
  public long heapSize() {
    MemStoreSize memstoreSize = this.memstore.size();
    return DEEP_OVERHEAD + memstoreSize.getHeapSize() + storeContext.heapSize();
  }

  @Override
  public CellComparator getComparator() {
    return storeContext.getComparator();
  }

  public ScanInfo getScanInfo() {
    return scanInfo;
  }

  /**
   * Set scan info, used by tests.
   * @param scanInfo new scan info to use for test
   */
  void setScanInfo(ScanInfo scanInfo) {
    this.scanInfo = scanInfo;
  }

  @Override
  public boolean hasTooManyStoreFiles() {
    return getStorefilesCount() > this.blockingFileCount;
  }

  @Override
  public long getFlushedCellsCount() {
    return flushedCellsCount.get();
  }

  @Override
  public long getFlushedCellsSize() {
    return flushedCellsSize.get();
  }

  @Override
  public long getFlushedOutputFileSize() {
    return flushedOutputFileSize.get();
  }

  @Override
  public long getCompactedCellsCount() {
    return compactedCellsCount.get();
  }

  @Override
  public long getCompactedCellsSize() {
    return compactedCellsSize.get();
  }

  @Override
  public long getMajorCompactedCellsCount() {
    return majorCompactedCellsCount.get();
  }

  @Override
  public long getMajorCompactedCellsSize() {
    return majorCompactedCellsSize.get();
  }

  public void updateCompactedMetrics(boolean isMajor, CompactionProgress progress) {
    if (isMajor) {
      majorCompactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
      majorCompactedCellsSize.addAndGet(progress.totalCompactedSize);
    } else {
      compactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
      compactedCellsSize.addAndGet(progress.totalCompactedSize);
    }
  }

  /**
   * Returns the StoreEngine that is backing this concrete implementation of Store.
   * @return the {@link StoreEngine} object used internally inside this HStore object
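   *         (for instance, and purely as an illustration added here:
   *         {@code store.getStoreEngine().getCompactionPolicy()} reaches the compaction policy
   *         that {@link #throttleCompaction(long)} consults internally)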
   */
  public StoreEngine<?, ?, ?, ?> getStoreEngine() {
    return this.storeEngine;
  }

  protected OffPeakHours getOffPeakHours() {
    return this.offPeakHours;
  }

  @Override
  public void onConfigurationChange(Configuration conf) {
    Configuration storeConf = StoreUtils.createStoreConfiguration(conf, region.getTableDescriptor(),
      getColumnFamilyDescriptor());
    this.conf = storeConf;
    this.storeEngine.compactionPolicy.setConf(storeConf);
    this.offPeakHours = OffPeakHours.getInstance(storeConf);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void registerChildren(ConfigurationManager manager) {
    CacheConfig cacheConfig = this.storeContext.getCacheConf();
    if (cacheConfig != null) {
      manager.registerObserver(cacheConfig);
    }
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void deregisterChildren(ConfigurationManager manager) {
    // No children to deregister
  }

  @Override
  public double getCompactionPressure() {
    return storeEngine.getStoreFileManager().getCompactionPressure();
  }

  @Override
  public boolean isPrimaryReplicaStore() {
    return getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID;
  }

  /**
   * Sets the store up for a region level snapshot operation.
   * @see #postSnapshotOperation()
   */
  public void preSnapshotOperation() {
    archiveLock.lock();
  }

  /**
   * Perform tasks needed after the completion of snapshot operation.
   * @see #preSnapshotOperation()
   */
  public void postSnapshotOperation() {
    archiveLock.unlock();
  }

  /**
   * Closes and archives the compacted files under this store.
   */
  public synchronized void closeAndArchiveCompactedFiles() throws IOException {
    // ensure other threads do not attempt to archive the same files on close()
    archiveLock.lock();
    try {
      storeEngine.readLock();
      Collection<HStoreFile> copyCompactedfiles = null;
      try {
        Collection<HStoreFile> compactedfiles =
          this.getStoreEngine().getStoreFileManager().getCompactedfiles();
        if (CollectionUtils.isNotEmpty(compactedfiles)) {
          // Do a copy under the read lock
          copyCompactedfiles = new ArrayList<>(compactedfiles);
        } else {
          LOG.trace("No compacted files to archive");
        }
      } finally {
        storeEngine.readUnlock();
      }
      if (CollectionUtils.isNotEmpty(copyCompactedfiles)) {
        removeCompactedfiles(copyCompactedfiles, true);
      }
    } finally {
      archiveLock.unlock();
    }
  }

  /**
   * Archives and removes the compacted files.
   * @param compactedfiles The compacted files in this store that are not active in reads
   * @param evictOnClose   true if blocks should be evicted from the cache when an HFile reader is
   *                       closed, false if not
   */
  private void removeCompactedfiles(Collection<HStoreFile> compactedfiles, boolean evictOnClose)
    throws IOException {
    final List<HStoreFile> filesToRemove = new ArrayList<>(compactedfiles.size());
    final List<Long> storeFileSizes = new ArrayList<>(compactedfiles.size());
    for (final HStoreFile file : compactedfiles) {
      synchronized (file) {
        try {
          StoreFileReader r = file.getReader();
          if (r == null) {
            LOG.debug("The file {} was closed but still not archived", file);
            // HACK: Temporarily re-open the reader so we can get the size of the file.
            // Ideally, we should know the size of an HStoreFile without having to ask the
            // StoreFileReader for that.
            long length = getStoreFileSize(file);
            filesToRemove.add(file);
            storeFileSizes.add(length);
            continue;
          }

          if (file.isCompactedAway() && !file.isReferencedInReads()) {
            // Even if deleting fails we need not bother as any new scanners won't be
            // able to use the compacted file as the status is already compactedAway
            LOG.trace("Closing and archiving the file {}", file);
            // Copy the file size before closing the reader
            final long length = r.length();
            r.close(evictOnClose);
            // Just close and return
            filesToRemove.add(file);
            // Only add the length if we successfully added the file to `filesToRemove`
            storeFileSizes.add(length);
          } else {
            LOG.info("Can't archive compacted file " + file.getPath()
              + " because of either isCompactedAway=" + file.isCompactedAway()
              + " or file has reference, isReferencedInReads=" + file.isReferencedInReads()
              + ", refCount=" + r.getRefCount() + ", skipping for now.");
          }
        } catch (Exception e) {
          LOG.error("Exception while trying to close the compacted store file {}", file.getPath(),
            e);
        }
      }
    }
    if (this.isPrimaryReplicaStore()) {
      // Only the primary region is allowed to move the file to archive.
      // The secondary region does not move the files to archive. Any active reads from
      // the secondary region will still work because the file as such has active readers on it.
      if (!filesToRemove.isEmpty()) {
        LOG.debug("Moving the files {} to archive", filesToRemove);
        // Only if this is successful do the files have to be removed
        try {
          StoreFileTracker storeFileTracker =
            StoreFileTrackerFactory.create(conf, isPrimaryReplicaStore(), storeContext);
          storeFileTracker.removeStoreFiles(filesToRemove);
        } catch (FailedArchiveException fae) {
          // Even if archiving some files failed, we still need to clear out any of the
          // files which were successfully archived. Otherwise we will receive a
          // FileNotFoundException when we attempt to re-archive them in the next go around.
          Collection<Path> failedFiles = fae.getFailedFiles();
          Iterator<HStoreFile> iter = filesToRemove.iterator();
          Iterator<Long> sizeIter = storeFileSizes.iterator();
          while (iter.hasNext()) {
            sizeIter.next();
            if (failedFiles.contains(iter.next().getPath())) {
              iter.remove();
              sizeIter.remove();
            }
          }
          if (!filesToRemove.isEmpty()) {
            clearCompactedfiles(filesToRemove);
          }
          throw fae;
        }
      }
    }
    if (!filesToRemove.isEmpty()) {
      // Clear the compactedfiles from the store file manager
      clearCompactedfiles(filesToRemove);
      // Try to send a report of this archival to the Master for updating quota usage faster
      reportArchivedFilesForQuota(filesToRemove, storeFileSizes);
    }
  }

  /**
   * Computes the length of a store file without succumbing to any errors along the way. If an
   * error is encountered, the implementation returns {@code 0} instead of the actual size.
   * @param file The file to compute the size of.
   * @return The size in bytes of the provided {@code file}.
   */
  long getStoreFileSize(HStoreFile file) {
    long length = 0;
    try {
      file.initReader();
      length = file.getReader().length();
    } catch (IOException e) {
      LOG.trace("Failed to open reader when trying to compute store file size for {}, ignoring",
        file, e);
    } finally {
      try {
        file.closeStoreFile(
          file.getCacheConf() != null ? file.getCacheConf().shouldEvictOnClose() : true);
      } catch (IOException e) {
        LOG.trace("Failed to close reader after computing store file size for {}, ignoring", file,
          e);
      }
    }
    return length;
  }

  public Long preFlushSeqIDEstimation() {
    return memstore.preFlushSeqIDEstimation();
  }

  @Override
  public boolean isSloppyMemStore() {
    return this.memstore.isSloppy();
  }

  private void clearCompactedfiles(List<HStoreFile> filesToRemove) throws IOException {
    LOG.trace("Clearing the compacted files {} from this store", filesToRemove);
    storeEngine.removeCompactedFiles(filesToRemove);
  }

  void reportArchivedFilesForQuota(List<? extends StoreFile> archivedFiles, List<Long> fileSizes) {
    // Sanity check from the caller
    if (archivedFiles.size() != fileSizes.size()) {
      throw new RuntimeException("Coding error: should never see lists of varying size");
    }
    RegionServerServices rss = this.region.getRegionServerServices();
    if (rss == null) {
      return;
    }
    List<Entry<String, Long>> filesWithSizes = new ArrayList<>(archivedFiles.size());
    Iterator<Long> fileSizeIter = fileSizes.iterator();
    for (StoreFile storeFile : archivedFiles) {
      final long fileSize = fileSizeIter.next();
      if (storeFile.isHFile() && fileSize != 0) {
        filesWithSizes.add(Maps.immutableEntry(storeFile.getPath().getName(), fileSize));
      }
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("Files archived: " + archivedFiles + ", reporting the following to the Master: "
        + filesWithSizes);
    }
    boolean success = rss.reportFileArchivalForQuotas(getTableName(), filesWithSizes);
    if (!success) {
      LOG.warn("Failed to report archival of files: " + filesWithSizes);
    }
  }

  @Override
  public int getCurrentParallelPutCount() {
    return currentParallelPutCount.get();
  }

  public int getStoreRefCount() {
    return this.storeEngine.getStoreFileManager().getStoreFiles().stream()
      .filter(sf -> sf.getReader() != null).filter(HStoreFile::isHFile)
      .mapToInt(HStoreFile::getRefCount).sum();
  }

  /** Returns the maximum ref count of a storeFile among all compacted HStore files for this HStore */
  public int getMaxCompactedStoreFileRefCount() {
    OptionalInt maxCompactedStoreFileRefCount = this.storeEngine.getStoreFileManager()
      .getCompactedfiles().stream().filter(sf -> sf.getReader() != null).filter(HStoreFile::isHFile)
      .mapToInt(HStoreFile::getRefCount).max();
    return maxCompactedStoreFileRefCount.isPresent()
      ? maxCompactedStoreFileRefCount.getAsInt()
      : 0;
  }

  @Override
  public long getMemstoreOnlyRowReadsCount() {
    return memstoreOnlyRowReadsCount.sum();
  }

  @Override
  public long getMixedRowReadsCount() {
    return mixedRowReadsCount.sum();
  }

  @Override
  public Configuration getReadOnlyConfiguration() {
    return new ReadOnlyConfiguration(this.conf);
  }

  void updateMetricsStore(boolean memstoreRead) {
    if (memstoreRead) {
      memstoreOnlyRowReadsCount.increment();
    } else {
      mixedRowReadsCount.increment();
    }
  }

  /**
   * Return the store files which are currently being written to. Mainly used by
   * {@link BrokenStoreFileCleaner} to prevent deleting these files, as they are not present in
   * the SFT yet.
   */
  public Set<Path> getStoreFilesBeingWritten() {
    return storeFileWriterCreationTrackers.stream().flatMap(t -> t.get().stream())
      .collect(Collectors.toSet());
  }

  @Override
  public long getBloomFilterRequestsCount() {
    return storeEngine.getBloomFilterMetrics().getRequestsCount();
  }

  @Override
  public long getBloomFilterNegativeResultsCount() {
    return storeEngine.getBloomFilterMetrics().getNegativeResultsCount();
  }

  @Override
  public long getBloomFilterEligibleRequestsCount() {
    return storeEngine.getBloomFilterMetrics().getEligibleRequestsCount();
  }
}