/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class is only a base for other integration-level replication tests. Do not add tests
 * here. Tests that don't require bringing machines up and down should go into
 * TestReplicationSmallTests; all other tests should have their own classes and extend this one.
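 * <p>
 * A minimal sketch of what a subclass might look like; the class and test method below are
 * illustrative only and are not part of this code base:
 * <pre>
 * public class TestMyReplicationScenario extends TestReplicationBase {
 *   &#64;Test
 *   public void testPutsAreReplicated() throws Exception {
 *     runSmallBatchTest();
 *   }
 * }
 * </pre>
 */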
public class TestReplicationBase {
  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);

  protected static Configuration CONF_WITH_LOCALFS;

  protected static ReplicationAdmin admin;
  protected static Admin hbaseAdmin;

  protected static Table htable1;
  protected static Table htable2;

  protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
  protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
  protected static Configuration CONF1 = UTIL1.getConfiguration();
  protected static Configuration CONF2 = UTIL2.getConfiguration();

  protected static int NUM_SLAVES1 = 1;
  protected static final int NUM_SLAVES2 = 1;
  protected static final int NB_ROWS_IN_BATCH = 100;
  protected static final int NB_ROWS_IN_BIG_BATCH = NB_ROWS_IN_BATCH * 10;
  protected static final long SLEEP_TIME = 500;
  protected static final int NB_RETRIES = 50;

  protected static final TableName tableName = TableName.valueOf("test");
  protected static final byte[] famName = Bytes.toBytes("f");
  protected static final byte[] row = Bytes.toBytes("row");
  protected static final byte[] noRepfamName = Bytes.toBytes("norep");
  protected static final String PEER_ID2 = "2";

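  /**
   * Subclasses can override this to run the tests in this class against a serial replication
   * peer instead of the default non-serial one.
   */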
  protected boolean isSerialPeer() {
    return false;
  }

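  /**
   * Truncates the test table on the source cluster and waits until the resulting Deletes have
   * been replicated to the peer cluster.
   */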
  protected final void cleanUp() throws IOException, InterruptedException {
    // Starting and stopping replication can make us miss new logs;
    // rolling like this makes sure the most recent one gets added to the queue.
    for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster()
        .getRegionServerThreads()) {
      UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    int rowCount = UTIL1.countRows(tableName);
    UTIL1.deleteTableData(tableName);
    // Truncating the table sends one Delete per row to the slave cluster in an async fashion,
    // which is why we cannot just call deleteTableData on UTIL2: late writes could still make
    // it to the slave. Instead, we truncate the first table and wait for all the Deletes to
    // make it to the slave.
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(rowCount);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // Don't consume a retry if we are making progress
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

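  /**
   * Scans the peer table until it contains {@code expectedRows} rows, sleeping between
   * attempts, and fails the test if that does not happen within {@code retries} attempts.
   */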
  protected static void waitForReplication(int expectedRows, int retries)
      throws IOException, InterruptedException {
    waitForReplication(htable2, expectedRows, retries);
  }

  protected static void waitForReplication(Table table, int expectedRows, int retries)
      throws IOException, InterruptedException {
    for (int i = 0; i < retries; i++) {
      if (i == retries - 1) {
        fail("Waited too much time for normal batch replication");
      }
      ResultScanner scanner = table.getScanner(new Scan());
      Result[] res = scanner.next(expectedRows);
      scanner.close();
      if (res.length != expectedRows) {
        LOG.info("Only got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

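  /**
   * Writes NB_ROWS_IN_BATCH rows into the source table; row keys are the given prefix followed
   * by the row index, and each row carries a single cell in the given column family.
   */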
  protected static void loadData(String prefix, byte[] row) throws IOException {
    loadData(prefix, row, famName);
  }

  protected static void loadData(String prefix, byte[] row, byte[] familyName) throws IOException {
    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
      Put put = new Put(Bytes.toBytes(prefix + i));
      put.addColumn(familyName, row, row);
      puts.add(put);
    }
    htable1.put(puts);
  }

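  /**
   * Applies the replication-related configuration shared by both test clusters, rooted at the
   * given ZooKeeper znode parent.
   */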
  protected static void setupConfig(HBaseTestingUtility util, String znodeParent) {
    Configuration conf = util.getConfiguration();
    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent);
    // Keep the number of edits per batch sent to the ReplicationEndpoint small so that a test
    // triggers a sufficient number of shipping events. But don't go too low either:
    // HBaseInterClusterReplicationEndpoint partitions entries into batches, and we want more
    // than one batch sent to the peer cluster for better testing.
    conf.setInt("replication.source.size.capacity", 102400);
    conf.setLong("replication.source.sleepforretries", 100);
    conf.setInt("hbase.regionserver.maxlogs", 10);
    conf.setLong("hbase.master.logcleaner.ttl", 10);
    conf.setInt("zookeeper.recovery.retry", 1);
    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    conf.setInt("replication.stats.thread.period.seconds", 5);
    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
    conf.setLong("replication.sleep.before.failover", 2000);
    conf.setInt("replication.source.maxretriesmultiplier", 10);
    conf.setFloat("replication.source.ratio", 1.0f);
    conf.setBoolean("replication.source.eof.autorecovery", true);
    conf.setLong("hbase.serial.replication.waiting.ms", 100);
  }

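  /**
   * Configures both test clusters: common replication settings for each, plus a reduced client
   * retry count against the peer cluster.
   */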
  static void configureClusters(HBaseTestingUtility util1, HBaseTestingUtility util2) {
    setupConfig(util1, "/1");
    setupConfig(util2, "/2");

    Configuration conf2 = util2.getConfiguration();
    conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
  }

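  /**
   * Restarts the source mini cluster with the given number of region servers and re-creates
   * the admin and table handles that were bound to the old cluster.
   */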
  static void restartSourceCluster(int numSlaves) throws Exception {
    IOUtils.closeQuietly(hbaseAdmin, htable1);
    UTIL1.shutdownMiniHBaseCluster();
    UTIL1.restartHBaseCluster(numSlaves);
    // Invalidate the cached connection state.
    CONF1 = UTIL1.getConfiguration();
    hbaseAdmin = UTIL1.getAdmin();
    htable1 = UTIL1.getConnection().getTable(tableName);
  }

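  /**
   * Restarts the target mini cluster with the given number of region servers and re-creates
   * the table handle that was bound to the old cluster.
   */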
  static void restartTargetHBaseCluster(int numSlaves) throws Exception {
    IOUtils.closeQuietly(htable2);
    UTIL2.restartHBaseCluster(numSlaves);
    // Invalidate the cached connection state.
    CONF2 = UTIL2.getConfiguration();
    htable2 = UTIL2.getConnection().getTable(tableName);
  }

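  /**
   * Starts a ZooKeeper mini cluster shared by both HBase mini clusters, starts the clusters
   * themselves, and creates the replicated test table on each of them.
   */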
  private static void startClusters() throws Exception {
    UTIL1.startMiniZKCluster();
    MiniZooKeeperCluster miniZK = UTIL1.getZkCluster();
    LOG.info("Setup first Zk");

    UTIL2.setZkCluster(miniZK);
    LOG.info("Setup second Zk");

    CONF_WITH_LOCALFS = HBaseConfiguration.create(CONF1);
    UTIL1.startMiniCluster(NUM_SLAVES1);
    // Have a bunch of slave servers, because the inter-cluster shipping logic uses the number
    // of sinks as a component in deciding the maximum number of parallel batches to send to
    // the peer cluster.
    UTIL2.startMiniCluster(NUM_SLAVES2);

    admin = new ReplicationAdmin(CONF1);
    hbaseAdmin = ConnectionFactory.createConnection(CONF1).getAdmin();

    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();

    Connection connection1 = ConnectionFactory.createConnection(CONF1);
    Connection connection2 = ConnectionFactory.createConnection(CONF2);
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
    }
    UTIL1.waitUntilAllRegionsAssigned(tableName);
    UTIL2.waitUntilAllRegionsAssigned(tableName);
    htable1 = connection1.getTable(tableName);
    htable2 = connection2.getTable(tableName);
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    configureClusters(UTIL1, UTIL2);
    startClusters();
  }

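  /**
   * Returns true if a replication peer with the given id is registered on the source cluster.
   */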
  private boolean peerExist(String peerId) throws IOException {
    return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId()));
  }

  @Before
  public void setUpBase() throws Exception {
    if (!peerExist(PEER_ID2)) {
      ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
          .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer()).build();
      hbaseAdmin.addReplicationPeer(PEER_ID2, rpc);
    }
  }

  @After
  public void tearDownBase() throws Exception {
    if (peerExist(PEER_ID2)) {
      hbaseAdmin.removeReplicationPeer(PEER_ID2);
    }
  }

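  /**
   * Puts a single row on the source cluster and waits for it to show up on the peer, then
   * deletes it and waits for the delete to be replicated as well.
   */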
  protected static void runSimplePutDeleteTest() throws IOException, InterruptedException {
    Put put = new Put(row);
    put.addColumn(famName, row, row);

    htable1 = UTIL1.getConnection().getTable(tableName);
    htable1.put(put);

    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.isEmpty()) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(row, res.value());
        break;
      }
    }

    Delete del = new Delete(row);
    htable1.delete(del);

    get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for del replication");
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        LOG.info("Row not deleted");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

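  /**
   * Loads a batch of rows on the source cluster, verifies they are all visible there, then
   * waits until the whole batch has been replicated to the peer cluster.
   */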
  protected static void runSmallBatchTest() throws IOException, InterruptedException {
    // normal batch test
    loadData("", row);

    Scan scan = new Scan();

    ResultScanner scanner1 = htable1.getScanner(scan);
    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
    scanner1.close();
    assertEquals(NB_ROWS_IN_BATCH, res1.length);

    waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    if (htable2 != null) {
      htable2.close();
    }
    if (htable1 != null) {
      htable1.close();
    }
    if (admin != null) {
      admin.close();
    }
    UTIL2.shutdownMiniCluster();
    UTIL1.shutdownMiniCluster();
  }
}