/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static java.lang.String.format;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Validate ImportTsv + LoadIncrementalHFiles on a distributed cluster.
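 * <p>
 * When launched via {@link #main(String[])}, the test provisions (or attaches to) a cluster,
 * generates HFiles from {@link #simple_tsv} with ImportTsv, bulk-loads them, and verifies the
 * loaded rows. A typical command-line invocation, assuming the standard {@code hbase} launcher
 * script is available, might look like:
 * <pre>
 *   hbase org.apache.hadoop.hbase.mapreduce.IntegrationTestImportTsv
 * </pre>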
 */
@Category(IntegrationTests.class)
public class IntegrationTestImportTsv extends Configured implements Tool {

  private static final String NAME = IntegrationTestImportTsv.class.getSimpleName();
  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestImportTsv.class);

  protected static final String simple_tsv =
      "row1\t1\tc1\tc2\n" +
      "row2\t1\tc1\tc2\n" +
      "row3\t1\tc1\tc2\n" +
      "row4\t1\tc1\tc2\n" +
      "row5\t1\tc1\tc2\n" +
      "row6\t1\tc1\tc2\n" +
      "row7\t1\tc1\tc2\n" +
      "row8\t1\tc1\tc2\n" +
      "row9\t1\tc1\tc2\n" +
      "row10\t1\tc1\tc2\n";

  @Rule
  public TestName name = new TestName();

  // Expected cells for simple_tsv: two columns (d:c1, d:c2) per input row, built in the
  // instance initializer of this anonymous TreeSet and kept sorted by CellComparator.
  protected static final Set<KeyValue> simple_expected =
      new TreeSet<KeyValue>(CellComparator.getInstance()) {
    private static final long serialVersionUID = 1L;
    {
      byte[] family = Bytes.toBytes("d");
      for (String line : simple_tsv.split("\n")) {
        String[] row = line.split("\t");
        byte[] key = Bytes.toBytes(row[0]);
        long ts = Long.parseLong(row[1]);
        byte[][] fields = { Bytes.toBytes(row[2]), Bytes.toBytes(row[3]) };
        add(new KeyValue(key, family, fields[0], ts, Type.Put, fields[0]));
        add(new KeyValue(key, family, fields[1], ts, Type.Put, fields[1]));
      }
    }
  };

  // this instance is initialized on first access when the test is run from
  // JUnit/Maven or by main when run from the CLI.
  protected static IntegrationTestingUtility util = null;

  public Configuration getConf() {
    return util.getConfiguration();
  }

  public void setConf(Configuration conf) {
    LOG.debug("Ignoring setConf call.");
  }

  @BeforeClass
  public static void provisionCluster() throws Exception {
    if (null == util) {
      util = new IntegrationTestingUtility();
    }
    util.initializeCluster(1);
    if (!util.isDistributedCluster()) {
      // also need MR when running without a real cluster
      util.startMiniMapReduceCluster();
    }
  }

  @AfterClass
  public static void releaseCluster() throws Exception {
    util.restoreCluster();
    if (!util.isDistributedCluster()) {
      util.shutdownMiniMapReduceCluster();
    }
    util = null;
  }

  /**
   * Verify the data described by <code>simple_tsv</code> matches
   * <code>simple_expected</code>.
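   * <p>
   * First bulk-loads the HFiles under {@code hfiles} into {@code tableName} using
   * LoadIncrementalHFiles, then scans the table and compares each returned cell, in
   * comparator order, against the entries of {@code simple_expected}.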
   */
  protected void doLoadIncrementalHFiles(Path hfiles, TableName tableName)
      throws Exception {

    String[] args = { hfiles.toString(), tableName.getNameAsString() };
    LOG.info(format("Running LoadIncrementalHFiles with args: %s", Arrays.asList(args)));
    assertEquals("Loading HFiles failed.",
      0, ToolRunner.run(new LoadIncrementalHFiles(new Configuration(getConf())), args));

    Table table = null;
    Scan scan = new Scan() {{
      setCacheBlocks(false);
      setCaching(1000);
    }};
    try {
      table = util.getConnection().getTable(tableName);
      Iterator<Result> resultsIt = table.getScanner(scan).iterator();
      Iterator<KeyValue> expectedIt = simple_expected.iterator();
      while (resultsIt.hasNext() && expectedIt.hasNext()) {
        Result r = resultsIt.next();
        for (Cell actual : r.rawCells()) {
          assertTrue(
            "Ran out of expected values prematurely!",
            expectedIt.hasNext());
          KeyValue expected = expectedIt.next();
          assertEquals("Scan produced surprising result", 0,
            CellComparator.getInstance().compare(expected, actual));
        }
      }
      assertFalse("Did not consume all expected values.", expectedIt.hasNext());
      assertFalse("Did not consume all scan results.", resultsIt.hasNext());
    } finally {
      if (null != table) table.close();
    }
  }

  /**
   * Confirm the absence of the {@link TotalOrderPartitioner} partitions file.
   */
  protected static void validateDeletedPartitionsFile(Configuration conf) throws IOException {
    if (!conf.getBoolean(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, false))
      return;

    FileSystem fs = FileSystem.get(conf);
    Path partitionsFile = new Path(TotalOrderPartitioner.getPartitionFile(conf));
    assertFalse("Failed to clean up partitions file.", fs.exists(partitionsFile));
  }

  @Test
  public void testGenerateAndLoad() throws Exception {
    generateAndLoad(TableName.valueOf(name.getMethodName()));
  }

  void generateAndLoad(final TableName table) throws Exception {
    LOG.info("Running test testGenerateAndLoad.");
    String cf = "d";
    Path hfiles = new Path(
        util.getDataTestDirOnTestFS(table.getNameAsString()), "hfiles");

    Map<String, String> args = new HashMap<>();
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.COLUMNS_CONF_KEY,
        format("HBASE_ROW_KEY,HBASE_TS_KEY,%s:c1,%s:c2", cf, cf));
    // configure the test harness to NOT delete the HFiles after they're
    // generated. We need those for doLoadIncrementalHFiles
    args.put(TestImportTsv.DELETE_AFTER_LOAD_CONF, "false");

    // run the job, complete the load.
    util.createTable(table, new String[]{cf});
    Tool t = TestImportTsv.doMROnTableTest(util, table, cf, simple_tsv, args);
    doLoadIncrementalHFiles(hfiles, table);

    // validate post-conditions
    validateDeletedPartitionsFile(t.getConf());

    // clean up after ourselves.
    util.deleteTable(table);
    util.cleanupDataTestDirOnTestFS(table.getNameAsString());
    LOG.info("testGenerateAndLoad completed successfully.");
  }

  public int run(String[] args) throws Exception {
    if (args.length != 0) {
      System.err.println(format("%s [genericOptions]", NAME));
      System.err.println("  Runs ImportTsv integration tests against a distributed cluster.");
      System.err.println();
      ToolRunner.printGenericCommandUsage(System.err);
      return 1;
    }

    // adding more test methods? Don't forget to add them here... or consider doing what
    // IntegrationTestsDriver does.
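    // (IntegrationTestsDriver, if used instead, is expected to discover classes annotated
    // with @Category(IntegrationTests.class) on the classpath and run their @Test methods,
    // so new tests would not need to be wired up here by hand.)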
    provisionCluster();
    TableName tableName = TableName.valueOf("IntegrationTestImportTsv");
    if (util.getAdmin().tableExists(tableName)) {
      util.deleteTable(tableName);
    }
    generateAndLoad(tableName);
    releaseCluster();

    return 0;
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    util = new IntegrationTestingUtility(conf);
    int status = ToolRunner.run(conf, new IntegrationTestImportTsv(), args);
    System.exit(status);
  }
}