/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.mapreduce.Job;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

@Category({ ReplicationTests.class, LargeTests.class })
public class TestVerifyReplication extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestVerifyReplication.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestVerifyReplication.class);

  private static final String PEER_ID = "2";

  @Rule
  public TestName name = new TestName();

  @Before
  public void setUp() throws Exception {
    cleanUp();
  }

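  /**
   * Builds and runs the VerifyReplication MapReduce job with the given arguments, then asserts
   * that the GOODROWS and BADROWS counters match the expected values.
   */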
  private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
      throws IOException, InterruptedException, ClassNotFoundException {
    Job job = new VerifyReplication().createSubmittableJob(new Configuration(conf1), args);
    if (job == null) {
      fail("Job wasn't created, see the log");
    }
    if (!job.waitForCompletion(true)) {
      fail("Job failed, see the log");
    }
    assertEquals(expectedGoodRows,
      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
    assertEquals(expectedBadRows,
      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
  }

  /**
   * Load a small batch of rows into a table, verify that the data replicated identically, then
   * run the VerifyReplication job to check the results. Then run a second comparison after every
   * cell on the peer has been made different.
   */
  @Test
  public void testVerifyRepJob() throws Exception {
    // Populate the tables; runSmallBatchTest also verifies that the two tables end up identical
    runSmallBatchTest();

    String[] args = new String[] { PEER_ID, tableName.getNameAsString() };
    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);

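    // Overwrite the first cell of every row on the peer table with different data so no row
    // matches any more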
    Scan scan = new Scan();
    ResultScanner rs = htable2.getScanner(scan);
    Put put = null;
    for (Result result : rs) {
      put = new Put(result.getRow());
      Cell firstVal = result.rawCells()[0];
      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
        Bytes.toBytes("diff data"));
      htable2.put(put);
    }
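    // Also delete the last rewritten row entirely so it is missing from the peer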
    Delete delete = new Delete(put.getRow());
    htable2.delete(delete);
    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
  }

  /**
   * Load a row into a table and verify it replicates, then delete the row and verify the delete
   * marker replicates. Finally run VerifyReplication with and without the --raw option to check
   * the results.
   */
  @Test
  public void testVerifyRepJobWithRawOptions() throws Exception {
    LOG.info(name.getMethodName());

    final TableName tableName = TableName.valueOf(name.getMethodName());
    byte[] familyname = Bytes.toBytes("fam_raw");
    byte[] row = Bytes.toBytes("row_raw");

    Table lHtable1 = null;
    Table lHtable2 = null;

    try {
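      // Create the test table on both clusters with a globally scoped column family (so its
      // edits replicate) that keeps up to 100 versions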
      ColumnFamilyDescriptor fam = ColumnFamilyDescriptorBuilder.newBuilder(familyname)
          .setMaxVersions(100).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build();
      TableDescriptor table =
          TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(fam).build();
      scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (ColumnFamilyDescriptor f : table.getColumnFamilies()) {
        scopes.put(f.getName(), f.getScope());
      }

      try (Connection connection1 = ConnectionFactory.createConnection(conf1);
          Admin admin1 = connection1.getAdmin()) {
        admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
      }
      try (Connection connection2 = ConnectionFactory.createConnection(conf2);
          Admin admin2 = connection2.getAdmin()) {
        admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
      }
      utility1.waitUntilAllRegionsAssigned(tableName);
      utility2.waitUntilAllRegionsAssigned(tableName);

      lHtable1 = utility1.getConnection().getTable(tableName);
      lHtable2 = utility2.getConnection().getTable(tableName);

      Put put = new Put(row);
      put.addColumn(familyname, row, row);
      lHtable1.put(put);

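      // Wait for the put to show up on the peer cluster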
      Get get = new Get(row);
      for (int i = 0; i < NB_RETRIES; i++) {
        if (i == NB_RETRIES - 1) {
          fail("Waited too long for put replication");
        }
        Result res = lHtable2.get(get);
        if (res.isEmpty()) {
          LOG.info("Row not available");
          Thread.sleep(SLEEP_TIME);
        } else {
          assertArrayEquals(row, res.value());
          break;
        }
      }

      Delete del = new Delete(row);
      lHtable1.delete(del);

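      // Wait for the delete to be applied on the peer cluster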
      get = new Get(row);
      for (int i = 0; i < NB_RETRIES; i++) {
        if (i == NB_RETRIES - 1) {
          fail("Waited too long for delete replication");
        }
        Result res = lHtable2.get(get);
        if (res.size() >= 1) {
          LOG.info("Row not deleted");
          Thread.sleep(SLEEP_TIME);
        } else {
          break;
        }
      }

      // Check verifyReplication default behavior: the deleted row is not visible, so nothing is
      // compared
      String[] argsWithoutRaw = new String[] { PEER_ID, tableName.getNameAsString() };
      runVerifyReplication(argsWithoutRaw, 0, 0);

      // Check verifyReplication with --raw: delete markers are included, so the row still counts
      // as a good row
      String[] argsWithRawAsTrue = new String[] { "--raw", PEER_ID, tableName.getNameAsString() };
      runVerifyReplication(argsWithRawAsTrue, 1, 0);
    } finally {
      if (lHtable1 != null) {
        lHtable1.close();
      }
      if (lHtable2 != null) {
        lHtable2.close();
      }
    }
  }

  // VerifyReplication should honor the versions option
  @Test
  public void testHBase14905() throws Exception {
    // Write three versions of the same cell to the source table
    byte[] qualifierName = Bytes.toBytes("f1");
    Put put = new Put(Bytes.toBytes("r1"));
    long ts = System.currentTimeMillis();
    put.addColumn(famName, qualifierName, ts + 1, Bytes.toBytes("v1002"));
    htable1.put(put);
    put.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v1001"));
    htable1.put(put);
    put.addColumn(famName, qualifierName, ts + 3, Bytes.toBytes("v1112"));
    htable1.put(put);

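    // Confirm the source table sees all three versions of the cell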
    Scan scan = new Scan();
    scan.readVersions(100);
    ResultScanner scanner1 = htable1.getScanner(scan);
    Result[] res1 = scanner1.next(1);
    scanner1.close();

    assertEquals(1, res1.length);
    assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());

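    // Wait until all three versions have been replicated to the peer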
    for (int i = 0; i < NB_RETRIES; i++) {
      scan = new Scan();
      scan.readVersions(100);
      scanner1 = htable2.getScanner(scan);
      res1 = scanner1.next(1);
      scanner1.close();
      if (res1.length != 1) {
        LOG.info("Only got " + res1.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        int cellNumber = res1[0].getColumnCells(famName, qualifierName).size();
        if (cellNumber != 3) {
          LOG.info("Only got " + cellNumber + " cells");
          Thread.sleep(SLEEP_TIME);
        } else {
          break;
        }
      }
      if (i == NB_RETRIES - 1) {
        fail("Waited too long for normal batch replication");
      }
    }

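    // Write two extra versions directly to the peer so the clusters diverge: the source holds
    // three versions of the cell while the peer holds five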
    put.addColumn(famName, qualifierName, ts + 4, Bytes.toBytes("v1111"));
    htable2.put(put);
    put.addColumn(famName, qualifierName, ts + 5, Bytes.toBytes("v1112"));
    htable2.put(put);

    scan = new Scan();
    scan.readVersions(100);
    scanner1 = htable2.getScanner(scan);
    res1 = scanner1.next(NB_ROWS_IN_BATCH);
    scanner1.close();

    assertEquals(1, res1.length);
    assertEquals(5, res1[0].getColumnCells(famName, qualifierName).size());

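    // With --versions=100 VerifyReplication compares all versions, so the row is reported as bad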
    String[] args = new String[] { "--versions=100", PEER_ID, tableName.getNameAsString() };
    runVerifyReplication(args, 0, 1);
  }

  // VerifyReplication should honor the versions option
  @Test
  public void testVersionMismatchHBase14905() throws Exception {
    // Write three versions of the same cell to the source table
    byte[] qualifierName = Bytes.toBytes("f1");
    Put put = new Put(Bytes.toBytes("r1"));
    long ts = System.currentTimeMillis();
    put.addColumn(famName, qualifierName, ts + 1, Bytes.toBytes("v1"));
    htable1.put(put);
    put.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v2"));
    htable1.put(put);
    put.addColumn(famName, qualifierName, ts + 3, Bytes.toBytes("v3"));
    htable1.put(put);

    Scan scan = new Scan();
    scan.readVersions(100);
    ResultScanner scanner1 = htable1.getScanner(scan);
    Result[] res1 = scanner1.next(1);
    scanner1.close();

    assertEquals(1, res1.length);
    assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());

    for (int i = 0; i < NB_RETRIES; i++) {
      scan = new Scan();
      scan.readVersions(100);
      scanner1 = htable2.getScanner(scan);
      res1 = scanner1.next(1);
      scanner1.close();
      if (res1.length != 1) {
        LOG.info("Only got " + res1.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        int cellNumber = res1[0].getColumnCells(famName, qualifierName).size();
        if (cellNumber != 3) {
          LOG.info("Only got " + cellNumber + " cells");
          Thread.sleep(SLEEP_TIME);
        } else {
          break;
        }
      }
      if (i == NB_RETRIES - 1) {
        fail("Waited too long for normal batch replication");
      }
    }

    try {
      // Disable replication and overwrite one particular version of the cell on the peer to
      // validate the feature
      hbaseAdmin.disableReplicationPeer(PEER_ID);
      Put put2 = new Put(Bytes.toBytes("r1"));
      put2.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v99"));
      htable2.put(put2);

      scan = new Scan();
      scan.readVersions(100);
      scanner1 = htable2.getScanner(scan);
      res1 = scanner1.next(NB_ROWS_IN_BATCH);
      scanner1.close();
      assertEquals(1, res1.length);
      assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());

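      // One version of the cell now differs between the clusters, so the row is reported as bad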
      String[] args = new String[] { "--versions=100", PEER_ID, tableName.getNameAsString() };
      runVerifyReplication(args, 0, 1);
    } finally {
      hbaseAdmin.enableReplicationPeer(PEER_ID);
    }
  }

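  /**
   * Load batches of rows with four different row key prefixes, then verify only the two prefixes
   * passed via --row-prefixes; only those rows should be compared.
   */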
  @Test
  public void testVerifyReplicationPrefixFiltering() throws Exception {
    final byte[] prefixRow = Bytes.toBytes("prefixrow");
    final byte[] prefixRow2 = Bytes.toBytes("secondrow");
    loadData("prefixrow", prefixRow);
    loadData("secondrow", prefixRow2);
    loadData("aaa", row);
    loadData("zzz", row);
    waitForReplication(NB_ROWS_IN_BATCH * 4, NB_RETRIES * 4);
    String[] args =
        new String[] { "--row-prefixes=prefixrow,secondrow", PEER_ID, tableName.getNameAsString() };
    runVerifyReplication(args, NB_ROWS_IN_BATCH * 2, 0);
  }

  @Test
  public void testVerifyReplicationSnapshotArguments() {
    String[] args =
        new String[] { "--sourceSnapshotName=snapshot1", "2", tableName.getNameAsString() };
    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));

    args = new String[] { "--sourceSnapshotTmpDir=tmp", "2", tableName.getNameAsString() };
    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));

    args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=tmp", "2",
        tableName.getNameAsString() };
    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));

    args = new String[] { "--peerSnapshotName=snapshot1", "2", tableName.getNameAsString() };
    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));

    args = new String[] { "--peerSnapshotTmpDir=/tmp/", "2", tableName.getNameAsString() };
    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));

    args = new String[] { "--peerSnapshotName=snapshot1", "--peerSnapshotTmpDir=/tmp/",
        "--peerFSAddress=tempfs", "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2",
        tableName.getNameAsString() };
    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));

    args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=/tmp/",
        "--peerSnapshotName=snapshot2", "--peerSnapshotTmpDir=/tmp/", "--peerFSAddress=tempfs",
        "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2", tableName.getNameAsString() };

    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
  }

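  /**
   * Asserts that the given restore tmp dir contains exactly the expected number of entries and
   * that each of them is a directory (one restored snapshot per verification run).
   */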
  private void checkRestoreTmpDir(Configuration conf, String restoreTmpDir, int expectedCount)
      throws IOException {
    FileSystem fs = FileSystem.get(conf);
    FileStatus[] subDirectories = fs.listStatus(new Path(restoreTmpDir));
    assertNotNull(subDirectories);
    assertEquals(expectedCount, subDirectories.length);
    for (int i = 0; i < expectedCount; i++) {
      assertTrue(subDirectories[i].isDirectory());
    }
  }

  @Test
  public void testVerifyReplicationWithSnapshotSupport() throws Exception {
    // Populate the tables; runSmallBatchTest also verifies that the two tables end up identical
    runSmallBatchTest();

    // Take a snapshot of the source table
    Path rootDir = FSUtils.getRootDir(conf1);
    FileSystem fs = rootDir.getFileSystem(conf1);
    String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
    SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);

    // Take a snapshot of the peer (target) table
    Path peerRootDir = FSUtils.getRootDir(conf2);
    FileSystem peerFs = peerRootDir.getFileSystem(conf2);
    String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
    SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);

    String peerFSAddress = peerFs.getUri().toString();
    String tempPath1 = utility1.getRandomDir().toString();
    String tempPath2 = "/tmp2";

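    // Verify replication by comparing the two snapshots; each side restores its snapshot into
    // the corresponding tmp dir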
    String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
        "--sourceSnapshotTmpDir=" + tempPath1, "--peerSnapshotName=" + peerSnapshotName,
        "--peerSnapshotTmpDir=" + tempPath2, "--peerFSAddress=" + peerFSAddress,
        "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };

    Job job = new VerifyReplication().createSubmittableJob(conf1, args);
    if (job == null) {
      fail("Job wasn't created, see the log");
    }
    if (!job.waitForCompletion(true)) {
      fail("Job failed, see the log");
    }
    assertEquals(NB_ROWS_IN_BATCH,
      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
    assertEquals(0,
      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());

    checkRestoreTmpDir(conf1, tempPath1, 1);
    checkRestoreTmpDir(conf2, tempPath2, 1);

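    // Now make the peer data diverge: overwrite the first cell of every row and delete the last
    // rewritten row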
    Scan scan = new Scan();
    ResultScanner rs = htable2.getScanner(scan);
    Put put = null;
    for (Result result : rs) {
      put = new Put(result.getRow());
      Cell firstVal = result.rawCells()[0];
      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
        Bytes.toBytes("diff data"));
      htable2.put(put);
    }
    Delete delete = new Delete(put.getRow());
    htable2.delete(delete);

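    // Take fresh snapshots that capture the diverged state and run the verification again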
    sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
    SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);

    peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
    SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);

    args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
        "--sourceSnapshotTmpDir=" + tempPath1, "--peerSnapshotName=" + peerSnapshotName,
        "--peerSnapshotTmpDir=" + tempPath2, "--peerFSAddress=" + peerFSAddress,
        "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };

    job = new VerifyReplication().createSubmittableJob(conf1, args);
    if (job == null) {
      fail("Job wasn't created, see the log");
    }
    if (!job.waitForCompletion(true)) {
      fail("Job failed, see the log");
    }
    assertEquals(0,
      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
    assertEquals(NB_ROWS_IN_BATCH,
      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());

    checkRestoreTmpDir(conf1, tempPath1, 2);
    checkRestoreTmpDir(conf2, tempPath2, 2);
  }
}