001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
021import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
022import static org.junit.jupiter.api.Assertions.assertEquals;
023import static org.junit.jupiter.api.Assertions.assertFalse;
024import static org.junit.jupiter.api.Assertions.assertNotNull;
025import static org.junit.jupiter.api.Assertions.assertTrue;
026
027import com.codahale.metrics.Counter;
028import java.io.IOException;
029import java.util.List;
030import java.util.Map;
031import java.util.Optional;
032import java.util.concurrent.CountDownLatch;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicBoolean;
035import java.util.concurrent.atomic.AtomicInteger;
036import java.util.concurrent.atomic.AtomicLong;
037import java.util.concurrent.atomic.AtomicReference;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.hbase.Cell;
040import org.apache.hadoop.hbase.HBaseTestingUtil;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.HRegionLocation;
043import org.apache.hadoop.hbase.NotServingRegionException;
044import org.apache.hadoop.hbase.ServerName;
045import org.apache.hadoop.hbase.StartTestingClusterOption;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.TableNotFoundException;
048import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
049import org.apache.hadoop.hbase.coprocessor.ObserverContext;
050import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
051import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
052import org.apache.hadoop.hbase.coprocessor.RegionObserver;
053import org.apache.hadoop.hbase.regionserver.HRegionServer;
054import org.apache.hadoop.hbase.regionserver.InternalScanner;
055import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
056import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
057import org.apache.hadoop.hbase.testclassification.ClientTests;
058import org.apache.hadoop.hbase.testclassification.LargeTests;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.apache.zookeeper.KeeperException;
061import org.junit.jupiter.api.AfterAll;
062import org.junit.jupiter.api.AfterEach;
063import org.junit.jupiter.api.BeforeAll;
064import org.junit.jupiter.api.BeforeEach;
065import org.junit.jupiter.api.Tag;
066import org.junit.jupiter.api.Test;
067import org.slf4j.Logger;
068import org.slf4j.LoggerFactory;
069
070import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
071import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
072import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
073
074/**
075 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole cluster.
076 * See {@link org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster}.
077 */
078@Tag(LargeTests.TAG)
079@Tag(ClientTests.TAG)
080@SuppressWarnings("deprecation")
081public class TestReplicasClient {
082
083  private static final Logger LOG = LoggerFactory.getLogger(TestReplicasClient.class);
084
085  private static TableName TABLE_NAME;
086  private Table table = null;
087  private static final byte[] row = Bytes.toBytes(TestReplicasClient.class.getName());;
088
089  private static RegionInfo hriPrimary;
090  private static RegionInfo hriSecondary;
091
092  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
093  private static final byte[] f = HConstants.CATALOG_FAMILY;
094
095  private final static int REFRESH_PERIOD = 1000;
096  private static ServerName rsServerName;
097
098  /**
099   * This copro is used to synchronize the tests.
100   */
101  public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
102    static final AtomicLong sleepTime = new AtomicLong(0);
103    static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
104    static final AtomicInteger countOfNext = new AtomicInteger(0);
105    private static final AtomicReference<CountDownLatch> primaryCdl =
106      new AtomicReference<>(new CountDownLatch(0));
107    private static final AtomicReference<CountDownLatch> secondaryCdl =
108      new AtomicReference<>(new CountDownLatch(0));
109
110    public SlowMeCopro() {
111    }
112
113    @Override
114    public Optional<RegionObserver> getRegionObserver() {
115      return Optional.of(this);
116    }
117
118    @Override
119    public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
120      final Get get, final List<Cell> results) throws IOException {
121      slowdownCode(e);
122    }
123
124    @Override
125    public void preScannerOpen(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
126      final Scan scan) throws IOException {
127      slowdownCode(e);
128    }
129
130    @Override
131    public boolean preScannerNext(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
132      final InternalScanner s, final List<Result> results, final int limit, final boolean hasMore)
133      throws IOException {
134      // this will slow down a certain next operation if the conditions are met. The slowness
135      // will allow the call to go to a replica
136      if (slowDownNext.get()) {
137        // have some "next" return successfully from the primary; hence countOfNext checked
138        if (countOfNext.incrementAndGet() == 2) {
139          sleepTime.set(2000);
140          slowdownCode(e);
141        }
142      }
143      return true;
144    }
145
146    private void slowdownCode(final ObserverContext<? extends RegionCoprocessorEnvironment> e) {
147      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
148        LOG.info("We're the primary replicas.");
149        CountDownLatch latch = getPrimaryCdl().get();
150        try {
151          if (sleepTime.get() > 0) {
152            LOG.info("Sleeping for " + sleepTime.get() + " ms");
153            Thread.sleep(sleepTime.get());
154          } else if (latch.getCount() > 0) {
155            LOG.info("Waiting for the counterCountDownLatch");
156            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
157            if (latch.getCount() > 0) {
158              throw new RuntimeException("Can't wait more");
159            }
160          }
161        } catch (InterruptedException e1) {
162          LOG.error(e1.toString(), e1);
163        }
164      } else {
165        LOG.info("We're not the primary replicas.");
166        CountDownLatch latch = getSecondaryCdl().get();
167        try {
168          if (latch.getCount() > 0) {
169            LOG.info("Waiting for the secondary counterCountDownLatch");
170            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
171            if (latch.getCount() > 0) {
172              throw new RuntimeException("Can't wait more");
173            }
174          }
175        } catch (InterruptedException e1) {
176          LOG.error(e1.toString(), e1);
177        }
178      }
179    }
180
181    public static AtomicReference<CountDownLatch> getPrimaryCdl() {
182      return primaryCdl;
183    }
184
185    public static AtomicReference<CountDownLatch> getSecondaryCdl() {
186      return secondaryCdl;
187    }
188  }
189
190  @BeforeAll
191  public static void beforeClass() throws Exception {
192    // enable store file refreshing
193    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
194      REFRESH_PERIOD);
195    HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
196    HTU.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
197    StartTestingClusterOption option = StartTestingClusterOption.builder().numRegionServers(1)
198      .numAlwaysStandByMasters(1).numMasters(1).build();
199    HTU.startMiniCluster(option);
200
201    // Create table then get the single region for our new table.
202    TableDescriptorBuilder builder = HTU.createModifyableTableDescriptor(
203      TableName.valueOf(TestReplicasClient.class.getSimpleName()),
204      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
205      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
206    builder.setCoprocessor(SlowMeCopro.class.getName());
207    TableDescriptor hdt = builder.build();
208    HTU.createTable(hdt, new byte[][] { f }, null);
209    TABLE_NAME = hdt.getTableName();
210    try (RegionLocator locator = HTU.getConnection().getRegionLocator(hdt.getTableName())) {
211      hriPrimary = locator.getRegionLocation(row, false).getRegion();
212    }
213
214    // mock a secondary region info to open
215    hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);
216
217    // No master
218    LOG.info("Master is going to be stopped");
219    TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
220    Configuration c = new Configuration(HTU.getConfiguration());
221    c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
222    LOG.info("Master has stopped");
223
224    rsServerName = HTU.getHBaseCluster().getRegionServer(0).getServerName();
225    assertNotNull(rsServerName);
226  }
227
228  @AfterAll
229  public static void afterClass() throws Exception {
230    HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
231    HTU.shutdownMiniCluster();
232  }
233
234  @BeforeEach
235  public void before() throws IOException {
236    HTU.getConnection().clearRegionLocationCache();
237    try {
238      openRegion(hriPrimary);
239    } catch (Exception ignored) {
240    }
241    try {
242      openRegion(hriSecondary);
243    } catch (Exception ignored) {
244    }
245    table = HTU.getConnection().getTable(TABLE_NAME);
246  }
247
248  @AfterEach
249  public void after() throws IOException, KeeperException {
250    try {
251      closeRegion(hriSecondary);
252    } catch (Exception ignored) {
253    }
254    try {
255      closeRegion(hriPrimary);
256    } catch (Exception ignored) {
257    }
258    HTU.getConnection().clearRegionLocationCache();
259  }
260
261  private HRegionServer getRS() {
262    return HTU.getMiniHBaseCluster().getRegionServer(0);
263  }
264
265  private void openRegion(RegionInfo hri) throws Exception {
266    try {
267      if (isRegionOpened(hri)) return;
268    } catch (Exception e) {
269    }
270    // first version is '0'
271    AdminProtos.OpenRegionRequest orr =
272      RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, null);
273    AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr);
274    assertEquals(1, responseOpen.getOpeningStateCount());
275    assertEquals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED,
276      responseOpen.getOpeningState(0));
277    checkRegionIsOpened(hri);
278  }
279
280  private void closeRegion(RegionInfo hri) throws Exception {
281    AdminProtos.CloseRegionRequest crr =
282      ProtobufUtil.buildCloseRegionRequest(getRS().getServerName(), hri.getRegionName());
283    AdminProtos.CloseRegionResponse responseClose =
284      getRS().getRSRpcServices().closeRegion(null, crr);
285    assertTrue(responseClose.getClosed());
286
287    checkRegionIsClosed(hri.getEncodedName());
288  }
289
290  private void checkRegionIsOpened(RegionInfo hri) throws Exception {
291    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
292      Thread.sleep(1);
293    }
294  }
295
296  private boolean isRegionOpened(RegionInfo hri) throws Exception {
297    return getRS().getRegionByEncodedName(hri.getEncodedName()).isAvailable();
298  }
299
300  private void checkRegionIsClosed(String encodedRegionName) throws Exception {
301
302    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
303      Thread.sleep(1);
304    }
305
306    try {
307      assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
308    } catch (NotServingRegionException expected) {
309      // That's how it work: if the region is closed we have an exception.
310    }
311
312    // We don't delete the znode here, because there is not always a znode.
313  }
314
315  private void flushRegion(RegionInfo regionInfo) throws IOException {
316    TestRegionServerNoMaster.flushRegion(HTU, regionInfo);
317  }
318
319  @Test
320  public void testUseRegionWithoutReplica() throws Exception {
321    byte[] b1 = Bytes.toBytes("testUseRegionWithoutReplica");
322    openRegion(hriSecondary);
323    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(0));
324    try {
325      Get g = new Get(b1);
326      Result r = table.get(g);
327      assertFalse(r.isStale());
328    } finally {
329      closeRegion(hriSecondary);
330    }
331  }
332
333  @Test
334  public void testLocations() throws Exception {
335    byte[] b1 = Bytes.toBytes("testLocations");
336    openRegion(hriSecondary);
337
338    try (Connection conn = ConnectionFactory.createConnection(HTU.getConfiguration());
339      RegionLocator locator = conn.getRegionLocator(TABLE_NAME)) {
340      conn.clearRegionLocationCache();
341      List<HRegionLocation> rl = locator.getRegionLocations(b1, true);
342      assertEquals(2, rl.size());
343
344      rl = locator.getRegionLocations(b1, false);
345      assertEquals(2, rl.size());
346
347      conn.clearRegionLocationCache();
348      rl = locator.getRegionLocations(b1, false);
349      assertEquals(2, rl.size());
350
351      rl = locator.getRegionLocations(b1, true);
352      assertEquals(2, rl.size());
353    } finally {
354      closeRegion(hriSecondary);
355    }
356  }
357
358  @Test
359  public void testGetNoResultNoStaleRegionWithReplica() throws Exception {
360    byte[] b1 = Bytes.toBytes("testGetNoResultNoStaleRegionWithReplica");
361    openRegion(hriSecondary);
362
363    try {
364      // A get works and is not stale
365      Get g = new Get(b1);
366      Result r = table.get(g);
367      assertFalse(r.isStale());
368    } finally {
369      closeRegion(hriSecondary);
370    }
371  }
372
373  @Test
374  public void testGetNoResultStaleRegionWithReplica() throws Exception {
375    byte[] b1 = Bytes.toBytes("testGetNoResultStaleRegionWithReplica");
376    openRegion(hriSecondary);
377
378    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
379    try {
380      Get g = new Get(b1);
381      g.setConsistency(Consistency.TIMELINE);
382      Result r = table.get(g);
383      assertTrue(r.isStale());
384    } finally {
385      SlowMeCopro.getPrimaryCdl().get().countDown();
386      closeRegion(hriSecondary);
387    }
388  }
389
390  @Test
391  public void testGetNoResultNotStaleSleepRegionWithReplica() throws Exception {
392    byte[] b1 = Bytes.toBytes("testGetNoResultNotStaleSleepRegionWithReplica");
393    openRegion(hriSecondary);
394
395    try {
396      // We sleep; but we won't go to the stale region as we don't get the stale by default.
397      SlowMeCopro.sleepTime.set(2000);
398      Get g = new Get(b1);
399      Result r = table.get(g);
400      assertFalse(r.isStale());
401
402    } finally {
403      SlowMeCopro.sleepTime.set(0);
404      closeRegion(hriSecondary);
405    }
406  }
407
408  @Test
409  public void testFlushTable() throws Exception {
410    openRegion(hriSecondary);
411    try {
412      flushRegion(hriPrimary);
413      flushRegion(hriSecondary);
414
415      Put p = new Put(row);
416      p.addColumn(f, row, row);
417      table.put(p);
418
419      flushRegion(hriPrimary);
420      flushRegion(hriSecondary);
421    } finally {
422      Delete d = new Delete(row);
423      table.delete(d);
424      closeRegion(hriSecondary);
425    }
426  }
427
428  @Test
429  public void testFlushPrimary() throws Exception {
430    openRegion(hriSecondary);
431
432    try {
433      flushRegion(hriPrimary);
434
435      Put p = new Put(row);
436      p.addColumn(f, row, row);
437      table.put(p);
438
439      flushRegion(hriPrimary);
440    } finally {
441      Delete d = new Delete(row);
442      table.delete(d);
443      closeRegion(hriSecondary);
444    }
445  }
446
447  @Test
448  public void testFlushSecondary() throws Exception {
449    openRegion(hriSecondary);
450    try {
451      flushRegion(hriSecondary);
452
453      Put p = new Put(row);
454      p.addColumn(f, row, row);
455      table.put(p);
456
457      flushRegion(hriSecondary);
458    } catch (TableNotFoundException expected) {
459    } finally {
460      Delete d = new Delete(row);
461      table.delete(d);
462      closeRegion(hriSecondary);
463    }
464  }
465
466  @Test
467  public void testUseRegionWithReplica() throws Exception {
468    byte[] b1 = Bytes.toBytes("testUseRegionWithReplica");
469    openRegion(hriSecondary);
470
471    try {
472      // A simple put works, even if there here a second replica
473      Put p = new Put(b1);
474      p.addColumn(f, b1, b1);
475      table.put(p);
476      LOG.info("Put done");
477
478      // A get works and is not stale
479      Get g = new Get(b1);
480      Result r = table.get(g);
481      assertFalse(r.isStale());
482      assertFalse(r.getColumnCells(f, b1).isEmpty());
483      LOG.info("get works and is not stale done");
484
485      // Even if it we have to wait a little on the main region
486      SlowMeCopro.sleepTime.set(2000);
487      g = new Get(b1);
488      r = table.get(g);
489      assertFalse(r.isStale());
490      assertFalse(r.getColumnCells(f, b1).isEmpty());
491      SlowMeCopro.sleepTime.set(0);
492      LOG.info("sleep and is not stale done");
493
494      // But if we ask for stale we will get it
495      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
496      g = new Get(b1);
497      g.setConsistency(Consistency.TIMELINE);
498      r = table.get(g);
499      assertTrue(r.isStale());
500      assertTrue(r.getColumnCells(f, b1).isEmpty());
501      SlowMeCopro.getPrimaryCdl().get().countDown();
502
503      LOG.info("stale done");
504
505      // exists works and is not stale
506      g = new Get(b1);
507      g.setCheckExistenceOnly(true);
508      r = table.get(g);
509      assertFalse(r.isStale());
510      assertTrue(r.getExists());
511      LOG.info("exists not stale done");
512
513      // exists works on stale but don't see the put
514      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
515      g = new Get(b1);
516      g.setCheckExistenceOnly(true);
517      g.setConsistency(Consistency.TIMELINE);
518      r = table.get(g);
519      assertTrue(r.isStale());
520      assertFalse(r.getExists(), "The secondary has stale data");
521      SlowMeCopro.getPrimaryCdl().get().countDown();
522      LOG.info("exists stale before flush done");
523
524      flushRegion(hriPrimary);
525      flushRegion(hriSecondary);
526      LOG.info("flush done");
527      Thread.sleep(1000 + REFRESH_PERIOD * 2);
528
529      // get works and is not stale
530      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
531      g = new Get(b1);
532      g.setConsistency(Consistency.TIMELINE);
533      r = table.get(g);
534      assertTrue(r.isStale());
535      assertFalse(r.isEmpty());
536      SlowMeCopro.getPrimaryCdl().get().countDown();
537      LOG.info("stale done");
538
539      // exists works on stale and we see the put after the flush
540      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
541      g = new Get(b1);
542      g.setCheckExistenceOnly(true);
543      g.setConsistency(Consistency.TIMELINE);
544      r = table.get(g);
545      assertTrue(r.isStale());
546      assertTrue(r.getExists());
547      SlowMeCopro.getPrimaryCdl().get().countDown();
548      LOG.info("exists stale after flush done");
549
550    } finally {
551      SlowMeCopro.getPrimaryCdl().get().countDown();
552      SlowMeCopro.sleepTime.set(0);
553      Delete d = new Delete(b1);
554      table.delete(d);
555      closeRegion(hriSecondary);
556    }
557  }
558
559  @Test
560  public void testHedgedRead() throws Exception {
561    byte[] b1 = Bytes.toBytes("testHedgedRead");
562    openRegion(hriSecondary);
563
564    try {
565      // A simple put works, even if there here a second replica
566      Put p = new Put(b1);
567      p.addColumn(f, b1, b1);
568      table.put(p);
569      LOG.info("Put done");
570
571      // A get works and is not stale
572      Get g = new Get(b1);
573      Result r = table.get(g);
574      assertFalse(r.isStale());
575      assertFalse(r.getColumnCells(f, b1).isEmpty());
576      LOG.info("get works and is not stale done");
577
578      // reset
579      AsyncConnectionImpl conn = (AsyncConnectionImpl) HTU.getConnection().toAsyncConnection();
580      Counter hedgedReadOps = conn.getConnectionMetrics().get().getHedgedReadOps();
581      Counter hedgedReadWin = conn.getConnectionMetrics().get().getHedgedReadWin();
582      hedgedReadOps.dec(hedgedReadOps.getCount());
583      hedgedReadWin.dec(hedgedReadWin.getCount());
584
585      // Wait a little on the main region, just enough to happen once hedged read
586      // and hedged read did not returned faster
587      long primaryCallTimeoutNs = conn.connConf.getPrimaryCallTimeoutNs();
588      // The resolution of our timer is 10ms, so we need to sleep a bit more otherwise we may not
589      // trigger the hedged read...
590      SlowMeCopro.sleepTime.set(TimeUnit.NANOSECONDS.toMillis(primaryCallTimeoutNs) + 100);
591      SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(1));
592      g = new Get(b1);
593      g.setConsistency(Consistency.TIMELINE);
594      r = table.get(g);
595      assertFalse(r.isStale());
596      assertFalse(r.getColumnCells(f, b1).isEmpty());
597      assertEquals(1, hedgedReadOps.getCount());
598      assertEquals(0, hedgedReadWin.getCount());
599      SlowMeCopro.sleepTime.set(0);
600      SlowMeCopro.getSecondaryCdl().get().countDown();
601      LOG.info("hedged read occurred but not faster");
602
603      // But if we ask for stale we will get it and hedged read returned faster
604      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
605      g = new Get(b1);
606      g.setConsistency(Consistency.TIMELINE);
607      r = table.get(g);
608      assertTrue(r.isStale());
609      assertTrue(r.getColumnCells(f, b1).isEmpty());
610      assertEquals(2, hedgedReadOps.getCount());
611      // we update the metrics after we finish the request so we use a waitFor here, use assert
612      // directly may cause failure if we run too fast.
613      HTU.waitFor(10000, () -> hedgedReadWin.getCount() == 1);
614      SlowMeCopro.getPrimaryCdl().get().countDown();
615      LOG.info("hedged read occurred and faster");
616
617    } finally {
618      SlowMeCopro.getPrimaryCdl().get().countDown();
619      SlowMeCopro.getSecondaryCdl().get().countDown();
620      SlowMeCopro.sleepTime.set(0);
621      Delete d = new Delete(b1);
622      table.delete(d);
623      closeRegion(hriSecondary);
624    }
625  }
626
627  @Test
628  public void testScanMetricsByRegion() throws Exception {
629    byte[] b1 = Bytes.toBytes("testScanMetricsByRegion");
630    openRegion(hriSecondary);
631
632    try {
633      Put p = new Put(b1);
634      p.addColumn(f, b1, b1);
635      table.put(p);
636      LOG.info("Put done");
637      flushRegion(hriPrimary);
638
639      // Sleep for 2 * REFRESH_PERIOD so that flushed data is visible to secondary replica
640      Thread.sleep(2 * REFRESH_PERIOD);
641
642      // Explicitly read replica 0
643      Scan scan = new Scan();
644      scan.setEnableScanMetricsByRegion(true);
645      scan.withStartRow(b1, true);
646      scan.withStopRow(b1, true);
647      // Assert row was read from primary replica along with asserting scan metrics by region
648      assertScanMetrics(scan, hriPrimary, false);
649      LOG.info("Scanned primary replica");
650
651      // Read from region replica
652      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
653      scan = new Scan();
654      scan.setEnableScanMetricsByRegion(true);
655      scan.withStartRow(b1, true);
656      scan.withStopRow(b1, true);
657      scan.setConsistency(Consistency.TIMELINE);
658      // Assert row was read from secondary replica along with asserting scan metrics by region
659      assertScanMetrics(scan, hriSecondary, true);
660      LOG.info("Scanned secondary replica ");
661    } finally {
662      SlowMeCopro.getPrimaryCdl().get().countDown();
663      Delete d = new Delete(b1);
664      table.delete(d);
665      closeRegion(hriSecondary);
666    }
667  }
668
669  private void assertScanMetrics(Scan scan, RegionInfo regionInfo, boolean isStale)
670    throws IOException {
671    try (ResultScanner rs = table.getScanner(scan);) {
672      for (Result r : rs) {
673        assertEquals(isStale, r.isStale());
674        assertFalse(r.isEmpty());
675      }
676      Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
677        rs.getScanMetrics().collectMetricsByRegion(false);
678      assertEquals(1, scanMetricsByRegion.size());
679      for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
680        .entrySet()) {
681        ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
682        Map<String, Long> metrics = entry.getValue();
683        assertEquals(rsServerName, scanMetricsRegionInfo.getServerName());
684        assertEquals(regionInfo.getEncodedName(), scanMetricsRegionInfo.getEncodedRegionName());
685        assertEquals(1, (long) metrics.get(REGIONS_SCANNED_METRIC_NAME));
686        assertEquals(1, (long) metrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
687      }
688    }
689  }
690}