001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY; 021import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.IOException; 027import java.net.SocketTimeoutException; 028import java.util.concurrent.TimeUnit; 029import java.util.function.Supplier; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.HBaseTestingUtility; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.MiniHBaseCluster; 035import org.apache.hadoop.hbase.NamespaceDescriptor; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; 038import org.apache.hadoop.hbase.ipc.CallTimeoutException; 039import org.apache.hadoop.hbase.regionserver.HRegionServer; 040import org.apache.hadoop.hbase.regionserver.RSRpcServices; 041import org.apache.hadoop.hbase.testclassification.ClientTests; 042import org.apache.hadoop.hbase.testclassification.MediumTests; 043import org.apache.hadoop.hbase.util.Bytes; 044import org.junit.AfterClass; 045import org.junit.BeforeClass; 046import org.junit.ClassRule; 047import org.junit.Rule; 048import org.junit.Test; 049import org.junit.experimental.categories.Category; 050import org.junit.rules.TestName; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 055import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 056 057import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 058import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 059 060@Category({ MediumTests.class, ClientTests.class }) 061public class TestClientScannerTimeouts { 062 063 @ClassRule 064 public static final HBaseClassTestRule CLASS_RULE = 065 HBaseClassTestRule.forClass(TestClientScannerTimeouts.class); 066 067 private static final Logger LOG = LoggerFactory.getLogger(TestClientScannerTimeouts.class); 068 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 069 070 private static AsyncConnection ASYNC_CONN; 071 private static Connection CONN; 072 private static final byte[] FAMILY = Bytes.toBytes("testFamily"); 073 private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 074 private static final byte[] VALUE = Bytes.toBytes("testValue"); 075 076 private static final byte[] ROW0 = Bytes.toBytes("row-0"); 077 private static final byte[] ROW1 = Bytes.toBytes("row-1"); 078 private static final byte[] ROW2 = Bytes.toBytes("row-2"); 079 private static final byte[] ROW3 = Bytes.toBytes("row-3"); 080 private static final int rpcTimeout = 1000; 081 private static final int scanTimeout = 3 * rpcTimeout; 082 private static final int metaScanTimeout = 6 * rpcTimeout; 083 private static final int CLIENT_RETRIES_NUMBER = 3; 084 085 private static TableName tableName; 086 087 @Rule 088 public TestName name = new TestName(); 089 090 @BeforeClass 091 public static void setUpBeforeClass() throws Exception { 092 Configuration conf = TEST_UTIL.getConfiguration(); 093 // Don't report so often so easier to see other rpcs 094 conf.setInt("hbase.regionserver.msginterval", 3 * 10000); 095 conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout); 096 conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName()); 097 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES_NUMBER); 098 conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1000); 099 TEST_UTIL.startMiniCluster(1); 100 101 conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, scanTimeout); 102 conf.setInt(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, metaScanTimeout); 103 conf.setInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, metaScanTimeout); 104 ASYNC_CONN = ConnectionFactory.createAsyncConnection(conf).get(); 105 CONN = ConnectionFactory.createConnection(conf); 106 } 107 108 @AfterClass 109 public static void tearDownAfterClass() throws Exception { 110 CONN.close(); 111 ASYNC_CONN.close(); 112 TEST_UTIL.shutdownMiniCluster(); 113 } 114 115 public void setup(boolean isSystemTable) throws IOException { 116 RSRpcServicesWithScanTimeout.reset(); 117 118 String nameAsString = name.getMethodName(); 119 if (isSystemTable) { 120 nameAsString = NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR + ":" + nameAsString; 121 } 122 tableName = TableName.valueOf(nameAsString); 123 TEST_UTIL.createTable(tableName, FAMILY); 124 125 Table table = CONN.getTable(tableName); 126 putToTable(table, ROW0); 127 putToTable(table, ROW1); 128 putToTable(table, ROW2); 129 putToTable(table, ROW3); 130 LOG.info("Wrote our four values"); 131 132 table.getRegionLocator().getAllRegionLocations(); 133 134 // reset again incase the creation/population caused anything to trigger 135 RSRpcServicesWithScanTimeout.reset(); 136 } 137 138 private void expectRow(byte[] expected, Result result) { 139 assertTrue("Expected row: " + Bytes.toString(expected), 140 Bytes.equals(expected, result.getRow())); 141 } 142 143 private void expectNumTries(int expected) { 144 assertEquals( 145 "Expected tryNumber=" + expected + ", actual=" + RSRpcServicesWithScanTimeout.tryNumber, 146 expected, RSRpcServicesWithScanTimeout.tryNumber); 147 // reset for next 148 RSRpcServicesWithScanTimeout.tryNumber = 0; 149 } 150 151 /** 152 * verify that we don't miss any data when encountering an OutOfOrderScannerNextException. 153 * Typically, the only way to naturally trigger this is if a client-side timeout causes an 154 * erroneous next() call. This is relatively hard to do these days because the server attempts to 155 * always return before the timeout. In this test we force the server to throw this exception, so 156 * that we can test the retry logic appropriately. 157 */ 158 @Test 159 public void testRetryOutOfOrderScannerNextException() throws IOException { 160 expectRetryOutOfOrderScannerNext(() -> getScanner(CONN)); 161 } 162 163 /** 164 * AsyncTable version of above 165 */ 166 @Test 167 public void testRetryOutOfOrderScannerNextExceptionAsync() throws IOException { 168 expectRetryOutOfOrderScannerNext(this::getAsyncScanner); 169 } 170 171 /** 172 * verify that we honor the {@link HConstants#HBASE_RPC_READ_TIMEOUT_KEY} for normal scans. Use a 173 * special connection which has retries disabled, because otherwise the scanner will retry the 174 * timed out next() call and mess up the test. 175 */ 176 @Test 177 public void testNormalScanTimeoutOnNext() throws IOException { 178 setup(false); 179 // Unlike AsyncTable, Table's ResultScanner.next() call uses rpcTimeout and 180 // will retry until scannerTimeout. This makes it more difficult to test the timeouts 181 // of normal next() calls. So we use a separate connection here which has retries disabled. 182 Configuration confNoRetries = new Configuration(CONN.getConfiguration()); 183 confNoRetries.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); 184 try (Connection conn = ConnectionFactory.createConnection(confNoRetries)) { 185 expectTimeoutOnNext(rpcTimeout, () -> getScanner(conn)); 186 } 187 } 188 189 /** 190 * AsyncTable version of above 191 */ 192 @Test 193 public void testNormalScanTimeoutOnNextAsync() throws IOException { 194 setup(false); 195 expectTimeoutOnNext(scanTimeout, this::getAsyncScanner); 196 } 197 198 /** 199 * verify that we honor {@link HConstants#HBASE_RPC_READ_TIMEOUT_KEY} for openScanner() calls for 200 * meta scans 201 */ 202 @Test 203 public void testNormalScanTimeoutOnOpenScanner() throws IOException { 204 setup(false); 205 expectTimeoutOnOpenScanner(rpcTimeout, this::getScanner); 206 } 207 208 /** 209 * AsyncTable version of above 210 */ 211 @Test 212 public void testNormalScanTimeoutOnOpenScannerAsync() throws IOException { 213 setup(false); 214 expectTimeoutOnOpenScanner(rpcTimeout, this::getAsyncScanner); 215 } 216 217 /** 218 * verify that we honor {@link ConnectionConfiguration#HBASE_CLIENT_META_SCANNER_TIMEOUT} for 219 * next() calls in meta scans 220 */ 221 @Test 222 public void testMetaScanTimeoutOnNext() throws IOException { 223 setup(true); 224 expectTimeoutOnNext(metaScanTimeout, this::getScanner); 225 } 226 227 /** 228 * AsyncTable version of above 229 */ 230 @Test 231 public void testMetaScanTimeoutOnNextAsync() throws IOException { 232 setup(true); 233 expectTimeoutOnNext(metaScanTimeout, this::getAsyncScanner); 234 } 235 236 /** 237 * verify that we honor {@link ConnectionConfiguration#HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY} for 238 * openScanner() calls for meta scans 239 */ 240 @Test 241 public void testMetaScanTimeoutOnOpenScanner() throws IOException { 242 setup(true); 243 expectTimeoutOnOpenScanner(metaScanTimeout, this::getScanner); 244 } 245 246 /** 247 * AsyncTable version of above 248 */ 249 @Test 250 public void testMetaScanTimeoutOnOpenScannerAsync() throws IOException { 251 setup(true); 252 expectTimeoutOnOpenScanner(metaScanTimeout, this::getAsyncScanner); 253 } 254 255 private void expectRetryOutOfOrderScannerNext(Supplier<ResultScanner> scannerSupplier) 256 throws IOException { 257 setup(false); 258 RSRpcServicesWithScanTimeout.seqNoToThrowOn = 1; 259 260 LOG.info( 261 "Opening scanner, expecting no errors from first next() call from openScanner response"); 262 ResultScanner scanner = scannerSupplier.get(); 263 Result result = scanner.next(); 264 expectRow(ROW0, result); 265 expectNumTries(0); 266 267 LOG.info("Making first next() RPC, expecting no errors for seqNo 0"); 268 result = scanner.next(); 269 expectRow(ROW1, result); 270 expectNumTries(0); 271 272 LOG.info( 273 "Making second next() RPC, expecting OutOfOrderScannerNextException and appropriate retry"); 274 result = scanner.next(); 275 expectRow(ROW2, result); 276 expectNumTries(1); 277 278 // reset so no errors. since last call restarted the scan and following 279 // call would otherwise fail 280 RSRpcServicesWithScanTimeout.seqNoToThrowOn = -1; 281 282 LOG.info("Finishing scan, expecting no errors"); 283 result = scanner.next(); 284 expectRow(ROW3, result); 285 scanner.close(); 286 287 LOG.info("Testing always throw exception"); 288 byte[][] expectedResults = new byte[][] { ROW0, ROW1, ROW2, ROW3 }; 289 int i = 0; 290 291 // test the case that RPC always throws 292 scanner = scannerSupplier.get(); 293 RSRpcServicesWithScanTimeout.throwAlways = true; 294 295 while (true) { 296 LOG.info("Calling scanner.next()"); 297 result = scanner.next(); 298 if (result == null) { 299 break; 300 } else { 301 byte[] expectedResult = expectedResults[i++]; 302 expectRow(expectedResult, result); 303 } 304 } 305 306 // ensure we verified all rows. this along with the expectRow check above 307 // proves that we didn't miss any rows. 308 assertEquals("Expected to exhaust expectedResults array length=" + expectedResults.length 309 + ", actual index=" + i, expectedResults.length, i); 310 311 // expect all but the first row (which came from initial openScanner) to have thrown an error 312 expectNumTries(expectedResults.length - 1); 313 314 } 315 316 private void expectTimeoutOnNext(int timeout, Supplier<ResultScanner> scannerSupplier) 317 throws IOException { 318 RSRpcServicesWithScanTimeout.seqNoToSleepOn = 1; 319 RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout); 320 321 LOG.info( 322 "Opening scanner, expecting no timeouts from first next() call from openScanner response"); 323 ResultScanner scanner = scannerSupplier.get(); 324 Result result = scanner.next(); 325 expectRow(ROW0, result); 326 327 LOG.info("Making first next() RPC, expecting no timeout for seqNo 0"); 328 result = scanner.next(); 329 expectRow(ROW1, result); 330 331 LOG.info("Making second next() RPC, expecting timeout"); 332 long start = System.nanoTime(); 333 try { 334 scanner.next(); 335 fail("Expected CallTimeoutException"); 336 } catch (RetriesExhaustedException e) { 337 assertTrue("Expected CallTimeoutException", e.getCause() instanceof CallTimeoutException 338 || e.getCause() instanceof SocketTimeoutException); 339 } 340 expectTimeout(start, timeout); 341 } 342 343 private void expectTimeoutOnOpenScanner(int timeout, Supplier<ResultScanner> scannerSupplier) 344 throws IOException { 345 RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout); 346 RSRpcServicesWithScanTimeout.sleepOnOpen = true; 347 LOG.info("Opening scanner, expecting timeout from first next() call from openScanner response"); 348 long start = System.nanoTime(); 349 try { 350 scannerSupplier.get().next(); 351 fail("Expected SocketTimeoutException or CallTimeoutException"); 352 } catch (RetriesExhaustedException e) { 353 LOG.info("Got error", e); 354 assertTrue("Expected SocketTimeoutException or CallTimeoutException, but was " + e.getCause(), 355 e.getCause() instanceof CallTimeoutException 356 || e.getCause() instanceof SocketTimeoutException); 357 } 358 expectTimeout(start, timeout); 359 } 360 361 private void expectTimeout(long start, int timeout) { 362 long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); 363 LOG.info("Expected duration >= {}, and got {}", timeout, duration); 364 assertTrue("Expected duration >= " + timeout + ", but was " + duration, duration >= timeout); 365 } 366 367 private ResultScanner getScanner() { 368 return getScanner(CONN); 369 } 370 371 private ResultScanner getScanner(Connection conn) { 372 Scan scan = new Scan(); 373 scan.setCaching(1); 374 try { 375 return conn.getTable(tableName).getScanner(scan); 376 } catch (IOException e) { 377 throw new RuntimeException(e); 378 } 379 } 380 381 private ResultScanner getAsyncScanner() { 382 Scan scan = new Scan(); 383 scan.setCaching(1); 384 return ASYNC_CONN.getTable(tableName).getScanner(scan); 385 } 386 387 private void putToTable(Table ht, byte[] rowkey) throws IOException { 388 Put put = new Put(rowkey); 389 put.addColumn(FAMILY, QUALIFIER, VALUE); 390 ht.put(put); 391 } 392 393 private static class RegionServerWithScanTimeout 394 extends MiniHBaseCluster.MiniHBaseClusterRegionServer { 395 public RegionServerWithScanTimeout(Configuration conf) 396 throws IOException, InterruptedException { 397 super(conf); 398 } 399 400 @Override 401 protected RSRpcServices createRpcServices() throws IOException { 402 return new RSRpcServicesWithScanTimeout(this); 403 } 404 } 405 406 private static class RSRpcServicesWithScanTimeout extends RSRpcServices { 407 private long tableScannerId; 408 409 private static long seqNoToThrowOn = -1; 410 private static boolean throwAlways = false; 411 private static boolean threw; 412 413 private static long seqNoToSleepOn = -1; 414 private static boolean sleepOnOpen = false; 415 private static volatile boolean slept; 416 private static int tryNumber = 0; 417 418 private static int sleepTime = rpcTimeout + 500; 419 420 public static void setSleepForTimeout(int timeout) { 421 sleepTime = timeout + 500; 422 } 423 424 public static void reset() { 425 setSleepForTimeout(scanTimeout); 426 427 seqNoToSleepOn = -1; 428 seqNoToThrowOn = -1; 429 throwAlways = false; 430 threw = false; 431 sleepOnOpen = false; 432 slept = false; 433 tryNumber = 0; 434 } 435 436 public RSRpcServicesWithScanTimeout(HRegionServer rs) throws IOException { 437 super(rs); 438 } 439 440 @Override 441 public ScanResponse scan(final RpcController controller, final ScanRequest request) 442 throws ServiceException { 443 if (request.hasScannerId()) { 444 LOG.info("Got request {}", request); 445 ScanResponse scanResponse = super.scan(controller, request); 446 if (tableScannerId != request.getScannerId() || request.getCloseScanner()) { 447 return scanResponse; 448 } 449 450 if ( 451 throwAlways 452 || (!threw && request.hasNextCallSeq() && seqNoToThrowOn == request.getNextCallSeq()) 453 ) { 454 threw = true; 455 tryNumber++; 456 LOG.info("THROWING exception, tryNumber={}, tableScannerId={}", tryNumber, 457 tableScannerId); 458 throw new ServiceException(new OutOfOrderScannerNextException()); 459 } 460 461 if (!slept && request.hasNextCallSeq() && seqNoToSleepOn == request.getNextCallSeq()) { 462 try { 463 LOG.info("SLEEPING " + sleepTime); 464 Thread.sleep(sleepTime); 465 } catch (InterruptedException e) { 466 } 467 slept = true; 468 tryNumber++; 469 } 470 return scanResponse; 471 } else { 472 ScanResponse scanRes = super.scan(controller, request); 473 String regionName = Bytes.toString(request.getRegion().getValue().toByteArray()); 474 if (!regionName.contains(TableName.META_TABLE_NAME.getNameAsString())) { 475 tableScannerId = scanRes.getScannerId(); 476 if (sleepOnOpen) { 477 try { 478 LOG.info("openScanner SLEEPING " + sleepTime); 479 Thread.sleep(sleepTime); 480 } catch (InterruptedException e) { 481 } 482 } 483 } 484 return scanRes; 485 } 486 } 487 } 488}