/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Multithreaded implementation of {@link org.apache.hadoop.hbase.mapreduce.TableMapper}.
 * <p>
 * It can be used instead of the basic mapper implementation when the Map
 * operation is not CPU bound, in order to improve throughput.
 * <p>
 * Mapper implementations used with this class must be thread-safe.
 * <p>
 * The Map-Reduce job has to be configured with the mapper to use via
 * {@link #setMapperClass} and the number of threads the thread-pool can use
 * with the {@link #setNumberOfThreads} method. The default value is 10 threads.
 * <p>
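 * A minimal configuration sketch; {@code MyMapper}, the table name, and the
 * {@code Text} output types are illustrative placeholders, not part of this API:
 * <pre>{@code
 * Job job = new Job(conf, "multithreaded-scan");
 * TableMapReduceUtil.initTableMapperJob("myTable", new Scan(),
 *     MultithreadedTableMapper.class, Text.class, Text.class, job);
 * MultithreadedTableMapper.setMapperClass(job, MyMapper.class);
 * MultithreadedTableMapper.setNumberOfThreads(job, 16);
 * }</pre>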
 */

public class MultithreadedTableMapper<K2, V2> extends TableMapper<K2, V2> {
  private static final Log LOG = LogFactory.getLog(MultithreadedTableMapper.class);
  private Class<? extends Mapper<ImmutableBytesWritable, Result, K2, V2>> mapClass;
  private Context outer;
  private ExecutorService executor;
  public static final String NUMBER_OF_THREADS = "hbase.mapreduce.multithreadedmapper.threads";
  public static final String MAPPER_CLASS = "hbase.mapreduce.multithreadedmapper.mapclass";

  /**
   * Get the number of threads in the thread pool that will run the map function.
   * @param job the job
   * @return the number of threads
   */
  public static int getNumberOfThreads(JobContext job) {
    return job.getConfiguration().getInt(NUMBER_OF_THREADS, 10);
  }

  /**
   * Set the number of threads in the pool for running maps.
   * @param job the job to modify
   * @param threads the new number of threads
   */
  public static void setNumberOfThreads(Job job, int threads) {
    job.getConfiguration().setInt(NUMBER_OF_THREADS, threads);
  }

  /**
   * Get the application's mapper class.
   * @param <K2> the map's output key type
   * @param <V2> the map's output value type
   * @param job the job
   * @return the mapper class to run
   */
  @SuppressWarnings("unchecked")
  public static <K2, V2>
  Class<Mapper<ImmutableBytesWritable, Result, K2, V2>> getMapperClass(JobContext job) {
    return (Class<Mapper<ImmutableBytesWritable, Result, K2, V2>>)
        job.getConfiguration().getClass(MAPPER_CLASS, Mapper.class);
  }

  /**
   * Set the application's mapper class.
   * @param <K2> the map output key type
   * @param <V2> the map output value type
   * @param job the job to modify
   * @param cls the class to use as the mapper
   */
  public static <K2, V2>
  void setMapperClass(Job job,
      Class<? extends Mapper<ImmutableBytesWritable, Result, K2, V2>> cls) {
    if (MultithreadedTableMapper.class.isAssignableFrom(cls)) {
      throw new IllegalArgumentException("Can't have recursive " +
          "MultithreadedTableMapper instances.");
    }
    job.getConfiguration().setClass(MAPPER_CLASS, cls, Mapper.class);
  }

  /**
   * Run the application's maps using a thread pool.
   */
  @Override
  public void run(Context context) throws IOException, InterruptedException {
    outer = context;
    int numberOfThreads = getNumberOfThreads(context);
    mapClass = getMapperClass(context);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Configuring multithread runner to use " + numberOfThreads +
          " threads");
    }
    executor = Executors.newFixedThreadPool(numberOfThreads);
    for (int i = 0; i < numberOfThreads; ++i) {
      MapRunner thread = new MapRunner(context);
      executor.execute(thread);
    }
    executor.shutdown();
    while (!executor.isTerminated()) {
      // wait till all the threads are done
      Thread.sleep(1000);
    }
  }

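  /**
   * Record reader handed to each sub-mapper. All threads pull records from
   * the single underlying reader through the shared outer context, so access
   * is synchronized on {@code outer} and each key/value is copied into
   * thread-local instances before being returned.
   */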
  private class SubMapRecordReader
      extends RecordReader<ImmutableBytesWritable, Result> {
    private ImmutableBytesWritable key;
    private Result value;
    private Configuration conf;

    @Override
    public void close() throws IOException {
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      return 0;
    }

    @Override
    public void initialize(InputSplit split,
        TaskAttemptContext context) throws IOException, InterruptedException {
      conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      synchronized (outer) {
        if (!outer.nextKeyValue()) {
          return false;
        }
        key = ReflectionUtils.copy(outer.getConfiguration(),
            outer.getCurrentKey(), key);
        value = ReflectionUtils.copy(conf, outer.getCurrentValue(), value);
        return true;
      }
    }

    @Override
    public ImmutableBytesWritable getCurrentKey() {
      return key;
    }

    @Override
    public Result getCurrentValue() {
      return value;
    }
  }

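  /**
   * Record writer handed to each sub-mapper. Writes from all threads are
   * funneled through the shared outer context, serialized by synchronizing
   * on {@code outer}.
   */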
  private class SubMapRecordWriter extends RecordWriter<K2, V2> {

    @Override
    public void close(TaskAttemptContext context) throws IOException,
        InterruptedException {
    }

    @Override
    public void write(K2 key, V2 value) throws IOException,
        InterruptedException {
      synchronized (outer) {
        outer.write(key, value);
      }
    }
  }

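  /**
   * Status reporter handed to each sub-mapper; delegates counters, progress
   * and status updates to the shared outer context.
   */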
  private class SubMapStatusReporter extends StatusReporter {

    @Override
    public Counter getCounter(Enum<?> name) {
      return outer.getCounter(name);
    }

    @Override
    public Counter getCounter(String group, String name) {
      return outer.getCounter(group, name);
    }

    @Override
    public void progress() {
      outer.progress();
    }

    @Override
    public void setStatus(String status) {
      outer.setStatus(status);
    }

    public float getProgress() {
      return 0;
    }
  }

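  /**
   * Runs one instance of the user's mapper, feeding it records through a
   * private {@code Context} built around the shared readers/writers above.
   * The context is constructed reflectively so the same code works whether
   * {@code Mapper.Context} is directly constructible (Hadoop 1.x) or has to
   * be obtained from {@code MapContextImpl} via {@code WrappedMapper}
   * (Hadoop 2.x).
   */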
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
      justification="Don't understand why FB is complaining about this one. We do throw exception")
  private class MapRunner implements Runnable {
    private Mapper<ImmutableBytesWritable, Result, K2, V2> mapper;
    private Context subcontext;

    @SuppressWarnings({ "rawtypes", "unchecked" })
    MapRunner(Context context) throws IOException, InterruptedException {
      mapper = ReflectionUtils.newInstance(mapClass,
          context.getConfiguration());
      try {
        // Try the Mapper.Context constructor first (the older mapreduce API,
        // where Context is a constructible inner class of Mapper).
        Constructor c = context.getClass().getConstructor(
          Mapper.class,
          Configuration.class,
          TaskAttemptID.class,
          RecordReader.class,
          RecordWriter.class,
          OutputCommitter.class,
          StatusReporter.class,
          InputSplit.class);
        c.setAccessible(true);
        subcontext = (Context) c.newInstance(
          mapper,
          outer.getConfiguration(),
          outer.getTaskAttemptID(),
          new SubMapRecordReader(),
          new SubMapRecordWriter(),
          context.getOutputCommitter(),
          new SubMapStatusReporter(),
          outer.getInputSplit());
      } catch (Exception e) {
        try {
          // Fall back to building a MapContextImpl and wrapping it via
          // WrappedMapper (the newer mapreduce API).
          Constructor c = Class.forName("org.apache.hadoop.mapreduce.task.MapContextImpl").getConstructor(
            Configuration.class,
            TaskAttemptID.class,
            RecordReader.class,
            RecordWriter.class,
            OutputCommitter.class,
            StatusReporter.class,
            InputSplit.class);
          c.setAccessible(true);
          MapContext mc = (MapContext) c.newInstance(
            outer.getConfiguration(),
            outer.getTaskAttemptID(),
            new SubMapRecordReader(),
            new SubMapRecordWriter(),
            context.getOutputCommitter(),
            new SubMapStatusReporter(),
            outer.getInputSplit());
          Class<?> wrappedMapperClass = Class.forName("org.apache.hadoop.mapreduce.lib.map.WrappedMapper");
          Method getMapContext = wrappedMapperClass.getMethod("getMapContext", MapContext.class);
          subcontext = (Context) getMapContext.invoke(wrappedMapperClass.newInstance(), mc);
        } catch (Exception ee) { // FindBugs: REC_CATCH_EXCEPTION
          // rethrow as IOE
          throw new IOException(e);
        }
      }
    }

    @Override
    public void run() {
      try {
        mapper.run(subcontext);
      } catch (Throwable ie) {
        LOG.error("Problem in running map.", ie);
      }
    }
  }
}