001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CellUtil;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseTestingUtility;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.Admin;
040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
042import org.apache.hadoop.hbase.client.Connection;
043import org.apache.hadoop.hbase.client.ConnectionFactory;
044import org.apache.hadoop.hbase.client.Delete;
045import org.apache.hadoop.hbase.client.Get;
046import org.apache.hadoop.hbase.client.Put;
047import org.apache.hadoop.hbase.client.Result;
048import org.apache.hadoop.hbase.client.ResultScanner;
049import org.apache.hadoop.hbase.client.Scan;
050import org.apache.hadoop.hbase.client.Table;
051import org.apache.hadoop.hbase.client.TableDescriptor;
052import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
053import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
054import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
055import org.apache.hadoop.hbase.testclassification.LargeTests;
056import org.apache.hadoop.hbase.testclassification.ReplicationTests;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.hbase.util.CommonFSUtils;
059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
060import org.apache.hadoop.mapreduce.Job;
061import org.junit.AfterClass;
062import org.junit.Before;
063import org.junit.BeforeClass;
064import org.junit.ClassRule;
065import org.junit.Rule;
066import org.junit.Test;
067import org.junit.experimental.categories.Category;
068import org.junit.rules.TestName;
069import org.slf4j.Logger;
070import org.slf4j.LoggerFactory;
071
072@Category({ ReplicationTests.class, LargeTests.class })
073public class TestVerifyReplication extends TestReplicationBase {
074
075  @ClassRule
076  public static final HBaseClassTestRule CLASS_RULE =
077    HBaseClassTestRule.forClass(TestVerifyReplication.class);
078
079  private static final Logger LOG = LoggerFactory.getLogger(TestVerifyReplication.class);
080
081  private static final String PEER_ID = "2";
082  private static final TableName peerTableName = TableName.valueOf("peerTest");
083  private static Table htable3;
084
085  @Rule
086  public TestName name = new TestName();
087
088  @Before
089  public void setUp() throws Exception {
090    cleanUp();
091    UTIL2.deleteTableData(peerTableName);
092  }
093
094  @BeforeClass
095  public static void setUpBeforeClass() throws Exception {
096    TestReplicationBase.setUpBeforeClass();
097
098    TableDescriptor peerTable =
099      TableDescriptorBuilder.newBuilder(peerTableName)
100        .setColumnFamily(
101          ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).setMaxVersions(100).build())
102        .build();
103
104    Connection connection2 = ConnectionFactory.createConnection(CONF2);
105    try (Admin admin2 = connection2.getAdmin()) {
106      admin2.createTable(peerTable, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
107    }
108    htable3 = connection2.getTable(peerTableName);
109  }
110
111  static void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
112    throws IOException, InterruptedException, ClassNotFoundException {
113    Job job = new VerifyReplication().createSubmittableJob(new Configuration(CONF1), args);
114    if (job == null) {
115      fail("Job wasn't created, see the log");
116    }
117    if (!job.waitForCompletion(true)) {
118      fail("Job failed, see the log");
119    }
120    assertEquals(expectedGoodRows,
121      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
122    assertEquals(expectedBadRows,
123      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
124  }
125
126  /**
127   * Do a small loading into a table, make sure the data is really the same, then run the
128   * VerifyReplication job to check the results. Do a second comparison where all the cells are
129   * different.
130   */
131  @Test
132  public void testVerifyRepJob() throws Exception {
133    // Populate the tables, at the same time it guarantees that the tables are
134    // identical since it does the check
135    runSmallBatchTest();
136
137    String[] args = new String[] { PEER_ID, tableName.getNameAsString() };
138    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
139
140    Scan scan = new Scan();
141    ResultScanner rs = htable2.getScanner(scan);
142    Put put = null;
143    for (Result result : rs) {
144      put = new Put(result.getRow());
145      Cell firstVal = result.rawCells()[0];
146      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
147        Bytes.toBytes("diff data"));
148      htable2.put(put);
149    }
150    Delete delete = new Delete(put.getRow());
151    htable2.delete(delete);
152    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
153  }
154
155  /**
156   * Load a row into a table, make sure the data is really the same, delete the row, make sure the
157   * delete marker is replicated, run verify replication with and without raw to check the results.
158   */
159  @Test
160  public void testVerifyRepJobWithRawOptions() throws Exception {
161    LOG.info(name.getMethodName());
162
163    final TableName tableName = TableName.valueOf(name.getMethodName());
164    byte[] familyname = Bytes.toBytes("fam_raw");
165    byte[] row = Bytes.toBytes("row_raw");
166
167    Table lHtable1 = null;
168    Table lHtable2 = null;
169
170    try {
171      ColumnFamilyDescriptor fam = ColumnFamilyDescriptorBuilder.newBuilder(familyname)
172        .setMaxVersions(100).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build();
173      TableDescriptor table =
174        TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(fam).build();
175
176      Connection connection1 = ConnectionFactory.createConnection(CONF1);
177      Connection connection2 = ConnectionFactory.createConnection(CONF2);
178      try (Admin admin1 = connection1.getAdmin()) {
179        admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
180      }
181      try (Admin admin2 = connection2.getAdmin()) {
182        admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
183      }
184      UTIL1.waitUntilAllRegionsAssigned(tableName);
185      UTIL2.waitUntilAllRegionsAssigned(tableName);
186
187      lHtable1 = UTIL1.getConnection().getTable(tableName);
188      lHtable2 = UTIL2.getConnection().getTable(tableName);
189
190      Put put = new Put(row);
191      put.addColumn(familyname, row, row);
192      lHtable1.put(put);
193
194      Get get = new Get(row);
195      for (int i = 0; i < NB_RETRIES; i++) {
196        if (i == NB_RETRIES - 1) {
197          fail("Waited too much time for put replication");
198        }
199        Result res = lHtable2.get(get);
200        if (res.isEmpty()) {
201          LOG.info("Row not available");
202          Thread.sleep(SLEEP_TIME);
203        } else {
204          assertArrayEquals(res.value(), row);
205          break;
206        }
207      }
208
209      Delete del = new Delete(row);
210      lHtable1.delete(del);
211
212      get = new Get(row);
213      for (int i = 0; i < NB_RETRIES; i++) {
214        if (i == NB_RETRIES - 1) {
215          fail("Waited too much time for del replication");
216        }
217        Result res = lHtable2.get(get);
218        if (res.size() >= 1) {
219          LOG.info("Row not deleted");
220          Thread.sleep(SLEEP_TIME);
221        } else {
222          break;
223        }
224      }
225
226      // Checking verifyReplication for the default behavior.
227      String[] argsWithoutRaw = new String[] { PEER_ID, tableName.getNameAsString() };
228      runVerifyReplication(argsWithoutRaw, 0, 0);
229
230      // Checking verifyReplication with raw
231      String[] argsWithRawAsTrue = new String[] { "--raw", PEER_ID, tableName.getNameAsString() };
232      runVerifyReplication(argsWithRawAsTrue, 1, 0);
233    } finally {
234      if (lHtable1 != null) {
235        lHtable1.close();
236      }
237      if (lHtable2 != null) {
238        lHtable2.close();
239      }
240    }
241  }
242
243  static void checkRestoreTmpDir(Configuration conf, String restoreTmpDir, int expectedCount)
244    throws IOException {
245    FileSystem fs = FileSystem.get(conf);
246    FileStatus[] subDirectories = fs.listStatus(new Path(restoreTmpDir));
247    assertNotNull(subDirectories);
248    assertEquals(subDirectories.length, expectedCount);
249    for (int i = 0; i < expectedCount; i++) {
250      assertTrue(subDirectories[i].isDirectory());
251    }
252  }
253
254  @Test
255  public void testVerifyRepJobWithQuorumAddress() throws Exception {
256    // Populate the tables, at the same time it guarantees that the tables are
257    // identical since it does the check
258    runSmallBatchTest();
259
260    // with a quorum address (a cluster key)
261    String[] args = new String[] { UTIL2.getClusterKey(), tableName.getNameAsString() };
262    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
263
264    Scan scan = new Scan();
265    ResultScanner rs = htable2.getScanner(scan);
266    Put put = null;
267    for (Result result : rs) {
268      put = new Put(result.getRow());
269      Cell firstVal = result.rawCells()[0];
270      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
271        Bytes.toBytes("diff data"));
272      htable2.put(put);
273    }
274    Delete delete = new Delete(put.getRow());
275    htable2.delete(delete);
276    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
277  }
278
279  @Test
280  public void testVerifyRepJobWithQuorumAddressAndSnapshotSupport() throws Exception {
281    // Populate the tables, at the same time it guarantees that the tables are
282    // identical since it does the check
283    runSmallBatchTest();
284
285    // Take source and target tables snapshot
286    Path rootDir = CommonFSUtils.getRootDir(CONF1);
287    FileSystem fs = rootDir.getFileSystem(CONF1);
288    String sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
289    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
290      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
291
292    // Take target snapshot
293    Path peerRootDir = CommonFSUtils.getRootDir(CONF2);
294    FileSystem peerFs = peerRootDir.getFileSystem(CONF2);
295    String peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
296    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
297      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
298
299    String peerFSAddress = peerFs.getUri().toString();
300    String tmpPath1 = UTIL1.getRandomDir().toString();
301    String tmpPath2 = "/tmp" + EnvironmentEdgeManager.currentTime();
302
303    String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
304      "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
305      "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
306      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), UTIL2.getClusterKey(),
307      tableName.getNameAsString() };
308    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
309    checkRestoreTmpDir(CONF1, tmpPath1, 1);
310    checkRestoreTmpDir(CONF2, tmpPath2, 1);
311
312    Scan scan = new Scan();
313    ResultScanner rs = htable2.getScanner(scan);
314    Put put = null;
315    for (Result result : rs) {
316      put = new Put(result.getRow());
317      Cell firstVal = result.rawCells()[0];
318      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
319        Bytes.toBytes("diff data"));
320      htable2.put(put);
321    }
322    Delete delete = new Delete(put.getRow());
323    htable2.delete(delete);
324
325    sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
326    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
327      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
328
329    peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
330    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
331      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
332
333    args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
334      "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
335      "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
336      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), UTIL2.getClusterKey(),
337      tableName.getNameAsString() };
338    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
339    checkRestoreTmpDir(CONF1, tmpPath1, 2);
340    checkRestoreTmpDir(CONF2, tmpPath2, 2);
341  }
342
343  static void runBatchCopyTest() throws Exception {
344    // normal Batch tests for htable1
345    loadData("", row, noRepfamName);
346
347    Scan scan1 = new Scan();
348    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
349    ResultScanner scanner1 = htable1.getScanner(scan1);
350    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
351    for (Result result : res1) {
352      Put put = new Put(result.getRow());
353      for (Cell cell : result.rawCells()) {
354        put.add(cell);
355      }
356      puts.add(put);
357    }
358    scanner1.close();
359    assertEquals(NB_ROWS_IN_BATCH, res1.length);
360
361    // Copy the data to htable3
362    htable3.put(puts);
363
364    Scan scan2 = new Scan();
365    ResultScanner scanner2 = htable3.getScanner(scan2);
366    Result[] res2 = scanner2.next(NB_ROWS_IN_BATCH);
367    scanner2.close();
368    assertEquals(NB_ROWS_IN_BATCH, res2.length);
369  }
370
371  @Test
372  public void testVerifyRepJobWithPeerTableName() throws Exception {
373    // Populate the tables with same data
374    runBatchCopyTest();
375
376    // with a peerTableName along with quorum address (a cluster key)
377    String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
378      UTIL2.getClusterKey(), tableName.getNameAsString() };
379    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
380
381    UTIL2.deleteTableData(peerTableName);
382    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
383  }
384
385  @Test
386  public void testVerifyRepJobWithPeerTableNameAndSnapshotSupport() throws Exception {
387    // Populate the tables with same data
388    runBatchCopyTest();
389
390    // Take source and target tables snapshot
391    Path rootDir = CommonFSUtils.getRootDir(CONF1);
392    FileSystem fs = rootDir.getFileSystem(CONF1);
393    String sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
394    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
395      Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true);
396
397    // Take target snapshot
398    Path peerRootDir = CommonFSUtils.getRootDir(CONF2);
399    FileSystem peerFs = peerRootDir.getFileSystem(CONF2);
400    String peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
401    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), peerTableName,
402      Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true);
403
404    String peerFSAddress = peerFs.getUri().toString();
405    String tmpPath1 = UTIL1.getRandomDir().toString();
406    String tmpPath2 = "/tmp" + EnvironmentEdgeManager.currentTime();
407
408    String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
409      "--sourceSnapshotName=" + sourceSnapshotName, "--sourceSnapshotTmpDir=" + tmpPath1,
410      "--peerSnapshotName=" + peerSnapshotName, "--peerSnapshotTmpDir=" + tmpPath2,
411      "--peerFSAddress=" + peerFSAddress,
412      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), UTIL2.getClusterKey(),
413      tableName.getNameAsString() };
414    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
415    checkRestoreTmpDir(CONF1, tmpPath1, 1);
416    checkRestoreTmpDir(CONF2, tmpPath2, 1);
417
418    Scan scan = new Scan();
419    ResultScanner rs = htable3.getScanner(scan);
420    Put put = null;
421    for (Result result : rs) {
422      put = new Put(result.getRow());
423      Cell firstVal = result.rawCells()[0];
424      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
425        Bytes.toBytes("diff data"));
426      htable3.put(put);
427    }
428    Delete delete = new Delete(put.getRow());
429    htable3.delete(delete);
430
431    sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
432    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
433      Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true);
434
435    peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
436    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), peerTableName,
437      Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true);
438
439    args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
440      "--sourceSnapshotName=" + sourceSnapshotName, "--sourceSnapshotTmpDir=" + tmpPath1,
441      "--peerSnapshotName=" + peerSnapshotName, "--peerSnapshotTmpDir=" + tmpPath2,
442      "--peerFSAddress=" + peerFSAddress,
443      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), UTIL2.getClusterKey(),
444      tableName.getNameAsString() };
445    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
446    checkRestoreTmpDir(CONF1, tmpPath1, 2);
447    checkRestoreTmpDir(CONF2, tmpPath2, 2);
448  }
449
450  @AfterClass
451  public static void tearDownAfterClass() throws Exception {
452    htable3.close();
453    TestReplicationBase.tearDownAfterClass();
454  }
455}