/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * End-to-end test that ImportTsv carries the HBASE_CELL_TTL column through to
 * the TTL attribute of each generated {@link Put}. The mini cluster installs
 * {@link TTLCheckingObserver} on every region; the observer rejects any Put to
 * a user table that lacks a TTL attribute, so the MR job only exits 0 if the
 * TTL survived the import pipeline.
 */
@Category({ MapReduceTests.class, LargeTests.class })
public class TestImportTSVWithTTLs implements Configurable {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestImportTSVWithTTLs.class);

  protected static final Logger LOG = LoggerFactory.getLogger(TestImportTSVWithTTLs.class);
  // NOTE(review): NAME is derived from TestImportTsv, not this class. That keeps the
  // conf key names below identical to the sibling ImportTsv tests; confirm that is
  // intentional before "fixing" it, since changing it changes the key strings.
  protected static final String NAME = TestImportTsv.class.getSimpleName();
  protected static HBaseTestingUtility util = new HBaseTestingUtility();

  /**
   * Delete the tmp directory after running doMROnTableTest. Boolean. Default is
   * true (see the getBoolean call in doMROnTableTest).
   */
  protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";

  /**
   * Force use of combiner in doMROnTableTest. Boolean. Default is true.
   */
  protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";

  // Column family used by the test table; referenced only within this class.
  private static final String FAMILY = "FAM";
  private static Configuration conf;

  @Rule
  public TestName name = new TestName();

  @Override
  public Configuration getConf() {
    return util.getConfiguration();
  }

  @Override
  public void setConf(Configuration conf) {
    throw new IllegalArgumentException("setConf not supported");
  }

  @BeforeClass
  public static void provisionCluster() throws Exception {
    conf = util.getConfiguration();
    // We don't check persistence in HFiles in this test, but if we ever do we will
    // need this where the default hfile version is not 3 (i.e. 0.98)
    conf.setInt("hfile.format.version", 3);
    // Install the TTL-enforcing observer on every region so a Put without a TTL fails.
    conf.set("hbase.coprocessor.region.classes", TTLCheckingObserver.class.getName());
    util.startMiniCluster();
  }

  @AfterClass
  public static void releaseCluster() throws Exception {
    util.shutdownMiniCluster();
  }

  @Test
  public void testMROnTable() throws Exception {
    // Randomized suffix keeps reruns from colliding on a leftover table.
    final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID());

    // Prepare the arguments required for the test.
    String[] args = new String[] {
        "-D" + ImportTsv.MAPPER_CONF_KEY
            + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_TTL",
        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
    // One row: key, two values, and a TTL column value of 1000000, ESC-separated.
    String data = "KEY\u001bVALUE1\u001bVALUE2\u001b1000000\n";
    util.createTable(tableName, FAMILY);
    doMROnTableTest(util, FAMILY, data, args, 1);
    util.deleteTable(tableName);
  }

  /**
   * Writes {@code data} to a fresh file on the test filesystem and runs ImportTsv
   * over it with {@code args}; the table name must be the last element of
   * {@code args}. Asserts the job exits 0.
   *
   * @return the Tool instance that ran the import, for further inspection
   * @throws Exception if the job cannot be launched or the assertion fails
   */
  protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
      String[] args, int valueMultiplier) throws Exception {
    TableName table = TableName.valueOf(args[args.length - 1]);
    Configuration conf = new Configuration(util.getConfiguration());

    // populate input file
    FileSystem fs = FileSystem.get(conf);
    Path inputPath = fs.makeQualified(new Path(util
        .getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
    // try-with-resources: the original leaked the stream if write() threw.
    try (FSDataOutputStream op = fs.create(inputPath, true)) {
      op.write(Bytes.toBytes(data));
    }
    LOG.debug("Wrote test data to file: {}", inputPath);

    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
      LOG.debug("Forcing combiner.");
      conf.setInt("mapreduce.map.combine.minspills", 1);
    }

    // run the import
    List<String> argv = new ArrayList<>(Arrays.asList(args));
    argv.add(inputPath.toString());
    Tool tool = new ImportTsv();
    LOG.debug("Running ImportTsv with arguments: {}", argv);
    try {
      // Job will fail if observer rejects entries without TTL
      // (toArray(new String[0]) replaces the misleading toArray(args): argv has one
      // more element than args, so the old call always allocated a new array anyway.)
      assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(new String[0])));
    } finally {
      // Clean up
      if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
        LOG.debug("Deleting test subdirectory");
        util.cleanupDataTestDirOnTestFS(table.getNameAsString());
      }
    }

    return tool;
  }

  /**
   * Region observer that rejects any Put to a user table that does not carry a
   * TTL attribute. Puts to the meta region and system tables pass through.
   */
  public static class TTLCheckingObserver implements RegionCoprocessor, RegionObserver {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
        Durability durability) throws IOException {
      Region region = e.getEnvironment().getRegion();
      // Only user tables are checked; meta/system writes carry no TTL and must succeed.
      if (region.getRegionInfo().isMetaRegion()
          || region.getRegionInfo().getTable().isSystemTable()) {
        return;
      }
      // Put.getTTL() is Long.MAX_VALUE when no TTL attribute was set on the Put.
      if (put.getTTL() == Long.MAX_VALUE) {
        throw new IOException("Operation does not have TTL set");
      }
    }
  }
}