/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.mapreduce.Job;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ ReplicationTests.class, LargeTests.class })
public class TestVerifyReplication extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestVerifyReplication.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestVerifyReplication.class);
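
  // Id of the replication peer that the VerifyReplication jobs run against, plus a peer-side
  // table (backed by htable3) used by the --peerTableName tests. That table uses the
  // non-replicated family, so data only reaches it when copied explicitly (see runBatchCopyTest).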
  private static final String PEER_ID = "2";
  private static final TableName peerTableName = TableName.valueOf("peerTest");
  private static Table htable3;

  @Rule
  public TestName name = new TestName();

  @Before
  public void setUp() throws Exception {
    cleanUp();
    UTIL2.deleteTableData(peerTableName);
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TestReplicationBase.setUpBeforeClass();

    TableDescriptor peerTable = TableDescriptorBuilder.newBuilder(peerTableName)
      .setColumnFamily(
        ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).setMaxVersions(100).build())
      .build();

    // Keep this connection open: htable3 is backed by it for the lifetime of the class.
    Connection connection2 = ConnectionFactory.createConnection(CONF2);
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(peerTable, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
    }
    htable3 = connection2.getTable(peerTableName);
  }

  /**
   * Runs the VerifyReplication job with the given arguments and asserts that the GOODROWS and
   * BADROWS counters match the expected values.
   */
  static void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
    throws IOException, InterruptedException, ClassNotFoundException {
    Job job = new VerifyReplication().createSubmittableJob(new Configuration(CONF1), args);
    if (job == null) {
      fail("Job wasn't created, see the log");
    }
    if (!job.waitForCompletion(true)) {
      fail("Job failed, see the log");
    }
    assertEquals(expectedGoodRows,
      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
    assertEquals(expectedBadRows,
      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
  }

  /**
   * Load a small batch of rows into a table, make sure the data is identical on both clusters,
   * then run the VerifyReplication job to check the results. Do a second comparison where all
   * the cells are different.
   */
  @Test
  public void testVerifyRepJob() throws Exception {
    // Populate the tables; runSmallBatchTest also verifies that the tables are identical
    runSmallBatchTest();

    String[] args = new String[] { PEER_ID, tableName.getNameAsString() };
    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
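
    // Both clusters now hold identical data. Overwrite the first cell of every row on the peer
    // and delete one row entirely, so the second verification run counts every row as bad.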
    Scan scan = new Scan();
    ResultScanner rs = htable2.getScanner(scan);
    Put put = null;
    for (Result result : rs) {
      put = new Put(result.getRow());
      Cell firstVal = result.rawCells()[0];
      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
        Bytes.toBytes("diff data"));
      htable2.put(put);
    }
    Delete delete = new Delete(put.getRow());
    htable2.delete(delete);
    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
  }

  /**
   * Load a row into a table, make sure the data is really the same, delete the row, make sure
   * the delete marker is replicated, then run VerifyReplication with and without the --raw
   * option to check the results.
   */
  @Test
  public void testVerifyRepJobWithRawOptions() throws Exception {
    LOG.info(name.getMethodName());

    final TableName tableName = TableName.valueOf(name.getMethodName());
    byte[] familyname = Bytes.toBytes("fam_raw");
    byte[] row = Bytes.toBytes("row_raw");

    Table lHtable1 = null;
    Table lHtable2 = null;

    try {
      ColumnFamilyDescriptor fam = ColumnFamilyDescriptorBuilder.newBuilder(familyname)
        .setMaxVersions(100).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build();
      TableDescriptor table =
        TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(fam).build();

      try (Connection connection1 = ConnectionFactory.createConnection(CONF1);
        Admin admin1 = connection1.getAdmin()) {
        admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
      }
      try (Connection connection2 = ConnectionFactory.createConnection(CONF2);
        Admin admin2 = connection2.getAdmin()) {
        admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
      }
      UTIL1.waitUntilAllRegionsAssigned(tableName);
      UTIL2.waitUntilAllRegionsAssigned(tableName);

      lHtable1 = UTIL1.getConnection().getTable(tableName);
      lHtable2 = UTIL2.getConnection().getTable(tableName);

      Put put = new Put(row);
      put.addColumn(familyname, row, row);
      lHtable1.put(put);

      Get get = new Get(row);
      for (int i = 0; i < NB_RETRIES; i++) {
        if (i == NB_RETRIES - 1) {
          fail("Waited too long for put replication");
        }
        Result res = lHtable2.get(get);
        if (res.isEmpty()) {
          LOG.info("Row not available");
          Thread.sleep(SLEEP_TIME);
        } else {
          assertArrayEquals(row, res.value());
          break;
        }
      }

      Delete del = new Delete(row);
      lHtable1.delete(del);

      get = new Get(row);
      for (int i = 0; i < NB_RETRIES; i++) {
        if (i == NB_RETRIES - 1) {
          fail("Waited too long for del replication");
        }
        Result res = lHtable2.get(get);
        if (res.size() >= 1) {
          LOG.info("Row not deleted");
          Thread.sleep(SLEEP_TIME);
        } else {
          break;
        }
      }

      // Checking verifyReplication for the default behavior.
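      // The row was deleted on both clusters, so a plain scan sees nothing on either side and
      // both counters stay at 0. A raw scan still sees the delete marker and the original put,
      // which were replicated and match, giving the single good row expected below.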
      String[] argsWithoutRaw = new String[] { PEER_ID, tableName.getNameAsString() };
      runVerifyReplication(argsWithoutRaw, 0, 0);

      // Checking verifyReplication with --raw
      String[] argsWithRawAsTrue = new String[] { "--raw", PEER_ID, tableName.getNameAsString() };
      runVerifyReplication(argsWithRawAsTrue, 1, 0);
    } finally {
      if (lHtable1 != null) {
        lHtable1.close();
      }
      if (lHtable2 != null) {
        lHtable2.close();
      }
    }
  }

  /**
   * Asserts that the snapshot restore tmp dir contains exactly the expected number of
   * subdirectories.
   */
  static void checkRestoreTmpDir(Configuration conf, String restoreTmpDir, int expectedCount)
    throws IOException {
    FileSystem fs = FileSystem.get(conf);
    FileStatus[] subDirectories = fs.listStatus(new Path(restoreTmpDir));
    assertNotNull(subDirectories);
    assertEquals(expectedCount, subDirectories.length);
    for (int i = 0; i < expectedCount; i++) {
      assertTrue(subDirectories[i].isDirectory());
    }
  }

  @Test
  public void testVerifyRepJobWithQuorumAddress() throws Exception {
    // Populate the tables; runSmallBatchTest also verifies that the tables are identical
    runSmallBatchTest();

    // Use a quorum address (a cluster key) instead of a peer id
    String[] args = new String[] { UTIL2.getClusterKey(), tableName.getNameAsString() };
    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);

    Scan scan = new Scan();
    ResultScanner rs = htable2.getScanner(scan);
    Put put = null;
    for (Result result : rs) {
      put = new Put(result.getRow());
      Cell firstVal = result.rawCells()[0];
      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
        Bytes.toBytes("diff data"));
      htable2.put(put);
    }
    Delete delete = new Delete(put.getRow());
    htable2.delete(delete);
    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
  }

  @Test
  public void testVerifyRepJobWithQuorumAddressAndSnapshotSupport() throws Exception {
    // Populate the tables; runSmallBatchTest also verifies that the tables are identical
    runSmallBatchTest();

    // Take a snapshot of the source table
    Path rootDir = CommonFSUtils.getRootDir(CONF1);
    FileSystem fs = rootDir.getFileSystem(CONF1);
    String sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);

    // Take a snapshot of the target table
    Path peerRootDir = CommonFSUtils.getRootDir(CONF2);
    FileSystem peerFs = peerRootDir.getFileSystem(CONF2);
    String peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);

    String peerFSAddress = peerFs.getUri().toString();
    String tmpPath1 = UTIL1.getRandomDir().toString();
    String tmpPath2 = "/tmp" + EnvironmentEdgeManager.currentTime();

    String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
      "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
      "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), UTIL2.getClusterKey(),
      tableName.getNameAsString() };
    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
    checkRestoreTmpDir(CONF1, tmpPath1, 1);
    checkRestoreTmpDir(CONF2, tmpPath2, 1);
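
    // Now change the data on the peer and snapshot both clusters again: the second verification
    // run should flag every row as bad, and each run leaves one more restore subdirectory under
    // the tmp dirs, hence the expected count of 2 below.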
    Scan scan = new Scan();
    ResultScanner rs = htable2.getScanner(scan);
    Put put = null;
    for (Result result : rs) {
      put = new Put(result.getRow());
      Cell firstVal = result.rawCells()[0];
      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
        Bytes.toBytes("diff data"));
      htable2.put(put);
    }
    Delete delete = new Delete(put.getRow());
    htable2.delete(delete);

    sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);

    peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);

    args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
      "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
      "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), UTIL2.getClusterKey(),
      tableName.getNameAsString() };
    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
    checkRestoreTmpDir(CONF1, tmpPath1, 2);
    checkRestoreTmpDir(CONF2, tmpPath2, 2);
  }

  /**
   * Loads data into htable1 and copies it to htable3 on the peer cluster, so both tables hold
   * identical rows.
   */
  static void runBatchCopyTest() throws Exception {
    // Load a batch of data into htable1
    loadData("", row, noRepfamName);

    Scan scan1 = new Scan();
    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
    ResultScanner scanner1 = htable1.getScanner(scan1);
    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
    for (Result result : res1) {
      Put put = new Put(result.getRow());
      for (Cell cell : result.rawCells()) {
        put.add(cell);
      }
      puts.add(put);
    }
    scanner1.close();
    assertEquals(NB_ROWS_IN_BATCH, res1.length);

    // Copy the data to htable3
    htable3.put(puts);

    Scan scan2 = new Scan();
    ResultScanner scanner2 = htable3.getScanner(scan2);
    Result[] res2 = scanner2.next(NB_ROWS_IN_BATCH);
    scanner2.close();
    assertEquals(NB_ROWS_IN_BATCH, res2.length);
  }

  @Test
  public void testVerifyRepJobWithPeerTableName() throws Exception {
    // Populate the tables with the same data
    runBatchCopyTest();

    // Pass a peerTableName along with the quorum address (a cluster key)
    String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
      UTIL2.getClusterKey(), tableName.getNameAsString() };
    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);

    UTIL2.deleteTableData(peerTableName);
    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
  }
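
  // Same snapshot-based verification as above, but the source table is compared against the
  // differently named peer table via --peerTableName.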
  @Test
  public void testVerifyRepJobWithPeerTableNameAndSnapshotSupport() throws Exception {
    // Populate the tables with the same data
    runBatchCopyTest();

    // Take a snapshot of the source table
    Path rootDir = CommonFSUtils.getRootDir(CONF1);
    FileSystem fs = rootDir.getFileSystem(CONF1);
    String sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
      Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true);

    // Take a snapshot of the target table
    Path peerRootDir = CommonFSUtils.getRootDir(CONF2);
    FileSystem peerFs = peerRootDir.getFileSystem(CONF2);
    String peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), peerTableName,
      Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true);

    String peerFSAddress = peerFs.getUri().toString();
    String tmpPath1 = UTIL1.getRandomDir().toString();
    String tmpPath2 = "/tmp" + EnvironmentEdgeManager.currentTime();

    String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
      "--sourceSnapshotName=" + sourceSnapshotName, "--sourceSnapshotTmpDir=" + tmpPath1,
      "--peerSnapshotName=" + peerSnapshotName, "--peerSnapshotTmpDir=" + tmpPath2,
      "--peerFSAddress=" + peerFSAddress,
      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), UTIL2.getClusterKey(),
      tableName.getNameAsString() };
    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
    checkRestoreTmpDir(CONF1, tmpPath1, 1);
    checkRestoreTmpDir(CONF2, tmpPath2, 1);

    // Change the peer table's data and snapshot both clusters again; the second run should flag
    // every row as bad.
    Scan scan = new Scan();
    ResultScanner rs = htable3.getScanner(scan);
    Put put = null;
    for (Result result : rs) {
      put = new Put(result.getRow());
      Cell firstVal = result.rawCells()[0];
      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
        Bytes.toBytes("diff data"));
      htable3.put(put);
    }
    Delete delete = new Delete(put.getRow());
    htable3.delete(delete);

    sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
      Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true);

    peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), peerTableName,
      Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true);

    args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
      "--sourceSnapshotName=" + sourceSnapshotName, "--sourceSnapshotTmpDir=" + tmpPath1,
      "--peerSnapshotName=" + peerSnapshotName, "--peerSnapshotTmpDir=" + tmpPath2,
      "--peerFSAddress=" + peerFSAddress,
      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), UTIL2.getClusterKey(),
      tableName.getNameAsString() };
    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
    checkRestoreTmpDir(CONF1, tmpPath1, 2);
    checkRestoreTmpDir(CONF2, tmpPath2, 2);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    htable3.close();
    TestReplicationBase.tearDownAfterClass();
  }
}