/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs {@link ImportTsv} against a mini cluster with an input column mapped to
 * {@code HBASE_ATTRIBUTES_KEY} and checks which rows end up in the target table.
 * <p>
 * The cluster is started with {@link OperationAttributesTestController} installed as a
 * coprocessor. That observer bypasses every put on a user table that does not carry the
 * {@value #TEST_ATR_KEY} attribute, so a row is only expected in the table when the imported line
 * supplied that attribute.
 */
@Tag(MapReduceTests.TAG)
@Tag(LargeTests.TAG)
public class TestImportTSVWithOperationAttributes implements Configurable {

  private static final Logger LOG =
    LoggerFactory.getLogger(TestImportTSVWithOperationAttributes.class);
  // NOTE(review): the prefix is taken from TestImportTsv, not from this class, so both tests read
  // the same configuration keys. Presumably intentional; confirm before changing.
  protected static final String NAME = TestImportTsv.class.getSimpleName();
  protected static HBaseTestingUtil util = new HBaseTestingUtil();

  /**
   * Delete the tmp directory after running doMROnTableTest. Boolean. Default is true.
   */
  protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";

  /**
   * Force use of combiner in doMROnTableTest. Boolean. Default is true.
   */
  protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";

  private static Configuration conf;

  /** Name of the operation attribute the test coprocessor requires on every user-table put. */
  private static final String TEST_ATR_KEY = "test";

  /** Column family created in every test table and referenced by the column mapping. */
  private static final String FAMILY = "FAM";

  /**
   * Returns the configuration of the shared testing utility.
   */
  @Override
  public Configuration getConf() {
    return util.getConfiguration();
  }

  /**
   * Not supported; the configuration always comes from the testing utility.
   * @throws IllegalArgumentException always
   */
  @Override
  public void setConf(Configuration conf) {
    throw new IllegalArgumentException("setConf not supported");
  }

  /**
   * Registers the test coprocessor in the configuration and starts the mini cluster.
   */
  @BeforeAll
  public static void provisionCluster() throws Exception {
    conf = util.getConfiguration();
    conf.set("hbase.coprocessor.master.classes", OperationAttributesTestController.class.getName());
    conf.set("hbase.coprocessor.region.classes", OperationAttributesTestController.class.getName());
    util.startMiniCluster();
  }

  /**
   * Shuts the mini cluster down once all tests have run.
   */
  @AfterAll
  public static void releaseCluster() throws Exception {
    util.shutdownMiniCluster();
  }

  /**
   * Imports one line whose attribute column names the {@value #TEST_ATR_KEY} attribute and expects
   * the row to be present in the table afterwards.
   */
  @Test
  public void testMROnTable(TestInfo testInfo) throws Exception {
    final TableName tableName =
      TableName.valueOf(testInfo.getTestMethod().get().getName() + util.getRandomUUID());

    // Prepare the arguments required for the test.
    String[] args = new String[] {
      "-D" + ImportTsv.MAPPER_CONF_KEY
        + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapperForOprAttr",
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_ATTRIBUTES_KEY",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
    String data = "KEY\u001bVALUE1\u001bVALUE2\u001btest=>myvalue\n";
    util.createTable(tableName, FAMILY);
    try {
      doMROnTableTest(util, FAMILY, data, args, 1, true);
    } finally {
      // Drop the table even when the job or the validation fails.
      util.deleteTable(tableName);
    }
  }

  /**
   * Imports one line whose attribute column names a different attribute ("test1") and expects the
   * table to stay empty.
   */
  @Test
  public void testMROnTableWithInvalidOperationAttr(TestInfo testInfo) throws Exception {
    final TableName tableName =
      TableName.valueOf(testInfo.getTestMethod().get().getName() + util.getRandomUUID());

    // Prepare the arguments required for the test.
    String[] args = new String[] {
      "-D" + ImportTsv.MAPPER_CONF_KEY
        + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapperForOprAttr",
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_ATTRIBUTES_KEY",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
    String data = "KEY\u001bVALUE1\u001bVALUE2\u001btest1=>myvalue\n";
    util.createTable(tableName, FAMILY);
    try {
      doMROnTableTest(util, FAMILY, data, args, 1, false);
    } finally {
      // Drop the table even when the job or the validation fails.
      util.deleteTable(tableName);
    }
  }

  /**
   * Run an ImportTsv job and perform basic validation on the results. Returns the ImportTsv
   * <code>Tool</code> instance so that other tests can inspect it for further validation as
   * necessary. This method is static to ensure non-reliance on the instance's util/conf
   * facilities.
   * @param util            testing utility that supplies the configuration and the test directory
   * @param family          column family to validate
   * @param data            content written to the job's input file
   * @param args            arguments passed to ImportTsv BEFORE the input file path, which is
   *                        appended; the last element must be the table name
   * @param valueMultiplier multiplier applied to the numeric suffix of the expected cell values
   * @param dataAvailable   whether the table is expected to contain the imported row
   * @return The Tool instance used to run the test.
   */
  private static Tool doMROnTableTest(HBaseTestingUtil util, String family, String data,
    String[] args, int valueMultiplier, boolean dataAvailable) throws Exception {
    String table = args[args.length - 1];
    Configuration conf = new Configuration(util.getConfiguration());

    // populate input file
    FileSystem fs = FileSystem.get(conf);
    Path inputPath = fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table), "input.dat"));
    // try-with-resources so the stream is closed even when the write fails
    try (FSDataOutputStream op = fs.create(inputPath, true)) {
      op.write(Bytes.toBytes(data));
    }
    LOG.debug("Wrote test data to file: {}", inputPath);

    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
      LOG.debug("Forcing combiner.");
      conf.setInt("mapreduce.map.combine.minspills", 1);
    }

    // run the import
    List<String> argv = new ArrayList<>(Arrays.asList(args));
    argv.add(inputPath.toString());
    Tool tool = new ImportTsv();
    LOG.debug("Running ImportTsv with arguments: {}", argv);
    assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(new String[0])));

    validateTable(conf, TableName.valueOf(table), family, valueMultiplier, dataAvailable);

    if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
      LOG.debug("Deleting test subdirectory");
      util.cleanupDataTestDirOnTestFS(table);
    }
    return tool;
  }

  /**
   * Confirm ImportTsv via data in online table.
   * <p>
   * When {@code dataAvailable} is true, every scanned row must hold exactly two cells for row
   * "KEY" with values "VALUE" + valueMultiplier and "VALUE" + 2 * valueMultiplier, and at least
   * one row must be seen. Otherwise the scan must return no rows. A scan that fails with a
   * {@link NullPointerException} is retried after a pause, up to the configured retry count.
   * @param conf            configuration used to open the connection and to read retry settings
   * @param tableName       table to scan
   * @param family          column family to scan
   * @param valueMultiplier multiplier applied to the numeric suffix of the expected cell values
   * @param dataAvailable   whether the table is expected to contain the imported row
   * @throws IOException if the connection, table or scanner cannot be used
   */
  private static void validateTable(Configuration conf, TableName tableName, String family,
    int valueMultiplier, boolean dataAvailable) throws IOException {

    LOG.debug("Validating table.");
    boolean verified = false;
    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
    int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
    // Connection and table are released even when an assertion below fails.
    try (Connection connection = ConnectionFactory.createConnection(conf);
      Table table = connection.getTable(tableName)) {
      for (int i = 0; i < numRetries; i++) {
        try {
          Scan scan = new Scan();
          // Scan entire family.
          scan.addFamily(Bytes.toBytes(family));
          try (ResultScanner resScanner = table.getScanner(scan)) {
            if (dataAvailable) {
              for (Result res : resScanner) {
                LOG.debug("Getting results {}", res.size());
                assertEquals(2, res.size());
                List<Cell> kvs = res.listCells();
                assertTrue(CellUtil.matchingRows(kvs.get(0), Bytes.toBytes("KEY")));
                assertTrue(CellUtil.matchingRows(kvs.get(1), Bytes.toBytes("KEY")));
                assertTrue(
                  CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier)));
                assertTrue(
                  CellUtil.matchingValue(kvs.get(1), Bytes.toBytes("VALUE" + 2 * valueMultiplier)));
                // Only one result set is expected, so let it loop.
                verified = true;
              }
            } else {
              Result[] next = resScanner.next(2);
              assertEquals(0, next.length);
              verified = true;
            }
          }
          // The scan completed without error; no retry needed.
          break;
        } catch (NullPointerException e) {
          // If here, a cell was empty. Presume it's because updates came in
          // after the scanner had been opened. Wait a while and retry.
        }
        try {
          Thread.sleep(pause);
        } catch (InterruptedException e) {
          // Preserve the interrupt for the caller and stop retrying.
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
    assertTrue(verified);
  }

  /**
   * Region observer that only lets a put through on a user table when the put carries the
   * {@value #TEST_ATR_KEY} attribute; all other puts on user tables are bypassed. Puts on the meta
   * region and on system tables are left untouched.
   */
  public static class OperationAttributesTestController
    implements RegionCoprocessor, RegionObserver {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void prePut(ObserverContext<? extends RegionCoprocessorEnvironment> e, Put put,
      WALEdit edit, Durability durability) throws IOException {
      Region region = e.getEnvironment().getRegion();
      if (
        !region.getRegionInfo().isMetaRegion() && !region.getRegionInfo().getTable().isSystemTable()
      ) {
        if (put.getAttribute(TEST_ATR_KEY) != null) {
          LOG.debug("allow any put to happen {}", region.getRegionInfo().getRegionNameAsString());
        } else {
          // No attribute on the put: skip it so the row never reaches the table.
          e.bypass();
        }
      }
    }
  }
}