001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import static org.junit.jupiter.api.Assertions.assertTrue; 021 022import java.io.IOException; 023import java.util.List; 024import java.util.Optional; 025import java.util.concurrent.ExecutionException; 026import java.util.concurrent.atomic.AtomicBoolean; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.HBaseTestingUtil; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.client.Connection; 032import org.apache.hadoop.hbase.client.Get; 033import org.apache.hadoop.hbase.client.Put; 034import org.apache.hadoop.hbase.client.Table; 035import org.apache.hadoop.hbase.quotas.OperationQuota; 036import org.apache.hadoop.hbase.quotas.RpcThrottlingException; 037import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 038import org.apache.hadoop.hbase.testclassification.MediumTests; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.junit.jupiter.api.AfterAll; 041import org.junit.jupiter.api.BeforeAll; 042import org.junit.jupiter.api.Tag; 043import org.junit.jupiter.api.Test; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047@Tag(MediumTests.TAG) 048@Tag(CoprocessorTests.TAG) 049public class TestRegionCoprocessorQuotaUsage { 050 051 private static final Logger LOG = LoggerFactory.getLogger(TestRegionCoprocessorQuotaUsage.class); 052 053 private static HBaseTestingUtil UTIL = new HBaseTestingUtil(); 054 private static TableName TABLE_NAME = TableName.valueOf("TestRegionCoprocessorQuotaUsage"); 055 private static byte[] CF = Bytes.toBytes("CF"); 056 private static byte[] CQ = Bytes.toBytes("CQ"); 057 private static Connection CONN; 058 private static Table TABLE; 059 private static AtomicBoolean THROTTLING_OCCURRED = new AtomicBoolean(false); 060 061 public static class MyRegionObserver implements RegionObserver { 062 @Override 063 public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> c, Get get, 064 List<Cell> result) throws IOException { 065 066 // For the purposes of this test, we only need to catch a throttle happening once, then 067 // let future requests pass through so we don't make this test take any longer than necessary 068 LOG.info("Intercepting GetOp"); 069 if (!THROTTLING_OCCURRED.get()) { 070 try { 071 c.getEnvironment().checkBatchQuota(c.getEnvironment().getRegion(), 072 OperationQuota.OperationType.GET); 073 LOG.info("Request was not throttled"); 074 } catch (RpcThrottlingException e) { 075 LOG.info("Intercepting was throttled"); 076 THROTTLING_OCCURRED.set(true); 077 throw e; 078 } 079 } 080 } 081 } 082 083 public static class MyCoprocessor implements RegionCoprocessor { 084 RegionObserver observer = new MyRegionObserver(); 085 086 @Override 087 public Optional<RegionObserver> getRegionObserver() { 088 return Optional.of(observer); 089 } 090 } 091 092 @BeforeAll 093 public static void setUp() throws Exception { 094 Configuration conf = UTIL.getConfiguration(); 095 conf.setBoolean("hbase.quota.enabled", true); 096 conf.setInt("hbase.quota.default.user.machine.read.num", 1); 097 conf.set("hbase.quota.rate.limiter", "org.apache.hadoop.hbase.quotas.FixedIntervalRateLimiter"); 098 conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, MyCoprocessor.class.getName()); 099 UTIL.startMiniCluster(3); 100 byte[][] splitKeys = new byte[8][]; 101 for (int i = 111; i < 999; i += 111) { 102 splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); 103 } 104 UTIL.createTable(TABLE_NAME, CF, splitKeys); 105 CONN = UTIL.getConnection(); 106 TABLE = CONN.getTable(TABLE_NAME); 107 TABLE.put(new Put(Bytes.toBytes(String.format("%d", 0))).addColumn(CF, CQ, Bytes.toBytes(0L))); 108 } 109 110 @AfterAll 111 public static void tearDown() throws Exception { 112 UTIL.shutdownMiniCluster(); 113 } 114 115 @Test 116 public void testGet() throws InterruptedException, ExecutionException, IOException { 117 // Hit the table 5 times which ought to be enough to make a throttle happen 118 for (int i = 0; i < 5; i++) { 119 TABLE.get(new Get(Bytes.toBytes("000"))); 120 if (THROTTLING_OCCURRED.get()) { 121 break; 122 } 123 } 124 assertTrue(THROTTLING_OCCURRED.get(), "Throttling did not happen as expected"); 125 } 126}