/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

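/**
 * End-to-end tests for the {@link VerifyReplication} MapReduce tool. The tests load data through
 * the two-cluster fixture provided by {@link TestReplicationBase}, deliberately introduce
 * differences between source and peer, and then assert on the job counters (GOODROWS, BADROWS,
 * RECOMPARES, ...) reported by the tool.
 * <p>
 * Outside of tests the same comparisons are typically driven from the command line, roughly as
 * below (exact wrapper script and defaults may differ by deployment); the option flags are the
 * ones exercised by the tests in this class:
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \
 *     [--raw] [--peerTableName=...] [--sourceSnapshotName=...] [--recompareThreads=N] ... \
 *     &lt;peerId|clusterKey&gt; &lt;tableName&gt;
 * </pre>
 */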
@Category({ ReplicationTests.class, LargeTests.class })
public class TestVerifyReplication extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestVerifyReplication.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestVerifyReplication.class);

  private static final String PEER_ID = "2";
  private static final TableName peerTableName = TableName.valueOf("peerTest");
  private static Table htable3;

  @Rule
  public TestName name = new TestName();

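  /**
   * Reset state between tests: clean up whatever the previous test left behind via the base-class
   * helper and clear the extra peer-side table used by the --peerTableName tests.
   */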
  @Before
  public void setUp() throws Exception {
    cleanUp();
    UTIL2.deleteTableData(peerTableName);
  }

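  /**
   * In addition to the two-cluster setup done by {@link TestReplicationBase}, create a separate
   * table on the peer cluster (with the non-replicated family) that the --peerTableName tests
   * compare against. The connection is kept open so that {@code htable3} stays usable for the
   * whole class.
   */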
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TestReplicationBase.setUpBeforeClass();

    TableDescriptor peerTable =
      TableDescriptorBuilder.newBuilder(peerTableName)
        .setColumnFamily(
          ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).setMaxVersions(100).build())
        .build();

    Connection connection2 = ConnectionFactory.createConnection(CONF2);
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(peerTable, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
    }
    htable3 = connection2.getTable(peerTableName);
  }

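  /**
   * Build and run a {@link VerifyReplication} job against the source cluster configuration with
   * the given command-line arguments, wait for it to finish, and assert on the GOODROWS and
   * BADROWS counters before handing the full counter set back to the caller.
   */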
  static Counters runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
    throws IOException, InterruptedException, ClassNotFoundException {
    Job job = new VerifyReplication().createSubmittableJob(new Configuration(CONF1), args);
    if (job == null) {
      fail("Job wasn't created, see the log");
    }
    if (!job.waitForCompletion(true)) {
      fail("Job failed, see the log");
    }
    assertEquals(expectedGoodRows,
      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
    assertEquals(expectedBadRows,
      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
    return job.getCounters();
  }

  /**
   * Load a small batch into the replicated table and confirm the data arrives on the peer, then
   * run the VerifyReplication job and check its counters. Run a second comparison after every row
   * has been altered on the peer, so all rows must be reported as bad.
   */
  @Test
  public void testVerifyRepJob() throws Exception {
    // Populate the tables; runSmallBatchTest also verifies that both sides hold identical data.
    runSmallBatchTest();

    String[] args = new String[] { PEER_ID, tableName.getNameAsString() };
    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);

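    // Overwrite every replicated row on the peer with a different value, and delete the last one
    // entirely, so the second verification finds no matching rows.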
    Scan scan = new Scan();
    ResultScanner rs = htable2.getScanner(scan);
    Put put = null;
    for (Result result : rs) {
      put = new Put(result.getRow());
      Cell firstVal = result.rawCells()[0];
      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
        Bytes.toBytes("diff data"));
      htable2.put(put);
    }
    Delete delete = new Delete(put.getRow());
    htable2.delete(delete);
    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
  }

  /**
   * Load a single row into a replicated table, confirm it arrives on the peer, delete it and wait
   * for the delete to replicate, then run VerifyReplication with and without --raw. Without --raw
   * neither side has a visible row; with --raw the delete markers and deleted cells are compared
   * as well, so the row counts as one good row.
   */
  @Test
  public void testVerifyRepJobWithRawOptions() throws Exception {
    LOG.info(name.getMethodName());

    final TableName tableName = TableName.valueOf(name.getMethodName());
    byte[] familyname = Bytes.toBytes("fam_raw");
    byte[] row = Bytes.toBytes("row_raw");

    Table lHtable1 = null;
    Table lHtable2 = null;

    try {
      ColumnFamilyDescriptor fam = ColumnFamilyDescriptorBuilder.newBuilder(familyname)
        .setMaxVersions(100).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build();
      TableDescriptor table =
        TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(fam).build();

      try (Connection connection1 = ConnectionFactory.createConnection(CONF1);
        Admin admin1 = connection1.getAdmin()) {
        admin1.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
      }
      try (Connection connection2 = ConnectionFactory.createConnection(CONF2);
        Admin admin2 = connection2.getAdmin()) {
        admin2.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
      }
      UTIL1.waitUntilAllRegionsAssigned(tableName);
      UTIL2.waitUntilAllRegionsAssigned(tableName);

      lHtable1 = UTIL1.getConnection().getTable(tableName);
      lHtable2 = UTIL2.getConnection().getTable(tableName);

      Put put = new Put(row);
      put.addColumn(familyname, row, row);
      lHtable1.put(put);

      Get get = new Get(row);
      for (int i = 0; i < NB_RETRIES; i++) {
        if (i == NB_RETRIES - 1) {
          fail("Waited too long for put replication");
        }
        Result res = lHtable2.get(get);
        if (res.isEmpty()) {
          LOG.info("Row not available");
          Thread.sleep(SLEEP_TIME);
        } else {
          assertArrayEquals(row, res.value());
          break;
        }
      }

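      // Delete the row on the source and wait until the delete has replicated, i.e. until the
      // peer no longer returns any cell for it.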
      Delete del = new Delete(row);
      lHtable1.delete(del);

      get = new Get(row);
      for (int i = 0; i < NB_RETRIES; i++) {
        if (i == NB_RETRIES - 1) {
          fail("Waited too long for del replication");
        }
        Result res = lHtable2.get(get);
        if (res.size() >= 1) {
          LOG.info("Row not deleted");
          Thread.sleep(SLEEP_TIME);
        } else {
          break;
        }
      }

      // Check verifyReplication with the default behavior: the deleted row is not visible on
      // either side.
      String[] argsWithoutRaw = new String[] { PEER_ID, tableName.getNameAsString() };
      runVerifyReplication(argsWithoutRaw, 0, 0);

      // Check verifyReplication with --raw: the deleted row is still visible to a raw scan on
      // both sides.
      String[] argsWithRawAsTrue = new String[] { "--raw", PEER_ID, tableName.getNameAsString() };
      runVerifyReplication(argsWithRawAsTrue, 1, 0);
    } finally {
      if (lHtable1 != null) {
        lHtable1.close();
      }
      if (lHtable2 != null) {
        lHtable2.close();
      }
    }
  }

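  /**
   * Assert that {@code restoreTmpDir} contains exactly {@code expectedCount} entries and that
   * each of them is a directory (one per snapshot restore performed under it).
   */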
  static void checkRestoreTmpDir(Configuration conf, String restoreTmpDir, int expectedCount)
    throws IOException {
    FileSystem fs = FileSystem.get(conf);
    FileStatus[] subDirectories = fs.listStatus(new Path(restoreTmpDir));
    assertNotNull(subDirectories);
    assertEquals(expectedCount, subDirectories.length);
    for (int i = 0; i < expectedCount; i++) {
      assertTrue(subDirectories[i].isDirectory());
    }
  }

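  /**
   * Same scenario as {@link #testVerifyRepJob()}, but the peer is addressed by its cluster key
   * (quorum address) instead of a peer id.
   */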
  @Test
  public void testVerifyRepJobWithQuorumAddress() throws Exception {
    // Populate the tables; runSmallBatchTest also verifies that both sides hold identical data.
    runSmallBatchTest();

    // Address the peer by its cluster key (quorum address) instead of a peer id.
    String[] args = new String[] { UTIL2.getClusterKey(), tableName.getNameAsString() };
    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);

    Scan scan = new Scan();
    ResultScanner rs = htable2.getScanner(scan);
    Put put = null;
    for (Result result : rs) {
      put = new Put(result.getRow());
      Cell firstVal = result.rawCells()[0];
      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
        Bytes.toBytes("diff data"));
      htable2.put(put);
    }
    Delete delete = new Delete(put.getRow());
    htable2.delete(delete);
    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
  }

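  /**
   * Run the quorum-address comparison against snapshots of the source and peer tables instead of
   * live scans: take a snapshot on each side and verify (all rows good), change the peer data,
   * take fresh snapshots and verify again (all rows bad). Also checks that each run leaves one
   * restore directory under the configured temp paths.
   */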
  @Test
  public void testVerifyRepJobWithQuorumAddressAndSnapshotSupport() throws Exception {
    // Populate the tables; runSmallBatchTest also verifies that both sides hold identical data.
    runSmallBatchTest();

    // Take a snapshot of the source table
    Path rootDir = CommonFSUtils.getRootDir(CONF1);
    FileSystem fs = rootDir.getFileSystem(CONF1);
    String sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);

    // Take a snapshot of the target table
    Path peerRootDir = CommonFSUtils.getRootDir(CONF2);
    FileSystem peerFs = peerRootDir.getFileSystem(CONF2);
    String peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);

    String peerFSAddress = peerFs.getUri().toString();
    String tmpPath1 = UTIL1.getRandomDir().toString();
    String tmpPath2 = "/tmp" + EnvironmentEdgeManager.currentTime();

    String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
      "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
      "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), UTIL2.getClusterKey(),
      tableName.getNameAsString() };
    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
    checkRestoreTmpDir(CONF1, tmpPath1, 1);
    checkRestoreTmpDir(CONF2, tmpPath2, 1);

    Scan scan = new Scan();
    ResultScanner rs = htable2.getScanner(scan);
    Put put = null;
    for (Result result : rs) {
      put = new Put(result.getRow());
      Cell firstVal = result.rawCells()[0];
      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
        Bytes.toBytes("diff data"));
      htable2.put(put);
    }
    Delete delete = new Delete(put.getRow());
    htable2.delete(delete);

    sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);

    peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);

    args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
      "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
      "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), UTIL2.getClusterKey(),
      tableName.getNameAsString() };
    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
    checkRestoreTmpDir(CONF1, tmpPath1, 2);
    checkRestoreTmpDir(CONF2, tmpPath2, 2);
  }

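  /**
   * Load the non-replicated family on the source table and copy the resulting rows into the
   * peer-cluster table ({@code htable3}) by hand, so that both sides hold identical data without
   * going through replication.
   */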
  static void runBatchCopyTest() throws Exception {
    // Load a batch of rows into htable1 (non-replicated family)
    loadData("", row, noRepfamName);

    Scan scan1 = new Scan();
    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
    ResultScanner scanner1 = htable1.getScanner(scan1);
    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
    for (Result result : res1) {
      Put put = new Put(result.getRow());
      for (Cell cell : result.rawCells()) {
        put.add(cell);
      }
      puts.add(put);
    }
    scanner1.close();
    assertEquals(NB_ROWS_IN_BATCH, res1.length);

    // Copy the data to htable3
    htable3.put(puts);

    Scan scan2 = new Scan();
    ResultScanner scanner2 = htable3.getScanner(scan2);
    Result[] res2 = scanner2.next(NB_ROWS_IN_BATCH);
    scanner2.close();
    assertEquals(NB_ROWS_IN_BATCH, res2.length);
  }

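  /**
   * Verify a source table against a table with a different name on the peer cluster, selected via
   * the --peerTableName option. After clearing the peer table every row should be reported as bad.
   */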
  @Test
  public void testVerifyRepJobWithPeerTableName() throws Exception {
    // Populate both tables with the same data
    runBatchCopyTest();

    // Pass --peerTableName together with the peer cluster key
    String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
      UTIL2.getClusterKey(), tableName.getNameAsString() };
    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);

    UTIL2.deleteTableData(peerTableName);
    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
  }

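  /**
   * Combine --peerTableName with snapshot verification: snapshot the source table and the
   * differently named peer table, verify (all rows good), change the peer data, snapshot again
   * and verify (all rows bad), checking the restore temp directories along the way.
   */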
  @Test
  public void testVerifyRepJobWithPeerTableNameAndSnapshotSupport() throws Exception {
    // Populate both tables with the same data
    runBatchCopyTest();

    // Take a snapshot of the source table
    Path rootDir = CommonFSUtils.getRootDir(CONF1);
    FileSystem fs = rootDir.getFileSystem(CONF1);
    String sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
      Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true);

    // Take a snapshot of the target table
    Path peerRootDir = CommonFSUtils.getRootDir(CONF2);
    FileSystem peerFs = peerRootDir.getFileSystem(CONF2);
    String peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), peerTableName,
      Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true);

    String peerFSAddress = peerFs.getUri().toString();
    String tmpPath1 = UTIL1.getRandomDir().toString();
    String tmpPath2 = "/tmp" + EnvironmentEdgeManager.currentTime();

    String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
      "--sourceSnapshotName=" + sourceSnapshotName, "--sourceSnapshotTmpDir=" + tmpPath1,
      "--peerSnapshotName=" + peerSnapshotName, "--peerSnapshotTmpDir=" + tmpPath2,
      "--peerFSAddress=" + peerFSAddress,
      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), UTIL2.getClusterKey(),
      tableName.getNameAsString() };
    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
    checkRestoreTmpDir(CONF1, tmpPath1, 1);
    checkRestoreTmpDir(CONF2, tmpPath2, 1);

    Scan scan = new Scan();
    ResultScanner rs = htable3.getScanner(scan);
    Put put = null;
    for (Result result : rs) {
      put = new Put(result.getRow());
      Cell firstVal = result.rawCells()[0];
      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
        Bytes.toBytes("diff data"));
      htable3.put(put);
    }
    Delete delete = new Delete(put.getRow());
    htable3.delete(delete);

    sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
      Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true);

    peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), peerTableName,
      Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true);

    args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
      "--sourceSnapshotName=" + sourceSnapshotName, "--sourceSnapshotTmpDir=" + tmpPath1,
      "--peerSnapshotName=" + peerSnapshotName, "--peerSnapshotTmpDir=" + tmpPath2,
      "--peerFSAddress=" + peerFSAddress,
      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), UTIL2.getClusterKey(),
      tableName.getNameAsString() };
    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
    checkRestoreTmpDir(CONF1, tmpPath1, 2);
    checkRestoreTmpDir(CONF2, tmpPath2, 2);
  }

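  /**
   * Introduce one row of each mismatch type (only in peer, different content, only in source) and
   * run the job with a recompare thread pool. With --recompareTries=3, each of the three bad rows
   * is recompared three times, so RECOMPARES and FAILED_RECOMPARE both end up at 9 while each
   * per-type counter records one row.
   */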
  @Test
  public void testVerifyReplicationThreadedRecompares() throws Exception {
    // Populate both tables with the same data
    runBatchCopyTest();

    // ONLY_IN_PEER_TABLE_ROWS
    Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH)));
    put.addColumn(noRepfamName, row, row);
    htable3.put(put);

    // CONTENT_DIFFERENT_ROWS
    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1)));
    put.addColumn(noRepfamName, row, Bytes.toBytes("diff value"));
    htable3.put(put);

    // ONLY_IN_SOURCE_TABLE_ROWS
    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1)));
    put.addColumn(noRepfamName, row, row);
    htable1.put(put);

    String[] args = new String[] { "--recompareThreads=10", "--recompareTries=3",
      "--recompareSleep=1", "--peerTableName=" + peerTableName.getNameAsString(),
      UTIL2.getClusterKey(), tableName.getNameAsString() };
    Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3);
    assertEquals(9,
      counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue());
    assertEquals(9,
      counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue());
    assertEquals(1, counters
      .findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue());
    assertEquals(1, counters
      .findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue());
    assertEquals(1, counters
      .findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue());
  }

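  /**
   * Same three mismatches as above, but --recompareSleep exceeds how long the job waits for the
   * recompare executor to terminate during cleanup, so the still-queued recompares are failed in
   * bulk after shutdown. Each bad row therefore shows exactly one recompare and one failed
   * recompare.
   */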
  @Test
  public void testFailsRemainingComparesAfterShutdown() throws Exception {
    // Populate both tables with the same data
    runBatchCopyTest();

    // ONLY_IN_PEER_TABLE_ROWS
    Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH)));
    put.addColumn(noRepfamName, row, row);
    htable3.put(put);

    // CONTENT_DIFFERENT_ROWS
    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1)));
    put.addColumn(noRepfamName, row, Bytes.toBytes("diff value"));
    htable3.put(put);

    // ONLY_IN_SOURCE_TABLE_ROWS
    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1)));
    put.addColumn(noRepfamName, row, row);
    htable1.put(put);

    /**
     * recompareSleep is set to exceed how long we wait on
     * {@link VerifyReplication#reCompareExecutor} termination during cleanup. This exercises the
     * counter-incrementing logic for the case where the executor still has not terminated after
     * the calls to shutdown and awaitTermination.
     */
    String[] args = new String[] { "--recompareThreads=1", "--recompareTries=1",
      "--recompareSleep=121000", "--peerTableName=" + peerTableName.getNameAsString(),
      UTIL2.getClusterKey(), tableName.getNameAsString() };

    Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3);
    assertEquals(3,
      counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue());
    assertEquals(3,
      counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue());
    assertEquals(1, counters
      .findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue());
    assertEquals(1, counters
      .findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue());
    assertEquals(1, counters
      .findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue());
  }

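  /**
   * Same three mismatches, but without --recompareThreads, so recompares run synchronously inside
   * the verifier. The counter totals match the threaded case: three bad rows recompared three
   * times each.
   */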
  @Test
  public void testVerifyReplicationSynchronousRecompares() throws Exception {
    // Populate both tables with the same data
    runBatchCopyTest();

    // ONLY_IN_PEER_TABLE_ROWS
    Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH)));
    put.addColumn(noRepfamName, row, row);
    htable3.put(put);

    // CONTENT_DIFFERENT_ROWS
    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1)));
    put.addColumn(noRepfamName, row, Bytes.toBytes("diff value"));
    htable3.put(put);

    // ONLY_IN_SOURCE_TABLE_ROWS
    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1)));
    put.addColumn(noRepfamName, row, row);
    htable1.put(put);

    String[] args = new String[] { "--recompareTries=3", "--recompareSleep=1",
      "--peerTableName=" + peerTableName.getNameAsString(), UTIL2.getClusterKey(),
      tableName.getNameAsString() };
    Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3);
    assertEquals(9,
      counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue());
    assertEquals(9,
      counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue());
    assertEquals(1, counters
      .findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue());
    assertEquals(1, counters
      .findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue());
    assertEquals(1, counters
      .findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue());
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    htable3.close();
    TestReplicationBase.tearDownAfterClass();
  }
}