001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023import static org.junit.jupiter.api.Assertions.fail; 024 025import org.apache.hadoop.fs.FileSystem; 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.hbase.Cell; 028import org.apache.hadoop.hbase.CellUtil; 029import org.apache.hadoop.hbase.HBaseTestingUtil; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.client.Admin; 032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 033import org.apache.hadoop.hbase.client.Connection; 034import org.apache.hadoop.hbase.client.ConnectionFactory; 035import org.apache.hadoop.hbase.client.Delete; 036import org.apache.hadoop.hbase.client.Put; 037import org.apache.hadoop.hbase.client.Result; 038import org.apache.hadoop.hbase.client.ResultScanner; 039import org.apache.hadoop.hbase.client.Scan; 040import org.apache.hadoop.hbase.client.Table; 041import org.apache.hadoop.hbase.client.TableDescriptor; 042import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 043import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication; 044import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; 045import org.apache.hadoop.hbase.testclassification.LargeTests; 046import org.apache.hadoop.hbase.testclassification.ReplicationTests; 047import org.apache.hadoop.hbase.util.Bytes; 048import org.apache.hadoop.hbase.util.CommonFSUtils; 049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 050import org.junit.jupiter.api.AfterAll; 051import org.junit.jupiter.api.BeforeAll; 052import org.junit.jupiter.api.BeforeEach; 053import org.junit.jupiter.api.Tag; 054import org.junit.jupiter.api.Test; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 059 060/** 061 * We moved some of {@link TestVerifyReplicationZkClusterKey}'s tests here because it could take too 062 * long to complete. In here we have miscellaneous. 063 */ 064@Tag(ReplicationTests.TAG) 065@Tag(LargeTests.TAG) 066public class TestVerifyReplicationAdjunct extends TestReplicationBase { 067 068 private static final Logger LOG = LoggerFactory.getLogger(TestVerifyReplicationAdjunct.class); 069 070 private static final String PEER_ID = "2"; 071 private static final TableName peerTableName = TableName.valueOf("peerTest"); 072 private static Table htable3; 073 074 @Override 075 protected String getClusterKey(HBaseTestingUtil util) throws Exception { 076 // TODO: VerifyReplication does not support connection uri yet, so here we need to use cluster 077 // key, as in this test we will pass the cluster key config in peer config directly to 078 // VerifyReplication job. 079 return util.getClusterKey(); 080 } 081 082 @BeforeEach 083 public void setUp() throws Exception { 084 cleanUp(); 085 UTIL2.deleteTableData(peerTableName); 086 } 087 088 @BeforeAll 089 public static void setUpBeforeClass() throws Exception { 090 TableDescriptor peerTable = 091 TableDescriptorBuilder.newBuilder(peerTableName) 092 .setColumnFamily( 093 ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).setMaxVersions(100).build()) 094 .build(); 095 Connection connection2 = ConnectionFactory.createConnection(CONF2); 096 try (Admin admin2 = connection2.getAdmin()) { 097 admin2.createTable(peerTable, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE); 098 } 099 htable3 = connection2.getTable(peerTableName); 100 } 101 102 // VerifyReplication should honor versions option 103 @Test 104 public void testHBase14905() throws Exception { 105 // normal Batch tests 106 byte[] qualifierName = Bytes.toBytes("f1"); 107 Put put = new Put(Bytes.toBytes("r1")); 108 long ts = EnvironmentEdgeManager.currentTime(); 109 put.addColumn(famName, qualifierName, ts + 1, Bytes.toBytes("v1002")); 110 htable1.put(put); 111 put.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v1001")); 112 htable1.put(put); 113 put.addColumn(famName, qualifierName, ts + 3, Bytes.toBytes("v1112")); 114 htable1.put(put); 115 116 Scan scan = new Scan(); 117 scan.readVersions(100); 118 ResultScanner scanner1 = htable1.getScanner(scan); 119 Result[] res1 = scanner1.next(1); 120 scanner1.close(); 121 122 assertEquals(1, res1.length); 123 assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size()); 124 125 for (int i = 0; i < NB_RETRIES; i++) { 126 scan = new Scan(); 127 scan.readVersions(100); 128 scanner1 = htable2.getScanner(scan); 129 res1 = scanner1.next(1); 130 scanner1.close(); 131 if (res1.length != 1) { 132 LOG.info("Only got " + res1.length + " rows"); 133 Thread.sleep(SLEEP_TIME); 134 } else { 135 int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size(); 136 if (cellNumber != 3) { 137 LOG.info("Only got " + cellNumber + " cells"); 138 Thread.sleep(SLEEP_TIME); 139 } else { 140 break; 141 } 142 } 143 if (i == NB_RETRIES - 1) { 144 fail("Waited too much time for normal batch replication"); 145 } 146 } 147 148 put.addColumn(famName, qualifierName, ts + 4, Bytes.toBytes("v1111")); 149 htable2.put(put); 150 put.addColumn(famName, qualifierName, ts + 5, Bytes.toBytes("v1112")); 151 htable2.put(put); 152 153 scan = new Scan(); 154 scan.readVersions(100); 155 scanner1 = htable2.getScanner(scan); 156 res1 = scanner1.next(NB_ROWS_IN_BATCH); 157 scanner1.close(); 158 159 assertEquals(1, res1.length); 160 assertEquals(5, res1[0].getColumnCells(famName, qualifierName).size()); 161 162 String[] args = new String[] { "--versions=100", PEER_ID, tableName.getNameAsString() }; 163 TestVerifyReplicationZkClusterKey.runVerifyReplication(args, 0, 1); 164 } 165 166 // VerifyReplication should honor versions option 167 @Test 168 public void testVersionMismatchHBase14905() throws Exception { 169 // normal Batch tests 170 byte[] qualifierName = Bytes.toBytes("f1"); 171 Put put = new Put(Bytes.toBytes("r1")); 172 long ts = EnvironmentEdgeManager.currentTime(); 173 put.addColumn(famName, qualifierName, ts + 1, Bytes.toBytes("v1")); 174 htable1.put(put); 175 put.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v2")); 176 htable1.put(put); 177 put.addColumn(famName, qualifierName, ts + 3, Bytes.toBytes("v3")); 178 htable1.put(put); 179 180 Scan scan = new Scan(); 181 scan.readVersions(100); 182 ResultScanner scanner1 = htable1.getScanner(scan); 183 Result[] res1 = scanner1.next(1); 184 scanner1.close(); 185 186 assertEquals(1, res1.length); 187 assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size()); 188 189 for (int i = 0; i < NB_RETRIES; i++) { 190 scan = new Scan(); 191 scan.readVersions(100); 192 scanner1 = htable2.getScanner(scan); 193 res1 = scanner1.next(1); 194 scanner1.close(); 195 if (res1.length != 1) { 196 LOG.info("Only got " + res1.length + " rows"); 197 Thread.sleep(SLEEP_TIME); 198 } else { 199 int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size(); 200 if (cellNumber != 3) { 201 LOG.info("Only got " + cellNumber + " cells"); 202 Thread.sleep(SLEEP_TIME); 203 } else { 204 break; 205 } 206 } 207 if (i == NB_RETRIES - 1) { 208 fail("Waited too much time for normal batch replication"); 209 } 210 } 211 212 try { 213 // Disabling replication and modifying the particular version of the cell to validate the 214 // feature. 215 hbaseAdmin.disableReplicationPeer(PEER_ID); 216 Put put2 = new Put(Bytes.toBytes("r1")); 217 put2.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v99")); 218 htable2.put(put2); 219 220 scan = new Scan(); 221 scan.readVersions(100); 222 scanner1 = htable2.getScanner(scan); 223 res1 = scanner1.next(NB_ROWS_IN_BATCH); 224 scanner1.close(); 225 assertEquals(1, res1.length); 226 assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size()); 227 228 String[] args = new String[] { "--versions=100", PEER_ID, tableName.getNameAsString() }; 229 TestVerifyReplicationZkClusterKey.runVerifyReplication(args, 0, 1); 230 } finally { 231 hbaseAdmin.enableReplicationPeer(PEER_ID); 232 } 233 } 234 235 @Test 236 public void testVerifyReplicationPrefixFiltering() throws Exception { 237 final byte[] prefixRow = Bytes.toBytes("prefixrow"); 238 final byte[] prefixRow2 = Bytes.toBytes("secondrow"); 239 loadData("prefixrow", prefixRow); 240 loadData("secondrow", prefixRow2); 241 loadData("aaa", row); 242 loadData("zzz", row); 243 waitForReplication(NB_ROWS_IN_BATCH * 4, NB_RETRIES * 4); 244 String[] args = 245 new String[] { "--row-prefixes=prefixrow,secondrow", PEER_ID, tableName.getNameAsString() }; 246 TestVerifyReplicationZkClusterKey.runVerifyReplication(args, NB_ROWS_IN_BATCH * 2, 0); 247 } 248 249 @Test 250 public void testVerifyReplicationSnapshotArguments() { 251 String[] args = 252 new String[] { "--sourceSnapshotName=snapshot1", "2", tableName.getNameAsString() }; 253 assertFalse(new VerifyReplication().doCommandLine(args), Lists.newArrayList(args).toString()); 254 255 args = new String[] { "--sourceSnapshotTmpDir=tmp", "2", tableName.getNameAsString() }; 256 assertFalse(new VerifyReplication().doCommandLine(args), Lists.newArrayList(args).toString()); 257 258 args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=tmp", "2", 259 tableName.getNameAsString() }; 260 assertTrue(new VerifyReplication().doCommandLine(args), Lists.newArrayList(args).toString()); 261 262 args = new String[] { "--peerSnapshotName=snapshot1", "2", tableName.getNameAsString() }; 263 assertFalse(new VerifyReplication().doCommandLine(args), Lists.newArrayList(args).toString()); 264 265 args = new String[] { "--peerSnapshotTmpDir=/tmp/", "2", tableName.getNameAsString() }; 266 assertFalse(new VerifyReplication().doCommandLine(args), Lists.newArrayList(args).toString()); 267 268 args = new String[] { "--peerSnapshotName=snapshot1", "--peerSnapshotTmpDir=/tmp/", 269 "--peerFSAddress=tempfs", "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2", 270 tableName.getNameAsString() }; 271 assertTrue(new VerifyReplication().doCommandLine(args), Lists.newArrayList(args).toString()); 272 273 args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=/tmp/", 274 "--peerSnapshotName=snapshot2", "--peerSnapshotTmpDir=/tmp/", "--peerFSAddress=tempfs", 275 "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2", tableName.getNameAsString() }; 276 277 assertTrue(new VerifyReplication().doCommandLine(args), Lists.newArrayList(args).toString()); 278 } 279 280 @Test 281 public void testVerifyReplicationWithSnapshotSupport() throws Exception { 282 // Populate the tables, at the same time it guarantees that the tables are 283 // identical since it does the check 284 runSmallBatchTest(); 285 286 // Take source and target tables snapshot 287 Path rootDir = CommonFSUtils.getRootDir(CONF1); 288 FileSystem fs = rootDir.getFileSystem(CONF1); 289 String sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime(); 290 SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName, 291 Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true); 292 293 // Take target snapshot 294 Path peerRootDir = CommonFSUtils.getRootDir(CONF2); 295 FileSystem peerFs = peerRootDir.getFileSystem(CONF2); 296 String peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime(); 297 SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName, 298 Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true); 299 300 String peerFSAddress = peerFs.getUri().toString(); 301 String temPath1 = UTIL1.getRandomDir().toString(); 302 String temPath2 = "/tmp" + EnvironmentEdgeManager.currentTime(); 303 304 String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName, 305 "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName, 306 "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress, 307 "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), "2", 308 tableName.getNameAsString() }; 309 TestVerifyReplicationZkClusterKey.runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); 310 TestVerifyReplicationZkClusterKey.checkRestoreTmpDir(CONF1, temPath1, 1); 311 TestVerifyReplicationZkClusterKey.checkRestoreTmpDir(CONF2, temPath2, 1); 312 313 Scan scan = new Scan(); 314 ResultScanner rs = htable2.getScanner(scan); 315 Put put = null; 316 for (Result result : rs) { 317 put = new Put(result.getRow()); 318 Cell firstVal = result.rawCells()[0]; 319 put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal), 320 Bytes.toBytes("diff data")); 321 htable2.put(put); 322 } 323 Delete delete = new Delete(put.getRow()); 324 htable2.delete(delete); 325 326 sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime(); 327 SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName, 328 Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true); 329 330 peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime(); 331 SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName, 332 Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true); 333 334 args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName, 335 "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName, 336 "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress, 337 "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), "2", 338 tableName.getNameAsString() }; 339 TestVerifyReplicationZkClusterKey.runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); 340 TestVerifyReplicationZkClusterKey.checkRestoreTmpDir(CONF1, temPath1, 2); 341 TestVerifyReplicationZkClusterKey.checkRestoreTmpDir(CONF2, temPath2, 2); 342 } 343 344 @AfterAll 345 public static void tearDownAfterClass() throws Exception { 346 htable3.close(); 347 } 348}