001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mapreduce;
020
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertNotEquals;
023import static org.junit.Assert.assertTrue;
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.Locale;
028import java.util.Map;
029import java.util.NavigableMap;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.HBaseTestingUtility;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.Result;
035import org.apache.hadoop.hbase.client.Scan;
036import org.apache.hadoop.hbase.client.Table;
037import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.io.NullWritable;
040import org.apache.hadoop.mapred.JobConf;
041import org.apache.hadoop.mapreduce.InputSplit;
042import org.apache.hadoop.mapreduce.Job;
043import org.apache.hadoop.mapreduce.Reducer;
044import org.apache.hadoop.mapreduce.TaskCounter;
045import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
046import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
047import org.junit.AfterClass;
048import org.junit.Assert;
049import org.junit.BeforeClass;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053
054/**
055 * Tests various scan start and stop row scenarios. This is set in a scan and tested in a MapReduce
056 * job to see if that is handed over and done properly too.
057 */
058public abstract class TestTableInputFormatScanBase {
059
060  private static final Logger LOG = LoggerFactory.getLogger(TestTableInputFormatScanBase.class);
061  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
062
063  static final TableName TABLE_NAME = TableName.valueOf("scantest");
064  static final byte[][] INPUT_FAMILYS = {Bytes.toBytes("content1"), Bytes.toBytes("content2")};
065  static final String KEY_STARTROW = "startRow";
066  static final String KEY_LASTROW = "stpRow";
067
068  private static Table table = null;
069
070  @BeforeClass
071  public static void setUpBeforeClass() throws Exception {
072    // start mini hbase cluster
073    TEST_UTIL.startMiniCluster(3);
074    // create and fill table
075    table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, INPUT_FAMILYS);
076    TEST_UTIL.loadTable(table, INPUT_FAMILYS, null, false);
077  }
078
079  @AfterClass
080  public static void tearDownAfterClass() throws Exception {
081    TEST_UTIL.shutdownMiniCluster();
082  }
083
084  /**
085   * Pass the key and value to reduce.
086   */
087  public static class ScanMapper
088    extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
089
090    /**
091     * Pass the key and value to reduce.
092     *
093     * @param key  The key, here "aaa", "aab" etc.
094     * @param value  The value is the same as the key.
095     * @param context  The task context.
096     * @throws IOException When reading the rows fails.
097     */
098    @Override
099    public void map(ImmutableBytesWritable key, Result value,
100      Context context)
101        throws IOException, InterruptedException {
102      if (value.size() != 2) {
103        throw new IOException("There should be two input columns");
104      }
105      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
106        cfMap = value.getMap();
107
108      if (!cfMap.containsKey(INPUT_FAMILYS[0]) || !cfMap.containsKey(INPUT_FAMILYS[1])) {
109        throw new IOException("Wrong input columns. Missing: '" +
110          Bytes.toString(INPUT_FAMILYS[0]) + "' or '" + Bytes.toString(INPUT_FAMILYS[1]) + "'.");
111      }
112
113      String val0 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[0], null));
114      String val1 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[1], null));
115      LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) +
116               ", value -> (" + val0 + ", " + val1 + ")");
117      context.write(key, key);
118    }
119  }
120
121  /**
122   * Checks the last and first key seen against the scanner boundaries.
123   */
124  public static class ScanReducer
125    extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
126                    NullWritable, NullWritable> {
127
128    private String first = null;
129    private String last = null;
130
131    protected void reduce(ImmutableBytesWritable key,
132        Iterable<ImmutableBytesWritable> values, Context context)
133      throws IOException ,InterruptedException {
134      int count = 0;
135      for (ImmutableBytesWritable value : values) {
136        String val = Bytes.toStringBinary(value.get());
137        LOG.info("reduce: key[" + count + "] -> " +
138          Bytes.toStringBinary(key.get()) + ", value -> " + val);
139        if (first == null) first = val;
140        last = val;
141        count++;
142      }
143    }
144
145    protected void cleanup(Context context)
146        throws IOException, InterruptedException {
147      Configuration c = context.getConfiguration();
148      String startRow = c.get(KEY_STARTROW);
149      String lastRow = c.get(KEY_LASTROW);
150      LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\"");
151      LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\"");
152      if (startRow != null && startRow.length() > 0) {
153        assertEquals(startRow, first);
154      }
155      if (lastRow != null && lastRow.length() > 0) {
156        assertEquals(lastRow, last);
157      }
158    }
159
160  }
161
162  /**
163   * Tests an MR Scan initialized from properties set in the Configuration.
164   */
165  protected void testScanFromConfiguration(String start, String stop, String last)
166      throws IOException, InterruptedException, ClassNotFoundException {
167    String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") +
168      "To" + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
169    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
170    c.set(TableInputFormat.INPUT_TABLE, TABLE_NAME.getNameAsString());
171    c.set(TableInputFormat.SCAN_COLUMN_FAMILY,
172      Bytes.toString(INPUT_FAMILYS[0]) + ", " + Bytes.toString(INPUT_FAMILYS[1]));
173    c.set(KEY_STARTROW, start != null ? start : "");
174    c.set(KEY_LASTROW, last != null ? last : "");
175
176    if (start != null) {
177      c.set(TableInputFormat.SCAN_ROW_START, start);
178    }
179
180    if (stop != null) {
181      c.set(TableInputFormat.SCAN_ROW_STOP, stop);
182    }
183
184    Job job = Job.getInstance(c, jobName);
185    job.setMapperClass(ScanMapper.class);
186    job.setReducerClass(ScanReducer.class);
187    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
188    job.setMapOutputValueClass(ImmutableBytesWritable.class);
189    job.setInputFormatClass(TableInputFormat.class);
190    job.setNumReduceTasks(1);
191    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
192    TableMapReduceUtil.addDependencyJars(job);
193    assertTrue(job.waitForCompletion(true));
194  }
195
196  /**
197   * Tests a MR scan using specific start and stop rows.
198   */
199  protected void testScan(String start, String stop, String last)
200      throws IOException, InterruptedException, ClassNotFoundException {
201    String jobName = "Scan" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") + "To" +
202      (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
203    LOG.info("Before map/reduce startup - job " + jobName);
204    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
205    Scan scan = new Scan();
206    scan.addFamily(INPUT_FAMILYS[0]);
207    scan.addFamily(INPUT_FAMILYS[1]);
208    if (start != null) {
209      scan.withStartRow(Bytes.toBytes(start));
210    }
211    c.set(KEY_STARTROW, start != null ? start : "");
212    if (stop != null) {
213      scan.withStopRow(Bytes.toBytes(stop));
214    }
215    c.set(KEY_LASTROW, last != null ? last : "");
216    LOG.info("scan before: " + scan);
217    Job job = Job.getInstance(c, jobName);
218    TableMapReduceUtil.initTableMapperJob(TABLE_NAME, scan, ScanMapper.class,
219      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
220    job.setReducerClass(ScanReducer.class);
221    job.setNumReduceTasks(1); // one to get final "first" and "last" key
222    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
223    LOG.info("Started " + job.getJobName());
224    assertTrue(job.waitForCompletion(true));
225    LOG.info("After map/reduce completion - job " + jobName);
226  }
227
228
229  /**
230   * Tests Number of inputSplits for MR job when specify number of mappers for TableInputFormatXXX
231   * This test does not run MR job
232   */
233  protected void testNumOfSplits(int splitsPerRegion, int expectedNumOfSplits)
234      throws IOException, InterruptedException, ClassNotFoundException {
235    String jobName = "TestJobForNumOfSplits";
236    LOG.info("Before map/reduce startup - job " + jobName);
237    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
238    Scan scan = new Scan();
239    scan.addFamily(INPUT_FAMILYS[0]);
240    scan.addFamily(INPUT_FAMILYS[1]);
241    c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion);
242    c.set(KEY_STARTROW, "");
243    c.set(KEY_LASTROW, "");
244    Job job = Job.getInstance(c, jobName);
245    TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class,
246      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
247    TableInputFormat tif = new TableInputFormat();
248    tif.setConf(job.getConfiguration());
249    Assert.assertEquals(TABLE_NAME, table.getName());
250    List<InputSplit> splits = tif.getSplits(job);
251    for (InputSplit split : splits) {
252      TableSplit tableSplit = (TableSplit) split;
253      // In table input format, we do no store the scanner at the split level
254      // because we use the scan object from the map-reduce job conf itself.
255      Assert.assertTrue(tableSplit.getScanAsString().isEmpty());
256    }
257    Assert.assertEquals(expectedNumOfSplits, splits.size());
258  }
259
260  /**
261   * Run MR job to check the number of mapper = expectedNumOfSplits
262   */
263  protected void testNumOfSplitsMR(int splitsPerRegion, int expectedNumOfSplits)
264      throws IOException, InterruptedException, ClassNotFoundException {
265    String jobName = "TestJobForNumOfSplits-MR";
266    LOG.info("Before map/reduce startup - job " + jobName);
267    JobConf c = new JobConf(TEST_UTIL.getConfiguration());
268    Scan scan = new Scan();
269    scan.addFamily(INPUT_FAMILYS[0]);
270    scan.addFamily(INPUT_FAMILYS[1]);
271    c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion);
272    c.set(KEY_STARTROW, "");
273    c.set(KEY_LASTROW, "");
274    Job job = Job.getInstance(c, jobName);
275    TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class,
276      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
277    job.setReducerClass(ScanReducer.class);
278    job.setNumReduceTasks(1);
279    job.setOutputFormatClass(NullOutputFormat.class);
280    assertTrue("job failed!", job.waitForCompletion(true));
281    // for some reason, hbase does not expose JobCounter.TOTAL_LAUNCHED_MAPS,
282    // we use TaskCounter.SHUFFLED_MAPS to get total launched maps
283    assertEquals("Saw the wrong count of mappers per region", expectedNumOfSplits,
284      job.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS).getValue());
285  }
286
287  /**
288   * Run MR job to test autobalance for setting number of mappers for TIF This does not run real MR
289   * job
290   */
291  protected void testAutobalanceNumOfSplit() throws IOException {
292    // set up splits for testing
293    List<InputSplit> splits = new ArrayList<>(5);
294    int[] regionLen = { 10, 20, 20, 40, 60 };
295    for (int i = 0; i < 5; i++) {
296      InputSplit split = new TableSplit(TABLE_NAME, new Scan(), Bytes.toBytes(i),
297        Bytes.toBytes(i + 1), "", "", regionLen[i] * 1048576);
298      splits.add(split);
299    }
300    TableInputFormat tif = new TableInputFormat();
301    List<InputSplit> res = tif.calculateAutoBalancedSplits(splits, 1073741824);
302
303    assertEquals("Saw the wrong number of splits", 5, res.size());
304    TableSplit ts1 = (TableSplit) res.get(0);
305    assertEquals("The first split end key should be", 2, Bytes.toInt(ts1.getEndRow()));
306    TableSplit ts2 = (TableSplit) res.get(1);
307    assertEquals("The second split regionsize should be", 20 * 1048576, ts2.getLength());
308    TableSplit ts3 = (TableSplit) res.get(2);
309    assertEquals("The third split start key should be", 3, Bytes.toInt(ts3.getStartRow()));
310    TableSplit ts4 = (TableSplit) res.get(4);
311    assertNotEquals("The seventh split start key should not be", 4, Bytes.toInt(ts4.getStartRow()));
312  }
313}
314