001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.net.InetSocketAddress; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.Collections; 027import java.util.HashMap; 028import java.util.HashSet; 029import java.util.Iterator; 030import java.util.List; 031import java.util.Map; 032import java.util.Map.Entry; 033import java.util.NavigableSet; 034import java.util.Optional; 035import java.util.OptionalDouble; 036import java.util.OptionalInt; 037import java.util.OptionalLong; 038import java.util.Set; 039import java.util.concurrent.Callable; 040import java.util.concurrent.CompletionService; 041import java.util.concurrent.ConcurrentHashMap; 042import java.util.concurrent.ExecutionException; 043import java.util.concurrent.ExecutorCompletionService; 044import java.util.concurrent.Future; 045import java.util.concurrent.ThreadPoolExecutor; 046import java.util.concurrent.atomic.AtomicBoolean; 047import java.util.concurrent.atomic.AtomicInteger; 048import java.util.concurrent.atomic.AtomicLong; 049import java.util.concurrent.atomic.LongAdder; 050import java.util.concurrent.locks.ReentrantLock; 051import java.util.concurrent.locks.ReentrantReadWriteLock; 052import java.util.function.Predicate; 053import java.util.function.ToLongFunction; 054import java.util.stream.Collectors; 055import java.util.stream.LongStream; 056import org.apache.hadoop.conf.Configuration; 057import org.apache.hadoop.fs.FileSystem; 058import org.apache.hadoop.fs.Path; 059import org.apache.hadoop.fs.permission.FsAction; 060import org.apache.hadoop.hbase.Cell; 061import org.apache.hadoop.hbase.CellComparator; 062import org.apache.hadoop.hbase.CellUtil; 063import org.apache.hadoop.hbase.CompoundConfiguration; 064import org.apache.hadoop.hbase.HConstants; 065import org.apache.hadoop.hbase.MemoryCompactionPolicy; 066import org.apache.hadoop.hbase.TableName; 067import org.apache.hadoop.hbase.backup.FailedArchiveException; 068import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 069import org.apache.hadoop.hbase.client.RegionInfo; 070import org.apache.hadoop.hbase.client.Scan; 071import org.apache.hadoop.hbase.conf.ConfigurationManager; 072import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; 073import org.apache.hadoop.hbase.io.HeapSize; 074import org.apache.hadoop.hbase.io.compress.Compression; 075import org.apache.hadoop.hbase.io.crypto.Encryption; 076import org.apache.hadoop.hbase.io.hfile.CacheConfig; 077import org.apache.hadoop.hbase.io.hfile.HFile; 078import org.apache.hadoop.hbase.io.hfile.HFileContext; 079import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 080import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; 081import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl; 082import org.apache.hadoop.hbase.io.hfile.HFileScanner; 083import org.apache.hadoop.hbase.io.hfile.InvalidHFileException; 084import org.apache.hadoop.hbase.log.HBaseMarkers; 085import org.apache.hadoop.hbase.monitoring.MonitoredTask; 086import org.apache.hadoop.hbase.quotas.RegionSizeStore; 087import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; 088import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 089import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; 090import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; 091import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; 092import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours; 093import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; 094import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 095import org.apache.hadoop.hbase.regionserver.wal.WALUtil; 096import org.apache.hadoop.hbase.security.EncryptionUtil; 097import org.apache.hadoop.hbase.security.User; 098import org.apache.hadoop.hbase.util.Bytes; 099import org.apache.hadoop.hbase.util.ChecksumType; 100import org.apache.hadoop.hbase.util.ClassSize; 101import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 102import org.apache.hadoop.hbase.util.Pair; 103import org.apache.hadoop.hbase.util.ReflectionUtils; 104import org.apache.hadoop.util.StringUtils; 105import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; 106import org.apache.yetus.audience.InterfaceAudience; 107import org.slf4j.Logger; 108import org.slf4j.LoggerFactory; 109import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 110import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 111import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection; 112import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 113import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 114import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 115import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 116import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 117import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils; 118import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 119import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 120/** 121 * A Store holds a column family in a Region. Its a memstore and a set of zero 122 * or more StoreFiles, which stretch backwards over time. 123 * 124 * <p>There's no reason to consider append-logging at this level; all logging 125 * and locking is handled at the HRegion level. Store just provides 126 * services to manage sets of StoreFiles. One of the most important of those 127 * services is compaction services where files are aggregated once they pass 128 * a configurable threshold. 129 * 130 * <p>Locking and transactions are handled at a higher level. This API should 131 * not be called directly but by an HRegion manager. 132 */ 133@InterfaceAudience.Private 134public class HStore implements Store, HeapSize, StoreConfigInformation, 135 PropagatingConfigurationObserver { 136 public static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class"; 137 public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY = 138 "hbase.server.compactchecker.interval.multiplier"; 139 public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles"; 140 public static final String BLOCK_STORAGE_POLICY_KEY = "hbase.hstore.block.storage.policy"; 141 // keep in accordance with HDFS default storage policy 142 public static final String DEFAULT_BLOCK_STORAGE_POLICY = "HOT"; 143 public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000; 144 public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 16; 145 146 // HBASE-24428 : Update compaction priority for recently split daughter regions 147 // so as to prioritize their compaction. 148 // Any compaction candidate with higher priority than compaction of newly split daugher regions 149 // should have priority value < (Integer.MIN_VALUE + 1000) 150 private static final int SPLIT_REGION_COMPACTION_PRIORITY = Integer.MIN_VALUE + 1000; 151 152 private static final Logger LOG = LoggerFactory.getLogger(HStore.class); 153 154 protected final MemStore memstore; 155 // This stores directory in the filesystem. 156 protected final HRegion region; 157 private final ColumnFamilyDescriptor family; 158 private final HRegionFileSystem fs; 159 protected Configuration conf; 160 protected CacheConfig cacheConf; 161 private long lastCompactSize = 0; 162 volatile boolean forceMajor = false; 163 /* how many bytes to write between status checks */ 164 static int closeCheckInterval = 0; 165 private AtomicLong storeSize = new AtomicLong(); 166 private AtomicLong totalUncompressedBytes = new AtomicLong(); 167 private LongAdder memstoreOnlyRowReadsCount = new LongAdder(); 168 // rows that has cells from both memstore and files (or only files) 169 private LongAdder mixedRowReadsCount = new LongAdder(); 170 171 private boolean cacheOnWriteLogged; 172 173 /** 174 * RWLock for store operations. 175 * Locked in shared mode when the list of component stores is looked at: 176 * - all reads/writes to table data 177 * - checking for split 178 * Locked in exclusive mode when the list of component stores is modified: 179 * - closing 180 * - completing a compaction 181 */ 182 final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); 183 /** 184 * Lock specific to archiving compacted store files. This avoids races around 185 * the combination of retrieving the list of compacted files and moving them to 186 * the archive directory. Since this is usually a background process (other than 187 * on close), we don't want to handle this with the store write lock, which would 188 * block readers and degrade performance. 189 * 190 * Locked by: 191 * - CompactedHFilesDispatchHandler via closeAndArchiveCompactedFiles() 192 * - close() 193 */ 194 final ReentrantLock archiveLock = new ReentrantLock(); 195 196 private final boolean verifyBulkLoads; 197 198 /** 199 * Use this counter to track concurrent puts. If TRACE-log is enabled, if we are over the 200 * threshold set by hbase.region.store.parallel.put.print.threshold (Default is 50) we will 201 * log a message that identifies the Store experience this high-level of concurrency. 202 */ 203 private final AtomicInteger currentParallelPutCount = new AtomicInteger(0); 204 private final int parallelPutCountPrintThreshold; 205 206 private ScanInfo scanInfo; 207 208 // All access must be synchronized. 209 // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it. 210 private final List<HStoreFile> filesCompacting = Lists.newArrayList(); 211 212 // All access must be synchronized. 213 private final Set<ChangedReadersObserver> changedReaderObservers = 214 Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>()); 215 216 protected final int blocksize; 217 private HFileDataBlockEncoder dataBlockEncoder; 218 219 /** Checksum configuration */ 220 protected ChecksumType checksumType; 221 protected int bytesPerChecksum; 222 223 // Comparing KeyValues 224 protected final CellComparator comparator; 225 226 final StoreEngine<?, ?, ?, ?> storeEngine; 227 228 private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean(); 229 private volatile OffPeakHours offPeakHours; 230 231 private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10; 232 private int flushRetriesNumber; 233 private int pauseTime; 234 235 private long blockingFileCount; 236 private int compactionCheckMultiplier; 237 protected Encryption.Context cryptoContext = Encryption.Context.NONE; 238 239 private AtomicLong flushedCellsCount = new AtomicLong(); 240 private AtomicLong compactedCellsCount = new AtomicLong(); 241 private AtomicLong majorCompactedCellsCount = new AtomicLong(); 242 private AtomicLong flushedCellsSize = new AtomicLong(); 243 private AtomicLong flushedOutputFileSize = new AtomicLong(); 244 private AtomicLong compactedCellsSize = new AtomicLong(); 245 private AtomicLong majorCompactedCellsSize = new AtomicLong(); 246 247 /** 248 * Constructor 249 * @param family HColumnDescriptor for this column 250 * @param confParam configuration object failed. Can be null. 251 */ 252 protected HStore(final HRegion region, final ColumnFamilyDescriptor family, 253 final Configuration confParam, boolean warmup) throws IOException { 254 255 this.fs = region.getRegionFileSystem(); 256 257 // Assemble the store's home directory and Ensure it exists. 258 fs.createStoreDir(family.getNameAsString()); 259 this.region = region; 260 this.family = family; 261 // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor 262 // CompoundConfiguration will look for keys in reverse order of addition, so we'd 263 // add global config first, then table and cf overrides, then cf metadata. 264 this.conf = new CompoundConfiguration() 265 .add(confParam) 266 .addBytesMap(region.getTableDescriptor().getValues()) 267 .addStringMap(family.getConfiguration()) 268 .addBytesMap(family.getValues()); 269 this.blocksize = family.getBlocksize(); 270 271 // set block storage policy for store directory 272 String policyName = family.getStoragePolicy(); 273 if (null == policyName) { 274 policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY, DEFAULT_BLOCK_STORAGE_POLICY); 275 } 276 this.fs.setStoragePolicy(family.getNameAsString(), policyName.trim()); 277 278 this.dataBlockEncoder = new HFileDataBlockEncoderImpl(family.getDataBlockEncoding()); 279 280 this.comparator = region.getCellComparator(); 281 // used by ScanQueryMatcher 282 long timeToPurgeDeletes = 283 Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0); 284 LOG.trace("Time to purge deletes set to {}ms in {}", timeToPurgeDeletes, this); 285 // Get TTL 286 long ttl = determineTTLFromFamily(family); 287 // Why not just pass a HColumnDescriptor in here altogether? Even if have 288 // to clone it? 289 scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.comparator); 290 this.memstore = getMemstore(); 291 292 this.offPeakHours = OffPeakHours.getInstance(conf); 293 294 // Setting up cache configuration for this family 295 createCacheConf(family); 296 297 this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false); 298 299 this.blockingFileCount = 300 conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT); 301 this.compactionCheckMultiplier = conf.getInt( 302 COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER); 303 if (this.compactionCheckMultiplier <= 0) { 304 LOG.error("Compaction check period multiplier must be positive, setting default: {}", 305 DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER); 306 this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER; 307 } 308 309 if (HStore.closeCheckInterval == 0) { 310 HStore.closeCheckInterval = conf.getInt( 311 "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */); 312 } 313 314 this.storeEngine = createStoreEngine(this, this.conf, this.comparator); 315 List<HStoreFile> hStoreFiles = loadStoreFiles(warmup); 316 // Move the storeSize calculation out of loadStoreFiles() method, because the secondary read 317 // replica's refreshStoreFiles() will also use loadStoreFiles() to refresh its store files and 318 // update the storeSize in the completeCompaction(..) finally (just like compaction) , so 319 // no need calculate the storeSize twice. 320 this.storeSize.addAndGet(getStorefilesSize(hStoreFiles, sf -> true)); 321 this.totalUncompressedBytes.addAndGet(getTotalUncompressedBytes(hStoreFiles)); 322 this.storeEngine.getStoreFileManager().loadFiles(hStoreFiles); 323 324 // Initialize checksum type from name. The names are CRC32, CRC32C, etc. 325 this.checksumType = getChecksumType(conf); 326 // Initialize bytes per checksum 327 this.bytesPerChecksum = getBytesPerChecksum(conf); 328 flushRetriesNumber = conf.getInt( 329 "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER); 330 pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE); 331 if (flushRetriesNumber <= 0) { 332 throw new IllegalArgumentException( 333 "hbase.hstore.flush.retries.number must be > 0, not " 334 + flushRetriesNumber); 335 } 336 cryptoContext = EncryptionUtil.createEncryptionContext(conf, family); 337 338 int confPrintThreshold = 339 this.conf.getInt("hbase.region.store.parallel.put.print.threshold", 50); 340 if (confPrintThreshold < 10) { 341 confPrintThreshold = 10; 342 } 343 this.parallelPutCountPrintThreshold = confPrintThreshold; 344 LOG.info("{} created, memstore type={}, storagePolicy={}, verifyBulkLoads={}, " 345 + "parallelPutCountPrintThreshold={}, encoding={}, compression={}", 346 this, memstore.getClass().getSimpleName(), policyName, verifyBulkLoads, 347 parallelPutCountPrintThreshold, family.getDataBlockEncoding(), 348 family.getCompressionType()); 349 cacheOnWriteLogged = false; 350 } 351 352 /** 353 * @return MemStore Instance to use in this store. 354 */ 355 private MemStore getMemstore() { 356 MemStore ms = null; 357 // Check if in-memory-compaction configured. Note MemoryCompactionPolicy is an enum! 358 MemoryCompactionPolicy inMemoryCompaction = null; 359 if (this.getTableName().isSystemTable()) { 360 inMemoryCompaction = MemoryCompactionPolicy 361 .valueOf(conf.get("hbase.systemtables.compacting.memstore.type", "NONE").toUpperCase()); 362 } else { 363 inMemoryCompaction = family.getInMemoryCompaction(); 364 } 365 if (inMemoryCompaction == null) { 366 inMemoryCompaction = 367 MemoryCompactionPolicy.valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 368 CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT).toUpperCase()); 369 } 370 switch (inMemoryCompaction) { 371 case NONE: 372 ms = ReflectionUtils.newInstance(DefaultMemStore.class, 373 new Object[] { conf, this.comparator, 374 this.getHRegion().getRegionServicesForStores()}); 375 break; 376 default: 377 Class<? extends CompactingMemStore> clz = conf.getClass(MEMSTORE_CLASS_NAME, 378 CompactingMemStore.class, CompactingMemStore.class); 379 ms = ReflectionUtils.newInstance(clz, new Object[]{conf, this.comparator, this, 380 this.getHRegion().getRegionServicesForStores(), inMemoryCompaction}); 381 } 382 return ms; 383 } 384 385 /** 386 * Creates the cache config. 387 * @param family The current column family. 388 */ 389 protected void createCacheConf(final ColumnFamilyDescriptor family) { 390 this.cacheConf = new CacheConfig(conf, family, region.getBlockCache(), 391 region.getRegionServicesForStores().getByteBuffAllocator()); 392 LOG.info("Created cacheConfig: " + this.getCacheConfig() + " for " + this); 393 } 394 395 /** 396 * Creates the store engine configured for the given Store. 397 * @param store The store. An unfortunate dependency needed due to it 398 * being passed to coprocessors via the compactor. 399 * @param conf Store configuration. 400 * @param kvComparator KVComparator for storeFileManager. 401 * @return StoreEngine to use. 402 */ 403 protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf, 404 CellComparator kvComparator) throws IOException { 405 return StoreEngine.create(store, conf, comparator); 406 } 407 408 /** 409 * @return TTL in seconds of the specified family 410 */ 411 public static long determineTTLFromFamily(final ColumnFamilyDescriptor family) { 412 // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds. 413 long ttl = family.getTimeToLive(); 414 if (ttl == HConstants.FOREVER) { 415 // Default is unlimited ttl. 416 ttl = Long.MAX_VALUE; 417 } else if (ttl == -1) { 418 ttl = Long.MAX_VALUE; 419 } else { 420 // Second -> ms adjust for user data 421 ttl *= 1000; 422 } 423 return ttl; 424 } 425 426 @Override 427 public String getColumnFamilyName() { 428 return this.family.getNameAsString(); 429 } 430 431 @Override 432 public TableName getTableName() { 433 return this.getRegionInfo().getTable(); 434 } 435 436 @Override 437 public FileSystem getFileSystem() { 438 return this.fs.getFileSystem(); 439 } 440 441 public HRegionFileSystem getRegionFileSystem() { 442 return this.fs; 443 } 444 445 /* Implementation of StoreConfigInformation */ 446 @Override 447 public long getStoreFileTtl() { 448 // TTL only applies if there's no MIN_VERSIONs setting on the column. 449 return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE; 450 } 451 452 @Override 453 public long getMemStoreFlushSize() { 454 // TODO: Why is this in here? The flushsize of the region rather than the store? St.Ack 455 return this.region.memstoreFlushSize; 456 } 457 458 @Override 459 public MemStoreSize getFlushableSize() { 460 return this.memstore.getFlushableSize(); 461 } 462 463 @Override 464 public MemStoreSize getSnapshotSize() { 465 return this.memstore.getSnapshotSize(); 466 } 467 468 @Override 469 public long getCompactionCheckMultiplier() { 470 return this.compactionCheckMultiplier; 471 } 472 473 @Override 474 public long getBlockingFileCount() { 475 return blockingFileCount; 476 } 477 /* End implementation of StoreConfigInformation */ 478 479 /** 480 * Returns the configured bytesPerChecksum value. 481 * @param conf The configuration 482 * @return The bytesPerChecksum that is set in the configuration 483 */ 484 public static int getBytesPerChecksum(Configuration conf) { 485 return conf.getInt(HConstants.BYTES_PER_CHECKSUM, 486 HFile.DEFAULT_BYTES_PER_CHECKSUM); 487 } 488 489 /** 490 * Returns the configured checksum algorithm. 491 * @param conf The configuration 492 * @return The checksum algorithm that is set in the configuration 493 */ 494 public static ChecksumType getChecksumType(Configuration conf) { 495 String checksumName = conf.get(HConstants.CHECKSUM_TYPE_NAME); 496 if (checksumName == null) { 497 return ChecksumType.getDefaultChecksumType(); 498 } else { 499 return ChecksumType.nameToType(checksumName); 500 } 501 } 502 503 /** 504 * @return how many bytes to write between status checks 505 */ 506 public static int getCloseCheckInterval() { 507 return closeCheckInterval; 508 } 509 510 @Override 511 public ColumnFamilyDescriptor getColumnFamilyDescriptor() { 512 return this.family; 513 } 514 515 @Override 516 public OptionalLong getMaxSequenceId() { 517 return StoreUtils.getMaxSequenceIdInList(this.getStorefiles()); 518 } 519 520 @Override 521 public OptionalLong getMaxMemStoreTS() { 522 return StoreUtils.getMaxMemStoreTSInList(this.getStorefiles()); 523 } 524 525 /** 526 * @param tabledir {@link Path} to where the table is being stored 527 * @param hri {@link RegionInfo} for the region. 528 * @param family {@link ColumnFamilyDescriptor} describing the column family 529 * @return Path to family/Store home directory. 530 * @deprecated Since 05/05/2013, HBase-7808, hbase-1.0.0 531 */ 532 @Deprecated 533 public static Path getStoreHomedir(final Path tabledir, 534 final RegionInfo hri, final byte[] family) { 535 return getStoreHomedir(tabledir, hri.getEncodedName(), family); 536 } 537 538 /** 539 * @param tabledir {@link Path} to where the table is being stored 540 * @param encodedName Encoded region name. 541 * @param family {@link ColumnFamilyDescriptor} describing the column family 542 * @return Path to family/Store home directory. 543 * @deprecated Since 05/05/2013, HBase-7808, hbase-1.0.0 544 */ 545 @Deprecated 546 public static Path getStoreHomedir(final Path tabledir, 547 final String encodedName, final byte[] family) { 548 return new Path(tabledir, new Path(encodedName, Bytes.toString(family))); 549 } 550 551 /** 552 * @return the data block encoder 553 */ 554 public HFileDataBlockEncoder getDataBlockEncoder() { 555 return dataBlockEncoder; 556 } 557 558 /** 559 * Should be used only in tests. 560 * @param blockEncoder the block delta encoder to use 561 */ 562 void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) { 563 this.dataBlockEncoder = blockEncoder; 564 } 565 566 /** 567 * Creates an unsorted list of StoreFile loaded in parallel 568 * from the given directory. 569 */ 570 private List<HStoreFile> loadStoreFiles(boolean warmup) throws IOException { 571 Collection<StoreFileInfo> files = fs.getStoreFiles(getColumnFamilyName()); 572 return openStoreFiles(files, warmup); 573 } 574 575 private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, boolean warmup) 576 throws IOException { 577 if (CollectionUtils.isEmpty(files)) { 578 return Collections.emptyList(); 579 } 580 // initialize the thread pool for opening store files in parallel.. 581 ThreadPoolExecutor storeFileOpenerThreadPool = 582 this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpener-" + 583 this.getColumnFamilyName()); 584 CompletionService<HStoreFile> completionService = 585 new ExecutorCompletionService<>(storeFileOpenerThreadPool); 586 587 int totalValidStoreFile = 0; 588 for (StoreFileInfo storeFileInfo : files) { 589 // open each store file in parallel 590 completionService.submit(() -> this.createStoreFileAndReader(storeFileInfo)); 591 totalValidStoreFile++; 592 } 593 594 Set<String> compactedStoreFiles = new HashSet<>(); 595 ArrayList<HStoreFile> results = new ArrayList<>(files.size()); 596 IOException ioe = null; 597 try { 598 for (int i = 0; i < totalValidStoreFile; i++) { 599 try { 600 HStoreFile storeFile = completionService.take().get(); 601 if (storeFile != null) { 602 LOG.debug("loaded {}", storeFile); 603 results.add(storeFile); 604 compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles()); 605 } 606 } catch (InterruptedException e) { 607 if (ioe == null) { 608 ioe = new InterruptedIOException(e.getMessage()); 609 } 610 } catch (ExecutionException e) { 611 if (ioe == null) { 612 ioe = new IOException(e.getCause()); 613 } 614 } 615 } 616 } finally { 617 storeFileOpenerThreadPool.shutdownNow(); 618 } 619 if (ioe != null) { 620 // close StoreFile readers 621 boolean evictOnClose = 622 cacheConf != null? cacheConf.shouldEvictOnClose(): true; 623 for (HStoreFile file : results) { 624 try { 625 if (file != null) { 626 file.closeStoreFile(evictOnClose); 627 } 628 } catch (IOException e) { 629 LOG.warn("Could not close store file {}", file, e); 630 } 631 } 632 throw ioe; 633 } 634 635 // Should not archive the compacted store files when region warmup. See HBASE-22163. 636 if (!warmup) { 637 // Remove the compacted files from result 638 List<HStoreFile> filesToRemove = new ArrayList<>(compactedStoreFiles.size()); 639 for (HStoreFile storeFile : results) { 640 if (compactedStoreFiles.contains(storeFile.getPath().getName())) { 641 LOG.warn("Clearing the compacted storefile {} from {}", storeFile, this); 642 storeFile.getReader().close(storeFile.getCacheConf() != null ? 643 storeFile.getCacheConf().shouldEvictOnClose() : true); 644 filesToRemove.add(storeFile); 645 } 646 } 647 results.removeAll(filesToRemove); 648 if (!filesToRemove.isEmpty() && this.isPrimaryReplicaStore()) { 649 LOG.debug("Moving the files {} to archive", filesToRemove); 650 this.fs.removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(), filesToRemove); 651 } 652 } 653 654 return results; 655 } 656 657 @Override 658 public void refreshStoreFiles() throws IOException { 659 Collection<StoreFileInfo> newFiles = fs.getStoreFiles(getColumnFamilyName()); 660 refreshStoreFilesInternal(newFiles); 661 } 662 663 /** 664 * Replaces the store files that the store has with the given files. Mainly used by secondary 665 * region replicas to keep up to date with the primary region files. 666 */ 667 public void refreshStoreFiles(Collection<String> newFiles) throws IOException { 668 List<StoreFileInfo> storeFiles = new ArrayList<>(newFiles.size()); 669 for (String file : newFiles) { 670 storeFiles.add(fs.getStoreFileInfo(getColumnFamilyName(), file)); 671 } 672 refreshStoreFilesInternal(storeFiles); 673 } 674 675 /** 676 * Checks the underlying store files, and opens the files that have not 677 * been opened, and removes the store file readers for store files no longer 678 * available. Mainly used by secondary region replicas to keep up to date with 679 * the primary region files. 680 */ 681 private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException { 682 StoreFileManager sfm = storeEngine.getStoreFileManager(); 683 Collection<HStoreFile> currentFiles = sfm.getStorefiles(); 684 Collection<HStoreFile> compactedFiles = sfm.getCompactedfiles(); 685 if (currentFiles == null) { 686 currentFiles = Collections.emptySet(); 687 } 688 if (newFiles == null) { 689 newFiles = Collections.emptySet(); 690 } 691 if (compactedFiles == null) { 692 compactedFiles = Collections.emptySet(); 693 } 694 695 HashMap<StoreFileInfo, HStoreFile> currentFilesSet = new HashMap<>(currentFiles.size()); 696 for (HStoreFile sf : currentFiles) { 697 currentFilesSet.put(sf.getFileInfo(), sf); 698 } 699 HashMap<StoreFileInfo, HStoreFile> compactedFilesSet = new HashMap<>(compactedFiles.size()); 700 for (HStoreFile sf : compactedFiles) { 701 compactedFilesSet.put(sf.getFileInfo(), sf); 702 } 703 704 Set<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles); 705 // Exclude the files that have already been compacted 706 newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet()); 707 Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet()); 708 Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet); 709 710 if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) { 711 return; 712 } 713 714 LOG.info("Refreshing store files for " + this + " files to add: " 715 + toBeAddedFiles + " files to remove: " + toBeRemovedFiles); 716 717 Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size()); 718 for (StoreFileInfo sfi : toBeRemovedFiles) { 719 toBeRemovedStoreFiles.add(currentFilesSet.get(sfi)); 720 } 721 722 // try to open the files 723 List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false); 724 725 // propogate the file changes to the underlying store file manager 726 replaceStoreFiles(toBeRemovedStoreFiles, openedFiles); //won't throw an exception 727 728 // Advance the memstore read point to be at least the new store files seqIds so that 729 // readers might pick it up. This assumes that the store is not getting any writes (otherwise 730 // in-flight transactions might be made visible) 731 if (!toBeAddedFiles.isEmpty()) { 732 // we must have the max sequence id here as we do have several store files 733 region.getMVCC().advanceTo(this.getMaxSequenceId().getAsLong()); 734 } 735 736 completeCompaction(toBeRemovedStoreFiles); 737 } 738 739 @VisibleForTesting 740 protected HStoreFile createStoreFileAndReader(final Path p) throws IOException { 741 StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), 742 p, isPrimaryReplicaStore()); 743 return createStoreFileAndReader(info); 744 } 745 746 private HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException { 747 info.setRegionCoprocessorHost(this.region.getCoprocessorHost()); 748 HStoreFile storeFile = new HStoreFile(info, this.family.getBloomFilterType(), this.cacheConf); 749 storeFile.initReader(); 750 return storeFile; 751 } 752 753 /** 754 * This message intends to inform the MemStore that next coming updates 755 * are going to be part of the replaying edits from WAL 756 */ 757 public void startReplayingFromWAL(){ 758 this.memstore.startReplayingFromWAL(); 759 } 760 761 /** 762 * This message intends to inform the MemStore that the replaying edits from WAL 763 * are done 764 */ 765 public void stopReplayingFromWAL(){ 766 this.memstore.stopReplayingFromWAL(); 767 } 768 769 /** 770 * Adds a value to the memstore 771 */ 772 public void add(final Cell cell, MemStoreSizing memstoreSizing) { 773 lock.readLock().lock(); 774 try { 775 if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) { 776 LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!", 777 this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName()); 778 } 779 this.memstore.add(cell, memstoreSizing); 780 } finally { 781 lock.readLock().unlock(); 782 currentParallelPutCount.decrementAndGet(); 783 } 784 } 785 786 /** 787 * Adds the specified value to the memstore 788 */ 789 public void add(final Iterable<Cell> cells, MemStoreSizing memstoreSizing) { 790 lock.readLock().lock(); 791 try { 792 if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) { 793 LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!", 794 this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName()); 795 } 796 memstore.add(cells, memstoreSizing); 797 } finally { 798 lock.readLock().unlock(); 799 currentParallelPutCount.decrementAndGet(); 800 } 801 } 802 803 @Override 804 public long timeOfOldestEdit() { 805 return memstore.timeOfOldestEdit(); 806 } 807 808 /** 809 * @return All store files. 810 */ 811 @Override 812 public Collection<HStoreFile> getStorefiles() { 813 return this.storeEngine.getStoreFileManager().getStorefiles(); 814 } 815 816 @Override 817 public Collection<HStoreFile> getCompactedFiles() { 818 return this.storeEngine.getStoreFileManager().getCompactedfiles(); 819 } 820 821 /** 822 * This throws a WrongRegionException if the HFile does not fit in this region, or an 823 * InvalidHFileException if the HFile is not valid. 824 */ 825 public void assertBulkLoadHFileOk(Path srcPath) throws IOException { 826 HFile.Reader reader = null; 827 try { 828 LOG.info("Validating hfile at " + srcPath + " for inclusion in " + this); 829 FileSystem srcFs = srcPath.getFileSystem(conf); 830 srcFs.access(srcPath, FsAction.READ_WRITE); 831 reader = HFile.createReader(srcFs, srcPath, cacheConf, isPrimaryReplicaStore(), conf); 832 833 Optional<byte[]> firstKey = reader.getFirstRowKey(); 834 Preconditions.checkState(firstKey.isPresent(), "First key can not be null"); 835 Optional<Cell> lk = reader.getLastKey(); 836 Preconditions.checkState(lk.isPresent(), "Last key can not be null"); 837 byte[] lastKey = CellUtil.cloneRow(lk.get()); 838 839 if (LOG.isDebugEnabled()) { 840 LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey.get()) + 841 " last=" + Bytes.toStringBinary(lastKey)); 842 LOG.debug("Region bounds: first=" + 843 Bytes.toStringBinary(getRegionInfo().getStartKey()) + 844 " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey())); 845 } 846 847 if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) { 848 throw new WrongRegionException( 849 "Bulk load file " + srcPath.toString() + " does not fit inside region " 850 + this.getRegionInfo().getRegionNameAsString()); 851 } 852 853 if(reader.length() > conf.getLong(HConstants.HREGION_MAX_FILESIZE, 854 HConstants.DEFAULT_MAX_FILE_SIZE)) { 855 LOG.warn("Trying to bulk load hfile " + srcPath + " with size: " + 856 reader.length() + " bytes can be problematic as it may lead to oversplitting."); 857 } 858 859 if (verifyBulkLoads) { 860 long verificationStartTime = EnvironmentEdgeManager.currentTime(); 861 LOG.info("Full verification started for bulk load hfile: {}", srcPath); 862 Cell prevCell = null; 863 HFileScanner scanner = reader.getScanner(false, false, false); 864 scanner.seekTo(); 865 do { 866 Cell cell = scanner.getCell(); 867 if (prevCell != null) { 868 if (comparator.compareRows(prevCell, cell) > 0) { 869 throw new InvalidHFileException("Previous row is greater than" 870 + " current row: path=" + srcPath + " previous=" 871 + CellUtil.getCellKeyAsString(prevCell) + " current=" 872 + CellUtil.getCellKeyAsString(cell)); 873 } 874 if (CellComparator.getInstance().compareFamilies(prevCell, cell) != 0) { 875 throw new InvalidHFileException("Previous key had different" 876 + " family compared to current key: path=" + srcPath 877 + " previous=" 878 + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(), 879 prevCell.getFamilyLength()) 880 + " current=" 881 + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(), 882 cell.getFamilyLength())); 883 } 884 } 885 prevCell = cell; 886 } while (scanner.next()); 887 LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString() + 888 " took " + (EnvironmentEdgeManager.currentTime() - verificationStartTime) + " ms"); 889 } 890 } finally { 891 if (reader != null) { 892 reader.close(); 893 } 894 } 895 } 896 897 /** 898 * This method should only be called from Region. It is assumed that the ranges of values in the 899 * HFile fit within the stores assigned region. (assertBulkLoadHFileOk checks this) 900 * 901 * @param seqNum sequence Id associated with the HFile 902 */ 903 public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException { 904 Path srcPath = new Path(srcPathStr); 905 return fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum); 906 } 907 908 public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException { 909 Path srcPath = new Path(srcPathStr); 910 try { 911 fs.commitStoreFile(srcPath, dstPath); 912 } finally { 913 if (this.getCoprocessorHost() != null) { 914 this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath); 915 } 916 } 917 918 LOG.info("Loaded HFile " + srcPath + " into " + this + " as " 919 + dstPath + " - updating store file list."); 920 921 HStoreFile sf = createStoreFileAndReader(dstPath); 922 bulkLoadHFile(sf); 923 924 LOG.info("Successfully loaded {} into {} (new location: {})", 925 srcPath, this, dstPath); 926 927 return dstPath; 928 } 929 930 public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException { 931 HStoreFile sf = createStoreFileAndReader(fileInfo); 932 bulkLoadHFile(sf); 933 } 934 935 private void bulkLoadHFile(HStoreFile sf) throws IOException { 936 StoreFileReader r = sf.getReader(); 937 this.storeSize.addAndGet(r.length()); 938 this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); 939 940 // Append the new storefile into the list 941 this.lock.writeLock().lock(); 942 try { 943 this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf)); 944 } finally { 945 // We need the lock, as long as we are updating the storeFiles 946 // or changing the memstore. Let us release it before calling 947 // notifyChangeReadersObservers. See HBASE-4485 for a possible 948 // deadlock scenario that could have happened if continue to hold 949 // the lock. 950 this.lock.writeLock().unlock(); 951 } 952 LOG.info("Loaded HFile " + sf.getFileInfo() + " into " + this); 953 if (LOG.isTraceEnabled()) { 954 String traceMessage = "BULK LOAD time,size,store size,store files [" 955 + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize 956 + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]"; 957 LOG.trace(traceMessage); 958 } 959 } 960 961 /** 962 * Close all the readers We don't need to worry about subsequent requests because the Region holds 963 * a write lock that will prevent any more reads or writes. 964 * @return the {@link StoreFile StoreFiles} that were previously being used. 965 * @throws IOException on failure 966 */ 967 public ImmutableCollection<HStoreFile> close() throws IOException { 968 this.archiveLock.lock(); 969 this.lock.writeLock().lock(); 970 try { 971 // Clear so metrics doesn't find them. 972 ImmutableCollection<HStoreFile> result = storeEngine.getStoreFileManager().clearFiles(); 973 Collection<HStoreFile> compactedfiles = 974 storeEngine.getStoreFileManager().clearCompactedFiles(); 975 // clear the compacted files 976 if (CollectionUtils.isNotEmpty(compactedfiles)) { 977 removeCompactedfiles(compactedfiles, cacheConf != null ? 978 cacheConf.shouldEvictOnClose() : true); 979 } 980 if (!result.isEmpty()) { 981 // initialize the thread pool for closing store files in parallel. 982 ThreadPoolExecutor storeFileCloserThreadPool = this.region 983 .getStoreFileOpenAndCloseThreadPool("StoreFileCloser-" 984 + this.getColumnFamilyName()); 985 986 // close each store file in parallel 987 CompletionService<Void> completionService = 988 new ExecutorCompletionService<>(storeFileCloserThreadPool); 989 for (HStoreFile f : result) { 990 completionService.submit(new Callable<Void>() { 991 @Override 992 public Void call() throws IOException { 993 boolean evictOnClose = 994 cacheConf != null? cacheConf.shouldEvictOnClose(): true; 995 f.closeStoreFile(evictOnClose); 996 return null; 997 } 998 }); 999 } 1000 1001 IOException ioe = null; 1002 try { 1003 for (int i = 0; i < result.size(); i++) { 1004 try { 1005 Future<Void> future = completionService.take(); 1006 future.get(); 1007 } catch (InterruptedException e) { 1008 if (ioe == null) { 1009 ioe = new InterruptedIOException(); 1010 ioe.initCause(e); 1011 } 1012 } catch (ExecutionException e) { 1013 if (ioe == null) { 1014 ioe = new IOException(e.getCause()); 1015 } 1016 } 1017 } 1018 } finally { 1019 storeFileCloserThreadPool.shutdownNow(); 1020 } 1021 if (ioe != null) { 1022 throw ioe; 1023 } 1024 } 1025 LOG.trace("Closed {}", this); 1026 return result; 1027 } finally { 1028 this.lock.writeLock().unlock(); 1029 this.archiveLock.unlock(); 1030 } 1031 } 1032 1033 /** 1034 * Snapshot this stores memstore. Call before running 1035 * {@link #flushCache(long, MemStoreSnapshot, MonitoredTask, ThroughputController, 1036 * FlushLifeCycleTracker)} 1037 * so it has some work to do. 1038 */ 1039 void snapshot() { 1040 this.lock.writeLock().lock(); 1041 try { 1042 this.memstore.snapshot(); 1043 } finally { 1044 this.lock.writeLock().unlock(); 1045 } 1046 } 1047 1048 /** 1049 * Write out current snapshot. Presumes {@link #snapshot()} has been called previously. 1050 * @param logCacheFlushId flush sequence number 1051 * @return The path name of the tmp file to which the store was flushed 1052 * @throws IOException if exception occurs during process 1053 */ 1054 protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot, 1055 MonitoredTask status, ThroughputController throughputController, 1056 FlushLifeCycleTracker tracker) throws IOException { 1057 // If an exception happens flushing, we let it out without clearing 1058 // the memstore snapshot. The old snapshot will be returned when we say 1059 // 'snapshot', the next time flush comes around. 1060 // Retry after catching exception when flushing, otherwise server will abort 1061 // itself 1062 StoreFlusher flusher = storeEngine.getStoreFlusher(); 1063 IOException lastException = null; 1064 for (int i = 0; i < flushRetriesNumber; i++) { 1065 try { 1066 List<Path> pathNames = 1067 flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController, tracker); 1068 Path lastPathName = null; 1069 try { 1070 for (Path pathName : pathNames) { 1071 lastPathName = pathName; 1072 validateStoreFile(pathName); 1073 } 1074 return pathNames; 1075 } catch (Exception e) { 1076 LOG.warn("Failed validating store file {}, retrying num={}", lastPathName, i, e); 1077 if (e instanceof IOException) { 1078 lastException = (IOException) e; 1079 } else { 1080 lastException = new IOException(e); 1081 } 1082 } 1083 } catch (IOException e) { 1084 LOG.warn("Failed flushing store file for {}, retrying num={}", this, i, e); 1085 lastException = e; 1086 } 1087 if (lastException != null && i < (flushRetriesNumber - 1)) { 1088 try { 1089 Thread.sleep(pauseTime); 1090 } catch (InterruptedException e) { 1091 IOException iie = new InterruptedIOException(); 1092 iie.initCause(e); 1093 throw iie; 1094 } 1095 } 1096 } 1097 throw lastException; 1098 } 1099 1100 public HStoreFile tryCommitRecoveredHFile(Path path) throws IOException { 1101 LOG.info("Validating recovered hfile at {} for inclusion in store {}", path, this); 1102 FileSystem srcFs = path.getFileSystem(conf); 1103 srcFs.access(path, FsAction.READ_WRITE); 1104 try (HFile.Reader reader = 1105 HFile.createReader(srcFs, path, cacheConf, isPrimaryReplicaStore(), conf)) { 1106 Optional<byte[]> firstKey = reader.getFirstRowKey(); 1107 Preconditions.checkState(firstKey.isPresent(), "First key can not be null"); 1108 Optional<Cell> lk = reader.getLastKey(); 1109 Preconditions.checkState(lk.isPresent(), "Last key can not be null"); 1110 byte[] lastKey = CellUtil.cloneRow(lk.get()); 1111 if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) { 1112 throw new WrongRegionException("Recovered hfile " + path.toString() + 1113 " does not fit inside region " + this.getRegionInfo().getRegionNameAsString()); 1114 } 1115 } 1116 1117 Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path); 1118 HStoreFile sf = createStoreFileAndReader(dstPath); 1119 StoreFileReader r = sf.getReader(); 1120 this.storeSize.addAndGet(r.length()); 1121 this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); 1122 1123 this.lock.writeLock().lock(); 1124 try { 1125 this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf)); 1126 } finally { 1127 this.lock.writeLock().unlock(); 1128 } 1129 1130 LOG.info("Loaded recovered hfile to {}, entries={}, sequenceid={}, filesize={}", sf, 1131 r.getEntries(), r.getSequenceID(), TraditionalBinaryPrefix.long2String(r.length(), "B", 1)); 1132 return sf; 1133 } 1134 1135 /** 1136 * @param path The pathname of the tmp file into which the store was flushed 1137 * @return store file created. 1138 */ 1139 private HStoreFile commitFile(Path path, long logCacheFlushId, MonitoredTask status) 1140 throws IOException { 1141 // Write-out finished successfully, move into the right spot 1142 Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path); 1143 1144 status.setStatus("Flushing " + this + ": reopening flushed file"); 1145 HStoreFile sf = createStoreFileAndReader(dstPath); 1146 1147 StoreFileReader r = sf.getReader(); 1148 this.storeSize.addAndGet(r.length()); 1149 this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); 1150 1151 if (LOG.isInfoEnabled()) { 1152 LOG.info("Added " + sf + ", entries=" + r.getEntries() + 1153 ", sequenceid=" + logCacheFlushId + 1154 ", filesize=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1)); 1155 } 1156 return sf; 1157 } 1158 1159 public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression, 1160 boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag, 1161 boolean shouldDropBehind) throws IOException { 1162 return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint, 1163 includesTag, shouldDropBehind, -1); 1164 } 1165 1166 /** 1167 * @param compression Compression algorithm to use 1168 * @param isCompaction whether we are creating a new file in a compaction 1169 * @param includeMVCCReadpoint - whether to include MVCC or not 1170 * @param includesTag - includesTag or not 1171 * @return Writer for a new StoreFile in the tmp dir. 1172 */ 1173 // TODO : allow the Writer factory to create Writers of ShipperListener type only in case of 1174 // compaction 1175 public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression, 1176 boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag, 1177 boolean shouldDropBehind, long totalCompactedFilesSize) throws IOException { 1178 // creating new cache config for each new writer 1179 final CacheConfig writerCacheConf = new CacheConfig(cacheConf); 1180 if (isCompaction) { 1181 // Don't cache data on write on compactions, unless specifically configured to do so 1182 // Cache only when total file size remains lower than configured threshold 1183 final boolean cacheCompactedBlocksOnWrite = 1184 cacheConf.shouldCacheCompactedBlocksOnWrite(); 1185 // if data blocks are to be cached on write 1186 // during compaction, we should forcefully 1187 // cache index and bloom blocks as well 1188 if (cacheCompactedBlocksOnWrite && totalCompactedFilesSize <= cacheConf 1189 .getCacheCompactedBlocksOnWriteThreshold()) { 1190 writerCacheConf.enableCacheOnWrite(); 1191 if (!cacheOnWriteLogged) { 1192 LOG.info("For {} , cacheCompactedBlocksOnWrite is true, hence enabled " + 1193 "cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this); 1194 cacheOnWriteLogged = true; 1195 } 1196 } else { 1197 writerCacheConf.setCacheDataOnWrite(false); 1198 if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) { 1199 // checking condition once again for logging 1200 LOG.debug( 1201 "For {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted " 1202 + "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}", 1203 this, totalCompactedFilesSize, 1204 cacheConf.getCacheCompactedBlocksOnWriteThreshold()); 1205 } 1206 } 1207 } else { 1208 final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite(); 1209 if (shouldCacheDataOnWrite) { 1210 writerCacheConf.enableCacheOnWrite(); 1211 if (!cacheOnWriteLogged) { 1212 LOG.info("For {} , cacheDataOnWrite is true, hence enabled cacheOnWrite for " + 1213 "Index blocks and Bloom filter blocks", this); 1214 cacheOnWriteLogged = true; 1215 } 1216 } 1217 } 1218 InetSocketAddress[] favoredNodes = null; 1219 if (region.getRegionServerServices() != null) { 1220 favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion( 1221 region.getRegionInfo().getEncodedName()); 1222 } 1223 HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag, 1224 cryptoContext); 1225 Path familyTempDir = new Path(fs.getTempDir(), family.getNameAsString()); 1226 StoreFileWriter.Builder builder = new StoreFileWriter.Builder(conf, writerCacheConf, 1227 this.getFileSystem()) 1228 .withOutputDir(familyTempDir) 1229 .withBloomType(family.getBloomFilterType()) 1230 .withMaxKeyCount(maxKeyCount) 1231 .withFavoredNodes(favoredNodes) 1232 .withFileContext(hFileContext) 1233 .withShouldDropCacheBehind(shouldDropBehind) 1234 .withCompactedFilesSupplier(this::getCompactedFiles); 1235 return builder.build(); 1236 } 1237 1238 private HFileContext createFileContext(Compression.Algorithm compression, 1239 boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context cryptoContext) { 1240 if (compression == null) { 1241 compression = HFile.DEFAULT_COMPRESSION_ALGORITHM; 1242 } 1243 HFileContext hFileContext = new HFileContextBuilder() 1244 .withIncludesMvcc(includeMVCCReadpoint) 1245 .withIncludesTags(includesTag) 1246 .withCompression(compression) 1247 .withCompressTags(family.isCompressTags()) 1248 .withChecksumType(checksumType) 1249 .withBytesPerCheckSum(bytesPerChecksum) 1250 .withBlockSize(blocksize) 1251 .withHBaseCheckSum(true) 1252 .withDataBlockEncoding(family.getDataBlockEncoding()) 1253 .withEncryptionContext(cryptoContext) 1254 .withCreateTime(EnvironmentEdgeManager.currentTime()) 1255 .withColumnFamily(family.getName()) 1256 .withTableName(region.getTableDescriptor() 1257 .getTableName().getName()) 1258 .withCellComparator(this.comparator) 1259 .build(); 1260 return hFileContext; 1261 } 1262 1263 1264 private long getTotalSize(Collection<HStoreFile> sfs) { 1265 return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum(); 1266 } 1267 1268 /** 1269 * Change storeFiles adding into place the Reader produced by this new flush. 1270 * @param sfs Store files 1271 * @return Whether compaction is required. 1272 */ 1273 private boolean updateStorefiles(List<HStoreFile> sfs, long snapshotId) throws IOException { 1274 this.lock.writeLock().lock(); 1275 try { 1276 this.storeEngine.getStoreFileManager().insertNewFiles(sfs); 1277 if (snapshotId > 0) { 1278 this.memstore.clearSnapshot(snapshotId); 1279 } 1280 } finally { 1281 // We need the lock, as long as we are updating the storeFiles 1282 // or changing the memstore. Let us release it before calling 1283 // notifyChangeReadersObservers. See HBASE-4485 for a possible 1284 // deadlock scenario that could have happened if continue to hold 1285 // the lock. 1286 this.lock.writeLock().unlock(); 1287 } 1288 // notify to be called here - only in case of flushes 1289 notifyChangedReadersObservers(sfs); 1290 if (LOG.isTraceEnabled()) { 1291 long totalSize = getTotalSize(sfs); 1292 String traceMessage = "FLUSH time,count,size,store size,store files [" 1293 + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize 1294 + "," + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]"; 1295 LOG.trace(traceMessage); 1296 } 1297 return needsCompaction(); 1298 } 1299 1300 /** 1301 * Notify all observers that set of Readers has changed. 1302 */ 1303 private void notifyChangedReadersObservers(List<HStoreFile> sfs) throws IOException { 1304 for (ChangedReadersObserver o : this.changedReaderObservers) { 1305 List<KeyValueScanner> memStoreScanners; 1306 this.lock.readLock().lock(); 1307 try { 1308 memStoreScanners = this.memstore.getScanners(o.getReadPoint()); 1309 } finally { 1310 this.lock.readLock().unlock(); 1311 } 1312 o.updateReaders(sfs, memStoreScanners); 1313 } 1314 } 1315 1316 /** 1317 * Get all scanners with no filtering based on TTL (that happens further down the line). 1318 * @param cacheBlocks cache the blocks or not 1319 * @param usePread true to use pread, false if not 1320 * @param isCompaction true if the scanner is created for compaction 1321 * @param matcher the scan query matcher 1322 * @param startRow the start row 1323 * @param stopRow the stop row 1324 * @param readPt the read point of the current scan 1325 * @return all scanners for this store 1326 */ 1327 public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet, boolean usePread, 1328 boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt) 1329 throws IOException { 1330 return getScanners(cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow, false, 1331 readPt); 1332 } 1333 1334 /** 1335 * Get all scanners with no filtering based on TTL (that happens further down the line). 1336 * @param cacheBlocks cache the blocks or not 1337 * @param usePread true to use pread, false if not 1338 * @param isCompaction true if the scanner is created for compaction 1339 * @param matcher the scan query matcher 1340 * @param startRow the start row 1341 * @param includeStartRow true to include start row, false if not 1342 * @param stopRow the stop row 1343 * @param includeStopRow true to include stop row, false if not 1344 * @param readPt the read point of the current scan 1345 * @return all scanners for this store 1346 */ 1347 public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread, 1348 boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow, 1349 byte[] stopRow, boolean includeStopRow, long readPt) throws IOException { 1350 Collection<HStoreFile> storeFilesToScan; 1351 List<KeyValueScanner> memStoreScanners; 1352 this.lock.readLock().lock(); 1353 try { 1354 storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow, 1355 includeStartRow, stopRow, includeStopRow); 1356 memStoreScanners = this.memstore.getScanners(readPt); 1357 } finally { 1358 this.lock.readLock().unlock(); 1359 } 1360 1361 try { 1362 // First the store file scanners 1363 1364 // TODO this used to get the store files in descending order, 1365 // but now we get them in ascending order, which I think is 1366 // actually more correct, since memstore get put at the end. 1367 List<StoreFileScanner> sfScanners = StoreFileScanner 1368 .getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, false, 1369 matcher, readPt); 1370 List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1); 1371 scanners.addAll(sfScanners); 1372 // Then the memstore scanners 1373 scanners.addAll(memStoreScanners); 1374 return scanners; 1375 } catch (Throwable t) { 1376 clearAndClose(memStoreScanners); 1377 throw t instanceof IOException ? (IOException) t : new IOException(t); 1378 } 1379 } 1380 1381 private static void clearAndClose(List<KeyValueScanner> scanners) { 1382 if (scanners == null) { 1383 return; 1384 } 1385 for (KeyValueScanner s : scanners) { 1386 s.close(); 1387 } 1388 scanners.clear(); 1389 } 1390 1391 /** 1392 * Create scanners on the given files and if needed on the memstore with no filtering based on TTL 1393 * (that happens further down the line). 1394 * @param files the list of files on which the scanners has to be created 1395 * @param cacheBlocks cache the blocks or not 1396 * @param usePread true to use pread, false if not 1397 * @param isCompaction true if the scanner is created for compaction 1398 * @param matcher the scan query matcher 1399 * @param startRow the start row 1400 * @param stopRow the stop row 1401 * @param readPt the read point of the current scan 1402 * @param includeMemstoreScanner true if memstore has to be included 1403 * @return scanners on the given files and on the memstore if specified 1404 */ 1405 public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks, 1406 boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, 1407 byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) 1408 throws IOException { 1409 return getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow, 1410 false, readPt, includeMemstoreScanner); 1411 } 1412 1413 /** 1414 * Create scanners on the given files and if needed on the memstore with no filtering based on TTL 1415 * (that happens further down the line). 1416 * @param files the list of files on which the scanners has to be created 1417 * @param cacheBlocks ache the blocks or not 1418 * @param usePread true to use pread, false if not 1419 * @param isCompaction true if the scanner is created for compaction 1420 * @param matcher the scan query matcher 1421 * @param startRow the start row 1422 * @param includeStartRow true to include start row, false if not 1423 * @param stopRow the stop row 1424 * @param includeStopRow true to include stop row, false if not 1425 * @param readPt the read point of the current scan 1426 * @param includeMemstoreScanner true if memstore has to be included 1427 * @return scanners on the given files and on the memstore if specified 1428 */ 1429 public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks, 1430 boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, 1431 boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, 1432 boolean includeMemstoreScanner) throws IOException { 1433 List<KeyValueScanner> memStoreScanners = null; 1434 if (includeMemstoreScanner) { 1435 this.lock.readLock().lock(); 1436 try { 1437 memStoreScanners = this.memstore.getScanners(readPt); 1438 } finally { 1439 this.lock.readLock().unlock(); 1440 } 1441 } 1442 try { 1443 List<StoreFileScanner> sfScanners = StoreFileScanner 1444 .getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, false, matcher, 1445 readPt); 1446 List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1); 1447 scanners.addAll(sfScanners); 1448 // Then the memstore scanners 1449 if (memStoreScanners != null) { 1450 scanners.addAll(memStoreScanners); 1451 } 1452 return scanners; 1453 } catch (Throwable t) { 1454 clearAndClose(memStoreScanners); 1455 throw t instanceof IOException ? (IOException) t : new IOException(t); 1456 } 1457 } 1458 1459 /** 1460 * @param o Observer who wants to know about changes in set of Readers 1461 */ 1462 public void addChangedReaderObserver(ChangedReadersObserver o) { 1463 this.changedReaderObservers.add(o); 1464 } 1465 1466 /** 1467 * @param o Observer no longer interested in changes in set of Readers. 1468 */ 1469 public void deleteChangedReaderObserver(ChangedReadersObserver o) { 1470 // We don't check if observer present; it may not be (legitimately) 1471 this.changedReaderObservers.remove(o); 1472 } 1473 1474 ////////////////////////////////////////////////////////////////////////////// 1475 // Compaction 1476 ////////////////////////////////////////////////////////////////////////////// 1477 1478 /** 1479 * Compact the StoreFiles. This method may take some time, so the calling 1480 * thread must be able to block for long periods. 1481 * 1482 * <p>During this time, the Store can work as usual, getting values from 1483 * StoreFiles and writing new StoreFiles from the memstore. 1484 * 1485 * Existing StoreFiles are not destroyed until the new compacted StoreFile is 1486 * completely written-out to disk. 1487 * 1488 * <p>The compactLock prevents multiple simultaneous compactions. 1489 * The structureLock prevents us from interfering with other write operations. 1490 * 1491 * <p>We don't want to hold the structureLock for the whole time, as a compact() 1492 * can be lengthy and we want to allow cache-flushes during this period. 1493 * 1494 * <p> Compaction event should be idempotent, since there is no IO Fencing for 1495 * the region directory in hdfs. A region server might still try to complete the 1496 * compaction after it lost the region. That is why the following events are carefully 1497 * ordered for a compaction: 1498 * 1. Compaction writes new files under region/.tmp directory (compaction output) 1499 * 2. Compaction atomically moves the temporary file under region directory 1500 * 3. Compaction appends a WAL edit containing the compaction input and output files. 1501 * Forces sync on WAL. 1502 * 4. Compaction deletes the input files from the region directory. 1503 * 1504 * Failure conditions are handled like this: 1505 * - If RS fails before 2, compaction wont complete. Even if RS lives on and finishes 1506 * the compaction later, it will only write the new data file to the region directory. 1507 * Since we already have this data, this will be idempotent but we will have a redundant 1508 * copy of the data. 1509 * - If RS fails between 2 and 3, the region will have a redundant copy of the data. The 1510 * RS that failed won't be able to finish sync() for WAL because of lease recovery in WAL. 1511 * - If RS fails after 3, the region region server who opens the region will pick up the 1512 * the compaction marker from the WAL and replay it by removing the compaction input files. 1513 * Failed RS can also attempt to delete those files, but the operation will be idempotent 1514 * 1515 * See HBASE-2231 for details. 1516 * 1517 * @param compaction compaction details obtained from requestCompaction() 1518 * @return Storefile we compacted into or null if we failed or opted out early. 1519 */ 1520 public List<HStoreFile> compact(CompactionContext compaction, 1521 ThroughputController throughputController, User user) throws IOException { 1522 assert compaction != null; 1523 CompactionRequestImpl cr = compaction.getRequest(); 1524 try { 1525 // Do all sanity checking in here if we have a valid CompactionRequestImpl 1526 // because we need to clean up after it on the way out in a finally 1527 // block below 1528 long compactionStartTime = EnvironmentEdgeManager.currentTime(); 1529 assert compaction.hasSelection(); 1530 Collection<HStoreFile> filesToCompact = cr.getFiles(); 1531 assert !filesToCompact.isEmpty(); 1532 synchronized (filesCompacting) { 1533 // sanity check: we're compacting files that this store knows about 1534 // TODO: change this to LOG.error() after more debugging 1535 Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact)); 1536 } 1537 1538 // Ready to go. Have list of files to compact. 1539 LOG.info("Starting compaction of " + filesToCompact + 1540 " into tmpdir=" + fs.getTempDir() + ", totalSize=" + 1541 TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1)); 1542 1543 return doCompaction(cr, filesToCompact, user, compactionStartTime, 1544 compaction.compact(throughputController, user)); 1545 } finally { 1546 finishCompactionRequest(cr); 1547 } 1548 } 1549 1550 @VisibleForTesting 1551 protected List<HStoreFile> doCompaction(CompactionRequestImpl cr, 1552 Collection<HStoreFile> filesToCompact, User user, long compactionStartTime, 1553 List<Path> newFiles) throws IOException { 1554 // Do the steps necessary to complete the compaction. 1555 List<HStoreFile> sfs = moveCompactedFilesIntoPlace(cr, newFiles, user); 1556 writeCompactionWalRecord(filesToCompact, sfs); 1557 replaceStoreFiles(filesToCompact, sfs); 1558 if (cr.isMajor()) { 1559 majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs()); 1560 majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize); 1561 } else { 1562 compactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs()); 1563 compactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize); 1564 } 1565 long outputBytes = getTotalSize(sfs); 1566 1567 // At this point the store will use new files for all new scanners. 1568 completeCompaction(filesToCompact); // update store size. 1569 1570 long now = EnvironmentEdgeManager.currentTime(); 1571 if (region.getRegionServerServices() != null 1572 && region.getRegionServerServices().getMetrics() != null) { 1573 region.getRegionServerServices().getMetrics().updateCompaction( 1574 region.getTableDescriptor().getTableName().getNameAsString(), 1575 cr.isMajor(), now - compactionStartTime, cr.getFiles().size(), 1576 newFiles.size(), cr.getSize(), outputBytes); 1577 1578 } 1579 1580 logCompactionEndMessage(cr, sfs, now, compactionStartTime); 1581 return sfs; 1582 } 1583 1584 private List<HStoreFile> moveCompactedFilesIntoPlace(CompactionRequestImpl cr, 1585 List<Path> newFiles, User user) throws IOException { 1586 List<HStoreFile> sfs = new ArrayList<>(newFiles.size()); 1587 for (Path newFile : newFiles) { 1588 assert newFile != null; 1589 HStoreFile sf = moveFileIntoPlace(newFile); 1590 if (this.getCoprocessorHost() != null) { 1591 getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user); 1592 } 1593 assert sf != null; 1594 sfs.add(sf); 1595 } 1596 return sfs; 1597 } 1598 1599 // Package-visible for tests 1600 HStoreFile moveFileIntoPlace(Path newFile) throws IOException { 1601 validateStoreFile(newFile); 1602 // Move the file into the right spot 1603 Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile); 1604 return createStoreFileAndReader(destPath); 1605 } 1606 1607 /** 1608 * Writes the compaction WAL record. 1609 * @param filesCompacted Files compacted (input). 1610 * @param newFiles Files from compaction. 1611 */ 1612 private void writeCompactionWalRecord(Collection<HStoreFile> filesCompacted, 1613 Collection<HStoreFile> newFiles) throws IOException { 1614 if (region.getWAL() == null) { 1615 return; 1616 } 1617 List<Path> inputPaths = 1618 filesCompacted.stream().map(HStoreFile::getPath).collect(Collectors.toList()); 1619 List<Path> outputPaths = 1620 newFiles.stream().map(HStoreFile::getPath).collect(Collectors.toList()); 1621 RegionInfo info = this.region.getRegionInfo(); 1622 CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info, 1623 family.getName(), inputPaths, outputPaths, 1624 fs.getStoreDir(getColumnFamilyDescriptor().getNameAsString())); 1625 // Fix reaching into Region to get the maxWaitForSeqId. 1626 // Does this method belong in Region altogether given it is making so many references up there? 1627 // Could be Region#writeCompactionMarker(compactionDescriptor); 1628 WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(), 1629 this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC()); 1630 } 1631 1632 @VisibleForTesting 1633 void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result) 1634 throws IOException { 1635 this.lock.writeLock().lock(); 1636 try { 1637 this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result); 1638 synchronized (filesCompacting) { 1639 filesCompacting.removeAll(compactedFiles); 1640 } 1641 1642 // These may be null when the RS is shutting down. The space quota Chores will fix the Region 1643 // sizes later so it's not super-critical if we miss these. 1644 RegionServerServices rsServices = region.getRegionServerServices(); 1645 if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) { 1646 updateSpaceQuotaAfterFileReplacement( 1647 rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(), 1648 compactedFiles, result); 1649 } 1650 } finally { 1651 this.lock.writeLock().unlock(); 1652 } 1653 } 1654 1655 /** 1656 * Updates the space quota usage for this region, removing the size for files compacted away 1657 * and adding in the size for new files. 1658 * 1659 * @param sizeStore The object tracking changes in region size for space quotas. 1660 * @param regionInfo The identifier for the region whose size is being updated. 1661 * @param oldFiles Files removed from this store's region. 1662 * @param newFiles Files added to this store's region. 1663 */ 1664 void updateSpaceQuotaAfterFileReplacement( 1665 RegionSizeStore sizeStore, RegionInfo regionInfo, Collection<HStoreFile> oldFiles, 1666 Collection<HStoreFile> newFiles) { 1667 long delta = 0; 1668 if (oldFiles != null) { 1669 for (HStoreFile compactedFile : oldFiles) { 1670 if (compactedFile.isHFile()) { 1671 delta -= compactedFile.getReader().length(); 1672 } 1673 } 1674 } 1675 if (newFiles != null) { 1676 for (HStoreFile newFile : newFiles) { 1677 if (newFile.isHFile()) { 1678 delta += newFile.getReader().length(); 1679 } 1680 } 1681 } 1682 sizeStore.incrementRegionSize(regionInfo, delta); 1683 } 1684 1685 /** 1686 * Log a very elaborate compaction completion message. 1687 * @param cr Request. 1688 * @param sfs Resulting files. 1689 * @param compactionStartTime Start time. 1690 */ 1691 private void logCompactionEndMessage( 1692 CompactionRequestImpl cr, List<HStoreFile> sfs, long now, long compactionStartTime) { 1693 StringBuilder message = new StringBuilder( 1694 "Completed" + (cr.isMajor() ? " major" : "") + " compaction of " 1695 + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "") + " file(s) in " 1696 + this + " of " + this.getRegionInfo().getShortNameToLog() + " into "); 1697 if (sfs.isEmpty()) { 1698 message.append("none, "); 1699 } else { 1700 for (HStoreFile sf: sfs) { 1701 message.append(sf.getPath().getName()); 1702 message.append("(size="); 1703 message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1)); 1704 message.append("), "); 1705 } 1706 } 1707 message.append("total size for store is ") 1708 .append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1)) 1709 .append(". This selection was in queue for ") 1710 .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime())) 1711 .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime)) 1712 .append(" to execute."); 1713 LOG.info(message.toString()); 1714 if (LOG.isTraceEnabled()) { 1715 int fileCount = storeEngine.getStoreFileManager().getStorefileCount(); 1716 long resultSize = getTotalSize(sfs); 1717 String traceMessage = "COMPACTION start,end,size out,files in,files out,store size," 1718 + "store files [" + compactionStartTime + "," + now + "," + resultSize + "," 1719 + cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]"; 1720 LOG.trace(traceMessage); 1721 } 1722 } 1723 1724 /** 1725 * Call to complete a compaction. Its for the case where we find in the WAL a compaction 1726 * that was not finished. We could find one recovering a WAL after a regionserver crash. 1727 * See HBASE-2231. 1728 */ 1729 public void replayCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles, 1730 boolean removeFiles) throws IOException { 1731 LOG.debug("Completing compaction from the WAL marker"); 1732 List<String> compactionInputs = compaction.getCompactionInputList(); 1733 List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList()); 1734 1735 // The Compaction Marker is written after the compaction is completed, 1736 // and the files moved into the region/family folder. 1737 // 1738 // If we crash after the entry is written, we may not have removed the 1739 // input files, but the output file is present. 1740 // (The unremoved input files will be removed by this function) 1741 // 1742 // If we scan the directory and the file is not present, it can mean that: 1743 // - The file was manually removed by the user 1744 // - The file was removed as consequence of subsequent compaction 1745 // so, we can't do anything with the "compaction output list" because those 1746 // files have already been loaded when opening the region (by virtue of 1747 // being in the store's folder) or they may be missing due to a compaction. 1748 1749 String familyName = this.getColumnFamilyName(); 1750 Set<String> inputFiles = new HashSet<>(); 1751 for (String compactionInput : compactionInputs) { 1752 Path inputPath = fs.getStoreFilePath(familyName, compactionInput); 1753 inputFiles.add(inputPath.getName()); 1754 } 1755 1756 //some of the input files might already be deleted 1757 List<HStoreFile> inputStoreFiles = new ArrayList<>(compactionInputs.size()); 1758 for (HStoreFile sf : this.getStorefiles()) { 1759 if (inputFiles.contains(sf.getPath().getName())) { 1760 inputStoreFiles.add(sf); 1761 } 1762 } 1763 1764 // check whether we need to pick up the new files 1765 List<HStoreFile> outputStoreFiles = new ArrayList<>(compactionOutputs.size()); 1766 1767 if (pickCompactionFiles) { 1768 for (HStoreFile sf : this.getStorefiles()) { 1769 compactionOutputs.remove(sf.getPath().getName()); 1770 } 1771 for (String compactionOutput : compactionOutputs) { 1772 StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), compactionOutput); 1773 HStoreFile storeFile = createStoreFileAndReader(storeFileInfo); 1774 outputStoreFiles.add(storeFile); 1775 } 1776 } 1777 1778 if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) { 1779 LOG.info("Replaying compaction marker, replacing input files: " + 1780 inputStoreFiles + " with output files : " + outputStoreFiles); 1781 this.replaceStoreFiles(inputStoreFiles, outputStoreFiles); 1782 this.completeCompaction(inputStoreFiles); 1783 } 1784 } 1785 1786 /** 1787 * This method tries to compact N recent files for testing. 1788 * Note that because compacting "recent" files only makes sense for some policies, 1789 * e.g. the default one, it assumes default policy is used. It doesn't use policy, 1790 * but instead makes a compaction candidate list by itself. 1791 * @param N Number of files. 1792 */ 1793 @VisibleForTesting 1794 public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException { 1795 List<HStoreFile> filesToCompact; 1796 boolean isMajor; 1797 1798 this.lock.readLock().lock(); 1799 try { 1800 synchronized (filesCompacting) { 1801 filesToCompact = Lists.newArrayList(storeEngine.getStoreFileManager().getStorefiles()); 1802 if (!filesCompacting.isEmpty()) { 1803 // exclude all files older than the newest file we're currently 1804 // compacting. this allows us to preserve contiguity (HBASE-2856) 1805 HStoreFile last = filesCompacting.get(filesCompacting.size() - 1); 1806 int idx = filesToCompact.indexOf(last); 1807 Preconditions.checkArgument(idx != -1); 1808 filesToCompact.subList(0, idx + 1).clear(); 1809 } 1810 int count = filesToCompact.size(); 1811 if (N > count) { 1812 throw new RuntimeException("Not enough files"); 1813 } 1814 1815 filesToCompact = filesToCompact.subList(count - N, count); 1816 isMajor = (filesToCompact.size() == storeEngine.getStoreFileManager().getStorefileCount()); 1817 filesCompacting.addAll(filesToCompact); 1818 Collections.sort(filesCompacting, storeEngine.getStoreFileManager() 1819 .getStoreFileComparator()); 1820 } 1821 } finally { 1822 this.lock.readLock().unlock(); 1823 } 1824 1825 try { 1826 // Ready to go. Have list of files to compact. 1827 List<Path> newFiles = ((DefaultCompactor)this.storeEngine.getCompactor()) 1828 .compactForTesting(filesToCompact, isMajor); 1829 for (Path newFile: newFiles) { 1830 // Move the compaction into place. 1831 HStoreFile sf = moveFileIntoPlace(newFile); 1832 if (this.getCoprocessorHost() != null) { 1833 this.getCoprocessorHost().postCompact(this, sf, null, null, null); 1834 } 1835 replaceStoreFiles(filesToCompact, Collections.singletonList(sf)); 1836 completeCompaction(filesToCompact); 1837 } 1838 } finally { 1839 synchronized (filesCompacting) { 1840 filesCompacting.removeAll(filesToCompact); 1841 } 1842 } 1843 } 1844 1845 @Override 1846 public boolean hasReferences() { 1847 // Grab the read lock here, because we need to ensure that: only when the atomic 1848 // replaceStoreFiles(..) finished, we can get all the complete store file list. 1849 this.lock.readLock().lock(); 1850 try { 1851 // Merge the current store files with compacted files here due to HBASE-20940. 1852 Collection<HStoreFile> allStoreFiles = new ArrayList<>(getStorefiles()); 1853 allStoreFiles.addAll(getCompactedFiles()); 1854 return StoreUtils.hasReferences(allStoreFiles); 1855 } finally { 1856 this.lock.readLock().unlock(); 1857 } 1858 } 1859 1860 /** 1861 * getter for CompactionProgress object 1862 * @return CompactionProgress object; can be null 1863 */ 1864 public CompactionProgress getCompactionProgress() { 1865 return this.storeEngine.getCompactor().getProgress(); 1866 } 1867 1868 @Override 1869 public boolean shouldPerformMajorCompaction() throws IOException { 1870 for (HStoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) { 1871 // TODO: what are these reader checks all over the place? 1872 if (sf.getReader() == null) { 1873 LOG.debug("StoreFile {} has null Reader", sf); 1874 return false; 1875 } 1876 } 1877 return storeEngine.getCompactionPolicy().shouldPerformMajorCompaction( 1878 this.storeEngine.getStoreFileManager().getStorefiles()); 1879 } 1880 1881 public Optional<CompactionContext> requestCompaction() throws IOException { 1882 return requestCompaction(NO_PRIORITY, CompactionLifeCycleTracker.DUMMY, null); 1883 } 1884 1885 public Optional<CompactionContext> requestCompaction(int priority, 1886 CompactionLifeCycleTracker tracker, User user) throws IOException { 1887 // don't even select for compaction if writes are disabled 1888 if (!this.areWritesEnabled()) { 1889 return Optional.empty(); 1890 } 1891 // Before we do compaction, try to get rid of unneeded files to simplify things. 1892 removeUnneededFiles(); 1893 1894 final CompactionContext compaction = storeEngine.createCompaction(); 1895 CompactionRequestImpl request = null; 1896 this.lock.readLock().lock(); 1897 try { 1898 synchronized (filesCompacting) { 1899 // First, see if coprocessor would want to override selection. 1900 if (this.getCoprocessorHost() != null) { 1901 final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting); 1902 boolean override = getCoprocessorHost().preCompactSelection(this, 1903 candidatesForCoproc, tracker, user); 1904 if (override) { 1905 // Coprocessor is overriding normal file selection. 1906 compaction.forceSelect(new CompactionRequestImpl(candidatesForCoproc)); 1907 } 1908 } 1909 1910 // Normal case - coprocessor is not overriding file selection. 1911 if (!compaction.hasSelection()) { 1912 boolean isUserCompaction = priority == Store.PRIORITY_USER; 1913 boolean mayUseOffPeak = offPeakHours.isOffPeakHour() && 1914 offPeakCompactionTracker.compareAndSet(false, true); 1915 try { 1916 compaction.select(this.filesCompacting, isUserCompaction, 1917 mayUseOffPeak, forceMajor && filesCompacting.isEmpty()); 1918 } catch (IOException e) { 1919 if (mayUseOffPeak) { 1920 offPeakCompactionTracker.set(false); 1921 } 1922 throw e; 1923 } 1924 assert compaction.hasSelection(); 1925 if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) { 1926 // Compaction policy doesn't want to take advantage of off-peak. 1927 offPeakCompactionTracker.set(false); 1928 } 1929 } 1930 if (this.getCoprocessorHost() != null) { 1931 this.getCoprocessorHost().postCompactSelection( 1932 this, ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker, 1933 compaction.getRequest(), user); 1934 } 1935 // Finally, we have the resulting files list. Check if we have any files at all. 1936 request = compaction.getRequest(); 1937 Collection<HStoreFile> selectedFiles = request.getFiles(); 1938 if (selectedFiles.isEmpty()) { 1939 return Optional.empty(); 1940 } 1941 1942 addToCompactingFiles(selectedFiles); 1943 1944 // If we're enqueuing a major, clear the force flag. 1945 this.forceMajor = this.forceMajor && !request.isMajor(); 1946 1947 // Set common request properties. 1948 // Set priority, either override value supplied by caller or from store. 1949 final int compactionPriority = 1950 (priority != Store.NO_PRIORITY) ? priority : getCompactPriority(); 1951 request.setPriority(compactionPriority); 1952 1953 if (request.isAfterSplit()) { 1954 // If the store belongs to recently splitted daughter regions, better we consider 1955 // them with the higher priority in the compaction queue. 1956 // Override priority if it is lower (higher int value) than 1957 // SPLIT_REGION_COMPACTION_PRIORITY 1958 final int splitHousekeepingPriority = 1959 Math.min(compactionPriority, SPLIT_REGION_COMPACTION_PRIORITY); 1960 request.setPriority(splitHousekeepingPriority); 1961 LOG.info("Keeping/Overriding Compaction request priority to {} for CF {} since it" 1962 + " belongs to recently split daughter region {}", splitHousekeepingPriority, 1963 this.getColumnFamilyName(), getRegionInfo().getRegionNameAsString()); 1964 } 1965 request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName()); 1966 request.setTracker(tracker); 1967 } 1968 } finally { 1969 this.lock.readLock().unlock(); 1970 } 1971 1972 if (LOG.isDebugEnabled()) { 1973 LOG.debug(this + " is initiating " + (request.isMajor() ? "major" : "minor") + " compaction" 1974 + (request.isAllFiles() ? " (all files)" : "")); 1975 } 1976 this.region.reportCompactionRequestStart(request.isMajor()); 1977 return Optional.of(compaction); 1978 } 1979 1980 /** Adds the files to compacting files. filesCompacting must be locked. */ 1981 private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) { 1982 if (CollectionUtils.isEmpty(filesToAdd)) { 1983 return; 1984 } 1985 // Check that we do not try to compact the same StoreFile twice. 1986 if (!Collections.disjoint(filesCompacting, filesToAdd)) { 1987 Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting); 1988 } 1989 filesCompacting.addAll(filesToAdd); 1990 Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator()); 1991 } 1992 1993 private void removeUnneededFiles() throws IOException { 1994 if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) { 1995 return; 1996 } 1997 if (getColumnFamilyDescriptor().getMinVersions() > 0) { 1998 LOG.debug("Skipping expired store file removal due to min version of {} being {}", 1999 this, getColumnFamilyDescriptor().getMinVersions()); 2000 return; 2001 } 2002 this.lock.readLock().lock(); 2003 Collection<HStoreFile> delSfs = null; 2004 try { 2005 synchronized (filesCompacting) { 2006 long cfTtl = getStoreFileTtl(); 2007 if (cfTtl != Long.MAX_VALUE) { 2008 delSfs = storeEngine.getStoreFileManager().getUnneededFiles( 2009 EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting); 2010 addToCompactingFiles(delSfs); 2011 } 2012 } 2013 } finally { 2014 this.lock.readLock().unlock(); 2015 } 2016 2017 if (CollectionUtils.isEmpty(delSfs)) { 2018 return; 2019 } 2020 2021 Collection<HStoreFile> newFiles = Collections.emptyList(); // No new files. 2022 writeCompactionWalRecord(delSfs, newFiles); 2023 replaceStoreFiles(delSfs, newFiles); 2024 completeCompaction(delSfs); 2025 LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in " 2026 + this + "; total size is " 2027 + TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1)); 2028 } 2029 2030 public void cancelRequestedCompaction(CompactionContext compaction) { 2031 finishCompactionRequest(compaction.getRequest()); 2032 } 2033 2034 private void finishCompactionRequest(CompactionRequestImpl cr) { 2035 this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize()); 2036 if (cr.isOffPeak()) { 2037 offPeakCompactionTracker.set(false); 2038 cr.setOffPeak(false); 2039 } 2040 synchronized (filesCompacting) { 2041 filesCompacting.removeAll(cr.getFiles()); 2042 } 2043 } 2044 2045 /** 2046 * Validates a store file by opening and closing it. In HFileV2 this should not be an expensive 2047 * operation. 2048 * @param path the path to the store file 2049 */ 2050 private void validateStoreFile(Path path) throws IOException { 2051 HStoreFile storeFile = null; 2052 try { 2053 storeFile = createStoreFileAndReader(path); 2054 } catch (IOException e) { 2055 LOG.error("Failed to open store file : {}, keeping it in tmp location", path, e); 2056 throw e; 2057 } finally { 2058 if (storeFile != null) { 2059 storeFile.closeStoreFile(false); 2060 } 2061 } 2062 } 2063 2064 /** 2065 * Update counts. 2066 * @param compactedFiles list of files that were compacted 2067 */ 2068 @VisibleForTesting 2069 protected void completeCompaction(Collection<HStoreFile> compactedFiles) 2070 // Rename this method! TODO. 2071 throws IOException { 2072 this.storeSize.set(0L); 2073 this.totalUncompressedBytes.set(0L); 2074 for (HStoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) { 2075 StoreFileReader r = hsf.getReader(); 2076 if (r == null) { 2077 LOG.warn("StoreFile {} has a null Reader", hsf); 2078 continue; 2079 } 2080 this.storeSize.addAndGet(r.length()); 2081 this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); 2082 } 2083 } 2084 2085 /* 2086 * @param wantedVersions How many versions were asked for. 2087 * @return wantedVersions or this families' {@link HConstants#VERSIONS}. 2088 */ 2089 int versionsToReturn(final int wantedVersions) { 2090 if (wantedVersions <= 0) { 2091 throw new IllegalArgumentException("Number of versions must be > 0"); 2092 } 2093 // Make sure we do not return more than maximum versions for this store. 2094 int maxVersions = this.family.getMaxVersions(); 2095 return wantedVersions > maxVersions ? maxVersions: wantedVersions; 2096 } 2097 2098 @Override 2099 public boolean canSplit() { 2100 // Not split-able if we find a reference store file present in the store. 2101 boolean result = !hasReferences(); 2102 if (!result) { 2103 LOG.trace("Not splittable; has references: {}", this); 2104 } 2105 return result; 2106 } 2107 2108 /** 2109 * Determines if Store should be split. 2110 */ 2111 public Optional<byte[]> getSplitPoint() { 2112 this.lock.readLock().lock(); 2113 try { 2114 // Should already be enforced by the split policy! 2115 assert !this.getRegionInfo().isMetaRegion(); 2116 // Not split-able if we find a reference store file present in the store. 2117 if (hasReferences()) { 2118 LOG.trace("Not splittable; has references: {}", this); 2119 return Optional.empty(); 2120 } 2121 return this.storeEngine.getStoreFileManager().getSplitPoint(); 2122 } catch(IOException e) { 2123 LOG.warn("Failed getting store size for {}", this, e); 2124 } finally { 2125 this.lock.readLock().unlock(); 2126 } 2127 return Optional.empty(); 2128 } 2129 2130 @Override 2131 public long getLastCompactSize() { 2132 return this.lastCompactSize; 2133 } 2134 2135 @Override 2136 public long getSize() { 2137 return storeSize.get(); 2138 } 2139 2140 public void triggerMajorCompaction() { 2141 this.forceMajor = true; 2142 } 2143 2144 ////////////////////////////////////////////////////////////////////////////// 2145 // File administration 2146 ////////////////////////////////////////////////////////////////////////////// 2147 2148 /** 2149 * Return a scanner for both the memstore and the HStore files. Assumes we are not in a 2150 * compaction. 2151 * @param scan Scan to apply when scanning the stores 2152 * @param targetCols columns to scan 2153 * @return a scanner over the current key values 2154 * @throws IOException on failure 2155 */ 2156 public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols, long readPt) 2157 throws IOException { 2158 lock.readLock().lock(); 2159 try { 2160 ScanInfo scanInfo; 2161 if (this.getCoprocessorHost() != null) { 2162 scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this, scan); 2163 } else { 2164 scanInfo = getScanInfo(); 2165 } 2166 return createScanner(scan, scanInfo, targetCols, readPt); 2167 } finally { 2168 lock.readLock().unlock(); 2169 } 2170 } 2171 2172 // HMobStore will override this method to return its own implementation. 2173 protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo, 2174 NavigableSet<byte[]> targetCols, long readPt) throws IOException { 2175 return scan.isReversed() ? new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt) 2176 : new StoreScanner(this, scanInfo, scan, targetCols, readPt); 2177 } 2178 2179 /** 2180 * Recreates the scanners on the current list of active store file scanners 2181 * @param currentFileScanners the current set of active store file scanners 2182 * @param cacheBlocks cache the blocks or not 2183 * @param usePread use pread or not 2184 * @param isCompaction is the scanner for compaction 2185 * @param matcher the scan query matcher 2186 * @param startRow the scan's start row 2187 * @param includeStartRow should the scan include the start row 2188 * @param stopRow the scan's stop row 2189 * @param includeStopRow should the scan include the stop row 2190 * @param readPt the read point of the current scane 2191 * @param includeMemstoreScanner whether the current scanner should include memstorescanner 2192 * @return list of scanners recreated on the current Scanners 2193 */ 2194 public List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners, 2195 boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, 2196 byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, 2197 boolean includeMemstoreScanner) throws IOException { 2198 this.lock.readLock().lock(); 2199 try { 2200 Map<String, HStoreFile> name2File = 2201 new HashMap<>(getStorefilesCount() + getCompactedFilesCount()); 2202 for (HStoreFile file : getStorefiles()) { 2203 name2File.put(file.getFileInfo().getActiveFileName(), file); 2204 } 2205 Collection<HStoreFile> compactedFiles = getCompactedFiles(); 2206 for (HStoreFile file : IterableUtils.emptyIfNull(compactedFiles)) { 2207 name2File.put(file.getFileInfo().getActiveFileName(), file); 2208 } 2209 List<HStoreFile> filesToReopen = new ArrayList<>(); 2210 for (KeyValueScanner kvs : currentFileScanners) { 2211 assert kvs.isFileScanner(); 2212 if (kvs.peek() == null) { 2213 continue; 2214 } 2215 filesToReopen.add(name2File.get(kvs.getFilePath().getName())); 2216 } 2217 if (filesToReopen.isEmpty()) { 2218 return null; 2219 } 2220 return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow, 2221 includeStartRow, stopRow, includeStopRow, readPt, false); 2222 } finally { 2223 this.lock.readLock().unlock(); 2224 } 2225 } 2226 2227 @Override 2228 public String toString() { 2229 return this.getRegionInfo().getShortNameToLog()+ "/" + this.getColumnFamilyName(); 2230 } 2231 2232 @Override 2233 public int getStorefilesCount() { 2234 return this.storeEngine.getStoreFileManager().getStorefileCount(); 2235 } 2236 2237 @Override 2238 public int getCompactedFilesCount() { 2239 return this.storeEngine.getStoreFileManager().getCompactedFilesCount(); 2240 } 2241 2242 private LongStream getStoreFileAgeStream() { 2243 return this.storeEngine.getStoreFileManager().getStorefiles().stream().filter(sf -> { 2244 if (sf.getReader() == null) { 2245 LOG.warn("StoreFile {} has a null Reader", sf); 2246 return false; 2247 } else { 2248 return true; 2249 } 2250 }).filter(HStoreFile::isHFile).mapToLong(sf -> sf.getFileInfo().getCreatedTimestamp()) 2251 .map(t -> EnvironmentEdgeManager.currentTime() - t); 2252 } 2253 2254 @Override 2255 public OptionalLong getMaxStoreFileAge() { 2256 return getStoreFileAgeStream().max(); 2257 } 2258 2259 @Override 2260 public OptionalLong getMinStoreFileAge() { 2261 return getStoreFileAgeStream().min(); 2262 } 2263 2264 @Override 2265 public OptionalDouble getAvgStoreFileAge() { 2266 return getStoreFileAgeStream().average(); 2267 } 2268 2269 @Override 2270 public long getNumReferenceFiles() { 2271 return this.storeEngine.getStoreFileManager().getStorefiles().stream() 2272 .filter(HStoreFile::isReference).count(); 2273 } 2274 2275 @Override 2276 public long getNumHFiles() { 2277 return this.storeEngine.getStoreFileManager().getStorefiles().stream() 2278 .filter(HStoreFile::isHFile).count(); 2279 } 2280 2281 @Override 2282 public long getStoreSizeUncompressed() { 2283 return this.totalUncompressedBytes.get(); 2284 } 2285 2286 @Override 2287 public long getStorefilesSize() { 2288 // Include all StoreFiles 2289 return getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(), sf -> true); 2290 } 2291 2292 @Override 2293 public long getHFilesSize() { 2294 // Include only StoreFiles which are HFiles 2295 return getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(), 2296 HStoreFile::isHFile); 2297 } 2298 2299 private long getTotalUncompressedBytes(List<HStoreFile> files) { 2300 return files.stream() 2301 .mapToLong(file -> getStorefileFieldSize(file, StoreFileReader::getTotalUncompressedBytes)) 2302 .sum(); 2303 } 2304 2305 private long getStorefilesSize(Collection<HStoreFile> files, Predicate<HStoreFile> predicate) { 2306 return files.stream().filter(predicate) 2307 .mapToLong(file -> getStorefileFieldSize(file, StoreFileReader::length)).sum(); 2308 } 2309 2310 private long getStorefileFieldSize(HStoreFile file, ToLongFunction<StoreFileReader> f) { 2311 if (file == null) { 2312 return 0L; 2313 } 2314 StoreFileReader reader = file.getReader(); 2315 if (reader == null) { 2316 return 0L; 2317 } 2318 return f.applyAsLong(reader); 2319 } 2320 2321 private long getStorefilesFieldSize(ToLongFunction<StoreFileReader> f) { 2322 return this.storeEngine.getStoreFileManager().getStorefiles().stream() 2323 .mapToLong(file -> getStorefileFieldSize(file, f)).sum(); 2324 } 2325 2326 @Override 2327 public long getStorefilesRootLevelIndexSize() { 2328 return getStorefilesFieldSize(StoreFileReader::indexSize); 2329 } 2330 2331 @Override 2332 public long getTotalStaticIndexSize() { 2333 return getStorefilesFieldSize(StoreFileReader::getUncompressedDataIndexSize); 2334 } 2335 2336 @Override 2337 public long getTotalStaticBloomSize() { 2338 return getStorefilesFieldSize(StoreFileReader::getTotalBloomSize); 2339 } 2340 2341 @Override 2342 public MemStoreSize getMemStoreSize() { 2343 return this.memstore.size(); 2344 } 2345 2346 @Override 2347 public int getCompactPriority() { 2348 int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority(); 2349 if (priority == PRIORITY_USER) { 2350 LOG.warn("Compaction priority is USER despite there being no user compaction"); 2351 } 2352 return priority; 2353 } 2354 2355 public boolean throttleCompaction(long compactionSize) { 2356 return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize); 2357 } 2358 2359 public HRegion getHRegion() { 2360 return this.region; 2361 } 2362 2363 public RegionCoprocessorHost getCoprocessorHost() { 2364 return this.region.getCoprocessorHost(); 2365 } 2366 2367 @Override 2368 public RegionInfo getRegionInfo() { 2369 return this.fs.getRegionInfo(); 2370 } 2371 2372 @Override 2373 public boolean areWritesEnabled() { 2374 return this.region.areWritesEnabled(); 2375 } 2376 2377 @Override 2378 public long getSmallestReadPoint() { 2379 return this.region.getSmallestReadPoint(); 2380 } 2381 2382 /** 2383 * Adds or replaces the specified KeyValues. 2384 * <p> 2385 * For each KeyValue specified, if a cell with the same row, family, and qualifier exists in 2386 * MemStore, it will be replaced. Otherwise, it will just be inserted to MemStore. 2387 * <p> 2388 * This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic 2389 * across all of them. 2390 * @param readpoint readpoint below which we can safely remove duplicate KVs 2391 */ 2392 public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing) 2393 throws IOException { 2394 this.lock.readLock().lock(); 2395 try { 2396 this.memstore.upsert(cells, readpoint, memstoreSizing); 2397 } finally { 2398 this.lock.readLock().unlock(); 2399 } 2400 } 2401 2402 public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTracker tracker) { 2403 return new StoreFlusherImpl(cacheFlushId, tracker); 2404 } 2405 2406 private final class StoreFlusherImpl implements StoreFlushContext { 2407 2408 private final FlushLifeCycleTracker tracker; 2409 private final long cacheFlushSeqNum; 2410 private MemStoreSnapshot snapshot; 2411 private List<Path> tempFiles; 2412 private List<Path> committedFiles; 2413 private long cacheFlushCount; 2414 private long cacheFlushSize; 2415 private long outputFileSize; 2416 2417 private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) { 2418 this.cacheFlushSeqNum = cacheFlushSeqNum; 2419 this.tracker = tracker; 2420 } 2421 2422 /** 2423 * This is not thread safe. The caller should have a lock on the region or the store. 2424 * If necessary, the lock can be added with the patch provided in HBASE-10087 2425 */ 2426 @Override 2427 public MemStoreSize prepare() { 2428 // passing the current sequence number of the wal - to allow bookkeeping in the memstore 2429 this.snapshot = memstore.snapshot(); 2430 this.cacheFlushCount = snapshot.getCellsCount(); 2431 this.cacheFlushSize = snapshot.getDataSize(); 2432 committedFiles = new ArrayList<>(1); 2433 return snapshot.getMemStoreSize(); 2434 } 2435 2436 @Override 2437 public void flushCache(MonitoredTask status) throws IOException { 2438 RegionServerServices rsService = region.getRegionServerServices(); 2439 ThroughputController throughputController = 2440 rsService == null ? null : rsService.getFlushThroughputController(); 2441 tempFiles = 2442 HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController, tracker); 2443 } 2444 2445 @Override 2446 public boolean commit(MonitoredTask status) throws IOException { 2447 if (CollectionUtils.isEmpty(this.tempFiles)) { 2448 return false; 2449 } 2450 List<HStoreFile> storeFiles = new ArrayList<>(this.tempFiles.size()); 2451 for (Path storeFilePath : tempFiles) { 2452 try { 2453 HStoreFile sf = HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status); 2454 outputFileSize += sf.getReader().length(); 2455 storeFiles.add(sf); 2456 } catch (IOException ex) { 2457 LOG.error("Failed to commit store file {}", storeFilePath, ex); 2458 // Try to delete the files we have committed before. 2459 for (HStoreFile sf : storeFiles) { 2460 Path pathToDelete = sf.getPath(); 2461 try { 2462 sf.deleteStoreFile(); 2463 } catch (IOException deleteEx) { 2464 LOG.error(HBaseMarkers.FATAL, "Failed to delete store file we committed, " 2465 + "halting {}", pathToDelete, ex); 2466 Runtime.getRuntime().halt(1); 2467 } 2468 } 2469 throw new IOException("Failed to commit the flush", ex); 2470 } 2471 } 2472 2473 for (HStoreFile sf : storeFiles) { 2474 if (HStore.this.getCoprocessorHost() != null) { 2475 HStore.this.getCoprocessorHost().postFlush(HStore.this, sf, tracker); 2476 } 2477 committedFiles.add(sf.getPath()); 2478 } 2479 2480 HStore.this.flushedCellsCount.addAndGet(cacheFlushCount); 2481 HStore.this.flushedCellsSize.addAndGet(cacheFlushSize); 2482 HStore.this.flushedOutputFileSize.addAndGet(outputFileSize); 2483 2484 // Add new file to store files. Clear snapshot too while we have the Store write lock. 2485 return HStore.this.updateStorefiles(storeFiles, snapshot.getId()); 2486 } 2487 2488 @Override 2489 public long getOutputFileSize() { 2490 return outputFileSize; 2491 } 2492 2493 @Override 2494 public List<Path> getCommittedFiles() { 2495 return committedFiles; 2496 } 2497 2498 /** 2499 * Similar to commit, but called in secondary region replicas for replaying the 2500 * flush cache from primary region. Adds the new files to the store, and drops the 2501 * snapshot depending on dropMemstoreSnapshot argument. 2502 * @param fileNames names of the flushed files 2503 * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot 2504 */ 2505 @Override 2506 public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot) 2507 throws IOException { 2508 List<HStoreFile> storeFiles = new ArrayList<>(fileNames.size()); 2509 for (String file : fileNames) { 2510 // open the file as a store file (hfile link, etc) 2511 StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), file); 2512 HStoreFile storeFile = createStoreFileAndReader(storeFileInfo); 2513 storeFiles.add(storeFile); 2514 HStore.this.storeSize.addAndGet(storeFile.getReader().length()); 2515 HStore.this.totalUncompressedBytes 2516 .addAndGet(storeFile.getReader().getTotalUncompressedBytes()); 2517 if (LOG.isInfoEnabled()) { 2518 LOG.info(this + " added " + storeFile + ", entries=" + storeFile.getReader().getEntries() + 2519 ", sequenceid=" + storeFile.getReader().getSequenceID() + ", filesize=" 2520 + TraditionalBinaryPrefix.long2String(storeFile.getReader().length(), "", 1)); 2521 } 2522 } 2523 2524 long snapshotId = -1; // -1 means do not drop 2525 if (dropMemstoreSnapshot && snapshot != null) { 2526 snapshotId = snapshot.getId(); 2527 snapshot.close(); 2528 } 2529 HStore.this.updateStorefiles(storeFiles, snapshotId); 2530 } 2531 2532 /** 2533 * Abort the snapshot preparation. Drops the snapshot if any. 2534 */ 2535 @Override 2536 public void abort() throws IOException { 2537 if (snapshot != null) { 2538 //We need to close the snapshot when aborting, otherwise, the segment scanner 2539 //won't be closed. If we are using MSLAB, the chunk referenced by those scanners 2540 //can't be released, thus memory leak 2541 snapshot.close(); 2542 HStore.this.updateStorefiles(Collections.emptyList(), snapshot.getId()); 2543 } 2544 } 2545 } 2546 2547 @Override 2548 public boolean needsCompaction() { 2549 List<HStoreFile> filesCompactingClone = null; 2550 synchronized (filesCompacting) { 2551 filesCompactingClone = Lists.newArrayList(filesCompacting); 2552 } 2553 return this.storeEngine.needsCompaction(filesCompactingClone); 2554 } 2555 2556 /** 2557 * Used for tests. 2558 * @return cache configuration for this Store. 2559 */ 2560 @VisibleForTesting 2561 public CacheConfig getCacheConfig() { 2562 return this.cacheConf; 2563 } 2564 2565 public static final long FIXED_OVERHEAD = 2566 ClassSize.align(ClassSize.OBJECT + (29 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) 2567 + (6 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN)); 2568 2569 public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD 2570 + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK 2571 + ClassSize.CONCURRENT_SKIPLISTMAP 2572 + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT 2573 + ScanInfo.FIXED_OVERHEAD); 2574 2575 @Override 2576 public long heapSize() { 2577 MemStoreSize memstoreSize = this.memstore.size(); 2578 return DEEP_OVERHEAD + memstoreSize.getHeapSize(); 2579 } 2580 2581 @Override 2582 public CellComparator getComparator() { 2583 return comparator; 2584 } 2585 2586 public ScanInfo getScanInfo() { 2587 return scanInfo; 2588 } 2589 2590 /** 2591 * Set scan info, used by test 2592 * @param scanInfo new scan info to use for test 2593 */ 2594 void setScanInfo(ScanInfo scanInfo) { 2595 this.scanInfo = scanInfo; 2596 } 2597 2598 @Override 2599 public boolean hasTooManyStoreFiles() { 2600 return getStorefilesCount() > this.blockingFileCount; 2601 } 2602 2603 @Override 2604 public long getFlushedCellsCount() { 2605 return flushedCellsCount.get(); 2606 } 2607 2608 @Override 2609 public long getFlushedCellsSize() { 2610 return flushedCellsSize.get(); 2611 } 2612 2613 @Override 2614 public long getFlushedOutputFileSize() { 2615 return flushedOutputFileSize.get(); 2616 } 2617 2618 @Override 2619 public long getCompactedCellsCount() { 2620 return compactedCellsCount.get(); 2621 } 2622 2623 @Override 2624 public long getCompactedCellsSize() { 2625 return compactedCellsSize.get(); 2626 } 2627 2628 @Override 2629 public long getMajorCompactedCellsCount() { 2630 return majorCompactedCellsCount.get(); 2631 } 2632 2633 @Override 2634 public long getMajorCompactedCellsSize() { 2635 return majorCompactedCellsSize.get(); 2636 } 2637 2638 /** 2639 * Returns the StoreEngine that is backing this concrete implementation of Store. 2640 * @return Returns the {@link StoreEngine} object used internally inside this HStore object. 2641 */ 2642 @VisibleForTesting 2643 public StoreEngine<?, ?, ?, ?> getStoreEngine() { 2644 return this.storeEngine; 2645 } 2646 2647 protected OffPeakHours getOffPeakHours() { 2648 return this.offPeakHours; 2649 } 2650 2651 /** 2652 * {@inheritDoc} 2653 */ 2654 @Override 2655 public void onConfigurationChange(Configuration conf) { 2656 this.conf = new CompoundConfiguration() 2657 .add(conf) 2658 .addBytesMap(family.getValues()); 2659 this.storeEngine.compactionPolicy.setConf(conf); 2660 this.offPeakHours = OffPeakHours.getInstance(conf); 2661 } 2662 2663 /** 2664 * {@inheritDoc} 2665 */ 2666 @Override 2667 public void registerChildren(ConfigurationManager manager) { 2668 // No children to register 2669 } 2670 2671 /** 2672 * {@inheritDoc} 2673 */ 2674 @Override 2675 public void deregisterChildren(ConfigurationManager manager) { 2676 // No children to deregister 2677 } 2678 2679 @Override 2680 public double getCompactionPressure() { 2681 return storeEngine.getStoreFileManager().getCompactionPressure(); 2682 } 2683 2684 @Override 2685 public boolean isPrimaryReplicaStore() { 2686 return getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID; 2687 } 2688 2689 /** 2690 * Sets the store up for a region level snapshot operation. 2691 * @see #postSnapshotOperation() 2692 */ 2693 public void preSnapshotOperation() { 2694 archiveLock.lock(); 2695 } 2696 2697 /** 2698 * Perform tasks needed after the completion of snapshot operation. 2699 * @see #preSnapshotOperation() 2700 */ 2701 public void postSnapshotOperation() { 2702 archiveLock.unlock(); 2703 } 2704 2705 /** 2706 * Closes and archives the compacted files under this store 2707 */ 2708 public synchronized void closeAndArchiveCompactedFiles() throws IOException { 2709 // ensure other threads do not attempt to archive the same files on close() 2710 archiveLock.lock(); 2711 try { 2712 lock.readLock().lock(); 2713 Collection<HStoreFile> copyCompactedfiles = null; 2714 try { 2715 Collection<HStoreFile> compactedfiles = 2716 this.getStoreEngine().getStoreFileManager().getCompactedfiles(); 2717 if (CollectionUtils.isNotEmpty(compactedfiles)) { 2718 // Do a copy under read lock 2719 copyCompactedfiles = new ArrayList<>(compactedfiles); 2720 } else { 2721 LOG.trace("No compacted files to archive"); 2722 } 2723 } finally { 2724 lock.readLock().unlock(); 2725 } 2726 if (CollectionUtils.isNotEmpty(copyCompactedfiles)) { 2727 removeCompactedfiles(copyCompactedfiles, true); 2728 } 2729 } finally { 2730 archiveLock.unlock(); 2731 } 2732 } 2733 2734 /** 2735 * Archives and removes the compacted files 2736 * @param compactedfiles The compacted files in this store that are not active in reads 2737 * @param evictOnClose true if blocks should be evicted from the cache when an HFile reader is 2738 * closed, false if not 2739 */ 2740 private void removeCompactedfiles(Collection<HStoreFile> compactedfiles, boolean evictOnClose) 2741 throws IOException { 2742 final List<HStoreFile> filesToRemove = new ArrayList<>(compactedfiles.size()); 2743 final List<Long> storeFileSizes = new ArrayList<>(compactedfiles.size()); 2744 for (final HStoreFile file : compactedfiles) { 2745 synchronized (file) { 2746 try { 2747 StoreFileReader r = file.getReader(); 2748 if (r == null) { 2749 LOG.debug("The file {} was closed but still not archived", file); 2750 // HACK: Temporarily re-open the reader so we can get the size of the file. Ideally, 2751 // we should know the size of an HStoreFile without having to ask the HStoreFileReader 2752 // for that. 2753 long length = getStoreFileSize(file); 2754 filesToRemove.add(file); 2755 storeFileSizes.add(length); 2756 continue; 2757 } 2758 2759 if (file.isCompactedAway() && !file.isReferencedInReads()) { 2760 // Even if deleting fails we need not bother as any new scanners won't be 2761 // able to use the compacted file as the status is already compactedAway 2762 LOG.trace("Closing and archiving the file {}", file); 2763 // Copy the file size before closing the reader 2764 final long length = r.length(); 2765 r.close(evictOnClose); 2766 // Just close and return 2767 filesToRemove.add(file); 2768 // Only add the length if we successfully added the file to `filesToRemove` 2769 storeFileSizes.add(length); 2770 } else { 2771 LOG.info("Can't archive compacted file " + file.getPath() 2772 + " because of either isCompactedAway=" + file.isCompactedAway() 2773 + " or file has reference, isReferencedInReads=" + file.isReferencedInReads() 2774 + ", refCount=" + r.getRefCount() + ", skipping for now."); 2775 } 2776 } catch (Exception e) { 2777 LOG.error("Exception while trying to close the compacted store file {}", file.getPath(), 2778 e); 2779 } 2780 } 2781 } 2782 if (this.isPrimaryReplicaStore()) { 2783 // Only the primary region is allowed to move the file to archive. 2784 // The secondary region does not move the files to archive. Any active reads from 2785 // the secondary region will still work because the file as such has active readers on it. 2786 if (!filesToRemove.isEmpty()) { 2787 LOG.debug("Moving the files {} to archive", filesToRemove); 2788 // Only if this is successful it has to be removed 2789 try { 2790 this.fs.removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(), 2791 filesToRemove); 2792 } catch (FailedArchiveException fae) { 2793 // Even if archiving some files failed, we still need to clear out any of the 2794 // files which were successfully archived. Otherwise we will receive a 2795 // FileNotFoundException when we attempt to re-archive them in the next go around. 2796 Collection<Path> failedFiles = fae.getFailedFiles(); 2797 Iterator<HStoreFile> iter = filesToRemove.iterator(); 2798 Iterator<Long> sizeIter = storeFileSizes.iterator(); 2799 while (iter.hasNext()) { 2800 sizeIter.next(); 2801 if (failedFiles.contains(iter.next().getPath())) { 2802 iter.remove(); 2803 sizeIter.remove(); 2804 } 2805 } 2806 if (!filesToRemove.isEmpty()) { 2807 clearCompactedfiles(filesToRemove); 2808 } 2809 throw fae; 2810 } 2811 } 2812 } 2813 if (!filesToRemove.isEmpty()) { 2814 // Clear the compactedfiles from the store file manager 2815 clearCompactedfiles(filesToRemove); 2816 // Try to send report of this archival to the Master for updating quota usage faster 2817 reportArchivedFilesForQuota(filesToRemove, storeFileSizes); 2818 } 2819 } 2820 2821 /** 2822 * Computes the length of a store file without succumbing to any errors along the way. If an 2823 * error is encountered, the implementation returns {@code 0} instead of the actual size. 2824 * 2825 * @param file The file to compute the size of. 2826 * @return The size in bytes of the provided {@code file}. 2827 */ 2828 long getStoreFileSize(HStoreFile file) { 2829 long length = 0; 2830 try { 2831 file.initReader(); 2832 length = file.getReader().length(); 2833 } catch (IOException e) { 2834 LOG.trace("Failed to open reader when trying to compute store file size for {}, ignoring", 2835 file, e); 2836 } finally { 2837 try { 2838 file.closeStoreFile( 2839 file.getCacheConf() != null ? file.getCacheConf().shouldEvictOnClose() : true); 2840 } catch (IOException e) { 2841 LOG.trace("Failed to close reader after computing store file size for {}, ignoring", 2842 file, e); 2843 } 2844 } 2845 return length; 2846 } 2847 2848 public Long preFlushSeqIDEstimation() { 2849 return memstore.preFlushSeqIDEstimation(); 2850 } 2851 2852 @Override 2853 public boolean isSloppyMemStore() { 2854 return this.memstore.isSloppy(); 2855 } 2856 2857 private void clearCompactedfiles(List<HStoreFile> filesToRemove) throws IOException { 2858 LOG.trace("Clearing the compacted file {} from this store", filesToRemove); 2859 try { 2860 lock.writeLock().lock(); 2861 this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToRemove); 2862 } finally { 2863 lock.writeLock().unlock(); 2864 } 2865 } 2866 2867 @Override 2868 public int getCurrentParallelPutCount() { 2869 return currentParallelPutCount.get(); 2870 } 2871 2872 public int getStoreRefCount() { 2873 return this.storeEngine.getStoreFileManager().getStorefiles().stream() 2874 .filter(sf -> sf.getReader() != null).filter(HStoreFile::isHFile) 2875 .mapToInt(HStoreFile::getRefCount).sum(); 2876 } 2877 2878 /** 2879 * @return get maximum ref count of storeFile among all compacted HStore Files 2880 * for the HStore 2881 */ 2882 public int getMaxCompactedStoreFileRefCount() { 2883 OptionalInt maxCompactedStoreFileRefCount = this.storeEngine.getStoreFileManager() 2884 .getCompactedfiles() 2885 .stream() 2886 .filter(sf -> sf.getReader() != null) 2887 .filter(HStoreFile::isHFile) 2888 .mapToInt(HStoreFile::getRefCount) 2889 .max(); 2890 return maxCompactedStoreFileRefCount.isPresent() 2891 ? maxCompactedStoreFileRefCount.getAsInt() : 0; 2892 } 2893 2894 void reportArchivedFilesForQuota(List<? extends StoreFile> archivedFiles, List<Long> fileSizes) { 2895 // Sanity check from the caller 2896 if (archivedFiles.size() != fileSizes.size()) { 2897 throw new RuntimeException("Coding error: should never see lists of varying size"); 2898 } 2899 RegionServerServices rss = this.region.getRegionServerServices(); 2900 if (rss == null) { 2901 return; 2902 } 2903 List<Entry<String, Long>> filesWithSizes = new ArrayList<>(archivedFiles.size()); 2904 Iterator<Long> fileSizeIter = fileSizes.iterator(); 2905 for (StoreFile storeFile : archivedFiles) { 2906 final long fileSize = fileSizeIter.next(); 2907 if (storeFile.isHFile() && fileSize != 0) { 2908 filesWithSizes.add(Maps.immutableEntry(storeFile.getPath().getName(), fileSize)); 2909 } 2910 } 2911 if (LOG.isTraceEnabled()) { 2912 LOG.trace("Files archived: " + archivedFiles + ", reporting the following to the Master: " 2913 + filesWithSizes); 2914 } 2915 boolean success = rss.reportFileArchivalForQuotas(getTableName(), filesWithSizes); 2916 if (!success) { 2917 LOG.warn("Failed to report archival of files: " + filesWithSizes); 2918 } 2919 } 2920 @Override 2921 public long getMemstoreOnlyRowReadsCount() { 2922 return memstoreOnlyRowReadsCount.sum(); 2923 } 2924 2925 @Override 2926 public long getMixedRowReadsCount() { 2927 return mixedRowReadsCount.sum(); 2928 } 2929 2930 void updateMetricsStore(boolean memstoreRead) { 2931 if (memstoreRead) { 2932 memstoreOnlyRowReadsCount.increment(); 2933 } else { 2934 mixedRowReadsCount.increment(); 2935 } 2936 } 2937}