/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
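
/**
 * Tests the deprecated mapred flavor of {@link TableMapReduceUtil}: deriving
 * map/reduce task limits from a table's region count, setting scanner caching,
 * and wiring up full jobs with and without a custom partitioner.
 *
 * <p>A minimal sketch of the API under test ({@code MyMapper} and
 * {@code MyReducer} are placeholders, not classes in this file):
 *
 * <pre>
 * JobConf job = new JobConf(conf);
 * TableMapReduceUtil.initTableMapJob("People", "info", MyMapper.class,
 *     ImmutableBytesWritable.class, Put.class, job);
 * TableMapReduceUtil.initTableReduceJob("People", MyReducer.class, job);
 * JobClient.runJob(job);
 * </pre>
 */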
@Category({MapReduceTests.class, LargeTests.class})
public class TestTableMapReduceUtil {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestTableMapReduceUtil.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestTableMapReduceUtil.class);

  private static Table presidentsTable;
  private static final String TABLE_NAME = "People";

  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info");
  private static final byte[] COLUMN_QUALIFIER = Bytes.toBytes("name");

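  // Fixture data: row keys and the names stored under them. The name sets are
  // exposed as one-shot iterators, so the table can be filled only once per
  // JVM (see createPutCommand).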
  private static final ImmutableSet<String> presidentsRowKeys =
      ImmutableSet.of("president1", "president2", "president3");
  private static final Iterator<String> presidentNames =
      ImmutableSet.of("John F. Kennedy", "George W. Bush", "Barack Obama").iterator();

  private static final ImmutableSet<String> actorsRowKeys = ImmutableSet.of("actor1", "actor2");
  private static final Iterator<String> actorNames =
      ImmutableSet.of("Jack Nicholson", "Martin Freeman").iterator();

  private static final String PRESIDENT_PATTERN = "president";
  private static final String ACTOR_PATTERN = "actor";
  private static final ImmutableMap<String, ImmutableSet<String>> relation =
      ImmutableMap.of(PRESIDENT_PATTERN, presidentsRowKeys, ACTOR_PATTERN, actorsRowKeys);

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  @BeforeClass
  public static void beforeClass() throws Exception {
    UTIL.startMiniCluster();
    presidentsTable = createAndFillTable(TableName.valueOf(TABLE_NAME));
  }

  @AfterClass
  public static void afterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void before() throws IOException {
    LOG.info("before");
    UTIL.ensureSomeRegionServersAvailable(1);
    LOG.info("before done");
  }

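  /**
   * Creates the test table with the single {@code info} column family and
   * loads the president and actor fixture rows into it.
   */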
  public static Table createAndFillTable(TableName tableName) throws IOException {
    Table table = UTIL.createTable(tableName, COLUMN_FAMILY);
    createPutCommand(table);
    return table;
  }

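  /**
   * Writes one name per fixture row key. Because the name iterators are
   * consumed as rows are written, this is effective only on its first call.
   */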
  private static void createPutCommand(Table table) throws IOException {
    for (String president : presidentsRowKeys) {
      if (presidentNames.hasNext()) {
        Put p = new Put(Bytes.toBytes(president));
        p.addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(presidentNames.next()));
        table.put(p);
      }
    }

    for (String actor : actorsRowKeys) {
      if (actorNames.hasNext()) {
        Put p = new Put(Bytes.toBytes(actor));
        p.addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(actorNames.next()));
        table.put(p);
      }
    }
  }

  /**
   * Check that the given number of reduce tasks for the given job configuration
   * does not exceed the number of regions for the given table.
   */
  @Test
  public void shouldNumberOfReduceTaskNotExceedNumberOfRegionsForGivenTable()
      throws IOException {
    Assert.assertNotNull(presidentsTable);
    Configuration cfg = UTIL.getConfiguration();
    JobConf jobConf = new JobConf(cfg);
    TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
    TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
    TableMapReduceUtil.setScannerCaching(jobConf, 100);
    assertEquals(1, jobConf.getNumReduceTasks());
    assertEquals(100, jobConf.getInt("hbase.client.scanner.caching", 0));

    jobConf.setNumReduceTasks(10);
    TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
    TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
    assertEquals(1, jobConf.getNumReduceTasks());
  }

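  /**
   * Check that the given number of map tasks for the given job configuration
   * does not exceed the number of regions for the given table.
   */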
  @Test
  public void shouldNumberOfMapTaskNotExceedNumberOfRegionsForGivenTable()
      throws IOException {
    Configuration cfg = UTIL.getConfiguration();
    JobConf jobConf = new JobConf(cfg);
    TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
    TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
    assertEquals(1, jobConf.getNumMapTasks());

    jobConf.setNumMapTasks(10);
    TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
    TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
    assertEquals(1, jobConf.getNumMapTasks());
  }

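  /**
   * Runs a full map/reduce job on the mini cluster with a single reduce task
   * and verifies that the mapper and reducer wired up by
   * {@link TableMapReduceUtil} complete successfully.
   */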
  @Test
  @SuppressWarnings("deprecation")
  public void shouldBeValidMapReduceEvaluation() throws Exception {
    Configuration cfg = UTIL.getConfiguration();
    JobConf jobConf = new JobConf(cfg);
    try {
      jobConf.setJobName("process row task");
      jobConf.setNumReduceTasks(1);
      TableMapReduceUtil.initTableMapJob(TABLE_NAME, Bytes.toString(COLUMN_FAMILY),
          ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class, jobConf);
      TableMapReduceUtil.initTableReduceJob(TABLE_NAME, ClassificatorRowReduce.class, jobConf);
      RunningJob job = JobClient.runJob(jobConf);
      assertTrue(job.isSuccessful());
    } finally {
      FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
    }
  }

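  /**
   * Same end-to-end flow as above, but with two reduce tasks and
   * {@link HRegionPartitioner} routing mapper output to the reducers.
   */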
  @Test
  @SuppressWarnings("deprecation")
  public void shouldBeValidMapReduceWithPartitionerEvaluation() throws IOException {
    Configuration cfg = UTIL.getConfiguration();
    JobConf jobConf = new JobConf(cfg);
    try {
      jobConf.setJobName("process row task");
      jobConf.setNumReduceTasks(2);
      TableMapReduceUtil.initTableMapJob(TABLE_NAME, Bytes.toString(COLUMN_FAMILY),
          ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class, jobConf);
      TableMapReduceUtil.initTableReduceJob(TABLE_NAME, ClassificatorRowReduce.class, jobConf,
          HRegionPartitioner.class);
      RunningJob job = JobClient.runJob(jobConf);
      assertTrue(job.isSuccessful());
    } finally {
      FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
    }
  }

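  /**
   * Reducer that asserts the number of values collected for each key
   * ("president" or "actor") matches the number of fixture rows loaded for
   * that key.
   */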
  @SuppressWarnings("deprecation")
  static class ClassificatorRowReduce extends MapReduceBase
      implements TableReduce<ImmutableBytesWritable, Put> {

    @Override
    public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
        OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter)
        throws IOException {
      String strKey = Bytes.toString(key.get());
      List<Put> result = new ArrayList<>();
      while (values.hasNext()) {
        result.add(values.next());
      }

      if (relation.containsKey(strKey)) {
        Set<String> set = relation.get(strKey);
        if (set != null) {
          assertEquals(set.size(), result.size());
        } else {
          throwAssertionError("Test infrastructure error: set is null");
        }
      } else {
        throwAssertionError("Test infrastructure error: key not found in map");
      }
    }

    private void throwAssertionError(String errorMessage) throws AssertionError {
      throw new AssertionError(errorMessage);
    }
  }

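  /**
   * Mapper that classifies each row by its key prefix and emits the stored
   * name under either the president key or the actor key.
   */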
  @SuppressWarnings("deprecation")
  static class ClassificatorMapper extends MapReduceBase
      implements TableMap<ImmutableBytesWritable, Put> {

    @Override
    public void map(ImmutableBytesWritable row, Result result,
        OutputCollector<ImmutableBytesWritable, Put> outCollector, Reporter reporter)
        throws IOException {
      String rowKey = Bytes.toString(result.getRow());
      final ImmutableBytesWritable pKey =
          new ImmutableBytesWritable(Bytes.toBytes(PRESIDENT_PATTERN));
      final ImmutableBytesWritable aKey = new ImmutableBytesWritable(Bytes.toBytes(ACTOR_PATTERN));
      ImmutableBytesWritable outKey = null;

      if (rowKey.startsWith(PRESIDENT_PATTERN)) {
        outKey = pKey;
      } else if (rowKey.startsWith(ACTOR_PATTERN)) {
        outKey = aKey;
      } else {
        throw new AssertionError("unexpected rowKey");
      }

      String name = Bytes.toString(result.getValue(COLUMN_FAMILY, COLUMN_QUALIFIER));
      outCollector.collect(outKey, new Put(Bytes.toBytes("rowKey2"))
          .addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(name)));
    }
  }
}