/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A base class for {@link TableInputFormat}s. Receives a {@link Table}, a byte[][] of input
 * columns and optionally a {@link Filter}. Subclasses may use other TableRecordReader
 * implementations.
 * <p/>
 * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to
 * function properly. Each of the entry points to this class used by the MapReduce framework,
 * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)},
 * will call {@link #initialize(JobConf)} as a convenient centralized location to handle retrieving
 * the necessary configuration information. If your subclass overrides either of these methods,
 * either call the parent version or call initialize yourself.
 * <p>
 * An example of a subclass:
 *
 * <pre>
 * class ExampleTIF extends TableInputFormatBase {
 *
 *   {@literal @}Override
 *   protected void initialize(JobConf job) throws IOException {
 *     // We are responsible for the lifecycle of this connection until we hand it over in
 *     // initializeTable.
 *     Connection connection =
 *       ConnectionFactory.createConnection(HBaseConfiguration.create(job));
 *     TableName tableName = TableName.valueOf("exampleTable");
 *     // mandatory. once passed here, TableInputFormatBase will handle closing the connection.
 *     initializeTable(connection, tableName);
 *     byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"),
 *       Bytes.toBytes("columnB") };
 *     // mandatory
 *     setInputColumns(inputColumns);
 *     // optional, by default we'll get everything for the given columns.
 *     Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
 *     setRowFilter(exampleFilter);
 *   }
 * }
 * </pre>
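 * <p/>
 * A subclass like the one above can then be wired into an old-style mapred job. The following is
 * only a sketch; {@code ExampleMapper} is a placeholder for your own
 * {@code org.apache.hadoop.mapred.Mapper} implementation, and output/reducer configuration is
 * elided:
 *
 * <pre>
 * JobConf job = new JobConf(HBaseConfiguration.create());
 * job.setJarByClass(ExampleTIF.class);
 * job.setInputFormat(ExampleTIF.class);
 * // ExampleMapper is a hypothetical Mapper consuming the ImmutableBytesWritable keys and
 * // Result values produced by this input format.
 * job.setMapperClass(ExampleMapper.class);
 * job.setMapOutputKeyClass(ImmutableBytesWritable.class);
 * job.setMapOutputValueClass(Result.class);
 * // ... plus reducer and output format configuration as needed.
 * JobClient.runJob(job);
 * </pre>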
 */
@InterfaceAudience.Public
public abstract class TableInputFormatBase implements InputFormat<ImmutableBytesWritable, Result> {
  private static final Logger LOG = LoggerFactory.getLogger(TableInputFormatBase.class);
  private byte[][] inputColumns;
  private Table table;
  private RegionLocator regionLocator;
  private Connection connection;
  private TableRecordReader tableRecordReader;
  private Filter rowFilter;

  private static final String NOT_INITIALIZED = "The input format instance has not been properly "
    + "initialized. Ensure you call initializeTable either in your constructor or initialize "
    + "method";
  private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a"
    + " previous error. Please look at the previous log lines from"
    + " the task's full log for more details.";

  /**
   * Builds a TableRecordReader. If no TableRecordReader was provided, uses the default.
   * @see InputFormat#getRecordReader(InputSplit, JobConf, Reporter)
   */
  public RecordReader<ImmutableBytesWritable, Result> getRecordReader(InputSplit split, JobConf job,
    Reporter reporter) throws IOException {
    // In case a subclass uses the deprecated approach or calls initializeTable directly
    if (table == null) {
      initialize(job);
    }
    // null check in case our child overrides getTable to not throw.
    try {
      if (getTable() == null) {
        // initialize() must not have been implemented in the subclass.
        throw new IOException(INITIALIZATION_ERROR);
      }
    } catch (IllegalStateException exception) {
      throw new IOException(INITIALIZATION_ERROR, exception);
    }

    TableSplit tSplit = (TableSplit) split;
    // if no table record reader was provided use default
    final TableRecordReader trr =
      this.tableRecordReader == null ? new TableRecordReader() : this.tableRecordReader;
    trr.setStartRow(tSplit.getStartRow());
    trr.setEndRow(tSplit.getEndRow());
    trr.setHTable(this.table);
    trr.setInputColumns(this.inputColumns);
    trr.setRowFilter(this.rowFilter);
    trr.init();
    return new RecordReader<ImmutableBytesWritable, Result>() {

      @Override
      public void close() throws IOException {
        trr.close();
        closeTable();
      }

      @Override
      public ImmutableBytesWritable createKey() {
        return trr.createKey();
      }

      @Override
      public Result createValue() {
        return trr.createValue();
      }

      @Override
      public long getPos() throws IOException {
        return trr.getPos();
      }

      @Override
      public float getProgress() throws IOException {
        return trr.getProgress();
      }

      @Override
      public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
        return trr.next(key, value);
      }
    };
  }

  /**
   * Calculates the splits that will serve as input for the map tasks.
   * <p/>
   * The number of splits created is the smaller of numSplits and the number of
   * {@link org.apache.hadoop.hbase.regionserver.HRegion}s in the table. If the number of splits is
   * smaller than the number of {@link org.apache.hadoop.hbase.regionserver.HRegion}s, splits span
   * multiple {@link org.apache.hadoop.hbase.regionserver.HRegion}s and are grouped as evenly as
   * possible. When the splits are uneven, the bigger splits are placed first in the
   * {@link InputSplit} array.
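   * <p/>
   * For example (illustrative numbers only), a table with 10 regions and numSplits = 4 yields 4
   * splits spanning 3, 3, 2, and 2 regions respectively.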
   * @param job       the map task {@link JobConf}
   * @param numSplits a hint to calculate the number of splits (mapred.map.tasks).
   * @return the input splits
   * @see InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int)
   */
  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    if (this.table == null) {
      initialize(job);
    }
    // null check in case our child overrides getTable to not throw.
    try {
      if (getTable() == null) {
        // initialize() must not have been implemented in the subclass.
        throw new IOException(INITIALIZATION_ERROR);
      }
    } catch (IllegalStateException exception) {
      throw new IOException(INITIALIZATION_ERROR, exception);
    }

    byte[][] startKeys = this.regionLocator.getStartKeys();
    if (startKeys == null || startKeys.length == 0) {
      throw new IOException("Expecting at least one region");
    }
    if (this.inputColumns == null || this.inputColumns.length == 0) {
      throw new IOException("Expecting at least one column");
    }
    int realNumSplits = numSplits > startKeys.length ? startKeys.length : numSplits;
    InputSplit[] splits = new InputSplit[realNumSplits];
    int middle = startKeys.length / realNumSplits;
    int startPos = 0;
    for (int i = 0; i < realNumSplits; i++) {
      int lastPos = startPos + middle;
      lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
      String regionLocation = regionLocator.getRegionLocation(startKeys[startPos]).getHostname();
      splits[i] = new TableSplit(this.table.getName(), startKeys[startPos],
        ((i + 1) < realNumSplits) ? startKeys[lastPos] : HConstants.EMPTY_START_ROW,
        regionLocation);
      LOG.info("split: " + i + "->" + splits[i]);
      startPos = lastPos;
    }
    return splits;
  }

  /**
   * Allows subclasses to initialize the table information.
   * @param connection The Connection to the HBase cluster. MUST be unmanaged. We will close it.
   * @param tableName  The {@link TableName} of the table to process.
   */
  protected void initializeTable(Connection connection, TableName tableName) throws IOException {
    if (this.table != null || this.connection != null) {
      LOG.warn("initializeTable called multiple times. Overwriting connection and table "
        + "reference; TableInputFormatBase will not close these old references when done.");
    }
    this.table = connection.getTable(tableName);
    this.regionLocator = connection.getRegionLocator(tableName);
    this.connection = connection;
  }

  /**
   * @param inputColumns the columns to be passed in the {@link Result} to the map task.
   */
  protected void setInputColumns(byte[][] inputColumns) {
    this.inputColumns = inputColumns;
  }

  /**
   * Allows subclasses to get the {@link Table}.
   */
  protected Table getTable() {
    if (table == null) {
      throw new IllegalStateException(NOT_INITIALIZED);
    }
    return this.table;
  }

  /**
   * Allows subclasses to set the {@link TableRecordReader} to provide other
   * {@link TableRecordReader} implementations.
   */
  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
    this.tableRecordReader = tableRecordReader;
  }

  /**
   * Allows subclasses to set the {@link Filter} to be used.
   */
  protected void setRowFilter(Filter rowFilter) {
    this.rowFilter = rowFilter;
  }

  /**
   * Handle subclass specific set up. Each of the entry points used by the MapReduce framework,
   * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)},
   * will call {@link #initialize(JobConf)} as a convenient centralized location to handle
   * retrieving the necessary configuration information and calling
   * {@link #initializeTable(Connection, TableName)}.
   * <p/>
   * Subclasses should implement their initialize call such that it is safe to call multiple times.
   * The current TableInputFormatBase implementation relies on a non-null table reference to decide
   * if an initialize call is needed, but this behavior may change in the future. In particular, it
   * is critical that initializeTable not be called multiple times since this will leak Connection
   * instances.
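   * <p/>
   * One way to keep initialize safe to call repeatedly is to track initialization in the subclass
   * itself. The following is only a sketch; the {@code initialized} flag and the
   * "example.table.name" property are illustrative names, not part of this API:
   *
   * <pre>
   * class IdempotentTIF extends TableInputFormatBase {
   *   private boolean initialized = false;
   *
   *   {@literal @}Override
   *   protected void initialize(JobConf job) throws IOException {
   *     if (initialized) {
   *       return; // avoid calling initializeTable twice and leaking a Connection
   *     }
   *     Connection connection =
   *       ConnectionFactory.createConnection(HBaseConfiguration.create(job));
   *     // "example.table.name" is an illustrative configuration property name.
   *     TableName tableName = TableName.valueOf(job.get("example.table.name", "exampleTable"));
   *     initializeTable(connection, tableName);
   *     setInputColumns(new byte[][] { Bytes.toBytes("columnA") });
   *     initialized = true;
   *   }
   * }
   * </pre>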
   */
  protected void initialize(JobConf job) throws IOException {
  }

  /**
   * Close the Table and related objects that were initialized via
   * {@link #initializeTable(Connection, TableName)}.
   */
  protected void closeTable() throws IOException {
    close(table, connection);
    table = null;
    connection = null;
  }

  private void close(Closeable... closables) throws IOException {
    for (Closeable c : closables) {
      if (c != null) {
        c.close();
      }
    }
  }
}