1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapred;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.classification.InterfaceStability;
28 import org.apache.hadoop.hbase.HConstants;
29 import org.apache.hadoop.hbase.TableName;
30 import org.apache.hadoop.hbase.client.Connection;
31 import org.apache.hadoop.hbase.client.HTable;
32 import org.apache.hadoop.hbase.client.Result;
33 import org.apache.hadoop.hbase.client.Table;
34 import org.apache.hadoop.hbase.filter.Filter;
35 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
36 import org.apache.hadoop.mapred.InputFormat;
37 import org.apache.hadoop.mapred.InputSplit;
38 import org.apache.hadoop.mapred.JobConf;
39 import org.apache.hadoop.mapred.RecordReader;
40 import org.apache.hadoop.mapred.Reporter;
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80 @InterfaceAudience.Public
81 @InterfaceStability.Stable
82 public abstract class TableInputFormatBase
83 implements InputFormat<ImmutableBytesWritable, Result> {
84 private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
85 private byte [][] inputColumns;
86 private HTable table;
87 private Connection connection;
88 private TableRecordReader tableRecordReader;
89 private Filter rowFilter;
90
91 private static final String NOT_INITIALIZED = "The input format instance has not been properly " +
92 "initialized. Ensure you call initializeTable either in your constructor or initialize " +
93 "method";
94 private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" +
95 " previous error. Please look at the previous logs lines from" +
96 " the task's full log for more details.";
97
98
99
100
101
102
103
104
105 public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
106 InputSplit split, JobConf job, Reporter reporter)
107 throws IOException {
108
109 if (table == null) {
110 initialize(job);
111 }
112
113 try {
114 if (getTable() == null) {
115
116 throw new IOException(INITIALIZATION_ERROR);
117 }
118 } catch (IllegalStateException exception) {
119 throw new IOException(INITIALIZATION_ERROR, exception);
120 }
121
122 TableSplit tSplit = (TableSplit) split;
123
124 final TableRecordReader trr = this.tableRecordReader == null ? new TableRecordReader() :
125 this.tableRecordReader;
126 trr.setStartRow(tSplit.getStartRow());
127 trr.setEndRow(tSplit.getEndRow());
128 trr.setHTable(this.table);
129 trr.setInputColumns(this.inputColumns);
130 trr.setRowFilter(this.rowFilter);
131 trr.init();
132 return new RecordReader<ImmutableBytesWritable, Result>() {
133
134 @Override
135 public void close() throws IOException {
136 trr.close();
137 closeTable();
138 }
139
140 @Override
141 public ImmutableBytesWritable createKey() {
142 return trr.createKey();
143 }
144
145 @Override
146 public Result createValue() {
147 return trr.createValue();
148 }
149
150 @Override
151 public long getPos() throws IOException {
152 return trr.getPos();
153 }
154
155 @Override
156 public float getProgress() throws IOException {
157 return trr.getProgress();
158 }
159
160 @Override
161 public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
162 return trr.next(key, value);
163 }
164 };
165 }
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185 public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
186 if (this.table == null) {
187 initialize(job);
188 }
189
190 try {
191 if (getTable() == null) {
192
193 throw new IOException(INITIALIZATION_ERROR);
194 }
195 } catch (IllegalStateException exception) {
196 throw new IOException(INITIALIZATION_ERROR, exception);
197 }
198
199 byte [][] startKeys = this.table.getStartKeys();
200 if (startKeys == null || startKeys.length == 0) {
201 throw new IOException("Expecting at least one region");
202 }
203 if (this.inputColumns == null || this.inputColumns.length == 0) {
204 throw new IOException("Expecting at least one column");
205 }
206 int realNumSplits = numSplits > startKeys.length? startKeys.length:
207 numSplits;
208 InputSplit[] splits = new InputSplit[realNumSplits];
209 int middle = startKeys.length / realNumSplits;
210 int startPos = 0;
211 for (int i = 0; i < realNumSplits; i++) {
212 int lastPos = startPos + middle;
213 lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
214 String regionLocation = table.getRegionLocation(startKeys[startPos]).
215 getHostname();
216 splits[i] = new TableSplit(this.table.getName(),
217 startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]:
218 HConstants.EMPTY_START_ROW, regionLocation);
219 LOG.info("split: " + i + "->" + splits[i]);
220 startPos = lastPos;
221 }
222 return splits;
223 }
224
225
226
227
228
229
230
231
232 protected void initializeTable(Connection connection, TableName tableName) throws IOException {
233 if (this.table != null || this.connection != null) {
234 LOG.warn("initializeTable called multiple times. Overwriting connection and table " +
235 "reference; TableInputFormatBase will not close these old references when done.");
236 }
237 this.table = (HTable) connection.getTable(tableName);
238 this.connection = connection;
239 }
240
241
242
243
244 protected void setInputColumns(byte [][] inputColumns) {
245 this.inputColumns = inputColumns;
246 }
247
248
249
250
251
252 @Deprecated
253 protected HTable getHTable() {
254 return (HTable) getTable();
255 }
256
257
258
259
260 protected Table getTable() {
261 if (table == null) {
262 throw new IllegalStateException(NOT_INITIALIZED);
263 }
264 return this.table;
265 }
266
267
268
269
270
271
272
273 @Deprecated
274 protected void setHTable(HTable table) {
275 this.table = table;
276 }
277
278
279
280
281
282
283
284 protected void setTableRecordReader(TableRecordReader tableRecordReader) {
285 this.tableRecordReader = tableRecordReader;
286 }
287
288
289
290
291
292
293 protected void setRowFilter(Filter rowFilter) {
294 this.rowFilter = rowFilter;
295 }
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312 protected void initialize(JobConf job) throws IOException {
313 }
314
315
316
317
318
319
320
321 protected void closeTable() throws IOException {
322 close(table, connection);
323 table = null;
324 connection = null;
325 }
326
327 private void close(Closeable... closables) throws IOException {
328 for (Closeable c : closables) {
329 if(c != null) { c.close(); }
330 }
331 }
332 }