/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
/**
 * This class is only a base for other integration-level replication tests. Do not add tests here.
 * TestReplicationSmallTests is where tests that don't require bringing machines up/down should go.
 * All other tests should have their own classes and extend this one.
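 * <p>
 * A minimal subclass might look like the sketch below; the class and method names are
 * hypothetical, shown only for illustration:
 * </p>
 *
 * <pre>
 * public class TestMyReplicationFeature extends TestReplicationBase {
 *   &#64;Test
 *   public void testPutAndDeleteAreReplicated() throws Exception {
 *     // Peer PEER_ID2 is added/removed by setUpBase()/tearDownBase() in this base class.
 *     runSimplePutDeleteTest();
 *   }
 * }
 * </pre>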
 */
public class TestReplicationBase {
  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);
  protected static Connection connection1;
  protected static Connection connection2;
  protected static Configuration CONF_WITH_LOCALFS;

  protected static ReplicationAdmin admin;
  protected static Admin hbaseAdmin;

  protected static Table htable1;
  protected static Table htable2;

  protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
  protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
  protected static Configuration CONF1 = UTIL1.getConfiguration();
  protected static Configuration CONF2 = UTIL2.getConfiguration();

  protected static int NUM_SLAVES1 = 1;
  protected static int NUM_SLAVES2 = 1;
  protected static final int NB_ROWS_IN_BATCH = 100;
  protected static final int NB_ROWS_IN_BIG_BATCH = NB_ROWS_IN_BATCH * 10;
  protected static final long SLEEP_TIME = 500;
  protected static final int NB_RETRIES = 50;
  protected static AtomicInteger replicateCount = new AtomicInteger();
  protected static volatile List<WAL.Entry> replicatedEntries = Lists.newArrayList();

  protected static final TableName tableName = TableName.valueOf("test");
  protected static final byte[] famName = Bytes.toBytes("f");
  protected static final byte[] row = Bytes.toBytes("row");
  protected static final byte[] noRepfamName = Bytes.toBytes("norep");
  protected static final String PEER_ID2 = "2";

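  /**
   * Whether the replication peer created by {@link #addPeer(String, TableName)} should be a
   * serial peer. Subclasses override this to run the same tests against a serial peer.
   */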
  protected boolean isSerialPeer() {
    return false;
  }

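  /**
   * Truncates the test table on the source cluster and waits until the resulting Deletes have
   * replicated, leaving both clusters with an empty table.
   */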
  protected final void cleanUp() throws IOException, InterruptedException {
    // Starting and stopping replication can make us miss new logs;
    // rolling like this makes sure the most recent one gets added to the queue.
    for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster().getRegionServerThreads()) {
      UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    int rowCount = UTIL1.countRows(tableName);
    UTIL1.deleteTableData(tableName);
    // Truncating the table sends one Delete per row to the slave cluster asynchronously,
    // which is why we cannot simply call deleteTableData on utility2: late writes could
    // still reach the slave afterwards. Instead, we truncate the first table and wait for
    // all the Deletes to make it to the slave.
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(rowCount);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // Don't consume a retry if we are making progress
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

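  /**
   * Repeatedly scans the replicated table until it holds exactly {@code expectedRows} rows,
   * sleeping {@link #SLEEP_TIME} ms between attempts and failing after {@code retries} attempts.
   */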
  protected static void waitForReplication(int expectedRows, int retries)
    throws IOException, InterruptedException {
    waitForReplication(htable2, expectedRows, retries);
  }

  protected static void waitForReplication(Table table, int expectedRows, int retries)
    throws IOException, InterruptedException {
    Scan scan;
    for (int i = 0; i < retries; i++) {
      scan = new Scan();
      if (i == retries - 1) {
        fail("Waited too much time for normal batch replication");
      }
      int count = 0;
      try (ResultScanner scanner = table.getScanner(scan)) {
        while (scanner.next() != null) {
          count++;
        }
      }
      if (count != expectedRows) {
        LOG.info("Only got " + count + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

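  /**
   * Loads {@link #NB_ROWS_IN_BATCH} rows into the source table, with row keys of the form
   * {@code prefix + i} and a single cell whose qualifier and value are both {@code row}.
   */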
  protected static void loadData(String prefix, byte[] row) throws IOException {
    loadData(prefix, row, famName);
  }

  protected static void loadData(String prefix, byte[] row, byte[] familyName) throws IOException {
    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
      Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i)));
      put.addColumn(familyName, row, row);
      puts.add(put);
    }
    htable1.put(puts);
  }

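  /**
   * Applies replication-friendly settings (small shipping batches, short retry intervals,
   * aggressive WAL rolling) to the utility's configuration, with its ZooKeeper data rooted at
   * {@code znodeParent}.
   */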
  protected static void setupConfig(HBaseTestingUtility util, String znodeParent) {
    Configuration conf = util.getConfiguration();
    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent);
    // Keep each batch sent to the ReplicationEndpoint small, so that a test run triggers a
    // sufficient number of shipping events. But don't go too low either:
    // HBaseInterClusterReplicationEndpoint partitions entries into batches, and we want more
    // than one batch sent to the peer cluster for better testing.
    conf.setInt("replication.source.size.capacity", 102400);
    conf.setLong("replication.source.sleepforretries", 100);
    conf.setInt("hbase.regionserver.maxlogs", 10);
    conf.setLong("hbase.master.logcleaner.ttl", 10);
    conf.setInt("zookeeper.recovery.retry", 1);
    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    conf.setInt("replication.stats.thread.period.seconds", 5);
    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
    conf.setLong("replication.sleep.before.failover", 2000);
    conf.setInt("replication.source.maxretriesmultiplier", 10);
    conf.setFloat("replication.source.ratio", 1.0f);
    conf.setBoolean("replication.source.eof.autorecovery", true);
    conf.setLong("hbase.serial.replication.waiting.ms", 100);
  }

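  /**
   * Configures both utilities for replication testing, rooting the source cluster at znode
   * {@code /1} and the target cluster at znode {@code /2} so the two can share one ZooKeeper
   * ensemble.
   */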
  static void configureClusters(HBaseTestingUtility util1, HBaseTestingUtility util2) {
    setupConfig(util1, "/1");
    setupConfig(util2, "/2");

    Configuration conf2 = util2.getConfiguration();
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
    conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
    conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
  }

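  /**
   * Restarts the source HBase cluster with {@code numSlaves} region servers and refreshes the
   * cached admin and table handles that pointed at the old cluster.
   */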
  static void restartSourceCluster(int numSlaves) throws Exception {
    Closeables.close(hbaseAdmin, true);
    Closeables.close(htable1, true);
    UTIL1.shutdownMiniHBaseCluster();
    UTIL1.restartHBaseCluster(numSlaves);
    // Invalidate the cached connection state.
    CONF1 = UTIL1.getConfiguration();
    hbaseAdmin = UTIL1.getAdmin();
    htable1 = UTIL1.getConnection().getTable(tableName);
  }

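  /**
   * Restarts the target HBase cluster with {@code numSlaves} region servers and refreshes the
   * cached table handle that pointed at the old cluster.
   */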
  static void restartTargetHBaseCluster(int numSlaves) throws Exception {
    Closeables.close(htable2, true);
    UTIL2.restartHBaseCluster(numSlaves);
    // Invalidate the cached connection state
    CONF2 = UTIL2.getConfiguration();
    htable2 = UTIL2.getConnection().getTable(tableName);
  }

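  /**
   * Creates the test table on both clusters. Family {@code f} is replicated
   * ({@code REPLICATION_SCOPE_GLOBAL}); family {@code norep} is not.
   */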
  protected static void createTable(TableName tableName) throws IOException {
    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
    UTIL1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
    UTIL2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
    UTIL1.waitUntilAllRegionsAssigned(tableName);
    UTIL2.waitUntilAllRegionsAssigned(tableName);
  }

  private static void startClusters() throws Exception {
    UTIL1.startMiniZKCluster();
    MiniZooKeeperCluster miniZK = UTIL1.getZkCluster();
    LOG.info("Setup first Zk");

    UTIL2.setZkCluster(miniZK);
    LOG.info("Setup second Zk");

    CONF_WITH_LOCALFS = HBaseConfiguration.create(CONF1);
    UTIL1.startMiniCluster(NUM_SLAVES1);
    // Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks
    // as a component in deciding maximum number of parallel batches to send to the peer cluster.
    UTIL2.startMiniCluster(NUM_SLAVES2);

    connection1 = ConnectionFactory.createConnection(CONF1);
    connection2 = ConnectionFactory.createConnection(CONF2);
    admin = new ReplicationAdmin(CONF1);
    hbaseAdmin = connection1.getAdmin();

    createTable(tableName);
    htable1 = connection1.getTable(tableName);
    htable2 = connection2.getTable(tableName);
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    configureClusters(UTIL1, UTIL2);
    startClusters();
  }

  private boolean peerExist(String peerId) throws IOException {
    return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId()));
  }

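  /**
   * Adds a replication peer from the source cluster to the target cluster, wired through
   * {@link ReplicationEndpointTest} so tests can observe what gets shipped. No-op if the peer
   * already exists.
   */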
  protected final void addPeer(String peerId, TableName tableName) throws Exception {
    if (!peerExist(peerId)) {
      ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
        .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer())
        .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName());
      hbaseAdmin.addReplicationPeer(peerId, builder.build());
    }
  }

  @Before
  public void setUpBase() throws Exception {
    addPeer(PEER_ID2, tableName);
  }

  protected final void removePeer(String peerId) throws Exception {
    if (peerExist(peerId)) {
      hbaseAdmin.removeReplicationPeer(peerId);
    }
  }

  @After
  public void tearDownBase() throws Exception {
    removePeer(PEER_ID2);
  }

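  /**
   * Writes a single cell on the source cluster and waits for it to replicate, then deletes the
   * row and waits for the delete to replicate.
   */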
  protected static void runSimplePutDeleteTest() throws IOException, InterruptedException {
    Put put = new Put(row);
    put.addColumn(famName, row, row);

    htable1 = UTIL1.getConnection().getTable(tableName);
    htable1.put(put);

    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.isEmpty()) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(row, res.value());
        break;
      }
    }

    Delete del = new Delete(row);
    htable1.delete(del);

    get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for del replication");
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        LOG.info("Row not deleted");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

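  /**
   * Loads {@link #NB_ROWS_IN_BATCH} rows on the source cluster, verifies they landed locally,
   * and waits until they have all replicated to the target cluster.
   */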
  protected static void runSmallBatchTest() throws IOException, InterruptedException {
    // normal batch tests
    loadData("", row);

    Scan scan = new Scan();

    ResultScanner scanner1 = htable1.getScanner(scan);
    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
    scanner1.close();
    assertEquals(NB_ROWS_IN_BATCH, res1.length);

    waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    if (htable2 != null) {
      htable2.close();
    }
    if (htable1 != null) {
      htable1.close();
    }
    if (admin != null) {
      admin.close();
    }
    if (hbaseAdmin != null) {
      hbaseAdmin.close();
    }

    if (connection2 != null) {
      connection2.close();
    }
    if (connection1 != null) {
      connection1.close();
    }
    UTIL2.shutdownMiniCluster();
    UTIL1.shutdownMiniCluster();
  }

  /**
   * Custom replication endpoint to keep track of replication status for tests.
   */
  public static class ReplicationEndpointTest extends HBaseInterClusterReplicationEndpoint {
    public ReplicationEndpointTest() {
      // Reset the shared counter whenever a new endpoint instance is created for a peer.
      replicateCount.set(0);
    }

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      replicateCount.incrementAndGet();
      replicatedEntries.addAll(replicateContext.getEntries());

      return super.replicate(replicateContext);
    }
  }
}