1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapred;
20
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.util.StringUtils;
37
38
39
40
41 @InterfaceAudience.Public
42 @InterfaceStability.Stable
43 public class TableInputFormat extends TableInputFormatBase implements
44 JobConfigurable {
45 private static final Log LOG = LogFactory.getLog(TableInputFormat.class);
46
47
48
49
50 public static final String COLUMN_LIST = "hbase.mapred.tablecolumns";
51
52 public void configure(JobConf job) {
53 try {
54 initialize(job);
55 } catch (Exception e) {
56 LOG.error(StringUtils.stringifyException(e));
57 }
58 }
59
60 @Override
61 protected void initialize(JobConf job) throws IOException {
62 Path[] tableNames = FileInputFormat.getInputPaths(job);
63 String colArg = job.get(COLUMN_LIST);
64 String[] colNames = colArg.split(" ");
65 byte [][] m_cols = new byte[colNames.length][];
66 for (int i = 0; i < m_cols.length; i++) {
67 m_cols[i] = Bytes.toBytes(colNames[i]);
68 }
69 setInputColumns(m_cols);
70 Connection connection = ConnectionFactory.createConnection(job);
71 initializeTable(connection, TableName.valueOf(tableNames[0].getName()));
72 }
73
74 public void validateInput(JobConf job) throws IOException {
75
76 Path [] tableNames = FileInputFormat.getInputPaths(job);
77 if (tableNames == null || tableNames.length > 1) {
78 throw new IOException("expecting one table name");
79 }
80
81
82 if (getHTable() == null) {
83 throw new IOException("could not connect to table '" +
84 tableNames[0].getName() + "'");
85 }
86
87
88 String colArg = job.get(COLUMN_LIST);
89 if (colArg == null || colArg.length() == 0) {
90 throw new IOException("expecting at least one column");
91 }
92 }
93 }