/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs {@link ImportTsv} with an {@code HBASE_CELL_TTL} column against a mini cluster on which
 * {@link TTLCheckingObserver} is installed as a region coprocessor. The observer rejects any put
 * to a user table that does not carry a TTL, so the import job only succeeds if the TTL column is
 * applied to the puts it generates.
 */
@Category({MapReduceTests.class, LargeTests.class})
public class TestImportTSVWithTTLs implements Configurable {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestImportTSVWithTTLs.class);

  protected static final Logger LOG = LoggerFactory.getLogger(TestImportTSVWithTTLs.class);

  /**
   * Prefix of the configuration keys declared below. Note that it is taken from
   * {@code TestImportTsv}, not from this class.
   */
  protected static final String NAME = TestImportTsv.class.getSimpleName();
  protected static HBaseTestingUtility util = new HBaseTestingUtility();

  /**
   * Delete the tmp directory after running doMROnTableTest. Boolean. Default is
   * true.
   */
  protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";

  /**
   * Force use of combiner in doMROnTableTest. Boolean. Default is true.
   */
  protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";

  /** Column family of the table created by the test. */
  private static final String FAMILY = "FAM";
  private static Configuration conf;

  @Rule
  public TestName name = new TestName();

  /**
   * @return the configuration of the shared testing utility
   */
  @Override
  public Configuration getConf() {
    return util.getConfiguration();
  }

  /**
   * Not supported; the configuration always comes from the testing utility.
   *
   * @throws IllegalArgumentException always
   */
  @Override
  public void setConf(Configuration conf) {
    throw new IllegalArgumentException("setConf not supported");
  }

  /**
   * Registers {@link TTLCheckingObserver} as a region coprocessor and starts the mini cluster.
   */
  @BeforeClass
  public static void provisionCluster() throws Exception {
    conf = util.getConfiguration();
    // We don't check persistence in HFiles in this test, but if we ever do we will
    // need this where the default hfile version is not 3 (i.e. 0.98)
    conf.setInt("hfile.format.version", 3);
    conf.set("hbase.coprocessor.region.classes", TTLCheckingObserver.class.getName());
    util.startMiniCluster();
  }

  /** Shuts the mini cluster down. */
  @AfterClass
  public static void releaseCluster() throws Exception {
    util.shutdownMiniCluster();
  }

  /**
   * Imports a single row whose last field is mapped to {@code HBASE_CELL_TTL} into a freshly
   * created table and expects the job to succeed.
   */
  @Test
  public void testMROnTable() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName() + util.getRandomUUID());

    // Prepare the arguments required for the test.
    String[] args = new String[] {
      "-D" + ImportTsv.MAPPER_CONF_KEY
          + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
      "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_TTL",
      "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
    String data = "KEY\u001bVALUE1\u001bVALUE2\u001b1000000\n";
    util.createTable(tableName, FAMILY);
    doMROnTableTest(util, FAMILY, data, args, 1);
    util.deleteTable(tableName);
  }

  /**
   * Writes {@code data} to an input file on the test filesystem, runs {@link ImportTsv} over it
   * and asserts that the tool exits with status 0.
   *
   * @param util testing utility that supplies the configuration and the test directory
   * @param family column family name; not used by this method
   * @param data contents of the TSV input file
   * @param args arguments for the tool; the last element must be the table name. The path of the
   *          generated input file is appended before the tool is run.
   * @param valueMultiplier not used by this method
   * @return the {@link ImportTsv} instance that was run
   * @throws Exception if writing the input, running the tool or cleaning up fails
   */
  protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
      String[] args, int valueMultiplier) throws Exception {
    TableName table = TableName.valueOf(args[args.length - 1]);
    Configuration conf = new Configuration(util.getConfiguration());

    // populate input file
    FileSystem fs = FileSystem.get(conf);
    Path inputPath = fs.makeQualified(new Path(util
        .getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
    // try-with-resources so the stream is closed even when the write fails
    try (FSDataOutputStream op = fs.create(inputPath, true)) {
      op.write(Bytes.toBytes(data));
    }
    LOG.debug("Wrote test data to file: {}", inputPath);

    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
      LOG.debug("Forcing combiner.");
      conf.setInt("mapreduce.map.combine.minspills", 1);
    }

    // run the import
    List<String> argv = new ArrayList<>(Arrays.asList(args));
    argv.add(inputPath.toString());
    Tool tool = new ImportTsv();
    LOG.debug("Running ImportTsv with arguments: {}", argv);
    try {
      // Job will fail if observer rejects entries without TTL
      assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(new String[0])));
    } finally {
      // Clean up
      if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
        LOG.debug("Deleting test subdirectory");
        util.cleanupDataTestDirOnTestFS(table.getNameAsString());
      }
    }

    return tool;
  }

  /**
   * Region observer that fails every put to a user table when the put has no TTL.
   */
  public static class TTLCheckingObserver implements RegionCoprocessor, RegionObserver {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    /**
     * Lets puts to the meta region and to system tables through unchecked; for any other region
     * the put must report a TTL different from {@code Long.MAX_VALUE}.
     *
     * @throws IOException if a put to a user table reports a TTL of {@code Long.MAX_VALUE}
     */
    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
        Durability durability) throws IOException {
      Region region = e.getEnvironment().getRegion();
      if (!region.getRegionInfo().isMetaRegion()
          && !region.getRegionInfo().getTable().isSystemTable()) {
        // The put carries the TTL attribute
        if (put.getTTL() != Long.MAX_VALUE) {
          return;
        }
        throw new IOException("Operation does not have TTL set");
      }
    }
  }
}