001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.AsyncConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED; 021import static org.junit.jupiter.api.Assertions.assertArrayEquals; 022import static org.junit.jupiter.api.Assertions.assertNull; 023import static org.junit.jupiter.api.Assertions.assertTrue; 024 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.List; 028import java.util.concurrent.Callable; 029import java.util.concurrent.CompletableFuture; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.ConcurrentMap; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicInteger; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.Abortable; 036import org.apache.hadoop.hbase.HBaseTestingUtil; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.ipc.CallRunner; 040import org.apache.hadoop.hbase.ipc.PluggableBlockingQueue; 041import org.apache.hadoop.hbase.ipc.PriorityFunction; 042import org.apache.hadoop.hbase.ipc.RpcScheduler; 043import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler; 044import org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl; 045import org.apache.hadoop.hbase.regionserver.RSRpcServices; 046import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory; 047import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory; 048import org.apache.hadoop.hbase.testclassification.ClientTests; 049import org.apache.hadoop.hbase.testclassification.MediumTests; 050import org.apache.hadoop.hbase.util.Bytes; 051import org.junit.jupiter.api.AfterAll; 052import org.junit.jupiter.api.AfterEach; 053import org.junit.jupiter.api.BeforeAll; 054import org.junit.jupiter.api.BeforeEach; 055import org.junit.jupiter.api.Tag; 056import org.junit.jupiter.api.Test; 057 058import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 059import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 060 061@Tag(MediumTests.TAG) 062@Tag(ClientTests.TAG) 063public class TestAsyncClientPauseForServerOverloaded { 064 065 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 066 067 private static TableName TABLE_NAME = TableName.valueOf("ServerOverloaded"); 068 069 private static byte[] FAMILY = Bytes.toBytes("Family"); 070 071 private static byte[] QUALIFIER = Bytes.toBytes("Qualifier"); 072 073 private static long PAUSE_FOR_SERVER_OVERLOADED_NANOS = TimeUnit.SECONDS.toNanos(1); 074 private static long PAUSE_FOR_SERVER_OVERLOADED_MILLIS = 075 TimeUnit.NANOSECONDS.toMillis(PAUSE_FOR_SERVER_OVERLOADED_NANOS); 076 077 private static AsyncConnection CONN; 078 079 private static volatile FailMode MODE = null; 080 081 enum FailMode { 082 CALL_QUEUE_TOO_BIG, 083 CALL_DROPPED; 084 085 private ConcurrentMap<MethodDescriptor, AtomicInteger> invoked = new ConcurrentHashMap<>(); 086 087 // this is for test scan, where we will send a open scanner first and then a next, and we 088 // expect that we hit CQTBE two times. 089 private boolean shouldFail(CallRunner callRunner) { 090 MethodDescriptor method = callRunner.getRpcCall().getMethod(); 091 return invoked.computeIfAbsent(method, k -> new AtomicInteger(0)).getAndIncrement() % 2 == 0; 092 } 093 } 094 095 public static final class OverloadedRpcScheduler extends SimpleRpcScheduler { 096 097 public OverloadedRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount, 098 int replicationHandlerCount, int metaTransitionHandler, PriorityFunction priority, 099 Abortable server, int highPriorityLevel) { 100 super(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, 101 metaTransitionHandler, priority, server, highPriorityLevel); 102 } 103 104 @Override 105 public boolean dispatch(CallRunner callTask) { 106 if (MODE == FailMode.CALL_QUEUE_TOO_BIG && MODE.shouldFail(callTask)) { 107 return false; 108 } 109 return super.dispatch(callTask); 110 } 111 } 112 113 public static final class OverloadedQueue extends TestPluggableQueueImpl { 114 115 public OverloadedQueue(int maxQueueLength, PriorityFunction priority, Configuration conf) { 116 super(maxQueueLength, priority, conf); 117 } 118 119 @Override 120 public boolean offer(CallRunner callRunner) { 121 if (MODE == FailMode.CALL_DROPPED && MODE.shouldFail(callRunner)) { 122 callRunner.drop(); 123 return true; 124 } 125 return super.offer(callRunner); 126 } 127 } 128 129 public static final class OverloadedRpcSchedulerFactory extends SimpleRpcSchedulerFactory { 130 131 @Override 132 public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) { 133 int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 134 HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT); 135 return new OverloadedRpcScheduler(conf, handlerCount, 136 conf.getInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 137 HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT), 138 conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT, 139 HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT), 140 conf.getInt(HConstants.MASTER_META_TRANSITION_HANDLER_COUNT, 141 HConstants.MASTER__META_TRANSITION_HANDLER_COUNT_DEFAULT), 142 priority, server, HConstants.QOS_THRESHOLD); 143 } 144 145 } 146 147 @BeforeAll 148 public static void setUp() throws Exception { 149 UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10); 150 UTIL.getConfiguration().set("hbase.ipc.server.callqueue.type", "pluggable"); 151 UTIL.getConfiguration().setClass("hbase.ipc.server.callqueue.pluggable.queue.class.name", 152 OverloadedQueue.class, PluggableBlockingQueue.class); 153 UTIL.getConfiguration().setClass(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, 154 OverloadedRpcSchedulerFactory.class, RpcSchedulerFactory.class); 155 UTIL.startMiniCluster(1); 156 157 Configuration conf = new Configuration(UTIL.getConfiguration()); 158 conf.setLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, PAUSE_FOR_SERVER_OVERLOADED_MILLIS); 159 CONN = ConnectionFactory.createAsyncConnection(conf).get(); 160 } 161 162 @AfterAll 163 public static void tearDown() throws Exception { 164 Closeables.close(CONN, true); 165 UTIL.shutdownMiniCluster(); 166 } 167 168 @BeforeEach 169 public void setUpBeforeTest() throws IOException { 170 try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) { 171 for (int i = 0; i < 100; i++) { 172 table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))); 173 } 174 } 175 MODE = FailMode.CALL_QUEUE_TOO_BIG; 176 } 177 178 @AfterEach 179 public void tearDownAfterTest() throws IOException { 180 for (FailMode mode : FailMode.values()) { 181 mode.invoked.clear(); 182 } 183 MODE = null; 184 UTIL.getAdmin().disableTable(TABLE_NAME); 185 UTIL.getAdmin().deleteTable(TABLE_NAME); 186 } 187 188 private void assertTime(Callable<Void> callable, long time) throws Exception { 189 for (FailMode mode : FailMode.values()) { 190 MODE = mode; 191 192 long startNs = System.nanoTime(); 193 callable.call(); 194 long costNs = System.nanoTime() - startNs; 195 assertTrue(costNs > time); 196 } 197 } 198 199 @Test 200 public void testGet() throws Exception { 201 assertTime(() -> { 202 Result result = CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).get(); 203 assertArrayEquals(Bytes.toBytes(0), result.getValue(FAMILY, QUALIFIER)); 204 return null; 205 }, PAUSE_FOR_SERVER_OVERLOADED_NANOS); 206 } 207 208 @Test 209 public void testBatch() throws Exception { 210 assertTime(() -> { 211 List<CompletableFuture<?>> futures = new ArrayList<>(); 212 try (AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME)) { 213 for (int i = 100; i < 110; i++) { 214 futures.add(mutator 215 .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)))); 216 } 217 } 218 return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); 219 }, PAUSE_FOR_SERVER_OVERLOADED_NANOS); 220 } 221 222 @Test 223 public void testScan() throws Exception { 224 // we will hit CallQueueTooBigException two times so the sleep time should be twice 225 assertTime(() -> { 226 try ( 227 ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(80))) { 228 for (int i = 0; i < 100; i++) { 229 Result result = scanner.next(); 230 assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER)); 231 } 232 assertNull(scanner.next()); 233 } 234 return null; 235 }, PAUSE_FOR_SERVER_OVERLOADED_NANOS * 2); 236 } 237}