/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
 * This class is only a base for other integration-level replication tests. Do not add tests here.
 * TestReplicationSmallTests is where tests that don't require bringing machines up/down should go.
 * All other tests should have their own classes and extend this one.
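 * <p>
 * As a minimal sketch, a subclass would typically look like the following (the class and test
 * names here are hypothetical, for illustration only):
 *
 * <pre>
 * public class TestMyReplicationFeature extends TestReplicationBase {
 *   &#64;Test
 *   public void testPutIsReplicated() throws Exception {
 *     runSimplePutDeleteTest();
 *   }
 * }
 * </pre>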
 */
public class TestReplicationBase {
  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);
  protected static Connection connection1;
  protected static Connection connection2;
  protected static Configuration CONF_WITH_LOCALFS;

  protected static Admin hbaseAdmin;

  protected static Table htable1;
  protected static Table htable2;

  protected static final HBaseTestingUtil UTIL1 = new HBaseTestingUtil();
  protected static final HBaseTestingUtil UTIL2 = new HBaseTestingUtil();
  protected static Configuration CONF1 = UTIL1.getConfiguration();
  protected static Configuration CONF2 = UTIL2.getConfiguration();

  protected static int NUM_SLAVES1 = 1;
  protected static int NUM_SLAVES2 = 1;
  protected static final int NB_ROWS_IN_BATCH = 100;
  protected static final int NB_ROWS_IN_BIG_BATCH = NB_ROWS_IN_BATCH * 10;
  protected static final long SLEEP_TIME = 500;
  protected static final int NB_RETRIES = 50;
  protected static AtomicInteger replicateCount = new AtomicInteger();
  protected static volatile List<WAL.Entry> replicatedEntries = Lists.newArrayList();

  protected static final TableName tableName = TableName.valueOf("test");
  protected static final byte[] famName = Bytes.toBytes("f");
  protected static final byte[] row = Bytes.toBytes("row");
  protected static final byte[] noRepfamName = Bytes.toBytes("norep");
  protected static final String PEER_ID2 = "2";

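  /**
   * Subclasses override this to return true to register the peer as a serial replication peer.
   */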
  protected boolean isSerialPeer() {
    return false;
  }

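  /**
   * Subclasses override this to return true to register the peer as a synchronous replication
   * peer (a remote WAL dir is then configured in {@link #addPeer(String, TableName)}).
   */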
  protected boolean isSyncPeer() {
    return false;
  }

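  /**
   * Rolls the WALs on the source cluster, clears the test table there, and waits until the
   * resulting Deletes have been replicated to the peer cluster.
   */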
  protected final void cleanUp() throws IOException, InterruptedException {
    // Starting and stopping replication can make us miss new logs;
    // rolling like this makes sure the most recent one gets added to the queue.
    for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster()
        .getRegionServerThreads()) {
      UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    int rowCount = UTIL1.countRows(tableName);
    UTIL1.deleteTableData(tableName);
    // Deleting the table data on the source sends one Delete per row to the slave cluster
    // asynchronously, which is why we cannot just call deleteTableData on UTIL2 as well:
    // late replicated writes could still make it to the slave afterwards. Instead, we delete
    // the data from the first table and wait for all the Deletes to make it to the slave.
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(rowCount);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // Don't increment the retry count if we are making progress
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

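  /**
   * Waits until the peer table (htable2 for the convenience overload) contains exactly the
   * expected number of rows, failing the test after the given number of retries.
   */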
  protected static void waitForReplication(int expectedRows, int retries)
      throws IOException, InterruptedException {
    waitForReplication(htable2, expectedRows, retries);
  }

  protected static void waitForReplication(Table table, int expectedRows, int retries)
    throws IOException, InterruptedException {
    Scan scan;
    for (int i = 0; i < retries; i++) {
      scan = new Scan();
      if (i == retries - 1) {
        fail("Waited too much time for normal batch replication");
      }
      int count = 0;
      try (ResultScanner scanner = table.getScanner(scan)) {
        while (scanner.next() != null) {
          count++;
        }
      }
      if (count != expectedRows) {
        LOG.info("Only got " + count + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

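  /**
   * Writes NB_ROWS_IN_BATCH rows with the given row key prefix into htable1 on the source
   * cluster.
   */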
  protected static void loadData(String prefix, byte[] row) throws IOException {
    loadData(prefix, row, famName);
  }

  protected static void loadData(String prefix, byte[] row, byte[] familyName) throws IOException {
    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
      Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i)));
      put.addColumn(familyName, row, row);
      puts.add(put);
    }
    htable1.put(puts);
  }

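  /**
   * Applies the common replication test configuration (znode parent, small shipping batches,
   * short retry and sleep intervals) to the given utility's configuration.
   */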
  protected static void setupConfig(HBaseTestingUtil util, String znodeParent) {
    Configuration conf = util.getConfiguration();
    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent);
    // Keep the shipping batch size small so that a modest amount of edits sent to the
    // ReplicationEndpoint still triggers a sufficient number of events. But don't go too low:
    // HBaseInterClusterReplicationEndpoint partitions entries into batches, and we want more
    // than one batch sent to the peer cluster for better testing.
    conf.setInt("replication.source.size.capacity", 102400);
    conf.setLong("replication.source.sleepforretries", 100);
    conf.setInt("hbase.regionserver.maxlogs", 10);
    conf.setLong("hbase.master.logcleaner.ttl", 10);
    conf.setInt("zookeeper.recovery.retry", 1);
    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    conf.setInt("replication.stats.thread.period.seconds", 5);
    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
    conf.setLong("replication.sleep.before.failover", 2000);
    conf.setInt("replication.source.maxretriesmultiplier", 10);
    conf.setFloat("replication.source.ratio", 1.0f);
    conf.setBoolean("replication.source.eof.autorecovery", true);
    conf.setLong("hbase.serial.replication.waiting.ms", 100);
  }

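  /**
   * Applies the common test configuration to both clusters, each under its own ZooKeeper znode
   * parent, and lowers the client retry count on the peer cluster so failures surface quickly.
   */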
  static void configureClusters(HBaseTestingUtil util1, HBaseTestingUtil util2) {
    setupConfig(util1, "/1");
    setupConfig(util2, "/2");

    // setupConfig already set the znode parent and disabled short-circuit reads for conf2;
    // here we only lower the client retry count on the peer cluster.
    Configuration conf2 = util2.getConfiguration();
    conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
  }

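  /**
   * Restarts the source mini cluster with the given number of region servers and refreshes the
   * cached configuration, admin and table handles that pointed at the old cluster.
   */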
  static void restartSourceCluster(int numSlaves) throws Exception {
    Closeables.close(hbaseAdmin, true);
    Closeables.close(htable1, true);
    UTIL1.shutdownMiniHBaseCluster();
    UTIL1.restartHBaseCluster(numSlaves);
    // Invalidate the cached connection state.
    CONF1 = UTIL1.getConfiguration();
    hbaseAdmin = UTIL1.getAdmin();
    htable1 = UTIL1.getConnection().getTable(tableName);
  }

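  /**
   * Restarts the target (peer) mini cluster with the given number of region servers and
   * re-creates the cached table handle.
   */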
  static void restartTargetHBaseCluster(int numSlaves) throws Exception {
    Closeables.close(htable2, true);
    UTIL2.restartHBaseCluster(numSlaves);
    // Invalidate the cached connection state.
    CONF2 = UTIL2.getConfiguration();
    htable2 = UTIL2.getConnection().getTable(tableName);
  }

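  /**
   * Creates the test table on both clusters: the 'f' family is replicated globally while the
   * 'norep' family is not, and waits for all regions to be assigned.
   */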
  protected static void createTable(TableName tableName) throws IOException {
    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
    UTIL1.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
    UTIL2.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
    UTIL1.waitUntilAllRegionsAssigned(tableName);
    UTIL2.waitUntilAllRegionsAssigned(tableName);
  }

  private static void startClusters() throws Exception {
    UTIL1.startMiniZKCluster();
    MiniZooKeeperCluster miniZK = UTIL1.getZkCluster();
    LOG.info("Setup first Zk");

    UTIL2.setZkCluster(miniZK);
    LOG.info("Setup second Zk");

    CONF_WITH_LOCALFS = HBaseConfiguration.create(CONF1);
    UTIL1.startMiniCluster(NUM_SLAVES1);
    // Have a bunch of slave servers, because the inter-cluster shipping logic uses the number of
    // sinks as a component in deciding the maximum number of parallel batches to send to the
    // peer cluster.
    UTIL2.startMiniCluster(NUM_SLAVES2);

    connection1 = ConnectionFactory.createConnection(CONF1);
    connection2 = ConnectionFactory.createConnection(CONF2);
    hbaseAdmin = connection1.getAdmin();

    createTable(tableName);
    htable1 = connection1.getTable(tableName);
    htable2 = connection2.getTable(tableName);
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    configureClusters(UTIL1, UTIL2);
    startClusters();
  }

  private boolean peerExist(String peerId) throws IOException {
    return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId()));
  }

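  /**
   * Adds a replication peer for the given table on the source cluster if it does not exist yet,
   * honoring {@link #isSerialPeer()} and {@link #isSyncPeer()}.
   */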
  protected final void addPeer(String peerId, TableName tableName) throws Exception {
    if (!peerExist(peerId)) {
      ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
        .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer())
        .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName());
      if (isSyncPeer()) {
        FileSystem fs2 = UTIL2.getTestFileSystem();
        // The remote WAL dir is not important as we do not use it in DA state; here we only need
        // to confirm that a sync peer in DA state can still replicate data to the remote cluster
        // asynchronously.
        builder.setReplicateAllUserTables(false)
          .setTableCFsMap(ImmutableMap.of(tableName, ImmutableList.of()))
          .setRemoteWALDir(new Path("/RemoteWAL")
            .makeQualified(fs2.getUri(), fs2.getWorkingDirectory()).toUri().toString());
      }
      hbaseAdmin.addReplicationPeer(peerId, builder.build());
    }
  }

  @Before
  public void setUpBase() throws Exception {
    addPeer(PEER_ID2, tableName);
  }

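  /**
   * Removes the replication peer with the given id from the source cluster if it exists.
   */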
  protected final void removePeer(String peerId) throws Exception {
    if (peerExist(peerId)) {
      hbaseAdmin.removeReplicationPeer(peerId);
    }
  }

  @After
  public void tearDownBase() throws Exception {
    removePeer(PEER_ID2);
  }

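  /**
   * Puts a single row on the source cluster and waits for it to replicate to the peer, then
   * deletes it and waits for the delete to replicate as well.
   */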
  protected static void runSimplePutDeleteTest() throws IOException, InterruptedException {
    Put put = new Put(row);
    put.addColumn(famName, row, row);

    htable1 = UTIL1.getConnection().getTable(tableName);
    htable1.put(put);

    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.isEmpty()) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(row, res.value());
        break;
      }
    }

    Delete del = new Delete(row);
    htable1.delete(del);

    get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for del replication");
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        LOG.info("Row not deleted");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

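  /**
   * Loads a normal batch of rows into the source table, verifies them locally, then waits for
   * them to replicate to the peer cluster.
   */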
  protected static void runSmallBatchTest() throws IOException, InterruptedException {
    // normal batch tests
    loadData("", row);

    Scan scan = new Scan();

    ResultScanner scanner1 = htable1.getScanner(scan);
    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
    scanner1.close();
    assertEquals(NB_ROWS_IN_BATCH, res1.length);

    waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    if (htable2 != null) {
      htable2.close();
    }
    if (htable1 != null) {
      htable1.close();
    }
    if (hbaseAdmin != null) {
      hbaseAdmin.close();
    }

    if (connection2 != null) {
      connection2.close();
    }
    if (connection1 != null) {
      connection1.close();
    }
    UTIL2.shutdownMiniCluster();
    UTIL1.shutdownMiniCluster();
  }

  /**
   * Custom replication endpoint to keep track of replication status for tests.
   */
  public static class ReplicationEndpointTest extends HBaseInterClusterReplicationEndpoint {
    public ReplicationEndpointTest() {
      replicateCount.set(0);
    }

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      replicateCount.incrementAndGet();
      replicatedEntries.addAll(replicateContext.getEntries());

      return super.replicate(replicateContext);
    }
  }
}