001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertNotEquals;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.Locale;
028import java.util.Map;
029import java.util.NavigableMap;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.HBaseTestingUtil;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.Result;
035import org.apache.hadoop.hbase.client.Scan;
036import org.apache.hadoop.hbase.client.Table;
037import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.io.NullWritable;
040import org.apache.hadoop.mapred.JobConf;
041import org.apache.hadoop.mapreduce.InputSplit;
042import org.apache.hadoop.mapreduce.Job;
043import org.apache.hadoop.mapreduce.Reducer;
044import org.apache.hadoop.mapreduce.TaskCounter;
045import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
046import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
047import org.junit.jupiter.api.AfterAll;
048import org.junit.jupiter.api.BeforeAll;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052/**
053 * Tests various scan start and stop row scenarios. This is set in a scan and tested in a MapReduce
054 * job to see if that is handed over and done properly too.
055 */
056public abstract class TestTableInputFormatScanBase {
057
058  private static final Logger LOG = LoggerFactory.getLogger(TestTableInputFormatScanBase.class);
059  static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
060
061  static final TableName TABLE_NAME = TableName.valueOf("scantest");
062  static final byte[][] INPUT_FAMILYS = { Bytes.toBytes("content1"), Bytes.toBytes("content2") };
063  static final String KEY_STARTROW = "startRow";
064  static final String KEY_LASTROW = "stpRow";
065
066  private static Table table = null;
067
068  @BeforeAll
069  public static void setUpBeforeClass() throws Exception {
070    // start mini hbase cluster
071    TEST_UTIL.startMiniCluster(3);
072    // create and fill table
073    table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, INPUT_FAMILYS);
074    TEST_UTIL.loadTable(table, INPUT_FAMILYS, null, false);
075  }
076
077  @AfterAll
078  public static void tearDownAfterClass() throws Exception {
079    TEST_UTIL.shutdownMiniCluster();
080  }
081
082  /**
083   * Pass the key and value to reduce.
084   */
085  public static class ScanMapper
086    extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
087
088    /**
089     * Pass the key and value to reduce.
090     * @param key     The key, here "aaa", "aab" etc.
091     * @param value   The value is the same as the key.
092     * @param context The task context.
093     * @throws IOException When reading the rows fails.
094     */
095    @Override
096    public void map(ImmutableBytesWritable key, Result value, Context context)
097      throws IOException, InterruptedException {
098      if (value.size() != 2) {
099        throw new IOException("There should be two input columns");
100      }
101      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cfMap = value.getMap();
102
103      if (!cfMap.containsKey(INPUT_FAMILYS[0]) || !cfMap.containsKey(INPUT_FAMILYS[1])) {
104        throw new IOException("Wrong input columns. Missing: '" + Bytes.toString(INPUT_FAMILYS[0])
105          + "' or '" + Bytes.toString(INPUT_FAMILYS[1]) + "'.");
106      }
107
108      String val0 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[0], null));
109      String val1 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[1], null));
110      LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) + ", value -> (" + val0 + ", "
111        + val1 + ")");
112      context.write(key, key);
113    }
114  }
115
116  /**
117   * Checks the last and first key seen against the scanner boundaries.
118   */
119  public static class ScanReducer
120    extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, NullWritable, NullWritable> {
121
122    private String first = null;
123    private String last = null;
124
125    protected void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable> values,
126      Context context) throws IOException, InterruptedException {
127      int count = 0;
128      for (ImmutableBytesWritable value : values) {
129        String val = Bytes.toStringBinary(value.get());
130        LOG.info(
131          "reduce: key[" + count + "] -> " + Bytes.toStringBinary(key.get()) + ", value -> " + val);
132        if (first == null) first = val;
133        last = val;
134        count++;
135      }
136    }
137
138    protected void cleanup(Context context) throws IOException, InterruptedException {
139      Configuration c = context.getConfiguration();
140      String startRow = c.get(KEY_STARTROW);
141      String lastRow = c.get(KEY_LASTROW);
142      LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\"");
143      LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\"");
144      if (startRow != null && startRow.length() > 0) {
145        assertEquals(startRow, first);
146      }
147      if (lastRow != null && lastRow.length() > 0) {
148        assertEquals(lastRow, last);
149      }
150    }
151
152  }
153
154  /**
155   * Tests an MR Scan initialized from properties set in the Configuration.
156   */
157  protected void testScanFromConfiguration(String start, String stop, String last)
158    throws IOException, InterruptedException, ClassNotFoundException {
159    String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty")
160      + "To" + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
161    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
162    c.set(TableInputFormat.INPUT_TABLE, TABLE_NAME.getNameAsString());
163    c.set(TableInputFormat.SCAN_COLUMN_FAMILY,
164      Bytes.toString(INPUT_FAMILYS[0]) + ", " + Bytes.toString(INPUT_FAMILYS[1]));
165    c.set(KEY_STARTROW, start != null ? start : "");
166    c.set(KEY_LASTROW, last != null ? last : "");
167
168    if (start != null) {
169      c.set(TableInputFormat.SCAN_ROW_START, start);
170    }
171
172    if (stop != null) {
173      c.set(TableInputFormat.SCAN_ROW_STOP, stop);
174    }
175
176    Job job = Job.getInstance(c, jobName);
177    job.setMapperClass(ScanMapper.class);
178    job.setReducerClass(ScanReducer.class);
179    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
180    job.setMapOutputValueClass(ImmutableBytesWritable.class);
181    job.setInputFormatClass(TableInputFormat.class);
182    job.setNumReduceTasks(1);
183    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
184    TableMapReduceUtil.addDependencyJars(job);
185    assertTrue(job.waitForCompletion(true));
186  }
187
188  /**
189   * Tests a MR scan using specific start and stop rows.
190   */
191  protected void testScan(String start, String stop, String last)
192    throws IOException, InterruptedException, ClassNotFoundException {
193    String jobName = "Scan" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") + "To"
194      + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
195    LOG.info("Before map/reduce startup - job " + jobName);
196    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
197    Scan scan = new Scan();
198    scan.addFamily(INPUT_FAMILYS[0]);
199    scan.addFamily(INPUT_FAMILYS[1]);
200    if (start != null) {
201      scan.withStartRow(Bytes.toBytes(start));
202    }
203    c.set(KEY_STARTROW, start != null ? start : "");
204    if (stop != null) {
205      scan.withStopRow(Bytes.toBytes(stop));
206    }
207    c.set(KEY_LASTROW, last != null ? last : "");
208    LOG.info("scan before: " + scan);
209    Job job = Job.getInstance(c, jobName);
210    TableMapReduceUtil.initTableMapperJob(TABLE_NAME, scan, ScanMapper.class,
211      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
212    job.setReducerClass(ScanReducer.class);
213    job.setNumReduceTasks(1); // one to get final "first" and "last" key
214    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
215    LOG.info("Started " + job.getJobName());
216    assertTrue(job.waitForCompletion(true));
217    LOG.info("After map/reduce completion - job " + jobName);
218  }
219
220  /**
221   * Tests Number of inputSplits for MR job when specify number of mappers for TableInputFormatXXX
222   * This test does not run MR job
223   */
224  protected void testNumOfSplits(int splitsPerRegion, int expectedNumOfSplits)
225    throws IOException, InterruptedException, ClassNotFoundException {
226    String jobName = "TestJobForNumOfSplits";
227    LOG.info("Before map/reduce startup - job " + jobName);
228    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
229    Scan scan = new Scan();
230    scan.addFamily(INPUT_FAMILYS[0]);
231    scan.addFamily(INPUT_FAMILYS[1]);
232    c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion);
233    c.set(KEY_STARTROW, "");
234    c.set(KEY_LASTROW, "");
235    Job job = Job.getInstance(c, jobName);
236    TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class,
237      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
238    TableInputFormat tif = new TableInputFormat();
239    tif.setConf(job.getConfiguration());
240    assertEquals(TABLE_NAME, table.getName());
241    List<InputSplit> splits = tif.getSplits(job);
242    for (InputSplit split : splits) {
243      TableSplit tableSplit = (TableSplit) split;
244      // In table input format, we do no store the scanner at the split level
245      // because we use the scan object from the map-reduce job conf itself.
246      assertTrue(tableSplit.getScanAsString().isEmpty());
247    }
248    assertEquals(expectedNumOfSplits, splits.size());
249  }
250
251  /**
252   * Run MR job to check the number of mapper = expectedNumOfSplits
253   */
254  protected void testNumOfSplitsMR(int splitsPerRegion, int expectedNumOfSplits)
255    throws IOException, InterruptedException, ClassNotFoundException {
256    String jobName = "TestJobForNumOfSplits-MR";
257    LOG.info("Before map/reduce startup - job " + jobName);
258    JobConf c = new JobConf(TEST_UTIL.getConfiguration());
259    Scan scan = new Scan();
260    scan.addFamily(INPUT_FAMILYS[0]);
261    scan.addFamily(INPUT_FAMILYS[1]);
262    c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion);
263    c.set(KEY_STARTROW, "");
264    c.set(KEY_LASTROW, "");
265    Job job = Job.getInstance(c, jobName);
266    TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class,
267      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
268    job.setReducerClass(ScanReducer.class);
269    job.setNumReduceTasks(1);
270    job.setOutputFormatClass(NullOutputFormat.class);
271    assertTrue(job.waitForCompletion(true), "job failed!");
272    // for some reason, hbase does not expose JobCounter.TOTAL_LAUNCHED_MAPS,
273    // we use TaskCounter.SHUFFLED_MAPS to get total launched maps
274    assertEquals(expectedNumOfSplits,
275      job.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS).getValue(),
276      "Saw the wrong count of mappers per region");
277  }
278
279  /**
280   * Run MR job to test autobalance for setting number of mappers for TIF This does not run real MR
281   * job
282   */
283  protected void testAutobalanceNumOfSplit() throws IOException {
284    // set up splits for testing
285    List<InputSplit> splits = new ArrayList<>(5);
286    int[] regionLen = { 10, 20, 20, 40, 60 };
287    for (int i = 0; i < 5; i++) {
288      InputSplit split = new TableSplit(TABLE_NAME, new Scan(), Bytes.toBytes(i),
289        Bytes.toBytes(i + 1), "", "", regionLen[i] * 1048576);
290      splits.add(split);
291    }
292    TableInputFormat tif = new TableInputFormat();
293    List<InputSplit> res = tif.calculateAutoBalancedSplits(splits, 1073741824);
294
295    assertEquals(5, res.size(), "Saw the wrong number of splits");
296    TableSplit ts1 = (TableSplit) res.get(0);
297    assertEquals(2, Bytes.toInt(ts1.getEndRow()), "The first split end key should be");
298    TableSplit ts2 = (TableSplit) res.get(1);
299    assertEquals(20 * 1048576, ts2.getLength(), "The second split regionsize should be");
300    TableSplit ts3 = (TableSplit) res.get(2);
301    assertEquals(3, Bytes.toInt(ts3.getStartRow()), "The third split start key should be");
302    TableSplit ts4 = (TableSplit) res.get(4);
303    assertNotEquals(4, Bytes.toInt(ts4.getStartRow()), "The seventh split start key should not be");
304  }
305}