001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.fail;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.concurrent.atomic.AtomicInteger;
028import java.util.stream.Collectors;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.HBaseConfiguration;
033import org.apache.hadoop.hbase.HBaseTestingUtil;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.Admin;
038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
039import org.apache.hadoop.hbase.client.Connection;
040import org.apache.hadoop.hbase.client.ConnectionFactory;
041import org.apache.hadoop.hbase.client.Delete;
042import org.apache.hadoop.hbase.client.Get;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.Result;
045import org.apache.hadoop.hbase.client.ResultScanner;
046import org.apache.hadoop.hbase.client.Scan;
047import org.apache.hadoop.hbase.client.Table;
048import org.apache.hadoop.hbase.client.TableDescriptor;
049import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
050import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.apache.hadoop.hbase.util.JVMClusterUtil;
053import org.apache.hadoop.hbase.wal.WAL;
054import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
055import org.junit.After;
056import org.junit.AfterClass;
057import org.junit.Before;
058import org.junit.BeforeClass;
059import org.junit.jupiter.api.AfterAll;
060import org.junit.jupiter.api.AfterEach;
061import org.junit.jupiter.api.BeforeAll;
062import org.junit.jupiter.api.BeforeEach;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
067import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
068import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
069import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
070
/**
 * This class is only a base for other integration-level replication tests. Do not add tests here.
 * TestReplicationSmallTests is where tests that don't require bringing machines up/down should go.
 * All other tests should have their own classes and extend this one.
 */
076public class TestReplicationBase {
  // Logger used by the helpers of this base class.
  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);
  // Connections to the source (1) and peer (2) clusters; created in startClusters() and closed in
  // tearDownAfterClass().
  protected static Connection connection1;
  protected static Connection connection2;
  // Copy of CONF1 taken in startClusters() before the mini clusters are started.
  protected static Configuration CONF_WITH_LOCALFS;

  // Admin of the source cluster; re-acquired by restartSourceCluster().
  protected static Admin hbaseAdmin;

  // Handles on the test table in the source (1) and peer (2) clusters.
  protected static Table htable1;
  protected static Table htable2;

  // Testing utilities for the source (1) and peer (2) clusters, with their configurations.
  protected static final HBaseTestingUtil UTIL1 = new HBaseTestingUtil();
  protected static final HBaseTestingUtil UTIL2 = new HBaseTestingUtil();
  protected static Configuration CONF1 = UTIL1.getConfiguration();
  protected static Configuration CONF2 = UTIL2.getConfiguration();

  // Values passed to startMiniCluster() for cluster 1 and cluster 2 respectively.
  protected static int NUM_SLAVES1 = 1;
  protected static int NUM_SLAVES2 = 1;
  // Number of rows written by a single loadData() call.
  protected static final int NB_ROWS_IN_BATCH = 100;
  protected static final int NB_ROWS_IN_BIG_BATCH = NB_ROWS_IN_BATCH * 10;
  // Pause, in milliseconds, between two polls of the peer table.
  protected static final long SLEEP_TIME = 500;
  // Upper bound on the number of polling iterations used by the wait helpers in this class.
  protected static final int NB_RETRIES = 50;
  // Incremented on every ReplicationEndpointTest.replicate() call; reset to 0 whenever a
  // ReplicationEndpointTest is constructed.
  protected static AtomicInteger replicateCount = new AtomicInteger();
  // Accumulates the entries of every ReplicateContext handed to ReplicationEndpointTest.
  protected static volatile List<WAL.Entry> replicatedEntries = Lists.newArrayList();

  // Table, families and row used by the helpers. famName is created with global replication
  // scope in createTable(); noRepfamName is created without an explicit scope.
  protected static final TableName tableName = TableName.valueOf("test");
  protected static final byte[] famName = Bytes.toBytes("f");
  protected static final byte[] row = Bytes.toBytes("row");
  protected static final byte[] noRepfamName = Bytes.toBytes("norep");
  // Id of the replication peer added before and removed after each test.
  protected static final String PEER_ID2 = "2";
106
  /**
   * Tells {@link #addPeer(String, TableName, HBaseTestingUtil, HBaseTestingUtil)} whether the peer
   * it creates should be flagged as serial. The base implementation returns {@code false}.
   * @return {@code true} if the peer should be created with the serial flag set
   */
  protected boolean isSerialPeer() {
    return false;
  }
110
  /**
   * Tells {@link #addPeer(String, TableName, HBaseTestingUtil, HBaseTestingUtil)} whether the peer
   * it creates should also get a table-CFs map and a remote WAL directory. The base implementation
   * returns {@code false}.
   * @return {@code true} if the peer should be configured with a remote WAL directory
   */
  protected boolean isSyncPeer() {
    return false;
  }
114
115  protected final void cleanUp() throws IOException, InterruptedException {
116    // Starting and stopping replication can make us miss new logs,
117    // rolling like this makes sure the most recent one gets added to the queue
118    for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster().getRegionServerThreads()) {
119      UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
120    }
121    int rowCount = UTIL1.countRows(tableName);
122    UTIL1.deleteTableData(tableName);
123    // truncating the table will send one Delete per row to the slave cluster
124    // in an async fashion, which is why we cannot just call deleteTableData on
125    // utility2 since late writes could make it to the slave in some way.
126    // Instead, we truncate the first table and wait for all the Deletes to
127    // make it to the slave.
128    Scan scan = new Scan();
129    int lastCount = 0;
130    for (int i = 0; i < NB_RETRIES; i++) {
131      if (i == NB_RETRIES - 1) {
132        fail("Waited too much time for truncate");
133      }
134      ResultScanner scanner = htable2.getScanner(scan);
135      Result[] res = scanner.next(rowCount);
136      scanner.close();
137      if (res.length != 0) {
138        if (res.length < lastCount) {
139          i--; // Don't increment timeout if we make progress
140        }
141        lastCount = res.length;
142        LOG.info("Still got " + res.length + " rows");
143        Thread.sleep(SLEEP_TIME);
144      } else {
145        break;
146      }
147    }
148  }
149
  /**
   * Waits until the peer table {@code htable2} contains exactly {@code expectedRows} rows, by
   * delegating to {@link #waitForReplication(Table, int, int)}.
   * @param expectedRows number of rows the peer table must contain
   * @param retries      upper bound on the number of polling iterations
   * @throws IOException          if scanning the peer table fails
   * @throws InterruptedException if interrupted while sleeping between two polls
   */
  protected static void waitForReplication(int expectedRows, int retries)
    throws IOException, InterruptedException {
    waitForReplication(htable2, expectedRows, retries);
  }
154
155  protected static void waitForReplication(Table table, int expectedRows, int retries)
156    throws IOException, InterruptedException {
157    Scan scan;
158    for (int i = 0; i < retries; i++) {
159      scan = new Scan();
160      if (i == retries - 1) {
161        fail("Waited too much time for normal batch replication");
162      }
163      int count = 0;
164      try (ResultScanner scanner = table.getScanner(scan)) {
165        while (scanner.next() != null) {
166          count++;
167        }
168      }
169      if (count != expectedRows) {
170        LOG.info("Only got " + count + " rows");
171        Thread.sleep(SLEEP_TIME);
172      } else {
173        break;
174      }
175    }
176  }
177
  /**
   * Writes one batch of rows into the source table using the default family {@code famName}; see
   * {@link #loadData(String, byte[], byte[])}.
   * @param prefix prefix of every generated row key
   * @param row    bytes used both as column qualifier and as cell value
   * @throws IOException if the batch put fails
   */
  protected static void loadData(String prefix, byte[] row) throws IOException {
    loadData(prefix, row, famName);
  }
181
182  protected static void loadData(String prefix, byte[] row, byte[] familyName) throws IOException {
183    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
184    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
185      Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i)));
186      put.addColumn(familyName, row, row);
187      puts.add(put);
188    }
189    htable1.put(puts);
190  }
191
192  protected static void setupConfig(HBaseTestingUtil util, String znodeParent) {
193    Configuration conf = util.getConfiguration();
194    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent);
195    // Disable directory sharing to prevent race conditions when tests run in parallel.
196    // Each test instance gets its own isolated directories to avoid one test's tearDown()
197    // deleting directories another parallel test is still using.
198    conf.setBoolean("hbase.test.disable-directory-sharing", true);
199    // We don't want too many edits per batch sent to the ReplicationEndpoint to trigger
200    // sufficient number of events. But we don't want to go too low because
201    // HBaseInterClusterReplicationEndpoint partitions entries into batches and we want
202    // more than one batch sent to the peer cluster for better testing.
203    conf.setInt("replication.source.size.capacity", 102400);
204    conf.setLong("replication.source.sleepforretries", 100);
205    conf.setInt("hbase.regionserver.maxlogs", 10);
206    conf.setLong("hbase.master.logcleaner.ttl", 10);
207    conf.setInt("zookeeper.recovery.retry", 1);
208    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
209    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
210    conf.setInt("replication.stats.thread.period.seconds", 5);
211    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
212    conf.setLong("replication.sleep.before.failover", 2000);
213    conf.setInt("replication.source.maxretriesmultiplier", 10);
214    conf.setFloat("replication.source.ratio", 1.0f);
215    conf.setBoolean("replication.source.eof.autorecovery", true);
216    conf.setLong("hbase.serial.replication.waiting.ms", 100);
217  }
218
219  static void configureClusters(HBaseTestingUtil util1, HBaseTestingUtil util2) {
220    setupConfig(util1, "/1");
221    setupConfig(util2, "/2");
222
223    Configuration conf2 = util2.getConfiguration();
224    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
225    conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
226    conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
227  }
228
229  protected static void restartSourceCluster(int numSlaves) throws Exception {
230    Closeables.close(hbaseAdmin, true);
231    Closeables.close(htable1, true);
232    UTIL1.shutdownMiniHBaseCluster();
233    UTIL1.restartHBaseCluster(numSlaves);
234    // Invalidate the cached connection state.
235    CONF1 = UTIL1.getConfiguration();
236    hbaseAdmin = UTIL1.getAdmin();
237    Connection connection1 = UTIL1.getConnection();
238    htable1 = connection1.getTable(tableName);
239  }
240
241  static void restartTargetHBaseCluster(int numSlaves) throws Exception {
242    Closeables.close(htable2, true);
243    UTIL2.restartHBaseCluster(numSlaves);
244    // Invalidate the cached connection state
245    CONF2 = UTIL2.getConfiguration();
246    htable2 = UTIL2.getConnection().getTable(tableName);
247  }
248
249  protected static void createTable(TableName tableName) throws IOException {
250    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
251      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
252        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
253      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
254    UTIL1.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
255    UTIL2.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
256    UTIL1.waitUntilAllRegionsAssigned(tableName);
257    UTIL2.waitUntilAllRegionsAssigned(tableName);
258  }
259
260  private static void startClusters() throws Exception {
261    UTIL1.startMiniZKCluster();
262    MiniZooKeeperCluster miniZK = UTIL1.getZkCluster();
263    LOG.info("Setup first Zk");
264
265    UTIL2.setZkCluster(miniZK);
266    LOG.info("Setup second Zk");
267
268    CONF_WITH_LOCALFS = HBaseConfiguration.create(CONF1);
269    UTIL1.startMiniCluster(NUM_SLAVES1);
270    // Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks
271    // as a component in deciding maximum number of parallel batches to send to the peer cluster.
272    UTIL2.startMiniCluster(NUM_SLAVES2);
273
274    connection1 = ConnectionFactory.createConnection(CONF1);
275    connection2 = ConnectionFactory.createConnection(CONF2);
276    hbaseAdmin = connection1.getAdmin();
277
278    createTable(tableName);
279    htable1 = connection1.getTable(tableName);
280    htable2 = connection2.getTable(tableName);
281  }
282
  /**
   * Class-level set-up: configures both testing utilities and then starts the clusters. The method
   * carries both the JUnit 5 and the JUnit 4 class-level annotation.
   * @throws Exception if configuring or starting the clusters fails
   */
  @BeforeAll
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    configureClusters(UTIL1, UTIL2);
    startClusters();
  }
289
  /**
   * Checks whether a replication peer with the given id is registered on the cluster of
   * {@code UTIL1}.
   * @param peerId id of the peer to look for
   * @return {@code true} if such a peer is listed
   * @throws IOException if listing the peers fails
   */
  private boolean peerExist(String peerId) throws IOException {
    return peerExist(peerId, UTIL1);
  }
293
294  private boolean peerExist(String peerId, HBaseTestingUtil util) throws IOException {
295    return util.getAdmin().listReplicationPeers().stream()
296      .anyMatch(p -> peerId.equals(p.getPeerId()));
297  }
298
  /**
   * Returns the cluster key that
   * {@link #addPeer(String, TableName, HBaseTestingUtil, HBaseTestingUtil)} stores in the peer
   * config for the target cluster. The base implementation returns the RPC connection URI of
   * {@code util}.
   * <p>
   * Can be overridden in tests, in case you need to use a zk based uri, or the old style uri.
   * @param util testing utility of the target cluster
   * @return the cluster key to use for the peer
   * @throws Exception if the key cannot be determined
   */
  protected String getClusterKey(HBaseTestingUtil util) throws Exception {
    return util.getRpcConnnectionURI();
  }
303
  /**
   * Adds a replication peer on the cluster of {@code UTIL1} that targets the cluster of
   * {@code UTIL2}; see {@link #addPeer(String, TableName, HBaseTestingUtil, HBaseTestingUtil)}.
   * @param peerId    id of the peer to add
   * @param tableName table used in the table-CFs map when {@link #isSyncPeer()} returns true
   * @throws Exception if the peer cannot be added
   */
  protected final void addPeer(String peerId, TableName tableName) throws Exception {
    addPeer(peerId, tableName, UTIL1, UTIL2);
  }
307
308  protected final void addPeer(String peerId, TableName tableName, HBaseTestingUtil source,
309    HBaseTestingUtil target) throws Exception {
310    if (peerExist(peerId, source)) {
311      return;
312    }
313    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
314      .setClusterKey(getClusterKey(target)).setSerial(isSerialPeer())
315      .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName());
316    if (isSyncPeer()) {
317      FileSystem fs2 = target.getTestFileSystem();
318      // The remote wal dir is not important as we do not use it in DA state, here we only need to
319      // confirm that a sync peer in DA state can still replicate data to remote cluster
320      // asynchronously.
321      builder.setReplicateAllUserTables(false)
322        .setTableCFsMap(ImmutableMap.of(tableName, ImmutableList.of()))
323        .setRemoteWALDir(new Path("/RemoteWAL")
324          .makeQualified(fs2.getUri(), fs2.getWorkingDirectory()).toUri().toString());
325    }
326    source.getAdmin().addReplicationPeer(peerId, builder.build());
327  }
328
  /**
   * Per-test set-up: adds the peer {@link #PEER_ID2} for the test table. The method carries both
   * the JUnit 4 and the JUnit 5 per-test annotation.
   * @throws Exception if the peer cannot be added
   */
  @Before
  @BeforeEach
  public void setUpBase() throws Exception {
    addPeer(PEER_ID2, tableName);
  }
334
  /**
   * Removes the replication peer with the given id from the cluster of {@code UTIL1}, if it
   * exists; see {@link #removePeer(String, HBaseTestingUtil)}.
   * @param peerId id of the peer to remove
   * @throws Exception if listing or removing the peer fails
   */
  protected final void removePeer(String peerId) throws Exception {
    removePeer(peerId, UTIL1);
  }
338
339  protected final void removePeer(String peerId, HBaseTestingUtil util) throws Exception {
340    if (peerExist(peerId, util)) {
341      util.getAdmin().removeReplicationPeer(peerId);
342    }
343  }
344
  /**
   * Per-test tear-down: removes the peer {@link #PEER_ID2} if it exists. The method carries both
   * the JUnit 4 and the JUnit 5 per-test annotation.
   * @throws Exception if the peer cannot be removed
   */
  @After
  @AfterEach
  public void tearDownBase() throws Exception {
    removePeer(PEER_ID2);
  }
350
351  protected static void runSimplePutDeleteTest() throws IOException, InterruptedException {
352    Put put = new Put(row);
353    put.addColumn(famName, row, row);
354
355    htable1 = UTIL1.getConnection().getTable(tableName);
356    htable1.put(put);
357
358    Get get = new Get(row);
359    for (int i = 0; i < NB_RETRIES; i++) {
360      if (i == NB_RETRIES - 1) {
361        fail("Waited too much time for put replication");
362      }
363      Result res = htable2.get(get);
364      if (res.isEmpty()) {
365        LOG.info("Row not available");
366        Thread.sleep(SLEEP_TIME);
367      } else {
368        assertArrayEquals(row, res.value());
369        break;
370      }
371    }
372
373    Delete del = new Delete(row);
374    htable1.delete(del);
375
376    get = new Get(row);
377    for (int i = 0; i < NB_RETRIES; i++) {
378      if (i == NB_RETRIES - 1) {
379        fail("Waited too much time for del replication");
380      }
381      Result res = htable2.get(get);
382      if (res.size() >= 1) {
383        LOG.info("Row not deleted");
384        Thread.sleep(SLEEP_TIME);
385      } else {
386        break;
387      }
388    }
389  }
390
391  protected static void runSmallBatchTest() throws IOException, InterruptedException {
392    // normal Batch tests
393    loadData("", row);
394
395    Scan scan = new Scan();
396
397    ResultScanner scanner1 = htable1.getScanner(scan);
398    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
399    scanner1.close();
400    assertEquals(NB_ROWS_IN_BATCH, res1.length);
401
402    waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
403  }
404
405  protected static void stopAllRegionServers(HBaseTestingUtil util) throws IOException {
406    List<ServerName> rses = util.getMiniHBaseCluster().getRegionServerThreads().stream()
407      .map(t -> t.getRegionServer().getServerName()).collect(Collectors.toList());
408    for (ServerName rs : rses) {
409      util.getMiniHBaseCluster().stopRegionServer(rs);
410    }
411  }
412
413  @AfterAll
414  @AfterClass
415  public static void tearDownAfterClass() throws Exception {
416    if (htable2 != null) {
417      htable2.close();
418    }
419    if (htable1 != null) {
420      htable1.close();
421    }
422    if (hbaseAdmin != null) {
423      hbaseAdmin.close();
424    }
425
426    if (connection2 != null) {
427      connection2.close();
428    }
429    if (connection1 != null) {
430      connection1.close();
431    }
432    UTIL2.shutdownMiniCluster();
433    UTIL1.shutdownMiniCluster();
434  }
435
  /**
   * Custom replication endpoint to keep track of replication status for tests.
   * <p>
   * Every {@link #replicate(ReplicateContext)} call increments the static {@code replicateCount}
   * and appends the entries of the context to the static {@code replicatedEntries} before
   * delegating to the parent implementation.
   */
  public static class ReplicationEndpointTest extends HBaseInterClusterReplicationEndpoint {
    /**
     * Creates the endpoint and resets the shared {@code replicateCount} to 0.
     */
    public ReplicationEndpointTest() {
      replicateCount.set(0);
    }

    /**
     * Records the call and the entries of {@code replicateContext}, then delegates to the parent
     * endpoint.
     * @param replicateContext context holding the entries to ship
     * @return the value returned by the parent implementation
     */
    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      replicateCount.incrementAndGet();
      // NOTE(review): replicatedEntries is created with Lists.newArrayList() and appended to here
      // without synchronization; confirm replicate() is never invoked concurrently.
      replicatedEntries.addAll(replicateContext.getEntries());

      return super.replicate(replicateContext);
    }
  }
452}