001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.throttle; 019 020import java.util.List; 021import java.util.Map; 022import java.util.concurrent.ConcurrentSkipListMap; 023import java.util.concurrent.atomic.AtomicInteger; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.Cell; 026import org.apache.hadoop.hbase.RegionTooBusyException; 027import org.apache.hadoop.hbase.regionserver.Region; 028import org.apache.hadoop.hbase.regionserver.Store; 029import org.apache.hadoop.hbase.util.Bytes; 030import org.apache.hadoop.hbase.util.ClassSize; 031import org.apache.yetus.audience.InterfaceAudience; 032 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036/** 037 * StoreHotnessProtector is designed to help limit the concurrency of puts with dense columns, it 038 * does best-effort to avoid exhausting all RS's handlers. When a lot of clients write requests with 039 * dense (hundreds) columns to a Store at the same time, it will lead to blocking of RS because CSLM 040 * degrades when concurrency goes up. It's not a kind of throttling. Throttling is user-oriented, 041 * while StoreHotnessProtector is system-oriented, RS-self-protected mechanism. 042 * <p> 043 * There are three key parameters: 044 * <p> 045 * 1. parallelPutToStoreThreadLimitCheckMinColumnCount: If the amount of columns exceed this 046 * threshold, the HotProtector will work, 100 by default 047 * <p> 048 * 2. parallelPutToStoreThreadLimit: The amount of concurrency allowed to write puts to a Store at 049 * the same time. 050 * <p> 051 * 3. parallelPreparePutToStoreThreadLimit: The amount of concurrency allowed to 052 * prepare writing puts to a Store at the same time. 053 * <p> 054 * Notice that our writing pipeline includes three key process: MVCC acquire, writing MemStore, and 055 * WAL. Only limit the concurrency of writing puts to Store(parallelPutToStoreThreadLimit) is not 056 * enough since the actual concurrency of puts may still exceed the limit when MVCC contention or 057 * slow WAL sync happens. This is why parallelPreparePutToStoreThreadLimit is needed. 058 * <p> 059 * This protector is enabled by default and could be turned off by setting 060 * hbase.region.store.parallel.put.limit to 0, supporting online configuration change. 061 */ 062@InterfaceAudience.Private 063public class StoreHotnessProtector { 064 private static final Logger LOG = LoggerFactory.getLogger(StoreHotnessProtector.class); 065 private volatile int parallelPutToStoreThreadLimit; 066 067 private volatile int parallelPreparePutToStoreThreadLimit; 068 public final static String PARALLEL_PUT_STORE_THREADS_LIMIT = 069 "hbase.region.store.parallel.put.limit"; 070 public final static String PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 071 "hbase.region.store.parallel.prepare.put.multiplier"; 072 private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT = 10; 073 private volatile int parallelPutToStoreThreadLimitCheckMinColumnCount; 074 public final static String PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT = 075 "hbase.region.store.parallel.put.limit.min.column.count"; 076 private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = 100; 077 private final static int DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 2; 078 079 private final Map<byte[], AtomicInteger> preparePutToStoreMap = 080 new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR); 081 private final Region region; 082 083 public StoreHotnessProtector(Region region, Configuration conf) { 084 init(conf); 085 this.region = region; 086 } 087 088 public void init(Configuration conf) { 089 this.parallelPutToStoreThreadLimit = 090 conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT); 091 this.parallelPreparePutToStoreThreadLimit = conf.getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 092 DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER) * parallelPutToStoreThreadLimit; 093 this.parallelPutToStoreThreadLimitCheckMinColumnCount = 094 conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT, 095 DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM); 096 097 } 098 099 public void update(Configuration conf) { 100 init(conf); 101 preparePutToStoreMap.clear(); 102 LOG.debug("update config: " + toString()); 103 } 104 105 public void start(Map<byte[], List<Cell>> familyMaps) throws RegionTooBusyException { 106 if (!isEnable()) { 107 return; 108 } 109 110 String tooBusyStore = null; 111 112 for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) { 113 Store store = this.region.getStore(e.getKey()); 114 if (store == null || e.getValue() == null) { 115 continue; 116 } 117 118 if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) { 119 120 //we need to try to add #preparePutCount at first because preparePutToStoreMap will be 121 //cleared when changing the configuration. 122 preparePutToStoreMap.putIfAbsent(e.getKey(), new AtomicInteger()); 123 AtomicInteger preparePutCounter = preparePutToStoreMap.get(e.getKey()); 124 if (preparePutCounter == null) { 125 preparePutCounter = new AtomicInteger(); 126 preparePutToStoreMap.putIfAbsent(e.getKey(), preparePutCounter); 127 } 128 int preparePutCount = preparePutCounter.incrementAndGet(); 129 if (store.getCurrentParallelPutCount() > this.parallelPutToStoreThreadLimit 130 || preparePutCount > this.parallelPreparePutToStoreThreadLimit) { 131 tooBusyStore = (tooBusyStore == null ? 132 store.getColumnFamilyName() : 133 tooBusyStore + "," + store.getColumnFamilyName()); 134 } 135 136 if (LOG.isTraceEnabled()) { 137 LOG.trace(store.getColumnFamilyName() + ": preparePutCount=" + preparePutCount 138 + "; currentParallelPutCount=" + store.getCurrentParallelPutCount()); 139 } 140 } 141 } 142 143 if (tooBusyStore != null) { 144 String msg = 145 "StoreTooBusy," + this.region.getRegionInfo().getRegionNameAsString() + ":" + tooBusyStore 146 + " Above parallelPutToStoreThreadLimit(" + this.parallelPutToStoreThreadLimit + ")"; 147 if (LOG.isTraceEnabled()) { 148 LOG.trace(msg); 149 } 150 throw new RegionTooBusyException(msg); 151 } 152 } 153 154 public void finish(Map<byte[], List<Cell>> familyMaps) { 155 if (!isEnable()) { 156 return; 157 } 158 159 for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) { 160 Store store = this.region.getStore(e.getKey()); 161 if (store == null || e.getValue() == null) { 162 continue; 163 } 164 if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) { 165 AtomicInteger counter = preparePutToStoreMap.get(e.getKey()); 166 // preparePutToStoreMap will be cleared when changing the configuration, so it may turn 167 // into a negative value. It will be not accuracy in a short time, it's a trade-off for 168 // performance. 169 if (counter != null && counter.decrementAndGet() < 0) { 170 counter.incrementAndGet(); 171 } 172 } 173 } 174 } 175 176 public String toString() { 177 return "StoreHotnessProtector, parallelPutToStoreThreadLimit=" 178 + this.parallelPutToStoreThreadLimit + " ; minColumnNum=" 179 + this.parallelPutToStoreThreadLimitCheckMinColumnCount + " ; preparePutThreadLimit=" 180 + this.parallelPreparePutToStoreThreadLimit + " ; hotProtect now " + (this.isEnable() ? 181 "enable" : 182 "disable"); 183 } 184 185 public boolean isEnable() { 186 // feature is enabled when parallelPutToStoreThreadLimit > 0 187 return this.parallelPutToStoreThreadLimit > 0; 188 } 189 190 Map<byte[], AtomicInteger> getPreparePutToStoreMap() { 191 return preparePutToStoreMap; 192 } 193 194 public static final long FIXED_SIZE = 195 ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT); 196}