/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises region replicas (TIMELINE-consistent reads) against a mini cluster: table
 * create/alter/delete, bulk load, simulated primary failures and meta replica lookups.
 *
 * <p>The tests are synchronized with the region servers through the coprocessors declared
 * below, whose static state is shared by every region in the JVM.
 */
@Category({LargeTests.class, ClientTests.class})
public class TestReplicaWithCluster {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicaWithCluster.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicaWithCluster.class);

  private static final int NB_SERVERS = 3;
  // Bytes.toBytes encodes explicitly instead of relying on the platform default charset.
  private static final byte[] row = Bytes.toBytes(TestReplicaWithCluster.class.getName());
  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();

  // second minicluster used in testing of replication
  private static HBaseTestingUtility HTU2;
  private static final byte[] f = HConstants.CATALOG_FAMILY;

  private final static int REFRESH_PERIOD = 1000;
  private final static int META_SCAN_TIMEOUT_IN_MILLISEC = 200;

  /**
   * This copro is used to synchronize the tests: a Get on the primary replica either sleeps
   * for {@link #sleepTime} milliseconds or blocks on the latch held in {@link #cdl}.
   */
  public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
    static final AtomicLong sleepTime = new AtomicLong(0);
    static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(new CountDownLatch(0));

    public SlowMeCopro() {
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    /**
     * Delays Gets served by the primary replica (replica id 0); other replicas pass through.
     *
     * @throws RuntimeException if the latch is still closed after two minutes
     */
    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Get get, final List<Cell> results) throws IOException {
      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() != 0) {
        LOG.info("We're not the primary replicas.");
        return;
      }

      CountDownLatch latch = cdl.get();
      // Read once so the logged value and the slept value cannot differ.
      long sleepMs = sleepTime.get();
      try {
        if (sleepMs > 0) {
          LOG.info("Sleeping for {} ms", sleepMs);
          Thread.sleep(sleepMs);
        } else if (latch.getCount() > 0) {
          LOG.info("Waiting for the counterCountDownLatch");
          // Bounded wait, to help the tests to finish.
          if (!latch.await(2, TimeUnit.MINUTES)) {
            throw new RuntimeException("Can't wait more");
          }
        }
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
        // Keep the interrupt visible to the calling handler thread.
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * This copro is used to simulate region server down exception for Get and Scan
   */
  @CoreCoprocessor
  public static class RegionServerStoppedCopro implements RegionCoprocessor, RegionObserver {

    public RegionServerStoppedCopro() {
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Get get, final List<Cell> results) throws IOException {
      failUnlessSecondaryReplica(e);
    }

    @Override
    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Scan scan) throws IOException {
      failUnlessSecondaryReplica(e);
    }

    /**
     * Fails the call for the primary replica and replica 1; only replicas with an id
     * greater than 1 are allowed to serve it.
     *
     * @throws RegionServerStoppedException when the region's replica id is 0 or 1
     */
    private static void failUnlessSecondaryReplica(
        final ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
      if (replicaId <= 1) {
        LOG.info("Throw Region Server Stopped Exceptoin for replica id " + replicaId);
        throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName()
            + " not running");
      }
      LOG.info("We're replica region " + replicaId);
    }
  }

  /**
   * This copro is used to slow down the primary meta region scan a bit, and optionally to
   * fail calls as if the hosting region server had stopped.
   */
  public static class RegionServerHostingPrimayMetaRegionSlowOrStopCopro
      implements RegionCoprocessor, RegionObserver {
    // Written by the test thread and read by server handler threads, hence volatile.
    static volatile boolean slowDownPrimaryMetaScan = false;
    static volatile boolean throwException = false;

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Get get, final List<Cell> results) throws IOException {

      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();

      if (!throwException) {
        LOG.info("Get, We're replica region " + replicaId);
        return;
      }

      // Fail for the primary replica, but not for meta
      boolean isMeta = e.getEnvironment().getRegion().getRegionInfo().isMetaRegion();
      if (!isMeta && replicaId == 0) {
        LOG.info("Get, throw Region Server Stopped Exceptoin for region " + e.getEnvironment()
            .getRegion().getRegionInfo());
        throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName()
            + " not running");
      }
    }

    @Override
    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Scan scan) throws IOException {

      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
      boolean primaryMeta =
          e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && replicaId == 0;

      if (!primaryMeta) {
        LOG.info("Scan, We're replica region " + replicaId);
        return;
      }

      // Slow down with the primary meta region scan
      if (slowDownPrimaryMetaScan) {
        LOG.info("Scan with primary meta region, slow down a bit");
        try {
          Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50);
        } catch (InterruptedException ie) {
          // Stop sleeping, but keep the interrupt visible to the handler thread.
          Thread.currentThread().interrupt();
        }
      }

      // Fail for the primary replica
      if (throwException) {
        LOG.info("Scan, throw Region Server Stopped Exceptoin for replica " + e.getEnvironment()
            .getRegion().getRegionInfo());

        throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName()
            + " not running");
      }
      LOG.info("Scan, We're replica region " + replicaId);
    }
  }

  /** Configures and starts the primary mini cluster plus one extra master. */
  @BeforeClass
  public static void beforeClass() throws Exception {
    Configuration conf = HTU.getConfiguration();

    // enable store file refreshing
    conf.setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD);

    conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0001f);
    conf.setInt("replication.source.size.capacity", 10240);
    conf.setLong("replication.source.sleepforretries", 100);
    conf.setInt("hbase.regionserver.maxlogs", 2);
    conf.setLong("hbase.master.logcleaner.ttl", 10);
    conf.setInt("zookeeper.recovery.retry", 1);
    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);

    // Wait for primary call longer so make sure that it will get exception from the primary call
    conf.setInt("hbase.client.primaryCallTimeout.get", 1000000);
    conf.setInt("hbase.client.primaryCallTimeout.scan", 1000000);

    // Retry less so it can fail faster
    conf.setInt("hbase.client.retries.number", 1);

    // Enable meta replica at server side
    conf.setInt("hbase.meta.replica.count", 2);

    // Make sure master does not host system tables.
    conf.set("hbase.balancer.tablesOnMaster", "none");

    // Set system coprocessor so it can be applied to meta regions
    conf.set("hbase.coprocessor.region.classes",
        RegionServerHostingPrimayMetaRegionSlowOrStopCopro.class.getName());

    // NOTE(review): the * 1000 presumably converts milliseconds to the microseconds this
    // setting expects -- confirm against HConstants.
    conf.setInt(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
        META_SCAN_TIMEOUT_IN_MILLISEC * 1000);

    HTU.startMiniCluster(NB_SERVERS);
    HTU.getHBaseCluster().startMaster();
  }

  /** Shuts down the second cluster (if a test started it) and then the primary one. */
  @AfterClass
  public static void afterClass() throws Exception {
    try {
      if (HTU2 != null) {
        HTU2.shutdownMiniCluster();
      }
    } finally {
      // Always stop the primary cluster, even if stopping the second one failed.
      HTU.shutdownMiniCluster();
    }
  }

  /**
   * Sleeps for two store file refresh periods so that flushed data is picked up by the
   * replicas. An interrupt ends the wait early and is restored on the current thread.
   */
  private static void waitForReplicaRefresh() {
    try {
      Thread.sleep(2 * REFRESH_PERIOD);
    } catch (InterruptedException e1) {
      LOG.error(e1.toString(), e1);
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Builds a predicate that blocks the primary with a fresh latch, issues a TIMELINE Get for
   * {@link #row} on {@code table}, asserts the result is stale and reports whether it holds
   * data. The latch is always released afterwards.
   */
  private static Waiter.Predicate<Exception> staleGetReturnsData(final Table table) {
    return new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        try {
          SlowMeCopro.cdl.set(new CountDownLatch(1));
          Get g = new Get(row);
          g.setConsistency(Consistency.TIMELINE);
          Result r = table.get(g);
          Assert.assertTrue(r.isStale());
          return !r.isEmpty();
        } finally {
          SlowMeCopro.cdl.get().countDown();
          SlowMeCopro.sleepTime.set(0);
        }
      }
    };
  }

  /** A replicated table can be created, read (fresh and stale) and deleted. */
  @Test
  public void testCreateDeleteTable() throws IOException {
    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor("testCreateDeleteTable");
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(SlowMeCopro.class.getName());

    try (Table table = HTU.createTable(hdt, new byte[][]{f}, null)) {
      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      Get g = new Get(row);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());

      try {
        // But if we ask for stale we will get it
        SlowMeCopro.cdl.set(new CountDownLatch(1));
        g = new Get(row);
        g.setConsistency(Consistency.TIMELINE);
        r = table.get(g);
        Assert.assertTrue(r.isStale());
      } finally {
        // Release the blocked primary before the table handle is closed.
        SlowMeCopro.cdl.get().countDown();
        SlowMeCopro.sleepTime.set(0);
      }
    }

    HTU.getAdmin().disableTable(hdt.getTableName());
    HTU.deleteTable(hdt.getTableName());
  }

  /** Adding a column family to a replicated table keeps fresh and stale reads working. */
  @Test
  public void testChangeTable() throws Exception {
    TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("testChangeTable"))
        .setRegionReplication(NB_SERVERS)
        .setCoprocessor(SlowMeCopro.class.getName())
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f))
        .build();
    HTU.getAdmin().createTable(td);

    TableDescriptor bHdt;
    TableDescriptor nHdt;
    try (Table table = HTU.getConnection().getTable(td.getTableName())) {
      // basic test: it should work.
      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      Get g = new Get(row);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());

      // Add a CF, it should work.
      bHdt = HTU.getAdmin().getDescriptor(td.getTableName());
      td = TableDescriptorBuilder.newBuilder(td)
          .setColumnFamily(ColumnFamilyDescriptorBuilder.of(row))
          .build();
      HTU.getAdmin().disableTable(td.getTableName());
      HTU.getAdmin().modifyTable(td);
      HTU.getAdmin().enableTable(td.getTableName());
      nHdt = HTU.getAdmin().getDescriptor(td.getTableName());
      Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
          bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount());

      p = new Put(row);
      p.addColumn(row, row, row);
      table.put(p);

      g = new Get(row);
      r = table.get(g);
      Assert.assertFalse(r.isStale());

      try {
        SlowMeCopro.cdl.set(new CountDownLatch(1));
        g = new Get(row);
        g.setConsistency(Consistency.TIMELINE);
        r = table.get(g);
        Assert.assertTrue(r.isStale());
      } finally {
        SlowMeCopro.cdl.get().countDown();
        SlowMeCopro.sleepTime.set(0);
      }
    }

    Admin admin = HTU.getAdmin();
    nHdt = admin.getDescriptor(td.getTableName());
    Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
        bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount());

    admin.disableTable(td.getTableName());
    admin.deleteTable(td.getTableName());
    admin.close();
  }

  /**
   * Region replicas keep serving stale reads on a table that is also replicated to a peer
   * cluster. Starts the second mini cluster, which is shut down in {@link #afterClass()}.
   */
  @SuppressWarnings("deprecation")
  @Test
  public void testReplicaAndReplication() throws Exception {
    HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaAndReplication");
    hdt.setRegionReplication(NB_SERVERS);

    HColumnDescriptor fam = new HColumnDescriptor(row);
    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    hdt.addFamily(fam);

    hdt.addCoprocessor(SlowMeCopro.class.getName());
    HTU.getAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);

    Configuration conf2 = HBaseConfiguration.create(HTU.getConfiguration());
    conf2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
    MiniZooKeeperCluster miniZK = HTU.getZkCluster();

    HTU2 = new HBaseTestingUtility(conf2);
    HTU2.setZkCluster(miniZK);
    HTU2.startMiniCluster(NB_SERVERS);
    LOG.info("Setup second Zk");
    HTU2.getAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);

    // try-with-resources so the admin is released even when adding the peer fails.
    try (ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration())) {
      ReplicationPeerConfig rpc = new ReplicationPeerConfig();
      rpc.setClusterKey(HTU2.getClusterKey());
      admin.addPeer("2", rpc, null);
    }

    try (Table table = HTU.getConnection().getTable(hdt.getTableName())) {
      Put p = new Put(row);
      p.addColumn(row, row, row);
      table.put(p);

      HTU.getAdmin().flush(table.getName());
      LOG.info("Put & flush done on the first cluster. Now doing a get on the same cluster.");

      Waiter.waitFor(HTU.getConfiguration(), 1000, staleGetReturnsData(table));
    }
    LOG.info("stale get on the first cluster done. Now for the second.");

    // NOTE(review): this handle is still obtained from HTU (the first cluster), although the
    // log line above announces the second one -- confirm whether HTU2 was intended.
    try (Table table2 = HTU.getConnection().getTable(hdt.getTableName())) {
      Waiter.waitFor(HTU.getConfiguration(), 1000, staleGetReturnsData(table2));
    }

    HTU.getAdmin().disableTable(hdt.getTableName());
    HTU.deleteTable(hdt.getTableName());

    HTU2.getAdmin().disableTable(hdt.getTableName());
    HTU2.deleteTable(hdt.getTableName());

    // We shutdown HTU2 minicluster later, in afterClass(), as shutting down
    // the minicluster has negative impact of deleting all HConnections in JVM.
  }

  /** Bulk loaded HFiles are readable from the primary and, as stale data, from replicas. */
  @Test
  public void testBulkLoad() throws IOException {
    // Create table then get the single region for our new table.
    LOG.debug("Creating test table");
    HTableDescriptor hdt = HTU.createTableDescriptor("testBulkLoad");
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(SlowMeCopro.class.getName());
    // Only the table creation matters here; release the returned handle right away.
    HTU.createTable(hdt, new byte[][]{f}, null).close();

    // create hfiles to load.
    LOG.debug("Creating test data");
    Path dir = HTU.getDataTestDirOnTestFS("testBulkLoad");
    final int numRows = 10;
    final byte[] qual = Bytes.toBytes("qual");
    final byte[] val = Bytes.toBytes("val");
    final List<Pair<byte[], String>> famPaths = new ArrayList<>();
    for (HColumnDescriptor col : hdt.getColumnFamilies()) {
      Path hfile = new Path(dir, col.getNameAsString());
      TestHRegionServerBulkLoad.createHFile(HTU.getTestFileSystem(), hfile, col.getName(),
        qual, val, numRows);
      famPaths.add(new Pair<>(col.getName(), hfile.toString()));
    }

    // bulk load HFiles
    LOG.debug("Loading test data");
    final ClusterConnection conn = (ClusterConnection) HTU.getAdmin().getConnection();
    try (Table table = conn.getTable(hdt.getTableName())) {
      final String bulkToken =
          new SecureBulkLoadClient(HTU.getConfiguration(), table).prepareBulkLoad(conn);
      ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn,
          hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0),
          new RpcControllerFactory(HTU.getConfiguration()).newController(),
          HConstants.PRIORITY_UNSET) {
        @Override
        protected Void rpcCall() throws Exception {
          LOG.debug("Going to connect to server " + getLocation() + " for row "
              + Bytes.toStringBinary(getRow()));
          byte[] regionName = getLocation().getRegionInfo().getRegionName();
          try (Table regionTable = conn.getTable(getTableName())) {
            SecureBulkLoadClient secureClient =
                new SecureBulkLoadClient(HTU.getConfiguration(), regionTable);
            secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
                true, null, bulkToken);
          }
          return null;
        }
      };
      RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(HTU.getConfiguration());
      RpcRetryingCaller<Void> caller = factory.newCaller();
      caller.callWithRetries(callable, 10000);

      // verify we can read them from the primary
      LOG.debug("Verifying data load");
      for (int i = 0; i < numRows; i++) {
        byte[] rowKey = TestHRegionServerBulkLoad.rowkey(i);
        Get g = new Get(rowKey);
        Result r = table.get(g);
        Assert.assertFalse(r.isStale());
      }

      // verify we can read them from the replica
      LOG.debug("Verifying replica queries");
      try {
        SlowMeCopro.cdl.set(new CountDownLatch(1));
        for (int i = 0; i < numRows; i++) {
          byte[] rowKey = TestHRegionServerBulkLoad.rowkey(i);
          Get g = new Get(rowKey);
          g.setConsistency(Consistency.TIMELINE);
          Result r = table.get(g);
          Assert.assertTrue(r.isStale());
        }
      } finally {
        SlowMeCopro.cdl.get().countDown();
        SlowMeCopro.sleepTime.set(0);
      }
    }

    HTU.getAdmin().disableTable(hdt.getTableName());
    HTU.deleteTable(hdt.getTableName());
  }

  /** A TIMELINE Get is answered by a secondary replica when the primary reports stopped. */
  @Test
  public void testReplicaGetWithPrimaryDown() throws IOException {
    // Create table then get the single region for our new table.
    // Each test uses its own table name so leftovers of one test cannot break another.
    HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaGetWithPrimaryDown");
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());
    try (Table table = HTU.createTable(hdt, new byte[][] { f }, null)) {
      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      waitForReplicaRefresh();

      // But if we ask for stale we will get it
      Get g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      Result r = table.get(g);
      Assert.assertTrue(r.isStale());
    } finally {
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }

  /** A TIMELINE Scan is answered by a secondary replica when the primary reports stopped. */
  @Test
  public void testReplicaScanWithPrimaryDown() throws IOException {
    // Create table then get the single region for our new table.
    // Each test uses its own table name so leftovers of one test cannot break another.
    HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaScanWithPrimaryDown");
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());

    try (Table table = HTU.createTable(hdt, new byte[][] { f }, null)) {
      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      waitForReplicaRefresh();

      // But if we ask for stale we will get it
      Scan scan = new Scan();
      scan.addFamily(f);
      scan.setConsistency(Consistency.TIMELINE);

      // The scanner holds client and server side resources, so close it when done.
      try (ResultScanner scanner = table.getScanner(scan)) {
        Result r = scanner.next();
        Assert.assertTrue(r.isStale());
      }
    } finally {
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }

  /** Stale Gets work through a fresh connection configured with the async RPC client. */
  @Test
  public void testReplicaGetWithAsyncRpcClientImpl() throws IOException {
    HTU.getConfiguration().setBoolean("hbase.ipc.client.specificThreadForWriting", true);
    HTU.getConfiguration().set(
        "hbase.rpc.client.impl", "org.apache.hadoop.hbase.ipc.AsyncRpcClient");
    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaGetWithAsyncRpcClientImpl");
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(SlowMeCopro.class.getName());

    try (Table table = HTU.createTable(hdt, new byte[][] { f }, null)) {
      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      waitForReplicaRefresh();

      // Create the new connection so new config can kick in
      try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
          Table t = connection.getTable(hdt.getTableName())) {
        try {
          // But if we ask for stale we will get it
          SlowMeCopro.cdl.set(new CountDownLatch(1));
          Get g = new Get(row);
          g.setConsistency(Consistency.TIMELINE);
          Result r = t.get(g);
          Assert.assertTrue(r.isStale());
        } finally {
          // Release the blocked primary call before the connection is closed.
          SlowMeCopro.cdl.get().countDown();
          SlowMeCopro.sleepTime.set(0);
        }
      }
    } finally {
      HTU.getConfiguration().unset("hbase.ipc.client.specificThreadForWriting");
      HTU.getConfiguration().unset("hbase.rpc.client.impl");
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }

  // This test is to test when hbase.client.metaReplicaCallTimeout.scan is configured, meta table
  // scan will always get the result from primary meta region as long as the result is returned
  // within configured hbase.client.metaReplicaCallTimeout.scan from primary meta region.
  @Test
  public void testGetRegionLocationFromPrimaryMetaRegion() throws IOException, InterruptedException {
    HTU.getAdmin().setBalancerRunning(false, true);

    ((ConnectionImplementation) HTU.getAdmin().getConnection()).setUseMetaReplicas(true);

    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor("testGetRegionLocationFromPrimaryMetaRegion");
    hdt.setRegionReplication(2);
    try {
      // Only the table creation matters here; release the returned handle right away.
      HTU.createTable(hdt, new byte[][] { f }, null).close();

      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = true;

      // Get user table location, always get it from the primary meta replica.
      // The lookup completing without an exception is all this test checks.
      ((ClusterConnection) HTU.getConnection())
          .locateRegion(hdt.getTableName(), row, false, false);
    } finally {
      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = false;
      ((ConnectionImplementation) HTU.getAdmin().getConnection()).setUseMetaReplicas(false);
      HTU.getAdmin().setBalancerRunning(true, true);
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }

  // This test is to simulate the case that the meta region and the primary user region
  // are down, hbase client is able to access user replica regions and return stale data.
  // Meta replica is enabled to show the case that the meta replica region could be out of sync
  // with the primary meta region.
  @Test
  public void testReplicaGetWithPrimaryAndMetaDown() throws IOException, InterruptedException {
    HTU.getAdmin().setBalancerRunning(false, true);

    ((ConnectionImplementation) HTU.getAdmin().getConnection()).setUseMetaReplicas(true);

    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaGetWithPrimaryAndMetaDown");
    hdt.setRegionReplication(2);
    try (Table table = HTU.createTable(hdt, new byte[][] { f }, null)) {
      // Get Meta location
      RegionLocations mrl = ((ClusterConnection) HTU.getConnection())
          .locateRegion(TableName.META_TABLE_NAME,
              HConstants.EMPTY_START_ROW, false, false);

      // Get user table location
      RegionLocations url = ((ClusterConnection) HTU.getConnection())
          .locateRegion(hdt.getTableName(), row, false, false);

      // Make sure that user primary region is co-hosted with the meta region
      if (!url.getDefaultRegionLocation().getServerName().equals(
          mrl.getDefaultRegionLocation().getServerName())) {
        HTU.moveRegionAndWait(url.getDefaultRegionLocation().getRegionInfo(),
            mrl.getDefaultRegionLocation().getServerName());
      }

      // Make sure that the user replica region is not hosted by the same region server with
      // primary
      if (url.getRegionLocation(1).getServerName().equals(mrl.getDefaultRegionLocation()
          .getServerName())) {
        HTU.moveRegionAndWait(url.getRegionLocation(1).getRegionInfo(),
            url.getDefaultRegionLocation().getServerName());
      }

      // Wait until the meta table is updated with new location info
      while (true) {
        mrl = ((ClusterConnection) HTU.getConnection())
            .locateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW, false, false);

        // Get user table location
        url = ((ClusterConnection) HTU.getConnection())
            .locateRegion(hdt.getTableName(), row, false, true);

        LOG.info("meta locations " + mrl);
        LOG.info("table locations " + url);
        ServerName userPrimaryServer = url.getDefaultRegionLocation().getServerName();
        ServerName metaPrimaryServer = mrl.getDefaultRegionLocation().getServerName();
        if (userPrimaryServer.equals(metaPrimaryServer)) {
          break;
        }
        LOG.info("Waiting for new region info to be updated in meta table");
        Thread.sleep(100);
      }

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      waitForReplicaRefresh();

      // Simulating the RS down
      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = true;

      // The first Get is supposed to succeed
      Get g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      Result r = table.get(g);
      Assert.assertTrue(r.isStale());

      // The second Get will succeed as well
      r = table.get(g);
      Assert.assertTrue(r.isStale());
    } finally {
      ((ConnectionImplementation) HTU.getAdmin().getConnection()).setUseMetaReplicas(false);
      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = false;
      HTU.getAdmin().setBalancerRunning(true, true);
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }
}