/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
 * on our tables is simple: take every row in the table, reverse the value of a
 * particular cell, and write it back to the table.
 */
@Category({VerySlowMapReduceTests.class, LargeTests.class})
public class TestTableMapReduce extends TestTableMapReduceBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestTableMapReduce.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestTableMapReduce.class);

  @Override
  protected Logger getLog() { return LOG; }

  /**
   * Passes the given key and the processed (reversed) record on to reduce.
   */
  static class ProcessContentsMapper extends TableMapper<ImmutableBytesWritable, Put> {

    /**
     * Pass the key, and the reversed value, to reduce.
     *
     * @param key the row key
     * @param value the cells of the current row
     * @param context the map task context
     * @throws IOException when the row does not have exactly one cell in the input family
     */
    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
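      // Sanity checks: each input row is expected to carry exactly one cell,
      // and that cell must belong to the input family.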
      if (value.size() != 1) {
        throw new IOException("There should only be one input column");
      }
      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf = value.getMap();
      if (!cf.containsKey(INPUT_FAMILY)) {
        throw new IOException("Wrong input columns. Missing: '" +
          Bytes.toString(INPUT_FAMILY) + "'.");
      }

      // Get the original value and reverse it.
      String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
      StringBuilder newValue = new StringBuilder(originalValue);
      newValue.reverse();
      // Now set the value to be collected.
      Put outval = new Put(key.get());
      outval.addColumn(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
      context.write(key, outval);
    }
  }

  @Override
  protected void runTestOnTable(Table table) throws IOException {
    Job job = null;
    try {
      LOG.info("Before map/reduce startup");
      job = Job.getInstance(table.getConfiguration(), "process column contents");
      job.setNumReduceTasks(1);
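      // Scan only the input family; ProcessContentsMapper rejects any other row layout.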
      Scan scan = new Scan();
      scan.addFamily(INPUT_FAMILY);
      TableMapReduceUtil.initTableMapperJob(
        table.getName().getNameAsString(), scan,
        ProcessContentsMapper.class, ImmutableBytesWritable.class,
        Put.class, job);
      TableMapReduceUtil.initTableReducerJob(
        table.getName().getNameAsString(),
        IdentityTableReducer.class, job);
      FileOutputFormat.setOutputPath(job, new Path("test"));
      LOG.info("Started " + table.getName().getNameAsString());
      assertTrue(job.waitForCompletion(true));
      LOG.info("After map/reduce completion");

      // Verify map-reduce results.
      verify(table.getName());

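      // The job should also have published HBase scan metrics as counters.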
      verifyJobCountersAreEmitted(job);
    } catch (InterruptedException | ClassNotFoundException e) {
      throw new IOException(e);
    } finally {
      table.close();
      if (job != null) {
        FileUtil.fullyDelete(
          new File(job.getConfiguration().get("hadoop.tmp.dir")));
      }
    }
  }

  /**
   * Verify that scan counters are emitted from the job.
   * @param job the completed map/reduce job
   * @throws IOException if the job counters cannot be retrieved
   */
  private void verifyJobCountersAreEmitted(Job job) throws IOException {
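    // Scan metrics such as RPC_CALLS are published as job counters by the
    // table record reader under TableRecordReaderImpl.HBASE_COUNTER_GROUP_NAME.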
    Counters counters = job.getCounters();
    Counter counter =
      counters.findCounter(TableRecordReaderImpl.HBASE_COUNTER_GROUP_NAME, "RPC_CALLS");
    assertNotNull("Unable to find Job counter for HBase scan metrics, RPC_CALLS", counter);
    assertTrue("Counter value for RPC_CALLS should be larger than 0", counter.getValue() > 0);
  }

  @Test(expected = TableNotEnabledException.class)
  public void testWritingToDisabledTable() throws IOException {
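    // Writing to a disabled table should fail the job with TableNotEnabledException.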
    try (Admin admin = UTIL.getConnection().getAdmin();
      Table table = UTIL.getConnection().getTable(TABLE_FOR_NEGATIVE_TESTS)) {
      admin.disableTable(table.getName());
      runTestOnTable(table);
      fail("Should not have reached here, should have thrown an exception");
    }
  }

  @Test(expected = TableNotFoundException.class)
  public void testWritingToNonExistentTable() throws IOException {
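    // Referencing a table that was never created should fail with TableNotFoundException.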
    try (Table table = UTIL.getConnection().getTable(TableName.valueOf("table-does-not-exist"))) {
      runTestOnTable(table);
      fail("Should not have reached here, should have thrown an exception");
    }
  }
}