/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * End-to-end tests for the {@link ImportTsv} tool: runs the MapReduce job against a mini cluster
 * and validates either the loaded table contents or the HFiles produced in bulk-output mode.
 */
@Tag(VerySlowMapReduceTests.TAG)
@Tag(LargeTests.TAG)
public class TestImportTsv implements Configurable {

  private static final Logger LOG = LoggerFactory.getLogger(TestImportTsv.class);
  protected static final String NAME = TestImportTsv.class.getSimpleName();
  protected static HBaseTestingUtil util = new HBaseTestingUtil();

  // Delete the tmp directory after running doMROnTableTest. Boolean. Default is true.
  protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";

  /**
   * Force use of combiner in doMROnTableTest. Boolean. Default is true.
   */
  protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";

  private final String FAMILY = "FAM";
  // Per-test table name; a fresh random name is assigned in setup() for isolation.
  private TableName tn;
  // Per-test ImportTsv arguments (-D key=value pairs); reset in setup().
  private Map<String, String> args;

  @Override
  public Configuration getConf() {
    return util.getConfiguration();
  }

  @Override
  public void setConf(Configuration conf) {
    // The configuration always comes from the shared testing utility; overriding is an error.
    throw new IllegalArgumentException("setConf not supported");
  }

  @BeforeAll
  public static void provisionCluster() throws Exception {
    util.startMiniCluster();
  }

  @AfterAll
  public static void releaseCluster() throws Exception {
    util.shutdownMiniCluster();
  }

  @BeforeEach
  public void setup() throws Exception {
    tn = TableName.valueOf("test-" + util.getRandomUUID());
    args = new HashMap<>();
    // Prepare the arguments required for the test.
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, "\u001b");
  }

  @Test
  public void testMROnTable() throws Exception {
    util.createTable(tn, FAMILY);
    doMROnTableTest(null, 1);
    util.deleteTable(tn);
  }

  @Test
  public void testMROnTableWithTimestamp() throws Exception {
    util.createTable(tn, FAMILY);
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
    String data = "KEY,1234,VALUE1,VALUE2\n";

    doMROnTableTest(data, 1);
    util.deleteTable(tn);
  }

  @Test
  public void testMROnTableWithCustomMapper() throws Exception {
    util.createTable(tn, FAMILY);
    args.put(ImportTsv.MAPPER_CONF_KEY,
      "org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper");

    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithoutAnExistingTable() throws Exception {
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());

    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithAnExistingTable() throws Exception {
    util.createTable(tn, FAMILY);

    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());

    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithAnExistingTableNoStrictTrue() throws Exception {
    util.createTable(tn, FAMILY);

    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.NO_STRICT_COL_FAMILY, "true");
    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testJobConfigurationsWithTsvImporterTextMapper() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    String INPUT_FILE = "InputFile1.csv";
    // Prepare the arguments required for the test.
    String[] args = new String[] {
      "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
      "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), tn.getNameAsString(),
      INPUT_FILE };
    // Verify the text-mapper pipeline is wired up without actually running the job.
    assertEquals(0, ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
      @Override
      public int run(String[] args) throws Exception {
        Job job = createSubmittableJob(getConf(), args);
        assertTrue(job.getMapperClass().equals(TsvImporterTextMapper.class));
        assertTrue(job.getReducerClass().equals(TextSortReducer.class));
        assertTrue(job.getMapOutputValueClass().equals(Text.class));
        return 0;
      }
    }, args), "running test job configuration failed.");
    // Delete table created by createSubmittableJob.
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithTsvImporterTextMapper() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    String data = "KEY\u001bVALUE4\u001bVALUE8\n";
    doMROnTableTest(data, 4);
    util.deleteTable(tn);
  }

  @Test
  public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
    String[] args = new String[] { tn.getNameAsString(), "/inputFile" };

    Configuration conf = new Configuration(util.getConfiguration());
    conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A");
    conf.set(ImportTsv.BULK_OUTPUT_CONF_KEY, "/output");
    conf.set(ImportTsv.CREATE_TABLE_CONF_KEY, "no");
    assertThrows(TableNotFoundException.class, () -> {
      ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
        @Override
        public int run(String[] args) throws Exception {
          createSubmittableJob(getConf(), args);
          return 0;
        }
      }, args);
    }, "running test job configuration failed.");
  }

  @Test
  public void testMRNoMatchedColumnFamily() throws Exception {
    util.createTable(tn, FAMILY);

    // Columns reference families that do not exist on the table; job setup must reject them.
    String[] args = new String[] {
      "-D" + ImportTsv.COLUMNS_CONF_KEY
        + "=HBASE_ROW_KEY,FAM:A,FAM01_ERROR:A,FAM01_ERROR:B,FAM02_ERROR:C",
      tn.getNameAsString(), "/inputFile" };
    assertThrows(NoSuchColumnFamilyException.class, () -> {
      ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
        @Override
        public int run(String[] args) throws Exception {
          createSubmittableJob(getConf(), args);
          return 0;
        }
      }, args);
    }, "running test job configuration failed.");

    util.deleteTable(tn);
  }

  @Test
  public void testMRWithoutAnExistingTable() throws Exception {
    String[] args = new String[] { tn.getNameAsString(), "/inputFile" };

    assertThrows(TableNotFoundException.class, () -> {
      ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
        @Override
        public int run(String[] args) throws Exception {
          createSubmittableJob(getConf(), args);
          return 0;
        }
      }, args);
    }, "running test job configuration failed.");
  }

  @Test
  public void testJobConfigurationsWithDryMode() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    String INPUT_FILE = "InputFile1.csv";
    // Prepare the arguments required for the test.
    String[] argsArray =
      new String[] { "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(),
        "-D" + ImportTsv.DRY_RUN_CONF_KEY + "=true", tn.getNameAsString(), INPUT_FILE };
    // In dry-run mode the job must discard its output via NullOutputFormat.
    assertEquals(0, ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
      @Override
      public int run(String[] args) throws Exception {
        Job job = createSubmittableJob(getConf(), args);
        assertTrue(job.getOutputFormatClass().equals(NullOutputFormat.class));
        return 0;
      }
    }, argsArray), "running test job configuration failed.");
    // Delete table created by createSubmittableJob.
    util.deleteTable(tn);
  }

  @Test
  public void testDryModeWithoutBulkOutputAndTableExists() throws Exception {
    util.createTable(tn, FAMILY);
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    doMROnTableTest(null, 1);
    // Dry mode should not delete an existing table. If it's not present,
    // this will throw TableNotFoundException.
    util.deleteTable(tn);
  }

  /**
   * If table is not present in non-bulk mode, dry run should fail just like normal mode.
   */
  @Test
  public void testDryModeWithoutBulkOutputAndTableDoesNotExists() throws Exception {
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    assertThrows(TableNotFoundException.class, () -> doMROnTableTest(null, 1));
  }

  @Test
  public void testDryModeWithBulkOutputAndTableExists() throws Exception {
    util.createTable(tn, FAMILY);
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    doMROnTableTest(null, 1);
    // Dry mode should not delete an existing table. If it's not present,
    // this will throw TableNotFoundException.
    util.deleteTable(tn);
  }

  /**
   * If table is not present in bulk mode and create.table is not set to yes, import should fail
   * with TableNotFoundException.
   */
  @Test
  public void testDryModeWithBulkOutputAndTableDoesNotExistsCreateTableSetToNo() throws Exception {
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "no");
    assertThrows(TableNotFoundException.class, () -> doMROnTableTest(null, 1));
  }

  @Test
  public void testDryModeWithBulkModeAndTableDoesNotExistsCreateTableSetToYes() throws Exception {
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "yes");
    doMROnTableTest(null, 1);
    // Verify temporary table was deleted.
    assertThrows(TableNotFoundException.class, () -> util.deleteTable(tn));
  }

  /**
   * If there are invalid data rows as inputs, then only those rows should be ignored.
   */
  @Test
  public void testTsvImporterTextMapperWithInvalidData() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
    // 3 Rows of data as input. 2 Rows are valid and 1 row is invalid as it doesn't have TS
    String data = "KEY,1234,VALUE1,VALUE2\nKEY\nKEY,1235,VALUE1,VALUE2\n";
    doMROnTableTest(util, tn, FAMILY, data, args, 1, 4);
    util.deleteTable(tn);
  }

  @Test
  public void testSkipEmptyColumns() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
    args.put(ImportTsv.SKIP_EMPTY_COLUMNS, "true");
    // 2 Rows of data as input. Both rows are valid and only 3 columns are no-empty among 4
    String data = "KEY,1234,VALUE1,VALUE2\nKEY,1235,,VALUE2\n";
    doMROnTableTest(util, tn, FAMILY, data, args, 1, 3);
    util.deleteTable(tn);
  }

  // Convenience wrapper using this instance's table name and args map.
  private Tool doMROnTableTest(String data, int valueMultiplier) throws Exception {
    return doMROnTableTest(util, tn, FAMILY, data, args, valueMultiplier, -1);
  }

  // Convenience overload with valueMultiplier=1 and no KV-count validation.
  protected static Tool doMROnTableTest(HBaseTestingUtil util, TableName table, String family,
    String data, Map<String, String> args) throws Exception {
    return doMROnTableTest(util, table, family, data, args, 1, -1);
  }

  /**
   * Run an ImportTsv job and perform basic validation on the results. Returns the ImportTsv
   * <code>Tool</code> instance so that other tests can inspect it for further validation as
   * necessary. This method is static to ensure non-reliance on instance's util/conf facilities.
   * @param args            Any arguments to pass BEFORE inputFile path is appended.
   * @param valueMultiplier Expected suffix multiplier for cell values in table validation.
   * @param expectedKVCount Expected total KV count in bulk-output HFiles; -1 skips the check.
   * @return The Tool instance used to run the test.
   */
  protected static Tool doMROnTableTest(HBaseTestingUtil util, TableName table, String family,
    String data, Map<String, String> args, int valueMultiplier, int expectedKVCount)
    throws Exception {
    Configuration conf = new Configuration(util.getConfiguration());

    // populate input file
    FileSystem fs = FileSystem.get(conf);
    Path inputPath =
      fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
    if (data == null) {
      data = "KEY\u001bVALUE1\u001bVALUE2\n";
    }
    // try-with-resources so the stream is closed even if the write throws.
    try (FSDataOutputStream op = fs.create(inputPath, true)) {
      op.write(Bytes.toBytes(data));
    }
    LOG.debug(String.format("Wrote test data to file: %s", inputPath));

    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
      LOG.debug("Forcing combiner.");
      conf.setInt("mapreduce.map.combine.minspills", 1);
    }

    // Build args array: each map entry becomes a -Dkey=value flag, then table name and input path.
    String[] argsArray = new String[args.size() + 2];
    int i = 0;
    for (Map.Entry<String, String> pair : args.entrySet()) {
      argsArray[i] = "-D" + pair.getKey() + "=" + pair.getValue();
      i++;
    }
    argsArray[i] = table.getNameAsString();
    argsArray[i + 1] = inputPath.toString();

    // run the import
    Tool tool = new ImportTsv();
    LOG.debug("Running ImportTsv with arguments: " + Arrays.toString(argsArray));
    assertEquals(0, ToolRunner.run(conf, tool, argsArray));

    // Perform basic validation. If the input args did not include
    // ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table.
    // Otherwise, validate presence of hfiles.
    boolean isDryRun = args.containsKey(ImportTsv.DRY_RUN_CONF_KEY)
      && "true".equalsIgnoreCase(args.get(ImportTsv.DRY_RUN_CONF_KEY));
    if (args.containsKey(ImportTsv.BULK_OUTPUT_CONF_KEY)) {
      if (isDryRun) {
        assertFalse(fs.exists(new Path(ImportTsv.BULK_OUTPUT_CONF_KEY)), String.format(
          "Dry run mode, %s should not have been created.", ImportTsv.BULK_OUTPUT_CONF_KEY));
      } else {
        validateHFiles(fs, args.get(ImportTsv.BULK_OUTPUT_CONF_KEY), family, expectedKVCount);
      }
    } else {
      validateTable(conf, table, family, valueMultiplier, isDryRun);
    }

    if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
      LOG.debug("Deleting test subdirectory");
      util.cleanupDataTestDirOnTestFS(table.getNameAsString());
    }
    return tool;
  }

  /**
   * Confirm ImportTsv via data in online table.
   */
  private static void validateTable(Configuration conf, TableName tableName, String family,
    int valueMultiplier, boolean isDryRun) throws IOException {

    LOG.debug("Validating table.");
    boolean verified = false;
    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
    int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
    // try-with-resources so the connection/table are released even if an assertion fails.
    try (Connection connection = ConnectionFactory.createConnection(conf);
      Table table = connection.getTable(tableName)) {
      for (int i = 0; i < numRetries; i++) {
        try {
          Scan scan = new Scan();
          // Scan entire family.
          scan.addFamily(Bytes.toBytes(family));
          ResultScanner resScanner = table.getScanner(scan);
          int numRows = 0;
          for (Result res : resScanner) {
            numRows++;
            assertEquals(2, res.size());
            List<Cell> kvs = res.listCells();
            assertTrue(CellUtil.matchingRows(kvs.get(0), Bytes.toBytes("KEY")));
            assertTrue(CellUtil.matchingRows(kvs.get(1), Bytes.toBytes("KEY")));
            assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier)));
            assertTrue(
              CellUtil.matchingValue(kvs.get(1), Bytes.toBytes("VALUE" + 2 * valueMultiplier)));
            // Only one result set is expected, so let it loop.
          }
          if (isDryRun) {
            assertEquals(0, numRows);
          } else {
            assertEquals(1, numRows);
          }
          verified = true;
          break;
        } catch (NullPointerException e) {
          // If here, a cell was empty. Presume its because updates came in
          // after the scanner had been opened. Wait a while and retry.
        }
        try {
          Thread.sleep(pause);
        } catch (InterruptedException e) {
          // Restore the interrupt flag and keep retrying.
          Thread.currentThread().interrupt();
        }
      }
    }
    assertTrue(verified);
  }

  /**
   * Confirm ImportTsv via HFiles on fs.
   */
  private static void validateHFiles(FileSystem fs, String outputPath, String family,
    int expectedKVCount) throws IOException {
    // validate number and content of output columns
    LOG.debug("Validating HFiles.");
    Set<String> configFamilies = new HashSet<>();
    configFamilies.add(family);
    Set<String> foundFamilies = new HashSet<>();
    int actualKVCount = 0;
    for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) {
      // The last path element is the column family directory name.
      String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR);
      String cf = elements[elements.length - 1];
      foundFamilies.add(cf);
      assertTrue(configFamilies.contains(cf),
        String.format(
          "HFile output contains a column family (%s) not present in input families (%s)", cf,
          configFamilies));
      for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) {
        assertTrue(hfile.getLen() > 0,
          String.format("HFile %s appears to contain no data.", hfile.getPath()));
        // count the number of KVs from all the hfiles
        if (expectedKVCount > -1) {
          actualKVCount += getKVCountFromHfile(fs, hfile.getPath());
        }
      }
    }
    assertTrue(foundFamilies.contains(family),
      String.format("HFile output does not contain the input family '%s'.", family));
    if (expectedKVCount > -1) {
      assertTrue(actualKVCount == expectedKVCount,
        String.format("KV count in ouput hfile=<%d> doesn't match with expected KV count=<%d>",
          actualKVCount, expectedKVCount));
    }
  }

  /**
   * Method returns the total KVs in given hfile
   * @param fs File System
   * @param p  HFile path
   * @return KV count in the given hfile
   */
  private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException {
    Configuration conf = util.getConfiguration();
    int count = 0;
    // try-with-resources so the reader is closed even if scanning throws.
    try (HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf)) {
      HFileScanner scanner = reader.getScanner(conf, false, false);
      // seekTo() returns false when the file contains no cells; the original code ignored
      // this and would have miscounted an empty file as 1.
      if (scanner.seekTo()) {
        do {
          count++;
        } while (scanner.next());
      }
    }
    return count;
  }
}