001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.quotas;
020
021import java.util.Arrays;
022import java.util.List;
023
024import org.apache.yetus.audience.InterfaceAudience;
025import org.apache.yetus.audience.InterfaceStability;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028import org.apache.hadoop.hbase.client.Mutation;
029import org.apache.hadoop.hbase.client.Result;
030
031@InterfaceAudience.Private
032@InterfaceStability.Evolving
033public class DefaultOperationQuota implements OperationQuota {
034  private static final Logger LOG = LoggerFactory.getLogger(DefaultOperationQuota.class);
035
036  private final List<QuotaLimiter> limiters;
037  private long writeAvailable = 0;
038  private long readAvailable = 0;
039  private long writeConsumed = 0;
040  private long readConsumed = 0;
041  private final long[] operationSize;
042
043  public DefaultOperationQuota(final QuotaLimiter... limiters) {
044    this(Arrays.asList(limiters));
045  }
046
047  /**
048   * NOTE: The order matters. It should be something like [user, table, namespace, global]
049   */
050  public DefaultOperationQuota(final List<QuotaLimiter> limiters) {
051    this.limiters = limiters;
052    int size = OperationType.values().length;
053    operationSize = new long[size];
054
055    for (int i = 0; i < size; ++i) {
056      operationSize[i] = 0;
057    }
058  }
059
060  @Override
061  public void checkQuota(int numWrites, int numReads, int numScans)
062      throws RpcThrottlingException {
063    writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);
064    readConsumed  = estimateConsume(OperationType.GET, numReads, 100);
065    readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);
066
067    writeAvailable = Long.MAX_VALUE;
068    readAvailable = Long.MAX_VALUE;
069    for (final QuotaLimiter limiter: limiters) {
070      if (limiter.isBypass()) continue;
071
072      limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed);
073      readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
074      writeAvailable = Math.min(writeAvailable, limiter.getWriteAvailable());
075    }
076
077    for (final QuotaLimiter limiter: limiters) {
078      limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed);
079    }
080  }
081
082  @Override
083  public void close() {
084    // Adjust the quota consumed for the specified operation
085    long writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed;
086    long readDiff = operationSize[OperationType.GET.ordinal()] +
087        operationSize[OperationType.SCAN.ordinal()] - readConsumed;
088
089    for (final QuotaLimiter limiter: limiters) {
090      if (writeDiff != 0) limiter.consumeWrite(writeDiff);
091      if (readDiff != 0) limiter.consumeRead(readDiff);
092    }
093  }
094
095  @Override
096  public long getReadAvailable() {
097    return readAvailable;
098  }
099
100  @Override
101  public long getWriteAvailable() {
102    return writeAvailable;
103  }
104
105  @Override
106  public void addGetResult(final Result result) {
107    operationSize[OperationType.GET.ordinal()] += QuotaUtil.calculateResultSize(result);
108  }
109
110  @Override
111  public void addScanResult(final List<Result> results) {
112    operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateResultSize(results);
113  }
114
115  @Override
116  public void addMutation(final Mutation mutation) {
117    operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation);
118  }
119
120  private long estimateConsume(final OperationType type, int numReqs, long avgSize) {
121    if (numReqs > 0) {
122      return avgSize * numReqs;
123    }
124    return 0;
125  }
126}