001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.fail;
024
025import java.io.IOException;
026import java.util.Arrays;
027import java.util.concurrent.CountDownLatch;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseConfiguration;
032import org.apache.hadoop.hbase.HBaseTestingUtility;
033import org.apache.hadoop.hbase.HColumnDescriptor;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.HTableDescriptor;
036import org.apache.hadoop.hbase.MiniHBaseCluster;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.Admin;
039import org.apache.hadoop.hbase.client.Delete;
040import org.apache.hadoop.hbase.client.Get;
041import org.apache.hadoop.hbase.client.Put;
042import org.apache.hadoop.hbase.client.Result;
043import org.apache.hadoop.hbase.client.Table;
044import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
045import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
046import org.apache.hadoop.hbase.regionserver.HRegion;
047import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
048import org.apache.hadoop.hbase.testclassification.LargeTests;
049import org.apache.hadoop.hbase.testclassification.ReplicationTests;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
052import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
053import org.junit.BeforeClass;
054import org.junit.ClassRule;
055import org.junit.Test;
056import org.junit.experimental.categories.Category;
057import org.slf4j.Logger;
058import org.slf4j.LoggerFactory;
059
060@Category({ReplicationTests.class, LargeTests.class})
061public class TestMultiSlaveReplication {
062
063  @ClassRule
064  public static final HBaseClassTestRule CLASS_RULE =
065      HBaseClassTestRule.forClass(TestMultiSlaveReplication.class);
066
067  private static final Logger LOG = LoggerFactory.getLogger(TestMultiSlaveReplication.class);
068
069  private static Configuration conf1;
070  private static Configuration conf2;
071  private static Configuration conf3;
072
073  private static HBaseTestingUtility utility1;
074  private static HBaseTestingUtility utility2;
075  private static HBaseTestingUtility utility3;
076  private static final long SLEEP_TIME = 500;
077  private static final int NB_RETRIES = 100;
078
079  private static final TableName tableName = TableName.valueOf("test");
080  private static final byte[] famName = Bytes.toBytes("f");
081  private static final byte[] row = Bytes.toBytes("row");
082  private static final byte[] row1 = Bytes.toBytes("row1");
083  private static final byte[] row2 = Bytes.toBytes("row2");
084  private static final byte[] row3 = Bytes.toBytes("row3");
085  private static final byte[] noRepfamName = Bytes.toBytes("norep");
086
087  private static HTableDescriptor table;
088
089  @BeforeClass
090  public static void setUpBeforeClass() throws Exception {
091    conf1 = HBaseConfiguration.create();
092    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
093    // smaller block size and capacity to trigger more operations
094    // and test them
095    conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20);
096    conf1.setInt("replication.source.size.capacity", 1024);
097    conf1.setLong("replication.source.sleepforretries", 100);
098    conf1.setInt("hbase.regionserver.maxlogs", 10);
099    conf1.setLong("hbase.master.logcleaner.ttl", 10);
100    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
101    conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
102        "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
103    conf1.setInt("hbase.master.cleaner.interval", 5 * 1000);
104
105    utility1 = new HBaseTestingUtility(conf1);
106    utility1.startMiniZKCluster();
107    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
108    utility1.setZkCluster(miniZK);
109    new ZKWatcher(conf1, "cluster1", null, true);
110
111    conf2 = new Configuration(conf1);
112    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
113
114    conf3 = new Configuration(conf1);
115    conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
116
117    utility2 = new HBaseTestingUtility(conf2);
118    utility2.setZkCluster(miniZK);
119    new ZKWatcher(conf2, "cluster2", null, true);
120
121    utility3 = new HBaseTestingUtility(conf3);
122    utility3.setZkCluster(miniZK);
123    new ZKWatcher(conf3, "cluster3", null, true);
124
125    table = new HTableDescriptor(tableName);
126    HColumnDescriptor fam = new HColumnDescriptor(famName);
127    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
128    table.addFamily(fam);
129    fam = new HColumnDescriptor(noRepfamName);
130    table.addFamily(fam);
131  }
132
133  @Test
134  public void testMultiSlaveReplication() throws Exception {
135    LOG.info("testCyclicReplication");
136    MiniHBaseCluster master = utility1.startMiniCluster();
137    utility2.startMiniCluster();
138    utility3.startMiniCluster();
139    ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
140
141    utility1.getAdmin().createTable(table);
142    utility2.getAdmin().createTable(table);
143    utility3.getAdmin().createTable(table);
144    Table htable1 = utility1.getConnection().getTable(tableName);
145    Table htable2 = utility2.getConnection().getTable(tableName);
146    Table htable3 = utility3.getConnection().getTable(tableName);
147
148    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
149    rpc.setClusterKey(utility2.getClusterKey());
150    admin1.addPeer("1", rpc, null);
151
152    // put "row" and wait 'til it got around, then delete
153    putAndWait(row, famName, htable1, htable2);
154    deleteAndWait(row, htable1, htable2);
155    // check it wasn't replication to cluster 3
156    checkRow(row,0,htable3);
157
158    putAndWait(row2, famName, htable1, htable2);
159
160    // now roll the region server's logs
161    rollWALAndWait(utility1, htable1.getName(), row2);
162
163    // after the log was rolled put a new row
164    putAndWait(row3, famName, htable1, htable2);
165
166    rpc = new ReplicationPeerConfig();
167    rpc.setClusterKey(utility3.getClusterKey());
168    admin1.addPeer("2", rpc, null);
169
170    // put a row, check it was replicated to all clusters
171    putAndWait(row1, famName, htable1, htable2, htable3);
172    // delete and verify
173    deleteAndWait(row1, htable1, htable2, htable3);
174
175    // make sure row2 did not get replicated after
176    // cluster 3 was added
177    checkRow(row2,0,htable3);
178
179    // row3 will get replicated, because it was in the
180    // latest log
181    checkRow(row3,1,htable3);
182
183    Put p = new Put(row);
184    p.addColumn(famName, row, row);
185    htable1.put(p);
186    // now roll the logs again
187    rollWALAndWait(utility1, htable1.getName(), row);
188
189    // cleanup "row2", also conveniently use this to wait replication
190    // to finish
191    deleteAndWait(row2, htable1, htable2, htable3);
192    // Even if the log was rolled in the middle of the replication
193    // "row" is still replication.
194    checkRow(row, 1, htable2);
195    // Replication thread of cluster 2 may be sleeping, and since row2 is not there in it,
196    // we should wait before checking.
197    checkWithWait(row, 1, htable3);
198
199    // cleanup the rest
200    deleteAndWait(row, htable1, htable2, htable3);
201    deleteAndWait(row3, htable1, htable2, htable3);
202
203    utility3.shutdownMiniCluster();
204    utility2.shutdownMiniCluster();
205    utility1.shutdownMiniCluster();
206  }
207
208  private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
209      final byte[] row) throws IOException {
210    final Admin admin = utility.getAdmin();
211    final MiniHBaseCluster cluster = utility.getMiniHBaseCluster();
212
213    // find the region that corresponds to the given row.
214    HRegion region = null;
215    for (HRegion candidate : cluster.getRegions(table)) {
216      if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
217        region = candidate;
218        break;
219      }
220    }
221    assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region);
222
223    final CountDownLatch latch = new CountDownLatch(1);
224
225    // listen for successful log rolls
226    final WALActionsListener listener = new WALActionsListener() {
227          @Override
228          public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
229            latch.countDown();
230          }
231        };
232    region.getWAL().registerWALActionsListener(listener);
233
234    // request a roll
235    admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDescriptor().getTableName(),
236      region.getRegionInfo().getRegionName()));
237
238    // wait
239    try {
240      latch.await();
241    } catch (InterruptedException exception) {
242      LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later " +
243          "replication tests fail, it's probably because we should still be waiting.");
244      Thread.currentThread().interrupt();
245    }
246    region.getWAL().unregisterWALActionsListener(listener);
247  }
248
249
250  private void checkWithWait(byte[] row, int count, Table table) throws Exception {
251    Get get = new Get(row);
252    for (int i = 0; i < NB_RETRIES; i++) {
253      if (i == NB_RETRIES - 1) {
254        fail("Waited too much time while getting the row.");
255      }
256      boolean rowReplicated = false;
257      Result res = table.get(get);
258      if (res.size() >= 1) {
259        LOG.info("Row is replicated");
260        rowReplicated = true;
261        assertEquals("Table '" + table + "' did not have the expected number of  results.",
262            count, res.size());
263        break;
264      }
265      if (rowReplicated) {
266        break;
267      } else {
268        Thread.sleep(SLEEP_TIME);
269      }
270    }
271  }
272
273  private void checkRow(byte[] row, int count, Table... tables) throws IOException {
274    Get get = new Get(row);
275    for (Table table : tables) {
276      Result res = table.get(get);
277      assertEquals("Table '" + table + "' did not have the expected number of results.",
278          count, res.size());
279    }
280  }
281
282  private void deleteAndWait(byte[] row, Table source, Table... targets)
283  throws Exception {
284    Delete del = new Delete(row);
285    source.delete(del);
286
287    Get get = new Get(row);
288    for (int i = 0; i < NB_RETRIES; i++) {
289      if (i==NB_RETRIES-1) {
290        fail("Waited too much time for del replication");
291      }
292      boolean removedFromAll = true;
293      for (Table target : targets) {
294        Result res = target.get(get);
295        if (res.size() >= 1) {
296          LOG.info("Row not deleted");
297          removedFromAll = false;
298          break;
299        }
300      }
301      if (removedFromAll) {
302        break;
303      } else {
304        Thread.sleep(SLEEP_TIME);
305      }
306    }
307  }
308
309  private void putAndWait(byte[] row, byte[] fam, Table source, Table... targets)
310  throws Exception {
311    Put put = new Put(row);
312    put.addColumn(fam, row, row);
313    source.put(put);
314
315    Get get = new Get(row);
316    for (int i = 0; i < NB_RETRIES; i++) {
317      if (i==NB_RETRIES-1) {
318        fail("Waited too much time for put replication");
319      }
320      boolean replicatedToAll = true;
321      for (Table target : targets) {
322        Result res = target.get(get);
323        if (res.isEmpty()) {
324          LOG.info("Row not available");
325          replicatedToAll = false;
326          break;
327        } else {
328          assertArrayEquals(res.value(), row);
329        }
330      }
331      if (replicatedToAll) {
332        break;
333      } else {
334        Thread.sleep(SLEEP_TIME);
335      }
336    }
337  }
338
339}
340