001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.jupiter.api.Assertions.assertNotNull;
021import static org.junit.jupiter.api.Assertions.assertThrows;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.io.File;
025import java.io.IOException;
026import java.util.Map;
027import java.util.NavigableMap;
028import org.apache.hadoop.fs.FileUtil;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.TableNotEnabledException;
032import org.apache.hadoop.hbase.TableNotFoundException;
033import org.apache.hadoop.hbase.client.Admin;
034import org.apache.hadoop.hbase.client.Put;
035import org.apache.hadoop.hbase.client.Result;
036import org.apache.hadoop.hbase.client.Scan;
037import org.apache.hadoop.hbase.client.Table;
038import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
039import org.apache.hadoop.hbase.testclassification.LargeTests;
040import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.mapreduce.Counter;
043import org.apache.hadoop.mapreduce.Counters;
044import org.apache.hadoop.mapreduce.Job;
045import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
046import org.junit.jupiter.api.Tag;
047import org.junit.jupiter.api.Test;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051/**
052 * Test Map/Reduce job over HBase tables. The map/reduce process we're testing on our tables is
053 * simple - take every row in the table, reverse the value of a particular cell, and write it back
054 * to the table.
055 */
056
057@Tag(VerySlowMapReduceTests.TAG)
058@Tag(LargeTests.TAG)
059public class TestTableMapReduce extends TestTableMapReduceBase {
060
061  private static final Logger LOG = LoggerFactory.getLogger(TestTableMapReduce.class);
062
063  @Override
064  protected Logger getLog() {
065    return LOG;
066  }
067
068  /**
069   * Pass the given key and processed record reduce
070   */
071  static class ProcessContentsMapper extends TableMapper<ImmutableBytesWritable, Put> {
072
073    /**
074     * Pass the key, and reversed value to reduce
075     */
076    @Override
077    public void map(ImmutableBytesWritable key, Result value, Context context)
078      throws IOException, InterruptedException {
079      if (value.size() != 1) {
080        throw new IOException("There should only be one input column");
081      }
082      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf = value.getMap();
083      if (!cf.containsKey(INPUT_FAMILY)) {
084        throw new IOException(
085          "Wrong input columns. Missing: '" + Bytes.toString(INPUT_FAMILY) + "'.");
086      }
087
088      // Get the original value and reverse it
089      String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
090      StringBuilder newValue = new StringBuilder(originalValue);
091      newValue.reverse();
092      // Now set the value to be collected
093      Put outval = new Put(key.get());
094      outval.addColumn(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
095      context.write(key, outval);
096    }
097  }
098
099  @Override
100  protected void runTestOnTable(Table table) throws IOException {
101    Job job = null;
102    try {
103      LOG.info("Before map/reduce startup");
104      job = new Job(table.getConfiguration(), "process column contents");
105      job.setNumReduceTasks(1);
106      Scan scan = new Scan();
107      scan.addFamily(INPUT_FAMILY);
108      TableMapReduceUtil.initTableMapperJob(table.getName().getNameAsString(), scan,
109        ProcessContentsMapper.class, ImmutableBytesWritable.class, Put.class, job);
110      TableMapReduceUtil.initTableReducerJob(table.getName().getNameAsString(),
111        IdentityTableReducer.class, job);
112      FileOutputFormat.setOutputPath(job, new Path("test"));
113      LOG.info("Started " + table.getName().getNameAsString());
114      assertTrue(job.waitForCompletion(true));
115      LOG.info("After map/reduce completion");
116
117      // verify map-reduce results
118      verify(table.getName());
119
120      verifyJobCountersAreEmitted(job);
121    } catch (InterruptedException e) {
122      throw new IOException(e);
123    } catch (ClassNotFoundException e) {
124      throw new IOException(e);
125    } finally {
126      table.close();
127      if (job != null) {
128        FileUtil.fullyDelete(new File(job.getConfiguration().get("hadoop.tmp.dir")));
129      }
130    }
131  }
132
133  /**
134   * Verify scan counters are emitted from the job
135   */
136  private void verifyJobCountersAreEmitted(Job job) throws IOException {
137    Counters counters = job.getCounters();
138    Counter counter =
139      counters.findCounter(TableRecordReaderImpl.HBASE_COUNTER_GROUP_NAME, "RPC_CALLS");
140    assertNotNull(counter, "Unable to find Job counter for HBase scan metrics, RPC_CALLS");
141    assertTrue(counter.getValue() > 0, "Counter value for RPC_CALLS should be larger than 0");
142  }
143
144  @Test
145  public void testWritingToDisabledTable() throws IOException {
146    assertThrows(TableNotEnabledException.class, () -> {
147      try (Admin admin = UTIL.getConnection().getAdmin();
148        Table table = UTIL.getConnection().getTable(TABLE_FOR_NEGATIVE_TESTS)) {
149        admin.disableTable(table.getName());
150        runTestOnTable(table);
151      }
152    });
153  }
154
155  @Test
156  public void testWritingToNonExistentTable() throws IOException {
157    assertThrows(TableNotFoundException.class, () -> {
158      try (Table table = UTIL.getConnection().getTable(TableName.valueOf("table-does-not-exist"))) {
159        runTestOnTable(table);
160      }
161    });
162  }
163}