/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.NavigableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Tests various scan start and stop row scenarios. The boundaries are set on a Scan and then
 * verified in a MapReduce job to confirm they are handed over to the job and honored correctly.
 */
public abstract class TestTableInputFormatScanBase {

  private static final Logger LOG = LoggerFactory.getLogger(TestTableInputFormatScanBase.class);
  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  static final TableName TABLE_NAME = TableName.valueOf("scantest");
  static final byte[][] INPUT_FAMILYS = {Bytes.toBytes("content1"), Bytes.toBytes("content2")};
  static final String KEY_STARTROW = "startRow";
  static final String KEY_LASTROW = "stpRow";

  private static Table table = null;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // test intermittently fails under hadoop2 (2.0.2-alpha) if shortcircuit-read (scr) is on.
    // this turns it off for this test.  TODO: Figure out why scr breaks recovery.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");

    // switch TIF to log at DEBUG level
    TEST_UTIL.enableDebug(TableInputFormat.class);
    TEST_UTIL.enableDebug(TableInputFormatBase.class);
    // start mini hbase cluster
    TEST_UTIL.startMiniCluster(3);
    // create and fill table
    table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, INPUT_FAMILYS);
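    // loadTable writes one cell per family per row; per the ScanMapper javadoc below, the row
    // keys are the three-letter strings "aaa", "aab", ... that the scan boundary checks rely on.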
    TEST_UTIL.loadTable(table, INPUT_FAMILYS, null, false);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Pass the key and value to reduce.
   */
  public static class ScanMapper
      extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {

    /**
     * Pass the key and value to reduce.
     *
     * @param key  The key, here "aaa", "aab" etc.
     * @param value  The value is the same as the key.
     * @param context  The task context.
     * @throws IOException When reading the rows fails.
     */
    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      if (value.size() != 2) {
        throw new IOException("There should be two input columns");
      }
      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cfMap = value.getMap();

      if (!cfMap.containsKey(INPUT_FAMILYS[0]) || !cfMap.containsKey(INPUT_FAMILYS[1])) {
        throw new IOException("Wrong input columns. Missing: '" +
          Bytes.toString(INPUT_FAMILYS[0]) + "' or '" + Bytes.toString(INPUT_FAMILYS[1]) + "'.");
      }

      String val0 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[0], null));
      String val1 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[1], null));
      LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) +
               ", value -> (" + val0 + ", " + val1 + ")");
      context.write(key, key);
    }
  }

  /**
   * Checks the first and last keys seen against the scanner boundaries.
   */
  public static class ScanReducer
      extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
                      NullWritable, NullWritable> {

    private String first = null;
    private String last = null;

    @Override
    protected void reduce(ImmutableBytesWritable key,
        Iterable<ImmutableBytesWritable> values, Context context)
        throws IOException, InterruptedException {
      int count = 0;
      for (ImmutableBytesWritable value : values) {
        String val = Bytes.toStringBinary(value.get());
        LOG.info("reduce: key[" + count + "] -> " +
          Bytes.toStringBinary(key.get()) + ", value -> " + val);
        if (first == null) {
          first = val;
        }
        last = val;
        count++;
      }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      Configuration c = context.getConfiguration();
      String startRow = c.get(KEY_STARTROW);
      String lastRow = c.get(KEY_LASTROW);
      LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\"");
      LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\"");
      if (startRow != null && startRow.length() > 0) {
        assertEquals(startRow, first);
      }
      if (lastRow != null && lastRow.length() > 0) {
        assertEquals(lastRow, last);
      }
    }

  }


  /**
   * Tests an MR Scan initialized from properties set in the Configuration.
   */
  protected void testScanFromConfiguration(String start, String stop, String last)
      throws IOException, InterruptedException, ClassNotFoundException {
    String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") +
      "To" + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
    c.set(TableInputFormat.INPUT_TABLE, TABLE_NAME.getNameAsString());
    c.set(TableInputFormat.SCAN_COLUMN_FAMILY,
      Bytes.toString(INPUT_FAMILYS[0]) + ", " + Bytes.toString(INPUT_FAMILYS[1]));
    c.set(KEY_STARTROW, start != null ? start : "");
    c.set(KEY_LASTROW, last != null ? last : "");

    if (start != null) {
      c.set(TableInputFormat.SCAN_ROW_START, start);
    }

    if (stop != null) {
      c.set(TableInputFormat.SCAN_ROW_STOP, stop);
    }

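    // No explicit Scan is built here: TableInputFormat assembles one from the SCAN_* properties
    // set above when it is initialized for the job, which is what this test is meant to exercise.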
    Job job = Job.getInstance(c, jobName);
    job.setMapperClass(ScanMapper.class);
    job.setReducerClass(ScanReducer.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(ImmutableBytesWritable.class);
    job.setInputFormatClass(TableInputFormat.class);
    job.setNumReduceTasks(1);
    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
    TableMapReduceUtil.addDependencyJars(job);
    assertTrue(job.waitForCompletion(true));
  }

  /**
   * Tests an MR scan using specific start and stop rows.
   */
  protected void testScan(String start, String stop, String last)
      throws IOException, InterruptedException, ClassNotFoundException {
    String jobName = "Scan" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") + "To" +
      (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
    LOG.info("Before map/reduce startup - job " + jobName);
    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
    Scan scan = new Scan();
    scan.addFamily(INPUT_FAMILYS[0]);
    scan.addFamily(INPUT_FAMILYS[1]);
    if (start != null) {
      scan.withStartRow(Bytes.toBytes(start));
    }
    c.set(KEY_STARTROW, start != null ? start : "");
    if (stop != null) {
      scan.withStopRow(Bytes.toBytes(stop));
    }
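    // The start row is inclusive and the stop row is exclusive, so "last" names the final row the
    // scan is expected to return rather than the stop row itself.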
    c.set(KEY_LASTROW, last != null ? last : "");
    LOG.info("scan before: " + scan);
    Job job = Job.getInstance(c, jobName);
    TableMapReduceUtil.initTableMapperJob(TABLE_NAME, scan, ScanMapper.class,
      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
    job.setReducerClass(ScanReducer.class);
    job.setNumReduceTasks(1); // one to get final "first" and "last" key
    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
    LOG.info("Started " + job.getJobName());
    assertTrue(job.waitForCompletion(true));
    LOG.info("After map/reduce completion - job " + jobName);
  }


  /**
   * Tests the number of InputSplits produced when a specific number of mappers per region is
   * requested for TableInputFormat. This test does not run an MR job.
   */
  protected void testNumOfSplits(int splitsPerRegion, int expectedNumOfSplits)
      throws IOException, InterruptedException, ClassNotFoundException {
    String jobName = "TestJobForNumOfSplits";
    LOG.info("Before map/reduce startup - job " + jobName);
    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
    Scan scan = new Scan();
    scan.addFamily(INPUT_FAMILYS[0]);
    scan.addFamily(INPUT_FAMILYS[1]);
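    // Requesting N mappers per region makes TableInputFormat cut each region into N splits, so
    // callers are expected to pass expectedNumOfSplits = (number of regions) * splitsPerRegion.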
    c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion);
    c.set(KEY_STARTROW, "");
    c.set(KEY_LASTROW, "");
    Job job = Job.getInstance(c, jobName);
    TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class,
      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
    TableInputFormat tif = new TableInputFormat();
    tif.setConf(job.getConfiguration());
    Assert.assertEquals(TABLE_NAME, table.getName());
    List<InputSplit> splits = tif.getSplits(job);
    Assert.assertEquals(expectedNumOfSplits, splits.size());
  }

  /**
   * Runs an MR job and checks that the number of launched mappers equals expectedNumOfSplits.
   */
  protected void testNumOfSplitsMR(int splitsPerRegion, int expectedNumOfSplits)
      throws IOException, InterruptedException, ClassNotFoundException {
    String jobName = "TestJobForNumOfSplits-MR";
    LOG.info("Before map/reduce startup - job " + jobName);
    JobConf c = new JobConf(TEST_UTIL.getConfiguration());
    Scan scan = new Scan();
    scan.addFamily(INPUT_FAMILYS[0]);
    scan.addFamily(INPUT_FAMILYS[1]);
    c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion);
    c.set(KEY_STARTROW, "");
    c.set(KEY_LASTROW, "");
    Job job = Job.getInstance(c, jobName);
    TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class,
      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
    job.setReducerClass(ScanReducer.class);
    job.setNumReduceTasks(1);
    job.setOutputFormatClass(NullOutputFormat.class);
    assertTrue("job failed!", job.waitForCompletion(true));
    // For some reason JobCounter.TOTAL_LAUNCHED_MAPS is not exposed here, so
    // TaskCounter.SHUFFLED_MAPS is used as a proxy for the total number of launched maps.
    assertEquals("Saw the wrong count of mappers per region", expectedNumOfSplits,
      job.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS).getValue());
  }

  /**
   * Tests the auto-balancing calculation of the number of mappers for TableInputFormat. This does
   * not run a real MR job.
   */
  protected void testAutobalanceNumOfSplit() throws IOException {
    // set up splits for testing
    List<InputSplit> splits = new ArrayList<>(5);
    int[] regionLen = { 10, 20, 20, 40, 60 };
    for (int i = 0; i < 5; i++) {
      InputSplit split = new TableSplit(TABLE_NAME, new Scan(), Bytes.toBytes(i),
        Bytes.toBytes(i + 1), "", "", regionLen[i] * 1048576);
      splits.add(split);
    }
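    // Five synthetic splits sized 10, 20, 20, 40, 60 MB. Judging by the assertions below, the
    // auto-balance pass is expected to merge the two smallest adjacent splits, leave the middle
    // ones as they are and break the largest one apart, keeping the total at five splits. The
    // 1073741824 passed below is 1 GB.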
    TableInputFormat tif = new TableInputFormat();
    List<InputSplit> res = tif.calculateAutoBalancedSplits(splits, 1073741824);

    assertEquals("Saw the wrong number of splits", 5, res.size());
    TableSplit ts1 = (TableSplit) res.get(0);
    assertEquals("The first split end key should be", 2, Bytes.toInt(ts1.getEndRow()));
    TableSplit ts2 = (TableSplit) res.get(1);
    assertEquals("The second split region size should be", 20 * 1048576, ts2.getLength());
    TableSplit ts3 = (TableSplit) res.get(2);
    assertEquals("The third split start key should be", 3, Bytes.toInt(ts3.getStartRow()));
    TableSplit ts4 = (TableSplit) res.get(4);
    assertNotEquals("The fifth split start key should not be", 4, Bytes.toInt(ts4.getStartRow()));
  }
}