001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertNull; 024import static org.junit.Assert.assertTrue; 025 026import java.io.IOException; 027import java.lang.reflect.Field; 028import java.lang.reflect.Modifier; 029import java.net.SocketTimeoutException; 030import java.util.ArrayList; 031import java.util.List; 032import java.util.Set; 033import java.util.concurrent.ExecutorService; 034import java.util.concurrent.SynchronousQueue; 035import java.util.concurrent.ThreadLocalRandom; 036import java.util.concurrent.ThreadPoolExecutor; 037import java.util.concurrent.TimeUnit; 038import java.util.concurrent.atomic.AtomicBoolean; 039import java.util.concurrent.atomic.AtomicInteger; 040import java.util.concurrent.atomic.AtomicReference; 041import java.util.stream.Collectors; 042import java.util.stream.IntStream; 043import org.apache.hadoop.conf.Configuration; 044import org.apache.hadoop.hbase.Cell; 045import org.apache.hadoop.hbase.HBaseClassTestRule; 046import org.apache.hadoop.hbase.HBaseTestingUtility; 047import org.apache.hadoop.hbase.HConstants; 048import org.apache.hadoop.hbase.HRegionLocation; 049import org.apache.hadoop.hbase.RegionLocations; 050import org.apache.hadoop.hbase.ServerName; 051import org.apache.hadoop.hbase.TableName; 052import org.apache.hadoop.hbase.Waiter; 053import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 054import org.apache.hadoop.hbase.exceptions.DeserializationException; 055import org.apache.hadoop.hbase.exceptions.RegionMovedException; 056import org.apache.hadoop.hbase.filter.Filter; 057import org.apache.hadoop.hbase.filter.FilterBase; 058import org.apache.hadoop.hbase.ipc.RpcClient; 059import org.apache.hadoop.hbase.master.HMaster; 060import org.apache.hadoop.hbase.regionserver.HRegion; 061import org.apache.hadoop.hbase.regionserver.HRegionServer; 062import org.apache.hadoop.hbase.regionserver.Region; 063import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 064import org.apache.hadoop.hbase.testclassification.LargeTests; 065import org.apache.hadoop.hbase.util.Bytes; 066import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 067import org.apache.hadoop.hbase.util.JVMClusterUtil; 068import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 069import org.apache.hadoop.hbase.util.Threads; 070import org.junit.After; 071import org.junit.AfterClass; 072import org.junit.Assert; 073import org.junit.BeforeClass; 074import org.junit.ClassRule; 075import org.junit.Ignore; 076import org.junit.Rule; 077import org.junit.Test; 078import org.junit.experimental.categories.Category; 079import org.junit.rules.TestName; 080import org.slf4j.Logger; 081import org.slf4j.LoggerFactory; 082 083import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 084import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector; 085import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector.Level; 086 087/** 088 * This class is for testing HBaseConnectionManager features 089 */ 090@Category({LargeTests.class}) 091public class TestConnectionImplementation { 092 093 @ClassRule 094 public static final HBaseClassTestRule CLASS_RULE = 095 HBaseClassTestRule.forClass(TestConnectionImplementation.class); 096 097 private static final Logger LOG = LoggerFactory.getLogger(TestConnectionImplementation.class); 098 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 099 private static final TableName TABLE_NAME = 100 TableName.valueOf("test"); 101 private static final TableName TABLE_NAME1 = 102 TableName.valueOf("test1"); 103 private static final TableName TABLE_NAME2 = 104 TableName.valueOf("test2"); 105 private static final TableName TABLE_NAME3 = 106 TableName.valueOf("test3"); 107 private static final byte[] FAM_NAM = Bytes.toBytes("f"); 108 private static final byte[] ROW = Bytes.toBytes("bbb"); 109 private static final byte[] ROW_X = Bytes.toBytes("xxx"); 110 private static final int RPC_RETRY = 5; 111 112 @Rule 113 public TestName name = new TestName(); 114 115 @BeforeClass 116 public static void setUpBeforeClass() throws Exception { 117 ResourceLeakDetector.setLevel(Level.PARANOID); 118 TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true); 119 // Up the handlers; this test needs more than usual. 120 TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); 121 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY); 122 TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 3); 123 TEST_UTIL.startMiniCluster(2); 124 125 } 126 127 @AfterClass 128 public static void tearDownAfterClass() throws Exception { 129 TEST_UTIL.shutdownMiniCluster(); 130 } 131 132 @After 133 public void tearDown() throws IOException { 134 TEST_UTIL.getAdmin().balancerSwitch(true, true); 135 } 136 137 @Test 138 public void testClusterConnection() throws IOException { 139 ThreadPoolExecutor otherPool = new ThreadPoolExecutor(1, 1, 140 5, TimeUnit.SECONDS, 141 new SynchronousQueue<>(), 142 Threads.newDaemonThreadFactory("test-hcm")); 143 144 Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); 145 Connection con2 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), otherPool); 146 // make sure the internally created ExecutorService is the one passed 147 assertTrue(otherPool == ((ConnectionImplementation) con2).getCurrentBatchPool()); 148 149 final TableName tableName = TableName.valueOf(name.getMethodName()); 150 TEST_UTIL.createTable(tableName, FAM_NAM).close(); 151 Table table = con1.getTable(tableName, otherPool); 152 153 ExecutorService pool = null; 154 155 if(table instanceof HTable) { 156 HTable t = (HTable) table; 157 // make sure passing a pool to the getTable does not trigger creation of an internal pool 158 assertNull("Internal Thread pool should be null", 159 ((ConnectionImplementation) con1).getCurrentBatchPool()); 160 // table should use the pool passed 161 assertTrue(otherPool == t.getPool()); 162 t.close(); 163 164 t = (HTable) con2.getTable(tableName); 165 // table should use the connectin's internal pool 166 assertTrue(otherPool == t.getPool()); 167 t.close(); 168 169 t = (HTable) con2.getTable(tableName); 170 // try other API too 171 assertTrue(otherPool == t.getPool()); 172 t.close(); 173 174 t = (HTable) con2.getTable(tableName); 175 // try other API too 176 assertTrue(otherPool == t.getPool()); 177 t.close(); 178 179 t = (HTable) con1.getTable(tableName); 180 pool = ((ConnectionImplementation) con1).getCurrentBatchPool(); 181 // make sure an internal pool was created 182 assertNotNull("An internal Thread pool should have been created", pool); 183 // and that the table is using it 184 assertTrue(t.getPool() == pool); 185 t.close(); 186 187 t = (HTable) con1.getTable(tableName); 188 // still using the *same* internal pool 189 assertTrue(t.getPool() == pool); 190 t.close(); 191 } else { 192 table.close(); 193 } 194 195 con1.close(); 196 197 // if the pool was created on demand it should be closed upon connection close 198 if(pool != null) { 199 assertTrue(pool.isShutdown()); 200 } 201 202 con2.close(); 203 // if the pool is passed, it is not closed 204 assertFalse(otherPool.isShutdown()); 205 otherPool.shutdownNow(); 206 } 207 208 /** 209 * Naive test to check that Connection#getAdmin returns a properly constructed HBaseAdmin object 210 * @throws IOException Unable to construct admin 211 */ 212 @Test 213 public void testAdminFactory() throws IOException { 214 Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); 215 Admin admin = con1.getAdmin(); 216 assertTrue(admin.getConnection() == con1); 217 assertTrue(admin.getConfiguration() == TEST_UTIL.getConfiguration()); 218 con1.close(); 219 } 220 221 // Fails too often! Needs work. HBASE-12558 222 // May only fail on non-linux machines? E.g. macosx. 223 @Ignore @Test (expected = RegionServerStoppedException.class) 224 // Depends on mulitcast messaging facility that seems broken in hbase2 225 // See HBASE-19261 "ClusterStatusPublisher where Master could optionally broadcast notice of 226 // dead servers is broke" 227 public void testClusterStatus() throws Exception { 228 final TableName tableName = TableName.valueOf(name.getMethodName()); 229 byte[] cf = "cf".getBytes(); 230 byte[] rk = "rk1".getBytes(); 231 232 JVMClusterUtil.RegionServerThread rs = TEST_UTIL.getHBaseCluster().startRegionServer(); 233 rs.waitForServerOnline(); 234 final ServerName sn = rs.getRegionServer().getServerName(); 235 236 Table t = TEST_UTIL.createTable(tableName, cf); 237 TEST_UTIL.waitTableAvailable(tableName); 238 TEST_UTIL.waitUntilNoRegionsInTransition(); 239 240 final ConnectionImplementation hci = (ConnectionImplementation)TEST_UTIL.getConnection(); 241 try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName)) { 242 while (l.getRegionLocation(rk).getPort() != sn.getPort()) { 243 TEST_UTIL.getAdmin().move(l.getRegionLocation(rk).getRegionInfo().getEncodedNameAsBytes(), 244 sn); 245 TEST_UTIL.waitUntilNoRegionsInTransition(); 246 hci.clearRegionCache(tableName); 247 } 248 Assert.assertNotNull(hci.clusterStatusListener); 249 TEST_UTIL.assertRegionOnServer(l.getRegionLocation(rk).getRegionInfo(), sn, 20000); 250 } 251 252 Put p1 = new Put(rk); 253 p1.addColumn(cf, "qual".getBytes(), "val".getBytes()); 254 t.put(p1); 255 256 rs.getRegionServer().abort("I'm dead"); 257 258 // We want the status to be updated. That's a least 10 second 259 TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() { 260 @Override 261 public boolean evaluate() throws Exception { 262 return TEST_UTIL.getHBaseCluster().getMaster().getServerManager(). 263 getDeadServers().isDeadServer(sn); 264 } 265 }); 266 267 TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() { 268 @Override 269 public boolean evaluate() throws Exception { 270 return hci.clusterStatusListener.isDeadServer(sn); 271 } 272 }); 273 274 t.close(); 275 hci.getClient(sn); // will throw an exception: RegionServerStoppedException 276 } 277 278 /** 279 * Test that we can handle connection close: it will trigger a retry, but the calls will finish. 280 */ 281 @Test 282 public void testConnectionCloseAllowsInterrupt() throws Exception { 283 testConnectionClose(true); 284 } 285 286 @Test 287 public void testConnectionNotAllowsInterrupt() throws Exception { 288 testConnectionClose(false); 289 } 290 291 private void testConnectionClose(boolean allowsInterrupt) throws Exception { 292 TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt); 293 TEST_UTIL.createTable(tableName, FAM_NAM).close(); 294 295 TEST_UTIL.getAdmin().balancerSwitch(false, true); 296 297 Configuration c2 = new Configuration(TEST_UTIL.getConfiguration()); 298 // We want to work on a separate connection. 299 c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); 300 c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); // retry a lot 301 c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); // don't wait between retries. 302 c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // Server do not really expire 303 c2.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, allowsInterrupt); 304 // to avoid the client to be stuck when do the Get 305 c2.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 10000); 306 c2.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 10000); 307 c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 5000); 308 309 Connection connection = ConnectionFactory.createConnection(c2); 310 final Table table = connection.getTable(tableName); 311 312 Put put = new Put(ROW); 313 put.addColumn(FAM_NAM, ROW, ROW); 314 table.put(put); 315 316 // 4 steps: ready=0; doGets=1; mustStop=2; stopped=3 317 final AtomicInteger step = new AtomicInteger(0); 318 319 final AtomicReference<Throwable> failed = new AtomicReference<>(null); 320 Thread t = new Thread("testConnectionCloseThread") { 321 @Override 322 public void run() { 323 int done = 0; 324 try { 325 step.set(1); 326 while (step.get() == 1) { 327 Get get = new Get(ROW); 328 table.get(get); 329 done++; 330 if (done % 100 == 0) 331 LOG.info("done=" + done); 332 // without the sleep, will cause the exception for too many files in 333 // org.apache.hadoop.hdfs.server.datanode.DataXceiver 334 Thread.sleep(100); 335 } 336 } catch (Throwable t) { 337 failed.set(t); 338 LOG.error(t.toString(), t); 339 } 340 step.set(3); 341 } 342 }; 343 t.start(); 344 TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() { 345 @Override 346 public boolean evaluate() throws Exception { 347 return step.get() == 1; 348 } 349 }); 350 351 ServerName sn; 352 try(RegionLocator rl = connection.getRegionLocator(tableName)) { 353 sn = rl.getRegionLocation(ROW).getServerName(); 354 } 355 ConnectionImplementation conn = 356 (ConnectionImplementation) connection; 357 RpcClient rpcClient = conn.getRpcClient(); 358 359 LOG.info("Going to cancel connections. connection=" + conn.toString() + ", sn=" + sn); 360 for (int i = 0; i < 500; i++) { 361 rpcClient.cancelConnections(sn); 362 Thread.sleep(50); 363 } 364 365 step.compareAndSet(1, 2); 366 // The test may fail here if the thread doing the gets is stuck. The way to find 367 // out what's happening is to look for the thread named 'testConnectionCloseThread' 368 TEST_UTIL.waitFor(40000, new Waiter.Predicate<Exception>() { 369 @Override 370 public boolean evaluate() throws Exception { 371 return step.get() == 3; 372 } 373 }); 374 table.close(); 375 connection.close(); 376 Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null); 377 } 378 379 /** 380 * Test that connection can become idle without breaking everything. 381 */ 382 @Test 383 public void testConnectionIdle() throws Exception { 384 final TableName tableName = TableName.valueOf(name.getMethodName()); 385 TEST_UTIL.createTable(tableName, FAM_NAM).close(); 386 int idleTime = 20000; 387 boolean previousBalance = TEST_UTIL.getAdmin().setBalancerRunning(false, true); 388 389 Configuration c2 = new Configuration(TEST_UTIL.getConfiguration()); 390 // We want to work on a separate connection. 391 c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); 392 c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // Don't retry: retry = test failed 393 c2.setInt(RpcClient.IDLE_TIME, idleTime); 394 395 Connection connection = ConnectionFactory.createConnection(c2); 396 final Table table = connection.getTable(tableName); 397 398 Put put = new Put(ROW); 399 put.addColumn(FAM_NAM, ROW, ROW); 400 table.put(put); 401 402 ManualEnvironmentEdge mee = new ManualEnvironmentEdge(); 403 mee.setValue(System.currentTimeMillis()); 404 EnvironmentEdgeManager.injectEdge(mee); 405 LOG.info("first get"); 406 table.get(new Get(ROW)); 407 408 LOG.info("first get - changing the time & sleeping"); 409 mee.incValue(idleTime + 1000); 410 Thread.sleep(1500); // we need to wait a little for the connection to be seen as idle. 411 // 1500 = sleep time in RpcClient#waitForWork + a margin 412 413 LOG.info("second get - connection has been marked idle in the middle"); 414 // To check that the connection actually became idle would need to read some private 415 // fields of RpcClient. 416 table.get(new Get(ROW)); 417 mee.incValue(idleTime + 1000); 418 419 LOG.info("third get - connection is idle, but the reader doesn't know yet"); 420 // We're testing here a special case: 421 // time limit reached BUT connection not yet reclaimed AND a new call. 422 // in this situation, we don't close the connection, instead we use it immediately. 423 // If we're very unlucky we can have a race condition in the test: the connection is already 424 // under closing when we do the get, so we have an exception, and we don't retry as the 425 // retry number is 1. The probability is very very low, and seems acceptable for now. It's 426 // a test issue only. 427 table.get(new Get(ROW)); 428 429 LOG.info("we're done - time will change back"); 430 431 table.close(); 432 433 connection.close(); 434 EnvironmentEdgeManager.reset(); 435 TEST_UTIL.getAdmin().setBalancerRunning(previousBalance, true); 436 } 437 438 /** 439 * Test that the connection to the dead server is cut immediately when we receive the 440 * notification. 441 * @throws Exception 442 */ 443 @Test 444 public void testConnectionCut() throws Exception { 445 final TableName tableName = TableName.valueOf(name.getMethodName()); 446 447 TEST_UTIL.createTable(tableName, FAM_NAM).close(); 448 boolean previousBalance = TEST_UTIL.getAdmin().setBalancerRunning(false, true); 449 450 Configuration c2 = new Configuration(TEST_UTIL.getConfiguration()); 451 // We want to work on a separate connection. 452 c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); 453 // try only once w/o any retry 454 c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); 455 c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 1000); 456 457 final Connection connection = ConnectionFactory.createConnection(c2); 458 final Table table = connection.getTable(tableName); 459 460 Put p = new Put(FAM_NAM); 461 p.addColumn(FAM_NAM, FAM_NAM, FAM_NAM); 462 table.put(p); 463 464 final ConnectionImplementation hci = (ConnectionImplementation) connection; 465 466 final HRegionLocation loc; 467 try(RegionLocator rl = connection.getRegionLocator(tableName)) { 468 loc = rl.getRegionLocation(FAM_NAM); 469 } 470 471 Get get = new Get(FAM_NAM); 472 Assert.assertNotNull(table.get(get)); 473 474 get = new Get(FAM_NAM); 475 get.setFilter(new BlockingFilter()); 476 477 // This thread will mark the server as dead while we're waiting during a get. 478 Thread t = new Thread() { 479 @Override 480 public void run() { 481 synchronized (syncBlockingFilter) { 482 try { 483 syncBlockingFilter.wait(); 484 } catch (InterruptedException e) { 485 throw new RuntimeException(e); 486 } 487 } 488 hci.clusterStatusListener.deadServerHandler.newDead(loc.getServerName()); 489 } 490 }; 491 492 t.start(); 493 try { 494 table.get(get); 495 Assert.fail(); 496 } catch (IOException expected) { 497 LOG.debug("Received: " + expected); 498 Assert.assertFalse(expected instanceof SocketTimeoutException); 499 Assert.assertFalse(syncBlockingFilter.get()); 500 } finally { 501 syncBlockingFilter.set(true); 502 t.join(); 503 TEST_UTIL.getAdmin().setBalancerRunning(previousBalance, true); 504 } 505 506 table.close(); 507 connection.close(); 508 } 509 510 protected static final AtomicBoolean syncBlockingFilter = new AtomicBoolean(false); 511 512 public static class BlockingFilter extends FilterBase { 513 @Override 514 public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException { 515 int i = 0; 516 while (i++ < 1000 && !syncBlockingFilter.get()) { 517 synchronized (syncBlockingFilter) { 518 syncBlockingFilter.notifyAll(); 519 } 520 Threads.sleep(100); 521 } 522 syncBlockingFilter.set(true); 523 return false; 524 } 525 @Override 526 public ReturnCode filterCell(final Cell ignored) throws IOException { 527 return ReturnCode.INCLUDE; 528 } 529 530 public static Filter parseFrom(final byte [] pbBytes) throws DeserializationException{ 531 return new BlockingFilter(); 532 } 533 } 534 535 /** 536 * Test that when we delete a location using the first row of a region 537 * that we really delete it. 538 * @throws Exception 539 */ 540 @Test 541 public void testRegionCaching() throws Exception { 542 TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAM_NAM).close(); 543 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 544 // test with no retry, or client cache will get updated after the first failure 545 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); 546 Connection connection = ConnectionFactory.createConnection(conf); 547 final Table table = connection.getTable(TABLE_NAME); 548 549 TEST_UTIL.waitUntilAllRegionsAssigned(table.getName()); 550 Put put = new Put(ROW); 551 put.addColumn(FAM_NAM, ROW, ROW); 552 table.put(put); 553 554 ConnectionImplementation conn = (ConnectionImplementation) connection; 555 556 assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW)); 557 558 // Here we mess with the cached location making it so the region at TABLE_NAME, ROW is at 559 // a location where the port is current port number +1 -- i.e. a non-existent location. 560 HRegionLocation loc = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation(); 561 final int nextPort = loc.getPort() + 1; 562 conn.updateCachedLocation(loc.getRegionInfo(), loc.getServerName(), 563 ServerName.valueOf("127.0.0.1", nextPort, 564 HConstants.LATEST_TIMESTAMP), HConstants.LATEST_TIMESTAMP); 565 Assert.assertEquals(conn.getCachedLocation(TABLE_NAME, ROW) 566 .getRegionLocation().getPort(), nextPort); 567 568 conn.clearRegionCache(TABLE_NAME, ROW.clone()); 569 RegionLocations rl = conn.getCachedLocation(TABLE_NAME, ROW); 570 assertNull("What is this location?? " + rl, rl); 571 572 // We're now going to move the region and check that it works for the client 573 // First a new put to add the location in the cache 574 conn.clearRegionCache(TABLE_NAME); 575 Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME)); 576 Put put2 = new Put(ROW); 577 put2.addColumn(FAM_NAM, ROW, ROW); 578 table.put(put2); 579 assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW)); 580 assertNotNull(conn.getCachedLocation(TableName.valueOf(TABLE_NAME.getName()), ROW.clone())); 581 582 TEST_UTIL.getAdmin().setBalancerRunning(false, false); 583 HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); 584 585 // We can wait for all regions to be online, that makes log reading easier when debugging 586 TEST_UTIL.waitUntilNoRegionsInTransition(); 587 588 // Now moving the region to the second server 589 HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation(); 590 byte[] regionName = toMove.getRegionInfo().getRegionName(); 591 byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes(); 592 593 // Choose the other server. 594 int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName); 595 int destServerId = curServerId == 0? 1: 0; 596 597 HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId); 598 HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId); 599 600 ServerName destServerName = destServer.getServerName(); 601 602 // Check that we are in the expected state 603 Assert.assertTrue(curServer != destServer); 604 Assert.assertFalse(curServer.getServerName().equals(destServer.getServerName())); 605 Assert.assertFalse( toMove.getPort() == destServerName.getPort()); 606 Assert.assertNotNull(curServer.getOnlineRegion(regionName)); 607 Assert.assertNull(destServer.getOnlineRegion(regionName)); 608 Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster(). 609 getAssignmentManager().hasRegionsInTransition()); 610 611 // Moving. It's possible that we don't have all the regions online at this point, so 612 // the test must depend only on the region we're looking at. 613 LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString()); 614 TEST_UTIL.getAdmin().move(toMove.getRegionInfo().getEncodedNameAsBytes(), destServerName); 615 616 while (destServer.getOnlineRegion(regionName) == null || 617 destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || 618 curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || 619 master.getAssignmentManager().hasRegionsInTransition()) { 620 // wait for the move to be finished 621 Thread.sleep(1); 622 } 623 624 LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString()); 625 626 // Check our new state. 627 Assert.assertNull(curServer.getOnlineRegion(regionName)); 628 Assert.assertNotNull(destServer.getOnlineRegion(regionName)); 629 Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); 630 Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); 631 632 633 // Cache was NOT updated and points to the wrong server 634 Assert.assertFalse( 635 conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation() 636 .getPort() == destServerName.getPort()); 637 638 // This part relies on a number of tries equals to 1. 639 // We do a put and expect the cache to be updated, even if we don't retry 640 LOG.info("Put starting"); 641 Put put3 = new Put(ROW); 642 put3.addColumn(FAM_NAM, ROW, ROW); 643 try { 644 table.put(put3); 645 Assert.fail("Unreachable point"); 646 } catch (RetriesExhaustedWithDetailsException e) { 647 LOG.info("Put done, exception caught: " + e.getClass()); 648 Assert.assertEquals(1, e.getNumExceptions()); 649 Assert.assertEquals(1, e.getCauses().size()); 650 Assert.assertArrayEquals(ROW, e.getRow(0).getRow()); 651 652 // Check that we unserialized the exception as expected 653 Throwable cause = ClientExceptionsUtil.findException(e.getCause(0)); 654 Assert.assertNotNull(cause); 655 Assert.assertTrue(cause instanceof RegionMovedException); 656 } catch (RetriesExhaustedException ree) { 657 // hbase2 throws RetriesExhaustedException instead of RetriesExhaustedWithDetailsException 658 // as hbase1 used to do. Keep an eye on this to see if this changed behavior is an issue. 659 LOG.info("Put done, exception caught: " + ree.getClass()); 660 Throwable cause = ClientExceptionsUtil.findException(ree.getCause()); 661 Assert.assertNotNull(cause); 662 Assert.assertTrue(cause instanceof RegionMovedException); 663 } 664 Assert.assertNotNull("Cached connection is null", conn.getCachedLocation(TABLE_NAME, ROW)); 665 Assert.assertEquals( 666 "Previous server was " + curServer.getServerName().getHostAndPort(), 667 destServerName.getPort(), 668 conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort()); 669 670 Assert.assertFalse(destServer.getRegionsInTransitionInRS() 671 .containsKey(encodedRegionNameBytes)); 672 Assert.assertFalse(curServer.getRegionsInTransitionInRS() 673 .containsKey(encodedRegionNameBytes)); 674 675 // We move it back to do another test with a scan 676 LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString()); 677 TEST_UTIL.getAdmin().move(toMove.getRegionInfo().getEncodedNameAsBytes(), 678 curServer.getServerName()); 679 680 while (curServer.getOnlineRegion(regionName) == null || 681 destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || 682 curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || 683 master.getAssignmentManager().hasRegionsInTransition()) { 684 // wait for the move to be finished 685 Thread.sleep(1); 686 } 687 688 // Check our new state. 689 Assert.assertNotNull(curServer.getOnlineRegion(regionName)); 690 Assert.assertNull(destServer.getOnlineRegion(regionName)); 691 LOG.info("Move finished for region=" + toMove.getRegionInfo().getRegionNameAsString()); 692 693 // Cache was NOT updated and points to the wrong server 694 Assert.assertFalse(conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort() == 695 curServer.getServerName().getPort()); 696 697 Scan sc = new Scan(); 698 sc.setStopRow(ROW); 699 sc.setStartRow(ROW); 700 701 // The scanner takes the max retries from the connection configuration, not the table as 702 // the put. 703 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 704 705 try { 706 ResultScanner rs = table.getScanner(sc); 707 while (rs.next() != null) { 708 } 709 Assert.fail("Unreachable point"); 710 } catch (RetriesExhaustedException e) { 711 LOG.info("Scan done, expected exception caught: " + e.getClass()); 712 } 713 714 // Cache is updated with the right value. 715 Assert.assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW)); 716 Assert.assertEquals( 717 "Previous server was "+destServer.getServerName().getHostAndPort(), 718 curServer.getServerName().getPort(), 719 conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort()); 720 721 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY); 722 table.close(); 723 connection.close(); 724 } 725 726 /** 727 * Test that Connection or Pool are not closed when managed externally 728 * @throws Exception 729 */ 730 @Test 731 public void testConnectionManagement() throws Exception{ 732 Table table0 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAM); 733 Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); 734 Table table = conn.getTable(TABLE_NAME1); 735 table.close(); 736 assertFalse(conn.isClosed()); 737 if(table instanceof HTable) { 738 assertFalse(((HTable) table).getPool().isShutdown()); 739 } 740 table = conn.getTable(TABLE_NAME1); 741 table.close(); 742 if(table instanceof HTable) { 743 assertFalse(((HTable) table).getPool().isShutdown()); 744 } 745 conn.close(); 746 if(table instanceof HTable) { 747 assertTrue(((HTable) table).getPool().isShutdown()); 748 } 749 table0.close(); 750 } 751 752 /** 753 * Test that stale cache updates don't override newer cached values. 754 */ 755 @Test 756 public void testCacheSeqNums() throws Exception{ 757 Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME2, FAM_NAM); 758 Put put = new Put(ROW); 759 put.addColumn(FAM_NAM, ROW, ROW); 760 table.put(put); 761 ConnectionImplementation conn = (ConnectionImplementation) TEST_UTIL.getConnection(); 762 763 HRegionLocation location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation(); 764 assertNotNull(location); 765 766 ServerName anySource = ServerName.valueOf(location.getHostname(), location.getPort() - 1, 0L); 767 768 // Same server as already in cache reporting - overwrites any value despite seqNum. 769 int nextPort = location.getPort() + 1; 770 conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(), 771 ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1); 772 location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation(); 773 Assert.assertEquals(nextPort, location.getPort()); 774 775 // No source specified - same. 776 nextPort = location.getPort() + 1; 777 conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(), 778 ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1); 779 location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation(); 780 Assert.assertEquals(nextPort, location.getPort()); 781 782 // Higher seqNum - overwrites lower seqNum. 783 nextPort = location.getPort() + 1; 784 conn.updateCachedLocation(location.getRegionInfo(), anySource, 785 ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() + 1); 786 location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation(); 787 Assert.assertEquals(nextPort, location.getPort()); 788 789 // Lower seqNum - does not overwrite higher seqNum. 790 nextPort = location.getPort() + 1; 791 conn.updateCachedLocation(location.getRegionInfo(), anySource, 792 ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1); 793 location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation(); 794 Assert.assertEquals(nextPort - 1, location.getPort()); 795 table.close(); 796 } 797 798 @Test 799 public void testClosing() throws Exception { 800 Configuration configuration = 801 new Configuration(TEST_UTIL.getConfiguration()); 802 configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID, 803 String.valueOf(ThreadLocalRandom.current().nextInt())); 804 805 // as connection caching is going away, now we're just testing 806 // that closed connection does actually get closed. 807 808 Connection c1 = ConnectionFactory.createConnection(configuration); 809 Connection c2 = ConnectionFactory.createConnection(configuration); 810 // no caching, different connections 811 assertTrue(c1 != c2); 812 813 // closing independently 814 c1.close(); 815 assertTrue(c1.isClosed()); 816 assertFalse(c2.isClosed()); 817 818 c2.close(); 819 assertTrue(c2.isClosed()); 820 } 821 822 /** 823 * Trivial test to verify that nobody messes with 824 * {@link ConnectionFactory#createConnection(Configuration)} 825 */ 826 @Test 827 public void testCreateConnection() throws Exception { 828 Configuration configuration = TEST_UTIL.getConfiguration(); 829 Connection c1 = ConnectionFactory.createConnection(configuration); 830 Connection c2 = ConnectionFactory.createConnection(configuration); 831 // created from the same configuration, yet they are different 832 assertTrue(c1 != c2); 833 assertTrue(c1.getConfiguration() == c2.getConfiguration()); 834 } 835 836 /** 837 * This test checks that one can connect to the cluster with only the 838 * ZooKeeper quorum set. Other stuff like master address will be read 839 * from ZK by the client. 840 */ 841 @Test 842 public void testConnection() throws Exception{ 843 // We create an empty config and add the ZK address. 844 Configuration c = new Configuration(); 845 // This test only makes sense for ZK based connection registry. 846 c.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 847 HConstants.ZK_CONNECTION_REGISTRY_CLASS); 848 c.set(HConstants.ZOOKEEPER_QUORUM, 849 TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM)); 850 c.set(HConstants.ZOOKEEPER_CLIENT_PORT, 851 TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_CLIENT_PORT)); 852 // This should be enough to connect 853 ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(c); 854 assertTrue(conn.isMasterRunning()); 855 conn.close(); 856 } 857 858 private int setNumTries(ConnectionImplementation hci, int newVal) throws Exception { 859 Field numTries = hci.getClass().getDeclaredField("numTries"); 860 numTries.setAccessible(true); 861 Field modifiersField = Field.class.getDeclaredField("modifiers"); 862 modifiersField.setAccessible(true); 863 modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL); 864 final int prevNumRetriesVal = (Integer)numTries.get(hci); 865 numTries.set(hci, newVal); 866 867 return prevNumRetriesVal; 868 } 869 870 @Test 871 public void testMulti() throws Exception { 872 Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME3, FAM_NAM); 873 try { 874 ConnectionImplementation conn = (ConnectionImplementation)TEST_UTIL.getConnection(); 875 876 // We're now going to move the region and check that it works for the client 877 // First a new put to add the location in the cache 878 conn.clearRegionCache(TABLE_NAME3); 879 Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME3)); 880 881 TEST_UTIL.getAdmin().setBalancerRunning(false, false); 882 HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); 883 884 // We can wait for all regions to be online, that makes log reading easier when debugging 885 TEST_UTIL.waitUntilNoRegionsInTransition(); 886 887 Put put = new Put(ROW_X); 888 put.addColumn(FAM_NAM, ROW_X, ROW_X); 889 table.put(put); 890 891 // Now moving the region to the second server 892 HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation(); 893 byte[] regionName = toMove.getRegionInfo().getRegionName(); 894 byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes(); 895 896 // Choose the other server. 897 int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName); 898 int destServerId = (curServerId == 0 ? 1 : 0); 899 900 HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId); 901 HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId); 902 903 ServerName destServerName = destServer.getServerName(); 904 ServerName metaServerName = TEST_UTIL.getHBaseCluster().getServerHoldingMeta(); 905 906 //find another row in the cur server that is less than ROW_X 907 List<HRegion> regions = curServer.getRegions(TABLE_NAME3); 908 byte[] otherRow = null; 909 for (Region region : regions) { 910 if (!region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName()) 911 && Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0) { 912 otherRow = region.getRegionInfo().getStartKey(); 913 break; 914 } 915 } 916 assertNotNull(otherRow); 917 // If empty row, set it to first row.-f 918 if (otherRow.length <= 0) otherRow = Bytes.toBytes("aaa"); 919 Put put2 = new Put(otherRow); 920 put2.addColumn(FAM_NAM, otherRow, otherRow); 921 table.put(put2); //cache put2's location 922 923 // Check that we are in the expected state 924 Assert.assertTrue(curServer != destServer); 925 Assert.assertNotEquals(curServer.getServerName(), destServer.getServerName()); 926 Assert.assertNotEquals(toMove.getPort(), destServerName.getPort()); 927 Assert.assertNotNull(curServer.getOnlineRegion(regionName)); 928 Assert.assertNull(destServer.getOnlineRegion(regionName)); 929 Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster(). 930 getAssignmentManager().hasRegionsInTransition()); 931 932 // Moving. It's possible that we don't have all the regions online at this point, so 933 // the test depends only on the region we're looking at. 934 LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString()); 935 TEST_UTIL.getAdmin().move(toMove.getRegionInfo().getEncodedNameAsBytes(), destServerName); 936 937 while (destServer.getOnlineRegion(regionName) == null || 938 destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || 939 curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || 940 master.getAssignmentManager().hasRegionsInTransition()) { 941 // wait for the move to be finished 942 Thread.sleep(1); 943 } 944 945 LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString()); 946 947 // Check our new state. 948 Assert.assertNull(curServer.getOnlineRegion(regionName)); 949 Assert.assertNotNull(destServer.getOnlineRegion(regionName)); 950 Assert.assertFalse(destServer.getRegionsInTransitionInRS() 951 .containsKey(encodedRegionNameBytes)); 952 Assert.assertFalse(curServer.getRegionsInTransitionInRS() 953 .containsKey(encodedRegionNameBytes)); 954 955 956 // Cache was NOT updated and points to the wrong server 957 Assert.assertFalse( 958 conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation() 959 .getPort() == destServerName.getPort()); 960 961 // Hijack the number of retry to fail after 2 tries 962 final int prevNumRetriesVal = setNumTries(conn, 2); 963 964 Put put3 = new Put(ROW_X); 965 put3.addColumn(FAM_NAM, ROW_X, ROW_X); 966 Put put4 = new Put(otherRow); 967 put4.addColumn(FAM_NAM, otherRow, otherRow); 968 969 // do multi 970 ArrayList<Put> actions = Lists.newArrayList(put4, put3); 971 table.batch(actions, null); // first should be a valid row, 972 // second we get RegionMovedException. 973 974 setNumTries(conn, prevNumRetriesVal); 975 } finally { 976 table.close(); 977 } 978 } 979 980 @Test 981 public void testErrorBackoffTimeCalculation() throws Exception { 982 // TODO: This test would seem to presume hardcoded RETRY_BACKOFF which it should not. 983 final long ANY_PAUSE = 100; 984 ServerName location = ServerName.valueOf("127.0.0.1", 1, 0); 985 ServerName diffLocation = ServerName.valueOf("127.0.0.1", 2, 0); 986 987 ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge(); 988 EnvironmentEdgeManager.injectEdge(timeMachine); 989 try { 990 long largeAmountOfTime = ANY_PAUSE * 1000; 991 ConnectionImplementation.ServerErrorTracker tracker = 992 new ConnectionImplementation.ServerErrorTracker(largeAmountOfTime, 100); 993 994 // The default backoff is 0. 995 assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE)); 996 997 // Check some backoff values from HConstants sequence. 998 tracker.reportServerError(location); 999 assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0], 1000 tracker.calculateBackoffTime(location, ANY_PAUSE)); 1001 tracker.reportServerError(location); 1002 tracker.reportServerError(location); 1003 tracker.reportServerError(location); 1004 assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[3], 1005 tracker.calculateBackoffTime(location, ANY_PAUSE)); 1006 1007 // All of this shouldn't affect backoff for different location. 1008 assertEquals(0, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE)); 1009 tracker.reportServerError(diffLocation); 1010 assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0], 1011 tracker.calculateBackoffTime(diffLocation, ANY_PAUSE)); 1012 1013 // Check with different base. 1014 assertEqualsWithJitter(ANY_PAUSE * 2 * HConstants.RETRY_BACKOFF[3], 1015 tracker.calculateBackoffTime(location, ANY_PAUSE * 2)); 1016 } finally { 1017 EnvironmentEdgeManager.reset(); 1018 } 1019 } 1020 1021 private static void assertEqualsWithJitter(long expected, long actual) { 1022 assertEqualsWithJitter(expected, actual, expected); 1023 } 1024 1025 private static void assertEqualsWithJitter(long expected, long actual, long jitterBase) { 1026 assertTrue("Value not within jitter: " + expected + " vs " + actual, 1027 Math.abs(actual - expected) <= (0.01f * jitterBase)); 1028 } 1029 1030 @Test 1031 public void testConnectionRideOverClusterRestart() throws IOException, InterruptedException { 1032 Configuration config = new Configuration(TEST_UTIL.getConfiguration()); 1033 // This test only makes sense for ZK based connection registry. 1034 config.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 1035 HConstants.ZK_CONNECTION_REGISTRY_CLASS); 1036 1037 final TableName tableName = TableName.valueOf(name.getMethodName()); 1038 TEST_UTIL.createTable(tableName, new byte[][] {FAM_NAM}).close(); 1039 1040 Connection connection = ConnectionFactory.createConnection(config); 1041 Table table = connection.getTable(tableName); 1042 1043 // this will cache the meta location and table's region location 1044 table.get(new Get(Bytes.toBytes("foo"))); 1045 1046 // restart HBase 1047 TEST_UTIL.shutdownMiniHBaseCluster(); 1048 TEST_UTIL.restartHBaseCluster(2); 1049 // this should be able to discover new locations for meta and table's region 1050 table.get(new Get(Bytes.toBytes("foo"))); 1051 TEST_UTIL.deleteTable(tableName); 1052 table.close(); 1053 connection.close(); 1054 } 1055 1056 @Test 1057 public void testLocateRegionsWithRegionReplicas() throws IOException { 1058 int regionReplication = 3; 1059 byte[] family = Bytes.toBytes("cf"); 1060 TableName tableName = TableName.valueOf(name.getMethodName()); 1061 1062 // Create a table with region replicas 1063 TableDescriptorBuilder builder = TableDescriptorBuilder 1064 .newBuilder(tableName) 1065 .setRegionReplication(regionReplication) 1066 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)); 1067 TEST_UTIL.getAdmin().createTable(builder.build()); 1068 1069 try (ConnectionImplementation con = (ConnectionImplementation) ConnectionFactory. 1070 createConnection(TEST_UTIL.getConfiguration())) { 1071 // Get locations of the regions of the table 1072 List<HRegionLocation> locations = con.locateRegions(tableName, false, false); 1073 1074 // The size of the returned locations should be 3 1075 assertEquals(regionReplication, locations.size()); 1076 1077 // The replicaIds of the returned locations should be 0, 1 and 2 1078 Set<Integer> expectedReplicaIds = IntStream.range(0, regionReplication). 1079 boxed().collect(Collectors.toSet()); 1080 for (HRegionLocation location : locations) { 1081 assertTrue(expectedReplicaIds.remove(location.getRegion().getReplicaId())); 1082 } 1083 } finally { 1084 TEST_UTIL.deleteTable(tableName); 1085 } 1086 } 1087 1088 @Test 1089 public void testMetaLookupThreadPoolCreated() throws Exception { 1090 final TableName tableName = TableName.valueOf(name.getMethodName()); 1091 byte[][] FAMILIES = new byte[][] { Bytes.toBytes("foo") }; 1092 if (TEST_UTIL.getAdmin().tableExists(tableName)) { 1093 TEST_UTIL.getAdmin().disableTable(tableName); 1094 TEST_UTIL.getAdmin().deleteTable(tableName); 1095 } 1096 try (Table htable = TEST_UTIL.createTable(tableName, FAMILIES)) { 1097 byte[] row = Bytes.toBytes("test"); 1098 ConnectionImplementation c = ((ConnectionImplementation) TEST_UTIL.getConnection()); 1099 // check that metalookup pool would get created 1100 c.relocateRegion(tableName, row); 1101 ExecutorService ex = c.getCurrentMetaLookupPool(); 1102 assertNotNull(ex); 1103 } 1104 } 1105 1106 // There is no assertion, but you need to confirm that there is no resource leak output from netty 1107 @Test 1108 public void testCancelConnectionMemoryLeak() throws IOException, InterruptedException { 1109 TableName tableName = TableName.valueOf(name.getMethodName()); 1110 TEST_UTIL.createTable(tableName, FAM_NAM).close(); 1111 TEST_UTIL.getAdmin().balancerSwitch(false, true); 1112 try (Connection connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); 1113 Table table = connection.getTable(tableName)) { 1114 table.get(new Get(Bytes.toBytes("1"))); 1115 ServerName sn = TEST_UTIL.getRSForFirstRegionInTable(tableName).getServerName(); 1116 RpcClient rpcClient = ((ConnectionImplementation) connection).getRpcClient(); 1117 rpcClient.cancelConnections(sn); 1118 Thread.sleep(1000); 1119 System.gc(); 1120 Thread.sleep(1000); 1121 } 1122 } 1123}