001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.AsyncConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED; 021import static org.junit.Assert.assertArrayEquals; 022import static org.junit.Assert.assertNull; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.List; 028import java.util.concurrent.Callable; 029import java.util.concurrent.CompletableFuture; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.ConcurrentMap; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicInteger; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.Abortable; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.HBaseTestingUtil; 038import org.apache.hadoop.hbase.HConstants; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.ipc.CallRunner; 041import org.apache.hadoop.hbase.ipc.PluggableBlockingQueue; 042import org.apache.hadoop.hbase.ipc.PriorityFunction; 043import org.apache.hadoop.hbase.ipc.RpcScheduler; 044import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler; 045import org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl; 046import org.apache.hadoop.hbase.regionserver.RSRpcServices; 047import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory; 048import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory; 049import org.apache.hadoop.hbase.testclassification.ClientTests; 050import org.apache.hadoop.hbase.testclassification.MediumTests; 051import org.apache.hadoop.hbase.util.Bytes; 052import org.junit.After; 053import org.junit.AfterClass; 054import org.junit.Before; 055import org.junit.BeforeClass; 056import org.junit.ClassRule; 057import org.junit.Test; 058import org.junit.experimental.categories.Category; 059 060import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 061import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 062 063@Category({ MediumTests.class, ClientTests.class }) 064public class TestAsyncClientPauseForServerOverloaded { 065 066 @ClassRule 067 public static final HBaseClassTestRule CLASS_RULE = 068 HBaseClassTestRule.forClass(TestAsyncClientPauseForServerOverloaded.class); 069 070 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 071 072 private static TableName TABLE_NAME = TableName.valueOf("ServerOverloaded"); 073 074 private static byte[] FAMILY = Bytes.toBytes("Family"); 075 076 private static byte[] QUALIFIER = Bytes.toBytes("Qualifier"); 077 078 private static long PAUSE_FOR_SERVER_OVERLOADED_NANOS = TimeUnit.SECONDS.toNanos(1); 079 private static long PAUSE_FOR_SERVER_OVERLOADED_MILLIS = 080 TimeUnit.NANOSECONDS.toMillis(PAUSE_FOR_SERVER_OVERLOADED_NANOS); 081 082 private static AsyncConnection CONN; 083 084 private static volatile FailMode MODE = null; 085 086 enum FailMode { 087 CALL_QUEUE_TOO_BIG, 088 CALL_DROPPED; 089 090 private ConcurrentMap<MethodDescriptor, AtomicInteger> invoked = new ConcurrentHashMap<>(); 091 092 // this is for test scan, where we will send a open scanner first and then a next, and we 093 // expect that we hit CQTBE two times. 094 private boolean shouldFail(CallRunner callRunner) { 095 MethodDescriptor method = callRunner.getRpcCall().getMethod(); 096 return invoked.computeIfAbsent(method, k -> new AtomicInteger(0)).getAndIncrement() % 2 == 0; 097 } 098 } 099 100 public static final class OverloadedRpcScheduler extends SimpleRpcScheduler { 101 102 public OverloadedRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount, 103 int replicationHandlerCount, int metaTransitionHandler, PriorityFunction priority, 104 Abortable server, int highPriorityLevel) { 105 super(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, 106 metaTransitionHandler, priority, server, highPriorityLevel); 107 } 108 109 @Override 110 public boolean dispatch(CallRunner callTask) { 111 if (MODE == FailMode.CALL_QUEUE_TOO_BIG && MODE.shouldFail(callTask)) { 112 return false; 113 } 114 return super.dispatch(callTask); 115 } 116 } 117 118 public static final class OverloadedQueue extends TestPluggableQueueImpl { 119 120 public OverloadedQueue(int maxQueueLength, PriorityFunction priority, Configuration conf) { 121 super(maxQueueLength, priority, conf); 122 } 123 124 @Override 125 public boolean offer(CallRunner callRunner) { 126 if (MODE == FailMode.CALL_DROPPED && MODE.shouldFail(callRunner)) { 127 callRunner.drop(); 128 return true; 129 } 130 return super.offer(callRunner); 131 } 132 } 133 134 public static final class OverloadedRpcSchedulerFactory extends SimpleRpcSchedulerFactory { 135 136 @Override 137 public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) { 138 int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 139 HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT); 140 return new OverloadedRpcScheduler(conf, handlerCount, 141 conf.getInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 142 HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT), 143 conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT, 144 HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT), 145 conf.getInt(HConstants.MASTER_META_TRANSITION_HANDLER_COUNT, 146 HConstants.MASTER__META_TRANSITION_HANDLER_COUNT_DEFAULT), 147 priority, server, HConstants.QOS_THRESHOLD); 148 } 149 150 } 151 152 @BeforeClass 153 public static void setUp() throws Exception { 154 UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10); 155 UTIL.getConfiguration().set("hbase.ipc.server.callqueue.type", "pluggable"); 156 UTIL.getConfiguration().setClass("hbase.ipc.server.callqueue.pluggable.queue.class.name", 157 OverloadedQueue.class, PluggableBlockingQueue.class); 158 UTIL.getConfiguration().setClass(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, 159 OverloadedRpcSchedulerFactory.class, RpcSchedulerFactory.class); 160 UTIL.startMiniCluster(1); 161 162 Configuration conf = new Configuration(UTIL.getConfiguration()); 163 conf.setLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, PAUSE_FOR_SERVER_OVERLOADED_MILLIS); 164 CONN = ConnectionFactory.createAsyncConnection(conf).get(); 165 } 166 167 @AfterClass 168 public static void tearDown() throws Exception { 169 Closeables.close(CONN, true); 170 UTIL.shutdownMiniCluster(); 171 } 172 173 @Before 174 public void setUpBeforeTest() throws IOException { 175 try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) { 176 for (int i = 0; i < 100; i++) { 177 table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))); 178 } 179 } 180 MODE = FailMode.CALL_QUEUE_TOO_BIG; 181 } 182 183 @After 184 public void tearDownAfterTest() throws IOException { 185 for (FailMode mode : FailMode.values()) { 186 mode.invoked.clear(); 187 } 188 MODE = null; 189 UTIL.getAdmin().disableTable(TABLE_NAME); 190 UTIL.getAdmin().deleteTable(TABLE_NAME); 191 } 192 193 private void assertTime(Callable<Void> callable, long time) throws Exception { 194 for (FailMode mode : FailMode.values()) { 195 MODE = mode; 196 197 long startNs = System.nanoTime(); 198 callable.call(); 199 long costNs = System.nanoTime() - startNs; 200 assertTrue(costNs > time); 201 } 202 } 203 204 @Test 205 public void testGet() throws Exception { 206 assertTime(() -> { 207 Result result = CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).get(); 208 assertArrayEquals(Bytes.toBytes(0), result.getValue(FAMILY, QUALIFIER)); 209 return null; 210 }, PAUSE_FOR_SERVER_OVERLOADED_NANOS); 211 } 212 213 @Test 214 public void testBatch() throws Exception { 215 assertTime(() -> { 216 List<CompletableFuture<?>> futures = new ArrayList<>(); 217 try (AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME)) { 218 for (int i = 100; i < 110; i++) { 219 futures.add(mutator 220 .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)))); 221 } 222 } 223 return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); 224 }, PAUSE_FOR_SERVER_OVERLOADED_NANOS); 225 } 226 227 @Test 228 public void testScan() throws Exception { 229 // we will hit CallQueueTooBigException two times so the sleep time should be twice 230 assertTime(() -> { 231 try ( 232 ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(80))) { 233 for (int i = 0; i < 100; i++) { 234 Result result = scanner.next(); 235 assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER)); 236 } 237 assertNull(scanner.next()); 238 } 239 return null; 240 }, PAUSE_FOR_SERVER_OVERLOADED_NANOS * 2); 241 } 242}