/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.NavigableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Tests various scan start and stop row scenarios. This is set in a scan and tested in a MapReduce
 * job to see if that is handed over and done properly too.
 * <p>
 * The class is abstract: it only provides the shared mini-cluster fixture and the parameterized
 * {@code test*} helpers; concrete subclasses are expected to call them with specific rows.
 */
public abstract class TestTableInputFormatScanBase {

  private static final Logger LOG = LoggerFactory.getLogger(TestTableInputFormatScanBase.class);
  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  static final TableName TABLE_NAME = TableName.valueOf("scantest");
  static final byte[][] INPUT_FAMILYS = {Bytes.toBytes("content1"), Bytes.toBytes("content2")};
  /** Configuration key under which the expected first row is handed to {@link ScanReducer}. */
  static final String KEY_STARTROW = "startRow";
  /** Configuration key under which the expected last row is handed to {@link ScanReducer}. */
  static final String KEY_LASTROW = "stpRow";

  private static Table table = null;

  /**
   * Starts a mini cluster, then creates and loads the multi-region test table.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // start mini hbase cluster
    TEST_UTIL.startMiniCluster(3);
    // create and fill table
    table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, INPUT_FAMILYS);
    TEST_UTIL.loadTable(table, INPUT_FAMILYS, null, false);
  }

  /**
   * Releases the test table handle and shuts the mini cluster down.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    try {
      // Close the handle opened in setUpBeforeClass(); it is null if setup failed early.
      if (table != null) {
        table.close();
        table = null;
      }
    } finally {
      // Always stop the cluster, even if closing the table failed.
      TEST_UTIL.shutdownMiniCluster();
    }
  }

  /**
   * Builds a job name of the form {@code <prefix><START>To<STOP>}, where each row is upper-cased
   * with {@link Locale#ROOT} and a {@code null} row is rendered as {@code "Empty"}.
   *
   * @param prefix The leading part of the job name.
   * @param start The scan start row, may be {@code null}.
   * @param stop The scan stop row, may be {@code null}.
   * @return The assembled job name.
   */
  private static String buildJobName(String prefix, String start, String stop) {
    String startPart = start != null ? start.toUpperCase(Locale.ROOT) : "Empty";
    String stopPart = stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty";
    return prefix + startPart + "To" + stopPart;
  }

  /**
   * Pass the key and value to reduce.
   */
  public static class ScanMapper
    extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {

    /**
     * Validates the incoming row and emits its key as both the output key and the output value.
     *
     * @param key The row key, here "aaa", "aab" etc.
     * @param value The row contents; must hold exactly two entries and contain both families of
     *   {@link #INPUT_FAMILYS}.
     * @param context The task context.
     * @throws IOException When the row does not have the expected shape or writing fails.
     * @throws InterruptedException When writing to the context is interrupted.
     */
    @Override
    public void map(ImmutableBytesWritable key, Result value,
      Context context)
    throws IOException, InterruptedException {
      if (value.size() != 2) {
        throw new IOException("There should be two input columns");
      }
      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
        cfMap = value.getMap();

      if (!cfMap.containsKey(INPUT_FAMILYS[0]) || !cfMap.containsKey(INPUT_FAMILYS[1])) {
        throw new IOException("Wrong input columns. Missing: '" +
          Bytes.toString(INPUT_FAMILYS[0]) + "' or '" + Bytes.toString(INPUT_FAMILYS[1]) + "'.");
      }

      // The cell values are only looked up for logging; the row key is what gets emitted.
      String val0 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[0], null));
      String val1 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[1], null));
      LOG.info("map: key -> {}, value -> ({}, {})", Bytes.toStringBinary(key.get()), val0, val1);
      context.write(key, key);
    }
  }

  /**
   * Checks the last and first key seen against the scanner boundaries.
   */
  public static class ScanReducer
    extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
                    NullWritable, NullWritable> {

    /** First value seen by this reducer instance, {@code null} until a value arrives. */
    private String first = null;
    /** Most recent value seen by this reducer instance. */
    private String last = null;

    /**
     * Records the first and the most recent value seen; nothing is written to the context.
     *
     * @param key The current key.
     * @param values The values grouped under {@code key}.
     * @param context The task context (unused).
     */
    @Override
    protected void reduce(ImmutableBytesWritable key,
        Iterable<ImmutableBytesWritable> values, Context context)
    throws IOException, InterruptedException {
      int count = 0;
      for (ImmutableBytesWritable value : values) {
        String val = Bytes.toStringBinary(value.get());
        LOG.info("reduce: key[{}] -> {}, value -> {}", count, Bytes.toStringBinary(key.get()),
          val);
        if (first == null) {
          first = val;
        }
        last = val;
        count++;
      }
    }

    /**
     * Compares the recorded first/last values with the expectations stored in the job
     * configuration under {@link #KEY_STARTROW} and {@link #KEY_LASTROW}. An expectation that is
     * missing or empty is not checked.
     *
     * @param context The task context providing the configuration.
     */
    @Override
    protected void cleanup(Context context)
        throws IOException, InterruptedException {
      Configuration c = context.getConfiguration();
      String startRow = c.get(KEY_STARTROW);
      String lastRow = c.get(KEY_LASTROW);
      LOG.info("cleanup: first -> \"{}\", start row -> \"{}\"", first, startRow);
      LOG.info("cleanup: last -> \"{}\", last row -> \"{}\"", last, lastRow);
      if (startRow != null && startRow.length() > 0) {
        assertEquals(startRow, first);
      }
      if (lastRow != null && lastRow.length() > 0) {
        assertEquals(lastRow, last);
      }
    }

  }

  /**
   * Tests an MR Scan initialized from properties set in the Configuration.
   *
   * @param start The scan start row, or {@code null} to leave it unset.
   * @param stop The scan stop row, or {@code null} to leave it unset.
   * @param last The row the reducer is expected to see last, or {@code null} to skip that check.
   */
  protected void testScanFromConfiguration(String start, String stop, String last)
    throws IOException, InterruptedException, ClassNotFoundException {
    String jobName = buildJobName("ScanFromConfig", start, stop);
    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
    c.set(TableInputFormat.INPUT_TABLE, TABLE_NAME.getNameAsString());
    c.set(TableInputFormat.SCAN_COLUMN_FAMILY,
      Bytes.toString(INPUT_FAMILYS[0]) + ", " + Bytes.toString(INPUT_FAMILYS[1]));
    // Expectations consumed by ScanReducer.cleanup(); an empty string disables the check.
    c.set(KEY_STARTROW, start != null ? start : "");
    c.set(KEY_LASTROW, last != null ? last : "");

    if (start != null) {
      c.set(TableInputFormat.SCAN_ROW_START, start);
    }

    if (stop != null) {
      c.set(TableInputFormat.SCAN_ROW_STOP, stop);
    }

    Job job = Job.getInstance(c, jobName);
    job.setMapperClass(ScanMapper.class);
    job.setReducerClass(ScanReducer.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(ImmutableBytesWritable.class);
    job.setInputFormatClass(TableInputFormat.class);
    job.setNumReduceTasks(1);
    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
    TableMapReduceUtil.addDependencyJars(job);
    assertTrue(job.waitForCompletion(true));
  }

  /**
   * Tests a MR scan using specific start and stop rows.
   *
   * @param start The scan start row, or {@code null} to leave it unset.
   * @param stop The scan stop row, or {@code null} to leave it unset.
   * @param last The row the reducer is expected to see last, or {@code null} to skip that check.
   */
  protected void testScan(String start, String stop, String last)
    throws IOException, InterruptedException, ClassNotFoundException {
    String jobName = buildJobName("Scan", start, stop);
    LOG.info("Before map/reduce startup - job {}", jobName);
    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
    Scan scan = new Scan();
    scan.addFamily(INPUT_FAMILYS[0]);
    scan.addFamily(INPUT_FAMILYS[1]);
    if (start != null) {
      scan.withStartRow(Bytes.toBytes(start));
    }
    if (stop != null) {
      scan.withStopRow(Bytes.toBytes(stop));
    }
    // Expectations consumed by ScanReducer.cleanup(); an empty string disables the check.
    c.set(KEY_STARTROW, start != null ? start : "");
    c.set(KEY_LASTROW, last != null ? last : "");
    LOG.info("scan before: {}", scan);
    Job job = Job.getInstance(c, jobName);
    TableMapReduceUtil.initTableMapperJob(TABLE_NAME, scan, ScanMapper.class,
      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
    job.setReducerClass(ScanReducer.class);
    job.setNumReduceTasks(1); // one to get final "first" and "last" key
    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
    LOG.info("Started {}", job.getJobName());
    assertTrue(job.waitForCompletion(true));
    LOG.info("After map/reduce completion - job {}", jobName);
  }


  /**
   * Tests the number of input splits produced when a number of mappers per region is specified
   * for TableInputFormat. This test does not run an MR job.
   *
   * @param splitsPerRegion Value set for {@code hbase.mapreduce.tableinput.mappers.per.region}.
   * @param expectedNumOfSplits The number of splits {@code getSplits} is expected to return.
   */
  protected void testNumOfSplits(int splitsPerRegion, int expectedNumOfSplits)
    throws IOException, InterruptedException, ClassNotFoundException {
    String jobName = "TestJobForNumOfSplits";
    LOG.info("Before map/reduce startup - job {}", jobName);
    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
    Scan scan = new Scan();
    scan.addFamily(INPUT_FAMILYS[0]);
    scan.addFamily(INPUT_FAMILYS[1]);
    c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion);
    c.set(KEY_STARTROW, "");
    c.set(KEY_LASTROW, "");
    Job job = Job.getInstance(c, jobName);
    TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class,
      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
    TableInputFormat tif = new TableInputFormat();
    tif.setConf(job.getConfiguration());
    Assert.assertEquals(TABLE_NAME, table.getName());
    List<InputSplit> splits = tif.getSplits(job);
    for (InputSplit split : splits) {
      TableSplit tableSplit = (TableSplit) split;
      // In table input format, we do not store the scanner at the split level
      // because we use the scan object from the map-reduce job conf itself.
      Assert.assertTrue(tableSplit.getScanAsString().isEmpty());
    }
    Assert.assertEquals(expectedNumOfSplits, splits.size());
  }

  /**
   * Runs an MR job and checks that the number of mappers equals {@code expectedNumOfSplits}.
   *
   * @param splitsPerRegion Value set for {@code hbase.mapreduce.tableinput.mappers.per.region}.
   * @param expectedNumOfSplits The expected value of the {@code SHUFFLED_MAPS} task counter.
   */
  protected void testNumOfSplitsMR(int splitsPerRegion, int expectedNumOfSplits)
    throws IOException, InterruptedException, ClassNotFoundException {
    String jobName = "TestJobForNumOfSplits-MR";
    LOG.info("Before map/reduce startup - job {}", jobName);
    JobConf c = new JobConf(TEST_UTIL.getConfiguration());
    Scan scan = new Scan();
    scan.addFamily(INPUT_FAMILYS[0]);
    scan.addFamily(INPUT_FAMILYS[1]);
    c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion);
    c.set(KEY_STARTROW, "");
    c.set(KEY_LASTROW, "");
    Job job = Job.getInstance(c, jobName);
    TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class,
      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
    job.setReducerClass(ScanReducer.class);
    job.setNumReduceTasks(1);
    job.setOutputFormatClass(NullOutputFormat.class);
    assertTrue("job failed!", job.waitForCompletion(true));
    // for some reason, hbase does not expose JobCounter.TOTAL_LAUNCHED_MAPS,
    // we use TaskCounter.SHUFFLED_MAPS to get total launched maps
    assertEquals("Saw the wrong count of mappers per region", expectedNumOfSplits,
      job.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS).getValue());
  }

  /**
   * Tests auto-balancing of splits in TableInputFormat on hand-built splits. This does not run a
   * real MR job.
   */
  protected void testAutobalanceNumOfSplit() throws IOException {
    // set up splits for testing: five consecutive key ranges [i, i + 1) with the given sizes
    List<InputSplit> splits = new ArrayList<>(5);
    int[] regionLen = { 10, 20, 20, 40, 60 }; // region sizes in MiB (1048576 bytes each)
    for (int i = 0; i < 5; i++) {
      InputSplit split = new TableSplit(TABLE_NAME, new Scan(), Bytes.toBytes(i),
        Bytes.toBytes(i + 1), "", "", regionLen[i] * 1048576);
      splits.add(split);
    }
    TableInputFormat tif = new TableInputFormat();
    // 1073741824 bytes = 1 GiB
    List<InputSplit> res = tif.calculateAutoBalancedSplits(splits, 1073741824);

    assertEquals("Saw the wrong number of splits", 5, res.size());
    TableSplit ts1 = (TableSplit) res.get(0);
    assertEquals("The first split end key should be", 2, Bytes.toInt(ts1.getEndRow()));
    TableSplit ts2 = (TableSplit) res.get(1);
    assertEquals("The second split regionsize should be", 20 * 1048576, ts2.getLength());
    TableSplit ts3 = (TableSplit) res.get(2);
    assertEquals("The third split start key should be", 3, Bytes.toInt(ts3.getStartRow()));
    // Index 4 is the fifth (and last) of the five resulting splits.
    TableSplit ts5 = (TableSplit) res.get(4);
    assertNotEquals("The fifth split start key should not be", 4, Bytes.toInt(ts5.getStartRow()));
  }
}