/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor;

import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.quotas.OperationQuota;
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test that a {@link RegionObserver} can check the batch quota from a coprocessor hook and that
 * the resulting {@link RpcThrottlingException} is propagated back to the client.
 */
@Category({ MediumTests.class, CoprocessorTests.class })
public class TestRegionCoprocessorQuotaUsage {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRegionCoprocessorQuotaUsage.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRegionCoprocessorQuotaUsage.class);

  private static HBaseTestingUtil UTIL = new HBaseTestingUtil();
  private static TableName TABLE_NAME = TableName.valueOf("TestRegionCoprocessorQuotaUsage");
  private static byte[] CF = Bytes.toBytes("CF");
  private static byte[] CQ = Bytes.toBytes("CQ");
  private static Connection CONN;
  private static Table TABLE;
  private static AtomicBoolean THROTTLING_OCCURRED = new AtomicBoolean(false);

  public static class MyRegionObserver implements RegionObserver {
    @Override
    public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> c, Get get,
      List<Cell> result) throws IOException {

      // For the purposes of this test, we only need to catch a throttle happening once, then
      // let future requests pass through so we don't make this test take any longer than necessary
      LOG.info("Intercepting GetOp");
      if (!THROTTLING_OCCURRED.get()) {
        try {
          c.getEnvironment().checkBatchQuota(c.getEnvironment().getRegion(),
            OperationQuota.OperationType.GET);
          LOG.info("Request was not throttled");
        } catch (RpcThrottlingException e) {
          LOG.info("Request was throttled");
          THROTTLING_OCCURRED.set(true);
          throw e;
        }
      }
    }
  }

  public static class MyCoprocessor implements RegionCoprocessor {
    RegionObserver observer = new MyRegionObserver();

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(observer);
    }
  }

  @BeforeClass
  public static void setUp() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    // Enable quotas and allow only one read per user/machine per interval, so that a quick burst
    // of Gets is guaranteed to trip the throttle
    conf.setBoolean("hbase.quota.enabled", true);
    conf.setInt("hbase.quota.default.user.machine.read.num", 1);
    conf.set("hbase.quota.rate.limiter", "org.apache.hadoop.hbase.quotas.FixedIntervalRateLimiter");
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, MyCoprocessor.class.getName());
    UTIL.startMiniCluster(3);
    // Pre-split the table on keys 111, 222, ..., 888
    byte[][] splitKeys = new byte[8][];
    for (int i = 111; i < 999; i += 111) {
      splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
    }
    UTIL.createTable(TABLE_NAME, CF, splitKeys);
    CONN = UTIL.getConnection();
    TABLE = CONN.getTable(TABLE_NAME);
    TABLE.put(new Put(Bytes.toBytes(String.format("%d", 0))).addColumn(CF, CQ, Bytes.toBytes(0L)));
  }

  @AfterClass
  public static void tearDown() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Test
  public void testGet() throws InterruptedException, ExecutionException, IOException {
    // Hit the table 5 times, which ought to be enough to make a throttle happen
    for (int i = 0; i < 5; i++) {
      TABLE.get(new Get(Bytes.toBytes("000")));
      if (THROTTLING_OCCURRED.get()) {
        break;
      }
    }
    assertTrue("Throttling did not happen as expected", THROTTLING_OCCURRED.get());
  }
}