001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.net.InetSocketAddress;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.HashMap;
028import java.util.HashSet;
029import java.util.Iterator;
030import java.util.List;
031import java.util.Map;
032import java.util.Map.Entry;
033import java.util.NavigableSet;
034import java.util.Optional;
035import java.util.OptionalDouble;
036import java.util.OptionalInt;
037import java.util.OptionalLong;
038import java.util.Set;
039import java.util.concurrent.Callable;
040import java.util.concurrent.CompletionService;
041import java.util.concurrent.ConcurrentHashMap;
042import java.util.concurrent.ExecutionException;
043import java.util.concurrent.ExecutorCompletionService;
044import java.util.concurrent.Future;
045import java.util.concurrent.ThreadPoolExecutor;
046import java.util.concurrent.atomic.AtomicBoolean;
047import java.util.concurrent.atomic.AtomicInteger;
048import java.util.concurrent.atomic.AtomicLong;
049import java.util.concurrent.atomic.LongAdder;
050import java.util.concurrent.locks.ReentrantLock;
051import java.util.function.Consumer;
052import java.util.function.Supplier;
053import java.util.function.ToLongFunction;
054import java.util.stream.Collectors;
055import java.util.stream.LongStream;
056import org.apache.hadoop.conf.Configuration;
057import org.apache.hadoop.fs.FileSystem;
058import org.apache.hadoop.fs.Path;
059import org.apache.hadoop.fs.permission.FsAction;
060import org.apache.hadoop.hbase.Cell;
061import org.apache.hadoop.hbase.CellComparator;
062import org.apache.hadoop.hbase.CellUtil;
063import org.apache.hadoop.hbase.HConstants;
064import org.apache.hadoop.hbase.MemoryCompactionPolicy;
065import org.apache.hadoop.hbase.TableName;
066import org.apache.hadoop.hbase.backup.FailedArchiveException;
067import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
068import org.apache.hadoop.hbase.client.RegionInfo;
069import org.apache.hadoop.hbase.client.Scan;
070import org.apache.hadoop.hbase.conf.ConfigurationManager;
071import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
072import org.apache.hadoop.hbase.coprocessor.ReadOnlyConfiguration;
073import org.apache.hadoop.hbase.io.HeapSize;
074import org.apache.hadoop.hbase.io.hfile.CacheConfig;
075import org.apache.hadoop.hbase.io.hfile.HFile;
076import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
077import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
078import org.apache.hadoop.hbase.io.hfile.HFileScanner;
079import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
080import org.apache.hadoop.hbase.monitoring.MonitoredTask;
081import org.apache.hadoop.hbase.quotas.RegionSizeStore;
082import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
083import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
084import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
085import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
086import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
087import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
088import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
089import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
090import org.apache.hadoop.hbase.security.EncryptionUtil;
091import org.apache.hadoop.hbase.security.User;
092import org.apache.hadoop.hbase.util.Bytes;
093import org.apache.hadoop.hbase.util.ClassSize;
094import org.apache.hadoop.hbase.util.CommonFSUtils;
095import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
096import org.apache.hadoop.hbase.util.Pair;
097import org.apache.hadoop.hbase.util.ReflectionUtils;
098import org.apache.hadoop.util.StringUtils;
099import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
100import org.apache.yetus.audience.InterfaceAudience;
101import org.slf4j.Logger;
102import org.slf4j.LoggerFactory;
103
104import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
105import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
106import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
107import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
108import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
109import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
110import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
111
112import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
114
115/**
116 * A Store holds a column family in a Region. Its a memstore and a set of zero or more StoreFiles,
117 * which stretch backwards over time.
118 * <p>
119 * There's no reason to consider append-logging at this level; all logging and locking is handled at
120 * the HRegion level. Store just provides services to manage sets of StoreFiles. One of the most
121 * important of those services is compaction services where files are aggregated once they pass a
122 * configurable threshold.
123 * <p>
124 * Locking and transactions are handled at a higher level. This API should not be called directly
125 * but by an HRegion manager.
126 */
127@InterfaceAudience.Private
128public class HStore
129  implements Store, HeapSize, StoreConfigInformation, PropagatingConfigurationObserver {
130  public static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
131  public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
132    "hbase.server.compactchecker.interval.multiplier";
133  public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
134  public static final String BLOCK_STORAGE_POLICY_KEY = "hbase.hstore.block.storage.policy";
135  // "NONE" is not a valid storage policy and means we defer the policy to HDFS
136  public static final String DEFAULT_BLOCK_STORAGE_POLICY = "NONE";
137  public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
138  public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 16;
139
140  // HBASE-24428 : Update compaction priority for recently split daughter regions
141  // so as to prioritize their compaction.
142  // Any compaction candidate with higher priority than compaction of newly split daugher regions
143  // should have priority value < (Integer.MIN_VALUE + 1000)
144  private static final int SPLIT_REGION_COMPACTION_PRIORITY = Integer.MIN_VALUE + 1000;
145
146  private static final Logger LOG = LoggerFactory.getLogger(HStore.class);
147
148  protected final MemStore memstore;
149  // This stores directory in the filesystem.
150  private final HRegion region;
151  protected Configuration conf;
152  private long lastCompactSize = 0;
153  volatile boolean forceMajor = false;
154  private AtomicLong storeSize = new AtomicLong();
155  private AtomicLong totalUncompressedBytes = new AtomicLong();
156  private LongAdder memstoreOnlyRowReadsCount = new LongAdder();
157  // rows that has cells from both memstore and files (or only files)
158  private LongAdder mixedRowReadsCount = new LongAdder();
159
160  /**
161   * Lock specific to archiving compacted store files. This avoids races around the combination of
162   * retrieving the list of compacted files and moving them to the archive directory. Since this is
163   * usually a background process (other than on close), we don't want to handle this with the store
164   * write lock, which would block readers and degrade performance. Locked by: -
165   * CompactedHFilesDispatchHandler via closeAndArchiveCompactedFiles() - close()
166   */
167  final ReentrantLock archiveLock = new ReentrantLock();
168
169  private final boolean verifyBulkLoads;
170
171  /**
172   * Use this counter to track concurrent puts. If TRACE-log is enabled, if we are over the
173   * threshold set by hbase.region.store.parallel.put.print.threshold (Default is 50) we will log a
174   * message that identifies the Store experience this high-level of concurrency.
175   */
176  private final AtomicInteger currentParallelPutCount = new AtomicInteger(0);
177  private final int parallelPutCountPrintThreshold;
178
179  private ScanInfo scanInfo;
180
181  // All access must be synchronized.
182  // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it.
183  private final List<HStoreFile> filesCompacting = Lists.newArrayList();
184
185  // All access must be synchronized.
186  private final Set<ChangedReadersObserver> changedReaderObservers =
187    Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());
188
189  private HFileDataBlockEncoder dataBlockEncoder;
190
191  final StoreEngine<?, ?, ?, ?> storeEngine;
192
193  private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
194  private volatile OffPeakHours offPeakHours;
195
196  private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
197  private int flushRetriesNumber;
198  private int pauseTime;
199
200  private long blockingFileCount;
201  private int compactionCheckMultiplier;
202
203  private AtomicLong flushedCellsCount = new AtomicLong();
204  private AtomicLong compactedCellsCount = new AtomicLong();
205  private AtomicLong majorCompactedCellsCount = new AtomicLong();
206  private AtomicLong flushedCellsSize = new AtomicLong();
207  private AtomicLong flushedOutputFileSize = new AtomicLong();
208  private AtomicLong compactedCellsSize = new AtomicLong();
209  private AtomicLong majorCompactedCellsSize = new AtomicLong();
210
211  private final StoreContext storeContext;
212
213  // Used to track the store files which are currently being written. For compaction, if we want to
214  // compact store file [a, b, c] to [d], then here we will record 'd'. And we will also use it to
215  // track the store files being written when flushing.
216  // Notice that the creation is in the background compaction or flush thread and we will get the
217  // files in other thread, so it needs to be thread safe.
218  private static final class StoreFileWriterCreationTracker implements Consumer<Path> {
219
220    private final Set<Path> files = Collections.newSetFromMap(new ConcurrentHashMap<>());
221
222    @Override
223    public void accept(Path t) {
224      files.add(t);
225    }
226
227    public Set<Path> get() {
228      return Collections.unmodifiableSet(files);
229    }
230  }
231
232  // We may have multiple compaction running at the same time, and flush can also happen at the same
233  // time, so here we need to use a collection, and the collection needs to be thread safe.
234  // The implementation of StoreFileWriterCreationTracker is very simple and we will not likely to
235  // implement hashCode or equals for it, so here we just use ConcurrentHashMap. Changed to
236  // IdentityHashMap if later we want to implement hashCode or equals.
237  private final Set<StoreFileWriterCreationTracker> storeFileWriterCreationTrackers =
238    Collections.newSetFromMap(new ConcurrentHashMap<>());
239
240  // For the SFT implementation which we will write tmp store file first, we do not need to clean up
241  // the broken store files under the data directory, which means we do not need to track the store
242  // file writer creation. So here we abstract a factory to return different trackers for different
243  // SFT implementations.
244  private final Supplier<StoreFileWriterCreationTracker> storeFileWriterCreationTrackerFactory;
245
246  /**
247   * Constructor
248   * @param family    HColumnDescriptor for this column
249   * @param confParam configuration object failed. Can be null.
250   */
251  protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
252    final Configuration confParam, boolean warmup) throws IOException {
253    this.conf = StoreUtils.createStoreConfiguration(confParam, region.getTableDescriptor(), family);
254
255    this.region = region;
256    this.storeContext = initializeStoreContext(family);
257
258    // Assemble the store's home directory and Ensure it exists.
259    region.getRegionFileSystem().createStoreDir(family.getNameAsString());
260
261    // set block storage policy for store directory
262    String policyName = family.getStoragePolicy();
263    if (null == policyName) {
264      policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY, DEFAULT_BLOCK_STORAGE_POLICY);
265    }
266    region.getRegionFileSystem().setStoragePolicy(family.getNameAsString(), policyName.trim());
267
268    this.dataBlockEncoder = new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());
269
270    // used by ScanQueryMatcher
271    long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
272    LOG.trace("Time to purge deletes set to {}ms in {}", timeToPurgeDeletes, this);
273    // Get TTL
274    long ttl = determineTTLFromFamily(family);
275    // Why not just pass a HColumnDescriptor in here altogether? Even if have
276    // to clone it?
277    scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, region.getCellComparator());
278    this.memstore = getMemstore();
279
280    this.offPeakHours = OffPeakHours.getInstance(conf);
281
282    this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
283
284    this.blockingFileCount = conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
285    this.compactionCheckMultiplier = conf.getInt(COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY,
286      DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
287    if (this.compactionCheckMultiplier <= 0) {
288      LOG.error("Compaction check period multiplier must be positive, setting default: {}",
289        DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
290      this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
291    }
292
293    this.storeEngine = createStoreEngine(this, this.conf, region.getCellComparator());
294    storeEngine.initialize(warmup);
295    // if require writing to tmp dir first, then we just return null, which indicate that we do not
296    // need to track the creation of store file writer, otherwise we return a new
297    // StoreFileWriterCreationTracker.
298    this.storeFileWriterCreationTrackerFactory = storeEngine.requireWritingToTmpDirFirst()
299      ? () -> null
300      : () -> new StoreFileWriterCreationTracker();
301    refreshStoreSizeAndTotalBytes();
302
303    flushRetriesNumber =
304      conf.getInt("hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
305    pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
306    if (flushRetriesNumber <= 0) {
307      throw new IllegalArgumentException(
308        "hbase.hstore.flush.retries.number must be > 0, not " + flushRetriesNumber);
309    }
310
311    int confPrintThreshold =
312      this.conf.getInt("hbase.region.store.parallel.put.print.threshold", 50);
313    if (confPrintThreshold < 10) {
314      confPrintThreshold = 10;
315    }
316    this.parallelPutCountPrintThreshold = confPrintThreshold;
317
318    LOG.info(
319      "Store={},  memstore type={}, storagePolicy={}, verifyBulkLoads={}, "
320        + "parallelPutCountPrintThreshold={}, encoding={}, compression={}",
321      this, memstore.getClass().getSimpleName(), policyName, verifyBulkLoads,
322      parallelPutCountPrintThreshold, family.getDataBlockEncoding(), family.getCompressionType());
323  }
324
325  private StoreContext initializeStoreContext(ColumnFamilyDescriptor family) throws IOException {
326    return new StoreContext.Builder().withBlockSize(family.getBlocksize())
327      .withEncryptionContext(EncryptionUtil.createEncryptionContext(conf, family))
328      .withBloomType(family.getBloomFilterType()).withCacheConfig(createCacheConf(family))
329      .withCellComparator(region.getCellComparator()).withColumnFamilyDescriptor(family)
330      .withCompactedFilesSupplier(this::getCompactedFiles)
331      .withRegionFileSystem(region.getRegionFileSystem())
332      .withFavoredNodesSupplier(this::getFavoredNodes)
333      .withFamilyStoreDirectoryPath(
334        region.getRegionFileSystem().getStoreDir(family.getNameAsString()))
335      .withRegionCoprocessorHost(region.getCoprocessorHost()).build();
336  }
337
338  private InetSocketAddress[] getFavoredNodes() {
339    InetSocketAddress[] favoredNodes = null;
340    if (region.getRegionServerServices() != null) {
341      favoredNodes = region.getRegionServerServices()
342        .getFavoredNodesForRegion(region.getRegionInfo().getEncodedName());
343    }
344    return favoredNodes;
345  }
346
347  /** Returns MemStore Instance to use in this store. */
348  private MemStore getMemstore() {
349    MemStore ms = null;
350    // Check if in-memory-compaction configured. Note MemoryCompactionPolicy is an enum!
351    MemoryCompactionPolicy inMemoryCompaction = null;
352    if (this.getTableName().isSystemTable()) {
353      inMemoryCompaction = MemoryCompactionPolicy
354        .valueOf(conf.get("hbase.systemtables.compacting.memstore.type", "NONE").toUpperCase());
355    } else {
356      inMemoryCompaction = getColumnFamilyDescriptor().getInMemoryCompaction();
357    }
358    if (inMemoryCompaction == null) {
359      inMemoryCompaction =
360        MemoryCompactionPolicy.valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
361          CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT).toUpperCase());
362    }
363
364    switch (inMemoryCompaction) {
365      case NONE:
366        Class<? extends MemStore> memStoreClass =
367          conf.getClass(MEMSTORE_CLASS_NAME, DefaultMemStore.class, MemStore.class);
368        ms = ReflectionUtils.newInstance(memStoreClass,
369          new Object[] { conf, getComparator(), this.getHRegion().getRegionServicesForStores() });
370        break;
371      default:
372        Class<? extends CompactingMemStore> compactingMemStoreClass =
373          conf.getClass(MEMSTORE_CLASS_NAME, CompactingMemStore.class, CompactingMemStore.class);
374        ms =
375          ReflectionUtils.newInstance(compactingMemStoreClass, new Object[] { conf, getComparator(),
376            this, this.getHRegion().getRegionServicesForStores(), inMemoryCompaction });
377    }
378    return ms;
379  }
380
381  /**
382   * Creates the cache config.
383   * @param family The current column family.
384   */
385  protected CacheConfig createCacheConf(final ColumnFamilyDescriptor family) {
386    CacheConfig cacheConf = new CacheConfig(conf, family, region.getBlockCache(),
387      region.getRegionServicesForStores().getByteBuffAllocator());
388    LOG.info("Created cacheConfig: {}, for column family {} of region {} ", cacheConf,
389      family.getNameAsString(), region.getRegionInfo().getEncodedName());
390    return cacheConf;
391  }
392
393  /**
394   * Creates the store engine configured for the given Store.
395   * @param store        The store. An unfortunate dependency needed due to it being passed to
396   *                     coprocessors via the compactor.
397   * @param conf         Store configuration.
398   * @param kvComparator KVComparator for storeFileManager.
399   * @return StoreEngine to use.
400   */
401  protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
402    CellComparator kvComparator) throws IOException {
403    return StoreEngine.create(store, conf, kvComparator);
404  }
405
406  /** Returns TTL in seconds of the specified family */
407  public static long determineTTLFromFamily(final ColumnFamilyDescriptor family) {
408    // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.
409    long ttl = family.getTimeToLive();
410    if (ttl == HConstants.FOREVER) {
411      // Default is unlimited ttl.
412      ttl = Long.MAX_VALUE;
413    } else if (ttl == -1) {
414      ttl = Long.MAX_VALUE;
415    } else {
416      // Second -> ms adjust for user data
417      ttl *= 1000;
418    }
419    return ttl;
420  }
421
422  StoreContext getStoreContext() {
423    return storeContext;
424  }
425
426  @Override
427  public String getColumnFamilyName() {
428    return this.storeContext.getFamily().getNameAsString();
429  }
430
431  @Override
432  public TableName getTableName() {
433    return this.getRegionInfo().getTable();
434  }
435
436  @Override
437  public FileSystem getFileSystem() {
438    return storeContext.getRegionFileSystem().getFileSystem();
439  }
440
441  public HRegionFileSystem getRegionFileSystem() {
442    return storeContext.getRegionFileSystem();
443  }
444
445  /* Implementation of StoreConfigInformation */
446  @Override
447  public long getStoreFileTtl() {
448    // TTL only applies if there's no MIN_VERSIONs setting on the column.
449    return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
450  }
451
452  @Override
453  public long getMemStoreFlushSize() {
454    // TODO: Why is this in here? The flushsize of the region rather than the store? St.Ack
455    return this.region.memstoreFlushSize;
456  }
457
458  @Override
459  public MemStoreSize getFlushableSize() {
460    return this.memstore.getFlushableSize();
461  }
462
463  @Override
464  public MemStoreSize getSnapshotSize() {
465    return this.memstore.getSnapshotSize();
466  }
467
468  @Override
469  public long getCompactionCheckMultiplier() {
470    return this.compactionCheckMultiplier;
471  }
472
473  @Override
474  public long getBlockingFileCount() {
475    return blockingFileCount;
476  }
477  /* End implementation of StoreConfigInformation */
478
479  @Override
480  public ColumnFamilyDescriptor getColumnFamilyDescriptor() {
481    return this.storeContext.getFamily();
482  }
483
484  @Override
485  public OptionalLong getMaxSequenceId() {
486    return StoreUtils.getMaxSequenceIdInList(this.getStorefiles());
487  }
488
489  @Override
490  public OptionalLong getMaxMemStoreTS() {
491    return StoreUtils.getMaxMemStoreTSInList(this.getStorefiles());
492  }
493
494  /** Returns the data block encoder */
495  public HFileDataBlockEncoder getDataBlockEncoder() {
496    return dataBlockEncoder;
497  }
498
499  /**
500   * Should be used only in tests.
501   * @param blockEncoder the block delta encoder to use
502   */
503  void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
504    this.dataBlockEncoder = blockEncoder;
505  }
506
507  private void postRefreshStoreFiles() throws IOException {
508    // Advance the memstore read point to be at least the new store files seqIds so that
509    // readers might pick it up. This assumes that the store is not getting any writes (otherwise
510    // in-flight transactions might be made visible)
511    getMaxSequenceId().ifPresent(region.getMVCC()::advanceTo);
512    refreshStoreSizeAndTotalBytes();
513  }
514
515  @Override
516  public void refreshStoreFiles() throws IOException {
517    storeEngine.refreshStoreFiles();
518    postRefreshStoreFiles();
519  }
520
521  /**
522   * Replaces the store files that the store has with the given files. Mainly used by secondary
523   * region replicas to keep up to date with the primary region files.
524   */
525  public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
526    storeEngine.refreshStoreFiles(newFiles);
527    postRefreshStoreFiles();
528  }
529
530  /**
531   * This message intends to inform the MemStore that next coming updates are going to be part of
532   * the replaying edits from WAL
533   */
534  public void startReplayingFromWAL() {
535    this.memstore.startReplayingFromWAL();
536  }
537
538  /**
539   * This message intends to inform the MemStore that the replaying edits from WAL are done
540   */
541  public void stopReplayingFromWAL() {
542    this.memstore.stopReplayingFromWAL();
543  }
544
545  /**
546   * Adds a value to the memstore
547   */
548  public void add(final Cell cell, MemStoreSizing memstoreSizing) {
549    storeEngine.readLock();
550    try {
551      if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
552        LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
553          this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName());
554      }
555      this.memstore.add(cell, memstoreSizing);
556    } finally {
557      storeEngine.readUnlock();
558      currentParallelPutCount.decrementAndGet();
559    }
560  }
561
562  /**
563   * Adds the specified value to the memstore
564   */
565  public void add(final Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
566    storeEngine.readLock();
567    try {
568      if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
569        LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
570          this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName());
571      }
572      memstore.add(cells, memstoreSizing);
573    } finally {
574      storeEngine.readUnlock();
575      currentParallelPutCount.decrementAndGet();
576    }
577  }
578
579  @Override
580  public long timeOfOldestEdit() {
581    return memstore.timeOfOldestEdit();
582  }
583
584  /** Returns All store files. */
585  @Override
586  public Collection<HStoreFile> getStorefiles() {
587    return this.storeEngine.getStoreFileManager().getStorefiles();
588  }
589
590  @Override
591  public Collection<HStoreFile> getCompactedFiles() {
592    return this.storeEngine.getStoreFileManager().getCompactedfiles();
593  }
594
595  /**
596   * This throws a WrongRegionException if the HFile does not fit in this region, or an
597   * InvalidHFileException if the HFile is not valid.
598   */
599  public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
600    HFile.Reader reader = null;
601    try {
602      LOG.info("Validating hfile at " + srcPath + " for inclusion in " + this);
603      FileSystem srcFs = srcPath.getFileSystem(conf);
604      srcFs.access(srcPath, FsAction.READ_WRITE);
605      reader = HFile.createReader(srcFs, srcPath, getCacheConfig(), isPrimaryReplicaStore(), conf);
606
607      Optional<byte[]> firstKey = reader.getFirstRowKey();
608      Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
609      Optional<Cell> lk = reader.getLastKey();
610      Preconditions.checkState(lk.isPresent(), "Last key can not be null");
611      byte[] lastKey = CellUtil.cloneRow(lk.get());
612
613      if (LOG.isDebugEnabled()) {
614        LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey.get()) + " last="
615          + Bytes.toStringBinary(lastKey));
616        LOG.debug("Region bounds: first=" + Bytes.toStringBinary(getRegionInfo().getStartKey())
617          + " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
618      }
619
620      if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) {
621        throw new WrongRegionException("Bulk load file " + srcPath.toString()
622          + " does not fit inside region " + this.getRegionInfo().getRegionNameAsString());
623      }
624
625      if (
626        reader.length()
627            > conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE)
628      ) {
629        LOG.warn("Trying to bulk load hfile " + srcPath + " with size: " + reader.length()
630          + " bytes can be problematic as it may lead to oversplitting.");
631      }
632
633      if (verifyBulkLoads) {
634        long verificationStartTime = EnvironmentEdgeManager.currentTime();
635        LOG.info("Full verification started for bulk load hfile: {}", srcPath);
636        Cell prevCell = null;
637        HFileScanner scanner = reader.getScanner(conf, false, false, false);
638        scanner.seekTo();
639        do {
640          Cell cell = scanner.getCell();
641          if (prevCell != null) {
642            if (getComparator().compareRows(prevCell, cell) > 0) {
643              throw new InvalidHFileException("Previous row is greater than" + " current row: path="
644                + srcPath + " previous=" + CellUtil.getCellKeyAsString(prevCell) + " current="
645                + CellUtil.getCellKeyAsString(cell));
646            }
647            if (CellComparator.getInstance().compareFamilies(prevCell, cell) != 0) {
648              throw new InvalidHFileException("Previous key had different"
649                + " family compared to current key: path=" + srcPath + " previous="
650                + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(),
651                  prevCell.getFamilyLength())
652                + " current=" + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(),
653                  cell.getFamilyLength()));
654            }
655          }
656          prevCell = cell;
657        } while (scanner.next());
658        LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString() + " took "
659          + (EnvironmentEdgeManager.currentTime() - verificationStartTime) + " ms");
660      }
661    } finally {
662      if (reader != null) {
663        reader.close();
664      }
665    }
666  }
667
668  /**
669   * This method should only be called from Region. It is assumed that the ranges of values in the
670   * HFile fit within the stores assigned region. (assertBulkLoadHFileOk checks this)
671   * @param seqNum sequence Id associated with the HFile
672   */
673  public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
674    Path srcPath = new Path(srcPathStr);
675    return getRegionFileSystem().bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
676  }
677
678  public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException {
679    Path srcPath = new Path(srcPathStr);
680    try {
681      getRegionFileSystem().commitStoreFile(srcPath, dstPath);
682    } finally {
683      if (this.getCoprocessorHost() != null) {
684        this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath);
685      }
686    }
687
688    LOG.info("Loaded HFile " + srcPath + " into " + this + " as " + dstPath
689      + " - updating store file list.");
690
691    HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath);
692    bulkLoadHFile(sf);
693
694    LOG.info("Successfully loaded {} into {} (new location: {})", srcPath, this, dstPath);
695
696    return dstPath;
697  }
698
699  public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
700    HStoreFile sf = storeEngine.createStoreFileAndReader(fileInfo);
701    bulkLoadHFile(sf);
702  }
703
704  private void bulkLoadHFile(HStoreFile sf) throws IOException {
705    StoreFileReader r = sf.getReader();
706    this.storeSize.addAndGet(r.length());
707    this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
708    storeEngine.addStoreFiles(Lists.newArrayList(sf), () -> {
709    });
710    LOG.info("Loaded HFile " + sf.getFileInfo() + " into " + this);
711    if (LOG.isTraceEnabled()) {
712      String traceMessage = "BULK LOAD time,size,store size,store files ["
713        + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize + ","
714        + storeEngine.getStoreFileManager().getStorefileCount() + "]";
715      LOG.trace(traceMessage);
716    }
717  }
718
719  private ImmutableCollection<HStoreFile> closeWithoutLock() throws IOException {
720    // Clear so metrics doesn't find them.
721    ImmutableCollection<HStoreFile> result = storeEngine.getStoreFileManager().clearFiles();
722    Collection<HStoreFile> compactedfiles = storeEngine.getStoreFileManager().clearCompactedFiles();
723    // clear the compacted files
724    if (CollectionUtils.isNotEmpty(compactedfiles)) {
725      removeCompactedfiles(compactedfiles,
726        getCacheConfig() != null ? getCacheConfig().shouldEvictOnClose() : true);
727    }
728    if (!result.isEmpty()) {
729      // initialize the thread pool for closing store files in parallel.
730      ThreadPoolExecutor storeFileCloserThreadPool =
731        this.region.getStoreFileOpenAndCloseThreadPool("StoreFileCloser-"
732          + this.region.getRegionInfo().getEncodedName() + "-" + this.getColumnFamilyName());
733
734      // close each store file in parallel
735      CompletionService<Void> completionService =
736        new ExecutorCompletionService<>(storeFileCloserThreadPool);
737      for (HStoreFile f : result) {
738        completionService.submit(new Callable<Void>() {
739          @Override
740          public Void call() throws IOException {
741            boolean evictOnClose =
742              getCacheConfig() != null ? getCacheConfig().shouldEvictOnClose() : true;
743            f.closeStoreFile(evictOnClose);
744            return null;
745          }
746        });
747      }
748
749      IOException ioe = null;
750      try {
751        for (int i = 0; i < result.size(); i++) {
752          try {
753            Future<Void> future = completionService.take();
754            future.get();
755          } catch (InterruptedException e) {
756            if (ioe == null) {
757              ioe = new InterruptedIOException();
758              ioe.initCause(e);
759            }
760          } catch (ExecutionException e) {
761            if (ioe == null) {
762              ioe = new IOException(e.getCause());
763            }
764          }
765        }
766      } finally {
767        storeFileCloserThreadPool.shutdownNow();
768      }
769      if (ioe != null) {
770        throw ioe;
771      }
772    }
773    LOG.trace("Closed {}", this);
774    return result;
775  }
776
777  /**
778   * Close all the readers We don't need to worry about subsequent requests because the Region holds
779   * a write lock that will prevent any more reads or writes.
780   * @return the {@link StoreFile StoreFiles} that were previously being used.
781   * @throws IOException on failure
782   */
783  public ImmutableCollection<HStoreFile> close() throws IOException {
784    // findbugs can not recognize storeEngine.writeLock is just a lock operation so it will report
785    // UL_UNRELEASED_LOCK_EXCEPTION_PATH, so here we have to use two try finally...
786    // Change later if findbugs becomes smarter in the future.
787    this.archiveLock.lock();
788    try {
789      this.storeEngine.writeLock();
790      try {
791        return closeWithoutLock();
792      } finally {
793        this.storeEngine.writeUnlock();
794      }
795    } finally {
796      this.archiveLock.unlock();
797    }
798  }
799
800  /**
801   * Write out current snapshot. Presumes {@code StoreFlusherImpl.prepare()} has been called
802   * previously.
803   * @param logCacheFlushId flush sequence number
804   * @return The path name of the tmp file to which the store was flushed
805   * @throws IOException if exception occurs during process
806   */
807  protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
808    MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
809    Consumer<Path> writerCreationTracker) throws IOException {
810    // If an exception happens flushing, we let it out without clearing
811    // the memstore snapshot. The old snapshot will be returned when we say
812    // 'snapshot', the next time flush comes around.
813    // Retry after catching exception when flushing, otherwise server will abort
814    // itself
815    StoreFlusher flusher = storeEngine.getStoreFlusher();
816    IOException lastException = null;
817    for (int i = 0; i < flushRetriesNumber; i++) {
818      try {
819        List<Path> pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status,
820          throughputController, tracker, writerCreationTracker);
821        Path lastPathName = null;
822        try {
823          for (Path pathName : pathNames) {
824            lastPathName = pathName;
825            storeEngine.validateStoreFile(pathName);
826          }
827          return pathNames;
828        } catch (Exception e) {
829          LOG.warn("Failed validating store file {}, retrying num={}", lastPathName, i, e);
830          if (e instanceof IOException) {
831            lastException = (IOException) e;
832          } else {
833            lastException = new IOException(e);
834          }
835        }
836      } catch (IOException e) {
837        LOG.warn("Failed flushing store file for {}, retrying num={}", this, i, e);
838        lastException = e;
839      }
840      if (lastException != null && i < (flushRetriesNumber - 1)) {
841        try {
842          Thread.sleep(pauseTime);
843        } catch (InterruptedException e) {
844          IOException iie = new InterruptedIOException();
845          iie.initCause(e);
846          throw iie;
847        }
848      }
849    }
850    throw lastException;
851  }
852
853  public HStoreFile tryCommitRecoveredHFile(Path path) throws IOException {
854    LOG.info("Validating recovered hfile at {} for inclusion in store {}", path, this);
855    FileSystem srcFs = path.getFileSystem(conf);
856    srcFs.access(path, FsAction.READ_WRITE);
857    try (HFile.Reader reader =
858      HFile.createReader(srcFs, path, getCacheConfig(), isPrimaryReplicaStore(), conf)) {
859      Optional<byte[]> firstKey = reader.getFirstRowKey();
860      Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
861      Optional<Cell> lk = reader.getLastKey();
862      Preconditions.checkState(lk.isPresent(), "Last key can not be null");
863      byte[] lastKey = CellUtil.cloneRow(lk.get());
864      if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) {
865        throw new WrongRegionException("Recovered hfile " + path.toString()
866          + " does not fit inside region " + this.getRegionInfo().getRegionNameAsString());
867      }
868    }
869
870    Path dstPath = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), path);
871    HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath);
872    StoreFileReader r = sf.getReader();
873    this.storeSize.addAndGet(r.length());
874    this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
875
876    storeEngine.addStoreFiles(Lists.newArrayList(sf), () -> {
877    });
878
879    LOG.info("Loaded recovered hfile to {}, entries={}, sequenceid={}, filesize={}", sf,
880      r.getEntries(), r.getSequenceID(), TraditionalBinaryPrefix.long2String(r.length(), "B", 1));
881    return sf;
882  }
883
884  private long getTotalSize(Collection<HStoreFile> sfs) {
885    return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum();
886  }
887
888  private boolean completeFlush(List<HStoreFile> sfs, long snapshotId) throws IOException {
889    // NOTE:we should keep clearSnapshot method inside the write lock because clearSnapshot may
890    // close {@link DefaultMemStore#snapshot}, which may be used by
891    // {@link DefaultMemStore#getScanners}.
892    storeEngine.addStoreFiles(sfs,
893      snapshotId > 0 ? () -> this.memstore.clearSnapshot(snapshotId) : () -> {
894      });
895    // notify to be called here - only in case of flushes
896    notifyChangedReadersObservers(sfs);
897    if (LOG.isTraceEnabled()) {
898      long totalSize = getTotalSize(sfs);
899      String traceMessage = "FLUSH time,count,size,store size,store files ["
900        + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize + ","
901        + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
902      LOG.trace(traceMessage);
903    }
904    return needsCompaction();
905  }
906
907  /**
908   * Notify all observers that set of Readers has changed.
909   */
910  private void notifyChangedReadersObservers(List<HStoreFile> sfs) throws IOException {
911    for (ChangedReadersObserver o : this.changedReaderObservers) {
912      List<KeyValueScanner> memStoreScanners;
913      this.storeEngine.readLock();
914      try {
915        memStoreScanners = this.memstore.getScanners(o.getReadPoint());
916      } finally {
917        this.storeEngine.readUnlock();
918      }
919      o.updateReaders(sfs, memStoreScanners);
920    }
921  }
922
923  /**
924   * Get all scanners with no filtering based on TTL (that happens further down the line).
925   * @param cacheBlocks  cache the blocks or not
926   * @param usePread     true to use pread, false if not
927   * @param isCompaction true if the scanner is created for compaction
928   * @param matcher      the scan query matcher
929   * @param startRow     the start row
930   * @param stopRow      the stop row
931   * @param readPt       the read point of the current scan
932   * @return all scanners for this store
933   */
934  public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet, boolean usePread,
935    boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt)
936    throws IOException {
937    return getScanners(cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow, false,
938      readPt);
939  }
940
941  /**
942   * Get all scanners with no filtering based on TTL (that happens further down the line).
943   * @param cacheBlocks     cache the blocks or not
944   * @param usePread        true to use pread, false if not
945   * @param isCompaction    true if the scanner is created for compaction
946   * @param matcher         the scan query matcher
947   * @param startRow        the start row
948   * @param includeStartRow true to include start row, false if not
949   * @param stopRow         the stop row
950   * @param includeStopRow  true to include stop row, false if not
951   * @param readPt          the read point of the current scan
952   * @return all scanners for this store
953   */
954  public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,
955    boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow,
956    byte[] stopRow, boolean includeStopRow, long readPt) throws IOException {
957    Collection<HStoreFile> storeFilesToScan;
958    List<KeyValueScanner> memStoreScanners;
959    this.storeEngine.readLock();
960    try {
961      storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
962        includeStartRow, stopRow, includeStopRow);
963      memStoreScanners = this.memstore.getScanners(readPt);
964    } finally {
965      this.storeEngine.readUnlock();
966    }
967
968    try {
969      // First the store file scanners
970
971      // TODO this used to get the store files in descending order,
972      // but now we get them in ascending order, which I think is
973      // actually more correct, since memstore get put at the end.
974      List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(
975        storeFilesToScan, cacheBlocks, usePread, isCompaction, false, matcher, readPt);
976      List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
977      scanners.addAll(sfScanners);
978      // Then the memstore scanners
979      scanners.addAll(memStoreScanners);
980      return scanners;
981    } catch (Throwable t) {
982      clearAndClose(memStoreScanners);
983      throw t instanceof IOException ? (IOException) t : new IOException(t);
984    }
985  }
986
987  private static void clearAndClose(List<KeyValueScanner> scanners) {
988    if (scanners == null) {
989      return;
990    }
991    for (KeyValueScanner s : scanners) {
992      s.close();
993    }
994    scanners.clear();
995  }
996
997  /**
998   * Create scanners on the given files and if needed on the memstore with no filtering based on TTL
999   * (that happens further down the line).
1000   * @param files                  the list of files on which the scanners has to be created
1001   * @param cacheBlocks            cache the blocks or not
1002   * @param usePread               true to use pread, false if not
1003   * @param isCompaction           true if the scanner is created for compaction
1004   * @param matcher                the scan query matcher
1005   * @param startRow               the start row
1006   * @param stopRow                the stop row
1007   * @param readPt                 the read point of the current scan
1008   * @param includeMemstoreScanner true if memstore has to be included
1009   * @return scanners on the given files and on the memstore if specified
1010   */
1011  public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
1012    boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
1013    byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner)
1014    throws IOException {
1015    return getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow,
1016      false, readPt, includeMemstoreScanner);
1017  }
1018
1019  /**
1020   * Create scanners on the given files and if needed on the memstore with no filtering based on TTL
1021   * (that happens further down the line).
1022   * @param files                  the list of files on which the scanners has to be created
1023   * @param cacheBlocks            ache the blocks or not
1024   * @param usePread               true to use pread, false if not
1025   * @param isCompaction           true if the scanner is created for compaction
1026   * @param matcher                the scan query matcher
1027   * @param startRow               the start row
1028   * @param includeStartRow        true to include start row, false if not
1029   * @param stopRow                the stop row
1030   * @param includeStopRow         true to include stop row, false if not
1031   * @param readPt                 the read point of the current scan
1032   * @param includeMemstoreScanner true if memstore has to be included
1033   * @return scanners on the given files and on the memstore if specified
1034   */
1035  public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
1036    boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1037    boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
1038    boolean includeMemstoreScanner) throws IOException {
1039    List<KeyValueScanner> memStoreScanners = null;
1040    if (includeMemstoreScanner) {
1041      this.storeEngine.readLock();
1042      try {
1043        memStoreScanners = this.memstore.getScanners(readPt);
1044      } finally {
1045        this.storeEngine.readUnlock();
1046      }
1047    }
1048    try {
1049      List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(files,
1050        cacheBlocks, usePread, isCompaction, false, matcher, readPt);
1051      List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
1052      scanners.addAll(sfScanners);
1053      // Then the memstore scanners
1054      if (memStoreScanners != null) {
1055        scanners.addAll(memStoreScanners);
1056      }
1057      return scanners;
1058    } catch (Throwable t) {
1059      clearAndClose(memStoreScanners);
1060      throw t instanceof IOException ? (IOException) t : new IOException(t);
1061    }
1062  }
1063
1064  /**
1065   * @param o Observer who wants to know about changes in set of Readers
1066   */
1067  public void addChangedReaderObserver(ChangedReadersObserver o) {
1068    this.changedReaderObservers.add(o);
1069  }
1070
1071  /**
1072   * @param o Observer no longer interested in changes in set of Readers.
1073   */
1074  public void deleteChangedReaderObserver(ChangedReadersObserver o) {
1075    // We don't check if observer present; it may not be (legitimately)
1076    this.changedReaderObservers.remove(o);
1077  }
1078
1079  //////////////////////////////////////////////////////////////////////////////
1080  // Compaction
1081  //////////////////////////////////////////////////////////////////////////////
1082
1083  /**
1084   * Compact the StoreFiles. This method may take some time, so the calling thread must be able to
1085   * block for long periods.
1086   * <p>
1087   * During this time, the Store can work as usual, getting values from StoreFiles and writing new
1088   * StoreFiles from the memstore. Existing StoreFiles are not destroyed until the new compacted
1089   * StoreFile is completely written-out to disk.
1090   * <p>
1091   * The compactLock prevents multiple simultaneous compactions. The structureLock prevents us from
1092   * interfering with other write operations.
1093   * <p>
1094   * We don't want to hold the structureLock for the whole time, as a compact() can be lengthy and
1095   * we want to allow cache-flushes during this period.
1096   * <p>
1097   * Compaction event should be idempotent, since there is no IO Fencing for the region directory in
1098   * hdfs. A region server might still try to complete the compaction after it lost the region. That
1099   * is why the following events are carefully ordered for a compaction: 1. Compaction writes new
1100   * files under region/.tmp directory (compaction output) 2. Compaction atomically moves the
1101   * temporary file under region directory 3. Compaction appends a WAL edit containing the
1102   * compaction input and output files. Forces sync on WAL. 4. Compaction deletes the input files
1103   * from the region directory. Failure conditions are handled like this: - If RS fails before 2,
1104   * compaction wont complete. Even if RS lives on and finishes the compaction later, it will only
1105   * write the new data file to the region directory. Since we already have this data, this will be
1106   * idempotent but we will have a redundant copy of the data. - If RS fails between 2 and 3, the
1107   * region will have a redundant copy of the data. The RS that failed won't be able to finish
1108   * sync() for WAL because of lease recovery in WAL. - If RS fails after 3, the region region
1109   * server who opens the region will pick up the the compaction marker from the WAL and replay it
1110   * by removing the compaction input files. Failed RS can also attempt to delete those files, but
1111   * the operation will be idempotent See HBASE-2231 for details.
1112   * @param compaction compaction details obtained from requestCompaction()
1113   * @return Storefile we compacted into or null if we failed or opted out early.
1114   */
1115  public List<HStoreFile> compact(CompactionContext compaction,
1116    ThroughputController throughputController, User user) throws IOException {
1117    assert compaction != null;
1118    CompactionRequestImpl cr = compaction.getRequest();
1119    StoreFileWriterCreationTracker writerCreationTracker =
1120      storeFileWriterCreationTrackerFactory.get();
1121    if (writerCreationTracker != null) {
1122      cr.setWriterCreationTracker(writerCreationTracker);
1123      storeFileWriterCreationTrackers.add(writerCreationTracker);
1124    }
1125    try {
1126      // Do all sanity checking in here if we have a valid CompactionRequestImpl
1127      // because we need to clean up after it on the way out in a finally
1128      // block below
1129      long compactionStartTime = EnvironmentEdgeManager.currentTime();
1130      assert compaction.hasSelection();
1131      Collection<HStoreFile> filesToCompact = cr.getFiles();
1132      assert !filesToCompact.isEmpty();
1133      synchronized (filesCompacting) {
1134        // sanity check: we're compacting files that this store knows about
1135        // TODO: change this to LOG.error() after more debugging
1136        Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
1137      }
1138
1139      // Ready to go. Have list of files to compact.
1140      LOG.info("Starting compaction of " + filesToCompact + " into tmpdir="
1141        + getRegionFileSystem().getTempDir() + ", totalSize="
1142        + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));
1143
1144      return doCompaction(cr, filesToCompact, user, compactionStartTime,
1145        compaction.compact(throughputController, user));
1146    } finally {
1147      finishCompactionRequest(cr);
1148    }
1149  }
1150
1151  protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
1152    Collection<HStoreFile> filesToCompact, User user, long compactionStartTime, List<Path> newFiles)
1153    throws IOException {
1154    // Do the steps necessary to complete the compaction.
1155    setStoragePolicyFromFileName(newFiles);
1156    List<HStoreFile> sfs = storeEngine.commitStoreFiles(newFiles, true);
1157    if (this.getCoprocessorHost() != null) {
1158      for (HStoreFile sf : sfs) {
1159        getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
1160      }
1161    }
1162    replaceStoreFiles(filesToCompact, sfs, true);
1163
1164    long outputBytes = getTotalSize(sfs);
1165
1166    // At this point the store will use new files for all new scanners.
1167    refreshStoreSizeAndTotalBytes(); // update store size.
1168
1169    long now = EnvironmentEdgeManager.currentTime();
1170    if (
1171      region.getRegionServerServices() != null
1172        && region.getRegionServerServices().getMetrics() != null
1173    ) {
1174      region.getRegionServerServices().getMetrics().updateCompaction(
1175        region.getTableDescriptor().getTableName().getNameAsString(), cr.isMajor(),
1176        now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(),
1177        outputBytes);
1178
1179    }
1180
1181    logCompactionEndMessage(cr, sfs, now, compactionStartTime);
1182    return sfs;
1183  }
1184
1185  // Set correct storage policy from the file name of DTCP.
1186  // Rename file will not change the storage policy.
1187  private void setStoragePolicyFromFileName(List<Path> newFiles) throws IOException {
1188    String prefix = HConstants.STORAGE_POLICY_PREFIX;
1189    for (Path newFile : newFiles) {
1190      if (newFile.getParent().getName().startsWith(prefix)) {
1191        CommonFSUtils.setStoragePolicy(getRegionFileSystem().getFileSystem(), newFile,
1192          newFile.getParent().getName().substring(prefix.length()));
1193      }
1194    }
1195  }
1196
1197  /**
1198   * Writes the compaction WAL record.
1199   * @param filesCompacted Files compacted (input).
1200   * @param newFiles       Files from compaction.
1201   */
1202  private void writeCompactionWalRecord(Collection<HStoreFile> filesCompacted,
1203    Collection<HStoreFile> newFiles) throws IOException {
1204    if (region.getWAL() == null) {
1205      return;
1206    }
1207    List<Path> inputPaths =
1208      filesCompacted.stream().map(HStoreFile::getPath).collect(Collectors.toList());
1209    List<Path> outputPaths =
1210      newFiles.stream().map(HStoreFile::getPath).collect(Collectors.toList());
1211    RegionInfo info = this.region.getRegionInfo();
1212    CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
1213      getColumnFamilyDescriptor().getName(), inputPaths, outputPaths,
1214      getRegionFileSystem().getStoreDir(getColumnFamilyDescriptor().getNameAsString()));
1215    // Fix reaching into Region to get the maxWaitForSeqId.
1216    // Does this method belong in Region altogether given it is making so many references up there?
1217    // Could be Region#writeCompactionMarker(compactionDescriptor);
1218    WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(),
1219      this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC());
1220  }
1221
1222  @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
1223      allowedOnPath = ".*/(HStore|TestHStore).java")
1224  void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result,
1225    boolean writeCompactionMarker) throws IOException {
1226    storeEngine.replaceStoreFiles(compactedFiles, result, () -> {
1227      if (writeCompactionMarker) {
1228        writeCompactionWalRecord(compactedFiles, result);
1229      }
1230    }, () -> {
1231      synchronized (filesCompacting) {
1232        filesCompacting.removeAll(compactedFiles);
1233      }
1234    });
1235    // These may be null when the RS is shutting down. The space quota Chores will fix the Region
1236    // sizes later so it's not super-critical if we miss these.
1237    RegionServerServices rsServices = region.getRegionServerServices();
1238    if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) {
1239      updateSpaceQuotaAfterFileReplacement(
1240        rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(),
1241        compactedFiles, result);
1242    }
1243  }
1244
1245  /**
1246   * Updates the space quota usage for this region, removing the size for files compacted away and
1247   * adding in the size for new files.
1248   * @param sizeStore  The object tracking changes in region size for space quotas.
1249   * @param regionInfo The identifier for the region whose size is being updated.
1250   * @param oldFiles   Files removed from this store's region.
1251   * @param newFiles   Files added to this store's region.
1252   */
1253  void updateSpaceQuotaAfterFileReplacement(RegionSizeStore sizeStore, RegionInfo regionInfo,
1254    Collection<HStoreFile> oldFiles, Collection<HStoreFile> newFiles) {
1255    long delta = 0;
1256    if (oldFiles != null) {
1257      for (HStoreFile compactedFile : oldFiles) {
1258        if (compactedFile.isHFile()) {
1259          delta -= compactedFile.getReader().length();
1260        }
1261      }
1262    }
1263    if (newFiles != null) {
1264      for (HStoreFile newFile : newFiles) {
1265        if (newFile.isHFile()) {
1266          delta += newFile.getReader().length();
1267        }
1268      }
1269    }
1270    sizeStore.incrementRegionSize(regionInfo, delta);
1271  }
1272
1273  /**
1274   * Log a very elaborate compaction completion message.
   * @param cr                  Request.
   * @param sfs                 Resulting files.
   * @param now                 Time when the compaction finished.
   * @param compactionStartTime Start time.
1278   */
1279  private void logCompactionEndMessage(CompactionRequestImpl cr, List<HStoreFile> sfs, long now,
1280    long compactionStartTime) {
1281    StringBuilder message = new StringBuilder("Completed" + (cr.isMajor() ? " major" : "")
1282      + " compaction of " + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "")
1283      + " file(s) in " + this + " of " + this.getRegionInfo().getShortNameToLog() + " into ");
1284    if (sfs.isEmpty()) {
1285      message.append("none, ");
1286    } else {
1287      for (HStoreFile sf : sfs) {
1288        message.append(sf.getPath().getName());
1289        message.append("(size=");
1290        message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1));
1291        message.append("), ");
1292      }
1293    }
1294    message.append("total size for store is ")
1295      .append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1))
1296      .append(". This selection was in queue for ")
1297      .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
1298      .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
1299      .append(" to execute.");
1300    LOG.info(message.toString());
1301    if (LOG.isTraceEnabled()) {
1302      int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
1303      long resultSize = getTotalSize(sfs);
1304      String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
1305        + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
1306        + cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]";
1307      LOG.trace(traceMessage);
1308    }
1309  }
1310
1311  /**
   * Call to complete a compaction. This is for the case where we find an unfinished compaction in
   * the WAL, e.g. while recovering a WAL after a regionserver crash. See HBASE-2231.
1314   */
1315  public void replayCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
1316    boolean removeFiles) throws IOException {
1317    LOG.debug("Completing compaction from the WAL marker");
1318    List<String> compactionInputs = compaction.getCompactionInputList();
1319    List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList());
1320
1321    // The Compaction Marker is written after the compaction is completed,
1322    // and the files moved into the region/family folder.
1323    //
1324    // If we crash after the entry is written, we may not have removed the
1325    // input files, but the output file is present.
1326    // (The unremoved input files will be removed by this function)
1327    //
    // If we scan the directory and a file is not present, it can mean that:
    // - The file was manually removed by the user
    // - The file was removed as a consequence of a subsequent compaction
    // So we can't do anything with the "compaction output list" because those
    // files have either already been loaded when opening the region (by virtue of
    // being in the store's folder) or they may be missing due to a compaction.
1334
1335    String familyName = this.getColumnFamilyName();
1336    Set<String> inputFiles = new HashSet<>();
1337    for (String compactionInput : compactionInputs) {
1338      Path inputPath = getRegionFileSystem().getStoreFilePath(familyName, compactionInput);
1339      inputFiles.add(inputPath.getName());
1340    }
1341
1342    // some of the input files might already be deleted
1343    List<HStoreFile> inputStoreFiles = new ArrayList<>(compactionInputs.size());
1344    for (HStoreFile sf : this.getStorefiles()) {
1345      if (inputFiles.contains(sf.getPath().getName())) {
1346        inputStoreFiles.add(sf);
1347      }
1348    }
1349
1350    // check whether we need to pick up the new files
1351    List<HStoreFile> outputStoreFiles = new ArrayList<>(compactionOutputs.size());
1352
1353    if (pickCompactionFiles) {
1354      for (HStoreFile sf : this.getStorefiles()) {
1355        compactionOutputs.remove(sf.getPath().getName());
1356      }
1357      for (String compactionOutput : compactionOutputs) {
1358        StoreFileInfo storeFileInfo =
1359          getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), compactionOutput);
1360        HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo);
1361        outputStoreFiles.add(storeFile);
1362      }
1363    }
1364
1365    if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) {
1366      LOG.info("Replaying compaction marker, replacing input files: " + inputStoreFiles
1367        + " with output files : " + outputStoreFiles);
1368      this.replaceStoreFiles(inputStoreFiles, outputStoreFiles, false);
1369      this.refreshStoreSizeAndTotalBytes();
1370    }
1371  }
1372
1373  @Override
1374  public boolean hasReferences() {
    // Grab the read lock here so we only read the store file list after any in-flight atomic
    // replaceStoreFiles(..) has finished, guaranteeing we see the complete list.
1377    this.storeEngine.readLock();
1378    try {
1379      // Merge the current store files with compacted files here due to HBASE-20940.
1380      Collection<HStoreFile> allStoreFiles = new ArrayList<>(getStorefiles());
1381      allStoreFiles.addAll(getCompactedFiles());
1382      return StoreUtils.hasReferences(allStoreFiles);
1383    } finally {
1384      this.storeEngine.readUnlock();
1385    }
1386  }
1387
1388  /**
   * Getter for the CompactionProgress object.
   * @return CompactionProgress object; can be null
1391   */
1392  public CompactionProgress getCompactionProgress() {
1393    return this.storeEngine.getCompactor().getProgress();
1394  }
1395
1396  @Override
1397  public boolean shouldPerformMajorCompaction() throws IOException {
1398    for (HStoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1399      // TODO: what are these reader checks all over the place?
1400      if (sf.getReader() == null) {
1401        LOG.debug("StoreFile {} has null Reader", sf);
1402        return false;
1403      }
1404    }
1405    return storeEngine.getCompactionPolicy()
1406      .shouldPerformMajorCompaction(this.storeEngine.getStoreFileManager().getStorefiles());
1407  }
1408
1409  public Optional<CompactionContext> requestCompaction() throws IOException {
1410    return requestCompaction(NO_PRIORITY, CompactionLifeCycleTracker.DUMMY, null);
1411  }
1412
1413  public Optional<CompactionContext> requestCompaction(int priority,
1414    CompactionLifeCycleTracker tracker, User user) throws IOException {
1415    // don't even select for compaction if writes are disabled
1416    if (!this.areWritesEnabled()) {
1417      return Optional.empty();
1418    }
1419    // Before we do compaction, try to get rid of unneeded files to simplify things.
1420    removeUnneededFiles();
1421
1422    final CompactionContext compaction = storeEngine.createCompaction();
1423    CompactionRequestImpl request = null;
1424    this.storeEngine.readLock();
1425    try {
1426      synchronized (filesCompacting) {
1427        // First, see if coprocessor would want to override selection.
1428        if (this.getCoprocessorHost() != null) {
1429          final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
1430          boolean override =
1431            getCoprocessorHost().preCompactSelection(this, candidatesForCoproc, tracker, user);
1432          if (override) {
1433            // Coprocessor is overriding normal file selection.
1434            compaction.forceSelect(new CompactionRequestImpl(candidatesForCoproc));
1435          }
1436        }
1437
1438        // Normal case - coprocessor is not overriding file selection.
1439        if (!compaction.hasSelection()) {
1440          boolean isUserCompaction = priority == Store.PRIORITY_USER;
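          // Only one compaction at a time may claim the off-peak slot; it is released again below
          // if selection fails or the chosen request is not off-peak.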
1441          boolean mayUseOffPeak =
1442            offPeakHours.isOffPeakHour() && offPeakCompactionTracker.compareAndSet(false, true);
1443          try {
1444            compaction.select(this.filesCompacting, isUserCompaction, mayUseOffPeak,
1445              forceMajor && filesCompacting.isEmpty());
1446          } catch (IOException e) {
1447            if (mayUseOffPeak) {
1448              offPeakCompactionTracker.set(false);
1449            }
1450            throw e;
1451          }
1452          assert compaction.hasSelection();
1453          if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
1454            // Compaction policy doesn't want to take advantage of off-peak.
1455            offPeakCompactionTracker.set(false);
1456          }
1457        }
1458        if (this.getCoprocessorHost() != null) {
1459          this.getCoprocessorHost().postCompactSelection(this,
1460            ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker,
1461            compaction.getRequest(), user);
1462        }
1463        // Finally, we have the resulting files list. Check if we have any files at all.
1464        request = compaction.getRequest();
1465        Collection<HStoreFile> selectedFiles = request.getFiles();
1466        if (selectedFiles.isEmpty()) {
1467          return Optional.empty();
1468        }
1469
1470        addToCompactingFiles(selectedFiles);
1471
1472        // If we're enqueuing a major, clear the force flag.
1473        this.forceMajor = this.forceMajor && !request.isMajor();
1474
1475        // Set common request properties.
1476        // Set priority, either override value supplied by caller or from store.
1477        final int compactionPriority =
1478          (priority != Store.NO_PRIORITY) ? priority : getCompactPriority();
1479        request.setPriority(compactionPriority);
1480
1481        if (request.isAfterSplit()) {
          // If the store belongs to a recently split daughter region, it is better to give it a
          // higher priority in the compaction queue.
1484          // Override priority if it is lower (higher int value) than
1485          // SPLIT_REGION_COMPACTION_PRIORITY
1486          final int splitHousekeepingPriority =
1487            Math.min(compactionPriority, SPLIT_REGION_COMPACTION_PRIORITY);
1488          request.setPriority(splitHousekeepingPriority);
1489          LOG.info(
1490            "Keeping/Overriding Compaction request priority to {} for CF {} since it"
1491              + " belongs to recently split daughter region {}",
1492            splitHousekeepingPriority, this.getColumnFamilyName(),
1493            getRegionInfo().getRegionNameAsString());
1494        }
1495        request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
1496        request.setTracker(tracker);
1497      }
1498    } finally {
1499      this.storeEngine.readUnlock();
1500    }
1501
1502    if (LOG.isDebugEnabled()) {
1503      LOG.debug(this + " is initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
1504        + (request.isAllFiles() ? " (all files)" : ""));
1505    }
1506    this.region.reportCompactionRequestStart(request.isMajor());
1507    return Optional.of(compaction);
1508  }
1509
1510  /** Adds the files to compacting files. filesCompacting must be locked. */
1511  private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) {
1512    if (CollectionUtils.isEmpty(filesToAdd)) {
1513      return;
1514    }
1515    // Check that we do not try to compact the same StoreFile twice.
1516    if (!Collections.disjoint(filesCompacting, filesToAdd)) {
1517      Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
1518    }
1519    filesCompacting.addAll(filesToAdd);
1520    Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator());
1521  }
1522
1523  private void removeUnneededFiles() throws IOException {
1524    if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) {
1525      return;
1526    }
1527    if (getColumnFamilyDescriptor().getMinVersions() > 0) {
1528      LOG.debug("Skipping expired store file removal due to min version of {} being {}", this,
1529        getColumnFamilyDescriptor().getMinVersions());
1530      return;
1531    }
1532    this.storeEngine.readLock();
1533    Collection<HStoreFile> delSfs = null;
1534    try {
1535      synchronized (filesCompacting) {
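        // With a finite TTL, ask the store file manager for files whose data has entirely
        // expired; such files can simply be dropped instead of being compacted.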
1536        long cfTtl = getStoreFileTtl();
1537        if (cfTtl != Long.MAX_VALUE) {
1538          delSfs = storeEngine.getStoreFileManager()
1539            .getUnneededFiles(EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting);
1540          addToCompactingFiles(delSfs);
1541        }
1542      }
1543    } finally {
1544      this.storeEngine.readUnlock();
1545    }
1546
1547    if (CollectionUtils.isEmpty(delSfs)) {
1548      return;
1549    }
1550
1551    Collection<HStoreFile> newFiles = Collections.emptyList(); // No new files.
1552    replaceStoreFiles(delSfs, newFiles, true);
1553    refreshStoreSizeAndTotalBytes();
1554    LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in " + this
1555      + "; total size is " + TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1));
1556  }
1557
1558  public void cancelRequestedCompaction(CompactionContext compaction) {
1559    finishCompactionRequest(compaction.getRequest());
1560  }
1561
1562  protected void finishCompactionRequest(CompactionRequestImpl cr) {
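    // Clean up after a completed or cancelled compaction request: report the end to the region,
    // release the off-peak slot if this request held it, and forget the request's files and its
    // store file writer creation tracker.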
1563    this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
1564    if (cr.isOffPeak()) {
1565      offPeakCompactionTracker.set(false);
1566      cr.setOffPeak(false);
1567    }
1568    synchronized (filesCompacting) {
1569      filesCompacting.removeAll(cr.getFiles());
1570    }
    // The tracker could be null, for example, when we do not need to track store file writer
    // creation because of the SFT implementation in use, or when the compaction was canceled.
1573    if (cr.getWriterCreationTracker() != null) {
1574      storeFileWriterCreationTrackers.remove(cr.getWriterCreationTracker());
1575    }
1576  }
1577
1578  /**
   * Recomputes the store size and total uncompressed bytes from the current set of store files.
1580   */
1581  protected void refreshStoreSizeAndTotalBytes() throws IOException {
1582    this.storeSize.set(0L);
1583    this.totalUncompressedBytes.set(0L);
1584    for (HStoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1585      StoreFileReader r = hsf.getReader();
1586      if (r == null) {
1587        LOG.warn("StoreFile {} has a null Reader", hsf);
1588        continue;
1589      }
1590      this.storeSize.addAndGet(r.length());
1591      this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
1592    }
1593  }
1594
1595  /*
1596   * @param wantedVersions How many versions were asked for.
   * @return wantedVersions or this family's {@link HConstants#VERSIONS}, whichever is smaller.
1598   */
1599  int versionsToReturn(final int wantedVersions) {
1600    if (wantedVersions <= 0) {
1601      throw new IllegalArgumentException("Number of versions must be > 0");
1602    }
1603    // Make sure we do not return more than maximum versions for this store.
1604    int maxVersions = getColumnFamilyDescriptor().getMaxVersions();
1605    return wantedVersions > maxVersions ? maxVersions : wantedVersions;
1606  }
1607
1608  @Override
1609  public boolean canSplit() {
1610    // Not split-able if we find a reference store file present in the store.
1611    boolean result = !hasReferences();
1612    if (!result) {
1613      LOG.trace("Not splittable; has references: {}", this);
1614    }
1615    return result;
1616  }
1617
1618  /**
   * Determines the split point for this store, if the store can be split (i.e. it has no
   * reference store files).
1620   */
1621  public Optional<byte[]> getSplitPoint() {
1622    this.storeEngine.readLock();
1623    try {
1624      // Should already be enforced by the split policy!
1625      assert !this.getRegionInfo().isMetaRegion();
1626      // Not split-able if we find a reference store file present in the store.
1627      if (hasReferences()) {
1628        LOG.trace("Not splittable; has references: {}", this);
1629        return Optional.empty();
1630      }
1631      return this.storeEngine.getStoreFileManager().getSplitPoint();
1632    } catch (IOException e) {
1633      LOG.warn("Failed getting store size for {}", this, e);
1634    } finally {
1635      this.storeEngine.readUnlock();
1636    }
1637    return Optional.empty();
1638  }
1639
1640  @Override
1641  public long getLastCompactSize() {
1642    return this.lastCompactSize;
1643  }
1644
1645  @Override
1646  public long getSize() {
1647    return storeSize.get();
1648  }
1649
1650  public void triggerMajorCompaction() {
1651    this.forceMajor = true;
1652  }
1653
1654  //////////////////////////////////////////////////////////////////////////////
1655  // File administration
1656  //////////////////////////////////////////////////////////////////////////////
1657
1658  /**
1659   * Return a scanner for both the memstore and the HStore files. Assumes we are not in a
1660   * compaction.
1661   * @param scan       Scan to apply when scanning the stores
1662   * @param targetCols columns to scan
1663   * @return a scanner over the current key values
1664   * @throws IOException on failure
1665   */
1666  public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols, long readPt)
1667    throws IOException {
1668    storeEngine.readLock();
1669    try {
1670      ScanInfo scanInfo;
1671      if (this.getCoprocessorHost() != null) {
1672        scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this, scan);
1673      } else {
1674        scanInfo = getScanInfo();
1675      }
1676      return createScanner(scan, scanInfo, targetCols, readPt);
1677    } finally {
1678      storeEngine.readUnlock();
1679    }
1680  }
1681
1682  // HMobStore will override this method to return its own implementation.
1683  protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo,
1684    NavigableSet<byte[]> targetCols, long readPt) throws IOException {
1685    return scan.isReversed()
1686      ? new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt)
1687      : new StoreScanner(this, scanInfo, scan, targetCols, readPt);
1688  }
1689
1690  /**
1691   * Recreates the scanners on the current list of active store file scanners
1692   * @param currentFileScanners    the current set of active store file scanners
1693   * @param cacheBlocks            cache the blocks or not
1694   * @param usePread               use pread or not
1695   * @param isCompaction           is the scanner for compaction
1696   * @param matcher                the scan query matcher
1697   * @param startRow               the scan's start row
1698   * @param includeStartRow        should the scan include the start row
1699   * @param stopRow                the scan's stop row
1700   * @param includeStopRow         should the scan include the stop row
   * @param readPt                 the read point of the current scan
   * @param includeMemstoreScanner whether the current scanner should include a memstore scanner
   * @return list of scanners recreated on the current scanners, or {@code null} if there is
   *         nothing to reopen
1704   */
1705  public List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners,
1706    boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
1707    byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
1708    boolean includeMemstoreScanner) throws IOException {
1709    this.storeEngine.readLock();
1710    try {
1711      Map<String, HStoreFile> name2File =
1712        new HashMap<>(getStorefilesCount() + getCompactedFilesCount());
1713      for (HStoreFile file : getStorefiles()) {
1714        name2File.put(file.getFileInfo().getActiveFileName(), file);
1715      }
1716      Collection<HStoreFile> compactedFiles = getCompactedFiles();
1717      for (HStoreFile file : IterableUtils.emptyIfNull(compactedFiles)) {
1718        name2File.put(file.getFileInfo().getActiveFileName(), file);
1719      }
1720      List<HStoreFile> filesToReopen = new ArrayList<>();
1721      for (KeyValueScanner kvs : currentFileScanners) {
1722        assert kvs.isFileScanner();
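        // Scanners that are already exhausted do not need their files reopened.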
1723        if (kvs.peek() == null) {
1724          continue;
1725        }
1726        filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
1727      }
1728      if (filesToReopen.isEmpty()) {
1729        return null;
1730      }
1731      return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow,
1732        includeStartRow, stopRow, includeStopRow, readPt, false);
1733    } finally {
1734      this.storeEngine.readUnlock();
1735    }
1736  }
1737
1738  @Override
1739  public String toString() {
1740    return this.getRegionInfo().getShortNameToLog() + "/" + this.getColumnFamilyName();
1741  }
1742
1743  @Override
1744  public int getStorefilesCount() {
1745    return this.storeEngine.getStoreFileManager().getStorefileCount();
1746  }
1747
1748  @Override
1749  public int getCompactedFilesCount() {
1750    return this.storeEngine.getStoreFileManager().getCompactedFilesCount();
1751  }
1752
1753  private LongStream getStoreFileAgeStream() {
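    // Age of a file = current time minus its creation timestamp; only HFiles that still have an
    // open reader are included.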
1754    return this.storeEngine.getStoreFileManager().getStorefiles().stream().filter(sf -> {
1755      if (sf.getReader() == null) {
1756        LOG.warn("StoreFile {} has a null Reader", sf);
1757        return false;
1758      } else {
1759        return true;
1760      }
1761    }).filter(HStoreFile::isHFile).mapToLong(sf -> sf.getFileInfo().getCreatedTimestamp())
1762      .map(t -> EnvironmentEdgeManager.currentTime() - t);
1763  }
1764
1765  @Override
1766  public OptionalLong getMaxStoreFileAge() {
1767    return getStoreFileAgeStream().max();
1768  }
1769
1770  @Override
1771  public OptionalLong getMinStoreFileAge() {
1772    return getStoreFileAgeStream().min();
1773  }
1774
1775  @Override
1776  public OptionalDouble getAvgStoreFileAge() {
1777    return getStoreFileAgeStream().average();
1778  }
1779
1780  @Override
1781  public long getNumReferenceFiles() {
1782    return this.storeEngine.getStoreFileManager().getStorefiles().stream()
1783      .filter(HStoreFile::isReference).count();
1784  }
1785
1786  @Override
1787  public long getNumHFiles() {
1788    return this.storeEngine.getStoreFileManager().getStorefiles().stream()
1789      .filter(HStoreFile::isHFile).count();
1790  }
1791
1792  @Override
1793  public long getStoreSizeUncompressed() {
1794    return this.totalUncompressedBytes.get();
1795  }
1796
1797  @Override
1798  public long getStorefilesSize() {
1799    // Include all StoreFiles
1800    return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(),
1801      sf -> true);
1802  }
1803
1804  @Override
1805  public long getHFilesSize() {
1806    // Include only StoreFiles which are HFiles
1807    return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(),
1808      HStoreFile::isHFile);
1809  }
1810
1811  private long getStorefilesFieldSize(ToLongFunction<StoreFileReader> f) {
1812    return this.storeEngine.getStoreFileManager().getStorefiles().stream()
1813      .mapToLong(file -> StoreUtils.getStorefileFieldSize(file, f)).sum();
1814  }
1815
1816  @Override
1817  public long getStorefilesRootLevelIndexSize() {
1818    return getStorefilesFieldSize(StoreFileReader::indexSize);
1819  }
1820
1821  @Override
1822  public long getTotalStaticIndexSize() {
1823    return getStorefilesFieldSize(StoreFileReader::getUncompressedDataIndexSize);
1824  }
1825
1826  @Override
1827  public long getTotalStaticBloomSize() {
1828    return getStorefilesFieldSize(StoreFileReader::getTotalBloomSize);
1829  }
1830
1831  @Override
1832  public MemStoreSize getMemStoreSize() {
1833    return this.memstore.size();
1834  }
1835
1836  @Override
1837  public int getCompactPriority() {
1838    int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();
1839    if (priority == PRIORITY_USER) {
1840      LOG.warn("Compaction priority is USER despite there being no user compaction");
1841    }
1842    return priority;
1843  }
1844
1845  public boolean throttleCompaction(long compactionSize) {
1846    return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize);
1847  }
1848
1849  public HRegion getHRegion() {
1850    return this.region;
1851  }
1852
1853  public RegionCoprocessorHost getCoprocessorHost() {
1854    return this.region.getCoprocessorHost();
1855  }
1856
1857  @Override
1858  public RegionInfo getRegionInfo() {
1859    return getRegionFileSystem().getRegionInfo();
1860  }
1861
1862  @Override
1863  public boolean areWritesEnabled() {
1864    return this.region.areWritesEnabled();
1865  }
1866
1867  @Override
1868  public long getSmallestReadPoint() {
1869    return this.region.getSmallestReadPoint();
1870  }
1871
1872  /**
1873   * Adds or replaces the specified KeyValues.
1874   * <p>
1875   * For each KeyValue specified, if a cell with the same row, family, and qualifier exists in
1876   * MemStore, it will be replaced. Otherwise, it will just be inserted to MemStore.
1877   * <p>
1878   * This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic
1879   * across all of them.
1880   * @param readpoint readpoint below which we can safely remove duplicate KVs
1881   */
1882  public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing)
1883    throws IOException {
1884    this.storeEngine.readLock();
1885    try {
1886      this.memstore.upsert(cells, readpoint, memstoreSizing);
1887    } finally {
1888      this.storeEngine.readUnlock();
1889    }
1890  }
1891
1892  public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTracker tracker) {
1893    return new StoreFlusherImpl(cacheFlushId, tracker);
1894  }
1895
1896  private final class StoreFlusherImpl implements StoreFlushContext {
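    // Default StoreFlushContext implementation: prepare() snapshots the memstore, flushCache()
    // flushes the snapshot to temporary files, and commit() moves those files into the store.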
1897
1898    private final FlushLifeCycleTracker tracker;
1899    private final StoreFileWriterCreationTracker writerCreationTracker;
1900    private final long cacheFlushSeqNum;
1901    private MemStoreSnapshot snapshot;
1902    private List<Path> tempFiles;
1903    private List<Path> committedFiles;
1904    private long cacheFlushCount;
1905    private long cacheFlushSize;
1906    private long outputFileSize;
1907
1908    private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) {
1909      this.cacheFlushSeqNum = cacheFlushSeqNum;
1910      this.tracker = tracker;
1911      this.writerCreationTracker = storeFileWriterCreationTrackerFactory.get();
1912    }
1913
1914    /**
     * This is not thread-safe. The caller should have a lock on the region or the store. If
     * necessary, the lock can be added with the patch provided in HBASE-10087.
1917     */
1918    @Override
1919    public MemStoreSize prepare() {
1920      // passing the current sequence number of the wal - to allow bookkeeping in the memstore
1921      this.snapshot = memstore.snapshot();
1922      this.cacheFlushCount = snapshot.getCellsCount();
1923      this.cacheFlushSize = snapshot.getDataSize();
1924      committedFiles = new ArrayList<>(1);
1925      return snapshot.getMemStoreSize();
1926    }
1927
1928    @Override
1929    public void flushCache(MonitoredTask status) throws IOException {
1930      RegionServerServices rsService = region.getRegionServerServices();
1931      ThroughputController throughputController =
1932        rsService == null ? null : rsService.getFlushThroughputController();
      // It could be null if we do not need to track store file writer creation because of the SFT
      // implementation in use.
1935      if (writerCreationTracker != null) {
1936        HStore.this.storeFileWriterCreationTrackers.add(writerCreationTracker);
1937      }
1938      tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController,
1939        tracker, writerCreationTracker);
1940    }
1941
1942    @Override
1943    public boolean commit(MonitoredTask status) throws IOException {
1944      try {
1945        if (CollectionUtils.isEmpty(this.tempFiles)) {
1946          return false;
1947        }
1948        status.setStatus("Flushing " + this + ": reopening flushed file");
1949        List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, false);
1950        for (HStoreFile sf : storeFiles) {
1951          StoreFileReader r = sf.getReader();
1952          if (LOG.isInfoEnabled()) {
1953            LOG.info("Added {}, entries={}, sequenceid={}, filesize={}", sf, r.getEntries(),
1954              cacheFlushSeqNum, TraditionalBinaryPrefix.long2String(r.length(), "", 1));
1955          }
1956          outputFileSize += r.length();
1957          storeSize.addAndGet(r.length());
1958          totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
1959          committedFiles.add(sf.getPath());
1960        }
1961
1962        flushedCellsCount.addAndGet(cacheFlushCount);
1963        flushedCellsSize.addAndGet(cacheFlushSize);
1964        flushedOutputFileSize.addAndGet(outputFileSize);
1965        // call coprocessor after we have done all the accounting above
1966        for (HStoreFile sf : storeFiles) {
1967          if (getCoprocessorHost() != null) {
1968            getCoprocessorHost().postFlush(HStore.this, sf, tracker);
1969          }
1970        }
1971        // Add new file to store files. Clear snapshot too while we have the Store write lock.
1972        return completeFlush(storeFiles, snapshot.getId());
1973      } finally {
1974        if (writerCreationTracker != null) {
1975          HStore.this.storeFileWriterCreationTrackers.remove(writerCreationTracker);
1976        }
1977      }
1978    }
1979
1980    @Override
1981    public long getOutputFileSize() {
1982      return outputFileSize;
1983    }
1984
1985    @Override
1986    public List<Path> getCommittedFiles() {
1987      return committedFiles;
1988    }
1989
1990    /**
     * Similar to commit, but called in secondary region replicas for replaying the flush cache from
     * the primary region. Adds the new files to the store, and drops the snapshot depending on the
     * dropMemstoreSnapshot argument.
1994     * @param fileNames            names of the flushed files
1995     * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot
1996     */
1997    @Override
1998    public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
1999      throws IOException {
2000      List<HStoreFile> storeFiles = new ArrayList<>(fileNames.size());
2001      for (String file : fileNames) {
2002        // open the file as a store file (hfile link, etc)
2003        StoreFileInfo storeFileInfo =
2004          getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), file);
2005        HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo);
2006        storeFiles.add(storeFile);
2007        HStore.this.storeSize.addAndGet(storeFile.getReader().length());
2008        HStore.this.totalUncompressedBytes
2009          .addAndGet(storeFile.getReader().getTotalUncompressedBytes());
2010        if (LOG.isInfoEnabled()) {
2011          LOG.info(this + " added " + storeFile + ", entries=" + storeFile.getReader().getEntries()
2012            + ", sequenceid=" + storeFile.getReader().getSequenceID() + ", filesize="
2013            + TraditionalBinaryPrefix.long2String(storeFile.getReader().length(), "", 1));
2014        }
2015      }
2016
2017      long snapshotId = -1; // -1 means do not drop
2018      if (dropMemstoreSnapshot && snapshot != null) {
2019        snapshotId = snapshot.getId();
2020      }
2021      HStore.this.completeFlush(storeFiles, snapshotId);
2022    }
2023
2024    /**
2025     * Abort the snapshot preparation. Drops the snapshot if any.
2026     */
2027    @Override
2028    public void abort() throws IOException {
2029      if (snapshot != null) {
2030        HStore.this.completeFlush(Collections.emptyList(), snapshot.getId());
2031      }
2032    }
2033  }
2034
2035  @Override
2036  public boolean needsCompaction() {
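    // Copy filesCompacting under its lock so the store engine checks a consistent snapshot.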
2037    List<HStoreFile> filesCompactingClone = null;
2038    synchronized (filesCompacting) {
2039      filesCompactingClone = Lists.newArrayList(filesCompacting);
2040    }
2041    return this.storeEngine.needsCompaction(filesCompactingClone);
2042  }
2043
2044  /**
2045   * Used for tests.
2046   * @return cache configuration for this Store.
2047   */
2048  public CacheConfig getCacheConfig() {
2049    return storeContext.getCacheConf();
2050  }
2051
2052  public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HStore.class, false);
2053
2054  public static final long DEEP_OVERHEAD = ClassSize.align(
2055    FIXED_OVERHEAD + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK + ClassSize.CONCURRENT_SKIPLISTMAP
2056      + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT + ScanInfo.FIXED_OVERHEAD);
2057
2058  @Override
2059  public long heapSize() {
2060    MemStoreSize memstoreSize = this.memstore.size();
2061    return DEEP_OVERHEAD + memstoreSize.getHeapSize() + storeContext.heapSize();
2062  }
2063
2064  @Override
2065  public CellComparator getComparator() {
2066    return storeContext.getComparator();
2067  }
2068
2069  public ScanInfo getScanInfo() {
2070    return scanInfo;
2071  }
2072
2073  /**
   * Set scan info, used by tests.
   * @param scanInfo new scan info to use for tests
2076   */
2077  void setScanInfo(ScanInfo scanInfo) {
2078    this.scanInfo = scanInfo;
2079  }
2080
2081  @Override
2082  public boolean hasTooManyStoreFiles() {
2083    return getStorefilesCount() > this.blockingFileCount;
2084  }
2085
2086  @Override
2087  public long getFlushedCellsCount() {
2088    return flushedCellsCount.get();
2089  }
2090
2091  @Override
2092  public long getFlushedCellsSize() {
2093    return flushedCellsSize.get();
2094  }
2095
2096  @Override
2097  public long getFlushedOutputFileSize() {
2098    return flushedOutputFileSize.get();
2099  }
2100
2101  @Override
2102  public long getCompactedCellsCount() {
2103    return compactedCellsCount.get();
2104  }
2105
2106  @Override
2107  public long getCompactedCellsSize() {
2108    return compactedCellsSize.get();
2109  }
2110
2111  @Override
2112  public long getMajorCompactedCellsCount() {
2113    return majorCompactedCellsCount.get();
2114  }
2115
2116  @Override
2117  public long getMajorCompactedCellsSize() {
2118    return majorCompactedCellsSize.get();
2119  }
2120
2121  public void updateCompactedMetrics(boolean isMajor, CompactionProgress progress) {
2122    if (isMajor) {
2123      majorCompactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
2124      majorCompactedCellsSize.addAndGet(progress.totalCompactedSize);
2125    } else {
2126      compactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
2127      compactedCellsSize.addAndGet(progress.totalCompactedSize);
2128    }
2129  }
2130
2131  /**
2132   * Returns the StoreEngine that is backing this concrete implementation of Store.
   * @return the {@link StoreEngine} object used internally inside this HStore object
2134   */
2135  public StoreEngine<?, ?, ?, ?> getStoreEngine() {
2136    return this.storeEngine;
2137  }
2138
2139  protected OffPeakHours getOffPeakHours() {
2140    return this.offPeakHours;
2141  }
2142
2143  @Override
2144  public void onConfigurationChange(Configuration conf) {
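    // Rebuild the store-scoped configuration from the updated server configuration and push it to
    // the components that cache configuration values (compaction policy and off-peak hours).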
2145    Configuration storeConf = StoreUtils.createStoreConfiguration(conf, region.getTableDescriptor(),
2146      getColumnFamilyDescriptor());
2147    this.conf = storeConf;
2148    this.storeEngine.compactionPolicy.setConf(storeConf);
2149    this.offPeakHours = OffPeakHours.getInstance(storeConf);
2150  }
2151
2152  /**
2153   * {@inheritDoc}
2154   */
2155  @Override
2156  public void registerChildren(ConfigurationManager manager) {
2157    // No children to register
2158  }
2159
2160  /**
2161   * {@inheritDoc}
2162   */
2163  @Override
2164  public void deregisterChildren(ConfigurationManager manager) {
2165    // No children to deregister
2166  }
2167
2168  @Override
2169  public double getCompactionPressure() {
2170    return storeEngine.getStoreFileManager().getCompactionPressure();
2171  }
2172
2173  @Override
2174  public boolean isPrimaryReplicaStore() {
2175    return getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID;
2176  }
2177
2178  /**
2179   * Sets the store up for a region level snapshot operation.
2180   * @see #postSnapshotOperation()
2181   */
2182  public void preSnapshotOperation() {
2183    archiveLock.lock();
2184  }
2185
2186  /**
2187   * Perform tasks needed after the completion of snapshot operation.
2188   * @see #preSnapshotOperation()
2189   */
2190  public void postSnapshotOperation() {
2191    archiveLock.unlock();
2192  }
2193
2194  /**
2195   * Closes and archives the compacted files under this store
2196   */
2197  public synchronized void closeAndArchiveCompactedFiles() throws IOException {
2198    // ensure other threads do not attempt to archive the same files on close()
2199    archiveLock.lock();
2200    try {
2201      storeEngine.readLock();
2202      Collection<HStoreFile> copyCompactedfiles = null;
2203      try {
2204        Collection<HStoreFile> compactedfiles =
2205          this.getStoreEngine().getStoreFileManager().getCompactedfiles();
2206        if (CollectionUtils.isNotEmpty(compactedfiles)) {
2207          // Do a copy under read lock
2208          copyCompactedfiles = new ArrayList<>(compactedfiles);
2209        } else {
2210          LOG.trace("No compacted files to archive");
2211        }
2212      } finally {
2213        storeEngine.readUnlock();
2214      }
2215      if (CollectionUtils.isNotEmpty(copyCompactedfiles)) {
2216        removeCompactedfiles(copyCompactedfiles, true);
2217      }
2218    } finally {
2219      archiveLock.unlock();
2220    }
2221  }
2222
2223  /**
2224   * Archives and removes the compacted files
2225   * @param compactedfiles The compacted files in this store that are not active in reads
2226   * @param evictOnClose   true if blocks should be evicted from the cache when an HFile reader is
2227   *                       closed, false if not
2228   */
2229  private void removeCompactedfiles(Collection<HStoreFile> compactedfiles, boolean evictOnClose)
2230    throws IOException {
2231    final List<HStoreFile> filesToRemove = new ArrayList<>(compactedfiles.size());
2232    final List<Long> storeFileSizes = new ArrayList<>(compactedfiles.size());
2233    for (final HStoreFile file : compactedfiles) {
2234      synchronized (file) {
2235        try {
2236          StoreFileReader r = file.getReader();
2237          if (r == null) {
2238            LOG.debug("The file {} was closed but still not archived", file);
2239            // HACK: Temporarily re-open the reader so we can get the size of the file. Ideally,
2240            // we should know the size of an HStoreFile without having to ask the HStoreFileReader
2241            // for that.
2242            long length = getStoreFileSize(file);
2243            filesToRemove.add(file);
2244            storeFileSizes.add(length);
2245            continue;
2246          }
2247
2248          if (file.isCompactedAway() && !file.isReferencedInReads()) {
2249            // Even if deleting fails we need not bother as any new scanners won't be
2250            // able to use the compacted file as the status is already compactedAway
2251            LOG.trace("Closing and archiving the file {}", file);
2252            // Copy the file size before closing the reader
2253            final long length = r.length();
2254            r.close(evictOnClose);
2255            // Just close and return
2256            filesToRemove.add(file);
2257            // Only add the length if we successfully added the file to `filesToRemove`
2258            storeFileSizes.add(length);
2259          } else {
2260            LOG.info("Can't archive compacted file " + file.getPath()
2261              + " because of either isCompactedAway=" + file.isCompactedAway()
2262              + " or file has reference, isReferencedInReads=" + file.isReferencedInReads()
2263              + ", refCount=" + r.getRefCount() + ", skipping for now.");
2264          }
2265        } catch (Exception e) {
2266          LOG.error("Exception while trying to close the compacted store file {}", file.getPath(),
2267            e);
2268        }
2269      }
2270    }
2271    if (this.isPrimaryReplicaStore()) {
2272      // Only the primary region is allowed to move the file to archive.
2273      // The secondary region does not move the files to archive. Any active reads from
2274      // the secondary region will still work because the file as such has active readers on it.
2275      if (!filesToRemove.isEmpty()) {
2276        LOG.debug("Moving the files {} to archive", filesToRemove);
2277        // Only if this is successful it has to be removed
2278        try {
2279          getRegionFileSystem().removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(),
2280            filesToRemove);
2281        } catch (FailedArchiveException fae) {
2282          // Even if archiving some files failed, we still need to clear out any of the
2283          // files which were successfully archived. Otherwise we will receive a
2284          // FileNotFoundException when we attempt to re-archive them in the next go around.
2285          Collection<Path> failedFiles = fae.getFailedFiles();
2286          Iterator<HStoreFile> iter = filesToRemove.iterator();
2287          Iterator<Long> sizeIter = storeFileSizes.iterator();
2288          while (iter.hasNext()) {
2289            sizeIter.next();
2290            if (failedFiles.contains(iter.next().getPath())) {
2291              iter.remove();
2292              sizeIter.remove();
2293            }
2294          }
2295          if (!filesToRemove.isEmpty()) {
2296            clearCompactedfiles(filesToRemove);
2297          }
2298          throw fae;
2299        }
2300      }
2301    }
2302    if (!filesToRemove.isEmpty()) {
2303      // Clear the compactedfiles from the store file manager
2304      clearCompactedfiles(filesToRemove);
2305      // Try to send report of this archival to the Master for updating quota usage faster
2306      reportArchivedFilesForQuota(filesToRemove, storeFileSizes);
2307    }
2308  }
2309
2310  /**
2311   * Computes the length of a store file without succumbing to any errors along the way. If an error
2312   * is encountered, the implementation returns {@code 0} instead of the actual size.
2313   * @param file The file to compute the size of.
2314   * @return The size in bytes of the provided {@code file}.
2315   */
2316  long getStoreFileSize(HStoreFile file) {
2317    long length = 0;
2318    try {
2319      file.initReader();
2320      length = file.getReader().length();
2321    } catch (IOException e) {
2322      LOG.trace("Failed to open reader when trying to compute store file size for {}, ignoring",
2323        file, e);
2324    } finally {
2325      try {
2326        file.closeStoreFile(
2327          file.getCacheConf() != null ? file.getCacheConf().shouldEvictOnClose() : true);
2328      } catch (IOException e) {
2329        LOG.trace("Failed to close reader after computing store file size for {}, ignoring", file,
2330          e);
2331      }
2332    }
2333    return length;
2334  }
2335
2336  public Long preFlushSeqIDEstimation() {
2337    return memstore.preFlushSeqIDEstimation();
2338  }
2339
2340  @Override
2341  public boolean isSloppyMemStore() {
2342    return this.memstore.isSloppy();
2343  }
2344
2345  private void clearCompactedfiles(List<HStoreFile> filesToRemove) throws IOException {
2346    LOG.trace("Clearing the compacted file {} from this store", filesToRemove);
2347    storeEngine.removeCompactedFiles(filesToRemove);
2348  }
2349
2350  @Override
2351  public int getCurrentParallelPutCount() {
2352    return currentParallelPutCount.get();
2353  }
2354
2355  public int getStoreRefCount() {
2356    return this.storeEngine.getStoreFileManager().getStorefiles().stream()
2357      .filter(sf -> sf.getReader() != null).filter(HStoreFile::isHFile)
2358      .mapToInt(HStoreFile::getRefCount).sum();
2359  }
2360
  /** Returns the maximum ref count among all compacted store files of this HStore. */
2362  public int getMaxCompactedStoreFileRefCount() {
2363    OptionalInt maxCompactedStoreFileRefCount = this.storeEngine.getStoreFileManager()
2364      .getCompactedfiles().stream().filter(sf -> sf.getReader() != null).filter(HStoreFile::isHFile)
2365      .mapToInt(HStoreFile::getRefCount).max();
2366    return maxCompactedStoreFileRefCount.isPresent() ? maxCompactedStoreFileRefCount.getAsInt() : 0;
2367  }
2368
2369  void reportArchivedFilesForQuota(List<? extends StoreFile> archivedFiles, List<Long> fileSizes) {
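    // Report the archived files and their sizes to the RegionServer so that space quota usage can
    // be updated promptly; non-HFiles and files with an unknown (zero) size are skipped.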
2370    // Sanity check from the caller
2371    if (archivedFiles.size() != fileSizes.size()) {
2372      throw new RuntimeException("Coding error: should never see lists of varying size");
2373    }
2374    RegionServerServices rss = this.region.getRegionServerServices();
2375    if (rss == null) {
2376      return;
2377    }
2378    List<Entry<String, Long>> filesWithSizes = new ArrayList<>(archivedFiles.size());
2379    Iterator<Long> fileSizeIter = fileSizes.iterator();
2380    for (StoreFile storeFile : archivedFiles) {
2381      final long fileSize = fileSizeIter.next();
2382      if (storeFile.isHFile() && fileSize != 0) {
2383        filesWithSizes.add(Maps.immutableEntry(storeFile.getPath().getName(), fileSize));
2384      }
2385    }
2386    if (LOG.isTraceEnabled()) {
2387      LOG.trace("Files archived: " + archivedFiles + ", reporting the following to the Master: "
2388        + filesWithSizes);
2389    }
2390    boolean success = rss.reportFileArchivalForQuotas(getTableName(), filesWithSizes);
2391    if (!success) {
2392      LOG.warn("Failed to report archival of files: " + filesWithSizes);
2393    }
2394  }
2395
2396  @Override
2397  public long getMemstoreOnlyRowReadsCount() {
2398    return memstoreOnlyRowReadsCount.sum();
2399  }
2400
2401  @Override
2402  public long getMixedRowReadsCount() {
2403    return mixedRowReadsCount.sum();
2404  }
2405
2406  @Override
2407  public Configuration getReadOnlyConfiguration() {
2408    return new ReadOnlyConfiguration(this.conf);
2409  }
2410
2411  void updateMetricsStore(boolean memstoreRead) {
2412    if (memstoreRead) {
2413      memstoreOnlyRowReadsCount.increment();
2414    } else {
2415      mixedRowReadsCount.increment();
2416    }
2417  }
2418
2419  /**
2420   * Return the storefiles which are currently being written to. Mainly used by
   * {@link BrokenStoreFileCleaner} to prevent deleting these files, as they are not yet present in
   * the SFT.
2423   */
2424  public Set<Path> getStoreFilesBeingWritten() {
2425    return storeFileWriterCreationTrackers.stream().flatMap(t -> t.get().stream())
2426      .collect(Collectors.toSet());
2427  }
2428
2429  @Override
2430  public long getBloomFilterRequestsCount() {
2431    return storeEngine.getBloomFilterMetrics().getRequestsCount();
2432  }
2433
2434  @Override
2435  public long getBloomFilterNegativeResultsCount() {
2436    return storeEngine.getBloomFilterMetrics().getNegativeResultsCount();
2437  }
2438
2439  @Override
2440  public long getBloomFilterEligibleRequestsCount() {
2441    return storeEngine.getBloomFilterMetrics().getEligibleRequestsCount();
2442  }
2443}