001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapred;
019
020import java.io.Closeable;
021import java.io.IOException;
022import org.apache.hadoop.hbase.HConstants;
023import org.apache.hadoop.hbase.TableName;
024import org.apache.hadoop.hbase.client.Connection;
025import org.apache.hadoop.hbase.client.RegionLocator;
026import org.apache.hadoop.hbase.client.Result;
027import org.apache.hadoop.hbase.client.Table;
028import org.apache.hadoop.hbase.filter.Filter;
029import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
030import org.apache.hadoop.mapred.InputFormat;
031import org.apache.hadoop.mapred.InputSplit;
032import org.apache.hadoop.mapred.JobConf;
033import org.apache.hadoop.mapred.RecordReader;
034import org.apache.hadoop.mapred.Reporter;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039/**
040 * A Base for {@link TableInputFormat}s. Receives a {@link Table}, a byte[] of input columns and
041 * optionally a {@link Filter}. Subclasses may use other TableRecordReader implementations.
042 * <p/>
043 * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to
044 * function properly. Each of the entry points to this class used by the MapReduce framework,
045 * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)},
046 * will call {@link #initialize(JobConf)} as a convenient centralized location to handle retrieving
047 * the necessary configuration information. If your subclass overrides either of these methods,
048 * either call the parent version or call initialize yourself.
049 * <p>
050 * An example of a subclass:
051 *
052 * <pre>
053 *   class ExampleTIF extends TableInputFormatBase {
054 *
055 *     {@literal @}Override
056 *     protected void initialize(JobConf context) throws IOException {
057 *       // We are responsible for the lifecycle of this connection until we hand it over in
058 *       // initializeTable.
059 *       Connection connection =
060 *          ConnectionFactory.createConnection(HBaseConfiguration.create(job));
061 *       TableName tableName = TableName.valueOf("exampleTable");
062 *       // mandatory. once passed here, TableInputFormatBase will handle closing the connection.
063 *       initializeTable(connection, tableName);
064 *       byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
065 *         Bytes.toBytes("columnB") };
066 *       // mandatory
067 *       setInputColumns(inputColumns);
068 *       // optional, by default we'll get everything for the given columns.
069 *       Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
070 *       setRowFilter(exampleFilter);
071 *     }
072 *   }
073 * </pre>
074 */
075
076@InterfaceAudience.Public
077public abstract class TableInputFormatBase implements InputFormat<ImmutableBytesWritable, Result> {
078  private static final Logger LOG = LoggerFactory.getLogger(TableInputFormatBase.class);
079  private byte[][] inputColumns;
080  private Table table;
081  private RegionLocator regionLocator;
082  private Connection connection;
083  private TableRecordReader tableRecordReader;
084  private Filter rowFilter;
085
086  private static final String NOT_INITIALIZED = "The input format instance has not been properly "
087    + "initialized. Ensure you call initializeTable either in your constructor or initialize "
088    + "method";
089  private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a"
090    + " previous error. Please look at the previous logs lines from"
091    + " the task's full log for more details.";
092
093  /**
094   * Builds a TableRecordReader. If no TableRecordReader was provided, uses the default.
095   * @see InputFormat#getRecordReader(InputSplit, JobConf, Reporter)
096   */
097  public RecordReader<ImmutableBytesWritable, Result> getRecordReader(InputSplit split, JobConf job,
098    Reporter reporter) throws IOException {
099    // In case a subclass uses the deprecated approach or calls initializeTable directly
100    if (table == null) {
101      initialize(job);
102    }
103    // null check in case our child overrides getTable to not throw.
104    try {
105      if (getTable() == null) {
106        // initialize() must not have been implemented in the subclass.
107        throw new IOException(INITIALIZATION_ERROR);
108      }
109    } catch (IllegalStateException exception) {
110      throw new IOException(INITIALIZATION_ERROR, exception);
111    }
112
113    TableSplit tSplit = (TableSplit) split;
114    // if no table record reader was provided use default
115    final TableRecordReader trr =
116      this.tableRecordReader == null ? new TableRecordReader() : this.tableRecordReader;
117    trr.setStartRow(tSplit.getStartRow());
118    trr.setEndRow(tSplit.getEndRow());
119    trr.setHTable(this.table);
120    trr.setInputColumns(this.inputColumns);
121    trr.setRowFilter(this.rowFilter);
122    trr.init();
123    return new RecordReader<ImmutableBytesWritable, Result>() {
124
125      @Override
126      public void close() throws IOException {
127        trr.close();
128        closeTable();
129      }
130
131      @Override
132      public ImmutableBytesWritable createKey() {
133        return trr.createKey();
134      }
135
136      @Override
137      public Result createValue() {
138        return trr.createValue();
139      }
140
141      @Override
142      public long getPos() throws IOException {
143        return trr.getPos();
144      }
145
146      @Override
147      public float getProgress() throws IOException {
148        return trr.getProgress();
149      }
150
151      @Override
152      public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
153        return trr.next(key, value);
154      }
155    };
156  }
157
158  /**
159   * Calculates the splits that will serve as input for the map tasks.
160   * <p/>
161   * Splits are created in number equal to the smallest between numSplits and the number of
162   * {@link org.apache.hadoop.hbase.regionserver.HRegion}s in the table. If the number of splits is
163   * smaller than the number of {@link org.apache.hadoop.hbase.regionserver.HRegion}s then splits
164   * are spanned across multiple {@link org.apache.hadoop.hbase.regionserver.HRegion}s and are
165   * grouped the most evenly possible. In the case splits are uneven the bigger splits are placed
166   * first in the {@link InputSplit} array.
167   * @param job       the map task {@link JobConf}
168   * @param numSplits a hint to calculate the number of splits (mapred.map.tasks).
169   * @return the input splits
170   * @see InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int)
171   */
172  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
173    if (this.table == null) {
174      initialize(job);
175    }
176    // null check in case our child overrides getTable to not throw.
177    try {
178      if (getTable() == null) {
179        // initialize() must not have been implemented in the subclass.
180        throw new IOException(INITIALIZATION_ERROR);
181      }
182    } catch (IllegalStateException exception) {
183      throw new IOException(INITIALIZATION_ERROR, exception);
184    }
185
186    byte[][] startKeys = this.regionLocator.getStartKeys();
187    if (startKeys == null || startKeys.length == 0) {
188      throw new IOException("Expecting at least one region");
189    }
190    if (this.inputColumns == null || this.inputColumns.length == 0) {
191      throw new IOException("Expecting at least one column");
192    }
193    int realNumSplits = numSplits > startKeys.length ? startKeys.length : numSplits;
194    InputSplit[] splits = new InputSplit[realNumSplits];
195    int middle = startKeys.length / realNumSplits;
196    int startPos = 0;
197    for (int i = 0; i < realNumSplits; i++) {
198      int lastPos = startPos + middle;
199      lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
200      String regionLocation = regionLocator.getRegionLocation(startKeys[startPos]).getHostname();
201      splits[i] = new TableSplit(this.table.getName(), startKeys[startPos],
202        ((i + 1) < realNumSplits) ? startKeys[lastPos] : HConstants.EMPTY_START_ROW,
203        regionLocation);
204      LOG.info("split: " + i + "->" + splits[i]);
205      startPos = lastPos;
206    }
207    return splits;
208  }
209
210  /**
211   * Allows subclasses to initialize the table information.
212   * @param connection The Connection to the HBase cluster. MUST be unmanaged. We will close.
213   * @param tableName  The {@link TableName} of the table to process. n
214   */
215  protected void initializeTable(Connection connection, TableName tableName) throws IOException {
216    if (this.table != null || this.connection != null) {
217      LOG.warn("initializeTable called multiple times. Overwriting connection and table "
218        + "reference; TableInputFormatBase will not close these old references when done.");
219    }
220    this.table = connection.getTable(tableName);
221    this.regionLocator = connection.getRegionLocator(tableName);
222    this.connection = connection;
223  }
224
225  /**
226   * @param inputColumns to be passed in {@link Result} to the map task.
227   */
228  protected void setInputColumns(byte[][] inputColumns) {
229    this.inputColumns = inputColumns;
230  }
231
232  /**
233   * Allows subclasses to get the {@link Table}.
234   */
235  protected Table getTable() {
236    if (table == null) {
237      throw new IllegalStateException(NOT_INITIALIZED);
238    }
239    return this.table;
240  }
241
242  /**
243   * Allows subclasses to set the {@link TableRecordReader}. n * to provide other
244   * {@link TableRecordReader} implementations.
245   */
246  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
247    this.tableRecordReader = tableRecordReader;
248  }
249
250  /**
251   * Allows subclasses to set the {@link Filter} to be used. n
252   */
253  protected void setRowFilter(Filter rowFilter) {
254    this.rowFilter = rowFilter;
255  }
256
257  /**
258   * Handle subclass specific set up. Each of the entry points used by the MapReduce framework,
259   * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)},
260   * will call {@link #initialize(JobConf)} as a convenient centralized location to handle
261   * retrieving the necessary configuration information and calling
262   * {@link #initializeTable(Connection, TableName)}.
263   * <p/>
264   * Subclasses should implement their initialize call such that it is safe to call multiple times.
265   * The current TableInputFormatBase implementation relies on a non-null table reference to decide
266   * if an initialize call is needed, but this behavior may change in the future. In particular, it
267   * is critical that initializeTable not be called multiple times since this will leak Connection
268   * instances.
269   */
270  protected void initialize(JobConf job) throws IOException {
271  }
272
273  /**
274   * Close the Table and related objects that were initialized via
275   * {@link #initializeTable(Connection, TableName)}. n
276   */
277  protected void closeTable() throws IOException {
278    close(table, connection);
279    table = null;
280    connection = null;
281  }
282
283  private void close(Closeable... closables) throws IOException {
284    for (Closeable c : closables) {
285      if (c != null) {
286        c.close();
287      }
288    }
289  }
290}