/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.FailedArchiveException;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;

/**
 * A Store holds a column family in a Region. It's a memstore and a set of zero or more
 * StoreFiles, which stretch backwards over time.
 *
 * <p>There's no reason to consider append-logging at this level; all logging and locking is
 * handled at the HRegion level. Store just provides services to manage sets of StoreFiles. One of
 * the most important of those services is compaction services where files are aggregated once
 * they pass a configurable threshold.
 *
 * <p>Locking and transactions are handled at a higher level. This API should not be called
 * directly but by an HRegion manager.
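 *
 * <p>A rough usage sketch (illustrative only; stores are normally reached through the owning
 * HRegion rather than constructed or driven directly):
 * <pre>
 *   HStore store = region.getStore(familyNameBytes);
 *   OptionalLong maxSeqId = store.getMaxSequenceId();
 * </pre>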
 */
@InterfaceAudience.Private
public class HStore implements Store, HeapSize, StoreConfigInformation,
    PropagatingConfigurationObserver {
  public static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
  public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
      "hbase.server.compactchecker.interval.multiplier";
  public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
  public static final String BLOCK_STORAGE_POLICY_KEY = "hbase.hstore.block.storage.policy";
  // keep in accordance with HDFS default storage policy
  public static final String DEFAULT_BLOCK_STORAGE_POLICY = "HOT";
  public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
  public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 16;

  private static final Logger LOG = LoggerFactory.getLogger(HStore.class);

  protected final MemStore memstore;
  // This store's directory in the filesystem.
  protected final HRegion region;
  private final ColumnFamilyDescriptor family;
  private final HRegionFileSystem fs;
  protected Configuration conf;
  protected CacheConfig cacheConf;
  private long lastCompactSize = 0;
  volatile boolean forceMajor = false;
  /* how many bytes to write between status checks */
  static int closeCheckInterval = 0;
  private AtomicLong storeSize = new AtomicLong();
  private AtomicLong totalUncompressedBytes = new AtomicLong();

  /**
   * RWLock for store operations.
   * Locked in shared mode when the list of component stores is looked at:
   *   - all reads/writes to table data
   *   - checking for split
   * Locked in exclusive mode when the list of component stores is modified:
   *   - closing
   *   - completing a compaction
   */
  final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  /**
   * Lock specific to archiving compacted store files. This avoids races around
   * the combination of retrieving the list of compacted files and moving them to
   * the archive directory. Since this is usually a background process (other than
   * on close), we don't want to handle this with the store write lock, which would
   * block readers and degrade performance.
   *
   * Locked by:
   *   - CompactedHFilesDispatchHandler via closeAndArchiveCompactedFiles()
   *   - close()
   */
  final ReentrantLock archiveLock = new ReentrantLock();

  private final boolean verifyBulkLoads;

  /**
   * Use this counter to track concurrent puts. If TRACE-log is enabled, and we are over the
   * threshold set by hbase.region.store.parallel.put.print.threshold (default is 50), we will
   * log a message that identifies the Store experiencing this high level of concurrency.
   */
  private final AtomicInteger currentParallelPutCount = new AtomicInteger(0);
  private final int parallelPutCountPrintThreshold;

  private ScanInfo scanInfo;

  // All access must be synchronized.
  // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it.
  private final List<HStoreFile> filesCompacting = Lists.newArrayList();

  // All access must be synchronized.
  private final Set<ChangedReadersObserver> changedReaderObservers =
      Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());

  protected final int blocksize;
  private HFileDataBlockEncoder dataBlockEncoder;

  /** Checksum configuration */
  protected ChecksumType checksumType;
  protected int bytesPerChecksum;

  // Comparing KeyValues
  protected final CellComparator comparator;

  final StoreEngine<?, ?, ?, ?> storeEngine;

  private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
  private volatile OffPeakHours offPeakHours;

  private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
  private int flushRetriesNumber;
  private int pauseTime;

  private long blockingFileCount;
  private int compactionCheckMultiplier;
  protected Encryption.Context cryptoContext = Encryption.Context.NONE;

  private AtomicLong flushedCellsCount = new AtomicLong();
  private AtomicLong compactedCellsCount = new AtomicLong();
  private AtomicLong majorCompactedCellsCount = new AtomicLong();
  private AtomicLong flushedCellsSize = new AtomicLong();
  private AtomicLong flushedOutputFileSize = new AtomicLong();
  private AtomicLong compactedCellsSize = new AtomicLong();
  private AtomicLong majorCompactedCellsSize = new AtomicLong();

  /**
   * Constructor
   * @param region the HRegion this store belongs to
   * @param family HColumnDescriptor for this column family
   * @param confParam configuration object
   * @throws IOException
   */
  protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
      final Configuration confParam) throws IOException {

    this.fs = region.getRegionFileSystem();

    // Assemble the store's home directory and ensure it exists.
    fs.createStoreDir(family.getNameAsString());
    this.region = region;
    this.family = family;
    // 'conf' renamed to 'confParam' because we use this.conf in the constructor.
    // CompoundConfiguration will look for keys in reverse order of addition, so we'd
    // add global config first, then table and cf overrides, then cf metadata.
    this.conf = new CompoundConfiguration()
      .add(confParam)
      .addBytesMap(region.getTableDescriptor().getValues())
      .addStringMap(family.getConfiguration())
      .addBytesMap(family.getValues());
    this.blocksize = family.getBlocksize();

    // set block storage policy for store directory
    String policyName = family.getStoragePolicy();
    if (null == policyName) {
      policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY, DEFAULT_BLOCK_STORAGE_POLICY);
    }
    this.fs.setStoragePolicy(family.getNameAsString(), policyName.trim());

    this.dataBlockEncoder = new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());

    this.comparator = region.getCellComparator();
    // used by ScanQueryMatcher
    long timeToPurgeDeletes =
        Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
    LOG.trace("Time to purge deletes set to {}ms in store {}", timeToPurgeDeletes, this);
    // Get TTL
    long ttl = determineTTLFromFamily(family);
    // Why not just pass a HColumnDescriptor in here altogether? Even if we have to clone it?
    scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.comparator);
    this.memstore = getMemstore();

    this.offPeakHours = OffPeakHours.getInstance(conf);

    // Setting up cache configuration for this family
    createCacheConf(family);

    this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);

    this.blockingFileCount =
        conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
    this.compactionCheckMultiplier = conf.getInt(
        COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
    if (this.compactionCheckMultiplier <= 0) {
      LOG.error("Compaction check period multiplier must be positive, setting default: {}",
          DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
      this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
    }

    if (HStore.closeCheckInterval == 0) {
      HStore.closeCheckInterval = conf.getInt(
          "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
    }

    this.storeEngine = createStoreEngine(this, this.conf, this.comparator);
    List<HStoreFile> hStoreFiles = loadStoreFiles();
    // The storeSize calculation was moved out of the loadStoreFiles() method because the
    // secondary read replica's refreshStoreFiles() also uses loadStoreFiles() to refresh its
    // store files and then updates storeSize in completeCompaction(..) at the end (just like
    // compaction does), so there is no need to calculate the storeSize twice.
    this.storeSize.addAndGet(getStorefilesSize(hStoreFiles, sf -> true));
    this.totalUncompressedBytes.addAndGet(getTotalUncompressedBytes(hStoreFiles));
    this.storeEngine.getStoreFileManager().loadFiles(hStoreFiles);

    // Initialize checksum type from name. The names are CRC32, CRC32C, etc.
    this.checksumType = getChecksumType(conf);
    // Initialize bytes per checksum
    this.bytesPerChecksum = getBytesPerChecksum(conf);
    flushRetriesNumber = conf.getInt(
        "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
    pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
    if (flushRetriesNumber <= 0) {
      throw new IllegalArgumentException(
          "hbase.hstore.flush.retries.number must be > 0, not " + flushRetriesNumber);
    }
    cryptoContext = EncryptionUtil.createEncryptionContext(conf, family);

    int confPrintThreshold =
        this.conf.getInt("hbase.region.store.parallel.put.print.threshold", 50);
    if (confPrintThreshold < 10) {
      confPrintThreshold = 10;
    }
    this.parallelPutCountPrintThreshold = confPrintThreshold;
    LOG.info("Store={}, memstore type={}, storagePolicy={}, verifyBulkLoads={}, "
            + "parallelPutCountPrintThreshold={}, encoding={}, compression={}",
        getColumnFamilyName(), memstore.getClass().getSimpleName(), policyName, verifyBulkLoads,
        parallelPutCountPrintThreshold, family.getDataBlockEncoding(),
        family.getCompressionType());
  }

  /**
   * @return MemStore instance to use in this store.
   */
  private MemStore getMemstore() {
    MemStore ms = null;
    // Check if in-memory-compaction is configured. Note MemoryCompactionPolicy is an enum!
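    // Resolution order, for reference: system tables consult
    // hbase.systemtables.compacting.memstore.type; user tables take the column family's
    // in-memory-compaction setting; if that is unset, fall back to the cluster-wide
    // CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY default.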
    MemoryCompactionPolicy inMemoryCompaction = null;
    if (this.getTableName().isSystemTable()) {
      inMemoryCompaction = MemoryCompactionPolicy
          .valueOf(conf.get("hbase.systemtables.compacting.memstore.type", "NONE").toUpperCase());
    } else {
      inMemoryCompaction = family.getInMemoryCompaction();
    }
    if (inMemoryCompaction == null) {
      inMemoryCompaction =
          MemoryCompactionPolicy.valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
              CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT));
    }
    switch (inMemoryCompaction) {
      case NONE:
        ms = ReflectionUtils.newInstance(DefaultMemStore.class,
            new Object[] { conf, this.comparator,
                this.getHRegion().getRegionServicesForStores()});
        break;
      default:
        Class<? extends CompactingMemStore> clz = conf.getClass(MEMSTORE_CLASS_NAME,
            CompactingMemStore.class, CompactingMemStore.class);
        ms = ReflectionUtils.newInstance(clz, new Object[]{conf, this.comparator, this,
            this.getHRegion().getRegionServicesForStores(), inMemoryCompaction});
    }
    return ms;
  }

  /**
   * Creates the cache config.
   * @param family The current column family.
   */
  protected void createCacheConf(final ColumnFamilyDescriptor family) {
    this.cacheConf = new CacheConfig(conf, family);
  }

  /**
   * Creates the store engine configured for the given Store.
   * @param store The store. An unfortunate dependency needed due to it
   *              being passed to coprocessors via the compactor.
   * @param conf Store configuration.
   * @param kvComparator KVComparator for storeFileManager.
   * @return StoreEngine to use.
   */
  protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
      CellComparator kvComparator) throws IOException {
    return StoreEngine.create(store, conf, comparator);
  }

  /**
   * @param family the column family descriptor
   * @return TTL in milliseconds of the specified family
   */
  public static long determineTTLFromFamily(final ColumnFamilyDescriptor family) {
    // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.
    long ttl = family.getTimeToLive();
    if (ttl == HConstants.FOREVER) {
      // Default is unlimited ttl.
      ttl = Long.MAX_VALUE;
    } else if (ttl == -1) {
      ttl = Long.MAX_VALUE;
    } else {
      // Second -> ms adjust for user data
      ttl *= 1000;
    }
    return ttl;
  }

  @Override
  public String getColumnFamilyName() {
    return this.family.getNameAsString();
  }

  @Override
  public TableName getTableName() {
    return this.getRegionInfo().getTable();
  }

  @Override
  public FileSystem getFileSystem() {
    return this.fs.getFileSystem();
  }

  public HRegionFileSystem getRegionFileSystem() {
    return this.fs;
  }

  /* Implementation of StoreConfigInformation */
  @Override
  public long getStoreFileTtl() {
    // TTL only applies if there's no MIN_VERSIONS setting on the column.
    return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
  }

  @Override
  public long getMemStoreFlushSize() {
    // TODO: Why is this in here? The flushsize of the region rather than the store? St.Ack
    return this.region.memstoreFlushSize;
  }

  @Override
  public MemStoreSize getFlushableSize() {
    return this.memstore.getFlushableSize();
  }

  @Override
  public MemStoreSize getSnapshotSize() {
    return this.memstore.getSnapshotSize();
  }

  @Override
  public long getCompactionCheckMultiplier() {
    return this.compactionCheckMultiplier;
  }

  @Override
  public long getBlockingFileCount() {
    return blockingFileCount;
  }
  /* End implementation of StoreConfigInformation */

  /**
   * Returns the configured bytesPerChecksum value.
   * @param conf The configuration
   * @return The bytesPerChecksum that is set in the configuration
   */
  public static int getBytesPerChecksum(Configuration conf) {
    return conf.getInt(HConstants.BYTES_PER_CHECKSUM,
        HFile.DEFAULT_BYTES_PER_CHECKSUM);
  }

  /**
   * Returns the configured checksum algorithm.
   * @param conf The configuration
   * @return The checksum algorithm that is set in the configuration
   */
  public static ChecksumType getChecksumType(Configuration conf) {
    String checksumName = conf.get(HConstants.CHECKSUM_TYPE_NAME);
    if (checksumName == null) {
      return ChecksumType.getDefaultChecksumType();
    } else {
      return ChecksumType.nameToType(checksumName);
    }
  }

  /**
   * @return how many bytes to write between status checks
   */
  public static int getCloseCheckInterval() {
    return closeCheckInterval;
  }

  @Override
  public ColumnFamilyDescriptor getColumnFamilyDescriptor() {
    return this.family;
  }

  @Override
  public OptionalLong getMaxSequenceId() {
    return StoreUtils.getMaxSequenceIdInList(this.getStorefiles());
  }

  @Override
  public OptionalLong getMaxMemStoreTS() {
    return StoreUtils.getMaxMemStoreTSInList(this.getStorefiles());
  }

  /**
   * @param tabledir {@link Path} to where the table is being stored
   * @param hri {@link RegionInfo} for the region.
   * @param family {@link ColumnFamilyDescriptor} describing the column family
   * @return Path to family/Store home directory.
   */
  @Deprecated
  public static Path getStoreHomedir(final Path tabledir,
      final RegionInfo hri, final byte[] family) {
    return getStoreHomedir(tabledir, hri.getEncodedName(), family);
  }

  /**
   * @param tabledir {@link Path} to where the table is being stored
   * @param encodedName Encoded region name.
   * @param family {@link ColumnFamilyDescriptor} describing the column family
   * @return Path to family/Store home directory.
   */
  @Deprecated
  public static Path getStoreHomedir(final Path tabledir,
      final String encodedName, final byte[] family) {
    return new Path(tabledir, new Path(encodedName, Bytes.toString(family)));
  }

  /**
   * @return the data block encoder
   */
  public HFileDataBlockEncoder getDataBlockEncoder() {
    return dataBlockEncoder;
  }

  /**
   * Should be used only in tests.
   * @param blockEncoder the block delta encoder to use
   */
  void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
    this.dataBlockEncoder = blockEncoder;
  }

  /**
   * Creates an unsorted list of StoreFiles loaded in parallel
   * from the given directory.
   * @throws IOException
   */
  private List<HStoreFile> loadStoreFiles() throws IOException {
    Collection<StoreFileInfo> files = fs.getStoreFiles(getColumnFamilyName());
    return openStoreFiles(files);
  }

  private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files) throws IOException {
    if (CollectionUtils.isEmpty(files)) {
      return Collections.emptyList();
    }
    // initialize the thread pool for opening store files in parallel.
    ThreadPoolExecutor storeFileOpenerThreadPool =
        this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
            this.getColumnFamilyName());
    CompletionService<HStoreFile> completionService =
        new ExecutorCompletionService<>(storeFileOpenerThreadPool);

    int totalValidStoreFile = 0;
    for (StoreFileInfo storeFileInfo : files) {
      // open each store file in parallel
      completionService.submit(() -> this.createStoreFileAndReader(storeFileInfo));
      totalValidStoreFile++;
    }

    ArrayList<HStoreFile> results = new ArrayList<>(files.size());
    IOException ioe = null;
    try {
      for (int i = 0; i < totalValidStoreFile; i++) {
        try {
          HStoreFile storeFile = completionService.take().get();
          if (storeFile != null) {
            LOG.debug("loaded {}", storeFile);
            results.add(storeFile);
          }
        } catch (InterruptedException e) {
          if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
        } catch (ExecutionException e) {
          if (ioe == null) ioe = new IOException(e.getCause());
        }
      }
    } finally {
      storeFileOpenerThreadPool.shutdownNow();
    }
    if (ioe != null) {
      // close StoreFile readers
      boolean evictOnClose =
          cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
      for (HStoreFile file : results) {
        try {
          if (file != null) {
            file.closeStoreFile(evictOnClose);
          }
        } catch (IOException e) {
          LOG.warn("Could not close store file", e);
        }
      }
      throw ioe;
    }

    return results;
  }

  @Override
  public void refreshStoreFiles() throws IOException {
    Collection<StoreFileInfo> newFiles = fs.getStoreFiles(getColumnFamilyName());
    refreshStoreFilesInternal(newFiles);
  }

  /**
   * Replaces the store files that the store has with the given files. Mainly used by secondary
   * region replicas to keep up to date with the primary region files.
   * @throws IOException
   */
  public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
    List<StoreFileInfo> storeFiles = new ArrayList<>(newFiles.size());
    for (String file : newFiles) {
      storeFiles.add(fs.getStoreFileInfo(getColumnFamilyName(), file));
    }
    refreshStoreFilesInternal(storeFiles);
  }

  /**
   * Checks the underlying store files, opens the files that have not
   * been opened, and removes the store file readers for store files no longer
   * available. Mainly used by secondary region replicas to keep up to date with
   * the primary region files.
   * @throws IOException
   */
  private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
    StoreFileManager sfm = storeEngine.getStoreFileManager();
    Collection<HStoreFile> currentFiles = sfm.getStorefiles();
    Collection<HStoreFile> compactedFiles = sfm.getCompactedfiles();
    if (currentFiles == null) currentFiles = Collections.emptySet();
    if (newFiles == null) newFiles = Collections.emptySet();
    if (compactedFiles == null) compactedFiles = Collections.emptySet();

    HashMap<StoreFileInfo, HStoreFile> currentFilesSet = new HashMap<>(currentFiles.size());
    for (HStoreFile sf : currentFiles) {
      currentFilesSet.put(sf.getFileInfo(), sf);
    }
    HashMap<StoreFileInfo, HStoreFile> compactedFilesSet = new HashMap<>(compactedFiles.size());
    for (HStoreFile sf : compactedFiles) {
      compactedFilesSet.put(sf.getFileInfo(), sf);
    }

    Set<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);
    // Exclude the files that have already been compacted
    newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet());
    Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
    Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);

    if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
      return;
    }

    LOG.info("Refreshing store files for region " + this.getRegionInfo().getRegionNameAsString()
        + " files to add: " + toBeAddedFiles + " files to remove: " + toBeRemovedFiles);

    Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size());
    for (StoreFileInfo sfi : toBeRemovedFiles) {
      toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
    }

    // try to open the files
    List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles);

    // propagate the file changes to the underlying store file manager
    replaceStoreFiles(toBeRemovedStoreFiles, openedFiles); // won't throw an exception

    // Advance the memstore read point to be at least the new store files' seqIds so that
    // readers might pick it up. This assumes that the store is not getting any writes (otherwise
    // in-flight transactions might be made visible).
    if (!toBeAddedFiles.isEmpty()) {
      // we must have the max sequence id here as we do have several store files
      region.getMVCC().advanceTo(this.getMaxSequenceId().getAsLong());
    }

    completeCompaction(toBeRemovedStoreFiles);
  }

  @VisibleForTesting
  protected HStoreFile createStoreFileAndReader(final Path p) throws IOException {
    StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p);
    return createStoreFileAndReader(info);
  }

  private HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
    info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
    HStoreFile storeFile = new HStoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
        this.family.getBloomFilterType(), isPrimaryReplicaStore());
    storeFile.initReader();
    return storeFile;
  }

  /**
   * This message intends to inform the MemStore that the next coming updates
   * are going to be part of the replaying edits from the WAL.
   */
  public void startReplayingFromWAL() {
    this.memstore.startReplayingFromWAL();
  }

  /**
   * This message intends to inform the MemStore that the replaying edits from the WAL
   * are done.
   */
  public void stopReplayingFromWAL() {
    this.memstore.stopReplayingFromWAL();
  }

  /**
   * Adds a value to the memstore
   */
  public void add(final Cell cell, MemStoreSizing memstoreSizing) {
    lock.readLock().lock();
    try {
      if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
        LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
            this.getTableName(), this.getRegionInfo().getEncodedName(),
            this.getColumnFamilyName());
      }
      this.memstore.add(cell, memstoreSizing);
    } finally {
      lock.readLock().unlock();
      currentParallelPutCount.decrementAndGet();
    }
  }

  /**
   * Adds the specified value to the memstore
   */
  public void add(final Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
    lock.readLock().lock();
    try {
      if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
        LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
            this.getTableName(), this.getRegionInfo().getEncodedName(),
            this.getColumnFamilyName());
      }
      memstore.add(cells, memstoreSizing);
    } finally {
      lock.readLock().unlock();
      currentParallelPutCount.decrementAndGet();
    }
  }

  @Override
  public long timeOfOldestEdit() {
    return memstore.timeOfOldestEdit();
  }

  /**
   * @return All store files.
   */
  @Override
  public Collection<HStoreFile> getStorefiles() {
    return this.storeEngine.getStoreFileManager().getStorefiles();
  }

  @Override
  public Collection<HStoreFile> getCompactedFiles() {
    return this.storeEngine.getStoreFileManager().getCompactedfiles();
  }

  /**
   * This throws a WrongRegionException if the HFile does not fit in this region, or an
   * InvalidHFileException if the HFile is not valid.
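   *
   * <p>A hedged sketch of the surrounding call sequence (bulk loads are driven by Region, not
   * by clients directly; the exact orchestration varies by version):
   * <pre>
   *   store.assertBulkLoadHFileOk(srcPath);                               // validate first
   *   Pair&lt;Path, Path&gt; p = store.preBulkLoadHFile(srcPath.toString(), seqNum);
   * </pre>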
   */
  public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
    HFile.Reader reader = null;
    try {
      LOG.info("Validating hfile at " + srcPath + " for inclusion in "
          + "store " + this + " region " + this.getRegionInfo().getRegionNameAsString());
      FileSystem srcFs = srcPath.getFileSystem(conf);
      srcFs.access(srcPath, FsAction.READ_WRITE);
      reader = HFile.createReader(srcFs, srcPath, cacheConf, isPrimaryReplicaStore(), conf);
      reader.loadFileInfo();

      Optional<byte[]> firstKey = reader.getFirstRowKey();
      Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
      Optional<Cell> lk = reader.getLastKey();
      Preconditions.checkState(lk.isPresent(), "Last key can not be null");
      byte[] lastKey = CellUtil.cloneRow(lk.get());

      if (LOG.isDebugEnabled()) {
        LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey.get()) +
            " last=" + Bytes.toStringBinary(lastKey));
        LOG.debug("Region bounds: first=" +
            Bytes.toStringBinary(getRegionInfo().getStartKey()) +
            " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
      }

      if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) {
        throw new WrongRegionException(
            "Bulk load file " + srcPath.toString() + " does not fit inside region "
                + this.getRegionInfo().getRegionNameAsString());
      }

      if (reader.length() > conf.getLong(HConstants.HREGION_MAX_FILESIZE,
          HConstants.DEFAULT_MAX_FILE_SIZE)) {
        LOG.warn("Trying to bulk load hfile " + srcPath + " with size: " +
            reader.length() + " bytes can be problematic as it may lead to oversplitting.");
      }

      if (verifyBulkLoads) {
        long verificationStartTime = EnvironmentEdgeManager.currentTime();
        LOG.info("Full verification started for bulk load hfile: {}", srcPath);
        Cell prevCell = null;
        HFileScanner scanner = reader.getScanner(false, false, false);
        scanner.seekTo();
        do {
          Cell cell = scanner.getCell();
          if (prevCell != null) {
            if (comparator.compareRows(prevCell, cell) > 0) {
              throw new InvalidHFileException("Previous row is greater than"
                  + " current row: path=" + srcPath + " previous="
                  + CellUtil.getCellKeyAsString(prevCell) + " current="
                  + CellUtil.getCellKeyAsString(cell));
            }
            if (CellComparator.getInstance().compareFamilies(prevCell, cell) != 0) {
              throw new InvalidHFileException("Previous key had different"
                  + " family compared to current key: path=" + srcPath
                  + " previous="
                  + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(),
                      prevCell.getFamilyLength())
                  + " current="
                  + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(),
                      cell.getFamilyLength()));
            }
          }
          prevCell = cell;
        } while (scanner.next());
        LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString()
            + " took " + (EnvironmentEdgeManager.currentTime() - verificationStartTime)
            + " ms");
      }
    } finally {
      if (reader != null) reader.close();
    }
  }

  /**
   * This method should only be called from Region. It is assumed that the ranges of values in the
   * HFile fit within the store's assigned region (assertBulkLoadHFileOk checks this).
   *
   * @param srcPathStr path to the HFile to load
   * @param seqNum sequence Id associated with the HFile
   */
  public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
    Path srcPath = new Path(srcPathStr);
    return fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
  }

  public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException {
    Path srcPath = new Path(srcPathStr);
    try {
      fs.commitStoreFile(srcPath, dstPath);
    } finally {
      if (this.getCoprocessorHost() != null) {
        this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath);
      }
    }

    LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as "
        + dstPath + " - updating store file list.");

    HStoreFile sf = createStoreFileAndReader(dstPath);
    bulkLoadHFile(sf);

    LOG.info("Successfully loaded store file {} into store {} (new location: {})",
        srcPath, this, dstPath);

    return dstPath;
  }

  public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
    HStoreFile sf = createStoreFileAndReader(fileInfo);
    bulkLoadHFile(sf);
  }

  private void bulkLoadHFile(HStoreFile sf) throws IOException {
    StoreFileReader r = sf.getReader();
    this.storeSize.addAndGet(r.length());
    this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());

    // Append the new storefile into the list
    this.lock.writeLock().lock();
    try {
      this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
    } finally {
      // We need the lock, as long as we are updating the storeFiles
      // or changing the memstore. Let us release it before calling
      // notifyChangeReadersObservers. See HBASE-4485 for a possible
      // deadlock scenario that could have happened if we continued to hold
      // the lock.
      this.lock.writeLock().unlock();
    }
    LOG.info("Loaded HFile " + sf.getFileInfo() + " into store '" + getColumnFamilyName() + "'");
    if (LOG.isTraceEnabled()) {
      String traceMessage = "BULK LOAD time,size,store size,store files ["
          + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize
          + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
      LOG.trace(traceMessage);
    }
  }

  /**
   * Close all the readers. We don't need to worry about subsequent requests because the Region
   * holds a write lock that will prevent any more reads or writes.
   * @return the {@link StoreFile StoreFiles} that were previously being used.
   * @throws IOException on failure
   */
  public ImmutableCollection<HStoreFile> close() throws IOException {
    this.archiveLock.lock();
    this.lock.writeLock().lock();
    try {
      // Clear so metrics doesn't find them.
      ImmutableCollection<HStoreFile> result = storeEngine.getStoreFileManager().clearFiles();
      Collection<HStoreFile> compactedfiles =
          storeEngine.getStoreFileManager().clearCompactedFiles();
      // clear the compacted files
      if (CollectionUtils.isNotEmpty(compactedfiles)) {
        removeCompactedfiles(compactedfiles, true);
      }
      if (!result.isEmpty()) {
        // initialize the thread pool for closing store files in parallel.
        ThreadPoolExecutor storeFileCloserThreadPool = this.region
            .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
                + this.getColumnFamilyName());

        // close each store file in parallel
        CompletionService<Void> completionService =
            new ExecutorCompletionService<>(storeFileCloserThreadPool);
        for (HStoreFile f : result) {
          completionService.submit(new Callable<Void>() {
            @Override
            public Void call() throws IOException {
              boolean evictOnClose =
                  cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
              f.closeStoreFile(evictOnClose);
              return null;
            }
          });
        }

        IOException ioe = null;
        try {
          for (int i = 0; i < result.size(); i++) {
            try {
              Future<Void> future = completionService.take();
              future.get();
            } catch (InterruptedException e) {
              if (ioe == null) {
                ioe = new InterruptedIOException();
                ioe.initCause(e);
              }
            } catch (ExecutionException e) {
              if (ioe == null) ioe = new IOException(e.getCause());
            }
          }
        } finally {
          storeFileCloserThreadPool.shutdownNow();
        }
        if (ioe != null) throw ioe;
      }
      LOG.trace("Closed {}", this);
      return result;
    } finally {
      this.lock.writeLock().unlock();
      this.archiveLock.unlock();
    }
  }

  /**
   * Snapshot this store's memstore. Call before running
   * {@link #flushCache(long, MemStoreSnapshot, MonitoredTask, ThroughputController,
   * FlushLifeCycleTracker)}
   * so it has some work to do.
   */
  void snapshot() {
    this.lock.writeLock().lock();
    try {
      this.memstore.snapshot();
    } finally {
      this.lock.writeLock().unlock();
    }
  }

  /**
   * Write out current snapshot. Presumes {@link #snapshot()} has been called previously.
   * @param logCacheFlushId flush sequence number
   * @param snapshot the memstore snapshot to flush
   * @param status the monitored task to update with flush status
   * @param throughputController controls flush throughput
   * @return The path names of the tmp files to which the store was flushed
   * @throws IOException if exception occurs during process
   */
  protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
      MonitoredTask status, ThroughputController throughputController,
      FlushLifeCycleTracker tracker) throws IOException {
    // If an exception happens flushing, we let it out without clearing
    // the memstore snapshot. The old snapshot will be returned when we say
    // 'snapshot', the next time flush comes around.
    // Retry after catching an exception when flushing, otherwise the server will abort
    // itself.
    StoreFlusher flusher = storeEngine.getStoreFlusher();
    IOException lastException = null;
    for (int i = 0; i < flushRetriesNumber; i++) {
      try {
        List<Path> pathNames =
            flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController,
                tracker);
        Path lastPathName = null;
        try {
          for (Path pathName : pathNames) {
            lastPathName = pathName;
            validateStoreFile(pathName);
          }
          return pathNames;
        } catch (Exception e) {
          LOG.warn("Failed validating store file {}, retrying num={}", lastPathName, i, e);
          if (e instanceof IOException) {
            lastException = (IOException) e;
          } else {
            lastException = new IOException(e);
          }
        }
      } catch (IOException e) {
        LOG.warn("Failed flushing store file, retrying num={}", i, e);
        lastException = e;
      }
      if (lastException != null && i < (flushRetriesNumber - 1)) {
        try {
          Thread.sleep(pauseTime);
        } catch (InterruptedException e) {
          IOException iie = new InterruptedIOException();
          iie.initCause(e);
          throw iie;
        }
      }
    }
    throw lastException;
  }

  /**
   * @param path The pathname of the tmp file into which the store was flushed
   * @param logCacheFlushId flush sequence number
   * @param status the monitored task to update with commit status
   * @return store file created.
   * @throws IOException
   */
  private HStoreFile commitFile(Path path, long logCacheFlushId, MonitoredTask status)
      throws IOException {
    // Write-out finished successfully, move into the right spot
    Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);

    status.setStatus("Flushing " + this + ": reopening flushed file");
    HStoreFile sf = createStoreFileAndReader(dstPath);

    StoreFileReader r = sf.getReader();
    this.storeSize.addAndGet(r.length());
    this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());

    if (LOG.isInfoEnabled()) {
      LOG.info("Added " + sf + ", entries=" + r.getEntries() +
          ", sequenceid=" + logCacheFlushId +
          ", filesize=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1));
    }
    return sf;
  }

  /**
   * @param maxKeyCount estimated maximum number of keys the new file will hold
   * @param compression Compression algorithm to use
   * @param isCompaction whether we are creating a new file in a compaction
   * @param includeMVCCReadpoint - whether to include MVCC or not
   * @param includesTag - includesTag or not
   * @return Writer for a new StoreFile in the tmp dir.
   */
  // TODO: allow the Writer factory to create Writers of ShipperListener type only in case of
  // compaction
  public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
      boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
      boolean shouldDropBehind) throws IOException {
    final CacheConfig writerCacheConf;
    if (isCompaction) {
      // Don't cache data on write on compactions.
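      // (Rationale, for reference: compactions rewrite large volumes of data, so caching those
      // blocks on write would churn the block cache and could evict hotter data for blocks that
      // may never be read.)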
      writerCacheConf = new CacheConfig(cacheConf);
      writerCacheConf.setCacheDataOnWrite(false);
    } else {
      writerCacheConf = cacheConf;
    }
    InetSocketAddress[] favoredNodes = null;
    if (region.getRegionServerServices() != null) {
      favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
          region.getRegionInfo().getEncodedName());
    }
    HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
        cryptoContext);
    Path familyTempDir = new Path(fs.getTempDir(), family.getNameAsString());
    StoreFileWriter.Builder builder = new StoreFileWriter.Builder(conf, writerCacheConf,
        this.getFileSystem())
            .withOutputDir(familyTempDir)
            .withComparator(comparator)
            .withBloomType(family.getBloomFilterType())
            .withMaxKeyCount(maxKeyCount)
            .withFavoredNodes(favoredNodes)
            .withFileContext(hFileContext)
            .withShouldDropCacheBehind(shouldDropBehind);
    return builder.build();
  }

  private HFileContext createFileContext(Compression.Algorithm compression,
      boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context cryptoContext) {
    if (compression == null) {
      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
    }
    HFileContext hFileContext = new HFileContextBuilder()
        .withIncludesMvcc(includeMVCCReadpoint)
        .withIncludesTags(includesTag)
        .withCompression(compression)
        .withCompressTags(family.isCompressTags())
        .withChecksumType(checksumType)
        .withBytesPerCheckSum(bytesPerChecksum)
        .withBlockSize(blocksize)
        .withHBaseCheckSum(true)
        .withDataBlockEncoding(family.getDataBlockEncoding())
        .withEncryptionContext(cryptoContext)
        .withCreateTime(EnvironmentEdgeManager.currentTime())
        .build();
    return hFileContext;
  }

  private long getTotalSize(Collection<HStoreFile> sfs) {
    return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum();
  }

  /**
   * Change storeFiles, adding into place the Reader produced by this new flush.
   * @param sfs Store files
   * @param snapshotId id of the snapshot to clear; ignored if not positive
   * @throws IOException
   * @return Whether compaction is required.
   */
  private boolean updateStorefiles(List<HStoreFile> sfs, long snapshotId) throws IOException {
    this.lock.writeLock().lock();
    try {
      this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
      if (snapshotId > 0) {
        this.memstore.clearSnapshot(snapshotId);
      }
    } finally {
      // We need the lock, as long as we are updating the storeFiles
      // or changing the memstore. Let us release it before calling
      // notifyChangeReadersObservers. See HBASE-4485 for a possible
      // deadlock scenario that could have happened if we continued to hold
      // the lock.
      this.lock.writeLock().unlock();
    }
    // notify to be called here - only in case of flushes
    notifyChangedReadersObservers(sfs);
    if (LOG.isTraceEnabled()) {
      long totalSize = getTotalSize(sfs);
      String traceMessage = "FLUSH time,count,size,store size,store files ["
          + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize
          + "," + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
      LOG.trace(traceMessage);
    }
    return needsCompaction();
  }

  /**
   * Notify all observers that the set of Readers has changed.
   * @throws IOException
   */
  private void notifyChangedReadersObservers(List<HStoreFile> sfs) throws IOException {
    for (ChangedReadersObserver o : this.changedReaderObservers) {
      List<KeyValueScanner> memStoreScanners;
      this.lock.readLock().lock();
      try {
        memStoreScanners = this.memstore.getScanners(o.getReadPoint());
      } finally {
        this.lock.readLock().unlock();
      }
      o.updateReaders(sfs, memStoreScanners);
    }
  }

  /**
   * Get all scanners with no filtering based on TTL (that happens further down the line).
   * @param cacheBlocks cache the blocks or not
   * @param isGet true if this is a get scan
   * @param usePread true to use pread, false if not
   * @param isCompaction true if the scanner is created for compaction
   * @param matcher the scan query matcher
   * @param startRow the start row
   * @param stopRow the stop row
   * @param readPt the read point of the current scan
   * @return all scanners for this store
   */
  public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet, boolean usePread,
      boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt)
      throws IOException {
    return getScanners(cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow,
        false, readPt);
  }

  /**
   * Get all scanners with no filtering based on TTL (that happens further down the line).
   * @param cacheBlocks cache the blocks or not
   * @param usePread true to use pread, false if not
   * @param isCompaction true if the scanner is created for compaction
   * @param matcher the scan query matcher
   * @param startRow the start row
   * @param includeStartRow true to include start row, false if not
   * @param stopRow the stop row
   * @param includeStopRow true to include stop row, false if not
   * @param readPt the read point of the current scan
   * @return all scanners for this store
   */
  public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,
      boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow,
      byte[] stopRow, boolean includeStopRow, long readPt) throws IOException {
    Collection<HStoreFile> storeFilesToScan;
    List<KeyValueScanner> memStoreScanners;
    this.lock.readLock().lock();
    try {
      storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
          includeStartRow, stopRow, includeStopRow);
      memStoreScanners = this.memstore.getScanners(readPt);
    } finally {
      this.lock.readLock().unlock();
    }

    try {
      // First the store file scanners

      // TODO this used to get the store files in descending order,
      // but now we get them in ascending order, which I think is
      // actually more correct, since memstore get put at the end.
      List<StoreFileScanner> sfScanners = StoreFileScanner
          .getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, false,
              matcher, readPt);
      List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
      scanners.addAll(sfScanners);
      // Then the memstore scanners
      scanners.addAll(memStoreScanners);
      return scanners;
    } catch (Throwable t) {
      clearAndClose(memStoreScanners);
      throw t instanceof IOException ? (IOException) t : new IOException(t);
    }
  }
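
  /**
   * Close the given scanners and drop them from the list. Used to release a partially built
   * scanner list when assembling scanners fails midway.
   */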
  private static void clearAndClose(List<KeyValueScanner> scanners) {
    if (scanners == null) {
      return;
    }
    for (KeyValueScanner s : scanners) {
      s.close();
    }
    scanners.clear();
  }

  /**
   * Create scanners on the given files and if needed on the memstore with no filtering based on
   * TTL (that happens further down the line).
   * @param files the list of files on which the scanners has to be created
   * @param cacheBlocks cache the blocks or not
   * @param isGet true if this is a get scan
   * @param usePread true to use pread, false if not
   * @param isCompaction true if the scanner is created for compaction
   * @param matcher the scan query matcher
   * @param startRow the start row
   * @param stopRow the stop row
   * @param readPt the read point of the current scan
   * @param includeMemstoreScanner true if memstore has to be included
   * @return scanners on the given files and on the memstore if specified
   */
  public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
      boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
      byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner)
      throws IOException {
    return getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
        stopRow, false, readPt, includeMemstoreScanner);
  }

  /**
   * Create scanners on the given files and if needed on the memstore with no filtering based on
   * TTL (that happens further down the line).
   * @param files the list of files on which the scanners has to be created
   * @param cacheBlocks cache the blocks or not
   * @param usePread true to use pread, false if not
   * @param isCompaction true if the scanner is created for compaction
   * @param matcher the scan query matcher
   * @param startRow the start row
   * @param includeStartRow true to include start row, false if not
   * @param stopRow the stop row
   * @param includeStopRow true to include stop row, false if not
   * @param readPt the read point of the current scan
   * @param includeMemstoreScanner true if memstore has to be included
   * @return scanners on the given files and on the memstore if specified
   */
  public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
      boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
      boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
      boolean includeMemstoreScanner) throws IOException {
    List<KeyValueScanner> memStoreScanners = null;
    if (includeMemstoreScanner) {
      this.lock.readLock().lock();
      try {
        memStoreScanners = this.memstore.getScanners(readPt);
      } finally {
        this.lock.readLock().unlock();
      }
    }
    try {
      List<StoreFileScanner> sfScanners = StoreFileScanner
          .getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, false, matcher,
              readPt);
      List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
      scanners.addAll(sfScanners);
      // Then the memstore scanners
      if (memStoreScanners != null) {
        scanners.addAll(memStoreScanners);
      }
      return scanners;
    } catch (Throwable t) {
      clearAndClose(memStoreScanners);
      throw t instanceof IOException ? (IOException) t : new IOException(t);
    }
  }

  /**
   * @param o Observer who wants to know about changes in set of Readers
   */
  public void addChangedReaderObserver(ChangedReadersObserver o) {
    this.changedReaderObservers.add(o);
  }

  /**
   * @param o Observer no longer interested in changes in set of Readers.
   */
  public void deleteChangedReaderObserver(ChangedReadersObserver o) {
    // We don't check if observer present; it may not be (legitimately)
    this.changedReaderObservers.remove(o);
  }

  //////////////////////////////////////////////////////////////////////////////
  // Compaction
  //////////////////////////////////////////////////////////////////////////////

  /**
   * Compact the StoreFiles. This method may take some time, so the calling
   * thread must be able to block for long periods.
   *
   * <p>During this time, the Store can work as usual, getting values from
   * StoreFiles and writing new StoreFiles from the memstore.
   *
   * <p>Existing StoreFiles are not destroyed until the new compacted StoreFile is
   * completely written-out to disk.
   *
   * <p>The compactLock prevents multiple simultaneous compactions.
   * The structureLock prevents us from interfering with other write operations.
   *
   * <p>We don't want to hold the structureLock for the whole time, as a compact()
   * can be lengthy and we want to allow cache-flushes during this period.
   *
   * <p>Compaction events should be idempotent, since there is no IO Fencing for
   * the region directory in hdfs. A region server might still try to complete the
   * compaction after it lost the region. That is why the following events are carefully
   * ordered for a compaction:
   * 1. Compaction writes new files under region/.tmp directory (compaction output)
   * 2. Compaction atomically moves the temporary file under region directory
   * 3. Compaction appends a WAL edit containing the compaction input and output files.
   *    Forces sync on WAL.
   * 4. Compaction deletes the input files from the region directory.
   *
   * Failure conditions are handled like this:
   * - If RS fails before 2, compaction won't complete. Even if RS lives on and finishes
   *   the compaction later, it will only write the new data file to the region directory.
   *   Since we already have this data, this will be idempotent, but we will have a redundant
   *   copy of the data.
   * - If RS fails between 2 and 3, the region will have a redundant copy of the data. The
   *   RS that failed won't be able to finish sync() for WAL because of lease recovery in WAL.
   * - If RS fails after 3, the region server that opens the region will pick up the
   *   compaction marker from the WAL and replay it by removing the compaction input files.
   *   The failed RS can also attempt to delete those files, but the operation will be
   *   idempotent.
   *
   * See HBASE-2231 for details.
   *
   * @param compaction compaction details obtained from requestCompaction()
   * @throws IOException
   * @return Storefile we compacted into or null if we failed or opted out early.
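   *
   * <p>A hedged sketch of the expected calling pattern (normally driven by the compaction
   * threads rather than invoked directly; the exact requestCompaction() signature varies
   * across releases):
   * <pre>
   *   Optional&lt;CompactionContext&gt; ctx = store.requestCompaction();
   *   if (ctx.isPresent()) {
   *     store.compact(ctx.get(), throughputController, user);
   *   }
   * </pre>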
1406 */ 1407 public List<HStoreFile> compact(CompactionContext compaction, 1408 ThroughputController throughputController, User user) throws IOException { 1409 assert compaction != null; 1410 List<HStoreFile> sfs = null; 1411 CompactionRequestImpl cr = compaction.getRequest(); 1412 try { 1413 // Do all sanity checking in here if we have a valid CompactionRequestImpl 1414 // because we need to clean up after it on the way out in a finally 1415 // block below 1416 long compactionStartTime = EnvironmentEdgeManager.currentTime(); 1417 assert compaction.hasSelection(); 1418 Collection<HStoreFile> filesToCompact = cr.getFiles(); 1419 assert !filesToCompact.isEmpty(); 1420 synchronized (filesCompacting) { 1421 // sanity check: we're compacting files that this store knows about 1422 // TODO: change this to LOG.error() after more debugging 1423 Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact)); 1424 } 1425 1426 // Ready to go. Have list of files to compact. 1427 LOG.info("Starting compaction of " + filesToCompact + 1428 " into tmpdir=" + fs.getTempDir() + ", totalSize=" + 1429 TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1)); 1430 1431 return doCompaction(cr, filesToCompact, user, compactionStartTime, 1432 compaction.compact(throughputController, user)); 1433 } finally { 1434 finishCompactionRequest(cr); 1435 } 1436 } 1437 1438 @VisibleForTesting 1439 protected List<HStoreFile> doCompaction(CompactionRequestImpl cr, 1440 Collection<HStoreFile> filesToCompact, User user, long compactionStartTime, 1441 List<Path> newFiles) throws IOException { 1442 // Do the steps necessary to complete the compaction. 1443 List<HStoreFile> sfs = moveCompactedFilesIntoPlace(cr, newFiles, user); 1444 writeCompactionWalRecord(filesToCompact, sfs); 1445 replaceStoreFiles(filesToCompact, sfs); 1446 if (cr.isMajor()) { 1447 majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs()); 1448 majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize); 1449 } else { 1450 compactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs()); 1451 compactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize); 1452 } 1453 long outputBytes = getTotalSize(sfs); 1454 1455 // At this point the store will use new files for all new scanners. 1456 completeCompaction(filesToCompact); // update store size. 
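    // Note that completeCompaction() recomputes storeSize and totalUncompressedBytes from the
    // full store file list rather than applying a delta for just this compaction.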
1457 1458 long now = EnvironmentEdgeManager.currentTime(); 1459 if (region.getRegionServerServices() != null 1460 && region.getRegionServerServices().getMetrics() != null) { 1461 region.getRegionServerServices().getMetrics().updateCompaction(cr.isMajor(), 1462 now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(), 1463 outputBytes); 1464 } 1465 1466 logCompactionEndMessage(cr, sfs, now, compactionStartTime); 1467 return sfs; 1468 } 1469 1470 private List<HStoreFile> moveCompactedFilesIntoPlace(CompactionRequestImpl cr, List<Path> newFiles, 1471 User user) throws IOException { 1472 List<HStoreFile> sfs = new ArrayList<>(newFiles.size()); 1473 for (Path newFile : newFiles) { 1474 assert newFile != null; 1475 HStoreFile sf = moveFileIntoPlace(newFile); 1476 if (this.getCoprocessorHost() != null) { 1477 getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user); 1478 } 1479 assert sf != null; 1480 sfs.add(sf); 1481 } 1482 return sfs; 1483 } 1484 1485 // Package-visible for tests 1486 HStoreFile moveFileIntoPlace(Path newFile) throws IOException { 1487 validateStoreFile(newFile); 1488 // Move the file into the right spot 1489 Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile); 1490 return createStoreFileAndReader(destPath); 1491 } 1492 1493 /** 1494 * Writes the compaction WAL record. 1495 * @param filesCompacted Files compacted (input). 1496 * @param newFiles Files from compaction. 1497 */ 1498 private void writeCompactionWalRecord(Collection<HStoreFile> filesCompacted, 1499 Collection<HStoreFile> newFiles) throws IOException { 1500 if (region.getWAL() == null) { 1501 return; 1502 } 1503 List<Path> inputPaths = 1504 filesCompacted.stream().map(HStoreFile::getPath).collect(Collectors.toList()); 1505 List<Path> outputPaths = 1506 newFiles.stream().map(HStoreFile::getPath).collect(Collectors.toList()); 1507 RegionInfo info = this.region.getRegionInfo(); 1508 CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info, 1509 family.getName(), inputPaths, outputPaths, fs.getStoreDir(getColumnFamilyDescriptor().getNameAsString())); 1510 // Fix reaching into Region to get the maxWaitForSeqId. 1511 // Does this method belong in Region altogether given it is making so many references up there? 1512 // Could be Region#writeCompactionMarker(compactionDescriptor); 1513 WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(), 1514 this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC()); 1515 } 1516 1517 @VisibleForTesting 1518 void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result) 1519 throws IOException { 1520 this.lock.writeLock().lock(); 1521 try { 1522 this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result); 1523 synchronized (filesCompacting) { 1524 filesCompacting.removeAll(compactedFiles); 1525 } 1526 1527 // These may be null when the RS is shutting down. The space quota Chores will fix the Region 1528 // sizes later so it's not super-critical if we miss these. 
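      // The quota update below applies a signed delta: for example, compacting three 1 MB
      // input files into one 2.5 MB output reports a net change of -0.5 MB for this region.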
1529 RegionServerServices rsServices = region.getRegionServerServices(); 1530 if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) { 1531 updateSpaceQuotaAfterFileReplacement( 1532 rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(), 1533 compactedFiles, result); 1534 } 1535 } finally { 1536 this.lock.writeLock().unlock(); 1537 } 1538 } 1539 1540 /** 1541 * Updates the space quota usage for this region, removing the size for files compacted away 1542 * and adding in the size for new files. 1543 * 1544 * @param sizeStore The object tracking changes in region size for space quotas. 1545 * @param regionInfo The identifier for the region whose size is being updated. 1546 * @param oldFiles Files removed from this store's region. 1547 * @param newFiles Files added to this store's region. 1548 */ 1549 void updateSpaceQuotaAfterFileReplacement( 1550 RegionSizeStore sizeStore, RegionInfo regionInfo, Collection<HStoreFile> oldFiles, 1551 Collection<HStoreFile> newFiles) { 1552 long delta = 0; 1553 if (oldFiles != null) { 1554 for (HStoreFile compactedFile : oldFiles) { 1555 if (compactedFile.isHFile()) { 1556 delta -= compactedFile.getReader().length(); 1557 } 1558 } 1559 } 1560 if (newFiles != null) { 1561 for (HStoreFile newFile : newFiles) { 1562 if (newFile.isHFile()) { 1563 delta += newFile.getReader().length(); 1564 } 1565 } 1566 } 1567 sizeStore.incrementRegionSize(regionInfo, delta); 1568 } 1569 1570 /** 1571 * Log a very elaborate compaction completion message. 1572 * @param cr Request. 1573 * @param sfs Resulting files. 1574 * @param compactionStartTime Start time. 1575 */ 1576 private void logCompactionEndMessage( 1577 CompactionRequestImpl cr, List<HStoreFile> sfs, long now, long compactionStartTime) { 1578 StringBuilder message = new StringBuilder( 1579 "Completed" + (cr.isMajor() ? " major" : "") + " compaction of " 1580 + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "") + " file(s) in " 1581 + this + " of " + this.getRegionInfo().getShortNameToLog() + " into "); 1582 if (sfs.isEmpty()) { 1583 message.append("none, "); 1584 } else { 1585 for (HStoreFile sf: sfs) { 1586 message.append(sf.getPath().getName()); 1587 message.append("(size="); 1588 message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1)); 1589 message.append("), "); 1590 } 1591 } 1592 message.append("total size for store is ") 1593 .append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1)) 1594 .append(". This selection was in queue for ") 1595 .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime())) 1596 .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime)) 1597 .append(" to execute."); 1598 LOG.info(message.toString()); 1599 if (LOG.isTraceEnabled()) { 1600 int fileCount = storeEngine.getStoreFileManager().getStorefileCount(); 1601 long resultSize = getTotalSize(sfs); 1602 String traceMessage = "COMPACTION start,end,size out,files in,files out,store size," 1603 + "store files [" + compactionStartTime + "," + now + "," + resultSize + "," 1604 + cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]"; 1605 LOG.trace(traceMessage); 1606 } 1607 } 1608 1609 /** 1610 * Call to complete a compaction. Its for the case where we find in the WAL a compaction 1611 * that was not finished. We could find one recovering a WAL after a regionserver crash. 1612 * See HBASE-2231. 
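   * <p>For example, replaying a marker whose inputs are {f1, f2} and whose output is {f3}
   * (hypothetical file names) removes f1 and f2 from the store if they are still present and,
   * when pickCompactionFiles is true, also picks up f3 if the store has not already loaded it.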
1613 * @param compaction 1614 */ 1615 public void replayCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles, 1616 boolean removeFiles) throws IOException { 1617 LOG.debug("Completing compaction from the WAL marker"); 1618 List<String> compactionInputs = compaction.getCompactionInputList(); 1619 List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList()); 1620 1621 // The Compaction Marker is written after the compaction is completed, 1622 // and the files moved into the region/family folder. 1623 // 1624 // If we crash after the entry is written, we may not have removed the 1625 // input files, but the output file is present. 1626 // (The unremoved input files will be removed by this function) 1627 // 1628 // If we scan the directory and the file is not present, it can mean that: 1629 // - The file was manually removed by the user 1630 // - The file was removed as consequence of subsequent compaction 1631 // so, we can't do anything with the "compaction output list" because those 1632 // files have already been loaded when opening the region (by virtue of 1633 // being in the store's folder) or they may be missing due to a compaction. 1634 1635 String familyName = this.getColumnFamilyName(); 1636 Set<String> inputFiles = new HashSet<>(); 1637 for (String compactionInput : compactionInputs) { 1638 Path inputPath = fs.getStoreFilePath(familyName, compactionInput); 1639 inputFiles.add(inputPath.getName()); 1640 } 1641 1642 //some of the input files might already be deleted 1643 List<HStoreFile> inputStoreFiles = new ArrayList<>(compactionInputs.size()); 1644 for (HStoreFile sf : this.getStorefiles()) { 1645 if (inputFiles.contains(sf.getPath().getName())) { 1646 inputStoreFiles.add(sf); 1647 } 1648 } 1649 1650 // check whether we need to pick up the new files 1651 List<HStoreFile> outputStoreFiles = new ArrayList<>(compactionOutputs.size()); 1652 1653 if (pickCompactionFiles) { 1654 for (HStoreFile sf : this.getStorefiles()) { 1655 compactionOutputs.remove(sf.getPath().getName()); 1656 } 1657 for (String compactionOutput : compactionOutputs) { 1658 StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), compactionOutput); 1659 HStoreFile storeFile = createStoreFileAndReader(storeFileInfo); 1660 outputStoreFiles.add(storeFile); 1661 } 1662 } 1663 1664 if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) { 1665 LOG.info("Replaying compaction marker, replacing input files: " + 1666 inputStoreFiles + " with output files : " + outputStoreFiles); 1667 this.replaceStoreFiles(inputStoreFiles, outputStoreFiles); 1668 this.completeCompaction(inputStoreFiles); 1669 } 1670 } 1671 1672 /** 1673 * This method tries to compact N recent files for testing. 1674 * Note that because compacting "recent" files only makes sense for some policies, 1675 * e.g. the default one, it assumes default policy is used. It doesn't use policy, 1676 * but instead makes a compaction candidate list by itself. 1677 * @param N Number of files. 1678 */ 1679 @VisibleForTesting 1680 public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException { 1681 List<HStoreFile> filesToCompact; 1682 boolean isMajor; 1683 1684 this.lock.readLock().lock(); 1685 try { 1686 synchronized (filesCompacting) { 1687 filesToCompact = Lists.newArrayList(storeEngine.getStoreFileManager().getStorefiles()); 1688 if (!filesCompacting.isEmpty()) { 1689 // exclude all files older than the newest file we're currently 1690 // compacting. 
this allows us to preserve contiguity (HBASE-2856) 1691 HStoreFile last = filesCompacting.get(filesCompacting.size() - 1); 1692 int idx = filesToCompact.indexOf(last); 1693 Preconditions.checkArgument(idx != -1); 1694 filesToCompact.subList(0, idx + 1).clear(); 1695 } 1696 int count = filesToCompact.size(); 1697 if (N > count) { 1698 throw new RuntimeException("Not enough files"); 1699 } 1700 1701 filesToCompact = filesToCompact.subList(count - N, count); 1702 isMajor = (filesToCompact.size() == storeEngine.getStoreFileManager().getStorefileCount()); 1703 filesCompacting.addAll(filesToCompact); 1704 Collections.sort(filesCompacting, storeEngine.getStoreFileManager() 1705 .getStoreFileComparator()); 1706 } 1707 } finally { 1708 this.lock.readLock().unlock(); 1709 } 1710 1711 try { 1712 // Ready to go. Have list of files to compact. 1713 List<Path> newFiles = ((DefaultCompactor)this.storeEngine.getCompactor()) 1714 .compactForTesting(filesToCompact, isMajor); 1715 for (Path newFile: newFiles) { 1716 // Move the compaction into place. 1717 HStoreFile sf = moveFileIntoPlace(newFile); 1718 if (this.getCoprocessorHost() != null) { 1719 this.getCoprocessorHost().postCompact(this, sf, null, null, null); 1720 } 1721 replaceStoreFiles(filesToCompact, Collections.singletonList(sf)); 1722 completeCompaction(filesToCompact); 1723 } 1724 } finally { 1725 synchronized (filesCompacting) { 1726 filesCompacting.removeAll(filesToCompact); 1727 } 1728 } 1729 } 1730 1731 @Override 1732 public boolean hasReferences() { 1733 // Grab the read lock here, because we need to ensure that: only when the atomic 1734 // replaceStoreFiles(..) finished, we can get all the complete store file list. 1735 this.lock.readLock().lock(); 1736 try { 1737 // Merge the current store files with compacted files here due to HBASE-20940. 1738 Collection<HStoreFile> allStoreFiles = new ArrayList<>(getStorefiles()); 1739 allStoreFiles.addAll(getCompactedFiles()); 1740 return StoreUtils.hasReferences(allStoreFiles); 1741 } finally { 1742 this.lock.readLock().unlock(); 1743 } 1744 } 1745 1746 /** 1747 * getter for CompactionProgress object 1748 * @return CompactionProgress object; can be null 1749 */ 1750 public CompactionProgress getCompactionProgress() { 1751 return this.storeEngine.getCompactor().getProgress(); 1752 } 1753 1754 @Override 1755 public boolean shouldPerformMajorCompaction() throws IOException { 1756 for (HStoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) { 1757 // TODO: what are these reader checks all over the place? 1758 if (sf.getReader() == null) { 1759 LOG.debug("StoreFile {} has null Reader", sf); 1760 return false; 1761 } 1762 } 1763 return storeEngine.getCompactionPolicy().shouldPerformMajorCompaction( 1764 this.storeEngine.getStoreFileManager().getStorefiles()); 1765 } 1766 1767 public Optional<CompactionContext> requestCompaction() throws IOException { 1768 return requestCompaction(NO_PRIORITY, CompactionLifeCycleTracker.DUMMY, null); 1769 } 1770 1771 public Optional<CompactionContext> requestCompaction(int priority, 1772 CompactionLifeCycleTracker tracker, User user) throws IOException { 1773 // don't even select for compaction if writes are disabled 1774 if (!this.areWritesEnabled()) { 1775 return Optional.empty(); 1776 } 1777 // Before we do compaction, try to get rid of unneeded files to simplify things. 
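    // (Expired files are dropped through the same WAL-logged replacement path a compaction
    // uses; see removeUnneededFiles() below.)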
1778 removeUnneededFiles(); 1779 1780 final CompactionContext compaction = storeEngine.createCompaction(); 1781 CompactionRequestImpl request = null; 1782 this.lock.readLock().lock(); 1783 try { 1784 synchronized (filesCompacting) { 1785 // First, see if coprocessor would want to override selection. 1786 if (this.getCoprocessorHost() != null) { 1787 final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting); 1788 boolean override = getCoprocessorHost().preCompactSelection(this, 1789 candidatesForCoproc, tracker, user); 1790 if (override) { 1791 // Coprocessor is overriding normal file selection. 1792 compaction.forceSelect(new CompactionRequestImpl(candidatesForCoproc)); 1793 } 1794 } 1795 1796 // Normal case - coprocessor is not overriding file selection. 1797 if (!compaction.hasSelection()) { 1798 boolean isUserCompaction = priority == Store.PRIORITY_USER; 1799 boolean mayUseOffPeak = offPeakHours.isOffPeakHour() && 1800 offPeakCompactionTracker.compareAndSet(false, true); 1801 try { 1802 compaction.select(this.filesCompacting, isUserCompaction, 1803 mayUseOffPeak, forceMajor && filesCompacting.isEmpty()); 1804 } catch (IOException e) { 1805 if (mayUseOffPeak) { 1806 offPeakCompactionTracker.set(false); 1807 } 1808 throw e; 1809 } 1810 assert compaction.hasSelection(); 1811 if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) { 1812 // Compaction policy doesn't want to take advantage of off-peak. 1813 offPeakCompactionTracker.set(false); 1814 } 1815 } 1816 if (this.getCoprocessorHost() != null) { 1817 this.getCoprocessorHost().postCompactSelection( 1818 this, ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker, 1819 compaction.getRequest(), user); 1820 } 1821 // Finally, we have the resulting files list. Check if we have any files at all. 1822 request = compaction.getRequest(); 1823 Collection<HStoreFile> selectedFiles = request.getFiles(); 1824 if (selectedFiles.isEmpty()) { 1825 return Optional.empty(); 1826 } 1827 1828 addToCompactingFiles(selectedFiles); 1829 1830 // If we're enqueuing a major, clear the force flag. 1831 this.forceMajor = this.forceMajor && !request.isMajor(); 1832 1833 // Set common request properties. 1834 // Set priority, either override value supplied by caller or from store. 1835 request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority()); 1836 request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName()); 1837 request.setTracker(tracker); 1838 } 1839 } finally { 1840 this.lock.readLock().unlock(); 1841 } 1842 1843 if (LOG.isDebugEnabled()) { 1844 LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName() 1845 + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction" 1846 + (request.isAllFiles() ? " (all files)" : "")); 1847 } 1848 this.region.reportCompactionRequestStart(request.isMajor()); 1849 return Optional.of(compaction); 1850 } 1851 1852 /** Adds the files to compacting files. filesCompacting must be locked. */ 1853 private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) { 1854 if (CollectionUtils.isEmpty(filesToAdd)) { 1855 return; 1856 } 1857 // Check that we do not try to compact the same StoreFile twice. 
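    // Collections.disjoint() is true only when the two collections share no elements, so a
    // false result below means some file has already been queued for compaction.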
1858 if (!Collections.disjoint(filesCompacting, filesToAdd)) { 1859 Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting); 1860 } 1861 filesCompacting.addAll(filesToAdd); 1862 Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator()); 1863 } 1864 1865 private void removeUnneededFiles() throws IOException { 1866 if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) return; 1867 if (getColumnFamilyDescriptor().getMinVersions() > 0) { 1868 LOG.debug("Skipping expired store file removal due to min version being {}", 1869 getColumnFamilyDescriptor().getMinVersions()); 1870 return; 1871 } 1872 this.lock.readLock().lock(); 1873 Collection<HStoreFile> delSfs = null; 1874 try { 1875 synchronized (filesCompacting) { 1876 long cfTtl = getStoreFileTtl(); 1877 if (cfTtl != Long.MAX_VALUE) { 1878 delSfs = storeEngine.getStoreFileManager().getUnneededFiles( 1879 EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting); 1880 addToCompactingFiles(delSfs); 1881 } 1882 } 1883 } finally { 1884 this.lock.readLock().unlock(); 1885 } 1886 1887 if (CollectionUtils.isEmpty(delSfs)) { 1888 return; 1889 } 1890 1891 Collection<HStoreFile> newFiles = Collections.emptyList(); // No new files. 1892 writeCompactionWalRecord(delSfs, newFiles); 1893 replaceStoreFiles(delSfs, newFiles); 1894 completeCompaction(delSfs); 1895 LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in " 1896 + this + " of " + this.getRegionInfo().getRegionNameAsString() 1897 + "; total size for store is " 1898 + TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1)); 1899 } 1900 1901 public void cancelRequestedCompaction(CompactionContext compaction) { 1902 finishCompactionRequest(compaction.getRequest()); 1903 } 1904 1905 private void finishCompactionRequest(CompactionRequestImpl cr) { 1906 this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize()); 1907 if (cr.isOffPeak()) { 1908 offPeakCompactionTracker.set(false); 1909 cr.setOffPeak(false); 1910 } 1911 synchronized (filesCompacting) { 1912 filesCompacting.removeAll(cr.getFiles()); 1913 } 1914 } 1915 1916 /** 1917 * Validates a store file by opening and closing it. In HFileV2 this should not be an expensive 1918 * operation. 1919 * @param path the path to the store file 1920 */ 1921 private void validateStoreFile(Path path) throws IOException { 1922 HStoreFile storeFile = null; 1923 try { 1924 storeFile = createStoreFileAndReader(path); 1925 } catch (IOException e) { 1926 LOG.error("Failed to open store file : {}, keeping it in tmp location", path, e); 1927 throw e; 1928 } finally { 1929 if (storeFile != null) { 1930 storeFile.closeStoreFile(false); 1931 } 1932 } 1933 } 1934 1935 /** 1936 * Update counts. 1937 * @param compactedFiles list of files that were compacted 1938 */ 1939 @VisibleForTesting 1940 protected void completeCompaction(Collection<HStoreFile> compactedFiles) 1941 // Rename this method! TODO. 1942 throws IOException { 1943 this.storeSize.set(0L); 1944 this.totalUncompressedBytes.set(0L); 1945 for (HStoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) { 1946 StoreFileReader r = hsf.getReader(); 1947 if (r == null) { 1948 LOG.warn("StoreFile {} has a null Reader", hsf); 1949 continue; 1950 } 1951 this.storeSize.addAndGet(r.length()); 1952 this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); 1953 } 1954 } 1955 1956 /* 1957 * @param wantedVersions How many versions were asked for. 
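   * For example, asking for five versions on a family that keeps at most three returns three.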
1958 * @return wantedVersions or this families' {@link HConstants#VERSIONS}. 1959 */ 1960 int versionsToReturn(final int wantedVersions) { 1961 if (wantedVersions <= 0) { 1962 throw new IllegalArgumentException("Number of versions must be > 0"); 1963 } 1964 // Make sure we do not return more than maximum versions for this store. 1965 int maxVersions = this.family.getMaxVersions(); 1966 return wantedVersions > maxVersions ? maxVersions: wantedVersions; 1967 } 1968 1969 @Override 1970 public boolean canSplit() { 1971 this.lock.readLock().lock(); 1972 try { 1973 // Not split-able if we find a reference store file present in the store. 1974 boolean result = !hasReferences(); 1975 if (!result) { 1976 LOG.trace("Not splittable; has references: {}", this); 1977 } 1978 return result; 1979 } finally { 1980 this.lock.readLock().unlock(); 1981 } 1982 } 1983 1984 /** 1985 * Determines if Store should be split. 1986 */ 1987 public Optional<byte[]> getSplitPoint() { 1988 this.lock.readLock().lock(); 1989 try { 1990 // Should already be enforced by the split policy! 1991 assert !this.getRegionInfo().isMetaRegion(); 1992 // Not split-able if we find a reference store file present in the store. 1993 if (hasReferences()) { 1994 LOG.trace("Not splittable; has references: {}", this); 1995 return Optional.empty(); 1996 } 1997 return this.storeEngine.getStoreFileManager().getSplitPoint(); 1998 } catch(IOException e) { 1999 LOG.warn("Failed getting store size for {}", this, e); 2000 } finally { 2001 this.lock.readLock().unlock(); 2002 } 2003 return Optional.empty(); 2004 } 2005 2006 @Override 2007 public long getLastCompactSize() { 2008 return this.lastCompactSize; 2009 } 2010 2011 @Override 2012 public long getSize() { 2013 return storeSize.get(); 2014 } 2015 2016 public void triggerMajorCompaction() { 2017 this.forceMajor = true; 2018 } 2019 2020 ////////////////////////////////////////////////////////////////////////////// 2021 // File administration 2022 ////////////////////////////////////////////////////////////////////////////// 2023 2024 /** 2025 * Return a scanner for both the memstore and the HStore files. Assumes we are not in a 2026 * compaction. 2027 * @param scan Scan to apply when scanning the stores 2028 * @param targetCols columns to scan 2029 * @return a scanner over the current key values 2030 * @throws IOException on failure 2031 */ 2032 public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols, long readPt) 2033 throws IOException { 2034 lock.readLock().lock(); 2035 try { 2036 ScanInfo scanInfo; 2037 if (this.getCoprocessorHost() != null) { 2038 scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this); 2039 } else { 2040 scanInfo = getScanInfo(); 2041 } 2042 return createScanner(scan, scanInfo, targetCols, readPt); 2043 } finally { 2044 lock.readLock().unlock(); 2045 } 2046 } 2047 2048 // HMobStore will override this method to return its own implementation. 2049 protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo, 2050 NavigableSet<byte[]> targetCols, long readPt) throws IOException { 2051 return scan.isReversed() ? 
new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt)
2052        : new StoreScanner(this, scanInfo, scan, targetCols, readPt);
2053  }
2054
2055  /**
2056   * Recreates the scanners on the current list of active store file scanners.
2057   * @param currentFileScanners the current set of active store file scanners
2058   * @param cacheBlocks cache the blocks or not
2059   * @param usePread use pread or not
2060   * @param isCompaction is the scanner for compaction
2061   * @param matcher the scan query matcher
2062   * @param startRow the scan's start row
2063   * @param includeStartRow should the scan include the start row
2064   * @param stopRow the scan's stop row
2065   * @param includeStopRow should the scan include the stop row
2066   * @param readPt the read point of the current scan
2067   * @param includeMemstoreScanner whether the current scanner should include the memstore scanner
2068   * @return list of scanners recreated on the current scanners
2069   * @throws IOException
2070   */
2071  public List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners,
2072      boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
2073      byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
2074      boolean includeMemstoreScanner) throws IOException {
2075    this.lock.readLock().lock();
2076    try {
2077      Map<String, HStoreFile> name2File =
2078          new HashMap<>(getStorefilesCount() + getCompactedFilesCount());
2079      for (HStoreFile file : getStorefiles()) {
2080        name2File.put(file.getFileInfo().getActiveFileName(), file);
2081      }
2082      Collection<HStoreFile> compactedFiles = getCompactedFiles();
2083      for (HStoreFile file : IterableUtils.emptyIfNull(compactedFiles)) {
2084        name2File.put(file.getFileInfo().getActiveFileName(), file);
2085      }
2086      List<HStoreFile> filesToReopen = new ArrayList<>();
2087      for (KeyValueScanner kvs : currentFileScanners) {
2088        assert kvs.isFileScanner();
2089        if (kvs.peek() == null) {
2090          continue;
2091        }
2092        filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
2093      }
2094      if (filesToReopen.isEmpty()) {
2095        return null;
2096      }
2097      return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow,
2098          includeStartRow, stopRow, includeStopRow, readPt, false);
2099    } finally {
2100      this.lock.readLock().unlock();
2101    }
2102  }
2103
2104  @Override
2105  public String toString() {
2106    return this.getColumnFamilyName();
2107  }
2108
2109  @Override
2110  public int getStorefilesCount() {
2111    return this.storeEngine.getStoreFileManager().getStorefileCount();
2112  }
2113
2114  @Override
2115  public int getCompactedFilesCount() {
2116    return this.storeEngine.getStoreFileManager().getCompactedFilesCount();
2117  }
2118
2119  private LongStream getStoreFileAgeStream() {
2120    return this.storeEngine.getStoreFileManager().getStorefiles().stream().filter(sf -> {
2121      if (sf.getReader() == null) {
2122        LOG.warn("StoreFile {} has a null Reader", sf);
2123        return false;
2124      } else {
2125        return true;
2126      }
2127    }).filter(HStoreFile::isHFile).mapToLong(sf -> sf.getFileInfo().getCreatedTimestamp())
2128        .map(t -> EnvironmentEdgeManager.currentTime() - t);
2129  }
2130
2131  @Override
2132  public OptionalLong getMaxStoreFileAge() {
2133    return getStoreFileAgeStream().max();
2134  }
2135
2136  @Override
2137  public OptionalLong getMinStoreFileAge() {
2138    return getStoreFileAgeStream().min();
2139  }
2140
2141  @Override
2142  public OptionalDouble getAvgStoreFileAge() {
2143    return getStoreFileAgeStream().average();
2144  }
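  // Worked example for the age metrics above (assumed timestamps): with two HFiles created
  // 300000 ms and 60000 ms ago, getStoreFileAgeStream() yields {300000, 60000}, so the max,
  // min and average ages are 300000 ms, 60000 ms and 180000.0 ms respectively.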
2145 2146 @Override 2147 public long getNumReferenceFiles() { 2148 return this.storeEngine.getStoreFileManager().getStorefiles().stream() 2149 .filter(HStoreFile::isReference).count(); 2150 } 2151 2152 @Override 2153 public long getNumHFiles() { 2154 return this.storeEngine.getStoreFileManager().getStorefiles().stream() 2155 .filter(HStoreFile::isHFile).count(); 2156 } 2157 2158 @Override 2159 public long getStoreSizeUncompressed() { 2160 return this.totalUncompressedBytes.get(); 2161 } 2162 2163 @Override 2164 public long getStorefilesSize() { 2165 // Include all StoreFiles 2166 return getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(), sf -> true); 2167 } 2168 2169 @Override 2170 public long getHFilesSize() { 2171 // Include only StoreFiles which are HFiles 2172 return getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(), 2173 HStoreFile::isHFile); 2174 } 2175 2176 private long getTotalUncompressedBytes(List<HStoreFile> files) { 2177 return files.stream() 2178 .mapToLong(file -> getStorefileFieldSize(file, StoreFileReader::getTotalUncompressedBytes)) 2179 .sum(); 2180 } 2181 2182 private long getStorefilesSize(Collection<HStoreFile> files, Predicate<HStoreFile> predicate) { 2183 return files.stream().filter(predicate) 2184 .mapToLong(file -> getStorefileFieldSize(file, StoreFileReader::length)).sum(); 2185 } 2186 2187 private long getStorefileFieldSize(HStoreFile file, ToLongFunction<StoreFileReader> f) { 2188 if (file == null) { 2189 return 0L; 2190 } 2191 StoreFileReader reader = file.getReader(); 2192 if (reader == null) { 2193 return 0L; 2194 } 2195 return f.applyAsLong(reader); 2196 } 2197 2198 private long getStorefilesFieldSize(ToLongFunction<StoreFileReader> f) { 2199 return this.storeEngine.getStoreFileManager().getStorefiles().stream() 2200 .mapToLong(file -> getStorefileFieldSize(file, f)).sum(); 2201 } 2202 2203 @Override 2204 public long getStorefilesRootLevelIndexSize() { 2205 return getStorefilesFieldSize(StoreFileReader::indexSize); 2206 } 2207 2208 @Override 2209 public long getTotalStaticIndexSize() { 2210 return getStorefilesFieldSize(StoreFileReader::getUncompressedDataIndexSize); 2211 } 2212 2213 @Override 2214 public long getTotalStaticBloomSize() { 2215 return getStorefilesFieldSize(StoreFileReader::getTotalBloomSize); 2216 } 2217 2218 @Override 2219 public MemStoreSize getMemStoreSize() { 2220 return this.memstore.size(); 2221 } 2222 2223 @Override 2224 public int getCompactPriority() { 2225 int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority(); 2226 if (priority == PRIORITY_USER) { 2227 LOG.warn("Compaction priority is USER despite there being no user compaction"); 2228 } 2229 return priority; 2230 } 2231 2232 public boolean throttleCompaction(long compactionSize) { 2233 return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize); 2234 } 2235 2236 public HRegion getHRegion() { 2237 return this.region; 2238 } 2239 2240 public RegionCoprocessorHost getCoprocessorHost() { 2241 return this.region.getCoprocessorHost(); 2242 } 2243 2244 @Override 2245 public RegionInfo getRegionInfo() { 2246 return this.fs.getRegionInfo(); 2247 } 2248 2249 @Override 2250 public boolean areWritesEnabled() { 2251 return this.region.areWritesEnabled(); 2252 } 2253 2254 @Override 2255 public long getSmallestReadPoint() { 2256 return this.region.getSmallestReadPoint(); 2257 } 2258 2259 /** 2260 * Adds or replaces the specified KeyValues. 
2261 * <p> 2262 * For each KeyValue specified, if a cell with the same row, family, and qualifier exists in 2263 * MemStore, it will be replaced. Otherwise, it will just be inserted to MemStore. 2264 * <p> 2265 * This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic 2266 * across all of them. 2267 * @param readpoint readpoint below which we can safely remove duplicate KVs 2268 * @throws IOException 2269 */ 2270 public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing) 2271 throws IOException { 2272 this.lock.readLock().lock(); 2273 try { 2274 this.memstore.upsert(cells, readpoint, memstoreSizing); 2275 } finally { 2276 this.lock.readLock().unlock(); 2277 } 2278 } 2279 2280 public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTracker tracker) { 2281 return new StoreFlusherImpl(cacheFlushId, tracker); 2282 } 2283 2284 private final class StoreFlusherImpl implements StoreFlushContext { 2285 2286 private final FlushLifeCycleTracker tracker; 2287 private final long cacheFlushSeqNum; 2288 private MemStoreSnapshot snapshot; 2289 private List<Path> tempFiles; 2290 private List<Path> committedFiles; 2291 private long cacheFlushCount; 2292 private long cacheFlushSize; 2293 private long outputFileSize; 2294 2295 private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) { 2296 this.cacheFlushSeqNum = cacheFlushSeqNum; 2297 this.tracker = tracker; 2298 } 2299 2300 /** 2301 * This is not thread safe. The caller should have a lock on the region or the store. 2302 * If necessary, the lock can be added with the patch provided in HBASE-10087 2303 */ 2304 @Override 2305 public MemStoreSize prepare() { 2306 // passing the current sequence number of the wal - to allow bookkeeping in the memstore 2307 this.snapshot = memstore.snapshot(); 2308 this.cacheFlushCount = snapshot.getCellsCount(); 2309 this.cacheFlushSize = snapshot.getDataSize(); 2310 committedFiles = new ArrayList<>(1); 2311 return snapshot.getMemStoreSize(); 2312 } 2313 2314 @Override 2315 public void flushCache(MonitoredTask status) throws IOException { 2316 RegionServerServices rsService = region.getRegionServerServices(); 2317 ThroughputController throughputController = 2318 rsService == null ? null : rsService.getFlushThroughputController(); 2319 tempFiles = 2320 HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController, tracker); 2321 } 2322 2323 @Override 2324 public boolean commit(MonitoredTask status) throws IOException { 2325 if (CollectionUtils.isEmpty(this.tempFiles)) { 2326 return false; 2327 } 2328 List<HStoreFile> storeFiles = new ArrayList<>(this.tempFiles.size()); 2329 for (Path storeFilePath : tempFiles) { 2330 try { 2331 HStoreFile sf = HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status); 2332 outputFileSize += sf.getReader().length(); 2333 storeFiles.add(sf); 2334 } catch (IOException ex) { 2335 LOG.error("Failed to commit store file {}", storeFilePath, ex); 2336 // Try to delete the files we have committed before. 
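          // If a cleanup delete fails too, we halt the JVM below, because a partially
          // committed flush would leave the store files and the memstore snapshot inconsistent.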
2337 for (HStoreFile sf : storeFiles) { 2338 Path pathToDelete = sf.getPath(); 2339 try { 2340 sf.deleteStoreFile(); 2341 } catch (IOException deleteEx) { 2342 LOG.error(HBaseMarkers.FATAL, "Failed to delete store file we committed, " 2343 + "halting {}", pathToDelete, ex); 2344 Runtime.getRuntime().halt(1); 2345 } 2346 } 2347 throw new IOException("Failed to commit the flush", ex); 2348 } 2349 } 2350 2351 for (HStoreFile sf : storeFiles) { 2352 if (HStore.this.getCoprocessorHost() != null) { 2353 HStore.this.getCoprocessorHost().postFlush(HStore.this, sf, tracker); 2354 } 2355 committedFiles.add(sf.getPath()); 2356 } 2357 2358 HStore.this.flushedCellsCount.addAndGet(cacheFlushCount); 2359 HStore.this.flushedCellsSize.addAndGet(cacheFlushSize); 2360 HStore.this.flushedOutputFileSize.addAndGet(outputFileSize); 2361 2362 // Add new file to store files. Clear snapshot too while we have the Store write lock. 2363 return HStore.this.updateStorefiles(storeFiles, snapshot.getId()); 2364 } 2365 2366 @Override 2367 public long getOutputFileSize() { 2368 return outputFileSize; 2369 } 2370 2371 @Override 2372 public List<Path> getCommittedFiles() { 2373 return committedFiles; 2374 } 2375 2376 /** 2377 * Similar to commit, but called in secondary region replicas for replaying the 2378 * flush cache from primary region. Adds the new files to the store, and drops the 2379 * snapshot depending on dropMemstoreSnapshot argument. 2380 * @param fileNames names of the flushed files 2381 * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot 2382 * @throws IOException 2383 */ 2384 @Override 2385 public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot) 2386 throws IOException { 2387 List<HStoreFile> storeFiles = new ArrayList<>(fileNames.size()); 2388 for (String file : fileNames) { 2389 // open the file as a store file (hfile link, etc) 2390 StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), file); 2391 HStoreFile storeFile = createStoreFileAndReader(storeFileInfo); 2392 storeFiles.add(storeFile); 2393 HStore.this.storeSize.addAndGet(storeFile.getReader().length()); 2394 HStore.this.totalUncompressedBytes 2395 .addAndGet(storeFile.getReader().getTotalUncompressedBytes()); 2396 if (LOG.isInfoEnabled()) { 2397 LOG.info("Region: " + HStore.this.getRegionInfo().getEncodedName() + 2398 " added " + storeFile + ", entries=" + storeFile.getReader().getEntries() + 2399 ", sequenceid=" + storeFile.getReader().getSequenceID() + ", filesize=" 2400 + TraditionalBinaryPrefix.long2String(storeFile.getReader().length(), "", 1)); 2401 } 2402 } 2403 2404 long snapshotId = -1; // -1 means do not drop 2405 if (dropMemstoreSnapshot && snapshot != null) { 2406 snapshotId = snapshot.getId(); 2407 snapshot.close(); 2408 } 2409 HStore.this.updateStorefiles(storeFiles, snapshotId); 2410 } 2411 2412 /** 2413 * Abort the snapshot preparation. Drops the snapshot if any. 2414 * @throws IOException 2415 */ 2416 @Override 2417 public void abort() throws IOException { 2418 if (snapshot != null) { 2419 //We need to close the snapshot when aborting, otherwise, the segment scanner 2420 //won't be closed. 
If we are using MSLAB, the chunk referenced by those scanners 2421 //can't be released, thus memory leak 2422 snapshot.close(); 2423 HStore.this.updateStorefiles(Collections.emptyList(), snapshot.getId()); 2424 } 2425 } 2426 } 2427 2428 @Override 2429 public boolean needsCompaction() { 2430 List<HStoreFile> filesCompactingClone = null; 2431 synchronized (filesCompacting) { 2432 filesCompactingClone = Lists.newArrayList(filesCompacting); 2433 } 2434 return this.storeEngine.needsCompaction(filesCompactingClone); 2435 } 2436 2437 /** 2438 * Used for tests. 2439 * @return cache configuration for this Store. 2440 */ 2441 @VisibleForTesting 2442 public CacheConfig getCacheConfig() { 2443 return this.cacheConf; 2444 } 2445 2446 public static final long FIXED_OVERHEAD = 2447 ClassSize.align(ClassSize.OBJECT + (27 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) 2448 + (6 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN)); 2449 2450 public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD 2451 + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK 2452 + ClassSize.CONCURRENT_SKIPLISTMAP 2453 + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT 2454 + ScanInfo.FIXED_OVERHEAD); 2455 2456 @Override 2457 public long heapSize() { 2458 MemStoreSize memstoreSize = this.memstore.size(); 2459 return DEEP_OVERHEAD + memstoreSize.getHeapSize(); 2460 } 2461 2462 @Override 2463 public CellComparator getComparator() { 2464 return comparator; 2465 } 2466 2467 public ScanInfo getScanInfo() { 2468 return scanInfo; 2469 } 2470 2471 /** 2472 * Set scan info, used by test 2473 * @param scanInfo new scan info to use for test 2474 */ 2475 void setScanInfo(ScanInfo scanInfo) { 2476 this.scanInfo = scanInfo; 2477 } 2478 2479 @Override 2480 public boolean hasTooManyStoreFiles() { 2481 return getStorefilesCount() > this.blockingFileCount; 2482 } 2483 2484 @Override 2485 public long getFlushedCellsCount() { 2486 return flushedCellsCount.get(); 2487 } 2488 2489 @Override 2490 public long getFlushedCellsSize() { 2491 return flushedCellsSize.get(); 2492 } 2493 2494 @Override 2495 public long getFlushedOutputFileSize() { 2496 return flushedOutputFileSize.get(); 2497 } 2498 2499 @Override 2500 public long getCompactedCellsCount() { 2501 return compactedCellsCount.get(); 2502 } 2503 2504 @Override 2505 public long getCompactedCellsSize() { 2506 return compactedCellsSize.get(); 2507 } 2508 2509 @Override 2510 public long getMajorCompactedCellsCount() { 2511 return majorCompactedCellsCount.get(); 2512 } 2513 2514 @Override 2515 public long getMajorCompactedCellsSize() { 2516 return majorCompactedCellsSize.get(); 2517 } 2518 2519 /** 2520 * Returns the StoreEngine that is backing this concrete implementation of Store. 2521 * @return Returns the {@link StoreEngine} object used internally inside this HStore object. 
2522 */ 2523 @VisibleForTesting 2524 public StoreEngine<?, ?, ?, ?> getStoreEngine() { 2525 return this.storeEngine; 2526 } 2527 2528 protected OffPeakHours getOffPeakHours() { 2529 return this.offPeakHours; 2530 } 2531 2532 /** 2533 * {@inheritDoc} 2534 */ 2535 @Override 2536 public void onConfigurationChange(Configuration conf) { 2537 this.conf = new CompoundConfiguration() 2538 .add(conf) 2539 .addBytesMap(family.getValues()); 2540 this.storeEngine.compactionPolicy.setConf(conf); 2541 this.offPeakHours = OffPeakHours.getInstance(conf); 2542 } 2543 2544 /** 2545 * {@inheritDoc} 2546 */ 2547 @Override 2548 public void registerChildren(ConfigurationManager manager) { 2549 // No children to register 2550 } 2551 2552 /** 2553 * {@inheritDoc} 2554 */ 2555 @Override 2556 public void deregisterChildren(ConfigurationManager manager) { 2557 // No children to deregister 2558 } 2559 2560 @Override 2561 public double getCompactionPressure() { 2562 return storeEngine.getStoreFileManager().getCompactionPressure(); 2563 } 2564 2565 @Override 2566 public boolean isPrimaryReplicaStore() { 2567 return getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID; 2568 } 2569 2570 /** 2571 * Sets the store up for a region level snapshot operation. 2572 * @see #postSnapshotOperation() 2573 */ 2574 public void preSnapshotOperation() { 2575 archiveLock.lock(); 2576 } 2577 2578 /** 2579 * Perform tasks needed after the completion of snapshot operation. 2580 * @see #preSnapshotOperation() 2581 */ 2582 public void postSnapshotOperation() { 2583 archiveLock.unlock(); 2584 } 2585 2586 /** 2587 * Closes and archives the compacted files under this store 2588 */ 2589 public synchronized void closeAndArchiveCompactedFiles() throws IOException { 2590 closeAndArchiveCompactedFiles(false); 2591 } 2592 2593 @VisibleForTesting 2594 public synchronized void closeAndArchiveCompactedFiles(boolean storeClosing) throws IOException { 2595 // ensure other threads do not attempt to archive the same files on close() 2596 archiveLock.lock(); 2597 try { 2598 lock.readLock().lock(); 2599 Collection<HStoreFile> copyCompactedfiles = null; 2600 try { 2601 Collection<HStoreFile> compactedfiles = 2602 this.getStoreEngine().getStoreFileManager().getCompactedfiles(); 2603 if (CollectionUtils.isNotEmpty(compactedfiles)) { 2604 // Do a copy under read lock 2605 copyCompactedfiles = new ArrayList<>(compactedfiles); 2606 } else { 2607 LOG.trace("No compacted files to archive"); 2608 } 2609 } finally { 2610 lock.readLock().unlock(); 2611 } 2612 if (CollectionUtils.isNotEmpty(copyCompactedfiles)) { 2613 removeCompactedfiles(copyCompactedfiles, storeClosing); 2614 } 2615 } finally { 2616 archiveLock.unlock(); 2617 } 2618 } 2619 2620 /** 2621 * Archives and removes the compacted files 2622 * @param compactedfiles The compacted files in this store that are not active in reads 2623 */ 2624 private void removeCompactedfiles(Collection<HStoreFile> compactedfiles, boolean storeClosing) 2625 throws IOException { 2626 final List<HStoreFile> filesToRemove = new ArrayList<>(compactedfiles.size()); 2627 final List<Long> storeFileSizes = new ArrayList<>(compactedfiles.size()); 2628 for (final HStoreFile file : compactedfiles) { 2629 synchronized (file) { 2630 try { 2631 StoreFileReader r = file.getReader(); 2632 if (r == null) { 2633 LOG.debug("The file {} was closed but still not archived", file); 2634 // HACK: Temporarily re-open the reader so we can get the size of the file. 
Ideally,
2635          // we should know the size of an HStoreFile without having to ask the HStoreFileReader
2636          // for that.
2637          long length = getStoreFileSize(file);
2638          filesToRemove.add(file);
2639          storeFileSizes.add(length);
2640          continue;
2641        }
2642
2643        // Compacted files in the list should always be marked compacted away. If the two
2644        // flags ever contradict while the store is closing, we fail fast below rather than
2645        // pick one signal and ignore the other, to guarantee data consistency.
2646        if (storeClosing && !file.isCompactedAway()) {
2647          String msg =
2648              "Region closing but StoreFile is in compacted list but not compacted away: " +
2649              file.getPath();
2650          throw new IllegalStateException(msg);
2651        }
2652
2653        // If the store is closing we ignore any references, to keep things consistent
2654        // and remove compacted storefiles from the region directory
2655        if (file.isCompactedAway() && (!file.isReferencedInReads() || storeClosing)) {
2656          if (storeClosing && file.isReferencedInReads()) {
2657            LOG.warn("Region closing but StoreFile still has references: file={}, refCount={}",
2658                file.getPath(), r.getRefCount());
2659          }
2660          // Even if deleting fails we need not bother, as any new scanners won't be
2661          // able to use the compacted file: the status is already compactedAway
2662          LOG.trace("Closing and archiving the file {}", file);
2663          // Copy the file size before closing the reader
2664          final long length = r.length();
2665          r.close(true);
2666          file.closeStreamReaders(true);
2667          // Just close and collect the file for removal
2668          filesToRemove.add(file);
2669          // Only add the length if we successfully added the file to `filesToRemove`
2670          storeFileSizes.add(length);
2671        } else {
2672          LOG.info("Can't archive compacted file " + file.getPath()
2673              + " because of either isCompactedAway=" + file.isCompactedAway()
2674              + " or the file having a reference, isReferencedInReads=" + file.isReferencedInReads()
2675              + ", refCount=" + r.getRefCount() + ", skipping for now.");
2676        }
2677      } catch (Exception e) {
2678        String msg = "Exception while trying to close the compacted store file " +
2679            file.getPath();
2680        if (storeClosing) {
2681          msg = "Store is closing. " + msg;
2682        }
2683        LOG.error(msg, e);
2684        // if we get an exception let the caller know so it can abort the server
2685        if (storeClosing) {
2686          throw new IOException(msg, e);
2687        }
2688      }
2689      }
2690    }
2691    if (this.isPrimaryReplicaStore()) {
2692      // Only the primary region is allowed to move the file to archive.
2693      // The secondary region does not move the files to archive. Any active reads from
2694      // the secondary region will still work because the file as such has active readers on it.
2695      if (!filesToRemove.isEmpty()) {
2696        LOG.debug("Moving the files {} to archive", filesToRemove);
2697        // Only if the archival succeeds should the files be removed from the store file manager
2698        try {
2699          this.fs.removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(), filesToRemove);
2700        } catch (FailedArchiveException fae) {
2701          // Even if archiving some files failed, we still need to clear out any of the
2702          // files which were successfully archived. Otherwise we will receive a
2703          // FileNotFoundException when we attempt to re-archive them in the next go around.
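          // Walk the file list and the size list in lockstep so every surviving file keeps
          // its paired size entry for the quota report in reportArchivedFilesForQuota().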
2704 Collection<Path> failedFiles = fae.getFailedFiles(); 2705 Iterator<HStoreFile> iter = filesToRemove.iterator(); 2706 Iterator<Long> sizeIter = storeFileSizes.iterator(); 2707 while (iter.hasNext()) { 2708 sizeIter.next(); 2709 if (failedFiles.contains(iter.next().getPath())) { 2710 iter.remove(); 2711 sizeIter.remove(); 2712 } 2713 } 2714 if (!filesToRemove.isEmpty()) { 2715 clearCompactedfiles(filesToRemove); 2716 } 2717 throw fae; 2718 } 2719 } 2720 } 2721 if (!filesToRemove.isEmpty()) { 2722 // Clear the compactedfiles from the store file manager 2723 clearCompactedfiles(filesToRemove); 2724 // Try to send report of this archival to the Master for updating quota usage faster 2725 reportArchivedFilesForQuota(filesToRemove, storeFileSizes); 2726 } 2727 } 2728 2729 /** 2730 * Computes the length of a store file without succumbing to any errors along the way. If an 2731 * error is encountered, the implementation returns {@code 0} instead of the actual size. 2732 * 2733 * @param file The file to compute the size of. 2734 * @return The size in bytes of the provided {@code file}. 2735 */ 2736 long getStoreFileSize(HStoreFile file) { 2737 long length = 0; 2738 try { 2739 file.initReader(); 2740 length = file.getReader().length(); 2741 } catch (IOException e) { 2742 LOG.trace("Failed to open reader when trying to compute store file size, ignoring", e); 2743 } finally { 2744 try { 2745 file.closeStoreFile( 2746 file.getCacheConf() != null ? file.getCacheConf().shouldEvictOnClose() : true); 2747 } catch (IOException e) { 2748 LOG.trace("Failed to close reader after computing store file size, ignoring", e); 2749 } 2750 } 2751 return length; 2752 } 2753 2754 public Long preFlushSeqIDEstimation() { 2755 return memstore.preFlushSeqIDEstimation(); 2756 } 2757 2758 @Override 2759 public boolean isSloppyMemStore() { 2760 return this.memstore.isSloppy(); 2761 } 2762 2763 private void clearCompactedfiles(List<HStoreFile> filesToRemove) throws IOException { 2764 LOG.trace("Clearing the compacted file {} from this store", filesToRemove); 2765 try { 2766 lock.writeLock().lock(); 2767 this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToRemove); 2768 } finally { 2769 lock.writeLock().unlock(); 2770 } 2771 } 2772 2773 public int getCurrentParallelPutCount() { 2774 return currentParallelPutCount.get(); 2775 } 2776 2777 void reportArchivedFilesForQuota(List<? extends StoreFile> archivedFiles, List<Long> fileSizes) { 2778 // Sanity check from the caller 2779 if (archivedFiles.size() != fileSizes.size()) { 2780 throw new RuntimeException("Coding error: should never see lists of varying size"); 2781 } 2782 RegionServerServices rss = this.region.getRegionServerServices(); 2783 if (rss == null) { 2784 return; 2785 } 2786 List<Entry<String,Long>> filesWithSizes = new ArrayList<>(archivedFiles.size()); 2787 Iterator<Long> fileSizeIter = fileSizes.iterator(); 2788 for (StoreFile storeFile : archivedFiles) { 2789 final long fileSize = fileSizeIter.next(); 2790 if (storeFile.isHFile() && fileSize != 0) { 2791 filesWithSizes.add(Maps.immutableEntry(storeFile.getPath().getName(), fileSize)); 2792 } 2793 } 2794 if (LOG.isTraceEnabled()) { 2795 LOG.trace("Files archived: " + archivedFiles + ", reporting the following to the Master: " 2796 + filesWithSizes); 2797 } 2798 boolean success = rss.reportFileArchivalForQuotas(getTableName(), filesWithSizes); 2799 if (!success) { 2800 LOG.warn("Failed to report archival of files: " + filesWithSizes); 2801 } 2802 } 2803}