/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Convert HBase tabular data into a format that is consumable by Map/Reduce.
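 * <p>
 * A minimal sketch of wiring this input format into a job purely through configuration, given an
 * existing Hadoop {@code Configuration} named {@code conf}; the table name {@code "mytable"} and
 * column family {@code "cf"} are illustrative placeholders:
 * </p>
 * <pre>
 *   Job job = Job.getInstance(conf, "read mytable");
 *   job.getConfiguration().set(TableInputFormat.INPUT_TABLE, "mytable");
 *   job.getConfiguration().set(TableInputFormat.SCAN_COLUMN_FAMILY, "cf");
 *   job.setInputFormatClass(TableInputFormat.class);
 * </pre>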
 */
@InterfaceAudience.Public
public class TableInputFormat extends TableInputFormatBase
    implements Configurable {

  @SuppressWarnings("hiding")
  private static final Logger LOG = LoggerFactory.getLogger(TableInputFormat.class);

  /** Job parameter that specifies the input table. */
  public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
  /**
   * If specified, use start keys of this table to split.
   * This is useful when you are preparing data for bulkload.
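   * Typically set through {@link #configureSplitTable(Job, TableName)} rather than directly.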
   */
  private static final String SPLIT_TABLE = "hbase.mapreduce.splittable";
  /**
   * Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified.
   * See {@link TableMapReduceUtil#convertScanToString(Scan)} for more details.
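   * For example, assuming an already-built {@code Scan} instance named {@code scan}:
   * {@code conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan))}.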
   */
  public static final String SCAN = "hbase.mapreduce.scan";
  /** Scan start row */
  public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start";
  /** Scan stop row */
  public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop";
  /** The column family or families to scan (multiple families may be given, comma separated). */
  public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
  /** Space delimited list of columns and column families to scan. */
  public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
  /** The timestamp used to filter columns with a specific timestamp. */
  public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
  /** The starting timestamp used to filter columns with a specific range of versions. */
  public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
  /** The ending timestamp used to filter columns with a specific range of versions. */
  public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
  /** The maximum number of versions to return. */
  public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
  /** Set to false to disable server-side caching of blocks for this scan. */
  public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
  /** The number of rows for caching that will be passed to scanners. */
  public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
  /** Set the maximum number of values to return for each call to next(). */
  public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";
  /** Specifies whether to shuffle the map tasks. */
  public static final String SHUFFLE_MAPS = "hbase.mapreduce.inputtable.shufflemaps";

  /** The configuration. */
  private Configuration conf = null;

  /**
   * Returns the current configuration.
   *
   * @return The current configuration.
   * @see org.apache.hadoop.conf.Configurable#getConf()
   */
  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Sets the configuration. This is used to set the details for the table to
   * be scanned.
   *
   * @param configuration  The configuration to set.
   * @see org.apache.hadoop.conf.Configurable#setConf(
   *   org.apache.hadoop.conf.Configuration)
   */
  @Override
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
    justification="Intentional")
  public void setConf(Configuration configuration) {
    this.conf = configuration;

    Scan scan = null;

    if (conf.get(SCAN) != null) {
      try {
        scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
      } catch (IOException e) {
        LOG.error("An error occurred while deserializing the Scan from " + SCAN + ".", e);
      }
    } else {
      try {
        scan = createScanFromConfiguration(conf);
      } catch (Exception e) {
        LOG.error(StringUtils.stringifyException(e));
      }
    }

    setScan(scan);
  }

  /**
   * Sets up a {@link Scan} instance, applying settings from the configuration property
   * constants defined in {@code TableInputFormat}. This allows specifying things such as:
   * <ul>
   *   <li>start and stop rows</li>
   *   <li>column qualifiers or families</li>
   *   <li>timestamps or timerange</li>
   *   <li>scanner caching and batch size</li>
   * </ul>
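   * <p>
   * A minimal sketch of driving this method purely from configuration; the property values below
   * are illustrative, not defaults:
   * </p>
   * <pre>
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.set(TableInputFormat.SCAN_ROW_START, "row-00000");
   *   conf.set(TableInputFormat.SCAN_ROW_STOP, "row-99999");
   *   conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "cf");
   *   conf.setInt(TableInputFormat.SCAN_CACHEDROWS, 500);
   *   Scan scan = TableInputFormat.createScanFromConfiguration(conf);
   * </pre>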
   */
  public static Scan createScanFromConfiguration(Configuration conf) throws IOException {
    Scan scan = new Scan();

    if (conf.get(SCAN_ROW_START) != null) {
      scan.setStartRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_START)));
    }

    if (conf.get(SCAN_ROW_STOP) != null) {
      scan.setStopRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_STOP)));
    }

    if (conf.get(SCAN_COLUMNS) != null) {
      addColumns(scan, conf.get(SCAN_COLUMNS));
    }

    for (String columnFamily : conf.getTrimmedStrings(SCAN_COLUMN_FAMILY)) {
      scan.addFamily(Bytes.toBytes(columnFamily));
    }

    if (conf.get(SCAN_TIMESTAMP) != null) {
      scan.setTimestamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
    }

    if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
      scan.setTimeRange(
          Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
          Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
    }

    if (conf.get(SCAN_MAXVERSIONS) != null) {
      scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
    }

    if (conf.get(SCAN_CACHEDROWS) != null) {
      scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
    }

    if (conf.get(SCAN_BATCHSIZE) != null) {
      scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
    }
    // false by default, full table scans generate too much block cache (BC) churn
    scan.setCacheBlocks(conf.getBoolean(SCAN_CACHEBLOCKS, false));

    return scan;
  }

  @Override
  protected void initialize(JobContext context) throws IOException {
    // Do we have to worry about mis-matches between the Configuration from setConf and the one
    // in this context?
    TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE));
    try {
      initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName);
    } catch (Exception e) {
      LOG.error(StringUtils.stringifyException(e));
    }
  }

  /**
   * Parses a combined family and qualifier and adds either both or just the
   * family in case there is no qualifier. This assumes the older colon
   * divided notation, e.g. "family:qualifier".
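   * For example, {@code addColumn(scan, Bytes.toBytes("info:name"))} adds qualifier {@code name}
   * in family {@code info}, while {@code addColumn(scan, Bytes.toBytes("info"))} adds the whole
   * {@code info} family (the names here are illustrative).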
   *
   * @param scan The Scan to update.
   * @param familyAndQualifier family and qualifier
   * @throws IllegalArgumentException When familyAndQualifier is invalid.
   */
  private static void addColumn(Scan scan, byte[] familyAndQualifier) {
    byte[][] fq = CellUtil.parseColumn(familyAndQualifier);
    if (fq.length == 1) {
      scan.addFamily(fq[0]);
    } else if (fq.length == 2) {
      scan.addColumn(fq[0], fq[1]);
    } else {
      throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
    }
  }

  /**
   * Adds an array of columns specified using old format, family:qualifier.
   * <p>
   * Overrides previous calls to {@link Scan#addColumn(byte[], byte[])} for any families in the
   * input.
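   * <p>
   * For example, with illustrative family and qualifier names:
   * </p>
   * <pre>
   *   byte[][] columns = new byte[][] {
   *     Bytes.toBytes("info:name"),
   *     Bytes.toBytes("info:age"),
   *     Bytes.toBytes("raw")   // family only, no qualifier
   *   };
   *   TableInputFormat.addColumns(scan, columns);
   * </pre>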
   *
   * @param scan The Scan to update.
   * @param columns array of columns, formatted as <code>family:qualifier</code>
   * @see Scan#addColumn(byte[], byte[])
   */
  public static void addColumns(Scan scan, byte[][] columns) {
    for (byte[] column : columns) {
      addColumn(scan, column);
    }
  }

  /**
   * Calculates the splits that will serve as input for the map tasks. The
   * number of splits matches the number of regions in a table. Splits are shuffled if
   * required.
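   * <p>
   * Shuffling is opt-in; for example:
   * {@code job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true")}.
   * </p>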
   * @param context  The current job context.
   * @return The list of input splits.
   * @throws IOException When creating the list of splits fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
   *   org.apache.hadoop.mapreduce.JobContext)
   */
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    List<InputSplit> splits = super.getSplits(context);
    if ((conf.get(SHUFFLE_MAPS) != null) &&
        "true".equals(conf.get(SHUFFLE_MAPS).toLowerCase(Locale.ROOT))) {
      Collections.shuffle(splits);
    }
    return splits;
  }

  /**
   * Convenience method to parse a string representation of an array of column specifiers.
   *
   * @param scan The Scan to update.
   * @param columns  The columns to parse.
   */
  private static void addColumns(Scan scan, String columns) {
    String[] cols = columns.split(" ");
    for (String col : cols) {
      addColumn(scan, Bytes.toBytes(col));
    }
  }

  @Override
  protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
    if (conf.get(SPLIT_TABLE) != null) {
      TableName splitTableName = TableName.valueOf(conf.get(SPLIT_TABLE));
      try (Connection conn = ConnectionFactory.createConnection(getConf())) {
        try (RegionLocator rl = conn.getRegionLocator(splitTableName)) {
          return rl.getStartEndKeys();
        }
      }
    }

    return super.getStartEndKeys();
  }

  /**
   * Sets the table whose region start keys are used to calculate the input splits
   * (the {@code hbase.mapreduce.splittable} property), e.g. when preparing data for bulk load.
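   * For example: {@code configureSplitTable(job, TableName.valueOf("presplit_table"))}, where
   * {@code "presplit_table"} is an illustrative table name.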
   */
  public static void configureSplitTable(Job job, TableName tableName) {
    job.getConfiguration().set(SPLIT_TABLE, tableName.getNameAsString());
  }
}