/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

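/**
 * Test of the {@link ImportTsv} MapReduce tool against a mini cluster: runs imports with
 * direct puts, bulk (HFile) output, dry-run mode, and custom mappers, then validates the
 * resulting table contents or generated HFiles.
 */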
@Category({VerySlowMapReduceTests.class, LargeTests.class})
public class TestImportTsv implements Configurable {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestImportTsv.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestImportTsv.class);
  protected static final String NAME = TestImportTsv.class.getSimpleName();
  protected static HBaseTestingUtility util = new HBaseTestingUtility();

  // Delete the tmp directory after running doMROnTableTest. Boolean. Default is true.
  protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";

  /**
   * Force use of combiner in doMROnTableTest. Boolean. Default is true.
   */
  protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";

  private final String FAMILY = "FAM";
  private TableName tn;
  private Map<String, String> args;

  @Rule
  public ExpectedException exception = ExpectedException.none();

  public Configuration getConf() {
    return util.getConfiguration();
  }

  public void setConf(Configuration conf) {
    throw new IllegalArgumentException("setConf not supported");
  }

  @BeforeClass
  public static void provisionCluster() throws Exception {
    util.startMiniCluster();
  }

  @AfterClass
  public static void releaseCluster() throws Exception {
    util.shutdownMiniCluster();
  }

  @Before
  public void setup() throws Exception {
    tn = TableName.valueOf("test-" + util.getRandomUUID());
    args = new HashMap<>();
    // Prepare the arguments required for the test.
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A,FAM:B");
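    // Use the non-printing 0x1B (ESC) character as the default column separator.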
    args.put(ImportTsv.SEPARATOR_CONF_KEY, "\u001b");
  }

  @Test
  public void testMROnTable() throws Exception {
    util.createTable(tn, FAMILY);
    doMROnTableTest(null, 1);
    util.deleteTable(tn);
  }

  @Test
  public void testMROnTableWithTimestamp() throws Exception {
    util.createTable(tn, FAMILY);
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
    String data = "KEY,1234,VALUE1,VALUE2\n";

    doMROnTableTest(data, 1);
    util.deleteTable(tn);
  }

  @Test
  public void testMROnTableWithCustomMapper() throws Exception {
    util.createTable(tn, FAMILY);
    args.put(ImportTsv.MAPPER_CONF_KEY,
        "org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper");

    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithoutAnExistingTable() throws Exception {
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());

    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithAnExistingTable() throws Exception {
    util.createTable(tn, FAMILY);

    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());

    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithAnExistingTableNoStrictTrue() throws Exception {
    util.createTable(tn, FAMILY);

    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.NO_STRICT_COL_FAMILY, "true");
    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testJobConfigurationsWithTsvImporterTextMapper() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    String INPUT_FILE = "InputFile1.csv";
    // Prepare the arguments required for the test.
    String[] args = new String[] {
        "-D" + ImportTsv.MAPPER_CONF_KEY
            + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(),
        tn.getNameAsString(),
        INPUT_FILE };
    assertEquals("running test job configuration failed.", 0, ToolRunner.run(
        new Configuration(util.getConfiguration()),
        new ImportTsv() {
          @Override
          public int run(String[] args) throws Exception {
            Job job = createSubmittableJob(getConf(), args);
            assertEquals(TsvImporterTextMapper.class, job.getMapperClass());
            assertEquals(TextSortReducer.class, job.getReducerClass());
            assertEquals(Text.class, job.getMapOutputValueClass());
            return 0;
          }
        }, args));
    // Delete table created by createSubmittableJob.
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithTsvImporterTextMapper() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    String data = "KEY\u001bVALUE4\u001bVALUE8\n";
    doMROnTableTest(data, 4);
    util.deleteTable(tn);
  }

  @Test
  public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
    String[] args = new String[] { tn.getNameAsString(), "/inputFile" };

    Configuration conf = new Configuration(util.getConfiguration());
    conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A");
    conf.set(ImportTsv.BULK_OUTPUT_CONF_KEY, "/output");
    conf.set(ImportTsv.CREATE_TABLE_CONF_KEY, "no");
    exception.expect(TableNotFoundException.class);
    // Pass the configuration that carries create.table=no so the bulk-output,
    // no-create path is actually exercised.
    assertEquals("running test job configuration failed.", 0,
        ToolRunner.run(conf, new ImportTsv() {
          @Override
          public int run(String[] args) throws Exception {
            createSubmittableJob(getConf(), args);
            return 0;
          }
        }, args));
  }

  @Test
  public void testMRWithoutAnExistingTable() throws Exception {
    String[] args = new String[] { tn.getNameAsString(), "/inputFile" };

    exception.expect(TableNotFoundException.class);
    assertEquals("running test job configuration failed.", 0, ToolRunner.run(
        new Configuration(util.getConfiguration()),
        new ImportTsv() {
          @Override
          public int run(String[] args) throws Exception {
            createSubmittableJob(getConf(), args);
            return 0;
          }
        }, args));
  }

  @Test
  public void testJobConfigurationsWithDryMode() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    String INPUT_FILE = "InputFile1.csv";
    // Prepare the arguments required for the test.
    String[] argsArray = new String[] {
        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(),
        "-D" + ImportTsv.DRY_RUN_CONF_KEY + "=true",
        tn.getNameAsString(),
        INPUT_FILE };
    assertEquals("running test job configuration failed.", 0, ToolRunner.run(
        new Configuration(util.getConfiguration()),
        new ImportTsv() {
          @Override
          public int run(String[] args) throws Exception {
            Job job = createSubmittableJob(getConf(), args);
            assertEquals(NullOutputFormat.class, job.getOutputFormatClass());
            return 0;
          }
        }, argsArray));
    // Delete table created by createSubmittableJob.
    util.deleteTable(tn);
  }

  @Test
  public void testDryModeWithoutBulkOutputAndTableExists() throws Exception {
    util.createTable(tn, FAMILY);
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    doMROnTableTest(null, 1);
    // Dry mode should not delete an existing table. If it's not present,
    // this will throw TableNotFoundException.
    util.deleteTable(tn);
  }

  /**
   * If the table is not present in non-bulk mode, a dry run should fail just like
   * a normal run.
   */
  @Test
  public void testDryModeWithoutBulkOutputAndTableDoesNotExists() throws Exception {
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    exception.expect(TableNotFoundException.class);
    doMROnTableTest(null, 1);
  }

  @Test
  public void testDryModeWithBulkOutputAndTableExists() throws Exception {
    util.createTable(tn, FAMILY);
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    doMROnTableTest(null, 1);
    // Dry mode should not delete an existing table. If it's not present,
    // this will throw TableNotFoundException.
    util.deleteTable(tn);
  }

  /**
   * If the table is not present in bulk mode and create.table is not set to yes,
   * the import should fail with TableNotFoundException.
   */
  @Test
  public void testDryModeWithBulkOutputAndTableDoesNotExistsCreateTableSetToNo()
      throws Exception {
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "no");
    exception.expect(TableNotFoundException.class);
    doMROnTableTest(null, 1);
  }

  @Test
  public void testDryModeWithBulkModeAndTableDoesNotExistsCreateTableSetToYes() throws Exception {
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "yes");
    doMROnTableTest(null, 1);
    // Verify temporary table was deleted.
    exception.expect(TableNotFoundException.class);
    util.deleteTable(tn);
  }

  /**
   * If the input contains invalid rows, only those rows should be ignored;
   * the valid rows should still be imported.
   */
  @Test
  public void testTsvImporterTextMapperWithInvalidData() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
    // 3 rows of input data: 2 rows are valid and 1 row is invalid because it lacks a timestamp.
    String data = "KEY,1234,VALUE1,VALUE2\nKEY\nKEY,1235,VALUE1,VALUE2\n";
    doMROnTableTest(util, tn, FAMILY, data, args, 1, 4);
    util.deleteTable(tn);
  }

  @Test
  public void testSkipEmptyColumns() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
    args.put(ImportTsv.SKIP_EMPTY_COLUMNS, "true");
    // 2 rows of input data. Both rows are valid, but only 3 of their 4 FAM column values
    // are non-empty.
    String data = "KEY,1234,VALUE1,VALUE2\nKEY,1235,,VALUE2\n";
    doMROnTableTest(util, tn, FAMILY, data, args, 1, 3);
    util.deleteTable(tn);
  }

  private Tool doMROnTableTest(String data, int valueMultiplier) throws Exception {
    return doMROnTableTest(util, tn, FAMILY, data, args, valueMultiplier, -1);
  }

  protected static Tool doMROnTableTest(HBaseTestingUtility util, TableName table,
      String family, String data, Map<String, String> args) throws Exception {
    return doMROnTableTest(util, table, family, data, args, 1, -1);
  }

  /**
   * Run an ImportTsv job and perform basic validation on the results.
   * Returns the ImportTsv <code>Tool</code> instance so that other tests can
   * inspect it for further validation as necessary. This method is static to
   * ensure it does not rely on the instance's util/conf facilities.
   * @param args Any arguments to pass BEFORE the table name and input file path are appended.
   * @return The Tool instance used to run the test.
   */
  protected static Tool doMROnTableTest(HBaseTestingUtility util, TableName table,
      String family, String data, Map<String, String> args, int valueMultiplier,
      int expectedKVCount) throws Exception {
    Configuration conf = new Configuration(util.getConfiguration());

    // populate input file
    FileSystem fs = FileSystem.get(conf);
    Path inputPath = fs.makeQualified(
            new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
    FSDataOutputStream op = fs.create(inputPath, true);
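    // Default input is a single ESC-separated row that matches validateTable's
    // expectations for valueMultiplier == 1.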
    if (data == null) {
      data = "KEY\u001bVALUE1\u001bVALUE2\n";
    }
    op.write(Bytes.toBytes(data));
    op.close();
    LOG.debug(String.format("Wrote test data to file: %s", inputPath));

    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
      LOG.debug("Forcing combiner.");
      conf.setInt("mapreduce.map.combine.minspills", 1);
    }

    // Build args array.
    String[] argsArray = new String[args.size() + 2];
    int i = 0;
    for (Map.Entry<String, String> entry : args.entrySet()) {
      argsArray[i] = "-D" + entry.getKey() + "=" + entry.getValue();
      i++;
    }
    argsArray[i] = table.getNameAsString();
    argsArray[i + 1] = inputPath.toString();

    // run the import
    Tool tool = new ImportTsv();
    LOG.debug("Running ImportTsv with arguments: " + Arrays.toString(argsArray));
    assertEquals(0, ToolRunner.run(conf, tool, argsArray));

    // Perform basic validation. If the input args did not include
    // ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table.
    // Otherwise, validate presence of hfiles.
    boolean isDryRun = args.containsKey(ImportTsv.DRY_RUN_CONF_KEY) &&
        "true".equalsIgnoreCase(args.get(ImportTsv.DRY_RUN_CONF_KEY));
    if (args.containsKey(ImportTsv.BULK_OUTPUT_CONF_KEY)) {
      String bulkOutputDir = args.get(ImportTsv.BULK_OUTPUT_CONF_KEY);
      if (isDryRun) {
        assertFalse(String.format("Dry run mode, %s should not have been created.",
            bulkOutputDir), fs.exists(new Path(bulkOutputDir)));
      } else {
        validateHFiles(fs, bulkOutputDir, family, expectedKVCount);
      }
    } else {
      validateTable(conf, table, family, valueMultiplier, isDryRun);
    }

    if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
      LOG.debug("Deleting test subdirectory");
      util.cleanupDataTestDirOnTestFS(table.getNameAsString());
    }
    return tool;
  }

  /**
   * Confirm ImportTsv via data in online table.
   */
  private static void validateTable(Configuration conf, TableName tableName,
      String family, int valueMultiplier, boolean isDryRun) throws IOException {

    LOG.debug("Validating table.");
    Connection connection = ConnectionFactory.createConnection(conf);
    Table table = connection.getTable(tableName);
    boolean verified = false;
    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
    int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
    for (int i = 0; i < numRetries; i++) {
      try {
        Scan scan = new Scan();
        // Scan entire family.
        scan.addFamily(Bytes.toBytes(family));
        ResultScanner resScanner = table.getScanner(scan);
        int numRows = 0;
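        // Each imported row is expected to contain exactly two cells whose values encode
        // the multiplier as VALUE<m> and VALUE<2*m>.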
        for (Result res : resScanner) {
          numRows++;
          assertEquals(2, res.size());
          List<Cell> kvs = res.listCells();
          assertTrue(CellUtil.matchingRows(kvs.get(0), Bytes.toBytes("KEY")));
          assertTrue(CellUtil.matchingRows(kvs.get(1), Bytes.toBytes("KEY")));
          assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier)));
          assertTrue(CellUtil.matchingValue(kvs.get(1), Bytes.toBytes("VALUE" + 2 * valueMultiplier)));
          // Only one result set is expected, so let it loop.
        }
        if (isDryRun) {
          assertEquals(0, numRows);
        } else {
          assertEquals(1, numRows);
        }
        verified = true;
        break;
      } catch (NullPointerException e) {
        // If here, a cell was empty. Presume it's because updates came in
        // after the scanner had been opened. Wait a while and retry.
      }
      try {
        Thread.sleep(pause);
      } catch (InterruptedException e) {
        // continue
      }
    }
    table.close();
    connection.close();
    assertTrue(verified);
  }

  /**
   * Confirm ImportTsv via HFiles on fs.
   */
  private static void validateHFiles(FileSystem fs, String outputPath, String family,
      int expectedKVCount) throws IOException {
    // validate number and content of output columns
    LOG.debug("Validating HFiles.");
    Set<String> configFamilies = new HashSet<>();
    configFamilies.add(family);
    Set<String> foundFamilies = new HashSet<>();
    int actualKVCount = 0;
    for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) {
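      // Bulk output is organized as one subdirectory per column family; the directory
      // name is the family itself.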
      String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR);
      String cf = elements[elements.length - 1];
      foundFamilies.add(cf);
      assertTrue(
        String.format(
          "HFile output contains a column family (%s) not present in input families (%s)",
          cf, configFamilies),
          configFamilies.contains(cf));
      for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) {
        assertTrue(
          String.format("HFile %s appears to contain no data.", hfile.getPath()),
          hfile.getLen() > 0);
        // count the number of KVs from all the hfiles
        if (expectedKVCount > -1) {
          actualKVCount += getKVCountFromHfile(fs, hfile.getPath());
        }
      }
    }
    assertTrue(String.format("HFile output does not contain the input family '%s'.", family),
        foundFamilies.contains(family));
    if (expectedKVCount > -1) {
      assertEquals(String.format(
        "KV count in output hfiles <%d> does not match the expected KV count <%d>",
        actualKVCount, expectedKVCount), expectedKVCount, actualKVCount);
    }
  }

  /**
   * Returns the total number of KVs in the given hfile.
   * @param fs the FileSystem holding the hfile
   * @param p the hfile path
   * @return KV count in the given hfile
   * @throws IOException if the hfile cannot be read
   */
  private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException {
    Configuration conf = util.getConfiguration();
    HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
    reader.loadFileInfo();
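    // Walk every cell in the hfile with a non-caching scanner, counting as we go.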
    HFileScanner scanner = reader.getScanner(false, false);
    scanner.seekTo();
    int count = 0;
    do {
      count++;
    } while (scanner.next());
    reader.close();
    return count;
  }
}