/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;

import com.google.protobuf.InvalidProtocolBufferException;
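
/**
 * Utility for {@link TableMapper} and {@link TableReducer}.
 *
 * <p>A minimal usage sketch (the table, mapper, and key/value classes below are
 * illustrative, not part of this API):</p>
 *
 * <pre>
 * Job job = Job.getInstance(HBaseConfiguration.create(), "ExampleRead");
 * Scan scan = new Scan();
 * scan.setCaching(500);        // a higher value than the default is typical for MR jobs
 * scan.setCacheBlocks(false);  // don't fill the region server block cache from MR scans
 * TableMapReduceUtil.initTableMapperJob(
 *     "exampleTable",          // hypothetical input table
 *     scan,
 *     MyMapper.class,          // hypothetical mapper extending TableMapper
 *     Text.class,              // mapper output key
 *     IntWritable.class,       // mapper output value
 *     job);
 * </pre>
 */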
@SuppressWarnings({ "rawtypes", "unchecked" })
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TableMapReduceUtil {
  static Log LOG = LogFactory.getLog(TableMapReduceUtil.class);
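
  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  The table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */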
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job)
  throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass,
        job, true);
  }
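
  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  The table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */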
  public static void initTableMapperJob(TableName table,
      Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass,
      Job job) throws IOException {
    initTableMapperJob(table.getNameAsString(),
        scan,
        mapper,
        outputKeyClass,
        outputValueClass,
        job,
        true);
  }
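
  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  The binary representation of the table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */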
  public static void initTableMapperJob(byte[] table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job)
  throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass,
        job, true);
  }
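
  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  The table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @param inputFormatClass  The class of the input format.
   * @throws IOException When setting up the details fails.
   */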
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
  throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job,
        addDependencyJars, true, inputFormatClass);
  }
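
  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  The table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @param initCredentials whether to initialize hbase auth credentials for the job.
   * @param inputFormatClass  The class of the input format.
   * @throws IOException When setting up the details fails.
   */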
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, boolean initCredentials,
      Class<? extends InputFormat> inputFormatClass)
  throws IOException {
    job.setInputFormatClass(inputFormatClass);
    if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
    if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    if (Put.class.equals(outputValueClass)) {
      job.setCombinerClass(PutCombiner.class);
    }
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    conf.set(TableInputFormat.INPUT_TABLE, table);
    conf.set(TableInputFormat.SCAN, convertScanToString(scan));
    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        KeyValueSerialization.class.getName());
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    if (initCredentials) {
      initCredentials(job);
    }
  }
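
  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  The binary representation of the table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @param inputFormatClass  The class of the input format.
   * @throws IOException When setting up the details fails.
   */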
  public static void initTableMapperJob(byte[] table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
  throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, inputFormatClass);
  }
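
  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  The binary representation of the table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */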
  public static void initTableMapperJob(byte[] table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars)
  throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, TableInputFormat.class);
  }
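
  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  The table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */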
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars)
  throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, TableInputFormat.class);
  }
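
  /**
   * Enable a basic on-heap cache for these jobs. Any BlockCache implementation
   * based on direct memory will likely cause the map tasks to OOM when opening
   * the region. This is done here instead of in the snapshot record reader in
   * case an advanced user wants to override this behavior in their job.
   */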
  public static void resetCacheConfig(Configuration conf) {
    conf.setFloat(
        HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
    conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0f);
    conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY);
  }
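
  /**
   * Sets up the job for reading from a table snapshot. It bypasses hbase
   * servers and reads directly from snapshot files.
   *
   * @param snapshotName The name of the snapshot (of a table) to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir a temporary directory to copy the snapshot files into.
   *          The current user should have write permissions to this directory,
   *          and it should not be a subdirectory of rootdir. It can be deleted
   *          after the job is finished.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */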
  public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, Path tmpRestoreDir)
  throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapperJob(snapshotName, scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class);
    resetCacheConfig(job.getConfiguration());
  }
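
  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set
   * up the job.
   *
   * @param scans  The list of {@link Scan} objects to read from.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */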
  public static void initTableMapperJob(List<Scan> scans,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job) throws IOException {
    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
        true);
  }
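
  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set
   * up the job.
   *
   * @param scans  The list of {@link Scan} objects to read from.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */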
  public static void initTableMapperJob(List<Scan> scans,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars) throws IOException {
    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
        addDependencyJars, true);
  }
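
  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set
   * up the job.
   *
   * @param scans  The list of {@link Scan} objects to read from.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @param initCredentials whether to initialize hbase auth credentials for the job.
   * @throws IOException When setting up the details fails.
   */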
  public static void initTableMapperJob(List<Scan> scans,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars,
      boolean initCredentials) throws IOException {
    job.setInputFormatClass(MultiTableInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    List<String> scanStrings = new ArrayList<String>();

    for (Scan scan : scans) {
      scanStrings.add(convertScanToString(scan));
    }
    job.getConfiguration().setStrings(MultiTableInputFormat.SCANS,
        scanStrings.toArray(new String[scanStrings.size()]));

    if (addDependencyJars) {
      addDependencyJars(job);
    }

    if (initCredentials) {
      initCredentials(job);
    }
  }
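
  /**
   * Obtain an authentication token for the job, on behalf of the current user,
   * and add it to the job's credentials. When {@link TableOutputFormat#QUORUM_ADDRESS}
   * is set in the job configuration, a token for that remote cluster is obtained
   * as well.
   *
   * @param job The job that requires the permission.
   * @throws IOException When the authentication token cannot be obtained.
   */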
  public static void initCredentials(Job job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from launcher job to MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.getConfiguration().set("mapreduce.job.credentials.binary",
            System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      try {
        // init credentials for remote cluster if an output quorum is configured
        String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS);
        User user = userProvider.getCurrent();
        if (quorumAddress != null) {
          Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
          ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
          Connection peerConn = ConnectionFactory.createConnection(peerConf);
          try {
            TokenUtil.addTokenForJob(peerConn, user, job);
          } finally {
            peerConn.close();
          }
        }

        // obtain a token for the local (input) cluster
        Connection conn = ConnectionFactory.createConnection(job.getConfiguration());
        try {
          TokenUtil.addTokenForJob(conn, user, job);
        } finally {
          conn.close();
        }
      } catch (InterruptedException ie) {
        LOG.info("Interrupted obtaining user authentication token");
        Thread.currentThread().interrupt();
      }
    }
  }
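
  /**
   * Obtain an authentication token, for the specified cluster, on behalf of the
   * current user and add it to the credentials for the given map reduce job.
   *
   * The quorumAddress is the key to the ZK ensemble, which contains:
   * hbase.zookeeper.quorum, hbase.zookeeper.client.port and zookeeper.znode.parent
   *
   * @param job The job that requires the permission.
   * @param quorumAddress string that contains the 3 required configurations
   * @throws IOException When the authentication token cannot be obtained.
   */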
  public static void initCredentialsForCluster(Job job, String quorumAddress)
      throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
    if (userProvider.isHBaseSecurityEnabled()) {
      try {
        Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
        ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
        Connection peerConn = ConnectionFactory.createConnection(peerConf);
        try {
          TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job);
        } finally {
          peerConn.close();
        }
      } catch (InterruptedException e) {
        LOG.info("Interrupted obtaining user authentication token");
        // restore the interrupt status rather than clearing it
        Thread.currentThread().interrupt();
      }
    }
  }
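
  /**
   * Writes the given scan into a Base64 encoded string.
   *
   * @param scan  The scan to write out.
   * @return The scan saved in a Base64 encoded string.
   * @throws IOException When writing the scan fails.
   */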
  static String convertScanToString(Scan scan) throws IOException {
    ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
    return Base64.encodeBytes(proto.toByteArray());
  }
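
  /**
   * Converts the given Base64 string back into a Scan instance.
   *
   * @param base64  The scan details.
   * @return The newly created Scan instance.
   * @throws IOException When reading the scan instance fails.
   */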
  static Scan convertStringToScan(String base64) throws IOException {
    byte[] decoded = Base64.decode(base64);
    ClientProtos.Scan scan;
    try {
      scan = ClientProtos.Scan.parseFrom(decoded);
    } catch (InvalidProtocolBufferException ipbe) {
      throw new IOException(ipbe);
    }

    return ProtobufUtil.toScan(scan);
  }
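
  /**
   * Use this before submitting a TableReduce job. It will appropriately set up
   * the JobConf.
   *
   * <pre>
   * // Hypothetical usage: MyReducer extends TableReducer and emits mutations.
   * TableMapReduceUtil.initTableReducerJob("exampleOutputTable", MyReducer.class, job);
   * </pre>
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job to adjust.
   * @throws IOException When determining the region count fails.
   */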
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job)
  throws IOException {
    initTableReducerJob(table, reducer, job, null);
  }
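
  /**
   * Use this before submitting a TableReduce job. It will appropriately set up
   * the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   *          default partitioner.
   * @throws IOException When determining the region count fails.
   */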
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job,
      Class partitioner) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, null, null, null);
  }
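
  /**
   * Use this before submitting a TableReduce job. It will appropriately set up
   * the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   *          default partitioner.
   * @param quorumAddress Distant cluster to write to; default is null for
   *          output to the cluster that is designated in <code>hbase-site.xml</code>.
   *          Set this String to the zookeeper ensemble of an alternate remote
   *          cluster when you would have the reduce write to a cluster other
   *          than the default; e.g. copying tables between clusters, where the
   *          source is designated by <code>hbase-site.xml</code> and this param
   *          holds the ensemble address of the remote cluster. The format is
   *          <code>&lt;hbase.zookeeper.quorum&gt;:&lt;hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;</code>,
   *          e.g. <code>server,server2,server3:2181:/hbase</code>.
   * @param serverClass redefined hbase.regionserver.class
   * @param serverImpl redefined hbase.regionserver.impl
   * @throws IOException When determining the region count fails.
   */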
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job,
      Class partitioner, String quorumAddress, String serverClass,
      String serverImpl) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, quorumAddress,
        serverClass, serverImpl, true);
  }
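
  /**
   * Use this before submitting a TableReduce job. It will appropriately set up
   * the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   *          default partitioner.
   * @param quorumAddress Distant cluster to write to; default is null for
   *          output to the cluster that is designated in <code>hbase-site.xml</code>.
   * @param serverClass redefined hbase.regionserver.class
   * @param serverImpl redefined hbase.regionserver.impl
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */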
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job,
      Class partitioner, String quorumAddress, String serverClass,
      String serverImpl, boolean addDependencyJars) throws IOException {

    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    job.setOutputFormatClass(TableOutputFormat.class);
    if (reducer != null) job.setReducerClass(reducer);
    conf.set(TableOutputFormat.OUTPUT_TABLE, table);
    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
    if (quorumAddress != null) {
      // Calling this will validate the format
      ZKUtil.transformClusterKey(quorumAddress);
      conf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress);
    }
    if (serverClass != null && serverImpl != null) {
      conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
      conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
    }
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Writable.class);
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions = MetaTableAccessor.getRegionCount(conf, TableName.valueOf(table));
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }

    if (addDependencyJars) {
      addDependencyJars(job);
    }

    initCredentials(job);
  }
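
  /**
   * Ensures that the given number of reduce tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job to adjust.
   * @throws IOException When retrieving the table details fails.
   */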
  public static void limitNumReduceTasks(String table, Job job)
  throws IOException {
    int regions =
        MetaTableAccessor.getRegionCount(job.getConfiguration(), TableName.valueOf(table));
    if (job.getNumReduceTasks() > regions)
      job.setNumReduceTasks(regions);
  }
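
  /**
   * Sets the number of reduce tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job to adjust.
   * @throws IOException When retrieving the table details fails.
   */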
  public static void setNumReduceTasks(String table, Job job)
  throws IOException {
    job.setNumReduceTasks(MetaTableAccessor.getRegionCount(job.getConfiguration(),
        TableName.valueOf(table)));
  }
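
  /**
   * Sets the number of rows to return and cache with each scanner iteration.
   * Higher caching values will enable faster mapreduce jobs at the expense of
   * requiring more heap to contain the cached rows.
   *
   * @param job The current job to adjust.
   * @param batchSize The number of rows to return in batch with each scanner
   *          iteration.
   */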
  public static void setScannerCaching(Job job, int batchSize) {
    job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
  }
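
  /**
   * Add HBase and its dependencies (only) to the job configuration.
   * <p>
   * This is intended as a low-level API, facilitating code reuse between this
   * class and its mapred counterpart. It is also of use to external tools that
   * need to build a MapReduce job that interacts with HBase but want
   * fine-grained control over the jars shipped to the cluster.
   * </p>
   * @param conf The Configuration object to extend with dependencies.
   * @see org.apache.hadoop.hbase.mapred.TableMapReduceUtil
   */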
  public static void addHBaseDependencyJars(Configuration conf) throws IOException {

    // PrefixTreeCodec is part of the hbase-prefix-tree module. If not included in MR jobs jar
    // dependencies, MR jobs that write encoded hfiles will fail.
    // Reflection is used here to prevent a circular module dependency.
    Class prefixTreeCodecClass = null;
    try {
      prefixTreeCodecClass =
          Class.forName("org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeCodec");
    } catch (ClassNotFoundException e) {
      // this will show up in unit tests but should not show up in real deployments
      LOG.warn("The hbase-prefix-tree module jar containing PrefixTreeCodec is not present." +
          " Continuing without it.");
    }

    addDependencyJars(conf,
        // explicitly pull a class from each module
        org.apache.hadoop.hbase.HConstants.class,
        org.apache.hadoop.hbase.protobuf.generated.ClientProtos.class,
        org.apache.hadoop.hbase.client.Put.class,
        org.apache.hadoop.hbase.CompatibilityFactory.class,
        org.apache.hadoop.hbase.mapreduce.TableMapper.class,
        prefixTreeCodecClass, // will be skipped if null
        // pull necessary dependencies
        org.apache.zookeeper.ZooKeeper.class,
        io.netty.channel.Channel.class,
        com.google.protobuf.Message.class,
        com.google.common.collect.Lists.class,
        org.apache.htrace.Trace.class,
        com.yammer.metrics.core.MetricsRegistry.class);
  }
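
  /**
   * Returns a classpath string built from the content of the "tmpjars" value
   * in {@code conf}, joined with the platform's path separator.
   */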
  public static String buildDependencyClasspath(Configuration conf) {
    if (conf == null) {
      throw new IllegalArgumentException("Must provide a configuration object.");
    }
    Set<String> paths = new HashSet<String>(conf.getStringCollection("tmpjars"));
    if (paths.isEmpty()) {
      throw new IllegalArgumentException("Configuration contains no tmpjars.");
    }
    StringBuilder sb = new StringBuilder();
    for (String s : paths) {
      // entries can take the form 'file:/path/to/file.jar'; strip the scheme prefix
      int idx = s.indexOf(":");
      if (idx != -1) s = s.substring(idx + 1);
      if (sb.length() > 0) sb.append(File.pathSeparator);
      sb.append(s);
    }
    return sb.toString();
  }
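
  /**
   * Add the HBase dependency jars as well as jars for any of the configured
   * job classes to the job configuration, so that JobClient will ship them
   * to the cluster and add them to the DistributedCache.
   */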
  public static void addDependencyJars(Job job) throws IOException {
    addHBaseDependencyJars(job.getConfiguration());
    try {
      addDependencyJars(job.getConfiguration(),
          // when making changes here, consider also mapred.TableMapReduceUtil
          // pull job classes
          job.getMapOutputKeyClass(),
          job.getMapOutputValueClass(),
          job.getInputFormatClass(),
          job.getOutputKeyClass(),
          job.getOutputValueClass(),
          job.getOutputFormatClass(),
          job.getPartitionerClass(),
          job.getCombinerClass());
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }
  }
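
  /**
   * Add the jars containing the given classes to the job's configuration
   * such that JobClient will ship them to the cluster and add them to
   * the DistributedCache.
   */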
  public static void addDependencyJars(Configuration conf,
      Class<?>... classes) throws IOException {

    FileSystem localFs = FileSystem.getLocal(conf);
    Set<String> jars = new HashSet<String>();
    // Add jars that are already in the tmpjars variable
    jars.addAll(conf.getStringCollection("tmpjars"));

    // add jars as we find them to a map of contents jar name so that we can avoid
    // creating new jars for classes that have already been packaged.
    Map<String, String> packagedClasses = new HashMap<String, String>();

    // Add jars containing the specified classes
    for (Class<?> clazz : classes) {
      if (clazz == null) continue;

      Path path = findOrCreateJar(clazz, localFs, packagedClasses);
      if (path == null) {
        LOG.warn("Could not find jar for class " + clazz +
            " in order to ship it to the cluster.");
        continue;
      }
      if (!localFs.exists(path)) {
        LOG.warn("Could not validate jar file " + path + " for class "
            + clazz);
        continue;
      }
      jars.add(path.toString());
    }
    if (jars.isEmpty()) return;

    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()])));
  }
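
  /**
   * Finds the Jar for a class or creates it if it doesn't exist. If the class
   * is in a directory in the classpath, it creates a Jar on the fly with the
   * contents of the directory and returns the path to that Jar. If a Jar is
   * created, it is created in the system temporary directory. Otherwise,
   * returns an existing jar that contains a class of the same name. Maintains
   * a mapping from jar contents to the tmp jar created.
   *
   * @param my_class the class to find.
   * @param fs the FileSystem with which to qualify the returned path.
   * @param packagedClasses a map of class name to path.
   * @return a jar file that contains the class, or null.
   * @throws IOException
   */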
  private static Path findOrCreateJar(Class<?> my_class, FileSystem fs,
      Map<String, String> packagedClasses)
  throws IOException {
    // attempt to locate an existing jar for the class.
    String jar = findContainingJar(my_class, packagedClasses);
    if (null == jar || jar.isEmpty()) {
      jar = getJar(my_class);
      updateMap(jar, packagedClasses);
    }

    if (null == jar || jar.isEmpty()) {
      return null;
    }

    LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar));
    return new Path(jar).makeQualified(fs);
  }
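
  /**
   * Add entries to <code>packagedClasses</code> corresponding to class files
   * contained in <code>jar</code>.
   *
   * @param jar The jar whose content to list.
   * @param packagedClasses map[class -> jar]
   */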
  private static void updateMap(String jar, Map<String, String> packagedClasses)
      throws IOException {
    if (null == jar || jar.isEmpty()) {
      return;
    }
    ZipFile zip = null;
    try {
      zip = new ZipFile(jar);
      for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) {
        ZipEntry entry = iter.nextElement();
        if (entry.getName().endsWith("class")) {
          packagedClasses.put(entry.getName(), jar);
        }
      }
    } finally {
      if (null != zip) zip.close();
    }
  }
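
  /**
   * Find a jar that contains a class of the same name, if any. It will return
   * a jar file, even if that is not the first thing on the class path that has
   * a class with the same name. Looks first on the classpath and then in the
   * <code>packagedClasses</code> map.
   *
   * @param my_class the class to find.
   * @param packagedClasses a map of class name to path.
   * @return a jar file that contains the class, or null.
   * @throws IOException
   */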
  private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses)
      throws IOException {
    ClassLoader loader = my_class.getClassLoader();

    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";

    if (loader != null) {
      // first search the classpath
      for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
        URL url = itr.nextElement();
        if ("jar".equals(url.getProtocol())) {
          String toReturn = url.getPath();
          if (toReturn.startsWith("file:")) {
            toReturn = toReturn.substring("file:".length());
          }
          // URLDecoder is a misnamed class, since it actually decodes
          // x-www-form-urlencoded MIME type rather than %-encoded text. In a
          // URL path a "+" is a literal character, but URLDecoder would turn
          // it into a space. Escape it as %2B first so decoding leaves it
          // intact, then strip everything after the "!" jar-entry separator.
          toReturn = toReturn.replaceAll("\\+", "%2B");
          toReturn = URLDecoder.decode(toReturn, "UTF-8");
          return toReturn.replaceAll("!.*$", "");
        }
      }
    }

    // now look in any jars we've packaged using JarFinder. Returns null when
    // no jar is found.
    return packagedClasses.get(class_file);
  }
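
  /**
   * Invoke 'getJar' on a JarFinder. Unlike {@link #findContainingJar(Class, Map)},
   * this method can create a jar on the fly from the class's containing
   * directory, which is useful when running from a build tree rather than from
   * packaged jars.
   *
   * @param my_class the class to find.
   * @return a jar file that contains the class, or null.
   */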
  private static String getJar(Class<?> my_class) {
    String ret = null;
    try {
      ret = JarFinder.getJar(my_class);
    } catch (Exception e) {
      // rethrow anything JarFinder throws; callers treat this as fatal
      throw new RuntimeException("getJar invocation failed.", e);
    }

    return ret;
  }
}