/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster.MiniHBaseClusterRegionServer;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;

/**
 * Tests client-side scanner timeout and retry handling, for both {@link Table} and
 * {@link AsyncTable} scanners. The mini cluster runs a region server whose scan RPC can be told to
 * sleep or to throw {@link OutOfOrderScannerNextException} on chosen calls, see
 * {@link RSRpcServicesWithScanTimeout}.
 */
@Tag(ClientTests.TAG)
@Tag(LargeTests.TAG)
public class TestClientScannerTimeouts {

  private static final Logger LOG = LoggerFactory.getLogger(TestClientScannerTimeouts.class);
  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static AsyncConnection ASYNC_CONN;
  private static Connection CONN;
  private static final byte[] FAMILY = Bytes.toBytes("testFamily");
  private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  private static final byte[] VALUE = Bytes.toBytes("testValue");

  private static final byte[] ROW0 = Bytes.toBytes("row-0");
  private static final byte[] ROW1 = Bytes.toBytes("row-1");
  private static final byte[] ROW2 = Bytes.toBytes("row-2");
  private static final byte[] ROW3 = Bytes.toBytes("row-3");
  // All timeouts below are in milliseconds.
  private static final int rpcTimeout = 1000;
  private static final int scanTimeout = 3 * rpcTimeout;
  private static final int metaScanTimeout = 6 * rpcTimeout;
  private static final int CLIENT_RETRIES_NUMBER = 3;

  private static Table table;
  private static AsyncTable<AdvancedScanResultConsumer> asyncTable;

  /**
   * Starts a single-node mini cluster using {@link RegionServerWithScanTimeout}, then creates the
   * shared client connections with the scanner timeouts under test.
   */
  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    // Don't report so often so easier to see other rpcs
    conf.setInt("hbase.regionserver.msginterval", 3 * 10000);
    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout);
    conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName());
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES_NUMBER);
    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1000);
    TEST_UTIL.startMiniCluster(1);

    // These are applied after the cluster is up and before the test connections are created.
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, scanTimeout);
    conf.setInt(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, metaScanTimeout);
    conf.setInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, metaScanTimeout);
    ASYNC_CONN = ConnectionFactory.createAsyncConnection(conf).get();
    CONN = ASYNC_CONN.toConnection();
  }

  /**
   * Closes the shared connections and shuts the mini cluster down. The cluster is shut down even if
   * closing a connection fails, and connections that were never created are skipped.
   */
  @AfterAll
  public static void tearDownAfterClass() throws Exception {
    try {
      if (CONN != null) {
        CONN.close();
      }
    } finally {
      try {
        if (ASYNC_CONN != null) {
          ASYNC_CONN.close();
        }
      } finally {
        TEST_UTIL.shutdownMiniCluster();
      }
    }
  }

  /**
   * Creates a table named after the running test method, writes four rows to it and resets the
   * fault-injection state of {@link RSRpcServicesWithScanTimeout}.
   * @param isSystemTable whether to create the table in the system namespace
   * @param testInfo      supplies the name of the running test method
   */
  public void setup(boolean isSystemTable, TestInfo testInfo) throws IOException {
    RSRpcServicesWithScanTimeout.reset();

    String nameAsString = testInfo.getTestMethod()
      .orElseThrow(() -> new IllegalStateException("No test method available in TestInfo"))
      .getName();
    if (isSystemTable) {
      nameAsString = NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR + ":" + nameAsString;
    }
    final TableName tableName = TableName.valueOf(nameAsString);

    TEST_UTIL.createTable(tableName, FAMILY);
    table = CONN.getTable(tableName);
    asyncTable = ASYNC_CONN.getTable(tableName);
    putToTable(table, ROW0);
    putToTable(table, ROW1);
    putToTable(table, ROW2);
    putToTable(table, ROW3);
    LOG.info("Wrote our four values");

    // NOTE(review): presumably done to populate the client's region location cache before the
    // timing-sensitive part of the test runs -- confirm.
    try (RegionLocator locator = table.getRegionLocator()) {
      locator.getAllRegionLocations();
    }

    // reset again in case the creation/population caused anything to trigger
    RSRpcServicesWithScanTimeout.reset();
  }

  /**
   * Asserts that {@code result} is non-null and that its row key equals {@code expected}.
   */
  private void expectRow(byte[] expected, Result result) {
    assertNotNull(result,
      "Expected row: " + Bytes.toString(expected) + ", but scanner returned no result");
    assertTrue(Bytes.equals(expected, result.getRow()), "Expected row: "
      + Bytes.toString(expected) + ", actual: " + Bytes.toString(result.getRow()));
  }

  /**
   * Asserts the number of injected faults counted by the server since the last check, and resets
   * the counter for the next check.
   */
  private void expectNumTries(int expected) {
    // Read and reset in one step so the counter is cleared even when the assertion fails.
    int actual = RSRpcServicesWithScanTimeout.tryNumber.getAndSet(0);
    assertEquals(expected, actual, "Expected tryNumber=" + expected + ", actual=" + actual);
  }

  /**
   * verify that we don't miss any data when encountering an OutOfOrderScannerNextException.
   * Typically, the only way to naturally trigger this is if a client-side timeout causes an
   * erroneous next() call. This is relatively hard to do these days because the server attempts to
   * always return before the timeout. In this test we force the server to throw this exception, so
   * that we can test the retry logic appropriately.
   */
  @Test
  public void testRetryOutOfOrderScannerNextException(TestInfo testInfo) throws IOException {
    expectRetryOutOfOrderScannerNext(this::getScanner, testInfo);
  }

  /**
   * AsyncTable version of above
   */
  @Test
  public void testRetryOutOfOrderScannerNextExceptionAsync(TestInfo testInfo) throws IOException {
    expectRetryOutOfOrderScannerNext(this::getAsyncScanner, testInfo);
  }

  /**
   * verify that we honor the {@link HConstants#HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD} for normal
   * scans.
   */
  @Test
  public void testNormalScanTimeoutOnNext(TestInfo testInfo) throws IOException {
    setup(false, testInfo);
    expectTimeoutOnNext(scanTimeout, this::getScanner);
  }

  /**
   * AsyncTable version of above
   */
  @Test
  public void testNormalScanTimeoutOnNextAsync(TestInfo testInfo) throws IOException {
    setup(false, testInfo);
    expectTimeoutOnNext(scanTimeout, this::getAsyncScanner);
  }

  /**
   * verify that we honor the rpc timeout for openScanner() calls for normal scans. This class
   * configures {@link HConstants#HBASE_RPC_TIMEOUT_KEY} only. NOTE(review): presumably
   * {@link HConstants#HBASE_RPC_READ_TIMEOUT_KEY} falls back to it when unset -- confirm.
   */
  @Test
  public void testNormalScanTimeoutOnOpenScanner(TestInfo testInfo) throws IOException {
    setup(false, testInfo);
    expectTimeoutOnOpenScanner(rpcTimeout, this::getScanner);
  }

  /**
   * AsyncTable version of above
   */
  @Test
  public void testNormalScanTimeoutOnOpenScannerAsync(TestInfo testInfo) throws IOException {
    setup(false, testInfo);
    expectTimeoutOnOpenScanner(rpcTimeout, this::getAsyncScanner);
  }

  /**
   * verify that we honor {@link ConnectionConfiguration#HBASE_CLIENT_META_SCANNER_TIMEOUT} for
   * next() calls in meta scans
   */
  @Test
  public void testMetaScanTimeoutOnNext(TestInfo testInfo) throws IOException {
    setup(true, testInfo);
    expectTimeoutOnNext(metaScanTimeout, this::getScanner);
  }

  /**
   * AsyncTable version of above
   */
  @Test
  public void testMetaScanTimeoutOnNextAsync(TestInfo testInfo) throws IOException {
    setup(true, testInfo);
    expectTimeoutOnNext(metaScanTimeout, this::getAsyncScanner);
  }

  /**
   * verify that we honor {@link ConnectionConfiguration#HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY} for
   * openScanner() calls for meta scans
   */
  @Test
  public void testMetaScanTimeoutOnOpenScanner(TestInfo testInfo) throws IOException {
    setup(true, testInfo);
    expectTimeoutOnOpenScanner(metaScanTimeout, this::getScanner);
  }

  /**
   * AsyncTable version of above
   */
  @Test
  public void testMetaScanTimeoutOnOpenScannerAsync(TestInfo testInfo) throws IOException {
    setup(true, testInfo);
    expectTimeoutOnOpenScanner(metaScanTimeout, this::getAsyncScanner);
  }

  /**
   * Scans the table twice: once with a single injected {@link OutOfOrderScannerNextException} on
   * next-call sequence number 1, and once with the exception injected on every next() RPC. Both
   * passes must return all four rows in order.
   */
  private void expectRetryOutOfOrderScannerNext(Supplier<ResultScanner> scannerSupplier,
    TestInfo testInfo) throws IOException {
    setup(false, testInfo);
    RSRpcServicesWithScanTimeout.seqNoToThrowOn = 1;

    LOG.info(
      "Opening scanner, expecting no errors from first next() call from openScanner response");
    try (ResultScanner scanner = scannerSupplier.get()) {
      Result result = scanner.next();
      expectRow(ROW0, result);
      expectNumTries(0);

      LOG.info("Making first next() RPC, expecting no errors for seqNo 0");
      result = scanner.next();
      expectRow(ROW1, result);
      expectNumTries(0);

      LOG.info(
        "Making second next() RPC, expecting OutOfOrderScannerNextException and appropriate retry");
      result = scanner.next();
      expectRow(ROW2, result);
      expectNumTries(1);

      // reset so no errors. since last call restarted the scan and following
      // call would otherwise fail
      RSRpcServicesWithScanTimeout.seqNoToThrowOn = -1;

      LOG.info("Finishing scan, expecting no errors");
      result = scanner.next();
      expectRow(ROW3, result);
    }

    LOG.info("Testing always throw exception");
    byte[][] expectedResults = new byte[][] { ROW0, ROW1, ROW2, ROW3 };
    int i = 0;

    // test the case that RPC always throws
    try (ResultScanner scanner = scannerSupplier.get()) {
      RSRpcServicesWithScanTimeout.throwAlways = true;

      while (true) {
        LOG.info("Calling scanner.next()");
        Result result = scanner.next();
        if (result == null) {
          break;
        }
        // Fail with a readable message rather than an ArrayIndexOutOfBoundsException.
        assertTrue(i < expectedResults.length,
          "Scanner returned more than the expected " + expectedResults.length + " rows");
        expectRow(expectedResults[i++], result);
      }
    }

    // ensure we verified all rows. this along with the expectRow check above
    // proves that we didn't miss any rows.
    assertEquals(expectedResults.length, i, "Expected to exhaust expectedResults array length="
      + expectedResults.length + ", actual index=" + i);

    // expect all but the first row (which came from initial openScanner) to have thrown an error
    expectNumTries(expectedResults.length - 1);
  }

  /**
   * Makes the server sleep past {@code timeout} on next-call sequence number 1 and verifies that
   * the corresponding next() fails with a {@link CallTimeoutException} no sooner than
   * {@code timeout} milliseconds.
   * @param timeout         the client timeout expected to apply, in milliseconds
   * @param scannerSupplier opens the scanner under test
   */
  private void expectTimeoutOnNext(int timeout, Supplier<ResultScanner> scannerSupplier)
    throws IOException {
    RSRpcServicesWithScanTimeout.seqNoToSleepOn = 1;
    RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout);

    // The clock starts before the scanner is opened. NOTE(review): the scanner may issue the
    // delayed RPC before the caller asks for that row (read-ahead) -- confirm. Starting here keeps
    // the elapsed-time check a valid lower bound either way, since the delayed RPC cannot be sent
    // before the scanner exists.
    long start = System.nanoTime();
    LOG.info(
      "Opening scanner, expecting no timeouts from first next() call from openScanner response");
    try (ResultScanner scanner = scannerSupplier.get()) {
      Result result = scanner.next();
      expectRow(ROW0, result);

      LOG.info("Making first next() RPC, expecting no timeout for seqNo 0");
      result = scanner.next();
      expectRow(ROW1, result);

      LOG.info("Making second next() RPC, expecting timeout");
      try {
        scanner.next();
        fail("Expected CallTimeoutException");
      } catch (RetriesExhaustedException e) {
        assertTrue(e.getCause() instanceof CallTimeoutException,
          "Expected CallTimeoutException, but was " + e.getCause());
      }
    }
    expectTimeout(start, timeout);
  }

  /**
   * Makes the server sleep past {@code timeout} while opening the scanner and verifies that the
   * first next() fails with a {@link CallTimeoutException} no sooner than {@code timeout}
   * milliseconds.
   * @param timeout         the client timeout expected to apply, in milliseconds
   * @param scannerSupplier opens the scanner under test
   */
  private void expectTimeoutOnOpenScanner(int timeout, Supplier<ResultScanner> scannerSupplier)
    throws IOException {
    RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout);
    RSRpcServicesWithScanTimeout.sleepOnOpen = true;
    LOG.info("Opening scanner, expecting timeout from first next() call from openScanner response");
    long start = System.nanoTime();
    try (ResultScanner scanner = scannerSupplier.get()) {
      scanner.next();
      fail("Expected CallTimeoutException");
    } catch (RetriesExhaustedException e) {
      assertTrue(e.getCause() instanceof CallTimeoutException,
        "Expected CallTimeoutException, but was " + e.getCause());
    }
    expectTimeout(start, timeout);
  }

  /**
   * Asserts that at least {@code timeout} milliseconds have elapsed since {@code start}.
   * @param start   a {@link System#nanoTime()} reading taken before the operation
   * @param timeout the minimum expected duration, in milliseconds
   */
  private void expectTimeout(long start, int timeout) {
    // start is a nanoTime reading while timeout is in milliseconds, so convert before comparing.
    long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    assertTrue(durationMs >= timeout,
      "Expected duration >= " + timeout + " ms, but was " + durationMs + " ms");
  }

  /**
   * Opens a scanner with caching of 1 on the blocking {@link Table}.
   */
  private ResultScanner getScanner() {
    Scan scan = new Scan();
    scan.setCaching(1);
    try {
      return table.getScanner(scan);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Opens a scanner with caching of 1 on the {@link AsyncTable}.
   */
  private ResultScanner getAsyncScanner() {
    Scan scan = new Scan();
    scan.setCaching(1);
    return asyncTable.getScanner(scan);
  }

  /**
   * Writes a single cell (FAMILY:QUALIFIER = VALUE) for the given row key.
   */
  private void putToTable(Table ht, byte[] rowkey) throws IOException {
    Put put = new Put(rowkey);
    put.addColumn(FAMILY, QUALIFIER, VALUE);
    ht.put(put);
  }

  /**
   * Region server that uses {@link RSRpcServicesWithScanTimeout} as its RPC services. It is
   * installed through {@link HConstants#REGION_SERVER_IMPL} in {@link #setUpBeforeClass()}.
   */
  private static class RegionServerWithScanTimeout extends MiniHBaseClusterRegionServer {
    public RegionServerWithScanTimeout(Configuration conf)
      throws IOException, InterruptedException {
      super(conf);
    }

    @Override
    protected RSRpcServices createRpcServices() throws IOException {
      return new RSRpcServicesWithScanTimeout(this);
    }
  }

  /**
   * RPC services whose scan() can be told, through static switches, to sleep or to throw
   * {@link OutOfOrderScannerNextException} for the most recently opened non-meta scanner.
   * <p>
   * The switches are written by the test and read in scan(), so they are volatile (or atomic).
   */
  private static class RSRpcServicesWithScanTimeout extends RSRpcServices {
    // Id of the most recently opened scanner on a non-meta region; faults apply only to it.
    private volatile long tableScannerId;

    // Next-call sequence number to throw on once; -1 disables.
    private static volatile long seqNoToThrowOn = -1;
    private static volatile boolean throwAlways = false;
    private static volatile boolean threw;

    // Next-call sequence number to sleep on once; -1 disables.
    private static volatile long seqNoToSleepOn = -1;
    private static volatile boolean sleepOnOpen = false;
    private static volatile boolean slept;
    // Number of faults (throws or sleeps) injected since the last reset.
    private static final AtomicInteger tryNumber = new AtomicInteger();

    private static volatile int sleepTime = rpcTimeout + 500;

    /**
     * Sets the injected sleep to 500 ms longer than the given timeout (milliseconds).
     */
    public static void setSleepForTimeout(int timeout) {
      sleepTime = timeout + 500;
    }

    /**
     * Restores all fault-injection switches and counters to their defaults.
     */
    public static void reset() {
      setSleepForTimeout(scanTimeout);

      seqNoToSleepOn = -1;
      seqNoToThrowOn = -1;
      throwAlways = false;
      threw = false;
      sleepOnOpen = false;
      slept = false;
      tryNumber.set(0);
    }

    public RSRpcServicesWithScanTimeout(HRegionServer rs) throws IOException {
      super(rs);
    }

    /**
     * Delegates to the real scan and then, for the tracked table scanner, optionally throws or
     * sleeps according to the static switches. Requests without a scanner id are treated as
     * openScanner calls; close requests and other scanners pass through untouched.
     */
    @Override
    public ScanResponse scan(final RpcController controller, final ScanRequest request)
      throws ServiceException {
      if (request.hasScannerId()) {
        ScanResponse scanResponse = super.scan(controller, request);
        if (tableScannerId != request.getScannerId() || request.getCloseScanner()) {
          return scanResponse;
        }

        if (
          throwAlways
            || (!threw && request.hasNextCallSeq() && seqNoToThrowOn == request.getNextCallSeq())
        ) {
          threw = true;
          int attempt = tryNumber.incrementAndGet();
          LOG.info("THROWING exception, tryNumber={}, tableScannerId={}", attempt,
            tableScannerId);
          throw new ServiceException(new OutOfOrderScannerNextException());
        }

        if (!slept && request.hasNextCallSeq() && seqNoToSleepOn == request.getNextCallSeq()) {
          int millis = sleepTime;
          LOG.info("SLEEPING {}", millis);
          sleepQuietly(millis);
          slept = true;
          tryNumber.incrementAndGet();
        }
        return scanResponse;
      } else {
        ScanResponse scanRes = super.scan(controller, request);
        String regionName = Bytes.toString(request.getRegion().getValue().toByteArray());
        if (!regionName.contains(TableName.META_TABLE_NAME.getNameAsString())) {
          tableScannerId = scanRes.getScannerId();
          if (sleepOnOpen) {
            int millis = sleepTime;
            LOG.info("openScanner SLEEPING {}", millis);
            sleepQuietly(millis);
          }
        }
        return scanRes;
      }
    }

    /**
     * Sleeps for the given number of milliseconds. If interrupted, returns early with the thread's
     * interrupt status restored instead of discarding the interruption.
     */
    private static void sleepQuietly(int millis) {
      try {
        Thread.sleep(millis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }
}