1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import java.io.IOException;
22
23 import org.apache.hadoop.conf.Configuration;
24 import org.apache.hadoop.fs.Path;
25 import org.apache.hadoop.hbase.HBaseConfiguration;
26 import org.apache.hadoop.hbase.client.Put;
27 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
28 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
29 import org.apache.hadoop.hbase.util.Bytes;
30 import org.apache.hadoop.io.LongWritable;
31 import org.apache.hadoop.io.Text;
32 import org.apache.hadoop.mapreduce.Job;
33 import org.apache.hadoop.mapreduce.Mapper;
34 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
35 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
36 import org.apache.hadoop.util.GenericOptionsParser;
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 public class SampleUploader {
61
62 private static final String NAME = "SampleUploader";
63
64 static class Uploader
65 extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
66
67 private long checkpoint = 100;
68 private long count = 0;
69
70 @Override
71 public void map(LongWritable key, Text line, Context context)
72 throws IOException {
73
74
75
76
77
78
79 String [] values = line.toString().split(",");
80 if(values.length != 4) {
81 return;
82 }
83
84
85 byte [] row = Bytes.toBytes(values[0]);
86 byte [] family = Bytes.toBytes(values[1]);
87 byte [] qualifier = Bytes.toBytes(values[2]);
88 byte [] value = Bytes.toBytes(values[3]);
89
90
91 Put put = new Put(row);
92 put.add(family, qualifier, value);
93
94
95
96
97
98 try {
99 context.write(new ImmutableBytesWritable(row), put);
100 } catch (InterruptedException e) {
101 e.printStackTrace();
102 }
103
104
105 if(++count % checkpoint == 0) {
106 context.setStatus("Emitting Put " + count);
107 }
108 }
109 }
110
111
112
113
114 public static Job configureJob(Configuration conf, String [] args)
115 throws IOException {
116 Path inputPath = new Path(args[0]);
117 String tableName = args[1];
118 Job job = new Job(conf, NAME + "_" + tableName);
119 job.setJarByClass(Uploader.class);
120 FileInputFormat.setInputPaths(job, inputPath);
121 job.setInputFormatClass(SequenceFileInputFormat.class);
122 job.setMapperClass(Uploader.class);
123
124
125 TableMapReduceUtil.initTableReducerJob(tableName, null, job);
126 job.setNumReduceTasks(0);
127 return job;
128 }
129
130
131
132
133
134
135
136 public static void main(String[] args) throws Exception {
137 Configuration conf = HBaseConfiguration.create();
138 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
139 if(otherArgs.length != 2) {
140 System.err.println("Wrong number of arguments: " + otherArgs.length);
141 System.err.println("Usage: " + NAME + " <input> <tablename>");
142 System.exit(-1);
143 }
144 Job job = configureJob(conf, otherArgs);
145 System.exit(job.waitForCompletion(true) ? 0 : 1);
146 }
147 }