001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.apache.hadoop.hbase.HConstants.RPC_CODEC_CONF_KEY; 021import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_CODEC_CLASS; 022import static org.junit.jupiter.api.Assertions.assertEquals; 023import static org.junit.jupiter.api.Assertions.assertFalse; 024import static org.junit.jupiter.api.Assertions.assertTrue; 025import static org.junit.jupiter.api.Assertions.fail; 026import static org.mockito.ArgumentMatchers.any; 027import static org.mockito.Mockito.doAnswer; 028import static org.mockito.Mockito.mock; 029import static org.mockito.Mockito.when; 030 031import java.io.ByteArrayOutputStream; 032import java.io.File; 033import java.io.IOException; 034import java.io.PrintStream; 035import java.net.URL; 036import java.util.ArrayList; 037import java.util.Arrays; 038import java.util.List; 039import java.util.Optional; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.fs.FileSystem; 042import org.apache.hadoop.fs.Path; 043import org.apache.hadoop.hbase.ArrayBackedTag; 044import org.apache.hadoop.hbase.Cell; 045import 
org.apache.hadoop.hbase.CellUtil; 046import org.apache.hadoop.hbase.ExtendedCell; 047import org.apache.hadoop.hbase.ExtendedCellScanner; 048import org.apache.hadoop.hbase.HBaseTestingUtil; 049import org.apache.hadoop.hbase.HConstants; 050import org.apache.hadoop.hbase.KeepDeletedCells; 051import org.apache.hadoop.hbase.KeyValue; 052import org.apache.hadoop.hbase.PrivateCellUtil; 053import org.apache.hadoop.hbase.TableName; 054import org.apache.hadoop.hbase.Tag; 055import org.apache.hadoop.hbase.client.ClientInternalHelper; 056import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 057import org.apache.hadoop.hbase.client.Connection; 058import org.apache.hadoop.hbase.client.ConnectionFactory; 059import org.apache.hadoop.hbase.client.Delete; 060import org.apache.hadoop.hbase.client.Durability; 061import org.apache.hadoop.hbase.client.Get; 062import org.apache.hadoop.hbase.client.Mutation; 063import org.apache.hadoop.hbase.client.Put; 064import org.apache.hadoop.hbase.client.RegionInfo; 065import org.apache.hadoop.hbase.client.Result; 066import org.apache.hadoop.hbase.client.ResultScanner; 067import org.apache.hadoop.hbase.client.Scan; 068import org.apache.hadoop.hbase.client.Table; 069import org.apache.hadoop.hbase.client.TableDescriptor; 070import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 071import org.apache.hadoop.hbase.coprocessor.ObserverContext; 072import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 073import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 074import org.apache.hadoop.hbase.coprocessor.RegionObserver; 075import org.apache.hadoop.hbase.filter.Filter; 076import org.apache.hadoop.hbase.filter.FilterBase; 077import org.apache.hadoop.hbase.filter.PrefixFilter; 078import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 079import org.apache.hadoop.hbase.mapreduce.Import.CellImporter; 080import org.apache.hadoop.hbase.regionserver.HRegion; 081import 
org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; 082import org.apache.hadoop.hbase.regionserver.RegionScanner; 083import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 084import org.apache.hadoop.hbase.util.Bytes; 085import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 086import org.apache.hadoop.hbase.util.LauncherSecurityManager; 087import org.apache.hadoop.hbase.util.MapReduceExtendedCell; 088import org.apache.hadoop.hbase.wal.WAL; 089import org.apache.hadoop.hbase.wal.WALEdit; 090import org.apache.hadoop.hbase.wal.WALKey; 091import org.apache.hadoop.mapreduce.Mapper.Context; 092import org.apache.hadoop.util.GenericOptionsParser; 093import org.apache.hadoop.util.ToolRunner; 094import org.junit.jupiter.api.AfterEach; 095import org.junit.jupiter.api.BeforeEach; 096import org.junit.jupiter.api.Test; 097import org.junit.jupiter.api.TestInfo; 098import org.mockito.invocation.InvocationOnMock; 099import org.mockito.stubbing.Answer; 100import org.slf4j.Logger; 101import org.slf4j.LoggerFactory; 102 103/** 104 * Base class for testing Import/Export. Shared logic without @BeforeAll/@AfterAll to allow 105 * subclasses to manage their own lifecycle. 
 */
public class TestImportExportBase {

  private static final Logger LOG = LoggerFactory.getLogger(TestImportExportBase.class);
  protected static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  // Rows deliberately start with a non-printable-ish binary prefix (\x32) to exercise
  // Bytes.toBytesBinary handling in scan start/stop row arguments.
  private static final byte[] ROW1 = Bytes.toBytesBinary("\\x32row1");
  private static final byte[] ROW2 = Bytes.toBytesBinary("\\x32row2");
  private static final byte[] ROW3 = Bytes.toBytesBinary("\\x32row3");
  private static final String FAMILYA_STRING = "a";
  private static final String FAMILYB_STRING = "b";
  private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING);
  private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING);
  private static final byte[] QUAL = Bytes.toBytes("q");
  protected static final String OUTPUT_DIR = "outputdir";
  // Fully-qualified output dir; initialized in setUpBeforeClass once the mini cluster FS is up.
  protected static String FQ_OUTPUT_DIR;
  private static final String EXPORT_BATCH_SIZE = "100";

  // Fixed "now" so per-version timestamps (now, now+1, ...) are stable within a test run.
  private static final long now = EnvironmentEdgeManager.currentTime();
  protected final TableName EXPORT_TABLE = TableName.valueOf("export_table");
  protected final TableName IMPORT_TABLE = TableName.valueOf("import_table");
  public static final byte TEST_TAG_TYPE = (byte) (Tag.CUSTOM_TAG_TYPE_RANGE + 1);
  public static final String TEST_ATTR = "source_op";
  public static final String TEST_TAG = "test_tag";

  // Current test method name; doubles as the table name in most tests.
  protected String name;

  /** Captures the running test's method name and logs it. */
  @BeforeEach
  public void announce(TestInfo testInfo) {
    name = testInfo.getTestMethod().get().getName();
    LOG.info("Running {}", name);
  }

  /** Removes the export output dir and drops the shared export/import tables if present. */
  @AfterEach
  public void cleanup() throws Throwable {
    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
    fs.delete(new Path(OUTPUT_DIR), true);
    if (UTIL.getAdmin().tableExists(EXPORT_TABLE)) {
      UTIL.deleteTable(EXPORT_TABLE);
    }
    if (UTIL.getAdmin().tableExists(IMPORT_TABLE)) {
      UTIL.deleteTable(IMPORT_TABLE);
    }
  }

  /** Starts the mini cluster; called by subclasses from their own @BeforeAll. */
  protected static void setUpBeforeClass() throws Exception {
    // Up the handlers; this test needs more than usual.
    UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
    UTIL.startMiniCluster();
    FQ_OUTPUT_DIR =
      new Path(OUTPUT_DIR).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString();
  }

  /**
   * Runs an export job with the specified command line args
   * @return true if job completed successfully
   */
  protected boolean runExport(String[] args) throws Throwable {
    // need to make a copy of the configuration because to make sure different temp dirs are used.
    int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Export(), args);
    return status == 0;
  }

  /** Invokes Export.main directly (exercises the System.exit path under the security manager). */
  protected void runExportMain(String[] args) throws Throwable {
    Export.main(args);
  }

  /**
   * Runs an import job with the specified command line args
   * @return true if job completed successfully
   */
  boolean runImport(String[] args) throws Throwable {
    // need to make a copy of the configuration because to make sure different temp dirs are used.
    int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args);
    return status == 0;
  }

  /**
   * Test simple replication case with column mapping
   */
  @Test
  public void testSimpleCase() throws Throwable {
    try (Table t = UTIL.createTable(TableName.valueOf(name), FAMILYA, 3)) {
      Put p = new Put(ROW1);
      p.addColumn(FAMILYA, QUAL, now, QUAL);
      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
      t.put(p);
      p = new Put(ROW2);
      p.addColumn(FAMILYA, QUAL, now, QUAL);
      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
      t.put(p);
      p = new Put(ROW3);
      p.addColumn(FAMILYA, QUAL, now, QUAL);
      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
      t.put(p);
    }

    String[] args = new String[] {
      // Only export row1 & row2.
      "-D" + TableInputFormat.SCAN_ROW_START + "=\\x32row1",
      "-D" + TableInputFormat.SCAN_ROW_STOP + "=\\x32row3", name, FQ_OUTPUT_DIR, "1000", // max
      // number of key versions per key to export
    };
    assertTrue(runExport(args));

    final String IMPORT_TABLE = name + "import";
    try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), FAMILYB, 3)) {
      args =
        new String[] { "-D" + Import.CF_RENAME_PROP + "=" + FAMILYA_STRING + ":" + FAMILYB_STRING,
          IMPORT_TABLE, FQ_OUTPUT_DIR };
      assertTrue(runImport(args));

      // row1 and row2 were inside the scan range and must carry all 3 versions.
      Get g = new Get(ROW1);
      g.readAllVersions();
      Result r = t.get(g);
      assertEquals(3, r.size());
      g = new Get(ROW2);
      g.readAllVersions();
      r = t.get(g);
      assertEquals(3, r.size());
      // row3 equals the (exclusive) stop row and must not have been exported.
      g = new Get(ROW3);
      r = t.get(g);
      assertEquals(0, r.size());
    }
  }

  /**
   * Test export hbase:meta table
   */
  @Test
  public void testMetaExport() throws Throwable {
    String[] args =
      new String[] {
TableName.META_TABLE_NAME.getNameAsString(), FQ_OUTPUT_DIR, "1", "0", "0" }; 246 assertTrue(runExport(args)); 247 } 248 249 /** 250 * Test import data from 0.94 exported file 251 */ 252 @Test 253 public void testImport94Table() throws Throwable { 254 final String name = "exportedTableIn94Format"; 255 URL url = TestImportExportBase.class.getResource(name); 256 File f = new File(url.toURI()); 257 if (!f.exists()) { 258 LOG.warn("FAILED TO FIND " + f + "; skipping out on test"); 259 return; 260 } 261 assertTrue(f.exists()); 262 LOG.info("FILE=" + f); 263 Path importPath = new Path(f.toURI()); 264 FileSystem fs = FileSystem.get(UTIL.getConfiguration()); 265 fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR + name)); 266 String IMPORT_TABLE = name; 267 try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), Bytes.toBytes("f1"), 3)) { 268 String[] args = new String[] { "-Dhbase.import.version=0.94", IMPORT_TABLE, FQ_OUTPUT_DIR }; 269 assertTrue(runImport(args)); 270 // @formatter:off 271 // exportedTableIn94Format contains 5 rows 272 // ROW COLUMN+CELL 273 // r1 column=f1:c1, timestamp=1383766761171, value=val1 274 // r2 column=f1:c1, timestamp=1383766771642, value=val2 275 // r3 column=f1:c1, timestamp=1383766777615, value=val3 276 // r4 column=f1:c1, timestamp=1383766785146, value=val4 277 // r5 column=f1:c1, timestamp=1383766791506, value=val5 278 // @formatter:on 279 assertEquals(5, UTIL.countRows(t)); 280 } 281 } 282 283 /** 284 * Test export scanner batching 285 */ 286 @Test 287 public void testExportScannerBatching() throws Throwable { 288 TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(name)) 289 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(1).build()) 290 .build(); 291 UTIL.getAdmin().createTable(desc); 292 try (Table t = UTIL.getConnection().getTable(desc.getTableName())) { 293 Put p = new Put(ROW1); 294 p.addColumn(FAMILYA, QUAL, now, QUAL); 295 p.addColumn(FAMILYA, 
QUAL, now + 1, QUAL); 296 p.addColumn(FAMILYA, QUAL, now + 2, QUAL); 297 p.addColumn(FAMILYA, QUAL, now + 3, QUAL); 298 p.addColumn(FAMILYA, QUAL, now + 4, QUAL); 299 t.put(p); 300 // added scanner batching arg. 301 String[] args = new String[] { "-D" + ExportUtils.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE, 302 name, FQ_OUTPUT_DIR }; 303 assertTrue(runExport(args)); 304 305 FileSystem fs = FileSystem.get(UTIL.getConfiguration()); 306 fs.delete(new Path(FQ_OUTPUT_DIR), true); 307 } 308 } 309 310 @Test 311 public void testWithDeletes() throws Throwable { 312 TableDescriptor desc = TableDescriptorBuilder 313 .newBuilder(TableName.valueOf(name)).setColumnFamily(ColumnFamilyDescriptorBuilder 314 .newBuilder(FAMILYA).setMaxVersions(5).setKeepDeletedCells(KeepDeletedCells.TRUE).build()) 315 .build(); 316 UTIL.getAdmin().createTable(desc); 317 try (Table t = UTIL.getConnection().getTable(desc.getTableName())) { 318 Put p = new Put(ROW1); 319 p.addColumn(FAMILYA, QUAL, now, QUAL); 320 p.addColumn(FAMILYA, QUAL, now + 1, QUAL); 321 p.addColumn(FAMILYA, QUAL, now + 2, QUAL); 322 p.addColumn(FAMILYA, QUAL, now + 3, QUAL); 323 p.addColumn(FAMILYA, QUAL, now + 4, QUAL); 324 t.put(p); 325 326 Delete d = new Delete(ROW1, now + 3); 327 t.delete(d); 328 d = new Delete(ROW1); 329 d.addColumns(FAMILYA, QUAL, now + 2); 330 t.delete(d); 331 } 332 333 String[] args = 334 new String[] { "-D" + ExportUtils.RAW_SCAN + "=true", name, FQ_OUTPUT_DIR, "1000", // max 335 // number 336 // of key 337 // versions 338 // per key 339 // to 340 // export 341 }; 342 assertTrue(runExport(args)); 343 344 final String IMPORT_TABLE = name + "import"; 345 desc = TableDescriptorBuilder 346 .newBuilder(TableName.valueOf(IMPORT_TABLE)).setColumnFamily(ColumnFamilyDescriptorBuilder 347 .newBuilder(FAMILYA).setMaxVersions(5).setKeepDeletedCells(KeepDeletedCells.TRUE).build()) 348 .build(); 349 UTIL.getAdmin().createTable(desc); 350 try (Table t = UTIL.getConnection().getTable(desc.getTableName())) { 351 args = 
new String[] { IMPORT_TABLE, FQ_OUTPUT_DIR }; 352 assertTrue(runImport(args)); 353 354 Scan s = new Scan(); 355 s.readAllVersions(); 356 s.setRaw(true); 357 ResultScanner scanner = t.getScanner(s); 358 Result r = scanner.next(); 359 ExtendedCell[] res = ClientInternalHelper.getExtendedRawCells(r); 360 assertTrue(PrivateCellUtil.isDeleteFamily(res[0])); 361 assertEquals(now + 4, res[1].getTimestamp()); 362 assertEquals(now + 3, res[2].getTimestamp()); 363 assertTrue(CellUtil.isDelete(res[3])); 364 assertEquals(now + 2, res[4].getTimestamp()); 365 assertEquals(now + 1, res[5].getTimestamp()); 366 assertEquals(now, res[6].getTimestamp()); 367 } 368 } 369 370 @Test 371 public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Throwable { 372 final TableName exportTable = TableName.valueOf(name); 373 TableDescriptor desc = TableDescriptorBuilder 374 .newBuilder(TableName.valueOf(name)).setColumnFamily(ColumnFamilyDescriptorBuilder 375 .newBuilder(FAMILYA).setMaxVersions(5).setKeepDeletedCells(KeepDeletedCells.TRUE).build()) 376 .build(); 377 UTIL.getAdmin().createTable(desc); 378 379 Table exportT = UTIL.getConnection().getTable(exportTable); 380 381 // Add first version of QUAL 382 Put p = new Put(ROW1); 383 p.addColumn(FAMILYA, QUAL, now, QUAL); 384 exportT.put(p); 385 386 // Add Delete family marker 387 Delete d = new Delete(ROW1, now + 3); 388 exportT.delete(d); 389 390 // Add second version of QUAL 391 p = new Put(ROW1); 392 p.addColumn(FAMILYA, QUAL, now + 5, Bytes.toBytes("s")); 393 exportT.put(p); 394 395 // Add second Delete family marker 396 d = new Delete(ROW1, now + 7); 397 exportT.delete(d); 398 399 String[] args = new String[] { "-D" + ExportUtils.RAW_SCAN + "=true", 400 exportTable.getNameAsString(), FQ_OUTPUT_DIR, "1000", // max number of key versions per key to 401 // export 402 }; 403 assertTrue(runExport(args)); 404 405 final String importTable = name + "import"; 406 desc = TableDescriptorBuilder 407 
.newBuilder(TableName.valueOf(importTable)).setColumnFamily(ColumnFamilyDescriptorBuilder 408 .newBuilder(FAMILYA).setMaxVersions(5).setKeepDeletedCells(KeepDeletedCells.TRUE).build()) 409 .build(); 410 UTIL.getAdmin().createTable(desc); 411 412 Table importT = UTIL.getConnection().getTable(TableName.valueOf(importTable)); 413 args = new String[] { importTable, FQ_OUTPUT_DIR }; 414 assertTrue(runImport(args)); 415 416 Scan s = new Scan(); 417 s.readAllVersions(); 418 s.setRaw(true); 419 420 ResultScanner importedTScanner = importT.getScanner(s); 421 Result importedTResult = importedTScanner.next(); 422 423 ResultScanner exportedTScanner = exportT.getScanner(s); 424 Result exportedTResult = exportedTScanner.next(); 425 try { 426 Result.compareResults(exportedTResult, importedTResult); 427 } catch (Throwable e) { 428 fail("Original and imported tables data comparision failed with error:" + e.getMessage()); 429 } finally { 430 exportT.close(); 431 importT.close(); 432 } 433 } 434 435 /** 436 * Create a simple table, run an Export Job on it, Import with filtering on, verify counts, 437 * attempt with invalid values. 438 */ 439 @Test 440 public void testWithFilter() throws Throwable { 441 // Create simple table to export 442 TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(name)) 443 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build()) 444 .build(); 445 UTIL.getAdmin().createTable(desc); 446 Table exportTable = UTIL.getConnection().getTable(desc.getTableName()); 447 448 Put p1 = new Put(ROW1); 449 p1.addColumn(FAMILYA, QUAL, now, QUAL); 450 p1.addColumn(FAMILYA, QUAL, now + 1, QUAL); 451 p1.addColumn(FAMILYA, QUAL, now + 2, QUAL); 452 p1.addColumn(FAMILYA, QUAL, now + 3, QUAL); 453 p1.addColumn(FAMILYA, QUAL, now + 4, QUAL); 454 455 // Having another row would actually test the filter. 
456 Put p2 = new Put(ROW2); 457 p2.addColumn(FAMILYA, QUAL, now, QUAL); 458 459 exportTable.put(Arrays.asList(p1, p2)); 460 461 // Export the simple table 462 String[] args = new String[] { name, FQ_OUTPUT_DIR, "1000" }; 463 assertTrue(runExport(args)); 464 465 // Import to a new table 466 final String IMPORT_TABLE = name + "import"; 467 desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(IMPORT_TABLE)) 468 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build()) 469 .build(); 470 UTIL.getAdmin().createTable(desc); 471 472 Table importTable = UTIL.getConnection().getTable(desc.getTableName()); 473 args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(), 474 "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE, FQ_OUTPUT_DIR, 475 "1000" }; 476 assertTrue(runImport(args)); 477 478 // get the count of the source table for that time range 479 PrefixFilter filter = new PrefixFilter(ROW1); 480 int count = getCount(exportTable, filter); 481 482 assertEquals(count, getCount(importTable, null), 483 "Unexpected row count between export and import tables"); 484 485 // and then test that a broken command doesn't bork everything - easier here because we don't 486 // need to re-run the export job 487 488 args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(), 489 "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", name, FQ_OUTPUT_DIR, 490 "1000" }; 491 assertFalse(runImport(args)); 492 493 // cleanup 494 exportTable.close(); 495 importTable.close(); 496 } 497 498 /** 499 * Create a simple table, run an Export Job on it, Import with bulk output and enable largeResult 500 */ 501 @Test 502 public void testBulkImportAndLargeResult() throws Throwable { 503 // Create simple table to export 504 TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(name)) 505 
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build()) 506 .build(); 507 UTIL.getAdmin().createTable(desc); 508 Table exportTable = UTIL.getConnection().getTable(desc.getTableName()); 509 510 Put p1 = new Put(ROW1); 511 p1.addColumn(FAMILYA, QUAL, now, QUAL); 512 513 // Having another row would actually test the filter. 514 Put p2 = new Put(ROW2); 515 p2.addColumn(FAMILYA, QUAL, now, QUAL); 516 517 exportTable.put(Arrays.asList(p1, p2)); 518 519 // Export the simple table 520 String[] args = new String[] { name, FQ_OUTPUT_DIR, "1000" }; 521 assertTrue(runExport(args)); 522 523 // Import to a new table 524 final String IMPORT_TABLE = name + "import"; 525 desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(IMPORT_TABLE)) 526 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build()) 527 .build(); 528 UTIL.getAdmin().createTable(desc); 529 530 String O_OUTPUT_DIR = 531 new Path(OUTPUT_DIR + 1).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString(); 532 533 args = new String[] { "-D" + Import.BULK_OUTPUT_CONF_KEY + "=" + O_OUTPUT_DIR, 534 "-D" + Import.HAS_LARGE_RESULT + "=" + true, IMPORT_TABLE, FQ_OUTPUT_DIR, "1000" }; 535 assertTrue(runImport(args)); 536 } 537 538 /** 539 * Count the number of keyvalues in the specified table with the given filter 540 * @param table the table to scan 541 * @return the number of keyvalues found 542 */ 543 private int getCount(Table table, Filter filter) throws IOException { 544 Scan scan = new Scan(); 545 scan.setFilter(filter); 546 ResultScanner results = table.getScanner(scan); 547 int count = 0; 548 for (Result res : results) { 549 count += res.size(); 550 } 551 results.close(); 552 return count; 553 } 554 555 /** 556 * test main method. 
Import should print help and call System.exit 557 */ 558 @Test 559 public void testImportMain() throws Throwable { 560 PrintStream oldPrintStream = System.err; 561 SecurityManager SECURITY_MANAGER = System.getSecurityManager(); 562 LauncherSecurityManager newSecurityManager = new LauncherSecurityManager(); 563 System.setSecurityManager(newSecurityManager); 564 ByteArrayOutputStream data = new ByteArrayOutputStream(); 565 String[] args = {}; 566 System.setErr(new PrintStream(data)); 567 try { 568 System.setErr(new PrintStream(data)); 569 Import.main(args); 570 fail("should be SecurityException"); 571 } catch (SecurityException e) { 572 assertEquals(-1, newSecurityManager.getExitCode()); 573 assertTrue(data.toString().contains("Wrong number of arguments:")); 574 assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output")); 575 assertTrue(data.toString().contains("-Dimport.filter.class=<name of filter class>")); 576 assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output")); 577 assertTrue(data.toString().contains("-Dmapreduce.reduce.speculative=false")); 578 } finally { 579 System.setErr(oldPrintStream); 580 System.setSecurityManager(SECURITY_MANAGER); 581 } 582 } 583 584 @Test 585 public void testExportScan() throws Exception { 586 int version = 100; 587 long startTime = EnvironmentEdgeManager.currentTime(); 588 long endTime = startTime + 1; 589 String prefix = "row"; 590 String label_0 = "label_0"; 591 String label_1 = "label_1"; 592 String[] args = { "table", "outputDir", String.valueOf(version), String.valueOf(startTime), 593 String.valueOf(endTime), prefix }; 594 Scan scan = ExportUtils.getScanFromCommandLine(UTIL.getConfiguration(), args); 595 assertEquals(version, scan.getMaxVersions()); 596 assertEquals(startTime, scan.getTimeRange().getMin()); 597 assertEquals(endTime, scan.getTimeRange().getMax()); 598 assertEquals(true, (scan.getFilter() instanceof PrefixFilter)); 599 assertEquals(0, 600 Bytes.compareTo(((PrefixFilter) 
scan.getFilter()).getPrefix(), Bytes.toBytesBinary(prefix))); 601 String[] argsWithLabels = 602 { "-D " + ExportUtils.EXPORT_VISIBILITY_LABELS + "=" + label_0 + "," + label_1, "table", 603 "outputDir", String.valueOf(version), String.valueOf(startTime), String.valueOf(endTime), 604 prefix }; 605 Configuration conf = new Configuration(UTIL.getConfiguration()); 606 // parse the "-D" options 607 String[] otherArgs = new GenericOptionsParser(conf, argsWithLabels).getRemainingArgs(); 608 Scan scanWithLabels = ExportUtils.getScanFromCommandLine(conf, otherArgs); 609 assertEquals(version, scanWithLabels.getMaxVersions()); 610 assertEquals(startTime, scanWithLabels.getTimeRange().getMin()); 611 assertEquals(endTime, scanWithLabels.getTimeRange().getMax()); 612 assertEquals(true, (scanWithLabels.getFilter() instanceof PrefixFilter)); 613 assertEquals(0, Bytes.compareTo(((PrefixFilter) scanWithLabels.getFilter()).getPrefix(), 614 Bytes.toBytesBinary(prefix))); 615 assertEquals(2, scanWithLabels.getAuthorizations().getLabels().size()); 616 assertEquals(label_0, scanWithLabels.getAuthorizations().getLabels().get(0)); 617 assertEquals(label_1, scanWithLabels.getAuthorizations().getLabels().get(1)); 618 } 619 620 /** 621 * test main method. 
Export should print help and call System.exit 622 */ 623 @Test 624 public void testExportMain() throws Throwable { 625 PrintStream oldPrintStream = System.err; 626 SecurityManager SECURITY_MANAGER = System.getSecurityManager(); 627 LauncherSecurityManager newSecurityManager = new LauncherSecurityManager(); 628 System.setSecurityManager(newSecurityManager); 629 ByteArrayOutputStream data = new ByteArrayOutputStream(); 630 String[] args = {}; 631 System.setErr(new PrintStream(data)); 632 try { 633 System.setErr(new PrintStream(data)); 634 runExportMain(args); 635 fail("should be SecurityException"); 636 } catch (SecurityException e) { 637 assertEquals(-1, newSecurityManager.getExitCode()); 638 String errMsg = data.toString(); 639 assertTrue(errMsg.contains("Wrong number of arguments:")); 640 assertTrue( 641 errMsg.contains("Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " 642 + "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]")); 643 assertTrue(errMsg.contains("-D hbase.mapreduce.scan.column.family=<family1>,<family2>, ...")); 644 assertTrue(errMsg.contains("-D hbase.mapreduce.include.deleted.rows=true")); 645 assertTrue(errMsg.contains("-D hbase.client.scanner.caching=100")); 646 assertTrue(errMsg.contains("-D hbase.export.scanner.batch=10")); 647 assertTrue(errMsg.contains("-D hbase.export.scanner.caching=100")); 648 } finally { 649 System.setErr(oldPrintStream); 650 System.setSecurityManager(SECURITY_MANAGER); 651 } 652 } 653 654 /** 655 * Test map method of Importer 656 */ 657 @SuppressWarnings({ "unchecked", "rawtypes" }) 658 @Test 659 public void testKeyValueImporter() throws Throwable { 660 CellImporter importer = new CellImporter(); 661 Configuration configuration = new Configuration(); 662 Context ctx = mock(Context.class); 663 when(ctx.getConfiguration()).thenReturn(configuration); 664 665 doAnswer(new Answer<Void>() { 666 667 @Override 668 public Void answer(InvocationOnMock invocation) throws Throwable { 669 
ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArgument(0); 670 MapReduceExtendedCell key = (MapReduceExtendedCell) invocation.getArgument(1); 671 assertEquals("Key", Bytes.toString(writer.get())); 672 assertEquals("row", Bytes.toString(CellUtil.cloneRow(key))); 673 return null; 674 } 675 }).when(ctx).write(any(), any()); 676 677 importer.setup(ctx); 678 KeyValue[] keys = { 679 new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"), 680 Bytes.toBytes("value")), 681 new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"), 682 Bytes.toBytes("value1")) }; 683 Result value = Result.create(keys); 684 importer.map(new ImmutableBytesWritable(Bytes.toBytes("Key")), value, ctx); 685 686 } 687 688 /** 689 * Test addFilterAndArguments method of Import This method set couple parameters into 690 * Configuration 691 */ 692 @Test 693 public void testAddFilterAndArguments() throws IOException { 694 Configuration configuration = new Configuration(); 695 696 List<String> args = new ArrayList<>(); 697 args.add("param1"); 698 args.add("param2"); 699 700 Import.addFilterAndArguments(configuration, FilterBase.class, args); 701 assertEquals("org.apache.hadoop.hbase.filter.FilterBase", 702 configuration.get(Import.FILTER_CLASS_CONF_KEY)); 703 assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY)); 704 } 705 706 @Test 707 public void testDurability() throws Throwable { 708 // Create an export table. 
709 String exportTableName = name + "export"; 710 try (Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3)) { 711 // Insert some data 712 Put put = new Put(ROW1); 713 put.addColumn(FAMILYA, QUAL, now, QUAL); 714 put.addColumn(FAMILYA, QUAL, now + 1, QUAL); 715 put.addColumn(FAMILYA, QUAL, now + 2, QUAL); 716 exportTable.put(put); 717 718 put = new Put(ROW2); 719 put.addColumn(FAMILYA, QUAL, now, QUAL); 720 put.addColumn(FAMILYA, QUAL, now + 1, QUAL); 721 put.addColumn(FAMILYA, QUAL, now + 2, QUAL); 722 exportTable.put(put); 723 724 // Run the export 725 String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000" }; 726 assertTrue(runExport(args)); 727 728 // Create the table for import 729 String importTableName = name + "import1"; 730 Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3); 731 732 // Register the wal listener for the import table 733 RegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer() 734 .getRegions(importTable.getName()).get(0).getRegionInfo(); 735 TableWALActionListener walListener = new TableWALActionListener(region); 736 WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region); 737 wal.registerWALActionsListener(walListener); 738 739 // Run the import with SKIP_WAL 740 args = new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(), 741 importTableName, FQ_OUTPUT_DIR }; 742 assertTrue(runImport(args)); 743 // Assert that the wal is not visisted 744 assertTrue(!walListener.isWALVisited()); 745 // Ensure that the count is 2 (only one version of key value is obtained) 746 assertTrue(getCount(importTable, null) == 2); 747 748 // Run the import with the default durability option 749 importTableName = name + "import2"; 750 importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3); 751 region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer() 752 
.getRegions(importTable.getName()).get(0).getRegionInfo(); 753 wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region); 754 walListener = new TableWALActionListener(region); 755 wal.registerWALActionsListener(walListener); 756 args = new String[] { importTableName, FQ_OUTPUT_DIR }; 757 assertTrue(runImport(args)); 758 // Assert that the wal is visisted 759 assertTrue(walListener.isWALVisited()); 760 // Ensure that the count is 2 (only one version of key value is obtained) 761 assertTrue(getCount(importTable, null) == 2); 762 } 763 } 764 765 /** 766 * This listens to the {@link #visitLogEntryBeforeWrite(RegionInfo, WALKey, WALEdit)} to identify 767 * that an entry is written to the Write Ahead Log for the given table. 768 */ 769 private static class TableWALActionListener implements WALActionsListener { 770 771 private RegionInfo regionInfo; 772 private boolean isVisited = false; 773 774 public TableWALActionListener(RegionInfo region) { 775 this.regionInfo = region; 776 } 777 778 @Override 779 public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) { 780 if ( 781 logKey.getTableName().getNameAsString() 782 .equalsIgnoreCase(this.regionInfo.getTable().getNameAsString()) && (!logEdit.isMetaEdit()) 783 ) { 784 isVisited = true; 785 } 786 } 787 788 public boolean isWALVisited() { 789 return isVisited; 790 } 791 } 792 793 /** 794 * Add cell tags to delete mutations, run export and import tool and verify that tags are present 795 * in import table also. 796 * @throws Throwable throws Throwable. 
797 */ 798 @Test 799 public void testTagsAddition() throws Throwable { 800 final TableName exportTable = TableName.valueOf(name); 801 TableDescriptor desc = TableDescriptorBuilder.newBuilder(exportTable) 802 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5) 803 .setKeepDeletedCells(KeepDeletedCells.TRUE).build()) 804 .setCoprocessor(MetadataController.class.getName()).build(); 805 UTIL.getAdmin().createTable(desc); 806 807 Table exportT = UTIL.getConnection().getTable(exportTable); 808 809 // Add first version of QUAL 810 Put p = new Put(ROW1); 811 p.addColumn(FAMILYA, QUAL, now, QUAL); 812 exportT.put(p); 813 814 // Add Delete family marker 815 Delete d = new Delete(ROW1, now + 3); 816 // Add test attribute to delete mutation. 817 d.setAttribute(TEST_ATTR, Bytes.toBytes(TEST_TAG)); 818 exportT.delete(d); 819 820 // Run export tool with KeyValueCodecWithTags as Codec. This will ensure that export tool 821 // will use KeyValueCodecWithTags. 822 String[] args = new String[] { "-D" + ExportUtils.RAW_SCAN + "=true", 823 // This will make sure that codec will encode and decode tags in rpc call. 824 "-Dhbase.client.rpc.codec=org.apache.hadoop.hbase.codec.KeyValueCodecWithTags", 825 exportTable.getNameAsString(), FQ_OUTPUT_DIR, "1000", // max number of key versions per key to 826 // export 827 }; 828 assertTrue(runExport(args)); 829 // Assert tag exists in exportTable 830 checkWhetherTagExists(exportTable, true); 831 832 // Create an import table with MetadataController. 833 final TableName importTable = TableName.valueOf("importWithTestTagsAddition"); 834 TableDescriptor importTableDesc = TableDescriptorBuilder.newBuilder(importTable) 835 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5) 836 .setKeepDeletedCells(KeepDeletedCells.TRUE).build()) 837 .setCoprocessor(MetadataController.class.getName()).build(); 838 UTIL.getAdmin().createTable(importTableDesc); 839 840 // Run import tool. 
841 args = new String[] { 842 // This will make sure that codec will encode and decode tags in rpc call. 843 "-Dhbase.client.rpc.codec=org.apache.hadoop.hbase.codec.KeyValueCodecWithTags", 844 importTable.getNameAsString(), FQ_OUTPUT_DIR }; 845 assertTrue(runImport(args)); 846 // Make sure that tags exists in imported table. 847 checkWhetherTagExists(importTable, true); 848 } 849 850 private void checkWhetherTagExists(TableName table, boolean tagExists) throws IOException { 851 List<ExtendedCell> values = new ArrayList<>(); 852 for (HRegion region : UTIL.getHBaseCluster().getRegions(table)) { 853 Scan scan = new Scan(); 854 // Make sure to set rawScan to true so that we will get Delete Markers. 855 scan.setRaw(true); 856 scan.readAllVersions(); 857 scan.withStartRow(ROW1); 858 // Need to use RegionScanner instead of table#getScanner since the latter will 859 // not return tags since it will go through rpc layer and remove tags intentionally. 860 RegionScanner scanner = region.getScanner(scan); 861 scanner.next(values); 862 if (!values.isEmpty()) { 863 break; 864 } 865 } 866 boolean deleteFound = false; 867 for (ExtendedCell cell : values) { 868 if (PrivateCellUtil.isDelete(cell.getType().getCode())) { 869 deleteFound = true; 870 List<Tag> tags = PrivateCellUtil.getTags(cell); 871 // If tagExists flag is true then validate whether tag contents are as expected. 872 if (tagExists) { 873 assertEquals(1, tags.size()); 874 for (Tag tag : tags) { 875 assertEquals(TEST_TAG, Tag.getValueAsString(tag)); 876 } 877 } else { 878 // If tagExists flag is disabled then check for 0 size tags. 879 assertEquals(0, tags.size()); 880 } 881 } 882 } 883 assertTrue(deleteFound); 884 } 885 886 /* 887 * This co-proc will add a cell tag to delete mutation. 
888 */ 889 public static class MetadataController implements RegionCoprocessor, RegionObserver { 890 @Override 891 public Optional<RegionObserver> getRegionObserver() { 892 return Optional.of(this); 893 } 894 895 @Override 896 public void preBatchMutate(ObserverContext<? extends RegionCoprocessorEnvironment> c, 897 MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 898 if (c.getEnvironment().getRegion().getRegionInfo().getTable().isSystemTable()) { 899 return; 900 } 901 for (int i = 0; i < miniBatchOp.size(); i++) { 902 Mutation m = miniBatchOp.getOperation(i); 903 if (!(m instanceof Delete)) { 904 continue; 905 } 906 byte[] sourceOpAttr = m.getAttribute(TEST_ATTR); 907 if (sourceOpAttr == null) { 908 continue; 909 } 910 Tag sourceOpTag = new ArrayBackedTag(TEST_TAG_TYPE, sourceOpAttr); 911 List<Cell> updatedCells = new ArrayList<>(); 912 for (ExtendedCellScanner cellScanner = m.cellScanner(); cellScanner.advance();) { 913 ExtendedCell cell = cellScanner.current(); 914 List<Tag> tags = PrivateCellUtil.getTags(cell); 915 tags.add(sourceOpTag); 916 Cell updatedCell = PrivateCellUtil.createCell(cell, tags); 917 updatedCells.add(updatedCell); 918 } 919 m.getFamilyCellMap().clear(); 920 // Clear and add new Cells to the Mutation. 921 for (Cell cell : updatedCells) { 922 Delete d = (Delete) m; 923 d.add(cell); 924 } 925 } 926 } 927 } 928 929 /** 930 * Set hbase.client.rpc.codec and hbase.client.default.rpc.codec both to empty string This means 931 * it will use no Codec. Make sure that we don't return Tags in response. 
932 * @throws Exception Exception 933 */ 934 @Test 935 public void testTagsWithEmptyCodec() throws Exception { 936 TableName tableName = TableName.valueOf(name); 937 TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName) 938 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5) 939 .setKeepDeletedCells(KeepDeletedCells.TRUE).build()) 940 .setCoprocessor(MetadataController.class.getName()).build(); 941 UTIL.getAdmin().createTable(tableDesc); 942 Configuration conf = new Configuration(UTIL.getConfiguration()); 943 conf.set(RPC_CODEC_CONF_KEY, ""); 944 conf.set(DEFAULT_CODEC_CLASS, ""); 945 try (Connection connection = ConnectionFactory.createConnection(conf); 946 Table table = connection.getTable(tableName)) { 947 // Add first version of QUAL 948 Put p = new Put(ROW1); 949 p.addColumn(FAMILYA, QUAL, now, QUAL); 950 table.put(p); 951 952 // Add Delete family marker 953 Delete d = new Delete(ROW1, now + 3); 954 // Add test attribute to delete mutation. 955 d.setAttribute(TEST_ATTR, Bytes.toBytes(TEST_TAG)); 956 table.delete(d); 957 958 // Since RPC_CODEC_CONF_KEY and DEFAULT_CODEC_CLASS is set to empty, it will use 959 // empty Codec and it shouldn't encode/decode tags. 960 Scan scan = new Scan().withStartRow(ROW1).setRaw(true); 961 ResultScanner scanner = table.getScanner(scan); 962 int count = 0; 963 Result result; 964 while ((result = scanner.next()) != null) { 965 List<ExtendedCell> cells = Arrays.asList(ClientInternalHelper.getExtendedRawCells(result)); 966 assertEquals(2, cells.size()); 967 ExtendedCell cell = cells.get(0); 968 assertTrue(CellUtil.isDelete(cell)); 969 List<Tag> tags = PrivateCellUtil.getTags(cell); 970 assertEquals(0, tags.size()); 971 count++; 972 } 973 assertEquals(1, count); 974 } finally { 975 UTIL.deleteTable(tableName); 976 } 977 } 978}