001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.junit.Assert.assertTrue;
021
022import java.io.IOException;
023import java.util.List;
024import java.util.Optional;
025import java.util.concurrent.ExecutionException;
026import java.util.concurrent.atomic.AtomicBoolean;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Connection;
033import org.apache.hadoop.hbase.client.Get;
034import org.apache.hadoop.hbase.client.Put;
035import org.apache.hadoop.hbase.client.Table;
036import org.apache.hadoop.hbase.quotas.OperationQuota;
037import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
038import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
039import org.apache.hadoop.hbase.testclassification.MediumTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.junit.AfterClass;
042import org.junit.BeforeClass;
043import org.junit.ClassRule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049@Category({ MediumTests.class, CoprocessorTests.class })
050public class TestRegionCoprocessorQuotaUsage {
051
052  @ClassRule
053  public static final HBaseClassTestRule CLASS_RULE =
054    HBaseClassTestRule.forClass(TestRegionCoprocessorQuotaUsage.class);
055
056  private static final Logger LOG = LoggerFactory.getLogger(TestRegionCoprocessorQuotaUsage.class);
057
058  private static HBaseTestingUtil UTIL = new HBaseTestingUtil();
059  private static TableName TABLE_NAME = TableName.valueOf("TestRegionCoprocessorQuotaUsage");
060  private static byte[] CF = Bytes.toBytes("CF");
061  private static byte[] CQ = Bytes.toBytes("CQ");
062  private static Connection CONN;
063  private static Table TABLE;
064  private static AtomicBoolean THROTTLING_OCCURRED = new AtomicBoolean(false);
065
066  public static class MyRegionObserver implements RegionObserver {
067    @Override
068    public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> c, Get get,
069      List<Cell> result) throws IOException {
070
071      // For the purposes of this test, we only need to catch a throttle happening once, then
072      // let future requests pass through so we don't make this test take any longer than necessary
073      LOG.info("Intercepting GetOp");
074      if (!THROTTLING_OCCURRED.get()) {
075        try {
076          c.getEnvironment().checkBatchQuota(c.getEnvironment().getRegion(),
077            OperationQuota.OperationType.GET);
078          LOG.info("Request was not throttled");
079        } catch (RpcThrottlingException e) {
080          LOG.info("Intercepting was throttled");
081          THROTTLING_OCCURRED.set(true);
082          throw e;
083        }
084      }
085    }
086  }
087
088  public static class MyCoprocessor implements RegionCoprocessor {
089    RegionObserver observer = new MyRegionObserver();
090
091    @Override
092    public Optional<RegionObserver> getRegionObserver() {
093      return Optional.of(observer);
094    }
095  }
096
097  @BeforeClass
098  public static void setUp() throws Exception {
099    Configuration conf = UTIL.getConfiguration();
100    conf.setBoolean("hbase.quota.enabled", true);
101    conf.setInt("hbase.quota.default.user.machine.read.num", 1);
102    conf.set("hbase.quota.rate.limiter", "org.apache.hadoop.hbase.quotas.FixedIntervalRateLimiter");
103    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, MyCoprocessor.class.getName());
104    UTIL.startMiniCluster(3);
105    byte[][] splitKeys = new byte[8][];
106    for (int i = 111; i < 999; i += 111) {
107      splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
108    }
109    UTIL.createTable(TABLE_NAME, CF, splitKeys);
110    CONN = UTIL.getConnection();
111    TABLE = CONN.getTable(TABLE_NAME);
112    TABLE.put(new Put(Bytes.toBytes(String.format("%d", 0))).addColumn(CF, CQ, Bytes.toBytes(0L)));
113  }
114
115  @AfterClass
116  public static void tearDown() throws Exception {
117    UTIL.shutdownMiniCluster();
118  }
119
120  @Test
121  public void testGet() throws InterruptedException, ExecutionException, IOException {
122    // Hit the table 5 times which ought to be enough to make a throttle happen
123    for (int i = 0; i < 5; i++) {
124      TABLE.get(new Get(Bytes.toBytes("000")));
125      if (THROTTLING_OCCURRED.get()) {
126        break;
127      }
128    }
129    assertTrue("Throttling did not happen as expected", THROTTLING_OCCURRED.get());
130  }
131}