/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for tests that run the {@link VerifyReplication} job and check its row counters. The
 * source side is addressed through {@code CONF1}/{@code UTIL1} and the peer side through
 * {@code CONF2}/{@code UTIL2}, all of which are inherited from {@link TestReplicationBase}.
 * <p>
 * In addition to the tables provided by the base class, this class creates a separately named
 * table ({@code peerTest}) on the peer side, used by the {@code --peerTableName} tests.
 */
public abstract class VerifyReplicationTestBase extends TestReplicationBase {

  private static final Logger LOG = LoggerFactory.getLogger(VerifyReplicationTestBase.class);

  private static final String PEER_ID = "2";
  private static final TableName peerTableName = TableName.valueOf("peerTest");

  /** Connection backing {@link #htable3}; kept so that it can be closed on tear down. */
  private static Connection peerConnection;
  private static Table htable3;

  /** Resets the state shared between tests and empties the {@code peerTest} table. */
  @BeforeEach
  public void setUp() throws Exception {
    cleanUp();
    UTIL2.deleteTableData(peerTableName);
  }

  /** Creates the {@code peerTest} table on the peer side and opens {@link #htable3} on it. */
  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    TableDescriptor peerTable =
      TableDescriptorBuilder.newBuilder(peerTableName)
        .setColumnFamily(
          ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).setMaxVersions(100).build())
        .build();

    // Stored in a field before it is used so that tearDownAfterClass can always release it.
    peerConnection = ConnectionFactory.createConnection(CONF2);
    try (Admin admin2 = peerConnection.getAdmin()) {
      admin2.createTable(peerTable, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
    }
    htable3 = peerConnection.getTable(peerTableName);
  }

  /**
   * Runs the {@link VerifyReplication} job to completion and asserts its good/bad row counters.
   * @param args             command line arguments handed to the job
   * @param expectedGoodRows expected value of the {@code GOODROWS} counter
   * @param expectedBadRows  expected value of the {@code BADROWS} counter
   * @return the counters of the finished job, for further assertions by the caller
   */
  static Counters runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
    throws IOException, InterruptedException, ClassNotFoundException {
    Job job = new VerifyReplication().createSubmittableJob(new Configuration(CONF1), args);
    assertNotNull(job, "Job wasn't created, see the log");
    assertTrue(job.waitForCompletion(true), "Job failed, see the log");

    Counters counters = job.getCounters();
    assertEquals(expectedGoodRows,
      counters.findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
    assertEquals(expectedBadRows,
      counters.findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
    return counters;
  }

  /**
   * Do a small loading into a table, make sure the data is really the same, then run the
   * VerifyReplication job to check the results. Do a second comparison where all the cells are
   * different.
   */
  @Test
  public void testVerifyRepJob() throws Exception {
    verifyIdenticalThenDiverged(PEER_ID);
  }

  /**
   * Load a row into a table, make sure the data is really the same, delete the row, make sure the
   * delete marker is replicated, run verify replication with and without raw to check the results.
   */
  @Test
  public void testVerifyRepJobWithRawOptions(TestInfo testInfo) throws Exception {
    String methodName = testInfo.getTestMethod()
      .orElseThrow(() -> new IllegalStateException("TestInfo carries no test method")).getName();
    LOG.info(methodName);

    // The table is named after the test method; the local name avoids shadowing the inherited
    // tableName field.
    final TableName rawTableName = TableName.valueOf(methodName);
    byte[] familyname = Bytes.toBytes("fam_raw");
    byte[] rawRow = Bytes.toBytes("row_raw");

    ColumnFamilyDescriptor fam = ColumnFamilyDescriptorBuilder.newBuilder(familyname)
      .setMaxVersions(100).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build();
    TableDescriptor table =
      TableDescriptorBuilder.newBuilder(rawTableName).setColumnFamily(fam).build();

    // These connections are only needed to create the table, so release them right away.
    try (Connection connection1 = ConnectionFactory.createConnection(CONF1);
      Admin admin1 = connection1.getAdmin()) {
      admin1.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
    }
    try (Connection connection2 = ConnectionFactory.createConnection(CONF2);
      Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
    }
    UTIL1.waitUntilAllRegionsAssigned(rawTableName);
    UTIL2.waitUntilAllRegionsAssigned(rawTableName);

    try (Table lHtable1 = UTIL1.getConnection().getTable(rawTableName);
      Table lHtable2 = UTIL2.getConnection().getTable(rawTableName)) {
      Put put = new Put(rawRow);
      put.addColumn(familyname, rawRow, rawRow);
      lHtable1.put(put);
      waitForPutReplication(lHtable2, rawRow);

      lHtable1.delete(new Delete(rawRow));
      waitForDeleteReplication(lHtable2, rawRow);

      // Checking verifyReplication for the default behavior.
      String[] argsWithoutRaw = new String[] { PEER_ID, rawTableName.getNameAsString() };
      runVerifyReplication(argsWithoutRaw, 0, 0);

      // Checking verifyReplication with raw
      String[] argsWithRawAsTrue =
        new String[] { "--raw", PEER_ID, rawTableName.getNameAsString() };
      runVerifyReplication(argsWithRawAsTrue, 1, 0);
    }
  }

  /**
   * Asserts that {@code restoreTmpDir} contains exactly {@code expectedCount} entries and that
   * every one of them is a directory.
   * @param conf          configuration used to resolve the file system
   * @param restoreTmpDir directory to list
   * @param expectedCount expected number of sub directories
   */
  static void checkRestoreTmpDir(Configuration conf, String restoreTmpDir, int expectedCount)
    throws IOException {
    FileSystem fs = FileSystem.get(conf);
    FileStatus[] subDirectories = fs.listStatus(new Path(restoreTmpDir));
    assertNotNull(subDirectories);
    assertEquals(expectedCount, subDirectories.length);
    for (FileStatus subDirectory : subDirectories) {
      assertTrue(subDirectory.isDirectory());
    }
  }

  /** Same scenario as {@link #testVerifyRepJob()}, addressing the peer by its cluster key. */
  @Test
  public void testVerifyRepJobWithQuorumAddress() throws Exception {
    // with a quorum address (a cluster key)
    verifyIdenticalThenDiverged(getClusterKey(UTIL2));
  }

  /**
   * Verifies from snapshots of the source table and of the same-named table on the peer side,
   * first with identical content and then after the peer table has been modified.
   */
  @Test
  public void testVerifyRepJobWithQuorumAddressAndSnapshotSupport() throws Exception {
    // Populate the tables, at the same time it guarantees that the tables are
    // identical since it does the check
    runSmallBatchTest();

    verifyWithSnapshots(famName, tableName, htable2, false);
  }

  /**
   * Loads a batch into {@code htable1}, copies every scanned cell into {@link #htable3} and
   * asserts that both tables return {@code NB_ROWS_IN_BATCH} rows.
   */
  static void runBatchCopyTest() throws Exception {
    // normal Batch tests for htable1
    loadData("", row, noRepfamName);

    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
    Result[] sourceRows;
    try (ResultScanner sourceScanner = htable1.getScanner(new Scan())) {
      sourceRows = sourceScanner.next(NB_ROWS_IN_BATCH);
    }
    for (Result result : sourceRows) {
      Put put = new Put(result.getRow());
      for (Cell cell : result.rawCells()) {
        put.add(cell);
      }
      puts.add(put);
    }
    assertEquals(NB_ROWS_IN_BATCH, sourceRows.length);

    // Copy the data to htable3
    htable3.put(puts);

    Result[] copiedRows;
    try (ResultScanner peerScanner = htable3.getScanner(new Scan())) {
      copiedRows = peerScanner.next(NB_ROWS_IN_BATCH);
    }
    assertEquals(NB_ROWS_IN_BATCH, copiedRows.length);
  }

  /**
   * Verifies against a differently named peer table: all rows are good right after the copy, and
   * all rows are bad once the peer table has been emptied.
   */
  @Test
  public void testVerifyRepJobWithPeerTableName() throws Exception {
    // Populate the tables with same data
    runBatchCopyTest();

    // with a peerTableName along with quorum address (a cluster key)
    String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
      getClusterKey(UTIL2), tableName.getNameAsString() };
    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);

    UTIL2.deleteTableData(peerTableName);
    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
  }

  /**
   * Verifies from snapshots of the source table and of the differently named peer table, first
   * with identical content and then after the peer table has been modified.
   */
  @Test
  public void testVerifyRepJobWithPeerTableNameAndSnapshotSupport() throws Exception {
    // Populate the tables with same data
    runBatchCopyTest();

    verifyWithSnapshots(noRepfamName, peerTableName, htable3, true);
  }

  /** Runs the job with a pool of recompare threads and checks the per-category counters. */
  @Test
  public void testVerifyReplicationThreadedRecompares() throws Exception {
    // Populate the tables with same data
    runBatchCopyTest();
    putRecompareMismatches();

    String[] args = new String[] { "--recompareThreads=10", "--recompareTries=3",
      "--recompareSleep=1", "--peerTableName=" + peerTableName.getNameAsString(),
      getClusterKey(UTIL2), tableName.getNameAsString() };
    Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3);
    assertRecompareCounters(counters, 9);
  }

  /**
   * Runs the job with a recompare sleep long enough that recompares are still pending when the
   * job cleans up, and checks that the counters are nevertheless filled in.
   */
  @Test
  public void testFailsRemainingComparesAfterShutdown() throws Exception {
    // Populate the tables with same data
    runBatchCopyTest();
    putRecompareMismatches();

    // recompareSleep is set to exceed how long we wait on VerifyReplication#reCompareExecutor
    // termination when doing cleanup. this allows us to test the counter-incrementing logic if
    // the executor still hasn't terminated after the call to shutdown and awaitTermination
    String[] args = new String[] { "--recompareThreads=1", "--recompareTries=1",
      "--recompareSleep=121000", "--peerTableName=" + peerTableName.getNameAsString(),
      getClusterKey(UTIL2), tableName.getNameAsString() };

    Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3);
    assertRecompareCounters(counters, 3);
  }

  /** Runs the job with recompares but no recompare threads and checks the counters. */
  @Test
  public void testVerifyReplicationSynchronousRecompares() throws Exception {
    // Populate the tables with same data
    runBatchCopyTest();
    putRecompareMismatches();

    String[] args = new String[] { "--recompareTries=3", "--recompareSleep=1",
      "--peerTableName=" + peerTableName.getNameAsString(), getClusterKey(UTIL2),
      tableName.getNameAsString() };
    Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3);
    assertRecompareCounters(counters, 9);
  }

  /**
   * Closes {@link #htable3} and its connection, then runs the base class tear down. The base
   * class tear down runs even if closing one of the resources fails.
   */
  @AfterAll
  public static void tearDownAfterClass() throws Exception {
    try {
      // Either may be null if setUpBeforeClass failed part way through.
      if (htable3 != null) {
        htable3.close();
      }
    } finally {
      try {
        if (peerConnection != null) {
          peerConnection.close();
        }
      } finally {
        TestReplicationBase.tearDownAfterClass();
      }
    }
  }

  /**
   * Shared body of {@link #testVerifyRepJob()} and {@link #testVerifyRepJobWithQuorumAddress()}:
   * expects every row to be good after the small batch load, and every row to be bad after the
   * peer table has been modified.
   * @param peer first positional job argument, either a peer id or a cluster key
   */
  private void verifyIdenticalThenDiverged(String peer) throws Exception {
    // Populate the tables, at the same time it guarantees that the tables are
    // identical since it does the check
    runSmallBatchTest();

    String[] args = new String[] { peer, tableName.getNameAsString() };
    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);

    overwriteAllRowsAndDeleteLast(htable2);
    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
  }

  /**
   * Shared body of the two snapshot tests. Takes a snapshot on each side and expects every row to
   * be good, then modifies the peer table, takes fresh snapshots and expects every row to be bad.
   * After each run the restore directories are expected to hold one more sub directory.
   * @param family           column family handed to the snapshot validation
   * @param peerTable        name of the table to snapshot on the peer side
   * @param peerHandle       handle on {@code peerTable}, used to modify its rows
   * @param usePeerTableName whether to pass {@code --peerTableName} to the job
   */
  private void verifyWithSnapshots(byte[] family, TableName peerTable, Table peerHandle,
    boolean usePeerTableName) throws Exception {
    // Take source and target tables snapshot
    Path rootDir = CommonFSUtils.getRootDir(CONF1);
    FileSystem fs = rootDir.getFileSystem(CONF1);
    String sourceSnapshotName =
      takeSnapshot(UTIL1.getAdmin(), tableName, family, "sourceSnapshot-", rootDir, fs);

    // Take target snapshot
    Path peerRootDir = CommonFSUtils.getRootDir(CONF2);
    FileSystem peerFs = peerRootDir.getFileSystem(CONF2);
    String peerSnapshotName =
      takeSnapshot(UTIL2.getAdmin(), peerTable, family, "peerSnapshot-", peerRootDir, peerFs);

    String peerFSAddress = peerFs.getUri().toString();
    String tmpPath1 = UTIL1.getRandomDir().toString();
    String tmpPath2 = "/tmp" + EnvironmentEdgeManager.currentTime();

    String[] args = snapshotArgs(usePeerTableName, sourceSnapshotName, tmpPath1,
      peerSnapshotName, tmpPath2, peerFSAddress);
    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
    checkRestoreTmpDir(CONF1, tmpPath1, 1);
    checkRestoreTmpDir(CONF2, tmpPath2, 1);

    overwriteAllRowsAndDeleteLast(peerHandle);

    // The job reads from snapshots, so new ones are needed to pick up the modification.
    sourceSnapshotName =
      takeSnapshot(UTIL1.getAdmin(), tableName, family, "sourceSnapshot-", rootDir, fs);
    peerSnapshotName =
      takeSnapshot(UTIL2.getAdmin(), peerTable, family, "peerSnapshot-", peerRootDir, peerFs);

    args = snapshotArgs(usePeerTableName, sourceSnapshotName, tmpPath1, peerSnapshotName,
      tmpPath2, peerFSAddress);
    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
    checkRestoreTmpDir(CONF1, tmpPath1, 2);
    checkRestoreTmpDir(CONF2, tmpPath2, 2);
  }

  /** Builds the job arguments for a snapshot based run, in the order the tests rely on. */
  private String[] snapshotArgs(boolean usePeerTableName, String sourceSnapshotName,
    String sourceTmpDir, String peerSnapshotName, String peerTmpDir, String peerFSAddress)
    throws Exception {
    List<String> args = new ArrayList<>();
    if (usePeerTableName) {
      args.add("--peerTableName=" + peerTableName.getNameAsString());
    }
    args.add("--sourceSnapshotName=" + sourceSnapshotName);
    args.add("--sourceSnapshotTmpDir=" + sourceTmpDir);
    args.add("--peerSnapshotName=" + peerSnapshotName);
    args.add("--peerSnapshotTmpDir=" + peerTmpDir);
    args.add("--peerFSAddress=" + peerFSAddress);
    args.add("--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2));
    args.add(getClusterKey(UTIL2));
    args.add(tableName.getNameAsString());
    return args.toArray(new String[0]);
  }

  /**
   * Takes and validates a snapshot whose name is {@code namePrefix} followed by the current time.
   * @return the name of the snapshot that was taken
   */
  private static String takeSnapshot(Admin admin, TableName table, byte[] family,
    String namePrefix, Path rootDir, FileSystem fs) throws Exception {
    String snapshotName = namePrefix + EnvironmentEdgeManager.currentTime();
    SnapshotTestingUtils.createSnapshotAndValidate(admin, table, Bytes.toString(family),
      snapshotName, rootDir, fs, true);
    return snapshotName;
  }

  /**
   * Overwrites the first cell of every row of {@code table} with a different value and then
   * deletes the last row that was scanned.
   */
  private static void overwriteAllRowsAndDeleteLast(Table table) throws IOException {
    byte[] lastRow = null;
    try (ResultScanner scanner = table.getScanner(new Scan())) {
      for (Result result : scanner) {
        Cell firstVal = result.rawCells()[0];
        Put put = new Put(result.getRow());
        put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
          Bytes.toBytes("diff data"));
        table.put(put);
        lastRow = result.getRow();
      }
    }
    assertNotNull(lastRow, "Expected the table to contain at least one row");
    table.delete(new Delete(lastRow));
  }

  /**
   * Writes three rows after the batch copy, one for each mismatch category that the recompare
   * tests assert on.
   */
  private static void putRecompareMismatches() throws IOException {
    // ONLY_IN_PEER_TABLE_ROWS
    Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH)));
    put.addColumn(noRepfamName, row, row);
    htable3.put(put);

    // CONTENT_DIFFERENT_ROWS
    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1)));
    put.addColumn(noRepfamName, row, Bytes.toBytes("diff value"));
    htable3.put(put);

    // ONLY_IN_SOURCE_TABLE_ROWS
    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1)));
    put.addColumn(noRepfamName, row, row);
    htable1.put(put);
  }

  /**
   * Asserts the recompare counters and that each mismatch category was counted exactly once.
   * @param expectedRecompares expected value of both {@code RECOMPARES} and
   *                           {@code FAILED_RECOMPARE}
   */
  private static void assertRecompareCounters(Counters counters, long expectedRecompares) {
    assertEquals(expectedRecompares,
      counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue());
    assertEquals(expectedRecompares,
      counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue());
    assertEquals(1,
      counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue());
    assertEquals(1,
      counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue());
    assertEquals(1, counters
      .findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue());
  }

  /**
   * Polls {@code peer} until {@code rowKey} is readable and checks that its value equals the row
   * key. Fails once the retries are used up.
   */
  private void waitForPutReplication(Table peer, byte[] rowKey) throws Exception {
    Get get = new Get(rowKey);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = peer.get(get);
      if (res.isEmpty()) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(rowKey, res.value());
        break;
      }
    }
  }

  /** Polls {@code peer} until {@code rowKey} is gone. Fails once the retries are used up. */
  private void waitForDeleteReplication(Table peer, byte[] rowKey) throws Exception {
    Get get = new Get(rowKey);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for del replication");
      }
      Result res = peer.get(get);
      if (res.size() >= 1) {
        LOG.info("Row not deleted");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }
}