002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.replication;
021import static org.junit.Assert.assertArrayEquals;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.fail;
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.List;
028import org.apache.commons.io.IOUtils;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.HBaseTestingUtility;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.Admin;
035import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
036import org.apache.hadoop.hbase.client.Connection;
037import org.apache.hadoop.hbase.client.ConnectionFactory;
038import org.apache.hadoop.hbase.client.Delete;
039import org.apache.hadoop.hbase.client.Get;
040import org.apache.hadoop.hbase.client.Put;
041import org.apache.hadoop.hbase.client.Result;
042import org.apache.hadoop.hbase.client.ResultScanner;
043import org.apache.hadoop.hbase.client.Scan;
044import org.apache.hadoop.hbase.client.Table;
045import org.apache.hadoop.hbase.client.TableDescriptor;
046import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
047import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.apache.hadoop.hbase.util.JVMClusterUtil;
050import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
051import org.junit.After;
052import org.junit.AfterClass;
053import org.junit.Before;
054import org.junit.BeforeClass;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
/**
 * This class is only a base for other integration-level replication tests.
 * Do not add tests here.
 * Tests that don't require bringing machines up/down belong in TestReplicationSmallTests.
 * All other tests should have their own classes and extend this one.
 */
064public class TestReplicationBase {
065  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);
067  protected static Configuration CONF_WITH_LOCALFS;
069  protected static ReplicationAdmin admin;
070  protected static Admin hbaseAdmin;
072  protected static Table htable1;
073  protected static Table htable2;
075  protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
076  protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
077  protected static Configuration CONF1 = UTIL1.getConfiguration();
078  protected static Configuration CONF2 = UTIL2.getConfiguration();
080  protected static final int NUM_SLAVES1 = 1;
081  protected static final int NUM_SLAVES2 = 1;
082  protected static final int NB_ROWS_IN_BATCH = 100;
083  protected static final int NB_ROWS_IN_BIG_BATCH =
084      NB_ROWS_IN_BATCH * 10;
085  protected static final long SLEEP_TIME = 500;
086  protected static final int NB_RETRIES = 50;
088  protected static final TableName tableName = TableName.valueOf("test");
089  protected static final byte[] famName = Bytes.toBytes("f");
090  protected static final byte[] row = Bytes.toBytes("row");
091  protected static final byte[] noRepfamName = Bytes.toBytes("norep");
092  protected static final String PEER_ID2 = "2";
094  protected boolean isSerialPeer() {
095    return false;
096  }
098  protected final void cleanUp() throws IOException, InterruptedException {
099    // Starting and stopping replication can make us miss new logs,
100    // rolling like this makes sure the most recent one gets added to the queue
101    for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster()
102        .getRegionServerThreads()) {
103      UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
104    }
105    int rowCount = UTIL1.countRows(tableName);
106    UTIL1.deleteTableData(tableName);
107    // truncating the table will send one Delete per row to the slave cluster
108    // in an async fashion, which is why we cannot just call deleteTableData on
109    // utility2 since late writes could make it to the slave in some way.
110    // Instead, we truncate the first table and wait for all the Deletes to
111    // make it to the slave.
112    Scan scan = new Scan();
113    int lastCount = 0;
114    for (int i = 0; i < NB_RETRIES; i++) {
115      if (i == NB_RETRIES - 1) {
116        fail("Waited too much time for truncate");
117      }
118      ResultScanner scanner = htable2.getScanner(scan);
119      Result[] res = scanner.next(rowCount);
120      scanner.close();
121      if (res.length != 0) {
122        if (res.length < lastCount) {
123          i--; // Don't increment timeout if we make progress
124        }
125        lastCount = res.length;
126        LOG.info("Still got " + res.length + " rows");
127        Thread.sleep(SLEEP_TIME);
128      } else {
129        break;
130      }
131    }
132  }
134  protected static void waitForReplication(int expectedRows, int retries)
135      throws IOException, InterruptedException {
136    waitForReplication(htable2, expectedRows, retries);
137  }
139  protected static void waitForReplication(Table htable2, int expectedRows, int retries)
140      throws IOException, InterruptedException {
141    Scan scan;
142    for (int i = 0; i < retries; i++) {
143      scan = new Scan();
144      if (i== retries -1) {
145        fail("Waited too much time for normal batch replication");
146      }
147      ResultScanner scanner = htable2.getScanner(scan);
148      Result[] res = scanner.next(expectedRows);
149      scanner.close();
150      if (res.length != expectedRows) {
151        LOG.info("Only got " + res.length + " rows");
152        Thread.sleep(SLEEP_TIME);
153      } else {
154        break;
155      }
156    }
157  }
159  protected static void loadData(String prefix, byte[] row) throws IOException {
160    loadData(prefix, row, famName);
161  }
163  protected static void loadData(String prefix, byte[] row, byte[] familyName) throws IOException {
164    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
165    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
166      Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i)));
167      put.addColumn(familyName, row, row);
168      puts.add(put);
169    }
170    htable1.put(puts);
171  }
173  protected static void setupConfig(HBaseTestingUtility util, String znodeParent) {
174    Configuration conf = util.getConfiguration();
175    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent);
176    // We don't want too many edits per batch sent to the ReplicationEndpoint to trigger
177    // sufficient number of events. But we don't want to go too low because
178    // HBaseInterClusterReplicationEndpoint partitions entries into batches and we want
179    // more than one batch sent to the peer cluster for better testing.
180    conf.setInt("replication.source.size.capacity", 102400);
181    conf.setLong("replication.source.sleepforretries", 100);
182    conf.setInt("hbase.regionserver.maxlogs", 10);
183    conf.setLong("hbase.master.logcleaner.ttl", 10);
184    conf.setInt("zookeeper.recovery.retry", 1);
185    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
186    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
187    conf.setInt("replication.stats.thread.period.seconds", 5);
188    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
189    conf.setLong("replication.sleep.before.failover", 2000);
190    conf.setInt("replication.source.maxretriesmultiplier", 10);
191    conf.setFloat("replication.source.ratio", 1.0f);
192    conf.setBoolean("replication.source.eof.autorecovery", true);
193    conf.setLong("hbase.serial.replication.waiting.ms", 100);
194  }
196  static void configureClusters(HBaseTestingUtility util1,
197      HBaseTestingUtility util2) {
198    setupConfig(util1, "/1");
199    setupConfig(util2, "/2");
201    Configuration conf2 = util2.getConfiguration();
202    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
203    conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
204    conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
205  }
207  static void restartSourceCluster(int numSlaves)
208      throws Exception {
209    IOUtils.closeQuietly(hbaseAdmin, htable1);
210    UTIL1.shutdownMiniHBaseCluster();
211    UTIL1.restartHBaseCluster(numSlaves);
212    // Invalidate the cached connection state.
213    CONF1 = UTIL1.getConfiguration();
214    hbaseAdmin = UTIL1.getAdmin();
215    Connection connection1 = UTIL1.getConnection();
216    htable1 = connection1.getTable(tableName);
217  }
219  static void restartTargetHBaseCluster(int numSlaves) throws Exception {
220    IOUtils.closeQuietly(htable2);
221    UTIL2.restartHBaseCluster(numSlaves);
222    // Invalidate the cached connection state
223    CONF2 = UTIL2.getConfiguration();
224    htable2 = UTIL2.getConnection().getTable(tableName);
225  }
227  private static void startClusters() throws Exception {
228    UTIL1.startMiniZKCluster();
229    MiniZooKeeperCluster miniZK = UTIL1.getZkCluster();
230    LOG.info("Setup first Zk");
232    UTIL2.setZkCluster(miniZK);
233    LOG.info("Setup second Zk");
235    CONF_WITH_LOCALFS = HBaseConfiguration.create(CONF1);
236    UTIL1.startMiniCluster(NUM_SLAVES1);
237    // Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks
238    // as a component in deciding maximum number of parallel batches to send to the peer cluster.
239    UTIL2.startMiniCluster(NUM_SLAVES2);
241    admin = new ReplicationAdmin(CONF1);
242    hbaseAdmin = ConnectionFactory.createConnection(CONF1).getAdmin();
244    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
245        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
246            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
247        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
249    Connection connection1 = ConnectionFactory.createConnection(CONF1);
250    Connection connection2 = ConnectionFactory.createConnection(CONF2);
251    try (Admin admin1 = connection1.getAdmin()) {
252      admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
253    }
254    try (Admin admin2 = connection2.getAdmin()) {
255      admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
256    }
257    UTIL1.waitUntilAllRegionsAssigned(tableName);
258    UTIL2.waitUntilAllRegionsAssigned(tableName);
259    htable1 = connection1.getTable(tableName);
260    htable2 = connection2.getTable(tableName);
261  }
263  @BeforeClass
264  public static void setUpBeforeClass() throws Exception {
265    configureClusters(UTIL1, UTIL2);
266    startClusters();
267  }
269  private boolean peerExist(String peerId) throws IOException {
270    return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId()));
271  }
273  @Before
274  public void setUpBase() throws Exception {
275    if (!peerExist(PEER_ID2)) {
276      ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
277          .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer()).build();
278      hbaseAdmin.addReplicationPeer(PEER_ID2, rpc);
279    }
280  }
282  @After
283  public void tearDownBase() throws Exception {
284    if (peerExist(PEER_ID2)) {
285      hbaseAdmin.removeReplicationPeer(PEER_ID2);
286    }
287  }
289  protected static void runSimplePutDeleteTest() throws IOException, InterruptedException {
290    Put put = new Put(row);
291    put.addColumn(famName, row, row);
293    htable1 = UTIL1.getConnection().getTable(tableName);
294    htable1.put(put);
296    Get get = new Get(row);
297    for (int i = 0; i < NB_RETRIES; i++) {
298      if (i == NB_RETRIES - 1) {
299        fail("Waited too much time for put replication");
300      }
301      Result res = htable2.get(get);
302      if (res.isEmpty()) {
303        LOG.info("Row not available");
304        Thread.sleep(SLEEP_TIME);
305      } else {
306        assertArrayEquals(row, res.value());
307        break;
308      }
309    }
311    Delete del = new Delete(row);
312    htable1.delete(del);
314    get = new Get(row);
315    for (int i = 0; i < NB_RETRIES; i++) {
316      if (i == NB_RETRIES - 1) {
317        fail("Waited too much time for del replication");
318      }
319      Result res = htable2.get(get);
320      if (res.size() >= 1) {
321        LOG.info("Row not deleted");
322        Thread.sleep(SLEEP_TIME);
323      } else {
324        break;
325      }
326    }
327  }
329  protected static void runSmallBatchTest() throws IOException, InterruptedException {
330    // normal Batch tests
331    loadData("", row);
333    Scan scan = new Scan();
335    ResultScanner scanner1 = htable1.getScanner(scan);
336    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
337    scanner1.close();
338    assertEquals(NB_ROWS_IN_BATCH, res1.length);
340    waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
341  }
343  @AfterClass
344  public static void tearDownAfterClass() throws Exception {
345    if (htable2 != null) {
346      htable2.close();
347    }
348    if (htable1 != null) {
349      htable1.close();
350    }
351    if (admin != null) {
352      admin.close();
353    }
354    UTIL2.shutdownMiniCluster();
355    UTIL1.shutdownMiniCluster();
356  }