/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
 * This class is only a base for other integration-level replication tests. Do not add tests here.
 * TestReplicationSmallTests is where tests that don't require bringing machines up/down should go.
 * All other tests should have their own classes and extend this one.
 */
public class TestReplicationBase {
  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);
  protected static Connection connection1;
  protected static Connection connection2;
  protected static Configuration CONF_WITH_LOCALFS;

  protected static Admin hbaseAdmin;

  protected static Table htable1;
  protected static Table htable2;

  protected static final HBaseTestingUtil UTIL1 = new HBaseTestingUtil();
  protected static final HBaseTestingUtil UTIL2 = new HBaseTestingUtil();
  protected static Configuration CONF1 = UTIL1.getConfiguration();
  protected static Configuration CONF2 = UTIL2.getConfiguration();

  protected static int NUM_SLAVES1 = 1;
  protected static int NUM_SLAVES2 = 1;
  protected static final int NB_ROWS_IN_BATCH = 100;
  protected static final int NB_ROWS_IN_BIG_BATCH = NB_ROWS_IN_BATCH * 10;
  protected static final long SLEEP_TIME = 500;
  protected static final int NB_RETRIES = 50;
  protected static AtomicInteger replicateCount = new AtomicInteger();
  protected static volatile List<WAL.Entry> replicatedEntries = Lists.newArrayList();

  protected static final TableName tableName = TableName.valueOf("test");
  protected static final byte[] famName = Bytes.toBytes("f");
  protected static final byte[] row = Bytes.toBytes("row");
  protected static final byte[] noRepfamName = Bytes.toBytes("norep");
  protected static final String PEER_ID2 = "2";

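  /**
   * Whether the peer created by {@link #addPeer(String, TableName)} should be a serial
   * replication peer. Subclasses override this to run the shared tests against a serial peer.
   */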
  protected boolean isSerialPeer() {
    return false;
  }

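  /**
   * Whether the peer created by {@link #addPeer(String, TableName)} should be a synchronous
   * replication peer. Subclasses override this to run the shared tests against a sync peer.
   */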
  protected boolean isSyncPeer() {
    return false;
  }

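  /**
   * Empties the test table on the source cluster and waits until the resulting deletes have
   * replicated, so the peer cluster is empty before the next test runs.
   */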
  protected final void cleanUp() throws IOException, InterruptedException {
    // Starting and stopping replication can make us miss new logs;
    // rolling like this makes sure the most recent one gets added to the queue.
    for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster().getRegionServerThreads()) {
      UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    int rowCount = UTIL1.countRows(tableName);
    UTIL1.deleteTableData(tableName);
    // Truncating the table sends one Delete per row to the slave cluster
    // asynchronously, which is why we cannot just call deleteTableData on
    // UTIL2: late writes could still make it to the slave. Instead, we
    // truncate the first table and wait for all the Deletes to make it
    // to the slave.
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(rowCount);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // Don't increment the retry counter if we are making progress
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  protected static void waitForReplication(int expectedRows, int retries)
    throws IOException, InterruptedException {
    waitForReplication(htable2, expectedRows, retries);
  }

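  /**
   * Repeatedly scans {@code table} until it contains {@code expectedRows} rows, sleeping
   * {@link #SLEEP_TIME} between attempts and failing the test after {@code retries} attempts.
   */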
  protected static void waitForReplication(Table table, int expectedRows, int retries)
    throws IOException, InterruptedException {
    Scan scan;
    for (int i = 0; i < retries; i++) {
      scan = new Scan();
      if (i == retries - 1) {
        fail("Waited too much time for normal batch replication");
      }
      int count = 0;
      try (ResultScanner scanner = table.getScanner(scan)) {
        while (scanner.next() != null) {
          count++;
        }
      }
      if (count != expectedRows) {
        LOG.info("Only got " + count + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  protected static void loadData(String prefix, byte[] row) throws IOException {
    loadData(prefix, row, famName);
  }

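  /**
   * Puts {@link #NB_ROWS_IN_BATCH} rows into {@code htable1} with row keys of the form
   * {@code prefix + i}. Note that the {@code row} bytes are used as both the column qualifier
   * and the cell value.
   */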
  protected static void loadData(String prefix, byte[] row, byte[] familyName) throws IOException {
    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
      Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i)));
      put.addColumn(familyName, row, row);
      puts.add(put);
    }
    htable1.put(puts);
  }

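  /**
   * Applies the common replication test configuration to the given utility, rooting its
   * ZooKeeper data at {@code znodeParent} so both clusters can share one mini ZK cluster.
   */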
  protected static void setupConfig(HBaseTestingUtil util, String znodeParent) {
    Configuration conf = util.getConfiguration();
    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent);
    // We don't want too many edits per batch sent to the ReplicationEndpoint, so that a
    // sufficient number of replicate() events are triggered. But we don't want to go too low
    // either, because HBaseInterClusterReplicationEndpoint partitions entries into batches and
    // we want more than one batch sent to the peer cluster for better testing.
    conf.setInt("replication.source.size.capacity", 102400);
    conf.setLong("replication.source.sleepforretries", 100);
    conf.setInt("hbase.regionserver.maxlogs", 10);
    conf.setLong("hbase.master.logcleaner.ttl", 10);
    conf.setInt("zookeeper.recovery.retry", 1);
    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    conf.setInt("replication.stats.thread.period.seconds", 5);
    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
    conf.setLong("replication.sleep.before.failover", 2000);
    conf.setInt("replication.source.maxretriesmultiplier", 10);
    conf.setFloat("replication.source.ratio", 1.0f);
    conf.setBoolean("replication.source.eof.autorecovery", true);
    conf.setLong("hbase.serial.replication.waiting.ms", 100);
  }

  static void configureClusters(HBaseTestingUtil util1, HBaseTestingUtil util2) {
    setupConfig(util1, "/1");
    setupConfig(util2, "/2");

    Configuration conf2 = util2.getConfiguration();
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
    conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
    conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
  }

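  /**
   * Shuts down and restarts the source (UTIL1) HBase cluster with {@code numSlaves} region
   * servers, then re-creates the cached admin and table handles that pointed at the old cluster.
   */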
  protected static void restartSourceCluster(int numSlaves) throws Exception {
    Closeables.close(hbaseAdmin, true);
    Closeables.close(htable1, true);
    UTIL1.shutdownMiniHBaseCluster();
    UTIL1.restartHBaseCluster(numSlaves);
    // Invalidate the cached connection state.
    CONF1 = UTIL1.getConfiguration();
    hbaseAdmin = UTIL1.getAdmin();
    htable1 = UTIL1.getConnection().getTable(tableName);
  }

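  /**
   * Restarts the target (UTIL2) HBase cluster with {@code numSlaves} region servers and
   * re-creates the cached table handle for the peer cluster.
   */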
  static void restartTargetHBaseCluster(int numSlaves) throws Exception {
    Closeables.close(htable2, true);
    UTIL2.restartHBaseCluster(numSlaves);
    // Invalidate the cached connection state
    CONF2 = UTIL2.getConfiguration();
    htable2 = UTIL2.getConnection().getTable(tableName);
  }

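  /**
   * Creates the test table on both clusters: one column family with replication scope GLOBAL
   * (so its edits are shipped to the peer) and one family that is not replicated.
   */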
  protected static void createTable(TableName tableName) throws IOException {
    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
    UTIL1.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
    UTIL2.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
    UTIL1.waitUntilAllRegionsAssigned(tableName);
    UTIL2.waitUntilAllRegionsAssigned(tableName);
  }

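  /**
   * Starts a single mini ZooKeeper cluster shared by both HBase clusters (chrooted at /1 and
   * /2 via setupConfig), starts the two mini HBase clusters, and creates the test table and
   * client handles.
   */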
  private static void startClusters() throws Exception {
    UTIL1.startMiniZKCluster();
    MiniZooKeeperCluster miniZK = UTIL1.getZkCluster();
    LOG.info("Setup first Zk");

    UTIL2.setZkCluster(miniZK);
    LOG.info("Setup second Zk");

    CONF_WITH_LOCALFS = HBaseConfiguration.create(CONF1);
    UTIL1.startMiniCluster(NUM_SLAVES1);
    // Have a bunch of slave servers, because inter-cluster shipping logic uses the number of
    // sinks as a component in deciding the maximum number of parallel batches to send to the
    // peer cluster.
    UTIL2.startMiniCluster(NUM_SLAVES2);

    connection1 = ConnectionFactory.createConnection(CONF1);
    connection2 = ConnectionFactory.createConnection(CONF2);
    hbaseAdmin = connection1.getAdmin();

    createTable(tableName);
    htable1 = connection1.getTable(tableName);
    htable2 = connection2.getTable(tableName);
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    configureClusters(UTIL1, UTIL2);
    startClusters();
  }

  private boolean peerExist(String peerId) throws IOException {
    return peerExist(peerId, UTIL1);
  }

  private boolean peerExist(String peerId, HBaseTestingUtil util) throws IOException {
    return util.getAdmin().listReplicationPeers().stream()
      .anyMatch(p -> peerId.equals(p.getPeerId()));
  }

  // Can be overridden in tests, in case you need to use a zk-based URI or the old-style
  // cluster key.
  protected String getClusterKey(HBaseTestingUtil util) throws Exception {
    return util.getRpcConnnectionURI();
  }

  protected final void addPeer(String peerId, TableName tableName) throws Exception {
    addPeer(peerId, tableName, UTIL1, UTIL2);
  }

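  /**
   * Adds a replication peer from {@code source} to {@code target} if it does not already exist.
   * The peer honors {@link #isSerialPeer()} and {@link #isSyncPeer()} and routes shipping
   * through {@link ReplicationEndpointTest} so tests can observe the replicated entries.
   */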
  protected final void addPeer(String peerId, TableName tableName, HBaseTestingUtil source,
    HBaseTestingUtil target) throws Exception {
    if (peerExist(peerId, source)) {
      return;
    }
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
      .setClusterKey(getClusterKey(target)).setSerial(isSerialPeer())
      .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName());
    if (isSyncPeer()) {
      FileSystem fs2 = target.getTestFileSystem();
      // The remote wal dir is not important as we do not use it in DA state; here we only need
      // to confirm that a sync peer in DA state can still replicate data to the remote cluster
      // asynchronously.
      builder.setReplicateAllUserTables(false)
        .setTableCFsMap(ImmutableMap.of(tableName, ImmutableList.of()))
        .setRemoteWALDir(new Path("/RemoteWAL")
          .makeQualified(fs2.getUri(), fs2.getWorkingDirectory()).toUri().toString());
    }
    source.getAdmin().addReplicationPeer(peerId, builder.build());
  }

  @Before
  public void setUpBase() throws Exception {
    addPeer(PEER_ID2, tableName);
  }

  protected final void removePeer(String peerId) throws Exception {
    removePeer(peerId, UTIL1);
  }

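  /**
   * Removes the replication peer from the given cluster if it exists; a no-op otherwise.
   */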
  protected final void removePeer(String peerId, HBaseTestingUtil util) throws Exception {
    if (peerExist(peerId, util)) {
      util.getAdmin().removeReplicationPeer(peerId);
    }
  }

  @After
  public void tearDownBase() throws Exception {
    removePeer(PEER_ID2);
  }

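  /**
   * Writes a single cell on the source cluster and waits for it to appear on the peer, then
   * deletes the row and waits for the delete to replicate as well.
   */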
  protected static void runSimplePutDeleteTest() throws IOException, InterruptedException {
    Put put = new Put(row);
    put.addColumn(famName, row, row);

    htable1 = UTIL1.getConnection().getTable(tableName);
    htable1.put(put);

    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.isEmpty()) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(row, res.value());
        break;
      }
    }

    Delete del = new Delete(row);
    htable1.delete(del);

    get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for del replication");
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        LOG.info("Row not deleted");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

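  /**
   * Loads {@link #NB_ROWS_IN_BATCH} rows on the source cluster, verifies them locally, then
   * waits for all of them to show up on the peer cluster.
   */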
  protected static void runSmallBatchTest() throws IOException, InterruptedException {
    // normal Batch tests
    loadData("", row);

    Scan scan = new Scan();

    ResultScanner scanner1 = htable1.getScanner(scan);
    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
    scanner1.close();
    assertEquals(NB_ROWS_IN_BATCH, res1.length);

    waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
  }

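  /**
   * Stops every region server in the given mini cluster.
   */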
  protected static void stopAllRegionServers(HBaseTestingUtil util) throws IOException {
    List<ServerName> rses = util.getMiniHBaseCluster().getRegionServerThreads().stream()
      .map(t -> t.getRegionServer().getServerName()).collect(Collectors.toList());
    for (ServerName rs : rses) {
      util.getMiniHBaseCluster().stopRegionServer(rs);
    }
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    if (htable2 != null) {
      htable2.close();
    }
    if (htable1 != null) {
      htable1.close();
    }
    if (hbaseAdmin != null) {
      hbaseAdmin.close();
    }

    if (connection2 != null) {
      connection2.close();
    }
    if (connection1 != null) {
      connection1.close();
    }
    UTIL2.shutdownMiniCluster();
    UTIL1.shutdownMiniCluster();
  }

  /**
   * Custom replication endpoint to keep track of replication status for tests.
   */
  public static class ReplicationEndpointTest extends HBaseInterClusterReplicationEndpoint {
    public ReplicationEndpointTest() {
      // Reset the shared counter whenever a new endpoint instance is created.
      replicateCount.set(0);
    }

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      // Record the shipped entries before delegating to the real inter-cluster endpoint.
      replicateCount.incrementAndGet();
      replicatedEntries.addAll(replicateContext.getEntries());

      return super.replicate(replicateContext);
    }
  }
}