001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.Arrays;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.Iterator;
029import java.util.List;
030import java.util.Map;
031import java.util.Set;
032import org.apache.hadoop.conf.Configurable;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FSDataOutputStream;
035import org.apache.hadoop.fs.FileStatus;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.hbase.Cell;
039import org.apache.hadoop.hbase.CellUtil;
040import org.apache.hadoop.hbase.HBaseClassTestRule;
041import org.apache.hadoop.hbase.HBaseTestingUtil;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.TableNotFoundException;
045import org.apache.hadoop.hbase.client.Connection;
046import org.apache.hadoop.hbase.client.ConnectionFactory;
047import org.apache.hadoop.hbase.client.Result;
048import org.apache.hadoop.hbase.client.ResultScanner;
049import org.apache.hadoop.hbase.client.Scan;
050import org.apache.hadoop.hbase.client.Table;
051import org.apache.hadoop.hbase.io.hfile.CacheConfig;
052import org.apache.hadoop.hbase.io.hfile.HFile;
053import org.apache.hadoop.hbase.io.hfile.HFileScanner;
054import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
055import org.apache.hadoop.hbase.testclassification.LargeTests;
056import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.io.Text;
059import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
060import org.apache.hadoop.mapreduce.Job;
061import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
062import org.apache.hadoop.util.Tool;
063import org.apache.hadoop.util.ToolRunner;
064import org.junit.AfterClass;
065import org.junit.Before;
066import org.junit.BeforeClass;
067import org.junit.ClassRule;
068import org.junit.Rule;
069import org.junit.Test;
070import org.junit.experimental.categories.Category;
071import org.junit.rules.ExpectedException;
072import org.slf4j.Logger;
073import org.slf4j.LoggerFactory;
074
075@Category({ VerySlowMapReduceTests.class, LargeTests.class })
076public class TestImportTsv implements Configurable {
077
078  @ClassRule
079  public static final HBaseClassTestRule CLASS_RULE =
080    HBaseClassTestRule.forClass(TestImportTsv.class);
081
082  private static final Logger LOG = LoggerFactory.getLogger(TestImportTsv.class);
083  protected static final String NAME = TestImportTsv.class.getSimpleName();
084  protected static HBaseTestingUtil util = new HBaseTestingUtil();
085
086  // Delete the tmp directory after running doMROnTableTest. Boolean. Default is true.
087  protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";
088
089  /**
090   * Force use of combiner in doMROnTableTest. Boolean. Default is true.
091   */
092  protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
093
094  private final String FAMILY = "FAM";
095  private TableName tn;
096  private Map<String, String> args;
097
098  @Rule
099  public ExpectedException exception = ExpectedException.none();
100
101  public Configuration getConf() {
102    return util.getConfiguration();
103  }
104
105  public void setConf(Configuration conf) {
106    throw new IllegalArgumentException("setConf not supported");
107  }
108
109  @BeforeClass
110  public static void provisionCluster() throws Exception {
111    util.startMiniCluster();
112  }
113
114  @AfterClass
115  public static void releaseCluster() throws Exception {
116    util.shutdownMiniCluster();
117  }
118
119  @Before
120  public void setup() throws Exception {
121    tn = TableName.valueOf("test-" + util.getRandomUUID());
122    args = new HashMap<>();
123    // Prepare the arguments required for the test.
124    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A,FAM:B");
125    args.put(ImportTsv.SEPARATOR_CONF_KEY, "\u001b");
126  }
127
128  @Test
129  public void testMROnTable() throws Exception {
130    util.createTable(tn, FAMILY);
131    doMROnTableTest(null, 1);
132    util.deleteTable(tn);
133  }
134
135  @Test
136  public void testMROnTableWithTimestamp() throws Exception {
137    util.createTable(tn, FAMILY);
138    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
139    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
140    String data = "KEY,1234,VALUE1,VALUE2\n";
141
142    doMROnTableTest(data, 1);
143    util.deleteTable(tn);
144  }
145
146  @Test
147  public void testMROnTableWithCustomMapper() throws Exception {
148    util.createTable(tn, FAMILY);
149    args.put(ImportTsv.MAPPER_CONF_KEY,
150      "org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper");
151
152    doMROnTableTest(null, 3);
153    util.deleteTable(tn);
154  }
155
156  @Test
157  public void testBulkOutputWithoutAnExistingTable() throws Exception {
158    // Prepare the arguments required for the test.
159    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
160    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
161
162    doMROnTableTest(null, 3);
163    util.deleteTable(tn);
164  }
165
166  @Test
167  public void testBulkOutputWithAnExistingTable() throws Exception {
168    util.createTable(tn, FAMILY);
169
170    // Prepare the arguments required for the test.
171    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
172    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
173
174    doMROnTableTest(null, 3);
175    util.deleteTable(tn);
176  }
177
178  @Test
179  public void testBulkOutputWithAnExistingTableNoStrictTrue() throws Exception {
180    util.createTable(tn, FAMILY);
181
182    // Prepare the arguments required for the test.
183    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
184    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
185    args.put(ImportTsv.NO_STRICT_COL_FAMILY, "true");
186    doMROnTableTest(null, 3);
187    util.deleteTable(tn);
188  }
189
190  @Test
191  public void testJobConfigurationsWithTsvImporterTextMapper() throws Exception {
192    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
193    String INPUT_FILE = "InputFile1.csv";
194    // Prepare the arguments required for the test.
195    String[] args = new String[] {
196      "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
197      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
198      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
199      "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), tn.getNameAsString(),
200      INPUT_FILE };
201    assertEquals("running test job configuration failed.", 0,
202      ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
203        @Override
204        public int run(String[] args) throws Exception {
205          Job job = createSubmittableJob(getConf(), args);
206          assertTrue(job.getMapperClass().equals(TsvImporterTextMapper.class));
207          assertTrue(job.getReducerClass().equals(TextSortReducer.class));
208          assertTrue(job.getMapOutputValueClass().equals(Text.class));
209          return 0;
210        }
211      }, args));
212    // Delete table created by createSubmittableJob.
213    util.deleteTable(tn);
214  }
215
216  @Test
217  public void testBulkOutputWithTsvImporterTextMapper() throws Exception {
218    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
219    args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper");
220    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
221    String data = "KEY\u001bVALUE4\u001bVALUE8\n";
222    doMROnTableTest(data, 4);
223    util.deleteTable(tn);
224  }
225
226  @Test
227  public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
228    String[] args = new String[] { tn.getNameAsString(), "/inputFile" };
229
230    Configuration conf = new Configuration(util.getConfiguration());
231    conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A");
232    conf.set(ImportTsv.BULK_OUTPUT_CONF_KEY, "/output");
233    conf.set(ImportTsv.CREATE_TABLE_CONF_KEY, "no");
234    exception.expect(TableNotFoundException.class);
235    assertEquals("running test job configuration failed.", 0,
236      ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
237        @Override
238        public int run(String[] args) throws Exception {
239          createSubmittableJob(getConf(), args);
240          return 0;
241        }
242      }, args));
243  }
244
245  @Test
246  public void testMRNoMatchedColumnFamily() throws Exception {
247    util.createTable(tn, FAMILY);
248
249    String[] args = new String[] {
250      "-D" + ImportTsv.COLUMNS_CONF_KEY
251        + "=HBASE_ROW_KEY,FAM:A,FAM01_ERROR:A,FAM01_ERROR:B,FAM02_ERROR:C",
252      tn.getNameAsString(), "/inputFile" };
253    exception.expect(NoSuchColumnFamilyException.class);
254    assertEquals("running test job configuration failed.", 0,
255      ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
256        @Override
257        public int run(String[] args) throws Exception {
258          createSubmittableJob(getConf(), args);
259          return 0;
260        }
261      }, args));
262
263    util.deleteTable(tn);
264  }
265
266  @Test
267  public void testMRWithoutAnExistingTable() throws Exception {
268    String[] args = new String[] { tn.getNameAsString(), "/inputFile" };
269
270    exception.expect(TableNotFoundException.class);
271    assertEquals("running test job configuration failed.", 0,
272      ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
273        @Override
274        public int run(String[] args) throws Exception {
275          createSubmittableJob(getConf(), args);
276          return 0;
277        }
278      }, args));
279  }
280
281  @Test
282  public void testJobConfigurationsWithDryMode() throws Exception {
283    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
284    String INPUT_FILE = "InputFile1.csv";
285    // Prepare the arguments required for the test.
286    String[] argsArray =
287      new String[] { "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
288        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
289        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(),
290        "-D" + ImportTsv.DRY_RUN_CONF_KEY + "=true", tn.getNameAsString(), INPUT_FILE };
291    assertEquals("running test job configuration failed.", 0,
292      ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
293        @Override
294        public int run(String[] args) throws Exception {
295          Job job = createSubmittableJob(getConf(), args);
296          assertTrue(job.getOutputFormatClass().equals(NullOutputFormat.class));
297          return 0;
298        }
299      }, argsArray));
300    // Delete table created by createSubmittableJob.
301    util.deleteTable(tn);
302  }
303
304  @Test
305  public void testDryModeWithoutBulkOutputAndTableExists() throws Exception {
306    util.createTable(tn, FAMILY);
307    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
308    doMROnTableTest(null, 1);
309    // Dry mode should not delete an existing table. If it's not present,
310    // this will throw TableNotFoundException.
311    util.deleteTable(tn);
312  }
313
314  /**
315   * If table is not present in non-bulk mode, dry run should fail just like normal mode.
316   */
317  @Test
318  public void testDryModeWithoutBulkOutputAndTableDoesNotExists() throws Exception {
319    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
320    exception.expect(TableNotFoundException.class);
321    doMROnTableTest(null, 1);
322  }
323
324  @Test
325  public void testDryModeWithBulkOutputAndTableExists() throws Exception {
326    util.createTable(tn, FAMILY);
327    // Prepare the arguments required for the test.
328    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
329    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
330    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
331    doMROnTableTest(null, 1);
332    // Dry mode should not delete an existing table. If it's not present,
333    // this will throw TableNotFoundException.
334    util.deleteTable(tn);
335  }
336
337  /**
338   * If table is not present in bulk mode and create.table is not set to yes, import should fail
339   * with TableNotFoundException.
340   */
341  @Test
342  public void testDryModeWithBulkOutputAndTableDoesNotExistsCreateTableSetToNo() throws Exception {
343    // Prepare the arguments required for the test.
344    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
345    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
346    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
347    args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "no");
348    exception.expect(TableNotFoundException.class);
349    doMROnTableTest(null, 1);
350  }
351
352  @Test
353  public void testDryModeWithBulkModeAndTableDoesNotExistsCreateTableSetToYes() throws Exception {
354    // Prepare the arguments required for the test.
355    Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
356    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
357    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
358    args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "yes");
359    doMROnTableTest(null, 1);
360    // Verify temporary table was deleted.
361    exception.expect(TableNotFoundException.class);
362    util.deleteTable(tn);
363  }
364
365  /**
366   * If there are invalid data rows as inputs, then only those rows should be ignored.
367   */
368  @Test
369  public void testTsvImporterTextMapperWithInvalidData() throws Exception {
370    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
371    args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper");
372    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
373    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
374    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
375    // 3 Rows of data as input. 2 Rows are valid and 1 row is invalid as it doesn't have TS
376    String data = "KEY,1234,VALUE1,VALUE2\nKEY\nKEY,1235,VALUE1,VALUE2\n";
377    doMROnTableTest(util, tn, FAMILY, data, args, 1, 4);
378    util.deleteTable(tn);
379  }
380
381  @Test
382  public void testSkipEmptyColumns() throws Exception {
383    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
384    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
385    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
386    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
387    args.put(ImportTsv.SKIP_EMPTY_COLUMNS, "true");
388    // 2 Rows of data as input. Both rows are valid and only 3 columns are no-empty among 4
389    String data = "KEY,1234,VALUE1,VALUE2\nKEY,1235,,VALUE2\n";
390    doMROnTableTest(util, tn, FAMILY, data, args, 1, 3);
391    util.deleteTable(tn);
392  }
393
394  private Tool doMROnTableTest(String data, int valueMultiplier) throws Exception {
395    return doMROnTableTest(util, tn, FAMILY, data, args, valueMultiplier, -1);
396  }
397
398  protected static Tool doMROnTableTest(HBaseTestingUtil util, TableName table, String family,
399    String data, Map<String, String> args) throws Exception {
400    return doMROnTableTest(util, table, family, data, args, 1, -1);
401  }
402
403  /**
404   * Run an ImportTsv job and perform basic validation on the results. Returns the ImportTsv
405   * <code>Tool</code> instance so that other tests can inspect it for further validation as
406   * necessary. This method is static to insure non-reliance on instance's util/conf facilities.
407   * @param args Any arguments to pass BEFORE inputFile path is appended.
408   * @return The Tool instance used to run the test.
409   */
410  protected static Tool doMROnTableTest(HBaseTestingUtil util, TableName table, String family,
411    String data, Map<String, String> args, int valueMultiplier, int expectedKVCount)
412    throws Exception {
413    Configuration conf = new Configuration(util.getConfiguration());
414
415    // populate input file
416    FileSystem fs = FileSystem.get(conf);
417    Path inputPath =
418      fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
419    FSDataOutputStream op = fs.create(inputPath, true);
420    if (data == null) {
421      data = "KEY\u001bVALUE1\u001bVALUE2\n";
422    }
423    op.write(Bytes.toBytes(data));
424    op.close();
425    LOG.debug(String.format("Wrote test data to file: %s", inputPath));
426
427    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
428      LOG.debug("Forcing combiner.");
429      conf.setInt("mapreduce.map.combine.minspills", 1);
430    }
431
432    // Build args array.
433    String[] argsArray = new String[args.size() + 2];
434    Iterator it = args.entrySet().iterator();
435    int i = 0;
436    while (it.hasNext()) {
437      Map.Entry pair = (Map.Entry) it.next();
438      argsArray[i] = "-D" + pair.getKey() + "=" + pair.getValue();
439      i++;
440    }
441    argsArray[i] = table.getNameAsString();
442    argsArray[i + 1] = inputPath.toString();
443
444    // run the import
445    Tool tool = new ImportTsv();
446    LOG.debug("Running ImportTsv with arguments: " + Arrays.toString(argsArray));
447    assertEquals(0, ToolRunner.run(conf, tool, argsArray));
448
449    // Perform basic validation. If the input args did not include
450    // ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table.
451    // Otherwise, validate presence of hfiles.
452    boolean isDryRun = args.containsKey(ImportTsv.DRY_RUN_CONF_KEY)
453      && "true".equalsIgnoreCase(args.get(ImportTsv.DRY_RUN_CONF_KEY));
454    if (args.containsKey(ImportTsv.BULK_OUTPUT_CONF_KEY)) {
455      if (isDryRun) {
456        assertFalse(String.format("Dry run mode, %s should not have been created.",
457          ImportTsv.BULK_OUTPUT_CONF_KEY), fs.exists(new Path(ImportTsv.BULK_OUTPUT_CONF_KEY)));
458      } else {
459        validateHFiles(fs, args.get(ImportTsv.BULK_OUTPUT_CONF_KEY), family, expectedKVCount);
460      }
461    } else {
462      validateTable(conf, table, family, valueMultiplier, isDryRun);
463    }
464
465    if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
466      LOG.debug("Deleting test subdirectory");
467      util.cleanupDataTestDirOnTestFS(table.getNameAsString());
468    }
469    return tool;
470  }
471
472  /**
473   * Confirm ImportTsv via data in online table.
474   */
475  private static void validateTable(Configuration conf, TableName tableName, String family,
476    int valueMultiplier, boolean isDryRun) throws IOException {
477
478    LOG.debug("Validating table.");
479    Connection connection = ConnectionFactory.createConnection(conf);
480    Table table = connection.getTable(tableName);
481    boolean verified = false;
482    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
483    int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
484    for (int i = 0; i < numRetries; i++) {
485      try {
486        Scan scan = new Scan();
487        // Scan entire family.
488        scan.addFamily(Bytes.toBytes(family));
489        ResultScanner resScanner = table.getScanner(scan);
490        int numRows = 0;
491        for (Result res : resScanner) {
492          numRows++;
493          assertEquals(2, res.size());
494          List<Cell> kvs = res.listCells();
495          assertTrue(CellUtil.matchingRows(kvs.get(0), Bytes.toBytes("KEY")));
496          assertTrue(CellUtil.matchingRows(kvs.get(1), Bytes.toBytes("KEY")));
497          assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier)));
498          assertTrue(
499            CellUtil.matchingValue(kvs.get(1), Bytes.toBytes("VALUE" + 2 * valueMultiplier)));
500          // Only one result set is expected, so let it loop.
501        }
502        if (isDryRun) {
503          assertEquals(0, numRows);
504        } else {
505          assertEquals(1, numRows);
506        }
507        verified = true;
508        break;
509      } catch (NullPointerException e) {
510        // If here, a cell was empty. Presume its because updates came in
511        // after the scanner had been opened. Wait a while and retry.
512      }
513      try {
514        Thread.sleep(pause);
515      } catch (InterruptedException e) {
516        // continue
517      }
518    }
519    table.close();
520    connection.close();
521    assertTrue(verified);
522  }
523
524  /**
525   * Confirm ImportTsv via HFiles on fs.
526   */
527  private static void validateHFiles(FileSystem fs, String outputPath, String family,
528    int expectedKVCount) throws IOException {
529    // validate number and content of output columns
530    LOG.debug("Validating HFiles.");
531    Set<String> configFamilies = new HashSet<>();
532    configFamilies.add(family);
533    Set<String> foundFamilies = new HashSet<>();
534    int actualKVCount = 0;
535    for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) {
536      String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR);
537      String cf = elements[elements.length - 1];
538      foundFamilies.add(cf);
539      assertTrue(String.format(
540        "HFile output contains a column family (%s) not present in input families (%s)", cf,
541        configFamilies), configFamilies.contains(cf));
542      for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) {
543        assertTrue(String.format("HFile %s appears to contain no data.", hfile.getPath()),
544          hfile.getLen() > 0);
545        // count the number of KVs from all the hfiles
546        if (expectedKVCount > -1) {
547          actualKVCount += getKVCountFromHfile(fs, hfile.getPath());
548        }
549      }
550    }
551    assertTrue(String.format("HFile output does not contain the input family '%s'.", family),
552      foundFamilies.contains(family));
553    if (expectedKVCount > -1) {
554      assertTrue(
555        String.format("KV count in ouput hfile=<%d> doesn't match with expected KV count=<%d>",
556          actualKVCount, expectedKVCount),
557        actualKVCount == expectedKVCount);
558    }
559  }
560
561  /**
562   * Method returns the total KVs in given hfile
563   * @param fs File System
564   * @param p  HFile path
565   * @return KV count in the given hfile
566   */
567  private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException {
568    Configuration conf = util.getConfiguration();
569    HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
570    HFileScanner scanner = reader.getScanner(conf, false, false);
571    scanner.seekTo();
572    int count = 0;
573    do {
574      count++;
575    } while (scanner.next());
576    reader.close();
577    return count;
578  }
579}