/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;

/**
 * Tests for the mapred {@code TableMapReduceUtil} helpers: clamping map/reduce task counts to
 * the number of regions of a table, setting scanner caching, and running full map/reduce jobs
 * (with and without an explicit {@code HRegionPartitioner}) against a mini cluster.
 */
@Tag(MapReduceTests.TAG)
@Tag(LargeTests.TAG)
public class TestTableMapReduceUtil {

  private static final Logger LOG = LoggerFactory.getLogger(TestTableMapReduceUtil.class);

  private static Table presidentsTable;
  private static final String TABLE_NAME = "People";

  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info");
  private static final byte[] COLUMN_QUALIFIER = Bytes.toBytes("name");

  // Row keys and matching values for the fixture table. The two Iterators are consumed exactly
  // once, by createPutCommand() during @BeforeAll; Guava's ImmutableSet preserves insertion
  // order, so each row key is deterministically paired with the name at the same position.
  private static ImmutableSet<String> presidentsRowKeys =
    ImmutableSet.of("president1", "president2", "president3");
  private static Iterator<String> presidentNames =
    ImmutableSet.of("John F. Kennedy", "George W. Bush", "Barack Obama").iterator();

  private static ImmutableSet<String> actorsRowKeys = ImmutableSet.of("actor1", "actor2");
  private static Iterator<String> actorNames =
    ImmutableSet.of("Jack Nicholson", "Martin Freeman").iterator();

  // Row-key prefixes the mapper classifies on, and the expected prefix -> row-key-set relation
  // the reducer verifies against.
  private static String PRESIDENT_PATTERN = "president";
  private static String ACTOR_PATTERN = "actor";
  private static ImmutableMap<String, ImmutableSet<String>> relation =
    ImmutableMap.of(PRESIDENT_PATTERN, presidentsRowKeys, ACTOR_PATTERN, actorsRowKeys);

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  @BeforeAll
  public static void beforeClass() throws Exception {
    UTIL.startMiniCluster();
    presidentsTable = createAndFillTable(TableName.valueOf(TABLE_NAME));
  }

  @AfterAll
  public static void afterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @BeforeEach
  public void before() throws IOException {
    LOG.info("before");
    UTIL.ensureSomeRegionServersAvailable(1);
    LOG.info("before done");
  }

  /**
   * Creates the test table with the single {@code info} column family and populates it with the
   * president and actor fixture rows.
   * @param tableName name of the table to create
   * @return the newly created, populated table
   * @throws IOException if table creation or the puts fail
   */
  public static Table createAndFillTable(TableName tableName) throws IOException {
    Table table = UTIL.createTable(tableName, COLUMN_FAMILY);
    createPutCommand(table);
    return table;
  }

  /** Writes one row per fixture key, pairing each key with the next name from its iterator. */
  private static void createPutCommand(Table table) throws IOException {
    for (String president : presidentsRowKeys) {
      if (presidentNames.hasNext()) {
        Put p = new Put(Bytes.toBytes(president));
        p.addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(presidentNames.next()));
        table.put(p);
      }
    }

    for (String actor : actorsRowKeys) {
      if (actorNames.hasNext()) {
        Put p = new Put(Bytes.toBytes(actor));
        p.addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(actorNames.next()));
        table.put(p);
      }
    }
  }

  /**
   * Check that the given number of reduce tasks for the given job configuration does not exceed
   * the number of regions for the given table.
   */
  @Test
  public void shouldNumberOfReduceTaskNotExceedNumberOfRegionsForGivenTable() throws IOException {
    assertNotNull(presidentsTable);
    Configuration cfg = UTIL.getConfiguration();
    JobConf jobConf = new JobConf(cfg);
    TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
    TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
    TableMapReduceUtil.setScannerCaching(jobConf, 100);
    // The fixture table has a single region, so both helpers should settle on 1.
    assertEquals(1, jobConf.getNumReduceTasks());
    assertEquals(100, jobConf.getInt("hbase.client.scanner.caching", 0));

    jobConf.setNumReduceTasks(10);
    // NOTE(review): this calls setNumMapTasks inside a reduce-task test; presumably
    // setNumReduceTasks was intended — confirm against TableMapReduceUtil's contract before
    // changing, since limitNumReduceTasks below is what the assertion actually exercises.
    TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
    TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
    assertEquals(1, jobConf.getNumReduceTasks());
  }

  /**
   * Check that the number of map tasks is clamped to the number of regions for the given table.
   */
  @Test
  public void shouldNumberOfMapTaskNotExceedNumberOfRegionsForGivenTable() throws IOException {
    Configuration cfg = UTIL.getConfiguration();
    JobConf jobConf = new JobConf(cfg);
    TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
    TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
    assertEquals(1, jobConf.getNumMapTasks());

    jobConf.setNumMapTasks(10);
    TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
    TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
    assertEquals(1, jobConf.getNumMapTasks());
  }

  /** Runs a full map/reduce job over the fixture table and asserts it succeeds. */
  @Test
  @SuppressWarnings("deprecation")
  public void shoudBeValidMapReduceEvaluation() throws Exception {
    Configuration cfg = UTIL.getConfiguration();
    JobConf jobConf = new JobConf(cfg);
    try {
      jobConf.setJobName("process row task");
      jobConf.setNumReduceTasks(1);
      TableMapReduceUtil.initTableMapJob(TABLE_NAME, Bytes.toString(COLUMN_FAMILY),
        ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class, jobConf);
      TableMapReduceUtil.initTableReduceJob(TABLE_NAME, ClassificatorRowReduce.class, jobConf);
      RunningJob job = JobClient.runJob(jobConf);
      assertTrue(job.isSuccessful());
    } finally {
      // Clean up the local job scratch space so repeated runs start fresh.
      if (jobConf != null) {
        FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
      }
    }
  }

  /** Same as above, but wires in HRegionPartitioner and two reducers. */
  @Test
  @SuppressWarnings("deprecation")
  public void shoudBeValidMapReduceWithPartitionerEvaluation() throws IOException {
    Configuration cfg = UTIL.getConfiguration();
    JobConf jobConf = new JobConf(cfg);
    try {
      jobConf.setJobName("process row task");
      jobConf.setNumReduceTasks(2);
      TableMapReduceUtil.initTableMapJob(TABLE_NAME, Bytes.toString(COLUMN_FAMILY),
        ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class, jobConf);

      TableMapReduceUtil.initTableReduceJob(TABLE_NAME, ClassificatorRowReduce.class, jobConf,
        HRegionPartitioner.class);
      RunningJob job = JobClient.runJob(jobConf);
      assertTrue(job.isSuccessful());
    } finally {
      if (jobConf != null) {
        FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
      }
    }
  }

  /**
   * Reducer that checks each classification key ("president"/"actor") received exactly as many
   * values as there are fixture rows with that prefix.
   */
  @SuppressWarnings("deprecation")
  static class ClassificatorRowReduce extends MapReduceBase
    implements TableReduce<ImmutableBytesWritable, Put> {

    @Override
    public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
      OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter) throws IOException {
      String strKey = Bytes.toString(key.get());
      List<Put> result = new ArrayList<>();
      while (values.hasNext()) {
        result.add(values.next());
      }

      if (relation.containsKey(strKey)) {
        Set<String> set = relation.get(strKey);
        if (set != null) {
          assertEquals(set.size(), result.size());
        } else {
          throwAssertionError("Test infrastructure error: set is null");
        }
      } else {
        throwAssertionError("Test infrastructure error: key not found in map");
      }
    }

    // AssertionError thrown directly from reduce() would be masked by the MR framework's
    // exception handling in confusing ways; funnel through one helper for a single throw site.
    private void throwAssertionError(String errorMessage) throws AssertionError {
      throw new AssertionError(errorMessage);
    }
  }

  /**
   * Mapper that classifies each row by its key prefix and emits one Put per row under the
   * matching classification key.
   */
  @SuppressWarnings("deprecation")
  static class ClassificatorMapper extends MapReduceBase
    implements TableMap<ImmutableBytesWritable, Put> {

    @Override
    public void map(ImmutableBytesWritable row, Result result,
      OutputCollector<ImmutableBytesWritable, Put> outCollector, Reporter reporter)
      throws IOException {
      String rowKey = Bytes.toString(result.getRow());
      final ImmutableBytesWritable pKey =
        new ImmutableBytesWritable(Bytes.toBytes(PRESIDENT_PATTERN));
      final ImmutableBytesWritable aKey = new ImmutableBytesWritable(Bytes.toBytes(ACTOR_PATTERN));
      ImmutableBytesWritable outKey = null;

      if (rowKey.startsWith(PRESIDENT_PATTERN)) {
        outKey = pKey;
      } else if (rowKey.startsWith(ACTOR_PATTERN)) {
        outKey = aKey;
      } else {
        throw new AssertionError("unexpected rowKey");
      }

      String name = Bytes.toString(result.getValue(COLUMN_FAMILY, COLUMN_QUALIFIER));
      outCollector.collect(outKey, new Put(Bytes.toBytes("rowKey2")).addColumn(COLUMN_FAMILY,
        COLUMN_QUALIFIER, Bytes.toBytes(name)));
    }
  }
}