001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.mapreduce; 020 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.List; 028import java.util.Locale; 029import java.util.Map; 030import java.util.NavigableMap; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.HBaseTestingUtility; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.client.Result; 036import org.apache.hadoop.hbase.client.Scan; 037import org.apache.hadoop.hbase.client.Table; 038import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.io.NullWritable; 041import org.apache.hadoop.mapred.JobConf; 042import org.apache.hadoop.mapreduce.InputSplit; 043import org.apache.hadoop.mapreduce.Job; 044import org.apache.hadoop.mapreduce.Reducer; 045import org.apache.hadoop.mapreduce.TaskCounter; 046import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 047import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; 048import org.junit.AfterClass; 049import org.junit.Assert; 050import org.junit.BeforeClass; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054 055/** 056 * Tests various scan start and stop row scenarios. This is set in a scan and tested in a MapReduce 057 * job to see if that is handed over and done properly too. 058 */ 059public abstract class TestTableInputFormatScanBase { 060 061 private static final Logger LOG = LoggerFactory.getLogger(TestTableInputFormatScanBase.class); 062 static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 063 064 static final TableName TABLE_NAME = TableName.valueOf("scantest"); 065 static final byte[][] INPUT_FAMILYS = {Bytes.toBytes("content1"), Bytes.toBytes("content2")}; 066 static final String KEY_STARTROW = "startRow"; 067 static final String KEY_LASTROW = "stpRow"; 068 069 private static Table table = null; 070 071 @BeforeClass 072 public static void setUpBeforeClass() throws Exception { 073 // start mini hbase cluster 074 TEST_UTIL.startMiniCluster(3); 075 // create and fill table 076 table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, INPUT_FAMILYS); 077 TEST_UTIL.loadTable(table, INPUT_FAMILYS, null, false); 078 } 079 080 @AfterClass 081 public static void tearDownAfterClass() throws Exception { 082 TEST_UTIL.shutdownMiniCluster(); 083 } 084 085 /** 086 * Pass the key and value to reduce. 087 */ 088 public static class ScanMapper 089 extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> { 090 091 /** 092 * Pass the key and value to reduce. 093 * 094 * @param key The key, here "aaa", "aab" etc. 095 * @param value The value is the same as the key. 096 * @param context The task context. 097 * @throws IOException When reading the rows fails. 098 */ 099 @Override 100 public void map(ImmutableBytesWritable key, Result value, 101 Context context) 102 throws IOException, InterruptedException { 103 if (value.size() != 2) { 104 throw new IOException("There should be two input columns"); 105 } 106 Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> 107 cfMap = value.getMap(); 108 109 if (!cfMap.containsKey(INPUT_FAMILYS[0]) || !cfMap.containsKey(INPUT_FAMILYS[1])) { 110 throw new IOException("Wrong input columns. Missing: '" + 111 Bytes.toString(INPUT_FAMILYS[0]) + "' or '" + Bytes.toString(INPUT_FAMILYS[1]) + "'."); 112 } 113 114 String val0 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[0], null)); 115 String val1 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[1], null)); 116 LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) + 117 ", value -> (" + val0 + ", " + val1 + ")"); 118 context.write(key, key); 119 } 120 } 121 122 /** 123 * Checks the last and first key seen against the scanner boundaries. 124 */ 125 public static class ScanReducer 126 extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, 127 NullWritable, NullWritable> { 128 129 private String first = null; 130 private String last = null; 131 132 protected void reduce(ImmutableBytesWritable key, 133 Iterable<ImmutableBytesWritable> values, Context context) 134 throws IOException ,InterruptedException { 135 int count = 0; 136 for (ImmutableBytesWritable value : values) { 137 String val = Bytes.toStringBinary(value.get()); 138 LOG.info("reduce: key[" + count + "] -> " + 139 Bytes.toStringBinary(key.get()) + ", value -> " + val); 140 if (first == null) first = val; 141 last = val; 142 count++; 143 } 144 } 145 146 protected void cleanup(Context context) 147 throws IOException, InterruptedException { 148 Configuration c = context.getConfiguration(); 149 String startRow = c.get(KEY_STARTROW); 150 String lastRow = c.get(KEY_LASTROW); 151 LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\""); 152 LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\""); 153 if (startRow != null && startRow.length() > 0) { 154 assertEquals(startRow, first); 155 } 156 if (lastRow != null && lastRow.length() > 0) { 157 assertEquals(lastRow, last); 158 } 159 } 160 161 } 162 163 /** 164 * Tests an MR Scan initialized from properties set in the Configuration. 165 */ 166 protected void testScanFromConfiguration(String start, String stop, String last) 167 throws IOException, InterruptedException, ClassNotFoundException { 168 String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") + 169 "To" + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty"); 170 Configuration c = new Configuration(TEST_UTIL.getConfiguration()); 171 c.set(TableInputFormat.INPUT_TABLE, TABLE_NAME.getNameAsString()); 172 c.set(TableInputFormat.SCAN_COLUMN_FAMILY, 173 Bytes.toString(INPUT_FAMILYS[0]) + ", " + Bytes.toString(INPUT_FAMILYS[1])); 174 c.set(KEY_STARTROW, start != null ? start : ""); 175 c.set(KEY_LASTROW, last != null ? last : ""); 176 177 if (start != null) { 178 c.set(TableInputFormat.SCAN_ROW_START, start); 179 } 180 181 if (stop != null) { 182 c.set(TableInputFormat.SCAN_ROW_STOP, stop); 183 } 184 185 Job job = Job.getInstance(c, jobName); 186 job.setMapperClass(ScanMapper.class); 187 job.setReducerClass(ScanReducer.class); 188 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 189 job.setMapOutputValueClass(ImmutableBytesWritable.class); 190 job.setInputFormatClass(TableInputFormat.class); 191 job.setNumReduceTasks(1); 192 FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); 193 TableMapReduceUtil.addDependencyJars(job); 194 assertTrue(job.waitForCompletion(true)); 195 } 196 197 /** 198 * Tests a MR scan using specific start and stop rows. 199 */ 200 protected void testScan(String start, String stop, String last) 201 throws IOException, InterruptedException, ClassNotFoundException { 202 String jobName = "Scan" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") + "To" + 203 (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty"); 204 LOG.info("Before map/reduce startup - job " + jobName); 205 Configuration c = new Configuration(TEST_UTIL.getConfiguration()); 206 Scan scan = new Scan(); 207 scan.addFamily(INPUT_FAMILYS[0]); 208 scan.addFamily(INPUT_FAMILYS[1]); 209 if (start != null) { 210 scan.withStartRow(Bytes.toBytes(start)); 211 } 212 c.set(KEY_STARTROW, start != null ? start : ""); 213 if (stop != null) { 214 scan.withStopRow(Bytes.toBytes(stop)); 215 } 216 c.set(KEY_LASTROW, last != null ? last : ""); 217 LOG.info("scan before: " + scan); 218 Job job = Job.getInstance(c, jobName); 219 TableMapReduceUtil.initTableMapperJob(TABLE_NAME, scan, ScanMapper.class, 220 ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); 221 job.setReducerClass(ScanReducer.class); 222 job.setNumReduceTasks(1); // one to get final "first" and "last" key 223 FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); 224 LOG.info("Started " + job.getJobName()); 225 assertTrue(job.waitForCompletion(true)); 226 LOG.info("After map/reduce completion - job " + jobName); 227 } 228 229 230 /** 231 * Tests Number of inputSplits for MR job when specify number of mappers for TableInputFormatXXX 232 * This test does not run MR job 233 */ 234 protected void testNumOfSplits(int splitsPerRegion, int expectedNumOfSplits) 235 throws IOException, InterruptedException, ClassNotFoundException { 236 String jobName = "TestJobForNumOfSplits"; 237 LOG.info("Before map/reduce startup - job " + jobName); 238 Configuration c = new Configuration(TEST_UTIL.getConfiguration()); 239 Scan scan = new Scan(); 240 scan.addFamily(INPUT_FAMILYS[0]); 241 scan.addFamily(INPUT_FAMILYS[1]); 242 c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion); 243 c.set(KEY_STARTROW, ""); 244 c.set(KEY_LASTROW, ""); 245 Job job = Job.getInstance(c, jobName); 246 TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class, 247 ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); 248 TableInputFormat tif = new TableInputFormat(); 249 tif.setConf(job.getConfiguration()); 250 Assert.assertEquals(TABLE_NAME, table.getName()); 251 List<InputSplit> splits = tif.getSplits(job); 252 Assert.assertEquals(expectedNumOfSplits, splits.size()); 253 } 254 255 /** 256 * Run MR job to check the number of mapper = expectedNumOfSplits 257 */ 258 protected void testNumOfSplitsMR(int splitsPerRegion, int expectedNumOfSplits) 259 throws IOException, InterruptedException, ClassNotFoundException { 260 String jobName = "TestJobForNumOfSplits-MR"; 261 LOG.info("Before map/reduce startup - job " + jobName); 262 JobConf c = new JobConf(TEST_UTIL.getConfiguration()); 263 Scan scan = new Scan(); 264 scan.addFamily(INPUT_FAMILYS[0]); 265 scan.addFamily(INPUT_FAMILYS[1]); 266 c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion); 267 c.set(KEY_STARTROW, ""); 268 c.set(KEY_LASTROW, ""); 269 Job job = Job.getInstance(c, jobName); 270 TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class, 271 ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); 272 job.setReducerClass(ScanReducer.class); 273 job.setNumReduceTasks(1); 274 job.setOutputFormatClass(NullOutputFormat.class); 275 assertTrue("job failed!", job.waitForCompletion(true)); 276 // for some reason, hbase does not expose JobCounter.TOTAL_LAUNCHED_MAPS, 277 // we use TaskCounter.SHUFFLED_MAPS to get total launched maps 278 assertEquals("Saw the wrong count of mappers per region", expectedNumOfSplits, 279 job.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS).getValue()); 280 } 281 282 /** 283 * Run MR job to test autobalance for setting number of mappers for TIF This does not run real MR 284 * job 285 */ 286 protected void testAutobalanceNumOfSplit() throws IOException { 287 // set up splits for testing 288 List<InputSplit> splits = new ArrayList<>(5); 289 int[] regionLen = { 10, 20, 20, 40, 60 }; 290 for (int i = 0; i < 5; i++) { 291 InputSplit split = new TableSplit(TABLE_NAME, new Scan(), Bytes.toBytes(i), 292 Bytes.toBytes(i + 1), "", "", regionLen[i] * 1048576); 293 splits.add(split); 294 } 295 TableInputFormat tif = new TableInputFormat(); 296 List<InputSplit> res = tif.calculateAutoBalancedSplits(splits, 1073741824); 297 298 assertEquals("Saw the wrong number of splits", 5, res.size()); 299 TableSplit ts1 = (TableSplit) res.get(0); 300 assertEquals("The first split end key should be", 2, Bytes.toInt(ts1.getEndRow())); 301 TableSplit ts2 = (TableSplit) res.get(1); 302 assertEquals("The second split regionsize should be", 20 * 1048576, ts2.getLength()); 303 TableSplit ts3 = (TableSplit) res.get(2); 304 assertEquals("The third split start key should be", 3, Bytes.toInt(ts3.getStartRow())); 305 TableSplit ts4 = (TableSplit) res.get(4); 306 assertNotEquals("The seventh split start key should not be", 4, Bytes.toInt(ts4.getStartRow())); 307 } 308} 309