/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Modifier;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallDroppedException;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector.Level;

/**
 * Tests for {@link ConnectionImplementation} features (this class used to test the old
 * HBaseConnectionManager).
 */
@Category({ LargeTests.class })
public class TestConnectionImplementation {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestConnectionImplementation.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestConnectionImplementation.class);
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static final TableName TABLE_NAME = TableName.valueOf("test");
  private static final TableName TABLE_NAME1 = TableName.valueOf("test1");
  private static final TableName TABLE_NAME2 = TableName.valueOf("test2");
  private static final TableName TABLE_NAME3 = TableName.valueOf("test3");
  private static final byte[] FAM_NAM = Bytes.toBytes("f");
  private static final byte[] ROW = Bytes.toBytes("bbb");
  private static final byte[] ROW_X = Bytes.toBytes("xxx");
  private static final int RPC_RETRY = 5;

  @Rule
  public TestName name = new TestName();

  /**
   * Starts a two region server mini cluster with status publishing enabled, more high-priority
   * handlers than the default, and {@link #RPC_RETRY} client retries.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ResourceLeakDetector.setLevel(Level.PARANOID);
    TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
    // Up the handlers; this test needs more than usual.
    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 3);
    TEST_UTIL.startMiniCluster(2);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /** Re-enables the balancer, which several tests switch off. */
  @After
  public void tearDown() throws IOException {
    TEST_UTIL.getAdmin().balancerSwitch(true, true);
  }

  /**
   * Checks which batch pool tables use. A pool passed to a connection (or to getTable) is used and
   * never shut down by the connection. A pool the connection creates on demand is shut down when
   * the connection closes.
   */
  @Test
  public void testClusterConnection() throws IOException {
    ThreadPoolExecutor otherPool = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS,
      new SynchronousQueue<>(), new ThreadFactoryBuilder().setNameFormat("test-hcm-pool-%d")
        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
    try {
      ExecutorService pool = null;
      // try-with-resources so both connections are released even when an assertion fails.
      try (Connection con2 =
        ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), otherPool)) {
        // make sure the internally created ExecutorService is the one passed
        assertTrue(otherPool == ((ConnectionImplementation) con2).getCurrentBatchPool());

        try (Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) {
          final TableName tableName = TableName.valueOf(name.getMethodName());
          TEST_UTIL.createTable(tableName, FAM_NAM).close();
          Table table = con1.getTable(tableName, otherPool);

          if (table instanceof HTable) {
            HTable t = (HTable) table;
            // make sure passing a pool to the getTable does not trigger creation of an internal
            // pool
            assertNull("Internal Thread pool should be null",
              ((ConnectionImplementation) con1).getCurrentBatchPool());
            // table should use the pool passed
            assertTrue(otherPool == t.getPool());
            t.close();

            t = (HTable) con2.getTable(tableName);
            // tables from con2 should use the pool con2 was created with
            assertTrue(otherPool == t.getPool());
            t.close();

            t = (HTable) con2.getTable(tableName);
            // try other API too
            assertTrue(otherPool == t.getPool());
            t.close();

            t = (HTable) con2.getTable(tableName);
            // try other API too
            assertTrue(otherPool == t.getPool());
            t.close();

            t = (HTable) con1.getTable(tableName);
            pool = ((ConnectionImplementation) con1).getCurrentBatchPool();
            // make sure an internal pool was created
            assertNotNull("An internal Thread pool should have been created", pool);
            // and that the table is using it
            assertTrue(t.getPool() == pool);
            t.close();

            t = (HTable) con1.getTable(tableName);
            // still using the *same* internal pool
            assertTrue(t.getPool() == pool);
            t.close();
          } else {
            table.close();
          }
        }

        // if the pool was created on demand it should be closed upon connection close
        if (pool != null) {
          assertTrue(pool.isShutdown());
        }
      }
      // if the pool is passed, it is not closed
      assertFalse(otherPool.isShutdown());
    } finally {
      otherPool.shutdownNow();
    }
  }

  /**
   * Naive test to check that Connection#getAdmin returns a properly constructed HBaseAdmin object
   * @throws IOException Unable to construct admin
   */
  @Test
  public void testAdminFactory() throws IOException {
    // Close the Admin as well as the connection; the original leaked the Admin.
    try (Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
      Admin admin = con1.getAdmin()) {
      Assert.assertSame(con1, admin.getConnection());
      Assert.assertSame(TEST_UTIL.getConfiguration(), admin.getConfiguration());
    }
  }

  // Fails too often! Needs work. HBASE-12558
  // May only fail on non-linux machines? E.g. macosx.
223 @Ignore 224 @Test(expected = RegionServerStoppedException.class) 225 // Depends on mulitcast messaging facility that seems broken in hbase2 226 // See HBASE-19261 "ClusterStatusPublisher where Master could optionally broadcast notice of 227 // dead servers is broke" 228 public void testClusterStatus() throws Exception { 229 final TableName tableName = TableName.valueOf(name.getMethodName()); 230 byte[] cf = "cf".getBytes(); 231 byte[] rk = "rk1".getBytes(); 232 233 JVMClusterUtil.RegionServerThread rs = TEST_UTIL.getHBaseCluster().startRegionServer(); 234 rs.waitForServerOnline(); 235 final ServerName sn = rs.getRegionServer().getServerName(); 236 237 Table t = TEST_UTIL.createTable(tableName, cf); 238 TEST_UTIL.waitTableAvailable(tableName); 239 TEST_UTIL.waitUntilNoRegionsInTransition(); 240 241 final ConnectionImplementation hci = (ConnectionImplementation) TEST_UTIL.getConnection(); 242 try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName)) { 243 while (l.getRegionLocation(rk).getPort() != sn.getPort()) { 244 TEST_UTIL.getAdmin().move(l.getRegionLocation(rk).getRegionInfo().getEncodedNameAsBytes(), 245 sn); 246 TEST_UTIL.waitUntilNoRegionsInTransition(); 247 hci.clearRegionCache(tableName); 248 } 249 Assert.assertNotNull(hci.clusterStatusListener); 250 TEST_UTIL.assertRegionOnServer(l.getRegionLocation(rk).getRegionInfo(), sn, 20000); 251 } 252 253 Put p1 = new Put(rk); 254 p1.addColumn(cf, "qual".getBytes(), "val".getBytes()); 255 t.put(p1); 256 257 rs.getRegionServer().abort("I'm dead"); 258 259 // We want the status to be updated. 
That's a least 10 second 260 TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() { 261 @Override 262 public boolean evaluate() throws Exception { 263 return TEST_UTIL.getHBaseCluster().getMaster().getServerManager().getDeadServers() 264 .isDeadServer(sn); 265 } 266 }); 267 268 TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() { 269 @Override 270 public boolean evaluate() throws Exception { 271 return hci.clusterStatusListener.isDeadServer(sn); 272 } 273 }); 274 275 t.close(); 276 hci.getClient(sn); // will throw an exception: RegionServerStoppedException 277 } 278 279 /** 280 * Test that we can handle connection close: it will trigger a retry, but the calls will finish. 281 */ 282 @Test 283 public void testConnectionCloseAllowsInterrupt() throws Exception { 284 testConnectionClose(true); 285 } 286 287 @Test 288 public void testConnectionNotAllowsInterrupt() throws Exception { 289 testConnectionClose(false); 290 } 291 292 private void testConnectionClose(boolean allowsInterrupt) throws Exception { 293 TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt); 294 TEST_UTIL.createTable(tableName, FAM_NAM).close(); 295 296 TEST_UTIL.getAdmin().balancerSwitch(false, true); 297 298 Configuration c2 = new Configuration(TEST_UTIL.getConfiguration()); 299 // We want to work on a separate connection. 300 c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); 301 c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); // retry a lot 302 c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); // don't wait between retries. 
303 c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // Server do not really expire 304 c2.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, allowsInterrupt); 305 // to avoid the client to be stuck when do the Get 306 c2.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 10000); 307 c2.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 10000); 308 c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 5000); 309 310 Connection connection = ConnectionFactory.createConnection(c2); 311 final Table table = connection.getTable(tableName); 312 313 Put put = new Put(ROW); 314 put.addColumn(FAM_NAM, ROW, ROW); 315 table.put(put); 316 317 // 4 steps: ready=0; doGets=1; mustStop=2; stopped=3 318 final AtomicInteger step = new AtomicInteger(0); 319 320 final AtomicReference<Throwable> failed = new AtomicReference<>(null); 321 Thread t = new Thread("testConnectionCloseThread") { 322 @Override 323 public void run() { 324 int done = 0; 325 try { 326 step.set(1); 327 while (step.get() == 1) { 328 Get get = new Get(ROW); 329 table.get(get); 330 done++; 331 if (done % 100 == 0) LOG.info("done=" + done); 332 // without the sleep, will cause the exception for too many files in 333 // org.apache.hadoop.hdfs.server.datanode.DataXceiver 334 Thread.sleep(100); 335 } 336 } catch (Throwable t) { 337 failed.set(t); 338 LOG.error(t.toString(), t); 339 } 340 step.set(3); 341 } 342 }; 343 t.start(); 344 TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() { 345 @Override 346 public boolean evaluate() throws Exception { 347 return step.get() == 1; 348 } 349 }); 350 351 ServerName sn; 352 try (RegionLocator rl = connection.getRegionLocator(tableName)) { 353 sn = rl.getRegionLocation(ROW).getServerName(); 354 } 355 ConnectionImplementation conn = (ConnectionImplementation) connection; 356 RpcClient rpcClient = conn.getRpcClient(); 357 358 LOG.info("Going to cancel connections. 
connection=" + conn.toString() + ", sn=" + sn); 359 for (int i = 0; i < 500; i++) { 360 rpcClient.cancelConnections(sn); 361 Thread.sleep(50); 362 } 363 364 step.compareAndSet(1, 2); 365 // The test may fail here if the thread doing the gets is stuck. The way to find 366 // out what's happening is to look for the thread named 'testConnectionCloseThread' 367 TEST_UTIL.waitFor(40000, new Waiter.Predicate<Exception>() { 368 @Override 369 public boolean evaluate() throws Exception { 370 return step.get() == 3; 371 } 372 }); 373 table.close(); 374 connection.close(); 375 Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null); 376 } 377 378 /** 379 * Test that connection can become idle without breaking everything. 380 */ 381 @Test 382 public void testConnectionIdle() throws Exception { 383 final TableName tableName = TableName.valueOf(name.getMethodName()); 384 TEST_UTIL.createTable(tableName, FAM_NAM).close(); 385 int idleTime = 20000; 386 boolean previousBalance = TEST_UTIL.getAdmin().setBalancerRunning(false, true); 387 388 Configuration c2 = new Configuration(TEST_UTIL.getConfiguration()); 389 // We want to work on a separate connection. 
390 c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); 391 c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // Don't retry: retry = test failed 392 c2.setInt(RpcClient.IDLE_TIME, idleTime); 393 394 Connection connection = ConnectionFactory.createConnection(c2); 395 final Table table = connection.getTable(tableName); 396 397 Put put = new Put(ROW); 398 put.addColumn(FAM_NAM, ROW, ROW); 399 table.put(put); 400 401 ManualEnvironmentEdge mee = new ManualEnvironmentEdge(); 402 mee.setValue(EnvironmentEdgeManager.currentTime()); 403 EnvironmentEdgeManager.injectEdge(mee); 404 LOG.info("first get"); 405 table.get(new Get(ROW)); 406 407 LOG.info("first get - changing the time & sleeping"); 408 mee.incValue(idleTime + 1000); 409 Thread.sleep(1500); // we need to wait a little for the connection to be seen as idle. 410 // 1500 = sleep time in RpcClient#waitForWork + a margin 411 412 LOG.info("second get - connection has been marked idle in the middle"); 413 // To check that the connection actually became idle would need to read some private 414 // fields of RpcClient. 415 table.get(new Get(ROW)); 416 mee.incValue(idleTime + 1000); 417 418 LOG.info("third get - connection is idle, but the reader doesn't know yet"); 419 // We're testing here a special case: 420 // time limit reached BUT connection not yet reclaimed AND a new call. 421 // in this situation, we don't close the connection, instead we use it immediately. 422 // If we're very unlucky we can have a race condition in the test: the connection is already 423 // under closing when we do the get, so we have an exception, and we don't retry as the 424 // retry number is 1. The probability is very very low, and seems acceptable for now. It's 425 // a test issue only. 
426 table.get(new Get(ROW)); 427 428 LOG.info("we're done - time will change back"); 429 430 table.close(); 431 432 connection.close(); 433 EnvironmentEdgeManager.reset(); 434 TEST_UTIL.getAdmin().setBalancerRunning(previousBalance, true); 435 } 436 437 /** 438 * Test that the connection to the dead server is cut immediately when we receive the 439 * notification. n 440 */ 441 @Test 442 public void testConnectionCut() throws Exception { 443 final TableName tableName = TableName.valueOf(name.getMethodName()); 444 445 TEST_UTIL.createTable(tableName, FAM_NAM).close(); 446 boolean previousBalance = TEST_UTIL.getAdmin().setBalancerRunning(false, true); 447 448 Configuration c2 = new Configuration(TEST_UTIL.getConfiguration()); 449 // We want to work on a separate connection. 450 c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); 451 // try only once w/o any retry 452 c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); 453 c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 1000); 454 455 final Connection connection = ConnectionFactory.createConnection(c2); 456 final Table table = connection.getTable(tableName); 457 458 Put p = new Put(FAM_NAM); 459 p.addColumn(FAM_NAM, FAM_NAM, FAM_NAM); 460 table.put(p); 461 462 final ConnectionImplementation hci = (ConnectionImplementation) connection; 463 464 final HRegionLocation loc; 465 try (RegionLocator rl = connection.getRegionLocator(tableName)) { 466 loc = rl.getRegionLocation(FAM_NAM); 467 } 468 469 Get get = new Get(FAM_NAM); 470 Assert.assertNotNull(table.get(get)); 471 472 get = new Get(FAM_NAM); 473 get.setFilter(new BlockingFilter()); 474 475 // This thread will mark the server as dead while we're waiting during a get. 
476 Thread t = new Thread() { 477 @Override 478 public void run() { 479 synchronized (syncBlockingFilter) { 480 try { 481 syncBlockingFilter.wait(); 482 } catch (InterruptedException e) { 483 throw new RuntimeException(e); 484 } 485 } 486 hci.clusterStatusListener.deadServerHandler.newDead(loc.getServerName()); 487 } 488 }; 489 490 t.start(); 491 try { 492 table.get(get); 493 Assert.fail(); 494 } catch (IOException expected) { 495 LOG.debug("Received: " + expected); 496 Assert.assertFalse(expected instanceof SocketTimeoutException); 497 Assert.assertFalse(syncBlockingFilter.get()); 498 } finally { 499 syncBlockingFilter.set(true); 500 t.join(); 501 TEST_UTIL.getAdmin().setBalancerRunning(previousBalance, true); 502 } 503 504 table.close(); 505 connection.close(); 506 } 507 508 protected static final AtomicBoolean syncBlockingFilter = new AtomicBoolean(false); 509 510 public static class BlockingFilter extends FilterBase { 511 @Override 512 public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException { 513 int i = 0; 514 while (i++ < 1000 && !syncBlockingFilter.get()) { 515 synchronized (syncBlockingFilter) { 516 syncBlockingFilter.notifyAll(); 517 } 518 Threads.sleep(100); 519 } 520 syncBlockingFilter.set(true); 521 return false; 522 } 523 524 @Override 525 public ReturnCode filterCell(final Cell ignored) throws IOException { 526 return ReturnCode.INCLUDE; 527 } 528 529 public static Filter parseFrom(final byte[] pbBytes) throws DeserializationException { 530 return new BlockingFilter(); 531 } 532 } 533 534 /** 535 * Test that when we delete a location using the first row of a region that we really delete it. 
n 536 */ 537 @Test 538 public void testRegionCaching() throws Exception { 539 TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAM_NAM).close(); 540 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 541 // test with no retry, or client cache will get updated after the first failure 542 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); 543 Connection connection = ConnectionFactory.createConnection(conf); 544 final Table table = connection.getTable(TABLE_NAME); 545 546 TEST_UTIL.waitUntilAllRegionsAssigned(table.getName()); 547 Put put = new Put(ROW); 548 put.addColumn(FAM_NAM, ROW, ROW); 549 table.put(put); 550 551 ConnectionImplementation conn = (ConnectionImplementation) connection; 552 553 assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW)); 554 555 // Here we mess with the cached location making it so the region at TABLE_NAME, ROW is at 556 // a location where the port is current port number +1 -- i.e. a non-existent location. 557 HRegionLocation loc = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation(); 558 final int nextPort = loc.getPort() + 1; 559 conn.updateCachedLocation(loc.getRegionInfo(), loc.getServerName(), 560 ServerName.valueOf("127.0.0.1", nextPort, HConstants.LATEST_TIMESTAMP), 561 HConstants.LATEST_TIMESTAMP); 562 Assert.assertEquals(conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort(), 563 nextPort); 564 565 conn.clearRegionCache(TABLE_NAME, ROW.clone()); 566 RegionLocations rl = conn.getCachedLocation(TABLE_NAME, ROW); 567 assertNull("What is this location?? 
" + rl, rl); 568 569 // We're now going to move the region and check that it works for the client 570 // First a new put to add the location in the cache 571 conn.clearRegionCache(TABLE_NAME); 572 Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME)); 573 Put put2 = new Put(ROW); 574 put2.addColumn(FAM_NAM, ROW, ROW); 575 table.put(put2); 576 assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW)); 577 assertNotNull(conn.getCachedLocation(TableName.valueOf(TABLE_NAME.getName()), ROW.clone())); 578 579 TEST_UTIL.getAdmin().setBalancerRunning(false, false); 580 HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); 581 582 // We can wait for all regions to be online, that makes log reading easier when debugging 583 TEST_UTIL.waitUntilNoRegionsInTransition(); 584 585 // Now moving the region to the second server 586 HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation(); 587 byte[] regionName = toMove.getRegionInfo().getRegionName(); 588 byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes(); 589 590 // Choose the other server. 591 int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName); 592 int destServerId = curServerId == 0 ? 
1 : 0; 593 594 HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId); 595 HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId); 596 597 ServerName destServerName = destServer.getServerName(); 598 599 // Check that we are in the expected state 600 Assert.assertTrue(curServer != destServer); 601 Assert.assertFalse(curServer.getServerName().equals(destServer.getServerName())); 602 Assert.assertFalse(toMove.getPort() == destServerName.getPort()); 603 Assert.assertNotNull(curServer.getOnlineRegion(regionName)); 604 Assert.assertNull(destServer.getOnlineRegion(regionName)); 605 Assert.assertFalse( 606 TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().hasRegionsInTransition()); 607 608 // Moving. It's possible that we don't have all the regions online at this point, so 609 // the test must depend only on the region we're looking at. 610 LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString()); 611 TEST_UTIL.getAdmin().move(toMove.getRegionInfo().getEncodedNameAsBytes(), destServerName); 612 613 while ( 614 destServer.getOnlineRegion(regionName) == null 615 || destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) 616 || curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) 617 || master.getAssignmentManager().hasRegionsInTransition() 618 ) { 619 // wait for the move to be finished 620 Thread.sleep(1); 621 } 622 623 LOG.info("Move finished for region=" + toMove.getRegionInfo().getRegionNameAsString()); 624 625 // Check our new state. 
626 Assert.assertNull(curServer.getOnlineRegion(regionName)); 627 Assert.assertNotNull(destServer.getOnlineRegion(regionName)); 628 Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); 629 Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); 630 631 // Cache was NOT updated and points to the wrong server 632 Assert.assertFalse(conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort() 633 == destServerName.getPort()); 634 635 // This part relies on a number of tries equals to 1. 636 // We do a put and expect the cache to be updated, even if we don't retry 637 LOG.info("Put starting"); 638 Put put3 = new Put(ROW); 639 put3.addColumn(FAM_NAM, ROW, ROW); 640 try { 641 table.put(put3); 642 Assert.fail("Unreachable point"); 643 } catch (RetriesExhaustedWithDetailsException e) { 644 LOG.info("Put done, exception caught: " + e.getClass()); 645 Assert.assertEquals(1, e.getNumExceptions()); 646 Assert.assertEquals(1, e.getCauses().size()); 647 Assert.assertArrayEquals(ROW, e.getRow(0).getRow()); 648 649 // Check that we unserialized the exception as expected 650 Throwable cause = ClientExceptionsUtil.findException(e.getCause(0)); 651 Assert.assertNotNull(cause); 652 Assert.assertTrue(cause instanceof RegionMovedException); 653 } catch (RetriesExhaustedException ree) { 654 // hbase2 throws RetriesExhaustedException instead of RetriesExhaustedWithDetailsException 655 // as hbase1 used to do. Keep an eye on this to see if this changed behavior is an issue. 
656 LOG.info("Put done, exception caught: " + ree.getClass()); 657 Throwable cause = ClientExceptionsUtil.findException(ree.getCause()); 658 Assert.assertNotNull(cause); 659 Assert.assertTrue(cause instanceof RegionMovedException); 660 } 661 Assert.assertNotNull("Cached connection is null", conn.getCachedLocation(TABLE_NAME, ROW)); 662 Assert.assertEquals("Previous server was " + curServer.getServerName().getAddress(), 663 destServerName.getPort(), 664 conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort()); 665 666 Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); 667 Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); 668 669 // We move it back to do another test with a scan 670 LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString()); 671 TEST_UTIL.getAdmin().move(toMove.getRegionInfo().getEncodedNameAsBytes(), 672 curServer.getServerName()); 673 674 while ( 675 curServer.getOnlineRegion(regionName) == null 676 || destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) 677 || curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) 678 || master.getAssignmentManager().hasRegionsInTransition() 679 ) { 680 // wait for the move to be finished 681 Thread.sleep(1); 682 } 683 684 // Check our new state. 
685 Assert.assertNotNull(curServer.getOnlineRegion(regionName)); 686 Assert.assertNull(destServer.getOnlineRegion(regionName)); 687 LOG.info("Move finished for region=" + toMove.getRegionInfo().getRegionNameAsString()); 688 689 // Cache was NOT updated and points to the wrong server 690 Assert.assertFalse(conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort() 691 == curServer.getServerName().getPort()); 692 693 Scan sc = new Scan(); 694 sc.setStopRow(ROW); 695 sc.setStartRow(ROW); 696 697 // The scanner takes the max retries from the connection configuration, not the table as 698 // the put. 699 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 700 701 try { 702 ResultScanner rs = table.getScanner(sc); 703 while (rs.next() != null) { 704 } 705 Assert.fail("Unreachable point"); 706 } catch (RetriesExhaustedException e) { 707 LOG.info("Scan done, expected exception caught: " + e.getClass()); 708 } 709 710 // Cache is updated with the right value. 
711 Assert.assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW)); 712 Assert.assertEquals("Previous server was " + destServer.getServerName().getAddress(), 713 curServer.getServerName().getPort(), 714 conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort()); 715 716 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY); 717 table.close(); 718 connection.close(); 719 } 720 721 /** 722 * Test that Connection or Pool are not closed when managed externally n 723 */ 724 @Test 725 public void testConnectionManagement() throws Exception { 726 Table table0 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAM); 727 Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); 728 Table table = conn.getTable(TABLE_NAME1); 729 table.close(); 730 assertFalse(conn.isClosed()); 731 if (table instanceof HTable) { 732 assertFalse(((HTable) table).getPool().isShutdown()); 733 } 734 table = conn.getTable(TABLE_NAME1); 735 table.close(); 736 if (table instanceof HTable) { 737 assertFalse(((HTable) table).getPool().isShutdown()); 738 } 739 conn.close(); 740 if (table instanceof HTable) { 741 assertTrue(((HTable) table).getPool().isShutdown()); 742 } 743 table0.close(); 744 } 745 746 /** 747 * Test that stale cache updates don't override newer cached values. 748 */ 749 @Test 750 public void testCacheSeqNums() throws Exception { 751 Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME2, FAM_NAM); 752 Put put = new Put(ROW); 753 put.addColumn(FAM_NAM, ROW, ROW); 754 table.put(put); 755 ConnectionImplementation conn = (ConnectionImplementation) TEST_UTIL.getConnection(); 756 757 HRegionLocation location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation(); 758 assertNotNull(location); 759 760 ServerName anySource = ServerName.valueOf(location.getHostname(), location.getPort() - 1, 0L); 761 762 // Same server as already in cache reporting - overwrites any value despite seqNum. 
763 int nextPort = location.getPort() + 1; 764 conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(), 765 ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1); 766 location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation(); 767 Assert.assertEquals(nextPort, location.getPort()); 768 769 // No source specified - same. 770 nextPort = location.getPort() + 1; 771 conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(), 772 ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1); 773 location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation(); 774 Assert.assertEquals(nextPort, location.getPort()); 775 776 // Higher seqNum - overwrites lower seqNum. 777 nextPort = location.getPort() + 1; 778 conn.updateCachedLocation(location.getRegionInfo(), anySource, 779 ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() + 1); 780 location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation(); 781 Assert.assertEquals(nextPort, location.getPort()); 782 783 // Lower seqNum - does not overwrite higher seqNum. 784 nextPort = location.getPort() + 1; 785 conn.updateCachedLocation(location.getRegionInfo(), anySource, 786 ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1); 787 location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation(); 788 Assert.assertEquals(nextPort - 1, location.getPort()); 789 table.close(); 790 } 791 792 @Test 793 public void testClosing() throws Exception { 794 Configuration configuration = new Configuration(TEST_UTIL.getConfiguration()); 795 configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID, 796 String.valueOf(ThreadLocalRandom.current().nextInt())); 797 798 // as connection caching is going away, now we're just testing 799 // that closed connection does actually get closed. 
800 801 Connection c1 = ConnectionFactory.createConnection(configuration); 802 Connection c2 = ConnectionFactory.createConnection(configuration); 803 // no caching, different connections 804 assertTrue(c1 != c2); 805 806 // closing independently 807 c1.close(); 808 assertTrue(c1.isClosed()); 809 assertFalse(c2.isClosed()); 810 811 c2.close(); 812 assertTrue(c2.isClosed()); 813 } 814 815 /** 816 * Trivial test to verify that nobody messes with 817 * {@link ConnectionFactory#createConnection(Configuration)} 818 */ 819 @Test 820 public void testCreateConnection() throws Exception { 821 Configuration configuration = TEST_UTIL.getConfiguration(); 822 Connection c1 = ConnectionFactory.createConnection(configuration); 823 Connection c2 = ConnectionFactory.createConnection(configuration); 824 // created from the same configuration, yet they are different 825 assertTrue(c1 != c2); 826 assertTrue(c1.getConfiguration() == c2.getConfiguration()); 827 } 828 829 /** 830 * This test checks that one can connect to the cluster with only the ZooKeeper quorum set. Other 831 * stuff like master address will be read from ZK by the client. 832 */ 833 @Test 834 public void testConnection() throws Exception { 835 // We create an empty config and add the ZK address. 836 Configuration c = new Configuration(); 837 // This test only makes sense for ZK based connection registry. 
838 c.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 839 HConstants.ZK_CONNECTION_REGISTRY_CLASS); 840 c.set(HConstants.ZOOKEEPER_QUORUM, 841 TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM)); 842 c.set(HConstants.ZOOKEEPER_CLIENT_PORT, 843 TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_CLIENT_PORT)); 844 // This should be enough to connect 845 ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(c); 846 assertTrue(conn.isMasterRunning()); 847 conn.close(); 848 } 849 850 private int setNumTries(ConnectionImplementation hci, int newVal) throws Exception { 851 Field numTries = hci.getClass().getDeclaredField("numTries"); 852 numTries.setAccessible(true); 853 Field modifiersField = Field.class.getDeclaredField("modifiers"); 854 modifiersField.setAccessible(true); 855 modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL); 856 final int prevNumRetriesVal = (Integer) numTries.get(hci); 857 numTries.set(hci, newVal); 858 859 return prevNumRetriesVal; 860 } 861 862 @Test 863 public void testMulti() throws Exception { 864 Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME3, FAM_NAM); 865 try { 866 ConnectionImplementation conn = (ConnectionImplementation) TEST_UTIL.getConnection(); 867 868 // We're now going to move the region and check that it works for the client 869 // First a new put to add the location in the cache 870 conn.clearRegionCache(TABLE_NAME3); 871 Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME3)); 872 873 TEST_UTIL.getAdmin().setBalancerRunning(false, false); 874 HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); 875 876 // We can wait for all regions to be online, that makes log reading easier when debugging 877 TEST_UTIL.waitUntilNoRegionsInTransition(); 878 879 Put put = new Put(ROW_X); 880 put.addColumn(FAM_NAM, ROW_X, ROW_X); 881 table.put(put); 882 883 // Now moving the region to the second server 884 HRegionLocation toMove = 
conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation(); 885 byte[] regionName = toMove.getRegionInfo().getRegionName(); 886 byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes(); 887 888 // Choose the other server. 889 int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName); 890 int destServerId = (curServerId == 0 ? 1 : 0); 891 892 HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId); 893 HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId); 894 895 ServerName destServerName = destServer.getServerName(); 896 ServerName metaServerName = TEST_UTIL.getHBaseCluster().getServerHoldingMeta(); 897 898 // find another row in the cur server that is less than ROW_X 899 List<HRegion> regions = curServer.getRegions(TABLE_NAME3); 900 byte[] otherRow = null; 901 for (Region region : regions) { 902 if ( 903 !region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName()) 904 && Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0 905 ) { 906 otherRow = region.getRegionInfo().getStartKey(); 907 break; 908 } 909 } 910 assertNotNull(otherRow); 911 // If empty row, set it to first row.-f 912 if (otherRow.length <= 0) otherRow = Bytes.toBytes("aaa"); 913 Put put2 = new Put(otherRow); 914 put2.addColumn(FAM_NAM, otherRow, otherRow); 915 table.put(put2); // cache put2's location 916 917 // Check that we are in the expected state 918 Assert.assertTrue(curServer != destServer); 919 Assert.assertNotEquals(curServer.getServerName(), destServer.getServerName()); 920 Assert.assertNotEquals(toMove.getPort(), destServerName.getPort()); 921 Assert.assertNotNull(curServer.getOnlineRegion(regionName)); 922 Assert.assertNull(destServer.getOnlineRegion(regionName)); 923 Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager() 924 .hasRegionsInTransition()); 925 926 // Moving. 
It's possible that we don't have all the regions online at this point, so 927 // the test depends only on the region we're looking at. 928 LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString()); 929 TEST_UTIL.getAdmin().move(toMove.getRegionInfo().getEncodedNameAsBytes(), destServerName); 930 931 while ( 932 destServer.getOnlineRegion(regionName) == null 933 || destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) 934 || curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) 935 || master.getAssignmentManager().hasRegionsInTransition() 936 ) { 937 // wait for the move to be finished 938 Thread.sleep(1); 939 } 940 941 LOG.info("Move finished for region=" + toMove.getRegionInfo().getRegionNameAsString()); 942 943 // Check our new state. 944 Assert.assertNull(curServer.getOnlineRegion(regionName)); 945 Assert.assertNotNull(destServer.getOnlineRegion(regionName)); 946 Assert 947 .assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); 948 Assert 949 .assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); 950 951 // Cache was NOT updated and points to the wrong server 952 Assert.assertFalse(conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation().getPort() 953 == destServerName.getPort()); 954 955 // Hijack the number of retry to fail after 2 tries 956 final int prevNumRetriesVal = setNumTries(conn, 2); 957 958 Put put3 = new Put(ROW_X); 959 put3.addColumn(FAM_NAM, ROW_X, ROW_X); 960 Put put4 = new Put(otherRow); 961 put4.addColumn(FAM_NAM, otherRow, otherRow); 962 963 // do multi 964 ArrayList<Put> actions = Lists.newArrayList(put4, put3); 965 table.batch(actions, null); // first should be a valid row, 966 // second we get RegionMovedException. 
967 968 setNumTries(conn, prevNumRetriesVal); 969 } finally { 970 table.close(); 971 } 972 } 973 974 @Test 975 public void testErrorBackoffTimeCalculation() throws Exception { 976 // TODO: This test would seem to presume hardcoded RETRY_BACKOFF which it should not. 977 final long ANY_PAUSE = 100; 978 ServerName location = ServerName.valueOf("127.0.0.1", 1, 0); 979 ServerName diffLocation = ServerName.valueOf("127.0.0.1", 2, 0); 980 981 ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge(); 982 EnvironmentEdgeManager.injectEdge(timeMachine); 983 try { 984 long largeAmountOfTime = ANY_PAUSE * 1000; 985 ConnectionImplementation.ServerErrorTracker tracker = 986 new ConnectionImplementation.ServerErrorTracker(largeAmountOfTime, 100); 987 988 // The default backoff is 0. 989 assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE)); 990 991 // Check some backoff values from HConstants sequence. 992 tracker.reportServerError(location); 993 assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0], 994 tracker.calculateBackoffTime(location, ANY_PAUSE)); 995 tracker.reportServerError(location); 996 tracker.reportServerError(location); 997 tracker.reportServerError(location); 998 assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[3], 999 tracker.calculateBackoffTime(location, ANY_PAUSE)); 1000 1001 // All of this shouldn't affect backoff for different location. 1002 assertEquals(0, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE)); 1003 tracker.reportServerError(diffLocation); 1004 assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0], 1005 tracker.calculateBackoffTime(diffLocation, ANY_PAUSE)); 1006 1007 // Check with different base. 
1008 assertEqualsWithJitter(ANY_PAUSE * 2 * HConstants.RETRY_BACKOFF[3], 1009 tracker.calculateBackoffTime(location, ANY_PAUSE * 2)); 1010 } finally { 1011 EnvironmentEdgeManager.reset(); 1012 } 1013 } 1014 1015 private static void assertEqualsWithJitter(long expected, long actual) { 1016 assertEqualsWithJitter(expected, actual, expected); 1017 } 1018 1019 private static void assertEqualsWithJitter(long expected, long actual, long jitterBase) { 1020 assertTrue("Value not within jitter: " + expected + " vs " + actual, 1021 Math.abs(actual - expected) <= (0.01f * jitterBase)); 1022 } 1023 1024 @Test 1025 public void testConnectionRideOverClusterRestart() throws IOException, InterruptedException { 1026 Configuration config = new Configuration(TEST_UTIL.getConfiguration()); 1027 // This test only makes sense for ZK based connection registry. 1028 config.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 1029 HConstants.ZK_CONNECTION_REGISTRY_CLASS); 1030 1031 final TableName tableName = TableName.valueOf(name.getMethodName()); 1032 TEST_UTIL.createTable(tableName, new byte[][] { FAM_NAM }).close(); 1033 1034 Connection connection = ConnectionFactory.createConnection(config); 1035 Table table = connection.getTable(tableName); 1036 1037 // this will cache the meta location and table's region location 1038 table.get(new Get(Bytes.toBytes("foo"))); 1039 1040 // restart HBase 1041 TEST_UTIL.shutdownMiniHBaseCluster(); 1042 TEST_UTIL.restartHBaseCluster(2); 1043 // this should be able to discover new locations for meta and table's region 1044 table.get(new Get(Bytes.toBytes("foo"))); 1045 TEST_UTIL.deleteTable(tableName); 1046 table.close(); 1047 connection.close(); 1048 } 1049 1050 @Test 1051 public void testLocateRegionsWithRegionReplicas() throws IOException { 1052 int regionReplication = 3; 1053 byte[] family = Bytes.toBytes("cf"); 1054 TableName tableName = TableName.valueOf(name.getMethodName()); 1055 1056 // Create a table with region replicas 1057 
TableDescriptorBuilder builder = 1058 TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(regionReplication) 1059 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)); 1060 TEST_UTIL.getAdmin().createTable(builder.build()); 1061 1062 try (ConnectionImplementation con = 1063 (ConnectionImplementation) ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) { 1064 // Get locations of the regions of the table 1065 List<HRegionLocation> locations = con.locateRegions(tableName, false, false); 1066 1067 // The size of the returned locations should be 3 1068 assertEquals(regionReplication, locations.size()); 1069 1070 // The replicaIds of the returned locations should be 0, 1 and 2 1071 Set<Integer> expectedReplicaIds = 1072 IntStream.range(0, regionReplication).boxed().collect(Collectors.toSet()); 1073 for (HRegionLocation location : locations) { 1074 assertTrue(expectedReplicaIds.remove(location.getRegion().getReplicaId())); 1075 } 1076 } finally { 1077 TEST_UTIL.deleteTable(tableName); 1078 } 1079 } 1080 1081 @Test 1082 public void testLocateRegionsRetrySpecialPauseCQTBE() throws IOException { 1083 testLocateRegionsRetrySpecialPause(CallQueueTooBigException.class); 1084 } 1085 1086 @Test 1087 public void testLocateRegionsRetrySpecialPauseCDE() throws IOException { 1088 testLocateRegionsRetrySpecialPause(CallDroppedException.class); 1089 } 1090 1091 private void testLocateRegionsRetrySpecialPause( 1092 Class<? 
extends HBaseServerException> exceptionClass) throws IOException { 1093 1094 int regionReplication = 3; 1095 byte[] family = Bytes.toBytes("cf"); 1096 TableName tableName = TableName.valueOf(name.getMethodName()); 1097 1098 // Create a table with region replicas 1099 TableDescriptorBuilder builder = 1100 TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(regionReplication) 1101 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)); 1102 TEST_UTIL.getAdmin().createTable(builder.build()); 1103 1104 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 1105 1106 conf.setClass(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, ThrowingCallerFactory.class, 1107 RpcRetryingCallerFactory.class); 1108 conf.setClass("testSpecialPauseException", exceptionClass, HBaseServerException.class); 1109 1110 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 1111 // normal pause very short, 10 millis 1112 conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 10); 1113 1114 // special pause 10x longer, so we can detect it 1115 long specialPauseMillis = 1000; 1116 conf.setLong(ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, 1117 specialPauseMillis); 1118 1119 try (ConnectionImplementation con = 1120 (ConnectionImplementation) ConnectionFactory.createConnection(conf)) { 1121 // Get locations of the regions of the table 1122 1123 long start = System.nanoTime(); 1124 try { 1125 con.locateRegion(tableName, new byte[0], false, true, 0); 1126 } catch (HBaseServerException e) { 1127 assertTrue(e.isServerOverloaded()); 1128 // pass: expected 1129 } 1130 assertTrue(System.nanoTime() - start > TimeUnit.MILLISECONDS.toNanos(specialPauseMillis)); 1131 } finally { 1132 TEST_UTIL.deleteTable(tableName); 1133 } 1134 } 1135 1136 private static class ThrowingCallerFactory extends RpcRetryingCallerFactory { 1137 1138 private final Class<? 
extends HBaseServerException> exceptionClass; 1139 1140 public ThrowingCallerFactory(Configuration conf) { 1141 super(conf); 1142 this.exceptionClass = 1143 conf.getClass("testSpecialPauseException", null, HBaseServerException.class); 1144 } 1145 1146 @Override 1147 public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) { 1148 return newCaller(); 1149 } 1150 1151 @Override 1152 public <T> RpcRetryingCaller<T> newCaller() { 1153 return new RpcRetryingCaller<T>() { 1154 @Override 1155 public void cancel() { 1156 1157 } 1158 1159 @Override 1160 public T callWithRetries(RetryingCallable<T> callable, int callTimeout) 1161 throws IOException, RuntimeException { 1162 return callWithoutRetries(null, 0); 1163 } 1164 1165 @Override 1166 public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout) 1167 throws IOException, RuntimeException { 1168 try { 1169 throw exceptionClass.getConstructor().newInstance(); 1170 } catch (IllegalAccessException | InstantiationException | InvocationTargetException 1171 | NoSuchMethodException e) { 1172 throw new RuntimeException(e); 1173 } 1174 } 1175 }; 1176 } 1177 } 1178 1179 @Test 1180 public void testMetaLookupThreadPoolCreated() throws Exception { 1181 final TableName tableName = TableName.valueOf(name.getMethodName()); 1182 byte[][] FAMILIES = new byte[][] { Bytes.toBytes("foo") }; 1183 if (TEST_UTIL.getAdmin().tableExists(tableName)) { 1184 TEST_UTIL.getAdmin().disableTable(tableName); 1185 TEST_UTIL.getAdmin().deleteTable(tableName); 1186 } 1187 try (Table htable = TEST_UTIL.createTable(tableName, FAMILIES)) { 1188 byte[] row = Bytes.toBytes("test"); 1189 ConnectionImplementation c = ((ConnectionImplementation) TEST_UTIL.getConnection()); 1190 // check that metalookup pool would get created 1191 c.relocateRegion(tableName, row); 1192 ExecutorService ex = c.getCurrentMetaLookupPool(); 1193 assertNotNull(ex); 1194 } 1195 } 1196 1197 // There is no assertion, but you need to confirm that there is no resource leak output 
from netty 1198 @Test 1199 public void testCancelConnectionMemoryLeak() throws IOException, InterruptedException { 1200 TableName tableName = TableName.valueOf(name.getMethodName()); 1201 TEST_UTIL.createTable(tableName, FAM_NAM).close(); 1202 TEST_UTIL.getAdmin().balancerSwitch(false, true); 1203 try (Connection connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); 1204 Table table = connection.getTable(tableName)) { 1205 table.get(new Get(Bytes.toBytes("1"))); 1206 ServerName sn = TEST_UTIL.getRSForFirstRegionInTable(tableName).getServerName(); 1207 RpcClient rpcClient = ((ConnectionImplementation) connection).getRpcClient(); 1208 rpcClient.cancelConnections(sn); 1209 Thread.sleep(1000); 1210 System.gc(); 1211 Thread.sleep(1000); 1212 } 1213 } 1214}