/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RegionSizeCalculator;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.util.StringUtils;

/**
 * A base for {@link TableInputFormat}s. Receives a {@link Connection}, a {@link TableName},
 * a {@link Scan} instance that defines the input columns, etc. Subclasses may use
 * other TableRecordReader implementations.
 *
 * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to
 * function properly. Each of the entry points to this class used by the MapReduce framework,
 * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)},
 * will call {@link #initialize(JobContext)} as a convenient centralized location to handle
 * retrieving the necessary configuration information. If your subclass overrides either of these
 * methods, either call the parent version or call initialize yourself.
 *
 * <p>
 * An example of a subclass:
 * <pre>
 *   class ExampleTIF extends TableInputFormatBase {
 *
 *     {@literal @}Override
 *     protected void initialize(JobContext context) throws IOException {
 *       // We are responsible for the lifecycle of this connection until we hand it over in
 *       // initializeTable.
 *       Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(
 *              context.getConfiguration()));
 *       TableName tableName = TableName.valueOf("exampleTable");
 *       // mandatory. once passed here, TableInputFormatBase will handle closing the connection.
 *       initializeTable(connection, tableName);
 *       byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
 *         Bytes.toBytes("columnB") };
 *       // optional, by default we'll get everything for the table.
 *       Scan scan = new Scan();
 *       for (byte[] family : inputColumns) {
 *         scan.addFamily(family);
 *       }
 *       Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
 *       scan.setFilter(exampleFilter);
 *       setScan(scan);
 *     }
 *   }
 * </pre>
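 *
 * <p>
 * As a hedged sketch of how such a subclass might then be wired into a job (ExampleMapper is a
 * hypothetical mapper class and conf an existing Configuration; neither is part of this file):
 * <pre>
 *   Job job = Job.getInstance(conf, "example-scan");
 *   job.setJarByClass(ExampleTIF.class);
 *   job.setInputFormatClass(ExampleTIF.class);
 *   job.setMapperClass(ExampleMapper.class);
 *   job.setMapOutputKeyClass(ImmutableBytesWritable.class);
 *   job.setMapOutputValueClass(Result.class);
 * </pre>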
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class TableInputFormatBase
extends InputFormat<ImmutableBytesWritable, Result> {

  /** Specify whether auto-balance is enabled for input in M/R jobs. */
  public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.input.autobalance";
  /** Specify the maximum ratio of data skew in M/R jobs; it only takes effect when the
   * hbase.mapreduce.input.autobalance property is enabled. */
  public static final String INPUT_AUTOBALANCE_MAXSKEWRATIO = "hbase.mapreduce.input.autobalance" +
          ".maxskewratio";
  /** Specify whether the row keys in the table are text (ASCII between 32 and 126);
   * default is true. False means the table uses binary row keys. */
  public static final String TABLE_ROW_TEXTKEY = "hbase.table.row.textkey";
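
  /*
   * A minimal sketch (hypothetical values, assuming an existing Job named job) of enabling
   * auto-balance and tuning it through these three properties:
   *
   *   Configuration conf = job.getConfiguration();
   *   conf.setBoolean(TableInputFormatBase.MAPREDUCE_INPUT_AUTOBALANCE, true);
   *   conf.setLong(TableInputFormatBase.INPUT_AUTOBALANCE_MAXSKEWRATIO, 3);
   *   conf.setBoolean(TableInputFormatBase.TABLE_ROW_TEXTKEY, true);
   */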

  private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class);

  private static final String NOT_INITIALIZED = "The input format instance has not been properly " +
      "initialized. Ensure you call initializeTable either in your constructor or initialize " +
      "method";
  private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" +
            " previous error. Please look at the previous log lines from" +
            " the task's full log for more details.";
  /** Holds the details for the internal scanner.
   *
   * @see Scan */
  private Scan scan = null;
  /** The {@link Admin}. */
  private Admin admin;
  /** The {@link Table} to scan. */
  private Table table;
  /** The {@link RegionLocator} of the table. */
  private RegionLocator regionLocator;
  /** The reader scanning the table, can be a custom one. */
  private TableRecordReader tableRecordReader = null;
  /** The underlying {@link Connection} of the table. */
  private Connection connection;

  /** The reverse DNS lookup cache mapping: IPAddress => HostName */
  private HashMap<InetAddress, String> reverseDNSCacheMap =
    new HashMap<InetAddress, String>();

  /**
   * Builds a {@link TableRecordReader}. If no {@link TableRecordReader} was provided, uses
   * the default.
   *
   * @param split  The split to work with.
   * @param context  The current context.
   * @return The newly created record reader.
   * @throws IOException When creating the reader fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
   *   org.apache.hadoop.mapreduce.InputSplit,
   *   org.apache.hadoop.mapreduce.TaskAttemptContext)
   */
  @Override
  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
      InputSplit split, TaskAttemptContext context)
  throws IOException {
    // Just in case a subclass is relying on JobConfigurable magic.
    if (table == null) {
      initialize(context);
    }
    // null check in case our child overrides getTable to not throw.
    try {
      if (getTable() == null) {
        // initialize() must not have been implemented in the subclass.
        throw new IOException(INITIALIZATION_ERROR);
      }
    } catch (IllegalStateException exception) {
      throw new IOException(INITIALIZATION_ERROR, exception);
    }
    TableSplit tSplit = (TableSplit) split;
    LOG.info("Input split length: " + StringUtils.humanReadableInt(tSplit.getLength()) + " bytes.");
    final TableRecordReader trr =
        this.tableRecordReader != null ? this.tableRecordReader : new TableRecordReader();
    Scan sc = new Scan(this.scan);
    sc.setStartRow(tSplit.getStartRow());
    sc.setStopRow(tSplit.getEndRow());
    trr.setScan(sc);
    trr.setTable(getTable());
    return new RecordReader<ImmutableBytesWritable, Result>() {

      @Override
      public void close() throws IOException {
        trr.close();
        closeTable();
      }

      @Override
      public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
        return trr.getCurrentKey();
      }

      @Override
      public Result getCurrentValue() throws IOException, InterruptedException {
        return trr.getCurrentValue();
      }

      @Override
      public float getProgress() throws IOException, InterruptedException {
        return trr.getProgress();
      }

      @Override
      public void initialize(InputSplit inputsplit, TaskAttemptContext context) throws IOException,
          InterruptedException {
        trr.initialize(inputsplit, context);
      }

      @Override
      public boolean nextKeyValue() throws IOException, InterruptedException {
        return trr.nextKeyValue();
      }
    };
  }

  protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
    return getRegionLocator().getStartEndKeys();
  }

  /**
   * Calculates the splits that will serve as input for the map tasks. The
   * number of splits matches the number of regions in a table.
   *
   * @param context  The current job context.
   * @return The list of input splits.
   * @throws IOException When creating the list of splits fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
   *   org.apache.hadoop.mapreduce.JobContext)
   */
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    boolean closeOnFinish = false;

    // Just in case a subclass is relying on JobConfigurable magic.
    if (table == null) {
      initialize(context);
      closeOnFinish = true;
    }

    // null check in case our child overrides getTable to not throw.
    try {
      if (getTable() == null) {
        // initialize() must not have been implemented in the subclass.
        throw new IOException(INITIALIZATION_ERROR);
      }
    } catch (IllegalStateException exception) {
      throw new IOException(INITIALIZATION_ERROR, exception);
    }

    try {
      RegionSizeCalculator sizeCalculator =
          new RegionSizeCalculator(getRegionLocator(), getAdmin());

      TableName tableName = getTable().getName();

      Pair<byte[][], byte[][]> keys = getStartEndKeys();
      if (keys == null || keys.getFirst() == null ||
          keys.getFirst().length == 0) {
        HRegionLocation regLoc =
            getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
        if (null == regLoc) {
          throw new IOException("Expecting at least one region.");
        }
        List<InputSplit> splits = new ArrayList<InputSplit>(1);
        long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
        TableSplit split = new TableSplit(tableName, scan,
            HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
                .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
        splits.add(split);
        return splits;
      }
      List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
      for (int i = 0; i < keys.getFirst().length; i++) {
        if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
          continue;
        }
        HRegionLocation location = getRegionLocator().getRegionLocation(keys.getFirst()[i], false);
        // The below InetSocketAddress creation does a name resolution.
        InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
        if (isa.isUnresolved()) {
          LOG.warn("Failed to resolve " + isa);
        }
        InetAddress regionAddress = isa.getAddress();
        String regionLocation = reverseDNS(regionAddress);

        byte[] startRow = scan.getStartRow();
        byte[] stopRow = scan.getStopRow();
        // determine if the given start and stop keys fall into the region
        if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
            Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
            (stopRow.length == 0 ||
             Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
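          // Narrow this split to the intersection of the scan's row range
          // and the region's key range.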
          byte[] splitStart = startRow.length == 0 ||
            Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
              keys.getFirst()[i] : startRow;
          byte[] splitStop = (stopRow.length == 0 ||
            Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
            keys.getSecond()[i].length > 0 ?
              keys.getSecond()[i] : stopRow;

          byte[] regionName = location.getRegionInfo().getRegionName();
          String encodedRegionName = location.getRegionInfo().getEncodedName();
          long regionSize = sizeCalculator.getRegionSize(regionName);
          TableSplit split = new TableSplit(tableName, scan,
            splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
          splits.add(split);
          if (LOG.isDebugEnabled()) {
            LOG.debug("getSplits: split -> " + i + " -> " + split);
          }
        }
      }
      // The default value of "hbase.mapreduce.input.autobalance" is false, which means not enabled.
      boolean enableAutoBalance = context.getConfiguration()
        .getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false);
      if (enableAutoBalance) {
        long totalRegionSize = 0;
        for (int i = 0; i < splits.size(); i++) {
          TableSplit ts = (TableSplit) splits.get(i);
          totalRegionSize += ts.getLength();
        }
        long averageRegionSize = totalRegionSize / splits.size();
        // the averageRegionSize must be positive.
        if (averageRegionSize <= 0) {
          LOG.warn("The averageRegionSize is not positive: " + averageRegionSize + ", " +
              "set it to 1.");
          averageRegionSize = 1;
        }
        return calculateRebalancedSplits(splits, context, averageRegionSize);
      } else {
        return splits;
      }
    } finally {
      if (closeOnFinish) {
        closeTable();
      }
    }
  }

  String reverseDNS(InetAddress ipAddress) throws UnknownHostException {
    String hostName = this.reverseDNSCacheMap.get(ipAddress);
    if (hostName == null) {
      String ipAddressString = null;
      try {
        ipAddressString = DNS.reverseDns(ipAddress, null);
      } catch (Exception e) {
        // We can use InetAddress in case JNDI failed to pull up the reverse DNS entry from the
        // name service. Also, in case of IPv6, we need to use the InetAddress since resolving
        // reverse DNS using JNDI doesn't work well with IPv6 addresses.
        ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName();
      }
      if (ipAddressString == null) throw new UnknownHostException("No host found for " + ipAddress);
      hostName = Strings.domainNamePointerToHostName(ipAddressString);
      this.reverseDNSCacheMap.put(ipAddress, hostName);
    }
    return hostName;
  }

  /**
   * Calculates the number of MapReduce input splits for the map tasks. The number of
   * MapReduce input splits depends on the average region size and the data skew ratio the user
   * sets in the configuration.
   *
   * @param list  The list of input splits before balancing.
   * @param context  The current job context.
   * @param average  The average size of all regions.
   * @return The list of input splits.
   * @throws IOException When creating the list of splits fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
   *   org.apache.hadoop.mapreduce.JobContext)
   */
  private List<InputSplit> calculateRebalancedSplits(List<InputSplit> list, JobContext context,
                                               long average) throws IOException {
    List<InputSplit> resultList = new ArrayList<InputSplit>();
    Configuration conf = context.getConfiguration();
    // The default data skew ratio is 3.
    long dataSkewRatio = conf.getLong(INPUT_AUTOBALANCE_MAXSKEWRATIO, 3);
    // Determines which mode to use: text key mode or binary key mode. The default is text mode.
    boolean isTextKey = context.getConfiguration().getBoolean(TABLE_ROW_TEXTKEY, true);
    long dataSkewThreshold = dataSkewRatio * average;
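    // Regions at or above this threshold count as skewed and are cut in two; regions between the
    // average and the threshold pass through unchanged; smaller regions are merged with neighbors.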
    int count = 0;
    while (count < list.size()) {
      TableSplit ts = (TableSplit) list.get(count);
      TableName tableName = ts.getTable();
      String regionLocation = ts.getRegionLocation();
      String encodedRegionName = ts.getEncodedRegionName();
      long regionSize = ts.getLength();
      if (regionSize >= dataSkewThreshold) {
        // If the current region size is larger than the data skew threshold,
        // split the region into two MapReduce input splits.
        byte[] splitKey = getSplitKey(ts.getStartRow(), ts.getEndRow(), isTextKey);
        // Set the size of each child TableSplit to half of the region size; the exact size of
        // the MapReduce input splits will not be far off.
        TableSplit t1 = new TableSplit(tableName, scan, ts.getStartRow(), splitKey, regionLocation,
                encodedRegionName, regionSize / 2);
        TableSplit t2 = new TableSplit(tableName, scan, splitKey, ts.getEndRow(), regionLocation,
                encodedRegionName, regionSize - regionSize / 2);
        resultList.add(t1);
        resultList.add(t2);
        count++;
      } else if (regionSize >= average) {
        // If the region size is between the average size and the data skew threshold,
        // make this region one MapReduce input split.
        resultList.add(ts);
        count++;
      } else {
        // If the total size of several small contiguous regions stays within the data skew
        // threshold, combine them into one MapReduce input split.
        long totalSize = regionSize;
        byte[] splitStartKey = ts.getStartRow();
        byte[] splitEndKey = ts.getEndRow();
        count++;
        for (; count < list.size(); count++) {
          TableSplit nextRegion = (TableSplit) list.get(count);
          long nextRegionSize = nextRegion.getLength();
          if (totalSize + nextRegionSize <= dataSkewThreshold) {
            totalSize = totalSize + nextRegionSize;
            splitEndKey = nextRegion.getEndRow();
          } else {
            break;
          }
        }
        TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey,
                regionLocation, encodedRegionName, totalSize);
        resultList.add(t);
      }
    }
    return resultList;
  }
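
  /*
   * A worked example (hypothetical sizes) of the rebalancing above, with average = 100 MB and the
   * default skew ratio of 3, so dataSkewThreshold = 300 MB:
   *   - a 350 MB region is cut at getSplitKey(...) into two splits of 175 MB each;
   *   - a 150 MB region (between average and threshold) becomes one split, unchanged;
   *   - ten consecutive 30 MB regions are packed into a single 300 MB split, since the merge loop
   *     keeps absorbing neighbors while the running total stays at or below the threshold.
   */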

  /**
   * Selects a split point in the region. The selection of the split point is based on a uniform
   * distribution assumption for the keys in the region.
   * Here are some examples:
   * startKey: aaabcdefg  endKey: aaafff    split point: aaad
   * startKey: 111000  endKey: 1125790    split point: 111b
   * startKey: 1110  endKey: 1120    split point: 111_
   * startKey: binary key { 13, -19, 126, 127 }, endKey: binary key { 13, -19, 127, 0 },
   * split point: binary key { 13, -19, 127, -64 }
   * This function is public static to make it easier to test.
   *
   * @param start Start key of the region
   * @param end End key of the region
   * @param isText Determines whether to use text key mode or binary key mode
   * @return The split point in the region.
   */
  @InterfaceAudience.Private
  public static byte[] getSplitKey(byte[] start, byte[] end, boolean isText) {
    byte upperLimitByte;
    byte lowerLimitByte;
    // Use text mode or binary mode.
    if (isText) {
      // The range of text chars in ASCII is [32,126]; the lower limit is space and the upper
      // limit is '~'.
      upperLimitByte = '~';
      lowerLimitByte = ' ';
    } else {
      upperLimitByte = Byte.MAX_VALUE;
      lowerLimitByte = Byte.MIN_VALUE;
    }
    // Special cases:
    // Example 1: startKey=null, endKey="hhhqqqwww", splitKey="h"
    // Example 2 (text key mode): startKey="ffffaaa", endKey=null, splitKey="f~~~~~~"
    if (start.length == 0 && end.length == 0) {
      return new byte[]{(byte) ((lowerLimitByte + upperLimitByte) / 2)};
    }
    if (start.length == 0 && end.length != 0) {
      return new byte[]{ end[0] };
    }
    if (start.length != 0 && end.length == 0) {
      byte[] result = new byte[start.length];
      result[0] = start[0];
      for (int k = 1; k < start.length; k++) {
        result[k] = upperLimitByte;
      }
      return result;
    }
    // A list to store the bytes of the split key
    List<Byte> resultBytesList = new ArrayList<Byte>();
    int maxLength = start.length > end.length ? start.length : end.length;
    for (int i = 0; i < maxLength; i++) {
      // Calculate the midpoint byte at the first difference.
      // For example: "11ae" and "11chw", the midpoint is "11b".
      // Another example: "11ae" and "11bhw", the first different bytes are 'a' and 'b';
      // there is no midpoint between 'a' and 'b', so we need to check the next byte.
      if (start[i] == end[i]) {
        resultBytesList.add(start[i]);
        // For a special case like: startKey="aaa", endKey="aaaz", splitKey="aaaM"
        if (i + 1 == start.length) {
          resultBytesList.add((byte) ((lowerLimitByte + end[i + 1]) / 2));
          break;
        }
      } else {
        // If the two bytes differ by 1, like ['a','b'], we need to check the next byte to find
        // the midpoint.
        if ((int) end[i] - (int) start[i] == 1) {
          // Get the next byte after the first difference.
          byte startNextByte = (i + 1 < start.length) ? start[i + 1] : lowerLimitByte;
          byte endNextByte = (i + 1 < end.length) ? end[i + 1] : lowerLimitByte;
          int byteRange = (upperLimitByte - startNextByte) + (endNextByte - lowerLimitByte) + 1;
          int halfRange = byteRange / 2;
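          // Walk half of that combined range forward from startNextByte; if this overshoots the
          // upper limit, wrap around into end[i]'s key space.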
          if ((int) startNextByte + halfRange > (int) upperLimitByte) {
            resultBytesList.add(end[i]);
            resultBytesList.add((byte) (startNextByte + halfRange - upperLimitByte +
                    lowerLimitByte));
          } else {
            resultBytesList.add(start[i]);
            resultBytesList.add((byte) (startNextByte + halfRange));
          }
        } else {
          // Calculate the midpoint key from the first different byte (normal case),
          // like "11ae" and "11chw", where the midpoint is "11b".
          resultBytesList.add((byte) ((start[i] + end[i]) / 2));
        }
        break;
      }
    }
    // Transform the List of bytes to byte[].
    byte[] result = new byte[resultBytesList.size()];
    for (int k = 0; k < resultBytesList.size(); k++) {
      result[k] = resultBytesList.get(k);
    }
    return result;
  }
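
  /*
   * A quick usage sketch of the method above (text key mode), mirroring the first example in its
   * Javadoc; Bytes.toBytes/Bytes.toString are the standard HBase helpers:
   *
   *   byte[] splitKey = TableInputFormatBase.getSplitKey(
   *       Bytes.toBytes("aaabcdefg"), Bytes.toBytes("aaafff"), true);
   *   // Bytes.toString(splitKey) evaluates to "aaad"
   */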

  /**
   * Test if the given region is to be included in the InputSplit while splitting
   * the regions of a table.
   * <p>
   * This optimization is effective when there is a specific reason to exclude an entire region
   * from the M-R job (and hence it does not contribute an InputSplit), given its start and end
   * keys. <br>
   * Useful when we need to remember the last-processed top record and revisit the
   * [last, current) interval for M-R processing, continuously. In addition to reducing the
   * number of InputSplits, this also reduces the load on the region server, due to the ordering
   * of the keys.
   * <br>
   * Note: it is possible that <code>endKey.length() == 0</code>, for the last (most recent)
   * region.
   * <br>
   * Override this method if you want to bulk exclude regions from M-R. By default, no region is
   * excluded (i.e. all regions are included). See the example override after this method.
   *
   * @param startKey Start key of the region
   * @param endKey End key of the region
   * @return true, if this region needs to be included as part of the input (default).
   */
  protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
    return true;
  }
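
  /*
   * A hedged example override (lastProcessedRow is a hypothetical checkpoint field, not part of
   * this class): skip every region whose end key sorts at or before the checkpoint, so only the
   * [last, current) interval is revisited. An empty end key marks the last region and is kept.
   *
   *   @Override
   *   protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
   *     return endKey.length == 0 || Bytes.compareTo(endKey, lastProcessedRow) > 0;
   *   }
   */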

  /**
   * Allows subclasses to get the {@link RegionLocator}.
   */
  protected RegionLocator getRegionLocator() {
    if (regionLocator == null) {
      throw new IllegalStateException(NOT_INITIALIZED);
    }
    return regionLocator;
  }

  /**
   * Allows subclasses to get the {@link Table}.
   */
  protected Table getTable() {
    if (table == null) {
      throw new IllegalStateException(NOT_INITIALIZED);
    }
    return table;
  }

  /**
   * Allows subclasses to get the {@link Admin}.
   */
  protected Admin getAdmin() {
    if (admin == null) {
      throw new IllegalStateException(NOT_INITIALIZED);
    }
    return admin;
  }

  /**
   * Allows subclasses to initialize the table information.
   *
   * @param connection  The Connection to the HBase cluster. MUST be unmanaged. We will close it.
   * @param tableName  The {@link TableName} of the table to process.
   * @throws IOException When retrieving the table fails.
   */
  protected void initializeTable(Connection connection, TableName tableName) throws IOException {
    if (this.table != null || this.connection != null) {
      LOG.warn("initializeTable called multiple times. Overwriting connection and table " +
          "reference; TableInputFormatBase will not close these old references when done.");
    }
    this.table = connection.getTable(tableName);
    this.regionLocator = connection.getRegionLocator(tableName);
    this.admin = connection.getAdmin();
    this.connection = connection;
  }

  /**
   * Gets the scan defining the actual details like columns etc.
   *
   * @return The internal scan instance.
   */
  public Scan getScan() {
    if (this.scan == null) this.scan = new Scan();
    return scan;
  }

  /**
   * Sets the scan defining the actual details like columns etc.
   *
   * @param scan  The scan to set.
   */
  public void setScan(Scan scan) {
    this.scan = scan;
  }

  /**
   * Allows subclasses to set the {@link TableRecordReader}.
   *
   * @param tableRecordReader A different {@link TableRecordReader}
   *   implementation.
   */
  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
    this.tableRecordReader = tableRecordReader;
  }

  /**
   * Handle subclass-specific set up.
   * Each of the entry points used by the MapReduce framework,
   * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)},
   * will call {@link #initialize(JobContext)} as a convenient centralized location to handle
   * retrieving the necessary configuration information and calling
   * {@link #initializeTable(Connection, TableName)}.
   *
   * Subclasses should implement their initialize call such that it is safe to call multiple times.
   * The current TableInputFormatBase implementation relies on a non-null table reference to decide
   * if an initialize call is needed, but this behavior may change in the future. In particular,
   * it is critical that initializeTable not be called multiple times since this will leak
   * Connection instances.
   */
  protected void initialize(JobContext context) throws IOException {
  }

  /**
   * Close the Table and related objects that were initialized via
   * {@link #initializeTable(Connection, TableName)}.
   *
   * @throws IOException When closing the underlying resources fails.
   */
  protected void closeTable() throws IOException {
    close(admin, table, regionLocator, connection);
    admin = null;
    table = null;
    regionLocator = null;
    connection = null;
  }

  private void close(Closeable... closables) throws IOException {
    for (Closeable c : closables) {
      if (c != null) { c.close(); }
    }
  }

}
664 
665 }