/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.apache.hadoop.hbase.HBaseTestingUtil.countRows;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_RETRIES;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_ROWS_IN_BATCH;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.SLEEP_TIME;
import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Checks that the replication sync-up tool brings the peer cluster up to date with deletes and
 * puts that were applied on the source cluster while the peer was down.
 */
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationSyncUpTool.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSyncUpTool.class);

  /**
   * Runs the three phases of the scenario in order: load rows and wait for normal replication,
   * then sync up after deletes made while the peer was down, then sync up after puts made while
   * the peer was down.
   */
  @Test
  public void testSyncUpTool() throws Exception {
    // Set up replication between the source cluster and one peer.
    // Tables: t1_syncup and t2_syncup. Column families: 'cf1' is replicated, 'norep' is not.
    setupReplication();

    // At the source: t1_syncup gets 100 rows in cf1 and 1 row in norep; t2_syncup gets 200 rows
    // in cf1 and 1 row in norep. Verify the cf1 rows are replicated to the peer.
    putAndReplicateRows();

    // Verify delete works:
    // step 1: stop hbase on the peer
    // step 2: at the source, delete 50 rows from t1_syncup and 100 rows from t2_syncup;
    // 'norep' is left unchanged
    // step 3: stop hbase on the source, restart hbase on the peer
    // step 4: verify the peer still has the rows from before the delete (100 and 200)
    // step 5: run the syncup tool on the source
    // step 6: verify the deletes show up on the peer (50 and 100 rows)
    mimicSyncUpAfterDelete();

    // Verify put works:
    // step 1: stop hbase on the peer
    // step 2: at the source, put 100 rows into t1_syncup and 200 rows into t2_syncup, plus
    // another row in 'norep'. Puts to 'cf1' overwrite existing rows, so the end counts are
    // 100 and 200 respectively; the put to 'norep' adds a new row.
    // step 3: stop hbase on the source, restart hbase on the peer
    // step 4: verify the peer still has the rows from before the put (50 and 100)
    // step 5: run the syncup tool on the source
    // step 6: verify the puts show up on the peer and 'norep' does not (100 and 200 rows)
    mimicSyncUpAfterPut();
  }

  /**
   * Loads the initial data set on the source cluster and waits until the replicated column family
   * has arrived on the peer.
   */
  private void putAndReplicateRows() throws Exception {
    LOG.debug("putAndReplicateRows");

    // 100 + 1 row to t1_syncup
    putRows(ht1Source, NB_ROWS_IN_BATCH, 9999);
    // 200 + 1 row to t2_syncup
    putRows(ht2Source, NB_ROWS_IN_BATCH * 2, 9999);

    // ensure replication completed
    Thread.sleep(SLEEP_TIME);
    waitForReplication("t1_syncup has 101 rows on source, and 100 on slave1", ht1Source,
      ht1TargetAtPeer1);
    waitForReplication("t2_syncup has 201 rows on source, and 200 on slave1", ht2Source,
      ht2TargetAtPeer1);
  }

  /**
   * Deletes half of the replicated rows on the source while the peer is down, then checks that the
   * peer only sees the deletes after the sync-up tool has run.
   */
  private void mimicSyncUpAfterDelete() throws Exception {
    LOG.debug("mimicSyncUpAfterDelete");
    shutDownTargetHBaseCluster();

    // Delete half of the rows of each table. Each call gets its own fresh list so that the rows
    // deleted from t2_syncup do not depend on what Table#delete leaves behind in its input list.
    ht1Source.delete(deletesForRows(NB_ROWS_IN_BATCH / 2));
    ht2Source.delete(deletesForRows(NB_ROWS_IN_BATCH));

    int t1SourceRows = countRows(ht1Source);
    assertEquals("t1_syncup has 51 rows on source, after remove 50 of the replicated colfam", 51,
      t1SourceRows);

    int t2SourceRows = countRows(ht2Source);
    assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam", 101,
      t2SourceRows);

    shutDownSourceHBaseCluster();
    restartTargetHBaseCluster(1);

    Thread.sleep(SLEEP_TIME);

    // before sync up
    int t1TargetRows = countRows(ht1TargetAtPeer1);
    int t2TargetRows = countRows(ht2TargetAtPeer1);
    assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, t1TargetRows);
    assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, t2TargetRows);

    // after sync up
    syncUpAndVerify("SyncUpAfterDelete", 50, 100, 51, 101);
  }

  /**
   * Re-puts all replicated rows (plus one more non-replicated row) on the source while the peer is
   * down, then checks that the peer only sees the puts after the sync-up tool has run.
   */
  private void mimicSyncUpAfterPut() throws Exception {
    LOG.debug("mimicSyncUpAfterPut");
    restartSourceHBaseCluster(1);
    shutDownTargetHBaseCluster();

    // another 100 + 1 row to t1_syncup
    // we should see 100 + 2 rows now
    putRows(ht1Source, NB_ROWS_IN_BATCH, 9998);

    // another 200 + 1 row to t2_syncup
    // we should see 200 + 2 rows now
    putRows(ht2Source, NB_ROWS_IN_BATCH * 2, 9998);

    int t1SourceRows = countRows(ht1Source);
    assertEquals("t1_syncup has 102 rows on source", 102, t1SourceRows);
    int t2SourceRows = countRows(ht2Source);
    assertEquals("t2_syncup has 202 rows on source", 202, t2SourceRows);

    shutDownSourceHBaseCluster();
    restartTargetHBaseCluster(1);

    Thread.sleep(SLEEP_TIME);

    // before sync up
    int t1TargetRows = countRows(ht1TargetAtPeer1);
    int t2TargetRows = countRows(ht2TargetAtPeer1);
    assertEquals("@Peer1 t1_syncup should be NOT sync up and have 50 rows", 50, t1TargetRows);
    assertEquals("@Peer1 t2_syncup should be NOT sync up and have 100 rows", 100, t2TargetRows);

    // after sync up
    syncUpAndVerify("SyncUpAfterPut", 100, 200, 102, 202);
  }

  /**
   * Writes rows {@code row0 .. row(replicatedRows - 1)} into {@code FAMILY}, one put per row,
   * followed by a single row {@code row<noRepRowId>} into {@code NO_REP_FAMILY}.
   * @param table          table to write to
   * @param replicatedRows number of rows to write into {@code FAMILY}
   * @param noRepRowId     numeric suffix of the row key (and value) written into
   *                       {@code NO_REP_FAMILY}
   */
  private void putRows(Table table, int replicatedRows, int noRepRowId) throws Exception {
    for (int i = 0; i < replicatedRows; i++) {
      Put put = new Put(Bytes.toBytes("row" + i));
      put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
      table.put(put);
    }
    Put noRepPut = new Put(Bytes.toBytes("row" + noRepRowId));
    noRepPut.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + noRepRowId));
    table.put(noRepPut);
  }

  /**
   * Builds one whole-row {@link Delete} for each of {@code row0 .. row(rowCount - 1)}.
   * @param rowCount number of rows to delete
   * @return a new, mutable list owned by the caller
   */
  private static List<Delete> deletesForRows(int rowCount) {
    List<Delete> deletes = new ArrayList<>();
    for (int i = 0; i < rowCount; i++) {
      deletes.add(new Delete(Bytes.toBytes("row" + i)));
    }
    return deletes;
  }

  /**
   * Polls {@code target} up to {@code NB_RETRIES} times, sleeping {@code SLEEP_TIME} between
   * attempts, until it holds exactly one row fewer than {@code source} (the one row the callers
   * write into the non-replicated family). Fails with {@code failureMessage} if the counts still
   * differ on the last attempt.
   * @param failureMessage assertion message used when the last attempt still does not match
   * @param source         table on the source cluster; counted once, before polling starts
   * @param target         table on the peer cluster; counted on every attempt
   */
  private void waitForReplication(String failureMessage, Table source, Table target)
    throws Exception {
    int expectedTargetRows = countRows(source) - 1;
    for (int i = 0; i < NB_RETRIES; i++) {
      int targetRows = countRows(target);
      if (i == NB_RETRIES - 1) {
        assertEquals(failureMessage, expectedTargetRows, targetRows);
      }
      if (expectedTargetRows == targetRows) {
        break;
      }
      Thread.sleep(SLEEP_TIME);
    }
  }

  /**
   * Runs the sync-up tool up to {@code NB_RETRIES} times until both peer tables hold the expected
   * number of rows. On the last attempt, if the peer is still out of sync, the source cluster is
   * restarted and its row counts are logged for diagnosis before the assertions fail.
   * <p>
   * The table fields are read at the point of use rather than passed in, because the source
   * cluster is restarted inside this method before its tables are counted.
   * @param phase                label used in the log messages
   * @param expectedT1TargetRows expected row count of t1_syncup on the peer after sync up
   * @param expectedT2TargetRows expected row count of t2_syncup on the peer after sync up
   * @param expectedT1SourceRows row count t1_syncup should have on the source (diagnostics only)
   * @param expectedT2SourceRows row count t2_syncup should have on the source (diagnostics only)
   */
  private void syncUpAndVerify(String phase, int expectedT1TargetRows, int expectedT2TargetRows,
    int expectedT1SourceRows, int expectedT2SourceRows) throws Exception {
    for (int i = 0; i < NB_RETRIES; i++) {
      syncUp(UTIL1);
      int t1TargetRows = countRows(ht1TargetAtPeer1);
      int t2TargetRows = countRows(ht2TargetAtPeer1);
      boolean synced =
        t1TargetRows == expectedT1TargetRows && t2TargetRows == expectedT2TargetRows;

      if (i == NB_RETRIES - 1) {
        if (!synced) {
          // syncUp still failed. Let's look at the source in case anything is wrong there.
          restartSourceHBaseCluster(1);
          int t1SourceRows = countRows(ht1Source);
          LOG.debug("t1_syncup should have {} rows at source, and it is {}", expectedT1SourceRows,
            t1SourceRows);
          int t2SourceRows = countRows(ht2Source);
          LOG.debug("t2_syncup should have {} rows at source, and it is {}", expectedT2SourceRows,
            t2SourceRows);
        }
        assertEquals("@Peer1 t1_syncup should be sync up and have " + expectedT1TargetRows
          + " rows", expectedT1TargetRows, t1TargetRows);
        assertEquals("@Peer1 t2_syncup should be sync up and have " + expectedT2TargetRows
          + " rows", expectedT2TargetRows, t2TargetRows);
      }

      if (synced) {
        LOG.info("{} succeeded at retry = {}", phase, i);
        break;
      }
      LOG.debug(
        "{} failed at retry = {}, with rowCount_ht1TargetPeer1 ={} and rowCount_ht2TargetAtPeer1 ={}",
        phase, i, t1TargetRows, t2TargetRows);
      Thread.sleep(SLEEP_TIME);
    }
  }
}