001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.io.IOException;
025import java.util.Arrays;
026import java.util.Collections;
027import java.util.List;
028import java.util.Map;
029import java.util.Optional;
030import java.util.TreeMap;
031import java.util.concurrent.CountDownLatch;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicLong;
034import java.util.concurrent.atomic.AtomicReference;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.HBaseConfiguration;
039import org.apache.hadoop.hbase.HBaseTestingUtil;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.Waiter;
043import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
044import org.apache.hadoop.hbase.coprocessor.ObserverContext;
045import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
046import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
047import org.apache.hadoop.hbase.coprocessor.RegionObserver;
048import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
049import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
050import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
051import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
052import org.apache.hadoop.hbase.testclassification.ClientTests;
053import org.apache.hadoop.hbase.testclassification.LargeTests;
054import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
055import org.apache.hadoop.hbase.util.Bytes;
056import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
057import org.junit.jupiter.api.AfterAll;
058import org.junit.jupiter.api.BeforeAll;
059import org.junit.jupiter.api.Tag;
060import org.junit.jupiter.api.Test;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064@Tag(LargeTests.TAG)
065@Tag(ClientTests.TAG)
066public class TestReplicaWithCluster {
067
068  private static final Logger LOG = LoggerFactory.getLogger(TestReplicaWithCluster.class);
069
070  private static final int NB_SERVERS = 3;
071  private static final byte[] row = Bytes.toBytes(TestReplicaWithCluster.class.getName());
072  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
073
074  // second minicluster used in testing of replication
075  private static HBaseTestingUtil HTU2;
076  private static final byte[] f = HConstants.CATALOG_FAMILY;
077
078  private final static int REFRESH_PERIOD = 1000;
079  private final static int META_SCAN_TIMEOUT_IN_MILLISEC = 200;
080
081  /**
082   * This copro is used to synchronize the tests.
083   */
084  public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
085    static final AtomicLong sleepTime = new AtomicLong(0);
086    static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(new CountDownLatch(0));
087
088    public SlowMeCopro() {
089    }
090
091    @Override
092    public Optional<RegionObserver> getRegionObserver() {
093      return Optional.of(this);
094    }
095
096    @Override
097    public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
098      final Get get, final List<Cell> results) throws IOException {
099
100      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
101        CountDownLatch latch = cdl.get();
102        try {
103          if (sleepTime.get() > 0) {
104            LOG.info("Sleeping for " + sleepTime.get() + " ms");
105            Thread.sleep(sleepTime.get());
106          } else if (latch.getCount() > 0) {
107            LOG.info("Waiting for the counterCountDownLatch");
108            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
109            if (latch.getCount() > 0) {
110              throw new RuntimeException("Can't wait more");
111            }
112          }
113        } catch (InterruptedException e1) {
114          LOG.error(e1.toString(), e1);
115        }
116      } else {
117        LOG.info("We're not the primary replicas.");
118      }
119    }
120  }
121
122  /**
123   * This copro is used to simulate region server down exception for Get and Scan
124   */
125  @CoreCoprocessor
126  public static class RegionServerStoppedCopro implements RegionCoprocessor, RegionObserver {
127
128    public RegionServerStoppedCopro() {
129    }
130
131    @Override
132    public Optional<RegionObserver> getRegionObserver() {
133      return Optional.of(this);
134    }
135
136    @Override
137    public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
138      final Get get, final List<Cell> results) throws IOException {
139
140      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
141
142      // Fail for the primary replica and replica 1
143      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) {
144        LOG.info("Throw Region Server Stopped Exceptoin for replica id " + replicaId);
145        throw new RegionServerStoppedException(
146          "Server " + e.getEnvironment().getServerName() + " not running");
147      } else {
148        LOG.info("We're replica region " + replicaId);
149      }
150    }
151
152    @Override
153    public void preScannerOpen(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
154      final Scan scan) throws IOException {
155      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
156      // Fail for the primary replica and replica 1
157      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) {
158        LOG.info("Throw Region Server Stopped Exceptoin for replica id " + replicaId);
159        throw new RegionServerStoppedException(
160          "Server " + e.getEnvironment().getServerName() + " not running");
161      } else {
162        LOG.info("We're replica region " + replicaId);
163      }
164    }
165  }
166
167  /**
168   * This copro is used to slow down the primary meta region scan a bit
169   */
170  public static class RegionServerHostingPrimayMetaRegionSlowOrStopCopro
171    implements RegionCoprocessor, RegionObserver {
172    static boolean slowDownPrimaryMetaScan = false;
173    static boolean throwException = false;
174
175    @Override
176    public Optional<RegionObserver> getRegionObserver() {
177      return Optional.of(this);
178    }
179
180    @Override
181    public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
182      final Get get, final List<Cell> results) throws IOException {
183
184      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
185
186      // Fail for the primary replica, but not for meta
187      if (throwException) {
188        if (!e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
189          LOG.info("Get, throw Region Server Stopped Exceptoin for region "
190            + e.getEnvironment().getRegion().getRegionInfo());
191          throw new RegionServerStoppedException(
192            "Server " + e.getEnvironment().getServerName() + " not running");
193        }
194      } else {
195        LOG.info("Get, We're replica region " + replicaId);
196      }
197    }
198
199    @Override
200    public void preScannerOpen(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
201      final Scan scan) throws IOException {
202
203      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
204
205      // Slow down with the primary meta region scan
206      if (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
207        if (slowDownPrimaryMetaScan) {
208          LOG.info("Scan with primary meta region, slow down a bit");
209          try {
210            Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50);
211          } catch (InterruptedException ie) {
212            // Ingore
213          }
214        }
215
216        // Fail for the primary replica
217        if (throwException) {
218          LOG.info("Scan, throw Region Server Stopped Exceptoin for replica "
219            + e.getEnvironment().getRegion().getRegionInfo());
220
221          throw new RegionServerStoppedException(
222            "Server " + e.getEnvironment().getServerName() + " not running");
223        } else {
224          LOG.info("Scan, We're replica region " + replicaId);
225        }
226      } else {
227        LOG.info("Scan, We're replica region " + replicaId);
228      }
229    }
230  }
231
232  @BeforeAll
233  public static void beforeClass() throws Exception {
234    // enable store file refreshing
235    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
236      REFRESH_PERIOD);
237
238    HTU.getConfiguration().setFloat("hbase.regionserver.logroll.multiplier", 0.0001f);
239    HTU.getConfiguration().setInt("replication.source.size.capacity", 10240);
240    HTU.getConfiguration().setLong("replication.source.sleepforretries", 100);
241    HTU.getConfiguration().setInt("hbase.regionserver.maxlogs", 2);
242    HTU.getConfiguration().setLong("hbase.master.logcleaner.ttl", 10);
243    HTU.getConfiguration().setInt("zookeeper.recovery.retry", 1);
244    HTU.getConfiguration().setInt("zookeeper.recovery.retry.intervalmill", 10);
245
246    // Wait for primary call longer so make sure that it will get exception from the primary call
247    HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.get", 1000000);
248    HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.scan", 1000000);
249
250    // Make sure master does not host system tables.
251    HTU.getConfiguration().set("hbase.balancer.tablesOnMaster", "none");
252
253    // Set system coprocessor so it can be applied to meta regions
254    HTU.getConfiguration().set("hbase.coprocessor.region.classes",
255      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.class.getName());
256
257    HTU.getConfiguration().setInt(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
258      META_SCAN_TIMEOUT_IN_MILLISEC * 1000);
259
260    HTU.startMiniCluster(NB_SERVERS);
261    // Enable meta replica at server side
262    HBaseTestingUtil.setReplicas(HTU.getAdmin(), TableName.META_TABLE_NAME, 2);
263
264    HTU.getHBaseCluster().startMaster();
265  }
266
267  @AfterAll
268  public static void afterClass() throws Exception {
269    if (HTU2 != null) HTU2.shutdownMiniCluster();
270    HTU.shutdownMiniCluster();
271  }
272
273  @Test
274  public void testCreateDeleteTable() throws IOException {
275    // Create table then get the single region for our new table.
276    TableDescriptorBuilder builder =
277      HTU.createModifyableTableDescriptor(TableName.valueOf("testCreateDeleteTable"),
278        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
279        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
280    builder.setRegionReplication(NB_SERVERS);
281    builder.setCoprocessor(SlowMeCopro.class.getName());
282    TableDescriptor hdt = builder.build();
283    Table table = HTU.createTable(hdt, new byte[][] { f }, null);
284
285    Put p = new Put(row);
286    p.addColumn(f, row, row);
287    table.put(p);
288
289    Get g = new Get(row);
290    Result r = table.get(g);
291    assertFalse(r.isStale());
292
293    try {
294      // But if we ask for stale we will get it
295      SlowMeCopro.cdl.set(new CountDownLatch(1));
296      g = new Get(row);
297      g.setConsistency(Consistency.TIMELINE);
298      r = table.get(g);
299      assertTrue(r.isStale());
300      SlowMeCopro.cdl.get().countDown();
301    } finally {
302      SlowMeCopro.cdl.get().countDown();
303      SlowMeCopro.sleepTime.set(0);
304    }
305
306    HTU.getAdmin().disableTable(hdt.getTableName());
307    HTU.deleteTable(hdt.getTableName());
308  }
309
310  @Test
311  public void testChangeTable() throws Exception {
312    TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("testChangeTable"))
313      .setRegionReplication(NB_SERVERS).setCoprocessor(SlowMeCopro.class.getName())
314      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f)).build();
315    HTU.getAdmin().createTable(td);
316    Table table = HTU.getConnection().getTable(td.getTableName());
317    // basic test: it should work.
318    Put p = new Put(row);
319    p.addColumn(f, row, row);
320    table.put(p);
321
322    Get g = new Get(row);
323    Result r = table.get(g);
324    assertFalse(r.isStale());
325
326    // Add a CF, it should work.
327    TableDescriptor bHdt = HTU.getAdmin().getDescriptor(td.getTableName());
328    td = TableDescriptorBuilder.newBuilder(td)
329      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(row)).build();
330    HTU.getAdmin().disableTable(td.getTableName());
331    HTU.getAdmin().modifyTable(td);
332    HTU.getAdmin().enableTable(td.getTableName());
333    TableDescriptor nHdt = HTU.getAdmin().getDescriptor(td.getTableName());
334    assertEquals(bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount(),
335      "fams=" + Arrays.toString(nHdt.getColumnFamilies()));
336
337    p = new Put(row);
338    p.addColumn(row, row, row);
339    table.put(p);
340
341    g = new Get(row);
342    r = table.get(g);
343    assertFalse(r.isStale());
344
345    try {
346      SlowMeCopro.cdl.set(new CountDownLatch(1));
347      g = new Get(row);
348      g.setConsistency(Consistency.TIMELINE);
349      r = table.get(g);
350      assertTrue(r.isStale());
351    } finally {
352      SlowMeCopro.cdl.get().countDown();
353      SlowMeCopro.sleepTime.set(0);
354    }
355
356    Admin admin = HTU.getAdmin();
357    nHdt = admin.getDescriptor(td.getTableName());
358    assertEquals(bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount(),
359      "fams=" + Arrays.toString(nHdt.getColumnFamilies()));
360
361    admin.disableTable(td.getTableName());
362    admin.deleteTable(td.getTableName());
363    admin.close();
364  }
365
366  @SuppressWarnings("deprecation")
367  @Test
368  public void testReplicaAndReplication() throws Exception {
369    TableDescriptorBuilder builder =
370      HTU.createModifyableTableDescriptor("testReplicaAndReplication");
371    builder.setRegionReplication(NB_SERVERS);
372    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(row)
373      .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build());
374
375    builder.setCoprocessor(SlowMeCopro.class.getName());
376    TableDescriptor tableDescriptor = builder.build();
377    HTU.getAdmin().createTable(tableDescriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
378
379    Configuration conf2 = HBaseConfiguration.create(HTU.getConfiguration());
380    conf2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
381    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
382    MiniZooKeeperCluster miniZK = HTU.getZkCluster();
383
384    HTU2 = new HBaseTestingUtil(conf2);
385    HTU2.setZkCluster(miniZK);
386    HTU2.startMiniCluster(NB_SERVERS);
387    LOG.info("Setup second Zk");
388    HTU2.getAdmin().createTable(tableDescriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
389
390    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
391      Admin admin = connection.getAdmin()) {
392      ReplicationPeerConfig rpc =
393        ReplicationPeerConfig.newBuilder().setClusterKey(HTU2.getRpcConnnectionURI()).build();
394      admin.addReplicationPeer("2", rpc);
395    }
396
397    Put p = new Put(row);
398    p.addColumn(row, row, row);
399    final Table table = HTU.getConnection().getTable(tableDescriptor.getTableName());
400    table.put(p);
401
402    HTU.getAdmin().flush(table.getName());
403    LOG.info("Put & flush done on the first cluster. Now doing a get on the same cluster.");
404
405    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
406      @Override
407      public boolean evaluate() throws Exception {
408        try {
409          SlowMeCopro.cdl.set(new CountDownLatch(1));
410          Get g = new Get(row);
411          g.setConsistency(Consistency.TIMELINE);
412          Result r = table.get(g);
413          assertTrue(r.isStale());
414          return !r.isEmpty();
415        } finally {
416          SlowMeCopro.cdl.get().countDown();
417          SlowMeCopro.sleepTime.set(0);
418        }
419      }
420    });
421    table.close();
422    LOG.info("stale get on the first cluster done. Now for the second.");
423
424    final Table table2 = HTU.getConnection().getTable(tableDescriptor.getTableName());
425    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
426      @Override
427      public boolean evaluate() throws Exception {
428        try {
429          SlowMeCopro.cdl.set(new CountDownLatch(1));
430          Get g = new Get(row);
431          g.setConsistency(Consistency.TIMELINE);
432          Result r = table2.get(g);
433          assertTrue(r.isStale());
434          return !r.isEmpty();
435        } finally {
436          SlowMeCopro.cdl.get().countDown();
437          SlowMeCopro.sleepTime.set(0);
438        }
439      }
440    });
441    table2.close();
442
443    HTU.getAdmin().disableTable(tableDescriptor.getTableName());
444    HTU.deleteTable(tableDescriptor.getTableName());
445
446    HTU2.getAdmin().disableTable(tableDescriptor.getTableName());
447    HTU2.deleteTable(tableDescriptor.getTableName());
448
449    // We shutdown HTU2 minicluster later, in afterClass(), as shutting down
450    // the minicluster has negative impact of deleting all HConnections in JVM.
451  }
452
453  @Test
454  public void testBulkLoad() throws IOException {
455    // Create table then get the single region for our new table.
456    LOG.debug("Creating test table");
457    TableDescriptorBuilder builder = HTU.createModifyableTableDescriptor(
458      TableName.valueOf("testBulkLoad"), ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3,
459      HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
460    builder.setRegionReplication(NB_SERVERS);
461    builder.setCoprocessor(SlowMeCopro.class.getName());
462    TableDescriptor hdt = builder.build();
463    Table table = HTU.createTable(hdt, new byte[][] { f }, null);
464
465    // create hfiles to load.
466    LOG.debug("Creating test data");
467    Path dir = HTU.getDataTestDirOnTestFS("testBulkLoad");
468    final int numRows = 10;
469    final byte[] qual = Bytes.toBytes("qual");
470    final byte[] val = Bytes.toBytes("val");
471    Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
472    for (ColumnFamilyDescriptor col : hdt.getColumnFamilies()) {
473      Path hfile = new Path(dir, col.getNameAsString());
474      TestHRegionServerBulkLoad.createHFile(HTU.getTestFileSystem(), hfile, col.getName(), qual,
475        val, numRows);
476      family2Files.put(col.getName(), Collections.singletonList(hfile));
477    }
478
479    // bulk load HFiles
480    LOG.debug("Loading test data");
481    BulkLoadHFiles.create(HTU.getConfiguration()).bulkLoad(hdt.getTableName(), family2Files);
482
483    // verify we can read them from the primary
484    LOG.debug("Verifying data load");
485    for (int i = 0; i < numRows; i++) {
486      byte[] row = TestHRegionServerBulkLoad.rowkey(i);
487      Get g = new Get(row);
488      Result r = table.get(g);
489      assertFalse(r.isStale());
490    }
491
492    // verify we can read them from the replica
493    LOG.debug("Verifying replica queries");
494    try {
495      SlowMeCopro.cdl.set(new CountDownLatch(1));
496      for (int i = 0; i < numRows; i++) {
497        byte[] row = TestHRegionServerBulkLoad.rowkey(i);
498        Get g = new Get(row);
499        g.setConsistency(Consistency.TIMELINE);
500        Result r = table.get(g);
501        assertTrue(r.isStale());
502      }
503      SlowMeCopro.cdl.get().countDown();
504    } finally {
505      SlowMeCopro.cdl.get().countDown();
506      SlowMeCopro.sleepTime.set(0);
507    }
508
509    HTU.getAdmin().disableTable(hdt.getTableName());
510    HTU.deleteTable(hdt.getTableName());
511  }
512
513  @Test
514  public void testReplicaGetWithPrimaryDown() throws IOException {
515    // Create table then get the single region for our new table.
516    TableDescriptorBuilder builder =
517      HTU.createModifyableTableDescriptor(TableName.valueOf("testCreateDeleteTable"),
518        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
519        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
520    builder.setRegionReplication(NB_SERVERS);
521    builder.setCoprocessor(RegionServerStoppedCopro.class.getName());
522    TableDescriptor hdt = builder.build();
523    try {
524      Table table = HTU.createTable(hdt, new byte[][] { f }, null);
525
526      Put p = new Put(row);
527      p.addColumn(f, row, row);
528      table.put(p);
529
530      // Flush so it can be picked by the replica refresher thread
531      HTU.flush(table.getName());
532
533      // Sleep for some time until data is picked up by replicas
534      try {
535        Thread.sleep(2 * REFRESH_PERIOD);
536      } catch (InterruptedException e1) {
537        LOG.error(e1.toString(), e1);
538      }
539
540      // But if we ask for stale we will get it
541      Get g = new Get(row);
542      g.setConsistency(Consistency.TIMELINE);
543      Result r = table.get(g);
544      assertTrue(r.isStale());
545    } finally {
546      HTU.getAdmin().disableTable(hdt.getTableName());
547      HTU.deleteTable(hdt.getTableName());
548    }
549  }
550
551  @Test
552  public void testReplicaScanWithPrimaryDown() throws IOException {
553    // Create table then get the single region for our new table.
554    TableDescriptorBuilder builder =
555      HTU.createModifyableTableDescriptor(TableName.valueOf("testCreateDeleteTable"),
556        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
557        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
558    builder.setRegionReplication(NB_SERVERS);
559    builder.setCoprocessor(RegionServerStoppedCopro.class.getName());
560    TableDescriptor hdt = builder.build();
561    try {
562      Table table = HTU.createTable(hdt, new byte[][] { f }, null);
563
564      Put p = new Put(row);
565      p.addColumn(f, row, row);
566      table.put(p);
567
568      // Flush so it can be picked by the replica refresher thread
569      HTU.flush(table.getName());
570
571      // Sleep for some time until data is picked up by replicas
572      try {
573        Thread.sleep(2 * REFRESH_PERIOD);
574      } catch (InterruptedException e1) {
575        LOG.error(e1.toString(), e1);
576      }
577
578      // But if we ask for stale we will get it
579      // Instantiating the Scan class
580      Scan scan = new Scan();
581
582      // Scanning the required columns
583      scan.addFamily(f);
584      scan.setConsistency(Consistency.TIMELINE);
585
586      // Getting the scan result
587      ResultScanner scanner = table.getScanner(scan);
588
589      Result r = scanner.next();
590
591      assertTrue(r.isStale());
592    } finally {
593      HTU.getAdmin().disableTable(hdt.getTableName());
594      HTU.deleteTable(hdt.getTableName());
595    }
596  }
597
598  @Test
599  public void testReplicaGetWithAsyncRpcClientImpl() throws IOException {
600    HTU.getConfiguration().setBoolean("hbase.ipc.client.specificThreadForWriting", true);
601    HTU.getConfiguration().set("hbase.rpc.client.impl",
602      "org.apache.hadoop.hbase.ipc.AsyncRpcClient");
603    // Create table then get the single region for our new table.
604    TableDescriptorBuilder builder =
605      HTU.createModifyableTableDescriptor(TableName.valueOf("testReplicaGetWithAsyncRpcClientImpl"),
606        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
607        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
608    builder.setRegionReplication(NB_SERVERS);
609    builder.setCoprocessor(SlowMeCopro.class.getName());
610    TableDescriptor hdt = builder.build();
611    try {
612      Table table = HTU.createTable(hdt, new byte[][] { f }, null);
613
614      Put p = new Put(row);
615      p.addColumn(f, row, row);
616      table.put(p);
617
618      // Flush so it can be picked by the replica refresher thread
619      HTU.flush(table.getName());
620
621      // Sleep for some time until data is picked up by replicas
622      try {
623        Thread.sleep(2 * REFRESH_PERIOD);
624      } catch (InterruptedException e1) {
625        LOG.error(e1.toString(), e1);
626      }
627
628      try {
629        // Create the new connection so new config can kick in
630        Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
631        Table t = connection.getTable(hdt.getTableName());
632
633        // But if we ask for stale we will get it
634        SlowMeCopro.cdl.set(new CountDownLatch(1));
635        Get g = new Get(row);
636        g.setConsistency(Consistency.TIMELINE);
637        Result r = t.get(g);
638        assertTrue(r.isStale());
639        SlowMeCopro.cdl.get().countDown();
640      } finally {
641        SlowMeCopro.cdl.get().countDown();
642        SlowMeCopro.sleepTime.set(0);
643      }
644    } finally {
645      HTU.getConfiguration().unset("hbase.ipc.client.specificThreadForWriting");
646      HTU.getConfiguration().unset("hbase.rpc.client.impl");
647      HTU.getAdmin().disableTable(hdt.getTableName());
648      HTU.deleteTable(hdt.getTableName());
649    }
650  }
651}