/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import com.codahale.metrics.Counter;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;

/**
 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole cluster.
 * See {@link org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster}.
 */
@Category({ LargeTests.class, ClientTests.class })
@SuppressWarnings("deprecation")
public class TestReplicasClient {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicasClient.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicasClient.class);

  private static TableName TABLE_NAME;
  private Table table = null;
  private static final byte[] row = Bytes.toBytes(TestReplicasClient.class.getName());

  private static RegionInfo hriPrimary;
  private static RegionInfo hriSecondary;

  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
  private static final byte[] f = HConstants.CATALOG_FAMILY;

  private final static int REFRESH_PERIOD = 1000;

  /**
   * This coprocessor is used to synchronize the tests.
   */
  public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
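    // Knobs used by the tests: sleepTime and primaryCdl delay the primary replica,
    // secondaryCdl delays the secondary replicas, and slowDownNext/countOfNext slow down
    // a specific scanner next() call so that the read can be retried on a replica.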
    static final AtomicLong sleepTime = new AtomicLong(0);
    static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
    static final AtomicInteger countOfNext = new AtomicInteger(0);
    private static final AtomicReference<CountDownLatch> primaryCdl =
      new AtomicReference<>(new CountDownLatch(0));
    private static final AtomicReference<CountDownLatch> secondaryCdl =
      new AtomicReference<>(new CountDownLatch(0));

    public SlowMeCopro() {
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get,
      final List<Cell> results) throws IOException {
      slowdownCode(e);
    }

    @Override
    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
      final Scan scan) throws IOException {
      slowdownCode(e);
    }

    @Override
    public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
      final InternalScanner s, final List<Result> results, final int limit, final boolean hasMore)
      throws IOException {
      // This will slow down a certain next operation if the conditions are met. The slowness
      // will allow the call to go to a replica.
      if (slowDownNext.get()) {
        // have some "next" return successfully from the primary; hence countOfNext checked
        if (countOfNext.incrementAndGet() == 2) {
          sleepTime.set(2000);
          slowdownCode(e);
        }
      }
      return true;
    }

    private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
        LOG.info("We're the primary replica.");
        CountDownLatch latch = getPrimaryCdl().get();
        try {
          if (sleepTime.get() > 0) {
            LOG.info("Sleeping for " + sleepTime.get() + " ms");
            Thread.sleep(sleepTime.get());
          } else if (latch.getCount() > 0) {
            LOG.info("Waiting for the primary CountDownLatch");
            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
            if (latch.getCount() > 0) {
              throw new RuntimeException("Can't wait any longer");
            }
          }
        } catch (InterruptedException e1) {
          LOG.error(e1.toString(), e1);
        }
      } else {
        LOG.info("We're not the primary replica.");
        CountDownLatch latch = getSecondaryCdl().get();
        try {
          if (latch.getCount() > 0) {
            LOG.info("Waiting for the secondary CountDownLatch");
            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
            if (latch.getCount() > 0) {
              throw new RuntimeException("Can't wait any longer");
            }
          }
        } catch (InterruptedException e1) {
          LOG.error(e1.toString(), e1);
        }
      }
    }

    public static AtomicReference<CountDownLatch> getPrimaryCdl() {
      return primaryCdl;
    }

    public static AtomicReference<CountDownLatch> getSecondaryCdl() {
      return secondaryCdl;
    }
  }

  @BeforeClass
  public static void beforeClass() throws Exception {
    // enable store file refreshing
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
      REFRESH_PERIOD);
    HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
    HTU.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
    StartTestingClusterOption option = StartTestingClusterOption.builder().numRegionServers(1)
      .numAlwaysStandByMasters(1).numMasters(1).build();
    HTU.startMiniCluster(option);

    // Create table then get the single region for our new table.
    TableDescriptorBuilder builder = HTU.createModifyableTableDescriptor(
      TableName.valueOf(TestReplicasClient.class.getSimpleName()),
      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
    builder.setCoprocessor(SlowMeCopro.class.getName());
    TableDescriptor hdt = builder.build();
    HTU.createTable(hdt, new byte[][] { f }, null);
    TABLE_NAME = hdt.getTableName();
    try (RegionLocator locator = HTU.getConnection().getRegionLocator(hdt.getTableName())) {
      hriPrimary = locator.getRegionLocation(row, false).getRegion();
    }

    // mock a secondary region info to open
    hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);

    // No master
    LOG.info("Master is going to be stopped");
    TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
    Configuration c = new Configuration(HTU.getConfiguration());
    c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
    LOG.info("Master has stopped");
  }

  @AfterClass
  public static void afterClass() throws Exception {
    HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
    HTU.shutdownMiniCluster();
  }

  @Before
  public void before() throws IOException {
    HTU.getConnection().clearRegionLocationCache();
    try {
      openRegion(hriPrimary);
    } catch (Exception ignored) {
    }
    try {
      openRegion(hriSecondary);
    } catch (Exception ignored) {
    }
    table = HTU.getConnection().getTable(TABLE_NAME);
  }

  @After
  public void after() throws IOException, KeeperException {
    try {
      closeRegion(hriSecondary);
    } catch (Exception ignored) {
    }
    try {
      closeRegion(hriPrimary);
    } catch (Exception ignored) {
    }
    HTU.getConnection().clearRegionLocationCache();
  }

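  /** Returns the single region server of the mini cluster. */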
  private HRegionServer getRS() {
    return HTU.getMiniHBaseCluster().getRegionServer(0);
  }

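  /**
   * Opens the given region (replica) by sending an OpenRegion RPC directly to the region server,
   * since the master has been stopped, then waits for the region server to have no regions in
   * transition.
   */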
  private void openRegion(RegionInfo hri) throws Exception {
    try {
      if (isRegionOpened(hri)) return;
    } catch (Exception e) {
    }
    // first version is '0'
    AdminProtos.OpenRegionRequest orr =
      RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, null);
    AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr);
    Assert.assertEquals(1, responseOpen.getOpeningStateCount());
    Assert.assertEquals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED,
      responseOpen.getOpeningState(0));
    checkRegionIsOpened(hri);
  }

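  /**
   * Closes the given region (replica) by sending a CloseRegion RPC directly to the region server
   * and checks that it is no longer available.
   */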
  private void closeRegion(RegionInfo hri) throws Exception {
    AdminProtos.CloseRegionRequest crr =
      ProtobufUtil.buildCloseRegionRequest(getRS().getServerName(), hri.getRegionName());
    AdminProtos.CloseRegionResponse responseClose =
      getRS().getRSRpcServices().closeRegion(null, crr);
    Assert.assertTrue(responseClose.getClosed());

    checkRegionIsClosed(hri.getEncodedName());
  }

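  /** Waits until the region server reports no regions in transition. */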
  private void checkRegionIsOpened(RegionInfo hri) throws Exception {
    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
      Thread.sleep(1);
    }
  }

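  /**
   * Returns whether the region is available on the region server; throws
   * NotServingRegionException if the region server does not serve it at all.
   */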
  private boolean isRegionOpened(RegionInfo hri) throws Exception {
    return getRS().getRegionByEncodedName(hri.getEncodedName()).isAvailable();
  }

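  /**
   * Waits until there are no regions in transition, then checks that the region is either not
   * available or not served at all.
   */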
  private void checkRegionIsClosed(String encodedRegionName) throws Exception {

    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
      Thread.sleep(1);
    }

    try {
      Assert.assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
    } catch (NotServingRegionException expected) {
      // That's how it works: if the region is closed we get an exception.
    }

    // We don't delete the znode here, because there is not always a znode.
  }

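  /** Flushes the given region, delegating to {@link TestRegionServerNoMaster#flushRegion}. */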
  private void flushRegion(RegionInfo regionInfo) throws IOException {
    TestRegionServerNoMaster.flushRegion(HTU, regionInfo);
  }

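  /**
   * A default (STRONG consistency) Get is served by the primary and must not be marked stale,
   * even when the secondary replica is open.
   */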
  @Test
  public void testUseRegionWithoutReplica() throws Exception {
    byte[] b1 = Bytes.toBytes("testUseRegionWithoutReplica");
    openRegion(hriSecondary);
    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(0));
    try {
      Get g = new Get(b1);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());
    } finally {
      closeRegion(hriSecondary);
    }
  }

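  /**
   * getRegionLocations should return both the primary and the secondary replica location, with
   * and without the client-side cache.
   */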
  @Test
  public void testLocations() throws Exception {
    byte[] b1 = Bytes.toBytes("testLocations");
    openRegion(hriSecondary);

    try (Connection conn = ConnectionFactory.createConnection(HTU.getConfiguration());
      RegionLocator locator = conn.getRegionLocator(TABLE_NAME)) {
      conn.clearRegionLocationCache();
      List<HRegionLocation> rl = locator.getRegionLocations(b1, true);
      Assert.assertEquals(2, rl.size());

      rl = locator.getRegionLocations(b1, false);
      Assert.assertEquals(2, rl.size());

      conn.clearRegionLocationCache();
      rl = locator.getRegionLocations(b1, false);
      Assert.assertEquals(2, rl.size());

      rl = locator.getRegionLocations(b1, true);
      Assert.assertEquals(2, rl.size());
    } finally {
      closeRegion(hriSecondary);
    }
  }

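  /** A Get for a row that does not exist still returns a non-stale (empty) result. */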
  @Test
  public void testGetNoResultNoStaleRegionWithReplica() throws Exception {
    byte[] b1 = Bytes.toBytes("testGetNoResultNoStaleRegionWithReplica");
    openRegion(hriSecondary);

    try {
      // A get works and is not stale
      Get g = new Get(b1);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());
    } finally {
      closeRegion(hriSecondary);
    }
  }

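  /**
   * With the primary blocked on a latch, a TIMELINE Get is answered by the secondary replica and
   * the result is marked stale.
   */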
  @Test
  public void testGetNoResultStaleRegionWithReplica() throws Exception {
    byte[] b1 = Bytes.toBytes("testGetNoResultStaleRegionWithReplica");
    openRegion(hriSecondary);

    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
    try {
      Get g = new Get(b1);
      g.setConsistency(Consistency.TIMELINE);
      Result r = table.get(g);
      Assert.assertTrue(r.isStale());
    } finally {
      SlowMeCopro.getPrimaryCdl().get().countDown();
      closeRegion(hriSecondary);
    }
  }

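  /**
   * Even when the primary is slowed down, a default (STRONG consistency) Get waits for the
   * primary and the result is not stale.
   */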
  @Test
  public void testGetNoResultNotStaleSleepRegionWithReplica() throws Exception {
    byte[] b1 = Bytes.toBytes("testGetNoResultNotStaleSleepRegionWithReplica");
    openRegion(hriSecondary);

    try {
      // We sleep, but we won't go to the stale replica as we don't request stale reads by default.
      SlowMeCopro.sleepTime.set(2000);
      Get g = new Get(b1);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());

    } finally {
      SlowMeCopro.sleepTime.set(0);
      closeRegion(hriSecondary);
    }
  }

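  /** Flushing both the primary and the secondary replica, before and after a Put, should work. */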
  @Test
  public void testFlushTable() throws Exception {
    openRegion(hriSecondary);
    try {
      flushRegion(hriPrimary);
      flushRegion(hriSecondary);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      flushRegion(hriPrimary);
      flushRegion(hriSecondary);
    } finally {
      Delete d = new Delete(row);
      table.delete(d);
      closeRegion(hriSecondary);
    }
  }

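  /** Flushing the primary replica, before and after a Put, should work. */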
  @Test
  public void testFlushPrimary() throws Exception {
    openRegion(hriSecondary);

    try {
      flushRegion(hriPrimary);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      flushRegion(hriPrimary);
    } finally {
      Delete d = new Delete(row);
      table.delete(d);
      closeRegion(hriSecondary);
    }
  }

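  /** Flushing the secondary replica, before and after a Put, should work. */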
  @Test
  public void testFlushSecondary() throws Exception {
    openRegion(hriSecondary);
    try {
      flushRegion(hriSecondary);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      flushRegion(hriSecondary);
    } catch (TableNotFoundException expected) {
    } finally {
      Delete d = new Delete(row);
      table.delete(d);
      closeRegion(hriSecondary);
    }
  }

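  /**
   * Exercises Get and existence checks with STRONG and TIMELINE consistency while the primary is
   * slowed down or blocked: stale reads go to the secondary, which only sees the Put once both
   * replicas have been flushed and the store files refreshed.
   */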
  @Test
  public void testUseRegionWithReplica() throws Exception {
    byte[] b1 = Bytes.toBytes("testUseRegionWithReplica");
    openRegion(hriSecondary);

    try {
      // A simple put works, even if there is a second replica
      Put p = new Put(b1);
      p.addColumn(f, b1, b1);
      table.put(p);
      LOG.info("Put done");

      // A get works and is not stale
      Get g = new Get(b1);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());
      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
      LOG.info("get works and is not stale done");

      // Even if we have to wait a little on the main region
      SlowMeCopro.sleepTime.set(2000);
      g = new Get(b1);
      r = table.get(g);
      Assert.assertFalse(r.isStale());
      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
      SlowMeCopro.sleepTime.set(0);
      LOG.info("sleep and is not stale done");

      // But if we ask for stale we will get it
      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
      Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
      SlowMeCopro.getPrimaryCdl().get().countDown();

      LOG.info("stale done");

      // exists works and is not stale
      g = new Get(b1);
      g.setCheckExistenceOnly(true);
      r = table.get(g);
      Assert.assertFalse(r.isStale());
      Assert.assertTrue(r.getExists());
      LOG.info("exists not stale done");

      // exists works on the stale replica but doesn't see the put
      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setCheckExistenceOnly(true);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
      Assert.assertFalse("The secondary has stale data", r.getExists());
      SlowMeCopro.getPrimaryCdl().get().countDown();
      LOG.info("exists stale before flush done");

      flushRegion(hriPrimary);
      flushRegion(hriSecondary);
      LOG.info("flush done");
      Thread.sleep(1000 + REFRESH_PERIOD * 2);

      // a TIMELINE get is stale but sees the put after the flush
      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
      Assert.assertFalse(r.isEmpty());
      SlowMeCopro.getPrimaryCdl().get().countDown();
      LOG.info("stale done");

      // exists works on the stale replica and we see the put after the flush
      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setCheckExistenceOnly(true);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
      Assert.assertTrue(r.getExists());
      SlowMeCopro.getPrimaryCdl().get().countDown();
      LOG.info("exists stale after flush done");

    } finally {
      SlowMeCopro.getPrimaryCdl().get().countDown();
      SlowMeCopro.sleepTime.set(0);
      Delete d = new Delete(b1);
      table.delete(d);
      closeRegion(hriSecondary);
    }
  }

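  /**
   * Checks the hedged read metrics: first a TIMELINE Get where the hedged read is triggered but
   * the primary still answers first, then one where the primary is blocked and the hedged read to
   * the secondary wins.
   */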
  @Test
  public void testHedgedRead() throws Exception {
    byte[] b1 = Bytes.toBytes("testHedgedRead");
    openRegion(hriSecondary);

    try {
      // A simple put works, even if there is a second replica
      Put p = new Put(b1);
      p.addColumn(f, b1, b1);
      table.put(p);
      LOG.info("Put done");

      // A get works and is not stale
      Get g = new Get(b1);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());
      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
      LOG.info("get works and is not stale done");

      // reset the hedged read metrics
      AsyncConnectionImpl conn = (AsyncConnectionImpl) HTU.getConnection().toAsyncConnection();
      Counter hedgedReadOps = conn.getConnectionMetrics().get().getHedgedReadOps();
      Counter hedgedReadWin = conn.getConnectionMetrics().get().getHedgedReadWin();
      hedgedReadOps.dec(hedgedReadOps.getCount());
      hedgedReadWin.dec(hedgedReadWin.getCount());

      // Wait a little on the main region, just long enough for the hedged read to be triggered
      // once, but without letting the hedged read return faster
      long primaryCallTimeoutNs = conn.connConf.getPrimaryCallTimeoutNs();
      // The resolution of our timer is 10ms, so we need to sleep a bit more otherwise we may not
      // trigger the hedged read...
      SlowMeCopro.sleepTime.set(TimeUnit.NANOSECONDS.toMillis(primaryCallTimeoutNs) + 100);
      SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertFalse(r.isStale());
      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
      Assert.assertEquals(1, hedgedReadOps.getCount());
      Assert.assertEquals(0, hedgedReadWin.getCount());
      SlowMeCopro.sleepTime.set(0);
      SlowMeCopro.getSecondaryCdl().get().countDown();
      LOG.info("hedged read occurred but not faster");

      // But if we ask for stale we will get it, and the hedged read returns faster
      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
      Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
      Assert.assertEquals(2, hedgedReadOps.getCount());
      // The metrics are updated after the request finishes, so we use waitFor here; asserting
      // directly may fail if we check too fast.
      HTU.waitFor(10000, () -> hedgedReadWin.getCount() == 1);
      SlowMeCopro.getPrimaryCdl().get().countDown();
      LOG.info("hedged read occurred and faster");

    } finally {
      SlowMeCopro.getPrimaryCdl().get().countDown();
      SlowMeCopro.getSecondaryCdl().get().countDown();
      SlowMeCopro.sleepTime.set(0);
      Delete d = new Delete(b1);
      table.delete(d);
      closeRegion(hriSecondary);
    }
  }
}