/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Multithreaded implementation of {@link TableMapper}.
 * <p>
 * It can be used instead of the default single-threaded implementation when the map
 * operation is not CPU bound, in order to improve throughput.
 * <p>
 * Mapper implementations used with this class must be thread-safe.
 * <p>
 * The Map-Reduce job has to be configured with the mapper to use via
 * {@link #setMapperClass} and the number of threads the thread pool can use with the
 * {@link #setNumberOfThreads} method. The default value is 10 threads.
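 * <p>
 * A minimal configuration sketch ({@code MyMapper} below is a hypothetical
 * thread-safe {@link TableMapper} subclass and {@code conf} an existing
 * {@code Configuration}; both are shown for illustration only):
 * <pre>
 * Job job = Job.getInstance(conf);
 * Scan scan = new Scan();
 * TableMapReduceUtil.initTableMapperJob("testtable", scan,
 *   MultithreadedTableMapper.class, Text.class, Text.class, job);
 * MultithreadedTableMapper.setMapperClass(job, MyMapper.class);
 * MultithreadedTableMapper.setNumberOfThreads(job, 16);
 * </pre>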
 */
@InterfaceAudience.Private
public class MultithreadedTableMapper<K2, V2> extends TableMapper<K2, V2> {
  private static final Logger LOG = LoggerFactory.getLogger(MultithreadedTableMapper.class);
  private Class<? extends Mapper<ImmutableBytesWritable, Result, K2, V2>> mapClass;
  private Context outer;
  private ExecutorService executor;
  public static final String NUMBER_OF_THREADS = "hbase.mapreduce.multithreadedmapper.threads";
  public static final String MAPPER_CLASS = "hbase.mapreduce.multithreadedmapper.mapclass";

  /**
   * The number of threads in the thread pool that will run the map function.
   * @param job the job
   * @return the number of threads
   */
  public static int getNumberOfThreads(JobContext job) {
    return job.getConfiguration().getInt(NUMBER_OF_THREADS, 10);
  }

  /**
   * Set the number of threads in the pool for running maps.
   * @param job the job to modify
   * @param threads the new number of threads
   */
  public static void setNumberOfThreads(Job job, int threads) {
    job.getConfiguration().setInt(NUMBER_OF_THREADS, threads);
  }

  /**
   * Get the application's mapper class.
   * @param <K2> the map's output key type
   * @param <V2> the map's output value type
   * @param job the job
   * @return the mapper class to run
   */
  @SuppressWarnings("unchecked")
  public static <K2, V2>
      Class<Mapper<ImmutableBytesWritable, Result, K2, V2>> getMapperClass(JobContext job) {
    return (Class<Mapper<ImmutableBytesWritable, Result, K2, V2>>)
        job.getConfiguration().getClass(MAPPER_CLASS, Mapper.class);
  }

  /**
   * Set the application's mapper class.
   * @param <K2> the map output key type
   * @param <V2> the map output value type
   * @param job the job to modify
   * @param cls the class to use as the mapper
   */
  public static <K2, V2> void setMapperClass(Job job,
      Class<? extends Mapper<ImmutableBytesWritable, Result, K2, V2>> cls) {
    if (MultithreadedTableMapper.class.isAssignableFrom(cls)) {
      throw new IllegalArgumentException("Can't have recursive "
          + "MultithreadedTableMapper instances.");
    }
    job.getConfiguration().setClass(MAPPER_CLASS, cls, Mapper.class);
  }

  /**
   * Run the application's maps using a thread pool.
   */
  @Override
  public void run(Context context) throws IOException, InterruptedException {
    outer = context;
    int numberOfThreads = getNumberOfThreads(context);
    mapClass = getMapperClass(context);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Configuring multithread runner to use " + numberOfThreads + " threads");
    }
    executor = Executors.newFixedThreadPool(numberOfThreads);
    for (int i = 0; i < numberOfThreads; ++i) {
      MapRunner thread = new MapRunner(context);
      executor.execute(thread);
    }
    executor.shutdown();
    while (!executor.isTerminated()) {
      // wait till all the threads are done
      Thread.sleep(1000);
    }
  }

  private class SubMapRecordReader extends RecordReader<ImmutableBytesWritable, Result> {
    private ImmutableBytesWritable key;
    private Result value;
    private Configuration conf;

    @Override
    public void close() throws IOException {
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      return 0;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
      conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      synchronized (outer) {
        if (!outer.nextKeyValue()) {
          return false;
        }
        // Deep-copy the key/value: the underlying reader reuses its objects,
        // so holding a bare reference would see it clobbered by the next read.
        key = ReflectionUtils.copy(outer.getConfiguration(), outer.getCurrentKey(), key);
        value = ReflectionUtils.copy(conf, outer.getCurrentValue(), value);
        return true;
      }
    }

    @Override
    public ImmutableBytesWritable getCurrentKey() {
      return key;
    }

    @Override
    public Result getCurrentValue() {
      return value;
    }
  }

  private class SubMapRecordWriter extends RecordWriter<K2, V2> {

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
    }

    @Override
    public void write(K2 key, V2 value) throws IOException, InterruptedException {
      synchronized (outer) {
        outer.write(key, value);
      }
    }
  }

  private class SubMapStatusReporter extends StatusReporter {

    @Override
    public Counter getCounter(Enum<?> name) {
      return outer.getCounter(name);
    }

    @Override
    public Counter getCounter(String group, String name) {
      return outer.getCounter(group, name);
    }

    @Override
    public void progress() {
      outer.progress();
    }

    @Override
    public void setStatus(String status) {
      outer.setStatus(status);
    }

    public float getProgress() {
      return 0;
    }
  }

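  /*
   * Implementation note: MapRunner builds a private Context for each thread
   * reflectively, because the shape of Mapper.Context differs across Hadoop
   * versions. On Hadoop 1.x, Mapper.Context is an inner class whose reflective
   * constructor takes the enclosing Mapper as its first argument; on Hadoop
   * 2.x, a MapContextImpl is built and then wrapped through
   * WrappedMapper.getMapContext. The constructor below tries the first shape
   * and falls back to the second, so each thread shares only the synchronized
   * reader/writer defined above.
   */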
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "REC_CATCH_EXCEPTION",
      justification = "Don't understand why FB is complaining about this one."
          + " We do throw exception")
  private class MapRunner implements Runnable {
    private Mapper<ImmutableBytesWritable, Result, K2, V2> mapper;
    private Context subcontext;

    @SuppressWarnings({ "rawtypes", "unchecked" })
    MapRunner(Context context) throws IOException, InterruptedException {
      mapper = ReflectionUtils.newInstance(mapClass, context.getConfiguration());
      try {
        Constructor c = context.getClass().getConstructor(
          Mapper.class,
          Configuration.class,
          TaskAttemptID.class,
          RecordReader.class,
          RecordWriter.class,
          OutputCommitter.class,
          StatusReporter.class,
          InputSplit.class);
        c.setAccessible(true);
        subcontext = (Context) c.newInstance(
          mapper,
          outer.getConfiguration(),
          outer.getTaskAttemptID(),
          new SubMapRecordReader(),
          new SubMapRecordWriter(),
          context.getOutputCommitter(),
          new SubMapStatusReporter(),
          outer.getInputSplit());
      } catch (Exception e) {
        try {
          Constructor c = Class.forName("org.apache.hadoop.mapreduce.task.MapContextImpl")
              .getConstructor(
                Configuration.class,
                TaskAttemptID.class,
                RecordReader.class,
                RecordWriter.class,
                OutputCommitter.class,
                StatusReporter.class,
                InputSplit.class);
          c.setAccessible(true);
          MapContext mc = (MapContext) c.newInstance(
            outer.getConfiguration(),
            outer.getTaskAttemptID(),
            new SubMapRecordReader(),
            new SubMapRecordWriter(),
            context.getOutputCommitter(),
            new SubMapStatusReporter(),
            outer.getInputSplit());
          Class<?> wrappedMapperClass =
              Class.forName("org.apache.hadoop.mapreduce.lib.map.WrappedMapper");
          Method getMapContext = wrappedMapperClass.getMethod("getMapContext", MapContext.class);
          subcontext = (Context) getMapContext.invoke(
              wrappedMapperClass.getDeclaredConstructor().newInstance(), mc);
        } catch (Exception ee) { // FindBugs: REC_CATCH_EXCEPTION
          // rethrow the original failure as IOE
          throw new IOException(e);
        }
      }
    }

    @Override
    public void run() {
      try {
        mapper.run(subcontext);
      } catch (Throwable ie) {
        LOG.error("Problem in running map.", ie);
      }
    }
  }
}