001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.fail;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.concurrent.atomic.AtomicInteger;
028import java.util.stream.Collectors;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.HBaseConfiguration;
033import org.apache.hadoop.hbase.HBaseTestingUtil;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.Admin;
038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
039import org.apache.hadoop.hbase.client.Connection;
040import org.apache.hadoop.hbase.client.ConnectionFactory;
041import org.apache.hadoop.hbase.client.Delete;
042import org.apache.hadoop.hbase.client.Get;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.Result;
045import org.apache.hadoop.hbase.client.ResultScanner;
046import org.apache.hadoop.hbase.client.Scan;
047import org.apache.hadoop.hbase.client.Table;
048import org.apache.hadoop.hbase.client.TableDescriptor;
049import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
050import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.apache.hadoop.hbase.util.JVMClusterUtil;
053import org.apache.hadoop.hbase.wal.WAL;
054import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
055import org.junit.After;
056import org.junit.AfterClass;
057import org.junit.Before;
058import org.junit.BeforeClass;
059import org.junit.jupiter.api.AfterAll;
060import org.junit.jupiter.api.AfterEach;
061import org.junit.jupiter.api.BeforeAll;
062import org.junit.jupiter.api.BeforeEach;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
067import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
068import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
069import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
070
071/**
072 * This class is only a base for other integration-level replication tests. Do not add tests here.
073 * TestReplicationSmallTests is where tests that don't require bring machines up/down should go All
074 * other tests should have their own classes and extend this one
075 */
076public class TestReplicationBase {
077  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);
078  protected static Connection connection1;
079  protected static Connection connection2;
080  protected static Configuration CONF_WITH_LOCALFS;
081
082  protected static Admin hbaseAdmin;
083
084  protected static Table htable1;
085  protected static Table htable2;
086
087  protected static final HBaseTestingUtil UTIL1 = new HBaseTestingUtil();
088  protected static final HBaseTestingUtil UTIL2 = new HBaseTestingUtil();
089  protected static Configuration CONF1 = UTIL1.getConfiguration();
090  protected static Configuration CONF2 = UTIL2.getConfiguration();
091
092  protected static int NUM_SLAVES1 = 1;
093  protected static int NUM_SLAVES2 = 1;
094  protected static final int NB_ROWS_IN_BATCH = 100;
095  protected static final int NB_ROWS_IN_BIG_BATCH = NB_ROWS_IN_BATCH * 10;
096  protected static final long SLEEP_TIME = 500;
097  protected static final int NB_RETRIES = 50;
098  protected static AtomicInteger replicateCount = new AtomicInteger();
099  protected static volatile List<WAL.Entry> replicatedEntries = Lists.newArrayList();
100
101  protected static final TableName tableName = TableName.valueOf("test");
102  protected static final byte[] famName = Bytes.toBytes("f");
103  protected static final byte[] row = Bytes.toBytes("row");
104  protected static final byte[] noRepfamName = Bytes.toBytes("norep");
105  protected static final String PEER_ID2 = "2";
106
107  protected boolean isSerialPeer() {
108    return false;
109  }
110
111  protected boolean isSyncPeer() {
112    return false;
113  }
114
115  protected final void cleanUp() throws IOException, InterruptedException {
116    // Starting and stopping replication can make us miss new logs,
117    // rolling like this makes sure the most recent one gets added to the queue
118    for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster().getRegionServerThreads()) {
119      UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
120    }
121    int rowCount = UTIL1.countRows(tableName);
122    UTIL1.deleteTableData(tableName);
123    // truncating the table will send one Delete per row to the slave cluster
124    // in an async fashion, which is why we cannot just call deleteTableData on
125    // utility2 since late writes could make it to the slave in some way.
126    // Instead, we truncate the first table and wait for all the Deletes to
127    // make it to the slave.
128    Scan scan = new Scan();
129    int lastCount = 0;
130    for (int i = 0; i < NB_RETRIES; i++) {
131      if (i == NB_RETRIES - 1) {
132        fail("Waited too much time for truncate");
133      }
134      ResultScanner scanner = htable2.getScanner(scan);
135      Result[] res = scanner.next(rowCount);
136      scanner.close();
137      if (res.length != 0) {
138        if (res.length < lastCount) {
139          i--; // Don't increment timeout if we make progress
140        }
141        lastCount = res.length;
142        LOG.info("Still got " + res.length + " rows");
143        Thread.sleep(SLEEP_TIME);
144      } else {
145        break;
146      }
147    }
148  }
149
150  protected static void waitForReplication(int expectedRows, int retries)
151    throws IOException, InterruptedException {
152    waitForReplication(htable2, expectedRows, retries);
153  }
154
155  protected static void waitForReplication(Table table, int expectedRows, int retries)
156    throws IOException, InterruptedException {
157    Scan scan;
158    for (int i = 0; i < retries; i++) {
159      scan = new Scan();
160      if (i == retries - 1) {
161        fail("Waited too much time for normal batch replication");
162      }
163      int count = 0;
164      try (ResultScanner scanner = table.getScanner(scan)) {
165        while (scanner.next() != null) {
166          count++;
167        }
168      }
169      if (count != expectedRows) {
170        LOG.info("Only got " + count + " rows");
171        Thread.sleep(SLEEP_TIME);
172      } else {
173        break;
174      }
175    }
176  }
177
178  protected static void loadData(String prefix, byte[] row) throws IOException {
179    loadData(prefix, row, famName);
180  }
181
182  protected static void loadData(String prefix, byte[] row, byte[] familyName) throws IOException {
183    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
184    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
185      Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i)));
186      put.addColumn(familyName, row, row);
187      puts.add(put);
188    }
189    htable1.put(puts);
190  }
191
192  protected static void setupConfig(HBaseTestingUtil util, String znodeParent) {
193    Configuration conf = util.getConfiguration();
194    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent);
195    // We don't want too many edits per batch sent to the ReplicationEndpoint to trigger
196    // sufficient number of events. But we don't want to go too low because
197    // HBaseInterClusterReplicationEndpoint partitions entries into batches and we want
198    // more than one batch sent to the peer cluster for better testing.
199    conf.setInt("replication.source.size.capacity", 102400);
200    conf.setLong("replication.source.sleepforretries", 100);
201    conf.setInt("hbase.regionserver.maxlogs", 10);
202    conf.setLong("hbase.master.logcleaner.ttl", 10);
203    conf.setInt("zookeeper.recovery.retry", 1);
204    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
205    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
206    conf.setInt("replication.stats.thread.period.seconds", 5);
207    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
208    conf.setLong("replication.sleep.before.failover", 2000);
209    conf.setInt("replication.source.maxretriesmultiplier", 10);
210    conf.setFloat("replication.source.ratio", 1.0f);
211    conf.setBoolean("replication.source.eof.autorecovery", true);
212    conf.setLong("hbase.serial.replication.waiting.ms", 100);
213  }
214
215  static void configureClusters(HBaseTestingUtil util1, HBaseTestingUtil util2) {
216    setupConfig(util1, "/1");
217    setupConfig(util2, "/2");
218
219    Configuration conf2 = util2.getConfiguration();
220    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
221    conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
222    conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
223  }
224
225  protected static void restartSourceCluster(int numSlaves) throws Exception {
226    Closeables.close(hbaseAdmin, true);
227    Closeables.close(htable1, true);
228    UTIL1.shutdownMiniHBaseCluster();
229    UTIL1.restartHBaseCluster(numSlaves);
230    // Invalidate the cached connection state.
231    CONF1 = UTIL1.getConfiguration();
232    hbaseAdmin = UTIL1.getAdmin();
233    Connection connection1 = UTIL1.getConnection();
234    htable1 = connection1.getTable(tableName);
235  }
236
237  static void restartTargetHBaseCluster(int numSlaves) throws Exception {
238    Closeables.close(htable2, true);
239    UTIL2.restartHBaseCluster(numSlaves);
240    // Invalidate the cached connection state
241    CONF2 = UTIL2.getConfiguration();
242    htable2 = UTIL2.getConnection().getTable(tableName);
243  }
244
245  protected static void createTable(TableName tableName) throws IOException {
246    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
247      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
248        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
249      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
250    UTIL1.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
251    UTIL2.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
252    UTIL1.waitUntilAllRegionsAssigned(tableName);
253    UTIL2.waitUntilAllRegionsAssigned(tableName);
254  }
255
256  private static void startClusters() throws Exception {
257    UTIL1.startMiniZKCluster();
258    MiniZooKeeperCluster miniZK = UTIL1.getZkCluster();
259    LOG.info("Setup first Zk");
260
261    UTIL2.setZkCluster(miniZK);
262    LOG.info("Setup second Zk");
263
264    CONF_WITH_LOCALFS = HBaseConfiguration.create(CONF1);
265    UTIL1.startMiniCluster(NUM_SLAVES1);
266    // Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks
267    // as a component in deciding maximum number of parallel batches to send to the peer cluster.
268    UTIL2.startMiniCluster(NUM_SLAVES2);
269
270    connection1 = ConnectionFactory.createConnection(CONF1);
271    connection2 = ConnectionFactory.createConnection(CONF2);
272    hbaseAdmin = connection1.getAdmin();
273
274    createTable(tableName);
275    htable1 = connection1.getTable(tableName);
276    htable2 = connection2.getTable(tableName);
277  }
278
279  @BeforeAll
280  @BeforeClass
281  public static void setUpBeforeClass() throws Exception {
282    configureClusters(UTIL1, UTIL2);
283    startClusters();
284  }
285
286  private boolean peerExist(String peerId) throws IOException {
287    return peerExist(peerId, UTIL1);
288  }
289
290  private boolean peerExist(String peerId, HBaseTestingUtil util) throws IOException {
291    return util.getAdmin().listReplicationPeers().stream()
292      .anyMatch(p -> peerId.equals(p.getPeerId()));
293  }
294
295  // can be override in tests, in case you need to use zk based uri, or the old style uri
296  protected String getClusterKey(HBaseTestingUtil util) throws Exception {
297    return util.getRpcConnnectionURI();
298  }
299
300  protected final void addPeer(String peerId, TableName tableName) throws Exception {
301    addPeer(peerId, tableName, UTIL1, UTIL2);
302  }
303
304  protected final void addPeer(String peerId, TableName tableName, HBaseTestingUtil source,
305    HBaseTestingUtil target) throws Exception {
306    if (peerExist(peerId, source)) {
307      return;
308    }
309    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
310      .setClusterKey(getClusterKey(target)).setSerial(isSerialPeer())
311      .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName());
312    if (isSyncPeer()) {
313      FileSystem fs2 = target.getTestFileSystem();
314      // The remote wal dir is not important as we do not use it in DA state, here we only need to
315      // confirm that a sync peer in DA state can still replicate data to remote cluster
316      // asynchronously.
317      builder.setReplicateAllUserTables(false)
318        .setTableCFsMap(ImmutableMap.of(tableName, ImmutableList.of()))
319        .setRemoteWALDir(new Path("/RemoteWAL")
320          .makeQualified(fs2.getUri(), fs2.getWorkingDirectory()).toUri().toString());
321    }
322    source.getAdmin().addReplicationPeer(peerId, builder.build());
323  }
324
325  @Before
326  @BeforeEach
327  public void setUpBase() throws Exception {
328    addPeer(PEER_ID2, tableName);
329  }
330
331  protected final void removePeer(String peerId) throws Exception {
332    removePeer(peerId, UTIL1);
333  }
334
335  protected final void removePeer(String peerId, HBaseTestingUtil util) throws Exception {
336    if (peerExist(peerId, util)) {
337      util.getAdmin().removeReplicationPeer(peerId);
338    }
339  }
340
341  @After
342  @AfterEach
343  public void tearDownBase() throws Exception {
344    removePeer(PEER_ID2);
345  }
346
347  protected static void runSimplePutDeleteTest() throws IOException, InterruptedException {
348    Put put = new Put(row);
349    put.addColumn(famName, row, row);
350
351    htable1 = UTIL1.getConnection().getTable(tableName);
352    htable1.put(put);
353
354    Get get = new Get(row);
355    for (int i = 0; i < NB_RETRIES; i++) {
356      if (i == NB_RETRIES - 1) {
357        fail("Waited too much time for put replication");
358      }
359      Result res = htable2.get(get);
360      if (res.isEmpty()) {
361        LOG.info("Row not available");
362        Thread.sleep(SLEEP_TIME);
363      } else {
364        assertArrayEquals(row, res.value());
365        break;
366      }
367    }
368
369    Delete del = new Delete(row);
370    htable1.delete(del);
371
372    get = new Get(row);
373    for (int i = 0; i < NB_RETRIES; i++) {
374      if (i == NB_RETRIES - 1) {
375        fail("Waited too much time for del replication");
376      }
377      Result res = htable2.get(get);
378      if (res.size() >= 1) {
379        LOG.info("Row not deleted");
380        Thread.sleep(SLEEP_TIME);
381      } else {
382        break;
383      }
384    }
385  }
386
387  protected static void runSmallBatchTest() throws IOException, InterruptedException {
388    // normal Batch tests
389    loadData("", row);
390
391    Scan scan = new Scan();
392
393    ResultScanner scanner1 = htable1.getScanner(scan);
394    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
395    scanner1.close();
396    assertEquals(NB_ROWS_IN_BATCH, res1.length);
397
398    waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
399  }
400
401  protected static void stopAllRegionServers(HBaseTestingUtil util) throws IOException {
402    List<ServerName> rses = util.getMiniHBaseCluster().getRegionServerThreads().stream()
403      .map(t -> t.getRegionServer().getServerName()).collect(Collectors.toList());
404    for (ServerName rs : rses) {
405      util.getMiniHBaseCluster().stopRegionServer(rs);
406    }
407  }
408
409  @AfterAll
410  @AfterClass
411  public static void tearDownAfterClass() throws Exception {
412    if (htable2 != null) {
413      htable2.close();
414    }
415    if (htable1 != null) {
416      htable1.close();
417    }
418    if (hbaseAdmin != null) {
419      hbaseAdmin.close();
420    }
421
422    if (connection2 != null) {
423      connection2.close();
424    }
425    if (connection1 != null) {
426      connection1.close();
427    }
428    UTIL2.shutdownMiniCluster();
429    UTIL1.shutdownMiniCluster();
430  }
431
432  /**
433   * Custom replication endpoint to keep track of replication status for tests.
434   */
435  public static class ReplicationEndpointTest extends HBaseInterClusterReplicationEndpoint {
436    public ReplicationEndpointTest() {
437      replicateCount.set(0);
438    }
439
440    @Override
441    public boolean replicate(ReplicateContext replicateContext) {
442      replicateCount.incrementAndGet();
443      replicatedEntries.addAll(replicateContext.getEntries());
444
445      return super.replicate(replicateContext);
446    }
447  }
448}