001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mapreduce;
020
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertNotEquals;
023import static org.junit.Assert.assertTrue;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.List;
028import java.util.Locale;
029import java.util.Map;
030import java.util.NavigableMap;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.HBaseTestingUtility;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.client.Result;
036import org.apache.hadoop.hbase.client.Scan;
037import org.apache.hadoop.hbase.client.Table;
038import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.io.NullWritable;
041import org.apache.hadoop.mapred.JobConf;
042import org.apache.hadoop.mapreduce.InputSplit;
043import org.apache.hadoop.mapreduce.Job;
044import org.apache.hadoop.mapreduce.Reducer;
045import org.apache.hadoop.mapreduce.TaskCounter;
046import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
047import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
048import org.junit.AfterClass;
049import org.junit.Assert;
050import org.junit.BeforeClass;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054
055/**
056 * Tests various scan start and stop row scenarios. This is set in a scan and tested in a MapReduce
057 * job to see if that is handed over and done properly too.
058 */
059public abstract class TestTableInputFormatScanBase {
060
061  private static final Logger LOG = LoggerFactory.getLogger(TestTableInputFormatScanBase.class);
062  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
063
064  static final TableName TABLE_NAME = TableName.valueOf("scantest");
065  static final byte[][] INPUT_FAMILYS = {Bytes.toBytes("content1"), Bytes.toBytes("content2")};
066  static final String KEY_STARTROW = "startRow";
067  static final String KEY_LASTROW = "stpRow";
068
069  private static Table table = null;
070
071  @BeforeClass
072  public static void setUpBeforeClass() throws Exception {
073    // start mini hbase cluster
074    TEST_UTIL.startMiniCluster(3);
075    // create and fill table
076    table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, INPUT_FAMILYS);
077    TEST_UTIL.loadTable(table, INPUT_FAMILYS, null, false);
078  }
079
080  @AfterClass
081  public static void tearDownAfterClass() throws Exception {
082    TEST_UTIL.shutdownMiniCluster();
083  }
084
085  /**
086   * Pass the key and value to reduce.
087   */
088  public static class ScanMapper
089  extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
090
091    /**
092     * Pass the key and value to reduce.
093     *
094     * @param key  The key, here "aaa", "aab" etc.
095     * @param value  The value is the same as the key.
096     * @param context  The task context.
097     * @throws IOException When reading the rows fails.
098     */
099    @Override
100    public void map(ImmutableBytesWritable key, Result value,
101      Context context)
102    throws IOException, InterruptedException {
103      if (value.size() != 2) {
104        throw new IOException("There should be two input columns");
105      }
106      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
107        cfMap = value.getMap();
108
109      if (!cfMap.containsKey(INPUT_FAMILYS[0]) || !cfMap.containsKey(INPUT_FAMILYS[1])) {
110        throw new IOException("Wrong input columns. Missing: '" +
111          Bytes.toString(INPUT_FAMILYS[0]) + "' or '" + Bytes.toString(INPUT_FAMILYS[1]) + "'.");
112      }
113
114      String val0 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[0], null));
115      String val1 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[1], null));
116      LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) +
117               ", value -> (" + val0 + ", " + val1 + ")");
118      context.write(key, key);
119    }
120  }
121
122  /**
123   * Checks the last and first key seen against the scanner boundaries.
124   */
125  public static class ScanReducer
126  extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
127                    NullWritable, NullWritable> {
128
129    private String first = null;
130    private String last = null;
131
132    protected void reduce(ImmutableBytesWritable key,
133        Iterable<ImmutableBytesWritable> values, Context context)
134    throws IOException ,InterruptedException {
135      int count = 0;
136      for (ImmutableBytesWritable value : values) {
137        String val = Bytes.toStringBinary(value.get());
138        LOG.info("reduce: key[" + count + "] -> " +
139          Bytes.toStringBinary(key.get()) + ", value -> " + val);
140        if (first == null) first = val;
141        last = val;
142        count++;
143      }
144    }
145
146    protected void cleanup(Context context)
147    throws IOException, InterruptedException {
148      Configuration c = context.getConfiguration();
149      String startRow = c.get(KEY_STARTROW);
150      String lastRow = c.get(KEY_LASTROW);
151      LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\"");
152      LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\"");
153      if (startRow != null && startRow.length() > 0) {
154        assertEquals(startRow, first);
155      }
156      if (lastRow != null && lastRow.length() > 0) {
157        assertEquals(lastRow, last);
158      }
159    }
160
161  }
162
163  /**
164   * Tests an MR Scan initialized from properties set in the Configuration.
165   */
166  protected void testScanFromConfiguration(String start, String stop, String last)
167      throws IOException, InterruptedException, ClassNotFoundException {
168    String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") +
169      "To" + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
170    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
171    c.set(TableInputFormat.INPUT_TABLE, TABLE_NAME.getNameAsString());
172    c.set(TableInputFormat.SCAN_COLUMN_FAMILY,
173      Bytes.toString(INPUT_FAMILYS[0]) + ", " + Bytes.toString(INPUT_FAMILYS[1]));
174    c.set(KEY_STARTROW, start != null ? start : "");
175    c.set(KEY_LASTROW, last != null ? last : "");
176
177    if (start != null) {
178      c.set(TableInputFormat.SCAN_ROW_START, start);
179    }
180
181    if (stop != null) {
182      c.set(TableInputFormat.SCAN_ROW_STOP, stop);
183    }
184
185    Job job = Job.getInstance(c, jobName);
186    job.setMapperClass(ScanMapper.class);
187    job.setReducerClass(ScanReducer.class);
188    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
189    job.setMapOutputValueClass(ImmutableBytesWritable.class);
190    job.setInputFormatClass(TableInputFormat.class);
191    job.setNumReduceTasks(1);
192    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
193    TableMapReduceUtil.addDependencyJars(job);
194    assertTrue(job.waitForCompletion(true));
195  }
196
197  /**
198   * Tests a MR scan using specific start and stop rows.
199   */
200  protected void testScan(String start, String stop, String last)
201      throws IOException, InterruptedException, ClassNotFoundException {
202    String jobName = "Scan" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") + "To" +
203      (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
204    LOG.info("Before map/reduce startup - job " + jobName);
205    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
206    Scan scan = new Scan();
207    scan.addFamily(INPUT_FAMILYS[0]);
208    scan.addFamily(INPUT_FAMILYS[1]);
209    if (start != null) {
210      scan.withStartRow(Bytes.toBytes(start));
211    }
212    c.set(KEY_STARTROW, start != null ? start : "");
213    if (stop != null) {
214      scan.withStopRow(Bytes.toBytes(stop));
215    }
216    c.set(KEY_LASTROW, last != null ? last : "");
217    LOG.info("scan before: " + scan);
218    Job job = Job.getInstance(c, jobName);
219    TableMapReduceUtil.initTableMapperJob(TABLE_NAME, scan, ScanMapper.class,
220      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
221    job.setReducerClass(ScanReducer.class);
222    job.setNumReduceTasks(1); // one to get final "first" and "last" key
223    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
224    LOG.info("Started " + job.getJobName());
225    assertTrue(job.waitForCompletion(true));
226    LOG.info("After map/reduce completion - job " + jobName);
227  }
228
229
230  /**
231   * Tests Number of inputSplits for MR job when specify number of mappers for TableInputFormatXXX
232   * This test does not run MR job
233   */
234  protected void testNumOfSplits(int splitsPerRegion, int expectedNumOfSplits)
235      throws IOException, InterruptedException, ClassNotFoundException {
236    String jobName = "TestJobForNumOfSplits";
237    LOG.info("Before map/reduce startup - job " + jobName);
238    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
239    Scan scan = new Scan();
240    scan.addFamily(INPUT_FAMILYS[0]);
241    scan.addFamily(INPUT_FAMILYS[1]);
242    c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion);
243    c.set(KEY_STARTROW, "");
244    c.set(KEY_LASTROW, "");
245    Job job = Job.getInstance(c, jobName);
246    TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class,
247      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
248    TableInputFormat tif = new TableInputFormat();
249    tif.setConf(job.getConfiguration());
250    Assert.assertEquals(TABLE_NAME, table.getName());
251    List<InputSplit> splits = tif.getSplits(job);
252    Assert.assertEquals(expectedNumOfSplits, splits.size());
253  }
254
255  /**
256   * Run MR job to check the number of mapper = expectedNumOfSplits
257   */
258  protected void testNumOfSplitsMR(int splitsPerRegion, int expectedNumOfSplits)
259      throws IOException, InterruptedException, ClassNotFoundException {
260    String jobName = "TestJobForNumOfSplits-MR";
261    LOG.info("Before map/reduce startup - job " + jobName);
262    JobConf c = new JobConf(TEST_UTIL.getConfiguration());
263    Scan scan = new Scan();
264    scan.addFamily(INPUT_FAMILYS[0]);
265    scan.addFamily(INPUT_FAMILYS[1]);
266    c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion);
267    c.set(KEY_STARTROW, "");
268    c.set(KEY_LASTROW, "");
269    Job job = Job.getInstance(c, jobName);
270    TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class,
271      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
272    job.setReducerClass(ScanReducer.class);
273    job.setNumReduceTasks(1);
274    job.setOutputFormatClass(NullOutputFormat.class);
275    assertTrue("job failed!", job.waitForCompletion(true));
276    // for some reason, hbase does not expose JobCounter.TOTAL_LAUNCHED_MAPS,
277    // we use TaskCounter.SHUFFLED_MAPS to get total launched maps
278    assertEquals("Saw the wrong count of mappers per region", expectedNumOfSplits,
279      job.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS).getValue());
280  }
281
282  /**
283   * Run MR job to test autobalance for setting number of mappers for TIF This does not run real MR
284   * job
285   */
286  protected void testAutobalanceNumOfSplit() throws IOException {
287    // set up splits for testing
288    List<InputSplit> splits = new ArrayList<>(5);
289    int[] regionLen = { 10, 20, 20, 40, 60 };
290    for (int i = 0; i < 5; i++) {
291      InputSplit split = new TableSplit(TABLE_NAME, new Scan(), Bytes.toBytes(i),
292        Bytes.toBytes(i + 1), "", "", regionLen[i] * 1048576);
293      splits.add(split);
294    }
295    TableInputFormat tif = new TableInputFormat();
296    List<InputSplit> res = tif.calculateAutoBalancedSplits(splits, 1073741824);
297
298    assertEquals("Saw the wrong number of splits", 5, res.size());
299    TableSplit ts1 = (TableSplit) res.get(0);
300    assertEquals("The first split end key should be", 2, Bytes.toInt(ts1.getEndRow()));
301    TableSplit ts2 = (TableSplit) res.get(1);
302    assertEquals("The second split regionsize should be", 20 * 1048576, ts2.getLength());
303    TableSplit ts3 = (TableSplit) res.get(2);
304    assertEquals("The third split start key should be", 3, Bytes.toInt(ts3.getStartRow()));
305    TableSplit ts4 = (TableSplit) res.get(4);
306    assertNotEquals("The seventh split start key should not be", 4, Bytes.toInt(ts4.getStartRow()));
307  }
308}
309