/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Multithreaded implementation for
 * {@link org.apache.hadoop.hbase.mapreduce.TableMapper}
 * <p>
 * It can be used instead of the normal TableMapper when the Map operation is not CPU bound,
 * in order to improve throughput.
 * <p>
 * Mapper implementations using this MapRunnable must be thread-safe.
 * <p>
 * The Map-Reduce job has to be configured with the mapper to use via {@link #setMapperClass}
 * and the number of threads the thread-pool can use with the {@link #setNumberOfThreads}
 * method. The default value is 10 threads.
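 * <p>
 * A minimal setup sketch (here {@code MyMapper} is a hypothetical thread-safe
 * {@code TableMapper} emitting {@code Text}/{@code Text} pairs, and the table
 * name is illustrative):
 * <pre>{@code
 * Job job = Job.getInstance(conf);
 * TableMapReduceUtil.initTableMapperJob("myTable", new Scan(),
 *     MultithreadedTableMapper.class, Text.class, Text.class, job);
 * MultithreadedTableMapper.setMapperClass(job, MyMapper.class);
 * MultithreadedTableMapper.setNumberOfThreads(job, 16);
 * }</pre>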
 */

@InterfaceAudience.Private
public class MultithreadedTableMapper<K2, V2> extends TableMapper<K2, V2> {
  private static final Logger LOG = LoggerFactory.getLogger(MultithreadedTableMapper.class);
  private Class<? extends Mapper<ImmutableBytesWritable, Result, K2, V2>> mapClass;
  private Context outer;
  private ExecutorService executor;
  public static final String NUMBER_OF_THREADS = "hbase.mapreduce.multithreadedmapper.threads";
  public static final String MAPPER_CLASS = "hbase.mapreduce.multithreadedmapper.mapclass";

  /**
   * Get the number of threads in the thread pool that will run the map function.
   * @param job the job
   * @return the number of threads
   */
  public static int getNumberOfThreads(JobContext job) {
    return job.getConfiguration().getInt(NUMBER_OF_THREADS, 10);
  }

  /**
   * Set the number of threads in the pool for running maps.
   * @param job the job to modify
   * @param threads the new number of threads
   */
  public static void setNumberOfThreads(Job job, int threads) {
    job.getConfiguration().setInt(NUMBER_OF_THREADS, threads);
  }

  /**
   * Get the application's mapper class.
   * @param <K2> the map's output key type
   * @param <V2> the map's output value type
   * @param job the job
   * @return the mapper class to run
   */
  @SuppressWarnings("unchecked")
  public static <K2, V2> Class<Mapper<ImmutableBytesWritable, Result, K2, V2>>
      getMapperClass(JobContext job) {
    return (Class<Mapper<ImmutableBytesWritable, Result, K2, V2>>)
        job.getConfiguration().getClass(MAPPER_CLASS, Mapper.class);
  }

  /**
   * Set the application's mapper class.
   * @param <K2> the map output key type
   * @param <V2> the map output value type
   * @param job the job to modify
   * @param cls the class to use as the mapper
   */
  public static <K2, V2> void setMapperClass(Job job,
      Class<? extends Mapper<ImmutableBytesWritable, Result, K2, V2>> cls) {
    if (MultithreadedTableMapper.class.isAssignableFrom(cls)) {
      throw new IllegalArgumentException("Can't have recursive " +
          "MultithreadedTableMapper instances.");
    }
    job.getConfiguration().setClass(MAPPER_CLASS, cls, Mapper.class);
  }

  /**
   * Run the application's maps using a thread pool.
   */
  @Override
  public void run(Context context) throws IOException, InterruptedException {
    outer = context;
    int numberOfThreads = getNumberOfThreads(context);
    mapClass = getMapperClass(context);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Configuring multithread runner to use {} threads", numberOfThreads);
    }
    executor = Executors.newFixedThreadPool(numberOfThreads);
    for (int i = 0; i < numberOfThreads; ++i) {
      MapRunner thread = new MapRunner(context);
      executor.execute(thread);
    }
    executor.shutdown();
    while (!executor.isTerminated()) {
      // wait till all the threads are done
      Thread.sleep(1000);
    }
  }

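  /**
   * Per-thread record reader that proxies the shared outer context. Reads are
   * synchronized on the outer context, and the current key/value are copied so
   * that each worker thread owns its own instances.
   */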
  private class SubMapRecordReader extends RecordReader<ImmutableBytesWritable, Result> {
    private ImmutableBytesWritable key;
    private Result value;
    private Configuration conf;

    @Override
    public void close() throws IOException {
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      return 0;
    }

    @Override
    public void initialize(InputSplit split,
        TaskAttemptContext context) throws IOException, InterruptedException {
      conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      synchronized (outer) {
        if (!outer.nextKeyValue()) {
          return false;
        }
        // Copy the current key/value out of the shared context so this thread
        // keeps its own instances.
        key = ReflectionUtils.copy(outer.getConfiguration(),
            outer.getCurrentKey(), key);
        value = ReflectionUtils.copy(conf, outer.getCurrentValue(), value);
        return true;
      }
    }

    @Override
    public ImmutableBytesWritable getCurrentKey() {
      return key;
    }

    @Override
    public Result getCurrentValue() {
      return value;
    }
  }
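  /**
   * Per-thread record writer that funnels writes from worker threads into the
   * shared outer context, synchronizing on it so concurrent writes do not
   * interleave.
   */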
  private class SubMapRecordWriter extends RecordWriter<K2, V2> {

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
    }

    @Override
    public void write(K2 key, V2 value) throws IOException, InterruptedException {
      synchronized (outer) {
        outer.write(key, value);
      }
    }
  }
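  /**
   * Forwards counter lookups, progress, and status updates from worker threads
   * to the shared outer context.
   */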
  private class SubMapStatusReporter extends StatusReporter {

    @Override
    public Counter getCounter(Enum<?> name) {
      return outer.getCounter(name);
    }

    @Override
    public Counter getCounter(String group, String name) {
      return outer.getCounter(group, name);
    }

    @Override
    public void progress() {
      outer.progress();
    }

    @Override
    public void setStatus(String status) {
      outer.setStatus(status);
    }

    @Override
    public float getProgress() {
      return 0;
    }
  }
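  /**
   * A single worker thread: instantiates the configured mapper and runs it over
   * a per-thread context that wraps the shared reader, writer, and reporter.
   */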
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
      justification="The broad catch is intentional; any reflection failure is rethrown as an IOException")
  private class MapRunner implements Runnable {
    private Mapper<ImmutableBytesWritable, Result, K2, V2> mapper;
    private Context subcontext;

    @SuppressWarnings({ "rawtypes", "unchecked" })
    MapRunner(Context context) throws IOException, InterruptedException {
      mapper = ReflectionUtils.newInstance(mapClass, context.getConfiguration());
      try {
        // First try the inner-class Context constructor available in older
        // Hadoop versions; the leading Mapper argument is the enclosing instance.
        Constructor c = context.getClass().getConstructor(
          Mapper.class,
          Configuration.class,
          TaskAttemptID.class,
          RecordReader.class,
          RecordWriter.class,
          OutputCommitter.class,
          StatusReporter.class,
          InputSplit.class);
        c.setAccessible(true);
        subcontext = (Context) c.newInstance(
          mapper,
          outer.getConfiguration(),
          outer.getTaskAttemptID(),
          new SubMapRecordReader(),
          new SubMapRecordWriter(),
          context.getOutputCommitter(),
          new SubMapStatusReporter(),
          outer.getInputSplit());
      } catch (Exception e) {
        try {
          // Fall back to the Hadoop 2 style: build a MapContextImpl and wrap
          // it via WrappedMapper.getMapContext().
          Constructor c = Class.forName("org.apache.hadoop.mapreduce.task.MapContextImpl")
            .getConstructor(
              Configuration.class,
              TaskAttemptID.class,
              RecordReader.class,
              RecordWriter.class,
              OutputCommitter.class,
              StatusReporter.class,
              InputSplit.class);
          c.setAccessible(true);
          MapContext mc = (MapContext) c.newInstance(
            outer.getConfiguration(),
            outer.getTaskAttemptID(),
            new SubMapRecordReader(),
            new SubMapRecordWriter(),
            context.getOutputCommitter(),
            new SubMapStatusReporter(),
            outer.getInputSplit());
          Class<?> wrappedMapperClass =
            Class.forName("org.apache.hadoop.mapreduce.lib.map.WrappedMapper");
          Method getMapContext = wrappedMapperClass.getMethod("getMapContext", MapContext.class);
          subcontext = (Context) getMapContext.invoke(
              wrappedMapperClass.getDeclaredConstructor().newInstance(), mc);
        } catch (Exception ee) { // FindBugs: REC_CATCH_EXCEPTION
          // Neither context construction path worked; rethrow the original
          // failure as an IOException.
          throw new IOException(e);
        }
      }
    }

    @Override
    public void run() {
      try {
        mapper.run(subcontext);
      } catch (Throwable ie) {
        LOG.error("Problem in running map.", ie);
      }
    }
  }
}