/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class is only a base for other integration-level replication tests. Do not add tests here.
 * TestReplicationSmallTests is where tests that don't require bringing machines up and down
 * should go. All other tests should have their own classes and extend this one.
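 * <p>
 * A minimal sketch of such a subclass (the class and test method names below are hypothetical,
 * not part of the test suite):
 * <pre>
 * public class TestReplicationFoo extends TestReplicationBase {
 *   {@literal @}Test
 *   public void testPutAndDeleteAreReplicated() throws Exception {
 *     runSimplePutDeleteTest();
 *   }
 * }
 * </pre>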
 */
public class TestReplicationBase {
  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);

  protected static Configuration conf1 = HBaseConfiguration.create();
  protected static Configuration conf2;
  protected static Configuration CONF_WITH_LOCALFS;

  protected static ZKWatcher zkw1;
  protected static ZKWatcher zkw2;

  protected static ReplicationAdmin admin;
  protected static Admin hbaseAdmin;

  protected static Table htable1;
  protected static Table htable2;
  protected static NavigableMap<byte[], Integer> scopes;

  protected static HBaseTestingUtility utility1;
  protected static HBaseTestingUtility utility2;
  protected static final int NB_ROWS_IN_BATCH = 100;
  protected static final int NB_ROWS_IN_BIG_BATCH = NB_ROWS_IN_BATCH * 10;
  protected static final long SLEEP_TIME = 500;
  protected static final int NB_RETRIES = 50;

  protected static final TableName tableName = TableName.valueOf("test");
  protected static final byte[] famName = Bytes.toBytes("f");
  protected static final byte[] row = Bytes.toBytes("row");
  protected static final byte[] noRepfamName = Bytes.toBytes("norep");
  protected static final String PEER_ID2 = "2";

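  /**
   * Whether the replication peer created in {@link #setUpBase()} should be a serial peer.
   * Subclasses that exercise serial replication override this to return true.
   */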
  protected boolean isSerialPeer() {
    return false;
  }

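  /**
   * Rolls the WALs on the source cluster, truncates the source table, and waits until all the
   * resulting Deletes have been replicated to the slave cluster.
   */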
  protected final void cleanUp() throws IOException, InterruptedException {
    // Starting and stopping replication can make us miss new logs;
    // rolling like this makes sure the most recent one gets added to the queue.
    for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster()
        .getRegionServerThreads()) {
      utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    int rowCount = utility1.countRows(tableName);
    utility1.deleteTableData(tableName);
    // Truncating the table will send one Delete per row to the slave cluster
    // in an async fashion, which is why we cannot just call deleteTableData on
    // utility2, since late writes could still make it to the slave. Instead, we
    // truncate the first table and wait for all the Deletes to make it to the slave.
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too long for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(rowCount);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // Don't increment the retry count if we are making progress
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

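  /**
   * Repeatedly scans the slave table until it contains exactly {@code expectedRows} rows, failing
   * the test if that does not happen within {@code retries} attempts.
   */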
  protected static void waitForReplication(int expectedRows, int retries)
      throws IOException, InterruptedException {
    Scan scan;
    for (int i = 0; i < retries; i++) {
      scan = new Scan();
      if (i == retries - 1) {
        fail("Waited too long for normal batch replication");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(expectedRows);
      scanner.close();
      if (res.length != expectedRows) {
        LOG.info("Only got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

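  /**
   * Loads {@value #NB_ROWS_IN_BATCH} rows into {@code htable1} under the default replicated
   * family {@code famName}; see the three-argument overload for details.
   */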
  protected static void loadData(String prefix, byte[] row) throws IOException {
    loadData(prefix, row, famName);
  }

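  /**
   * Loads {@value #NB_ROWS_IN_BATCH} rows into {@code htable1}. Row keys are {@code prefix}
   * followed by the row index; the {@code row} argument is used as both qualifier and value.
   */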
  protected static void loadData(String prefix, byte[] row, byte[] familyName) throws IOException {
    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
      Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i)));
      put.addColumn(familyName, row, row);
      puts.add(put);
    }
    htable1.put(puts);
  }

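  /**
   * Brings up a shared mini ZooKeeper cluster and two mini HBase clusters (source and peer), and
   * creates the test table on both, with one replicated and one non-replicated column family.
   */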
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
    // Keep the number of edits per batch sent to the ReplicationEndpoint small, so that a
    // sufficient number of events is triggered. But don't go too low either, because
    // HBaseInterClusterReplicationEndpoint partitions entries into batches and we want more
    // than one batch sent to the peer cluster for better testing.
    conf1.setInt("replication.source.size.capacity", 102400);
    conf1.setLong("replication.source.sleepforretries", 100);
    conf1.setInt("hbase.regionserver.maxlogs", 10);
    conf1.setLong("hbase.master.logcleaner.ttl", 10);
    conf1.setInt("zookeeper.recovery.retry", 1);
    conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    conf1.setInt("replication.stats.thread.period.seconds", 5);
    conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
    conf1.setLong("replication.sleep.before.failover", 2000);
    conf1.setInt("replication.source.maxretriesmultiplier", 10);
    conf1.setFloat("replication.source.ratio", 1.0f);
    conf1.setBoolean("replication.source.eof.autorecovery", true);
    conf1.setLong("hbase.serial.replication.waiting.ms", 100);

    utility1 = new HBaseTestingUtility(conf1);
    utility1.startMiniZKCluster();
    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
    // Have to re-get conf1 in case the zk cluster location is different from the default.
    conf1 = utility1.getConfiguration();
    zkw1 = new ZKWatcher(conf1, "cluster1", null, true);
    admin = new ReplicationAdmin(conf1);
    LOG.info("Setup first Zk");

    // Base conf2 on conf1 so it gets the right zk cluster.
    conf2 = HBaseConfiguration.create(conf1);
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
    conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
    conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);

    utility2 = new HBaseTestingUtility(conf2);
    utility2.setZkCluster(miniZK);
    zkw2 = new ZKWatcher(conf2, "cluster2", null, true);
    LOG.info("Setup second Zk");

    CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1);
    utility1.startMiniCluster(2);
    // Have a bunch of slave servers, because the inter-cluster shipping logic uses the number
    // of sinks as a component in deciding the maximum number of parallel batches to send to
    // the peer cluster.
    utility2.startMiniCluster(4);

    hbaseAdmin = ConnectionFactory.createConnection(conf1).getAdmin();

    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (ColumnFamilyDescriptor f : table.getColumnFamilies()) {
      scopes.put(f.getName(), f.getScope());
    }
    Connection connection1 = ConnectionFactory.createConnection(conf1);
    Connection connection2 = ConnectionFactory.createConnection(conf2);
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
    }
    utility1.waitUntilAllRegionsAssigned(tableName);
    utility2.waitUntilAllRegionsAssigned(tableName);
    htable1 = connection1.getTable(tableName);
    htable2 = connection2.getTable(tableName);
  }

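  /**
   * Returns true if a replication peer with the given id is registered on the source cluster.
   */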
  private boolean peerExist(String peerId) throws IOException {
    return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId()));
  }

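  /**
   * Registers the slave cluster as replication peer {@link #PEER_ID2} before each test, if it
   * does not already exist.
   */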
  @Before
  public void setUpBase() throws Exception {
    if (!peerExist(PEER_ID2)) {
      ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
          .setClusterKey(utility2.getClusterKey()).setSerial(isSerialPeer()).build();
      hbaseAdmin.addReplicationPeer(PEER_ID2, rpc);
    }
  }

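  /**
   * Removes replication peer {@link #PEER_ID2} after each test, if it is still registered.
   */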
  @After
  public void tearDownBase() throws Exception {
    if (peerExist(PEER_ID2)) {
      hbaseAdmin.removeReplicationPeer(PEER_ID2);
    }
  }

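  /**
   * Writes a single row on the source cluster and waits for it to show up on the slave, then
   * deletes it and waits for the delete to replicate as well.
   */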
  protected static void runSimplePutDeleteTest() throws IOException, InterruptedException {
    Put put = new Put(row);
    put.addColumn(famName, row, row);

    htable1 = utility1.getConnection().getTable(tableName);
    htable1.put(put);

    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too long for put replication");
      }
      Result res = htable2.get(get);
      if (res.isEmpty()) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(row, res.value());
        break;
      }
    }

    Delete del = new Delete(row);
    htable1.delete(del);

    get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too long for del replication");
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        LOG.info("Row not deleted");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

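  /**
   * Loads {@value #NB_ROWS_IN_BATCH} rows on the source cluster, verifies them locally, and waits
   * for the whole batch to be replicated to the slave cluster.
   */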
  protected static void runSmallBatchTest() throws IOException, InterruptedException {
    // normal batch test
    loadData("", row);

    Scan scan = new Scan();

    ResultScanner scanner1 = htable1.getScanner(scan);
    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
    scanner1.close();
    assertEquals(NB_ROWS_IN_BATCH, res1.length);

    waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
  }

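  /**
   * Closes the tables and the replication admin, then shuts down both mini clusters.
   */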
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    htable2.close();
    htable1.close();
    admin.close();
    utility2.shutdownMiniCluster();
    utility1.shutdownMiniCluster();
  }
}