001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.throttle; 019 020import java.util.List; 021import java.util.Map; 022import java.util.concurrent.ConcurrentSkipListMap; 023import java.util.concurrent.atomic.AtomicInteger; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.Cell; 026import org.apache.hadoop.hbase.RegionTooBusyException; 027import org.apache.hadoop.hbase.regionserver.Region; 028import org.apache.hadoop.hbase.regionserver.Store; 029import org.apache.hadoop.hbase.util.Bytes; 030import org.apache.hadoop.hbase.util.ClassSize; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035/** 036 * StoreHotnessProtector is designed to help limit the concurrency of puts with dense columns, it 037 * does best-effort to avoid exhausting all RS's handlers. When a lot of clients write requests with 038 * dense (hundreds) columns to a Store at the same time, it will lead to blocking of RS because CSLM 039 * degrades when concurrency goes up. It's not a kind of throttling. 
 * Throttling is user-oriented, while StoreHotnessProtector is a system-oriented, RegionServer
 * self-protection mechanism.
 * <p>
 * There are three key parameters:
 * <p>
 * 1. parallelPutToStoreThreadLimitCheckMinColumnCount: the protector only checks a column family
 * of a mutation whose number of columns (cells) exceeds this threshold; 100 by default
 * ({@code hbase.region.store.parallel.put.limit.min.column.count}).
 * <p>
 * 2. parallelPutToStoreThreadLimit: the maximum number of puts allowed to write to a Store
 * concurrently ({@code hbase.region.store.parallel.put.limit}).
 * <p>
 * 3. parallelPreparePutToStoreThreadLimit: the maximum number of puts allowed to be preparing to
 * write to a Store concurrently, computed as
 * {@code hbase.region.store.parallel.prepare.put.multiplier} (2 by default) multiplied by
 * parallelPutToStoreThreadLimit.
 * <p>
 * Notice that our write pipeline includes three key steps: MVCC acquisition, writing to the
 * MemStore, and writing to the WAL. Limiting only the concurrency of writing puts to a Store
 * (parallelPutToStoreThreadLimit) is not enough, since the actual concurrency of puts may still
 * exceed the limit when MVCC contention or slow WAL syncs happen. This is why
 * parallelPreparePutToStoreThreadLimit is needed.
 * <p>
 * This protector is disabled by default ({@code hbase.region.store.parallel.put.limit} defaults to
 * 0). Set that property to a value greater than 0 to enable it; the setting supports online
 * configuration change.
060 */ 061@InterfaceAudience.Private 062public class StoreHotnessProtector { 063 private static final Logger LOG = LoggerFactory.getLogger(StoreHotnessProtector.class); 064 065 private static volatile boolean loggedDisableMessage; 066 067 private volatile int parallelPutToStoreThreadLimit; 068 069 private volatile int parallelPreparePutToStoreThreadLimit; 070 public final static String PARALLEL_PUT_STORE_THREADS_LIMIT = 071 "hbase.region.store.parallel.put.limit"; 072 public final static String PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 073 "hbase.region.store.parallel.prepare.put.multiplier"; 074 private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT = 0; 075 private volatile int parallelPutToStoreThreadLimitCheckMinColumnCount; 076 public final static String PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT = 077 "hbase.region.store.parallel.put.limit.min.column.count"; 078 private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = 100; 079 private final static int DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 2; 080 081 private final Map<byte[], AtomicInteger> preparePutToStoreMap = 082 new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR); 083 private final Region region; 084 085 public StoreHotnessProtector(Region region, Configuration conf) { 086 init(conf); 087 this.region = region; 088 } 089 090 public void init(Configuration conf) { 091 this.parallelPutToStoreThreadLimit = 092 conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT); 093 this.parallelPreparePutToStoreThreadLimit = conf.getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 094 DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER) * parallelPutToStoreThreadLimit; 095 this.parallelPutToStoreThreadLimitCheckMinColumnCount = 096 conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT, 097 DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM); 098 099 if (!isEnable()) { 100 logDisabledMessageOnce(); 101 } 102 } 103 104 /** 105 * {@link 
#init(Configuration)} is called for every Store that opens on a RegionServer. Here we 106 * make a lightweight attempt to log this message once per RegionServer, rather than per-Store. 107 * The goal is just to draw attention to this feature if debugging overload due to heavy writes. 108 */ 109 private static void logDisabledMessageOnce() { 110 if (!loggedDisableMessage) { 111 LOG.info( 112 "StoreHotnessProtector is disabled. Set {} > 0 to enable, " 113 + "which may help mitigate load under heavy write pressure.", 114 PARALLEL_PUT_STORE_THREADS_LIMIT); 115 loggedDisableMessage = true; 116 } 117 } 118 119 public void update(Configuration conf) { 120 init(conf); 121 preparePutToStoreMap.clear(); 122 LOG.debug("update config: " + toString()); 123 } 124 125 public void start(Map<byte[], List<Cell>> familyMaps) throws RegionTooBusyException { 126 if (!isEnable()) { 127 return; 128 } 129 130 String tooBusyStore = null; 131 boolean aboveParallelThreadLimit = false; 132 boolean aboveParallelPrePutLimit = false; 133 134 for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) { 135 Store store = this.region.getStore(e.getKey()); 136 if (store == null || e.getValue() == null) { 137 continue; 138 } 139 140 if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) { 141 142 // we need to try to add #preparePutCount at first because preparePutToStoreMap will be 143 // cleared when changing the configuration. 144 int preparePutCount = preparePutToStoreMap 145 .computeIfAbsent(e.getKey(), key -> new AtomicInteger()).incrementAndGet(); 146 boolean storeAboveThread = 147 store.getCurrentParallelPutCount() > this.parallelPutToStoreThreadLimit; 148 boolean storeAbovePrePut = preparePutCount > this.parallelPreparePutToStoreThreadLimit; 149 if (storeAboveThread || storeAbovePrePut) { 150 tooBusyStore = (tooBusyStore == null 151 ? 
store.getColumnFamilyName() 152 : tooBusyStore + "," + store.getColumnFamilyName()); 153 } 154 aboveParallelThreadLimit |= storeAboveThread; 155 aboveParallelPrePutLimit |= storeAbovePrePut; 156 157 if (LOG.isTraceEnabled()) { 158 LOG.trace(store.getColumnFamilyName() + ": preparePutCount=" + preparePutCount 159 + "; currentParallelPutCount=" + store.getCurrentParallelPutCount()); 160 } 161 } 162 } 163 164 if (aboveParallelThreadLimit || aboveParallelPrePutLimit) { 165 String msg = "StoreTooBusy," + this.region.getRegionInfo().getRegionNameAsString() + ":" 166 + tooBusyStore + " Above " 167 + (aboveParallelThreadLimit 168 ? "parallelPutToStoreThreadLimit(" + this.parallelPutToStoreThreadLimit + ")" 169 : "") 170 + (aboveParallelThreadLimit && aboveParallelPrePutLimit ? " or " : "") 171 + (aboveParallelPrePutLimit 172 ? "parallelPreparePutToStoreThreadLimit(" + this.parallelPreparePutToStoreThreadLimit 173 + ")" 174 : ""); 175 LOG.trace(msg); 176 throw new RegionTooBusyException(msg); 177 } 178 } 179 180 public void finish(Map<byte[], List<Cell>> familyMaps) { 181 if (!isEnable()) { 182 return; 183 } 184 185 for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) { 186 Store store = this.region.getStore(e.getKey()); 187 if (store == null || e.getValue() == null) { 188 continue; 189 } 190 if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) { 191 AtomicInteger counter = preparePutToStoreMap.get(e.getKey()); 192 // preparePutToStoreMap will be cleared when changing the configuration, so it may turn 193 // into a negative value. It will be not accuracy in a short time, it's a trade-off for 194 // performance. 
195 if (counter != null && counter.decrementAndGet() < 0) { 196 counter.incrementAndGet(); 197 } 198 } 199 } 200 } 201 202 public String toString() { 203 return "StoreHotnessProtector, parallelPutToStoreThreadLimit=" 204 + this.parallelPutToStoreThreadLimit + " ; minColumnNum=" 205 + this.parallelPutToStoreThreadLimitCheckMinColumnCount + " ; preparePutThreadLimit=" 206 + this.parallelPreparePutToStoreThreadLimit + " ; hotProtect now " 207 + (this.isEnable() ? "enable" : "disable"); 208 } 209 210 public boolean isEnable() { 211 // feature is enabled when parallelPutToStoreThreadLimit > 0 212 return this.parallelPutToStoreThreadLimit > 0; 213 } 214 215 Map<byte[], AtomicInteger> getPreparePutToStoreMap() { 216 return preparePutToStoreMap; 217 } 218 219 public static final long FIXED_SIZE = 220 ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT); 221}