001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import com.codahale.metrics.Counter;
025import java.io.IOException;
026import java.util.HashMap;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Optional;
030import java.util.concurrent.CountDownLatch;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.atomic.AtomicBoolean;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.concurrent.atomic.AtomicLong;
035import java.util.concurrent.atomic.AtomicReference;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseTestingUtility;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.HTableDescriptor;
042import org.apache.hadoop.hbase.KeyValue;
043import org.apache.hadoop.hbase.NotServingRegionException;
044import org.apache.hadoop.hbase.RegionLocations;
045import org.apache.hadoop.hbase.StartMiniClusterOption;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.coprocessor.ObserverContext;
048import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
049import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
050import org.apache.hadoop.hbase.coprocessor.RegionObserver;
051import org.apache.hadoop.hbase.regionserver.HRegionServer;
052import org.apache.hadoop.hbase.regionserver.InternalScanner;
053import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
054import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
055import org.apache.hadoop.hbase.testclassification.ClientTests;
056import org.apache.hadoop.hbase.testclassification.LargeTests;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.zookeeper.KeeperException;
059import org.junit.After;
060import org.junit.AfterClass;
061import org.junit.Before;
062import org.junit.BeforeClass;
063import org.junit.ClassRule;
064import org.junit.Test;
065import org.junit.experimental.categories.Category;
066import org.slf4j.Logger;
067import org.slf4j.LoggerFactory;
068
069import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
070import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
071import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
072
073/**
074 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole
075 * cluster. See {@link org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster}.
076 */
077@Category({LargeTests.class, ClientTests.class})
078public class TestReplicasClient {
079
080  @ClassRule
081  public static final HBaseClassTestRule CLASS_RULE =
082      HBaseClassTestRule.forClass(TestReplicasClient.class);
083
084  private static final Logger LOG = LoggerFactory.getLogger(TestReplicasClient.class);
085
086  private static TableName TABLE_NAME;
087  private Table table = null;
088  private static final byte[] row = TestReplicasClient.class.getName().getBytes();
089
090  private static RegionInfo hriPrimary;
091  private static RegionInfo hriSecondary;
092
093  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
094  private static final byte[] f = HConstants.CATALOG_FAMILY;
095
096  private final static int REFRESH_PERIOD = 1000;
097
098  /**
099   * This copro is used to synchronize the tests.
100   */
101  public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
102    static final AtomicInteger primaryCountOfScan = new AtomicInteger(0);
103    static final AtomicInteger secondaryCountOfScan = new AtomicInteger(0);
104    static final AtomicLong sleepTime = new AtomicLong(0);
105    static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
106    static final AtomicInteger countOfNext = new AtomicInteger(0);
107    private static final AtomicReference<CountDownLatch> primaryCdl =
108        new AtomicReference<>(new CountDownLatch(0));
109    private static final AtomicReference<CountDownLatch> secondaryCdl =
110        new AtomicReference<>(new CountDownLatch(0));
111    public SlowMeCopro() {
112    }
113
114    @Override
115    public Optional<RegionObserver> getRegionObserver() {
116      return Optional.of(this);
117    }
118
119    @Override
120    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get,
121      final List<Cell> results) throws IOException {
122      slowdownCode(e);
123    }
124
125    private void incrementScanCount(ObserverContext<RegionCoprocessorEnvironment> e) {
126      LOG.info("==========scan {} ", e.getEnvironment().getRegion().getRegionInfo().getReplicaId(),
127        new Exception());
128      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
129        primaryCountOfScan.incrementAndGet();
130      } else {
131        secondaryCountOfScan.incrementAndGet();
132      }
133    }
134
135    @Override
136    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
137      final Scan scan) throws IOException {
138      incrementScanCount(e);
139      slowdownCode(e);
140    }
141
142    @Override
143    public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
144      final InternalScanner s, final List<Result> results, final int limit, final boolean hasMore)
145      throws IOException {
146      incrementScanCount(e);
147      // this will slow down a certain next operation if the conditions are met. The slowness
148      // will allow the call to go to a replica
149      if (slowDownNext.get()) {
150        // have some "next" return successfully from the primary; hence countOfNext checked
151        if (countOfNext.incrementAndGet() == 2) {
152          sleepTime.set(2000);
153          slowdownCode(e);
154        }
155      }
156      return true;
157    }
158
159    private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
160      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
161        LOG.info("We're the primary replicas.");
162        CountDownLatch latch = getPrimaryCdl().get();
163        try {
164          if (sleepTime.get() > 0) {
165            LOG.info("Sleeping for " + sleepTime.get() + " ms");
166            Thread.sleep(sleepTime.get());
167          } else if (latch.getCount() > 0) {
168            LOG.info("Waiting for the counterCountDownLatch");
169            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
170            if (latch.getCount() > 0) {
171              throw new RuntimeException("Can't wait more");
172            }
173          }
174        } catch (InterruptedException e1) {
175          LOG.error(e1.toString(), e1);
176        }
177      } else {
178        LOG.info("We're not the primary replicas.");
179        CountDownLatch latch = getSecondaryCdl().get();
180        try {
181          if (latch.getCount() > 0) {
182            LOG.info("Waiting for the secondary counterCountDownLatch");
183            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
184            if (latch.getCount() > 0) {
185              throw new RuntimeException("Can't wait more");
186            }
187          }
188        } catch (InterruptedException e1) {
189          LOG.error(e1.toString(), e1);
190        }
191      }
192    }
193
194    public static AtomicReference<CountDownLatch> getPrimaryCdl() {
195      return primaryCdl;
196    }
197
198    public static AtomicReference<CountDownLatch> getSecondaryCdl() {
199      return secondaryCdl;
200    }
201  }
202
203  @BeforeClass
204  public static void beforeClass() throws Exception {
205    // enable store file refreshing
206    HTU.getConfiguration().setInt(
207        StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD);
208    HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
209    HTU.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
210    ConnectionUtils.setupMasterlessConnection(HTU.getConfiguration());
211    StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(1).
212        numAlwaysStandByMasters(1).numMasters(1).build();
213    HTU.startMiniCluster(option);
214
215    // Create table then get the single region for our new table.
216    TABLE_NAME = TableName.valueOf(TestReplicasClient.class.getSimpleName());
217    HTableDescriptor hdt = HTU.createTableDescriptor(TABLE_NAME);
218    hdt.addCoprocessor(SlowMeCopro.class.getName());
219    HTU.createTable(hdt, new byte[][]{f}, null);
220
221    try (RegionLocator locator = HTU.getConnection().getRegionLocator(TABLE_NAME)) {
222      hriPrimary = locator.getRegionLocation(row, false).getRegion();
223    }
224
225    // mock a secondary region info to open
226    hriSecondary =  RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);
227
228    // No master
229    LOG.info("Master is going to be stopped");
230    TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
231    Configuration c = new Configuration(HTU.getConfiguration());
232    c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
233    LOG.info("Master has stopped");
234  }
235
236  @AfterClass
237  public static void afterClass() throws Exception {
238    HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
239    HTU.shutdownMiniCluster();
240  }
241
242  @Before
243  public void before() throws IOException {
244    try {
245      openRegion(hriPrimary);
246    } catch (Exception ignored) {
247    }
248    try {
249      openRegion(hriSecondary);
250    } catch (Exception ignored) {
251    }
252    SlowMeCopro.slowDownNext.set(false);
253    SlowMeCopro.sleepTime.set(0);
254    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(0));
255    SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(0));
256    table = HTU.getConnection().getTable(TABLE_NAME);
257    try (ResultScanner scanner = table.getScanner(new Scan())) {
258      for (;;) {
259        Result result = scanner.next();
260        if (result == null) {
261          break;
262        }
263        table.delete(new Delete(result.getRow()));
264      }
265    }
266    flushRegion(hriPrimary);
267    HTU.getConnection().clearRegionLocationCache();
268    SlowMeCopro.primaryCountOfScan.set(0);
269    SlowMeCopro.secondaryCountOfScan.set(0);
270    SlowMeCopro.countOfNext.set(0);
271  }
272
273  @After
274  public void after() throws IOException, KeeperException {
275    SlowMeCopro.getPrimaryCdl().get().countDown();
276    SlowMeCopro.getSecondaryCdl().get().countDown();
277    try {
278      closeRegion(hriSecondary);
279    } catch (Exception ignored) {
280    }
281    try {
282      closeRegion(hriPrimary);
283    } catch (Exception ignored) {
284    }
285    if (table != null) {
286      table.close();
287    }
288    HTU.getConnection().clearRegionLocationCache();
289  }
290
291  private HRegionServer getRS() {
292    return HTU.getMiniHBaseCluster().getRegionServer(0);
293  }
294
295  private void openRegion(RegionInfo hri) throws Exception {
296    try {
297      if (isRegionOpened(hri)) {
298        return;
299      }
300    } catch (Exception e) {
301    }
302    // first version is '0'
303    AdminProtos.OpenRegionRequest orr =
304      RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, null);
305    AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr);
306    assertEquals(1, responseOpen.getOpeningStateCount());
307    assertEquals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED,
308      responseOpen.getOpeningState(0));
309    checkRegionIsOpened(hri);
310  }
311
312  private void closeRegion(RegionInfo hri) throws Exception {
313    AdminProtos.CloseRegionRequest crr = ProtobufUtil.buildCloseRegionRequest(
314      getRS().getServerName(), hri.getRegionName());
315    AdminProtos.CloseRegionResponse responseClose = getRS()
316        .getRSRpcServices().closeRegion(null, crr);
317    assertTrue(responseClose.getClosed());
318
319    checkRegionIsClosed(hri.getEncodedName());
320  }
321
322  private void checkRegionIsOpened(RegionInfo hri) throws Exception {
323    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
324      Thread.sleep(1);
325    }
326  }
327
328  private boolean isRegionOpened(RegionInfo hri) throws Exception {
329    return getRS().getRegionByEncodedName(hri.getEncodedName()).isAvailable();
330  }
331
332  private void checkRegionIsClosed(String encodedRegionName) throws Exception {
333
334    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
335      Thread.sleep(1);
336    }
337
338    try {
339      assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
340    } catch (NotServingRegionException expected) {
341      // That's how it work: if the region is closed we have an exception.
342    }
343
344    // We don't delete the znode here, because there is not always a znode.
345  }
346
347  private void flushRegion(RegionInfo regionInfo) throws IOException {
348    TestRegionServerNoMaster.flushRegion(HTU, regionInfo);
349  }
350
351  @Test
352  public void testUseRegionWithoutReplica() throws Exception {
353    byte[] b1 = "testUseRegionWithoutReplica".getBytes();
354    Get g = new Get(b1);
355    Result r = table.get(g);
356    assertFalse(r.isStale());
357  }
358
359  @Test
360  public void testLocations() throws Exception {
361    byte[] b1 = "testLocations".getBytes();
362    ClusterConnection hc = (ClusterConnection) HTU.getAdmin().getConnection();
363    hc.clearRegionLocationCache();
364    RegionLocations rl = hc.locateRegion(table.getName(), b1, false, false);
365    assertEquals(2, rl.size());
366
367    rl = hc.locateRegion(table.getName(), b1, true, false);
368    assertEquals(2, rl.size());
369
370    hc.clearRegionLocationCache();
371    rl = hc.locateRegion(table.getName(), b1, true, false);
372    assertEquals(2, rl.size());
373
374    rl = hc.locateRegion(table.getName(), b1, false, false);
375    assertEquals(2, rl.size());
376  }
377
378  @Test
379  public void testGetNoResultNoStaleRegionWithReplica() throws Exception {
380    byte[] b1 = "testGetNoResultNoStaleRegionWithReplica".getBytes();
381    // A get works and is not stale
382    Get g = new Get(b1);
383    Result r = table.get(g);
384    assertFalse(r.isStale());
385  }
386
387  @Test
388  public void testGetNoResultStaleRegionWithReplica() throws Exception {
389    byte[] b1 = "testGetNoResultStaleRegionWithReplica".getBytes();
390    openRegion(hriSecondary);
391
392    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
393    Get g = new Get(b1);
394    g.setConsistency(Consistency.TIMELINE);
395    Result r = table.get(g);
396    assertTrue(r.isStale());
397  }
398
399  @Test
400  public void testGetNoResultNotStaleSleepRegionWithReplica() throws Exception {
401    byte[] b1 = "testGetNoResultNotStaleSleepRegionWithReplica".getBytes();
402    // We sleep; but we won't go to the stale region as we don't get the stale by default.
403    SlowMeCopro.sleepTime.set(2000);
404    Get g = new Get(b1);
405    Result r = table.get(g);
406    assertFalse(r.isStale());
407  }
408
409  @Test
410  public void testFlushTable() throws Exception {
411    flushRegion(hriPrimary);
412    flushRegion(hriSecondary);
413
414    Put p = new Put(row);
415    p.addColumn(f, row, row);
416    table.put(p);
417
418    flushRegion(hriPrimary);
419    flushRegion(hriSecondary);
420  }
421
422  @Test
423  public void testFlushPrimary() throws Exception {
424    flushRegion(hriPrimary);
425
426    Put p = new Put(row);
427    p.addColumn(f, row, row);
428    table.put(p);
429
430    flushRegion(hriPrimary);
431  }
432
433  @Test
434  public void testFlushSecondary() throws Exception {
435    flushRegion(hriSecondary);
436
437    Put p = new Put(row);
438    p.addColumn(f, row, row);
439    table.put(p);
440
441    flushRegion(hriSecondary);
442  }
443
444  @Test
445  public void testUseRegionWithReplica() throws Exception {
446    byte[] b1 = "testUseRegionWithReplica".getBytes();
447    // A simple put works, even if there here a second replica
448    Put p = new Put(b1);
449    p.addColumn(f, b1, b1);
450    table.put(p);
451    LOG.info("Put done");
452
453    // A get works and is not stale
454    Get g = new Get(b1);
455    Result r = table.get(g);
456    assertFalse(r.isStale());
457    assertFalse(r.getColumnCells(f, b1).isEmpty());
458    LOG.info("get works and is not stale done");
459
460    // Even if it we have to wait a little on the main region
461    SlowMeCopro.sleepTime.set(2000);
462    g = new Get(b1);
463    r = table.get(g);
464    assertFalse(r.isStale());
465    assertFalse(r.getColumnCells(f, b1).isEmpty());
466    SlowMeCopro.sleepTime.set(0);
467    LOG.info("sleep and is not stale done");
468
469    // But if we ask for stale we will get it
470    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
471    g = new Get(b1);
472    g.setConsistency(Consistency.TIMELINE);
473    r = table.get(g);
474    assertTrue(r.isStale());
475    assertTrue(r.getColumnCells(f, b1).isEmpty());
476    SlowMeCopro.getPrimaryCdl().get().countDown();
477
478    LOG.info("stale done");
479
480    // exists works and is not stale
481    g = new Get(b1);
482    g.setCheckExistenceOnly(true);
483    r = table.get(g);
484    assertFalse(r.isStale());
485    assertTrue(r.getExists());
486    LOG.info("exists not stale done");
487
488    // exists works on stale but don't see the put
489    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
490    g = new Get(b1);
491    g.setCheckExistenceOnly(true);
492    g.setConsistency(Consistency.TIMELINE);
493    r = table.get(g);
494    assertTrue(r.isStale());
495    assertFalse("The secondary has stale data", r.getExists());
496    SlowMeCopro.getPrimaryCdl().get().countDown();
497    LOG.info("exists stale before flush done");
498
499    flushRegion(hriPrimary);
500    flushRegion(hriSecondary);
501    LOG.info("flush done");
502    Thread.sleep(1000 + REFRESH_PERIOD * 2);
503
504    // get works and is not stale
505    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
506    g = new Get(b1);
507    g.setConsistency(Consistency.TIMELINE);
508    r = table.get(g);
509    assertTrue(r.isStale());
510    assertFalse(r.isEmpty());
511    SlowMeCopro.getPrimaryCdl().get().countDown();
512    LOG.info("stale done");
513
514    // exists works on stale and we see the put after the flush
515    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
516    g = new Get(b1);
517    g.setCheckExistenceOnly(true);
518    g.setConsistency(Consistency.TIMELINE);
519    r = table.get(g);
520    assertTrue(r.isStale());
521    assertTrue(r.getExists());
522    SlowMeCopro.getPrimaryCdl().get().countDown();
523    LOG.info("exists stale after flush done");
524  }
525
526  @Test
527  public void testHedgedRead() throws Exception {
528    byte[] b1 = "testHedgedRead".getBytes();
529    // A simple put works, even if there here a second replica
530    Put p = new Put(b1);
531    p.addColumn(f, b1, b1);
532    table.put(p);
533    LOG.info("Put done");
534
535    // A get works and is not stale
536    Get g = new Get(b1);
537    Result r = table.get(g);
538    assertFalse(r.isStale());
539    assertFalse(r.getColumnCells(f, b1).isEmpty());
540    LOG.info("get works and is not stale done");
541
542    // reset
543    ClusterConnection connection = (ClusterConnection) HTU.getConnection();
544    Counter hedgedReadOps = connection.getConnectionMetrics().hedgedReadOps;
545    Counter hedgedReadWin = connection.getConnectionMetrics().hedgedReadWin;
546    hedgedReadOps.dec(hedgedReadOps.getCount());
547    hedgedReadWin.dec(hedgedReadWin.getCount());
548
549    // Wait a little on the main region, just enough to happen once hedged read
550    // and hedged read did not returned faster
551    int primaryCallTimeoutMicroSecond =
552      connection.getConnectionConfiguration().getPrimaryCallTimeoutMicroSecond();
553    SlowMeCopro.sleepTime.set(TimeUnit.MICROSECONDS.toMillis(primaryCallTimeoutMicroSecond));
554    SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(1));
555    g = new Get(b1);
556    g.setConsistency(Consistency.TIMELINE);
557    r = table.get(g);
558    assertFalse(r.isStale());
559    assertFalse(r.getColumnCells(f, b1).isEmpty());
560    assertEquals(1, hedgedReadOps.getCount());
561    assertEquals(0, hedgedReadWin.getCount());
562    SlowMeCopro.sleepTime.set(0);
563    SlowMeCopro.getSecondaryCdl().get().countDown();
564    LOG.info("hedged read occurred but not faster");
565
566    // But if we ask for stale we will get it and hedged read returned faster
567    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
568    g = new Get(b1);
569    g.setConsistency(Consistency.TIMELINE);
570    r = table.get(g);
571    assertTrue(r.isStale());
572    assertTrue(r.getColumnCells(f, b1).isEmpty());
573    assertEquals(2, hedgedReadOps.getCount());
574    assertEquals(1, hedgedReadWin.getCount());
575    SlowMeCopro.getPrimaryCdl().get().countDown();
576    LOG.info("hedged read occurred and faster");
577  }
578
579  @Test
580  public void testScanWithReplicas() throws Exception {
581    //simple scan
582    runMultipleScansOfOneType(false, false);
583  }
584
585  @Test
586  public void testSmallScanWithReplicas() throws Exception {
587    //small scan
588    runMultipleScansOfOneType(false, true);
589  }
590
591  @Test
592  public void testReverseScanWithReplicas() throws Exception {
593    //reverse scan
594    runMultipleScansOfOneType(true, false);
595  }
596
597  @Test
598  public void testCancelOfScan() throws Exception {
599    int numRows = 100;
600    for (int i = 0; i < numRows; i++) {
601      byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
602      Put p = new Put(b1);
603      p.addColumn(f, b1, b1);
604      table.put(p);
605    }
606    LOG.debug("PUT done");
607    int caching = 20;
608    byte[] start;
609    start = Bytes.toBytes("testUseRegionWithReplica" + 0);
610
611    flushRegion(hriPrimary);
612    LOG.info("flush done");
613    Thread.sleep(1000 + REFRESH_PERIOD * 2);
614
615    // now make some 'next' calls slow
616    SlowMeCopro.slowDownNext.set(true);
617    SlowMeCopro.countOfNext.set(0);
618    SlowMeCopro.sleepTime.set(5000);
619
620    Scan scan = new Scan().withStartRow(start);
621    scan.setCaching(caching);
622    scan.setConsistency(Consistency.TIMELINE);
623    ResultScanner scanner = table.getScanner(scan);
624    Iterator<Result> iter = scanner.iterator();
625    iter.next();
626    assertTrue(((ClientScanner) scanner).isAnyRPCcancelled());
627    SlowMeCopro.slowDownNext.set(false);
628    SlowMeCopro.countOfNext.set(0);
629  }
630
631  // make sure the scan will only go to the specific replica
632  @Test
633  public void testScanOnSpecificReplica() throws Exception {
634    Scan scan = new Scan().setReplicaId(1).setConsistency(Consistency.TIMELINE);
635    try (ResultScanner scanner = table.getScanner(scan)) {
636      scanner.next();
637    }
638    assertTrue(SlowMeCopro.secondaryCountOfScan.get() > 0);
639    assertEquals(0, SlowMeCopro.primaryCountOfScan.get());
640  }
641
642  // make sure the scan will only go to the specific replica
643  @Test
644  public void testReverseScanOnSpecificReplica() throws Exception {
645    Scan scan = new Scan().setReversed(true).setReplicaId(1).setConsistency(Consistency.TIMELINE);
646    try (ResultScanner scanner = table.getScanner(scan)) {
647      scanner.next();
648    }
649    assertTrue(SlowMeCopro.secondaryCountOfScan.get() > 0);
650    assertEquals(0, SlowMeCopro.primaryCountOfScan.get());
651  }
652
653  private void runMultipleScansOfOneType(boolean reversed, boolean small) throws Exception {
654    int numRows = 100;
655    int numCols = 10;
656    for (int i = 0; i < numRows; i++) {
657      byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
658      for (int col = 0; col < numCols; col++) {
659        Put p = new Put(b1);
660        String qualifier = "qualifer" + col;
661        KeyValue kv = new KeyValue(b1, f, qualifier.getBytes());
662        p.add(kv);
663        table.put(p);
664      }
665    }
666    LOG.debug("PUT done");
667    int caching = 20;
668    long maxResultSize = Long.MAX_VALUE;
669
670    byte[] start;
671    if (reversed) {
672      start = Bytes.toBytes("testUseRegionWithReplica" + (numRows - 1));
673    } else {
674      start = Bytes.toBytes("testUseRegionWithReplica" + 0);
675    }
676
677    scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start, numRows,
678      numCols, false, false);
679
680    // Even if we were to slow the server down, unless we ask for stale
681    // we won't get it
682    SlowMeCopro.sleepTime.set(5000);
683    scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize, start, numRows,
684      numCols, false, false);
685    SlowMeCopro.sleepTime.set(0);
686
687    flushRegion(hriPrimary);
688    LOG.info("flush done");
689    Thread.sleep(1000 + REFRESH_PERIOD * 2);
690
691    // Now set the flag to get a response even if stale
692    SlowMeCopro.sleepTime.set(5000);
693    scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start, numRows,
694      numCols, true, false);
695    SlowMeCopro.sleepTime.set(0);
696
697    // now make some 'next' calls slow
698    SlowMeCopro.slowDownNext.set(true);
699    SlowMeCopro.countOfNext.set(0);
700    scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start, numRows,
701      numCols, true, true);
702    SlowMeCopro.slowDownNext.set(false);
703    SlowMeCopro.countOfNext.set(0);
704
705    // Make sure we do not get stale data..
706    SlowMeCopro.sleepTime.set(5000);
707    scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize, start, numRows,
708      numCols, false, false);
709    SlowMeCopro.sleepTime.set(0);
710
711    // While the next calls are slow, set maxResultSize to 1 so that some partial results will be
712    // returned from the server before the replica switch occurs.
713    maxResultSize = 1;
714    SlowMeCopro.slowDownNext.set(true);
715    SlowMeCopro.countOfNext.set(0);
716    scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start, numRows,
717      numCols, true, true);
718    maxResultSize = Long.MAX_VALUE;
719    SlowMeCopro.slowDownNext.set(false);
720    SlowMeCopro.countOfNext.set(0);
721  }
722
723  private void scanWithReplicas(boolean reversed, boolean small, Consistency consistency,
724      int caching, long maxResultSize, byte[] startRow, int numRows, int numCols,
725      boolean staleExpected, boolean slowNext)
726          throws Exception {
727    Scan scan = new Scan().withStartRow(startRow);
728    scan.setCaching(caching);
729    scan.setMaxResultSize(maxResultSize);
730    scan.setReversed(reversed);
731    scan.setSmall(small);
732    scan.setConsistency(consistency);
733    ResultScanner scanner = table.getScanner(scan);
734    Iterator<Result> iter = scanner.iterator();
735
736    // Maps of row keys that we have seen so far
737    HashMap<String, Boolean> map = new HashMap<>();
738
739    // Tracked metrics
740    int rowCount = 0;
741    int cellCount = 0;
742    int countOfStale = 0;
743
744    while (iter.hasNext()) {
745      rowCount++;
746      Result r = iter.next();
747      String row = new String(r.getRow());
748
749      if (map.containsKey(row)) {
750        throw new Exception("Unexpected scan result. Repeated row " + Bytes.toString(r.getRow()));
751      }
752
753      map.put(row, true);
754      cellCount += r.rawCells().length;
755
756      if (!slowNext) {
757        assertTrue(r.isStale() == staleExpected);
758      }
759      if (r.isStale()) {
760        countOfStale++;
761      }
762    }
763    assertTrue("Count of rows " + rowCount + " num rows expected " + numRows,
764      rowCount == numRows);
765    assertTrue("Count of cells: " + cellCount + " cells expected: " + numRows * numCols,
766      cellCount == (numRows * numCols));
767
768    if (slowNext) {
769      LOG.debug("Count of Stale " + countOfStale);
770      assertTrue(countOfStale > 1);
771
772      // If the scan was configured in such a way that a full row was NOT retrieved before the
773      // replica switch occurred, then it is possible that all rows were stale
774      if (maxResultSize != Long.MAX_VALUE) {
775        assertTrue(countOfStale <= numRows);
776      } else {
777        assertTrue(countOfStale < numRows);
778      }
779    }
780  }
781}