001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.Arrays; 026import java.util.List; 027import java.util.Optional; 028import java.util.UUID; 029import org.apache.hadoop.conf.Configurable; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FSDataOutputStream; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.Cell; 035import org.apache.hadoop.hbase.CellUtil; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.HBaseTestingUtility; 038import org.apache.hadoop.hbase.HConstants; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.Connection; 041import org.apache.hadoop.hbase.client.ConnectionFactory; 042import org.apache.hadoop.hbase.client.Durability; 043import org.apache.hadoop.hbase.client.Put; 044import org.apache.hadoop.hbase.client.Result; 045import org.apache.hadoop.hbase.client.ResultScanner; 046import org.apache.hadoop.hbase.client.Scan; 047import org.apache.hadoop.hbase.client.Table; 048import org.apache.hadoop.hbase.coprocessor.ObserverContext; 049import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 050import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 051import org.apache.hadoop.hbase.coprocessor.RegionObserver; 052import org.apache.hadoop.hbase.regionserver.Region; 053import org.apache.hadoop.hbase.testclassification.LargeTests; 054import org.apache.hadoop.hbase.testclassification.MapReduceTests; 055import org.apache.hadoop.hbase.util.Bytes; 056import org.apache.hadoop.hbase.wal.WALEdit; 057import org.apache.hadoop.util.Tool; 058import org.apache.hadoop.util.ToolRunner; 059import org.junit.AfterClass; 060import org.junit.BeforeClass; 061import org.junit.ClassRule; 062import org.junit.Rule; 063import org.junit.Test; 064import org.junit.experimental.categories.Category; 065import org.junit.rules.TestName; 066import org.slf4j.Logger; 067import org.slf4j.LoggerFactory; 068 069@Category({MapReduceTests.class, LargeTests.class}) 070public class TestImportTSVWithOperationAttributes implements Configurable { 071 072 @ClassRule 073 public static final HBaseClassTestRule CLASS_RULE = 074 HBaseClassTestRule.forClass(TestImportTSVWithOperationAttributes.class); 075 076 private static final Logger LOG = 077 LoggerFactory.getLogger(TestImportTSVWithOperationAttributes.class); 078 protected static final String NAME = TestImportTsv.class.getSimpleName(); 079 protected static HBaseTestingUtility util = new HBaseTestingUtility(); 080 081 /** 082 * Delete the tmp directory after running doMROnTableTest. Boolean. Default is 083 * false. 084 */ 085 protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad"; 086 087 /** 088 * Force use of combiner in doMROnTableTest. Boolean. Default is true. 089 */ 090 protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner"; 091 092 private static Configuration conf; 093 094 private static final String TEST_ATR_KEY = "test"; 095 096 private final String FAMILY = "FAM"; 097 098 @Rule 099 public TestName name = new TestName(); 100 101 @Override 102 public Configuration getConf() { 103 return util.getConfiguration(); 104 } 105 106 @Override 107 public void setConf(Configuration conf) { 108 throw new IllegalArgumentException("setConf not supported"); 109 } 110 111 @BeforeClass 112 public static void provisionCluster() throws Exception { 113 conf = util.getConfiguration(); 114 conf.set("hbase.coprocessor.master.classes", OperationAttributesTestController.class.getName()); 115 conf.set("hbase.coprocessor.region.classes", OperationAttributesTestController.class.getName()); 116 util.startMiniCluster(); 117 } 118 119 @AfterClass 120 public static void releaseCluster() throws Exception { 121 util.shutdownMiniCluster(); 122 } 123 124 @Test 125 public void testMROnTable() throws Exception { 126 final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); 127 128 // Prepare the arguments required for the test. 129 String[] args = new String[] { 130 "-D" + ImportTsv.MAPPER_CONF_KEY 131 + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapperForOprAttr", 132 "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_ATTRIBUTES_KEY", 133 "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; 134 String data = "KEY\u001bVALUE1\u001bVALUE2\u001btest=>myvalue\n"; 135 util.createTable(tableName, FAMILY); 136 doMROnTableTest(util, FAMILY, data, args, 1, true); 137 util.deleteTable(tableName); 138 } 139 140 @Test 141 public void testMROnTableWithInvalidOperationAttr() throws Exception { 142 final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); 143 144 // Prepare the arguments required for the test. 145 String[] args = new String[] { 146 "-D" + ImportTsv.MAPPER_CONF_KEY 147 + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapperForOprAttr", 148 "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_ATTRIBUTES_KEY", 149 "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; 150 String data = "KEY\u001bVALUE1\u001bVALUE2\u001btest1=>myvalue\n"; 151 util.createTable(tableName, FAMILY); 152 doMROnTableTest(util, FAMILY, data, args, 1, false); 153 util.deleteTable(tableName); 154 } 155 156 /** 157 * Run an ImportTsv job and perform basic validation on the results. Returns 158 * the ImportTsv <code>Tool</code> instance so that other tests can inspect it 159 * for further validation as necessary. This method is static to insure 160 * non-reliance on instance's util/conf facilities. 161 * 162 * @param args 163 * Any arguments to pass BEFORE inputFile path is appended. 164 * @param dataAvailable 165 * @return The Tool instance used to run the test. 166 */ 167 private Tool doMROnTableTest(HBaseTestingUtility util, String family, String data, String[] args, 168 int valueMultiplier, boolean dataAvailable) throws Exception { 169 String table = args[args.length - 1]; 170 Configuration conf = new Configuration(util.getConfiguration()); 171 172 // populate input file 173 FileSystem fs = FileSystem.get(conf); 174 Path inputPath = fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table), "input.dat")); 175 FSDataOutputStream op = fs.create(inputPath, true); 176 op.write(Bytes.toBytes(data)); 177 op.close(); 178 LOG.debug(String.format("Wrote test data to file: %s", inputPath)); 179 180 if (conf.getBoolean(FORCE_COMBINER_CONF, true)) { 181 LOG.debug("Forcing combiner."); 182 conf.setInt("mapreduce.map.combine.minspills", 1); 183 } 184 185 // run the import 186 List<String> argv = new ArrayList<>(Arrays.asList(args)); 187 argv.add(inputPath.toString()); 188 Tool tool = new ImportTsv(); 189 LOG.debug("Running ImportTsv with arguments: " + argv); 190 assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args))); 191 192 validateTable(conf, TableName.valueOf(table), family, valueMultiplier, dataAvailable); 193 194 if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) { 195 LOG.debug("Deleting test subdirectory"); 196 util.cleanupDataTestDirOnTestFS(table); 197 } 198 return tool; 199 } 200 201 /** 202 * Confirm ImportTsv via data in online table. 203 * 204 * @param dataAvailable 205 */ 206 private static void validateTable(Configuration conf, TableName tableName, String family, 207 int valueMultiplier, boolean dataAvailable) throws IOException { 208 209 LOG.debug("Validating table."); 210 Connection connection = ConnectionFactory.createConnection(conf); 211 Table table = connection.getTable(tableName); 212 boolean verified = false; 213 long pause = conf.getLong("hbase.client.pause", 5 * 1000); 214 int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); 215 for (int i = 0; i < numRetries; i++) { 216 try { 217 Scan scan = new Scan(); 218 // Scan entire family. 219 scan.addFamily(Bytes.toBytes(family)); 220 if (dataAvailable) { 221 ResultScanner resScanner = table.getScanner(scan); 222 for (Result res : resScanner) { 223 LOG.debug("Getting results " + res.size()); 224 assertTrue(res.size() == 2); 225 List<Cell> kvs = res.listCells(); 226 assertTrue(CellUtil.matchingRows(kvs.get(0), Bytes.toBytes("KEY"))); 227 assertTrue(CellUtil.matchingRows(kvs.get(1), Bytes.toBytes("KEY"))); 228 assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier))); 229 assertTrue(CellUtil.matchingValue(kvs.get(1), 230 Bytes.toBytes("VALUE" + 2 * valueMultiplier))); 231 // Only one result set is expected, so let it loop. 232 verified = true; 233 } 234 } else { 235 ResultScanner resScanner = table.getScanner(scan); 236 Result[] next = resScanner.next(2); 237 assertEquals(0, next.length); 238 verified = true; 239 } 240 241 break; 242 } catch (NullPointerException e) { 243 // If here, a cell was empty. Presume its because updates came in 244 // after the scanner had been opened. Wait a while and retry. 245 } 246 try { 247 Thread.sleep(pause); 248 } catch (InterruptedException e) { 249 // continue 250 } 251 } 252 table.close(); 253 connection.close(); 254 assertTrue(verified); 255 } 256 257 public static class OperationAttributesTestController 258 implements RegionCoprocessor, RegionObserver { 259 260 @Override 261 public Optional<RegionObserver> getRegionObserver() { 262 return Optional.of(this); 263 } 264 265 @Override 266 public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, 267 Durability durability) throws IOException { 268 Region region = e.getEnvironment().getRegion(); 269 if (!region.getRegionInfo().isMetaRegion() 270 && !region.getRegionInfo().getTable().isSystemTable()) { 271 if (put.getAttribute(TEST_ATR_KEY) != null) { 272 LOG.debug("allow any put to happen " + region.getRegionInfo().getRegionNameAsString()); 273 } else { 274 e.bypass(); 275 } 276 } 277 } 278 } 279}