001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.jupiter.api.Assertions.assertArrayEquals;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022import static org.junit.jupiter.api.Assertions.assertNotNull;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024import static org.junit.jupiter.api.Assertions.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CellUtil;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.Admin;
039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
041import org.apache.hadoop.hbase.client.Connection;
042import org.apache.hadoop.hbase.client.ConnectionFactory;
043import org.apache.hadoop.hbase.client.Delete;
044import org.apache.hadoop.hbase.client.Get;
045import org.apache.hadoop.hbase.client.Put;
046import org.apache.hadoop.hbase.client.Result;
047import org.apache.hadoop.hbase.client.ResultScanner;
048import org.apache.hadoop.hbase.client.Scan;
049import org.apache.hadoop.hbase.client.Table;
050import org.apache.hadoop.hbase.client.TableDescriptor;
051import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
052import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
053import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
054import org.apache.hadoop.hbase.util.Bytes;
055import org.apache.hadoop.hbase.util.CommonFSUtils;
056import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
057import org.apache.hadoop.mapreduce.Counters;
058import org.apache.hadoop.mapreduce.Job;
059import org.junit.jupiter.api.AfterAll;
060import org.junit.jupiter.api.BeforeAll;
061import org.junit.jupiter.api.BeforeEach;
062import org.junit.jupiter.api.Test;
063import org.junit.jupiter.api.TestInfo;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066
067public abstract class VerifyReplicationTestBase extends TestReplicationBase {
068
069  private static final Logger LOG = LoggerFactory.getLogger(VerifyReplicationTestBase.class);
070
071  private static final String PEER_ID = "2";
072  private static final TableName peerTableName = TableName.valueOf("peerTest");
073  private static Table htable3;
074
075  @BeforeEach
076  public void setUp() throws Exception {
077    cleanUp();
078    UTIL2.deleteTableData(peerTableName);
079  }
080
081  @BeforeAll
082  public static void setUpBeforeClass() throws Exception {
083
084    TableDescriptor peerTable =
085      TableDescriptorBuilder.newBuilder(peerTableName)
086        .setColumnFamily(
087          ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).setMaxVersions(100).build())
088        .build();
089
090    Connection connection2 = ConnectionFactory.createConnection(CONF2);
091    try (Admin admin2 = connection2.getAdmin()) {
092      admin2.createTable(peerTable, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
093    }
094    htable3 = connection2.getTable(peerTableName);
095  }
096
097  static Counters runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
098    throws IOException, InterruptedException, ClassNotFoundException {
099    Job job = new VerifyReplication().createSubmittableJob(new Configuration(CONF1), args);
100    if (job == null) {
101      fail("Job wasn't created, see the log");
102    }
103    if (!job.waitForCompletion(true)) {
104      fail("Job failed, see the log");
105    }
106    assertEquals(expectedGoodRows,
107      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
108    assertEquals(expectedBadRows,
109      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
110    return job.getCounters();
111  }
112
113  /**
114   * Do a small loading into a table, make sure the data is really the same, then run the
115   * VerifyReplication job to check the results. Do a second comparison where all the cells are
116   * different.
117   */
118  @Test
119  public void testVerifyRepJob() throws Exception {
120    // Populate the tables, at the same time it guarantees that the tables are
121    // identical since it does the check
122    runSmallBatchTest();
123
124    String[] args = new String[] { PEER_ID, tableName.getNameAsString() };
125    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
126
127    Scan scan = new Scan();
128    ResultScanner rs = htable2.getScanner(scan);
129    Put put = null;
130    for (Result result : rs) {
131      put = new Put(result.getRow());
132      Cell firstVal = result.rawCells()[0];
133      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
134        Bytes.toBytes("diff data"));
135      htable2.put(put);
136    }
137    Delete delete = new Delete(put.getRow());
138    htable2.delete(delete);
139    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
140  }
141
142  /**
143   * Load a row into a table, make sure the data is really the same, delete the row, make sure the
144   * delete marker is replicated, run verify replication with and without raw to check the results.
145   */
146  @Test
147  public void testVerifyRepJobWithRawOptions(TestInfo testInfo) throws Exception {
148    LOG.info(testInfo.getTestMethod().get().getName());
149
150    final TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
151    byte[] familyname = Bytes.toBytes("fam_raw");
152    byte[] row = Bytes.toBytes("row_raw");
153
154    Table lHtable1 = null;
155    Table lHtable2 = null;
156
157    try {
158      ColumnFamilyDescriptor fam = ColumnFamilyDescriptorBuilder.newBuilder(familyname)
159        .setMaxVersions(100).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build();
160      TableDescriptor table =
161        TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(fam).build();
162
163      Connection connection1 = ConnectionFactory.createConnection(CONF1);
164      Connection connection2 = ConnectionFactory.createConnection(CONF2);
165      try (Admin admin1 = connection1.getAdmin()) {
166        admin1.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
167      }
168      try (Admin admin2 = connection2.getAdmin()) {
169        admin2.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
170      }
171      UTIL1.waitUntilAllRegionsAssigned(tableName);
172      UTIL2.waitUntilAllRegionsAssigned(tableName);
173
174      lHtable1 = UTIL1.getConnection().getTable(tableName);
175      lHtable2 = UTIL2.getConnection().getTable(tableName);
176
177      Put put = new Put(row);
178      put.addColumn(familyname, row, row);
179      lHtable1.put(put);
180
181      Get get = new Get(row);
182      for (int i = 0; i < NB_RETRIES; i++) {
183        if (i == NB_RETRIES - 1) {
184          fail("Waited too much time for put replication");
185        }
186        Result res = lHtable2.get(get);
187        if (res.isEmpty()) {
188          LOG.info("Row not available");
189          Thread.sleep(SLEEP_TIME);
190        } else {
191          assertArrayEquals(res.value(), row);
192          break;
193        }
194      }
195
196      Delete del = new Delete(row);
197      lHtable1.delete(del);
198
199      get = new Get(row);
200      for (int i = 0; i < NB_RETRIES; i++) {
201        if (i == NB_RETRIES - 1) {
202          fail("Waited too much time for del replication");
203        }
204        Result res = lHtable2.get(get);
205        if (res.size() >= 1) {
206          LOG.info("Row not deleted");
207          Thread.sleep(SLEEP_TIME);
208        } else {
209          break;
210        }
211      }
212
213      // Checking verifyReplication for the default behavior.
214      String[] argsWithoutRaw = new String[] { PEER_ID, tableName.getNameAsString() };
215      runVerifyReplication(argsWithoutRaw, 0, 0);
216
217      // Checking verifyReplication with raw
218      String[] argsWithRawAsTrue = new String[] { "--raw", PEER_ID, tableName.getNameAsString() };
219      runVerifyReplication(argsWithRawAsTrue, 1, 0);
220    } finally {
221      if (lHtable1 != null) {
222        lHtable1.close();
223      }
224      if (lHtable2 != null) {
225        lHtable2.close();
226      }
227    }
228  }
229
230  static void checkRestoreTmpDir(Configuration conf, String restoreTmpDir, int expectedCount)
231    throws IOException {
232    FileSystem fs = FileSystem.get(conf);
233    FileStatus[] subDirectories = fs.listStatus(new Path(restoreTmpDir));
234    assertNotNull(subDirectories);
235    assertEquals(subDirectories.length, expectedCount);
236    for (int i = 0; i < expectedCount; i++) {
237      assertTrue(subDirectories[i].isDirectory());
238    }
239  }
240
241  @Test
242  public void testVerifyRepJobWithQuorumAddress() throws Exception {
243    // Populate the tables, at the same time it guarantees that the tables are
244    // identical since it does the check
245    runSmallBatchTest();
246
247    // with a quorum address (a cluster key)
248    String[] args = new String[] { getClusterKey(UTIL2), tableName.getNameAsString() };
249    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
250
251    Scan scan = new Scan();
252    ResultScanner rs = htable2.getScanner(scan);
253    Put put = null;
254    for (Result result : rs) {
255      put = new Put(result.getRow());
256      Cell firstVal = result.rawCells()[0];
257      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
258        Bytes.toBytes("diff data"));
259      htable2.put(put);
260    }
261    Delete delete = new Delete(put.getRow());
262    htable2.delete(delete);
263    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
264  }
265
266  @Test
267  public void testVerifyRepJobWithQuorumAddressAndSnapshotSupport() throws Exception {
268    // Populate the tables, at the same time it guarantees that the tables are
269    // identical since it does the check
270    runSmallBatchTest();
271
272    // Take source and target tables snapshot
273    Path rootDir = CommonFSUtils.getRootDir(CONF1);
274    FileSystem fs = rootDir.getFileSystem(CONF1);
275    String sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
276    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
277      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
278
279    // Take target snapshot
280    Path peerRootDir = CommonFSUtils.getRootDir(CONF2);
281    FileSystem peerFs = peerRootDir.getFileSystem(CONF2);
282    String peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
283    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
284      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
285
286    String peerFSAddress = peerFs.getUri().toString();
287    String tmpPath1 = UTIL1.getRandomDir().toString();
288    String tmpPath2 = "/tmp" + EnvironmentEdgeManager.currentTime();
289
290    String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
291      "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
292      "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
293      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), getClusterKey(UTIL2),
294      tableName.getNameAsString() };
295    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
296    checkRestoreTmpDir(CONF1, tmpPath1, 1);
297    checkRestoreTmpDir(CONF2, tmpPath2, 1);
298
299    Scan scan = new Scan();
300    ResultScanner rs = htable2.getScanner(scan);
301    Put put = null;
302    for (Result result : rs) {
303      put = new Put(result.getRow());
304      Cell firstVal = result.rawCells()[0];
305      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
306        Bytes.toBytes("diff data"));
307      htable2.put(put);
308    }
309    Delete delete = new Delete(put.getRow());
310    htable2.delete(delete);
311
312    sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
313    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
314      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
315
316    peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
317    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
318      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
319
320    args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
321      "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
322      "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
323      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), getClusterKey(UTIL2),
324      tableName.getNameAsString() };
325    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
326    checkRestoreTmpDir(CONF1, tmpPath1, 2);
327    checkRestoreTmpDir(CONF2, tmpPath2, 2);
328  }
329
330  static void runBatchCopyTest() throws Exception {
331    // normal Batch tests for htable1
332    loadData("", row, noRepfamName);
333
334    Scan scan1 = new Scan();
335    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
336    ResultScanner scanner1 = htable1.getScanner(scan1);
337    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
338    for (Result result : res1) {
339      Put put = new Put(result.getRow());
340      for (Cell cell : result.rawCells()) {
341        put.add(cell);
342      }
343      puts.add(put);
344    }
345    scanner1.close();
346    assertEquals(NB_ROWS_IN_BATCH, res1.length);
347
348    // Copy the data to htable3
349    htable3.put(puts);
350
351    Scan scan2 = new Scan();
352    ResultScanner scanner2 = htable3.getScanner(scan2);
353    Result[] res2 = scanner2.next(NB_ROWS_IN_BATCH);
354    scanner2.close();
355    assertEquals(NB_ROWS_IN_BATCH, res2.length);
356  }
357
358  @Test
359  public void testVerifyRepJobWithPeerTableName() throws Exception {
360    // Populate the tables with same data
361    runBatchCopyTest();
362
363    // with a peerTableName along with quorum address (a cluster key)
364    String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
365      getClusterKey(UTIL2), tableName.getNameAsString() };
366    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
367
368    UTIL2.deleteTableData(peerTableName);
369    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
370  }
371
372  @Test
373  public void testVerifyRepJobWithPeerTableNameAndSnapshotSupport() throws Exception {
374    // Populate the tables with same data
375    runBatchCopyTest();
376
377    // Take source and target tables snapshot
378    Path rootDir = CommonFSUtils.getRootDir(CONF1);
379    FileSystem fs = rootDir.getFileSystem(CONF1);
380    String sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
381    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
382      Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true);
383
384    // Take target snapshot
385    Path peerRootDir = CommonFSUtils.getRootDir(CONF2);
386    FileSystem peerFs = peerRootDir.getFileSystem(CONF2);
387    String peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
388    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), peerTableName,
389      Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true);
390
391    String peerFSAddress = peerFs.getUri().toString();
392    String tmpPath1 = UTIL1.getRandomDir().toString();
393    String tmpPath2 = "/tmp" + EnvironmentEdgeManager.currentTime();
394
395    String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
396      "--sourceSnapshotName=" + sourceSnapshotName, "--sourceSnapshotTmpDir=" + tmpPath1,
397      "--peerSnapshotName=" + peerSnapshotName, "--peerSnapshotTmpDir=" + tmpPath2,
398      "--peerFSAddress=" + peerFSAddress,
399      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), getClusterKey(UTIL2),
400      tableName.getNameAsString() };
401    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
402    checkRestoreTmpDir(CONF1, tmpPath1, 1);
403    checkRestoreTmpDir(CONF2, tmpPath2, 1);
404
405    Scan scan = new Scan();
406    ResultScanner rs = htable3.getScanner(scan);
407    Put put = null;
408    for (Result result : rs) {
409      put = new Put(result.getRow());
410      Cell firstVal = result.rawCells()[0];
411      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
412        Bytes.toBytes("diff data"));
413      htable3.put(put);
414    }
415    Delete delete = new Delete(put.getRow());
416    htable3.delete(delete);
417
418    sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
419    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
420      Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true);
421
422    peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
423    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), peerTableName,
424      Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true);
425
426    args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
427      "--sourceSnapshotName=" + sourceSnapshotName, "--sourceSnapshotTmpDir=" + tmpPath1,
428      "--peerSnapshotName=" + peerSnapshotName, "--peerSnapshotTmpDir=" + tmpPath2,
429      "--peerFSAddress=" + peerFSAddress,
430      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), getClusterKey(UTIL2),
431      tableName.getNameAsString() };
432    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
433    checkRestoreTmpDir(CONF1, tmpPath1, 2);
434    checkRestoreTmpDir(CONF2, tmpPath2, 2);
435  }
436
437  @Test
438  public void testVerifyReplicationThreadedRecompares() throws Exception {
439    // Populate the tables with same data
440    runBatchCopyTest();
441
442    // ONLY_IN_PEER_TABLE_ROWS
443    Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH)));
444    put.addColumn(noRepfamName, row, row);
445    htable3.put(put);
446
447    // CONTENT_DIFFERENT_ROWS
448    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1)));
449    put.addColumn(noRepfamName, row, Bytes.toBytes("diff value"));
450    htable3.put(put);
451
452    // ONLY_IN_SOURCE_TABLE_ROWS
453    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1)));
454    put.addColumn(noRepfamName, row, row);
455    htable1.put(put);
456
457    String[] args = new String[] { "--recompareThreads=10", "--recompareTries=3",
458      "--recompareSleep=1", "--peerTableName=" + peerTableName.getNameAsString(),
459      getClusterKey(UTIL2), tableName.getNameAsString() };
460    Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3);
461    assertEquals(9,
462      counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue());
463    assertEquals(9,
464      counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue());
465    assertEquals(1,
466      counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue());
467    assertEquals(1,
468      counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue());
469    assertEquals(1, counters
470      .findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue());
471  }
472
473  @Test
474  public void testFailsRemainingComparesAfterShutdown() throws Exception {
475    // Populate the tables with same data
476    runBatchCopyTest();
477
478    // ONLY_IN_PEER_TABLE_ROWS
479    Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH)));
480    put.addColumn(noRepfamName, row, row);
481    htable3.put(put);
482
483    // CONTENT_DIFFERENT_ROWS
484    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1)));
485    put.addColumn(noRepfamName, row, Bytes.toBytes("diff value"));
486    htable3.put(put);
487
488    // ONLY_IN_SOURCE_TABLE_ROWS
489    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1)));
490    put.addColumn(noRepfamName, row, row);
491    htable1.put(put);
492
493    /**
494     * recompareSleep is set to exceed how long we wait on
495     * {@link VerifyReplication#reCompareExecutor} termination when doing cleanup. this allows us to
496     * test the counter-incrementing logic if the executor still hasn't terminated after the call to
497     * shutdown and awaitTermination
498     */
499    String[] args = new String[] { "--recompareThreads=1", "--recompareTries=1",
500      "--recompareSleep=121000", "--peerTableName=" + peerTableName.getNameAsString(),
501      getClusterKey(UTIL2), tableName.getNameAsString() };
502
503    Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3);
504    assertEquals(3,
505      counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue());
506    assertEquals(3,
507      counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue());
508    assertEquals(1,
509      counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue());
510    assertEquals(1,
511      counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue());
512    assertEquals(1, counters
513      .findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue());
514  }
515
516  @Test
517  public void testVerifyReplicationSynchronousRecompares() throws Exception {
518    // Populate the tables with same data
519    runBatchCopyTest();
520
521    // ONLY_IN_PEER_TABLE_ROWS
522    Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH)));
523    put.addColumn(noRepfamName, row, row);
524    htable3.put(put);
525
526    // CONTENT_DIFFERENT_ROWS
527    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1)));
528    put.addColumn(noRepfamName, row, Bytes.toBytes("diff value"));
529    htable3.put(put);
530
531    // ONLY_IN_SOURCE_TABLE_ROWS
532    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1)));
533    put.addColumn(noRepfamName, row, row);
534    htable1.put(put);
535
536    String[] args = new String[] { "--recompareTries=3", "--recompareSleep=1",
537      "--peerTableName=" + peerTableName.getNameAsString(), getClusterKey(UTIL2),
538      tableName.getNameAsString() };
539    Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3);
540    assertEquals(9,
541      counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue());
542    assertEquals(9,
543      counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue());
544    assertEquals(1,
545      counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue());
546    assertEquals(1,
547      counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue());
548    assertEquals(1, counters
549      .findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue());
550  }
551
552  @AfterAll
553  public static void tearDownAfterClass() throws Exception {
554    htable3.close();
555    TestReplicationBase.tearDownAfterClass();
556  }
557}