/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({VerySlowMapReduceTests.class, LargeTests.class})
public class TestImportTsv implements Configurable {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestImportTsv.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestImportTsv.class);
  protected static final String NAME = TestImportTsv.class.getSimpleName();
  protected static HBaseTestingUtility util = new HBaseTestingUtility();

  // Delete the tmp directory after running doMROnTableTest. Boolean. Default is true.
  protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";

  /**
   * Force use of combiner in doMROnTableTest. Boolean. Default is true.
   */
  protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";

  private final String FAMILY = "FAM";
  private TableName tn;
  private Map<String, String> args;

  @Rule
  public ExpectedException exception = ExpectedException.none();

  public Configuration getConf() {
    return util.getConfiguration();
  }

  public void setConf(Configuration conf) {
    throw new IllegalArgumentException("setConf not supported");
  }

  @BeforeClass
  public static void provisionCluster() throws Exception {
    util.startMiniCluster();
  }

  @AfterClass
  public static void releaseCluster() throws Exception {
    util.shutdownMiniCluster();
  }

  @Before
  public void setup() throws Exception {
    tn = TableName.valueOf("test-" + util.getRandomUUID());
    args = new HashMap<>();
    // Prepare the arguments required for the test.
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, "\u001b");
  }
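
  // Note: unless a test supplies its own input, doMROnTableTest writes a single row of
  // "KEY", "VALUE1" and "VALUE2" separated by the 0x1B separator configured in setup();
  // the default column spec maps that row to rowkey KEY with FAM:A=VALUE1 and FAM:B=VALUE2.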

  @Test
  public void testMROnTable() throws Exception {
    util.createTable(tn, FAMILY);
    doMROnTableTest(null, 1);
    util.deleteTable(tn);
  }

  @Test
  public void testMROnTableWithTimestamp() throws Exception {
    util.createTable(tn, FAMILY);
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
    String data = "KEY,1234,VALUE1,VALUE2\n";

    doMROnTableTest(data, 1);
    util.deleteTable(tn);
  }

  @Test
  public void testMROnTableWithCustomMapper() throws Exception {
    util.createTable(tn, FAMILY);
    args.put(ImportTsv.MAPPER_CONF_KEY,
        "org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper");

    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithoutAnExistingTable() throws Exception {
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());

    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithAnExistingTable() throws Exception {
    util.createTable(tn, FAMILY);

    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());

    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithAnExistingTableNoStrictTrue() throws Exception {
    util.createTable(tn, FAMILY);

    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.NO_STRICT_COL_FAMILY, "true");
    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testJobConfigurationsWithTsvImporterTextMapper() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    String INPUT_FILE = "InputFile1.csv";
    // Prepare the arguments required for the test.
    String[] args = new String[] {
        "-D" + ImportTsv.MAPPER_CONF_KEY
            + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(),
        tn.getNameAsString(),
        INPUT_FILE
    };
    assertEquals("running test job configuration failed.", 0, ToolRunner.run(
        new Configuration(util.getConfiguration()),
        new ImportTsv() {
          @Override
          public int run(String[] args) throws Exception {
            Job job = createSubmittableJob(getConf(), args);
            assertTrue(job.getMapperClass().equals(TsvImporterTextMapper.class));
            assertTrue(job.getReducerClass().equals(TextSortReducer.class));
            assertTrue(job.getMapOutputValueClass().equals(Text.class));
            return 0;
          }
        }, args));
    // Delete table created by createSubmittableJob.
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithTsvImporterTextMapper() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    String data = "KEY\u001bVALUE4\u001bVALUE8\n";
    doMROnTableTest(data, 4);
    util.deleteTable(tn);
  }

  @Test
  public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
    String[] args = new String[] { tn.getNameAsString(), "/inputFile" };

    Configuration conf = new Configuration(util.getConfiguration());
    conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A");
    conf.set(ImportTsv.BULK_OUTPUT_CONF_KEY, "/output");
    conf.set(ImportTsv.CREATE_TABLE_CONF_KEY, "no");
    exception.expect(TableNotFoundException.class);
    assertEquals("running test job configuration failed.", 0,
        ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
          @Override
          public int run(String[] args) throws Exception {
            createSubmittableJob(getConf(), args);
            return 0;
          }
        }, args));
  }

  @Test
  public void testMRWithoutAnExistingTable() throws Exception {
    String[] args = new String[] { tn.getNameAsString(), "/inputFile" };

    exception.expect(TableNotFoundException.class);
    assertEquals("running test job configuration failed.", 0, ToolRunner.run(
        new Configuration(util.getConfiguration()),
        new ImportTsv() {
          @Override
          public int run(String[] args) throws Exception {
            createSubmittableJob(getConf(), args);
            return 0;
          }
        }, args));
  }

  @Test
  public void testJobConfigurationsWithDryMode() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    String INPUT_FILE = "InputFile1.csv";
    // Prepare the arguments required for the test.
    String[] argsArray = new String[] {
        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(),
        "-D" + ImportTsv.DRY_RUN_CONF_KEY + "=true",
        tn.getNameAsString(),
        INPUT_FILE };
    assertEquals("running test job configuration failed.", 0, ToolRunner.run(
        new Configuration(util.getConfiguration()),
        new ImportTsv() {
          @Override
          public int run(String[] args) throws Exception {
            Job job = createSubmittableJob(getConf(), args);
            assertTrue(job.getOutputFormatClass().equals(NullOutputFormat.class));
            return 0;
          }
        }, argsArray));
    // Delete table created by createSubmittableJob.
    util.deleteTable(tn);
  }

  @Test
  public void testDryModeWithoutBulkOutputAndTableExists() throws Exception {
    util.createTable(tn, FAMILY);
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    doMROnTableTest(null, 1);
    // Dry mode should not delete an existing table. If it's not present,
    // this will throw TableNotFoundException.
    util.deleteTable(tn);
  }

  /**
   * If the table is not present in non-bulk mode, a dry run should fail just like
   * normal mode.
   */
  @Test
  public void testDryModeWithoutBulkOutputAndTableDoesNotExists() throws Exception {
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    exception.expect(TableNotFoundException.class);
    doMROnTableTest(null, 1);
  }

  @Test
  public void testDryModeWithBulkOutputAndTableExists() throws Exception {
    util.createTable(tn, FAMILY);
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    doMROnTableTest(null, 1);
    // Dry mode should not delete an existing table. If it's not present,
    // this will throw TableNotFoundException.
    util.deleteTable(tn);
  }

  /**
   * If the table is not present in bulk mode and create.table is not set to yes,
   * the import should fail with TableNotFoundException.
   */
  @Test
  public void testDryModeWithBulkOutputAndTableDoesNotExistsCreateTableSetToNo()
      throws Exception {
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "no");
    exception.expect(TableNotFoundException.class);
    doMROnTableTest(null, 1);
  }

  @Test
  public void testDryModeWithBulkModeAndTableDoesNotExistsCreateTableSetToYes() throws Exception {
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "yes");
    doMROnTableTest(null, 1);
    // Verify temporary table was deleted.
    exception.expect(TableNotFoundException.class);
    util.deleteTable(tn);
  }

  /**
   * If there are invalid data rows as input, only those rows should be ignored.
   */
  @Test
  public void testTsvImporterTextMapperWithInvalidData() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
    // 3 rows of data as input. 2 rows are valid and 1 row is invalid as it doesn't have a TS.
    String data = "KEY,1234,VALUE1,VALUE2\nKEY\nKEY,1235,VALUE1,VALUE2\n";
    doMROnTableTest(util, tn, FAMILY, data, args, 1, 4);
    util.deleteTable(tn);
  }

  @Test
  public void testSkipEmptyColumns() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
    args.put(ImportTsv.SKIP_EMPTY_COLUMNS, "true");
    // 2 rows of data as input. Both rows are valid and only 3 of the 4 columns are non-empty.
    String data = "KEY,1234,VALUE1,VALUE2\nKEY,1235,,VALUE2\n";
    doMROnTableTest(util, tn, FAMILY, data, args, 1, 3);
    util.deleteTable(tn);
  }

  private Tool doMROnTableTest(String data, int valueMultiplier) throws Exception {
    return doMROnTableTest(util, tn, FAMILY, data, args, valueMultiplier, -1);
  }

  protected static Tool doMROnTableTest(HBaseTestingUtility util, TableName table,
      String family, String data, Map<String, String> args) throws Exception {
    return doMROnTableTest(util, table, family, data, args, 1, -1);
  }
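
  // The helper below invokes ImportTsv just as the command line would, i.e. roughly:
  //   ImportTsv -Dkey1=value1 ... -DkeyN=valueN <tableName> <inputPath>
  // Each entry of the args map becomes one -D option, and the input file is generated from
  // the supplied (or default) test data.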

  /**
   * Run an ImportTsv job and perform basic validation on the results.
   * Returns the ImportTsv <code>Tool</code> instance so that other tests can
   * inspect it for further validation as necessary. This method is static to
   * ensure non-reliance on the instance's util/conf facilities.
   * @param args Any arguments to pass BEFORE the inputFile path is appended.
   * @return The Tool instance used to run the test.
   */
  protected static Tool doMROnTableTest(HBaseTestingUtility util, TableName table,
      String family, String data, Map<String, String> args, int valueMultiplier,
      int expectedKVCount) throws Exception {
    Configuration conf = new Configuration(util.getConfiguration());

    // Populate the input file.
    FileSystem fs = FileSystem.get(conf);
    Path inputPath = fs.makeQualified(
        new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
    FSDataOutputStream op = fs.create(inputPath, true);
    if (data == null) {
      data = "KEY\u001bVALUE1\u001bVALUE2\n";
    }
    op.write(Bytes.toBytes(data));
    op.close();
    LOG.debug(String.format("Wrote test data to file: %s", inputPath));

    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
      LOG.debug("Forcing combiner.");
      conf.setInt("mapreduce.map.combine.minspills", 1);
    }

    // Build the args array: one -D option per map entry, then the table name and input path.
    String[] argsArray = new String[args.size() + 2];
    Iterator<Map.Entry<String, String>> it = args.entrySet().iterator();
    int i = 0;
    while (it.hasNext()) {
      Map.Entry<String, String> pair = it.next();
      argsArray[i] = "-D" + pair.getKey() + "=" + pair.getValue();
      i++;
    }
    argsArray[i] = table.getNameAsString();
    argsArray[i + 1] = inputPath.toString();

    // Run the import.
    Tool tool = new ImportTsv();
    LOG.debug("Running ImportTsv with arguments: " + Arrays.toString(argsArray));
    assertEquals(0, ToolRunner.run(conf, tool, argsArray));

    // Perform basic validation. If the input args did not include
    // ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table.
    // Otherwise, validate presence of hfiles.
    boolean isDryRun = args.containsKey(ImportTsv.DRY_RUN_CONF_KEY) &&
        "true".equalsIgnoreCase(args.get(ImportTsv.DRY_RUN_CONF_KEY));
    if (args.containsKey(ImportTsv.BULK_OUTPUT_CONF_KEY)) {
      if (isDryRun) {
        assertFalse(String.format("Dry run mode, %s should not have been created.",
            ImportTsv.BULK_OUTPUT_CONF_KEY),
            fs.exists(new Path(ImportTsv.BULK_OUTPUT_CONF_KEY)));
      } else {
        validateHFiles(fs, args.get(ImportTsv.BULK_OUTPUT_CONF_KEY), family, expectedKVCount);
      }
    } else {
      validateTable(conf, table, family, valueMultiplier, isDryRun);
    }

    if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
      LOG.debug("Deleting test subdirectory");
      util.cleanupDataTestDirOnTestFS(table.getNameAsString());
    }
    return tool;
  }

  /**
   * Confirm ImportTsv via data in an online table.
   */
  private static void validateTable(Configuration conf, TableName tableName,
      String family, int valueMultiplier, boolean isDryRun) throws IOException {

    LOG.debug("Validating table.");
    Connection connection = ConnectionFactory.createConnection(conf);
    Table table = connection.getTable(tableName);
    boolean verified = false;
    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
    int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
    for (int i = 0; i < numRetries; i++) {
      try {
        Scan scan = new Scan();
        // Scan entire family.
        scan.addFamily(Bytes.toBytes(family));
        ResultScanner resScanner = table.getScanner(scan);
        int numRows = 0;
        for (Result res : resScanner) {
          numRows++;
          assertEquals(2, res.size());
          List<Cell> kvs = res.listCells();
          assertTrue(CellUtil.matchingRows(kvs.get(0), Bytes.toBytes("KEY")));
          assertTrue(CellUtil.matchingRows(kvs.get(1), Bytes.toBytes("KEY")));
          assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier)));
          assertTrue(CellUtil.matchingValue(kvs.get(1),
              Bytes.toBytes("VALUE" + 2 * valueMultiplier)));
          // Only one result set is expected, so let it loop.
        }
        if (isDryRun) {
          assertEquals(0, numRows);
        } else {
          assertEquals(1, numRows);
        }
        verified = true;
        break;
      } catch (NullPointerException e) {
        // If here, a cell was empty. Presume it's because updates came in
        // after the scanner had been opened. Wait a while and retry.
      }
      try {
        Thread.sleep(pause);
      } catch (InterruptedException e) {
        // continue
      }
    }
    table.close();
    connection.close();
    assertTrue(verified);
  }
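
  // validateHFiles below relies on the bulk output directory layout: one subdirectory per
  // column family under the bulk output path, each holding the HFiles written for that
  // family, e.g. <outputPath>/FAM/<hfile>.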

  /**
   * Confirm ImportTsv via HFiles on fs.
   */
  private static void validateHFiles(FileSystem fs, String outputPath, String family,
      int expectedKVCount) throws IOException {
    // Validate the number and content of output columns.
    LOG.debug("Validating HFiles.");
    Set<String> configFamilies = new HashSet<>();
    configFamilies.add(family);
    Set<String> foundFamilies = new HashSet<>();
    int actualKVCount = 0;
    for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) {
      String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR);
      String cf = elements[elements.length - 1];
      foundFamilies.add(cf);
      assertTrue(
          String.format(
              "HFile output contains a column family (%s) not present in input families (%s)",
              cf, configFamilies),
          configFamilies.contains(cf));
      for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) {
        assertTrue(
            String.format("HFile %s appears to contain no data.", hfile.getPath()),
            hfile.getLen() > 0);
        // Count the number of KVs from all the hfiles.
        if (expectedKVCount > -1) {
          actualKVCount += getKVCountFromHfile(fs, hfile.getPath());
        }
      }
    }
    assertTrue(String.format("HFile output does not contain the input family '%s'.", family),
        foundFamilies.contains(family));
    if (expectedKVCount > -1) {
      assertTrue(String.format(
          "KV count in output hfile=<%d> doesn't match the expected KV count=<%d>",
          actualKVCount, expectedKVCount), actualKVCount == expectedKVCount);
    }
  }

  /**
   * Returns the total number of KVs in the given hfile.
   * @param fs the FileSystem
   * @param p the HFile path
   * @return KV count in the given hfile
   * @throws IOException if the hfile cannot be read
   */
  private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException {
    Configuration conf = util.getConfiguration();
    HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
    HFileScanner scanner = reader.getScanner(false, false);
    scanner.seekTo();
    int count = 0;
    do {
      count++;
    } while (scanner.next());
    reader.close();
    return count;
  }
}