001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertTrue; 025import static org.junit.Assert.fail; 026 027import java.io.IOException; 028import java.util.ArrayList; 029import java.util.List; 030import java.util.TreeMap; 031 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileStatus; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.hbase.Cell; 037import org.apache.hadoop.hbase.CellUtil; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.HBaseTestingUtility; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.client.Admin; 043import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 044import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 045import org.apache.hadoop.hbase.client.Connection; 046import org.apache.hadoop.hbase.client.ConnectionFactory; 047import org.apache.hadoop.hbase.client.Delete; 048import org.apache.hadoop.hbase.client.Get; 049import org.apache.hadoop.hbase.client.Put; 050import org.apache.hadoop.hbase.client.Result; 051import org.apache.hadoop.hbase.client.ResultScanner; 052import org.apache.hadoop.hbase.client.Scan; 053import org.apache.hadoop.hbase.client.Table; 054import org.apache.hadoop.hbase.client.TableDescriptor; 055import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 056import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication; 057import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; 058import org.apache.hadoop.hbase.testclassification.LargeTests; 059import org.apache.hadoop.hbase.testclassification.ReplicationTests; 060import org.apache.hadoop.hbase.util.Bytes; 061import org.apache.hadoop.hbase.util.FSUtils; 062import org.apache.hadoop.mapreduce.Job; 063import org.junit.AfterClass; 064import org.junit.Before; 065import org.junit.BeforeClass; 066import org.junit.ClassRule; 067import org.junit.Rule; 068import org.junit.Test; 069import org.junit.experimental.categories.Category; 070import org.junit.rules.TestName; 071import org.slf4j.Logger; 072import org.slf4j.LoggerFactory; 073 074import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 075 076@Category({ ReplicationTests.class, LargeTests.class }) 077public class TestVerifyReplication extends TestReplicationBase { 078 079 @ClassRule 080 public static final HBaseClassTestRule CLASS_RULE = 081 HBaseClassTestRule.forClass(TestVerifyReplication.class); 082 083 private static final Logger LOG = LoggerFactory.getLogger(TestVerifyReplication.class); 084 085 private static final String PEER_ID = "2"; 086 private static final TableName peerTableName = TableName.valueOf("peerTest"); 087 private static Table htable3; 088 089 @Rule 090 public TestName name = new TestName(); 091 092 @Before 093 public void setUp() throws Exception { 094 cleanUp(); 095 utility2.deleteTableData(peerTableName); 096 } 097 098 @BeforeClass 099 public static void setUpBeforeClass() throws Exception { 100 TestReplicationBase.setUpBeforeClass(); 101 102 TableDescriptor peerTable = TableDescriptorBuilder.newBuilder(peerTableName).setColumnFamily( 103 ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).setMaxVersions(100) 104 .build()).build(); 105 106 Connection connection2 = ConnectionFactory.createConnection(conf2); 107 try (Admin admin2 = connection2.getAdmin()) { 108 admin2.createTable(peerTable, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); 109 } 110 htable3 = connection2.getTable(peerTableName); 111 } 112 113 private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows) 114 throws IOException, InterruptedException, ClassNotFoundException { 115 Job job = new VerifyReplication().createSubmittableJob(new Configuration(conf1), args); 116 if (job == null) { 117 fail("Job wasn't created, see the log"); 118 } 119 if (!job.waitForCompletion(true)) { 120 fail("Job failed, see the log"); 121 } 122 assertEquals(expectedGoodRows, 123 job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue()); 124 assertEquals(expectedBadRows, 125 job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); 126 } 127 128 /** 129 * Do a small loading into a table, make sure the data is really the same, then run the 130 * VerifyReplication job to check the results. Do a second comparison where all the cells are 131 * different. 132 */ 133 @Test 134 public void testVerifyRepJob() throws Exception { 135 // Populate the tables, at the same time it guarantees that the tables are 136 // identical since it does the check 137 runSmallBatchTest(); 138 139 String[] args = new String[] { PEER_ID, tableName.getNameAsString() }; 140 runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); 141 142 Scan scan = new Scan(); 143 ResultScanner rs = htable2.getScanner(scan); 144 Put put = null; 145 for (Result result : rs) { 146 put = new Put(result.getRow()); 147 Cell firstVal = result.rawCells()[0]; 148 put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal), 149 Bytes.toBytes("diff data")); 150 htable2.put(put); 151 } 152 Delete delete = new Delete(put.getRow()); 153 htable2.delete(delete); 154 runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); 155 } 156 157 /** 158 * Load a row into a table, make sure the data is really the same, delete the row, make sure the 159 * delete marker is replicated, run verify replication with and without raw to check the results. 160 */ 161 @Test 162 public void testVerifyRepJobWithRawOptions() throws Exception { 163 LOG.info(name.getMethodName()); 164 165 final TableName tableName = TableName.valueOf(name.getMethodName()); 166 byte[] familyname = Bytes.toBytes("fam_raw"); 167 byte[] row = Bytes.toBytes("row_raw"); 168 169 Table lHtable1 = null; 170 Table lHtable2 = null; 171 172 try { 173 ColumnFamilyDescriptor fam = ColumnFamilyDescriptorBuilder.newBuilder(familyname) 174 .setMaxVersions(100).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build(); 175 TableDescriptor table = 176 TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(fam).build(); 177 scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 178 for (ColumnFamilyDescriptor f : table.getColumnFamilies()) { 179 scopes.put(f.getName(), f.getScope()); 180 } 181 182 Connection connection1 = ConnectionFactory.createConnection(conf1); 183 Connection connection2 = ConnectionFactory.createConnection(conf2); 184 try (Admin admin1 = connection1.getAdmin()) { 185 admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); 186 } 187 try (Admin admin2 = connection2.getAdmin()) { 188 admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); 189 } 190 utility1.waitUntilAllRegionsAssigned(tableName); 191 utility2.waitUntilAllRegionsAssigned(tableName); 192 193 lHtable1 = utility1.getConnection().getTable(tableName); 194 lHtable2 = utility2.getConnection().getTable(tableName); 195 196 Put put = new Put(row); 197 put.addColumn(familyname, row, row); 198 lHtable1.put(put); 199 200 Get get = new Get(row); 201 for (int i = 0; i < NB_RETRIES; i++) { 202 if (i == NB_RETRIES - 1) { 203 fail("Waited too much time for put replication"); 204 } 205 Result res = lHtable2.get(get); 206 if (res.isEmpty()) { 207 LOG.info("Row not available"); 208 Thread.sleep(SLEEP_TIME); 209 } else { 210 assertArrayEquals(res.value(), row); 211 break; 212 } 213 } 214 215 Delete del = new Delete(row); 216 lHtable1.delete(del); 217 218 get = new Get(row); 219 for (int i = 0; i < NB_RETRIES; i++) { 220 if (i == NB_RETRIES - 1) { 221 fail("Waited too much time for del replication"); 222 } 223 Result res = lHtable2.get(get); 224 if (res.size() >= 1) { 225 LOG.info("Row not deleted"); 226 Thread.sleep(SLEEP_TIME); 227 } else { 228 break; 229 } 230 } 231 232 // Checking verifyReplication for the default behavior. 233 String[] argsWithoutRaw = new String[] { PEER_ID, tableName.getNameAsString() }; 234 runVerifyReplication(argsWithoutRaw, 0, 0); 235 236 // Checking verifyReplication with raw 237 String[] argsWithRawAsTrue = new String[] { "--raw", PEER_ID, tableName.getNameAsString() }; 238 runVerifyReplication(argsWithRawAsTrue, 1, 0); 239 } finally { 240 if (lHtable1 != null) { 241 lHtable1.close(); 242 } 243 if (lHtable2 != null) { 244 lHtable2.close(); 245 } 246 } 247 } 248 249 // VerifyReplication should honor versions option 250 @Test 251 public void testHBase14905() throws Exception { 252 // normal Batch tests 253 byte[] qualifierName = Bytes.toBytes("f1"); 254 Put put = new Put(Bytes.toBytes("r1")); 255 long ts = System.currentTimeMillis(); 256 put.addColumn(famName, qualifierName, ts + 1, Bytes.toBytes("v1002")); 257 htable1.put(put); 258 put.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v1001")); 259 htable1.put(put); 260 put.addColumn(famName, qualifierName, ts + 3, Bytes.toBytes("v1112")); 261 htable1.put(put); 262 263 Scan scan = new Scan(); 264 scan.readVersions(100); 265 ResultScanner scanner1 = htable1.getScanner(scan); 266 Result[] res1 = scanner1.next(1); 267 scanner1.close(); 268 269 assertEquals(1, res1.length); 270 assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size()); 271 272 for (int i = 0; i < NB_RETRIES; i++) { 273 scan = new Scan(); 274 scan.readVersions(100); 275 scanner1 = htable2.getScanner(scan); 276 res1 = scanner1.next(1); 277 scanner1.close(); 278 if (res1.length != 1) { 279 LOG.info("Only got " + res1.length + " rows"); 280 Thread.sleep(SLEEP_TIME); 281 } else { 282 int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size(); 283 if (cellNumber != 3) { 284 LOG.info("Only got " + cellNumber + " cells"); 285 Thread.sleep(SLEEP_TIME); 286 } else { 287 break; 288 } 289 } 290 if (i == NB_RETRIES - 1) { 291 fail("Waited too much time for normal batch replication"); 292 } 293 } 294 295 put.addColumn(famName, qualifierName, ts + 4, Bytes.toBytes("v1111")); 296 htable2.put(put); 297 put.addColumn(famName, qualifierName, ts + 5, Bytes.toBytes("v1112")); 298 htable2.put(put); 299 300 scan = new Scan(); 301 scan.readVersions(100); 302 scanner1 = htable2.getScanner(scan); 303 res1 = scanner1.next(NB_ROWS_IN_BATCH); 304 scanner1.close(); 305 306 assertEquals(1, res1.length); 307 assertEquals(5, res1[0].getColumnCells(famName, qualifierName).size()); 308 309 String[] args = new String[] { "--versions=100", PEER_ID, tableName.getNameAsString() }; 310 runVerifyReplication(args, 0, 1); 311 } 312 313 // VerifyReplication should honor versions option 314 @Test 315 public void testVersionMismatchHBase14905() throws Exception { 316 // normal Batch tests 317 byte[] qualifierName = Bytes.toBytes("f1"); 318 Put put = new Put(Bytes.toBytes("r1")); 319 long ts = System.currentTimeMillis(); 320 put.addColumn(famName, qualifierName, ts + 1, Bytes.toBytes("v1")); 321 htable1.put(put); 322 put.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v2")); 323 htable1.put(put); 324 put.addColumn(famName, qualifierName, ts + 3, Bytes.toBytes("v3")); 325 htable1.put(put); 326 327 Scan scan = new Scan(); 328 scan.readVersions(100); 329 ResultScanner scanner1 = htable1.getScanner(scan); 330 Result[] res1 = scanner1.next(1); 331 scanner1.close(); 332 333 assertEquals(1, res1.length); 334 assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size()); 335 336 for (int i = 0; i < NB_RETRIES; i++) { 337 scan = new Scan(); 338 scan.readVersions(100); 339 scanner1 = htable2.getScanner(scan); 340 res1 = scanner1.next(1); 341 scanner1.close(); 342 if (res1.length != 1) { 343 LOG.info("Only got " + res1.length + " rows"); 344 Thread.sleep(SLEEP_TIME); 345 } else { 346 int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size(); 347 if (cellNumber != 3) { 348 LOG.info("Only got " + cellNumber + " cells"); 349 Thread.sleep(SLEEP_TIME); 350 } else { 351 break; 352 } 353 } 354 if (i == NB_RETRIES - 1) { 355 fail("Waited too much time for normal batch replication"); 356 } 357 } 358 359 try { 360 // Disabling replication and modifying the particular version of the cell to validate the 361 // feature. 362 hbaseAdmin.disableReplicationPeer(PEER_ID); 363 Put put2 = new Put(Bytes.toBytes("r1")); 364 put2.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v99")); 365 htable2.put(put2); 366 367 scan = new Scan(); 368 scan.readVersions(100); 369 scanner1 = htable2.getScanner(scan); 370 res1 = scanner1.next(NB_ROWS_IN_BATCH); 371 scanner1.close(); 372 assertEquals(1, res1.length); 373 assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size()); 374 375 String[] args = new String[] { "--versions=100", PEER_ID, tableName.getNameAsString() }; 376 runVerifyReplication(args, 0, 1); 377 } finally { 378 hbaseAdmin.enableReplicationPeer(PEER_ID); 379 } 380 } 381 382 @Test 383 public void testVerifyReplicationPrefixFiltering() throws Exception { 384 final byte[] prefixRow = Bytes.toBytes("prefixrow"); 385 final byte[] prefixRow2 = Bytes.toBytes("secondrow"); 386 loadData("prefixrow", prefixRow); 387 loadData("secondrow", prefixRow2); 388 loadData("aaa", row); 389 loadData("zzz", row); 390 waitForReplication(NB_ROWS_IN_BATCH * 4, NB_RETRIES * 4); 391 String[] args = 392 new String[] { "--row-prefixes=prefixrow,secondrow", PEER_ID, tableName.getNameAsString() }; 393 runVerifyReplication(args, NB_ROWS_IN_BATCH * 2, 0); 394 } 395 396 @Test 397 public void testVerifyReplicationSnapshotArguments() { 398 String[] args = 399 new String[] { "--sourceSnapshotName=snapshot1", "2", tableName.getNameAsString() }; 400 assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args)); 401 402 args = new String[] { "--sourceSnapshotTmpDir=tmp", "2", tableName.getNameAsString() }; 403 assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args)); 404 405 args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=tmp", "2", 406 tableName.getNameAsString() }; 407 assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args)); 408 409 args = new String[] { "--peerSnapshotName=snapshot1", "2", tableName.getNameAsString() }; 410 assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args)); 411 412 args = new String[] { "--peerSnapshotTmpDir=/tmp/", "2", tableName.getNameAsString() }; 413 assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args)); 414 415 args = new String[] { "--peerSnapshotName=snapshot1", "--peerSnapshotTmpDir=/tmp/", 416 "--peerFSAddress=tempfs", "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2", 417 tableName.getNameAsString() }; 418 assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args)); 419 420 args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=/tmp/", 421 "--peerSnapshotName=snapshot2", "--peerSnapshotTmpDir=/tmp/", "--peerFSAddress=tempfs", 422 "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2", tableName.getNameAsString() }; 423 424 assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args)); 425 } 426 427 private void checkRestoreTmpDir(Configuration conf, String restoreTmpDir, int expectedCount) 428 throws IOException { 429 FileSystem fs = FileSystem.get(conf); 430 FileStatus[] subDirectories = fs.listStatus(new Path(restoreTmpDir)); 431 assertNotNull(subDirectories); 432 assertEquals(subDirectories.length, expectedCount); 433 for (int i = 0; i < expectedCount; i++) { 434 assertTrue(subDirectories[i].isDirectory()); 435 } 436 } 437 438 @Test 439 public void testVerifyReplicationWithSnapshotSupport() throws Exception { 440 // Populate the tables, at the same time it guarantees that the tables are 441 // identical since it does the check 442 runSmallBatchTest(); 443 444 // Take source and target tables snapshot 445 Path rootDir = FSUtils.getRootDir(conf1); 446 FileSystem fs = rootDir.getFileSystem(conf1); 447 String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis(); 448 SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName, 449 Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true); 450 451 // Take target snapshot 452 Path peerRootDir = FSUtils.getRootDir(conf2); 453 FileSystem peerFs = peerRootDir.getFileSystem(conf2); 454 String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis(); 455 SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName, 456 Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true); 457 458 String peerFSAddress = peerFs.getUri().toString(); 459 String temPath1 = utility1.getRandomDir().toString(); 460 String temPath2 = "/tmp" + System.currentTimeMillis(); 461 462 String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName, 463 "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName, 464 "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress, 465 "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() }; 466 runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); 467 checkRestoreTmpDir(conf1, temPath1, 1); 468 checkRestoreTmpDir(conf2, temPath2, 1); 469 470 Scan scan = new Scan(); 471 ResultScanner rs = htable2.getScanner(scan); 472 Put put = null; 473 for (Result result : rs) { 474 put = new Put(result.getRow()); 475 Cell firstVal = result.rawCells()[0]; 476 put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal), 477 Bytes.toBytes("diff data")); 478 htable2.put(put); 479 } 480 Delete delete = new Delete(put.getRow()); 481 htable2.delete(delete); 482 483 sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis(); 484 SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName, 485 Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true); 486 487 peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis(); 488 SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName, 489 Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true); 490 491 args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName, 492 "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName, 493 "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress, 494 "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() }; 495 runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); 496 checkRestoreTmpDir(conf1, temPath1, 2); 497 checkRestoreTmpDir(conf2, temPath2, 2); 498 } 499 500 @Test 501 public void testVerifyRepJobWithQuorumAddress() throws Exception { 502 // Populate the tables, at the same time it guarantees that the tables are 503 // identical since it does the check 504 runSmallBatchTest(); 505 506 // with a quorum address (a cluster key) 507 String[] args = new String[] { utility2.getClusterKey(), tableName.getNameAsString() }; 508 runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); 509 510 Scan scan = new Scan(); 511 ResultScanner rs = htable2.getScanner(scan); 512 Put put = null; 513 for (Result result : rs) { 514 put = new Put(result.getRow()); 515 Cell firstVal = result.rawCells()[0]; 516 put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal), 517 Bytes.toBytes("diff data")); 518 htable2.put(put); 519 } 520 Delete delete = new Delete(put.getRow()); 521 htable2.delete(delete); 522 runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); 523 } 524 525 @Test 526 public void testVerifyRepJobWithQuorumAddressAndSnapshotSupport() throws Exception { 527 // Populate the tables, at the same time it guarantees that the tables are 528 // identical since it does the check 529 runSmallBatchTest(); 530 531 // Take source and target tables snapshot 532 Path rootDir = FSUtils.getRootDir(conf1); 533 FileSystem fs = rootDir.getFileSystem(conf1); 534 String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis(); 535 SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName, 536 Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true); 537 538 // Take target snapshot 539 Path peerRootDir = FSUtils.getRootDir(conf2); 540 FileSystem peerFs = peerRootDir.getFileSystem(conf2); 541 String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis(); 542 SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName, 543 Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true); 544 545 String peerFSAddress = peerFs.getUri().toString(); 546 String tmpPath1 = utility1.getRandomDir().toString(); 547 String tmpPath2 = "/tmp" + System.currentTimeMillis(); 548 549 String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName, 550 "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName, 551 "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress, 552 "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(), 553 tableName.getNameAsString() }; 554 runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); 555 checkRestoreTmpDir(conf1, tmpPath1, 1); 556 checkRestoreTmpDir(conf2, tmpPath2, 1); 557 558 Scan scan = new Scan(); 559 ResultScanner rs = htable2.getScanner(scan); 560 Put put = null; 561 for (Result result : rs) { 562 put = new Put(result.getRow()); 563 Cell firstVal = result.rawCells()[0]; 564 put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal), 565 Bytes.toBytes("diff data")); 566 htable2.put(put); 567 } 568 Delete delete = new Delete(put.getRow()); 569 htable2.delete(delete); 570 571 sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis(); 572 SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName, 573 Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true); 574 575 peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis(); 576 SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName, 577 Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true); 578 579 args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName, 580 "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName, 581 "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress, 582 "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(), 583 tableName.getNameAsString() }; 584 runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); 585 checkRestoreTmpDir(conf1, tmpPath1, 2); 586 checkRestoreTmpDir(conf2, tmpPath2, 2); 587 } 588 589 private static void runBatchCopyTest() throws Exception { 590 // normal Batch tests for htable1 591 loadData("", row, noRepfamName); 592 593 Scan scan1 = new Scan(); 594 List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH); 595 ResultScanner scanner1 = htable1.getScanner(scan1); 596 Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH); 597 for (Result result : res1) { 598 Put put = new Put(result.getRow()); 599 for (Cell cell : result.rawCells()) { 600 put.add(cell); 601 } 602 puts.add(put); 603 } 604 scanner1.close(); 605 assertEquals(NB_ROWS_IN_BATCH, res1.length); 606 607 // Copy the data to htable3 608 htable3.put(puts); 609 610 Scan scan2 = new Scan(); 611 ResultScanner scanner2 = htable3.getScanner(scan2); 612 Result[] res2 = scanner2.next(NB_ROWS_IN_BATCH); 613 scanner2.close(); 614 assertEquals(NB_ROWS_IN_BATCH, res2.length); 615 } 616 617 @Test 618 public void testVerifyRepJobWithPeerTableName() throws Exception { 619 // Populate the tables with same data 620 runBatchCopyTest(); 621 622 // with a peerTableName along with quorum address (a cluster key) 623 String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(), 624 utility2.getClusterKey(), tableName.getNameAsString() }; 625 runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); 626 627 utility2.deleteTableData(peerTableName); 628 runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); 629 } 630 631 @Test 632 public void testVerifyRepJobWithPeerTableNameAndSnapshotSupport() throws Exception { 633 // Populate the tables with same data 634 runBatchCopyTest(); 635 636 // Take source and target tables snapshot 637 Path rootDir = FSUtils.getRootDir(conf1); 638 FileSystem fs = rootDir.getFileSystem(conf1); 639 String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis(); 640 SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName, 641 Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true); 642 643 // Take target snapshot 644 Path peerRootDir = FSUtils.getRootDir(conf2); 645 FileSystem peerFs = peerRootDir.getFileSystem(conf2); 646 String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis(); 647 SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), peerTableName, 648 Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true); 649 650 String peerFSAddress = peerFs.getUri().toString(); 651 String tmpPath1 = utility1.getRandomDir().toString(); 652 String tmpPath2 = "/tmp" + System.currentTimeMillis(); 653 654 String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(), 655 "--sourceSnapshotName=" + sourceSnapshotName, 656 "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName, 657 "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress, 658 "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(), 659 tableName.getNameAsString() }; 660 runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); 661 checkRestoreTmpDir(conf1, tmpPath1, 1); 662 checkRestoreTmpDir(conf2, tmpPath2, 1); 663 664 Scan scan = new Scan(); 665 ResultScanner rs = htable3.getScanner(scan); 666 Put put = null; 667 for (Result result : rs) { 668 put = new Put(result.getRow()); 669 Cell firstVal = result.rawCells()[0]; 670 put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal), 671 Bytes.toBytes("diff data")); 672 htable3.put(put); 673 } 674 Delete delete = new Delete(put.getRow()); 675 htable3.delete(delete); 676 677 sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis(); 678 SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName, 679 Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true); 680 681 peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis(); 682 SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), peerTableName, 683 Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true); 684 685 args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(), 686 "--sourceSnapshotName=" + sourceSnapshotName, 687 "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName, 688 "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress, 689 "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(), 690 tableName.getNameAsString() }; 691 runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); 692 checkRestoreTmpDir(conf1, tmpPath1, 2); 693 checkRestoreTmpDir(conf2, tmpPath2, 2); 694 } 695 696 @AfterClass 697 public static void tearDownAfterClass() throws Exception { 698 htable3.close(); 699 TestReplicationBase.tearDownAfterClass(); 700 } 701}