/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.FailedArchiveException;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;

/**
 * A Store holds a column family in a Region.  It has a memstore and a set of zero
 * or more StoreFiles, which stretch backwards over time.
 *
 * <p>There's no reason to consider append-logging at this level; all logging
 * and locking is handled at the HRegion level.  Store just provides
 * services to manage sets of StoreFiles.  One of the most important of those
 * services is compaction, where files are merged together once their number
 * passes a configurable threshold.
 *
 * <p>Locking and transactions are handled at a higher level.  This API should
 * not be called directly but by an HRegion manager.
 */
@InterfaceAudience.Private
public class HStore implements Store, HeapSize, StoreConfigInformation,
    PropagatingConfigurationObserver {
  public static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
  public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
      "hbase.server.compactchecker.interval.multiplier";
  public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
  public static final String BLOCK_STORAGE_POLICY_KEY = "hbase.hstore.block.storage.policy";
  // keep in accordance with HDFS default storage policy
  public static final String DEFAULT_BLOCK_STORAGE_POLICY = "HOT";
  public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
  public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 16;

  private static final Logger LOG = LoggerFactory.getLogger(HStore.class);

  protected final MemStore memstore;
  // This store's directory in the filesystem.
  protected final HRegion region;
  private final ColumnFamilyDescriptor family;
  private final HRegionFileSystem fs;
  protected Configuration conf;
  protected CacheConfig cacheConf;
  private long lastCompactSize = 0;
  volatile boolean forceMajor = false;
  /* how many bytes to write between status checks */
  static int closeCheckInterval = 0;
  private AtomicLong storeSize = new AtomicLong();
  private AtomicLong totalUncompressedBytes = new AtomicLong();

  /**
   * RWLock for store operations.
   * Locked in shared mode when the list of component stores is looked at:
   *   - all reads/writes to table data
   *   - checking for split
   * Locked in exclusive mode when the list of component stores is modified:
   *   - closing
   *   - completing a compaction
   */
  final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  /**
   * Lock specific to archiving compacted store files.  This avoids races around
   * the combination of retrieving the list of compacted files and moving them to
   * the archive directory.  Since this is usually a background process (other than
   * on close), we don't want to handle this with the store write lock, which would
   * block readers and degrade performance.
   *
   * Locked by:
   *   - CompactedHFilesDispatchHandler via closeAndArchiveCompactedFiles()
   *   - close()
   */
  final ReentrantLock archiveLock = new ReentrantLock();

  private final boolean verifyBulkLoads;

  /**
   * Use this counter to track concurrent puts. If TRACE-level logging is enabled and we exceed
   * the threshold set by hbase.region.store.parallel.put.print.threshold (default: 50), we log a
   * message identifying the Store that is experiencing this high level of concurrency.
   */
  private final AtomicInteger currentParallelPutCount = new AtomicInteger(0);
  private final int parallelPutCountPrintThreshold;

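  // Scan settings for this store (TTL, min versions, time to purge deletes, comparator), built
  // from the column family descriptor and configuration in the constructor.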
  private ScanInfo scanInfo;

  // All access must be synchronized.
  // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it.
  private final List<HStoreFile> filesCompacting = Lists.newArrayList();

  // All access must be synchronized.
  private final Set<ChangedReadersObserver> changedReaderObservers =
    Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());

  protected final int blocksize;
  private HFileDataBlockEncoder dataBlockEncoder;

  /** Checksum configuration */
  protected ChecksumType checksumType;
  protected int bytesPerChecksum;

  // Comparing KeyValues
  protected final CellComparator comparator;

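  // Pluggable engine that supplies this store's store file manager and store flusher; created in
  // the constructor via createStoreEngine().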
  final StoreEngine<?, ?, ?, ?> storeEngine;

  private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
  private volatile OffPeakHours offPeakHours;

  private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
  private int flushRetriesNumber;
  private int pauseTime;

  private long blockingFileCount;
  private int compactionCheckMultiplier;
  protected Encryption.Context cryptoContext = Encryption.Context.NONE;

  private AtomicLong flushedCellsCount = new AtomicLong();
  private AtomicLong compactedCellsCount = new AtomicLong();
  private AtomicLong majorCompactedCellsCount = new AtomicLong();
  private AtomicLong flushedCellsSize = new AtomicLong();
  private AtomicLong flushedOutputFileSize = new AtomicLong();
  private AtomicLong compactedCellsSize = new AtomicLong();
  private AtomicLong majorCompactedCellsSize = new AtomicLong();

  /**
   * Constructor
   * @param region the HRegion this store belongs to
   * @param family ColumnFamilyDescriptor for this column family
   * @param confParam configuration object
   * @param warmup true if the region is being warmed up; compacted store files are then kept
   *          rather than archived
   * @throws IOException
   */
  protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
      final Configuration confParam, boolean warmup) throws IOException {

    this.fs = region.getRegionFileSystem();

    // Assemble the store's home directory and ensure it exists.
    fs.createStoreDir(family.getNameAsString());
    this.region = region;
    this.family = family;
    // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
    // CompoundConfiguration will look for keys in reverse order of addition, so we'd
    // add global config first, then table and cf overrides, then cf metadata.
    this.conf = new CompoundConfiguration()
      .add(confParam)
      .addBytesMap(region.getTableDescriptor().getValues())
      .addStringMap(family.getConfiguration())
      .addBytesMap(family.getValues());
    this.blocksize = family.getBlocksize();

    // set block storage policy for store directory
    String policyName = family.getStoragePolicy();
    if (null == policyName) {
      policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY, DEFAULT_BLOCK_STORAGE_POLICY);
    }
    this.fs.setStoragePolicy(family.getNameAsString(), policyName.trim());

    this.dataBlockEncoder = new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());

    this.comparator = region.getCellComparator();
    // used by ScanQueryMatcher
    long timeToPurgeDeletes =
        Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
    LOG.trace("Time to purge deletes set to {}ms in store {}", timeToPurgeDeletes, this);
    // Get TTL
    long ttl = determineTTLFromFamily(family);
    // Why not just pass a HColumnDescriptor in here altogether?  Even if have
    // to clone it?
    scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.comparator);
    this.memstore = getMemstore();

    this.offPeakHours = OffPeakHours.getInstance(conf);

    // Setting up cache configuration for this family
    createCacheConf(family);

    this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);

    this.blockingFileCount =
        conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
    this.compactionCheckMultiplier = conf.getInt(
        COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
    if (this.compactionCheckMultiplier <= 0) {
      LOG.error("Compaction check period multiplier must be positive, setting default: {}",
          DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
      this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
    }

    if (HStore.closeCheckInterval == 0) {
      HStore.closeCheckInterval = conf.getInt(
          "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
    }

    this.storeEngine = createStoreEngine(this, this.conf, this.comparator);
    List<HStoreFile> hStoreFiles = loadStoreFiles(warmup);
    // The storeSize calculation is done here rather than in loadStoreFiles() because a secondary
    // read replica's refreshStoreFiles() also uses loadStoreFiles() to refresh its store files
    // and updates storeSize at the end of completeCompaction(..) (just like compaction does), so
    // there is no need to calculate storeSize twice.
    this.storeSize.addAndGet(getStorefilesSize(hStoreFiles, sf -> true));
    this.totalUncompressedBytes.addAndGet(getTotalUncompressedBytes(hStoreFiles));
    this.storeEngine.getStoreFileManager().loadFiles(hStoreFiles);

    // Initialize checksum type from name. The names are CRC32, CRC32C, etc.
    this.checksumType = getChecksumType(conf);
    // Initialize bytes per checksum
    this.bytesPerChecksum = getBytesPerChecksum(conf);
    flushRetriesNumber = conf.getInt(
        "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
    pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
    if (flushRetriesNumber <= 0) {
      throw new IllegalArgumentException(
          "hbase.hstore.flush.retries.number must be > 0, not "
              + flushRetriesNumber);
    }
    cryptoContext = EncryptionUtil.createEncryptionContext(conf, family);

    int confPrintThreshold =
        this.conf.getInt("hbase.region.store.parallel.put.print.threshold", 50);
    if (confPrintThreshold < 10) {
      confPrintThreshold = 10;
    }
    this.parallelPutCountPrintThreshold = confPrintThreshold;
    LOG.info("Store={}, memstore type={}, storagePolicy={}, verifyBulkLoads={}, "
            + "parallelPutCountPrintThreshold={}, encoding={}, compression={}",
        getColumnFamilyName(), memstore.getClass().getSimpleName(), policyName, verifyBulkLoads,
        parallelPutCountPrintThreshold, family.getDataBlockEncoding(),
        family.getCompressionType());
  }

  /**
   * @return MemStore Instance to use in this store.
   */
  private MemStore getMemstore() {
    MemStore ms = null;
    // Check if in-memory-compaction is configured. Note MemoryCompactionPolicy is an enum!
    MemoryCompactionPolicy inMemoryCompaction = null;
    if (this.getTableName().isSystemTable()) {
      inMemoryCompaction = MemoryCompactionPolicy.valueOf(
          conf.get("hbase.systemtables.compacting.memstore.type", "NONE"));
    } else {
      inMemoryCompaction = family.getInMemoryCompaction();
    }
    if (inMemoryCompaction == null) {
      inMemoryCompaction =
          MemoryCompactionPolicy.valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
              CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT).toUpperCase());
    }
    switch (inMemoryCompaction) {
      case NONE:
        ms = ReflectionUtils.newInstance(DefaultMemStore.class,
            new Object[] { conf, this.comparator,
                this.getHRegion().getRegionServicesForStores()});
        break;
      default:
        Class<? extends CompactingMemStore> clz = conf.getClass(MEMSTORE_CLASS_NAME,
            CompactingMemStore.class, CompactingMemStore.class);
        ms = ReflectionUtils.newInstance(clz, new Object[]{conf, this.comparator, this,
            this.getHRegion().getRegionServicesForStores(), inMemoryCompaction});
    }
    return ms;
  }

  /**
   * Creates the cache config.
   * @param family The current column family.
   */
  protected void createCacheConf(final ColumnFamilyDescriptor family) {
    this.cacheConf = new CacheConfig(conf, family, region.getBlockCache(),
        region.getRegionServicesForStores().getByteBuffAllocator());
  }

  /**
   * Creates the store engine configured for the given Store.
   * @param store The store. An unfortunate dependency needed due to it
   *              being passed to coprocessors via the compactor.
   * @param conf Store configuration.
   * @param kvComparator KVComparator for storeFileManager.
   * @return StoreEngine to use.
   */
  protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
      CellComparator kvComparator) throws IOException {
    return StoreEngine.create(store, conf, comparator);
  }

  /**
   * @param family the column family descriptor
   * @return TTL in milliseconds of the specified family (Long.MAX_VALUE if unlimited)
   */
  public static long determineTTLFromFamily(final ColumnFamilyDescriptor family) {
    // HCD.getTimeToLive returns ttl in seconds.  Convert to milliseconds.
    long ttl = family.getTimeToLive();
    if (ttl == HConstants.FOREVER) {
      // Default is unlimited ttl.
      ttl = Long.MAX_VALUE;
    } else if (ttl == -1) {
      ttl = Long.MAX_VALUE;
    } else {
      // Second -> ms adjust for user data
      ttl *= 1000;
    }
    return ttl;
  }

  @Override
  public String getColumnFamilyName() {
    return this.family.getNameAsString();
  }

  @Override
  public TableName getTableName() {
    return this.getRegionInfo().getTable();
  }

  @Override
  public FileSystem getFileSystem() {
    return this.fs.getFileSystem();
  }

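  /**
   * @return the {@link HRegionFileSystem} for this store's region
   */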
  public HRegionFileSystem getRegionFileSystem() {
    return this.fs;
  }

  /* Implementation of StoreConfigInformation */
  @Override
  public long getStoreFileTtl() {
    // TTL only applies if there's no MIN_VERSIONs setting on the column.
    return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
  }

  @Override
  public long getMemStoreFlushSize() {
    // TODO: Why is this in here?  The flushsize of the region rather than the store?  St.Ack
    return this.region.memstoreFlushSize;
  }

  @Override
  public MemStoreSize getFlushableSize() {
    return this.memstore.getFlushableSize();
  }

  @Override
  public MemStoreSize getSnapshotSize() {
    return this.memstore.getSnapshotSize();
  }

  @Override
  public long getCompactionCheckMultiplier() {
    return this.compactionCheckMultiplier;
  }

  @Override
  public long getBlockingFileCount() {
    return blockingFileCount;
  }
  /* End implementation of StoreConfigInformation */

  /**
   * Returns the configured bytesPerChecksum value.
   * @param conf The configuration
   * @return The bytesPerChecksum that is set in the configuration
   */
  public static int getBytesPerChecksum(Configuration conf) {
    return conf.getInt(HConstants.BYTES_PER_CHECKSUM,
                       HFile.DEFAULT_BYTES_PER_CHECKSUM);
  }

  /**
   * Returns the configured checksum algorithm.
   * @param conf The configuration
   * @return The checksum algorithm that is set in the configuration
   */
  public static ChecksumType getChecksumType(Configuration conf) {
    String checksumName = conf.get(HConstants.CHECKSUM_TYPE_NAME);
    if (checksumName == null) {
      return ChecksumType.getDefaultChecksumType();
    } else {
      return ChecksumType.nameToType(checksumName);
    }
  }

  /**
   * @return how many bytes to write between status checks
   */
  public static int getCloseCheckInterval() {
    return closeCheckInterval;
  }

  @Override
  public ColumnFamilyDescriptor getColumnFamilyDescriptor() {
    return this.family;
  }

  @Override
  public OptionalLong getMaxSequenceId() {
    return StoreUtils.getMaxSequenceIdInList(this.getStorefiles());
  }

  @Override
  public OptionalLong getMaxMemStoreTS() {
    return StoreUtils.getMaxMemStoreTSInList(this.getStorefiles());
  }

  /**
   * @param tabledir {@link Path} to where the table is being stored
   * @param hri {@link RegionInfo} for the region.
   * @param family {@link ColumnFamilyDescriptor} describing the column family
   * @return Path to family/Store home directory.
   */
  @Deprecated
  public static Path getStoreHomedir(final Path tabledir,
      final RegionInfo hri, final byte[] family) {
    return getStoreHomedir(tabledir, hri.getEncodedName(), family);
  }

  /**
   * @param tabledir {@link Path} to where the table is being stored
   * @param encodedName Encoded region name.
   * @param family {@link ColumnFamilyDescriptor} describing the column family
   * @return Path to family/Store home directory.
   */
  @Deprecated
  public static Path getStoreHomedir(final Path tabledir,
      final String encodedName, final byte[] family) {
    return new Path(tabledir, new Path(encodedName, Bytes.toString(family)));
  }

  /**
   * @return the data block encoder
   */
  public HFileDataBlockEncoder getDataBlockEncoder() {
    return dataBlockEncoder;
  }

  /**
   * Should be used only in tests.
   * @param blockEncoder the block delta encoder to use
   */
  void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
    this.dataBlockEncoder = blockEncoder;
  }

  /**
   * Creates an unsorted list of StoreFiles loaded in parallel
   * from the given directory.
   * @throws IOException
   */
  private List<HStoreFile> loadStoreFiles(boolean warmup) throws IOException {
    Collection<StoreFileInfo> files = fs.getStoreFiles(getColumnFamilyName());
    return openStoreFiles(files, warmup);
  }

  private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, boolean warmup)
      throws IOException {
    if (CollectionUtils.isEmpty(files)) {
      return Collections.emptyList();
    }
    // initialize the thread pool for opening store files in parallel.
    ThreadPoolExecutor storeFileOpenerThreadPool =
      this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-"
        + this.region.getRegionInfo().getEncodedName() + "-" + this.getColumnFamilyName());
    CompletionService<HStoreFile> completionService =
      new ExecutorCompletionService<>(storeFileOpenerThreadPool);

    int totalValidStoreFile = 0;
    for (StoreFileInfo storeFileInfo : files) {
      // open each store file in parallel
      completionService.submit(() -> this.createStoreFileAndReader(storeFileInfo));
      totalValidStoreFile++;
    }

    Set<String> compactedStoreFiles = new HashSet<>();
    ArrayList<HStoreFile> results = new ArrayList<>(files.size());
    IOException ioe = null;
    try {
      for (int i = 0; i < totalValidStoreFile; i++) {
        try {
          HStoreFile storeFile = completionService.take().get();
          if (storeFile != null) {
            LOG.debug("loaded {}", storeFile);
            results.add(storeFile);
            compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles());
          }
        } catch (InterruptedException e) {
          if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
        } catch (ExecutionException e) {
          if (ioe == null) ioe = new IOException(e.getCause());
        }
      }
    } finally {
      storeFileOpenerThreadPool.shutdownNow();
    }
    if (ioe != null) {
      // close StoreFile readers
      boolean evictOnClose =
          cacheConf != null? cacheConf.shouldEvictOnClose(): true;
      for (HStoreFile file : results) {
        try {
          if (file != null) {
            file.closeStoreFile(evictOnClose);
          }
        } catch (IOException e) {
          LOG.warn("Could not close store file", e);
        }
      }
      throw ioe;
    }

    // Should not archive the compacted store files during region warmup. See HBASE-22163.
    if (!warmup) {
      // Remove the compacted files from result
      List<HStoreFile> filesToRemove = new ArrayList<>(compactedStoreFiles.size());
      for (HStoreFile storeFile : results) {
        if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
          LOG.warn("Clearing the compacted storefile {} from this store", storeFile);
          storeFile.getReader().close(true);
          filesToRemove.add(storeFile);
        }
      }
      results.removeAll(filesToRemove);
      if (!filesToRemove.isEmpty() && this.isPrimaryReplicaStore()) {
        LOG.debug("Moving the files {} to archive", filesToRemove);
        this.fs.removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(),
          filesToRemove);
      }
    }

    return results;
  }

  @Override
  public void refreshStoreFiles() throws IOException {
    Collection<StoreFileInfo> newFiles = fs.getStoreFiles(getColumnFamilyName());
    refreshStoreFilesInternal(newFiles);
  }

  /**
   * Replaces the store files that the store has with the given files. Mainly used by secondary
   * region replicas to keep up to date with the primary region files.
   * @throws IOException
   */
  public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
    List<StoreFileInfo> storeFiles = new ArrayList<>(newFiles.size());
    for (String file : newFiles) {
      storeFiles.add(fs.getStoreFileInfo(getColumnFamilyName(), file));
    }
    refreshStoreFilesInternal(storeFiles);
  }

  /**
   * Checks the underlying store files, opens the files that have not yet been opened, and
   * removes the store file readers for store files that are no longer available. Mainly used by
   * secondary region replicas to keep up to date with the primary region files.
   * @throws IOException
   */
  private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
    StoreFileManager sfm = storeEngine.getStoreFileManager();
    Collection<HStoreFile> currentFiles = sfm.getStorefiles();
    Collection<HStoreFile> compactedFiles = sfm.getCompactedfiles();
    if (currentFiles == null) currentFiles = Collections.emptySet();
    if (newFiles == null) newFiles = Collections.emptySet();
    if (compactedFiles == null) compactedFiles = Collections.emptySet();

    HashMap<StoreFileInfo, HStoreFile> currentFilesSet = new HashMap<>(currentFiles.size());
    for (HStoreFile sf : currentFiles) {
      currentFilesSet.put(sf.getFileInfo(), sf);
    }
    HashMap<StoreFileInfo, HStoreFile> compactedFilesSet = new HashMap<>(compactedFiles.size());
    for (HStoreFile sf : compactedFiles) {
      compactedFilesSet.put(sf.getFileInfo(), sf);
    }

    Set<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);
    // Exclude the files that have already been compacted
    newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet());
    Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
    Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);

    if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
      return;
    }

    LOG.info("Refreshing store files for region " + this.getRegionInfo().getRegionNameAsString()
      + " files to add: " + toBeAddedFiles + " files to remove: " + toBeRemovedFiles);

    Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size());
    for (StoreFileInfo sfi : toBeRemovedFiles) {
      toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
    }

    // try to open the files
    List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);

    // propagate the file changes to the underlying store file manager
    replaceStoreFiles(toBeRemovedStoreFiles, openedFiles); // won't throw an exception

    // Advance the memstore read point to be at least the new store files' seqIds so that
    // readers might pick them up. This assumes that the store is not getting any writes
    // (otherwise in-flight transactions might be made visible).
    if (!toBeAddedFiles.isEmpty()) {
      // we must have the max sequence id here as we do have several store files
      region.getMVCC().advanceTo(this.getMaxSequenceId().getAsLong());
    }

    completeCompaction(toBeRemovedStoreFiles);
  }

  @VisibleForTesting
  protected HStoreFile createStoreFileAndReader(final Path p) throws IOException {
    StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(),
        p, isPrimaryReplicaStore());
    return createStoreFileAndReader(info);
  }

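  // Wires the region's coprocessor host into the file info, then creates the HStoreFile and
  // eagerly opens its reader.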
  private HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
    info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
    HStoreFile storeFile = new HStoreFile(info, this.family.getBloomFilterType(), this.cacheConf);
    storeFile.initReader();
    return storeFile;
  }

  /**
   * Informs the MemStore that the updates that follow are part of replaying edits from the WAL.
   */
  public void startReplayingFromWAL(){
    this.memstore.startReplayingFromWAL();
  }

  /**
   * Informs the MemStore that replaying edits from the WAL is done.
   */
  public void stopReplayingFromWAL(){
    this.memstore.stopReplayingFromWAL();
  }

  /**
   * Adds a value to the memstore
   */
  public void add(final Cell cell, MemStoreSizing memstoreSizing) {
    lock.readLock().lock();
    try {
      if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
        LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
          this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName());
      }
      this.memstore.add(cell, memstoreSizing);
    } finally {
      lock.readLock().unlock();
      currentParallelPutCount.decrementAndGet();
    }
  }

  /**
   * Adds the specified cells to the memstore
   */
  public void add(final Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
    lock.readLock().lock();
    try {
      if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
        LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
          this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName());
      }
      memstore.add(cells, memstoreSizing);
    } finally {
      lock.readLock().unlock();
      currentParallelPutCount.decrementAndGet();
    }
  }

  @Override
  public long timeOfOldestEdit() {
    return memstore.timeOfOldestEdit();
  }

  /**
   * @return All store files.
   */
  @Override
  public Collection<HStoreFile> getStorefiles() {
    return this.storeEngine.getStoreFileManager().getStorefiles();
  }

  @Override
  public Collection<HStoreFile> getCompactedFiles() {
    return this.storeEngine.getStoreFileManager().getCompactedfiles();
  }

  /**
   * This throws a WrongRegionException if the HFile does not fit in this region, or an
   * InvalidHFileException if the HFile is not valid.
   */
  public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
    HFile.Reader reader = null;
    try {
      LOG.info("Validating hfile at " + srcPath + " for inclusion in "
          + "store " + this + " region " + this.getRegionInfo().getRegionNameAsString());
      FileSystem srcFs = srcPath.getFileSystem(conf);
      srcFs.access(srcPath, FsAction.READ_WRITE);
      reader = HFile.createReader(srcFs, srcPath, cacheConf, isPrimaryReplicaStore(), conf);

      Optional<byte[]> firstKey = reader.getFirstRowKey();
      Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
      Optional<Cell> lk = reader.getLastKey();
      Preconditions.checkState(lk.isPresent(), "Last key can not be null");
      byte[] lastKey = CellUtil.cloneRow(lk.get());

      if (LOG.isDebugEnabled()) {
        LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey.get()) +
            " last=" + Bytes.toStringBinary(lastKey));
        LOG.debug("Region bounds: first=" +
            Bytes.toStringBinary(getRegionInfo().getStartKey()) +
            " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
      }

      if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) {
        throw new WrongRegionException(
            "Bulk load file " + srcPath.toString() + " does not fit inside region "
            + this.getRegionInfo().getRegionNameAsString());
      }

      if (reader.length() > conf.getLong(HConstants.HREGION_MAX_FILESIZE,
          HConstants.DEFAULT_MAX_FILE_SIZE)) {
        LOG.warn("Trying to bulk load hfile " + srcPath + " with size " + reader.length() +
            " bytes; this can be problematic as it may lead to oversplitting.");
      }

      if (verifyBulkLoads) {
        long verificationStartTime = EnvironmentEdgeManager.currentTime();
        LOG.info("Full verification started for bulk load hfile: {}", srcPath);
        Cell prevCell = null;
        HFileScanner scanner = reader.getScanner(false, false, false);
        scanner.seekTo();
        do {
          Cell cell = scanner.getCell();
          if (prevCell != null) {
            if (comparator.compareRows(prevCell, cell) > 0) {
              throw new InvalidHFileException("Previous row is greater than"
                  + " current row: path=" + srcPath + " previous="
                  + CellUtil.getCellKeyAsString(prevCell) + " current="
                  + CellUtil.getCellKeyAsString(cell));
            }
            if (CellComparator.getInstance().compareFamilies(prevCell, cell) != 0) {
              throw new InvalidHFileException("Previous key had different"
                  + " family compared to current key: path=" + srcPath
                  + " previous="
                  + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(),
                      prevCell.getFamilyLength())
                  + " current="
                  + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(),
                      cell.getFamilyLength()));
            }
          }
          prevCell = cell;
        } while (scanner.next());
        LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString()
            + " took " + (EnvironmentEdgeManager.currentTime() - verificationStartTime)
            + " ms");
      }
    } finally {
      if (reader != null) reader.close();
    }
  }

  /**
   * This method should only be called from Region. It is assumed that the ranges of values in the
   * HFile fit within the store's assigned region. (assertBulkLoadHFileOk checks this.)
   *
   * @param srcPathStr path of the HFile to bulk load
   * @param seqNum sequence Id associated with the HFile
   */
  public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
    Path srcPath = new Path(srcPathStr);
    return fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
  }

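  /**
   * Commits a bulk load: moves the source HFile into the store as dstPath, notifies the
   * coprocessor host (if any), then opens a reader on the committed file and adds it to this
   * store's file list.
   * @return the path the HFile was committed to
   */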
  public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException {
    Path srcPath = new Path(srcPathStr);
    try {
      fs.commitStoreFile(srcPath, dstPath);
    } finally {
      if (this.getCoprocessorHost() != null) {
        this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath);
      }
    }

    LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as "
        + dstPath + " - updating store file list.");

    HStoreFile sf = createStoreFileAndReader(dstPath);
    bulkLoadHFile(sf);

    LOG.info("Successfully loaded store file {} into store {} (new location: {})",
        srcPath, this, dstPath);

    return dstPath;
  }

  public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
    HStoreFile sf = createStoreFileAndReader(fileInfo);
    bulkLoadHFile(sf);
  }

  private void bulkLoadHFile(HStoreFile sf) throws IOException {
    StoreFileReader r = sf.getReader();
    this.storeSize.addAndGet(r.length());
    this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());

    // Append the new storefile into the list
    this.lock.writeLock().lock();
    try {
      this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
    } finally {
      // We need the lock, as long as we are updating the storeFiles
      // or changing the memstore. Let us release it before calling
      // notifyChangeReadersObservers. See HBASE-4485 for a possible
      // deadlock scenario that could have happened if continue to hold
      // the lock.
      this.lock.writeLock().unlock();
    }
    LOG.info("Loaded HFile " + sf.getFileInfo() + " into store '" + getColumnFamilyName() + "'");
    if (LOG.isTraceEnabled()) {
      String traceMessage = "BULK LOAD time,size,store size,store files ["
          + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize
          + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
      LOG.trace(traceMessage);
    }
  }

  /**
   * Close all the readers. We don't need to worry about subsequent requests because the Region
   * holds a write lock that will prevent any more reads or writes.
   * @return the {@link StoreFile StoreFiles} that were previously being used.
   * @throws IOException on failure
   */
  public ImmutableCollection<HStoreFile> close() throws IOException {
    this.archiveLock.lock();
    this.lock.writeLock().lock();
    try {
      // Clear so metrics doesn't find them.
      ImmutableCollection<HStoreFile> result = storeEngine.getStoreFileManager().clearFiles();
      Collection<HStoreFile> compactedfiles =
          storeEngine.getStoreFileManager().clearCompactedFiles();
      // clear the compacted files
      if (CollectionUtils.isNotEmpty(compactedfiles)) {
        removeCompactedfiles(compactedfiles);
      }
      if (!result.isEmpty()) {
        // initialize the thread pool for closing store files in parallel.
        ThreadPoolExecutor storeFileCloserThreadPool = this.region
            .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
              + this.region.getRegionInfo().getEncodedName() + "-" + this.getColumnFamilyName());

        // close each store file in parallel
        CompletionService<Void> completionService =
          new ExecutorCompletionService<>(storeFileCloserThreadPool);
        for (HStoreFile f : result) {
          completionService.submit(new Callable<Void>() {
            @Override
            public Void call() throws IOException {
              boolean evictOnClose =
                  cacheConf != null? cacheConf.shouldEvictOnClose(): true;
              f.closeStoreFile(evictOnClose);
              return null;
            }
          });
        }

        IOException ioe = null;
        try {
          for (int i = 0; i < result.size(); i++) {
            try {
              Future<Void> future = completionService.take();
              future.get();
            } catch (InterruptedException e) {
              if (ioe == null) {
                ioe = new InterruptedIOException();
                ioe.initCause(e);
              }
            } catch (ExecutionException e) {
              if (ioe == null) ioe = new IOException(e.getCause());
            }
          }
        } finally {
          storeFileCloserThreadPool.shutdownNow();
        }
        if (ioe != null) throw ioe;
      }
      LOG.trace("Closed {}", this);
      return result;
    } finally {
      this.lock.writeLock().unlock();
      this.archiveLock.unlock();
    }
  }

  /**
   * Snapshot this store's memstore. Call before running
   * {@link #flushCache(long, MemStoreSnapshot, MonitoredTask, ThroughputController,
   * FlushLifeCycleTracker)}
   * so it has some work to do.
   */
  void snapshot() {
    this.lock.writeLock().lock();
    try {
      this.memstore.snapshot();
    } finally {
      this.lock.writeLock().unlock();
    }
  }

  /**
   * Write out current snapshot. Presumes {@link #snapshot()} has been called previously.
   * @param logCacheFlushId flush sequence number
   * @param snapshot
   * @param status
   * @param throughputController
   * @return The path names of the tmp files to which the store was flushed
   * @throws IOException if exception occurs during process
   */
  protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
      MonitoredTask status, ThroughputController throughputController,
      FlushLifeCycleTracker tracker) throws IOException {
    // If an exception happens flushing, we let it out without clearing
    // the memstore snapshot.  The old snapshot will be returned when we say
    // 'snapshot', the next time flush comes around.
    // Retry after catching exception when flushing, otherwise server will abort
    // itself
    StoreFlusher flusher = storeEngine.getStoreFlusher();
    IOException lastException = null;
    for (int i = 0; i < flushRetriesNumber; i++) {
      try {
        List<Path> pathNames =
            flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController, tracker);
        Path lastPathName = null;
        try {
          for (Path pathName : pathNames) {
            lastPathName = pathName;
            validateStoreFile(pathName);
          }
          return pathNames;
        } catch (Exception e) {
          LOG.warn("Failed validating store file {}, retrying num={}", lastPathName, i, e);
          if (e instanceof IOException) {
            lastException = (IOException) e;
          } else {
            lastException = new IOException(e);
          }
        }
      } catch (IOException e) {
        LOG.warn("Failed flushing store file, retrying num={}", i, e);
        lastException = e;
      }
      if (lastException != null && i < (flushRetriesNumber - 1)) {
        try {
          Thread.sleep(pauseTime);
        } catch (InterruptedException e) {
          IOException iie = new InterruptedIOException();
          iie.initCause(e);
          throw iie;
        }
      }
    }
    throw lastException;
  }

  /**
   * @param path The pathname of the tmp file into which the store was flushed
   * @param logCacheFlushId
   * @param status
   * @return store file created.
   * @throws IOException
   */
  private HStoreFile commitFile(Path path, long logCacheFlushId, MonitoredTask status)
      throws IOException {
    // Write-out finished successfully, move into the right spot
    Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);

    status.setStatus("Flushing " + this + ": reopening flushed file");
    HStoreFile sf = createStoreFileAndReader(dstPath);

    StoreFileReader r = sf.getReader();
    this.storeSize.addAndGet(r.length());
    this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());

    if (LOG.isInfoEnabled()) {
      LOG.info("Added " + sf + ", entries=" + r.getEntries() +
        ", sequenceid=" + logCacheFlushId +
        ", filesize=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1));
    }
    return sf;
  }

  /**
   * @param maxKeyCount
   * @param compression Compression algorithm to use
   * @param isCompaction whether we are creating a new file in a compaction
   * @param includeMVCCReadpoint - whether to include MVCC or not
   * @param includesTag - includesTag or not
   * @return Writer for a new StoreFile in the tmp dir.
   */
  // TODO : allow the Writer factory to create Writers of ShipperListener type only in case of
  // compaction
  public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
      boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
      boolean shouldDropBehind) throws IOException {
    final CacheConfig writerCacheConf;
    if (isCompaction) {
      // Don't cache data on write on compactions.
      writerCacheConf = new CacheConfig(cacheConf);
      writerCacheConf.setCacheDataOnWrite(false);
    } else {
      writerCacheConf = cacheConf;
    }
    InetSocketAddress[] favoredNodes = null;
    if (region.getRegionServerServices() != null) {
      favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
          region.getRegionInfo().getEncodedName());
    }
    HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
      cryptoContext);
    Path familyTempDir = new Path(fs.getTempDir(), family.getNameAsString());
    StoreFileWriter.Builder builder = new StoreFileWriter.Builder(conf, writerCacheConf,
        this.getFileSystem())
            .withOutputDir(familyTempDir)
            .withComparator(comparator)
            .withBloomType(family.getBloomFilterType())
            .withMaxKeyCount(maxKeyCount)
            .withFavoredNodes(favoredNodes)
            .withFileContext(hFileContext)
            .withShouldDropCacheBehind(shouldDropBehind)
            .withCompactedFilesSupplier(this::getCompactedFiles);
    return builder.build();
  }

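  // Builds the HFileContext for new store file writers from the family descriptor, checksum
  // settings, block size and the store's encryption context.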
  private HFileContext createFileContext(Compression.Algorithm compression,
      boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context cryptoContext) {
    if (compression == null) {
      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
    }
    HFileContext hFileContext = new HFileContextBuilder()
                                .withIncludesMvcc(includeMVCCReadpoint)
                                .withIncludesTags(includesTag)
                                .withCompression(compression)
                                .withCompressTags(family.isCompressTags())
                                .withChecksumType(checksumType)
                                .withBytesPerCheckSum(bytesPerChecksum)
                                .withBlockSize(blocksize)
                                .withHBaseCheckSum(true)
                                .withDataBlockEncoding(family.getDataBlockEncoding())
                                .withEncryptionContext(cryptoContext)
                                .withCreateTime(EnvironmentEdgeManager.currentTime())
                                .withColumnFamily(family.getName())
                                .withTableName(region.getTableDescriptor()
                                    .getTableName().getName())
                                .build();
    return hFileContext;
  }

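  /**
   * @return the combined on-disk size, in bytes, of the given store files as reported by their
   *         readers
   */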
  private long getTotalSize(Collection<HStoreFile> sfs) {
    return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum();
  }

  /**
   * Updates the store file list, adding into place the readers produced by this new flush.
   * @param sfs Store files
   * @param snapshotId
   * @throws IOException
   * @return Whether compaction is required.
   */
  private boolean updateStorefiles(List<HStoreFile> sfs, long snapshotId) throws IOException {
    this.lock.writeLock().lock();
    try {
      this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
      if (snapshotId > 0) {
        this.memstore.clearSnapshot(snapshotId);
      }
    } finally {
      // We need the lock, as long as we are updating the storeFiles
      // or changing the memstore. Let us release it before calling
      // notifyChangeReadersObservers. See HBASE-4485 for a possible
      // deadlock scenario that could have happened if continue to hold
      // the lock.
      this.lock.writeLock().unlock();
    }
    // notify to be called here - only in case of flushes
    notifyChangedReadersObservers(sfs);
    if (LOG.isTraceEnabled()) {
      long totalSize = getTotalSize(sfs);
      String traceMessage = "FLUSH time,count,size,store size,store files ["
          + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize
          + "," + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
      LOG.trace(traceMessage);
    }
    return needsCompaction();
  }

  /**
   * Notify all observers that set of Readers has changed.
   * @throws IOException
   */
  private void notifyChangedReadersObservers(List<HStoreFile> sfs) throws IOException {
    for (ChangedReadersObserver o : this.changedReaderObservers) {
      List<KeyValueScanner> memStoreScanners;
      this.lock.readLock().lock();
      try {
        memStoreScanners = this.memstore.getScanners(o.getReadPoint());
      } finally {
        this.lock.readLock().unlock();
      }
      o.updateReaders(sfs, memStoreScanners);
    }
  }

  /**
   * Get all scanners with no filtering based on TTL (that happens further down the line).
   * @param cacheBlocks cache the blocks or not
   * @param usePread true to use pread, false if not
   * @param isCompaction true if the scanner is created for compaction
   * @param matcher the scan query matcher
   * @param startRow the start row
   * @param stopRow the stop row
   * @param readPt the read point of the current scan
   * @return all scanners for this store
   */
  public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet, boolean usePread,
      boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt)
      throws IOException {
    return getScanners(cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow, false,
      readPt);
  }

  /**
   * Get all scanners with no filtering based on TTL (that happens further down the line).
   * @param cacheBlocks cache the blocks or not
   * @param usePread true to use pread, false if not
   * @param isCompaction true if the scanner is created for compaction
   * @param matcher the scan query matcher
   * @param startRow the start row
   * @param includeStartRow true to include start row, false if not
   * @param stopRow the stop row
   * @param includeStopRow true to include stop row, false if not
   * @param readPt the read point of the current scan
   * @return all scanners for this store
   */
  public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,
      boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow,
      byte[] stopRow, boolean includeStopRow, long readPt) throws IOException {
    Collection<HStoreFile> storeFilesToScan;
    List<KeyValueScanner> memStoreScanners;
    this.lock.readLock().lock();
    try {
      storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
        includeStartRow, stopRow, includeStopRow);
      memStoreScanners = this.memstore.getScanners(readPt);
    } finally {
      this.lock.readLock().unlock();
    }

    try {
      // First the store file scanners

      // TODO this used to get the store files in descending order,
      // but now we get them in ascending order, which I think is
      // actually more correct, since memstore get put at the end.
      List<StoreFileScanner> sfScanners = StoreFileScanner
        .getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, false,
          matcher, readPt);
      List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
      scanners.addAll(sfScanners);
      // Then the memstore scanners
      scanners.addAll(memStoreScanners);
      return scanners;
    } catch (Throwable t) {
      clearAndClose(memStoreScanners);
      throw t instanceof IOException ? (IOException) t : new IOException(t);
    }
  }

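  /**
   * Closes and clears the given scanners; a no-op if the list is null. Used to clean up memstore
   * scanners when assembling the full scanner list fails part way through.
   */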
1293  private static void clearAndClose(List<KeyValueScanner> scanners) {
1294    if (scanners == null) {
1295      return;
1296    }
1297    for (KeyValueScanner s : scanners) {
1298      s.close();
1299    }
1300    scanners.clear();
1301  }
1302
1303  /**
1304   * Create scanners on the given files and if needed on the memstore with no filtering based on TTL
1305   * (that happens further down the line).
1306   * @param files the list of files on which the scanners has to be created
1307   * @param cacheBlocks cache the blocks or not
1308   * @param usePread true to use pread, false if not
1309   * @param isCompaction true if the scanner is created for compaction
1310   * @param matcher the scan query matcher
1311   * @param startRow the start row
1312   * @param stopRow the stop row
1313   * @param readPt the read point of the current scan
1314   * @param includeMemstoreScanner true if memstore has to be included
1315   * @return scanners on the given files and on the memstore if specified
1316   */
1317  public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
1318      boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
1319      byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner)
1320      throws IOException {
1321    return getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow,
1322      false, readPt, includeMemstoreScanner);
1323  }
1324
1325  /**
1326   * Create scanners on the given files and if needed on the memstore with no filtering based on TTL
1327   * (that happens further down the line).
1328   * @param files the list of files on which the scanners have to be created
1329   * @param cacheBlocks cache the blocks or not
1330   * @param usePread true to use pread, false if not
1331   * @param isCompaction true if the scanner is created for compaction
1332   * @param matcher the scan query matcher
1333   * @param startRow the start row
1334   * @param includeStartRow true to include start row, false if not
1335   * @param stopRow the stop row
1336   * @param includeStopRow true to include stop row, false if not
1337   * @param readPt the read point of the current scan
1338   * @param includeMemstoreScanner true if memstore has to be included
1339   * @return scanners on the given files and on the memstore if specified
1340   */
1341  public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
1342      boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1343      boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
1344      boolean includeMemstoreScanner) throws IOException {
1345    List<KeyValueScanner> memStoreScanners = null;
1346    if (includeMemstoreScanner) {
1347      this.lock.readLock().lock();
1348      try {
1349        memStoreScanners = this.memstore.getScanners(readPt);
1350      } finally {
1351        this.lock.readLock().unlock();
1352      }
1353    }
1354    try {
1355      List<StoreFileScanner> sfScanners = StoreFileScanner
1356        .getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, false, matcher,
1357          readPt);
1358      List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
1359      scanners.addAll(sfScanners);
1360      // Then the memstore scanners
1361      if (memStoreScanners != null) {
1362        scanners.addAll(memStoreScanners);
1363      }
1364      return scanners;
1365    } catch (Throwable t) {
1366      clearAndClose(memStoreScanners);
1367      throw t instanceof IOException ? (IOException) t : new IOException(t);
1368    }
1369  }
1370
1371  /**
1372   * @param o Observer who wants to know about changes in set of Readers
1373   */
1374  public void addChangedReaderObserver(ChangedReadersObserver o) {
1375    this.changedReaderObservers.add(o);
1376  }
1377
1378  /**
1379   * @param o Observer no longer interested in changes in set of Readers.
1380   */
1381  public void deleteChangedReaderObserver(ChangedReadersObserver o) {
1382    // We don't check if the observer is present; it may legitimately not be.
1383    this.changedReaderObservers.remove(o);
1384  }
1385
1386  //////////////////////////////////////////////////////////////////////////////
1387  // Compaction
1388  //////////////////////////////////////////////////////////////////////////////
1389
1390  /**
1391   * Compact the StoreFiles.  This method may take some time, so the calling
1392   * thread must be able to block for long periods.
1393   *
1394   * <p>During this time, the Store can work as usual, getting values from
1395   * StoreFiles and writing new StoreFiles from the memstore.
1396   *
1397   * Existing StoreFiles are not destroyed until the new compacted StoreFile is
1398   * completely written-out to disk.
1399   *
1400   * <p>The compactLock prevents multiple simultaneous compactions.
1401   * The structureLock prevents us from interfering with other write operations.
1402   *
1403   * <p>We don't want to hold the structureLock for the whole time, as a compact()
1404   * can be lengthy and we want to allow cache-flushes during this period.
1405   *
1406   * <p>Compaction events should be idempotent, since there is no IO fencing for
1407   * the region directory in HDFS. A region server might still try to complete the
1408   * compaction after it lost the region. That is why the following events are carefully
1409   * ordered for a compaction:
1410   *  1. Compaction writes new files under region/.tmp directory (compaction output)
1411   *  2. Compaction atomically moves the temporary file under region directory
1412   *  3. Compaction appends a WAL edit containing the compaction input and output files.
1413   *  Forces sync on WAL.
1414   *  4. Compaction deletes the input files from the region directory.
1415   *
1416   * Failure conditions are handled like this:
1417   *  - If RS fails before 2, compaction won't complete. Even if RS lives on and finishes
1418   *  the compaction later, it will only write the new data file to the region directory.
1419   *  Since we already have this data, this will be idempotent but we will have a redundant
1420   *  copy of the data.
1421   *  - If RS fails between 2 and 3, the region will have a redundant copy of the data. The
1422   *  RS that failed won't be able to finish sync() for the WAL because of lease recovery in WAL.
1423   *  - If RS fails after 3, the region server that opens the region will pick up the
1424   *  compaction marker from the WAL and replay it by removing the compaction input files.
1425   *  Failed RS can also attempt to delete those files, but the operation will be idempotent
1426   *
1427   * See HBASE-2231 for details.
1428   *
1429   * @param compaction compaction details obtained from requestCompaction()
1430   * @throws IOException
1431   * @return The storefiles we compacted into, or null if we failed or opted out early.
1432   */
1433  public List<HStoreFile> compact(CompactionContext compaction,
1434    ThroughputController throughputController, User user) throws IOException {
1435    assert compaction != null;
1436    CompactionRequestImpl cr = compaction.getRequest();
1437    try {
1438      // Do all sanity checking in here if we have a valid CompactionRequestImpl
1439      // because we need to clean up after it on the way out in a finally
1440      // block below
1441      long compactionStartTime = EnvironmentEdgeManager.currentTime();
1442      assert compaction.hasSelection();
1443      Collection<HStoreFile> filesToCompact = cr.getFiles();
1444      assert !filesToCompact.isEmpty();
1445      synchronized (filesCompacting) {
1446        // sanity check: we're compacting files that this store knows about
1447        // TODO: change this to LOG.error() after more debugging
1448        Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
1449      }
1450
1451      // Ready to go. Have list of files to compact.
1452      LOG.info("Starting compaction of " + filesToCompact +
1453        " into tmpdir=" + fs.getTempDir() + ", totalSize=" +
1454          TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));
1455
1456      return doCompaction(cr, filesToCompact, user, compactionStartTime,
1457          compaction.compact(throughputController, user));
1458    } finally {
1459      finishCompactionRequest(cr);
1460    }
1461  }
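
  /*
   * Illustrative sketch (not part of the original source) of how a caller might drive a
   * compaction end to end; "controller" is assumed to be a ThroughputController and "runNow"
   * is a hypothetical decision flag:
   *
   *   Optional<CompactionContext> compaction = store.requestCompaction();
   *   if (compaction.isPresent()) {
   *     if (runNow) {
   *       store.compact(compaction.get(), controller, null);    // may block for a long time
   *     } else {
   *       store.cancelRequestedCompaction(compaction.get());    // release the selected files
   *     }
   *   }
   */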
1462
1463  @VisibleForTesting
1464  protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
1465      Collection<HStoreFile> filesToCompact, User user, long compactionStartTime,
1466      List<Path> newFiles) throws IOException {
1467    // Do the steps necessary to complete the compaction.
1468    List<HStoreFile> sfs = moveCompactedFilesIntoPlace(cr, newFiles, user);
1469    writeCompactionWalRecord(filesToCompact, sfs);
1470    replaceStoreFiles(filesToCompact, sfs);
1471    if (cr.isMajor()) {
1472      majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
1473      majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
1474    } else {
1475      compactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
1476      compactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
1477    }
1478    long outputBytes = getTotalSize(sfs);
1479
1480    // At this point the store will use new files for all new scanners.
1481    completeCompaction(filesToCompact); // update store size.
1482
1483    long now = EnvironmentEdgeManager.currentTime();
1484    if (region.getRegionServerServices() != null
1485        && region.getRegionServerServices().getMetrics() != null) {
1486      region.getRegionServerServices().getMetrics().updateCompaction(
1487          region.getTableDescriptor().getTableName().getNameAsString(),
1488          cr.isMajor(), now - compactionStartTime, cr.getFiles().size(),
1489          newFiles.size(), cr.getSize(), outputBytes);
1490
1491    }
1492
1493    logCompactionEndMessage(cr, sfs, now, compactionStartTime);
1494    return sfs;
1495  }
1496
1497  private List<HStoreFile> moveCompactedFilesIntoPlace(CompactionRequestImpl cr, List<Path> newFiles,
1498      User user) throws IOException {
1499    List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
1500    for (Path newFile : newFiles) {
1501      assert newFile != null;
1502      HStoreFile sf = moveFileIntoPlace(newFile);
1503      if (this.getCoprocessorHost() != null) {
1504        getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
1505      }
1506      assert sf != null;
1507      sfs.add(sf);
1508    }
1509    return sfs;
1510  }
1511
1512  // Package-visible for tests
1513  HStoreFile moveFileIntoPlace(Path newFile) throws IOException {
1514    validateStoreFile(newFile);
1515    // Move the file into the right spot
1516    Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile);
1517    return createStoreFileAndReader(destPath);
1518  }
1519
1520  /**
1521   * Writes the compaction WAL record.
1522   * @param filesCompacted Files compacted (input).
1523   * @param newFiles Files from compaction.
1524   */
1525  private void writeCompactionWalRecord(Collection<HStoreFile> filesCompacted,
1526      Collection<HStoreFile> newFiles) throws IOException {
1527    if (region.getWAL() == null) {
1528      return;
1529    }
1530    List<Path> inputPaths =
1531        filesCompacted.stream().map(HStoreFile::getPath).collect(Collectors.toList());
1532    List<Path> outputPaths =
1533        newFiles.stream().map(HStoreFile::getPath).collect(Collectors.toList());
1534    RegionInfo info = this.region.getRegionInfo();
1535    CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
1536        family.getName(), inputPaths, outputPaths, fs.getStoreDir(getColumnFamilyDescriptor().getNameAsString()));
1537    // Fix reaching into Region to get the maxWaitForSeqId.
1538    // Does this method belong in Region altogether given it is making so many references up there?
1539    // Could be Region#writeCompactionMarker(compactionDescriptor);
1540    WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(),
1541        this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC());
1542  }
1543
1544  @VisibleForTesting
1545  void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result)
1546      throws IOException {
1547    this.lock.writeLock().lock();
1548    try {
1549      this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result);
1550      synchronized (filesCompacting) {
1551        filesCompacting.removeAll(compactedFiles);
1552      }
1553
1554      // These may be null when the RS is shutting down. The space quota Chores will fix the Region
1555      // sizes later so it's not super-critical if we miss these.
1556      RegionServerServices rsServices = region.getRegionServerServices();
1557      if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) {
1558        updateSpaceQuotaAfterFileReplacement(
1559            rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(),
1560            compactedFiles, result);
1561      }
1562    } finally {
1563      this.lock.writeLock().unlock();
1564    }
1565  }
1566
1567  /**
1568   * Updates the space quota usage for this region, removing the size for files compacted away
1569   * and adding in the size for new files.
1570   *
1571   * @param sizeStore The object tracking changes in region size for space quotas.
1572   * @param regionInfo The identifier for the region whose size is being updated.
1573   * @param oldFiles Files removed from this store's region.
1574   * @param newFiles Files added to this store's region.
1575   */
1576  void updateSpaceQuotaAfterFileReplacement(
1577      RegionSizeStore sizeStore, RegionInfo regionInfo, Collection<HStoreFile> oldFiles,
1578      Collection<HStoreFile> newFiles) {
1579    long delta = 0;
1580    if (oldFiles != null) {
1581      for (HStoreFile compactedFile : oldFiles) {
1582        if (compactedFile.isHFile()) {
1583          delta -= compactedFile.getReader().length();
1584        }
1585      }
1586    }
1587    if (newFiles != null) {
1588      for (HStoreFile newFile : newFiles) {
1589        if (newFile.isHFile()) {
1590          delta += newFile.getReader().length();
1591        }
1592      }
1593    }
1594    sizeStore.incrementRegionSize(regionInfo, delta);
1595  }
1596
1597  /**
1598   * Log a very elaborate compaction completion message.
1599   * @param cr Request.
1600   * @param sfs Resulting files.
1601   * @param compactionStartTime Start time.
1602   */
1603  private void logCompactionEndMessage(
1604      CompactionRequestImpl cr, List<HStoreFile> sfs, long now, long compactionStartTime) {
1605    StringBuilder message = new StringBuilder(
1606      "Completed" + (cr.isMajor() ? " major" : "") + " compaction of "
1607      + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "") + " file(s) in "
1608      + this + " of " + this.getRegionInfo().getShortNameToLog() + " into ");
1609    if (sfs.isEmpty()) {
1610      message.append("none, ");
1611    } else {
1612      for (HStoreFile sf: sfs) {
1613        message.append(sf.getPath().getName());
1614        message.append("(size=");
1615        message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1));
1616        message.append("), ");
1617      }
1618    }
1619    message.append("total size for store is ")
1620      .append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1))
1621      .append(". This selection was in queue for ")
1622      .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
1623      .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
1624      .append(" to execute.");
1625    LOG.info(message.toString());
1626    if (LOG.isTraceEnabled()) {
1627      int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
1628      long resultSize = getTotalSize(sfs);
1629      String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
1630        + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
1631          + cr.getFiles().size() + "," + sfs.size() + "," +  storeSize + "," + fileCount + "]";
1632      LOG.trace(traceMessage);
1633    }
1634  }
1635
1636  /**
1637   * Call to complete a compaction. It's for the case where we find in the WAL a compaction
1638   * that was not finished. We could find one when recovering a WAL after a regionserver crash.
1639   * See HBASE-2231.
1640   * @param compaction
1641   */
1642  public void replayCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
1643      boolean removeFiles) throws IOException {
1644    LOG.debug("Completing compaction from the WAL marker");
1645    List<String> compactionInputs = compaction.getCompactionInputList();
1646    List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList());
1647
1648    // The Compaction Marker is written after the compaction is completed,
1649    // and the files moved into the region/family folder.
1650    //
1651    // If we crash after the entry is written, we may not have removed the
1652    // input files, but the output file is present.
1653    // (The unremoved input files will be removed by this function)
1654    //
1655    // If we scan the directory and the file is not present, it can mean that:
1656    //   - The file was manually removed by the user
1657    //   - The file was removed as consequence of subsequent compaction
1658    // so, we can't do anything with the "compaction output list" because those
1659    // files have already been loaded when opening the region (by virtue of
1660    // being in the store's folder) or they may be missing due to a compaction.
1661
1662    String familyName = this.getColumnFamilyName();
1663    Set<String> inputFiles = new HashSet<>();
1664    for (String compactionInput : compactionInputs) {
1665      Path inputPath = fs.getStoreFilePath(familyName, compactionInput);
1666      inputFiles.add(inputPath.getName());
1667    }
1668
1669    // Some of the input files might already be deleted.
1670    List<HStoreFile> inputStoreFiles = new ArrayList<>(compactionInputs.size());
1671    for (HStoreFile sf : this.getStorefiles()) {
1672      if (inputFiles.contains(sf.getPath().getName())) {
1673        inputStoreFiles.add(sf);
1674      }
1675    }
1676
1677    // check whether we need to pick up the new files
1678    List<HStoreFile> outputStoreFiles = new ArrayList<>(compactionOutputs.size());
1679
1680    if (pickCompactionFiles) {
1681      for (HStoreFile sf : this.getStorefiles()) {
1682        compactionOutputs.remove(sf.getPath().getName());
1683      }
1684      for (String compactionOutput : compactionOutputs) {
1685        StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), compactionOutput);
1686        HStoreFile storeFile = createStoreFileAndReader(storeFileInfo);
1687        outputStoreFiles.add(storeFile);
1688      }
1689    }
1690
1691    if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) {
1692      LOG.info("Replaying compaction marker, replacing input files: " +
1693          inputStoreFiles + " with output files : " + outputStoreFiles);
1694      this.replaceStoreFiles(inputStoreFiles, outputStoreFiles);
1695      this.completeCompaction(inputStoreFiles);
1696    }
1697  }
1698
1699  /**
1700   * This method tries to compact the N most recent files for testing.
1701   * Note that because compacting "recent" files only makes sense for some policies,
1702   * e.g. the default one, it assumes the default policy is in use. It doesn't consult the
1703   * policy at all, but instead builds the compaction candidate list by itself.
1704   * @param N Number of files.
1705   */
1706  @VisibleForTesting
1707  public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException {
1708    List<HStoreFile> filesToCompact;
1709    boolean isMajor;
1710
1711    this.lock.readLock().lock();
1712    try {
1713      synchronized (filesCompacting) {
1714        filesToCompact = Lists.newArrayList(storeEngine.getStoreFileManager().getStorefiles());
1715        if (!filesCompacting.isEmpty()) {
1716          // exclude all files older than the newest file we're currently
1717          // compacting. this allows us to preserve contiguity (HBASE-2856)
1718          HStoreFile last = filesCompacting.get(filesCompacting.size() - 1);
1719          int idx = filesToCompact.indexOf(last);
1720          Preconditions.checkArgument(idx != -1);
1721          filesToCompact.subList(0, idx + 1).clear();
1722        }
1723        int count = filesToCompact.size();
1724        if (N > count) {
1725          throw new RuntimeException("Not enough files");
1726        }
1727
1728        filesToCompact = filesToCompact.subList(count - N, count);
1729        isMajor = (filesToCompact.size() == storeEngine.getStoreFileManager().getStorefileCount());
1730        filesCompacting.addAll(filesToCompact);
1731        Collections.sort(filesCompacting, storeEngine.getStoreFileManager()
1732            .getStoreFileComparator());
1733      }
1734    } finally {
1735      this.lock.readLock().unlock();
1736    }
1737
1738    try {
1739      // Ready to go. Have list of files to compact.
1740      List<Path> newFiles = ((DefaultCompactor)this.storeEngine.getCompactor())
1741          .compactForTesting(filesToCompact, isMajor);
1742      for (Path newFile: newFiles) {
1743        // Move the compaction into place.
1744        HStoreFile sf = moveFileIntoPlace(newFile);
1745        if (this.getCoprocessorHost() != null) {
1746          this.getCoprocessorHost().postCompact(this, sf, null, null, null);
1747        }
1748        replaceStoreFiles(filesToCompact, Collections.singletonList(sf));
1749        completeCompaction(filesToCompact);
1750      }
1751    } finally {
1752      synchronized (filesCompacting) {
1753        filesCompacting.removeAll(filesToCompact);
1754      }
1755    }
1756  }
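
  /*
   * Illustrative test-only sketch (not part of the original source): with the default policy in
   * place, a unit test could force the three newest store files to be compacted together via
   *
   *   store.compactRecentForTestingAssumingDefaultPolicy(3);
   *
   * and then assert on store.getStorefilesCount() afterwards.
   */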
1757
1758  @Override
1759  public boolean hasReferences() {
1760    // Grab the read lock here to ensure that we only see the complete store file list
1761    // after the atomic replaceStoreFiles(..) has finished.
1762    this.lock.readLock().lock();
1763    try {
1764      // Merge the current store files with compacted files here due to HBASE-20940.
1765      Collection<HStoreFile> allStoreFiles = new ArrayList<>(getStorefiles());
1766      allStoreFiles.addAll(getCompactedFiles());
1767      return StoreUtils.hasReferences(allStoreFiles);
1768    } finally {
1769      this.lock.readLock().unlock();
1770    }
1771  }
1772
1773  /**
1774   * getter for CompactionProgress object
1775   * @return CompactionProgress object; can be null
1776   */
1777  public CompactionProgress getCompactionProgress() {
1778    return this.storeEngine.getCompactor().getProgress();
1779  }
1780
1781  @Override
1782  public boolean shouldPerformMajorCompaction() throws IOException {
1783    for (HStoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1784      // TODO: what are these reader checks all over the place?
1785      if (sf.getReader() == null) {
1786        LOG.debug("StoreFile {} has null Reader", sf);
1787        return false;
1788      }
1789    }
1790    return storeEngine.getCompactionPolicy().shouldPerformMajorCompaction(
1791        this.storeEngine.getStoreFileManager().getStorefiles());
1792  }
1793
1794  public Optional<CompactionContext> requestCompaction() throws IOException {
1795    return requestCompaction(NO_PRIORITY, CompactionLifeCycleTracker.DUMMY, null);
1796  }
1797
1798  public Optional<CompactionContext> requestCompaction(int priority,
1799      CompactionLifeCycleTracker tracker, User user) throws IOException {
1800    // don't even select for compaction if writes are disabled
1801    if (!this.areWritesEnabled()) {
1802      return Optional.empty();
1803    }
1804    // Before we do compaction, try to get rid of unneeded files to simplify things.
1805    removeUnneededFiles();
1806
1807    final CompactionContext compaction = storeEngine.createCompaction();
1808    CompactionRequestImpl request = null;
1809    this.lock.readLock().lock();
1810    try {
1811      synchronized (filesCompacting) {
1812        // First, see if coprocessor would want to override selection.
1813        if (this.getCoprocessorHost() != null) {
1814          final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
1815          boolean override = getCoprocessorHost().preCompactSelection(this,
1816              candidatesForCoproc, tracker, user);
1817          if (override) {
1818            // Coprocessor is overriding normal file selection.
1819            compaction.forceSelect(new CompactionRequestImpl(candidatesForCoproc));
1820          }
1821        }
1822
1823        // Normal case - coprocessor is not overriding file selection.
1824        if (!compaction.hasSelection()) {
1825          boolean isUserCompaction = priority == Store.PRIORITY_USER;
1826          boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
1827              offPeakCompactionTracker.compareAndSet(false, true);
1828          try {
1829            compaction.select(this.filesCompacting, isUserCompaction,
1830              mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
1831          } catch (IOException e) {
1832            if (mayUseOffPeak) {
1833              offPeakCompactionTracker.set(false);
1834            }
1835            throw e;
1836          }
1837          assert compaction.hasSelection();
1838          if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
1839            // Compaction policy doesn't want to take advantage of off-peak.
1840            offPeakCompactionTracker.set(false);
1841          }
1842        }
1843        if (this.getCoprocessorHost() != null) {
1844          this.getCoprocessorHost().postCompactSelection(
1845              this, ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker,
1846              compaction.getRequest(), user);
1847        }
1848        // Finally, we have the resulting files list. Check if we have any files at all.
1849        request = compaction.getRequest();
1850        Collection<HStoreFile> selectedFiles = request.getFiles();
1851        if (selectedFiles.isEmpty()) {
1852          return Optional.empty();
1853        }
1854
1855        addToCompactingFiles(selectedFiles);
1856
1857        // If we're enqueuing a major, clear the force flag.
1858        this.forceMajor = this.forceMajor && !request.isMajor();
1859
1860        // Set common request properties.
1861        // Set priority, either override value supplied by caller or from store.
1862        request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
1863        request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
1864        request.setTracker(tracker);
1865      }
1866    } finally {
1867      this.lock.readLock().unlock();
1868    }
1869
1870    if (LOG.isDebugEnabled()) {
1871      LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName()
1872          + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
1873          + (request.isAllFiles() ? " (all files)" : ""));
1874    }
1875    this.region.reportCompactionRequestStart(request.isMajor());
1876    return Optional.of(compaction);
1877  }
1878
1879  /** Adds the files to compacting files. filesCompacting must be locked. */
1880  private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) {
1881    if (CollectionUtils.isEmpty(filesToAdd)) {
1882      return;
1883    }
1884    // Check that we do not try to compact the same StoreFile twice.
1885    if (!Collections.disjoint(filesCompacting, filesToAdd)) {
1886      Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
1887    }
1888    filesCompacting.addAll(filesToAdd);
1889    Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator());
1890  }
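
  /*
   * Minimal sketch of the locking pattern callers are expected to follow (mirroring
   * requestCompaction() above), since filesCompacting is guarded by its own monitor:
   *
   *   synchronized (filesCompacting) {
   *     addToCompactingFiles(selectedFiles);
   *   }
   */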
1891
1892  private void removeUnneededFiles() throws IOException {
1893    if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) return;
1894    if (getColumnFamilyDescriptor().getMinVersions() > 0) {
1895      LOG.debug("Skipping expired store file removal due to min version being {}",
1896          getColumnFamilyDescriptor().getMinVersions());
1897      return;
1898    }
1899    this.lock.readLock().lock();
1900    Collection<HStoreFile> delSfs = null;
1901    try {
1902      synchronized (filesCompacting) {
1903        long cfTtl = getStoreFileTtl();
1904        if (cfTtl != Long.MAX_VALUE) {
1905          delSfs = storeEngine.getStoreFileManager().getUnneededFiles(
1906              EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting);
1907          addToCompactingFiles(delSfs);
1908        }
1909      }
1910    } finally {
1911      this.lock.readLock().unlock();
1912    }
1913
1914    if (CollectionUtils.isEmpty(delSfs)) {
1915      return;
1916    }
1917
1918    Collection<HStoreFile> newFiles = Collections.emptyList(); // No new files.
1919    writeCompactionWalRecord(delSfs, newFiles);
1920    replaceStoreFiles(delSfs, newFiles);
1921    completeCompaction(delSfs);
1922    LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in "
1923        + this + " of " + this.getRegionInfo().getRegionNameAsString()
1924        + "; total size for store is "
1925        + TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1));
1926  }
1927
1928  public void cancelRequestedCompaction(CompactionContext compaction) {
1929    finishCompactionRequest(compaction.getRequest());
1930  }
1931
1932  private void finishCompactionRequest(CompactionRequestImpl cr) {
1933    this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
1934    if (cr.isOffPeak()) {
1935      offPeakCompactionTracker.set(false);
1936      cr.setOffPeak(false);
1937    }
1938    synchronized (filesCompacting) {
1939      filesCompacting.removeAll(cr.getFiles());
1940    }
1941  }
1942
1943  /**
1944   * Validates a store file by opening and closing it. In HFileV2 this should not be an expensive
1945   * operation.
1946   * @param path the path to the store file
1947   */
1948  private void validateStoreFile(Path path) throws IOException {
1949    HStoreFile storeFile = null;
1950    try {
1951      storeFile = createStoreFileAndReader(path);
1952    } catch (IOException e) {
1953      LOG.error("Failed to open store file : {}, keeping it in tmp location", path, e);
1954      throw e;
1955    } finally {
1956      if (storeFile != null) {
1957        storeFile.closeStoreFile(false);
1958      }
1959    }
1960  }
1961
1962  /**
1963   * Update counts.
1964   * @param compactedFiles list of files that were compacted
1965   */
1966  @VisibleForTesting
1967  protected void completeCompaction(Collection<HStoreFile> compactedFiles)
1968    throws IOException {
1969    // TODO: Rename this method!
1970    this.storeSize.set(0L);
1971    this.totalUncompressedBytes.set(0L);
1972    for (HStoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1973      StoreFileReader r = hsf.getReader();
1974      if (r == null) {
1975        LOG.warn("StoreFile {} has a null Reader", hsf);
1976        continue;
1977      }
1978      this.storeSize.addAndGet(r.length());
1979      this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
1980    }
1981  }
1982
1983  /*
1984   * @param wantedVersions How many versions were asked for.
1985   * @return wantedVersions, or this family's configured maximum number of versions if that is smaller.
1986   */
1987  int versionsToReturn(final int wantedVersions) {
1988    if (wantedVersions <= 0) {
1989      throw new IllegalArgumentException("Number of versions must be > 0");
1990    }
1991    // Make sure we do not return more than maximum versions for this store.
1992    int maxVersions = this.family.getMaxVersions();
1993    return wantedVersions > maxVersions ? maxVersions: wantedVersions;
1994  }
1995
1996  @Override
1997  public boolean canSplit() {
1998    this.lock.readLock().lock();
1999    try {
2000      // Not splittable if we find a reference store file present in the store.
2001      boolean result = !hasReferences();
2002      if (!result) {
2003        LOG.trace("Not splittable; has references: {}", this);
2004      }
2005      return result;
2006    } finally {
2007      this.lock.readLock().unlock();
2008    }
2009  }
2010
2011  /**
2012   * Determines if this store can be split and, if so, returns the row to split at.
2013   */
2014  public Optional<byte[]> getSplitPoint() {
2015    this.lock.readLock().lock();
2016    try {
2017      // Should already be enforced by the split policy!
2018      assert !this.getRegionInfo().isMetaRegion();
2019      // Not splittable if we find a reference store file present in the store.
2020      if (hasReferences()) {
2021        LOG.trace("Not splittable; has references: {}", this);
2022        return Optional.empty();
2023      }
2024      return this.storeEngine.getStoreFileManager().getSplitPoint();
2025    } catch(IOException e) {
2026      LOG.warn("Failed getting store size for {}", this, e);
2027    } finally {
2028      this.lock.readLock().unlock();
2029    }
2030    return Optional.empty();
2031  }
2032
2033  @Override
2034  public long getLastCompactSize() {
2035    return this.lastCompactSize;
2036  }
2037
2038  @Override
2039  public long getSize() {
2040    return storeSize.get();
2041  }
2042
2043  public void triggerMajorCompaction() {
2044    this.forceMajor = true;
2045  }
2046
2047  //////////////////////////////////////////////////////////////////////////////
2048  // File administration
2049  //////////////////////////////////////////////////////////////////////////////
2050
2051  /**
2052   * Return a scanner for both the memstore and the HStore files. Assumes we are not in a
2053   * compaction.
2054   * @param scan Scan to apply when scanning the stores
2055   * @param targetCols columns to scan
2056   * @return a scanner over the current key values
2057   * @throws IOException on failure
2058   */
2059  public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols, long readPt)
2060      throws IOException {
2061    lock.readLock().lock();
2062    try {
2063      ScanInfo scanInfo;
2064      if (this.getCoprocessorHost() != null) {
2065        scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this);
2066      } else {
2067        scanInfo = getScanInfo();
2068      }
2069      return createScanner(scan, scanInfo, targetCols, readPt);
2070    } finally {
2071      lock.readLock().unlock();
2072    }
2073  }
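
  /*
   * Illustrative sketch (not part of the original source): a read over this store, assuming
   * readPt comes from the region's MVCC and cols is the requested set of qualifiers:
   *
   *   Scan scan = new Scan().withStartRow(startRow).withStopRow(stopRow);
   *   KeyValueScanner scanner = store.getScanner(scan, cols, readPt);
   */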
2074
2075  // HMobStore will override this method to return its own implementation.
2076  protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo,
2077      NavigableSet<byte[]> targetCols, long readPt) throws IOException {
2078    return scan.isReversed() ? new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt)
2079        : new StoreScanner(this, scanInfo, scan, targetCols, readPt);
2080  }
2081
2082  /**
2083   * Recreates the scanners on the current list of active store file scanners
2084   * @param currentFileScanners the current set of active store file scanners
2085   * @param cacheBlocks cache the blocks or not
2086   * @param usePread use pread or not
2087   * @param isCompaction is the scanner for compaction
2088   * @param matcher the scan query matcher
2089   * @param startRow the scan's start row
2090   * @param includeStartRow should the scan include the start row
2091   * @param stopRow the scan's stop row
2092   * @param includeStopRow should the scan include the stop row
2093   * @param readPt the read point of the current scan
2094   * @param includeMemstoreScanner whether the scanners should include a memstore scanner
2095   * @return the list of recreated scanners, or null if there is nothing to reopen
2096   * @throws IOException
2097   */
2098  public List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners,
2099      boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
2100      byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
2101      boolean includeMemstoreScanner) throws IOException {
2102    this.lock.readLock().lock();
2103    try {
2104      Map<String, HStoreFile> name2File =
2105          new HashMap<>(getStorefilesCount() + getCompactedFilesCount());
2106      for (HStoreFile file : getStorefiles()) {
2107        name2File.put(file.getFileInfo().getActiveFileName(), file);
2108      }
2109      Collection<HStoreFile> compactedFiles = getCompactedFiles();
2110      for (HStoreFile file : IterableUtils.emptyIfNull(compactedFiles)) {
2111        name2File.put(file.getFileInfo().getActiveFileName(), file);
2112      }
2113      List<HStoreFile> filesToReopen = new ArrayList<>();
2114      for (KeyValueScanner kvs : currentFileScanners) {
2115        assert kvs.isFileScanner();
2116        if (kvs.peek() == null) {
2117          continue;
2118        }
2119        filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
2120      }
2121      if (filesToReopen.isEmpty()) {
2122        return null;
2123      }
2124      return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow,
2125        includeStartRow, stopRow, includeStopRow, readPt, false);
2126    } finally {
2127      this.lock.readLock().unlock();
2128    }
2129  }
2130
2131  @Override
2132  public String toString() {
2133    return this.getColumnFamilyName();
2134  }
2135
2136  @Override
2137  public int getStorefilesCount() {
2138    return this.storeEngine.getStoreFileManager().getStorefileCount();
2139  }
2140
2141  @Override
2142  public int getCompactedFilesCount() {
2143    return this.storeEngine.getStoreFileManager().getCompactedFilesCount();
2144  }
2145
2146  private LongStream getStoreFileAgeStream() {
2147    return this.storeEngine.getStoreFileManager().getStorefiles().stream().filter(sf -> {
2148      if (sf.getReader() == null) {
2149        LOG.warn("StoreFile {} has a null Reader", sf);
2150        return false;
2151      } else {
2152        return true;
2153      }
2154    }).filter(HStoreFile::isHFile).mapToLong(sf -> sf.getFileInfo().getCreatedTimestamp())
2155        .map(t -> EnvironmentEdgeManager.currentTime() - t);
2156  }
2157
2158  @Override
2159  public OptionalLong getMaxStoreFileAge() {
2160    return getStoreFileAgeStream().max();
2161  }
2162
2163  @Override
2164  public OptionalLong getMinStoreFileAge() {
2165    return getStoreFileAgeStream().min();
2166  }
2167
2168  @Override
2169  public OptionalDouble getAvgStoreFileAge() {
2170    return getStoreFileAgeStream().average();
2171  }
2172
2173  @Override
2174  public long getNumReferenceFiles() {
2175    return this.storeEngine.getStoreFileManager().getStorefiles().stream()
2176        .filter(HStoreFile::isReference).count();
2177  }
2178
2179  @Override
2180  public long getNumHFiles() {
2181    return this.storeEngine.getStoreFileManager().getStorefiles().stream()
2182        .filter(HStoreFile::isHFile).count();
2183  }
2184
2185  @Override
2186  public long getStoreSizeUncompressed() {
2187    return this.totalUncompressedBytes.get();
2188  }
2189
2190  @Override
2191  public long getStorefilesSize() {
2192    // Include all StoreFiles
2193    return getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(), sf -> true);
2194  }
2195
2196  @Override
2197  public long getHFilesSize() {
2198    // Include only StoreFiles which are HFiles
2199    return getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(),
2200      HStoreFile::isHFile);
2201  }
2202
2203  private long getTotalUncompressedBytes(List<HStoreFile> files) {
2204    return files.stream()
2205      .mapToLong(file -> getStorefileFieldSize(file, StoreFileReader::getTotalUncompressedBytes))
2206      .sum();
2207  }
2208
2209  private long getStorefilesSize(Collection<HStoreFile> files, Predicate<HStoreFile> predicate) {
2210    return files.stream().filter(predicate)
2211      .mapToLong(file -> getStorefileFieldSize(file, StoreFileReader::length)).sum();
2212  }
2213
2214  private long getStorefileFieldSize(HStoreFile file, ToLongFunction<StoreFileReader> f) {
2215    if (file == null) {
2216      return 0L;
2217    }
2218    StoreFileReader reader = file.getReader();
2219    if (reader == null) {
2220      return 0L;
2221    }
2222    return f.applyAsLong(reader);
2223  }
2224
2225  private long getStorefilesFieldSize(ToLongFunction<StoreFileReader> f) {
2226    return this.storeEngine.getStoreFileManager().getStorefiles().stream()
2227      .mapToLong(file -> getStorefileFieldSize(file, f)).sum();
2228  }
2229
2230  @Override
2231  public long getStorefilesRootLevelIndexSize() {
2232    return getStorefilesFieldSize(StoreFileReader::indexSize);
2233  }
2234
2235  @Override
2236  public long getTotalStaticIndexSize() {
2237    return getStorefilesFieldSize(StoreFileReader::getUncompressedDataIndexSize);
2238  }
2239
2240  @Override
2241  public long getTotalStaticBloomSize() {
2242    return getStorefilesFieldSize(StoreFileReader::getTotalBloomSize);
2243  }
2244
2245  @Override
2246  public MemStoreSize getMemStoreSize() {
2247    return this.memstore.size();
2248  }
2249
2250  @Override
2251  public int getCompactPriority() {
2252    int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();
2253    if (priority == PRIORITY_USER) {
2254      LOG.warn("Compaction priority is USER despite there being no user compaction");
2255    }
2256    return priority;
2257  }
2258
2259  public boolean throttleCompaction(long compactionSize) {
2260    return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize);
2261  }
2262
2263  public HRegion getHRegion() {
2264    return this.region;
2265  }
2266
2267  public RegionCoprocessorHost getCoprocessorHost() {
2268    return this.region.getCoprocessorHost();
2269  }
2270
2271  @Override
2272  public RegionInfo getRegionInfo() {
2273    return this.fs.getRegionInfo();
2274  }
2275
2276  @Override
2277  public boolean areWritesEnabled() {
2278    return this.region.areWritesEnabled();
2279  }
2280
2281  @Override
2282  public long getSmallestReadPoint() {
2283    return this.region.getSmallestReadPoint();
2284  }
2285
2286  /**
2287   * Adds or replaces the specified KeyValues.
2288   * <p>
2289   * For each KeyValue specified, if a cell with the same row, family, and qualifier exists in
2290   * MemStore, it will be replaced. Otherwise, it will just be inserted to MemStore.
2291   * <p>
2292   * This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic
2293   * across all of them.
2294   * @param readpoint readpoint below which we can safely remove duplicate KVs
2295   * @throws IOException
2296   */
2297  public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing)
2298      throws IOException {
2299    this.lock.readLock().lock();
2300    try {
2301      this.memstore.upsert(cells, readpoint, memstoreSizing);
2302    } finally {
2303      this.lock.readLock().unlock();
2304    }
2305  }
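
  /*
   * Illustrative sketch (not part of the original source): callers that keep a single latest
   * value per row/family/qualifier can upsert a freshly built cell so it replaces the existing
   * one instead of accumulating versions; "newValueCell" and "sizing" are hypothetical:
   *
   *   store.upsert(Collections.singletonList(newValueCell), store.getSmallestReadPoint(), sizing);
   */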
2306
2307  public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTracker tracker) {
2308    return new StoreFlusherImpl(cacheFlushId, tracker);
2309  }
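
  /*
   * Illustrative sketch (not part of the original source) of the flush lifecycle that
   * StoreFlusherImpl below implements; the owning region drives it roughly in this order:
   *
   *   StoreFlushContext ctx = store.createFlushContext(flushSeqId, tracker);
   *   MemStoreSize flushed = ctx.prepare();   // snapshot the memstore
   *   ctx.flushCache(status);                 // write the snapshot out to temporary files
   *   boolean added = ctx.commit(status);     // commit the files to the store, drop the snapshot
   *   // on failure before commit: ctx.abort();
   */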
2310
2311  private final class StoreFlusherImpl implements StoreFlushContext {
2312
2313    private final FlushLifeCycleTracker tracker;
2314    private final long cacheFlushSeqNum;
2315    private MemStoreSnapshot snapshot;
2316    private List<Path> tempFiles;
2317    private List<Path> committedFiles;
2318    private long cacheFlushCount;
2319    private long cacheFlushSize;
2320    private long outputFileSize;
2321
2322    private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) {
2323      this.cacheFlushSeqNum = cacheFlushSeqNum;
2324      this.tracker = tracker;
2325    }
2326
2327    /**
2328     * This is not thread safe. The caller should have a lock on the region or the store.
2329     * If necessary, the lock can be added with the patch provided in HBASE-10087
2330     */
2331    @Override
2332    public MemStoreSize prepare() {
2333      // passing the current sequence number of the wal - to allow bookkeeping in the memstore
2334      this.snapshot = memstore.snapshot();
2335      this.cacheFlushCount = snapshot.getCellsCount();
2336      this.cacheFlushSize = snapshot.getDataSize();
2337      committedFiles = new ArrayList<>(1);
2338      return snapshot.getMemStoreSize();
2339    }
2340
2341    @Override
2342    public void flushCache(MonitoredTask status) throws IOException {
2343      RegionServerServices rsService = region.getRegionServerServices();
2344      ThroughputController throughputController =
2345          rsService == null ? null : rsService.getFlushThroughputController();
2346      tempFiles =
2347          HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController, tracker);
2348    }
2349
2350    @Override
2351    public boolean commit(MonitoredTask status) throws IOException {
2352      if (CollectionUtils.isEmpty(this.tempFiles)) {
2353        return false;
2354      }
2355      List<HStoreFile> storeFiles = new ArrayList<>(this.tempFiles.size());
2356      for (Path storeFilePath : tempFiles) {
2357        try {
2358          HStoreFile sf = HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status);
2359          outputFileSize += sf.getReader().length();
2360          storeFiles.add(sf);
2361        } catch (IOException ex) {
2362          LOG.error("Failed to commit store file {}", storeFilePath, ex);
2363          // Try to delete the files we have committed before.
2364          for (HStoreFile sf : storeFiles) {
2365            Path pathToDelete = sf.getPath();
2366            try {
2367              sf.deleteStoreFile();
2368            } catch (IOException deleteEx) {
2369              LOG.error(HBaseMarkers.FATAL, "Failed to delete store file we committed, "
2370                  + "halting {}", pathToDelete, ex);
2371              Runtime.getRuntime().halt(1);
2372            }
2373          }
2374          throw new IOException("Failed to commit the flush", ex);
2375        }
2376      }
2377
2378      for (HStoreFile sf : storeFiles) {
2379        if (HStore.this.getCoprocessorHost() != null) {
2380          HStore.this.getCoprocessorHost().postFlush(HStore.this, sf, tracker);
2381        }
2382        committedFiles.add(sf.getPath());
2383      }
2384
2385      HStore.this.flushedCellsCount.addAndGet(cacheFlushCount);
2386      HStore.this.flushedCellsSize.addAndGet(cacheFlushSize);
2387      HStore.this.flushedOutputFileSize.addAndGet(outputFileSize);
2388
2389      // Add new file to store files.  Clear snapshot too while we have the Store write lock.
2390      return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
2391    }
2392
2393    @Override
2394    public long getOutputFileSize() {
2395      return outputFileSize;
2396    }
2397
2398    @Override
2399    public List<Path> getCommittedFiles() {
2400      return committedFiles;
2401    }
2402
2403    /**
2404     * Similar to commit, but called in secondary region replicas for replaying the
2405     * flush cache from primary region. Adds the new files to the store, and drops the
2406     * snapshot depending on dropMemstoreSnapshot argument.
2407     * @param fileNames names of the flushed files
2408     * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot
2409     * @throws IOException
2410     */
2411    @Override
2412    public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
2413        throws IOException {
2414      List<HStoreFile> storeFiles = new ArrayList<>(fileNames.size());
2415      for (String file : fileNames) {
2416        // open the file as a store file (hfile link, etc)
2417        StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), file);
2418        HStoreFile storeFile = createStoreFileAndReader(storeFileInfo);
2419        storeFiles.add(storeFile);
2420        HStore.this.storeSize.addAndGet(storeFile.getReader().length());
2421        HStore.this.totalUncompressedBytes
2422            .addAndGet(storeFile.getReader().getTotalUncompressedBytes());
2423        if (LOG.isInfoEnabled()) {
2424          LOG.info("Region: " + HStore.this.getRegionInfo().getEncodedName() +
2425            " added " + storeFile + ", entries=" + storeFile.getReader().getEntries() +
2426              ", sequenceid=" + storeFile.getReader().getSequenceID() + ", filesize="
2427              + TraditionalBinaryPrefix.long2String(storeFile.getReader().length(), "", 1));
2428        }
2429      }
2430
2431      long snapshotId = -1; // -1 means do not drop
2432      if (dropMemstoreSnapshot && snapshot != null) {
2433        snapshotId = snapshot.getId();
2434        snapshot.close();
2435      }
2436      HStore.this.updateStorefiles(storeFiles, snapshotId);
2437    }
2438
2439    /**
2440     * Abort the snapshot preparation. Drops the snapshot if any.
2441     * @throws IOException
2442     */
2443    @Override
2444    public void abort() throws IOException {
2445      if (snapshot != null) {
2446        // We need to close the snapshot when aborting; otherwise the segment scanners
2447        // won't be closed. If we are using MSLAB, the chunks referenced by those scanners
2448        // can't be released, causing a memory leak.
2449        snapshot.close();
2450        HStore.this.updateStorefiles(Collections.emptyList(), snapshot.getId());
2451      }
2452    }
2453  }
2454
2455  @Override
2456  public boolean needsCompaction() {
2457    List<HStoreFile> filesCompactingClone = null;
2458    synchronized (filesCompacting) {
2459      filesCompactingClone = Lists.newArrayList(filesCompacting);
2460    }
2461    return this.storeEngine.needsCompaction(filesCompactingClone);
2462  }
2463
2464  /**
2465   * Used for tests.
2466   * @return cache configuration for this Store.
2467   */
2468  @VisibleForTesting
2469  public CacheConfig getCacheConfig() {
2470    return this.cacheConf;
2471  }
2472
2473  public static final long FIXED_OVERHEAD =
2474      ClassSize.align(ClassSize.OBJECT + (27 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG)
2475              + (6 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
2476
2477  public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
2478      + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
2479      + ClassSize.CONCURRENT_SKIPLISTMAP
2480      + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT
2481      + ScanInfo.FIXED_OVERHEAD);
2482
2483  @Override
2484  public long heapSize() {
2485    MemStoreSize memstoreSize = this.memstore.size();
2486    return DEEP_OVERHEAD + memstoreSize.getHeapSize();
2487  }
2488
2489  @Override
2490  public CellComparator getComparator() {
2491    return comparator;
2492  }
2493
2494  public ScanInfo getScanInfo() {
2495    return scanInfo;
2496  }
2497
2498  /**
2499   * Set scan info, used by test
2500   * @param scanInfo new scan info to use for test
2501   */
2502  void setScanInfo(ScanInfo scanInfo) {
2503    this.scanInfo = scanInfo;
2504  }
2505
2506  @Override
2507  public boolean hasTooManyStoreFiles() {
2508    return getStorefilesCount() > this.blockingFileCount;
2509  }
2510
2511  @Override
2512  public long getFlushedCellsCount() {
2513    return flushedCellsCount.get();
2514  }
2515
2516  @Override
2517  public long getFlushedCellsSize() {
2518    return flushedCellsSize.get();
2519  }
2520
2521  @Override
2522  public long getFlushedOutputFileSize() {
2523    return flushedOutputFileSize.get();
2524  }
2525
2526  @Override
2527  public long getCompactedCellsCount() {
2528    return compactedCellsCount.get();
2529  }
2530
2531  @Override
2532  public long getCompactedCellsSize() {
2533    return compactedCellsSize.get();
2534  }
2535
2536  @Override
2537  public long getMajorCompactedCellsCount() {
2538    return majorCompactedCellsCount.get();
2539  }
2540
2541  @Override
2542  public long getMajorCompactedCellsSize() {
2543    return majorCompactedCellsSize.get();
2544  }
2545
2546  /**
2547   * Returns the StoreEngine that is backing this concrete implementation of Store.
2548   * @return Returns the {@link StoreEngine} object used internally inside this HStore object.
2549   */
2550  @VisibleForTesting
2551  public StoreEngine<?, ?, ?, ?> getStoreEngine() {
2552    return this.storeEngine;
2553  }
2554
2555  protected OffPeakHours getOffPeakHours() {
2556    return this.offPeakHours;
2557  }
2558
2559  /**
2560   * {@inheritDoc}
2561   */
2562  @Override
2563  public void onConfigurationChange(Configuration conf) {
2564    this.conf = new CompoundConfiguration()
2565            .add(conf)
2566            .addBytesMap(family.getValues());
2567    this.storeEngine.compactionPolicy.setConf(conf);
2568    this.offPeakHours = OffPeakHours.getInstance(conf);
2569  }
2570
2571  /**
2572   * {@inheritDoc}
2573   */
2574  @Override
2575  public void registerChildren(ConfigurationManager manager) {
2576    // No children to register
2577  }
2578
2579  /**
2580   * {@inheritDoc}
2581   */
2582  @Override
2583  public void deregisterChildren(ConfigurationManager manager) {
2584    // No children to deregister
2585  }
2586
2587  @Override
2588  public double getCompactionPressure() {
2589    return storeEngine.getStoreFileManager().getCompactionPressure();
2590  }
2591
2592  @Override
2593  public boolean isPrimaryReplicaStore() {
2594    return getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID;
2595  }
2596
2597  /**
2598   * Sets the store up for a region level snapshot operation.
2599   * @see #postSnapshotOperation()
2600   */
2601  public void preSnapshotOperation() {
2602    archiveLock.lock();
2603  }
2604
2605  /**
2606   * Perform tasks needed after the completion of snapshot operation.
2607   * @see #preSnapshotOperation()
2608   */
2609  public void postSnapshotOperation() {
2610    archiveLock.unlock();
2611  }
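
  /*
   * Minimal usage sketch (not part of the original source): snapshot code is expected to
   * bracket its work with these two calls so the archive lock is held for the duration:
   *
   *   store.preSnapshotOperation();
   *   try {
   *     // capture this store's files for the snapshot
   *   } finally {
   *     store.postSnapshotOperation();
   *   }
   */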
2612
2613  /**
2614   * Closes and archives the compacted files under this store
2615   */
2616  public synchronized void closeAndArchiveCompactedFiles() throws IOException {
2617    // ensure other threads do not attempt to archive the same files on close()
2618    archiveLock.lock();
2619    try {
2620      lock.readLock().lock();
2621      Collection<HStoreFile> copyCompactedfiles = null;
2622      try {
2623        Collection<HStoreFile> compactedfiles =
2624            this.getStoreEngine().getStoreFileManager().getCompactedfiles();
2625        if (CollectionUtils.isNotEmpty(compactedfiles)) {
2626          // Do a copy under read lock
2627          copyCompactedfiles = new ArrayList<>(compactedfiles);
2628        } else {
2629          LOG.trace("No compacted files to archive");
2630        }
2631      } finally {
2632        lock.readLock().unlock();
2633      }
2634      if (CollectionUtils.isNotEmpty(copyCompactedfiles)) {
2635        removeCompactedfiles(copyCompactedfiles);
2636      }
2637    } finally {
2638      archiveLock.unlock();
2639    }
2640  }
2641
  /**
   * Archives and removes the compacted files.
   * @param compactedfiles The compacted files in this store that are no longer active in reads
   */
  private void removeCompactedfiles(Collection<HStoreFile> compactedfiles)
      throws IOException {
    final List<HStoreFile> filesToRemove = new ArrayList<>(compactedfiles.size());
    final List<Long> storeFileSizes = new ArrayList<>(compactedfiles.size());
    for (final HStoreFile file : compactedfiles) {
      synchronized (file) {
        try {
          StoreFileReader r = file.getReader();
          if (r == null) {
            LOG.debug("The file {} was closed but still not archived", file);
            // HACK: Temporarily re-open the reader so we can get the size of the file. Ideally,
            // we should know the size of an HStoreFile without having to ask the StoreFileReader
            // for that.
            long length = getStoreFileSize(file);
            filesToRemove.add(file);
            storeFileSizes.add(length);
            continue;
          }

          if (file.isCompactedAway() && !file.isReferencedInReads()) {
            // Even if the archival fails there is no harm: new scanners cannot pick up the
            // compacted file because it is already marked as compactedAway.
            LOG.trace("Closing and archiving the file {}", file);
            // Copy the file size before closing the reader
            final long length = r.length();
            r.close(true);
            // The reader is closed; queue the file for removal
            filesToRemove.add(file);
            // Only add the length if we successfully added the file to `filesToRemove`
            storeFileSizes.add(length);
          } else {
            LOG.info("Can't archive compacted file {} because of either isCompactedAway={} or it"
                + " is still referenced in reads, isReferencedInReads={}, refCount={};"
                + " skipping for now.", file.getPath(), file.isCompactedAway(),
                file.isReferencedInReads(), r.getRefCount());
          }
        } catch (Exception e) {
          LOG.error("Exception while trying to close the compacted store file {}", file.getPath(),
              e);
        }
      }
    }
    if (this.isPrimaryReplicaStore()) {
      // Only the primary region is allowed to move the files to the archive.
      // The secondary region does not move the files to the archive. Any active reads from
      // the secondary region will still work because the files as such still have active readers.
      if (!filesToRemove.isEmpty()) {
        LOG.debug("Moving the files {} to archive", filesToRemove);
        // Only remove the files from the store file manager if the archival succeeds
        try {
          this.fs.removeStoreFiles(
              this.getColumnFamilyDescriptor().getNameAsString(), filesToRemove);
        } catch (FailedArchiveException fae) {
          // Even if archiving some files failed, we still need to clear out any of the
          // files which were successfully archived.  Otherwise we will receive a
          // FileNotFoundException when we attempt to re-archive them on the next attempt.
          Collection<Path> failedFiles = fae.getFailedFiles();
          Iterator<HStoreFile> iter = filesToRemove.iterator();
          Iterator<Long> sizeIter = storeFileSizes.iterator();
          // Advance both iterators in lockstep so the removals stay aligned
          while (iter.hasNext()) {
            sizeIter.next();
            if (failedFiles.contains(iter.next().getPath())) {
              iter.remove();
              sizeIter.remove();
            }
          }
          if (!filesToRemove.isEmpty()) {
            clearCompactedfiles(filesToRemove);
          }
          throw fae;
        }
      }
    }
    if (!filesToRemove.isEmpty()) {
      // Clear the compactedfiles from the store file manager
      clearCompactedfiles(filesToRemove);
      // Try to send a report of this archival to the Master so quota usage is updated faster
      reportArchivedFilesForQuota(filesToRemove, storeFileSizes);
    }
  }

  /**
   * Computes the length of a store file without propagating any errors encountered along the
   * way. If an error occurs, {@code 0} is returned instead of the actual size.
   *
   * @param file The file to compute the size of.
   * @return The size in bytes of the provided {@code file}, or {@code 0} if it could not be
   *   determined.
   */
  long getStoreFileSize(HStoreFile file) {
    long length = 0;
    try {
      file.initReader();
      length = file.getReader().length();
    } catch (IOException e) {
      LOG.trace("Failed to open reader when trying to compute store file size, ignoring", e);
    } finally {
      try {
        file.closeStoreFile(
            file.getCacheConf() != null ? file.getCacheConf().shouldEvictOnClose() : true);
      } catch (IOException e) {
        LOG.trace("Failed to close reader after computing store file size, ignoring", e);
      }
    }
    return length;
  }

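  /**
   * Delegates to the memstore for its pre-flush sequence id estimation; see
   * {@link MemStore#preFlushSeqIDEstimation()}.
   */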
  public Long preFlushSeqIDEstimation() {
    return memstore.preFlushSeqIDEstimation();
  }

  @Override
  public boolean isSloppyMemStore() {
    return this.memstore.isSloppy();
  }

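  /**
   * Removes the given compacted files from the store file manager's bookkeeping, under the
   * store write lock.
   * @param filesToRemove compacted files that have already been archived (or cleaned up) and
   *   should no longer be tracked by this store
   */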
  private void clearCompactedfiles(List<HStoreFile> filesToRemove) throws IOException {
    LOG.trace("Clearing the compacted files {} from this store", filesToRemove);
    lock.writeLock().lock();
    try {
      this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToRemove);
    } finally {
      lock.writeLock().unlock();
    }
  }

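  /**
   * Reports the given archived files and their sizes to the RegionServerServices so that space
   * quota usage can be updated on the Master without waiting for the next full report. A no-op
   * when no RegionServerServices are available.
   * @param archivedFiles the store files that were moved to the archive
   * @param fileSizes the sizes of the archived files, index-aligned with {@code archivedFiles}
   */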
  void reportArchivedFilesForQuota(List<? extends StoreFile> archivedFiles, List<Long> fileSizes) {
    // Sanity check from the caller
    if (archivedFiles.size() != fileSizes.size()) {
      throw new RuntimeException("Coding error: should never see lists of differing sizes");
    }
    RegionServerServices rss = this.region.getRegionServerServices();
    if (rss == null) {
      return;
    }
    List<Entry<String,Long>> filesWithSizes = new ArrayList<>(archivedFiles.size());
    Iterator<Long> fileSizeIter = fileSizes.iterator();
    for (StoreFile storeFile : archivedFiles) {
      final long fileSize = fileSizeIter.next();
      if (storeFile.isHFile() && fileSize != 0) {
        filesWithSizes.add(Maps.immutableEntry(storeFile.getPath().getName(), fileSize));
      }
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("Files archived: " + archivedFiles + ", reporting the following to the Master: "
          + filesWithSizes);
    }
    boolean success = rss.reportFileArchivalForQuotas(getTableName(), filesWithSizes);
    if (!success) {
      LOG.warn("Failed to report archival of files: {}", filesWithSizes);
    }
  }

  @Override
  public int getCurrentParallelPutCount() {
    return currentParallelPutCount.get();
  }

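  /**
   * @return The total reference count across all store files in this store that are HFiles and
   *   currently have an open reader.
   */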
  public int getStoreRefCount() {
    return this.storeEngine.getStoreFileManager().getStorefiles().stream()
      .filter(sf -> sf.getReader() != null).filter(HStoreFile::isHFile)
      .mapToInt(HStoreFile::getRefCount).sum();
  }

  /**
   * @return The maximum reference count of any single store file among all the store files
   *   of this HStore.
   */
  public int getMaxStoreFileRefCount() {
    OptionalInt maxStoreFileRefCount = this.storeEngine.getStoreFileManager()
      .getStorefiles()
      .stream()
      .filter(sf -> sf.getReader() != null)
      .filter(HStoreFile::isHFile)
      .mapToInt(HStoreFile::getRefCount)
      .max();
    return maxStoreFileRefCount.orElse(0);
  }

}