/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Shared fixtures for {@link HFileOutputFormat2} tests: mappers that emit random rows, random
 * key generators, and a driver that runs an incremental-load MapReduce job.
 */
public abstract class HFileOutputFormat2TestBase {

  /** Number of rows each map task emits per target table. */
  protected static final int ROWSPERSPLIT = 1024;
  /** Length, in bytes, of the random start/split keys produced by this class. */
  protected static final int DEFAULT_VALUE_LENGTH = 1000;

  public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME;
  /** Families every generated row is written to: {@code FAMILY_NAME + "-A"} and {@code "-B"}. */
  protected static final byte[][] FAMILIES =
    { Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B")) };
  protected static final TableName[] TABLE_NAMES = Stream
    .of("TestTable", "TestTable2", "TestTable3").map(TableName::valueOf).toArray(TableName[]::new);

  protected static HBaseTestingUtil UTIL = new HBaseTestingUtil();

  /**
   * Selects the tables a generating mapper writes to.
   * @param multiTable whether the job runs in multi-table mode
   * @return {@link #TABLE_NAMES} itself in multi-table mode, otherwise a one-element array holding
   *         only its first entry
   */
  private static TableName[] targetTables(boolean multiTable) {
    return multiTable ? TABLE_NAMES : new TableName[] { TABLE_NAMES[0] };
  }

  /**
   * Simple mapper that makes KeyValue output.
   * <p>
   * For every target table it emits {@link #ROWSPERSPLIT} random rows, one cell per family in
   * {@link #FAMILIES}. Key and value lengths are read from {@code randomkv.key.length} and
   * {@code randomkv.val.length} (default 10 bytes each).
   */
  protected static class RandomKVGeneratingMapper
    extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> {

    protected static final int KEYLEN_DEFAULT = 10;
    protected static final String KEYLEN_CONF = "randomkv.key.length";
    protected static final int VALLEN_DEFAULT = 10;
    protected static final String VALLEN_CONF = "randomkv.val.length";
    protected static final byte[] QUALIFIER = Bytes.toBytes("data");

    private int keyLength;
    private int valLength;
    private boolean multiTableMapper = false;
    private TableName[] tables = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
      multiTableMapper =
        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
      tables = targetTables(multiTableMapper);
    }

    @Override
    protected void map(NullWritable n1, NullWritable n2,
      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context)
      throws java.io.IOException, InterruptedException {

      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
      for (TableName table : tables) {
        for (int i = 0; i < ROWSPERSPLIT; i++) {
          Bytes.random(keyBytes);
          // Ensure that unique tasks generate unique keys
          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
          Bytes.random(valBytes);
          // In multi-table mode the map output key is the composite built from the table name
          // and the row; the cell itself always carries the raw row key.
          byte[] key = multiTableMapper
            ? MultiTableHFileOutputFormat.createCompositeKey(table.getName(), keyBytes)
            : keyBytes;
          ImmutableBytesWritable outKey = new ImmutableBytesWritable(key);
          for (byte[] family : FAMILIES) {
            Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
            context.write(outKey, kv);
          }
        }
      }
    }
  }

  /**
   * Simple mapper that makes Put output.
   * <p>
   * Same row generation as {@link RandomKVGeneratingMapper}, but each cell is wrapped in a
   * {@link Put} whose TTL is set to 1.
   */
  protected static class RandomPutGeneratingMapper
    extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put> {

    protected static final int KEYLEN_DEFAULT = 10;
    protected static final String KEYLEN_CONF = "randomkv.key.length";
    protected static final int VALLEN_DEFAULT = 10;
    protected static final String VALLEN_CONF = "randomkv.val.length";
    protected static final byte[] QUALIFIER = Bytes.toBytes("data");

    private int keyLength;
    private int valLength;
    private boolean multiTableMapper = false;
    private TableName[] tables = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
      multiTableMapper =
        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
      tables = targetTables(multiTableMapper);
    }

    @Override
    protected void map(NullWritable n1, NullWritable n2,
      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put>.Context context)
      throws java.io.IOException, InterruptedException {

      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";

      for (TableName table : tables) {
        for (int i = 0; i < ROWSPERSPLIT; i++) {
          Bytes.random(keyBytes);
          // Ensure that unique tasks generate unique keys
          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
          Bytes.random(valBytes);
          // In multi-table mode the map output key is the composite built from the table name
          // and the row; the Put itself always carries the raw row key.
          byte[] key = multiTableMapper
            ? MultiTableHFileOutputFormat.createCompositeKey(table.getName(), keyBytes)
            : keyBytes;
          ImmutableBytesWritable outKey = new ImmutableBytesWritable(key);
          for (byte[] family : FAMILIES) {
            Put p = new Put(keyBytes);
            p.addColumn(family, QUALIFIER, valBytes);
            // set TTL to very low so that the scan does not return any value
            p.setTTL(1L);
            context.write(outKey, p);
          }
        }
      }
    }
  }

  /**
   * Configures {@code job} to read from {@link NMapInputFormat} and run one of the random
   * generating mappers, with {@link ImmutableBytesWritable} map output keys.
   * @param job            job to configure
   * @param putSortReducer {@code true} to emit {@link Put}s via {@link RandomPutGeneratingMapper};
   *                       {@code false} to emit cells via {@link RandomKVGeneratingMapper}
   */
  protected static void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
    job.setInputFormatClass(NMapInputFormat.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    if (putSortReducer) {
      job.setMapperClass(RandomPutGeneratingMapper.class);
      job.setMapOutputValueClass(Put.class);
    } else {
      job.setMapperClass(RandomKVGeneratingMapper.class);
      job.setMapOutputValueClass(KeyValue.class);
    }
  }

  /**
   * Generates region start keys. The first key is always the empty byte array; the remaining
   * {@code numKeys - 1} keys are random {@link #DEFAULT_VALUE_LENGTH}-byte values from
   * {@link #generateData(Random, int)}.
   * @param numKeys number of start keys to produce; must be at least 1
   * @return array of {@code numKeys} start keys
   * @throws IllegalArgumentException if {@code numKeys < 1}
   */
  protected static byte[][] generateRandomStartKeys(int numKeys) {
    if (numKeys < 1) {
      // Without this check numKeys == 0 fails with an opaque index error on ret[0].
      throw new IllegalArgumentException("numKeys must be at least 1 but was " + numKeys);
    }
    Random random = ThreadLocalRandom.current();
    byte[][] ret = new byte[numKeys][];
    // first region start key is always empty
    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
    for (int i = 1; i < numKeys; i++) {
      ret[i] = generateData(random, DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  /**
   * Generates a random value made of uppercase ASCII letters ('A'..'Z').
   * <p>
   * Each 8-byte chunk starting before offset {@code length - 8} is filled with one random letter.
   * The final 1 to 8 bytes (all bytes when {@code length <= 8}) share one further random letter.
   * @param r      source of randomness
   * @param length number of bytes to generate
   * @return Generated random value to insert into a table cell.
   */
  protected static byte[] generateData(final Random r, int length) {
    byte[] b = new byte[length];
    int i = 0;
    for (; i < length - 8; i += 8) {
      Arrays.fill(b, i, i + 8, (byte) ('A' + r.nextInt(26)));
    }
    // Tail: i <= length here, so this range is always valid (possibly empty when length == 0).
    Arrays.fill(b, i, length, (byte) ('A' + r.nextInt(26)));
    return b;
  }

  /**
   * Generates {@code numKeys} random split keys, each {@link #DEFAULT_VALUE_LENGTH} bytes long.
   * @param numKeys number of split keys to produce
   * @return array of {@code numKeys} random keys
   */
  protected static byte[][] generateRandomSplitKeys(int numKeys) {
    Random random = ThreadLocalRandom.current();
    byte[][] ret = new byte[numKeys][];
    for (int i = 0; i < numKeys; i++) {
      ret[i] = generateData(random, DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  /**
   * Runs an incremental-load MapReduce job that writes random data for the given tables.
   * <p>
   * The job uses the random generating mappers and registers the Mutation, Result and Cell
   * serializations. It is configured through
   * {@link MultiTableHFileOutputFormat#configureIncrementalLoad} when more than one table is
   * given, and through {@link HFileOutputFormat2#configureIncrementalLoad} otherwise. The method
   * asserts that the reducer count equals the total number of regions, that {@code outDir} does
   * not exist yet, and that the job completes successfully.
   * @param conf           base configuration for the job
   * @param tableInfo      tables to load; must contain at least one entry
   * @param outDir         output directory; must not exist
   * @param putSortReducer {@code true} to emit Puts, {@code false} to emit KeyValues
   */
  protected static void runIncrementalPELoad(Configuration conf,
    List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, boolean putSortReducer)
    throws IOException, InterruptedException, ClassNotFoundException {
    Job job = Job.getInstance(conf, "testLocalMRIncrementalLoad");
    job.setWorkingDirectory(UTIL.getDataTestDirOnTestFS("runIncrementalPELoad"));
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      CellSerialization.class.getName());
    setupRandomGeneratorMapper(job, putSortReducer);
    if (tableInfo.size() > 1) {
      MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo);
      int totalRegions = 0;
      for (HFileOutputFormat2.TableInfo info : tableInfo) {
        totalRegions += info.getRegionLocator().getAllRegionLocations().size();
      }
      assertEquals(totalRegions, job.getNumReduceTasks());
    } else {
      RegionLocator regionLocator = tableInfo.get(0).getRegionLocator();
      HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getTableDescriptor(),
        regionLocator);
      assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
    }

    FileOutputFormat.setOutputPath(job, outDir);

    assertFalse(UTIL.getTestFileSystem().exists(outDir));

    assertTrue(job.waitForCompletion(true));
  }
}