001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
044
045/**
046 * Multithreaded implementation for @link org.apache.hbase.mapreduce.TableMapper
047 * <p>
048 * It can be used instead when the Map operation is not CPU bound in order to improve throughput.
049 * <p>
050 * Mapper implementations using this MapRunnable must be thread-safe.
051 * <p>
052 * The Map-Reduce job has to be configured with the mapper to use via {@link #setMapperClass} and
053 * the number of thread the thread-pool can use with the {@link #getNumberOfThreads} method. The
054 * default value is 10 threads.
055 * <p>
056 */
057
058@InterfaceAudience.Private
059public class MultithreadedTableMapper<K2, V2> extends TableMapper<K2, V2> {
060  private static final Logger LOG = LoggerFactory.getLogger(MultithreadedTableMapper.class);
061  private Class<? extends Mapper<ImmutableBytesWritable, Result, K2, V2>> mapClass;
062  private Context outer;
063  private ExecutorService executor;
064  public static final String NUMBER_OF_THREADS = "hbase.mapreduce.multithreadedmapper.threads";
065  public static final String MAPPER_CLASS = "hbase.mapreduce.multithreadedmapper.mapclass";
066
067  /**
068   * The number of threads in the thread pool that will run the map function.
069   * @param job the job
070   * @return the number of threads
071   */
072  public static int getNumberOfThreads(JobContext job) {
073    return job.getConfiguration().getInt(NUMBER_OF_THREADS, 10);
074  }
075
076  /**
077   * Set the number of threads in the pool for running maps.
078   * @param job     the job to modify
079   * @param threads the new number of threads
080   */
081  public static void setNumberOfThreads(Job job, int threads) {
082    job.getConfiguration().setInt(NUMBER_OF_THREADS, threads);
083  }
084
085  /**
086   * Get the application's mapper class.
087   * @param <K2> the map's output key type
088   * @param <V2> the map's output value type
089   * @param job  the job
090   * @return the mapper class to run
091   */
092  @SuppressWarnings("unchecked")
093  public static <K2, V2> Class<Mapper<ImmutableBytesWritable, Result, K2, V2>>
094    getMapperClass(JobContext job) {
095    return (Class<Mapper<ImmutableBytesWritable, Result, K2, V2>>) job.getConfiguration()
096      .getClass(MAPPER_CLASS, Mapper.class);
097  }
098
099  /**
100   * Set the application's mapper class.
101   * @param <K2> the map output key type
102   * @param <V2> the map output value type
103   * @param job  the job to modify
104   * @param cls  the class to use as the mapper
105   */
106  public static <K2, V2> void setMapperClass(Job job,
107    Class<? extends Mapper<ImmutableBytesWritable, Result, K2, V2>> cls) {
108    if (MultithreadedTableMapper.class.isAssignableFrom(cls)) {
109      throw new IllegalArgumentException(
110        "Can't have recursive " + "MultithreadedTableMapper instances.");
111    }
112    job.getConfiguration().setClass(MAPPER_CLASS, cls, Mapper.class);
113  }
114
115  /**
116   * Run the application's maps using a thread pool.
117   */
118  @Override
119  public void run(Context context) throws IOException, InterruptedException {
120    outer = context;
121    int numberOfThreads = getNumberOfThreads(context);
122    mapClass = getMapperClass(context);
123    if (LOG.isDebugEnabled()) {
124      LOG.debug("Configuring multithread runner to use " + numberOfThreads + " threads");
125    }
126    executor = Executors.newFixedThreadPool(numberOfThreads);
127    for (int i = 0; i < numberOfThreads; ++i) {
128      MapRunner thread = new MapRunner(context);
129      executor.execute(thread);
130    }
131    executor.shutdown();
132    while (!executor.isTerminated()) {
133      // wait till all the threads are done
134      Thread.sleep(1000);
135    }
136  }
137
138  private class SubMapRecordReader extends RecordReader<ImmutableBytesWritable, Result> {
139    private ImmutableBytesWritable key;
140    private Result value;
141    private Configuration conf;
142
143    @Override
144    public void close() throws IOException {
145    }
146
147    @Override
148    public float getProgress() throws IOException, InterruptedException {
149      return 0;
150    }
151
152    @Override
153    public void initialize(InputSplit split, TaskAttemptContext context)
154      throws IOException, InterruptedException {
155      conf = context.getConfiguration();
156    }
157
158    @Override
159    public boolean nextKeyValue() throws IOException, InterruptedException {
160      synchronized (outer) {
161        if (!outer.nextKeyValue()) {
162          return false;
163        }
164        key = ReflectionUtils.copy(outer.getConfiguration(), outer.getCurrentKey(), key);
165        value = ReflectionUtils.copy(conf, outer.getCurrentValue(), value);
166        return true;
167      }
168    }
169
170    public ImmutableBytesWritable getCurrentKey() {
171      return key;
172    }
173
174    @Override
175    public Result getCurrentValue() {
176      return value;
177    }
178  }
179
180  private class SubMapRecordWriter extends RecordWriter<K2, V2> {
181
182    @Override
183    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
184    }
185
186    @Override
187    public void write(K2 key, V2 value) throws IOException, InterruptedException {
188      synchronized (outer) {
189        outer.write(key, value);
190      }
191    }
192  }
193
194  private class SubMapStatusReporter extends StatusReporter {
195
196    @Override
197    public Counter getCounter(Enum<?> name) {
198      return outer.getCounter(name);
199    }
200
201    @Override
202    public Counter getCounter(String group, String name) {
203      return outer.getCounter(group, name);
204    }
205
206    @Override
207    public void progress() {
208      outer.progress();
209    }
210
211    @Override
212    public void setStatus(String status) {
213      outer.setStatus(status);
214    }
215
216    public float getProgress() {
217      return 0;
218    }
219  }
220
221  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "REC_CATCH_EXCEPTION",
222      justification = "Don't understand why FB is complaining about this one."
223        + " We do throw exception")
224  private class MapRunner implements Runnable {
225    private Mapper<ImmutableBytesWritable, Result, K2, V2> mapper;
226    private Context subcontext;
227
228    @SuppressWarnings({ "rawtypes", "unchecked" })
229    MapRunner(Context context) throws IOException, InterruptedException {
230      mapper = ReflectionUtils.newInstance(mapClass, context.getConfiguration());
231      try {
232        Constructor c = context.getClass().getConstructor(Mapper.class, Configuration.class,
233          TaskAttemptID.class, RecordReader.class, RecordWriter.class, OutputCommitter.class,
234          StatusReporter.class, InputSplit.class);
235        c.setAccessible(true);
236        subcontext = (Context) c.newInstance(mapper, outer.getConfiguration(),
237          outer.getTaskAttemptID(), new SubMapRecordReader(), new SubMapRecordWriter(),
238          context.getOutputCommitter(), new SubMapStatusReporter(), outer.getInputSplit());
239      } catch (Exception e) {
240        try {
241          Constructor c = Class.forName("org.apache.hadoop.mapreduce.task.MapContextImpl")
242            .getConstructor(Configuration.class, TaskAttemptID.class, RecordReader.class,
243              RecordWriter.class, OutputCommitter.class, StatusReporter.class, InputSplit.class);
244          c.setAccessible(true);
245          MapContext mc = (MapContext) c.newInstance(outer.getConfiguration(),
246            outer.getTaskAttemptID(), new SubMapRecordReader(), new SubMapRecordWriter(),
247            context.getOutputCommitter(), new SubMapStatusReporter(), outer.getInputSplit());
248          Class<?> wrappedMapperClass =
249            Class.forName("org.apache.hadoop.mapreduce.lib.map.WrappedMapper");
250          Method getMapContext = wrappedMapperClass.getMethod("getMapContext", MapContext.class);
251          subcontext = (Context) getMapContext
252            .invoke(wrappedMapperClass.getDeclaredConstructor().newInstance(), mc);
253        } catch (Exception ee) { // FindBugs: REC_CATCH_EXCEPTION
254          // rethrow as IOE
255          throw new IOException(e);
256        }
257      }
258    }
259
260    @Override
261    public void run() {
262      try {
263        mapper.run(subcontext);
264      } catch (Throwable ie) {
265        LOG.error("Problem in running map.", ie);
266      }
267    }
268  }
269}