001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.throttle;
019
020import java.util.List;
021import java.util.Map;
022import java.util.concurrent.ConcurrentSkipListMap;
023import java.util.concurrent.atomic.AtomicInteger;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.Cell;
026import org.apache.hadoop.hbase.RegionTooBusyException;
027import org.apache.hadoop.hbase.regionserver.Region;
028import org.apache.hadoop.hbase.regionserver.Store;
029import org.apache.hadoop.hbase.util.Bytes;
030import org.apache.hadoop.hbase.util.ClassSize;
031import org.apache.yetus.audience.InterfaceAudience;
032
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * StoreHotnessProtector is designed to help limit the concurrency of puts with dense columns, it
038 * does best-effort to avoid exhausting all RS's handlers. When a lot of clients write requests with
039 * dense (hundreds) columns to a Store at the same time, it will lead to blocking of RS because CSLM
040 * degrades when concurrency goes up. It's not a kind of throttling. Throttling is user-oriented,
041 * while StoreHotnessProtector is system-oriented, RS-self-protected mechanism.
042 * <p>
043 * There are three key parameters:
044 * <p>
045 * 1. parallelPutToStoreThreadLimitCheckMinColumnCount: If the amount of columns exceed this
046 * threshold, the HotProtector will work, 100 by default
047 * <p>
048 * 2. parallelPutToStoreThreadLimit: The amount of concurrency allowed to write puts to a Store at
049 * the same time.
050 * <p>
051 * 3. parallelPreparePutToStoreThreadLimit: The amount of concurrency allowed to
052 * prepare writing puts to a Store at the same time.
053 * <p>
054 * Notice that our writing pipeline includes three key process: MVCC acquire, writing MemStore, and
055 * WAL. Only limit the concurrency of writing puts to Store(parallelPutToStoreThreadLimit) is not
056 * enough since the actual concurrency of puts may still exceed the limit when MVCC contention or
057 * slow WAL sync happens. This is why parallelPreparePutToStoreThreadLimit is needed.
058 * <p>
059 * This protector is enabled by default and could be turned off by setting
060 * hbase.region.store.parallel.put.limit to 0, supporting online configuration change.
061 */
062@InterfaceAudience.Private
063public class StoreHotnessProtector {
064  private static final Logger LOG = LoggerFactory.getLogger(StoreHotnessProtector.class);
065  private volatile int parallelPutToStoreThreadLimit;
066
067  private volatile int parallelPreparePutToStoreThreadLimit;
068  public final static String PARALLEL_PUT_STORE_THREADS_LIMIT =
069      "hbase.region.store.parallel.put.limit";
070  public final static String PARALLEL_PREPARE_PUT_STORE_MULTIPLIER =
071      "hbase.region.store.parallel.prepare.put.multiplier";
072  private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT = 10;
073  private volatile int parallelPutToStoreThreadLimitCheckMinColumnCount;
074  public final static String PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT =
075      "hbase.region.store.parallel.put.limit.min.column.count";
076  private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = 100;
077  private final static int DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 2;
078
079  private final Map<byte[], AtomicInteger> preparePutToStoreMap =
080      new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR);
081  private final Region region;
082
083  public StoreHotnessProtector(Region region, Configuration conf) {
084    init(conf);
085    this.region = region;
086  }
087
088  public void init(Configuration conf) {
089    this.parallelPutToStoreThreadLimit =
090        conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT);
091    this.parallelPreparePutToStoreThreadLimit = conf.getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER,
092        DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER) * parallelPutToStoreThreadLimit;
093    this.parallelPutToStoreThreadLimitCheckMinColumnCount =
094        conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT,
095            DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM);
096
097  }
098
099  public void update(Configuration conf) {
100    init(conf);
101    preparePutToStoreMap.clear();
102    LOG.debug("update config: " + toString());
103  }
104
105  public void start(Map<byte[], List<Cell>> familyMaps) throws RegionTooBusyException {
106    if (!isEnable()) {
107      return;
108    }
109
110    String tooBusyStore = null;
111
112    for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) {
113      Store store = this.region.getStore(e.getKey());
114      if (store == null || e.getValue() == null) {
115        continue;
116      }
117
118      if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) {
119
120        //we need to try to add #preparePutCount at first because preparePutToStoreMap will be
121        //cleared when changing the configuration.
122        preparePutToStoreMap.putIfAbsent(e.getKey(), new AtomicInteger());
123        AtomicInteger preparePutCounter = preparePutToStoreMap.get(e.getKey());
124        if (preparePutCounter == null) {
125          preparePutCounter = new AtomicInteger();
126          preparePutToStoreMap.putIfAbsent(e.getKey(), preparePutCounter);
127        }
128        int preparePutCount = preparePutCounter.incrementAndGet();
129        if (store.getCurrentParallelPutCount() > this.parallelPutToStoreThreadLimit
130            || preparePutCount > this.parallelPreparePutToStoreThreadLimit) {
131          tooBusyStore = (tooBusyStore == null ?
132              store.getColumnFamilyName() :
133              tooBusyStore + "," + store.getColumnFamilyName());
134        }
135
136        if (LOG.isTraceEnabled()) {
137          LOG.trace(store.getColumnFamilyName() + ": preparePutCount=" + preparePutCount
138              + "; currentParallelPutCount=" + store.getCurrentParallelPutCount());
139        }
140      }
141    }
142
143    if (tooBusyStore != null) {
144      String msg =
145          "StoreTooBusy," + this.region.getRegionInfo().getRegionNameAsString() + ":" + tooBusyStore
146              + " Above parallelPutToStoreThreadLimit(" + this.parallelPutToStoreThreadLimit + ")";
147      if (LOG.isTraceEnabled()) {
148        LOG.trace(msg);
149      }
150      throw new RegionTooBusyException(msg);
151    }
152  }
153
154  public void finish(Map<byte[], List<Cell>> familyMaps) {
155    if (!isEnable()) {
156      return;
157    }
158
159    for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) {
160      Store store = this.region.getStore(e.getKey());
161      if (store == null || e.getValue() == null) {
162        continue;
163      }
164      if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) {
165        AtomicInteger counter = preparePutToStoreMap.get(e.getKey());
166        // preparePutToStoreMap will be cleared when changing the configuration, so it may turn
167        // into a negative value. It will be not accuracy in a short time, it's a trade-off for
168        // performance.
169        if (counter != null && counter.decrementAndGet() < 0) {
170          counter.incrementAndGet();
171        }
172      }
173    }
174  }
175
176  public String toString() {
177    return "StoreHotnessProtector, parallelPutToStoreThreadLimit="
178        + this.parallelPutToStoreThreadLimit + " ; minColumnNum="
179        + this.parallelPutToStoreThreadLimitCheckMinColumnCount + " ; preparePutThreadLimit="
180        + this.parallelPreparePutToStoreThreadLimit + " ; hotProtect now " + (this.isEnable() ?
181        "enable" :
182        "disable");
183  }
184
185  public boolean isEnable() {
186    // feature is enabled when parallelPutToStoreThreadLimit > 0
187    return this.parallelPutToStoreThreadLimit > 0;
188  }
189
190  Map<byte[], AtomicInteger> getPreparePutToStoreMap() {
191    return preparePutToStoreMap;
192  }
193
194  public static final long FIXED_SIZE =
195      ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT);
196}