/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Convert HBase tabular data into a format that is consumable by Map/Reduce.
 */
@InterfaceAudience.Public
public class TableInputFormat extends TableInputFormatBase implements Configurable {

  @SuppressWarnings("hiding")
  private static final Logger LOG = LoggerFactory.getLogger(TableInputFormat.class);

  /** Job parameter that specifies the input table. */
  public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
  /**
   * If specified, use start keys of this table to split. This is useful when you are preparing
   * data for bulkload.
   */
  private static final String SPLIT_TABLE = "hbase.mapreduce.splittable";
  /**
   * Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified. See
   * {@link TableMapReduceUtil#convertScanToString(Scan)} for more details.
   */
  public static final String SCAN = "hbase.mapreduce.scan";
  /** Scan start row */
  public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start";
  /** Scan stop row */
  public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop";
  /** Column Family to Scan */
  public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
  /** Space delimited list of columns and column families to scan. */
  public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
  /** The timestamp used to filter columns with a specific timestamp. */
  public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
  /** The starting timestamp used to filter columns with a specific range of versions. */
  public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
  /** The ending timestamp used to filter columns with a specific range of versions. */
  public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
  /** The maximum number of versions to return. */
  public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
  /** Set to false to disable server-side caching of blocks for this scan. */
  public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
  /** The number of rows for caching that will be passed to scanners. */
  public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
  /** Set the maximum number of values to return for each call to next(). */
  public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";
  /** Specify if we have to shuffle the map tasks. */
  public static final String SHUFFLE_MAPS = "hbase.mapreduce.inputtable.shufflemaps";
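
  // Illustrative sketch of how a driver job might set the keys above directly on its
  // Configuration; the table, family, and row-key values are examples only. Most callers
  // instead go through TableMapReduceUtil.initTableMapperJob(...), which serializes a Scan
  // into the SCAN property for them.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.set(TableInputFormat.INPUT_TABLE, "usertable");        // required
  //   conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "info");      // optional
  //   conf.set(TableInputFormat.SCAN_ROW_START, "row-000");       // optional
  //   conf.set(TableInputFormat.SCAN_ROW_STOP, "row-999");        // optional
  //   conf.setBoolean(TableInputFormat.SHUFFLE_MAPS, true);       // optional
  //   Job job = Job.getInstance(conf, "scan-usertable");
  //   job.setInputFormatClass(TableInputFormat.class);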

  /** The configuration. */
  private Configuration conf = null;

  /**
   * Returns the current configuration.
   * @return The current configuration.
   * @see org.apache.hadoop.conf.Configurable#getConf()
   */
  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Sets the configuration. This is used to set the details for the table to be scanned.
   * @param configuration The configuration to set.
   * @see org.apache.hadoop.conf.Configurable#setConf(org.apache.hadoop.conf.Configuration)
   */
  @Override
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "REC_CATCH_EXCEPTION",
      justification = "Intentional")
  public void setConf(Configuration configuration) {
    this.conf = configuration;

    Scan scan = null;

    if (conf.get(SCAN) != null) {
      try {
        scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
      } catch (IOException e) {
        LOG.error("An error occurred.", e);
      }
    } else {
      try {
        scan = createScanFromConfiguration(conf);
      } catch (Exception e) {
        LOG.error(StringUtils.stringifyException(e));
      }
    }

    setScan(scan);
  }
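
  // Sketch of the serialized-scan path handled by setConf(Configuration) above; variable and
  // row-key names are illustrative. If SCAN is present it takes precedence, otherwise the
  // individual SCAN_* keys are read by createScanFromConfiguration(Configuration):
  //
  //   Scan scan = new Scan().withStartRow(Bytes.toBytes("row-000"));
  //   conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan));
  //   // note: convertScanToString(Scan) throws IOException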

  /**
   * Sets up a {@link Scan} instance, applying settings from the configuration property constants
   * defined in {@code TableInputFormat}. This allows specifying things such as:
   * <ul>
   * <li>start and stop rows</li>
   * <li>column qualifiers or families</li>
   * <li>timestamps or timerange</li>
   * <li>scanner caching and batch size</li>
   * </ul>
   */
  public static Scan createScanFromConfiguration(Configuration conf) throws IOException {
    Scan scan = new Scan();

    if (conf.get(SCAN_ROW_START) != null) {
      scan.withStartRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_START)));
    }

    if (conf.get(SCAN_ROW_STOP) != null) {
      scan.withStopRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_STOP)));
    }

    if (conf.get(SCAN_COLUMNS) != null) {
      addColumns(scan, conf.get(SCAN_COLUMNS));
    }

    for (String columnFamily : conf.getTrimmedStrings(SCAN_COLUMN_FAMILY)) {
      scan.addFamily(Bytes.toBytes(columnFamily));
    }

    if (conf.get(SCAN_TIMESTAMP) != null) {
      scan.setTimestamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
    }

    if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
      scan.setTimeRange(Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
        Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
    }

    if (conf.get(SCAN_MAXVERSIONS) != null) {
      scan.readVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
    }

    if (conf.get(SCAN_CACHEDROWS) != null) {
      scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
    }

    if (conf.get(SCAN_BATCHSIZE) != null) {
      scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
    }

    // false by default: caching blocks during full table scans generates too much block cache churn
    scan.setCacheBlocks(conf.getBoolean(SCAN_CACHEBLOCKS, false));

    return scan;
  }

  @Override
  protected void initialize(JobContext context) throws IOException {
    // Do we have to worry about mis-matches between the Configuration from setConf and the one
    // in this context?
    TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE));
    try {
      initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName);
    } catch (Exception e) {
      LOG.error(StringUtils.stringifyException(e));
    }
  }

  /**
   * Parses a combined family and qualifier and adds either both or just the family in case there
   * is no qualifier. This assumes the older colon divided notation, e.g. "family:qualifier".
   * @param scan The Scan to update.
   * @param familyAndQualifier family and qualifier
   * @throws IllegalArgumentException When familyAndQualifier is invalid.
   */
  private static void addColumn(Scan scan, byte[] familyAndQualifier) {
    byte[][] fq = CellUtil.parseColumn(familyAndQualifier);
    if (fq.length == 1) {
      scan.addFamily(fq[0]);
    } else if (fq.length == 2) {
      scan.addColumn(fq[0], fq[1]);
    } else {
      throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
    }
  }

  /**
   * Adds an array of columns specified using old format, family:qualifier.
   * <p>
   * Overrides previous calls to {@link Scan#addColumn(byte[], byte[])} for any families in the
   * input.
   * @param scan The Scan to update.
   * @param columns array of columns, formatted as <code>family:qualifier</code>
   * @see Scan#addColumn(byte[], byte[])
   */
  public static void addColumns(Scan scan, byte[][] columns) {
    for (byte[] column : columns) {
      addColumn(scan, column);
    }
  }
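
  // Illustrative use of the old-style column notation accepted by addColumns(Scan, byte[][]);
  // the family and qualifier names are examples. A bare family ("stats") selects the whole
  // family, while "info:name" selects a single column:
  //
  //   TableInputFormat.addColumns(scan,
  //     new byte[][] { Bytes.toBytes("info:name"), Bytes.toBytes("stats") });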

  /**
   * Calculates the splits that will serve as input for the map tasks. The number of splits
   * matches the number of regions in a table. Splits are shuffled if required.
   * @param context The current job context.
   * @return The list of input splits.
   * @throws IOException When creating the list of splits fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
   */
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    List<InputSplit> splits = super.getSplits(context);
    if (
      (conf.get(SHUFFLE_MAPS) != null)
        && "true".equals(conf.get(SHUFFLE_MAPS).toLowerCase(Locale.ROOT))
    ) {
      Collections.shuffle(splits);
    }
    return splits;
  }

  /**
   * Convenience method to parse a string representation of an array of column specifiers.
   * @param scan The Scan to update.
   * @param columns The columns to parse.
   */
  private static void addColumns(Scan scan, String columns) {
    String[] cols = columns.split(" ");
    for (String col : cols) {
      addColumn(scan, Bytes.toBytes(col));
    }
  }

  @Override
  protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
    if (conf.get(SPLIT_TABLE) != null) {
      TableName splitTableName = TableName.valueOf(conf.get(SPLIT_TABLE));
      try (Connection conn = ConnectionFactory.createConnection(getConf())) {
        try (RegionLocator rl = conn.getRegionLocator(splitTableName)) {
          return rl.getStartEndKeys();
        }
      }
    }

    return super.getStartEndKeys();
  }

  /**
   * Sets split table in map-reduce job.
   */
  public static void configureSplitTable(Job job, TableName tableName) {
    job.getConfiguration().set(SPLIT_TABLE, tableName.getNameAsString());
  }
}