1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import java.io.IOException;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.hbase.classification.InterfaceStability;
27 import org.apache.hadoop.conf.Configurable;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.HBaseConfiguration;
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.client.BufferedMutator;
33 import org.apache.hadoop.hbase.client.Connection;
34 import org.apache.hadoop.hbase.client.ConnectionFactory;
35 import org.apache.hadoop.hbase.client.Delete;
36 import org.apache.hadoop.hbase.client.HTable;
37 import org.apache.hadoop.hbase.client.Mutation;
38 import org.apache.hadoop.hbase.client.Put;
39 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
40 import org.apache.hadoop.mapreduce.JobContext;
41 import org.apache.hadoop.mapreduce.OutputCommitter;
42 import org.apache.hadoop.mapreduce.OutputFormat;
43 import org.apache.hadoop.mapreduce.RecordWriter;
44 import org.apache.hadoop.mapreduce.TaskAttemptContext;
45
46
47
48
49
50
51 @InterfaceAudience.Public
52 @InterfaceStability.Stable
53 public class TableOutputFormat<KEY> extends OutputFormat<KEY, Mutation>
54 implements Configurable {
55
56 private static final Log LOG = LogFactory.getLog(TableOutputFormat.class);
57
58
59 public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
60
61
62
63
64
65
66
67 public static final String QUORUM_ADDRESS = "hbase.mapred.output.quorum";
68
69
70 public static final String QUORUM_PORT = "hbase.mapred.output.quorum.port";
71
72
73 public static final String
74 REGION_SERVER_CLASS = "hbase.mapred.output.rs.class";
75
76 public static final String
77 REGION_SERVER_IMPL = "hbase.mapred.output.rs.impl";
78
79
80 private Configuration conf = null;
81
82
83
84
85 protected class TableRecordWriter
86 extends RecordWriter<KEY, Mutation> {
87
88 private Connection connection;
89 private BufferedMutator mutator;
90
91
92
93
94
95 public TableRecordWriter() throws IOException {
96 String tableName = conf.get(OUTPUT_TABLE);
97 this.connection = ConnectionFactory.createConnection(conf);
98 this.mutator = connection.getBufferedMutator(TableName.valueOf(tableName));
99 LOG.info("Created table instance for " + tableName);
100 }
101
102
103
104
105
106
107
108 @Override
109 public void close(TaskAttemptContext context)
110 throws IOException {
111 mutator.close();
112 connection.close();
113 }
114
115
116
117
118
119
120
121
122
123 @Override
124 public void write(KEY key, Mutation value)
125 throws IOException {
126 if (!(value instanceof Put) && !(value instanceof Delete)) {
127 throw new IOException("Pass a Delete or a Put");
128 }
129 mutator.mutate(value);
130 }
131 }
132
133
134
135
136
137
138
139
140
141 @Override
142 public RecordWriter<KEY, Mutation> getRecordWriter(TaskAttemptContext context)
143 throws IOException, InterruptedException {
144 return new TableRecordWriter();
145 }
146
147
148
149
150
151
152
153
154
155 @Override
156 public void checkOutputSpecs(JobContext context) throws IOException,
157 InterruptedException {
158
159
160 }
161
162
163
164
165
166
167
168
169
170
171 @Override
172 public OutputCommitter getOutputCommitter(TaskAttemptContext context)
173 throws IOException, InterruptedException {
174 return new TableOutputCommitter();
175 }
176
177 @Override
178 public Configuration getConf() {
179 return conf;
180 }
181
182 @Override
183 public void setConf(Configuration otherConf) {
184 this.conf = HBaseConfiguration.create(otherConf);
185
186 String tableName = this.conf.get(OUTPUT_TABLE);
187 if(tableName == null || tableName.length() <= 0) {
188 throw new IllegalArgumentException("Must specify table name");
189 }
190
191 String address = this.conf.get(QUORUM_ADDRESS);
192 int zkClientPort = this.conf.getInt(QUORUM_PORT, 0);
193 String serverClass = this.conf.get(REGION_SERVER_CLASS);
194 String serverImpl = this.conf.get(REGION_SERVER_IMPL);
195
196 try {
197 if (address != null) {
198 ZKUtil.applyClusterKeyToConf(this.conf, address);
199 }
200 if (serverClass != null) {
201 this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
202 }
203 if (zkClientPort != 0) {
204 this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
205 }
206 } catch(IOException e) {
207 LOG.error(e);
208 throw new RuntimeException(e);
209 }
210 }
211 }