001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotEquals; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.Locale; 028import java.util.Map; 029import java.util.NavigableMap; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.HBaseTestingUtility; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.Result; 035import org.apache.hadoop.hbase.client.Scan; 036import org.apache.hadoop.hbase.client.Table; 037import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 038import org.apache.hadoop.hbase.util.Bytes; 039import org.apache.hadoop.io.NullWritable; 040import org.apache.hadoop.mapred.JobConf; 041import org.apache.hadoop.mapreduce.InputSplit; 042import org.apache.hadoop.mapreduce.Job; 043import org.apache.hadoop.mapreduce.Reducer; 044import org.apache.hadoop.mapreduce.TaskCounter; 045import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 046import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; 047import org.junit.AfterClass; 048import org.junit.Assert; 049import org.junit.BeforeClass; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053/** 054 * Tests various scan start and stop row scenarios. This is set in a scan and tested in a MapReduce 055 * job to see if that is handed over and done properly too. 056 */ 057public abstract class TestTableInputFormatScanBase { 058 059 private static final Logger LOG = LoggerFactory.getLogger(TestTableInputFormatScanBase.class); 060 static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 061 062 static final TableName TABLE_NAME = TableName.valueOf("scantest"); 063 static final byte[][] INPUT_FAMILYS = { Bytes.toBytes("content1"), Bytes.toBytes("content2") }; 064 static final String KEY_STARTROW = "startRow"; 065 static final String KEY_LASTROW = "stpRow"; 066 067 private static Table table = null; 068 069 @BeforeClass 070 public static void setUpBeforeClass() throws Exception { 071 // start mini hbase cluster 072 TEST_UTIL.startMiniCluster(3); 073 // create and fill table 074 table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, INPUT_FAMILYS); 075 TEST_UTIL.loadTable(table, INPUT_FAMILYS, null, false); 076 } 077 078 @AfterClass 079 public static void tearDownAfterClass() throws Exception { 080 TEST_UTIL.shutdownMiniCluster(); 081 } 082 083 /** 084 * Pass the key and value to reduce. 085 */ 086 public static class ScanMapper 087 extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> { 088 089 /** 090 * Pass the key and value to reduce. 091 * @param key The key, here "aaa", "aab" etc. 092 * @param value The value is the same as the key. 093 * @param context The task context. 094 * @throws IOException When reading the rows fails. 095 */ 096 @Override 097 public void map(ImmutableBytesWritable key, Result value, Context context) 098 throws IOException, InterruptedException { 099 if (value.size() != 2) { 100 throw new IOException("There should be two input columns"); 101 } 102 Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cfMap = value.getMap(); 103 104 if (!cfMap.containsKey(INPUT_FAMILYS[0]) || !cfMap.containsKey(INPUT_FAMILYS[1])) { 105 throw new IOException("Wrong input columns. Missing: '" + Bytes.toString(INPUT_FAMILYS[0]) 106 + "' or '" + Bytes.toString(INPUT_FAMILYS[1]) + "'."); 107 } 108 109 String val0 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[0], null)); 110 String val1 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[1], null)); 111 LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) + ", value -> (" + val0 + ", " 112 + val1 + ")"); 113 context.write(key, key); 114 } 115 } 116 117 /** 118 * Checks the last and first key seen against the scanner boundaries. 119 */ 120 public static class ScanReducer 121 extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, NullWritable, NullWritable> { 122 123 private String first = null; 124 private String last = null; 125 126 protected void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable> values, 127 Context context) throws IOException, InterruptedException { 128 int count = 0; 129 for (ImmutableBytesWritable value : values) { 130 String val = Bytes.toStringBinary(value.get()); 131 LOG.info( 132 "reduce: key[" + count + "] -> " + Bytes.toStringBinary(key.get()) + ", value -> " + val); 133 if (first == null) first = val; 134 last = val; 135 count++; 136 } 137 } 138 139 protected void cleanup(Context context) throws IOException, InterruptedException { 140 Configuration c = context.getConfiguration(); 141 String startRow = c.get(KEY_STARTROW); 142 String lastRow = c.get(KEY_LASTROW); 143 LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\""); 144 LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\""); 145 if (startRow != null && startRow.length() > 0) { 146 assertEquals(startRow, first); 147 } 148 if (lastRow != null && lastRow.length() > 0) { 149 assertEquals(lastRow, last); 150 } 151 } 152 153 } 154 155 /** 156 * Tests an MR Scan initialized from properties set in the Configuration. 157 */ 158 protected void testScanFromConfiguration(String start, String stop, String last) 159 throws IOException, InterruptedException, ClassNotFoundException { 160 String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") 161 + "To" + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty"); 162 Configuration c = new Configuration(TEST_UTIL.getConfiguration()); 163 c.set(TableInputFormat.INPUT_TABLE, TABLE_NAME.getNameAsString()); 164 c.set(TableInputFormat.SCAN_COLUMN_FAMILY, 165 Bytes.toString(INPUT_FAMILYS[0]) + ", " + Bytes.toString(INPUT_FAMILYS[1])); 166 c.set(KEY_STARTROW, start != null ? start : ""); 167 c.set(KEY_LASTROW, last != null ? last : ""); 168 169 if (start != null) { 170 c.set(TableInputFormat.SCAN_ROW_START, start); 171 } 172 173 if (stop != null) { 174 c.set(TableInputFormat.SCAN_ROW_STOP, stop); 175 } 176 177 Job job = Job.getInstance(c, jobName); 178 job.setMapperClass(ScanMapper.class); 179 job.setReducerClass(ScanReducer.class); 180 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 181 job.setMapOutputValueClass(ImmutableBytesWritable.class); 182 job.setInputFormatClass(TableInputFormat.class); 183 job.setNumReduceTasks(1); 184 FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); 185 TableMapReduceUtil.addDependencyJars(job); 186 assertTrue(job.waitForCompletion(true)); 187 } 188 189 /** 190 * Tests a MR scan using specific start and stop rows. 191 */ 192 protected void testScan(String start, String stop, String last) 193 throws IOException, InterruptedException, ClassNotFoundException { 194 String jobName = "Scan" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") + "To" 195 + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty"); 196 LOG.info("Before map/reduce startup - job " + jobName); 197 Configuration c = new Configuration(TEST_UTIL.getConfiguration()); 198 Scan scan = new Scan(); 199 scan.addFamily(INPUT_FAMILYS[0]); 200 scan.addFamily(INPUT_FAMILYS[1]); 201 if (start != null) { 202 scan.withStartRow(Bytes.toBytes(start)); 203 } 204 c.set(KEY_STARTROW, start != null ? start : ""); 205 if (stop != null) { 206 scan.withStopRow(Bytes.toBytes(stop)); 207 } 208 c.set(KEY_LASTROW, last != null ? last : ""); 209 LOG.info("scan before: " + scan); 210 Job job = Job.getInstance(c, jobName); 211 TableMapReduceUtil.initTableMapperJob(TABLE_NAME, scan, ScanMapper.class, 212 ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); 213 job.setReducerClass(ScanReducer.class); 214 job.setNumReduceTasks(1); // one to get final "first" and "last" key 215 FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); 216 LOG.info("Started " + job.getJobName()); 217 assertTrue(job.waitForCompletion(true)); 218 LOG.info("After map/reduce completion - job " + jobName); 219 } 220 221 /** 222 * Tests Number of inputSplits for MR job when specify number of mappers for TableInputFormatXXX 223 * This test does not run MR job 224 */ 225 protected void testNumOfSplits(int splitsPerRegion, int expectedNumOfSplits) 226 throws IOException, InterruptedException, ClassNotFoundException { 227 String jobName = "TestJobForNumOfSplits"; 228 LOG.info("Before map/reduce startup - job " + jobName); 229 Configuration c = new Configuration(TEST_UTIL.getConfiguration()); 230 Scan scan = new Scan(); 231 scan.addFamily(INPUT_FAMILYS[0]); 232 scan.addFamily(INPUT_FAMILYS[1]); 233 c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion); 234 c.set(KEY_STARTROW, ""); 235 c.set(KEY_LASTROW, ""); 236 Job job = Job.getInstance(c, jobName); 237 TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class, 238 ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); 239 TableInputFormat tif = new TableInputFormat(); 240 tif.setConf(job.getConfiguration()); 241 Assert.assertEquals(TABLE_NAME, table.getName()); 242 List<InputSplit> splits = tif.getSplits(job); 243 for (InputSplit split : splits) { 244 TableSplit tableSplit = (TableSplit) split; 245 // In table input format, we do no store the scanner at the split level 246 // because we use the scan object from the map-reduce job conf itself. 247 Assert.assertTrue(tableSplit.getScanAsString().isEmpty()); 248 } 249 Assert.assertEquals(expectedNumOfSplits, splits.size()); 250 } 251 252 /** 253 * Run MR job to check the number of mapper = expectedNumOfSplits 254 */ 255 protected void testNumOfSplitsMR(int splitsPerRegion, int expectedNumOfSplits) 256 throws IOException, InterruptedException, ClassNotFoundException { 257 String jobName = "TestJobForNumOfSplits-MR"; 258 LOG.info("Before map/reduce startup - job " + jobName); 259 JobConf c = new JobConf(TEST_UTIL.getConfiguration()); 260 Scan scan = new Scan(); 261 scan.addFamily(INPUT_FAMILYS[0]); 262 scan.addFamily(INPUT_FAMILYS[1]); 263 c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion); 264 c.set(KEY_STARTROW, ""); 265 c.set(KEY_LASTROW, ""); 266 Job job = Job.getInstance(c, jobName); 267 TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class, 268 ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); 269 job.setReducerClass(ScanReducer.class); 270 job.setNumReduceTasks(1); 271 job.setOutputFormatClass(NullOutputFormat.class); 272 assertTrue("job failed!", job.waitForCompletion(true)); 273 // for some reason, hbase does not expose JobCounter.TOTAL_LAUNCHED_MAPS, 274 // we use TaskCounter.SHUFFLED_MAPS to get total launched maps 275 assertEquals("Saw the wrong count of mappers per region", expectedNumOfSplits, 276 job.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS).getValue()); 277 } 278 279 /** 280 * Run MR job to test autobalance for setting number of mappers for TIF This does not run real MR 281 * job 282 */ 283 protected void testAutobalanceNumOfSplit() throws IOException { 284 // set up splits for testing 285 List<InputSplit> splits = new ArrayList<>(5); 286 int[] regionLen = { 10, 20, 20, 40, 60 }; 287 for (int i = 0; i < 5; i++) { 288 InputSplit split = new TableSplit(TABLE_NAME, new Scan(), Bytes.toBytes(i), 289 Bytes.toBytes(i + 1), "", "", regionLen[i] * 1048576); 290 splits.add(split); 291 } 292 TableInputFormat tif = new TableInputFormat(); 293 List<InputSplit> res = tif.calculateAutoBalancedSplits(splits, 1073741824); 294 295 assertEquals("Saw the wrong number of splits", 5, res.size()); 296 TableSplit ts1 = (TableSplit) res.get(0); 297 assertEquals("The first split end key should be", 2, Bytes.toInt(ts1.getEndRow())); 298 TableSplit ts2 = (TableSplit) res.get(1); 299 assertEquals("The second split regionsize should be", 20 * 1048576, ts2.getLength()); 300 TableSplit ts3 = (TableSplit) res.get(2); 301 assertEquals("The third split start key should be", 3, Bytes.toInt(ts3.getStartRow())); 302 TableSplit ts4 = (TableSplit) res.get(4); 303 assertNotEquals("The seventh split start key should not be", 4, Bytes.toInt(ts4.getStartRow())); 304 } 305}