/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.throttle;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.yetus.audience.InterfaceAudience;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * StoreHotnessProtector is designed to help limit the concurrency of puts with dense columns. It
 * makes a best-effort attempt to avoid exhausting all of the RS's handlers. When a lot of clients
 * write requests with dense (hundreds of) columns to a Store at the same time, the RS can become
 * blocked because CSLM degrades as concurrency goes up. It is not a kind of throttling:
 * throttling is user-oriented, while StoreHotnessProtector is a system-oriented, RS
 * self-protection mechanism.
 * <p>
 * There are three key parameters:
 * <p>
 * 1. parallelPutToStoreThreadLimitCheckMinColumnCount: if the number of columns written to a
 * Store exceeds this threshold, the protector takes effect; 100 by default.
 * <p>
 * 2. parallelPutToStoreThreadLimit: the number of concurrent writers allowed to write puts to a
 * Store at the same time; 10 by default.
 * <p>
 * 3. parallelPreparePutToStoreThreadLimit: the number of concurrent writers allowed to prepare
 * writing puts to a Store at the same time. It is computed as parallelPutToStoreThreadLimit
 * multiplied by hbase.region.store.parallel.prepare.put.multiplier (2 by default).
 * <p>
 * Notice that our writing pipeline includes three key processes: MVCC acquire, writing MemStore,
 * and WAL. Limiting only the concurrency of writing puts to a Store
 * (parallelPutToStoreThreadLimit) is not enough, since the actual concurrency of puts may still
 * exceed the limit when MVCC contention or slow WAL sync happens. This is why
 * parallelPreparePutToStoreThreadLimit is needed.
 * <p>
 * This protector is enabled by default and can be turned off by setting
 * hbase.region.store.parallel.put.limit to 0 (any non-positive value disables it). Online
 * configuration change is supported through {@link #update(Configuration)}.
 */
@InterfaceAudience.Private
public class StoreHotnessProtector {
  private static final Logger LOG = LoggerFactory.getLogger(StoreHotnessProtector.class);

  /** Configuration key for the per-store concurrent put limit; a value &lt;= 0 disables. */
  public static final String PARALLEL_PUT_STORE_THREADS_LIMIT =
      "hbase.region.store.parallel.put.limit";
  /** Configuration key for the multiplier applied to the put limit to get the prepare limit. */
  public static final String PARALLEL_PREPARE_PUT_STORE_MULTIPLIER =
      "hbase.region.store.parallel.prepare.put.multiplier";
  /** Configuration key for the per-family cell count above which a put is checked. */
  public static final String PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT =
      "hbase.region.store.parallel.put.limit.min.column.count";

  private static final int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT = 10;
  private static final int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = 100;
  private static final int DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 2;

  /**
   * Estimated heap footprint of an instance: the object header, two references
   * ({@code preparePutToStoreMap} and {@code region}) and the three int limits.
   */
  public static final long FIXED_SIZE =
      ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT);

  // The limits are volatile because update(Configuration) may replace them while other threads
  // are inside start()/finish().
  private volatile int parallelPutToStoreThreadLimit;
  private volatile int parallelPreparePutToStoreThreadLimit;
  private volatile int parallelPutToStoreThreadLimitCheckMinColumnCount;

  /** Column family name -> number of dense puts currently between start() and finish(). */
  private final Map<byte[], AtomicInteger> preparePutToStoreMap =
      new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR);
  private final Region region;

  /**
   * Creates a protector for the given region, reading the limits from {@code conf}.
   *
   * @param region the region whose stores are looked up in {@link #start(Map)} and
   *          {@link #finish(Map)}
   * @param conf configuration the limits are read from
   */
  public StoreHotnessProtector(Region region, Configuration conf) {
    init(conf);
    this.region = region;
  }

  /**
   * Loads the three limits from {@code conf}, falling back to the built-in defaults for keys
   * that are not set.
   *
   * @param conf configuration the limits are read from
   */
  public void init(Configuration conf) {
    // Keep the put limit in a local so the derived prepare limit is computed from the value that
    // was just read, not from a volatile field a concurrent init() may have overwritten.
    int putLimit =
        conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT);
    int prepareMultiplier = conf.getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER,
        DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER);

    this.parallelPutToStoreThreadLimit = putLimit;
    this.parallelPreparePutToStoreThreadLimit = prepareMultiplier * putLimit;
    this.parallelPutToStoreThreadLimitCheckMinColumnCount =
        conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT,
            DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM);
  }

  /**
   * Applies a changed configuration: reloads the limits and discards all prepare-put counters.
   *
   * @param conf configuration the new limits are read from
   */
  public void update(Configuration conf) {
    init(conf);
    preparePutToStoreMap.clear();
    LOG.debug("update config: {}", this);
  }

  /**
   * Registers a put that is about to be prepared. For every family whose cell list is larger than
   * the minimum column count, the family's prepare-put counter is incremented and the store is
   * checked against both limits. Does nothing when the protector is disabled.
   * <p>
   * The counters incremented here are not rolled back when this method throws; they are only
   * decremented by {@link #finish(Map)}. NOTE(review): callers presumably invoke
   * {@code finish} with the same map even after a failure - confirm against the caller.
   *
   * @param familyMaps column family name to the cells written to that family
   * @throws RegionTooBusyException if, for at least one checked family, the store's current
   *           parallel put count exceeds parallelPutToStoreThreadLimit or the prepare-put count
   *           exceeds parallelPreparePutToStoreThreadLimit
   */
  public void start(Map<byte[], List<Cell>> familyMaps) throws RegionTooBusyException {
    if (!isEnable()) {
      return;
    }

    String tooBusyStore = null;

    for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) {
      Store store = this.region.getStore(e.getKey());
      if (store == null || e.getValue() == null) {
        continue;
      }

      if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) {
        // The map is cleared whenever the configuration changes, so the counter may be missing.
        // computeIfAbsent always hands back the counter that is actually mapped, so the increment
        // cannot land on a throw-away instance that lost an insertion race.
        int preparePutCount = preparePutToStoreMap
            .computeIfAbsent(e.getKey(), family -> new AtomicInteger())
            .incrementAndGet();

        if (store.getCurrentParallelPutCount() > this.parallelPutToStoreThreadLimit
            || preparePutCount > this.parallelPreparePutToStoreThreadLimit) {
          // Collect every offending family as a comma-separated list for the error message.
          tooBusyStore = (tooBusyStore == null)
              ? store.getColumnFamilyName()
              : tooBusyStore + "," + store.getColumnFamilyName();
        }

        if (LOG.isTraceEnabled()) {
          LOG.trace("{}: preparePutCount={}; currentParallelPutCount={}",
              store.getColumnFamilyName(), preparePutCount, store.getCurrentParallelPutCount());
        }
      }
    }

    if (tooBusyStore != null) {
      String msg =
          "StoreTooBusy," + this.region.getRegionInfo().getRegionNameAsString() + ":" + tooBusyStore
              + " Above parallelPutToStoreThreadLimit(" + this.parallelPutToStoreThreadLimit + ")";
      LOG.trace(msg);
      throw new RegionTooBusyException(msg);
    }
  }

  /**
   * Unregisters a put previously passed to {@link #start(Map)}: decrements the prepare-put counter
   * of every family whose cell list is larger than the minimum column count. Does nothing when
   * the protector is disabled.
   *
   * @param familyMaps column family name to the cells written to that family
   */
  public void finish(Map<byte[], List<Cell>> familyMaps) {
    if (!isEnable()) {
      return;
    }

    for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) {
      Store store = this.region.getStore(e.getKey());
      if (store == null || e.getValue() == null) {
        continue;
      }
      if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) {
        AtomicInteger counter = preparePutToStoreMap.get(e.getKey());
        // preparePutToStoreMap is cleared when the configuration changes, so the counter may be
        // absent or may drop below zero. A negative result is undone right away; the count can be
        // inaccurate for a short time, which is an accepted trade-off for performance.
        if (counter != null && counter.decrementAndGet() < 0) {
          counter.incrementAndGet();
        }
      }
    }
  }

  /**
   * Returns a one-line description of the current limits and whether the protector is enabled.
   */
  @Override
  public String toString() {
    return "StoreHotnessProtector, parallelPutToStoreThreadLimit="
        + this.parallelPutToStoreThreadLimit + " ; minColumnNum="
        + this.parallelPutToStoreThreadLimitCheckMinColumnCount + " ; preparePutThreadLimit="
        + this.parallelPreparePutToStoreThreadLimit + " ; hotProtect now "
        + (this.isEnable() ? "enable" : "disable");
  }

  /**
   * @return {@code true} when parallelPutToStoreThreadLimit is greater than 0, i.e. the protector
   *         is active
   */
  public boolean isEnable() {
    // feature is enabled when parallelPutToStoreThreadLimit > 0
    return this.parallelPutToStoreThreadLimit > 0;
  }

  /**
   * @return the live (not copied) map of per-family prepare-put counters
   */
  @VisibleForTesting
  Map<byte[], AtomicInteger> getPreparePutToStoreMap() {
    return preparePutToStoreMap;
  }
}