001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import com.codahale.metrics.Counter; 025import java.io.IOException; 026import java.util.HashMap; 027import java.util.Iterator; 028import java.util.List; 029import java.util.Optional; 030import java.util.concurrent.CountDownLatch; 031import java.util.concurrent.TimeUnit; 032import java.util.concurrent.atomic.AtomicBoolean; 033import java.util.concurrent.atomic.AtomicInteger; 034import java.util.concurrent.atomic.AtomicLong; 035import java.util.concurrent.atomic.AtomicReference; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.hbase.Cell; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.HBaseTestingUtility; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.HTableDescriptor; 042import org.apache.hadoop.hbase.KeyValue; 043import org.apache.hadoop.hbase.NotServingRegionException; 044import org.apache.hadoop.hbase.RegionLocations; 045import org.apache.hadoop.hbase.StartMiniClusterOption; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.coprocessor.ObserverContext; 048import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 049import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 050import org.apache.hadoop.hbase.coprocessor.RegionObserver; 051import org.apache.hadoop.hbase.regionserver.HRegionServer; 052import org.apache.hadoop.hbase.regionserver.InternalScanner; 053import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore; 054import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster; 055import org.apache.hadoop.hbase.testclassification.ClientTests; 056import org.apache.hadoop.hbase.testclassification.LargeTests; 057import org.apache.hadoop.hbase.util.Bytes; 058import org.apache.zookeeper.KeeperException; 059import org.junit.After; 060import org.junit.AfterClass; 061import org.junit.Before; 062import org.junit.BeforeClass; 063import org.junit.ClassRule; 064import org.junit.Test; 065import org.junit.experimental.categories.Category; 066import org.slf4j.Logger; 067import org.slf4j.LoggerFactory; 068 069import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 070import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 071import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 072 073/** 074 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole cluster. 075 * See {@link org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster}. 076 */ 077@Category({ LargeTests.class, ClientTests.class }) 078public class TestReplicasClient { 079 080 @ClassRule 081 public static final HBaseClassTestRule CLASS_RULE = 082 HBaseClassTestRule.forClass(TestReplicasClient.class); 083 084 private static final Logger LOG = LoggerFactory.getLogger(TestReplicasClient.class); 085 086 private static TableName TABLE_NAME; 087 private Table table = null; 088 private static final byte[] row = TestReplicasClient.class.getName().getBytes(); 089 090 private static RegionInfo hriPrimary; 091 private static RegionInfo hriSecondary; 092 093 private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); 094 private static final byte[] f = HConstants.CATALOG_FAMILY; 095 096 private final static int REFRESH_PERIOD = 1000; 097 098 /** 099 * This copro is used to synchronize the tests. 100 */ 101 public static class SlowMeCopro implements RegionCoprocessor, RegionObserver { 102 static final AtomicInteger primaryCountOfScan = new AtomicInteger(0); 103 static final AtomicInteger secondaryCountOfScan = new AtomicInteger(0); 104 static final AtomicLong sleepTime = new AtomicLong(0); 105 static final AtomicBoolean slowDownNext = new AtomicBoolean(false); 106 static final AtomicInteger countOfNext = new AtomicInteger(0); 107 private static final AtomicReference<CountDownLatch> primaryCdl = 108 new AtomicReference<>(new CountDownLatch(0)); 109 private static final AtomicReference<CountDownLatch> secondaryCdl = 110 new AtomicReference<>(new CountDownLatch(0)); 111 112 public SlowMeCopro() { 113 } 114 115 @Override 116 public Optional<RegionObserver> getRegionObserver() { 117 return Optional.of(this); 118 } 119 120 @Override 121 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get, 122 final List<Cell> results) throws IOException { 123 slowdownCode(e); 124 } 125 126 private void incrementScanCount(ObserverContext<RegionCoprocessorEnvironment> e) { 127 LOG.info("==========scan {} ", e.getEnvironment().getRegion().getRegionInfo().getReplicaId(), 128 new Exception()); 129 if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) { 130 primaryCountOfScan.incrementAndGet(); 131 } else { 132 secondaryCountOfScan.incrementAndGet(); 133 } 134 } 135 136 @Override 137 public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, 138 final Scan scan) throws IOException { 139 incrementScanCount(e); 140 slowdownCode(e); 141 } 142 143 @Override 144 public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e, 145 final InternalScanner s, final List<Result> results, final int limit, final boolean hasMore) 146 throws IOException { 147 incrementScanCount(e); 148 // this will slow down a certain next operation if the conditions are met. The slowness 149 // will allow the call to go to a replica 150 if (slowDownNext.get()) { 151 // have some "next" return successfully from the primary; hence countOfNext checked 152 if (countOfNext.incrementAndGet() == 2) { 153 sleepTime.set(2000); 154 slowdownCode(e); 155 } 156 } 157 return true; 158 } 159 160 private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) { 161 if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) { 162 LOG.info("We're the primary replicas."); 163 CountDownLatch latch = getPrimaryCdl().get(); 164 try { 165 if (sleepTime.get() > 0) { 166 LOG.info("Sleeping for " + sleepTime.get() + " ms"); 167 Thread.sleep(sleepTime.get()); 168 } else if (latch.getCount() > 0) { 169 LOG.info("Waiting for the counterCountDownLatch"); 170 latch.await(2, TimeUnit.MINUTES); // To help the tests to finish. 171 if (latch.getCount() > 0) { 172 throw new RuntimeException("Can't wait more"); 173 } 174 } 175 } catch (InterruptedException e1) { 176 LOG.error(e1.toString(), e1); 177 } 178 } else { 179 LOG.info("We're not the primary replicas."); 180 CountDownLatch latch = getSecondaryCdl().get(); 181 try { 182 if (latch.getCount() > 0) { 183 LOG.info("Waiting for the secondary counterCountDownLatch"); 184 latch.await(2, TimeUnit.MINUTES); // To help the tests to finish. 185 if (latch.getCount() > 0) { 186 throw new RuntimeException("Can't wait more"); 187 } 188 } 189 } catch (InterruptedException e1) { 190 LOG.error(e1.toString(), e1); 191 } 192 } 193 } 194 195 public static AtomicReference<CountDownLatch> getPrimaryCdl() { 196 return primaryCdl; 197 } 198 199 public static AtomicReference<CountDownLatch> getSecondaryCdl() { 200 return secondaryCdl; 201 } 202 } 203 204 @BeforeClass 205 public static void beforeClass() throws Exception { 206 // enable store file refreshing 207 HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 208 REFRESH_PERIOD); 209 HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true); 210 HTU.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true); 211 ConnectionUtils.setupMasterlessConnection(HTU.getConfiguration()); 212 StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(1) 213 .numAlwaysStandByMasters(1).numMasters(1).build(); 214 HTU.startMiniCluster(option); 215 216 // Create table then get the single region for our new table. 217 TABLE_NAME = TableName.valueOf(TestReplicasClient.class.getSimpleName()); 218 HTableDescriptor hdt = HTU.createTableDescriptor(TABLE_NAME); 219 hdt.addCoprocessor(SlowMeCopro.class.getName()); 220 HTU.createTable(hdt, new byte[][] { f }, null); 221 222 try (RegionLocator locator = HTU.getConnection().getRegionLocator(TABLE_NAME)) { 223 hriPrimary = locator.getRegionLocation(row, false).getRegion(); 224 } 225 226 // mock a secondary region info to open 227 hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1); 228 229 // No master 230 LOG.info("Master is going to be stopped"); 231 TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU); 232 Configuration c = new Configuration(HTU.getConfiguration()); 233 c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 234 LOG.info("Master has stopped"); 235 } 236 237 @AfterClass 238 public static void afterClass() throws Exception { 239 HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false; 240 HTU.shutdownMiniCluster(); 241 } 242 243 @Before 244 public void before() throws IOException { 245 try { 246 openRegion(hriPrimary); 247 } catch (Exception ignored) { 248 } 249 try { 250 openRegion(hriSecondary); 251 } catch (Exception ignored) { 252 } 253 SlowMeCopro.slowDownNext.set(false); 254 SlowMeCopro.sleepTime.set(0); 255 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(0)); 256 SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(0)); 257 table = HTU.getConnection().getTable(TABLE_NAME); 258 try (ResultScanner scanner = table.getScanner(new Scan())) { 259 for (;;) { 260 Result result = scanner.next(); 261 if (result == null) { 262 break; 263 } 264 table.delete(new Delete(result.getRow())); 265 } 266 } 267 flushRegion(hriPrimary); 268 HTU.getConnection().clearRegionLocationCache(); 269 SlowMeCopro.primaryCountOfScan.set(0); 270 SlowMeCopro.secondaryCountOfScan.set(0); 271 SlowMeCopro.countOfNext.set(0); 272 } 273 274 @After 275 public void after() throws IOException, KeeperException { 276 SlowMeCopro.getPrimaryCdl().get().countDown(); 277 SlowMeCopro.getSecondaryCdl().get().countDown(); 278 try { 279 closeRegion(hriSecondary); 280 } catch (Exception ignored) { 281 } 282 try { 283 closeRegion(hriPrimary); 284 } catch (Exception ignored) { 285 } 286 if (table != null) { 287 table.close(); 288 } 289 HTU.getConnection().clearRegionLocationCache(); 290 } 291 292 private HRegionServer getRS() { 293 return HTU.getMiniHBaseCluster().getRegionServer(0); 294 } 295 296 private void openRegion(RegionInfo hri) throws Exception { 297 try { 298 if (isRegionOpened(hri)) { 299 return; 300 } 301 } catch (Exception e) { 302 } 303 // first version is '0' 304 AdminProtos.OpenRegionRequest orr = 305 RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, null); 306 AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr); 307 assertEquals(1, responseOpen.getOpeningStateCount()); 308 assertEquals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED, 309 responseOpen.getOpeningState(0)); 310 checkRegionIsOpened(hri); 311 } 312 313 private void closeRegion(RegionInfo hri) throws Exception { 314 AdminProtos.CloseRegionRequest crr = 315 ProtobufUtil.buildCloseRegionRequest(getRS().getServerName(), hri.getRegionName()); 316 AdminProtos.CloseRegionResponse responseClose = 317 getRS().getRSRpcServices().closeRegion(null, crr); 318 assertTrue(responseClose.getClosed()); 319 320 checkRegionIsClosed(hri.getEncodedName()); 321 } 322 323 private void checkRegionIsOpened(RegionInfo hri) throws Exception { 324 while (!getRS().getRegionsInTransitionInRS().isEmpty()) { 325 Thread.sleep(1); 326 } 327 } 328 329 private boolean isRegionOpened(RegionInfo hri) throws Exception { 330 return getRS().getRegionByEncodedName(hri.getEncodedName()).isAvailable(); 331 } 332 333 private void checkRegionIsClosed(String encodedRegionName) throws Exception { 334 335 while (!getRS().getRegionsInTransitionInRS().isEmpty()) { 336 Thread.sleep(1); 337 } 338 339 try { 340 assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable()); 341 } catch (NotServingRegionException expected) { 342 // That's how it work: if the region is closed we have an exception. 343 } 344 345 // We don't delete the znode here, because there is not always a znode. 346 } 347 348 private void flushRegion(RegionInfo regionInfo) throws IOException { 349 TestRegionServerNoMaster.flushRegion(HTU, regionInfo); 350 } 351 352 @Test 353 public void testUseRegionWithoutReplica() throws Exception { 354 byte[] b1 = "testUseRegionWithoutReplica".getBytes(); 355 Get g = new Get(b1); 356 Result r = table.get(g); 357 assertFalse(r.isStale()); 358 } 359 360 @Test 361 public void testLocations() throws Exception { 362 byte[] b1 = "testLocations".getBytes(); 363 ClusterConnection hc = (ClusterConnection) HTU.getAdmin().getConnection(); 364 hc.clearRegionLocationCache(); 365 RegionLocations rl = hc.locateRegion(table.getName(), b1, false, false); 366 assertEquals(2, rl.size()); 367 368 rl = hc.locateRegion(table.getName(), b1, true, false); 369 assertEquals(2, rl.size()); 370 371 hc.clearRegionLocationCache(); 372 rl = hc.locateRegion(table.getName(), b1, true, false); 373 assertEquals(2, rl.size()); 374 375 rl = hc.locateRegion(table.getName(), b1, false, false); 376 assertEquals(2, rl.size()); 377 } 378 379 @Test 380 public void testGetNoResultNoStaleRegionWithReplica() throws Exception { 381 byte[] b1 = "testGetNoResultNoStaleRegionWithReplica".getBytes(); 382 // A get works and is not stale 383 Get g = new Get(b1); 384 Result r = table.get(g); 385 assertFalse(r.isStale()); 386 } 387 388 @Test 389 public void testGetNoResultStaleRegionWithReplica() throws Exception { 390 byte[] b1 = "testGetNoResultStaleRegionWithReplica".getBytes(); 391 openRegion(hriSecondary); 392 393 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); 394 Get g = new Get(b1); 395 g.setConsistency(Consistency.TIMELINE); 396 Result r = table.get(g); 397 assertTrue(r.isStale()); 398 } 399 400 @Test 401 public void testGetNoResultNotStaleSleepRegionWithReplica() throws Exception { 402 byte[] b1 = "testGetNoResultNotStaleSleepRegionWithReplica".getBytes(); 403 // We sleep; but we won't go to the stale region as we don't get the stale by default. 404 SlowMeCopro.sleepTime.set(2000); 405 Get g = new Get(b1); 406 Result r = table.get(g); 407 assertFalse(r.isStale()); 408 } 409 410 @Test 411 public void testFlushTable() throws Exception { 412 flushRegion(hriPrimary); 413 flushRegion(hriSecondary); 414 415 Put p = new Put(row); 416 p.addColumn(f, row, row); 417 table.put(p); 418 419 flushRegion(hriPrimary); 420 flushRegion(hriSecondary); 421 } 422 423 @Test 424 public void testFlushPrimary() throws Exception { 425 flushRegion(hriPrimary); 426 427 Put p = new Put(row); 428 p.addColumn(f, row, row); 429 table.put(p); 430 431 flushRegion(hriPrimary); 432 } 433 434 @Test 435 public void testFlushSecondary() throws Exception { 436 flushRegion(hriSecondary); 437 438 Put p = new Put(row); 439 p.addColumn(f, row, row); 440 table.put(p); 441 442 flushRegion(hriSecondary); 443 } 444 445 @Test 446 public void testUseRegionWithReplica() throws Exception { 447 byte[] b1 = "testUseRegionWithReplica".getBytes(); 448 // A simple put works, even if there here a second replica 449 Put p = new Put(b1); 450 p.addColumn(f, b1, b1); 451 table.put(p); 452 LOG.info("Put done"); 453 454 // A get works and is not stale 455 Get g = new Get(b1); 456 Result r = table.get(g); 457 assertFalse(r.isStale()); 458 assertFalse(r.getColumnCells(f, b1).isEmpty()); 459 LOG.info("get works and is not stale done"); 460 461 // Even if it we have to wait a little on the main region 462 SlowMeCopro.sleepTime.set(2000); 463 g = new Get(b1); 464 r = table.get(g); 465 assertFalse(r.isStale()); 466 assertFalse(r.getColumnCells(f, b1).isEmpty()); 467 SlowMeCopro.sleepTime.set(0); 468 LOG.info("sleep and is not stale done"); 469 470 // But if we ask for stale we will get it 471 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); 472 g = new Get(b1); 473 g.setConsistency(Consistency.TIMELINE); 474 r = table.get(g); 475 assertTrue(r.isStale()); 476 assertTrue(r.getColumnCells(f, b1).isEmpty()); 477 SlowMeCopro.getPrimaryCdl().get().countDown(); 478 479 LOG.info("stale done"); 480 481 // exists works and is not stale 482 g = new Get(b1); 483 g.setCheckExistenceOnly(true); 484 r = table.get(g); 485 assertFalse(r.isStale()); 486 assertTrue(r.getExists()); 487 LOG.info("exists not stale done"); 488 489 // exists works on stale but don't see the put 490 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); 491 g = new Get(b1); 492 g.setCheckExistenceOnly(true); 493 g.setConsistency(Consistency.TIMELINE); 494 r = table.get(g); 495 assertTrue(r.isStale()); 496 assertFalse("The secondary has stale data", r.getExists()); 497 SlowMeCopro.getPrimaryCdl().get().countDown(); 498 LOG.info("exists stale before flush done"); 499 500 flushRegion(hriPrimary); 501 flushRegion(hriSecondary); 502 LOG.info("flush done"); 503 Thread.sleep(1000 + REFRESH_PERIOD * 2); 504 505 // get works and is not stale 506 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); 507 g = new Get(b1); 508 g.setConsistency(Consistency.TIMELINE); 509 r = table.get(g); 510 assertTrue(r.isStale()); 511 assertFalse(r.isEmpty()); 512 SlowMeCopro.getPrimaryCdl().get().countDown(); 513 LOG.info("stale done"); 514 515 // exists works on stale and we see the put after the flush 516 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); 517 g = new Get(b1); 518 g.setCheckExistenceOnly(true); 519 g.setConsistency(Consistency.TIMELINE); 520 r = table.get(g); 521 assertTrue(r.isStale()); 522 assertTrue(r.getExists()); 523 SlowMeCopro.getPrimaryCdl().get().countDown(); 524 LOG.info("exists stale after flush done"); 525 } 526 527 @Test 528 public void testHedgedRead() throws Exception { 529 byte[] b1 = "testHedgedRead".getBytes(); 530 // A simple put works, even if there here a second replica 531 Put p = new Put(b1); 532 p.addColumn(f, b1, b1); 533 table.put(p); 534 LOG.info("Put done"); 535 536 // A get works and is not stale 537 Get g = new Get(b1); 538 Result r = table.get(g); 539 assertFalse(r.isStale()); 540 assertFalse(r.getColumnCells(f, b1).isEmpty()); 541 LOG.info("get works and is not stale done"); 542 543 // reset 544 ClusterConnection connection = (ClusterConnection) HTU.getConnection(); 545 Counter hedgedReadOps = connection.getConnectionMetrics().hedgedReadOps; 546 Counter hedgedReadWin = connection.getConnectionMetrics().hedgedReadWin; 547 hedgedReadOps.dec(hedgedReadOps.getCount()); 548 hedgedReadWin.dec(hedgedReadWin.getCount()); 549 550 // Wait a little on the main region, just enough to happen once hedged read 551 // and hedged read did not returned faster 552 int primaryCallTimeoutMicroSecond = 553 connection.getConnectionConfiguration().getPrimaryCallTimeoutMicroSecond(); 554 SlowMeCopro.sleepTime.set(TimeUnit.MICROSECONDS.toMillis(primaryCallTimeoutMicroSecond)); 555 SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(1)); 556 g = new Get(b1); 557 g.setConsistency(Consistency.TIMELINE); 558 r = table.get(g); 559 assertFalse(r.isStale()); 560 assertFalse(r.getColumnCells(f, b1).isEmpty()); 561 assertEquals(1, hedgedReadOps.getCount()); 562 assertEquals(0, hedgedReadWin.getCount()); 563 SlowMeCopro.sleepTime.set(0); 564 SlowMeCopro.getSecondaryCdl().get().countDown(); 565 LOG.info("hedged read occurred but not faster"); 566 567 // But if we ask for stale we will get it and hedged read returned faster 568 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); 569 g = new Get(b1); 570 g.setConsistency(Consistency.TIMELINE); 571 r = table.get(g); 572 assertTrue(r.isStale()); 573 assertTrue(r.getColumnCells(f, b1).isEmpty()); 574 assertEquals(2, hedgedReadOps.getCount()); 575 assertEquals(1, hedgedReadWin.getCount()); 576 SlowMeCopro.getPrimaryCdl().get().countDown(); 577 LOG.info("hedged read occurred and faster"); 578 } 579 580 @Test 581 public void testScanWithReplicas() throws Exception { 582 // simple scan 583 runMultipleScansOfOneType(false, false); 584 } 585 586 @Test 587 public void testSmallScanWithReplicas() throws Exception { 588 // small scan 589 runMultipleScansOfOneType(false, true); 590 } 591 592 @Test 593 public void testReverseScanWithReplicas() throws Exception { 594 // reverse scan 595 runMultipleScansOfOneType(true, false); 596 } 597 598 @Test 599 public void testCancelOfScan() throws Exception { 600 int numRows = 100; 601 for (int i = 0; i < numRows; i++) { 602 byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i); 603 Put p = new Put(b1); 604 p.addColumn(f, b1, b1); 605 table.put(p); 606 } 607 LOG.debug("PUT done"); 608 int caching = 20; 609 byte[] start; 610 start = Bytes.toBytes("testUseRegionWithReplica" + 0); 611 612 flushRegion(hriPrimary); 613 LOG.info("flush done"); 614 Thread.sleep(1000 + REFRESH_PERIOD * 2); 615 616 // now make some 'next' calls slow 617 SlowMeCopro.slowDownNext.set(true); 618 SlowMeCopro.countOfNext.set(0); 619 SlowMeCopro.sleepTime.set(5000); 620 621 Scan scan = new Scan().withStartRow(start); 622 scan.setCaching(caching); 623 scan.setConsistency(Consistency.TIMELINE); 624 ResultScanner scanner = table.getScanner(scan); 625 Iterator<Result> iter = scanner.iterator(); 626 iter.next(); 627 assertTrue(((ClientScanner) scanner).isAnyRPCcancelled()); 628 SlowMeCopro.slowDownNext.set(false); 629 SlowMeCopro.countOfNext.set(0); 630 } 631 632 // make sure the scan will only go to the specific replica 633 @Test 634 public void testScanOnSpecificReplica() throws Exception { 635 Scan scan = new Scan().setReplicaId(1).setConsistency(Consistency.TIMELINE); 636 try (ResultScanner scanner = table.getScanner(scan)) { 637 scanner.next(); 638 } 639 assertTrue(SlowMeCopro.secondaryCountOfScan.get() > 0); 640 assertEquals(0, SlowMeCopro.primaryCountOfScan.get()); 641 } 642 643 // make sure the scan will only go to the specific replica 644 @Test 645 public void testReverseScanOnSpecificReplica() throws Exception { 646 Scan scan = new Scan().setReversed(true).setReplicaId(1).setConsistency(Consistency.TIMELINE); 647 try (ResultScanner scanner = table.getScanner(scan)) { 648 scanner.next(); 649 } 650 assertTrue(SlowMeCopro.secondaryCountOfScan.get() > 0); 651 assertEquals(0, SlowMeCopro.primaryCountOfScan.get()); 652 } 653 654 private void runMultipleScansOfOneType(boolean reversed, boolean small) throws Exception { 655 int numRows = 100; 656 int numCols = 10; 657 for (int i = 0; i < numRows; i++) { 658 byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i); 659 for (int col = 0; col < numCols; col++) { 660 Put p = new Put(b1); 661 String qualifier = "qualifer" + col; 662 KeyValue kv = new KeyValue(b1, f, qualifier.getBytes()); 663 p.add(kv); 664 table.put(p); 665 } 666 } 667 LOG.debug("PUT done"); 668 int caching = 20; 669 long maxResultSize = Long.MAX_VALUE; 670 671 byte[] start; 672 if (reversed) { 673 start = Bytes.toBytes("testUseRegionWithReplica" + (numRows - 1)); 674 } else { 675 start = Bytes.toBytes("testUseRegionWithReplica" + 0); 676 } 677 678 scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start, numRows, 679 numCols, false, false); 680 681 // Even if we were to slow the server down, unless we ask for stale 682 // we won't get it 683 SlowMeCopro.sleepTime.set(5000); 684 scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize, start, numRows, 685 numCols, false, false); 686 SlowMeCopro.sleepTime.set(0); 687 688 flushRegion(hriPrimary); 689 LOG.info("flush done"); 690 Thread.sleep(1000 + REFRESH_PERIOD * 2); 691 692 // Now set the flag to get a response even if stale 693 SlowMeCopro.sleepTime.set(5000); 694 scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start, numRows, 695 numCols, true, false); 696 SlowMeCopro.sleepTime.set(0); 697 698 // now make some 'next' calls slow 699 SlowMeCopro.slowDownNext.set(true); 700 SlowMeCopro.countOfNext.set(0); 701 scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start, numRows, 702 numCols, true, true); 703 SlowMeCopro.slowDownNext.set(false); 704 SlowMeCopro.countOfNext.set(0); 705 706 // Make sure we do not get stale data.. 707 SlowMeCopro.sleepTime.set(5000); 708 scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize, start, numRows, 709 numCols, false, false); 710 SlowMeCopro.sleepTime.set(0); 711 712 // While the next calls are slow, set maxResultSize to 1 so that some partial results will be 713 // returned from the server before the replica switch occurs. 714 maxResultSize = 1; 715 SlowMeCopro.slowDownNext.set(true); 716 SlowMeCopro.countOfNext.set(0); 717 scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start, numRows, 718 numCols, true, true); 719 maxResultSize = Long.MAX_VALUE; 720 SlowMeCopro.slowDownNext.set(false); 721 SlowMeCopro.countOfNext.set(0); 722 } 723 724 private void scanWithReplicas(boolean reversed, boolean small, Consistency consistency, 725 int caching, long maxResultSize, byte[] startRow, int numRows, int numCols, 726 boolean staleExpected, boolean slowNext) throws Exception { 727 Scan scan = new Scan().withStartRow(startRow); 728 scan.setCaching(caching); 729 scan.setMaxResultSize(maxResultSize); 730 scan.setReversed(reversed); 731 scan.setSmall(small); 732 scan.setConsistency(consistency); 733 ResultScanner scanner = table.getScanner(scan); 734 Iterator<Result> iter = scanner.iterator(); 735 736 // Maps of row keys that we have seen so far 737 HashMap<String, Boolean> map = new HashMap<>(); 738 739 // Tracked metrics 740 int rowCount = 0; 741 int cellCount = 0; 742 int countOfStale = 0; 743 744 while (iter.hasNext()) { 745 rowCount++; 746 Result r = iter.next(); 747 String row = new String(r.getRow()); 748 749 if (map.containsKey(row)) { 750 throw new Exception("Unexpected scan result. Repeated row " + Bytes.toString(r.getRow())); 751 } 752 753 map.put(row, true); 754 cellCount += r.rawCells().length; 755 756 if (!slowNext) { 757 assertTrue(r.isStale() == staleExpected); 758 } 759 if (r.isStale()) { 760 countOfStale++; 761 } 762 } 763 assertTrue("Count of rows " + rowCount + " num rows expected " + numRows, rowCount == numRows); 764 assertTrue("Count of cells: " + cellCount + " cells expected: " + numRows * numCols, 765 cellCount == (numRows * numCols)); 766 767 if (slowNext) { 768 LOG.debug("Count of Stale " + countOfStale); 769 assertTrue(countOfStale > 1); 770 771 // If the scan was configured in such a way that a full row was NOT retrieved before the 772 // replica switch occurred, then it is possible that all rows were stale 773 if (maxResultSize != Long.MAX_VALUE) { 774 assertTrue(countOfStale <= numRows); 775 } else { 776 assertTrue(countOfStale < numRows); 777 } 778 } 779 } 780}