001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapred; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.File; 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Iterator; 027import java.util.List; 028import java.util.Set; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileUtil; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.HBaseTestingUtility; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.Put; 035import org.apache.hadoop.hbase.client.Result; 036import org.apache.hadoop.hbase.client.Table; 037import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 038import org.apache.hadoop.hbase.testclassification.LargeTests; 039import org.apache.hadoop.hbase.testclassification.MapReduceTests; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.apache.hadoop.mapred.JobClient; 042import org.apache.hadoop.mapred.JobConf; 043import org.apache.hadoop.mapred.MapReduceBase; 044import org.apache.hadoop.mapred.OutputCollector; 045import org.apache.hadoop.mapred.Reporter; 046import org.apache.hadoop.mapred.RunningJob; 047import org.junit.AfterClass; 048import org.junit.Assert; 049import org.junit.Before; 050import org.junit.BeforeClass; 051import org.junit.ClassRule; 052import org.junit.Test; 053import org.junit.experimental.categories.Category; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056 057import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 058import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; 059 060@Category({ MapReduceTests.class, LargeTests.class }) 061public class TestTableMapReduceUtil { 062 063 @ClassRule 064 public static final HBaseClassTestRule CLASS_RULE = 065 HBaseClassTestRule.forClass(TestTableMapReduceUtil.class); 066 067 private static final Logger LOG = LoggerFactory.getLogger(TestTableMapReduceUtil.class); 068 069 private static Table presidentsTable; 070 private static final String TABLE_NAME = "People"; 071 072 private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info"); 073 private static final byte[] COLUMN_QUALIFIER = Bytes.toBytes("name"); 074 075 private static ImmutableSet<String> presidentsRowKeys = 076 ImmutableSet.of("president1", "president2", "president3"); 077 private static Iterator<String> presidentNames = 078 ImmutableSet.of("John F. Kennedy", "George W. Bush", "Barack Obama").iterator(); 079 080 private static ImmutableSet<String> actorsRowKeys = ImmutableSet.of("actor1", "actor2"); 081 private static Iterator<String> actorNames = 082 ImmutableSet.of("Jack Nicholson", "Martin Freeman").iterator(); 083 084 private static String PRESIDENT_PATTERN = "president"; 085 private static String ACTOR_PATTERN = "actor"; 086 private static ImmutableMap<String, ImmutableSet<String>> relation = 087 ImmutableMap.of(PRESIDENT_PATTERN, presidentsRowKeys, ACTOR_PATTERN, actorsRowKeys); 088 089 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 090 091 @BeforeClass 092 public static void beforeClass() throws Exception { 093 UTIL.startMiniCluster(); 094 presidentsTable = createAndFillTable(TableName.valueOf(TABLE_NAME)); 095 } 096 097 @AfterClass 098 public static void afterClass() throws Exception { 099 UTIL.shutdownMiniCluster(); 100 } 101 102 @Before 103 public void before() throws IOException { 104 LOG.info("before"); 105 UTIL.ensureSomeRegionServersAvailable(1); 106 LOG.info("before done"); 107 } 108 109 public static Table createAndFillTable(TableName tableName) throws IOException { 110 Table table = UTIL.createTable(tableName, COLUMN_FAMILY); 111 createPutCommand(table); 112 return table; 113 } 114 115 private static void createPutCommand(Table table) throws IOException { 116 for (String president : presidentsRowKeys) { 117 if (presidentNames.hasNext()) { 118 Put p = new Put(Bytes.toBytes(president)); 119 p.addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(presidentNames.next())); 120 table.put(p); 121 } 122 } 123 124 for (String actor : actorsRowKeys) { 125 if (actorNames.hasNext()) { 126 Put p = new Put(Bytes.toBytes(actor)); 127 p.addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(actorNames.next())); 128 table.put(p); 129 } 130 } 131 } 132 133 /** 134 * Check what the given number of reduce tasks for the given job configuration does not exceed the 135 * number of regions for the given table. 136 */ 137 @Test 138 public void shouldNumberOfReduceTaskNotExceedNumberOfRegionsForGivenTable() throws IOException { 139 Assert.assertNotNull(presidentsTable); 140 Configuration cfg = UTIL.getConfiguration(); 141 JobConf jobConf = new JobConf(cfg); 142 TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf); 143 TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf); 144 TableMapReduceUtil.setScannerCaching(jobConf, 100); 145 assertEquals(1, jobConf.getNumReduceTasks()); 146 assertEquals(100, jobConf.getInt("hbase.client.scanner.caching", 0)); 147 148 jobConf.setNumReduceTasks(10); 149 TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf); 150 TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf); 151 assertEquals(1, jobConf.getNumReduceTasks()); 152 } 153 154 @Test 155 public void shouldNumberOfMapTaskNotExceedNumberOfRegionsForGivenTable() throws IOException { 156 Configuration cfg = UTIL.getConfiguration(); 157 JobConf jobConf = new JobConf(cfg); 158 TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf); 159 TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf); 160 assertEquals(1, jobConf.getNumMapTasks()); 161 162 jobConf.setNumMapTasks(10); 163 TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf); 164 TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf); 165 assertEquals(1, jobConf.getNumMapTasks()); 166 } 167 168 @Test 169 @SuppressWarnings("deprecation") 170 public void shoudBeValidMapReduceEvaluation() throws Exception { 171 Configuration cfg = UTIL.getConfiguration(); 172 JobConf jobConf = new JobConf(cfg); 173 try { 174 jobConf.setJobName("process row task"); 175 jobConf.setNumReduceTasks(1); 176 TableMapReduceUtil.initTableMapJob(TABLE_NAME, new String(COLUMN_FAMILY), 177 ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class, jobConf); 178 TableMapReduceUtil.initTableReduceJob(TABLE_NAME, ClassificatorRowReduce.class, jobConf); 179 RunningJob job = JobClient.runJob(jobConf); 180 assertTrue(job.isSuccessful()); 181 } finally { 182 if (jobConf != null) FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir"))); 183 } 184 } 185 186 @Test 187 @SuppressWarnings("deprecation") 188 public void shoudBeValidMapReduceWithPartitionerEvaluation() throws IOException { 189 Configuration cfg = UTIL.getConfiguration(); 190 JobConf jobConf = new JobConf(cfg); 191 try { 192 jobConf.setJobName("process row task"); 193 jobConf.setNumReduceTasks(2); 194 TableMapReduceUtil.initTableMapJob(TABLE_NAME, new String(COLUMN_FAMILY), 195 ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class, jobConf); 196 197 TableMapReduceUtil.initTableReduceJob(TABLE_NAME, ClassificatorRowReduce.class, jobConf, 198 HRegionPartitioner.class); 199 RunningJob job = JobClient.runJob(jobConf); 200 assertTrue(job.isSuccessful()); 201 } finally { 202 if (jobConf != null) FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir"))); 203 } 204 } 205 206 @SuppressWarnings("deprecation") 207 static class ClassificatorRowReduce extends MapReduceBase 208 implements TableReduce<ImmutableBytesWritable, Put> { 209 210 @Override 211 public void reduce(ImmutableBytesWritable key, Iterator<Put> values, 212 OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter) throws IOException { 213 String strKey = Bytes.toString(key.get()); 214 List<Put> result = new ArrayList<>(); 215 while (values.hasNext()) 216 result.add(values.next()); 217 218 if (relation.keySet().contains(strKey)) { 219 Set<String> set = relation.get(strKey); 220 if (set != null) { 221 assertEquals(set.size(), result.size()); 222 } else { 223 throwAccertionError("Test infrastructure error: set is null"); 224 } 225 } else { 226 throwAccertionError("Test infrastructure error: key not found in map"); 227 } 228 } 229 230 private void throwAccertionError(String errorMessage) throws AssertionError { 231 throw new AssertionError(errorMessage); 232 } 233 } 234 235 @SuppressWarnings("deprecation") 236 static class ClassificatorMapper extends MapReduceBase 237 implements TableMap<ImmutableBytesWritable, Put> { 238 239 @Override 240 public void map(ImmutableBytesWritable row, Result result, 241 OutputCollector<ImmutableBytesWritable, Put> outCollector, Reporter reporter) 242 throws IOException { 243 String rowKey = Bytes.toString(result.getRow()); 244 final ImmutableBytesWritable pKey = 245 new ImmutableBytesWritable(Bytes.toBytes(PRESIDENT_PATTERN)); 246 final ImmutableBytesWritable aKey = new ImmutableBytesWritable(Bytes.toBytes(ACTOR_PATTERN)); 247 ImmutableBytesWritable outKey = null; 248 249 if (rowKey.startsWith(PRESIDENT_PATTERN)) { 250 outKey = pKey; 251 } else if (rowKey.startsWith(ACTOR_PATTERN)) { 252 outKey = aKey; 253 } else { 254 throw new AssertionError("unexpected rowKey"); 255 } 256 257 String name = Bytes.toString(result.getValue(COLUMN_FAMILY, COLUMN_QUALIFIER)); 258 outCollector.collect(outKey, new Put(Bytes.toBytes("rowKey2")).addColumn(COLUMN_FAMILY, 259 COLUMN_QUALIFIER, Bytes.toBytes(name))); 260 } 261 } 262}