
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.util.StringUtils;

/**
 * Convert HBase tabular data into a format that is consumable by Map/Reduce.
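 * <p>
 * A minimal usage sketch (the table name, column family, mapper class, and mapper output
 * key/value classes below are illustrative placeholders, not part of this API); most jobs
 * wire this input format up through {@link TableMapReduceUtil} rather than by setting the
 * configuration keys directly:
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * Job job = Job.getInstance(conf, "scan-my-table");
 * Scan scan = new Scan();
 * scan.addFamily(Bytes.toBytes("cf"));
 * TableMapReduceUtil.initTableMapperJob("my_table", scan, MyMapper.class,
 *     ImmutableBytesWritable.class, Result.class, job);
 * </pre>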
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TableInputFormat extends TableInputFormatBase
implements Configurable {

  @SuppressWarnings("hiding")
  private static final Log LOG = LogFactory.getLog(TableInputFormat.class);

  /** Job parameter that specifies the input table. */
  public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
  /**
   * If specified, use the start keys of this table to split.
   * This is useful when you are preparing data for bulkload.
   */
  private static final String SPLIT_TABLE = "hbase.mapreduce.splittable";
  /** Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified.
   * See {@link TableMapReduceUtil#convertScanToString(Scan)} for more details.
   */
  public static final String SCAN = "hbase.mapreduce.scan";
  /** Scan start row */
  public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start";
  /** Scan stop row */
  public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop";
  /** Column family to scan */
  public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
  /** Space-delimited list of columns and column families to scan. */
  public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
  /** The timestamp used to filter columns with a specific timestamp. */
  public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
  /** The starting timestamp used to filter columns with a specific range of versions. */
  public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
  /** The ending timestamp used to filter columns with a specific range of versions. */
  public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
  /** The maximum number of versions to return. */
  public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
  /** Set to false to disable server-side caching of blocks for this scan. */
  public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
  /** The number of rows for caching that will be passed to scanners. */
  public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
  /** The maximum number of values to return for each call to next(). */
  public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";
  /** Specifies whether to shuffle the map tasks. */
  public static final String SHUFFLE_MAPS = "hbase.mapreduce.inputtable.shufflemaps";
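
  /*
   * A sketch of configuring a scan through these keys directly (the table, family, row keys,
   * and caching value below are illustrative); the SCAN_* keys are only consulted when no
   * serialized scan has been set under SCAN:
   *
   *   Configuration conf = job.getConfiguration();
   *   conf.set(TableInputFormat.INPUT_TABLE, "my_table");
   *   conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "cf");
   *   conf.set(TableInputFormat.SCAN_ROW_START, "row-0000");
   *   conf.set(TableInputFormat.SCAN_ROW_STOP, "row-9999");
   *   conf.set(TableInputFormat.SCAN_CACHEDROWS, "500");
   */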

  /** The configuration. */
  private Configuration conf = null;

  /**
   * Returns the current configuration.
   *
   * @return The current configuration.
   * @see org.apache.hadoop.conf.Configurable#getConf()
   */
  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Sets the configuration. This is used to set the details for the table to
   * be scanned.
   *
   * @param configuration  The configuration to set.
   * @see org.apache.hadoop.conf.Configurable#setConf(
   *   org.apache.hadoop.conf.Configuration)
   */
  @Override
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
    justification="Intentional")
  public void setConf(Configuration configuration) {
    this.conf = configuration;

    Scan scan = null;

    if (conf.get(SCAN) != null) {
      try {
        scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
      } catch (IOException e) {
        LOG.error("An error occurred.", e);
      }
    } else {
      try {
        scan = new Scan();

        if (conf.get(SCAN_ROW_START) != null) {
          scan.setStartRow(Bytes.toBytes(conf.get(SCAN_ROW_START)));
        }

        if (conf.get(SCAN_ROW_STOP) != null) {
          scan.setStopRow(Bytes.toBytes(conf.get(SCAN_ROW_STOP)));
        }

        if (conf.get(SCAN_COLUMNS) != null) {
          addColumns(scan, conf.get(SCAN_COLUMNS));
        }

        if (conf.get(SCAN_COLUMN_FAMILY) != null) {
          scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
        }

        if (conf.get(SCAN_TIMESTAMP) != null) {
          scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
        }

        if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
          scan.setTimeRange(
              Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
              Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
        }

        if (conf.get(SCAN_MAXVERSIONS) != null) {
          scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
        }

        if (conf.get(SCAN_CACHEDROWS) != null) {
          scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
        }

        if (conf.get(SCAN_BATCHSIZE) != null) {
          scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
        }

        // false by default, full table scans generate too much block cache churn
        scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));
      } catch (Exception e) {
        LOG.error(StringUtils.stringifyException(e));
      }
    }

    setScan(scan);
  }

  @Override
  protected void initialize(JobContext context) throws IOException {
    // Do we have to worry about mis-matches between the Configuration from setConf and the one
    // in this context?
    TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE));
    try {
      initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName);
    } catch (Exception e) {
      LOG.error(StringUtils.stringifyException(e));
    }
  }

  /**
   * Parses a combined family and qualifier and adds either both or just the
   * family in case there is no qualifier. This assumes the older colon-divided
   * notation, e.g. "family:qualifier".
   *
   * @param scan The Scan to update.
   * @param familyAndQualifier family and qualifier
   * @throws IllegalArgumentException When familyAndQualifier is invalid.
   */
  private static void addColumn(Scan scan, byte[] familyAndQualifier) {
    byte [][] fq = KeyValue.parseColumn(familyAndQualifier);
    if (fq.length == 1) {
      scan.addFamily(fq[0]);
    } else if (fq.length == 2) {
      scan.addColumn(fq[0], fq[1]);
    } else {
      throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
    }
  }

  /**
   * Adds an array of columns specified using the old format, family:qualifier.
   * <p>
   * Overrides previous calls to {@link Scan#addColumn(byte[], byte[])} for any families in the
   * input.
   *
   * @param scan The Scan to update.
   * @param columns array of columns, formatted as <code>family:qualifier</code>
   * @see Scan#addColumn(byte[], byte[])
   */
  public static void addColumns(Scan scan, byte [][] columns) {
    for (byte[] column : columns) {
      addColumn(scan, column);
    }
  }
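
  /*
   * A sketch of the old-style column specifiers accepted above (family and qualifier names
   * are illustrative): an entry without a colon adds the whole family, an entry with a colon
   * adds a single column.
   *
   *   addColumns(scan, new byte[][] {
   *       Bytes.toBytes("info:name"),  // family "info", qualifier "name"
   *       Bytes.toBytes("stats")       // every column in family "stats"
   *   });
   */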

  /**
   * Calculates the splits that will serve as input for the map tasks. The
   * number of splits matches the number of regions in a table. Splits are shuffled if
   * required.
   * @param context  The current job context.
   * @return The list of input splits.
   * @throws IOException When creating the list of splits fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
   *   org.apache.hadoop.mapreduce.JobContext)
   */
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    List<InputSplit> splits = super.getSplits(context);
    if ((conf.get(SHUFFLE_MAPS) != null) && "true".equals(conf.get(SHUFFLE_MAPS).toLowerCase())) {
      Collections.shuffle(splits);
    }
    return splits;
  }
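
  /*
   * A sketch of enabling split shuffling (the job variable is illustrative); this simply
   * randomizes the order in which the per-region splits are handed to map tasks:
   *
   *   job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
   */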

  /**
   * Convenience method to parse a space-delimited string representation of an array of
   * column specifiers.
   *
   * @param scan The Scan to update.
   * @param columns  The columns to parse.
   */
  private static void addColumns(Scan scan, String columns) {
    String[] cols = columns.split(" ");
    for (String col : cols) {
      addColumn(scan, Bytes.toBytes(col));
    }
  }

  @Override
  protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
    if (conf.get(SPLIT_TABLE) != null) {
      TableName splitTableName = TableName.valueOf(conf.get(SPLIT_TABLE));
      try (Connection conn = ConnectionFactory.createConnection(getConf())) {
        try (RegionLocator rl = conn.getRegionLocator(splitTableName)) {
          return rl.getStartEndKeys();
        }
      }
    }

    return super.getStartEndKeys();
  }

  /**
   * Sets the split table in the map-reduce job.
   */
  public static void configureSplitTable(Job job, TableName tableName) {
    job.getConfiguration().set(SPLIT_TABLE, tableName.getNameAsString());
  }
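
  /*
   * A sketch of using a different table's start keys to compute the input splits, as when
   * preparing data for bulkload (the job and table names are illustrative):
   *
   *   Job job = Job.getInstance(conf, "prepare-bulkload");
   *   TableInputFormat.configureSplitTable(job, TableName.valueOf("target_table"));
   */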
}