/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
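/**
 * Tests for the mapred (classic API) {@link TableMapReduceUtil}: capping the
 * number of map and reduce tasks at the number of regions of a table,
 * configuring scanner caching, and wiring up complete map/reduce jobs via
 * {@code initTableMapJob} and {@code initTableReduceJob} against a
 * mini-cluster.
 */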
@Category({ MapReduceTests.class, LargeTests.class })
public class TestTableMapReduceUtil {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableMapReduceUtil.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestTableMapReduceUtil.class);

  private static Table presidentsTable;
  private static final String TABLE_NAME = "People";

  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info");
  private static final byte[] COLUMN_QUALIFIER = Bytes.toBytes("name");

  private static final ImmutableSet<String> presidentsRowKeys =
    ImmutableSet.of("president1", "president2", "president3");
  private static final Iterator<String> presidentNames =
    ImmutableSet.of("John F. Kennedy", "George W. Bush", "Barack Obama").iterator();

  private static final ImmutableSet<String> actorsRowKeys = ImmutableSet.of("actor1", "actor2");
  private static final Iterator<String> actorNames =
    ImmutableSet.of("Jack Nicholson", "Martin Freeman").iterator();

  private static final String PRESIDENT_PATTERN = "president";
  private static final String ACTOR_PATTERN = "actor";
  private static final ImmutableMap<String, ImmutableSet<String>> relation =
    ImmutableMap.of(PRESIDENT_PATTERN, presidentsRowKeys, ACTOR_PATTERN, actorsRowKeys);

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  @BeforeClass
  public static void beforeClass() throws Exception {
    UTIL.startMiniCluster();
    presidentsTable = createAndFillTable(TableName.valueOf(TABLE_NAME));
  }

  @AfterClass
  public static void afterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void before() throws IOException {
    LOG.info("before");
    UTIL.ensureSomeRegionServersAvailable(1);
    LOG.info("before done");
  }

  public static Table createAndFillTable(TableName tableName) throws IOException {
    Table table = UTIL.createTable(tableName, COLUMN_FAMILY);
    createPutCommand(table);
    return table;
  }

  private static void createPutCommand(Table table) throws IOException {
    // The name iterators are consumed exactly once, when the table is first filled.
    for (String president : presidentsRowKeys) {
      if (presidentNames.hasNext()) {
        Put p = new Put(Bytes.toBytes(president));
        p.addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(presidentNames.next()));
        table.put(p);
      }
    }

    for (String actor : actorsRowKeys) {
      if (actorNames.hasNext()) {
        Put p = new Put(Bytes.toBytes(actor));
        p.addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(actorNames.next()));
        table.put(p);
      }
    }
  }

  /**
   * Check that the number of reduce tasks set for the given job configuration
   * does not exceed the number of regions of the given table.
   */
  @Test
  public void shouldNumberOfReduceTaskNotExceedNumberOfRegionsForGivenTable() throws IOException {
    Assert.assertNotNull(presidentsTable);
    Configuration cfg = UTIL.getConfiguration();
    JobConf jobConf = new JobConf(cfg);
    TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
    TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
    TableMapReduceUtil.setScannerCaching(jobConf, 100);
    assertEquals(1, jobConf.getNumReduceTasks());
    assertEquals(100, jobConf.getInt("hbase.client.scanner.caching", 0));

    // Raise the count past the region count, then verify it is capped again.
    jobConf.setNumReduceTasks(10);
    TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
    assertEquals(1, jobConf.getNumReduceTasks());
  }

  @Test
  public void shouldNumberOfMapTaskNotExceedNumberOfRegionsForGivenTable() throws IOException {
    Configuration cfg = UTIL.getConfiguration();
    JobConf jobConf = new JobConf(cfg);
    TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
    TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
    assertEquals(1, jobConf.getNumMapTasks());

    // Raise the count past the region count, then verify it is capped again.
    jobConf.setNumMapTasks(10);
    TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
    assertEquals(1, jobConf.getNumMapTasks());
  }
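  /**
   * Runs a complete map/reduce job (mapper plus reducer, wired up through
   * {@code initTableMapJob} and {@code initTableReduceJob}) against the
   * mini-cluster and verifies that it finishes successfully.
   */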
  @Test
  @SuppressWarnings("deprecation")
  public void shouldBeValidMapReduceEvaluation() throws Exception {
    Configuration cfg = UTIL.getConfiguration();
    JobConf jobConf = new JobConf(cfg);
    try {
      jobConf.setJobName("process row task");
      jobConf.setNumReduceTasks(1);
      TableMapReduceUtil.initTableMapJob(TABLE_NAME, Bytes.toString(COLUMN_FAMILY),
        ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class, jobConf);
      TableMapReduceUtil.initTableReduceJob(TABLE_NAME, ClassificatorRowReduce.class, jobConf);
      RunningJob job = JobClient.runJob(jobConf);
      assertTrue(job.isSuccessful());
    } finally {
      FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
    }
  }

  /**
   * Same job as above, but with two reduce tasks routed through
   * {@link HRegionPartitioner}.
   */
  @Test
  @SuppressWarnings("deprecation")
  public void shouldBeValidMapReduceWithPartitionerEvaluation() throws IOException {
    Configuration cfg = UTIL.getConfiguration();
    JobConf jobConf = new JobConf(cfg);
    try {
      jobConf.setJobName("process row task");
      jobConf.setNumReduceTasks(2);
      TableMapReduceUtil.initTableMapJob(TABLE_NAME, Bytes.toString(COLUMN_FAMILY),
        ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class, jobConf);
      TableMapReduceUtil.initTableReduceJob(TABLE_NAME, ClassificatorRowReduce.class, jobConf,
        HRegionPartitioner.class);
      RunningJob job = JobClient.runJob(jobConf);
      assertTrue(job.isSuccessful());
    } finally {
      FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
    }
  }
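  /**
   * Reducer that, for each pattern key ("president" or "actor"), collects the
   * Puts emitted by the mapper and asserts that their count matches the size
   * of the corresponding row-key set in {@code relation}.
   */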
  @SuppressWarnings("deprecation")
  static class ClassificatorRowReduce extends MapReduceBase
    implements TableReduce<ImmutableBytesWritable, Put> {

    @Override
    public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
      OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter) throws IOException {
      String strKey = Bytes.toString(key.get());
      List<Put> result = new ArrayList<>();
      while (values.hasNext()) {
        result.add(values.next());
      }

      if (relation.containsKey(strKey)) {
        Set<String> set = relation.get(strKey);
        if (set != null) {
          assertEquals(set.size(), result.size());
        } else {
          throwAssertionError("Test infrastructure error: set is null");
        }
      } else {
        throwAssertionError("Test infrastructure error: key not found in map");
      }
    }

    private void throwAssertionError(String errorMessage) throws AssertionError {
      throw new AssertionError(errorMessage);
    }
  }

  /**
   * Mapper that keys each row by the pattern its row key starts with
   * ("president" or "actor") and emits a Put carrying the stored name.
   */
  @SuppressWarnings("deprecation")
  static class ClassificatorMapper extends MapReduceBase
    implements TableMap<ImmutableBytesWritable, Put> {

    @Override
    public void map(ImmutableBytesWritable row, Result result,
      OutputCollector<ImmutableBytesWritable, Put> outCollector, Reporter reporter)
      throws IOException {
      String rowKey = Bytes.toString(result.getRow());
      final ImmutableBytesWritable pKey =
        new ImmutableBytesWritable(Bytes.toBytes(PRESIDENT_PATTERN));
      final ImmutableBytesWritable aKey = new ImmutableBytesWritable(Bytes.toBytes(ACTOR_PATTERN));
      ImmutableBytesWritable outKey = null;

      if (rowKey.startsWith(PRESIDENT_PATTERN)) {
        outKey = pKey;
      } else if (rowKey.startsWith(ACTOR_PATTERN)) {
        outKey = aKey;
      } else {
        throw new AssertionError("unexpected rowKey");
      }

      String name = Bytes.toString(result.getValue(COLUMN_FAMILY, COLUMN_QUALIFIER));
      outCollector.collect(outKey,
        new Put(Bytes.toBytes("rowKey2")).addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER,
          Bytes.toBytes(name)));
    }
  }
}