/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static java.lang.String.format;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Validate ImportTsv + LoadIncrementalHFiles on a distributed cluster.
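 * <p>
 * A typical command-line invocation (a sketch; the exact launcher and classpath setup may vary
 * by deployment) runs this class via the {@code hbase} script so that {@link #main(String[])}
 * drives the test against the configured cluster:
 * <pre>
 *   hbase org.apache.hadoop.hbase.mapreduce.IntegrationTestImportTsv
 * </pre>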
 */
@Category(IntegrationTests.class)
public class IntegrationTestImportTsv extends Configured implements Tool {

  private static final String NAME = IntegrationTestImportTsv.class.getSimpleName();
  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestImportTsv.class);

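  // Input for ImportTsv: each line is <row key>\t<timestamp>\t<value for d:c1>\t<value for d:c2>,
  // matching the column mapping configured in generateAndLoad().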
  protected static final String simple_tsv =
      "row1\t1\tc1\tc2\n" +
      "row2\t1\tc1\tc2\n" +
      "row3\t1\tc1\tc2\n" +
      "row4\t1\tc1\tc2\n" +
      "row5\t1\tc1\tc2\n" +
      "row6\t1\tc1\tc2\n" +
      "row7\t1\tc1\tc2\n" +
      "row8\t1\tc1\tc2\n" +
      "row9\t1\tc1\tc2\n" +
      "row10\t1\tc1\tc2\n";

  @Rule
  public TestName name = new TestName();

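  // Cells expected in the table after the bulk load: two KeyValues per input row, one each for
  // d:c1 and d:c2, kept in CellComparator order so they can be walked alongside scan results
  // in doLoadIncrementalHFiles().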
  protected static final Set<KeyValue> simple_expected =
      new TreeSet<KeyValue>(CellComparator.getInstance()) {
    private static final long serialVersionUID = 1L;
    {
      byte[] family = Bytes.toBytes("d");
      for (String line : simple_tsv.split("\n")) {
        String[] row = line.split("\t");
        byte[] key = Bytes.toBytes(row[0]);
        long ts = Long.parseLong(row[1]);
        byte[][] fields = { Bytes.toBytes(row[2]), Bytes.toBytes(row[3]) };
        add(new KeyValue(key, family, fields[0], ts, Type.Put, fields[0]));
        add(new KeyValue(key, family, fields[1], ts, Type.Put, fields[1]));
      }
    }
  };

  // This instance is created in provisionCluster() when the test is run from JUnit/Maven,
  // or in main() when run from the CLI.
  protected static IntegrationTestingUtility util = null;

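  // Tool plumbing: the active configuration always comes from the shared utility above;
  // setConf is intentionally a no-op so the configuration cannot be swapped out.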
  public Configuration getConf() {
    return util.getConfiguration();
  }

  public void setConf(Configuration conf) {
    LOG.debug("Ignoring setConf call.");
  }

  @BeforeClass
  public static void provisionCluster() throws Exception {
    if (null == util) {
      util = new IntegrationTestingUtility();
    }
    util.initializeCluster(1);
    if (!util.isDistributedCluster()) {
      // also need MR when running without a real cluster
      util.startMiniMapReduceCluster();
    }
  }

  @AfterClass
  public static void releaseCluster() throws Exception {
    util.restoreCluster();
    if (!util.isDistributedCluster()) {
      util.shutdownMiniMapReduceCluster();
    }
    util = null;
  }

  /**
   * Loads the HFiles under <code>hfiles</code> into <code>tableName</code> and verifies that the
   * resulting table contents match <code>simple_expected</code>.
   */
  protected void doLoadIncrementalHFiles(Path hfiles, TableName tableName)
      throws Exception {

    String[] args = { hfiles.toString(), tableName.getNameAsString() };
    LOG.info(format("Running LoadIncrementalHFiles with args: %s", Arrays.asList(args)));
    assertEquals("Loading HFiles failed.",
      0, ToolRunner.run(new LoadIncrementalHFiles(new Configuration(getConf())), args));

    Table table = null;
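    // Verification scan over the whole table: skip the block cache and raise scanner caching so
    // the full-table read completes in fewer RPC round trips.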
    Scan scan = new Scan() {{
      setCacheBlocks(false);
      setCaching(1000);
    }};
    try {
      table = util.getConnection().getTable(tableName);
      Iterator<Result> resultsIt = table.getScanner(scan).iterator();
      Iterator<KeyValue> expectedIt = simple_expected.iterator();
      while (resultsIt.hasNext() && expectedIt.hasNext()) {
        Result r = resultsIt.next();
        for (Cell actual : r.rawCells()) {
          assertTrue(
            "Ran out of expected values prematurely!",
            expectedIt.hasNext());
          KeyValue expected = expectedIt.next();
          assertEquals("Scan produced surprising result", 0,
            CellComparator.getInstance().compare(expected, actual));
        }
      }
      assertFalse("Did not consume all expected values.", expectedIt.hasNext());
      assertFalse("Did not consume all scan results.", resultsIt.hasNext());
    } finally {
      if (null != table) table.close();
    }
  }

  /**
   * Confirm the {@link TotalOrderPartitioner} partitions file has been cleaned up. Only checked
   * when running against a distributed cluster; the mini-cluster path skips this validation.
   */
  protected static void validateDeletedPartitionsFile(Configuration conf) throws IOException {
    if (!conf.getBoolean(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, false)) {
      return;
    }

    FileSystem fs = FileSystem.get(conf);
    Path partitionsFile = new Path(TotalOrderPartitioner.getPartitionFile(conf));
    assertFalse("Failed to clean up partitions file.", fs.exists(partitionsFile));
  }

  @Test
  public void testGenerateAndLoad() throws Exception {
    generateAndLoad(TableName.valueOf(name.getMethodName()));
  }

  void generateAndLoad(final TableName table) throws Exception {
    LOG.info("Running generateAndLoad against table " + table.getNameAsString() + ".");
    String cf = "d";
    Path hfiles = new Path(
        util.getDataTestDirOnTestFS(table.getNameAsString()), "hfiles");

    Map<String, String> args = new HashMap<>();
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.COLUMNS_CONF_KEY,
        format("HBASE_ROW_KEY,HBASE_TS_KEY,%s:c1,%s:c2", cf, cf));
    // configure the test harness to NOT delete the HFiles after they're
    // generated. We need those for doLoadIncrementalHFiles
    args.put(TestImportTsv.DELETE_AFTER_LOAD_CONF, "false");

    // run the job, complete the load.
    util.createTable(table, new String[]{cf});
    Tool t = TestImportTsv.doMROnTableTest(util, table, cf, simple_tsv, args);
    doLoadIncrementalHFiles(hfiles, table);

    // validate post-conditions
    validateDeletedPartitionsFile(t.getConf());

    // clean up after ourselves.
    util.deleteTable(table);
    util.cleanupDataTestDirOnTestFS(table.getNameAsString());
    LOG.info("generateAndLoad completed successfully.");
  }

  public int run(String[] args) throws Exception {
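    // ToolRunner has already parsed out the generic options (-D, -conf, etc.), so any
    // remaining argument means the command line was malformed.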
    if (args.length != 0) {
      System.err.println(format("%s [genericOptions]", NAME));
      System.err.println("  Runs ImportTsv integration tests against a distributed cluster.");
      System.err.println();
      ToolRunner.printGenericCommandUsage(System.err);
      return 1;
    }

    // adding more test methods? Don't forget to add them here... or consider doing what
    // IntegrationTestsDriver does.
    provisionCluster();
    TableName tableName = TableName.valueOf("IntegrationTestImportTsv");
    if (util.getAdmin().tableExists(tableName)) {
      util.deleteTable(tableName);
    }
    generateAndLoad(tableName);
    releaseCluster();

    return 0;
  }
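  // CLI entry point: force the distributed-cluster code path, then hand off to ToolRunner so
  // generic options are honored.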
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    util = new IntegrationTestingUtility(conf);
    int status = ToolRunner.run(conf, new IntegrationTestImportTsv(), args);
    System.exit(status);
  }
}