001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapred;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertNotNull;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.io.File;
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Set;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileUtil;
032import org.apache.hadoop.hbase.HBaseTestingUtil;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.Put;
035import org.apache.hadoop.hbase.client.Result;
036import org.apache.hadoop.hbase.client.Table;
037import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
038import org.apache.hadoop.hbase.testclassification.LargeTests;
039import org.apache.hadoop.hbase.testclassification.MapReduceTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.hadoop.mapred.JobClient;
042import org.apache.hadoop.mapred.JobConf;
043import org.apache.hadoop.mapred.MapReduceBase;
044import org.apache.hadoop.mapred.OutputCollector;
045import org.apache.hadoop.mapred.Reporter;
046import org.apache.hadoop.mapred.RunningJob;
047import org.junit.jupiter.api.AfterAll;
048import org.junit.jupiter.api.BeforeAll;
049import org.junit.jupiter.api.BeforeEach;
050import org.junit.jupiter.api.Tag;
051import org.junit.jupiter.api.Test;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
056import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
057
058@Tag(MapReduceTests.TAG)
059@Tag(LargeTests.TAG)
060public class TestTableMapReduceUtil {
061
062  private static final Logger LOG = LoggerFactory.getLogger(TestTableMapReduceUtil.class);
063
064  private static Table presidentsTable;
065  private static final String TABLE_NAME = "People";
066
067  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info");
068  private static final byte[] COLUMN_QUALIFIER = Bytes.toBytes("name");
069
070  private static ImmutableSet<String> presidentsRowKeys =
071    ImmutableSet.of("president1", "president2", "president3");
072  private static Iterator<String> presidentNames =
073    ImmutableSet.of("John F. Kennedy", "George W. Bush", "Barack Obama").iterator();
074
075  private static ImmutableSet<String> actorsRowKeys = ImmutableSet.of("actor1", "actor2");
076  private static Iterator<String> actorNames =
077    ImmutableSet.of("Jack Nicholson", "Martin Freeman").iterator();
078
079  private static String PRESIDENT_PATTERN = "president";
080  private static String ACTOR_PATTERN = "actor";
081  private static ImmutableMap<String, ImmutableSet<String>> relation =
082    ImmutableMap.of(PRESIDENT_PATTERN, presidentsRowKeys, ACTOR_PATTERN, actorsRowKeys);
083
084  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
085
086  @BeforeAll
087  public static void beforeClass() throws Exception {
088    UTIL.startMiniCluster();
089    presidentsTable = createAndFillTable(TableName.valueOf(TABLE_NAME));
090  }
091
092  @AfterAll
093  public static void afterClass() throws Exception {
094    UTIL.shutdownMiniCluster();
095  }
096
097  @BeforeEach
098  public void before() throws IOException {
099    LOG.info("before");
100    UTIL.ensureSomeRegionServersAvailable(1);
101    LOG.info("before done");
102  }
103
104  public static Table createAndFillTable(TableName tableName) throws IOException {
105    Table table = UTIL.createTable(tableName, COLUMN_FAMILY);
106    createPutCommand(table);
107    return table;
108  }
109
110  private static void createPutCommand(Table table) throws IOException {
111    for (String president : presidentsRowKeys) {
112      if (presidentNames.hasNext()) {
113        Put p = new Put(Bytes.toBytes(president));
114        p.addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(presidentNames.next()));
115        table.put(p);
116      }
117    }
118
119    for (String actor : actorsRowKeys) {
120      if (actorNames.hasNext()) {
121        Put p = new Put(Bytes.toBytes(actor));
122        p.addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(actorNames.next()));
123        table.put(p);
124      }
125    }
126  }
127
128  /**
129   * Check what the given number of reduce tasks for the given job configuration does not exceed the
130   * number of regions for the given table.
131   */
132  @Test
133  public void shouldNumberOfReduceTaskNotExceedNumberOfRegionsForGivenTable() throws IOException {
134    assertNotNull(presidentsTable);
135    Configuration cfg = UTIL.getConfiguration();
136    JobConf jobConf = new JobConf(cfg);
137    TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
138    TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
139    TableMapReduceUtil.setScannerCaching(jobConf, 100);
140    assertEquals(1, jobConf.getNumReduceTasks());
141    assertEquals(100, jobConf.getInt("hbase.client.scanner.caching", 0));
142
143    jobConf.setNumReduceTasks(10);
144    TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
145    TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
146    assertEquals(1, jobConf.getNumReduceTasks());
147  }
148
149  @Test
150  public void shouldNumberOfMapTaskNotExceedNumberOfRegionsForGivenTable() throws IOException {
151    Configuration cfg = UTIL.getConfiguration();
152    JobConf jobConf = new JobConf(cfg);
153    TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
154    TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
155    assertEquals(1, jobConf.getNumMapTasks());
156
157    jobConf.setNumMapTasks(10);
158    TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
159    TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
160    assertEquals(1, jobConf.getNumMapTasks());
161  }
162
163  @Test
164  @SuppressWarnings("deprecation")
165  public void shoudBeValidMapReduceEvaluation() throws Exception {
166    Configuration cfg = UTIL.getConfiguration();
167    JobConf jobConf = new JobConf(cfg);
168    try {
169      jobConf.setJobName("process row task");
170      jobConf.setNumReduceTasks(1);
171      TableMapReduceUtil.initTableMapJob(TABLE_NAME, new String(COLUMN_FAMILY),
172        ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class, jobConf);
173      TableMapReduceUtil.initTableReduceJob(TABLE_NAME, ClassificatorRowReduce.class, jobConf);
174      RunningJob job = JobClient.runJob(jobConf);
175      assertTrue(job.isSuccessful());
176    } finally {
177      if (jobConf != null) FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
178    }
179  }
180
181  @Test
182  @SuppressWarnings("deprecation")
183  public void shoudBeValidMapReduceWithPartitionerEvaluation() throws IOException {
184    Configuration cfg = UTIL.getConfiguration();
185    JobConf jobConf = new JobConf(cfg);
186    try {
187      jobConf.setJobName("process row task");
188      jobConf.setNumReduceTasks(2);
189      TableMapReduceUtil.initTableMapJob(TABLE_NAME, new String(COLUMN_FAMILY),
190        ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class, jobConf);
191
192      TableMapReduceUtil.initTableReduceJob(TABLE_NAME, ClassificatorRowReduce.class, jobConf,
193        HRegionPartitioner.class);
194      RunningJob job = JobClient.runJob(jobConf);
195      assertTrue(job.isSuccessful());
196    } finally {
197      if (jobConf != null) FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
198    }
199  }
200
201  @SuppressWarnings("deprecation")
202  static class ClassificatorRowReduce extends MapReduceBase
203    implements TableReduce<ImmutableBytesWritable, Put> {
204
205    @Override
206    public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
207      OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter) throws IOException {
208      String strKey = Bytes.toString(key.get());
209      List<Put> result = new ArrayList<>();
210      while (values.hasNext())
211        result.add(values.next());
212
213      if (relation.keySet().contains(strKey)) {
214        Set<String> set = relation.get(strKey);
215        if (set != null) {
216          assertEquals(set.size(), result.size());
217        } else {
218          throwAccertionError("Test infrastructure error: set is null");
219        }
220      } else {
221        throwAccertionError("Test infrastructure error: key not found in map");
222      }
223    }
224
225    private void throwAccertionError(String errorMessage) throws AssertionError {
226      throw new AssertionError(errorMessage);
227    }
228  }
229
230  @SuppressWarnings("deprecation")
231  static class ClassificatorMapper extends MapReduceBase
232    implements TableMap<ImmutableBytesWritable, Put> {
233
234    @Override
235    public void map(ImmutableBytesWritable row, Result result,
236      OutputCollector<ImmutableBytesWritable, Put> outCollector, Reporter reporter)
237      throws IOException {
238      String rowKey = Bytes.toString(result.getRow());
239      final ImmutableBytesWritable pKey =
240        new ImmutableBytesWritable(Bytes.toBytes(PRESIDENT_PATTERN));
241      final ImmutableBytesWritable aKey = new ImmutableBytesWritable(Bytes.toBytes(ACTOR_PATTERN));
242      ImmutableBytesWritable outKey = null;
243
244      if (rowKey.startsWith(PRESIDENT_PATTERN)) {
245        outKey = pKey;
246      } else if (rowKey.startsWith(ACTOR_PATTERN)) {
247        outKey = aKey;
248      } else {
249        throw new AssertionError("unexpected rowKey");
250      }
251
252      String name = Bytes.toString(result.getValue(COLUMN_FAMILY, COLUMN_QUALIFIER));
253      outCollector.collect(outKey, new Put(Bytes.toBytes("rowKey2")).addColumn(COLUMN_FAMILY,
254        COLUMN_QUALIFIER, Bytes.toBytes(name)));
255    }
256  }
257}