001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.jupiter.api.Assertions.assertArrayEquals; 021import static org.junit.jupiter.api.Assertions.assertThrows; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.concurrent.Callable; 028import java.util.concurrent.CompletableFuture; 029import java.util.concurrent.ExecutionException; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.atomic.AtomicBoolean; 032import java.util.concurrent.atomic.AtomicInteger; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.HBaseTestingUtil; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.RegionTooBusyException; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.quotas.RpcThrottlingException; 039import org.apache.hadoop.hbase.regionserver.HRegionServer; 040import org.apache.hadoop.hbase.regionserver.RSRpcServices; 041import org.apache.hadoop.hbase.testclassification.ClientTests; 042import org.apache.hadoop.hbase.testclassification.MediumTests; 043import org.apache.hadoop.hbase.util.Bytes; 044import org.junit.jupiter.api.AfterAll; 045import org.junit.jupiter.api.BeforeAll; 046import org.junit.jupiter.api.Tag; 047import org.junit.jupiter.api.Test; 048 049import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 050import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 051import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 052 053import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 054 055@Tag(MediumTests.TAG) 056@Tag(ClientTests.TAG) 057public class TestAsyncClientPauseForRpcThrottling { 058 059 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 060 061 private static TableName TABLE_NAME = TableName.valueOf("RpcThrottling"); 062 063 private static byte[] FAMILY = Bytes.toBytes("Family"); 064 065 private static byte[] QUALIFIER = Bytes.toBytes("Qualifier"); 066 067 private static AsyncConnection CONN; 068 private static final AtomicBoolean THROTTLE = new AtomicBoolean(false); 069 private static final AtomicInteger FORCE_RETRIES = new AtomicInteger(0); 070 private static final long WAIT_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(1); 071 private static final int RETRY_COUNT = 3; 072 private static final int MAX_MULTIPLIER_EXPECTATION = 2; 073 074 public static final class ThrottlingRSRpcServicesForTest extends RSRpcServices { 075 076 public ThrottlingRSRpcServicesForTest(HRegionServer rs) throws IOException { 077 super(rs); 078 } 079 080 @Override 081 public ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRequest request) 082 throws ServiceException { 083 maybeForceRetry(); 084 maybeThrottle(); 085 return super.get(controller, request); 086 } 087 088 @Override 089 public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request) 090 throws ServiceException { 091 maybeForceRetry(); 092 maybeThrottle(); 093 return super.multi(rpcc, request); 094 } 095 096 @Override 097 public ClientProtos.ScanResponse scan(RpcController controller, 098 ClientProtos.ScanRequest request) throws ServiceException { 099 maybeForceRetry(); 100 maybeThrottle(); 101 return super.scan(controller, request); 102 } 103 104 private void maybeForceRetry() throws ServiceException { 105 if (FORCE_RETRIES.get() > 0) { 106 FORCE_RETRIES.addAndGet(-1); 107 throw new ServiceException(new RegionTooBusyException("Retry")); 108 } 109 } 110 111 private void maybeThrottle() throws ServiceException { 112 if (THROTTLE.get()) { 113 THROTTLE.set(false); 114 throw new ServiceException(new RpcThrottlingException("number of requests exceeded - wait " 115 + TimeUnit.NANOSECONDS.toMillis(WAIT_INTERVAL_NANOS) + "ms")); 116 } 117 } 118 } 119 120 public static final class ThrottlingRegionServerForTest extends HRegionServer { 121 122 public ThrottlingRegionServerForTest(Configuration conf) throws IOException { 123 super(conf); 124 } 125 126 @Override 127 protected RSRpcServices createRpcServices() throws IOException { 128 return new ThrottlingRSRpcServicesForTest(this); 129 } 130 } 131 132 @BeforeAll 133 public static void setUp() throws Exception { 134 assertTrue(MAX_MULTIPLIER_EXPECTATION < HConstants.RETRY_BACKOFF[RETRY_COUNT], 135 "The MAX_MULTIPLIER_EXPECTATION must be less than HConstants.RETRY_BACKOFF[RETRY_COUNT] " 136 + "in order for our tests to adequately verify that we aren't " 137 + "multiplying throttled pauses based on the retry count."); 138 139 UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 140 UTIL.startMiniCluster(1); 141 UTIL.getMiniHBaseCluster().getConfiguration().setClass(HConstants.REGION_SERVER_IMPL, 142 ThrottlingRegionServerForTest.class, HRegionServer.class); 143 HRegionServer regionServer = UTIL.getMiniHBaseCluster().startRegionServer().getRegionServer(); 144 145 try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) { 146 UTIL.waitTableAvailable(TABLE_NAME); 147 for (int i = 0; i < 100; i++) { 148 table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))); 149 } 150 } 151 152 UTIL.getAdmin().move(UTIL.getAdmin().getRegions(TABLE_NAME).get(0).getEncodedNameAsBytes(), 153 regionServer.getServerName()); 154 Configuration conf = new Configuration(UTIL.getConfiguration()); 155 CONN = ConnectionFactory.createAsyncConnection(conf).get(); 156 } 157 158 @AfterAll 159 public static void tearDown() throws Exception { 160 UTIL.getAdmin().disableTable(TABLE_NAME); 161 UTIL.getAdmin().deleteTable(TABLE_NAME); 162 Closeables.close(CONN, true); 163 UTIL.shutdownMiniCluster(); 164 } 165 166 private void assertTime(Callable<Void> callable, long time, boolean isGreater) throws Exception { 167 long costNs = getCostNs(callable); 168 if (isGreater) { 169 assertTrue(costNs > time); 170 } else { 171 assertTrue(costNs <= time); 172 } 173 } 174 175 private void assertTimeBetween(Callable<Void> callable, long minNs, long maxNs) throws Exception { 176 long costNs = getCostNs(callable); 177 assertTrue(costNs > minNs); 178 assertTrue(costNs < maxNs); 179 } 180 181 private long getCostNs(Callable<Void> callable) throws Exception { 182 long startNs = System.nanoTime(); 183 callable.call(); 184 return System.nanoTime() - startNs; 185 } 186 187 @Test 188 public void itWaitsForThrottledGet() throws Exception { 189 boolean isThrottled = true; 190 THROTTLE.set(isThrottled); 191 AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME); 192 assertTime(() -> { 193 table.get(new Get(Bytes.toBytes(0))).get(); 194 return null; 195 }, WAIT_INTERVAL_NANOS, isThrottled); 196 } 197 198 @Test 199 public void itDoesNotWaitForUnthrottledGet() throws Exception { 200 boolean isThrottled = false; 201 THROTTLE.set(isThrottled); 202 AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME); 203 assertTime(() -> { 204 table.get(new Get(Bytes.toBytes(0))).get(); 205 return null; 206 }, WAIT_INTERVAL_NANOS, isThrottled); 207 } 208 209 @Test 210 public void itDoesNotWaitForThrottledGetExceedingTimeout() throws Exception { 211 AsyncTable<AdvancedScanResultConsumer> table = 212 CONN.getTableBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MILLISECONDS).build(); 213 boolean isThrottled = true; 214 THROTTLE.set(isThrottled); 215 assertTime(() -> { 216 assertThrows(ExecutionException.class, () -> table.get(new Get(Bytes.toBytes(0))).get()); 217 return null; 218 }, WAIT_INTERVAL_NANOS, false); 219 } 220 221 @Test 222 public void itDoesNotMultiplyThrottledGetWait() throws Exception { 223 THROTTLE.set(true); 224 FORCE_RETRIES.set(RETRY_COUNT); 225 226 AsyncTable<AdvancedScanResultConsumer> table = 227 CONN.getTableBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MINUTES) 228 .setMaxRetries(RETRY_COUNT + 1).setRetryPause(1, TimeUnit.NANOSECONDS).build(); 229 230 assertTimeBetween(() -> { 231 table.get(new Get(Bytes.toBytes(0))).get(); 232 return null; 233 }, WAIT_INTERVAL_NANOS, MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS); 234 } 235 236 @Test 237 public void itWaitsForThrottledBatch() throws Exception { 238 boolean isThrottled = true; 239 THROTTLE.set(isThrottled); 240 assertTime(() -> { 241 List<CompletableFuture<?>> futures = new ArrayList<>(); 242 try (AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME)) { 243 for (int i = 100; i < 110; i++) { 244 futures.add(mutator 245 .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)))); 246 } 247 } 248 return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); 249 }, WAIT_INTERVAL_NANOS, isThrottled); 250 } 251 252 @Test 253 public void itDoesNotWaitForUnthrottledBatch() throws Exception { 254 boolean isThrottled = false; 255 THROTTLE.set(isThrottled); 256 assertTime(() -> { 257 List<CompletableFuture<?>> futures = new ArrayList<>(); 258 try (AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME)) { 259 for (int i = 100; i < 110; i++) { 260 futures.add(mutator 261 .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)))); 262 } 263 } 264 return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); 265 }, WAIT_INTERVAL_NANOS, isThrottled); 266 } 267 268 @Test 269 public void itDoesNotWaitForThrottledBatchExceedingTimeout() throws Exception { 270 boolean isThrottled = true; 271 THROTTLE.set(isThrottled); 272 assertTime(() -> { 273 List<CompletableFuture<?>> futures = new ArrayList<>(); 274 try (AsyncBufferedMutator mutator = CONN.getBufferedMutatorBuilder(TABLE_NAME) 275 .setOperationTimeout(1, TimeUnit.MILLISECONDS).build()) { 276 for (int i = 100; i < 110; i++) { 277 futures.add(mutator 278 .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)))); 279 } 280 } 281 assertThrows(ExecutionException.class, 282 () -> CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get()); 283 return null; 284 }, WAIT_INTERVAL_NANOS, false); 285 } 286 287 @Test 288 public void itDoesNotMultiplyThrottledBatchWait() throws Exception { 289 THROTTLE.set(true); 290 FORCE_RETRIES.set(RETRY_COUNT); 291 292 assertTimeBetween(() -> { 293 List<CompletableFuture<?>> futures = new ArrayList<>(); 294 try (AsyncBufferedMutator mutator = 295 CONN.getBufferedMutatorBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MINUTES) 296 .setMaxRetries(RETRY_COUNT + 1).setRetryPause(1, TimeUnit.NANOSECONDS).build()) { 297 for (int i = 100; i < 110; i++) { 298 futures.add(mutator 299 .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)))); 300 } 301 } 302 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); 303 return null; 304 }, WAIT_INTERVAL_NANOS, MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS); 305 } 306 307 @Test 308 public void itWaitsForThrottledScan() throws Exception { 309 boolean isThrottled = true; 310 THROTTLE.set(isThrottled); 311 assertTime(() -> { 312 try ( 313 ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(80))) { 314 for (int i = 0; i < 100; i++) { 315 Result result = scanner.next(); 316 assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER)); 317 } 318 } 319 return null; 320 }, WAIT_INTERVAL_NANOS, isThrottled); 321 } 322 323 @Test 324 public void itDoesNotWaitForUnthrottledScan() throws Exception { 325 boolean isThrottled = false; 326 THROTTLE.set(isThrottled); 327 assertTime(() -> { 328 try ( 329 ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(80))) { 330 for (int i = 0; i < 100; i++) { 331 Result result = scanner.next(); 332 assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER)); 333 } 334 } 335 return null; 336 }, WAIT_INTERVAL_NANOS, isThrottled); 337 } 338 339 @Test 340 public void itDoesNotWaitForThrottledScanExceedingTimeout() throws Exception { 341 AsyncTable<AdvancedScanResultConsumer> table = 342 CONN.getTableBuilder(TABLE_NAME).setScanTimeout(1, TimeUnit.MILLISECONDS).build(); 343 boolean isThrottled = true; 344 THROTTLE.set(isThrottled); 345 assertTime(() -> { 346 try (ResultScanner scanner = table.getScanner(new Scan().setCaching(80))) { 347 for (int i = 0; i < 100; i++) { 348 assertThrows(RetriesExhaustedException.class, scanner::next); 349 } 350 } 351 return null; 352 }, WAIT_INTERVAL_NANOS, false); 353 } 354 355 @Test 356 public void itDoesNotMultiplyThrottledScanWait() throws Exception { 357 THROTTLE.set(true); 358 FORCE_RETRIES.set(RETRY_COUNT); 359 360 AsyncTable<AdvancedScanResultConsumer> table = 361 CONN.getTableBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MINUTES) 362 .setMaxRetries(RETRY_COUNT + 1).setRetryPause(1, TimeUnit.NANOSECONDS).build(); 363 364 assertTimeBetween(() -> { 365 try (ResultScanner scanner = table.getScanner(new Scan().setCaching(80))) { 366 for (int i = 0; i < 100; i++) { 367 Result result = scanner.next(); 368 assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER)); 369 } 370 } 371 return null; 372 }, WAIT_INTERVAL_NANOS, MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS); 373 } 374}