001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.jupiter.api.Assertions.assertArrayEquals;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022import static org.junit.jupiter.api.Assertions.assertNotNull;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024import static org.junit.jupiter.api.Assertions.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CellUtil;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.Admin;
039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
041import org.apache.hadoop.hbase.client.Connection;
042import org.apache.hadoop.hbase.client.ConnectionFactory;
043import org.apache.hadoop.hbase.client.Delete;
044import org.apache.hadoop.hbase.client.Get;
045import org.apache.hadoop.hbase.client.Put;
046import org.apache.hadoop.hbase.client.Result;
047import org.apache.hadoop.hbase.client.ResultScanner;
048import org.apache.hadoop.hbase.client.Scan;
049import org.apache.hadoop.hbase.client.Table;
050import org.apache.hadoop.hbase.client.TableDescriptor;
051import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
052import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
053import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
054import org.apache.hadoop.hbase.util.Bytes;
055import org.apache.hadoop.hbase.util.CommonFSUtils;
056import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
057import org.apache.hadoop.mapreduce.Counters;
058import org.apache.hadoop.mapreduce.Job;
059import org.junit.jupiter.api.AfterAll;
060import org.junit.jupiter.api.BeforeAll;
061import org.junit.jupiter.api.BeforeEach;
062import org.junit.jupiter.api.Test;
063import org.junit.jupiter.api.TestInfo;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066
067public abstract class VerifyReplicationTestBase extends TestReplicationBase {
068
069  private static final Logger LOG = LoggerFactory.getLogger(VerifyReplicationTestBase.class);
070
071  private static final String PEER_ID = "2";
072  private static final TableName peerTableName = TableName.valueOf("peerTest");
073  private static Table htable3;
074
075  @BeforeEach
076  public void setUp() throws Exception {
077    cleanUp();
078    UTIL2.deleteTableData(peerTableName);
079  }
080
081  @BeforeAll
082  public static void setUpBeforeClass() throws Exception {
083    TableDescriptor peerTable =
084      TableDescriptorBuilder.newBuilder(peerTableName)
085        .setColumnFamily(
086          ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).setMaxVersions(100).build())
087        .build();
088
089    Connection connection2 = ConnectionFactory.createConnection(CONF2);
090    try (Admin admin2 = connection2.getAdmin()) {
091      admin2.createTable(peerTable, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
092    }
093    htable3 = connection2.getTable(peerTableName);
094  }
095
096  static Counters runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
097    throws IOException, InterruptedException, ClassNotFoundException {
098    Job job = new VerifyReplication().createSubmittableJob(new Configuration(CONF1), args);
099    if (job == null) {
100      fail("Job wasn't created, see the log");
101    }
102    if (!job.waitForCompletion(true)) {
103      fail("Job failed, see the log");
104    }
105    assertEquals(expectedGoodRows,
106      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
107    assertEquals(expectedBadRows,
108      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
109    return job.getCounters();
110  }
111
112  /**
113   * Do a small loading into a table, make sure the data is really the same, then run the
114   * VerifyReplication job to check the results. Do a second comparison where all the cells are
115   * different.
116   */
117  @Test
118  public void testVerifyRepJob() throws Exception {
119    // Populate the tables, at the same time it guarantees that the tables are
120    // identical since it does the check
121    runSmallBatchTest();
122
123    String[] args = new String[] { PEER_ID, tableName.getNameAsString() };
124    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
125
126    Scan scan = new Scan();
127    ResultScanner rs = htable2.getScanner(scan);
128    Put put = null;
129    for (Result result : rs) {
130      put = new Put(result.getRow());
131      Cell firstVal = result.rawCells()[0];
132      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
133        Bytes.toBytes("diff data"));
134      htable2.put(put);
135    }
136    Delete delete = new Delete(put.getRow());
137    htable2.delete(delete);
138    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
139  }
140
141  /**
142   * Load a row into a table, make sure the data is really the same, delete the row, make sure the
143   * delete marker is replicated, run verify replication with and without raw to check the results.
144   */
145  @Test
146  public void testVerifyRepJobWithRawOptions(TestInfo testInfo) throws Exception {
147    LOG.info(testInfo.getTestMethod().get().getName());
148
149    final TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
150    byte[] familyname = Bytes.toBytes("fam_raw");
151    byte[] row = Bytes.toBytes("row_raw");
152
153    Table lHtable1 = null;
154    Table lHtable2 = null;
155
156    try {
157      ColumnFamilyDescriptor fam = ColumnFamilyDescriptorBuilder.newBuilder(familyname)
158        .setMaxVersions(100).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build();
159      TableDescriptor table =
160        TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(fam).build();
161
162      Connection connection1 = ConnectionFactory.createConnection(CONF1);
163      Connection connection2 = ConnectionFactory.createConnection(CONF2);
164      try (Admin admin1 = connection1.getAdmin()) {
165        admin1.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
166      }
167      try (Admin admin2 = connection2.getAdmin()) {
168        admin2.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
169      }
170      UTIL1.waitUntilAllRegionsAssigned(tableName);
171      UTIL2.waitUntilAllRegionsAssigned(tableName);
172
173      lHtable1 = UTIL1.getConnection().getTable(tableName);
174      lHtable2 = UTIL2.getConnection().getTable(tableName);
175
176      Put put = new Put(row);
177      put.addColumn(familyname, row, row);
178      lHtable1.put(put);
179
180      Get get = new Get(row);
181      for (int i = 0; i < NB_RETRIES; i++) {
182        if (i == NB_RETRIES - 1) {
183          fail("Waited too much time for put replication");
184        }
185        Result res = lHtable2.get(get);
186        if (res.isEmpty()) {
187          LOG.info("Row not available");
188          Thread.sleep(SLEEP_TIME);
189        } else {
190          assertArrayEquals(res.value(), row);
191          break;
192        }
193      }
194
195      Delete del = new Delete(row);
196      lHtable1.delete(del);
197
198      get = new Get(row);
199      for (int i = 0; i < NB_RETRIES; i++) {
200        if (i == NB_RETRIES - 1) {
201          fail("Waited too much time for del replication");
202        }
203        Result res = lHtable2.get(get);
204        if (res.size() >= 1) {
205          LOG.info("Row not deleted");
206          Thread.sleep(SLEEP_TIME);
207        } else {
208          break;
209        }
210      }
211
212      // Checking verifyReplication for the default behavior.
213      String[] argsWithoutRaw = new String[] { PEER_ID, tableName.getNameAsString() };
214      runVerifyReplication(argsWithoutRaw, 0, 0);
215
216      // Checking verifyReplication with raw
217      String[] argsWithRawAsTrue = new String[] { "--raw", PEER_ID, tableName.getNameAsString() };
218      runVerifyReplication(argsWithRawAsTrue, 1, 0);
219    } finally {
220      if (lHtable1 != null) {
221        lHtable1.close();
222      }
223      if (lHtable2 != null) {
224        lHtable2.close();
225      }
226    }
227  }
228
229  static void checkRestoreTmpDir(Configuration conf, String restoreTmpDir, int expectedCount)
230    throws IOException {
231    FileSystem fs = FileSystem.get(conf);
232    FileStatus[] subDirectories = fs.listStatus(new Path(restoreTmpDir));
233    assertNotNull(subDirectories);
234    assertEquals(subDirectories.length, expectedCount);
235    for (int i = 0; i < expectedCount; i++) {
236      assertTrue(subDirectories[i].isDirectory());
237    }
238  }
239
240  @Test
241  public void testVerifyRepJobWithQuorumAddress() throws Exception {
242    // Populate the tables, at the same time it guarantees that the tables are
243    // identical since it does the check
244    runSmallBatchTest();
245
246    // with a quorum address (a cluster key)
247    String[] args = new String[] { getClusterKey(UTIL2), tableName.getNameAsString() };
248    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
249
250    Scan scan = new Scan();
251    ResultScanner rs = htable2.getScanner(scan);
252    Put put = null;
253    for (Result result : rs) {
254      put = new Put(result.getRow());
255      Cell firstVal = result.rawCells()[0];
256      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
257        Bytes.toBytes("diff data"));
258      htable2.put(put);
259    }
260    Delete delete = new Delete(put.getRow());
261    htable2.delete(delete);
262    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
263  }
264
265  @Test
266  public void testVerifyRepJobWithQuorumAddressAndSnapshotSupport() throws Exception {
267    // Populate the tables, at the same time it guarantees that the tables are
268    // identical since it does the check
269    runSmallBatchTest();
270
271    // Take source and target tables snapshot
272    Path rootDir = CommonFSUtils.getRootDir(CONF1);
273    FileSystem fs = rootDir.getFileSystem(CONF1);
274    String sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
275    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
276      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
277
278    // Take target snapshot
279    Path peerRootDir = CommonFSUtils.getRootDir(CONF2);
280    FileSystem peerFs = peerRootDir.getFileSystem(CONF2);
281    String peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
282    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
283      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
284
285    String peerFSAddress = peerFs.getUri().toString();
286    String tmpPath1 = UTIL1.getRandomDir().toString();
287    String tmpPath2 = "/tmp" + EnvironmentEdgeManager.currentTime();
288
289    String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
290      "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
291      "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
292      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), getClusterKey(UTIL2),
293      tableName.getNameAsString() };
294    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
295    checkRestoreTmpDir(CONF1, tmpPath1, 1);
296    checkRestoreTmpDir(CONF2, tmpPath2, 1);
297
298    Scan scan = new Scan();
299    ResultScanner rs = htable2.getScanner(scan);
300    Put put = null;
301    for (Result result : rs) {
302      put = new Put(result.getRow());
303      Cell firstVal = result.rawCells()[0];
304      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
305        Bytes.toBytes("diff data"));
306      htable2.put(put);
307    }
308    Delete delete = new Delete(put.getRow());
309    htable2.delete(delete);
310
311    sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
312    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
313      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
314
315    peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
316    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
317      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
318
319    args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
320      "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
321      "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
322      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), getClusterKey(UTIL2),
323      tableName.getNameAsString() };
324    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
325    checkRestoreTmpDir(CONF1, tmpPath1, 2);
326    checkRestoreTmpDir(CONF2, tmpPath2, 2);
327  }
328
329  static void runBatchCopyTest() throws Exception {
330    // normal Batch tests for htable1
331    loadData("", row, noRepfamName);
332
333    Scan scan1 = new Scan();
334    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
335    ResultScanner scanner1 = htable1.getScanner(scan1);
336    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
337    for (Result result : res1) {
338      Put put = new Put(result.getRow());
339      for (Cell cell : result.rawCells()) {
340        put.add(cell);
341      }
342      puts.add(put);
343    }
344    scanner1.close();
345    assertEquals(NB_ROWS_IN_BATCH, res1.length);
346
347    // Copy the data to htable3
348    htable3.put(puts);
349
350    Scan scan2 = new Scan();
351    ResultScanner scanner2 = htable3.getScanner(scan2);
352    Result[] res2 = scanner2.next(NB_ROWS_IN_BATCH);
353    scanner2.close();
354    assertEquals(NB_ROWS_IN_BATCH, res2.length);
355  }
356
357  @Test
358  public void testVerifyRepJobWithPeerTableName() throws Exception {
359    // Populate the tables with same data
360    runBatchCopyTest();
361
362    // with a peerTableName along with quorum address (a cluster key)
363    String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
364      getClusterKey(UTIL2), tableName.getNameAsString() };
365    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
366
367    UTIL2.deleteTableData(peerTableName);
368    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
369  }
370
371  @Test
372  public void testVerifyRepJobWithPeerTableNameAndSnapshotSupport() throws Exception {
373    // Populate the tables with same data
374    runBatchCopyTest();
375
376    // Take source and target tables snapshot
377    Path rootDir = CommonFSUtils.getRootDir(CONF1);
378    FileSystem fs = rootDir.getFileSystem(CONF1);
379    String sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
380    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
381      Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true);
382
383    // Take target snapshot
384    Path peerRootDir = CommonFSUtils.getRootDir(CONF2);
385    FileSystem peerFs = peerRootDir.getFileSystem(CONF2);
386    String peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
387    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), peerTableName,
388      Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true);
389
390    String peerFSAddress = peerFs.getUri().toString();
391    String tmpPath1 = UTIL1.getRandomDir().toString();
392    String tmpPath2 = "/tmp" + EnvironmentEdgeManager.currentTime();
393
394    String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
395      "--sourceSnapshotName=" + sourceSnapshotName, "--sourceSnapshotTmpDir=" + tmpPath1,
396      "--peerSnapshotName=" + peerSnapshotName, "--peerSnapshotTmpDir=" + tmpPath2,
397      "--peerFSAddress=" + peerFSAddress,
398      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), getClusterKey(UTIL2),
399      tableName.getNameAsString() };
400    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
401    checkRestoreTmpDir(CONF1, tmpPath1, 1);
402    checkRestoreTmpDir(CONF2, tmpPath2, 1);
403
404    Scan scan = new Scan();
405    ResultScanner rs = htable3.getScanner(scan);
406    Put put = null;
407    for (Result result : rs) {
408      put = new Put(result.getRow());
409      Cell firstVal = result.rawCells()[0];
410      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
411        Bytes.toBytes("diff data"));
412      htable3.put(put);
413    }
414    Delete delete = new Delete(put.getRow());
415    htable3.delete(delete);
416
417    sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
418    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
419      Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true);
420
421    peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
422    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), peerTableName,
423      Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true);
424
425    args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
426      "--sourceSnapshotName=" + sourceSnapshotName, "--sourceSnapshotTmpDir=" + tmpPath1,
427      "--peerSnapshotName=" + peerSnapshotName, "--peerSnapshotTmpDir=" + tmpPath2,
428      "--peerFSAddress=" + peerFSAddress,
429      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), getClusterKey(UTIL2),
430      tableName.getNameAsString() };
431    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
432    checkRestoreTmpDir(CONF1, tmpPath1, 2);
433    checkRestoreTmpDir(CONF2, tmpPath2, 2);
434  }
435
436  @Test
437  public void testVerifyReplicationThreadedRecompares() throws Exception {
438    // Populate the tables with same data
439    runBatchCopyTest();
440
441    // ONLY_IN_PEER_TABLE_ROWS
442    Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH)));
443    put.addColumn(noRepfamName, row, row);
444    htable3.put(put);
445
446    // CONTENT_DIFFERENT_ROWS
447    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1)));
448    put.addColumn(noRepfamName, row, Bytes.toBytes("diff value"));
449    htable3.put(put);
450
451    // ONLY_IN_SOURCE_TABLE_ROWS
452    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1)));
453    put.addColumn(noRepfamName, row, row);
454    htable1.put(put);
455
456    String[] args = new String[] { "--recompareThreads=10", "--recompareTries=3",
457      "--recompareSleep=1", "--peerTableName=" + peerTableName.getNameAsString(),
458      getClusterKey(UTIL2), tableName.getNameAsString() };
459    Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3);
460    assertEquals(9,
461      counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue());
462    assertEquals(9,
463      counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue());
464    assertEquals(1,
465      counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue());
466    assertEquals(1,
467      counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue());
468    assertEquals(1, counters
469      .findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue());
470  }
471
472  @Test
473  public void testFailsRemainingComparesAfterShutdown() throws Exception {
474    // Populate the tables with same data
475    runBatchCopyTest();
476
477    // ONLY_IN_PEER_TABLE_ROWS
478    Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH)));
479    put.addColumn(noRepfamName, row, row);
480    htable3.put(put);
481
482    // CONTENT_DIFFERENT_ROWS
483    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1)));
484    put.addColumn(noRepfamName, row, Bytes.toBytes("diff value"));
485    htable3.put(put);
486
487    // ONLY_IN_SOURCE_TABLE_ROWS
488    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1)));
489    put.addColumn(noRepfamName, row, row);
490    htable1.put(put);
491
492    /**
493     * recompareSleep is set to exceed how long we wait on
494     * {@link VerifyReplication#reCompareExecutor} termination when doing cleanup. this allows us to
495     * test the counter-incrementing logic if the executor still hasn't terminated after the call to
496     * shutdown and awaitTermination
497     */
498    String[] args = new String[] { "--recompareThreads=1", "--recompareTries=1",
499      "--recompareSleep=121000", "--peerTableName=" + peerTableName.getNameAsString(),
500      getClusterKey(UTIL2), tableName.getNameAsString() };
501
502    Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3);
503    assertEquals(3,
504      counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue());
505    assertEquals(3,
506      counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue());
507    assertEquals(1,
508      counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue());
509    assertEquals(1,
510      counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue());
511    assertEquals(1, counters
512      .findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue());
513  }
514
515  @Test
516  public void testVerifyReplicationSynchronousRecompares() throws Exception {
517    // Populate the tables with same data
518    runBatchCopyTest();
519
520    // ONLY_IN_PEER_TABLE_ROWS
521    Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH)));
522    put.addColumn(noRepfamName, row, row);
523    htable3.put(put);
524
525    // CONTENT_DIFFERENT_ROWS
526    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1)));
527    put.addColumn(noRepfamName, row, Bytes.toBytes("diff value"));
528    htable3.put(put);
529
530    // ONLY_IN_SOURCE_TABLE_ROWS
531    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1)));
532    put.addColumn(noRepfamName, row, row);
533    htable1.put(put);
534
535    String[] args = new String[] { "--recompareTries=3", "--recompareSleep=1",
536      "--peerTableName=" + peerTableName.getNameAsString(), getClusterKey(UTIL2),
537      tableName.getNameAsString() };
538    Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3);
539    assertEquals(9,
540      counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue());
541    assertEquals(9,
542      counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue());
543    assertEquals(1,
544      counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue());
545    assertEquals(1,
546      counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue());
547    assertEquals(1, counters
548      .findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue());
549  }
550
551  @AfterAll
552  public static void tearDownAfterClass() throws Exception {
553    htable3.close();
554    TestReplicationBase.tearDownAfterClass();
555  }
556}