001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024import static org.mockito.ArgumentMatchers.any; 025import static org.mockito.Mockito.doAnswer; 026import static org.mockito.Mockito.mock; 027import static org.mockito.Mockito.when; 028 029import java.io.ByteArrayOutputStream; 030import java.io.File; 031import java.io.IOException; 032import java.io.PrintStream; 033import java.net.URL; 034import java.util.ArrayList; 035import java.util.Arrays; 036import java.util.List; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.fs.FileSystem; 039import org.apache.hadoop.fs.Path; 040import org.apache.hadoop.hbase.Cell; 041import org.apache.hadoop.hbase.CellUtil; 042import org.apache.hadoop.hbase.HBaseClassTestRule; 043import org.apache.hadoop.hbase.HBaseTestingUtility; 044import org.apache.hadoop.hbase.HConstants; 045import org.apache.hadoop.hbase.KeepDeletedCells; 046import org.apache.hadoop.hbase.KeyValue; 047import org.apache.hadoop.hbase.PrivateCellUtil; 048import org.apache.hadoop.hbase.TableName; 049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 050import org.apache.hadoop.hbase.client.Delete; 051import org.apache.hadoop.hbase.client.Durability; 052import org.apache.hadoop.hbase.client.Get; 053import org.apache.hadoop.hbase.client.Put; 054import org.apache.hadoop.hbase.client.RegionInfo; 055import org.apache.hadoop.hbase.client.Result; 056import org.apache.hadoop.hbase.client.ResultScanner; 057import org.apache.hadoop.hbase.client.Scan; 058import org.apache.hadoop.hbase.client.Table; 059import org.apache.hadoop.hbase.client.TableDescriptor; 060import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 061import org.apache.hadoop.hbase.filter.Filter; 062import org.apache.hadoop.hbase.filter.FilterBase; 063import org.apache.hadoop.hbase.filter.PrefixFilter; 064import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 065import org.apache.hadoop.hbase.mapreduce.Import.CellImporter; 066import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 067import org.apache.hadoop.hbase.testclassification.MediumTests; 068import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; 069import org.apache.hadoop.hbase.util.Bytes; 070import org.apache.hadoop.hbase.util.LauncherSecurityManager; 071import org.apache.hadoop.hbase.util.MapReduceExtendedCell; 072import org.apache.hadoop.hbase.wal.WAL; 073import org.apache.hadoop.hbase.wal.WALEdit; 074import org.apache.hadoop.hbase.wal.WALKey; 075import org.apache.hadoop.mapreduce.Mapper.Context; 076import org.apache.hadoop.util.GenericOptionsParser; 077import org.apache.hadoop.util.ToolRunner; 078import org.junit.After; 079import org.junit.AfterClass; 080import org.junit.Assert; 081import org.junit.Before; 082import org.junit.BeforeClass; 083import org.junit.ClassRule; 084import org.junit.Rule; 085import org.junit.Test; 086import org.junit.experimental.categories.Category; 087import org.junit.rules.TestName; 088import org.mockito.invocation.InvocationOnMock; 089import org.mockito.stubbing.Answer; 090import org.slf4j.Logger; 091import org.slf4j.LoggerFactory; 092 093/** 094 * Tests the table import and table export MR job functionality 095 */ 096@Category({VerySlowMapReduceTests.class, MediumTests.class}) 097public class TestCellBasedImportExport2 { 098 099 @ClassRule 100 public static final HBaseClassTestRule CLASS_RULE = 101 HBaseClassTestRule.forClass(TestCellBasedImportExport2.class); 102 103 private static final Logger LOG = LoggerFactory.getLogger(TestCellBasedImportExport2.class); 104 protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 105 private static final byte[] ROW1 = Bytes.toBytesBinary("\\x32row1"); 106 private static final byte[] ROW2 = Bytes.toBytesBinary("\\x32row2"); 107 private static final byte[] ROW3 = Bytes.toBytesBinary("\\x32row3"); 108 private static final String FAMILYA_STRING = "a"; 109 private static final String FAMILYB_STRING = "b"; 110 private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING); 111 private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING); 112 private static final byte[] QUAL = Bytes.toBytes("q"); 113 private static final String OUTPUT_DIR = "outputdir"; 114 private static String FQ_OUTPUT_DIR; 115 private static final String EXPORT_BATCH_SIZE = "100"; 116 117 private static final long now = System.currentTimeMillis(); 118 private final TableName EXPORT_TABLE = TableName.valueOf("export_table"); 119 private final TableName IMPORT_TABLE = TableName.valueOf("import_table"); 120 121 @BeforeClass 122 public static void beforeClass() throws Throwable { 123 // Up the handlers; this test needs more than usual. 124 UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); 125 UTIL.startMiniCluster(); 126 FQ_OUTPUT_DIR = 127 new Path(OUTPUT_DIR).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString(); 128 } 129 130 @AfterClass 131 public static void afterClass() throws Throwable { 132 UTIL.shutdownMiniCluster(); 133 } 134 135 @Rule 136 public final TestName name = new TestName(); 137 138 @Before 139 public void announce() { 140 LOG.info("Running " + name.getMethodName()); 141 } 142 143 @After 144 public void cleanup() throws Throwable { 145 FileSystem fs = FileSystem.get(UTIL.getConfiguration()); 146 fs.delete(new Path(OUTPUT_DIR), true); 147 if (UTIL.getAdmin().tableExists(EXPORT_TABLE)) { 148 UTIL.deleteTable(EXPORT_TABLE); 149 } 150 if (UTIL.getAdmin().tableExists(IMPORT_TABLE)) { 151 UTIL.deleteTable(IMPORT_TABLE); 152 } 153 } 154 155 /** 156 * Runs an export job with the specified command line args 157 * @param args 158 * @return true if job completed successfully 159 * @throws IOException 160 * @throws InterruptedException 161 * @throws ClassNotFoundException 162 */ 163 protected boolean runExport(String[] args) throws Throwable { 164 // need to make a copy of the configuration because to make sure different temp dirs are used. 165 int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Export(), args); 166 return status == 0; 167 } 168 169 protected void runExportMain(String[] args) throws Throwable { 170 Export.main(args); 171 } 172 173 /** 174 * Runs an import job with the specified command line args 175 * @param args 176 * @return true if job completed successfully 177 * @throws IOException 178 * @throws InterruptedException 179 * @throws ClassNotFoundException 180 */ 181 boolean runImport(String[] args) throws Throwable { 182 // need to make a copy of the configuration because to make sure different temp dirs are used. 183 int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args); 184 return status == 0; 185 } 186 187 /** 188 * Test simple replication case with column mapping 189 * @throws Exception 190 */ 191 @Test 192 public void testSimpleCase() throws Throwable { 193 try (Table t = UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILYA, 3);) { 194 Put p = new Put(ROW1); 195 p.addColumn(FAMILYA, QUAL, now, QUAL); 196 p.addColumn(FAMILYA, QUAL, now + 1, QUAL); 197 p.addColumn(FAMILYA, QUAL, now + 2, QUAL); 198 t.put(p); 199 p = new Put(ROW2); 200 p.addColumn(FAMILYA, QUAL, now, QUAL); 201 p.addColumn(FAMILYA, QUAL, now + 1, QUAL); 202 p.addColumn(FAMILYA, QUAL, now + 2, QUAL); 203 t.put(p); 204 p = new Put(ROW3); 205 p.addColumn(FAMILYA, QUAL, now, QUAL); 206 p.addColumn(FAMILYA, QUAL, now + 1, QUAL); 207 p.addColumn(FAMILYA, QUAL, now + 2, QUAL); 208 t.put(p); 209 } 210 211 String[] args = new String[] { 212 // Only export row1 & row2. 213 "-D" + TableInputFormat.SCAN_ROW_START + "=\\x32row1", 214 "-D" + TableInputFormat.SCAN_ROW_STOP + "=\\x32row3", 215 name.getMethodName(), 216 FQ_OUTPUT_DIR, 217 "1000", // max number of key versions per key to export 218 }; 219 assertTrue(runExport(args)); 220 221 final String IMPORT_TABLE = name.getMethodName() + "import"; 222 try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), FAMILYB, 3);) { 223 args = new String[] { 224 "-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING, 225 IMPORT_TABLE, 226 FQ_OUTPUT_DIR 227 }; 228 assertTrue(runImport(args)); 229 230 Get g = new Get(ROW1); 231 g.setMaxVersions(); 232 Result r = t.get(g); 233 assertEquals(3, r.size()); 234 g = new Get(ROW2); 235 g.setMaxVersions(); 236 r = t.get(g); 237 assertEquals(3, r.size()); 238 g = new Get(ROW3); 239 r = t.get(g); 240 assertEquals(0, r.size()); 241 } 242 } 243 244 /** 245 * Test export hbase:meta table 246 * 247 * @throws Throwable 248 */ 249 @Test 250 public void testMetaExport() throws Throwable { 251 String[] args = new String[] { TableName.META_TABLE_NAME.getNameAsString(), 252 FQ_OUTPUT_DIR, "1", "0", "0" }; 253 assertTrue(runExport(args)); 254 } 255 256 /** 257 * Test import data from 0.94 exported file 258 * @throws Throwable 259 */ 260 @Test 261 public void testImport94Table() throws Throwable { 262 final String name = "exportedTableIn94Format"; 263 URL url = TestCellBasedImportExport2.class.getResource(name); 264 File f = new File(url.toURI()); 265 if (!f.exists()) { 266 LOG.warn("FAILED TO FIND " + f + "; skipping out on test"); 267 return; 268 } 269 assertTrue(f.exists()); 270 LOG.info("FILE=" + f); 271 Path importPath = new Path(f.toURI()); 272 FileSystem fs = FileSystem.get(UTIL.getConfiguration()); 273 fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR + name)); 274 String IMPORT_TABLE = name; 275 try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), Bytes.toBytes("f1"), 3);) { 276 String[] args = new String[] { 277 "-Dhbase.import.version=0.94" , 278 IMPORT_TABLE, FQ_OUTPUT_DIR 279 }; 280 assertTrue(runImport(args)); 281 /* exportedTableIn94Format contains 5 rows 282 ROW COLUMN+CELL 283 r1 column=f1:c1, timestamp=1383766761171, value=val1 284 r2 column=f1:c1, timestamp=1383766771642, value=val2 285 r3 column=f1:c1, timestamp=1383766777615, value=val3 286 r4 column=f1:c1, timestamp=1383766785146, value=val4 287 r5 column=f1:c1, timestamp=1383766791506, value=val5 288 */ 289 assertEquals(5, UTIL.countRows(t)); 290 } 291 } 292 293 /** 294 * Test export scanner batching 295 */ 296 @Test 297 public void testExportScannerBatching() throws Throwable { 298 TableDescriptor desc = TableDescriptorBuilder 299 .newBuilder(TableName.valueOf(name.getMethodName())) 300 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA) 301 .setMaxVersions(1) 302 .build()) 303 .build(); 304 UTIL.getAdmin().createTable(desc); 305 try (Table t = UTIL.getConnection().getTable(desc.getTableName());) { 306 307 Put p = new Put(ROW1); 308 p.addColumn(FAMILYA, QUAL, now, QUAL); 309 p.addColumn(FAMILYA, QUAL, now + 1, QUAL); 310 p.addColumn(FAMILYA, QUAL, now + 2, QUAL); 311 p.addColumn(FAMILYA, QUAL, now + 3, QUAL); 312 p.addColumn(FAMILYA, QUAL, now + 4, QUAL); 313 t.put(p); 314 315 String[] args = new String[] { 316 "-D" + ExportUtils.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE, // added scanner batching arg. 317 name.getMethodName(), 318 FQ_OUTPUT_DIR 319 }; 320 assertTrue(runExport(args)); 321 322 FileSystem fs = FileSystem.get(UTIL.getConfiguration()); 323 fs.delete(new Path(FQ_OUTPUT_DIR), true); 324 } 325 } 326 327 @Test 328 public void testWithDeletes() throws Throwable { 329 TableDescriptor desc = TableDescriptorBuilder 330 .newBuilder(TableName.valueOf(name.getMethodName())) 331 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA) 332 .setMaxVersions(5) 333 .setKeepDeletedCells(KeepDeletedCells.TRUE) 334 .build()) 335 .build(); 336 UTIL.getAdmin().createTable(desc); 337 try (Table t = UTIL.getConnection().getTable(desc.getTableName());) { 338 339 Put p = new Put(ROW1); 340 p.addColumn(FAMILYA, QUAL, now, QUAL); 341 p.addColumn(FAMILYA, QUAL, now + 1, QUAL); 342 p.addColumn(FAMILYA, QUAL, now + 2, QUAL); 343 p.addColumn(FAMILYA, QUAL, now + 3, QUAL); 344 p.addColumn(FAMILYA, QUAL, now + 4, QUAL); 345 t.put(p); 346 347 Delete d = new Delete(ROW1, now+3); 348 t.delete(d); 349 d = new Delete(ROW1); 350 d.addColumns(FAMILYA, QUAL, now+2); 351 t.delete(d); 352 } 353 354 String[] args = new String[] { 355 "-D" + ExportUtils.RAW_SCAN + "=true", 356 name.getMethodName(), 357 FQ_OUTPUT_DIR, 358 "1000", // max number of key versions per key to export 359 }; 360 assertTrue(runExport(args)); 361 362 final String IMPORT_TABLE = name.getMethodName() + "import"; 363 desc = TableDescriptorBuilder 364 .newBuilder(TableName.valueOf(IMPORT_TABLE)) 365 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA) 366 .setMaxVersions(5) 367 .setKeepDeletedCells(KeepDeletedCells.TRUE) 368 .build()) 369 .build(); 370 UTIL.getAdmin().createTable(desc); 371 try (Table t = UTIL.getConnection().getTable(desc.getTableName());) { 372 args = new String[] { 373 IMPORT_TABLE, 374 FQ_OUTPUT_DIR 375 }; 376 assertTrue(runImport(args)); 377 378 Scan s = new Scan(); 379 s.setMaxVersions(); 380 s.setRaw(true); 381 ResultScanner scanner = t.getScanner(s); 382 Result r = scanner.next(); 383 Cell[] res = r.rawCells(); 384 assertTrue(PrivateCellUtil.isDeleteFamily(res[0])); 385 assertEquals(now+4, res[1].getTimestamp()); 386 assertEquals(now+3, res[2].getTimestamp()); 387 assertTrue(CellUtil.isDelete(res[3])); 388 assertEquals(now+2, res[4].getTimestamp()); 389 assertEquals(now+1, res[5].getTimestamp()); 390 assertEquals(now, res[6].getTimestamp()); 391 } 392 } 393 394 395 @Test 396 public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Throwable { 397 final TableName exportTable = TableName.valueOf(name.getMethodName()); 398 TableDescriptor desc = TableDescriptorBuilder 399 .newBuilder(TableName.valueOf(name.getMethodName())) 400 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA) 401 .setMaxVersions(5) 402 .setKeepDeletedCells(KeepDeletedCells.TRUE) 403 .build()) 404 .build(); 405 UTIL.getAdmin().createTable(desc); 406 407 Table exportT = UTIL.getConnection().getTable(exportTable); 408 409 //Add first version of QUAL 410 Put p = new Put(ROW1); 411 p.addColumn(FAMILYA, QUAL, now, QUAL); 412 exportT.put(p); 413 414 //Add Delete family marker 415 Delete d = new Delete(ROW1, now+3); 416 exportT.delete(d); 417 418 //Add second version of QUAL 419 p = new Put(ROW1); 420 p.addColumn(FAMILYA, QUAL, now + 5, "s".getBytes()); 421 exportT.put(p); 422 423 //Add second Delete family marker 424 d = new Delete(ROW1, now+7); 425 exportT.delete(d); 426 427 428 String[] args = new String[] { 429 "-D" + ExportUtils.RAW_SCAN + "=true", exportTable.getNameAsString(), 430 FQ_OUTPUT_DIR, 431 "1000", // max number of key versions per key to export 432 }; 433 assertTrue(runExport(args)); 434 435 final String importTable = name.getMethodName() + "import"; 436 desc = TableDescriptorBuilder 437 .newBuilder(TableName.valueOf(importTable)) 438 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA) 439 .setMaxVersions(5) 440 .setKeepDeletedCells(KeepDeletedCells.TRUE) 441 .build()) 442 .build(); 443 UTIL.getAdmin().createTable(desc); 444 445 Table importT = UTIL.getConnection().getTable(TableName.valueOf(importTable)); 446 args = new String[] { 447 importTable, 448 FQ_OUTPUT_DIR 449 }; 450 assertTrue(runImport(args)); 451 452 Scan s = new Scan(); 453 s.setMaxVersions(); 454 s.setRaw(true); 455 456 ResultScanner importedTScanner = importT.getScanner(s); 457 Result importedTResult = importedTScanner.next(); 458 459 ResultScanner exportedTScanner = exportT.getScanner(s); 460 Result exportedTResult = exportedTScanner.next(); 461 try { 462 Result.compareResults(exportedTResult, importedTResult); 463 } catch (Throwable e) { 464 fail("Original and imported tables data comparision failed with error:"+e.getMessage()); 465 } finally { 466 exportT.close(); 467 importT.close(); 468 } 469 } 470 471 /** 472 * Create a simple table, run an Export Job on it, Import with filtering on, verify counts, 473 * attempt with invalid values. 474 */ 475 @Test 476 public void testWithFilter() throws Throwable { 477 // Create simple table to export 478 TableDescriptor desc = TableDescriptorBuilder 479 .newBuilder(TableName.valueOf(name.getMethodName())) 480 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA) 481 .setMaxVersions(5) 482 .build()) 483 .build(); 484 UTIL.getAdmin().createTable(desc); 485 Table exportTable = UTIL.getConnection().getTable(desc.getTableName()); 486 487 Put p1 = new Put(ROW1); 488 p1.addColumn(FAMILYA, QUAL, now, QUAL); 489 p1.addColumn(FAMILYA, QUAL, now + 1, QUAL); 490 p1.addColumn(FAMILYA, QUAL, now + 2, QUAL); 491 p1.addColumn(FAMILYA, QUAL, now + 3, QUAL); 492 p1.addColumn(FAMILYA, QUAL, now + 4, QUAL); 493 494 // Having another row would actually test the filter. 495 Put p2 = new Put(ROW2); 496 p2.addColumn(FAMILYA, QUAL, now, QUAL); 497 498 exportTable.put(Arrays.asList(p1, p2)); 499 500 // Export the simple table 501 String[] args = new String[] { name.getMethodName(), FQ_OUTPUT_DIR, "1000" }; 502 assertTrue(runExport(args)); 503 504 // Import to a new table 505 final String IMPORT_TABLE = name.getMethodName() + "import"; 506 desc = TableDescriptorBuilder 507 .newBuilder(TableName.valueOf(IMPORT_TABLE)) 508 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA) 509 .setMaxVersions(5) 510 .build()) 511 .build(); 512 UTIL.getAdmin().createTable(desc); 513 514 Table importTable = UTIL.getConnection().getTable(desc.getTableName()); 515 args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(), 516 "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE, 517 FQ_OUTPUT_DIR, 518 "1000" }; 519 assertTrue(runImport(args)); 520 521 // get the count of the source table for that time range 522 PrefixFilter filter = new PrefixFilter(ROW1); 523 int count = getCount(exportTable, filter); 524 525 Assert.assertEquals("Unexpected row count between export and import tables", count, 526 getCount(importTable, null)); 527 528 // and then test that a broken command doesn't bork everything - easier here because we don't 529 // need to re-run the export job 530 531 args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(), 532 "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", name.getMethodName(), 533 FQ_OUTPUT_DIR, "1000" }; 534 assertFalse(runImport(args)); 535 536 // cleanup 537 exportTable.close(); 538 importTable.close(); 539 } 540 541 /** 542 * Count the number of keyvalues in the specified table for the given timerange 543 * @param table 544 * @return 545 * @throws IOException 546 */ 547 private int getCount(Table table, Filter filter) throws IOException { 548 Scan scan = new Scan(); 549 scan.setFilter(filter); 550 ResultScanner results = table.getScanner(scan); 551 int count = 0; 552 for (Result res : results) { 553 count += res.size(); 554 } 555 results.close(); 556 return count; 557 } 558 559 /** 560 * test main method. Import should print help and call System.exit 561 */ 562 @Test 563 public void testImportMain() throws Throwable { 564 PrintStream oldPrintStream = System.err; 565 SecurityManager SECURITY_MANAGER = System.getSecurityManager(); 566 LauncherSecurityManager newSecurityManager= new LauncherSecurityManager(); 567 System.setSecurityManager(newSecurityManager); 568 ByteArrayOutputStream data = new ByteArrayOutputStream(); 569 String[] args = {}; 570 System.setErr(new PrintStream(data)); 571 try { 572 System.setErr(new PrintStream(data)); 573 Import.main(args); 574 fail("should be SecurityException"); 575 } catch (SecurityException e) { 576 assertEquals(-1, newSecurityManager.getExitCode()); 577 assertTrue(data.toString().contains("Wrong number of arguments:")); 578 assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output")); 579 assertTrue(data.toString().contains("-Dimport.filter.class=<name of filter class>")); 580 assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output")); 581 assertTrue(data.toString().contains("-Dmapreduce.reduce.speculative=false")); 582 } finally { 583 System.setErr(oldPrintStream); 584 System.setSecurityManager(SECURITY_MANAGER); 585 } 586 } 587 588 @Test 589 public void testExportScan() throws Exception { 590 int version = 100; 591 long startTime = System.currentTimeMillis(); 592 long endTime = startTime + 1; 593 String prefix = "row"; 594 String label_0 = "label_0"; 595 String label_1 = "label_1"; 596 String[] args = { 597 "table", 598 "outputDir", 599 String.valueOf(version), 600 String.valueOf(startTime), 601 String.valueOf(endTime), 602 prefix 603 }; 604 Scan scan = ExportUtils.getScanFromCommandLine(UTIL.getConfiguration(), args); 605 assertEquals(version, scan.getMaxVersions()); 606 assertEquals(startTime, scan.getTimeRange().getMin()); 607 assertEquals(endTime, scan.getTimeRange().getMax()); 608 assertEquals(true, (scan.getFilter() instanceof PrefixFilter)); 609 assertEquals(0, Bytes.compareTo(((PrefixFilter) scan.getFilter()).getPrefix(), Bytes.toBytesBinary(prefix))); 610 String[] argsWithLabels = { 611 "-D " + ExportUtils.EXPORT_VISIBILITY_LABELS + "=" + label_0 + "," + label_1, 612 "table", 613 "outputDir", 614 String.valueOf(version), 615 String.valueOf(startTime), 616 String.valueOf(endTime), 617 prefix 618 }; 619 Configuration conf = new Configuration(UTIL.getConfiguration()); 620 // parse the "-D" options 621 String[] otherArgs = new GenericOptionsParser(conf, argsWithLabels).getRemainingArgs(); 622 Scan scanWithLabels = ExportUtils.getScanFromCommandLine(conf, otherArgs); 623 assertEquals(version, scanWithLabels.getMaxVersions()); 624 assertEquals(startTime, scanWithLabels.getTimeRange().getMin()); 625 assertEquals(endTime, scanWithLabels.getTimeRange().getMax()); 626 assertEquals(true, (scanWithLabels.getFilter() instanceof PrefixFilter)); 627 assertEquals(0, Bytes.compareTo(((PrefixFilter) scanWithLabels.getFilter()).getPrefix(), Bytes.toBytesBinary(prefix))); 628 assertEquals(2, scanWithLabels.getAuthorizations().getLabels().size()); 629 assertEquals(label_0, scanWithLabels.getAuthorizations().getLabels().get(0)); 630 assertEquals(label_1, scanWithLabels.getAuthorizations().getLabels().get(1)); 631 } 632 633 /** 634 * test main method. Export should print help and call System.exit 635 */ 636 @Test 637 public void testExportMain() throws Throwable { 638 PrintStream oldPrintStream = System.err; 639 SecurityManager SECURITY_MANAGER = System.getSecurityManager(); 640 LauncherSecurityManager newSecurityManager= new LauncherSecurityManager(); 641 System.setSecurityManager(newSecurityManager); 642 ByteArrayOutputStream data = new ByteArrayOutputStream(); 643 String[] args = {}; 644 System.setErr(new PrintStream(data)); 645 try { 646 System.setErr(new PrintStream(data)); 647 runExportMain(args); 648 fail("should be SecurityException"); 649 } catch (SecurityException e) { 650 assertEquals(-1, newSecurityManager.getExitCode()); 651 String errMsg = data.toString(); 652 assertTrue(errMsg.contains("Wrong number of arguments:")); 653 assertTrue(errMsg.contains( 654 "Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " + 655 "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]")); 656 assertTrue( 657 errMsg.contains("-D hbase.mapreduce.scan.column.family=<family1>,<family2>, ...")); 658 assertTrue(errMsg.contains("-D hbase.mapreduce.include.deleted.rows=true")); 659 assertTrue(errMsg.contains("-D hbase.client.scanner.caching=100")); 660 assertTrue(errMsg.contains("-D hbase.export.scanner.batch=10")); 661 assertTrue(errMsg.contains("-D hbase.export.scanner.caching=100")); 662 } finally { 663 System.setErr(oldPrintStream); 664 System.setSecurityManager(SECURITY_MANAGER); 665 } 666 } 667 668 /** 669 * Test map method of Importer 670 */ 671 @SuppressWarnings({ "unchecked", "rawtypes" }) 672 @Test 673 public void testKeyValueImporter() throws Throwable { 674 CellImporter importer = new CellImporter(); 675 Configuration configuration = new Configuration(); 676 Context ctx = mock(Context.class); 677 when(ctx.getConfiguration()).thenReturn(configuration); 678 679 doAnswer(new Answer<Void>() { 680 681 @Override 682 public Void answer(InvocationOnMock invocation) throws Throwable { 683 ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0]; 684 MapReduceExtendedCell key = (MapReduceExtendedCell) invocation.getArguments()[1]; 685 assertEquals("Key", Bytes.toString(writer.get())); 686 assertEquals("row", Bytes.toString(CellUtil.cloneRow(key))); 687 return null; 688 } 689 }).when(ctx).write(any(ImmutableBytesWritable.class), any(MapReduceExtendedCell.class)); 690 691 importer.setup(ctx); 692 Result value = mock(Result.class); 693 KeyValue[] keys = { 694 new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"), 695 Bytes.toBytes("value")), 696 new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"), 697 Bytes.toBytes("value1")) }; 698 when(value.rawCells()).thenReturn(keys); 699 importer.map(new ImmutableBytesWritable(Bytes.toBytes("Key")), value, ctx); 700 701 } 702 703 /** 704 * Test addFilterAndArguments method of Import This method set couple 705 * parameters into Configuration 706 */ 707 @Test 708 public void testAddFilterAndArguments() throws IOException { 709 Configuration configuration = new Configuration(); 710 711 List<String> args = new ArrayList<>(); 712 args.add("param1"); 713 args.add("param2"); 714 715 Import.addFilterAndArguments(configuration, FilterBase.class, args); 716 assertEquals("org.apache.hadoop.hbase.filter.FilterBase", 717 configuration.get(Import.FILTER_CLASS_CONF_KEY)); 718 assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY)); 719 } 720 721 @Test 722 public void testDurability() throws Throwable { 723 // Create an export table. 724 String exportTableName = name.getMethodName() + "export"; 725 try (Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3);) { 726 727 // Insert some data 728 Put put = new Put(ROW1); 729 put.addColumn(FAMILYA, QUAL, now, QUAL); 730 put.addColumn(FAMILYA, QUAL, now + 1, QUAL); 731 put.addColumn(FAMILYA, QUAL, now + 2, QUAL); 732 exportTable.put(put); 733 734 put = new Put(ROW2); 735 put.addColumn(FAMILYA, QUAL, now, QUAL); 736 put.addColumn(FAMILYA, QUAL, now + 1, QUAL); 737 put.addColumn(FAMILYA, QUAL, now + 2, QUAL); 738 exportTable.put(put); 739 740 // Run the export 741 String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000"}; 742 assertTrue(runExport(args)); 743 744 // Create the table for import 745 String importTableName = name.getMethodName() + "import1"; 746 Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3); 747 748 // Register the wal listener for the import table 749 RegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer() 750 .getRegions(importTable.getName()).get(0).getRegionInfo(); 751 TableWALActionListener walListener = new TableWALActionListener(region); 752 WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region); 753 wal.registerWALActionsListener(walListener); 754 755 // Run the import with SKIP_WAL 756 args = 757 new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(), 758 importTableName, FQ_OUTPUT_DIR }; 759 assertTrue(runImport(args)); 760 //Assert that the wal is not visisted 761 assertTrue(!walListener.isWALVisited()); 762 //Ensure that the count is 2 (only one version of key value is obtained) 763 assertTrue(getCount(importTable, null) == 2); 764 765 // Run the import with the default durability option 766 importTableName = name.getMethodName() + "import2"; 767 importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3); 768 region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer() 769 .getRegions(importTable.getName()).get(0).getRegionInfo(); 770 wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region); 771 walListener = new TableWALActionListener(region); 772 wal.registerWALActionsListener(walListener); 773 args = new String[] { importTableName, FQ_OUTPUT_DIR }; 774 assertTrue(runImport(args)); 775 //Assert that the wal is visisted 776 assertTrue(walListener.isWALVisited()); 777 //Ensure that the count is 2 (only one version of key value is obtained) 778 assertTrue(getCount(importTable, null) == 2); 779 } 780 } 781 782 /** 783 * This listens to the {@link #visitLogEntryBeforeWrite(RegionInfo, WALKey, WALEdit)} to 784 * identify that an entry is written to the Write Ahead Log for the given table. 785 */ 786 private static class TableWALActionListener implements WALActionsListener { 787 788 private RegionInfo regionInfo; 789 private boolean isVisited = false; 790 791 public TableWALActionListener(RegionInfo region) { 792 this.regionInfo = region; 793 } 794 795 @Override 796 public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) { 797 if (logKey.getTableName().getNameAsString().equalsIgnoreCase( 798 this.regionInfo.getTable().getNameAsString()) && (!logEdit.isMetaEdit())) { 799 isVisited = true; 800 } 801 } 802 803 public boolean isWALVisited() { 804 return isVisited; 805 } 806 } 807}