001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static java.lang.String.format;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022import static org.junit.jupiter.api.Assertions.assertFalse;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024
025import java.io.File;
026import java.io.IOException;
027import java.util.Arrays;
028import java.util.HashMap;
029import java.util.Iterator;
030import java.util.Map;
031import java.util.Set;
032import java.util.TreeSet;
033import java.util.UUID;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.conf.Configured;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.hbase.Cell;
039import org.apache.hadoop.hbase.CellComparator;
040import org.apache.hadoop.hbase.HBaseConfiguration;
041import org.apache.hadoop.hbase.IntegrationTestingUtility;
042import org.apache.hadoop.hbase.KeyValue;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.client.Result;
045import org.apache.hadoop.hbase.client.Scan;
046import org.apache.hadoop.hbase.client.Table;
047import org.apache.hadoop.hbase.testclassification.IntegrationTests;
048import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
049import org.apache.hadoop.hbase.util.Bytes;
050import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
051import org.apache.hadoop.util.Tool;
052import org.apache.hadoop.util.ToolRunner;
053import org.junit.jupiter.api.AfterAll;
054import org.junit.jupiter.api.BeforeAll;
055import org.junit.jupiter.api.Tag;
056import org.junit.jupiter.api.Test;
057import org.junit.jupiter.api.TestInfo;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
062import org.apache.hbase.thirdparty.com.google.common.base.Strings;
063
064/**
065 * Validate ImportTsv + BulkLoadFiles on a distributed cluster.
066 */
067@Tag(IntegrationTests.TAG)
068public class IntegrationTestImportTsv extends Configured implements Tool {
069
070  private static final String NAME = IntegrationTestImportTsv.class.getSimpleName();
071  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestImportTsv.class);
072  private static final String GENERATED_HFILE_FOLDER_PARAM_KEY =
073    "IntegrationTestImportTsv.generatedHFileFolder";
074
075  protected static final String simple_tsv = "row1\t1\tc1\tc2\n" + "row2\t1\tc1\tc2\n"
076    + "row3\t1\tc1\tc2\n" + "row4\t1\tc1\tc2\n" + "row5\t1\tc1\tc2\n" + "row6\t1\tc1\tc2\n"
077    + "row7\t1\tc1\tc2\n" + "row8\t1\tc1\tc2\n" + "row9\t1\tc1\tc2\n" + "row10\t1\tc1\tc2\n";
078
079  protected static final Set<KeyValue> simple_expected =
080    new TreeSet<KeyValue>(CellComparator.getInstance()) {
081      private static final long serialVersionUID = 1L;
082      {
083        byte[] family = Bytes.toBytes("d");
084        for (String line : Splitter.on('\n').split(simple_tsv)) {
085          if (Strings.isNullOrEmpty(line)) {
086            continue;
087          }
088          String[] row = line.split("\t");
089          byte[] key = Bytes.toBytes(row[0]);
090          long ts = Long.parseLong(row[1]);
091          byte[][] fields = { Bytes.toBytes(row[2]), Bytes.toBytes(row[3]) };
092          add(new KeyValue(key, family, fields[0], ts, KeyValue.Type.Put, fields[0]));
093          add(new KeyValue(key, family, fields[1], ts, KeyValue.Type.Put, fields[1]));
094        }
095      }
096    };
097
098  // this instance is initialized on first access when the test is run from
099  // JUnit/Maven or by main when run from the CLI.
100  protected static IntegrationTestingUtility util = null;
101
102  @Override
103  public Configuration getConf() {
104    return util.getConfiguration();
105  }
106
107  @Override
108  public void setConf(Configuration conf) {
109    LOG.debug("Ignoring setConf call.");
110  }
111
112  @BeforeAll
113  public static void provisionCluster() throws Exception {
114    if (null == util) {
115      util = new IntegrationTestingUtility();
116    }
117    util.initializeCluster(1);
118    if (!util.isDistributedCluster()) {
119      // also need MR when running without a real cluster
120      util.startMiniMapReduceCluster();
121    }
122  }
123
124  @AfterAll
125  public static void releaseCluster() throws Exception {
126    util.restoreCluster();
127    if (!util.isDistributedCluster()) {
128      util.shutdownMiniMapReduceCluster();
129    }
130    util = null;
131  }
132
133  /**
134   * Verify the data described by <code>simple_tsv</code> matches <code>simple_expected</code>.
135   */
136  protected void doLoadIncrementalHFiles(Path hfiles, TableName tableName) throws Exception {
137
138    String[] args = { hfiles.toString(), tableName.getNameAsString() };
139    LOG.info(format("Running LoadIncrememntalHFiles with args: %s", Arrays.asList(args)));
140    assertEquals(0, ToolRunner.run(new BulkLoadHFilesTool(getConf()), args),
141      "Loading HFiles failed.");
142
143    Table table = null;
144    Scan scan = new Scan();
145    scan.setCacheBlocks(false);
146    scan.setCaching(1000);
147
148    try {
149      table = util.getConnection().getTable(tableName);
150      Iterator<Result> resultsIt = table.getScanner(scan).iterator();
151      Iterator<KeyValue> expectedIt = simple_expected.iterator();
152      while (resultsIt.hasNext() && expectedIt.hasNext()) {
153        Result r = resultsIt.next();
154        for (Cell actual : r.rawCells()) {
155          assertTrue(expectedIt.hasNext(), "Ran out of expected values prematurely!");
156          KeyValue expected = expectedIt.next();
157          assertEquals(0, CellComparator.getInstance().compare(expected, actual),
158            "Scan produced surprising result");
159        }
160      }
161      assertFalse(expectedIt.hasNext(), "Did not consume all expected values.");
162      assertFalse(resultsIt.hasNext(), "Did not consume all scan results.");
163    } finally {
164      if (null != table) table.close();
165    }
166  }
167
168  /**
169   * Confirm the absence of the {@link TotalOrderPartitioner} partitions file.
170   */
171  protected static void validateDeletedPartitionsFile(Configuration conf) throws IOException {
172    if (!conf.getBoolean(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, false)) return;
173
174    FileSystem fs = FileSystem.get(conf);
175    Path partitionsFile = new Path(TotalOrderPartitioner.getPartitionFile(conf));
176    assertFalse(fs.exists(partitionsFile), "Failed to clean up partitions file.");
177  }
178
179  @Test
180  public void testGenerateAndLoad(TestInfo testInfo) throws Exception {
181    generateAndLoad(TableName.valueOf(testInfo.getTestMethod().get().getName()));
182  }
183
184  void generateAndLoad(final TableName table) throws Exception {
185    LOG.info("Running test testGenerateAndLoad.");
186    String cf = "d";
187    Path hfiles = initGeneratedHFilePath(table);
188    LOG.info("The folder where the HFiles will be generated: {}", hfiles.toString());
189
190    Map<String, String> args = new HashMap<>();
191    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
192    args.put(ImportTsv.COLUMNS_CONF_KEY, format("HBASE_ROW_KEY,HBASE_TS_KEY,%s:c1,%s:c2", cf, cf));
193    // configure the test harness to NOT delete the HFiles after they're
194    // generated. We need those for doLoadIncrementalHFiles
195    args.put(TestImportTsv.DELETE_AFTER_LOAD_CONF, "false");
196
197    // run the job, complete the load.
198    util.createTable(table, new String[] { cf });
199    Tool t = TestImportTsv.doMROnTableTest(util, table, cf, simple_tsv, args);
200    doLoadIncrementalHFiles(hfiles, table);
201
202    // validate post-conditions
203    validateDeletedPartitionsFile(t.getConf());
204
205    // clean up after ourselves.
206    util.deleteTable(table);
207    util.cleanupDataTestDirOnTestFS(table.getNameAsString());
208    LOG.info("testGenerateAndLoad completed successfully.");
209  }
210
211  @Override
212  public int run(String[] args) throws Exception {
213    if (args.length != 0) {
214      System.err.println(format("%s [genericOptions]", NAME));
215      System.err.println("  Runs ImportTsv integration tests against a distributed cluster.");
216      System.err.println();
217      System.err.println("  Use '-D" + GENERATED_HFILE_FOLDER_PARAM_KEY + "=<path>' to define a");
218      System.err.println("  base folder for the generated HFiles. If HDFS Transparent Encryption");
219      System.err.println("  is configured, then make sure to set this parameter to a folder in");
220      System.err.println("  the same encryption zone in HDFS as the HBase root directory,");
221      System.err.println("  otherwise the bulkload will fail.");
222      System.err.println();
223      ToolRunner.printGenericCommandUsage(System.err);
224      return 1;
225    }
226
227    // adding more test methods? Don't forget to add them here... or consider doing what
228    // IntegrationTestsDriver does.
229    provisionCluster();
230    TableName tableName = TableName.valueOf("IntegrationTestImportTsv");
231    if (util.getAdmin().tableExists(tableName)) {
232      util.deleteTable(tableName);
233    }
234    generateAndLoad(tableName);
235    releaseCluster();
236
237    return 0;
238  }
239
240  private Path initGeneratedHFilePath(final TableName table) throws IOException {
241    String folderParam = getConf().getTrimmed(GENERATED_HFILE_FOLDER_PARAM_KEY);
242    if (folderParam == null || folderParam.isEmpty()) {
243      // by default, fall back to the test data dir
244      return new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "hfiles");
245    }
246
247    Path hfiles = new Path(folderParam, UUID.randomUUID().toString());
248    FileSystem fs = util.getTestFileSystem();
249    String shouldPreserve = System.getProperty("hbase.testing.preserve.testdir", "false");
250    if (!Boolean.parseBoolean(shouldPreserve)) {
251      if (fs.getUri().getScheme().equals(FileSystem.getLocal(getConf()).getUri().getScheme())) {
252        File localFoler = new File(hfiles.toString());
253        localFoler.deleteOnExit();
254      } else {
255        fs.deleteOnExit(hfiles);
256      }
257    }
258    return hfiles;
259  }
260
261  public static void main(String[] args) throws Exception {
262    Configuration conf = HBaseConfiguration.create();
263    IntegrationTestingUtility.setUseDistributedCluster(conf);
264    util = new IntegrationTestingUtility(conf);
265    int status = ToolRunner.run(conf, new IntegrationTestImportTsv(), args);
266    System.exit(status);
267  }
268}