/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.mapreduce.Job;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ ReplicationTests.class, LargeTests.class })
public class TestVerifyReplication extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestVerifyReplication.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestVerifyReplication.class);

  private static final String PEER_ID = "2";
  private static final TableName peerTableName = TableName.valueOf("peerTest");
  private static Table htable3;

  @Rule
  public TestName name = new TestName();

  @Before
  public void setUp() throws Exception {
    cleanUp();
    UTIL2.deleteTableData(peerTableName);
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TestReplicationBase.setUpBeforeClass();

    TableDescriptor peerTable = TableDescriptorBuilder.newBuilder(peerTableName)
      .setColumnFamily(
        ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).setMaxVersions(100).build())
      .build();

    // The connection is deliberately left open for the lifetime of the class: htable3 is backed
    // by it and is only closed in tearDownAfterClass().
    Connection connection2 = ConnectionFactory.createConnection(CONF2);
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(peerTable, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
    }
    htable3 = connection2.getTable(peerTableName);
  }
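
  /**
   * Runs the VerifyReplication MapReduce job against the source cluster configuration and asserts
   * the expected GOODROWS/BADROWS counter values afterwards. The argument array mirrors the
   * command-line form of the tool: optional {@code --option=value} flags first, then the peer id
   * (or a cluster key) and the source table name, e.g. (hypothetical values)
   * {@code new String[] { "--raw", "2", "myTable" }}.
   */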
  static void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
      throws IOException, InterruptedException, ClassNotFoundException {
    Job job = new VerifyReplication().createSubmittableJob(new Configuration(CONF1), args);
    if (job == null) {
      fail("Job wasn't created, see the log");
    }
    if (!job.waitForCompletion(true)) {
      fail("Job failed, see the log");
    }
    assertEquals(expectedGoodRows,
      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
    assertEquals(expectedBadRows,
      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
  }

  /**
   * Load a small batch of rows into a table, make sure the data is really the same on the peer,
   * then run the VerifyReplication job and check the results. Do a second comparison where all
   * the cells are different.
   */
  @Test
  public void testVerifyRepJob() throws Exception {
    // Populate the tables; runSmallBatchTest also verifies that source and peer are identical.
    runSmallBatchTest();

    String[] args = new String[] { PEER_ID, tableName.getNameAsString() };
    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);

    // Overwrite the first cell of every row on the peer, then delete one row outright, so that
    // every row now differs between the two clusters.
    Scan scan = new Scan();
    ResultScanner rs = htable2.getScanner(scan);
    Put put = null;
    for (Result result : rs) {
      put = new Put(result.getRow());
      Cell firstVal = result.rawCells()[0];
      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
        Bytes.toBytes("diff data"));
      htable2.put(put);
    }
    Delete delete = new Delete(put.getRow());
    htable2.delete(delete);
    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
  }

  /**
   * Load a row into a table, make sure the data is really the same on the peer, delete the row,
   * make sure the delete marker is replicated, then run verify replication with and without the
   * raw option to check the results.
   */
  @Test
  public void testVerifyRepJobWithRawOptions() throws Exception {
    LOG.info(name.getMethodName());

    final TableName tableName = TableName.valueOf(name.getMethodName());
    byte[] familyname = Bytes.toBytes("fam_raw");
    byte[] row = Bytes.toBytes("row_raw");

    Table lHtable1 = null;
    Table lHtable2 = null;

    try {
      ColumnFamilyDescriptor fam = ColumnFamilyDescriptorBuilder.newBuilder(familyname)
        .setMaxVersions(100).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build();
      TableDescriptor table =
        TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(fam).build();

      // Use try-with-resources so the short-lived connections are closed again.
      try (Connection connection1 = ConnectionFactory.createConnection(CONF1);
        Admin admin1 = connection1.getAdmin()) {
        admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
      }
      try (Connection connection2 = ConnectionFactory.createConnection(CONF2);
        Admin admin2 = connection2.getAdmin()) {
        admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
      }
      UTIL1.waitUntilAllRegionsAssigned(tableName);
      UTIL2.waitUntilAllRegionsAssigned(tableName);

      lHtable1 = UTIL1.getConnection().getTable(tableName);
      lHtable2 = UTIL2.getConnection().getTable(tableName);

      Put put = new Put(row);
      put.addColumn(familyname, row, row);
      lHtable1.put(put);

      Get get = new Get(row);
      for (int i = 0; i < NB_RETRIES; i++) {
        if (i == NB_RETRIES - 1) {
          fail("Waited too long for put replication");
        }
        Result res = lHtable2.get(get);
        if (res.isEmpty()) {
          LOG.info("Row not available");
          Thread.sleep(SLEEP_TIME);
        } else {
          assertArrayEquals(row, res.value());
          break;
        }
      }

      Delete del = new Delete(row);
      lHtable1.delete(del);

      get = new Get(row);
      for (int i = 0; i < NB_RETRIES; i++) {
        if (i == NB_RETRIES - 1) {
          fail("Waited too long for del replication");
        }
        Result res = lHtable2.get(get);
        if (res.size() >= 1) {
          LOG.info("Row not deleted");
          Thread.sleep(SLEEP_TIME);
        } else {
          break;
        }
      }

      // Default behavior: the deleted row is invisible on both sides, so nothing is compared.
      String[] argsWithoutRaw = new String[] { PEER_ID, tableName.getNameAsString() };
      runVerifyReplication(argsWithoutRaw, 0, 0);

      // With --raw the delete marker itself is scanned and must match on both sides.
      String[] argsWithRawAsTrue = new String[] { "--raw", PEER_ID, tableName.getNameAsString() };
      runVerifyReplication(argsWithRawAsTrue, 1, 0);
    } finally {
      if (lHtable1 != null) {
        lHtable1.close();
      }
      if (lHtable2 != null) {
        lHtable2.close();
      }
    }
  }
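
  /**
   * Asserts that {@code restoreTmpDir} contains exactly {@code expectedCount} entries and that
   * each of them is a directory.
   */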
  static void checkRestoreTmpDir(Configuration conf, String restoreTmpDir, int expectedCount)
      throws IOException {
    FileSystem fs = FileSystem.get(conf);
    FileStatus[] subDirectories = fs.listStatus(new Path(restoreTmpDir));
    assertNotNull(subDirectories);
    assertEquals(expectedCount, subDirectories.length);
    for (int i = 0; i < expectedCount; i++) {
      assertTrue(subDirectories[i].isDirectory());
    }
  }

  @Test
  public void testVerifyRepJobWithQuorumAddress() throws Exception {
    // Populate the tables; runSmallBatchTest also verifies that source and peer are identical.
    runSmallBatchTest();

    // Use a quorum address (a cluster key) instead of a peer id.
    String[] args = new String[] { UTIL2.getClusterKey(), tableName.getNameAsString() };
    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);

    // Make every row differ on the peer, then delete one of them.
    Scan scan = new Scan();
    ResultScanner rs = htable2.getScanner(scan);
    Put put = null;
    for (Result result : rs) {
      put = new Put(result.getRow());
      Cell firstVal = result.rawCells()[0];
      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
        Bytes.toBytes("diff data"));
      htable2.put(put);
    }
    Delete delete = new Delete(put.getRow());
    htable2.delete(delete);
    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
  }

  @Test
  public void testVerifyRepJobWithQuorumAddressAndSnapshotSupport() throws Exception {
    // Populate the tables; runSmallBatchTest also verifies that source and peer are identical.
    runSmallBatchTest();

    // Take a snapshot of the source table
    Path rootDir = CommonFSUtils.getRootDir(CONF1);
    FileSystem fs = rootDir.getFileSystem(CONF1);
    String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);

    // Take a snapshot of the target table
    Path peerRootDir = CommonFSUtils.getRootDir(CONF2);
    FileSystem peerFs = peerRootDir.getFileSystem(CONF2);
    String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);

    String peerFSAddress = peerFs.getUri().toString();
    String tmpPath1 = UTIL1.getRandomDir().toString();
    String tmpPath2 = "/tmp" + System.currentTimeMillis();

    String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
      "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
      "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), UTIL2.getClusterKey(),
      tableName.getNameAsString() };
    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
    checkRestoreTmpDir(CONF1, tmpPath1, 1);
    checkRestoreTmpDir(CONF2, tmpPath2, 1);

    // Make every row differ on the peer, then delete one of them.
    Scan scan = new Scan();
    ResultScanner rs = htable2.getScanner(scan);
    Put put = null;
    for (Result result : rs) {
      put = new Put(result.getRow());
      Cell firstVal = result.rawCells()[0];
      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
        Bytes.toBytes("diff data"));
      htable2.put(put);
    }
    Delete delete = new Delete(put.getRow());
    htable2.delete(delete);

    // Re-snapshot both sides so the job sees the modified data.
    sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);

    peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);

    args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
      "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
      "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), UTIL2.getClusterKey(),
      tableName.getNameAsString() };
    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
    checkRestoreTmpDir(CONF1, tmpPath1, 2);
    checkRestoreTmpDir(CONF2, tmpPath2, 2);
  }
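
  /**
   * Loads a batch of rows into the non-replicated family of htable1 and copies them cell-for-cell
   * to htable3 on the peer cluster, so both tables hold identical data without going through
   * replication.
   */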
  static void runBatchCopyTest() throws Exception {
    // Write a batch of rows to htable1
    loadData("", row, noRepfamName);

    Scan scan1 = new Scan();
    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
    ResultScanner scanner1 = htable1.getScanner(scan1);
    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
    for (Result result : res1) {
      Put put = new Put(result.getRow());
      for (Cell cell : result.rawCells()) {
        put.add(cell);
      }
      puts.add(put);
    }
    scanner1.close();
    assertEquals(NB_ROWS_IN_BATCH, res1.length);

    // Copy the data to htable3
    htable3.put(puts);

    Scan scan2 = new Scan();
    ResultScanner scanner2 = htable3.getScanner(scan2);
    Result[] res2 = scanner2.next(NB_ROWS_IN_BATCH);
    scanner2.close();
    assertEquals(NB_ROWS_IN_BATCH, res2.length);
  }

  @Test
  public void testVerifyRepJobWithPeerTableName() throws Exception {
    // Populate both tables with the same data
    runBatchCopyTest();

    // Pass a peer table name along with the quorum address (a cluster key)
    String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
      UTIL2.getClusterKey(), tableName.getNameAsString() };
    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);

    // After wiping the peer table, every row should be reported as bad.
    UTIL2.deleteTableData(peerTableName);
    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
  }
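
  // The snapshot variant below exercises the same comparison, but reads both sides from restored
  // snapshots instead of live tables: --sourceSnapshotName/--sourceSnapshotTmpDir name and stage
  // the source snapshot, --peerSnapshotName/--peerSnapshotTmpDir do the same for the peer, and
  // --peerFSAddress/--peerHBaseRootAddress tell the job where the peer snapshot files live.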
  @Test
  public void testVerifyRepJobWithPeerTableNameAndSnapshotSupport() throws Exception {
    // Populate both tables with the same data
    runBatchCopyTest();

    // Take a snapshot of the source table
    Path rootDir = CommonFSUtils.getRootDir(CONF1);
    FileSystem fs = rootDir.getFileSystem(CONF1);
    String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
      Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true);

    // Take a snapshot of the target table
    Path peerRootDir = CommonFSUtils.getRootDir(CONF2);
    FileSystem peerFs = peerRootDir.getFileSystem(CONF2);
    String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), peerTableName,
      Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true);

    String peerFSAddress = peerFs.getUri().toString();
    String tmpPath1 = UTIL1.getRandomDir().toString();
    String tmpPath2 = "/tmp" + System.currentTimeMillis();

    String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
      "--sourceSnapshotName=" + sourceSnapshotName, "--sourceSnapshotTmpDir=" + tmpPath1,
      "--peerSnapshotName=" + peerSnapshotName, "--peerSnapshotTmpDir=" + tmpPath2,
      "--peerFSAddress=" + peerFSAddress,
      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), UTIL2.getClusterKey(),
      tableName.getNameAsString() };
    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
    checkRestoreTmpDir(CONF1, tmpPath1, 1);
    checkRestoreTmpDir(CONF2, tmpPath2, 1);

    // Make every row differ on the peer, then delete one of them.
    Scan scan = new Scan();
    ResultScanner rs = htable3.getScanner(scan);
    Put put = null;
    for (Result result : rs) {
      put = new Put(result.getRow());
      Cell firstVal = result.rawCells()[0];
      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
        Bytes.toBytes("diff data"));
      htable3.put(put);
    }
    Delete delete = new Delete(put.getRow());
    htable3.delete(delete);

    // Re-snapshot both sides so the job sees the modified data.
    sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
      Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true);

    peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), peerTableName,
      Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true);

    args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
      "--sourceSnapshotName=" + sourceSnapshotName, "--sourceSnapshotTmpDir=" + tmpPath1,
      "--peerSnapshotName=" + peerSnapshotName, "--peerSnapshotTmpDir=" + tmpPath2,
      "--peerFSAddress=" + peerFSAddress,
      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), UTIL2.getClusterKey(),
      tableName.getNameAsString() };
    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
    checkRestoreTmpDir(CONF1, tmpPath1, 2);
    checkRestoreTmpDir(CONF2, tmpPath2, 2);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    htable3.close();
    TestReplicationBase.tearDownAfterClass();
  }
}