001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.File;
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.Locale;
028import java.util.Map;
029import java.util.NavigableMap;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileUtil;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.HBaseTestingUtil;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.client.Result;
036import org.apache.hadoop.hbase.client.Scan;
037import org.apache.hadoop.hbase.client.Table;
038import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
039import org.apache.hadoop.hbase.logging.Log4jUtils;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.hadoop.io.NullWritable;
042import org.apache.hadoop.mapreduce.Job;
043import org.apache.hadoop.mapreduce.Reducer;
044import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
045import org.junit.After;
046import org.junit.AfterClass;
047import org.junit.BeforeClass;
048import org.junit.Test;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
053
054/**
055 * Base set of tests and setup for input formats touching multiple tables.
056 */
057public abstract class MultiTableInputFormatTestBase {
058  static final Logger LOG = LoggerFactory.getLogger(TestMultiTableInputFormat.class);
059  public static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
060  static final String TABLE_NAME = "scantest";
061  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
062  static final String KEY_STARTROW = "startRow";
063  static final String KEY_LASTROW = "stpRow";
064
065  static List<String> TABLES = Lists.newArrayList();
066
067  static {
068    for (int i = 0; i < 3; i++) {
069      TABLES.add(TABLE_NAME + String.valueOf(i));
070    }
071  }
072
073  @BeforeClass
074  public static void setUpBeforeClass() throws Exception {
075    // switch TIF to log at DEBUG level
076    Log4jUtils.enableDebug(MultiTableInputFormatBase.class);
077    // start mini hbase cluster
078    TEST_UTIL.startMiniCluster(3);
079    // create and fill table
080    for (String tableName : TABLES) {
081      try (Table table =
082        TEST_UTIL.createMultiRegionTable(TableName.valueOf(tableName), INPUT_FAMILY, 4)) {
083        TEST_UTIL.loadTable(table, INPUT_FAMILY, false);
084      }
085    }
086  }
087
088  @AfterClass
089  public static void tearDownAfterClass() throws Exception {
090    TEST_UTIL.shutdownMiniCluster();
091  }
092
093  @After
094  public void tearDown() throws Exception {
095    Configuration c = TEST_UTIL.getConfiguration();
096    FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir")));
097  }
098
099  /**
100   * Pass the key and value to reducer.
101   */
102  public static class ScanMapper
103    extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
104    /**
105     * Pass the key and value to reduce.
106     * @param key     The key, here "aaa", "aab" etc.
107     * @param value   The value is the same as the key.
108     * @param context The task context.
109     * @throws IOException When reading the rows fails.
110     */
111    @Override
112    public void map(ImmutableBytesWritable key, Result value, Context context)
113      throws IOException, InterruptedException {
114      makeAssertions(key, value);
115      context.write(key, key);
116    }
117
118    public void makeAssertions(ImmutableBytesWritable key, Result value) throws IOException {
119      if (value.size() != 1) {
120        throw new IOException("There should only be one input column");
121      }
122      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf = value.getMap();
123      if (!cf.containsKey(INPUT_FAMILY)) {
124        throw new IOException(
125          "Wrong input columns. Missing: '" + Bytes.toString(INPUT_FAMILY) + "'.");
126      }
127      String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
128      LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) + ", value -> " + val);
129    }
130  }
131
132  /**
133   * Checks the last and first keys seen against the scanner boundaries.
134   */
135  public static class ScanReducer
136    extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, NullWritable, NullWritable> {
137    private String first = null;
138    private String last = null;
139
140    @Override
141    protected void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable> values,
142      Context context) throws IOException, InterruptedException {
143      makeAssertions(key, values);
144    }
145
146    protected void makeAssertions(ImmutableBytesWritable key,
147      Iterable<ImmutableBytesWritable> values) {
148      int count = 0;
149      for (ImmutableBytesWritable value : values) {
150        String val = Bytes.toStringBinary(value.get());
151        LOG.debug(
152          "reduce: key[" + count + "] -> " + Bytes.toStringBinary(key.get()) + ", value -> " + val);
153        if (first == null) first = val;
154        last = val;
155        count++;
156      }
157      assertEquals(3, count);
158    }
159
160    @Override
161    protected void cleanup(Context context) throws IOException, InterruptedException {
162      Configuration c = context.getConfiguration();
163      cleanup(c);
164    }
165
166    protected void cleanup(Configuration c) {
167      String startRow = c.get(KEY_STARTROW);
168      String lastRow = c.get(KEY_LASTROW);
169      LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\"");
170      LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\"");
171      if (startRow != null && startRow.length() > 0) {
172        assertEquals(startRow, first);
173      }
174      if (lastRow != null && lastRow.length() > 0) {
175        assertEquals(lastRow, last);
176      }
177    }
178  }
179
180  @Test
181  public void testScanEmptyToEmpty()
182    throws IOException, InterruptedException, ClassNotFoundException {
183    testScan(null, null, null);
184  }
185
186  @Test
187  public void testScanEmptyToAPP()
188    throws IOException, InterruptedException, ClassNotFoundException {
189    testScan(null, "app", "apo");
190  }
191
192  @Test
193  public void testScanOBBToOPP() throws IOException, InterruptedException, ClassNotFoundException {
194    testScan("obb", "opp", "opo");
195  }
196
197  @Test
198  public void testScanYZYToEmpty()
199    throws IOException, InterruptedException, ClassNotFoundException {
200    testScan("yzy", null, "zzz");
201  }
202
203  /**
204   * Tests a MR scan using specific start and stop rows.
205   */
206  private void testScan(String start, String stop, String last)
207    throws IOException, InterruptedException, ClassNotFoundException {
208    String jobName = "Scan" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") + "To"
209      + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
210    LOG.info("Before map/reduce startup - job " + jobName);
211    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
212
213    c.set(KEY_STARTROW, start != null ? start : "");
214    c.set(KEY_LASTROW, last != null ? last : "");
215
216    List<Scan> scans = new ArrayList<>();
217
218    for (String tableName : TABLES) {
219      Scan scan = new Scan();
220
221      scan.addFamily(INPUT_FAMILY);
222      scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName));
223
224      if (start != null) {
225        scan.withStartRow(Bytes.toBytes(start));
226      }
227      if (stop != null) {
228        scan.withStopRow(Bytes.toBytes(stop));
229      }
230
231      scans.add(scan);
232
233      LOG.info("scan before: " + scan);
234    }
235
236    runJob(jobName, c, scans);
237  }
238
239  protected void runJob(String jobName, Configuration c, List<Scan> scans)
240    throws IOException, InterruptedException, ClassNotFoundException {
241    Job job = new Job(c, jobName);
242
243    initJob(scans, job);
244    job.setReducerClass(ScanReducer.class);
245    job.setNumReduceTasks(1); // one to get final "first" and "last" key
246    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
247    LOG.info("Started " + job.getJobName());
248    job.waitForCompletion(true);
249    assertTrue(job.isSuccessful());
250    LOG.info("After map/reduce completion - job " + jobName);
251  }
252
253  protected abstract void initJob(List<Scan> scans, Job job) throws IOException;
254
255}