/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.apache.hadoop.hbase.HBaseTestingUtil.countRows;
import static org.apache.hadoop.hbase.replication.TestReplicationBaseNoBeforeAll.NB_RETRIES;
import static org.apache.hadoop.hbase.replication.TestReplicationBaseNoBeforeAll.NB_ROWS_IN_BATCH;
import static org.apache.hadoop.hbase.replication.TestReplicationBaseNoBeforeAll.SLEEP_TIME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link ReplicationSyncUp}: edits applied on the source cluster while the peer cluster
 * is down must reach the peer once the sync-up tool is run.
 */
@Tag(ReplicationTests.TAG)
@Tag(LargeTests.TAG)
public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSyncUpTool.class);

  /**
   * Puts rows on the source and checks they are replicated. Then, twice (once with deletes, once
   * with puts), changes the source while the peer is down, stops the source, restarts the peer and
   * checks that running the sync-up tool brings the peer's row counts in line. Rows in the 'norep'
   * family are never expected on the peer.
   */
  @Test
  public void testSyncUpTool() throws Exception {
    // Set up Replication: on Master and one Slave
    // Table: t1_syncup and t2_syncup
    // columnfamily:
    // 'cf1' : replicated
    // 'norep': not replicated
    setupReplication();

    //
    // at Master:
    // t1_syncup: put 100 rows into cf1, and 1 row into norep
    // t2_syncup: put 200 rows into cf1, and 1 row into norep
    //
    // verify correctly replicated to slave
    putAndReplicateRows();

    // Verify delete works
    //
    // step 1: stop hbase on Slave
    //
    // step 2: at Master:
    // t1_syncup: delete 50 rows from cf1
    // t2_syncup: delete 100 rows from cf1
    // no change on 'norep'
    //
    // step 3: stop hbase on master, restart hbase on Slave
    //
    // step 4: verify Slave still has the rows before delete
    // t1_syncup: 100 rows from cf1
    // t2_syncup: 200 rows from cf1
    //
    // step 5: run syncup tool on Master
    //
    // step 6: verify that deletes show up on Slave
    // t1_syncup: 50 rows from cf1
    // t2_syncup: 100 rows from cf1
    //
    // verify correctly replicated to Slave
    mimicSyncUpAfterDelete();

    // Verify put works
    //
    // step 1: stop hbase on Slave
    //
    // step 2: at Master:
    // t1_syncup: put 100 rows into cf1
    // t2_syncup: put 200 rows into cf1
    // and put another row on 'norep'
    // ATTN:
    // put to 'cf1' will overwrite existing rows, so end count will be 100 and 200 respectively
    // put to 'norep' will add a new row.
    //
    // step 3: stop hbase on master, restart hbase on Slave
    //
    // step 4: verify Slave still has the rows before put
    // t1_syncup: 50 rows from cf1
    // t2_syncup: 100 rows from cf1
    //
    // step 5: run syncup tool on Master
    //
    // step 6: verify that puts show up on Slave and 'norep' does not
    // t1_syncup: 100 rows from cf1
    // t2_syncup: 200 rows from cf1
    //
    // verify correctly replicated to Slave
    mimicSyncUpAfterPut();
  }

  /**
   * Loads both source tables (replicated rows plus one 'norep' row each) and waits until the peer
   * holds every replicated row.
   */
  private void putAndReplicateRows() throws Exception {
    LOG.debug("putAndReplicateRows");
    // 100 + 1 rows to t1_syncup
    putRows(ht1Source, NB_ROWS_IN_BATCH, 9999);
    // 200 + 1 rows to t2_syncup
    putRows(ht2Source, NB_ROWS_IN_BATCH * 2, 9999);

    // ensure replication completed
    Thread.sleep(SLEEP_TIME);
    waitForReplicatedRows(ht1Source, ht1TargetAtPeer1,
      "t1_syncup has 101 rows on source, and 100 on slave1");
    waitForReplicatedRows(ht2Source, ht2TargetAtPeer1,
      "t2_syncup has 201 rows on source, and 200 on slave1");
  }

  /**
   * Deletes replicated rows while the peer is down, then runs the sync-up tool with the source down
   * and checks that: the deletes reach the peer; the tool left one record per source region server
   * plus the info file under {@link ReplicationSyncUp#INFO_DIR}; and those records are removed after
   * the source cluster restarts.
   */
  private void mimicSyncUpAfterDelete() throws Exception {
    LOG.debug("mimicSyncUpAfterDelete");
    shutDownTargetHBaseCluster();

    // delete half of the replicated rows of each table
    deleteRows(ht1Source, NB_ROWS_IN_BATCH / 2);
    deleteRows(ht2Source, NB_ROWS_IN_BATCH);

    int rowCountHt1Source = countRows(ht1Source);
    assertEquals(51, rowCountHt1Source,
      "t1_syncup has 51 rows on source, after remove 50 of the replicated colfam");

    int rowCountHt2Source = countRows(ht2Source);
    assertEquals(101, rowCountHt2Source,
      "t2_syncup has 101 rows on source, after remove 100 of the replicated colfam");

    // capture the source region servers before shutdown; the tool must record each of them
    List<ServerName> sourceRses = UTIL1.getHBaseCluster().getRegionServerThreads().stream()
      .map(rst -> rst.getRegionServer().getServerName()).collect(Collectors.toList());
    shutDownSourceHBaseCluster();
    restartTargetHBaseCluster(1);

    Thread.sleep(SLEEP_TIME);

    // before sync up
    int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
    int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
    assertEquals(100, rowCountHt1TargetAtPeer1, "@Peer1 t1_syncup should still have 100 rows");
    assertEquals(200, rowCountHt2TargetAtPeer1, "@Peer1 t2_syncup should still have 200 rows");

    syncUp(UTIL1);

    // after sync up
    rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
    rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
    assertEquals(50, rowCountHt1TargetAtPeer1,
      "@Peer1 t1_syncup should be sync up and have 50 rows");
    assertEquals(100, rowCountHt2TargetAtPeer1,
      "@Peer1 t2_syncup should be sync up and have 100 rows");

    // check we have recorded the dead region servers and also have an info file
    Path rootDir = CommonFSUtils.getRootDir(UTIL1.getConfiguration());
    Path syncUpInfoDir = new Path(rootDir, ReplicationSyncUp.INFO_DIR);
    FileSystem fs = UTIL1.getTestFileSystem();
    for (ServerName sn : sourceRses) {
      assertTrue(fs.exists(new Path(syncUpInfoDir, sn.getServerName())));
    }
    assertTrue(fs.exists(new Path(syncUpInfoDir, ReplicationSyncUp.INFO_FILE)));
    assertEquals(sourceRses.size() + 1, fs.listStatus(syncUpInfoDir).length);

    restartSourceHBaseCluster(1);
    // all the records should eventually be removed after the source cluster restarts
    UTIL1.waitFor(60000, () -> fs.listStatus(syncUpInfoDir).length == 0);
  }

  /**
   * Re-puts the replicated rows and adds a second 'norep' row while the peer is down, then runs the
   * sync-up tool with the source down and checks the peer regains every replicated row.
   */
  private void mimicSyncUpAfterPut() throws Exception {
    LOG.debug("mimicSyncUpAfterPut");
    shutDownTargetHBaseCluster();

    // another 100 + 1 rows to t1_syncup; the cf1 puts overwrite existing rows,
    // so we should see 100 + 2 rows now
    putRows(ht1Source, NB_ROWS_IN_BATCH, 9998);
    // another 200 + 1 rows to t2_syncup; we should see 200 + 2 rows now
    putRows(ht2Source, NB_ROWS_IN_BATCH * 2, 9998);

    int rowCountHt1Source = countRows(ht1Source);
    assertEquals(102, rowCountHt1Source, "t1_syncup has 102 rows on source");
    int rowCountHt2Source = countRows(ht2Source);
    assertEquals(202, rowCountHt2Source, "t2_syncup has 202 rows on source");

    shutDownSourceHBaseCluster();
    restartTargetHBaseCluster(1);

    Thread.sleep(SLEEP_TIME);

    // before sync up
    int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
    int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
    assertEquals(50, rowCountHt1TargetAtPeer1,
      "@Peer1 t1_syncup should be NOT sync up and have 50 rows");
    assertEquals(100, rowCountHt2TargetAtPeer1,
      "@Peer1 t2_syncup should be NOT sync up and have 100 rows");

    syncUp(UTIL1);

    // after sync up
    rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
    rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
    assertEquals(100, rowCountHt1TargetAtPeer1,
      "@Peer1 t1_syncup should be sync up and have 100 rows");
    assertEquals(200, rowCountHt2TargetAtPeer1,
      "@Peer1 t2_syncup should be sync up and have 200 rows");
  }

  /**
   * Puts rows {@code row0 .. row<replicatedRows - 1>} (value {@code val<i>}) into the replicated
   * family, then one row {@code row<noRepRowId>} into the non-replicated family, one {@link Put} per
   * call to {@link Table#put(Put)}.
   */
  private void putRows(Table table, int replicatedRows, int noRepRowId) throws Exception {
    for (int i = 0; i < replicatedRows; i++) {
      Put put = new Put(Bytes.toBytes("row" + i));
      put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
      table.put(put);
    }
    Put noRepPut = new Put(Bytes.toBytes("row" + noRepRowId));
    noRepPut.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + noRepRowId));
    table.put(noRepPut);
  }

  /** Deletes rows {@code row0 .. row<rowCount - 1>} from {@code table} in a single batch call. */
  private void deleteRows(Table table, int rowCount) throws Exception {
    // A fresh list per table: Table#delete(List) may modify its argument, so reusing one list
    // across tables would make the second batch depend on that side effect.
    List<Delete> deletes = new ArrayList<>(rowCount);
    for (int i = 0; i < rowCount; i++) {
      deletes.add(new Delete(Bytes.toBytes("row" + i)));
    }
    table.delete(deletes);
  }

  /**
   * Polls {@code target} until it holds exactly one row fewer than {@code source} (the single
   * 'norep' row is not replicated), sleeping {@code SLEEP_TIME} between attempts. On the last of
   * {@code NB_RETRIES} attempts the counts are asserted equal, failing with {@code message}.
   */
  private void waitForReplicatedRows(Table source, Table target, String message)
    throws Exception {
    int expected = countRows(source) - 1;
    for (int i = 0; i < NB_RETRIES; i++) {
      int actual = countRows(target);
      if (i == NB_RETRIES - 1) {
        assertEquals(expected, actual, message);
      }
      if (actual == expected) {
        return;
      }
      Thread.sleep(SLEEP_TIME);
    }
  }

  /**
   * test "start a new ReplicationSyncUp after the previous failed". See HBASE-27623 for details.
   */
  @Test
  public void testStartANewSyncUpToolAfterFailed() throws Exception {
    // Start syncUpTool for the first time with non-force mode,
    // let's assume that this will fail in sync data,
    // this does not affect our test results
    syncUp(UTIL1);
    Path rootDir = CommonFSUtils.getRootDir(UTIL1.getConfiguration());
    Path syncUpInfoDir = new Path(rootDir, ReplicationSyncUp.INFO_DIR);
    Path replicationInfoPath = new Path(syncUpInfoDir, ReplicationSyncUp.INFO_FILE);
    FileSystem fs = UTIL1.getTestFileSystem();
    assertTrue(fs.exists(replicationInfoPath));
    FileStatus fileStatus1 = fs.getFileStatus(replicationInfoPath);

    // Start syncUpTool for the second time with non-force mode. Startup must fail because the
    // replication info file already exists; assertThrows also fails the test if nothing is thrown.
    assertThrows(FileAlreadyExistsException.class, () -> syncUp(UTIL1),
      "e should be a FileAlreadyExistsException");
    FileStatus fileStatus2 = fs.getFileStatus(replicationInfoPath);
    assertEquals(fileStatus1.getModificationTime(), fileStatus2.getModificationTime());

    // Start syncUpTool for the third time with force mode,
    // startup will succeed and create a new replication info file
    syncUp(UTIL1, new String[] { "-f" });
    FileStatus fileStatus3 = fs.getFileStatus(replicationInfoPath);
    assertTrue(fileStatus3.getModificationTime() > fileStatus2.getModificationTime());
  }
}