001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.mapred; 020 021import java.io.Closeable; 022import java.io.IOException; 023 024import org.apache.yetus.audience.InterfaceAudience; 025import org.slf4j.Logger; 026import org.slf4j.LoggerFactory; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.client.Connection; 030import org.apache.hadoop.hbase.client.RegionLocator; 031import org.apache.hadoop.hbase.client.Result; 032import org.apache.hadoop.hbase.client.Table; 033import org.apache.hadoop.hbase.filter.Filter; 034import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 035import org.apache.hadoop.mapred.InputFormat; 036import org.apache.hadoop.mapred.InputSplit; 037import org.apache.hadoop.mapred.JobConf; 038import org.apache.hadoop.mapred.RecordReader; 039import org.apache.hadoop.mapred.Reporter; 040 041/** 042 * A Base for {@link TableInputFormat}s. Receives a {@link Table}, a 043 * byte[] of input columns and optionally a {@link Filter}. 044 * Subclasses may use other TableRecordReader implementations. 045 * 046 * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to 047 * function properly. Each of the entry points to this class used by the MapReduce framework, 048 * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)}, 049 * will call {@link #initialize(JobConf)} as a convenient centralized location to handle 050 * retrieving the necessary configuration information. If your subclass overrides either of these 051 * methods, either call the parent version or call initialize yourself. 052 * 053 * <p> 054 * An example of a subclass: 055 * <pre> 056 * class ExampleTIF extends TableInputFormatBase { 057 * 058 * {@literal @}Override 059 * protected void initialize(JobConf context) throws IOException { 060 * // We are responsible for the lifecycle of this connection until we hand it over in 061 * // initializeTable. 062 * Connection connection = 063 * ConnectionFactory.createConnection(HBaseConfiguration.create(job)); 064 * TableName tableName = TableName.valueOf("exampleTable"); 065 * // mandatory. once passed here, TableInputFormatBase will handle closing the connection. 066 * initializeTable(connection, tableName); 067 * byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"), 068 * Bytes.toBytes("columnB") }; 069 * // mandatory 070 * setInputColumns(inputColumns); 071 * // optional, by default we'll get everything for the given columns. 072 * Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*")); 073 * setRowFilter(exampleFilter); 074 * } 075 * } 076 * </pre> 077 */ 078 079@InterfaceAudience.Public 080public abstract class TableInputFormatBase 081implements InputFormat<ImmutableBytesWritable, Result> { 082 private static final Logger LOG = LoggerFactory.getLogger(TableInputFormatBase.class); 083 private byte [][] inputColumns; 084 private Table table; 085 private RegionLocator regionLocator; 086 private Connection connection; 087 private TableRecordReader tableRecordReader; 088 private Filter rowFilter; 089 090 private static final String NOT_INITIALIZED = "The input format instance has not been properly " + 091 "initialized. Ensure you call initializeTable either in your constructor or initialize " + 092 "method"; 093 private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" + 094 " previous error. Please look at the previous logs lines from" + 095 " the task's full log for more details."; 096 097 /** 098 * Builds a TableRecordReader. If no TableRecordReader was provided, uses 099 * the default. 100 * 101 * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit, 102 * JobConf, Reporter) 103 */ 104 public RecordReader<ImmutableBytesWritable, Result> getRecordReader( 105 InputSplit split, JobConf job, Reporter reporter) 106 throws IOException { 107 // In case a subclass uses the deprecated approach or calls initializeTable directly 108 if (table == null) { 109 initialize(job); 110 } 111 // null check in case our child overrides getTable to not throw. 112 try { 113 if (getTable() == null) { 114 // initialize() must not have been implemented in the subclass. 115 throw new IOException(INITIALIZATION_ERROR); 116 } 117 } catch (IllegalStateException exception) { 118 throw new IOException(INITIALIZATION_ERROR, exception); 119 } 120 121 TableSplit tSplit = (TableSplit) split; 122 // if no table record reader was provided use default 123 final TableRecordReader trr = this.tableRecordReader == null ? new TableRecordReader() : 124 this.tableRecordReader; 125 trr.setStartRow(tSplit.getStartRow()); 126 trr.setEndRow(tSplit.getEndRow()); 127 trr.setHTable(this.table); 128 trr.setInputColumns(this.inputColumns); 129 trr.setRowFilter(this.rowFilter); 130 trr.init(); 131 return new RecordReader<ImmutableBytesWritable, Result>() { 132 133 @Override 134 public void close() throws IOException { 135 trr.close(); 136 closeTable(); 137 } 138 139 @Override 140 public ImmutableBytesWritable createKey() { 141 return trr.createKey(); 142 } 143 144 @Override 145 public Result createValue() { 146 return trr.createValue(); 147 } 148 149 @Override 150 public long getPos() throws IOException { 151 return trr.getPos(); 152 } 153 154 @Override 155 public float getProgress() throws IOException { 156 return trr.getProgress(); 157 } 158 159 @Override 160 public boolean next(ImmutableBytesWritable key, Result value) throws IOException { 161 return trr.next(key, value); 162 } 163 }; 164 } 165 166 /** 167 * Calculates the splits that will serve as input for the map tasks. 168 * 169 * Splits are created in number equal to the smallest between numSplits and 170 * the number of {@link org.apache.hadoop.hbase.regionserver.HRegion}s in the table. 171 * If the number of splits is smaller than the number of 172 * {@link org.apache.hadoop.hbase.regionserver.HRegion}s then splits are spanned across 173 * multiple {@link org.apache.hadoop.hbase.regionserver.HRegion}s 174 * and are grouped the most evenly possible. In the 175 * case splits are uneven the bigger splits are placed first in the 176 * {@link InputSplit} array. 177 * 178 * @param job the map task {@link JobConf} 179 * @param numSplits a hint to calculate the number of splits (mapred.map.tasks). 180 * 181 * @return the input splits 182 * 183 * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int) 184 */ 185 public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { 186 if (this.table == null) { 187 initialize(job); 188 } 189 // null check in case our child overrides getTable to not throw. 190 try { 191 if (getTable() == null) { 192 // initialize() must not have been implemented in the subclass. 193 throw new IOException(INITIALIZATION_ERROR); 194 } 195 } catch (IllegalStateException exception) { 196 throw new IOException(INITIALIZATION_ERROR, exception); 197 } 198 199 byte [][] startKeys = this.regionLocator.getStartKeys(); 200 if (startKeys == null || startKeys.length == 0) { 201 throw new IOException("Expecting at least one region"); 202 } 203 if (this.inputColumns == null || this.inputColumns.length == 0) { 204 throw new IOException("Expecting at least one column"); 205 } 206 int realNumSplits = numSplits > startKeys.length? startKeys.length: 207 numSplits; 208 InputSplit[] splits = new InputSplit[realNumSplits]; 209 int middle = startKeys.length / realNumSplits; 210 int startPos = 0; 211 for (int i = 0; i < realNumSplits; i++) { 212 int lastPos = startPos + middle; 213 lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos; 214 String regionLocation = regionLocator.getRegionLocation(startKeys[startPos]). 215 getHostname(); 216 splits[i] = new TableSplit(this.table.getName(), 217 startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]: 218 HConstants.EMPTY_START_ROW, regionLocation); 219 LOG.info("split: " + i + "->" + splits[i]); 220 startPos = lastPos; 221 } 222 return splits; 223 } 224 225 /** 226 * Allows subclasses to initialize the table information. 227 * 228 * @param connection The Connection to the HBase cluster. MUST be unmanaged. We will close. 229 * @param tableName The {@link TableName} of the table to process. 230 * @throws IOException 231 */ 232 protected void initializeTable(Connection connection, TableName tableName) throws IOException { 233 if (this.table != null || this.connection != null) { 234 LOG.warn("initializeTable called multiple times. Overwriting connection and table " + 235 "reference; TableInputFormatBase will not close these old references when done."); 236 } 237 this.table = connection.getTable(tableName); 238 this.regionLocator = connection.getRegionLocator(tableName); 239 this.connection = connection; 240 } 241 242 /** 243 * @param inputColumns to be passed in {@link Result} to the map task. 244 */ 245 protected void setInputColumns(byte [][] inputColumns) { 246 this.inputColumns = inputColumns; 247 } 248 249 /** 250 * Allows subclasses to get the {@link Table}. 251 */ 252 protected Table getTable() { 253 if (table == null) { 254 throw new IllegalStateException(NOT_INITIALIZED); 255 } 256 return this.table; 257 } 258 259 /** 260 * Allows subclasses to set the {@link TableRecordReader}. 261 * 262 * @param tableRecordReader 263 * to provide other {@link TableRecordReader} implementations. 264 */ 265 protected void setTableRecordReader(TableRecordReader tableRecordReader) { 266 this.tableRecordReader = tableRecordReader; 267 } 268 269 /** 270 * Allows subclasses to set the {@link Filter} to be used. 271 * 272 * @param rowFilter 273 */ 274 protected void setRowFilter(Filter rowFilter) { 275 this.rowFilter = rowFilter; 276 } 277 278 /** 279 * Handle subclass specific set up. 280 * Each of the entry points used by the MapReduce framework, 281 * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)}, 282 * will call {@link #initialize(JobConf)} as a convenient centralized location to handle 283 * retrieving the necessary configuration information and calling 284 * {@link #initializeTable(Connection, TableName)}. 285 * 286 * Subclasses should implement their initialize call such that it is safe to call multiple times. 287 * The current TableInputFormatBase implementation relies on a non-null table reference to decide 288 * if an initialize call is needed, but this behavior may change in the future. In particular, 289 * it is critical that initializeTable not be called multiple times since this will leak 290 * Connection instances. 291 * 292 */ 293 protected void initialize(JobConf job) throws IOException { 294 } 295 296 /** 297 * Close the Table and related objects that were initialized via 298 * {@link #initializeTable(Connection, TableName)}. 299 * 300 * @throws IOException 301 */ 302 protected void closeTable() throws IOException { 303 close(table, connection); 304 table = null; 305 connection = null; 306 } 307 308 private void close(Closeable... closables) throws IOException { 309 for (Closeable c : closables) { 310 if(c != null) { c.close(); } 311 } 312 } 313}