/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests client behaviour with region replicas: timeline-consistent gets and scans against
 * replicas, bulk load, replication between two miniclusters, and fallback to replicas when the
 * primary user region and/or the primary meta region are unavailable.
 */
@Category({LargeTests.class, ClientTests.class})
public class TestReplicaWithCluster {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicaWithCluster.class);
  private static final Logger LOG = LoggerFactory.getLogger(TestReplicaWithCluster.class);

  private static final int NB_SERVERS = 3;
  private static final byte[] row = TestReplicaWithCluster.class.getName().getBytes();
  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();

  // second minicluster used in testing of replication
  private static HBaseTestingUtility HTU2;
  private static final byte[] f = HConstants.CATALOG_FAMILY;

  private final static int REFRESH_PERIOD = 1000;
  private final static int META_SCAN_TIMEOUT_IN_MILLISEC = 200;

  @Rule
  public TestName name = new TestName();

  /**
   * This copro is used to synchronize the tests.
   */
  public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
    static final AtomicLong sleepTime = new AtomicLong(0);
    static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(new CountDownLatch(0));

    public SlowMeCopro() {
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Get get, final List<Cell> results) throws IOException {

      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
        CountDownLatch latch = cdl.get();
        try {
          if (sleepTime.get() > 0) {
            LOG.info("Sleeping for " + sleepTime.get() + " ms");
            Thread.sleep(sleepTime.get());
          } else if (latch.getCount() > 0) {
            LOG.info("Waiting for the countdown latch");
            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
            if (latch.getCount() > 0) {
              throw new RuntimeException("Can't wait more");
            }
          }
        } catch (InterruptedException e1) {
          LOG.error(e1.toString(), e1);
        }
      } else {
        LOG.info("We're not the primary replica.");
      }
    }
  }

  /**
   * This copro is used to simulate a region server down exception for Get and Scan.
   */
  @CoreCoprocessor
  public static class RegionServerStoppedCopro implements RegionCoprocessor, RegionObserver {

    public RegionServerStoppedCopro() {
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Get get, final List<Cell> results) throws IOException {

      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();

      // Fail for the primary replica and replica 1
      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) {
        LOG.info("Throw Region Server Stopped Exception for replica id " + replicaId);
        throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName()
            + " not running");
      } else {
        LOG.info("We're replica region " + replicaId);
      }
    }

    @Override
    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Scan scan) throws IOException {
      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
      // Fail for the primary replica and replica 1
      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) {
        LOG.info("Throw Region Server Stopped Exception for replica id " + replicaId);
        throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName()
            + " not running");
      } else {
        LOG.info("We're replica region " + replicaId);
      }
    }
  }

  /**
   * This copro is used to slow down the primary meta region scan a bit, or to simulate the
   * primary regions being down.
   */
  public static class RegionServerHostingPrimayMetaRegionSlowOrStopCopro
      implements RegionCoprocessor, RegionObserver {
    static boolean slowDownPrimaryMetaScan = false;
    static boolean throwException = false;

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Get get, final List<Cell> results) throws IOException {

      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();

      // Fail for the primary replica, but not for meta
      if (throwException) {
        if (!e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
          LOG.info("Get, throw Region Server Stopped Exception for region " + e.getEnvironment()
              .getRegion().getRegionInfo());
          throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName()
              + " not running");
        }
      } else {
        LOG.info("Get, we're replica region " + replicaId);
      }
    }

    @Override
    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Scan scan) throws IOException {

      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();

      // Slow down the primary meta region scan
      if (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
        if (slowDownPrimaryMetaScan) {
          LOG.info("Scan with primary meta region, slow down a bit");
          try {
            Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50);
          } catch (InterruptedException ie) {
            // Ignore
          }
        }

        // Fail for the primary replica
        if (throwException) {
          LOG.info("Scan, throw Region Server Stopped Exception for replica " + e.getEnvironment()
              .getRegion().getRegionInfo());

          throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName()
              + " not running");
        } else {
          LOG.info("Scan, we're replica region " + replicaId);
        }
      } else {
        LOG.info("Scan, we're replica region " + replicaId);
      }
    }
  }

  @BeforeClass
  public static void beforeClass() throws Exception {
    // enable store file refreshing
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
        REFRESH_PERIOD);

    HTU.getConfiguration().setFloat("hbase.regionserver.logroll.multiplier", 0.0001f);
    HTU.getConfiguration().setInt("replication.source.size.capacity", 10240);
    HTU.getConfiguration().setLong("replication.source.sleepforretries", 100);
    HTU.getConfiguration().setInt("hbase.regionserver.maxlogs", 2);
    HTU.getConfiguration().setLong("hbase.master.logcleaner.ttl", 10);
    HTU.getConfiguration().setInt("zookeeper.recovery.retry", 1);
    HTU.getConfiguration().setInt("zookeeper.recovery.retry.intervalmill", 10);

    // Wait longer for the primary call, to make sure the client gets the exception from the
    // primary call
    HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.get", 1000000);
    HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.scan", 1000000);

    // Make sure master does not host system tables.
    HTU.getConfiguration().set("hbase.balancer.tablesOnMaster", "none");

    // Set system coprocessor so it can be applied to meta regions
    HTU.getConfiguration().set("hbase.coprocessor.region.classes",
        RegionServerHostingPrimayMetaRegionSlowOrStopCopro.class.getName());

    HTU.getConfiguration().setInt(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
        META_SCAN_TIMEOUT_IN_MILLISEC * 1000);

    HTU.startMiniCluster(NB_SERVERS);
    // Enable meta replica at server side
    HBaseTestingUtility.setReplicas(HTU.getAdmin(), TableName.META_TABLE_NAME, 2);

    HTU.getHBaseCluster().startMaster();
  }

  @AfterClass
  public static void afterClass() throws Exception {
    if (HTU2 != null) {
      HTU2.shutdownMiniCluster();
    }
    HTU.shutdownMiniCluster();
  }

  @Test
  public void testCreateDeleteTable() throws IOException {
    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(SlowMeCopro.class.getName());
    Table table = HTU.createTable(hdt, new byte[][]{f}, null);

    Put p = new Put(row);
    p.addColumn(f, row, row);
    table.put(p);

    Get g = new Get(row);
    Result r = table.get(g);
    Assert.assertFalse(r.isStale());

    try {
      // But if we ask for stale we will get it
      SlowMeCopro.cdl.set(new CountDownLatch(1));
      g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
      SlowMeCopro.cdl.get().countDown();
    } finally {
      SlowMeCopro.cdl.get().countDown();
      SlowMeCopro.sleepTime.set(0);
    }

    HTU.getAdmin().disableTable(hdt.getTableName());
    HTU.deleteTable(hdt.getTableName());
  }

  @Test
  public void testChangeTable() throws Exception {
    TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
        .setRegionReplication(NB_SERVERS)
        .setCoprocessor(SlowMeCopro.class.getName())
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f))
        .build();
    HTU.getAdmin().createTable(td);
    Table table = HTU.getConnection().getTable(td.getTableName());
    // basic test: it should work.
    Put p = new Put(row);
    p.addColumn(f, row, row);
    table.put(p);

    Get g = new Get(row);
    Result r = table.get(g);
    Assert.assertFalse(r.isStale());

    // Add a CF, it should work.
    TableDescriptor bHdt = HTU.getAdmin().getDescriptor(td.getTableName());
    td = TableDescriptorBuilder.newBuilder(td)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(row))
        .build();
    HTU.getAdmin().disableTable(td.getTableName());
    HTU.getAdmin().modifyTable(td);
    HTU.getAdmin().enableTable(td.getTableName());
    TableDescriptor nHdt = HTU.getAdmin().getDescriptor(td.getTableName());
    Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
        bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount());

    p = new Put(row);
    p.addColumn(row, row, row);
    table.put(p);

    g = new Get(row);
    r = table.get(g);
    Assert.assertFalse(r.isStale());

    try {
      SlowMeCopro.cdl.set(new CountDownLatch(1));
      g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
    } finally {
      SlowMeCopro.cdl.get().countDown();
      SlowMeCopro.sleepTime.set(0);
    }

    Admin admin = HTU.getAdmin();
    nHdt = admin.getDescriptor(td.getTableName());
    Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
        bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount());

    admin.disableTable(td.getTableName());
    admin.deleteTable(td.getTableName());
    admin.close();
  }

  @SuppressWarnings("deprecation")
  @Test
  public void testReplicaAndReplication() throws Exception {
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);

    HColumnDescriptor fam = new HColumnDescriptor(row);
    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    hdt.addFamily(fam);

    hdt.addCoprocessor(SlowMeCopro.class.getName());
    HTU.getAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);

    Configuration conf2 = HBaseConfiguration.create(HTU.getConfiguration());
    conf2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
    MiniZooKeeperCluster miniZK = HTU.getZkCluster();

    HTU2 = new HBaseTestingUtility(conf2);
    HTU2.setZkCluster(miniZK);
    HTU2.startMiniCluster(NB_SERVERS);
    LOG.info("Setup second Zk");
    HTU2.getAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);

    ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());

    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(HTU2.getClusterKey());
    admin.addPeer("2", rpc, null);
    admin.close();

    Put p = new Put(row);
    p.addColumn(row, row, row);
    final Table table = HTU.getConnection().getTable(hdt.getTableName());
    table.put(p);

    HTU.getAdmin().flush(table.getName());
    LOG.info("Put & flush done on the first cluster. Now doing a get on the same cluster.");

    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
      @Override public boolean evaluate() throws Exception {
        try {
          SlowMeCopro.cdl.set(new CountDownLatch(1));
          Get g = new Get(row);
          g.setConsistency(Consistency.TIMELINE);
          Result r = table.get(g);
          Assert.assertTrue(r.isStale());
          return !r.isEmpty();
        } finally {
          SlowMeCopro.cdl.get().countDown();
          SlowMeCopro.sleepTime.set(0);
        }
      }
    });
    table.close();
    LOG.info("Stale get on the first cluster done. Now for the second.");

    final Table table2 = HTU.getConnection().getTable(hdt.getTableName());
    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
      @Override public boolean evaluate() throws Exception {
        try {
          SlowMeCopro.cdl.set(new CountDownLatch(1));
          Get g = new Get(row);
          g.setConsistency(Consistency.TIMELINE);
          Result r = table2.get(g);
          Assert.assertTrue(r.isStale());
          return !r.isEmpty();
        } finally {
          SlowMeCopro.cdl.get().countDown();
          SlowMeCopro.sleepTime.set(0);
        }
      }
    });
    table2.close();

    HTU.getAdmin().disableTable(hdt.getTableName());
    HTU.deleteTable(hdt.getTableName());

    HTU2.getAdmin().disableTable(hdt.getTableName());
    HTU2.deleteTable(hdt.getTableName());

    // We shut down the HTU2 minicluster later, in afterClass(), because shutting down the
    // minicluster has the side effect of deleting all HConnections in the JVM.
  }

  @Test
  public void testBulkLoad() throws IOException {
    // Create table then get the single region for our new table.
    LOG.debug("Creating test table");
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(SlowMeCopro.class.getName());
    Table table = HTU.createTable(hdt, new byte[][]{f}, null);

    // create hfiles to load.
    LOG.debug("Creating test data");
    Path dir = HTU.getDataTestDirOnTestFS(name.getMethodName());
    final int numRows = 10;
    final byte[] qual = Bytes.toBytes("qual");
    final byte[] val = Bytes.toBytes("val");
    final List<Pair<byte[], String>> famPaths = new ArrayList<>();
    for (HColumnDescriptor col : hdt.getColumnFamilies()) {
      Path hfile = new Path(dir, col.getNameAsString());
      TestHRegionServerBulkLoad.createHFile(HTU.getTestFileSystem(), hfile, col.getName(),
          qual, val, numRows);
      famPaths.add(new Pair<>(col.getName(), hfile.toString()));
    }

    // bulk load HFiles
    LOG.debug("Loading test data");
    final ClusterConnection conn = (ClusterConnection) HTU.getAdmin().getConnection();
    table = conn.getTable(hdt.getTableName());
    final String bulkToken =
        new SecureBulkLoadClient(HTU.getConfiguration(), table).prepareBulkLoad(conn);
    ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn,
        hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0),
        new RpcControllerFactory(HTU.getConfiguration()).newController(),
        HConstants.PRIORITY_UNSET) {
      @Override
      protected Void rpcCall() throws Exception {
        LOG.debug("Going to connect to server " + getLocation() + " for row "
            + Bytes.toStringBinary(getRow()));
        SecureBulkLoadClient secureClient = null;
        byte[] regionName = getLocation().getRegionInfo().getRegionName();
        try (Table table = conn.getTable(getTableName())) {
          secureClient = new SecureBulkLoadClient(HTU.getConfiguration(), table);
          secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
              true, null, bulkToken);
        }
        return null;
      }
    };
    RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(HTU.getConfiguration());
    RpcRetryingCaller<Void> caller = factory.newCaller();
    caller.callWithRetries(callable, 10000);

    // verify we can read them from the primary
    LOG.debug("Verifying data load");
    for (int i = 0; i < numRows; i++) {
      byte[] row = TestHRegionServerBulkLoad.rowkey(i);
      Get g = new Get(row);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());
    }

    // verify we can read them from the replica
    LOG.debug("Verifying replica queries");
    try {
      SlowMeCopro.cdl.set(new CountDownLatch(1));
      for (int i = 0; i < numRows; i++) {
        byte[] row = TestHRegionServerBulkLoad.rowkey(i);
        Get g = new Get(row);
        g.setConsistency(Consistency.TIMELINE);
        Result r = table.get(g);
        Assert.assertTrue(r.isStale());
      }
      SlowMeCopro.cdl.get().countDown();
    } finally {
      SlowMeCopro.cdl.get().countDown();
      SlowMeCopro.sleepTime.set(0);
    }

    HTU.getAdmin().disableTable(hdt.getTableName());
    HTU.deleteTable(hdt.getTableName());
  }

  @Test
  public void testReplicaGetWithPrimaryDown() throws IOException {
    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());
    try {
      Table table = HTU.createTable(hdt, new byte[][] { f }, null);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked up by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }

      // But if we ask for stale we will get it
      Get g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      Result r = table.get(g);
      Assert.assertTrue(r.isStale());
    } finally {
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }

  @Test
  public void testReplicaScanWithPrimaryDown() throws IOException {
    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());

    try {
      Table table = HTU.createTable(hdt, new byte[][] { f }, null);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked up by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }

      // But if we ask for stale we will get it
      // Instantiating the Scan class
      Scan scan = new Scan();

      // Scanning the required columns
      scan.addFamily(f);
      scan.setConsistency(Consistency.TIMELINE);

      // Getting the scan result
      ResultScanner scanner = table.getScanner(scan);

      Result r = scanner.next();

      Assert.assertTrue(r.isStale());
    } finally {
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }

  @Test
  public void testReplicaGetWithAsyncRpcClientImpl() throws IOException {
    HTU.getConfiguration().setBoolean("hbase.ipc.client.specificThreadForWriting", true);
    HTU.getConfiguration().set(
        "hbase.rpc.client.impl", "org.apache.hadoop.hbase.ipc.AsyncRpcClient");
    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(SlowMeCopro.class.getName());

    try {
      Table table = HTU.createTable(hdt, new byte[][] { f }, null);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked up by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }

      try {
        // Create the new connection so the new config can kick in
        Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Table t = connection.getTable(hdt.getTableName());

        // But if we ask for stale we will get it
        SlowMeCopro.cdl.set(new CountDownLatch(1));
        Get g = new Get(row);
        g.setConsistency(Consistency.TIMELINE);
        Result r = t.get(g);
        Assert.assertTrue(r.isStale());
        SlowMeCopro.cdl.get().countDown();
      } finally {
        SlowMeCopro.cdl.get().countDown();
        SlowMeCopro.sleepTime.set(0);
      }
    } finally {
      HTU.getConfiguration().unset("hbase.ipc.client.specificThreadForWriting");
      HTU.getConfiguration().unset("hbase.rpc.client.impl");
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }

  // This test verifies that when hbase.client.metaReplicaCallTimeout.scan is configured, a meta
  // table scan always gets its result from the primary meta region, as long as the primary meta
  // region returns within the configured timeout.
  @Test
  public void testGetRegionLocationFromPrimaryMetaRegion()
      throws IOException, InterruptedException {
    HTU.getAdmin().setBalancerRunning(false, true);
    Configuration conf = new Configuration(HTU.getConfiguration());
    conf.setBoolean(USE_META_REPLICAS, true);
    Connection conn = ConnectionFactory.createConnection(conf);

    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(2);
    try {
      HTU.createTable(hdt, new byte[][] { f }, null);

      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = true;

      // Get the user table location; it should always come from the primary meta replica
      RegionLocations url = ((ClusterConnection) conn)
          .locateRegion(hdt.getTableName(), row, false, false);

    } finally {
      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = false;
      HTU.getAdmin().setBalancerRunning(true, true);
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }

  // This test simulates the case where the primary meta region and the primary user region are
  // down; the HBase client should still be able to access the user replica regions and return
  // stale data. Meta replicas are enabled to show that a meta replica region can be out of sync
  // with the primary meta region.
  @Test
  public void testReplicaGetWithPrimaryAndMetaDown() throws IOException, InterruptedException {
    HTU.getAdmin().setBalancerRunning(false, true);

    Configuration conf = new Configuration(HTU.getConfiguration());
    conf.setBoolean(USE_META_REPLICAS, true);
    Connection conn = ConnectionFactory.createConnection(conf);

    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(2);
    try {
      HTU.createTable(hdt, new byte[][] { f }, null);
      Table table = conn.getTable(TableName.valueOf(name.getMethodName()));

      // Get meta location
      RegionLocations mrl = ((ClusterConnection) conn)
          .locateRegion(TableName.META_TABLE_NAME,
              HConstants.EMPTY_START_ROW, false, false);

      // Get user table location
      RegionLocations url = ((ClusterConnection) conn)
          .locateRegion(hdt.getTableName(), row, false, false);

      // Make sure that the user primary region is co-hosted with the meta region
      if (!url.getDefaultRegionLocation().getServerName().equals(
          mrl.getDefaultRegionLocation().getServerName())) {
        HTU.moveRegionAndWait(url.getDefaultRegionLocation().getRegionInfo(),
            mrl.getDefaultRegionLocation().getServerName());
      }

      // Make sure that the user replica region is not hosted by the same region server as
      // the primary
      if (url.getRegionLocation(1).getServerName().equals(mrl.getDefaultRegionLocation()
          .getServerName())) {
        HTU.moveRegionAndWait(url.getRegionLocation(1).getRegionInfo(),
            url.getDefaultRegionLocation().getServerName());
      }

      // Wait until the meta table is updated with new location info
      while (true) {
        mrl = ((ClusterConnection) conn)
            .locateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW, false, false);

        // Get user table location
        url = ((ClusterConnection) conn)
            .locateRegion(hdt.getTableName(), row, false, true);

        LOG.info("meta locations " + mrl);
        LOG.info("table locations " + url);
        ServerName a = url.getDefaultRegionLocation().getServerName();
        ServerName b = mrl.getDefaultRegionLocation().getServerName();
        if (a.equals(b)) {
          break;
        } else {
          LOG.info("Waiting for new region info to be updated in meta table");
          Thread.sleep(100);
        }
      }

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked up by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }

      // Simulate the RS being down
      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = true;

      // The first Get is supposed to succeed
      Get g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      Result r = table.get(g);
      Assert.assertTrue(r.isStale());

      // The second Get will succeed as well
      r = table.get(g);
      Assert.assertTrue(r.isStale());
    } finally {
      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = false;
      HTU.getAdmin().setBalancerRunning(true, true);
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }
}