001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.net.InetSocketAddress;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.HashMap;
028import java.util.HashSet;
029import java.util.Iterator;
030import java.util.List;
031import java.util.Map;
032import java.util.Map.Entry;
033import java.util.NavigableSet;
034import java.util.Optional;
035import java.util.OptionalDouble;
036import java.util.OptionalInt;
037import java.util.OptionalLong;
038import java.util.Set;
039import java.util.concurrent.Callable;
040import java.util.concurrent.CompletionService;
041import java.util.concurrent.ConcurrentHashMap;
042import java.util.concurrent.ExecutionException;
043import java.util.concurrent.ExecutorCompletionService;
044import java.util.concurrent.Future;
045import java.util.concurrent.ThreadPoolExecutor;
046import java.util.concurrent.atomic.AtomicBoolean;
047import java.util.concurrent.atomic.AtomicInteger;
048import java.util.concurrent.atomic.AtomicLong;
049import java.util.concurrent.atomic.LongAdder;
050import java.util.concurrent.locks.ReentrantLock;
051import java.util.function.Consumer;
052import java.util.function.Supplier;
053import java.util.function.ToLongFunction;
054import java.util.stream.Collectors;
055import java.util.stream.LongStream;
056import org.apache.hadoop.conf.Configuration;
057import org.apache.hadoop.fs.FileSystem;
058import org.apache.hadoop.fs.Path;
059import org.apache.hadoop.fs.permission.FsAction;
060import org.apache.hadoop.hbase.Cell;
061import org.apache.hadoop.hbase.CellComparator;
062import org.apache.hadoop.hbase.CellUtil;
063import org.apache.hadoop.hbase.ExtendedCell;
064import org.apache.hadoop.hbase.HConstants;
065import org.apache.hadoop.hbase.InnerStoreCellComparator;
066import org.apache.hadoop.hbase.MemoryCompactionPolicy;
067import org.apache.hadoop.hbase.MetaCellComparator;
068import org.apache.hadoop.hbase.TableName;
069import org.apache.hadoop.hbase.backup.FailedArchiveException;
070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
071import org.apache.hadoop.hbase.client.RegionInfo;
072import org.apache.hadoop.hbase.client.Scan;
073import org.apache.hadoop.hbase.conf.ConfigurationManager;
074import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
075import org.apache.hadoop.hbase.coprocessor.ReadOnlyConfiguration;
076import org.apache.hadoop.hbase.io.HeapSize;
077import org.apache.hadoop.hbase.io.hfile.CacheConfig;
078import org.apache.hadoop.hbase.io.hfile.HFile;
079import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
080import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
081import org.apache.hadoop.hbase.io.hfile.HFileScanner;
082import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
083import org.apache.hadoop.hbase.monitoring.MonitoredTask;
084import org.apache.hadoop.hbase.quotas.RegionSizeStore;
085import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
086import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
087import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
088import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
089import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
090import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
091import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
092import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
093import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
094import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
095import org.apache.hadoop.hbase.security.EncryptionUtil;
096import org.apache.hadoop.hbase.security.User;
097import org.apache.hadoop.hbase.util.Bytes;
098import org.apache.hadoop.hbase.util.ClassSize;
099import org.apache.hadoop.hbase.util.CommonFSUtils;
100import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
101import org.apache.hadoop.hbase.util.Pair;
102import org.apache.hadoop.hbase.util.ReflectionUtils;
103import org.apache.hadoop.util.StringUtils;
104import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
105import org.apache.yetus.audience.InterfaceAudience;
106import org.slf4j.Logger;
107import org.slf4j.LoggerFactory;
108
109import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
110import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
111import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
112import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
113import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
114import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
115import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
116
117import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
119
/**
 * A Store holds a column family in a Region. It is a memstore and a set of zero or more StoreFiles,
 * which stretch backwards over time.
 * <p>
 * There's no reason to consider append-logging at this level; all logging and locking is handled at
 * the HRegion level. Store just provides services to manage sets of StoreFiles. One of the most
 * important of those services is compaction, where files are aggregated once they pass a
 * configurable threshold.
 * <p>
 * Locking and transactions are handled at a higher level. This API should not be called directly
 * but by an HRegion manager.
 */
132@InterfaceAudience.Private
133public class HStore
134  implements Store, HeapSize, StoreConfigInformation, PropagatingConfigurationObserver {
135  public static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
136  public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
137    "hbase.server.compactchecker.interval.multiplier";
138  public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
139  public static final String BLOCK_STORAGE_POLICY_KEY = "hbase.hstore.block.storage.policy";
140  // "NONE" is not a valid storage policy and means we defer the policy to HDFS
141  public static final String DEFAULT_BLOCK_STORAGE_POLICY = "NONE";
142  public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
143  public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 16;
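  // Illustrative sketch only: these keys are normally set in hbase-site.xml or on the table /
  // column family descriptor, but they can also be overridden programmatically on a
  // Configuration, e.g.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt(BLOCKING_STOREFILES_KEY, 20);               // block updates at 20 store files
  //   conf.set(BLOCK_STORAGE_POLICY_KEY, "ONE_SSD");          // instead of deferring to HDFS ("NONE")
  //   conf.setInt(COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, 500);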
144
  // HBASE-24428 : Update compaction priority for recently split daughter regions
  // so as to prioritize their compaction.
  // Any compaction candidate with higher priority than compaction of newly split daughter regions
  // should have priority value < (Integer.MIN_VALUE + 1000)
  private static final int SPLIT_REGION_COMPACTION_PRIORITY = Integer.MIN_VALUE + 1000;
150
151  private static final Logger LOG = LoggerFactory.getLogger(HStore.class);
152
153  protected final MemStore memstore;
  // The region this store belongs to; the store's directory in the filesystem lives under it.
  private final HRegion region;
156  protected Configuration conf;
157  private long lastCompactSize = 0;
158  volatile boolean forceMajor = false;
159  private AtomicLong storeSize = new AtomicLong();
160  private AtomicLong totalUncompressedBytes = new AtomicLong();
161  private LongAdder memstoreOnlyRowReadsCount = new LongAdder();
162  // rows that has cells from both memstore and files (or only files)
163  private LongAdder mixedRowReadsCount = new LongAdder();
164
  /**
   * Lock specific to archiving compacted store files. This avoids races around the combination of
   * retrieving the list of compacted files and moving them to the archive directory. Since this is
   * usually a background process (other than on close), we don't want to handle this with the store
   * write lock, which would block readers and degrade performance.
   * <p>
   * Locked by:
   * <ul>
   * <li>CompactedHFilesDispatchHandler via closeAndArchiveCompactedFiles()</li>
   * <li>close()</li>
   * </ul>
   */
172  final ReentrantLock archiveLock = new ReentrantLock();
173
174  private final boolean verifyBulkLoads;
175
  /**
   * Use this counter to track concurrent puts. If TRACE-level logging is enabled and we are over
   * the threshold set by hbase.region.store.parallel.put.print.threshold (default: 50), we will
   * log a message that identifies the Store experiencing this high level of concurrency.
   */
181  private final AtomicInteger currentParallelPutCount = new AtomicInteger(0);
182  private final int parallelPutCountPrintThreshold;
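  // Illustrative sketch: the TRACE-level "too busy" message above can be tuned (or effectively
  // silenced) by raising the threshold, e.g.
  //
  //   conf.setInt("hbase.region.store.parallel.put.print.threshold", 200);
  //
  // Values below 10 are clamped to 10 in the constructor.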
183
184  private ScanInfo scanInfo;
185
186  // All access must be synchronized.
187  // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it.
188  private final List<HStoreFile> filesCompacting = Lists.newArrayList();
189
190  // All access must be synchronized.
191  private final Set<ChangedReadersObserver> changedReaderObservers =
192    Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());
193
194  private HFileDataBlockEncoder dataBlockEncoder;
195
196  final StoreEngine<?, ?, ?, ?> storeEngine;
197
198  private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
199  private volatile OffPeakHours offPeakHours;
200
201  private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
202  private int flushRetriesNumber;
203  private int pauseTime;
204
205  private long blockingFileCount;
206  private int compactionCheckMultiplier;
207
208  private AtomicLong flushedCellsCount = new AtomicLong();
209  private AtomicLong compactedCellsCount = new AtomicLong();
210  private AtomicLong majorCompactedCellsCount = new AtomicLong();
211  private AtomicLong flushedCellsSize = new AtomicLong();
212  private AtomicLong flushedOutputFileSize = new AtomicLong();
213  private AtomicLong compactedCellsSize = new AtomicLong();
214  private AtomicLong majorCompactedCellsSize = new AtomicLong();
215
216  private final StoreContext storeContext;
217
  // Used to track the store files which are currently being written. For compaction, if we want to
  // compact store files [a, b, c] to [d], then here we will record 'd'. We also use it to track
  // the store files being written during a flush.
  // Notice that the creation happens in the background compaction or flush thread and we will read
  // the files from other threads, so it needs to be thread safe.
223  private static final class StoreFileWriterCreationTracker implements Consumer<Path> {
224
225    private final Set<Path> files = Collections.newSetFromMap(new ConcurrentHashMap<>());
226
227    @Override
228    public void accept(Path t) {
229      files.add(t);
230    }
231
232    public Set<Path> get() {
233      return Collections.unmodifiableSet(files);
234    }
235  }
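  // Illustrative sketch of how a tracker is used: the flush/compaction thread registers the
  // tracker as the writer-creation callback, while another thread (e.g. broken store file
  // cleanup) reads the in-progress paths via get(). The path below is hypothetical.
  //
  //   StoreFileWriterCreationTracker tracker = new StoreFileWriterCreationTracker();
  //   tracker.accept(new Path("/hbase/data/ns/table/region/cf/.tmp/writer-1")); // writer thread
  //   Set<Path> inProgress = tracker.get(); // reader thread: unmodifiable snapshot view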
236
  // We may have multiple compactions running at the same time, and flushes can happen at the same
  // time as well, so here we need a collection, and the collection needs to be thread safe.
  // The implementation of StoreFileWriterCreationTracker is very simple and we are not likely to
  // implement hashCode or equals for it, so here we just use ConcurrentHashMap. Change to an
  // identity-based map if we later implement hashCode or equals.
242  private final Set<StoreFileWriterCreationTracker> storeFileWriterCreationTrackers =
243    Collections.newSetFromMap(new ConcurrentHashMap<>());
244
  // For SFT implementations that write a tmp store file first, we do not need to clean up
  // broken store files under the data directory, which means we do not need to track store
  // file writer creation. So here we abstract a factory to return different trackers for different
  // SFT implementations.
249  private final Supplier<StoreFileWriterCreationTracker> storeFileWriterCreationTrackerFactory;
250
251  private final boolean warmup;
252
  /**
   * Constructor
   * @param family    ColumnFamilyDescriptor for this column family
   * @param confParam configuration object for this store; can be null
   */
258  protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
259    final Configuration confParam, boolean warmup) throws IOException {
260    this.conf = StoreUtils.createStoreConfiguration(confParam, region.getTableDescriptor(), family);
261
262    this.region = region;
263    this.storeContext = initializeStoreContext(family);
264
    // Assemble the store's home directory and ensure it exists.
266    region.getRegionFileSystem().createStoreDir(family.getNameAsString());
267
268    // set block storage policy for store directory
269    String policyName = family.getStoragePolicy();
270    if (null == policyName) {
271      policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY, DEFAULT_BLOCK_STORAGE_POLICY);
272    }
273    region.getRegionFileSystem().setStoragePolicy(family.getNameAsString(), policyName.trim());
274
275    this.dataBlockEncoder = new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());
276
277    // used by ScanQueryMatcher
278    long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
279    LOG.trace("Time to purge deletes set to {}ms in {}", timeToPurgeDeletes, this);
280    // Get TTL
281    long ttl = determineTTLFromFamily(family);
    // Why not just pass a HColumnDescriptor in here altogether? Even if we have
    // to clone it?
284    scanInfo =
285      new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.storeContext.getComparator());
286    this.memstore = getMemstore();
287
288    this.offPeakHours = OffPeakHours.getInstance(conf);
289
290    this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
291
292    this.blockingFileCount = conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
293    this.compactionCheckMultiplier = conf.getInt(COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY,
294      DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
295    if (this.compactionCheckMultiplier <= 0) {
296      LOG.error("Compaction check period multiplier must be positive, setting default: {}",
297        DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
298      this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
299    }
300
301    this.warmup = warmup;
302    this.storeEngine = createStoreEngine(this, this.conf, region.getCellComparator());
303    storeEngine.initialize(warmup);
    // If the engine requires writing to a tmp dir first, then we just return null, which indicates
    // that we do not need to track the creation of store file writers; otherwise we return a new
    // StoreFileWriterCreationTracker.
307    this.storeFileWriterCreationTrackerFactory = storeEngine.requireWritingToTmpDirFirst()
308      ? () -> null
309      : () -> new StoreFileWriterCreationTracker();
310    refreshStoreSizeAndTotalBytes();
311
312    flushRetriesNumber =
313      conf.getInt("hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
314    pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
315    if (flushRetriesNumber <= 0) {
316      throw new IllegalArgumentException(
317        "hbase.hstore.flush.retries.number must be > 0, not " + flushRetriesNumber);
318    }
319
320    int confPrintThreshold =
321      this.conf.getInt("hbase.region.store.parallel.put.print.threshold", 50);
322    if (confPrintThreshold < 10) {
323      confPrintThreshold = 10;
324    }
325    this.parallelPutCountPrintThreshold = confPrintThreshold;
326
327    LOG.info(
328      "Store={},  memstore type={}, storagePolicy={}, verifyBulkLoads={}, "
329        + "parallelPutCountPrintThreshold={}, encoding={}, compression={}",
330      this, memstore.getClass().getSimpleName(), policyName, verifyBulkLoads,
331      parallelPutCountPrintThreshold, family.getDataBlockEncoding(), family.getCompressionType());
332  }
333
334  private StoreContext initializeStoreContext(ColumnFamilyDescriptor family) throws IOException {
335    return new StoreContext.Builder().withBlockSize(family.getBlocksize())
336      .withEncryptionContext(EncryptionUtil.createEncryptionContext(conf, family))
337      .withBloomType(family.getBloomFilterType()).withCacheConfig(createCacheConf(family))
338      .withCellComparator(region.getTableDescriptor().isMetaTable() || conf
339        .getBoolean(HRegion.USE_META_CELL_COMPARATOR, HRegion.DEFAULT_USE_META_CELL_COMPARATOR)
340          ? MetaCellComparator.META_COMPARATOR
341          : InnerStoreCellComparator.INNER_STORE_COMPARATOR)
342      .withColumnFamilyDescriptor(family).withCompactedFilesSupplier(this::getCompactedFiles)
343      .withRegionFileSystem(region.getRegionFileSystem())
344      .withFavoredNodesSupplier(this::getFavoredNodes)
345      .withFamilyStoreDirectoryPath(
346        region.getRegionFileSystem().getStoreDir(family.getNameAsString()))
347      .withRegionCoprocessorHost(region.getCoprocessorHost()).build();
348  }
349
350  private InetSocketAddress[] getFavoredNodes() {
351    InetSocketAddress[] favoredNodes = null;
352    if (region.getRegionServerServices() != null) {
353      favoredNodes = region.getRegionServerServices()
354        .getFavoredNodesForRegion(region.getRegionInfo().getEncodedName());
355    }
356    return favoredNodes;
357  }
358
  /** Returns the MemStore instance to use in this store. */
360  private MemStore getMemstore() {
361    MemStore ms = null;
    // Check if in-memory compaction is configured. Note MemoryCompactionPolicy is an enum!
363    MemoryCompactionPolicy inMemoryCompaction = null;
364    if (this.getTableName().isSystemTable()) {
365      inMemoryCompaction = MemoryCompactionPolicy
366        .valueOf(conf.get("hbase.systemtables.compacting.memstore.type", "NONE").toUpperCase());
367    } else {
368      inMemoryCompaction = getColumnFamilyDescriptor().getInMemoryCompaction();
369    }
370    if (inMemoryCompaction == null) {
371      inMemoryCompaction =
372        MemoryCompactionPolicy.valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
373          CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT).toUpperCase());
374    }
375
376    switch (inMemoryCompaction) {
377      case NONE:
378        Class<? extends MemStore> memStoreClass =
379          conf.getClass(MEMSTORE_CLASS_NAME, DefaultMemStore.class, MemStore.class);
380        ms = ReflectionUtils.newInstance(memStoreClass,
381          new Object[] { conf, getComparator(), this.getHRegion().getRegionServicesForStores() });
382        break;
383      default:
384        Class<? extends CompactingMemStore> compactingMemStoreClass =
385          conf.getClass(MEMSTORE_CLASS_NAME, CompactingMemStore.class, CompactingMemStore.class);
386        ms =
387          ReflectionUtils.newInstance(compactingMemStoreClass, new Object[] { conf, getComparator(),
388            this, this.getHRegion().getRegionServicesForStores(), inMemoryCompaction });
389    }
390    return ms;
391  }
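  // Illustrative sketch: which MemStore implementation is chosen follows the in-memory
  // compaction policy resolved above. For example, enabling basic in-memory compaction for user
  // tables while keeping system tables on the default memstore:
  //
  //   conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, "BASIC"); // -> CompactingMemStore
  //   conf.set("hbase.systemtables.compacting.memstore.type", "NONE");    // -> DefaultMemStore
  //
  // A per-family setting via ColumnFamilyDescriptor#getInMemoryCompaction() takes precedence
  // over the cluster-wide default for user tables.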
392
393  /**
394   * Creates the cache config.
395   * @param family The current column family.
396   */
397  protected CacheConfig createCacheConf(final ColumnFamilyDescriptor family) {
398    CacheConfig cacheConf = new CacheConfig(conf, family, region.getBlockCache(),
399      region.getRegionServicesForStores().getByteBuffAllocator());
400    LOG.info("Created cacheConfig: {}, for column family {} of region {} ", cacheConf,
401      family.getNameAsString(), region.getRegionInfo().getEncodedName());
402    return cacheConf;
403  }
404
405  /**
406   * Creates the store engine configured for the given Store.
407   * @param store        The store. An unfortunate dependency needed due to it being passed to
408   *                     coprocessors via the compactor.
409   * @param conf         Store configuration.
410   * @param kvComparator KVComparator for storeFileManager.
411   * @return StoreEngine to use.
412   */
413  protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
414    CellComparator kvComparator) throws IOException {
415    return StoreEngine.create(store, conf, kvComparator);
416  }
417
  /** Returns the TTL, in milliseconds, of the specified family (the descriptor stores it in seconds). */
419  public static long determineTTLFromFamily(final ColumnFamilyDescriptor family) {
420    // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.
421    long ttl = family.getTimeToLive();
422    if (ttl == HConstants.FOREVER) {
423      // Default is unlimited ttl.
424      ttl = Long.MAX_VALUE;
425    } else if (ttl == -1) {
426      ttl = Long.MAX_VALUE;
427    } else {
428      // Second -> ms adjust for user data
429      ttl *= 1000;
430    }
431    return ttl;
432  }
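  // Worked example (illustrative): a family TTL of one day, expressed in seconds on the
  // descriptor, is converted to milliseconds here; FOREVER and -1 both mean "never expire".
  //
  //   ColumnFamilyDescriptor cf = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf"))
  //     .setTimeToLive(86400).build();
  //   determineTTLFromFamily(cf);  // 86400 * 1000 = 86_400_000 ms
  //   // with setTimeToLive(HConstants.FOREVER) the result is Long.MAX_VALUE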
433
434  public StoreContext getStoreContext() {
435    return storeContext;
436  }
437
438  @Override
439  public String getColumnFamilyName() {
440    return this.storeContext.getFamily().getNameAsString();
441  }
442
443  @Override
444  public TableName getTableName() {
445    return this.getRegionInfo().getTable();
446  }
447
448  @Override
449  public FileSystem getFileSystem() {
450    return storeContext.getRegionFileSystem().getFileSystem();
451  }
452
453  public HRegionFileSystem getRegionFileSystem() {
454    return storeContext.getRegionFileSystem();
455  }
456
457  /* Implementation of StoreConfigInformation */
458  @Override
459  public long getStoreFileTtl() {
460    // TTL only applies if there's no MIN_VERSIONs setting on the column.
461    return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
462  }
463
464  @Override
465  public long getMemStoreFlushSize() {
466    // TODO: Why is this in here? The flushsize of the region rather than the store? St.Ack
467    return this.region.memstoreFlushSize;
468  }
469
470  @Override
471  public MemStoreSize getFlushableSize() {
472    return this.memstore.getFlushableSize();
473  }
474
475  @Override
476  public MemStoreSize getSnapshotSize() {
477    return this.memstore.getSnapshotSize();
478  }
479
480  @Override
481  public long getCompactionCheckMultiplier() {
482    return this.compactionCheckMultiplier;
483  }
484
485  @Override
486  public long getBlockingFileCount() {
487    return blockingFileCount;
488  }
489  /* End implementation of StoreConfigInformation */
490
491  @Override
492  public ColumnFamilyDescriptor getColumnFamilyDescriptor() {
493    return this.storeContext.getFamily();
494  }
495
496  @Override
497  public OptionalLong getMaxSequenceId() {
498    return StoreUtils.getMaxSequenceIdInList(this.getStorefiles());
499  }
500
501  @Override
502  public OptionalLong getMaxMemStoreTS() {
503    return StoreUtils.getMaxMemStoreTSInList(this.getStorefiles());
504  }
505
506  /** Returns the data block encoder */
507  public HFileDataBlockEncoder getDataBlockEncoder() {
508    return dataBlockEncoder;
509  }
510
511  /**
512   * Should be used only in tests.
513   * @param blockEncoder the block delta encoder to use
514   */
515  void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
516    this.dataBlockEncoder = blockEncoder;
517  }
518
519  private void postRefreshStoreFiles() throws IOException {
    // Advance the memstore read point to be at least the new store files' seqIds so that
    // readers might pick them up. This assumes that the store is not getting any writes (otherwise
    // in-flight transactions might be made visible)
523    getMaxSequenceId().ifPresent(region.getMVCC()::advanceTo);
524    refreshStoreSizeAndTotalBytes();
525  }
526
527  @Override
528  public void refreshStoreFiles() throws IOException {
529    storeEngine.refreshStoreFiles();
530    postRefreshStoreFiles();
531  }
532
533  /**
534   * Replaces the store files that the store has with the given files. Mainly used by secondary
535   * region replicas to keep up to date with the primary region files.
536   */
537  public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
538    storeEngine.refreshStoreFiles(newFiles);
539    postRefreshStoreFiles();
540  }
541
  /**
   * This message intends to inform the MemStore that the upcoming updates are part of replaying
   * edits from the WAL.
   */
546  public void startReplayingFromWAL() {
547    this.memstore.startReplayingFromWAL();
548  }
549
  /**
   * This message intends to inform the MemStore that replaying edits from the WAL is done.
   */
553  public void stopReplayingFromWAL() {
554    this.memstore.stopReplayingFromWAL();
555  }
556
557  /**
558   * Adds a value to the memstore
559   */
560  public void add(final ExtendedCell cell, MemStoreSizing memstoreSizing) {
561    storeEngine.readLock();
562    try {
563      if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
564        LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
565          this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName());
566      }
567      this.memstore.add(cell, memstoreSizing);
568    } finally {
569      storeEngine.readUnlock();
570      currentParallelPutCount.decrementAndGet();
571    }
572  }
573
  /**
   * Adds the specified cells to the memstore.
   */
577  public void add(final Iterable<ExtendedCell> cells, MemStoreSizing memstoreSizing) {
578    storeEngine.readLock();
579    try {
580      if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
581        LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
582          this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName());
583      }
584      memstore.add(cells, memstoreSizing);
585    } finally {
586      storeEngine.readUnlock();
587      currentParallelPutCount.decrementAndGet();
588    }
589  }
590
591  @Override
592  public long timeOfOldestEdit() {
593    return memstore.timeOfOldestEdit();
594  }
595
  /** Returns all store files. */
597  @Override
598  public Collection<HStoreFile> getStorefiles() {
599    return this.storeEngine.getStoreFileManager().getStoreFiles();
600  }
601
602  @Override
603  public Collection<HStoreFile> getCompactedFiles() {
604    return this.storeEngine.getStoreFileManager().getCompactedfiles();
605  }
606
607  /**
608   * This throws a WrongRegionException if the HFile does not fit in this region, or an
609   * InvalidHFileException if the HFile is not valid.
610   */
611  public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
612    HFile.Reader reader = null;
613    try {
614      LOG.info("Validating hfile at " + srcPath + " for inclusion in " + this);
615      FileSystem srcFs = srcPath.getFileSystem(conf);
616      srcFs.access(srcPath, FsAction.READ_WRITE);
617      reader = HFile.createReader(srcFs, srcPath, getCacheConfig(), isPrimaryReplicaStore(), conf);
618
619      Optional<byte[]> firstKey = reader.getFirstRowKey();
620      Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
621      Optional<ExtendedCell> lk = reader.getLastKey();
622      Preconditions.checkState(lk.isPresent(), "Last key can not be null");
623      byte[] lastKey = CellUtil.cloneRow(lk.get());
624
625      if (LOG.isDebugEnabled()) {
626        LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey.get()) + " last="
627          + Bytes.toStringBinary(lastKey));
628        LOG.debug("Region bounds: first=" + Bytes.toStringBinary(getRegionInfo().getStartKey())
629          + " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
630      }
631
632      if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) {
633        throw new WrongRegionException("Bulk load file " + srcPath.toString()
634          + " does not fit inside region " + this.getRegionInfo().getRegionNameAsString());
635      }
636
637      if (
638        reader.length()
639            > conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE)
640      ) {
641        LOG.warn("Trying to bulk load hfile " + srcPath + " with size: " + reader.length()
642          + " bytes can be problematic as it may lead to oversplitting.");
643      }
644
645      if (verifyBulkLoads) {
646        long verificationStartTime = EnvironmentEdgeManager.currentTime();
647        LOG.info("Full verification started for bulk load hfile: {}", srcPath);
648        Cell prevCell = null;
649        HFileScanner scanner = reader.getScanner(conf, false, false, false);
650        scanner.seekTo();
651        do {
652          Cell cell = scanner.getCell();
653          if (prevCell != null) {
654            if (getComparator().compareRows(prevCell, cell) > 0) {
655              throw new InvalidHFileException("Previous row is greater than" + " current row: path="
656                + srcPath + " previous=" + CellUtil.getCellKeyAsString(prevCell) + " current="
657                + CellUtil.getCellKeyAsString(cell));
658            }
659            if (CellComparator.getInstance().compareFamilies(prevCell, cell) != 0) {
660              throw new InvalidHFileException("Previous key had different"
661                + " family compared to current key: path=" + srcPath + " previous="
662                + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(),
663                  prevCell.getFamilyLength())
664                + " current=" + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(),
665                  cell.getFamilyLength()));
666            }
667          }
668          prevCell = cell;
669        } while (scanner.next());
670        LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString() + " took "
671          + (EnvironmentEdgeManager.currentTime() - verificationStartTime) + " ms");
672      }
673    } finally {
674      if (reader != null) {
675        reader.close();
676      }
677    }
678  }
679
  /**
   * This method should only be called from Region. It is assumed that the ranges of values in the
   * HFile fit within the store's assigned region. (assertBulkLoadHFileOk checks this)
   * @param seqNum sequence Id associated with the HFile
   */
685  public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
686    Path srcPath = new Path(srcPathStr);
687    return getRegionFileSystem().bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
688  }
689
690  public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException {
691    Path srcPath = new Path(srcPathStr);
692    try {
693      getRegionFileSystem().commitStoreFile(srcPath, dstPath);
694    } finally {
695      if (this.getCoprocessorHost() != null) {
696        this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath);
697      }
698    }
699
700    LOG.info("Loaded HFile " + srcPath + " into " + this + " as " + dstPath
701      + " - updating store file list.");
702
703    HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath);
704    bulkLoadHFile(sf);
705
706    LOG.info("Successfully loaded {} into {} (new location: {})", srcPath, this, dstPath);
707
708    return dstPath;
709  }
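  // Illustrative bulk load flow (hypothetical variables and path) tying the methods above
  // together; in practice HRegion drives this sequence. The sketch assumes the pair's second
  // element returned by preBulkLoadHFile is the staged destination path.
  //
  //   Path src = new Path("/staging/cf/hfile");          // hypothetical source
  //   store.assertBulkLoadHFileOk(src);                  // range + validity checks
  //   Pair<Path, Path> staged = store.preBulkLoadHFile(src.toString(), seqNum);
  //   store.bulkLoadHFile(family, src.toString(), staged.getSecond());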
710
711  public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
712    HStoreFile sf = storeEngine.createStoreFileAndReader(fileInfo);
713    bulkLoadHFile(sf);
714  }
715
716  private void bulkLoadHFile(HStoreFile sf) throws IOException {
717    StoreFileReader r = sf.getReader();
718    this.storeSize.addAndGet(r.length());
719    this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
720    storeEngine.addStoreFiles(Lists.newArrayList(sf), () -> {
721    });
722    LOG.info("Loaded HFile " + sf.getFileInfo() + " into " + this);
723    if (LOG.isTraceEnabled()) {
724      String traceMessage = "BULK LOAD time,size,store size,store files ["
725        + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize + ","
726        + storeEngine.getStoreFileManager().getStorefileCount() + "]";
727      LOG.trace(traceMessage);
728    }
729  }
730
731  private ImmutableCollection<HStoreFile> closeWithoutLock() throws IOException {
732    memstore.close();
    // Clear so that metrics do not find them.
734    ImmutableCollection<HStoreFile> result = storeEngine.getStoreFileManager().clearFiles();
735    Collection<HStoreFile> compactedfiles = storeEngine.getStoreFileManager().clearCompactedFiles();
736    // clear the compacted files
737    if (CollectionUtils.isNotEmpty(compactedfiles)) {
738      removeCompactedfiles(compactedfiles,
739        getCacheConfig() != null ? getCacheConfig().shouldEvictOnClose() : true);
740    }
741    if (!result.isEmpty()) {
742      // initialize the thread pool for closing store files in parallel.
743      ThreadPoolExecutor storeFileCloserThreadPool =
744        this.region.getStoreFileOpenAndCloseThreadPool("StoreFileCloser-"
745          + this.region.getRegionInfo().getEncodedName() + "-" + this.getColumnFamilyName());
746
747      // close each store file in parallel
748      CompletionService<Void> completionService =
749        new ExecutorCompletionService<>(storeFileCloserThreadPool);
750      for (HStoreFile f : result) {
751        completionService.submit(new Callable<Void>() {
752          @Override
753          public Void call() throws IOException {
754            boolean evictOnClose =
755              getCacheConfig() != null ? getCacheConfig().shouldEvictOnClose() : true;
756            f.closeStoreFile(!warmup && evictOnClose);
757            return null;
758          }
759        });
760      }
761
762      IOException ioe = null;
763      try {
764        for (int i = 0; i < result.size(); i++) {
765          try {
766            Future<Void> future = completionService.take();
767            future.get();
768          } catch (InterruptedException e) {
769            if (ioe == null) {
770              ioe = new InterruptedIOException();
771              ioe.initCause(e);
772            }
773          } catch (ExecutionException e) {
774            if (ioe == null) {
775              ioe = new IOException(e.getCause());
776            }
777          }
778        }
779      } finally {
780        storeFileCloserThreadPool.shutdownNow();
781      }
782      if (ioe != null) {
783        throw ioe;
784      }
785    }
786    LOG.trace("Closed {}", this);
787    return result;
788  }
789
  /**
   * Close all the readers. We don't need to worry about subsequent requests because the Region
   * holds a write lock that will prevent any more reads or writes.
   * @return the {@link StoreFile StoreFiles} that were previously being used.
   * @throws IOException on failure
   */
796  public ImmutableCollection<HStoreFile> close() throws IOException {
797    // findbugs can not recognize storeEngine.writeLock is just a lock operation so it will report
798    // UL_UNRELEASED_LOCK_EXCEPTION_PATH, so here we have to use two try finally...
799    // Change later if findbugs becomes smarter in the future.
800    this.archiveLock.lock();
801    try {
802      this.storeEngine.writeLock();
803      try {
804        return closeWithoutLock();
805      } finally {
806        this.storeEngine.writeUnlock();
807      }
808    } finally {
809      this.archiveLock.unlock();
810    }
811  }
812
813  /**
814   * Write out current snapshot. Presumes {@code StoreFlusherImpl.prepare()} has been called
815   * previously.
816   * @param logCacheFlushId flush sequence number
817   * @return The path name of the tmp file to which the store was flushed
818   * @throws IOException if exception occurs during process
819   */
820  protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
821    MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
822    Consumer<Path> writerCreationTracker) throws IOException {
    // If an exception happens while flushing, we let it out without clearing the memstore
    // snapshot. The old snapshot will be returned when we call 'snapshot', the next time flush
    // comes around.
    // Retry after catching an exception when flushing, otherwise the server will abort itself.
828    StoreFlusher flusher = storeEngine.getStoreFlusher();
829    IOException lastException = null;
830    for (int i = 0; i < flushRetriesNumber; i++) {
831      try {
832        List<Path> pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status,
833          throughputController, tracker, writerCreationTracker);
834        Path lastPathName = null;
835        try {
836          for (Path pathName : pathNames) {
837            lastPathName = pathName;
838            storeEngine.validateStoreFile(pathName);
839          }
840          return pathNames;
841        } catch (Exception e) {
842          LOG.warn("Failed validating store file {}, retrying num={}", lastPathName, i, e);
843          if (e instanceof IOException) {
844            lastException = (IOException) e;
845          } else {
846            lastException = new IOException(e);
847          }
848        }
849      } catch (IOException e) {
850        LOG.warn("Failed flushing store file for {}, retrying num={}", this, i, e);
851        lastException = e;
852      }
853      if (lastException != null && i < (flushRetriesNumber - 1)) {
854        try {
855          Thread.sleep(pauseTime);
856        } catch (InterruptedException e) {
857          IOException iie = new InterruptedIOException();
858          iie.initCause(e);
859          throw iie;
860        }
861      }
862    }
863    throw lastException;
864  }
865
866  public HStoreFile tryCommitRecoveredHFile(Path path) throws IOException {
867    LOG.info("Validating recovered hfile at {} for inclusion in store {}", path, this);
868    FileSystem srcFs = path.getFileSystem(conf);
869    srcFs.access(path, FsAction.READ_WRITE);
870    try (HFile.Reader reader =
871      HFile.createReader(srcFs, path, getCacheConfig(), isPrimaryReplicaStore(), conf)) {
872      Optional<byte[]> firstKey = reader.getFirstRowKey();
873      Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
874      Optional<ExtendedCell> lk = reader.getLastKey();
875      Preconditions.checkState(lk.isPresent(), "Last key can not be null");
876      byte[] lastKey = CellUtil.cloneRow(lk.get());
877      if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) {
878        throw new WrongRegionException("Recovered hfile " + path.toString()
879          + " does not fit inside region " + this.getRegionInfo().getRegionNameAsString());
880      }
881    }
882
883    Path dstPath = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), path);
884    HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath);
885    StoreFileReader r = sf.getReader();
886    this.storeSize.addAndGet(r.length());
887    this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
888
889    storeEngine.addStoreFiles(Lists.newArrayList(sf), () -> {
890    });
891
892    LOG.info("Loaded recovered hfile to {}, entries={}, sequenceid={}, filesize={}", sf,
893      r.getEntries(), r.getSequenceID(), TraditionalBinaryPrefix.long2String(r.length(), "B", 1));
894    return sf;
895  }
896
897  private long getTotalSize(Collection<HStoreFile> sfs) {
898    return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum();
899  }
900
901  private boolean completeFlush(final List<HStoreFile> sfs, long snapshotId) throws IOException {
    // NOTE: we should keep the clearSnapshot method inside the write lock because clearSnapshot
    // may close {@link DefaultMemStore#snapshot}, which may be used by
    // {@link DefaultMemStore#getScanners}.
905    storeEngine.addStoreFiles(sfs,
      // NOTE: here we must increase the refCount for storeFiles because we would open the
      // storeFiles and get the StoreFileScanners for them in HStore.notifyChangedReadersObservers.
      // If we don't increase the refCount here, HStore.closeAndArchiveCompactedFiles called by
      // CompactedHFilesDischarger may archive the storeFiles after a concurrent compaction.
      // Because HStore.requestCompaction is under the storeEngine lock, here we also increase the
      // refCount under the storeEngine lock. See HBASE-27519 for more details.
912      snapshotId > 0 ? () -> {
913        this.memstore.clearSnapshot(snapshotId);
914        HStoreFile.increaseStoreFilesRefeCount(sfs);
915      } : () -> {
916        HStoreFile.increaseStoreFilesRefeCount(sfs);
917      });
    // notifyChangedReadersObservers is to be called here - only in case of flushes
919    try {
920      notifyChangedReadersObservers(sfs);
921    } finally {
922      HStoreFile.decreaseStoreFilesRefeCount(sfs);
923    }
924    if (LOG.isTraceEnabled()) {
925      long totalSize = getTotalSize(sfs);
926      String traceMessage = "FLUSH time,count,size,store size,store files ["
927        + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize + ","
928        + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
929      LOG.trace(traceMessage);
930    }
931    return needsCompaction();
932  }
933
934  /**
935   * Notify all observers that set of Readers has changed.
936   */
937  private void notifyChangedReadersObservers(List<HStoreFile> sfs) throws IOException {
938    for (ChangedReadersObserver o : this.changedReaderObservers) {
939      List<KeyValueScanner> memStoreScanners;
940      this.storeEngine.readLock();
941      try {
942        memStoreScanners = this.memstore.getScanners(o.getReadPoint());
943      } finally {
944        this.storeEngine.readUnlock();
945      }
946      o.updateReaders(sfs, memStoreScanners);
947    }
948  }
949
950  /**
951   * Get all scanners with no filtering based on TTL (that happens further down the line).
952   * @param cacheBlocks  cache the blocks or not
953   * @param usePread     true to use pread, false if not
954   * @param isCompaction true if the scanner is created for compaction
955   * @param matcher      the scan query matcher
956   * @param startRow     the start row
957   * @param stopRow      the stop row
958   * @param readPt       the read point of the current scan
959   * @return all scanners for this store
960   */
961  public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet, boolean usePread,
962    boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt,
963    boolean onlyLatestVersion) throws IOException {
964    return getScanners(cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow, false,
965      readPt, onlyLatestVersion);
966  }
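  // Illustrative sketch (hypothetical read point): obtaining scanners over the whole store,
  // store file scanners first and memstore scanners last, with no TTL filtering at this level.
  //
  //   List<KeyValueScanner> scanners = store.getScanners(
  //     true,                          // cacheBlocks
  //     false, false, false,           // isGet, usePread, isCompaction
  //     null,                          // no ScanQueryMatcher
  //     HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
  //     readPt, false);                // read point, onlyLatestVersion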
967
968  /**
969   * Get all scanners with no filtering based on TTL (that happens further down the line).
970   * @param cacheBlocks     cache the blocks or not
971   * @param usePread        true to use pread, false if not
972   * @param isCompaction    true if the scanner is created for compaction
973   * @param matcher         the scan query matcher
974   * @param startRow        the start row
975   * @param includeStartRow true to include start row, false if not
976   * @param stopRow         the stop row
977   * @param includeStopRow  true to include stop row, false if not
978   * @param readPt          the read point of the current scan
979   * @return all scanners for this store
980   */
981  public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,
982    boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow,
983    byte[] stopRow, boolean includeStopRow, long readPt, boolean onlyLatestVersion)
984    throws IOException {
985    Collection<HStoreFile> storeFilesToScan;
986    List<KeyValueScanner> memStoreScanners;
987    this.storeEngine.readLock();
988    try {
989      storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
990        includeStartRow, stopRow, includeStopRow, onlyLatestVersion);
991      memStoreScanners = this.memstore.getScanners(readPt);
      // NOTE: here we must increase the refCount for storeFiles because we would open the
      // storeFiles and get the StoreFileScanners for them. If we don't increase the refCount here,
      // HStore.closeAndArchiveCompactedFiles called by CompactedHFilesDischarger may archive the
      // storeFiles after a concurrent compaction. Because HStore.requestCompaction is under the
      // storeEngine lock, here we also increase the refCount under the storeEngine lock. See
      // HBASE-27484 for more details.
998      HStoreFile.increaseStoreFilesRefeCount(storeFilesToScan);
999    } finally {
1000      this.storeEngine.readUnlock();
1001    }
1002    try {
1003      // First the store file scanners
1004
      // TODO this used to get the store files in descending order,
      // but now we get them in ascending order, which I think is
      // actually more correct, since the memstore scanners get put at the end.
1008      List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(
1009        storeFilesToScan, cacheBlocks, usePread, isCompaction, false, matcher, readPt);
1010      List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
1011      scanners.addAll(sfScanners);
1012      // Then the memstore scanners
1013      scanners.addAll(memStoreScanners);
1014      return scanners;
1015    } catch (Throwable t) {
1016      clearAndClose(memStoreScanners);
1017      throw t instanceof IOException ? (IOException) t : new IOException(t);
1018    } finally {
1019      HStoreFile.decreaseStoreFilesRefeCount(storeFilesToScan);
1020    }
1021  }
1022
1023  private static void clearAndClose(List<KeyValueScanner> scanners) {
1024    if (scanners == null) {
1025      return;
1026    }
1027    for (KeyValueScanner s : scanners) {
1028      s.close();
1029    }
1030    scanners.clear();
1031  }
1032
1033  /**
1034   * Create scanners on the given files and if needed on the memstore with no filtering based on TTL
1035   * (that happens further down the line).
   * @param files                  the list of files on which the scanners have to be created
1037   * @param cacheBlocks            cache the blocks or not
1038   * @param usePread               true to use pread, false if not
1039   * @param isCompaction           true if the scanner is created for compaction
1040   * @param matcher                the scan query matcher
1041   * @param startRow               the start row
1042   * @param stopRow                the stop row
1043   * @param readPt                 the read point of the current scan
1044   * @param includeMemstoreScanner true if memstore has to be included
1045   * @return scanners on the given files and on the memstore if specified
1046   */
1047  public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
1048    boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
1049    byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner,
1050    boolean onlyLatestVersion) throws IOException {
1051    return getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow,
1052      false, readPt, includeMemstoreScanner, onlyLatestVersion);
1053  }
1054
1055  /**
1056   * Create scanners on the given files and if needed on the memstore with no filtering based on TTL
1057   * (that happens further down the line).
   * @param files                  the list of files on which the scanners have to be created
   * @param cacheBlocks            cache the blocks or not
1060   * @param usePread               true to use pread, false if not
1061   * @param isCompaction           true if the scanner is created for compaction
1062   * @param matcher                the scan query matcher
1063   * @param startRow               the start row
1064   * @param includeStartRow        true to include start row, false if not
1065   * @param stopRow                the stop row
1066   * @param includeStopRow         true to include stop row, false if not
1067   * @param readPt                 the read point of the current scan
1068   * @param includeMemstoreScanner true if memstore has to be included
1069   * @return scanners on the given files and on the memstore if specified
1070   */
1071  public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
1072    boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1073    boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
1074    boolean includeMemstoreScanner, boolean onlyLatestVersion) throws IOException {
1075    List<KeyValueScanner> memStoreScanners = null;
1076    if (includeMemstoreScanner) {
1077      this.storeEngine.readLock();
1078      try {
1079        memStoreScanners = this.memstore.getScanners(readPt);
1080      } finally {
1081        this.storeEngine.readUnlock();
1082      }
1083    }
1084    try {
1085      List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(files,
1086        cacheBlocks, usePread, isCompaction, false, matcher, readPt);
1087      List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
1088      scanners.addAll(sfScanners);
1089      // Then the memstore scanners
1090      if (memStoreScanners != null) {
1091        scanners.addAll(memStoreScanners);
1092      }
1093      return scanners;
1094    } catch (Throwable t) {
1095      clearAndClose(memStoreScanners);
1096      throw t instanceof IOException ? (IOException) t : new IOException(t);
1097    }
1098  }
1099
1100  /**
1101   * @param o Observer who wants to know about changes in set of Readers
1102   */
1103  public void addChangedReaderObserver(ChangedReadersObserver o) {
1104    this.changedReaderObservers.add(o);
1105  }
1106
1107  /**
1108   * @param o Observer no longer interested in changes in set of Readers.
1109   */
1110  public void deleteChangedReaderObserver(ChangedReadersObserver o) {
1111    // We don't check if observer present; it may not be (legitimately)
1112    this.changedReaderObservers.remove(o);
1113  }
1114
1115  //////////////////////////////////////////////////////////////////////////////
1116  // Compaction
1117  //////////////////////////////////////////////////////////////////////////////
1118
  /**
   * Compact the StoreFiles. This method may take some time, so the calling thread must be able to
   * block for long periods.
   * <p>
   * During this time, the Store can work as usual, getting values from StoreFiles and writing new
   * StoreFiles from the memstore. Existing StoreFiles are not destroyed until the new compacted
   * StoreFile is completely written-out to disk.
   * <p>
   * The compactLock prevents multiple simultaneous compactions. The structureLock prevents us from
   * interfering with other write operations.
   * <p>
   * We don't want to hold the structureLock for the whole time, as a compact() can be lengthy and
   * we want to allow cache-flushes during this period.
   * <p>
   * Compaction events should be idempotent, since there is no IO fencing for the region directory
   * in hdfs. A region server might still try to complete the compaction after it lost the region.
   * That is why the following events are carefully ordered for a compaction:
   * <ol>
   * <li>Compaction writes new files under the region/.tmp directory (compaction output).</li>
   * <li>Compaction atomically moves the temporary file under the region directory.</li>
   * <li>Compaction appends a WAL edit containing the compaction input and output files. Forces
   * sync on the WAL.</li>
   * <li>Compaction deletes the input files from the region directory.</li>
   * </ol>
   * Failure conditions are handled like this:
   * <ul>
   * <li>If the RS fails before 2, the compaction won't complete. Even if the RS lives on and
   * finishes the compaction later, it will only write the new data file to the region directory.
   * Since we already have this data, this will be idempotent, but we will have a redundant copy of
   * the data.</li>
   * <li>If the RS fails between 2 and 3, the region will have a redundant copy of the data. The RS
   * that failed won't be able to finish sync() for the WAL because of lease recovery in the
   * WAL.</li>
   * <li>If the RS fails after 3, the region server that opens the region will pick up the
   * compaction marker from the WAL and replay it by removing the compaction input files. The
   * failed RS can also attempt to delete those files, but the operation will be idempotent.</li>
   * </ul>
   * See HBASE-2231 for details.
   * @param compaction compaction details obtained from requestCompaction()
   * @return Storefile we compacted into or null if we failed or opted out early.
   */
1151  public List<HStoreFile> compact(CompactionContext compaction,
1152    ThroughputController throughputController, User user) throws IOException {
1153    assert compaction != null;
1154    CompactionRequestImpl cr = compaction.getRequest();
1155    StoreFileWriterCreationTracker writerCreationTracker =
1156      storeFileWriterCreationTrackerFactory.get();
1157    if (writerCreationTracker != null) {
1158      cr.setWriterCreationTracker(writerCreationTracker);
1159      storeFileWriterCreationTrackers.add(writerCreationTracker);
1160    }
1161    try {
1162      // Do all sanity checking in here if we have a valid CompactionRequestImpl
1163      // because we need to clean up after it on the way out in a finally
1164      // block below
1165      long compactionStartTime = EnvironmentEdgeManager.currentTime();
1166      assert compaction.hasSelection();
1167      Collection<HStoreFile> filesToCompact = cr.getFiles();
1168      assert !filesToCompact.isEmpty();
1169      synchronized (filesCompacting) {
1170        // sanity check: we're compacting files that this store knows about
1171        // TODO: change this to LOG.error() after more debugging
1172        Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
1173      }
1174
1175      // Ready to go. Have list of files to compact.
1176      LOG.info("Starting compaction of " + filesToCompact + " into tmpdir="
1177        + getRegionFileSystem().getTempDir() + ", totalSize="
1178        + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));
1179
1180      return doCompaction(cr, filesToCompact, user, compactionStartTime,
1181        compaction.compact(throughputController, user));
1182    } finally {
1183      finishCompactionRequest(cr);
1184    }
1185  }
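  // Illustrative sketch of the calling pattern (hypothetical variables; selection and execution
  // are normally driven by the region server's compaction machinery): a compaction is first
  // selected via requestCompaction(), then executed here, and the returned files are the newly
  // committed store files.
  //
  //   Optional<CompactionContext> ctx = store.requestCompaction();   // selection under lock
  //   if (ctx.isPresent()) {
  //     List<HStoreFile> compacted = store.compact(ctx.get(), throughputController, user);
  //   }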
1186
1187  protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
1188    Collection<HStoreFile> filesToCompact, User user, long compactionStartTime, List<Path> newFiles)
1189    throws IOException {
1190    // Do the steps necessary to complete the compaction.
1191    setStoragePolicyFromFileName(newFiles);
1192    List<HStoreFile> sfs = storeEngine.commitStoreFiles(newFiles, true);
1193    if (this.getCoprocessorHost() != null) {
1194      for (HStoreFile sf : sfs) {
1195        getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
1196      }
1197    }
1198    replaceStoreFiles(filesToCompact, sfs, true);
1199
1200    long outputBytes = getTotalSize(sfs);
1201
1202    // At this point the store will use new files for all new scanners.
1203    refreshStoreSizeAndTotalBytes(); // update store size.
1204
1205    long now = EnvironmentEdgeManager.currentTime();
1206    if (
1207      region.getRegionServerServices() != null
1208        && region.getRegionServerServices().getMetrics() != null
1209    ) {
1210      region.getRegionServerServices().getMetrics().updateCompaction(
1211        region.getTableDescriptor().getTableName().getNameAsString(), cr.isMajor(),
1212        now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(),
1213        outputBytes);
1214
1215    }
1216
1217    logCompactionEndMessage(cr, sfs, now, compactionStartTime);
1218    return sfs;
1219  }
1220
1221  // Set the correct storage policy from the file name written by DTCP (date tiered compaction).
1222  // Renaming the file does not change the storage policy, so it must be set on the file itself.
1223  private void setStoragePolicyFromFileName(List<Path> newFiles) throws IOException {
1224    String prefix = HConstants.STORAGE_POLICY_PREFIX;
1225    for (Path newFile : newFiles) {
1226      if (newFile.getParent().getName().startsWith(prefix)) {
1227        CommonFSUtils.setStoragePolicy(getRegionFileSystem().getFileSystem(), newFile,
1228          newFile.getParent().getName().substring(prefix.length()));
1229      }
1230    }
1231  }
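
  // Example (illustrative): a compaction output written under a temp subdirectory named
  // HConstants.STORAGE_POLICY_PREFIX + "HOT" has the "HOT" storage policy set on it here, and the
  // policy is preserved when the file is later renamed into the store; outputs in plain temp
  // directories keep the default policy.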
1232
1233  /**
1234   * Writes the compaction WAL record.
1235   * @param filesCompacted Files compacted (input).
1236   * @param newFiles       Files from compaction.
1237   */
1238  private void writeCompactionWalRecord(Collection<HStoreFile> filesCompacted,
1239    Collection<HStoreFile> newFiles) throws IOException {
1240    if (region.getWAL() == null) {
1241      return;
1242    }
1243    List<Path> inputPaths =
1244      filesCompacted.stream().map(HStoreFile::getPath).collect(Collectors.toList());
1245    List<Path> outputPaths =
1246      newFiles.stream().map(HStoreFile::getPath).collect(Collectors.toList());
1247    RegionInfo info = this.region.getRegionInfo();
1248    CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
1249      getColumnFamilyDescriptor().getName(), inputPaths, outputPaths,
1250      getRegionFileSystem().getStoreDir(getColumnFamilyDescriptor().getNameAsString()));
1251    // Fix reaching into Region to get the maxWaitForSeqId.
1252    // Does this method belong in Region altogether given it is making so many references up there?
1253    // Could be Region#writeCompactionMarker(compactionDescriptor);
1254    WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(),
1255      this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC(),
1256      region.getRegionReplicationSink().orElse(null));
1257  }
1258
1259  @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
1260      allowedOnPath = ".*/(HStore|TestHStore).java")
1261  void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result,
1262    boolean writeCompactionMarker) throws IOException {
1263    storeEngine.replaceStoreFiles(compactedFiles, result, () -> {
1264      if (writeCompactionMarker) {
1265        writeCompactionWalRecord(compactedFiles, result);
1266      }
1267    }, () -> {
1268      synchronized (filesCompacting) {
1269        filesCompacting.removeAll(compactedFiles);
1270      }
1271    });
1272    // These may be null when the RS is shutting down. The space quota Chores will fix the Region
1273    // sizes later so it's not super-critical if we miss these.
1274    RegionServerServices rsServices = region.getRegionServerServices();
1275    if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) {
1276      updateSpaceQuotaAfterFileReplacement(
1277        rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(),
1278        compactedFiles, result);
1279    }
1280  }
1281
1282  /**
1283   * Updates the space quota usage for this region, removing the size for files compacted away and
1284   * adding in the size for new files.
1285   * @param sizeStore  The object tracking changes in region size for space quotas.
1286   * @param regionInfo The identifier for the region whose size is being updated.
1287   * @param oldFiles   Files removed from this store's region.
1288   * @param newFiles   Files added to this store's region.
1289   */
1290  void updateSpaceQuotaAfterFileReplacement(RegionSizeStore sizeStore, RegionInfo regionInfo,
1291    Collection<HStoreFile> oldFiles, Collection<HStoreFile> newFiles) {
1292    long delta = 0;
1293    if (oldFiles != null) {
1294      for (HStoreFile compactedFile : oldFiles) {
1295        if (compactedFile.isHFile()) {
1296          delta -= compactedFile.getReader().length();
1297        }
1298      }
1299    }
1300    if (newFiles != null) {
1301      for (HStoreFile newFile : newFiles) {
1302        if (newFile.isHFile()) {
1303          delta += newFile.getReader().length();
1304        }
1305      }
1306    }
1307    sizeStore.incrementRegionSize(regionInfo, delta);
1308  }
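
  // Worked example (illustrative numbers): compacting two 100 MB store files into one 150 MB
  // file gives delta = -(100 MB + 100 MB) + 150 MB = -50 MB, so the RegionSizeStore entry for
  // this region shrinks by 50 MB.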
1309
1310  /**
1311   * Log a very elaborate compaction completion message.
1312   * @param cr                  Request.
1313   * @param sfs                 Resulting files.
1314   * @param compactionStartTime Start time.
1315   */
1316  private void logCompactionEndMessage(CompactionRequestImpl cr, List<HStoreFile> sfs, long now,
1317    long compactionStartTime) {
1318    StringBuilder message = new StringBuilder("Completed" + (cr.isMajor() ? " major" : "")
1319      + " compaction of " + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "")
1320      + " file(s) in " + this + " of " + this.getRegionInfo().getShortNameToLog() + " into ");
1321    if (sfs.isEmpty()) {
1322      message.append("none, ");
1323    } else {
1324      for (HStoreFile sf : sfs) {
1325        message.append(sf.getPath().getName());
1326        message.append("(size=");
1327        message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1));
1328        message.append("), ");
1329      }
1330    }
1331    message.append("total size for store is ")
1332      .append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1))
1333      .append(". This selection was in queue for ")
1334      .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
1335      .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
1336      .append(" to execute.");
1337    LOG.info(message.toString());
1338    if (LOG.isTraceEnabled()) {
1339      int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
1340      long resultSize = getTotalSize(sfs);
1341      String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
1342        + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
1343        + cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]";
1344      LOG.trace(traceMessage);
1345    }
1346  }
1347
1348  /**
1349   * Call to complete a compaction. It's for the case where we find an unfinished compaction in
1350   * the WAL, e.g. while recovering a WAL after a regionserver crash. See HBASE-2231.
1351   */
1352  public void replayCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
1353    boolean removeFiles) throws IOException {
1354    LOG.debug("Completing compaction from the WAL marker");
1355    List<String> compactionInputs = compaction.getCompactionInputList();
1356    List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList());
1357
1358    // The Compaction Marker is written after the compaction is completed,
1359    // and the files moved into the region/family folder.
1360    //
1361    // If we crash after the entry is written, we may not have removed the
1362    // input files, but the output file is present.
1363    // (The unremoved input files will be removed by this function)
1364    //
1365    // If we scan the directory and a file is not present, it can mean that:
1366    // - The file was manually removed by the user
1367    // - The file was removed as a consequence of a subsequent compaction
1368    // So we can't do anything with the "compaction output list" because those
1369    // files have already been loaded when opening the region (by virtue of
1370    // being in the store's folder) or they may be missing due to a compaction.
1371
1372    String familyName = this.getColumnFamilyName();
1373    Set<String> inputFiles = new HashSet<>();
1374    for (String compactionInput : compactionInputs) {
1375      Path inputPath = getRegionFileSystem().getStoreFilePath(familyName, compactionInput);
1376      inputFiles.add(inputPath.getName());
1377    }
1378
1379    // some of the input files might already be deleted
1380    List<HStoreFile> inputStoreFiles = new ArrayList<>(compactionInputs.size());
1381    for (HStoreFile sf : this.getStorefiles()) {
1382      if (inputFiles.contains(sf.getPath().getName())) {
1383        inputStoreFiles.add(sf);
1384      }
1385    }
1386
1387    // check whether we need to pick up the new files
1388    List<HStoreFile> outputStoreFiles = new ArrayList<>(compactionOutputs.size());
1389
1390    if (pickCompactionFiles) {
1391      for (HStoreFile sf : this.getStorefiles()) {
1392        compactionOutputs.remove(sf.getPath().getName());
1393      }
1394      for (String compactionOutput : compactionOutputs) {
1395        StoreFileTracker sft = StoreFileTrackerFactory.create(conf, false, storeContext);
1396        StoreFileInfo storeFileInfo =
1397          getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), compactionOutput, sft);
1398        HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo);
1399        outputStoreFiles.add(storeFile);
1400      }
1401    }
1402
1403    if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) {
1404      LOG.info("Replaying compaction marker, replacing input files: " + inputStoreFiles
1405        + " with output files : " + outputStoreFiles);
1406      this.replaceStoreFiles(inputStoreFiles, outputStoreFiles, false);
1407      this.refreshStoreSizeAndTotalBytes();
1408    }
1409  }
1410
1411  @Override
1412  public boolean hasReferences() {
1413    // Grab the read lock here, because we need to ensure that the complete store file list is
1414    // only read after an atomic replaceStoreFiles(..) has finished.
1415    this.storeEngine.readLock();
1416    try {
1417      // Merge the current store files with compacted files here due to HBASE-20940.
1418      Collection<HStoreFile> allStoreFiles = new ArrayList<>(getStorefiles());
1419      allStoreFiles.addAll(getCompactedFiles());
1420      return StoreUtils.hasReferences(allStoreFiles);
1421    } finally {
1422      this.storeEngine.readUnlock();
1423    }
1424  }
1425
1426  /**
1427   * Getter for the CompactionProgress object.
1428   * @return the CompactionProgress object; can be null
1429   */
1430  public CompactionProgress getCompactionProgress() {
1431    return this.storeEngine.getCompactor().getProgress();
1432  }
1433
1434  @Override
1435  public boolean shouldPerformMajorCompaction() throws IOException {
1436    for (HStoreFile sf : this.storeEngine.getStoreFileManager().getStoreFiles()) {
1437      // TODO: what are these reader checks all over the place?
1438      if (sf.getReader() == null) {
1439        LOG.debug("StoreFile {} has null Reader", sf);
1440        return false;
1441      }
1442    }
1443    return storeEngine.getCompactionPolicy()
1444      .shouldPerformMajorCompaction(this.storeEngine.getStoreFileManager().getStoreFiles());
1445  }
1446
1447  public Optional<CompactionContext> requestCompaction() throws IOException {
1448    return requestCompaction(NO_PRIORITY, CompactionLifeCycleTracker.DUMMY, null);
1449  }
1450
1451  public Optional<CompactionContext> requestCompaction(int priority,
1452    CompactionLifeCycleTracker tracker, User user) throws IOException {
1453    // don't even select for compaction if writes are disabled
1454    if (!this.areWritesEnabled()) {
1455      return Optional.empty();
1456    }
1457    // Before we do compaction, try to get rid of unneeded files to simplify things.
1458    removeUnneededFiles();
1459
1460    final CompactionContext compaction = storeEngine.createCompaction();
1461    CompactionRequestImpl request = null;
1462    this.storeEngine.readLock();
1463    try {
1464      synchronized (filesCompacting) {
1465        // First, see if coprocessor would want to override selection.
1466        if (this.getCoprocessorHost() != null) {
1467          final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
1468          boolean override =
1469            getCoprocessorHost().preCompactSelection(this, candidatesForCoproc, tracker, user);
1470          if (override) {
1471            // Coprocessor is overriding normal file selection.
1472            compaction.forceSelect(new CompactionRequestImpl(candidatesForCoproc));
1473          }
1474        }
1475
1476        // Normal case - coprocessor is not overriding file selection.
1477        if (!compaction.hasSelection()) {
1478          boolean isUserCompaction = priority == Store.PRIORITY_USER;
1479          boolean mayUseOffPeak =
1480            offPeakHours.isOffPeakHour() && offPeakCompactionTracker.compareAndSet(false, true);
1481          try {
1482            compaction.select(this.filesCompacting, isUserCompaction, mayUseOffPeak,
1483              forceMajor && filesCompacting.isEmpty());
1484          } catch (IOException e) {
1485            if (mayUseOffPeak) {
1486              offPeakCompactionTracker.set(false);
1487            }
1488            throw e;
1489          }
1490          assert compaction.hasSelection();
1491          if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
1492            // Compaction policy doesn't want to take advantage of off-peak.
1493            offPeakCompactionTracker.set(false);
1494          }
1495        }
1496        if (this.getCoprocessorHost() != null) {
1497          this.getCoprocessorHost().postCompactSelection(this,
1498            ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker,
1499            compaction.getRequest(), user);
1500        }
1501        // Finally, we have the resulting files list. Check if we have any files at all.
1502        request = compaction.getRequest();
1503        Collection<HStoreFile> selectedFiles = request.getFiles();
1504        if (selectedFiles.isEmpty()) {
1505          return Optional.empty();
1506        }
1507
1508        addToCompactingFiles(selectedFiles);
1509
1510        // If we're enqueuing a major, clear the force flag.
1511        this.forceMajor = this.forceMajor && !request.isMajor();
1512
1513        // Set common request properties.
1514        // Set priority, either override value supplied by caller or from store.
1515        final int compactionPriority =
1516          (priority != Store.NO_PRIORITY) ? priority : getCompactPriority();
1517        request.setPriority(compactionPriority);
1518
1519        if (request.isAfterSplit()) {
1520          // If the store belongs to a recently split daughter region, it should be considered
1521          // with a higher priority in the compaction queue.
1522          // Override the priority if it is lower (a higher int value) than
1523          // SPLIT_REGION_COMPACTION_PRIORITY.
1524          final int splitHousekeepingPriority =
1525            Math.min(compactionPriority, SPLIT_REGION_COMPACTION_PRIORITY);
1526          request.setPriority(splitHousekeepingPriority);
1527          LOG.info(
1528            "Keeping/Overriding Compaction request priority to {} for CF {} since it"
1529              + " belongs to recently split daughter region {}",
1530            splitHousekeepingPriority, this.getColumnFamilyName(),
1531            getRegionInfo().getRegionNameAsString());
1532        }
1533        request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
1534        request.setTracker(tracker);
1535      }
1536    } finally {
1537      this.storeEngine.readUnlock();
1538    }
1539
1540    if (LOG.isDebugEnabled()) {
1541      LOG.debug(this + " is initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
1542        + (request.isAllFiles() ? " (all files)" : ""));
1543    }
1544    this.region.reportCompactionRequestStart(request.isMajor());
1545    return Optional.of(compaction);
1546  }
1547
1548  /** Adds the files to compacting files. filesCompacting must be locked. */
1549  private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) {
1550    if (CollectionUtils.isEmpty(filesToAdd)) {
1551      return;
1552    }
1553    // Check that we do not try to compact the same StoreFile twice.
1554    if (!Collections.disjoint(filesCompacting, filesToAdd)) {
1555      Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
1556    }
1557    filesCompacting.addAll(filesToAdd);
1558    Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator());
1559  }
1560
1561  private void removeUnneededFiles() throws IOException {
1562    if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) {
1563      return;
1564    }
1565    if (getColumnFamilyDescriptor().getMinVersions() > 0) {
1566      LOG.debug("Skipping expired store file removal due to min version of {} being {}", this,
1567        getColumnFamilyDescriptor().getMinVersions());
1568      return;
1569    }
1570    this.storeEngine.readLock();
1571    Collection<HStoreFile> delSfs = null;
1572    try {
1573      synchronized (filesCompacting) {
1574        long cfTtl = getStoreFileTtl();
1575        if (cfTtl != Long.MAX_VALUE) {
1576          delSfs = storeEngine.getStoreFileManager()
1577            .getUnneededFiles(EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting);
1578          addToCompactingFiles(delSfs);
1579        }
1580      }
1581    } finally {
1582      this.storeEngine.readUnlock();
1583    }
1584
1585    if (CollectionUtils.isEmpty(delSfs)) {
1586      return;
1587    }
1588
1589    Collection<HStoreFile> newFiles = Collections.emptyList(); // No new files.
1590    replaceStoreFiles(delSfs, newFiles, true);
1591    refreshStoreSizeAndTotalBytes();
1592    LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in " + this
1593      + "; total size is " + TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1));
1594  }
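
  // Expired store file removal is enabled by default; a sketch of how it can be disabled via
  // hbase-site.xml (key taken from the check above):
  //
  //   <property>
  //     <name>hbase.store.delete.expired.storefile</name>
  //     <value>false</value>
  //   </property>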
1595
1596  public void cancelRequestedCompaction(CompactionContext compaction) {
1597    finishCompactionRequest(compaction.getRequest());
1598  }
1599
1600  private void finishCompactionRequest(CompactionRequestImpl cr) {
1601    this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
1602    if (cr.isOffPeak()) {
1603      offPeakCompactionTracker.set(false);
1604      cr.setOffPeak(false);
1605    }
1606    synchronized (filesCompacting) {
1607      filesCompacting.removeAll(cr.getFiles());
1608    }
1609    // The tracker could be null, for example, if we do not need to track store file writer
1610    // creation because of the SFT implementation in use, or if the compaction was canceled.
1611    if (cr.getWriterCreationTracker() != null) {
1612      storeFileWriterCreationTrackers.remove(cr.getWriterCreationTracker());
1613    }
1614  }
1615
1616  /**
1617   * Updates the store size and total uncompressed bytes from the current set of store files.
1618   */
1619  protected void refreshStoreSizeAndTotalBytes() throws IOException {
1620    this.storeSize.set(0L);
1621    this.totalUncompressedBytes.set(0L);
1622    for (HStoreFile hsf : this.storeEngine.getStoreFileManager().getStoreFiles()) {
1623      StoreFileReader r = hsf.getReader();
1624      if (r == null) {
1625        LOG.debug("StoreFile {} has a null Reader", hsf);
1626        continue;
1627      }
1628      this.storeSize.addAndGet(r.length());
1629      this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
1630    }
1631  }
1632
1633  /*
1634   * @param wantedVersions How many versions were asked for.
1635   * @return wantedVersions or this family's configured maximum versions, whichever is smaller.
1636   */
1637  int versionsToReturn(final int wantedVersions) {
1638    if (wantedVersions <= 0) {
1639      throw new IllegalArgumentException("Number of versions must be > 0");
1640    }
1641    // Make sure we do not return more than maximum versions for this store.
1642    int maxVersions = getColumnFamilyDescriptor().getMaxVersions();
1643    return wantedVersions > maxVersions ? maxVersions : wantedVersions;
1644  }
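
  // Example: with a column family whose maximum versions is 3, versionsToReturn(10) yields 3,
  // versionsToReturn(2) yields 2, and versionsToReturn(0) throws IllegalArgumentException.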
1645
1646  @Override
1647  public boolean canSplit() {
1648    // Not split-able if we find a reference store file present in the store.
1649    boolean result = !hasReferences();
1650    if (!result) {
1651      LOG.trace("Not splittable; has references: {}", this);
1652    }
1653    return result;
1654  }
1655
1656  /**
1657   * Determines the split point for the store, if the store can be split (i.e. has no references).
1658   */
1659  public Optional<byte[]> getSplitPoint() {
1660    this.storeEngine.readLock();
1661    try {
1662      // Should already be enforced by the split policy!
1663      assert !this.getRegionInfo().isMetaRegion();
1664      // Not split-able if we find a reference store file present in the store.
1665      if (hasReferences()) {
1666        LOG.trace("Not splittable; has references: {}", this);
1667        return Optional.empty();
1668      }
1669      return this.storeEngine.getStoreFileManager().getSplitPoint();
1670    } catch (IOException e) {
1671      LOG.warn("Failed getting split point for {}", this, e);
1672    } finally {
1673      this.storeEngine.readUnlock();
1674    }
1675    return Optional.empty();
1676  }
1677
1678  @Override
1679  public long getLastCompactSize() {
1680    return this.lastCompactSize;
1681  }
1682
1683  @Override
1684  public long getSize() {
1685    return storeSize.get();
1686  }
1687
1688  public void triggerMajorCompaction() {
1689    this.forceMajor = true;
1690  }
1691
1692  //////////////////////////////////////////////////////////////////////////////
1693  // File administration
1694  //////////////////////////////////////////////////////////////////////////////
1695
1696  /**
1697   * Return a scanner for both the memstore and the HStore files. Assumes we are not in a
1698   * compaction.
1699   * @param scan       Scan to apply when scanning the stores
1700   * @param targetCols columns to scan
1701   * @return a scanner over the current key values
1702   * @throws IOException on failure
1703   */
1704  public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols, long readPt)
1705    throws IOException {
1706    storeEngine.readLock();
1707    try {
1708      ScanInfo scanInfo;
1709      if (this.getCoprocessorHost() != null) {
1710        scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this, scan);
1711      } else {
1712        scanInfo = getScanInfo();
1713      }
1714      return createScanner(scan, scanInfo, targetCols, readPt);
1715    } finally {
1716      storeEngine.readUnlock();
1717    }
1718  }
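
  // Illustrative usage (a sketch only; real callers are the region scan internals, and the
  // Bytes/TreeSet setup below is assumed for the example):
  //
  //   NavigableSet<byte[]> cols = new TreeSet<>(Bytes.BYTES_COMPARATOR);
  //   cols.add(Bytes.toBytes("q1"));
  //   KeyValueScanner scanner = store.getScanner(new Scan(), cols, readPt);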
1719
1720  // HMobStore will override this method to return its own implementation.
1721  protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo,
1722    NavigableSet<byte[]> targetCols, long readPt) throws IOException {
1723    return scan.isReversed()
1724      ? new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt)
1725      : new StoreScanner(this, scanInfo, scan, targetCols, readPt);
1726  }
1727
1728  /**
1729   * Recreates the scanners on the current list of active store file scanners
1730   * @param currentFileScanners    the current set of active store file scanners
1731   * @param cacheBlocks            cache the blocks or not
1732   * @param usePread               use pread or not
1733   * @param isCompaction           is the scanner for compaction
1734   * @param matcher                the scan query matcher
1735   * @param startRow               the scan's start row
1736   * @param includeStartRow        should the scan include the start row
1737   * @param stopRow                the scan's stop row
1738   * @param includeStopRow         should the scan include the stop row
1739   * @param readPt                 the read point of the current scan
1740   * @param includeMemstoreScanner whether the current scanner should include a memstore scanner
1741   * @return list of scanners recreated on the current Scanners
1742   */
1743  public List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners,
1744    boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
1745    byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
1746    boolean includeMemstoreScanner) throws IOException {
1747    this.storeEngine.readLock();
1748    try {
1749      Map<String, HStoreFile> name2File =
1750        new HashMap<>(getStorefilesCount() + getCompactedFilesCount());
1751      for (HStoreFile file : getStorefiles()) {
1752        name2File.put(file.getFileInfo().getActiveFileName(), file);
1753      }
1754      Collection<HStoreFile> compactedFiles = getCompactedFiles();
1755      for (HStoreFile file : IterableUtils.emptyIfNull(compactedFiles)) {
1756        name2File.put(file.getFileInfo().getActiveFileName(), file);
1757      }
1758      List<HStoreFile> filesToReopen = new ArrayList<>();
1759      for (KeyValueScanner kvs : currentFileScanners) {
1760        assert kvs.isFileScanner();
1761        if (kvs.peek() == null) {
1762          continue;
1763        }
1764        filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
1765      }
1766      if (filesToReopen.isEmpty()) {
1767        return null;
1768      }
1769      return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow,
1770        includeStartRow, stopRow, includeStopRow, readPt, false, false);
1771    } finally {
1772      this.storeEngine.readUnlock();
1773    }
1774  }
1775
1776  @Override
1777  public String toString() {
1778    return this.getRegionInfo().getShortNameToLog() + "/" + this.getColumnFamilyName();
1779  }
1780
1781  @Override
1782  public int getStorefilesCount() {
1783    return this.storeEngine.getStoreFileManager().getStorefileCount();
1784  }
1785
1786  @Override
1787  public int getCompactedFilesCount() {
1788    return this.storeEngine.getStoreFileManager().getCompactedFilesCount();
1789  }
1790
1791  private LongStream getStoreFileAgeStream() {
1792    return this.storeEngine.getStoreFileManager().getStoreFiles().stream().filter(sf -> {
1793      if (sf.getReader() == null) {
1794        LOG.debug("StoreFile {} has a null Reader", sf);
1795        return false;
1796      } else {
1797        return true;
1798      }
1799    }).filter(HStoreFile::isHFile).mapToLong(sf -> sf.getFileInfo().getCreatedTimestamp())
1800      .map(t -> EnvironmentEdgeManager.currentTime() - t);
1801  }
1802
1803  @Override
1804  public OptionalLong getMaxStoreFileAge() {
1805    return getStoreFileAgeStream().max();
1806  }
1807
1808  @Override
1809  public OptionalLong getMinStoreFileAge() {
1810    return getStoreFileAgeStream().min();
1811  }
1812
1813  @Override
1814  public OptionalDouble getAvgStoreFileAge() {
1815    return getStoreFileAgeStream().average();
1816  }
1817
1818  @Override
1819  public long getNumReferenceFiles() {
1820    return this.storeEngine.getStoreFileManager().getStoreFiles().stream()
1821      .filter(HStoreFile::isReference).count();
1822  }
1823
1824  @Override
1825  public long getNumHFiles() {
1826    return this.storeEngine.getStoreFileManager().getStoreFiles().stream()
1827      .filter(HStoreFile::isHFile).count();
1828  }
1829
1830  @Override
1831  public long getStoreSizeUncompressed() {
1832    return this.totalUncompressedBytes.get();
1833  }
1834
1835  @Override
1836  public long getStorefilesSize() {
1837    // Include all StoreFiles
1838    return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStoreFiles(),
1839      sf -> true);
1840  }
1841
1842  @Override
1843  public long getHFilesSize() {
1844    // Include only StoreFiles which are HFiles
1845    return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStoreFiles(),
1846      HStoreFile::isHFile);
1847  }
1848
1849  private long getStorefilesFieldSize(ToLongFunction<StoreFileReader> f) {
1850    return this.storeEngine.getStoreFileManager().getStoreFiles().stream()
1851      .mapToLong(file -> StoreUtils.getStorefileFieldSize(file, f)).sum();
1852  }
1853
1854  @Override
1855  public long getStorefilesRootLevelIndexSize() {
1856    return getStorefilesFieldSize(StoreFileReader::indexSize);
1857  }
1858
1859  @Override
1860  public long getTotalStaticIndexSize() {
1861    return getStorefilesFieldSize(StoreFileReader::getUncompressedDataIndexSize);
1862  }
1863
1864  @Override
1865  public long getTotalStaticBloomSize() {
1866    return getStorefilesFieldSize(StoreFileReader::getTotalBloomSize);
1867  }
1868
1869  @Override
1870  public MemStoreSize getMemStoreSize() {
1871    return this.memstore.size();
1872  }
1873
1874  @Override
1875  public int getCompactPriority() {
1876    int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();
1877    if (priority == PRIORITY_USER) {
1878      LOG.warn("Compaction priority is USER despite there being no user compaction");
1879    }
1880    return priority;
1881  }
1882
1883  public boolean throttleCompaction(long compactionSize) {
1884    return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize);
1885  }
1886
1887  public HRegion getHRegion() {
1888    return this.region;
1889  }
1890
1891  public RegionCoprocessorHost getCoprocessorHost() {
1892    return this.region.getCoprocessorHost();
1893  }
1894
1895  @Override
1896  public RegionInfo getRegionInfo() {
1897    return getRegionFileSystem().getRegionInfo();
1898  }
1899
1900  @Override
1901  public boolean areWritesEnabled() {
1902    return this.region.areWritesEnabled();
1903  }
1904
1905  @Override
1906  public long getSmallestReadPoint() {
1907    return this.region.getSmallestReadPoint();
1908  }
1909
1910  /**
1911   * Adds or replaces the specified KeyValues.
1912   * <p>
1913   * For each KeyValue specified, if a cell with the same row, family, and qualifier exists in
1914   * MemStore, it will be replaced. Otherwise, it will just be inserted to MemStore.
1915   * <p>
1916   * This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic
1917   * across all of them.
1918   * @param readpoint readpoint below which we can safely remove duplicate KVs
1919   */
1920  public void upsert(Iterable<ExtendedCell> cells, long readpoint, MemStoreSizing memstoreSizing) {
1921    this.storeEngine.readLock();
1922    try {
1923      this.memstore.upsert(cells, readpoint, memstoreSizing);
1924    } finally {
1925      this.storeEngine.readUnlock();
1926    }
1927  }
1928
1929  public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTracker tracker) {
1930    return new StoreFlusherImpl(cacheFlushId, tracker);
1931  }
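
  // Illustrative flush lifecycle as driven by the owning region (a sketch, not executed here):
  //
  //   StoreFlushContext ctx = store.createFlushContext(flushSeqId, tracker);
  //   MemStoreSize snapshotSize = ctx.prepare();       // snapshot the memstore
  //   ctx.flushCache(status);                          // write the snapshot to temp files
  //   boolean result = ctx.commit(status);             // move files into the store, drop snapshot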
1932
1933  private final class StoreFlusherImpl implements StoreFlushContext {
1934
1935    private final FlushLifeCycleTracker tracker;
1936    private final StoreFileWriterCreationTracker writerCreationTracker;
1937    private final long cacheFlushSeqNum;
1938    private MemStoreSnapshot snapshot;
1939    private List<Path> tempFiles;
1940    private List<Path> committedFiles;
1941    private long cacheFlushCount;
1942    private long cacheFlushSize;
1943    private long outputFileSize;
1944
1945    private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) {
1946      this.cacheFlushSeqNum = cacheFlushSeqNum;
1947      this.tracker = tracker;
1948      this.writerCreationTracker = storeFileWriterCreationTrackerFactory.get();
1949    }
1950
1951    /**
1952     * This is not thread safe. The caller should have a lock on the region or the store. If
1953     * necessary, the lock can be added with the patch provided in HBASE-10087.
1954     */
1955    @Override
1956    public MemStoreSize prepare() {
1957      // passing the current sequence number of the wal - to allow bookkeeping in the memstore
1958      this.snapshot = memstore.snapshot();
1959      this.cacheFlushCount = snapshot.getCellsCount();
1960      this.cacheFlushSize = snapshot.getDataSize();
1961      committedFiles = new ArrayList<>(1);
1962      return snapshot.getMemStoreSize();
1963    }
1964
1965    @Override
1966    public void flushCache(MonitoredTask status) throws IOException {
1967      RegionServerServices rsService = region.getRegionServerServices();
1968      ThroughputController throughputController =
1969        rsService == null ? null : rsService.getFlushThroughputController();
1970      // It could be null if we do not need to track the creation of the store file writer
1971      // because of the SFT implementation in use.
1972      if (writerCreationTracker != null) {
1973        HStore.this.storeFileWriterCreationTrackers.add(writerCreationTracker);
1974      }
1975      tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController,
1976        tracker, writerCreationTracker);
1977    }
1978
1979    @Override
1980    public boolean commit(MonitoredTask status) throws IOException {
1981      try {
1982        if (CollectionUtils.isEmpty(this.tempFiles)) {
1983          return false;
1984        }
1985        status.setStatus("Flushing " + this + ": reopening flushed file");
1986        List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, false);
1987        for (HStoreFile sf : storeFiles) {
1988          StoreFileReader r = sf.getReader();
1989          if (LOG.isInfoEnabled()) {
1990            LOG.info("Added {}, entries={}, sequenceid={}, filesize={}", sf, r.getEntries(),
1991              cacheFlushSeqNum, TraditionalBinaryPrefix.long2String(r.length(), "", 1));
1992          }
1993          outputFileSize += r.length();
1994          storeSize.addAndGet(r.length());
1995          totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
1996          committedFiles.add(sf.getPath());
1997        }
1998
1999        flushedCellsCount.addAndGet(cacheFlushCount);
2000        flushedCellsSize.addAndGet(cacheFlushSize);
2001        flushedOutputFileSize.addAndGet(outputFileSize);
2002        // call coprocessor after we have done all the accounting above
2003        for (HStoreFile sf : storeFiles) {
2004          if (getCoprocessorHost() != null) {
2005            getCoprocessorHost().postFlush(HStore.this, sf, tracker);
2006          }
2007        }
2008        // Add new file to store files. Clear snapshot too while we have the Store write lock.
2009        return completeFlush(storeFiles, snapshot.getId());
2010      } finally {
2011        if (writerCreationTracker != null) {
2012          HStore.this.storeFileWriterCreationTrackers.remove(writerCreationTracker);
2013        }
2014      }
2015    }
2016
2017    @Override
2018    public long getOutputFileSize() {
2019      return outputFileSize;
2020    }
2021
2022    @Override
2023    public List<Path> getCommittedFiles() {
2024      return committedFiles;
2025    }
2026
2027    /**
2028     * Similar to commit, but called in secondary region replicas for replaying the flush cache from
2029     * the primary region. Adds the new files to the store, and drops the snapshot depending on
2030     * dropMemstoreSnapshot argument.
2031     * @param fileNames            names of the flushed files
2032     * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot
2033     */
2034    @Override
2035    public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
2036      throws IOException {
2037      List<HStoreFile> storeFiles = new ArrayList<>(fileNames.size());
2038      for (String file : fileNames) {
2039        // open the file as a store file (hfile link, etc)
2040        StoreFileTracker sft = StoreFileTrackerFactory.create(conf, false, storeContext);
2041        StoreFileInfo storeFileInfo =
2042          getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), file, sft);
2043        HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo);
2044        storeFiles.add(storeFile);
2045        HStore.this.storeSize.addAndGet(storeFile.getReader().length());
2046        HStore.this.totalUncompressedBytes
2047          .addAndGet(storeFile.getReader().getTotalUncompressedBytes());
2048        if (LOG.isInfoEnabled()) {
2049          LOG.info(this + " added " + storeFile + ", entries=" + storeFile.getReader().getEntries()
2050            + ", sequenceid=" + storeFile.getReader().getSequenceID() + ", filesize="
2051            + TraditionalBinaryPrefix.long2String(storeFile.getReader().length(), "", 1));
2052        }
2053      }
2054
2055      long snapshotId = -1; // -1 means do not drop
2056      if (dropMemstoreSnapshot && snapshot != null) {
2057        snapshotId = snapshot.getId();
2058      }
2059      HStore.this.completeFlush(storeFiles, snapshotId);
2060    }
2061
2062    /**
2063     * Abort the snapshot preparation. Drops the snapshot if any.
2064     */
2065    @Override
2066    public void abort() throws IOException {
2067      if (snapshot != null) {
2068        HStore.this.completeFlush(Collections.emptyList(), snapshot.getId());
2069      }
2070    }
2071  }
2072
2073  @Override
2074  public boolean needsCompaction() {
2075    List<HStoreFile> filesCompactingClone = null;
2076    synchronized (filesCompacting) {
2077      filesCompactingClone = Lists.newArrayList(filesCompacting);
2078    }
2079    return this.storeEngine.needsCompaction(filesCompactingClone);
2080  }
2081
2082  /**
2083   * Used for tests.
2084   * @return cache configuration for this Store.
2085   */
2086  public CacheConfig getCacheConfig() {
2087    return storeContext.getCacheConf();
2088  }
2089
2090  public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HStore.class, false);
2091
2092  public static final long DEEP_OVERHEAD = ClassSize.align(
2093    FIXED_OVERHEAD + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK + ClassSize.CONCURRENT_SKIPLISTMAP
2094      + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT + ScanInfo.FIXED_OVERHEAD);
2095
2096  @Override
2097  public long heapSize() {
2098    MemStoreSize memstoreSize = this.memstore.size();
2099    return DEEP_OVERHEAD + memstoreSize.getHeapSize() + storeContext.heapSize();
2100  }
2101
2102  @Override
2103  public CellComparator getComparator() {
2104    return storeContext.getComparator();
2105  }
2106
2107  public ScanInfo getScanInfo() {
2108    return scanInfo;
2109  }
2110
2111  /**
2112   * Set scan info, used by test
2113   * @param scanInfo new scan info to use for test
2114   */
2115  void setScanInfo(ScanInfo scanInfo) {
2116    this.scanInfo = scanInfo;
2117  }
2118
2119  @Override
2120  public boolean hasTooManyStoreFiles() {
2121    return getStorefilesCount() > this.blockingFileCount;
2122  }
2123
2124  @Override
2125  public long getFlushedCellsCount() {
2126    return flushedCellsCount.get();
2127  }
2128
2129  @Override
2130  public long getFlushedCellsSize() {
2131    return flushedCellsSize.get();
2132  }
2133
2134  @Override
2135  public long getFlushedOutputFileSize() {
2136    return flushedOutputFileSize.get();
2137  }
2138
2139  @Override
2140  public long getCompactedCellsCount() {
2141    return compactedCellsCount.get();
2142  }
2143
2144  @Override
2145  public long getCompactedCellsSize() {
2146    return compactedCellsSize.get();
2147  }
2148
2149  @Override
2150  public long getMajorCompactedCellsCount() {
2151    return majorCompactedCellsCount.get();
2152  }
2153
2154  @Override
2155  public long getMajorCompactedCellsSize() {
2156    return majorCompactedCellsSize.get();
2157  }
2158
2159  public void updateCompactedMetrics(boolean isMajor, CompactionProgress progress) {
2160    if (isMajor) {
2161      majorCompactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
2162      majorCompactedCellsSize.addAndGet(progress.totalCompactedSize);
2163    } else {
2164      compactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
2165      compactedCellsSize.addAndGet(progress.totalCompactedSize);
2166    }
2167  }
2168
2169  /**
2170   * Returns the StoreEngine that is backing this concrete implementation of Store.
2171   * @return Returns the {@link StoreEngine} object used internally inside this HStore object.
2172   */
2173  public StoreEngine<?, ?, ?, ?> getStoreEngine() {
2174    return this.storeEngine;
2175  }
2176
2177  protected OffPeakHours getOffPeakHours() {
2178    return this.offPeakHours;
2179  }
2180
2181  @Override
2182  public void onConfigurationChange(Configuration conf) {
2183    Configuration storeConf = StoreUtils.createStoreConfiguration(conf, region.getTableDescriptor(),
2184      getColumnFamilyDescriptor());
2185    this.conf = storeConf;
2186    this.storeEngine.compactionPolicy.setConf(storeConf);
2187    this.offPeakHours = OffPeakHours.getInstance(storeConf);
2188  }
2189
2190  /**
2191   * {@inheritDoc}
2192   */
2193  @Override
2194  public void registerChildren(ConfigurationManager manager) {
2195    CacheConfig cacheConfig = this.storeContext.getCacheConf();
2196    if (cacheConfig != null) {
2197      manager.registerObserver(cacheConfig);
2198    }
2199  }
2200
2201  /**
2202   * {@inheritDoc}
2203   */
2204  @Override
2205  public void deregisterChildren(ConfigurationManager manager) {
2206    // No children to deregister
2207  }
2208
2209  @Override
2210  public double getCompactionPressure() {
2211    return storeEngine.getStoreFileManager().getCompactionPressure();
2212  }
2213
2214  @Override
2215  public boolean isPrimaryReplicaStore() {
2216    return getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID;
2217  }
2218
2219  /**
2220   * Sets the store up for a region level snapshot operation.
2221   * @see #postSnapshotOperation()
2222   */
2223  public void preSnapshotOperation() {
2224    archiveLock.lock();
2225  }
2226
2227  /**
2228   * Perform tasks needed after the completion of snapshot operation.
2229   * @see #preSnapshotOperation()
2230   */
2231  public void postSnapshotOperation() {
2232    archiveLock.unlock();
2233  }
2234
2235  /**
2236   * Closes and archives the compacted files under this store
2237   */
2238  public synchronized void closeAndArchiveCompactedFiles() throws IOException {
2239    // ensure other threads do not attempt to archive the same files on close()
2240    archiveLock.lock();
2241    try {
2242      storeEngine.readLock();
2243      Collection<HStoreFile> copyCompactedfiles = null;
2244      try {
2245        Collection<HStoreFile> compactedfiles =
2246          this.getStoreEngine().getStoreFileManager().getCompactedfiles();
2247        if (CollectionUtils.isNotEmpty(compactedfiles)) {
2248          // Do a copy under read lock
2249          copyCompactedfiles = new ArrayList<>(compactedfiles);
2250        } else {
2251          LOG.trace("No compacted files to archive");
2252        }
2253      } finally {
2254        storeEngine.readUnlock();
2255      }
2256      if (CollectionUtils.isNotEmpty(copyCompactedfiles)) {
2257        removeCompactedfiles(copyCompactedfiles, true);
2258      }
2259    } finally {
2260      archiveLock.unlock();
2261    }
2262  }
2263
2264  /**
2265   * Archives and removes the compacted files
2266   * @param compactedfiles The compacted files in this store that are not active in reads
2267   * @param evictOnClose   true if blocks should be evicted from the cache when an HFile reader is
2268   *                       closed, false if not
2269   */
2270  private void removeCompactedfiles(Collection<HStoreFile> compactedfiles, boolean evictOnClose)
2271    throws IOException {
2272    final List<HStoreFile> filesToRemove = new ArrayList<>(compactedfiles.size());
2273    final List<Long> storeFileSizes = new ArrayList<>(compactedfiles.size());
2274    for (final HStoreFile file : compactedfiles) {
2275      synchronized (file) {
2276        try {
2277          StoreFileReader r = file.getReader();
2278          if (r == null) {
2279            LOG.debug("The file {} was closed but still not archived", file);
2280            // HACK: Temporarily re-open the reader so we can get the size of the file. Ideally,
2281            // we should know the size of an HStoreFile without having to ask the StoreFileReader
2282            // for that.
2283            long length = getStoreFileSize(file);
2284            filesToRemove.add(file);
2285            storeFileSizes.add(length);
2286            continue;
2287          }
2288
2289          if (file.isCompactedAway() && !file.isReferencedInReads()) {
2290            // Even if deleting fails we need not bother, as any new scanners won't be
2291            // able to use the compacted file since the status is already compactedAway
2292            LOG.trace("Closing and archiving the file {}", file);
2293            // Copy the file size before closing the reader
2294            final long length = r.length();
2295            r.close(evictOnClose);
2296            // Just close and return
2297            filesToRemove.add(file);
2298            // Only add the length if we successfully added the file to `filesToRemove`
2299            storeFileSizes.add(length);
2300          } else {
2301            LOG.info("Can't archive compacted file " + file.getPath()
2302              + " because of either isCompactedAway=" + file.isCompactedAway()
2303              + " or file has reference, isReferencedInReads=" + file.isReferencedInReads()
2304              + ", refCount=" + r.getRefCount() + ", skipping for now.");
2305          }
2306        } catch (Exception e) {
2307          LOG.error("Exception while trying to close the compacted store file {}", file.getPath(),
2308            e);
2309        }
2310      }
2311    }
2312    if (this.isPrimaryReplicaStore()) {
2313      // Only the primary region is allowed to move the file to archive.
2314      // The secondary region does not move the files to archive. Any active reads from
2315      // the secondary region will still work because the file as such has active readers on it.
2316      if (!filesToRemove.isEmpty()) {
2317        LOG.debug("Moving the files {} to archive", filesToRemove);
2318        // Only if this is successful it has to be removed
2319        try {
2320          getRegionFileSystem().removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(),
2321            filesToRemove);
2322        } catch (FailedArchiveException fae) {
2323          // Even if archiving some files failed, we still need to clear out any of the
2324          // files which were successfully archived. Otherwise we will receive a
2325          // FileNotFoundException when we attempt to re-archive them in the next go around.
2326          Collection<Path> failedFiles = fae.getFailedFiles();
2327          Iterator<HStoreFile> iter = filesToRemove.iterator();
2328          Iterator<Long> sizeIter = storeFileSizes.iterator();
2329          while (iter.hasNext()) {
2330            sizeIter.next();
2331            if (failedFiles.contains(iter.next().getPath())) {
2332              iter.remove();
2333              sizeIter.remove();
2334            }
2335          }
2336          if (!filesToRemove.isEmpty()) {
2337            clearCompactedfiles(filesToRemove);
2338          }
2339          throw fae;
2340        }
2341      }
2342    }
2343    if (!filesToRemove.isEmpty()) {
2344      // Clear the compactedfiles from the store file manager
2345      clearCompactedfiles(filesToRemove);
2346      // Try to send report of this archival to the Master for updating quota usage faster
2347      reportArchivedFilesForQuota(filesToRemove, storeFileSizes);
2348    }
2349  }
2350
2351  /**
2352   * Computes the length of a store file without succumbing to any errors along the way. If an error
2353   * is encountered, the implementation returns {@code 0} instead of the actual size.
2354   * @param file The file to compute the size of.
2355   * @return The size in bytes of the provided {@code file}.
2356   */
2357  long getStoreFileSize(HStoreFile file) {
2358    long length = 0;
2359    try {
2360      file.initReader();
2361      length = file.getReader().length();
2362    } catch (IOException e) {
2363      LOG.trace("Failed to open reader when trying to compute store file size for {}, ignoring",
2364        file, e);
2365    } finally {
2366      try {
2367        file.closeStoreFile(
2368          file.getCacheConf() != null ? file.getCacheConf().shouldEvictOnClose() : true);
2369      } catch (IOException e) {
2370        LOG.trace("Failed to close reader after computing store file size for {}, ignoring", file,
2371          e);
2372      }
2373    }
2374    return length;
2375  }
2376
2377  public Long preFlushSeqIDEstimation() {
2378    return memstore.preFlushSeqIDEstimation();
2379  }
2380
2381  @Override
2382  public boolean isSloppyMemStore() {
2383    return this.memstore.isSloppy();
2384  }
2385
2386  private void clearCompactedfiles(List<HStoreFile> filesToRemove) throws IOException {
2387    LOG.trace("Clearing the compacted files {} from this store", filesToRemove);
2388    storeEngine.removeCompactedFiles(filesToRemove);
2389  }
2390
2391  void reportArchivedFilesForQuota(List<? extends StoreFile> archivedFiles, List<Long> fileSizes) {
2392    // Sanity check from the caller
2393    if (archivedFiles.size() != fileSizes.size()) {
2394      throw new RuntimeException("Coding error: should never see lists of varying size");
2395    }
2396    RegionServerServices rss = this.region.getRegionServerServices();
2397    if (rss == null) {
2398      return;
2399    }
2400    List<Entry<String, Long>> filesWithSizes = new ArrayList<>(archivedFiles.size());
2401    Iterator<Long> fileSizeIter = fileSizes.iterator();
2402    for (StoreFile storeFile : archivedFiles) {
2403      final long fileSize = fileSizeIter.next();
2404      if (storeFile.isHFile() && fileSize != 0) {
2405        filesWithSizes.add(Maps.immutableEntry(storeFile.getPath().getName(), fileSize));
2406      }
2407    }
2408    if (LOG.isTraceEnabled()) {
2409      LOG.trace("Files archived: " + archivedFiles + ", reporting the following to the Master: "
2410        + filesWithSizes);
2411    }
2412    boolean success = rss.reportFileArchivalForQuotas(getTableName(), filesWithSizes);
2413    if (!success) {
2414      LOG.warn("Failed to report archival of files: " + filesWithSizes);
2415    }
2416  }
2417
2418  @Override
2419  public int getCurrentParallelPutCount() {
2420    return currentParallelPutCount.get();
2421  }
2422
2423  public int getStoreRefCount() {
2424    return this.storeEngine.getStoreFileManager().getStoreFiles().stream()
2425      .filter(sf -> sf.getReader() != null).filter(HStoreFile::isHFile)
2426      .mapToInt(HStoreFile::getRefCount).sum();
2427  }
2428
2429  /** Returns the maximum ref count among all compacted store files for this HStore. */
2430  public int getMaxCompactedStoreFileRefCount() {
2431    OptionalInt maxCompactedStoreFileRefCount = this.storeEngine.getStoreFileManager()
2432      .getCompactedfiles().stream().filter(sf -> sf.getReader() != null).filter(HStoreFile::isHFile)
2433      .mapToInt(HStoreFile::getRefCount).max();
2434    return maxCompactedStoreFileRefCount.isPresent() ? maxCompactedStoreFileRefCount.getAsInt() : 0;
2435  }
2436
2437  @Override
2438  public long getMemstoreOnlyRowReadsCount() {
2439    return memstoreOnlyRowReadsCount.sum();
2440  }
2441
2442  @Override
2443  public long getMixedRowReadsCount() {
2444    return mixedRowReadsCount.sum();
2445  }
2446
2447  @Override
2448  public Configuration getReadOnlyConfiguration() {
2449    return new ReadOnlyConfiguration(this.conf);
2450  }
2451
2452  void updateMetricsStore(boolean memstoreRead) {
2453    if (memstoreRead) {
2454      memstoreOnlyRowReadsCount.increment();
2455    } else {
2456      mixedRowReadsCount.increment();
2457    }
2458  }
2459
2460  /**
2461   * Return the store files which are currently being written to. Mainly used by
2462   * {@link BrokenStoreFileCleaner} to prevent deleting these files, as they are not present in
2463   * the SFT yet.
2464   */
2465  public Set<Path> getStoreFilesBeingWritten() {
2466    return storeFileWriterCreationTrackers.stream().flatMap(t -> t.get().stream())
2467      .collect(Collectors.toSet());
2468  }
2469
2470  @Override
2471  public long getBloomFilterRequestsCount() {
2472    return storeEngine.getBloomFilterMetrics().getRequestsCount();
2473  }
2474
2475  @Override
2476  public long getBloomFilterNegativeResultsCount() {
2477    return storeEngine.getBloomFilterMetrics().getNegativeResultsCount();
2478  }
2479
2480  @Override
2481  public long getBloomFilterEligibleRequestsCount() {
2482    return storeEngine.getBloomFilterMetrics().getEligibleRequestsCount();
2483  }
2484}