/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;

/**
 * Convert HBase tabular data into a format that is consumable by Map/Reduce.
 */
@InterfaceAudience.Public
public class TableInputFormat extends TableInputFormatBase
implements Configurable {

  @SuppressWarnings("hiding")
  private static final Logger LOG = LoggerFactory.getLogger(TableInputFormat.class);

  /** Job parameter that specifies the input table. */
  public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
  /**
   * If specified, use start keys of this table to split.
   * This is useful when you are preparing data for bulkload.
   */
  private static final String SPLIT_TABLE = "hbase.mapreduce.splittable";
  /**
   * Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified.
   * See {@link TableMapReduceUtil#convertScanToString(Scan)} for more details.
   */
  public static final String SCAN = "hbase.mapreduce.scan";
  /** Scan start row */
  public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start";
  /** Scan stop row */
  public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop";
  /** Column family to scan. */
  public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
  /** Space delimited list of columns and column families to scan. */
  public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
  /** The timestamp used to filter columns with a specific timestamp. */
  public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
  /** The starting timestamp used to filter columns with a specific range of versions. */
  public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
  /** The ending timestamp used to filter columns with a specific range of versions. */
  public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
  /** The maximum number of versions to return. */
  public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
  /** Set to false to disable server-side caching of blocks for this scan. */
  public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
  /** The number of rows for caching that will be passed to scanners. */
  public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
  /** Set the maximum number of values to return for each call to next(). */
  public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";
  /** Specify if we have to shuffle the map tasks. */
  public static final String SHUFFLE_MAPS = "hbase.mapreduce.inputtable.shufflemaps";
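  // Illustrative sketch only (not used by this class): a job driver typically sets the
  // properties above on the job configuration before submitting the job. The table name
  // "example_table" and family "f" are placeholders; HBaseConfiguration is
  // org.apache.hadoop.hbase.HBaseConfiguration. In practice,
  // TableMapReduceUtil.initTableMapperJob(...) is the more common way to wire this up.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.set(TableInputFormat.INPUT_TABLE, "example_table");
  //   conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "f");
  //   conf.setInt(TableInputFormat.SCAN_CACHEDROWS, 500);
  //   Job job = Job.getInstance(conf, "scan example_table");
  //   job.setInputFormatClass(TableInputFormat.class);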
  /** The configuration. */
  private Configuration conf = null;

  /**
   * Returns the current configuration.
   *
   * @return The current configuration.
   * @see org.apache.hadoop.conf.Configurable#getConf()
   */
  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Sets the configuration. This is used to set the details for the table to
   * be scanned.
   *
   * @param configuration The configuration to set.
   * @see org.apache.hadoop.conf.Configurable#setConf(
   *   org.apache.hadoop.conf.Configuration)
   */
  @Override
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
    justification="Intentional")
  public void setConf(Configuration configuration) {
    this.conf = configuration;

    Scan scan = null;

    if (conf.get(SCAN) != null) {
      try {
        scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
      } catch (IOException e) {
        LOG.error("Failed to decode the Scan from the " + SCAN + " property.", e);
      }
    } else {
      try {
        scan = createScanFromConfiguration(conf);
      } catch (Exception e) {
        LOG.error(StringUtils.stringifyException(e));
      }
    }

    setScan(scan);
  }
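  // Illustrative sketch only: instead of the individual SCAN_* properties, a fully built Scan
  // can be serialized into the SCAN property, which setConf() above decodes. This assumes
  // TableMapReduceUtil.convertScanToString(Scan) is accessible to the caller;
  // TableMapReduceUtil.initTableMapperJob(...) performs the same serialization internally.
  //
  //   Scan scan = new Scan();
  //   scan.addFamily(Bytes.toBytes("f"));  // "f" is a placeholder family
  //   conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan));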
  /**
   * Sets up a {@link Scan} instance, applying settings from the configuration property
   * constants defined in {@code TableInputFormat}. This allows specifying things such as:
   * <ul>
   *   <li>start and stop rows</li>
   *   <li>column qualifiers or families</li>
   *   <li>timestamps or timerange</li>
   *   <li>scanner caching and batch size</li>
   * </ul>
   */
  public static Scan createScanFromConfiguration(Configuration conf) throws IOException {
    Scan scan = new Scan();

    if (conf.get(SCAN_ROW_START) != null) {
      scan.setStartRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_START)));
    }

    if (conf.get(SCAN_ROW_STOP) != null) {
      scan.setStopRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_STOP)));
    }

    if (conf.get(SCAN_COLUMNS) != null) {
      addColumns(scan, conf.get(SCAN_COLUMNS));
    }

    for (String columnFamily : conf.getTrimmedStrings(SCAN_COLUMN_FAMILY)) {
      scan.addFamily(Bytes.toBytes(columnFamily));
    }

    if (conf.get(SCAN_TIMESTAMP) != null) {
      scan.setTimestamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
    }

    if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
      scan.setTimeRange(
          Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
          Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
    }

    if (conf.get(SCAN_MAXVERSIONS) != null) {
      scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
    }

    if (conf.get(SCAN_CACHEDROWS) != null) {
      scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
    }

    if (conf.get(SCAN_BATCHSIZE) != null) {
      scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
    }

    // false by default, full table scans generate too much block cache churn
    scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));

    return scan;
  }

  @Override
  protected void initialize(JobContext context) throws IOException {
    // Do we have to worry about mis-matches between the Configuration from setConf and the one
    // in this context?
    TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE));
    try {
      initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName);
    } catch (Exception e) {
      LOG.error(StringUtils.stringifyException(e));
    }
  }

  /**
   * Parses a combined family and qualifier and adds either both or just the
   * family in case there is no qualifier. This assumes the older colon
   * divided notation, e.g. "family:qualifier".
   *
   * @param scan The Scan to update.
   * @param familyAndQualifier family and qualifier
   * @throws IllegalArgumentException When familyAndQualifier is invalid.
   */
  private static void addColumn(Scan scan, byte[] familyAndQualifier) {
    byte [][] fq = CellUtil.parseColumn(familyAndQualifier);
    if (fq.length == 1) {
      scan.addFamily(fq[0]);
    } else if (fq.length == 2) {
      scan.addColumn(fq[0], fq[1]);
    } else {
      throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
    }
  }

  /**
   * Adds an array of columns specified using old format, family:qualifier.
   * <p>
   * Overrides previous calls to {@link Scan#addColumn(byte[], byte[])} for any families in the
   * input.
   *
   * @param scan The Scan to update.
   * @param columns array of columns, formatted as <code>family:qualifier</code>
   * @see Scan#addColumn(byte[], byte[])
   */
  public static void addColumns(Scan scan, byte [][] columns) {
    for (byte[] column : columns) {
      addColumn(scan, column);
    }
  }
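  // Illustrative sketch only: shuffling of the splits returned by getSplits() below is driven
  // purely by configuration, e.g. in the job driver:
  //
  //   conf.setBoolean(TableInputFormat.SHUFFLE_MAPS, true);
  //
  // so that splits for adjacent regions are not necessarily processed back to back.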
The 244 * number of splits matches the number of regions in a table. Splits are shuffled if 245 * required. 246 * @param context The current job context. 247 * @return The list of input splits. 248 * @throws IOException When creating the list of splits fails. 249 * @see org.apache.hadoop.mapreduce.InputFormat#getSplits( 250 * org.apache.hadoop.mapreduce.JobContext) 251 */ 252 @Override 253 public List<InputSplit> getSplits(JobContext context) throws IOException { 254 List<InputSplit> splits = super.getSplits(context); 255 if ((conf.get(SHUFFLE_MAPS) != null) && "true".equals(conf.get(SHUFFLE_MAPS).toLowerCase(Locale.ROOT))) { 256 Collections.shuffle(splits); 257 } 258 return splits; 259 } 260 261 /** 262 * Convenience method to parse a string representation of an array of column specifiers. 263 * 264 * @param scan The Scan to update. 265 * @param columns The columns to parse. 266 */ 267 private static void addColumns(Scan scan, String columns) { 268 String[] cols = columns.split(" "); 269 for (String col : cols) { 270 addColumn(scan, Bytes.toBytes(col)); 271 } 272 } 273 274 @Override 275 protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException { 276 if (conf.get(SPLIT_TABLE) != null) { 277 TableName splitTableName = TableName.valueOf(conf.get(SPLIT_TABLE)); 278 try (Connection conn = ConnectionFactory.createConnection(getConf())) { 279 try (RegionLocator rl = conn.getRegionLocator(splitTableName)) { 280 return rl.getStartEndKeys(); 281 } 282 } 283 } 284 285 return super.getStartEndKeys(); 286 } 287 288 /** 289 * Sets split table in map-reduce job. 290 */ 291 public static void configureSplitTable(Job job, TableName tableName) { 292 job.getConfiguration().set(SPLIT_TABLE, tableName.getNameAsString()); 293 } 294}