001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.mapreduce; 020 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.List; 028import java.util.Locale; 029import java.util.Map; 030import java.util.NavigableMap; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.HBaseTestingUtility; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.client.Result; 036import org.apache.hadoop.hbase.client.Scan; 037import org.apache.hadoop.hbase.client.Table; 038import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.io.NullWritable; 041import org.apache.hadoop.mapred.JobConf; 042import org.apache.hadoop.mapreduce.InputSplit; 043import org.apache.hadoop.mapreduce.Job; 044import org.apache.hadoop.mapreduce.Reducer; 045import org.apache.hadoop.mapreduce.TaskCounter; 046import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 047import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; 048import org.junit.AfterClass; 049import org.junit.Assert; 050import org.junit.BeforeClass; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054 055/** 056 * Tests various scan start and stop row scenarios. This is set in a scan and tested in a MapReduce 057 * job to see if that is handed over and done properly too. 058 */ 059public abstract class TestTableInputFormatScanBase { 060 061 private static final Logger LOG = LoggerFactory.getLogger(TestTableInputFormatScanBase.class); 062 static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 063 064 static final TableName TABLE_NAME = TableName.valueOf("scantest"); 065 static final byte[][] INPUT_FAMILYS = {Bytes.toBytes("content1"), Bytes.toBytes("content2")}; 066 static final String KEY_STARTROW = "startRow"; 067 static final String KEY_LASTROW = "stpRow"; 068 069 private static Table table = null; 070 071 @BeforeClass 072 public static void setUpBeforeClass() throws Exception { 073 // test intermittently fails under hadoop2 (2.0.2-alpha) if shortcircuit-read (scr) is on. 074 // this turns it off for this test. TODO: Figure out why scr breaks recovery. 075 System.setProperty("hbase.tests.use.shortcircuit.reads", "false"); 076 077 // switch TIF to log at DEBUG level 078 TEST_UTIL.enableDebug(TableInputFormat.class); 079 TEST_UTIL.enableDebug(TableInputFormatBase.class); 080 // start mini hbase cluster 081 TEST_UTIL.startMiniCluster(3); 082 // create and fill table 083 table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, INPUT_FAMILYS); 084 TEST_UTIL.loadTable(table, INPUT_FAMILYS, null, false); 085 } 086 087 @AfterClass 088 public static void tearDownAfterClass() throws Exception { 089 TEST_UTIL.shutdownMiniCluster(); 090 } 091 092 /** 093 * Pass the key and value to reduce. 094 */ 095 public static class ScanMapper 096 extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> { 097 098 /** 099 * Pass the key and value to reduce. 100 * 101 * @param key The key, here "aaa", "aab" etc. 102 * @param value The value is the same as the key. 103 * @param context The task context. 104 * @throws IOException When reading the rows fails. 105 */ 106 @Override 107 public void map(ImmutableBytesWritable key, Result value, 108 Context context) 109 throws IOException, InterruptedException { 110 if (value.size() != 2) { 111 throw new IOException("There should be two input columns"); 112 } 113 Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> 114 cfMap = value.getMap(); 115 116 if (!cfMap.containsKey(INPUT_FAMILYS[0]) || !cfMap.containsKey(INPUT_FAMILYS[1])) { 117 throw new IOException("Wrong input columns. Missing: '" + 118 Bytes.toString(INPUT_FAMILYS[0]) + "' or '" + Bytes.toString(INPUT_FAMILYS[1]) + "'."); 119 } 120 121 String val0 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[0], null)); 122 String val1 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[1], null)); 123 LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) + 124 ", value -> (" + val0 + ", " + val1 + ")"); 125 context.write(key, key); 126 } 127 } 128 129 /** 130 * Checks the last and first key seen against the scanner boundaries. 131 */ 132 public static class ScanReducer 133 extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, 134 NullWritable, NullWritable> { 135 136 private String first = null; 137 private String last = null; 138 139 protected void reduce(ImmutableBytesWritable key, 140 Iterable<ImmutableBytesWritable> values, Context context) 141 throws IOException ,InterruptedException { 142 int count = 0; 143 for (ImmutableBytesWritable value : values) { 144 String val = Bytes.toStringBinary(value.get()); 145 LOG.info("reduce: key[" + count + "] -> " + 146 Bytes.toStringBinary(key.get()) + ", value -> " + val); 147 if (first == null) first = val; 148 last = val; 149 count++; 150 } 151 } 152 153 protected void cleanup(Context context) 154 throws IOException, InterruptedException { 155 Configuration c = context.getConfiguration(); 156 String startRow = c.get(KEY_STARTROW); 157 String lastRow = c.get(KEY_LASTROW); 158 LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\""); 159 LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\""); 160 if (startRow != null && startRow.length() > 0) { 161 assertEquals(startRow, first); 162 } 163 if (lastRow != null && lastRow.length() > 0) { 164 assertEquals(lastRow, last); 165 } 166 } 167 168 } 169 170 /** 171 * Tests an MR Scan initialized from properties set in the Configuration. 172 */ 173 protected void testScanFromConfiguration(String start, String stop, String last) 174 throws IOException, InterruptedException, ClassNotFoundException { 175 String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") + 176 "To" + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty"); 177 Configuration c = new Configuration(TEST_UTIL.getConfiguration()); 178 c.set(TableInputFormat.INPUT_TABLE, TABLE_NAME.getNameAsString()); 179 c.set(TableInputFormat.SCAN_COLUMN_FAMILY, 180 Bytes.toString(INPUT_FAMILYS[0]) + ", " + Bytes.toString(INPUT_FAMILYS[1])); 181 c.set(KEY_STARTROW, start != null ? start : ""); 182 c.set(KEY_LASTROW, last != null ? last : ""); 183 184 if (start != null) { 185 c.set(TableInputFormat.SCAN_ROW_START, start); 186 } 187 188 if (stop != null) { 189 c.set(TableInputFormat.SCAN_ROW_STOP, stop); 190 } 191 192 Job job = Job.getInstance(c, jobName); 193 job.setMapperClass(ScanMapper.class); 194 job.setReducerClass(ScanReducer.class); 195 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 196 job.setMapOutputValueClass(ImmutableBytesWritable.class); 197 job.setInputFormatClass(TableInputFormat.class); 198 job.setNumReduceTasks(1); 199 FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); 200 TableMapReduceUtil.addDependencyJars(job); 201 assertTrue(job.waitForCompletion(true)); 202 } 203 204 /** 205 * Tests a MR scan using specific start and stop rows. 206 */ 207 protected void testScan(String start, String stop, String last) 208 throws IOException, InterruptedException, ClassNotFoundException { 209 String jobName = "Scan" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") + "To" + 210 (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty"); 211 LOG.info("Before map/reduce startup - job " + jobName); 212 Configuration c = new Configuration(TEST_UTIL.getConfiguration()); 213 Scan scan = new Scan(); 214 scan.addFamily(INPUT_FAMILYS[0]); 215 scan.addFamily(INPUT_FAMILYS[1]); 216 if (start != null) { 217 scan.withStartRow(Bytes.toBytes(start)); 218 } 219 c.set(KEY_STARTROW, start != null ? start : ""); 220 if (stop != null) { 221 scan.withStopRow(Bytes.toBytes(stop)); 222 } 223 c.set(KEY_LASTROW, last != null ? last : ""); 224 LOG.info("scan before: " + scan); 225 Job job = Job.getInstance(c, jobName); 226 TableMapReduceUtil.initTableMapperJob(TABLE_NAME, scan, ScanMapper.class, 227 ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); 228 job.setReducerClass(ScanReducer.class); 229 job.setNumReduceTasks(1); // one to get final "first" and "last" key 230 FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); 231 LOG.info("Started " + job.getJobName()); 232 assertTrue(job.waitForCompletion(true)); 233 LOG.info("After map/reduce completion - job " + jobName); 234 } 235 236 237 /** 238 * Tests Number of inputSplits for MR job when specify number of mappers for TableInputFormatXXX 239 * This test does not run MR job 240 */ 241 protected void testNumOfSplits(int splitsPerRegion, int expectedNumOfSplits) 242 throws IOException, InterruptedException, ClassNotFoundException { 243 String jobName = "TestJobForNumOfSplits"; 244 LOG.info("Before map/reduce startup - job " + jobName); 245 Configuration c = new Configuration(TEST_UTIL.getConfiguration()); 246 Scan scan = new Scan(); 247 scan.addFamily(INPUT_FAMILYS[0]); 248 scan.addFamily(INPUT_FAMILYS[1]); 249 c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion); 250 c.set(KEY_STARTROW, ""); 251 c.set(KEY_LASTROW, ""); 252 Job job = Job.getInstance(c, jobName); 253 TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class, 254 ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); 255 TableInputFormat tif = new TableInputFormat(); 256 tif.setConf(job.getConfiguration()); 257 Assert.assertEquals(TABLE_NAME, table.getName()); 258 List<InputSplit> splits = tif.getSplits(job); 259 Assert.assertEquals(expectedNumOfSplits, splits.size()); 260 } 261 262 /** 263 * Run MR job to check the number of mapper = expectedNumOfSplits 264 */ 265 protected void testNumOfSplitsMR(int splitsPerRegion, int expectedNumOfSplits) 266 throws IOException, InterruptedException, ClassNotFoundException { 267 String jobName = "TestJobForNumOfSplits-MR"; 268 LOG.info("Before map/reduce startup - job " + jobName); 269 JobConf c = new JobConf(TEST_UTIL.getConfiguration()); 270 Scan scan = new Scan(); 271 scan.addFamily(INPUT_FAMILYS[0]); 272 scan.addFamily(INPUT_FAMILYS[1]); 273 c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion); 274 c.set(KEY_STARTROW, ""); 275 c.set(KEY_LASTROW, ""); 276 Job job = Job.getInstance(c, jobName); 277 TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class, 278 ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); 279 job.setReducerClass(ScanReducer.class); 280 job.setNumReduceTasks(1); 281 job.setOutputFormatClass(NullOutputFormat.class); 282 assertTrue("job failed!", job.waitForCompletion(true)); 283 // for some reason, hbase does not expose JobCounter.TOTAL_LAUNCHED_MAPS, 284 // we use TaskCounter.SHUFFLED_MAPS to get total launched maps 285 assertEquals("Saw the wrong count of mappers per region", expectedNumOfSplits, 286 job.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS).getValue()); 287 } 288 289 /** 290 * Run MR job to test autobalance for setting number of mappers for TIF This does not run real MR 291 * job 292 */ 293 protected void testAutobalanceNumOfSplit() throws IOException { 294 // set up splits for testing 295 List<InputSplit> splits = new ArrayList<>(5); 296 int[] regionLen = { 10, 20, 20, 40, 60 }; 297 for (int i = 0; i < 5; i++) { 298 InputSplit split = new TableSplit(TABLE_NAME, new Scan(), Bytes.toBytes(i), 299 Bytes.toBytes(i + 1), "", "", regionLen[i] * 1048576); 300 splits.add(split); 301 } 302 TableInputFormat tif = new TableInputFormat(); 303 List<InputSplit> res = tif.calculateAutoBalancedSplits(splits, 1073741824); 304 305 assertEquals("Saw the wrong number of splits", 5, res.size()); 306 TableSplit ts1 = (TableSplit) res.get(0); 307 assertEquals("The first split end key should be", 2, Bytes.toInt(ts1.getEndRow())); 308 TableSplit ts2 = (TableSplit) res.get(1); 309 assertEquals("The second split regionsize should be", 20 * 1048576, ts2.getLength()); 310 TableSplit ts3 = (TableSplit) res.get(2); 311 assertEquals("The third split start key should be", 3, Bytes.toInt(ts3.getStartRow())); 312 TableSplit ts4 = (TableSplit) res.get(4); 313 assertNotEquals("The seventh split start key should not be", 4, Bytes.toInt(ts4.getStartRow())); 314 } 315} 316