001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.List;
024import java.util.Optional;
025import java.util.concurrent.CountDownLatch;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicLong;
028import java.util.concurrent.atomic.AtomicReference;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HBaseConfiguration;
034import org.apache.hadoop.hbase.HBaseTestingUtility;
035import org.apache.hadoop.hbase.HColumnDescriptor;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.HTableDescriptor;
038import org.apache.hadoop.hbase.RegionLocations;
039import org.apache.hadoop.hbase.ServerName;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.Waiter;
042import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
043import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
044import org.apache.hadoop.hbase.coprocessor.ObserverContext;
045import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
046import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
047import org.apache.hadoop.hbase.coprocessor.RegionObserver;
048import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
049import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
050import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
051import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
052import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
053import org.apache.hadoop.hbase.testclassification.ClientTests;
054import org.apache.hadoop.hbase.testclassification.MediumTests;
055import org.apache.hadoop.hbase.util.Bytes;
056import org.apache.hadoop.hbase.util.Pair;
057import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
058import org.junit.AfterClass;
059import org.junit.Assert;
060import org.junit.BeforeClass;
061import org.junit.ClassRule;
062import org.junit.Test;
063import org.junit.experimental.categories.Category;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066
067@Category({MediumTests.class, ClientTests.class})
068public class TestReplicaWithCluster {
069
070  @ClassRule
071  public static final HBaseClassTestRule CLASS_RULE =
072      HBaseClassTestRule.forClass(TestReplicaWithCluster.class);
073
074  private static final Logger LOG = LoggerFactory.getLogger(TestReplicaWithCluster.class);
075
076  private static final int NB_SERVERS = 3;
077  private static final byte[] row = TestReplicaWithCluster.class.getName().getBytes();
078  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
079
080  // second minicluster used in testing of replication
081  private static HBaseTestingUtility HTU2;
082  private static final byte[] f = HConstants.CATALOG_FAMILY;
083
084  private final static int REFRESH_PERIOD = 1000;
085  private final static int META_SCAN_TIMEOUT_IN_MILLISEC = 200;
086
087  /**
088   * This copro is used to synchronize the tests.
089   */
090  public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
091    static final AtomicLong sleepTime = new AtomicLong(0);
092    static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(new CountDownLatch(0));
093
094    public SlowMeCopro() {
095    }
096
097    @Override
098    public Optional<RegionObserver> getRegionObserver() {
099      return Optional.of(this);
100    }
101
102    @Override
103    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
104                         final Get get, final List<Cell> results) throws IOException {
105
106      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
107        CountDownLatch latch = cdl.get();
108        try {
109          if (sleepTime.get() > 0) {
110            LOG.info("Sleeping for " + sleepTime.get() + " ms");
111            Thread.sleep(sleepTime.get());
112          } else if (latch.getCount() > 0) {
113            LOG.info("Waiting for the counterCountDownLatch");
114            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
115            if (latch.getCount() > 0) {
116              throw new RuntimeException("Can't wait more");
117            }
118          }
119        } catch (InterruptedException e1) {
120          LOG.error(e1.toString(), e1);
121        }
122      } else {
123        LOG.info("We're not the primary replicas.");
124      }
125    }
126  }
127
128  /**
129   * This copro is used to simulate region server down exception for Get and Scan
130   */
131  @CoreCoprocessor
132  public static class RegionServerStoppedCopro implements RegionCoprocessor, RegionObserver {
133
134    public RegionServerStoppedCopro() {
135    }
136
137    @Override
138    public Optional<RegionObserver> getRegionObserver() {
139      return Optional.of(this);
140    }
141
142    @Override
143    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
144        final Get get, final List<Cell> results) throws IOException {
145
146      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
147
148      // Fail for the primary replica and replica 1
149      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) {
150        LOG.info("Throw Region Server Stopped Exceptoin for replica id " + replicaId);
151        throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName()
152            + " not running");
153      } else {
154        LOG.info("We're replica region " + replicaId);
155      }
156    }
157
158    @Override
159    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
160        final Scan scan) throws IOException {
161      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
162      // Fail for the primary replica and replica 1
163      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) {
164        LOG.info("Throw Region Server Stopped Exceptoin for replica id " + replicaId);
165        throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName()
166            + " not running");
167      } else {
168        LOG.info("We're replica region " + replicaId);
169      }
170    }
171  }
172
173  /**
174   * This copro is used to slow down the primary meta region scan a bit
175   */
176  public static class RegionServerHostingPrimayMetaRegionSlowOrStopCopro
177      implements RegionCoprocessor, RegionObserver {
178    static boolean slowDownPrimaryMetaScan = false;
179    static boolean throwException = false;
180
181    @Override
182    public Optional<RegionObserver> getRegionObserver() {
183      return Optional.of(this);
184    }
185
186    @Override
187    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
188        final Get get, final List<Cell> results) throws IOException {
189
190      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
191
192      // Fail for the primary replica, but not for meta
193      if (throwException) {
194        if (!e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
195          LOG.info("Get, throw Region Server Stopped Exceptoin for region " + e.getEnvironment()
196              .getRegion().getRegionInfo());
197          throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName()
198                  + " not running");
199        }
200      } else {
201        LOG.info("Get, We're replica region " + replicaId);
202      }
203    }
204
205    @Override
206    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
207        final Scan scan) throws IOException {
208
209      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
210
211      // Slow down with the primary meta region scan
212      if (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
213        if (slowDownPrimaryMetaScan) {
214          LOG.info("Scan with primary meta region, slow down a bit");
215          try {
216            Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50);
217          } catch (InterruptedException ie) {
218            // Ingore
219          }
220        }
221
222        // Fail for the primary replica
223        if (throwException) {
224          LOG.info("Scan, throw Region Server Stopped Exceptoin for replica " + e.getEnvironment()
225              .getRegion().getRegionInfo());
226
227          throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName()
228               + " not running");
229        } else {
230          LOG.info("Scan, We're replica region " + replicaId);
231        }
232      } else {
233        LOG.info("Scan, We're replica region " + replicaId);
234      }
235    }
236  }
237
238  @BeforeClass
239  public static void beforeClass() throws Exception {
240    // enable store file refreshing
241    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
242        REFRESH_PERIOD);
243
244    HTU.getConfiguration().setFloat("hbase.regionserver.logroll.multiplier", 0.0001f);
245    HTU.getConfiguration().setInt("replication.source.size.capacity", 10240);
246    HTU.getConfiguration().setLong("replication.source.sleepforretries", 100);
247    HTU.getConfiguration().setInt("hbase.regionserver.maxlogs", 2);
248    HTU.getConfiguration().setLong("hbase.master.logcleaner.ttl", 10);
249    HTU.getConfiguration().setInt("zookeeper.recovery.retry", 1);
250    HTU.getConfiguration().setInt("zookeeper.recovery.retry.intervalmill", 10);
251
252    // Wait for primary call longer so make sure that it will get exception from the primary call
253    HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.get", 1000000);
254    HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.scan", 1000000);
255
256    // Retry less so it can fail faster
257    HTU.getConfiguration().setInt("hbase.client.retries.number", 1);
258
259    // Enable meta replica at server side
260    HTU.getConfiguration().setInt("hbase.meta.replica.count", 2);
261
262    // Make sure master does not host system tables.
263    HTU.getConfiguration().set("hbase.balancer.tablesOnMaster", "none");
264
265    // Set system coprocessor so it can be applied to meta regions
266    HTU.getConfiguration().set("hbase.coprocessor.region.classes",
267        RegionServerHostingPrimayMetaRegionSlowOrStopCopro.class.getName());
268
269    HTU.getConfiguration().setInt(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
270        META_SCAN_TIMEOUT_IN_MILLISEC * 1000);
271
272    HTU.startMiniCluster(NB_SERVERS);
273    HTU.getHBaseCluster().startMaster();
274  }
275
276  @AfterClass
277  public static void afterClass() throws Exception {
278    if (HTU2 != null)
279      HTU2.shutdownMiniCluster();
280    HTU.shutdownMiniCluster();
281  }
282
283  @Test
284  public void testCreateDeleteTable() throws IOException {
285    // Create table then get the single region for our new table.
286    HTableDescriptor hdt = HTU.createTableDescriptor("testCreateDeleteTable");
287    hdt.setRegionReplication(NB_SERVERS);
288    hdt.addCoprocessor(SlowMeCopro.class.getName());
289    Table table = HTU.createTable(hdt, new byte[][]{f}, null);
290
291    Put p = new Put(row);
292    p.addColumn(f, row, row);
293    table.put(p);
294
295    Get g = new Get(row);
296    Result r = table.get(g);
297    Assert.assertFalse(r.isStale());
298
299    try {
300      // But if we ask for stale we will get it
301      SlowMeCopro.cdl.set(new CountDownLatch(1));
302      g = new Get(row);
303      g.setConsistency(Consistency.TIMELINE);
304      r = table.get(g);
305      Assert.assertTrue(r.isStale());
306      SlowMeCopro.cdl.get().countDown();
307    } finally {
308      SlowMeCopro.cdl.get().countDown();
309      SlowMeCopro.sleepTime.set(0);
310    }
311
312    HTU.getAdmin().disableTable(hdt.getTableName());
313    HTU.deleteTable(hdt.getTableName());
314  }
315
316  @Test
317  public void testChangeTable() throws Exception {
318    TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("testChangeTable"))
319            .setRegionReplication(NB_SERVERS)
320            .setCoprocessor(SlowMeCopro.class.getName())
321            .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f))
322            .build();
323    HTU.getAdmin().createTable(td);
324    Table table = HTU.getConnection().getTable(td.getTableName());
325    // basic test: it should work.
326    Put p = new Put(row);
327    p.addColumn(f, row, row);
328    table.put(p);
329
330    Get g = new Get(row);
331    Result r = table.get(g);
332    Assert.assertFalse(r.isStale());
333
334    // Add a CF, it should work.
335    TableDescriptor bHdt = HTU.getAdmin().getDescriptor(td.getTableName());
336    td = TableDescriptorBuilder.newBuilder(td)
337            .setColumnFamily(ColumnFamilyDescriptorBuilder.of(row))
338            .build();
339    HTU.getAdmin().disableTable(td.getTableName());
340    HTU.getAdmin().modifyTable(td);
341    HTU.getAdmin().enableTable(td.getTableName());
342    TableDescriptor nHdt = HTU.getAdmin().getDescriptor(td.getTableName());
343    Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
344        bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount());
345
346    p = new Put(row);
347    p.addColumn(row, row, row);
348    table.put(p);
349
350    g = new Get(row);
351    r = table.get(g);
352    Assert.assertFalse(r.isStale());
353
354    try {
355      SlowMeCopro.cdl.set(new CountDownLatch(1));
356      g = new Get(row);
357      g.setConsistency(Consistency.TIMELINE);
358      r = table.get(g);
359      Assert.assertTrue(r.isStale());
360    } finally {
361      SlowMeCopro.cdl.get().countDown();
362      SlowMeCopro.sleepTime.set(0);
363    }
364
365    Admin admin = HTU.getAdmin();
366    nHdt =admin.getDescriptor(td.getTableName());
367    Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
368        bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount());
369
370    admin.disableTable(td.getTableName());
371    admin.deleteTable(td.getTableName());
372    admin.close();
373  }
374
375  @SuppressWarnings("deprecation")
376  @Test
377  public void testReplicaAndReplication() throws Exception {
378    HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaAndReplication");
379    hdt.setRegionReplication(NB_SERVERS);
380
381    HColumnDescriptor fam = new HColumnDescriptor(row);
382    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
383    hdt.addFamily(fam);
384
385    hdt.addCoprocessor(SlowMeCopro.class.getName());
386    HTU.getAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
387
388    Configuration conf2 = HBaseConfiguration.create(HTU.getConfiguration());
389    conf2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
390    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
391    MiniZooKeeperCluster miniZK = HTU.getZkCluster();
392
393    HTU2 = new HBaseTestingUtility(conf2);
394    HTU2.setZkCluster(miniZK);
395    HTU2.startMiniCluster(NB_SERVERS);
396    LOG.info("Setup second Zk");
397    HTU2.getAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
398
399    ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
400
401    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
402    rpc.setClusterKey(HTU2.getClusterKey());
403    admin.addPeer("2", rpc, null);
404    admin.close();
405
406    Put p = new Put(row);
407    p.addColumn(row, row, row);
408    final Table table = HTU.getConnection().getTable(hdt.getTableName());
409    table.put(p);
410
411    HTU.getAdmin().flush(table.getName());
412    LOG.info("Put & flush done on the first cluster. Now doing a get on the same cluster.");
413
414    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
415      @Override public boolean evaluate() throws Exception {
416        try {
417          SlowMeCopro.cdl.set(new CountDownLatch(1));
418          Get g = new Get(row);
419          g.setConsistency(Consistency.TIMELINE);
420          Result r = table.get(g);
421          Assert.assertTrue(r.isStale());
422          return !r.isEmpty();
423        } finally {
424          SlowMeCopro.cdl.get().countDown();
425          SlowMeCopro.sleepTime.set(0);
426        }
427      }
428    });
429    table.close();
430    LOG.info("stale get on the first cluster done. Now for the second.");
431
432    final Table table2 = HTU.getConnection().getTable(hdt.getTableName());
433    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
434      @Override public boolean evaluate() throws Exception {
435        try {
436          SlowMeCopro.cdl.set(new CountDownLatch(1));
437          Get g = new Get(row);
438          g.setConsistency(Consistency.TIMELINE);
439          Result r = table2.get(g);
440          Assert.assertTrue(r.isStale());
441          return !r.isEmpty();
442        } finally {
443          SlowMeCopro.cdl.get().countDown();
444          SlowMeCopro.sleepTime.set(0);
445        }
446      }
447    });
448    table2.close();
449
450    HTU.getAdmin().disableTable(hdt.getTableName());
451    HTU.deleteTable(hdt.getTableName());
452
453    HTU2.getAdmin().disableTable(hdt.getTableName());
454    HTU2.deleteTable(hdt.getTableName());
455
456    // We shutdown HTU2 minicluster later, in afterClass(), as shutting down
457    // the minicluster has negative impact of deleting all HConnections in JVM.
458  }
459
460  @Test
461  public void testBulkLoad() throws IOException {
462    // Create table then get the single region for our new table.
463    LOG.debug("Creating test table");
464    HTableDescriptor hdt = HTU.createTableDescriptor("testBulkLoad");
465    hdt.setRegionReplication(NB_SERVERS);
466    hdt.addCoprocessor(SlowMeCopro.class.getName());
467    Table table = HTU.createTable(hdt, new byte[][]{f}, null);
468
469    // create hfiles to load.
470    LOG.debug("Creating test data");
471    Path dir = HTU.getDataTestDirOnTestFS("testBulkLoad");
472    final int numRows = 10;
473    final byte[] qual = Bytes.toBytes("qual");
474    final byte[] val  = Bytes.toBytes("val");
475    final List<Pair<byte[], String>> famPaths = new ArrayList<>();
476    for (HColumnDescriptor col : hdt.getColumnFamilies()) {
477      Path hfile = new Path(dir, col.getNameAsString());
478      TestHRegionServerBulkLoad.createHFile(HTU.getTestFileSystem(), hfile, col.getName(),
479        qual, val, numRows);
480      famPaths.add(new Pair<>(col.getName(), hfile.toString()));
481    }
482
483    // bulk load HFiles
484    LOG.debug("Loading test data");
485    final ClusterConnection conn = (ClusterConnection) HTU.getAdmin().getConnection();
486    table = conn.getTable(hdt.getTableName());
487    final String bulkToken =
488        new SecureBulkLoadClient(HTU.getConfiguration(), table).prepareBulkLoad(conn);
489    ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn,
490        hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0),
491        new RpcControllerFactory(HTU.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
492      @Override
493      protected Void rpcCall() throws Exception {
494        LOG.debug("Going to connect to server " + getLocation() + " for row "
495            + Bytes.toStringBinary(getRow()));
496        SecureBulkLoadClient secureClient = null;
497        byte[] regionName = getLocation().getRegionInfo().getRegionName();
498        try (Table table = conn.getTable(getTableName())) {
499          secureClient = new SecureBulkLoadClient(HTU.getConfiguration(), table);
500          secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
501              true, null, bulkToken);
502        }
503        return null;
504      }
505    };
506    RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(HTU.getConfiguration());
507    RpcRetryingCaller<Void> caller = factory.newCaller();
508    caller.callWithRetries(callable, 10000);
509
510    // verify we can read them from the primary
511    LOG.debug("Verifying data load");
512    for (int i = 0; i < numRows; i++) {
513      byte[] row = TestHRegionServerBulkLoad.rowkey(i);
514      Get g = new Get(row);
515      Result r = table.get(g);
516      Assert.assertFalse(r.isStale());
517    }
518
519    // verify we can read them from the replica
520    LOG.debug("Verifying replica queries");
521    try {
522      SlowMeCopro.cdl.set(new CountDownLatch(1));
523      for (int i = 0; i < numRows; i++) {
524        byte[] row = TestHRegionServerBulkLoad.rowkey(i);
525        Get g = new Get(row);
526        g.setConsistency(Consistency.TIMELINE);
527        Result r = table.get(g);
528        Assert.assertTrue(r.isStale());
529      }
530      SlowMeCopro.cdl.get().countDown();
531    } finally {
532      SlowMeCopro.cdl.get().countDown();
533      SlowMeCopro.sleepTime.set(0);
534    }
535
536    HTU.getAdmin().disableTable(hdt.getTableName());
537    HTU.deleteTable(hdt.getTableName());
538  }
539
540  @Test
541  public void testReplicaGetWithPrimaryDown() throws IOException {
542    // Create table then get the single region for our new table.
543    HTableDescriptor hdt = HTU.createTableDescriptor("testCreateDeleteTable");
544    hdt.setRegionReplication(NB_SERVERS);
545    hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());
546    try {
547      Table table = HTU.createTable(hdt, new byte[][] { f }, null);
548
549      Put p = new Put(row);
550      p.addColumn(f, row, row);
551      table.put(p);
552
553      // Flush so it can be picked by the replica refresher thread
554      HTU.flush(table.getName());
555
556      // Sleep for some time until data is picked up by replicas
557      try {
558        Thread.sleep(2 * REFRESH_PERIOD);
559      } catch (InterruptedException e1) {
560        LOG.error(e1.toString(), e1);
561      }
562
563      // But if we ask for stale we will get it
564      Get g = new Get(row);
565      g.setConsistency(Consistency.TIMELINE);
566      Result r = table.get(g);
567      Assert.assertTrue(r.isStale());
568    } finally {
569      HTU.getAdmin().disableTable(hdt.getTableName());
570      HTU.deleteTable(hdt.getTableName());
571    }
572  }
573
574  @Test
575  public void testReplicaScanWithPrimaryDown() throws IOException {
576    // Create table then get the single region for our new table.
577    HTableDescriptor hdt = HTU.createTableDescriptor("testCreateDeleteTable");
578    hdt.setRegionReplication(NB_SERVERS);
579    hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());
580
581    try {
582      Table table = HTU.createTable(hdt, new byte[][] { f }, null);
583
584      Put p = new Put(row);
585      p.addColumn(f, row, row);
586      table.put(p);
587
588      // Flush so it can be picked by the replica refresher thread
589      HTU.flush(table.getName());
590
591      // Sleep for some time until data is picked up by replicas
592      try {
593        Thread.sleep(2 * REFRESH_PERIOD);
594      } catch (InterruptedException e1) {
595        LOG.error(e1.toString(), e1);
596      }
597
598      // But if we ask for stale we will get it
599      // Instantiating the Scan class
600      Scan scan = new Scan();
601
602      // Scanning the required columns
603      scan.addFamily(f);
604      scan.setConsistency(Consistency.TIMELINE);
605
606      // Getting the scan result
607      ResultScanner scanner = table.getScanner(scan);
608
609      Result r = scanner.next();
610
611      Assert.assertTrue(r.isStale());
612    } finally {
613      HTU.getAdmin().disableTable(hdt.getTableName());
614      HTU.deleteTable(hdt.getTableName());
615    }
616  }
617
618  @Test
619  public void testReplicaGetWithRpcClientImpl() throws IOException {
620    HTU.getConfiguration().setBoolean("hbase.ipc.client.specificThreadForWriting", true);
621    HTU.getConfiguration().set("hbase.rpc.client.impl", "org.apache.hadoop.hbase.ipc.RpcClientImpl");
622    // Create table then get the single region for our new table.
623    HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaGetWithRpcClientImpl");
624    hdt.setRegionReplication(NB_SERVERS);
625    hdt.addCoprocessor(SlowMeCopro.class.getName());
626
627    try {
628      Table table = HTU.createTable(hdt, new byte[][] { f }, null);
629
630      Put p = new Put(row);
631      p.addColumn(f, row, row);
632      table.put(p);
633
634      // Flush so it can be picked by the replica refresher thread
635      HTU.flush(table.getName());
636
637      // Sleep for some time until data is picked up by replicas
638      try {
639        Thread.sleep(2 * REFRESH_PERIOD);
640      } catch (InterruptedException e1) {
641        LOG.error(e1.toString(), e1);
642      }
643
644      try {
645        // Create the new connection so new config can kick in
646        Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
647        Table t = connection.getTable(hdt.getTableName());
648
649        // But if we ask for stale we will get it
650        SlowMeCopro.cdl.set(new CountDownLatch(1));
651        Get g = new Get(row);
652        g.setConsistency(Consistency.TIMELINE);
653        Result r = t.get(g);
654        Assert.assertTrue(r.isStale());
655        SlowMeCopro.cdl.get().countDown();
656      } finally {
657        SlowMeCopro.cdl.get().countDown();
658        SlowMeCopro.sleepTime.set(0);
659      }
660    } finally {
661      HTU.getConfiguration().unset("hbase.ipc.client.specificThreadForWriting");
662      HTU.getConfiguration().unset("hbase.rpc.client.impl");
663      HTU.getAdmin().disableTable(hdt.getTableName());
664      HTU.deleteTable(hdt.getTableName());
665    }
666  }
667
668  // This test is to test when hbase.client.metaReplicaCallTimeout.scan is configured, meta table
669  // scan will always get the result from primary meta region as long as the result is returned
670  // within configured hbase.client.metaReplicaCallTimeout.scan from primary meta region.
671  @Test
672  public void testGetRegionLocationFromPrimaryMetaRegion() throws IOException, InterruptedException {
673    HTU.getAdmin().setBalancerRunning(false, true);
674
675    ((ConnectionImplementation) HTU.getAdmin().getConnection()).setUseMetaReplicas(true);
676
677    // Create table then get the single region for our new table.
678    HTableDescriptor hdt = HTU.createTableDescriptor("testGetRegionLocationFromPrimaryMetaRegion");
679    hdt.setRegionReplication(2);
680    try {
681
682      HTU.createTable(hdt, new byte[][] { f }, null);
683
684      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = true;
685
686      // Get user table location, always get it from the primary meta replica
687      RegionLocations url = ((ClusterConnection) HTU.getConnection())
688          .locateRegion(hdt.getTableName(), row, false, false);
689
690    } finally {
691      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = false;
692      ((ConnectionImplementation) HTU.getAdmin().getConnection()).setUseMetaReplicas(false);
693      HTU.getAdmin().setBalancerRunning(true, true);
694      HTU.getAdmin().disableTable(hdt.getTableName());
695      HTU.deleteTable(hdt.getTableName());
696    }
697  }
698
699
700  // This test is to simulate the case that the meta region and the primary user region
701  // are down, hbase client is able to access user replica regions and return stale data.
702  // Meta replica is enabled to show the case that the meta replica region could be out of sync
703  // with the primary meta region.
704  @Test
705  public void testReplicaGetWithPrimaryAndMetaDown() throws IOException, InterruptedException {
706    HTU.getAdmin().setBalancerRunning(false, true);
707
708    ((ConnectionImplementation)HTU.getAdmin().getConnection()).setUseMetaReplicas(true);
709
710    // Create table then get the single region for our new table.
711    HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaGetWithPrimaryAndMetaDown");
712    hdt.setRegionReplication(2);
713    try {
714
715      Table table = HTU.createTable(hdt, new byte[][] { f }, null);
716
717      // Get Meta location
718      RegionLocations mrl = ((ClusterConnection) HTU.getConnection())
719          .locateRegion(TableName.META_TABLE_NAME,
720              HConstants.EMPTY_START_ROW, false, false);
721
722      // Get user table location
723      RegionLocations url = ((ClusterConnection) HTU.getConnection())
724          .locateRegion(hdt.getTableName(), row, false, false);
725
726      // Make sure that user primary region is co-hosted with the meta region
727      if (!url.getDefaultRegionLocation().getServerName().equals(
728          mrl.getDefaultRegionLocation().getServerName())) {
729        HTU.moveRegionAndWait(url.getDefaultRegionLocation().getRegionInfo(),
730            mrl.getDefaultRegionLocation().getServerName());
731      }
732
733      // Make sure that the user replica region is not hosted by the same region server with
734      // primary
735      if (url.getRegionLocation(1).getServerName().equals(mrl.getDefaultRegionLocation()
736          .getServerName())) {
737        HTU.moveRegionAndWait(url.getRegionLocation(1).getRegionInfo(),
738            url.getDefaultRegionLocation().getServerName());
739      }
740
741      // Wait until the meta table is updated with new location info
742      while (true) {
743        mrl = ((ClusterConnection) HTU.getConnection())
744            .locateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW, false, false);
745
746        // Get user table location
747        url = ((ClusterConnection) HTU.getConnection())
748            .locateRegion(hdt.getTableName(), row, false, true);
749
750        LOG.info("meta locations " + mrl);
751        LOG.info("table locations " + url);
752        ServerName a = url.getDefaultRegionLocation().getServerName();
753        ServerName b = mrl.getDefaultRegionLocation().getServerName();
754        if(a.equals(b)) {
755          break;
756        } else {
757          LOG.info("Waiting for new region info to be updated in meta table");
758          Thread.sleep(100);
759        }
760      }
761
762      Put p = new Put(row);
763      p.addColumn(f, row, row);
764      table.put(p);
765
766      // Flush so it can be picked by the replica refresher thread
767      HTU.flush(table.getName());
768
769      // Sleep for some time until data is picked up by replicas
770      try {
771        Thread.sleep(2 * REFRESH_PERIOD);
772      } catch (InterruptedException e1) {
773        LOG.error(e1.toString(), e1);
774      }
775
776      // Simulating the RS down
777      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = true;
778
779      // The first Get is supposed to succeed
780      Get g = new Get(row);
781      g.setConsistency(Consistency.TIMELINE);
782      Result r = table.get(g);
783      Assert.assertTrue(r.isStale());
784
785      // The second Get will succeed as well
786      r = table.get(g);
787      Assert.assertTrue(r.isStale());
788
789    } finally {
790      ((ConnectionImplementation)HTU.getAdmin().getConnection()).setUseMetaReplicas(false);
791      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = false;
792      HTU.getAdmin().setBalancerRunning(true, true);
793      HTU.getAdmin().disableTable(hdt.getTableName());
794      HTU.deleteTable(hdt.getTableName());
795    }
796  }
797}