/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.FailedArchiveException;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;

/**
 * A Store holds a column family in a Region. It's a memstore and a set of zero or more
 * StoreFiles, which stretch backwards over time.
 *
 * <p>There's no reason to consider append-logging at this level; all logging and locking is
 * handled at the HRegion level. Store just provides services to manage sets of StoreFiles. One
 * of the most important of those services is compaction, where files are aggregated once they
 * pass a configurable threshold.
 *
 * <p>Locking and transactions are handled at a higher level. This API should not be called
 * directly but by an HRegion manager.
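 *
 * <p>A rough usage sketch from the owning region's side (illustrative only; the real call
 * sites live in HRegion, and this is not a user-facing API):
 * <pre>
 *   HStore store = region.getStore(family);  // one HStore per column family
 *   store.add(cell, sizing);                 // writes land in the memstore
 *   store.getScanners(...);                  // reads merge memstore and store file scanners
 * </pre>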
 */
@InterfaceAudience.Private
public class HStore implements Store, HeapSize, StoreConfigInformation,
    PropagatingConfigurationObserver {
  public static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
  public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
      "hbase.server.compactchecker.interval.multiplier";
  public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
  public static final String BLOCK_STORAGE_POLICY_KEY = "hbase.hstore.block.storage.policy";
  // keep in accordance with HDFS default storage policy
  public static final String DEFAULT_BLOCK_STORAGE_POLICY = "HOT";
  public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
  public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 16;

  // HBASE-24428 : Update compaction priority for recently split daughter regions
  // so as to prioritize their compaction.
  // Any compaction candidate with higher priority than compaction of newly split daughter
  // regions should have priority value < (Integer.MIN_VALUE + 1000)
  private static final int SPLIT_REGION_COMPACTION_PRIORITY = Integer.MIN_VALUE + 1000;

  private static final Logger LOG = LoggerFactory.getLogger(HStore.class);

  protected final MemStore memstore;
  // The region this store belongs to; the store's directory in the filesystem lives under it.
  protected final HRegion region;
  private final ColumnFamilyDescriptor family;
  private final HRegionFileSystem fs;
  protected Configuration conf;
  protected CacheConfig cacheConf;
  private long lastCompactSize = 0;
  volatile boolean forceMajor = false;
  /* how many bytes to write between status checks */
  static int closeCheckInterval = 0;
  private AtomicLong storeSize = new AtomicLong();
  private AtomicLong totalUncompressedBytes = new AtomicLong();
  private LongAdder memstoreOnlyRowReadsCount = new LongAdder();
  // rows that have cells from both memstore and files (or only files)
  private LongAdder mixedRowReadsCount = new LongAdder();

  private boolean cacheOnWriteLogged;

  /**
   * RWLock for store operations.
   * Locked in shared mode when the list of component stores is looked at:
   *   - all reads/writes to table data
   *   - checking for split
   * Locked in exclusive mode when the list of component stores is modified:
   *   - closing
   *   - completing a compaction
   */
  final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  /**
   * Lock specific to archiving compacted store files. This avoids races around the combination
   * of retrieving the list of compacted files and moving them to the archive directory. Since
   * this is usually a background process (other than on close), we don't want to handle this
   * with the store write lock, which would block readers and degrade performance.
   *
   * Locked by:
   *   - CompactedHFilesDispatchHandler via closeAndArchiveCompactedFiles()
   *   - close()
   */
  final ReentrantLock archiveLock = new ReentrantLock();

  private final boolean verifyBulkLoads;

  /**
   * Use this counter to track concurrent puts. If TRACE-log is enabled and we are over the
   * threshold set by hbase.region.store.parallel.put.print.threshold (default is 50), we will
   * log a message that identifies the Store experiencing this high level of concurrency.
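   * The counter is incremented when an add() begins and decremented when it completes, so its
   * value approximates the number of in-flight puts.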
   */
  private final AtomicInteger currentParallelPutCount = new AtomicInteger(0);
  private final int parallelPutCountPrintThreshold;

  private ScanInfo scanInfo;

  // All access must be synchronized.
  // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it.
  private final List<HStoreFile> filesCompacting = Lists.newArrayList();

  // All access must be synchronized.
  private final Set<ChangedReadersObserver> changedReaderObservers =
      Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());

  protected final int blocksize;
  private HFileDataBlockEncoder dataBlockEncoder;

  /** Checksum configuration */
  protected ChecksumType checksumType;
  protected int bytesPerChecksum;

  // Comparing KeyValues
  protected final CellComparator comparator;

  final StoreEngine<?, ?, ?, ?> storeEngine;

  private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
  private volatile OffPeakHours offPeakHours;

  private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
  private int flushRetriesNumber;
  private int pauseTime;

  private long blockingFileCount;
  private int compactionCheckMultiplier;
  protected Encryption.Context cryptoContext = Encryption.Context.NONE;

  private AtomicLong flushedCellsCount = new AtomicLong();
  private AtomicLong compactedCellsCount = new AtomicLong();
  private AtomicLong majorCompactedCellsCount = new AtomicLong();
  private AtomicLong flushedCellsSize = new AtomicLong();
  private AtomicLong flushedOutputFileSize = new AtomicLong();
  private AtomicLong compactedCellsSize = new AtomicLong();
  private AtomicLong majorCompactedCellsSize = new AtomicLong();

  /**
   * Constructor
   * @param family ColumnFamilyDescriptor for this column family
   * @param confParam configuration object. Can be null.
   */
  protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
      final Configuration confParam, boolean warmup) throws IOException {

    this.fs = region.getRegionFileSystem();

    // Assemble the store's home directory and ensure it exists.
    fs.createStoreDir(family.getNameAsString());
    this.region = region;
    this.family = family;
    // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
    // CompoundConfiguration will look for keys in reverse order of addition, so we'd
    // add global config first, then table and cf overrides, then cf metadata.
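    // e.g. a key set in the column family's own configuration wins over the same key in the
    // table descriptor, which in turn wins over the global configuration.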
    this.conf = new CompoundConfiguration()
        .add(confParam)
        .addBytesMap(region.getTableDescriptor().getValues())
        .addStringMap(family.getConfiguration())
        .addBytesMap(family.getValues());
    this.blocksize = family.getBlocksize();

    // set block storage policy for store directory
    String policyName = family.getStoragePolicy();
    if (null == policyName) {
      policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY, DEFAULT_BLOCK_STORAGE_POLICY);
    }
    this.fs.setStoragePolicy(family.getNameAsString(), policyName.trim());

    this.dataBlockEncoder = new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());

    this.comparator = region.getCellComparator();
    // used by ScanQueryMatcher
    long timeToPurgeDeletes =
        Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
    LOG.trace("Time to purge deletes set to {}ms in {}", timeToPurgeDeletes, this);
    // Get TTL
    long ttl = determineTTLFromFamily(family);
    // Why not just pass a HColumnDescriptor in here altogether? Even if we have to clone it?
    scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.comparator);
    this.memstore = getMemstore();

    this.offPeakHours = OffPeakHours.getInstance(conf);

    // Setting up cache configuration for this family
    createCacheConf(family);

    this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);

    this.blockingFileCount =
        conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
    this.compactionCheckMultiplier = conf.getInt(
        COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
    if (this.compactionCheckMultiplier <= 0) {
      LOG.error("Compaction check period multiplier must be positive, setting default: {}",
          DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
      this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
    }

    if (HStore.closeCheckInterval == 0) {
      HStore.closeCheckInterval = conf.getInt(
          "hbase.hstore.close.check.interval", 10 * 1000 * 1000 /* 10 MB */);
    }

    this.storeEngine = createStoreEngine(this, this.conf, this.comparator);
    List<HStoreFile> hStoreFiles = loadStoreFiles(warmup);
    // Move the storeSize calculation out of loadStoreFiles() method, because the secondary read
    // replica's refreshStoreFiles() will also use loadStoreFiles() to refresh its store files
    // and then update the storeSize in completeCompaction(..) (just like compaction does), so
    // there is no need to calculate the storeSize twice.
    this.storeSize.addAndGet(getStorefilesSize(hStoreFiles, sf -> true));
    this.totalUncompressedBytes.addAndGet(getTotalUncompressedBytes(hStoreFiles));
    this.storeEngine.getStoreFileManager().loadFiles(hStoreFiles);

    // Initialize checksum type from name. The names are CRC32, CRC32C, etc.
    this.checksumType = getChecksumType(conf);
    // Initialize bytes per checksum
    this.bytesPerChecksum = getBytesPerChecksum(conf);
    flushRetriesNumber = conf.getInt(
        "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
    pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
    if (flushRetriesNumber <= 0) {
      throw new IllegalArgumentException(
          "hbase.hstore.flush.retries.number must be > 0, not " + flushRetriesNumber);
    }
    cryptoContext = EncryptionUtil.createEncryptionContext(conf, family);

    int confPrintThreshold =
        this.conf.getInt("hbase.region.store.parallel.put.print.threshold", 50);
    if (confPrintThreshold < 10) {
      confPrintThreshold = 10;
    }
    this.parallelPutCountPrintThreshold = confPrintThreshold;

    LOG.info("Store={}, memstore type={}, storagePolicy={}, verifyBulkLoads={}, "
            + "parallelPutCountPrintThreshold={}, encoding={}, compression={}",
        this, memstore.getClass().getSimpleName(), policyName, verifyBulkLoads,
        parallelPutCountPrintThreshold, family.getDataBlockEncoding(),
        family.getCompressionType());
    cacheOnWriteLogged = false;
  }

  /**
   * @return MemStore instance to use in this store.
   */
  private MemStore getMemstore() {
    MemStore ms = null;
    // Check if in-memory-compaction is configured. Note MemoryCompactionPolicy is an enum!
    MemoryCompactionPolicy inMemoryCompaction = null;
    if (this.getTableName().isSystemTable()) {
      inMemoryCompaction = MemoryCompactionPolicy
          .valueOf(conf.get("hbase.systemtables.compacting.memstore.type", "NONE").toUpperCase());
    } else {
      inMemoryCompaction = family.getInMemoryCompaction();
    }
    if (inMemoryCompaction == null) {
      inMemoryCompaction =
          MemoryCompactionPolicy.valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
              CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT).toUpperCase());
    }
    switch (inMemoryCompaction) {
      case NONE:
        ms = ReflectionUtils.newInstance(DefaultMemStore.class,
            new Object[] { conf, this.comparator,
                this.getHRegion().getRegionServicesForStores() });
        break;
      default:
        Class<? extends CompactingMemStore> clz = conf.getClass(MEMSTORE_CLASS_NAME,
            CompactingMemStore.class, CompactingMemStore.class);
        ms = ReflectionUtils.newInstance(clz, new Object[] { conf, this.comparator, this,
            this.getHRegion().getRegionServicesForStores(), inMemoryCompaction });
    }
    return ms;
  }

  /**
   * Creates the cache config.
   * @param family The current column family.
   */
  protected void createCacheConf(final ColumnFamilyDescriptor family) {
    this.cacheConf = new CacheConfig(conf, family, region.getBlockCache(),
        region.getRegionServicesForStores().getByteBuffAllocator());
    LOG.info("Created cacheConfig: " + this.getCacheConfig() + " for " + this);
  }

  /**
   * Creates the store engine configured for the given Store.
   * @param store The store. An unfortunate dependency needed due to it being passed to
   *   coprocessors via the compactor.
   * @param conf Store configuration.
   * @param kvComparator KVComparator for storeFileManager.
   * @return StoreEngine to use.
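   * @throws IOException if the store engine could not be created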
   */
  protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
      CellComparator kvComparator) throws IOException {
    return StoreEngine.create(store, conf, comparator);
  }

  /**
   * @return TTL in milliseconds of the specified family (the descriptor stores it in seconds)
   */
  public static long determineTTLFromFamily(final ColumnFamilyDescriptor family) {
    // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.
    long ttl = family.getTimeToLive();
    if (ttl == HConstants.FOREVER) {
      // Default is unlimited ttl.
      ttl = Long.MAX_VALUE;
    } else if (ttl == -1) {
      ttl = Long.MAX_VALUE;
    } else {
      // Second -> ms adjust for user data
      ttl *= 1000;
    }
    return ttl;
  }

  @Override
  public String getColumnFamilyName() {
    return this.family.getNameAsString();
  }

  @Override
  public TableName getTableName() {
    return this.getRegionInfo().getTable();
  }

  @Override
  public FileSystem getFileSystem() {
    return this.fs.getFileSystem();
  }

  public HRegionFileSystem getRegionFileSystem() {
    return this.fs;
  }

  /* Implementation of StoreConfigInformation */
  @Override
  public long getStoreFileTtl() {
    // TTL only applies if there's no MIN_VERSIONs setting on the column.
    return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
  }

  @Override
  public long getMemStoreFlushSize() {
    // TODO: Why is this in here? The flushsize of the region rather than the store? St.Ack
    return this.region.memstoreFlushSize;
  }

  @Override
  public MemStoreSize getFlushableSize() {
    return this.memstore.getFlushableSize();
  }

  @Override
  public MemStoreSize getSnapshotSize() {
    return this.memstore.getSnapshotSize();
  }

  @Override
  public long getCompactionCheckMultiplier() {
    return this.compactionCheckMultiplier;
  }

  @Override
  public long getBlockingFileCount() {
    return blockingFileCount;
  }
  /* End implementation of StoreConfigInformation */

  /**
   * Returns the configured bytesPerChecksum value.
   * @param conf The configuration
   * @return The bytesPerChecksum that is set in the configuration
   */
  public static int getBytesPerChecksum(Configuration conf) {
    return conf.getInt(HConstants.BYTES_PER_CHECKSUM,
        HFile.DEFAULT_BYTES_PER_CHECKSUM);
  }

  /**
   * Returns the configured checksum algorithm.
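   * The type is read from {@link HConstants#CHECKSUM_TYPE_NAME}; if that is unset,
   * {@link ChecksumType#getDefaultChecksumType()} is used.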
   * @param conf The configuration
   * @return The checksum algorithm that is set in the configuration
   */
  public static ChecksumType getChecksumType(Configuration conf) {
    String checksumName = conf.get(HConstants.CHECKSUM_TYPE_NAME);
    if (checksumName == null) {
      return ChecksumType.getDefaultChecksumType();
    } else {
      return ChecksumType.nameToType(checksumName);
    }
  }

  /**
   * @return how many bytes to write between status checks
   */
  public static int getCloseCheckInterval() {
    return closeCheckInterval;
  }

  @Override
  public ColumnFamilyDescriptor getColumnFamilyDescriptor() {
    return this.family;
  }

  @Override
  public OptionalLong getMaxSequenceId() {
    return StoreUtils.getMaxSequenceIdInList(this.getStorefiles());
  }

  @Override
  public OptionalLong getMaxMemStoreTS() {
    return StoreUtils.getMaxMemStoreTSInList(this.getStorefiles());
  }

  /**
   * @param tabledir {@link Path} to where the table is being stored
   * @param hri {@link RegionInfo} for the region.
   * @param family {@link ColumnFamilyDescriptor} describing the column family
   * @return Path to family/Store home directory.
   * @deprecated Since 05/05/2013, HBASE-7808, hbase-1.0.0
   */
  @Deprecated
  public static Path getStoreHomedir(final Path tabledir,
      final RegionInfo hri, final byte[] family) {
    return getStoreHomedir(tabledir, hri.getEncodedName(), family);
  }

  /**
   * @param tabledir {@link Path} to where the table is being stored
   * @param encodedName Encoded region name.
   * @param family {@link ColumnFamilyDescriptor} describing the column family
   * @return Path to family/Store home directory.
   * @deprecated Since 05/05/2013, HBASE-7808, hbase-1.0.0
   */
  @Deprecated
  public static Path getStoreHomedir(final Path tabledir,
      final String encodedName, final byte[] family) {
    return new Path(tabledir, new Path(encodedName, Bytes.toString(family)));
  }

  /**
   * @return the data block encoder
   */
  public HFileDataBlockEncoder getDataBlockEncoder() {
    return dataBlockEncoder;
  }

  /**
   * Should be used only in tests.
   * @param blockEncoder the block delta encoder to use
   */
  void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
    this.dataBlockEncoder = blockEncoder;
  }

  /**
   * Creates an unsorted list of StoreFiles loaded in parallel from the given directory.
   */
  private List<HStoreFile> loadStoreFiles(boolean warmup) throws IOException {
    Collection<StoreFileInfo> files = fs.getStoreFiles(getColumnFamilyName());
    return openStoreFiles(files, warmup);
  }

  private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, boolean warmup)
      throws IOException {
    if (CollectionUtils.isEmpty(files)) {
      return Collections.emptyList();
    }
    // initialize the thread pool for opening store files in parallel.
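    // (pool sizing is owned by the region; see HRegion#getStoreFileOpenAndCloseThreadPool)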
    ThreadPoolExecutor storeFileOpenerThreadPool =
        this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpener-"
            + this.getColumnFamilyName());
    CompletionService<HStoreFile> completionService =
        new ExecutorCompletionService<>(storeFileOpenerThreadPool);

    int totalValidStoreFile = 0;
    for (StoreFileInfo storeFileInfo : files) {
      // open each store file in parallel
      completionService.submit(() -> this.createStoreFileAndReader(storeFileInfo));
      totalValidStoreFile++;
    }

    Set<String> compactedStoreFiles = new HashSet<>();
    ArrayList<HStoreFile> results = new ArrayList<>(files.size());
    IOException ioe = null;
    try {
      for (int i = 0; i < totalValidStoreFile; i++) {
        try {
          HStoreFile storeFile = completionService.take().get();
          if (storeFile != null) {
            LOG.debug("loaded {}", storeFile);
            results.add(storeFile);
            compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles());
          }
        } catch (InterruptedException e) {
          if (ioe == null) {
            ioe = new InterruptedIOException(e.getMessage());
          }
        } catch (ExecutionException e) {
          if (ioe == null) {
            ioe = new IOException(e.getCause());
          }
        }
      }
    } finally {
      storeFileOpenerThreadPool.shutdownNow();
    }
    if (ioe != null) {
      // close StoreFile readers
      boolean evictOnClose =
          cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
      for (HStoreFile file : results) {
        try {
          if (file != null) {
            file.closeStoreFile(evictOnClose);
          }
        } catch (IOException e) {
          LOG.warn("Could not close store file {}", file, e);
        }
      }
      throw ioe;
    }

    // Should not archive the compacted store files during region warmup. See HBASE-22163.
    if (!warmup) {
      // Remove the compacted files from result
      List<HStoreFile> filesToRemove = new ArrayList<>(compactedStoreFiles.size());
      for (HStoreFile storeFile : results) {
        if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
          LOG.warn("Clearing the compacted storefile {} from {}", storeFile, this);
          storeFile.getReader().close(storeFile.getCacheConf() != null
              ? storeFile.getCacheConf().shouldEvictOnClose() : true);
          filesToRemove.add(storeFile);
        }
      }
      results.removeAll(filesToRemove);
      if (!filesToRemove.isEmpty() && this.isPrimaryReplicaStore()) {
        LOG.debug("Moving the files {} to archive", filesToRemove);
        this.fs.removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(),
            filesToRemove);
      }
    }

    return results;
  }

  @Override
  public void refreshStoreFiles() throws IOException {
    Collection<StoreFileInfo> newFiles = fs.getStoreFiles(getColumnFamilyName());
    refreshStoreFilesInternal(newFiles);
  }

  /**
   * Replaces the store files that the store has with the given files. Mainly used by secondary
   * region replicas to keep up to date with the primary region files.
   */
  public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
    List<StoreFileInfo> storeFiles = new ArrayList<>(newFiles.size());
    for (String file : newFiles) {
      storeFiles.add(fs.getStoreFileInfo(getColumnFamilyName(), file));
    }
    refreshStoreFilesInternal(storeFiles);
  }

  /**
   * Checks the underlying store files, opens the files that have not been opened, and removes
   * the store file readers for store files no longer available.
   * Mainly used by secondary region replicas to keep up to date with the primary region files.
   */
  private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
    StoreFileManager sfm = storeEngine.getStoreFileManager();
    Collection<HStoreFile> currentFiles = sfm.getStorefiles();
    Collection<HStoreFile> compactedFiles = sfm.getCompactedfiles();
    if (currentFiles == null) {
      currentFiles = Collections.emptySet();
    }
    if (newFiles == null) {
      newFiles = Collections.emptySet();
    }
    if (compactedFiles == null) {
      compactedFiles = Collections.emptySet();
    }

    HashMap<StoreFileInfo, HStoreFile> currentFilesSet = new HashMap<>(currentFiles.size());
    for (HStoreFile sf : currentFiles) {
      currentFilesSet.put(sf.getFileInfo(), sf);
    }
    HashMap<StoreFileInfo, HStoreFile> compactedFilesSet = new HashMap<>(compactedFiles.size());
    for (HStoreFile sf : compactedFiles) {
      compactedFilesSet.put(sf.getFileInfo(), sf);
    }

    Set<StoreFileInfo> newFilesSet = new HashSet<>(newFiles);
    // Exclude the files that have already been compacted
    newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet());
    Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
    Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);

    if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
      return;
    }

    LOG.info("Refreshing store files for " + this + " files to add: "
        + toBeAddedFiles + " files to remove: " + toBeRemovedFiles);

    Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size());
    for (StoreFileInfo sfi : toBeRemovedFiles) {
      toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
    }

    // try to open the files
    List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);

    // propagate the file changes to the underlying store file manager
    replaceStoreFiles(toBeRemovedStoreFiles, openedFiles); // won't throw an exception

    // Advance the memstore read point to be at least the new store files' seqIds so that
    // readers can pick them up.
    // This assumes that the store is not getting any writes (otherwise in-flight
    // transactions might be made visible).
    if (!toBeAddedFiles.isEmpty()) {
      // we must have the max sequence id here as we do have several store files
      region.getMVCC().advanceTo(this.getMaxSequenceId().getAsLong());
    }

    completeCompaction(toBeRemovedStoreFiles);
  }

  protected HStoreFile createStoreFileAndReader(final Path p) throws IOException {
    StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(),
        p, isPrimaryReplicaStore());
    return createStoreFileAndReader(info);
  }

  private HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
    info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
    HStoreFile storeFile = new HStoreFile(info, this.family.getBloomFilterType(), this.cacheConf);
    storeFile.initReader();
    return storeFile;
  }

  /**
   * Tells the MemStore that the upcoming updates are part of replaying edits from the WAL.
   */
  public void startReplayingFromWAL() {
    this.memstore.startReplayingFromWAL();
  }

  /**
   * Tells the MemStore that replaying edits from the WAL is done.
   */
  public void stopReplayingFromWAL() {
    this.memstore.stopReplayingFromWAL();
  }

  /**
   * Adds a value to the memstore
   */
  public void add(final Cell cell, MemStoreSizing memstoreSizing) {
    lock.readLock().lock();
    try {
      if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
        LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
            this.getTableName(), this.getRegionInfo().getEncodedName(),
            this.getColumnFamilyName());
      }
      this.memstore.add(cell, memstoreSizing);
    } finally {
      lock.readLock().unlock();
      currentParallelPutCount.decrementAndGet();
    }
  }

  /**
   * Adds the specified cells to the memstore
   */
  public void add(final Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
    lock.readLock().lock();
    try {
      if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
        LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
            this.getTableName(), this.getRegionInfo().getEncodedName(),
            this.getColumnFamilyName());
      }
      memstore.add(cells, memstoreSizing);
    } finally {
      lock.readLock().unlock();
      currentParallelPutCount.decrementAndGet();
    }
  }

  @Override
  public long timeOfOldestEdit() {
    return memstore.timeOfOldestEdit();
  }

  /**
   * @return All store files.
   */
  @Override
  public Collection<HStoreFile> getStorefiles() {
    return this.storeEngine.getStoreFileManager().getStorefiles();
  }

  @Override
  public Collection<HStoreFile> getCompactedFiles() {
    return this.storeEngine.getStoreFileManager().getCompactedfiles();
  }

  /**
   * Throws a WrongRegionException if the HFile does not fit in this region, or an
   * InvalidHFileException if the HFile is not valid.
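   * <p>If hbase.hstore.bulkload.verify is set, the whole file is additionally scanned to
   * verify row ordering and column family consistency.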
   */
  public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
    HFile.Reader reader = null;
    try {
      LOG.info("Validating hfile at " + srcPath + " for inclusion in " + this);
      FileSystem srcFs = srcPath.getFileSystem(conf);
      srcFs.access(srcPath, FsAction.READ_WRITE);
      reader = HFile.createReader(srcFs, srcPath, cacheConf, isPrimaryReplicaStore(), conf);

      Optional<byte[]> firstKey = reader.getFirstRowKey();
      Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
      Optional<Cell> lk = reader.getLastKey();
      Preconditions.checkState(lk.isPresent(), "Last key can not be null");
      byte[] lastKey = CellUtil.cloneRow(lk.get());

      if (LOG.isDebugEnabled()) {
        LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey.get())
            + " last=" + Bytes.toStringBinary(lastKey));
        LOG.debug("Region bounds: first="
            + Bytes.toStringBinary(getRegionInfo().getStartKey())
            + " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
      }

      if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) {
        throw new WrongRegionException(
            "Bulk load file " + srcPath.toString() + " does not fit inside region "
                + this.getRegionInfo().getRegionNameAsString());
      }

      if (reader.length() > conf.getLong(HConstants.HREGION_MAX_FILESIZE,
          HConstants.DEFAULT_MAX_FILE_SIZE)) {
        LOG.warn("Trying to bulk load hfile " + srcPath + " with size: "
            + reader.length() + " bytes can be problematic as it may lead to oversplitting.");
      }

      if (verifyBulkLoads) {
        long verificationStartTime = EnvironmentEdgeManager.currentTime();
        LOG.info("Full verification started for bulk load hfile: {}", srcPath);
        Cell prevCell = null;
        HFileScanner scanner = reader.getScanner(false, false, false);
        scanner.seekTo();
        do {
          Cell cell = scanner.getCell();
          if (prevCell != null) {
            if (comparator.compareRows(prevCell, cell) > 0) {
              throw new InvalidHFileException("Previous row is greater than"
                  + " current row: path=" + srcPath + " previous="
                  + CellUtil.getCellKeyAsString(prevCell) + " current="
                  + CellUtil.getCellKeyAsString(cell));
            }
            if (CellComparator.getInstance().compareFamilies(prevCell, cell) != 0) {
              throw new InvalidHFileException("Previous key had different"
                  + " family compared to current key: path=" + srcPath
                  + " previous="
                  + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(),
                      prevCell.getFamilyLength())
                  + " current="
                  + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(),
                      cell.getFamilyLength()));
            }
          }
          prevCell = cell;
        } while (scanner.next());
        LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString()
            + " took " + (EnvironmentEdgeManager.currentTime() - verificationStartTime)
            + " ms");
      }
    } finally {
      if (reader != null) {
        reader.close();
      }
    }
  }

  /**
   * This method should only be called from Region. It is assumed that the ranges of values in
   * the HFile fit within the store's assigned region.
   * (assertBulkLoadHFileOk checks this)
   *
   * @param seqNum sequence Id associated with the HFile
   */
  public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
    Path srcPath = new Path(srcPathStr);
    return fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
  }

  public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException {
    Path srcPath = new Path(srcPathStr);
    try {
      fs.commitStoreFile(srcPath, dstPath);
    } finally {
      if (this.getCoprocessorHost() != null) {
        this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath);
      }
    }

    LOG.info("Loaded HFile " + srcPath + " into " + this + " as "
        + dstPath + " - updating store file list.");

    HStoreFile sf = createStoreFileAndReader(dstPath);
    bulkLoadHFile(sf);

    LOG.info("Successfully loaded {} into {} (new location: {})",
        srcPath, this, dstPath);

    return dstPath;
  }

  public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
    HStoreFile sf = createStoreFileAndReader(fileInfo);
    bulkLoadHFile(sf);
  }

  private void bulkLoadHFile(HStoreFile sf) throws IOException {
    StoreFileReader r = sf.getReader();
    this.storeSize.addAndGet(r.length());
    this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());

    // Append the new storefile into the list
    this.lock.writeLock().lock();
    try {
      this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
    } finally {
      // We need the lock, as long as we are updating the storeFiles or changing the memstore.
      // Let us release it before calling notifyChangeReadersObservers. See HBASE-4485 for a
      // possible deadlock scenario that could have happened if we continued to hold the lock.
      this.lock.writeLock().unlock();
    }
    LOG.info("Loaded HFile " + sf.getFileInfo() + " into " + this);
    if (LOG.isTraceEnabled()) {
      String traceMessage = "BULK LOAD time,size,store size,store files ["
          + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize
          + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
      LOG.trace(traceMessage);
    }
  }

  /**
   * Close all the readers. We don't need to worry about subsequent requests because the Region
   * holds a write lock that will prevent any more reads or writes.
   * @return the {@link StoreFile StoreFiles} that were previously being used.
   * @throws IOException on failure
   */
  public ImmutableCollection<HStoreFile> close() throws IOException {
    this.archiveLock.lock();
    this.lock.writeLock().lock();
    try {
      // Clear so metrics don't find them.
      ImmutableCollection<HStoreFile> result = storeEngine.getStoreFileManager().clearFiles();
      Collection<HStoreFile> compactedfiles =
          storeEngine.getStoreFileManager().clearCompactedFiles();
      // clear the compacted files
      if (CollectionUtils.isNotEmpty(compactedfiles)) {
        removeCompactedfiles(compactedfiles, cacheConf != null
            ? cacheConf.shouldEvictOnClose() : true);
      }
      if (!result.isEmpty()) {
        // initialize the thread pool for closing store files in parallel.
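        // (the same bounded pool as the store file opener;
        // see HRegion#getStoreFileOpenAndCloseThreadPool)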
        ThreadPoolExecutor storeFileCloserThreadPool = this.region
            .getStoreFileOpenAndCloseThreadPool("StoreFileCloser-"
                + this.getColumnFamilyName());

        // close each store file in parallel
        CompletionService<Void> completionService =
            new ExecutorCompletionService<>(storeFileCloserThreadPool);
        for (HStoreFile f : result) {
          completionService.submit(new Callable<Void>() {
            @Override
            public Void call() throws IOException {
              boolean evictOnClose =
                  cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
              f.closeStoreFile(evictOnClose);
              return null;
            }
          });
        }

        IOException ioe = null;
        try {
          for (int i = 0; i < result.size(); i++) {
            try {
              Future<Void> future = completionService.take();
              future.get();
            } catch (InterruptedException e) {
              if (ioe == null) {
                ioe = new InterruptedIOException();
                ioe.initCause(e);
              }
            } catch (ExecutionException e) {
              if (ioe == null) {
                ioe = new IOException(e.getCause());
              }
            }
          }
        } finally {
          storeFileCloserThreadPool.shutdownNow();
        }
        if (ioe != null) {
          throw ioe;
        }
      }
      LOG.trace("Closed {}", this);
      return result;
    } finally {
      this.lock.writeLock().unlock();
      this.archiveLock.unlock();
    }
  }

  /**
   * Snapshot this store's memstore. Call before running
   * {@link #flushCache(long, MemStoreSnapshot, MonitoredTask, ThroughputController,
   * FlushLifeCycleTracker)} so it has some work to do.
   */
  void snapshot() {
    this.lock.writeLock().lock();
    try {
      this.memstore.snapshot();
    } finally {
      this.lock.writeLock().unlock();
    }
  }

  /**
   * Write out current snapshot. Presumes {@link #snapshot()} has been called previously.
   * @param logCacheFlushId flush sequence number
   * @return The path names of the tmp files to which the store was flushed
   * @throws IOException if exception occurs during process
   */
  protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
      MonitoredTask status, ThroughputController throughputController,
      FlushLifeCycleTracker tracker) throws IOException {
    // If an exception happens flushing, we let it out without clearing the memstore snapshot.
    // The old snapshot will be returned when we say 'snapshot', the next time flush comes
    // around.
    // Retry after catching exception when flushing, otherwise server will abort itself.
    StoreFlusher flusher = storeEngine.getStoreFlusher();
    IOException lastException = null;
    for (int i = 0; i < flushRetriesNumber; i++) {
      try {
        List<Path> pathNames =
            flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController,
                tracker);
        Path lastPathName = null;
        try {
          for (Path pathName : pathNames) {
            lastPathName = pathName;
            validateStoreFile(pathName);
          }
          return pathNames;
        } catch (Exception e) {
          LOG.warn("Failed validating store file {}, retrying num={}", lastPathName, i, e);
          if (e instanceof IOException) {
            lastException = (IOException) e;
          } else {
            lastException = new IOException(e);
          }
        }
      } catch (IOException e) {
        LOG.warn("Failed flushing store file for {}, retrying num={}", this, i, e);
        lastException = e;
      }
      if (lastException != null && i < (flushRetriesNumber - 1)) {
        try {
          Thread.sleep(pauseTime);
        } catch (InterruptedException e) {
          IOException iie = new InterruptedIOException();
          iie.initCause(e);
          throw iie;
        }
      }
    }
    throw lastException;
  }

  public HStoreFile tryCommitRecoveredHFile(Path path) throws IOException {
    LOG.info("Validating recovered hfile at {} for inclusion in store {}", path, this);
    FileSystem srcFs = path.getFileSystem(conf);
    srcFs.access(path, FsAction.READ_WRITE);
    try (HFile.Reader reader =
        HFile.createReader(srcFs, path, cacheConf, isPrimaryReplicaStore(), conf)) {
      Optional<byte[]> firstKey = reader.getFirstRowKey();
      Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
      Optional<Cell> lk = reader.getLastKey();
      Preconditions.checkState(lk.isPresent(), "Last key can not be null");
      byte[] lastKey = CellUtil.cloneRow(lk.get());
      if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) {
        throw new WrongRegionException("Recovered hfile " + path.toString()
            + " does not fit inside region " + this.getRegionInfo().getRegionNameAsString());
      }
    }

    Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);
    HStoreFile sf = createStoreFileAndReader(dstPath);
    StoreFileReader r = sf.getReader();
    this.storeSize.addAndGet(r.length());
    this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());

    this.lock.writeLock().lock();
    try {
      this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
    } finally {
      this.lock.writeLock().unlock();
    }

    LOG.info("Loaded recovered hfile to {}, entries={}, sequenceid={}, filesize={}", sf,
        r.getEntries(), r.getSequenceID(),
        TraditionalBinaryPrefix.long2String(r.length(), "B", 1));
    return sf;
  }

  /**
   * @param path The pathname of the tmp file into which the store was flushed
   * @return store file created.
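   * @throws IOException if the file cannot be committed or its reader opened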
   */
  private HStoreFile commitFile(Path path, long logCacheFlushId, MonitoredTask status)
      throws IOException {
    // Write-out finished successfully, move into the right spot
    Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);

    status.setStatus("Flushing " + this + ": reopening flushed file");
    HStoreFile sf = createStoreFileAndReader(dstPath);

    StoreFileReader r = sf.getReader();
    this.storeSize.addAndGet(r.length());
    this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());

    if (LOG.isInfoEnabled()) {
      LOG.info("Added " + sf + ", entries=" + r.getEntries()
          + ", sequenceid=" + logCacheFlushId
          + ", filesize=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1));
    }
    return sf;
  }

  public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
      boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
      boolean shouldDropBehind) throws IOException {
    return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
        includesTag, shouldDropBehind, -1, HConstants.EMPTY_STRING);
  }

  /**
   * @param compression Compression algorithm to use
   * @param isCompaction whether we are creating a new file in a compaction
   * @param includeMVCCReadpoint whether to include MVCC or not
   * @param includesTag whether to include tags or not
   * @return Writer for a new StoreFile in the tmp dir.
   */
  // TODO : allow the Writer factory to create Writers of ShipperListener type only in case of
  // compaction
  public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
      boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
      boolean shouldDropBehind, long totalCompactedFilesSize, String fileStoragePolicy)
      throws IOException {
    // creating new cache config for each new writer
    final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
    if (isCompaction) {
      // Don't cache data on write on compactions, unless specifically configured to do so.
      // Cache only when total file size remains lower than configured threshold.
      final boolean cacheCompactedBlocksOnWrite =
          cacheConf.shouldCacheCompactedBlocksOnWrite();
      // if data blocks are to be cached on write during compaction, we should forcefully
      // cache index and bloom blocks as well
      if (cacheCompactedBlocksOnWrite && totalCompactedFilesSize <= cacheConf
          .getCacheCompactedBlocksOnWriteThreshold()) {
        writerCacheConf.enableCacheOnWrite();
        if (!cacheOnWriteLogged) {
          LOG.info("For {} , cacheCompactedBlocksOnWrite is true, hence enabled "
              + "cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this);
          cacheOnWriteLogged = true;
        }
      } else {
        writerCacheConf.setCacheDataOnWrite(false);
        if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
          // checking condition once again for logging
          LOG.debug(
              "For {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted "
                  + "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}",
              this, totalCompactedFilesSize,
              cacheConf.getCacheCompactedBlocksOnWriteThreshold());
        }
      }
    } else {
      final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite();
      if (shouldCacheDataOnWrite) {
        writerCacheConf.enableCacheOnWrite();
        if (!cacheOnWriteLogged) {
          LOG.info("For {} , cacheDataOnWrite is true, hence enabled cacheOnWrite for "
              + "Index blocks and Bloom filter blocks", this);
          cacheOnWriteLogged = true;
        }
      }
    }
    InetSocketAddress[] favoredNodes = null;
    if (region.getRegionServerServices() != null) {
      favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
          region.getRegionInfo().getEncodedName());
    }
    HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
        cryptoContext);
    Path familyTempDir = new Path(fs.getTempDir(), family.getNameAsString());
    StoreFileWriter.Builder builder = new StoreFileWriter.Builder(conf, writerCacheConf,
        this.getFileSystem())
            .withOutputDir(familyTempDir)
            .withBloomType(family.getBloomFilterType())
            .withMaxKeyCount(maxKeyCount)
            .withFavoredNodes(favoredNodes)
            .withFileContext(hFileContext)
            .withShouldDropCacheBehind(shouldDropBehind)
            .withCompactedFilesSupplier(this::getCompactedFiles)
            .withFileStoragePolicy(fileStoragePolicy);
    return builder.build();
  }

  private HFileContext createFileContext(Compression.Algorithm compression,
      boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context cryptoContext) {
    if (compression == null) {
      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
    }
    HFileContext hFileContext = new HFileContextBuilder()
        .withIncludesMvcc(includeMVCCReadpoint)
        .withIncludesTags(includesTag)
        .withCompression(compression)
        .withCompressTags(family.isCompressTags())
        .withChecksumType(checksumType)
        .withBytesPerCheckSum(bytesPerChecksum)
        .withBlockSize(blocksize)
        .withHBaseCheckSum(true)
        .withDataBlockEncoding(family.getDataBlockEncoding())
        .withEncryptionContext(cryptoContext)
        .withCreateTime(EnvironmentEdgeManager.currentTime())
        .withColumnFamily(family.getName())
        .withTableName(region.getTableDescriptor().getTableName().getName())
        .withCellComparator(this.comparator)
        .build();
    return hFileContext;
  }

  private long getTotalSize(Collection<HStoreFile> sfs) {
    return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum();
  }

  /**
   * Change storeFiles adding into place the Reader produced by this new flush.
   * @param sfs Store files
   * @return Whether compaction is required.
   */
  private boolean updateStorefiles(List<HStoreFile> sfs, long snapshotId) throws IOException {
    this.lock.writeLock().lock();
    try {
      this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
      if (snapshotId > 0) {
        this.memstore.clearSnapshot(snapshotId);
      }
    } finally {
      // We need the lock, as long as we are updating the storeFiles or changing the memstore.
      // Let us release it before calling notifyChangeReadersObservers. See HBASE-4485 for a
      // possible deadlock scenario that could have happened if we continued to hold the lock.
      this.lock.writeLock().unlock();
    }
    // notify to be called here - only in case of flushes
    notifyChangedReadersObservers(sfs);
    if (LOG.isTraceEnabled()) {
      long totalSize = getTotalSize(sfs);
      String traceMessage = "FLUSH time,count,size,store size,store files ["
          + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize
          + "," + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
      LOG.trace(traceMessage);
    }
    return needsCompaction();
  }

  /**
   * Notify all observers that the set of Readers has changed.
   */
  private void notifyChangedReadersObservers(List<HStoreFile> sfs) throws IOException {
    for (ChangedReadersObserver o : this.changedReaderObservers) {
      List<KeyValueScanner> memStoreScanners;
      this.lock.readLock().lock();
      try {
        memStoreScanners = this.memstore.getScanners(o.getReadPoint());
      } finally {
        this.lock.readLock().unlock();
      }
      o.updateReaders(sfs, memStoreScanners);
    }
  }

  /**
   * Get all scanners with no filtering based on TTL (that happens further down the line).
   * @param cacheBlocks cache the blocks or not
   * @param usePread true to use pread, false if not
   * @param isCompaction true if the scanner is created for compaction
   * @param matcher the scan query matcher
   * @param startRow the start row
   * @param stopRow the stop row
   * @param readPt the read point of the current scan
   * @return all scanners for this store
   */
  public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet, boolean usePread,
      boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow,
      long readPt) throws IOException {
    return getScanners(cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow,
        false, readPt);
  }

  /**
   * Get all scanners with no filtering based on TTL (that happens further down the line).
   * @param cacheBlocks cache the blocks or not
   * @param usePread true to use pread, false if not
   * @param isCompaction true if the scanner is created for compaction
   * @param matcher the scan query matcher
   * @param startRow the start row
   * @param includeStartRow true to include start row, false if not
   * @param stopRow the stop row
   * @param includeStopRow true to include stop row, false if not
   * @param readPt the read point of the current scan
   * @return all scanners for this store
   */
  public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,
      boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow,
      byte[] stopRow, boolean includeStopRow, long readPt) throws IOException {
    Collection<HStoreFile> storeFilesToScan;
    List<KeyValueScanner> memStoreScanners;
    this.lock.readLock().lock();
    try {
      storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
          includeStartRow, stopRow, includeStopRow);
      memStoreScanners = this.memstore.getScanners(readPt);
    } finally {
      this.lock.readLock().unlock();
    }

    try {
      // First the store file scanners

      // TODO this used to get the store files in descending order, but now we get them in
      // ascending order, which I think is actually more correct, since the memstore scanners
      // get put at the end.
      List<StoreFileScanner> sfScanners = StoreFileScanner
          .getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction,
              false, matcher, readPt);
      List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
      scanners.addAll(sfScanners);
      // Then the memstore scanners
      scanners.addAll(memStoreScanners);
      return scanners;
    } catch (Throwable t) {
      clearAndClose(memStoreScanners);
      throw t instanceof IOException ? (IOException) t : new IOException(t);
    }
  }

  private static void clearAndClose(List<KeyValueScanner> scanners) {
    if (scanners == null) {
      return;
    }
    for (KeyValueScanner s : scanners) {
      s.close();
    }
    scanners.clear();
  }

  /**
   * Create scanners on the given files and if needed on the memstore with no filtering based
   * on TTL (that happens further down the line).
   * @param files the list of files on which the scanners have to be created
   * @param cacheBlocks cache the blocks or not
   * @param usePread true to use pread, false if not
   * @param isCompaction true if the scanner is created for compaction
   * @param matcher the scan query matcher
   * @param startRow the start row
   * @param stopRow the stop row
   * @param readPt the read point of the current scan
   * @param includeMemstoreScanner true if memstore has to be included
   * @return scanners on the given files and on the memstore if specified
   */
  public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
      boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
      byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner)
      throws IOException {
    return getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
        stopRow, false, readPt, includeMemstoreScanner);
  }

  /**
   * Create scanners on the given files and if needed on the memstore with no filtering based
   * on TTL (that happens further down the line).
   * @param files the list of files on which the scanners have to be created
   * @param cacheBlocks cache the blocks or not
   * @param usePread true to use pread, false if not
   * @param isCompaction true if the scanner is created for compaction
   * @param matcher the scan query matcher
   * @param startRow the start row
   * @param includeStartRow true to include start row, false if not
   * @param stopRow the stop row
   * @param includeStopRow true to include stop row, false if not
   * @param readPt the read point of the current scan
   * @param includeMemstoreScanner true if the memstore has to be included
   * @return scanners on the given files and on the memstore if specified
   */
  public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
      boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
      boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
      boolean includeMemstoreScanner) throws IOException {
    List<KeyValueScanner> memStoreScanners = null;
    if (includeMemstoreScanner) {
      this.lock.readLock().lock();
      try {
        memStoreScanners = this.memstore.getScanners(readPt);
      } finally {
        this.lock.readLock().unlock();
      }
    }
    try {
      List<StoreFileScanner> sfScanners = StoreFileScanner
        .getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, false, matcher,
          readPt);
      List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
      scanners.addAll(sfScanners);
      // Then the memstore scanners
      if (memStoreScanners != null) {
        scanners.addAll(memStoreScanners);
      }
      return scanners;
    } catch (Throwable t) {
      clearAndClose(memStoreScanners);
      throw t instanceof IOException ? (IOException) t : new IOException(t);
    }
  }

  /**
   * @param o Observer who wants to know about changes in the set of Readers
   */
  public void addChangedReaderObserver(ChangedReadersObserver o) {
    this.changedReaderObservers.add(o);
  }

  /**
   * @param o Observer no longer interested in changes in the set of Readers.
   */
  public void deleteChangedReaderObserver(ChangedReadersObserver o) {
    // We don't check if the observer is present; it may not be (legitimately).
    this.changedReaderObservers.remove(o);
  }

  //////////////////////////////////////////////////////////////////////////////
  // Compaction
  //////////////////////////////////////////////////////////////////////////////

  /**
   * Compact the StoreFiles. This method may take some time, so the calling
   * thread must be able to block for long periods.
   *
   * <p>During this time, the Store can work as usual, getting values from
   * StoreFiles and writing new StoreFiles from the memstore.
   *
   * <p>Existing StoreFiles are not destroyed until the new compacted StoreFile is
   * completely written-out to disk.
   *
   * <p>The compactLock prevents multiple simultaneous compactions.
   * The structureLock prevents us from interfering with other write operations.
   *
   * <p>We don't want to hold the structureLock for the whole time, as a compact()
   * can be lengthy and we want to allow cache-flushes during this period.
   *
   * <p>A compaction event should be idempotent, since there is no IO fencing for
   * the region directory in HDFS.
   * A region server might still try to complete the compaction after it lost the region.
   * That is why the following events are carefully ordered for a compaction:
   *  1. Compaction writes new files under the region/.tmp directory (compaction output).
   *  2. Compaction atomically moves the temporary file under the region directory.
   *  3. Compaction appends a WAL edit containing the compaction input and output files,
   *     and forces a sync on the WAL.
   *  4. Compaction deletes the input files from the region directory.
   *
   * Failure conditions are handled like this:
   *  - If the RS fails before 2, the compaction won't complete. Even if the RS lives on and
   *    finishes the compaction later, it will only write the new data file to the region
   *    directory. Since we already have this data, this will be idempotent, but we will have
   *    a redundant copy of the data.
   *  - If the RS fails between 2 and 3, the region will have a redundant copy of the data.
   *    The RS that failed won't be able to finish sync() for the WAL because of lease
   *    recovery in the WAL.
   *  - If the RS fails after 3, the region server that opens the region will pick up the
   *    compaction marker from the WAL and replay it by removing the compaction input files.
   *    The failed RS can also attempt to delete those files, but the operation will be
   *    idempotent.
   *
   * See HBASE-2231 for details.
   *
   * @param compaction compaction details obtained from requestCompaction()
   * @return Storefiles we compacted into or null if we failed or opted out early.
   */
  public List<HStoreFile> compact(CompactionContext compaction,
      ThroughputController throughputController, User user) throws IOException {
    assert compaction != null;
    CompactionRequestImpl cr = compaction.getRequest();
    try {
      // Do all sanity checking in here if we have a valid CompactionRequestImpl
      // because we need to clean up after it on the way out in a finally
      // block below
      long compactionStartTime = EnvironmentEdgeManager.currentTime();
      assert compaction.hasSelection();
      Collection<HStoreFile> filesToCompact = cr.getFiles();
      assert !filesToCompact.isEmpty();
      synchronized (filesCompacting) {
        // sanity check: we're compacting files that this store knows about
        // TODO: change this to LOG.error() after more debugging
        Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
      }

      // Ready to go. Have list of files to compact.
      LOG.info("Starting compaction of " + filesToCompact +
        " into tmpdir=" + fs.getTempDir() + ", totalSize=" +
          TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));

      return doCompaction(cr, filesToCompact, user, compactionStartTime,
          compaction.compact(throughputController, user));
    } finally {
      finishCompactionRequest(cr);
    }
  }

  protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
      Collection<HStoreFile> filesToCompact, User user, long compactionStartTime,
      List<Path> newFiles) throws IOException {
    // Do the steps necessary to complete the compaction.
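    // In order, the steps below: apply the storage policy to the new files, move them
    // into the store directory, write the compaction marker to the WAL, atomically swap
    // the store file lists, then update compaction metrics and the cached store size.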
    setStoragePolicyFromFileName(newFiles);
    List<HStoreFile> sfs = moveCompactedFilesIntoPlace(cr, newFiles, user);
    writeCompactionWalRecord(filesToCompact, sfs);
    replaceStoreFiles(filesToCompact, sfs);
    if (cr.isMajor()) {
      majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
      majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
    } else {
      compactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
      compactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
    }
    long outputBytes = getTotalSize(sfs);

    // At this point the store will use new files for all new scanners.
    completeCompaction(filesToCompact); // update store size.

    long now = EnvironmentEdgeManager.currentTime();
    if (region.getRegionServerServices() != null
        && region.getRegionServerServices().getMetrics() != null) {
      region.getRegionServerServices().getMetrics().updateCompaction(
        region.getTableDescriptor().getTableName().getNameAsString(),
        cr.isMajor(), now - compactionStartTime, cr.getFiles().size(),
        newFiles.size(), cr.getSize(), outputBytes);
    }

    logCompactionEndMessage(cr, sfs, now, compactionStartTime);
    return sfs;
  }

  // Set the correct storage policy from the file name of DTCP (date-tiered compaction output).
  // Renaming the file will not change the storage policy.
  private void setStoragePolicyFromFileName(List<Path> newFiles) throws IOException {
    String prefix = HConstants.STORAGE_POLICY_PREFIX;
    for (Path newFile : newFiles) {
      if (newFile.getParent().getName().startsWith(prefix)) {
        CommonFSUtils.setStoragePolicy(fs.getFileSystem(), newFile,
          newFile.getParent().getName().substring(prefix.length()));
      }
    }
  }

  private List<HStoreFile> moveCompactedFilesIntoPlace(CompactionRequestImpl cr,
      List<Path> newFiles, User user) throws IOException {
    List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
    for (Path newFile : newFiles) {
      assert newFile != null;
      HStoreFile sf = moveFileIntoPlace(newFile);
      if (this.getCoprocessorHost() != null) {
        getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
      }
      assert sf != null;
      sfs.add(sf);
    }
    return sfs;
  }

  // Package-visible for tests
  HStoreFile moveFileIntoPlace(Path newFile) throws IOException {
    validateStoreFile(newFile);
    // Move the file into the right spot
    Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile);
    return createStoreFileAndReader(destPath);
  }

  /**
   * Writes the compaction WAL record.
   * @param filesCompacted Files compacted (input).
   * @param newFiles Files from compaction.
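   * @throws IOException if appending or syncing the compaction marker to the WAL fails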
   */
  private void writeCompactionWalRecord(Collection<HStoreFile> filesCompacted,
      Collection<HStoreFile> newFiles) throws IOException {
    if (region.getWAL() == null) {
      return;
    }
    List<Path> inputPaths =
        filesCompacted.stream().map(HStoreFile::getPath).collect(Collectors.toList());
    List<Path> outputPaths =
        newFiles.stream().map(HStoreFile::getPath).collect(Collectors.toList());
    RegionInfo info = this.region.getRegionInfo();
    CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
        family.getName(), inputPaths, outputPaths,
        fs.getStoreDir(getColumnFamilyDescriptor().getNameAsString()));
    // TODO: Fix reaching into Region to get the maxWaitForSeqId.
    // Does this method belong in Region altogether, given it is making so many references up
    // there? Could be Region#writeCompactionMarker(compactionDescriptor);
    WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(),
        this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC());
  }

  void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result)
      throws IOException {
    this.lock.writeLock().lock();
    try {
      this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result);
      synchronized (filesCompacting) {
        filesCompacting.removeAll(compactedFiles);
      }

      // These may be null when the RS is shutting down. The space quota Chores will fix the Region
      // sizes later so it's not super-critical if we miss these.
      RegionServerServices rsServices = region.getRegionServerServices();
      if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) {
        updateSpaceQuotaAfterFileReplacement(
            rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(),
            compactedFiles, result);
      }
    } finally {
      this.lock.writeLock().unlock();
    }
  }

  /**
   * Updates the space quota usage for this region, removing the size for files compacted away
   * and adding in the size for new files.
   *
   * @param sizeStore The object tracking changes in region size for space quotas.
   * @param regionInfo The identifier for the region whose size is being updated.
   * @param oldFiles Files removed from this store's region.
   * @param newFiles Files added to this store's region.
   */
  void updateSpaceQuotaAfterFileReplacement(
      RegionSizeStore sizeStore, RegionInfo regionInfo, Collection<HStoreFile> oldFiles,
      Collection<HStoreFile> newFiles) {
    long delta = 0;
    if (oldFiles != null) {
      for (HStoreFile compactedFile : oldFiles) {
        if (compactedFile.isHFile()) {
          delta -= compactedFile.getReader().length();
        }
      }
    }
    if (newFiles != null) {
      for (HStoreFile newFile : newFiles) {
        if (newFile.isHFile()) {
          delta += newFile.getReader().length();
        }
      }
    }
    sizeStore.incrementRegionSize(regionInfo, delta);
  }

  /**
   * Log a very elaborate compaction completion message.
   * @param cr Request.
   * @param sfs Resulting files.
   * @param now Current time, used to compute the compaction duration.
   * @param compactionStartTime Start time.
   */
  private void logCompactionEndMessage(
      CompactionRequestImpl cr, List<HStoreFile> sfs, long now, long compactionStartTime) {
    StringBuilder message = new StringBuilder(
        "Completed" + (cr.isMajor() ? " major" : "") + " compaction of "
        + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "") + " file(s) in "
        + this + " of " + this.getRegionInfo().getShortNameToLog() + " into ");
    if (sfs.isEmpty()) {
      message.append("none, ");
    } else {
      for (HStoreFile sf : sfs) {
        message.append(sf.getPath().getName());
        message.append("(size=");
        message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1));
        message.append("), ");
      }
    }
    message.append("total size for store is ")
      .append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1))
      .append(". This selection was in queue for ")
      .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
      .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
      .append(" to execute.");
    LOG.info(message.toString());
    if (LOG.isTraceEnabled()) {
      int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
      long resultSize = getTotalSize(sfs);
      String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
        + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
        + cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]";
      LOG.trace(traceMessage);
    }
  }

  /**
   * Call to complete a compaction. It's for the case where we find in the WAL a compaction
   * that was not finished. We could find one when recovering a WAL after a regionserver crash.
   * See HBASE-2231.
   */
  public void replayCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
      boolean removeFiles) throws IOException {
    LOG.debug("Completing compaction from the WAL marker");
    List<String> compactionInputs = compaction.getCompactionInputList();
    List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList());

    // The Compaction Marker is written after the compaction is completed,
    // and the files moved into the region/family folder.
    //
    // If we crash after the entry is written, we may not have removed the
    // input files, but the output file is present.
    // (The unremoved input files will be removed by this function.)
    //
    // If we scan the directory and the file is not present, it can mean that:
    //   - The file was manually removed by the user
    //   - The file was removed as a consequence of a subsequent compaction
    // so, we can't do anything with the "compaction output list" because those
    // files have already been loaded when opening the region (by virtue of
    // being in the store's folder) or they may be missing due to a compaction.

    String familyName = this.getColumnFamilyName();
    Set<String> inputFiles = new HashSet<>();
    for (String compactionInput : compactionInputs) {
      Path inputPath = fs.getStoreFilePath(familyName, compactionInput);
      inputFiles.add(inputPath.getName());
    }

    // Some of the input files might already be deleted.
    List<HStoreFile> inputStoreFiles = new ArrayList<>(compactionInputs.size());
    for (HStoreFile sf : this.getStorefiles()) {
      if (inputFiles.contains(sf.getPath().getName())) {
        inputStoreFiles.add(sf);
      }
    }

    // Check whether we need to pick up the new files.
    List<HStoreFile> outputStoreFiles = new ArrayList<>(compactionOutputs.size());

    if (pickCompactionFiles) {
      for (HStoreFile sf : this.getStorefiles()) {
        compactionOutputs.remove(sf.getPath().getName());
      }
      for (String compactionOutput : compactionOutputs) {
        StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), compactionOutput);
        HStoreFile storeFile = createStoreFileAndReader(storeFileInfo);
        outputStoreFiles.add(storeFile);
      }
    }

    if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) {
      LOG.info("Replaying compaction marker, replacing input files: " +
          inputStoreFiles + " with output files: " + outputStoreFiles);
      this.replaceStoreFiles(inputStoreFiles, outputStoreFiles);
      this.completeCompaction(inputStoreFiles);
    }
  }

  /**
   * This method tries to compact N recent files for testing. Note that because compacting
   * "recent" files only makes sense for some policies, e.g. the default one, it assumes the
   * default policy is used; it does not invoke the configured policy, but builds the
   * compaction candidate list by itself.
   * @param N Number of recent files to compact.
   */
  public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException {
    List<HStoreFile> filesToCompact;
    boolean isMajor;

    this.lock.readLock().lock();
    try {
      synchronized (filesCompacting) {
        filesToCompact = Lists.newArrayList(storeEngine.getStoreFileManager().getStorefiles());
        if (!filesCompacting.isEmpty()) {
          // exclude all files older than the newest file we're currently
          // compacting. this allows us to preserve contiguity (HBASE-2856)
          HStoreFile last = filesCompacting.get(filesCompacting.size() - 1);
          int idx = filesToCompact.indexOf(last);
          Preconditions.checkArgument(idx != -1);
          filesToCompact.subList(0, idx + 1).clear();
        }
        int count = filesToCompact.size();
        if (N > count) {
          throw new RuntimeException("Not enough files");
        }

        filesToCompact = filesToCompact.subList(count - N, count);
        isMajor = (filesToCompact.size() == storeEngine.getStoreFileManager().getStorefileCount());
        filesCompacting.addAll(filesToCompact);
        Collections.sort(filesCompacting, storeEngine.getStoreFileManager()
          .getStoreFileComparator());
      }
    } finally {
      this.lock.readLock().unlock();
    }

    try {
      // Ready to go. Have list of files to compact.
      List<Path> newFiles = ((DefaultCompactor) this.storeEngine.getCompactor())
        .compactForTesting(filesToCompact, isMajor);
      for (Path newFile : newFiles) {
        // Move the compaction into place.
        HStoreFile sf = moveFileIntoPlace(newFile);
        if (this.getCoprocessorHost() != null) {
          this.getCoprocessorHost().postCompact(this, sf, null, null, null);
        }
        replaceStoreFiles(filesToCompact, Collections.singletonList(sf));
        completeCompaction(filesToCompact);
      }
    } finally {
      synchronized (filesCompacting) {
        filesCompacting.removeAll(filesToCompact);
      }
    }
  }

  @Override
  public boolean hasReferences() {
    // Grab the read lock here to ensure the complete store file list is only read after the
    // atomic replaceStoreFiles(..) call has finished.
    this.lock.readLock().lock();
    try {
      // Merge the current store files with compacted files here due to HBASE-20940.
      Collection<HStoreFile> allStoreFiles = new ArrayList<>(getStorefiles());
      allStoreFiles.addAll(getCompactedFiles());
      return StoreUtils.hasReferences(allStoreFiles);
    } finally {
      this.lock.readLock().unlock();
    }
  }

  /**
   * Getter for the CompactionProgress object.
   * @return the CompactionProgress object; can be null
   */
  public CompactionProgress getCompactionProgress() {
    return this.storeEngine.getCompactor().getProgress();
  }

  @Override
  public boolean shouldPerformMajorCompaction() throws IOException {
    for (HStoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) {
      // TODO: what are these reader checks all over the place?
      if (sf.getReader() == null) {
        LOG.debug("StoreFile {} has null Reader", sf);
        return false;
      }
    }
    return storeEngine.getCompactionPolicy().shouldPerformMajorCompaction(
        this.storeEngine.getStoreFileManager().getStorefiles());
  }

  public Optional<CompactionContext> requestCompaction() throws IOException {
    return requestCompaction(NO_PRIORITY, CompactionLifeCycleTracker.DUMMY, null);
  }

  public Optional<CompactionContext> requestCompaction(int priority,
      CompactionLifeCycleTracker tracker, User user) throws IOException {
    // don't even select for compaction if writes are disabled
    if (!this.areWritesEnabled()) {
      return Optional.empty();
    }
    // Before we do compaction, try to get rid of unneeded files to simplify things.
    removeUnneededFiles();

    final CompactionContext compaction = storeEngine.createCompaction();
    CompactionRequestImpl request = null;
    this.lock.readLock().lock();
    try {
      synchronized (filesCompacting) {
        // First, see if coprocessor would want to override selection.
        if (this.getCoprocessorHost() != null) {
          final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
          boolean override = getCoprocessorHost().preCompactSelection(this,
              candidatesForCoproc, tracker, user);
          if (override) {
            // Coprocessor is overriding normal file selection.
            compaction.forceSelect(new CompactionRequestImpl(candidatesForCoproc));
          }
        }

        // Normal case - coprocessor is not overriding file selection.
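        // Note: at most one compaction at a time may claim the off-peak window; the
        // offPeakCompactionTracker compareAndSet below is the guard that enforces this.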
        if (!compaction.hasSelection()) {
          boolean isUserCompaction = priority == Store.PRIORITY_USER;
          boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
              offPeakCompactionTracker.compareAndSet(false, true);
          try {
            compaction.select(this.filesCompacting, isUserCompaction,
              mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
          } catch (IOException e) {
            if (mayUseOffPeak) {
              offPeakCompactionTracker.set(false);
            }
            throw e;
          }
          assert compaction.hasSelection();
          if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
            // Compaction policy doesn't want to take advantage of off-peak.
            offPeakCompactionTracker.set(false);
          }
        }
        if (this.getCoprocessorHost() != null) {
          this.getCoprocessorHost().postCompactSelection(
              this, ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker,
              compaction.getRequest(), user);
        }
        // Finally, we have the resulting files list. Check if we have any files at all.
        request = compaction.getRequest();
        Collection<HStoreFile> selectedFiles = request.getFiles();
        if (selectedFiles.isEmpty()) {
          return Optional.empty();
        }

        addToCompactingFiles(selectedFiles);

        // If we're enqueuing a major, clear the force flag.
        this.forceMajor = this.forceMajor && !request.isMajor();

        // Set common request properties.
        // Set priority, either the override value supplied by the caller or from the store.
        final int compactionPriority =
            (priority != Store.NO_PRIORITY) ? priority : getCompactPriority();
        request.setPriority(compactionPriority);

        if (request.isAfterSplit()) {
          // If the store belongs to recently split daughter regions, it is better to consider
          // them with higher priority in the compaction queue.
          // Override priority if it is lower (higher int value) than
          // SPLIT_REGION_COMPACTION_PRIORITY
          final int splitHousekeepingPriority =
              Math.min(compactionPriority, SPLIT_REGION_COMPACTION_PRIORITY);
          request.setPriority(splitHousekeepingPriority);
          LOG.info("Keeping/Overriding Compaction request priority to {} for CF {} since it"
              + " belongs to recently split daughter region {}", splitHousekeepingPriority,
            this.getColumnFamilyName(), getRegionInfo().getRegionNameAsString());
        }
        request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
        request.setTracker(tracker);
      }
    } finally {
      this.lock.readLock().unlock();
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug(this + " is initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
          + (request.isAllFiles() ? " (all files)" : ""));
    }
    this.region.reportCompactionRequestStart(request.isMajor());
    return Optional.of(compaction);
  }

  /** Adds the files to compacting files. filesCompacting must be locked. */
  private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) {
    if (CollectionUtils.isEmpty(filesToAdd)) {
      return;
    }
    // Check that we do not try to compact the same StoreFile twice.
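    // e.g. filesCompacting = [f1, f2] and filesToAdd = [f2, f3] must fail the disjointness
    // check below, since f2 would otherwise be compacted twice.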
    if (!Collections.disjoint(filesCompacting, filesToAdd)) {
      Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
    }
    filesCompacting.addAll(filesToAdd);
    Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator());
  }

  private void removeUnneededFiles() throws IOException {
    if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) {
      return;
    }
    if (getColumnFamilyDescriptor().getMinVersions() > 0) {
      LOG.debug("Skipping expired store file removal due to min version of {} being {}",
          this, getColumnFamilyDescriptor().getMinVersions());
      return;
    }
    this.lock.readLock().lock();
    Collection<HStoreFile> delSfs = null;
    try {
      synchronized (filesCompacting) {
        long cfTtl = getStoreFileTtl();
        if (cfTtl != Long.MAX_VALUE) {
          delSfs = storeEngine.getStoreFileManager().getUnneededFiles(
              EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting);
          addToCompactingFiles(delSfs);
        }
      }
    } finally {
      this.lock.readLock().unlock();
    }

    if (CollectionUtils.isEmpty(delSfs)) {
      return;
    }

    Collection<HStoreFile> newFiles = Collections.emptyList(); // No new files.
    writeCompactionWalRecord(delSfs, newFiles);
    replaceStoreFiles(delSfs, newFiles);
    completeCompaction(delSfs);
    LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in "
        + this + "; total size is "
        + TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1));
  }

  public void cancelRequestedCompaction(CompactionContext compaction) {
    finishCompactionRequest(compaction.getRequest());
  }

  private void finishCompactionRequest(CompactionRequestImpl cr) {
    this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
    if (cr.isOffPeak()) {
      offPeakCompactionTracker.set(false);
      cr.setOffPeak(false);
    }
    synchronized (filesCompacting) {
      filesCompacting.removeAll(cr.getFiles());
    }
  }

  /**
   * Validates a store file by opening and closing it. In HFileV2 this should not be an expensive
   * operation.
   * @param path the path to the store file
   */
  private void validateStoreFile(Path path) throws IOException {
    HStoreFile storeFile = null;
    try {
      storeFile = createStoreFileAndReader(path);
    } catch (IOException e) {
      LOG.error("Failed to open store file : {}, keeping it in tmp location", path, e);
      throw e;
    } finally {
      if (storeFile != null) {
        storeFile.closeStoreFile(false);
      }
    }
  }

  /**
   * Update counts: recompute the store size and total uncompressed bytes from the current
   * set of store files.
   * @param compactedFiles list of files that were compacted
   */
  protected void completeCompaction(Collection<HStoreFile> compactedFiles)
  // Rename this method! TODO.
    throws IOException {
    this.storeSize.set(0L);
    this.totalUncompressedBytes.set(0L);
    for (HStoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
      StoreFileReader r = hsf.getReader();
      if (r == null) {
        LOG.warn("StoreFile {} has a null Reader", hsf);
        continue;
      }
      this.storeSize.addAndGet(r.length());
      this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
    }
  }

  /*
   * @param wantedVersions How many versions were asked for.
   * @return wantedVersions or this family's {@link HConstants#VERSIONS}.
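   * For example, with a column family capped at 3 max versions, wantedVersions = 5
   * returns 3, while wantedVersions = 2 returns 2.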
   */
  int versionsToReturn(final int wantedVersions) {
    if (wantedVersions <= 0) {
      throw new IllegalArgumentException("Number of versions must be > 0");
    }
    // Make sure we do not return more than the maximum versions for this store.
    int maxVersions = this.family.getMaxVersions();
    return wantedVersions > maxVersions ? maxVersions : wantedVersions;
  }

  @Override
  public boolean canSplit() {
    // Not split-able if we find a reference store file present in the store.
    boolean result = !hasReferences();
    if (!result) {
      LOG.trace("Not splittable; has references: {}", this);
    }
    return result;
  }

  /**
   * Determines the split point, if this Store should be split.
   */
  public Optional<byte[]> getSplitPoint() {
    this.lock.readLock().lock();
    try {
      // Should already be enforced by the split policy!
      assert !this.getRegionInfo().isMetaRegion();
      // Not split-able if we find a reference store file present in the store.
      if (hasReferences()) {
        LOG.trace("Not splittable; has references: {}", this);
        return Optional.empty();
      }
      return this.storeEngine.getStoreFileManager().getSplitPoint();
    } catch (IOException e) {
      LOG.warn("Failed getting store size for {}", this, e);
    } finally {
      this.lock.readLock().unlock();
    }
    return Optional.empty();
  }

  @Override
  public long getLastCompactSize() {
    return this.lastCompactSize;
  }

  @Override
  public long getSize() {
    return storeSize.get();
  }

  public void triggerMajorCompaction() {
    this.forceMajor = true;
  }

  //////////////////////////////////////////////////////////////////////////////
  // File administration
  //////////////////////////////////////////////////////////////////////////////

  /**
   * Return a scanner for both the memstore and the HStore files. Assumes we are not in a
   * compaction.
   * @param scan Scan to apply when scanning the stores
   * @param targetCols columns to scan
   * @param readPt the read point of the current scan
   * @return a scanner over the current key values
   * @throws IOException on failure
   */
  public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols, long readPt)
      throws IOException {
    lock.readLock().lock();
    try {
      ScanInfo scanInfo;
      if (this.getCoprocessorHost() != null) {
        scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this, scan);
      } else {
        scanInfo = getScanInfo();
      }
      return createScanner(scan, scanInfo, targetCols, readPt);
    } finally {
      lock.readLock().unlock();
    }
  }

  // HMobStore will override this method to return its own implementation.
  protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo,
      NavigableSet<byte[]> targetCols, long readPt) throws IOException {
    return scan.isReversed() ?
        new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt) :
        new StoreScanner(this, scanInfo, scan, targetCols, readPt);
  }

  /**
   * Recreates the scanners on the current list of active store file scanners.
   * @param currentFileScanners the current set of active store file scanners
   * @param cacheBlocks cache the blocks or not
   * @param usePread use pread or not
   * @param isCompaction is the scanner for compaction
   * @param matcher the scan query matcher
   * @param startRow the scan's start row
   * @param includeStartRow should the scan include the start row
   * @param stopRow the scan's stop row
   * @param includeStopRow should the scan include the stop row
   * @param readPt the read point of the current scan
   * @param includeMemstoreScanner whether the current scanner should include a memstore scanner
   * @return list of scanners recreated on the current set of active store files; null if there
   *         is nothing to reopen
   */
  public List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners,
      boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
      byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
      boolean includeMemstoreScanner) throws IOException {
    this.lock.readLock().lock();
    try {
      Map<String, HStoreFile> name2File =
          new HashMap<>(getStorefilesCount() + getCompactedFilesCount());
      for (HStoreFile file : getStorefiles()) {
        name2File.put(file.getFileInfo().getActiveFileName(), file);
      }
      Collection<HStoreFile> compactedFiles = getCompactedFiles();
      for (HStoreFile file : IterableUtils.emptyIfNull(compactedFiles)) {
        name2File.put(file.getFileInfo().getActiveFileName(), file);
      }
      List<HStoreFile> filesToReopen = new ArrayList<>();
      for (KeyValueScanner kvs : currentFileScanners) {
        assert kvs.isFileScanner();
        if (kvs.peek() == null) {
          continue;
        }
        filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
      }
      if (filesToReopen.isEmpty()) {
        return null;
      }
      return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow,
          includeStartRow, stopRow, includeStopRow, readPt, false);
    } finally {
      this.lock.readLock().unlock();
    }
  }

  @Override
  public String toString() {
    return this.getRegionInfo().getShortNameToLog() + "/" + this.getColumnFamilyName();
  }

  @Override
  public int getStorefilesCount() {
    return this.storeEngine.getStoreFileManager().getStorefileCount();
  }

  @Override
  public int getCompactedFilesCount() {
    return this.storeEngine.getStoreFileManager().getCompactedFilesCount();
  }

  private LongStream getStoreFileAgeStream() {
    return this.storeEngine.getStoreFileManager().getStorefiles().stream().filter(sf -> {
      if (sf.getReader() == null) {
        LOG.warn("StoreFile {} has a null Reader", sf);
        return false;
      } else {
        return true;
      }
    }).filter(HStoreFile::isHFile).mapToLong(sf -> sf.getFileInfo().getCreatedTimestamp())
      .map(t -> EnvironmentEdgeManager.currentTime() - t);
  }

  @Override
  public OptionalLong getMaxStoreFileAge() {
    return getStoreFileAgeStream().max();
  }

  @Override
  public OptionalLong getMinStoreFileAge() {
    return getStoreFileAgeStream().min();
  }

  @Override
  public OptionalDouble getAvgStoreFileAge() {
    return
getStoreFileAgeStream().average(); 2282 } 2283 2284 @Override 2285 public long getNumReferenceFiles() { 2286 return this.storeEngine.getStoreFileManager().getStorefiles().stream() 2287 .filter(HStoreFile::isReference).count(); 2288 } 2289 2290 @Override 2291 public long getNumHFiles() { 2292 return this.storeEngine.getStoreFileManager().getStorefiles().stream() 2293 .filter(HStoreFile::isHFile).count(); 2294 } 2295 2296 @Override 2297 public long getStoreSizeUncompressed() { 2298 return this.totalUncompressedBytes.get(); 2299 } 2300 2301 @Override 2302 public long getStorefilesSize() { 2303 // Include all StoreFiles 2304 return getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(), sf -> true); 2305 } 2306 2307 @Override 2308 public long getHFilesSize() { 2309 // Include only StoreFiles which are HFiles 2310 return getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(), 2311 HStoreFile::isHFile); 2312 } 2313 2314 private long getTotalUncompressedBytes(List<HStoreFile> files) { 2315 return files.stream() 2316 .mapToLong(file -> getStorefileFieldSize(file, StoreFileReader::getTotalUncompressedBytes)) 2317 .sum(); 2318 } 2319 2320 private long getStorefilesSize(Collection<HStoreFile> files, Predicate<HStoreFile> predicate) { 2321 return files.stream().filter(predicate) 2322 .mapToLong(file -> getStorefileFieldSize(file, StoreFileReader::length)).sum(); 2323 } 2324 2325 private long getStorefileFieldSize(HStoreFile file, ToLongFunction<StoreFileReader> f) { 2326 if (file == null) { 2327 return 0L; 2328 } 2329 StoreFileReader reader = file.getReader(); 2330 if (reader == null) { 2331 return 0L; 2332 } 2333 return f.applyAsLong(reader); 2334 } 2335 2336 private long getStorefilesFieldSize(ToLongFunction<StoreFileReader> f) { 2337 return this.storeEngine.getStoreFileManager().getStorefiles().stream() 2338 .mapToLong(file -> getStorefileFieldSize(file, f)).sum(); 2339 } 2340 2341 @Override 2342 public long getStorefilesRootLevelIndexSize() { 2343 return getStorefilesFieldSize(StoreFileReader::indexSize); 2344 } 2345 2346 @Override 2347 public long getTotalStaticIndexSize() { 2348 return getStorefilesFieldSize(StoreFileReader::getUncompressedDataIndexSize); 2349 } 2350 2351 @Override 2352 public long getTotalStaticBloomSize() { 2353 return getStorefilesFieldSize(StoreFileReader::getTotalBloomSize); 2354 } 2355 2356 @Override 2357 public MemStoreSize getMemStoreSize() { 2358 return this.memstore.size(); 2359 } 2360 2361 @Override 2362 public int getCompactPriority() { 2363 int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority(); 2364 if (priority == PRIORITY_USER) { 2365 LOG.warn("Compaction priority is USER despite there being no user compaction"); 2366 } 2367 return priority; 2368 } 2369 2370 public boolean throttleCompaction(long compactionSize) { 2371 return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize); 2372 } 2373 2374 public HRegion getHRegion() { 2375 return this.region; 2376 } 2377 2378 public RegionCoprocessorHost getCoprocessorHost() { 2379 return this.region.getCoprocessorHost(); 2380 } 2381 2382 @Override 2383 public RegionInfo getRegionInfo() { 2384 return this.fs.getRegionInfo(); 2385 } 2386 2387 @Override 2388 public boolean areWritesEnabled() { 2389 return this.region.areWritesEnabled(); 2390 } 2391 2392 @Override 2393 public long getSmallestReadPoint() { 2394 return this.region.getSmallestReadPoint(); 2395 } 2396 2397 /** 2398 * Adds or replaces the specified KeyValues. 
2399 * <p> 2400 * For each KeyValue specified, if a cell with the same row, family, and qualifier exists in 2401 * MemStore, it will be replaced. Otherwise, it will just be inserted to MemStore. 2402 * <p> 2403 * This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic 2404 * across all of them. 2405 * @param readpoint readpoint below which we can safely remove duplicate KVs 2406 */ 2407 public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing) 2408 throws IOException { 2409 this.lock.readLock().lock(); 2410 try { 2411 this.memstore.upsert(cells, readpoint, memstoreSizing); 2412 } finally { 2413 this.lock.readLock().unlock(); 2414 } 2415 } 2416 2417 public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTracker tracker) { 2418 return new StoreFlusherImpl(cacheFlushId, tracker); 2419 } 2420 2421 private final class StoreFlusherImpl implements StoreFlushContext { 2422 2423 private final FlushLifeCycleTracker tracker; 2424 private final long cacheFlushSeqNum; 2425 private MemStoreSnapshot snapshot; 2426 private List<Path> tempFiles; 2427 private List<Path> committedFiles; 2428 private long cacheFlushCount; 2429 private long cacheFlushSize; 2430 private long outputFileSize; 2431 2432 private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) { 2433 this.cacheFlushSeqNum = cacheFlushSeqNum; 2434 this.tracker = tracker; 2435 } 2436 2437 /** 2438 * This is not thread safe. The caller should have a lock on the region or the store. 2439 * If necessary, the lock can be added with the patch provided in HBASE-10087 2440 */ 2441 @Override 2442 public MemStoreSize prepare() { 2443 // passing the current sequence number of the wal - to allow bookkeeping in the memstore 2444 this.snapshot = memstore.snapshot(); 2445 this.cacheFlushCount = snapshot.getCellsCount(); 2446 this.cacheFlushSize = snapshot.getDataSize(); 2447 committedFiles = new ArrayList<>(1); 2448 return snapshot.getMemStoreSize(); 2449 } 2450 2451 @Override 2452 public void flushCache(MonitoredTask status) throws IOException { 2453 RegionServerServices rsService = region.getRegionServerServices(); 2454 ThroughputController throughputController = 2455 rsService == null ? null : rsService.getFlushThroughputController(); 2456 tempFiles = 2457 HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController, tracker); 2458 } 2459 2460 @Override 2461 public boolean commit(MonitoredTask status) throws IOException { 2462 if (CollectionUtils.isEmpty(this.tempFiles)) { 2463 return false; 2464 } 2465 List<HStoreFile> storeFiles = new ArrayList<>(this.tempFiles.size()); 2466 for (Path storeFilePath : tempFiles) { 2467 try { 2468 HStoreFile sf = HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status); 2469 outputFileSize += sf.getReader().length(); 2470 storeFiles.add(sf); 2471 } catch (IOException ex) { 2472 LOG.error("Failed to commit store file {}", storeFilePath, ex); 2473 // Try to delete the files we have committed before. 
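          // A partially committed flush that we cannot clean up would leave the store
          // inconsistent, hence the FATAL halt below rather than an attempted recovery.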
2474 for (HStoreFile sf : storeFiles) { 2475 Path pathToDelete = sf.getPath(); 2476 try { 2477 sf.deleteStoreFile(); 2478 } catch (IOException deleteEx) { 2479 LOG.error(HBaseMarkers.FATAL, "Failed to delete store file we committed, " 2480 + "halting {}", pathToDelete, ex); 2481 Runtime.getRuntime().halt(1); 2482 } 2483 } 2484 throw new IOException("Failed to commit the flush", ex); 2485 } 2486 } 2487 2488 for (HStoreFile sf : storeFiles) { 2489 if (HStore.this.getCoprocessorHost() != null) { 2490 HStore.this.getCoprocessorHost().postFlush(HStore.this, sf, tracker); 2491 } 2492 committedFiles.add(sf.getPath()); 2493 } 2494 2495 HStore.this.flushedCellsCount.addAndGet(cacheFlushCount); 2496 HStore.this.flushedCellsSize.addAndGet(cacheFlushSize); 2497 HStore.this.flushedOutputFileSize.addAndGet(outputFileSize); 2498 2499 // Add new file to store files. Clear snapshot too while we have the Store write lock. 2500 return HStore.this.updateStorefiles(storeFiles, snapshot.getId()); 2501 } 2502 2503 @Override 2504 public long getOutputFileSize() { 2505 return outputFileSize; 2506 } 2507 2508 @Override 2509 public List<Path> getCommittedFiles() { 2510 return committedFiles; 2511 } 2512 2513 /** 2514 * Similar to commit, but called in secondary region replicas for replaying the 2515 * flush cache from primary region. Adds the new files to the store, and drops the 2516 * snapshot depending on dropMemstoreSnapshot argument. 2517 * @param fileNames names of the flushed files 2518 * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot 2519 */ 2520 @Override 2521 public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot) 2522 throws IOException { 2523 List<HStoreFile> storeFiles = new ArrayList<>(fileNames.size()); 2524 for (String file : fileNames) { 2525 // open the file as a store file (hfile link, etc) 2526 StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), file); 2527 HStoreFile storeFile = createStoreFileAndReader(storeFileInfo); 2528 storeFiles.add(storeFile); 2529 HStore.this.storeSize.addAndGet(storeFile.getReader().length()); 2530 HStore.this.totalUncompressedBytes 2531 .addAndGet(storeFile.getReader().getTotalUncompressedBytes()); 2532 if (LOG.isInfoEnabled()) { 2533 LOG.info(this + " added " + storeFile + ", entries=" + storeFile.getReader().getEntries() + 2534 ", sequenceid=" + storeFile.getReader().getSequenceID() + ", filesize=" 2535 + TraditionalBinaryPrefix.long2String(storeFile.getReader().length(), "", 1)); 2536 } 2537 } 2538 2539 long snapshotId = -1; // -1 means do not drop 2540 if (dropMemstoreSnapshot && snapshot != null) { 2541 snapshotId = snapshot.getId(); 2542 snapshot.close(); 2543 } 2544 HStore.this.updateStorefiles(storeFiles, snapshotId); 2545 } 2546 2547 /** 2548 * Abort the snapshot preparation. Drops the snapshot if any. 2549 */ 2550 @Override 2551 public void abort() throws IOException { 2552 if (snapshot != null) { 2553 //We need to close the snapshot when aborting, otherwise, the segment scanner 2554 //won't be closed. 
        // If we are using MSLAB, the chunk referenced by those scanners
        // can't be released, causing a memory leak.
        snapshot.close();
        HStore.this.updateStorefiles(Collections.emptyList(), snapshot.getId());
      }
    }
  }

  @Override
  public boolean needsCompaction() {
    List<HStoreFile> filesCompactingClone = null;
    synchronized (filesCompacting) {
      filesCompactingClone = Lists.newArrayList(filesCompacting);
    }
    return this.storeEngine.needsCompaction(filesCompactingClone);
  }

  /**
   * Used for tests.
   * @return cache configuration for this Store.
   */
  public CacheConfig getCacheConfig() {
    return this.cacheConf;
  }

  public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HStore.class, false);

  public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
      + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
      + ClassSize.CONCURRENT_SKIPLISTMAP
      + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT
      + ScanInfo.FIXED_OVERHEAD);

  @Override
  public long heapSize() {
    MemStoreSize memstoreSize = this.memstore.size();
    return DEEP_OVERHEAD + memstoreSize.getHeapSize();
  }

  @Override
  public CellComparator getComparator() {
    return comparator;
  }

  public ScanInfo getScanInfo() {
    return scanInfo;
  }

  /**
   * Set scan info, used by tests.
   * @param scanInfo new scan info to use for tests
   */
  void setScanInfo(ScanInfo scanInfo) {
    this.scanInfo = scanInfo;
  }

  @Override
  public boolean hasTooManyStoreFiles() {
    return getStorefilesCount() > this.blockingFileCount;
  }

  @Override
  public long getFlushedCellsCount() {
    return flushedCellsCount.get();
  }

  @Override
  public long getFlushedCellsSize() {
    return flushedCellsSize.get();
  }

  @Override
  public long getFlushedOutputFileSize() {
    return flushedOutputFileSize.get();
  }

  @Override
  public long getCompactedCellsCount() {
    return compactedCellsCount.get();
  }

  @Override
  public long getCompactedCellsSize() {
    return compactedCellsSize.get();
  }

  @Override
  public long getMajorCompactedCellsCount() {
    return majorCompactedCellsCount.get();
  }

  @Override
  public long getMajorCompactedCellsSize() {
    return majorCompactedCellsSize.get();
  }

  /**
   * Returns the StoreEngine that is backing this concrete implementation of Store.
   * @return Returns the {@link StoreEngine} object used internally inside this HStore object.
2653 */ 2654 public StoreEngine<?, ?, ?, ?> getStoreEngine() { 2655 return this.storeEngine; 2656 } 2657 2658 protected OffPeakHours getOffPeakHours() { 2659 return this.offPeakHours; 2660 } 2661 2662 /** 2663 * {@inheritDoc} 2664 */ 2665 @Override 2666 public void onConfigurationChange(Configuration conf) { 2667 this.conf = new CompoundConfiguration() 2668 .add(conf) 2669 .addBytesMap(family.getValues()); 2670 this.storeEngine.compactionPolicy.setConf(conf); 2671 this.offPeakHours = OffPeakHours.getInstance(conf); 2672 } 2673 2674 /** 2675 * {@inheritDoc} 2676 */ 2677 @Override 2678 public void registerChildren(ConfigurationManager manager) { 2679 // No children to register 2680 } 2681 2682 /** 2683 * {@inheritDoc} 2684 */ 2685 @Override 2686 public void deregisterChildren(ConfigurationManager manager) { 2687 // No children to deregister 2688 } 2689 2690 @Override 2691 public double getCompactionPressure() { 2692 return storeEngine.getStoreFileManager().getCompactionPressure(); 2693 } 2694 2695 @Override 2696 public boolean isPrimaryReplicaStore() { 2697 return getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID; 2698 } 2699 2700 /** 2701 * Sets the store up for a region level snapshot operation. 2702 * @see #postSnapshotOperation() 2703 */ 2704 public void preSnapshotOperation() { 2705 archiveLock.lock(); 2706 } 2707 2708 /** 2709 * Perform tasks needed after the completion of snapshot operation. 2710 * @see #preSnapshotOperation() 2711 */ 2712 public void postSnapshotOperation() { 2713 archiveLock.unlock(); 2714 } 2715 2716 /** 2717 * Closes and archives the compacted files under this store 2718 */ 2719 public synchronized void closeAndArchiveCompactedFiles() throws IOException { 2720 // ensure other threads do not attempt to archive the same files on close() 2721 archiveLock.lock(); 2722 try { 2723 lock.readLock().lock(); 2724 Collection<HStoreFile> copyCompactedfiles = null; 2725 try { 2726 Collection<HStoreFile> compactedfiles = 2727 this.getStoreEngine().getStoreFileManager().getCompactedfiles(); 2728 if (CollectionUtils.isNotEmpty(compactedfiles)) { 2729 // Do a copy under read lock 2730 copyCompactedfiles = new ArrayList<>(compactedfiles); 2731 } else { 2732 LOG.trace("No compacted files to archive"); 2733 } 2734 } finally { 2735 lock.readLock().unlock(); 2736 } 2737 if (CollectionUtils.isNotEmpty(copyCompactedfiles)) { 2738 removeCompactedfiles(copyCompactedfiles, true); 2739 } 2740 } finally { 2741 archiveLock.unlock(); 2742 } 2743 } 2744 2745 /** 2746 * Archives and removes the compacted files 2747 * @param compactedfiles The compacted files in this store that are not active in reads 2748 * @param evictOnClose true if blocks should be evicted from the cache when an HFile reader is 2749 * closed, false if not 2750 */ 2751 private void removeCompactedfiles(Collection<HStoreFile> compactedfiles, boolean evictOnClose) 2752 throws IOException { 2753 final List<HStoreFile> filesToRemove = new ArrayList<>(compactedfiles.size()); 2754 final List<Long> storeFileSizes = new ArrayList<>(compactedfiles.size()); 2755 for (final HStoreFile file : compactedfiles) { 2756 synchronized (file) { 2757 try { 2758 StoreFileReader r = file.getReader(); 2759 if (r == null) { 2760 LOG.debug("The file {} was closed but still not archived", file); 2761 // HACK: Temporarily re-open the reader so we can get the size of the file. Ideally, 2762 // we should know the size of an HStoreFile without having to ask the HStoreFileReader 2763 // for that. 
2764 long length = getStoreFileSize(file); 2765 filesToRemove.add(file); 2766 storeFileSizes.add(length); 2767 continue; 2768 } 2769 2770 if (file.isCompactedAway() && !file.isReferencedInReads()) { 2771 // Even if deleting fails we need not bother as any new scanners won't be 2772 // able to use the compacted file as the status is already compactedAway 2773 LOG.trace("Closing and archiving the file {}", file); 2774 // Copy the file size before closing the reader 2775 final long length = r.length(); 2776 r.close(evictOnClose); 2777 // Just close and return 2778 filesToRemove.add(file); 2779 // Only add the length if we successfully added the file to `filesToRemove` 2780 storeFileSizes.add(length); 2781 } else { 2782 LOG.info("Can't archive compacted file " + file.getPath() 2783 + " because of either isCompactedAway=" + file.isCompactedAway() 2784 + " or file has reference, isReferencedInReads=" + file.isReferencedInReads() 2785 + ", refCount=" + r.getRefCount() + ", skipping for now."); 2786 } 2787 } catch (Exception e) { 2788 LOG.error("Exception while trying to close the compacted store file {}", file.getPath(), 2789 e); 2790 } 2791 } 2792 } 2793 if (this.isPrimaryReplicaStore()) { 2794 // Only the primary region is allowed to move the file to archive. 2795 // The secondary region does not move the files to archive. Any active reads from 2796 // the secondary region will still work because the file as such has active readers on it. 2797 if (!filesToRemove.isEmpty()) { 2798 LOG.debug("Moving the files {} to archive", filesToRemove); 2799 // Only if this is successful it has to be removed 2800 try { 2801 this.fs.removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(), 2802 filesToRemove); 2803 } catch (FailedArchiveException fae) { 2804 // Even if archiving some files failed, we still need to clear out any of the 2805 // files which were successfully archived. Otherwise we will receive a 2806 // FileNotFoundException when we attempt to re-archive them in the next go around. 2807 Collection<Path> failedFiles = fae.getFailedFiles(); 2808 Iterator<HStoreFile> iter = filesToRemove.iterator(); 2809 Iterator<Long> sizeIter = storeFileSizes.iterator(); 2810 while (iter.hasNext()) { 2811 sizeIter.next(); 2812 if (failedFiles.contains(iter.next().getPath())) { 2813 iter.remove(); 2814 sizeIter.remove(); 2815 } 2816 } 2817 if (!filesToRemove.isEmpty()) { 2818 clearCompactedfiles(filesToRemove); 2819 } 2820 throw fae; 2821 } 2822 } 2823 } 2824 if (!filesToRemove.isEmpty()) { 2825 // Clear the compactedfiles from the store file manager 2826 clearCompactedfiles(filesToRemove); 2827 // Try to send report of this archival to the Master for updating quota usage faster 2828 reportArchivedFilesForQuota(filesToRemove, storeFileSizes); 2829 } 2830 } 2831 2832 /** 2833 * Computes the length of a store file without succumbing to any errors along the way. If an 2834 * error is encountered, the implementation returns {@code 0} instead of the actual size. 2835 * 2836 * @param file The file to compute the size of. 2837 * @return The size in bytes of the provided {@code file}. 2838 */ 2839 long getStoreFileSize(HStoreFile file) { 2840 long length = 0; 2841 try { 2842 file.initReader(); 2843 length = file.getReader().length(); 2844 } catch (IOException e) { 2845 LOG.trace("Failed to open reader when trying to compute store file size for {}, ignoring", 2846 file, e); 2847 } finally { 2848 try { 2849 file.closeStoreFile( 2850 file.getCacheConf() != null ? 
          file.getCacheConf().shouldEvictOnClose() : true);
      } catch (IOException e) {
        LOG.trace("Failed to close reader after computing store file size for {}, ignoring",
          file, e);
      }
    }
    return length;
  }

  public Long preFlushSeqIDEstimation() {
    return memstore.preFlushSeqIDEstimation();
  }

  @Override
  public boolean isSloppyMemStore() {
    return this.memstore.isSloppy();
  }

  private void clearCompactedfiles(List<HStoreFile> filesToRemove) throws IOException {
    LOG.trace("Clearing the compacted files {} from this store", filesToRemove);
    try {
      lock.writeLock().lock();
      this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToRemove);
    } finally {
      lock.writeLock().unlock();
    }
  }

  @Override
  public int getCurrentParallelPutCount() {
    return currentParallelPutCount.get();
  }

  public int getStoreRefCount() {
    return this.storeEngine.getStoreFileManager().getStorefiles().stream()
      .filter(sf -> sf.getReader() != null).filter(HStoreFile::isHFile)
      .mapToInt(HStoreFile::getRefCount).sum();
  }

  /**
   * @return the maximum ref count of a store file among all compacted HStore files for this HStore
   */
  public int getMaxCompactedStoreFileRefCount() {
    OptionalInt maxCompactedStoreFileRefCount = this.storeEngine.getStoreFileManager()
      .getCompactedfiles()
      .stream()
      .filter(sf -> sf.getReader() != null)
      .filter(HStoreFile::isHFile)
      .mapToInt(HStoreFile::getRefCount)
      .max();
    return maxCompactedStoreFileRefCount.isPresent()
      ? maxCompactedStoreFileRefCount.getAsInt() : 0;
  }

  void reportArchivedFilesForQuota(List<? extends StoreFile> archivedFiles, List<Long> fileSizes) {
    // Sanity check from the caller
    if (archivedFiles.size() != fileSizes.size()) {
      throw new RuntimeException("Coding error: should never see lists of varying size");
    }
    RegionServerServices rss = this.region.getRegionServerServices();
    if (rss == null) {
      return;
    }
    List<Entry<String, Long>> filesWithSizes = new ArrayList<>(archivedFiles.size());
    Iterator<Long> fileSizeIter = fileSizes.iterator();
    for (StoreFile storeFile : archivedFiles) {
      final long fileSize = fileSizeIter.next();
      if (storeFile.isHFile() && fileSize != 0) {
        filesWithSizes.add(Maps.immutableEntry(storeFile.getPath().getName(), fileSize));
      }
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("Files archived: " + archivedFiles + ", reporting the following to the Master: "
        + filesWithSizes);
    }
    boolean success = rss.reportFileArchivalForQuotas(getTableName(), filesWithSizes);
    if (!success) {
      LOG.warn("Failed to report archival of files: " + filesWithSizes);
    }
  }

  @Override
  public long getMemstoreOnlyRowReadsCount() {
    return memstoreOnlyRowReadsCount.sum();
  }

  @Override
  public long getMixedRowReadsCount() {
    return mixedRowReadsCount.sum();
  }

  void updateMetricsStore(boolean memstoreRead) {
    if (memstoreRead) {
      memstoreOnlyRowReadsCount.increment();
    } else {
      mixedRowReadsCount.increment();
    }
  }
}