/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ LargeTests.class, ClientTests.class })
public class TestReplicaWithCluster {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicaWithCluster.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicaWithCluster.class);

  private static final int NB_SERVERS = 3;
  private static final byte[] row = TestReplicaWithCluster.class.getName().getBytes();
  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();

  // second minicluster used in testing of replication
  private static HBaseTestingUtility HTU2;
  private static final byte[] f = HConstants.CATALOG_FAMILY;

  private final static int REFRESH_PERIOD = 1000;
  private final static int META_SCAN_TIMEOUT_IN_MILLISEC = 200;

  @Rule
  public TestName name = new TestName();

  /**
   * This copro is used to synchronize the tests.
   */
  public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
    static final AtomicLong sleepTime = new AtomicLong(0);
    static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(new CountDownLatch(0));

    public SlowMeCopro() {
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get,
      final List<Cell> results) throws IOException {

      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
        CountDownLatch latch = cdl.get();
        try {
          if (sleepTime.get() > 0) {
            LOG.info("Sleeping for " + sleepTime.get() + " ms");
            Thread.sleep(sleepTime.get());
          } else if (latch.getCount() > 0) {
            LOG.info("Waiting for the CountDownLatch");
            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
            if (latch.getCount() > 0) {
              throw new RuntimeException("Can't wait more");
            }
          }
        } catch (InterruptedException e1) {
          LOG.error(e1.toString(), e1);
        }
      } else {
        LOG.info("We're not the primary replica.");
      }
    }
  }

  /**
   * This copro is used to simulate a region server down exception for Get and Scan.
   */
  @CoreCoprocessor
  public static class RegionServerStoppedCopro implements RegionCoprocessor, RegionObserver {

    public RegionServerStoppedCopro() {
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get,
      final List<Cell> results) throws IOException {

      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();

      // Fail for the primary replica and replica 1
      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) {
        LOG.info("Throw Region Server Stopped Exception for replica id " + replicaId);
        throw new RegionServerStoppedException(
          "Server " + e.getEnvironment().getServerName() + " not running");
      } else {
        LOG.info("We're replica region " + replicaId);
      }
    }

    @Override
    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
      final Scan scan) throws IOException {
      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
      // Fail for the primary replica and replica 1
      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) {
        LOG.info("Throw Region Server Stopped Exception for replica id " + replicaId);
        throw new RegionServerStoppedException(
          "Server " + e.getEnvironment().getServerName() + " not running");
      } else {
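        // Replicas with id >= 2 are allowed to serve the request, so the client can fall
        // back to them.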
        LOG.info("We're replica region " + replicaId);
      }
    }
  }

  /**
   * This copro is used to slow down the primary meta region scan a bit.
   */
  public static class RegionServerHostingPrimayMetaRegionSlowOrStopCopro
    implements RegionCoprocessor, RegionObserver {
    static boolean slowDownPrimaryMetaScan = false;
    static boolean throwException = false;

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get,
      final List<Cell> results) throws IOException {

      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();

      // Fail for the primary replica, but not for meta
      if (throwException) {
        if (!e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
          LOG.info("Get, throw Region Server Stopped Exception for region "
            + e.getEnvironment().getRegion().getRegionInfo());
          throw new RegionServerStoppedException(
            "Server " + e.getEnvironment().getServerName() + " not running");
        }
      } else {
        LOG.info("Get, We're replica region " + replicaId);
      }
    }

    @Override
    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
      final Scan scan) throws IOException {

      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();

      // Slow down the primary meta region scan
      if (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
        if (slowDownPrimaryMetaScan) {
          LOG.info("Scan with primary meta region, slow down a bit");
          try {
            Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50);
          } catch (InterruptedException ie) {
            // Ignore
          }
        }

        // Fail for the primary replica
        if (throwException) {
          LOG.info("Scan, throw Region Server Stopped Exception for replica "
            + e.getEnvironment().getRegion().getRegionInfo());

          throw new RegionServerStoppedException(
            "Server " + e.getEnvironment().getServerName() + " not running");
        } else {
          LOG.info("Scan, We're replica region " + replicaId);
        }
      } else {
        LOG.info("Scan, We're replica region " + replicaId);
      }
    }
  }

  @BeforeClass
  public static void beforeClass() throws Exception {
    // enable store file refreshing
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
      REFRESH_PERIOD);

    HTU.getConfiguration().setFloat("hbase.regionserver.logroll.multiplier", 0.0001f);
    HTU.getConfiguration().setInt("replication.source.size.capacity", 10240);
    HTU.getConfiguration().setLong("replication.source.sleepforretries", 100);
    HTU.getConfiguration().setInt("hbase.regionserver.maxlogs", 2);
    HTU.getConfiguration().setLong("hbase.master.logcleaner.ttl", 10);
    HTU.getConfiguration().setInt("zookeeper.recovery.retry", 1);
    HTU.getConfiguration().setInt("zookeeper.recovery.retry.intervalmill", 10);

    // Wait longer for the primary call so we make sure the client gets the exception
    // from the primary call
    HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.get", 1000000);
    HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.scan", 1000000);

    // Make sure master does not host system tables.
    HTU.getConfiguration().set("hbase.balancer.tablesOnMaster", "none");

    // Set system coprocessor so it can be applied to meta regions
    HTU.getConfiguration().set("hbase.coprocessor.region.classes",
      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.class.getName());

    HTU.getConfiguration().setInt(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
      META_SCAN_TIMEOUT_IN_MILLISEC * 1000);

    HTU.startMiniCluster(NB_SERVERS);
    // Enable meta replica at server side
    HBaseTestingUtility.setReplicas(HTU.getAdmin(), TableName.META_TABLE_NAME, 2);

    HTU.getHBaseCluster().startMaster();
  }

  @AfterClass
  public static void afterClass() throws Exception {
    if (HTU2 != null) HTU2.shutdownMiniCluster();
    HTU.shutdownMiniCluster();
  }

  @Test
  public void testCreateDeleteTable() throws IOException {
    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(SlowMeCopro.class.getName());
    Table table = HTU.createTable(hdt, new byte[][] { f }, null);

    Put p = new Put(row);
    p.addColumn(f, row, row);
    table.put(p);

    Get g = new Get(row);
    Result r = table.get(g);
    Assert.assertFalse(r.isStale());

    try {
      // But if we ask for stale we will get it
      SlowMeCopro.cdl.set(new CountDownLatch(1));
      g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
      SlowMeCopro.cdl.get().countDown();
    } finally {
      SlowMeCopro.cdl.get().countDown();
      SlowMeCopro.sleepTime.set(0);
    }

    HTU.getAdmin().disableTable(hdt.getTableName());
    HTU.deleteTable(hdt.getTableName());
  }

  @Test
  public void testChangeTable() throws Exception {
    TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
      .setRegionReplication(NB_SERVERS).setCoprocessor(SlowMeCopro.class.getName())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f)).build();
    HTU.getAdmin().createTable(td);
    Table table = HTU.getConnection().getTable(td.getTableName());
    // basic test: it should work.
    Put p = new Put(row);
    p.addColumn(f, row, row);
    table.put(p);

    Get g = new Get(row);
    Result r = table.get(g);
    Assert.assertFalse(r.isStale());

    // Add a CF, it should work.
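    // bHdt captures the descriptor before the change so the family counts can be compared below.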
    TableDescriptor bHdt = HTU.getAdmin().getDescriptor(td.getTableName());
    td = TableDescriptorBuilder.newBuilder(td)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(row)).build();
    HTU.getAdmin().disableTable(td.getTableName());
    HTU.getAdmin().modifyTable(td);
    HTU.getAdmin().enableTable(td.getTableName());
    TableDescriptor nHdt = HTU.getAdmin().getDescriptor(td.getTableName());
    Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
      bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount());

    p = new Put(row);
    p.addColumn(row, row, row);
    table.put(p);

    g = new Get(row);
    r = table.get(g);
    Assert.assertFalse(r.isStale());

    try {
      SlowMeCopro.cdl.set(new CountDownLatch(1));
      g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
    } finally {
      SlowMeCopro.cdl.get().countDown();
      SlowMeCopro.sleepTime.set(0);
    }

    Admin admin = HTU.getAdmin();
    nHdt = admin.getDescriptor(td.getTableName());
    Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
      bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount());

    admin.disableTable(td.getTableName());
    admin.deleteTable(td.getTableName());
    admin.close();
  }

  @SuppressWarnings("deprecation")
  @Test
  public void testReplicaAndReplication() throws Exception {
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);

    HColumnDescriptor fam = new HColumnDescriptor(row);
    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    hdt.addFamily(fam);

    hdt.addCoprocessor(SlowMeCopro.class.getName());
    HTU.getAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);

    Configuration conf2 = HBaseConfiguration.create(HTU.getConfiguration());
    conf2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
    MiniZooKeeperCluster miniZK = HTU.getZkCluster();

    HTU2 = new HBaseTestingUtility(conf2);
    HTU2.setZkCluster(miniZK);
    HTU2.startMiniCluster(NB_SERVERS);
    LOG.info("Setup second Zk");
    HTU2.getAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);

    ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());

    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(HTU2.getClusterKey());
    admin.addPeer("2", rpc, null);
    admin.close();

    Put p = new Put(row);
    p.addColumn(row, row, row);
    final Table table = HTU.getConnection().getTable(hdt.getTableName());
    table.put(p);

    HTU.getAdmin().flush(table.getName());
    LOG.info("Put & flush done on the first cluster. Now doing a get on the same cluster.");

    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        try {
          SlowMeCopro.cdl.set(new CountDownLatch(1));
          Get g = new Get(row);
          g.setConsistency(Consistency.TIMELINE);
          Result r = table.get(g);
          Assert.assertTrue(r.isStale());
          return !r.isEmpty();
        } finally {
          SlowMeCopro.cdl.get().countDown();
          SlowMeCopro.sleepTime.set(0);
        }
      }
    });
    table.close();
    LOG.info("stale get on the first cluster done. Now for the second.");
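
    // Repeat the same check: wait until a TIMELINE get returns the (stale) row.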
    final Table table2 = HTU.getConnection().getTable(hdt.getTableName());
    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        try {
          SlowMeCopro.cdl.set(new CountDownLatch(1));
          Get g = new Get(row);
          g.setConsistency(Consistency.TIMELINE);
          Result r = table2.get(g);
          Assert.assertTrue(r.isStale());
          return !r.isEmpty();
        } finally {
          SlowMeCopro.cdl.get().countDown();
          SlowMeCopro.sleepTime.set(0);
        }
      }
    });
    table2.close();

    HTU.getAdmin().disableTable(hdt.getTableName());
    HTU.deleteTable(hdt.getTableName());

    HTU2.getAdmin().disableTable(hdt.getTableName());
    HTU2.deleteTable(hdt.getTableName());

    // We shut down the HTU2 minicluster later, in afterClass(), as shutting down
    // the minicluster has the negative side effect of deleting all HConnections in the JVM.
  }

  @Test
  public void testBulkLoad() throws IOException {
    // Create table then get the single region for our new table.
    LOG.debug("Creating test table");
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(SlowMeCopro.class.getName());
    Table table = HTU.createTable(hdt, new byte[][] { f }, null);

    // create hfiles to load.
    LOG.debug("Creating test data");
    Path dir = HTU.getDataTestDirOnTestFS(name.getMethodName());
    final int numRows = 10;
    final byte[] qual = Bytes.toBytes("qual");
    final byte[] val = Bytes.toBytes("val");
    final List<Pair<byte[], String>> famPaths = new ArrayList<>();
    for (HColumnDescriptor col : hdt.getColumnFamilies()) {
      Path hfile = new Path(dir, col.getNameAsString());
      TestHRegionServerBulkLoad.createHFile(HTU.getTestFileSystem(), hfile, col.getName(), qual,
        val, numRows);
      famPaths.add(new Pair<>(col.getName(), hfile.toString()));
    }

    // bulk load HFiles
    LOG.debug("Loading test data");
    final ClusterConnection conn = (ClusterConnection) HTU.getAdmin().getConnection();
    table = conn.getTable(hdt.getTableName());
    final String bulkToken =
      new SecureBulkLoadClient(HTU.getConfiguration(), table).prepareBulkLoad(conn);
    ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn, hdt.getTableName(),
      TestHRegionServerBulkLoad.rowkey(0),
      new RpcControllerFactory(HTU.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
      @Override
      protected Void rpcCall() throws Exception {
        LOG.debug("Going to connect to server " + getLocation() + " for row "
          + Bytes.toStringBinary(getRow()));
        SecureBulkLoadClient secureClient = null;
        byte[] regionName = getLocation().getRegionInfo().getRegionName();
        try (Table table = conn.getTable(getTableName())) {
          secureClient = new SecureBulkLoadClient(HTU.getConfiguration(), table);
          secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, true, null, bulkToken);
        }
        return null;
      }
    };
    RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(HTU.getConfiguration());
    RpcRetryingCaller<Void> caller = factory.newCaller();
    caller.callWithRetries(callable, 10000);

    // verify we can read them from the primary
    LOG.debug("Verifying data load");
    for (int i = 0; i < numRows; i++) {
      byte[] row = TestHRegionServerBulkLoad.rowkey(i);
      Get g = new Get(row);
      Result r = table.get(g);
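      // A default-consistency get is served by the primary, so it must not be flagged stale.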
      Assert.assertFalse(r.isStale());
    }

    // verify we can read them from the replica
    LOG.debug("Verifying replica queries");
    try {
      SlowMeCopro.cdl.set(new CountDownLatch(1));
      for (int i = 0; i < numRows; i++) {
        byte[] row = TestHRegionServerBulkLoad.rowkey(i);
        Get g = new Get(row);
        g.setConsistency(Consistency.TIMELINE);
        Result r = table.get(g);
        Assert.assertTrue(r.isStale());
      }
      SlowMeCopro.cdl.get().countDown();
    } finally {
      SlowMeCopro.cdl.get().countDown();
      SlowMeCopro.sleepTime.set(0);
    }

    HTU.getAdmin().disableTable(hdt.getTableName());
    HTU.deleteTable(hdt.getTableName());
  }

  @Test
  public void testReplicaGetWithPrimaryDown() throws IOException {
    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());
    try {
      Table table = HTU.createTable(hdt, new byte[][] { f }, null);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked up by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }

      // But if we ask for stale we will get it
      Get g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      Result r = table.get(g);
      Assert.assertTrue(r.isStale());
    } finally {
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }

  @Test
  public void testReplicaScanWithPrimaryDown() throws IOException {
    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());

    try {
      Table table = HTU.createTable(hdt, new byte[][] { f }, null);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked up by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }

      // But if we ask for stale we will get it
      // Instantiating the Scan class
      Scan scan = new Scan();

      // Scanning the required columns
      scan.addFamily(f);
      scan.setConsistency(Consistency.TIMELINE);

      // Getting the scan result
      ResultScanner scanner = table.getScanner(scan);

      Result r = scanner.next();

      Assert.assertTrue(r.isStale());
    } finally {
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }

  @Test
  public void testReplicaGetWithAsyncRpcClientImpl() throws IOException {
    HTU.getConfiguration().setBoolean("hbase.ipc.client.specificThreadForWriting", true);
    HTU.getConfiguration().set("hbase.rpc.client.impl",
      "org.apache.hadoop.hbase.ipc.AsyncRpcClient");
    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(SlowMeCopro.class.getName());

    try {
      Table table = HTU.createTable(hdt, new byte[][] { f }, null);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked up by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }

      try {
        // Create a new connection so the new config can kick in
        Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Table t = connection.getTable(hdt.getTableName());

        // But if we ask for stale we will get it
        SlowMeCopro.cdl.set(new CountDownLatch(1));
        Get g = new Get(row);
        g.setConsistency(Consistency.TIMELINE);
        Result r = t.get(g);
        Assert.assertTrue(r.isStale());
        SlowMeCopro.cdl.get().countDown();
      } finally {
        SlowMeCopro.cdl.get().countDown();
        SlowMeCopro.sleepTime.set(0);
      }
    } finally {
      HTU.getConfiguration().unset("hbase.ipc.client.specificThreadForWriting");
      HTU.getConfiguration().unset("hbase.rpc.client.impl");
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }

  // This test verifies that when hbase.client.metaReplicaCallTimeout.scan is configured, a meta
  // table scan always gets its result from the primary meta region, as long as the primary
  // returns it within the configured timeout.
  @Test
  public void testGetRegionLocationFromPrimaryMetaRegion()
    throws IOException, InterruptedException {
    HTU.getAdmin().setBalancerRunning(false, true);
    Configuration conf = new Configuration(HTU.getConfiguration());
    conf.setBoolean(USE_META_REPLICAS, true);
    Connection conn = ConnectionFactory.createConnection(conf);

    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(2);
    try {

      HTU.createTable(hdt, new byte[][] { f }, null);

      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = true;

      // Get the user table location; it should always come from the primary meta replica
      RegionLocations url =
        ((ClusterConnection) conn).locateRegion(hdt.getTableName(), row, false, false);

    } finally {
      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = false;
      HTU.getAdmin().setBalancerRunning(true, true);
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }

  // This test simulates the case where the primary meta region and the primary user region are
  // down; the HBase client should still be able to access the user replica regions and return
  // stale data. Meta replicas are enabled to cover the case where a meta replica region is out
  // of sync with the primary meta region.
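  // The coprocessor makes the primary meta scan and the primary user-region get throw
  // RegionServerStoppedException, forcing the client to fall back to the replicas.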
  @Test
  public void testReplicaGetWithPrimaryAndMetaDown() throws IOException, InterruptedException {
    HTU.getAdmin().setBalancerRunning(false, true);

    Configuration conf = new Configuration(HTU.getConfiguration());
    conf.setBoolean(USE_META_REPLICAS, true);
    Connection conn = ConnectionFactory.createConnection(conf);

    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(2);
    try {

      HTU.createTable(hdt, new byte[][] { f }, null);
      Table table = conn.getTable(TableName.valueOf(name.getMethodName()));

      // Get the meta location
      RegionLocations mrl = ((ClusterConnection) conn).locateRegion(TableName.META_TABLE_NAME,
        HConstants.EMPTY_START_ROW, false, false);

      // Get the user table location
      RegionLocations url =
        ((ClusterConnection) conn).locateRegion(hdt.getTableName(), row, false, false);

      // Make sure that the user primary region is co-hosted with the meta region
      if (
        !url.getDefaultRegionLocation().getServerName()
          .equals(mrl.getDefaultRegionLocation().getServerName())
      ) {
        HTU.moveRegionAndWait(url.getDefaultRegionLocation().getRegionInfo(),
          mrl.getDefaultRegionLocation().getServerName());
      }

      // Make sure that the user replica region is not hosted by the same region server as
      // the primary
      if (
        url.getRegionLocation(1).getServerName()
          .equals(mrl.getDefaultRegionLocation().getServerName())
      ) {
        HTU.moveRegionAndWait(url.getRegionLocation(1).getRegionInfo(),
          url.getDefaultRegionLocation().getServerName());
      }

      // Wait until the meta table is updated with the new location info
      while (true) {
        mrl = ((ClusterConnection) conn).locateRegion(TableName.META_TABLE_NAME,
          HConstants.EMPTY_START_ROW, false, false);

        // Get the user table location
        url = ((ClusterConnection) conn).locateRegion(hdt.getTableName(), row, false, true);

        LOG.info("meta locations " + mrl);
        LOG.info("table locations " + url);
        ServerName a = url.getDefaultRegionLocation().getServerName();
        ServerName b = mrl.getDefaultRegionLocation().getServerName();
        if (a.equals(b)) {
          break;
        } else {
          LOG.info("Waiting for new region info to be updated in meta table");
          Thread.sleep(100);
        }
      }

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked up by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }

      // Simulate the RS being down
      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = true;

      // The first Get is supposed to succeed
      Get g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      Result r = table.get(g);
      Assert.assertTrue(r.isStale());

      // The second Get will succeed as well
      r = table.get(g);
      Assert.assertTrue(r.isStale());
    } finally {
      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = false;
      HTU.getAdmin().setBalancerRunning(true, true);
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }
}