1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RegionSizeCalculator;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
51
52
53
54
55
56 @InterfaceAudience.Public
57 @InterfaceStability.Evolving
58 public abstract class MultiTableInputFormatBase extends
59 InputFormat<ImmutableBytesWritable, Result> {
60
61 final Log LOG = LogFactory.getLog(MultiTableInputFormatBase.class);
62
63
64 private List<Scan> scans;
65
66
67 private TableRecordReader tableRecordReader = null;
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82 @Override
83 public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
84 InputSplit split, TaskAttemptContext context)
85 throws IOException, InterruptedException {
86 TableSplit tSplit = (TableSplit) split;
87 LOG.info(MessageFormat.format("Input split length: {0} bytes.", tSplit.getLength()));
88
89 if (tSplit.getTable() == null) {
90 throw new IOException("Cannot create a record reader because of a"
91 + " previous error. Please look at the previous logs lines from"
92 + " the task's full log for more details.");
93 }
94 final Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
95 Table table = connection.getTable(tSplit.getTable());
96
97 if (this.tableRecordReader == null) {
98 this.tableRecordReader = new TableRecordReader();
99 }
100 final TableRecordReader trr = this.tableRecordReader;
101
102 try {
103 Scan sc = tSplit.getScan();
104 sc.setStartRow(tSplit.getStartRow());
105 sc.setStopRow(tSplit.getEndRow());
106 trr.setScan(sc);
107 trr.setTable(table);
108 return new RecordReader<ImmutableBytesWritable, Result>() {
109
110 @Override
111 public void close() throws IOException {
112 trr.close();
113 if (connection != null) {
114 connection.close();
115 }
116 }
117
118 @Override
119 public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
120 return trr.getCurrentKey();
121 }
122
123 @Override
124 public Result getCurrentValue() throws IOException, InterruptedException {
125 return trr.getCurrentValue();
126 }
127
128 @Override
129 public float getProgress() throws IOException, InterruptedException {
130 return trr.getProgress();
131 }
132
133 @Override
134 public void initialize(InputSplit inputsplit, TaskAttemptContext context)
135 throws IOException, InterruptedException {
136 trr.initialize(inputsplit, context);
137 }
138
139 @Override
140 public boolean nextKeyValue() throws IOException, InterruptedException {
141 return trr.nextKeyValue();
142 }
143 };
144 } catch (IOException ioe) {
145
146
147 trr.close();
148 if (connection != null) {
149 connection.close();
150 }
151 throw ioe;
152 }
153 }
154
155
156
157
158
159
160
161
162
163
164 @Override
165 public List<InputSplit> getSplits(JobContext context) throws IOException {
166 if (scans.isEmpty()) {
167 throw new IOException("No scans were provided.");
168 }
169
170 Map<TableName, List<Scan>> tableMaps = new HashMap<TableName, List<Scan>>();
171 for (Scan scan : scans) {
172 byte[] tableNameBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME);
173 if (tableNameBytes == null)
174 throw new IOException("A scan object did not have a table name");
175
176 TableName tableName = TableName.valueOf(tableNameBytes);
177
178 List<Scan> scanList = tableMaps.get(tableName);
179 if (scanList == null) {
180 scanList = new ArrayList<Scan>();
181 tableMaps.put(tableName, scanList);
182 }
183 scanList.add(scan);
184 }
185
186 List<InputSplit> splits = new ArrayList<InputSplit>();
187 Iterator iter = tableMaps.entrySet().iterator();
188 while (iter.hasNext()) {
189 Map.Entry<TableName, List<Scan>> entry = (Map.Entry<TableName, List<Scan>>) iter.next();
190 TableName tableName = entry.getKey();
191 List<Scan> scanList = entry.getValue();
192
193 try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration());
194 Table table = conn.getTable(tableName);
195 RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
196 RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(
197 regionLocator, conn.getAdmin());
198 Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
199 for (Scan scan : scanList) {
200 if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
201 throw new IOException("Expecting at least one region for table : "
202 + tableName.getNameAsString());
203 }
204 int count = 0;
205
206 byte[] startRow = scan.getStartRow();
207 byte[] stopRow = scan.getStopRow();
208
209 for (int i = 0; i < keys.getFirst().length; i++) {
210 if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
211 continue;
212 }
213
214 if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
215 Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
216 (stopRow.length == 0 || Bytes.compareTo(stopRow,
217 keys.getFirst()[i]) > 0)) {
218 byte[] splitStart = startRow.length == 0 ||
219 Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
220 keys.getFirst()[i] : startRow;
221 byte[] splitStop = (stopRow.length == 0 ||
222 Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
223 keys.getSecond()[i].length > 0 ?
224 keys.getSecond()[i] : stopRow;
225
226 HRegionLocation hregionLocation = regionLocator.getRegionLocation(
227 keys.getFirst()[i], false);
228 String regionHostname = hregionLocation.getHostname();
229 HRegionInfo regionInfo = hregionLocation.getRegionInfo();
230 long regionSize = sizeCalculator.getRegionSize(
231 regionInfo.getRegionName());
232
233 TableSplit split = new TableSplit(table.getName(),
234 scan, splitStart, splitStop, regionHostname, regionSize);
235
236 splits.add(split);
237
238 if (LOG.isDebugEnabled())
239 LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
240 }
241 }
242 }
243 }
244 }
245
246 return splits;
247 }
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271 protected boolean includeRegionInSplit(final byte[] startKey,
272 final byte[] endKey) {
273 return true;
274 }
275
276
277
278
279 protected List<Scan> getScans() {
280 return this.scans;
281 }
282
283
284
285
286
287
288 protected void setScans(List<Scan> scans) {
289 this.scans = scans;
290 }
291
292
293
294
295
296
297
298 protected void setTableRecordReader(TableRecordReader tableRecordReader) {
299 this.tableRecordReader = tableRecordReader;
300 }
301 }