001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.List;
030import java.util.TreeMap;
031
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileStatus;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.hbase.Cell;
037import org.apache.hadoop.hbase.CellUtil;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseTestingUtility;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.client.Admin;
043import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
044import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
045import org.apache.hadoop.hbase.client.Connection;
046import org.apache.hadoop.hbase.client.ConnectionFactory;
047import org.apache.hadoop.hbase.client.Delete;
048import org.apache.hadoop.hbase.client.Get;
049import org.apache.hadoop.hbase.client.Put;
050import org.apache.hadoop.hbase.client.Result;
051import org.apache.hadoop.hbase.client.ResultScanner;
052import org.apache.hadoop.hbase.client.Scan;
053import org.apache.hadoop.hbase.client.Table;
054import org.apache.hadoop.hbase.client.TableDescriptor;
055import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
056import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
057import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
058import org.apache.hadoop.hbase.testclassification.LargeTests;
059import org.apache.hadoop.hbase.testclassification.ReplicationTests;
060import org.apache.hadoop.hbase.util.Bytes;
061import org.apache.hadoop.hbase.util.FSUtils;
062import org.apache.hadoop.mapreduce.Job;
063import org.junit.AfterClass;
064import org.junit.Before;
065import org.junit.BeforeClass;
066import org.junit.ClassRule;
067import org.junit.Rule;
068import org.junit.Test;
069import org.junit.experimental.categories.Category;
070import org.junit.rules.TestName;
071import org.slf4j.Logger;
072import org.slf4j.LoggerFactory;
073
074import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
075
076@Category({ ReplicationTests.class, LargeTests.class })
077public class TestVerifyReplication extends TestReplicationBase {
078
079  @ClassRule
080  public static final HBaseClassTestRule CLASS_RULE =
081      HBaseClassTestRule.forClass(TestVerifyReplication.class);
082
083  private static final Logger LOG = LoggerFactory.getLogger(TestVerifyReplication.class);
084
085  private static final String PEER_ID = "2";
086  private static final TableName peerTableName = TableName.valueOf("peerTest");
087  private static Table htable3;
088
089  @Rule
090  public TestName name = new TestName();
091
092  @Before
093  public void setUp() throws Exception {
094    cleanUp();
095    utility2.deleteTableData(peerTableName);
096  }
097
098  @BeforeClass
099  public static void setUpBeforeClass() throws Exception {
100    TestReplicationBase.setUpBeforeClass();
101
102    TableDescriptor peerTable = TableDescriptorBuilder.newBuilder(peerTableName).setColumnFamily(
103                    ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).setMaxVersions(100)
104                            .build()).build();
105
106    Connection connection2 = ConnectionFactory.createConnection(conf2);
107    try (Admin admin2 = connection2.getAdmin()) {
108      admin2.createTable(peerTable, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
109    }
110    htable3 = connection2.getTable(peerTableName);
111  }
112
113  private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
114      throws IOException, InterruptedException, ClassNotFoundException {
115    Job job = new VerifyReplication().createSubmittableJob(new Configuration(conf1), args);
116    if (job == null) {
117      fail("Job wasn't created, see the log");
118    }
119    if (!job.waitForCompletion(true)) {
120      fail("Job failed, see the log");
121    }
122    assertEquals(expectedGoodRows,
123      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
124    assertEquals(expectedBadRows,
125      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
126  }
127
128  /**
129   * Do a small loading into a table, make sure the data is really the same, then run the
130   * VerifyReplication job to check the results. Do a second comparison where all the cells are
131   * different.
132   */
133  @Test
134  public void testVerifyRepJob() throws Exception {
135    // Populate the tables, at the same time it guarantees that the tables are
136    // identical since it does the check
137    runSmallBatchTest();
138
139    String[] args = new String[] { PEER_ID, tableName.getNameAsString() };
140    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
141
142    Scan scan = new Scan();
143    ResultScanner rs = htable2.getScanner(scan);
144    Put put = null;
145    for (Result result : rs) {
146      put = new Put(result.getRow());
147      Cell firstVal = result.rawCells()[0];
148      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
149        Bytes.toBytes("diff data"));
150      htable2.put(put);
151    }
152    Delete delete = new Delete(put.getRow());
153    htable2.delete(delete);
154    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
155  }
156
157  /**
158   * Load a row into a table, make sure the data is really the same, delete the row, make sure the
159   * delete marker is replicated, run verify replication with and without raw to check the results.
160   */
161  @Test
162  public void testVerifyRepJobWithRawOptions() throws Exception {
163    LOG.info(name.getMethodName());
164
165    final TableName tableName = TableName.valueOf(name.getMethodName());
166    byte[] familyname = Bytes.toBytes("fam_raw");
167    byte[] row = Bytes.toBytes("row_raw");
168
169    Table lHtable1 = null;
170    Table lHtable2 = null;
171
172    try {
173      ColumnFamilyDescriptor fam = ColumnFamilyDescriptorBuilder.newBuilder(familyname)
174          .setMaxVersions(100).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build();
175      TableDescriptor table =
176          TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(fam).build();
177      scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
178      for (ColumnFamilyDescriptor f : table.getColumnFamilies()) {
179        scopes.put(f.getName(), f.getScope());
180      }
181
182      Connection connection1 = ConnectionFactory.createConnection(conf1);
183      Connection connection2 = ConnectionFactory.createConnection(conf2);
184      try (Admin admin1 = connection1.getAdmin()) {
185        admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
186      }
187      try (Admin admin2 = connection2.getAdmin()) {
188        admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
189      }
190      utility1.waitUntilAllRegionsAssigned(tableName);
191      utility2.waitUntilAllRegionsAssigned(tableName);
192
193      lHtable1 = utility1.getConnection().getTable(tableName);
194      lHtable2 = utility2.getConnection().getTable(tableName);
195
196      Put put = new Put(row);
197      put.addColumn(familyname, row, row);
198      lHtable1.put(put);
199
200      Get get = new Get(row);
201      for (int i = 0; i < NB_RETRIES; i++) {
202        if (i == NB_RETRIES - 1) {
203          fail("Waited too much time for put replication");
204        }
205        Result res = lHtable2.get(get);
206        if (res.isEmpty()) {
207          LOG.info("Row not available");
208          Thread.sleep(SLEEP_TIME);
209        } else {
210          assertArrayEquals(res.value(), row);
211          break;
212        }
213      }
214
215      Delete del = new Delete(row);
216      lHtable1.delete(del);
217
218      get = new Get(row);
219      for (int i = 0; i < NB_RETRIES; i++) {
220        if (i == NB_RETRIES - 1) {
221          fail("Waited too much time for del replication");
222        }
223        Result res = lHtable2.get(get);
224        if (res.size() >= 1) {
225          LOG.info("Row not deleted");
226          Thread.sleep(SLEEP_TIME);
227        } else {
228          break;
229        }
230      }
231
232      // Checking verifyReplication for the default behavior.
233      String[] argsWithoutRaw = new String[] { PEER_ID, tableName.getNameAsString() };
234      runVerifyReplication(argsWithoutRaw, 0, 0);
235
236      // Checking verifyReplication with raw
237      String[] argsWithRawAsTrue = new String[] { "--raw", PEER_ID, tableName.getNameAsString() };
238      runVerifyReplication(argsWithRawAsTrue, 1, 0);
239    } finally {
240      if (lHtable1 != null) {
241        lHtable1.close();
242      }
243      if (lHtable2 != null) {
244        lHtable2.close();
245      }
246    }
247  }
248
249  // VerifyReplication should honor versions option
250  @Test
251  public void testHBase14905() throws Exception {
252    // normal Batch tests
253    byte[] qualifierName = Bytes.toBytes("f1");
254    Put put = new Put(Bytes.toBytes("r1"));
255    long ts = System.currentTimeMillis();
256    put.addColumn(famName, qualifierName, ts + 1, Bytes.toBytes("v1002"));
257    htable1.put(put);
258    put.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v1001"));
259    htable1.put(put);
260    put.addColumn(famName, qualifierName, ts + 3, Bytes.toBytes("v1112"));
261    htable1.put(put);
262
263    Scan scan = new Scan();
264    scan.readVersions(100);
265    ResultScanner scanner1 = htable1.getScanner(scan);
266    Result[] res1 = scanner1.next(1);
267    scanner1.close();
268
269    assertEquals(1, res1.length);
270    assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
271
272    for (int i = 0; i < NB_RETRIES; i++) {
273      scan = new Scan();
274      scan.readVersions(100);
275      scanner1 = htable2.getScanner(scan);
276      res1 = scanner1.next(1);
277      scanner1.close();
278      if (res1.length != 1) {
279        LOG.info("Only got " + res1.length + " rows");
280        Thread.sleep(SLEEP_TIME);
281      } else {
282        int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size();
283        if (cellNumber != 3) {
284          LOG.info("Only got " + cellNumber + " cells");
285          Thread.sleep(SLEEP_TIME);
286        } else {
287          break;
288        }
289      }
290      if (i == NB_RETRIES - 1) {
291        fail("Waited too much time for normal batch replication");
292      }
293    }
294
295    put.addColumn(famName, qualifierName, ts + 4, Bytes.toBytes("v1111"));
296    htable2.put(put);
297    put.addColumn(famName, qualifierName, ts + 5, Bytes.toBytes("v1112"));
298    htable2.put(put);
299
300    scan = new Scan();
301    scan.readVersions(100);
302    scanner1 = htable2.getScanner(scan);
303    res1 = scanner1.next(NB_ROWS_IN_BATCH);
304    scanner1.close();
305
306    assertEquals(1, res1.length);
307    assertEquals(5, res1[0].getColumnCells(famName, qualifierName).size());
308
309    String[] args = new String[] { "--versions=100", PEER_ID, tableName.getNameAsString() };
310    runVerifyReplication(args, 0, 1);
311  }
312
313  // VerifyReplication should honor versions option
314  @Test
315  public void testVersionMismatchHBase14905() throws Exception {
316    // normal Batch tests
317    byte[] qualifierName = Bytes.toBytes("f1");
318    Put put = new Put(Bytes.toBytes("r1"));
319    long ts = System.currentTimeMillis();
320    put.addColumn(famName, qualifierName, ts + 1, Bytes.toBytes("v1"));
321    htable1.put(put);
322    put.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v2"));
323    htable1.put(put);
324    put.addColumn(famName, qualifierName, ts + 3, Bytes.toBytes("v3"));
325    htable1.put(put);
326
327    Scan scan = new Scan();
328    scan.readVersions(100);
329    ResultScanner scanner1 = htable1.getScanner(scan);
330    Result[] res1 = scanner1.next(1);
331    scanner1.close();
332
333    assertEquals(1, res1.length);
334    assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
335
336    for (int i = 0; i < NB_RETRIES; i++) {
337      scan = new Scan();
338      scan.readVersions(100);
339      scanner1 = htable2.getScanner(scan);
340      res1 = scanner1.next(1);
341      scanner1.close();
342      if (res1.length != 1) {
343        LOG.info("Only got " + res1.length + " rows");
344        Thread.sleep(SLEEP_TIME);
345      } else {
346        int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size();
347        if (cellNumber != 3) {
348          LOG.info("Only got " + cellNumber + " cells");
349          Thread.sleep(SLEEP_TIME);
350        } else {
351          break;
352        }
353      }
354      if (i == NB_RETRIES - 1) {
355        fail("Waited too much time for normal batch replication");
356      }
357    }
358
359    try {
360      // Disabling replication and modifying the particular version of the cell to validate the
361      // feature.
362      hbaseAdmin.disableReplicationPeer(PEER_ID);
363      Put put2 = new Put(Bytes.toBytes("r1"));
364      put2.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v99"));
365      htable2.put(put2);
366
367      scan = new Scan();
368      scan.readVersions(100);
369      scanner1 = htable2.getScanner(scan);
370      res1 = scanner1.next(NB_ROWS_IN_BATCH);
371      scanner1.close();
372      assertEquals(1, res1.length);
373      assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
374
375      String[] args = new String[] { "--versions=100", PEER_ID, tableName.getNameAsString() };
376      runVerifyReplication(args, 0, 1);
377    } finally {
378      hbaseAdmin.enableReplicationPeer(PEER_ID);
379    }
380  }
381
382  @Test
383  public void testVerifyReplicationPrefixFiltering() throws Exception {
384    final byte[] prefixRow = Bytes.toBytes("prefixrow");
385    final byte[] prefixRow2 = Bytes.toBytes("secondrow");
386    loadData("prefixrow", prefixRow);
387    loadData("secondrow", prefixRow2);
388    loadData("aaa", row);
389    loadData("zzz", row);
390    waitForReplication(NB_ROWS_IN_BATCH * 4, NB_RETRIES * 4);
391    String[] args =
392        new String[] { "--row-prefixes=prefixrow,secondrow", PEER_ID, tableName.getNameAsString() };
393    runVerifyReplication(args, NB_ROWS_IN_BATCH * 2, 0);
394  }
395
396  @Test
397  public void testVerifyReplicationSnapshotArguments() {
398    String[] args =
399        new String[] { "--sourceSnapshotName=snapshot1", "2", tableName.getNameAsString() };
400    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
401
402    args = new String[] { "--sourceSnapshotTmpDir=tmp", "2", tableName.getNameAsString() };
403    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
404
405    args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=tmp", "2",
406        tableName.getNameAsString() };
407    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
408
409    args = new String[] { "--peerSnapshotName=snapshot1", "2", tableName.getNameAsString() };
410    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
411
412    args = new String[] { "--peerSnapshotTmpDir=/tmp/", "2", tableName.getNameAsString() };
413    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
414
415    args = new String[] { "--peerSnapshotName=snapshot1", "--peerSnapshotTmpDir=/tmp/",
416        "--peerFSAddress=tempfs", "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2",
417        tableName.getNameAsString() };
418    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
419
420    args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=/tmp/",
421        "--peerSnapshotName=snapshot2", "--peerSnapshotTmpDir=/tmp/", "--peerFSAddress=tempfs",
422        "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2", tableName.getNameAsString() };
423
424    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
425  }
426
427  private void checkRestoreTmpDir(Configuration conf, String restoreTmpDir, int expectedCount)
428      throws IOException {
429    FileSystem fs = FileSystem.get(conf);
430    FileStatus[] subDirectories = fs.listStatus(new Path(restoreTmpDir));
431    assertNotNull(subDirectories);
432    assertEquals(subDirectories.length, expectedCount);
433    for (int i = 0; i < expectedCount; i++) {
434      assertTrue(subDirectories[i].isDirectory());
435    }
436  }
437
438  @Test
439  public void testVerifyReplicationWithSnapshotSupport() throws Exception {
440    // Populate the tables, at the same time it guarantees that the tables are
441    // identical since it does the check
442    runSmallBatchTest();
443
444    // Take source and target tables snapshot
445    Path rootDir = FSUtils.getRootDir(conf1);
446    FileSystem fs = rootDir.getFileSystem(conf1);
447    String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
448    SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
449      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
450
451    // Take target snapshot
452    Path peerRootDir = FSUtils.getRootDir(conf2);
453    FileSystem peerFs = peerRootDir.getFileSystem(conf2);
454    String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
455    SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
456      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
457
458    String peerFSAddress = peerFs.getUri().toString();
459    String temPath1 = utility1.getRandomDir().toString();
460    String temPath2 = "/tmp" + System.currentTimeMillis();
461
462    String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
463        "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
464        "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
465        "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };
466    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
467    checkRestoreTmpDir(conf1, temPath1, 1);
468    checkRestoreTmpDir(conf2, temPath2, 1);
469
470    Scan scan = new Scan();
471    ResultScanner rs = htable2.getScanner(scan);
472    Put put = null;
473    for (Result result : rs) {
474      put = new Put(result.getRow());
475      Cell firstVal = result.rawCells()[0];
476      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
477        Bytes.toBytes("diff data"));
478      htable2.put(put);
479    }
480    Delete delete = new Delete(put.getRow());
481    htable2.delete(delete);
482
483    sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
484    SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
485      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
486
487    peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
488    SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
489      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
490
491    args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
492        "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
493        "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
494        "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };
495    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
496    checkRestoreTmpDir(conf1, temPath1, 2);
497    checkRestoreTmpDir(conf2, temPath2, 2);
498  }
499
500  @Test
501  public void testVerifyRepJobWithQuorumAddress() throws Exception {
502    // Populate the tables, at the same time it guarantees that the tables are
503    // identical since it does the check
504    runSmallBatchTest();
505
506    // with a quorum address (a cluster key)
507    String[] args = new String[] { utility2.getClusterKey(), tableName.getNameAsString() };
508    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
509
510    Scan scan = new Scan();
511    ResultScanner rs = htable2.getScanner(scan);
512    Put put = null;
513    for (Result result : rs) {
514      put = new Put(result.getRow());
515      Cell firstVal = result.rawCells()[0];
516      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
517        Bytes.toBytes("diff data"));
518      htable2.put(put);
519    }
520    Delete delete = new Delete(put.getRow());
521    htable2.delete(delete);
522    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
523  }
524
525  @Test
526  public void testVerifyRepJobWithQuorumAddressAndSnapshotSupport() throws Exception {
527    // Populate the tables, at the same time it guarantees that the tables are
528    // identical since it does the check
529    runSmallBatchTest();
530
531    // Take source and target tables snapshot
532    Path rootDir = FSUtils.getRootDir(conf1);
533    FileSystem fs = rootDir.getFileSystem(conf1);
534    String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
535    SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
536      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
537
538    // Take target snapshot
539    Path peerRootDir = FSUtils.getRootDir(conf2);
540    FileSystem peerFs = peerRootDir.getFileSystem(conf2);
541    String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
542    SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
543      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
544
545    String peerFSAddress = peerFs.getUri().toString();
546    String tmpPath1 = utility1.getRandomDir().toString();
547    String tmpPath2 = "/tmp" + System.currentTimeMillis();
548
549    String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
550      "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
551      "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
552      "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(),
553      tableName.getNameAsString() };
554    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
555    checkRestoreTmpDir(conf1, tmpPath1, 1);
556    checkRestoreTmpDir(conf2, tmpPath2, 1);
557
558    Scan scan = new Scan();
559    ResultScanner rs = htable2.getScanner(scan);
560    Put put = null;
561    for (Result result : rs) {
562      put = new Put(result.getRow());
563      Cell firstVal = result.rawCells()[0];
564      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
565        Bytes.toBytes("diff data"));
566      htable2.put(put);
567    }
568    Delete delete = new Delete(put.getRow());
569    htable2.delete(delete);
570
571    sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
572    SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
573      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
574
575    peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
576    SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
577      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
578
579    args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
580      "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
581      "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
582      "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(),
583      tableName.getNameAsString() };
584    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
585    checkRestoreTmpDir(conf1, tmpPath1, 2);
586    checkRestoreTmpDir(conf2, tmpPath2, 2);
587  }
588
589  private static void runBatchCopyTest() throws Exception {
590    // normal Batch tests for htable1
591    loadData("", row, noRepfamName);
592
593    Scan scan1 = new Scan();
594    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
595    ResultScanner scanner1 = htable1.getScanner(scan1);
596    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
597    for (Result result : res1) {
598      Put put = new Put(result.getRow());
599      for (Cell cell : result.rawCells()) {
600        put.add(cell);
601      }
602      puts.add(put);
603    }
604    scanner1.close();
605    assertEquals(NB_ROWS_IN_BATCH, res1.length);
606
607    // Copy the data to htable3
608    htable3.put(puts);
609
610    Scan scan2 = new Scan();
611    ResultScanner scanner2 = htable3.getScanner(scan2);
612    Result[] res2 = scanner2.next(NB_ROWS_IN_BATCH);
613    scanner2.close();
614    assertEquals(NB_ROWS_IN_BATCH, res2.length);
615  }
616
617  @Test
618  public void testVerifyRepJobWithPeerTableName() throws Exception {
619    // Populate the tables with same data
620    runBatchCopyTest();
621
622    // with a peerTableName along with quorum address (a cluster key)
623    String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
624        utility2.getClusterKey(), tableName.getNameAsString() };
625    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
626
627    utility2.deleteTableData(peerTableName);
628    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
629  }
630
631  @Test
632  public void testVerifyRepJobWithPeerTableNameAndSnapshotSupport() throws Exception {
633    // Populate the tables with same data
634    runBatchCopyTest();
635
636    // Take source and target tables snapshot
637    Path rootDir = FSUtils.getRootDir(conf1);
638    FileSystem fs = rootDir.getFileSystem(conf1);
639    String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
640    SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
641            Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true);
642
643    // Take target snapshot
644    Path peerRootDir = FSUtils.getRootDir(conf2);
645    FileSystem peerFs = peerRootDir.getFileSystem(conf2);
646    String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
647    SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), peerTableName,
648            Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true);
649
650    String peerFSAddress = peerFs.getUri().toString();
651    String tmpPath1 = utility1.getRandomDir().toString();
652    String tmpPath2 = "/tmp" + System.currentTimeMillis();
653
654    String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
655      "--sourceSnapshotName=" + sourceSnapshotName,
656      "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
657      "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
658      "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(),
659      tableName.getNameAsString() };
660    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
661    checkRestoreTmpDir(conf1, tmpPath1, 1);
662    checkRestoreTmpDir(conf2, tmpPath2, 1);
663
664    Scan scan = new Scan();
665    ResultScanner rs = htable3.getScanner(scan);
666    Put put = null;
667    for (Result result : rs) {
668      put = new Put(result.getRow());
669      Cell firstVal = result.rawCells()[0];
670      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
671              Bytes.toBytes("diff data"));
672      htable3.put(put);
673    }
674    Delete delete = new Delete(put.getRow());
675    htable3.delete(delete);
676
677    sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
678    SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
679            Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true);
680
681    peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
682    SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), peerTableName,
683            Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true);
684
685    args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
686      "--sourceSnapshotName=" + sourceSnapshotName,
687      "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
688      "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
689      "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(),
690      tableName.getNameAsString() };
691    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
692    checkRestoreTmpDir(conf1, tmpPath1, 2);
693    checkRestoreTmpDir(conf2, tmpPath2, 2);
694  }
695
696  @AfterClass
697  public static void tearDownAfterClass() throws Exception {
698    htable3.close();
699    TestReplicationBase.tearDownAfterClass();
700  }
701}