/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class is only a base for other integration-level replication tests. Do not add tests here.
 * TestReplicationSmallTests is where tests that don't require bringing machines up/down should go.
 * All other tests should have their own classes and extend this one.
 */
public class TestReplicationBase {

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);

  protected static Configuration conf1 = HBaseConfiguration.create();
  protected static Configuration conf2;
  protected static Configuration CONF_WITH_LOCALFS;

  protected static ZKWatcher zkw1;
  protected static ZKWatcher zkw2;

  protected static ReplicationAdmin admin;
  protected static Admin hbaseAdmin;

  protected static Table htable1;
  protected static Table htable2;
  protected static NavigableMap<byte[], Integer> scopes;

  protected static HBaseTestingUtility utility1;
  protected static HBaseTestingUtility utility2;
  protected static final int NB_ROWS_IN_BATCH = 100;
  protected static final int NB_ROWS_IN_BIG_BATCH =
      NB_ROWS_IN_BATCH * 10;
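  // Timing knobs shared by the retry loops in the helpers below: each loop polls
  // up to NB_RETRIES times, sleeping SLEEP_TIME ms between attempts.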
  protected static final long SLEEP_TIME = 500;
  protected static final int NB_RETRIES = 10;

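  // Table layout shared by both clusters: family 'f' is replicated
  // (REPLICATION_SCOPE_GLOBAL), family 'norep' is not.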
  protected static final TableName tableName = TableName.valueOf("test");
  protected static final byte[] famName = Bytes.toBytes("f");
  protected static final byte[] row = Bytes.toBytes("row");
  protected static final byte[] noRepfamName = Bytes.toBytes("norep");

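  /**
   * Whether WALs are archived into a separate old-WALs directory
   * ({@link AbstractFSWALProvider#SEPARATE_OLDLOGDIR}). Subclasses run once with
   * each value returned by {@link #params()}.
   */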
  @Parameter
  public static boolean separateOldWALs;

  @Parameters
  public static List<Boolean> params() {
    return Arrays.asList(false, true);
  }

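  /**
   * Truncates the test table on the source cluster and waits until the resulting
   * Deletes have been replicated to the slave cluster.
   */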
  protected final void cleanUp() throws IOException, InterruptedException {
    // Starting and stopping replication can make us miss new logs;
    // rolling like this makes sure the most recent one gets added to the queue.
    for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster()
        .getRegionServerThreads()) {
      utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    int rowCount = utility1.countRows(tableName);
    utility1.deleteTableData(tableName);
    // Truncating the table sends one Delete per row to the slave cluster
    // asynchronously, which is why we cannot just call deleteTableData on
    // utility2: late writes could still reach the slave. Instead, we truncate
    // the first table and wait for all the Deletes to make it to the slave.
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too long for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(rowCount);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // Don't increment the retry count if we are making progress
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

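  /**
   * Polls the slave cluster's table until it contains {@code expectedRows} rows,
   * failing the test after {@code retries} attempts.
   */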
  protected static void waitForReplication(int expectedRows, int retries)
      throws IOException, InterruptedException {
    for (int i = 0; i < retries; i++) {
      Scan scan = new Scan();
      if (i == retries - 1) {
        fail("Waited too long for normal batch replication");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(expectedRows);
      scanner.close();
      if (res.length != expectedRows) {
        LOG.info("Only got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

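  /**
   * Puts {@code NB_ROWS_IN_BATCH} rows into the source table, keyed by
   * {@code prefix} plus the row index.
   */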
  protected static void loadData(String prefix, byte[] row) throws IOException {
    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
      Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i)));
      put.addColumn(famName, row, row);
      puts.add(put);
    }
    htable1.put(puts);
  }

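  /**
   * Brings up two mini clusters that share one ZooKeeper ensemble, sets up
   * replication from cluster 1 to cluster 2, and creates the test table on both.
   */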
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
    // We don't want too many edits per batch sent to the ReplicationEndpoint,
    // so that a sufficient number of shipping events is triggered. But we don't
    // want to go too low either: HBaseInterClusterReplicationEndpoint partitions
    // entries into batches, and we want more than one batch sent to the peer
    // cluster for better testing.
    conf1.setInt("replication.source.size.capacity", 102400);
    conf1.setLong("replication.source.sleepforretries", 100);
    conf1.setInt("hbase.regionserver.maxlogs", 10);
    conf1.setLong("hbase.master.logcleaner.ttl", 10);
    conf1.setInt("zookeeper.recovery.retry", 1);
    conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    conf1.setInt("replication.stats.thread.period.seconds", 5);
    conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
    conf1.setLong("replication.sleep.before.failover", 2000);
    conf1.setInt("replication.source.maxretriesmultiplier", 10);
    conf1.setFloat("replication.source.ratio", 1.0f);
    conf1.setBoolean("replication.source.eof.autorecovery", true);

    // Parameterized config: archive WALs into a separate old-WALs directory or not.
    conf1.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, separateOldWALs);

    utility1 = new HBaseTestingUtility(conf1);
    utility1.startMiniZKCluster();
    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
    // Have to re-get conf1 in case the ZK cluster location is different
    // from the default.
    conf1 = utility1.getConfiguration();
    zkw1 = new ZKWatcher(conf1, "cluster1", null, true);
    admin = new ReplicationAdmin(conf1);
    LOG.info("Setup first Zk");

    // Base conf2 on conf1 so it gets the right ZK cluster.
    conf2 = HBaseConfiguration.create(conf1);
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
    conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
    conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);

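    // The second cluster reuses the first cluster's ZooKeeper ensemble; the two
    // clusters stay separate thanks to the distinct znode parents set above.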
    utility2 = new HBaseTestingUtility(conf2);
    utility2.setZkCluster(miniZK);
    zkw2 = new ZKWatcher(conf2, "cluster2", null, true);
    LOG.info("Setup second Zk");

    CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1);
    utility1.startMiniCluster(2);
    // Have a bunch of slave servers, because the inter-cluster shipping logic uses
    // the number of sinks as a component in deciding the maximum number of parallel
    // batches to send to the peer cluster.
    utility2.startMiniCluster(4);

    ReplicationPeerConfig rpc =
        ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getClusterKey()).build();
    hbaseAdmin = ConnectionFactory.createConnection(conf1).getAdmin();
    hbaseAdmin.addReplicationPeer("2", rpc);

    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (ColumnFamilyDescriptor f : table.getColumnFamilies()) {
      scopes.put(f.getName(), f.getScope());
    }
    Connection connection1 = ConnectionFactory.createConnection(conf1);
    Connection connection2 = ConnectionFactory.createConnection(conf2);
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
    }
    utility1.waitUntilAllRegionsAssigned(tableName);
    utility2.waitUntilAllRegionsAssigned(tableName);
    htable1 = connection1.getTable(tableName);
    htable2 = connection2.getTable(tableName);
  }

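  /**
   * Puts a single row on the source cluster and waits for it to appear on the
   * slave, then deletes it and waits for the delete to replicate as well.
   */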
  protected static void runSimplePutDeleteTest() throws IOException, InterruptedException {
    Put put = new Put(row);
    put.addColumn(famName, row, row);

    htable1 = utility1.getConnection().getTable(tableName);
    htable1.put(put);

    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too long for put replication");
      }
      Result res = htable2.get(get);
      if (res.isEmpty()) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(row, res.value());
        break;
      }
    }

    Delete del = new Delete(row);
    htable1.delete(del);

    get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too long for del replication");
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        LOG.info("Row not deleted");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

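  /**
   * Loads {@code NB_ROWS_IN_BATCH} rows on the source cluster, verifies they are
   * all visible locally, and waits for them to replicate to the slave.
   */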
  protected static void runSmallBatchTest() throws IOException, InterruptedException {
    // Normal batch test: load rows on the source cluster.
    loadData("", row);

    Scan scan = new Scan();

    ResultScanner scanner1 = htable1.getScanner(scan);
    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
    scanner1.close();
    assertEquals(NB_ROWS_IN_BATCH, res1.length);

    waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
  }

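  /**
   * Closes tables and admin handles, then shuts the mini clusters down in
   * reverse order of startup.
   */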
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    htable2.close();
    htable1.close();
    admin.close();
    utility2.shutdownMiniCluster();
    utility1.shutdownMiniCluster();
  }
}