001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.net.InetSocketAddress;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.HashMap;
028import java.util.HashSet;
029import java.util.Iterator;
030import java.util.List;
031import java.util.Map;
032import java.util.Map.Entry;
033import java.util.NavigableSet;
034import java.util.Optional;
035import java.util.OptionalDouble;
036import java.util.OptionalInt;
037import java.util.OptionalLong;
038import java.util.Set;
039import java.util.concurrent.Callable;
040import java.util.concurrent.CompletionService;
041import java.util.concurrent.ConcurrentHashMap;
042import java.util.concurrent.ExecutionException;
043import java.util.concurrent.ExecutorCompletionService;
044import java.util.concurrent.Future;
045import java.util.concurrent.ThreadPoolExecutor;
046import java.util.concurrent.atomic.AtomicBoolean;
047import java.util.concurrent.atomic.AtomicInteger;
048import java.util.concurrent.atomic.AtomicLong;
049import java.util.concurrent.atomic.LongAdder;
050import java.util.concurrent.locks.ReentrantLock;
051import java.util.function.Consumer;
052import java.util.function.Supplier;
053import java.util.function.ToLongFunction;
054import java.util.stream.Collectors;
055import java.util.stream.LongStream;
056import org.apache.hadoop.conf.Configuration;
057import org.apache.hadoop.fs.FileSystem;
058import org.apache.hadoop.fs.Path;
059import org.apache.hadoop.fs.permission.FsAction;
060import org.apache.hadoop.hbase.Cell;
061import org.apache.hadoop.hbase.CellComparator;
062import org.apache.hadoop.hbase.CellUtil;
063import org.apache.hadoop.hbase.ExtendedCell;
064import org.apache.hadoop.hbase.HConstants;
065import org.apache.hadoop.hbase.InnerStoreCellComparator;
066import org.apache.hadoop.hbase.MemoryCompactionPolicy;
067import org.apache.hadoop.hbase.MetaCellComparator;
068import org.apache.hadoop.hbase.TableName;
069import org.apache.hadoop.hbase.backup.FailedArchiveException;
070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
071import org.apache.hadoop.hbase.client.RegionInfo;
072import org.apache.hadoop.hbase.client.Scan;
073import org.apache.hadoop.hbase.conf.ConfigurationManager;
074import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
075import org.apache.hadoop.hbase.coprocessor.ReadOnlyConfiguration;
076import org.apache.hadoop.hbase.io.HeapSize;
077import org.apache.hadoop.hbase.io.hfile.CacheConfig;
078import org.apache.hadoop.hbase.io.hfile.HFile;
079import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
080import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
081import org.apache.hadoop.hbase.io.hfile.HFileScanner;
082import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
083import org.apache.hadoop.hbase.monitoring.MonitoredTask;
084import org.apache.hadoop.hbase.quotas.RegionSizeStore;
085import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
086import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
087import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
088import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
089import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
090import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
091import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
092import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
093import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
094import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
095import org.apache.hadoop.hbase.security.EncryptionUtil;
096import org.apache.hadoop.hbase.security.User;
097import org.apache.hadoop.hbase.util.Bytes;
098import org.apache.hadoop.hbase.util.ClassSize;
099import org.apache.hadoop.hbase.util.CommonFSUtils;
100import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
101import org.apache.hadoop.hbase.util.Pair;
102import org.apache.hadoop.hbase.util.ReflectionUtils;
103import org.apache.hadoop.util.StringUtils;
104import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
105import org.apache.yetus.audience.InterfaceAudience;
106import org.slf4j.Logger;
107import org.slf4j.LoggerFactory;
108
109import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
110import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
111import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
112import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
113import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
114import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
115import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
116
117import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
119
/**
 * A Store holds a column family in a Region. It consists of a memstore and a set of zero or more
 * StoreFiles, which stretch backwards over time.
 * <p>
 * There's no reason to consider append-logging at this level; all logging and locking is handled
 * at the HRegion level. Store just provides services to manage sets of StoreFiles. One of the most
 * important of those services is compaction, where files are aggregated once they pass a
 * configurable threshold.
 * <p>
 * Locking and transactions are handled at a higher level. This API should not be called directly
 * but by an HRegion manager.
 */
132@InterfaceAudience.Private
133public class HStore
134  implements Store, HeapSize, StoreConfigInformation, PropagatingConfigurationObserver {
135  public static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
136  public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
137    "hbase.server.compactchecker.interval.multiplier";
138  public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
139  public static final String BLOCK_STORAGE_POLICY_KEY = "hbase.hstore.block.storage.policy";
140  // "NONE" is not a valid storage policy and means we defer the policy to HDFS
141  public static final String DEFAULT_BLOCK_STORAGE_POLICY = "NONE";
142  public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
143  public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 16;
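  // Illustrative note (example values, not recommendations): once a store accumulates more than
  // hbase.hstore.blockingStoreFiles store files (16 by default, per the constant above), updates
  // for the region are blocked until a compaction catches up or hbase.hstore.blockingWaitTime
  // elapses; hbase.hstore.block.storage.policy may be set to an HDFS storage policy name such as
  // "ONE_SSD" instead of the "NONE" placeholder.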
144
  // HBASE-24428 : Update compaction priority for recently split daughter regions
  // so as to prioritize their compaction.
  // Any compaction candidate with higher priority than compaction of newly split daughter regions
  // should have priority value < (Integer.MIN_VALUE + 1000)
  private static final int SPLIT_REGION_COMPACTION_PRIORITY = Integer.MIN_VALUE + 1000;
150
151  private static final Logger LOG = LoggerFactory.getLogger(HStore.class);
152
153  protected final MemStore memstore;
  // The region this store belongs to; it owns this store's directory in the filesystem.
155  private final HRegion region;
156  protected Configuration conf;
157  private long lastCompactSize = 0;
158  volatile boolean forceMajor = false;
159  private AtomicLong storeSize = new AtomicLong();
160  private AtomicLong totalUncompressedBytes = new AtomicLong();
161  private LongAdder memstoreOnlyRowReadsCount = new LongAdder();
  // rows that have cells from both memstore and files (or only files)
163  private LongAdder mixedRowReadsCount = new LongAdder();
164
  /**
   * Lock specific to archiving compacted store files. This avoids races around the combination of
   * retrieving the list of compacted files and moving them to the archive directory. Since this is
   * usually a background process (other than on close), we don't want to handle this with the
   * store write lock, which would block readers and degrade performance.
   * <p>
   * Locked by:
   * <ul>
   * <li>CompactedHFilesDispatchHandler via closeAndArchiveCompactedFiles()</li>
   * <li>close()</li>
   * </ul>
   */
  final ReentrantLock archiveLock = new ReentrantLock();
173
174  private final boolean verifyBulkLoads;
175
  /**
   * Use this counter to track concurrent puts. If TRACE logging is enabled and we are over the
   * threshold set by hbase.region.store.parallel.put.print.threshold (default is 50), we will log
   * a message that identifies the Store experiencing this high level of concurrency.
   */
181  private final AtomicInteger currentParallelPutCount = new AtomicInteger(0);
182  private final int parallelPutCountPrintThreshold;
183
184  private ScanInfo scanInfo;
185
186  // All access must be synchronized.
187  // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it.
188  private final List<HStoreFile> filesCompacting = Lists.newArrayList();
189
190  // All access must be synchronized.
191  private final Set<ChangedReadersObserver> changedReaderObservers =
192    Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());
193
194  private HFileDataBlockEncoder dataBlockEncoder;
195
196  final StoreEngine<?, ?, ?, ?> storeEngine;
197
198  private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
199  private volatile OffPeakHours offPeakHours;
200
201  private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
202  private int flushRetriesNumber;
203  private int pauseTime;
204
205  private long blockingFileCount;
206  private int compactionCheckMultiplier;
207
208  private AtomicLong flushedCellsCount = new AtomicLong();
209  private AtomicLong compactedCellsCount = new AtomicLong();
210  private AtomicLong majorCompactedCellsCount = new AtomicLong();
211  private AtomicLong flushedCellsSize = new AtomicLong();
212  private AtomicLong flushedOutputFileSize = new AtomicLong();
213  private AtomicLong compactedCellsSize = new AtomicLong();
214  private AtomicLong majorCompactedCellsSize = new AtomicLong();
215
216  private final StoreContext storeContext;
217
  // Used to track the store files which are currently being written. For compaction, if we want to
  // compact store files [a, b, c] into [d], then here we will record 'd'. We also use it to track
  // the store files being written when flushing.
  // Notice that the creation happens in the background compaction or flush thread while we read
  // the files from other threads, so it needs to be thread safe.
223  private static final class StoreFileWriterCreationTracker implements Consumer<Path> {
224
225    private final Set<Path> files = Collections.newSetFromMap(new ConcurrentHashMap<>());
226
227    @Override
228    public void accept(Path t) {
229      files.add(t);
230    }
231
232    public Set<Path> get() {
233      return Collections.unmodifiableSet(files);
234    }
235  }
236
  // We may have multiple compactions running at the same time, and flushes can also happen at the
  // same time, so here we need to use a collection, and the collection needs to be thread safe.
  // The implementation of StoreFileWriterCreationTracker is very simple and we are unlikely to
  // implement hashCode or equals for it, so here we just use ConcurrentHashMap. Change to
  // IdentityHashMap if later we want to implement hashCode or equals.
242  private final Set<StoreFileWriterCreationTracker> storeFileWriterCreationTrackers =
243    Collections.newSetFromMap(new ConcurrentHashMap<>());
244
  // For SFT implementations which write a tmp store file first, we do not need to clean up the
  // broken store files under the data directory, which means we do not need to track the store
  // file writer creation. So here we abstract a factory to return different trackers for
  // different SFT implementations.
249  private final Supplier<StoreFileWriterCreationTracker> storeFileWriterCreationTrackerFactory;
250
251  private final boolean warmup;
252
  /**
   * Constructor
   * @param family    ColumnFamilyDescriptor for this column family
   * @param confParam configuration object for this Store; can be null
   */
258  protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
259    final Configuration confParam, boolean warmup) throws IOException {
260    this.conf = StoreUtils.createStoreConfiguration(confParam, region.getTableDescriptor(), family);
261
262    this.region = region;
263    this.storeContext = initializeStoreContext(family);
264
    // Assemble the store's home directory and ensure it exists.
266    region.getRegionFileSystem().createStoreDir(family.getNameAsString());
267
268    // set block storage policy for store directory
269    String policyName = family.getStoragePolicy();
270    if (null == policyName) {
271      policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY, DEFAULT_BLOCK_STORAGE_POLICY);
272    }
273    region.getRegionFileSystem().setStoragePolicy(family.getNameAsString(), policyName.trim());
274
275    this.dataBlockEncoder = new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());
276
277    // used by ScanQueryMatcher
278    long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
279    LOG.trace("Time to purge deletes set to {}ms in {}", timeToPurgeDeletes, this);
280    // Get TTL
281    long ttl = determineTTLFromFamily(family);
    // Why not just pass a HColumnDescriptor in here altogether? Even if we have
    // to clone it?
284    scanInfo =
285      new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.storeContext.getComparator());
286    this.memstore = getMemstore();
287
288    this.offPeakHours = OffPeakHours.getInstance(conf);
289
290    this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
291
292    this.blockingFileCount = conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
293    this.compactionCheckMultiplier = conf.getInt(COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY,
294      DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
295    if (this.compactionCheckMultiplier <= 0) {
296      LOG.error("Compaction check period multiplier must be positive, setting default: {}",
297        DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
298      this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
299    }
300
301    this.warmup = warmup;
302    this.storeEngine = createStoreEngine(this, this.conf, region.getCellComparator());
303    storeEngine.initialize(warmup);
    // If the store engine requires writing to the tmp dir first, then we just return null, which
    // indicates that we do not need to track the creation of store file writers, otherwise we
    // return a new StoreFileWriterCreationTracker.
307    this.storeFileWriterCreationTrackerFactory = storeEngine.requireWritingToTmpDirFirst()
308      ? () -> null
309      : () -> new StoreFileWriterCreationTracker();
310    refreshStoreSizeAndTotalBytes();
311
312    flushRetriesNumber =
313      conf.getInt("hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
314    pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
315    if (flushRetriesNumber <= 0) {
316      throw new IllegalArgumentException(
317        "hbase.hstore.flush.retries.number must be > 0, not " + flushRetriesNumber);
318    }
319
320    int confPrintThreshold =
321      this.conf.getInt("hbase.region.store.parallel.put.print.threshold", 50);
322    if (confPrintThreshold < 10) {
323      confPrintThreshold = 10;
324    }
325    this.parallelPutCountPrintThreshold = confPrintThreshold;
326
327    LOG.info(
328      "Store={},  memstore type={}, storagePolicy={}, verifyBulkLoads={}, "
329        + "parallelPutCountPrintThreshold={}, encoding={}, compression={}",
330      this, memstore.getClass().getSimpleName(), policyName, verifyBulkLoads,
331      parallelPutCountPrintThreshold, family.getDataBlockEncoding(), family.getCompressionType());
332  }
333
334  private StoreContext initializeStoreContext(ColumnFamilyDescriptor family) throws IOException {
335    return new StoreContext.Builder().withBlockSize(family.getBlocksize())
336      .withEncryptionContext(EncryptionUtil.createEncryptionContext(conf, family))
337      .withBloomType(family.getBloomFilterType()).withCacheConfig(createCacheConf(family))
338      .withCellComparator(region.getTableDescriptor().isMetaTable() || conf
339        .getBoolean(HRegion.USE_META_CELL_COMPARATOR, HRegion.DEFAULT_USE_META_CELL_COMPARATOR)
340          ? MetaCellComparator.META_COMPARATOR
341          : InnerStoreCellComparator.INNER_STORE_COMPARATOR)
342      .withColumnFamilyDescriptor(family).withCompactedFilesSupplier(this::getCompactedFiles)
343      .withRegionFileSystem(region.getRegionFileSystem())
344      .withFavoredNodesSupplier(this::getFavoredNodes)
345      .withFamilyStoreDirectoryPath(
346        region.getRegionFileSystem().getStoreDir(family.getNameAsString()))
347      .withRegionCoprocessorHost(region.getCoprocessorHost()).build();
348  }
349
350  private InetSocketAddress[] getFavoredNodes() {
351    InetSocketAddress[] favoredNodes = null;
352    if (region.getRegionServerServices() != null) {
353      favoredNodes = region.getRegionServerServices()
354        .getFavoredNodesForRegion(region.getRegionInfo().getEncodedName());
355    }
356    return favoredNodes;
357  }
358
  /** Returns the MemStore instance to use in this store. */
  private MemStore getMemstore() {
    MemStore ms = null;
    // Check if in-memory compaction is configured. Note MemoryCompactionPolicy is an enum!
    MemoryCompactionPolicy inMemoryCompaction = null;
364    if (this.getTableName().isSystemTable()) {
365      inMemoryCompaction = MemoryCompactionPolicy
366        .valueOf(conf.get("hbase.systemtables.compacting.memstore.type", "NONE").toUpperCase());
367    } else {
368      inMemoryCompaction = getColumnFamilyDescriptor().getInMemoryCompaction();
369    }
370    if (inMemoryCompaction == null) {
371      inMemoryCompaction =
372        MemoryCompactionPolicy.valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
373          CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT).toUpperCase());
374    }
375
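    // For reference, MemoryCompactionPolicy values include NONE, BASIC, EAGER and ADAPTIVE. As an
    // illustrative example only, setting hbase.systemtables.compacting.memstore.type to BASIC
    // selects a CompactingMemStore for system tables via the default branch below.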
376    switch (inMemoryCompaction) {
377      case NONE:
378        Class<? extends MemStore> memStoreClass =
379          conf.getClass(MEMSTORE_CLASS_NAME, DefaultMemStore.class, MemStore.class);
380        ms = ReflectionUtils.newInstance(memStoreClass,
381          new Object[] { conf, getComparator(), this.getHRegion().getRegionServicesForStores() });
382        break;
383      default:
384        Class<? extends CompactingMemStore> compactingMemStoreClass =
385          conf.getClass(MEMSTORE_CLASS_NAME, CompactingMemStore.class, CompactingMemStore.class);
386        ms =
387          ReflectionUtils.newInstance(compactingMemStoreClass, new Object[] { conf, getComparator(),
388            this, this.getHRegion().getRegionServicesForStores(), inMemoryCompaction });
389    }
390    return ms;
391  }
392
393  /**
394   * Creates the cache config.
395   * @param family The current column family.
396   */
397  protected CacheConfig createCacheConf(final ColumnFamilyDescriptor family) {
398    CacheConfig cacheConf = new CacheConfig(conf, family, region.getBlockCache(),
399      region.getRegionServicesForStores().getByteBuffAllocator());
400    LOG.info("Created cacheConfig: {}, for column family {} of region {} ", cacheConf,
401      family.getNameAsString(), region.getRegionInfo().getEncodedName());
402    return cacheConf;
403  }
404
405  /**
406   * Creates the store engine configured for the given Store.
407   * @param store        The store. An unfortunate dependency needed due to it being passed to
408   *                     coprocessors via the compactor.
409   * @param conf         Store configuration.
410   * @param kvComparator KVComparator for storeFileManager.
411   * @return StoreEngine to use.
412   */
413  protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
414    CellComparator kvComparator) throws IOException {
415    return StoreEngine.create(store, conf, kvComparator);
416  }
417
  /** Returns the TTL, in milliseconds, configured for the specified family. */
419  public static long determineTTLFromFamily(final ColumnFamilyDescriptor family) {
420    // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.
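    // For illustration (example values only): a TTL of 86400 seconds becomes 86400000 ms below,
    // while HConstants.FOREVER and -1 both become Long.MAX_VALUE, i.e. the data never expires.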
421    long ttl = family.getTimeToLive();
422    if (ttl == HConstants.FOREVER) {
423      // Default is unlimited ttl.
424      ttl = Long.MAX_VALUE;
425    } else if (ttl == -1) {
426      ttl = Long.MAX_VALUE;
427    } else {
      // Seconds -> milliseconds; adjust for user data
429      ttl *= 1000;
430    }
431    return ttl;
432  }
433
434  public StoreContext getStoreContext() {
435    return storeContext;
436  }
437
438  @Override
439  public String getColumnFamilyName() {
440    return this.storeContext.getFamily().getNameAsString();
441  }
442
443  @Override
444  public TableName getTableName() {
445    return this.getRegionInfo().getTable();
446  }
447
448  @Override
449  public FileSystem getFileSystem() {
450    return storeContext.getRegionFileSystem().getFileSystem();
451  }
452
453  public HRegionFileSystem getRegionFileSystem() {
454    return storeContext.getRegionFileSystem();
455  }
456
457  /* Implementation of StoreConfigInformation */
458  @Override
459  public long getStoreFileTtl() {
460    // TTL only applies if there's no MIN_VERSIONs setting on the column.
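    // For illustration (example values only): with MIN_VERSIONS = 0 and a 600 second TTL this
    // returns 600000 (ScanInfo stores the TTL in milliseconds); with any MIN_VERSIONS > 0, store
    // file TTL is effectively disabled (Long.MAX_VALUE).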
461    return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
462  }
463
464  @Override
465  public long getMemStoreFlushSize() {
466    // TODO: Why is this in here? The flushsize of the region rather than the store? St.Ack
467    return this.region.memstoreFlushSize;
468  }
469
470  @Override
471  public MemStoreSize getFlushableSize() {
472    return this.memstore.getFlushableSize();
473  }
474
475  @Override
476  public MemStoreSize getSnapshotSize() {
477    return this.memstore.getSnapshotSize();
478  }
479
480  @Override
481  public long getCompactionCheckMultiplier() {
482    return this.compactionCheckMultiplier;
483  }
484
485  @Override
486  public long getBlockingFileCount() {
487    return blockingFileCount;
488  }
489  /* End implementation of StoreConfigInformation */
490
491  @Override
492  public ColumnFamilyDescriptor getColumnFamilyDescriptor() {
493    return this.storeContext.getFamily();
494  }
495
496  @Override
497  public OptionalLong getMaxSequenceId() {
498    return StoreUtils.getMaxSequenceIdInList(this.getStorefiles());
499  }
500
501  @Override
502  public OptionalLong getMaxMemStoreTS() {
503    return StoreUtils.getMaxMemStoreTSInList(this.getStorefiles());
504  }
505
506  /** Returns the data block encoder */
507  public HFileDataBlockEncoder getDataBlockEncoder() {
508    return dataBlockEncoder;
509  }
510
511  /**
512   * Should be used only in tests.
513   * @param blockEncoder the block delta encoder to use
514   */
515  void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
516    this.dataBlockEncoder = blockEncoder;
517  }
518
519  private void postRefreshStoreFiles() throws IOException {
    // Advance the memstore read point to be at least the new store files' seqIds so that
    // readers might pick them up. This assumes that the store is not getting any writes (otherwise
    // in-flight transactions might be made visible)
523    getMaxSequenceId().ifPresent(region.getMVCC()::advanceTo);
524    refreshStoreSizeAndTotalBytes();
525  }
526
527  @Override
528  public void refreshStoreFiles() throws IOException {
529    storeEngine.refreshStoreFiles();
530    postRefreshStoreFiles();
531  }
532
533  /**
534   * Replaces the store files that the store has with the given files. Mainly used by secondary
535   * region replicas to keep up to date with the primary region files.
536   */
537  public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
538    storeEngine.refreshStoreFiles(newFiles);
539    postRefreshStoreFiles();
540  }
541
  /**
   * This message intends to inform the MemStore that the upcoming updates are going to be part of
   * replaying edits from the WAL
   */
546  public void startReplayingFromWAL() {
547    this.memstore.startReplayingFromWAL();
548  }
549
  /**
   * This message intends to inform the MemStore that replaying edits from the WAL is done
   */
553  public void stopReplayingFromWAL() {
554    this.memstore.stopReplayingFromWAL();
555  }
556
557  /**
558   * Adds a value to the memstore
559   */
560  public void add(final ExtendedCell cell, MemStoreSizing memstoreSizing) {
561    storeEngine.readLock();
562    try {
563      if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
564        LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
565          this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName());
566      }
567      this.memstore.add(cell, memstoreSizing);
568    } finally {
569      storeEngine.readUnlock();
570      currentParallelPutCount.decrementAndGet();
571    }
572  }
573
574  /**
575   * Adds the specified value to the memstore
576   */
577  public void add(final Iterable<ExtendedCell> cells, MemStoreSizing memstoreSizing) {
578    storeEngine.readLock();
579    try {
580      if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
581        LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
582          this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName());
583      }
584      memstore.add(cells, memstoreSizing);
585    } finally {
586      storeEngine.readUnlock();
587      currentParallelPutCount.decrementAndGet();
588    }
589  }
590
591  @Override
592  public long timeOfOldestEdit() {
593    return memstore.timeOfOldestEdit();
594  }
595
596  /** Returns All store files. */
597  @Override
598  public Collection<HStoreFile> getStorefiles() {
599    return this.storeEngine.getStoreFileManager().getStoreFiles();
600  }
601
602  @Override
603  public Collection<HStoreFile> getCompactedFiles() {
604    return this.storeEngine.getStoreFileManager().getCompactedfiles();
605  }
606
607  /**
608   * This throws a WrongRegionException if the HFile does not fit in this region, or an
609   * InvalidHFileException if the HFile is not valid.
610   */
611  public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
612    HFile.Reader reader = null;
613    try {
614      LOG.info("Validating hfile at " + srcPath + " for inclusion in " + this);
615      FileSystem srcFs = srcPath.getFileSystem(conf);
616      srcFs.access(srcPath, FsAction.READ_WRITE);
617      reader = HFile.createReader(srcFs, srcPath, getCacheConfig(), isPrimaryReplicaStore(), conf);
618
619      Optional<byte[]> firstKey = reader.getFirstRowKey();
620      Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
621      Optional<ExtendedCell> lk = reader.getLastKey();
622      Preconditions.checkState(lk.isPresent(), "Last key can not be null");
623      byte[] lastKey = CellUtil.cloneRow(lk.get());
624
625      if (LOG.isDebugEnabled()) {
626        LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey.get()) + " last="
627          + Bytes.toStringBinary(lastKey));
628        LOG.debug("Region bounds: first=" + Bytes.toStringBinary(getRegionInfo().getStartKey())
629          + " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
630      }
631
632      if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) {
633        throw new WrongRegionException("Bulk load file " + srcPath.toString()
634          + " does not fit inside region " + this.getRegionInfo().getRegionNameAsString());
635      }
636
637      if (
638        reader.length()
639            > conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE)
640      ) {
641        LOG.warn("Trying to bulk load hfile " + srcPath + " with size: " + reader.length()
642          + " bytes can be problematic as it may lead to oversplitting.");
643      }
644
645      if (verifyBulkLoads) {
646        long verificationStartTime = EnvironmentEdgeManager.currentTime();
647        LOG.info("Full verification started for bulk load hfile: {}", srcPath);
648        Cell prevCell = null;
649        HFileScanner scanner = reader.getScanner(conf, false, false, false);
650        scanner.seekTo();
651        do {
652          Cell cell = scanner.getCell();
653          if (prevCell != null) {
654            if (getComparator().compareRows(prevCell, cell) > 0) {
655              throw new InvalidHFileException("Previous row is greater than" + " current row: path="
656                + srcPath + " previous=" + CellUtil.getCellKeyAsString(prevCell) + " current="
657                + CellUtil.getCellKeyAsString(cell));
658            }
659            if (CellComparator.getInstance().compareFamilies(prevCell, cell) != 0) {
660              throw new InvalidHFileException("Previous key had different"
661                + " family compared to current key: path=" + srcPath + " previous="
662                + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(),
663                  prevCell.getFamilyLength())
664                + " current=" + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(),
665                  cell.getFamilyLength()));
666            }
667          }
668          prevCell = cell;
669        } while (scanner.next());
670        LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString() + " took "
671          + (EnvironmentEdgeManager.currentTime() - verificationStartTime) + " ms");
672      }
673    } finally {
674      if (reader != null) {
675        reader.close();
676      }
677    }
678  }
679
  /**
   * This method should only be called from Region. It is assumed that the ranges of values in the
   * HFile fit within the store's assigned region. (assertBulkLoadHFileOk checks this)
   * @param seqNum sequence Id associated with the HFile
   */
685  public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
686    Path srcPath = new Path(srcPathStr);
687    return getRegionFileSystem().bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
688  }
689
690  public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException {
691    Path srcPath = new Path(srcPathStr);
692    try {
693      getRegionFileSystem().commitStoreFile(srcPath, dstPath);
694    } finally {
695      if (this.getCoprocessorHost() != null) {
696        this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath);
697      }
698    }
699
700    LOG.info("Loaded HFile " + srcPath + " into " + this + " as " + dstPath
701      + " - updating store file list.");
702
703    HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath);
704    bulkLoadHFile(sf);
705
706    LOG.info("Successfully loaded {} into {} (new location: {})", srcPath, this, dstPath);
707
708    return dstPath;
709  }
710
711  public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
712    HStoreFile sf = storeEngine.createStoreFileAndReader(fileInfo);
713    bulkLoadHFile(sf);
714  }
715
716  private void bulkLoadHFile(HStoreFile sf) throws IOException {
717    StoreFileReader r = sf.getReader();
718    this.storeSize.addAndGet(r.length());
719    this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
720    storeEngine.addStoreFiles(Lists.newArrayList(sf), () -> {
721    });
722    LOG.info("Loaded HFile " + sf.getFileInfo() + " into " + this);
723    if (LOG.isTraceEnabled()) {
724      String traceMessage = "BULK LOAD time,size,store size,store files ["
725        + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize + ","
726        + storeEngine.getStoreFileManager().getStorefileCount() + "]";
727      LOG.trace(traceMessage);
728    }
729  }
730
731  private ImmutableCollection<HStoreFile> closeWithoutLock() throws IOException {
732    memstore.close();
733    // Clear so metrics doesn't find them.
734    ImmutableCollection<HStoreFile> result = storeEngine.getStoreFileManager().clearFiles();
735    Collection<HStoreFile> compactedfiles = storeEngine.getStoreFileManager().clearCompactedFiles();
736    // clear the compacted files
737    if (CollectionUtils.isNotEmpty(compactedfiles)) {
738      removeCompactedfiles(compactedfiles,
739        getCacheConfig() != null ? getCacheConfig().shouldEvictOnClose() : true);
740    }
741    if (!result.isEmpty()) {
742      // initialize the thread pool for closing store files in parallel.
743      ThreadPoolExecutor storeFileCloserThreadPool =
744        this.region.getStoreFileOpenAndCloseThreadPool("StoreFileCloser-"
745          + this.region.getRegionInfo().getEncodedName() + "-" + this.getColumnFamilyName());
746
747      // close each store file in parallel
748      CompletionService<Void> completionService =
749        new ExecutorCompletionService<>(storeFileCloserThreadPool);
750      for (HStoreFile f : result) {
751        completionService.submit(new Callable<Void>() {
752          @Override
753          public Void call() throws IOException {
754            boolean evictOnClose =
755              getCacheConfig() != null ? getCacheConfig().shouldEvictOnClose() : true;
756            f.closeStoreFile(!warmup && evictOnClose);
757            return null;
758          }
759        });
760      }
761
762      IOException ioe = null;
763      try {
764        for (int i = 0; i < result.size(); i++) {
765          try {
766            Future<Void> future = completionService.take();
767            future.get();
768          } catch (InterruptedException e) {
769            if (ioe == null) {
770              ioe = new InterruptedIOException();
771              ioe.initCause(e);
772            }
773          } catch (ExecutionException e) {
774            if (ioe == null) {
775              ioe = new IOException(e.getCause());
776            }
777          }
778        }
779      } finally {
780        storeFileCloserThreadPool.shutdownNow();
781      }
782      if (ioe != null) {
783        throw ioe;
784      }
785    }
786    LOG.trace("Closed {}", this);
787    return result;
788  }
789
  /**
   * Close all the readers. We don't need to worry about subsequent requests because the Region
   * holds a write lock that will prevent any more reads or writes.
   * @return the {@link StoreFile StoreFiles} that were previously being used.
   * @throws IOException on failure
   */
796  public ImmutableCollection<HStoreFile> close() throws IOException {
    // findbugs cannot recognize that storeEngine.writeLock is just a lock operation, so it will
    // report UL_UNRELEASED_LOCK_EXCEPTION_PATH; as a result we have to use two try-finally blocks
    // here. Change later if findbugs becomes smarter in the future.
800    this.archiveLock.lock();
801    try {
802      this.storeEngine.writeLock();
803      try {
804        return closeWithoutLock();
805      } finally {
806        this.storeEngine.writeUnlock();
807      }
808    } finally {
809      this.archiveLock.unlock();
810    }
811  }
812
813  /**
814   * Write out current snapshot. Presumes {@code StoreFlusherImpl.prepare()} has been called
815   * previously.
816   * @param logCacheFlushId flush sequence number
817   * @return The path name of the tmp file to which the store was flushed
818   * @throws IOException if exception occurs during process
819   */
820  protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
821    MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
822    Consumer<Path> writerCreationTracker) throws IOException {
    // If an exception happens flushing, we let it out without clearing
    // the memstore snapshot. The old snapshot will be returned when we say
    // 'snapshot', the next time flush comes around.
    // Retry after catching the exception when flushing; otherwise the server will abort
    // itself.
828    StoreFlusher flusher = storeEngine.getStoreFlusher();
829    IOException lastException = null;
830    for (int i = 0; i < flushRetriesNumber; i++) {
831      try {
832        List<Path> pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status,
833          throughputController, tracker, writerCreationTracker);
834        Path lastPathName = null;
835        try {
836          for (Path pathName : pathNames) {
837            lastPathName = pathName;
838            storeEngine.validateStoreFile(pathName, false);
839          }
840          return pathNames;
841        } catch (Exception e) {
842          LOG.warn("Failed validating store file {}, retrying num={}", lastPathName, i, e);
843          if (e instanceof IOException) {
844            lastException = (IOException) e;
845          } else {
846            lastException = new IOException(e);
847          }
848        }
849      } catch (IOException e) {
850        LOG.warn("Failed flushing store file for {}, retrying num={}", this, i, e);
851        lastException = e;
852      }
853      if (lastException != null && i < (flushRetriesNumber - 1)) {
854        try {
855          Thread.sleep(pauseTime);
856        } catch (InterruptedException e) {
857          IOException iie = new InterruptedIOException();
858          iie.initCause(e);
859          throw iie;
860        }
861      }
862    }
863    throw lastException;
864  }
865
866  public HStoreFile tryCommitRecoveredHFile(Path path) throws IOException {
867    LOG.info("Validating recovered hfile at {} for inclusion in store {}", path, this);
868    FileSystem srcFs = path.getFileSystem(conf);
869    srcFs.access(path, FsAction.READ_WRITE);
870    try (HFile.Reader reader =
871      HFile.createReader(srcFs, path, getCacheConfig(), isPrimaryReplicaStore(), conf)) {
872      Optional<byte[]> firstKey = reader.getFirstRowKey();
873      Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
874      Optional<ExtendedCell> lk = reader.getLastKey();
875      Preconditions.checkState(lk.isPresent(), "Last key can not be null");
876      byte[] lastKey = CellUtil.cloneRow(lk.get());
877      if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) {
878        throw new WrongRegionException("Recovered hfile " + path.toString()
879          + " does not fit inside region " + this.getRegionInfo().getRegionNameAsString());
880      }
881    }
882
883    Path dstPath = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), path);
884    HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath);
885    StoreFileReader r = sf.getReader();
886    this.storeSize.addAndGet(r.length());
887    this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
888
889    storeEngine.addStoreFiles(Lists.newArrayList(sf), () -> {
890    });
891
892    LOG.info("Loaded recovered hfile to {}, entries={}, sequenceid={}, filesize={}", sf,
893      r.getEntries(), r.getSequenceID(), TraditionalBinaryPrefix.long2String(r.length(), "B", 1));
894    return sf;
895  }
896
897  private long getTotalSize(Collection<HStoreFile> sfs) {
898    return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum();
899  }
900
901  private boolean completeFlush(final List<HStoreFile> sfs, long snapshotId) throws IOException {
    // NOTE: we should keep the clearSnapshot method inside the write lock because clearSnapshot
    // may close {@link DefaultMemStore#snapshot}, which may be used by
    // {@link DefaultMemStore#getScanners}.
905    storeEngine.addStoreFiles(sfs,
      // NOTE: here we must increase the refCount for storeFiles because we would open the
      // storeFiles and get the StoreFileScanners for them in HStore.notifyChangedReadersObservers.
      // If we don't increase the refCount here, HStore.closeAndArchiveCompactedFiles called by
      // CompactedHFilesDischarger may archive the storeFiles after a concurrent compaction.
      // Because HStore.requestCompaction is under the storeEngine lock, here we increase the
      // refCount under the storeEngine lock as well. See HBASE-27519 for more details.
912      snapshotId > 0 ? () -> {
913        this.memstore.clearSnapshot(snapshotId);
914        HStoreFile.increaseStoreFilesRefeCount(sfs);
915      } : () -> {
916        HStoreFile.increaseStoreFilesRefeCount(sfs);
917      });
918    // notify to be called here - only in case of flushes
919    try {
920      notifyChangedReadersObservers(sfs);
921    } finally {
922      HStoreFile.decreaseStoreFilesRefeCount(sfs);
923    }
924    if (LOG.isTraceEnabled()) {
925      long totalSize = getTotalSize(sfs);
926      String traceMessage = "FLUSH time,count,size,store size,store files ["
927        + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize + ","
928        + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
929      LOG.trace(traceMessage);
930    }
931    return needsCompaction();
932  }
933
934  /**
   * Notify all observers that the set of Readers has changed.
936   */
937  private void notifyChangedReadersObservers(List<HStoreFile> sfs) throws IOException {
938    for (ChangedReadersObserver o : this.changedReaderObservers) {
939      List<KeyValueScanner> memStoreScanners;
940      this.storeEngine.readLock();
941      try {
942        memStoreScanners = this.memstore.getScanners(o.getReadPoint());
943      } finally {
944        this.storeEngine.readUnlock();
945      }
946      o.updateReaders(sfs, memStoreScanners);
947    }
948  }
949
950  /**
951   * Get all scanners with no filtering based on TTL (that happens further down the line).
952   * @param cacheBlocks  cache the blocks or not
953   * @param usePread     true to use pread, false if not
954   * @param isCompaction true if the scanner is created for compaction
955   * @param matcher      the scan query matcher
956   * @param startRow     the start row
957   * @param stopRow      the stop row
958   * @param readPt       the read point of the current scan
959   * @return all scanners for this store
960   */
961  public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet, boolean usePread,
962    boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt,
963    boolean onlyLatestVersion) throws IOException {
964    return getScanners(cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow, false,
965      readPt, onlyLatestVersion);
966  }
967
968  /**
969   * Get all scanners with no filtering based on TTL (that happens further down the line).
970   * @param cacheBlocks     cache the blocks or not
971   * @param usePread        true to use pread, false if not
972   * @param isCompaction    true if the scanner is created for compaction
973   * @param matcher         the scan query matcher
974   * @param startRow        the start row
975   * @param includeStartRow true to include start row, false if not
976   * @param stopRow         the stop row
977   * @param includeStopRow  true to include stop row, false if not
978   * @param readPt          the read point of the current scan
979   * @return all scanners for this store
980   */
981  public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,
982    boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow,
983    byte[] stopRow, boolean includeStopRow, long readPt, boolean onlyLatestVersion)
984    throws IOException {
985    Collection<HStoreFile> storeFilesToScan;
986    List<KeyValueScanner> memStoreScanners;
987    this.storeEngine.readLock();
988    try {
989      storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
990        includeStartRow, stopRow, includeStopRow, onlyLatestVersion);
991      memStoreScanners = this.memstore.getScanners(readPt);
      // NOTE: here we must increase the refCount for storeFiles because we would open the
      // storeFiles and get the StoreFileScanners for them. If we don't increase the refCount here,
      // HStore.closeAndArchiveCompactedFiles called by CompactedHFilesDischarger may archive the
      // storeFiles after a concurrent compaction. Because HStore.requestCompaction is under the
      // storeEngine lock, here we increase the refCount under the storeEngine lock as well. See
      // HBASE-27484 for more details.
998      HStoreFile.increaseStoreFilesRefeCount(storeFilesToScan);
999    } finally {
1000      this.storeEngine.readUnlock();
1001    }
1002    try {
1003      // First the store file scanners
1004
      // TODO this used to get the store files in descending order,
      // but now we get them in ascending order, which I think is
      // actually more correct, since the memstore scanners get put at the end.
1008      List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(
1009        storeFilesToScan, cacheBlocks, usePread, isCompaction, false, matcher, readPt);
1010      List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
1011      scanners.addAll(sfScanners);
1012      // Then the memstore scanners
1013      scanners.addAll(memStoreScanners);
1014      return scanners;
1015    } catch (Throwable t) {
1016      clearAndClose(memStoreScanners);
1017      throw t instanceof IOException ? (IOException) t : new IOException(t);
1018    } finally {
1019      HStoreFile.decreaseStoreFilesRefeCount(storeFilesToScan);
1020    }
1021  }
1022
1023  private static void clearAndClose(List<KeyValueScanner> scanners) {
1024    if (scanners == null) {
1025      return;
1026    }
1027    for (KeyValueScanner s : scanners) {
1028      s.close();
1029    }
1030    scanners.clear();
1031  }
1032
1033  /**
1034   * Create scanners on the given files and if needed on the memstore with no filtering based on TTL
1035   * (that happens further down the line).
   * @param files                  the list of files on which the scanners have to be created
1037   * @param cacheBlocks            cache the blocks or not
1038   * @param usePread               true to use pread, false if not
1039   * @param isCompaction           true if the scanner is created for compaction
1040   * @param matcher                the scan query matcher
1041   * @param startRow               the start row
1042   * @param stopRow                the stop row
1043   * @param readPt                 the read point of the current scan
1044   * @param includeMemstoreScanner true if memstore has to be included
1045   * @return scanners on the given files and on the memstore if specified
1046   */
1047  public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
1048    boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
1049    byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner,
1050    boolean onlyLatestVersion) throws IOException {
1051    return getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow,
1052      false, readPt, includeMemstoreScanner, onlyLatestVersion);
1053  }
1054
1055  /**
1056   * Create scanners on the given files and if needed on the memstore with no filtering based on TTL
1057   * (that happens further down the line).
   * @param files                  the list of files on which the scanners have to be created
   * @param cacheBlocks            cache the blocks or not
1060   * @param usePread               true to use pread, false if not
1061   * @param isCompaction           true if the scanner is created for compaction
1062   * @param matcher                the scan query matcher
1063   * @param startRow               the start row
1064   * @param includeStartRow        true to include start row, false if not
1065   * @param stopRow                the stop row
1066   * @param includeStopRow         true to include stop row, false if not
1067   * @param readPt                 the read point of the current scan
1068   * @param includeMemstoreScanner true if memstore has to be included
1069   * @return scanners on the given files and on the memstore if specified
1070   */
1071  public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
1072    boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1073    boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
1074    boolean includeMemstoreScanner, boolean onlyLatestVersion) throws IOException {
1075    List<KeyValueScanner> memStoreScanners = null;
1076    if (includeMemstoreScanner) {
1077      this.storeEngine.readLock();
1078      try {
1079        memStoreScanners = this.memstore.getScanners(readPt);
1080      } finally {
1081        this.storeEngine.readUnlock();
1082      }
1083    }
1084    try {
1085      List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(files,
1086        cacheBlocks, usePread, isCompaction, false, matcher, readPt);
1087      List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
1088      scanners.addAll(sfScanners);
1089      // Then the memstore scanners
1090      if (memStoreScanners != null) {
1091        scanners.addAll(memStoreScanners);
1092      }
1093      return scanners;
1094    } catch (Throwable t) {
1095      clearAndClose(memStoreScanners);
1096      throw t instanceof IOException ? (IOException) t : new IOException(t);
1097    }
1098  }
1099
1100  /**
1101   * @param o Observer who wants to know about changes in set of Readers
1102   */
1103  public void addChangedReaderObserver(ChangedReadersObserver o) {
1104    this.changedReaderObservers.add(o);
1105  }
1106
1107  /**
1108   * @param o Observer no longer interested in changes in set of Readers.
1109   */
1110  public void deleteChangedReaderObserver(ChangedReadersObserver o) {
1111    // We don't check if observer present; it may not be (legitimately)
1112    this.changedReaderObservers.remove(o);
1113  }
1114
1115  //////////////////////////////////////////////////////////////////////////////
1116  // Compaction
1117  //////////////////////////////////////////////////////////////////////////////
1118
1119  /**
1120   * Compact the StoreFiles. This method may take some time, so the calling thread must be able to
1121   * block for long periods.
1122   * <p>
1123   * During this time, the Store can work as usual, getting values from StoreFiles and writing new
1124   * StoreFiles from the MemStore. Existing StoreFiles are not destroyed until the new compacted
1125   * StoreFile is completely written-out to disk.
1126   * <p>
1127   * The compactLock prevents multiple simultaneous compactions. The structureLock prevents us from
1128   * interfering with other write operations.
1129   * <p>
1130   * We don't want to hold the structureLock for the whole time, as a compact() can be lengthy and
1131   * we want to allow cache-flushes during this period.
1132   * <p>
1133   * Compaction event should be idempotent, since there is no IO Fencing for the region directory in
1134   * hdfs. A region server might still try to complete the compaction after it lost the region. That
1135   * is why the following events are carefully ordered for a compaction:
1136   * <ol>
1137   * <li>Compaction writes new files under region/.tmp directory (compaction output)</li>
1138   * <li>Compaction atomically moves the temporary file under region directory</li>
1139   * <li>Compaction appends a WAL edit containing the compaction input and output files. Forces sync
1140   * on WAL.</li>
1141   * <li>Compaction deletes the input files from the region directory.</li>
1142   * </ol>
1143   * Failure conditions are handled like this:
1144   * <ul>
1145   * <li>If RS fails before 2, compaction won't complete. Even if RS lives on and finishes the
1146   * compaction later, it will only write the new data file to the region directory. Since we
1147   * already have this data, this will be idempotent, but we will have a redundant copy of the
1148   * data.</li>
1149   * <li>If RS fails between 2 and 3, the region will have a redundant copy of the data. The RS that
1150   * failed won't be able to finish sync() for WAL because of lease recovery in WAL.</li>
1151   * <li>If RS fails after 3, the region server who opens the region will pick up the compaction
1152   * marker from the WAL and replay it by removing the compaction input files. Failed RS can also
1153   * attempt to delete those files, but the operation will be idempotent</li>
1154   * </ul>
1155   * See HBASE-2231 for details.
1156   * @param compaction compaction details obtained from requestCompaction()
1157   * @return The storefiles that we compacted into or null if we failed or opted out early.
1158   */
1159  public List<HStoreFile> compact(CompactionContext compaction,
1160    ThroughputController throughputController, User user) throws IOException {
1161    assert compaction != null;
1162    CompactionRequestImpl cr = compaction.getRequest();
1163    StoreFileWriterCreationTracker writerCreationTracker =
1164      storeFileWriterCreationTrackerFactory.get();
1165    if (writerCreationTracker != null) {
1166      cr.setWriterCreationTracker(writerCreationTracker);
1167      storeFileWriterCreationTrackers.add(writerCreationTracker);
1168    }
1169    try {
1170      // Do all sanity checking in here if we have a valid CompactionRequestImpl
1171      // because we need to clean up after it on the way out in a finally
1172      // block below
1173      long compactionStartTime = EnvironmentEdgeManager.currentTime();
1174      assert compaction.hasSelection();
1175      Collection<HStoreFile> filesToCompact = cr.getFiles();
1176      assert !filesToCompact.isEmpty();
1177      synchronized (filesCompacting) {
1178        // sanity check: we're compacting files that this store knows about
1179        // TODO: change this to LOG.error() after more debugging
1180        Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
1181      }
1182
1183      // Ready to go. Have list of files to compact.
1184      LOG.info("Starting compaction of " + filesToCompact + " into tmpdir="
1185        + getRegionFileSystem().getTempDir() + ", totalSize="
1186        + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));
1187
1188      return doCompaction(cr, filesToCompact, user, compactionStartTime,
1189        compaction.compact(throughputController, user));
1190    } finally {
1191      finishCompactionRequest(cr);
1192    }
1193  }
1194
1195  protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
1196    Collection<HStoreFile> filesToCompact, User user, long compactionStartTime, List<Path> newFiles)
1197    throws IOException {
1198    // Do the steps necessary to complete the compaction.
1199    setStoragePolicyFromFileName(newFiles);
1200    List<HStoreFile> sfs = storeEngine.commitStoreFiles(newFiles, true, true);
1201    if (this.getCoprocessorHost() != null) {
1202      for (HStoreFile sf : sfs) {
1203        getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
1204      }
1205    }
1206    replaceStoreFiles(filesToCompact, sfs, true);
1207
1208    long outputBytes = getTotalSize(sfs);
1209
1210    // At this point the store will use new files for all new scanners.
1211    refreshStoreSizeAndTotalBytes(); // update store size.
1212
1213    long now = EnvironmentEdgeManager.currentTime();
1214    if (
1215      region.getRegionServerServices() != null
1216        && region.getRegionServerServices().getMetrics() != null
1217    ) {
1218      region.getRegionServerServices().getMetrics().updateCompaction(
1219        region.getTableDescriptor().getTableName().getNameAsString(), cr.isMajor(),
1220        now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(),
1221        outputBytes);
1222
1223    }
1224
1225    logCompactionEndMessage(cr, sfs, now, compactionStartTime);
1226    return sfs;
1227  }
1228
  // Set the correct storage policy based on the file name written by DTCP.
  // Renaming the file will not change the storage policy.
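  // For example, a new file written under a temp directory whose name starts with the storage
  // policy prefix followed by "HOT" gets the "HOT" policy here, and keeps it after the rename on
  // commit.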
1231  private void setStoragePolicyFromFileName(List<Path> newFiles) throws IOException {
1232    String prefix = HConstants.STORAGE_POLICY_PREFIX;
1233    for (Path newFile : newFiles) {
1234      if (newFile.getParent().getName().startsWith(prefix)) {
1235        CommonFSUtils.setStoragePolicy(getRegionFileSystem().getFileSystem(), newFile,
1236          newFile.getParent().getName().substring(prefix.length()));
1237      }
1238    }
1239  }
1240
1241  /**
1242   * Writes the compaction WAL record.
1243   * @param filesCompacted Files compacted (input).
1244   * @param newFiles       Files from compaction.
1245   */
1246  private void writeCompactionWalRecord(Collection<HStoreFile> filesCompacted,
1247    Collection<HStoreFile> newFiles) throws IOException {
1248    if (region.getWAL() == null) {
1249      return;
1250    }
1251    List<Path> inputPaths =
1252      filesCompacted.stream().map(HStoreFile::getPath).collect(Collectors.toList());
1253    List<Path> outputPaths =
1254      newFiles.stream().map(HStoreFile::getPath).collect(Collectors.toList());
1255    RegionInfo info = this.region.getRegionInfo();
1256    CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
1257      getColumnFamilyDescriptor().getName(), inputPaths, outputPaths,
1258      getRegionFileSystem().getStoreDir(getColumnFamilyDescriptor().getNameAsString()));
1259    // Fix reaching into Region to get the maxWaitForSeqId.
1260    // Does this method belong in Region altogether given it is making so many references up there?
1261    // Could be Region#writeCompactionMarker(compactionDescriptor);
1262    WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(),
1263      this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC(),
1264      region.getRegionReplicationSink().orElse(null));
1265  }
1266
1267  @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
1268      allowedOnPath = ".*/(HStore|TestHStore).java")
1269  void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result,
1270    boolean writeCompactionMarker) throws IOException {
1271    storeEngine.replaceStoreFiles(compactedFiles, result, () -> {
1272      if (writeCompactionMarker) {
1273        writeCompactionWalRecord(compactedFiles, result);
1274      }
1275    }, () -> {
1276      synchronized (filesCompacting) {
1277        filesCompacting.removeAll(compactedFiles);
1278      }
1279    });
1280    // These may be null when the RS is shutting down. The space quota Chores will fix the Region
1281    // sizes later so it's not super-critical if we miss these.
1282    RegionServerServices rsServices = region.getRegionServerServices();
1283    if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) {
1284      updateSpaceQuotaAfterFileReplacement(
1285        rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(),
1286        compactedFiles, result);
1287    }
1288  }
1289
1290  /**
1291   * Updates the space quota usage for this region, removing the size for files compacted away and
1292   * adding in the size for new files.
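   * <p>
   * For example (illustrative numbers): replacing two compacted HFiles of 100 and 50 bytes with a
   * single new HFile of 120 bytes reports a delta of {@code 120 - (100 + 50) = -30} bytes to the
   * size store.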
1293   * @param sizeStore  The object tracking changes in region size for space quotas.
1294   * @param regionInfo The identifier for the region whose size is being updated.
1295   * @param oldFiles   Files removed from this store's region.
1296   * @param newFiles   Files added to this store's region.
1297   */
1298  void updateSpaceQuotaAfterFileReplacement(RegionSizeStore sizeStore, RegionInfo regionInfo,
1299    Collection<HStoreFile> oldFiles, Collection<HStoreFile> newFiles) {
1300    long delta = 0;
1301    if (oldFiles != null) {
1302      for (HStoreFile compactedFile : oldFiles) {
1303        if (compactedFile.isHFile()) {
1304          delta -= compactedFile.getReader().length();
1305        }
1306      }
1307    }
1308    if (newFiles != null) {
1309      for (HStoreFile newFile : newFiles) {
1310        if (newFile.isHFile()) {
1311          delta += newFile.getReader().length();
1312        }
1313      }
1314    }
1315    sizeStore.incrementRegionSize(regionInfo, delta);
1316  }
1317
1318  /**
1319   * Log a very elaborate compaction completion message.
   * @param cr                  Request.
   * @param sfs                 Resulting files.
   * @param now                 Current time, taken when the compaction finished.
   * @param compactionStartTime Start time.
1323   */
1324  private void logCompactionEndMessage(CompactionRequestImpl cr, List<HStoreFile> sfs, long now,
1325    long compactionStartTime) {
1326    StringBuilder message = new StringBuilder("Completed" + (cr.isMajor() ? " major" : "")
1327      + " compaction of " + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "")
1328      + " file(s) in " + this + " of " + this.getRegionInfo().getShortNameToLog() + " into ");
1329    if (sfs.isEmpty()) {
1330      message.append("none, ");
1331    } else {
1332      for (HStoreFile sf : sfs) {
1333        message.append(sf.getPath().getName());
1334        message.append("(size=");
1335        message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1));
1336        message.append("), ");
1337      }
1338    }
1339    message.append("total size for store is ")
1340      .append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1))
1341      .append(". This selection was in queue for ")
1342      .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
1343      .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
1344      .append(" to execute.");
1345    LOG.info(message.toString());
1346    if (LOG.isTraceEnabled()) {
1347      int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
1348      long resultSize = getTotalSize(sfs);
1349      String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
1350        + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
1351        + cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]";
1352      LOG.trace(traceMessage);
1353    }
1354  }
1355
1356  /**
   * Call to complete a compaction. It is for the case where we find in the WAL a compaction that
   * was not finished. We could find one when recovering a WAL after a regionserver crash. See
   * HBASE-2231.
1359   */
1360  public void replayCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
1361    boolean removeFiles) throws IOException {
1362    LOG.debug("Completing compaction from the WAL marker");
1363    List<String> compactionInputs = compaction.getCompactionInputList();
1364    List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList());
1365
1366    // The Compaction Marker is written after the compaction is completed,
1367    // and the files moved into the region/family folder.
1368    //
1369    // If we crash after the entry is written, we may not have removed the
1370    // input files, but the output file is present.
1371    // (The unremoved input files will be removed by this function)
1372    //
    // If we scan the directory and the file is not present, it can mean that:
    // - The file was manually removed by the user
    // - The file was removed as a consequence of a subsequent compaction
    // So we can't do anything with the "compaction output list": those files have either already
    // been loaded when opening the region (by virtue of being in the store's folder) or they may
    // be missing due to a compaction.
1379
1380    String familyName = this.getColumnFamilyName();
1381    Set<String> inputFiles = new HashSet<>();
1382    for (String compactionInput : compactionInputs) {
1383      Path inputPath = getRegionFileSystem().getStoreFilePath(familyName, compactionInput);
1384      inputFiles.add(inputPath.getName());
1385    }
1386
1387    // some of the input files might already be deleted
1388    List<HStoreFile> inputStoreFiles = new ArrayList<>(compactionInputs.size());
1389    for (HStoreFile sf : this.getStorefiles()) {
1390      if (inputFiles.contains(sf.getPath().getName())) {
1391        inputStoreFiles.add(sf);
1392      }
1393    }
1394
1395    // check whether we need to pick up the new files
1396    List<HStoreFile> outputStoreFiles = new ArrayList<>(compactionOutputs.size());
1397
1398    if (pickCompactionFiles) {
1399      for (HStoreFile sf : this.getStorefiles()) {
1400        compactionOutputs.remove(sf.getPath().getName());
1401      }
1402      for (String compactionOutput : compactionOutputs) {
1403        StoreFileTracker sft = StoreFileTrackerFactory.create(conf, false, storeContext);
1404        StoreFileInfo storeFileInfo =
1405          getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), compactionOutput, sft);
1406        HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo);
1407        outputStoreFiles.add(storeFile);
1408      }
1409    }
1410
1411    if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) {
1412      LOG.info("Replaying compaction marker, replacing input files: " + inputStoreFiles
1413        + " with output files : " + outputStoreFiles);
1414      this.replaceStoreFiles(inputStoreFiles, outputStoreFiles, false);
1415      this.refreshStoreSizeAndTotalBytes();
1416    }
1417  }
1418
1419  @Override
1420  public boolean hasReferences() {
    // Grab the read lock here, because we need to ensure that only after the atomic
    // replaceStoreFiles(..) has finished can we get the complete store file list.
1423    this.storeEngine.readLock();
1424    try {
1425      // Merge the current store files with compacted files here due to HBASE-20940.
1426      Collection<HStoreFile> allStoreFiles = new ArrayList<>(getStorefiles());
1427      allStoreFiles.addAll(getCompactedFiles());
1428      return StoreUtils.hasReferences(allStoreFiles);
1429    } finally {
1430      this.storeEngine.readUnlock();
1431    }
1432  }
1433
1434  /**
1435   * getter for CompactionProgress object
1436   * @return CompactionProgress object; can be null
1437   */
1438  public CompactionProgress getCompactionProgress() {
1439    return this.storeEngine.getCompactor().getProgress();
1440  }
1441
1442  @Override
1443  public boolean shouldPerformMajorCompaction() throws IOException {
1444    for (HStoreFile sf : this.storeEngine.getStoreFileManager().getStoreFiles()) {
1445      // TODO: what are these reader checks all over the place?
1446      if (sf.getReader() == null) {
1447        LOG.debug("StoreFile {} has null Reader", sf);
1448        return false;
1449      }
1450    }
1451    return storeEngine.getCompactionPolicy()
1452      .shouldPerformMajorCompaction(this.storeEngine.getStoreFileManager().getStoreFiles());
1453  }
1454
1455  public Optional<CompactionContext> requestCompaction() throws IOException {
1456    return requestCompaction(NO_PRIORITY, CompactionLifeCycleTracker.DUMMY, null);
1457  }
1458
1459  public Optional<CompactionContext> requestCompaction(int priority,
1460    CompactionLifeCycleTracker tracker, User user) throws IOException {
1461    // don't even select for compaction if writes are disabled
1462    if (!this.areWritesEnabled()) {
1463      return Optional.empty();
1464    }
1465    // Before we do compaction, try to get rid of unneeded files to simplify things.
1466    removeUnneededFiles();
1467
1468    final CompactionContext compaction = storeEngine.createCompaction();
1469    CompactionRequestImpl request = null;
1470    this.storeEngine.readLock();
1471    try {
1472      synchronized (filesCompacting) {
1473        // First, see if coprocessor would want to override selection.
1474        if (this.getCoprocessorHost() != null) {
1475          final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
1476          boolean override =
1477            getCoprocessorHost().preCompactSelection(this, candidatesForCoproc, tracker, user);
1478          if (override) {
1479            // Coprocessor is overriding normal file selection.
1480            compaction.forceSelect(new CompactionRequestImpl(candidatesForCoproc));
1481          }
1482        }
1483
1484        // Normal case - coprocessor is not overriding file selection.
1485        if (!compaction.hasSelection()) {
1486          boolean isUserCompaction = priority == Store.PRIORITY_USER;
1487          boolean mayUseOffPeak =
1488            offPeakHours.isOffPeakHour() && offPeakCompactionTracker.compareAndSet(false, true);
1489          try {
1490            compaction.select(this.filesCompacting, isUserCompaction, mayUseOffPeak,
1491              forceMajor && filesCompacting.isEmpty());
1492          } catch (IOException e) {
1493            if (mayUseOffPeak) {
1494              offPeakCompactionTracker.set(false);
1495            }
1496            throw e;
1497          }
1498          assert compaction.hasSelection();
1499          if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
1500            // Compaction policy doesn't want to take advantage of off-peak.
1501            offPeakCompactionTracker.set(false);
1502          }
1503        }
1504        if (this.getCoprocessorHost() != null) {
1505          this.getCoprocessorHost().postCompactSelection(this,
1506            ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker,
1507            compaction.getRequest(), user);
1508        }
1509        // Finally, we have the resulting files list. Check if we have any files at all.
1510        request = compaction.getRequest();
1511        Collection<HStoreFile> selectedFiles = request.getFiles();
1512        if (selectedFiles.isEmpty()) {
1513          return Optional.empty();
1514        }
1515
1516        addToCompactingFiles(selectedFiles);
1517
1518        // If we're enqueuing a major, clear the force flag.
1519        this.forceMajor = this.forceMajor && !request.isMajor();
1520
1521        // Set common request properties.
1522        // Set priority, either override value supplied by caller or from store.
1523        final int compactionPriority =
1524          (priority != Store.NO_PRIORITY) ? priority : getCompactPriority();
1525        request.setPriority(compactionPriority);
1526
1527        if (request.isAfterSplit()) {
          // If the store belongs to recently split daughter regions, it is better to give them
          // higher priority in the compaction queue.
          // Override the priority if it is lower (i.e. a higher int value) than
          // SPLIT_REGION_COMPACTION_PRIORITY.
1532          final int splitHousekeepingPriority =
1533            Math.min(compactionPriority, SPLIT_REGION_COMPACTION_PRIORITY);
1534          request.setPriority(splitHousekeepingPriority);
1535          LOG.info(
1536            "Keeping/Overriding Compaction request priority to {} for CF {} since it"
1537              + " belongs to recently split daughter region {}",
1538            splitHousekeepingPriority, this.getColumnFamilyName(),
1539            getRegionInfo().getRegionNameAsString());
1540        }
1541        request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
1542        request.setTracker(tracker);
1543      }
1544    } finally {
1545      this.storeEngine.readUnlock();
1546    }
1547
1548    if (LOG.isDebugEnabled()) {
1549      LOG.debug(this + " is initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
1550        + (request.isAllFiles() ? " (all files)" : ""));
1551    }
1552    this.region.reportCompactionRequestStart(request.isMajor());
1553    return Optional.of(compaction);
1554  }
1555
1556  /** Adds the files to compacting files. filesCompacting must be locked. */
1557  private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) {
1558    if (CollectionUtils.isEmpty(filesToAdd)) {
1559      return;
1560    }
1561    // Check that we do not try to compact the same StoreFile twice.
1562    if (!Collections.disjoint(filesCompacting, filesToAdd)) {
1563      Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
1564    }
1565    filesCompacting.addAll(filesToAdd);
1566    Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator());
1567  }
1568
1569  private void removeUnneededFiles() throws IOException {
1570    if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) {
1571      return;
1572    }
1573    if (getColumnFamilyDescriptor().getMinVersions() > 0) {
      LOG.debug("Skipping expired store file removal for {} because MIN_VERSIONS is {}", this,
1575        getColumnFamilyDescriptor().getMinVersions());
1576      return;
1577    }
1578    this.storeEngine.readLock();
1579    Collection<HStoreFile> delSfs = null;
1580    try {
1581      synchronized (filesCompacting) {
1582        long cfTtl = getStoreFileTtl();
1583        if (cfTtl != Long.MAX_VALUE) {
1584          delSfs = storeEngine.getStoreFileManager()
1585            .getUnneededFiles(EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting);
1586          addToCompactingFiles(delSfs);
1587        }
1588      }
1589    } finally {
1590      this.storeEngine.readUnlock();
1591    }
1592
1593    if (CollectionUtils.isEmpty(delSfs)) {
1594      return;
1595    }
1596
1597    Collection<HStoreFile> newFiles = Collections.emptyList(); // No new files.
1598    replaceStoreFiles(delSfs, newFiles, true);
1599    refreshStoreSizeAndTotalBytes();
1600    LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in " + this
1601      + "; total size is " + TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1));
1602  }
1603
1604  public void cancelRequestedCompaction(CompactionContext compaction) {
1605    finishCompactionRequest(compaction.getRequest());
1606  }
1607
1608  private void finishCompactionRequest(CompactionRequestImpl cr) {
1609    this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
1610    if (cr.isOffPeak()) {
1611      offPeakCompactionTracker.set(false);
1612      cr.setOffPeak(false);
1613    }
1614    synchronized (filesCompacting) {
1615      filesCompacting.removeAll(cr.getFiles());
1616    }
    // The tracker could be null, for example when we do not need to track the creation of store
    // file writers due to a different SFT implementation, or when the compaction is canceled.
1619    if (cr.getWriterCreationTracker() != null) {
1620      storeFileWriterCreationTrackers.remove(cr.getWriterCreationTracker());
1621    }
1622  }
1623
1624  /**
   * Update the store size and total uncompressed bytes from the current set of store files.
1626   */
1627  protected void refreshStoreSizeAndTotalBytes() throws IOException {
1628    this.storeSize.set(0L);
1629    this.totalUncompressedBytes.set(0L);
1630    for (HStoreFile hsf : this.storeEngine.getStoreFileManager().getStoreFiles()) {
1631      StoreFileReader r = hsf.getReader();
1632      if (r == null) {
1633        LOG.debug("StoreFile {} has a null Reader", hsf);
1634        continue;
1635      }
1636      this.storeSize.addAndGet(r.length());
1637      this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
1638    }
1639  }
1640
1641  /*
1642   * @param wantedVersions How many versions were asked for.
   * @return wantedVersions, capped at this family's configured maximum number of versions.
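   * For example, if the family is configured with a maximum of 3 versions, a request for 5
   * versions returns 3, while a request for 2 versions returns 2.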
1644   */
1645  int versionsToReturn(final int wantedVersions) {
1646    if (wantedVersions <= 0) {
1647      throw new IllegalArgumentException("Number of versions must be > 0");
1648    }
1649    // Make sure we do not return more than maximum versions for this store.
1650    int maxVersions = getColumnFamilyDescriptor().getMaxVersions();
1651    return wantedVersions > maxVersions ? maxVersions : wantedVersions;
1652  }
1653
1654  @Override
1655  public boolean canSplit() {
1656    // Not split-able if we find a reference store file present in the store.
1657    boolean result = !hasReferences();
1658    if (!result) {
1659      LOG.trace("Not splittable; has references: {}", this);
1660    }
1661    return result;
1662  }
1663
1664  /**
   * Determines the split point for this store, if it can be split. Returns an empty Optional when
   * the store cannot be split, e.g. because it still contains reference store files.
1666   */
1667  public Optional<byte[]> getSplitPoint() {
1668    this.storeEngine.readLock();
1669    try {
1670      // Should already be enforced by the split policy!
1671      assert !this.getRegionInfo().isMetaRegion();
1672      // Not split-able if we find a reference store file present in the store.
1673      if (hasReferences()) {
1674        LOG.trace("Not splittable; has references: {}", this);
1675        return Optional.empty();
1676      }
1677      return this.storeEngine.getStoreFileManager().getSplitPoint();
1678    } catch (IOException e) {
      LOG.warn("Failed getting split point for {}", this, e);
1680    } finally {
1681      this.storeEngine.readUnlock();
1682    }
1683    return Optional.empty();
1684  }
1685
1686  @Override
1687  public long getLastCompactSize() {
1688    return this.lastCompactSize;
1689  }
1690
1691  @Override
1692  public long getSize() {
1693    return storeSize.get();
1694  }
1695
1696  public void triggerMajorCompaction() {
1697    this.forceMajor = true;
1698  }
1699
1700  //////////////////////////////////////////////////////////////////////////////
1701  // File administration
1702  //////////////////////////////////////////////////////////////////////////////
1703
1704  /**
1705   * Return a scanner for both the memstore and the HStore files. Assumes we are not in a
1706   * compaction.
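   * <p>
   * An illustrative usage sketch (the qualifier, {@code store} and {@code readPt} are
   * caller-supplied; exception handling omitted):
   *
   * <pre>{@code
   * NavigableSet<byte[]> cols = new TreeSet<>(Bytes.BYTES_COMPARATOR);
   * cols.add(Bytes.toBytes("q"));
   * KeyValueScanner scanner = store.getScanner(new Scan(), cols, readPt);
   * try {
   *   // consume the scanner via seek()/next()
   * } finally {
   *   scanner.close();
   * }
   * }</pre>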
   * @param scan       Scan to apply when scanning the stores
   * @param targetCols columns to scan
   * @param readPt     the read point to use when opening the scanner
   * @return a scanner over the current key values
   * @throws IOException on failure
1711   */
1712  public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols, long readPt)
1713    throws IOException {
1714    storeEngine.readLock();
1715    try {
1716      ScanInfo scanInfo;
1717      if (this.getCoprocessorHost() != null) {
1718        scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this, scan);
1719      } else {
1720        scanInfo = getScanInfo();
1721      }
1722      return createScanner(scan, scanInfo, targetCols, readPt);
1723    } finally {
1724      storeEngine.readUnlock();
1725    }
1726  }
1727
1728  // HMobStore will override this method to return its own implementation.
1729  protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo,
1730    NavigableSet<byte[]> targetCols, long readPt) throws IOException {
1731    return scan.isReversed()
1732      ? new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt)
1733      : new StoreScanner(this, scanInfo, scan, targetCols, readPt);
1734  }
1735
1736  /**
1737   * Recreates the scanners on the current list of active store file scanners
1738   * @param currentFileScanners    the current set of active store file scanners
1739   * @param cacheBlocks            cache the blocks or not
1740   * @param usePread               use pread or not
1741   * @param isCompaction           is the scanner for compaction
1742   * @param matcher                the scan query matcher
1743   * @param startRow               the scan's start row
1744   * @param includeStartRow        should the scan include the start row
1745   * @param stopRow                the scan's stop row
1746   * @param includeStopRow         should the scan include the stop row
   * @param readPt                 the read point of the current scan
   * @param includeMemstoreScanner whether the scanners should include a memstore scanner
   * @return list of scanners recreated from the current set of active store file scanners
1750   */
1751  public List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners,
1752    boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
1753    byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
1754    boolean includeMemstoreScanner) throws IOException {
1755    this.storeEngine.readLock();
1756    try {
1757      Map<String, HStoreFile> name2File =
1758        new HashMap<>(getStorefilesCount() + getCompactedFilesCount());
1759      for (HStoreFile file : getStorefiles()) {
1760        name2File.put(file.getFileInfo().getActiveFileName(), file);
1761      }
1762      Collection<HStoreFile> compactedFiles = getCompactedFiles();
1763      for (HStoreFile file : IterableUtils.emptyIfNull(compactedFiles)) {
1764        name2File.put(file.getFileInfo().getActiveFileName(), file);
1765      }
1766      List<HStoreFile> filesToReopen = new ArrayList<>();
1767      for (KeyValueScanner kvs : currentFileScanners) {
1768        assert kvs.isFileScanner();
1769        if (kvs.peek() == null) {
1770          continue;
1771        }
1772        filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
1773      }
1774      if (filesToReopen.isEmpty()) {
1775        return null;
1776      }
1777      return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow,
1778        includeStartRow, stopRow, includeStopRow, readPt, false, false);
1779    } finally {
1780      this.storeEngine.readUnlock();
1781    }
1782  }
1783
1784  @Override
1785  public String toString() {
1786    return this.getRegionInfo().getShortNameToLog() + "/" + this.getColumnFamilyName();
1787  }
1788
1789  @Override
1790  public int getStorefilesCount() {
1791    return this.storeEngine.getStoreFileManager().getStorefileCount();
1792  }
1793
1794  @Override
1795  public int getCompactedFilesCount() {
1796    return this.storeEngine.getStoreFileManager().getCompactedFilesCount();
1797  }
1798
1799  private LongStream getStoreFileAgeStream() {
1800    return this.storeEngine.getStoreFileManager().getStoreFiles().stream().filter(sf -> {
1801      if (sf.getReader() == null) {
1802        LOG.debug("StoreFile {} has a null Reader", sf);
1803        return false;
1804      } else {
1805        return true;
1806      }
1807    }).filter(HStoreFile::isHFile).mapToLong(sf -> sf.getFileInfo().getCreatedTimestamp())
1808      .map(t -> EnvironmentEdgeManager.currentTime() - t);
1809  }
1810
1811  @Override
1812  public OptionalLong getMaxStoreFileAge() {
1813    return getStoreFileAgeStream().max();
1814  }
1815
1816  @Override
1817  public OptionalLong getMinStoreFileAge() {
1818    return getStoreFileAgeStream().min();
1819  }
1820
1821  @Override
1822  public OptionalDouble getAvgStoreFileAge() {
1823    return getStoreFileAgeStream().average();
1824  }
1825
1826  @Override
1827  public long getNumReferenceFiles() {
1828    return this.storeEngine.getStoreFileManager().getStoreFiles().stream()
1829      .filter(HStoreFile::isReference).count();
1830  }
1831
1832  @Override
1833  public long getNumHFiles() {
1834    return this.storeEngine.getStoreFileManager().getStoreFiles().stream()
1835      .filter(HStoreFile::isHFile).count();
1836  }
1837
1838  @Override
1839  public long getStoreSizeUncompressed() {
1840    return this.totalUncompressedBytes.get();
1841  }
1842
1843  @Override
1844  public long getStorefilesSize() {
1845    // Include all StoreFiles
1846    return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStoreFiles(),
1847      sf -> true);
1848  }
1849
1850  @Override
1851  public long getHFilesSize() {
1852    // Include only StoreFiles which are HFiles
1853    return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStoreFiles(),
1854      HStoreFile::isHFile);
1855  }
1856
1857  private long getStorefilesFieldSize(ToLongFunction<StoreFileReader> f) {
1858    return this.storeEngine.getStoreFileManager().getStoreFiles().stream()
1859      .mapToLong(file -> StoreUtils.getStorefileFieldSize(file, f)).sum();
1860  }
1861
1862  @Override
1863  public long getStorefilesRootLevelIndexSize() {
1864    return getStorefilesFieldSize(StoreFileReader::indexSize);
1865  }
1866
1867  @Override
1868  public long getTotalStaticIndexSize() {
1869    return getStorefilesFieldSize(StoreFileReader::getUncompressedDataIndexSize);
1870  }
1871
1872  @Override
1873  public long getTotalStaticBloomSize() {
1874    return getStorefilesFieldSize(StoreFileReader::getTotalBloomSize);
1875  }
1876
1877  @Override
1878  public MemStoreSize getMemStoreSize() {
1879    return this.memstore.size();
1880  }
1881
1882  @Override
1883  public int getCompactPriority() {
1884    int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();
1885    if (priority == PRIORITY_USER) {
1886      LOG.warn("Compaction priority is USER despite there being no user compaction");
1887    }
1888    return priority;
1889  }
1890
1891  public boolean throttleCompaction(long compactionSize) {
1892    return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize);
1893  }
1894
1895  public HRegion getHRegion() {
1896    return this.region;
1897  }
1898
1899  public RegionCoprocessorHost getCoprocessorHost() {
1900    return this.region.getCoprocessorHost();
1901  }
1902
1903  @Override
1904  public RegionInfo getRegionInfo() {
1905    return getRegionFileSystem().getRegionInfo();
1906  }
1907
1908  @Override
1909  public boolean areWritesEnabled() {
1910    return this.region.areWritesEnabled();
1911  }
1912
1913  @Override
1914  public long getSmallestReadPoint() {
1915    return this.region.getSmallestReadPoint();
1916  }
1917
1918  /**
1919   * Adds or replaces the specified KeyValues.
1920   * <p>
1921   * For each KeyValue specified, if a cell with the same row, family, and qualifier exists in
   * MemStore, it will be replaced. Otherwise, it will just be inserted into the MemStore.
1923   * <p>
1924   * This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic
1925   * across all of them.
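   * <p>
   * For example, a caller typically passes the region's smallest read point, e.g.
   * {@code upsert(cells, getSmallestReadPoint(), memstoreSizing)}, so that duplicate versions
   * below that point can be removed safely.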
   * @param cells          the cells to add or replace
   * @param readpoint      readpoint below which we can safely remove duplicate KVs
   * @param memstoreSizing object to accumulate the size changes made to the memstore
1927   */
1928  public void upsert(Iterable<ExtendedCell> cells, long readpoint, MemStoreSizing memstoreSizing) {
1929    this.storeEngine.readLock();
1930    try {
1931      this.memstore.upsert(cells, readpoint, memstoreSizing);
1932    } finally {
1933      this.storeEngine.readUnlock();
1934    }
1935  }
1936
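  /**
   * Creates a {@link StoreFlushContext} that drives the flush life cycle for this store. An
   * illustrative caller-side sketch of the expected sequence ({@code cacheFlushId},
   * {@code tracker} and {@code status} are supplied by the caller; exception handling omitted):
   *
   * <pre>{@code
   * StoreFlushContext ctx = store.createFlushContext(cacheFlushId, tracker);
   * ctx.prepare();           // snapshot the memstore
   * ctx.flushCache(status);  // write the snapshot to temporary files
   * ctx.commit(status);      // move the files into the store and clear the snapshot
   * }</pre>
   */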
1937  public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTracker tracker) {
1938    return new StoreFlusherImpl(cacheFlushId, tracker);
1939  }
1940
1941  private final class StoreFlusherImpl implements StoreFlushContext {
1942
1943    private final FlushLifeCycleTracker tracker;
1944    private final StoreFileWriterCreationTracker writerCreationTracker;
1945    private final long cacheFlushSeqNum;
1946    private MemStoreSnapshot snapshot;
1947    private List<Path> tempFiles;
1948    private List<Path> committedFiles;
1949    private long cacheFlushCount;
1950    private long cacheFlushSize;
1951    private long outputFileSize;
1952
1953    private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) {
1954      this.cacheFlushSeqNum = cacheFlushSeqNum;
1955      this.tracker = tracker;
1956      this.writerCreationTracker = storeFileWriterCreationTrackerFactory.get();
1957    }
1958
1959    /**
1960     * This is not thread safe. The caller should have a lock on the region or the store. If
     * necessary, the lock can be added with the patch provided in HBASE-10087.
1962     */
1963    @Override
1964    public MemStoreSize prepare() {
1965      // passing the current sequence number of the wal - to allow bookkeeping in the memstore
1966      this.snapshot = memstore.snapshot();
1967      this.cacheFlushCount = snapshot.getCellsCount();
1968      this.cacheFlushSize = snapshot.getDataSize();
1969      committedFiles = new ArrayList<>(1);
1970      return snapshot.getMemStoreSize();
1971    }
1972
1973    @Override
1974    public void flushCache(MonitoredTask status) throws IOException {
1975      RegionServerServices rsService = region.getRegionServerServices();
1976      ThroughputController throughputController =
1977        rsService == null ? null : rsService.getFlushThroughputController();
      // it could be null if we do not need to track the creation of store file writers due to a
      // different SFT implementation.
1980      if (writerCreationTracker != null) {
1981        HStore.this.storeFileWriterCreationTrackers.add(writerCreationTracker);
1982      }
1983      tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController,
1984        tracker, writerCreationTracker);
1985    }
1986
1987    @Override
1988    public boolean commit(MonitoredTask status) throws IOException {
1989      try {
1990        if (CollectionUtils.isEmpty(this.tempFiles)) {
1991          return false;
1992        }
1993        status.setStatus("Flushing " + this + ": reopening flushed file");
1994        List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, false, false);
1995        for (HStoreFile sf : storeFiles) {
1996          StoreFileReader r = sf.getReader();
1997          if (LOG.isInfoEnabled()) {
1998            LOG.info("Added {}, entries={}, sequenceid={}, filesize={}", sf, r.getEntries(),
1999              cacheFlushSeqNum, TraditionalBinaryPrefix.long2String(r.length(), "", 1));
2000          }
2001          outputFileSize += r.length();
2002          storeSize.addAndGet(r.length());
2003          totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
2004          committedFiles.add(sf.getPath());
2005        }
2006
2007        flushedCellsCount.addAndGet(cacheFlushCount);
2008        flushedCellsSize.addAndGet(cacheFlushSize);
2009        flushedOutputFileSize.addAndGet(outputFileSize);
2010        // call coprocessor after we have done all the accounting above
2011        for (HStoreFile sf : storeFiles) {
2012          if (getCoprocessorHost() != null) {
2013            getCoprocessorHost().postFlush(HStore.this, sf, tracker);
2014          }
2015        }
2016        // Add new file to store files. Clear snapshot too while we have the Store write lock.
2017        return completeFlush(storeFiles, snapshot.getId());
2018      } finally {
2019        if (writerCreationTracker != null) {
2020          HStore.this.storeFileWriterCreationTrackers.remove(writerCreationTracker);
2021        }
2022      }
2023    }
2024
2025    @Override
2026    public long getOutputFileSize() {
2027      return outputFileSize;
2028    }
2029
2030    @Override
2031    public List<Path> getCommittedFiles() {
2032      return committedFiles;
2033    }
2034
2035    /**
2036     * Similar to commit, but called in secondary region replicas for replaying the flush cache from
2037     * primary region. Adds the new files to the store, and drops the snapshot depending on
2038     * dropMemstoreSnapshot argument.
2039     * @param fileNames            names of the flushed files
2040     * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot
2041     */
2042    @Override
2043    public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
2044      throws IOException {
2045      List<HStoreFile> storeFiles = new ArrayList<>(fileNames.size());
2046      for (String file : fileNames) {
2047        // open the file as a store file (hfile link, etc)
2048        StoreFileTracker sft = StoreFileTrackerFactory.create(conf, false, storeContext);
2049        StoreFileInfo storeFileInfo =
2050          getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), file, sft);
2051        HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo);
2052        storeFiles.add(storeFile);
2053        HStore.this.storeSize.addAndGet(storeFile.getReader().length());
2054        HStore.this.totalUncompressedBytes
2055          .addAndGet(storeFile.getReader().getTotalUncompressedBytes());
2056        if (LOG.isInfoEnabled()) {
2057          LOG.info(this + " added " + storeFile + ", entries=" + storeFile.getReader().getEntries()
2058            + ", sequenceid=" + storeFile.getReader().getSequenceID() + ", filesize="
2059            + TraditionalBinaryPrefix.long2String(storeFile.getReader().length(), "", 1));
2060        }
2061      }
2062
2063      long snapshotId = -1; // -1 means do not drop
2064      if (dropMemstoreSnapshot && snapshot != null) {
2065        snapshotId = snapshot.getId();
2066      }
2067      HStore.this.completeFlush(storeFiles, snapshotId);
2068    }
2069
2070    /**
2071     * Abort the snapshot preparation. Drops the snapshot if any.
2072     */
2073    @Override
2074    public void abort() throws IOException {
2075      if (snapshot != null) {
2076        HStore.this.completeFlush(Collections.emptyList(), snapshot.getId());
2077      }
2078    }
2079  }
2080
2081  @Override
2082  public boolean needsCompaction() {
2083    List<HStoreFile> filesCompactingClone = null;
2084    synchronized (filesCompacting) {
2085      filesCompactingClone = Lists.newArrayList(filesCompacting);
2086    }
2087    return this.storeEngine.needsCompaction(filesCompactingClone);
2088  }
2089
2090  /**
2091   * Used for tests.
2092   * @return cache configuration for this Store.
2093   */
2094  public CacheConfig getCacheConfig() {
2095    return storeContext.getCacheConf();
2096  }
2097
2098  public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HStore.class, false);
2099
2100  public static final long DEEP_OVERHEAD = ClassSize.align(
2101    FIXED_OVERHEAD + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK + ClassSize.CONCURRENT_SKIPLISTMAP
2102      + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT + ScanInfo.FIXED_OVERHEAD);
2103
2104  @Override
2105  public long heapSize() {
2106    MemStoreSize memstoreSize = this.memstore.size();
2107    return DEEP_OVERHEAD + memstoreSize.getHeapSize() + storeContext.heapSize();
2108  }
2109
2110  @Override
2111  public CellComparator getComparator() {
2112    return storeContext.getComparator();
2113  }
2114
2115  public ScanInfo getScanInfo() {
2116    return scanInfo;
2117  }
2118
2119  /**
   * Set scan info, used by tests.
   * @param scanInfo new scan info to use for tests
2122   */
2123  void setScanInfo(ScanInfo scanInfo) {
2124    this.scanInfo = scanInfo;
2125  }
2126
2127  @Override
2128  public boolean hasTooManyStoreFiles() {
2129    return getStorefilesCount() > this.blockingFileCount;
2130  }
2131
2132  @Override
2133  public long getFlushedCellsCount() {
2134    return flushedCellsCount.get();
2135  }
2136
2137  @Override
2138  public long getFlushedCellsSize() {
2139    return flushedCellsSize.get();
2140  }
2141
2142  @Override
2143  public long getFlushedOutputFileSize() {
2144    return flushedOutputFileSize.get();
2145  }
2146
2147  @Override
2148  public long getCompactedCellsCount() {
2149    return compactedCellsCount.get();
2150  }
2151
2152  @Override
2153  public long getCompactedCellsSize() {
2154    return compactedCellsSize.get();
2155  }
2156
2157  @Override
2158  public long getMajorCompactedCellsCount() {
2159    return majorCompactedCellsCount.get();
2160  }
2161
2162  @Override
2163  public long getMajorCompactedCellsSize() {
2164    return majorCompactedCellsSize.get();
2165  }
2166
2167  public void updateCompactedMetrics(boolean isMajor, CompactionProgress progress) {
2168    if (isMajor) {
2169      majorCompactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
2170      majorCompactedCellsSize.addAndGet(progress.totalCompactedSize);
2171    } else {
2172      compactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
2173      compactedCellsSize.addAndGet(progress.totalCompactedSize);
2174    }
2175  }
2176
2177  /**
2178   * Returns the StoreEngine that is backing this concrete implementation of Store.
   * @return the {@link StoreEngine} object used internally inside this HStore object.
2180   */
2181  public StoreEngine<?, ?, ?, ?> getStoreEngine() {
2182    return this.storeEngine;
2183  }
2184
2185  protected OffPeakHours getOffPeakHours() {
2186    return this.offPeakHours;
2187  }
2188
2189  @Override
2190  public void onConfigurationChange(Configuration conf) {
2191    Configuration storeConf = StoreUtils.createStoreConfiguration(conf, region.getTableDescriptor(),
2192      getColumnFamilyDescriptor());
2193    this.conf = storeConf;
2194    this.storeEngine.compactionPolicy.setConf(storeConf);
2195    this.offPeakHours = OffPeakHours.getInstance(storeConf);
2196  }
2197
2198  /**
2199   * {@inheritDoc}
2200   */
2201  @Override
2202  public void registerChildren(ConfigurationManager manager) {
2203    CacheConfig cacheConfig = this.storeContext.getCacheConf();
2204    if (cacheConfig != null) {
2205      manager.registerObserver(cacheConfig);
2206    }
2207  }
2208
2209  /**
2210   * {@inheritDoc}
2211   */
2212  @Override
2213  public void deregisterChildren(ConfigurationManager manager) {
2214    // No children to deregister
2215  }
2216
2217  @Override
2218  public double getCompactionPressure() {
2219    return storeEngine.getStoreFileManager().getCompactionPressure();
2220  }
2221
2222  @Override
2223  public boolean isPrimaryReplicaStore() {
2224    return getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID;
2225  }
2226
2227  /**
2228   * Sets the store up for a region level snapshot operation.
2229   * @see #postSnapshotOperation()
2230   */
2231  public void preSnapshotOperation() {
2232    archiveLock.lock();
2233  }
2234
2235  /**
2236   * Perform tasks needed after the completion of snapshot operation.
2237   * @see #preSnapshotOperation()
2238   */
2239  public void postSnapshotOperation() {
2240    archiveLock.unlock();
2241  }
2242
2243  /**
2244   * Closes and archives the compacted files under this store
2245   */
2246  public synchronized void closeAndArchiveCompactedFiles() throws IOException {
2247    // ensure other threads do not attempt to archive the same files on close()
2248    archiveLock.lock();
2249    try {
2250      storeEngine.readLock();
2251      Collection<HStoreFile> copyCompactedfiles = null;
2252      try {
2253        Collection<HStoreFile> compactedfiles =
2254          this.getStoreEngine().getStoreFileManager().getCompactedfiles();
2255        if (CollectionUtils.isNotEmpty(compactedfiles)) {
2256          // Do a copy under read lock
2257          copyCompactedfiles = new ArrayList<>(compactedfiles);
2258        } else {
2259          LOG.trace("No compacted files to archive");
2260        }
2261      } finally {
2262        storeEngine.readUnlock();
2263      }
2264      if (CollectionUtils.isNotEmpty(copyCompactedfiles)) {
2265        removeCompactedfiles(copyCompactedfiles, true);
2266      }
2267    } finally {
2268      archiveLock.unlock();
2269    }
2270  }
2271
2272  /**
2273   * Archives and removes the compacted files
2274   * @param compactedfiles The compacted files in this store that are not active in reads
2275   * @param evictOnClose   true if blocks should be evicted from the cache when an HFile reader is
2276   *                       closed, false if not
2277   */
2278  private void removeCompactedfiles(Collection<HStoreFile> compactedfiles, boolean evictOnClose)
2279    throws IOException {
2280    final List<HStoreFile> filesToRemove = new ArrayList<>(compactedfiles.size());
2281    final List<Long> storeFileSizes = new ArrayList<>(compactedfiles.size());
2282    for (final HStoreFile file : compactedfiles) {
2283      synchronized (file) {
2284        try {
2285          StoreFileReader r = file.getReader();
2286          if (r == null) {
2287            LOG.debug("The file {} was closed but still not archived", file);
2288            // HACK: Temporarily re-open the reader so we can get the size of the file. Ideally,
2289            // we should know the size of an HStoreFile without having to ask the HStoreFileReader
2290            // for that.
2291            long length = getStoreFileSize(file);
2292            filesToRemove.add(file);
2293            storeFileSizes.add(length);
2294            continue;
2295          }
2296
2297          if (file.isCompactedAway() && !file.isReferencedInReads()) {
            // Even if deleting fails we need not bother, as any new scanners won't be able to use
            // the compacted file since its status is already compactedAway
2300            LOG.trace("Closing and archiving the file {}", file);
2301            // Copy the file size before closing the reader
2302            final long length = r.length();
2303            r.close(evictOnClose);
            // Reader is closed; queue the file for removal
2305            filesToRemove.add(file);
2306            // Only add the length if we successfully added the file to `filesToRemove`
2307            storeFileSizes.add(length);
2308          } else {
2309            LOG.info("Can't archive compacted file " + file.getPath()
2310              + " because of either isCompactedAway=" + file.isCompactedAway()
2311              + " or file has reference, isReferencedInReads=" + file.isReferencedInReads()
2312              + ", refCount=" + r.getRefCount() + ", skipping for now.");
2313          }
2314        } catch (Exception e) {
2315          LOG.error("Exception while trying to close the compacted store file {}", file.getPath(),
2316            e);
2317        }
2318      }
2319    }
2320    if (this.isPrimaryReplicaStore()) {
2321      // Only the primary region is allowed to move the file to archive.
2322      // The secondary region does not move the files to archive. Any active reads from
2323      // the secondary region will still work because the file as such has active readers on it.
2324      if (!filesToRemove.isEmpty()) {
2325        LOG.debug("Moving the files {} to archive", filesToRemove);
2326        // Only if this is successful it has to be removed
2327        try {
2328          getRegionFileSystem().removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(),
2329            filesToRemove);
2330        } catch (FailedArchiveException fae) {
2331          // Even if archiving some files failed, we still need to clear out any of the
2332          // files which were successfully archived. Otherwise we will receive a
2333          // FileNotFoundException when we attempt to re-archive them in the next go around.
2334          Collection<Path> failedFiles = fae.getFailedFiles();
2335          Iterator<HStoreFile> iter = filesToRemove.iterator();
2336          Iterator<Long> sizeIter = storeFileSizes.iterator();
2337          while (iter.hasNext()) {
2338            sizeIter.next();
2339            if (failedFiles.contains(iter.next().getPath())) {
2340              iter.remove();
2341              sizeIter.remove();
2342            }
2343          }
2344          if (!filesToRemove.isEmpty()) {
2345            clearCompactedfiles(filesToRemove);
2346          }
2347          throw fae;
2348        }
2349      }
2350    }
2351    if (!filesToRemove.isEmpty()) {
2352      // Clear the compactedfiles from the store file manager
2353      clearCompactedfiles(filesToRemove);
2354      // Try to send report of this archival to the Master for updating quota usage faster
2355      reportArchivedFilesForQuota(filesToRemove, storeFileSizes);
2356    }
2357  }
2358
2359  /**
2360   * Computes the length of a store file without succumbing to any errors along the way. If an error
2361   * is encountered, the implementation returns {@code 0} instead of the actual size.
2362   * @param file The file to compute the size of.
2363   * @return The size in bytes of the provided {@code file}.
2364   */
2365  long getStoreFileSize(HStoreFile file) {
2366    long length = 0;
2367    try {
2368      file.initReader();
2369      length = file.getReader().length();
2370    } catch (IOException e) {
2371      LOG.trace("Failed to open reader when trying to compute store file size for {}, ignoring",
2372        file, e);
2373    } finally {
2374      try {
2375        file.closeStoreFile(
2376          file.getCacheConf() != null ? file.getCacheConf().shouldEvictOnClose() : true);
2377      } catch (IOException e) {
2378        LOG.trace("Failed to close reader after computing store file size for {}, ignoring", file,
2379          e);
2380      }
2381    }
2382    return length;
2383  }
2384
2385  public Long preFlushSeqIDEstimation() {
2386    return memstore.preFlushSeqIDEstimation();
2387  }
2388
2389  @Override
2390  public boolean isSloppyMemStore() {
2391    return this.memstore.isSloppy();
2392  }
2393
2394  private void clearCompactedfiles(List<HStoreFile> filesToRemove) throws IOException {
2395    LOG.trace("Clearing the compacted file {} from this store", filesToRemove);
2396    storeEngine.removeCompactedFiles(filesToRemove);
2397  }
2398
2399  void reportArchivedFilesForQuota(List<? extends StoreFile> archivedFiles, List<Long> fileSizes) {
2400    // Sanity check from the caller
2401    if (archivedFiles.size() != fileSizes.size()) {
2402      throw new RuntimeException("Coding error: should never see lists of varying size");
2403    }
2404    RegionServerServices rss = this.region.getRegionServerServices();
2405    if (rss == null) {
2406      return;
2407    }
2408    List<Entry<String, Long>> filesWithSizes = new ArrayList<>(archivedFiles.size());
2409    Iterator<Long> fileSizeIter = fileSizes.iterator();
2410    for (StoreFile storeFile : archivedFiles) {
2411      final long fileSize = fileSizeIter.next();
2412      if (storeFile.isHFile() && fileSize != 0) {
2413        filesWithSizes.add(Maps.immutableEntry(storeFile.getPath().getName(), fileSize));
2414      }
2415    }
2416    if (LOG.isTraceEnabled()) {
2417      LOG.trace("Files archived: " + archivedFiles + ", reporting the following to the Master: "
2418        + filesWithSizes);
2419    }
2420    boolean success = rss.reportFileArchivalForQuotas(getTableName(), filesWithSizes);
2421    if (!success) {
2422      LOG.warn("Failed to report archival of files: " + filesWithSizes);
2423    }
2424  }
2425
2426  @Override
2427  public int getCurrentParallelPutCount() {
2428    return currentParallelPutCount.get();
2429  }
2430
2431  public int getStoreRefCount() {
2432    return this.storeEngine.getStoreFileManager().getStoreFiles().stream()
2433      .filter(sf -> sf.getReader() != null).filter(HStoreFile::isHFile)
2434      .mapToInt(HStoreFile::getRefCount).sum();
2435  }
2436
  /** Returns the maximum ref count among all compacted store files for this HStore */
2438  public int getMaxCompactedStoreFileRefCount() {
2439    OptionalInt maxCompactedStoreFileRefCount = this.storeEngine.getStoreFileManager()
2440      .getCompactedfiles().stream().filter(sf -> sf.getReader() != null).filter(HStoreFile::isHFile)
2441      .mapToInt(HStoreFile::getRefCount).max();
2442    return maxCompactedStoreFileRefCount.isPresent() ? maxCompactedStoreFileRefCount.getAsInt() : 0;
2443  }
2444
2445  @Override
2446  public long getMemstoreOnlyRowReadsCount() {
2447    return memstoreOnlyRowReadsCount.sum();
2448  }
2449
2450  @Override
2451  public long getMixedRowReadsCount() {
2452    return mixedRowReadsCount.sum();
2453  }
2454
2455  @Override
2456  public Configuration getReadOnlyConfiguration() {
2457    return new ReadOnlyConfiguration(this.conf);
2458  }
2459
2460  void updateMetricsStore(boolean memstoreRead) {
2461    if (memstoreRead) {
2462      memstoreOnlyRowReadsCount.increment();
2463    } else {
2464      mixedRowReadsCount.increment();
2465    }
2466  }
2467
2468  /**
2469   * Return the storefiles which are currently being written to. Mainly used by
   * {@link BrokenStoreFileCleaner} to prevent deleting these files as they are not present in the
   * SFT yet.
2472   */
2473  public Set<Path> getStoreFilesBeingWritten() {
2474    return storeFileWriterCreationTrackers.stream().flatMap(t -> t.get().stream())
2475      .collect(Collectors.toSet());
2476  }
2477
2478  @Override
2479  public long getBloomFilterRequestsCount() {
2480    return storeEngine.getBloomFilterMetrics().getRequestsCount();
2481  }
2482
2483  @Override
2484  public long getBloomFilterNegativeResultsCount() {
2485    return storeEngine.getBloomFilterMetrics().getNegativeResultsCount();
2486  }
2487
2488  @Override
2489  public long getBloomFilterEligibleRequestsCount() {
2490    return storeEngine.getBloomFilterMetrics().getEligibleRequestsCount();
2491  }
2492}