001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertNotEquals; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.Locale; 028import java.util.Map; 029import java.util.NavigableMap; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.HBaseTestingUtil; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.Result; 035import org.apache.hadoop.hbase.client.Scan; 036import org.apache.hadoop.hbase.client.Table; 037import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 038import org.apache.hadoop.hbase.util.Bytes; 039import org.apache.hadoop.io.NullWritable; 040import org.apache.hadoop.mapred.JobConf; 041import org.apache.hadoop.mapreduce.InputSplit; 042import org.apache.hadoop.mapreduce.Job; 043import org.apache.hadoop.mapreduce.Reducer; 044import org.apache.hadoop.mapreduce.TaskCounter; 045import 
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests various scan start and stop row scenarios. This is set in a scan and tested in a MapReduce
 * job to see if that is handed over and done properly too.
 */
public abstract class TestTableInputFormatScanBase {

  private static final Logger LOG = LoggerFactory.getLogger(TestTableInputFormatScanBase.class);
  static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  static final TableName TABLE_NAME = TableName.valueOf("scantest");
  static final byte[][] INPUT_FAMILYS = { Bytes.toBytes("content1"), Bytes.toBytes("content2") };
  // Configuration keys used to pass the expected first/last row key down to ScanReducer.cleanup().
  static final String KEY_STARTROW = "startRow";
  static final String KEY_LASTROW = "stpRow";

  private static Table table = null;

  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    // start mini hbase cluster
    TEST_UTIL.startMiniCluster(3);
    // create and fill table
    table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, INPUT_FAMILYS);
    TEST_UTIL.loadTable(table, INPUT_FAMILYS, null, false);
  }

  @AfterAll
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Pass the key and value to reduce.
   */
  public static class ScanMapper
    extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {

    /**
     * Pass the key and value to reduce. Verifies that each row carries exactly one value per
     * input family before forwarding the row key as both map output key and value.
     * @param key The key, here "aaa", "aab" etc.
     * @param value The value is the same as the key.
     * @param context The task context.
     * @throws IOException When reading the rows fails.
     */
    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context)
      throws IOException, InterruptedException {
      // Each row was loaded with one cell per family; anything else means the scan is broken.
      if (value.size() != 2) {
        throw new IOException("There should be two input columns");
      }
      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cfMap = value.getMap();

      if (!cfMap.containsKey(INPUT_FAMILYS[0]) || !cfMap.containsKey(INPUT_FAMILYS[1])) {
        throw new IOException("Wrong input columns. Missing: '" + Bytes.toString(INPUT_FAMILYS[0])
          + "' or '" + Bytes.toString(INPUT_FAMILYS[1]) + "'.");
      }

      String val0 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[0], null));
      String val1 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[1], null));
      LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) + ", value -> (" + val0 + ", "
        + val1 + ")");
      context.write(key, key);
    }
  }

  /**
   * Checks the last and first key seen against the scanner boundaries. The expected boundaries
   * arrive through the {@link #KEY_STARTROW} and {@link #KEY_LASTROW} configuration entries;
   * an empty value disables the corresponding check.
   */
  public static class ScanReducer
    extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, NullWritable, NullWritable> {

    // First and last row key seen across all reduce() calls; evaluated in cleanup().
    private String first = null;
    private String last = null;

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable> values,
      Context context) throws IOException, InterruptedException {
      int count = 0;
      for (ImmutableBytesWritable value : values) {
        String val = Bytes.toStringBinary(value.get());
        LOG.info(
          "reduce: key[" + count + "] -> " + Bytes.toStringBinary(key.get()) + ", value -> " + val);
        if (first == null) {
          first = val;
        }
        last = val;
        count++;
      }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      Configuration c = context.getConfiguration();
      String startRow = c.get(KEY_STARTROW);
      String lastRow = c.get(KEY_LASTROW);
      LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\"");
      LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\"");
      if (startRow != null && !startRow.isEmpty()) {
        assertEquals(startRow, first);
      }
      if (lastRow != null && !lastRow.isEmpty()) {
        assertEquals(lastRow, last);
      }
    }

  }

  /**
   * Tests an MR Scan initialized from properties set in the Configuration.
   * @param start scan start row, or {@code null} for an open start
   * @param stop  scan stop row (exclusive), or {@code null} for an open end
   * @param last  last row the scan is expected to deliver; verified by {@link ScanReducer}
   */
  protected void testScanFromConfiguration(String start, String stop, String last)
    throws IOException, InterruptedException, ClassNotFoundException {
    String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty")
      + "To" + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
    c.set(TableInputFormat.INPUT_TABLE, TABLE_NAME.getNameAsString());
    c.set(TableInputFormat.SCAN_COLUMN_FAMILY,
      Bytes.toString(INPUT_FAMILYS[0]) + ", " + Bytes.toString(INPUT_FAMILYS[1]));
    c.set(KEY_STARTROW, start != null ? start : "");
    c.set(KEY_LASTROW, last != null ? last : "");

    if (start != null) {
      c.set(TableInputFormat.SCAN_ROW_START, start);
    }

    if (stop != null) {
      c.set(TableInputFormat.SCAN_ROW_STOP, stop);
    }

    Job job = Job.getInstance(c, jobName);
    job.setMapperClass(ScanMapper.class);
    job.setReducerClass(ScanReducer.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(ImmutableBytesWritable.class);
    job.setInputFormatClass(TableInputFormat.class);
    job.setNumReduceTasks(1);
    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
    TableMapReduceUtil.addDependencyJars(job);
    assertTrue(job.waitForCompletion(true));
  }

  /**
   * Tests a MR scan using specific start and stop rows.
   * @param start scan start row, or {@code null} for an open start
   * @param stop  scan stop row (exclusive), or {@code null} for an open end
   * @param last  last row the scan is expected to deliver; verified by {@link ScanReducer}
   */
  protected void testScan(String start, String stop, String last)
    throws IOException, InterruptedException, ClassNotFoundException {
    String jobName = "Scan" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") + "To"
      + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
    LOG.info("Before map/reduce startup - job " + jobName);
    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
    Scan scan = new Scan();
    scan.addFamily(INPUT_FAMILYS[0]);
    scan.addFamily(INPUT_FAMILYS[1]);
    if (start != null) {
      scan.withStartRow(Bytes.toBytes(start));
    }
    c.set(KEY_STARTROW, start != null ? start : "");
    if (stop != null) {
      scan.withStopRow(Bytes.toBytes(stop));
    }
    c.set(KEY_LASTROW, last != null ? last : "");
    LOG.info("scan before: " + scan);
    Job job = Job.getInstance(c, jobName);
    TableMapReduceUtil.initTableMapperJob(TABLE_NAME, scan, ScanMapper.class,
      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
    job.setReducerClass(ScanReducer.class);
    job.setNumReduceTasks(1); // one to get final "first" and "last" key
    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
    LOG.info("Started " + job.getJobName());
    assertTrue(job.waitForCompletion(true));
    LOG.info("After map/reduce completion - job " + jobName);
  }

  /**
   * Tests Number of inputSplits for MR job when specify number of mappers for TableInputFormatXXX
   * This test does not run MR job
   * @param splitsPerRegion      requested number of mappers per region
   * @param expectedNumOfSplits  total number of splits the input format should produce
   */
  protected void testNumOfSplits(int splitsPerRegion, int expectedNumOfSplits)
    throws IOException, InterruptedException, ClassNotFoundException {
    String jobName = "TestJobForNumOfSplits";
    LOG.info("Before map/reduce startup - job " + jobName);
    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
    Scan scan = new Scan();
    scan.addFamily(INPUT_FAMILYS[0]);
    scan.addFamily(INPUT_FAMILYS[1]);
    c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion);
    c.set(KEY_STARTROW, "");
    c.set(KEY_LASTROW, "");
    Job job = Job.getInstance(c, jobName);
    TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class,
      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
    TableInputFormat tif = new TableInputFormat();
    tif.setConf(job.getConfiguration());
    assertEquals(TABLE_NAME, table.getName());
    List<InputSplit> splits = tif.getSplits(job);
    for (InputSplit split : splits) {
      TableSplit tableSplit = (TableSplit) split;
      // In table input format, we do no store the scanner at the split level
      // because we use the scan object from the map-reduce job conf itself.
      assertTrue(tableSplit.getScanAsString().isEmpty());
    }
    assertEquals(expectedNumOfSplits, splits.size());
  }

  /**
   * Run MR job to check the number of mapper = expectedNumOfSplits
   * @param splitsPerRegion      requested number of mappers per region
   * @param expectedNumOfSplits  number of map tasks the job is expected to launch
   */
  protected void testNumOfSplitsMR(int splitsPerRegion, int expectedNumOfSplits)
    throws IOException, InterruptedException, ClassNotFoundException {
    String jobName = "TestJobForNumOfSplits-MR";
    LOG.info("Before map/reduce startup - job " + jobName);
    JobConf c = new JobConf(TEST_UTIL.getConfiguration());
    Scan scan = new Scan();
    scan.addFamily(INPUT_FAMILYS[0]);
    scan.addFamily(INPUT_FAMILYS[1]);
    c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion);
    c.set(KEY_STARTROW, "");
    c.set(KEY_LASTROW, "");
    Job job = Job.getInstance(c, jobName);
    TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class,
      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
    job.setReducerClass(ScanReducer.class);
    job.setNumReduceTasks(1);
    job.setOutputFormatClass(NullOutputFormat.class);
    assertTrue(job.waitForCompletion(true), "job failed!");
    // for some reason, hbase does not expose JobCounter.TOTAL_LAUNCHED_MAPS,
    // we use TaskCounter.SHUFFLED_MAPS to get total launched maps
    assertEquals(expectedNumOfSplits,
      job.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS).getValue(),
      "Saw the wrong count of mappers per region");
  }

  /**
   * Run MR job to test autobalance for setting number of mappers for TIF This does not run real MR
   * job
   */
  protected void testAutobalanceNumOfSplit() throws IOException {
    final long mb = 1024L * 1024L; // 1 MiB, the unit used for the fake region sizes below
    // set up splits for testing
    List<InputSplit> splits = new ArrayList<>(5);
    int[] regionLen = { 10, 20, 20, 40, 60 };
    for (int i = 0; i < 5; i++) {
      InputSplit split = new TableSplit(TABLE_NAME, new Scan(), Bytes.toBytes(i),
        Bytes.toBytes(i + 1), "", "", regionLen[i] * mb);
      splits.add(split);
    }
    TableInputFormat tif = new TableInputFormat();
    // 1 GiB max split size
    List<InputSplit> res = tif.calculateAutoBalancedSplits(splits, 1024L * mb);

    assertEquals(5, res.size(), "Saw the wrong number of splits");
    TableSplit ts1 = (TableSplit) res.get(0);
    assertEquals(2, Bytes.toInt(ts1.getEndRow()), "The first split end key should be");
    TableSplit ts2 = (TableSplit) res.get(1);
    assertEquals(20 * mb, ts2.getLength(), "The second split regionsize should be");
    TableSplit ts3 = (TableSplit) res.get(2);
    assertEquals(3, Bytes.toInt(ts3.getStartRow()), "The third split start key should be");
    TableSplit ts4 = (TableSplit) res.get(4);
    assertNotEquals(4, Bytes.toInt(ts4.getStartRow()), "The seventh split start key should not be");
  }
}