/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;

/**
 * Convert HBase tabular data into a format that is consumable by Map/Reduce.
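 * <p>
 * A job is typically wired up through
 * {@link TableMapReduceUtil#initTableMapperJob}, but the input can also be
 * configured directly through the <code>hbase.mapreduce.*</code> properties
 * defined below. A minimal sketch (the table and family names are
 * illustrative only):
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * conf.set(TableInputFormat.INPUT_TABLE, "mytable");
 * conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "cf");
 * Job job = new Job(conf, "scan-mytable");
 * job.setInputFormatClass(TableInputFormat.class);
 * </pre>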
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TableInputFormat extends TableInputFormatBase
    implements Configurable {

  private static final Log LOG = LogFactory.getLog(TableInputFormat.class);

  /** Job parameter that specifies the input table. */
  public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
  /** Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified.
   * See {@link TableMapReduceUtil#convertScanToString(Scan)} for more details.
   */
  public static final String SCAN = "hbase.mapreduce.scan";
  /** Scan start row. */
  public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start";
  /** Scan stop row. */
  public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop";
  /** Column family to scan. */
  public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
  /** Space-delimited list of columns and column families to scan, e.g. "cf1:qual1 cf1:qual2 cf2". */
  public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
  /** The timestamp used to filter columns with a specific timestamp. */
  public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
  /** The starting timestamp used to filter columns with a specific range of versions. */
  public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
  /** The ending timestamp used to filter columns with a specific range of versions. */
  public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
  /** The maximum number of versions to return. */
  public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
  /** Set to false to disable server-side caching of blocks for this scan. */
  public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
  /** The number of rows for caching that will be passed to scanners. */
  public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
  /** Set the maximum number of values to return for each call to next(). */
  public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";

  /** The configuration. */
  private Configuration conf = null;

  /**
   * Returns the current configuration.
   *
   * @return The current configuration.
   * @see org.apache.hadoop.conf.Configurable#getConf()
   */
  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Sets the configuration. This is used to set the details for the table to
   * be scanned.
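   * <p>
   * At minimum {@link #INPUT_TABLE} must be set; the other
   * <code>hbase.mapreduce.scan.*</code> keys are optional. For example
   * (values are illustrative only):
   * <pre>
   * conf.set(TableInputFormat.INPUT_TABLE, "mytable");
   * conf.set(TableInputFormat.SCAN_ROW_START, "row-0000");
   * conf.set(TableInputFormat.SCAN_ROW_STOP, "row-9999");
   * conf.set(TableInputFormat.SCAN_MAXVERSIONS, "1");
   * </pre>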
   *
   * @param configuration  The configuration to set.
   * @see org.apache.hadoop.conf.Configurable#setConf(
   *   org.apache.hadoop.conf.Configuration)
   */
  @Override
  public void setConf(Configuration configuration) {
    this.conf = configuration;
    String tableName = conf.get(INPUT_TABLE);
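    // Open a handle on the input table. Failures here are logged rather than
    // rethrown; a missing or unreachable table will surface later when the
    // job computes its input splits.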
    try {
      setHTable(new HTable(new Configuration(conf), tableName));
    } catch (Exception e) {
      LOG.error(StringUtils.stringifyException(e));
    }

    Scan scan = null;

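    // A serialized Scan in the SCAN property takes precedence; otherwise the
    // scan is assembled from the individual SCAN_* properties below.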
    if (conf.get(SCAN) != null) {
      try {
        scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
      } catch (IOException e) {
        LOG.error("An error occurred.", e);
      }
    } else {
      try {
        scan = new Scan();

        if (conf.get(SCAN_ROW_START) != null) {
          scan.setStartRow(Bytes.toBytes(conf.get(SCAN_ROW_START)));
        }

        if (conf.get(SCAN_ROW_STOP) != null) {
          scan.setStopRow(Bytes.toBytes(conf.get(SCAN_ROW_STOP)));
        }

        if (conf.get(SCAN_COLUMNS) != null) {
          addColumns(scan, conf.get(SCAN_COLUMNS));
        }

        if (conf.get(SCAN_COLUMN_FAMILY) != null) {
          scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
        }

        if (conf.get(SCAN_TIMESTAMP) != null) {
          scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
        }

        if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
          scan.setTimeRange(
              Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
              Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
        }

        if (conf.get(SCAN_MAXVERSIONS) != null) {
          scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
        }

        if (conf.get(SCAN_CACHEDROWS) != null) {
          scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
        }

        if (conf.get(SCAN_BATCHSIZE) != null) {
          scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
        }

        // Block caching is off by default; caching every block of a full
        // table scan would churn the server-side block cache.
        scan.setCacheBlocks(conf.getBoolean(SCAN_CACHEBLOCKS, false));
      } catch (Exception e) {
        LOG.error(StringUtils.stringifyException(e));
      }
    }

    setScan(scan);
  }

  /**
   * Parses a combined family and qualifier and adds either both or just the
   * family in case there is no qualifier. This assumes the older colon
   * divided notation, e.g. "family:qualifier".
   *
   * @param scan The Scan to update.
   * @param familyAndQualifier family and qualifier
   * @throws IllegalArgumentException When familyAndQualifier is invalid.
   */
  private static void addColumn(Scan scan, byte[] familyAndQualifier) {
    byte[][] fq = KeyValue.parseColumn(familyAndQualifier);
    if (fq.length == 1) {
      scan.addFamily(fq[0]);
    } else if (fq.length == 2) {
      scan.addColumn(fq[0], fq[1]);
    } else {
      throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
    }
  }

  /**
   * Adds an array of columns specified using old format, family:qualifier.
   * <p>
   * Overrides previous calls to {@link Scan#addColumn(byte[], byte[])} for any families in the
   * input.
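   * <p>
   * For example (the column names are illustrative only):
   * <pre>
   * TableInputFormat.addColumns(scan, new byte[][] {
   *     Bytes.toBytes("info:name"), Bytes.toBytes("info:age") });
   * </pre>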
   *
   * @param scan The Scan to update.
   * @param columns array of columns, formatted as <code>family:qualifier</code>
   * @see Scan#addColumn(byte[], byte[])
   */
  public static void addColumns(Scan scan, byte[][] columns) {
    for (byte[] column : columns) {
      addColumn(scan, column);
    }
  }

  /**
   * Convenience method to parse a string representation of an array of column specifiers.
   *
   * @param scan The Scan to update.
   * @param columns Space-delimited list of column specifiers, e.g. "cf1:qual1 cf1:qual2 cf2".
   */
  private static void addColumns(Scan scan, String columns) {
    String[] cols = columns.split(" ");
    for (String col : cols) {
      addColumn(scan, Bytes.toBytes(col));
    }
  }

}