/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.FailedArchiveException;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;

/**
 * A Store holds a column family in a Region. It's a memstore and a set of zero
 * or more StoreFiles, which stretch backwards over time.
 *
 * <p>There's no reason to consider append-logging at this level; all logging
 * and locking is handled at the HRegion level. Store just provides
 * services to manage sets of StoreFiles. One of the most important of those
 * services is compaction, where files are aggregated once they pass
 * a configurable threshold.
 *
 * <p>Locking and transactions are handled at a higher level. This API should
 * not be called directly but by an HRegion manager.
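 *
 * <p>Store files are tracked by a {@link StoreEngine}, which supplies the store file manager
 * and store flusher used throughout this class.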
 */
@InterfaceAudience.Private
public class HStore implements Store, HeapSize, StoreConfigInformation,
    PropagatingConfigurationObserver {
  public static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
  public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
      "hbase.server.compactchecker.interval.multiplier";
  public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
  public static final String BLOCK_STORAGE_POLICY_KEY = "hbase.hstore.block.storage.policy";
  // keep in accordance with HDFS default storage policy
  public static final String DEFAULT_BLOCK_STORAGE_POLICY = "HOT";
  public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
  public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 16;

  private static final Logger LOG = LoggerFactory.getLogger(HStore.class);

  protected final MemStore memstore;
  // This stores directory in the filesystem.
  protected final HRegion region;
  private final ColumnFamilyDescriptor family;
  private final HRegionFileSystem fs;
  protected Configuration conf;
  protected CacheConfig cacheConf;
  private long lastCompactSize = 0;
  volatile boolean forceMajor = false;
  /* how many bytes to write between status checks */
  static int closeCheckInterval = 0;
  private AtomicLong storeSize = new AtomicLong();
  private AtomicLong totalUncompressedBytes = new AtomicLong();

  /**
   * RWLock for store operations.
   * Locked in shared mode when the list of component stores is looked at:
   *   - all reads/writes to table data
   *   - checking for split
   * Locked in exclusive mode when the list of component stores is modified:
   *   - closing
   *   - completing a compaction
   */
  final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  /**
   * Lock specific to archiving compacted store files. This avoids races around
   * the combination of retrieving the list of compacted files and moving them to
   * the archive directory. Since this is usually a background process (other than
   * on close), we don't want to handle this with the store write lock, which would
   * block readers and degrade performance.
   *
   * Locked by:
   *   - CompactedHFilesDispatchHandler via closeAndArchiveCompactedFiles()
   *   - close()
   */
  final ReentrantLock archiveLock = new ReentrantLock();

  private final boolean verifyBulkLoads;

  private ScanInfo scanInfo;

  // All access must be synchronized.
  // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it.
  private final List<HStoreFile> filesCompacting = Lists.newArrayList();

  // All access must be synchronized.
  private final Set<ChangedReadersObserver> changedReaderObservers =
      Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());

  protected final int blocksize;
  private HFileDataBlockEncoder dataBlockEncoder;

  /** Checksum configuration */
  protected ChecksumType checksumType;
  protected int bytesPerChecksum;

  // Comparing KeyValues
  protected final CellComparator comparator;

  final StoreEngine<?, ?, ?, ?> storeEngine;

  private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
  private volatile OffPeakHours offPeakHours;

  private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
  private int flushRetriesNumber;
  private int pauseTime;

  private long blockingFileCount;
  private int compactionCheckMultiplier;
  protected Encryption.Context cryptoContext = Encryption.Context.NONE;

  private AtomicLong flushedCellsCount = new AtomicLong();
  private AtomicLong compactedCellsCount = new AtomicLong();
  private AtomicLong majorCompactedCellsCount = new AtomicLong();
  private AtomicLong flushedCellsSize = new AtomicLong();
  private AtomicLong flushedOutputFileSize = new AtomicLong();
  private AtomicLong compactedCellsSize = new AtomicLong();
  private AtomicLong majorCompactedCellsSize = new AtomicLong();

  /**
   * Constructor
   * @param region the region this store belongs to
   * @param family ColumnFamilyDescriptor for this column family
   * @param confParam configuration object
   * @throws IOException
   */
  protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
      final Configuration confParam) throws IOException {

    this.fs = region.getRegionFileSystem();

    // Assemble the store's home directory and ensure it exists.
    fs.createStoreDir(family.getNameAsString());
    this.region = region;
    this.family = family;
    // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
    // CompoundConfiguration will look for keys in reverse order of addition, so we'd
    // add global config first, then table and cf overrides, then cf metadata.
    this.conf = new CompoundConfiguration()
      .add(confParam)
      .addBytesMap(region.getTableDescriptor().getValues())
      .addStringMap(family.getConfiguration())
      .addBytesMap(family.getValues());
    this.blocksize = family.getBlocksize();

    // set block storage policy for store directory
    String policyName = family.getStoragePolicy();
    if (null == policyName) {
      policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY, DEFAULT_BLOCK_STORAGE_POLICY);
    }
    this.fs.setStoragePolicy(family.getNameAsString(), policyName.trim());

    this.dataBlockEncoder = new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());

    this.comparator = region.getCellComparator();
    // used by ScanQueryMatcher
    long timeToPurgeDeletes =
        Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
    LOG.trace("Time to purge deletes set to {}ms in store {}", timeToPurgeDeletes, this);
    // Get TTL
    long ttl = determineTTLFromFamily(family);
    // Why not just pass a HColumnDescriptor in here altogether? Even if we have
    // to clone it?
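    // ScanInfo captures the family's TTL, version limits and delete-purge delay for use by
    // scanners and the query matcher.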
    scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.comparator);
    this.memstore = getMemstore();
    this.offPeakHours = OffPeakHours.getInstance(conf);

    // Setting up cache configuration for this family
    createCacheConf(family);

    this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);

    this.blockingFileCount =
        conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
    this.compactionCheckMultiplier = conf.getInt(
        COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
    if (this.compactionCheckMultiplier <= 0) {
      LOG.error("Compaction check period multiplier must be positive, setting default: {}",
          DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
      this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
    }

    if (HStore.closeCheckInterval == 0) {
      HStore.closeCheckInterval = conf.getInt(
          "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
    }

    this.storeEngine = createStoreEngine(this, this.conf, this.comparator);
    List<HStoreFile> hStoreFiles = loadStoreFiles();
    // Move the storeSize calculation out of loadStoreFiles() method, because the secondary read
    // replica's refreshStoreFiles() will also use loadStoreFiles() to refresh its store files and
    // update the storeSize in completeCompaction(..) at the end (just like compaction does), so
    // there is no need to calculate the storeSize twice.
    this.storeSize.addAndGet(getStorefilesSize(hStoreFiles, sf -> true));
    this.totalUncompressedBytes.addAndGet(getTotalUmcompressedBytes(hStoreFiles));
    this.storeEngine.getStoreFileManager().loadFiles(hStoreFiles);

    // Initialize checksum type from name. The names are CRC32, CRC32C, etc.
    this.checksumType = getChecksumType(conf);
    // Initialize bytes per checksum
    this.bytesPerChecksum = getBytesPerChecksum(conf);
    flushRetriesNumber = conf.getInt(
        "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
    pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
    if (flushRetriesNumber <= 0) {
      throw new IllegalArgumentException(
          "hbase.hstore.flush.retries.number must be > 0, not " + flushRetriesNumber);
    }
    cryptoContext = EncryptionUtil.createEncryptionContext(conf, family);

    LOG.info("Store={}, memstore type={}, storagePolicy={}, verifyBulkLoads={}, "
            + "encoding={}, compression={}",
        getColumnFamilyName(), this.memstore.getClass().getSimpleName(), policyName,
        this.verifyBulkLoads, family.getDataBlockEncoding(), family.getCompressionType());
  }

  /**
   * @return MemStore Instance to use in this store.
   */
  private MemStore getMemstore() {
    MemStore ms = null;
    // Check if in-memory-compaction is configured. Note MemoryCompactionPolicy is an enum!
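    // System tables default to no in-memory compaction unless explicitly configured; other
    // tables fall back to the column family setting, then the global default.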
    MemoryCompactionPolicy inMemoryCompaction = null;
    if (this.getTableName().isSystemTable()) {
      inMemoryCompaction = MemoryCompactionPolicy.valueOf(
          conf.get("hbase.systemtables.compacting.memstore.type", "NONE"));
    } else {
      inMemoryCompaction = family.getInMemoryCompaction();
    }
    if (inMemoryCompaction == null) {
      inMemoryCompaction =
          MemoryCompactionPolicy.valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
              CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT));
    }
    switch (inMemoryCompaction) {
      case NONE:
        ms = ReflectionUtils.newInstance(DefaultMemStore.class,
            new Object[] { conf, this.comparator,
                this.getHRegion().getRegionServicesForStores() });
        break;
      default:
        Class<? extends CompactingMemStore> clz = conf.getClass(MEMSTORE_CLASS_NAME,
            CompactingMemStore.class, CompactingMemStore.class);
        ms = ReflectionUtils.newInstance(clz, new Object[] { conf, this.comparator, this,
            this.getHRegion().getRegionServicesForStores(), inMemoryCompaction });
    }
    return ms;
  }

  /**
   * Creates the cache config.
   * @param family The current column family.
   */
  protected void createCacheConf(final ColumnFamilyDescriptor family) {
    this.cacheConf = new CacheConfig(conf, family);
  }

  /**
   * Creates the store engine configured for the given Store.
   * @param store The store. An unfortunate dependency needed due to it
   *          being passed to coprocessors via the compactor.
   * @param conf Store configuration.
   * @param kvComparator KVComparator for storeFileManager.
   * @return StoreEngine to use.
   */
  protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
      CellComparator kvComparator) throws IOException {
    return StoreEngine.create(store, conf, comparator);
  }

  /**
   * @param family the column family descriptor to read the TTL from
   * @return TTL in milliseconds of the specified family (the descriptor stores it in seconds)
   */
  public static long determineTTLFromFamily(final ColumnFamilyDescriptor family) {
    // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.
    long ttl = family.getTimeToLive();
    if (ttl == HConstants.FOREVER) {
      // Default is unlimited ttl.
      ttl = Long.MAX_VALUE;
    } else if (ttl == -1) {
      ttl = Long.MAX_VALUE;
    } else {
      // Second -> ms adjust for user data
      ttl *= 1000;
    }
    return ttl;
  }

  @Override
  public String getColumnFamilyName() {
    return this.family.getNameAsString();
  }

  @Override
  public TableName getTableName() {
    return this.getRegionInfo().getTable();
  }

  @Override
  public FileSystem getFileSystem() {
    return this.fs.getFileSystem();
  }

  public HRegionFileSystem getRegionFileSystem() {
    return this.fs;
  }

  /* Implementation of StoreConfigInformation */
  @Override
  public long getStoreFileTtl() {
    // TTL only applies if there's no MIN_VERSIONs setting on the column.
    return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
  }

  @Override
  public long getMemStoreFlushSize() {
    // TODO: Why is this in here? The flushsize of the region rather than the store? St.Ack
    return this.region.memstoreFlushSize;
  }

  @Override
  public MemStoreSize getFlushableSize() {
    return this.memstore.getFlushableSize();
  }

  @Override
  public MemStoreSize getSnapshotSize() {
    return this.memstore.getSnapshotSize();
  }

  @Override
  public long getCompactionCheckMultiplier() {
    return this.compactionCheckMultiplier;
  }

  @Override
  public long getBlockingFileCount() {
    return blockingFileCount;
  }
  /* End implementation of StoreConfigInformation */

  /**
   * Returns the configured bytesPerChecksum value.
   * @param conf The configuration
   * @return The bytesPerChecksum that is set in the configuration
   */
  public static int getBytesPerChecksum(Configuration conf) {
    return conf.getInt(HConstants.BYTES_PER_CHECKSUM,
        HFile.DEFAULT_BYTES_PER_CHECKSUM);
  }

  /**
   * Returns the configured checksum algorithm.
   * @param conf The configuration
   * @return The checksum algorithm that is set in the configuration
   */
  public static ChecksumType getChecksumType(Configuration conf) {
    String checksumName = conf.get(HConstants.CHECKSUM_TYPE_NAME);
    if (checksumName == null) {
      return ChecksumType.getDefaultChecksumType();
    } else {
      return ChecksumType.nameToType(checksumName);
    }
  }

  /**
   * @return how many bytes to write between status checks
   */
  public static int getCloseCheckInterval() {
    return closeCheckInterval;
  }

  @Override
  public ColumnFamilyDescriptor getColumnFamilyDescriptor() {
    return this.family;
  }

  @Override
  public OptionalLong getMaxSequenceId() {
    return StoreUtils.getMaxSequenceIdInList(this.getStorefiles());
  }

  @Override
  public OptionalLong getMaxMemStoreTS() {
    return StoreUtils.getMaxMemStoreTSInList(this.getStorefiles());
  }

  /**
   * @param tabledir {@link Path} to where the table is being stored
   * @param hri {@link RegionInfo} for the region.
   * @param family {@link ColumnFamilyDescriptor} describing the column family
   * @return Path to family/Store home directory.
   */
  @Deprecated
  public static Path getStoreHomedir(final Path tabledir,
      final RegionInfo hri, final byte[] family) {
    return getStoreHomedir(tabledir, hri.getEncodedName(), family);
  }

  /**
   * @param tabledir {@link Path} to where the table is being stored
   * @param encodedName Encoded region name.
   * @param family {@link ColumnFamilyDescriptor} describing the column family
   * @return Path to family/Store home directory.
   */
  @Deprecated
  public static Path getStoreHomedir(final Path tabledir,
      final String encodedName, final byte[] family) {
    return new Path(tabledir, new Path(encodedName, Bytes.toString(family)));
  }

  /**
   * @return the data block encoder
   */
  public HFileDataBlockEncoder getDataBlockEncoder() {
    return dataBlockEncoder;
  }

  /**
   * Should be used only in tests.
   * @param blockEncoder the block delta encoder to use
   */
  void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
    this.dataBlockEncoder = blockEncoder;
  }

  /**
   * Creates an unsorted list of StoreFile loaded in parallel
   * from the given directory.
   * @throws IOException
   */
  private List<HStoreFile> loadStoreFiles() throws IOException {
    Collection<StoreFileInfo> files = fs.getStoreFiles(getColumnFamilyName());
    return openStoreFiles(files);
  }

  private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files) throws IOException {
    if (CollectionUtils.isEmpty(files)) {
      return Collections.emptyList();
    }
    // initialize the thread pool for opening store files in parallel.
    ThreadPoolExecutor storeFileOpenerThreadPool =
        this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
            this.getColumnFamilyName());
    CompletionService<HStoreFile> completionService =
        new ExecutorCompletionService<>(storeFileOpenerThreadPool);

    int totalValidStoreFile = 0;
    for (StoreFileInfo storeFileInfo : files) {
      // open each store file in parallel
      completionService.submit(() -> this.createStoreFileAndReader(storeFileInfo));
      totalValidStoreFile++;
    }

    ArrayList<HStoreFile> results = new ArrayList<>(files.size());
    IOException ioe = null;
    try {
      for (int i = 0; i < totalValidStoreFile; i++) {
        try {
          HStoreFile storeFile = completionService.take().get();
          if (storeFile != null) {
            LOG.debug("loaded {}", storeFile);
            results.add(storeFile);
          }
        } catch (InterruptedException e) {
          if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
        } catch (ExecutionException e) {
          if (ioe == null) ioe = new IOException(e.getCause());
        }
      }
    } finally {
      storeFileOpenerThreadPool.shutdownNow();
    }
    if (ioe != null) {
      // close StoreFile readers
      boolean evictOnClose =
          cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
      for (HStoreFile file : results) {
        try {
          if (file != null) {
            file.closeStoreFile(evictOnClose);
          }
        } catch (IOException e) {
          LOG.warn("Could not close store file", e);
        }
      }
      throw ioe;
    }

    return results;
  }

  @Override
  public void refreshStoreFiles() throws IOException {
    Collection<StoreFileInfo> newFiles = fs.getStoreFiles(getColumnFamilyName());
    refreshStoreFilesInternal(newFiles);
  }

  /**
   * Replaces the store files that the store has with the given files. Mainly used by secondary
   * region replicas to keep up to date with the primary region files.
   * @throws IOException
   */
  public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
    List<StoreFileInfo> storeFiles = new ArrayList<>(newFiles.size());
    for (String file : newFiles) {
      storeFiles.add(fs.getStoreFileInfo(getColumnFamilyName(), file));
    }
    refreshStoreFilesInternal(storeFiles);
  }

  /**
   * Checks the underlying store files, and opens the files that have not
   * been opened, and removes the store file readers for store files no longer
   * available. Mainly used by secondary region replicas to keep up to date with
   * the primary region files.
   * @throws IOException
   */
  private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
    StoreFileManager sfm = storeEngine.getStoreFileManager();
    Collection<HStoreFile> currentFiles = sfm.getStorefiles();
    Collection<HStoreFile> compactedFiles = sfm.getCompactedfiles();
    if (currentFiles == null) currentFiles = Collections.emptySet();
    if (newFiles == null) newFiles = Collections.emptySet();
    if (compactedFiles == null) compactedFiles = Collections.emptySet();

    HashMap<StoreFileInfo, HStoreFile> currentFilesSet = new HashMap<>(currentFiles.size());
    for (HStoreFile sf : currentFiles) {
      currentFilesSet.put(sf.getFileInfo(), sf);
    }
    HashMap<StoreFileInfo, HStoreFile> compactedFilesSet = new HashMap<>(compactedFiles.size());
    for (HStoreFile sf : compactedFiles) {
      compactedFilesSet.put(sf.getFileInfo(), sf);
    }

    Set<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);
    // Exclude the files that have already been compacted
    newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet());
    Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
    Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);

    if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
      return;
    }

    LOG.info("Refreshing store files for region " + this.getRegionInfo().getRegionNameAsString()
        + " files to add: " + toBeAddedFiles + " files to remove: " + toBeRemovedFiles);

    Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size());
    for (StoreFileInfo sfi : toBeRemovedFiles) {
      toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
    }

    // try to open the files
    List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles);

    // propagate the file changes to the underlying store file manager
    replaceStoreFiles(toBeRemovedStoreFiles, openedFiles); // won't throw an exception

    // Advance the memstore read point to be at least the new store files' seqIds so that
    // readers might pick it up. This assumes that the store is not getting any writes (otherwise
    // in-flight transactions might be made visible)
    if (!toBeAddedFiles.isEmpty()) {
      // we must have the max sequence id here as we do have several store files
      region.getMVCC().advanceTo(this.getMaxSequenceId().getAsLong());
    }

    completeCompaction(toBeRemovedStoreFiles);
  }

  @VisibleForTesting
  protected HStoreFile createStoreFileAndReader(final Path p) throws IOException {
    StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p);
    return createStoreFileAndReader(info);
  }

  private HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
    info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
    HStoreFile storeFile = new HStoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
        this.family.getBloomFilterType(), isPrimaryReplicaStore());
    storeFile.initReader();
    return storeFile;
  }

  /**
   * This message intends to inform the MemStore that the next coming updates
   * are going to be part of the replaying edits from the WAL.
   */
  public void startReplayingFromWAL() {
    this.memstore.startReplayingFromWAL();
  }

  /**
   * This message intends to inform the MemStore that the replaying edits from the WAL
   * are done.
   */
  public void stopReplayingFromWAL() {
    this.memstore.stopReplayingFromWAL();
  }

  /**
   * Adds a value to the memstore.
   */
  public void add(final Cell cell, MemStoreSizing memstoreSizing) {
    lock.readLock().lock();
    try {
      this.memstore.add(cell, memstoreSizing);
    } finally {
      lock.readLock().unlock();
    }
  }

  /**
   * Adds the specified cells to the memstore.
   */
  public void add(final Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
    lock.readLock().lock();
    try {
      memstore.add(cells, memstoreSizing);
    } finally {
      lock.readLock().unlock();
    }
  }

  @Override
  public long timeOfOldestEdit() {
    return memstore.timeOfOldestEdit();
  }

  /**
   * @return All store files.
   */
  @Override
  public Collection<HStoreFile> getStorefiles() {
    return this.storeEngine.getStoreFileManager().getStorefiles();
  }

  @Override
  public Collection<HStoreFile> getCompactedFiles() {
    return this.storeEngine.getStoreFileManager().getCompactedfiles();
  }

  /**
   * This throws a WrongRegionException if the HFile does not fit in this region, or an
   * InvalidHFileException if the HFile is not valid.
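   * When hbase.hstore.bulkload.verify is enabled, it also checks row ordering and family
   * consistency across the whole file.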
   */
  public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
    HFile.Reader reader = null;
    try {
      LOG.info("Validating hfile at " + srcPath + " for inclusion in "
          + "store " + this + " region " + this.getRegionInfo().getRegionNameAsString());
      FileSystem srcFs = srcPath.getFileSystem(conf);
      srcFs.access(srcPath, FsAction.READ_WRITE);
      reader = HFile.createReader(srcFs, srcPath, cacheConf, isPrimaryReplicaStore(), conf);
      reader.loadFileInfo();

      Optional<byte[]> firstKey = reader.getFirstRowKey();
      Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
      Optional<Cell> lk = reader.getLastKey();
      Preconditions.checkState(lk.isPresent(), "Last key can not be null");
      byte[] lastKey = CellUtil.cloneRow(lk.get());

      if (LOG.isDebugEnabled()) {
        LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey.get()) +
            " last=" + Bytes.toStringBinary(lastKey));
        LOG.debug("Region bounds: first=" +
            Bytes.toStringBinary(getRegionInfo().getStartKey()) +
            " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
      }

      if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) {
        throw new WrongRegionException(
            "Bulk load file " + srcPath.toString() + " does not fit inside region "
            + this.getRegionInfo().getRegionNameAsString());
      }

      if (reader.length() > conf.getLong(HConstants.HREGION_MAX_FILESIZE,
          HConstants.DEFAULT_MAX_FILE_SIZE)) {
        LOG.warn("Trying to bulk load hfile " + srcPath + " with size: " +
            reader.length() + " bytes can be problematic as it may lead to oversplitting.");
      }

      if (verifyBulkLoads) {
        long verificationStartTime = EnvironmentEdgeManager.currentTime();
        LOG.info("Full verification started for bulk load hfile: {}", srcPath);
        Cell prevCell = null;
        HFileScanner scanner = reader.getScanner(false, false, false);
        scanner.seekTo();
        do {
          Cell cell = scanner.getCell();
          if (prevCell != null) {
            if (comparator.compareRows(prevCell, cell) > 0) {
              throw new InvalidHFileException("Previous row is greater than"
                  + " current row: path=" + srcPath + " previous="
                  + CellUtil.getCellKeyAsString(prevCell) + " current="
                  + CellUtil.getCellKeyAsString(cell));
            }
            if (CellComparator.getInstance().compareFamilies(prevCell, cell) != 0) {
              throw new InvalidHFileException("Previous key had different"
                  + " family compared to current key: path=" + srcPath
                  + " previous="
                  + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(),
                      prevCell.getFamilyLength())
                  + " current="
                  + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(),
                      cell.getFamilyLength()));
            }
          }
          prevCell = cell;
        } while (scanner.next());
        LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString()
            + " took " + (EnvironmentEdgeManager.currentTime() - verificationStartTime)
            + " ms");
      }
    } finally {
      if (reader != null) reader.close();
    }
  }

  /**
   * This method should only be called from Region. It is assumed that the ranges of values in the
   * HFile fit within the store's assigned region. (assertBulkLoadHFileOk checks this)
   *
   * @param srcPathStr path to the HFile to bulk load
   * @param seqNum sequence Id associated with the HFile
   */
  public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
    Path srcPath = new Path(srcPathStr);
    return fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
  }

  public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException {
    Path srcPath = new Path(srcPathStr);
    try {
      fs.commitStoreFile(srcPath, dstPath);
    } finally {
      if (this.getCoprocessorHost() != null) {
        this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath);
      }
    }

    LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as "
        + dstPath + " - updating store file list.");

    HStoreFile sf = createStoreFileAndReader(dstPath);
    bulkLoadHFile(sf);

    LOG.info("Successfully loaded store file {} into store {} (new location: {})",
        srcPath, this, dstPath);

    return dstPath;
  }

  public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
    HStoreFile sf = createStoreFileAndReader(fileInfo);
    bulkLoadHFile(sf);
  }

  private void bulkLoadHFile(HStoreFile sf) throws IOException {
    StoreFileReader r = sf.getReader();
    this.storeSize.addAndGet(r.length());
    this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());

    // Append the new storefile into the list
    this.lock.writeLock().lock();
    try {
      this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
    } finally {
      // We need the lock, as long as we are updating the storeFiles
      // or changing the memstore. Let us release it before calling
      // notifyChangeReadersObservers. See HBASE-4485 for a possible
      // deadlock scenario that could have happened if we continue to hold
      // the lock.
      this.lock.writeLock().unlock();
    }
    LOG.info("Loaded HFile " + sf.getFileInfo() + " into store '" + getColumnFamilyName() + "'");
    if (LOG.isTraceEnabled()) {
      String traceMessage = "BULK LOAD time,size,store size,store files ["
          + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize
          + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
      LOG.trace(traceMessage);
    }
  }

  /**
   * Close all the readers. We don't need to worry about subsequent requests because the Region
   * holds a write lock that will prevent any more reads or writes.
   * @return the {@link StoreFile StoreFiles} that were previously being used.
   * @throws IOException on failure
   */
  public ImmutableCollection<HStoreFile> close() throws IOException {
    this.archiveLock.lock();
    this.lock.writeLock().lock();
    try {
      // Clear so metrics doesn't find them.
      ImmutableCollection<HStoreFile> result = storeEngine.getStoreFileManager().clearFiles();
      Collection<HStoreFile> compactedfiles =
          storeEngine.getStoreFileManager().clearCompactedFiles();
      // clear the compacted files
      if (CollectionUtils.isNotEmpty(compactedfiles)) {
        removeCompactedfiles(compactedfiles, true);
      }
      if (!result.isEmpty()) {
        // initialize the thread pool for closing store files in parallel.
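        // The region creates and sizes this pool; it is shut down in the finally block after all
        // close tasks have completed.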
        ThreadPoolExecutor storeFileCloserThreadPool = this.region
            .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
                + this.getColumnFamilyName());

        // close each store file in parallel
        CompletionService<Void> completionService =
            new ExecutorCompletionService<>(storeFileCloserThreadPool);
        for (HStoreFile f : result) {
          completionService.submit(new Callable<Void>() {
            @Override
            public Void call() throws IOException {
              boolean evictOnClose =
                  cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
              f.closeStoreFile(evictOnClose);
              return null;
            }
          });
        }

        IOException ioe = null;
        try {
          for (int i = 0; i < result.size(); i++) {
            try {
              Future<Void> future = completionService.take();
              future.get();
            } catch (InterruptedException e) {
              if (ioe == null) {
                ioe = new InterruptedIOException();
                ioe.initCause(e);
              }
            } catch (ExecutionException e) {
              if (ioe == null) ioe = new IOException(e.getCause());
            }
          }
        } finally {
          storeFileCloserThreadPool.shutdownNow();
        }
        if (ioe != null) throw ioe;
      }
      LOG.trace("Closed {}", this);
      return result;
    } finally {
      this.lock.writeLock().unlock();
      this.archiveLock.unlock();
    }
  }

  /**
   * Snapshot this store's memstore. Call before running
   * {@link #flushCache(long, MemStoreSnapshot, MonitoredTask, ThroughputController,
   * FlushLifeCycleTracker)}
   * so it has some work to do.
   */
  void snapshot() {
    this.lock.writeLock().lock();
    try {
      this.memstore.snapshot();
    } finally {
      this.lock.writeLock().unlock();
    }
  }

  /**
   * Write out current snapshot. Presumes {@link #snapshot()} has been called previously.
   * @param logCacheFlushId flush sequence number
   * @param snapshot the memstore snapshot to flush
   * @param status the task monitor to report progress to
   * @param throughputController controls the flush throughput
   * @return The path name of the tmp file to which the store was flushed
   * @throws IOException if exception occurs during process
   */
  protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
      MonitoredTask status, ThroughputController throughputController,
      FlushLifeCycleTracker tracker) throws IOException {
    // If an exception happens flushing, we let it out without clearing
    // the memstore snapshot. The old snapshot will be returned when we say
    // 'snapshot', the next time flush comes around.
    // Retry after catching exception when flushing, otherwise server will abort
    // itself
    StoreFlusher flusher = storeEngine.getStoreFlusher();
    IOException lastException = null;
    for (int i = 0; i < flushRetriesNumber; i++) {
      try {
        List<Path> pathNames =
            flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController, tracker);
        Path lastPathName = null;
        try {
          for (Path pathName : pathNames) {
            lastPathName = pathName;
            validateStoreFile(pathName);
          }
          return pathNames;
        } catch (Exception e) {
          LOG.warn("Failed validating store file {}, retrying num={}", lastPathName, i, e);
          if (e instanceof IOException) {
            lastException = (IOException) e;
          } else {
            lastException = new IOException(e);
          }
        }
      } catch (IOException e) {
        LOG.warn("Failed flushing store file, retrying num={}", i, e);
        lastException = e;
      }
      if (lastException != null && i < (flushRetriesNumber - 1)) {
        try {
          Thread.sleep(pauseTime);
        } catch (InterruptedException e) {
          IOException iie = new InterruptedIOException();
          iie.initCause(e);
          throw iie;
        }
      }
    }
    throw lastException;
  }

  /**
   * @param path The pathname of the tmp file into which the store was flushed
   * @param logCacheFlushId flush sequence number
   * @param status the task monitor to report progress to
   * @return store file created.
   * @throws IOException
   */
  private HStoreFile commitFile(Path path, long logCacheFlushId, MonitoredTask status)
      throws IOException {
    // Write-out finished successfully, move into the right spot
    Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);

    status.setStatus("Flushing " + this + ": reopening flushed file");
    HStoreFile sf = createStoreFileAndReader(dstPath);

    StoreFileReader r = sf.getReader();
    this.storeSize.addAndGet(r.length());
    this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());

    if (LOG.isInfoEnabled()) {
      LOG.info("Added " + sf + ", entries=" + r.getEntries() +
          ", sequenceid=" + logCacheFlushId +
          ", filesize=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1));
    }
    return sf;
  }

  /**
   * @param maxKeyCount
   * @param compression Compression algorithm to use
   * @param isCompaction whether we are creating a new file in a compaction
   * @param includeMVCCReadpoint - whether to include MVCC or not
   * @param includesTag - includesTag or not
   * @return Writer for a new StoreFile in the tmp dir.
   */
  // TODO: allow the Writer factory to create Writers of ShipperListener type only in case of
  // compaction
  public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
      boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
      boolean shouldDropBehind) throws IOException {
    final CacheConfig writerCacheConf;
    if (isCompaction) {
      // Don't cache data on write on compactions.
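      // Copy the cache config so this per-writer override does not affect the store-wide
      // settings used for reads.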
      writerCacheConf = new CacheConfig(cacheConf);
      writerCacheConf.setCacheDataOnWrite(false);
    } else {
      writerCacheConf = cacheConf;
    }
    InetSocketAddress[] favoredNodes = null;
    if (region.getRegionServerServices() != null) {
      favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
          region.getRegionInfo().getEncodedName());
    }
    HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
        cryptoContext);
    Path familyTempDir = new Path(fs.getTempDir(), family.getNameAsString());
    StoreFileWriter.Builder builder = new StoreFileWriter.Builder(conf, writerCacheConf,
        this.getFileSystem())
            .withOutputDir(familyTempDir)
            .withComparator(comparator)
            .withBloomType(family.getBloomFilterType())
            .withMaxKeyCount(maxKeyCount)
            .withFavoredNodes(favoredNodes)
            .withFileContext(hFileContext)
            .withShouldDropCacheBehind(shouldDropBehind);
    return builder.build();
  }

  private HFileContext createFileContext(Compression.Algorithm compression,
      boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context cryptoContext) {
    if (compression == null) {
      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
    }
    HFileContext hFileContext = new HFileContextBuilder()
        .withIncludesMvcc(includeMVCCReadpoint)
        .withIncludesTags(includesTag)
        .withCompression(compression)
        .withCompressTags(family.isCompressTags())
        .withChecksumType(checksumType)
        .withBytesPerCheckSum(bytesPerChecksum)
        .withBlockSize(blocksize)
        .withHBaseCheckSum(true)
        .withDataBlockEncoding(family.getDataBlockEncoding())
        .withEncryptionContext(cryptoContext)
        .withCreateTime(EnvironmentEdgeManager.currentTime())
        .build();
    return hFileContext;
  }


  private long getTotalSize(Collection<HStoreFile> sfs) {
    return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum();
  }

  /**
   * Change storeFiles adding into place the Reader produced by this new flush.
   * @param sfs Store files
   * @param snapshotId Id of the snapshot to clear; ignored if not positive
   * @throws IOException
   * @return Whether compaction is required.
   */
  private boolean updateStorefiles(List<HStoreFile> sfs, long snapshotId) throws IOException {
    this.lock.writeLock().lock();
    try {
      this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
      if (snapshotId > 0) {
        this.memstore.clearSnapshot(snapshotId);
      }
    } finally {
      // We need the lock, as long as we are updating the storeFiles
      // or changing the memstore. Let us release it before calling
      // notifyChangeReadersObservers. See HBASE-4485 for a possible
      // deadlock scenario that could have happened if we continue to hold
      // the lock.
      this.lock.writeLock().unlock();
    }
    // notify to be called here - only in case of flushes
    notifyChangedReadersObservers(sfs);
    if (LOG.isTraceEnabled()) {
      long totalSize = getTotalSize(sfs);
      String traceMessage = "FLUSH time,count,size,store size,store files ["
          + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize
          + "," + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
      LOG.trace(traceMessage);
    }
    return needsCompaction();
  }

  /**
   * Notify all observers that set of Readers has changed.
   * @throws IOException
   */
  private void notifyChangedReadersObservers(List<HStoreFile> sfs) throws IOException {
    for (ChangedReadersObserver o : this.changedReaderObservers) {
      List<KeyValueScanner> memStoreScanners;
      this.lock.readLock().lock();
      try {
        memStoreScanners = this.memstore.getScanners(o.getReadPoint());
      } finally {
        this.lock.readLock().unlock();
      }
      o.updateReaders(sfs, memStoreScanners);
    }
  }

  /**
   * Get all scanners with no filtering based on TTL (that happens further down the line).
   * @param cacheBlocks cache the blocks or not
   * @param usePread true to use pread, false if not
   * @param isCompaction true if the scanner is created for compaction
   * @param matcher the scan query matcher
   * @param startRow the start row
   * @param stopRow the stop row
   * @param readPt the read point of the current scan
   * @return all scanners for this store
   */
  public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet, boolean usePread,
      boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt)
      throws IOException {
    return getScanners(cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow,
        false, readPt);
  }

  /**
   * Get all scanners with no filtering based on TTL (that happens further down the line).
   * @param cacheBlocks cache the blocks or not
   * @param usePread true to use pread, false if not
   * @param isCompaction true if the scanner is created for compaction
   * @param matcher the scan query matcher
   * @param startRow the start row
   * @param includeStartRow true to include start row, false if not
   * @param stopRow the stop row
   * @param includeStopRow true to include stop row, false if not
   * @param readPt the read point of the current scan
   * @return all scanners for this store
   */
  public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,
      boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow,
      byte[] stopRow, boolean includeStopRow, long readPt) throws IOException {
    Collection<HStoreFile> storeFilesToScan;
    List<KeyValueScanner> memStoreScanners;
    this.lock.readLock().lock();
    try {
      storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
          includeStartRow, stopRow, includeStopRow);
      memStoreScanners = this.memstore.getScanners(readPt);
    } finally {
      this.lock.readLock().unlock();
    }

    // First the store file scanners

    // TODO this used to get the store files in descending order,
    // but now we get them in ascending order, which I think is
    // actually more correct, since the memstore scanners are put at the end.
    List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan,
        cacheBlocks, usePread, isCompaction, false, matcher, readPt);
    List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
    scanners.addAll(sfScanners);
    // Then the memstore scanners
    scanners.addAll(memStoreScanners);
    return scanners;
  }

  /**
   * Create scanners on the given files and if needed on the memstore with no filtering based on
   * TTL (that happens further down the line).
   * @param files the list of files on which the scanners have to be created
   * @param cacheBlocks cache the blocks or not
   * @param usePread true to use pread, false if not
   * @param isCompaction true if the scanner is created for compaction
   * @param matcher the scan query matcher
   * @param startRow the start row
   * @param stopRow the stop row
   * @param readPt the read point of the current scan
   * @param includeMemstoreScanner true if memstore has to be included
   * @return scanners on the given files and on the memstore if specified
   */
  public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
      boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
      byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner)
      throws IOException {
    return getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
        stopRow, false, readPt, includeMemstoreScanner);
  }

  /**
   * Create scanners on the given files and if needed on the memstore with no filtering based on
   * TTL (that happens further down the line).
   * @param files the list of files on which the scanners have to be created
   * @param cacheBlocks cache the blocks or not
   * @param usePread true to use pread, false if not
   * @param isCompaction true if the scanner is created for compaction
   * @param matcher the scan query matcher
   * @param startRow the start row
   * @param includeStartRow true to include start row, false if not
   * @param stopRow the stop row
   * @param includeStopRow true to include stop row, false if not
   * @param readPt the read point of the current scan
   * @param includeMemstoreScanner true if memstore has to be included
   * @return scanners on the given files and on the memstore if specified
   */
  public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
      boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
      boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
      boolean includeMemstoreScanner) throws IOException {
    List<KeyValueScanner> memStoreScanners = null;
    if (includeMemstoreScanner) {
      this.lock.readLock().lock();
      try {
        memStoreScanners = this.memstore.getScanners(readPt);
      } finally {
        this.lock.readLock().unlock();
      }
    }
    List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(files,
        cacheBlocks, usePread, isCompaction, false, matcher, readPt);
    List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
    scanners.addAll(sfScanners);
    // Then the memstore scanners
    if (memStoreScanners != null) {
      scanners.addAll(memStoreScanners);
    }
    return scanners;
  }

  /**
   * @param o Observer who wants to know about changes in set of Readers
   */
  public void addChangedReaderObserver(ChangedReadersObserver o) {
    this.changedReaderObservers.add(o);
  }

  /**
   * @param o Observer no longer interested in changes in set of Readers.
   */
  public void deleteChangedReaderObserver(ChangedReadersObserver o) {
    // We don't check if observer present; it may not be (legitimately)
    this.changedReaderObservers.remove(o);
  }

  //////////////////////////////////////////////////////////////////////////////
  // Compaction
  //////////////////////////////////////////////////////////////////////////////

  /**
   * Compact the StoreFiles. This method may take some time, so the calling
   * thread must be able to block for long periods.
   *
   * <p>During this time, the Store can work as usual, getting values from
   * StoreFiles and writing new StoreFiles from the memstore.
   *
   * Existing StoreFiles are not destroyed until the new compacted StoreFile is
   * completely written-out to disk.
   *
   * <p>The compactLock prevents multiple simultaneous compactions.
   * The structureLock prevents us from interfering with other write operations.
   *
   * <p>We don't want to hold the structureLock for the whole time, as a compact()
   * can be lengthy and we want to allow cache-flushes during this period.
   *
   * <p>Compaction events should be idempotent, since there is no IO Fencing for
   * the region directory in hdfs. A region server might still try to complete the
   * compaction after it lost the region. That is why the following events are carefully
   * ordered for a compaction:
   *  1. Compaction writes new files under region/.tmp directory (compaction output)
   *  2. Compaction atomically moves the temporary file under region directory
   *  3. Compaction appends a WAL edit containing the compaction input and output files.
   *     Forces sync on WAL.
   *  4. Compaction deletes the input files from the region directory.
   *
   * Failure conditions are handled like this:
   *  - If RS fails before 2, compaction won't complete. Even if RS lives on and finishes
   *    the compaction later, it will only write the new data file to the region directory.
   *    Since we already have this data, this will be idempotent, but we will have a redundant
   *    copy of the data.
   *  - If RS fails between 2 and 3, the region will have a redundant copy of the data. The
   *    RS that failed won't be able to finish sync() for WAL because of lease recovery in WAL.
   *  - If RS fails after 3, the region server that opens the region will pick up the
   *    compaction marker from the WAL and replay it by removing the compaction input files.
   *    The failed RS can also attempt to delete those files, but the operation will be idempotent.
   *
   * See HBASE-2231 for details.
   *
   * @param compaction compaction details obtained from requestCompaction()
   * @throws IOException
   * @return Storefile we compacted into or null if we failed or opted out early.
   */
  public List<HStoreFile> compact(CompactionContext compaction,
      ThroughputController throughputController, User user) throws IOException {
    assert compaction != null;
    List<HStoreFile> sfs = null;
    CompactionRequestImpl cr = compaction.getRequest();
    try {
      // Do all sanity checking in here if we have a valid CompactionRequestImpl
      // because we need to clean up after it on the way out in a finally
      // block below
      long compactionStartTime = EnvironmentEdgeManager.currentTime();
      assert compaction.hasSelection();
      Collection<HStoreFile> filesToCompact = cr.getFiles();
      assert !filesToCompact.isEmpty();
      synchronized (filesCompacting) {
        // sanity check: we're compacting files that this store knows about
        // TODO: change this to LOG.error() after more debugging
        Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
      }

      // Ready to go. Have list of files to compact.
      LOG.info("Starting compaction of " + filesToCompact +
          " into tmpdir=" + fs.getTempDir() + ", totalSize=" +
          TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));

      return doCompaction(cr, filesToCompact, user, compactionStartTime,
          compaction.compact(throughputController, user));
    } finally {
      finishCompactionRequest(cr);
    }
  }

  @VisibleForTesting
  protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
      Collection<HStoreFile> filesToCompact, User user, long compactionStartTime,
      List<Path> newFiles) throws IOException {
    // Do the steps necessary to complete the compaction.
    List<HStoreFile> sfs = moveCompactedFilesIntoPlace(cr, newFiles, user);
    writeCompactionWalRecord(filesToCompact, sfs);
    replaceStoreFiles(filesToCompact, sfs);
    if (cr.isMajor()) {
      majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
      majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
    } else {
      compactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
      compactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
    }
    long outputBytes = getTotalSize(sfs);

    // At this point the store will use new files for all new scanners.
    completeCompaction(filesToCompact); // update store size.
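
    // Report compaction timing and size metrics to the region server, when one is available.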
1406 1407 long now = EnvironmentEdgeManager.currentTime(); 1408 if (region.getRegionServerServices() != null 1409 && region.getRegionServerServices().getMetrics() != null) { 1410 region.getRegionServerServices().getMetrics().updateCompaction(cr.isMajor(), 1411 now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(), 1412 outputBytes); 1413 } 1414 1415 logCompactionEndMessage(cr, sfs, now, compactionStartTime); 1416 return sfs; 1417 } 1418 1419 private List<HStoreFile> moveCompactedFilesIntoPlace(CompactionRequestImpl cr, List<Path> newFiles, 1420 User user) throws IOException { 1421 List<HStoreFile> sfs = new ArrayList<>(newFiles.size()); 1422 for (Path newFile : newFiles) { 1423 assert newFile != null; 1424 HStoreFile sf = moveFileIntoPlace(newFile); 1425 if (this.getCoprocessorHost() != null) { 1426 getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user); 1427 } 1428 assert sf != null; 1429 sfs.add(sf); 1430 } 1431 return sfs; 1432 } 1433 1434 // Package-visible for tests 1435 HStoreFile moveFileIntoPlace(Path newFile) throws IOException { 1436 validateStoreFile(newFile); 1437 // Move the file into the right spot 1438 Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile); 1439 return createStoreFileAndReader(destPath); 1440 } 1441 1442 /** 1443 * Writes the compaction WAL record. 1444 * @param filesCompacted Files compacted (input). 1445 * @param newFiles Files from compaction. 1446 */ 1447 private void writeCompactionWalRecord(Collection<HStoreFile> filesCompacted, 1448 Collection<HStoreFile> newFiles) throws IOException { 1449 if (region.getWAL() == null) { 1450 return; 1451 } 1452 List<Path> inputPaths = 1453 filesCompacted.stream().map(HStoreFile::getPath).collect(Collectors.toList()); 1454 List<Path> outputPaths = 1455 newFiles.stream().map(HStoreFile::getPath).collect(Collectors.toList()); 1456 RegionInfo info = this.region.getRegionInfo(); 1457 CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info, 1458 family.getName(), inputPaths, outputPaths, fs.getStoreDir(getColumnFamilyDescriptor().getNameAsString())); 1459 // Fix reaching into Region to get the maxWaitForSeqId. 1460 // Does this method belong in Region altogether given it is making so many references up there? 1461 // Could be Region#writeCompactionMarker(compactionDescriptor); 1462 WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(), 1463 this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC()); 1464 } 1465 1466 @VisibleForTesting 1467 void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result) 1468 throws IOException { 1469 this.lock.writeLock().lock(); 1470 try { 1471 this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result); 1472 synchronized (filesCompacting) { 1473 filesCompacting.removeAll(compactedFiles); 1474 } 1475 } finally { 1476 this.lock.writeLock().unlock(); 1477 } 1478 } 1479 1480 /** 1481 * Log a very elaborate compaction completion message. 1482 * @param cr Request. 1483 * @param sfs Resulting files. 1484 * @param compactionStartTime Start time. 1485 */ 1486 private void logCompactionEndMessage( 1487 CompactionRequestImpl cr, List<HStoreFile> sfs, long now, long compactionStartTime) { 1488 StringBuilder message = new StringBuilder( 1489 "Completed" + (cr.isMajor() ? " major" : "") + " compaction of " 1490 + cr.getFiles().size() + (cr.isAllFiles() ? 
" (all)" : "") + " file(s) in " 1491 + this + " of " + this.getRegionInfo().getShortNameToLog() + " into "); 1492 if (sfs.isEmpty()) { 1493 message.append("none, "); 1494 } else { 1495 for (HStoreFile sf: sfs) { 1496 message.append(sf.getPath().getName()); 1497 message.append("(size="); 1498 message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1)); 1499 message.append("), "); 1500 } 1501 } 1502 message.append("total size for store is ") 1503 .append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1)) 1504 .append(". This selection was in queue for ") 1505 .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime())) 1506 .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime)) 1507 .append(" to execute."); 1508 LOG.info(message.toString()); 1509 if (LOG.isTraceEnabled()) { 1510 int fileCount = storeEngine.getStoreFileManager().getStorefileCount(); 1511 long resultSize = getTotalSize(sfs); 1512 String traceMessage = "COMPACTION start,end,size out,files in,files out,store size," 1513 + "store files [" + compactionStartTime + "," + now + "," + resultSize + "," 1514 + cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]"; 1515 LOG.trace(traceMessage); 1516 } 1517 } 1518 1519 /** 1520 * Call to complete a compaction. Its for the case where we find in the WAL a compaction 1521 * that was not finished. We could find one recovering a WAL after a regionserver crash. 1522 * See HBASE-2231. 1523 * @param compaction 1524 */ 1525 public void replayCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles, 1526 boolean removeFiles) throws IOException { 1527 LOG.debug("Completing compaction from the WAL marker"); 1528 List<String> compactionInputs = compaction.getCompactionInputList(); 1529 List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList()); 1530 1531 // The Compaction Marker is written after the compaction is completed, 1532 // and the files moved into the region/family folder. 1533 // 1534 // If we crash after the entry is written, we may not have removed the 1535 // input files, but the output file is present. 1536 // (The unremoved input files will be removed by this function) 1537 // 1538 // If we scan the directory and the file is not present, it can mean that: 1539 // - The file was manually removed by the user 1540 // - The file was removed as consequence of subsequent compaction 1541 // so, we can't do anything with the "compaction output list" because those 1542 // files have already been loaded when opening the region (by virtue of 1543 // being in the store's folder) or they may be missing due to a compaction. 
1544 1545 String familyName = this.getColumnFamilyName(); 1546 Set<String> inputFiles = new HashSet<>(); 1547 for (String compactionInput : compactionInputs) { 1548 Path inputPath = fs.getStoreFilePath(familyName, compactionInput); 1549 inputFiles.add(inputPath.getName()); 1550 } 1551 1552 //some of the input files might already be deleted 1553 List<HStoreFile> inputStoreFiles = new ArrayList<>(compactionInputs.size()); 1554 for (HStoreFile sf : this.getStorefiles()) { 1555 if (inputFiles.contains(sf.getPath().getName())) { 1556 inputStoreFiles.add(sf); 1557 } 1558 } 1559 1560 // check whether we need to pick up the new files 1561 List<HStoreFile> outputStoreFiles = new ArrayList<>(compactionOutputs.size()); 1562 1563 if (pickCompactionFiles) { 1564 for (HStoreFile sf : this.getStorefiles()) { 1565 compactionOutputs.remove(sf.getPath().getName()); 1566 } 1567 for (String compactionOutput : compactionOutputs) { 1568 StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), compactionOutput); 1569 HStoreFile storeFile = createStoreFileAndReader(storeFileInfo); 1570 outputStoreFiles.add(storeFile); 1571 } 1572 } 1573 1574 if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) { 1575 LOG.info("Replaying compaction marker, replacing input files: " + 1576 inputStoreFiles + " with output files : " + outputStoreFiles); 1577 this.replaceStoreFiles(inputStoreFiles, outputStoreFiles); 1578 this.completeCompaction(inputStoreFiles); 1579 } 1580 } 1581 1582 /** 1583 * This method tries to compact N recent files for testing. 1584 * Note that because compacting "recent" files only makes sense for some policies, 1585 * e.g. the default one, it assumes default policy is used. It doesn't use policy, 1586 * but instead makes a compaction candidate list by itself. 1587 * @param N Number of files. 1588 */ 1589 @VisibleForTesting 1590 public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException { 1591 List<HStoreFile> filesToCompact; 1592 boolean isMajor; 1593 1594 this.lock.readLock().lock(); 1595 try { 1596 synchronized (filesCompacting) { 1597 filesToCompact = Lists.newArrayList(storeEngine.getStoreFileManager().getStorefiles()); 1598 if (!filesCompacting.isEmpty()) { 1599 // exclude all files older than the newest file we're currently 1600 // compacting. this allows us to preserve contiguity (HBASE-2856) 1601 HStoreFile last = filesCompacting.get(filesCompacting.size() - 1); 1602 int idx = filesToCompact.indexOf(last); 1603 Preconditions.checkArgument(idx != -1); 1604 filesToCompact.subList(0, idx + 1).clear(); 1605 } 1606 int count = filesToCompact.size(); 1607 if (N > count) { 1608 throw new RuntimeException("Not enough files"); 1609 } 1610 1611 filesToCompact = filesToCompact.subList(count - N, count); 1612 isMajor = (filesToCompact.size() == storeEngine.getStoreFileManager().getStorefileCount()); 1613 filesCompacting.addAll(filesToCompact); 1614 Collections.sort(filesCompacting, storeEngine.getStoreFileManager() 1615 .getStoreFileComparator()); 1616 } 1617 } finally { 1618 this.lock.readLock().unlock(); 1619 } 1620 1621 try { 1622 // Ready to go. Have list of files to compact. 1623 List<Path> newFiles = ((DefaultCompactor)this.storeEngine.getCompactor()) 1624 .compactForTesting(filesToCompact, isMajor); 1625 for (Path newFile: newFiles) { 1626 // Move the compaction into place. 
1627 HStoreFile sf = moveFileIntoPlace(newFile); 1628 if (this.getCoprocessorHost() != null) { 1629 this.getCoprocessorHost().postCompact(this, sf, null, null, null); 1630 } 1631 replaceStoreFiles(filesToCompact, Collections.singletonList(sf)); 1632 completeCompaction(filesToCompact); 1633 } 1634 } finally { 1635 synchronized (filesCompacting) { 1636 filesCompacting.removeAll(filesToCompact); 1637 } 1638 } 1639 } 1640 1641 @Override 1642 public boolean hasReferences() { 1643 // Grab the read lock here, because we need to ensure that: only when the atomic 1644 // replaceStoreFiles(..) finished, we can get all the complete store file list. 1645 this.lock.readLock().lock(); 1646 try { 1647 // Merge the current store files with compacted files here due to HBASE-20940. 1648 Collection<HStoreFile> allStoreFiles = new ArrayList<>(getStorefiles()); 1649 allStoreFiles.addAll(getCompactedFiles()); 1650 return StoreUtils.hasReferences(allStoreFiles); 1651 } finally { 1652 this.lock.readLock().unlock(); 1653 } 1654 } 1655 1656 /** 1657 * getter for CompactionProgress object 1658 * @return CompactionProgress object; can be null 1659 */ 1660 public CompactionProgress getCompactionProgress() { 1661 return this.storeEngine.getCompactor().getProgress(); 1662 } 1663 1664 @Override 1665 public boolean shouldPerformMajorCompaction() throws IOException { 1666 for (HStoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) { 1667 // TODO: what are these reader checks all over the place? 1668 if (sf.getReader() == null) { 1669 LOG.debug("StoreFile {} has null Reader", sf); 1670 return false; 1671 } 1672 } 1673 return storeEngine.getCompactionPolicy().shouldPerformMajorCompaction( 1674 this.storeEngine.getStoreFileManager().getStorefiles()); 1675 } 1676 1677 public Optional<CompactionContext> requestCompaction() throws IOException { 1678 return requestCompaction(NO_PRIORITY, CompactionLifeCycleTracker.DUMMY, null); 1679 } 1680 1681 public Optional<CompactionContext> requestCompaction(int priority, 1682 CompactionLifeCycleTracker tracker, User user) throws IOException { 1683 // don't even select for compaction if writes are disabled 1684 if (!this.areWritesEnabled()) { 1685 return Optional.empty(); 1686 } 1687 // Before we do compaction, try to get rid of unneeded files to simplify things. 1688 removeUnneededFiles(); 1689 1690 final CompactionContext compaction = storeEngine.createCompaction(); 1691 CompactionRequestImpl request = null; 1692 this.lock.readLock().lock(); 1693 try { 1694 synchronized (filesCompacting) { 1695 // First, see if coprocessor would want to override selection. 1696 if (this.getCoprocessorHost() != null) { 1697 final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting); 1698 boolean override = getCoprocessorHost().preCompactSelection(this, 1699 candidatesForCoproc, tracker, user); 1700 if (override) { 1701 // Coprocessor is overriding normal file selection. 1702 compaction.forceSelect(new CompactionRequestImpl(candidatesForCoproc)); 1703 } 1704 } 1705 1706 // Normal case - coprocessor is not overriding file selection. 
1707 if (!compaction.hasSelection()) { 1708 boolean isUserCompaction = priority == Store.PRIORITY_USER; 1709 boolean mayUseOffPeak = offPeakHours.isOffPeakHour() && 1710 offPeakCompactionTracker.compareAndSet(false, true); 1711 try { 1712 compaction.select(this.filesCompacting, isUserCompaction, 1713 mayUseOffPeak, forceMajor && filesCompacting.isEmpty()); 1714 } catch (IOException e) { 1715 if (mayUseOffPeak) { 1716 offPeakCompactionTracker.set(false); 1717 } 1718 throw e; 1719 } 1720 assert compaction.hasSelection(); 1721 if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) { 1722 // Compaction policy doesn't want to take advantage of off-peak. 1723 offPeakCompactionTracker.set(false); 1724 } 1725 } 1726 if (this.getCoprocessorHost() != null) { 1727 this.getCoprocessorHost().postCompactSelection( 1728 this, ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker, 1729 compaction.getRequest(), user); 1730 } 1731 // Finally, we have the resulting files list. Check if we have any files at all. 1732 request = compaction.getRequest(); 1733 Collection<HStoreFile> selectedFiles = request.getFiles(); 1734 if (selectedFiles.isEmpty()) { 1735 return Optional.empty(); 1736 } 1737 1738 addToCompactingFiles(selectedFiles); 1739 1740 // If we're enqueuing a major, clear the force flag. 1741 this.forceMajor = this.forceMajor && !request.isMajor(); 1742 1743 // Set common request properties. 1744 // Set priority, either override value supplied by caller or from store. 1745 request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority()); 1746 request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName()); 1747 request.setTracker(tracker); 1748 } 1749 } finally { 1750 this.lock.readLock().unlock(); 1751 } 1752 1753 if (LOG.isDebugEnabled()) { 1754 LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName() 1755 + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction" 1756 + (request.isAllFiles() ? " (all files)" : "")); 1757 } 1758 this.region.reportCompactionRequestStart(request.isMajor()); 1759 return Optional.of(compaction); 1760 } 1761 1762 /** Adds the files to compacting files. filesCompacting must be locked. */ 1763 private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) { 1764 if (CollectionUtils.isEmpty(filesToAdd)) { 1765 return; 1766 } 1767 // Check that we do not try to compact the same StoreFile twice. 
1768 if (!Collections.disjoint(filesCompacting, filesToAdd)) { 1769 Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting); 1770 } 1771 filesCompacting.addAll(filesToAdd); 1772 Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator()); 1773 } 1774 1775 private void removeUnneededFiles() throws IOException { 1776 if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) return; 1777 if (getColumnFamilyDescriptor().getMinVersions() > 0) { 1778 LOG.debug("Skipping expired store file removal due to min version being {}", 1779 getColumnFamilyDescriptor().getMinVersions()); 1780 return; 1781 } 1782 this.lock.readLock().lock(); 1783 Collection<HStoreFile> delSfs = null; 1784 try { 1785 synchronized (filesCompacting) { 1786 long cfTtl = getStoreFileTtl(); 1787 if (cfTtl != Long.MAX_VALUE) { 1788 delSfs = storeEngine.getStoreFileManager().getUnneededFiles( 1789 EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting); 1790 addToCompactingFiles(delSfs); 1791 } 1792 } 1793 } finally { 1794 this.lock.readLock().unlock(); 1795 } 1796 1797 if (CollectionUtils.isEmpty(delSfs)) { 1798 return; 1799 } 1800 1801 Collection<HStoreFile> newFiles = Collections.emptyList(); // No new files. 1802 writeCompactionWalRecord(delSfs, newFiles); 1803 replaceStoreFiles(delSfs, newFiles); 1804 completeCompaction(delSfs); 1805 LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in " 1806 + this + " of " + this.getRegionInfo().getRegionNameAsString() 1807 + "; total size for store is " 1808 + TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1)); 1809 } 1810 1811 public void cancelRequestedCompaction(CompactionContext compaction) { 1812 finishCompactionRequest(compaction.getRequest()); 1813 } 1814 1815 private void finishCompactionRequest(CompactionRequestImpl cr) { 1816 this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize()); 1817 if (cr.isOffPeak()) { 1818 offPeakCompactionTracker.set(false); 1819 cr.setOffPeak(false); 1820 } 1821 synchronized (filesCompacting) { 1822 filesCompacting.removeAll(cr.getFiles()); 1823 } 1824 } 1825 1826 /** 1827 * Validates a store file by opening and closing it. In HFileV2 this should not be an expensive 1828 * operation. 1829 * @param path the path to the store file 1830 */ 1831 private void validateStoreFile(Path path) throws IOException { 1832 HStoreFile storeFile = null; 1833 try { 1834 storeFile = createStoreFileAndReader(path); 1835 } catch (IOException e) { 1836 LOG.error("Failed to open store file : {}, keeping it in tmp location", path, e); 1837 throw e; 1838 } finally { 1839 if (storeFile != null) { 1840 storeFile.closeStoreFile(false); 1841 } 1842 } 1843 } 1844 1845 /** 1846 * Update counts. 1847 * @param compactedFiles list of files that were compacted 1848 */ 1849 @VisibleForTesting 1850 protected void completeCompaction(Collection<HStoreFile> compactedFiles) 1851 // Rename this method! TODO. 1852 throws IOException { 1853 this.storeSize.set(0L); 1854 this.totalUncompressedBytes.set(0L); 1855 for (HStoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) { 1856 StoreFileReader r = hsf.getReader(); 1857 if (r == null) { 1858 LOG.warn("StoreFile {} has a null Reader", hsf); 1859 continue; 1860 } 1861 this.storeSize.addAndGet(r.length()); 1862 this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); 1863 } 1864 } 1865 1866 /* 1867 * @param wantedVersions How many versions were asked for. 
1868 * @return wantedVersions, or this family's maximum number of versions ({@link HConstants#VERSIONS}) if that is smaller. 1869 */ 1870 int versionsToReturn(final int wantedVersions) { 1871 if (wantedVersions <= 0) { 1872 throw new IllegalArgumentException("Number of versions must be > 0"); 1873 } 1874 // Make sure we do not return more than maximum versions for this store. 1875 int maxVersions = this.family.getMaxVersions(); 1876 return wantedVersions > maxVersions ? maxVersions : wantedVersions; 1877 } 1878 1879 @Override 1880 public boolean canSplit() { 1881 this.lock.readLock().lock(); 1882 try { 1883 // Not splittable if we find a reference store file present in the store. 1884 boolean result = !hasReferences(); 1885 if (!result) { 1886 LOG.trace("Not splittable; has references: {}", this); 1887 } 1888 return result; 1889 } finally { 1890 this.lock.readLock().unlock(); 1891 } 1892 } 1893 1894 /** 1895 * Determines the split point of this store, if the store can be split (i.e. it holds no reference files). 1896 */ 1897 public Optional<byte[]> getSplitPoint() { 1898 this.lock.readLock().lock(); 1899 try { 1900 // Should already be enforced by the split policy! 1901 assert !this.getRegionInfo().isMetaRegion(); 1902 // Not splittable if we find a reference store file present in the store. 1903 if (hasReferences()) { 1904 LOG.trace("Not splittable; has references: {}", this); 1905 return Optional.empty(); 1906 } 1907 return this.storeEngine.getStoreFileManager().getSplitPoint(); 1908 } catch (IOException e) { 1909 LOG.warn("Failed getting split point for {}", this, e); 1910 } finally { 1911 this.lock.readLock().unlock(); 1912 } 1913 return Optional.empty(); 1914 } 1915 1916 @Override 1917 public long getLastCompactSize() { 1918 return this.lastCompactSize; 1919 } 1920 1921 @Override 1922 public long getSize() { 1923 return storeSize.get(); 1924 } 1925 1926 public void triggerMajorCompaction() { 1927 this.forceMajor = true; 1928 } 1929 1930 ////////////////////////////////////////////////////////////////////////////// 1931 // File administration 1932 ////////////////////////////////////////////////////////////////////////////// 1933 1934 /** 1935 * Return a scanner for both the memstore and the HStore files. Assumes we are not in a 1936 * compaction. 1937 * @param scan Scan to apply when scanning the stores 1938 * @param targetCols columns to scan 1939 * @return a scanner over the current key values 1940 * @throws IOException on failure 1941 */ 1942 public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols, long readPt) 1943 throws IOException { 1944 lock.readLock().lock(); 1945 try { 1946 ScanInfo scanInfo; 1947 if (this.getCoprocessorHost() != null) { 1948 scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this); 1949 } else { 1950 scanInfo = getScanInfo(); 1951 } 1952 return createScanner(scan, scanInfo, targetCols, readPt); 1953 } finally { 1954 lock.readLock().unlock(); 1955 } 1956 } 1957 1958 // HMobStore will override this method to return its own implementation. 1959 protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo, 1960 NavigableSet<byte[]> targetCols, long readPt) throws IOException { 1961 return scan.isReversed() ? new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt) 1962 : new StoreScanner(this, scanInfo, scan, targetCols, readPt); 1963 } 1964 1965 /** 1966 * Recreates the scanners from the current list of active store file scanners. 1967 * @param currentFileScanners the current set of active store file scanners 1968 * @param cacheBlocks cache the blocks or not 1969 * @param usePread use pread or not 1970 * @param isCompaction is the scanner for compaction 1971 * @param matcher the scan query matcher 1972 * @param startRow the scan's start row 1973 * @param includeStartRow should the scan include the start row 1974 * @param stopRow the scan's stop row 1975 * @param includeStopRow should the scan include the stop row 1976 * @param readPt the read point of the current scan 1977 * @param includeMemstoreScanner whether the current scanner should include a memstore scanner 1978 * @return the list of scanners recreated from the current set of active store file scanners 1979 * @throws IOException if the store files cannot be reopened 1980 */ 1981 public List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners, 1982 boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, 1983 byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, 1984 boolean includeMemstoreScanner) throws IOException { 1985 this.lock.readLock().lock(); 1986 try { 1987 Map<String, HStoreFile> name2File = 1988 new HashMap<>(getStorefilesCount() + getCompactedFilesCount()); 1989 for (HStoreFile file : getStorefiles()) { 1990 name2File.put(file.getFileInfo().getActiveFileName(), file); 1991 } 1992 Collection<HStoreFile> compactedFiles = getCompactedFiles(); 1993 for (HStoreFile file : IterableUtils.emptyIfNull(compactedFiles)) { 1994 name2File.put(file.getFileInfo().getActiveFileName(), file); 1995 } 1996 List<HStoreFile> filesToReopen = new ArrayList<>(); 1997 for (KeyValueScanner kvs : currentFileScanners) { 1998 assert kvs.isFileScanner(); 1999 if (kvs.peek() == null) { 2000 continue; 2001 } 2002 filesToReopen.add(name2File.get(kvs.getFilePath().getName())); 2003 } 2004 if (filesToReopen.isEmpty()) { 2005 return null; 2006 } 2007 return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow, 2008 includeStartRow, stopRow, includeStopRow, readPt, false); 2009 } finally { 2010 this.lock.readLock().unlock(); 2011 } 2012 } 2013 2014 @Override 2015 public String toString() { 2016 return this.getColumnFamilyName(); 2017 } 2018 2019 @Override 2020 public int getStorefilesCount() { 2021 return this.storeEngine.getStoreFileManager().getStorefileCount(); 2022 } 2023 2024 @Override 2025 public int getCompactedFilesCount() { 2026 return this.storeEngine.getStoreFileManager().getCompactedFilesCount(); 2027 } 2028 2029 private LongStream getStoreFileAgeStream() { 2030 return this.storeEngine.getStoreFileManager().getStorefiles().stream().filter(sf -> { 2031 if (sf.getReader() == null) { 2032 LOG.warn("StoreFile {} has a null Reader", sf); 2033 return false; 2034 } else { 2035 return true; 2036 } 2037 }).filter(HStoreFile::isHFile).mapToLong(sf -> sf.getFileInfo().getCreatedTimestamp()) 2038 .map(t -> EnvironmentEdgeManager.currentTime() - t); 2039 } 2040 2041 @Override 2042 public OptionalLong getMaxStoreFileAge() { 2043 return getStoreFileAgeStream().max(); 2044 } 2045 2046 @Override 2047 public OptionalLong getMinStoreFileAge() { 2048 return getStoreFileAgeStream().min(); 2049 } 2050 2051 @Override 2052 public OptionalDouble getAvgStoreFileAge() { 2053 return getStoreFileAgeStream().average(); 2054 }
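  // Note: the three store file age figures above are derived from getStoreFileAgeStream(), so
  // they only cover HFiles that currently have a non-null Reader, and ages are measured against
  // EnvironmentEdgeManager.currentTime().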
2055 2056 @Override 2057 public long getNumReferenceFiles() { 2058 return this.storeEngine.getStoreFileManager().getStorefiles().stream() 2059 .filter(HStoreFile::isReference).count(); 2060 } 2061 2062 @Override 2063 public long getNumHFiles() { 2064 return this.storeEngine.getStoreFileManager().getStorefiles().stream() 2065 .filter(HStoreFile::isHFile).count(); 2066 } 2067 2068 @Override 2069 public long getStoreSizeUncompressed() { 2070 return this.totalUncompressedBytes.get(); 2071 } 2072 2073 @Override 2074 public long getStorefilesSize() { 2075 // Include all StoreFiles 2076 return getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(), sf -> true); 2077 } 2078 2079 @Override 2080 public long getHFilesSize() { 2081 // Include only StoreFiles which are HFiles 2082 return getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(), 2083 HStoreFile::isHFile); 2084 } 2085 2086 private long getTotalUmcompressedBytes(List<HStoreFile> files) { 2087 return files.stream().filter(f -> f != null && f.getReader() != null) 2088 .mapToLong(f -> f.getReader().getTotalUncompressedBytes()).sum(); 2089 } 2090 2091 private long getStorefilesSize(Collection<HStoreFile> files, Predicate<HStoreFile> predicate) { 2092 return files.stream().filter(f -> f != null && f.getReader() != null).filter(predicate) 2093 .mapToLong(f -> f.getReader().length()).sum(); 2094 } 2095 2096 private long getStoreFileFieldSize(ToLongFunction<StoreFileReader> f) { 2097 return this.storeEngine.getStoreFileManager().getStorefiles().stream().filter(sf -> { 2098 if (sf.getReader() == null) { 2099 LOG.warn("StoreFile {} has a null Reader", sf); 2100 return false; 2101 } else { 2102 return true; 2103 } 2104 }).map(HStoreFile::getReader).mapToLong(f).sum(); 2105 } 2106 2107 @Override 2108 public long getStorefilesRootLevelIndexSize() { 2109 return getStoreFileFieldSize(StoreFileReader::indexSize); 2110 } 2111 2112 @Override 2113 public long getTotalStaticIndexSize() { 2114 return getStoreFileFieldSize(StoreFileReader::getUncompressedDataIndexSize); 2115 } 2116 2117 @Override 2118 public long getTotalStaticBloomSize() { 2119 return getStoreFileFieldSize(StoreFileReader::getTotalBloomSize); 2120 } 2121 2122 @Override 2123 public MemStoreSize getMemStoreSize() { 2124 return this.memstore.size(); 2125 } 2126 2127 @Override 2128 public int getCompactPriority() { 2129 int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority(); 2130 if (priority == PRIORITY_USER) { 2131 LOG.warn("Compaction priority is USER despite there being no user compaction"); 2132 } 2133 return priority; 2134 } 2135 2136 public boolean throttleCompaction(long compactionSize) { 2137 return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize); 2138 } 2139 2140 public HRegion getHRegion() { 2141 return this.region; 2142 } 2143 2144 public RegionCoprocessorHost getCoprocessorHost() { 2145 return this.region.getCoprocessorHost(); 2146 } 2147 2148 @Override 2149 public RegionInfo getRegionInfo() { 2150 return this.fs.getRegionInfo(); 2151 } 2152 2153 @Override 2154 public boolean areWritesEnabled() { 2155 return this.region.areWritesEnabled(); 2156 } 2157 2158 @Override 2159 public long getSmallestReadPoint() { 2160 return this.region.getSmallestReadPoint(); 2161 } 2162 2163 /** 2164 * Adds or replaces the specified KeyValues. 2165 * <p> 2166 * For each KeyValue specified, if a cell with the same row, family, and qualifier exists in 2167 * MemStore, it will be replaced. 
Otherwise, it will just be inserted to MemStore. 2168 * <p> 2169 * This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic 2170 * across all of them. 2171 * @param readpoint readpoint below which we can safely remove duplicate KVs 2172 * @throws IOException 2173 */ 2174 public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing) 2175 throws IOException { 2176 this.lock.readLock().lock(); 2177 try { 2178 this.memstore.upsert(cells, readpoint, memstoreSizing); 2179 } finally { 2180 this.lock.readLock().unlock(); 2181 } 2182 } 2183 2184 public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTracker tracker) { 2185 return new StoreFlusherImpl(cacheFlushId, tracker); 2186 } 2187 2188 private final class StoreFlusherImpl implements StoreFlushContext { 2189 2190 private final FlushLifeCycleTracker tracker; 2191 private final long cacheFlushSeqNum; 2192 private MemStoreSnapshot snapshot; 2193 private List<Path> tempFiles; 2194 private List<Path> committedFiles; 2195 private long cacheFlushCount; 2196 private long cacheFlushSize; 2197 private long outputFileSize; 2198 2199 private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) { 2200 this.cacheFlushSeqNum = cacheFlushSeqNum; 2201 this.tracker = tracker; 2202 } 2203 2204 /** 2205 * This is not thread safe. The caller should have a lock on the region or the store. 2206 * If necessary, the lock can be added with the patch provided in HBASE-10087 2207 */ 2208 @Override 2209 public MemStoreSize prepare() { 2210 // passing the current sequence number of the wal - to allow bookkeeping in the memstore 2211 this.snapshot = memstore.snapshot(); 2212 this.cacheFlushCount = snapshot.getCellsCount(); 2213 this.cacheFlushSize = snapshot.getDataSize(); 2214 committedFiles = new ArrayList<>(1); 2215 return snapshot.getMemStoreSize(); 2216 } 2217 2218 @Override 2219 public void flushCache(MonitoredTask status) throws IOException { 2220 RegionServerServices rsService = region.getRegionServerServices(); 2221 ThroughputController throughputController = 2222 rsService == null ? null : rsService.getFlushThroughputController(); 2223 tempFiles = 2224 HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController, tracker); 2225 } 2226 2227 @Override 2228 public boolean commit(MonitoredTask status) throws IOException { 2229 if (CollectionUtils.isEmpty(this.tempFiles)) { 2230 return false; 2231 } 2232 List<HStoreFile> storeFiles = new ArrayList<>(this.tempFiles.size()); 2233 for (Path storeFilePath : tempFiles) { 2234 try { 2235 HStoreFile sf = HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status); 2236 outputFileSize += sf.getReader().length(); 2237 storeFiles.add(sf); 2238 } catch (IOException ex) { 2239 LOG.error("Failed to commit store file {}", storeFilePath, ex); 2240 // Try to delete the files we have committed before. 
2241 for (HStoreFile sf : storeFiles) { 2242 Path pathToDelete = sf.getPath(); 2243 try { 2244 sf.deleteStoreFile(); 2245 } catch (IOException deleteEx) { 2246 LOG.error(HBaseMarkers.FATAL, "Failed to delete store file we committed, " 2247 + "halting {}", pathToDelete, ex); 2248 Runtime.getRuntime().halt(1); 2249 } 2250 } 2251 throw new IOException("Failed to commit the flush", ex); 2252 } 2253 } 2254 2255 for (HStoreFile sf : storeFiles) { 2256 if (HStore.this.getCoprocessorHost() != null) { 2257 HStore.this.getCoprocessorHost().postFlush(HStore.this, sf, tracker); 2258 } 2259 committedFiles.add(sf.getPath()); 2260 } 2261 2262 HStore.this.flushedCellsCount.addAndGet(cacheFlushCount); 2263 HStore.this.flushedCellsSize.addAndGet(cacheFlushSize); 2264 HStore.this.flushedOutputFileSize.addAndGet(outputFileSize); 2265 2266 // Add new file to store files. Clear snapshot too while we have the Store write lock. 2267 return HStore.this.updateStorefiles(storeFiles, snapshot.getId()); 2268 } 2269 2270 @Override 2271 public long getOutputFileSize() { 2272 return outputFileSize; 2273 } 2274 2275 @Override 2276 public List<Path> getCommittedFiles() { 2277 return committedFiles; 2278 } 2279 2280 /** 2281 * Similar to commit, but called in secondary region replicas for replaying the 2282 * flush cache from primary region. Adds the new files to the store, and drops the 2283 * snapshot depending on dropMemstoreSnapshot argument. 2284 * @param fileNames names of the flushed files 2285 * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot 2286 * @throws IOException 2287 */ 2288 @Override 2289 public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot) 2290 throws IOException { 2291 List<HStoreFile> storeFiles = new ArrayList<>(fileNames.size()); 2292 for (String file : fileNames) { 2293 // open the file as a store file (hfile link, etc) 2294 StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), file); 2295 HStoreFile storeFile = createStoreFileAndReader(storeFileInfo); 2296 storeFiles.add(storeFile); 2297 HStore.this.storeSize.addAndGet(storeFile.getReader().length()); 2298 HStore.this.totalUncompressedBytes 2299 .addAndGet(storeFile.getReader().getTotalUncompressedBytes()); 2300 if (LOG.isInfoEnabled()) { 2301 LOG.info("Region: " + HStore.this.getRegionInfo().getEncodedName() + 2302 " added " + storeFile + ", entries=" + storeFile.getReader().getEntries() + 2303 ", sequenceid=" + storeFile.getReader().getSequenceID() + ", filesize=" 2304 + TraditionalBinaryPrefix.long2String(storeFile.getReader().length(), "", 1)); 2305 } 2306 } 2307 2308 long snapshotId = -1; // -1 means do not drop 2309 if (dropMemstoreSnapshot && snapshot != null) { 2310 snapshotId = snapshot.getId(); 2311 snapshot.close(); 2312 } 2313 HStore.this.updateStorefiles(storeFiles, snapshotId); 2314 } 2315 2316 /** 2317 * Abort the snapshot preparation. Drops the snapshot if any. 2318 * @throws IOException 2319 */ 2320 @Override 2321 public void abort() throws IOException { 2322 if (snapshot != null) { 2323 //We need to close the snapshot when aborting, otherwise, the segment scanner 2324 //won't be closed. 
If we are using MSLAB, the chunk referenced by those scanners 2325 //can't be released, thus memory leak 2326 snapshot.close(); 2327 HStore.this.updateStorefiles(Collections.emptyList(), snapshot.getId()); 2328 } 2329 } 2330 } 2331 2332 @Override 2333 public boolean needsCompaction() { 2334 List<HStoreFile> filesCompactingClone = null; 2335 synchronized (filesCompacting) { 2336 filesCompactingClone = Lists.newArrayList(filesCompacting); 2337 } 2338 return this.storeEngine.needsCompaction(filesCompactingClone); 2339 } 2340 2341 /** 2342 * Used for tests. 2343 * @return cache configuration for this Store. 2344 */ 2345 @VisibleForTesting 2346 public CacheConfig getCacheConfig() { 2347 return this.cacheConf; 2348 } 2349 2350 public static final long FIXED_OVERHEAD = 2351 ClassSize.align(ClassSize.OBJECT + (26 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) 2352 + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN)); 2353 2354 public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD 2355 + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK 2356 + ClassSize.CONCURRENT_SKIPLISTMAP 2357 + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT 2358 + ScanInfo.FIXED_OVERHEAD); 2359 2360 @Override 2361 public long heapSize() { 2362 MemStoreSize memstoreSize = this.memstore.size(); 2363 return DEEP_OVERHEAD + memstoreSize.getHeapSize(); 2364 } 2365 2366 @Override 2367 public CellComparator getComparator() { 2368 return comparator; 2369 } 2370 2371 public ScanInfo getScanInfo() { 2372 return scanInfo; 2373 } 2374 2375 /** 2376 * Set scan info, used by test 2377 * @param scanInfo new scan info to use for test 2378 */ 2379 void setScanInfo(ScanInfo scanInfo) { 2380 this.scanInfo = scanInfo; 2381 } 2382 2383 @Override 2384 public boolean hasTooManyStoreFiles() { 2385 return getStorefilesCount() > this.blockingFileCount; 2386 } 2387 2388 @Override 2389 public long getFlushedCellsCount() { 2390 return flushedCellsCount.get(); 2391 } 2392 2393 @Override 2394 public long getFlushedCellsSize() { 2395 return flushedCellsSize.get(); 2396 } 2397 2398 @Override 2399 public long getFlushedOutputFileSize() { 2400 return flushedOutputFileSize.get(); 2401 } 2402 2403 @Override 2404 public long getCompactedCellsCount() { 2405 return compactedCellsCount.get(); 2406 } 2407 2408 @Override 2409 public long getCompactedCellsSize() { 2410 return compactedCellsSize.get(); 2411 } 2412 2413 @Override 2414 public long getMajorCompactedCellsCount() { 2415 return majorCompactedCellsCount.get(); 2416 } 2417 2418 @Override 2419 public long getMajorCompactedCellsSize() { 2420 return majorCompactedCellsSize.get(); 2421 } 2422 2423 /** 2424 * Returns the StoreEngine that is backing this concrete implementation of Store. 2425 * @return Returns the {@link StoreEngine} object used internally inside this HStore object. 
2426 */ 2427 @VisibleForTesting 2428 public StoreEngine<?, ?, ?, ?> getStoreEngine() { 2429 return this.storeEngine; 2430 } 2431 2432 protected OffPeakHours getOffPeakHours() { 2433 return this.offPeakHours; 2434 } 2435 2436 /** 2437 * {@inheritDoc} 2438 */ 2439 @Override 2440 public void onConfigurationChange(Configuration conf) { 2441 this.conf = new CompoundConfiguration() 2442 .add(conf) 2443 .addBytesMap(family.getValues()); 2444 this.storeEngine.compactionPolicy.setConf(conf); 2445 this.offPeakHours = OffPeakHours.getInstance(conf); 2446 } 2447 2448 /** 2449 * {@inheritDoc} 2450 */ 2451 @Override 2452 public void registerChildren(ConfigurationManager manager) { 2453 // No children to register 2454 } 2455 2456 /** 2457 * {@inheritDoc} 2458 */ 2459 @Override 2460 public void deregisterChildren(ConfigurationManager manager) { 2461 // No children to deregister 2462 } 2463 2464 @Override 2465 public double getCompactionPressure() { 2466 return storeEngine.getStoreFileManager().getCompactionPressure(); 2467 } 2468 2469 @Override 2470 public boolean isPrimaryReplicaStore() { 2471 return getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID; 2472 } 2473 2474 /** 2475 * Sets the store up for a region level snapshot operation. 2476 * @see #postSnapshotOperation() 2477 */ 2478 public void preSnapshotOperation() { 2479 archiveLock.lock(); 2480 } 2481 2482 /** 2483 * Perform tasks needed after the completion of snapshot operation. 2484 * @see #preSnapshotOperation() 2485 */ 2486 public void postSnapshotOperation() { 2487 archiveLock.unlock(); 2488 } 2489 2490 /** 2491 * Closes and archives the compacted files under this store 2492 */ 2493 public synchronized void closeAndArchiveCompactedFiles() throws IOException { 2494 closeAndArchiveCompactedFiles(false); 2495 } 2496 2497 @VisibleForTesting 2498 public synchronized void closeAndArchiveCompactedFiles(boolean storeClosing) throws IOException { 2499 // ensure other threads do not attempt to archive the same files on close() 2500 archiveLock.lock(); 2501 try { 2502 lock.readLock().lock(); 2503 Collection<HStoreFile> copyCompactedfiles = null; 2504 try { 2505 Collection<HStoreFile> compactedfiles = 2506 this.getStoreEngine().getStoreFileManager().getCompactedfiles(); 2507 if (CollectionUtils.isNotEmpty(compactedfiles)) { 2508 // Do a copy under read lock 2509 copyCompactedfiles = new ArrayList<>(compactedfiles); 2510 } else { 2511 LOG.trace("No compacted files to archive"); 2512 } 2513 } finally { 2514 lock.readLock().unlock(); 2515 } 2516 if (CollectionUtils.isNotEmpty(copyCompactedfiles)) { 2517 removeCompactedfiles(copyCompactedfiles, storeClosing); 2518 } 2519 } finally { 2520 archiveLock.unlock(); 2521 } 2522 } 2523 2524 /** 2525 * Archives and removes the compacted files 2526 * @param compactedfiles The compacted files in this store that are not active in reads 2527 * @throws IOException 2528 */ 2529 private void removeCompactedfiles(Collection<HStoreFile> compactedfiles, boolean storeClosing) 2530 throws IOException { 2531 final List<HStoreFile> filesToRemove = new ArrayList<>(compactedfiles.size()); 2532 for (final HStoreFile file : compactedfiles) { 2533 synchronized (file) { 2534 try { 2535 StoreFileReader r = file.getReader(); 2536 if (r == null) { 2537 LOG.debug("The file {} was closed but still not archived", file); 2538 filesToRemove.add(file); 2539 continue; 2540 } 2541 2542 //Compacted files in the list should always be marked compacted away. 
In the event 2543 //that they contradict each other, in order to guarantee data consistency, 2544 //should we choose one and ignore the other? 2545 if (storeClosing && !file.isCompactedAway()) { 2546 String msg = 2547 "Region closing but StoreFile is in compacted list but not compacted away: " + 2548 file.getPath(); 2549 throw new IllegalStateException(msg); 2550 } 2551 2552 //If the store is closing we ignore any references, to keep things consistent, 2553 //and remove the compacted storefiles from the region directory 2554 if (file.isCompactedAway() && (!file.isReferencedInReads() || storeClosing)) { 2555 if (storeClosing && file.isReferencedInReads()) { 2556 LOG.warn("Region closing but StoreFile still has references: file={}, refCount={}", 2557 file.getPath(), r.getRefCount()); 2558 } 2559 // Even if deleting fails we need not bother as any new scanners won't be 2560 // able to use the compacted file as the status is already compactedAway 2561 LOG.trace("Closing and archiving the file {}", file); 2562 r.close(true); 2563 file.closeStreamReaders(true); 2564 // Just close here; the file is collected in filesToRemove and archived below 2565 filesToRemove.add(file); 2566 } else { 2567 LOG.info("Can't archive compacted file " + file.getPath() 2568 + " because of either isCompactedAway=" + file.isCompactedAway() 2569 + " or file has reference, isReferencedInReads=" + file.isReferencedInReads() 2570 + ", refCount=" + r.getRefCount() + ", skipping for now."); 2571 } 2572 } catch (Exception e) { 2573 String msg = "Exception while trying to close the compacted store file " + 2574 file.getPath(); 2575 if (storeClosing) { 2576 msg = "Store is closing. " + msg; 2577 } 2578 LOG.error(msg, e); 2579 // If we get an exception, let the caller know so it can abort the server 2580 if (storeClosing) { 2581 throw new IOException(msg, e); 2582 } 2583 } 2584 } 2585 } 2586 if (this.isPrimaryReplicaStore()) { 2587 // Only the primary region is allowed to move the file to archive. 2588 // The secondary region does not move the files to archive. Any active reads from 2589 // the secondary region will still work because the file still has active readers on it. 2590 if (!filesToRemove.isEmpty()) { 2591 LOG.debug("Moving the files {} to archive", filesToRemove); 2592 // Only if the archiving succeeds should the files be cleared from the compacted list 2593 try { 2594 this.fs.removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(), filesToRemove); 2595 } catch (FailedArchiveException fae) { 2596 // Even if archiving some files failed, we still need to clear out any of the 2597 // files which were successfully archived. Otherwise we will receive a 2598 // FileNotFoundException when we attempt to re-archive them in the next go around.
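              // Concretely: drop the paths that failed to archive from filesToRemove, clear the
              // successfully archived files from the store file manager, and rethrow so the
              // failed ones can be retried on a later pass.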
2599 Collection<Path> failedFiles = fae.getFailedFiles(); 2600 Iterator<HStoreFile> iter = filesToRemove.iterator(); 2601 while (iter.hasNext()) { 2602 if (failedFiles.contains(iter.next().getPath())) { 2603 iter.remove(); 2604 } 2605 } 2606 if (!filesToRemove.isEmpty()) { 2607 clearCompactedfiles(filesToRemove); 2608 } 2609 throw fae; 2610 } 2611 } 2612 } 2613 if (!filesToRemove.isEmpty()) { 2614 // Clear the compactedfiles from the store file manager 2615 clearCompactedfiles(filesToRemove); 2616 } 2617 } 2618 2619 public Long preFlushSeqIDEstimation() { 2620 return memstore.preFlushSeqIDEstimation(); 2621 } 2622 2623 @Override 2624 public boolean isSloppyMemStore() { 2625 return this.memstore.isSloppy(); 2626 } 2627 2628 private void clearCompactedfiles(List<HStoreFile> filesToRemove) throws IOException { 2629 LOG.trace("Clearing the compacted file {} from this store", filesToRemove); 2630 try { 2631 lock.writeLock().lock(); 2632 this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToRemove); 2633 } finally { 2634 lock.writeLock().unlock(); 2635 } 2636 } 2637}