001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import com.codahale.metrics.Counter;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Optional;
027import java.util.Set;
028import java.util.concurrent.CountDownLatch;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.AtomicBoolean;
031import java.util.concurrent.atomic.AtomicInteger;
032import java.util.concurrent.atomic.AtomicLong;
033import java.util.concurrent.atomic.AtomicReference;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.Cell;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseTestingUtility;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.HRegionInfo;
040import org.apache.hadoop.hbase.HTableDescriptor;
041import org.apache.hadoop.hbase.KeyValue;
042import org.apache.hadoop.hbase.NotServingRegionException;
043import org.apache.hadoop.hbase.RegionLocations;
044import org.apache.hadoop.hbase.StartMiniClusterOption;
045import org.apache.hadoop.hbase.TableNotFoundException;
046import org.apache.hadoop.hbase.coprocessor.ObserverContext;
047import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
048import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
049import org.apache.hadoop.hbase.coprocessor.RegionObserver;
050import org.apache.hadoop.hbase.regionserver.HRegionServer;
051import org.apache.hadoop.hbase.regionserver.InternalScanner;
052import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
053import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
054import org.apache.hadoop.hbase.testclassification.ClientTests;
055import org.apache.hadoop.hbase.testclassification.LargeTests;
056import org.apache.hadoop.hbase.util.Bytes;
057import org.apache.zookeeper.KeeperException;
058import org.junit.After;
059import org.junit.AfterClass;
060import org.junit.Assert;
061import org.junit.Before;
062import org.junit.BeforeClass;
063import org.junit.ClassRule;
064import org.junit.Ignore;
065import org.junit.Test;
066import org.junit.experimental.categories.Category;
067import org.slf4j.Logger;
068import org.slf4j.LoggerFactory;
069
070import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
071import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
072import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
073
074/**
075 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole
076 * cluster. See {@link org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster}.
077 */
078@Category({LargeTests.class, ClientTests.class})
079@SuppressWarnings("deprecation")
080public class TestReplicasClient {
081
082  @ClassRule
083  public static final HBaseClassTestRule CLASS_RULE =
084      HBaseClassTestRule.forClass(TestReplicasClient.class);
085
086  private static final Logger LOG = LoggerFactory.getLogger(TestReplicasClient.class);
087
088  private static final int NB_SERVERS = 1;
089  private static Table table = null;
090  private static final byte[] row = TestReplicasClient.class.getName().getBytes();
091
092  private static HRegionInfo hriPrimary;
093  private static HRegionInfo hriSecondary;
094
095  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
096  private static final byte[] f = HConstants.CATALOG_FAMILY;
097
098  private final static int REFRESH_PERIOD = 1000;
099
100  /**
101   * This copro is used to synchronize the tests.
102   */
103  public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
104    static final AtomicLong sleepTime = new AtomicLong(0);
105    static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
106    static final AtomicInteger countOfNext = new AtomicInteger(0);
107    private static final AtomicReference<CountDownLatch> primaryCdl =
108        new AtomicReference<>(new CountDownLatch(0));
109    private static final AtomicReference<CountDownLatch> secondaryCdl =
110        new AtomicReference<>(new CountDownLatch(0));
111    public SlowMeCopro() {
112    }
113
114    @Override
115    public Optional<RegionObserver> getRegionObserver() {
116      return Optional.of(this);
117    }
118
119    @Override
120    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
121                         final Get get, final List<Cell> results) throws IOException {
122      slowdownCode(e);
123    }
124
125    @Override
126    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
127        final Scan scan) throws IOException {
128      slowdownCode(e);
129    }
130
131    @Override
132    public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
133        final InternalScanner s, final List<Result> results,
134        final int limit, final boolean hasMore) throws IOException {
135      //this will slow down a certain next operation if the conditions are met. The slowness
136      //will allow the call to go to a replica
137      if (slowDownNext.get()) {
138        //have some "next" return successfully from the primary; hence countOfNext checked
139        if (countOfNext.incrementAndGet() == 2) {
140          sleepTime.set(2000);
141          slowdownCode(e);
142        }
143      }
144      return true;
145    }
146
147    private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
148      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
149        LOG.info("We're the primary replicas.");
150        CountDownLatch latch = getPrimaryCdl().get();
151        try {
152          if (sleepTime.get() > 0) {
153            LOG.info("Sleeping for " + sleepTime.get() + " ms");
154            Thread.sleep(sleepTime.get());
155          } else if (latch.getCount() > 0) {
156            LOG.info("Waiting for the counterCountDownLatch");
157            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
158            if (latch.getCount() > 0) {
159              throw new RuntimeException("Can't wait more");
160            }
161          }
162        } catch (InterruptedException e1) {
163          LOG.error(e1.toString(), e1);
164        }
165      } else {
166        LOG.info("We're not the primary replicas.");
167        CountDownLatch latch = getSecondaryCdl().get();
168        try {
169          if (latch.getCount() > 0) {
170            LOG.info("Waiting for the secondary counterCountDownLatch");
171            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
172            if (latch.getCount() > 0) {
173              throw new RuntimeException("Can't wait more");
174            }
175          }
176        } catch (InterruptedException e1) {
177          LOG.error(e1.toString(), e1);
178        }
179      }
180    }
181
182    public static AtomicReference<CountDownLatch> getPrimaryCdl() {
183      return primaryCdl;
184    }
185
186    public static AtomicReference<CountDownLatch> getSecondaryCdl() {
187      return secondaryCdl;
188    }
189  }
190
191  @BeforeClass
192  public static void beforeClass() throws Exception {
193    // enable store file refreshing
194    HTU.getConfiguration().setInt(
195        StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD);
196    HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
197    HTU.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
198    ConnectionUtils.setupMasterlessConnection(HTU.getConfiguration());
199    StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(1).
200        numAlwaysStandByMasters(1).numMasters(1).build();
201    HTU.startMiniCluster(option);
202
203    // Create table then get the single region for our new table.
204    HTableDescriptor hdt = HTU.createTableDescriptor(TestReplicasClient.class.getSimpleName());
205    hdt.addCoprocessor(SlowMeCopro.class.getName());
206    table = HTU.createTable(hdt, new byte[][]{f}, null);
207
208    try (RegionLocator locator = HTU.getConnection().getRegionLocator(hdt.getTableName())) {
209      hriPrimary = locator.getRegionLocation(row, false).getRegionInfo();
210    }
211
212    // mock a secondary region info to open
213    hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(),
214        hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1);
215
216    // No master
217    LOG.info("Master is going to be stopped");
218    TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
219    Configuration c = new Configuration(HTU.getConfiguration());
220    c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
221    LOG.info("Master has stopped");
222  }
223
224  @AfterClass
225  public static void afterClass() throws Exception {
226    HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
227    if (table != null) table.close();
228    HTU.shutdownMiniCluster();
229  }
230
231  @Before
232  public void before() throws IOException {
233    ((ClusterConnection) HTU.getAdmin().getConnection()).clearRegionLocationCache();
234    try {
235      openRegion(hriPrimary);
236    } catch (Exception ignored) {
237    }
238    try {
239      openRegion(hriSecondary);
240    } catch (Exception ignored) {
241    }
242  }
243
244  @After
245  public void after() throws IOException, KeeperException {
246    try {
247      closeRegion(hriSecondary);
248    } catch (Exception ignored) {
249    }
250    try {
251      closeRegion(hriPrimary);
252    } catch (Exception ignored) {
253    }
254
255    ((ClusterConnection) HTU.getAdmin().getConnection()).clearRegionLocationCache();
256  }
257
258  private HRegionServer getRS() {
259    return HTU.getMiniHBaseCluster().getRegionServer(0);
260  }
261
262  private void openRegion(HRegionInfo hri) throws Exception {
263    try {
264      if (isRegionOpened(hri)) return;
265    } catch (Exception e){}
266    // first version is '0'
267    AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
268      getRS().getServerName(), hri, null);
269    AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr);
270    Assert.assertEquals(1, responseOpen.getOpeningStateCount());
271    Assert.assertEquals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED,
272        responseOpen.getOpeningState(0));
273    checkRegionIsOpened(hri);
274  }
275
276  private void closeRegion(HRegionInfo hri) throws Exception {
277    AdminProtos.CloseRegionRequest crr = ProtobufUtil.buildCloseRegionRequest(
278      getRS().getServerName(), hri.getRegionName());
279    AdminProtos.CloseRegionResponse responseClose = getRS()
280        .getRSRpcServices().closeRegion(null, crr);
281    Assert.assertTrue(responseClose.getClosed());
282
283    checkRegionIsClosed(hri.getEncodedName());
284  }
285
286  private void checkRegionIsOpened(HRegionInfo hri) throws Exception {
287    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
288      Thread.sleep(1);
289    }
290  }
291
292  private boolean isRegionOpened(HRegionInfo hri) throws Exception {
293    return getRS().getRegionByEncodedName(hri.getEncodedName()).isAvailable();
294  }
295
296  private void checkRegionIsClosed(String encodedRegionName) throws Exception {
297
298    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
299      Thread.sleep(1);
300    }
301
302    try {
303      Assert.assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
304    } catch (NotServingRegionException expected) {
305      // That's how it work: if the region is closed we have an exception.
306    }
307
308    // We don't delete the znode here, because there is not always a znode.
309  }
310
311  private void flushRegion(HRegionInfo regionInfo) throws IOException {
312    TestRegionServerNoMaster.flushRegion(HTU, regionInfo);
313  }
314
315  @Test
316  public void testUseRegionWithoutReplica() throws Exception {
317    byte[] b1 = "testUseRegionWithoutReplica".getBytes();
318    openRegion(hriSecondary);
319    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(0));
320    try {
321      Get g = new Get(b1);
322      Result r = table.get(g);
323      Assert.assertFalse(r.isStale());
324    } finally {
325      closeRegion(hriSecondary);
326    }
327  }
328
329  @Test
330  public void testLocations() throws Exception {
331    byte[] b1 = "testLocations".getBytes();
332    openRegion(hriSecondary);
333    ClusterConnection hc = (ClusterConnection) HTU.getAdmin().getConnection();
334
335    try {
336      hc.clearRegionLocationCache();
337      RegionLocations rl = hc.locateRegion(table.getName(), b1, false, false);
338      Assert.assertEquals(2, rl.size());
339
340      rl = hc.locateRegion(table.getName(), b1, true, false);
341      Assert.assertEquals(2, rl.size());
342
343      hc.clearRegionLocationCache();
344      rl = hc.locateRegion(table.getName(), b1, true, false);
345      Assert.assertEquals(2, rl.size());
346
347      rl = hc.locateRegion(table.getName(), b1, false, false);
348      Assert.assertEquals(2, rl.size());
349    } finally {
350      closeRegion(hriSecondary);
351    }
352  }
353
354  @Test
355  public void testGetNoResultNoStaleRegionWithReplica() throws Exception {
356    byte[] b1 = "testGetNoResultNoStaleRegionWithReplica".getBytes();
357    openRegion(hriSecondary);
358
359    try {
360      // A get works and is not stale
361      Get g = new Get(b1);
362      Result r = table.get(g);
363      Assert.assertFalse(r.isStale());
364    } finally {
365      closeRegion(hriSecondary);
366    }
367  }
368
369
370  @Test
371  public void testGetNoResultStaleRegionWithReplica() throws Exception {
372    byte[] b1 = "testGetNoResultStaleRegionWithReplica".getBytes();
373    openRegion(hriSecondary);
374
375    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
376    try {
377      Get g = new Get(b1);
378      g.setConsistency(Consistency.TIMELINE);
379      Result r = table.get(g);
380      Assert.assertTrue(r.isStale());
381    } finally {
382      SlowMeCopro.getPrimaryCdl().get().countDown();
383      closeRegion(hriSecondary);
384    }
385  }
386
387  @Test
388  public void testGetNoResultNotStaleSleepRegionWithReplica() throws Exception {
389    byte[] b1 = "testGetNoResultNotStaleSleepRegionWithReplica".getBytes();
390    openRegion(hriSecondary);
391
392    try {
393      // We sleep; but we won't go to the stale region as we don't get the stale by default.
394      SlowMeCopro.sleepTime.set(2000);
395      Get g = new Get(b1);
396      Result r = table.get(g);
397      Assert.assertFalse(r.isStale());
398
399    } finally {
400      SlowMeCopro.sleepTime.set(0);
401      closeRegion(hriSecondary);
402    }
403  }
404
405  @Test
406  public void testFlushTable() throws Exception {
407    openRegion(hriSecondary);
408    try {
409      flushRegion(hriPrimary);
410      flushRegion(hriSecondary);
411
412      Put p = new Put(row);
413      p.addColumn(f, row, row);
414      table.put(p);
415
416      flushRegion(hriPrimary);
417      flushRegion(hriSecondary);
418    } finally {
419      Delete d = new Delete(row);
420      table.delete(d);
421      closeRegion(hriSecondary);
422    }
423  }
424
425  @Test
426  public void testFlushPrimary() throws Exception {
427    openRegion(hriSecondary);
428
429    try {
430      flushRegion(hriPrimary);
431
432      Put p = new Put(row);
433      p.addColumn(f, row, row);
434      table.put(p);
435
436      flushRegion(hriPrimary);
437    } finally {
438      Delete d = new Delete(row);
439      table.delete(d);
440      closeRegion(hriSecondary);
441    }
442  }
443
444  @Test
445  public void testFlushSecondary() throws Exception {
446    openRegion(hriSecondary);
447    try {
448      flushRegion(hriSecondary);
449
450      Put p = new Put(row);
451      p.addColumn(f, row, row);
452      table.put(p);
453
454      flushRegion(hriSecondary);
455    } catch (TableNotFoundException expected) {
456    } finally {
457      Delete d = new Delete(row);
458      table.delete(d);
459      closeRegion(hriSecondary);
460    }
461  }
462
463  @Test
464  public void testUseRegionWithReplica() throws Exception {
465    byte[] b1 = "testUseRegionWithReplica".getBytes();
466    openRegion(hriSecondary);
467
468    try {
469      // A simple put works, even if there here a second replica
470      Put p = new Put(b1);
471      p.addColumn(f, b1, b1);
472      table.put(p);
473      LOG.info("Put done");
474
475      // A get works and is not stale
476      Get g = new Get(b1);
477      Result r = table.get(g);
478      Assert.assertFalse(r.isStale());
479      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
480      LOG.info("get works and is not stale done");
481
482      // Even if it we have to wait a little on the main region
483      SlowMeCopro.sleepTime.set(2000);
484      g = new Get(b1);
485      r = table.get(g);
486      Assert.assertFalse(r.isStale());
487      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
488      SlowMeCopro.sleepTime.set(0);
489      LOG.info("sleep and is not stale done");
490
491      // But if we ask for stale we will get it
492      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
493      g = new Get(b1);
494      g.setConsistency(Consistency.TIMELINE);
495      r = table.get(g);
496      Assert.assertTrue(r.isStale());
497      Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
498      SlowMeCopro.getPrimaryCdl().get().countDown();
499
500      LOG.info("stale done");
501
502      // exists works and is not stale
503      g = new Get(b1);
504      g.setCheckExistenceOnly(true);
505      r = table.get(g);
506      Assert.assertFalse(r.isStale());
507      Assert.assertTrue(r.getExists());
508      LOG.info("exists not stale done");
509
510      // exists works on stale but don't see the put
511      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
512      g = new Get(b1);
513      g.setCheckExistenceOnly(true);
514      g.setConsistency(Consistency.TIMELINE);
515      r = table.get(g);
516      Assert.assertTrue(r.isStale());
517      Assert.assertFalse("The secondary has stale data", r.getExists());
518      SlowMeCopro.getPrimaryCdl().get().countDown();
519      LOG.info("exists stale before flush done");
520
521      flushRegion(hriPrimary);
522      flushRegion(hriSecondary);
523      LOG.info("flush done");
524      Thread.sleep(1000 + REFRESH_PERIOD * 2);
525
526      // get works and is not stale
527      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
528      g = new Get(b1);
529      g.setConsistency(Consistency.TIMELINE);
530      r = table.get(g);
531      Assert.assertTrue(r.isStale());
532      Assert.assertFalse(r.isEmpty());
533      SlowMeCopro.getPrimaryCdl().get().countDown();
534      LOG.info("stale done");
535
536      // exists works on stale and we see the put after the flush
537      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
538      g = new Get(b1);
539      g.setCheckExistenceOnly(true);
540      g.setConsistency(Consistency.TIMELINE);
541      r = table.get(g);
542      Assert.assertTrue(r.isStale());
543      Assert.assertTrue(r.getExists());
544      SlowMeCopro.getPrimaryCdl().get().countDown();
545      LOG.info("exists stale after flush done");
546
547    } finally {
548      SlowMeCopro.getPrimaryCdl().get().countDown();
549      SlowMeCopro.sleepTime.set(0);
550      Delete d = new Delete(b1);
551      table.delete(d);
552      closeRegion(hriSecondary);
553    }
554  }
555
556  @Test
557  public void testHedgedRead() throws Exception {
558    byte[] b1 = "testHedgedRead".getBytes();
559    openRegion(hriSecondary);
560
561    try {
562      // A simple put works, even if there here a second replica
563      Put p = new Put(b1);
564      p.addColumn(f, b1, b1);
565      table.put(p);
566      LOG.info("Put done");
567
568      // A get works and is not stale
569      Get g = new Get(b1);
570      Result r = table.get(g);
571      Assert.assertFalse(r.isStale());
572      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
573      LOG.info("get works and is not stale done");
574
575      //reset
576      ClusterConnection connection = (ClusterConnection) HTU.getConnection();
577      Counter hedgedReadOps = connection.getConnectionMetrics().hedgedReadOps;
578      Counter hedgedReadWin = connection.getConnectionMetrics().hedgedReadWin;
579      hedgedReadOps.dec(hedgedReadOps.getCount());
580      hedgedReadWin.dec(hedgedReadWin.getCount());
581
582      // Wait a little on the main region, just enough to happen once hedged read
583      // and hedged read did not returned faster
584      int primaryCallTimeoutMicroSecond = connection.getConnectionConfiguration().getPrimaryCallTimeoutMicroSecond();
585      SlowMeCopro.sleepTime.set(TimeUnit.MICROSECONDS.toMillis(primaryCallTimeoutMicroSecond));
586      SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(1));
587      g = new Get(b1);
588      g.setConsistency(Consistency.TIMELINE);
589      r = table.get(g);
590      Assert.assertFalse(r.isStale());
591      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
592      Assert.assertEquals(1, hedgedReadOps.getCount());
593      Assert.assertEquals(0, hedgedReadWin.getCount());
594      SlowMeCopro.sleepTime.set(0);
595      SlowMeCopro.getSecondaryCdl().get().countDown();
596      LOG.info("hedged read occurred but not faster");
597
598
599      // But if we ask for stale we will get it and hedged read returned faster
600      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
601      g = new Get(b1);
602      g.setConsistency(Consistency.TIMELINE);
603      r = table.get(g);
604      Assert.assertTrue(r.isStale());
605      Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
606      Assert.assertEquals(2, hedgedReadOps.getCount());
607      Assert.assertEquals(1, hedgedReadWin.getCount());
608      SlowMeCopro.getPrimaryCdl().get().countDown();
609      LOG.info("hedged read occurred and faster");
610
611    } finally {
612      SlowMeCopro.getPrimaryCdl().get().countDown();
613      SlowMeCopro.getSecondaryCdl().get().countDown();
614      SlowMeCopro.sleepTime.set(0);
615      Delete d = new Delete(b1);
616      table.delete(d);
617      closeRegion(hriSecondary);
618    }
619  }
620
621  @Ignore // Disabled because it is flakey. Fails 17% on constrained GCE. %3 on Apache.
622  @Test
623  public void testCancelOfMultiGet() throws Exception {
624    openRegion(hriSecondary);
625    try {
626      List<Put> puts = new ArrayList<>(2);
627      byte[] b1 = Bytes.toBytes("testCancelOfMultiGet" + 0);
628      Put p = new Put(b1);
629      p.addColumn(f, b1, b1);
630      puts.add(p);
631
632      byte[] b2 = Bytes.toBytes("testCancelOfMultiGet" + 1);
633      p = new Put(b2);
634      p.addColumn(f, b2, b2);
635      puts.add(p);
636      table.put(puts);
637      LOG.debug("PUT done");
638      flushRegion(hriPrimary);
639      LOG.info("flush done");
640
641      Thread.sleep(1000 + REFRESH_PERIOD * 2);
642
643      AsyncProcess ap = ((ClusterConnection) HTU.getConnection()).getAsyncProcess();
644
645      // Make primary slowdown
646      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
647
648      List<Get> gets = new ArrayList<>();
649      Get g = new Get(b1);
650      g.setCheckExistenceOnly(true);
651      g.setConsistency(Consistency.TIMELINE);
652      gets.add(g);
653      g = new Get(b2);
654      g.setCheckExistenceOnly(true);
655      g.setConsistency(Consistency.TIMELINE);
656      gets.add(g);
657      Object[] results = new Object[2];
658
659      int operationTimeout = ((ClusterConnection) HTU.getConnection()).getConnectionConfiguration().getOperationTimeout();
660      int readTimeout = ((ClusterConnection) HTU.getConnection()).getConnectionConfiguration().getReadRpcTimeout();
661      AsyncProcessTask task = AsyncProcessTask.newBuilder()
662              .setPool(HTable.getDefaultExecutor(HTU.getConfiguration()))
663              .setTableName(table.getName())
664              .setRowAccess(gets)
665              .setResults(results)
666              .setOperationTimeout(operationTimeout)
667              .setRpcTimeout(readTimeout)
668              .build();
669      AsyncRequestFuture reqs = ap.submit(task);
670      reqs.waitUntilDone();
671      // verify we got the right results back
672      for (Object r : results) {
673        Assert.assertTrue(((Result)r).isStale());
674        Assert.assertTrue(((Result)r).getExists());
675      }
676      Set<CancellableRegionServerCallable> set =
677          ((AsyncRequestFutureImpl<?>)reqs).getCallsInProgress();
678      // verify we did cancel unneeded calls
679      Assert.assertTrue(!set.isEmpty());
680      for (CancellableRegionServerCallable m : set) {
681        Assert.assertTrue(m.isCancelled());
682      }
683    } finally {
684      SlowMeCopro.getPrimaryCdl().get().countDown();
685      SlowMeCopro.sleepTime.set(0);
686      SlowMeCopro.slowDownNext.set(false);
687      SlowMeCopro.countOfNext.set(0);
688      for (int i = 0; i < 2; i++) {
689        byte[] b1 = Bytes.toBytes("testCancelOfMultiGet" + i);
690        Delete d = new Delete(b1);
691        table.delete(d);
692      }
693      closeRegion(hriSecondary);
694    }
695  }
696
697  @Test
698  public void testScanWithReplicas() throws Exception {
699    //simple scan
700    runMultipleScansOfOneType(false, false);
701  }
702
703  @Test
704  public void testSmallScanWithReplicas() throws Exception {
705    //small scan
706    runMultipleScansOfOneType(false, true);
707  }
708
709  @Test
710  public void testReverseScanWithReplicas() throws Exception {
711    //reverse scan
712    runMultipleScansOfOneType(true, false);
713  }
714
715  @Test
716  public void testCancelOfScan() throws Exception {
717    openRegion(hriSecondary);
718    int NUMROWS = 100;
719    try {
720      for (int i = 0; i < NUMROWS; i++) {
721        byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
722        Put p = new Put(b1);
723        p.addColumn(f, b1, b1);
724        table.put(p);
725      }
726      LOG.debug("PUT done");
727      int caching = 20;
728      byte[] start;
729      start = Bytes.toBytes("testUseRegionWithReplica" + 0);
730
731      flushRegion(hriPrimary);
732      LOG.info("flush done");
733      Thread.sleep(1000 + REFRESH_PERIOD * 2);
734
735      // now make some 'next' calls slow
736      SlowMeCopro.slowDownNext.set(true);
737      SlowMeCopro.countOfNext.set(0);
738      SlowMeCopro.sleepTime.set(5000);
739
740      Scan scan = new Scan(start);
741      scan.setCaching(caching);
742      scan.setConsistency(Consistency.TIMELINE);
743      ResultScanner scanner = table.getScanner(scan);
744      Iterator<Result> iter = scanner.iterator();
745      iter.next();
746      Assert.assertTrue(((ClientScanner)scanner).isAnyRPCcancelled());
747      SlowMeCopro.slowDownNext.set(false);
748      SlowMeCopro.countOfNext.set(0);
749    } finally {
750      SlowMeCopro.getPrimaryCdl().get().countDown();
751      SlowMeCopro.sleepTime.set(0);
752      SlowMeCopro.slowDownNext.set(false);
753      SlowMeCopro.countOfNext.set(0);
754      for (int i = 0; i < NUMROWS; i++) {
755        byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
756        Delete d = new Delete(b1);
757        table.delete(d);
758      }
759      closeRegion(hriSecondary);
760    }
761  }
762
763  private void runMultipleScansOfOneType(boolean reversed, boolean small) throws Exception {
764    openRegion(hriSecondary);
765    int NUMROWS = 100;
766    int NUMCOLS = 10;
767    try {
768      for (int i = 0; i < NUMROWS; i++) {
769        byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
770        for (int col = 0; col < NUMCOLS; col++) {
771          Put p = new Put(b1);
772          String qualifier = "qualifer" + col;
773          KeyValue kv = new KeyValue(b1, f, qualifier.getBytes());
774          p.add(kv);
775          table.put(p);
776        }
777      }
778      LOG.debug("PUT done");
779      int caching = 20;
780      long maxResultSize = Long.MAX_VALUE;
781
782      byte[] start;
783      if (reversed) start = Bytes.toBytes("testUseRegionWithReplica" + (NUMROWS - 1));
784      else start = Bytes.toBytes("testUseRegionWithReplica" + 0);
785
786      scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize,
787        start, NUMROWS, NUMCOLS, false, false);
788
789      // Even if we were to slow the server down, unless we ask for stale
790      // we won't get it
791      SlowMeCopro.sleepTime.set(5000);
792      scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize, start, NUMROWS,
793        NUMCOLS, false, false);
794      SlowMeCopro.sleepTime.set(0);
795
796      flushRegion(hriPrimary);
797      LOG.info("flush done");
798      Thread.sleep(1000 + REFRESH_PERIOD * 2);
799
800      //Now set the flag to get a response even if stale
801      SlowMeCopro.sleepTime.set(5000);
802      scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize,
803        start, NUMROWS, NUMCOLS, true, false);
804      SlowMeCopro.sleepTime.set(0);
805
806      // now make some 'next' calls slow
807      SlowMeCopro.slowDownNext.set(true);
808      SlowMeCopro.countOfNext.set(0);
809      scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start,
810        NUMROWS, NUMCOLS, true, true);
811      SlowMeCopro.slowDownNext.set(false);
812      SlowMeCopro.countOfNext.set(0);
813
814      // Make sure we do not get stale data..
815      SlowMeCopro.sleepTime.set(5000);
816      scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize,
817        start, NUMROWS, NUMCOLS, false, false);
818      SlowMeCopro.sleepTime.set(0);
819
820      // While the next calls are slow, set maxResultSize to 1 so that some partial results will be
821      // returned from the server before the replica switch occurs.
822      maxResultSize = 1;
823      SlowMeCopro.slowDownNext.set(true);
824      SlowMeCopro.countOfNext.set(0);
825      scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start,
826        NUMROWS, NUMCOLS, true, true);
827      maxResultSize = Long.MAX_VALUE;
828      SlowMeCopro.slowDownNext.set(false);
829      SlowMeCopro.countOfNext.set(0);
830    } finally {
831      SlowMeCopro.getPrimaryCdl().get().countDown();
832      SlowMeCopro.sleepTime.set(0);
833      SlowMeCopro.slowDownNext.set(false);
834      SlowMeCopro.countOfNext.set(0);
835      for (int i = 0; i < NUMROWS; i++) {
836        byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
837        Delete d = new Delete(b1);
838        table.delete(d);
839      }
840      closeRegion(hriSecondary);
841    }
842  }
843
844  private void scanWithReplicas(boolean reversed, boolean small, Consistency consistency,
845      int caching, long maxResultSize, byte[] startRow, int numRows, int numCols,
846      boolean staleExpected, boolean slowNext)
847          throws Exception {
848    Scan scan = new Scan(startRow);
849    scan.setCaching(caching);
850    scan.setMaxResultSize(maxResultSize);
851    scan.setReversed(reversed);
852    scan.setSmall(small);
853    scan.setConsistency(consistency);
854    ResultScanner scanner = table.getScanner(scan);
855    Iterator<Result> iter = scanner.iterator();
856
857    // Maps of row keys that we have seen so far
858    HashMap<String, Boolean> map = new HashMap<>();
859
860    // Tracked metrics
861    int rowCount = 0;
862    int cellCount = 0;
863    int countOfStale = 0;
864
865    while (iter.hasNext()) {
866      rowCount++;
867      Result r = iter.next();
868      String row = new String(r.getRow());
869
870      if (map.containsKey(row)) {
871        throw new Exception("Unexpected scan result. Repeated row " + Bytes.toString(r.getRow()));
872      }
873
874      map.put(row, true);
875
876      for (Cell cell : r.rawCells()) {
877        cellCount++;
878      }
879
880      if (!slowNext) Assert.assertTrue(r.isStale() == staleExpected);
881      if (r.isStale()) countOfStale++;
882    }
883    Assert.assertTrue("Count of rows " + rowCount + " num rows expected " + numRows,
884      rowCount == numRows);
885    Assert.assertTrue("Count of cells: " + cellCount + " cells expected: " + numRows * numCols,
886      cellCount == (numRows * numCols));
887
888    if (slowNext) {
889      LOG.debug("Count of Stale " + countOfStale);
890      Assert.assertTrue(countOfStale > 1);
891
892      // If the scan was configured in such a way that a full row was NOT retrieved before the
893      // replica switch occurred, then it is possible that all rows were stale
894      if (maxResultSize != Long.MAX_VALUE) {
895        Assert.assertTrue(countOfStale <= numRows);
896      } else {
897        Assert.assertTrue(countOfStale < numRows);
898      }
899    }
900  }
901}