001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertThrows;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024
025import java.io.IOException;
026import java.util.Arrays;
027import java.util.HashMap;
028import java.util.HashSet;
029import java.util.Iterator;
030import java.util.List;
031import java.util.Map;
032import java.util.Set;
033import org.apache.hadoop.conf.Configurable;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FSDataOutputStream;
036import org.apache.hadoop.fs.FileStatus;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.Cell;
040import org.apache.hadoop.hbase.CellUtil;
041import org.apache.hadoop.hbase.HBaseTestingUtil;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.TableNotFoundException;
045import org.apache.hadoop.hbase.client.Connection;
046import org.apache.hadoop.hbase.client.ConnectionFactory;
047import org.apache.hadoop.hbase.client.Result;
048import org.apache.hadoop.hbase.client.ResultScanner;
049import org.apache.hadoop.hbase.client.Scan;
050import org.apache.hadoop.hbase.client.Table;
051import org.apache.hadoop.hbase.io.hfile.CacheConfig;
052import org.apache.hadoop.hbase.io.hfile.HFile;
053import org.apache.hadoop.hbase.io.hfile.HFileScanner;
054import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
055import org.apache.hadoop.hbase.testclassification.LargeTests;
056import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.io.Text;
059import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
060import org.apache.hadoop.mapreduce.Job;
061import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
062import org.apache.hadoop.util.Tool;
063import org.apache.hadoop.util.ToolRunner;
064import org.junit.jupiter.api.AfterAll;
065import org.junit.jupiter.api.BeforeAll;
066import org.junit.jupiter.api.BeforeEach;
067import org.junit.jupiter.api.Tag;
068import org.junit.jupiter.api.Test;
069import org.slf4j.Logger;
070import org.slf4j.LoggerFactory;
071
072@Tag(VerySlowMapReduceTests.TAG)
073@Tag(LargeTests.TAG)
074public class TestImportTsv implements Configurable {
075
076  private static final Logger LOG = LoggerFactory.getLogger(TestImportTsv.class);
077  protected static final String NAME = TestImportTsv.class.getSimpleName();
078  protected static HBaseTestingUtil util = new HBaseTestingUtil();
079
080  // Delete the tmp directory after running doMROnTableTest. Boolean. Default is true.
081  protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";
082
083  /**
084   * Force use of combiner in doMROnTableTest. Boolean. Default is true.
085   */
086  protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
087
088  private final String FAMILY = "FAM";
089  private TableName tn;
090  private Map<String, String> args;
091
092  public Configuration getConf() {
093    return util.getConfiguration();
094  }
095
096  public void setConf(Configuration conf) {
097    throw new IllegalArgumentException("setConf not supported");
098  }
099
100  @BeforeAll
101  public static void provisionCluster() throws Exception {
102    util.startMiniCluster();
103  }
104
105  @AfterAll
106  public static void releaseCluster() throws Exception {
107    util.shutdownMiniCluster();
108  }
109
110  @BeforeEach
111  public void setup() throws Exception {
112    tn = TableName.valueOf("test-" + util.getRandomUUID());
113    args = new HashMap<>();
114    // Prepare the arguments required for the test.
115    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A,FAM:B");
116    args.put(ImportTsv.SEPARATOR_CONF_KEY, "\u001b");
117  }
118
119  @Test
120  public void testMROnTable() throws Exception {
121    util.createTable(tn, FAMILY);
122    doMROnTableTest(null, 1);
123    util.deleteTable(tn);
124  }
125
126  @Test
127  public void testMROnTableWithTimestamp() throws Exception {
128    util.createTable(tn, FAMILY);
129    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
130    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
131    String data = "KEY,1234,VALUE1,VALUE2\n";
132
133    doMROnTableTest(data, 1);
134    util.deleteTable(tn);
135  }
136
137  @Test
138  public void testMROnTableWithCustomMapper() throws Exception {
139    util.createTable(tn, FAMILY);
140    args.put(ImportTsv.MAPPER_CONF_KEY,
141      "org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper");
142
143    doMROnTableTest(null, 3);
144    util.deleteTable(tn);
145  }
146
147  @Test
148  public void testBulkOutputWithoutAnExistingTable() throws Exception {
149    // Prepare the arguments required for the test.
150    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
151    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
152
153    doMROnTableTest(null, 3);
154    util.deleteTable(tn);
155  }
156
157  @Test
158  public void testBulkOutputWithAnExistingTable() throws Exception {
159    util.createTable(tn, FAMILY);
160
161    // Prepare the arguments required for the test.
162    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
163    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
164
165    doMROnTableTest(null, 3);
166    util.deleteTable(tn);
167  }
168
169  @Test
170  public void testBulkOutputWithAnExistingTableNoStrictTrue() throws Exception {
171    util.createTable(tn, FAMILY);
172
173    // Prepare the arguments required for the test.
174    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
175    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
176    args.put(ImportTsv.NO_STRICT_COL_FAMILY, "true");
177    doMROnTableTest(null, 3);
178    util.deleteTable(tn);
179  }
180
181  @Test
182  public void testJobConfigurationsWithTsvImporterTextMapper() throws Exception {
183    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
184    String INPUT_FILE = "InputFile1.csv";
185    // Prepare the arguments required for the test.
186    String[] args = new String[] {
187      "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
188      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
189      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
190      "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), tn.getNameAsString(),
191      INPUT_FILE };
192    assertEquals(0, ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
193      @Override
194      public int run(String[] args) throws Exception {
195        Job job = createSubmittableJob(getConf(), args);
196        assertTrue(job.getMapperClass().equals(TsvImporterTextMapper.class));
197        assertTrue(job.getReducerClass().equals(TextSortReducer.class));
198        assertTrue(job.getMapOutputValueClass().equals(Text.class));
199        return 0;
200      }
201    }, args), "running test job configuration failed.");
202    // Delete table created by createSubmittableJob.
203    util.deleteTable(tn);
204  }
205
206  @Test
207  public void testBulkOutputWithTsvImporterTextMapper() throws Exception {
208    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
209    args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper");
210    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
211    String data = "KEY\u001bVALUE4\u001bVALUE8\n";
212    doMROnTableTest(data, 4);
213    util.deleteTable(tn);
214  }
215
216  @Test
217  public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
218    String[] args = new String[] { tn.getNameAsString(), "/inputFile" };
219
220    Configuration conf = new Configuration(util.getConfiguration());
221    conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A");
222    conf.set(ImportTsv.BULK_OUTPUT_CONF_KEY, "/output");
223    conf.set(ImportTsv.CREATE_TABLE_CONF_KEY, "no");
224    assertThrows(TableNotFoundException.class, () -> {
225      ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
226        @Override
227        public int run(String[] args) throws Exception {
228          createSubmittableJob(getConf(), args);
229          return 0;
230        }
231      }, args);
232    }, "running test job configuration failed.");
233  }
234
235  @Test
236  public void testMRNoMatchedColumnFamily() throws Exception {
237    util.createTable(tn, FAMILY);
238
239    String[] args = new String[] {
240      "-D" + ImportTsv.COLUMNS_CONF_KEY
241        + "=HBASE_ROW_KEY,FAM:A,FAM01_ERROR:A,FAM01_ERROR:B,FAM02_ERROR:C",
242      tn.getNameAsString(), "/inputFile" };
243    assertThrows(NoSuchColumnFamilyException.class, () -> {
244      ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
245        @Override
246        public int run(String[] args) throws Exception {
247          createSubmittableJob(getConf(), args);
248          return 0;
249        }
250      }, args);
251    }, "running test job configuration failed.");
252
253    util.deleteTable(tn);
254  }
255
256  @Test
257  public void testMRWithoutAnExistingTable() throws Exception {
258    String[] args = new String[] { tn.getNameAsString(), "/inputFile" };
259
260    assertThrows(TableNotFoundException.class, () -> {
261      ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
262        @Override
263        public int run(String[] args) throws Exception {
264          createSubmittableJob(getConf(), args);
265          return 0;
266        }
267      }, args);
268    }, "running test job configuration failed.");
269  }
270
271  @Test
272  public void testJobConfigurationsWithDryMode() throws Exception {
273    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
274    String INPUT_FILE = "InputFile1.csv";
275    // Prepare the arguments required for the test.
276    String[] argsArray =
277      new String[] { "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
278        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
279        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(),
280        "-D" + ImportTsv.DRY_RUN_CONF_KEY + "=true", tn.getNameAsString(), INPUT_FILE };
281    assertEquals(0, ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
282      @Override
283      public int run(String[] args) throws Exception {
284        Job job = createSubmittableJob(getConf(), args);
285        assertTrue(job.getOutputFormatClass().equals(NullOutputFormat.class));
286        return 0;
287      }
288    }, argsArray), "running test job configuration failed.");
289    // Delete table created by createSubmittableJob.
290    util.deleteTable(tn);
291  }
292
293  @Test
294  public void testDryModeWithoutBulkOutputAndTableExists() throws Exception {
295    util.createTable(tn, FAMILY);
296    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
297    doMROnTableTest(null, 1);
298    // Dry mode should not delete an existing table. If it's not present,
299    // this will throw TableNotFoundException.
300    util.deleteTable(tn);
301  }
302
303  /**
304   * If table is not present in non-bulk mode, dry run should fail just like normal mode.
305   */
306  @Test
307  public void testDryModeWithoutBulkOutputAndTableDoesNotExists() throws Exception {
308    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
309    assertThrows(TableNotFoundException.class, () -> doMROnTableTest(null, 1));
310  }
311
312  @Test
313  public void testDryModeWithBulkOutputAndTableExists() throws Exception {
314    util.createTable(tn, FAMILY);
315    // Prepare the arguments required for the test.
316    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
317    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
318    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
319    doMROnTableTest(null, 1);
320    // Dry mode should not delete an existing table. If it's not present,
321    // this will throw TableNotFoundException.
322    util.deleteTable(tn);
323  }
324
325  /**
326   * If table is not present in bulk mode and create.table is not set to yes, import should fail
327   * with TableNotFoundException.
328   */
329  @Test
330  public void testDryModeWithBulkOutputAndTableDoesNotExistsCreateTableSetToNo() throws Exception {
331    // Prepare the arguments required for the test.
332    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
333    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
334    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
335    args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "no");
336    assertThrows(TableNotFoundException.class, () -> doMROnTableTest(null, 1));
337  }
338
339  @Test
340  public void testDryModeWithBulkModeAndTableDoesNotExistsCreateTableSetToYes() throws Exception {
341    // Prepare the arguments required for the test.
342    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
343    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
344    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
345    args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "yes");
346    doMROnTableTest(null, 1);
347    // Verify temporary table was deleted.
348    assertThrows(TableNotFoundException.class, () -> util.deleteTable(tn));
349  }
350
351  /**
352   * If there are invalid data rows as inputs, then only those rows should be ignored.
353   */
354  @Test
355  public void testTsvImporterTextMapperWithInvalidData() throws Exception {
356    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
357    args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper");
358    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
359    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
360    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
361    // 3 Rows of data as input. 2 Rows are valid and 1 row is invalid as it doesn't have TS
362    String data = "KEY,1234,VALUE1,VALUE2\nKEY\nKEY,1235,VALUE1,VALUE2\n";
363    doMROnTableTest(util, tn, FAMILY, data, args, 1, 4);
364    util.deleteTable(tn);
365  }
366
367  @Test
368  public void testSkipEmptyColumns() throws Exception {
369    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
370    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
371    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
372    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
373    args.put(ImportTsv.SKIP_EMPTY_COLUMNS, "true");
374    // 2 Rows of data as input. Both rows are valid and only 3 columns are no-empty among 4
375    String data = "KEY,1234,VALUE1,VALUE2\nKEY,1235,,VALUE2\n";
376    doMROnTableTest(util, tn, FAMILY, data, args, 1, 3);
377    util.deleteTable(tn);
378  }
379
380  private Tool doMROnTableTest(String data, int valueMultiplier) throws Exception {
381    return doMROnTableTest(util, tn, FAMILY, data, args, valueMultiplier, -1);
382  }
383
384  protected static Tool doMROnTableTest(HBaseTestingUtil util, TableName table, String family,
385    String data, Map<String, String> args) throws Exception {
386    return doMROnTableTest(util, table, family, data, args, 1, -1);
387  }
388
389  /**
390   * Run an ImportTsv job and perform basic validation on the results. Returns the ImportTsv
391   * <code>Tool</code> instance so that other tests can inspect it for further validation as
392   * necessary. This method is static to insure non-reliance on instance's util/conf facilities.
393   * @param args Any arguments to pass BEFORE inputFile path is appended.
394   * @return The Tool instance used to run the test.
395   */
396  protected static Tool doMROnTableTest(HBaseTestingUtil util, TableName table, String family,
397    String data, Map<String, String> args, int valueMultiplier, int expectedKVCount)
398    throws Exception {
399    Configuration conf = new Configuration(util.getConfiguration());
400
401    // populate input file
402    FileSystem fs = FileSystem.get(conf);
403    Path inputPath =
404      fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
405    FSDataOutputStream op = fs.create(inputPath, true);
406    if (data == null) {
407      data = "KEY\u001bVALUE1\u001bVALUE2\n";
408    }
409    op.write(Bytes.toBytes(data));
410    op.close();
411    LOG.debug(String.format("Wrote test data to file: %s", inputPath));
412
413    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
414      LOG.debug("Forcing combiner.");
415      conf.setInt("mapreduce.map.combine.minspills", 1);
416    }
417
418    // Build args array.
419    String[] argsArray = new String[args.size() + 2];
420    Iterator it = args.entrySet().iterator();
421    int i = 0;
422    while (it.hasNext()) {
423      Map.Entry pair = (Map.Entry) it.next();
424      argsArray[i] = "-D" + pair.getKey() + "=" + pair.getValue();
425      i++;
426    }
427    argsArray[i] = table.getNameAsString();
428    argsArray[i + 1] = inputPath.toString();
429
430    // run the import
431    Tool tool = new ImportTsv();
432    LOG.debug("Running ImportTsv with arguments: " + Arrays.toString(argsArray));
433    assertEquals(0, ToolRunner.run(conf, tool, argsArray));
434
435    // Perform basic validation. If the input args did not include
436    // ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table.
437    // Otherwise, validate presence of hfiles.
438    boolean isDryRun = args.containsKey(ImportTsv.DRY_RUN_CONF_KEY)
439      && "true".equalsIgnoreCase(args.get(ImportTsv.DRY_RUN_CONF_KEY));
440    if (args.containsKey(ImportTsv.BULK_OUTPUT_CONF_KEY)) {
441      if (isDryRun) {
442        assertFalse(fs.exists(new Path(ImportTsv.BULK_OUTPUT_CONF_KEY)), String.format(
443          "Dry run mode, %s should not have been created.", ImportTsv.BULK_OUTPUT_CONF_KEY));
444      } else {
445        validateHFiles(fs, args.get(ImportTsv.BULK_OUTPUT_CONF_KEY), family, expectedKVCount);
446      }
447    } else {
448      validateTable(conf, table, family, valueMultiplier, isDryRun);
449    }
450
451    if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
452      LOG.debug("Deleting test subdirectory");
453      util.cleanupDataTestDirOnTestFS(table.getNameAsString());
454    }
455    return tool;
456  }
457
458  /**
459   * Confirm ImportTsv via data in online table.
460   */
461  private static void validateTable(Configuration conf, TableName tableName, String family,
462    int valueMultiplier, boolean isDryRun) throws IOException {
463
464    LOG.debug("Validating table.");
465    Connection connection = ConnectionFactory.createConnection(conf);
466    Table table = connection.getTable(tableName);
467    boolean verified = false;
468    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
469    int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
470    for (int i = 0; i < numRetries; i++) {
471      try {
472        Scan scan = new Scan();
473        // Scan entire family.
474        scan.addFamily(Bytes.toBytes(family));
475        ResultScanner resScanner = table.getScanner(scan);
476        int numRows = 0;
477        for (Result res : resScanner) {
478          numRows++;
479          assertEquals(2, res.size());
480          List<Cell> kvs = res.listCells();
481          assertTrue(CellUtil.matchingRows(kvs.get(0), Bytes.toBytes("KEY")));
482          assertTrue(CellUtil.matchingRows(kvs.get(1), Bytes.toBytes("KEY")));
483          assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier)));
484          assertTrue(
485            CellUtil.matchingValue(kvs.get(1), Bytes.toBytes("VALUE" + 2 * valueMultiplier)));
486          // Only one result set is expected, so let it loop.
487        }
488        if (isDryRun) {
489          assertEquals(0, numRows);
490        } else {
491          assertEquals(1, numRows);
492        }
493        verified = true;
494        break;
495      } catch (NullPointerException e) {
496        // If here, a cell was empty. Presume its because updates came in
497        // after the scanner had been opened. Wait a while and retry.
498      }
499      try {
500        Thread.sleep(pause);
501      } catch (InterruptedException e) {
502        // continue
503      }
504    }
505    table.close();
506    connection.close();
507    assertTrue(verified);
508  }
509
510  /**
511   * Confirm ImportTsv via HFiles on fs.
512   */
513  private static void validateHFiles(FileSystem fs, String outputPath, String family,
514    int expectedKVCount) throws IOException {
515    // validate number and content of output columns
516    LOG.debug("Validating HFiles.");
517    Set<String> configFamilies = new HashSet<>();
518    configFamilies.add(family);
519    Set<String> foundFamilies = new HashSet<>();
520    int actualKVCount = 0;
521    for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) {
522      String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR);
523      String cf = elements[elements.length - 1];
524      foundFamilies.add(cf);
525      assertTrue(configFamilies.contains(cf),
526        String.format(
527          "HFile output contains a column family (%s) not present in input families (%s)", cf,
528          configFamilies));
529      for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) {
530        assertTrue(hfile.getLen() > 0,
531          String.format("HFile %s appears to contain no data.", hfile.getPath()));
532        // count the number of KVs from all the hfiles
533        if (expectedKVCount > -1) {
534          actualKVCount += getKVCountFromHfile(fs, hfile.getPath());
535        }
536      }
537    }
538    assertTrue(foundFamilies.contains(family),
539      String.format("HFile output does not contain the input family '%s'.", family));
540    if (expectedKVCount > -1) {
541      assertTrue(actualKVCount == expectedKVCount,
542        String.format("KV count in ouput hfile=<%d> doesn't match with expected KV count=<%d>",
543          actualKVCount, expectedKVCount));
544    }
545  }
546
547  /**
548   * Method returns the total KVs in given hfile
549   * @param fs File System
550   * @param p  HFile path
551   * @return KV count in the given hfile
552   */
553  private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException {
554    Configuration conf = util.getConfiguration();
555    HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
556    HFileScanner scanner = reader.getScanner(conf, false, false);
557    scanner.seekTo();
558    int count = 0;
559    do {
560      count++;
561    } while (scanner.next());
562    reader.close();
563    return count;
564  }
565}