001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertNull; 024import static org.junit.Assert.assertTrue; 025 026import java.io.IOException; 027import java.lang.reflect.Field; 028import java.lang.reflect.Modifier; 029import java.net.SocketTimeoutException; 030import java.util.ArrayList; 031import java.util.List; 032import java.util.Set; 033import java.util.concurrent.ExecutorService; 034import java.util.concurrent.SynchronousQueue; 035import java.util.concurrent.ThreadLocalRandom; 036import java.util.concurrent.ThreadPoolExecutor; 037import java.util.concurrent.TimeUnit; 038import java.util.concurrent.atomic.AtomicBoolean; 039import java.util.concurrent.atomic.AtomicInteger; 040import java.util.concurrent.atomic.AtomicReference; 041import java.util.stream.Collectors; 042import java.util.stream.IntStream; 043import org.apache.hadoop.conf.Configuration; 044import org.apache.hadoop.hbase.Cell; 045import org.apache.hadoop.hbase.HBaseClassTestRule; 046import org.apache.hadoop.hbase.HBaseTestingUtility; 047import org.apache.hadoop.hbase.HConstants; 048import org.apache.hadoop.hbase.HRegionLocation; 049import org.apache.hadoop.hbase.RegionLocations; 050import org.apache.hadoop.hbase.ServerName; 051import org.apache.hadoop.hbase.TableName; 052import org.apache.hadoop.hbase.Waiter; 053import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 054import org.apache.hadoop.hbase.exceptions.DeserializationException; 055import org.apache.hadoop.hbase.exceptions.RegionMovedException; 056import org.apache.hadoop.hbase.filter.Filter; 057import org.apache.hadoop.hbase.filter.FilterBase; 058import org.apache.hadoop.hbase.ipc.RpcClient; 059import org.apache.hadoop.hbase.master.HMaster; 060import org.apache.hadoop.hbase.regionserver.HRegion; 061import org.apache.hadoop.hbase.regionserver.HRegionServer; 062import org.apache.hadoop.hbase.regionserver.Region; 063import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 064import org.apache.hadoop.hbase.testclassification.LargeTests; 065import org.apache.hadoop.hbase.util.Bytes; 066import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 067import org.apache.hadoop.hbase.util.JVMClusterUtil; 068import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 069import org.apache.hadoop.hbase.util.Threads; 070import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 071import org.junit.After; 072import org.junit.AfterClass; 073import org.junit.Assert; 074import org.junit.BeforeClass; 075import org.junit.ClassRule; 076import org.junit.Ignore; 077import org.junit.Rule; 078import org.junit.Test; 079import org.junit.experimental.categories.Category; 080import org.junit.rules.TestName; 081import org.slf4j.Logger; 082import org.slf4j.LoggerFactory; 083 084import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 085import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector; 086import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector.Level; 087 088/** 089 * This class is for testing HBaseConnectionManager features 090 */ 091@Category({LargeTests.class}) 092public class TestConnectionImplementation { 093 094 @ClassRule 095 public static final HBaseClassTestRule CLASS_RULE = 096 HBaseClassTestRule.forClass(TestConnectionImplementation.class); 097 098 private static final Logger LOG = LoggerFactory.getLogger(TestConnectionImplementation.class); 099 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 100 private static final TableName TABLE_NAME = 101 TableName.valueOf("test"); 102 private static final TableName TABLE_NAME1 = 103 TableName.valueOf("test1"); 104 private static final TableName TABLE_NAME2 = 105 TableName.valueOf("test2"); 106 private static final TableName TABLE_NAME3 = 107 TableName.valueOf("test3"); 108 private static final byte[] FAM_NAM = Bytes.toBytes("f"); 109 private static final byte[] ROW = Bytes.toBytes("bbb"); 110 private static final byte[] ROW_X = Bytes.toBytes("xxx"); 111 private static final int RPC_RETRY = 5; 112 113 @Rule 114 public TestName name = new TestName(); 115 116 @BeforeClass 117 public static void setUpBeforeClass() throws Exception { 118 ResourceLeakDetector.setLevel(Level.PARANOID); 119 TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true); 120 // Up the handlers; this test needs more than usual. 121 TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); 122 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY); 123 TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 3); 124 TEST_UTIL.startMiniCluster(2); 125 126 } 127 128 @AfterClass 129 public static void tearDownAfterClass() throws Exception { 130 TEST_UTIL.shutdownMiniCluster(); 131 } 132 133 @After 134 public void tearDown() throws IOException { 135 TEST_UTIL.getAdmin().balancerSwitch(true, true); 136 } 137 138 @Test 139 public void testClusterConnection() throws IOException { 140 ThreadPoolExecutor otherPool = 141 new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new SynchronousQueue<>(), 142 new ThreadFactoryBuilder().setNameFormat("test-hcm-pool-%d") 143 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 144 145 Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); 146 Connection con2 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), otherPool); 147 // make sure the internally created ExecutorService is the one passed 148 assertTrue(otherPool == ((ConnectionImplementation) con2).getCurrentBatchPool()); 149 150 final TableName tableName = TableName.valueOf(name.getMethodName()); 151 TEST_UTIL.createTable(tableName, FAM_NAM).close(); 152 Table table = con1.getTable(tableName, otherPool); 153 154 ExecutorService pool = null; 155 156 if(table instanceof HTable) { 157 HTable t = (HTable) table; 158 // make sure passing a pool to the getTable does not trigger creation of an internal pool 159 assertNull("Internal Thread pool should be null", 160 ((ConnectionImplementation) con1).getCurrentBatchPool()); 161 // table should use the pool passed 162 assertTrue(otherPool == t.getPool()); 163 t.close(); 164 165 t = (HTable) con2.getTable(tableName); 166 // table should use the connectin's internal pool 167 assertTrue(otherPool == t.getPool()); 168 t.close(); 169 170 t = (HTable) con2.getTable(tableName); 171 // try other API too 172 assertTrue(otherPool == t.getPool()); 173 t.close(); 174 175 t = (HTable) con2.getTable(tableName); 176 // try other API too 177 assertTrue(otherPool == t.getPool()); 178 t.close(); 179 180 t = (HTable) con1.getTable(tableName); 181 pool = ((ConnectionImplementation) con1).getCurrentBatchPool(); 182 // make sure an internal pool was created 183 assertNotNull("An internal Thread pool should have been created", pool); 184 // and that the table is using it 185 assertTrue(t.getPool() == pool); 186 t.close(); 187 188 t = (HTable) con1.getTable(tableName); 189 // still using the *same* internal pool 190 assertTrue(t.getPool() == pool); 191 t.close(); 192 } else { 193 table.close(); 194 } 195 196 con1.close(); 197 198 // if the pool was created on demand it should be closed upon connection close 199 if(pool != null) { 200 assertTrue(pool.isShutdown()); 201 } 202 203 con2.close(); 204 // if the pool is passed, it is not closed 205 assertFalse(otherPool.isShutdown()); 206 otherPool.shutdownNow(); 207 } 208 209 /** 210 * Naive test to check that Connection#getAdmin returns a properly constructed HBaseAdmin object 211 * @throws IOException Unable to construct admin 212 */ 213 @Test 214 public void testAdminFactory() throws IOException { 215 Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); 216 Admin admin = con1.getAdmin(); 217 assertTrue(admin.getConnection() == con1); 218 assertTrue(admin.getConfiguration() == TEST_UTIL.getConfiguration()); 219 con1.close(); 220 } 221 222 // Fails too often! Needs work. HBASE-12558 223 // May only fail on non-linux machines? E.g. macosx. 224 @Ignore @Test (expected = RegionServerStoppedException.class) 225 // Depends on mulitcast messaging facility that seems broken in hbase2 226 // See HBASE-19261 "ClusterStatusPublisher where Master could optionally broadcast notice of 227 // dead servers is broke" 228 public void testClusterStatus() throws Exception { 229 final TableName tableName = TableName.valueOf(name.getMethodName()); 230 byte[] cf = "cf".getBytes(); 231 byte[] rk = "rk1".getBytes(); 232 233 JVMClusterUtil.RegionServerThread rs = TEST_UTIL.getHBaseCluster().startRegionServer(); 234 rs.waitForServerOnline(); 235 final ServerName sn = rs.getRegionServer().getServerName(); 236 237 Table t = TEST_UTIL.createTable(tableName, cf); 238 TEST_UTIL.waitTableAvailable(tableName); 239 TEST_UTIL.waitUntilNoRegionsInTransition(); 240 241 final ConnectionImplementation hci = (ConnectionImplementation)TEST_UTIL.getConnection(); 242 try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName)) { 243 while (l.getRegionLocation(rk).getPort() != sn.getPort()) { 244 TEST_UTIL.getAdmin().move(l.getRegionLocation(rk).getRegionInfo().getEncodedNameAsBytes(), 245 sn); 246 TEST_UTIL.waitUntilNoRegionsInTransition(); 247 hci.clearRegionCache(tableName); 248 } 249 Assert.assertNotNull(hci.clusterStatusListener); 250 TEST_UTIL.assertRegionOnServer(l.getRegionLocation(rk).getRegionInfo(), sn, 20000); 251 } 252 253 Put p1 = new Put(rk); 254 p1.addColumn(cf, "qual".getBytes(), "val".getBytes()); 255 t.put(p1); 256 257 rs.getRegionServer().abort("I'm dead"); 258 259 // We want the status to be updated. That's a least 10 second 260 TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() { 261 @Override 262 public boolean evaluate() throws Exception { 263 return TEST_UTIL.getHBaseCluster().getMaster().getServerManager(). 264 getDeadServers().isDeadServer(sn); 265 } 266 }); 267 268 TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() { 269 @Override 270 public boolean evaluate() throws Exception { 271 return hci.clusterStatusListener.isDeadServer(sn); 272 } 273 }); 274 275 t.close(); 276 hci.getClient(sn); // will throw an exception: RegionServerStoppedException 277 } 278 279 /** 280 * Test that we can handle connection close: it will trigger a retry, but the calls will finish. 281 */ 282 @Test 283 public void testConnectionCloseAllowsInterrupt() throws Exception { 284 testConnectionClose(true); 285 } 286 287 @Test 288 public void testConnectionNotAllowsInterrupt() throws Exception { 289 testConnectionClose(false); 290 } 291 292 private void testConnectionClose(boolean allowsInterrupt) throws Exception { 293 TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt); 294 TEST_UTIL.createTable(tableName, FAM_NAM).close(); 295 296 TEST_UTIL.getAdmin().balancerSwitch(false, true); 297 298 Configuration c2 = new Configuration(TEST_UTIL.getConfiguration()); 299 // We want to work on a separate connection. 300 c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); 301 c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); // retry a lot 302 c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); // don't wait between retries. 303 c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // Server do not really expire 304 c2.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, allowsInterrupt); 305 // to avoid the client to be stuck when do the Get 306 c2.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 10000); 307 c2.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 10000); 308 c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 5000); 309 310 Connection connection = ConnectionFactory.createConnection(c2); 311 final Table table = connection.getTable(tableName); 312 313 Put put = new Put(ROW); 314 put.addColumn(FAM_NAM, ROW, ROW); 315 table.put(put); 316 317 // 4 steps: ready=0; doGets=1; mustStop=2; stopped=3 318 final AtomicInteger step = new AtomicInteger(0); 319 320 final AtomicReference<Throwable> failed = new AtomicReference<>(null); 321 Thread t = new Thread("testConnectionCloseThread") { 322 @Override 323 public void run() { 324 int done = 0; 325 try { 326 step.set(1); 327 while (step.get() == 1) { 328 Get get = new Get(ROW); 329 table.get(get); 330 done++; 331 if (done % 100 == 0) 332 LOG.info("done=" + done); 333 // without the sleep, will cause the exception for too many files in 334 // org.apache.hadoop.hdfs.server.datanode.DataXceiver 335 Thread.sleep(100); 336 } 337 } catch (Throwable t) { 338 failed.set(t); 339 LOG.error(t.toString(), t); 340 } 341 step.set(3); 342 } 343 }; 344 t.start(); 345 TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() { 346 @Override 347 public boolean evaluate() throws Exception { 348 return step.get() == 1; 349 } 350 }); 351 352 ServerName sn; 353 try(RegionLocator rl = connection.getRegionLocator(tableName)) { 354 sn = rl.getRegionLocation(ROW).getServerName(); 355 } 356 ConnectionImplementation conn = 357 (ConnectionImplementation) connection; 358 RpcClient rpcClient = conn.getRpcClient(); 359 360 LOG.info("Going to cancel connections. connection=" + conn.toString() + ", sn=" + sn); 361 for (int i = 0; i < 500; i++) { 362 rpcClient.cancelConnections(sn); 363 Thread.sleep(50); 364 } 365 366 step.compareAndSet(1, 2); 367 // The test may fail here if the thread doing the gets is stuck. The way to find 368 // out what's happening is to look for the thread named 'testConnectionCloseThread' 369 TEST_UTIL.waitFor(40000, new Waiter.Predicate<Exception>() { 370 @Override 371 public boolean evaluate() throws Exception { 372 return step.get() == 3; 373 } 374 }); 375 table.close(); 376 connection.close(); 377 Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null); 378 } 379 380 /** 381 * Test that connection can become idle without breaking everything. 382 */ 383 @Test 384 public void testConnectionIdle() throws Exception { 385 final TableName tableName = TableName.valueOf(name.getMethodName()); 386 TEST_UTIL.createTable(tableName, FAM_NAM).close(); 387 int idleTime = 20000; 388 boolean previousBalance = TEST_UTIL.getAdmin().setBalancerRunning(false, true); 389 390 Configuration c2 = new Configuration(TEST_UTIL.getConfiguration()); 391 // We want to work on a separate connection. 392 c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); 393 c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // Don't retry: retry = test failed 394 c2.setInt(RpcClient.IDLE_TIME, idleTime); 395 396 Connection connection = ConnectionFactory.createConnection(c2); 397 final Table table = connection.getTable(tableName); 398 399 Put put = new Put(ROW); 400 put.addColumn(FAM_NAM, ROW, ROW); 401 table.put(put); 402 403 ManualEnvironmentEdge mee = new ManualEnvironmentEdge(); 404 mee.setValue(System.currentTimeMillis()); 405 EnvironmentEdgeManager.injectEdge(mee); 406 LOG.info("first get"); 407 table.get(new Get(ROW)); 408 409 LOG.info("first get - changing the time & sleeping"); 410 mee.incValue(idleTime + 1000); 411 Thread.sleep(1500); // we need to wait a little for the connection to be seen as idle. 412 // 1500 = sleep time in RpcClient#waitForWork + a margin 413 414 LOG.info("second get - connection has been marked idle in the middle"); 415 // To check that the connection actually became idle would need to read some private 416 // fields of RpcClient. 417 table.get(new Get(ROW)); 418 mee.incValue(idleTime + 1000); 419 420 LOG.info("third get - connection is idle, but the reader doesn't know yet"); 421 // We're testing here a special case: 422 // time limit reached BUT connection not yet reclaimed AND a new call. 423 // in this situation, we don't close the connection, instead we use it immediately. 424 // If we're very unlucky we can have a race condition in the test: the connection is already 425 // under closing when we do the get, so we have an exception, and we don't retry as the 426 // retry number is 1. The probability is very very low, and seems acceptable for now. It's 427 // a test issue only. 428 table.get(new Get(ROW)); 429 430 LOG.info("we're done - time will change back"); 431 432 table.close(); 433 434 connection.close(); 435 EnvironmentEdgeManager.reset(); 436 TEST_UTIL.getAdmin().setBalancerRunning(previousBalance, true); 437 } 438 439 /** 440 * Test that the connection to the dead server is cut immediately when we receive the 441 * notification. 442 * @throws Exception 443 */ 444 @Test 445 public void testConnectionCut() throws Exception { 446 final TableName tableName = TableName.valueOf(name.getMethodName()); 447 448 TEST_UTIL.createTable(tableName, FAM_NAM).close(); 449 boolean previousBalance = TEST_UTIL.getAdmin().setBalancerRunning(false, true); 450 451 Configuration c2 = new Configuration(TEST_UTIL.getConfiguration()); 452 // We want to work on a separate connection. 453 c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); 454 // try only once w/o any retry 455 c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); 456 c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 1000); 457 458 final Connection connection = ConnectionFactory.createConnection(c2); 459 final Table table = connection.getTable(tableName); 460 461 Put p = new Put(FAM_NAM); 462 p.addColumn(FAM_NAM, FAM_NAM, FAM_NAM); 463 table.put(p); 464 465 final ConnectionImplementation hci = (ConnectionImplementation) connection; 466 467 final HRegionLocation loc; 468 try(RegionLocator rl = connection.getRegionLocator(tableName)) { 469 loc = rl.getRegionLocation(FAM_NAM); 470 } 471 472 Get get = new Get(FAM_NAM); 473 Assert.assertNotNull(table.get(get)); 474 475 get = new Get(FAM_NAM); 476 get.setFilter(new BlockingFilter()); 477 478 // This thread will mark the server as dead while we're waiting during a get. 479 Thread t = new Thread() { 480 @Override 481 public void run() { 482 synchronized (syncBlockingFilter) { 483 try { 484 syncBlockingFilter.wait(); 485 } catch (InterruptedException e) { 486 throw new RuntimeException(e); 487 } 488 } 489 hci.clusterStatusListener.deadServerHandler.newDead(loc.getServerName()); 490 } 491 }; 492 493 t.start(); 494 try { 495 table.get(get); 496 Assert.fail(); 497 } catch (IOException expected) { 498 LOG.debug("Received: " + expected); 499 Assert.assertFalse(expected instanceof SocketTimeoutException); 500 Assert.assertFalse(syncBlockingFilter.get()); 501 } finally { 502 syncBlockingFilter.set(true); 503 t.join(); 504 TEST_UTIL.getAdmin().setBalancerRunning(previousBalance, true); 505 } 506 507 table.close(); 508 connection.close(); 509 } 510 511 protected static final AtomicBoolean syncBlockingFilter = new AtomicBoolean(false); 512 513 public static class BlockingFilter extends FilterBase { 514 @Override 515 public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException { 516 int i = 0; 517 while (i++ < 1000 && !syncBlockingFilter.get()) { 518 synchronized (syncBlockingFilter) { 519 syncBlockingFilter.notifyAll(); 520 } 521 Threads.sleep(100); 522 } 523 syncBlockingFilter.set(true); 524 return false; 525 } 526 @Override 527 public ReturnCode filterCell(final Cell ignored) throws IOException { 528 return ReturnCode.INCLUDE; 529 } 530 531 public static Filter parseFrom(final byte [] pbBytes) throws DeserializationException{ 532 return new BlockingFilter(); 533 } 534 } 535 536 /** 537 * Test that when we delete a location using the first row of a region 538 * that we really delete it. 539 * @throws Exception 540 */ 541 @Test 542 public void testRegionCaching() throws Exception { 543 TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAM_NAM).close(); 544 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 545 // test with no retry, or client cache will get updated after the first failure 546 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); 547 Connection connection = ConnectionFactory.createConnection(conf); 548 final Table table = connection.getTable(TABLE_NAME); 549 550 TEST_UTIL.waitUntilAllRegionsAssigned(table.getName()); 551 Put put = new Put(ROW); 552 put.addColumn(FAM_NAM, ROW, ROW); 553 table.put(put); 554 555 ConnectionImplementation conn = (ConnectionImplementation) connection; 556 557 assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW)); 558 559 // Here we mess with the cached location making it so the region at TABLE_NAME, ROW is at 560 // a location where the port is current port number +1 -- i.e. a non-existent location. 561 HRegionLocation loc = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation(); 562 final int nextPort = loc.getPort() + 1; 563 conn.updateCachedLocation(loc.getRegionInfo(), loc.getServerName(), 564 ServerName.valueOf("127.0.0.1", nextPort, 565 HConstants.LATEST_TIMESTAMP), HConstants.LATEST_TIMESTAMP); 566 Assert.assertEquals(conn.getCachedLocation(TABLE_NAME, ROW) 567 .getRegionLocation().getPort(), nextPort); 568 569 conn.clearRegionCache(TABLE_NAME, ROW.clone()); 570 RegionLocations rl = conn.getCachedLocation(TABLE_NAME, ROW); 571 assertNull("What is this location?? " + rl, rl); 572 573 // We're now going to move the region and check that it works for the client 574 // First a new put to add the location in the cache 575 conn.clearRegionCache(TABLE_NAME); 576 Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME)); 577 Put put2 = new Put(ROW); 578 put2.addColumn(FAM_NAM, ROW, ROW); 579 table.put(put2); 580 assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW)); 581 assertNotNull(conn.getCachedLocation(TableName.valueOf(TABLE_NAME.getName()), ROW.clone())); 582 583 TEST_UTIL.getAdmin().setBalancerRunning(false, false); 584 HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); 585 586 // We can wait for all regions to be online, that makes log reading easier when debugging 587 TEST_UTIL.waitUntilNoRegionsInTransition(); 588 589 // Now moving the region to the second server 590 HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation(); 591 byte[] regionName = toMove.getRegionInfo().getRegionName(); 592 byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes(); 593 594 // Choose the other server. 595 int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName); 596 int destServerId = curServerId == 0? 1: 0; 597 598 HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId); 599 HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId); 600 601 ServerName destServerName = destServer.getServerName(); 602 603 // Check that we are in the expected state 604 Assert.assertTrue(curServer != destServer); 605 Assert.assertFalse(curServer.getServerName().equals(destServer.getServerName())); 606 Assert.assertFalse( toMove.getPort() == destServerName.getPort()); 607 Assert.assertNotNull(curServer.getOnlineRegion(regionName)); 608 Assert.assertNull(destServer.getOnlineRegion(regionName)); 609 Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster(). 610 getAssignmentManager().hasRegionsInTransition()); 611 612 // Moving. It's possible that we don't have all the regions online at this point, so 613 // the test must depend only on the region we're looking at. 614 LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString()); 615 TEST_UTIL.getAdmin().move(toMove.getRegionInfo().getEncodedNameAsBytes(), destServerName); 616 617 while (destServer.getOnlineRegion(regionName) == null || 618 destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || 619 curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || 620 master.getAssignmentManager().hasRegionsInTransition()) { 621 // wait for the move to be finished 622 Thread.sleep(1); 623 } 624 625 LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString()); 626 627 // Check our new state. 628 Assert.assertNull(curServer.getOnlineRegion(regionName)); 629 Assert.assertNotNull(destServer.getOnlineRegion(regionName)); 630 Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); 631 Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); 632 633 634 // Cache was NOT updated and points to the wrong server 635 Assert.assertFalse( 636 conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation() 637 .getPort() == destServerName.getPort()); 638 639 // This part relies on a number of tries equals to 1. 640 // We do a put and expect the cache to be updated, even if we don't retry 641 LOG.info("Put starting"); 642 Put put3 = new Put(ROW); 643 put3.addColumn(FAM_NAM, ROW, ROW); 644 try { 645 table.put(put3); 646 Assert.fail("Unreachable point"); 647 } catch (RetriesExhaustedWithDetailsException e) { 648 LOG.info("Put done, exception caught: " + e.getClass()); 649 Assert.assertEquals(1, e.getNumExceptions()); 650 Assert.assertEquals(1, e.getCauses().size()); 651 Assert.assertArrayEquals(ROW, e.getRow(0).getRow()); 652 653 // Check that we unserialized the exception as expected 654 Throwable cause = ClientExceptionsUtil.findException(e.getCause(0)); 655 Assert.assertNotNull(cause); 656 Assert.assertTrue(cause instanceof RegionMovedException); 657 } catch (RetriesExhaustedException ree) { 658 // hbase2 throws RetriesExhaustedException instead of RetriesExhaustedWithDetailsException 659 // as hbase1 used to do. Keep an eye on this to see if this changed behavior is an issue. 660 LOG.info("Put done, exception caught: " + ree.getClass()); 661 Throwable cause = ClientExceptionsUtil.findException(ree.getCause()); 662 Assert.assertNotNull(cause); 663 Assert.assertTrue(cause instanceof RegionMovedException); 664 } 665 Assert.assertNotNull("Cached connection is null", conn.getCachedLocation(TABLE_NAME, ROW)); 666 Assert.assertEquals( 667 "Previous server was " + curServer.getServerName().getHostAndPort(), 668 destServerName.getPort(), 669 conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort()); 670 671 Assert.assertFalse(destServer.getRegionsInTransitionInRS() 672 .containsKey(encodedRegionNameBytes)); 673 Assert.assertFalse(curServer.getRegionsInTransitionInRS() 674 .containsKey(encodedRegionNameBytes)); 675 676 // We move it back to do another test with a scan 677 LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString()); 678 TEST_UTIL.getAdmin().move(toMove.getRegionInfo().getEncodedNameAsBytes(), 679 curServer.getServerName()); 680 681 while (curServer.getOnlineRegion(regionName) == null || 682 destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || 683 curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || 684 master.getAssignmentManager().hasRegionsInTransition()) { 685 // wait for the move to be finished 686 Thread.sleep(1); 687 } 688 689 // Check our new state. 690 Assert.assertNotNull(curServer.getOnlineRegion(regionName)); 691 Assert.assertNull(destServer.getOnlineRegion(regionName)); 692 LOG.info("Move finished for region=" + toMove.getRegionInfo().getRegionNameAsString()); 693 694 // Cache was NOT updated and points to the wrong server 695 Assert.assertFalse(conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort() == 696 curServer.getServerName().getPort()); 697 698 Scan sc = new Scan(); 699 sc.setStopRow(ROW); 700 sc.setStartRow(ROW); 701 702 // The scanner takes the max retries from the connection configuration, not the table as 703 // the put. 704 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 705 706 try { 707 ResultScanner rs = table.getScanner(sc); 708 while (rs.next() != null) { 709 } 710 Assert.fail("Unreachable point"); 711 } catch (RetriesExhaustedException e) { 712 LOG.info("Scan done, expected exception caught: " + e.getClass()); 713 } 714 715 // Cache is updated with the right value. 716 Assert.assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW)); 717 Assert.assertEquals( 718 "Previous server was "+destServer.getServerName().getHostAndPort(), 719 curServer.getServerName().getPort(), 720 conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort()); 721 722 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY); 723 table.close(); 724 connection.close(); 725 } 726 727 /** 728 * Test that Connection or Pool are not closed when managed externally 729 * @throws Exception 730 */ 731 @Test 732 public void testConnectionManagement() throws Exception{ 733 Table table0 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAM); 734 Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); 735 Table table = conn.getTable(TABLE_NAME1); 736 table.close(); 737 assertFalse(conn.isClosed()); 738 if(table instanceof HTable) { 739 assertFalse(((HTable) table).getPool().isShutdown()); 740 } 741 table = conn.getTable(TABLE_NAME1); 742 table.close(); 743 if(table instanceof HTable) { 744 assertFalse(((HTable) table).getPool().isShutdown()); 745 } 746 conn.close(); 747 if(table instanceof HTable) { 748 assertTrue(((HTable) table).getPool().isShutdown()); 749 } 750 table0.close(); 751 } 752 753 /** 754 * Test that stale cache updates don't override newer cached values. 755 */ 756 @Test 757 public void testCacheSeqNums() throws Exception{ 758 Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME2, FAM_NAM); 759 Put put = new Put(ROW); 760 put.addColumn(FAM_NAM, ROW, ROW); 761 table.put(put); 762 ConnectionImplementation conn = (ConnectionImplementation) TEST_UTIL.getConnection(); 763 764 HRegionLocation location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation(); 765 assertNotNull(location); 766 767 ServerName anySource = ServerName.valueOf(location.getHostname(), location.getPort() - 1, 0L); 768 769 // Same server as already in cache reporting - overwrites any value despite seqNum. 770 int nextPort = location.getPort() + 1; 771 conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(), 772 ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1); 773 location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation(); 774 Assert.assertEquals(nextPort, location.getPort()); 775 776 // No source specified - same. 777 nextPort = location.getPort() + 1; 778 conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(), 779 ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1); 780 location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation(); 781 Assert.assertEquals(nextPort, location.getPort()); 782 783 // Higher seqNum - overwrites lower seqNum. 784 nextPort = location.getPort() + 1; 785 conn.updateCachedLocation(location.getRegionInfo(), anySource, 786 ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() + 1); 787 location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation(); 788 Assert.assertEquals(nextPort, location.getPort()); 789 790 // Lower seqNum - does not overwrite higher seqNum. 791 nextPort = location.getPort() + 1; 792 conn.updateCachedLocation(location.getRegionInfo(), anySource, 793 ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1); 794 location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation(); 795 Assert.assertEquals(nextPort - 1, location.getPort()); 796 table.close(); 797 } 798 799 @Test 800 public void testClosing() throws Exception { 801 Configuration configuration = 802 new Configuration(TEST_UTIL.getConfiguration()); 803 configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID, 804 String.valueOf(ThreadLocalRandom.current().nextInt())); 805 806 // as connection caching is going away, now we're just testing 807 // that closed connection does actually get closed. 808 809 Connection c1 = ConnectionFactory.createConnection(configuration); 810 Connection c2 = ConnectionFactory.createConnection(configuration); 811 // no caching, different connections 812 assertTrue(c1 != c2); 813 814 // closing independently 815 c1.close(); 816 assertTrue(c1.isClosed()); 817 assertFalse(c2.isClosed()); 818 819 c2.close(); 820 assertTrue(c2.isClosed()); 821 } 822 823 /** 824 * Trivial test to verify that nobody messes with 825 * {@link ConnectionFactory#createConnection(Configuration)} 826 */ 827 @Test 828 public void testCreateConnection() throws Exception { 829 Configuration configuration = TEST_UTIL.getConfiguration(); 830 Connection c1 = ConnectionFactory.createConnection(configuration); 831 Connection c2 = ConnectionFactory.createConnection(configuration); 832 // created from the same configuration, yet they are different 833 assertTrue(c1 != c2); 834 assertTrue(c1.getConfiguration() == c2.getConfiguration()); 835 } 836 837 /** 838 * This test checks that one can connect to the cluster with only the 839 * ZooKeeper quorum set. Other stuff like master address will be read 840 * from ZK by the client. 841 */ 842 @Test 843 public void testConnection() throws Exception{ 844 // We create an empty config and add the ZK address. 845 Configuration c = new Configuration(); 846 // This test only makes sense for ZK based connection registry. 847 c.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 848 HConstants.ZK_CONNECTION_REGISTRY_CLASS); 849 c.set(HConstants.ZOOKEEPER_QUORUM, 850 TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM)); 851 c.set(HConstants.ZOOKEEPER_CLIENT_PORT, 852 TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_CLIENT_PORT)); 853 // This should be enough to connect 854 ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(c); 855 assertTrue(conn.isMasterRunning()); 856 conn.close(); 857 } 858 859 private int setNumTries(ConnectionImplementation hci, int newVal) throws Exception { 860 Field numTries = hci.getClass().getDeclaredField("numTries"); 861 numTries.setAccessible(true); 862 Field modifiersField = Field.class.getDeclaredField("modifiers"); 863 modifiersField.setAccessible(true); 864 modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL); 865 final int prevNumRetriesVal = (Integer)numTries.get(hci); 866 numTries.set(hci, newVal); 867 868 return prevNumRetriesVal; 869 } 870 871 @Test 872 public void testMulti() throws Exception { 873 Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME3, FAM_NAM); 874 try { 875 ConnectionImplementation conn = (ConnectionImplementation)TEST_UTIL.getConnection(); 876 877 // We're now going to move the region and check that it works for the client 878 // First a new put to add the location in the cache 879 conn.clearRegionCache(TABLE_NAME3); 880 Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME3)); 881 882 TEST_UTIL.getAdmin().setBalancerRunning(false, false); 883 HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); 884 885 // We can wait for all regions to be online, that makes log reading easier when debugging 886 TEST_UTIL.waitUntilNoRegionsInTransition(); 887 888 Put put = new Put(ROW_X); 889 put.addColumn(FAM_NAM, ROW_X, ROW_X); 890 table.put(put); 891 892 // Now moving the region to the second server 893 HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation(); 894 byte[] regionName = toMove.getRegionInfo().getRegionName(); 895 byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes(); 896 897 // Choose the other server. 898 int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName); 899 int destServerId = (curServerId == 0 ? 1 : 0); 900 901 HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId); 902 HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId); 903 904 ServerName destServerName = destServer.getServerName(); 905 ServerName metaServerName = TEST_UTIL.getHBaseCluster().getServerHoldingMeta(); 906 907 //find another row in the cur server that is less than ROW_X 908 List<HRegion> regions = curServer.getRegions(TABLE_NAME3); 909 byte[] otherRow = null; 910 for (Region region : regions) { 911 if (!region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName()) 912 && Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0) { 913 otherRow = region.getRegionInfo().getStartKey(); 914 break; 915 } 916 } 917 assertNotNull(otherRow); 918 // If empty row, set it to first row.-f 919 if (otherRow.length <= 0) otherRow = Bytes.toBytes("aaa"); 920 Put put2 = new Put(otherRow); 921 put2.addColumn(FAM_NAM, otherRow, otherRow); 922 table.put(put2); //cache put2's location 923 924 // Check that we are in the expected state 925 Assert.assertTrue(curServer != destServer); 926 Assert.assertNotEquals(curServer.getServerName(), destServer.getServerName()); 927 Assert.assertNotEquals(toMove.getPort(), destServerName.getPort()); 928 Assert.assertNotNull(curServer.getOnlineRegion(regionName)); 929 Assert.assertNull(destServer.getOnlineRegion(regionName)); 930 Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster(). 931 getAssignmentManager().hasRegionsInTransition()); 932 933 // Moving. It's possible that we don't have all the regions online at this point, so 934 // the test depends only on the region we're looking at. 935 LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString()); 936 TEST_UTIL.getAdmin().move(toMove.getRegionInfo().getEncodedNameAsBytes(), destServerName); 937 938 while (destServer.getOnlineRegion(regionName) == null || 939 destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || 940 curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || 941 master.getAssignmentManager().hasRegionsInTransition()) { 942 // wait for the move to be finished 943 Thread.sleep(1); 944 } 945 946 LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString()); 947 948 // Check our new state. 949 Assert.assertNull(curServer.getOnlineRegion(regionName)); 950 Assert.assertNotNull(destServer.getOnlineRegion(regionName)); 951 Assert.assertFalse(destServer.getRegionsInTransitionInRS() 952 .containsKey(encodedRegionNameBytes)); 953 Assert.assertFalse(curServer.getRegionsInTransitionInRS() 954 .containsKey(encodedRegionNameBytes)); 955 956 957 // Cache was NOT updated and points to the wrong server 958 Assert.assertFalse( 959 conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation() 960 .getPort() == destServerName.getPort()); 961 962 // Hijack the number of retry to fail after 2 tries 963 final int prevNumRetriesVal = setNumTries(conn, 2); 964 965 Put put3 = new Put(ROW_X); 966 put3.addColumn(FAM_NAM, ROW_X, ROW_X); 967 Put put4 = new Put(otherRow); 968 put4.addColumn(FAM_NAM, otherRow, otherRow); 969 970 // do multi 971 ArrayList<Put> actions = Lists.newArrayList(put4, put3); 972 table.batch(actions, null); // first should be a valid row, 973 // second we get RegionMovedException. 974 975 setNumTries(conn, prevNumRetriesVal); 976 } finally { 977 table.close(); 978 } 979 } 980 981 @Test 982 public void testErrorBackoffTimeCalculation() throws Exception { 983 // TODO: This test would seem to presume hardcoded RETRY_BACKOFF which it should not. 984 final long ANY_PAUSE = 100; 985 ServerName location = ServerName.valueOf("127.0.0.1", 1, 0); 986 ServerName diffLocation = ServerName.valueOf("127.0.0.1", 2, 0); 987 988 ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge(); 989 EnvironmentEdgeManager.injectEdge(timeMachine); 990 try { 991 long largeAmountOfTime = ANY_PAUSE * 1000; 992 ConnectionImplementation.ServerErrorTracker tracker = 993 new ConnectionImplementation.ServerErrorTracker(largeAmountOfTime, 100); 994 995 // The default backoff is 0. 996 assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE)); 997 998 // Check some backoff values from HConstants sequence. 999 tracker.reportServerError(location); 1000 assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0], 1001 tracker.calculateBackoffTime(location, ANY_PAUSE)); 1002 tracker.reportServerError(location); 1003 tracker.reportServerError(location); 1004 tracker.reportServerError(location); 1005 assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[3], 1006 tracker.calculateBackoffTime(location, ANY_PAUSE)); 1007 1008 // All of this shouldn't affect backoff for different location. 1009 assertEquals(0, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE)); 1010 tracker.reportServerError(diffLocation); 1011 assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0], 1012 tracker.calculateBackoffTime(diffLocation, ANY_PAUSE)); 1013 1014 // Check with different base. 1015 assertEqualsWithJitter(ANY_PAUSE * 2 * HConstants.RETRY_BACKOFF[3], 1016 tracker.calculateBackoffTime(location, ANY_PAUSE * 2)); 1017 } finally { 1018 EnvironmentEdgeManager.reset(); 1019 } 1020 } 1021 1022 private static void assertEqualsWithJitter(long expected, long actual) { 1023 assertEqualsWithJitter(expected, actual, expected); 1024 } 1025 1026 private static void assertEqualsWithJitter(long expected, long actual, long jitterBase) { 1027 assertTrue("Value not within jitter: " + expected + " vs " + actual, 1028 Math.abs(actual - expected) <= (0.01f * jitterBase)); 1029 } 1030 1031 @Test 1032 public void testConnectionRideOverClusterRestart() throws IOException, InterruptedException { 1033 Configuration config = new Configuration(TEST_UTIL.getConfiguration()); 1034 // This test only makes sense for ZK based connection registry. 1035 config.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 1036 HConstants.ZK_CONNECTION_REGISTRY_CLASS); 1037 1038 final TableName tableName = TableName.valueOf(name.getMethodName()); 1039 TEST_UTIL.createTable(tableName, new byte[][] {FAM_NAM}).close(); 1040 1041 Connection connection = ConnectionFactory.createConnection(config); 1042 Table table = connection.getTable(tableName); 1043 1044 // this will cache the meta location and table's region location 1045 table.get(new Get(Bytes.toBytes("foo"))); 1046 1047 // restart HBase 1048 TEST_UTIL.shutdownMiniHBaseCluster(); 1049 TEST_UTIL.restartHBaseCluster(2); 1050 // this should be able to discover new locations for meta and table's region 1051 table.get(new Get(Bytes.toBytes("foo"))); 1052 TEST_UTIL.deleteTable(tableName); 1053 table.close(); 1054 connection.close(); 1055 } 1056 1057 @Test 1058 public void testLocateRegionsWithRegionReplicas() throws IOException { 1059 int regionReplication = 3; 1060 byte[] family = Bytes.toBytes("cf"); 1061 TableName tableName = TableName.valueOf(name.getMethodName()); 1062 1063 // Create a table with region replicas 1064 TableDescriptorBuilder builder = TableDescriptorBuilder 1065 .newBuilder(tableName) 1066 .setRegionReplication(regionReplication) 1067 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)); 1068 TEST_UTIL.getAdmin().createTable(builder.build()); 1069 1070 try (ConnectionImplementation con = (ConnectionImplementation) ConnectionFactory. 1071 createConnection(TEST_UTIL.getConfiguration())) { 1072 // Get locations of the regions of the table 1073 List<HRegionLocation> locations = con.locateRegions(tableName, false, false); 1074 1075 // The size of the returned locations should be 3 1076 assertEquals(regionReplication, locations.size()); 1077 1078 // The replicaIds of the returned locations should be 0, 1 and 2 1079 Set<Integer> expectedReplicaIds = IntStream.range(0, regionReplication). 1080 boxed().collect(Collectors.toSet()); 1081 for (HRegionLocation location : locations) { 1082 assertTrue(expectedReplicaIds.remove(location.getRegion().getReplicaId())); 1083 } 1084 } finally { 1085 TEST_UTIL.deleteTable(tableName); 1086 } 1087 } 1088 1089 @Test 1090 public void testMetaLookupThreadPoolCreated() throws Exception { 1091 final TableName tableName = TableName.valueOf(name.getMethodName()); 1092 byte[][] FAMILIES = new byte[][] { Bytes.toBytes("foo") }; 1093 if (TEST_UTIL.getAdmin().tableExists(tableName)) { 1094 TEST_UTIL.getAdmin().disableTable(tableName); 1095 TEST_UTIL.getAdmin().deleteTable(tableName); 1096 } 1097 try (Table htable = TEST_UTIL.createTable(tableName, FAMILIES)) { 1098 byte[] row = Bytes.toBytes("test"); 1099 ConnectionImplementation c = ((ConnectionImplementation) TEST_UTIL.getConnection()); 1100 // check that metalookup pool would get created 1101 c.relocateRegion(tableName, row); 1102 ExecutorService ex = c.getCurrentMetaLookupPool(); 1103 assertNotNull(ex); 1104 } 1105 } 1106 1107 // There is no assertion, but you need to confirm that there is no resource leak output from netty 1108 @Test 1109 public void testCancelConnectionMemoryLeak() throws IOException, InterruptedException { 1110 TableName tableName = TableName.valueOf(name.getMethodName()); 1111 TEST_UTIL.createTable(tableName, FAM_NAM).close(); 1112 TEST_UTIL.getAdmin().balancerSwitch(false, true); 1113 try (Connection connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); 1114 Table table = connection.getTable(tableName)) { 1115 table.get(new Get(Bytes.toBytes("1"))); 1116 ServerName sn = TEST_UTIL.getRSForFirstRegionInTable(tableName).getServerName(); 1117 RpcClient rpcClient = ((ConnectionImplementation) connection).getRpcClient(); 1118 rpcClient.cancelConnections(sn); 1119 Thread.sleep(1000); 1120 System.gc(); 1121 Thread.sleep(1000); 1122 } 1123 } 1124}