001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
021import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
022
023import com.codahale.metrics.Counter;
024import java.io.IOException;
025import java.util.List;
026import java.util.Map;
027import java.util.Optional;
028import java.util.concurrent.CountDownLatch;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.AtomicBoolean;
031import java.util.concurrent.atomic.AtomicInteger;
032import java.util.concurrent.atomic.AtomicLong;
033import java.util.concurrent.atomic.AtomicReference;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.Cell;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseTestingUtil;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.HRegionLocation;
040import org.apache.hadoop.hbase.NotServingRegionException;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.StartTestingClusterOption;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.TableNotFoundException;
045import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
046import org.apache.hadoop.hbase.coprocessor.ObserverContext;
047import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
048import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
049import org.apache.hadoop.hbase.coprocessor.RegionObserver;
050import org.apache.hadoop.hbase.regionserver.HRegionServer;
051import org.apache.hadoop.hbase.regionserver.InternalScanner;
052import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
053import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
054import org.apache.hadoop.hbase.testclassification.ClientTests;
055import org.apache.hadoop.hbase.testclassification.LargeTests;
056import org.apache.hadoop.hbase.util.Bytes;
057import org.apache.zookeeper.KeeperException;
058import org.junit.After;
059import org.junit.AfterClass;
060import org.junit.Assert;
061import org.junit.Before;
062import org.junit.BeforeClass;
063import org.junit.ClassRule;
064import org.junit.Test;
065import org.junit.experimental.categories.Category;
066import org.slf4j.Logger;
067import org.slf4j.LoggerFactory;
068
069import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
070import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
071import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
072
073/**
074 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole cluster.
075 * See {@link org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster}.
076 */
077@Category({ LargeTests.class, ClientTests.class })
078@SuppressWarnings("deprecation")
079public class TestReplicasClient {
080
081  @ClassRule
082  public static final HBaseClassTestRule CLASS_RULE =
083    HBaseClassTestRule.forClass(TestReplicasClient.class);
084
085  private static final Logger LOG = LoggerFactory.getLogger(TestReplicasClient.class);
086
087  private static TableName TABLE_NAME;
088  private Table table = null;
089  private static final byte[] row = Bytes.toBytes(TestReplicasClient.class.getName());;
090
091  private static RegionInfo hriPrimary;
092  private static RegionInfo hriSecondary;
093
094  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
095  private static final byte[] f = HConstants.CATALOG_FAMILY;
096
097  private final static int REFRESH_PERIOD = 1000;
098  private static ServerName rsServerName;
099
100  /**
101   * This copro is used to synchronize the tests.
102   */
103  public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
104    static final AtomicLong sleepTime = new AtomicLong(0);
105    static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
106    static final AtomicInteger countOfNext = new AtomicInteger(0);
107    private static final AtomicReference<CountDownLatch> primaryCdl =
108      new AtomicReference<>(new CountDownLatch(0));
109    private static final AtomicReference<CountDownLatch> secondaryCdl =
110      new AtomicReference<>(new CountDownLatch(0));
111
112    public SlowMeCopro() {
113    }
114
115    @Override
116    public Optional<RegionObserver> getRegionObserver() {
117      return Optional.of(this);
118    }
119
120    @Override
121    public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
122      final Get get, final List<Cell> results) throws IOException {
123      slowdownCode(e);
124    }
125
126    @Override
127    public void preScannerOpen(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
128      final Scan scan) throws IOException {
129      slowdownCode(e);
130    }
131
132    @Override
133    public boolean preScannerNext(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
134      final InternalScanner s, final List<Result> results, final int limit, final boolean hasMore)
135      throws IOException {
136      // this will slow down a certain next operation if the conditions are met. The slowness
137      // will allow the call to go to a replica
138      if (slowDownNext.get()) {
139        // have some "next" return successfully from the primary; hence countOfNext checked
140        if (countOfNext.incrementAndGet() == 2) {
141          sleepTime.set(2000);
142          slowdownCode(e);
143        }
144      }
145      return true;
146    }
147
148    private void slowdownCode(final ObserverContext<? extends RegionCoprocessorEnvironment> e) {
149      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
150        LOG.info("We're the primary replicas.");
151        CountDownLatch latch = getPrimaryCdl().get();
152        try {
153          if (sleepTime.get() > 0) {
154            LOG.info("Sleeping for " + sleepTime.get() + " ms");
155            Thread.sleep(sleepTime.get());
156          } else if (latch.getCount() > 0) {
157            LOG.info("Waiting for the counterCountDownLatch");
158            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
159            if (latch.getCount() > 0) {
160              throw new RuntimeException("Can't wait more");
161            }
162          }
163        } catch (InterruptedException e1) {
164          LOG.error(e1.toString(), e1);
165        }
166      } else {
167        LOG.info("We're not the primary replicas.");
168        CountDownLatch latch = getSecondaryCdl().get();
169        try {
170          if (latch.getCount() > 0) {
171            LOG.info("Waiting for the secondary counterCountDownLatch");
172            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
173            if (latch.getCount() > 0) {
174              throw new RuntimeException("Can't wait more");
175            }
176          }
177        } catch (InterruptedException e1) {
178          LOG.error(e1.toString(), e1);
179        }
180      }
181    }
182
183    public static AtomicReference<CountDownLatch> getPrimaryCdl() {
184      return primaryCdl;
185    }
186
187    public static AtomicReference<CountDownLatch> getSecondaryCdl() {
188      return secondaryCdl;
189    }
190  }
191
192  @BeforeClass
193  public static void beforeClass() throws Exception {
194    // enable store file refreshing
195    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
196      REFRESH_PERIOD);
197    HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
198    HTU.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
199    StartTestingClusterOption option = StartTestingClusterOption.builder().numRegionServers(1)
200      .numAlwaysStandByMasters(1).numMasters(1).build();
201    HTU.startMiniCluster(option);
202
203    // Create table then get the single region for our new table.
204    TableDescriptorBuilder builder = HTU.createModifyableTableDescriptor(
205      TableName.valueOf(TestReplicasClient.class.getSimpleName()),
206      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
207      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
208    builder.setCoprocessor(SlowMeCopro.class.getName());
209    TableDescriptor hdt = builder.build();
210    HTU.createTable(hdt, new byte[][] { f }, null);
211    TABLE_NAME = hdt.getTableName();
212    try (RegionLocator locator = HTU.getConnection().getRegionLocator(hdt.getTableName())) {
213      hriPrimary = locator.getRegionLocation(row, false).getRegion();
214    }
215
216    // mock a secondary region info to open
217    hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);
218
219    // No master
220    LOG.info("Master is going to be stopped");
221    TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
222    Configuration c = new Configuration(HTU.getConfiguration());
223    c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
224    LOG.info("Master has stopped");
225
226    rsServerName = HTU.getHBaseCluster().getRegionServer(0).getServerName();
227    Assert.assertNotNull(rsServerName);
228  }
229
230  @AfterClass
231  public static void afterClass() throws Exception {
232    HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
233    HTU.shutdownMiniCluster();
234  }
235
236  @Before
237  public void before() throws IOException {
238    HTU.getConnection().clearRegionLocationCache();
239    try {
240      openRegion(hriPrimary);
241    } catch (Exception ignored) {
242    }
243    try {
244      openRegion(hriSecondary);
245    } catch (Exception ignored) {
246    }
247    table = HTU.getConnection().getTable(TABLE_NAME);
248  }
249
250  @After
251  public void after() throws IOException, KeeperException {
252    try {
253      closeRegion(hriSecondary);
254    } catch (Exception ignored) {
255    }
256    try {
257      closeRegion(hriPrimary);
258    } catch (Exception ignored) {
259    }
260    HTU.getConnection().clearRegionLocationCache();
261  }
262
263  private HRegionServer getRS() {
264    return HTU.getMiniHBaseCluster().getRegionServer(0);
265  }
266
267  private void openRegion(RegionInfo hri) throws Exception {
268    try {
269      if (isRegionOpened(hri)) return;
270    } catch (Exception e) {
271    }
272    // first version is '0'
273    AdminProtos.OpenRegionRequest orr =
274      RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, null);
275    AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr);
276    Assert.assertEquals(1, responseOpen.getOpeningStateCount());
277    Assert.assertEquals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED,
278      responseOpen.getOpeningState(0));
279    checkRegionIsOpened(hri);
280  }
281
282  private void closeRegion(RegionInfo hri) throws Exception {
283    AdminProtos.CloseRegionRequest crr =
284      ProtobufUtil.buildCloseRegionRequest(getRS().getServerName(), hri.getRegionName());
285    AdminProtos.CloseRegionResponse responseClose =
286      getRS().getRSRpcServices().closeRegion(null, crr);
287    Assert.assertTrue(responseClose.getClosed());
288
289    checkRegionIsClosed(hri.getEncodedName());
290  }
291
292  private void checkRegionIsOpened(RegionInfo hri) throws Exception {
293    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
294      Thread.sleep(1);
295    }
296  }
297
298  private boolean isRegionOpened(RegionInfo hri) throws Exception {
299    return getRS().getRegionByEncodedName(hri.getEncodedName()).isAvailable();
300  }
301
302  private void checkRegionIsClosed(String encodedRegionName) throws Exception {
303
304    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
305      Thread.sleep(1);
306    }
307
308    try {
309      Assert.assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
310    } catch (NotServingRegionException expected) {
311      // That's how it work: if the region is closed we have an exception.
312    }
313
314    // We don't delete the znode here, because there is not always a znode.
315  }
316
317  private void flushRegion(RegionInfo regionInfo) throws IOException {
318    TestRegionServerNoMaster.flushRegion(HTU, regionInfo);
319  }
320
321  @Test
322  public void testUseRegionWithoutReplica() throws Exception {
323    byte[] b1 = Bytes.toBytes("testUseRegionWithoutReplica");
324    openRegion(hriSecondary);
325    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(0));
326    try {
327      Get g = new Get(b1);
328      Result r = table.get(g);
329      Assert.assertFalse(r.isStale());
330    } finally {
331      closeRegion(hriSecondary);
332    }
333  }
334
335  @Test
336  public void testLocations() throws Exception {
337    byte[] b1 = Bytes.toBytes("testLocations");
338    openRegion(hriSecondary);
339
340    try (Connection conn = ConnectionFactory.createConnection(HTU.getConfiguration());
341      RegionLocator locator = conn.getRegionLocator(TABLE_NAME)) {
342      conn.clearRegionLocationCache();
343      List<HRegionLocation> rl = locator.getRegionLocations(b1, true);
344      Assert.assertEquals(2, rl.size());
345
346      rl = locator.getRegionLocations(b1, false);
347      Assert.assertEquals(2, rl.size());
348
349      conn.clearRegionLocationCache();
350      rl = locator.getRegionLocations(b1, false);
351      Assert.assertEquals(2, rl.size());
352
353      rl = locator.getRegionLocations(b1, true);
354      Assert.assertEquals(2, rl.size());
355    } finally {
356      closeRegion(hriSecondary);
357    }
358  }
359
360  @Test
361  public void testGetNoResultNoStaleRegionWithReplica() throws Exception {
362    byte[] b1 = Bytes.toBytes("testGetNoResultNoStaleRegionWithReplica");
363    openRegion(hriSecondary);
364
365    try {
366      // A get works and is not stale
367      Get g = new Get(b1);
368      Result r = table.get(g);
369      Assert.assertFalse(r.isStale());
370    } finally {
371      closeRegion(hriSecondary);
372    }
373  }
374
375  @Test
376  public void testGetNoResultStaleRegionWithReplica() throws Exception {
377    byte[] b1 = Bytes.toBytes("testGetNoResultStaleRegionWithReplica");
378    openRegion(hriSecondary);
379
380    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
381    try {
382      Get g = new Get(b1);
383      g.setConsistency(Consistency.TIMELINE);
384      Result r = table.get(g);
385      Assert.assertTrue(r.isStale());
386    } finally {
387      SlowMeCopro.getPrimaryCdl().get().countDown();
388      closeRegion(hriSecondary);
389    }
390  }
391
392  @Test
393  public void testGetNoResultNotStaleSleepRegionWithReplica() throws Exception {
394    byte[] b1 = Bytes.toBytes("testGetNoResultNotStaleSleepRegionWithReplica");
395    openRegion(hriSecondary);
396
397    try {
398      // We sleep; but we won't go to the stale region as we don't get the stale by default.
399      SlowMeCopro.sleepTime.set(2000);
400      Get g = new Get(b1);
401      Result r = table.get(g);
402      Assert.assertFalse(r.isStale());
403
404    } finally {
405      SlowMeCopro.sleepTime.set(0);
406      closeRegion(hriSecondary);
407    }
408  }
409
410  @Test
411  public void testFlushTable() throws Exception {
412    openRegion(hriSecondary);
413    try {
414      flushRegion(hriPrimary);
415      flushRegion(hriSecondary);
416
417      Put p = new Put(row);
418      p.addColumn(f, row, row);
419      table.put(p);
420
421      flushRegion(hriPrimary);
422      flushRegion(hriSecondary);
423    } finally {
424      Delete d = new Delete(row);
425      table.delete(d);
426      closeRegion(hriSecondary);
427    }
428  }
429
430  @Test
431  public void testFlushPrimary() throws Exception {
432    openRegion(hriSecondary);
433
434    try {
435      flushRegion(hriPrimary);
436
437      Put p = new Put(row);
438      p.addColumn(f, row, row);
439      table.put(p);
440
441      flushRegion(hriPrimary);
442    } finally {
443      Delete d = new Delete(row);
444      table.delete(d);
445      closeRegion(hriSecondary);
446    }
447  }
448
449  @Test
450  public void testFlushSecondary() throws Exception {
451    openRegion(hriSecondary);
452    try {
453      flushRegion(hriSecondary);
454
455      Put p = new Put(row);
456      p.addColumn(f, row, row);
457      table.put(p);
458
459      flushRegion(hriSecondary);
460    } catch (TableNotFoundException expected) {
461    } finally {
462      Delete d = new Delete(row);
463      table.delete(d);
464      closeRegion(hriSecondary);
465    }
466  }
467
468  @Test
469  public void testUseRegionWithReplica() throws Exception {
470    byte[] b1 = Bytes.toBytes("testUseRegionWithReplica");
471    openRegion(hriSecondary);
472
473    try {
474      // A simple put works, even if there here a second replica
475      Put p = new Put(b1);
476      p.addColumn(f, b1, b1);
477      table.put(p);
478      LOG.info("Put done");
479
480      // A get works and is not stale
481      Get g = new Get(b1);
482      Result r = table.get(g);
483      Assert.assertFalse(r.isStale());
484      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
485      LOG.info("get works and is not stale done");
486
487      // Even if it we have to wait a little on the main region
488      SlowMeCopro.sleepTime.set(2000);
489      g = new Get(b1);
490      r = table.get(g);
491      Assert.assertFalse(r.isStale());
492      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
493      SlowMeCopro.sleepTime.set(0);
494      LOG.info("sleep and is not stale done");
495
496      // But if we ask for stale we will get it
497      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
498      g = new Get(b1);
499      g.setConsistency(Consistency.TIMELINE);
500      r = table.get(g);
501      Assert.assertTrue(r.isStale());
502      Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
503      SlowMeCopro.getPrimaryCdl().get().countDown();
504
505      LOG.info("stale done");
506
507      // exists works and is not stale
508      g = new Get(b1);
509      g.setCheckExistenceOnly(true);
510      r = table.get(g);
511      Assert.assertFalse(r.isStale());
512      Assert.assertTrue(r.getExists());
513      LOG.info("exists not stale done");
514
515      // exists works on stale but don't see the put
516      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
517      g = new Get(b1);
518      g.setCheckExistenceOnly(true);
519      g.setConsistency(Consistency.TIMELINE);
520      r = table.get(g);
521      Assert.assertTrue(r.isStale());
522      Assert.assertFalse("The secondary has stale data", r.getExists());
523      SlowMeCopro.getPrimaryCdl().get().countDown();
524      LOG.info("exists stale before flush done");
525
526      flushRegion(hriPrimary);
527      flushRegion(hriSecondary);
528      LOG.info("flush done");
529      Thread.sleep(1000 + REFRESH_PERIOD * 2);
530
531      // get works and is not stale
532      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
533      g = new Get(b1);
534      g.setConsistency(Consistency.TIMELINE);
535      r = table.get(g);
536      Assert.assertTrue(r.isStale());
537      Assert.assertFalse(r.isEmpty());
538      SlowMeCopro.getPrimaryCdl().get().countDown();
539      LOG.info("stale done");
540
541      // exists works on stale and we see the put after the flush
542      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
543      g = new Get(b1);
544      g.setCheckExistenceOnly(true);
545      g.setConsistency(Consistency.TIMELINE);
546      r = table.get(g);
547      Assert.assertTrue(r.isStale());
548      Assert.assertTrue(r.getExists());
549      SlowMeCopro.getPrimaryCdl().get().countDown();
550      LOG.info("exists stale after flush done");
551
552    } finally {
553      SlowMeCopro.getPrimaryCdl().get().countDown();
554      SlowMeCopro.sleepTime.set(0);
555      Delete d = new Delete(b1);
556      table.delete(d);
557      closeRegion(hriSecondary);
558    }
559  }
560
561  @Test
562  public void testHedgedRead() throws Exception {
563    byte[] b1 = Bytes.toBytes("testHedgedRead");
564    openRegion(hriSecondary);
565
566    try {
567      // A simple put works, even if there here a second replica
568      Put p = new Put(b1);
569      p.addColumn(f, b1, b1);
570      table.put(p);
571      LOG.info("Put done");
572
573      // A get works and is not stale
574      Get g = new Get(b1);
575      Result r = table.get(g);
576      Assert.assertFalse(r.isStale());
577      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
578      LOG.info("get works and is not stale done");
579
580      // reset
581      AsyncConnectionImpl conn = (AsyncConnectionImpl) HTU.getConnection().toAsyncConnection();
582      Counter hedgedReadOps = conn.getConnectionMetrics().get().getHedgedReadOps();
583      Counter hedgedReadWin = conn.getConnectionMetrics().get().getHedgedReadWin();
584      hedgedReadOps.dec(hedgedReadOps.getCount());
585      hedgedReadWin.dec(hedgedReadWin.getCount());
586
587      // Wait a little on the main region, just enough to happen once hedged read
588      // and hedged read did not returned faster
589      long primaryCallTimeoutNs = conn.connConf.getPrimaryCallTimeoutNs();
590      // The resolution of our timer is 10ms, so we need to sleep a bit more otherwise we may not
591      // trigger the hedged read...
592      SlowMeCopro.sleepTime.set(TimeUnit.NANOSECONDS.toMillis(primaryCallTimeoutNs) + 100);
593      SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(1));
594      g = new Get(b1);
595      g.setConsistency(Consistency.TIMELINE);
596      r = table.get(g);
597      Assert.assertFalse(r.isStale());
598      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
599      Assert.assertEquals(1, hedgedReadOps.getCount());
600      Assert.assertEquals(0, hedgedReadWin.getCount());
601      SlowMeCopro.sleepTime.set(0);
602      SlowMeCopro.getSecondaryCdl().get().countDown();
603      LOG.info("hedged read occurred but not faster");
604
605      // But if we ask for stale we will get it and hedged read returned faster
606      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
607      g = new Get(b1);
608      g.setConsistency(Consistency.TIMELINE);
609      r = table.get(g);
610      Assert.assertTrue(r.isStale());
611      Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
612      Assert.assertEquals(2, hedgedReadOps.getCount());
613      // we update the metrics after we finish the request so we use a waitFor here, use assert
614      // directly may cause failure if we run too fast.
615      HTU.waitFor(10000, () -> hedgedReadWin.getCount() == 1);
616      SlowMeCopro.getPrimaryCdl().get().countDown();
617      LOG.info("hedged read occurred and faster");
618
619    } finally {
620      SlowMeCopro.getPrimaryCdl().get().countDown();
621      SlowMeCopro.getSecondaryCdl().get().countDown();
622      SlowMeCopro.sleepTime.set(0);
623      Delete d = new Delete(b1);
624      table.delete(d);
625      closeRegion(hriSecondary);
626    }
627  }
628
629  @Test
630  public void testScanMetricsByRegion() throws Exception {
631    byte[] b1 = Bytes.toBytes("testScanMetricsByRegion");
632    openRegion(hriSecondary);
633
634    try {
635      Put p = new Put(b1);
636      p.addColumn(f, b1, b1);
637      table.put(p);
638      LOG.info("Put done");
639      flushRegion(hriPrimary);
640
641      // Sleep for 2 * REFRESH_PERIOD so that flushed data is visible to secondary replica
642      Thread.sleep(2 * REFRESH_PERIOD);
643
644      // Explicitly read replica 0
645      Scan scan = new Scan();
646      scan.setEnableScanMetricsByRegion(true);
647      scan.withStartRow(b1, true);
648      scan.withStopRow(b1, true);
649      // Assert row was read from primary replica along with asserting scan metrics by region
650      assertScanMetrics(scan, hriPrimary, false);
651      LOG.info("Scanned primary replica");
652
653      // Read from region replica
654      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
655      scan = new Scan();
656      scan.setEnableScanMetricsByRegion(true);
657      scan.withStartRow(b1, true);
658      scan.withStopRow(b1, true);
659      scan.setConsistency(Consistency.TIMELINE);
660      // Assert row was read from secondary replica along with asserting scan metrics by region
661      assertScanMetrics(scan, hriSecondary, true);
662      LOG.info("Scanned secondary replica ");
663    } finally {
664      SlowMeCopro.getPrimaryCdl().get().countDown();
665      Delete d = new Delete(b1);
666      table.delete(d);
667      closeRegion(hriSecondary);
668    }
669  }
670
671  private void assertScanMetrics(Scan scan, RegionInfo regionInfo, boolean isStale)
672    throws IOException {
673    try (ResultScanner rs = table.getScanner(scan);) {
674      for (Result r : rs) {
675        Assert.assertEquals(isStale, r.isStale());
676        Assert.assertFalse(r.isEmpty());
677      }
678      Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
679        rs.getScanMetrics().collectMetricsByRegion(false);
680      Assert.assertEquals(1, scanMetricsByRegion.size());
681      for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
682        .entrySet()) {
683        ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
684        Map<String, Long> metrics = entry.getValue();
685        Assert.assertEquals(rsServerName, scanMetricsRegionInfo.getServerName());
686        Assert.assertEquals(regionInfo.getEncodedName(),
687          scanMetricsRegionInfo.getEncodedRegionName());
688        Assert.assertEquals(1, (long) metrics.get(REGIONS_SCANNED_METRIC_NAME));
689        Assert.assertEquals(1, (long) metrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
690      }
691    }
692  }
693}