/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.throttle;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * StoreHotnessProtector is designed to help limit the concurrency of puts with dense columns, and
 * it makes a best effort to avoid exhausting all of the RegionServer's handlers. When many clients
 * write puts with dense (hundreds of) columns to the same Store at the same time, the RegionServer
 * can block because the CSLM degrades as concurrency rises. This is not a form of throttling:
 * throttling is user-oriented, while StoreHotnessProtector is a system-oriented, RegionServer
 * self-protection mechanism.
 * <p>
 * There are three key parameters:
 * <p>
 * 1. parallelPutToStoreThreadLimitCheckMinColumnCount: If the number of columns exceeds this
 * threshold, the protector kicks in; 100 by default.
 * <p>
 * 2. parallelPutToStoreThreadLimit: The number of threads allowed to write puts to a Store at the
 * same time.
 * <p>
 * 3. parallelPreparePutToStoreThreadLimit: The number of threads allowed to prepare writing puts
 * to a Store at the same time.
 * <p>
 * Notice that our write pipeline has three key steps: MVCC acquisition, the MemStore write, and
 * the WAL sync. Limiting only the concurrency of puts written to a Store
 * (parallelPutToStoreThreadLimit) is not enough, because the actual concurrency of puts may still
 * exceed the limit under MVCC contention or slow WAL syncs. This is why
 * parallelPreparePutToStoreThreadLimit is needed as well.
 * <p>
 * This protector is disabled by default and can be turned on by setting
 * hbase.region.store.parallel.put.limit to a positive value; the setting supports online
 * configuration change.
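 * <p>
 * As a minimal sketch (the values below are illustrative, not recommendations), the protector can
 * be enabled programmatically via the configuration keys defined in this class:
 *
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * // A positive put limit turns the protector on.
 * conf.setInt(StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT, 10);
 * // The prepare-put limit is multiplier * put limit, i.e. 2 * 10 = 20 here.
 * conf.setInt(StoreHotnessProtector.PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 2);
 * // Only families with more than this many columns in a single put are checked.
 * conf.setInt(StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT, 100);
 * }</pre>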
 */
@InterfaceAudience.Private
public class StoreHotnessProtector {
  private static final Logger LOG = LoggerFactory.getLogger(StoreHotnessProtector.class);

  private static volatile boolean loggedDisableMessage;

  private volatile int parallelPutToStoreThreadLimit;

  private volatile int parallelPreparePutToStoreThreadLimit;
  public final static String PARALLEL_PUT_STORE_THREADS_LIMIT =
    "hbase.region.store.parallel.put.limit";
  public final static String PARALLEL_PREPARE_PUT_STORE_MULTIPLIER =
    "hbase.region.store.parallel.prepare.put.multiplier";
  private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT = 0;
  private volatile int parallelPutToStoreThreadLimitCheckMinColumnCount;
  public final static String PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT =
    "hbase.region.store.parallel.put.limit.min.column.count";
  private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = 100;
  private final static int DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 2;

  private final Map<byte[], AtomicInteger> preparePutToStoreMap =
    new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR);
  private final Region region;

  public StoreHotnessProtector(Region region, Configuration conf) {
    init(conf);
    this.region = region;
  }

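  /**
   * Reads the limits from the given configuration. Note that the prepare-put limit is derived from
   * the put limit: for example, with the put limit set to 10 (an illustrative value) and the
   * default multiplier of 2, up to 20 threads may prepare puts for a single Store while only 10
   * may write to it concurrently.
   */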
  public void init(Configuration conf) {
    this.parallelPutToStoreThreadLimit =
      conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT);
    this.parallelPreparePutToStoreThreadLimit = conf.getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER,
      DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER) * parallelPutToStoreThreadLimit;
    this.parallelPutToStoreThreadLimitCheckMinColumnCount =
      conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT,
        DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM);

    if (!isEnable()) {
      logDisabledMessageOnce();
    }
  }

  /**
   * {@link #init(Configuration)} is called for every Store that opens on a RegionServer. Here we
   * make a lightweight attempt to log this message once per RegionServer, rather than per-Store.
   * The goal is just to draw attention to this feature when debugging overload caused by heavy
   * writes.
   */
  private static void logDisabledMessageOnce() {
    if (!loggedDisableMessage) {
      LOG.info(
        "StoreHotnessProtector is disabled. Set {} > 0 to enable, "
          + "which may help mitigate load under heavy write pressure.",
        PARALLEL_PUT_STORE_THREADS_LIMIT);
      loggedDisableMessage = true;
    }
  }

  public void update(Configuration conf) {
    init(conf);
    preparePutToStoreMap.clear();
    LOG.debug("Update config: {}", this);
  }

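  /**
   * Checks the given family map against the configured limits. For each family whose number of
   * columns (cells) exceeds parallelPutToStoreThreadLimitCheckMinColumnCount, the per-Store
   * prepare-put counter is incremented, and a {@link RegionTooBusyException} is thrown if any
   * involved Store is already above parallelPutToStoreThreadLimit or
   * parallelPreparePutToStoreThreadLimit.
   * <p>
   * A minimal usage sketch of the pairing with {@link #finish(Map)} (hypothetical caller; the real
   * call sites live in the region write path):
   *
   * <pre>{@code
   * try {
   *   protector.start(familyMaps);
   *   // ... acquire MVCC, write the MemStore, sync the WAL ...
   * } finally {
   *   // finish() undoes the counters incremented by start(), even if start() threw.
   *   protector.finish(familyMaps);
   * }
   * }</pre>
   */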
  public void start(Map<byte[], List<Cell>> familyMaps) throws RegionTooBusyException {
    if (!isEnable()) {
      return;
    }

    String tooBusyStore = null;
    boolean aboveParallelThreadLimit = false;
    boolean aboveParallelPrePutLimit = false;

    for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) {
      Store store = this.region.getStore(e.getKey());
      if (store == null || e.getValue() == null) {
        continue;
      }

      if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) {
        // Create the counter on demand and increment it first: preparePutToStoreMap is cleared
        // when the configuration changes, so the entry may have to be recreated.
        int preparePutCount = preparePutToStoreMap
          .computeIfAbsent(e.getKey(), key -> new AtomicInteger()).incrementAndGet();
        boolean storeAboveThread =
          store.getCurrentParallelPutCount() > this.parallelPutToStoreThreadLimit;
        boolean storeAbovePrePut = preparePutCount > this.parallelPreparePutToStoreThreadLimit;
        if (storeAboveThread || storeAbovePrePut) {
          tooBusyStore = (tooBusyStore == null
            ? store.getColumnFamilyName()
            : tooBusyStore + "," + store.getColumnFamilyName());
        }
        aboveParallelThreadLimit |= storeAboveThread;
        aboveParallelPrePutLimit |= storeAbovePrePut;

        if (LOG.isTraceEnabled()) {
          LOG.trace(store.getColumnFamilyName() + ": preparePutCount=" + preparePutCount
            + "; currentParallelPutCount=" + store.getCurrentParallelPutCount());
        }
      }
    }

    if (aboveParallelThreadLimit || aboveParallelPrePutLimit) {
      String msg = "StoreTooBusy," + this.region.getRegionInfo().getRegionNameAsString() + ":"
        + tooBusyStore + " Above "
        + (aboveParallelThreadLimit
          ? "parallelPutToStoreThreadLimit(" + this.parallelPutToStoreThreadLimit + ")"
          : "")
        + (aboveParallelThreadLimit && aboveParallelPrePutLimit ? " or " : "")
        + (aboveParallelPrePutLimit
          ? "parallelPreparePutToStoreThreadLimit(" + this.parallelPreparePutToStoreThreadLimit
            + ")"
          : "");
      LOG.trace(msg);
      throw new RegionTooBusyException(msg);
    }
  }

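  /**
   * Decrements the per-Store prepare-put counters that {@link #start(Map)} incremented for this
   * family map. If a decrement would take a counter below zero (possible because the map is
   * cleared on an online configuration change), the decrement is undone.
   */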
  public void finish(Map<byte[], List<Cell>> familyMaps) {
    if (!isEnable()) {
      return;
    }

    for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) {
      Store store = this.region.getStore(e.getKey());
      if (store == null || e.getValue() == null) {
        continue;
      }
      if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) {
        AtomicInteger counter = preparePutToStoreMap.get(e.getKey());
        // preparePutToStoreMap is cleared when the configuration changes, so the counter may go
        // negative here; if it does, undo the decrement. The count can be briefly inaccurate,
        // which is a trade-off for performance.
        if (counter != null && counter.decrementAndGet() < 0) {
          counter.incrementAndGet();
        }
      }
    }
  }

  @Override
  public String toString() {
    return "StoreHotnessProtector, parallelPutToStoreThreadLimit="
      + this.parallelPutToStoreThreadLimit + " ; minColumnNum="
      + this.parallelPutToStoreThreadLimitCheckMinColumnCount + " ; preparePutThreadLimit="
      + this.parallelPreparePutToStoreThreadLimit + " ; hotProtect now "
      + (this.isEnable() ? "enabled" : "disabled");
  }

  public boolean isEnable() {
    // feature is enabled when parallelPutToStoreThreadLimit > 0
    return this.parallelPutToStoreThreadLimit > 0;
  }

  Map<byte[], AtomicInteger> getPreparePutToStoreMap() {
    return preparePutToStoreMap;
  }

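  // Fixed heap overhead of a StoreHotnessProtector instance: the object header, two references
  // (preparePutToStoreMap and region), and the three int limit fields.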
  public static final long FIXED_SIZE =
    ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT);
}