1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import java.io.IOException;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.hbase.classification.InterfaceStability;
27 import org.apache.hadoop.conf.Configurable;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.HBaseConfiguration;
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.client.BufferedMutator;
33 import org.apache.hadoop.hbase.client.Connection;
34 import org.apache.hadoop.hbase.client.ConnectionFactory;
35 import org.apache.hadoop.hbase.client.Delete;
36 import org.apache.hadoop.hbase.client.HTable;
37 import org.apache.hadoop.hbase.client.Mutation;
38 import org.apache.hadoop.hbase.client.Put;
39 import org.apache.hadoop.mapreduce.JobContext;
40 import org.apache.hadoop.mapreduce.OutputCommitter;
41 import org.apache.hadoop.mapreduce.OutputFormat;
42 import org.apache.hadoop.mapreduce.RecordWriter;
43 import org.apache.hadoop.mapreduce.TaskAttemptContext;
44
45
46
47
48
49
50 @InterfaceAudience.Public
51 @InterfaceStability.Stable
52 public class TableOutputFormat<KEY> extends OutputFormat<KEY, Mutation>
53 implements Configurable {
54
55 private static final Log LOG = LogFactory.getLog(TableOutputFormat.class);
56
57
58 public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
59
60
61
62
63
64
65
66
67
68 public static final String OUTPUT_CONF_PREFIX = "hbase.mapred.output.";
69
70
71
72
73
74
75
76 public static final String QUORUM_ADDRESS = OUTPUT_CONF_PREFIX + "quorum";
77
78
79 public static final String QUORUM_PORT = OUTPUT_CONF_PREFIX + "quorum.port";
80
81
82 public static final String
83 REGION_SERVER_CLASS = OUTPUT_CONF_PREFIX + "rs.class";
84
85 public static final String
86 REGION_SERVER_IMPL = OUTPUT_CONF_PREFIX + "rs.impl";
87
88
89 private Configuration conf = null;
90
91
92
93
94 protected class TableRecordWriter
95 extends RecordWriter<KEY, Mutation> {
96
97 private Connection connection;
98 private BufferedMutator mutator;
99
100
101
102
103
104 public TableRecordWriter() throws IOException {
105 String tableName = conf.get(OUTPUT_TABLE);
106 this.connection = ConnectionFactory.createConnection(conf);
107 this.mutator = connection.getBufferedMutator(TableName.valueOf(tableName));
108 LOG.info("Created table instance for " + tableName);
109 }
110
111
112
113
114
115
116
117 @Override
118 public void close(TaskAttemptContext context)
119 throws IOException {
120 mutator.close();
121 connection.close();
122 }
123
124
125
126
127
128
129
130
131
132 @Override
133 public void write(KEY key, Mutation value)
134 throws IOException {
135 if (!(value instanceof Put) && !(value instanceof Delete)) {
136 throw new IOException("Pass a Delete or a Put");
137 }
138 mutator.mutate(value);
139 }
140 }
141
142
143
144
145
146
147
148
149
150 @Override
151 public RecordWriter<KEY, Mutation> getRecordWriter(TaskAttemptContext context)
152 throws IOException, InterruptedException {
153 return new TableRecordWriter();
154 }
155
156
157
158
159
160
161
162
163
164 @Override
165 public void checkOutputSpecs(JobContext context) throws IOException,
166 InterruptedException {
167
168
169 }
170
171
172
173
174
175
176
177
178
179
180 @Override
181 public OutputCommitter getOutputCommitter(TaskAttemptContext context)
182 throws IOException, InterruptedException {
183 return new TableOutputCommitter();
184 }
185
186 @Override
187 public Configuration getConf() {
188 return conf;
189 }
190
191 @Override
192 public void setConf(Configuration otherConf) {
193 String tableName = otherConf.get(OUTPUT_TABLE);
194 if(tableName == null || tableName.length() <= 0) {
195 throw new IllegalArgumentException("Must specify table name");
196 }
197
198 String address = otherConf.get(QUORUM_ADDRESS);
199 int zkClientPort = otherConf.getInt(QUORUM_PORT, 0);
200 String serverClass = otherConf.get(REGION_SERVER_CLASS);
201 String serverImpl = otherConf.get(REGION_SERVER_IMPL);
202
203 try {
204 this.conf = HBaseConfiguration.createClusterConf(otherConf, address, OUTPUT_CONF_PREFIX);
205
206 if (serverClass != null) {
207 this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
208 }
209 if (zkClientPort != 0) {
210 this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
211 }
212 } catch(IOException e) {
213 LOG.error(e);
214 throw new RuntimeException(e);
215 }
216 }
217 }