/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.codahale.metrics.Counter;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;

/**
 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole cluster.
 * See {@link org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster}.
 * <p>
 * The tests run against a single region server with the master stopped. The primary region and a
 * replica (replica id 1) of the same region are opened and closed directly through the region
 * server RPC services, and {@link SlowMeCopro} is used to delay or block reads so that the client
 * falls back to the replica.
 */
@Tag(LargeTests.TAG)
@Tag(ClientTests.TAG)
@SuppressWarnings("deprecation")
public class TestReplicasClient {

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicasClient.class);

  // Assigned once in beforeClass(), after the table has been created.
  private static TableName TABLE_NAME;
  private Table table = null;
  private static final byte[] row = Bytes.toBytes(TestReplicasClient.class.getName());

  private static RegionInfo hriPrimary;
  private static RegionInfo hriSecondary;

  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
  private static final byte[] f = HConstants.CATALOG_FAMILY;

  // Store file refresh period, in the unit expected by
  // StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD (the tests Thread.sleep on it,
  // i.e. treat it as milliseconds).
  private static final int REFRESH_PERIOD = 1000;
  private static ServerName rsServerName;

  /**
   * This copro is used to synchronize the tests.
   * <p>
   * Gets and scanner opens are delayed according to static knobs set by the tests:
   * <ul>
   * <li>on the primary replica (replica id 0): sleep for {@link #sleepTime} if it is positive,
   * otherwise wait on the primary latch if it has a non-zero count;</li>
   * <li>on any other replica: wait on the secondary latch if it has a non-zero count.</li>
   * </ul>
   */
  public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
    static final AtomicLong sleepTime = new AtomicLong(0);
    static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
    static final AtomicInteger countOfNext = new AtomicInteger(0);
    private static final AtomicReference<CountDownLatch> primaryCdl =
      new AtomicReference<>(new CountDownLatch(0));
    private static final AtomicReference<CountDownLatch> secondaryCdl =
      new AtomicReference<>(new CountDownLatch(0));

    public SlowMeCopro() {
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
      final Get get, final List<Cell> results) throws IOException {
      slowdownCode(e);
    }

    @Override
    public void preScannerOpen(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
      final Scan scan) throws IOException {
      slowdownCode(e);
    }

    @Override
    public boolean preScannerNext(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
      final InternalScanner s, final List<Result> results, final int limit, final boolean hasMore)
      throws IOException {
      // this will slow down a certain next operation if the conditions are met. The slowness
      // will allow the call to go to a replica
      if (slowDownNext.get()) {
        // have some "next" return successfully from the primary; hence countOfNext checked
        if (countOfNext.incrementAndGet() == 2) {
          sleepTime.set(2000);
          slowdownCode(e);
        }
      }
      return true;
    }

    /**
     * Delays the calling thread according to the knobs configured by the tests; see the class
     * comment for the exact rules.
     * @param e observer context used to find out which replica is serving the call
     * @throws RuntimeException if a latch is not released within the bounded wait
     */
    private void slowdownCode(final ObserverContext<? extends RegionCoprocessorEnvironment> e) {
      try {
        if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
          LOG.info("We're the primary replicas.");
          CountDownLatch latch = getPrimaryCdl().get();
          // Read the value once so that what we log is what we actually sleep for.
          long sleepMs = sleepTime.get();
          if (sleepMs > 0) {
            LOG.info("Sleeping for {} ms", sleepMs);
            Thread.sleep(sleepMs);
          } else {
            awaitLatch(latch, "Waiting for the counterCountDownLatch");
          }
        } else {
          LOG.info("We're not the primary replicas.");
          awaitLatch(getSecondaryCdl().get(), "Waiting for the secondary counterCountDownLatch");
        }
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
        // Restore the interrupt status so that the caller can still observe the interruption.
        Thread.currentThread().interrupt();
      }
    }

    /**
     * Waits for {@code latch} to reach zero if it has not done so already.
     * @param latch       the latch to wait on
     * @param waitMessage message logged before blocking
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws RuntimeException     if the latch is not released within two minutes
     */
    private static void awaitLatch(CountDownLatch latch, String waitMessage)
      throws InterruptedException {
      if (latch.getCount() > 0) {
        LOG.info(waitMessage);
        // Bounded wait, to help the tests to finish.
        if (!latch.await(2, TimeUnit.MINUTES)) {
          throw new RuntimeException("Can't wait more");
        }
      }
    }

    public static AtomicReference<CountDownLatch> getPrimaryCdl() {
      return primaryCdl;
    }

    public static AtomicReference<CountDownLatch> getSecondaryCdl() {
      return secondaryCdl;
    }
  }

  /**
   * Starts a mini cluster with one region server, creates the test table with {@link SlowMeCopro}
   * attached, records the primary region and a replica region info, and then stops the master.
   */
  @BeforeAll
  public static void beforeClass() throws Exception {
    // enable store file refreshing
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
      REFRESH_PERIOD);
    HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
    HTU.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
    StartTestingClusterOption option = StartTestingClusterOption.builder().numRegionServers(1)
      .numAlwaysStandByMasters(1).numMasters(1).build();
    HTU.startMiniCluster(option);

    // Create table then get the single region for our new table.
    TableDescriptorBuilder builder = HTU.createModifyableTableDescriptor(
      TableName.valueOf(TestReplicasClient.class.getSimpleName()),
      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
    builder.setCoprocessor(SlowMeCopro.class.getName());
    TableDescriptor hdt = builder.build();
    HTU.createTable(hdt, new byte[][] { f }, null);
    TABLE_NAME = hdt.getTableName();
    try (RegionLocator locator = HTU.getConnection().getRegionLocator(hdt.getTableName())) {
      hriPrimary = locator.getRegionLocation(row, false).getRegion();
    }

    // mock a secondary region info to open
    hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);

    // No master
    LOG.info("Master is going to be stopped");
    TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
    // NOTE(review): this copy of the configuration is never read afterwards, so the retries
    // setting below has no effect on the connection used by the tests -- confirm whether it can
    // be dropped or should be applied to HTU's configuration instead.
    Configuration c = new Configuration(HTU.getConfiguration());
    c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
    LOG.info("Master has stopped");

    rsServerName = HTU.getHBaseCluster().getRegionServer(0).getServerName();
    assertNotNull(rsServerName);
  }

  @AfterAll
  public static void afterClass() throws Exception {
    HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
    HTU.shutdownMiniCluster();
  }

  /**
   * Makes a best effort to have both the primary and the secondary region open before each test,
   * and obtains a fresh {@link Table}.
   */
  @BeforeEach
  public void before() throws IOException {
    HTU.getConnection().clearRegionLocationCache();
    try {
      openRegion(hriPrimary);
    } catch (Exception e) {
      // Best effort only; the tests open what they need themselves.
      LOG.debug("Ignoring failure to open primary region {}", hriPrimary.getEncodedName(), e);
    }
    try {
      openRegion(hriSecondary);
    } catch (Exception e) {
      // Best effort only; the tests open what they need themselves.
      LOG.debug("Ignoring failure to open secondary region {}", hriSecondary.getEncodedName(), e);
    }
    table = HTU.getConnection().getTable(TABLE_NAME);
  }

  /**
   * Makes a best effort to close both regions after each test. Most tests already close the
   * secondary region themselves, so a failure here is not treated as an error.
   */
  @AfterEach
  public void after() throws IOException, KeeperException {
    try {
      closeRegion(hriSecondary);
    } catch (Exception e) {
      LOG.debug("Ignoring failure to close secondary region {}", hriSecondary.getEncodedName(), e);
    }
    try {
      closeRegion(hriPrimary);
    } catch (Exception e) {
      LOG.debug("Ignoring failure to close primary region {}", hriPrimary.getEncodedName(), e);
    }
    HTU.getConnection().clearRegionLocationCache();
  }

  private HRegionServer getRS() {
    return HTU.getMiniHBaseCluster().getRegionServer(0);
  }

  /**
   * Opens {@code hri} on the region server unless it is already available, and waits until the
   * region server has no region in transition any more.
   */
  private void openRegion(RegionInfo hri) throws Exception {
    try {
      if (isRegionOpened(hri)) {
        return;
      }
    } catch (Exception e) {
      // The lookup throws when the region is not online (see checkRegionIsClosed); in that case
      // fall through and open it.
    }
    // first version is '0'
    AdminProtos.OpenRegionRequest orr =
      RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, null);
    AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr);
    assertEquals(1, responseOpen.getOpeningStateCount());
    assertEquals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED,
      responseOpen.getOpeningState(0));
    checkRegionIsOpened(hri);
  }

  /**
   * Closes {@code hri} on the region server and waits until it is no longer available.
   */
  private void closeRegion(RegionInfo hri) throws Exception {
    AdminProtos.CloseRegionRequest crr =
      ProtobufUtil.buildCloseRegionRequest(getRS().getServerName(), hri.getRegionName());
    AdminProtos.CloseRegionResponse responseClose =
      getRS().getRSRpcServices().closeRegion(null, crr);
    assertTrue(responseClose.getClosed());

    checkRegionIsClosed(hri.getEncodedName());
  }

  /**
   * Blocks until the region server reports no region in transition. Note that {@code hri} itself
   * is not inspected.
   */
  private void checkRegionIsOpened(RegionInfo hri) throws Exception {
    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
      Thread.sleep(1);
    }
  }

  private boolean isRegionOpened(RegionInfo hri) throws Exception {
    return getRS().getRegionByEncodedName(hri.getEncodedName()).isAvailable();
  }

  /**
   * Blocks until the region server reports no region in transition, then verifies that the given
   * region is not available.
   */
  private void checkRegionIsClosed(String encodedRegionName) throws Exception {

    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
      Thread.sleep(1);
    }

    try {
      assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
    } catch (NotServingRegionException expected) {
      // That's how it works: if the region is closed we have an exception.
    }

    // We don't delete the znode here, because there is not always a znode.
  }

  private void flushRegion(RegionInfo regionInfo) throws IOException {
    TestRegionServerNoMaster.flushRegion(HTU, regionInfo);
  }

  /** A default-consistency get with an unblocked primary is not stale. */
  @Test
  public void testUseRegionWithoutReplica() throws Exception {
    byte[] b1 = Bytes.toBytes("testUseRegionWithoutReplica");
    openRegion(hriSecondary);
    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(0));
    try {
      Get g = new Get(b1);
      Result r = table.get(g);
      assertFalse(r.isStale());
    } finally {
      closeRegion(hriSecondary);
    }
  }

  /** Both replicas are returned by the locator, with and without reload. */
  @Test
  public void testLocations() throws Exception {
    byte[] b1 = Bytes.toBytes("testLocations");
    openRegion(hriSecondary);

    try (Connection conn = ConnectionFactory.createConnection(HTU.getConfiguration());
      RegionLocator locator = conn.getRegionLocator(TABLE_NAME)) {
      conn.clearRegionLocationCache();
      List<HRegionLocation> rl = locator.getRegionLocations(b1, true);
      assertEquals(2, rl.size());

      rl = locator.getRegionLocations(b1, false);
      assertEquals(2, rl.size());

      conn.clearRegionLocationCache();
      rl = locator.getRegionLocations(b1, false);
      assertEquals(2, rl.size());

      rl = locator.getRegionLocations(b1, true);
      assertEquals(2, rl.size());
    } finally {
      closeRegion(hriSecondary);
    }
  }

  @Test
  public void testGetNoResultNoStaleRegionWithReplica() throws Exception {
    byte[] b1 = Bytes.toBytes("testGetNoResultNoStaleRegionWithReplica");
    openRegion(hriSecondary);

    try {
      // A get works and is not stale
      Get g = new Get(b1);
      Result r = table.get(g);
      assertFalse(r.isStale());
    } finally {
      closeRegion(hriSecondary);
    }
  }

  /** With the primary blocked, a TIMELINE get is answered by the replica and marked stale. */
  @Test
  public void testGetNoResultStaleRegionWithReplica() throws Exception {
    byte[] b1 = Bytes.toBytes("testGetNoResultStaleRegionWithReplica");
    openRegion(hriSecondary);

    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
    try {
      Get g = new Get(b1);
      g.setConsistency(Consistency.TIMELINE);
      Result r = table.get(g);
      assertTrue(r.isStale());
    } finally {
      SlowMeCopro.getPrimaryCdl().get().countDown();
      closeRegion(hriSecondary);
    }
  }

  @Test
  public void testGetNoResultNotStaleSleepRegionWithReplica() throws Exception {
    byte[] b1 = Bytes.toBytes("testGetNoResultNotStaleSleepRegionWithReplica");
    openRegion(hriSecondary);

    try {
      // We sleep; but we won't go to the stale region as we don't get the stale by default.
      SlowMeCopro.sleepTime.set(2000);
      Get g = new Get(b1);
      Result r = table.get(g);
      assertFalse(r.isStale());

    } finally {
      SlowMeCopro.sleepTime.set(0);
      closeRegion(hriSecondary);
    }
  }

  /** Flushing both the primary and the secondary region, before and after a put, succeeds. */
  @Test
  public void testFlushTable() throws Exception {
    openRegion(hriSecondary);
    try {
      flushRegion(hriPrimary);
      flushRegion(hriSecondary);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      flushRegion(hriPrimary);
      flushRegion(hriSecondary);
    } finally {
      Delete d = new Delete(row);
      table.delete(d);
      closeRegion(hriSecondary);
    }
  }

  /** Flushing only the primary region, before and after a put, succeeds. */
  @Test
  public void testFlushPrimary() throws Exception {
    openRegion(hriSecondary);

    try {
      flushRegion(hriPrimary);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      flushRegion(hriPrimary);
    } finally {
      Delete d = new Delete(row);
      table.delete(d);
      closeRegion(hriSecondary);
    }
  }

  /** Flushing only the secondary region, before and after a put, is tolerated. */
  @Test
  public void testFlushSecondary() throws Exception {
    openRegion(hriSecondary);
    try {
      flushRegion(hriSecondary);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      flushRegion(hriSecondary);
    } catch (TableNotFoundException expected) {
      // Deliberately tolerated by this test.
    } finally {
      Delete d = new Delete(row);
      table.delete(d);
      closeRegion(hriSecondary);
    }
  }

  /**
   * Walks through gets and existence checks against primary and secondary, before and after the
   * written data has been flushed and picked up by the secondary.
   */
  @Test
  public void testUseRegionWithReplica() throws Exception {
    byte[] b1 = Bytes.toBytes("testUseRegionWithReplica");
    openRegion(hriSecondary);

    try {
      // A simple put works, even if there is a second replica
      Put p = new Put(b1);
      p.addColumn(f, b1, b1);
      table.put(p);
      LOG.info("Put done");

      // A get works and is not stale
      Get g = new Get(b1);
      Result r = table.get(g);
      assertFalse(r.isStale());
      assertFalse(r.getColumnCells(f, b1).isEmpty());
      LOG.info("get works and is not stale done");

      // Even if we have to wait a little on the main region
      SlowMeCopro.sleepTime.set(2000);
      g = new Get(b1);
      r = table.get(g);
      assertFalse(r.isStale());
      assertFalse(r.getColumnCells(f, b1).isEmpty());
      SlowMeCopro.sleepTime.set(0);
      LOG.info("sleep and is not stale done");

      // But if we ask for stale we will get it
      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      assertTrue(r.isStale());
      assertTrue(r.getColumnCells(f, b1).isEmpty());
      SlowMeCopro.getPrimaryCdl().get().countDown();

      LOG.info("stale done");

      // exists works and is not stale
      g = new Get(b1);
      g.setCheckExistenceOnly(true);
      r = table.get(g);
      assertFalse(r.isStale());
      assertTrue(r.getExists());
      LOG.info("exists not stale done");

      // exists works on stale but doesn't see the put
      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setCheckExistenceOnly(true);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      assertTrue(r.isStale());
      assertFalse(r.getExists(), "The secondary has stale data");
      SlowMeCopro.getPrimaryCdl().get().countDown();
      LOG.info("exists stale before flush done");

      flushRegion(hriPrimary);
      flushRegion(hriSecondary);
      LOG.info("flush done");
      Thread.sleep(1000 + REFRESH_PERIOD * 2);

      // a get served by the secondary is still flagged stale, but now sees the flushed put
      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      assertTrue(r.isStale());
      assertFalse(r.isEmpty());
      SlowMeCopro.getPrimaryCdl().get().countDown();
      LOG.info("stale done");

      // exists works on stale and we see the put after the flush
      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setCheckExistenceOnly(true);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      assertTrue(r.isStale());
      assertTrue(r.getExists());
      SlowMeCopro.getPrimaryCdl().get().countDown();
      LOG.info("exists stale after flush done");

    } finally {
      SlowMeCopro.getPrimaryCdl().get().countDown();
      SlowMeCopro.sleepTime.set(0);
      Delete d = new Delete(b1);
      table.delete(d);
      closeRegion(hriSecondary);
    }
  }

  /**
   * Verifies the hedged read counters of the client connection metrics: one case where the hedged
   * read is issued but the primary still answers first, and one where the replica wins.
   */
  @Test
  public void testHedgedRead() throws Exception {
    byte[] b1 = Bytes.toBytes("testHedgedRead");
    openRegion(hriSecondary);

    try {
      // A simple put works, even if there is a second replica
      Put p = new Put(b1);
      p.addColumn(f, b1, b1);
      table.put(p);
      LOG.info("Put done");

      // A get works and is not stale
      Get g = new Get(b1);
      Result r = table.get(g);
      assertFalse(r.isStale());
      assertFalse(r.getColumnCells(f, b1).isEmpty());
      LOG.info("get works and is not stale done");

      // reset
      AsyncConnectionImpl conn = (AsyncConnectionImpl) HTU.getConnection().toAsyncConnection();
      Counter hedgedReadOps = conn.getConnectionMetrics().get().getHedgedReadOps();
      Counter hedgedReadWin = conn.getConnectionMetrics().get().getHedgedReadWin();
      hedgedReadOps.dec(hedgedReadOps.getCount());
      hedgedReadWin.dec(hedgedReadWin.getCount());

      // Wait a little on the main region, just enough for a hedged read to happen once
      // without the hedged read returning faster
      long primaryCallTimeoutNs = conn.connConf.getPrimaryCallTimeoutNs();
      // The resolution of our timer is 10ms, so we need to sleep a bit more otherwise we may not
      // trigger the hedged read...
      SlowMeCopro.sleepTime.set(TimeUnit.NANOSECONDS.toMillis(primaryCallTimeoutNs) + 100);
      SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      assertFalse(r.isStale());
      assertFalse(r.getColumnCells(f, b1).isEmpty());
      assertEquals(1, hedgedReadOps.getCount());
      assertEquals(0, hedgedReadWin.getCount());
      SlowMeCopro.sleepTime.set(0);
      SlowMeCopro.getSecondaryCdl().get().countDown();
      LOG.info("hedged read occurred but not faster");

      // But if we ask for stale we will get it and hedged read returned faster
      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      assertTrue(r.isStale());
      assertTrue(r.getColumnCells(f, b1).isEmpty());
      assertEquals(2, hedgedReadOps.getCount());
      // we update the metrics after we finish the request so we use a waitFor here, use assert
      // directly may cause failure if we run too fast.
      HTU.waitFor(10000, () -> hedgedReadWin.getCount() == 1);
      SlowMeCopro.getPrimaryCdl().get().countDown();
      LOG.info("hedged read occurred and faster");

    } finally {
      SlowMeCopro.getPrimaryCdl().get().countDown();
      SlowMeCopro.getSecondaryCdl().get().countDown();
      SlowMeCopro.sleepTime.set(0);
      Delete d = new Delete(b1);
      table.delete(d);
      closeRegion(hriSecondary);
    }
  }

  /**
   * Scans a single row first from the primary and then, with the primary blocked, from the
   * secondary, checking the per-region scan metrics in both cases.
   */
  @Test
  public void testScanMetricsByRegion() throws Exception {
    byte[] b1 = Bytes.toBytes("testScanMetricsByRegion");
    openRegion(hriSecondary);

    try {
      Put p = new Put(b1);
      p.addColumn(f, b1, b1);
      table.put(p);
      LOG.info("Put done");
      flushRegion(hriPrimary);

      // Sleep for 2 * REFRESH_PERIOD so that flushed data is visible to secondary replica
      Thread.sleep(2 * REFRESH_PERIOD);

      // Explicitly read replica 0
      Scan scan = new Scan();
      scan.setEnableScanMetricsByRegion(true);
      scan.withStartRow(b1, true);
      scan.withStopRow(b1, true);
      // Assert row was read from primary replica along with asserting scan metrics by region
      assertScanMetrics(scan, hriPrimary, false);
      LOG.info("Scanned primary replica");

      // Read from region replica
      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
      scan = new Scan();
      scan.setEnableScanMetricsByRegion(true);
      scan.withStartRow(b1, true);
      scan.withStopRow(b1, true);
      scan.setConsistency(Consistency.TIMELINE);
      // Assert row was read from secondary replica along with asserting scan metrics by region
      assertScanMetrics(scan, hriSecondary, true);
      LOG.info("Scanned secondary replica ");
    } finally {
      SlowMeCopro.getPrimaryCdl().get().countDown();
      Delete d = new Delete(b1);
      table.delete(d);
      closeRegion(hriSecondary);
    }
  }

  /**
   * Runs {@code scan} and asserts that every returned result is non-empty with the expected
   * staleness, and that the per-region scan metrics contain exactly one entry, for
   * {@code regionInfo} on the test region server, with one region and one row scanned.
   * @param scan       the scan to execute; must have scan metrics by region enabled
   * @param regionInfo the region (replica) expected to have served the scan
   * @param isStale    expected value of {@link Result#isStale()} for every result
   */
  private void assertScanMetrics(Scan scan, RegionInfo regionInfo, boolean isStale)
    throws IOException {
    try (ResultScanner rs = table.getScanner(scan)) {
      for (Result r : rs) {
        assertEquals(isStale, r.isStale());
        assertFalse(r.isEmpty());
      }
      Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
        rs.getScanMetrics().collectMetricsByRegion(false);
      assertEquals(1, scanMetricsByRegion.size());
      for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
        .entrySet()) {
        ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
        Map<String, Long> metrics = entry.getValue();
        assertEquals(rsServerName, scanMetricsRegionInfo.getServerName());
        assertEquals(regionInfo.getEncodedName(), scanMetricsRegionInfo.getEncodedRegionName());
        assertEquals(1, (long) metrics.get(REGIONS_SCANNED_METRIC_NAME));
        assertEquals(1, (long) metrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
      }
    }
  }
}