/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

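/**
 * Basic tests for the {@link ImportTsv} MapReduce tool: runs small import jobs against a
 * mini cluster and validates either the resulting table contents or the bulk-load HFiles.
 */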
@Category({VerySlowMapReduceTests.class, LargeTests.class})
public class TestImportTsv implements Configurable {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestImportTsv.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestImportTsv.class);
  protected static final String NAME = TestImportTsv.class.getSimpleName();
  protected static HBaseTestingUtility util = new HBaseTestingUtility();

  // Delete the tmp directory after running doMROnTableTest. Boolean. Default is true.
  protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";

  /**
   * Force use of combiner in doMROnTableTest. Boolean. Default is true.
   */
  protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";

  private final String FAMILY = "FAM";
  private TableName tn;
  private Map<String, String> args;

  @Rule
  public ExpectedException exception = ExpectedException.none();

  public Configuration getConf() {
    return util.getConfiguration();
  }

  public void setConf(Configuration conf) {
    throw new IllegalArgumentException("setConf not supported");
  }

  @BeforeClass
  public static void provisionCluster() throws Exception {
    util.startMiniCluster();
  }

  @AfterClass
  public static void releaseCluster() throws Exception {
    util.shutdownMiniCluster();
  }

  @Before
  public void setup() throws Exception {
    tn = TableName.valueOf("test-" + UUID.randomUUID());
    args = new HashMap<>();
    // Prepare the arguments required for the test.
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A,FAM:B");
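    // \u001b (ESC) matches the separator used in the default test data written by doMROnTableTest.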
    args.put(ImportTsv.SEPARATOR_CONF_KEY, "\u001b");
  }

  @Test
  public void testMROnTable() throws Exception {
    util.createTable(tn, FAMILY);
    doMROnTableTest(null, 1);
    util.deleteTable(tn);
  }

  @Test
  public void testMROnTableWithTimestamp() throws Exception {
    util.createTable(tn, FAMILY);
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
    String data = "KEY,1234,VALUE1,VALUE2\n";

    doMROnTableTest(data, 1);
    util.deleteTable(tn);
  }

  @Test
  public void testMROnTableWithCustomMapper() throws Exception {
    util.createTable(tn, FAMILY);
    args.put(ImportTsv.MAPPER_CONF_KEY,
        "org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper");

    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithoutAnExistingTable() throws Exception {
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());

    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithAnExistingTable() throws Exception {
    util.createTable(tn, FAMILY);

    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());

    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithAnExistingTableNoStrictTrue() throws Exception {
    util.createTable(tn, FAMILY);

    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.NO_STRICT_COL_FAMILY, "true");
    doMROnTableTest(null, 3);
    util.deleteTable(tn);
  }

  @Test
  public void testJobConfigurationsWithTsvImporterTextMapper() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    String INPUT_FILE = "InputFile1.csv";
    // Prepare the arguments required for the test.
    String[] args =
        new String[] {
            "-D" + ImportTsv.MAPPER_CONF_KEY
                + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
            "-D" + ImportTsv.COLUMNS_CONF_KEY
                + "=HBASE_ROW_KEY,FAM:A,FAM:B",
            "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
            "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(),
            tn.getNameAsString(),
            INPUT_FILE
            };
    assertEquals("running test job configuration failed.", 0, ToolRunner.run(
        new Configuration(util.getConfiguration()),
        new ImportTsv() {
          @Override
          public int run(String[] args) throws Exception {
            Job job = createSubmittableJob(getConf(), args);
            assertTrue(job.getMapperClass().equals(TsvImporterTextMapper.class));
            assertTrue(job.getReducerClass().equals(TextSortReducer.class));
            assertTrue(job.getMapOutputValueClass().equals(Text.class));
            return 0;
          }
        }, args));
    // Delete table created by createSubmittableJob.
    util.deleteTable(tn);
  }

  @Test
  public void testBulkOutputWithTsvImporterTextMapper() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    String data = "KEY\u001bVALUE4\u001bVALUE8\n";
    doMROnTableTest(data, 4);
    util.deleteTable(tn);
  }

  @Test
  public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
    String[] args = new String[] { tn.getNameAsString(), "/inputFile" };

    Configuration conf = new Configuration(util.getConfiguration());
    conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A");
    conf.set(ImportTsv.BULK_OUTPUT_CONF_KEY, "/output");
    conf.set(ImportTsv.CREATE_TABLE_CONF_KEY, "no");
    exception.expect(TableNotFoundException.class);
    assertEquals("running test job configuration failed.", 0,
        ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
              @Override public int run(String[] args) throws Exception {
                createSubmittableJob(getConf(), args);
                return 0;
              }
            }, args));
  }

  @Test
  public void testMRWithoutAnExistingTable() throws Exception {
    String[] args =
        new String[] { tn.getNameAsString(), "/inputFile" };

    exception.expect(TableNotFoundException.class);
    assertEquals("running test job configuration failed.", 0, ToolRunner.run(
        new Configuration(util.getConfiguration()),
        new ImportTsv() {
          @Override
          public int run(String[] args) throws Exception {
            createSubmittableJob(getConf(), args);
            return 0;
          }
        }, args));
  }

  @Test
  public void testJobConfigurationsWithDryMode() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    String INPUT_FILE = "InputFile1.csv";
    // Prepare the arguments required for the test.
    String[] argsArray = new String[] {
        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(),
        "-D" + ImportTsv.DRY_RUN_CONF_KEY + "=true",
        tn.getNameAsString(),
        INPUT_FILE };
    assertEquals("running test job configuration failed.", 0, ToolRunner.run(
        new Configuration(util.getConfiguration()),
        new ImportTsv() {
          @Override
          public int run(String[] args) throws Exception {
            Job job = createSubmittableJob(getConf(), args);
            assertTrue(job.getOutputFormatClass().equals(NullOutputFormat.class));
            return 0;
          }
        }, argsArray));
    // Delete table created by createSubmittableJob.
    util.deleteTable(tn);
  }

  @Test
  public void testDryModeWithoutBulkOutputAndTableExists() throws Exception {
    util.createTable(tn, FAMILY);
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    doMROnTableTest(null, 1);
    // Dry mode should not delete an existing table. If it's not present,
    // this will throw TableNotFoundException.
    util.deleteTable(tn);
  }

  /**
   * If the table is not present in non-bulk mode, a dry run should fail just
   * like a normal run.
   */
  @Test
  public void testDryModeWithoutBulkOutputAndTableDoesNotExists() throws Exception {
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    exception.expect(TableNotFoundException.class);
    doMROnTableTest(null, 1);
  }

  @Test
  public void testDryModeWithBulkOutputAndTableExists() throws Exception {
    util.createTable(tn, FAMILY);
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    doMROnTableTest(null, 1);
    // Dry mode should not delete an existing table. If it's not present,
    // this will throw TableNotFoundException.
    util.deleteTable(tn);
  }

  /**
   * If the table is not present in bulk mode and create.table is not set to yes,
   * the import should fail with TableNotFoundException.
   */
  @Test
  public void testDryModeWithBulkOutputAndTableDoesNotExistsCreateTableSetToNo()
      throws Exception {
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "no");
    exception.expect(TableNotFoundException.class);
    doMROnTableTest(null, 1);
  }

  @Test
  public void testDryModeWithBulkModeAndTableDoesNotExistsCreateTableSetToYes() throws Exception {
    // Prepare the arguments required for the test.
    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
    args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "yes");
    doMROnTableTest(null, 1);
    // Verify temporary table was deleted.
    exception.expect(TableNotFoundException.class);
    util.deleteTable(tn);
  }

  /**
   * If the input contains invalid rows, only those rows should be ignored.
   */
  @Test
  public void testTsvImporterTextMapperWithInvalidData() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
    // 3 rows of data as input. 2 rows are valid and 1 row is invalid because it has no timestamp.
    String data = "KEY,1234,VALUE1,VALUE2\nKEY\nKEY,1235,VALUE1,VALUE2\n";
    doMROnTableTest(util, tn, FAMILY, data, args, 1, 4);
    util.deleteTable(tn);
  }

  @Test
  public void testSkipEmptyColumns() throws Exception {
    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
    args.put(ImportTsv.SKIP_EMPTY_COLUMNS, "true");
    // 2 rows of data as input. Both rows are valid, but only 3 of the 4 column values are non-empty.
    String data = "KEY,1234,VALUE1,VALUE2\nKEY,1235,,VALUE2\n";
    doMROnTableTest(util, tn, FAMILY, data, args, 1, 3);
    util.deleteTable(tn);
  }

  private Tool doMROnTableTest(String data, int valueMultiplier) throws Exception {
    return doMROnTableTest(util, tn, FAMILY, data, args, valueMultiplier, -1);
  }

  protected static Tool doMROnTableTest(HBaseTestingUtility util, TableName table,
      String family, String data, Map<String, String> args) throws Exception {
    return doMROnTableTest(util, table, family, data, args, 1, -1);
  }

  /**
   * Run an ImportTsv job and perform basic validation on the results.
   * Returns the ImportTsv <code>Tool</code> instance so that other tests can
   * inspect it for further validation as necessary. This method is static to
   * ensure non-reliance on the instance's util/conf facilities.
   * @param args Any arguments to pass BEFORE inputFile path is appended.
   * @return The Tool instance used to run the test.
   */
  protected static Tool doMROnTableTest(HBaseTestingUtility util, TableName table,
      String family, String data, Map<String, String> args, int valueMultiplier,
      int expectedKVCount) throws Exception {
    Configuration conf = new Configuration(util.getConfiguration());

    // populate input file
    FileSystem fs = FileSystem.get(conf);
    Path inputPath = fs.makeQualified(
            new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
    FSDataOutputStream op = fs.create(inputPath, true);
    if (data == null) {
      data = "KEY\u001bVALUE1\u001bVALUE2\n";
    }
    op.write(Bytes.toBytes(data));
    op.close();
    LOG.debug(String.format("Wrote test data to file: %s", inputPath));

    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
      LOG.debug("Forcing combiner.");
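      // Setting mapreduce.map.combine.minspills to 1 forces the combiner to run on every spill
      // (the MapReduce default is 3).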
      conf.setInt("mapreduce.map.combine.minspills", 1);
    }

    // Build args array.
    String[] argsArray = new String[args.size() + 2];
    Iterator<Map.Entry<String, String>> it = args.entrySet().iterator();
    int i = 0;
    while (it.hasNext()) {
      Map.Entry<String, String> pair = it.next();
      argsArray[i] = "-D" + pair.getKey() + "=" + pair.getValue();
      i++;
    }
    argsArray[i] = table.getNameAsString();
    argsArray[i + 1] = inputPath.toString();

    // run the import
    Tool tool = new ImportTsv();
    LOG.debug("Running ImportTsv with arguments: " + Arrays.toString(argsArray));
    assertEquals(0, ToolRunner.run(conf, tool, argsArray));

    // Perform basic validation. If the input args did not include
    // ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table.
    // Otherwise, validate presence of hfiles.
    boolean isDryRun = args.containsKey(ImportTsv.DRY_RUN_CONF_KEY) &&
        "true".equalsIgnoreCase(args.get(ImportTsv.DRY_RUN_CONF_KEY));
    if (args.containsKey(ImportTsv.BULK_OUTPUT_CONF_KEY)) {
      if (isDryRun) {
        // Check the configured bulk output directory, not a path named after the config key.
        assertFalse(String.format("Dry run mode, %s should not have been created.",
                 ImportTsv.BULK_OUTPUT_CONF_KEY),
            fs.exists(new Path(args.get(ImportTsv.BULK_OUTPUT_CONF_KEY))));
      } else {
        validateHFiles(fs, args.get(ImportTsv.BULK_OUTPUT_CONF_KEY), family, expectedKVCount);
      }
    } else {
      validateTable(conf, table, family, valueMultiplier, isDryRun);
    }

    if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
      LOG.debug("Deleting test subdirectory");
      util.cleanupDataTestDirOnTestFS(table.getNameAsString());
    }
    return tool;
  }

  /**
   * Confirm ImportTsv via data in online table.
   */
  private static void validateTable(Configuration conf, TableName tableName,
      String family, int valueMultiplier, boolean isDryRun) throws IOException {

    LOG.debug("Validating table.");
    Connection connection = ConnectionFactory.createConnection(conf);
    Table table = connection.getTable(tableName);
    boolean verified = false;
    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
    int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
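    // Edits may still be arriving after the job completes, so retry the validation a few times.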
    for (int i = 0; i < numRetries; i++) {
      try {
        Scan scan = new Scan();
        // Scan entire family.
        scan.addFamily(Bytes.toBytes(family));
        ResultScanner resScanner = table.getScanner(scan);
        int numRows = 0;
        for (Result res : resScanner) {
          numRows++;
          assertEquals(2, res.size());
          List<Cell> kvs = res.listCells();
          assertTrue(CellUtil.matchingRows(kvs.get(0), Bytes.toBytes("KEY")));
          assertTrue(CellUtil.matchingRows(kvs.get(1), Bytes.toBytes("KEY")));
          assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier)));
          assertTrue(CellUtil.matchingValue(kvs.get(1), Bytes.toBytes("VALUE" + 2 * valueMultiplier)));
          // Only one result set is expected, so let it loop.
        }
        if (isDryRun) {
          assertEquals(0, numRows);
        } else {
          assertEquals(1, numRows);
        }
        verified = true;
        break;
      } catch (NullPointerException e) {
        // If here, a cell was empty. Presume it's because updates came in
        // after the scanner had been opened. Wait a while and retry.
      }
      try {
        Thread.sleep(pause);
      } catch (InterruptedException e) {
        // continue
      }
    }
    table.close();
    connection.close();
    assertTrue(verified);
  }

  /**
   * Confirm ImportTsv via HFiles on fs.
   */
  private static void validateHFiles(FileSystem fs, String outputPath, String family,
      int expectedKVCount) throws IOException {
    // validate number and content of output columns
    LOG.debug("Validating HFiles.");
    Set<String> configFamilies = new HashSet<>();
    configFamilies.add(family);
    Set<String> foundFamilies = new HashSet<>();
    int actualKVCount = 0;
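    // OutputFilesFilter skips bookkeeping files such as _SUCCESS, so each remaining
    // entry under the output path is a column family directory.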
    for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) {
      String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR);
      String cf = elements[elements.length - 1];
      foundFamilies.add(cf);
      assertTrue(
        String.format(
          "HFile output contains a column family (%s) not present in input families (%s)",
          cf, configFamilies),
          configFamilies.contains(cf));
      for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) {
        assertTrue(
          String.format("HFile %s appears to contain no data.", hfile.getPath()),
          hfile.getLen() > 0);
        // count the number of KVs from all the hfiles
        if (expectedKVCount > -1) {
          actualKVCount += getKVCountFromHfile(fs, hfile.getPath());
        }
      }
    }
    assertTrue(String.format("HFile output does not contain the input family '%s'.", family),
        foundFamilies.contains(family));
    if (expectedKVCount > -1) {
      assertTrue(String.format(
        "KV count in output hfile=<%d> doesn't match expected KV count=<%d>", actualKVCount,
        expectedKVCount), actualKVCount == expectedKVCount);
    }
  }

  /**
   * Returns the total number of KVs in the given hfile.
   * @param fs File System
   * @param p HFile path
   * @return KV count in the given hfile
   * @throws IOException
   */
  private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException {
    Configuration conf = util.getConfiguration();
    HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
    reader.loadFileInfo();
    HFileScanner scanner = reader.getScanner(false, false);
    scanner.seekTo();
    int count = 0;
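    // seekTo() positioned the scanner on the first cell, so count it before advancing with next().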
    do {
      count++;
    } while (scanner.next());
    reader.close();
    return count;
  }
}