001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.quotas;
020
021import java.util.Arrays;
022import java.util.List;
023
024import org.apache.hadoop.conf.Configuration;
025import org.apache.yetus.audience.InterfaceAudience;
026import org.apache.yetus.audience.InterfaceStability;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029import org.apache.hadoop.hbase.client.Mutation;
030import org.apache.hadoop.hbase.client.Result;
031
032@InterfaceAudience.Private
033@InterfaceStability.Evolving
034public class DefaultOperationQuota implements OperationQuota {
035  private static final Logger LOG = LoggerFactory.getLogger(DefaultOperationQuota.class);
036
037  protected final List<QuotaLimiter> limiters;
038  private final long writeCapacityUnit;
039  private final long readCapacityUnit;
040
041  // the available read/write quota size in bytes
042  protected long writeAvailable = 0;
043  protected long readAvailable = 0;
044  // estimated quota
045  protected long writeConsumed = 0;
046  protected long readConsumed = 0;
047  protected long writeCapacityUnitConsumed = 0;
048  protected long readCapacityUnitConsumed = 0;
049  // real consumed quota
050  private final long[] operationSize;
051  // difference between estimated quota and real consumed quota used in close method
052  // to adjust quota amount. Also used by ExceedOperationQuota which is a subclass
053  // of DefaultOperationQuota
054  protected long writeDiff = 0;
055  protected long readDiff = 0;
056  protected long writeCapacityUnitDiff = 0;
057  protected long readCapacityUnitDiff = 0;
058
059  public DefaultOperationQuota(final Configuration conf, final QuotaLimiter... limiters) {
060    this(conf, Arrays.asList(limiters));
061  }
062
063  /**
064   * NOTE: The order matters. It should be something like [user, table, namespace, global]
065   */
066  public DefaultOperationQuota(final Configuration conf, final List<QuotaLimiter> limiters) {
067    this.writeCapacityUnit =
068        conf.getLong(QuotaUtil.WRITE_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_WRITE_CAPACITY_UNIT);
069    this.readCapacityUnit =
070        conf.getLong(QuotaUtil.READ_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_READ_CAPACITY_UNIT);
071    this.limiters = limiters;
072    int size = OperationType.values().length;
073    operationSize = new long[size];
074
075    for (int i = 0; i < size; ++i) {
076      operationSize[i] = 0;
077    }
078  }
079
080  @Override
081  public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException {
082    updateEstimateConsumeQuota(numWrites, numReads, numScans);
083
084    writeAvailable = Long.MAX_VALUE;
085    readAvailable = Long.MAX_VALUE;
086    for (final QuotaLimiter limiter : limiters) {
087      if (limiter.isBypass()) continue;
088
089      limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
090        writeCapacityUnitConsumed, readCapacityUnitConsumed);
091      readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
092      writeAvailable = Math.min(writeAvailable, limiter.getWriteAvailable());
093    }
094
095    for (final QuotaLimiter limiter : limiters) {
096      limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
097        writeCapacityUnitConsumed, readCapacityUnitConsumed);
098    }
099  }
100
101  @Override
102  public void close() {
103    // Adjust the quota consumed for the specified operation
104    writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed;
105    readDiff = operationSize[OperationType.GET.ordinal()]
106        + operationSize[OperationType.SCAN.ordinal()] - readConsumed;
107    writeCapacityUnitDiff = calculateWriteCapacityUnitDiff(
108      operationSize[OperationType.MUTATE.ordinal()], writeConsumed);
109    readCapacityUnitDiff = calculateReadCapacityUnitDiff(
110      operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()],
111      readConsumed);
112
113    for (final QuotaLimiter limiter : limiters) {
114      if (writeDiff != 0) {
115        limiter.consumeWrite(writeDiff, writeCapacityUnitDiff);
116      }
117      if (readDiff != 0) {
118        limiter.consumeRead(readDiff, readCapacityUnitDiff);
119      }
120    }
121  }
122
123  @Override
124  public long getReadAvailable() {
125    return readAvailable;
126  }
127
128  @Override
129  public long getWriteAvailable() {
130    return writeAvailable;
131  }
132
133  @Override
134  public void addGetResult(final Result result) {
135    operationSize[OperationType.GET.ordinal()] += QuotaUtil.calculateResultSize(result);
136  }
137
138  @Override
139  public void addScanResult(final List<Result> results) {
140    operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateResultSize(results);
141  }
142
143  @Override
144  public void addMutation(final Mutation mutation) {
145    operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation);
146  }
147
148  /**
149   * Update estimate quota(read/write size/capacityUnits) which will be consumed
150   * @param numWrites the number of write requests
151   * @param numReads the number of read requests
152   * @param numScans the number of scan requests
153   */
154  protected void updateEstimateConsumeQuota(int numWrites, int numReads, int numScans) {
155    writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);
156    readConsumed = estimateConsume(OperationType.GET, numReads, 100);
157    readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);
158
159    writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed);
160    readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
161  }
162
163  private long estimateConsume(final OperationType type, int numReqs, long avgSize) {
164    if (numReqs > 0) {
165      return avgSize * numReqs;
166    }
167    return 0;
168  }
169
170  private long calculateWriteCapacityUnit(final long size) {
171    return (long) Math.ceil(size * 1.0 / this.writeCapacityUnit);
172  }
173
174  private long calculateReadCapacityUnit(final long size) {
175    return (long) Math.ceil(size * 1.0 / this.readCapacityUnit);
176  }
177
178  private long calculateWriteCapacityUnitDiff(final long actualSize, final long estimateSize) {
179    return calculateWriteCapacityUnit(actualSize) - calculateWriteCapacityUnit(estimateSize);
180  }
181
182  private long calculateReadCapacityUnitDiff(final long actualSize, final long estimateSize) {
183    return calculateReadCapacityUnit(actualSize) - calculateReadCapacityUnit(estimateSize);
184  }
185}