/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * A heuristic memstore-compaction strategy that decides whether to run data compaction (redundancy
 * elimination) based on how much redundancy the data is expected to contain, so that the cost of
 * compaction is paid only where a positive impact is expected.
 * <p>
 * Two quantities drive the decision. The first, {@code u}, is the estimated fraction of unique
 * keys in the memory store, taken from the previous merge of segment indices (see
 * {@code VersionedSegmentsList#getEstimatedUniquesFrac()}). The second is the perceived
 * probability that compaction will pay off, {@code compactionProbability}: it starts at 0.5, grows
 * by 2% (multiplicatively, capped at 1.0) every time a compaction meets expectations, shrinks by
 * 2% every time it does not, and is reset to its initial value on disk flush (via
 * {@link #resetStats()}).
 * <p>
 * Compaction is attempted with probability {@code compactionProbability} whenever the redundant
 * fraction {@code 1 - u} exceeds {@code compactionThreshold}; otherwise the cheaper
 * merge-or-flatten path is taken.
 */
@InterfaceAudience.Private
public class AdaptiveMemStoreCompactionStrategy extends MemStoreCompactionStrategy {
  private static final String NAME = "ADAPTIVE";
  public static final String ADAPTIVE_COMPACTION_THRESHOLD_KEY =
    "hbase.hregion.compacting.memstore.adaptive.compaction.threshold";
  private static final double ADAPTIVE_COMPACTION_THRESHOLD_DEFAULT = 0.5;
  public static final String ADAPTIVE_INITIAL_COMPACTION_PROBABILITY_KEY =
    "hbase.hregion.compacting.memstore.adaptive.compaction.probability";
  private static final double ADAPTIVE_INITIAL_COMPACTION_PROBABILITY_DEFAULT = 0.5;
  // Multiplicative step (2%) by which the probability is rewarded or penalized.
  private static final double ADAPTIVE_PROBABILITY_FACTOR = 1.02;

  // Minimum redundant-key fraction (1 - uniques) required before compaction is even considered.
  private double compactionThreshold;
  // Value compactionProbability is reset to on flush; configurable.
  private double initialCompactionProbability;
  // Current belief that compaction will pay off; adjusted in updateStats().
  private double compactionProbability;
  // Cell count of the pipeline snapshot at the moment compaction was chosen; kept as a double so
  // the ratio in updateStats() is computed in floating point.
  private double numCellsInVersionedList = 0;
  // True iff the most recent getAction() decision was COMPACT, so updateStats() knows whether
  // the replacement segment resulted from a compaction.
  private boolean compacted = false;

  /**
   * Reads the threshold and initial probability from configuration and initializes the
   * adaptive state.
   * @param conf   configuration to read the adaptive tuning parameters from
   * @param cfName column family name, passed through to the base strategy (used for logging)
   */
  public AdaptiveMemStoreCompactionStrategy(Configuration conf, String cfName) {
    super(conf, cfName);
    compactionThreshold =
      conf.getDouble(ADAPTIVE_COMPACTION_THRESHOLD_KEY, ADAPTIVE_COMPACTION_THRESHOLD_DEFAULT);
    initialCompactionProbability = conf.getDouble(ADAPTIVE_INITIAL_COMPACTION_PROBABILITY_KEY,
      ADAPTIVE_INITIAL_COMPACTION_PROBABILITY_DEFAULT);
    resetStats();
  }

  /**
   * Chooses between compaction and the cheaper merge/flatten path. Compaction is tried with
   * probability {@code compactionProbability}, and only when the estimated redundant fraction
   * exceeds the configured threshold. Records whether compaction was chosen so
   * {@link #updateStats(Segment)} can later judge the outcome.
   * @param versionedList snapshot of the segment pipeline under consideration
   * @return the action to apply to the pipeline
   */
  @Override
  public Action getAction(VersionedSegmentsList versionedList) {
    String traceSuffix = getName() + " (compaction probability=" + compactionProbability + ")";
    boolean enoughRedundancy =
      versionedList.getEstimatedUniquesFrac() < 1.0 - compactionThreshold;
    // Draw the random number only after the redundancy gate passes (short-circuit), matching
    // the semantics of a nested check.
    if (enoughRedundancy && ThreadLocalRandom.current().nextDouble() < compactionProbability) {
      numCellsInVersionedList = versionedList.getNumOfCells();
      compacted = true;
      return compact(versionedList, traceSuffix);
    }
    compacted = false;
    return simpleMergeOrFlatten(versionedList, traceSuffix);
  }

  /**
   * Rewards or penalizes {@code compactionProbability} based on how well the last compaction
   * shrank the data: it is considered successful when the surviving-cell fraction dropped below
   * {@code 1 - compactionThreshold}. A no-op when the last action was not a compaction.
   * @param replacement the segment produced by the last pipeline action
   */
  @Override
  public void updateStats(Segment replacement) {
    if (!compacted) {
      return; // nothing to learn from a merge/flatten
    }
    double survivingFraction = replacement.getCellsCount() / numCellsInVersionedList;
    if (survivingFraction < 1.0 - compactionThreshold) {
      // Good call - reward the probability, clamped to 1.0.
      compactionProbability = Math.min(1.0, compactionProbability * ADAPTIVE_PROBABILITY_FACTOR);
    } else {
      // Compaction did not meet expectations - penalize the probability.
      compactionProbability /= ADAPTIVE_PROBABILITY_FACTOR;
    }
  }

  /** Resets the learned probability back to its configured initial value (invoked on flush). */
  @Override
  public void resetStats() {
    compactionProbability = initialCompactionProbability;
  }

  /** Merges of this strategy also count unique keys, feeding the uniques-fraction estimate. */
  @Override
  protected Action getMergingAction() {
    return Action.MERGE_COUNT_UNIQUE_KEYS;
  }

  /** Plain index flattening; no key counting needed on this path. */
  @Override
  protected Action getFlattenAction() {
    return Action.FLATTEN;
  }

  /** @return the strategy's display name, used in log/trace messages */
  @Override
  protected String getName() {
    return NAME;
  }
}