001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.throttle; 019 020import java.util.List; 021import java.util.Map; 022import java.util.concurrent.ConcurrentMap; 023import java.util.concurrent.ConcurrentSkipListMap; 024import java.util.concurrent.atomic.AtomicInteger; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.Cell; 027import org.apache.hadoop.hbase.RegionTooBusyException; 028import org.apache.hadoop.hbase.regionserver.Region; 029import org.apache.hadoop.hbase.regionserver.Store; 030import org.apache.hadoop.hbase.util.Bytes; 031import org.apache.hadoop.hbase.util.ClassSize; 032import org.apache.yetus.audience.InterfaceAudience; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036/** 037 * StoreHotnessProtector is designed to help limit the concurrency of puts with dense columns, it 038 * does best-effort to avoid exhausting all RS's handlers. When a lot of clients write requests with 039 * dense (hundreds) columns to a Store at the same time, it will lead to blocking of RS because CSLM 040 * degrades when concurrency goes up. It's not a kind of throttling. Throttling is user-oriented, 041 * while StoreHotnessProtector is system-oriented, RS-self-protected mechanism. 042 * <p> 043 * There are three key parameters: 044 * <p> 045 * 1. parallelPutToStoreThreadLimitCheckMinColumnCount: If the amount of columns exceed this 046 * threshold, the HotProtector will work, 100 by default 047 * <p> 048 * 2. parallelPutToStoreThreadLimit: The amount of concurrency allowed to write puts to a Store at 049 * the same time. 050 * <p> 051 * 3. parallelPreparePutToStoreThreadLimit: The amount of concurrency allowed to prepare writing 052 * puts to a Store at the same time. 053 * <p> 054 * Notice that our writing pipeline includes three key process: MVCC acquire, writing MemStore, and 055 * WAL. Only limit the concurrency of writing puts to Store(parallelPutToStoreThreadLimit) is not 056 * enough since the actual concurrency of puts may still exceed the limit when MVCC contention or 057 * slow WAL sync happens. This is why parallelPreparePutToStoreThreadLimit is needed. 058 * <p> 059 * This protector is enabled by default and could be turned off by setting 060 * hbase.region.store.parallel.put.limit to 0, supporting online configuration change. 061 */ 062@InterfaceAudience.Private 063public class StoreHotnessProtector { 064 private static final Logger LOG = LoggerFactory.getLogger(StoreHotnessProtector.class); 065 066 private static volatile boolean loggedDisableMessage; 067 068 private volatile int parallelPutToStoreThreadLimit; 069 070 private volatile int parallelPreparePutToStoreThreadLimit; 071 public final static String PARALLEL_PUT_STORE_THREADS_LIMIT = 072 "hbase.region.store.parallel.put.limit"; 073 public final static String PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 074 "hbase.region.store.parallel.prepare.put.multiplier"; 075 private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT = 0; 076 private volatile int parallelPutToStoreThreadLimitCheckMinColumnCount; 077 public final static String PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT = 078 "hbase.region.store.parallel.put.limit.min.column.count"; 079 private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = 100; 080 private final static int DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 2; 081 082 private final ConcurrentMap<byte[], AtomicInteger> preparePutToStoreMap = 083 new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR); 084 private final Region region; 085 086 public StoreHotnessProtector(Region region, Configuration conf) { 087 init(conf); 088 this.region = region; 089 } 090 091 public void init(Configuration conf) { 092 this.parallelPutToStoreThreadLimit = 093 conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT); 094 this.parallelPreparePutToStoreThreadLimit = conf.getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 095 DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER) * parallelPutToStoreThreadLimit; 096 this.parallelPutToStoreThreadLimitCheckMinColumnCount = 097 conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT, 098 DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM); 099 100 if (!isEnable()) { 101 logDisabledMessageOnce(); 102 } 103 } 104 105 /** 106 * {@link #init(Configuration)} is called for every Store that opens on a RegionServer. Here we 107 * make a lightweight attempt to log this message once per RegionServer, rather than per-Store. 108 * The goal is just to draw attention to this feature if debugging overload due to heavy writes. 109 */ 110 private static void logDisabledMessageOnce() { 111 if (!loggedDisableMessage) { 112 LOG.info( 113 "StoreHotnessProtector is disabled. Set {} > 0 to enable, " 114 + "which may help mitigate load under heavy write pressure.", 115 PARALLEL_PUT_STORE_THREADS_LIMIT); 116 loggedDisableMessage = true; 117 } 118 } 119 120 public void update(Configuration conf) { 121 init(conf); 122 preparePutToStoreMap.clear(); 123 LOG.debug("update config: {}", this); 124 } 125 126 public void start(Map<byte[], List<Cell>> familyMaps) throws RegionTooBusyException { 127 if (!isEnable()) { 128 return; 129 } 130 131 StringBuilder tooBusyStore = new StringBuilder(); 132 boolean aboveParallelThreadLimit = false; 133 boolean aboveParallelPrePutLimit = false; 134 135 for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) { 136 Store store = this.region.getStore(e.getKey()); 137 if (store == null || e.getValue() == null) { 138 continue; 139 } 140 141 if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) { 142 143 // we need to try to add #preparePutCount at first because preparePutToStoreMap will be 144 // cleared when changing the configuration. 145 int preparePutCount = preparePutToStoreMap 146 .computeIfAbsent(e.getKey(), key -> new AtomicInteger()).incrementAndGet(); 147 boolean storeAboveThread = 148 store.getCurrentParallelPutCount() > this.parallelPutToStoreThreadLimit; 149 boolean storeAbovePrePut = preparePutCount > this.parallelPreparePutToStoreThreadLimit; 150 if (storeAboveThread || storeAbovePrePut) { 151 if (tooBusyStore.length() > 0) { 152 tooBusyStore.append(','); 153 } 154 tooBusyStore.append(store.getColumnFamilyName()); 155 } 156 aboveParallelThreadLimit |= storeAboveThread; 157 aboveParallelPrePutLimit |= storeAbovePrePut; 158 159 if (LOG.isTraceEnabled()) { 160 LOG.trace(store.getColumnFamilyName() + ": preparePutCount=" + preparePutCount 161 + "; currentParallelPutCount=" + store.getCurrentParallelPutCount()); 162 } 163 } 164 } 165 166 if (aboveParallelThreadLimit || aboveParallelPrePutLimit) { 167 String msg = "StoreTooBusy," + this.region.getRegionInfo().getRegionNameAsString() + ":" 168 + tooBusyStore + " Above " 169 + (aboveParallelThreadLimit 170 ? "parallelPutToStoreThreadLimit(" + this.parallelPutToStoreThreadLimit + ")" 171 : "") 172 + (aboveParallelThreadLimit && aboveParallelPrePutLimit ? " or " : "") 173 + (aboveParallelPrePutLimit 174 ? "parallelPreparePutToStoreThreadLimit(" + this.parallelPreparePutToStoreThreadLimit 175 + ")" 176 : ""); 177 LOG.trace(msg); 178 throw new RegionTooBusyException(msg); 179 } 180 } 181 182 public void finish(Map<byte[], List<Cell>> familyMaps) { 183 if (!isEnable()) { 184 return; 185 } 186 187 for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) { 188 Store store = this.region.getStore(e.getKey()); 189 if (store == null || e.getValue() == null) { 190 continue; 191 } 192 if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) { 193 AtomicInteger counter = preparePutToStoreMap.get(e.getKey()); 194 // preparePutToStoreMap will be cleared when changing the configuration, so it may turn 195 // into a negative value. It will be not accuracy in a short time, it's a trade-off for 196 // performance. 197 if (counter != null && counter.decrementAndGet() < 0) { 198 counter.incrementAndGet(); 199 } 200 } 201 } 202 } 203 204 public String toString() { 205 return "StoreHotnessProtector, parallelPutToStoreThreadLimit=" 206 + this.parallelPutToStoreThreadLimit + " ; minColumnNum=" 207 + this.parallelPutToStoreThreadLimitCheckMinColumnCount + " ; preparePutThreadLimit=" 208 + this.parallelPreparePutToStoreThreadLimit + " ; hotProtect now " 209 + (this.isEnable() ? "enable" : "disable"); 210 } 211 212 public boolean isEnable() { 213 // feature is enabled when parallelPutToStoreThreadLimit > 0 214 return this.parallelPutToStoreThreadLimit > 0; 215 } 216 217 Map<byte[], AtomicInteger> getPreparePutToStoreMap() { 218 return preparePutToStoreMap; 219 } 220 221 public static final long FIXED_SIZE = 222 ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT); 223}