/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises replication from one source cluster ({@code utility1}) to two peer clusters
 * ({@code utility2} and {@code utility3}) that share a single mini ZooKeeper cluster under
 * different parent znodes. The second peer is added part-way through the test so that the
 * effect of WAL rolls on what a late-added peer receives can be checked.
 */
@Category({ ReplicationTests.class, LargeTests.class })
public class TestMultiSlaveReplication {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestMultiSlaveReplication.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestMultiSlaveReplication.class);

  private static Configuration conf1;
  private static Configuration conf2;
  private static Configuration conf3;

  private static HBaseTestingUtility utility1;
  private static HBaseTestingUtility utility2;
  private static HBaseTestingUtility utility3;

  /** Pause, in milliseconds, between two polls of a target table. */
  private static final long SLEEP_TIME = 500;
  /** Upper bound on the polling loops; the last slot is reserved for reporting the failure. */
  private static final int NB_RETRIES = 100;

  private static final TableName tableName = TableName.valueOf("test");
  private static final byte[] famName = Bytes.toBytes("f");
  private static final byte[] row = Bytes.toBytes("row");
  private static final byte[] row1 = Bytes.toBytes("row1");
  private static final byte[] row2 = Bytes.toBytes("row2");
  private static final byte[] row3 = Bytes.toBytes("row3");
  private static final byte[] noRepfamName = Bytes.toBytes("norep");

  private static HTableDescriptor table;

  /**
   * Builds the three cluster configurations (sharing one mini ZooKeeper cluster, each under its
   * own parent znode) and the descriptor of the table used by the test. The HBase mini clusters
   * themselves are started by the test method.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf1 = HBaseConfiguration.create();
    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
    // smaller block size and capacity to trigger more operations
    // and test them
    conf1.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
    conf1.setInt("replication.source.size.capacity", 1024);
    conf1.setLong("replication.source.sleepforretries", 100);
    conf1.setInt("hbase.regionserver.maxlogs", 10);
    conf1.setLong("hbase.master.logcleaner.ttl", 10);
    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
      "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
    conf1.setInt("hbase.master.cleaner.interval", 5 * 1000);

    utility1 = new HBaseTestingUtility(conf1);
    utility1.startMiniZKCluster();
    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
    utility1.setZkCluster(miniZK);
    // NOTE(review): the watcher instances are never used or closed; they are presumably created
    // only for the constructor's side effects on ZooKeeper - confirm before changing.
    new ZKWatcher(conf1, "cluster1", null, true);

    // Clusters 2 and 3 inherit every setting of cluster 1 except the parent znode.
    conf2 = new Configuration(conf1);
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");

    conf3 = new Configuration(conf1);
    conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");

    utility2 = new HBaseTestingUtility(conf2);
    utility2.setZkCluster(miniZK);
    new ZKWatcher(conf2, "cluster2", null, true);

    utility3 = new HBaseTestingUtility(conf3);
    utility3.setZkCluster(miniZK);
    new ZKWatcher(conf3, "cluster3", null, true);

    // One family with global replication scope, one family left at the default scope.
    table = new HTableDescriptor(tableName);
    HColumnDescriptor fam = new HColumnDescriptor(famName);
    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    table.addFamily(fam);
    fam = new HColumnDescriptor(noRepfamName);
    table.addFamily(fam);
  }

  /**
   * Starts the three mini clusters, runs the replication scenario and shuts the clusters down
   * again. Every successfully started cluster is shut down even when the scenario fails, so a
   * failing assertion does not leak running clusters.
   */
  @Test
  public void testMultiSlaveReplication() throws Exception {
    LOG.info("testMultiSlaveReplication");
    utility1.startMiniCluster();
    try {
      utility2.startMiniCluster();
      try {
        utility3.startMiniCluster();
        try {
          runReplicationScenario();
        } finally {
          utility3.shutdownMiniCluster();
        }
      } finally {
        utility2.shutdownMiniCluster();
      }
    } finally {
      utility1.shutdownMiniCluster();
    }
  }

  /**
   * The actual scenario: replicate to cluster 2 only, roll the WAL, add cluster 3 as a second
   * peer and verify which rows each peer ends up with. Expects all three mini clusters to be
   * running.
   */
  private void runReplicationScenario() throws Exception {
    try (ReplicationAdmin admin1 = new ReplicationAdmin(conf1)) {
      utility1.getAdmin().createTable(table);
      utility2.getAdmin().createTable(table);
      utility3.getAdmin().createTable(table);

      try (Table htable1 = utility1.getConnection().getTable(tableName);
          Table htable2 = utility2.getConnection().getTable(tableName);
          Table htable3 = utility3.getConnection().getTable(tableName)) {

        ReplicationPeerConfig rpc = new ReplicationPeerConfig();
        rpc.setClusterKey(utility2.getClusterKey());
        admin1.addPeer("1", rpc, null);

        // put "row" and wait 'til it got around, then delete
        putAndWait(row, famName, htable1, htable2);
        deleteAndWait(row, htable1, htable2);
        // check it wasn't replicated to cluster 3
        checkRow(row, 0, htable3);

        putAndWait(row2, famName, htable1, htable2);

        // now roll the region server's logs
        rollWALAndWait(utility1, htable1.getName(), row2);

        // after the log was rolled put a new row
        putAndWait(row3, famName, htable1, htable2);

        rpc = new ReplicationPeerConfig();
        rpc.setClusterKey(utility3.getClusterKey());
        admin1.addPeer("2", rpc, null);

        // put a row, check it was replicated to all clusters
        putAndWait(row1, famName, htable1, htable2, htable3);
        // delete and verify
        deleteAndWait(row1, htable1, htable2, htable3);

        // make sure row2 did not get replicated after
        // cluster 3 was added
        checkRow(row2, 0, htable3);

        // row3 will get replicated, because it was in the
        // latest log
        checkRow(row3, 1, htable3);

        Put p = new Put(row);
        p.addColumn(famName, row, row);
        htable1.put(p);
        // now roll the logs again
        rollWALAndWait(utility1, htable1.getName(), row);

        // cleanup "row2", also conveniently use this to wait replication
        // to finish
        deleteAndWait(row2, htable1, htable2, htable3);
        // Even if the log was rolled in the middle of the replication
        // "row" is still replicated.
        checkRow(row, 1, htable2);
        // Replication thread of cluster 2 may be sleeping, and since row2 is not there in it,
        // we should wait before checking.
        checkWithWait(row, 1, htable3);

        // cleanup the rest
        deleteAndWait(row, htable1, htable2, htable3);
        deleteAndWait(row3, htable1, htable2, htable3);
      }
    }
  }

  /**
   * Requests a WAL roll on the region server hosting {@code row} of {@code table} and blocks
   * until a {@code postLogRoll} notification is received for that region's WAL.
   *
   * @param utility testing utility of the cluster whose WAL should be rolled
   * @param table table that contains the row
   * @param row row used to pick the region (and therefore the WAL) to roll
   * @throws IOException if the roll request fails
   */
  private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
      final byte[] row) throws IOException {
    final Admin admin = utility.getAdmin();
    final MiniHBaseCluster cluster = utility.getMiniHBaseCluster();

    // find the region that corresponds to the given row.
    HRegion region = null;
    for (HRegion candidate : cluster.getRegions(table)) {
      if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
        region = candidate;
        break;
      }
    }
    assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region);

    final CountDownLatch latch = new CountDownLatch(1);

    // listen for successful log rolls
    final WALActionsListener listener = new WALActionsListener() {
      @Override
      public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
        latch.countDown();
      }
    };
    region.getWAL().registerWALActionsListener(listener);
    try {
      // request a roll
      admin.rollWALWriter(cluster.getServerHoldingRegion(
        region.getTableDescriptor().getTableName(), region.getRegionInfo().getRegionName()));

      // wait
      try {
        latch.await();
      } catch (InterruptedException exception) {
        LOG.warn("Interrupted while waiting for the wal of '{}' to roll. If later "
            + "replication tests fail, it's probably because we should still be waiting.",
          region);
        Thread.currentThread().interrupt();
      }
    } finally {
      // Always detach the listener, including when the roll request itself throws.
      region.getWAL().unregisterWALActionsListener(listener);
    }
  }

  /**
   * Polls {@code table} until {@code row} shows up, then asserts that the result holds exactly
   * {@code count} cells. Fails the test if the row has not appeared after
   * {@code NB_RETRIES - 1} polls spaced {@code SLEEP_TIME} milliseconds apart.
   */
  private void checkWithWait(byte[] row, int count, Table table) throws Exception {
    Get get = new Get(row);
    for (int attempt = 0; attempt < NB_RETRIES - 1; attempt++) {
      Result res = table.get(get);
      if (res.size() >= 1) {
        LOG.info("Row is replicated");
        assertEquals("Table '" + table + "' did not have the expected number of results.", count,
          res.size());
        return;
      }
      Thread.sleep(SLEEP_TIME);
    }
    fail("Waited too much time while getting the row.");
  }

  /**
   * Asserts, without waiting, that a get of {@code row} returns exactly {@code count} cells in
   * each of the given tables.
   */
  private void checkRow(byte[] row, int count, Table... tables) throws IOException {
    Get get = new Get(row);
    for (Table table : tables) {
      Result res = table.get(get);
      assertEquals("Table '" + table + "' did not have the expected number of results.", count,
        res.size());
    }
  }

  /**
   * Deletes {@code row} from {@code source} and polls until the row is gone from every target
   * table. Fails the test if the row is still present somewhere after {@code NB_RETRIES - 1}
   * polls spaced {@code SLEEP_TIME} milliseconds apart.
   */
  private void deleteAndWait(byte[] row, Table source, Table... targets) throws Exception {
    Delete del = new Delete(row);
    source.delete(del);

    Get get = new Get(row);
    for (int attempt = 0; attempt < NB_RETRIES - 1; attempt++) {
      boolean removedFromAll = true;
      for (Table target : targets) {
        Result res = target.get(get);
        if (res.size() >= 1) {
          LOG.info("Row not deleted");
          removedFromAll = false;
          break;
        }
      }
      if (removedFromAll) {
        return;
      }
      Thread.sleep(SLEEP_TIME);
    }
    fail("Waited too much time for del replication");
  }

  /**
   * Writes a single cell (qualifier and value both equal to {@code row}) into family
   * {@code fam} of {@code source} and polls until every target table returns the row with that
   * value. Fails the test if some target is still missing the row after
   * {@code NB_RETRIES - 1} polls spaced {@code SLEEP_TIME} milliseconds apart.
   */
  private void putAndWait(byte[] row, byte[] fam, Table source, Table... targets)
      throws Exception {
    Put put = new Put(row);
    put.addColumn(fam, row, row);
    source.put(put);

    Get get = new Get(row);
    for (int attempt = 0; attempt < NB_RETRIES - 1; attempt++) {
      boolean replicatedToAll = true;
      for (Table target : targets) {
        Result res = target.get(get);
        if (res.isEmpty()) {
          LOG.info("Row not available");
          replicatedToAll = false;
          break;
        }
        // expected value first, so a mismatch is reported the right way round
        assertArrayEquals(row, res.value());
      }
      if (replicatedToAll) {
        return;
      }
      Thread.sleep(SLEEP_TIME);
    }
    fail("Waited too much time for put replication");
  }

}