001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.Collections;
023import java.util.List;
024import java.util.Map;
025import java.util.Optional;
026import java.util.TreeMap;
027import java.util.concurrent.CountDownLatch;
028import java.util.concurrent.TimeUnit;
029import java.util.concurrent.atomic.AtomicLong;
030import java.util.concurrent.atomic.AtomicReference;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseConfiguration;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.Waiter;
040import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
041import org.apache.hadoop.hbase.coprocessor.ObserverContext;
042import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
043import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
044import org.apache.hadoop.hbase.coprocessor.RegionObserver;
045import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
046import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
047import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
048import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
049import org.apache.hadoop.hbase.testclassification.ClientTests;
050import org.apache.hadoop.hbase.testclassification.LargeTests;
051import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
054import org.junit.AfterClass;
055import org.junit.Assert;
056import org.junit.BeforeClass;
057import org.junit.ClassRule;
058import org.junit.Test;
059import org.junit.experimental.categories.Category;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063@Category({LargeTests.class, ClientTests.class})
064public class TestReplicaWithCluster {
065
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068      HBaseClassTestRule.forClass(TestReplicaWithCluster.class);
069
070  private static final Logger LOG = LoggerFactory.getLogger(TestReplicaWithCluster.class);
071
072  private static final int NB_SERVERS = 3;
073  private static final byte[] row = Bytes.toBytes(TestReplicaWithCluster.class.getName());
074  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
075
076  // second minicluster used in testing of replication
077  private static HBaseTestingUtil HTU2;
078  private static final byte[] f = HConstants.CATALOG_FAMILY;
079
080  private final static int REFRESH_PERIOD = 1000;
081  private final static int META_SCAN_TIMEOUT_IN_MILLISEC = 200;
082
083  /**
084   * This copro is used to synchronize the tests.
085   */
086  public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
087    static final AtomicLong sleepTime = new AtomicLong(0);
088    static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(new CountDownLatch(0));
089
090    public SlowMeCopro() {
091    }
092
093    @Override
094    public Optional<RegionObserver> getRegionObserver() {
095      return Optional.of(this);
096    }
097
098    @Override
099    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
100                         final Get get, final List<Cell> results) throws IOException {
101
102      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
103        CountDownLatch latch = cdl.get();
104        try {
105          if (sleepTime.get() > 0) {
106            LOG.info("Sleeping for " + sleepTime.get() + " ms");
107            Thread.sleep(sleepTime.get());
108          } else if (latch.getCount() > 0) {
109            LOG.info("Waiting for the counterCountDownLatch");
110            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
111            if (latch.getCount() > 0) {
112              throw new RuntimeException("Can't wait more");
113            }
114          }
115        } catch (InterruptedException e1) {
116          LOG.error(e1.toString(), e1);
117        }
118      } else {
119        LOG.info("We're not the primary replicas.");
120      }
121    }
122  }
123
124  /**
125   * This copro is used to simulate region server down exception for Get and Scan
126   */
127  @CoreCoprocessor
128  public static class RegionServerStoppedCopro implements RegionCoprocessor, RegionObserver {
129
130    public RegionServerStoppedCopro() {
131    }
132
133    @Override
134    public Optional<RegionObserver> getRegionObserver() {
135      return Optional.of(this);
136    }
137
138    @Override
139    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
140        final Get get, final List<Cell> results) throws IOException {
141
142      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
143
144      // Fail for the primary replica and replica 1
145      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) {
146        LOG.info("Throw Region Server Stopped Exceptoin for replica id " + replicaId);
147        throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName()
148            + " not running");
149      } else {
150        LOG.info("We're replica region " + replicaId);
151      }
152    }
153
154    @Override
155    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
156        final Scan scan) throws IOException {
157      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
158      // Fail for the primary replica and replica 1
159      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) {
160        LOG.info("Throw Region Server Stopped Exceptoin for replica id " + replicaId);
161        throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName()
162            + " not running");
163      } else {
164        LOG.info("We're replica region " + replicaId);
165      }
166    }
167  }
168
169  /**
170   * This copro is used to slow down the primary meta region scan a bit
171   */
172  public static class RegionServerHostingPrimayMetaRegionSlowOrStopCopro
173      implements RegionCoprocessor, RegionObserver {
174    static boolean slowDownPrimaryMetaScan = false;
175    static boolean throwException = false;
176
177    @Override
178    public Optional<RegionObserver> getRegionObserver() {
179      return Optional.of(this);
180    }
181
182    @Override
183    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
184        final Get get, final List<Cell> results) throws IOException {
185
186      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
187
188      // Fail for the primary replica, but not for meta
189      if (throwException) {
190        if (!e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
191          LOG.info("Get, throw Region Server Stopped Exceptoin for region " + e.getEnvironment()
192              .getRegion().getRegionInfo());
193          throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName()
194                  + " not running");
195        }
196      } else {
197        LOG.info("Get, We're replica region " + replicaId);
198      }
199    }
200
201    @Override
202    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
203        final Scan scan) throws IOException {
204
205      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
206
207      // Slow down with the primary meta region scan
208      if (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
209        if (slowDownPrimaryMetaScan) {
210          LOG.info("Scan with primary meta region, slow down a bit");
211          try {
212            Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50);
213          } catch (InterruptedException ie) {
214            // Ingore
215          }
216        }
217
218        // Fail for the primary replica
219        if (throwException) {
220          LOG.info("Scan, throw Region Server Stopped Exceptoin for replica " + e.getEnvironment()
221              .getRegion().getRegionInfo());
222
223          throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName()
224               + " not running");
225        } else {
226          LOG.info("Scan, We're replica region " + replicaId);
227        }
228      } else {
229        LOG.info("Scan, We're replica region " + replicaId);
230      }
231    }
232  }
233
234  @BeforeClass
235  public static void beforeClass() throws Exception {
236    // enable store file refreshing
237    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
238        REFRESH_PERIOD);
239
240    HTU.getConfiguration().setFloat("hbase.regionserver.logroll.multiplier", 0.0001f);
241    HTU.getConfiguration().setInt("replication.source.size.capacity", 10240);
242    HTU.getConfiguration().setLong("replication.source.sleepforretries", 100);
243    HTU.getConfiguration().setInt("hbase.regionserver.maxlogs", 2);
244    HTU.getConfiguration().setLong("hbase.master.logcleaner.ttl", 10);
245    HTU.getConfiguration().setInt("zookeeper.recovery.retry", 1);
246    HTU.getConfiguration().setInt("zookeeper.recovery.retry.intervalmill", 10);
247
248    // Wait for primary call longer so make sure that it will get exception from the primary call
249    HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.get", 1000000);
250    HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.scan", 1000000);
251
252    // Make sure master does not host system tables.
253    HTU.getConfiguration().set("hbase.balancer.tablesOnMaster", "none");
254
255    // Set system coprocessor so it can be applied to meta regions
256    HTU.getConfiguration().set("hbase.coprocessor.region.classes",
257        RegionServerHostingPrimayMetaRegionSlowOrStopCopro.class.getName());
258
259    HTU.getConfiguration().setInt(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
260        META_SCAN_TIMEOUT_IN_MILLISEC * 1000);
261
262    HTU.startMiniCluster(NB_SERVERS);
263    // Enable meta replica at server side
264    HBaseTestingUtil.setReplicas(HTU.getAdmin(), TableName.META_TABLE_NAME, 2);
265
266    HTU.getHBaseCluster().startMaster();
267  }
268
269  @AfterClass
270  public static void afterClass() throws Exception {
271    if (HTU2 != null)
272      HTU2.shutdownMiniCluster();
273    HTU.shutdownMiniCluster();
274  }
275
276  @Test
277  public void testCreateDeleteTable() throws IOException {
278    // Create table then get the single region for our new table.
279    TableDescriptorBuilder builder =
280      HTU.createModifyableTableDescriptor(TableName.valueOf("testCreateDeleteTable"),
281        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
282        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
283    builder.setRegionReplication(NB_SERVERS);
284    builder.setCoprocessor(SlowMeCopro.class.getName());
285    TableDescriptor hdt = builder.build();
286    Table table = HTU.createTable(hdt, new byte[][] { f }, null);
287
288    Put p = new Put(row);
289    p.addColumn(f, row, row);
290    table.put(p);
291
292    Get g = new Get(row);
293    Result r = table.get(g);
294    Assert.assertFalse(r.isStale());
295
296    try {
297      // But if we ask for stale we will get it
298      SlowMeCopro.cdl.set(new CountDownLatch(1));
299      g = new Get(row);
300      g.setConsistency(Consistency.TIMELINE);
301      r = table.get(g);
302      Assert.assertTrue(r.isStale());
303      SlowMeCopro.cdl.get().countDown();
304    } finally {
305      SlowMeCopro.cdl.get().countDown();
306      SlowMeCopro.sleepTime.set(0);
307    }
308
309    HTU.getAdmin().disableTable(hdt.getTableName());
310    HTU.deleteTable(hdt.getTableName());
311  }
312
313  @Test
314  public void testChangeTable() throws Exception {
315    TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("testChangeTable"))
316            .setRegionReplication(NB_SERVERS)
317            .setCoprocessor(SlowMeCopro.class.getName())
318            .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f))
319            .build();
320    HTU.getAdmin().createTable(td);
321    Table table = HTU.getConnection().getTable(td.getTableName());
322    // basic test: it should work.
323    Put p = new Put(row);
324    p.addColumn(f, row, row);
325    table.put(p);
326
327    Get g = new Get(row);
328    Result r = table.get(g);
329    Assert.assertFalse(r.isStale());
330
331    // Add a CF, it should work.
332    TableDescriptor bHdt = HTU.getAdmin().getDescriptor(td.getTableName());
333    td = TableDescriptorBuilder.newBuilder(td)
334            .setColumnFamily(ColumnFamilyDescriptorBuilder.of(row))
335            .build();
336    HTU.getAdmin().disableTable(td.getTableName());
337    HTU.getAdmin().modifyTable(td);
338    HTU.getAdmin().enableTable(td.getTableName());
339    TableDescriptor nHdt = HTU.getAdmin().getDescriptor(td.getTableName());
340    Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
341        bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount());
342
343    p = new Put(row);
344    p.addColumn(row, row, row);
345    table.put(p);
346
347    g = new Get(row);
348    r = table.get(g);
349    Assert.assertFalse(r.isStale());
350
351    try {
352      SlowMeCopro.cdl.set(new CountDownLatch(1));
353      g = new Get(row);
354      g.setConsistency(Consistency.TIMELINE);
355      r = table.get(g);
356      Assert.assertTrue(r.isStale());
357    } finally {
358      SlowMeCopro.cdl.get().countDown();
359      SlowMeCopro.sleepTime.set(0);
360    }
361
362    Admin admin = HTU.getAdmin();
363    nHdt =admin.getDescriptor(td.getTableName());
364    Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
365        bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount());
366
367    admin.disableTable(td.getTableName());
368    admin.deleteTable(td.getTableName());
369    admin.close();
370  }
371
372  @SuppressWarnings("deprecation")
373  @Test
374  public void testReplicaAndReplication() throws Exception {
375    TableDescriptorBuilder builder =
376      HTU.createModifyableTableDescriptor("testReplicaAndReplication");
377    builder.setRegionReplication(NB_SERVERS);
378    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(row)
379      .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build());
380
381    builder.setCoprocessor(SlowMeCopro.class.getName());
382    TableDescriptor tableDescriptor = builder.build();
383    HTU.getAdmin().createTable(tableDescriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
384
385    Configuration conf2 = HBaseConfiguration.create(HTU.getConfiguration());
386    conf2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
387    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
388    MiniZooKeeperCluster miniZK = HTU.getZkCluster();
389
390    HTU2 = new HBaseTestingUtil(conf2);
391    HTU2.setZkCluster(miniZK);
392    HTU2.startMiniCluster(NB_SERVERS);
393    LOG.info("Setup second Zk");
394    HTU2.getAdmin().createTable(tableDescriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
395
396    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
397      Admin admin = connection.getAdmin()) {
398      ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
399        .setClusterKey(HTU2.getClusterKey()).build();
400      admin.addReplicationPeer("2", rpc);
401    }
402
403    Put p = new Put(row);
404    p.addColumn(row, row, row);
405    final Table table = HTU.getConnection().getTable(tableDescriptor.getTableName());
406    table.put(p);
407
408    HTU.getAdmin().flush(table.getName());
409    LOG.info("Put & flush done on the first cluster. Now doing a get on the same cluster.");
410
411    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
412      @Override public boolean evaluate() throws Exception {
413        try {
414          SlowMeCopro.cdl.set(new CountDownLatch(1));
415          Get g = new Get(row);
416          g.setConsistency(Consistency.TIMELINE);
417          Result r = table.get(g);
418          Assert.assertTrue(r.isStale());
419          return !r.isEmpty();
420        } finally {
421          SlowMeCopro.cdl.get().countDown();
422          SlowMeCopro.sleepTime.set(0);
423        }
424      }
425    });
426    table.close();
427    LOG.info("stale get on the first cluster done. Now for the second.");
428
429    final Table table2 = HTU.getConnection().getTable(tableDescriptor.getTableName());
430    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
431      @Override public boolean evaluate() throws Exception {
432        try {
433          SlowMeCopro.cdl.set(new CountDownLatch(1));
434          Get g = new Get(row);
435          g.setConsistency(Consistency.TIMELINE);
436          Result r = table2.get(g);
437          Assert.assertTrue(r.isStale());
438          return !r.isEmpty();
439        } finally {
440          SlowMeCopro.cdl.get().countDown();
441          SlowMeCopro.sleepTime.set(0);
442        }
443      }
444    });
445    table2.close();
446
447    HTU.getAdmin().disableTable(tableDescriptor.getTableName());
448    HTU.deleteTable(tableDescriptor.getTableName());
449
450    HTU2.getAdmin().disableTable(tableDescriptor.getTableName());
451    HTU2.deleteTable(tableDescriptor.getTableName());
452
453    // We shutdown HTU2 minicluster later, in afterClass(), as shutting down
454    // the minicluster has negative impact of deleting all HConnections in JVM.
455  }
456
457  @Test
458  public void testBulkLoad() throws IOException {
459    // Create table then get the single region for our new table.
460    LOG.debug("Creating test table");
461    TableDescriptorBuilder builder = HTU.createModifyableTableDescriptor(
462      TableName.valueOf("testBulkLoad"), ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3,
463      HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
464    builder.setRegionReplication(NB_SERVERS);
465    builder.setCoprocessor(SlowMeCopro.class.getName());
466    TableDescriptor hdt = builder.build();
467    Table table = HTU.createTable(hdt, new byte[][] { f }, null);
468
469    // create hfiles to load.
470    LOG.debug("Creating test data");
471    Path dir = HTU.getDataTestDirOnTestFS("testBulkLoad");
472    final int numRows = 10;
473    final byte[] qual = Bytes.toBytes("qual");
474    final byte[] val  = Bytes.toBytes("val");
475    Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
476    for (ColumnFamilyDescriptor col : hdt.getColumnFamilies()) {
477      Path hfile = new Path(dir, col.getNameAsString());
478      TestHRegionServerBulkLoad.createHFile(HTU.getTestFileSystem(), hfile, col.getName(), qual,
479        val, numRows);
480      family2Files.put(col.getName(), Collections.singletonList(hfile));
481    }
482
483    // bulk load HFiles
484    LOG.debug("Loading test data");
485    BulkLoadHFiles.create(HTU.getConfiguration()).bulkLoad(hdt.getTableName(), family2Files);
486
487    // verify we can read them from the primary
488    LOG.debug("Verifying data load");
489    for (int i = 0; i < numRows; i++) {
490      byte[] row = TestHRegionServerBulkLoad.rowkey(i);
491      Get g = new Get(row);
492      Result r = table.get(g);
493      Assert.assertFalse(r.isStale());
494    }
495
496    // verify we can read them from the replica
497    LOG.debug("Verifying replica queries");
498    try {
499      SlowMeCopro.cdl.set(new CountDownLatch(1));
500      for (int i = 0; i < numRows; i++) {
501        byte[] row = TestHRegionServerBulkLoad.rowkey(i);
502        Get g = new Get(row);
503        g.setConsistency(Consistency.TIMELINE);
504        Result r = table.get(g);
505        Assert.assertTrue(r.isStale());
506      }
507      SlowMeCopro.cdl.get().countDown();
508    } finally {
509      SlowMeCopro.cdl.get().countDown();
510      SlowMeCopro.sleepTime.set(0);
511    }
512
513    HTU.getAdmin().disableTable(hdt.getTableName());
514    HTU.deleteTable(hdt.getTableName());
515  }
516
517  @Test
518  public void testReplicaGetWithPrimaryDown() throws IOException {
519    // Create table then get the single region for our new table.
520    TableDescriptorBuilder builder =
521      HTU.createModifyableTableDescriptor(TableName.valueOf("testCreateDeleteTable"),
522        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
523        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
524    builder.setRegionReplication(NB_SERVERS);
525    builder.setCoprocessor(RegionServerStoppedCopro.class.getName());
526    TableDescriptor hdt = builder.build();
527    try {
528      Table table = HTU.createTable(hdt, new byte[][] { f }, null);
529
530      Put p = new Put(row);
531      p.addColumn(f, row, row);
532      table.put(p);
533
534      // Flush so it can be picked by the replica refresher thread
535      HTU.flush(table.getName());
536
537      // Sleep for some time until data is picked up by replicas
538      try {
539        Thread.sleep(2 * REFRESH_PERIOD);
540      } catch (InterruptedException e1) {
541        LOG.error(e1.toString(), e1);
542      }
543
544      // But if we ask for stale we will get it
545      Get g = new Get(row);
546      g.setConsistency(Consistency.TIMELINE);
547      Result r = table.get(g);
548      Assert.assertTrue(r.isStale());
549    } finally {
550      HTU.getAdmin().disableTable(hdt.getTableName());
551      HTU.deleteTable(hdt.getTableName());
552    }
553  }
554
555  @Test
556  public void testReplicaScanWithPrimaryDown() throws IOException {
557    // Create table then get the single region for our new table.
558    TableDescriptorBuilder builder =
559      HTU.createModifyableTableDescriptor(TableName.valueOf("testCreateDeleteTable"),
560        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
561        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
562    builder.setRegionReplication(NB_SERVERS);
563    builder.setCoprocessor(RegionServerStoppedCopro.class.getName());
564    TableDescriptor hdt = builder.build();
565    try {
566      Table table = HTU.createTable(hdt, new byte[][] { f }, null);
567
568      Put p = new Put(row);
569      p.addColumn(f, row, row);
570      table.put(p);
571
572      // Flush so it can be picked by the replica refresher thread
573      HTU.flush(table.getName());
574
575      // Sleep for some time until data is picked up by replicas
576      try {
577        Thread.sleep(2 * REFRESH_PERIOD);
578      } catch (InterruptedException e1) {
579        LOG.error(e1.toString(), e1);
580      }
581
582      // But if we ask for stale we will get it
583      // Instantiating the Scan class
584      Scan scan = new Scan();
585
586      // Scanning the required columns
587      scan.addFamily(f);
588      scan.setConsistency(Consistency.TIMELINE);
589
590      // Getting the scan result
591      ResultScanner scanner = table.getScanner(scan);
592
593      Result r = scanner.next();
594
595      Assert.assertTrue(r.isStale());
596    } finally {
597      HTU.getAdmin().disableTable(hdt.getTableName());
598      HTU.deleteTable(hdt.getTableName());
599    }
600  }
601
602  @Test
603  public void testReplicaGetWithAsyncRpcClientImpl() throws IOException {
604    HTU.getConfiguration().setBoolean("hbase.ipc.client.specificThreadForWriting", true);
605    HTU.getConfiguration().set("hbase.rpc.client.impl",
606      "org.apache.hadoop.hbase.ipc.AsyncRpcClient");
607    // Create table then get the single region for our new table.
608    TableDescriptorBuilder builder =
609      HTU.createModifyableTableDescriptor(TableName.valueOf("testReplicaGetWithAsyncRpcClientImpl"),
610        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
611        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
612    builder.setRegionReplication(NB_SERVERS);
613    builder.setCoprocessor(SlowMeCopro.class.getName());
614    TableDescriptor hdt = builder.build();
615    try {
616      Table table = HTU.createTable(hdt, new byte[][] { f }, null);
617
618      Put p = new Put(row);
619      p.addColumn(f, row, row);
620      table.put(p);
621
622      // Flush so it can be picked by the replica refresher thread
623      HTU.flush(table.getName());
624
625      // Sleep for some time until data is picked up by replicas
626      try {
627        Thread.sleep(2 * REFRESH_PERIOD);
628      } catch (InterruptedException e1) {
629        LOG.error(e1.toString(), e1);
630      }
631
632      try {
633        // Create the new connection so new config can kick in
634        Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
635        Table t = connection.getTable(hdt.getTableName());
636
637        // But if we ask for stale we will get it
638        SlowMeCopro.cdl.set(new CountDownLatch(1));
639        Get g = new Get(row);
640        g.setConsistency(Consistency.TIMELINE);
641        Result r = t.get(g);
642        Assert.assertTrue(r.isStale());
643        SlowMeCopro.cdl.get().countDown();
644      } finally {
645        SlowMeCopro.cdl.get().countDown();
646        SlowMeCopro.sleepTime.set(0);
647      }
648    } finally {
649      HTU.getConfiguration().unset("hbase.ipc.client.specificThreadForWriting");
650      HTU.getConfiguration().unset("hbase.rpc.client.impl");
651      HTU.getAdmin().disableTable(hdt.getTableName());
652      HTU.deleteTable(hdt.getTableName());
653    }
654  }
655}