001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CellUtil;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseTestingUtility;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.Admin;
040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
042import org.apache.hadoop.hbase.client.Connection;
043import org.apache.hadoop.hbase.client.ConnectionFactory;
044import org.apache.hadoop.hbase.client.Delete;
045import org.apache.hadoop.hbase.client.Get;
046import org.apache.hadoop.hbase.client.Put;
047import org.apache.hadoop.hbase.client.Result;
048import org.apache.hadoop.hbase.client.ResultScanner;
049import org.apache.hadoop.hbase.client.Scan;
050import org.apache.hadoop.hbase.client.Table;
051import org.apache.hadoop.hbase.client.TableDescriptor;
052import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
053import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
054import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
055import org.apache.hadoop.hbase.testclassification.LargeTests;
056import org.apache.hadoop.hbase.testclassification.ReplicationTests;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.hbase.util.CommonFSUtils;
059import org.apache.hadoop.mapreduce.Job;
060import org.junit.AfterClass;
061import org.junit.Before;
062import org.junit.BeforeClass;
063import org.junit.ClassRule;
064import org.junit.Rule;
065import org.junit.Test;
066import org.junit.experimental.categories.Category;
067import org.junit.rules.TestName;
068import org.slf4j.Logger;
069import org.slf4j.LoggerFactory;
070
071@Category({ ReplicationTests.class, LargeTests.class })
072public class TestVerifyReplication extends TestReplicationBase {
073
074  @ClassRule
075  public static final HBaseClassTestRule CLASS_RULE =
076      HBaseClassTestRule.forClass(TestVerifyReplication.class);
077
078  private static final Logger LOG = LoggerFactory.getLogger(TestVerifyReplication.class);
079
080  private static final String PEER_ID = "2";
081  private static final TableName peerTableName = TableName.valueOf("peerTest");
082  private static Table htable3;
083
084  @Rule
085  public TestName name = new TestName();
086
087  @Before
088  public void setUp() throws Exception {
089    cleanUp();
090    UTIL2.deleteTableData(peerTableName);
091  }
092
093  @BeforeClass
094  public static void setUpBeforeClass() throws Exception {
095    TestReplicationBase.setUpBeforeClass();
096
097    TableDescriptor peerTable = TableDescriptorBuilder.newBuilder(peerTableName).setColumnFamily(
098                    ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).setMaxVersions(100)
099                            .build()).build();
100
101    Connection connection2 = ConnectionFactory.createConnection(CONF2);
102    try (Admin admin2 = connection2.getAdmin()) {
103      admin2.createTable(peerTable, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
104    }
105    htable3 = connection2.getTable(peerTableName);
106  }
107
108  static void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
109      throws IOException, InterruptedException, ClassNotFoundException {
110    Job job = new VerifyReplication().createSubmittableJob(new Configuration(CONF1), args);
111    if (job == null) {
112      fail("Job wasn't created, see the log");
113    }
114    if (!job.waitForCompletion(true)) {
115      fail("Job failed, see the log");
116    }
117    assertEquals(expectedGoodRows,
118      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
119    assertEquals(expectedBadRows,
120      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
121  }
122
123  /**
124   * Do a small loading into a table, make sure the data is really the same, then run the
125   * VerifyReplication job to check the results. Do a second comparison where all the cells are
126   * different.
127   */
128  @Test
129  public void testVerifyRepJob() throws Exception {
130    // Populate the tables, at the same time it guarantees that the tables are
131    // identical since it does the check
132    runSmallBatchTest();
133
134    String[] args = new String[] { PEER_ID, tableName.getNameAsString() };
135    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
136
137    Scan scan = new Scan();
138    ResultScanner rs = htable2.getScanner(scan);
139    Put put = null;
140    for (Result result : rs) {
141      put = new Put(result.getRow());
142      Cell firstVal = result.rawCells()[0];
143      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
144        Bytes.toBytes("diff data"));
145      htable2.put(put);
146    }
147    Delete delete = new Delete(put.getRow());
148    htable2.delete(delete);
149    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
150  }
151
152  /**
153   * Load a row into a table, make sure the data is really the same, delete the row, make sure the
154   * delete marker is replicated, run verify replication with and without raw to check the results.
155   */
156  @Test
157  public void testVerifyRepJobWithRawOptions() throws Exception {
158    LOG.info(name.getMethodName());
159
160    final TableName tableName = TableName.valueOf(name.getMethodName());
161    byte[] familyname = Bytes.toBytes("fam_raw");
162    byte[] row = Bytes.toBytes("row_raw");
163
164    Table lHtable1 = null;
165    Table lHtable2 = null;
166
167    try {
168      ColumnFamilyDescriptor fam = ColumnFamilyDescriptorBuilder.newBuilder(familyname)
169          .setMaxVersions(100).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build();
170      TableDescriptor table =
171          TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(fam).build();
172
173      Connection connection1 = ConnectionFactory.createConnection(CONF1);
174      Connection connection2 = ConnectionFactory.createConnection(CONF2);
175      try (Admin admin1 = connection1.getAdmin()) {
176        admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
177      }
178      try (Admin admin2 = connection2.getAdmin()) {
179        admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
180      }
181      UTIL1.waitUntilAllRegionsAssigned(tableName);
182      UTIL2.waitUntilAllRegionsAssigned(tableName);
183
184      lHtable1 = UTIL1.getConnection().getTable(tableName);
185      lHtable2 = UTIL2.getConnection().getTable(tableName);
186
187      Put put = new Put(row);
188      put.addColumn(familyname, row, row);
189      lHtable1.put(put);
190
191      Get get = new Get(row);
192      for (int i = 0; i < NB_RETRIES; i++) {
193        if (i == NB_RETRIES - 1) {
194          fail("Waited too much time for put replication");
195        }
196        Result res = lHtable2.get(get);
197        if (res.isEmpty()) {
198          LOG.info("Row not available");
199          Thread.sleep(SLEEP_TIME);
200        } else {
201          assertArrayEquals(res.value(), row);
202          break;
203        }
204      }
205
206      Delete del = new Delete(row);
207      lHtable1.delete(del);
208
209      get = new Get(row);
210      for (int i = 0; i < NB_RETRIES; i++) {
211        if (i == NB_RETRIES - 1) {
212          fail("Waited too much time for del replication");
213        }
214        Result res = lHtable2.get(get);
215        if (res.size() >= 1) {
216          LOG.info("Row not deleted");
217          Thread.sleep(SLEEP_TIME);
218        } else {
219          break;
220        }
221      }
222
223      // Checking verifyReplication for the default behavior.
224      String[] argsWithoutRaw = new String[] { PEER_ID, tableName.getNameAsString() };
225      runVerifyReplication(argsWithoutRaw, 0, 0);
226
227      // Checking verifyReplication with raw
228      String[] argsWithRawAsTrue = new String[] { "--raw", PEER_ID, tableName.getNameAsString() };
229      runVerifyReplication(argsWithRawAsTrue, 1, 0);
230    } finally {
231      if (lHtable1 != null) {
232        lHtable1.close();
233      }
234      if (lHtable2 != null) {
235        lHtable2.close();
236      }
237    }
238  }
239
240  static void checkRestoreTmpDir(Configuration conf, String restoreTmpDir, int expectedCount)
241      throws IOException {
242    FileSystem fs = FileSystem.get(conf);
243    FileStatus[] subDirectories = fs.listStatus(new Path(restoreTmpDir));
244    assertNotNull(subDirectories);
245    assertEquals(subDirectories.length, expectedCount);
246    for (int i = 0; i < expectedCount; i++) {
247      assertTrue(subDirectories[i].isDirectory());
248    }
249  }
250
251
252  @Test
253  public void testVerifyRepJobWithQuorumAddress() throws Exception {
254    // Populate the tables, at the same time it guarantees that the tables are
255    // identical since it does the check
256    runSmallBatchTest();
257
258    // with a quorum address (a cluster key)
259    String[] args = new String[] { UTIL2.getClusterKey(), tableName.getNameAsString() };
260    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
261
262    Scan scan = new Scan();
263    ResultScanner rs = htable2.getScanner(scan);
264    Put put = null;
265    for (Result result : rs) {
266      put = new Put(result.getRow());
267      Cell firstVal = result.rawCells()[0];
268      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
269        Bytes.toBytes("diff data"));
270      htable2.put(put);
271    }
272    Delete delete = new Delete(put.getRow());
273    htable2.delete(delete);
274    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
275  }
276
277  @Test
278  public void testVerifyRepJobWithQuorumAddressAndSnapshotSupport() throws Exception {
279    // Populate the tables, at the same time it guarantees that the tables are
280    // identical since it does the check
281    runSmallBatchTest();
282
283    // Take source and target tables snapshot
284    Path rootDir = CommonFSUtils.getRootDir(CONF1);
285    FileSystem fs = rootDir.getFileSystem(CONF1);
286    String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
287    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
288      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
289
290    // Take target snapshot
291    Path peerRootDir = CommonFSUtils.getRootDir(CONF2);
292    FileSystem peerFs = peerRootDir.getFileSystem(CONF2);
293    String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
294    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
295      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
296
297    String peerFSAddress = peerFs.getUri().toString();
298    String tmpPath1 = UTIL1.getRandomDir().toString();
299    String tmpPath2 = "/tmp" + System.currentTimeMillis();
300
301    String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
302      "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
303      "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
304      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), UTIL2.getClusterKey(),
305      tableName.getNameAsString() };
306    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
307    checkRestoreTmpDir(CONF1, tmpPath1, 1);
308    checkRestoreTmpDir(CONF2, tmpPath2, 1);
309
310    Scan scan = new Scan();
311    ResultScanner rs = htable2.getScanner(scan);
312    Put put = null;
313    for (Result result : rs) {
314      put = new Put(result.getRow());
315      Cell firstVal = result.rawCells()[0];
316      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
317        Bytes.toBytes("diff data"));
318      htable2.put(put);
319    }
320    Delete delete = new Delete(put.getRow());
321    htable2.delete(delete);
322
323    sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
324    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
325      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
326
327    peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
328    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
329      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
330
331    args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
332      "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
333      "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
334      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), UTIL2.getClusterKey(),
335      tableName.getNameAsString() };
336    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
337    checkRestoreTmpDir(CONF1, tmpPath1, 2);
338    checkRestoreTmpDir(CONF2, tmpPath2, 2);
339  }
340
341  static void runBatchCopyTest() throws Exception {
342    // normal Batch tests for htable1
343    loadData("", row, noRepfamName);
344
345    Scan scan1 = new Scan();
346    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
347    ResultScanner scanner1 = htable1.getScanner(scan1);
348    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
349    for (Result result : res1) {
350      Put put = new Put(result.getRow());
351      for (Cell cell : result.rawCells()) {
352        put.add(cell);
353      }
354      puts.add(put);
355    }
356    scanner1.close();
357    assertEquals(NB_ROWS_IN_BATCH, res1.length);
358
359    // Copy the data to htable3
360    htable3.put(puts);
361
362    Scan scan2 = new Scan();
363    ResultScanner scanner2 = htable3.getScanner(scan2);
364    Result[] res2 = scanner2.next(NB_ROWS_IN_BATCH);
365    scanner2.close();
366    assertEquals(NB_ROWS_IN_BATCH, res2.length);
367  }
368
369  @Test
370  public void testVerifyRepJobWithPeerTableName() throws Exception {
371    // Populate the tables with same data
372    runBatchCopyTest();
373
374    // with a peerTableName along with quorum address (a cluster key)
375    String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
376        UTIL2.getClusterKey(), tableName.getNameAsString() };
377    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
378
379    UTIL2.deleteTableData(peerTableName);
380    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
381  }
382
383  @Test
384  public void testVerifyRepJobWithPeerTableNameAndSnapshotSupport() throws Exception {
385    // Populate the tables with same data
386    runBatchCopyTest();
387
388    // Take source and target tables snapshot
389    Path rootDir = CommonFSUtils.getRootDir(CONF1);
390    FileSystem fs = rootDir.getFileSystem(CONF1);
391    String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
392    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
393            Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true);
394
395    // Take target snapshot
396    Path peerRootDir = CommonFSUtils.getRootDir(CONF2);
397    FileSystem peerFs = peerRootDir.getFileSystem(CONF2);
398    String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
399    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), peerTableName,
400            Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true);
401
402    String peerFSAddress = peerFs.getUri().toString();
403    String tmpPath1 = UTIL1.getRandomDir().toString();
404    String tmpPath2 = "/tmp" + System.currentTimeMillis();
405
406    String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
407      "--sourceSnapshotName=" + sourceSnapshotName,
408      "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
409      "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
410      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), UTIL2.getClusterKey(),
411      tableName.getNameAsString() };
412    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
413    checkRestoreTmpDir(CONF1, tmpPath1, 1);
414    checkRestoreTmpDir(CONF2, tmpPath2, 1);
415
416    Scan scan = new Scan();
417    ResultScanner rs = htable3.getScanner(scan);
418    Put put = null;
419    for (Result result : rs) {
420      put = new Put(result.getRow());
421      Cell firstVal = result.rawCells()[0];
422      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
423              Bytes.toBytes("diff data"));
424      htable3.put(put);
425    }
426    Delete delete = new Delete(put.getRow());
427    htable3.delete(delete);
428
429    sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
430    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
431            Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true);
432
433    peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
434    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), peerTableName,
435            Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true);
436
437    args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
438      "--sourceSnapshotName=" + sourceSnapshotName,
439      "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
440      "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
441      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), UTIL2.getClusterKey(),
442      tableName.getNameAsString() };
443    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
444    checkRestoreTmpDir(CONF1, tmpPath1, 2);
445    checkRestoreTmpDir(CONF2, tmpPath2, 2);
446  }
447
448  @AfterClass
449  public static void tearDownAfterClass() throws Exception {
450    htable3.close();
451    TestReplicationBase.tearDownAfterClass();
452  }
453}