001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.net.InetSocketAddress; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.Collections; 027import java.util.HashMap; 028import java.util.HashSet; 029import java.util.Iterator; 030import java.util.List; 031import java.util.Map; 032import java.util.Map.Entry; 033import java.util.NavigableSet; 034import java.util.Optional; 035import java.util.OptionalDouble; 036import java.util.OptionalLong; 037import java.util.Set; 038import java.util.concurrent.Callable; 039import java.util.concurrent.CompletionService; 040import java.util.concurrent.ConcurrentHashMap; 041import java.util.concurrent.ExecutionException; 042import java.util.concurrent.ExecutorCompletionService; 043import java.util.concurrent.Future; 044import java.util.concurrent.ThreadPoolExecutor; 045import java.util.concurrent.atomic.AtomicBoolean; 046import java.util.concurrent.atomic.AtomicInteger; 047import java.util.concurrent.atomic.AtomicLong; 048import java.util.concurrent.locks.ReentrantLock; 049import java.util.concurrent.locks.ReentrantReadWriteLock; 050import java.util.function.Predicate; 051import java.util.function.ToLongFunction; 052import java.util.stream.Collectors; 053import java.util.stream.LongStream; 054 055import org.apache.hadoop.conf.Configuration; 056import org.apache.hadoop.fs.FileSystem; 057import org.apache.hadoop.fs.Path; 058import org.apache.hadoop.fs.permission.FsAction; 059import org.apache.hadoop.hbase.Cell; 060import org.apache.hadoop.hbase.CellComparator; 061import org.apache.hadoop.hbase.CellUtil; 062import org.apache.hadoop.hbase.CompoundConfiguration; 063import org.apache.hadoop.hbase.HConstants; 064import org.apache.hadoop.hbase.MemoryCompactionPolicy; 065import org.apache.hadoop.hbase.TableName; 066import org.apache.hadoop.hbase.backup.FailedArchiveException; 067import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 068import org.apache.hadoop.hbase.client.RegionInfo; 069import org.apache.hadoop.hbase.client.Scan; 070import org.apache.hadoop.hbase.conf.ConfigurationManager; 071import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; 072import org.apache.hadoop.hbase.io.HeapSize; 073import org.apache.hadoop.hbase.io.compress.Compression; 074import org.apache.hadoop.hbase.io.crypto.Encryption; 075import org.apache.hadoop.hbase.io.hfile.CacheConfig; 076import org.apache.hadoop.hbase.io.hfile.HFile; 077import org.apache.hadoop.hbase.io.hfile.HFileContext; 078import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 079import 
org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
080import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
081import org.apache.hadoop.hbase.io.hfile.HFileScanner;
082import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
083import org.apache.hadoop.hbase.log.HBaseMarkers;
084import org.apache.hadoop.hbase.monitoring.MonitoredTask;
085import org.apache.hadoop.hbase.quotas.RegionSizeStore;
086import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
087import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
088import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
089import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
090import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
091import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
092import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
093import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
094import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
095import org.apache.hadoop.hbase.security.EncryptionUtil;
096import org.apache.hadoop.hbase.security.User;
097import org.apache.hadoop.hbase.util.Bytes;
098import org.apache.hadoop.hbase.util.ChecksumType;
099import org.apache.hadoop.hbase.util.ClassSize;
100import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
101import org.apache.hadoop.hbase.util.Pair;
102import org.apache.hadoop.hbase.util.ReflectionUtils;
103import org.apache.hadoop.util.StringUtils;
104import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
105import org.apache.yetus.audience.InterfaceAudience;
106import org.slf4j.Logger;
107import org.slf4j.LoggerFactory;
108import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
109import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
110import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
111import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
112import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
113import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
114import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
115import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
116import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
117import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
119
120/**
121 * A Store holds a column family in a Region. It is a memstore and a set of zero
122 * or more StoreFiles, which stretch backwards over time.
123 *
124 * <p>There's no reason to consider append-logging at this level; all logging
125 * and locking is handled at the HRegion level. Store just provides
126 * services to manage sets of StoreFiles. One of the most important of those
127 * services is compaction, where files are aggregated once they pass
128 * a configurable threshold.
129 *
130 * <p>Locking and transactions are handled at a higher level. This API should
131 * not be called directly but by an HRegion manager.
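 *
 * <p>Illustrative usage sketch (assumes an already-opened HRegion named {@code region} that has a
 * family named {@code cf}; callers normally reach a store through its region rather than
 * constructing one):
 * <pre>{@code
 *   HStore store = region.getStore(Bytes.toBytes("cf"));
 *   OptionalLong maxSeqId = store.getMaxSequenceId();
 *   long storeFileTtlMs = store.getStoreFileTtl();
 * }</pre>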
132 */ 133@InterfaceAudience.Private 134public class HStore implements Store, HeapSize, StoreConfigInformation, PropagatingConfigurationObserver { 135 public static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class"; 136 public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY = 137 "hbase.server.compactchecker.interval.multiplier"; 138 public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles"; 139 public static final String BLOCK_STORAGE_POLICY_KEY = "hbase.hstore.block.storage.policy"; 140 // keep in accordance with HDFS default storage policy 141 public static final String DEFAULT_BLOCK_STORAGE_POLICY = "HOT"; 142 public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000; 143 public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 16; 144 145 private static final Logger LOG = LoggerFactory.getLogger(HStore.class); 146 147 protected final MemStore memstore; 148 // This stores directory in the filesystem. 149 protected final HRegion region; 150 private final ColumnFamilyDescriptor family; 151 private final HRegionFileSystem fs; 152 protected Configuration conf; 153 protected CacheConfig cacheConf; 154 private long lastCompactSize = 0; 155 volatile boolean forceMajor = false; 156 /* how many bytes to write between status checks */ 157 static int closeCheckInterval = 0; 158 private AtomicLong storeSize = new AtomicLong(); 159 private AtomicLong totalUncompressedBytes = new AtomicLong(); 160 161 /** 162 * RWLock for store operations. 163 * Locked in shared mode when the list of component stores is looked at: 164 * - all reads/writes to table data 165 * - checking for split 166 * Locked in exclusive mode when the list of component stores is modified: 167 * - closing 168 * - completing a compaction 169 */ 170 final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); 171 /** 172 * Lock specific to archiving compacted store files. This avoids races around 173 * the combination of retrieving the list of compacted files and moving them to 174 * the archive directory. Since this is usually a background process (other than 175 * on close), we don't want to handle this with the store write lock, which would 176 * block readers and degrade performance. 177 * 178 * Locked by: 179 * - CompactedHFilesDispatchHandler via closeAndArchiveCompactedFiles() 180 * - close() 181 */ 182 final ReentrantLock archiveLock = new ReentrantLock(); 183 184 private final boolean verifyBulkLoads; 185 186 /** 187 * Use this counter to track concurrent puts. If TRACE-log is enabled, if we are over the 188 * threshold set by hbase.region.store.parallel.put.print.threshold (Default is 50) we will 189 * log a message that identifies the Store experience this high-level of concurrency. 190 */ 191 private final AtomicInteger currentParallelPutCount = new AtomicInteger(0); 192 private final int parallelPutCountPrintThreshold; 193 194 private ScanInfo scanInfo; 195 196 // All access must be synchronized. 197 // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it. 198 private final List<HStoreFile> filesCompacting = Lists.newArrayList(); 199 200 // All access must be synchronized. 
201 private final Set<ChangedReadersObserver> changedReaderObservers =
202 Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());
203
204 protected final int blocksize;
205 private HFileDataBlockEncoder dataBlockEncoder;
206
207 /** Checksum configuration */
208 protected ChecksumType checksumType;
209 protected int bytesPerChecksum;
210
211 // Comparing KeyValues
212 protected final CellComparator comparator;
213
214 final StoreEngine<?, ?, ?, ?> storeEngine;
215
216 private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
217 private volatile OffPeakHours offPeakHours;
218
219 private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
220 private int flushRetriesNumber;
221 private int pauseTime;
222
223 private long blockingFileCount;
224 private int compactionCheckMultiplier;
225 protected Encryption.Context cryptoContext = Encryption.Context.NONE;
226
227 private AtomicLong flushedCellsCount = new AtomicLong();
228 private AtomicLong compactedCellsCount = new AtomicLong();
229 private AtomicLong majorCompactedCellsCount = new AtomicLong();
230 private AtomicLong flushedCellsSize = new AtomicLong();
231 private AtomicLong flushedOutputFileSize = new AtomicLong();
232 private AtomicLong compactedCellsSize = new AtomicLong();
233 private AtomicLong majorCompactedCellsSize = new AtomicLong();
234
235 /**
236 * Constructor
237 * @param region the HRegion this store belongs to
238 * @param family ColumnFamilyDescriptor for this column family
239 * @param confParam configuration object
240 * @param warmup true if the store is instantiated as part of region warmup, in which case compacted store files are not archived
241 * @throws IOException
242 */
243 protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
244 final Configuration confParam, boolean warmup) throws IOException {
245
246 this.fs = region.getRegionFileSystem();
247
248 // Assemble the store's home directory and ensure it exists.
249 fs.createStoreDir(family.getNameAsString());
250 this.region = region;
251 this.family = family;
252 // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
253 // CompoundConfiguration will look for keys in reverse order of addition, so we'd
254 // add global config first, then table and cf overrides, then cf metadata.
255 this.conf = new CompoundConfiguration()
256 .add(confParam)
257 .addBytesMap(region.getTableDescriptor().getValues())
258 .addStringMap(family.getConfiguration())
259 .addBytesMap(family.getValues());
260 this.blocksize = family.getBlocksize();
261
262 // set block storage policy for store directory
263 String policyName = family.getStoragePolicy();
264 if (null == policyName) {
265 policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY, DEFAULT_BLOCK_STORAGE_POLICY);
266 }
267 this.fs.setStoragePolicy(family.getNameAsString(), policyName.trim());
268
269 this.dataBlockEncoder = new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());
270
271 this.comparator = region.getCellComparator();
272 // used by ScanQueryMatcher
273 long timeToPurgeDeletes =
274 Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
275 LOG.trace("Time to purge deletes set to {}ms in store {}", timeToPurgeDeletes, this);
276 // Get TTL
277 long ttl = determineTTLFromFamily(family);
278 // Why not just pass a ColumnFamilyDescriptor in here altogether? Even if we have
279 // to clone it?
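    // Note: 'ttl' is in milliseconds at this point; determineTTLFromFamily() converts the family's
    // seconds-based setting (or maps FOREVER/-1 to Long.MAX_VALUE) before it is handed to ScanInfo.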
280 scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.comparator); 281 this.memstore = getMemstore(); 282 283 this.offPeakHours = OffPeakHours.getInstance(conf); 284 285 // Setting up cache configuration for this family 286 createCacheConf(family); 287 288 this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false); 289 290 this.blockingFileCount = 291 conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT); 292 this.compactionCheckMultiplier = conf.getInt( 293 COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER); 294 if (this.compactionCheckMultiplier <= 0) { 295 LOG.error("Compaction check period multiplier must be positive, setting default: {}", 296 DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER); 297 this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER; 298 } 299 300 if (HStore.closeCheckInterval == 0) { 301 HStore.closeCheckInterval = conf.getInt( 302 "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */); 303 } 304 305 this.storeEngine = createStoreEngine(this, this.conf, this.comparator); 306 List<HStoreFile> hStoreFiles = loadStoreFiles(warmup); 307 // Move the storeSize calculation out of loadStoreFiles() method, because the secondary read 308 // replica's refreshStoreFiles() will also use loadStoreFiles() to refresh its store files and 309 // update the storeSize in the completeCompaction(..) finally (just like compaction) , so 310 // no need calculate the storeSize twice. 311 this.storeSize.addAndGet(getStorefilesSize(hStoreFiles, sf -> true)); 312 this.totalUncompressedBytes.addAndGet(getTotalUncompressedBytes(hStoreFiles)); 313 this.storeEngine.getStoreFileManager().loadFiles(hStoreFiles); 314 315 // Initialize checksum type from name. The names are CRC32, CRC32C, etc. 316 this.checksumType = getChecksumType(conf); 317 // Initialize bytes per checksum 318 this.bytesPerChecksum = getBytesPerChecksum(conf); 319 flushRetriesNumber = conf.getInt( 320 "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER); 321 pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE); 322 if (flushRetriesNumber <= 0) { 323 throw new IllegalArgumentException( 324 "hbase.hstore.flush.retries.number must be > 0, not " 325 + flushRetriesNumber); 326 } 327 cryptoContext = EncryptionUtil.createEncryptionContext(conf, family); 328 329 int confPrintThreshold = 330 this.conf.getInt("hbase.region.store.parallel.put.print.threshold", 50); 331 if (confPrintThreshold < 10) { 332 confPrintThreshold = 10; 333 } 334 this.parallelPutCountPrintThreshold = confPrintThreshold; 335 LOG.info("Store={}, memstore type={}, storagePolicy={}, verifyBulkLoads={}, " 336 + "parallelPutCountPrintThreshold={}, encoding={}, compression={}", 337 getColumnFamilyName(), memstore.getClass().getSimpleName(), policyName, verifyBulkLoads, 338 parallelPutCountPrintThreshold, family.getDataBlockEncoding(), 339 family.getCompressionType()); 340 } 341 342 /** 343 * @return MemStore Instance to use in this store. 344 */ 345 private MemStore getMemstore() { 346 MemStore ms = null; 347 // Check if in-memory-compaction configured. Note MemoryCompactionPolicy is an enum! 
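    // Resolution order for the policy below: system tables consult
    // "hbase.systemtables.compacting.memstore.type" (default NONE), user tables use the column
    // family setting, and if that is unset we fall back to the cluster-wide
    // CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY default.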
348 MemoryCompactionPolicy inMemoryCompaction = null;
349 if (this.getTableName().isSystemTable()) {
350 inMemoryCompaction = MemoryCompactionPolicy
351 .valueOf(conf.get("hbase.systemtables.compacting.memstore.type", "NONE").toUpperCase());
352 } else {
353 inMemoryCompaction = family.getInMemoryCompaction();
354 }
355 if (inMemoryCompaction == null) {
356 inMemoryCompaction =
357 MemoryCompactionPolicy.valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
358 CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT).toUpperCase());
359 }
360 switch (inMemoryCompaction) {
361 case NONE:
362 ms = ReflectionUtils.newInstance(DefaultMemStore.class,
363 new Object[] { conf, this.comparator,
364 this.getHRegion().getRegionServicesForStores()});
365 break;
366 default:
367 Class<? extends CompactingMemStore> clz = conf.getClass(MEMSTORE_CLASS_NAME,
368 CompactingMemStore.class, CompactingMemStore.class);
369 ms = ReflectionUtils.newInstance(clz, new Object[]{conf, this.comparator, this,
370 this.getHRegion().getRegionServicesForStores(), inMemoryCompaction});
371 }
372 return ms;
373 }
374
375 /**
376 * Creates the cache config.
377 * @param family The current column family.
378 */
379 protected void createCacheConf(final ColumnFamilyDescriptor family) {
380 this.cacheConf = new CacheConfig(conf, family, region.getBlockCache());
381 }
382
383 /**
384 * Creates the store engine configured for the given Store.
385 * @param store The store. An unfortunate dependency needed due to it
386 * being passed to coprocessors via the compactor.
387 * @param conf Store configuration.
388 * @param kvComparator CellComparator for storeFileManager.
389 * @return StoreEngine to use.
390 */
391 protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
392 CellComparator kvComparator) throws IOException {
393 return StoreEngine.create(store, conf, comparator);
394 }
395
396 /**
397 * @param family the column family descriptor
398 * @return TTL in milliseconds for the specified family, or Long.MAX_VALUE if the TTL is unlimited
399 */
400 public static long determineTTLFromFamily(final ColumnFamilyDescriptor family) {
401 // ColumnFamilyDescriptor.getTimeToLive returns ttl in seconds. Convert to milliseconds.
402 long ttl = family.getTimeToLive();
403 if (ttl == HConstants.FOREVER) {
404 // Default is unlimited ttl.
405 ttl = Long.MAX_VALUE;
406 } else if (ttl == -1) {
407 ttl = Long.MAX_VALUE;
408 } else {
409 // Second -> ms adjust for user data
410 ttl *= 1000;
411 }
412 return ttl;
413 }
414
415 @Override
416 public String getColumnFamilyName() {
417 return this.family.getNameAsString();
418 }
419
420 @Override
421 public TableName getTableName() {
422 return this.getRegionInfo().getTable();
423 }
424
425 @Override
426 public FileSystem getFileSystem() {
427 return this.fs.getFileSystem();
428 }
429
430 public HRegionFileSystem getRegionFileSystem() {
431 return this.fs;
432 }
433
434 /* Implementation of StoreConfigInformation */
435 @Override
436 public long getStoreFileTtl() {
437 // TTL only applies if there's no MIN_VERSIONS setting on the column.
438 return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
439 }
440
441 @Override
442 public long getMemStoreFlushSize() {
443 // TODO: Why is this in here? The flushsize of the region rather than the store?
St.Ack 444 return this.region.memstoreFlushSize; 445 } 446 447 @Override 448 public MemStoreSize getFlushableSize() { 449 return this.memstore.getFlushableSize(); 450 } 451 452 @Override 453 public MemStoreSize getSnapshotSize() { 454 return this.memstore.getSnapshotSize(); 455 } 456 457 @Override 458 public long getCompactionCheckMultiplier() { 459 return this.compactionCheckMultiplier; 460 } 461 462 @Override 463 public long getBlockingFileCount() { 464 return blockingFileCount; 465 } 466 /* End implementation of StoreConfigInformation */ 467 468 /** 469 * Returns the configured bytesPerChecksum value. 470 * @param conf The configuration 471 * @return The bytesPerChecksum that is set in the configuration 472 */ 473 public static int getBytesPerChecksum(Configuration conf) { 474 return conf.getInt(HConstants.BYTES_PER_CHECKSUM, 475 HFile.DEFAULT_BYTES_PER_CHECKSUM); 476 } 477 478 /** 479 * Returns the configured checksum algorithm. 480 * @param conf The configuration 481 * @return The checksum algorithm that is set in the configuration 482 */ 483 public static ChecksumType getChecksumType(Configuration conf) { 484 String checksumName = conf.get(HConstants.CHECKSUM_TYPE_NAME); 485 if (checksumName == null) { 486 return ChecksumType.getDefaultChecksumType(); 487 } else { 488 return ChecksumType.nameToType(checksumName); 489 } 490 } 491 492 /** 493 * @return how many bytes to write between status checks 494 */ 495 public static int getCloseCheckInterval() { 496 return closeCheckInterval; 497 } 498 499 @Override 500 public ColumnFamilyDescriptor getColumnFamilyDescriptor() { 501 return this.family; 502 } 503 504 @Override 505 public OptionalLong getMaxSequenceId() { 506 return StoreUtils.getMaxSequenceIdInList(this.getStorefiles()); 507 } 508 509 @Override 510 public OptionalLong getMaxMemStoreTS() { 511 return StoreUtils.getMaxMemStoreTSInList(this.getStorefiles()); 512 } 513 514 /** 515 * @param tabledir {@link Path} to where the table is being stored 516 * @param hri {@link RegionInfo} for the region. 517 * @param family {@link ColumnFamilyDescriptor} describing the column family 518 * @return Path to family/Store home directory. 519 */ 520 @Deprecated 521 public static Path getStoreHomedir(final Path tabledir, 522 final RegionInfo hri, final byte[] family) { 523 return getStoreHomedir(tabledir, hri.getEncodedName(), family); 524 } 525 526 /** 527 * @param tabledir {@link Path} to where the table is being stored 528 * @param encodedName Encoded region name. 529 * @param family {@link ColumnFamilyDescriptor} describing the column family 530 * @return Path to family/Store home directory. 531 */ 532 @Deprecated 533 public static Path getStoreHomedir(final Path tabledir, 534 final String encodedName, final byte[] family) { 535 return new Path(tabledir, new Path(encodedName, Bytes.toString(family))); 536 } 537 538 /** 539 * @return the data block encoder 540 */ 541 public HFileDataBlockEncoder getDataBlockEncoder() { 542 return dataBlockEncoder; 543 } 544 545 /** 546 * Should be used only in tests. 547 * @param blockEncoder the block delta encoder to use 548 */ 549 void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) { 550 this.dataBlockEncoder = blockEncoder; 551 } 552 553 /** 554 * Creates an unsorted list of StoreFile loaded in parallel 555 * from the given directory. 
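 * Opening is delegated to {@link #openStoreFiles(Collection, boolean)}, which opens the files on
 * the region's store file opener thread pool.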
556 * @throws IOException 557 */ 558 private List<HStoreFile> loadStoreFiles(boolean warmup) throws IOException { 559 Collection<StoreFileInfo> files = fs.getStoreFiles(getColumnFamilyName()); 560 return openStoreFiles(files, warmup); 561 } 562 563 private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, boolean warmup) 564 throws IOException { 565 if (CollectionUtils.isEmpty(files)) { 566 return Collections.emptyList(); 567 } 568 // initialize the thread pool for opening store files in parallel.. 569 ThreadPoolExecutor storeFileOpenerThreadPool = 570 this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" + 571 this.getColumnFamilyName()); 572 CompletionService<HStoreFile> completionService = new ExecutorCompletionService<>(storeFileOpenerThreadPool); 573 574 int totalValidStoreFile = 0; 575 for (StoreFileInfo storeFileInfo : files) { 576 // open each store file in parallel 577 completionService.submit(() -> this.createStoreFileAndReader(storeFileInfo)); 578 totalValidStoreFile++; 579 } 580 581 Set<String> compactedStoreFiles = new HashSet<>(); 582 ArrayList<HStoreFile> results = new ArrayList<>(files.size()); 583 IOException ioe = null; 584 try { 585 for (int i = 0; i < totalValidStoreFile; i++) { 586 try { 587 HStoreFile storeFile = completionService.take().get(); 588 if (storeFile != null) { 589 LOG.debug("loaded {}", storeFile); 590 results.add(storeFile); 591 compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles()); 592 } 593 } catch (InterruptedException e) { 594 if (ioe == null) ioe = new InterruptedIOException(e.getMessage()); 595 } catch (ExecutionException e) { 596 if (ioe == null) ioe = new IOException(e.getCause()); 597 } 598 } 599 } finally { 600 storeFileOpenerThreadPool.shutdownNow(); 601 } 602 if (ioe != null) { 603 // close StoreFile readers 604 boolean evictOnClose = 605 cacheConf != null? cacheConf.shouldEvictOnClose(): true; 606 for (HStoreFile file : results) { 607 try { 608 if (file != null) { 609 file.closeStoreFile(evictOnClose); 610 } 611 } catch (IOException e) { 612 LOG.warn("Could not close store file", e); 613 } 614 } 615 throw ioe; 616 } 617 618 // Should not archive the compacted store files when region warmup. See HBASE-22163. 619 if (!warmup) { 620 // Remove the compacted files from result 621 List<HStoreFile> filesToRemove = new ArrayList<>(compactedStoreFiles.size()); 622 for (HStoreFile storeFile : results) { 623 if (compactedStoreFiles.contains(storeFile.getPath().getName())) { 624 LOG.warn("Clearing the compacted storefile {} from this store", storeFile); 625 storeFile.getReader().close(true); 626 filesToRemove.add(storeFile); 627 } 628 } 629 results.removeAll(filesToRemove); 630 if (!filesToRemove.isEmpty() && this.isPrimaryReplicaStore()) { 631 LOG.debug("Moving the files {} to archive", filesToRemove); 632 this.fs.removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(), filesToRemove); 633 } 634 } 635 636 return results; 637 } 638 639 @Override 640 public void refreshStoreFiles() throws IOException { 641 Collection<StoreFileInfo> newFiles = fs.getStoreFiles(getColumnFamilyName()); 642 refreshStoreFilesInternal(newFiles); 643 } 644 645 /** 646 * Replaces the store files that the store has with the given files. Mainly used by secondary 647 * region replicas to keep up to date with the primary region files. 
648 * @throws IOException 649 */ 650 public void refreshStoreFiles(Collection<String> newFiles) throws IOException { 651 List<StoreFileInfo> storeFiles = new ArrayList<>(newFiles.size()); 652 for (String file : newFiles) { 653 storeFiles.add(fs.getStoreFileInfo(getColumnFamilyName(), file)); 654 } 655 refreshStoreFilesInternal(storeFiles); 656 } 657 658 /** 659 * Checks the underlying store files, and opens the files that have not 660 * been opened, and removes the store file readers for store files no longer 661 * available. Mainly used by secondary region replicas to keep up to date with 662 * the primary region files. 663 * @throws IOException 664 */ 665 private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException { 666 StoreFileManager sfm = storeEngine.getStoreFileManager(); 667 Collection<HStoreFile> currentFiles = sfm.getStorefiles(); 668 Collection<HStoreFile> compactedFiles = sfm.getCompactedfiles(); 669 if (currentFiles == null) currentFiles = Collections.emptySet(); 670 if (newFiles == null) newFiles = Collections.emptySet(); 671 if (compactedFiles == null) compactedFiles = Collections.emptySet(); 672 673 HashMap<StoreFileInfo, HStoreFile> currentFilesSet = new HashMap<>(currentFiles.size()); 674 for (HStoreFile sf : currentFiles) { 675 currentFilesSet.put(sf.getFileInfo(), sf); 676 } 677 HashMap<StoreFileInfo, HStoreFile> compactedFilesSet = new HashMap<>(compactedFiles.size()); 678 for (HStoreFile sf : compactedFiles) { 679 compactedFilesSet.put(sf.getFileInfo(), sf); 680 } 681 682 Set<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles); 683 // Exclude the files that have already been compacted 684 newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet()); 685 Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet()); 686 Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet); 687 688 if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) { 689 return; 690 } 691 692 LOG.info("Refreshing store files for region " + this.getRegionInfo().getRegionNameAsString() 693 + " files to add: " + toBeAddedFiles + " files to remove: " + toBeRemovedFiles); 694 695 Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size()); 696 for (StoreFileInfo sfi : toBeRemovedFiles) { 697 toBeRemovedStoreFiles.add(currentFilesSet.get(sfi)); 698 } 699 700 // try to open the files 701 List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false); 702 703 // propogate the file changes to the underlying store file manager 704 replaceStoreFiles(toBeRemovedStoreFiles, openedFiles); //won't throw an exception 705 706 // Advance the memstore read point to be at least the new store files seqIds so that 707 // readers might pick it up. 
This assumes that the store is not getting any writes (otherwise
708 // in-flight transactions might be made visible)
709 if (!toBeAddedFiles.isEmpty()) {
710 // we must have the max sequence id here as we do have several store files
711 region.getMVCC().advanceTo(this.getMaxSequenceId().getAsLong());
712 }
713
714 completeCompaction(toBeRemovedStoreFiles);
715 }
716
717 @VisibleForTesting
718 protected HStoreFile createStoreFileAndReader(final Path p) throws IOException {
719 StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p);
720 return createStoreFileAndReader(info);
721 }
722
723 private HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
724 info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
725 HStoreFile storeFile = new HStoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
726 this.family.getBloomFilterType(), isPrimaryReplicaStore());
727 storeFile.initReader();
728 return storeFile;
729 }
730
731 /**
732 * Informs the MemStore that the upcoming updates are going to be part of the
733 * replaying of edits from the WAL.
734 */
735 public void startReplayingFromWAL() {
736 this.memstore.startReplayingFromWAL();
737 }
738
739 /**
740 * Informs the MemStore that the replaying of edits from the WAL
741 * is done.
742 */
743 public void stopReplayingFromWAL() {
744 this.memstore.stopReplayingFromWAL();
745 }
746
747 /**
748 * Adds a value to the memstore
749 */
750 public void add(final Cell cell, MemStoreSizing memstoreSizing) {
751 lock.readLock().lock();
752 try {
753 if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
754 LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
755 this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName());
756 }
757 this.memstore.add(cell, memstoreSizing);
758 } finally {
759 lock.readLock().unlock();
760 currentParallelPutCount.decrementAndGet();
761 }
762 }
763
764 /**
765 * Adds the specified cells to the memstore
766 */
767 public void add(final Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
768 lock.readLock().lock();
769 try {
770 if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
771 LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
772 this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName());
773 }
774 memstore.add(cells, memstoreSizing);
775 } finally {
776 lock.readLock().unlock();
777 currentParallelPutCount.decrementAndGet();
778 }
779 }
780
781 @Override
782 public long timeOfOldestEdit() {
783 return memstore.timeOfOldestEdit();
784 }
785
786 /**
787 * @return All store files.
788 */
789 @Override
790 public Collection<HStoreFile> getStorefiles() {
791 return this.storeEngine.getStoreFileManager().getStorefiles();
792 }
793
794 @Override
795 public Collection<HStoreFile> getCompactedFiles() {
796 return this.storeEngine.getStoreFileManager().getCompactedfiles();
797 }
798
799 /**
800 * This throws a WrongRegionException if the HFile does not fit in this region, or an
801 * InvalidHFileException if the HFile is not valid.
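 * When {@code hbase.hstore.bulkload.verify} is set to true, every cell of the file is additionally
 * scanned to verify row ordering and that all cells belong to this column family.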
802 */ 803 public void assertBulkLoadHFileOk(Path srcPath) throws IOException { 804 HFile.Reader reader = null; 805 try { 806 LOG.info("Validating hfile at " + srcPath + " for inclusion in " 807 + "store " + this + " region " + this.getRegionInfo().getRegionNameAsString()); 808 FileSystem srcFs = srcPath.getFileSystem(conf); 809 srcFs.access(srcPath, FsAction.READ_WRITE); 810 reader = HFile.createReader(srcFs, srcPath, cacheConf, isPrimaryReplicaStore(), conf); 811 reader.loadFileInfo(); 812 813 Optional<byte[]> firstKey = reader.getFirstRowKey(); 814 Preconditions.checkState(firstKey.isPresent(), "First key can not be null"); 815 Optional<Cell> lk = reader.getLastKey(); 816 Preconditions.checkState(lk.isPresent(), "Last key can not be null"); 817 byte[] lastKey = CellUtil.cloneRow(lk.get()); 818 819 if (LOG.isDebugEnabled()) { 820 LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey.get()) + 821 " last=" + Bytes.toStringBinary(lastKey)); 822 LOG.debug("Region bounds: first=" + 823 Bytes.toStringBinary(getRegionInfo().getStartKey()) + 824 " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey())); 825 } 826 827 if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) { 828 throw new WrongRegionException( 829 "Bulk load file " + srcPath.toString() + " does not fit inside region " 830 + this.getRegionInfo().getRegionNameAsString()); 831 } 832 833 if(reader.length() > conf.getLong(HConstants.HREGION_MAX_FILESIZE, 834 HConstants.DEFAULT_MAX_FILE_SIZE)) { 835 LOG.warn("Trying to bulk load hfile " + srcPath + " with size: " + 836 reader.length() + " bytes can be problematic as it may lead to oversplitting."); 837 } 838 839 if (verifyBulkLoads) { 840 long verificationStartTime = EnvironmentEdgeManager.currentTime(); 841 LOG.info("Full verification started for bulk load hfile: {}", srcPath); 842 Cell prevCell = null; 843 HFileScanner scanner = reader.getScanner(false, false, false); 844 scanner.seekTo(); 845 do { 846 Cell cell = scanner.getCell(); 847 if (prevCell != null) { 848 if (comparator.compareRows(prevCell, cell) > 0) { 849 throw new InvalidHFileException("Previous row is greater than" 850 + " current row: path=" + srcPath + " previous=" 851 + CellUtil.getCellKeyAsString(prevCell) + " current=" 852 + CellUtil.getCellKeyAsString(cell)); 853 } 854 if (CellComparator.getInstance().compareFamilies(prevCell, cell) != 0) { 855 throw new InvalidHFileException("Previous key had different" 856 + " family compared to current key: path=" + srcPath 857 + " previous=" 858 + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(), 859 prevCell.getFamilyLength()) 860 + " current=" 861 + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(), 862 cell.getFamilyLength())); 863 } 864 } 865 prevCell = cell; 866 } while (scanner.next()); 867 LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString() 868 + " took " + (EnvironmentEdgeManager.currentTime() - verificationStartTime) 869 + " ms"); 870 } 871 } finally { 872 if (reader != null) reader.close(); 873 } 874 } 875 876 /** 877 * This method should only be called from Region. It is assumed that the ranges of values in the 878 * HFile fit within the stores assigned region. 
(assertBulkLoadHFileOk checks this) 879 * 880 * @param srcPathStr 881 * @param seqNum sequence Id associated with the HFile 882 */ 883 public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException { 884 Path srcPath = new Path(srcPathStr); 885 return fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum); 886 } 887 888 public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException { 889 Path srcPath = new Path(srcPathStr); 890 try { 891 fs.commitStoreFile(srcPath, dstPath); 892 } finally { 893 if (this.getCoprocessorHost() != null) { 894 this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath); 895 } 896 } 897 898 LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as " 899 + dstPath + " - updating store file list."); 900 901 HStoreFile sf = createStoreFileAndReader(dstPath); 902 bulkLoadHFile(sf); 903 904 LOG.info("Successfully loaded store file {} into store {} (new location: {})", 905 srcPath, this, dstPath); 906 907 return dstPath; 908 } 909 910 public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException { 911 HStoreFile sf = createStoreFileAndReader(fileInfo); 912 bulkLoadHFile(sf); 913 } 914 915 private void bulkLoadHFile(HStoreFile sf) throws IOException { 916 StoreFileReader r = sf.getReader(); 917 this.storeSize.addAndGet(r.length()); 918 this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); 919 920 // Append the new storefile into the list 921 this.lock.writeLock().lock(); 922 try { 923 this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf)); 924 } finally { 925 // We need the lock, as long as we are updating the storeFiles 926 // or changing the memstore. Let us release it before calling 927 // notifyChangeReadersObservers. See HBASE-4485 for a possible 928 // deadlock scenario that could have happened if continue to hold 929 // the lock. 930 this.lock.writeLock().unlock(); 931 } 932 LOG.info("Loaded HFile " + sf.getFileInfo() + " into store '" + getColumnFamilyName()); 933 if (LOG.isTraceEnabled()) { 934 String traceMessage = "BULK LOAD time,size,store size,store files [" 935 + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize 936 + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]"; 937 LOG.trace(traceMessage); 938 } 939 } 940 941 /** 942 * Close all the readers We don't need to worry about subsequent requests because the Region holds 943 * a write lock that will prevent any more reads or writes. 944 * @return the {@link StoreFile StoreFiles} that were previously being used. 945 * @throws IOException on failure 946 */ 947 public ImmutableCollection<HStoreFile> close() throws IOException { 948 this.archiveLock.lock(); 949 this.lock.writeLock().lock(); 950 try { 951 // Clear so metrics doesn't find them. 952 ImmutableCollection<HStoreFile> result = storeEngine.getStoreFileManager().clearFiles(); 953 Collection<HStoreFile> compactedfiles = 954 storeEngine.getStoreFileManager().clearCompactedFiles(); 955 // clear the compacted files 956 if (CollectionUtils.isNotEmpty(compactedfiles)) { 957 removeCompactedfiles(compactedfiles); 958 } 959 if (!result.isEmpty()) { 960 // initialize the thread pool for closing store files in parallel. 
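      // The same region-level open/close thread pool sizing is used here as in openStoreFiles();
      // closing may evict the files' blocks from the block cache, so it is done in parallel.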
961 ThreadPoolExecutor storeFileCloserThreadPool = this.region 962 .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-" 963 + this.getColumnFamilyName()); 964 965 // close each store file in parallel 966 CompletionService<Void> completionService = 967 new ExecutorCompletionService<>(storeFileCloserThreadPool); 968 for (HStoreFile f : result) { 969 completionService.submit(new Callable<Void>() { 970 @Override 971 public Void call() throws IOException { 972 boolean evictOnClose = 973 cacheConf != null? cacheConf.shouldEvictOnClose(): true; 974 f.closeStoreFile(evictOnClose); 975 return null; 976 } 977 }); 978 } 979 980 IOException ioe = null; 981 try { 982 for (int i = 0; i < result.size(); i++) { 983 try { 984 Future<Void> future = completionService.take(); 985 future.get(); 986 } catch (InterruptedException e) { 987 if (ioe == null) { 988 ioe = new InterruptedIOException(); 989 ioe.initCause(e); 990 } 991 } catch (ExecutionException e) { 992 if (ioe == null) ioe = new IOException(e.getCause()); 993 } 994 } 995 } finally { 996 storeFileCloserThreadPool.shutdownNow(); 997 } 998 if (ioe != null) throw ioe; 999 } 1000 LOG.trace("Closed {}", this); 1001 return result; 1002 } finally { 1003 this.lock.writeLock().unlock(); 1004 this.archiveLock.unlock(); 1005 } 1006 } 1007 1008 /** 1009 * Snapshot this stores memstore. Call before running 1010 * {@link #flushCache(long, MemStoreSnapshot, MonitoredTask, ThroughputController, 1011 * FlushLifeCycleTracker)} 1012 * so it has some work to do. 1013 */ 1014 void snapshot() { 1015 this.lock.writeLock().lock(); 1016 try { 1017 this.memstore.snapshot(); 1018 } finally { 1019 this.lock.writeLock().unlock(); 1020 } 1021 } 1022 1023 /** 1024 * Write out current snapshot. Presumes {@link #snapshot()} has been called previously. 1025 * @param logCacheFlushId flush sequence number 1026 * @param snapshot 1027 * @param status 1028 * @param throughputController 1029 * @return The path name of the tmp file to which the store was flushed 1030 * @throws IOException if exception occurs during process 1031 */ 1032 protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot, 1033 MonitoredTask status, ThroughputController throughputController, 1034 FlushLifeCycleTracker tracker) throws IOException { 1035 // If an exception happens flushing, we let it out without clearing 1036 // the memstore snapshot. The old snapshot will be returned when we say 1037 // 'snapshot', the next time flush comes around. 
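    // Each retry below re-runs the whole snapshot flush and writes a fresh set of tmp files; only a
    // successful, validated attempt returns its paths to the caller for commit.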
1038 // Retry after catching exception when flushing, otherwise server will abort 1039 // itself 1040 StoreFlusher flusher = storeEngine.getStoreFlusher(); 1041 IOException lastException = null; 1042 for (int i = 0; i < flushRetriesNumber; i++) { 1043 try { 1044 List<Path> pathNames = 1045 flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController, tracker); 1046 Path lastPathName = null; 1047 try { 1048 for (Path pathName : pathNames) { 1049 lastPathName = pathName; 1050 validateStoreFile(pathName); 1051 } 1052 return pathNames; 1053 } catch (Exception e) { 1054 LOG.warn("Failed validating store file {}, retrying num={}", lastPathName, i, e); 1055 if (e instanceof IOException) { 1056 lastException = (IOException) e; 1057 } else { 1058 lastException = new IOException(e); 1059 } 1060 } 1061 } catch (IOException e) { 1062 LOG.warn("Failed flushing store file, retrying num={}", i, e); 1063 lastException = e; 1064 } 1065 if (lastException != null && i < (flushRetriesNumber - 1)) { 1066 try { 1067 Thread.sleep(pauseTime); 1068 } catch (InterruptedException e) { 1069 IOException iie = new InterruptedIOException(); 1070 iie.initCause(e); 1071 throw iie; 1072 } 1073 } 1074 } 1075 throw lastException; 1076 } 1077 1078 /** 1079 * @param path The pathname of the tmp file into which the store was flushed 1080 * @param logCacheFlushId 1081 * @param status 1082 * @return store file created. 1083 * @throws IOException 1084 */ 1085 private HStoreFile commitFile(Path path, long logCacheFlushId, MonitoredTask status) 1086 throws IOException { 1087 // Write-out finished successfully, move into the right spot 1088 Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path); 1089 1090 status.setStatus("Flushing " + this + ": reopening flushed file"); 1091 HStoreFile sf = createStoreFileAndReader(dstPath); 1092 1093 StoreFileReader r = sf.getReader(); 1094 this.storeSize.addAndGet(r.length()); 1095 this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); 1096 1097 if (LOG.isInfoEnabled()) { 1098 LOG.info("Added " + sf + ", entries=" + r.getEntries() + 1099 ", sequenceid=" + logCacheFlushId + 1100 ", filesize=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1)); 1101 } 1102 return sf; 1103 } 1104 1105 /** 1106 * @param maxKeyCount 1107 * @param compression Compression algorithm to use 1108 * @param isCompaction whether we are creating a new file in a compaction 1109 * @param includeMVCCReadpoint - whether to include MVCC or not 1110 * @param includesTag - includesTag or not 1111 * @return Writer for a new StoreFile in the tmp dir. 1112 */ 1113 // TODO : allow the Writer factory to create Writers of ShipperListener type only in case of 1114 // compaction 1115 public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression, 1116 boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag, 1117 boolean shouldDropBehind) throws IOException { 1118 final CacheConfig writerCacheConf; 1119 if (isCompaction) { 1120 // Don't cache data on write on compactions. 
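      // Compaction output is written once and read back later through normal scans; caching it on
      // write would churn the block cache, so copy the cache config and disable cache-on-write for
      // this writer only.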
1121 writerCacheConf = new CacheConfig(cacheConf); 1122 writerCacheConf.setCacheDataOnWrite(false); 1123 } else { 1124 writerCacheConf = cacheConf; 1125 } 1126 InetSocketAddress[] favoredNodes = null; 1127 if (region.getRegionServerServices() != null) { 1128 favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion( 1129 region.getRegionInfo().getEncodedName()); 1130 } 1131 HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag, 1132 cryptoContext); 1133 Path familyTempDir = new Path(fs.getTempDir(), family.getNameAsString()); 1134 StoreFileWriter.Builder builder = new StoreFileWriter.Builder(conf, writerCacheConf, 1135 this.getFileSystem()) 1136 .withOutputDir(familyTempDir) 1137 .withComparator(comparator) 1138 .withBloomType(family.getBloomFilterType()) 1139 .withMaxKeyCount(maxKeyCount) 1140 .withFavoredNodes(favoredNodes) 1141 .withFileContext(hFileContext) 1142 .withShouldDropCacheBehind(shouldDropBehind) 1143 .withCompactedFilesSupplier(this::getCompactedFiles); 1144 return builder.build(); 1145 } 1146 1147 private HFileContext createFileContext(Compression.Algorithm compression, 1148 boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context cryptoContext) { 1149 if (compression == null) { 1150 compression = HFile.DEFAULT_COMPRESSION_ALGORITHM; 1151 } 1152 HFileContext hFileContext = new HFileContextBuilder() 1153 .withIncludesMvcc(includeMVCCReadpoint) 1154 .withIncludesTags(includesTag) 1155 .withCompression(compression) 1156 .withCompressTags(family.isCompressTags()) 1157 .withChecksumType(checksumType) 1158 .withBytesPerCheckSum(bytesPerChecksum) 1159 .withBlockSize(blocksize) 1160 .withHBaseCheckSum(true) 1161 .withDataBlockEncoding(family.getDataBlockEncoding()) 1162 .withEncryptionContext(cryptoContext) 1163 .withCreateTime(EnvironmentEdgeManager.currentTime()) 1164 .build(); 1165 return hFileContext; 1166 } 1167 1168 1169 private long getTotalSize(Collection<HStoreFile> sfs) { 1170 return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum(); 1171 } 1172 1173 /** 1174 * Change storeFiles adding into place the Reader produced by this new flush. 1175 * @param sfs Store files 1176 * @param snapshotId 1177 * @throws IOException 1178 * @return Whether compaction is required. 1179 */ 1180 private boolean updateStorefiles(List<HStoreFile> sfs, long snapshotId) throws IOException { 1181 this.lock.writeLock().lock(); 1182 try { 1183 this.storeEngine.getStoreFileManager().insertNewFiles(sfs); 1184 if (snapshotId > 0) { 1185 this.memstore.clearSnapshot(snapshotId); 1186 } 1187 } finally { 1188 // We need the lock, as long as we are updating the storeFiles 1189 // or changing the memstore. Let us release it before calling 1190 // notifyChangeReadersObservers. See HBASE-4485 for a possible 1191 // deadlock scenario that could have happened if continue to hold 1192 // the lock. 1193 this.lock.writeLock().unlock(); 1194 } 1195 // notify to be called here - only in case of flushes 1196 notifyChangedReadersObservers(sfs); 1197 if (LOG.isTraceEnabled()) { 1198 long totalSize = getTotalSize(sfs); 1199 String traceMessage = "FLUSH time,count,size,store size,store files [" 1200 + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize 1201 + "," + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]"; 1202 LOG.trace(traceMessage); 1203 } 1204 return needsCompaction(); 1205 } 1206 1207 /** 1208 * Notify all observers that set of Readers has changed. 
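   * Memstore scanners for each observer are created under the store read lock, so every observer
   * receives a consistent view to pair with the newly flushed files.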
1209 * @throws IOException 1210 */ 1211 private void notifyChangedReadersObservers(List<HStoreFile> sfs) throws IOException { 1212 for (ChangedReadersObserver o : this.changedReaderObservers) { 1213 List<KeyValueScanner> memStoreScanners; 1214 this.lock.readLock().lock(); 1215 try { 1216 memStoreScanners = this.memstore.getScanners(o.getReadPoint()); 1217 } finally { 1218 this.lock.readLock().unlock(); 1219 } 1220 o.updateReaders(sfs, memStoreScanners); 1221 } 1222 } 1223 1224 /** 1225 * Get all scanners with no filtering based on TTL (that happens further down the line). 1226 * @param cacheBlocks cache the blocks or not 1227 * @param usePread true to use pread, false if not 1228 * @param isCompaction true if the scanner is created for compaction 1229 * @param matcher the scan query matcher 1230 * @param startRow the start row 1231 * @param stopRow the stop row 1232 * @param readPt the read point of the current scan 1233 * @return all scanners for this store 1234 */ 1235 public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet, boolean usePread, 1236 boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt) 1237 throws IOException { 1238 return getScanners(cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow, false, 1239 readPt); 1240 } 1241 1242 /** 1243 * Get all scanners with no filtering based on TTL (that happens further down the line). 1244 * @param cacheBlocks cache the blocks or not 1245 * @param usePread true to use pread, false if not 1246 * @param isCompaction true if the scanner is created for compaction 1247 * @param matcher the scan query matcher 1248 * @param startRow the start row 1249 * @param includeStartRow true to include start row, false if not 1250 * @param stopRow the stop row 1251 * @param includeStopRow true to include stop row, false if not 1252 * @param readPt the read point of the current scan 1253 * @return all scanners for this store 1254 */ 1255 public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread, 1256 boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow, 1257 byte[] stopRow, boolean includeStopRow, long readPt) throws IOException { 1258 Collection<HStoreFile> storeFilesToScan; 1259 List<KeyValueScanner> memStoreScanners; 1260 this.lock.readLock().lock(); 1261 try { 1262 storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow, 1263 includeStartRow, stopRow, includeStopRow); 1264 memStoreScanners = this.memstore.getScanners(readPt); 1265 } finally { 1266 this.lock.readLock().unlock(); 1267 } 1268 1269 try { 1270 // First the store file scanners 1271 1272 // TODO this used to get the store files in descending order, 1273 // but now we get them in ascending order, which I think is 1274 // actually more correct, since memstore get put at the end. 1275 List<StoreFileScanner> sfScanners = StoreFileScanner 1276 .getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, false, 1277 matcher, readPt); 1278 List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1); 1279 scanners.addAll(sfScanners); 1280 // Then the memstore scanners 1281 scanners.addAll(memStoreScanners); 1282 return scanners; 1283 } catch (Throwable t) { 1284 clearAndClose(memStoreScanners); 1285 throw t instanceof IOException ? 
(IOException) t : new IOException(t); 1286 } 1287 } 1288 1289 private static void clearAndClose(List<KeyValueScanner> scanners) { 1290 if (scanners == null) { 1291 return; 1292 } 1293 for (KeyValueScanner s : scanners) { 1294 s.close(); 1295 } 1296 scanners.clear(); 1297 } 1298 1299 /** 1300 * Create scanners on the given files and if needed on the memstore with no filtering based on TTL 1301 * (that happens further down the line). 1302 * @param files the list of files on which the scanners has to be created 1303 * @param cacheBlocks cache the blocks or not 1304 * @param usePread true to use pread, false if not 1305 * @param isCompaction true if the scanner is created for compaction 1306 * @param matcher the scan query matcher 1307 * @param startRow the start row 1308 * @param stopRow the stop row 1309 * @param readPt the read point of the current scan 1310 * @param includeMemstoreScanner true if memstore has to be included 1311 * @return scanners on the given files and on the memstore if specified 1312 */ 1313 public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks, 1314 boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, 1315 byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) 1316 throws IOException { 1317 return getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow, 1318 false, readPt, includeMemstoreScanner); 1319 } 1320 1321 /** 1322 * Create scanners on the given files and if needed on the memstore with no filtering based on TTL 1323 * (that happens further down the line). 1324 * @param files the list of files on which the scanners has to be created 1325 * @param cacheBlocks ache the blocks or not 1326 * @param usePread true to use pread, false if not 1327 * @param isCompaction true if the scanner is created for compaction 1328 * @param matcher the scan query matcher 1329 * @param startRow the start row 1330 * @param includeStartRow true to include start row, false if not 1331 * @param stopRow the stop row 1332 * @param includeStopRow true to include stop row, false if not 1333 * @param readPt the read point of the current scan 1334 * @param includeMemstoreScanner true if memstore has to be included 1335 * @return scanners on the given files and on the memstore if specified 1336 */ 1337 public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks, 1338 boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, 1339 boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, 1340 boolean includeMemstoreScanner) throws IOException { 1341 List<KeyValueScanner> memStoreScanners = null; 1342 if (includeMemstoreScanner) { 1343 this.lock.readLock().lock(); 1344 try { 1345 memStoreScanners = this.memstore.getScanners(readPt); 1346 } finally { 1347 this.lock.readLock().unlock(); 1348 } 1349 } 1350 try { 1351 List<StoreFileScanner> sfScanners = StoreFileScanner 1352 .getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, false, matcher, 1353 readPt); 1354 List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1); 1355 scanners.addAll(sfScanners); 1356 // Then the memstore scanners 1357 if (memStoreScanners != null) { 1358 scanners.addAll(memStoreScanners); 1359 } 1360 return scanners; 1361 } catch (Throwable t) { 1362 clearAndClose(memStoreScanners); 1363 throw t instanceof IOException ? 
(IOException) t : new IOException(t);
1364 }
1365 }
1366
1367 /**
1368 * @param o Observer who wants to know about changes in set of Readers
1369 */
1370 public void addChangedReaderObserver(ChangedReadersObserver o) {
1371 this.changedReaderObservers.add(o);
1372 }
1373
1374 /**
1375 * @param o Observer no longer interested in changes in set of Readers.
1376 */
1377 public void deleteChangedReaderObserver(ChangedReadersObserver o) {
1378 // We don't check if observer present; it may not be (legitimately)
1379 this.changedReaderObservers.remove(o);
1380 }
1381
1382 //////////////////////////////////////////////////////////////////////////////
1383 // Compaction
1384 //////////////////////////////////////////////////////////////////////////////
1385
1386 /**
1387 * Compact the StoreFiles. This method may take some time, so the calling
1388 * thread must be able to block for long periods.
1389 *
1390 * <p>During this time, the Store can work as usual, getting values from
1391 * StoreFiles and writing new StoreFiles from the memstore.
1392 *
1393 * Existing StoreFiles are not destroyed until the new compacted StoreFile is
1394 * completely written-out to disk.
1395 *
1396 * <p>The compactLock prevents multiple simultaneous compactions.
1397 * The structureLock prevents us from interfering with other write operations.
1398 *
1399 * <p>We don't want to hold the structureLock for the whole time, as a compact()
1400 * can be lengthy and we want to allow cache-flushes during this period.
1401 *
1402 * <p>A compaction event should be idempotent, since there is no IO fencing for
1403 * the region directory in HDFS. A region server might still try to complete the
1404 * compaction after it lost the region. That is why the following events are carefully
1405 * ordered for a compaction:
1406 * 1. Compaction writes new files under region/.tmp directory (compaction output)
1407 * 2. Compaction atomically moves the temporary file under region directory
1408 * 3. Compaction appends a WAL edit containing the compaction input and output files.
1409 * Forces sync on WAL.
1410 * 4. Compaction deletes the input files from the region directory.
1411 *
1412 * Failure conditions are handled like this:
1413 * - If RS fails before 2, compaction won't complete. Even if RS lives on and finishes
1414 * the compaction later, it will only write the new data file to the region directory.
1415 * Since we already have this data, this will be idempotent, but we will have a redundant
1416 * copy of the data.
1417 * - If RS fails between 2 and 3, the region will have a redundant copy of the data. The
1418 * RS that failed won't be able to finish sync() for WAL because of lease recovery in WAL.
1419 * - If RS fails after 3, the region server that opens the region will pick up the
1420 * compaction marker from the WAL and replay it by removing the compaction input files.
1421 * The failed RS can also attempt to delete those files, but the operation will be idempotent.
1422 *
1423 * See HBASE-2231 for details.
1424 *
1425 * @param compaction compaction details obtained from requestCompaction()
1426 * @throws IOException
1427 * @return Storefiles we compacted into or null if we failed or opted out early.
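   *
   * <p>Illustrative call sequence (a sketch only; assumes a CompactionContext obtained from this
   * store's requestCompaction(), and that the region server's compaction threads normally drive
   * this, not user code):
   * <pre>{@code
   *   Optional<CompactionContext> ctx = store.requestCompaction();
   *   if (ctx.isPresent()) {
   *     List<HStoreFile> newFiles = store.compact(ctx.get(), throughputController, user);
   *   }
   * }</pre>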
1428 */ 1429 public List<HStoreFile> compact(CompactionContext compaction, 1430 ThroughputController throughputController, User user) throws IOException { 1431 assert compaction != null; 1432 List<HStoreFile> sfs = null; 1433 CompactionRequestImpl cr = compaction.getRequest(); 1434 try { 1435 // Do all sanity checking in here if we have a valid CompactionRequestImpl 1436 // because we need to clean up after it on the way out in a finally 1437 // block below 1438 long compactionStartTime = EnvironmentEdgeManager.currentTime(); 1439 assert compaction.hasSelection(); 1440 Collection<HStoreFile> filesToCompact = cr.getFiles(); 1441 assert !filesToCompact.isEmpty(); 1442 synchronized (filesCompacting) { 1443 // sanity check: we're compacting files that this store knows about 1444 // TODO: change this to LOG.error() after more debugging 1445 Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact)); 1446 } 1447 1448 // Ready to go. Have list of files to compact. 1449 LOG.info("Starting compaction of " + filesToCompact + 1450 " into tmpdir=" + fs.getTempDir() + ", totalSize=" + 1451 TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1)); 1452 1453 return doCompaction(cr, filesToCompact, user, compactionStartTime, 1454 compaction.compact(throughputController, user)); 1455 } finally { 1456 finishCompactionRequest(cr); 1457 } 1458 } 1459 1460 @VisibleForTesting 1461 protected List<HStoreFile> doCompaction(CompactionRequestImpl cr, 1462 Collection<HStoreFile> filesToCompact, User user, long compactionStartTime, 1463 List<Path> newFiles) throws IOException { 1464 // Do the steps necessary to complete the compaction. 1465 List<HStoreFile> sfs = moveCompactedFilesIntoPlace(cr, newFiles, user); 1466 writeCompactionWalRecord(filesToCompact, sfs); 1467 replaceStoreFiles(filesToCompact, sfs); 1468 if (cr.isMajor()) { 1469 majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs()); 1470 majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize); 1471 } else { 1472 compactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs()); 1473 compactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize); 1474 } 1475 long outputBytes = getTotalSize(sfs); 1476 1477 // At this point the store will use new files for all new scanners. 1478 completeCompaction(filesToCompact); // update store size. 
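// Report this compaction to the region server metrics, if a region server context is still
// available (it may be null, e.g. in unit tests or while the server is shutting down).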
1479 1480 long now = EnvironmentEdgeManager.currentTime(); 1481 if (region.getRegionServerServices() != null 1482 && region.getRegionServerServices().getMetrics() != null) { 1483 region.getRegionServerServices().getMetrics().updateCompaction( 1484 region.getTableDescriptor().getTableName().getNameAsString(), 1485 cr.isMajor(), now - compactionStartTime, cr.getFiles().size(), 1486 newFiles.size(), cr.getSize(), outputBytes); 1487 1488 } 1489 1490 logCompactionEndMessage(cr, sfs, now, compactionStartTime); 1491 return sfs; 1492 } 1493 1494 private List<HStoreFile> moveCompactedFilesIntoPlace(CompactionRequestImpl cr, List<Path> newFiles, 1495 User user) throws IOException { 1496 List<HStoreFile> sfs = new ArrayList<>(newFiles.size()); 1497 for (Path newFile : newFiles) { 1498 assert newFile != null; 1499 HStoreFile sf = moveFileIntoPlace(newFile); 1500 if (this.getCoprocessorHost() != null) { 1501 getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user); 1502 } 1503 assert sf != null; 1504 sfs.add(sf); 1505 } 1506 return sfs; 1507 } 1508 1509 // Package-visible for tests 1510 HStoreFile moveFileIntoPlace(Path newFile) throws IOException { 1511 validateStoreFile(newFile); 1512 // Move the file into the right spot 1513 Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile); 1514 return createStoreFileAndReader(destPath); 1515 } 1516 1517 /** 1518 * Writes the compaction WAL record. 1519 * @param filesCompacted Files compacted (input). 1520 * @param newFiles Files from compaction. 1521 */ 1522 private void writeCompactionWalRecord(Collection<HStoreFile> filesCompacted, 1523 Collection<HStoreFile> newFiles) throws IOException { 1524 if (region.getWAL() == null) { 1525 return; 1526 } 1527 List<Path> inputPaths = 1528 filesCompacted.stream().map(HStoreFile::getPath).collect(Collectors.toList()); 1529 List<Path> outputPaths = 1530 newFiles.stream().map(HStoreFile::getPath).collect(Collectors.toList()); 1531 RegionInfo info = this.region.getRegionInfo(); 1532 CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info, 1533 family.getName(), inputPaths, outputPaths, fs.getStoreDir(getColumnFamilyDescriptor().getNameAsString())); 1534 // Fix reaching into Region to get the maxWaitForSeqId. 1535 // Does this method belong in Region altogether given it is making so many references up there? 1536 // Could be Region#writeCompactionMarker(compactionDescriptor); 1537 WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(), 1538 this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC()); 1539 } 1540 1541 @VisibleForTesting 1542 void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result) 1543 throws IOException { 1544 this.lock.writeLock().lock(); 1545 try { 1546 this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result); 1547 synchronized (filesCompacting) { 1548 filesCompacting.removeAll(compactedFiles); 1549 } 1550 1551 // These may be null when the RS is shutting down. The space quota Chores will fix the Region 1552 // sizes later so it's not super-critical if we miss these. 
1553 RegionServerServices rsServices = region.getRegionServerServices(); 1554 if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) { 1555 updateSpaceQuotaAfterFileReplacement( 1556 rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(), 1557 compactedFiles, result); 1558 } 1559 } finally { 1560 this.lock.writeLock().unlock(); 1561 } 1562 } 1563 1564 /** 1565 * Updates the space quota usage for this region, removing the size for files compacted away 1566 * and adding in the size for new files. 1567 * 1568 * @param sizeStore The object tracking changes in region size for space quotas. 1569 * @param regionInfo The identifier for the region whose size is being updated. 1570 * @param oldFiles Files removed from this store's region. 1571 * @param newFiles Files added to this store's region. 1572 */ 1573 void updateSpaceQuotaAfterFileReplacement( 1574 RegionSizeStore sizeStore, RegionInfo regionInfo, Collection<HStoreFile> oldFiles, 1575 Collection<HStoreFile> newFiles) { 1576 long delta = 0; 1577 if (oldFiles != null) { 1578 for (HStoreFile compactedFile : oldFiles) { 1579 if (compactedFile.isHFile()) { 1580 delta -= compactedFile.getReader().length(); 1581 } 1582 } 1583 } 1584 if (newFiles != null) { 1585 for (HStoreFile newFile : newFiles) { 1586 if (newFile.isHFile()) { 1587 delta += newFile.getReader().length(); 1588 } 1589 } 1590 } 1591 sizeStore.incrementRegionSize(regionInfo, delta); 1592 } 1593 1594 /** 1595 * Log a very elaborate compaction completion message. 1596 * @param cr Request. 1597 * @param sfs Resulting files. 1598 * @param compactionStartTime Start time. 1599 */ 1600 private void logCompactionEndMessage( 1601 CompactionRequestImpl cr, List<HStoreFile> sfs, long now, long compactionStartTime) { 1602 StringBuilder message = new StringBuilder( 1603 "Completed" + (cr.isMajor() ? " major" : "") + " compaction of " 1604 + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "") + " file(s) in " 1605 + this + " of " + this.getRegionInfo().getShortNameToLog() + " into "); 1606 if (sfs.isEmpty()) { 1607 message.append("none, "); 1608 } else { 1609 for (HStoreFile sf: sfs) { 1610 message.append(sf.getPath().getName()); 1611 message.append("(size="); 1612 message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1)); 1613 message.append("), "); 1614 } 1615 } 1616 message.append("total size for store is ") 1617 .append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1)) 1618 .append(". This selection was in queue for ") 1619 .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime())) 1620 .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime)) 1621 .append(" to execute."); 1622 LOG.info(message.toString()); 1623 if (LOG.isTraceEnabled()) { 1624 int fileCount = storeEngine.getStoreFileManager().getStorefileCount(); 1625 long resultSize = getTotalSize(sfs); 1626 String traceMessage = "COMPACTION start,end,size out,files in,files out,store size," 1627 + "store files [" + compactionStartTime + "," + now + "," + resultSize + "," 1628 + cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]"; 1629 LOG.trace(traceMessage); 1630 } 1631 } 1632 1633 /** 1634 * Call to complete a compaction. Its for the case where we find in the WAL a compaction 1635 * that was not finished. We could find one recovering a WAL after a regionserver crash. 1636 * See HBASE-2231. 
1637 * @param compaction 1638 */ 1639 public void replayCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles, 1640 boolean removeFiles) throws IOException { 1641 LOG.debug("Completing compaction from the WAL marker"); 1642 List<String> compactionInputs = compaction.getCompactionInputList(); 1643 List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList()); 1644 1645 // The Compaction Marker is written after the compaction is completed, 1646 // and the files moved into the region/family folder. 1647 // 1648 // If we crash after the entry is written, we may not have removed the 1649 // input files, but the output file is present. 1650 // (The unremoved input files will be removed by this function) 1651 // 1652 // If we scan the directory and the file is not present, it can mean that: 1653 // - The file was manually removed by the user 1654 // - The file was removed as consequence of subsequent compaction 1655 // so, we can't do anything with the "compaction output list" because those 1656 // files have already been loaded when opening the region (by virtue of 1657 // being in the store's folder) or they may be missing due to a compaction. 1658 1659 String familyName = this.getColumnFamilyName(); 1660 Set<String> inputFiles = new HashSet<>(); 1661 for (String compactionInput : compactionInputs) { 1662 Path inputPath = fs.getStoreFilePath(familyName, compactionInput); 1663 inputFiles.add(inputPath.getName()); 1664 } 1665 1666 //some of the input files might already be deleted 1667 List<HStoreFile> inputStoreFiles = new ArrayList<>(compactionInputs.size()); 1668 for (HStoreFile sf : this.getStorefiles()) { 1669 if (inputFiles.contains(sf.getPath().getName())) { 1670 inputStoreFiles.add(sf); 1671 } 1672 } 1673 1674 // check whether we need to pick up the new files 1675 List<HStoreFile> outputStoreFiles = new ArrayList<>(compactionOutputs.size()); 1676 1677 if (pickCompactionFiles) { 1678 for (HStoreFile sf : this.getStorefiles()) { 1679 compactionOutputs.remove(sf.getPath().getName()); 1680 } 1681 for (String compactionOutput : compactionOutputs) { 1682 StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), compactionOutput); 1683 HStoreFile storeFile = createStoreFileAndReader(storeFileInfo); 1684 outputStoreFiles.add(storeFile); 1685 } 1686 } 1687 1688 if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) { 1689 LOG.info("Replaying compaction marker, replacing input files: " + 1690 inputStoreFiles + " with output files : " + outputStoreFiles); 1691 this.replaceStoreFiles(inputStoreFiles, outputStoreFiles); 1692 this.completeCompaction(inputStoreFiles); 1693 } 1694 } 1695 1696 /** 1697 * This method tries to compact N recent files for testing. 1698 * Note that because compacting "recent" files only makes sense for some policies, 1699 * e.g. the default one, it assumes default policy is used. It doesn't use policy, 1700 * but instead makes a compaction candidate list by itself. 1701 * @param N Number of files. 1702 */ 1703 @VisibleForTesting 1704 public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException { 1705 List<HStoreFile> filesToCompact; 1706 boolean isMajor; 1707 1708 this.lock.readLock().lock(); 1709 try { 1710 synchronized (filesCompacting) { 1711 filesToCompact = Lists.newArrayList(storeEngine.getStoreFileManager().getStorefiles()); 1712 if (!filesCompacting.isEmpty()) { 1713 // exclude all files older than the newest file we're currently 1714 // compacting. 
this allows us to preserve contiguity (HBASE-2856) 1715 HStoreFile last = filesCompacting.get(filesCompacting.size() - 1); 1716 int idx = filesToCompact.indexOf(last); 1717 Preconditions.checkArgument(idx != -1); 1718 filesToCompact.subList(0, idx + 1).clear(); 1719 } 1720 int count = filesToCompact.size(); 1721 if (N > count) { 1722 throw new RuntimeException("Not enough files"); 1723 } 1724 1725 filesToCompact = filesToCompact.subList(count - N, count); 1726 isMajor = (filesToCompact.size() == storeEngine.getStoreFileManager().getStorefileCount()); 1727 filesCompacting.addAll(filesToCompact); 1728 Collections.sort(filesCompacting, storeEngine.getStoreFileManager() 1729 .getStoreFileComparator()); 1730 } 1731 } finally { 1732 this.lock.readLock().unlock(); 1733 } 1734 1735 try { 1736 // Ready to go. Have list of files to compact. 1737 List<Path> newFiles = ((DefaultCompactor)this.storeEngine.getCompactor()) 1738 .compactForTesting(filesToCompact, isMajor); 1739 for (Path newFile: newFiles) { 1740 // Move the compaction into place. 1741 HStoreFile sf = moveFileIntoPlace(newFile); 1742 if (this.getCoprocessorHost() != null) { 1743 this.getCoprocessorHost().postCompact(this, sf, null, null, null); 1744 } 1745 replaceStoreFiles(filesToCompact, Collections.singletonList(sf)); 1746 completeCompaction(filesToCompact); 1747 } 1748 } finally { 1749 synchronized (filesCompacting) { 1750 filesCompacting.removeAll(filesToCompact); 1751 } 1752 } 1753 } 1754 1755 @Override 1756 public boolean hasReferences() { 1757 // Grab the read lock here, because we need to ensure that: only when the atomic 1758 // replaceStoreFiles(..) finished, we can get all the complete store file list. 1759 this.lock.readLock().lock(); 1760 try { 1761 // Merge the current store files with compacted files here due to HBASE-20940. 1762 Collection<HStoreFile> allStoreFiles = new ArrayList<>(getStorefiles()); 1763 allStoreFiles.addAll(getCompactedFiles()); 1764 return StoreUtils.hasReferences(allStoreFiles); 1765 } finally { 1766 this.lock.readLock().unlock(); 1767 } 1768 } 1769 1770 /** 1771 * getter for CompactionProgress object 1772 * @return CompactionProgress object; can be null 1773 */ 1774 public CompactionProgress getCompactionProgress() { 1775 return this.storeEngine.getCompactor().getProgress(); 1776 } 1777 1778 @Override 1779 public boolean shouldPerformMajorCompaction() throws IOException { 1780 for (HStoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) { 1781 // TODO: what are these reader checks all over the place? 1782 if (sf.getReader() == null) { 1783 LOG.debug("StoreFile {} has null Reader", sf); 1784 return false; 1785 } 1786 } 1787 return storeEngine.getCompactionPolicy().shouldPerformMajorCompaction( 1788 this.storeEngine.getStoreFileManager().getStorefiles()); 1789 } 1790 1791 public Optional<CompactionContext> requestCompaction() throws IOException { 1792 return requestCompaction(NO_PRIORITY, CompactionLifeCycleTracker.DUMMY, null); 1793 } 1794 1795 public Optional<CompactionContext> requestCompaction(int priority, 1796 CompactionLifeCycleTracker tracker, User user) throws IOException { 1797 // don't even select for compaction if writes are disabled 1798 if (!this.areWritesEnabled()) { 1799 return Optional.empty(); 1800 } 1801 // Before we do compaction, try to get rid of unneeded files to simplify things. 
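// ("Unneeded" means store files whose entries have all expired past the column family TTL;
// removeUnneededFiles() below drops them by recording a compaction with an empty output.)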
1802 removeUnneededFiles(); 1803 1804 final CompactionContext compaction = storeEngine.createCompaction(); 1805 CompactionRequestImpl request = null; 1806 this.lock.readLock().lock(); 1807 try { 1808 synchronized (filesCompacting) { 1809 // First, see if coprocessor would want to override selection. 1810 if (this.getCoprocessorHost() != null) { 1811 final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting); 1812 boolean override = getCoprocessorHost().preCompactSelection(this, 1813 candidatesForCoproc, tracker, user); 1814 if (override) { 1815 // Coprocessor is overriding normal file selection. 1816 compaction.forceSelect(new CompactionRequestImpl(candidatesForCoproc)); 1817 } 1818 } 1819 1820 // Normal case - coprocessor is not overriding file selection. 1821 if (!compaction.hasSelection()) { 1822 boolean isUserCompaction = priority == Store.PRIORITY_USER; 1823 boolean mayUseOffPeak = offPeakHours.isOffPeakHour() && 1824 offPeakCompactionTracker.compareAndSet(false, true); 1825 try { 1826 compaction.select(this.filesCompacting, isUserCompaction, 1827 mayUseOffPeak, forceMajor && filesCompacting.isEmpty()); 1828 } catch (IOException e) { 1829 if (mayUseOffPeak) { 1830 offPeakCompactionTracker.set(false); 1831 } 1832 throw e; 1833 } 1834 assert compaction.hasSelection(); 1835 if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) { 1836 // Compaction policy doesn't want to take advantage of off-peak. 1837 offPeakCompactionTracker.set(false); 1838 } 1839 } 1840 if (this.getCoprocessorHost() != null) { 1841 this.getCoprocessorHost().postCompactSelection( 1842 this, ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker, 1843 compaction.getRequest(), user); 1844 } 1845 // Finally, we have the resulting files list. Check if we have any files at all. 1846 request = compaction.getRequest(); 1847 Collection<HStoreFile> selectedFiles = request.getFiles(); 1848 if (selectedFiles.isEmpty()) { 1849 return Optional.empty(); 1850 } 1851 1852 addToCompactingFiles(selectedFiles); 1853 1854 // If we're enqueuing a major, clear the force flag. 1855 this.forceMajor = this.forceMajor && !request.isMajor(); 1856 1857 // Set common request properties. 1858 // Set priority, either override value supplied by caller or from store. 1859 request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority()); 1860 request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName()); 1861 request.setTracker(tracker); 1862 } 1863 } finally { 1864 this.lock.readLock().unlock(); 1865 } 1866 1867 if (LOG.isDebugEnabled()) { 1868 LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName() 1869 + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction" 1870 + (request.isAllFiles() ? " (all files)" : "")); 1871 } 1872 this.region.reportCompactionRequestStart(request.isMajor()); 1873 return Optional.of(compaction); 1874 } 1875 1876 /** Adds the files to compacting files. filesCompacting must be locked. */ 1877 private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) { 1878 if (CollectionUtils.isEmpty(filesToAdd)) { 1879 return; 1880 } 1881 // Check that we do not try to compact the same StoreFile twice. 
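// (Preconditions.checkArgument(false, ...) below always fails; it is used only to throw an
// IllegalArgumentException with a formatted message naming the overlapping files.)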
1882 if (!Collections.disjoint(filesCompacting, filesToAdd)) { 1883 Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting); 1884 } 1885 filesCompacting.addAll(filesToAdd); 1886 Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator()); 1887 } 1888 1889 private void removeUnneededFiles() throws IOException { 1890 if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) return; 1891 if (getColumnFamilyDescriptor().getMinVersions() > 0) { 1892 LOG.debug("Skipping expired store file removal due to min version being {}", 1893 getColumnFamilyDescriptor().getMinVersions()); 1894 return; 1895 } 1896 this.lock.readLock().lock(); 1897 Collection<HStoreFile> delSfs = null; 1898 try { 1899 synchronized (filesCompacting) { 1900 long cfTtl = getStoreFileTtl(); 1901 if (cfTtl != Long.MAX_VALUE) { 1902 delSfs = storeEngine.getStoreFileManager().getUnneededFiles( 1903 EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting); 1904 addToCompactingFiles(delSfs); 1905 } 1906 } 1907 } finally { 1908 this.lock.readLock().unlock(); 1909 } 1910 1911 if (CollectionUtils.isEmpty(delSfs)) { 1912 return; 1913 } 1914 1915 Collection<HStoreFile> newFiles = Collections.emptyList(); // No new files. 1916 writeCompactionWalRecord(delSfs, newFiles); 1917 replaceStoreFiles(delSfs, newFiles); 1918 completeCompaction(delSfs); 1919 LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in " 1920 + this + " of " + this.getRegionInfo().getRegionNameAsString() 1921 + "; total size for store is " 1922 + TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1)); 1923 } 1924 1925 public void cancelRequestedCompaction(CompactionContext compaction) { 1926 finishCompactionRequest(compaction.getRequest()); 1927 } 1928 1929 private void finishCompactionRequest(CompactionRequestImpl cr) { 1930 this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize()); 1931 if (cr.isOffPeak()) { 1932 offPeakCompactionTracker.set(false); 1933 cr.setOffPeak(false); 1934 } 1935 synchronized (filesCompacting) { 1936 filesCompacting.removeAll(cr.getFiles()); 1937 } 1938 } 1939 1940 /** 1941 * Validates a store file by opening and closing it. In HFileV2 this should not be an expensive 1942 * operation. 1943 * @param path the path to the store file 1944 */ 1945 private void validateStoreFile(Path path) throws IOException { 1946 HStoreFile storeFile = null; 1947 try { 1948 storeFile = createStoreFileAndReader(path); 1949 } catch (IOException e) { 1950 LOG.error("Failed to open store file : {}, keeping it in tmp location", path, e); 1951 throw e; 1952 } finally { 1953 if (storeFile != null) { 1954 storeFile.closeStoreFile(false); 1955 } 1956 } 1957 } 1958 1959 /** 1960 * Update counts. 1961 * @param compactedFiles list of files that were compacted 1962 */ 1963 @VisibleForTesting 1964 protected void completeCompaction(Collection<HStoreFile> compactedFiles) 1965 // Rename this method! TODO. 1966 throws IOException { 1967 this.storeSize.set(0L); 1968 this.totalUncompressedBytes.set(0L); 1969 for (HStoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) { 1970 StoreFileReader r = hsf.getReader(); 1971 if (r == null) { 1972 LOG.warn("StoreFile {} has a null Reader", hsf); 1973 continue; 1974 } 1975 this.storeSize.addAndGet(r.length()); 1976 this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); 1977 } 1978 } 1979 1980 /* 1981 * @param wantedVersions How many versions were asked for. 
1982 * @return wantedVersions or this families' {@link HConstants#VERSIONS}. 1983 */ 1984 int versionsToReturn(final int wantedVersions) { 1985 if (wantedVersions <= 0) { 1986 throw new IllegalArgumentException("Number of versions must be > 0"); 1987 } 1988 // Make sure we do not return more than maximum versions for this store. 1989 int maxVersions = this.family.getMaxVersions(); 1990 return wantedVersions > maxVersions ? maxVersions: wantedVersions; 1991 } 1992 1993 @Override 1994 public boolean canSplit() { 1995 this.lock.readLock().lock(); 1996 try { 1997 // Not split-able if we find a reference store file present in the store. 1998 boolean result = !hasReferences(); 1999 if (!result) { 2000 LOG.trace("Not splittable; has references: {}", this); 2001 } 2002 return result; 2003 } finally { 2004 this.lock.readLock().unlock(); 2005 } 2006 } 2007 2008 /** 2009 * Determines if Store should be split. 2010 */ 2011 public Optional<byte[]> getSplitPoint() { 2012 this.lock.readLock().lock(); 2013 try { 2014 // Should already be enforced by the split policy! 2015 assert !this.getRegionInfo().isMetaRegion(); 2016 // Not split-able if we find a reference store file present in the store. 2017 if (hasReferences()) { 2018 LOG.trace("Not splittable; has references: {}", this); 2019 return Optional.empty(); 2020 } 2021 return this.storeEngine.getStoreFileManager().getSplitPoint(); 2022 } catch(IOException e) { 2023 LOG.warn("Failed getting store size for {}", this, e); 2024 } finally { 2025 this.lock.readLock().unlock(); 2026 } 2027 return Optional.empty(); 2028 } 2029 2030 @Override 2031 public long getLastCompactSize() { 2032 return this.lastCompactSize; 2033 } 2034 2035 @Override 2036 public long getSize() { 2037 return storeSize.get(); 2038 } 2039 2040 public void triggerMajorCompaction() { 2041 this.forceMajor = true; 2042 } 2043 2044 ////////////////////////////////////////////////////////////////////////////// 2045 // File administration 2046 ////////////////////////////////////////////////////////////////////////////// 2047 2048 /** 2049 * Return a scanner for both the memstore and the HStore files. Assumes we are not in a 2050 * compaction. 2051 * @param scan Scan to apply when scanning the stores 2052 * @param targetCols columns to scan 2053 * @return a scanner over the current key values 2054 * @throws IOException on failure 2055 */ 2056 public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols, long readPt) 2057 throws IOException { 2058 lock.readLock().lock(); 2059 try { 2060 ScanInfo scanInfo; 2061 if (this.getCoprocessorHost() != null) { 2062 scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this); 2063 } else { 2064 scanInfo = getScanInfo(); 2065 } 2066 return createScanner(scan, scanInfo, targetCols, readPt); 2067 } finally { 2068 lock.readLock().unlock(); 2069 } 2070 } 2071 2072 // HMobStore will override this method to return its own implementation. 2073 protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo, 2074 NavigableSet<byte[]> targetCols, long readPt) throws IOException { 2075 return scan.isReversed() ? 
new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt) 2076 : new StoreScanner(this, scanInfo, scan, targetCols, readPt); 2077 } 2078
2079 /** 2080 * Recreates the scanners on the current list of active store file scanners 2081 * @param currentFileScanners the current set of active store file scanners 2082 * @param cacheBlocks cache the blocks or not 2083 * @param usePread use pread or not 2084 * @param isCompaction is the scanner for compaction 2085 * @param matcher the scan query matcher 2086 * @param startRow the scan's start row 2087 * @param includeStartRow should the scan include the start row 2088 * @param stopRow the scan's stop row 2089 * @param includeStopRow should the scan include the stop row 2090 * @param readPt the read point of the current scan 2091 * @param includeMemstoreScanner whether the scanners should include a memstore scanner 2092 * @return the list of scanners recreated from the current scanners 2093 * @throws IOException 2094 */
2095 public List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners, 2096 boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, 2097 byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, 2098 boolean includeMemstoreScanner) throws IOException { 2099 this.lock.readLock().lock(); 2100 try { 2101 Map<String, HStoreFile> name2File = 2102 new HashMap<>(getStorefilesCount() + getCompactedFilesCount()); 2103 for (HStoreFile file : getStorefiles()) { 2104 name2File.put(file.getFileInfo().getActiveFileName(), file); 2105 } 2106 Collection<HStoreFile> compactedFiles = getCompactedFiles(); 2107 for (HStoreFile file : IterableUtils.emptyIfNull(compactedFiles)) { 2108 name2File.put(file.getFileInfo().getActiveFileName(), file); 2109 } 2110 List<HStoreFile> filesToReopen = new ArrayList<>(); 2111 for (KeyValueScanner kvs : currentFileScanners) { 2112 assert kvs.isFileScanner(); 2113 if (kvs.peek() == null) { 2114 continue; 2115 } 2116 filesToReopen.add(name2File.get(kvs.getFilePath().getName())); 2117 } 2118 if (filesToReopen.isEmpty()) { 2119 return null; 2120 } 2121 return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow, 2122 includeStartRow, stopRow, includeStopRow, readPt, false); 2123 } finally { 2124 this.lock.readLock().unlock(); 2125 } 2126 } 2127
2128 @Override 2129 public String toString() { 2130 return this.getColumnFamilyName(); 2131 } 2132
2133 @Override 2134 public int getStorefilesCount() { 2135 return this.storeEngine.getStoreFileManager().getStorefileCount(); 2136 } 2137
2138 @Override 2139 public int getCompactedFilesCount() { 2140 return this.storeEngine.getStoreFileManager().getCompactedFilesCount(); 2141 } 2142
2143 private LongStream getStoreFileAgeStream() { 2144 return this.storeEngine.getStoreFileManager().getStorefiles().stream().filter(sf -> { 2145 if (sf.getReader() == null) { 2146 LOG.warn("StoreFile {} has a null Reader", sf); 2147 return false; 2148 } else { 2149 return true; 2150 } 2151 }).filter(HStoreFile::isHFile).mapToLong(sf -> sf.getFileInfo().getCreatedTimestamp()) 2152 .map(t -> EnvironmentEdgeManager.currentTime() - t); 2153 } 2154
2155 @Override 2156 public OptionalLong getMaxStoreFileAge() { 2157 return getStoreFileAgeStream().max(); 2158 } 2159
2160 @Override 2161 public OptionalLong getMinStoreFileAge() { 2162 return getStoreFileAgeStream().min(); 2163 } 2164
2165 @Override 2166 public OptionalDouble getAvgStoreFileAge() { 2167 return getStoreFileAgeStream().average(); 2168 }
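// Reference files are the half-file pointers created when a region is split; they point at a
// store file of the parent region. While any remain, the store cannot be split again (see
// hasReferences()/canSplit() above); compaction rewrites them into ordinary HFiles.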
2169 2170 @Override 2171 public long getNumReferenceFiles() { 2172 return this.storeEngine.getStoreFileManager().getStorefiles().stream() 2173 .filter(HStoreFile::isReference).count(); 2174 } 2175 2176 @Override 2177 public long getNumHFiles() { 2178 return this.storeEngine.getStoreFileManager().getStorefiles().stream() 2179 .filter(HStoreFile::isHFile).count(); 2180 } 2181 2182 @Override 2183 public long getStoreSizeUncompressed() { 2184 return this.totalUncompressedBytes.get(); 2185 } 2186 2187 @Override 2188 public long getStorefilesSize() { 2189 // Include all StoreFiles 2190 return getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(), sf -> true); 2191 } 2192 2193 @Override 2194 public long getHFilesSize() { 2195 // Include only StoreFiles which are HFiles 2196 return getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(), 2197 HStoreFile::isHFile); 2198 } 2199 2200 private long getTotalUncompressedBytes(List<HStoreFile> files) { 2201 return files.stream() 2202 .mapToLong(file -> getStorefileFieldSize(file, StoreFileReader::getTotalUncompressedBytes)) 2203 .sum(); 2204 } 2205 2206 private long getStorefilesSize(Collection<HStoreFile> files, Predicate<HStoreFile> predicate) { 2207 return files.stream().filter(predicate) 2208 .mapToLong(file -> getStorefileFieldSize(file, StoreFileReader::length)).sum(); 2209 } 2210 2211 private long getStorefileFieldSize(HStoreFile file, ToLongFunction<StoreFileReader> f) { 2212 if (file == null) { 2213 return 0L; 2214 } 2215 StoreFileReader reader = file.getReader(); 2216 if (reader == null) { 2217 return 0L; 2218 } 2219 return f.applyAsLong(reader); 2220 } 2221 2222 private long getStorefilesFieldSize(ToLongFunction<StoreFileReader> f) { 2223 return this.storeEngine.getStoreFileManager().getStorefiles().stream() 2224 .mapToLong(file -> getStorefileFieldSize(file, f)).sum(); 2225 } 2226 2227 @Override 2228 public long getStorefilesRootLevelIndexSize() { 2229 return getStorefilesFieldSize(StoreFileReader::indexSize); 2230 } 2231 2232 @Override 2233 public long getTotalStaticIndexSize() { 2234 return getStorefilesFieldSize(StoreFileReader::getUncompressedDataIndexSize); 2235 } 2236 2237 @Override 2238 public long getTotalStaticBloomSize() { 2239 return getStorefilesFieldSize(StoreFileReader::getTotalBloomSize); 2240 } 2241 2242 @Override 2243 public MemStoreSize getMemStoreSize() { 2244 return this.memstore.size(); 2245 } 2246 2247 @Override 2248 public int getCompactPriority() { 2249 int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority(); 2250 if (priority == PRIORITY_USER) { 2251 LOG.warn("Compaction priority is USER despite there being no user compaction"); 2252 } 2253 return priority; 2254 } 2255 2256 public boolean throttleCompaction(long compactionSize) { 2257 return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize); 2258 } 2259 2260 public HRegion getHRegion() { 2261 return this.region; 2262 } 2263 2264 public RegionCoprocessorHost getCoprocessorHost() { 2265 return this.region.getCoprocessorHost(); 2266 } 2267 2268 @Override 2269 public RegionInfo getRegionInfo() { 2270 return this.fs.getRegionInfo(); 2271 } 2272 2273 @Override 2274 public boolean areWritesEnabled() { 2275 return this.region.areWritesEnabled(); 2276 } 2277 2278 @Override 2279 public long getSmallestReadPoint() { 2280 return this.region.getSmallestReadPoint(); 2281 } 2282 2283 /** 2284 * Adds or replaces the specified KeyValues. 
2285 * <p> 2286 * For each KeyValue specified, if a cell with the same row, family, and qualifier exists in 2287 * MemStore, it will be replaced. Otherwise, it will just be inserted to MemStore. 2288 * <p> 2289 * This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic 2290 * across all of them. 2291 * @param readpoint readpoint below which we can safely remove duplicate KVs 2292 * @throws IOException 2293 */ 2294 public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing) 2295 throws IOException { 2296 this.lock.readLock().lock(); 2297 try { 2298 this.memstore.upsert(cells, readpoint, memstoreSizing); 2299 } finally { 2300 this.lock.readLock().unlock(); 2301 } 2302 } 2303 2304 public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTracker tracker) { 2305 return new StoreFlusherImpl(cacheFlushId, tracker); 2306 } 2307 2308 private final class StoreFlusherImpl implements StoreFlushContext { 2309 2310 private final FlushLifeCycleTracker tracker; 2311 private final long cacheFlushSeqNum; 2312 private MemStoreSnapshot snapshot; 2313 private List<Path> tempFiles; 2314 private List<Path> committedFiles; 2315 private long cacheFlushCount; 2316 private long cacheFlushSize; 2317 private long outputFileSize; 2318 2319 private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) { 2320 this.cacheFlushSeqNum = cacheFlushSeqNum; 2321 this.tracker = tracker; 2322 } 2323 2324 /** 2325 * This is not thread safe. The caller should have a lock on the region or the store. 2326 * If necessary, the lock can be added with the patch provided in HBASE-10087 2327 */ 2328 @Override 2329 public MemStoreSize prepare() { 2330 // passing the current sequence number of the wal - to allow bookkeeping in the memstore 2331 this.snapshot = memstore.snapshot(); 2332 this.cacheFlushCount = snapshot.getCellsCount(); 2333 this.cacheFlushSize = snapshot.getDataSize(); 2334 committedFiles = new ArrayList<>(1); 2335 return snapshot.getMemStoreSize(); 2336 } 2337 2338 @Override 2339 public void flushCache(MonitoredTask status) throws IOException { 2340 RegionServerServices rsService = region.getRegionServerServices(); 2341 ThroughputController throughputController = 2342 rsService == null ? null : rsService.getFlushThroughputController(); 2343 tempFiles = 2344 HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController, tracker); 2345 } 2346 2347 @Override 2348 public boolean commit(MonitoredTask status) throws IOException { 2349 if (CollectionUtils.isEmpty(this.tempFiles)) { 2350 return false; 2351 } 2352 List<HStoreFile> storeFiles = new ArrayList<>(this.tempFiles.size()); 2353 for (Path storeFilePath : tempFiles) { 2354 try { 2355 HStoreFile sf = HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status); 2356 outputFileSize += sf.getReader().length(); 2357 storeFiles.add(sf); 2358 } catch (IOException ex) { 2359 LOG.error("Failed to commit store file {}", storeFilePath, ex); 2360 // Try to delete the files we have committed before. 
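// If deleting a file we already committed fails as well, we cannot safely leave the store in a
// half-committed state, so the process is halted (see the FATAL log below).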
2361 for (HStoreFile sf : storeFiles) { 2362 Path pathToDelete = sf.getPath(); 2363 try { 2364 sf.deleteStoreFile(); 2365 } catch (IOException deleteEx) { 2366 LOG.error(HBaseMarkers.FATAL, "Failed to delete store file we committed, " 2367 + "halting {}", pathToDelete, ex); 2368 Runtime.getRuntime().halt(1); 2369 } 2370 } 2371 throw new IOException("Failed to commit the flush", ex); 2372 } 2373 } 2374 2375 for (HStoreFile sf : storeFiles) { 2376 if (HStore.this.getCoprocessorHost() != null) { 2377 HStore.this.getCoprocessorHost().postFlush(HStore.this, sf, tracker); 2378 } 2379 committedFiles.add(sf.getPath()); 2380 } 2381 2382 HStore.this.flushedCellsCount.addAndGet(cacheFlushCount); 2383 HStore.this.flushedCellsSize.addAndGet(cacheFlushSize); 2384 HStore.this.flushedOutputFileSize.addAndGet(outputFileSize); 2385 2386 // Add new file to store files. Clear snapshot too while we have the Store write lock. 2387 return HStore.this.updateStorefiles(storeFiles, snapshot.getId()); 2388 } 2389 2390 @Override 2391 public long getOutputFileSize() { 2392 return outputFileSize; 2393 } 2394 2395 @Override 2396 public List<Path> getCommittedFiles() { 2397 return committedFiles; 2398 } 2399 2400 /** 2401 * Similar to commit, but called in secondary region replicas for replaying the 2402 * flush cache from primary region. Adds the new files to the store, and drops the 2403 * snapshot depending on dropMemstoreSnapshot argument. 2404 * @param fileNames names of the flushed files 2405 * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot 2406 * @throws IOException 2407 */ 2408 @Override 2409 public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot) 2410 throws IOException { 2411 List<HStoreFile> storeFiles = new ArrayList<>(fileNames.size()); 2412 for (String file : fileNames) { 2413 // open the file as a store file (hfile link, etc) 2414 StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), file); 2415 HStoreFile storeFile = createStoreFileAndReader(storeFileInfo); 2416 storeFiles.add(storeFile); 2417 HStore.this.storeSize.addAndGet(storeFile.getReader().length()); 2418 HStore.this.totalUncompressedBytes 2419 .addAndGet(storeFile.getReader().getTotalUncompressedBytes()); 2420 if (LOG.isInfoEnabled()) { 2421 LOG.info("Region: " + HStore.this.getRegionInfo().getEncodedName() + 2422 " added " + storeFile + ", entries=" + storeFile.getReader().getEntries() + 2423 ", sequenceid=" + storeFile.getReader().getSequenceID() + ", filesize=" 2424 + TraditionalBinaryPrefix.long2String(storeFile.getReader().length(), "", 1)); 2425 } 2426 } 2427 2428 long snapshotId = -1; // -1 means do not drop 2429 if (dropMemstoreSnapshot && snapshot != null) { 2430 snapshotId = snapshot.getId(); 2431 snapshot.close(); 2432 } 2433 HStore.this.updateStorefiles(storeFiles, snapshotId); 2434 } 2435 2436 /** 2437 * Abort the snapshot preparation. Drops the snapshot if any. 2438 * @throws IOException 2439 */ 2440 @Override 2441 public void abort() throws IOException { 2442 if (snapshot != null) { 2443 //We need to close the snapshot when aborting, otherwise, the segment scanner 2444 //won't be closed. 
If we are using MSLAB, the chunk referenced by those scanners 2445 //can't be released, thus memory leak 2446 snapshot.close(); 2447 HStore.this.updateStorefiles(Collections.emptyList(), snapshot.getId()); 2448 } 2449 } 2450 } 2451 2452 @Override 2453 public boolean needsCompaction() { 2454 List<HStoreFile> filesCompactingClone = null; 2455 synchronized (filesCompacting) { 2456 filesCompactingClone = Lists.newArrayList(filesCompacting); 2457 } 2458 return this.storeEngine.needsCompaction(filesCompactingClone); 2459 } 2460 2461 /** 2462 * Used for tests. 2463 * @return cache configuration for this Store. 2464 */ 2465 @VisibleForTesting 2466 public CacheConfig getCacheConfig() { 2467 return this.cacheConf; 2468 } 2469 2470 public static final long FIXED_OVERHEAD = 2471 ClassSize.align(ClassSize.OBJECT + (27 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) 2472 + (6 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN)); 2473 2474 public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD 2475 + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK 2476 + ClassSize.CONCURRENT_SKIPLISTMAP 2477 + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT 2478 + ScanInfo.FIXED_OVERHEAD); 2479 2480 @Override 2481 public long heapSize() { 2482 MemStoreSize memstoreSize = this.memstore.size(); 2483 return DEEP_OVERHEAD + memstoreSize.getHeapSize(); 2484 } 2485 2486 @Override 2487 public CellComparator getComparator() { 2488 return comparator; 2489 } 2490 2491 public ScanInfo getScanInfo() { 2492 return scanInfo; 2493 } 2494 2495 /** 2496 * Set scan info, used by test 2497 * @param scanInfo new scan info to use for test 2498 */ 2499 void setScanInfo(ScanInfo scanInfo) { 2500 this.scanInfo = scanInfo; 2501 } 2502 2503 @Override 2504 public boolean hasTooManyStoreFiles() { 2505 return getStorefilesCount() > this.blockingFileCount; 2506 } 2507 2508 @Override 2509 public long getFlushedCellsCount() { 2510 return flushedCellsCount.get(); 2511 } 2512 2513 @Override 2514 public long getFlushedCellsSize() { 2515 return flushedCellsSize.get(); 2516 } 2517 2518 @Override 2519 public long getFlushedOutputFileSize() { 2520 return flushedOutputFileSize.get(); 2521 } 2522 2523 @Override 2524 public long getCompactedCellsCount() { 2525 return compactedCellsCount.get(); 2526 } 2527 2528 @Override 2529 public long getCompactedCellsSize() { 2530 return compactedCellsSize.get(); 2531 } 2532 2533 @Override 2534 public long getMajorCompactedCellsCount() { 2535 return majorCompactedCellsCount.get(); 2536 } 2537 2538 @Override 2539 public long getMajorCompactedCellsSize() { 2540 return majorCompactedCellsSize.get(); 2541 } 2542 2543 /** 2544 * Returns the StoreEngine that is backing this concrete implementation of Store. 2545 * @return Returns the {@link StoreEngine} object used internally inside this HStore object. 
2546 */ 2547 @VisibleForTesting 2548 public StoreEngine<?, ?, ?, ?> getStoreEngine() { 2549 return this.storeEngine; 2550 } 2551 2552 protected OffPeakHours getOffPeakHours() { 2553 return this.offPeakHours; 2554 } 2555 2556 /** 2557 * {@inheritDoc} 2558 */ 2559 @Override 2560 public void onConfigurationChange(Configuration conf) { 2561 this.conf = new CompoundConfiguration() 2562 .add(conf) 2563 .addBytesMap(family.getValues()); 2564 this.storeEngine.compactionPolicy.setConf(conf); 2565 this.offPeakHours = OffPeakHours.getInstance(conf); 2566 } 2567 2568 /** 2569 * {@inheritDoc} 2570 */ 2571 @Override 2572 public void registerChildren(ConfigurationManager manager) { 2573 // No children to register 2574 } 2575 2576 /** 2577 * {@inheritDoc} 2578 */ 2579 @Override 2580 public void deregisterChildren(ConfigurationManager manager) { 2581 // No children to deregister 2582 } 2583 2584 @Override 2585 public double getCompactionPressure() { 2586 return storeEngine.getStoreFileManager().getCompactionPressure(); 2587 } 2588 2589 @Override 2590 public boolean isPrimaryReplicaStore() { 2591 return getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID; 2592 } 2593 2594 /** 2595 * Sets the store up for a region level snapshot operation. 2596 * @see #postSnapshotOperation() 2597 */ 2598 public void preSnapshotOperation() { 2599 archiveLock.lock(); 2600 } 2601 2602 /** 2603 * Perform tasks needed after the completion of snapshot operation. 2604 * @see #preSnapshotOperation() 2605 */ 2606 public void postSnapshotOperation() { 2607 archiveLock.unlock(); 2608 } 2609 2610 /** 2611 * Closes and archives the compacted files under this store 2612 */ 2613 public synchronized void closeAndArchiveCompactedFiles() throws IOException { 2614 // ensure other threads do not attempt to archive the same files on close() 2615 archiveLock.lock(); 2616 try { 2617 lock.readLock().lock(); 2618 Collection<HStoreFile> copyCompactedfiles = null; 2619 try { 2620 Collection<HStoreFile> compactedfiles = 2621 this.getStoreEngine().getStoreFileManager().getCompactedfiles(); 2622 if (CollectionUtils.isNotEmpty(compactedfiles)) { 2623 // Do a copy under read lock 2624 copyCompactedfiles = new ArrayList<>(compactedfiles); 2625 } else { 2626 LOG.trace("No compacted files to archive"); 2627 } 2628 } finally { 2629 lock.readLock().unlock(); 2630 } 2631 if (CollectionUtils.isNotEmpty(copyCompactedfiles)) { 2632 removeCompactedfiles(copyCompactedfiles); 2633 } 2634 } finally { 2635 archiveLock.unlock(); 2636 } 2637 } 2638 2639 /** 2640 * Archives and removes the compacted files 2641 * @param compactedfiles The compacted files in this store that are not active in reads 2642 */ 2643 private void removeCompactedfiles(Collection<HStoreFile> compactedfiles) 2644 throws IOException { 2645 final List<HStoreFile> filesToRemove = new ArrayList<>(compactedfiles.size()); 2646 final List<Long> storeFileSizes = new ArrayList<>(compactedfiles.size()); 2647 for (final HStoreFile file : compactedfiles) { 2648 synchronized (file) { 2649 try { 2650 StoreFileReader r = file.getReader(); 2651 if (r == null) { 2652 LOG.debug("The file {} was closed but still not archived", file); 2653 // HACK: Temporarily re-open the reader so we can get the size of the file. Ideally, 2654 // we should know the size of an HStoreFile without having to ask the HStoreFileReader 2655 // for that. 
2656 long length = getStoreFileSize(file); 2657 filesToRemove.add(file); 2658 storeFileSizes.add(length); 2659 continue; 2660 } 2661 2662 if (file.isCompactedAway() && !file.isReferencedInReads()) { 2663 // Even if deleting fails we need not bother as any new scanners won't be 2664 // able to use the compacted file as the status is already compactedAway 2665 LOG.trace("Closing and archiving the file {}", file); 2666 // Copy the file size before closing the reader 2667 final long length = r.length(); 2668 r.close(true); 2669 // Just close and return 2670 filesToRemove.add(file); 2671 // Only add the length if we successfully added the file to `filesToRemove` 2672 storeFileSizes.add(length); 2673 } else { 2674 LOG.info("Can't archive compacted file " + file.getPath() 2675 + " because of either isCompactedAway=" + file.isCompactedAway() 2676 + " or file has reference, isReferencedInReads=" + file.isReferencedInReads() 2677 + ", refCount=" + r.getRefCount() + ", skipping for now."); 2678 } 2679 } catch (Exception e) { 2680 LOG.error("Exception while trying to close the compacted store file {}", file.getPath(), 2681 e); 2682 } 2683 } 2684 } 2685 if (this.isPrimaryReplicaStore()) { 2686 // Only the primary region is allowed to move the file to archive. 2687 // The secondary region does not move the files to archive. Any active reads from 2688 // the secondary region will still work because the file as such has active readers on it. 2689 if (!filesToRemove.isEmpty()) { 2690 LOG.debug("Moving the files {} to archive", filesToRemove); 2691 // Only if this is successful it has to be removed 2692 try { 2693 this.fs.removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(), filesToRemove); 2694 } catch (FailedArchiveException fae) { 2695 // Even if archiving some files failed, we still need to clear out any of the 2696 // files which were successfully archived. Otherwise we will receive a 2697 // FileNotFoundException when we attempt to re-archive them in the next go around. 2698 Collection<Path> failedFiles = fae.getFailedFiles(); 2699 Iterator<HStoreFile> iter = filesToRemove.iterator(); 2700 Iterator<Long> sizeIter = storeFileSizes.iterator(); 2701 while (iter.hasNext()) { 2702 sizeIter.next(); 2703 if (failedFiles.contains(iter.next().getPath())) { 2704 iter.remove(); 2705 sizeIter.remove(); 2706 } 2707 } 2708 if (!filesToRemove.isEmpty()) { 2709 clearCompactedfiles(filesToRemove); 2710 } 2711 throw fae; 2712 } 2713 } 2714 } 2715 if (!filesToRemove.isEmpty()) { 2716 // Clear the compactedfiles from the store file manager 2717 clearCompactedfiles(filesToRemove); 2718 // Try to send report of this archival to the Master for updating quota usage faster 2719 reportArchivedFilesForQuota(filesToRemove, storeFileSizes); 2720 } 2721 } 2722 2723 /** 2724 * Computes the length of a store file without succumbing to any errors along the way. If an 2725 * error is encountered, the implementation returns {@code 0} instead of the actual size. 2726 * 2727 * @param file The file to compute the size of. 2728 * @return The size in bytes of the provided {@code file}. 2729 */ 2730 long getStoreFileSize(HStoreFile file) { 2731 long length = 0; 2732 try { 2733 file.initReader(); 2734 length = file.getReader().length(); 2735 } catch (IOException e) { 2736 LOG.trace("Failed to open reader when trying to compute store file size, ignoring", e); 2737 } finally { 2738 try { 2739 file.closeStoreFile( 2740 file.getCacheConf() != null ? 
file.getCacheConf().shouldEvictOnClose() : true); 2741 } catch (IOException e) { 2742 LOG.trace("Failed to close reader after computing store file size, ignoring", e); 2743 } 2744 } 2745 return length; 2746 } 2747 2748 public Long preFlushSeqIDEstimation() { 2749 return memstore.preFlushSeqIDEstimation(); 2750 } 2751 2752 @Override 2753 public boolean isSloppyMemStore() { 2754 return this.memstore.isSloppy(); 2755 } 2756 2757 private void clearCompactedfiles(List<HStoreFile> filesToRemove) throws IOException { 2758 LOG.trace("Clearing the compacted file {} from this store", filesToRemove); 2759 try { 2760 lock.writeLock().lock(); 2761 this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToRemove); 2762 } finally { 2763 lock.writeLock().unlock(); 2764 } 2765 } 2766 2767 @Override 2768 public int getCurrentParallelPutCount() { 2769 return currentParallelPutCount.get(); 2770 } 2771 2772 void reportArchivedFilesForQuota(List<? extends StoreFile> archivedFiles, List<Long> fileSizes) { 2773 // Sanity check from the caller 2774 if (archivedFiles.size() != fileSizes.size()) { 2775 throw new RuntimeException("Coding error: should never see lists of varying size"); 2776 } 2777 RegionServerServices rss = this.region.getRegionServerServices(); 2778 if (rss == null) { 2779 return; 2780 } 2781 List<Entry<String,Long>> filesWithSizes = new ArrayList<>(archivedFiles.size()); 2782 Iterator<Long> fileSizeIter = fileSizes.iterator(); 2783 for (StoreFile storeFile : archivedFiles) { 2784 final long fileSize = fileSizeIter.next(); 2785 if (storeFile.isHFile() && fileSize != 0) { 2786 filesWithSizes.add(Maps.immutableEntry(storeFile.getPath().getName(), fileSize)); 2787 } 2788 } 2789 if (LOG.isTraceEnabled()) { 2790 LOG.trace("Files archived: " + archivedFiles + ", reporting the following to the Master: " 2791 + filesWithSizes); 2792 } 2793 boolean success = rss.reportFileArchivalForQuotas(getTableName(), filesWithSizes); 2794 if (!success) { 2795 LOG.warn("Failed to report archival of files: " + filesWithSizes); 2796 } 2797 } 2798}
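/*
 * Illustrative sketch only (not part of this class): how a caller is expected to drive the
 * compaction entry points above. The names 'store', 'shouldRun', 'throughputController' and
 * 'user' are placeholders for this example, not fields of HStore.
 *
 *   Optional<CompactionContext> compaction = store.requestCompaction();
 *   if (compaction.isPresent()) {
 *     if (shouldRun) {
 *       // compact() cleans up its own request via finishCompactionRequest() in a finally block
 *       store.compact(compaction.get(), throughputController, user);
 *     } else {
 *       // releases the files that requestCompaction() added to filesCompacting
 *       store.cancelRequestedCompaction(compaction.get());
 *     }
 *   }
 */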