/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test cases for the {@link VerifyReplication} job. The clusters, tables and constants used here
 * ({@code UTIL1}, {@code UTIL2}, {@code CONF1}, {@code CONF2}, {@code htable1}, {@code htable2},
 * {@code tableName}, ...) as well as {@code getClusterKey} are not declared in this class; they are
 * inherited through {@link TestReplicationBase}.
 * <p>
 * In addition to the inherited tables, this class creates a table named {@code peerTest} on the
 * cluster configured by {@code CONF2} and uses it for the {@code --peerTableName} test cases.
 */
public abstract class VerifyReplicationTestBase extends TestReplicationBase {

  private static final Logger LOG = LoggerFactory.getLogger(VerifyReplicationTestBase.class);

  private static final String PEER_ID = "2";
  private static final TableName peerTableName = TableName.valueOf("peerTest");

  /** Connection that backs {@link #htable3}; kept so it can be closed on teardown. */
  private static Connection peerConnection;
  private static Table htable3;

  /**
   * Creates the {@code peerTest} table (single family {@code noRepfamName}, 100 max versions) on
   * the cluster configured by {@code CONF2} and opens {@link #htable3} on it.
   */
  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    TableDescriptor peerTable = TableDescriptorBuilder.newBuilder(peerTableName)
      .setColumnFamily(
        ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).setMaxVersions(100).build())
      .build();

    peerConnection = ConnectionFactory.createConnection(CONF2);
    try (Admin admin2 = peerConnection.getAdmin()) {
      admin2.createTable(peerTable, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
    }
    htable3 = peerConnection.getTable(peerTableName);
  }

  /** Runs the inherited {@code cleanUp()} and empties the {@code peerTest} table. */
  @BeforeEach
  public void setUp() throws Exception {
    cleanUp();
    UTIL2.deleteTableData(peerTableName);
  }

  /**
   * Closes the peer table and its connection, then delegates to
   * {@link TestReplicationBase#tearDownAfterClass()}. The delegation happens in a {@code finally}
   * block so it still runs when setup never got as far as opening the table or when closing fails.
   */
  @AfterAll
  public static void tearDownAfterClass() throws Exception {
    try {
      if (htable3 != null) {
        htable3.close();
      }
    } finally {
      try {
        if (peerConnection != null) {
          peerConnection.close();
        }
      } finally {
        TestReplicationBase.tearDownAfterClass();
      }
    }
  }

  /**
   * Creates a {@link VerifyReplication} job from a copy of {@code CONF1} and the given arguments,
   * waits for it to complete and checks the GOODROWS and BADROWS counters.
   * @param args             command line arguments handed to {@link VerifyReplication}
   * @param expectedGoodRows expected value of the GOODROWS counter
   * @param expectedBadRows  expected value of the BADROWS counter
   * @return the counters of the completed job
   */
  static Counters runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
    throws IOException, InterruptedException, ClassNotFoundException {
    Job job = new VerifyReplication().createSubmittableJob(new Configuration(CONF1), args);
    assertNotNull(job, "Job wasn't created, see the log");
    assertTrue(job.waitForCompletion(true), "Job failed, see the log");

    Counters counters = job.getCounters();
    assertEquals(expectedGoodRows,
      counters.findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
    assertEquals(expectedBadRows,
      counters.findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
    return counters;
  }

  /**
   * Asserts that {@code restoreTmpDir} contains exactly {@code expectedCount} entries and that all
   * of them are directories.
   * @param conf          configuration used to obtain the {@link FileSystem}
   * @param restoreTmpDir directory to list
   * @param expectedCount expected number of sub directories
   */
  static void checkRestoreTmpDir(Configuration conf, String restoreTmpDir, int expectedCount)
    throws IOException {
    FileSystem fs = FileSystem.get(conf);
    FileStatus[] subDirectories = fs.listStatus(new Path(restoreTmpDir));
    assertNotNull(subDirectories);
    assertEquals(expectedCount, subDirectories.length);
    for (FileStatus subDirectory : subDirectories) {
      assertTrue(subDirectory.isDirectory());
    }
  }

  /**
   * Loads data into {@code htable1} through the inherited {@code loadData}, copies the first
   * {@code NB_ROWS_IN_BATCH} rows cell by cell into {@link #htable3} and checks that both tables
   * return {@code NB_ROWS_IN_BATCH} rows.
   */
  static void runBatchCopyTest() throws Exception {
    // normal Batch tests for htable1
    loadData("", row, noRepfamName);

    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
    Result[] sourceRows;
    try (ResultScanner sourceScanner = htable1.getScanner(new Scan())) {
      sourceRows = sourceScanner.next(NB_ROWS_IN_BATCH);
    }
    for (Result result : sourceRows) {
      Put put = new Put(result.getRow());
      for (Cell cell : result.rawCells()) {
        put.add(cell);
      }
      puts.add(put);
    }
    assertEquals(NB_ROWS_IN_BATCH, sourceRows.length);

    // Copy the data to htable3
    htable3.put(puts);

    Result[] peerRows;
    try (ResultScanner peerScanner = htable3.getScanner(new Scan())) {
      peerRows = peerScanner.next(NB_ROWS_IN_BATCH);
    }
    assertEquals(NB_ROWS_IN_BATCH, peerRows.length);
  }

  /**
   * Scans {@code table}, overwrites the first cell of every row with the value "diff data" and then
   * deletes the last row that was scanned. Callers expect every row to be reported as bad by the
   * next verification run.
   * @param table table to modify; must contain at least one row
   */
  private static void overwriteAllRowsAndDeleteLast(Table table) throws IOException {
    Put lastPut = null;
    try (ResultScanner scanner = table.getScanner(new Scan())) {
      for (Result result : scanner) {
        lastPut = new Put(result.getRow());
        Cell firstVal = result.rawCells()[0];
        lastPut.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
          Bytes.toBytes("diff data"));
        table.put(lastPut);
      }
    }
    // Fail with a readable message instead of a NullPointerException on an empty table.
    assertNotNull(lastPut, "Expected at least one row in " + table.getName());
    table.delete(new Delete(lastPut.getRow()));
  }

  /**
   * Takes a snapshot named {@code namePrefix} followed by the current time and validates it.
   * @return the name of the snapshot that was taken
   */
  private static String takeSnapshot(Admin admin, TableName table, byte[] family,
    String namePrefix, Path rootDir, FileSystem fs) throws Exception {
    String snapshotName = namePrefix + EnvironmentEdgeManager.currentTime();
    SnapshotTestingUtils.createSnapshotAndValidate(admin, table, Bytes.toString(family),
      snapshotName, rootDir, fs, true);
    return snapshotName;
  }

  /**
   * Builds the {@link VerifyReplication} arguments for a snapshot based run.
   * @param withPeerTableName whether to prepend {@code --peerTableName=peerTest}
   */
  private String[] snapshotArgs(boolean withPeerTableName, String sourceSnapshotName,
    String sourceTmpDir, String peerSnapshotName, String peerTmpDir, String peerFSAddress,
    Path peerRootDir) throws Exception {
    List<String> args = new ArrayList<>();
    if (withPeerTableName) {
      args.add("--peerTableName=" + peerTableName.getNameAsString());
    }
    args.add("--sourceSnapshotName=" + sourceSnapshotName);
    args.add("--sourceSnapshotTmpDir=" + sourceTmpDir);
    args.add("--peerSnapshotName=" + peerSnapshotName);
    args.add("--peerSnapshotTmpDir=" + peerTmpDir);
    args.add("--peerFSAddress=" + peerFSAddress);
    args.add("--peerHBaseRootAddress=" + peerRootDir);
    args.add(getClusterKey(UTIL2));
    args.add(tableName.getNameAsString());
    return args.toArray(new String[0]);
  }

  /**
   * Shared body of the snapshot based tests. Snapshots the source table {@code tableName} and the
   * given peer table, verifies that all rows are good, modifies every row of the peer table, takes
   * fresh snapshots and verifies that all rows are bad. After each run the restore temp directories
   * are expected to hold one more sub directory.
   * @param peerTable         name of the table to snapshot on the peer cluster
   * @param peerHTable        handle used to modify the peer table between the two runs
   * @param family            column family passed to the snapshot validation
   * @param withPeerTableName whether to pass {@code --peerTableName} to the job
   */
  private void verifyWithSnapshots(TableName peerTable, Table peerHTable, byte[] family,
    boolean withPeerTableName) throws Exception {
    Path rootDir = CommonFSUtils.getRootDir(CONF1);
    FileSystem fs = rootDir.getFileSystem(CONF1);
    Path peerRootDir = CommonFSUtils.getRootDir(CONF2);
    FileSystem peerFs = peerRootDir.getFileSystem(CONF2);

    // Take source and target tables snapshot
    String sourceSnapshotName =
      takeSnapshot(UTIL1.getAdmin(), tableName, family, "sourceSnapshot-", rootDir, fs);
    String peerSnapshotName =
      takeSnapshot(UTIL2.getAdmin(), peerTable, family, "peerSnapshot-", peerRootDir, peerFs);

    String peerFSAddress = peerFs.getUri().toString();
    String tmpPath1 = UTIL1.getRandomDir().toString();
    String tmpPath2 = "/tmp" + EnvironmentEdgeManager.currentTime();

    String[] args = snapshotArgs(withPeerTableName, sourceSnapshotName, tmpPath1, peerSnapshotName,
      tmpPath2, peerFSAddress, peerRootDir);
    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
    checkRestoreTmpDir(CONF1, tmpPath1, 1);
    checkRestoreTmpDir(CONF2, tmpPath2, 1);

    overwriteAllRowsAndDeleteLast(peerHTable);

    // New snapshots are needed so that the job sees the modified peer data.
    sourceSnapshotName =
      takeSnapshot(UTIL1.getAdmin(), tableName, family, "sourceSnapshot-", rootDir, fs);
    peerSnapshotName =
      takeSnapshot(UTIL2.getAdmin(), peerTable, family, "peerSnapshot-", peerRootDir, peerFs);

    args = snapshotArgs(withPeerTableName, sourceSnapshotName, tmpPath1, peerSnapshotName,
      tmpPath2, peerFSAddress, peerRootDir);
    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
    checkRestoreTmpDir(CONF1, tmpPath1, 2);
    checkRestoreTmpDir(CONF2, tmpPath2, 2);
  }

  /**
   * Adds three rows that make source and peer table differ: one row that only exists in
   * {@link #htable3}, one row in {@link #htable3} with a different value, and one row that only
   * exists in {@code htable1}.
   */
  private static void addMismatchedRows() throws IOException {
    // ONLY_IN_PEER_TABLE_ROWS
    Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH)));
    put.addColumn(noRepfamName, row, row);
    htable3.put(put);

    // CONTENT_DIFFERENT_ROWS
    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1)));
    put.addColumn(noRepfamName, row, Bytes.toBytes("diff value"));
    htable3.put(put);

    // ONLY_IN_SOURCE_TABLE_ROWS
    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1)));
    put.addColumn(noRepfamName, row, row);
    htable1.put(put);
  }

  /**
   * Asserts the recompare counters and that exactly one row was counted for each of the three
   * mismatch categories.
   * @param counters           counters of the finished job
   * @param expectedRecompares expected value of both FAILED_RECOMPARE and RECOMPARES
   */
  private static void assertMismatchCounters(Counters counters, int expectedRecompares) {
    assertEquals(expectedRecompares,
      counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue());
    assertEquals(expectedRecompares,
      counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue());
    assertEquals(1,
      counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue());
    assertEquals(1,
      counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue());
    assertEquals(1, counters
      .findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue());
  }

  /**
   * Do a small loading into a table, make sure the data is really the same, then run the
   * VerifyReplication job to check the results. Do a second comparison where all the cells are
   * different.
   */
  @Test
  public void testVerifyRepJob() throws Exception {
    // Populate the tables, at the same time it guarantees that the tables are
    // identical since it does the check
    runSmallBatchTest();

    String[] args = new String[] { PEER_ID, tableName.getNameAsString() };
    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);

    overwriteAllRowsAndDeleteLast(htable2);
    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
  }

  /**
   * Load a row into a table, make sure the data is really the same, delete the row, make sure the
   * delete marker is replicated, run verify replication with and without raw to check the results.
   */
  @Test
  public void testVerifyRepJobWithRawOptions(TestInfo testInfo) throws Exception {
    String methodName = testInfo.getTestMethod().get().getName();
    LOG.info(methodName);

    // Named differently from the inherited tableName/row so the shared fixtures are not shadowed.
    final TableName rawTableName = TableName.valueOf(methodName);
    byte[] familyname = Bytes.toBytes("fam_raw");
    byte[] rawRow = Bytes.toBytes("row_raw");

    ColumnFamilyDescriptor fam = ColumnFamilyDescriptorBuilder.newBuilder(familyname)
      .setMaxVersions(100).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build();
    TableDescriptor table =
      TableDescriptorBuilder.newBuilder(rawTableName).setColumnFamily(fam).build();

    // These connections are only needed for table creation, so close them right away.
    try (Connection connection1 = ConnectionFactory.createConnection(CONF1);
      Admin admin1 = connection1.getAdmin()) {
      admin1.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
    }
    try (Connection connection2 = ConnectionFactory.createConnection(CONF2);
      Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
    }
    UTIL1.waitUntilAllRegionsAssigned(rawTableName);
    UTIL2.waitUntilAllRegionsAssigned(rawTableName);

    try (Table lHtable1 = UTIL1.getConnection().getTable(rawTableName);
      Table lHtable2 = UTIL2.getConnection().getTable(rawTableName)) {
      Put put = new Put(rawRow);
      put.addColumn(familyname, rawRow, rawRow);
      lHtable1.put(put);

      Get get = new Get(rawRow);
      for (int i = 0; i < NB_RETRIES; i++) {
        if (i == NB_RETRIES - 1) {
          fail("Waited too much time for put replication");
        }
        Result res = lHtable2.get(get);
        if (res.isEmpty()) {
          LOG.info("Row not available");
          Thread.sleep(SLEEP_TIME);
        } else {
          assertArrayEquals(rawRow, res.value());
          break;
        }
      }

      lHtable1.delete(new Delete(rawRow));

      for (int i = 0; i < NB_RETRIES; i++) {
        if (i == NB_RETRIES - 1) {
          fail("Waited too much time for del replication");
        }
        Result res = lHtable2.get(get);
        if (res.size() >= 1) {
          LOG.info("Row not deleted");
          Thread.sleep(SLEEP_TIME);
        } else {
          break;
        }
      }

      // Checking verifyReplication for the default behavior.
      String[] argsWithoutRaw = new String[] { PEER_ID, rawTableName.getNameAsString() };
      runVerifyReplication(argsWithoutRaw, 0, 0);

      // Checking verifyReplication with raw
      String[] argsWithRawAsTrue =
        new String[] { "--raw", PEER_ID, rawTableName.getNameAsString() };
      runVerifyReplication(argsWithRawAsTrue, 1, 0);
    }
  }

  /**
   * Same flow as {@link #testVerifyRepJob()}, but the peer is addressed by the value returned from
   * {@code getClusterKey(UTIL2)} instead of the peer id.
   */
  @Test
  public void testVerifyRepJobWithQuorumAddress() throws Exception {
    // Populate the tables, at the same time it guarantees that the tables are
    // identical since it does the check
    runSmallBatchTest();

    // with a quorum address (a cluster key)
    String[] args = new String[] { getClusterKey(UTIL2), tableName.getNameAsString() };
    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);

    overwriteAllRowsAndDeleteLast(htable2);
    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
  }

  /**
   * Verifies {@code tableName} on both clusters from snapshots, first with identical data and then
   * after every row of {@code htable2} has been modified.
   */
  @Test
  public void testVerifyRepJobWithQuorumAddressAndSnapshotSupport() throws Exception {
    // Populate the tables, at the same time it guarantees that the tables are
    // identical since it does the check
    runSmallBatchTest();

    verifyWithSnapshots(tableName, htable2, famName, false);
  }

  /**
   * Verifies {@code tableName} against the differently named {@code peerTest} table, first with
   * identical data and then after the peer table has been emptied.
   */
  @Test
  public void testVerifyRepJobWithPeerTableName() throws Exception {
    // Populate the tables with same data
    runBatchCopyTest();

    // with a peerTableName along with quorum address (a cluster key)
    String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
      getClusterKey(UTIL2), tableName.getNameAsString() };
    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);

    UTIL2.deleteTableData(peerTableName);
    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
  }

  /**
   * Verifies {@code tableName} against {@code peerTest} from snapshots, first with identical data
   * and then after every row of the peer table has been modified.
   */
  @Test
  public void testVerifyRepJobWithPeerTableNameAndSnapshotSupport() throws Exception {
    // Populate the tables with same data
    runBatchCopyTest();

    verifyWithSnapshots(peerTableName, htable3, noRepfamName, true);
  }

  /**
   * Runs the job with {@code --recompareThreads=10} and three recompare tries against tables that
   * differ in three rows, and checks the resulting counters.
   */
  @Test
  public void testVerifyReplicationThreadedRecompares() throws Exception {
    // Populate the tables with same data
    runBatchCopyTest();
    addMismatchedRows();

    String[] args = new String[] { "--recompareThreads=10", "--recompareTries=3",
      "--recompareSleep=1", "--peerTableName=" + peerTableName.getNameAsString(),
      getClusterKey(UTIL2), tableName.getNameAsString() };
    Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3);
    assertMismatchCounters(counters, 9);
  }

  /**
   * Runs the job with a single recompare thread and a very long recompare sleep against tables
   * that differ in three rows, and checks the resulting counters.
   */
  @Test
  public void testFailsRemainingComparesAfterShutdown() throws Exception {
    // Populate the tables with same data
    runBatchCopyTest();
    addMismatchedRows();

    /*
     * recompareSleep is set to exceed how long we wait on VerifyReplication#reCompareExecutor
     * termination when doing cleanup. This allows us to test the counter-incrementing logic if the
     * executor still hasn't terminated after the call to shutdown and awaitTermination.
     */
    String[] args = new String[] { "--recompareThreads=1", "--recompareTries=1",
      "--recompareSleep=121000", "--peerTableName=" + peerTableName.getNameAsString(),
      getClusterKey(UTIL2), tableName.getNameAsString() };

    Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3);
    assertMismatchCounters(counters, 3);
  }

  /**
   * Runs the job with three recompare tries and no {@code --recompareThreads} option against
   * tables that differ in three rows, and checks the resulting counters.
   */
  @Test
  public void testVerifyReplicationSynchronousRecompares() throws Exception {
    // Populate the tables with same data
    runBatchCopyTest();
    addMismatchedRows();

    String[] args = new String[] { "--recompareTries=3", "--recompareSleep=1",
      "--peerTableName=" + peerTableName.getNameAsString(), getClusterKey(UTIL2),
      tableName.getNameAsString() };
    Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3);
    assertMismatchCounters(counters, 9);
  }
}