/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static java.lang.String.format;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
047import org.apache.hadoop.hbase.testclassification.IntegrationTests; 048import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool; 049import org.apache.hadoop.hbase.util.Bytes; 050import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; 051import org.apache.hadoop.util.Tool; 052import org.apache.hadoop.util.ToolRunner; 053import org.junit.jupiter.api.AfterAll; 054import org.junit.jupiter.api.BeforeAll; 055import org.junit.jupiter.api.Tag; 056import org.junit.jupiter.api.Test; 057import org.junit.jupiter.api.TestInfo; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 062import org.apache.hbase.thirdparty.com.google.common.base.Strings; 063 064/** 065 * Validate ImportTsv + BulkLoadFiles on a distributed cluster. 066 */ 067@Tag(IntegrationTests.TAG) 068public class IntegrationTestImportTsv extends Configured implements Tool { 069 070 private static final String NAME = IntegrationTestImportTsv.class.getSimpleName(); 071 private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestImportTsv.class); 072 private static final String GENERATED_HFILE_FOLDER_PARAM_KEY = 073 "IntegrationTestImportTsv.generatedHFileFolder"; 074 075 protected static final String simple_tsv = "row1\t1\tc1\tc2\n" + "row2\t1\tc1\tc2\n" 076 + "row3\t1\tc1\tc2\n" + "row4\t1\tc1\tc2\n" + "row5\t1\tc1\tc2\n" + "row6\t1\tc1\tc2\n" 077 + "row7\t1\tc1\tc2\n" + "row8\t1\tc1\tc2\n" + "row9\t1\tc1\tc2\n" + "row10\t1\tc1\tc2\n"; 078 079 protected static final Set<KeyValue> simple_expected = 080 new TreeSet<KeyValue>(CellComparator.getInstance()) { 081 private static final long serialVersionUID = 1L; 082 { 083 byte[] family = Bytes.toBytes("d"); 084 for (String line : Splitter.on('\n').split(simple_tsv)) { 085 if (Strings.isNullOrEmpty(line)) { 086 continue; 087 } 088 String[] row = line.split("\t"); 089 byte[] key = Bytes.toBytes(row[0]); 090 long ts = Long.parseLong(row[1]); 091 byte[][] fields 
= { Bytes.toBytes(row[2]), Bytes.toBytes(row[3]) }; 092 add(new KeyValue(key, family, fields[0], ts, KeyValue.Type.Put, fields[0])); 093 add(new KeyValue(key, family, fields[1], ts, KeyValue.Type.Put, fields[1])); 094 } 095 } 096 }; 097 098 // this instance is initialized on first access when the test is run from 099 // JUnit/Maven or by main when run from the CLI. 100 protected static IntegrationTestingUtility util = null; 101 102 @Override 103 public Configuration getConf() { 104 return util.getConfiguration(); 105 } 106 107 @Override 108 public void setConf(Configuration conf) { 109 LOG.debug("Ignoring setConf call."); 110 } 111 112 @BeforeAll 113 public static void provisionCluster() throws Exception { 114 if (null == util) { 115 util = new IntegrationTestingUtility(); 116 } 117 util.initializeCluster(1); 118 if (!util.isDistributedCluster()) { 119 // also need MR when running without a real cluster 120 util.startMiniMapReduceCluster(); 121 } 122 } 123 124 @AfterAll 125 public static void releaseCluster() throws Exception { 126 util.restoreCluster(); 127 if (!util.isDistributedCluster()) { 128 util.shutdownMiniMapReduceCluster(); 129 } 130 util = null; 131 } 132 133 /** 134 * Verify the data described by <code>simple_tsv</code> matches <code>simple_expected</code>. 
135 */ 136 protected void doLoadIncrementalHFiles(Path hfiles, TableName tableName) throws Exception { 137 138 String[] args = { hfiles.toString(), tableName.getNameAsString() }; 139 LOG.info(format("Running LoadIncrememntalHFiles with args: %s", Arrays.asList(args))); 140 assertEquals(0, ToolRunner.run(new BulkLoadHFilesTool(getConf()), args), 141 "Loading HFiles failed."); 142 143 Table table = null; 144 Scan scan = new Scan(); 145 scan.setCacheBlocks(false); 146 scan.setCaching(1000); 147 148 try { 149 table = util.getConnection().getTable(tableName); 150 Iterator<Result> resultsIt = table.getScanner(scan).iterator(); 151 Iterator<KeyValue> expectedIt = simple_expected.iterator(); 152 while (resultsIt.hasNext() && expectedIt.hasNext()) { 153 Result r = resultsIt.next(); 154 for (Cell actual : r.rawCells()) { 155 assertTrue(expectedIt.hasNext(), "Ran out of expected values prematurely!"); 156 KeyValue expected = expectedIt.next(); 157 assertEquals(0, CellComparator.getInstance().compare(expected, actual), 158 "Scan produced surprising result"); 159 } 160 } 161 assertFalse(expectedIt.hasNext(), "Did not consume all expected values."); 162 assertFalse(resultsIt.hasNext(), "Did not consume all scan results."); 163 } finally { 164 if (null != table) table.close(); 165 } 166 } 167 168 /** 169 * Confirm the absence of the {@link TotalOrderPartitioner} partitions file. 
170 */ 171 protected static void validateDeletedPartitionsFile(Configuration conf) throws IOException { 172 if (!conf.getBoolean(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, false)) return; 173 174 FileSystem fs = FileSystem.get(conf); 175 Path partitionsFile = new Path(TotalOrderPartitioner.getPartitionFile(conf)); 176 assertFalse(fs.exists(partitionsFile), "Failed to clean up partitions file."); 177 } 178 179 @Test 180 public void testGenerateAndLoad(TestInfo testInfo) throws Exception { 181 generateAndLoad(TableName.valueOf(testInfo.getTestMethod().get().getName())); 182 } 183 184 void generateAndLoad(final TableName table) throws Exception { 185 LOG.info("Running test testGenerateAndLoad."); 186 String cf = "d"; 187 Path hfiles = initGeneratedHFilePath(table); 188 LOG.info("The folder where the HFiles will be generated: {}", hfiles.toString()); 189 190 Map<String, String> args = new HashMap<>(); 191 args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString()); 192 args.put(ImportTsv.COLUMNS_CONF_KEY, format("HBASE_ROW_KEY,HBASE_TS_KEY,%s:c1,%s:c2", cf, cf)); 193 // configure the test harness to NOT delete the HFiles after they're 194 // generated. We need those for doLoadIncrementalHFiles 195 args.put(TestImportTsv.DELETE_AFTER_LOAD_CONF, "false"); 196 197 // run the job, complete the load. 198 util.createTable(table, new String[] { cf }); 199 Tool t = TestImportTsv.doMROnTableTest(util, table, cf, simple_tsv, args); 200 doLoadIncrementalHFiles(hfiles, table); 201 202 // validate post-conditions 203 validateDeletedPartitionsFile(t.getConf()); 204 205 // clean up after ourselves. 
206 util.deleteTable(table); 207 util.cleanupDataTestDirOnTestFS(table.getNameAsString()); 208 LOG.info("testGenerateAndLoad completed successfully."); 209 } 210 211 @Override 212 public int run(String[] args) throws Exception { 213 if (args.length != 0) { 214 System.err.println(format("%s [genericOptions]", NAME)); 215 System.err.println(" Runs ImportTsv integration tests against a distributed cluster."); 216 System.err.println(); 217 System.err.println(" Use '-D" + GENERATED_HFILE_FOLDER_PARAM_KEY + "=<path>' to define a"); 218 System.err.println(" base folder for the generated HFiles. If HDFS Transparent Encryption"); 219 System.err.println(" is configured, then make sure to set this parameter to a folder in"); 220 System.err.println(" the same encryption zone in HDFS as the HBase root directory,"); 221 System.err.println(" otherwise the bulkload will fail."); 222 System.err.println(); 223 ToolRunner.printGenericCommandUsage(System.err); 224 return 1; 225 } 226 227 // adding more test methods? Don't forget to add them here... or consider doing what 228 // IntegrationTestsDriver does. 
229 provisionCluster(); 230 TableName tableName = TableName.valueOf("IntegrationTestImportTsv"); 231 if (util.getAdmin().tableExists(tableName)) { 232 util.deleteTable(tableName); 233 } 234 generateAndLoad(tableName); 235 releaseCluster(); 236 237 return 0; 238 } 239 240 private Path initGeneratedHFilePath(final TableName table) throws IOException { 241 String folderParam = getConf().getTrimmed(GENERATED_HFILE_FOLDER_PARAM_KEY); 242 if (folderParam == null || folderParam.isEmpty()) { 243 // by default, fall back to the test data dir 244 return new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "hfiles"); 245 } 246 247 Path hfiles = new Path(folderParam, UUID.randomUUID().toString()); 248 FileSystem fs = util.getTestFileSystem(); 249 String shouldPreserve = System.getProperty("hbase.testing.preserve.testdir", "false"); 250 if (!Boolean.parseBoolean(shouldPreserve)) { 251 if (fs.getUri().getScheme().equals(FileSystem.getLocal(getConf()).getUri().getScheme())) { 252 File localFoler = new File(hfiles.toString()); 253 localFoler.deleteOnExit(); 254 } else { 255 fs.deleteOnExit(hfiles); 256 } 257 } 258 return hfiles; 259 } 260 261 public static void main(String[] args) throws Exception { 262 Configuration conf = HBaseConfiguration.create(); 263 IntegrationTestingUtility.setUseDistributedCluster(conf); 264 util = new IntegrationTestingUtility(conf); 265 int status = ToolRunner.run(conf, new IntegrationTestImportTsv(), args); 266 System.exit(status); 267 } 268}