/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests timeline-consistent ({@link Consistency#TIMELINE}) reads against region replicas on a
 * mini cluster. Coprocessors installed on the primary replica stall or fail primary reads so
 * that the client is forced to fall back to secondary replicas, which must then return stale
 * results ({@code Result#isStale()}). Also exercises replica behavior across schema changes,
 * bulk load, cross-cluster replication, and a simulated primary region server outage.
 */
@Tag(LargeTests.TAG)
@Tag(ClientTests.TAG)
public class TestReplicaWithCluster {

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicaWithCluster.class);

  private static final int NB_SERVERS = 3;
  private static final byte[] row = Bytes.toBytes(TestReplicaWithCluster.class.getName());
  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();

  // second minicluster used in testing of replication
  private static HBaseTestingUtil HTU2;
  private static final byte[] f = HConstants.CATALOG_FAMILY;

  // Store file refresh period (ms) for secondary replicas; tests sleep 2x this to let
  // flushed data become visible on the replicas.
  private final static int REFRESH_PERIOD = 1000;
  private final static int META_SCAN_TIMEOUT_IN_MILLISEC = 200;

  /**
   * This copro is used to synchronize the tests. On the primary replica (replica id 0) it
   * either sleeps for {@link #sleepTime} ms or blocks on the {@link #cdl} latch, delaying the
   * primary Get so the client falls back to a secondary replica. Non-primary replicas are
   * unaffected.
   */
  public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
    static final AtomicLong sleepTime = new AtomicLong(0);
    static final AtomicReference<CountDownLatch> cdl =
      new AtomicReference<>(new CountDownLatch(0));

    public SlowMeCopro() {
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
      final Get get, final List<Cell> results) throws IOException {

      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
        CountDownLatch latch = cdl.get();
        try {
          if (sleepTime.get() > 0) {
            LOG.info("Sleeping for " + sleepTime.get() + " ms");
            Thread.sleep(sleepTime.get());
          } else if (latch.getCount() > 0) {
            LOG.info("Waiting for the counterCountDownLatch");
            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
            if (latch.getCount() > 0) {
              throw new RuntimeException("Can't wait more");
            }
          }
        } catch (InterruptedException e1) {
          LOG.error(e1.toString(), e1);
          // Restore the interrupt status so callers up the stack can see it.
          Thread.currentThread().interrupt();
        }
      } else {
        LOG.info("We're not the primary replicas.");
      }
    }
  }

  /**
   * This copro is used to simulate region server down exception for Get and Scan. Reads
   * against the primary replica and replica 1 fail with {@link RegionServerStoppedException};
   * only replica 2 (and higher) can serve, so any successful read must be stale.
   */
  @CoreCoprocessor
  public static class RegionServerStoppedCopro implements RegionCoprocessor, RegionObserver {

    public RegionServerStoppedCopro() {
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
      final Get get, final List<Cell> results) throws IOException {

      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();

      // Fail for the primary replica and replica 1
      if (replicaId <= 1) {
        LOG.info("Throw Region Server Stopped Exception for replica id " + replicaId);
        throw new RegionServerStoppedException(
          "Server " + e.getEnvironment().getServerName() + " not running");
      } else {
        LOG.info("We're replica region " + replicaId);
      }
    }

    @Override
    public void preScannerOpen(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
      final Scan scan) throws IOException {
      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
      // Fail for the primary replica and replica 1
      if (replicaId <= 1) {
        LOG.info("Throw Region Server Stopped Exception for replica id " + replicaId);
        throw new RegionServerStoppedException(
          "Server " + e.getEnvironment().getServerName() + " not running");
      } else {
        LOG.info("We're replica region " + replicaId);
      }
    }
  }

  /**
   * This copro is used to slow down the primary meta region scan a bit, or to fail primary
   * (non-meta) reads entirely, depending on the static flags. Installed as a system
   * coprocessor so it also applies to meta regions. (Note: the class name intentionally keeps
   * its historical spelling since it is referenced by name in the configuration.)
   */
  public static class RegionServerHostingPrimayMetaRegionSlowOrStopCopro
    implements RegionCoprocessor, RegionObserver {
    // When true, primary meta scans sleep just under META_SCAN_TIMEOUT_IN_MILLISEC.
    static boolean slowDownPrimaryMetaScan = false;
    // When true, primary reads (Get on user regions, Scan on meta) fail.
    static boolean throwException = false;

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
      final Get get, final List<Cell> results) throws IOException {

      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();

      // Fail for the primary replica, but not for meta
      if (throwException) {
        if (!e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
          LOG.info("Get, throw Region Server Stopped Exception for region "
            + e.getEnvironment().getRegion().getRegionInfo());
          throw new RegionServerStoppedException(
            "Server " + e.getEnvironment().getServerName() + " not running");
        }
      } else {
        LOG.info("Get, We're replica region " + replicaId);
      }
    }

    @Override
    public void preScannerOpen(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
      final Scan scan) throws IOException {

      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();

      // Slow down with the primary meta region scan
      if (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
        if (slowDownPrimaryMetaScan) {
          LOG.info("Scan with primary meta region, slow down a bit");
          try {
            Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50);
          } catch (InterruptedException ie) {
            // Ignore, but restore interrupt status.
            Thread.currentThread().interrupt();
          }
        }

        // Fail for the primary replica
        if (throwException) {
          LOG.info("Scan, throw Region Server Stopped Exception for replica "
            + e.getEnvironment().getRegion().getRegionInfo());

          throw new RegionServerStoppedException(
            "Server " + e.getEnvironment().getServerName() + " not running");
        } else {
          LOG.info("Scan, We're replica region " + replicaId);
        }
      } else {
        LOG.info("Scan, We're replica region " + replicaId);
      }
    }
  }

  @BeforeAll
  public static void beforeClass() throws Exception {
    // enable store file refreshing
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
      REFRESH_PERIOD);

    HTU.getConfiguration().setFloat("hbase.regionserver.logroll.multiplier", 0.0001f);
    HTU.getConfiguration().setInt("replication.source.size.capacity", 10240);
    HTU.getConfiguration().setLong("replication.source.sleepforretries", 100);
    HTU.getConfiguration().setInt("hbase.regionserver.maxlogs", 2);
    HTU.getConfiguration().setLong("hbase.master.logcleaner.ttl", 10);
    HTU.getConfiguration().setInt("zookeeper.recovery.retry", 1);
    HTU.getConfiguration().setInt("zookeeper.recovery.retry.intervalmill", 10);

    // Wait for primary call longer so make sure that it will get exception from the primary call
    HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.get", 1000000);
    HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.scan", 1000000);

    // Make sure master does not host system tables.
    HTU.getConfiguration().set("hbase.balancer.tablesOnMaster", "none");

    // Set system coprocessor so it can be applied to meta regions
    HTU.getConfiguration().set("hbase.coprocessor.region.classes",
      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.class.getName());

    // NOTE(review): the timeout config appears to be in a finer unit than milliseconds
    // (hence the * 1000) — confirm against HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT.
    HTU.getConfiguration().setInt(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
      META_SCAN_TIMEOUT_IN_MILLISEC * 1000);

    HTU.startMiniCluster(NB_SERVERS);
    // Enable meta replica at server side
    HBaseTestingUtil.setReplicas(HTU.getAdmin(), TableName.META_TABLE_NAME, 2);

    HTU.getHBaseCluster().startMaster();
  }

  @AfterAll
  public static void afterClass() throws Exception {
    if (HTU2 != null) {
      HTU2.shutdownMiniCluster();
    }
    HTU.shutdownMiniCluster();
  }

  @Test
  public void testCreateDeleteTable() throws IOException {
    // Create table then get the single region for our new table.
    TableDescriptorBuilder builder =
      HTU.createModifyableTableDescriptor(TableName.valueOf("testCreateDeleteTable"),
        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
    builder.setRegionReplication(NB_SERVERS);
    builder.setCoprocessor(SlowMeCopro.class.getName());
    TableDescriptor hdt = builder.build();
    Table table = HTU.createTable(hdt, new byte[][] { f }, null);

    Put p = new Put(row);
    p.addColumn(f, row, row);
    table.put(p);

    // A default (STRONG) read goes to the primary and must not be stale.
    Get g = new Get(row);
    Result r = table.get(g);
    assertFalse(r.isStale());

    try {
      // But if we ask for stale we will get it
      SlowMeCopro.cdl.set(new CountDownLatch(1));
      g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      assertTrue(r.isStale());
      SlowMeCopro.cdl.get().countDown();
    } finally {
      // Always release the primary and reset the copro state for the next test.
      SlowMeCopro.cdl.get().countDown();
      SlowMeCopro.sleepTime.set(0);
    }

    HTU.getAdmin().disableTable(hdt.getTableName());
    HTU.deleteTable(hdt.getTableName());
  }

  /**
   * Verifies replicas keep working across an online schema change (adding a column family).
   */
  @Test
  public void testChangeTable() throws Exception {
    TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("testChangeTable"))
      .setRegionReplication(NB_SERVERS).setCoprocessor(SlowMeCopro.class.getName())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f)).build();
    HTU.getAdmin().createTable(td);
    Table table = HTU.getConnection().getTable(td.getTableName());
    // basic test: it should work.
    Put p = new Put(row);
    p.addColumn(f, row, row);
    table.put(p);

    Get g = new Get(row);
    Result r = table.get(g);
    assertFalse(r.isStale());

    // Add a CF, it should work.
    TableDescriptor bHdt = HTU.getAdmin().getDescriptor(td.getTableName());
    td = TableDescriptorBuilder.newBuilder(td)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(row)).build();
    HTU.getAdmin().disableTable(td.getTableName());
    HTU.getAdmin().modifyTable(td);
    HTU.getAdmin().enableTable(td.getTableName());
    TableDescriptor nHdt = HTU.getAdmin().getDescriptor(td.getTableName());
    assertEquals(bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount(),
      "fams=" + Arrays.toString(nHdt.getColumnFamilies()));

    p = new Put(row);
    p.addColumn(row, row, row);
    table.put(p);

    g = new Get(row);
    r = table.get(g);
    assertFalse(r.isStale());

    try {
      // Stall the primary so a TIMELINE read must come from a replica.
      SlowMeCopro.cdl.set(new CountDownLatch(1));
      g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      assertTrue(r.isStale());
    } finally {
      SlowMeCopro.cdl.get().countDown();
      SlowMeCopro.sleepTime.set(0);
    }

    Admin admin = HTU.getAdmin();
    nHdt = admin.getDescriptor(td.getTableName());
    assertEquals(bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount(),
      "fams=" + Arrays.toString(nHdt.getColumnFamilies()));

    admin.disableTable(td.getTableName());
    admin.deleteTable(td.getTableName());
    admin.close();
  }

  /**
   * Verifies that region replicas and cross-cluster replication work together: data written to
   * the first cluster is readable (stale) from replicas on both the source and the peer
   * cluster.
   */
  @SuppressWarnings("deprecation")
  @Test
  public void testReplicaAndReplication() throws Exception {
    TableDescriptorBuilder builder =
      HTU.createModifyableTableDescriptor("testReplicaAndReplication");
    builder.setRegionReplication(NB_SERVERS);
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(row)
      .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build());

    builder.setCoprocessor(SlowMeCopro.class.getName());
    TableDescriptor tableDescriptor = builder.build();
    HTU.getAdmin().createTable(tableDescriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);

    // Second cluster shares the same ZK quorum but uses a different znode parent.
    Configuration conf2 = HBaseConfiguration.create(HTU.getConfiguration());
    conf2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
    MiniZooKeeperCluster miniZK = HTU.getZkCluster();

    HTU2 = new HBaseTestingUtil(conf2);
    HTU2.setZkCluster(miniZK);
    HTU2.startMiniCluster(NB_SERVERS);
    LOG.info("Setup second Zk");
    HTU2.getAdmin().createTable(tableDescriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);

    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
      Admin admin = connection.getAdmin()) {
      ReplicationPeerConfig rpc =
        ReplicationPeerConfig.newBuilder().setClusterKey(HTU2.getRpcConnnectionURI()).build();
      admin.addReplicationPeer("2", rpc);
    }

    Put p = new Put(row);
    p.addColumn(row, row, row);
    final Table table = HTU.getConnection().getTable(tableDescriptor.getTableName());
    table.put(p);

    HTU.getAdmin().flush(table.getName());
    LOG.info("Put & flush done on the first cluster. Now doing a get on the same cluster.");

    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        try {
          SlowMeCopro.cdl.set(new CountDownLatch(1));
          Get g = new Get(row);
          g.setConsistency(Consistency.TIMELINE);
          Result r = table.get(g);
          assertTrue(r.isStale());
          // Retry until the replica has actually picked up the flushed data.
          return !r.isEmpty();
        } finally {
          SlowMeCopro.cdl.get().countDown();
          SlowMeCopro.sleepTime.set(0);
        }
      }
    });
    table.close();
    LOG.info("stale get on the first cluster done. Now for the second.");

    final Table table2 = HTU.getConnection().getTable(tableDescriptor.getTableName());
    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        try {
          SlowMeCopro.cdl.set(new CountDownLatch(1));
          Get g = new Get(row);
          g.setConsistency(Consistency.TIMELINE);
          Result r = table2.get(g);
          assertTrue(r.isStale());
          return !r.isEmpty();
        } finally {
          SlowMeCopro.cdl.get().countDown();
          SlowMeCopro.sleepTime.set(0);
        }
      }
    });
    table2.close();

    HTU.getAdmin().disableTable(tableDescriptor.getTableName());
    HTU.deleteTable(tableDescriptor.getTableName());

    HTU2.getAdmin().disableTable(tableDescriptor.getTableName());
    HTU2.deleteTable(tableDescriptor.getTableName());

    // We shutdown HTU2 minicluster later, in afterClass(), as shutting down
    // the minicluster has negative impact of deleting all HConnections in JVM.
  }

  /**
   * Verifies bulk-loaded HFiles are readable from the primary and, once refreshed, from
   * replicas via TIMELINE reads.
   */
  @Test
  public void testBulkLoad() throws IOException {
    // Create table then get the single region for our new table.
    LOG.debug("Creating test table");
    TableDescriptorBuilder builder = HTU.createModifyableTableDescriptor(
      TableName.valueOf("testBulkLoad"), ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3,
      HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
    builder.setRegionReplication(NB_SERVERS);
    builder.setCoprocessor(SlowMeCopro.class.getName());
    TableDescriptor hdt = builder.build();
    Table table = HTU.createTable(hdt, new byte[][] { f }, null);

    // create hfiles to load.
    LOG.debug("Creating test data");
    Path dir = HTU.getDataTestDirOnTestFS("testBulkLoad");
    final int numRows = 10;
    final byte[] qual = Bytes.toBytes("qual");
    final byte[] val = Bytes.toBytes("val");
    Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (ColumnFamilyDescriptor col : hdt.getColumnFamilies()) {
      Path hfile = new Path(dir, col.getNameAsString());
      TestHRegionServerBulkLoad.createHFile(HTU.getTestFileSystem(), hfile, col.getName(), qual,
        val, numRows);
      family2Files.put(col.getName(), Collections.singletonList(hfile));
    }

    // bulk load HFiles
    LOG.debug("Loading test data");
    BulkLoadHFiles.create(HTU.getConfiguration()).bulkLoad(hdt.getTableName(), family2Files);

    // verify we can read them from the primary
    LOG.debug("Verifying data load");
    for (int i = 0; i < numRows; i++) {
      byte[] row = TestHRegionServerBulkLoad.rowkey(i);
      Get g = new Get(row);
      Result r = table.get(g);
      assertFalse(r.isStale());
    }

    // verify we can read them from the replica
    LOG.debug("Verifying replica queries");
    try {
      SlowMeCopro.cdl.set(new CountDownLatch(1));
      for (int i = 0; i < numRows; i++) {
        byte[] row = TestHRegionServerBulkLoad.rowkey(i);
        Get g = new Get(row);
        g.setConsistency(Consistency.TIMELINE);
        Result r = table.get(g);
        assertTrue(r.isStale());
      }
      SlowMeCopro.cdl.get().countDown();
    } finally {
      SlowMeCopro.cdl.get().countDown();
      SlowMeCopro.sleepTime.set(0);
    }

    HTU.getAdmin().disableTable(hdt.getTableName());
    HTU.deleteTable(hdt.getTableName());
  }

  /**
   * A TIMELINE Get must succeed from a surviving replica when the primary (and replica 1) throw
   * RegionServerStoppedException.
   */
  @Test
  public void testReplicaGetWithPrimaryDown() throws IOException {
    // Create table then get the single region for our new table.
    TableDescriptorBuilder builder =
      HTU.createModifyableTableDescriptor(TableName.valueOf("testReplicaGetWithPrimaryDown"),
        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
    builder.setRegionReplication(NB_SERVERS);
    builder.setCoprocessor(RegionServerStoppedCopro.class.getName());
    TableDescriptor hdt = builder.build();
    try {
      Table table = HTU.createTable(hdt, new byte[][] { f }, null);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
        Thread.currentThread().interrupt();
      }

      // But if we ask for stale we will get it
      Get g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      Result r = table.get(g);
      assertTrue(r.isStale());
    } finally {
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }

  /**
   * Same as {@link #testReplicaGetWithPrimaryDown()} but for Scan instead of Get.
   */
  @Test
  public void testReplicaScanWithPrimaryDown() throws IOException {
    // Create table then get the single region for our new table.
    TableDescriptorBuilder builder =
      HTU.createModifyableTableDescriptor(TableName.valueOf("testReplicaScanWithPrimaryDown"),
        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
    builder.setRegionReplication(NB_SERVERS);
    builder.setCoprocessor(RegionServerStoppedCopro.class.getName());
    TableDescriptor hdt = builder.build();
    try {
      Table table = HTU.createTable(hdt, new byte[][] { f }, null);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
        Thread.currentThread().interrupt();
      }

      // But if we ask for stale we will get it
      // Instantiating the Scan class
      Scan scan = new Scan();

      // Scanning the required columns
      scan.addFamily(f);
      scan.setConsistency(Consistency.TIMELINE);

      // Getting the scan result; close the scanner to release server-side resources.
      try (ResultScanner scanner = table.getScanner(scan)) {
        Result r = scanner.next();
        assertTrue(r.isStale());
      }
    } finally {
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }

  /**
   * Exercises a TIMELINE Get through a connection configured with the async RPC client
   * implementation. The client config is restored in the finally block.
   */
  @Test
  public void testReplicaGetWithAsyncRpcClientImpl() throws IOException {
    HTU.getConfiguration().setBoolean("hbase.ipc.client.specificThreadForWriting", true);
    HTU.getConfiguration().set("hbase.rpc.client.impl",
      "org.apache.hadoop.hbase.ipc.AsyncRpcClient");
    // Create table then get the single region for our new table.
    TableDescriptorBuilder builder = HTU.createModifyableTableDescriptor(
      TableName.valueOf("testReplicaGetWithAsyncRpcClientImpl"),
      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
    builder.setRegionReplication(NB_SERVERS);
    builder.setCoprocessor(SlowMeCopro.class.getName());
    TableDescriptor hdt = builder.build();
    try {
      Table table = HTU.createTable(hdt, new byte[][] { f }, null);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
        Thread.currentThread().interrupt();
      }

      // Create the new connection so new config can kick in; close it to avoid leaking it.
      try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Table t = connection.getTable(hdt.getTableName())) {

        // But if we ask for stale we will get it
        SlowMeCopro.cdl.set(new CountDownLatch(1));
        Get g = new Get(row);
        g.setConsistency(Consistency.TIMELINE);
        Result r = t.get(g);
        assertTrue(r.isStale());
        SlowMeCopro.cdl.get().countDown();
      } finally {
        SlowMeCopro.cdl.get().countDown();
        SlowMeCopro.sleepTime.set(0);
      }
    } finally {
      HTU.getConfiguration().unset("hbase.ipc.client.specificThreadForWriting");
      HTU.getConfiguration().unset("hbase.rpc.client.impl");
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }
}