001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.util.Random;
022
023import org.apache.hadoop.conf.Configuration;
024import org.apache.yetus.audience.InterfaceAudience;
025
026/**
027 * Adaptive is a heuristic that chooses whether to apply data compaction or not based on the
028 * level of redundancy in the data. Adaptive triggers redundancy elimination only for those
029 * stores where positive impact is expected.
030 *
031 * Adaptive uses two parameters to determine whether to perform redundancy elimination.
032 * The first parameter, u, estimates the ratio of unique keys in the memory store based on the
033 * fraction of unique keys encountered during the previous merge of segment indices.
034 * The second is the perceived probability (compactionProbability) that the store can benefit from
035 * redundancy elimination. Initially, compactionProbability=0.5; it then grows exponentially by
036 * 2% whenever a compaction is successful and decreased by 2% whenever a compaction did not meet
037 * the expectation. It is reset back to the default value (namely 0.5) upon disk flush.
038 *
039 * Adaptive triggers redundancy elimination with probability compactionProbability if the
040 * fraction of redundant keys 1-u exceeds a parameter threshold compactionThreshold.
041 */
042@InterfaceAudience.Private
043public class AdaptiveMemStoreCompactionStrategy extends MemStoreCompactionStrategy{
044  private static final String NAME = "ADAPTIVE";
045  public static final String ADAPTIVE_COMPACTION_THRESHOLD_KEY =
046      "hbase.hregion.compacting.memstore.adaptive.compaction.threshold";
047  private static final double ADAPTIVE_COMPACTION_THRESHOLD_DEFAULT = 0.5;
048  public static final String ADAPTIVE_INITIAL_COMPACTION_PROBABILITY_KEY =
049      "hbase.hregion.compacting.memstore.adaptive.compaction.probability";
050  private static final double ADAPTIVE_INITIAL_COMPACTION_PROBABILITY_DEFAULT = 0.5;
051  private static final double ADAPTIVE_PROBABILITY_FACTOR = 1.02;
052
053  private double compactionThreshold;
054  private double initialCompactionProbability;
055  private double compactionProbability;
056  private Random rand = new Random();
057  private double numCellsInVersionedList = 0;
058  private boolean compacted = false;
059
060  public AdaptiveMemStoreCompactionStrategy(Configuration conf, String cfName) {
061    super(conf, cfName);
062    compactionThreshold = conf.getDouble(ADAPTIVE_COMPACTION_THRESHOLD_KEY,
063        ADAPTIVE_COMPACTION_THRESHOLD_DEFAULT);
064    initialCompactionProbability = conf.getDouble(ADAPTIVE_INITIAL_COMPACTION_PROBABILITY_KEY,
065        ADAPTIVE_INITIAL_COMPACTION_PROBABILITY_DEFAULT);
066    resetStats();
067  }
068
069  @Override public Action getAction(VersionedSegmentsList versionedList) {
070    if (versionedList.getEstimatedUniquesFrac() < 1.0 - compactionThreshold) {
071      double r = rand.nextDouble();
072      if(r < compactionProbability) {
073        numCellsInVersionedList = versionedList.getNumOfCells();
074        compacted = true;
075        return compact(versionedList,
076            getName() + " (compaction probability=" + compactionProbability + ")");
077      }
078    }
079    compacted = false;
080    return simpleMergeOrFlatten(versionedList,
081        getName() + " (compaction probability=" + compactionProbability + ")");
082  }
083
084  @Override
085  public void updateStats(Segment replacement) {
086    if(compacted) {
087      if (replacement.getCellsCount() / numCellsInVersionedList < 1.0 - compactionThreshold) {
088        // compaction was a good decision - increase probability
089        compactionProbability *= ADAPTIVE_PROBABILITY_FACTOR;
090        if(compactionProbability > 1.0) {
091          compactionProbability = 1.0;
092        }
093      } else {
094        // compaction was NOT a good decision - decrease probability
095        compactionProbability /= ADAPTIVE_PROBABILITY_FACTOR;
096      }
097    }
098  }
099
100  @Override
101  public void resetStats() {
102    compactionProbability = initialCompactionProbability;
103  }
104
105  @Override
106  protected Action getMergingAction() {
107    return Action.MERGE_COUNT_UNIQUE_KEYS;
108  }
109
110  @Override
111  protected Action getFlattenAction() {
112    return Action.FLATTEN;
113  }
114
115  @Override
116  protected  String getName() {
117    return NAME;
118  }
119}