1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.ReflectionUtils;
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 public class MultithreadedTableMapper<K2, V2> extends TableMapper<K2, V2> {
61 private static final Log LOG = LogFactory.getLog(MultithreadedTableMapper.class);
62 private Class<? extends Mapper<ImmutableBytesWritable, Result,K2,V2>> mapClass;
63 private Context outer;
64 private ExecutorService executor;
65 public static final String NUMBER_OF_THREADS = "hbase.mapreduce.multithreadedmapper.threads";
66 public static final String MAPPER_CLASS = "hbase.mapreduce.multithreadedmapper.mapclass";
67
68
69
70
71
72
73 public static int getNumberOfThreads(JobContext job) {
74 return job.getConfiguration().
75 getInt(NUMBER_OF_THREADS, 10);
76 }
77
78
79
80
81
82
83 public static void setNumberOfThreads(Job job, int threads) {
84 job.getConfiguration().setInt(NUMBER_OF_THREADS,
85 threads);
86 }
87
88
89
90
91
92
93
94
95 @SuppressWarnings("unchecked")
96 public static <K2,V2>
97 Class<Mapper<ImmutableBytesWritable, Result,K2,V2>> getMapperClass(JobContext job) {
98 return (Class<Mapper<ImmutableBytesWritable, Result,K2,V2>>)
99 job.getConfiguration().getClass( MAPPER_CLASS,
100 Mapper.class);
101 }
102
103
104
105
106
107
108
109
110 public static <K2,V2>
111 void setMapperClass(Job job,
112 Class<? extends Mapper<ImmutableBytesWritable, Result,K2,V2>> cls) {
113 if (MultithreadedTableMapper.class.isAssignableFrom(cls)) {
114 throw new IllegalArgumentException("Can't have recursive " +
115 "MultithreadedTableMapper instances.");
116 }
117 job.getConfiguration().setClass(MAPPER_CLASS,
118 cls, Mapper.class);
119 }
120
121
122
123
124 @Override
125 public void run(Context context) throws IOException, InterruptedException {
126 outer = context;
127 int numberOfThreads = getNumberOfThreads(context);
128 mapClass = getMapperClass(context);
129 if (LOG.isDebugEnabled()) {
130 LOG.debug("Configuring multithread runner to use " + numberOfThreads +
131 " threads");
132 }
133 executor = Executors.newFixedThreadPool(numberOfThreads);
134 for(int i=0; i < numberOfThreads; ++i) {
135 MapRunner thread = new MapRunner(context);
136 executor.execute(thread);
137 }
138 executor.shutdown();
139 while (!executor.isTerminated()) {
140
141 Thread.sleep(1000);
142 }
143 }
144
145 private class SubMapRecordReader
146 extends RecordReader<ImmutableBytesWritable, Result> {
147 private ImmutableBytesWritable key;
148 private Result value;
149 private Configuration conf;
150
151 @Override
152 public void close() throws IOException {
153 }
154
155 @Override
156 public float getProgress() throws IOException, InterruptedException {
157 return 0;
158 }
159
160 @Override
161 public void initialize(InputSplit split,
162 TaskAttemptContext context
163 ) throws IOException, InterruptedException {
164 conf = context.getConfiguration();
165 }
166
167 @Override
168 public boolean nextKeyValue() throws IOException, InterruptedException {
169 synchronized (outer) {
170 if (!outer.nextKeyValue()) {
171 return false;
172 }
173 key = ReflectionUtils.copy(outer.getConfiguration(),
174 outer.getCurrentKey(), key);
175 value = ReflectionUtils.copy(conf, outer.getCurrentValue(), value);
176 return true;
177 }
178 }
179
180 public ImmutableBytesWritable getCurrentKey() {
181 return key;
182 }
183
184 @Override
185 public Result getCurrentValue() {
186 return value;
187 }
188 }
189
190 private class SubMapRecordWriter extends RecordWriter<K2,V2> {
191
192 @Override
193 public void close(TaskAttemptContext context) throws IOException,
194 InterruptedException {
195 }
196
197 @Override
198 public void write(K2 key, V2 value) throws IOException,
199 InterruptedException {
200 synchronized (outer) {
201 outer.write(key, value);
202 }
203 }
204 }
205
206 private class SubMapStatusReporter extends StatusReporter {
207
208 @Override
209 public Counter getCounter(Enum<?> name) {
210 return outer.getCounter(name);
211 }
212
213 @Override
214 public Counter getCounter(String group, String name) {
215 return outer.getCounter(group, name);
216 }
217
218 @Override
219 public void progress() {
220 outer.progress();
221 }
222
223 @Override
224 public void setStatus(String status) {
225 outer.setStatus(status);
226 }
227
228 public float getProgress() {
229 return 0;
230 }
231 }
232
233 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
234 justification="Don't understand why FB is complaining about this one. We do throw exception")
235 private class MapRunner implements Runnable {
236 private Mapper<ImmutableBytesWritable, Result, K2,V2> mapper;
237 private Context subcontext;
238
239 @SuppressWarnings({ "rawtypes", "unchecked" })
240 MapRunner(Context context) throws IOException, InterruptedException {
241 mapper = ReflectionUtils.newInstance(mapClass,
242 context.getConfiguration());
243 try {
244 Constructor c = context.getClass().getConstructor(
245 Mapper.class,
246 Configuration.class,
247 TaskAttemptID.class,
248 RecordReader.class,
249 RecordWriter.class,
250 OutputCommitter.class,
251 StatusReporter.class,
252 InputSplit.class);
253 c.setAccessible(true);
254 subcontext = (Context) c.newInstance(
255 mapper,
256 outer.getConfiguration(),
257 outer.getTaskAttemptID(),
258 new SubMapRecordReader(),
259 new SubMapRecordWriter(),
260 context.getOutputCommitter(),
261 new SubMapStatusReporter(),
262 outer.getInputSplit());
263 } catch (Exception e) {
264 try {
265 Constructor c = Class.forName("org.apache.hadoop.mapreduce.task.MapContextImpl").getConstructor(
266 Configuration.class,
267 TaskAttemptID.class,
268 RecordReader.class,
269 RecordWriter.class,
270 OutputCommitter.class,
271 StatusReporter.class,
272 InputSplit.class);
273 c.setAccessible(true);
274 MapContext mc = (MapContext) c.newInstance(
275 outer.getConfiguration(),
276 outer.getTaskAttemptID(),
277 new SubMapRecordReader(),
278 new SubMapRecordWriter(),
279 context.getOutputCommitter(),
280 new SubMapStatusReporter(),
281 outer.getInputSplit());
282 Class<?> wrappedMapperClass = Class.forName("org.apache.hadoop.mapreduce.lib.map.WrappedMapper");
283 Method getMapContext = wrappedMapperClass.getMethod("getMapContext", MapContext.class);
284 subcontext = (Context) getMapContext.invoke(wrappedMapperClass.newInstance(), mc);
285 } catch (Exception ee) {
286
287 throw new IOException(e);
288 }
289 }
290 }
291
292 @Override
293 public void run() {
294 try {
295 mapper.run(subcontext);
296 } catch (Throwable ie) {
297 LOG.error("Problem in running map.", ie);
298 }
299 }
300 }
301 }