001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.Arrays; 026import java.util.List; 027import java.util.Optional; 028import org.apache.hadoop.conf.Configurable; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FSDataOutputStream; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.Cell; 034import org.apache.hadoop.hbase.CellUtil; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseTestingUtility; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.client.Connection; 040import org.apache.hadoop.hbase.client.ConnectionFactory; 041import org.apache.hadoop.hbase.client.Durability; 042import org.apache.hadoop.hbase.client.Put; 043import org.apache.hadoop.hbase.client.Result; 044import org.apache.hadoop.hbase.client.ResultScanner; 045import org.apache.hadoop.hbase.client.Scan; 046import org.apache.hadoop.hbase.client.Table; 047import org.apache.hadoop.hbase.coprocessor.ObserverContext; 048import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 049import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 050import org.apache.hadoop.hbase.coprocessor.RegionObserver; 051import org.apache.hadoop.hbase.regionserver.Region; 052import org.apache.hadoop.hbase.testclassification.LargeTests; 053import org.apache.hadoop.hbase.testclassification.MapReduceTests; 054import org.apache.hadoop.hbase.util.Bytes; 055import org.apache.hadoop.hbase.wal.WALEdit; 056import org.apache.hadoop.util.Tool; 057import org.apache.hadoop.util.ToolRunner; 058import org.junit.AfterClass; 059import org.junit.BeforeClass; 060import org.junit.ClassRule; 061import org.junit.Rule; 062import org.junit.Test; 063import org.junit.experimental.categories.Category; 064import org.junit.rules.TestName; 065import org.slf4j.Logger; 066import org.slf4j.LoggerFactory; 067 068@Category({MapReduceTests.class, LargeTests.class}) 069public class TestImportTSVWithOperationAttributes implements Configurable { 070 071 @ClassRule 072 public static final HBaseClassTestRule CLASS_RULE = 073 HBaseClassTestRule.forClass(TestImportTSVWithOperationAttributes.class); 074 075 private static final Logger LOG = 076 LoggerFactory.getLogger(TestImportTSVWithOperationAttributes.class); 077 protected static final String NAME = TestImportTsv.class.getSimpleName(); 078 protected static HBaseTestingUtility util = new HBaseTestingUtility(); 079 080 /** 081 * Delete the tmp directory after running doMROnTableTest. Boolean. Default is 082 * false. 083 */ 084 protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad"; 085 086 /** 087 * Force use of combiner in doMROnTableTest. Boolean. Default is true. 088 */ 089 protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner"; 090 091 private static Configuration conf; 092 093 private static final String TEST_ATR_KEY = "test"; 094 095 private final String FAMILY = "FAM"; 096 097 @Rule 098 public TestName name = new TestName(); 099 100 @Override 101 public Configuration getConf() { 102 return util.getConfiguration(); 103 } 104 105 @Override 106 public void setConf(Configuration conf) { 107 throw new IllegalArgumentException("setConf not supported"); 108 } 109 110 @BeforeClass 111 public static void provisionCluster() throws Exception { 112 conf = util.getConfiguration(); 113 conf.set("hbase.coprocessor.master.classes", OperationAttributesTestController.class.getName()); 114 conf.set("hbase.coprocessor.region.classes", OperationAttributesTestController.class.getName()); 115 util.startMiniCluster(); 116 } 117 118 @AfterClass 119 public static void releaseCluster() throws Exception { 120 util.shutdownMiniCluster(); 121 } 122 123 @Test 124 public void testMROnTable() throws Exception { 125 final TableName tableName = TableName.valueOf(name.getMethodName() + util.getRandomUUID()); 126 127 // Prepare the arguments required for the test. 128 String[] args = new String[] { 129 "-D" + ImportTsv.MAPPER_CONF_KEY 130 + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapperForOprAttr", 131 "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_ATTRIBUTES_KEY", 132 "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; 133 String data = "KEY\u001bVALUE1\u001bVALUE2\u001btest=>myvalue\n"; 134 util.createTable(tableName, FAMILY); 135 doMROnTableTest(util, FAMILY, data, args, 1, true); 136 util.deleteTable(tableName); 137 } 138 139 @Test 140 public void testMROnTableWithInvalidOperationAttr() throws Exception { 141 final TableName tableName = TableName.valueOf(name.getMethodName() + util.getRandomUUID()); 142 143 // Prepare the arguments required for the test. 144 String[] args = new String[] { 145 "-D" + ImportTsv.MAPPER_CONF_KEY 146 + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapperForOprAttr", 147 "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_ATTRIBUTES_KEY", 148 "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; 149 String data = "KEY\u001bVALUE1\u001bVALUE2\u001btest1=>myvalue\n"; 150 util.createTable(tableName, FAMILY); 151 doMROnTableTest(util, FAMILY, data, args, 1, false); 152 util.deleteTable(tableName); 153 } 154 155 /** 156 * Run an ImportTsv job and perform basic validation on the results. Returns 157 * the ImportTsv <code>Tool</code> instance so that other tests can inspect it 158 * for further validation as necessary. This method is static to insure 159 * non-reliance on instance's util/conf facilities. 160 * 161 * @param args 162 * Any arguments to pass BEFORE inputFile path is appended. 163 * @param dataAvailable 164 * @return The Tool instance used to run the test. 165 */ 166 private Tool doMROnTableTest(HBaseTestingUtility util, String family, String data, String[] args, 167 int valueMultiplier, boolean dataAvailable) throws Exception { 168 String table = args[args.length - 1]; 169 Configuration conf = new Configuration(util.getConfiguration()); 170 171 // populate input file 172 FileSystem fs = FileSystem.get(conf); 173 Path inputPath = fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table), "input.dat")); 174 FSDataOutputStream op = fs.create(inputPath, true); 175 op.write(Bytes.toBytes(data)); 176 op.close(); 177 LOG.debug(String.format("Wrote test data to file: %s", inputPath)); 178 179 if (conf.getBoolean(FORCE_COMBINER_CONF, true)) { 180 LOG.debug("Forcing combiner."); 181 conf.setInt("mapreduce.map.combine.minspills", 1); 182 } 183 184 // run the import 185 List<String> argv = new ArrayList<>(Arrays.asList(args)); 186 argv.add(inputPath.toString()); 187 Tool tool = new ImportTsv(); 188 LOG.debug("Running ImportTsv with arguments: " + argv); 189 assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args))); 190 191 validateTable(conf, TableName.valueOf(table), family, valueMultiplier, dataAvailable); 192 193 if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) { 194 LOG.debug("Deleting test subdirectory"); 195 util.cleanupDataTestDirOnTestFS(table); 196 } 197 return tool; 198 } 199 200 /** 201 * Confirm ImportTsv via data in online table. 202 * 203 * @param dataAvailable 204 */ 205 private static void validateTable(Configuration conf, TableName tableName, String family, 206 int valueMultiplier, boolean dataAvailable) throws IOException { 207 208 LOG.debug("Validating table."); 209 Connection connection = ConnectionFactory.createConnection(conf); 210 Table table = connection.getTable(tableName); 211 boolean verified = false; 212 long pause = conf.getLong("hbase.client.pause", 5 * 1000); 213 int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); 214 for (int i = 0; i < numRetries; i++) { 215 try { 216 Scan scan = new Scan(); 217 // Scan entire family. 218 scan.addFamily(Bytes.toBytes(family)); 219 if (dataAvailable) { 220 ResultScanner resScanner = table.getScanner(scan); 221 for (Result res : resScanner) { 222 LOG.debug("Getting results " + res.size()); 223 assertTrue(res.size() == 2); 224 List<Cell> kvs = res.listCells(); 225 assertTrue(CellUtil.matchingRows(kvs.get(0), Bytes.toBytes("KEY"))); 226 assertTrue(CellUtil.matchingRows(kvs.get(1), Bytes.toBytes("KEY"))); 227 assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier))); 228 assertTrue(CellUtil.matchingValue(kvs.get(1), 229 Bytes.toBytes("VALUE" + 2 * valueMultiplier))); 230 // Only one result set is expected, so let it loop. 231 verified = true; 232 } 233 } else { 234 ResultScanner resScanner = table.getScanner(scan); 235 Result[] next = resScanner.next(2); 236 assertEquals(0, next.length); 237 verified = true; 238 } 239 240 break; 241 } catch (NullPointerException e) { 242 // If here, a cell was empty. Presume its because updates came in 243 // after the scanner had been opened. Wait a while and retry. 244 } 245 try { 246 Thread.sleep(pause); 247 } catch (InterruptedException e) { 248 // continue 249 } 250 } 251 table.close(); 252 connection.close(); 253 assertTrue(verified); 254 } 255 256 public static class OperationAttributesTestController 257 implements RegionCoprocessor, RegionObserver { 258 259 @Override 260 public Optional<RegionObserver> getRegionObserver() { 261 return Optional.of(this); 262 } 263 264 @Override 265 public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, 266 Durability durability) throws IOException { 267 Region region = e.getEnvironment().getRegion(); 268 if (!region.getRegionInfo().isMetaRegion() 269 && !region.getRegionInfo().getTable().isSystemTable()) { 270 if (put.getAttribute(TEST_ATR_KEY) != null) { 271 LOG.debug("allow any put to happen " + region.getRegionInfo().getRegionNameAsString()); 272 } else { 273 e.bypass(); 274 } 275 } 276 } 277 } 278}