/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;

import com.codahale.metrics.Counter;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;

/**
 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole cluster.
 * See {@link org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster}.
 */
@Category({ LargeTests.class, ClientTests.class })
@SuppressWarnings("deprecation")
public class TestReplicasClient {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicasClient.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicasClient.class);

  /** Name of the table created in {@link #beforeClass()}. */
  private static TableName TABLE_NAME;
  /** Per-test table handle; opened in {@link #before()} and closed in {@link #after()}. */
  private Table table = null;
  /** Row key used to locate the primary region and by the flush tests. */
  private static final byte[] row = Bytes.toBytes(TestReplicasClient.class.getName());

  private static RegionInfo hriPrimary;
  private static RegionInfo hriSecondary;

  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
  private static final byte[] f = HConstants.CATALOG_FAMILY;

  /** Store file refresh period handed to the region server configuration. */
  private final static int REFRESH_PERIOD = 1000;
  private static ServerName rsServerName;

  /**
   * This copro is used to synchronize the tests.
   * <p>
   * Reads on the primary replica (replica id 0) either sleep for {@link #sleepTime} milliseconds
   * or wait on the primary latch; reads on any other replica wait on the secondary latch. Tests
   * arm these knobs before issuing a read and release them afterwards.
   */
  public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
    /** Milliseconds the primary replica sleeps before serving a read; 0 disables sleeping. */
    static final AtomicLong sleepTime = new AtomicLong(0);
    /** When set, {@link #preScannerNext} slows down the second "next" call it observes. */
    static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
    /** Number of "next" calls seen while {@link #slowDownNext} was set. */
    static final AtomicInteger countOfNext = new AtomicInteger(0);
    private static final AtomicReference<CountDownLatch> primaryCdl =
      new AtomicReference<>(new CountDownLatch(0));
    private static final AtomicReference<CountDownLatch> secondaryCdl =
      new AtomicReference<>(new CountDownLatch(0));

    public SlowMeCopro() {
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
      final Get get, final List<Cell> results) throws IOException {
      slowdownCode(e);
    }

    @Override
    public void preScannerOpen(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
      final Scan scan) throws IOException {
      slowdownCode(e);
    }

    @Override
    public boolean preScannerNext(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
      final InternalScanner s, final List<Result> results, final int limit, final boolean hasMore)
      throws IOException {
      // this will slow down a certain next operation if the conditions are met. The slowness
      // will allow the call to go to a replica
      if (slowDownNext.get()) {
        // have some "next" return successfully from the primary; hence countOfNext checked
        if (countOfNext.incrementAndGet() == 2) {
          sleepTime.set(2000);
          slowdownCode(e);
        }
      }
      return true;
    }

    /**
     * Delays the current call according to the replica it runs on: the primary sleeps for
     * {@link #sleepTime} ms if that is positive, otherwise waits on the primary latch; any other
     * replica waits on the secondary latch.
     * @throws RuntimeException if a latch is still not released after the wait times out
     */
    private void slowdownCode(final ObserverContext<? extends RegionCoprocessorEnvironment> e) {
      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
        LOG.info("We're the primary replicas.");
        CountDownLatch latch = getPrimaryCdl().get();
        try {
          // Read once so the check, the log line and the sleep all use the same value.
          long sleepMs = sleepTime.get();
          if (sleepMs > 0) {
            LOG.info("Sleeping for {} ms", sleepMs);
            Thread.sleep(sleepMs);
          } else if (latch.getCount() > 0) {
            LOG.info("Waiting for the counterCountDownLatch");
            awaitOrFail(latch);
          }
        } catch (InterruptedException e1) {
          LOG.error(e1.toString(), e1);
          // Keep the interrupt visible to the code that runs after this hook.
          Thread.currentThread().interrupt();
        }
      } else {
        LOG.info("We're not the primary replicas.");
        CountDownLatch latch = getSecondaryCdl().get();
        try {
          if (latch.getCount() > 0) {
            LOG.info("Waiting for the secondary counterCountDownLatch");
            awaitOrFail(latch);
          }
        } catch (InterruptedException e1) {
          LOG.error(e1.toString(), e1);
          // Keep the interrupt visible to the code that runs after this hook.
          Thread.currentThread().interrupt();
        }
      }
    }

    /**
     * Waits up to two minutes for {@code latch} and fails if it has not been released by then.
     */
    private static void awaitOrFail(CountDownLatch latch) throws InterruptedException {
      latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
      if (latch.getCount() > 0) {
        throw new RuntimeException("Can't wait more");
      }
    }

    public static AtomicReference<CountDownLatch> getPrimaryCdl() {
      return primaryCdl;
    }

    public static AtomicReference<CountDownLatch> getSecondaryCdl() {
      return secondaryCdl;
    }
  }

  /**
   * Starts a mini cluster with a single region server, creates the test table with
   * {@link SlowMeCopro} attached, derives the secondary replica's region info and then stops the
   * master.
   */
  @BeforeClass
  public static void beforeClass() throws Exception {
    // enable store file refreshing
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
      REFRESH_PERIOD);
    HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
    HTU.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
    StartTestingClusterOption option = StartTestingClusterOption.builder().numRegionServers(1)
      .numAlwaysStandByMasters(1).numMasters(1).build();
    HTU.startMiniCluster(option);

    // Create table then get the single region for our new table.
    TableDescriptorBuilder builder = HTU.createModifyableTableDescriptor(
      TableName.valueOf(TestReplicasClient.class.getSimpleName()),
      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
    builder.setCoprocessor(SlowMeCopro.class.getName());
    TableDescriptor hdt = builder.build();
    HTU.createTable(hdt, new byte[][] { f }, null);
    TABLE_NAME = hdt.getTableName();
    try (RegionLocator locator = HTU.getConnection().getRegionLocator(hdt.getTableName())) {
      hriPrimary = locator.getRegionLocation(row, false).getRegion();
    }

    // mock a secondary region info to open
    hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);

    // No master
    LOG.info("Master is going to be stopped");
    TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
    // NOTE(review): this Configuration copy is never handed to a connection in this file, so the
    // retry setting below has no visible effect here - confirm whether it can be dropped.
    Configuration c = new Configuration(HTU.getConfiguration());
    c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
    LOG.info("Master has stopped");

    rsServerName = HTU.getHBaseCluster().getRegionServer(0).getServerName();
    Assert.assertNotNull(rsServerName);
  }

  @AfterClass
  public static void afterClass() throws Exception {
    HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
    HTU.shutdownMiniCluster();
  }

  /**
   * Clears the location cache, makes a best-effort attempt to open both replicas and obtains a
   * fresh {@link Table} for the test.
   */
  @Before
  public void before() throws IOException {
    HTU.getConnection().clearRegionLocationCache();
    try {
      openRegion(hriPrimary);
    } catch (Exception ignored) {
      // Best effort: the test itself will fail if the primary is really unavailable.
    }
    try {
      openRegion(hriSecondary);
    } catch (Exception ignored) {
      // Best effort: every test opens the secondary again before using it.
    }
    table = HTU.getConnection().getTable(TABLE_NAME);
  }

  /**
   * Makes a best-effort attempt to close both replicas, clears the location cache and releases
   * the {@link Table} obtained in {@link #before()}.
   */
  @After
  public void after() throws IOException, KeeperException {
    try {
      try {
        closeRegion(hriSecondary);
      } catch (Exception ignored) {
        // Best effort: most tests already closed the secondary in their own finally block.
      }
      try {
        closeRegion(hriPrimary);
      } catch (Exception ignored) {
        // Best effort: before() reopens the region for the next test.
      }
      HTU.getConnection().clearRegionLocationCache();
    } finally {
      // table is null when before() failed ahead of the assignment.
      if (table != null) {
        table.close();
        table = null;
      }
    }
  }

  private HRegionServer getRS() {
    return HTU.getMiniHBaseCluster().getRegionServer(0);
  }

  /**
   * Opens {@code hri} on the single region server unless it is already available, asserting that
   * the open request is answered with a single OPENED state.
   */
  private void openRegion(RegionInfo hri) throws Exception {
    try {
      if (isRegionOpened(hri)) {
        return;
      }
    } catch (Exception ignored) {
      // The lookup throws when the region is not served; fall through and open it.
    }
    // first version is '0'
    AdminProtos.OpenRegionRequest orr =
      RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, null);
    AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr);
    Assert.assertEquals(1, responseOpen.getOpeningStateCount());
    Assert.assertEquals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED,
      responseOpen.getOpeningState(0));
    checkRegionIsOpened(hri);
  }

  /**
   * Closes {@code hri} on the single region server and asserts that the close was acknowledged
   * and the region is no longer available.
   */
  private void closeRegion(RegionInfo hri) throws Exception {
    AdminProtos.CloseRegionRequest crr =
      ProtobufUtil.buildCloseRegionRequest(getRS().getServerName(), hri.getRegionName());
    AdminProtos.CloseRegionResponse responseClose =
      getRS().getRSRpcServices().closeRegion(null, crr);
    Assert.assertTrue(responseClose.getClosed());

    checkRegionIsClosed(hri.getEncodedName());
  }

  /**
   * Blocks until the region server reports no regions in transition. Note that {@code hri} itself
   * is not inspected here.
   */
  private void checkRegionIsOpened(RegionInfo hri) throws Exception {
    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
      Thread.sleep(1);
    }
  }

  private boolean isRegionOpened(RegionInfo hri) throws Exception {
    return getRS().getRegionByEncodedName(hri.getEncodedName()).isAvailable();
  }

  /**
   * Blocks until the region server reports no regions in transition, then asserts that the named
   * region is not available.
   */
  private void checkRegionIsClosed(String encodedRegionName) throws Exception {

    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
      Thread.sleep(1);
    }

    try {
      Assert.assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
    } catch (NotServingRegionException expected) {
      // That's how it works: if the region is closed we have an exception.
    }

    // We don't delete the znode here, because there is not always a znode.
  }

  private void flushRegion(RegionInfo regionInfo) throws IOException {
    TestRegionServerNoMaster.flushRegion(HTU, regionInfo);
  }

  /** A default get with the primary latch released must not be flagged stale. */
  @Test
  public void testUseRegionWithoutReplica() throws Exception {
    byte[] b1 = Bytes.toBytes("testUseRegionWithoutReplica");
    openRegion(hriSecondary);
    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(0));
    try {
      Get g = new Get(b1);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());
    } finally {
      closeRegion(hriSecondary);
    }
  }

  /** Two locations must be reported for the row, with and without reloading the cache. */
  @Test
  public void testLocations() throws Exception {
    byte[] b1 = Bytes.toBytes("testLocations");
    openRegion(hriSecondary);

    try (Connection conn = ConnectionFactory.createConnection(HTU.getConfiguration());
      RegionLocator locator = conn.getRegionLocator(TABLE_NAME)) {
      conn.clearRegionLocationCache();
      List<HRegionLocation> rl = locator.getRegionLocations(b1, true);
      Assert.assertEquals(2, rl.size());

      rl = locator.getRegionLocations(b1, false);
      Assert.assertEquals(2, rl.size());

      conn.clearRegionLocationCache();
      rl = locator.getRegionLocations(b1, false);
      Assert.assertEquals(2, rl.size());

      rl = locator.getRegionLocations(b1, true);
      Assert.assertEquals(2, rl.size());
    } finally {
      closeRegion(hriSecondary);
    }
  }

  @Test
  public void testGetNoResultNoStaleRegionWithReplica() throws Exception {
    byte[] b1 = Bytes.toBytes("testGetNoResultNoStaleRegionWithReplica");
    openRegion(hriSecondary);

    try {
      // A get works and is not stale
      Get g = new Get(b1);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());
    } finally {
      closeRegion(hriSecondary);
    }
  }

  /** With the primary held on its latch, a TIMELINE get must come back flagged stale. */
  @Test
  public void testGetNoResultStaleRegionWithReplica() throws Exception {
    byte[] b1 = Bytes.toBytes("testGetNoResultStaleRegionWithReplica");
    openRegion(hriSecondary);

    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
    try {
      Get g = new Get(b1);
      g.setConsistency(Consistency.TIMELINE);
      Result r = table.get(g);
      Assert.assertTrue(r.isStale());
    } finally {
      SlowMeCopro.getPrimaryCdl().get().countDown();
      closeRegion(hriSecondary);
    }
  }

  @Test
  public void testGetNoResultNotStaleSleepRegionWithReplica() throws Exception {
    byte[] b1 = Bytes.toBytes("testGetNoResultNotStaleSleepRegionWithReplica");
    openRegion(hriSecondary);

    try {
      // We sleep; but we won't go to the stale region as we don't get the stale by default.
      SlowMeCopro.sleepTime.set(2000);
      Get g = new Get(b1);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());

    } finally {
      SlowMeCopro.sleepTime.set(0);
      closeRegion(hriSecondary);
    }
  }

  /** Flushes both replicas before and after a put. */
  @Test
  public void testFlushTable() throws Exception {
    openRegion(hriSecondary);
    try {
      flushRegion(hriPrimary);
      flushRegion(hriSecondary);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      flushRegion(hriPrimary);
      flushRegion(hriSecondary);
    } finally {
      Delete d = new Delete(row);
      table.delete(d);
      closeRegion(hriSecondary);
    }
  }

  /** Flushes only the primary replica before and after a put. */
  @Test
  public void testFlushPrimary() throws Exception {
    openRegion(hriSecondary);

    try {
      flushRegion(hriPrimary);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      flushRegion(hriPrimary);
    } finally {
      Delete d = new Delete(row);
      table.delete(d);
      closeRegion(hriSecondary);
    }
  }

  /** Flushes only the secondary replica before and after a put. */
  @Test
  public void testFlushSecondary() throws Exception {
    openRegion(hriSecondary);
    try {
      flushRegion(hriSecondary);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      flushRegion(hriSecondary);
    } catch (TableNotFoundException expected) {
      // Tolerated by this test.
    } finally {
      Delete d = new Delete(row);
      table.delete(d);
      closeRegion(hriSecondary);
    }
  }

  /**
   * Walks through puts, default gets, TIMELINE gets and existence checks against the replica
   * pair, before and after flushing.
   */
  @Test
  public void testUseRegionWithReplica() throws Exception {
    byte[] b1 = Bytes.toBytes("testUseRegionWithReplica");
    openRegion(hriSecondary);

    try {
      // A simple put works, even if there is a second replica
      Put p = new Put(b1);
      p.addColumn(f, b1, b1);
      table.put(p);
      LOG.info("Put done");

      // A get works and is not stale
      Get g = new Get(b1);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());
      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
      LOG.info("get works and is not stale done");

      // Even if we have to wait a little on the main region
      SlowMeCopro.sleepTime.set(2000);
      g = new Get(b1);
      r = table.get(g);
      Assert.assertFalse(r.isStale());
      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
      SlowMeCopro.sleepTime.set(0);
      LOG.info("sleep and is not stale done");

      // But if we ask for stale we will get it
      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
      Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
      SlowMeCopro.getPrimaryCdl().get().countDown();

      LOG.info("stale done");

      // exists works and is not stale
      g = new Get(b1);
      g.setCheckExistenceOnly(true);
      r = table.get(g);
      Assert.assertFalse(r.isStale());
      Assert.assertTrue(r.getExists());
      LOG.info("exists not stale done");

      // exists works on stale but doesn't see the put
      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setCheckExistenceOnly(true);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
      Assert.assertFalse("The secondary has stale data", r.getExists());
      SlowMeCopro.getPrimaryCdl().get().countDown();
      LOG.info("exists stale before flush done");

      flushRegion(hriPrimary);
      flushRegion(hriSecondary);
      LOG.info("flush done");
      Thread.sleep(1000 + REFRESH_PERIOD * 2);

      // a stale get now works and returns the data
      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
      Assert.assertFalse(r.isEmpty());
      SlowMeCopro.getPrimaryCdl().get().countDown();
      LOG.info("stale done");

      // exists works on stale and we see the put after the flush
      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setCheckExistenceOnly(true);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
      Assert.assertTrue(r.getExists());
      SlowMeCopro.getPrimaryCdl().get().countDown();
      LOG.info("exists stale after flush done");

    } finally {
      SlowMeCopro.getPrimaryCdl().get().countDown();
      SlowMeCopro.sleepTime.set(0);
      Delete d = new Delete(b1);
      table.delete(d);
      closeRegion(hriSecondary);
    }
  }

  /**
   * Checks the hedged read counters of the connection metrics: first with a slow primary and a
   * blocked secondary, then with a blocked primary.
   */
  @Test
  public void testHedgedRead() throws Exception {
    byte[] b1 = Bytes.toBytes("testHedgedRead");
    openRegion(hriSecondary);

    try {
      // A simple put works, even if there is a second replica
      Put p = new Put(b1);
      p.addColumn(f, b1, b1);
      table.put(p);
      LOG.info("Put done");

      // A get works and is not stale
      Get g = new Get(b1);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());
      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
      LOG.info("get works and is not stale done");

      // reset
      AsyncConnectionImpl conn = (AsyncConnectionImpl) HTU.getConnection().toAsyncConnection();
      Counter hedgedReadOps = conn.getConnectionMetrics().get().getHedgedReadOps();
      Counter hedgedReadWin = conn.getConnectionMetrics().get().getHedgedReadWin();
      hedgedReadOps.dec(hedgedReadOps.getCount());
      hedgedReadWin.dec(hedgedReadWin.getCount());

      // Wait a little on the main region, just enough for one hedged read to happen without
      // the hedged read returning faster
      long primaryCallTimeoutNs = conn.connConf.getPrimaryCallTimeoutNs();
      // The resolution of our timer is 10ms, so we need to sleep a bit more otherwise we may not
      // trigger the hedged read...
      SlowMeCopro.sleepTime.set(TimeUnit.NANOSECONDS.toMillis(primaryCallTimeoutNs) + 100);
      SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertFalse(r.isStale());
      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
      Assert.assertEquals(1, hedgedReadOps.getCount());
      Assert.assertEquals(0, hedgedReadWin.getCount());
      SlowMeCopro.sleepTime.set(0);
      SlowMeCopro.getSecondaryCdl().get().countDown();
      LOG.info("hedged read occurred but not faster");

      // But if we ask for stale we will get it and hedged read returned faster
      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
      Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
      Assert.assertEquals(2, hedgedReadOps.getCount());
      // we update the metrics after we finish the request so we use a waitFor here, use assert
      // directly may cause failure if we run too fast.
      HTU.waitFor(10000, () -> hedgedReadWin.getCount() == 1);
      SlowMeCopro.getPrimaryCdl().get().countDown();
      LOG.info("hedged read occurred and faster");

    } finally {
      SlowMeCopro.getPrimaryCdl().get().countDown();
      SlowMeCopro.getSecondaryCdl().get().countDown();
      SlowMeCopro.sleepTime.set(0);
      Delete d = new Delete(b1);
      table.delete(d);
      closeRegion(hriSecondary);
    }
  }

  /**
   * Scans a single row with per-region scan metrics enabled, once with default consistency and
   * once with TIMELINE consistency while the primary is held on its latch.
   */
  @Test
  public void testScanMetricsByRegion() throws Exception {
    byte[] b1 = Bytes.toBytes("testScanMetricsByRegion");
    openRegion(hriSecondary);

    try {
      Put p = new Put(b1);
      p.addColumn(f, b1, b1);
      table.put(p);
      LOG.info("Put done");
      flushRegion(hriPrimary);

      // Sleep for 2 * REFRESH_PERIOD so that flushed data is visible to secondary replica
      Thread.sleep(2 * REFRESH_PERIOD);

      // Explicitly read replica 0
      Scan scan = new Scan();
      scan.setEnableScanMetricsByRegion(true);
      scan.withStartRow(b1, true);
      scan.withStopRow(b1, true);
      // Assert row was read from primary replica along with asserting scan metrics by region
      assertScanMetrics(scan, hriPrimary, false);
      LOG.info("Scanned primary replica");

      // Read from region replica
      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
      scan = new Scan();
      scan.setEnableScanMetricsByRegion(true);
      scan.withStartRow(b1, true);
      scan.withStopRow(b1, true);
      scan.setConsistency(Consistency.TIMELINE);
      // Assert row was read from secondary replica along with asserting scan metrics by region
      assertScanMetrics(scan, hriSecondary, true);
      LOG.info("Scanned secondary replica ");
    } finally {
      SlowMeCopro.getPrimaryCdl().get().countDown();
      Delete d = new Delete(b1);
      table.delete(d);
      closeRegion(hriSecondary);
    }
  }

  /**
   * Runs {@code scan} and asserts that every result is non-empty with the expected staleness, and
   * that the per-region scan metrics hold exactly one entry, for {@code regionInfo} on the test
   * region server, with one region and one row scanned.
   * @param scan       the scan to execute against the test table
   * @param regionInfo the region the metrics entry is expected to describe
   * @param isStale    the staleness flag every returned result must carry
   */
  private void assertScanMetrics(Scan scan, RegionInfo regionInfo, boolean isStale)
    throws IOException {
    try (ResultScanner rs = table.getScanner(scan)) {
      for (Result r : rs) {
        Assert.assertEquals(isStale, r.isStale());
        Assert.assertFalse(r.isEmpty());
      }
      Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
        rs.getScanMetrics().collectMetricsByRegion(false);
      Assert.assertEquals(1, scanMetricsByRegion.size());
      for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
        .entrySet()) {
        ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
        Map<String, Long> metrics = entry.getValue();
        Assert.assertEquals(rsServerName, scanMetricsRegionInfo.getServerName());
        Assert.assertEquals(regionInfo.getEncodedName(),
          scanMetricsRegionInfo.getEncodedRegionName());
        Assert.assertEquals(1, (long) metrics.get(REGIONS_SCANNED_METRIC_NAME));
        Assert.assertEquals(1, (long) metrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
      }
    }
  }
}