001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mapred;
020
021import java.io.Closeable;
022import java.io.IOException;
023
024import org.apache.yetus.audience.InterfaceAudience;
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.client.Connection;
030import org.apache.hadoop.hbase.client.RegionLocator;
031import org.apache.hadoop.hbase.client.Result;
032import org.apache.hadoop.hbase.client.Table;
033import org.apache.hadoop.hbase.filter.Filter;
034import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
035import org.apache.hadoop.mapred.InputFormat;
036import org.apache.hadoop.mapred.InputSplit;
037import org.apache.hadoop.mapred.JobConf;
038import org.apache.hadoop.mapred.RecordReader;
039import org.apache.hadoop.mapred.Reporter;
040
041/**
042 * A Base for {@link TableInputFormat}s. Receives a {@link Table}, a
043 * byte[] of input columns and optionally a {@link Filter}.
044 * Subclasses may use other TableRecordReader implementations.
045 *
046 * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to
047 * function properly. Each of the entry points to this class used by the MapReduce framework,
048 * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)},
049 * will call {@link #initialize(JobConf)} as a convenient centralized location to handle
050 * retrieving the necessary configuration information. If your subclass overrides either of these
051 * methods, either call the parent version or call initialize yourself.
052 *
053 * <p>
054 * An example of a subclass:
055 * <pre>
056 *   class ExampleTIF extends TableInputFormatBase {
057 *
058 *     {@literal @}Override
059 *     protected void initialize(JobConf context) throws IOException {
060 *       // We are responsible for the lifecycle of this connection until we hand it over in
061 *       // initializeTable.
062 *       Connection connection =
063 *          ConnectionFactory.createConnection(HBaseConfiguration.create(job));
064 *       TableName tableName = TableName.valueOf("exampleTable");
065 *       // mandatory. once passed here, TableInputFormatBase will handle closing the connection.
066 *       initializeTable(connection, tableName);
067 *       byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
068 *         Bytes.toBytes("columnB") };
069 *       // mandatory
070 *       setInputColumns(inputColumns);
071 *       // optional, by default we'll get everything for the given columns.
072 *       Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
073 *       setRowFilter(exampleFilter);
074 *     }
075 *   }
076 * </pre>
077 */
078
079@InterfaceAudience.Public
080public abstract class TableInputFormatBase
081implements InputFormat<ImmutableBytesWritable, Result> {
082  private static final Logger LOG = LoggerFactory.getLogger(TableInputFormatBase.class);
083  private byte [][] inputColumns;
084  private Table table;
085  private RegionLocator regionLocator;
086  private Connection connection;
087  private TableRecordReader tableRecordReader;
088  private Filter rowFilter;
089
090  private static final String NOT_INITIALIZED = "The input format instance has not been properly " +
091      "initialized. Ensure you call initializeTable either in your constructor or initialize " +
092      "method";
093  private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" +
094            " previous error. Please look at the previous logs lines from" +
095            " the task's full log for more details.";
096
097  /**
098   * Builds a TableRecordReader. If no TableRecordReader was provided, uses
099   * the default.
100   *
101   * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
102   *      JobConf, Reporter)
103   */
104  public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
105      InputSplit split, JobConf job, Reporter reporter)
106  throws IOException {
107    // In case a subclass uses the deprecated approach or calls initializeTable directly
108    if (table == null) {
109      initialize(job);
110    }
111    // null check in case our child overrides getTable to not throw.
112    try {
113      if (getTable() == null) {
114        // initialize() must not have been implemented in the subclass.
115        throw new IOException(INITIALIZATION_ERROR);
116      }
117    } catch (IllegalStateException exception) {
118      throw new IOException(INITIALIZATION_ERROR, exception);
119    }
120
121    TableSplit tSplit = (TableSplit) split;
122    // if no table record reader was provided use default
123    final TableRecordReader trr = this.tableRecordReader == null ? new TableRecordReader() :
124        this.tableRecordReader;
125    trr.setStartRow(tSplit.getStartRow());
126    trr.setEndRow(tSplit.getEndRow());
127    trr.setHTable(this.table);
128    trr.setInputColumns(this.inputColumns);
129    trr.setRowFilter(this.rowFilter);
130    trr.init();
131    return new RecordReader<ImmutableBytesWritable, Result>() {
132
133      @Override
134      public void close() throws IOException {
135        trr.close();
136        closeTable();
137      }
138
139      @Override
140      public ImmutableBytesWritable createKey() {
141        return trr.createKey();
142      }
143
144      @Override
145      public Result createValue() {
146        return trr.createValue();
147      }
148
149      @Override
150      public long getPos() throws IOException {
151        return trr.getPos();
152      }
153
154      @Override
155      public float getProgress() throws IOException {
156        return trr.getProgress();
157      }
158
159      @Override
160      public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
161        return trr.next(key, value);
162      }
163    };
164  }
165
166  /**
167   * Calculates the splits that will serve as input for the map tasks.
168   *
169   * Splits are created in number equal to the smallest between numSplits and
170   * the number of {@link org.apache.hadoop.hbase.regionserver.HRegion}s in the table.
171   * If the number of splits is smaller than the number of
172   * {@link org.apache.hadoop.hbase.regionserver.HRegion}s then splits are spanned across
173   * multiple {@link org.apache.hadoop.hbase.regionserver.HRegion}s
174   * and are grouped the most evenly possible. In the
175   * case splits are uneven the bigger splits are placed first in the
176   * {@link InputSplit} array.
177   *
178   * @param job the map task {@link JobConf}
179   * @param numSplits a hint to calculate the number of splits (mapred.map.tasks).
180   *
181   * @return the input splits
182   *
183   * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int)
184   */
185  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
186    if (this.table == null) {
187      initialize(job);
188    }
189    // null check in case our child overrides getTable to not throw.
190    try {
191      if (getTable() == null) {
192        // initialize() must not have been implemented in the subclass.
193        throw new IOException(INITIALIZATION_ERROR);
194      }
195    } catch (IllegalStateException exception) {
196      throw new IOException(INITIALIZATION_ERROR, exception);
197    }
198
199    byte [][] startKeys = this.regionLocator.getStartKeys();
200    if (startKeys == null || startKeys.length == 0) {
201      throw new IOException("Expecting at least one region");
202    }
203    if (this.inputColumns == null || this.inputColumns.length == 0) {
204      throw new IOException("Expecting at least one column");
205    }
206    int realNumSplits = numSplits > startKeys.length? startKeys.length:
207      numSplits;
208    InputSplit[] splits = new InputSplit[realNumSplits];
209    int middle = startKeys.length / realNumSplits;
210    int startPos = 0;
211    for (int i = 0; i < realNumSplits; i++) {
212      int lastPos = startPos + middle;
213      lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
214      String regionLocation = regionLocator.getRegionLocation(startKeys[startPos]).
215        getHostname();
216      splits[i] = new TableSplit(this.table.getName(),
217        startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]:
218          HConstants.EMPTY_START_ROW, regionLocation);
219      LOG.info("split: " + i + "->" + splits[i]);
220      startPos = lastPos;
221    }
222    return splits;
223  }
224
225  /**
226   * Allows subclasses to initialize the table information.
227   *
228   * @param connection  The Connection to the HBase cluster. MUST be unmanaged. We will close.
229   * @param tableName  The {@link TableName} of the table to process.
230   * @throws IOException
231   */
232  protected void initializeTable(Connection connection, TableName tableName) throws IOException {
233    if (this.table != null || this.connection != null) {
234      LOG.warn("initializeTable called multiple times. Overwriting connection and table " +
235          "reference; TableInputFormatBase will not close these old references when done.");
236    }
237    this.table = connection.getTable(tableName);
238    this.regionLocator = connection.getRegionLocator(tableName);
239    this.connection = connection;
240  }
241
242  /**
243   * @param inputColumns to be passed in {@link Result} to the map task.
244   */
245  protected void setInputColumns(byte [][] inputColumns) {
246    this.inputColumns = inputColumns;
247  }
248
249  /**
250   * Allows subclasses to get the {@link Table}.
251   */
252  protected Table getTable() {
253    if (table == null) {
254      throw new IllegalStateException(NOT_INITIALIZED);
255    }
256    return this.table;
257  }
258
259  /**
260   * Allows subclasses to set the {@link TableRecordReader}.
261   *
262   * @param tableRecordReader
263   *                to provide other {@link TableRecordReader} implementations.
264   */
265  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
266    this.tableRecordReader = tableRecordReader;
267  }
268
269  /**
270   * Allows subclasses to set the {@link Filter} to be used.
271   *
272   * @param rowFilter
273   */
274  protected void setRowFilter(Filter rowFilter) {
275    this.rowFilter = rowFilter;
276  }
277
278  /**
279   * Handle subclass specific set up.
280   * Each of the entry points used by the MapReduce framework,
281   * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)},
282   * will call {@link #initialize(JobConf)} as a convenient centralized location to handle
283   * retrieving the necessary configuration information and calling
284   * {@link #initializeTable(Connection, TableName)}.
285   *
286   * Subclasses should implement their initialize call such that it is safe to call multiple times.
287   * The current TableInputFormatBase implementation relies on a non-null table reference to decide
288   * if an initialize call is needed, but this behavior may change in the future. In particular,
289   * it is critical that initializeTable not be called multiple times since this will leak
290   * Connection instances.
291   *
292   */
293  protected void initialize(JobConf job) throws IOException {
294  }
295
296  /**
297   * Close the Table and related objects that were initialized via
298   * {@link #initializeTable(Connection, TableName)}.
299   *
300   * @throws IOException
301   */
302  protected void closeTable() throws IOException {
303    close(table, connection);
304    table = null;
305    connection = null;
306  }
307
308  private void close(Closeable... closables) throws IOException {
309    for (Closeable c : closables) {
310      if(c != null) { c.close(); }
311    }
312  }
313}