001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import com.codahale.metrics.Counter;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Optional;
027import java.util.Random;
028import java.util.Set;
029import java.util.concurrent.CountDownLatch;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.atomic.AtomicInteger;
033import java.util.concurrent.atomic.AtomicLong;
034import java.util.concurrent.atomic.AtomicReference;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.Cell;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseTestingUtility;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.HRegionInfo;
041import org.apache.hadoop.hbase.HTableDescriptor;
042import org.apache.hadoop.hbase.KeyValue;
043import org.apache.hadoop.hbase.NotServingRegionException;
044import org.apache.hadoop.hbase.RegionLocations;
045import org.apache.hadoop.hbase.TableNotFoundException;
046import org.apache.hadoop.hbase.coprocessor.ObserverContext;
047import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
048import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
049import org.apache.hadoop.hbase.coprocessor.RegionObserver;
050import org.apache.hadoop.hbase.regionserver.HRegionServer;
051import org.apache.hadoop.hbase.regionserver.InternalScanner;
052import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
053import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
054import org.apache.hadoop.hbase.testclassification.ClientTests;
055import org.apache.hadoop.hbase.testclassification.MediumTests;
056import org.apache.hadoop.hbase.util.Bytes;
057import org.apache.zookeeper.KeeperException;
058import org.junit.After;
059import org.junit.AfterClass;
060import org.junit.Assert;
061import org.junit.Before;
062import org.junit.BeforeClass;
063import org.junit.ClassRule;
064import org.junit.Ignore;
065import org.junit.Test;
066import org.junit.experimental.categories.Category;
067import org.slf4j.Logger;
068import org.slf4j.LoggerFactory;
069
070import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
071import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
072import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
073
074/**
075 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole
076 * cluster. See {@link org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster}.
077 */
078@Category({MediumTests.class, ClientTests.class})
079@SuppressWarnings("deprecation")
080public class TestReplicasClient {
081
082  @ClassRule
083  public static final HBaseClassTestRule CLASS_RULE =
084      HBaseClassTestRule.forClass(TestReplicasClient.class);
085
086  private static final Logger LOG = LoggerFactory.getLogger(TestReplicasClient.class);
087
088  private static final int NB_SERVERS = 1;
089  private static Table table = null;
090  private static final byte[] row = TestReplicasClient.class.getName().getBytes();
091
092  private static HRegionInfo hriPrimary;
093  private static HRegionInfo hriSecondary;
094
095  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
096  private static final byte[] f = HConstants.CATALOG_FAMILY;
097
098  private final static int REFRESH_PERIOD = 1000;
099
100  /**
101   * This copro is used to synchronize the tests.
102   */
103  public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
104    static final AtomicLong sleepTime = new AtomicLong(0);
105    static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
106    static final AtomicInteger countOfNext = new AtomicInteger(0);
107    private static final AtomicReference<CountDownLatch> primaryCdl =
108        new AtomicReference<>(new CountDownLatch(0));
109    private static final AtomicReference<CountDownLatch> secondaryCdl =
110        new AtomicReference<>(new CountDownLatch(0));
111    Random r = new Random();
112    public SlowMeCopro() {
113    }
114
115    @Override
116    public Optional<RegionObserver> getRegionObserver() {
117      return Optional.of(this);
118    }
119
120    @Override
121    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
122                         final Get get, final List<Cell> results) throws IOException {
123      slowdownCode(e);
124    }
125
126    @Override
127    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
128        final Scan scan) throws IOException {
129      slowdownCode(e);
130    }
131
132    @Override
133    public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
134        final InternalScanner s, final List<Result> results,
135        final int limit, final boolean hasMore) throws IOException {
136      //this will slow down a certain next operation if the conditions are met. The slowness
137      //will allow the call to go to a replica
138      if (slowDownNext.get()) {
139        //have some "next" return successfully from the primary; hence countOfNext checked
140        if (countOfNext.incrementAndGet() == 2) {
141          sleepTime.set(2000);
142          slowdownCode(e);
143        }
144      }
145      return true;
146    }
147
148    private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
149      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
150        LOG.info("We're the primary replicas.");
151        CountDownLatch latch = getPrimaryCdl().get();
152        try {
153          if (sleepTime.get() > 0) {
154            LOG.info("Sleeping for " + sleepTime.get() + " ms");
155            Thread.sleep(sleepTime.get());
156          } else if (latch.getCount() > 0) {
157            LOG.info("Waiting for the counterCountDownLatch");
158            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
159            if (latch.getCount() > 0) {
160              throw new RuntimeException("Can't wait more");
161            }
162          }
163        } catch (InterruptedException e1) {
164          LOG.error(e1.toString(), e1);
165        }
166      } else {
167        LOG.info("We're not the primary replicas.");
168        CountDownLatch latch = getSecondaryCdl().get();
169        try {
170          if (latch.getCount() > 0) {
171            LOG.info("Waiting for the secondary counterCountDownLatch");
172            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
173            if (latch.getCount() > 0) {
174              throw new RuntimeException("Can't wait more");
175            }
176          }
177        } catch (InterruptedException e1) {
178          LOG.error(e1.toString(), e1);
179        }
180      }
181    }
182
183    public static AtomicReference<CountDownLatch> getPrimaryCdl() {
184      return primaryCdl;
185    }
186
187    public static AtomicReference<CountDownLatch> getSecondaryCdl() {
188      return secondaryCdl;
189    }
190  }
191
192  @BeforeClass
193  public static void beforeClass() throws Exception {
194    // enable store file refreshing
195    HTU.getConfiguration().setInt(
196        StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD);
197    HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
198    HTU.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
199    ConnectionUtils.setupMasterlessConnection(HTU.getConfiguration());
200    HTU.startMiniCluster(NB_SERVERS);
201
202    // Create table then get the single region for our new table.
203    HTableDescriptor hdt = HTU.createTableDescriptor(TestReplicasClient.class.getSimpleName());
204    hdt.addCoprocessor(SlowMeCopro.class.getName());
205    table = HTU.createTable(hdt, new byte[][]{f}, null);
206
207    try (RegionLocator locator = HTU.getConnection().getRegionLocator(hdt.getTableName())) {
208      hriPrimary = locator.getRegionLocation(row, false).getRegionInfo();
209    }
210
211    // mock a secondary region info to open
212    hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(),
213        hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1);
214
215    // No master
216    LOG.info("Master is going to be stopped");
217    TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
218    Configuration c = new Configuration(HTU.getConfiguration());
219    c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
220    LOG.info("Master has stopped");
221  }
222
223  @AfterClass
224  public static void afterClass() throws Exception {
225    HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
226    if (table != null) table.close();
227    HTU.shutdownMiniCluster();
228  }
229
230  @Before
231  public void before() throws IOException {
232    ((ClusterConnection) HTU.getAdmin().getConnection()).clearRegionCache();
233    try {
234      openRegion(hriPrimary);
235    } catch (Exception ignored) {
236    }
237    try {
238      openRegion(hriSecondary);
239    } catch (Exception ignored) {
240    }
241  }
242
243  @After
244  public void after() throws IOException, KeeperException {
245    try {
246      closeRegion(hriSecondary);
247    } catch (Exception ignored) {
248    }
249    try {
250      closeRegion(hriPrimary);
251    } catch (Exception ignored) {
252    }
253
254    ((ClusterConnection) HTU.getAdmin().getConnection()).clearRegionCache();
255  }
256
257  private HRegionServer getRS() {
258    return HTU.getMiniHBaseCluster().getRegionServer(0);
259  }
260
261  private void openRegion(HRegionInfo hri) throws Exception {
262    try {
263      if (isRegionOpened(hri)) return;
264    } catch (Exception e){}
265    // first version is '0'
266    AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
267      getRS().getServerName(), hri, null);
268    AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr);
269    Assert.assertEquals(1, responseOpen.getOpeningStateCount());
270    Assert.assertEquals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED,
271        responseOpen.getOpeningState(0));
272    checkRegionIsOpened(hri);
273  }
274
275  private void closeRegion(HRegionInfo hri) throws Exception {
276    AdminProtos.CloseRegionRequest crr = ProtobufUtil.buildCloseRegionRequest(
277      getRS().getServerName(), hri.getRegionName());
278    AdminProtos.CloseRegionResponse responseClose = getRS()
279        .getRSRpcServices().closeRegion(null, crr);
280    Assert.assertTrue(responseClose.getClosed());
281
282    checkRegionIsClosed(hri.getEncodedName());
283  }
284
285  private void checkRegionIsOpened(HRegionInfo hri) throws Exception {
286    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
287      Thread.sleep(1);
288    }
289  }
290
291  private boolean isRegionOpened(HRegionInfo hri) throws Exception {
292    return getRS().getRegionByEncodedName(hri.getEncodedName()).isAvailable();
293  }
294
295  private void checkRegionIsClosed(String encodedRegionName) throws Exception {
296
297    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
298      Thread.sleep(1);
299    }
300
301    try {
302      Assert.assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
303    } catch (NotServingRegionException expected) {
304      // That's how it work: if the region is closed we have an exception.
305    }
306
307    // We don't delete the znode here, because there is not always a znode.
308  }
309
310  private void flushRegion(HRegionInfo regionInfo) throws IOException {
311    TestRegionServerNoMaster.flushRegion(HTU, regionInfo);
312  }
313
314  @Test
315  public void testUseRegionWithoutReplica() throws Exception {
316    byte[] b1 = "testUseRegionWithoutReplica".getBytes();
317    openRegion(hriSecondary);
318    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(0));
319    try {
320      Get g = new Get(b1);
321      Result r = table.get(g);
322      Assert.assertFalse(r.isStale());
323    } finally {
324      closeRegion(hriSecondary);
325    }
326  }
327
328  @Test
329  public void testLocations() throws Exception {
330    byte[] b1 = "testLocations".getBytes();
331    openRegion(hriSecondary);
332    ClusterConnection hc = (ClusterConnection) HTU.getAdmin().getConnection();
333
334    try {
335      hc.clearRegionCache();
336      RegionLocations rl = hc.locateRegion(table.getName(), b1, false, false);
337      Assert.assertEquals(2, rl.size());
338
339      rl = hc.locateRegion(table.getName(), b1, true, false);
340      Assert.assertEquals(2, rl.size());
341
342      hc.clearRegionCache();
343      rl = hc.locateRegion(table.getName(), b1, true, false);
344      Assert.assertEquals(2, rl.size());
345
346      rl = hc.locateRegion(table.getName(), b1, false, false);
347      Assert.assertEquals(2, rl.size());
348    } finally {
349      closeRegion(hriSecondary);
350    }
351  }
352
353  @Test
354  public void testGetNoResultNoStaleRegionWithReplica() throws Exception {
355    byte[] b1 = "testGetNoResultNoStaleRegionWithReplica".getBytes();
356    openRegion(hriSecondary);
357
358    try {
359      // A get works and is not stale
360      Get g = new Get(b1);
361      Result r = table.get(g);
362      Assert.assertFalse(r.isStale());
363    } finally {
364      closeRegion(hriSecondary);
365    }
366  }
367
368
369  @Test
370  public void testGetNoResultStaleRegionWithReplica() throws Exception {
371    byte[] b1 = "testGetNoResultStaleRegionWithReplica".getBytes();
372    openRegion(hriSecondary);
373
374    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
375    try {
376      Get g = new Get(b1);
377      g.setConsistency(Consistency.TIMELINE);
378      Result r = table.get(g);
379      Assert.assertTrue(r.isStale());
380    } finally {
381      SlowMeCopro.getPrimaryCdl().get().countDown();
382      closeRegion(hriSecondary);
383    }
384  }
385
386  @Test
387  public void testGetNoResultNotStaleSleepRegionWithReplica() throws Exception {
388    byte[] b1 = "testGetNoResultNotStaleSleepRegionWithReplica".getBytes();
389    openRegion(hriSecondary);
390
391    try {
392      // We sleep; but we won't go to the stale region as we don't get the stale by default.
393      SlowMeCopro.sleepTime.set(2000);
394      Get g = new Get(b1);
395      Result r = table.get(g);
396      Assert.assertFalse(r.isStale());
397
398    } finally {
399      SlowMeCopro.sleepTime.set(0);
400      closeRegion(hriSecondary);
401    }
402  }
403
404  @Test
405  public void testFlushTable() throws Exception {
406    openRegion(hriSecondary);
407    try {
408      flushRegion(hriPrimary);
409      flushRegion(hriSecondary);
410
411      Put p = new Put(row);
412      p.addColumn(f, row, row);
413      table.put(p);
414
415      flushRegion(hriPrimary);
416      flushRegion(hriSecondary);
417    } finally {
418      Delete d = new Delete(row);
419      table.delete(d);
420      closeRegion(hriSecondary);
421    }
422  }
423
424  @Test
425  public void testFlushPrimary() throws Exception {
426    openRegion(hriSecondary);
427
428    try {
429      flushRegion(hriPrimary);
430
431      Put p = new Put(row);
432      p.addColumn(f, row, row);
433      table.put(p);
434
435      flushRegion(hriPrimary);
436    } finally {
437      Delete d = new Delete(row);
438      table.delete(d);
439      closeRegion(hriSecondary);
440    }
441  }
442
443  @Test
444  public void testFlushSecondary() throws Exception {
445    openRegion(hriSecondary);
446    try {
447      flushRegion(hriSecondary);
448
449      Put p = new Put(row);
450      p.addColumn(f, row, row);
451      table.put(p);
452
453      flushRegion(hriSecondary);
454    } catch (TableNotFoundException expected) {
455    } finally {
456      Delete d = new Delete(row);
457      table.delete(d);
458      closeRegion(hriSecondary);
459    }
460  }
461
462  @Test
463  public void testUseRegionWithReplica() throws Exception {
464    byte[] b1 = "testUseRegionWithReplica".getBytes();
465    openRegion(hriSecondary);
466
467    try {
468      // A simple put works, even if there here a second replica
469      Put p = new Put(b1);
470      p.addColumn(f, b1, b1);
471      table.put(p);
472      LOG.info("Put done");
473
474      // A get works and is not stale
475      Get g = new Get(b1);
476      Result r = table.get(g);
477      Assert.assertFalse(r.isStale());
478      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
479      LOG.info("get works and is not stale done");
480
481      // Even if it we have to wait a little on the main region
482      SlowMeCopro.sleepTime.set(2000);
483      g = new Get(b1);
484      r = table.get(g);
485      Assert.assertFalse(r.isStale());
486      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
487      SlowMeCopro.sleepTime.set(0);
488      LOG.info("sleep and is not stale done");
489
490      // But if we ask for stale we will get it
491      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
492      g = new Get(b1);
493      g.setConsistency(Consistency.TIMELINE);
494      r = table.get(g);
495      Assert.assertTrue(r.isStale());
496      Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
497      SlowMeCopro.getPrimaryCdl().get().countDown();
498
499      LOG.info("stale done");
500
501      // exists works and is not stale
502      g = new Get(b1);
503      g.setCheckExistenceOnly(true);
504      r = table.get(g);
505      Assert.assertFalse(r.isStale());
506      Assert.assertTrue(r.getExists());
507      LOG.info("exists not stale done");
508
509      // exists works on stale but don't see the put
510      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
511      g = new Get(b1);
512      g.setCheckExistenceOnly(true);
513      g.setConsistency(Consistency.TIMELINE);
514      r = table.get(g);
515      Assert.assertTrue(r.isStale());
516      Assert.assertFalse("The secondary has stale data", r.getExists());
517      SlowMeCopro.getPrimaryCdl().get().countDown();
518      LOG.info("exists stale before flush done");
519
520      flushRegion(hriPrimary);
521      flushRegion(hriSecondary);
522      LOG.info("flush done");
523      Thread.sleep(1000 + REFRESH_PERIOD * 2);
524
525      // get works and is not stale
526      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
527      g = new Get(b1);
528      g.setConsistency(Consistency.TIMELINE);
529      r = table.get(g);
530      Assert.assertTrue(r.isStale());
531      Assert.assertFalse(r.isEmpty());
532      SlowMeCopro.getPrimaryCdl().get().countDown();
533      LOG.info("stale done");
534
535      // exists works on stale and we see the put after the flush
536      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
537      g = new Get(b1);
538      g.setCheckExistenceOnly(true);
539      g.setConsistency(Consistency.TIMELINE);
540      r = table.get(g);
541      Assert.assertTrue(r.isStale());
542      Assert.assertTrue(r.getExists());
543      SlowMeCopro.getPrimaryCdl().get().countDown();
544      LOG.info("exists stale after flush done");
545
546    } finally {
547      SlowMeCopro.getPrimaryCdl().get().countDown();
548      SlowMeCopro.sleepTime.set(0);
549      Delete d = new Delete(b1);
550      table.delete(d);
551      closeRegion(hriSecondary);
552    }
553  }
554
555  @Test
556  public void testHedgedRead() throws Exception {
557    byte[] b1 = "testHedgedRead".getBytes();
558    openRegion(hriSecondary);
559
560    try {
561      // A simple put works, even if there here a second replica
562      Put p = new Put(b1);
563      p.addColumn(f, b1, b1);
564      table.put(p);
565      LOG.info("Put done");
566
567      // A get works and is not stale
568      Get g = new Get(b1);
569      Result r = table.get(g);
570      Assert.assertFalse(r.isStale());
571      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
572      LOG.info("get works and is not stale done");
573
574      //reset
575      ClusterConnection connection = (ClusterConnection) HTU.getConnection();
576      Counter hedgedReadOps = connection.getConnectionMetrics().hedgedReadOps;
577      Counter hedgedReadWin = connection.getConnectionMetrics().hedgedReadWin;
578      hedgedReadOps.dec(hedgedReadOps.getCount());
579      hedgedReadWin.dec(hedgedReadWin.getCount());
580
581      // Wait a little on the main region, just enough to happen once hedged read
582      // and hedged read did not returned faster
583      int primaryCallTimeoutMicroSecond = connection.getConnectionConfiguration().getPrimaryCallTimeoutMicroSecond();
584      SlowMeCopro.sleepTime.set(TimeUnit.MICROSECONDS.toMillis(primaryCallTimeoutMicroSecond));
585      SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(1));
586      g = new Get(b1);
587      g.setConsistency(Consistency.TIMELINE);
588      r = table.get(g);
589      Assert.assertFalse(r.isStale());
590      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
591      Assert.assertEquals(1, hedgedReadOps.getCount());
592      Assert.assertEquals(0, hedgedReadWin.getCount());
593      SlowMeCopro.sleepTime.set(0);
594      SlowMeCopro.getSecondaryCdl().get().countDown();
595      LOG.info("hedged read occurred but not faster");
596
597
598      // But if we ask for stale we will get it and hedged read returned faster
599      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
600      g = new Get(b1);
601      g.setConsistency(Consistency.TIMELINE);
602      r = table.get(g);
603      Assert.assertTrue(r.isStale());
604      Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
605      Assert.assertEquals(2, hedgedReadOps.getCount());
606      Assert.assertEquals(1, hedgedReadWin.getCount());
607      SlowMeCopro.getPrimaryCdl().get().countDown();
608      LOG.info("hedged read occurred and faster");
609
610    } finally {
611      SlowMeCopro.getPrimaryCdl().get().countDown();
612      SlowMeCopro.getSecondaryCdl().get().countDown();
613      SlowMeCopro.sleepTime.set(0);
614      Delete d = new Delete(b1);
615      table.delete(d);
616      closeRegion(hriSecondary);
617    }
618  }
619
620  @Ignore // Disabled because it is flakey. Fails 17% on constrained GCE. %3 on Apache.
621  @Test
622  public void testCancelOfMultiGet() throws Exception {
623    openRegion(hriSecondary);
624    try {
625      List<Put> puts = new ArrayList<>(2);
626      byte[] b1 = Bytes.toBytes("testCancelOfMultiGet" + 0);
627      Put p = new Put(b1);
628      p.addColumn(f, b1, b1);
629      puts.add(p);
630
631      byte[] b2 = Bytes.toBytes("testCancelOfMultiGet" + 1);
632      p = new Put(b2);
633      p.addColumn(f, b2, b2);
634      puts.add(p);
635      table.put(puts);
636      LOG.debug("PUT done");
637      flushRegion(hriPrimary);
638      LOG.info("flush done");
639
640      Thread.sleep(1000 + REFRESH_PERIOD * 2);
641
642      AsyncProcess ap = ((ClusterConnection) HTU.getConnection()).getAsyncProcess();
643
644      // Make primary slowdown
645      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
646
647      List<Get> gets = new ArrayList<>();
648      Get g = new Get(b1);
649      g.setCheckExistenceOnly(true);
650      g.setConsistency(Consistency.TIMELINE);
651      gets.add(g);
652      g = new Get(b2);
653      g.setCheckExistenceOnly(true);
654      g.setConsistency(Consistency.TIMELINE);
655      gets.add(g);
656      Object[] results = new Object[2];
657
658      int operationTimeout = ((ClusterConnection) HTU.getConnection()).getConnectionConfiguration().getOperationTimeout();
659      int readTimeout = ((ClusterConnection) HTU.getConnection()).getConnectionConfiguration().getReadRpcTimeout();
660      AsyncProcessTask task = AsyncProcessTask.newBuilder()
661              .setPool(HTable.getDefaultExecutor(HTU.getConfiguration()))
662              .setTableName(table.getName())
663              .setRowAccess(gets)
664              .setResults(results)
665              .setOperationTimeout(operationTimeout)
666              .setRpcTimeout(readTimeout)
667              .build();
668      AsyncRequestFuture reqs = ap.submit(task);
669      reqs.waitUntilDone();
670      // verify we got the right results back
671      for (Object r : results) {
672        Assert.assertTrue(((Result)r).isStale());
673        Assert.assertTrue(((Result)r).getExists());
674      }
675      Set<CancellableRegionServerCallable> set =
676          ((AsyncRequestFutureImpl<?>)reqs).getCallsInProgress();
677      // verify we did cancel unneeded calls
678      Assert.assertTrue(!set.isEmpty());
679      for (CancellableRegionServerCallable m : set) {
680        Assert.assertTrue(m.isCancelled());
681      }
682    } finally {
683      SlowMeCopro.getPrimaryCdl().get().countDown();
684      SlowMeCopro.sleepTime.set(0);
685      SlowMeCopro.slowDownNext.set(false);
686      SlowMeCopro.countOfNext.set(0);
687      for (int i = 0; i < 2; i++) {
688        byte[] b1 = Bytes.toBytes("testCancelOfMultiGet" + i);
689        Delete d = new Delete(b1);
690        table.delete(d);
691      }
692      closeRegion(hriSecondary);
693    }
694  }
695
696  @Test
697  public void testScanWithReplicas() throws Exception {
698    //simple scan
699    runMultipleScansOfOneType(false, false);
700  }
701
702  @Test
703  public void testSmallScanWithReplicas() throws Exception {
704    //small scan
705    runMultipleScansOfOneType(false, true);
706  }
707
708  @Test
709  public void testReverseScanWithReplicas() throws Exception {
710    //reverse scan
711    runMultipleScansOfOneType(true, false);
712  }
713
714  @Test
715  public void testCancelOfScan() throws Exception {
716    openRegion(hriSecondary);
717    int NUMROWS = 100;
718    try {
719      for (int i = 0; i < NUMROWS; i++) {
720        byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
721        Put p = new Put(b1);
722        p.addColumn(f, b1, b1);
723        table.put(p);
724      }
725      LOG.debug("PUT done");
726      int caching = 20;
727      byte[] start;
728      start = Bytes.toBytes("testUseRegionWithReplica" + 0);
729
730      flushRegion(hriPrimary);
731      LOG.info("flush done");
732      Thread.sleep(1000 + REFRESH_PERIOD * 2);
733
734      // now make some 'next' calls slow
735      SlowMeCopro.slowDownNext.set(true);
736      SlowMeCopro.countOfNext.set(0);
737      SlowMeCopro.sleepTime.set(5000);
738
739      Scan scan = new Scan(start);
740      scan.setCaching(caching);
741      scan.setConsistency(Consistency.TIMELINE);
742      ResultScanner scanner = table.getScanner(scan);
743      Iterator<Result> iter = scanner.iterator();
744      iter.next();
745      Assert.assertTrue(((ClientScanner)scanner).isAnyRPCcancelled());
746      SlowMeCopro.slowDownNext.set(false);
747      SlowMeCopro.countOfNext.set(0);
748    } finally {
749      SlowMeCopro.getPrimaryCdl().get().countDown();
750      SlowMeCopro.sleepTime.set(0);
751      SlowMeCopro.slowDownNext.set(false);
752      SlowMeCopro.countOfNext.set(0);
753      for (int i = 0; i < NUMROWS; i++) {
754        byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
755        Delete d = new Delete(b1);
756        table.delete(d);
757      }
758      closeRegion(hriSecondary);
759    }
760  }
761
762  private void runMultipleScansOfOneType(boolean reversed, boolean small) throws Exception {
763    openRegion(hriSecondary);
764    int NUMROWS = 100;
765    int NUMCOLS = 10;
766    try {
767      for (int i = 0; i < NUMROWS; i++) {
768        byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
769        for (int col = 0; col < NUMCOLS; col++) {
770          Put p = new Put(b1);
771          String qualifier = "qualifer" + col;
772          KeyValue kv = new KeyValue(b1, f, qualifier.getBytes());
773          p.add(kv);
774          table.put(p);
775        }
776      }
777      LOG.debug("PUT done");
778      int caching = 20;
779      long maxResultSize = Long.MAX_VALUE;
780
781      byte[] start;
782      if (reversed) start = Bytes.toBytes("testUseRegionWithReplica" + (NUMROWS - 1));
783      else start = Bytes.toBytes("testUseRegionWithReplica" + 0);
784
785      scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize,
786        start, NUMROWS, NUMCOLS, false, false);
787
788      // Even if we were to slow the server down, unless we ask for stale
789      // we won't get it
790      SlowMeCopro.sleepTime.set(5000);
791      scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize, start, NUMROWS,
792        NUMCOLS, false, false);
793      SlowMeCopro.sleepTime.set(0);
794
795      flushRegion(hriPrimary);
796      LOG.info("flush done");
797      Thread.sleep(1000 + REFRESH_PERIOD * 2);
798
799      //Now set the flag to get a response even if stale
800      SlowMeCopro.sleepTime.set(5000);
801      scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize,
802        start, NUMROWS, NUMCOLS, true, false);
803      SlowMeCopro.sleepTime.set(0);
804
805      // now make some 'next' calls slow
806      SlowMeCopro.slowDownNext.set(true);
807      SlowMeCopro.countOfNext.set(0);
808      scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start,
809        NUMROWS, NUMCOLS, true, true);
810      SlowMeCopro.slowDownNext.set(false);
811      SlowMeCopro.countOfNext.set(0);
812
813      // Make sure we do not get stale data..
814      SlowMeCopro.sleepTime.set(5000);
815      scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize,
816        start, NUMROWS, NUMCOLS, false, false);
817      SlowMeCopro.sleepTime.set(0);
818
819      // While the next calls are slow, set maxResultSize to 1 so that some partial results will be
820      // returned from the server before the replica switch occurs.
821      maxResultSize = 1;
822      SlowMeCopro.slowDownNext.set(true);
823      SlowMeCopro.countOfNext.set(0);
824      scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start,
825        NUMROWS, NUMCOLS, true, true);
826      maxResultSize = Long.MAX_VALUE;
827      SlowMeCopro.slowDownNext.set(false);
828      SlowMeCopro.countOfNext.set(0);
829    } finally {
830      SlowMeCopro.getPrimaryCdl().get().countDown();
831      SlowMeCopro.sleepTime.set(0);
832      SlowMeCopro.slowDownNext.set(false);
833      SlowMeCopro.countOfNext.set(0);
834      for (int i = 0; i < NUMROWS; i++) {
835        byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
836        Delete d = new Delete(b1);
837        table.delete(d);
838      }
839      closeRegion(hriSecondary);
840    }
841  }
842
843  private void scanWithReplicas(boolean reversed, boolean small, Consistency consistency,
844      int caching, long maxResultSize, byte[] startRow, int numRows, int numCols,
845      boolean staleExpected, boolean slowNext)
846          throws Exception {
847    Scan scan = new Scan(startRow);
848    scan.setCaching(caching);
849    scan.setMaxResultSize(maxResultSize);
850    scan.setReversed(reversed);
851    scan.setSmall(small);
852    scan.setConsistency(consistency);
853    ResultScanner scanner = table.getScanner(scan);
854    Iterator<Result> iter = scanner.iterator();
855
856    // Maps of row keys that we have seen so far
857    HashMap<String, Boolean> map = new HashMap<>();
858
859    // Tracked metrics
860    int rowCount = 0;
861    int cellCount = 0;
862    int countOfStale = 0;
863
864    while (iter.hasNext()) {
865      rowCount++;
866      Result r = iter.next();
867      String row = new String(r.getRow());
868
869      if (map.containsKey(row)) {
870        throw new Exception("Unexpected scan result. Repeated row " + Bytes.toString(r.getRow()));
871      }
872
873      map.put(row, true);
874
875      for (Cell cell : r.rawCells()) {
876        cellCount++;
877      }
878
879      if (!slowNext) Assert.assertTrue(r.isStale() == staleExpected);
880      if (r.isStale()) countOfStale++;
881    }
882    Assert.assertTrue("Count of rows " + rowCount + " num rows expected " + numRows,
883      rowCount == numRows);
884    Assert.assertTrue("Count of cells: " + cellCount + " cells expected: " + numRows * numCols,
885      cellCount == (numRows * numCols));
886
887    if (slowNext) {
888      LOG.debug("Count of Stale " + countOfStale);
889      Assert.assertTrue(countOfStale > 1);
890
891      // If the scan was configured in such a way that a full row was NOT retrieved before the
892      // replica switch occurred, then it is possible that all rows were stale
893      if (maxResultSize != Long.MAX_VALUE) {
894        Assert.assertTrue(countOfStale <= numRows);
895      } else {
896        Assert.assertTrue(countOfStale < numRows);
897      }
898    }
899  }
900}