001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.fail;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.concurrent.atomic.AtomicInteger;
028import java.util.stream.Collectors;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.HBaseConfiguration;
033import org.apache.hadoop.hbase.HBaseTestingUtil;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.Admin;
038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
039import org.apache.hadoop.hbase.client.Connection;
040import org.apache.hadoop.hbase.client.ConnectionFactory;
041import org.apache.hadoop.hbase.client.Delete;
042import org.apache.hadoop.hbase.client.Get;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.Result;
045import org.apache.hadoop.hbase.client.ResultScanner;
046import org.apache.hadoop.hbase.client.Scan;
047import org.apache.hadoop.hbase.client.Table;
048import org.apache.hadoop.hbase.client.TableDescriptor;
049import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
050import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.apache.hadoop.hbase.util.JVMClusterUtil;
053import org.apache.hadoop.hbase.wal.WAL;
054import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
055import org.junit.After;
056import org.junit.AfterClass;
057import org.junit.Before;
058import org.junit.jupiter.api.AfterAll;
059import org.junit.jupiter.api.AfterEach;
060import org.junit.jupiter.api.BeforeEach;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
065import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
066import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
067import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
068
069/**
070 * Replication test base class without BeforeAll method, as in some tests we need to do some changes
071 * before starting clusters.
072 * @see TestReplicationBase
073 */
074public class TestReplicationBaseNoBeforeAll {
075  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBaseNoBeforeAll.class);
076  protected static Connection connection1;
077  protected static Connection connection2;
078  protected static Configuration CONF_WITH_LOCALFS;
079
080  protected static Admin hbaseAdmin;
081
082  protected static Table htable1;
083  protected static Table htable2;
084
085  protected static final HBaseTestingUtil UTIL1 = new HBaseTestingUtil();
086  protected static final HBaseTestingUtil UTIL2 = new HBaseTestingUtil();
087  protected static Configuration CONF1 = UTIL1.getConfiguration();
088  protected static Configuration CONF2 = UTIL2.getConfiguration();
089
090  protected static int NUM_SLAVES1 = 1;
091  protected static int NUM_SLAVES2 = 1;
092  protected static final int NB_ROWS_IN_BATCH = 100;
093  protected static final int NB_ROWS_IN_BIG_BATCH = NB_ROWS_IN_BATCH * 10;
094  protected static final long SLEEP_TIME = 500;
095  protected static final int NB_RETRIES = 50;
096  protected static AtomicInteger replicateCount = new AtomicInteger();
097  protected static volatile List<WAL.Entry> replicatedEntries = Lists.newArrayList();
098
099  protected static final TableName tableName = TableName.valueOf("test");
100  protected static final byte[] famName = Bytes.toBytes("f");
101  protected static final byte[] row = Bytes.toBytes("row");
102  protected static final byte[] noRepfamName = Bytes.toBytes("norep");
103  protected static final String PEER_ID2 = "2";
104
105  protected boolean isSerialPeer() {
106    return false;
107  }
108
109  protected boolean isSyncPeer() {
110    return false;
111  }
112
113  protected final void cleanUp() throws IOException, InterruptedException {
114    // Starting and stopping replication can make us miss new logs,
115    // rolling like this makes sure the most recent one gets added to the queue
116    for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster().getRegionServerThreads()) {
117      UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
118    }
119    int rowCount = UTIL1.countRows(tableName);
120    UTIL1.deleteTableData(tableName);
121    // truncating the table will send one Delete per row to the slave cluster
122    // in an async fashion, which is why we cannot just call deleteTableData on
123    // utility2 since late writes could make it to the slave in some way.
124    // Instead, we truncate the first table and wait for all the Deletes to
125    // make it to the slave.
126    Scan scan = new Scan();
127    int lastCount = 0;
128    for (int i = 0; i < NB_RETRIES; i++) {
129      if (i == NB_RETRIES - 1) {
130        fail("Waited too much time for truncate");
131      }
132      ResultScanner scanner = htable2.getScanner(scan);
133      Result[] res = scanner.next(rowCount);
134      scanner.close();
135      if (res.length != 0) {
136        if (res.length < lastCount) {
137          i--; // Don't increment timeout if we make progress
138        }
139        lastCount = res.length;
140        LOG.info("Still got " + res.length + " rows");
141        Thread.sleep(SLEEP_TIME);
142      } else {
143        break;
144      }
145    }
146  }
147
148  protected static void waitForReplication(int expectedRows, int retries)
149    throws IOException, InterruptedException {
150    waitForReplication(htable2, expectedRows, retries);
151  }
152
153  protected static void waitForReplication(Table table, int expectedRows, int retries)
154    throws IOException, InterruptedException {
155    Scan scan;
156    for (int i = 0; i < retries; i++) {
157      scan = new Scan();
158      if (i == retries - 1) {
159        fail("Waited too much time for normal batch replication");
160      }
161      int count = 0;
162      try (ResultScanner scanner = table.getScanner(scan)) {
163        while (scanner.next() != null) {
164          count++;
165        }
166      }
167      if (count != expectedRows) {
168        LOG.info("Only got " + count + " rows");
169        Thread.sleep(SLEEP_TIME);
170      } else {
171        break;
172      }
173    }
174  }
175
176  protected static void loadData(String prefix, byte[] row) throws IOException {
177    loadData(prefix, row, famName);
178  }
179
180  protected static void loadData(String prefix, byte[] row, byte[] familyName) throws IOException {
181    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
182    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
183      Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i)));
184      put.addColumn(familyName, row, row);
185      puts.add(put);
186    }
187    htable1.put(puts);
188  }
189
190  protected static void setupConfig(HBaseTestingUtil util, String znodeParent) {
191    Configuration conf = util.getConfiguration();
192    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent);
193    // We don't want too many edits per batch sent to the ReplicationEndpoint to trigger
194    // sufficient number of events. But we don't want to go too low because
195    // HBaseInterClusterReplicationEndpoint partitions entries into batches and we want
196    // more than one batch sent to the peer cluster for better testing.
197    conf.setInt("replication.source.size.capacity", 102400);
198    conf.setLong("replication.source.sleepforretries", 100);
199    conf.setInt("hbase.regionserver.maxlogs", 10);
200    conf.setLong("hbase.master.logcleaner.ttl", 10);
201    conf.setInt("zookeeper.recovery.retry", 1);
202    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
203    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
204    conf.setInt("replication.stats.thread.period.seconds", 5);
205    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
206    conf.setLong("replication.sleep.before.failover", 2000);
207    conf.setInt("replication.source.maxretriesmultiplier", 10);
208    conf.setFloat("replication.source.ratio", 1.0f);
209    conf.setBoolean("replication.source.eof.autorecovery", true);
210    conf.setLong("hbase.serial.replication.waiting.ms", 100);
211  }
212
213  protected static void configureClusters(HBaseTestingUtil util1, HBaseTestingUtil util2) {
214    setupConfig(util1, "/1");
215    setupConfig(util2, "/2");
216
217    Configuration conf2 = util2.getConfiguration();
218    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
219    conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
220    conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
221  }
222
223  protected static void restartSourceCluster(int numSlaves) throws Exception {
224    Closeables.close(hbaseAdmin, true);
225    Closeables.close(htable1, true);
226    Closeables.close(connection1, true);
227    UTIL1.shutdownMiniHBaseCluster();
228    UTIL1.restartHBaseCluster(numSlaves);
229    // Invalidate the cached connection state.
230    CONF1 = UTIL1.getConfiguration();
231    connection1 = ConnectionFactory.createConnection(CONF1);
232    hbaseAdmin = connection1.getAdmin();
233    htable1 = connection1.getTable(tableName);
234  }
235
236  static void restartTargetHBaseCluster(int numSlaves) throws Exception {
237    Closeables.close(htable2, true);
238    Closeables.close(connection2, true);
239    UTIL2.restartHBaseCluster(numSlaves);
240    // Invalidate the cached connection state
241    CONF2 = UTIL2.getConfiguration();
242    connection2 = ConnectionFactory.createConnection(CONF2);
243    htable2 = connection2.getTable(tableName);
244  }
245
246  protected static void createTable(TableName tableName) throws IOException {
247    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
248      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
249        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
250      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
251    UTIL1.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
252    UTIL2.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
253    UTIL1.waitUntilAllRegionsAssigned(tableName);
254    UTIL2.waitUntilAllRegionsAssigned(tableName);
255  }
256
257  protected static void startClusters() throws Exception {
258    UTIL1.startMiniZKCluster();
259    MiniZooKeeperCluster miniZK = UTIL1.getZkCluster();
260    LOG.info("Setup first Zk");
261
262    UTIL2.setZkCluster(miniZK);
263    LOG.info("Setup second Zk");
264
265    CONF_WITH_LOCALFS = HBaseConfiguration.create(CONF1);
266    UTIL1.startMiniCluster(NUM_SLAVES1);
267    // Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks
268    // as a component in deciding maximum number of parallel batches to send to the peer cluster.
269    UTIL2.startMiniCluster(NUM_SLAVES2);
270
271    connection1 = ConnectionFactory.createConnection(CONF1);
272    connection2 = ConnectionFactory.createConnection(CONF2);
273    hbaseAdmin = connection1.getAdmin();
274
275    createTable(tableName);
276    htable1 = connection1.getTable(tableName);
277    htable2 = connection2.getTable(tableName);
278  }
279
280  private boolean peerExist(String peerId, HBaseTestingUtil util) throws IOException {
281    return util.getAdmin().listReplicationPeers().stream()
282      .anyMatch(p -> peerId.equals(p.getPeerId()));
283  }
284
285  // can be overridden in tests, in case you need to use zk based uri, or the old style uri
286  protected String getClusterKey(HBaseTestingUtil util) throws Exception {
287    return util.getRpcConnnectionURI();
288  }
289
290  protected final void addPeer(String peerId, TableName tableName) throws Exception {
291    addPeer(peerId, tableName, UTIL1, UTIL2);
292  }
293
294  protected final void addPeer(String peerId, TableName tableName, HBaseTestingUtil source,
295    HBaseTestingUtil target) throws Exception {
296    if (peerExist(peerId, source)) {
297      return;
298    }
299    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
300      .setClusterKey(getClusterKey(target)).setSerial(isSerialPeer())
301      .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName());
302    if (isSyncPeer()) {
303      FileSystem fs2 = target.getTestFileSystem();
304      // The remote wal dir is not important as we do not use it in DA state, here we only need to
305      // confirm that a sync peer in DA state can still replicate data to remote cluster
306      // asynchronously.
307      builder.setReplicateAllUserTables(false)
308        .setTableCFsMap(ImmutableMap.of(tableName, ImmutableList.of()))
309        .setRemoteWALDir(new Path("/RemoteWAL")
310          .makeQualified(fs2.getUri(), fs2.getWorkingDirectory()).toUri().toString());
311    }
312    source.getAdmin().addReplicationPeer(peerId, builder.build());
313  }
314
315  @Before
316  @BeforeEach
317  public void setUpBase() throws Exception {
318    addPeer(PEER_ID2, tableName);
319  }
320
321  protected final void removePeer(String peerId) throws Exception {
322    removePeer(peerId, UTIL1);
323  }
324
325  protected final void removePeer(String peerId, HBaseTestingUtil util) throws Exception {
326    if (peerExist(peerId, util)) {
327      util.getAdmin().removeReplicationPeer(peerId);
328    }
329  }
330
331  @After
332  @AfterEach
333  public void tearDownBase() throws Exception {
334    removePeer(PEER_ID2);
335  }
336
337  protected static void runSimplePutDeleteTest() throws IOException, InterruptedException {
338    Put put = new Put(row);
339    put.addColumn(famName, row, row);
340
341    htable1 = UTIL1.getConnection().getTable(tableName);
342    htable1.put(put);
343
344    Get get = new Get(row);
345    for (int i = 0; i < NB_RETRIES; i++) {
346      if (i == NB_RETRIES - 1) {
347        fail("Waited too much time for put replication");
348      }
349      Result res = htable2.get(get);
350      if (res.isEmpty()) {
351        LOG.info("Row not available");
352        Thread.sleep(SLEEP_TIME);
353      } else {
354        assertArrayEquals(row, res.value());
355        break;
356      }
357    }
358
359    Delete del = new Delete(row);
360    htable1.delete(del);
361
362    get = new Get(row);
363    for (int i = 0; i < NB_RETRIES; i++) {
364      if (i == NB_RETRIES - 1) {
365        fail("Waited too much time for del replication");
366      }
367      Result res = htable2.get(get);
368      if (res.size() >= 1) {
369        LOG.info("Row not deleted");
370        Thread.sleep(SLEEP_TIME);
371      } else {
372        break;
373      }
374    }
375  }
376
377  protected static void runSmallBatchTest() throws IOException, InterruptedException {
378    // normal Batch tests
379    loadData("", row);
380
381    Scan scan = new Scan();
382
383    ResultScanner scanner1 = htable1.getScanner(scan);
384    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
385    scanner1.close();
386    assertEquals(NB_ROWS_IN_BATCH, res1.length);
387
388    waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
389  }
390
391  protected static void stopAllRegionServers(HBaseTestingUtil util) throws IOException {
392    List<ServerName> rses = util.getMiniHBaseCluster().getRegionServerThreads().stream()
393      .map(t -> t.getRegionServer().getServerName()).collect(Collectors.toList());
394    for (ServerName rs : rses) {
395      util.getMiniHBaseCluster().stopRegionServer(rs);
396    }
397  }
398
399  @AfterAll
400  @AfterClass
401  public static void tearDownAfterClass() throws Exception {
402    if (htable2 != null) {
403      htable2.close();
404    }
405    if (htable1 != null) {
406      htable1.close();
407    }
408    if (hbaseAdmin != null) {
409      hbaseAdmin.close();
410    }
411
412    if (connection2 != null) {
413      connection2.close();
414    }
415    if (connection1 != null) {
416      connection1.close();
417    }
418    UTIL2.shutdownMiniCluster();
419    UTIL1.shutdownMiniCluster();
420  }
421
422  /**
423   * Custom replication endpoint to keep track of replication status for tests.
424   */
425  public static class ReplicationEndpointTest extends HBaseInterClusterReplicationEndpoint {
426    public ReplicationEndpointTest() {
427      replicateCount.set(0);
428    }
429
430    @Override
431    public boolean replicate(ReplicateContext replicateContext) {
432      replicateCount.incrementAndGet();
433      replicatedEntries.addAll(replicateContext.getEntries());
434
435      return super.replicate(replicateContext);
436    }
437  }
438}