/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static java.lang.String.format;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;

/**
 * Validate ImportTsv + LoadIncrementalHFiles on a distributed cluster.
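 * <p>
 * A sketch of a typical invocation, assuming the standard {@code bin/hbase CLASSNAME} launcher
 * and a configured distributed cluster (the exact generic options depend on the deployment):
 *
 * <pre>
 * ./bin/hbase org.apache.hadoop.hbase.mapreduce.IntegrationTestImportTsv \
 *   -DIntegrationTestImportTsv.generatedHFileFolder=&lt;path&gt;
 * </pre>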
 */
@Category(IntegrationTests.class)
public class IntegrationTestImportTsv extends Configured implements Tool {

  private static final String NAME = IntegrationTestImportTsv.class.getSimpleName();
  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestImportTsv.class);
  private static final String GENERATED_HFILE_FOLDER_PARAM_KEY =
    "IntegrationTestImportTsv.generatedHFileFolder";

  protected static final String simple_tsv = "row1\t1\tc1\tc2\n" + "row2\t1\tc1\tc2\n"
    + "row3\t1\tc1\tc2\n" + "row4\t1\tc1\tc2\n" + "row5\t1\tc1\tc2\n" + "row6\t1\tc1\tc2\n"
    + "row7\t1\tc1\tc2\n" + "row8\t1\tc1\tc2\n" + "row9\t1\tc1\tc2\n" + "row10\t1\tc1\tc2\n";

  @Rule
  public TestName name = new TestName();

  // The expected cells, in scan order: for each TSV line, one Put per column
  // (c1 and c2) in family 'd', keyed by the row and timestamp taken from the
  // first two fields of that line.
  protected static final Set<KeyValue> simple_expected =
    new TreeSet<KeyValue>(CellComparator.getInstance()) {
      private static final long serialVersionUID = 1L;
      {
        byte[] family = Bytes.toBytes("d");
        // omitEmptyStrings skips the trailing empty line produced by the final '\n'
        for (String line : Splitter.on('\n').omitEmptyStrings().split(simple_tsv)) {
          String[] row = line.split("\t");
          byte[] key = Bytes.toBytes(row[0]);
          long ts = Long.parseLong(row[1]);
          byte[][] fields = { Bytes.toBytes(row[2]), Bytes.toBytes(row[3]) };
          add(new KeyValue(key, family, fields[0], ts, KeyValue.Type.Put, fields[0]));
          add(new KeyValue(key, family, fields[1], ts, KeyValue.Type.Put, fields[1]));
        }
      }
    };

  // this instance is initialized on first access when the test is run from
  // JUnit/Maven or by main when run from the CLI.
  protected static IntegrationTestingUtility util = null;

  @Override
  public Configuration getConf() {
    return util.getConfiguration();
  }

  @Override
  public void setConf(Configuration conf) {
    LOG.debug("Ignoring setConf call.");
  }

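  // Cluster lifecycle: initializeCluster(1) attaches to an already-deployed
  // distributed cluster when one is configured via IntegrationTestingUtility;
  // otherwise it starts an in-process mini cluster, in which case a mini
  // MapReduce cluster is also needed because ImportTsv runs an MR job.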
  @BeforeClass
  public static void provisionCluster() throws Exception {
    if (null == util) {
      util = new IntegrationTestingUtility();
    }
    util.initializeCluster(1);
    if (!util.isDistributedCluster()) {
      // also need MR when running without a real cluster
      util.startMiniMapReduceCluster();
    }
  }

  @AfterClass
  public static void releaseCluster() throws Exception {
    util.restoreCluster();
    if (!util.isDistributedCluster()) {
      util.shutdownMiniMapReduceCluster();
    }
    util = null;
  }

  /**
   * Bulk load the HFiles under <code>hfiles</code> into <code>tableName</code> and verify the
   * loaded data matches <code>simple_expected</code>.
   */
  protected void doLoadIncrementalHFiles(Path hfiles, TableName tableName) throws Exception {
    String[] args = { hfiles.toString(), tableName.getNameAsString() };
    LOG.info(format("Running LoadIncrementalHFiles with args: %s", Arrays.asList(args)));
    assertEquals("Loading HFiles failed.", 0,
      ToolRunner.run(new LoadIncrementalHFiles(new Configuration(getConf())), args));

    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setCaching(1000);
    // try-with-resources ensures both the table and the scanner are closed.
    try (Table table = util.getConnection().getTable(tableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Iterator<Result> resultsIt = scanner.iterator();
      Iterator<KeyValue> expectedIt = simple_expected.iterator();
      while (resultsIt.hasNext() && expectedIt.hasNext()) {
        Result r = resultsIt.next();
        for (Cell actual : r.rawCells()) {
          assertTrue("Ran out of expected values prematurely!", expectedIt.hasNext());
          KeyValue expected = expectedIt.next();
          assertEquals("Scan produced surprising result", 0,
            CellComparator.getInstance().compare(expected, actual));
        }
      }
      assertFalse("Did not consume all expected values.", expectedIt.hasNext());
      assertFalse("Did not consume all scan results.", resultsIt.hasNext());
    }
  }

  /**
   * Confirm the absence of the {@link TotalOrderPartitioner} partitions file.
   */
  protected static void validateDeletedPartitionsFile(Configuration conf) throws IOException {
    if (!conf.getBoolean(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, false)) return;

    FileSystem fs = FileSystem.get(conf);
    Path partitionsFile = new Path(TotalOrderPartitioner.getPartitionFile(conf));
    assertFalse("Failed to clean up partitions file.", fs.exists(partitionsFile));
  }

  @Test
  public void testGenerateAndLoad() throws Exception {
    generateAndLoad(TableName.valueOf(name.getMethodName()));
  }

  void generateAndLoad(final TableName table) throws Exception {
    LOG.info("Running test testGenerateAndLoad.");
    String cf = "d";
    Path hfiles = initGeneratedHFilePath(table);
    LOG.info("The folder where the HFiles will be generated: {}", hfiles);

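    // ImportTsv column spec is positional: the first two TSV fields become the
    // row key and the cell timestamp (HBASE_ROW_KEY, HBASE_TS_KEY); the
    // remaining fields land in columns d:c1 and d:c2.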
    Map<String, String> args = new HashMap<>();
    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
    args.put(ImportTsv.COLUMNS_CONF_KEY, format("HBASE_ROW_KEY,HBASE_TS_KEY,%s:c1,%s:c2", cf, cf));
    // configure the test harness to NOT delete the HFiles after they're
    // generated. We need those for doLoadIncrementalHFiles
    args.put(TestImportTsv.DELETE_AFTER_LOAD_CONF, "false");

    // run the job, complete the load.
    util.createTable(table, new String[] { cf });
    Tool t = TestImportTsv.doMROnTableTest(util, table, cf, simple_tsv, args);
    doLoadIncrementalHFiles(hfiles, table);

    // validate post-conditions
    validateDeletedPartitionsFile(t.getConf());

    // clean up after ourselves.
    util.deleteTable(table);
    util.cleanupDataTestDirOnTestFS(table.getNameAsString());
    LOG.info("testGenerateAndLoad completed successfully.");
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 0) {
      System.err.println(format("%s [genericOptions]", NAME));
      System.err.println(" Runs ImportTsv integration tests against a distributed cluster.");
      System.err.println();
      System.err.println(" Use '-D" + GENERATED_HFILE_FOLDER_PARAM_KEY + "=<path>' to define a");
      System.err.println(" base folder for the generated HFiles. If HDFS Transparent Encryption");
      System.err.println(" is configured, then make sure to set this parameter to a folder in");
      System.err.println(" the same encryption zone in HDFS as the HBase root directory,");
      System.err.println(" otherwise the bulkload will fail.");
      System.err.println();
      ToolRunner.printGenericCommandUsage(System.err);
      return 1;
    }

    // adding more test methods? Don't forget to add them here... or consider doing what
    // IntegrationTestsDriver does.
    provisionCluster();
    TableName tableName = TableName.valueOf("IntegrationTestImportTsv");
    if (util.getAdmin().tableExists(tableName)) {
      util.deleteTable(tableName);
    }
    generateAndLoad(tableName);
    releaseCluster();

    return 0;
  }

  private Path initGeneratedHFilePath(final TableName table) throws IOException {
    String folderParam = getConf().getTrimmed(GENERATED_HFILE_FOLDER_PARAM_KEY);
    if (folderParam == null || folderParam.isEmpty()) {
      // by default, fall back to the test data dir
      return new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "hfiles");
    }

    Path hfiles = new Path(folderParam, UUID.randomUUID().toString());
    FileSystem fs = util.getTestFileSystem();
    String shouldPreserve = System.getProperty("hbase.testing.preserve.testdir", "false");
    if (!Boolean.parseBoolean(shouldPreserve)) {
      // schedule cleanup of the generated folder unless the test dir is preserved
      if (fs.getUri().getScheme().equals(FileSystem.getLocal(getConf()).getUri().getScheme())) {
        File localFolder = new File(hfiles.toString());
        localFolder.deleteOnExit();
      } else {
        fs.deleteOnExit(hfiles);
      }
    }
    return hfiles;
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    util = new IntegrationTestingUtility(conf);
    int status = ToolRunner.run(conf, new IntegrationTestImportTsv(), args);
    System.exit(status);
  }
}