001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.jupiter.api.Assertions.assertTrue;
021import static org.junit.jupiter.api.Assertions.fail;
022
023import java.io.File;
024import java.io.IOException;
025import java.util.Iterator;
026import java.util.Map;
027import java.util.NavigableMap;
028import org.apache.hadoop.fs.FileUtil;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.HBaseTestingUtil;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.client.Put;
036import org.apache.hadoop.hbase.client.Result;
037import org.apache.hadoop.hbase.client.ResultScanner;
038import org.apache.hadoop.hbase.client.Scan;
039import org.apache.hadoop.hbase.client.Table;
040import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
041import org.apache.hadoop.hbase.testclassification.LargeTests;
042import org.apache.hadoop.hbase.testclassification.MapReduceTests;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.hadoop.mapreduce.Job;
045import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
046import org.junit.jupiter.api.AfterAll;
047import org.junit.jupiter.api.BeforeAll;
048import org.junit.jupiter.api.Tag;
049import org.junit.jupiter.api.Test;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053/**
054 * Test Map/Reduce job over HBase tables. The map/reduce process we're testing on our tables is
055 * simple - take every row in the table, reverse the value of a particular cell, and write it back
056 * to the table.
057 */
058@Tag(MapReduceTests.TAG)
059@Tag(LargeTests.TAG)
060public class TestMultithreadedTableMapper {
061
062  private static final Logger LOG = LoggerFactory.getLogger(TestMultithreadedTableMapper.class);
063  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
064  static final TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("mrtest");
065  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
066  static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
067  static final int NUMBER_OF_THREADS = 10;
068
069  @BeforeAll
070  public static void beforeClass() throws Exception {
071    // Up the handlers; this test needs more than usual.
072    UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
073    UTIL.startMiniCluster();
074    Table table = UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME,
075      new byte[][] { INPUT_FAMILY, OUTPUT_FAMILY });
076    UTIL.loadTable(table, INPUT_FAMILY, false);
077    UTIL.waitUntilAllRegionsAssigned(MULTI_REGION_TABLE_NAME);
078  }
079
080  @AfterAll
081  public static void afterClass() throws Exception {
082    UTIL.shutdownMiniCluster();
083  }
084
085  /**
086   * Pass the given key and processed record reduce
087   */
088  public static class ProcessContentsMapper extends TableMapper<ImmutableBytesWritable, Put> {
089
090    /**
091     * Pass the key, and reversed value to reduce
092     */
093    @Override
094    public void map(ImmutableBytesWritable key, Result value, Context context)
095      throws IOException, InterruptedException {
096      if (value.size() != 1) {
097        throw new IOException("There should only be one input column");
098      }
099      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf = value.getMap();
100      if (!cf.containsKey(INPUT_FAMILY)) {
101        throw new IOException(
102          "Wrong input columns. Missing: '" + Bytes.toString(INPUT_FAMILY) + "'.");
103      }
104      // Get the original value and reverse it
105      String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
106      StringBuilder newValue = new StringBuilder(originalValue);
107      newValue.reverse();
108      // Now set the value to be collected
109      Put outval = new Put(key.get());
110      outval.addColumn(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
111      context.write(key, outval);
112    }
113  }
114
115  /**
116   * Test multithreadedTableMappper map/reduce against a multi-region table
117   */
118  @Test
119  public void testMultithreadedTableMapper()
120    throws IOException, InterruptedException, ClassNotFoundException {
121    runTestOnTable(UTIL.getConnection().getTable(MULTI_REGION_TABLE_NAME));
122  }
123
124  private void runTestOnTable(Table table)
125    throws IOException, InterruptedException, ClassNotFoundException {
126    Job job = null;
127    try {
128      LOG.info("Before map/reduce startup");
129      job = new Job(table.getConfiguration(), "process column contents");
130      job.setNumReduceTasks(1);
131      Scan scan = new Scan();
132      scan.addFamily(INPUT_FAMILY);
133      TableMapReduceUtil.initTableMapperJob(table.getName(), scan, MultithreadedTableMapper.class,
134        ImmutableBytesWritable.class, Put.class, job);
135      MultithreadedTableMapper.setMapperClass(job, ProcessContentsMapper.class);
136      MultithreadedTableMapper.setNumberOfThreads(job, NUMBER_OF_THREADS);
137      TableMapReduceUtil.initTableReducerJob(table.getName().getNameAsString(),
138        IdentityTableReducer.class, job);
139      FileOutputFormat.setOutputPath(job, new Path("test"));
140      LOG.info("Started " + table.getName());
141      assertTrue(job.waitForCompletion(true));
142      LOG.info("After map/reduce completion");
143      // verify map-reduce results
144      verify(table.getName());
145    } finally {
146      table.close();
147      if (job != null) {
148        FileUtil.fullyDelete(new File(job.getConfiguration().get("hadoop.tmp.dir")));
149      }
150    }
151  }
152
153  private void verify(TableName tableName) throws IOException {
154    Table table = UTIL.getConnection().getTable(tableName);
155    boolean verified = false;
156    long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
157    int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
158    for (int i = 0; i < numRetries; i++) {
159      try {
160        LOG.info("Verification attempt #" + i);
161        verifyAttempt(table);
162        verified = true;
163        break;
164      } catch (NullPointerException e) {
165        // If here, a cell was empty. Presume its because updates came in
166        // after the scanner had been opened. Wait a while and retry.
167        LOG.debug("Verification attempt failed: " + e.getMessage());
168      }
169      try {
170        Thread.sleep(pause);
171      } catch (InterruptedException e) {
172        // continue
173      }
174    }
175    assertTrue(verified);
176    table.close();
177  }
178
179  /**
180   * Looks at every value of the mapreduce output and verifies that indeed the values have been
181   * reversed.
182   * @param table Table to scan.
183   * @throws NullPointerException if we failed to find a cell value
184   */
185  private void verifyAttempt(final Table table) throws IOException, NullPointerException {
186    Scan scan = new Scan();
187    scan.addFamily(INPUT_FAMILY);
188    scan.addFamily(OUTPUT_FAMILY);
189    ResultScanner scanner = table.getScanner(scan);
190    try {
191      Iterator<Result> itr = scanner.iterator();
192      assertTrue(itr.hasNext());
193      while (itr.hasNext()) {
194        Result r = itr.next();
195        if (LOG.isDebugEnabled()) {
196          if (r.size() > 2) {
197            throw new IOException("Too many results, expected 2 got " + r.size());
198          }
199        }
200        byte[] firstValue = null;
201        byte[] secondValue = null;
202        int count = 0;
203        for (Cell kv : r.listCells()) {
204          if (count == 0) {
205            firstValue = CellUtil.cloneValue(kv);
206          } else if (count == 1) {
207            secondValue = CellUtil.cloneValue(kv);
208          } else if (count == 2) {
209            break;
210          }
211          count++;
212        }
213        String first = "";
214        if (firstValue == null) {
215          throw new NullPointerException(Bytes.toString(r.getRow()) + ": first value is null");
216        }
217        first = Bytes.toString(firstValue);
218        String second = "";
219        if (secondValue == null) {
220          throw new NullPointerException(Bytes.toString(r.getRow()) + ": second value is null");
221        }
222        byte[] secondReversed = new byte[secondValue.length];
223        for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
224          secondReversed[i] = secondValue[j];
225        }
226        second = Bytes.toString(secondReversed);
227        if (first.compareTo(second) != 0) {
228          if (LOG.isDebugEnabled()) {
229            LOG.debug(
230              "second key is not the reverse of first. row=" + Bytes.toStringBinary(r.getRow())
231                + ", first value=" + first + ", second value=" + second);
232          }
233          fail();
234        }
235      }
236    } finally {
237      scanner.close();
238    }
239  }
240
241}