/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ VerySlowMapReduceTests.class, LargeTests.class })
public class TestImportTsv implements Configurable {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestImportTsv.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestImportTsv.class);
  protected static final String NAME = TestImportTsv.class.getSimpleName();
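  // Shared mini-cluster test harness; started once in provisionCluster() and shut down in
  // releaseCluster().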
  protected static HBaseTestingUtility util = new HBaseTestingUtility();

  // Delete the tmp directory after running doMROnTableTest. Boolean. Default is true.
  protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";

  /**
   * Force use of combiner in doMROnTableTest. Boolean. Default is true.
   */
  protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";

  private final String FAMILY = "FAM";
  private TableName tn;
  private Map<String, String> args;

  @Rule
  public ExpectedException exception = ExpectedException.none();

  public Configuration getConf() {
    return util.getConfiguration();
  }

  public void setConf(Configuration conf) {
    throw new IllegalArgumentException("setConf not supported");
  }

  @BeforeClass
  public static void provisionCluster() throws Exception {
    util.startMiniCluster();
  }

  @AfterClass
  public static void releaseCluster() throws Exception {
    util.shutdownMiniCluster();
  }

  @Before
  public void setup() throws Exception {
    tn = TableName.valueOf("test-" + util.getRandomUUID());
    args = new HashMap<>();
    // Prepare the arguments required for the test.
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, "\u001b");
  }

  @Test
  public void testMROnTable() throws Exception {
    util.createTable(tn, FAMILY);
    doMROnTableTest(null, 1);
    util.deleteTable(tn);
  }

  @Test
  public void testMROnTableWithTimestamp() throws Exception {
    util.createTable(tn, FAMILY);
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
    String data = "KEY,1234,VALUE1,VALUE2\n";

    doMROnTableTest(data, 1);
    util.deleteTable(tn);
  }

  @Test
  public void testMROnTableWithCustomMapper() throws Exception {
    util.createTable(tn, FAMILY);
    args.put(ImportTsv.MAPPER_CONF_KEY,
      "org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper");

    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithoutAnExistingTable() throws Exception {
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());

    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithAnExistingTable() throws Exception {
    util.createTable(tn, FAMILY);

    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());

    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithAnExistingTableNoStrictTrue() throws Exception {
    util.createTable(tn, FAMILY);

    // Prepare the arguments required for the test.
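    // Bulk output plus NO_STRICT_COL_FAMILY=true: the job is expected to proceed with a relaxed
    // column-family check against the existing table.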
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.NO_STRICT_COL_FAMILY, "true");
    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testJobConfigurationsWithTsvImporterTextMapper() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    String INPUT_FILE = "InputFile1.csv";
    // Prepare the arguments required for the test.
    String[] args = new String[] {
      "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
      "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), tn.getNameAsString(),
      INPUT_FILE };
    assertEquals("running test job configuration failed.", 0,
      ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
        @Override
        public int run(String[] args) throws Exception {
          Job job = createSubmittableJob(getConf(), args);
          assertTrue(job.getMapperClass().equals(TsvImporterTextMapper.class));
          assertTrue(job.getReducerClass().equals(TextSortReducer.class));
          assertTrue(job.getMapOutputValueClass().equals(Text.class));
          return 0;
        }
      }, args));
    // Delete table created by createSubmittableJob.
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithTsvImporterTextMapper() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    String data = "KEY\u001bVALUE4\u001bVALUE8\n";
    doMROnTableTest(data, 4);
    util.deleteTable(tn);
  }

  @Test
  public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
    String[] args = new String[] { tn.getNameAsString(), "/inputFile" };

    Configuration conf = new Configuration(util.getConfiguration());
    conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A");
    conf.set(ImportTsv.BULK_OUTPUT_CONF_KEY, "/output");
    conf.set(ImportTsv.CREATE_TABLE_CONF_KEY, "no");
    exception.expect(TableNotFoundException.class);
    assertEquals("running test job configuration failed.", 0,
      ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
        @Override
        public int run(String[] args) throws Exception {
          createSubmittableJob(getConf(), args);
          return 0;
        }
      }, args));
  }

  @Test
  public void testMRWithoutAnExistingTable() throws Exception {
    String[] args = new String[] { tn.getNameAsString(), "/inputFile" };

    exception.expect(TableNotFoundException.class);
    assertEquals("running test job configuration failed.", 0,
      ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
        @Override
        public int run(String[] args) throws Exception {
          createSubmittableJob(getConf(), args);
          return 0;
        }
      }, args));
  }

  @Test
  public void testJobConfigurationsWithDryMode() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    String INPUT_FILE = "InputFile1.csv";
    // Prepare the arguments required for the test.
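    // With dry-run enabled, createSubmittableJob is expected to wire up NullOutputFormat so that
    // nothing is written (asserted below).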
    String[] argsArray =
      new String[] { "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(),
        "-D" + ImportTsv.DRY_RUN_CONF_KEY + "=true", tn.getNameAsString(), INPUT_FILE };
    assertEquals("running test job configuration failed.", 0,
      ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
        @Override
        public int run(String[] args) throws Exception {
          Job job = createSubmittableJob(getConf(), args);
          assertTrue(job.getOutputFormatClass().equals(NullOutputFormat.class));
          return 0;
        }
      }, argsArray));
    // Delete table created by createSubmittableJob.
    util.deleteTable(tn);
  }

  @Test
  public void testDryModeWithoutBulkOutputAndTableExists() throws Exception {
    util.createTable(tn, FAMILY);
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    doMROnTableTest(null, 1);
    // Dry mode should not delete an existing table. If it's not present,
    // this will throw TableNotFoundException.
    util.deleteTable(tn);
  }

  /**
   * If table is not present in non-bulk mode, dry run should fail just like normal mode.
   */
  @Test
  public void testDryModeWithoutBulkOutputAndTableDoesNotExists() throws Exception {
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    exception.expect(TableNotFoundException.class);
    doMROnTableTest(null, 1);
  }

  @Test
  public void testDryModeWithBulkOutputAndTableExists() throws Exception {
    util.createTable(tn, FAMILY);
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    doMROnTableTest(null, 1);
    // Dry mode should not delete an existing table. If it's not present,
    // this will throw TableNotFoundException.
    util.deleteTable(tn);
  }

  /**
   * If table is not present in bulk mode and create.table is not set to yes, import should fail
   * with TableNotFoundException.
   */
  @Test
  public void testDryModeWithBulkOutputAndTableDoesNotExistsCreateTableSetToNo() throws Exception {
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "no");
    exception.expect(TableNotFoundException.class);
    doMROnTableTest(null, 1);
  }

  @Test
  public void testDryModeWithBulkModeAndTableDoesNotExistsCreateTableSetToYes() throws Exception {
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "yes");
    doMROnTableTest(null, 1);
    // Verify temporary table was deleted.
    exception.expect(TableNotFoundException.class);
    util.deleteTable(tn);
  }

  /**
   * If there are invalid data rows as inputs, then only those rows should be ignored.
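   * Valid rows must still be written to the bulk output.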
   */
  @Test
  public void testTsvImporterTextMapperWithInvalidData() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
    // 3 rows of data as input. 2 rows are valid and 1 row is invalid as it doesn't have a TS.
    String data = "KEY,1234,VALUE1,VALUE2\nKEY\nKEY,1235,VALUE1,VALUE2\n";
    doMROnTableTest(util, tn, FAMILY, data, args, 1, 4);
    util.deleteTable(tn);
  }

  @Test
  public void testSkipEmptyColumns() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
    args.put(ImportTsv.SKIP_EMPTY_COLUMNS, "true");
    // 2 rows of data as input. Both rows are valid and only 3 of the 4 columns are non-empty.
    String data = "KEY,1234,VALUE1,VALUE2\nKEY,1235,,VALUE2\n";
    doMROnTableTest(util, tn, FAMILY, data, args, 1, 3);
    util.deleteTable(tn);
  }

  private Tool doMROnTableTest(String data, int valueMultiplier) throws Exception {
    return doMROnTableTest(util, tn, FAMILY, data, args, valueMultiplier, -1);
  }

  protected static Tool doMROnTableTest(HBaseTestingUtility util, TableName table, String family,
    String data, Map<String, String> args) throws Exception {
    return doMROnTableTest(util, table, family, data, args, 1, -1);
  }

  /**
   * Run an ImportTsv job and perform basic validation on the results. Returns the ImportTsv
   * <code>Tool</code> instance so that other tests can inspect it for further validation as
   * necessary. This method is static to ensure non-reliance on the instance's util/conf
   * facilities.
   * @param args Any arguments to pass BEFORE the inputFile path is appended.
   * @return The Tool instance used to run the test.
   */
  protected static Tool doMROnTableTest(HBaseTestingUtility util, TableName table, String family,
    String data, Map<String, String> args, int valueMultiplier, int expectedKVCount)
    throws Exception {
    Configuration conf = new Configuration(util.getConfiguration());

    // populate input file
    FileSystem fs = FileSystem.get(conf);
    Path inputPath =
      fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
    FSDataOutputStream op = fs.create(inputPath, true);
    if (data == null) {
      data = "KEY\u001bVALUE1\u001bVALUE2\n";
    }
    op.write(Bytes.toBytes(data));
    op.close();
    LOG.debug(String.format("Wrote test data to file: %s", inputPath));

    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
      LOG.debug("Forcing combiner.");
      conf.setInt("mapreduce.map.combine.minspills", 1);
    }

    // Build args array.
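    // "-D" key=value options first, then the table name and the input path, matching the order
    // ImportTsv expects on its command line.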
    String[] argsArray = new String[args.size() + 2];
    Iterator<Map.Entry<String, String>> it = args.entrySet().iterator();
    int i = 0;
    while (it.hasNext()) {
      Map.Entry<String, String> pair = it.next();
      argsArray[i] = "-D" + pair.getKey() + "=" + pair.getValue();
      i++;
    }
    argsArray[i] = table.getNameAsString();
    argsArray[i + 1] = inputPath.toString();

    // run the import
    Tool tool = new ImportTsv();
    LOG.debug("Running ImportTsv with arguments: " + Arrays.toString(argsArray));
    assertEquals(0, ToolRunner.run(conf, tool, argsArray));

    // Perform basic validation. If the input args did not include
    // ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table.
    // Otherwise, validate presence of hfiles.
    boolean isDryRun = args.containsKey(ImportTsv.DRY_RUN_CONF_KEY)
      && "true".equalsIgnoreCase(args.get(ImportTsv.DRY_RUN_CONF_KEY));
    if (args.containsKey(ImportTsv.BULK_OUTPUT_CONF_KEY)) {
      if (isDryRun) {
        assertFalse(String.format("Dry run mode, %s should not have been created.",
          ImportTsv.BULK_OUTPUT_CONF_KEY), fs.exists(new Path(ImportTsv.BULK_OUTPUT_CONF_KEY)));
      } else {
        validateHFiles(fs, args.get(ImportTsv.BULK_OUTPUT_CONF_KEY), family, expectedKVCount);
      }
    } else {
      validateTable(conf, table, family, valueMultiplier, isDryRun);
    }

    if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
      LOG.debug("Deleting test subdirectory");
      util.cleanupDataTestDirOnTestFS(table.getNameAsString());
    }
    return tool;
  }

  /**
   * Confirm ImportTsv via data in online table.
   */
  private static void validateTable(Configuration conf, TableName tableName, String family,
    int valueMultiplier, boolean isDryRun) throws IOException {

    LOG.debug("Validating table.");
    Connection connection = ConnectionFactory.createConnection(conf);
    Table table = connection.getTable(tableName);
    boolean verified = false;
    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
    int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
    for (int i = 0; i < numRetries; i++) {
      try {
        Scan scan = new Scan();
        // Scan entire family.
        scan.addFamily(Bytes.toBytes(family));
        ResultScanner resScanner = table.getScanner(scan);
        int numRows = 0;
        for (Result res : resScanner) {
          numRows++;
          assertEquals(2, res.size());
          List<Cell> kvs = res.listCells();
          assertTrue(CellUtil.matchingRows(kvs.get(0), Bytes.toBytes("KEY")));
          assertTrue(CellUtil.matchingRows(kvs.get(1), Bytes.toBytes("KEY")));
          assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier)));
          assertTrue(
            CellUtil.matchingValue(kvs.get(1), Bytes.toBytes("VALUE" + 2 * valueMultiplier)));
          // Only one result set is expected, so let it loop.
        }
        if (isDryRun) {
          assertEquals(0, numRows);
        } else {
          assertEquals(1, numRows);
        }
        verified = true;
        break;
      } catch (NullPointerException e) {
        // If here, a cell was empty. Presume it's because updates came in
        // after the scanner had been opened. Wait a while and retry.
      }
      try {
        Thread.sleep(pause);
      } catch (InterruptedException e) {
        // continue
      }
    }
    table.close();
    connection.close();
    assertTrue(verified);
  }

  /**
   * Confirm ImportTsv via HFiles on fs.
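   * Checks that each column-family directory matches an expected family, that every HFile is
   * non-empty, and, when expectedKVCount is non-negative, that the total KV count matches it.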
   */
  private static void validateHFiles(FileSystem fs, String outputPath, String family,
    int expectedKVCount) throws IOException {
    // validate number and content of output columns
    LOG.debug("Validating HFiles.");
    Set<String> configFamilies = new HashSet<>();
    configFamilies.add(family);
    Set<String> foundFamilies = new HashSet<>();
    int actualKVCount = 0;
    for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) {
      String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR);
      String cf = elements[elements.length - 1];
      foundFamilies.add(cf);
      assertTrue(String.format(
        "HFile output contains a column family (%s) not present in input families (%s)", cf,
        configFamilies), configFamilies.contains(cf));
      for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) {
        assertTrue(String.format("HFile %s appears to contain no data.", hfile.getPath()),
          hfile.getLen() > 0);
        // count the number of KVs from all the hfiles
        if (expectedKVCount > -1) {
          actualKVCount += getKVCountFromHfile(fs, hfile.getPath());
        }
      }
    }
    assertTrue(String.format("HFile output does not contain the input family '%s'.", family),
      foundFamilies.contains(family));
    if (expectedKVCount > -1) {
      assertTrue(
        String.format("KV count in output hfile=<%d> doesn't match the expected KV count=<%d>",
          actualKVCount, expectedKVCount),
        actualKVCount == expectedKVCount);
    }
  }

  /**
   * Returns the total number of KVs in the given hfile.
   * @param fs the FileSystem
   * @param p  HFile path
   * @return KV count in the given hfile
   */
  private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException {
    Configuration conf = util.getConfiguration();
    HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
    HFileScanner scanner = reader.getScanner(conf, false, false);
    scanner.seekTo();
    int count = 0;
    do {
      count++;
    } while (scanner.next());
    reader.close();
    return count;
  }
}