/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Convert HBase tabular data into a format that is consumable by Map/Reduce.
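 * <p>
 * A minimal sketch of wiring this format into a job by hand (the table, family, and job names
 * below are hypothetical; {@link TableMapReduceUtil} is the usual, higher-level entry point):
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * conf.set(TableInputFormat.INPUT_TABLE, "my_table");
 * conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "cf");
 * Job job = Job.getInstance(conf, "scan my_table");
 * job.setInputFormatClass(TableInputFormat.class);
 * </pre>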
 */
@InterfaceAudience.Public
public class TableInputFormat extends TableInputFormatBase implements Configurable {

  @SuppressWarnings("hiding")
  private static final Logger LOG = LoggerFactory.getLogger(TableInputFormat.class);

  /** Job parameter that specifies the input table. */
  public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
  /**
   * If specified, use start keys of this table to split. This is useful when you are preparing data
   * for bulkload.
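   * <p>
   * A sketch of enabling it (the target table name is hypothetical): call
   * {@link #configureSplitTable(Job, TableName)} on the job, e.g.
   * {@code TableInputFormat.configureSplitTable(job, TableName.valueOf("presplit_target"))}.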
   */
  private static final String SPLIT_TABLE = "hbase.mapreduce.splittable";
  /**
   * Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified. See
   * {@link TableMapReduceUtil#convertScanToString(Scan)} for more details.
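   * <p>
   * A sketch of setting it by hand, assuming a fully built {@code scan} object:
   * {@code conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan))}.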
   */
  public static final String SCAN = "hbase.mapreduce.scan";
  /** Scan start row */
  public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start";
  /** Scan stop row */
  public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop";
  /** Column family to scan. */
  public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
  /** Space-delimited list of columns and column families to scan. */
  public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
  /** The timestamp used to filter columns with a specific timestamp. */
  public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
  /** The starting timestamp used to filter columns with a specific range of versions. */
  public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
  /** The ending timestamp used to filter columns with a specific range of versions. */
  public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
  /** The maximum number of versions to return. */
  public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
  /** Set to false to disable server-side caching of blocks for this scan. */
  public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
  /** The number of rows for caching that will be passed to scanners. */
  public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
  /** The maximum number of values to return for each call to next(). */
  public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";
  /** Specifies whether to shuffle the map tasks. */
  public static final String SHUFFLE_MAPS = "hbase.mapreduce.inputtable.shufflemaps";

  /** The configuration. */
  private Configuration conf = null;

  /**
   * Returns the current configuration.
   * @return The current configuration.
   * @see org.apache.hadoop.conf.Configurable#getConf()
   */
  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Sets the configuration. This is used to set the details for the table to be scanned.
   * @param configuration The configuration to set.
   * @see org.apache.hadoop.conf.Configurable#setConf(org.apache.hadoop.conf.Configuration)
   */
  @Override
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "REC_CATCH_EXCEPTION",
      justification = "Intentional")
  public void setConf(Configuration configuration) {
    this.conf = configuration;

    Scan scan = null;

    if (conf.get(SCAN) != null) {
      try {
        scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
      } catch (IOException e) {
        LOG.error("Failed to deserialize the Scan set in " + SCAN, e);
      }
    } else {
      try {
        scan = createScanFromConfiguration(conf);
      } catch (Exception e) {
        LOG.error(StringUtils.stringifyException(e));
      }
    }

    setScan(scan);
  }

  /**
   * Sets up a {@link Scan} instance, applying settings from the configuration property constants
   * defined in {@code TableInputFormat}. This allows specifying things such as:
   * <ul>
   * <li>start and stop rows</li>
   * <li>column qualifiers or families</li>
   * <li>timestamps or timerange</li>
   * <li>scanner caching and batch size</li>
   * </ul>
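   * <p>
   * For illustration, a sketch of driving it from a hand-built configuration (the key values
   * shown are arbitrary examples):
   *
   * <pre>
   * Configuration conf = HBaseConfiguration.create();
   * conf.set(TableInputFormat.SCAN_ROW_START, "row-0000");
   * conf.set(TableInputFormat.SCAN_ROW_STOP, "row-9999");
   * conf.set(TableInputFormat.SCAN_COLUMNS, "cf:qual1 cf:qual2");
   * conf.set(TableInputFormat.SCAN_MAXVERSIONS, "1");
   * Scan scan = TableInputFormat.createScanFromConfiguration(conf);
   * </pre>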
   */
  public static Scan createScanFromConfiguration(Configuration conf) throws IOException {
    Scan scan = new Scan();

    if (conf.get(SCAN_ROW_START) != null) {
      scan.setStartRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_START)));
    }

    if (conf.get(SCAN_ROW_STOP) != null) {
      scan.setStopRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_STOP)));
    }

    if (conf.get(SCAN_COLUMNS) != null) {
      addColumns(scan, conf.get(SCAN_COLUMNS));
    }

    for (String columnFamily : conf.getTrimmedStrings(SCAN_COLUMN_FAMILY)) {
      scan.addFamily(Bytes.toBytes(columnFamily));
    }

    if (conf.get(SCAN_TIMESTAMP) != null) {
      scan.setTimestamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
    }

    if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
      scan.setTimeRange(Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
        Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
    }

    if (conf.get(SCAN_MAXVERSIONS) != null) {
      scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
    }

    if (conf.get(SCAN_CACHEDROWS) != null) {
      scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
    }

    if (conf.get(SCAN_BATCHSIZE) != null) {
      scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
    }

    // false by default; full table scans generate too much block cache churn
    scan.setCacheBlocks(conf.getBoolean(SCAN_CACHEBLOCKS, false));

    return scan;
  }

  @Override
  protected void initialize(JobContext context) throws IOException {
    // Do we have to worry about mis-matches between the Configuration from setConf and the one
    // in this context?
    TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE));
    try {
      initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName);
    } catch (Exception e) {
      LOG.error(StringUtils.stringifyException(e));
    }
  }

  /**
   * Parses a combined family and qualifier and adds either both, or just the family when no
   * qualifier is given. This assumes the older colon-divided notation, e.g. "family:qualifier".
   * @param scan               The Scan to update.
   * @param familyAndQualifier family and qualifier
   * @throws IllegalArgumentException When familyAndQualifier is invalid.
   */
  private static void addColumn(Scan scan, byte[] familyAndQualifier) {
    byte[][] fq = CellUtil.parseColumn(familyAndQualifier);
    if (fq.length == 1) {
      scan.addFamily(fq[0]);
    } else if (fq.length == 2) {
      scan.addColumn(fq[0], fq[1]);
    } else {
      throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
    }
  }

  /**
   * Adds an array of columns specified using old format, family:qualifier.
   * <p>
   * Overrides previous calls to {@link Scan#addColumn(byte[], byte[])} for any families in the
   * input.
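   * <p>
   * For example (hypothetical family and qualifier names):
   * {@code addColumns(scan, new byte[][] { Bytes.toBytes("info:name"), Bytes.toBytes("info") })}.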
   * @param scan    The Scan to update.
   * @param columns array of columns, formatted as <code>family:qualifier</code>
   * @see Scan#addColumn(byte[], byte[])
   */
  public static void addColumns(Scan scan, byte[][] columns) {
    for (byte[] column : columns) {
      addColumn(scan, column);
    }
  }

  /**
   * Calculates the splits that will serve as input for the map tasks. The number of splits matches
   * the number of regions in a table. Splits are shuffled if required.
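   * <p>
   * Shuffling is opt-in; a sketch of requesting it before job submission:
   * {@code job.getConfiguration().setBoolean(TableInputFormat.SHUFFLE_MAPS, true)}.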
   * @param context The current job context.
   * @return The list of input splits.
   * @throws IOException When creating the list of splits fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
   */
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    List<InputSplit> splits = super.getSplits(context);
    if (
      (conf.get(SHUFFLE_MAPS) != null)
        && "true".equals(conf.get(SHUFFLE_MAPS).toLowerCase(Locale.ROOT))
    ) {
      Collections.shuffle(splits);
    }
    return splits;
  }

  /**
   * Convenience method to parse a space-delimited string of column specifiers.
   * @param scan    The Scan to update.
   * @param columns The columns to parse.
   */
  private static void addColumns(Scan scan, String columns) {
    String[] cols = columns.split(" ");
    for (String col : cols) {
      addColumn(scan, Bytes.toBytes(col));
    }
  }

  @Override
  protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
    if (conf.get(SPLIT_TABLE) != null) {
      TableName splitTableName = TableName.valueOf(conf.get(SPLIT_TABLE));
      try (Connection conn = ConnectionFactory.createConnection(getConf())) {
        try (RegionLocator rl = conn.getRegionLocator(splitTableName)) {
          return rl.getStartEndKeys();
        }
      }
    }

    return super.getStartEndKeys();
  }

  /**
   * Sets the split table in the map-reduce job.
   */
  public static void configureSplitTable(Job job, TableName tableName) {
    job.getConfiguration().set(SPLIT_TABLE, tableName.getNameAsString());
  }
}