/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.CallRunner;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;

/**
 * Checks that the async client waits for the dedicated "call queue too big" pause
 * ({@link HConstants#HBASE_CLIENT_PAUSE_FOR_CQTBE}) rather than the much shorter regular pause
 * ({@link HConstants#HBASE_CLIENT_PAUSE}) when the region server's RPC scheduler refuses a call.
 * <p>
 * The refusal is injected by {@link CQTBERpcScheduler}, which is installed on the mini cluster
 * through {@link CQTBERpcSchedulerFactory}. Each test then measures how long an operation takes
 * and asserts that it lasted longer than the configured pause.
 */
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncClientPauseForCallQueueTooBig {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAsyncClientPauseForCallQueueTooBig.class);

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  private static final TableName TABLE_NAME = TableName.valueOf("CQTBE");

  private static final byte[] FAMILY = Bytes.toBytes("Family");

  private static final byte[] QUALIFIER = Bytes.toBytes("Qualifier");

  /** The pause configured for the "call queue too big" case, in nanoseconds (one second). */
  private static final long PAUSE_FOR_CQTBE_NS = TimeUnit.SECONDS.toNanos(1);

  private static AsyncConnection CONN;

  /**
   * Turns the failure injection in {@link CQTBERpcScheduler#dispatch(CallRunner)} on and off.
   * Written by the test thread and read by the server side RPC threads that call
   * {@code dispatch}, hence volatile so that the toggle is guaranteed to be visible to them.
   */
  private static volatile boolean FAIL = false;

  /** Per RPC method count of the dispatch attempts seen while {@link #FAIL} was set. */
  private static final ConcurrentMap<MethodDescriptor, AtomicInteger> INVOKED =
    new ConcurrentHashMap<>();

  /**
   * A {@link SimpleRpcScheduler} that, while {@link #FAIL} is set, refuses every other call of
   * each RPC method, starting with the first one.
   */
  public static final class CQTBERpcScheduler extends SimpleRpcScheduler {

    public CQTBERpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
      int replicationHandlerCount, int metaTransitionHandler, PriorityFunction priority,
      Abortable server, int highPriorityLevel) {
      super(conf, handlerCount, priorityHandlerCount, replicationHandlerCount,
        metaTransitionHandler, priority, server, highPriorityLevel);
    }

    /**
     * Refuses the call (returns {@code false}) when failure injection is enabled and this is the
     * 1st, 3rd, 5th, ... call of its RPC method; otherwise delegates to the parent scheduler.
     */
    @Override
    public boolean dispatch(CallRunner callTask) throws InterruptedException {
      if (FAIL) {
        MethodDescriptor method = callTask.getRpcCall().getMethod();
        // this is for test scan, where we will send a open scanner first and then a next, and we
        // expect that we hit CQTBE two times.
        int previousCalls =
          INVOKED.computeIfAbsent(method, k -> new AtomicInteger(0)).getAndIncrement();
        if (previousCalls % 2 == 0) {
          return false;
        }
      }
      return super.dispatch(callTask);
    }
  }

  /**
   * Scheduler factory that builds a {@link CQTBERpcScheduler}, reading the handler counts from the
   * given configuration.
   */
  public static final class CQTBERpcSchedulerFactory extends SimpleRpcSchedulerFactory {

    @Override
    public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
      int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
        HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
      return new CQTBERpcScheduler(conf, handlerCount,
        conf.getInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
          HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT),
        conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
          HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
        conf.getInt(HConstants.MASTER_META_TRANSITION_HANDLER_COUNT,
          HConstants.MASTER__META_TRANSITION_HANDLER_COUNT_DEFAULT),
        priority, server, HConstants.QOS_THRESHOLD);
    }

  }

  /**
   * Configures the two client pauses and the custom scheduler factory, starts a mini cluster and
   * opens the shared async connection.
   */
  @BeforeClass
  public static void setUp() throws Exception {
    UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10);
    UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE,
      TimeUnit.NANOSECONDS.toMillis(PAUSE_FOR_CQTBE_NS));
    UTIL.getConfiguration().setClass(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      CQTBERpcSchedulerFactory.class, RpcSchedulerFactory.class);
    UTIL.startMiniCluster(1);
    CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
  }

  @AfterClass
  public static void tearDown() throws Exception {
    Closeables.close(CONN, true);
    UTIL.shutdownMiniCluster();
  }

  /**
   * Creates the test table and loads rows 0..99. Failure injection is only switched on afterwards
   * so that the setup itself is not slowed down.
   */
  @Before
  public void setUpBeforeTest() throws IOException {
    try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {
      for (int i = 0; i < 100; i++) {
        table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)));
      }
    }
    FAIL = true;
  }

  /** Switches failure injection off, resets the call counters and drops the test table. */
  @After
  public void tearDownAfterTest() throws IOException {
    FAIL = false;
    INVOKED.clear();
    UTIL.getAdmin().disableTable(TABLE_NAME);
    UTIL.getAdmin().deleteTable(TABLE_NAME);
  }

  /**
   * Runs the given callable and asserts that it took strictly longer than the given time.
   * @param callable the operation to measure
   * @param time the lower bound for the elapsed time, in nanoseconds
   */
  private void assertTime(Callable<Void> callable, long time) throws Exception {
    long startNs = System.nanoTime();
    callable.call();
    long costNs = System.nanoTime() - startNs;
    assertTrue("Expected the operation to take more than " + time + " ns but it took " + costNs
      + " ns", costNs > time);
  }

  @Test
  public void testGet() throws Exception {
    assertTime(() -> {
      Result result = CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).get();
      assertArrayEquals(Bytes.toBytes(0), result.getValue(FAMILY, QUALIFIER));
      return null;
    }, PAUSE_FOR_CQTBE_NS);
  }

  @Test
  public void testBatch() throws Exception {
    assertTime(() -> {
      List<CompletableFuture<?>> futures = new ArrayList<>();
      try (AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME)) {
        for (int i = 100; i < 110; i++) {
          futures.add(mutator
            .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
        }
      }
      return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
    }, PAUSE_FOR_CQTBE_NS);
  }

  @Test
  public void testScan() throws Exception {
    // we will hit CallQueueTooBigException two times so the sleep time should be twice
    assertTime(() -> {
      try (
        ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(80))) {
        for (int i = 0; i < 100; i++) {
          Result result = scanner.next();
          assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER));
        }
        assertNull(scanner.next());
      }
      return null;
    }, PAUSE_FOR_CQTBE_NS * 2);
  }
}