/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ VerySlowMapReduceTests.class, LargeTests.class })
public class TestImportTsv implements Configurable {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestImportTsv.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestImportTsv.class);
  protected static final String NAME = TestImportTsv.class.getSimpleName();
  protected static HBaseTestingUtility util = new HBaseTestingUtility();

  // Delete the tmp directory after running doMROnTableTest. Boolean. Default is true.
  protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";

  /**
   * Force use of combiner in doMROnTableTest. Boolean. Default is true.
   */
  protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
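  // Both knobs are read from the job Configuration inside doMROnTableTest. For example (a sketch,
  // handy when debugging a failing run), the generated test output can be kept around with:
  //   util.getConfiguration().setBoolean(DELETE_AFTER_LOAD_CONF, false);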

  private final String FAMILY = "FAM";
  private TableName tn;
  private Map<String, String> args;

  @Rule
  public ExpectedException exception = ExpectedException.none();

  public Configuration getConf() {
    return util.getConfiguration();
  }

  public void setConf(Configuration conf) {
    throw new IllegalArgumentException("setConf not supported");
  }

  @BeforeClass
  public static void provisionCluster() throws Exception {
    util.startMiniCluster();
  }

  @AfterClass
  public static void releaseCluster() throws Exception {
    util.shutdownMiniCluster();
  }

  @Before
  public void setup() throws Exception {
    tn = TableName.valueOf("test-" + util.getRandomUUID());
    args = new HashMap<>();
    // Prepare the arguments required for the test.
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A,FAM:B");
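    // The default separator below is the ASCII ESC control character (0x1B); it matches the
    // separator embedded in the default test data written by doMROnTableTest.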
    args.put(ImportTsv.SEPARATOR_CONF_KEY, "\u001b");
  }

  @Test
  public void testMROnTable() throws Exception {
    util.createTable(tn, FAMILY);
    doMROnTableTest(null, 1);
    util.deleteTable(tn);
  }

  @Test
  public void testMROnTableWithTimestamp() throws Exception {
    util.createTable(tn, FAMILY);
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
    String data = "KEY,1234,VALUE1,VALUE2\n";

    doMROnTableTest(data, 1);
    util.deleteTable(tn);
  }

  @Test
  public void testMROnTableWithCustomMapper() throws Exception {
    util.createTable(tn, FAMILY);
    args.put(ImportTsv.MAPPER_CONF_KEY,
      "org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper");

    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithoutAnExistingTable() throws Exception {
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());

    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithAnExistingTable() throws Exception {
    util.createTable(tn, FAMILY);

    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());

    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithAnExistingTableNoStrictTrue() throws Exception {
    util.createTable(tn, FAMILY);

    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.NO_STRICT_COL_FAMILY, "true");
    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testJobConfigurationsWithTsvImporterTextMapper() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    String INPUT_FILE = "InputFile1.csv";
    // Prepare the arguments required for the test.
    String[] args = new String[] {
      "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
      "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), tn.getNameAsString(),
      INPUT_FILE };
    assertEquals("running test job configuration failed.", 0,
      ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
        @Override
        public int run(String[] args) throws Exception {
          Job job = createSubmittableJob(getConf(), args);
          assertTrue(job.getMapperClass().equals(TsvImporterTextMapper.class));
          assertTrue(job.getReducerClass().equals(TextSortReducer.class));
          assertTrue(job.getMapOutputValueClass().equals(Text.class));
          return 0;
        }
      }, args));
    // Delete table created by createSubmittableJob.
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithTsvImporterTextMapper() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    String data = "KEY\u001bVALUE4\u001bVALUE8\n";
    doMROnTableTest(data, 4);
    util.deleteTable(tn);
  }

  @Test
  public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
    String[] args = new String[] { tn.getNameAsString(), "/inputFile" };

    Configuration conf = new Configuration(util.getConfiguration());
    conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A");
    conf.set(ImportTsv.BULK_OUTPUT_CONF_KEY, "/output");
    conf.set(ImportTsv.CREATE_TABLE_CONF_KEY, "no");
    exception.expect(TableNotFoundException.class);
    assertEquals("running test job configuration failed.", 0,
      ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
        @Override
        public int run(String[] args) throws Exception {
          createSubmittableJob(getConf(), args);
          return 0;
        }
      }, args));
  }

  @Test
  public void testMRWithoutAnExistingTable() throws Exception {
    String[] args = new String[] { tn.getNameAsString(), "/inputFile" };

    exception.expect(TableNotFoundException.class);
    assertEquals("running test job configuration failed.", 0,
      ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
        @Override
        public int run(String[] args) throws Exception {
          createSubmittableJob(getConf(), args);
          return 0;
        }
      }, args));
  }

  @Test
  public void testJobConfigurationsWithDryMode() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    String INPUT_FILE = "InputFile1.csv";
    // Prepare the arguments required for the test.
    String[] argsArray =
      new String[] { "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(),
        "-D" + ImportTsv.DRY_RUN_CONF_KEY + "=true", tn.getNameAsString(), INPUT_FILE };
    assertEquals("running test job configuration failed.", 0,
      ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
        @Override
        public int run(String[] args) throws Exception {
          Job job = createSubmittableJob(getConf(), args);
          assertTrue(job.getOutputFormatClass().equals(NullOutputFormat.class));
          return 0;
        }
      }, argsArray));
    // Delete table created by createSubmittableJob.
    util.deleteTable(tn);
  }

  @Test
  public void testDryModeWithoutBulkOutputAndTableExists() throws Exception {
    util.createTable(tn, FAMILY);
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    doMROnTableTest(null, 1);
    // Dry mode should not delete an existing table. If it's not present,
    // this will throw TableNotFoundException.
    util.deleteTable(tn);
  }

  /**
   * If the table is not present in non-bulk mode, a dry run should fail just like normal mode.
   */
  @Test
  public void testDryModeWithoutBulkOutputAndTableDoesNotExists() throws Exception {
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    exception.expect(TableNotFoundException.class);
    doMROnTableTest(null, 1);
  }

  @Test
  public void testDryModeWithBulkOutputAndTableExists() throws Exception {
    util.createTable(tn, FAMILY);
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    doMROnTableTest(null, 1);
    // Dry mode should not delete an existing table. If it's not present,
    // this will throw TableNotFoundException.
    util.deleteTable(tn);
  }

  /**
   * If the table is not present in bulk mode and create.table is not set to yes, the import should
   * fail with TableNotFoundException.
   */
  @Test
  public void testDryModeWithBulkOutputAndTableDoesNotExistsCreateTableSetToNo() throws Exception {
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "no");
    exception.expect(TableNotFoundException.class);
    doMROnTableTest(null, 1);
  }

  @Test
  public void testDryModeWithBulkModeAndTableDoesNotExistsCreateTableSetToYes() throws Exception {
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "yes");
    doMROnTableTest(null, 1);
    // Verify temporary table was deleted.
    exception.expect(TableNotFoundException.class);
    util.deleteTable(tn);
  }

  /**
   * If there are invalid data rows as inputs, then only those rows should be ignored.
   */
  @Test
  public void testTsvImporterTextMapperWithInvalidData() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
    // 3 rows of data as input: 2 rows are valid and 1 row is invalid because it has no timestamp
    String data = "KEY,1234,VALUE1,VALUE2\nKEY\nKEY,1235,VALUE1,VALUE2\n";
    doMROnTableTest(util, tn, FAMILY, data, args, 1, 4);
    util.deleteTable(tn);
  }

  @Test
  public void testSkipEmptyColumns() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
    args.put(ImportTsv.SKIP_EMPTY_COLUMNS, "true");
    // 2 rows of data as input. Both rows are valid, but only 3 of the 4 columns are non-empty.
    String data = "KEY,1234,VALUE1,VALUE2\nKEY,1235,,VALUE2\n";
    doMROnTableTest(util, tn, FAMILY, data, args, 1, 3);
    util.deleteTable(tn);
  }

  private Tool doMROnTableTest(String data, int valueMultiplier) throws Exception {
    return doMROnTableTest(util, tn, FAMILY, data, args, valueMultiplier, -1);
  }

  protected static Tool doMROnTableTest(HBaseTestingUtility util, TableName table, String family,
    String data, Map<String, String> args) throws Exception {
    return doMROnTableTest(util, table, family, data, args, 1, -1);
  }

  /**
   * Run an ImportTsv job and perform basic validation on the results. Returns the ImportTsv
   * <code>Tool</code> instance so that other tests can inspect it for further validation as
   * necessary. This method is static to ensure non-reliance on instance's util/conf facilities.
   * @param args Any arguments to pass BEFORE the inputFile path is appended.
   * @return The Tool instance used to run the test.
   */
  protected static Tool doMROnTableTest(HBaseTestingUtility util, TableName table, String family,
    String data, Map<String, String> args, int valueMultiplier, int expectedKVCount)
    throws Exception {
    Configuration conf = new Configuration(util.getConfiguration());

    // populate input file
    FileSystem fs = FileSystem.get(conf);
    Path inputPath =
      fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
    FSDataOutputStream op = fs.create(inputPath, true);
    if (data == null) {
      data = "KEY\u001bVALUE1\u001bVALUE2\n";
    }
    op.write(Bytes.toBytes(data));
    op.close();
    LOG.debug(String.format("Wrote test data to file: %s", inputPath));

    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
      LOG.debug("Forcing combiner.");
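      // mapreduce.map.combine.minspills controls how many map-side spills must exist before the
      // combiner runs during the merge phase; Hadoop's default is 3, so setting it to 1 should
      // force the combiner to run even when a map task produces a single spill.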
      conf.setInt("mapreduce.map.combine.minspills", 1);
    }

    // Build args array.
    String[] argsArray = new String[args.size() + 2];
    int i = 0;
    for (Map.Entry<String, String> entry : args.entrySet()) {
      argsArray[i++] = "-D" + entry.getKey() + "=" + entry.getValue();
    }
    argsArray[i] = table.getNameAsString();
    argsArray[i + 1] = inputPath.toString();

    // run the import
    Tool tool = new ImportTsv();
    LOG.debug("Running ImportTsv with arguments: " + Arrays.toString(argsArray));
    assertEquals(0, ToolRunner.run(conf, tool, argsArray));

    // Perform basic validation. If the input args did not include
    // ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table.
    // Otherwise, validate presence of hfiles.
    boolean isDryRun = args.containsKey(ImportTsv.DRY_RUN_CONF_KEY)
      && "true".equalsIgnoreCase(args.get(ImportTsv.DRY_RUN_CONF_KEY));
    if (args.containsKey(ImportTsv.BULK_OUTPUT_CONF_KEY)) {
      if (isDryRun) {
        assertFalse(String.format("Dry run mode, %s should not have been created.",
          ImportTsv.BULK_OUTPUT_CONF_KEY), fs.exists(new Path(ImportTsv.BULK_OUTPUT_CONF_KEY)));
      } else {
        validateHFiles(fs, args.get(ImportTsv.BULK_OUTPUT_CONF_KEY), family, expectedKVCount);
      }
    } else {
      validateTable(conf, table, family, valueMultiplier, isDryRun);
    }

    if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
      LOG.debug("Deleting test subdirectory");
      util.cleanupDataTestDirOnTestFS(table.getNameAsString());
    }
    return tool;
  }

  /**
   * Confirm ImportTsv via data in online table.
   */
  private static void validateTable(Configuration conf, TableName tableName, String family,
    int valueMultiplier, boolean isDryRun) throws IOException {

    LOG.debug("Validating table.");
    Connection connection = ConnectionFactory.createConnection(conf);
    Table table = connection.getTable(tableName);
    boolean verified = false;
    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
    int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
    for (int i = 0; i < numRetries; i++) {
      try {
        Scan scan = new Scan();
        // Scan entire family.
        scan.addFamily(Bytes.toBytes(family));
        ResultScanner resScanner = table.getScanner(scan);
        int numRows = 0;
        for (Result res : resScanner) {
          numRows++;
          assertEquals(2, res.size());
          List<Cell> kvs = res.listCells();
          assertTrue(CellUtil.matchingRows(kvs.get(0), Bytes.toBytes("KEY")));
          assertTrue(CellUtil.matchingRows(kvs.get(1), Bytes.toBytes("KEY")));
          assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier)));
          assertTrue(
            CellUtil.matchingValue(kvs.get(1), Bytes.toBytes("VALUE" + 2 * valueMultiplier)));
          // Only one result set is expected, so let it loop.
        }
        if (isDryRun) {
          assertEquals(0, numRows);
        } else {
          assertEquals(1, numRows);
        }
        verified = true;
        break;
      } catch (NullPointerException e) {
        // If here, a cell was empty. Presume it's because updates came in
        // after the scanner had been opened. Wait a while and retry.
      }
      try {
        Thread.sleep(pause);
      } catch (InterruptedException e) {
        // continue
      }
    }
    table.close();
    connection.close();
    assertTrue(verified);
  }

  /**
   * Confirm ImportTsv via HFiles on fs.
   */
  private static void validateHFiles(FileSystem fs, String outputPath, String family,
    int expectedKVCount) throws IOException {
    // validate number and content of output columns
    LOG.debug("Validating HFiles.");
    Set<String> configFamilies = new HashSet<>();
    configFamilies.add(family);
    Set<String> foundFamilies = new HashSet<>();
    int actualKVCount = 0;
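    // The bulk output directory is expected to contain one subdirectory per column family, each
    // holding the HFiles written for that family.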
    for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) {
      String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR);
      String cf = elements[elements.length - 1];
      foundFamilies.add(cf);
      assertTrue(String.format(
        "HFile output contains a column family (%s) not present in input families (%s)", cf,
        configFamilies), configFamilies.contains(cf));
      for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) {
        assertTrue(String.format("HFile %s appears to contain no data.", hfile.getPath()),
          hfile.getLen() > 0);
        // count the number of KVs from all the hfiles
        if (expectedKVCount > -1) {
          actualKVCount += getKVCountFromHfile(fs, hfile.getPath());
        }
      }
    }
    assertTrue(String.format("HFile output does not contain the input family '%s'.", family),
      foundFamilies.contains(family));
    if (expectedKVCount > -1) {
      assertTrue(
        String.format("KV count in output hfile=<%d> does not match expected KV count=<%d>",
          actualKVCount, expectedKVCount),
        actualKVCount == expectedKVCount);
    }
  }

  /**
   * Returns the total number of KVs in the given HFile.
   * @param fs File System
   * @param p  HFile path
   * @return KV count in the given HFile
   */
  private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException {
    Configuration conf = util.getConfiguration();
    HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
    HFileScanner scanner = reader.getScanner(conf, false, false);
    scanner.seekTo();
    int count = 0;
    do {
      count++;
    } while (scanner.next());
    reader.close();
    return count;
  }
}