/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

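/**
 * Tests of region replica reads against a running mini cluster: TIMELINE-consistency Gets and
 * Scans, behaviour when the primary user replica and/or the primary meta region is unavailable,
 * interaction with cross-cluster replication, and bulk load into a table with region replicas.
 * The coprocessors declared below deliberately stall or fail specific replicas so that each
 * scenario can be reproduced deterministically.
 */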
@Category({LargeTests.class, ClientTests.class})
public class TestReplicaWithCluster {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicaWithCluster.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicaWithCluster.class);

  private static final int NB_SERVERS = 3;
  private static final byte[] row = TestReplicaWithCluster.class.getName().getBytes();
  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();

  // Second mini cluster, used for the replication test.
  private static HBaseTestingUtility HTU2;
  private static final byte[] f = HConstants.CATALOG_FAMILY;

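  // REFRESH_PERIOD drives the store file refresher chore (see beforeClass()); the tests sleep
  // for 2 * REFRESH_PERIOD before expecting flushed data to be visible on secondary replicas.
  // META_SCAN_TIMEOUT_IN_MILLISEC bounds the primary meta scan: the slow-down coprocessor below
  // sleeps just under it, and beforeClass() scales it by 1000 when setting
  // HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT.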
  private final static int REFRESH_PERIOD = 1000;
  private final static int META_SCAN_TIMEOUT_IN_MILLISEC = 200;

  @Rule
  public TestName name = new TestName();

  /**
   * This copro is used to synchronize the tests: it can either sleep on, or block, Gets against
   * the primary replica (replica id 0), so that TIMELINE reads have to fall back to the
   * secondary replicas.
   */
  public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
    static final AtomicLong sleepTime = new AtomicLong(0);
    static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(new CountDownLatch(0));

    public SlowMeCopro() {
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
                         final Get get, final List<Cell> results) throws IOException {

      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
        CountDownLatch latch = cdl.get();
        try {
          if (sleepTime.get() > 0) {
            LOG.info("Sleeping for " + sleepTime.get() + " ms");
            Thread.sleep(sleepTime.get());
          } else if (latch.getCount() > 0) {
            LOG.info("Waiting for the CountDownLatch");
            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
            if (latch.getCount() > 0) {
              throw new RuntimeException("Can't wait more");
            }
          }
        } catch (InterruptedException e1) {
          LOG.error(e1.toString(), e1);
        }
      } else {
        LOG.info("We're not the primary replica.");
      }
    }
  }

  /**
   * This copro is used to simulate a region server stopped exception for Get and Scan requests
   * against the primary replica and replica 1, so reads can only be served by replica 2 and are
   * therefore always stale.
   */
  @CoreCoprocessor
  public static class RegionServerStoppedCopro implements RegionCoprocessor, RegionObserver {

    public RegionServerStoppedCopro() {
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Get get, final List<Cell> results) throws IOException {

      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();

      // Fail for the primary replica and replica 1
      if (replicaId <= 1) {
        LOG.info("Throw RegionServerStoppedException for replica id " + replicaId);
        throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName()
            + " not running");
      } else {
        LOG.info("We're replica region " + replicaId);
      }
    }

    @Override
    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Scan scan) throws IOException {
      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
      // Fail for the primary replica and replica 1
      if (replicaId <= 1) {
        LOG.info("Throw RegionServerStoppedException for replica id " + replicaId);
        throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName()
            + " not running");
      } else {
        LOG.info("We're replica region " + replicaId);
      }
    }
  }

  /**
   * This copro is used to slow down or fail the primary meta region scan, and to fail Gets
   * against primary user regions, depending on the flags below.
   */
  public static class RegionServerHostingPrimayMetaRegionSlowOrStopCopro
      implements RegionCoprocessor, RegionObserver {
    static boolean slowDownPrimaryMetaScan = false;
    static boolean throwException = false;

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Get get, final List<Cell> results) throws IOException {

      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();

      // Fail for the primary replica, but not for meta
      if (throwException) {
        if (!e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
          LOG.info("Get, throw RegionServerStoppedException for region " + e.getEnvironment()
              .getRegion().getRegionInfo());
          throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName()
              + " not running");
        }
      } else {
        LOG.info("Get, we're replica region " + replicaId);
      }
    }

    @Override
    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Scan scan) throws IOException {

      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();

      // Slow down the primary meta region scan
      if (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
        if (slowDownPrimaryMetaScan) {
          LOG.info("Scan with primary meta region, slow down a bit");
          try {
            Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50);
          } catch (InterruptedException ie) {
            // Ignore
          }
        }

        // Fail for the primary replica
        if (throwException) {
          LOG.info("Scan, throw RegionServerStoppedException for region " + e.getEnvironment()
              .getRegion().getRegionInfo());

          throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName()
              + " not running");
        } else {
          LOG.info("Scan, we're replica region " + replicaId);
        }
      } else {
        LOG.info("Scan, we're replica region " + replicaId);
      }
    }
  }

  @BeforeClass
  public static void beforeClass() throws Exception {
    // Enable store file refreshing
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
        REFRESH_PERIOD);

    HTU.getConfiguration().setFloat("hbase.regionserver.logroll.multiplier", 0.0001f);
    HTU.getConfiguration().setInt("replication.source.size.capacity", 10240);
    HTU.getConfiguration().setLong("replication.source.sleepforretries", 100);
    HTU.getConfiguration().setInt("hbase.regionserver.maxlogs", 2);
    HTU.getConfiguration().setLong("hbase.master.logcleaner.ttl", 10);
    HTU.getConfiguration().setInt("zookeeper.recovery.retry", 1);
    HTU.getConfiguration().setInt("zookeeper.recovery.retry.intervalmill", 10);

    // Wait longer on the primary call, to make sure the client gets the exception from the
    // primary rather than switching to a replica first.
    HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.get", 1000000);
    HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.scan", 1000000);

    // Make sure the master does not host system tables.
    HTU.getConfiguration().set("hbase.balancer.tablesOnMaster", "none");

    // Set a system coprocessor so it is also applied to meta regions
    HTU.getConfiguration().set("hbase.coprocessor.region.classes",
        RegionServerHostingPrimayMetaRegionSlowOrStopCopro.class.getName());

    HTU.getConfiguration().setInt(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
        META_SCAN_TIMEOUT_IN_MILLISEC * 1000);

    HTU.startMiniCluster(NB_SERVERS);
    // Enable meta replicas on the server side
    HBaseTestingUtility.setReplicas(HTU.getAdmin(), TableName.META_TABLE_NAME, 2);

    HTU.getHBaseCluster().startMaster();
  }

  @AfterClass
  public static void afterClass() throws Exception {
    if (HTU2 != null) {
      HTU2.shutdownMiniCluster();
    }
    HTU.shutdownMiniCluster();
  }

  @Test
  public void testCreateDeleteTable() throws IOException {
    // Create a table with region replicas.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(SlowMeCopro.class.getName());
    Table table = HTU.createTable(hdt, new byte[][]{f}, null);

    Put p = new Put(row);
    p.addColumn(f, row, row);
    table.put(p);

    Get g = new Get(row);
    Result r = table.get(g);
    Assert.assertFalse(r.isStale());

    try {
      // But if we ask for stale we will get it
      SlowMeCopro.cdl.set(new CountDownLatch(1));
      g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
      SlowMeCopro.cdl.get().countDown();
    } finally {
      SlowMeCopro.cdl.get().countDown();
      SlowMeCopro.sleepTime.set(0);
    }

    HTU.getAdmin().disableTable(hdt.getTableName());
    HTU.deleteTable(hdt.getTableName());
  }

  @Test
  public void testChangeTable() throws Exception {
    TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
            .setRegionReplication(NB_SERVERS)
            .setCoprocessor(SlowMeCopro.class.getName())
            .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f))
            .build();
    HTU.getAdmin().createTable(td);
    Table table = HTU.getConnection().getTable(td.getTableName());
    // basic test: it should work.
    Put p = new Put(row);
    p.addColumn(f, row, row);
    table.put(p);

    Get g = new Get(row);
    Result r = table.get(g);
    Assert.assertFalse(r.isStale());

    // Add a CF, it should work.
    TableDescriptor bHdt = HTU.getAdmin().getDescriptor(td.getTableName());
    td = TableDescriptorBuilder.newBuilder(td)
            .setColumnFamily(ColumnFamilyDescriptorBuilder.of(row))
            .build();
    HTU.getAdmin().disableTable(td.getTableName());
    HTU.getAdmin().modifyTable(td);
    HTU.getAdmin().enableTable(td.getTableName());
    TableDescriptor nHdt = HTU.getAdmin().getDescriptor(td.getTableName());
    Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
        bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount());

    p = new Put(row);
    p.addColumn(row, row, row);
    table.put(p);

    g = new Get(row);
    r = table.get(g);
    Assert.assertFalse(r.isStale());

    try {
      SlowMeCopro.cdl.set(new CountDownLatch(1));
      g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
    } finally {
      SlowMeCopro.cdl.get().countDown();
      SlowMeCopro.sleepTime.set(0);
    }

    Admin admin = HTU.getAdmin();
    nHdt = admin.getDescriptor(td.getTableName());
    Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
        bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount());

    admin.disableTable(td.getTableName());
    admin.deleteTable(td.getTableName());
    admin.close();
  }

  @SuppressWarnings("deprecation")
  @Test
  public void testReplicaAndReplication() throws Exception {
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);

    HColumnDescriptor fam = new HColumnDescriptor(row);
    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    hdt.addFamily(fam);

    hdt.addCoprocessor(SlowMeCopro.class.getName());
    HTU.getAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);

    Configuration conf2 = HBaseConfiguration.create(HTU.getConfiguration());
    conf2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
    MiniZooKeeperCluster miniZK = HTU.getZkCluster();

    HTU2 = new HBaseTestingUtility(conf2);
    HTU2.setZkCluster(miniZK);
    HTU2.startMiniCluster(NB_SERVERS);
    LOG.info("Set up second cluster, sharing the first cluster's ZooKeeper");
    HTU2.getAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);

    ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());

    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(HTU2.getClusterKey());
    admin.addPeer("2", rpc, null);
    admin.close();

    Put p = new Put(row);
    p.addColumn(row, row, row);
    final Table table = HTU.getConnection().getTable(hdt.getTableName());
    table.put(p);

    HTU.getAdmin().flush(table.getName());
    LOG.info("Put & flush done on the first cluster. Now doing a get on the same cluster.");

    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
      @Override public boolean evaluate() throws Exception {
        try {
          SlowMeCopro.cdl.set(new CountDownLatch(1));
          Get g = new Get(row);
          g.setConsistency(Consistency.TIMELINE);
          Result r = table.get(g);
          Assert.assertTrue(r.isStale());
          return !r.isEmpty();
        } finally {
          SlowMeCopro.cdl.get().countDown();
          SlowMeCopro.sleepTime.set(0);
        }
      }
    });
    table.close();
    LOG.info("stale get on the first cluster done. Now for the second.");

    final Table table2 = HTU2.getConnection().getTable(hdt.getTableName());
    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
      @Override public boolean evaluate() throws Exception {
        try {
          SlowMeCopro.cdl.set(new CountDownLatch(1));
          Get g = new Get(row);
          g.setConsistency(Consistency.TIMELINE);
          Result r = table2.get(g);
          Assert.assertTrue(r.isStale());
          return !r.isEmpty();
        } finally {
          SlowMeCopro.cdl.get().countDown();
          SlowMeCopro.sleepTime.set(0);
        }
      }
    });
    table2.close();

    HTU.getAdmin().disableTable(hdt.getTableName());
    HTU.deleteTable(hdt.getTableName());

    HTU2.getAdmin().disableTable(hdt.getTableName());
    HTU2.deleteTable(hdt.getTableName());

    // We shut down the HTU2 minicluster later, in afterClass(), because shutting it down here
    // would also close all HConnections in the JVM and disturb the remaining tests.
  }

  @Test
  public void testBulkLoad() throws IOException {
    // Create a table with region replicas.
    LOG.debug("Creating test table");
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(SlowMeCopro.class.getName());
    Table table = HTU.createTable(hdt, new byte[][]{f}, null);

    // Create HFiles to load.
    LOG.debug("Creating test data");
    Path dir = HTU.getDataTestDirOnTestFS(name.getMethodName());
    final int numRows = 10;
    final byte[] qual = Bytes.toBytes("qual");
    final byte[] val  = Bytes.toBytes("val");
    final List<Pair<byte[], String>> famPaths = new ArrayList<>();
    for (HColumnDescriptor col : hdt.getColumnFamilies()) {
      Path hfile = new Path(dir, col.getNameAsString());
      TestHRegionServerBulkLoad.createHFile(HTU.getTestFileSystem(), hfile, col.getName(),
        qual, val, numRows);
      famPaths.add(new Pair<>(col.getName(), hfile.toString()));
    }

    // Bulk load the HFiles.
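    // The load goes through the secure bulk load path: prepareBulkLoad() obtains a bulk token,
    // and the ClientServiceCallable below wraps the secureBulkLoadHFiles RPC so it can be driven
    // by an RpcRetryingCaller against the region holding rowkey(0).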
    LOG.debug("Loading test data");
    final ClusterConnection conn = (ClusterConnection) HTU.getAdmin().getConnection();
    table = conn.getTable(hdt.getTableName());
    final String bulkToken =
        new SecureBulkLoadClient(HTU.getConfiguration(), table).prepareBulkLoad(conn);
    ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn,
        hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0),
        new RpcControllerFactory(HTU.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
      @Override
      protected Void rpcCall() throws Exception {
        LOG.debug("Going to connect to server " + getLocation() + " for row "
            + Bytes.toStringBinary(getRow()));
        SecureBulkLoadClient secureClient = null;
        byte[] regionName = getLocation().getRegionInfo().getRegionName();
        try (Table table = conn.getTable(getTableName())) {
          secureClient = new SecureBulkLoadClient(HTU.getConfiguration(), table);
          secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
              true, null, bulkToken);
        }
        return null;
      }
    };
    RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(HTU.getConfiguration());
    RpcRetryingCaller<Void> caller = factory.newCaller();
    caller.callWithRetries(callable, 10000);

    // verify we can read them from the primary
    LOG.debug("Verifying data load");
    for (int i = 0; i < numRows; i++) {
      byte[] row = TestHRegionServerBulkLoad.rowkey(i);
      Get g = new Get(row);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());
    }

    // verify we can read them from the replica
    LOG.debug("Verifying replica queries");
    try {
      SlowMeCopro.cdl.set(new CountDownLatch(1));
      for (int i = 0; i < numRows; i++) {
        byte[] row = TestHRegionServerBulkLoad.rowkey(i);
        Get g = new Get(row);
        g.setConsistency(Consistency.TIMELINE);
        Result r = table.get(g);
        Assert.assertTrue(r.isStale());
      }
      SlowMeCopro.cdl.get().countDown();
    } finally {
      SlowMeCopro.cdl.get().countDown();
      SlowMeCopro.sleepTime.set(0);
    }

    HTU.getAdmin().disableTable(hdt.getTableName());
    HTU.deleteTable(hdt.getTableName());
  }

  @Test
  public void testReplicaGetWithPrimaryDown() throws IOException {
    // Create a table with region replicas.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());
    try {
      Table table = HTU.createTable(hdt, new byte[][] { f }, null);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked up by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }

      // But if we ask for stale we will get it
      Get g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      Result r = table.get(g);
      Assert.assertTrue(r.isStale());
    } finally {
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }

  @Test
  public void testReplicaScanWithPrimaryDown() throws IOException {
    // Create a table with region replicas.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());

    try {
      Table table = HTU.createTable(hdt, new byte[][] { f }, null);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked up by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }

      // But if we ask for stale we will get it
      // Scan the required family with TIMELINE consistency
      Scan scan = new Scan();
      scan.addFamily(f);
      scan.setConsistency(Consistency.TIMELINE);

      // Open the scanner and check the first result
      ResultScanner scanner = table.getScanner(scan);
      Result r = scanner.next();

      Assert.assertTrue(r.isStale());
    } finally {
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }

  @Test
  public void testReplicaGetWithAsyncRpcClientImpl() throws IOException {
    HTU.getConfiguration().setBoolean("hbase.ipc.client.specificThreadForWriting", true);
    HTU.getConfiguration().set(
        "hbase.rpc.client.impl", "org.apache.hadoop.hbase.ipc.AsyncRpcClient");
    // Create a table with region replicas.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(SlowMeCopro.class.getName());

    try {
      Table table = HTU.createTable(hdt, new byte[][] { f }, null);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked up by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }

      try {
        // Create a new connection so the new config can kick in
        Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Table t = connection.getTable(hdt.getTableName());

        // But if we ask for stale we will get it
        SlowMeCopro.cdl.set(new CountDownLatch(1));
        Get g = new Get(row);
        g.setConsistency(Consistency.TIMELINE);
        Result r = t.get(g);
        Assert.assertTrue(r.isStale());
        SlowMeCopro.cdl.get().countDown();
      } finally {
        SlowMeCopro.cdl.get().countDown();
        SlowMeCopro.sleepTime.set(0);
      }
    } finally {
      HTU.getConfiguration().unset("hbase.ipc.client.specificThreadForWriting");
      HTU.getConfiguration().unset("hbase.rpc.client.impl");
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }

  // This test verifies that, when hbase.client.metaReplicaCallTimeout.scan is configured, a meta
  // table scan still gets its result from the primary meta region as long as the primary returns
  // within the configured timeout.
  @Test
  public void testGetRegionLocationFromPrimaryMetaRegion()
      throws IOException, InterruptedException {
    HTU.getAdmin().setBalancerRunning(false, true);
    Configuration conf = new Configuration(HTU.getConfiguration());
    conf.setBoolean(USE_META_REPLICAS, true);
    Connection conn = ConnectionFactory.createConnection(conf);

    // Create a table with two region replicas.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(2);
    try {
      HTU.createTable(hdt, new byte[][] { f }, null);

      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = true;

      // Get the user table location; it should always come from the primary meta replica,
      // even though that scan has been slowed down.
      RegionLocations url = ((ClusterConnection) conn)
          .locateRegion(hdt.getTableName(), row, false, false);
    } finally {
      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = false;
      HTU.getAdmin().setBalancerRunning(true, true);
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }


  // This test simulates the case where both the primary meta region and the primary user region
  // are down; the HBase client should still be able to read from the user replica regions and
  // return stale data. Meta replicas are enabled to show that a meta replica region can be out
  // of sync with the primary meta region.
  @Test
  public void testReplicaGetWithPrimaryAndMetaDown() throws IOException, InterruptedException {
    HTU.getAdmin().setBalancerRunning(false, true);

    Configuration conf = new Configuration(HTU.getConfiguration());
    conf.setBoolean(USE_META_REPLICAS, true);
    Connection conn = ConnectionFactory.createConnection(conf);

    // Create a table with two region replicas.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(2);
    try {
      HTU.createTable(hdt, new byte[][] { f }, null);
      Table table = conn.getTable(TableName.valueOf(name.getMethodName()));

      // Get the meta location
      RegionLocations mrl = ((ClusterConnection) conn)
          .locateRegion(TableName.META_TABLE_NAME,
              HConstants.EMPTY_START_ROW, false, false);

      // Get the user table location
      RegionLocations url = ((ClusterConnection) conn)
          .locateRegion(hdt.getTableName(), row, false, false);

      // Make sure that the user primary region is co-hosted with the meta region
      if (!url.getDefaultRegionLocation().getServerName().equals(
          mrl.getDefaultRegionLocation().getServerName())) {
        HTU.moveRegionAndWait(url.getDefaultRegionLocation().getRegionInfo(),
            mrl.getDefaultRegionLocation().getServerName());
      }

      // Make sure that the user replica region is not hosted by the same region server as the
      // primary
      if (url.getRegionLocation(1).getServerName().equals(mrl.getDefaultRegionLocation()
          .getServerName())) {
        HTU.moveRegionAndWait(url.getRegionLocation(1).getRegionInfo(),
            url.getDefaultRegionLocation().getServerName());
      }

      // Wait until the meta table is updated with the new location info
      while (true) {
        mrl = ((ClusterConnection) conn)
            .locateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW, false, false);

        // Get the user table location
        url = ((ClusterConnection) conn)
            .locateRegion(hdt.getTableName(), row, false, true);

        LOG.info("meta locations " + mrl);
        LOG.info("table locations " + url);
        ServerName a = url.getDefaultRegionLocation().getServerName();
        ServerName b = mrl.getDefaultRegionLocation().getServerName();
        if (a.equals(b)) {
          break;
        } else {
          LOG.info("Waiting for new region info to be updated in meta table");
          Thread.sleep(100);
        }
      }

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked up by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }

      // Simulate the region server going down
      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = true;

      // The first Get is supposed to succeed
      Get g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      Result r = table.get(g);
      Assert.assertTrue(r.isStale());

      // The second Get should succeed as well
      r = table.get(g);
      Assert.assertTrue(r.isStale());
    } finally {
      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = false;
      HTU.getAdmin().setBalancerRunning(true, true);
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }
}