001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertEquals; 021 022import java.util.ArrayList; 023import java.util.List; 024import org.apache.hadoop.hbase.HBaseClassTestRule; 025import org.apache.hadoop.hbase.HBaseTestingUtility; 026import org.apache.hadoop.hbase.HColumnDescriptor; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.hadoop.hbase.HTableDescriptor; 029import org.apache.hadoop.hbase.TableName; 030import org.apache.hadoop.hbase.client.Admin; 031import org.apache.hadoop.hbase.client.Connection; 032import org.apache.hadoop.hbase.client.ConnectionFactory; 033import org.apache.hadoop.hbase.client.Delete; 034import org.apache.hadoop.hbase.client.Put; 035import org.apache.hadoop.hbase.client.Table; 036import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; 037import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp; 038import org.apache.hadoop.hbase.testclassification.LargeTests; 039import org.apache.hadoop.hbase.testclassification.ReplicationTests; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.apache.hadoop.util.ToolRunner; 042import org.junit.After; 043import org.junit.Before; 044import org.junit.ClassRule; 045import org.junit.Test; 046import org.junit.experimental.categories.Category; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050@Category({ ReplicationTests.class, LargeTests.class }) 051public class TestReplicationSyncUpTool extends TestReplicationBase { 052 053 @ClassRule 054 public static final HBaseClassTestRule CLASS_RULE = 055 HBaseClassTestRule.forClass(TestReplicationSyncUpTool.class); 056 057 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSyncUpTool.class); 058 059 private static final TableName t1_su = TableName.valueOf("t1_syncup"); 060 private static final TableName t2_su = TableName.valueOf("t2_syncup"); 061 062 protected static final byte[] famName = Bytes.toBytes("cf1"); 063 private static final byte[] qualName = Bytes.toBytes("q1"); 064 065 protected static final byte[] noRepfamName = Bytes.toBytes("norep"); 066 067 private HTableDescriptor t1_syncupSource, t1_syncupTarget; 068 private HTableDescriptor t2_syncupSource, t2_syncupTarget; 069 070 protected Table ht1Source, ht2Source, ht1TargetAtPeer1, ht2TargetAtPeer1; 071 072 @Before 073 public void setUp() throws Exception { 074 HColumnDescriptor fam; 075 076 t1_syncupSource = new HTableDescriptor(t1_su); 077 fam = new HColumnDescriptor(famName); 078 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 079 t1_syncupSource.addFamily(fam); 080 fam = new HColumnDescriptor(noRepfamName); 081 t1_syncupSource.addFamily(fam); 082 083 t1_syncupTarget = new HTableDescriptor(t1_su); 084 fam = new HColumnDescriptor(famName); 085 t1_syncupTarget.addFamily(fam); 086 fam = new HColumnDescriptor(noRepfamName); 087 t1_syncupTarget.addFamily(fam); 088 089 t2_syncupSource = new HTableDescriptor(t2_su); 090 fam = new HColumnDescriptor(famName); 091 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 092 t2_syncupSource.addFamily(fam); 093 fam = new HColumnDescriptor(noRepfamName); 094 t2_syncupSource.addFamily(fam); 095 096 t2_syncupTarget = new HTableDescriptor(t2_su); 097 fam = new HColumnDescriptor(famName); 098 t2_syncupTarget.addFamily(fam); 099 fam = new HColumnDescriptor(noRepfamName); 100 t2_syncupTarget.addFamily(fam); 101 } 102 103 @After 104 public void tearDownBase() throws Exception { 105 // Do nothing, just replace the super tearDown. because the super tearDown will use the 106 // out-of-data HBase admin to remove replication peer, which will be result in failure. 107 } 108 109 /** 110 * Add a row to a table in each cluster, check it's replicated, delete it, 111 * check's gone Also check the puts and deletes are not replicated back to 112 * the originating cluster. 113 */ 114 @Test 115 public void testSyncUpTool() throws Exception { 116 117 /** 118 * Set up Replication: on Master and one Slave 119 * Table: t1_syncup and t2_syncup 120 * columnfamily: 121 * 'cf1' : replicated 122 * 'norep': not replicated 123 */ 124 setupReplication(); 125 126 /** 127 * at Master: 128 * t1_syncup: put 100 rows into cf1, and 1 rows into norep 129 * t2_syncup: put 200 rows into cf1, and 1 rows into norep 130 * 131 * verify correctly replicated to slave 132 */ 133 putAndReplicateRows(); 134 135 /** 136 * Verify delete works 137 * 138 * step 1: stop hbase on Slave 139 * 140 * step 2: at Master: 141 * t1_syncup: delete 50 rows from cf1 142 * t2_syncup: delete 100 rows from cf1 143 * no change on 'norep' 144 * 145 * step 3: stop hbase on master, restart hbase on Slave 146 * 147 * step 4: verify Slave still have the rows before delete 148 * t1_syncup: 100 rows from cf1 149 * t2_syncup: 200 rows from cf1 150 * 151 * step 5: run syncup tool on Master 152 * 153 * step 6: verify that delete show up on Slave 154 * t1_syncup: 50 rows from cf1 155 * t2_syncup: 100 rows from cf1 156 * 157 * verify correctly replicated to Slave 158 */ 159 mimicSyncUpAfterDelete(); 160 161 /** 162 * Verify put works 163 * 164 * step 1: stop hbase on Slave 165 * 166 * step 2: at Master: 167 * t1_syncup: put 100 rows from cf1 168 * t2_syncup: put 200 rows from cf1 169 * and put another row on 'norep' 170 * ATTN: put to 'cf1' will overwrite existing rows, so end count will 171 * be 100 and 200 respectively 172 * put to 'norep' will add a new row. 173 * 174 * step 3: stop hbase on master, restart hbase on Slave 175 * 176 * step 4: verify Slave still has the rows before put 177 * t1_syncup: 50 rows from cf1 178 * t2_syncup: 100 rows from cf1 179 * 180 * step 5: run syncup tool on Master 181 * 182 * step 6: verify that put show up on Slave 183 * and 'norep' does not 184 * t1_syncup: 100 rows from cf1 185 * t2_syncup: 200 rows from cf1 186 * 187 * verify correctly replicated to Slave 188 */ 189 mimicSyncUpAfterPut(); 190 } 191 192 protected void setupReplication() throws Exception { 193 ReplicationAdmin admin1 = new ReplicationAdmin(conf1); 194 ReplicationAdmin admin2 = new ReplicationAdmin(conf2); 195 196 Admin ha = utility1.getAdmin(); 197 ha.createTable(t1_syncupSource); 198 ha.createTable(t2_syncupSource); 199 ha.close(); 200 201 ha = utility2.getAdmin(); 202 ha.createTable(t1_syncupTarget); 203 ha.createTable(t2_syncupTarget); 204 ha.close(); 205 206 Connection connection1 = ConnectionFactory.createConnection(utility1.getConfiguration()); 207 Connection connection2 = ConnectionFactory.createConnection(utility2.getConfiguration()); 208 209 // Get HTable from Master 210 ht1Source = connection1.getTable(t1_su); 211 ht2Source = connection1.getTable(t2_su); 212 213 // Get HTable from Peer1 214 ht1TargetAtPeer1 = connection2.getTable(t1_su); 215 ht2TargetAtPeer1 = connection2.getTable(t2_su); 216 217 /** 218 * set M-S : Master: utility1 Slave1: utility2 219 */ 220 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 221 rpc.setClusterKey(utility2.getClusterKey()); 222 admin1.addPeer("1", rpc, null); 223 224 admin1.close(); 225 admin2.close(); 226 } 227 228 private void putAndReplicateRows() throws Exception { 229 LOG.debug("putAndReplicateRows"); 230 // add rows to Master cluster, 231 Put p; 232 233 // 100 + 1 row to t1_syncup 234 for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { 235 p = new Put(Bytes.toBytes("row" + i)); 236 p.addColumn(famName, qualName, Bytes.toBytes("val" + i)); 237 ht1Source.put(p); 238 } 239 p = new Put(Bytes.toBytes("row" + 9999)); 240 p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9999)); 241 ht1Source.put(p); 242 243 // 200 + 1 row to t2_syncup 244 for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) { 245 p = new Put(Bytes.toBytes("row" + i)); 246 p.addColumn(famName, qualName, Bytes.toBytes("val" + i)); 247 ht2Source.put(p); 248 } 249 p = new Put(Bytes.toBytes("row" + 9999)); 250 p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9999)); 251 ht2Source.put(p); 252 253 // ensure replication completed 254 Thread.sleep(SLEEP_TIME); 255 int rowCount_ht1Source = utility1.countRows(ht1Source); 256 for (int i = 0; i < NB_RETRIES; i++) { 257 int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1); 258 if (i==NB_RETRIES-1) { 259 assertEquals("t1_syncup has 101 rows on source, and 100 on slave1", rowCount_ht1Source - 1, 260 rowCount_ht1TargetAtPeer1); 261 } 262 if (rowCount_ht1Source - 1 == rowCount_ht1TargetAtPeer1) { 263 break; 264 } 265 Thread.sleep(SLEEP_TIME); 266 } 267 268 int rowCount_ht2Source = utility1.countRows(ht2Source); 269 for (int i = 0; i < NB_RETRIES; i++) { 270 int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1); 271 if (i==NB_RETRIES-1) { 272 assertEquals("t2_syncup has 201 rows on source, and 200 on slave1", rowCount_ht2Source - 1, 273 rowCount_ht2TargetAtPeer1); 274 } 275 if (rowCount_ht2Source - 1 == rowCount_ht2TargetAtPeer1) { 276 break; 277 } 278 Thread.sleep(SLEEP_TIME); 279 } 280 } 281 282 private void mimicSyncUpAfterDelete() throws Exception { 283 LOG.debug("mimicSyncUpAfterDelete"); 284 utility2.shutdownMiniHBaseCluster(); 285 286 List<Delete> list = new ArrayList<>(); 287 // delete half of the rows 288 for (int i = 0; i < NB_ROWS_IN_BATCH / 2; i++) { 289 String rowKey = "row" + i; 290 Delete del = new Delete(Bytes.toBytes(rowKey)); 291 list.add(del); 292 } 293 ht1Source.delete(list); 294 295 for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { 296 String rowKey = "row" + i; 297 Delete del = new Delete(Bytes.toBytes(rowKey)); 298 list.add(del); 299 } 300 ht2Source.delete(list); 301 302 int rowCount_ht1Source = utility1.countRows(ht1Source); 303 assertEquals("t1_syncup has 51 rows on source, after remove 50 of the replicated colfam", 51, 304 rowCount_ht1Source); 305 306 int rowCount_ht2Source = utility1.countRows(ht2Source); 307 assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam", 308 101, rowCount_ht2Source); 309 310 utility1.shutdownMiniHBaseCluster(); 311 utility2.restartHBaseCluster(1); 312 313 Thread.sleep(SLEEP_TIME); 314 315 // before sync up 316 int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1); 317 int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1); 318 assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1); 319 assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1); 320 321 // After sync up 322 for (int i = 0; i < NB_RETRIES; i++) { 323 syncUp(utility1); 324 rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1); 325 rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1); 326 if (i == NB_RETRIES - 1) { 327 if (rowCount_ht1TargetAtPeer1 != 50 || rowCount_ht2TargetAtPeer1 != 100) { 328 // syncUP still failed. Let's look at the source in case anything wrong there 329 utility1.restartHBaseCluster(1); 330 rowCount_ht1Source = utility1.countRows(ht1Source); 331 LOG.debug("t1_syncup should have 51 rows at source, and it is " + rowCount_ht1Source); 332 rowCount_ht2Source = utility1.countRows(ht2Source); 333 LOG.debug("t2_syncup should have 101 rows at source, and it is " + rowCount_ht2Source); 334 } 335 assertEquals("@Peer1 t1_syncup should be sync up and have 50 rows", 50, 336 rowCount_ht1TargetAtPeer1); 337 assertEquals("@Peer1 t2_syncup should be sync up and have 100 rows", 100, 338 rowCount_ht2TargetAtPeer1); 339 } 340 if (rowCount_ht1TargetAtPeer1 == 50 && rowCount_ht2TargetAtPeer1 == 100) { 341 LOG.info("SyncUpAfterDelete succeeded at retry = " + i); 342 break; 343 } else { 344 LOG.debug("SyncUpAfterDelete failed at retry = " + i + ", with rowCount_ht1TargetPeer1 =" 345 + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 =" 346 + rowCount_ht2TargetAtPeer1); 347 } 348 Thread.sleep(SLEEP_TIME); 349 } 350 } 351 352 private void mimicSyncUpAfterPut() throws Exception { 353 LOG.debug("mimicSyncUpAfterPut"); 354 utility1.restartHBaseCluster(1); 355 utility2.shutdownMiniHBaseCluster(); 356 357 Put p; 358 // another 100 + 1 row to t1_syncup 359 // we should see 100 + 2 rows now 360 for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { 361 p = new Put(Bytes.toBytes("row" + i)); 362 p.addColumn(famName, qualName, Bytes.toBytes("val" + i)); 363 ht1Source.put(p); 364 } 365 p = new Put(Bytes.toBytes("row" + 9998)); 366 p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9998)); 367 ht1Source.put(p); 368 369 // another 200 + 1 row to t1_syncup 370 // we should see 200 + 2 rows now 371 for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) { 372 p = new Put(Bytes.toBytes("row" + i)); 373 p.addColumn(famName, qualName, Bytes.toBytes("val" + i)); 374 ht2Source.put(p); 375 } 376 p = new Put(Bytes.toBytes("row" + 9998)); 377 p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9998)); 378 ht2Source.put(p); 379 380 int rowCount_ht1Source = utility1.countRows(ht1Source); 381 assertEquals("t1_syncup has 102 rows on source", 102, rowCount_ht1Source); 382 int rowCount_ht2Source = utility1.countRows(ht2Source); 383 assertEquals("t2_syncup has 202 rows on source", 202, rowCount_ht2Source); 384 385 utility1.shutdownMiniHBaseCluster(); 386 utility2.restartHBaseCluster(1); 387 388 Thread.sleep(SLEEP_TIME); 389 390 // before sync up 391 int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1); 392 int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1); 393 assertEquals("@Peer1 t1_syncup should be NOT sync up and have 50 rows", 50, 394 rowCount_ht1TargetAtPeer1); 395 assertEquals("@Peer1 t2_syncup should be NOT sync up and have 100 rows", 100, 396 rowCount_ht2TargetAtPeer1); 397 398 // after syun up 399 for (int i = 0; i < NB_RETRIES; i++) { 400 syncUp(utility1); 401 rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1); 402 rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1); 403 if (i == NB_RETRIES - 1) { 404 if (rowCount_ht1TargetAtPeer1 != 100 || rowCount_ht2TargetAtPeer1 != 200) { 405 // syncUP still failed. Let's look at the source in case anything wrong there 406 utility1.restartHBaseCluster(1); 407 rowCount_ht1Source = utility1.countRows(ht1Source); 408 LOG.debug("t1_syncup should have 102 rows at source, and it is " + rowCount_ht1Source); 409 rowCount_ht2Source = utility1.countRows(ht2Source); 410 LOG.debug("t2_syncup should have 202 rows at source, and it is " + rowCount_ht2Source); 411 } 412 assertEquals("@Peer1 t1_syncup should be sync up and have 100 rows", 100, 413 rowCount_ht1TargetAtPeer1); 414 assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200, 415 rowCount_ht2TargetAtPeer1); 416 } 417 if (rowCount_ht1TargetAtPeer1 == 100 && rowCount_ht2TargetAtPeer1 == 200) { 418 LOG.info("SyncUpAfterPut succeeded at retry = " + i); 419 break; 420 } else { 421 LOG.debug("SyncUpAfterPut failed at retry = " + i + ", with rowCount_ht1TargetPeer1 =" 422 + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 =" 423 + rowCount_ht2TargetAtPeer1); 424 } 425 Thread.sleep(SLEEP_TIME); 426 } 427 } 428 429 protected void syncUp(HBaseTestingUtility ut) throws Exception { 430 ToolRunner.run(ut.getConfiguration(), new ReplicationSyncUp(), new String[0]); 431 } 432}