/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({VerySlowMapReduceTests.class, LargeTests.class})
public class TestImportTsv implements Configurable {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestImportTsv.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestImportTsv.class);
  protected static final String NAME = TestImportTsv.class.getSimpleName();
  protected static HBaseTestingUtility util = new HBaseTestingUtility();

  // Delete the tmp directory after running doMROnTableTest. Boolean. Default is true.
  protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";

  /**
   * Force use of combiner in doMROnTableTest. Boolean. Default is true.
   */
  protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";

  private final String FAMILY = "FAM";
  private TableName tn;
  private Map<String, String> args;

  @Rule
  public ExpectedException exception = ExpectedException.none();

  public Configuration getConf() {
    return util.getConfiguration();
  }

  public void setConf(Configuration conf) {
    throw new IllegalArgumentException("setConf not supported");
  }

  @BeforeClass
  public static void provisionCluster() throws Exception {
    util.startMiniCluster();
  }

  @AfterClass
  public static void releaseCluster() throws Exception {
    util.shutdownMiniCluster();
  }

  @Before
  public void setup() throws Exception {
    tn = TableName.valueOf("test-" + UUID.randomUUID());
    args = new HashMap<>();
    // Prepare the arguments required for the test.
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, "\u001b");
  }

  @Test
  public void testMROnTable() throws Exception {
    util.createTable(tn, FAMILY);
    doMROnTableTest(null, 1);
    util.deleteTable(tn);
  }

  @Test
  public void testMROnTableWithTimestamp() throws Exception {
    util.createTable(tn, FAMILY);
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
    String data = "KEY,1234,VALUE1,VALUE2\n";

    doMROnTableTest(data, 1);
    util.deleteTable(tn);
  }

  @Test
  public void testMROnTableWithCustomMapper() throws Exception {
    util.createTable(tn, FAMILY);
    args.put(ImportTsv.MAPPER_CONF_KEY,
        "org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper");

    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithoutAnExistingTable() throws Exception {
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());

    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithAnExistingTable() throws Exception {
    util.createTable(tn, FAMILY);

    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());

    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithAnExistingTableNoStrictTrue() throws Exception {
    util.createTable(tn, FAMILY);

    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.NO_STRICT_COL_FAMILY, "true");
    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testJobConfigurationsWithTsvImporterTextMapper() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    String INPUT_FILE = "InputFile1.csv";
    // Prepare the arguments required for the test.
    String[] args = new String[] {
        "-D" + ImportTsv.MAPPER_CONF_KEY
            + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(),
        tn.getNameAsString(),
        INPUT_FILE
    };
    assertEquals("running test job configuration failed.", 0, ToolRunner.run(
        new Configuration(util.getConfiguration()),
        new ImportTsv() {
          @Override
          public int run(String[] args) throws Exception {
            Job job = createSubmittableJob(getConf(), args);
            assertTrue(job.getMapperClass().equals(TsvImporterTextMapper.class));
            assertTrue(job.getReducerClass().equals(TextSortReducer.class));
            assertTrue(job.getMapOutputValueClass().equals(Text.class));
            return 0;
          }
        }, args));
    // Delete table created by createSubmittableJob.
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithTsvImporterTextMapper() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    String data = "KEY\u001bVALUE4\u001bVALUE8\n";
    doMROnTableTest(data, 4);
    util.deleteTable(tn);
  }

  @Test
  public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
    String[] args = new String[] { tn.getNameAsString(), "/inputFile" };

    Configuration conf = new Configuration(util.getConfiguration());
    conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A");
    conf.set(ImportTsv.BULK_OUTPUT_CONF_KEY, "/output");
    conf.set(ImportTsv.CREATE_TABLE_CONF_KEY, "no");
    exception.expect(TableNotFoundException.class);
    assertEquals("running test job configuration failed.", 0,
        ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
          @Override
          public int run(String[] args) throws Exception {
            createSubmittableJob(getConf(), args);
            return 0;
          }
        }, args));
  }

  @Test
  public void testMRWithoutAnExistingTable() throws Exception {
    String[] args = new String[] { tn.getNameAsString(), "/inputFile" };

    exception.expect(TableNotFoundException.class);
    assertEquals("running test job configuration failed.", 0, ToolRunner.run(
        new Configuration(util.getConfiguration()),
        new ImportTsv() {
          @Override
          public int run(String[] args) throws Exception {
            createSubmittableJob(getConf(), args);
            return 0;
          }
        }, args));
  }

  @Test
  public void testJobConfigurationsWithDryMode() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    String INPUT_FILE = "InputFile1.csv";
    // Prepare the arguments required for the test.
    String[] argsArray = new String[] {
        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(),
        "-D" + ImportTsv.DRY_RUN_CONF_KEY + "=true",
        tn.getNameAsString(),
        INPUT_FILE };
    assertEquals("running test job configuration failed.", 0, ToolRunner.run(
        new Configuration(util.getConfiguration()),
        new ImportTsv() {
          @Override
          public int run(String[] args) throws Exception {
            Job job = createSubmittableJob(getConf(), args);
            assertTrue(job.getOutputFormatClass().equals(NullOutputFormat.class));
            return 0;
          }
        }, argsArray));
    // Delete table created by createSubmittableJob.
    util.deleteTable(tn);
  }

  @Test
  public void testDryModeWithoutBulkOutputAndTableExists() throws Exception {
    util.createTable(tn, FAMILY);
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    doMROnTableTest(null, 1);
    // Dry mode should not delete an existing table. If it's not present,
    // this will throw TableNotFoundException.
    util.deleteTable(tn);
  }

  /**
   * If the table is not present in non-bulk mode, a dry run should fail just like
   * a normal run.
   */
  @Test
  public void testDryModeWithoutBulkOutputAndTableDoesNotExists() throws Exception {
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    exception.expect(TableNotFoundException.class);
    doMROnTableTest(null, 1);
  }

  @Test
  public void testDryModeWithBulkOutputAndTableExists() throws Exception {
    util.createTable(tn, FAMILY);
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    doMROnTableTest(null, 1);
    // Dry mode should not delete an existing table. If it's not present,
    // this will throw TableNotFoundException.
    util.deleteTable(tn);
  }

  /**
   * If the table is not present in bulk mode and create.table is not set to yes,
   * the import should fail with TableNotFoundException.
   */
  @Test
  public void testDryModeWithBulkOutputAndTableDoesNotExistsCreateTableSetToNo()
      throws Exception {
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "no");
    exception.expect(TableNotFoundException.class);
    doMROnTableTest(null, 1);
  }

  @Test
  public void testDryModeWithBulkModeAndTableDoesNotExistsCreateTableSetToYes() throws Exception {
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "yes");
    doMROnTableTest(null, 1);
    // Verify temporary table was deleted.
    exception.expect(TableNotFoundException.class);
    util.deleteTable(tn);
  }

  /**
   * If there are invalid data rows as input, only those rows should be ignored.
   */
  @Test
  public void testTsvImporterTextMapperWithInvalidData() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
    // 3 rows of data as input: 2 rows are valid, 1 row is invalid because it has no timestamp.
    String data = "KEY,1234,VALUE1,VALUE2\nKEY\nKEY,1235,VALUE1,VALUE2\n";
    doMROnTableTest(util, tn, FAMILY, data, args, 1, 4);
    util.deleteTable(tn);
  }

  @Test
  public void testSkipEmptyColumns() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
    args.put(ImportTsv.SKIP_EMPTY_COLUMNS, "true");
    // 2 rows of data as input. Both rows are valid, but only 3 of the 4 columns are non-empty.
    String data = "KEY,1234,VALUE1,VALUE2\nKEY,1235,,VALUE2\n";
    doMROnTableTest(util, tn, FAMILY, data, args, 1, 3);
    util.deleteTable(tn);
  }

  private Tool doMROnTableTest(String data, int valueMultiplier) throws Exception {
    return doMROnTableTest(util, tn, FAMILY, data, args, valueMultiplier, -1);
  }

  protected static Tool doMROnTableTest(HBaseTestingUtility util, TableName table,
      String family, String data, Map<String, String> args) throws Exception {
    return doMROnTableTest(util, table, family, data, args, 1, -1);
  }

  /**
   * Run an ImportTsv job and perform basic validation on the results.
   * Returns the ImportTsv <code>Tool</code> instance so that other tests can
   * inspect it for further validation as necessary. This method is static to
   * ensure non-reliance on the instance's util/conf facilities.
   * @param args Any arguments to pass BEFORE the inputFile path is appended.
   * @return The Tool instance used to run the test.
   */
  protected static Tool doMROnTableTest(HBaseTestingUtility util, TableName table,
      String family, String data, Map<String, String> args, int valueMultiplier,
      int expectedKVCount) throws Exception {
    Configuration conf = new Configuration(util.getConfiguration());

    // populate input file
    FileSystem fs = FileSystem.get(conf);
    Path inputPath = fs.makeQualified(
        new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
    FSDataOutputStream op = fs.create(inputPath, true);
    if (data == null) {
      data = "KEY\u001bVALUE1\u001bVALUE2\n";
    }
    op.write(Bytes.toBytes(data));
    op.close();
    LOG.debug(String.format("Wrote test data to file: %s", inputPath));

    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
      LOG.debug("Forcing combiner.");
      conf.setInt("mapreduce.map.combine.minspills", 1);
    }

    // Build args array.
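    // For reference, the argv assembled below mirrors a command-line invocation roughly of the
    // form (property names are illustrative, taken from ImportTsv's conf-key constants):
    //   hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=... \
    //       [-Dother.key=value ...] <tableName> <inputPath>
    // Each map entry becomes one -Dkey=value argument; the table name and input path come last.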
    String[] argsArray = new String[args.size() + 2];
    Iterator<Map.Entry<String, String>> it = args.entrySet().iterator();
    int i = 0;
    while (it.hasNext()) {
      Map.Entry<String, String> pair = it.next();
      argsArray[i] = "-D" + pair.getKey() + "=" + pair.getValue();
      i++;
    }
    argsArray[i] = table.getNameAsString();
    argsArray[i + 1] = inputPath.toString();

    // run the import
    Tool tool = new ImportTsv();
    LOG.debug("Running ImportTsv with arguments: " + Arrays.toString(argsArray));
    assertEquals(0, ToolRunner.run(conf, tool, argsArray));

    // Perform basic validation. If the input args did not include
    // ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table.
    // Otherwise, validate presence of hfiles.
    boolean isDryRun = args.containsKey(ImportTsv.DRY_RUN_CONF_KEY) &&
        "true".equalsIgnoreCase(args.get(ImportTsv.DRY_RUN_CONF_KEY));
    if (args.containsKey(ImportTsv.BULK_OUTPUT_CONF_KEY)) {
      if (isDryRun) {
        assertFalse(String.format("Dry run mode, %s should not have been created.",
            args.get(ImportTsv.BULK_OUTPUT_CONF_KEY)),
            fs.exists(new Path(args.get(ImportTsv.BULK_OUTPUT_CONF_KEY))));
      } else {
        validateHFiles(fs, args.get(ImportTsv.BULK_OUTPUT_CONF_KEY), family, expectedKVCount);
      }
    } else {
      validateTable(conf, table, family, valueMultiplier, isDryRun);
    }

    if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
      LOG.debug("Deleting test subdirectory");
      util.cleanupDataTestDirOnTestFS(table.getNameAsString());
    }
    return tool;
  }

  /**
   * Confirm ImportTsv via data in online table.
   */
  private static void validateTable(Configuration conf, TableName tableName,
      String family, int valueMultiplier, boolean isDryRun) throws IOException {

    LOG.debug("Validating table.");
    Connection connection = ConnectionFactory.createConnection(conf);
    Table table = connection.getTable(tableName);
    boolean verified = false;
    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
    int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
    for (int i = 0; i < numRetries; i++) {
      try {
        Scan scan = new Scan();
        // Scan entire family.
        scan.addFamily(Bytes.toBytes(family));
        ResultScanner resScanner = table.getScanner(scan);
        int numRows = 0;
        for (Result res : resScanner) {
          numRows++;
          assertEquals(2, res.size());
          List<Cell> kvs = res.listCells();
          assertTrue(CellUtil.matchingRows(kvs.get(0), Bytes.toBytes("KEY")));
          assertTrue(CellUtil.matchingRows(kvs.get(1), Bytes.toBytes("KEY")));
          assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier)));
          assertTrue(CellUtil.matchingValue(kvs.get(1),
              Bytes.toBytes("VALUE" + 2 * valueMultiplier)));
          // Only one result set is expected, so let it loop.
        }
        if (isDryRun) {
          assertEquals(0, numRows);
        } else {
          assertEquals(1, numRows);
        }
        verified = true;
        break;
      } catch (NullPointerException e) {
        // If here, a cell was empty. Presume it's because updates came in
        // after the scanner had been opened. Wait a while and retry.
      }
      try {
        Thread.sleep(pause);
      } catch (InterruptedException e) {
        // continue
      }
    }
    table.close();
    connection.close();
    assertTrue(verified);
  }

  /**
   * Confirm ImportTsv via HFiles on fs.
   */
  private static void validateHFiles(FileSystem fs, String outputPath, String family,
      int expectedKVCount) throws IOException {
    // validate number and content of output columns
    LOG.debug("Validating HFiles.");
    Set<String> configFamilies = new HashSet<>();
    configFamilies.add(family);
    Set<String> foundFamilies = new HashSet<>();
    int actualKVCount = 0;
    // Bulk output is laid out as one directory per column family, each containing its HFiles.
    for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) {
      String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR);
      String cf = elements[elements.length - 1];
      foundFamilies.add(cf);
      assertTrue(
          String.format(
              "HFile output contains a column family (%s) not present in input families (%s)",
              cf, configFamilies),
          configFamilies.contains(cf));
      for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) {
        assertTrue(
            String.format("HFile %s appears to contain no data.", hfile.getPath()),
            hfile.getLen() > 0);
        // count the number of KVs from all the hfiles
        if (expectedKVCount > -1) {
          actualKVCount += getKVCountFromHfile(fs, hfile.getPath());
        }
      }
    }
    assertTrue(String.format("HFile output does not contain the input family '%s'.", family),
        foundFamilies.contains(family));
    if (expectedKVCount > -1) {
      assertTrue(String.format(
          "KV count in output HFile=<%d> does not match the expected KV count=<%d>", actualKVCount,
          expectedKVCount), actualKVCount == expectedKVCount);
    }
  }

  /**
   * Returns the total number of KeyValues in the given HFile.
   * @param fs the FileSystem
   * @param p the HFile path
   * @return KV count in the given HFile
   * @throws IOException if the HFile cannot be read
   */
  private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException {
    Configuration conf = util.getConfiguration();
    HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
    reader.loadFileInfo();
    HFileScanner scanner = reader.getScanner(false, false);
    scanner.seekTo();
    int count = 0;
    do {
      count++;
    } while (scanner.next());
    reader.close();
    return count;
  }
}