001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.mapreduce;
020
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertTrue;
023
024import java.io.File;
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.List;
028import java.util.Locale;
029import java.util.Map;
030import java.util.NavigableMap;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileUtil;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.HBaseTestingUtil;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.Result;
037import org.apache.hadoop.hbase.client.Scan;
038import org.apache.hadoop.hbase.client.Table;
039import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
040import org.apache.hadoop.hbase.logging.Log4jUtils;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.io.NullWritable;
043import org.apache.hadoop.mapreduce.Job;
044import org.apache.hadoop.mapreduce.Reducer;
045import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
046import org.junit.After;
047import org.junit.AfterClass;
048import org.junit.BeforeClass;
049import org.junit.Test;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
054
055/**
056 * Base set of tests and setup for input formats touching multiple tables.
057 */
058public abstract class MultiTableInputFormatTestBase {
059  static final Logger LOG = LoggerFactory.getLogger(TestMultiTableInputFormat.class);
060  public static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
061  static final String TABLE_NAME = "scantest";
062  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
063  static final String KEY_STARTROW = "startRow";
064  static final String KEY_LASTROW = "stpRow";
065
066  static List<String> TABLES = Lists.newArrayList();
067
068  static {
069    for (int i = 0; i < 3; i++) {
070      TABLES.add(TABLE_NAME + String.valueOf(i));
071    }
072  }
073
074  @BeforeClass
075  public static void setUpBeforeClass() throws Exception {
076    // switch TIF to log at DEBUG level
077    Log4jUtils.enableDebug(MultiTableInputFormatBase.class);
078    // start mini hbase cluster
079    TEST_UTIL.startMiniCluster(3);
080    // create and fill table
081    for (String tableName : TABLES) {
082      try (Table table =
083          TEST_UTIL.createMultiRegionTable(TableName.valueOf(tableName),
084            INPUT_FAMILY, 4)) {
085        TEST_UTIL.loadTable(table, INPUT_FAMILY, false);
086      }
087    }
088  }
089
090  @AfterClass
091  public static void tearDownAfterClass() throws Exception {
092    TEST_UTIL.shutdownMiniCluster();
093  }
094
095  @After
096  public void tearDown() throws Exception {
097    Configuration c = TEST_UTIL.getConfiguration();
098    FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir")));
099  }
100
101  /**
102   * Pass the key and value to reducer.
103   */
104  public static class ScanMapper extends
105      TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
106    /**
107     * Pass the key and value to reduce.
108     *
109     * @param key The key, here "aaa", "aab" etc.
110     * @param value The value is the same as the key.
111     * @param context The task context.
112     * @throws IOException When reading the rows fails.
113     */
114    @Override
115    public void map(ImmutableBytesWritable key, Result value, Context context)
116        throws IOException, InterruptedException {
117      makeAssertions(key, value);
118      context.write(key, key);
119    }
120
121    public void makeAssertions(ImmutableBytesWritable key, Result value) throws IOException {
122      if (value.size() != 1) {
123        throw new IOException("There should only be one input column");
124      }
125      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf =
126          value.getMap();
127      if (!cf.containsKey(INPUT_FAMILY)) {
128        throw new IOException("Wrong input columns. Missing: '" +
129            Bytes.toString(INPUT_FAMILY) + "'.");
130      }
131      String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
132      LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) +
133          ", value -> " + val);
134    }
135  }
136
137  /**
138   * Checks the last and first keys seen against the scanner boundaries.
139   */
140  public static class ScanReducer
141      extends
142      Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
143          NullWritable, NullWritable> {
144    private String first = null;
145    private String last = null;
146
147    @Override
148    protected void reduce(ImmutableBytesWritable key,
149        Iterable<ImmutableBytesWritable> values, Context context)
150        throws IOException, InterruptedException {
151      makeAssertions(key, values);
152    }
153
154    protected void makeAssertions(ImmutableBytesWritable key,
155        Iterable<ImmutableBytesWritable> values) {
156      int count = 0;
157      for (ImmutableBytesWritable value : values) {
158        String val = Bytes.toStringBinary(value.get());
159        LOG.debug("reduce: key[" + count + "] -> " +
160            Bytes.toStringBinary(key.get()) + ", value -> " + val);
161        if (first == null) first = val;
162        last = val;
163        count++;
164      }
165      assertEquals(3, count);
166    }
167
168    @Override
169    protected void cleanup(Context context) throws IOException,
170        InterruptedException {
171      Configuration c = context.getConfiguration();
172      cleanup(c);
173    }
174
175    protected void cleanup(Configuration c) {
176      String startRow = c.get(KEY_STARTROW);
177      String lastRow = c.get(KEY_LASTROW);
178      LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" +
179          startRow + "\"");
180      LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow +
181          "\"");
182      if (startRow != null && startRow.length() > 0) {
183        assertEquals(startRow, first);
184      }
185      if (lastRow != null && lastRow.length() > 0) {
186        assertEquals(lastRow, last);
187      }
188    }
189  }
190
191  @Test
192  public void testScanEmptyToEmpty() throws IOException, InterruptedException,
193      ClassNotFoundException {
194    testScan(null, null, null);
195  }
196
197  @Test
198  public void testScanEmptyToAPP() throws IOException, InterruptedException,
199      ClassNotFoundException {
200    testScan(null, "app", "apo");
201  }
202
203  @Test
204  public void testScanOBBToOPP() throws IOException, InterruptedException,
205      ClassNotFoundException {
206    testScan("obb", "opp", "opo");
207  }
208
209  @Test
210  public void testScanYZYToEmpty() throws IOException, InterruptedException,
211      ClassNotFoundException {
212    testScan("yzy", null, "zzz");
213  }
214
215  /**
216   * Tests a MR scan using specific start and stop rows.
217   *
218   * @throws IOException
219   * @throws ClassNotFoundException
220   * @throws InterruptedException
221   */
222  private void testScan(String start, String stop, String last)
223      throws IOException, InterruptedException, ClassNotFoundException {
224    String jobName =
225        "Scan" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") + "To" +
226            (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
227    LOG.info("Before map/reduce startup - job " + jobName);
228    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
229
230    c.set(KEY_STARTROW, start != null ? start : "");
231    c.set(KEY_LASTROW, last != null ? last : "");
232
233    List<Scan> scans = new ArrayList<>();
234
235    for (String tableName : TABLES) {
236      Scan scan = new Scan();
237
238      scan.addFamily(INPUT_FAMILY);
239      scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName));
240
241      if (start != null) {
242        scan.withStartRow(Bytes.toBytes(start));
243      }
244      if (stop != null) {
245        scan.withStopRow(Bytes.toBytes(stop));
246      }
247
248      scans.add(scan);
249
250      LOG.info("scan before: " + scan);
251    }
252
253    runJob(jobName, c, scans);
254  }
255
256  protected void runJob(String jobName, Configuration c, List<Scan> scans)
257      throws IOException, InterruptedException, ClassNotFoundException {
258    Job job = new Job(c, jobName);
259
260    initJob(scans, job);
261    job.setReducerClass(ScanReducer.class);
262    job.setNumReduceTasks(1); // one to get final "first" and "last" key
263    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
264    LOG.info("Started " + job.getJobName());
265    job.waitForCompletion(true);
266    assertTrue(job.isSuccessful());
267    LOG.info("After map/reduce completion - job " + jobName);
268  }
269
  /**
   * Configures the given job to read the given scans. Called by
   * {@link #runJob(String, Configuration, List)} right after the job is created and before the
   * reducer class, reducer count and output path are set.
   *
   * @param scans the scans the job should read, one per table
   * @param job the job to configure
   * @throws IOException if the job cannot be configured
   */
  protected abstract void initJob(List<Scan> scans, Job job) throws IOException;
271
272
273}