/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ LargeTests.class, ClientTests.class })
public class TestReplicaWithCluster {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicaWithCluster.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicaWithCluster.class);

  private static final int NB_SERVERS = 3;
  private static final byte[] row = TestReplicaWithCluster.class.getName().getBytes();
  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();

  // second minicluster used in testing of replication
  private static HBaseTestingUtility HTU2;
  private static final byte[] f = HConstants.CATALOG_FAMILY;

  private final static int REFRESH_PERIOD = 1000;
  private final static int META_SCAN_TIMEOUT_IN_MILLISEC = 200;

  @Rule
  public TestName name = new TestName();

  /**
   * This copro is used to synchronize the tests.
   */
  public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
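    // Test control knobs: a positive sleepTime makes the primary replica sleep in preGetOp, while
    // a latch with a positive count blocks primary Gets until a test counts it down. Both only
    // affect replica id 0, so TIMELINE reads can fall back to the secondary replicas.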
    static final AtomicLong sleepTime = new AtomicLong(0);
    static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(new CountDownLatch(0));

    public SlowMeCopro() {
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get,
      final List<Cell> results) throws IOException {

      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
        CountDownLatch latch = cdl.get();
        try {
          if (sleepTime.get() > 0) {
            LOG.info("Sleeping for " + sleepTime.get() + " ms");
            Thread.sleep(sleepTime.get());
          } else if (latch.getCount() > 0) {
            LOG.info("Waiting for the CountDownLatch");
            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
            if (latch.getCount() > 0) {
              throw new RuntimeException("Can't wait more");
            }
          }
        } catch (InterruptedException e1) {
          LOG.error(e1.toString(), e1);
        }
      } else {
        LOG.info("We're not the primary replica.");
      }
    }
  }

  /**
   * This copro is used to simulate a region-server-down exception for Get and Scan.
   */
  @CoreCoprocessor
  public static class RegionServerStoppedCopro implements RegionCoprocessor, RegionObserver {

    public RegionServerStoppedCopro() {
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get,
      final List<Cell> results) throws IOException {

      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();

      // Fail for the primary replica and replica 1
      if (replicaId <= 1) {
        LOG.info("Throw RegionServerStoppedException for replica id " + replicaId);
        throw new RegionServerStoppedException(
          "Server " + e.getEnvironment().getServerName() + " not running");
      } else {
        LOG.info("We're replica region " + replicaId);
      }
    }

    @Override
    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
      final Scan scan) throws IOException {
      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
      // Fail for the primary replica and replica 1
      if (replicaId <= 1) {
        LOG.info("Throw RegionServerStoppedException for replica id " + replicaId);
        throw new RegionServerStoppedException(
          "Server " + e.getEnvironment().getServerName() + " not running");
      } else {
        LOG.info("We're replica region " + replicaId);
      }
    }
  }

  /**
   * This copro is used to slow down the primary meta region scan a bit, and, when throwException
   * is set, to fail Gets on primary user regions and Scans on the primary meta region.
   */
  public static class RegionServerHostingPrimayMetaRegionSlowOrStopCopro
    implements RegionCoprocessor, RegionObserver {
    static boolean slowDownPrimaryMetaScan = false;
    static boolean throwException = false;

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get,
      final List<Cell> results) throws IOException {

      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();

      // Fail for the primary replica, but not for meta
      if (throwException) {
        if (!e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
          LOG.info("Get, throw RegionServerStoppedException for region "
            + e.getEnvironment().getRegion().getRegionInfo());
          throw new RegionServerStoppedException(
            "Server " + e.getEnvironment().getServerName() + " not running");
        }
      } else {
        LOG.info("Get, we're replica region " + replicaId);
      }
    }

    @Override
    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
      final Scan scan) throws IOException {

      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();

      // Slow down the primary meta region scan
      if (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
        if (slowDownPrimaryMetaScan) {
          LOG.info("Scan with primary meta region, slow down a bit");
          try {
            Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50);
          } catch (InterruptedException ie) {
            // Ignore
          }
        }

        // Fail for the primary replica
        if (throwException) {
          LOG.info("Scan, throw RegionServerStoppedException for replica "
            + e.getEnvironment().getRegion().getRegionInfo());

          throw new RegionServerStoppedException(
            "Server " + e.getEnvironment().getServerName() + " not running");
        } else {
          LOG.info("Scan, we're replica region " + replicaId);
        }
      } else {
        LOG.info("Scan, we're replica region " + replicaId);
      }
    }
  }

  @BeforeClass
  public static void beforeClass() throws Exception {
    // enable store file refreshing
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
      REFRESH_PERIOD);

    HTU.getConfiguration().setFloat("hbase.regionserver.logroll.multiplier", 0.0001f);
    HTU.getConfiguration().setInt("replication.source.size.capacity", 10240);
    HTU.getConfiguration().setLong("replication.source.sleepforretries", 100);
    HTU.getConfiguration().setInt("hbase.regionserver.maxlogs", 2);
    HTU.getConfiguration().setLong("hbase.master.logcleaner.ttl", 10);
    HTU.getConfiguration().setInt("zookeeper.recovery.retry", 1);
    HTU.getConfiguration().setInt("zookeeper.recovery.retry.intervalmill", 10);

    // Wait longer for the primary call, so the client is sure to get the exception from the
    // primary call instead of falling back to the replicas first
    HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.get", 1000000);
    HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.scan", 1000000);

    // Make sure master does not host system tables.
    HTU.getConfiguration().set("hbase.balancer.tablesOnMaster", "none");

    // Set system coprocessor so it can be applied to meta regions
    HTU.getConfiguration().set("hbase.coprocessor.region.classes",
      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.class.getName());

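    // Note: this timeout config is expected to be in microseconds, hence the conversion from the
    // millisecond constant above.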
    HTU.getConfiguration().setInt(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
      META_SCAN_TIMEOUT_IN_MILLISEC * 1000);

    HTU.startMiniCluster(NB_SERVERS);
    // Enable meta replica at server side
    HBaseTestingUtility.setReplicas(HTU.getAdmin(), TableName.META_TABLE_NAME, 2);

    HTU.getHBaseCluster().startMaster();
  }

  @AfterClass
  public static void afterClass() throws Exception {
    if (HTU2 != null) HTU2.shutdownMiniCluster();
    HTU.shutdownMiniCluster();
  }

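  /**
   * Verifies basic create/read/delete on a table with region replicas: a strongly consistent get
   * is served by the primary (not stale), while a TIMELINE get issued while the primary is blocked
   * by SlowMeCopro is answered by a secondary replica and flagged as stale.
   */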
  @Test
  public void testCreateDeleteTable() throws IOException {
    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(SlowMeCopro.class.getName());
    Table table = HTU.createTable(hdt, new byte[][] { f }, null);

    Put p = new Put(row);
    p.addColumn(f, row, row);
    table.put(p);

    Get g = new Get(row);
    Result r = table.get(g);
    Assert.assertFalse(r.isStale());

    try {
      // But if we ask for stale we will get it
      SlowMeCopro.cdl.set(new CountDownLatch(1));
      g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
      SlowMeCopro.cdl.get().countDown();
    } finally {
      SlowMeCopro.cdl.get().countDown();
      SlowMeCopro.sleepTime.set(0);
    }

    HTU.getAdmin().disableTable(hdt.getTableName());
    HTU.deleteTable(hdt.getTableName());
  }

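  /**
   * Verifies that altering a replicated table (adding a column family while the table is disabled)
   * keeps both strongly consistent and TIMELINE reads working afterwards.
   */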
  @Test
  public void testChangeTable() throws Exception {
    TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
      .setRegionReplication(NB_SERVERS).setCoprocessor(SlowMeCopro.class.getName())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f)).build();
    HTU.getAdmin().createTable(td);
    Table table = HTU.getConnection().getTable(td.getTableName());
    // basic test: it should work.
    Put p = new Put(row);
    p.addColumn(f, row, row);
    table.put(p);

    Get g = new Get(row);
    Result r = table.get(g);
    Assert.assertFalse(r.isStale());

    // Add a CF, it should work.
    TableDescriptor bHdt = HTU.getAdmin().getDescriptor(td.getTableName());
    td = TableDescriptorBuilder.newBuilder(td)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(row)).build();
    HTU.getAdmin().disableTable(td.getTableName());
    HTU.getAdmin().modifyTable(td);
    HTU.getAdmin().enableTable(td.getTableName());
    TableDescriptor nHdt = HTU.getAdmin().getDescriptor(td.getTableName());
    Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
      bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount());

    p = new Put(row);
    p.addColumn(row, row, row);
    table.put(p);

    g = new Get(row);
    r = table.get(g);
    Assert.assertFalse(r.isStale());

    try {
      SlowMeCopro.cdl.set(new CountDownLatch(1));
      g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
    } finally {
      SlowMeCopro.cdl.get().countDown();
      SlowMeCopro.sleepTime.set(0);
    }

    Admin admin = HTU.getAdmin();
    nHdt = admin.getDescriptor(td.getTableName());
    Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
      bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount());

    admin.disableTable(td.getTableName());
    admin.deleteTable(td.getTableName());
    admin.close();
  }

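  /**
   * Combines region replicas with cross-cluster replication: the table is created on both mini
   * clusters, cluster 2 is added as a replication peer of cluster 1, and TIMELINE gets are then
   * verified to return stale results on each cluster once the row is available there.
   */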
  @SuppressWarnings("deprecation")
  @Test
  public void testReplicaAndReplication() throws Exception {
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);

    HColumnDescriptor fam = new HColumnDescriptor(row);
    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    hdt.addFamily(fam);

    hdt.addCoprocessor(SlowMeCopro.class.getName());
    HTU.getAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);

    Configuration conf2 = HBaseConfiguration.create(HTU.getConfiguration());
    conf2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
    MiniZooKeeperCluster miniZK = HTU.getZkCluster();

    HTU2 = new HBaseTestingUtility(conf2);
    HTU2.setZkCluster(miniZK);
    HTU2.startMiniCluster(NB_SERVERS);
    LOG.info("Setup second Zk");
    HTU2.getAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);

    ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());

    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(HTU2.getClusterKey());
    admin.addPeer("2", rpc, null);
    admin.close();

    Put p = new Put(row);
    p.addColumn(row, row, row);
    final Table table = HTU.getConnection().getTable(hdt.getTableName());
    table.put(p);

    HTU.getAdmin().flush(table.getName());
    LOG.info("Put & flush done on the first cluster. Now doing a get on the same cluster.");

    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        try {
          SlowMeCopro.cdl.set(new CountDownLatch(1));
          Get g = new Get(row);
          g.setConsistency(Consistency.TIMELINE);
          Result r = table.get(g);
          Assert.assertTrue(r.isStale());
          return !r.isEmpty();
        } finally {
          SlowMeCopro.cdl.get().countDown();
          SlowMeCopro.sleepTime.set(0);
        }
      }
    });
    table.close();
    LOG.info("Stale get on the first cluster done. Now for the second.");

    final Table table2 = HTU2.getConnection().getTable(hdt.getTableName());
    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        try {
          SlowMeCopro.cdl.set(new CountDownLatch(1));
          Get g = new Get(row);
          g.setConsistency(Consistency.TIMELINE);
          Result r = table2.get(g);
          Assert.assertTrue(r.isStale());
          return !r.isEmpty();
        } finally {
          SlowMeCopro.cdl.get().countDown();
          SlowMeCopro.sleepTime.set(0);
        }
      }
    });
    table2.close();

    HTU.getAdmin().disableTable(hdt.getTableName());
    HTU.deleteTable(hdt.getTableName());

    HTU2.getAdmin().disableTable(hdt.getTableName());
    HTU2.deleteTable(hdt.getTableName());

    // We shut down the HTU2 mini cluster later, in afterClass(), because shutting down the
    // mini cluster has the negative side effect of closing all HConnections in the JVM.
  }

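  /**
   * Verifies that secure-bulk-loaded HFiles can be read through the primary with strong
   * consistency, and that TIMELINE gets return stale-flagged results while the primary is blocked
   * by SlowMeCopro.
   */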
  @Test
  public void testBulkLoad() throws IOException {
    // Create table then get the single region for our new table.
    LOG.debug("Creating test table");
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(SlowMeCopro.class.getName());
    Table table = HTU.createTable(hdt, new byte[][] { f }, null);

    // create hfiles to load.
    LOG.debug("Creating test data");
    Path dir = HTU.getDataTestDirOnTestFS(name.getMethodName());
    final int numRows = 10;
    final byte[] qual = Bytes.toBytes("qual");
    final byte[] val = Bytes.toBytes("val");
    final List<Pair<byte[], String>> famPaths = new ArrayList<>();
    for (HColumnDescriptor col : hdt.getColumnFamilies()) {
      Path hfile = new Path(dir, col.getNameAsString());
      TestHRegionServerBulkLoad.createHFile(HTU.getTestFileSystem(), hfile, col.getName(), qual,
        val, numRows);
      famPaths.add(new Pair<>(col.getName(), hfile.toString()));
    }

    // bulk load HFiles
    LOG.debug("Loading test data");
    final ClusterConnection conn = (ClusterConnection) HTU.getAdmin().getConnection();
    table = conn.getTable(hdt.getTableName());
    final String bulkToken =
      new SecureBulkLoadClient(HTU.getConfiguration(), table).prepareBulkLoad(conn);
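    // The callable below issues the region-level secure bulk load RPC against the region holding
    // rowkey(0), reusing the bulk token obtained from prepareBulkLoad() above.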
    ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn, hdt.getTableName(),
      TestHRegionServerBulkLoad.rowkey(0),
      new RpcControllerFactory(HTU.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
      @Override
      protected Void rpcCall() throws Exception {
        LOG.debug("Going to connect to server " + getLocation() + " for row "
          + Bytes.toStringBinary(getRow()));
        SecureBulkLoadClient secureClient = null;
        byte[] regionName = getLocation().getRegionInfo().getRegionName();
        try (Table table = conn.getTable(getTableName())) {
          secureClient = new SecureBulkLoadClient(HTU.getConfiguration(), table);
          secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, true, null, bulkToken);
        }
        return null;
      }
    };
    RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(HTU.getConfiguration());
    RpcRetryingCaller<Void> caller = factory.newCaller();
    caller.callWithRetries(callable, 10000);

    // verify we can read them from the primary
    LOG.debug("Verifying data load");
    for (int i = 0; i < numRows; i++) {
      byte[] row = TestHRegionServerBulkLoad.rowkey(i);
      Get g = new Get(row);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());
    }

    // verify we can read them from the replica
    LOG.debug("Verifying replica queries");
    try {
      SlowMeCopro.cdl.set(new CountDownLatch(1));
      for (int i = 0; i < numRows; i++) {
        byte[] row = TestHRegionServerBulkLoad.rowkey(i);
        Get g = new Get(row);
        g.setConsistency(Consistency.TIMELINE);
        Result r = table.get(g);
        Assert.assertTrue(r.isStale());
      }
      SlowMeCopro.cdl.get().countDown();
    } finally {
      SlowMeCopro.cdl.get().countDown();
      SlowMeCopro.sleepTime.set(0);
    }

    HTU.getAdmin().disableTable(hdt.getTableName());
    HTU.deleteTable(hdt.getTableName());
  }

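  /**
   * Verifies that a TIMELINE get still succeeds (with a stale result) when both the primary and
   * replica 1 reject requests via RegionServerStoppedCopro, leaving only replica 2 to answer.
   */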
  @Test
  public void testReplicaGetWithPrimaryDown() throws IOException {
    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());
    try {
      Table table = HTU.createTable(hdt, new byte[][] { f }, null);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }

      // The primary and replica 1 throw an exception, so the TIMELINE get is answered by
      // replica 2 and the result is stale
      Get g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      Result r = table.get(g);
      Assert.assertTrue(r.isStale());
    } finally {
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }

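  /**
   * Same as testReplicaGetWithPrimaryDown, but for scans: with the primary and replica 1 failing,
   * a TIMELINE scan is served by replica 2 and returns a stale result.
   */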
  @Test
  public void testReplicaScanWithPrimaryDown() throws IOException {
    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());

    try {
      Table table = HTU.createTable(hdt, new byte[][] { f }, null);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }

      // The primary and replica 1 throw an exception, so the TIMELINE scan is answered by
      // replica 2 and the result is stale
      Scan scan = new Scan();

      // Scanning the required columns
      scan.addFamily(f);
      scan.setConsistency(Consistency.TIMELINE);

      // Getting the scan result
      ResultScanner scanner = table.getScanner(scan);

      Result r = scanner.next();

      Assert.assertTrue(r.isStale());
    } finally {
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }

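  /**
   * Same stale-get scenario as testCreateDeleteTable, but exercised through a connection
   * configured to use the AsyncRpcClient implementation with a specific thread for writing.
   */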
  @Test
  public void testReplicaGetWithAsyncRpcClientImpl() throws IOException {
    HTU.getConfiguration().setBoolean("hbase.ipc.client.specificThreadForWriting", true);
    HTU.getConfiguration().set("hbase.rpc.client.impl",
      "org.apache.hadoop.hbase.ipc.AsyncRpcClient");
    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(SlowMeCopro.class.getName());

    try {
      Table table = HTU.createTable(hdt, new byte[][] { f }, null);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }

      try {
        // Create the new connection so new config can kick in
        Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Table t = connection.getTable(hdt.getTableName());

        // But if we ask for stale we will get it
        SlowMeCopro.cdl.set(new CountDownLatch(1));
        Get g = new Get(row);
        g.setConsistency(Consistency.TIMELINE);
        Result r = t.get(g);
        Assert.assertTrue(r.isStale());
        SlowMeCopro.cdl.get().countDown();
      } finally {
        SlowMeCopro.cdl.get().countDown();
        SlowMeCopro.sleepTime.set(0);
      }
    } finally {
      HTU.getConfiguration().unset("hbase.ipc.client.specificThreadForWriting");
      HTU.getConfiguration().unset("hbase.rpc.client.impl");
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }

  // This test verifies that, when hbase.client.metaReplicaCallTimeout.scan is configured, a meta
  // table scan always gets its result from the primary meta region, as long as the primary
  // returns within the configured timeout.
  @Test
  public void testGetRegionLocationFromPrimaryMetaRegion()
    throws IOException, InterruptedException {
    HTU.getAdmin().setBalancerRunning(false, true);
    Configuration conf = new Configuration(HTU.getConfiguration());
    conf.setBoolean(USE_META_REPLICAS, true);
    Connection conn = ConnectionFactory.createConnection(conf);

    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(2);
    try {

      HTU.createTable(hdt, new byte[][] { f }, null);

      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = true;

      // Get the user table location; it should always come from the primary meta replica
      RegionLocations url =
        ((ClusterConnection) conn).locateRegion(hdt.getTableName(), row, false, false);

    } finally {
      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = false;
      HTU.getAdmin().setBalancerRunning(true, true);
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }

  // This test simulates the case where the primary meta region and the primary user region are
  // down: the HBase client should still be able to access the user replica regions and return
  // stale data. Meta replicas are enabled to show that a meta replica region can be out of sync
  // with the primary meta region.
  @Test
  public void testReplicaGetWithPrimaryAndMetaDown() throws IOException, InterruptedException {
    HTU.getAdmin().setBalancerRunning(false, true);

    Configuration conf = new Configuration(HTU.getConfiguration());
    conf.setBoolean(USE_META_REPLICAS, true);
    Connection conn = ConnectionFactory.createConnection(conf);

    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(2);
    try {

      HTU.createTable(hdt, new byte[][] { f }, null);
      Table table = conn.getTable(TableName.valueOf(name.getMethodName()));

      // Get Meta location
      RegionLocations mrl = ((ClusterConnection) conn).locateRegion(TableName.META_TABLE_NAME,
        HConstants.EMPTY_START_ROW, false, false);

      // Get user table location
      RegionLocations url =
        ((ClusterConnection) conn).locateRegion(hdt.getTableName(), row, false, false);

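      // Arrange the regions so that the primary user region is co-hosted with the primary meta
      // region while the user replica region lives on a different server. When the coprocessor
      // later throws RegionServerStoppedException, both the primary meta and the primary user
      // region appear down, but the user replica region stays reachable.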
      // Make sure that the primary user region is co-hosted with the primary meta region
      if (
        !url.getDefaultRegionLocation().getServerName()
          .equals(mrl.getDefaultRegionLocation().getServerName())
      ) {
        HTU.moveRegionAndWait(url.getDefaultRegionLocation().getRegionInfo(),
          mrl.getDefaultRegionLocation().getServerName());
      }

      // Make sure that the user replica region is not hosted by the same region server as the
      // primary
      if (
        url.getRegionLocation(1).getServerName()
          .equals(mrl.getDefaultRegionLocation().getServerName())
      ) {
        HTU.moveRegionAndWait(url.getRegionLocation(1).getRegionInfo(),
          url.getDefaultRegionLocation().getServerName());
      }

      // Wait until the meta table is updated with the new location info
      while (true) {
        mrl = ((ClusterConnection) conn).locateRegion(TableName.META_TABLE_NAME,
          HConstants.EMPTY_START_ROW, false, false);

        // Get user table location
        url = ((ClusterConnection) conn).locateRegion(hdt.getTableName(), row, false, true);

        LOG.info("meta locations " + mrl);
        LOG.info("table locations " + url);
        ServerName a = url.getDefaultRegionLocation().getServerName();
        ServerName b = mrl.getDefaultRegionLocation().getServerName();
        if (a.equals(b)) {
          break;
        } else {
          LOG.info("Waiting for new region info to be updated in meta table");
          Thread.sleep(100);
        }
      }

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }

      // Simulate the region server being down
      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = true;

      // The first Get is supposed to succeed
      Get g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      Result r = table.get(g);
      Assert.assertTrue(r.isStale());

      // The second Get will succeed as well
      r = table.get(g);
      Assert.assertTrue(r.isStale());
    } finally {
      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = false;
      HTU.getAdmin().setBalancerRunning(true, true);
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }
}