001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertEquals; 021 022import java.util.ArrayList; 023import java.util.List; 024import org.apache.hadoop.hbase.HBaseClassTestRule; 025import org.apache.hadoop.hbase.HBaseTestingUtility; 026import org.apache.hadoop.hbase.HColumnDescriptor; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.hadoop.hbase.HTableDescriptor; 029import org.apache.hadoop.hbase.TableName; 030import org.apache.hadoop.hbase.client.Admin; 031import org.apache.hadoop.hbase.client.Connection; 032import org.apache.hadoop.hbase.client.ConnectionFactory; 033import org.apache.hadoop.hbase.client.Delete; 034import org.apache.hadoop.hbase.client.Put; 035import org.apache.hadoop.hbase.client.Table; 036import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; 037import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp; 038import org.apache.hadoop.hbase.testclassification.LargeTests; 039import org.apache.hadoop.hbase.testclassification.ReplicationTests; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.apache.hadoop.util.ToolRunner; 042import org.junit.Before; 043import org.junit.ClassRule; 044import org.junit.Test; 045import org.junit.experimental.categories.Category; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049@Category({ReplicationTests.class, LargeTests.class}) 050public class TestReplicationSyncUpTool extends TestReplicationBase { 051 052 @ClassRule 053 public static final HBaseClassTestRule CLASS_RULE = 054 HBaseClassTestRule.forClass(TestReplicationSyncUpTool.class); 055 056 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSyncUpTool.class); 057 058 private static final TableName t1_su = TableName.valueOf("t1_syncup"); 059 private static final TableName t2_su = TableName.valueOf("t2_syncup"); 060 061 protected static final byte[] famName = Bytes.toBytes("cf1"); 062 private static final byte[] qualName = Bytes.toBytes("q1"); 063 064 protected static final byte[] noRepfamName = Bytes.toBytes("norep"); 065 066 private HTableDescriptor t1_syncupSource, t1_syncupTarget; 067 private HTableDescriptor t2_syncupSource, t2_syncupTarget; 068 069 protected Table ht1Source, ht2Source, ht1TargetAtPeer1, ht2TargetAtPeer1; 070 071 @Before 072 public void setUp() throws Exception { 073 074 HColumnDescriptor fam; 075 076 t1_syncupSource = new HTableDescriptor(t1_su); 077 fam = new HColumnDescriptor(famName); 078 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 079 t1_syncupSource.addFamily(fam); 080 fam = new HColumnDescriptor(noRepfamName); 081 t1_syncupSource.addFamily(fam); 082 083 t1_syncupTarget = new HTableDescriptor(t1_su); 084 fam = new HColumnDescriptor(famName); 085 t1_syncupTarget.addFamily(fam); 086 fam = new HColumnDescriptor(noRepfamName); 087 t1_syncupTarget.addFamily(fam); 088 089 t2_syncupSource = new HTableDescriptor(t2_su); 090 fam = new HColumnDescriptor(famName); 091 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 092 t2_syncupSource.addFamily(fam); 093 fam = new HColumnDescriptor(noRepfamName); 094 t2_syncupSource.addFamily(fam); 095 096 t2_syncupTarget = new HTableDescriptor(t2_su); 097 fam = new HColumnDescriptor(famName); 098 t2_syncupTarget.addFamily(fam); 099 fam = new HColumnDescriptor(noRepfamName); 100 t2_syncupTarget.addFamily(fam); 101 102 } 103 104 /** 105 * Add a row to a table in each cluster, check it's replicated, delete it, 106 * check's gone Also check the puts and deletes are not replicated back to 107 * the originating cluster. 108 */ 109 @Test 110 public void testSyncUpTool() throws Exception { 111 112 /** 113 * Set up Replication: on Master and one Slave 114 * Table: t1_syncup and t2_syncup 115 * columnfamily: 116 * 'cf1' : replicated 117 * 'norep': not replicated 118 */ 119 setupReplication(); 120 121 /** 122 * at Master: 123 * t1_syncup: put 100 rows into cf1, and 1 rows into norep 124 * t2_syncup: put 200 rows into cf1, and 1 rows into norep 125 * 126 * verify correctly replicated to slave 127 */ 128 putAndReplicateRows(); 129 130 /** 131 * Verify delete works 132 * 133 * step 1: stop hbase on Slave 134 * 135 * step 2: at Master: 136 * t1_syncup: delete 50 rows from cf1 137 * t2_syncup: delete 100 rows from cf1 138 * no change on 'norep' 139 * 140 * step 3: stop hbase on master, restart hbase on Slave 141 * 142 * step 4: verify Slave still have the rows before delete 143 * t1_syncup: 100 rows from cf1 144 * t2_syncup: 200 rows from cf1 145 * 146 * step 5: run syncup tool on Master 147 * 148 * step 6: verify that delete show up on Slave 149 * t1_syncup: 50 rows from cf1 150 * t2_syncup: 100 rows from cf1 151 * 152 * verify correctly replicated to Slave 153 */ 154 mimicSyncUpAfterDelete(); 155 156 /** 157 * Verify put works 158 * 159 * step 1: stop hbase on Slave 160 * 161 * step 2: at Master: 162 * t1_syncup: put 100 rows from cf1 163 * t2_syncup: put 200 rows from cf1 164 * and put another row on 'norep' 165 * ATTN: put to 'cf1' will overwrite existing rows, so end count will 166 * be 100 and 200 respectively 167 * put to 'norep' will add a new row. 168 * 169 * step 3: stop hbase on master, restart hbase on Slave 170 * 171 * step 4: verify Slave still has the rows before put 172 * t1_syncup: 50 rows from cf1 173 * t2_syncup: 100 rows from cf1 174 * 175 * step 5: run syncup tool on Master 176 * 177 * step 6: verify that put show up on Slave 178 * and 'norep' does not 179 * t1_syncup: 100 rows from cf1 180 * t2_syncup: 200 rows from cf1 181 * 182 * verify correctly replicated to Slave 183 */ 184 mimicSyncUpAfterPut(); 185 186 } 187 188 protected void setupReplication() throws Exception { 189 ReplicationAdmin admin1 = new ReplicationAdmin(conf1); 190 ReplicationAdmin admin2 = new ReplicationAdmin(conf2); 191 192 Admin ha = utility1.getAdmin(); 193 ha.createTable(t1_syncupSource); 194 ha.createTable(t2_syncupSource); 195 ha.close(); 196 197 ha = utility2.getAdmin(); 198 ha.createTable(t1_syncupTarget); 199 ha.createTable(t2_syncupTarget); 200 ha.close(); 201 202 Connection connection1 = ConnectionFactory.createConnection(utility1.getConfiguration()); 203 Connection connection2 = ConnectionFactory.createConnection(utility2.getConfiguration()); 204 205 // Get HTable from Master 206 ht1Source = connection1.getTable(t1_su); 207 ht2Source = connection1.getTable(t2_su); 208 209 // Get HTable from Peer1 210 ht1TargetAtPeer1 = connection2.getTable(t1_su); 211 ht2TargetAtPeer1 = connection2.getTable(t2_su); 212 213 /** 214 * set M-S : Master: utility1 Slave1: utility2 215 */ 216 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 217 rpc.setClusterKey(utility2.getClusterKey()); 218 admin1.addPeer("1", rpc, null); 219 220 admin1.close(); 221 admin2.close(); 222 } 223 224 private void putAndReplicateRows() throws Exception { 225 LOG.debug("putAndReplicateRows"); 226 // add rows to Master cluster, 227 Put p; 228 229 // 100 + 1 row to t1_syncup 230 for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { 231 p = new Put(Bytes.toBytes("row" + i)); 232 p.addColumn(famName, qualName, Bytes.toBytes("val" + i)); 233 ht1Source.put(p); 234 } 235 p = new Put(Bytes.toBytes("row" + 9999)); 236 p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9999)); 237 ht1Source.put(p); 238 239 // 200 + 1 row to t2_syncup 240 for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) { 241 p = new Put(Bytes.toBytes("row" + i)); 242 p.addColumn(famName, qualName, Bytes.toBytes("val" + i)); 243 ht2Source.put(p); 244 } 245 p = new Put(Bytes.toBytes("row" + 9999)); 246 p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9999)); 247 ht2Source.put(p); 248 249 // ensure replication completed 250 Thread.sleep(SLEEP_TIME); 251 int rowCount_ht1Source = utility1.countRows(ht1Source); 252 for (int i = 0; i < NB_RETRIES; i++) { 253 int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1); 254 if (i==NB_RETRIES-1) { 255 assertEquals("t1_syncup has 101 rows on source, and 100 on slave1", rowCount_ht1Source - 1, 256 rowCount_ht1TargetAtPeer1); 257 } 258 if (rowCount_ht1Source - 1 == rowCount_ht1TargetAtPeer1) { 259 break; 260 } 261 Thread.sleep(SLEEP_TIME); 262 } 263 264 int rowCount_ht2Source = utility1.countRows(ht2Source); 265 for (int i = 0; i < NB_RETRIES; i++) { 266 int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1); 267 if (i==NB_RETRIES-1) { 268 assertEquals("t2_syncup has 201 rows on source, and 200 on slave1", rowCount_ht2Source - 1, 269 rowCount_ht2TargetAtPeer1); 270 } 271 if (rowCount_ht2Source - 1 == rowCount_ht2TargetAtPeer1) { 272 break; 273 } 274 Thread.sleep(SLEEP_TIME); 275 } 276 } 277 278 private void mimicSyncUpAfterDelete() throws Exception { 279 LOG.debug("mimicSyncUpAfterDelete"); 280 utility2.shutdownMiniHBaseCluster(); 281 282 List<Delete> list = new ArrayList<>(); 283 // delete half of the rows 284 for (int i = 0; i < NB_ROWS_IN_BATCH / 2; i++) { 285 String rowKey = "row" + i; 286 Delete del = new Delete(Bytes.toBytes(rowKey)); 287 list.add(del); 288 } 289 ht1Source.delete(list); 290 291 for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { 292 String rowKey = "row" + i; 293 Delete del = new Delete(Bytes.toBytes(rowKey)); 294 list.add(del); 295 } 296 ht2Source.delete(list); 297 298 int rowCount_ht1Source = utility1.countRows(ht1Source); 299 assertEquals("t1_syncup has 51 rows on source, after remove 50 of the replicated colfam", 51, 300 rowCount_ht1Source); 301 302 int rowCount_ht2Source = utility1.countRows(ht2Source); 303 assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam", 304 101, rowCount_ht2Source); 305 306 utility1.shutdownMiniHBaseCluster(); 307 utility2.restartHBaseCluster(1); 308 309 Thread.sleep(SLEEP_TIME); 310 311 // before sync up 312 int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1); 313 int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1); 314 assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1); 315 assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1); 316 317 // After sync up 318 for (int i = 0; i < NB_RETRIES; i++) { 319 syncUp(utility1); 320 rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1); 321 rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1); 322 if (i == NB_RETRIES - 1) { 323 if (rowCount_ht1TargetAtPeer1 != 50 || rowCount_ht2TargetAtPeer1 != 100) { 324 // syncUP still failed. Let's look at the source in case anything wrong there 325 utility1.restartHBaseCluster(1); 326 rowCount_ht1Source = utility1.countRows(ht1Source); 327 LOG.debug("t1_syncup should have 51 rows at source, and it is " + rowCount_ht1Source); 328 rowCount_ht2Source = utility1.countRows(ht2Source); 329 LOG.debug("t2_syncup should have 101 rows at source, and it is " + rowCount_ht2Source); 330 } 331 assertEquals("@Peer1 t1_syncup should be sync up and have 50 rows", 50, 332 rowCount_ht1TargetAtPeer1); 333 assertEquals("@Peer1 t2_syncup should be sync up and have 100 rows", 100, 334 rowCount_ht2TargetAtPeer1); 335 } 336 if (rowCount_ht1TargetAtPeer1 == 50 && rowCount_ht2TargetAtPeer1 == 100) { 337 LOG.info("SyncUpAfterDelete succeeded at retry = " + i); 338 break; 339 } else { 340 LOG.debug("SyncUpAfterDelete failed at retry = " + i + ", with rowCount_ht1TargetPeer1 =" 341 + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 =" 342 + rowCount_ht2TargetAtPeer1); 343 } 344 Thread.sleep(SLEEP_TIME); 345 } 346 } 347 348 private void mimicSyncUpAfterPut() throws Exception { 349 LOG.debug("mimicSyncUpAfterPut"); 350 utility1.restartHBaseCluster(1); 351 utility2.shutdownMiniHBaseCluster(); 352 353 Put p; 354 // another 100 + 1 row to t1_syncup 355 // we should see 100 + 2 rows now 356 for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { 357 p = new Put(Bytes.toBytes("row" + i)); 358 p.addColumn(famName, qualName, Bytes.toBytes("val" + i)); 359 ht1Source.put(p); 360 } 361 p = new Put(Bytes.toBytes("row" + 9998)); 362 p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9998)); 363 ht1Source.put(p); 364 365 // another 200 + 1 row to t1_syncup 366 // we should see 200 + 2 rows now 367 for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) { 368 p = new Put(Bytes.toBytes("row" + i)); 369 p.addColumn(famName, qualName, Bytes.toBytes("val" + i)); 370 ht2Source.put(p); 371 } 372 p = new Put(Bytes.toBytes("row" + 9998)); 373 p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9998)); 374 ht2Source.put(p); 375 376 int rowCount_ht1Source = utility1.countRows(ht1Source); 377 assertEquals("t1_syncup has 102 rows on source", 102, rowCount_ht1Source); 378 int rowCount_ht2Source = utility1.countRows(ht2Source); 379 assertEquals("t2_syncup has 202 rows on source", 202, rowCount_ht2Source); 380 381 utility1.shutdownMiniHBaseCluster(); 382 utility2.restartHBaseCluster(1); 383 384 Thread.sleep(SLEEP_TIME); 385 386 // before sync up 387 int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1); 388 int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1); 389 assertEquals("@Peer1 t1_syncup should be NOT sync up and have 50 rows", 50, 390 rowCount_ht1TargetAtPeer1); 391 assertEquals("@Peer1 t2_syncup should be NOT sync up and have 100 rows", 100, 392 rowCount_ht2TargetAtPeer1); 393 394 // after syun up 395 for (int i = 0; i < NB_RETRIES; i++) { 396 syncUp(utility1); 397 rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1); 398 rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1); 399 if (i == NB_RETRIES - 1) { 400 if (rowCount_ht1TargetAtPeer1 != 100 || rowCount_ht2TargetAtPeer1 != 200) { 401 // syncUP still failed. Let's look at the source in case anything wrong there 402 utility1.restartHBaseCluster(1); 403 rowCount_ht1Source = utility1.countRows(ht1Source); 404 LOG.debug("t1_syncup should have 102 rows at source, and it is " + rowCount_ht1Source); 405 rowCount_ht2Source = utility1.countRows(ht2Source); 406 LOG.debug("t2_syncup should have 202 rows at source, and it is " + rowCount_ht2Source); 407 } 408 assertEquals("@Peer1 t1_syncup should be sync up and have 100 rows", 100, 409 rowCount_ht1TargetAtPeer1); 410 assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200, 411 rowCount_ht2TargetAtPeer1); 412 } 413 if (rowCount_ht1TargetAtPeer1 == 100 && rowCount_ht2TargetAtPeer1 == 200) { 414 LOG.info("SyncUpAfterPut succeeded at retry = " + i); 415 break; 416 } else { 417 LOG.debug("SyncUpAfterPut failed at retry = " + i + ", with rowCount_ht1TargetPeer1 =" 418 + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 =" 419 + rowCount_ht2TargetAtPeer1); 420 } 421 Thread.sleep(SLEEP_TIME); 422 } 423 } 424 425 protected void syncUp(HBaseTestingUtility ut) throws Exception { 426 ToolRunner.run(ut.getConfiguration(), new ReplicationSyncUp(), new String[0]); 427 } 428}