001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotEquals;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.Locale;
028import java.util.Map;
029import java.util.NavigableMap;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.HBaseTestingUtil;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.Result;
035import org.apache.hadoop.hbase.client.Scan;
036import org.apache.hadoop.hbase.client.Table;
037import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.io.NullWritable;
040import org.apache.hadoop.mapred.JobConf;
041import org.apache.hadoop.mapreduce.InputSplit;
042import org.apache.hadoop.mapreduce.Job;
043import org.apache.hadoop.mapreduce.Reducer;
044import org.apache.hadoop.mapreduce.TaskCounter;
045import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
046import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
047import org.junit.AfterClass;
048import org.junit.Assert;
049import org.junit.BeforeClass;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053/**
054 * Tests various scan start and stop row scenarios. This is set in a scan and tested in a MapReduce
055 * job to see if that is handed over and done properly too.
056 */
057public abstract class TestTableInputFormatScanBase {
058
059  private static final Logger LOG = LoggerFactory.getLogger(TestTableInputFormatScanBase.class);
060  static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
061
062  static final TableName TABLE_NAME = TableName.valueOf("scantest");
063  static final byte[][] INPUT_FAMILYS = { Bytes.toBytes("content1"), Bytes.toBytes("content2") };
064  static final String KEY_STARTROW = "startRow";
065  static final String KEY_LASTROW = "stpRow";
066
067  private static Table table = null;
068
069  @BeforeClass
070  public static void setUpBeforeClass() throws Exception {
071    // start mini hbase cluster
072    TEST_UTIL.startMiniCluster(3);
073    // create and fill table
074    table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, INPUT_FAMILYS);
075    TEST_UTIL.loadTable(table, INPUT_FAMILYS, null, false);
076  }
077
078  @AfterClass
079  public static void tearDownAfterClass() throws Exception {
080    TEST_UTIL.shutdownMiniCluster();
081  }
082
083  /**
084   * Pass the key and value to reduce.
085   */
086  public static class ScanMapper
087    extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
088
089    /**
090     * Pass the key and value to reduce.
091     * @param key     The key, here "aaa", "aab" etc.
092     * @param value   The value is the same as the key.
093     * @param context The task context.
094     * @throws IOException When reading the rows fails.
095     */
096    @Override
097    public void map(ImmutableBytesWritable key, Result value, Context context)
098      throws IOException, InterruptedException {
099      if (value.size() != 2) {
100        throw new IOException("There should be two input columns");
101      }
102      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cfMap = value.getMap();
103
104      if (!cfMap.containsKey(INPUT_FAMILYS[0]) || !cfMap.containsKey(INPUT_FAMILYS[1])) {
105        throw new IOException("Wrong input columns. Missing: '" + Bytes.toString(INPUT_FAMILYS[0])
106          + "' or '" + Bytes.toString(INPUT_FAMILYS[1]) + "'.");
107      }
108
109      String val0 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[0], null));
110      String val1 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[1], null));
111      LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) + ", value -> (" + val0 + ", "
112        + val1 + ")");
113      context.write(key, key);
114    }
115  }
116
117  /**
118   * Checks the last and first key seen against the scanner boundaries.
119   */
120  public static class ScanReducer
121    extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, NullWritable, NullWritable> {
122
123    private String first = null;
124    private String last = null;
125
126    protected void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable> values,
127      Context context) throws IOException, InterruptedException {
128      int count = 0;
129      for (ImmutableBytesWritable value : values) {
130        String val = Bytes.toStringBinary(value.get());
131        LOG.info(
132          "reduce: key[" + count + "] -> " + Bytes.toStringBinary(key.get()) + ", value -> " + val);
133        if (first == null) first = val;
134        last = val;
135        count++;
136      }
137    }
138
139    protected void cleanup(Context context) throws IOException, InterruptedException {
140      Configuration c = context.getConfiguration();
141      String startRow = c.get(KEY_STARTROW);
142      String lastRow = c.get(KEY_LASTROW);
143      LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\"");
144      LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\"");
145      if (startRow != null && startRow.length() > 0) {
146        assertEquals(startRow, first);
147      }
148      if (lastRow != null && lastRow.length() > 0) {
149        assertEquals(lastRow, last);
150      }
151    }
152
153  }
154
155  /**
156   * Tests an MR Scan initialized from properties set in the Configuration.
157   */
158  protected void testScanFromConfiguration(String start, String stop, String last)
159    throws IOException, InterruptedException, ClassNotFoundException {
160    String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty")
161      + "To" + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
162    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
163    c.set(TableInputFormat.INPUT_TABLE, TABLE_NAME.getNameAsString());
164    c.set(TableInputFormat.SCAN_COLUMN_FAMILY,
165      Bytes.toString(INPUT_FAMILYS[0]) + ", " + Bytes.toString(INPUT_FAMILYS[1]));
166    c.set(KEY_STARTROW, start != null ? start : "");
167    c.set(KEY_LASTROW, last != null ? last : "");
168
169    if (start != null) {
170      c.set(TableInputFormat.SCAN_ROW_START, start);
171    }
172
173    if (stop != null) {
174      c.set(TableInputFormat.SCAN_ROW_STOP, stop);
175    }
176
177    Job job = Job.getInstance(c, jobName);
178    job.setMapperClass(ScanMapper.class);
179    job.setReducerClass(ScanReducer.class);
180    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
181    job.setMapOutputValueClass(ImmutableBytesWritable.class);
182    job.setInputFormatClass(TableInputFormat.class);
183    job.setNumReduceTasks(1);
184    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
185    TableMapReduceUtil.addDependencyJars(job);
186    assertTrue(job.waitForCompletion(true));
187  }
188
189  /**
190   * Tests a MR scan using specific start and stop rows.
191   */
192  protected void testScan(String start, String stop, String last)
193    throws IOException, InterruptedException, ClassNotFoundException {
194    String jobName = "Scan" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") + "To"
195      + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
196    LOG.info("Before map/reduce startup - job " + jobName);
197    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
198    Scan scan = new Scan();
199    scan.addFamily(INPUT_FAMILYS[0]);
200    scan.addFamily(INPUT_FAMILYS[1]);
201    if (start != null) {
202      scan.withStartRow(Bytes.toBytes(start));
203    }
204    c.set(KEY_STARTROW, start != null ? start : "");
205    if (stop != null) {
206      scan.withStopRow(Bytes.toBytes(stop));
207    }
208    c.set(KEY_LASTROW, last != null ? last : "");
209    LOG.info("scan before: " + scan);
210    Job job = Job.getInstance(c, jobName);
211    TableMapReduceUtil.initTableMapperJob(TABLE_NAME, scan, ScanMapper.class,
212      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
213    job.setReducerClass(ScanReducer.class);
214    job.setNumReduceTasks(1); // one to get final "first" and "last" key
215    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
216    LOG.info("Started " + job.getJobName());
217    assertTrue(job.waitForCompletion(true));
218    LOG.info("After map/reduce completion - job " + jobName);
219  }
220
221  /**
222   * Tests Number of inputSplits for MR job when specify number of mappers for TableInputFormatXXX
223   * This test does not run MR job
224   */
225  protected void testNumOfSplits(int splitsPerRegion, int expectedNumOfSplits)
226    throws IOException, InterruptedException, ClassNotFoundException {
227    String jobName = "TestJobForNumOfSplits";
228    LOG.info("Before map/reduce startup - job " + jobName);
229    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
230    Scan scan = new Scan();
231    scan.addFamily(INPUT_FAMILYS[0]);
232    scan.addFamily(INPUT_FAMILYS[1]);
233    c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion);
234    c.set(KEY_STARTROW, "");
235    c.set(KEY_LASTROW, "");
236    Job job = Job.getInstance(c, jobName);
237    TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class,
238      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
239    TableInputFormat tif = new TableInputFormat();
240    tif.setConf(job.getConfiguration());
241    Assert.assertEquals(TABLE_NAME, table.getName());
242    List<InputSplit> splits = tif.getSplits(job);
243    for (InputSplit split : splits) {
244      TableSplit tableSplit = (TableSplit) split;
245      // In table input format, we do no store the scanner at the split level
246      // because we use the scan object from the map-reduce job conf itself.
247      Assert.assertTrue(tableSplit.getScanAsString().isEmpty());
248    }
249    Assert.assertEquals(expectedNumOfSplits, splits.size());
250  }
251
252  /**
253   * Run MR job to check the number of mapper = expectedNumOfSplits
254   */
255  protected void testNumOfSplitsMR(int splitsPerRegion, int expectedNumOfSplits)
256    throws IOException, InterruptedException, ClassNotFoundException {
257    String jobName = "TestJobForNumOfSplits-MR";
258    LOG.info("Before map/reduce startup - job " + jobName);
259    JobConf c = new JobConf(TEST_UTIL.getConfiguration());
260    Scan scan = new Scan();
261    scan.addFamily(INPUT_FAMILYS[0]);
262    scan.addFamily(INPUT_FAMILYS[1]);
263    c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion);
264    c.set(KEY_STARTROW, "");
265    c.set(KEY_LASTROW, "");
266    Job job = Job.getInstance(c, jobName);
267    TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class,
268      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
269    job.setReducerClass(ScanReducer.class);
270    job.setNumReduceTasks(1);
271    job.setOutputFormatClass(NullOutputFormat.class);
272    assertTrue("job failed!", job.waitForCompletion(true));
273    // for some reason, hbase does not expose JobCounter.TOTAL_LAUNCHED_MAPS,
274    // we use TaskCounter.SHUFFLED_MAPS to get total launched maps
275    assertEquals("Saw the wrong count of mappers per region", expectedNumOfSplits,
276      job.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS).getValue());
277  }
278
279  /**
280   * Run MR job to test autobalance for setting number of mappers for TIF This does not run real MR
281   * job
282   */
283  protected void testAutobalanceNumOfSplit() throws IOException {
284    // set up splits for testing
285    List<InputSplit> splits = new ArrayList<>(5);
286    int[] regionLen = { 10, 20, 20, 40, 60 };
287    for (int i = 0; i < 5; i++) {
288      InputSplit split = new TableSplit(TABLE_NAME, new Scan(), Bytes.toBytes(i),
289        Bytes.toBytes(i + 1), "", "", regionLen[i] * 1048576);
290      splits.add(split);
291    }
292    TableInputFormat tif = new TableInputFormat();
293    List<InputSplit> res = tif.calculateAutoBalancedSplits(splits, 1073741824);
294
295    assertEquals("Saw the wrong number of splits", 5, res.size());
296    TableSplit ts1 = (TableSplit) res.get(0);
297    assertEquals("The first split end key should be", 2, Bytes.toInt(ts1.getEndRow()));
298    TableSplit ts2 = (TableSplit) res.get(1);
299    assertEquals("The second split regionsize should be", 20 * 1048576, ts2.getLength());
300    TableSplit ts3 = (TableSplit) res.get(2);
301    assertEquals("The third split start key should be", 3, Bytes.toInt(ts3.getStartRow()));
302    TableSplit ts4 = (TableSplit) res.get(4);
303    assertNotEquals("The seventh split start key should not be", 4, Bytes.toInt(ts4.getStartRow()));
304  }
305}