001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.throttle; 019 020import org.apache.hadoop.conf.Configuration; 021import org.apache.hadoop.hbase.HBaseInterfaceAudience; 022import org.apache.hadoop.hbase.ScheduledChore; 023import org.apache.yetus.audience.InterfaceAudience; 024import org.slf4j.Logger; 025import org.slf4j.LoggerFactory; 026import org.apache.hadoop.hbase.regionserver.RegionServerServices; 027import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours; 028 029/** 030 * A throughput controller which uses the follow schema to limit throughput 031 * <ul> 032 * <li>If compaction pressure is greater than 1.0, no limitation.</li> 033 * <li>In off peak hours, use a fixed throughput limitation 034 * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK}</li> 035 * <li>In normal hours, the max throughput is tuned between 036 * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND} and 037 * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND}, using the formula "lower + 038 * (higer - lower) * compactionPressure", where compactionPressure is in range [0.0, 1.0]</li> 039 * </ul> 040 * @see org.apache.hadoop.hbase.regionserver.HStore#getCompactionPressure() 041 */ 042@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 043public class PressureAwareCompactionThroughputController extends PressureAwareThroughputController { 044 045 private final static Logger LOG = LoggerFactory 046 .getLogger(PressureAwareCompactionThroughputController.class); 047 048 public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND = 049 "hbase.hstore.compaction.throughput.higher.bound"; 050 051 private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND = 052 20L * 1024 * 1024; 053 054 public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND = 055 "hbase.hstore.compaction.throughput.lower.bound"; 056 057 private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND = 058 10L * 1024 * 1024; 059 060 public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK = 061 "hbase.hstore.compaction.throughput.offpeak"; 062 063 private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK = Long.MAX_VALUE; 064 065 public static final String HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD = 066 "hbase.hstore.compaction.throughput.tune.period"; 067 068 private static final int DEFAULT_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD = 60 * 1000; 069 070 // check compaction throughput every this size 071 private static final String HBASE_HSTORE_COMPACTION_THROUGHPUT_CONTROL_CHECK_INTERVAL = 072 "hbase.hstore.compaction.throughput.control.check.interval"; 073 074 private long maxThroughputOffpeak; 075 076 @Override 077 public void setup(final RegionServerServices server) { 078 server.getChoreService().scheduleChore( 079 new ScheduledChore("CompactionThroughputTuner", this, tuningPeriod) { 080 081 @Override 082 protected void chore() { 083 tune(server.getCompactionPressure()); 084 } 085 }); 086 } 087 088 private void tune(double compactionPressure) { 089 double maxThroughputToSet; 090 if (compactionPressure > 1.0) { 091 // set to unlimited if some stores already reach the blocking store file count 092 maxThroughputToSet = Double.MAX_VALUE; 093 } else if (offPeakHours.isOffPeakHour()) { 094 maxThroughputToSet = maxThroughputOffpeak; 095 } else { 096 // compactionPressure is between 0.0 and 1.0, we use a simple linear formula to 097 // calculate the throughput limitation. 098 maxThroughputToSet = 099 maxThroughputLowerBound + (maxThroughputUpperBound - maxThroughputLowerBound) 100 * compactionPressure; 101 } 102 if (LOG.isTraceEnabled()) { 103 // TODO: FIX!!! Don't log unless some activity or a change in config. Making TRACE 104 // in the meantime. 105 LOG.trace("CompactionPressure is " + compactionPressure + ", tune throughput to " 106 + throughputDesc(maxThroughputToSet)); 107 } 108 this.setMaxThroughput(maxThroughputToSet); 109 } 110 111 @Override 112 public void setConf(Configuration conf) { 113 super.setConf(conf); 114 if (conf == null) { 115 return; 116 } 117 this.maxThroughputUpperBound = 118 conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND, 119 DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND); 120 this.maxThroughputLowerBound = 121 conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND, 122 DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND); 123 this.maxThroughputOffpeak = 124 conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK, 125 DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK); 126 this.offPeakHours = OffPeakHours.getInstance(conf); 127 this.controlPerSize = 128 conf.getLong(HBASE_HSTORE_COMPACTION_THROUGHPUT_CONTROL_CHECK_INTERVAL, 129 this.maxThroughputLowerBound); 130 this.setMaxThroughput(this.maxThroughputLowerBound); 131 this.tuningPeriod = 132 getConf().getInt(HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD, 133 DEFAULT_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD); 134 LOG.info("Compaction throughput configurations, higher bound: " 135 + throughputDesc(maxThroughputUpperBound) + ", lower bound " 136 + throughputDesc(maxThroughputLowerBound) + ", off peak: " 137 + throughputDesc(maxThroughputOffpeak) + ", tuning period: " + tuningPeriod + " ms"); 138 } 139 140 @Override 141 public String toString() { 142 return "DefaultCompactionThroughputController [maxThroughput=" 143 + throughputDesc(getMaxThroughput()) + ", activeCompactions=" + activeOperations.size() 144 + "]"; 145 } 146 147 @Override 148 protected boolean skipControl(long deltaSize, long controlSize) { 149 if (deltaSize < controlSize) { 150 return true; 151 } else { 152 return false; 153 } 154 } 155}