/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.mapreduce.Job;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

@Category({ ReplicationTests.class, LargeTests.class })
public class TestVerifyReplication extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestVerifyReplication.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestVerifyReplication.class);

  private static final String PEER_ID = "2";

  @Rule
  public TestName name = new TestName();

  @Before
  public void setUp() throws Exception {
    cleanUp();
  }
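
  /**
   * Builds the VerifyReplication job for the supplied command line arguments, runs it, and
   * asserts the GOODROWS and BADROWS counters reported by the finished job.
   */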
  private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
      throws IOException, InterruptedException, ClassNotFoundException {
    Job job = new VerifyReplication().createSubmittableJob(new Configuration(conf1), args);
    if (job == null) {
      fail("Job wasn't created, see the log");
    }
    if (!job.waitForCompletion(true)) {
      fail("Job failed, see the log");
    }
    assertEquals(expectedGoodRows,
      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
    assertEquals(expectedBadRows,
      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
  }

  /**
   * Do a small loading into a table, make sure the data is really the same, then run the
   * VerifyReplication job to check the results. Do a second comparison where all the cells are
   * different.
   */
  @Test
  public void testVerifyRepJob() throws Exception {
    // Populate the tables, at the same time it guarantees that the tables are
    // identical since it does the check
    runSmallBatchTest();

    String[] args = new String[] { PEER_ID, tableName.getNameAsString() };
    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);

    Scan scan = new Scan();
    ResultScanner rs = htable2.getScanner(scan);
    Put put = null;
    for (Result result : rs) {
      put = new Put(result.getRow());
      Cell firstVal = result.rawCells()[0];
      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
        Bytes.toBytes("diff data"));
      htable2.put(put);
    }
    Delete delete = new Delete(put.getRow());
    htable2.delete(delete);
    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
  }

  /**
   * Load a row into a table, make sure the data is really the same, delete the row, make sure the
   * delete marker is replicated, run verify replication with and without raw to check the results.
   */
  @Test
  public void testVerifyRepJobWithRawOptions() throws Exception {
    LOG.info(name.getMethodName());

    final TableName tableName = TableName.valueOf(name.getMethodName());
    byte[] familyname = Bytes.toBytes("fam_raw");
    byte[] row = Bytes.toBytes("row_raw");

    Table lHtable1 = null;
    Table lHtable2 = null;

    try {
      ColumnFamilyDescriptor fam = ColumnFamilyDescriptorBuilder.newBuilder(familyname)
          .setMaxVersions(100).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build();
      TableDescriptor table =
          TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(fam).build();
      scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (ColumnFamilyDescriptor f : table.getColumnFamilies()) {
        scopes.put(f.getName(), f.getScope());
      }

      Connection connection1 = ConnectionFactory.createConnection(conf1);
      Connection connection2 = ConnectionFactory.createConnection(conf2);
      try (Admin admin1 = connection1.getAdmin()) {
        admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
      }
      try (Admin admin2 = connection2.getAdmin()) {
        admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
      }
      utility1.waitUntilAllRegionsAssigned(tableName);
      utility2.waitUntilAllRegionsAssigned(tableName);

      lHtable1 = utility1.getConnection().getTable(tableName);
      lHtable2 = utility2.getConnection().getTable(tableName);

      Put put = new Put(row);
      put.addColumn(familyname, row, row);
      lHtable1.put(put);

      Get get = new Get(row);
      for (int i = 0; i < NB_RETRIES; i++) {
        if (i == NB_RETRIES - 1) {
          fail("Waited too much time for put replication");
        }
        Result res = lHtable2.get(get);
        if (res.isEmpty()) {
          LOG.info("Row not available");
          Thread.sleep(SLEEP_TIME);
        } else {
          assertArrayEquals(res.value(), row);
          break;
        }
      }

      Delete del = new Delete(row);
      lHtable1.delete(del);

      get = new Get(row);
      for (int i = 0; i < NB_RETRIES; i++) {
        if (i == NB_RETRIES - 1) {
          fail("Waited too much time for del replication");
        }
        Result res = lHtable2.get(get);
        if (res.size() >= 1) {
          LOG.info("Row not deleted");
          Thread.sleep(SLEEP_TIME);
        } else {
          break;
        }
      }

      // Checking verifyReplication for the default behavior.
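      // The row was deleted on both clusters, so a plain comparison scans no rows on either
      // side and both GOODROWS and BADROWS are expected to stay at 0.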
      String[] argsWithoutRaw = new String[] { PEER_ID, tableName.getNameAsString() };
      runVerifyReplication(argsWithoutRaw, 0, 0);

      // Checking verifyReplication with raw
      String[] argsWithRawAsTrue = new String[] { "--raw", PEER_ID, tableName.getNameAsString() };
      runVerifyReplication(argsWithRawAsTrue, 1, 0);
    } finally {
      if (lHtable1 != null) {
        lHtable1.close();
      }
      if (lHtable2 != null) {
        lHtable2.close();
      }
    }
  }

  // VerifyReplication should honor versions option
  @Test
  public void testHBase14905() throws Exception {
    // normal Batch tests
    byte[] qualifierName = Bytes.toBytes("f1");
    Put put = new Put(Bytes.toBytes("r1"));
    long ts = System.currentTimeMillis();
    put.addColumn(famName, qualifierName, ts + 1, Bytes.toBytes("v1002"));
    htable1.put(put);
    put.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v1001"));
    htable1.put(put);
    put.addColumn(famName, qualifierName, ts + 3, Bytes.toBytes("v1112"));
    htable1.put(put);

    Scan scan = new Scan();
    scan.readVersions(100);
    ResultScanner scanner1 = htable1.getScanner(scan);
    Result[] res1 = scanner1.next(1);
    scanner1.close();

    assertEquals(1, res1.length);
    assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());

    for (int i = 0; i < NB_RETRIES; i++) {
      scan = new Scan();
      scan.readVersions(100);
      scanner1 = htable2.getScanner(scan);
      res1 = scanner1.next(1);
      scanner1.close();
      if (res1.length != 1) {
        LOG.info("Only got " + res1.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size();
        if (cellNumber != 3) {
          LOG.info("Only got " + cellNumber + " cells");
          Thread.sleep(SLEEP_TIME);
        } else {
          break;
        }
      }
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for normal batch replication");
      }
    }

    // Write two extra versions of the cell to the peer only, so the clusters diverge and the
    // verification below, run with --versions=100, reports the row as bad.
    put.addColumn(famName, qualifierName, ts + 4, Bytes.toBytes("v1111"));
    htable2.put(put);
    put.addColumn(famName, qualifierName, ts + 5, Bytes.toBytes("v1112"));
    htable2.put(put);

    scan = new Scan();
    scan.readVersions(100);
    scanner1 = htable2.getScanner(scan);
    res1 = scanner1.next(NB_ROWS_IN_BATCH);
    scanner1.close();

    assertEquals(1, res1.length);
    assertEquals(5, res1[0].getColumnCells(famName, qualifierName).size());

    String[] args = new String[] { "--versions=100", PEER_ID, tableName.getNameAsString() };
    runVerifyReplication(args, 0, 1);
  }

  // VerifyReplication should honor versions option
  @Test
  public void testVersionMismatchHBase14905() throws Exception {
    // normal Batch tests
    byte[] qualifierName = Bytes.toBytes("f1");
    Put put = new Put(Bytes.toBytes("r1"));
    long ts = System.currentTimeMillis();
    put.addColumn(famName, qualifierName, ts + 1, Bytes.toBytes("v1"));
    htable1.put(put);
    put.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v2"));
    htable1.put(put);
    put.addColumn(famName, qualifierName, ts + 3, Bytes.toBytes("v3"));
    htable1.put(put);

    Scan scan = new Scan();
    scan.readVersions(100);
    ResultScanner scanner1 = htable1.getScanner(scan);
    Result[] res1 = scanner1.next(1);
    scanner1.close();

    assertEquals(1, res1.length);
    assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());

    for (int i = 0; i < NB_RETRIES; i++) {
      scan = new Scan();
      scan.readVersions(100);
      scanner1 = htable2.getScanner(scan);
      res1 = scanner1.next(1);
      scanner1.close();
      if (res1.length != 1) {
        LOG.info("Only got " + res1.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size();
        if (cellNumber != 3) {
          LOG.info("Only got " + cellNumber + " cells");
          Thread.sleep(SLEEP_TIME);
        } else {
          break;
        }
      }
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for normal batch replication");
      }
    }

    try {
      // Disabling replication and modifying the particular version of the cell to validate the
      // feature.
      hbaseAdmin.disableReplicationPeer(PEER_ID);
      Put put2 = new Put(Bytes.toBytes("r1"));
      put2.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v99"));
      htable2.put(put2);

      scan = new Scan();
      scan.readVersions(100);
      scanner1 = htable2.getScanner(scan);
      res1 = scanner1.next(NB_ROWS_IN_BATCH);
      scanner1.close();
      assertEquals(1, res1.length);
      assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());

      String[] args = new String[] { "--versions=100", PEER_ID, tableName.getNameAsString() };
      runVerifyReplication(args, 0, 1);
    } finally {
      hbaseAdmin.enableReplicationPeer(PEER_ID);
    }
  }

  @Test
  public void testVerifyReplicationPrefixFiltering() throws Exception {
    final byte[] prefixRow = Bytes.toBytes("prefixrow");
    final byte[] prefixRow2 = Bytes.toBytes("secondrow");
    loadData("prefixrow", prefixRow);
    loadData("secondrow", prefixRow2);
    loadData("aaa", row);
    loadData("zzz", row);
    waitForReplication(NB_ROWS_IN_BATCH * 4, NB_RETRIES * 4);
    String[] args =
        new String[] { "--row-prefixes=prefixrow,secondrow", PEER_ID, tableName.getNameAsString() };
    runVerifyReplication(args, NB_ROWS_IN_BATCH * 2, 0);
  }

  @Test
  public void testVerifyReplicationSnapshotArguments() {
    String[] args =
        new String[] { "--sourceSnapshotName=snapshot1", "2", tableName.getNameAsString() };
    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));

    args = new String[] { "--sourceSnapshotTmpDir=tmp", "2", tableName.getNameAsString() };
    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));

    args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=tmp", "2",
        tableName.getNameAsString() };
    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));

    args = new String[] { "--peerSnapshotName=snapshot1", "2", tableName.getNameAsString() };
    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));

    args = new String[] { "--peerSnapshotTmpDir=/tmp/", "2", tableName.getNameAsString() };
    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));

    args = new String[] { "--peerSnapshotName=snapshot1", "--peerSnapshotTmpDir=/tmp/",
        "--peerFSAddress=tempfs", "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2",
        tableName.getNameAsString() };
    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));

    args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=/tmp/",
        "--peerSnapshotName=snapshot2", "--peerSnapshotTmpDir=/tmp/", "--peerFSAddress=tempfs",
        "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2", tableName.getNameAsString() };

    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
  }

  private void checkRestoreTmpDir(Configuration conf, String restoreTmpDir, int expectedCount)
      throws IOException {
    FileSystem fs = FileSystem.get(conf);
    FileStatus[] subDirectories = fs.listStatus(new Path(restoreTmpDir));
    assertNotNull(subDirectories);
    assertEquals(subDirectories.length, expectedCount);
    for (int i = 0; i < expectedCount; i++) {
      assertTrue(subDirectories[i].isDirectory());
    }
  }

  @Test
  public void testVerifyReplicationWithSnapshotSupport() throws Exception {
    // Populate the tables, at the same time it guarantees that the tables are
    // identical since it does the check
    runSmallBatchTest();

    // Take source and target tables snapshot
    Path rootDir = FSUtils.getRootDir(conf1);
    FileSystem fs = rootDir.getFileSystem(conf1);
    String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
    SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
      new String(famName), sourceSnapshotName, rootDir, fs, true);

    // Take target snapshot
    Path peerRootDir = FSUtils.getRootDir(conf2);
    FileSystem peerFs = peerRootDir.getFileSystem(conf2);
    String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
    SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
      new String(famName), peerSnapshotName, peerRootDir, peerFs, true);

    String peerFSAddress = peerFs.getUri().toString();
    String temPath1 = utility1.getRandomDir().toString();
    String temPath2 = "/tmp2";

    String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
        "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
        "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
        "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };

    Job job = new VerifyReplication().createSubmittableJob(conf1, args);
    if (job == null) {
      fail("Job wasn't created, see the log");
    }
    if (!job.waitForCompletion(true)) {
      fail("Job failed, see the log");
    }
    assertEquals(NB_ROWS_IN_BATCH,
      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
    assertEquals(0,
      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());

    checkRestoreTmpDir(conf1, temPath1, 1);
    checkRestoreTmpDir(conf2, temPath2, 1);

    Scan scan = new Scan();
    ResultScanner rs = htable2.getScanner(scan);
    Put put = null;
    for (Result result : rs) {
      put = new Put(result.getRow());
      Cell firstVal = result.rawCells()[0];
      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
        Bytes.toBytes("diff data"));
      htable2.put(put);
    }
    Delete delete = new Delete(put.getRow());
    htable2.delete(delete);

    sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
    SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
      new String(famName), sourceSnapshotName, rootDir, fs, true);

    peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
    SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
      new String(famName), peerSnapshotName, peerRootDir, peerFs, true);

    // Every row now differs between the source and peer snapshots, so this second verification
    // should report all rows as bad and none as good.
    args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
        "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
        "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
        "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };

    job = new VerifyReplication().createSubmittableJob(conf1, args);
    if (job == null) {
      fail("Job wasn't created, see the log");
    }
    if (!job.waitForCompletion(true)) {
      fail("Job failed, see the log");
    }
    assertEquals(0,
      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
    assertEquals(NB_ROWS_IN_BATCH,
      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());

    checkRestoreTmpDir(conf1, temPath1, 2);
    checkRestoreTmpDir(conf2, temPath2, 2);
  }
}