/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
 * This class is only a base for other integration-level replication tests. Do not add tests here.
 * TestReplicationSmallTests is where tests that don't require bringing machines up/down should go.
 * All other tests should have their own classes and extend this one.
 */
public class TestReplicationBase {
  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);
  protected static Connection connection1;
  protected static Connection connection2;
  protected static Configuration CONF_WITH_LOCALFS;

  protected static Admin hbaseAdmin;

  protected static Table htable1;
  protected static Table htable2;

  protected static final HBaseTestingUtil UTIL1 = new HBaseTestingUtil();
  protected static final HBaseTestingUtil UTIL2 = new HBaseTestingUtil();
  protected static Configuration CONF1 = UTIL1.getConfiguration();
  protected static Configuration CONF2 = UTIL2.getConfiguration();

  protected static int NUM_SLAVES1 = 1;
  protected static int NUM_SLAVES2 = 1;
  protected static final int NB_ROWS_IN_BATCH = 100;
  protected static final int NB_ROWS_IN_BIG_BATCH = NB_ROWS_IN_BATCH * 10;
  protected static final long SLEEP_TIME = 500;
  protected static final int NB_RETRIES = 50;
  protected static AtomicInteger replicateCount = new AtomicInteger();
  protected static volatile List<WAL.Entry> replicatedEntries = Lists.newArrayList();

  protected static final TableName tableName = TableName.valueOf("test");
  protected static final byte[] famName = Bytes.toBytes("f");
  protected static final byte[] row = Bytes.toBytes("row");
  protected static final byte[] noRepfamName = Bytes.toBytes("norep");
  protected static final String PEER_ID2 = "2";

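  /**
   * Whether the replication peer created by {@link #addPeer(String, TableName)} should be a
   * serial replication peer. Subclasses override this to run the shared tests against serial
   * replication.
   */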
  protected boolean isSerialPeer() {
    return false;
  }

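  /**
   * Whether the replication peer created by {@link #addPeer(String, TableName)} should be a
   * synchronous replication peer, i.e. one with a remote WAL directory configured.
   */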
  protected boolean isSyncPeer() {
    return false;
  }

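  /**
   * Empties the source table and waits until the per-row Deletes this generates have been
   * replicated to the sink table. WALs are rolled first so the most recent log is picked up by
   * the replication queue.
   */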
  protected final void cleanUp() throws IOException, InterruptedException {
    // Starting and stopping replication can make us miss new logs,
    // rolling like this makes sure the most recent one gets added to the queue
    for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster().getRegionServerThreads()) {
      UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    int rowCount = UTIL1.countRows(tableName);
    UTIL1.deleteTableData(tableName);
    // Truncating the table sends one Delete per row to the slave cluster in an async
    // fashion, which is why we cannot just call deleteTableData on utility2: late writes
    // could still make it to the slave. Instead, we truncate the first table and wait for
    // all the Deletes to make it to the slave.
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(rowCount);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // Don't increment timeout if we make progress
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

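  /**
   * Repeatedly scans the given table (the sink table by default) until it contains exactly
   * {@code expectedRows} rows, sleeping {@link #SLEEP_TIME} ms between attempts and failing the
   * test if the count has not converged after {@code retries} attempts.
   */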
  protected static void waitForReplication(int expectedRows, int retries)
    throws IOException, InterruptedException {
    waitForReplication(htable2, expectedRows, retries);
  }

  protected static void waitForReplication(Table table, int expectedRows, int retries)
    throws IOException, InterruptedException {
    Scan scan;
    for (int i = 0; i < retries; i++) {
      scan = new Scan();
      if (i == retries - 1) {
        fail("Waited too much time for normal batch replication");
      }
      int count = 0;
      try (ResultScanner scanner = table.getScanner(scan)) {
        while (scanner.next() != null) {
          count++;
        }
      }
      if (count != expectedRows) {
        LOG.info("Only got " + count + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

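  /**
   * Puts {@link #NB_ROWS_IN_BATCH} rows into the source table, with row keys of the form
   * {@code prefix + i} and a single cell whose qualifier and value are the given {@code row}
   * bytes.
   */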
  protected static void loadData(String prefix, byte[] row) throws IOException {
    loadData(prefix, row, famName);
  }

  protected static void loadData(String prefix, byte[] row, byte[] familyName) throws IOException {
    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
      Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i)));
      put.addColumn(familyName, row, row);
      puts.add(put);
    }
    htable1.put(puts);
  }

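  /**
   * Applies test-friendly replication settings (small shipping batches, short retry and failover
   * intervals, frequent log rolling) to the utility's configuration, rooted at the given
   * ZooKeeper znode parent so both clusters can share one ZooKeeper ensemble.
   */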
  protected static void setupConfig(HBaseTestingUtil util, String znodeParent) {
    Configuration conf = util.getConfiguration();
    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent);
    // Keep the number of edits per batch sent to the ReplicationEndpoint small, so that a load
    // triggers a sufficient number of events. But don't go too low either, because
    // HBaseInterClusterReplicationEndpoint partitions entries into batches and we want
    // more than one batch sent to the peer cluster for better testing.
    conf.setInt("replication.source.size.capacity", 102400);
    conf.setLong("replication.source.sleepforretries", 100);
    conf.setInt("hbase.regionserver.maxlogs", 10);
    conf.setLong("hbase.master.logcleaner.ttl", 10);
    conf.setInt("zookeeper.recovery.retry", 1);
    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    conf.setInt("replication.stats.thread.period.seconds", 5);
    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
    conf.setLong("replication.sleep.before.failover", 2000);
    conf.setInt("replication.source.maxretriesmultiplier", 10);
    conf.setFloat("replication.source.ratio", 1.0f);
    conf.setBoolean("replication.source.eof.autorecovery", true);
    conf.setLong("hbase.serial.replication.waiting.ms", 100);
  }

  static void configureClusters(HBaseTestingUtil util1, HBaseTestingUtil util2) {
    setupConfig(util1, "/1");
    setupConfig(util2, "/2");

    Configuration conf2 = util2.getConfiguration();
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
    conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
    conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
  }

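  /**
   * Restarts the source mini cluster with the given number of region servers and refreshes the
   * admin and table handles that were tied to the previous cluster instance.
   */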
  protected static void restartSourceCluster(int numSlaves) throws Exception {
    Closeables.close(hbaseAdmin, true);
    Closeables.close(htable1, true);
    UTIL1.shutdownMiniHBaseCluster();
    UTIL1.restartHBaseCluster(numSlaves);
    // Invalidate the cached connection state.
    CONF1 = UTIL1.getConfiguration();
    hbaseAdmin = UTIL1.getAdmin();
    // Use the utility's shared connection directly; a local "Connection connection1" here would
    // shadow the static field and leave it pointing at the old, shut-down cluster.
    htable1 = UTIL1.getConnection().getTable(tableName);
  }

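  /**
   * Restarts the target mini cluster with the given number of region servers and refreshes the
   * sink table handle.
   */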
  static void restartTargetHBaseCluster(int numSlaves) throws Exception {
    Closeables.close(htable2, true);
    UTIL2.restartHBaseCluster(numSlaves);
    // Invalidate the cached connection state
    CONF2 = UTIL2.getConfiguration();
    htable2 = UTIL2.getConnection().getTable(tableName);
  }

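  /**
   * Creates the pre-split test table on both clusters and waits for all regions to be assigned.
   * The {@code f} family has a global replication scope; the {@code norep} family is not
   * replicated.
   */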
  protected static void createTable(TableName tableName) throws IOException {
    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
    UTIL1.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
    UTIL2.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
    UTIL1.waitUntilAllRegionsAssigned(tableName);
    UTIL2.waitUntilAllRegionsAssigned(tableName);
  }

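  /**
   * Starts a single mini ZooKeeper cluster shared by both utilities (each rooted at its own
   * znode parent), then a mini HBase cluster per utility, and finally creates the test table and
   * the client handles used by the tests.
   */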
  private static void startClusters() throws Exception {
    UTIL1.startMiniZKCluster();
    MiniZooKeeperCluster miniZK = UTIL1.getZkCluster();
    LOG.info("Setup first Zk");

    UTIL2.setZkCluster(miniZK);
    LOG.info("Setup second Zk");

    CONF_WITH_LOCALFS = HBaseConfiguration.create(CONF1);
    UTIL1.startMiniCluster(NUM_SLAVES1);
    // Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks
    // as a component in deciding maximum number of parallel batches to send to the peer cluster.
    UTIL2.startMiniCluster(NUM_SLAVES2);

    connection1 = ConnectionFactory.createConnection(CONF1);
    connection2 = ConnectionFactory.createConnection(CONF2);
    hbaseAdmin = connection1.getAdmin();

    createTable(tableName);
    htable1 = connection1.getTable(tableName);
    htable2 = connection2.getTable(tableName);
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    configureClusters(UTIL1, UTIL2);
    startClusters();
  }

  private boolean peerExist(String peerId) throws IOException {
    return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId()));
  }

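  /**
   * Adds a replication peer pointing at the target cluster if it does not already exist, using
   * the counting {@link ReplicationEndpointTest} endpoint. For sync peers, replication is
   * restricted to the given table and a remote WAL directory is configured.
   */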
  protected final void addPeer(String peerId, TableName tableName) throws Exception {
    if (!peerExist(peerId)) {
      ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
        .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer())
        .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName());
      if (isSyncPeer()) {
        FileSystem fs2 = UTIL2.getTestFileSystem();
        // The remote wal dir is not important as we do not use it in DA state; here we only need
        // to confirm that a sync peer in DA state can still replicate data to the remote cluster
        // asynchronously.
        builder.setReplicateAllUserTables(false)
          .setTableCFsMap(ImmutableMap.of(tableName, ImmutableList.of()))
          .setRemoteWALDir(new Path("/RemoteWAL")
            .makeQualified(fs2.getUri(), fs2.getWorkingDirectory()).toUri().toString());
      }
      hbaseAdmin.addReplicationPeer(peerId, builder.build());
    }
  }

  @Before
  public void setUpBase() throws Exception {
    addPeer(PEER_ID2, tableName);
  }

  protected final void removePeer(String peerId) throws Exception {
    if (peerExist(peerId)) {
      hbaseAdmin.removeReplicationPeer(peerId);
    }
  }

  @After
  public void tearDownBase() throws Exception {
    removePeer(PEER_ID2);
  }

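  /**
   * Puts a single row on the source cluster, waits until it shows up on the sink, then deletes
   * it and waits until the delete has been replicated as well.
   */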
  protected static void runSimplePutDeleteTest() throws IOException, InterruptedException {
    Put put = new Put(row);
    put.addColumn(famName, row, row);

    htable1 = UTIL1.getConnection().getTable(tableName);
    htable1.put(put);

    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.isEmpty()) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(row, res.value());
        break;
      }
    }

    Delete del = new Delete(row);
    htable1.delete(del);

    get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for del replication");
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        LOG.info("Row not deleted");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

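  /**
   * Loads {@link #NB_ROWS_IN_BATCH} rows on the source cluster, verifies they are all visible
   * locally, and waits until they have been replicated to the sink.
   */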
  protected static void runSmallBatchTest() throws IOException, InterruptedException {
    // normal batch tests
    loadData("", row);

    Scan scan = new Scan();

    ResultScanner scanner1 = htable1.getScanner(scan);
    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
    scanner1.close();
    assertEquals(NB_ROWS_IN_BATCH, res1.length);

    waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    if (htable2 != null) {
      htable2.close();
    }
    if (htable1 != null) {
      htable1.close();
    }
    if (hbaseAdmin != null) {
      hbaseAdmin.close();
    }

    if (connection2 != null) {
      connection2.close();
    }
    if (connection1 != null) {
      connection1.close();
    }
    UTIL2.shutdownMiniCluster();
    UTIL1.shutdownMiniCluster();
  }

  /**
   * Custom replication endpoint to keep track of replication status for tests.
   */
  public static class ReplicationEndpointTest extends HBaseInterClusterReplicationEndpoint {
    public ReplicationEndpointTest() {
      replicateCount.set(0);
    }

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      replicateCount.incrementAndGet();
      replicatedEntries.addAll(replicateContext.getEntries());

      return super.replicate(replicateContext);
    }
  }
}
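
// A minimal sketch, assuming JUnit 4, of how a concrete test is expected to extend this base
// class. The class and method names below are illustrative only, not part of this file, and
// real HBase tests also declare an HBaseClassTestRule @ClassRule, omitted here for brevity:
//
//   public class TestReplicationExample extends TestReplicationBase {
//     @org.junit.Test
//     public void testSimplePutDelete() throws Exception {
//       runSimplePutDeleteTest();
//     }
//
//     @org.junit.Test
//     public void testSmallBatch() throws Exception {
//       runSmallBatchTest();
//     }
//   }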