001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertNull; 024import static org.junit.Assert.assertTrue; 025 026import java.io.IOException; 027import java.lang.reflect.Field; 028import java.lang.reflect.Modifier; 029import java.net.SocketTimeoutException; 030import java.util.ArrayList; 031import java.util.List; 032import java.util.Set; 033import java.util.concurrent.ExecutorService; 034import java.util.concurrent.SynchronousQueue; 035import java.util.concurrent.ThreadLocalRandom; 036import java.util.concurrent.ThreadPoolExecutor; 037import java.util.concurrent.TimeUnit; 038import java.util.concurrent.atomic.AtomicBoolean; 039import java.util.concurrent.atomic.AtomicInteger; 040import java.util.concurrent.atomic.AtomicReference; 041import java.util.stream.Collectors; 042import java.util.stream.IntStream; 043 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.hbase.Cell; 046import org.apache.hadoop.hbase.HBaseClassTestRule; 047import org.apache.hadoop.hbase.HBaseTestingUtility; 048import org.apache.hadoop.hbase.HConstants; 049import org.apache.hadoop.hbase.HRegionLocation; 050import org.apache.hadoop.hbase.RegionLocations; 051import org.apache.hadoop.hbase.ServerName; 052import org.apache.hadoop.hbase.TableName; 053import org.apache.hadoop.hbase.Waiter; 054import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 055import org.apache.hadoop.hbase.exceptions.DeserializationException; 056import org.apache.hadoop.hbase.exceptions.RegionMovedException; 057import org.apache.hadoop.hbase.filter.Filter; 058import org.apache.hadoop.hbase.filter.FilterBase; 059import org.apache.hadoop.hbase.ipc.RpcClient; 060import org.apache.hadoop.hbase.master.HMaster; 061import org.apache.hadoop.hbase.regionserver.HRegion; 062import org.apache.hadoop.hbase.regionserver.HRegionServer; 063import org.apache.hadoop.hbase.regionserver.Region; 064import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 065import org.apache.hadoop.hbase.testclassification.LargeTests; 066import org.apache.hadoop.hbase.util.Bytes; 067import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 068import org.apache.hadoop.hbase.util.JVMClusterUtil; 069import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 070import org.apache.hadoop.hbase.util.Threads; 071import org.junit.AfterClass; 072import org.junit.Assert; 073import org.junit.BeforeClass; 074import org.junit.ClassRule; 075import org.junit.Ignore; 076import org.junit.Rule; 077import org.junit.Test; 078import org.junit.experimental.categories.Category; 079import org.junit.rules.TestName; 080import org.slf4j.Logger; 081import org.slf4j.LoggerFactory; 082 083import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 084 085/** 086 * This class is for testing HBaseConnectionManager features 087 */ 088@Category({LargeTests.class}) 089public class TestConnectionImplementation { 090 091 @ClassRule 092 public static final HBaseClassTestRule CLASS_RULE = 093 HBaseClassTestRule.forClass(TestConnectionImplementation.class); 094 095 private static final Logger LOG = LoggerFactory.getLogger(TestConnectionImplementation.class); 096 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 097 private static final TableName TABLE_NAME = 098 TableName.valueOf("test"); 099 private static final TableName TABLE_NAME1 = 100 TableName.valueOf("test1"); 101 private static final TableName TABLE_NAME2 = 102 TableName.valueOf("test2"); 103 private static final TableName TABLE_NAME3 = 104 TableName.valueOf("test3"); 105 private static final byte[] FAM_NAM = Bytes.toBytes("f"); 106 private static final byte[] ROW = Bytes.toBytes("bbb"); 107 private static final byte[] ROW_X = Bytes.toBytes("xxx"); 108 private static final int RPC_RETRY = 5; 109 110 @Rule 111 public TestName name = new TestName(); 112 113 @BeforeClass 114 public static void setUpBeforeClass() throws Exception { 115 TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true); 116 // Up the handlers; this test needs more than usual. 117 TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); 118 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY); 119 TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 3); 120 TEST_UTIL.startMiniCluster(2); 121 122 } 123 124 @AfterClass 125 public static void tearDownAfterClass() throws Exception { 126 TEST_UTIL.shutdownMiniCluster(); 127 } 128 129 @Test 130 public void testClusterConnection() throws IOException { 131 ThreadPoolExecutor otherPool = new ThreadPoolExecutor(1, 1, 132 5, TimeUnit.SECONDS, 133 new SynchronousQueue<>(), 134 Threads.newDaemonThreadFactory("test-hcm")); 135 136 Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); 137 Connection con2 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), otherPool); 138 // make sure the internally created ExecutorService is the one passed 139 assertTrue(otherPool == ((ConnectionImplementation) con2).getCurrentBatchPool()); 140 141 final TableName tableName = TableName.valueOf(name.getMethodName()); 142 TEST_UTIL.createTable(tableName, FAM_NAM).close(); 143 Table table = con1.getTable(tableName, otherPool); 144 145 ExecutorService pool = null; 146 147 if(table instanceof HTable) { 148 HTable t = (HTable) table; 149 // make sure passing a pool to the getTable does not trigger creation of an internal pool 150 assertNull("Internal Thread pool should be null", 151 ((ConnectionImplementation) con1).getCurrentBatchPool()); 152 // table should use the pool passed 153 assertTrue(otherPool == t.getPool()); 154 t.close(); 155 156 t = (HTable) con2.getTable(tableName); 157 // table should use the connectin's internal pool 158 assertTrue(otherPool == t.getPool()); 159 t.close(); 160 161 t = (HTable) con2.getTable(tableName); 162 // try other API too 163 assertTrue(otherPool == t.getPool()); 164 t.close(); 165 166 t = (HTable) con2.getTable(tableName); 167 // try other API too 168 assertTrue(otherPool == t.getPool()); 169 t.close(); 170 171 t = (HTable) con1.getTable(tableName); 172 pool = ((ConnectionImplementation) con1).getCurrentBatchPool(); 173 // make sure an internal pool was created 174 assertNotNull("An internal Thread pool should have been created", pool); 175 // and that the table is using it 176 assertTrue(t.getPool() == pool); 177 t.close(); 178 179 t = (HTable) con1.getTable(tableName); 180 // still using the *same* internal pool 181 assertTrue(t.getPool() == pool); 182 t.close(); 183 } else { 184 table.close(); 185 } 186 187 con1.close(); 188 189 // if the pool was created on demand it should be closed upon connection close 190 if(pool != null) { 191 assertTrue(pool.isShutdown()); 192 } 193 194 con2.close(); 195 // if the pool is passed, it is not closed 196 assertFalse(otherPool.isShutdown()); 197 otherPool.shutdownNow(); 198 } 199 200 /** 201 * Naive test to check that Connection#getAdmin returns a properly constructed HBaseAdmin object 202 * @throws IOException Unable to construct admin 203 */ 204 @Test 205 public void testAdminFactory() throws IOException { 206 Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); 207 Admin admin = con1.getAdmin(); 208 assertTrue(admin.getConnection() == con1); 209 assertTrue(admin.getConfiguration() == TEST_UTIL.getConfiguration()); 210 con1.close(); 211 } 212 213 // Fails too often! Needs work. HBASE-12558 214 // May only fail on non-linux machines? E.g. macosx. 215 @Ignore @Test (expected = RegionServerStoppedException.class) 216 // Depends on mulitcast messaging facility that seems broken in hbase2 217 // See HBASE-19261 "ClusterStatusPublisher where Master could optionally broadcast notice of 218 // dead servers is broke" 219 public void testClusterStatus() throws Exception { 220 final TableName tableName = TableName.valueOf(name.getMethodName()); 221 byte[] cf = "cf".getBytes(); 222 byte[] rk = "rk1".getBytes(); 223 224 JVMClusterUtil.RegionServerThread rs = TEST_UTIL.getHBaseCluster().startRegionServer(); 225 rs.waitForServerOnline(); 226 final ServerName sn = rs.getRegionServer().getServerName(); 227 228 Table t = TEST_UTIL.createTable(tableName, cf); 229 TEST_UTIL.waitTableAvailable(tableName); 230 TEST_UTIL.waitUntilNoRegionsInTransition(); 231 232 final ConnectionImplementation hci = (ConnectionImplementation)TEST_UTIL.getConnection(); 233 try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName)) { 234 while (l.getRegionLocation(rk).getPort() != sn.getPort()) { 235 TEST_UTIL.getAdmin().move(l.getRegionLocation(rk).getRegionInfo(). 236 getEncodedNameAsBytes(), Bytes.toBytes(sn.toString())); 237 TEST_UTIL.waitUntilNoRegionsInTransition(); 238 hci.clearRegionCache(tableName); 239 } 240 Assert.assertNotNull(hci.clusterStatusListener); 241 TEST_UTIL.assertRegionOnServer(l.getRegionLocation(rk).getRegionInfo(), sn, 20000); 242 } 243 244 Put p1 = new Put(rk); 245 p1.addColumn(cf, "qual".getBytes(), "val".getBytes()); 246 t.put(p1); 247 248 rs.getRegionServer().abort("I'm dead"); 249 250 // We want the status to be updated. That's a least 10 second 251 TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() { 252 @Override 253 public boolean evaluate() throws Exception { 254 return TEST_UTIL.getHBaseCluster().getMaster().getServerManager(). 255 getDeadServers().isDeadServer(sn); 256 } 257 }); 258 259 TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() { 260 @Override 261 public boolean evaluate() throws Exception { 262 return hci.clusterStatusListener.isDeadServer(sn); 263 } 264 }); 265 266 t.close(); 267 hci.getClient(sn); // will throw an exception: RegionServerStoppedException 268 } 269 270 /** 271 * Test that we can handle connection close: it will trigger a retry, but the calls will finish. 272 */ 273 @Test 274 public void testConnectionCloseAllowsInterrupt() throws Exception { 275 testConnectionClose(true); 276 } 277 278 @Test 279 public void testConnectionNotAllowsInterrupt() throws Exception { 280 testConnectionClose(false); 281 } 282 283 private void testConnectionClose(boolean allowsInterrupt) throws Exception { 284 TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt); 285 TEST_UTIL.createTable(tableName, FAM_NAM).close(); 286 287 boolean previousBalance = TEST_UTIL.getAdmin().setBalancerRunning(false, true); 288 289 Configuration c2 = new Configuration(TEST_UTIL.getConfiguration()); 290 // We want to work on a separate connection. 291 c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); 292 c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); // retry a lot 293 c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); // don't wait between retries. 294 c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // Server do not really expire 295 c2.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, allowsInterrupt); 296 // to avoid the client to be stuck when do the Get 297 c2.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 10000); 298 c2.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 10000); 299 c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 5000); 300 301 Connection connection = ConnectionFactory.createConnection(c2); 302 final Table table = connection.getTable(tableName); 303 304 Put put = new Put(ROW); 305 put.addColumn(FAM_NAM, ROW, ROW); 306 table.put(put); 307 308 // 4 steps: ready=0; doGets=1; mustStop=2; stopped=3 309 final AtomicInteger step = new AtomicInteger(0); 310 311 final AtomicReference<Throwable> failed = new AtomicReference<>(null); 312 Thread t = new Thread("testConnectionCloseThread") { 313 @Override 314 public void run() { 315 int done = 0; 316 try { 317 step.set(1); 318 while (step.get() == 1) { 319 Get get = new Get(ROW); 320 table.get(get); 321 done++; 322 if (done % 100 == 0) 323 LOG.info("done=" + done); 324 // without the sleep, will cause the exception for too many files in 325 // org.apache.hadoop.hdfs.server.datanode.DataXceiver 326 Thread.sleep(100); 327 } 328 } catch (Throwable t) { 329 failed.set(t); 330 LOG.error(t.toString(), t); 331 } 332 step.set(3); 333 } 334 }; 335 t.start(); 336 TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() { 337 @Override 338 public boolean evaluate() throws Exception { 339 return step.get() == 1; 340 } 341 }); 342 343 ServerName sn; 344 try(RegionLocator rl = connection.getRegionLocator(tableName)) { 345 sn = rl.getRegionLocation(ROW).getServerName(); 346 } 347 ConnectionImplementation conn = 348 (ConnectionImplementation) connection; 349 RpcClient rpcClient = conn.getRpcClient(); 350 351 LOG.info("Going to cancel connections. connection=" + conn.toString() + ", sn=" + sn); 352 for (int i = 0; i < 5000; i++) { 353 rpcClient.cancelConnections(sn); 354 Thread.sleep(5); 355 } 356 357 step.compareAndSet(1, 2); 358 // The test may fail here if the thread doing the gets is stuck. The way to find 359 // out what's happening is to look for the thread named 'testConnectionCloseThread' 360 TEST_UTIL.waitFor(40000, new Waiter.Predicate<Exception>() { 361 @Override 362 public boolean evaluate() throws Exception { 363 return step.get() == 3; 364 } 365 }); 366 table.close(); 367 connection.close(); 368 Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null); 369 TEST_UTIL.getAdmin().setBalancerRunning(previousBalance, true); 370 } 371 372 /** 373 * Test that connection can become idle without breaking everything. 374 */ 375 @Test 376 public void testConnectionIdle() throws Exception { 377 final TableName tableName = TableName.valueOf(name.getMethodName()); 378 TEST_UTIL.createTable(tableName, FAM_NAM).close(); 379 int idleTime = 20000; 380 boolean previousBalance = TEST_UTIL.getAdmin().setBalancerRunning(false, true); 381 382 Configuration c2 = new Configuration(TEST_UTIL.getConfiguration()); 383 // We want to work on a separate connection. 384 c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); 385 c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // Don't retry: retry = test failed 386 c2.setInt(RpcClient.IDLE_TIME, idleTime); 387 388 Connection connection = ConnectionFactory.createConnection(c2); 389 final Table table = connection.getTable(tableName); 390 391 Put put = new Put(ROW); 392 put.addColumn(FAM_NAM, ROW, ROW); 393 table.put(put); 394 395 ManualEnvironmentEdge mee = new ManualEnvironmentEdge(); 396 mee.setValue(System.currentTimeMillis()); 397 EnvironmentEdgeManager.injectEdge(mee); 398 LOG.info("first get"); 399 table.get(new Get(ROW)); 400 401 LOG.info("first get - changing the time & sleeping"); 402 mee.incValue(idleTime + 1000); 403 Thread.sleep(1500); // we need to wait a little for the connection to be seen as idle. 404 // 1500 = sleep time in RpcClient#waitForWork + a margin 405 406 LOG.info("second get - connection has been marked idle in the middle"); 407 // To check that the connection actually became idle would need to read some private 408 // fields of RpcClient. 409 table.get(new Get(ROW)); 410 mee.incValue(idleTime + 1000); 411 412 LOG.info("third get - connection is idle, but the reader doesn't know yet"); 413 // We're testing here a special case: 414 // time limit reached BUT connection not yet reclaimed AND a new call. 415 // in this situation, we don't close the connection, instead we use it immediately. 416 // If we're very unlucky we can have a race condition in the test: the connection is already 417 // under closing when we do the get, so we have an exception, and we don't retry as the 418 // retry number is 1. The probability is very very low, and seems acceptable for now. It's 419 // a test issue only. 420 table.get(new Get(ROW)); 421 422 LOG.info("we're done - time will change back"); 423 424 table.close(); 425 426 connection.close(); 427 EnvironmentEdgeManager.reset(); 428 TEST_UTIL.getAdmin().setBalancerRunning(previousBalance, true); 429 } 430 431 /** 432 * Test that the connection to the dead server is cut immediately when we receive the 433 * notification. 434 * @throws Exception 435 */ 436 @Test 437 public void testConnectionCut() throws Exception { 438 final TableName tableName = TableName.valueOf(name.getMethodName()); 439 440 TEST_UTIL.createTable(tableName, FAM_NAM).close(); 441 boolean previousBalance = TEST_UTIL.getAdmin().setBalancerRunning(false, true); 442 443 Configuration c2 = new Configuration(TEST_UTIL.getConfiguration()); 444 // We want to work on a separate connection. 445 c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); 446 // try only once w/o any retry 447 c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); 448 c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 1000); 449 450 final Connection connection = ConnectionFactory.createConnection(c2); 451 final Table table = connection.getTable(tableName); 452 453 Put p = new Put(FAM_NAM); 454 p.addColumn(FAM_NAM, FAM_NAM, FAM_NAM); 455 table.put(p); 456 457 final ConnectionImplementation hci = (ConnectionImplementation) connection; 458 459 final HRegionLocation loc; 460 try(RegionLocator rl = connection.getRegionLocator(tableName)) { 461 loc = rl.getRegionLocation(FAM_NAM); 462 } 463 464 Get get = new Get(FAM_NAM); 465 Assert.assertNotNull(table.get(get)); 466 467 get = new Get(FAM_NAM); 468 get.setFilter(new BlockingFilter()); 469 470 // This thread will mark the server as dead while we're waiting during a get. 471 Thread t = new Thread() { 472 @Override 473 public void run() { 474 synchronized (syncBlockingFilter) { 475 try { 476 syncBlockingFilter.wait(); 477 } catch (InterruptedException e) { 478 throw new RuntimeException(e); 479 } 480 } 481 hci.clusterStatusListener.deadServerHandler.newDead(loc.getServerName()); 482 } 483 }; 484 485 t.start(); 486 try { 487 table.get(get); 488 Assert.fail(); 489 } catch (IOException expected) { 490 LOG.debug("Received: " + expected); 491 Assert.assertFalse(expected instanceof SocketTimeoutException); 492 Assert.assertFalse(syncBlockingFilter.get()); 493 } finally { 494 syncBlockingFilter.set(true); 495 t.join(); 496 TEST_UTIL.getAdmin().setBalancerRunning(previousBalance, true); 497 } 498 499 table.close(); 500 connection.close(); 501 } 502 503 protected static final AtomicBoolean syncBlockingFilter = new AtomicBoolean(false); 504 505 public static class BlockingFilter extends FilterBase { 506 @Override 507 public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException { 508 int i = 0; 509 while (i++ < 1000 && !syncBlockingFilter.get()) { 510 synchronized (syncBlockingFilter) { 511 syncBlockingFilter.notifyAll(); 512 } 513 Threads.sleep(100); 514 } 515 syncBlockingFilter.set(true); 516 return false; 517 } 518 @Override 519 public ReturnCode filterCell(final Cell ignored) throws IOException { 520 return ReturnCode.INCLUDE; 521 } 522 523 public static Filter parseFrom(final byte [] pbBytes) throws DeserializationException{ 524 return new BlockingFilter(); 525 } 526 } 527 528 /** 529 * Test that when we delete a location using the first row of a region 530 * that we really delete it. 531 * @throws Exception 532 */ 533 @Test 534 public void testRegionCaching() throws Exception { 535 TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAM_NAM).close(); 536 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 537 // test with no retry, or client cache will get updated after the first failure 538 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); 539 Connection connection = ConnectionFactory.createConnection(conf); 540 final Table table = connection.getTable(TABLE_NAME); 541 542 TEST_UTIL.waitUntilAllRegionsAssigned(table.getName()); 543 Put put = new Put(ROW); 544 put.addColumn(FAM_NAM, ROW, ROW); 545 table.put(put); 546 547 ConnectionImplementation conn = (ConnectionImplementation) connection; 548 549 assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW)); 550 551 // Here we mess with the cached location making it so the region at TABLE_NAME, ROW is at 552 // a location where the port is current port number +1 -- i.e. a non-existent location. 553 HRegionLocation loc = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation(); 554 final int nextPort = loc.getPort() + 1; 555 conn.updateCachedLocation(loc.getRegionInfo(), loc.getServerName(), 556 ServerName.valueOf("127.0.0.1", nextPort, 557 HConstants.LATEST_TIMESTAMP), HConstants.LATEST_TIMESTAMP); 558 Assert.assertEquals(conn.getCachedLocation(TABLE_NAME, ROW) 559 .getRegionLocation().getPort(), nextPort); 560 561 conn.clearRegionCache(TABLE_NAME, ROW.clone()); 562 RegionLocations rl = conn.getCachedLocation(TABLE_NAME, ROW); 563 assertNull("What is this location?? " + rl, rl); 564 565 // We're now going to move the region and check that it works for the client 566 // First a new put to add the location in the cache 567 conn.clearRegionCache(TABLE_NAME); 568 Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME)); 569 Put put2 = new Put(ROW); 570 put2.addColumn(FAM_NAM, ROW, ROW); 571 table.put(put2); 572 assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW)); 573 assertNotNull(conn.getCachedLocation(TableName.valueOf(TABLE_NAME.getName()), ROW.clone())); 574 575 TEST_UTIL.getAdmin().setBalancerRunning(false, false); 576 HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); 577 578 // We can wait for all regions to be online, that makes log reading easier when debugging 579 TEST_UTIL.waitUntilNoRegionsInTransition(); 580 581 // Now moving the region to the second server 582 HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation(); 583 byte[] regionName = toMove.getRegionInfo().getRegionName(); 584 byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes(); 585 586 // Choose the other server. 587 int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName); 588 int destServerId = curServerId == 0? 1: 0; 589 590 HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId); 591 HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId); 592 593 ServerName destServerName = destServer.getServerName(); 594 595 // Check that we are in the expected state 596 Assert.assertTrue(curServer != destServer); 597 Assert.assertFalse(curServer.getServerName().equals(destServer.getServerName())); 598 Assert.assertFalse( toMove.getPort() == destServerName.getPort()); 599 Assert.assertNotNull(curServer.getOnlineRegion(regionName)); 600 Assert.assertNull(destServer.getOnlineRegion(regionName)); 601 Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster(). 602 getAssignmentManager().hasRegionsInTransition()); 603 604 // Moving. It's possible that we don't have all the regions online at this point, so 605 // the test must depend only on the region we're looking at. 606 LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString()); 607 TEST_UTIL.getAdmin().move( 608 toMove.getRegionInfo().getEncodedNameAsBytes(), 609 destServerName.getServerName().getBytes() 610 ); 611 612 while (destServer.getOnlineRegion(regionName) == null || 613 destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || 614 curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || 615 master.getAssignmentManager().hasRegionsInTransition()) { 616 // wait for the move to be finished 617 Thread.sleep(1); 618 } 619 620 LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString()); 621 622 // Check our new state. 623 Assert.assertNull(curServer.getOnlineRegion(regionName)); 624 Assert.assertNotNull(destServer.getOnlineRegion(regionName)); 625 Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); 626 Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); 627 628 629 // Cache was NOT updated and points to the wrong server 630 Assert.assertFalse( 631 conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation() 632 .getPort() == destServerName.getPort()); 633 634 // This part relies on a number of tries equals to 1. 635 // We do a put and expect the cache to be updated, even if we don't retry 636 LOG.info("Put starting"); 637 Put put3 = new Put(ROW); 638 put3.addColumn(FAM_NAM, ROW, ROW); 639 try { 640 table.put(put3); 641 Assert.fail("Unreachable point"); 642 } catch (RetriesExhaustedWithDetailsException e) { 643 LOG.info("Put done, exception caught: " + e.getClass()); 644 Assert.assertEquals(1, e.getNumExceptions()); 645 Assert.assertEquals(1, e.getCauses().size()); 646 Assert.assertArrayEquals(ROW, e.getRow(0).getRow()); 647 648 // Check that we unserialized the exception as expected 649 Throwable cause = ClientExceptionsUtil.findException(e.getCause(0)); 650 Assert.assertNotNull(cause); 651 Assert.assertTrue(cause instanceof RegionMovedException); 652 } catch (RetriesExhaustedException ree) { 653 // hbase2 throws RetriesExhaustedException instead of RetriesExhaustedWithDetailsException 654 // as hbase1 used to do. Keep an eye on this to see if this changed behavior is an issue. 655 LOG.info("Put done, exception caught: " + ree.getClass()); 656 Throwable cause = ClientExceptionsUtil.findException(ree.getCause()); 657 Assert.assertNotNull(cause); 658 Assert.assertTrue(cause instanceof RegionMovedException); 659 } 660 Assert.assertNotNull("Cached connection is null", conn.getCachedLocation(TABLE_NAME, ROW)); 661 Assert.assertEquals( 662 "Previous server was " + curServer.getServerName().getHostAndPort(), 663 destServerName.getPort(), 664 conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort()); 665 666 Assert.assertFalse(destServer.getRegionsInTransitionInRS() 667 .containsKey(encodedRegionNameBytes)); 668 Assert.assertFalse(curServer.getRegionsInTransitionInRS() 669 .containsKey(encodedRegionNameBytes)); 670 671 // We move it back to do another test with a scan 672 LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString()); 673 TEST_UTIL.getAdmin().move( 674 toMove.getRegionInfo().getEncodedNameAsBytes(), 675 curServer.getServerName().getServerName().getBytes() 676 ); 677 678 while (curServer.getOnlineRegion(regionName) == null || 679 destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || 680 curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || 681 master.getAssignmentManager().hasRegionsInTransition()) { 682 // wait for the move to be finished 683 Thread.sleep(1); 684 } 685 686 // Check our new state. 687 Assert.assertNotNull(curServer.getOnlineRegion(regionName)); 688 Assert.assertNull(destServer.getOnlineRegion(regionName)); 689 LOG.info("Move finished for region=" + toMove.getRegionInfo().getRegionNameAsString()); 690 691 // Cache was NOT updated and points to the wrong server 692 Assert.assertFalse(conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort() == 693 curServer.getServerName().getPort()); 694 695 Scan sc = new Scan(); 696 sc.setStopRow(ROW); 697 sc.setStartRow(ROW); 698 699 // The scanner takes the max retries from the connection configuration, not the table as 700 // the put. 701 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 702 703 try { 704 ResultScanner rs = table.getScanner(sc); 705 while (rs.next() != null) { 706 } 707 Assert.fail("Unreachable point"); 708 } catch (RetriesExhaustedException e) { 709 LOG.info("Scan done, expected exception caught: " + e.getClass()); 710 } 711 712 // Cache is updated with the right value. 713 Assert.assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW)); 714 Assert.assertEquals( 715 "Previous server was "+destServer.getServerName().getHostAndPort(), 716 curServer.getServerName().getPort(), 717 conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort()); 718 719 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY); 720 table.close(); 721 connection.close(); 722 } 723 724 /** 725 * Test that Connection or Pool are not closed when managed externally 726 * @throws Exception 727 */ 728 @Test 729 public void testConnectionManagement() throws Exception{ 730 Table table0 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAM); 731 Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); 732 Table table = conn.getTable(TABLE_NAME1); 733 table.close(); 734 assertFalse(conn.isClosed()); 735 if(table instanceof HTable) { 736 assertFalse(((HTable) table).getPool().isShutdown()); 737 } 738 table = conn.getTable(TABLE_NAME1); 739 table.close(); 740 if(table instanceof HTable) { 741 assertFalse(((HTable) table).getPool().isShutdown()); 742 } 743 conn.close(); 744 if(table instanceof HTable) { 745 assertTrue(((HTable) table).getPool().isShutdown()); 746 } 747 table0.close(); 748 } 749 750 /** 751 * Test that stale cache updates don't override newer cached values. 752 */ 753 @Test 754 public void testCacheSeqNums() throws Exception{ 755 Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME2, FAM_NAM); 756 Put put = new Put(ROW); 757 put.addColumn(FAM_NAM, ROW, ROW); 758 table.put(put); 759 ConnectionImplementation conn = (ConnectionImplementation) TEST_UTIL.getConnection(); 760 761 HRegionLocation location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation(); 762 assertNotNull(location); 763 764 ServerName anySource = ServerName.valueOf(location.getHostname(), location.getPort() - 1, 0L); 765 766 // Same server as already in cache reporting - overwrites any value despite seqNum. 767 int nextPort = location.getPort() + 1; 768 conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(), 769 ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1); 770 location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation(); 771 Assert.assertEquals(nextPort, location.getPort()); 772 773 // No source specified - same. 774 nextPort = location.getPort() + 1; 775 conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(), 776 ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1); 777 location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation(); 778 Assert.assertEquals(nextPort, location.getPort()); 779 780 // Higher seqNum - overwrites lower seqNum. 781 nextPort = location.getPort() + 1; 782 conn.updateCachedLocation(location.getRegionInfo(), anySource, 783 ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() + 1); 784 location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation(); 785 Assert.assertEquals(nextPort, location.getPort()); 786 787 // Lower seqNum - does not overwrite higher seqNum. 788 nextPort = location.getPort() + 1; 789 conn.updateCachedLocation(location.getRegionInfo(), anySource, 790 ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1); 791 location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation(); 792 Assert.assertEquals(nextPort - 1, location.getPort()); 793 table.close(); 794 } 795 796 @Test 797 public void testClosing() throws Exception { 798 Configuration configuration = 799 new Configuration(TEST_UTIL.getConfiguration()); 800 configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID, 801 String.valueOf(ThreadLocalRandom.current().nextInt())); 802 803 // as connection caching is going away, now we're just testing 804 // that closed connection does actually get closed. 805 806 Connection c1 = ConnectionFactory.createConnection(configuration); 807 Connection c2 = ConnectionFactory.createConnection(configuration); 808 // no caching, different connections 809 assertTrue(c1 != c2); 810 811 // closing independently 812 c1.close(); 813 assertTrue(c1.isClosed()); 814 assertFalse(c2.isClosed()); 815 816 c2.close(); 817 assertTrue(c2.isClosed()); 818 } 819 820 /** 821 * Trivial test to verify that nobody messes with 822 * {@link ConnectionFactory#createConnection(Configuration)} 823 */ 824 @Test 825 public void testCreateConnection() throws Exception { 826 Configuration configuration = TEST_UTIL.getConfiguration(); 827 Connection c1 = ConnectionFactory.createConnection(configuration); 828 Connection c2 = ConnectionFactory.createConnection(configuration); 829 // created from the same configuration, yet they are different 830 assertTrue(c1 != c2); 831 assertTrue(c1.getConfiguration() == c2.getConfiguration()); 832 } 833 834 /** 835 * This test checks that one can connect to the cluster with only the 836 * ZooKeeper quorum set. Other stuff like master address will be read 837 * from ZK by the client. 838 */ 839 @Test 840 public void testConnection() throws Exception{ 841 // We create an empty config and add the ZK address. 842 Configuration c = new Configuration(); 843 c.set(HConstants.ZOOKEEPER_QUORUM, 844 TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM)); 845 c.set(HConstants.ZOOKEEPER_CLIENT_PORT, 846 TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_CLIENT_PORT)); 847 848 // This should be enough to connect 849 ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(c); 850 assertTrue(conn.isMasterRunning()); 851 conn.close(); 852 } 853 854 private int setNumTries(ConnectionImplementation hci, int newVal) throws Exception { 855 Field numTries = hci.getClass().getDeclaredField("numTries"); 856 numTries.setAccessible(true); 857 Field modifiersField = Field.class.getDeclaredField("modifiers"); 858 modifiersField.setAccessible(true); 859 modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL); 860 final int prevNumRetriesVal = (Integer)numTries.get(hci); 861 numTries.set(hci, newVal); 862 863 return prevNumRetriesVal; 864 } 865 866 @Test 867 public void testMulti() throws Exception { 868 Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME3, FAM_NAM); 869 try { 870 ConnectionImplementation conn = (ConnectionImplementation)TEST_UTIL.getConnection(); 871 872 // We're now going to move the region and check that it works for the client 873 // First a new put to add the location in the cache 874 conn.clearRegionCache(TABLE_NAME3); 875 Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME3)); 876 877 TEST_UTIL.getAdmin().setBalancerRunning(false, false); 878 HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); 879 880 // We can wait for all regions to be online, that makes log reading easier when debugging 881 TEST_UTIL.waitUntilNoRegionsInTransition(); 882 883 Put put = new Put(ROW_X); 884 put.addColumn(FAM_NAM, ROW_X, ROW_X); 885 table.put(put); 886 887 // Now moving the region to the second server 888 HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation(); 889 byte[] regionName = toMove.getRegionInfo().getRegionName(); 890 byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes(); 891 892 // Choose the other server. 893 int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName); 894 int destServerId = (curServerId == 0 ? 1 : 0); 895 896 HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId); 897 HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId); 898 899 ServerName destServerName = destServer.getServerName(); 900 ServerName metaServerName = TEST_UTIL.getHBaseCluster().getServerHoldingMeta(); 901 902 //find another row in the cur server that is less than ROW_X 903 List<HRegion> regions = curServer.getRegions(TABLE_NAME3); 904 byte[] otherRow = null; 905 for (Region region : regions) { 906 if (!region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName()) 907 && Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0) { 908 otherRow = region.getRegionInfo().getStartKey(); 909 break; 910 } 911 } 912 assertNotNull(otherRow); 913 // If empty row, set it to first row.-f 914 if (otherRow.length <= 0) otherRow = Bytes.toBytes("aaa"); 915 Put put2 = new Put(otherRow); 916 put2.addColumn(FAM_NAM, otherRow, otherRow); 917 table.put(put2); //cache put2's location 918 919 // Check that we are in the expected state 920 Assert.assertTrue(curServer != destServer); 921 Assert.assertNotEquals(curServer.getServerName(), destServer.getServerName()); 922 Assert.assertNotEquals(toMove.getPort(), destServerName.getPort()); 923 Assert.assertNotNull(curServer.getOnlineRegion(regionName)); 924 Assert.assertNull(destServer.getOnlineRegion(regionName)); 925 Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster(). 926 getAssignmentManager().hasRegionsInTransition()); 927 928 // Moving. It's possible that we don't have all the regions online at this point, so 929 // the test depends only on the region we're looking at. 930 LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString()); 931 TEST_UTIL.getAdmin().move( 932 toMove.getRegionInfo().getEncodedNameAsBytes(), 933 destServerName.getServerName().getBytes() 934 ); 935 936 while (destServer.getOnlineRegion(regionName) == null || 937 destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || 938 curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || 939 master.getAssignmentManager().hasRegionsInTransition()) { 940 // wait for the move to be finished 941 Thread.sleep(1); 942 } 943 944 LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString()); 945 946 // Check our new state. 947 Assert.assertNull(curServer.getOnlineRegion(regionName)); 948 Assert.assertNotNull(destServer.getOnlineRegion(regionName)); 949 Assert.assertFalse(destServer.getRegionsInTransitionInRS() 950 .containsKey(encodedRegionNameBytes)); 951 Assert.assertFalse(curServer.getRegionsInTransitionInRS() 952 .containsKey(encodedRegionNameBytes)); 953 954 955 // Cache was NOT updated and points to the wrong server 956 Assert.assertFalse( 957 conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation() 958 .getPort() == destServerName.getPort()); 959 960 // Hijack the number of retry to fail after 2 tries 961 final int prevNumRetriesVal = setNumTries(conn, 2); 962 963 Put put3 = new Put(ROW_X); 964 put3.addColumn(FAM_NAM, ROW_X, ROW_X); 965 Put put4 = new Put(otherRow); 966 put4.addColumn(FAM_NAM, otherRow, otherRow); 967 968 // do multi 969 ArrayList<Put> actions = Lists.newArrayList(put4, put3); 970 table.batch(actions, null); // first should be a valid row, 971 // second we get RegionMovedException. 972 973 setNumTries(conn, prevNumRetriesVal); 974 } finally { 975 table.close(); 976 } 977 } 978 979 @Test 980 public void testErrorBackoffTimeCalculation() throws Exception { 981 // TODO: This test would seem to presume hardcoded RETRY_BACKOFF which it should not. 982 final long ANY_PAUSE = 100; 983 ServerName location = ServerName.valueOf("127.0.0.1", 1, 0); 984 ServerName diffLocation = ServerName.valueOf("127.0.0.1", 2, 0); 985 986 ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge(); 987 EnvironmentEdgeManager.injectEdge(timeMachine); 988 try { 989 long largeAmountOfTime = ANY_PAUSE * 1000; 990 ConnectionImplementation.ServerErrorTracker tracker = 991 new ConnectionImplementation.ServerErrorTracker(largeAmountOfTime, 100); 992 993 // The default backoff is 0. 994 assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE)); 995 996 // Check some backoff values from HConstants sequence. 997 tracker.reportServerError(location); 998 assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0], 999 tracker.calculateBackoffTime(location, ANY_PAUSE)); 1000 tracker.reportServerError(location); 1001 tracker.reportServerError(location); 1002 tracker.reportServerError(location); 1003 assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[3], 1004 tracker.calculateBackoffTime(location, ANY_PAUSE)); 1005 1006 // All of this shouldn't affect backoff for different location. 1007 assertEquals(0, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE)); 1008 tracker.reportServerError(diffLocation); 1009 assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0], 1010 tracker.calculateBackoffTime(diffLocation, ANY_PAUSE)); 1011 1012 // Check with different base. 1013 assertEqualsWithJitter(ANY_PAUSE * 2 * HConstants.RETRY_BACKOFF[3], 1014 tracker.calculateBackoffTime(location, ANY_PAUSE * 2)); 1015 } finally { 1016 EnvironmentEdgeManager.reset(); 1017 } 1018 } 1019 1020 private static void assertEqualsWithJitter(long expected, long actual) { 1021 assertEqualsWithJitter(expected, actual, expected); 1022 } 1023 1024 private static void assertEqualsWithJitter(long expected, long actual, long jitterBase) { 1025 assertTrue("Value not within jitter: " + expected + " vs " + actual, 1026 Math.abs(actual - expected) <= (0.01f * jitterBase)); 1027 } 1028 1029 @Test 1030 public void testConnectionRideOverClusterRestart() throws IOException, InterruptedException { 1031 Configuration config = new Configuration(TEST_UTIL.getConfiguration()); 1032 1033 final TableName tableName = TableName.valueOf(name.getMethodName()); 1034 TEST_UTIL.createTable(tableName, new byte[][] {FAM_NAM}).close(); 1035 1036 Connection connection = ConnectionFactory.createConnection(config); 1037 Table table = connection.getTable(tableName); 1038 1039 // this will cache the meta location and table's region location 1040 table.get(new Get(Bytes.toBytes("foo"))); 1041 1042 // restart HBase 1043 TEST_UTIL.shutdownMiniHBaseCluster(); 1044 TEST_UTIL.restartHBaseCluster(2); 1045 // this should be able to discover new locations for meta and table's region 1046 table.get(new Get(Bytes.toBytes("foo"))); 1047 TEST_UTIL.deleteTable(tableName); 1048 table.close(); 1049 connection.close(); 1050 } 1051 1052 @Test 1053 public void testLocateRegionsWithRegionReplicas() throws IOException { 1054 int regionReplication = 3; 1055 byte[] family = Bytes.toBytes("cf"); 1056 TableName tableName = TableName.valueOf(name.getMethodName()); 1057 1058 // Create a table with region replicas 1059 TableDescriptorBuilder builder = TableDescriptorBuilder 1060 .newBuilder(tableName) 1061 .setRegionReplication(regionReplication) 1062 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)); 1063 TEST_UTIL.getAdmin().createTable(builder.build()); 1064 1065 try (ConnectionImplementation con = (ConnectionImplementation) ConnectionFactory. 1066 createConnection(TEST_UTIL.getConfiguration())) { 1067 // Get locations of the regions of the table 1068 List<HRegionLocation> locations = con.locateRegions(tableName, false, false); 1069 1070 // The size of the returned locations should be 3 1071 assertEquals(regionReplication, locations.size()); 1072 1073 // The replicaIds of the returned locations should be 0, 1 and 2 1074 Set<Integer> expectedReplicaIds = IntStream.range(0, regionReplication). 1075 boxed().collect(Collectors.toSet()); 1076 for (HRegionLocation location : locations) { 1077 assertTrue(expectedReplicaIds.remove(location.getRegion().getReplicaId())); 1078 } 1079 } finally { 1080 TEST_UTIL.deleteTable(tableName); 1081 } 1082 } 1083}