001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.throttle;
019
020import java.util.List;
021import java.util.Map;
022import java.util.concurrent.ConcurrentSkipListMap;
023import java.util.concurrent.atomic.AtomicInteger;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.Cell;
026import org.apache.hadoop.hbase.RegionTooBusyException;
027import org.apache.hadoop.hbase.regionserver.Region;
028import org.apache.hadoop.hbase.regionserver.Store;
029import org.apache.hadoop.hbase.util.Bytes;
030import org.apache.hadoop.hbase.util.ClassSize;
031import org.apache.yetus.audience.InterfaceAudience;
032
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
037
038/**
039 * StoreHotnessProtector is designed to help limit the concurrency of puts with dense columns, it
040 * does best-effort to avoid exhausting all RS's handlers. When a lot of clients write requests with
041 * dense (hundreds) columns to a Store at the same time, it will lead to blocking of RS because CSLM
042 * degrades when concurrency goes up. It's not a kind of throttling. Throttling is user-oriented,
043 * while StoreHotnessProtector is system-oriented, RS-self-protected mechanism.
044 * <p>
045 * There are three key parameters:
046 * <p>
047 * 1. parallelPutToStoreThreadLimitCheckMinColumnCount: If the amount of columns exceed this
048 * threshold, the HotProtector will work, 100 by default
049 * <p>
050 * 2. parallelPutToStoreThreadLimit: The amount of concurrency allowed to write puts to a Store at
051 * the same time.
052 * <p>
053 * 3. parallelPreparePutToStoreThreadLimit: The amount of concurrency allowed to
054 * prepare writing puts to a Store at the same time.
055 * <p>
056 * Notice that our writing pipeline includes three key process: MVCC acquire, writing MemStore, and
057 * WAL. Only limit the concurrency of writing puts to Store(parallelPutToStoreThreadLimit) is not
058 * enough since the actual concurrency of puts may still exceed the limit when MVCC contention or
059 * slow WAL sync happens. This is why parallelPreparePutToStoreThreadLimit is needed.
060 * <p>
061 * This protector is enabled by default and could be turned off by setting
062 * hbase.region.store.parallel.put.limit to 0, supporting online configuration change.
063 */
064@InterfaceAudience.Private
065public class StoreHotnessProtector {
066  private static final Logger LOG = LoggerFactory.getLogger(StoreHotnessProtector.class);
067  private volatile int parallelPutToStoreThreadLimit;
068
069  private volatile int parallelPreparePutToStoreThreadLimit;
070  public final static String PARALLEL_PUT_STORE_THREADS_LIMIT =
071      "hbase.region.store.parallel.put.limit";
072  public final static String PARALLEL_PREPARE_PUT_STORE_MULTIPLIER =
073      "hbase.region.store.parallel.prepare.put.multiplier";
074  private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT = 10;
075  private volatile int parallelPutToStoreThreadLimitCheckMinColumnCount;
076  public final static String PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT =
077      "hbase.region.store.parallel.put.limit.min.column.count";
078  private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = 100;
079  private final static int DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 2;
080
081  private final Map<byte[], AtomicInteger> preparePutToStoreMap =
082      new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR);
083  private final Region region;
084
085  public StoreHotnessProtector(Region region, Configuration conf) {
086    init(conf);
087    this.region = region;
088  }
089
090  public void init(Configuration conf) {
091    this.parallelPutToStoreThreadLimit =
092        conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT);
093    this.parallelPreparePutToStoreThreadLimit = conf.getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER,
094        DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER) * parallelPutToStoreThreadLimit;
095    this.parallelPutToStoreThreadLimitCheckMinColumnCount =
096        conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT,
097            DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM);
098
099  }
100
101  public void update(Configuration conf) {
102    init(conf);
103    preparePutToStoreMap.clear();
104    LOG.debug("update config: " + toString());
105  }
106
107  public void start(Map<byte[], List<Cell>> familyMaps) throws RegionTooBusyException {
108    if (!isEnable()) {
109      return;
110    }
111
112    String tooBusyStore = null;
113
114    for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) {
115      Store store = this.region.getStore(e.getKey());
116      if (store == null || e.getValue() == null) {
117        continue;
118      }
119
120      if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) {
121
122        //we need to try to add #preparePutCount at first because preparePutToStoreMap will be
123        //cleared when changing the configuration.
124        preparePutToStoreMap.putIfAbsent(e.getKey(), new AtomicInteger());
125        AtomicInteger preparePutCounter = preparePutToStoreMap.get(e.getKey());
126        if (preparePutCounter == null) {
127          preparePutCounter = new AtomicInteger();
128          preparePutToStoreMap.putIfAbsent(e.getKey(), preparePutCounter);
129        }
130        int preparePutCount = preparePutCounter.incrementAndGet();
131        if (store.getCurrentParallelPutCount() > this.parallelPutToStoreThreadLimit
132            || preparePutCount > this.parallelPreparePutToStoreThreadLimit) {
133          tooBusyStore = (tooBusyStore == null ?
134              store.getColumnFamilyName() :
135              tooBusyStore + "," + store.getColumnFamilyName());
136        }
137
138        if (LOG.isTraceEnabled()) {
139          LOG.trace(store.getColumnFamilyName() + ": preparePutCount=" + preparePutCount
140              + "; currentParallelPutCount=" + store.getCurrentParallelPutCount());
141        }
142      }
143    }
144
145    if (tooBusyStore != null) {
146      String msg =
147          "StoreTooBusy," + this.region.getRegionInfo().getRegionNameAsString() + ":" + tooBusyStore
148              + " Above parallelPutToStoreThreadLimit(" + this.parallelPutToStoreThreadLimit + ")";
149      if (LOG.isTraceEnabled()) {
150        LOG.trace(msg);
151      }
152      throw new RegionTooBusyException(msg);
153    }
154  }
155
156  public void finish(Map<byte[], List<Cell>> familyMaps) {
157    if (!isEnable()) {
158      return;
159    }
160
161    for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) {
162      Store store = this.region.getStore(e.getKey());
163      if (store == null || e.getValue() == null) {
164        continue;
165      }
166      if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) {
167        AtomicInteger counter = preparePutToStoreMap.get(e.getKey());
168        // preparePutToStoreMap will be cleared when changing the configuration, so it may turn
169        // into a negative value. It will be not accuracy in a short time, it's a trade-off for
170        // performance.
171        if (counter != null && counter.decrementAndGet() < 0) {
172          counter.incrementAndGet();
173        }
174      }
175    }
176  }
177
178  public String toString() {
179    return "StoreHotnessProtector, parallelPutToStoreThreadLimit="
180        + this.parallelPutToStoreThreadLimit + " ; minColumnNum="
181        + this.parallelPutToStoreThreadLimitCheckMinColumnCount + " ; preparePutThreadLimit="
182        + this.parallelPreparePutToStoreThreadLimit + " ; hotProtect now " + (this.isEnable() ?
183        "enable" :
184        "disable");
185  }
186
187  public boolean isEnable() {
188    // feature is enabled when parallelPutToStoreThreadLimit > 0
189    return this.parallelPutToStoreThreadLimit > 0;
190  }
191
192  @VisibleForTesting
193  Map<byte[], AtomicInteger> getPreparePutToStoreMap() {
194    return preparePutToStoreMap;
195  }
196
197  public static final long FIXED_SIZE =
198      ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT);
199}