/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests replication from one source cluster to two peer clusters, where the second peer is
 * added only after data has already been written and the source WAL has been rolled.
 */
@Category({ReplicationTests.class, LargeTests.class})
public class TestMultiSlaveReplication {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestMultiSlaveReplication.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestMultiSlaveReplication.class);

  private static Configuration conf1;
  private static Configuration conf2;
  private static Configuration conf3;

  private static HBaseTestingUtility utility1;
  private static HBaseTestingUtility utility2;
  private static HBaseTestingUtility utility3;
  /** Pause, in milliseconds, between two polls of a target table. */
  private static final long SLEEP_TIME = 500;
  /** Upper bound on the polling loop counter; the final iteration only reports the failure. */
  private static final int NB_RETRIES = 100;

  private static final TableName tableName = TableName.valueOf("test");
  private static final byte[] famName = Bytes.toBytes("f");
  private static final byte[] row = Bytes.toBytes("row");
  private static final byte[] row1 = Bytes.toBytes("row1");
  private static final byte[] row2 = Bytes.toBytes("row2");
  private static final byte[] row3 = Bytes.toBytes("row3");
  private static final byte[] noRepfamName = Bytes.toBytes("norep");

  private static HTableDescriptor table;

  /**
   * Builds the three cluster configurations and testing utilities. All three utilities share
   * the mini ZooKeeper cluster started by {@code utility1} and are kept apart by distinct znode
   * parents ("/1", "/2", "/3"). Also prepares the table descriptor with one globally replicated
   * family and one family that keeps the default scope. The HBase mini clusters themselves are
   * started by the test method, not here.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf1 = HBaseConfiguration.create();
    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
    // smaller block size and capacity to trigger more operations
    // and test them
    conf1.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
    conf1.setInt("replication.source.size.capacity", 1024);
    conf1.setLong("replication.source.sleepforretries", 100);
    conf1.setInt("hbase.regionserver.maxlogs", 10);
    conf1.setLong("hbase.master.logcleaner.ttl", 10);
    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
      "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
    conf1.setInt("hbase.master.cleaner.interval", 5 * 1000);

    utility1 = new HBaseTestingUtility(conf1);
    utility1.startMiniZKCluster();
    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
    utility1.setZkCluster(miniZK);
    // NOTE(review): the watcher is created only for its side effects and never closed;
    // presumably it sets up the base znodes for this cluster -- confirm.
    new ZKWatcher(conf1, "cluster1", null, true);

    // conf2 and conf3 are copies of conf1, so they inherit every setting above
    conf2 = new Configuration(conf1);
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");

    conf3 = new Configuration(conf1);
    conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");

    utility2 = new HBaseTestingUtility(conf2);
    utility2.setZkCluster(miniZK);
    new ZKWatcher(conf2, "cluster2", null, true);

    utility3 = new HBaseTestingUtility(conf3);
    utility3.setZkCluster(miniZK);
    new ZKWatcher(conf3, "cluster3", null, true);

    table = new HTableDescriptor(tableName);
    HColumnDescriptor fam = new HColumnDescriptor(famName);
    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    table.addFamily(fam);
    fam = new HColumnDescriptor(noRepfamName);
    table.addFamily(fam);
  }

  /**
   * Starts the three mini clusters, runs the replication scenario and always shuts the clusters
   * down again, even when the scenario fails.
   */
  @Test
  public void testMultiSlaveReplication() throws Exception {
    LOG.info("testMultiSlaveReplication");
    try {
      utility1.startMiniCluster();
      utility2.startMiniCluster();
      utility3.startMiniCluster();
      runMultiSlaveScenario();
    } finally {
      shutdownClusters();
    }
  }

  /**
   * Runs the actual scenario against the running clusters: cluster 2 is added as peer "1",
   * rows are written and the WAL is rolled, then cluster 3 is added as peer "2" and the
   * contents of both peers are verified.
   */
  private void runMultiSlaveScenario() throws Exception {
    utility1.getAdmin().createTable(table);
    utility2.getAdmin().createTable(table);
    utility3.getAdmin().createTable(table);

    try (ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
        Table htable1 = utility1.getConnection().getTable(tableName);
        Table htable2 = utility2.getConnection().getTable(tableName);
        Table htable3 = utility3.getConnection().getTable(tableName)) {

      ReplicationPeerConfig rpc = new ReplicationPeerConfig();
      rpc.setClusterKey(utility2.getClusterKey());
      admin1.addPeer("1", rpc, null);

      // put "row" and wait 'til it got around, then delete
      putAndWait(row, famName, htable1, htable2);
      deleteAndWait(row, htable1, htable2);
      // check it wasn't replicated to cluster 3
      checkRow(row, 0, htable3);

      putAndWait(row2, famName, htable1, htable2);

      // now roll the region server's logs
      rollWALAndWait(utility1, htable1.getName(), row2);

      // after the log was rolled put a new row
      putAndWait(row3, famName, htable1, htable2);

      rpc = new ReplicationPeerConfig();
      rpc.setClusterKey(utility3.getClusterKey());
      admin1.addPeer("2", rpc, null);

      // put a row, check it was replicated to all clusters
      putAndWait(row1, famName, htable1, htable2, htable3);
      // delete and verify
      deleteAndWait(row1, htable1, htable2, htable3);

      // make sure row2 did not get replicated after
      // cluster 3 was added
      checkRow(row2, 0, htable3);

      // row3 will get replicated, because it was in the
      // latest log
      checkRow(row3, 1, htable3);

      Put p = new Put(row);
      p.addColumn(famName, row, row);
      htable1.put(p);
      // now roll the logs again
      rollWALAndWait(utility1, htable1.getName(), row);

      // cleanup "row2", also conveniently use this to wait replication
      // to finish
      deleteAndWait(row2, htable1, htable2, htable3);
      // Even if the log was rolled in the middle of the replication
      // "row" is still replicated.
      checkRow(row, 1, htable2);
      // Replication thread of cluster 2 may be sleeping, and since row2 is not there in it,
      // we should wait before checking.
      checkWithWait(row, 1, htable3);

      // cleanup the rest
      deleteAndWait(row, htable1, htable2, htable3);
      deleteAndWait(row3, htable1, htable2, htable3);
    }
  }

  /**
   * Shuts the mini clusters down in reverse start order. The nested {@code finally} blocks make
   * sure a failing shutdown of one cluster does not skip the shutdown of the remaining ones.
   */
  private static void shutdownClusters() throws Exception {
    try {
      utility3.shutdownMiniCluster();
    } finally {
      try {
        utility2.shutdownMiniCluster();
      } finally {
        utility1.shutdownMiniCluster();
      }
    }
  }

  /**
   * Requests a WAL roll on the region server hosting the region that contains {@code row} and
   * blocks until that region's WAL reports a completed roll.
   *
   * @param utility testing utility of the cluster whose WAL should be rolled
   * @param table name of the table the row belongs to
   * @param row row key used to locate the region
   * @throws IOException if the roll request fails
   */
  private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
      final byte[] row) throws IOException {
    final Admin admin = utility.getAdmin();
    final MiniHBaseCluster cluster = utility.getMiniHBaseCluster();

    // find the region that corresponds to the given row.
    HRegion region = null;
    for (HRegion candidate : cluster.getRegions(table)) {
      if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
        region = candidate;
        break;
      }
    }
    assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region);

    final CountDownLatch latch = new CountDownLatch(1);

    // listen for successful log rolls
    final WALActionsListener listener = new WALActionsListener() {
      @Override
      public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
        latch.countDown();
      }
    };
    region.getWAL().registerWALActionsListener(listener);
    try {
      // request a roll
      admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDescriptor().getTableName(),
        region.getRegionInfo().getRegionName()));

      // wait
      try {
        latch.await();
      } catch (InterruptedException exception) {
        LOG.warn("Interrupted while waiting for the wal of '{}' to roll. If later replication "
          + "tests fail, it's probably because we should still be waiting.", region);
        Thread.currentThread().interrupt();
      }
    } finally {
      // always drop the listener, even when the roll request itself failed
      region.getWAL().unregisterWALActionsListener(listener);
    }
  }

  /**
   * Polls {@code table} until {@code row} returns at least one cell, then asserts that exactly
   * {@code count} cells were returned. Fails once the retry budget is used up.
   *
   * @param row row key to look up
   * @param count expected number of cells once the row is visible
   * @param table table to poll
   */
  private void checkWithWait(byte[] row, int count, Table table) throws Exception {
    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time while getting the row.");
      }
      Result res = table.get(get);
      if (res.size() >= 1) {
        LOG.info("Row is replicated");
        assertEquals("Table '" + table + "' did not have the expected number of results.",
          count, res.size());
        return;
      }
      Thread.sleep(SLEEP_TIME);
    }
  }

  /**
   * Asserts, without waiting, that a get of {@code row} returns exactly {@code count} cells
   * from every given table.
   *
   * @param row row key to look up
   * @param count expected number of cells
   * @param tables tables to check
   * @throws IOException if a get fails
   */
  private void checkRow(byte[] row, int count, Table... tables) throws IOException {
    Get get = new Get(row);
    for (Table table : tables) {
      Result res = table.get(get);
      assertEquals("Table '" + table + "' did not have the expected number of results.",
        count, res.size());
    }
  }

  /**
   * Deletes {@code row} from {@code source} and polls the targets until none of them returns
   * the row any more. Fails once the retry budget is used up.
   *
   * @param row row key to delete
   * @param source table the delete is issued against
   * @param targets tables that must stop returning the row
   */
  private void deleteAndWait(byte[] row, Table source, Table... targets)
      throws Exception {
    Delete del = new Delete(row);
    source.delete(del);

    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for del replication");
      }
      boolean removedFromAll = true;
      for (Table target : targets) {
        Result res = target.get(get);
        if (res.size() >= 1) {
          LOG.info("Row not deleted");
          removedFromAll = false;
          break;
        }
      }
      if (removedFromAll) {
        break;
      } else {
        Thread.sleep(SLEEP_TIME);
      }
    }
  }

  /**
   * Writes a single cell (qualifier and value both equal to the row key) into {@code source}
   * and polls the targets until every one of them returns the row with that value. Fails once
   * the retry budget is used up.
   *
   * @param row row key, also used as qualifier and value
   * @param fam column family to write to
   * @param source table the put is issued against
   * @param targets tables that must eventually return the row
   */
  private void putAndWait(byte[] row, byte[] fam, Table source, Table... targets)
      throws Exception {
    Put put = new Put(row);
    put.addColumn(fam, row, row);
    source.put(put);

    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      boolean replicatedToAll = true;
      for (Table target : targets) {
        Result res = target.get(get);
        if (res.isEmpty()) {
          LOG.info("Row not available");
          replicatedToAll = false;
          break;
        } else {
          // expected value first, so a failure message reports the right side as "actual"
          assertArrayEquals(row, res.value());
        }
      }
      if (replicatedToAll) {
        break;
      } else {
        Thread.sleep(SLEEP_TIME);
      }
    }
  }

}