001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.jupiter.api.Assertions.assertArrayEquals; 021import static org.junit.jupiter.api.Assertions.assertEquals; 022import static org.junit.jupiter.api.Assertions.assertNotNull; 023import static org.junit.jupiter.api.Assertions.fail; 024 025import java.io.IOException; 026import java.util.Arrays; 027import java.util.concurrent.CountDownLatch; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.HBaseConfiguration; 031import org.apache.hadoop.hbase.HBaseTestingUtil; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.client.Admin; 036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 037import org.apache.hadoop.hbase.client.Connection; 038import org.apache.hadoop.hbase.client.ConnectionFactory; 039import org.apache.hadoop.hbase.client.Delete; 040import org.apache.hadoop.hbase.client.Get; 041import org.apache.hadoop.hbase.client.Put; 042import org.apache.hadoop.hbase.client.Result; 043import org.apache.hadoop.hbase.client.Table; 044import org.apache.hadoop.hbase.client.TableDescriptor; 045import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 046import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 047import org.apache.hadoop.hbase.regionserver.HRegion; 048import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 049import org.apache.hadoop.hbase.testclassification.LargeTests; 050import org.apache.hadoop.hbase.testclassification.ReplicationTests; 051import org.apache.hadoop.hbase.util.Bytes; 052import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; 053import org.junit.jupiter.api.BeforeAll; 054import org.junit.jupiter.api.Tag; 055import org.junit.jupiter.api.Test; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058 059@Tag(ReplicationTests.TAG) 060@Tag(LargeTests.TAG) 061public class TestMultiSlaveReplication { 062 063 private static final Logger LOG = LoggerFactory.getLogger(TestMultiSlaveReplication.class); 064 065 private static Configuration conf1; 066 private static Configuration conf2; 067 private static Configuration conf3; 068 069 private static HBaseTestingUtil utility1; 070 private static HBaseTestingUtil utility2; 071 private static HBaseTestingUtil utility3; 072 private static final long SLEEP_TIME = 500; 073 private static final int NB_RETRIES = 100; 074 075 private static final TableName tableName = TableName.valueOf("test"); 076 private static final byte[] famName = Bytes.toBytes("f"); 077 private static final byte[] row = Bytes.toBytes("row"); 078 private static final byte[] row1 = Bytes.toBytes("row1"); 079 private static final byte[] row2 = Bytes.toBytes("row2"); 080 private static final byte[] row3 = Bytes.toBytes("row3"); 081 private static final byte[] noRepfamName = Bytes.toBytes("norep"); 082 083 private static TableDescriptor table; 084 085 @BeforeAll 086 public static void setUpBeforeClass() throws Exception { 087 conf1 = HBaseConfiguration.create(); 088 conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); 089 // smaller block size and capacity to trigger more operations 090 // and test them 091 conf1.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20); 092 conf1.setInt("replication.source.size.capacity", 1024); 093 conf1.setLong("replication.source.sleepforretries", 100); 094 conf1.setInt("hbase.regionserver.maxlogs", 10); 095 conf1.setLong("hbase.master.logcleaner.ttl", 10); 096 conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); 097 conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, 098 "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter"); 099 conf1.setInt("hbase.master.cleaner.interval", 5 * 1000); 100 101 utility1 = new HBaseTestingUtil(conf1); 102 utility1.startMiniZKCluster(); 103 MiniZooKeeperCluster miniZK = utility1.getZkCluster(); 104 utility1.setZkCluster(miniZK); 105 106 conf2 = new Configuration(conf1); 107 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); 108 109 conf3 = new Configuration(conf1); 110 conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3"); 111 112 utility2 = new HBaseTestingUtil(conf2); 113 utility2.setZkCluster(miniZK); 114 115 utility3 = new HBaseTestingUtil(conf3); 116 utility3.setZkCluster(miniZK); 117 118 table = TableDescriptorBuilder.newBuilder(tableName) 119 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName) 120 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 121 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); 122 } 123 124 @Test 125 public void testMultiSlaveReplication() throws Exception { 126 LOG.info("testCyclicReplication"); 127 utility1.startMiniCluster(); 128 utility2.startMiniCluster(); 129 utility3.startMiniCluster(); 130 try (Connection conn = ConnectionFactory.createConnection(conf1); 131 Admin admin1 = conn.getAdmin()) { 132 utility1.getAdmin().createTable(table); 133 utility2.getAdmin().createTable(table); 134 utility3.getAdmin().createTable(table); 135 Table htable1 = utility1.getConnection().getTable(tableName); 136 Table htable2 = utility2.getConnection().getTable(tableName); 137 Table htable3 = utility3.getConnection().getTable(tableName); 138 139 ReplicationPeerConfigBuilder rpcBuilder = 140 ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getRpcConnnectionURI()); 141 admin1.addReplicationPeer("1", rpcBuilder.build()); 142 143 // put "row" and wait 'til it got around, then delete 144 putAndWait(row, famName, htable1, htable2); 145 deleteAndWait(row, htable1, htable2); 146 // check it wasn't replication to cluster 3 147 checkRow(row, 0, htable3); 148 149 putAndWait(row2, famName, htable1, htable2); 150 151 // now roll the region server's logs 152 rollWALAndWait(utility1, htable1.getName(), row2); 153 154 // after the log was rolled put a new row 155 putAndWait(row3, famName, htable1, htable2); 156 157 rpcBuilder.setClusterKey(utility3.getRpcConnnectionURI()); 158 admin1.addReplicationPeer("2", rpcBuilder.build()); 159 160 // put a row, check it was replicated to all clusters 161 putAndWait(row1, famName, htable1, htable2, htable3); 162 // delete and verify 163 deleteAndWait(row1, htable1, htable2, htable3); 164 165 // make sure row2 did not get replicated after 166 // cluster 3 was added 167 checkRow(row2, 0, htable3); 168 169 // row3 will get replicated, because it was in the 170 // latest log 171 checkRow(row3, 1, htable3); 172 173 Put p = new Put(row); 174 p.addColumn(famName, row, row); 175 htable1.put(p); 176 // now roll the logs again 177 rollWALAndWait(utility1, htable1.getName(), row); 178 179 // cleanup "row2", also conveniently use this to wait replication 180 // to finish 181 deleteAndWait(row2, htable1, htable2, htable3); 182 // Even if the log was rolled in the middle of the replication 183 // "row" is still replication. 184 checkRow(row, 1, htable2); 185 // Replication thread of cluster 2 may be sleeping, and since row2 is not there in it, 186 // we should wait before checking. 187 checkWithWait(row, 1, htable3); 188 189 // cleanup the rest 190 deleteAndWait(row, htable1, htable2, htable3); 191 deleteAndWait(row3, htable1, htable2, htable3); 192 193 utility3.shutdownMiniCluster(); 194 utility2.shutdownMiniCluster(); 195 utility1.shutdownMiniCluster(); 196 } 197 } 198 199 private void rollWALAndWait(final HBaseTestingUtil utility, final TableName table, 200 final byte[] row) throws IOException { 201 final Admin admin = utility.getAdmin(); 202 final SingleProcessHBaseCluster cluster = utility.getMiniHBaseCluster(); 203 204 // find the region that corresponds to the given row. 205 HRegion region = null; 206 for (HRegion candidate : cluster.getRegions(table)) { 207 if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) { 208 region = candidate; 209 break; 210 } 211 } 212 assertNotNull(region, "Couldn't find the region for row '" + Arrays.toString(row) + "'"); 213 214 final CountDownLatch latch = new CountDownLatch(1); 215 216 // listen for successful log rolls 217 final WALActionsListener listener = new WALActionsListener() { 218 @Override 219 public void postLogRoll(final Path oldPath, final Path newPath) throws IOException { 220 latch.countDown(); 221 } 222 }; 223 region.getWAL().registerWALActionsListener(listener); 224 225 // request a roll 226 admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDescriptor().getTableName(), 227 region.getRegionInfo().getRegionName())); 228 229 // wait 230 try { 231 latch.await(); 232 } catch (InterruptedException exception) { 233 LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later " 234 + "replication tests fail, it's probably because we should still be waiting."); 235 Thread.currentThread().interrupt(); 236 } 237 region.getWAL().unregisterWALActionsListener(listener); 238 } 239 240 private void checkWithWait(byte[] row, int count, Table table) throws Exception { 241 Get get = new Get(row); 242 for (int i = 0; i < NB_RETRIES; i++) { 243 if (i == NB_RETRIES - 1) { 244 fail("Waited too much time while getting the row."); 245 } 246 boolean rowReplicated = false; 247 Result res = table.get(get); 248 if (res.size() >= 1) { 249 LOG.info("Row is replicated"); 250 rowReplicated = true; 251 assertEquals(count, res.size(), 252 "Table '" + table + "' did not have the expected number of results."); 253 break; 254 } 255 if (rowReplicated) { 256 break; 257 } else { 258 Thread.sleep(SLEEP_TIME); 259 } 260 } 261 } 262 263 private void checkRow(byte[] row, int count, Table... tables) throws IOException { 264 Get get = new Get(row); 265 for (Table table : tables) { 266 Result res = table.get(get); 267 assertEquals(count, res.size(), 268 "Table '" + table + "' did not have the expected number of results."); 269 } 270 } 271 272 private void deleteAndWait(byte[] row, Table source, Table... targets) throws Exception { 273 Delete del = new Delete(row); 274 source.delete(del); 275 276 Get get = new Get(row); 277 for (int i = 0; i < NB_RETRIES; i++) { 278 if (i == NB_RETRIES - 1) { 279 fail("Waited too much time for del replication"); 280 } 281 boolean removedFromAll = true; 282 for (Table target : targets) { 283 Result res = target.get(get); 284 if (res.size() >= 1) { 285 LOG.info("Row not deleted"); 286 removedFromAll = false; 287 break; 288 } 289 } 290 if (removedFromAll) { 291 break; 292 } else { 293 Thread.sleep(SLEEP_TIME); 294 } 295 } 296 } 297 298 private void putAndWait(byte[] row, byte[] fam, Table source, Table... targets) throws Exception { 299 Put put = new Put(row); 300 put.addColumn(fam, row, row); 301 source.put(put); 302 303 Get get = new Get(row); 304 for (int i = 0; i < NB_RETRIES; i++) { 305 if (i == NB_RETRIES - 1) { 306 fail("Waited too much time for put replication"); 307 } 308 boolean replicatedToAll = true; 309 for (Table target : targets) { 310 Result res = target.get(get); 311 if (res.isEmpty()) { 312 LOG.info("Row not available"); 313 replicatedToAll = false; 314 break; 315 } else { 316 assertArrayEquals(res.value(), row); 317 } 318 } 319 if (replicatedToAll) { 320 break; 321 } else { 322 Thread.sleep(SLEEP_TIME); 323 } 324 } 325 } 326 327}