001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.assertNotNull;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import java.io.File;
025import java.io.IOException;
026import java.util.Map;
027import java.util.NavigableMap;
028import org.apache.hadoop.fs.FileUtil;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.TableNotEnabledException;
033import org.apache.hadoop.hbase.TableNotFoundException;
034import org.apache.hadoop.hbase.client.Admin;
035import org.apache.hadoop.hbase.client.Put;
036import org.apache.hadoop.hbase.client.Result;
037import org.apache.hadoop.hbase.client.Scan;
038import org.apache.hadoop.hbase.client.Table;
039import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
040import org.apache.hadoop.hbase.testclassification.LargeTests;
041import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
042import org.apache.hadoop.hbase.util.Bytes;
043import org.apache.hadoop.mapreduce.Counter;
044import org.apache.hadoop.mapreduce.Counters;
045import org.apache.hadoop.mapreduce.Job;
046import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
047import org.junit.ClassRule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053/**
054 * Test Map/Reduce job over HBase tables. The map/reduce process we're testing on our tables is
055 * simple - take every row in the table, reverse the value of a particular cell, and write it back
056 * to the table.
057 */
058
059@Category({ VerySlowMapReduceTests.class, LargeTests.class })
060public class TestTableMapReduce extends TestTableMapReduceBase {
061
062  @ClassRule
063  public static final HBaseClassTestRule CLASS_RULE =
064    HBaseClassTestRule.forClass(TestTableMapReduce.class);
065
066  private static final Logger LOG = LoggerFactory.getLogger(TestTableMapReduce.class);
067
068  @Override
069  protected Logger getLog() {
070    return LOG;
071  }
072
073  /**
074   * Pass the given key and processed record reduce
075   */
076  static class ProcessContentsMapper extends TableMapper<ImmutableBytesWritable, Put> {
077
078    /**
079     * Pass the key, and reversed value to reduce nnnn
080     */
081    @Override
082    public void map(ImmutableBytesWritable key, Result value, Context context)
083      throws IOException, InterruptedException {
084      if (value.size() != 1) {
085        throw new IOException("There should only be one input column");
086      }
087      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf = value.getMap();
088      if (!cf.containsKey(INPUT_FAMILY)) {
089        throw new IOException(
090          "Wrong input columns. Missing: '" + Bytes.toString(INPUT_FAMILY) + "'.");
091      }
092
093      // Get the original value and reverse it
094      String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
095      StringBuilder newValue = new StringBuilder(originalValue);
096      newValue.reverse();
097      // Now set the value to be collected
098      Put outval = new Put(key.get());
099      outval.addColumn(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
100      context.write(key, outval);
101    }
102  }
103
104  @Override
105  protected void runTestOnTable(Table table) throws IOException {
106    Job job = null;
107    try {
108      LOG.info("Before map/reduce startup");
109      job = new Job(table.getConfiguration(), "process column contents");
110      job.setNumReduceTasks(1);
111      Scan scan = new Scan();
112      scan.addFamily(INPUT_FAMILY);
113      TableMapReduceUtil.initTableMapperJob(table.getName().getNameAsString(), scan,
114        ProcessContentsMapper.class, ImmutableBytesWritable.class, Put.class, job);
115      TableMapReduceUtil.initTableReducerJob(table.getName().getNameAsString(),
116        IdentityTableReducer.class, job);
117      FileOutputFormat.setOutputPath(job, new Path("test"));
118      LOG.info("Started " + table.getName().getNameAsString());
119      assertTrue(job.waitForCompletion(true));
120      LOG.info("After map/reduce completion");
121
122      // verify map-reduce results
123      verify(table.getName());
124
125      verifyJobCountersAreEmitted(job);
126    } catch (InterruptedException e) {
127      throw new IOException(e);
128    } catch (ClassNotFoundException e) {
129      throw new IOException(e);
130    } finally {
131      table.close();
132      if (job != null) {
133        FileUtil.fullyDelete(new File(job.getConfiguration().get("hadoop.tmp.dir")));
134      }
135    }
136  }
137
138  /**
139   * Verify scan counters are emitted from the job nn
140   */
141  private void verifyJobCountersAreEmitted(Job job) throws IOException {
142    Counters counters = job.getCounters();
143    Counter counter =
144      counters.findCounter(TableRecordReaderImpl.HBASE_COUNTER_GROUP_NAME, "RPC_CALLS");
145    assertNotNull("Unable to find Job counter for HBase scan metrics, RPC_CALLS", counter);
146    assertTrue("Counter value for RPC_CALLS should be larger than 0", counter.getValue() > 0);
147  }
148
149  @Test(expected = TableNotEnabledException.class)
150  public void testWritingToDisabledTable() throws IOException {
151
152    try (Admin admin = UTIL.getConnection().getAdmin();
153      Table table = UTIL.getConnection().getTable(TABLE_FOR_NEGATIVE_TESTS)) {
154      admin.disableTable(table.getName());
155      runTestOnTable(table);
156      fail("Should not have reached here, should have thrown an exception");
157    }
158  }
159
160  @Test(expected = TableNotFoundException.class)
161  public void testWritingToNonExistentTable() throws IOException {
162
163    try (Table table = UTIL.getConnection().getTable(TableName.valueOf("table-does-not-exist"))) {
164      runTestOnTable(table);
165      fail("Should not have reached here, should have thrown an exception");
166    }
167  }
168}