001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.junit.jupiter.api.Assertions.assertTrue;
021
022import java.io.IOException;
023import java.util.List;
024import java.util.Optional;
025import java.util.concurrent.ExecutionException;
026import java.util.concurrent.atomic.AtomicBoolean;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.HBaseTestingUtil;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.client.Connection;
032import org.apache.hadoop.hbase.client.Get;
033import org.apache.hadoop.hbase.client.Put;
034import org.apache.hadoop.hbase.client.Table;
035import org.apache.hadoop.hbase.quotas.OperationQuota;
036import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
037import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
038import org.apache.hadoop.hbase.testclassification.MediumTests;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.junit.jupiter.api.AfterAll;
041import org.junit.jupiter.api.BeforeAll;
042import org.junit.jupiter.api.Tag;
043import org.junit.jupiter.api.Test;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047@Tag(MediumTests.TAG)
048@Tag(CoprocessorTests.TAG)
049public class TestRegionCoprocessorQuotaUsage {
050
051  private static final Logger LOG = LoggerFactory.getLogger(TestRegionCoprocessorQuotaUsage.class);
052
053  private static HBaseTestingUtil UTIL = new HBaseTestingUtil();
054  private static TableName TABLE_NAME = TableName.valueOf("TestRegionCoprocessorQuotaUsage");
055  private static byte[] CF = Bytes.toBytes("CF");
056  private static byte[] CQ = Bytes.toBytes("CQ");
057  private static Connection CONN;
058  private static Table TABLE;
059  private static AtomicBoolean THROTTLING_OCCURRED = new AtomicBoolean(false);
060
061  public static class MyRegionObserver implements RegionObserver {
062    @Override
063    public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> c, Get get,
064      List<Cell> result) throws IOException {
065
066      // For the purposes of this test, we only need to catch a throttle happening once, then
067      // let future requests pass through so we don't make this test take any longer than necessary
068      LOG.info("Intercepting GetOp");
069      if (!THROTTLING_OCCURRED.get()) {
070        try {
071          c.getEnvironment().checkBatchQuota(c.getEnvironment().getRegion(),
072            OperationQuota.OperationType.GET);
073          LOG.info("Request was not throttled");
074        } catch (RpcThrottlingException e) {
075          LOG.info("Intercepting was throttled");
076          THROTTLING_OCCURRED.set(true);
077          throw e;
078        }
079      }
080    }
081  }
082
083  public static class MyCoprocessor implements RegionCoprocessor {
084    RegionObserver observer = new MyRegionObserver();
085
086    @Override
087    public Optional<RegionObserver> getRegionObserver() {
088      return Optional.of(observer);
089    }
090  }
091
092  @BeforeAll
093  public static void setUp() throws Exception {
094    Configuration conf = UTIL.getConfiguration();
095    conf.setBoolean("hbase.quota.enabled", true);
096    conf.setInt("hbase.quota.default.user.machine.read.num", 1);
097    conf.set("hbase.quota.rate.limiter", "org.apache.hadoop.hbase.quotas.FixedIntervalRateLimiter");
098    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, MyCoprocessor.class.getName());
099    UTIL.startMiniCluster(3);
100    byte[][] splitKeys = new byte[8][];
101    for (int i = 111; i < 999; i += 111) {
102      splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
103    }
104    UTIL.createTable(TABLE_NAME, CF, splitKeys);
105    CONN = UTIL.getConnection();
106    TABLE = CONN.getTable(TABLE_NAME);
107    TABLE.put(new Put(Bytes.toBytes(String.format("%d", 0))).addColumn(CF, CQ, Bytes.toBytes(0L)));
108  }
109
110  @AfterAll
111  public static void tearDown() throws Exception {
112    UTIL.shutdownMiniCluster();
113  }
114
115  @Test
116  public void testGet() throws InterruptedException, ExecutionException, IOException {
117    // Hit the table 5 times which ought to be enough to make a throttle happen
118    for (int i = 0; i < 5; i++) {
119      TABLE.get(new Get(Bytes.toBytes("000")));
120      if (THROTTLING_OCCURRED.get()) {
121        break;
122      }
123    }
124    assertTrue(THROTTLING_OCCURRED.get(), "Throttling did not happen as expected");
125  }
126}