/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.util.StringUtils;

/**
 * Convert HBase tabular data into a format that is consumable by Map/Reduce.
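 * <p>
 * A minimal usage sketch (the mapper class, output types, and table name below are hypothetical):
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * Job job = Job.getInstance(conf, "scan mytable");
 * Scan scan = new Scan();
 * scan.setCaching(500);        // rows per scanner RPC; see SCAN_CACHEDROWS
 * scan.setCacheBlocks(false);  // recommended for full-table MapReduce scans
 * TableMapReduceUtil.initTableMapperJob("mytable", scan,
 *     MyMapper.class, Text.class, IntWritable.class, job);
 * </pre>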
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TableInputFormat extends TableInputFormatBase
implements Configurable {

  @SuppressWarnings("hiding")
  private static final Log LOG = LogFactory.getLog(TableInputFormat.class);

  /** Job parameter that specifies the input table. */
  public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
  /**
   * If specified, use start keys of this table to split.
   * This is useful when you are preparing data for bulkload.
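   * <p>
   * Typically set via {@link #configureSplitTable(Job, TableName)}; an illustrative sketch
   * (the table name is hypothetical):
   * <pre>
   * TableInputFormat.configureSplitTable(job, TableName.valueOf("presplit_target_table"));
   * </pre>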
   */
  private static final String SPLIT_TABLE = "hbase.mapreduce.splittable";
  /** Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified.
   * See {@link TableMapReduceUtil#convertScanToString(Scan)} for more details.
   */
  public static final String SCAN = "hbase.mapreduce.scan";
  /** Scan start row */
  public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start";
  /** Scan stop row */
  public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop";
  /** Column Family to Scan */
  public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
  /** Space delimited list of columns and column families to scan. */
  public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
  /** The timestamp used to filter columns with a specific timestamp. */
  public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
  /** The starting timestamp used to filter columns with a specific range of versions. */
  public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
  /** The ending timestamp used to filter columns with a specific range of versions. */
  public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
  /** The maximum number of versions to return. */
  public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
  /** Set to false to disable server-side caching of blocks for this scan. */
  public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
  /** The number of rows for caching that will be passed to scanners. */
  public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
  /** Set the maximum number of values to return for each call to next(). */
  public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";
  /** Specifies whether to shuffle the map tasks. */
  public static final String SHUFFLE_MAPS = "hbase.mapreduce.inputtable.shufflemaps";

  /** The configuration. */
  private Configuration conf = null;

  /**
   * Returns the current configuration.
   *
   * @return The current configuration.
   * @see org.apache.hadoop.conf.Configurable#getConf()
   */
  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Sets the configuration. This is used to set the details for the table to
   * be scanned.
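   * <p>
   * At a minimum the {@link #INPUT_TABLE} property must be set; the {@link Scan} is then
   * either deserialized from {@link #SCAN} or assembled from the individual
   * <code>SCAN_*</code> properties. An illustrative sketch (the table and family names are
   * hypothetical):
   * <pre>
   * conf.set(TableInputFormat.INPUT_TABLE, "mytable");
   * conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "cf");
   * </pre>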
   *
   * @param configuration  The configuration to set.
   * @see org.apache.hadoop.conf.Configurable#setConf(
   *   org.apache.hadoop.conf.Configuration)
   */
  @Override
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
    justification="Intentional")
  public void setConf(Configuration configuration) {
    this.conf = configuration;

    Scan scan = null;

    if (conf.get(SCAN) != null) {
      try {
        scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
      } catch (IOException e) {
        LOG.error("An error occurred.", e);
      }
    } else {
      try {
        scan = createScanFromConfiguration(conf);
      } catch (Exception e) {
        LOG.error(StringUtils.stringifyException(e));
      }
    }

    setScan(scan);
  }

  /**
   * Sets up a {@link Scan} instance, applying settings from the configuration property
   * constants defined in {@code TableInputFormat}.  This allows specifying things such as:
   * <ul>
   *   <li>start and stop rows</li>
   *   <li>column qualifiers or families</li>
   *   <li>timestamps or timerange</li>
   *   <li>scanner caching and batch size</li>
   * </ul>
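   * <p>
   * An illustrative sketch (the row key, family, and caching values are hypothetical):
   * <pre>
   * Configuration conf = HBaseConfiguration.create();
   * conf.set(TableInputFormat.SCAN_ROW_START, "startRow");
   * conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "cf");
   * conf.set(TableInputFormat.SCAN_CACHEDROWS, "500");
   * Scan scan = TableInputFormat.createScanFromConfiguration(conf);
   * </pre>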
   */
  public static Scan createScanFromConfiguration(Configuration conf) throws IOException {
    Scan scan = new Scan();

    if (conf.get(SCAN_ROW_START) != null) {
      scan.setStartRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_START)));
    }

    if (conf.get(SCAN_ROW_STOP) != null) {
      scan.setStopRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_STOP)));
    }

    if (conf.get(SCAN_COLUMNS) != null) {
      addColumns(scan, conf.get(SCAN_COLUMNS));
    }

    if (conf.get(SCAN_COLUMN_FAMILY) != null) {
      scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
    }

    if (conf.get(SCAN_TIMESTAMP) != null) {
      scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
    }

    if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
      scan.setTimeRange(
          Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
          Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
    }

    if (conf.get(SCAN_MAXVERSIONS) != null) {
      scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
    }

    if (conf.get(SCAN_CACHEDROWS) != null) {
      scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
    }

    if (conf.get(SCAN_BATCHSIZE) != null) {
      scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
    }

    // false by default: full table scans generate too much block cache churn
    scan.setCacheBlocks(conf.getBoolean(SCAN_CACHEBLOCKS, false));

    return scan;
  }

  @Override
  protected void initialize(JobContext context) throws IOException {
    // Do we have to worry about mis-matches between the Configuration from setConf and the one
    // in this context?
    TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE));
    try {
      initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName);
    } catch (Exception e) {
      LOG.error(StringUtils.stringifyException(e));
    }
  }

  /**
   * Parses a combined family and qualifier and adds either both or just the
   * family in case there is no qualifier. This assumes the older colon
   * divided notation, e.g. "family:qualifier".
   *
   * @param scan The Scan to update.
   * @param familyAndQualifier family and qualifier
   * @throws IllegalArgumentException When familyAndQualifier is invalid.
   */
  private static void addColumn(Scan scan, byte[] familyAndQualifier) {
    byte[][] fq = KeyValue.parseColumn(familyAndQualifier);
    if (fq.length == 1) {
      scan.addFamily(fq[0]);
    } else if (fq.length == 2) {
      scan.addColumn(fq[0], fq[1]);
    } else {
      throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
    }
  }

  /**
   * Adds an array of columns specified using old format, family:qualifier.
   * <p>
   * Overrides previous calls to {@link Scan#addColumn(byte[], byte[])} for any families in the
   * input.
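   * <p>
   * An illustrative sketch (the family and qualifier names are hypothetical):
   * <pre>
   * byte[][] columns = {
   *     Bytes.toBytes("info:name"),
   *     Bytes.toBytes("info:age")
   * };
   * TableInputFormat.addColumns(scan, columns);
   * </pre>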
   *
   * @param scan The Scan to update.
   * @param columns array of columns, formatted as <code>family:qualifier</code>
   * @see Scan#addColumn(byte[], byte[])
   */
  public static void addColumns(Scan scan, byte[][] columns) {
    for (byte[] column : columns) {
      addColumn(scan, column);
    }
  }

  /**
   * Calculates the splits that will serve as input for the map tasks. The
   * number of splits matches the number of regions in a table. Splits are shuffled if
   * required.
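   * <p>
   * Shuffling is enabled through the {@link #SHUFFLE_MAPS} property; an illustrative sketch:
   * <pre>
   * job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
   * </pre>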
   * @param context  The current job context.
   * @return The list of input splits.
   * @throws IOException When creating the list of splits fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
   *   org.apache.hadoop.mapreduce.JobContext)
   */
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    List<InputSplit> splits = super.getSplits(context);
    if ((conf.get(SHUFFLE_MAPS) != null)
        && "true".equals(conf.get(SHUFFLE_MAPS).toLowerCase(Locale.ROOT))) {
      Collections.shuffle(splits);
    }
    return splits;
  }

  /**
   * Convenience method to parse a string representation of an array of column specifiers.
   *
   * @param scan The Scan to update.
   * @param columns  The columns to parse.
   */
  private static void addColumns(Scan scan, String columns) {
    String[] cols = columns.split(" ");
    for (String col : cols) {
      addColumn(scan, Bytes.toBytes(col));
    }
  }

  @Override
  protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
    if (conf.get(SPLIT_TABLE) != null) {
      TableName splitTableName = TableName.valueOf(conf.get(SPLIT_TABLE));
      try (Connection conn = ConnectionFactory.createConnection(getConf())) {
        try (RegionLocator rl = conn.getRegionLocator(splitTableName)) {
          return rl.getStartEndKeys();
        }
      }
    }

    return super.getStartEndKeys();
  }

  /**
   * Sets the split table in the map-reduce job.
   */
  public static void configureSplitTable(Job job, TableName tableName) {
    job.getConfiguration().set(SPLIT_TABLE, tableName.getNameAsString());
  }
}