001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023import static org.junit.jupiter.api.Assertions.fail;
024
025import org.apache.hadoop.fs.FileSystem;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.CellUtil;
029import org.apache.hadoop.hbase.HBaseTestingUtil;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.client.Admin;
032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
033import org.apache.hadoop.hbase.client.Connection;
034import org.apache.hadoop.hbase.client.ConnectionFactory;
035import org.apache.hadoop.hbase.client.Delete;
036import org.apache.hadoop.hbase.client.Put;
037import org.apache.hadoop.hbase.client.Result;
038import org.apache.hadoop.hbase.client.ResultScanner;
039import org.apache.hadoop.hbase.client.Scan;
040import org.apache.hadoop.hbase.client.Table;
041import org.apache.hadoop.hbase.client.TableDescriptor;
042import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
043import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
044import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
045import org.apache.hadoop.hbase.testclassification.LargeTests;
046import org.apache.hadoop.hbase.testclassification.ReplicationTests;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.apache.hadoop.hbase.util.CommonFSUtils;
049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
050import org.junit.jupiter.api.AfterAll;
051import org.junit.jupiter.api.BeforeAll;
052import org.junit.jupiter.api.BeforeEach;
053import org.junit.jupiter.api.Tag;
054import org.junit.jupiter.api.Test;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
059
060/**
061 * We moved some of {@link TestVerifyReplicationZkClusterKey}'s tests here because it could take too
062 * long to complete. In here we have miscellaneous.
063 */
064@Tag(ReplicationTests.TAG)
065@Tag(LargeTests.TAG)
066public class TestVerifyReplicationAdjunct extends TestReplicationBase {
067
068  private static final Logger LOG = LoggerFactory.getLogger(TestVerifyReplicationAdjunct.class);
069
070  private static final String PEER_ID = "2";
071  private static final TableName peerTableName = TableName.valueOf("peerTest");
072  private static Table htable3;
073
074  @Override
075  protected String getClusterKey(HBaseTestingUtil util) throws Exception {
076    // TODO: VerifyReplication does not support connection uri yet, so here we need to use cluster
077    // key, as in this test we will pass the cluster key config in peer config directly to
078    // VerifyReplication job.
079    return util.getClusterKey();
080  }
081
082  @BeforeEach
083  public void setUp() throws Exception {
084    cleanUp();
085    UTIL2.deleteTableData(peerTableName);
086  }
087
088  @BeforeAll
089  public static void setUpBeforeClass() throws Exception {
090    TableDescriptor peerTable =
091      TableDescriptorBuilder.newBuilder(peerTableName)
092        .setColumnFamily(
093          ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).setMaxVersions(100).build())
094        .build();
095    Connection connection2 = ConnectionFactory.createConnection(CONF2);
096    try (Admin admin2 = connection2.getAdmin()) {
097      admin2.createTable(peerTable, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
098    }
099    htable3 = connection2.getTable(peerTableName);
100  }
101
102  // VerifyReplication should honor versions option
103  @Test
104  public void testHBase14905() throws Exception {
105    // normal Batch tests
106    byte[] qualifierName = Bytes.toBytes("f1");
107    Put put = new Put(Bytes.toBytes("r1"));
108    long ts = EnvironmentEdgeManager.currentTime();
109    put.addColumn(famName, qualifierName, ts + 1, Bytes.toBytes("v1002"));
110    htable1.put(put);
111    put.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v1001"));
112    htable1.put(put);
113    put.addColumn(famName, qualifierName, ts + 3, Bytes.toBytes("v1112"));
114    htable1.put(put);
115
116    Scan scan = new Scan();
117    scan.readVersions(100);
118    ResultScanner scanner1 = htable1.getScanner(scan);
119    Result[] res1 = scanner1.next(1);
120    scanner1.close();
121
122    assertEquals(1, res1.length);
123    assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
124
125    for (int i = 0; i < NB_RETRIES; i++) {
126      scan = new Scan();
127      scan.readVersions(100);
128      scanner1 = htable2.getScanner(scan);
129      res1 = scanner1.next(1);
130      scanner1.close();
131      if (res1.length != 1) {
132        LOG.info("Only got " + res1.length + " rows");
133        Thread.sleep(SLEEP_TIME);
134      } else {
135        int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size();
136        if (cellNumber != 3) {
137          LOG.info("Only got " + cellNumber + " cells");
138          Thread.sleep(SLEEP_TIME);
139        } else {
140          break;
141        }
142      }
143      if (i == NB_RETRIES - 1) {
144        fail("Waited too much time for normal batch replication");
145      }
146    }
147
148    put.addColumn(famName, qualifierName, ts + 4, Bytes.toBytes("v1111"));
149    htable2.put(put);
150    put.addColumn(famName, qualifierName, ts + 5, Bytes.toBytes("v1112"));
151    htable2.put(put);
152
153    scan = new Scan();
154    scan.readVersions(100);
155    scanner1 = htable2.getScanner(scan);
156    res1 = scanner1.next(NB_ROWS_IN_BATCH);
157    scanner1.close();
158
159    assertEquals(1, res1.length);
160    assertEquals(5, res1[0].getColumnCells(famName, qualifierName).size());
161
162    String[] args = new String[] { "--versions=100", PEER_ID, tableName.getNameAsString() };
163    TestVerifyReplicationZkClusterKey.runVerifyReplication(args, 0, 1);
164  }
165
166  // VerifyReplication should honor versions option
167  @Test
168  public void testVersionMismatchHBase14905() throws Exception {
169    // normal Batch tests
170    byte[] qualifierName = Bytes.toBytes("f1");
171    Put put = new Put(Bytes.toBytes("r1"));
172    long ts = EnvironmentEdgeManager.currentTime();
173    put.addColumn(famName, qualifierName, ts + 1, Bytes.toBytes("v1"));
174    htable1.put(put);
175    put.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v2"));
176    htable1.put(put);
177    put.addColumn(famName, qualifierName, ts + 3, Bytes.toBytes("v3"));
178    htable1.put(put);
179
180    Scan scan = new Scan();
181    scan.readVersions(100);
182    ResultScanner scanner1 = htable1.getScanner(scan);
183    Result[] res1 = scanner1.next(1);
184    scanner1.close();
185
186    assertEquals(1, res1.length);
187    assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
188
189    for (int i = 0; i < NB_RETRIES; i++) {
190      scan = new Scan();
191      scan.readVersions(100);
192      scanner1 = htable2.getScanner(scan);
193      res1 = scanner1.next(1);
194      scanner1.close();
195      if (res1.length != 1) {
196        LOG.info("Only got " + res1.length + " rows");
197        Thread.sleep(SLEEP_TIME);
198      } else {
199        int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size();
200        if (cellNumber != 3) {
201          LOG.info("Only got " + cellNumber + " cells");
202          Thread.sleep(SLEEP_TIME);
203        } else {
204          break;
205        }
206      }
207      if (i == NB_RETRIES - 1) {
208        fail("Waited too much time for normal batch replication");
209      }
210    }
211
212    try {
213      // Disabling replication and modifying the particular version of the cell to validate the
214      // feature.
215      hbaseAdmin.disableReplicationPeer(PEER_ID);
216      Put put2 = new Put(Bytes.toBytes("r1"));
217      put2.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v99"));
218      htable2.put(put2);
219
220      scan = new Scan();
221      scan.readVersions(100);
222      scanner1 = htable2.getScanner(scan);
223      res1 = scanner1.next(NB_ROWS_IN_BATCH);
224      scanner1.close();
225      assertEquals(1, res1.length);
226      assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
227
228      String[] args = new String[] { "--versions=100", PEER_ID, tableName.getNameAsString() };
229      TestVerifyReplicationZkClusterKey.runVerifyReplication(args, 0, 1);
230    } finally {
231      hbaseAdmin.enableReplicationPeer(PEER_ID);
232    }
233  }
234
235  @Test
236  public void testVerifyReplicationPrefixFiltering() throws Exception {
237    final byte[] prefixRow = Bytes.toBytes("prefixrow");
238    final byte[] prefixRow2 = Bytes.toBytes("secondrow");
239    loadData("prefixrow", prefixRow);
240    loadData("secondrow", prefixRow2);
241    loadData("aaa", row);
242    loadData("zzz", row);
243    waitForReplication(NB_ROWS_IN_BATCH * 4, NB_RETRIES * 4);
244    String[] args =
245      new String[] { "--row-prefixes=prefixrow,secondrow", PEER_ID, tableName.getNameAsString() };
246    TestVerifyReplicationZkClusterKey.runVerifyReplication(args, NB_ROWS_IN_BATCH * 2, 0);
247  }
248
249  @Test
250  public void testVerifyReplicationSnapshotArguments() {
251    String[] args =
252      new String[] { "--sourceSnapshotName=snapshot1", "2", tableName.getNameAsString() };
253    assertFalse(new VerifyReplication().doCommandLine(args), Lists.newArrayList(args).toString());
254
255    args = new String[] { "--sourceSnapshotTmpDir=tmp", "2", tableName.getNameAsString() };
256    assertFalse(new VerifyReplication().doCommandLine(args), Lists.newArrayList(args).toString());
257
258    args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=tmp", "2",
259      tableName.getNameAsString() };
260    assertTrue(new VerifyReplication().doCommandLine(args), Lists.newArrayList(args).toString());
261
262    args = new String[] { "--peerSnapshotName=snapshot1", "2", tableName.getNameAsString() };
263    assertFalse(new VerifyReplication().doCommandLine(args), Lists.newArrayList(args).toString());
264
265    args = new String[] { "--peerSnapshotTmpDir=/tmp/", "2", tableName.getNameAsString() };
266    assertFalse(new VerifyReplication().doCommandLine(args), Lists.newArrayList(args).toString());
267
268    args = new String[] { "--peerSnapshotName=snapshot1", "--peerSnapshotTmpDir=/tmp/",
269      "--peerFSAddress=tempfs", "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2",
270      tableName.getNameAsString() };
271    assertTrue(new VerifyReplication().doCommandLine(args), Lists.newArrayList(args).toString());
272
273    args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=/tmp/",
274      "--peerSnapshotName=snapshot2", "--peerSnapshotTmpDir=/tmp/", "--peerFSAddress=tempfs",
275      "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2", tableName.getNameAsString() };
276
277    assertTrue(new VerifyReplication().doCommandLine(args), Lists.newArrayList(args).toString());
278  }
279
280  @Test
281  public void testVerifyReplicationWithSnapshotSupport() throws Exception {
282    // Populate the tables, at the same time it guarantees that the tables are
283    // identical since it does the check
284    runSmallBatchTest();
285
286    // Take source and target tables snapshot
287    Path rootDir = CommonFSUtils.getRootDir(CONF1);
288    FileSystem fs = rootDir.getFileSystem(CONF1);
289    String sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
290    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
291      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
292
293    // Take target snapshot
294    Path peerRootDir = CommonFSUtils.getRootDir(CONF2);
295    FileSystem peerFs = peerRootDir.getFileSystem(CONF2);
296    String peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
297    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
298      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
299
300    String peerFSAddress = peerFs.getUri().toString();
301    String temPath1 = UTIL1.getRandomDir().toString();
302    String temPath2 = "/tmp" + EnvironmentEdgeManager.currentTime();
303
304    String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
305      "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
306      "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
307      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), "2",
308      tableName.getNameAsString() };
309    TestVerifyReplicationZkClusterKey.runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
310    TestVerifyReplicationZkClusterKey.checkRestoreTmpDir(CONF1, temPath1, 1);
311    TestVerifyReplicationZkClusterKey.checkRestoreTmpDir(CONF2, temPath2, 1);
312
313    Scan scan = new Scan();
314    ResultScanner rs = htable2.getScanner(scan);
315    Put put = null;
316    for (Result result : rs) {
317      put = new Put(result.getRow());
318      Cell firstVal = result.rawCells()[0];
319      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
320        Bytes.toBytes("diff data"));
321      htable2.put(put);
322    }
323    Delete delete = new Delete(put.getRow());
324    htable2.delete(delete);
325
326    sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
327    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
328      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
329
330    peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
331    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
332      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
333
334    args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
335      "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
336      "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
337      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), "2",
338      tableName.getNameAsString() };
339    TestVerifyReplicationZkClusterKey.runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
340    TestVerifyReplicationZkClusterKey.checkRestoreTmpDir(CONF1, temPath1, 2);
341    TestVerifyReplicationZkClusterKey.checkRestoreTmpDir(CONF2, temPath2, 2);
342  }
343
344  @AfterAll
345  public static void tearDownAfterClass() throws Exception {
346    htable3.close();
347  }
348}