1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import java.io.IOException;
22 import java.util.Collections;
23 import java.util.List;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configurable;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.KeyValue;
30 import org.apache.hadoop.hbase.TableName;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.hbase.classification.InterfaceStability;
33 import org.apache.hadoop.hbase.client.Connection;
34 import org.apache.hadoop.hbase.client.ConnectionFactory;
35 import org.apache.hadoop.hbase.client.RegionLocator;
36 import org.apache.hadoop.hbase.client.Scan;
37 import org.apache.hadoop.hbase.util.Bytes;
38 import org.apache.hadoop.mapreduce.InputSplit;
39 import org.apache.hadoop.mapreduce.JobContext;
40 import org.apache.hadoop.hbase.util.Pair;
41 import org.apache.hadoop.mapreduce.Job;
42 import org.apache.hadoop.util.StringUtils;
43
44
45
46
47 @InterfaceAudience.Public
48 @InterfaceStability.Stable
49 public class TableInputFormat extends TableInputFormatBase
50 implements Configurable {
51
52 @SuppressWarnings("hiding")
53 private static final Log LOG = LogFactory.getLog(TableInputFormat.class);
54
55
56 public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
57
58
59
60
61 private static final String SPLIT_TABLE = "hbase.mapreduce.splittable";
62
63
64
65 public static final String SCAN = "hbase.mapreduce.scan";
66
67 public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start";
68
69 public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop";
70
71 public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
72
73 public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
74
75 public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
76
77 public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
78
79 public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
80
81 public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
82
83 public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
84
85 public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
86
87 public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";
88
89 public static final String SHUFFLE_MAPS = "hbase.mapreduce.inputtable.shufflemaps";
90
91
92 private Configuration conf = null;
93
94
95
96
97
98
99
100 @Override
101 public Configuration getConf() {
102 return conf;
103 }
104
105
106
107
108
109
110
111
112
113 @Override
114 public void setConf(Configuration configuration) {
115 this.conf = configuration;
116
117 Scan scan = null;
118
119 if (conf.get(SCAN) != null) {
120 try {
121 scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
122 } catch (IOException e) {
123 LOG.error("An error occurred.", e);
124 }
125 } else {
126 try {
127 scan = new Scan();
128
129 if (conf.get(SCAN_ROW_START) != null) {
130 scan.setStartRow(Bytes.toBytes(conf.get(SCAN_ROW_START)));
131 }
132
133 if (conf.get(SCAN_ROW_STOP) != null) {
134 scan.setStopRow(Bytes.toBytes(conf.get(SCAN_ROW_STOP)));
135 }
136
137 if (conf.get(SCAN_COLUMNS) != null) {
138 addColumns(scan, conf.get(SCAN_COLUMNS));
139 }
140
141 if (conf.get(SCAN_COLUMN_FAMILY) != null) {
142 scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
143 }
144
145 if (conf.get(SCAN_TIMESTAMP) != null) {
146 scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
147 }
148
149 if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
150 scan.setTimeRange(
151 Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
152 Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
153 }
154
155 if (conf.get(SCAN_MAXVERSIONS) != null) {
156 scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
157 }
158
159 if (conf.get(SCAN_CACHEDROWS) != null) {
160 scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
161 }
162
163 if (conf.get(SCAN_BATCHSIZE) != null) {
164 scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
165 }
166
167
168 scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));
169 } catch (Exception e) {
170 LOG.error(StringUtils.stringifyException(e));
171 }
172 }
173
174 setScan(scan);
175 }
176
177 @Override
178 protected void initialize(JobContext context) throws IOException {
179
180
181 TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE));
182 try {
183 initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName);
184 } catch (Exception e) {
185 LOG.error(StringUtils.stringifyException(e));
186 }
187 }
188
189
190
191
192
193
194
195
196
197
198 private static void addColumn(Scan scan, byte[] familyAndQualifier) {
199 byte [][] fq = KeyValue.parseColumn(familyAndQualifier);
200 if (fq.length == 1) {
201 scan.addFamily(fq[0]);
202 } else if (fq.length == 2) {
203 scan.addColumn(fq[0], fq[1]);
204 } else {
205 throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
206 }
207 }
208
209
210
211
212
213
214
215
216
217
218
219 public static void addColumns(Scan scan, byte [][] columns) {
220 for (byte[] column : columns) {
221 addColumn(scan, column);
222 }
223 }
224
225
226
227
228
229
230
231
232
233
234
235 @Override
236 public List<InputSplit> getSplits(JobContext context) throws IOException {
237 List<InputSplit> splits = super.getSplits(context);
238 if ((conf.get(SHUFFLE_MAPS) != null) && "true".equals(conf.get(SHUFFLE_MAPS).toLowerCase())) {
239 Collections.shuffle(splits);
240 }
241 return splits;
242 }
243
244
245
246
247
248
249
250 private static void addColumns(Scan scan, String columns) {
251 String[] cols = columns.split(" ");
252 for (String col : cols) {
253 addColumn(scan, Bytes.toBytes(col));
254 }
255 }
256
257 @Override
258 protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
259 if (conf.get(SPLIT_TABLE) != null) {
260 TableName splitTableName = TableName.valueOf(conf.get(SPLIT_TABLE));
261 try (Connection conn = ConnectionFactory.createConnection(getConf())) {
262 try (RegionLocator rl = conn.getRegionLocator(splitTableName)) {
263 return rl.getStartEndKeys();
264 }
265 }
266 }
267
268 return super.getStartEndKeys();
269 }
270
271
272
273
274 public static void configureSplitTable(Job job, TableName tableName) {
275 job.getConfiguration().set(SPLIT_TABLE, tableName.getNameAsString());
276 }
277 }