001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import com.codahale.metrics.Counter; 025import java.io.IOException; 026import java.util.HashMap; 027import java.util.Iterator; 028import java.util.List; 029import java.util.Optional; 030import java.util.concurrent.CountDownLatch; 031import java.util.concurrent.TimeUnit; 032import java.util.concurrent.atomic.AtomicBoolean; 033import java.util.concurrent.atomic.AtomicInteger; 034import java.util.concurrent.atomic.AtomicLong; 035import java.util.concurrent.atomic.AtomicReference; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.hbase.Cell; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.HBaseTestingUtility; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.HTableDescriptor; 042import org.apache.hadoop.hbase.KeyValue; 043import org.apache.hadoop.hbase.NotServingRegionException; 044import org.apache.hadoop.hbase.RegionLocations; 045import org.apache.hadoop.hbase.StartMiniClusterOption; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.coprocessor.ObserverContext; 048import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 049import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 050import org.apache.hadoop.hbase.coprocessor.RegionObserver; 051import org.apache.hadoop.hbase.regionserver.HRegionServer; 052import org.apache.hadoop.hbase.regionserver.InternalScanner; 053import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore; 054import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster; 055import org.apache.hadoop.hbase.testclassification.ClientTests; 056import org.apache.hadoop.hbase.testclassification.LargeTests; 057import org.apache.hadoop.hbase.util.Bytes; 058import org.apache.zookeeper.KeeperException; 059import org.junit.After; 060import org.junit.AfterClass; 061import org.junit.Before; 062import org.junit.BeforeClass; 063import org.junit.ClassRule; 064import org.junit.Test; 065import org.junit.experimental.categories.Category; 066import org.slf4j.Logger; 067import org.slf4j.LoggerFactory; 068 069import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 070import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 071import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 072 073/** 074 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole 075 * cluster. See {@link org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster}. 076 */ 077@Category({LargeTests.class, ClientTests.class}) 078public class TestReplicasClient { 079 080 @ClassRule 081 public static final HBaseClassTestRule CLASS_RULE = 082 HBaseClassTestRule.forClass(TestReplicasClient.class); 083 084 private static final Logger LOG = LoggerFactory.getLogger(TestReplicasClient.class); 085 086 private static TableName TABLE_NAME; 087 private Table table = null; 088 private static final byte[] row = TestReplicasClient.class.getName().getBytes(); 089 090 private static RegionInfo hriPrimary; 091 private static RegionInfo hriSecondary; 092 093 private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); 094 private static final byte[] f = HConstants.CATALOG_FAMILY; 095 096 private final static int REFRESH_PERIOD = 1000; 097 098 /** 099 * This copro is used to synchronize the tests. 100 */ 101 public static class SlowMeCopro implements RegionCoprocessor, RegionObserver { 102 static final AtomicInteger primaryCountOfScan = new AtomicInteger(0); 103 static final AtomicInteger secondaryCountOfScan = new AtomicInteger(0); 104 static final AtomicLong sleepTime = new AtomicLong(0); 105 static final AtomicBoolean slowDownNext = new AtomicBoolean(false); 106 static final AtomicInteger countOfNext = new AtomicInteger(0); 107 private static final AtomicReference<CountDownLatch> primaryCdl = 108 new AtomicReference<>(new CountDownLatch(0)); 109 private static final AtomicReference<CountDownLatch> secondaryCdl = 110 new AtomicReference<>(new CountDownLatch(0)); 111 public SlowMeCopro() { 112 } 113 114 @Override 115 public Optional<RegionObserver> getRegionObserver() { 116 return Optional.of(this); 117 } 118 119 @Override 120 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get, 121 final List<Cell> results) throws IOException { 122 slowdownCode(e); 123 } 124 125 private void incrementScanCount(ObserverContext<RegionCoprocessorEnvironment> e) { 126 LOG.info("==========scan {} ", e.getEnvironment().getRegion().getRegionInfo().getReplicaId(), 127 new Exception()); 128 if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) { 129 primaryCountOfScan.incrementAndGet(); 130 } else { 131 secondaryCountOfScan.incrementAndGet(); 132 } 133 } 134 135 @Override 136 public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, 137 final Scan scan) throws IOException { 138 incrementScanCount(e); 139 slowdownCode(e); 140 } 141 142 @Override 143 public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e, 144 final InternalScanner s, final List<Result> results, final int limit, final boolean hasMore) 145 throws IOException { 146 incrementScanCount(e); 147 // this will slow down a certain next operation if the conditions are met. The slowness 148 // will allow the call to go to a replica 149 if (slowDownNext.get()) { 150 // have some "next" return successfully from the primary; hence countOfNext checked 151 if (countOfNext.incrementAndGet() == 2) { 152 sleepTime.set(2000); 153 slowdownCode(e); 154 } 155 } 156 return true; 157 } 158 159 private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) { 160 if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) { 161 LOG.info("We're the primary replicas."); 162 CountDownLatch latch = getPrimaryCdl().get(); 163 try { 164 if (sleepTime.get() > 0) { 165 LOG.info("Sleeping for " + sleepTime.get() + " ms"); 166 Thread.sleep(sleepTime.get()); 167 } else if (latch.getCount() > 0) { 168 LOG.info("Waiting for the counterCountDownLatch"); 169 latch.await(2, TimeUnit.MINUTES); // To help the tests to finish. 170 if (latch.getCount() > 0) { 171 throw new RuntimeException("Can't wait more"); 172 } 173 } 174 } catch (InterruptedException e1) { 175 LOG.error(e1.toString(), e1); 176 } 177 } else { 178 LOG.info("We're not the primary replicas."); 179 CountDownLatch latch = getSecondaryCdl().get(); 180 try { 181 if (latch.getCount() > 0) { 182 LOG.info("Waiting for the secondary counterCountDownLatch"); 183 latch.await(2, TimeUnit.MINUTES); // To help the tests to finish. 184 if (latch.getCount() > 0) { 185 throw new RuntimeException("Can't wait more"); 186 } 187 } 188 } catch (InterruptedException e1) { 189 LOG.error(e1.toString(), e1); 190 } 191 } 192 } 193 194 public static AtomicReference<CountDownLatch> getPrimaryCdl() { 195 return primaryCdl; 196 } 197 198 public static AtomicReference<CountDownLatch> getSecondaryCdl() { 199 return secondaryCdl; 200 } 201 } 202 203 @BeforeClass 204 public static void beforeClass() throws Exception { 205 // enable store file refreshing 206 HTU.getConfiguration().setInt( 207 StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD); 208 HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true); 209 HTU.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true); 210 ConnectionUtils.setupMasterlessConnection(HTU.getConfiguration()); 211 StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(1). 212 numAlwaysStandByMasters(1).numMasters(1).build(); 213 HTU.startMiniCluster(option); 214 215 // Create table then get the single region for our new table. 216 TABLE_NAME = TableName.valueOf(TestReplicasClient.class.getSimpleName()); 217 HTableDescriptor hdt = HTU.createTableDescriptor(TABLE_NAME); 218 hdt.addCoprocessor(SlowMeCopro.class.getName()); 219 HTU.createTable(hdt, new byte[][]{f}, null); 220 221 try (RegionLocator locator = HTU.getConnection().getRegionLocator(TABLE_NAME)) { 222 hriPrimary = locator.getRegionLocation(row, false).getRegion(); 223 } 224 225 // mock a secondary region info to open 226 hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1); 227 228 // No master 229 LOG.info("Master is going to be stopped"); 230 TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU); 231 Configuration c = new Configuration(HTU.getConfiguration()); 232 c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 233 LOG.info("Master has stopped"); 234 } 235 236 @AfterClass 237 public static void afterClass() throws Exception { 238 HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false; 239 HTU.shutdownMiniCluster(); 240 } 241 242 @Before 243 public void before() throws IOException { 244 try { 245 openRegion(hriPrimary); 246 } catch (Exception ignored) { 247 } 248 try { 249 openRegion(hriSecondary); 250 } catch (Exception ignored) { 251 } 252 SlowMeCopro.slowDownNext.set(false); 253 SlowMeCopro.sleepTime.set(0); 254 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(0)); 255 SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(0)); 256 table = HTU.getConnection().getTable(TABLE_NAME); 257 try (ResultScanner scanner = table.getScanner(new Scan())) { 258 for (;;) { 259 Result result = scanner.next(); 260 if (result == null) { 261 break; 262 } 263 table.delete(new Delete(result.getRow())); 264 } 265 } 266 flushRegion(hriPrimary); 267 HTU.getConnection().clearRegionLocationCache(); 268 SlowMeCopro.primaryCountOfScan.set(0); 269 SlowMeCopro.secondaryCountOfScan.set(0); 270 SlowMeCopro.countOfNext.set(0); 271 } 272 273 @After 274 public void after() throws IOException, KeeperException { 275 SlowMeCopro.getPrimaryCdl().get().countDown(); 276 SlowMeCopro.getSecondaryCdl().get().countDown(); 277 try { 278 closeRegion(hriSecondary); 279 } catch (Exception ignored) { 280 } 281 try { 282 closeRegion(hriPrimary); 283 } catch (Exception ignored) { 284 } 285 if (table != null) { 286 table.close(); 287 } 288 HTU.getConnection().clearRegionLocationCache(); 289 } 290 291 private HRegionServer getRS() { 292 return HTU.getMiniHBaseCluster().getRegionServer(0); 293 } 294 295 private void openRegion(RegionInfo hri) throws Exception { 296 try { 297 if (isRegionOpened(hri)) { 298 return; 299 } 300 } catch (Exception e) { 301 } 302 // first version is '0' 303 AdminProtos.OpenRegionRequest orr = 304 RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, null); 305 AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr); 306 assertEquals(1, responseOpen.getOpeningStateCount()); 307 assertEquals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED, 308 responseOpen.getOpeningState(0)); 309 checkRegionIsOpened(hri); 310 } 311 312 private void closeRegion(RegionInfo hri) throws Exception { 313 AdminProtos.CloseRegionRequest crr = ProtobufUtil.buildCloseRegionRequest( 314 getRS().getServerName(), hri.getRegionName()); 315 AdminProtos.CloseRegionResponse responseClose = getRS() 316 .getRSRpcServices().closeRegion(null, crr); 317 assertTrue(responseClose.getClosed()); 318 319 checkRegionIsClosed(hri.getEncodedName()); 320 } 321 322 private void checkRegionIsOpened(RegionInfo hri) throws Exception { 323 while (!getRS().getRegionsInTransitionInRS().isEmpty()) { 324 Thread.sleep(1); 325 } 326 } 327 328 private boolean isRegionOpened(RegionInfo hri) throws Exception { 329 return getRS().getRegionByEncodedName(hri.getEncodedName()).isAvailable(); 330 } 331 332 private void checkRegionIsClosed(String encodedRegionName) throws Exception { 333 334 while (!getRS().getRegionsInTransitionInRS().isEmpty()) { 335 Thread.sleep(1); 336 } 337 338 try { 339 assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable()); 340 } catch (NotServingRegionException expected) { 341 // That's how it work: if the region is closed we have an exception. 342 } 343 344 // We don't delete the znode here, because there is not always a znode. 345 } 346 347 private void flushRegion(RegionInfo regionInfo) throws IOException { 348 TestRegionServerNoMaster.flushRegion(HTU, regionInfo); 349 } 350 351 @Test 352 public void testUseRegionWithoutReplica() throws Exception { 353 byte[] b1 = "testUseRegionWithoutReplica".getBytes(); 354 Get g = new Get(b1); 355 Result r = table.get(g); 356 assertFalse(r.isStale()); 357 } 358 359 @Test 360 public void testLocations() throws Exception { 361 byte[] b1 = "testLocations".getBytes(); 362 ClusterConnection hc = (ClusterConnection) HTU.getAdmin().getConnection(); 363 hc.clearRegionLocationCache(); 364 RegionLocations rl = hc.locateRegion(table.getName(), b1, false, false); 365 assertEquals(2, rl.size()); 366 367 rl = hc.locateRegion(table.getName(), b1, true, false); 368 assertEquals(2, rl.size()); 369 370 hc.clearRegionLocationCache(); 371 rl = hc.locateRegion(table.getName(), b1, true, false); 372 assertEquals(2, rl.size()); 373 374 rl = hc.locateRegion(table.getName(), b1, false, false); 375 assertEquals(2, rl.size()); 376 } 377 378 @Test 379 public void testGetNoResultNoStaleRegionWithReplica() throws Exception { 380 byte[] b1 = "testGetNoResultNoStaleRegionWithReplica".getBytes(); 381 // A get works and is not stale 382 Get g = new Get(b1); 383 Result r = table.get(g); 384 assertFalse(r.isStale()); 385 } 386 387 @Test 388 public void testGetNoResultStaleRegionWithReplica() throws Exception { 389 byte[] b1 = "testGetNoResultStaleRegionWithReplica".getBytes(); 390 openRegion(hriSecondary); 391 392 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); 393 Get g = new Get(b1); 394 g.setConsistency(Consistency.TIMELINE); 395 Result r = table.get(g); 396 assertTrue(r.isStale()); 397 } 398 399 @Test 400 public void testGetNoResultNotStaleSleepRegionWithReplica() throws Exception { 401 byte[] b1 = "testGetNoResultNotStaleSleepRegionWithReplica".getBytes(); 402 // We sleep; but we won't go to the stale region as we don't get the stale by default. 403 SlowMeCopro.sleepTime.set(2000); 404 Get g = new Get(b1); 405 Result r = table.get(g); 406 assertFalse(r.isStale()); 407 } 408 409 @Test 410 public void testFlushTable() throws Exception { 411 flushRegion(hriPrimary); 412 flushRegion(hriSecondary); 413 414 Put p = new Put(row); 415 p.addColumn(f, row, row); 416 table.put(p); 417 418 flushRegion(hriPrimary); 419 flushRegion(hriSecondary); 420 } 421 422 @Test 423 public void testFlushPrimary() throws Exception { 424 flushRegion(hriPrimary); 425 426 Put p = new Put(row); 427 p.addColumn(f, row, row); 428 table.put(p); 429 430 flushRegion(hriPrimary); 431 } 432 433 @Test 434 public void testFlushSecondary() throws Exception { 435 flushRegion(hriSecondary); 436 437 Put p = new Put(row); 438 p.addColumn(f, row, row); 439 table.put(p); 440 441 flushRegion(hriSecondary); 442 } 443 444 @Test 445 public void testUseRegionWithReplica() throws Exception { 446 byte[] b1 = "testUseRegionWithReplica".getBytes(); 447 // A simple put works, even if there here a second replica 448 Put p = new Put(b1); 449 p.addColumn(f, b1, b1); 450 table.put(p); 451 LOG.info("Put done"); 452 453 // A get works and is not stale 454 Get g = new Get(b1); 455 Result r = table.get(g); 456 assertFalse(r.isStale()); 457 assertFalse(r.getColumnCells(f, b1).isEmpty()); 458 LOG.info("get works and is not stale done"); 459 460 // Even if it we have to wait a little on the main region 461 SlowMeCopro.sleepTime.set(2000); 462 g = new Get(b1); 463 r = table.get(g); 464 assertFalse(r.isStale()); 465 assertFalse(r.getColumnCells(f, b1).isEmpty()); 466 SlowMeCopro.sleepTime.set(0); 467 LOG.info("sleep and is not stale done"); 468 469 // But if we ask for stale we will get it 470 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); 471 g = new Get(b1); 472 g.setConsistency(Consistency.TIMELINE); 473 r = table.get(g); 474 assertTrue(r.isStale()); 475 assertTrue(r.getColumnCells(f, b1).isEmpty()); 476 SlowMeCopro.getPrimaryCdl().get().countDown(); 477 478 LOG.info("stale done"); 479 480 // exists works and is not stale 481 g = new Get(b1); 482 g.setCheckExistenceOnly(true); 483 r = table.get(g); 484 assertFalse(r.isStale()); 485 assertTrue(r.getExists()); 486 LOG.info("exists not stale done"); 487 488 // exists works on stale but don't see the put 489 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); 490 g = new Get(b1); 491 g.setCheckExistenceOnly(true); 492 g.setConsistency(Consistency.TIMELINE); 493 r = table.get(g); 494 assertTrue(r.isStale()); 495 assertFalse("The secondary has stale data", r.getExists()); 496 SlowMeCopro.getPrimaryCdl().get().countDown(); 497 LOG.info("exists stale before flush done"); 498 499 flushRegion(hriPrimary); 500 flushRegion(hriSecondary); 501 LOG.info("flush done"); 502 Thread.sleep(1000 + REFRESH_PERIOD * 2); 503 504 // get works and is not stale 505 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); 506 g = new Get(b1); 507 g.setConsistency(Consistency.TIMELINE); 508 r = table.get(g); 509 assertTrue(r.isStale()); 510 assertFalse(r.isEmpty()); 511 SlowMeCopro.getPrimaryCdl().get().countDown(); 512 LOG.info("stale done"); 513 514 // exists works on stale and we see the put after the flush 515 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); 516 g = new Get(b1); 517 g.setCheckExistenceOnly(true); 518 g.setConsistency(Consistency.TIMELINE); 519 r = table.get(g); 520 assertTrue(r.isStale()); 521 assertTrue(r.getExists()); 522 SlowMeCopro.getPrimaryCdl().get().countDown(); 523 LOG.info("exists stale after flush done"); 524 } 525 526 @Test 527 public void testHedgedRead() throws Exception { 528 byte[] b1 = "testHedgedRead".getBytes(); 529 // A simple put works, even if there here a second replica 530 Put p = new Put(b1); 531 p.addColumn(f, b1, b1); 532 table.put(p); 533 LOG.info("Put done"); 534 535 // A get works and is not stale 536 Get g = new Get(b1); 537 Result r = table.get(g); 538 assertFalse(r.isStale()); 539 assertFalse(r.getColumnCells(f, b1).isEmpty()); 540 LOG.info("get works and is not stale done"); 541 542 // reset 543 ClusterConnection connection = (ClusterConnection) HTU.getConnection(); 544 Counter hedgedReadOps = connection.getConnectionMetrics().hedgedReadOps; 545 Counter hedgedReadWin = connection.getConnectionMetrics().hedgedReadWin; 546 hedgedReadOps.dec(hedgedReadOps.getCount()); 547 hedgedReadWin.dec(hedgedReadWin.getCount()); 548 549 // Wait a little on the main region, just enough to happen once hedged read 550 // and hedged read did not returned faster 551 int primaryCallTimeoutMicroSecond = 552 connection.getConnectionConfiguration().getPrimaryCallTimeoutMicroSecond(); 553 SlowMeCopro.sleepTime.set(TimeUnit.MICROSECONDS.toMillis(primaryCallTimeoutMicroSecond)); 554 SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(1)); 555 g = new Get(b1); 556 g.setConsistency(Consistency.TIMELINE); 557 r = table.get(g); 558 assertFalse(r.isStale()); 559 assertFalse(r.getColumnCells(f, b1).isEmpty()); 560 assertEquals(1, hedgedReadOps.getCount()); 561 assertEquals(0, hedgedReadWin.getCount()); 562 SlowMeCopro.sleepTime.set(0); 563 SlowMeCopro.getSecondaryCdl().get().countDown(); 564 LOG.info("hedged read occurred but not faster"); 565 566 // But if we ask for stale we will get it and hedged read returned faster 567 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); 568 g = new Get(b1); 569 g.setConsistency(Consistency.TIMELINE); 570 r = table.get(g); 571 assertTrue(r.isStale()); 572 assertTrue(r.getColumnCells(f, b1).isEmpty()); 573 assertEquals(2, hedgedReadOps.getCount()); 574 assertEquals(1, hedgedReadWin.getCount()); 575 SlowMeCopro.getPrimaryCdl().get().countDown(); 576 LOG.info("hedged read occurred and faster"); 577 } 578 579 @Test 580 public void testScanWithReplicas() throws Exception { 581 //simple scan 582 runMultipleScansOfOneType(false, false); 583 } 584 585 @Test 586 public void testSmallScanWithReplicas() throws Exception { 587 //small scan 588 runMultipleScansOfOneType(false, true); 589 } 590 591 @Test 592 public void testReverseScanWithReplicas() throws Exception { 593 //reverse scan 594 runMultipleScansOfOneType(true, false); 595 } 596 597 @Test 598 public void testCancelOfScan() throws Exception { 599 int numRows = 100; 600 for (int i = 0; i < numRows; i++) { 601 byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i); 602 Put p = new Put(b1); 603 p.addColumn(f, b1, b1); 604 table.put(p); 605 } 606 LOG.debug("PUT done"); 607 int caching = 20; 608 byte[] start; 609 start = Bytes.toBytes("testUseRegionWithReplica" + 0); 610 611 flushRegion(hriPrimary); 612 LOG.info("flush done"); 613 Thread.sleep(1000 + REFRESH_PERIOD * 2); 614 615 // now make some 'next' calls slow 616 SlowMeCopro.slowDownNext.set(true); 617 SlowMeCopro.countOfNext.set(0); 618 SlowMeCopro.sleepTime.set(5000); 619 620 Scan scan = new Scan().withStartRow(start); 621 scan.setCaching(caching); 622 scan.setConsistency(Consistency.TIMELINE); 623 ResultScanner scanner = table.getScanner(scan); 624 Iterator<Result> iter = scanner.iterator(); 625 iter.next(); 626 assertTrue(((ClientScanner) scanner).isAnyRPCcancelled()); 627 SlowMeCopro.slowDownNext.set(false); 628 SlowMeCopro.countOfNext.set(0); 629 } 630 631 // make sure the scan will only go to the specific replica 632 @Test 633 public void testScanOnSpecificReplica() throws Exception { 634 Scan scan = new Scan().setReplicaId(1).setConsistency(Consistency.TIMELINE); 635 try (ResultScanner scanner = table.getScanner(scan)) { 636 scanner.next(); 637 } 638 assertTrue(SlowMeCopro.secondaryCountOfScan.get() > 0); 639 assertEquals(0, SlowMeCopro.primaryCountOfScan.get()); 640 } 641 642 // make sure the scan will only go to the specific replica 643 @Test 644 public void testReverseScanOnSpecificReplica() throws Exception { 645 Scan scan = new Scan().setReversed(true).setReplicaId(1).setConsistency(Consistency.TIMELINE); 646 try (ResultScanner scanner = table.getScanner(scan)) { 647 scanner.next(); 648 } 649 assertTrue(SlowMeCopro.secondaryCountOfScan.get() > 0); 650 assertEquals(0, SlowMeCopro.primaryCountOfScan.get()); 651 } 652 653 private void runMultipleScansOfOneType(boolean reversed, boolean small) throws Exception { 654 int numRows = 100; 655 int numCols = 10; 656 for (int i = 0; i < numRows; i++) { 657 byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i); 658 for (int col = 0; col < numCols; col++) { 659 Put p = new Put(b1); 660 String qualifier = "qualifer" + col; 661 KeyValue kv = new KeyValue(b1, f, qualifier.getBytes()); 662 p.add(kv); 663 table.put(p); 664 } 665 } 666 LOG.debug("PUT done"); 667 int caching = 20; 668 long maxResultSize = Long.MAX_VALUE; 669 670 byte[] start; 671 if (reversed) { 672 start = Bytes.toBytes("testUseRegionWithReplica" + (numRows - 1)); 673 } else { 674 start = Bytes.toBytes("testUseRegionWithReplica" + 0); 675 } 676 677 scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start, numRows, 678 numCols, false, false); 679 680 // Even if we were to slow the server down, unless we ask for stale 681 // we won't get it 682 SlowMeCopro.sleepTime.set(5000); 683 scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize, start, numRows, 684 numCols, false, false); 685 SlowMeCopro.sleepTime.set(0); 686 687 flushRegion(hriPrimary); 688 LOG.info("flush done"); 689 Thread.sleep(1000 + REFRESH_PERIOD * 2); 690 691 // Now set the flag to get a response even if stale 692 SlowMeCopro.sleepTime.set(5000); 693 scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start, numRows, 694 numCols, true, false); 695 SlowMeCopro.sleepTime.set(0); 696 697 // now make some 'next' calls slow 698 SlowMeCopro.slowDownNext.set(true); 699 SlowMeCopro.countOfNext.set(0); 700 scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start, numRows, 701 numCols, true, true); 702 SlowMeCopro.slowDownNext.set(false); 703 SlowMeCopro.countOfNext.set(0); 704 705 // Make sure we do not get stale data.. 706 SlowMeCopro.sleepTime.set(5000); 707 scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize, start, numRows, 708 numCols, false, false); 709 SlowMeCopro.sleepTime.set(0); 710 711 // While the next calls are slow, set maxResultSize to 1 so that some partial results will be 712 // returned from the server before the replica switch occurs. 713 maxResultSize = 1; 714 SlowMeCopro.slowDownNext.set(true); 715 SlowMeCopro.countOfNext.set(0); 716 scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start, numRows, 717 numCols, true, true); 718 maxResultSize = Long.MAX_VALUE; 719 SlowMeCopro.slowDownNext.set(false); 720 SlowMeCopro.countOfNext.set(0); 721 } 722 723 private void scanWithReplicas(boolean reversed, boolean small, Consistency consistency, 724 int caching, long maxResultSize, byte[] startRow, int numRows, int numCols, 725 boolean staleExpected, boolean slowNext) 726 throws Exception { 727 Scan scan = new Scan().withStartRow(startRow); 728 scan.setCaching(caching); 729 scan.setMaxResultSize(maxResultSize); 730 scan.setReversed(reversed); 731 scan.setSmall(small); 732 scan.setConsistency(consistency); 733 ResultScanner scanner = table.getScanner(scan); 734 Iterator<Result> iter = scanner.iterator(); 735 736 // Maps of row keys that we have seen so far 737 HashMap<String, Boolean> map = new HashMap<>(); 738 739 // Tracked metrics 740 int rowCount = 0; 741 int cellCount = 0; 742 int countOfStale = 0; 743 744 while (iter.hasNext()) { 745 rowCount++; 746 Result r = iter.next(); 747 String row = new String(r.getRow()); 748 749 if (map.containsKey(row)) { 750 throw new Exception("Unexpected scan result. Repeated row " + Bytes.toString(r.getRow())); 751 } 752 753 map.put(row, true); 754 cellCount += r.rawCells().length; 755 756 if (!slowNext) { 757 assertTrue(r.isStale() == staleExpected); 758 } 759 if (r.isStale()) { 760 countOfStale++; 761 } 762 } 763 assertTrue("Count of rows " + rowCount + " num rows expected " + numRows, 764 rowCount == numRows); 765 assertTrue("Count of cells: " + cellCount + " cells expected: " + numRows * numCols, 766 cellCount == (numRows * numCols)); 767 768 if (slowNext) { 769 LOG.debug("Count of Stale " + countOfStale); 770 assertTrue(countOfStale > 1); 771 772 // If the scan was configured in such a way that a full row was NOT retrieved before the 773 // replica switch occurred, then it is possible that all rows were stale 774 if (maxResultSize != Long.MAX_VALUE) { 775 assertTrue(countOfStale <= numRows); 776 } else { 777 assertTrue(countOfStale < numRows); 778 } 779 } 780 } 781}