/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import java.io.IOException;
22 import java.util.Collections;
23 import java.util.List;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configurable;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.KeyValue;
30 import org.apache.hadoop.hbase.TableName;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.hbase.classification.InterfaceStability;
33 import org.apache.hadoop.hbase.client.Connection;
34 import org.apache.hadoop.hbase.client.ConnectionFactory;
35 import org.apache.hadoop.hbase.client.RegionLocator;
36 import org.apache.hadoop.hbase.client.Scan;
37 import org.apache.hadoop.hbase.util.Bytes;
38 import org.apache.hadoop.mapreduce.InputSplit;
39 import org.apache.hadoop.mapreduce.JobContext;
40 import org.apache.hadoop.hbase.util.Pair;
41 import org.apache.hadoop.mapreduce.Job;
42 import org.apache.hadoop.util.StringUtils;
43
44
45
46
47 @InterfaceAudience.Public
48 @InterfaceStability.Stable
49 public class TableInputFormat extends TableInputFormatBase
50 implements Configurable {
51
52 @SuppressWarnings("hiding")
53 private static final Log LOG = LogFactory.getLog(TableInputFormat.class);
54
55
56 public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
57
58
59
60
61 private static final String SPLIT_TABLE = "hbase.mapreduce.splittable";
62
63
64
65 public static final String SCAN = "hbase.mapreduce.scan";
66
67 public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start";
68
69 public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop";
70
71 public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
72
73 public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
74
75 public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
76
77 public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
78
79 public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
80
81 public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
82
83 public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
84
85 public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
86
87 public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";
88
89 public static final String SHUFFLE_MAPS = "hbase.mapreduce.inputtable.shufflemaps";
90
91
92 private Configuration conf = null;
93
94
95
96
97
98
99
100 @Override
101 public Configuration getConf() {
102 return conf;
103 }
104
105
106
107
108
109
110
111
112
113 @Override
114 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
115 justification="Intentional")
116 public void setConf(Configuration configuration) {
117 this.conf = configuration;
118
119 Scan scan = null;
120
121 if (conf.get(SCAN) != null) {
122 try {
123 scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
124 } catch (IOException e) {
125 LOG.error("An error occurred.", e);
126 }
127 } else {
128 try {
129 scan = new Scan();
130
131 if (conf.get(SCAN_ROW_START) != null) {
132 scan.setStartRow(Bytes.toBytes(conf.get(SCAN_ROW_START)));
133 }
134
135 if (conf.get(SCAN_ROW_STOP) != null) {
136 scan.setStopRow(Bytes.toBytes(conf.get(SCAN_ROW_STOP)));
137 }
138
139 if (conf.get(SCAN_COLUMNS) != null) {
140 addColumns(scan, conf.get(SCAN_COLUMNS));
141 }
142
143 if (conf.get(SCAN_COLUMN_FAMILY) != null) {
144 scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
145 }
146
147 if (conf.get(SCAN_TIMESTAMP) != null) {
148 scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
149 }
150
151 if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
152 scan.setTimeRange(
153 Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
154 Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
155 }
156
157 if (conf.get(SCAN_MAXVERSIONS) != null) {
158 scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
159 }
160
161 if (conf.get(SCAN_CACHEDROWS) != null) {
162 scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
163 }
164
165 if (conf.get(SCAN_BATCHSIZE) != null) {
166 scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
167 }
168
169
170 scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));
171 } catch (Exception e) {
172 LOG.error(StringUtils.stringifyException(e));
173 }
174 }
175
176 setScan(scan);
177 }
178
179 @Override
180 protected void initialize(JobContext context) throws IOException {
181
182
183 TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE));
184 try {
185 initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName);
186 } catch (Exception e) {
187 LOG.error(StringUtils.stringifyException(e));
188 }
189 }
190
191
192
193
194
195
196
197
198
199
200 private static void addColumn(Scan scan, byte[] familyAndQualifier) {
201 byte [][] fq = KeyValue.parseColumn(familyAndQualifier);
202 if (fq.length == 1) {
203 scan.addFamily(fq[0]);
204 } else if (fq.length == 2) {
205 scan.addColumn(fq[0], fq[1]);
206 } else {
207 throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
208 }
209 }
210
211
212
213
214
215
216
217
218
219
220
221 public static void addColumns(Scan scan, byte [][] columns) {
222 for (byte[] column : columns) {
223 addColumn(scan, column);
224 }
225 }
226
227
228
229
230
231
232
233
234
235
236
237 @Override
238 public List<InputSplit> getSplits(JobContext context) throws IOException {
239 List<InputSplit> splits = super.getSplits(context);
240 if ((conf.get(SHUFFLE_MAPS) != null) && "true".equals(conf.get(SHUFFLE_MAPS).toLowerCase())) {
241 Collections.shuffle(splits);
242 }
243 return splits;
244 }
245
246
247
248
249
250
251
252 private static void addColumns(Scan scan, String columns) {
253 String[] cols = columns.split(" ");
254 for (String col : cols) {
255 addColumn(scan, Bytes.toBytes(col));
256 }
257 }
258
259 @Override
260 protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
261 if (conf.get(SPLIT_TABLE) != null) {
262 TableName splitTableName = TableName.valueOf(conf.get(SPLIT_TABLE));
263 try (Connection conn = ConnectionFactory.createConnection(getConf())) {
264 try (RegionLocator rl = conn.getRegionLocator(splitTableName)) {
265 return rl.getStartEndKeys();
266 }
267 }
268 }
269
270 return super.getStartEndKeys();
271 }
272
273
274
275
276 public static void configureSplitTable(Job job, TableName tableName) {
277 job.getConfiguration().set(SPLIT_TABLE, tableName.getNameAsString());
278 }
279 }