001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.apache.hadoop.hbase.HConstants.RPC_CODEC_CONF_KEY; 021import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_CODEC_CLASS; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertFalse; 024import static org.junit.Assert.assertTrue; 025import static org.junit.Assert.fail; 026import static org.mockito.ArgumentMatchers.any; 027import static org.mockito.Mockito.doAnswer; 028import static org.mockito.Mockito.mock; 029import static org.mockito.Mockito.when; 030 031import java.io.ByteArrayOutputStream; 032import java.io.File; 033import java.io.IOException; 034import java.io.PrintStream; 035import java.net.URL; 036import java.util.ArrayList; 037import java.util.Arrays; 038import java.util.List; 039import java.util.Optional; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.fs.FileSystem; 042import org.apache.hadoop.fs.Path; 043import org.apache.hadoop.hbase.ArrayBackedTag; 044import org.apache.hadoop.hbase.Cell; 045import org.apache.hadoop.hbase.CellScanner; 046import org.apache.hadoop.hbase.CellUtil; 047import org.apache.hadoop.hbase.HBaseClassTestRule; 048import org.apache.hadoop.hbase.HBaseTestingUtil; 049import org.apache.hadoop.hbase.HConstants; 050import org.apache.hadoop.hbase.KeepDeletedCells; 051import org.apache.hadoop.hbase.KeyValue; 052import org.apache.hadoop.hbase.PrivateCellUtil; 053import org.apache.hadoop.hbase.TableName; 054import org.apache.hadoop.hbase.Tag; 055import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 056import org.apache.hadoop.hbase.client.Connection; 057import org.apache.hadoop.hbase.client.ConnectionFactory; 058import org.apache.hadoop.hbase.client.Delete; 059import org.apache.hadoop.hbase.client.Durability; 060import org.apache.hadoop.hbase.client.Get; 061import org.apache.hadoop.hbase.client.Mutation; 062import org.apache.hadoop.hbase.client.Put; 063import org.apache.hadoop.hbase.client.RegionInfo; 064import org.apache.hadoop.hbase.client.Result; 065import org.apache.hadoop.hbase.client.ResultScanner; 066import org.apache.hadoop.hbase.client.Scan; 067import org.apache.hadoop.hbase.client.Table; 068import org.apache.hadoop.hbase.client.TableDescriptor; 069import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 070import org.apache.hadoop.hbase.coprocessor.ObserverContext; 071import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 072import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 073import org.apache.hadoop.hbase.coprocessor.RegionObserver; 074import org.apache.hadoop.hbase.filter.Filter; 075import org.apache.hadoop.hbase.filter.FilterBase; 076import org.apache.hadoop.hbase.filter.PrefixFilter; 077import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 078import org.apache.hadoop.hbase.mapreduce.Import.CellImporter; 079import org.apache.hadoop.hbase.regionserver.HRegion; 080import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; 081import org.apache.hadoop.hbase.regionserver.RegionScanner; 082import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 083import org.apache.hadoop.hbase.testclassification.MediumTests; 084import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; 085import org.apache.hadoop.hbase.util.Bytes; 086import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 087import org.apache.hadoop.hbase.util.LauncherSecurityManager; 088import org.apache.hadoop.hbase.util.MapReduceExtendedCell; 089import org.apache.hadoop.hbase.wal.WAL; 090import org.apache.hadoop.hbase.wal.WALEdit; 091import org.apache.hadoop.hbase.wal.WALKey; 092import org.apache.hadoop.mapreduce.Mapper.Context; 093import org.apache.hadoop.util.GenericOptionsParser; 094import org.apache.hadoop.util.ToolRunner; 095import org.junit.After; 096import org.junit.AfterClass; 097import org.junit.Assert; 098import org.junit.Before; 099import org.junit.BeforeClass; 100import org.junit.ClassRule; 101import org.junit.Rule; 102import org.junit.Test; 103import org.junit.experimental.categories.Category; 104import org.junit.rules.TestName; 105import org.mockito.invocation.InvocationOnMock; 106import org.mockito.stubbing.Answer; 107import org.slf4j.Logger; 108import org.slf4j.LoggerFactory; 109 110/** 111 * Tests the table import and table export MR job functionality 112 */ 113@Category({ VerySlowMapReduceTests.class, MediumTests.class }) 114public class TestImportExport { 115 116 @ClassRule 117 public static final HBaseClassTestRule CLASS_RULE = 118 HBaseClassTestRule.forClass(TestImportExport.class); 119 120 private static final Logger LOG = LoggerFactory.getLogger(TestImportExport.class); 121 protected static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 122 private static final byte[] ROW1 = Bytes.toBytesBinary("\\x32row1"); 123 private static final byte[] ROW2 = Bytes.toBytesBinary("\\x32row2"); 124 private static final byte[] ROW3 = Bytes.toBytesBinary("\\x32row3"); 125 private static final String FAMILYA_STRING = "a"; 126 private static final String FAMILYB_STRING = "b"; 127 private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING); 128 private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING); 129 private static final byte[] QUAL = Bytes.toBytes("q"); 130 private static final String OUTPUT_DIR = "outputdir"; 131 private static String FQ_OUTPUT_DIR; 132 private static final String EXPORT_BATCH_SIZE = "100"; 133 134 private static final long now = EnvironmentEdgeManager.currentTime(); 135 private final TableName EXPORT_TABLE = TableName.valueOf("export_table"); 136 private final TableName IMPORT_TABLE = TableName.valueOf("import_table"); 137 public static final byte TEST_TAG_TYPE = (byte) (Tag.CUSTOM_TAG_TYPE_RANGE + 1); 138 public static final String TEST_ATTR = "source_op"; 139 public static final String TEST_TAG = "test_tag"; 140 141 @BeforeClass 142 public static void beforeClass() throws Throwable { 143 // Up the handlers; this test needs more than usual. 144 UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); 145 UTIL.startMiniCluster(); 146 FQ_OUTPUT_DIR = 147 new Path(OUTPUT_DIR).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString(); 148 } 149 150 @AfterClass 151 public static void afterClass() throws Throwable { 152 UTIL.shutdownMiniCluster(); 153 } 154 155 @Rule 156 public final TestName name = new TestName(); 157 158 @Before 159 public void announce() { 160 LOG.info("Running " + name.getMethodName()); 161 } 162 163 @After 164 public void cleanup() throws Throwable { 165 FileSystem fs = FileSystem.get(UTIL.getConfiguration()); 166 fs.delete(new Path(OUTPUT_DIR), true); 167 if (UTIL.getAdmin().tableExists(EXPORT_TABLE)) { 168 UTIL.deleteTable(EXPORT_TABLE); 169 } 170 if (UTIL.getAdmin().tableExists(IMPORT_TABLE)) { 171 UTIL.deleteTable(IMPORT_TABLE); 172 } 173 } 174 175 /** 176 * Runs an export job with the specified command line args 177 * @return true if job completed successfully 178 */ 179 protected boolean runExport(String[] args) throws Throwable { 180 // need to make a copy of the configuration because to make sure different temp dirs are used. 181 int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Export(), args); 182 return status == 0; 183 } 184 185 protected void runExportMain(String[] args) throws Throwable { 186 Export.main(args); 187 } 188 189 /** 190 * Runs an import job with the specified command line args 191 * @return true if job completed successfully 192 */ 193 boolean runImport(String[] args) throws Throwable { 194 // need to make a copy of the configuration because to make sure different temp dirs are used. 195 int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args); 196 return status == 0; 197 } 198 199 /** 200 * Test simple replication case with column mapping 201 */ 202 @Test 203 public void testSimpleCase() throws Throwable { 204 try (Table t = UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILYA, 3)) { 205 Put p = new Put(ROW1); 206 p.addColumn(FAMILYA, QUAL, now, QUAL); 207 p.addColumn(FAMILYA, QUAL, now + 1, QUAL); 208 p.addColumn(FAMILYA, QUAL, now + 2, QUAL); 209 t.put(p); 210 p = new Put(ROW2); 211 p.addColumn(FAMILYA, QUAL, now, QUAL); 212 p.addColumn(FAMILYA, QUAL, now + 1, QUAL); 213 p.addColumn(FAMILYA, QUAL, now + 2, QUAL); 214 t.put(p); 215 p = new Put(ROW3); 216 p.addColumn(FAMILYA, QUAL, now, QUAL); 217 p.addColumn(FAMILYA, QUAL, now + 1, QUAL); 218 p.addColumn(FAMILYA, QUAL, now + 2, QUAL); 219 t.put(p); 220 } 221 222 String[] args = new String[] { 223 // Only export row1 & row2. 224 "-D" + TableInputFormat.SCAN_ROW_START + "=\\x32row1", 225 "-D" + TableInputFormat.SCAN_ROW_STOP + "=\\x32row3", name.getMethodName(), FQ_OUTPUT_DIR, 226 "1000", // max number of key versions per key to export 227 }; 228 assertTrue(runExport(args)); 229 230 final String IMPORT_TABLE = name.getMethodName() + "import"; 231 try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), FAMILYB, 3)) { 232 args = 233 new String[] { "-D" + Import.CF_RENAME_PROP + "=" + FAMILYA_STRING + ":" + FAMILYB_STRING, 234 IMPORT_TABLE, FQ_OUTPUT_DIR }; 235 assertTrue(runImport(args)); 236 237 Get g = new Get(ROW1); 238 g.readAllVersions(); 239 Result r = t.get(g); 240 assertEquals(3, r.size()); 241 g = new Get(ROW2); 242 g.readAllVersions(); 243 r = t.get(g); 244 assertEquals(3, r.size()); 245 g = new Get(ROW3); 246 r = t.get(g); 247 assertEquals(0, r.size()); 248 } 249 } 250 251 /** 252 * Test export hbase:meta table 253 */ 254 @Test 255 public void testMetaExport() throws Throwable { 256 String[] args = 257 new String[] { TableName.META_TABLE_NAME.getNameAsString(), FQ_OUTPUT_DIR, "1", "0", "0" }; 258 assertTrue(runExport(args)); 259 } 260 261 /** 262 * Test import data from 0.94 exported file 263 */ 264 @Test 265 public void testImport94Table() throws Throwable { 266 final String name = "exportedTableIn94Format"; 267 URL url = TestImportExport.class.getResource(name); 268 File f = new File(url.toURI()); 269 if (!f.exists()) { 270 LOG.warn("FAILED TO FIND " + f + "; skipping out on test"); 271 return; 272 } 273 assertTrue(f.exists()); 274 LOG.info("FILE=" + f); 275 Path importPath = new Path(f.toURI()); 276 FileSystem fs = FileSystem.get(UTIL.getConfiguration()); 277 fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR + name)); 278 String IMPORT_TABLE = name; 279 try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), Bytes.toBytes("f1"), 3)) { 280 String[] args = new String[] { "-Dhbase.import.version=0.94", IMPORT_TABLE, FQ_OUTPUT_DIR }; 281 assertTrue(runImport(args)); 282 // @formatter:off 283 // exportedTableIn94Format contains 5 rows 284 // ROW COLUMN+CELL 285 // r1 column=f1:c1, timestamp=1383766761171, value=val1 286 // r2 column=f1:c1, timestamp=1383766771642, value=val2 287 // r3 column=f1:c1, timestamp=1383766777615, value=val3 288 // r4 column=f1:c1, timestamp=1383766785146, value=val4 289 // r5 column=f1:c1, timestamp=1383766791506, value=val5 290 // @formatter:on 291 assertEquals(5, UTIL.countRows(t)); 292 } 293 } 294 295 /** 296 * Test export scanner batching 297 */ 298 @Test 299 public void testExportScannerBatching() throws Throwable { 300 TableDescriptor desc = TableDescriptorBuilder 301 .newBuilder(TableName.valueOf(name.getMethodName())) 302 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(1).build()) 303 .build(); 304 UTIL.getAdmin().createTable(desc); 305 try (Table t = UTIL.getConnection().getTable(desc.getTableName())) { 306 Put p = new Put(ROW1); 307 p.addColumn(FAMILYA, QUAL, now, QUAL); 308 p.addColumn(FAMILYA, QUAL, now + 1, QUAL); 309 p.addColumn(FAMILYA, QUAL, now + 2, QUAL); 310 p.addColumn(FAMILYA, QUAL, now + 3, QUAL); 311 p.addColumn(FAMILYA, QUAL, now + 4, QUAL); 312 t.put(p); 313 // added scanner batching arg. 314 String[] args = new String[] { "-D" + ExportUtils.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE, 315 name.getMethodName(), FQ_OUTPUT_DIR }; 316 assertTrue(runExport(args)); 317 318 FileSystem fs = FileSystem.get(UTIL.getConfiguration()); 319 fs.delete(new Path(FQ_OUTPUT_DIR), true); 320 } 321 } 322 323 @Test 324 public void testWithDeletes() throws Throwable { 325 TableDescriptor desc = 326 TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())) 327 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5) 328 .setKeepDeletedCells(KeepDeletedCells.TRUE).build()) 329 .build(); 330 UTIL.getAdmin().createTable(desc); 331 try (Table t = UTIL.getConnection().getTable(desc.getTableName())) { 332 Put p = new Put(ROW1); 333 p.addColumn(FAMILYA, QUAL, now, QUAL); 334 p.addColumn(FAMILYA, QUAL, now + 1, QUAL); 335 p.addColumn(FAMILYA, QUAL, now + 2, QUAL); 336 p.addColumn(FAMILYA, QUAL, now + 3, QUAL); 337 p.addColumn(FAMILYA, QUAL, now + 4, QUAL); 338 t.put(p); 339 340 Delete d = new Delete(ROW1, now + 3); 341 t.delete(d); 342 d = new Delete(ROW1); 343 d.addColumns(FAMILYA, QUAL, now + 2); 344 t.delete(d); 345 } 346 347 String[] args = new String[] { "-D" + ExportUtils.RAW_SCAN + "=true", name.getMethodName(), 348 FQ_OUTPUT_DIR, "1000", // max number of key versions per key to export 349 }; 350 assertTrue(runExport(args)); 351 352 final String IMPORT_TABLE = name.getMethodName() + "import"; 353 desc = TableDescriptorBuilder 354 .newBuilder(TableName.valueOf(IMPORT_TABLE)).setColumnFamily(ColumnFamilyDescriptorBuilder 355 .newBuilder(FAMILYA).setMaxVersions(5).setKeepDeletedCells(KeepDeletedCells.TRUE).build()) 356 .build(); 357 UTIL.getAdmin().createTable(desc); 358 try (Table t = UTIL.getConnection().getTable(desc.getTableName())) { 359 args = new String[] { IMPORT_TABLE, FQ_OUTPUT_DIR }; 360 assertTrue(runImport(args)); 361 362 Scan s = new Scan(); 363 s.readAllVersions(); 364 s.setRaw(true); 365 ResultScanner scanner = t.getScanner(s); 366 Result r = scanner.next(); 367 Cell[] res = r.rawCells(); 368 assertTrue(PrivateCellUtil.isDeleteFamily(res[0])); 369 assertEquals(now + 4, res[1].getTimestamp()); 370 assertEquals(now + 3, res[2].getTimestamp()); 371 assertTrue(CellUtil.isDelete(res[3])); 372 assertEquals(now + 2, res[4].getTimestamp()); 373 assertEquals(now + 1, res[5].getTimestamp()); 374 assertEquals(now, res[6].getTimestamp()); 375 } 376 } 377 378 @Test 379 public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Throwable { 380 final TableName exportTable = TableName.valueOf(name.getMethodName()); 381 TableDescriptor desc = 382 TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())) 383 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5) 384 .setKeepDeletedCells(KeepDeletedCells.TRUE).build()) 385 .build(); 386 UTIL.getAdmin().createTable(desc); 387 388 Table exportT = UTIL.getConnection().getTable(exportTable); 389 390 // Add first version of QUAL 391 Put p = new Put(ROW1); 392 p.addColumn(FAMILYA, QUAL, now, QUAL); 393 exportT.put(p); 394 395 // Add Delete family marker 396 Delete d = new Delete(ROW1, now + 3); 397 exportT.delete(d); 398 399 // Add second version of QUAL 400 p = new Put(ROW1); 401 p.addColumn(FAMILYA, QUAL, now + 5, Bytes.toBytes("s")); 402 exportT.put(p); 403 404 // Add second Delete family marker 405 d = new Delete(ROW1, now + 7); 406 exportT.delete(d); 407 408 String[] args = new String[] { "-D" + ExportUtils.RAW_SCAN + "=true", 409 exportTable.getNameAsString(), FQ_OUTPUT_DIR, "1000", // max number of key versions per key to 410 // export 411 }; 412 assertTrue(runExport(args)); 413 414 final String importTable = name.getMethodName() + "import"; 415 desc = TableDescriptorBuilder 416 .newBuilder(TableName.valueOf(importTable)).setColumnFamily(ColumnFamilyDescriptorBuilder 417 .newBuilder(FAMILYA).setMaxVersions(5).setKeepDeletedCells(KeepDeletedCells.TRUE).build()) 418 .build(); 419 UTIL.getAdmin().createTable(desc); 420 421 Table importT = UTIL.getConnection().getTable(TableName.valueOf(importTable)); 422 args = new String[] { importTable, FQ_OUTPUT_DIR }; 423 assertTrue(runImport(args)); 424 425 Scan s = new Scan(); 426 s.readAllVersions(); 427 s.setRaw(true); 428 429 ResultScanner importedTScanner = importT.getScanner(s); 430 Result importedTResult = importedTScanner.next(); 431 432 ResultScanner exportedTScanner = exportT.getScanner(s); 433 Result exportedTResult = exportedTScanner.next(); 434 try { 435 Result.compareResults(exportedTResult, importedTResult); 436 } catch (Throwable e) { 437 fail("Original and imported tables data comparision failed with error:" + e.getMessage()); 438 } finally { 439 exportT.close(); 440 importT.close(); 441 } 442 } 443 444 /** 445 * Create a simple table, run an Export Job on it, Import with filtering on, verify counts, 446 * attempt with invalid values. 447 */ 448 @Test 449 public void testWithFilter() throws Throwable { 450 // Create simple table to export 451 TableDescriptor desc = TableDescriptorBuilder 452 .newBuilder(TableName.valueOf(name.getMethodName())) 453 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build()) 454 .build(); 455 UTIL.getAdmin().createTable(desc); 456 Table exportTable = UTIL.getConnection().getTable(desc.getTableName()); 457 458 Put p1 = new Put(ROW1); 459 p1.addColumn(FAMILYA, QUAL, now, QUAL); 460 p1.addColumn(FAMILYA, QUAL, now + 1, QUAL); 461 p1.addColumn(FAMILYA, QUAL, now + 2, QUAL); 462 p1.addColumn(FAMILYA, QUAL, now + 3, QUAL); 463 p1.addColumn(FAMILYA, QUAL, now + 4, QUAL); 464 465 // Having another row would actually test the filter. 466 Put p2 = new Put(ROW2); 467 p2.addColumn(FAMILYA, QUAL, now, QUAL); 468 469 exportTable.put(Arrays.asList(p1, p2)); 470 471 // Export the simple table 472 String[] args = new String[] { name.getMethodName(), FQ_OUTPUT_DIR, "1000" }; 473 assertTrue(runExport(args)); 474 475 // Import to a new table 476 final String IMPORT_TABLE = name.getMethodName() + "import"; 477 desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(IMPORT_TABLE)) 478 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build()) 479 .build(); 480 UTIL.getAdmin().createTable(desc); 481 482 Table importTable = UTIL.getConnection().getTable(desc.getTableName()); 483 args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(), 484 "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE, FQ_OUTPUT_DIR, 485 "1000" }; 486 assertTrue(runImport(args)); 487 488 // get the count of the source table for that time range 489 PrefixFilter filter = new PrefixFilter(ROW1); 490 int count = getCount(exportTable, filter); 491 492 Assert.assertEquals("Unexpected row count between export and import tables", count, 493 getCount(importTable, null)); 494 495 // and then test that a broken command doesn't bork everything - easier here because we don't 496 // need to re-run the export job 497 498 args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(), 499 "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", name.getMethodName(), 500 FQ_OUTPUT_DIR, "1000" }; 501 assertFalse(runImport(args)); 502 503 // cleanup 504 exportTable.close(); 505 importTable.close(); 506 } 507 508 /** 509 * Count the number of keyvalues in the specified table with the given filter 510 * @param table the table to scan 511 * @return the number of keyvalues found 512 */ 513 private int getCount(Table table, Filter filter) throws IOException { 514 Scan scan = new Scan(); 515 scan.setFilter(filter); 516 ResultScanner results = table.getScanner(scan); 517 int count = 0; 518 for (Result res : results) { 519 count += res.size(); 520 } 521 results.close(); 522 return count; 523 } 524 525 /** 526 * test main method. Import should print help and call System.exit 527 */ 528 @Test 529 public void testImportMain() throws Throwable { 530 PrintStream oldPrintStream = System.err; 531 SecurityManager SECURITY_MANAGER = System.getSecurityManager(); 532 LauncherSecurityManager newSecurityManager = new LauncherSecurityManager(); 533 System.setSecurityManager(newSecurityManager); 534 ByteArrayOutputStream data = new ByteArrayOutputStream(); 535 String[] args = {}; 536 System.setErr(new PrintStream(data)); 537 try { 538 System.setErr(new PrintStream(data)); 539 Import.main(args); 540 fail("should be SecurityException"); 541 } catch (SecurityException e) { 542 assertEquals(-1, newSecurityManager.getExitCode()); 543 assertTrue(data.toString().contains("Wrong number of arguments:")); 544 assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output")); 545 assertTrue(data.toString().contains("-Dimport.filter.class=<name of filter class>")); 546 assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output")); 547 assertTrue(data.toString().contains("-Dmapreduce.reduce.speculative=false")); 548 } finally { 549 System.setErr(oldPrintStream); 550 System.setSecurityManager(SECURITY_MANAGER); 551 } 552 } 553 554 @Test 555 public void testExportScan() throws Exception { 556 int version = 100; 557 long startTime = EnvironmentEdgeManager.currentTime(); 558 long endTime = startTime + 1; 559 String prefix = "row"; 560 String label_0 = "label_0"; 561 String label_1 = "label_1"; 562 String[] args = { "table", "outputDir", String.valueOf(version), String.valueOf(startTime), 563 String.valueOf(endTime), prefix }; 564 Scan scan = ExportUtils.getScanFromCommandLine(UTIL.getConfiguration(), args); 565 assertEquals(version, scan.getMaxVersions()); 566 assertEquals(startTime, scan.getTimeRange().getMin()); 567 assertEquals(endTime, scan.getTimeRange().getMax()); 568 assertEquals(true, (scan.getFilter() instanceof PrefixFilter)); 569 assertEquals(0, 570 Bytes.compareTo(((PrefixFilter) scan.getFilter()).getPrefix(), Bytes.toBytesBinary(prefix))); 571 String[] argsWithLabels = 572 { "-D " + ExportUtils.EXPORT_VISIBILITY_LABELS + "=" + label_0 + "," + label_1, "table", 573 "outputDir", String.valueOf(version), String.valueOf(startTime), String.valueOf(endTime), 574 prefix }; 575 Configuration conf = new Configuration(UTIL.getConfiguration()); 576 // parse the "-D" options 577 String[] otherArgs = new GenericOptionsParser(conf, argsWithLabels).getRemainingArgs(); 578 Scan scanWithLabels = ExportUtils.getScanFromCommandLine(conf, otherArgs); 579 assertEquals(version, scanWithLabels.getMaxVersions()); 580 assertEquals(startTime, scanWithLabels.getTimeRange().getMin()); 581 assertEquals(endTime, scanWithLabels.getTimeRange().getMax()); 582 assertEquals(true, (scanWithLabels.getFilter() instanceof PrefixFilter)); 583 assertEquals(0, Bytes.compareTo(((PrefixFilter) scanWithLabels.getFilter()).getPrefix(), 584 Bytes.toBytesBinary(prefix))); 585 assertEquals(2, scanWithLabels.getAuthorizations().getLabels().size()); 586 assertEquals(label_0, scanWithLabels.getAuthorizations().getLabels().get(0)); 587 assertEquals(label_1, scanWithLabels.getAuthorizations().getLabels().get(1)); 588 } 589 590 /** 591 * test main method. Export should print help and call System.exit 592 */ 593 @Test 594 public void testExportMain() throws Throwable { 595 PrintStream oldPrintStream = System.err; 596 SecurityManager SECURITY_MANAGER = System.getSecurityManager(); 597 LauncherSecurityManager newSecurityManager = new LauncherSecurityManager(); 598 System.setSecurityManager(newSecurityManager); 599 ByteArrayOutputStream data = new ByteArrayOutputStream(); 600 String[] args = {}; 601 System.setErr(new PrintStream(data)); 602 try { 603 System.setErr(new PrintStream(data)); 604 runExportMain(args); 605 fail("should be SecurityException"); 606 } catch (SecurityException e) { 607 assertEquals(-1, newSecurityManager.getExitCode()); 608 String errMsg = data.toString(); 609 assertTrue(errMsg.contains("Wrong number of arguments:")); 610 assertTrue( 611 errMsg.contains("Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " 612 + "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]")); 613 assertTrue(errMsg.contains("-D hbase.mapreduce.scan.column.family=<family1>,<family2>, ...")); 614 assertTrue(errMsg.contains("-D hbase.mapreduce.include.deleted.rows=true")); 615 assertTrue(errMsg.contains("-D hbase.client.scanner.caching=100")); 616 assertTrue(errMsg.contains("-D hbase.export.scanner.batch=10")); 617 assertTrue(errMsg.contains("-D hbase.export.scanner.caching=100")); 618 } finally { 619 System.setErr(oldPrintStream); 620 System.setSecurityManager(SECURITY_MANAGER); 621 } 622 } 623 624 /** 625 * Test map method of Importer 626 */ 627 @SuppressWarnings({ "unchecked", "rawtypes" }) 628 @Test 629 public void testKeyValueImporter() throws Throwable { 630 CellImporter importer = new CellImporter(); 631 Configuration configuration = new Configuration(); 632 Context ctx = mock(Context.class); 633 when(ctx.getConfiguration()).thenReturn(configuration); 634 635 doAnswer(new Answer<Void>() { 636 637 @Override 638 public Void answer(InvocationOnMock invocation) throws Throwable { 639 ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArgument(0); 640 MapReduceExtendedCell key = (MapReduceExtendedCell) invocation.getArgument(1); 641 assertEquals("Key", Bytes.toString(writer.get())); 642 assertEquals("row", Bytes.toString(CellUtil.cloneRow(key))); 643 return null; 644 } 645 }).when(ctx).write(any(), any()); 646 647 importer.setup(ctx); 648 Result value = mock(Result.class); 649 KeyValue[] keys = { 650 new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"), 651 Bytes.toBytes("value")), 652 new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"), 653 Bytes.toBytes("value1")) }; 654 when(value.rawCells()).thenReturn(keys); 655 importer.map(new ImmutableBytesWritable(Bytes.toBytes("Key")), value, ctx); 656 657 } 658 659 /** 660 * Test addFilterAndArguments method of Import This method set couple parameters into 661 * Configuration 662 */ 663 @Test 664 public void testAddFilterAndArguments() throws IOException { 665 Configuration configuration = new Configuration(); 666 667 List<String> args = new ArrayList<>(); 668 args.add("param1"); 669 args.add("param2"); 670 671 Import.addFilterAndArguments(configuration, FilterBase.class, args); 672 assertEquals("org.apache.hadoop.hbase.filter.FilterBase", 673 configuration.get(Import.FILTER_CLASS_CONF_KEY)); 674 assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY)); 675 } 676 677 @Test 678 public void testDurability() throws Throwable { 679 // Create an export table. 680 String exportTableName = name.getMethodName() + "export"; 681 try (Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3)) { 682 // Insert some data 683 Put put = new Put(ROW1); 684 put.addColumn(FAMILYA, QUAL, now, QUAL); 685 put.addColumn(FAMILYA, QUAL, now + 1, QUAL); 686 put.addColumn(FAMILYA, QUAL, now + 2, QUAL); 687 exportTable.put(put); 688 689 put = new Put(ROW2); 690 put.addColumn(FAMILYA, QUAL, now, QUAL); 691 put.addColumn(FAMILYA, QUAL, now + 1, QUAL); 692 put.addColumn(FAMILYA, QUAL, now + 2, QUAL); 693 exportTable.put(put); 694 695 // Run the export 696 String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000" }; 697 assertTrue(runExport(args)); 698 699 // Create the table for import 700 String importTableName = name.getMethodName() + "import1"; 701 Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3); 702 703 // Register the wal listener for the import table 704 RegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer() 705 .getRegions(importTable.getName()).get(0).getRegionInfo(); 706 TableWALActionListener walListener = new TableWALActionListener(region); 707 WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region); 708 wal.registerWALActionsListener(walListener); 709 710 // Run the import with SKIP_WAL 711 args = new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(), 712 importTableName, FQ_OUTPUT_DIR }; 713 assertTrue(runImport(args)); 714 // Assert that the wal is not visisted 715 assertTrue(!walListener.isWALVisited()); 716 // Ensure that the count is 2 (only one version of key value is obtained) 717 assertTrue(getCount(importTable, null) == 2); 718 719 // Run the import with the default durability option 720 importTableName = name.getMethodName() + "import2"; 721 importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3); 722 region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer() 723 .getRegions(importTable.getName()).get(0).getRegionInfo(); 724 wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region); 725 walListener = new TableWALActionListener(region); 726 wal.registerWALActionsListener(walListener); 727 args = new String[] { importTableName, FQ_OUTPUT_DIR }; 728 assertTrue(runImport(args)); 729 // Assert that the wal is visisted 730 assertTrue(walListener.isWALVisited()); 731 // Ensure that the count is 2 (only one version of key value is obtained) 732 assertTrue(getCount(importTable, null) == 2); 733 } 734 } 735 736 /** 737 * This listens to the {@link #visitLogEntryBeforeWrite(RegionInfo, WALKey, WALEdit)} to identify 738 * that an entry is written to the Write Ahead Log for the given table. 739 */ 740 private static class TableWALActionListener implements WALActionsListener { 741 742 private RegionInfo regionInfo; 743 private boolean isVisited = false; 744 745 public TableWALActionListener(RegionInfo region) { 746 this.regionInfo = region; 747 } 748 749 @Override 750 public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) { 751 if ( 752 logKey.getTableName().getNameAsString() 753 .equalsIgnoreCase(this.regionInfo.getTable().getNameAsString()) && (!logEdit.isMetaEdit()) 754 ) { 755 isVisited = true; 756 } 757 } 758 759 public boolean isWALVisited() { 760 return isVisited; 761 } 762 } 763 764 /** 765 * Add cell tags to delete mutations, run export and import tool and verify that tags are present 766 * in import table also. 767 * @throws Throwable throws Throwable. 768 */ 769 @Test 770 public void testTagsAddition() throws Throwable { 771 final TableName exportTable = TableName.valueOf(name.getMethodName()); 772 TableDescriptor desc = TableDescriptorBuilder.newBuilder(exportTable) 773 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5) 774 .setKeepDeletedCells(KeepDeletedCells.TRUE).build()) 775 .setCoprocessor(MetadataController.class.getName()).build(); 776 UTIL.getAdmin().createTable(desc); 777 778 Table exportT = UTIL.getConnection().getTable(exportTable); 779 780 // Add first version of QUAL 781 Put p = new Put(ROW1); 782 p.addColumn(FAMILYA, QUAL, now, QUAL); 783 exportT.put(p); 784 785 // Add Delete family marker 786 Delete d = new Delete(ROW1, now + 3); 787 // Add test attribute to delete mutation. 788 d.setAttribute(TEST_ATTR, Bytes.toBytes(TEST_TAG)); 789 exportT.delete(d); 790 791 // Run export tool with KeyValueCodecWithTags as Codec. This will ensure that export tool 792 // will use KeyValueCodecWithTags. 793 String[] args = new String[] { "-D" + ExportUtils.RAW_SCAN + "=true", 794 // This will make sure that codec will encode and decode tags in rpc call. 795 "-Dhbase.client.rpc.codec=org.apache.hadoop.hbase.codec.KeyValueCodecWithTags", 796 exportTable.getNameAsString(), FQ_OUTPUT_DIR, "1000", // max number of key versions per key to 797 // export 798 }; 799 assertTrue(runExport(args)); 800 // Assert tag exists in exportTable 801 checkWhetherTagExists(exportTable, true); 802 803 // Create an import table with MetadataController. 804 final TableName importTable = TableName.valueOf("importWithTestTagsAddition"); 805 TableDescriptor importTableDesc = TableDescriptorBuilder.newBuilder(importTable) 806 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5) 807 .setKeepDeletedCells(KeepDeletedCells.TRUE).build()) 808 .setCoprocessor(MetadataController.class.getName()).build(); 809 UTIL.getAdmin().createTable(importTableDesc); 810 811 // Run import tool. 812 args = new String[] { 813 // This will make sure that codec will encode and decode tags in rpc call. 814 "-Dhbase.client.rpc.codec=org.apache.hadoop.hbase.codec.KeyValueCodecWithTags", 815 importTable.getNameAsString(), FQ_OUTPUT_DIR }; 816 assertTrue(runImport(args)); 817 // Make sure that tags exists in imported table. 818 checkWhetherTagExists(importTable, true); 819 } 820 821 private void checkWhetherTagExists(TableName table, boolean tagExists) throws IOException { 822 List<Cell> values = new ArrayList<>(); 823 for (HRegion region : UTIL.getHBaseCluster().getRegions(table)) { 824 Scan scan = new Scan(); 825 // Make sure to set rawScan to true so that we will get Delete Markers. 826 scan.setRaw(true); 827 scan.readAllVersions(); 828 scan.withStartRow(ROW1); 829 // Need to use RegionScanner instead of table#getScanner since the latter will 830 // not return tags since it will go through rpc layer and remove tags intentionally. 831 RegionScanner scanner = region.getScanner(scan); 832 scanner.next(values); 833 if (!values.isEmpty()) { 834 break; 835 } 836 } 837 boolean deleteFound = false; 838 for (Cell cell : values) { 839 if (PrivateCellUtil.isDelete(cell.getType().getCode())) { 840 deleteFound = true; 841 List<Tag> tags = PrivateCellUtil.getTags(cell); 842 // If tagExists flag is true then validate whether tag contents are as expected. 843 if (tagExists) { 844 Assert.assertEquals(1, tags.size()); 845 for (Tag tag : tags) { 846 Assert.assertEquals(TEST_TAG, Tag.getValueAsString(tag)); 847 } 848 } else { 849 // If tagExists flag is disabled then check for 0 size tags. 850 assertEquals(0, tags.size()); 851 } 852 } 853 } 854 Assert.assertTrue(deleteFound); 855 } 856 857 /* 858 * This co-proc will add a cell tag to delete mutation. 859 */ 860 public static class MetadataController implements RegionCoprocessor, RegionObserver { 861 @Override 862 public Optional<RegionObserver> getRegionObserver() { 863 return Optional.of(this); 864 } 865 866 @Override 867 public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, 868 MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 869 if (c.getEnvironment().getRegion().getRegionInfo().getTable().isSystemTable()) { 870 return; 871 } 872 for (int i = 0; i < miniBatchOp.size(); i++) { 873 Mutation m = miniBatchOp.getOperation(i); 874 if (!(m instanceof Delete)) { 875 continue; 876 } 877 byte[] sourceOpAttr = m.getAttribute(TEST_ATTR); 878 if (sourceOpAttr == null) { 879 continue; 880 } 881 Tag sourceOpTag = new ArrayBackedTag(TEST_TAG_TYPE, sourceOpAttr); 882 List<Cell> updatedCells = new ArrayList<>(); 883 for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) { 884 Cell cell = cellScanner.current(); 885 List<Tag> tags = PrivateCellUtil.getTags(cell); 886 tags.add(sourceOpTag); 887 Cell updatedCell = PrivateCellUtil.createCell(cell, tags); 888 updatedCells.add(updatedCell); 889 } 890 m.getFamilyCellMap().clear(); 891 // Clear and add new Cells to the Mutation. 892 for (Cell cell : updatedCells) { 893 Delete d = (Delete) m; 894 d.add(cell); 895 } 896 } 897 } 898 } 899 900 /** 901 * Set hbase.client.rpc.codec and hbase.client.default.rpc.codec both to empty string This means 902 * it will use no Codec. Make sure that we don't return Tags in response. 903 * @throws Exception Exception 904 */ 905 @Test 906 public void testTagsWithEmptyCodec() throws Exception { 907 TableName tableName = TableName.valueOf(name.getMethodName()); 908 TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName) 909 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5) 910 .setKeepDeletedCells(KeepDeletedCells.TRUE).build()) 911 .setCoprocessor(MetadataController.class.getName()).build(); 912 UTIL.getAdmin().createTable(tableDesc); 913 Configuration conf = new Configuration(UTIL.getConfiguration()); 914 conf.set(RPC_CODEC_CONF_KEY, ""); 915 conf.set(DEFAULT_CODEC_CLASS, ""); 916 try (Connection connection = ConnectionFactory.createConnection(conf); 917 Table table = connection.getTable(tableName)) { 918 // Add first version of QUAL 919 Put p = new Put(ROW1); 920 p.addColumn(FAMILYA, QUAL, now, QUAL); 921 table.put(p); 922 923 // Add Delete family marker 924 Delete d = new Delete(ROW1, now + 3); 925 // Add test attribute to delete mutation. 926 d.setAttribute(TEST_ATTR, Bytes.toBytes(TEST_TAG)); 927 table.delete(d); 928 929 // Since RPC_CODEC_CONF_KEY and DEFAULT_CODEC_CLASS is set to empty, it will use 930 // empty Codec and it shouldn't encode/decode tags. 931 Scan scan = new Scan().withStartRow(ROW1).setRaw(true); 932 ResultScanner scanner = table.getScanner(scan); 933 int count = 0; 934 Result result; 935 while ((result = scanner.next()) != null) { 936 List<Cell> cells = result.listCells(); 937 assertEquals(2, cells.size()); 938 Cell cell = cells.get(0); 939 assertTrue(CellUtil.isDelete(cell)); 940 List<Tag> tags = PrivateCellUtil.getTags(cell); 941 assertEquals(0, tags.size()); 942 count++; 943 } 944 assertEquals(1, count); 945 } finally { 946 UTIL.deleteTable(tableName); 947 } 948 } 949}