/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;

/**
 * Utility for {@link TableMapper} and {@link TableReducer}
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TableMapReduceUtil {
  private static final Log LOG = LogFactory.getLog(TableMapReduceUtil.class);

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  The table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job)
      throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass,
        job, true);
  }
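
  /*
   * Example: a minimal mapper-job setup using the method above. The names
   * MyTableMapper, Text and IntWritable are illustrative placeholders, not
   * part of this class.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   Job job = Job.getInstance(conf, "scan-mytable");
   *   job.setJarByClass(MyTableMapper.class);
   *   Scan scan = new Scan();
   *   scan.setCaching(500);        // larger scanner batches suit MR jobs
   *   scan.setCacheBlocks(false);  // MR scans should not churn the block cache
   *   TableMapReduceUtil.initTableMapperJob("mytable", scan,
   *       MyTableMapper.class, Text.class, IntWritable.class, job);
   */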

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  The table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(TableName table,
      Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass,
      Job job) throws IOException {
    initTableMapperJob(table.getNameAsString(),
        scan,
        mapper,
        outputKeyClass,
        outputValueClass,
        job,
        true);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  Binary representation of the table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job)
      throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass,
        job, true);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  The table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @param inputFormatClass  The class of the input format.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
      throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job,
        addDependencyJars, true, inputFormatClass);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  The table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @param initCredentials whether to initialize hbase auth credentials for the job
   * @param inputFormatClass the input format
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, boolean initCredentials,
      Class<? extends InputFormat> inputFormatClass)
      throws IOException {
    job.setInputFormatClass(inputFormatClass);
    if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
    if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    if (Put.class.equals(outputValueClass)) {
      job.setCombinerClass(PutCombiner.class);
    }
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    conf.set(TableInputFormat.INPUT_TABLE, table);
    conf.set(TableInputFormat.SCAN, convertScanToString(scan));
    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        KeyValueSerialization.class.getName());
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    if (initCredentials) {
      initCredentials(job);
    }
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  Binary representation of the table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @param inputFormatClass  The class of the input format.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
      throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, inputFormatClass);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  Binary representation of the table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars)
      throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, TableInputFormat.class);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  The table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars)
      throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, TableInputFormat.class);
  }

  /**
   * Enable a basic on-heap cache for these jobs. Any BlockCache deployed in
   * these jobs is essentially noop, but this ensures that HFileBlocks are at
   * least cached on read.
   */
  public static void resetCacheConfig(Configuration conf) {
    conf.setFloat(
        HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
    conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0f);
    conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY);
  }

  /**
   * Sets up the job for reading from one or more table snapshots, with one or
   * more scans per snapshot. It bypasses hbase servers and reads directly from
   * snapshot files.
   *
   * @param snapshotScans  map of snapshot name to scans on that snapshot.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir  a temporary directory to restore the snapshots into.
   * @throws IOException When setting up the details fails.
   */
  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
      Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
      Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
    MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans, tmpRestoreDir);

    job.setInputFormatClass(MultiTableSnapshotInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));

    if (addDependencyJars) {
      addDependencyJars(job);
    }

    resetCacheConfig(job.getConfiguration());
  }

  /**
   * Sets up the job for reading from a table snapshot. It bypasses hbase
   * servers and reads directly from snapshot files.
   *
   * @param snapshotName The name of the snapshot (of a table) to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user
   *          should have write permissions to this directory, and this should not be a
   *          subdirectory of rootdir. After the job is finished, restore directory can be
   *          deleted.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, Path tmpRestoreDir)
      throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapperJob(snapshotName, scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class);
    resetCacheConfig(job.getConfiguration());
  }
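
  /*
   * Example: reading from a snapshot rather than the live table. The snapshot
   * name and restore directory below are illustrative placeholders; the
   * restore directory must be writable by the current user and must not be a
   * subdirectory of the HBase root directory.
   *
   *   Path restoreDir = new Path("/tmp/snapshot-restore");
   *   TableMapReduceUtil.initTableSnapshotMapperJob("mytable-snapshot",
   *       new Scan(), MyTableMapper.class, Text.class, IntWritable.class,
   *       job, true, restoreDir);
   */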

  /**
   * Use this before submitting a Multi TableMap job. It will appropriately
   * set up the job.
   *
   * @param scans  The list of {@link Scan} objects to read from.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job) throws IOException {
    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
        true);
  }

  /**
   * Use this before submitting a Multi TableMap job. It will appropriately
   * set up the job.
   *
   * @param scans  The list of {@link Scan} objects to read from.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars) throws IOException {
    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
        addDependencyJars, true);
  }

  /**
   * Use this before submitting a Multi TableMap job. It will appropriately
   * set up the job.
   *
   * @param scans  The list of {@link Scan} objects to read from.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @param initCredentials whether to initialize hbase auth credentials for the job
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars,
      boolean initCredentials) throws IOException {
    job.setInputFormatClass(MultiTableInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    List<String> scanStrings = new ArrayList<String>();

    for (Scan scan : scans) {
      scanStrings.add(convertScanToString(scan));
    }
    job.getConfiguration().setStrings(MultiTableInputFormat.SCANS,
        scanStrings.toArray(new String[scanStrings.size()]));

    if (addDependencyJars) {
      addDependencyJars(job);
    }

    if (initCredentials) {
      initCredentials(job);
    }
  }
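
  /*
   * Example: one job scanning several tables. Each Scan names its table via
   * the Scan.SCAN_ATTRIBUTES_TABLE_NAME attribute, which is how
   * MultiTableInputFormat routes it; the table names here are illustrative.
   *
   *   List<Scan> scans = new ArrayList<Scan>();
   *   for (String name : new String[] { "table1", "table2" }) {
   *     Scan scan = new Scan();
   *     scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(name));
   *     scans.add(scan);
   *   }
   *   TableMapReduceUtil.initTableMapperJob(scans, MyTableMapper.class,
   *       Text.class, IntWritable.class, job);
   */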

  public static void initCredentials(Job job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from launcher job to MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.getConfiguration().set("mapreduce.job.credentials.binary",
            System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      try {
        // init credentials for remote cluster
        String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS);
        User user = userProvider.getCurrent();
        if (quorumAddress != null) {
          Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
              quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX);
          Connection peerConn = ConnectionFactory.createConnection(peerConf);
          try {
            TokenUtil.addTokenForJob(peerConn, user, job);
          } finally {
            peerConn.close();
          }
        }

        Connection conn = ConnectionFactory.createConnection(job.getConfiguration());
        try {
          TokenUtil.addTokenForJob(conn, user, job);
        } finally {
          conn.close();
        }
      } catch (InterruptedException ie) {
        LOG.info("Interrupted obtaining user authentication token");
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Obtain an authentication token, for the specified cluster, on behalf of the current user
   * and add it to the credentials for the given map reduce job.
   *
   * The quorumAddress is the key to the ZK ensemble, which contains:
   * hbase.zookeeper.quorum, hbase.zookeeper.client.port and
   * zookeeper.znode.parent
   *
   * @param job The job that requires the permission.
   * @param quorumAddress string that contains the 3 required configurations
   * @throws IOException When the authentication token cannot be obtained.
   * @deprecated Since 1.2.0, use {@link #initCredentialsForCluster(Job, Configuration)} instead.
   */
  @Deprecated
  public static void initCredentialsForCluster(Job job, String quorumAddress)
      throws IOException {
    Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
        quorumAddress);
    initCredentialsForCluster(job, peerConf);
  }

  /**
   * Obtain an authentication token, for the specified cluster, on behalf of the current user
   * and add it to the credentials for the given map reduce job.
   *
   * @param job The job that requires the permission.
   * @param conf The configuration to use in connecting to the peer cluster
   * @throws IOException When the authentication token cannot be obtained.
   */
  public static void initCredentialsForCluster(Job job, Configuration conf)
      throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
    if (userProvider.isHBaseSecurityEnabled()) {
      try {
        Connection peerConn = ConnectionFactory.createConnection(conf);
        try {
          TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job);
        } finally {
          peerConn.close();
        }
      } catch (InterruptedException e) {
        LOG.info("Interrupted obtaining user authentication token");
        // restore the interrupt status instead of clearing it
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Writes the given scan into a Base64 encoded string.
   *
   * @param scan  The scan to write out.
   * @return The scan saved in a Base64 encoded string.
   * @throws IOException When writing the scan fails.
   */
  static String convertScanToString(Scan scan) throws IOException {
    ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
    return Base64.encodeBytes(proto.toByteArray());
  }

  /**
   * Converts the given Base64 string back into a Scan instance.
   *
   * @param base64  The scan details.
   * @return The newly created Scan instance.
   * @throws IOException When reading the scan instance fails.
   */
  static Scan convertStringToScan(String base64) throws IOException {
    byte [] decoded = Base64.decode(base64);
    ClientProtos.Scan scan;
    try {
      scan = ClientProtos.Scan.parseFrom(decoded);
    } catch (InvalidProtocolBufferException ipbe) {
      throw new IOException(ipbe);
    }

    return ProtobufUtil.toScan(scan);
  }
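
  /*
   * The two helpers above are inverses: a Scan survives the Base64 round trip
   * used to stash it in the job configuration. For example:
   *
   *   Scan scan = new Scan();
   *   scan.addFamily(Bytes.toBytes("cf"));
   *   String encoded = convertScanToString(scan);   // stored under TableInputFormat.SCAN
   *   Scan restored = convertStringToScan(encoded); // an equivalent Scan instance
   */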

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job)
      throws IOException {
    initTableReducerJob(table, reducer, job, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   *          default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job,
      Class partitioner) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, null, null, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   *          default partitioner.
   * @param quorumAddress Distant cluster to write to; default is null for
   *          output to the cluster that is designated in <code>hbase-site.xml</code>.
   *          Set this String to the zookeeper ensemble of an alternate remote cluster
   *          when you would have the reduce write to a cluster that is other than the
   *          default; e.g. copying tables between clusters, the source would be
   *          designated by <code>hbase-site.xml</code> and this param would have the
   *          ensemble address of the remote cluster. The format to pass is particular.
   *          Pass <code>&lt;hbase.zookeeper.quorum&gt;:&lt;hbase.zookeeper.client.port&gt;
   *          :&lt;zookeeper.znode.parent&gt;</code>
   *          such as <code>server,server2,server3:2181:/hbase</code>.
   * @param serverClass redefined hbase.regionserver.class
   * @param serverImpl redefined hbase.regionserver.impl
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job,
      Class partitioner, String quorumAddress, String serverClass,
      String serverImpl) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, quorumAddress,
        serverClass, serverImpl, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job to adjust.  Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   *          default partitioner.
   * @param quorumAddress Distant cluster to write to; default is null for
   *          output to the cluster that is designated in <code>hbase-site.xml</code>.
   *          See the overload above for the expected
   *          <code>quorum:port:/parent</code> format.
   * @param serverClass redefined hbase.regionserver.class
   * @param serverImpl redefined hbase.regionserver.impl
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job,
      Class partitioner, String quorumAddress, String serverClass,
      String serverImpl, boolean addDependencyJars) throws IOException {

    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    job.setOutputFormatClass(TableOutputFormat.class);
    if (reducer != null) job.setReducerClass(reducer);
    conf.set(TableOutputFormat.OUTPUT_TABLE, table);
    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());

    if (quorumAddress != null) {
      // Calling this will validate the format
      ZKConfig.validateClusterKey(quorumAddress);
      conf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress);
    }
    if (serverClass != null && serverImpl != null) {
      conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
      conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
    }
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Writable.class);
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions = MetaTableAccessor.getRegionCount(conf, TableName.valueOf(table));
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }

    if (addDependencyJars) {
      addDependencyJars(job);
    }

    initCredentials(job);
  }
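
  /*
   * Example: writing Puts back to a table, partitioned by region so each
   * reducer talks to one region. MyTableReducer and the table name are
   * illustrative placeholders; passing HRegionPartitioner.class also caps
   * the reduce-task count at the table's region count (see above).
   *
   *   TableMapReduceUtil.initTableReducerJob("output-table",
   *       MyTableReducer.class, job, HRegionPartitioner.class);
   */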

  /**
   * Ensures that the given number of reduce tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumReduceTasks(String table, Job job)
      throws IOException {
    int regions =
        MetaTableAccessor.getRegionCount(job.getConfiguration(), TableName.valueOf(table));
    if (job.getNumReduceTasks() > regions)
      job.setNumReduceTasks(regions);
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, Job job)
      throws IOException {
    job.setNumReduceTasks(MetaTableAccessor.getRegionCount(job.getConfiguration(),
        TableName.valueOf(table)));
  }

  /**
   * Sets the number of rows to return and cache with each scanner iteration.
   * Higher caching values will enable faster mapreduce jobs at the expense of
   * requiring more heap to contain the cached rows.
   *
   * @param job The current job to adjust.
   * @param batchSize The number of rows to return in batch with each scanner
   *          iteration.
   */
  public static void setScannerCaching(Job job, int batchSize) {
    job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
  }

  /**
   * Add HBase and its dependencies (only) to the job configuration.
   * <p>
   * This is intended as a low-level API, facilitating code reuse between this
   * class and its mapred counterpart. It is also of use to external tools that
   * need to build a MapReduce job that interacts with HBase but want
   * fine-grained control over the jars shipped to the cluster.
   * </p>
   * @param conf The Configuration object to extend with dependencies.
   * @see org.apache.hadoop.hbase.mapred.TableMapReduceUtil
   * @see <a href="https://issues.apache.org/jira/browse/PIG-3285">PIG-3285</a>
   */
  public static void addHBaseDependencyJars(Configuration conf) throws IOException {

    // PrefixTreeCodec is part of the hbase-prefix-tree module. If the module
    // jar is absent, the codec cannot be shipped with the job; warn and
    // continue without it.
    Class prefixTreeCodecClass = null;
    try {
      prefixTreeCodecClass =
          Class.forName("org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeCodec");
    } catch (ClassNotFoundException e) {
      // this will show up in unit tests but should not show up in real deployments
      LOG.warn("The hbase-prefix-tree module jar containing PrefixTreeCodec is not present." +
          " Continuing without it.");
    }

    addDependencyJars(conf,
      // explicitly pull a class from each module
      org.apache.hadoop.hbase.HConstants.class,                      // hbase-common
      org.apache.hadoop.hbase.protobuf.generated.ClientProtos.class, // hbase-protocol
      org.apache.hadoop.hbase.client.Put.class,                      // hbase-client
      org.apache.hadoop.hbase.CompatibilityFactory.class,            // hbase-hadoop-compat
      org.apache.hadoop.hbase.mapreduce.TableMapper.class,           // hbase-server
      prefixTreeCodecClass, // hbase-prefix-tree (if null, will be skipped)
      // pull necessary dependencies
      org.apache.zookeeper.ZooKeeper.class,
      io.netty.channel.Channel.class,
      com.google.protobuf.Message.class,
      com.google.common.collect.Lists.class,
      org.apache.htrace.Trace.class,
      com.yammer.metrics.core.MetricsRegistry.class);
  }

  /**
   * Returns a classpath string built from the content of the "tmpjars" value
   * in {@code conf}. Also exposed to shell scripts via `bin/hbase mapredcp`.
   */
  public static String buildDependencyClasspath(Configuration conf) {
    if (conf == null) {
      throw new IllegalArgumentException("Must provide a configuration object.");
    }
    Set<String> paths = new HashSet<String>(conf.getStringCollection("tmpjars"));
    if (paths.size() == 0) {
      throw new IllegalArgumentException("Configuration contains no tmpjars.");
    }
    StringBuilder sb = new StringBuilder();
    for (String s : paths) {
      // entries can take the form 'file:/path/to/file.jar'.
      int idx = s.indexOf(":");
      if (idx != -1) s = s.substring(idx + 1);
      if (sb.length() > 0) sb.append(File.pathSeparator);
      sb.append(s);
    }
    return sb.toString();
  }
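
  /*
   * Example: buildDependencyClasspath pairs with addHBaseDependencyJars to
   * produce a HADOOP_CLASSPATH value for launching jobs from the shell
   * (roughly what `bin/hbase mapredcp` prints):
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   TableMapReduceUtil.addHBaseDependencyJars(conf);
   *   System.out.println(TableMapReduceUtil.buildDependencyClasspath(conf));
   */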

  /**
   * Add the HBase dependency jars as well as jars for any of the configured
   * job classes to the job configuration, so that JobClient will ship them
   * to the cluster and add them to the DistributedCache.
   */
  public static void addDependencyJars(Job job) throws IOException {
    addHBaseDependencyJars(job.getConfiguration());
    try {
      addDependencyJars(job.getConfiguration(),
          // when making changes here, consider also mapred.TableMapReduceUtil
          // pull job classes
          job.getMapOutputKeyClass(),
          job.getMapOutputValueClass(),
          job.getInputFormatClass(),
          job.getOutputKeyClass(),
          job.getOutputValueClass(),
          job.getOutputFormatClass(),
          job.getPartitionerClass(),
          job.getCombinerClass());
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }
  }

  /**
   * Add the jars containing the given classes to the job's configuration
   * such that JobClient will ship them to the cluster and add them to
   * the DistributedCache.
   */
  public static void addDependencyJars(Configuration conf,
      Class<?>... classes) throws IOException {

    FileSystem localFs = FileSystem.getLocal(conf);
    Set<String> jars = new HashSet<String>();
    // Add jars that are already in the tmpjars variable
    jars.addAll(conf.getStringCollection("tmpjars"));

    // add jars as we find them to a map of contents jar name so that we can avoid
    // creating new jars for classes that have already been packaged.
    Map<String, String> packagedClasses = new HashMap<String, String>();

    // Add jars containing the specified classes
    for (Class<?> clazz : classes) {
      if (clazz == null) continue;

      Path path = findOrCreateJar(clazz, localFs, packagedClasses);
      if (path == null) {
        LOG.warn("Could not find jar for class " + clazz +
            " in order to ship it to the cluster.");
        continue;
      }
      if (!localFs.exists(path)) {
        LOG.warn("Could not validate jar file " + path + " for class "
            + clazz);
        continue;
      }
      jars.add(path.toString());
    }
    if (jars.isEmpty()) return;

    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()])));
  }

  /**
   * Finds the Jar for a class or creates it if it doesn't exist. If the class
   * is in a directory in the classpath, it creates a Jar on the fly with the
   * contents of the directory and returns the path to that Jar. If a Jar is
   * created, it is created in the system temporary directory. Otherwise,
   * returns an existing jar that contains a class of the same name. Maintains
   * a mapping from jar contents to the tmp jar created.
   *
   * @param my_class the class to find.
   * @param fs the FileSystem with which to qualify the returned path.
   * @param packagedClasses a map of class name to path.
   * @return a jar file that contains the class.
   * @throws IOException
   */
  private static Path findOrCreateJar(Class<?> my_class, FileSystem fs,
      Map<String, String> packagedClasses)
      throws IOException {
    // attempt to locate an existing jar for the class.
    String jar = findContainingJar(my_class, packagedClasses);
    if (null == jar || jar.isEmpty()) {
      jar = getJar(my_class);
      updateMap(jar, packagedClasses);
    }

    if (null == jar || jar.isEmpty()) {
      return null;
    }

    LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar));
    return new Path(jar).makeQualified(fs);
  }

  /**
   * Add entries to <code>packagedClasses</code> corresponding to class files
   * contained in <code>jar</code>.
   *
   * @param jar The jar whose content to list.
   * @param packagedClasses map[class -> jar]
   */
  private static void updateMap(String jar, Map<String, String> packagedClasses)
      throws IOException {
    if (null == jar || jar.isEmpty()) {
      return;
    }
    ZipFile zip = null;
    try {
      zip = new ZipFile(jar);
      for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) {
        ZipEntry entry = iter.nextElement();
        if (entry.getName().endsWith("class")) {
          packagedClasses.put(entry.getName(), jar);
        }
      }
    } finally {
      if (null != zip) zip.close();
    }
  }

  /**
   * Find a jar that contains a class of the same name, if any. It will return
   * a jar file, even if that is not the first thing on the class path that
   * has a class with the same name. Looks first on the classpath and then in
   * the <code>packagedClasses</code> map.
   *
   * @param my_class the class to find.
   * @return a jar file that contains the class, or null.
   * @throws IOException
   */
  private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses)
      throws IOException {
    ClassLoader loader = my_class.getClassLoader();

    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";

    if (loader != null) {
      // first search the classpath
      for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
        URL url = itr.nextElement();
        if ("jar".equals(url.getProtocol())) {
          String toReturn = url.getPath();
          if (toReturn.startsWith("file:")) {
            toReturn = toReturn.substring("file:".length());
          }
          // URLDecoder is a misnamed class, since it actually decodes
          // x-www-form-urlencoded MIME type rather than actual
          // URL encoding (which the file path has). Therefore it would
          // decode +s to ' 's which is incorrect (spaces are actually
          // either unencoded or encoded as "%20"). Replace +s first, so
          // that they are kept sacred during the decoding process.
          toReturn = toReturn.replaceAll("\\+", "%2B");
          toReturn = URLDecoder.decode(toReturn, "UTF-8");
          return toReturn.replaceAll("!.*$", "");
        }
      }
    }

    // now look in any jars we've packaged using JarFinder. Returns null
    // when no jar is found.
    return packagedClasses.get(class_file);
  }

  /**
   * Returns the path to the Jar containing the given class, delegating to
   * {@link JarFinder}, which builds a jar from the classpath directory if an
   * existing one cannot be found.
   *
   * @param my_class the class to find.
   * @return a jar file that contains the class, or null.
   */
  private static String getJar(Class<?> my_class) {
    String ret = null;
    try {
      ret = JarFinder.getJar(my_class);
    } catch (Exception e) {
      // toss all other exceptions, related to reflection failure
      throw new RuntimeException("getJar invocation failed.", e);
    }

    return ret;
  }
}