001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.net.InetSocketAddress;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.HashMap;
028import java.util.HashSet;
029import java.util.Iterator;
030import java.util.List;
031import java.util.Map;
032import java.util.Map.Entry;
033import java.util.NavigableSet;
034import java.util.Optional;
035import java.util.OptionalDouble;
036import java.util.OptionalInt;
037import java.util.OptionalLong;
038import java.util.Set;
039import java.util.concurrent.Callable;
040import java.util.concurrent.CompletionService;
041import java.util.concurrent.ConcurrentHashMap;
042import java.util.concurrent.ExecutionException;
043import java.util.concurrent.ExecutorCompletionService;
044import java.util.concurrent.Future;
045import java.util.concurrent.ThreadPoolExecutor;
046import java.util.concurrent.atomic.AtomicBoolean;
047import java.util.concurrent.atomic.AtomicInteger;
048import java.util.concurrent.atomic.AtomicLong;
049import java.util.concurrent.atomic.LongAdder;
050import java.util.concurrent.locks.ReentrantLock;
051import java.util.function.Consumer;
052import java.util.function.Supplier;
053import java.util.function.ToLongFunction;
054import java.util.stream.Collectors;
055import java.util.stream.LongStream;
056import org.apache.hadoop.conf.Configuration;
057import org.apache.hadoop.fs.FileSystem;
058import org.apache.hadoop.fs.Path;
059import org.apache.hadoop.fs.permission.FsAction;
060import org.apache.hadoop.hbase.Cell;
061import org.apache.hadoop.hbase.CellComparator;
062import org.apache.hadoop.hbase.CellUtil;
063import org.apache.hadoop.hbase.HConstants;
064import org.apache.hadoop.hbase.MemoryCompactionPolicy;
065import org.apache.hadoop.hbase.TableName;
066import org.apache.hadoop.hbase.backup.FailedArchiveException;
067import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
068import org.apache.hadoop.hbase.client.RegionInfo;
069import org.apache.hadoop.hbase.client.Scan;
070import org.apache.hadoop.hbase.conf.ConfigurationManager;
071import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
072import org.apache.hadoop.hbase.coprocessor.ReadOnlyConfiguration;
073import org.apache.hadoop.hbase.io.HeapSize;
074import org.apache.hadoop.hbase.io.hfile.CacheConfig;
075import org.apache.hadoop.hbase.io.hfile.HFile;
076import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
077import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
078import org.apache.hadoop.hbase.io.hfile.HFileScanner;
079import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
080import org.apache.hadoop.hbase.monitoring.MonitoredTask;
081import org.apache.hadoop.hbase.quotas.RegionSizeStore;
082import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
083import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
084import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
085import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
086import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
087import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
088import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
089import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
090import org.apache.hadoop.hbase.security.EncryptionUtil;
091import org.apache.hadoop.hbase.security.User;
092import org.apache.hadoop.hbase.util.Bytes;
093import org.apache.hadoop.hbase.util.ClassSize;
094import org.apache.hadoop.hbase.util.CommonFSUtils;
095import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
096import org.apache.hadoop.hbase.util.Pair;
097import org.apache.hadoop.hbase.util.ReflectionUtils;
098import org.apache.hadoop.util.StringUtils;
099import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
100import org.apache.yetus.audience.InterfaceAudience;
101import org.slf4j.Logger;
102import org.slf4j.LoggerFactory;
103
104import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
105import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
106import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
107import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
108import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
109import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
110import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
111
112import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
114
115/**
116 * A Store holds a column family in a Region. Its a memstore and a set of zero or more StoreFiles,
117 * which stretch backwards over time.
118 * <p>
119 * There's no reason to consider append-logging at this level; all logging and locking is handled at
120 * the HRegion level. Store just provides services to manage sets of StoreFiles. One of the most
121 * important of those services is compaction services where files are aggregated once they pass a
122 * configurable threshold.
123 * <p>
124 * Locking and transactions are handled at a higher level. This API should not be called directly
125 * but by an HRegion manager.
126 */
127@InterfaceAudience.Private
128public class HStore
129  implements Store, HeapSize, StoreConfigInformation, PropagatingConfigurationObserver {
130  public static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
131  public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
132    "hbase.server.compactchecker.interval.multiplier";
133  public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
134  public static final String BLOCK_STORAGE_POLICY_KEY = "hbase.hstore.block.storage.policy";
135  // "NONE" is not a valid storage policy and means we defer the policy to HDFS
136  public static final String DEFAULT_BLOCK_STORAGE_POLICY = "NONE";
137  public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
138  public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 16;
139
140  // HBASE-24428 : Update compaction priority for recently split daughter regions
141  // so as to prioritize their compaction.
142  // Any compaction candidate with higher priority than compaction of newly split daugher regions
143  // should have priority value < (Integer.MIN_VALUE + 1000)
144  private static final int SPLIT_REGION_COMPACTION_PRIORITY = Integer.MIN_VALUE + 1000;
145
146  private static final Logger LOG = LoggerFactory.getLogger(HStore.class);
147
148  protected final MemStore memstore;
149  // This stores directory in the filesystem.
150  private final HRegion region;
151  protected Configuration conf;
152  private long lastCompactSize = 0;
153  volatile boolean forceMajor = false;
154  private AtomicLong storeSize = new AtomicLong();
155  private AtomicLong totalUncompressedBytes = new AtomicLong();
156  private LongAdder memstoreOnlyRowReadsCount = new LongAdder();
157  // rows that has cells from both memstore and files (or only files)
158  private LongAdder mixedRowReadsCount = new LongAdder();
159
160  /**
161   * Lock specific to archiving compacted store files. This avoids races around the combination of
162   * retrieving the list of compacted files and moving them to the archive directory. Since this is
163   * usually a background process (other than on close), we don't want to handle this with the store
164   * write lock, which would block readers and degrade performance. Locked by: -
165   * CompactedHFilesDispatchHandler via closeAndArchiveCompactedFiles() - close()
166   */
167  final ReentrantLock archiveLock = new ReentrantLock();
168
169  private final boolean verifyBulkLoads;
170
171  /**
172   * Use this counter to track concurrent puts. If TRACE-log is enabled, if we are over the
173   * threshold set by hbase.region.store.parallel.put.print.threshold (Default is 50) we will log a
174   * message that identifies the Store experience this high-level of concurrency.
175   */
176  private final AtomicInteger currentParallelPutCount = new AtomicInteger(0);
177  private final int parallelPutCountPrintThreshold;
178
179  private ScanInfo scanInfo;
180
181  // All access must be synchronized.
182  // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it.
183  private final List<HStoreFile> filesCompacting = Lists.newArrayList();
184
185  // All access must be synchronized.
186  private final Set<ChangedReadersObserver> changedReaderObservers =
187    Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());
188
189  private HFileDataBlockEncoder dataBlockEncoder;
190
191  final StoreEngine<?, ?, ?, ?> storeEngine;
192
193  private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
194  private volatile OffPeakHours offPeakHours;
195
196  private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
197  private int flushRetriesNumber;
198  private int pauseTime;
199
200  private long blockingFileCount;
201  private int compactionCheckMultiplier;
202
203  private AtomicLong flushedCellsCount = new AtomicLong();
204  private AtomicLong compactedCellsCount = new AtomicLong();
205  private AtomicLong majorCompactedCellsCount = new AtomicLong();
206  private AtomicLong flushedCellsSize = new AtomicLong();
207  private AtomicLong flushedOutputFileSize = new AtomicLong();
208  private AtomicLong compactedCellsSize = new AtomicLong();
209  private AtomicLong majorCompactedCellsSize = new AtomicLong();
210
211  private final StoreContext storeContext;
212
213  // Used to track the store files which are currently being written. For compaction, if we want to
214  // compact store file [a, b, c] to [d], then here we will record 'd'. And we will also use it to
215  // track the store files being written when flushing.
216  // Notice that the creation is in the background compaction or flush thread and we will get the
217  // files in other thread, so it needs to be thread safe.
218  private static final class StoreFileWriterCreationTracker implements Consumer<Path> {
219
220    private final Set<Path> files = Collections.newSetFromMap(new ConcurrentHashMap<>());
221
222    @Override
223    public void accept(Path t) {
224      files.add(t);
225    }
226
227    public Set<Path> get() {
228      return Collections.unmodifiableSet(files);
229    }
230  }
231
232  // We may have multiple compaction running at the same time, and flush can also happen at the same
233  // time, so here we need to use a collection, and the collection needs to be thread safe.
234  // The implementation of StoreFileWriterCreationTracker is very simple and we will not likely to
235  // implement hashCode or equals for it, so here we just use ConcurrentHashMap. Changed to
236  // IdentityHashMap if later we want to implement hashCode or equals.
237  private final Set<StoreFileWriterCreationTracker> storeFileWriterCreationTrackers =
238    Collections.newSetFromMap(new ConcurrentHashMap<>());
239
240  // For the SFT implementation which we will write tmp store file first, we do not need to clean up
241  // the broken store files under the data directory, which means we do not need to track the store
242  // file writer creation. So here we abstract a factory to return different trackers for different
243  // SFT implementations.
244  private final Supplier<StoreFileWriterCreationTracker> storeFileWriterCreationTrackerFactory;
245
246  /**
247   * Constructor
248   * @param family    HColumnDescriptor for this column
249   * @param confParam configuration object failed. Can be null.
250   */
251  protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
252    final Configuration confParam, boolean warmup) throws IOException {
253    this.conf = StoreUtils.createStoreConfiguration(confParam, region.getTableDescriptor(), family);
254
255    this.region = region;
256    this.storeContext = initializeStoreContext(family);
257
258    // Assemble the store's home directory and Ensure it exists.
259    region.getRegionFileSystem().createStoreDir(family.getNameAsString());
260
261    // set block storage policy for store directory
262    String policyName = family.getStoragePolicy();
263    if (null == policyName) {
264      policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY, DEFAULT_BLOCK_STORAGE_POLICY);
265    }
266    region.getRegionFileSystem().setStoragePolicy(family.getNameAsString(), policyName.trim());
267
268    this.dataBlockEncoder = new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());
269
270    // used by ScanQueryMatcher
271    long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
272    LOG.trace("Time to purge deletes set to {}ms in {}", timeToPurgeDeletes, this);
273    // Get TTL
274    long ttl = determineTTLFromFamily(family);
275    // Why not just pass a HColumnDescriptor in here altogether? Even if have
276    // to clone it?
277    scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, region.getCellComparator());
278    this.memstore = getMemstore();
279
280    this.offPeakHours = OffPeakHours.getInstance(conf);
281
282    this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
283
284    this.blockingFileCount = conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
285    this.compactionCheckMultiplier = conf.getInt(COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY,
286      DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
287    if (this.compactionCheckMultiplier <= 0) {
288      LOG.error("Compaction check period multiplier must be positive, setting default: {}",
289        DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
290      this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
291    }
292
293    this.storeEngine = createStoreEngine(this, this.conf, region.getCellComparator());
294    storeEngine.initialize(warmup);
295    // if require writing to tmp dir first, then we just return null, which indicate that we do not
296    // need to track the creation of store file writer, otherwise we return a new
297    // StoreFileWriterCreationTracker.
298    this.storeFileWriterCreationTrackerFactory = storeEngine.requireWritingToTmpDirFirst()
299      ? () -> null
300      : () -> new StoreFileWriterCreationTracker();
301    refreshStoreSizeAndTotalBytes();
302
303    flushRetriesNumber =
304      conf.getInt("hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
305    pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
306    if (flushRetriesNumber <= 0) {
307      throw new IllegalArgumentException(
308        "hbase.hstore.flush.retries.number must be > 0, not " + flushRetriesNumber);
309    }
310
311    int confPrintThreshold =
312      this.conf.getInt("hbase.region.store.parallel.put.print.threshold", 50);
313    if (confPrintThreshold < 10) {
314      confPrintThreshold = 10;
315    }
316    this.parallelPutCountPrintThreshold = confPrintThreshold;
317
318    LOG.info(
319      "Store={},  memstore type={}, storagePolicy={}, verifyBulkLoads={}, "
320        + "parallelPutCountPrintThreshold={}, encoding={}, compression={}",
321      this, memstore.getClass().getSimpleName(), policyName, verifyBulkLoads,
322      parallelPutCountPrintThreshold, family.getDataBlockEncoding(), family.getCompressionType());
323  }
324
325  private StoreContext initializeStoreContext(ColumnFamilyDescriptor family) throws IOException {
326    return new StoreContext.Builder().withBlockSize(family.getBlocksize())
327      .withEncryptionContext(EncryptionUtil.createEncryptionContext(conf, family))
328      .withBloomType(family.getBloomFilterType()).withCacheConfig(createCacheConf(family))
329      .withCellComparator(region.getCellComparator()).withColumnFamilyDescriptor(family)
330      .withCompactedFilesSupplier(this::getCompactedFiles)
331      .withRegionFileSystem(region.getRegionFileSystem())
332      .withFavoredNodesSupplier(this::getFavoredNodes)
333      .withFamilyStoreDirectoryPath(
334        region.getRegionFileSystem().getStoreDir(family.getNameAsString()))
335      .withRegionCoprocessorHost(region.getCoprocessorHost()).build();
336  }
337
338  private InetSocketAddress[] getFavoredNodes() {
339    InetSocketAddress[] favoredNodes = null;
340    if (region.getRegionServerServices() != null) {
341      favoredNodes = region.getRegionServerServices()
342        .getFavoredNodesForRegion(region.getRegionInfo().getEncodedName());
343    }
344    return favoredNodes;
345  }
346
347  /**
348   * @return MemStore Instance to use in this store.
349   */
350  private MemStore getMemstore() {
351    MemStore ms = null;
352    // Check if in-memory-compaction configured. Note MemoryCompactionPolicy is an enum!
353    MemoryCompactionPolicy inMemoryCompaction = null;
354    if (this.getTableName().isSystemTable()) {
355      inMemoryCompaction = MemoryCompactionPolicy
356        .valueOf(conf.get("hbase.systemtables.compacting.memstore.type", "NONE"));
357    } else {
358      inMemoryCompaction = getColumnFamilyDescriptor().getInMemoryCompaction();
359    }
360    if (inMemoryCompaction == null) {
361      inMemoryCompaction =
362        MemoryCompactionPolicy.valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
363          CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT).toUpperCase());
364    }
365
366    switch (inMemoryCompaction) {
367      case NONE:
368        Class<? extends MemStore> memStoreClass =
369          conf.getClass(MEMSTORE_CLASS_NAME, DefaultMemStore.class, MemStore.class);
370        ms = ReflectionUtils.newInstance(memStoreClass,
371          new Object[] { conf, getComparator(), this.getHRegion().getRegionServicesForStores() });
372        break;
373      default:
374        Class<? extends CompactingMemStore> compactingMemStoreClass =
375          conf.getClass(MEMSTORE_CLASS_NAME, CompactingMemStore.class, CompactingMemStore.class);
376        ms =
377          ReflectionUtils.newInstance(compactingMemStoreClass, new Object[] { conf, getComparator(),
378            this, this.getHRegion().getRegionServicesForStores(), inMemoryCompaction });
379    }
380    return ms;
381  }
382
383  /**
384   * Creates the cache config.
385   * @param family The current column family.
386   */
387  protected CacheConfig createCacheConf(final ColumnFamilyDescriptor family) {
388    CacheConfig cacheConf = new CacheConfig(conf, family, region.getBlockCache(),
389      region.getRegionServicesForStores().getByteBuffAllocator());
390    LOG.info("Created cacheConfig: {}, for column family {} of region {} ", cacheConf,
391      family.getNameAsString(), region.getRegionInfo().getEncodedName());
392    return cacheConf;
393  }
394
395  /**
396   * Creates the store engine configured for the given Store.
397   * @param store        The store. An unfortunate dependency needed due to it being passed to
398   *                     coprocessors via the compactor.
399   * @param conf         Store configuration.
400   * @param kvComparator KVComparator for storeFileManager.
401   * @return StoreEngine to use.
402   */
403  protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
404    CellComparator kvComparator) throws IOException {
405    return StoreEngine.create(store, conf, kvComparator);
406  }
407
408  /**
409   * @return TTL in seconds of the specified family
410   */
411  public static long determineTTLFromFamily(final ColumnFamilyDescriptor family) {
412    // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.
413    long ttl = family.getTimeToLive();
414    if (ttl == HConstants.FOREVER) {
415      // Default is unlimited ttl.
416      ttl = Long.MAX_VALUE;
417    } else if (ttl == -1) {
418      ttl = Long.MAX_VALUE;
419    } else {
420      // Second -> ms adjust for user data
421      ttl *= 1000;
422    }
423    return ttl;
424  }
425
426  StoreContext getStoreContext() {
427    return storeContext;
428  }
429
430  @Override
431  public String getColumnFamilyName() {
432    return this.storeContext.getFamily().getNameAsString();
433  }
434
435  @Override
436  public TableName getTableName() {
437    return this.getRegionInfo().getTable();
438  }
439
440  @Override
441  public FileSystem getFileSystem() {
442    return storeContext.getRegionFileSystem().getFileSystem();
443  }
444
445  public HRegionFileSystem getRegionFileSystem() {
446    return storeContext.getRegionFileSystem();
447  }
448
449  /* Implementation of StoreConfigInformation */
450  @Override
451  public long getStoreFileTtl() {
452    // TTL only applies if there's no MIN_VERSIONs setting on the column.
453    return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
454  }
455
456  @Override
457  public long getMemStoreFlushSize() {
458    // TODO: Why is this in here? The flushsize of the region rather than the store? St.Ack
459    return this.region.memstoreFlushSize;
460  }
461
462  @Override
463  public MemStoreSize getFlushableSize() {
464    return this.memstore.getFlushableSize();
465  }
466
467  @Override
468  public MemStoreSize getSnapshotSize() {
469    return this.memstore.getSnapshotSize();
470  }
471
472  @Override
473  public long getCompactionCheckMultiplier() {
474    return this.compactionCheckMultiplier;
475  }
476
477  @Override
478  public long getBlockingFileCount() {
479    return blockingFileCount;
480  }
481  /* End implementation of StoreConfigInformation */
482
483  @Override
484  public ColumnFamilyDescriptor getColumnFamilyDescriptor() {
485    return this.storeContext.getFamily();
486  }
487
488  @Override
489  public OptionalLong getMaxSequenceId() {
490    return StoreUtils.getMaxSequenceIdInList(this.getStorefiles());
491  }
492
493  @Override
494  public OptionalLong getMaxMemStoreTS() {
495    return StoreUtils.getMaxMemStoreTSInList(this.getStorefiles());
496  }
497
498  /**
499   * @return the data block encoder
500   */
501  public HFileDataBlockEncoder getDataBlockEncoder() {
502    return dataBlockEncoder;
503  }
504
505  /**
506   * Should be used only in tests.
507   * @param blockEncoder the block delta encoder to use
508   */
509  void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
510    this.dataBlockEncoder = blockEncoder;
511  }
512
513  private void postRefreshStoreFiles() throws IOException {
514    // Advance the memstore read point to be at least the new store files seqIds so that
515    // readers might pick it up. This assumes that the store is not getting any writes (otherwise
516    // in-flight transactions might be made visible)
517    getMaxSequenceId().ifPresent(region.getMVCC()::advanceTo);
518    refreshStoreSizeAndTotalBytes();
519  }
520
521  @Override
522  public void refreshStoreFiles() throws IOException {
523    storeEngine.refreshStoreFiles();
524    postRefreshStoreFiles();
525  }
526
527  /**
528   * Replaces the store files that the store has with the given files. Mainly used by secondary
529   * region replicas to keep up to date with the primary region files.
530   */
531  public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
532    storeEngine.refreshStoreFiles(newFiles);
533    postRefreshStoreFiles();
534  }
535
536  /**
537   * This message intends to inform the MemStore that next coming updates are going to be part of
538   * the replaying edits from WAL
539   */
540  public void startReplayingFromWAL() {
541    this.memstore.startReplayingFromWAL();
542  }
543
544  /**
545   * This message intends to inform the MemStore that the replaying edits from WAL are done
546   */
547  public void stopReplayingFromWAL() {
548    this.memstore.stopReplayingFromWAL();
549  }
550
551  /**
552   * Adds a value to the memstore
553   */
554  public void add(final Cell cell, MemStoreSizing memstoreSizing) {
555    storeEngine.readLock();
556    try {
557      if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
558        LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
559          this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName());
560      }
561      this.memstore.add(cell, memstoreSizing);
562    } finally {
563      storeEngine.readUnlock();
564      currentParallelPutCount.decrementAndGet();
565    }
566  }
567
568  /**
569   * Adds the specified value to the memstore
570   */
571  public void add(final Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
572    storeEngine.readLock();
573    try {
574      if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
575        LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
576          this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName());
577      }
578      memstore.add(cells, memstoreSizing);
579    } finally {
580      storeEngine.readUnlock();
581      currentParallelPutCount.decrementAndGet();
582    }
583  }
584
585  @Override
586  public long timeOfOldestEdit() {
587    return memstore.timeOfOldestEdit();
588  }
589
590  /**
591   * @return All store files.
592   */
593  @Override
594  public Collection<HStoreFile> getStorefiles() {
595    return this.storeEngine.getStoreFileManager().getStorefiles();
596  }
597
598  @Override
599  public Collection<HStoreFile> getCompactedFiles() {
600    return this.storeEngine.getStoreFileManager().getCompactedfiles();
601  }
602
603  /**
604   * This throws a WrongRegionException if the HFile does not fit in this region, or an
605   * InvalidHFileException if the HFile is not valid.
606   */
607  public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
608    HFile.Reader reader = null;
609    try {
610      LOG.info("Validating hfile at " + srcPath + " for inclusion in " + this);
611      FileSystem srcFs = srcPath.getFileSystem(conf);
612      srcFs.access(srcPath, FsAction.READ_WRITE);
613      reader = HFile.createReader(srcFs, srcPath, getCacheConfig(), isPrimaryReplicaStore(), conf);
614
615      Optional<byte[]> firstKey = reader.getFirstRowKey();
616      Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
617      Optional<Cell> lk = reader.getLastKey();
618      Preconditions.checkState(lk.isPresent(), "Last key can not be null");
619      byte[] lastKey = CellUtil.cloneRow(lk.get());
620
621      if (LOG.isDebugEnabled()) {
622        LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey.get()) + " last="
623          + Bytes.toStringBinary(lastKey));
624        LOG.debug("Region bounds: first=" + Bytes.toStringBinary(getRegionInfo().getStartKey())
625          + " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
626      }
627
628      if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) {
629        throw new WrongRegionException("Bulk load file " + srcPath.toString()
630          + " does not fit inside region " + this.getRegionInfo().getRegionNameAsString());
631      }
632
633      if (
634        reader.length()
635            > conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE)
636      ) {
637        LOG.warn("Trying to bulk load hfile " + srcPath + " with size: " + reader.length()
638          + " bytes can be problematic as it may lead to oversplitting.");
639      }
640
641      if (verifyBulkLoads) {
642        long verificationStartTime = EnvironmentEdgeManager.currentTime();
643        LOG.info("Full verification started for bulk load hfile: {}", srcPath);
644        Cell prevCell = null;
645        HFileScanner scanner = reader.getScanner(conf, false, false, false);
646        scanner.seekTo();
647        do {
648          Cell cell = scanner.getCell();
649          if (prevCell != null) {
650            if (getComparator().compareRows(prevCell, cell) > 0) {
651              throw new InvalidHFileException("Previous row is greater than" + " current row: path="
652                + srcPath + " previous=" + CellUtil.getCellKeyAsString(prevCell) + " current="
653                + CellUtil.getCellKeyAsString(cell));
654            }
655            if (CellComparator.getInstance().compareFamilies(prevCell, cell) != 0) {
656              throw new InvalidHFileException("Previous key had different"
657                + " family compared to current key: path=" + srcPath + " previous="
658                + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(),
659                  prevCell.getFamilyLength())
660                + " current=" + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(),
661                  cell.getFamilyLength()));
662            }
663          }
664          prevCell = cell;
665        } while (scanner.next());
666        LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString() + " took "
667          + (EnvironmentEdgeManager.currentTime() - verificationStartTime) + " ms");
668      }
669    } finally {
670      if (reader != null) {
671        reader.close();
672      }
673    }
674  }
675
676  /**
677   * This method should only be called from Region. It is assumed that the ranges of values in the
678   * HFile fit within the stores assigned region. (assertBulkLoadHFileOk checks this)
679   * @param seqNum sequence Id associated with the HFile
680   */
681  public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
682    Path srcPath = new Path(srcPathStr);
683    return getRegionFileSystem().bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
684  }
685
686  public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException {
687    Path srcPath = new Path(srcPathStr);
688    try {
689      getRegionFileSystem().commitStoreFile(srcPath, dstPath);
690    } finally {
691      if (this.getCoprocessorHost() != null) {
692        this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath);
693      }
694    }
695
696    LOG.info("Loaded HFile " + srcPath + " into " + this + " as " + dstPath
697      + " - updating store file list.");
698
699    HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath);
700    bulkLoadHFile(sf);
701
702    LOG.info("Successfully loaded {} into {} (new location: {})", srcPath, this, dstPath);
703
704    return dstPath;
705  }
706
707  public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
708    HStoreFile sf = storeEngine.createStoreFileAndReader(fileInfo);
709    bulkLoadHFile(sf);
710  }
711
712  private void bulkLoadHFile(HStoreFile sf) throws IOException {
713    StoreFileReader r = sf.getReader();
714    this.storeSize.addAndGet(r.length());
715    this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
716    storeEngine.addStoreFiles(Lists.newArrayList(sf), () -> {
717    });
718    LOG.info("Loaded HFile " + sf.getFileInfo() + " into " + this);
719    if (LOG.isTraceEnabled()) {
720      String traceMessage = "BULK LOAD time,size,store size,store files ["
721        + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize + ","
722        + storeEngine.getStoreFileManager().getStorefileCount() + "]";
723      LOG.trace(traceMessage);
724    }
725  }
726
727  private ImmutableCollection<HStoreFile> closeWithoutLock() throws IOException {
728    // Clear so metrics doesn't find them.
729    ImmutableCollection<HStoreFile> result = storeEngine.getStoreFileManager().clearFiles();
730    Collection<HStoreFile> compactedfiles = storeEngine.getStoreFileManager().clearCompactedFiles();
731    // clear the compacted files
732    if (CollectionUtils.isNotEmpty(compactedfiles)) {
733      removeCompactedfiles(compactedfiles,
734        getCacheConfig() != null ? getCacheConfig().shouldEvictOnClose() : true);
735    }
736    if (!result.isEmpty()) {
737      // initialize the thread pool for closing store files in parallel.
738      ThreadPoolExecutor storeFileCloserThreadPool =
739        this.region.getStoreFileOpenAndCloseThreadPool("StoreFileCloser-"
740          + this.region.getRegionInfo().getEncodedName() + "-" + this.getColumnFamilyName());
741
742      // close each store file in parallel
743      CompletionService<Void> completionService =
744        new ExecutorCompletionService<>(storeFileCloserThreadPool);
745      for (HStoreFile f : result) {
746        completionService.submit(new Callable<Void>() {
747          @Override
748          public Void call() throws IOException {
749            boolean evictOnClose =
750              getCacheConfig() != null ? getCacheConfig().shouldEvictOnClose() : true;
751            f.closeStoreFile(evictOnClose);
752            return null;
753          }
754        });
755      }
756
757      IOException ioe = null;
758      try {
759        for (int i = 0; i < result.size(); i++) {
760          try {
761            Future<Void> future = completionService.take();
762            future.get();
763          } catch (InterruptedException e) {
764            if (ioe == null) {
765              ioe = new InterruptedIOException();
766              ioe.initCause(e);
767            }
768          } catch (ExecutionException e) {
769            if (ioe == null) {
770              ioe = new IOException(e.getCause());
771            }
772          }
773        }
774      } finally {
775        storeFileCloserThreadPool.shutdownNow();
776      }
777      if (ioe != null) {
778        throw ioe;
779      }
780    }
781    LOG.trace("Closed {}", this);
782    return result;
783  }
784
785  /**
786   * Close all the readers We don't need to worry about subsequent requests because the Region holds
787   * a write lock that will prevent any more reads or writes.
788   * @return the {@link StoreFile StoreFiles} that were previously being used.
789   * @throws IOException on failure
790   */
791  public ImmutableCollection<HStoreFile> close() throws IOException {
792    // findbugs can not recognize storeEngine.writeLock is just a lock operation so it will report
793    // UL_UNRELEASED_LOCK_EXCEPTION_PATH, so here we have to use two try finally...
794    // Change later if findbugs becomes smarter in the future.
795    this.archiveLock.lock();
796    try {
797      this.storeEngine.writeLock();
798      try {
799        return closeWithoutLock();
800      } finally {
801        this.storeEngine.writeUnlock();
802      }
803    } finally {
804      this.archiveLock.unlock();
805    }
806  }
807
808  /**
809   * Write out current snapshot. Presumes {@code StoreFlusherImpl.prepare()} has been called
810   * previously.
811   * @param logCacheFlushId flush sequence number
812   * @return The path name of the tmp file to which the store was flushed
813   * @throws IOException if exception occurs during process
814   */
815  protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
816    MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
817    Consumer<Path> writerCreationTracker) throws IOException {
818    // If an exception happens flushing, we let it out without clearing
819    // the memstore snapshot. The old snapshot will be returned when we say
820    // 'snapshot', the next time flush comes around.
821    // Retry after catching exception when flushing, otherwise server will abort
822    // itself
823    StoreFlusher flusher = storeEngine.getStoreFlusher();
824    IOException lastException = null;
825    for (int i = 0; i < flushRetriesNumber; i++) {
826      try {
827        List<Path> pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status,
828          throughputController, tracker, writerCreationTracker);
829        Path lastPathName = null;
830        try {
831          for (Path pathName : pathNames) {
832            lastPathName = pathName;
833            storeEngine.validateStoreFile(pathName);
834          }
835          return pathNames;
836        } catch (Exception e) {
837          LOG.warn("Failed validating store file {}, retrying num={}", lastPathName, i, e);
838          if (e instanceof IOException) {
839            lastException = (IOException) e;
840          } else {
841            lastException = new IOException(e);
842          }
843        }
844      } catch (IOException e) {
845        LOG.warn("Failed flushing store file for {}, retrying num={}", this, i, e);
846        lastException = e;
847      }
848      if (lastException != null && i < (flushRetriesNumber - 1)) {
849        try {
850          Thread.sleep(pauseTime);
851        } catch (InterruptedException e) {
852          IOException iie = new InterruptedIOException();
853          iie.initCause(e);
854          throw iie;
855        }
856      }
857    }
858    throw lastException;
859  }
860
861  public HStoreFile tryCommitRecoveredHFile(Path path) throws IOException {
862    LOG.info("Validating recovered hfile at {} for inclusion in store {}", path, this);
863    FileSystem srcFs = path.getFileSystem(conf);
864    srcFs.access(path, FsAction.READ_WRITE);
865    try (HFile.Reader reader =
866      HFile.createReader(srcFs, path, getCacheConfig(), isPrimaryReplicaStore(), conf)) {
867      Optional<byte[]> firstKey = reader.getFirstRowKey();
868      Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
869      Optional<Cell> lk = reader.getLastKey();
870      Preconditions.checkState(lk.isPresent(), "Last key can not be null");
871      byte[] lastKey = CellUtil.cloneRow(lk.get());
872      if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) {
873        throw new WrongRegionException("Recovered hfile " + path.toString()
874          + " does not fit inside region " + this.getRegionInfo().getRegionNameAsString());
875      }
876    }
877
878    Path dstPath = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), path);
879    HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath);
880    StoreFileReader r = sf.getReader();
881    this.storeSize.addAndGet(r.length());
882    this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
883
884    storeEngine.addStoreFiles(Lists.newArrayList(sf), () -> {
885    });
886
887    LOG.info("Loaded recovered hfile to {}, entries={}, sequenceid={}, filesize={}", sf,
888      r.getEntries(), r.getSequenceID(), TraditionalBinaryPrefix.long2String(r.length(), "B", 1));
889    return sf;
890  }
891
892  private long getTotalSize(Collection<HStoreFile> sfs) {
893    return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum();
894  }
895
896  private boolean completeFlush(List<HStoreFile> sfs, long snapshotId) throws IOException {
897    // NOTE:we should keep clearSnapshot method inside the write lock because clearSnapshot may
898    // close {@link DefaultMemStore#snapshot}, which may be used by
899    // {@link DefaultMemStore#getScanners}.
900    storeEngine.addStoreFiles(sfs,
901      snapshotId > 0 ? () -> this.memstore.clearSnapshot(snapshotId) : () -> {
902      });
903    // notify to be called here - only in case of flushes
904    notifyChangedReadersObservers(sfs);
905    if (LOG.isTraceEnabled()) {
906      long totalSize = getTotalSize(sfs);
907      String traceMessage = "FLUSH time,count,size,store size,store files ["
908        + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize + ","
909        + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
910      LOG.trace(traceMessage);
911    }
912    return needsCompaction();
913  }
914
915  /**
916   * Notify all observers that set of Readers has changed.
917   */
918  private void notifyChangedReadersObservers(List<HStoreFile> sfs) throws IOException {
919    for (ChangedReadersObserver o : this.changedReaderObservers) {
920      List<KeyValueScanner> memStoreScanners;
921      this.storeEngine.readLock();
922      try {
923        memStoreScanners = this.memstore.getScanners(o.getReadPoint());
924      } finally {
925        this.storeEngine.readUnlock();
926      }
927      o.updateReaders(sfs, memStoreScanners);
928    }
929  }
930
931  /**
932   * Get all scanners with no filtering based on TTL (that happens further down the line).
933   * @param cacheBlocks  cache the blocks or not
934   * @param usePread     true to use pread, false if not
935   * @param isCompaction true if the scanner is created for compaction
936   * @param matcher      the scan query matcher
937   * @param startRow     the start row
938   * @param stopRow      the stop row
939   * @param readPt       the read point of the current scan
940   * @return all scanners for this store
941   */
942  public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet, boolean usePread,
943    boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt)
944    throws IOException {
945    return getScanners(cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow, false,
946      readPt);
947  }
948
949  /**
950   * Get all scanners with no filtering based on TTL (that happens further down the line).
951   * @param cacheBlocks     cache the blocks or not
952   * @param usePread        true to use pread, false if not
953   * @param isCompaction    true if the scanner is created for compaction
954   * @param matcher         the scan query matcher
955   * @param startRow        the start row
956   * @param includeStartRow true to include start row, false if not
957   * @param stopRow         the stop row
958   * @param includeStopRow  true to include stop row, false if not
959   * @param readPt          the read point of the current scan
960   * @return all scanners for this store
961   */
962  public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,
963    boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow,
964    byte[] stopRow, boolean includeStopRow, long readPt) throws IOException {
965    Collection<HStoreFile> storeFilesToScan;
966    List<KeyValueScanner> memStoreScanners;
967    this.storeEngine.readLock();
968    try {
969      storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
970        includeStartRow, stopRow, includeStopRow);
971      memStoreScanners = this.memstore.getScanners(readPt);
972    } finally {
973      this.storeEngine.readUnlock();
974    }
975
976    try {
977      // First the store file scanners
978
979      // TODO this used to get the store files in descending order,
980      // but now we get them in ascending order, which I think is
981      // actually more correct, since memstore get put at the end.
982      List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(
983        storeFilesToScan, cacheBlocks, usePread, isCompaction, false, matcher, readPt);
984      List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
985      scanners.addAll(sfScanners);
986      // Then the memstore scanners
987      scanners.addAll(memStoreScanners);
988      return scanners;
989    } catch (Throwable t) {
990      clearAndClose(memStoreScanners);
991      throw t instanceof IOException ? (IOException) t : new IOException(t);
992    }
993  }
994
995  private static void clearAndClose(List<KeyValueScanner> scanners) {
996    if (scanners == null) {
997      return;
998    }
999    for (KeyValueScanner s : scanners) {
1000      s.close();
1001    }
1002    scanners.clear();
1003  }
1004
1005  /**
1006   * Create scanners on the given files and if needed on the memstore with no filtering based on TTL
1007   * (that happens further down the line).
1008   * @param files                  the list of files on which the scanners has to be created
1009   * @param cacheBlocks            cache the blocks or not
1010   * @param usePread               true to use pread, false if not
1011   * @param isCompaction           true if the scanner is created for compaction
1012   * @param matcher                the scan query matcher
1013   * @param startRow               the start row
1014   * @param stopRow                the stop row
1015   * @param readPt                 the read point of the current scan
1016   * @param includeMemstoreScanner true if memstore has to be included
1017   * @return scanners on the given files and on the memstore if specified
1018   */
1019  public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
1020    boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
1021    byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner)
1022    throws IOException {
1023    return getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow,
1024      false, readPt, includeMemstoreScanner);
1025  }
1026
1027  /**
1028   * Create scanners on the given files and if needed on the memstore with no filtering based on TTL
1029   * (that happens further down the line).
1030   * @param files                  the list of files on which the scanners has to be created
1031   * @param cacheBlocks            ache the blocks or not
1032   * @param usePread               true to use pread, false if not
1033   * @param isCompaction           true if the scanner is created for compaction
1034   * @param matcher                the scan query matcher
1035   * @param startRow               the start row
1036   * @param includeStartRow        true to include start row, false if not
1037   * @param stopRow                the stop row
1038   * @param includeStopRow         true to include stop row, false if not
1039   * @param readPt                 the read point of the current scan
1040   * @param includeMemstoreScanner true if memstore has to be included
1041   * @return scanners on the given files and on the memstore if specified
1042   */
1043  public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
1044    boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1045    boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
1046    boolean includeMemstoreScanner) throws IOException {
1047    List<KeyValueScanner> memStoreScanners = null;
1048    if (includeMemstoreScanner) {
1049      this.storeEngine.readLock();
1050      try {
1051        memStoreScanners = this.memstore.getScanners(readPt);
1052      } finally {
1053        this.storeEngine.readUnlock();
1054      }
1055    }
1056    try {
1057      List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(files,
1058        cacheBlocks, usePread, isCompaction, false, matcher, readPt);
1059      List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
1060      scanners.addAll(sfScanners);
1061      // Then the memstore scanners
1062      if (memStoreScanners != null) {
1063        scanners.addAll(memStoreScanners);
1064      }
1065      return scanners;
1066    } catch (Throwable t) {
1067      clearAndClose(memStoreScanners);
1068      throw t instanceof IOException ? (IOException) t : new IOException(t);
1069    }
1070  }
1071
1072  /**
1073   * @param o Observer who wants to know about changes in set of Readers
1074   */
1075  public void addChangedReaderObserver(ChangedReadersObserver o) {
1076    this.changedReaderObservers.add(o);
1077  }
1078
1079  /**
1080   * @param o Observer no longer interested in changes in set of Readers.
1081   */
1082  public void deleteChangedReaderObserver(ChangedReadersObserver o) {
1083    // We don't check if observer present; it may not be (legitimately)
1084    this.changedReaderObservers.remove(o);
1085  }
1086
1087  //////////////////////////////////////////////////////////////////////////////
1088  // Compaction
1089  //////////////////////////////////////////////////////////////////////////////
1090
1091  /**
1092   * Compact the StoreFiles. This method may take some time, so the calling thread must be able to
1093   * block for long periods.
1094   * <p>
1095   * During this time, the Store can work as usual, getting values from StoreFiles and writing new
1096   * StoreFiles from the memstore. Existing StoreFiles are not destroyed until the new compacted
1097   * StoreFile is completely written-out to disk.
1098   * <p>
1099   * The compactLock prevents multiple simultaneous compactions. The structureLock prevents us from
1100   * interfering with other write operations.
1101   * <p>
1102   * We don't want to hold the structureLock for the whole time, as a compact() can be lengthy and
1103   * we want to allow cache-flushes during this period.
1104   * <p>
1105   * Compaction event should be idempotent, since there is no IO Fencing for the region directory in
1106   * hdfs. A region server might still try to complete the compaction after it lost the region. That
1107   * is why the following events are carefully ordered for a compaction: 1. Compaction writes new
1108   * files under region/.tmp directory (compaction output) 2. Compaction atomically moves the
1109   * temporary file under region directory 3. Compaction appends a WAL edit containing the
1110   * compaction input and output files. Forces sync on WAL. 4. Compaction deletes the input files
1111   * from the region directory. Failure conditions are handled like this: - If RS fails before 2,
1112   * compaction wont complete. Even if RS lives on and finishes the compaction later, it will only
1113   * write the new data file to the region directory. Since we already have this data, this will be
1114   * idempotent but we will have a redundant copy of the data. - If RS fails between 2 and 3, the
1115   * region will have a redundant copy of the data. The RS that failed won't be able to finish
1116   * sync() for WAL because of lease recovery in WAL. - If RS fails after 3, the region region
1117   * server who opens the region will pick up the the compaction marker from the WAL and replay it
1118   * by removing the compaction input files. Failed RS can also attempt to delete those files, but
1119   * the operation will be idempotent See HBASE-2231 for details.
1120   * @param compaction compaction details obtained from requestCompaction()
1121   * @return Storefile we compacted into or null if we failed or opted out early.
1122   */
1123  public List<HStoreFile> compact(CompactionContext compaction,
1124    ThroughputController throughputController, User user) throws IOException {
1125    assert compaction != null;
1126    CompactionRequestImpl cr = compaction.getRequest();
1127    StoreFileWriterCreationTracker writerCreationTracker =
1128      storeFileWriterCreationTrackerFactory.get();
1129    if (writerCreationTracker != null) {
1130      cr.setWriterCreationTracker(writerCreationTracker);
1131      storeFileWriterCreationTrackers.add(writerCreationTracker);
1132    }
1133    try {
1134      // Do all sanity checking in here if we have a valid CompactionRequestImpl
1135      // because we need to clean up after it on the way out in a finally
1136      // block below
1137      long compactionStartTime = EnvironmentEdgeManager.currentTime();
1138      assert compaction.hasSelection();
1139      Collection<HStoreFile> filesToCompact = cr.getFiles();
1140      assert !filesToCompact.isEmpty();
1141      synchronized (filesCompacting) {
1142        // sanity check: we're compacting files that this store knows about
1143        // TODO: change this to LOG.error() after more debugging
1144        Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
1145      }
1146
1147      // Ready to go. Have list of files to compact.
1148      LOG.info("Starting compaction of " + filesToCompact + " into tmpdir="
1149        + getRegionFileSystem().getTempDir() + ", totalSize="
1150        + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));
1151
1152      return doCompaction(cr, filesToCompact, user, compactionStartTime,
1153        compaction.compact(throughputController, user));
1154    } finally {
1155      finishCompactionRequest(cr);
1156    }
1157  }
1158
1159  protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
1160    Collection<HStoreFile> filesToCompact, User user, long compactionStartTime, List<Path> newFiles)
1161    throws IOException {
1162    // Do the steps necessary to complete the compaction.
1163    setStoragePolicyFromFileName(newFiles);
1164    List<HStoreFile> sfs = storeEngine.commitStoreFiles(newFiles, true);
1165    if (this.getCoprocessorHost() != null) {
1166      for (HStoreFile sf : sfs) {
1167        getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
1168      }
1169    }
1170    replaceStoreFiles(filesToCompact, sfs, true);
1171
1172    long outputBytes = getTotalSize(sfs);
1173
1174    // At this point the store will use new files for all new scanners.
1175    refreshStoreSizeAndTotalBytes(); // update store size.
1176
1177    long now = EnvironmentEdgeManager.currentTime();
1178    if (
1179      region.getRegionServerServices() != null
1180        && region.getRegionServerServices().getMetrics() != null
1181    ) {
1182      region.getRegionServerServices().getMetrics().updateCompaction(
1183        region.getTableDescriptor().getTableName().getNameAsString(), cr.isMajor(),
1184        now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(),
1185        outputBytes);
1186
1187    }
1188
1189    logCompactionEndMessage(cr, sfs, now, compactionStartTime);
1190    return sfs;
1191  }
1192
1193  // Set correct storage policy from the file name of DTCP.
1194  // Rename file will not change the storage policy.
1195  private void setStoragePolicyFromFileName(List<Path> newFiles) throws IOException {
1196    String prefix = HConstants.STORAGE_POLICY_PREFIX;
1197    for (Path newFile : newFiles) {
1198      if (newFile.getParent().getName().startsWith(prefix)) {
1199        CommonFSUtils.setStoragePolicy(getRegionFileSystem().getFileSystem(), newFile,
1200          newFile.getParent().getName().substring(prefix.length()));
1201      }
1202    }
1203  }
1204
1205  /**
1206   * Writes the compaction WAL record.
1207   * @param filesCompacted Files compacted (input).
1208   * @param newFiles       Files from compaction.
1209   */
1210  private void writeCompactionWalRecord(Collection<HStoreFile> filesCompacted,
1211    Collection<HStoreFile> newFiles) throws IOException {
1212    if (region.getWAL() == null) {
1213      return;
1214    }
1215    List<Path> inputPaths =
1216      filesCompacted.stream().map(HStoreFile::getPath).collect(Collectors.toList());
1217    List<Path> outputPaths =
1218      newFiles.stream().map(HStoreFile::getPath).collect(Collectors.toList());
1219    RegionInfo info = this.region.getRegionInfo();
1220    CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
1221      getColumnFamilyDescriptor().getName(), inputPaths, outputPaths,
1222      getRegionFileSystem().getStoreDir(getColumnFamilyDescriptor().getNameAsString()));
1223    // Fix reaching into Region to get the maxWaitForSeqId.
1224    // Does this method belong in Region altogether given it is making so many references up there?
1225    // Could be Region#writeCompactionMarker(compactionDescriptor);
1226    WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(),
1227      this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC(),
1228      region.getRegionReplicationSink().orElse(null));
1229  }
1230
1231  @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
1232      allowedOnPath = ".*/(HStore|TestHStore).java")
1233  void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result,
1234    boolean writeCompactionMarker) throws IOException {
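    // Delegate the atomic swap to the store engine: the first callback writes the compaction WAL
    // marker when requested, and the second drops the replaced files from filesCompacting.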
1235    storeEngine.replaceStoreFiles(compactedFiles, result, () -> {
1236      if (writeCompactionMarker) {
1237        writeCompactionWalRecord(compactedFiles, result);
1238      }
1239    }, () -> {
1240      synchronized (filesCompacting) {
1241        filesCompacting.removeAll(compactedFiles);
1242      }
1243    });
1244    // These may be null when the RS is shutting down. The space quota Chores will fix the Region
1245    // sizes later so it's not super-critical if we miss these.
1246    RegionServerServices rsServices = region.getRegionServerServices();
1247    if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) {
1248      updateSpaceQuotaAfterFileReplacement(
1249        rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(),
1250        compactedFiles, result);
1251    }
1252  }
1253
1254  /**
1255   * Updates the space quota usage for this region, removing the size for files compacted away and
1256   * adding in the size for new files.
1257   * @param sizeStore  The object tracking changes in region size for space quotas.
1258   * @param regionInfo The identifier for the region whose size is being updated.
1259   * @param oldFiles   Files removed from this store's region.
1260   * @param newFiles   Files added to this store's region.
1261   */
1262  void updateSpaceQuotaAfterFileReplacement(RegionSizeStore sizeStore, RegionInfo regionInfo,
1263    Collection<HStoreFile> oldFiles, Collection<HStoreFile> newFiles) {
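    // Compute the net change in on-disk size: subtract the HFiles compacted away and add the
    // newly written ones.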
1264    long delta = 0;
1265    if (oldFiles != null) {
1266      for (HStoreFile compactedFile : oldFiles) {
1267        if (compactedFile.isHFile()) {
1268          delta -= compactedFile.getReader().length();
1269        }
1270      }
1271    }
1272    if (newFiles != null) {
1273      for (HStoreFile newFile : newFiles) {
1274        if (newFile.isHFile()) {
1275          delta += newFile.getReader().length();
1276        }
1277      }
1278    }
1279    sizeStore.incrementRegionSize(regionInfo, delta);
1280  }
1281
1282  /**
1283   * Log a very elaborate compaction completion message.
1284   * @param cr                  Request.
1285   * @param sfs                 Resulting files.
1286   * @param compactionStartTime Start time.
1287   */
1288  private void logCompactionEndMessage(CompactionRequestImpl cr, List<HStoreFile> sfs, long now,
1289    long compactionStartTime) {
1290    StringBuilder message = new StringBuilder("Completed" + (cr.isMajor() ? " major" : "")
1291      + " compaction of " + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "")
1292      + " file(s) in " + this + " of " + this.getRegionInfo().getShortNameToLog() + " into ");
1293    if (sfs.isEmpty()) {
1294      message.append("none, ");
1295    } else {
1296      for (HStoreFile sf : sfs) {
1297        message.append(sf.getPath().getName());
1298        message.append("(size=");
1299        message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1));
1300        message.append("), ");
1301      }
1302    }
1303    message.append("total size for store is ")
1304      .append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1))
1305      .append(". This selection was in queue for ")
1306      .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
1307      .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
1308      .append(" to execute.");
1309    LOG.info(message.toString());
1310    if (LOG.isTraceEnabled()) {
1311      int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
1312      long resultSize = getTotalSize(sfs);
1313      String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
1314        + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
1315        + cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]";
1316      LOG.trace(traceMessage);
1317    }
1318  }
1319
1320  /**
1321   * Call to complete a compaction. Its for the case where we find in the WAL a compaction that was
1322   * not finished. We could find one recovering a WAL after a regionserver crash. See HBASE-2231.
1323   */
1324  public void replayCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
1325    boolean removeFiles) throws IOException {
1326    LOG.debug("Completing compaction from the WAL marker");
1327    List<String> compactionInputs = compaction.getCompactionInputList();
1328    List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList());
1329
1330    // The Compaction Marker is written after the compaction is completed,
1331    // and the files moved into the region/family folder.
1332    //
1333    // If we crash after the entry is written, we may not have removed the
1334    // input files, but the output file is present.
1335    // (The unremoved input files will be removed by this function)
1336    //
1337    // If we scan the directory and the file is not present, it can mean that:
1338    // - The file was manually removed by the user
    // - The file was removed as a consequence of a subsequent compaction
1340    // so, we can't do anything with the "compaction output list" because those
1341    // files have already been loaded when opening the region (by virtue of
1342    // being in the store's folder) or they may be missing due to a compaction.
1343
1344    String familyName = this.getColumnFamilyName();
1345    Set<String> inputFiles = new HashSet<>();
1346    for (String compactionInput : compactionInputs) {
1347      Path inputPath = getRegionFileSystem().getStoreFilePath(familyName, compactionInput);
1348      inputFiles.add(inputPath.getName());
1349    }
1350
1351    // some of the input files might already be deleted
1352    List<HStoreFile> inputStoreFiles = new ArrayList<>(compactionInputs.size());
1353    for (HStoreFile sf : this.getStorefiles()) {
1354      if (inputFiles.contains(sf.getPath().getName())) {
1355        inputStoreFiles.add(sf);
1356      }
1357    }
1358
1359    // check whether we need to pick up the new files
1360    List<HStoreFile> outputStoreFiles = new ArrayList<>(compactionOutputs.size());
1361
1362    if (pickCompactionFiles) {
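      // Outputs already tracked by this store need no action; only open readers for compaction
      // outputs we have not picked up yet.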
1363      for (HStoreFile sf : this.getStorefiles()) {
1364        compactionOutputs.remove(sf.getPath().getName());
1365      }
1366      for (String compactionOutput : compactionOutputs) {
1367        StoreFileInfo storeFileInfo =
1368          getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), compactionOutput);
1369        HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo);
1370        outputStoreFiles.add(storeFile);
1371      }
1372    }
1373
1374    if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) {
1375      LOG.info("Replaying compaction marker, replacing input files: " + inputStoreFiles
1376        + " with output files : " + outputStoreFiles);
1377      this.replaceStoreFiles(inputStoreFiles, outputStoreFiles, false);
1378      this.refreshStoreSizeAndTotalBytes();
1379    }
1380  }
1381
1382  @Override
1383  public boolean hasReferences() {
    // Grab the read lock here because we need to ensure that the store file list we read is
    // complete, which is only the case after any atomic replaceStoreFiles(..) has finished.
1386    this.storeEngine.readLock();
1387    try {
1388      // Merge the current store files with compacted files here due to HBASE-20940.
1389      Collection<HStoreFile> allStoreFiles = new ArrayList<>(getStorefiles());
1390      allStoreFiles.addAll(getCompactedFiles());
1391      return StoreUtils.hasReferences(allStoreFiles);
1392    } finally {
1393      this.storeEngine.readUnlock();
1394    }
1395  }
1396
1397  /**
   * Getter for the CompactionProgress object.
1399   * @return CompactionProgress object; can be null
1400   */
1401  public CompactionProgress getCompactionProgress() {
1402    return this.storeEngine.getCompactor().getProgress();
1403  }
1404
1405  @Override
1406  public boolean shouldPerformMajorCompaction() throws IOException {
1407    for (HStoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1408      // TODO: what are these reader checks all over the place?
1409      if (sf.getReader() == null) {
1410        LOG.debug("StoreFile {} has null Reader", sf);
1411        return false;
1412      }
1413    }
1414    return storeEngine.getCompactionPolicy()
1415      .shouldPerformMajorCompaction(this.storeEngine.getStoreFileManager().getStorefiles());
1416  }
1417
1418  public Optional<CompactionContext> requestCompaction() throws IOException {
1419    return requestCompaction(NO_PRIORITY, CompactionLifeCycleTracker.DUMMY, null);
1420  }
1421
1422  public Optional<CompactionContext> requestCompaction(int priority,
1423    CompactionLifeCycleTracker tracker, User user) throws IOException {
1424    // don't even select for compaction if writes are disabled
1425    if (!this.areWritesEnabled()) {
1426      return Optional.empty();
1427    }
1428    // Before we do compaction, try to get rid of unneeded files to simplify things.
1429    removeUnneededFiles();
1430
1431    final CompactionContext compaction = storeEngine.createCompaction();
1432    CompactionRequestImpl request = null;
1433    this.storeEngine.readLock();
1434    try {
1435      synchronized (filesCompacting) {
1436        // First, see if coprocessor would want to override selection.
1437        if (this.getCoprocessorHost() != null) {
1438          final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
1439          boolean override =
1440            getCoprocessorHost().preCompactSelection(this, candidatesForCoproc, tracker, user);
1441          if (override) {
1442            // Coprocessor is overriding normal file selection.
1443            compaction.forceSelect(new CompactionRequestImpl(candidatesForCoproc));
1444          }
1445        }
1446
1447        // Normal case - coprocessor is not overriding file selection.
1448        if (!compaction.hasSelection()) {
1449          boolean isUserCompaction = priority == Store.PRIORITY_USER;
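          // Only one selection at a time may claim the off-peak slot (guarded by the CAS below);
          // if we take it here, it is released again when selection fails or the chosen request
          // does not use off-peak.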
1450          boolean mayUseOffPeak =
1451            offPeakHours.isOffPeakHour() && offPeakCompactionTracker.compareAndSet(false, true);
1452          try {
1453            compaction.select(this.filesCompacting, isUserCompaction, mayUseOffPeak,
1454              forceMajor && filesCompacting.isEmpty());
1455          } catch (IOException e) {
1456            if (mayUseOffPeak) {
1457              offPeakCompactionTracker.set(false);
1458            }
1459            throw e;
1460          }
1461          assert compaction.hasSelection();
1462          if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
1463            // Compaction policy doesn't want to take advantage of off-peak.
1464            offPeakCompactionTracker.set(false);
1465          }
1466        }
1467        if (this.getCoprocessorHost() != null) {
1468          this.getCoprocessorHost().postCompactSelection(this,
1469            ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker,
1470            compaction.getRequest(), user);
1471        }
1472        // Finally, we have the resulting files list. Check if we have any files at all.
1473        request = compaction.getRequest();
1474        Collection<HStoreFile> selectedFiles = request.getFiles();
1475        if (selectedFiles.isEmpty()) {
1476          return Optional.empty();
1477        }
1478
1479        addToCompactingFiles(selectedFiles);
1480
1481        // If we're enqueuing a major, clear the force flag.
1482        this.forceMajor = this.forceMajor && !request.isMajor();
1483
1484        // Set common request properties.
1485        // Set priority, either override value supplied by caller or from store.
1486        final int compactionPriority =
1487          (priority != Store.NO_PRIORITY) ? priority : getCompactPriority();
1488        request.setPriority(compactionPriority);
1489
1490        if (request.isAfterSplit()) {
          // If the store belongs to a recently split daughter region, it is better to give it a
          // higher priority in the compaction queue.
1493          // Override priority if it is lower (higher int value) than
1494          // SPLIT_REGION_COMPACTION_PRIORITY
1495          final int splitHousekeepingPriority =
1496            Math.min(compactionPriority, SPLIT_REGION_COMPACTION_PRIORITY);
1497          request.setPriority(splitHousekeepingPriority);
1498          LOG.info(
1499            "Keeping/Overriding Compaction request priority to {} for CF {} since it"
1500              + " belongs to recently split daughter region {}",
1501            splitHousekeepingPriority, this.getColumnFamilyName(),
1502            getRegionInfo().getRegionNameAsString());
1503        }
1504        request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
1505        request.setTracker(tracker);
1506      }
1507    } finally {
1508      this.storeEngine.readUnlock();
1509    }
1510
1511    if (LOG.isDebugEnabled()) {
1512      LOG.debug(this + " is initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
1513        + (request.isAllFiles() ? " (all files)" : ""));
1514    }
1515    this.region.reportCompactionRequestStart(request.isMajor());
1516    return Optional.of(compaction);
1517  }
1518
1519  /** Adds the files to compacting files. filesCompacting must be locked. */
1520  private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) {
1521    if (CollectionUtils.isEmpty(filesToAdd)) {
1522      return;
1523    }
1524    // Check that we do not try to compact the same StoreFile twice.
1525    if (!Collections.disjoint(filesCompacting, filesToAdd)) {
1526      Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
1527    }
1528    filesCompacting.addAll(filesToAdd);
1529    Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator());
1530  }
1531
1532  private void removeUnneededFiles() throws IOException {
1533    if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) {
1534      return;
1535    }
1536    if (getColumnFamilyDescriptor().getMinVersions() > 0) {
1537      LOG.debug("Skipping expired store file removal due to min version of {} being {}", this,
1538        getColumnFamilyDescriptor().getMinVersions());
1539      return;
1540    }
1541    this.storeEngine.readLock();
1542    Collection<HStoreFile> delSfs = null;
1543    try {
1544      synchronized (filesCompacting) {
1545        long cfTtl = getStoreFileTtl();
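        // A TTL of Long.MAX_VALUE means expiry is disabled for this family, so there is nothing
        // to remove on age alone.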
1546        if (cfTtl != Long.MAX_VALUE) {
1547          delSfs = storeEngine.getStoreFileManager()
1548            .getUnneededFiles(EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting);
1549          addToCompactingFiles(delSfs);
1550        }
1551      }
1552    } finally {
1553      this.storeEngine.readUnlock();
1554    }
1555
1556    if (CollectionUtils.isEmpty(delSfs)) {
1557      return;
1558    }
1559
1560    Collection<HStoreFile> newFiles = Collections.emptyList(); // No new files.
1561    replaceStoreFiles(delSfs, newFiles, true);
1562    refreshStoreSizeAndTotalBytes();
1563    LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in " + this
1564      + "; total size is " + TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1));
1565  }
1566
1567  public void cancelRequestedCompaction(CompactionContext compaction) {
1568    finishCompactionRequest(compaction.getRequest());
1569  }
1570
1571  private void finishCompactionRequest(CompactionRequestImpl cr) {
1572    this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
1573    if (cr.isOffPeak()) {
1574      offPeakCompactionTracker.set(false);
1575      cr.setOffPeak(false);
1576    }
1577    synchronized (filesCompacting) {
1578      filesCompacting.removeAll(cr.getFiles());
1579    }
    // The tracker could be null, for example, when we do not need to track the creation of store
    // file writers because of the SFT implementation in use, or when the compaction was canceled.
1582    if (cr.getWriterCreationTracker() != null) {
1583      storeFileWriterCreationTrackers.remove(cr.getWriterCreationTracker());
1584    }
1585  }
1586
1587  /**
1588   * Update counts.
1589   */
1590  protected void refreshStoreSizeAndTotalBytes() throws IOException {
1591    this.storeSize.set(0L);
1592    this.totalUncompressedBytes.set(0L);
1593    for (HStoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1594      StoreFileReader r = hsf.getReader();
1595      if (r == null) {
1596        LOG.warn("StoreFile {} has a null Reader", hsf);
1597        continue;
1598      }
1599      this.storeSize.addAndGet(r.length());
1600      this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
1601    }
1602  }
1603
1604  /*
1605   * @param wantedVersions How many versions were asked for.
1606   * @return wantedVersions or this families' {@link HConstants#VERSIONS}.
1607   */
1608  int versionsToReturn(final int wantedVersions) {
1609    if (wantedVersions <= 0) {
1610      throw new IllegalArgumentException("Number of versions must be > 0");
1611    }
1612    // Make sure we do not return more than maximum versions for this store.
1613    int maxVersions = getColumnFamilyDescriptor().getMaxVersions();
1614    return wantedVersions > maxVersions ? maxVersions : wantedVersions;
1615  }
1616
1617  @Override
1618  public boolean canSplit() {
    // Not splittable if we find a reference store file present in the store.
1620    boolean result = !hasReferences();
1621    if (!result) {
1622      LOG.trace("Not splittable; has references: {}", this);
1623    }
1624    return result;
1625  }
1626
1627  /**
1628   * Determines if Store should be split.
1629   */
1630  public Optional<byte[]> getSplitPoint() {
1631    this.storeEngine.readLock();
1632    try {
1633      // Should already be enforced by the split policy!
1634      assert !this.getRegionInfo().isMetaRegion();
      // Not splittable if we find a reference store file present in the store.
1636      if (hasReferences()) {
1637        LOG.trace("Not splittable; has references: {}", this);
1638        return Optional.empty();
1639      }
1640      return this.storeEngine.getStoreFileManager().getSplitPoint();
1641    } catch (IOException e) {
1642      LOG.warn("Failed getting store size for {}", this, e);
1643    } finally {
1644      this.storeEngine.readUnlock();
1645    }
1646    return Optional.empty();
1647  }
1648
1649  @Override
1650  public long getLastCompactSize() {
1651    return this.lastCompactSize;
1652  }
1653
1654  @Override
1655  public long getSize() {
1656    return storeSize.get();
1657  }
1658
1659  public void triggerMajorCompaction() {
1660    this.forceMajor = true;
1661  }
1662
1663  //////////////////////////////////////////////////////////////////////////////
1664  // File administration
1665  //////////////////////////////////////////////////////////////////////////////
1666
1667  /**
1668   * Return a scanner for both the memstore and the HStore files. Assumes we are not in a
1669   * compaction.
1670   * @param scan       Scan to apply when scanning the stores
1671   * @param targetCols columns to scan
1672   * @return a scanner over the current key values
1673   * @throws IOException on failure
1674   */
1675  public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols, long readPt)
1676    throws IOException {
1677    storeEngine.readLock();
1678    try {
1679      ScanInfo scanInfo;
1680      if (this.getCoprocessorHost() != null) {
1681        scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this, scan);
1682      } else {
1683        scanInfo = getScanInfo();
1684      }
1685      return createScanner(scan, scanInfo, targetCols, readPt);
1686    } finally {
1687      storeEngine.readUnlock();
1688    }
1689  }
1690
1691  // HMobStore will override this method to return its own implementation.
1692  protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo,
1693    NavigableSet<byte[]> targetCols, long readPt) throws IOException {
1694    return scan.isReversed()
1695      ? new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt)
1696      : new StoreScanner(this, scanInfo, scan, targetCols, readPt);
1697  }
1698
1699  /**
1700   * Recreates the scanners on the current list of active store file scanners
1701   * @param currentFileScanners    the current set of active store file scanners
1702   * @param cacheBlocks            cache the blocks or not
1703   * @param usePread               use pread or not
1704   * @param isCompaction           is the scanner for compaction
1705   * @param matcher                the scan query matcher
1706   * @param startRow               the scan's start row
1707   * @param includeStartRow        should the scan include the start row
1708   * @param stopRow                the scan's stop row
1709   * @param includeStopRow         should the scan include the stop row
   * @param readPt                 the read point of the current scan
   * @param includeMemstoreScanner whether the current scanner should include a memstore scanner
   * @return list of scanners recreated from the current list of active store file scanners
1713   */
1714  public List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners,
1715    boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
1716    byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
1717    boolean includeMemstoreScanner) throws IOException {
1718    this.storeEngine.readLock();
1719    try {
1720      Map<String, HStoreFile> name2File =
1721        new HashMap<>(getStorefilesCount() + getCompactedFilesCount());
1722      for (HStoreFile file : getStorefiles()) {
1723        name2File.put(file.getFileInfo().getActiveFileName(), file);
1724      }
1725      Collection<HStoreFile> compactedFiles = getCompactedFiles();
1726      for (HStoreFile file : IterableUtils.emptyIfNull(compactedFiles)) {
1727        name2File.put(file.getFileInfo().getActiveFileName(), file);
1728      }
1729      List<HStoreFile> filesToReopen = new ArrayList<>();
1730      for (KeyValueScanner kvs : currentFileScanners) {
1731        assert kvs.isFileScanner();
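        // Scanners that are already exhausted have nothing left to read, so their files do not
        // need to be reopened.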
1732        if (kvs.peek() == null) {
1733          continue;
1734        }
1735        filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
1736      }
1737      if (filesToReopen.isEmpty()) {
1738        return null;
1739      }
1740      return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow,
1741        includeStartRow, stopRow, includeStopRow, readPt, false);
1742    } finally {
1743      this.storeEngine.readUnlock();
1744    }
1745  }
1746
1747  @Override
1748  public String toString() {
1749    return this.getRegionInfo().getShortNameToLog() + "/" + this.getColumnFamilyName();
1750  }
1751
1752  @Override
1753  public int getStorefilesCount() {
1754    return this.storeEngine.getStoreFileManager().getStorefileCount();
1755  }
1756
1757  @Override
1758  public int getCompactedFilesCount() {
1759    return this.storeEngine.getStoreFileManager().getCompactedFilesCount();
1760  }
1761
1762  private LongStream getStoreFileAgeStream() {
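    // Age is the time elapsed since each HFile's creation timestamp; store files without a
    // reader, or that are not HFiles, are skipped.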
1763    return this.storeEngine.getStoreFileManager().getStorefiles().stream().filter(sf -> {
1764      if (sf.getReader() == null) {
1765        LOG.warn("StoreFile {} has a null Reader", sf);
1766        return false;
1767      } else {
1768        return true;
1769      }
1770    }).filter(HStoreFile::isHFile).mapToLong(sf -> sf.getFileInfo().getCreatedTimestamp())
1771      .map(t -> EnvironmentEdgeManager.currentTime() - t);
1772  }
1773
1774  @Override
1775  public OptionalLong getMaxStoreFileAge() {
1776    return getStoreFileAgeStream().max();
1777  }
1778
1779  @Override
1780  public OptionalLong getMinStoreFileAge() {
1781    return getStoreFileAgeStream().min();
1782  }
1783
1784  @Override
1785  public OptionalDouble getAvgStoreFileAge() {
1786    return getStoreFileAgeStream().average();
1787  }
1788
1789  @Override
1790  public long getNumReferenceFiles() {
1791    return this.storeEngine.getStoreFileManager().getStorefiles().stream()
1792      .filter(HStoreFile::isReference).count();
1793  }
1794
1795  @Override
1796  public long getNumHFiles() {
1797    return this.storeEngine.getStoreFileManager().getStorefiles().stream()
1798      .filter(HStoreFile::isHFile).count();
1799  }
1800
1801  @Override
1802  public long getStoreSizeUncompressed() {
1803    return this.totalUncompressedBytes.get();
1804  }
1805
1806  @Override
1807  public long getStorefilesSize() {
1808    // Include all StoreFiles
1809    return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(),
1810      sf -> true);
1811  }
1812
1813  @Override
1814  public long getHFilesSize() {
1815    // Include only StoreFiles which are HFiles
1816    return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(),
1817      HStoreFile::isHFile);
1818  }
1819
1820  private long getStorefilesFieldSize(ToLongFunction<StoreFileReader> f) {
1821    return this.storeEngine.getStoreFileManager().getStorefiles().stream()
1822      .mapToLong(file -> StoreUtils.getStorefileFieldSize(file, f)).sum();
1823  }
1824
1825  @Override
1826  public long getStorefilesRootLevelIndexSize() {
1827    return getStorefilesFieldSize(StoreFileReader::indexSize);
1828  }
1829
1830  @Override
1831  public long getTotalStaticIndexSize() {
1832    return getStorefilesFieldSize(StoreFileReader::getUncompressedDataIndexSize);
1833  }
1834
1835  @Override
1836  public long getTotalStaticBloomSize() {
1837    return getStorefilesFieldSize(StoreFileReader::getTotalBloomSize);
1838  }
1839
1840  @Override
1841  public MemStoreSize getMemStoreSize() {
1842    return this.memstore.size();
1843  }
1844
1845  @Override
1846  public int getCompactPriority() {
1847    int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();
1848    if (priority == PRIORITY_USER) {
1849      LOG.warn("Compaction priority is USER despite there being no user compaction");
1850    }
1851    return priority;
1852  }
1853
1854  public boolean throttleCompaction(long compactionSize) {
1855    return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize);
1856  }
1857
1858  public HRegion getHRegion() {
1859    return this.region;
1860  }
1861
1862  public RegionCoprocessorHost getCoprocessorHost() {
1863    return this.region.getCoprocessorHost();
1864  }
1865
1866  @Override
1867  public RegionInfo getRegionInfo() {
1868    return getRegionFileSystem().getRegionInfo();
1869  }
1870
1871  @Override
1872  public boolean areWritesEnabled() {
1873    return this.region.areWritesEnabled();
1874  }
1875
1876  @Override
1877  public long getSmallestReadPoint() {
1878    return this.region.getSmallestReadPoint();
1879  }
1880
1881  /**
1882   * Adds or replaces the specified KeyValues.
1883   * <p>
1884   * For each KeyValue specified, if a cell with the same row, family, and qualifier exists in
   * MemStore, it will be replaced. Otherwise, it will just be inserted into MemStore.
1886   * <p>
1887   * This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic
1888   * across all of them.
1889   * @param readpoint readpoint below which we can safely remove duplicate KVs
1890   */
1891  public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing)
1892    throws IOException {
1893    this.storeEngine.readLock();
1894    try {
1895      this.memstore.upsert(cells, readpoint, memstoreSizing);
1896    } finally {
1897      this.storeEngine.readUnlock();
1898    }
1899  }
1900
1901  public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTracker tracker) {
1902    return new StoreFlusherImpl(cacheFlushId, tracker);
1903  }
1904
1905  private final class StoreFlusherImpl implements StoreFlushContext {
1906
1907    private final FlushLifeCycleTracker tracker;
1908    private final StoreFileWriterCreationTracker writerCreationTracker;
1909    private final long cacheFlushSeqNum;
1910    private MemStoreSnapshot snapshot;
1911    private List<Path> tempFiles;
1912    private List<Path> committedFiles;
1913    private long cacheFlushCount;
1914    private long cacheFlushSize;
1915    private long outputFileSize;
1916
1917    private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) {
1918      this.cacheFlushSeqNum = cacheFlushSeqNum;
1919      this.tracker = tracker;
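      // May be null when the store file tracker implementation does not require tracking the
      // creation of store file writers.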
1920      this.writerCreationTracker = storeFileWriterCreationTrackerFactory.get();
1921    }
1922
1923    /**
1924     * This is not thread safe. The caller should have a lock on the region or the store. If
1925     * necessary, the lock can be added with the patch provided in HBASE-10087
1926     */
1927    @Override
1928    public MemStoreSize prepare() {
1929      // passing the current sequence number of the wal - to allow bookkeeping in the memstore
1930      this.snapshot = memstore.snapshot();
1931      this.cacheFlushCount = snapshot.getCellsCount();
1932      this.cacheFlushSize = snapshot.getDataSize();
1933      committedFiles = new ArrayList<>(1);
1934      return snapshot.getMemStoreSize();
1935    }
1936
1937    @Override
1938    public void flushCache(MonitoredTask status) throws IOException {
1939      RegionServerServices rsService = region.getRegionServerServices();
1940      ThroughputController throughputController =
1941        rsService == null ? null : rsService.getFlushThroughputController();
1942      // it could be null if we do not need to track the creation of store file writer due to
1943      // different SFT implementation.
1944      if (writerCreationTracker != null) {
1945        HStore.this.storeFileWriterCreationTrackers.add(writerCreationTracker);
1946      }
1947      tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController,
1948        tracker, writerCreationTracker);
1949    }
1950
1951    @Override
1952    public boolean commit(MonitoredTask status) throws IOException {
1953      try {
1954        if (CollectionUtils.isEmpty(this.tempFiles)) {
1955          return false;
1956        }
1957        status.setStatus("Flushing " + this + ": reopening flushed file");
1958        List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, false);
1959        for (HStoreFile sf : storeFiles) {
1960          StoreFileReader r = sf.getReader();
1961          if (LOG.isInfoEnabled()) {
1962            LOG.info("Added {}, entries={}, sequenceid={}, filesize={}", sf, r.getEntries(),
1963              cacheFlushSeqNum, TraditionalBinaryPrefix.long2String(r.length(), "", 1));
1964          }
1965          outputFileSize += r.length();
1966          storeSize.addAndGet(r.length());
1967          totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
1968          committedFiles.add(sf.getPath());
1969        }
1970
1971        flushedCellsCount.addAndGet(cacheFlushCount);
1972        flushedCellsSize.addAndGet(cacheFlushSize);
1973        flushedOutputFileSize.addAndGet(outputFileSize);
1974        // call coprocessor after we have done all the accounting above
1975        for (HStoreFile sf : storeFiles) {
1976          if (getCoprocessorHost() != null) {
1977            getCoprocessorHost().postFlush(HStore.this, sf, tracker);
1978          }
1979        }
1980        // Add new file to store files. Clear snapshot too while we have the Store write lock.
1981        return completeFlush(storeFiles, snapshot.getId());
1982      } finally {
1983        if (writerCreationTracker != null) {
1984          HStore.this.storeFileWriterCreationTrackers.remove(writerCreationTracker);
1985        }
1986      }
1987    }
1988
1989    @Override
1990    public long getOutputFileSize() {
1991      return outputFileSize;
1992    }
1993
1994    @Override
1995    public List<Path> getCommittedFiles() {
1996      return committedFiles;
1997    }
1998
1999    /**
     * Similar to commit, but called in secondary region replicas for replaying the flush cache
     * from the primary region. Adds the new files to the store, and drops the snapshot depending
     * on the dropMemstoreSnapshot argument.
2003     * @param fileNames            names of the flushed files
2004     * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot
2005     */
2006    @Override
2007    public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
2008      throws IOException {
2009      List<HStoreFile> storeFiles = new ArrayList<>(fileNames.size());
2010      for (String file : fileNames) {
2011        // open the file as a store file (hfile link, etc)
2012        StoreFileInfo storeFileInfo =
2013          getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), file);
2014        HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo);
2015        storeFiles.add(storeFile);
2016        HStore.this.storeSize.addAndGet(storeFile.getReader().length());
2017        HStore.this.totalUncompressedBytes
2018          .addAndGet(storeFile.getReader().getTotalUncompressedBytes());
2019        if (LOG.isInfoEnabled()) {
2020          LOG.info(this + " added " + storeFile + ", entries=" + storeFile.getReader().getEntries()
2021            + ", sequenceid=" + storeFile.getReader().getSequenceID() + ", filesize="
2022            + TraditionalBinaryPrefix.long2String(storeFile.getReader().length(), "", 1));
2023        }
2024      }
2025
2026      long snapshotId = -1; // -1 means do not drop
2027      if (dropMemstoreSnapshot && snapshot != null) {
2028        snapshotId = snapshot.getId();
2029      }
2030      HStore.this.completeFlush(storeFiles, snapshotId);
2031    }
2032
2033    /**
2034     * Abort the snapshot preparation. Drops the snapshot if any.
2035     */
2036    @Override
2037    public void abort() throws IOException {
2038      if (snapshot != null) {
2039        HStore.this.completeFlush(Collections.emptyList(), snapshot.getId());
2040      }
2041    }
2042  }
2043
2044  @Override
2045  public boolean needsCompaction() {
2046    List<HStoreFile> filesCompactingClone = null;
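    // Copy filesCompacting under its lock so the store engine sees a consistent snapshot.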
2047    synchronized (filesCompacting) {
2048      filesCompactingClone = Lists.newArrayList(filesCompacting);
2049    }
2050    return this.storeEngine.needsCompaction(filesCompactingClone);
2051  }
2052
2053  /**
2054   * Used for tests.
2055   * @return cache configuration for this Store.
2056   */
2057  public CacheConfig getCacheConfig() {
2058    return storeContext.getCacheConf();
2059  }
2060
2061  public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HStore.class, false);
2062
2063  public static final long DEEP_OVERHEAD = ClassSize.align(
2064    FIXED_OVERHEAD + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK + ClassSize.CONCURRENT_SKIPLISTMAP
2065      + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT + ScanInfo.FIXED_OVERHEAD);
2066
2067  @Override
2068  public long heapSize() {
2069    MemStoreSize memstoreSize = this.memstore.size();
2070    return DEEP_OVERHEAD + memstoreSize.getHeapSize() + storeContext.heapSize();
2071  }
2072
2073  @Override
2074  public CellComparator getComparator() {
2075    return storeContext.getComparator();
2076  }
2077
2078  public ScanInfo getScanInfo() {
2079    return scanInfo;
2080  }
2081
2082  /**
   * Set scan info, used by tests.
   * @param scanInfo new scan info to use for tests
2085   */
2086  void setScanInfo(ScanInfo scanInfo) {
2087    this.scanInfo = scanInfo;
2088  }
2089
2090  @Override
2091  public boolean hasTooManyStoreFiles() {
2092    return getStorefilesCount() > this.blockingFileCount;
2093  }
2094
2095  @Override
2096  public long getFlushedCellsCount() {
2097    return flushedCellsCount.get();
2098  }
2099
2100  @Override
2101  public long getFlushedCellsSize() {
2102    return flushedCellsSize.get();
2103  }
2104
2105  @Override
2106  public long getFlushedOutputFileSize() {
2107    return flushedOutputFileSize.get();
2108  }
2109
2110  @Override
2111  public long getCompactedCellsCount() {
2112    return compactedCellsCount.get();
2113  }
2114
2115  @Override
2116  public long getCompactedCellsSize() {
2117    return compactedCellsSize.get();
2118  }
2119
2120  @Override
2121  public long getMajorCompactedCellsCount() {
2122    return majorCompactedCellsCount.get();
2123  }
2124
2125  @Override
2126  public long getMajorCompactedCellsSize() {
2127    return majorCompactedCellsSize.get();
2128  }
2129
2130  public void updateCompactedMetrics(boolean isMajor, CompactionProgress progress) {
2131    if (isMajor) {
2132      majorCompactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
2133      majorCompactedCellsSize.addAndGet(progress.totalCompactedSize);
2134    } else {
2135      compactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
2136      compactedCellsSize.addAndGet(progress.totalCompactedSize);
2137    }
2138  }
2139
2140  /**
2141   * Returns the StoreEngine that is backing this concrete implementation of Store.
   * @return the {@link StoreEngine} object used internally inside this HStore object.
2143   */
2144  public StoreEngine<?, ?, ?, ?> getStoreEngine() {
2145    return this.storeEngine;
2146  }
2147
2148  protected OffPeakHours getOffPeakHours() {
2149    return this.offPeakHours;
2150  }
2151
2152  @Override
2153  public void onConfigurationChange(Configuration conf) {
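    // Rebuild the store-scoped configuration (including table and column family overrides) and
    // push it to the components that cache configuration-derived values.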
2154    Configuration storeConf = StoreUtils.createStoreConfiguration(conf, region.getTableDescriptor(),
2155      getColumnFamilyDescriptor());
2156    this.conf = storeConf;
2157    this.storeEngine.compactionPolicy.setConf(storeConf);
2158    this.offPeakHours = OffPeakHours.getInstance(storeConf);
2159  }
2160
2161  /**
2162   * {@inheritDoc}
2163   */
2164  @Override
2165  public void registerChildren(ConfigurationManager manager) {
2166    // No children to register
2167  }
2168
2169  /**
2170   * {@inheritDoc}
2171   */
2172  @Override
2173  public void deregisterChildren(ConfigurationManager manager) {
2174    // No children to deregister
2175  }
2176
2177  @Override
2178  public double getCompactionPressure() {
2179    return storeEngine.getStoreFileManager().getCompactionPressure();
2180  }
2181
2182  @Override
2183  public boolean isPrimaryReplicaStore() {
2184    return getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID;
2185  }
2186
2187  /**
2188   * Sets the store up for a region level snapshot operation.
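   * <p>
   * This acquires the archive lock; callers are expected to pair it with
   * {@link #postSnapshotOperation()} in a finally block so the lock is always released.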
2189   * @see #postSnapshotOperation()
2190   */
2191  public void preSnapshotOperation() {
2192    archiveLock.lock();
2193  }
2194
2195  /**
2196   * Perform tasks needed after the completion of snapshot operation.
2197   * @see #preSnapshotOperation()
2198   */
2199  public void postSnapshotOperation() {
2200    archiveLock.unlock();
2201  }
2202
2203  /**
2204   * Closes and archives the compacted files under this store
2205   */
2206  public synchronized void closeAndArchiveCompactedFiles() throws IOException {
2207    // ensure other threads do not attempt to archive the same files on close()
2208    archiveLock.lock();
2209    try {
2210      storeEngine.readLock();
2211      Collection<HStoreFile> copyCompactedfiles = null;
2212      try {
2213        Collection<HStoreFile> compactedfiles =
2214          this.getStoreEngine().getStoreFileManager().getCompactedfiles();
2215        if (CollectionUtils.isNotEmpty(compactedfiles)) {
2216          // Do a copy under read lock
2217          copyCompactedfiles = new ArrayList<>(compactedfiles);
2218        } else {
2219          LOG.trace("No compacted files to archive");
2220        }
2221      } finally {
2222        storeEngine.readUnlock();
2223      }
2224      if (CollectionUtils.isNotEmpty(copyCompactedfiles)) {
2225        removeCompactedfiles(copyCompactedfiles, true);
2226      }
2227    } finally {
2228      archiveLock.unlock();
2229    }
2230  }
2231
2232  /**
2233   * Archives and removes the compacted files
2234   * @param compactedfiles The compacted files in this store that are not active in reads
2235   * @param evictOnClose   true if blocks should be evicted from the cache when an HFile reader is
2236   *                       closed, false if not
2237   */
2238  private void removeCompactedfiles(Collection<HStoreFile> compactedfiles, boolean evictOnClose)
2239    throws IOException {
2240    final List<HStoreFile> filesToRemove = new ArrayList<>(compactedfiles.size());
2241    final List<Long> storeFileSizes = new ArrayList<>(compactedfiles.size());
2242    for (final HStoreFile file : compactedfiles) {
2243      synchronized (file) {
2244        try {
2245          StoreFileReader r = file.getReader();
2246          if (r == null) {
2247            LOG.debug("The file {} was closed but still not archived", file);
2248            // HACK: Temporarily re-open the reader so we can get the size of the file. Ideally,
2249            // we should know the size of an HStoreFile without having to ask the HStoreFileReader
2250            // for that.
2251            long length = getStoreFileSize(file);
2252            filesToRemove.add(file);
2253            storeFileSizes.add(length);
2254            continue;
2255          }
2256
2257          if (file.isCompactedAway() && !file.isReferencedInReads()) {
2258            // Even if deleting fails we need not bother as any new scanners won't be
2259            // able to use the compacted file as the status is already compactedAway
2260            LOG.trace("Closing and archiving the file {}", file);
2261            // Copy the file size before closing the reader
2262            final long length = r.length();
2263            r.close(evictOnClose);
2264            // Just close and return
2265            filesToRemove.add(file);
2266            // Only add the length if we successfully added the file to `filesToRemove`
2267            storeFileSizes.add(length);
2268          } else {
2269            LOG.info("Can't archive compacted file " + file.getPath()
2270              + " because of either isCompactedAway=" + file.isCompactedAway()
2271              + " or file has reference, isReferencedInReads=" + file.isReferencedInReads()
2272              + ", refCount=" + r.getRefCount() + ", skipping for now.");
2273          }
2274        } catch (Exception e) {
2275          LOG.error("Exception while trying to close the compacted store file {}", file.getPath(),
2276            e);
2277        }
2278      }
2279    }
2280    if (this.isPrimaryReplicaStore()) {
2281      // Only the primary region is allowed to move the file to archive.
2282      // The secondary region does not move the files to archive. Any active reads from
2283      // the secondary region will still work because the file as such has active readers on it.
2284      if (!filesToRemove.isEmpty()) {
2285        LOG.debug("Moving the files {} to archive", filesToRemove);
2286        // Only if this is successful it has to be removed
2287        try {
2288          getRegionFileSystem().removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(),
2289            filesToRemove);
2290        } catch (FailedArchiveException fae) {
2291          // Even if archiving some files failed, we still need to clear out any of the
2292          // files which were successfully archived. Otherwise we will receive a
2293          // FileNotFoundException when we attempt to re-archive them in the next go around.
2294          Collection<Path> failedFiles = fae.getFailedFiles();
2295          Iterator<HStoreFile> iter = filesToRemove.iterator();
2296          Iterator<Long> sizeIter = storeFileSizes.iterator();
2297          while (iter.hasNext()) {
2298            sizeIter.next();
2299            if (failedFiles.contains(iter.next().getPath())) {
2300              iter.remove();
2301              sizeIter.remove();
2302            }
2303          }
2304          if (!filesToRemove.isEmpty()) {
2305            clearCompactedfiles(filesToRemove);
2306          }
2307          throw fae;
2308        }
2309      }
2310    }
2311    if (!filesToRemove.isEmpty()) {
2312      // Clear the compactedfiles from the store file manager
2313      clearCompactedfiles(filesToRemove);
2314      // Try to send report of this archival to the Master for updating quota usage faster
2315      reportArchivedFilesForQuota(filesToRemove, storeFileSizes);
2316    }
2317  }
2318
2319  /**
2320   * Computes the length of a store file without succumbing to any errors along the way. If an error
2321   * is encountered, the implementation returns {@code 0} instead of the actual size.
2322   * @param file The file to compute the size of.
2323   * @return The size in bytes of the provided {@code file}.
2324   */
2325  long getStoreFileSize(HStoreFile file) {
2326    long length = 0;
2327    try {
2328      file.initReader();
2329      length = file.getReader().length();
2330    } catch (IOException e) {
2331      LOG.trace("Failed to open reader when trying to compute store file size for {}, ignoring",
2332        file, e);
2333    } finally {
2334      try {
2335        file.closeStoreFile(
2336          file.getCacheConf() != null ? file.getCacheConf().shouldEvictOnClose() : true);
2337      } catch (IOException e) {
2338        LOG.trace("Failed to close reader after computing store file size for {}, ignoring", file,
2339          e);
2340      }
2341    }
2342    return length;
2343  }
2344
2345  public Long preFlushSeqIDEstimation() {
2346    return memstore.preFlushSeqIDEstimation();
2347  }
2348
2349  @Override
2350  public boolean isSloppyMemStore() {
2351    return this.memstore.isSloppy();
2352  }
2353
2354  private void clearCompactedfiles(List<HStoreFile> filesToRemove) throws IOException {
2355    LOG.trace("Clearing the compacted file {} from this store", filesToRemove);
2356    storeEngine.removeCompactedFiles(filesToRemove);
2357  }
2358
2359  void reportArchivedFilesForQuota(List<? extends StoreFile> archivedFiles, List<Long> fileSizes) {
2360    // Sanity check from the caller
2361    if (archivedFiles.size() != fileSizes.size()) {
2362      throw new RuntimeException("Coding error: should never see lists of varying size");
2363    }
2364    RegionServerServices rss = this.region.getRegionServerServices();
2365    if (rss == null) {
2366      return;
2367    }
2368    List<Entry<String, Long>> filesWithSizes = new ArrayList<>(archivedFiles.size());
2369    Iterator<Long> fileSizeIter = fileSizes.iterator();
2370    for (StoreFile storeFile : archivedFiles) {
2371      final long fileSize = fileSizeIter.next();
2372      if (storeFile.isHFile() && fileSize != 0) {
2373        filesWithSizes.add(Maps.immutableEntry(storeFile.getPath().getName(), fileSize));
2374      }
2375    }
2376    if (LOG.isTraceEnabled()) {
2377      LOG.trace("Files archived: " + archivedFiles + ", reporting the following to the Master: "
2378        + filesWithSizes);
2379    }
2380    boolean success = rss.reportFileArchivalForQuotas(getTableName(), filesWithSizes);
2381    if (!success) {
2382      LOG.warn("Failed to report archival of files: " + filesWithSizes);
2383    }
2384  }
2385
2386  @Override
2387  public int getCurrentParallelPutCount() {
2388    return currentParallelPutCount.get();
2389  }
2390
2391  public int getStoreRefCount() {
2392    return this.storeEngine.getStoreFileManager().getStorefiles().stream()
2393      .filter(sf -> sf.getReader() != null).filter(HStoreFile::isHFile)
2394      .mapToInt(HStoreFile::getRefCount).sum();
2395  }
2396
2397  /**
   * @return the maximum ref count of a store file among all compacted store files for this HStore
2399   */
2400  public int getMaxCompactedStoreFileRefCount() {
2401    OptionalInt maxCompactedStoreFileRefCount = this.storeEngine.getStoreFileManager()
2402      .getCompactedfiles().stream().filter(sf -> sf.getReader() != null).filter(HStoreFile::isHFile)
2403      .mapToInt(HStoreFile::getRefCount).max();
2404    return maxCompactedStoreFileRefCount.isPresent() ? maxCompactedStoreFileRefCount.getAsInt() : 0;
2405  }
2406
2407  @Override
2408  public long getMemstoreOnlyRowReadsCount() {
2409    return memstoreOnlyRowReadsCount.sum();
2410  }
2411
2412  @Override
2413  public long getMixedRowReadsCount() {
2414    return mixedRowReadsCount.sum();
2415  }
2416
2417  @Override
2418  public Configuration getReadOnlyConfiguration() {
2419    return new ReadOnlyConfiguration(this.conf);
2420  }
2421
2422  void updateMetricsStore(boolean memstoreRead) {
2423    if (memstoreRead) {
2424      memstoreOnlyRowReadsCount.increment();
2425    } else {
2426      mixedRowReadsCount.increment();
2427    }
2428  }
2429
2430  /**
2431   * Return the storefiles which are currently being written to. Mainly used by
   * {@link BrokenStoreFileCleaner} to prevent deleting these files as they are not present in
2433   * SFT yet.
2434   */
2435  public Set<Path> getStoreFilesBeingWritten() {
2436    return storeFileWriterCreationTrackers.stream().flatMap(t -> t.get().stream())
2437      .collect(Collectors.toSet());
2438  }
2439}