001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.util.concurrent.ThreadLocalRandom;
021import org.apache.hadoop.conf.Configuration;
022import org.apache.yetus.audience.InterfaceAudience;
023
024/**
025 * Adaptive is a heuristic that chooses whether to apply data compaction or not based on the level
026 * of redundancy in the data. Adaptive triggers redundancy elimination only for those stores where
027 * positive impact is expected. Adaptive uses two parameters to determine whether to perform
028 * redundancy elimination. The first parameter, u, estimates the ratio of unique keys in the memory
029 * store based on the fraction of unique keys encountered during the previous merge of segment
030 * indices. The second is the perceived probability (compactionProbability) that the store can
031 * benefit from redundancy elimination. Initially, compactionProbability=0.5; it then grows
032 * exponentially by 2% whenever a compaction is successful and decreased by 2% whenever a compaction
033 * did not meet the expectation. It is reset back to the default value (namely 0.5) upon disk flush.
034 * Adaptive triggers redundancy elimination with probability compactionProbability if the fraction
035 * of redundant keys 1-u exceeds a parameter threshold compactionThreshold.
036 */
037@InterfaceAudience.Private
038public class AdaptiveMemStoreCompactionStrategy extends MemStoreCompactionStrategy {
039  private static final String NAME = "ADAPTIVE";
040  public static final String ADAPTIVE_COMPACTION_THRESHOLD_KEY =
041    "hbase.hregion.compacting.memstore.adaptive.compaction.threshold";
042  private static final double ADAPTIVE_COMPACTION_THRESHOLD_DEFAULT = 0.5;
043  public static final String ADAPTIVE_INITIAL_COMPACTION_PROBABILITY_KEY =
044    "hbase.hregion.compacting.memstore.adaptive.compaction.probability";
045  private static final double ADAPTIVE_INITIAL_COMPACTION_PROBABILITY_DEFAULT = 0.5;
046  private static final double ADAPTIVE_PROBABILITY_FACTOR = 1.02;
047
048  private double compactionThreshold;
049  private double initialCompactionProbability;
050  private double compactionProbability;
051  private double numCellsInVersionedList = 0;
052  private boolean compacted = false;
053
054  public AdaptiveMemStoreCompactionStrategy(Configuration conf, String cfName) {
055    super(conf, cfName);
056    compactionThreshold =
057      conf.getDouble(ADAPTIVE_COMPACTION_THRESHOLD_KEY, ADAPTIVE_COMPACTION_THRESHOLD_DEFAULT);
058    initialCompactionProbability = conf.getDouble(ADAPTIVE_INITIAL_COMPACTION_PROBABILITY_KEY,
059      ADAPTIVE_INITIAL_COMPACTION_PROBABILITY_DEFAULT);
060    resetStats();
061  }
062
063  @Override
064  public Action getAction(VersionedSegmentsList versionedList) {
065    if (versionedList.getEstimatedUniquesFrac() < 1.0 - compactionThreshold) {
066      double r = ThreadLocalRandom.current().nextDouble();
067      if (r < compactionProbability) {
068        numCellsInVersionedList = versionedList.getNumOfCells();
069        compacted = true;
070        return compact(versionedList,
071          getName() + " (compaction probability=" + compactionProbability + ")");
072      }
073    }
074    compacted = false;
075    return simpleMergeOrFlatten(versionedList,
076      getName() + " (compaction probability=" + compactionProbability + ")");
077  }
078
079  @Override
080  public void updateStats(Segment replacement) {
081    if (compacted) {
082      if (replacement.getCellsCount() / numCellsInVersionedList < 1.0 - compactionThreshold) {
083        // compaction was a good decision - increase probability
084        compactionProbability *= ADAPTIVE_PROBABILITY_FACTOR;
085        if (compactionProbability > 1.0) {
086          compactionProbability = 1.0;
087        }
088      } else {
089        // compaction was NOT a good decision - decrease probability
090        compactionProbability /= ADAPTIVE_PROBABILITY_FACTOR;
091      }
092    }
093  }
094
095  @Override
096  public void resetStats() {
097    compactionProbability = initialCompactionProbability;
098  }
099
100  @Override
101  protected Action getMergingAction() {
102    return Action.MERGE_COUNT_UNIQUE_KEYS;
103  }
104
105  @Override
106  protected Action getFlattenAction() {
107    return Action.FLATTEN;
108  }
109
110  @Override
111  protected String getName() {
112    return NAME;
113  }
114}