/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Multithreaded implementation of {@link TableMapper}.
 * <p>
 * It can be used instead of a plain {@link TableMapper} when the map
 * operation is not CPU bound, in order to improve throughput.
 * <p>
 * Mapper implementations used with this class must be thread-safe.
 * <p>
 * The Map-Reduce job has to be configured with the mapper to use via
 * {@link #setMapperClass(Job, Class)} and the number of threads in the
 * thread-pool via {@link #setNumberOfThreads(Job, int)}. The default
 * value is 10 threads.
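 * <p>
 * A minimal configuration sketch; {@code MyThreadSafeMapper} and the table
 * name {@code "myTable"} below are hypothetical placeholders, not part of
 * this API:
 * <pre>{@code
 * Job job = new Job(conf, "multithreaded-table-scan");
 * TableMapReduceUtil.initTableMapperJob("myTable", new Scan(),
 *     MultithreadedTableMapper.class, Text.class, Text.class, job);
 * MultithreadedTableMapper.setMapperClass(job, MyThreadSafeMapper.class);
 * MultithreadedTableMapper.setNumberOfThreads(job, 16);
 * }</pre>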
 */
public class MultithreadedTableMapper<K2, V2> extends TableMapper<K2, V2> {
  private static final Log LOG = LogFactory.getLog(MultithreadedTableMapper.class);
  private Class<? extends Mapper<ImmutableBytesWritable, Result, K2, V2>> mapClass;
  private Context outer;
  private ExecutorService executor;
  public static final String NUMBER_OF_THREADS = "hbase.mapreduce.multithreadedmapper.threads";
  public static final String MAPPER_CLASS = "hbase.mapreduce.multithreadedmapper.mapclass";

  /**
   * The number of threads in the thread pool that will run the map function.
   * @param job the job
   * @return the number of threads
   */
  public static int getNumberOfThreads(JobContext job) {
    return job.getConfiguration().getInt(NUMBER_OF_THREADS, 10);
  }

  /**
   * Set the number of threads in the pool for running maps.
   * @param job the job to modify
   * @param threads the new number of threads
   */
  public static void setNumberOfThreads(Job job, int threads) {
    job.getConfiguration().setInt(NUMBER_OF_THREADS, threads);
  }

  /**
   * Get the application's mapper class.
   * @param <K2> the map's output key type
   * @param <V2> the map's output value type
   * @param job the job
   * @return the mapper class to run
   */
  @SuppressWarnings("unchecked")
  public static <K2, V2>
  Class<Mapper<ImmutableBytesWritable, Result, K2, V2>> getMapperClass(JobContext job) {
    return (Class<Mapper<ImmutableBytesWritable, Result, K2, V2>>)
        job.getConfiguration().getClass(MAPPER_CLASS, Mapper.class);
  }

  /**
   * Set the application's mapper class.
   * @param <K2> the map output key type
   * @param <V2> the map output value type
   * @param job the job to modify
   * @param cls the class to use as the mapper
   */
  public static <K2, V2>
  void setMapperClass(Job job,
      Class<? extends Mapper<ImmutableBytesWritable, Result, K2, V2>> cls) {
    if (MultithreadedTableMapper.class.isAssignableFrom(cls)) {
      throw new IllegalArgumentException("Can't have recursive " +
          "MultithreadedTableMapper instances.");
    }
    job.getConfiguration().setClass(MAPPER_CLASS, cls, Mapper.class);
  }

  /**
   * Run the application's maps using a thread pool.
   */
  @Override
  public void run(Context context) throws IOException, InterruptedException {
    outer = context;
    int numberOfThreads = getNumberOfThreads(context);
    mapClass = getMapperClass(context);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Configuring multithread runner to use " + numberOfThreads +
          " threads");
    }
    executor = Executors.newFixedThreadPool(numberOfThreads);
    for (int i = 0; i < numberOfThreads; ++i) {
      MapRunner thread = new MapRunner(context);
      executor.execute(thread);
    }
    executor.shutdown();
    while (!executor.isTerminated()) {
      // wait till all the threads are done
      Thread.sleep(1000);
    }
  }

  private class SubMapRecordReader
      extends RecordReader<ImmutableBytesWritable, Result> {
    private ImmutableBytesWritable key;
    private Result value;
    private Configuration conf;

    @Override
    public void close() throws IOException {
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      return 0;
    }

    @Override
    public void initialize(InputSplit split,
        TaskAttemptContext context) throws IOException, InterruptedException {
      conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      // Read from the shared context under a lock so that each key/value
      // pair is handed to exactly one worker thread.
      synchronized (outer) {
        if (!outer.nextKeyValue()) {
          return false;
        }
        key = ReflectionUtils.copy(outer.getConfiguration(),
            outer.getCurrentKey(), key);
        value = ReflectionUtils.copy(conf, outer.getCurrentValue(), value);
        return true;
      }
    }

    @Override
    public ImmutableBytesWritable getCurrentKey() {
      return key;
    }

    @Override
    public Result getCurrentValue() {
      return value;
    }
  }

  private class SubMapRecordWriter extends RecordWriter<K2, V2> {

    @Override
    public void close(TaskAttemptContext context) throws IOException,
        InterruptedException {
    }

    @Override
    public void write(K2 key, V2 value) throws IOException,
        InterruptedException {
      synchronized (outer) {
        outer.write(key, value);
      }
    }
  }

  private class SubMapStatusReporter extends StatusReporter {

    @Override
    public Counter getCounter(Enum<?> name) {
      return outer.getCounter(name);
    }

    @Override
    public Counter getCounter(String group, String name) {
      return outer.getCounter(group, name);
    }

    @Override
    public void progress() {
      outer.progress();
    }

    @Override
    public void setStatus(String status) {
      outer.setStatus(status);
    }

    // No @Override here: StatusReporter.getProgress() does not exist on all
    // of the Hadoop versions this class is compiled against.
    public float getProgress() {
      return 0;
    }
  }

  private class MapRunner implements Runnable {
    private Mapper<ImmutableBytesWritable, Result, K2, V2> mapper;
    private Context subcontext;
    // Records a failure thrown by the wrapped mapper on this worker thread.
    private Throwable throwable;

    @SuppressWarnings({ "rawtypes", "unchecked" })
    MapRunner(Context context) throws IOException, InterruptedException {
      mapper = ReflectionUtils.newInstance(mapClass,
          context.getConfiguration());
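      // A sketch of what the reflection below is doing, since it is opaque:
      // on older Hadoop releases, Mapper.Context is a concrete inner class,
      // so its constructor can be invoked directly (the first parameter is
      // the enclosing Mapper instance that every non-static inner class
      // constructor takes).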
      try {
        Constructor c = context.getClass().getConstructor(
            Mapper.class,
            Configuration.class,
            TaskAttemptID.class,
            RecordReader.class,
            RecordWriter.class,
            OutputCommitter.class,
            StatusReporter.class,
            InputSplit.class);
        c.setAccessible(true);
        subcontext = (Context) c.newInstance(
            mapper,
            outer.getConfiguration(),
            outer.getTaskAttemptID(),
            new SubMapRecordReader(),
            new SubMapRecordWriter(),
            context.getOutputCommitter(),
            new SubMapStatusReporter(),
            outer.getInputSplit());
      } catch (Exception e) {
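        // Presumably a newer Hadoop where Mapper.Context is only an
        // interface: build a MapContextImpl and wrap it through
        // WrappedMapper.getMapContext, again reflectively so this class
        // compiles against either Hadoop line.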
        try {
          Constructor c = Class.forName("org.apache.hadoop.mapreduce.task.MapContextImpl")
              .getConstructor(
                  Configuration.class,
                  TaskAttemptID.class,
                  RecordReader.class,
                  RecordWriter.class,
                  OutputCommitter.class,
                  StatusReporter.class,
                  InputSplit.class);
          c.setAccessible(true);
          MapContext mc = (MapContext) c.newInstance(
              outer.getConfiguration(),
              outer.getTaskAttemptID(),
              new SubMapRecordReader(),
              new SubMapRecordWriter(),
              context.getOutputCommitter(),
              new SubMapStatusReporter(),
              outer.getInputSplit());
          Class<?> wrappedMapperClass =
              Class.forName("org.apache.hadoop.mapreduce.lib.map.WrappedMapper");
          Method getMapContext = wrappedMapperClass.getMethod("getMapContext", MapContext.class);
          subcontext = (Context) getMapContext.invoke(wrappedMapperClass.newInstance(), mc);
        } catch (Exception ee) {
          // Neither constructor strategy worked; rethrow the original failure as an IOE.
          throw new IOException(e);
        }
      }
    }

    @Override
    public void run() {
      try {
        mapper.run(subcontext);
      } catch (Throwable ie) {
        // Remember the failure; the field is not consulted elsewhere in this
        // class, so a mapper crash surfaces only through the mapper's own logging.
        throwable = ie;
      }
    }
  }
}