001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import java.io.IOException; 021import java.lang.reflect.Constructor; 022import java.lang.reflect.Method; 023import java.util.concurrent.ExecutorService; 024import java.util.concurrent.Executors; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.client.Result; 027import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 028import org.apache.hadoop.mapreduce.Counter; 029import org.apache.hadoop.mapreduce.InputSplit; 030import org.apache.hadoop.mapreduce.Job; 031import org.apache.hadoop.mapreduce.JobContext; 032import org.apache.hadoop.mapreduce.MapContext; 033import org.apache.hadoop.mapreduce.Mapper; 034import org.apache.hadoop.mapreduce.OutputCommitter; 035import org.apache.hadoop.mapreduce.RecordReader; 036import org.apache.hadoop.mapreduce.RecordWriter; 037import org.apache.hadoop.mapreduce.StatusReporter; 038import org.apache.hadoop.mapreduce.TaskAttemptContext; 039import org.apache.hadoop.mapreduce.TaskAttemptID; 040import org.apache.hadoop.util.ReflectionUtils; 041import org.apache.yetus.audience.InterfaceAudience; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045/** 046 * Multithreaded implementation for @link org.apache.hbase.mapreduce.TableMapper 047 * <p> 048 * It can be used instead when the Map operation is not CPU bound in order to improve throughput. 049 * <p> 050 * Mapper implementations using this MapRunnable must be thread-safe. 051 * <p> 052 * The Map-Reduce job has to be configured with the mapper to use via {@link #setMapperClass} and 053 * the number of thread the thread-pool can use with the {@link #getNumberOfThreads} method. The 054 * default value is 10 threads. 055 * <p> 056 */ 057 058@InterfaceAudience.Private 059public class MultithreadedTableMapper<K2, V2> extends TableMapper<K2, V2> { 060 private static final Logger LOG = LoggerFactory.getLogger(MultithreadedTableMapper.class); 061 private Class<? extends Mapper<ImmutableBytesWritable, Result, K2, V2>> mapClass; 062 private Context outer; 063 private ExecutorService executor; 064 public static final String NUMBER_OF_THREADS = "hbase.mapreduce.multithreadedmapper.threads"; 065 public static final String MAPPER_CLASS = "hbase.mapreduce.multithreadedmapper.mapclass"; 066 067 /** 068 * The number of threads in the thread pool that will run the map function. 069 * @param job the job 070 * @return the number of threads 071 */ 072 public static int getNumberOfThreads(JobContext job) { 073 return job.getConfiguration().getInt(NUMBER_OF_THREADS, 10); 074 } 075 076 /** 077 * Set the number of threads in the pool for running maps. 078 * @param job the job to modify 079 * @param threads the new number of threads 080 */ 081 public static void setNumberOfThreads(Job job, int threads) { 082 job.getConfiguration().setInt(NUMBER_OF_THREADS, threads); 083 } 084 085 /** 086 * Get the application's mapper class. 087 * @param <K2> the map's output key type 088 * @param <V2> the map's output value type 089 * @param job the job 090 * @return the mapper class to run 091 */ 092 @SuppressWarnings("unchecked") 093 public static <K2, V2> Class<Mapper<ImmutableBytesWritable, Result, K2, V2>> 094 getMapperClass(JobContext job) { 095 return (Class<Mapper<ImmutableBytesWritable, Result, K2, V2>>) job.getConfiguration() 096 .getClass(MAPPER_CLASS, Mapper.class); 097 } 098 099 /** 100 * Set the application's mapper class. 101 * @param <K2> the map output key type 102 * @param <V2> the map output value type 103 * @param job the job to modify 104 * @param cls the class to use as the mapper 105 */ 106 public static <K2, V2> void setMapperClass(Job job, 107 Class<? extends Mapper<ImmutableBytesWritable, Result, K2, V2>> cls) { 108 if (MultithreadedTableMapper.class.isAssignableFrom(cls)) { 109 throw new IllegalArgumentException( 110 "Can't have recursive " + "MultithreadedTableMapper instances."); 111 } 112 job.getConfiguration().setClass(MAPPER_CLASS, cls, Mapper.class); 113 } 114 115 /** 116 * Run the application's maps using a thread pool. 117 */ 118 @Override 119 public void run(Context context) throws IOException, InterruptedException { 120 outer = context; 121 int numberOfThreads = getNumberOfThreads(context); 122 mapClass = getMapperClass(context); 123 if (LOG.isDebugEnabled()) { 124 LOG.debug("Configuring multithread runner to use " + numberOfThreads + " threads"); 125 } 126 executor = Executors.newFixedThreadPool(numberOfThreads); 127 for (int i = 0; i < numberOfThreads; ++i) { 128 MapRunner thread = new MapRunner(context); 129 executor.execute(thread); 130 } 131 executor.shutdown(); 132 while (!executor.isTerminated()) { 133 // wait till all the threads are done 134 Thread.sleep(1000); 135 } 136 } 137 138 private class SubMapRecordReader extends RecordReader<ImmutableBytesWritable, Result> { 139 private ImmutableBytesWritable key; 140 private Result value; 141 private Configuration conf; 142 143 @Override 144 public void close() throws IOException { 145 } 146 147 @Override 148 public float getProgress() throws IOException, InterruptedException { 149 return 0; 150 } 151 152 @Override 153 public void initialize(InputSplit split, TaskAttemptContext context) 154 throws IOException, InterruptedException { 155 conf = context.getConfiguration(); 156 } 157 158 @Override 159 public boolean nextKeyValue() throws IOException, InterruptedException { 160 synchronized (outer) { 161 if (!outer.nextKeyValue()) { 162 return false; 163 } 164 key = ReflectionUtils.copy(outer.getConfiguration(), outer.getCurrentKey(), key); 165 value = ReflectionUtils.copy(conf, outer.getCurrentValue(), value); 166 return true; 167 } 168 } 169 170 public ImmutableBytesWritable getCurrentKey() { 171 return key; 172 } 173 174 @Override 175 public Result getCurrentValue() { 176 return value; 177 } 178 } 179 180 private class SubMapRecordWriter extends RecordWriter<K2, V2> { 181 182 @Override 183 public void close(TaskAttemptContext context) throws IOException, InterruptedException { 184 } 185 186 @Override 187 public void write(K2 key, V2 value) throws IOException, InterruptedException { 188 synchronized (outer) { 189 outer.write(key, value); 190 } 191 } 192 } 193 194 private class SubMapStatusReporter extends StatusReporter { 195 196 @Override 197 public Counter getCounter(Enum<?> name) { 198 return outer.getCounter(name); 199 } 200 201 @Override 202 public Counter getCounter(String group, String name) { 203 return outer.getCounter(group, name); 204 } 205 206 @Override 207 public void progress() { 208 outer.progress(); 209 } 210 211 @Override 212 public void setStatus(String status) { 213 outer.setStatus(status); 214 } 215 216 public float getProgress() { 217 return 0; 218 } 219 } 220 221 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "REC_CATCH_EXCEPTION", 222 justification = "Don't understand why FB is complaining about this one." 223 + " We do throw exception") 224 private class MapRunner implements Runnable { 225 private Mapper<ImmutableBytesWritable, Result, K2, V2> mapper; 226 private Context subcontext; 227 228 @SuppressWarnings({ "rawtypes", "unchecked" }) 229 MapRunner(Context context) throws IOException, InterruptedException { 230 mapper = ReflectionUtils.newInstance(mapClass, context.getConfiguration()); 231 try { 232 Constructor c = context.getClass().getConstructor(Mapper.class, Configuration.class, 233 TaskAttemptID.class, RecordReader.class, RecordWriter.class, OutputCommitter.class, 234 StatusReporter.class, InputSplit.class); 235 c.setAccessible(true); 236 subcontext = (Context) c.newInstance(mapper, outer.getConfiguration(), 237 outer.getTaskAttemptID(), new SubMapRecordReader(), new SubMapRecordWriter(), 238 context.getOutputCommitter(), new SubMapStatusReporter(), outer.getInputSplit()); 239 } catch (Exception e) { 240 try { 241 Constructor c = Class.forName("org.apache.hadoop.mapreduce.task.MapContextImpl") 242 .getConstructor(Configuration.class, TaskAttemptID.class, RecordReader.class, 243 RecordWriter.class, OutputCommitter.class, StatusReporter.class, InputSplit.class); 244 c.setAccessible(true); 245 MapContext mc = (MapContext) c.newInstance(outer.getConfiguration(), 246 outer.getTaskAttemptID(), new SubMapRecordReader(), new SubMapRecordWriter(), 247 context.getOutputCommitter(), new SubMapStatusReporter(), outer.getInputSplit()); 248 Class<?> wrappedMapperClass = 249 Class.forName("org.apache.hadoop.mapreduce.lib.map.WrappedMapper"); 250 Method getMapContext = wrappedMapperClass.getMethod("getMapContext", MapContext.class); 251 subcontext = (Context) getMapContext 252 .invoke(wrappedMapperClass.getDeclaredConstructor().newInstance(), mc); 253 } catch (Exception ee) { // FindBugs: REC_CATCH_EXCEPTION 254 // rethrow as IOE 255 throw new IOException(e); 256 } 257 } 258 } 259 260 @Override 261 public void run() { 262 try { 263 mapper.run(subcontext); 264 } catch (Throwable ie) { 265 LOG.error("Problem in running map.", ie); 266 } 267 } 268 } 269}