/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.Closeable;
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

/**
 * A base for {@link TableInputFormat}s. Receives a {@link Connection}, a {@link TableName},
 * a byte[][] of input columns, and optionally a {@link Filter}.
 * Subclasses may use other TableRecordReader implementations.
 *
 * <p>
 * Subclasses MUST ensure {@link #initializeTable(Connection, TableName)} is called for an
 * instance to function properly. Each of the entry points to this class used by the MapReduce
 * framework, {@link #getRecordReader(InputSplit, JobConf, Reporter)} and
 * {@link #getSplits(JobConf, int)}, will call {@link #initialize(JobConf)} as a convenient
 * centralized location to handle retrieving the necessary configuration information. If your
 * subclass overrides either of these methods, either call the parent version or call
 * initialize yourself.
 *
 * <p>
 * An example of a subclass:
 * <pre>
 *   class ExampleTIF extends TableInputFormatBase {
 *
 *     {@literal @}Override
 *     protected void initialize(JobConf job) throws IOException {
 *       // We are responsible for the lifecycle of this connection until we hand it over in
 *       // initializeTable.
 *       Connection connection =
 *           ConnectionFactory.createConnection(HBaseConfiguration.create(job));
 *       TableName tableName = TableName.valueOf("exampleTable");
 *       // mandatory. once passed here, TableInputFormatBase will handle closing the connection.
 *       initializeTable(connection, tableName);
 *       byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"),
 *           Bytes.toBytes("columnB") };
 *       // mandatory
 *       setInputColumns(inputColumns);
 *       // optional, by default we'll get everything for the given columns.
 *       Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
 *       setRowFilter(exampleFilter);
 *     }
 *   }
 * </pre>
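 *
 * <p>
 * If a subclass overrides {@link #getSplits(JobConf, int)} or
 * {@link #getRecordReader(InputSplit, JobConf, Reporter)}, delegating to the parent version is
 * the simplest way to keep initialization working. A minimal sketch, reusing the ExampleTIF
 * class above:
 * <pre>
 *     {@literal @}Override
 *     public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
 *       // the parent version calls initialize(JobConf) before computing splits
 *       return super.getSplits(job, numSplits);
 *     }
 * </pre>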
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class TableInputFormatBase
    implements InputFormat<ImmutableBytesWritable, Result> {
  private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
  private byte[][] inputColumns;
  private Table table;
  private RegionLocator regionLocator;
  private Connection connection;
  private TableRecordReader tableRecordReader;
  private Filter rowFilter;

  private static final String NOT_INITIALIZED = "The input format instance has not been " +
      "properly initialized. Ensure you call initializeTable either in your constructor or " +
      "initialize method";
  private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" +
      " previous error. Please look at the previous log lines in" +
      " the task's full log for more details.";

  /**
   * Builds a TableRecordReader. If no TableRecordReader was provided, uses
   * the default. The returned reader closes the underlying table and connection when the
   * reader itself is closed.
   *
   * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
   *      JobConf, Reporter)
   */
  public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
      InputSplit split, JobConf job, Reporter reporter)
      throws IOException {
    // In case a subclass uses the deprecated setHTable approach or calls initializeTable directly
    if (table == null) {
      initialize(job);
    }
    // null check in case a subclass overrides getTable to return null instead of throwing.
    try {
      if (getTable() == null) {
        // initialize() must not have been implemented in the subclass.
        throw new IOException(INITIALIZATION_ERROR);
      }
    } catch (IllegalStateException exception) {
      throw new IOException(INITIALIZATION_ERROR, exception);
    }

    TableSplit tSplit = (TableSplit) split;
    // if no table record reader was provided use default
    final TableRecordReader trr = this.tableRecordReader == null ? new TableRecordReader() :
        this.tableRecordReader;
    trr.setStartRow(tSplit.getStartRow());
    trr.setEndRow(tSplit.getEndRow());
    trr.setHTable(this.table);
    trr.setInputColumns(this.inputColumns);
    trr.setRowFilter(this.rowFilter);
    trr.init();
    return new RecordReader<ImmutableBytesWritable, Result>() {

      @Override
      public void close() throws IOException {
        trr.close();
        closeTable();
      }

      @Override
      public ImmutableBytesWritable createKey() {
        return trr.createKey();
      }

      @Override
      public Result createValue() {
        return trr.createValue();
      }

      @Override
      public long getPos() throws IOException {
        return trr.getPos();
      }

      @Override
      public float getProgress() throws IOException {
        return trr.getProgress();
      }

      @Override
      public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
        return trr.next(key, value);
      }
    };
  }

  /**
   * Calculates the splits that will serve as input for the map tasks.
   * <p>
   * The number of splits created equals the smaller of numSplits and the number of
   * {@link org.apache.hadoop.hbase.regionserver.HRegion}s in the table. If the number of splits
   * is smaller than the number of {@link org.apache.hadoop.hbase.regionserver.HRegion}s, splits
   * span multiple {@link org.apache.hadoop.hbase.regionserver.HRegion}s and are grouped as
   * evenly as possible. When the splits are uneven, the bigger splits are placed first in the
   * {@link InputSplit} array.
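   * <p>
   * For example, a table with 10 regions and a numSplits of 4 yields 4 splits spanning 3, 3, 2,
   * and 2 regions respectively.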
   *
   * @param job the map task {@link JobConf}
   * @param numSplits a hint to calculate the number of splits (mapred.map.tasks).
   *
   * @return the input splits
   *
   * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int)
   */
  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    if (this.table == null) {
      initialize(job);
    }
    // null check in case a subclass overrides getTable to return null instead of throwing.
    try {
      if (getTable() == null) {
        // initialize() must not have been implemented in the subclass.
        throw new IOException(INITIALIZATION_ERROR);
      }
    } catch (IllegalStateException exception) {
      throw new IOException(INITIALIZATION_ERROR, exception);
    }

    byte[][] startKeys = this.regionLocator.getStartKeys();
    if (startKeys == null || startKeys.length == 0) {
      throw new IOException("Expecting at least one region");
    }
    if (this.inputColumns == null || this.inputColumns.length == 0) {
      throw new IOException("Expecting at least one column");
    }
    int realNumSplits = numSplits > startKeys.length ? startKeys.length : numSplits;
    InputSplit[] splits = new InputSplit[realNumSplits];
    int middle = startKeys.length / realNumSplits;
    int startPos = 0;
    for (int i = 0; i < realNumSplits; i++) {
      int lastPos = startPos + middle;
      // spread the remainder regions across the leading splits, one extra region each
      lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
      String regionLocation = regionLocator.getRegionLocation(startKeys[startPos]).
          getHostname();
      splits[i] = new TableSplit(this.table.getName(),
          startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos] :
              HConstants.EMPTY_START_ROW, regionLocation);
      LOG.info("split: " + i + "->" + splits[i]);
      startPos = lastPos;
    }
    return splits;
  }

  /**
   * Allows subclasses to initialize the table information.
   *
   * @param connection The Connection to the HBase cluster. MUST be unmanaged. This class will
   *          close it when done.
   * @param tableName The {@link TableName} of the table to process.
   * @throws IOException if the table or its region locations cannot be obtained
   */
  protected void initializeTable(Connection connection, TableName tableName) throws IOException {
    if (this.table != null || this.connection != null) {
      LOG.warn("initializeTable called multiple times. Overwriting connection and table " +
          "reference; TableInputFormatBase will not close these old references when done.");
    }
    this.table = connection.getTable(tableName);
    this.regionLocator = connection.getRegionLocator(tableName);
    this.connection = connection;
  }

  /**
   * @param inputColumns the columns to be included in the {@link Result}s passed to the map task.
   */
  protected void setInputColumns(byte[][] inputColumns) {
    this.inputColumns = inputColumns;
  }

  /**
   * Allows subclasses to get the {@link HTable}.
   *
   * @deprecated use {@link #getTable()}
   */
  @Deprecated
  protected HTable getHTable() {
    return (HTable) getTable();
  }

  /**
   * Allows subclasses to get the {@link Table}.
   */
  protected Table getTable() {
    if (table == null) {
      throw new IllegalStateException(NOT_INITIALIZED);
    }
    return this.table;
  }

  /**
   * Allows subclasses to set the {@link HTable}.
   *
   * @param table the table to read from
   * @deprecated use {@link #initializeTable(Connection, TableName)}
   */
  @Deprecated
  protected void setHTable(HTable table) {
    this.table = table;
  }

  /**
   * Allows subclasses to set the {@link TableRecordReader}.
   *
   * @param tableRecordReader to provide other {@link TableRecordReader} implementations.
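   *
   * <p>
   * For example, a subclass could install a custom reader from its {@link #initialize(JobConf)}
   * method; the MyTableRecordReader subclass of TableRecordReader here is hypothetical:
   * <pre>
   *   setTableRecordReader(new MyTableRecordReader());
   * </pre>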
   */
  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
    this.tableRecordReader = tableRecordReader;
  }

  /**
   * Allows subclasses to set the {@link Filter} to be used.
   *
   * @param rowFilter the {@link Filter} to apply when reading each row
   */
  protected void setRowFilter(Filter rowFilter) {
    this.rowFilter = rowFilter;
  }

  /**
   * Handle subclass-specific setup.
   * <p>
   * Each of the entry points used by the MapReduce framework,
   * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)},
   * will call {@link #initialize(JobConf)} as a convenient centralized location to handle
   * retrieving the necessary configuration information and calling
   * {@link #initializeTable(Connection, TableName)}.
   * <p>
   * Subclasses should implement their initialize call such that it is safe to call multiple
   * times. The current TableInputFormatBase implementation relies on a non-null table reference
   * to decide if an initialize call is needed, but this behavior may change in the future. In
   * particular, it is critical that initializeTable not be called multiple times since this
   * will leak Connection instances.
   *
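   * <p>
   * A minimal idempotent sketch; the {@code initialized} flag is hypothetical, not part of this
   * class:
   * <pre>
   *   private boolean initialized = false;
   *
   *   {@literal @}Override
   *   protected void initialize(JobConf job) throws IOException {
   *     if (initialized) {
   *       return; // guard so initializeTable never runs twice and leaks a Connection
   *     }
   *     Connection connection =
   *         ConnectionFactory.createConnection(HBaseConfiguration.create(job));
   *     initializeTable(connection, TableName.valueOf("exampleTable"));
   *     setInputColumns(new byte[][] { Bytes.toBytes("columnA") });
   *     initialized = true;
   *   }
   * </pre>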
   */
  protected void initialize(JobConf job) throws IOException {
    // the default implementation is a no-op; subclasses override to perform setup
  }

  /**
   * Close the Table and related objects that were initialized via
   * {@link #initializeTable(Connection, TableName)}.
   *
   * @throws IOException if closing the table or connection fails
   */
  protected void closeTable() throws IOException {
    close(table, connection);
    table = null;
    connection = null;
  }

  private void close(Closeable... closables) throws IOException {
    for (Closeable c : closables) {
      if (c != null) {
        c.close();
      }
    }
  }
}