001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.throttle;
019
020import java.util.List;
021import java.util.Map;
022import java.util.concurrent.ConcurrentMap;
023import java.util.concurrent.ConcurrentSkipListMap;
024import java.util.concurrent.atomic.AtomicInteger;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.Cell;
027import org.apache.hadoop.hbase.RegionTooBusyException;
028import org.apache.hadoop.hbase.regionserver.Region;
029import org.apache.hadoop.hbase.regionserver.Store;
030import org.apache.hadoop.hbase.util.Bytes;
031import org.apache.hadoop.hbase.util.ClassSize;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * StoreHotnessProtector is designed to help limit the concurrency of puts with dense columns, it
038 * does best-effort to avoid exhausting all RS's handlers. When a lot of clients write requests with
039 * dense (hundreds) columns to a Store at the same time, it will lead to blocking of RS because CSLM
040 * degrades when concurrency goes up. It's not a kind of throttling. Throttling is user-oriented,
041 * while StoreHotnessProtector is system-oriented, RS-self-protected mechanism.
042 * <p>
043 * There are three key parameters:
044 * <p>
045 * 1. parallelPutToStoreThreadLimitCheckMinColumnCount: If the amount of columns exceed this
046 * threshold, the HotProtector will work, 100 by default
047 * <p>
048 * 2. parallelPutToStoreThreadLimit: The amount of concurrency allowed to write puts to a Store at
049 * the same time.
050 * <p>
051 * 3. parallelPreparePutToStoreThreadLimit: The amount of concurrency allowed to prepare writing
052 * puts to a Store at the same time.
053 * <p>
054 * Notice that our writing pipeline includes three key process: MVCC acquire, writing MemStore, and
055 * WAL. Only limit the concurrency of writing puts to Store(parallelPutToStoreThreadLimit) is not
056 * enough since the actual concurrency of puts may still exceed the limit when MVCC contention or
057 * slow WAL sync happens. This is why parallelPreparePutToStoreThreadLimit is needed.
058 * <p>
059 * This protector is enabled by default and could be turned off by setting
060 * hbase.region.store.parallel.put.limit to 0, supporting online configuration change.
061 */
062@InterfaceAudience.Private
063public class StoreHotnessProtector {
064  private static final Logger LOG = LoggerFactory.getLogger(StoreHotnessProtector.class);
065
066  private static volatile boolean loggedDisableMessage;
067
068  private volatile int parallelPutToStoreThreadLimit;
069
070  private volatile int parallelPreparePutToStoreThreadLimit;
071  public final static String PARALLEL_PUT_STORE_THREADS_LIMIT =
072    "hbase.region.store.parallel.put.limit";
073  public final static String PARALLEL_PREPARE_PUT_STORE_MULTIPLIER =
074    "hbase.region.store.parallel.prepare.put.multiplier";
075  private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT = 0;
076  private volatile int parallelPutToStoreThreadLimitCheckMinColumnCount;
077  public final static String PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT =
078    "hbase.region.store.parallel.put.limit.min.column.count";
079  private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = 100;
080  private final static int DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 2;
081
082  private final ConcurrentMap<byte[], AtomicInteger> preparePutToStoreMap =
083    new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR);
084  private final Region region;
085
086  public StoreHotnessProtector(Region region, Configuration conf) {
087    init(conf);
088    this.region = region;
089  }
090
091  public void init(Configuration conf) {
092    this.parallelPutToStoreThreadLimit =
093      conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT);
094    this.parallelPreparePutToStoreThreadLimit = conf.getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER,
095      DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER) * parallelPutToStoreThreadLimit;
096    this.parallelPutToStoreThreadLimitCheckMinColumnCount =
097      conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT,
098        DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM);
099
100    if (!isEnable()) {
101      logDisabledMessageOnce();
102    }
103  }
104
105  /**
106   * {@link #init(Configuration)} is called for every Store that opens on a RegionServer. Here we
107   * make a lightweight attempt to log this message once per RegionServer, rather than per-Store.
108   * The goal is just to draw attention to this feature if debugging overload due to heavy writes.
109   */
110  private static void logDisabledMessageOnce() {
111    if (!loggedDisableMessage) {
112      LOG.info(
113        "StoreHotnessProtector is disabled. Set {} > 0 to enable, "
114          + "which may help mitigate load under heavy write pressure.",
115        PARALLEL_PUT_STORE_THREADS_LIMIT);
116      loggedDisableMessage = true;
117    }
118  }
119
120  public void update(Configuration conf) {
121    init(conf);
122    preparePutToStoreMap.clear();
123    LOG.debug("update config: {}", this);
124  }
125
126  public void start(Map<byte[], List<Cell>> familyMaps) throws RegionTooBusyException {
127    if (!isEnable()) {
128      return;
129    }
130
131    StringBuilder tooBusyStore = new StringBuilder();
132    boolean aboveParallelThreadLimit = false;
133    boolean aboveParallelPrePutLimit = false;
134
135    for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) {
136      Store store = this.region.getStore(e.getKey());
137      if (store == null || e.getValue() == null) {
138        continue;
139      }
140
141      if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) {
142
143        // we need to try to add #preparePutCount at first because preparePutToStoreMap will be
144        // cleared when changing the configuration.
145        int preparePutCount = preparePutToStoreMap
146          .computeIfAbsent(e.getKey(), key -> new AtomicInteger()).incrementAndGet();
147        boolean storeAboveThread =
148          store.getCurrentParallelPutCount() > this.parallelPutToStoreThreadLimit;
149        boolean storeAbovePrePut = preparePutCount > this.parallelPreparePutToStoreThreadLimit;
150        if (storeAboveThread || storeAbovePrePut) {
151          if (tooBusyStore.length() > 0) {
152            tooBusyStore.append(',');
153          }
154          tooBusyStore.append(store.getColumnFamilyName());
155        }
156        aboveParallelThreadLimit |= storeAboveThread;
157        aboveParallelPrePutLimit |= storeAbovePrePut;
158
159        if (LOG.isTraceEnabled()) {
160          LOG.trace(store.getColumnFamilyName() + ": preparePutCount=" + preparePutCount
161            + "; currentParallelPutCount=" + store.getCurrentParallelPutCount());
162        }
163      }
164    }
165
166    if (aboveParallelThreadLimit || aboveParallelPrePutLimit) {
167      String msg = "StoreTooBusy," + this.region.getRegionInfo().getRegionNameAsString() + ":"
168        + tooBusyStore + " Above "
169        + (aboveParallelThreadLimit
170          ? "parallelPutToStoreThreadLimit(" + this.parallelPutToStoreThreadLimit + ")"
171          : "")
172        + (aboveParallelThreadLimit && aboveParallelPrePutLimit ? " or " : "")
173        + (aboveParallelPrePutLimit
174          ? "parallelPreparePutToStoreThreadLimit(" + this.parallelPreparePutToStoreThreadLimit
175            + ")"
176          : "");
177      LOG.trace(msg);
178      throw new RegionTooBusyException(msg);
179    }
180  }
181
182  public void finish(Map<byte[], List<Cell>> familyMaps) {
183    if (!isEnable()) {
184      return;
185    }
186
187    for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) {
188      Store store = this.region.getStore(e.getKey());
189      if (store == null || e.getValue() == null) {
190        continue;
191      }
192      if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) {
193        AtomicInteger counter = preparePutToStoreMap.get(e.getKey());
194        // preparePutToStoreMap will be cleared when changing the configuration, so it may turn
195        // into a negative value. It will be not accuracy in a short time, it's a trade-off for
196        // performance.
197        if (counter != null && counter.decrementAndGet() < 0) {
198          counter.incrementAndGet();
199        }
200      }
201    }
202  }
203
204  public String toString() {
205    return "StoreHotnessProtector, parallelPutToStoreThreadLimit="
206      + this.parallelPutToStoreThreadLimit + " ; minColumnNum="
207      + this.parallelPutToStoreThreadLimitCheckMinColumnCount + " ; preparePutThreadLimit="
208      + this.parallelPreparePutToStoreThreadLimit + " ; hotProtect now "
209      + (this.isEnable() ? "enable" : "disable");
210  }
211
212  public boolean isEnable() {
213    // feature is enabled when parallelPutToStoreThreadLimit > 0
214    return this.parallelPutToStoreThreadLimit > 0;
215  }
216
217  Map<byte[], AtomicInteger> getPreparePutToStoreMap() {
218    return preparePutToStoreMap;
219  }
220
221  public static final long FIXED_SIZE =
222    ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT);
223}