001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.jupiter.api.Assertions.assertArrayEquals;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022import static org.junit.jupiter.api.Assertions.assertNotNull;
023import static org.junit.jupiter.api.Assertions.fail;
024
025import java.io.IOException;
026import java.util.Arrays;
027import java.util.concurrent.CountDownLatch;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.HBaseTestingUtil;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.client.Admin;
036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
037import org.apache.hadoop.hbase.client.Connection;
038import org.apache.hadoop.hbase.client.ConnectionFactory;
039import org.apache.hadoop.hbase.client.Delete;
040import org.apache.hadoop.hbase.client.Get;
041import org.apache.hadoop.hbase.client.Put;
042import org.apache.hadoop.hbase.client.Result;
043import org.apache.hadoop.hbase.client.Table;
044import org.apache.hadoop.hbase.client.TableDescriptor;
045import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
046import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
047import org.apache.hadoop.hbase.regionserver.HRegion;
048import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
049import org.apache.hadoop.hbase.testclassification.LargeTests;
050import org.apache.hadoop.hbase.testclassification.ReplicationTests;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
053import org.junit.jupiter.api.BeforeAll;
054import org.junit.jupiter.api.Tag;
055import org.junit.jupiter.api.Test;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059@Tag(ReplicationTests.TAG)
060@Tag(LargeTests.TAG)
061public class TestMultiSlaveReplication {
062
063  private static final Logger LOG = LoggerFactory.getLogger(TestMultiSlaveReplication.class);
064
065  private static Configuration conf1;
066  private static Configuration conf2;
067  private static Configuration conf3;
068
069  private static HBaseTestingUtil utility1;
070  private static HBaseTestingUtil utility2;
071  private static HBaseTestingUtil utility3;
072  private static final long SLEEP_TIME = 500;
073  private static final int NB_RETRIES = 100;
074
075  private static final TableName tableName = TableName.valueOf("test");
076  private static final byte[] famName = Bytes.toBytes("f");
077  private static final byte[] row = Bytes.toBytes("row");
078  private static final byte[] row1 = Bytes.toBytes("row1");
079  private static final byte[] row2 = Bytes.toBytes("row2");
080  private static final byte[] row3 = Bytes.toBytes("row3");
081  private static final byte[] noRepfamName = Bytes.toBytes("norep");
082
083  private static TableDescriptor table;
084
085  @BeforeAll
086  public static void setUpBeforeClass() throws Exception {
087    conf1 = HBaseConfiguration.create();
088    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
089    // smaller block size and capacity to trigger more operations
090    // and test them
091    conf1.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
092    conf1.setInt("replication.source.size.capacity", 1024);
093    conf1.setLong("replication.source.sleepforretries", 100);
094    conf1.setInt("hbase.regionserver.maxlogs", 10);
095    conf1.setLong("hbase.master.logcleaner.ttl", 10);
096    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
097    conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
098      "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
099    conf1.setInt("hbase.master.cleaner.interval", 5 * 1000);
100
101    utility1 = new HBaseTestingUtil(conf1);
102    utility1.startMiniZKCluster();
103    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
104    utility1.setZkCluster(miniZK);
105
106    conf2 = new Configuration(conf1);
107    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
108
109    conf3 = new Configuration(conf1);
110    conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
111
112    utility2 = new HBaseTestingUtil(conf2);
113    utility2.setZkCluster(miniZK);
114
115    utility3 = new HBaseTestingUtil(conf3);
116    utility3.setZkCluster(miniZK);
117
118    table = TableDescriptorBuilder.newBuilder(tableName)
119      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
120        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
121      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
122  }
123
124  @Test
125  public void testMultiSlaveReplication() throws Exception {
126    LOG.info("testCyclicReplication");
127    utility1.startMiniCluster();
128    utility2.startMiniCluster();
129    utility3.startMiniCluster();
130    try (Connection conn = ConnectionFactory.createConnection(conf1);
131      Admin admin1 = conn.getAdmin()) {
132      utility1.getAdmin().createTable(table);
133      utility2.getAdmin().createTable(table);
134      utility3.getAdmin().createTable(table);
135      Table htable1 = utility1.getConnection().getTable(tableName);
136      Table htable2 = utility2.getConnection().getTable(tableName);
137      Table htable3 = utility3.getConnection().getTable(tableName);
138
139      ReplicationPeerConfigBuilder rpcBuilder =
140        ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getRpcConnnectionURI());
141      admin1.addReplicationPeer("1", rpcBuilder.build());
142
143      // put "row" and wait 'til it got around, then delete
144      putAndWait(row, famName, htable1, htable2);
145      deleteAndWait(row, htable1, htable2);
146      // check it wasn't replication to cluster 3
147      checkRow(row, 0, htable3);
148
149      putAndWait(row2, famName, htable1, htable2);
150
151      // now roll the region server's logs
152      rollWALAndWait(utility1, htable1.getName(), row2);
153
154      // after the log was rolled put a new row
155      putAndWait(row3, famName, htable1, htable2);
156
157      rpcBuilder.setClusterKey(utility3.getRpcConnnectionURI());
158      admin1.addReplicationPeer("2", rpcBuilder.build());
159
160      // put a row, check it was replicated to all clusters
161      putAndWait(row1, famName, htable1, htable2, htable3);
162      // delete and verify
163      deleteAndWait(row1, htable1, htable2, htable3);
164
165      // make sure row2 did not get replicated after
166      // cluster 3 was added
167      checkRow(row2, 0, htable3);
168
169      // row3 will get replicated, because it was in the
170      // latest log
171      checkRow(row3, 1, htable3);
172
173      Put p = new Put(row);
174      p.addColumn(famName, row, row);
175      htable1.put(p);
176      // now roll the logs again
177      rollWALAndWait(utility1, htable1.getName(), row);
178
179      // cleanup "row2", also conveniently use this to wait replication
180      // to finish
181      deleteAndWait(row2, htable1, htable2, htable3);
182      // Even if the log was rolled in the middle of the replication
183      // "row" is still replication.
184      checkRow(row, 1, htable2);
185      // Replication thread of cluster 2 may be sleeping, and since row2 is not there in it,
186      // we should wait before checking.
187      checkWithWait(row, 1, htable3);
188
189      // cleanup the rest
190      deleteAndWait(row, htable1, htable2, htable3);
191      deleteAndWait(row3, htable1, htable2, htable3);
192
193      utility3.shutdownMiniCluster();
194      utility2.shutdownMiniCluster();
195      utility1.shutdownMiniCluster();
196    }
197  }
198
199  private void rollWALAndWait(final HBaseTestingUtil utility, final TableName table,
200    final byte[] row) throws IOException {
201    final Admin admin = utility.getAdmin();
202    final SingleProcessHBaseCluster cluster = utility.getMiniHBaseCluster();
203
204    // find the region that corresponds to the given row.
205    HRegion region = null;
206    for (HRegion candidate : cluster.getRegions(table)) {
207      if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
208        region = candidate;
209        break;
210      }
211    }
212    assertNotNull(region, "Couldn't find the region for row '" + Arrays.toString(row) + "'");
213
214    final CountDownLatch latch = new CountDownLatch(1);
215
216    // listen for successful log rolls
217    final WALActionsListener listener = new WALActionsListener() {
218      @Override
219      public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
220        latch.countDown();
221      }
222    };
223    region.getWAL().registerWALActionsListener(listener);
224
225    // request a roll
226    admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDescriptor().getTableName(),
227      region.getRegionInfo().getRegionName()));
228
229    // wait
230    try {
231      latch.await();
232    } catch (InterruptedException exception) {
233      LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later "
234        + "replication tests fail, it's probably because we should still be waiting.");
235      Thread.currentThread().interrupt();
236    }
237    region.getWAL().unregisterWALActionsListener(listener);
238  }
239
240  private void checkWithWait(byte[] row, int count, Table table) throws Exception {
241    Get get = new Get(row);
242    for (int i = 0; i < NB_RETRIES; i++) {
243      if (i == NB_RETRIES - 1) {
244        fail("Waited too much time while getting the row.");
245      }
246      boolean rowReplicated = false;
247      Result res = table.get(get);
248      if (res.size() >= 1) {
249        LOG.info("Row is replicated");
250        rowReplicated = true;
251        assertEquals(count, res.size(),
252          "Table '" + table + "' did not have the expected number of  results.");
253        break;
254      }
255      if (rowReplicated) {
256        break;
257      } else {
258        Thread.sleep(SLEEP_TIME);
259      }
260    }
261  }
262
263  private void checkRow(byte[] row, int count, Table... tables) throws IOException {
264    Get get = new Get(row);
265    for (Table table : tables) {
266      Result res = table.get(get);
267      assertEquals(count, res.size(),
268        "Table '" + table + "' did not have the expected number of results.");
269    }
270  }
271
272  private void deleteAndWait(byte[] row, Table source, Table... targets) throws Exception {
273    Delete del = new Delete(row);
274    source.delete(del);
275
276    Get get = new Get(row);
277    for (int i = 0; i < NB_RETRIES; i++) {
278      if (i == NB_RETRIES - 1) {
279        fail("Waited too much time for del replication");
280      }
281      boolean removedFromAll = true;
282      for (Table target : targets) {
283        Result res = target.get(get);
284        if (res.size() >= 1) {
285          LOG.info("Row not deleted");
286          removedFromAll = false;
287          break;
288        }
289      }
290      if (removedFromAll) {
291        break;
292      } else {
293        Thread.sleep(SLEEP_TIME);
294      }
295    }
296  }
297
298  private void putAndWait(byte[] row, byte[] fam, Table source, Table... targets) throws Exception {
299    Put put = new Put(row);
300    put.addColumn(fam, row, row);
301    source.put(put);
302
303    Get get = new Get(row);
304    for (int i = 0; i < NB_RETRIES; i++) {
305      if (i == NB_RETRIES - 1) {
306        fail("Waited too much time for put replication");
307      }
308      boolean replicatedToAll = true;
309      for (Table target : targets) {
310        Result res = target.get(get);
311        if (res.isEmpty()) {
312          LOG.info("Row not available");
313          replicatedToAll = false;
314          break;
315        } else {
316          assertArrayEquals(res.value(), row);
317        }
318      }
319      if (replicatedToAll) {
320        break;
321      } else {
322        Thread.sleep(SLEEP_TIME);
323      }
324    }
325  }
326
327}