001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mapreduce;
020
021import java.io.Closeable;
022import java.io.IOException;
023import java.net.InetAddress;
024import java.net.InetSocketAddress;
025import java.net.UnknownHostException;
026import java.util.ArrayList;
027import java.util.HashMap;
028import java.util.List;
029
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.HRegionLocation;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.Admin;
037import org.apache.hadoop.hbase.client.Connection;
038import org.apache.hadoop.hbase.client.RegionLocator;
039import org.apache.hadoop.hbase.client.Result;
040import org.apache.hadoop.hbase.client.Scan;
041import org.apache.hadoop.hbase.client.Table;
042import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
043import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
044import org.apache.hadoop.hbase.util.Addressing;
045import org.apache.hadoop.hbase.util.Bytes;
046import org.apache.hadoop.hbase.util.Pair;
047import org.apache.hadoop.hbase.util.Strings;
048import org.apache.hadoop.mapreduce.InputFormat;
049import org.apache.hadoop.mapreduce.InputSplit;
050import org.apache.hadoop.mapreduce.JobContext;
051import org.apache.hadoop.mapreduce.RecordReader;
052import org.apache.hadoop.mapreduce.TaskAttemptContext;
053import org.apache.hadoop.net.DNS;
054import org.apache.hadoop.util.StringUtils;
055import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
056
057/**
058 * A base for {@link TableInputFormat}s. Receives a {@link Connection}, a {@link TableName},
059 * an {@link Scan} instance that defines the input columns etc. Subclasses may use
060 * other TableRecordReader implementations.
061 *
062 * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to
063 * function properly. Each of the entry points to this class used by the MapReduce framework,
064 * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)},
065 * will call {@link #initialize(JobContext)} as a convenient centralized location to handle
066 * retrieving the necessary configuration information. If your subclass overrides either of these
067 * methods, either call the parent version or call initialize yourself.
068 *
069 * <p>
070 * An example of a subclass:
071 * <pre>
072 *   class ExampleTIF extends TableInputFormatBase {
073 *
074 *     {@literal @}Override
075 *     protected void initialize(JobContext context) throws IOException {
076 *       // We are responsible for the lifecycle of this connection until we hand it over in
077 *       // initializeTable.
078 *       Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(
079 *              job.getConfiguration()));
080 *       TableName tableName = TableName.valueOf("exampleTable");
081 *       // mandatory. once passed here, TableInputFormatBase will handle closing the connection.
082 *       initializeTable(connection, tableName);
083 *       byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
084 *         Bytes.toBytes("columnB") };
085 *       // optional, by default we'll get everything for the table.
086 *       Scan scan = new Scan();
087 *       for (byte[] family : inputColumns) {
088 *         scan.addFamily(family);
089 *       }
090 *       Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
091 *       scan.setFilter(exampleFilter);
092 *       setScan(scan);
093 *     }
094 *   }
095 * </pre>
096 *
097 *
098 * The number of InputSplits(mappers) match the number of regions in a table by default.
099 * Set "hbase.mapreduce.tableinput.mappers.per.region" to specify how many mappers per region, set
100 * this property will disable autobalance below.\
101 * Set "hbase.mapreduce.tif.input.autobalance" to enable autobalance, hbase will assign mappers
102 * based on average region size; For regions, whose size larger than average region size may assigned
103 * more mappers, and for smaller one, they may group together to use one mapper. If actual average
104 * region size is too big, like 50G, it is not good to only assign 1 mapper for those large regions.
105 * Use "hbase.mapreduce.tif.ave.regionsize" to set max average region size when enable "autobalanece",
106 * default mas average region size is 8G.
107 */
108@InterfaceAudience.Public
109public abstract class TableInputFormatBase
110    extends InputFormat<ImmutableBytesWritable, Result> {
111
112  private static final Logger LOG = LoggerFactory.getLogger(TableInputFormatBase.class);
113
114  private static final String NOT_INITIALIZED = "The input format instance has not been properly " +
115      "initialized. Ensure you call initializeTable either in your constructor or initialize " +
116      "method";
117  private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" +
118      " previous error. Please look at the previous logs lines from" +
119      " the task's full log for more details.";
120
121  /** Specify if we enable auto-balance to set number of mappers in M/R jobs. */
122  public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.tif.input.autobalance";
123  /** In auto-balance, we split input by ave region size, if calculated region size is too big, we can set it. */
124  public static final String MAX_AVERAGE_REGION_SIZE = "hbase.mapreduce.tif.ave.regionsize";
125
126  /** Set the number of Mappers for each region, all regions have same number of Mappers */
127  public static final String NUM_MAPPERS_PER_REGION = "hbase.mapreduce.tableinput.mappers.per.region";
128
129
130  /** Holds the details for the internal scanner.
131   *
132   * @see Scan */
133  private Scan scan = null;
134  /** The {@link Admin}. */
135  private Admin admin;
136  /** The {@link Table} to scan. */
137  private Table table;
138  /** The {@link RegionLocator} of the table. */
139  private RegionLocator regionLocator;
140  /** The reader scanning the table, can be a custom one. */
141  private TableRecordReader tableRecordReader = null;
142  /** The underlying {@link Connection} of the table. */
143  private Connection connection;
144
145
146  /** The reverse DNS lookup cache mapping: IPAddress => HostName */
147  private HashMap<InetAddress, String> reverseDNSCacheMap =
148      new HashMap<>();
149
150  /**
151   * Builds a {@link TableRecordReader}. If no {@link TableRecordReader} was provided, uses
152   * the default.
153   *
154   * @param split  The split to work with.
155   * @param context  The current context.
156   * @return The newly created record reader.
157   * @throws IOException When creating the reader fails.
158   * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
159   *   org.apache.hadoop.mapreduce.InputSplit,
160   *   org.apache.hadoop.mapreduce.TaskAttemptContext)
161   */
162  @Override
163  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
164      InputSplit split, TaskAttemptContext context)
165      throws IOException {
166    // Just in case a subclass is relying on JobConfigurable magic.
167    if (table == null) {
168      initialize(context);
169    }
170    // null check in case our child overrides getTable to not throw.
171    try {
172      if (getTable() == null) {
173        // initialize() must not have been implemented in the subclass.
174        throw new IOException(INITIALIZATION_ERROR);
175      }
176    } catch (IllegalStateException exception) {
177      throw new IOException(INITIALIZATION_ERROR, exception);
178    }
179    TableSplit tSplit = (TableSplit) split;
180    LOG.info("Input split length: " + StringUtils.humanReadableInt(tSplit.getLength()) + " bytes.");
181    final TableRecordReader trr =
182        this.tableRecordReader != null ? this.tableRecordReader : new TableRecordReader();
183    Scan sc = new Scan(this.scan);
184    sc.setStartRow(tSplit.getStartRow());
185    sc.setStopRow(tSplit.getEndRow());
186    trr.setScan(sc);
187    trr.setTable(getTable());
188    return new RecordReader<ImmutableBytesWritable, Result>() {
189
190      @Override
191      public void close() throws IOException {
192        trr.close();
193        closeTable();
194      }
195
196      @Override
197      public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
198        return trr.getCurrentKey();
199      }
200
201      @Override
202      public Result getCurrentValue() throws IOException, InterruptedException {
203        return trr.getCurrentValue();
204      }
205
206      @Override
207      public float getProgress() throws IOException, InterruptedException {
208        return trr.getProgress();
209      }
210
211      @Override
212      public void initialize(InputSplit inputsplit, TaskAttemptContext context) throws IOException,
213          InterruptedException {
214        trr.initialize(inputsplit, context);
215      }
216
217      @Override
218      public boolean nextKeyValue() throws IOException, InterruptedException {
219        return trr.nextKeyValue();
220      }
221    };
222  }
223
224  protected Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
225    return getRegionLocator().getStartEndKeys();
226  }
227
228  /**
229   * Calculates the splits that will serve as input for the map tasks.
230   * @param context  The current job context.
231   * @return The list of input splits.
232   * @throws IOException When creating the list of splits fails.
233   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
234   *   org.apache.hadoop.mapreduce.JobContext)
235   */
236  @Override
237  public List<InputSplit> getSplits(JobContext context) throws IOException {
238    boolean closeOnFinish = false;
239
240    // Just in case a subclass is relying on JobConfigurable magic.
241    if (table == null) {
242      initialize(context);
243      closeOnFinish = true;
244    }
245
246    // null check in case our child overrides getTable to not throw.
247    try {
248      if (getTable() == null) {
249        // initialize() must not have been implemented in the subclass.
250        throw new IOException(INITIALIZATION_ERROR);
251      }
252    } catch (IllegalStateException exception) {
253      throw new IOException(INITIALIZATION_ERROR, exception);
254    }
255
256    try {
257      List<InputSplit> splits = oneInputSplitPerRegion();
258
259      // set same number of mappers for each region
260      if (context.getConfiguration().get(NUM_MAPPERS_PER_REGION) != null) {
261        int nSplitsPerRegion = context.getConfiguration().getInt(NUM_MAPPERS_PER_REGION, 1);
262        List<InputSplit> res = new ArrayList<>();
263        for (int i = 0; i < splits.size(); i++) {
264          List<InputSplit> tmp = createNInputSplitsUniform(splits.get(i), nSplitsPerRegion);
265          res.addAll(tmp);
266        }
267        return res;
268      }
269
270      //The default value of "hbase.mapreduce.input.autobalance" is false.
271      if (context.getConfiguration().getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false)) {
272        long maxAveRegionSize = context.getConfiguration()
273            .getLong(MAX_AVERAGE_REGION_SIZE, 8L*1073741824); //8GB
274        return calculateAutoBalancedSplits(splits, maxAveRegionSize);
275      }
276
277      // return one mapper per region
278      return splits;
279    } finally {
280      if (closeOnFinish) {
281        closeTable();
282      }
283    }
284  }
285
286  /**
287   * Create one InputSplit per region
288   *
289   * @return The list of InputSplit for all the regions
290   * @throws IOException
291   */
292  private List<InputSplit> oneInputSplitPerRegion() throws IOException {
293    RegionSizeCalculator sizeCalculator =
294        createRegionSizeCalculator(getRegionLocator(), getAdmin());
295
296    TableName tableName = getTable().getName();
297
298    Pair<byte[][], byte[][]> keys = getStartEndKeys();
299    if (keys == null || keys.getFirst() == null ||
300        keys.getFirst().length == 0) {
301      HRegionLocation regLoc =
302          getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
303      if (null == regLoc) {
304        throw new IOException("Expecting at least one region.");
305      }
306      List<InputSplit> splits = new ArrayList<>(1);
307      long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
308      TableSplit split = new TableSplit(tableName, scan,
309          HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
310          .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
311      splits.add(split);
312      return splits;
313    }
314    List<InputSplit> splits = new ArrayList<>(keys.getFirst().length);
315    for (int i = 0; i < keys.getFirst().length; i++) {
316      if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
317        continue;
318      }
319
320      byte[] startRow = scan.getStartRow();
321      byte[] stopRow = scan.getStopRow();
322      // determine if the given start an stop key fall into the region
323      if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
324          Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
325          (stopRow.length == 0 ||
326              Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
327        byte[] splitStart = startRow.length == 0 ||
328            Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
329            keys.getFirst()[i] : startRow;
330        byte[] splitStop = (stopRow.length == 0 ||
331            Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
332            keys.getSecond()[i].length > 0 ?
333            keys.getSecond()[i] : stopRow;
334
335        HRegionLocation location = getRegionLocator().getRegionLocation(keys.getFirst()[i], false);
336        // The below InetSocketAddress creation does a name resolution.
337        InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
338        if (isa.isUnresolved()) {
339          LOG.warn("Failed resolve " + isa);
340        }
341        InetAddress regionAddress = isa.getAddress();
342        String regionLocation;
343        regionLocation = reverseDNS(regionAddress);
344
345        byte[] regionName = location.getRegionInfo().getRegionName();
346        String encodedRegionName = location.getRegionInfo().getEncodedName();
347        long regionSize = sizeCalculator.getRegionSize(regionName);
348        TableSplit split = new TableSplit(tableName, scan,
349            splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
350        splits.add(split);
351        if (LOG.isDebugEnabled()) {
352          LOG.debug("getSplits: split -> " + i + " -> " + split);
353        }
354      }
355    }
356    return splits;
357  }
358
359  /**
360   * Create n splits for one InputSplit, For now only support uniform distribution
361   * @param split A TableSplit corresponding to a range of rowkeys
362   * @param n     Number of ranges after splitting.  Pass 1 means no split for the range
363   *              Pass 2 if you want to split the range in two;
364   * @return A list of TableSplit, the size of the list is n
365   * @throws IllegalArgumentIOException
366   */
367  protected List<InputSplit> createNInputSplitsUniform(InputSplit split, int n)
368      throws IllegalArgumentIOException {
369    if (split == null || !(split instanceof TableSplit)) {
370      throw new IllegalArgumentIOException(
371          "InputSplit for CreateNSplitsPerRegion can not be null + "
372              + "and should be instance of TableSplit");
373    }
374    //if n < 1, then still continue using n = 1
375    n = n < 1 ? 1 : n;
376    List<InputSplit> res = new ArrayList<>(n);
377    if (n == 1) {
378      res.add(split);
379      return res;
380    }
381
382    // Collect Region related information
383    TableSplit ts = (TableSplit) split;
384    TableName tableName = ts.getTable();
385    String regionLocation = ts.getRegionLocation();
386    String encodedRegionName = ts.getEncodedRegionName();
387    long regionSize = ts.getLength();
388    byte[] startRow = ts.getStartRow();
389    byte[] endRow = ts.getEndRow();
390
391    // For special case: startRow or endRow is empty
392    if (startRow.length == 0 && endRow.length == 0){
393      startRow = new byte[1];
394      endRow = new byte[1];
395      startRow[0] = 0;
396      endRow[0] = -1;
397    }
398    if (startRow.length == 0 && endRow.length != 0){
399      startRow = new byte[1];
400      startRow[0] = 0;
401    }
402    if (startRow.length != 0 && endRow.length == 0){
403      endRow =new byte[startRow.length];
404      for (int k = 0; k < startRow.length; k++){
405        endRow[k] = -1;
406      }
407    }
408
409    // Split Region into n chunks evenly
410    byte[][] splitKeys = Bytes.split(startRow, endRow, true, n-1);
411    for (int i = 0; i < splitKeys.length - 1; i++) {
412      //notice that the regionSize parameter may be not very accurate
413      TableSplit tsplit =
414          new TableSplit(tableName, scan, splitKeys[i], splitKeys[i + 1], regionLocation,
415              encodedRegionName, regionSize / n);
416      res.add(tsplit);
417    }
418    return res;
419  }
420  /**
421   * Calculates the number of MapReduce input splits for the map tasks. The number of
422   * MapReduce input splits depends on the average region size.
423   * Make it 'public' for testing
424   *
425   * @param splits The list of input splits before balance.
426   * @param maxAverageRegionSize max Average region size for one mapper
427   * @return The list of input splits.
428   * @throws IOException When creating the list of splits fails.
429   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
430   *org.apache.hadoop.mapreduce.JobContext)
431   */
432  public List<InputSplit> calculateAutoBalancedSplits(List<InputSplit> splits, long maxAverageRegionSize)
433      throws IOException {
434    if (splits.size() == 0) {
435      return splits;
436    }
437    List<InputSplit> resultList = new ArrayList<>();
438    long totalRegionSize = 0;
439    for (int i = 0; i < splits.size(); i++) {
440      TableSplit ts = (TableSplit) splits.get(i);
441      totalRegionSize += ts.getLength();
442    }
443    long averageRegionSize = totalRegionSize / splits.size();
444    // totalRegionSize might be overflow, and the averageRegionSize must be positive.
445    if (averageRegionSize <= 0) {
446      LOG.warn("The averageRegionSize is not positive: " + averageRegionSize + ", " +
447          "set it to Long.MAX_VALUE " + splits.size());
448      averageRegionSize = Long.MAX_VALUE / splits.size();
449    }
450    //if averageRegionSize is too big, change it to default as 1 GB,
451    if (averageRegionSize > maxAverageRegionSize) {
452      averageRegionSize = maxAverageRegionSize;
453    }
454    // if averageRegionSize is too small, we do not need to allocate more mappers for those 'large' region
455    // set default as 16M = (default hdfs block size) / 4;
456    if (averageRegionSize < 16 * 1048576) {
457      return splits;
458    }
459    for (int i = 0; i < splits.size(); i++) {
460      TableSplit ts = (TableSplit) splits.get(i);
461      TableName tableName = ts.getTable();
462      String regionLocation = ts.getRegionLocation();
463      String encodedRegionName = ts.getEncodedRegionName();
464      long regionSize = ts.getLength();
465
466      if (regionSize >= averageRegionSize) {
467        // make this region as multiple MapReduce input split.
468        int n = (int) Math.round(Math.log(((double) regionSize) / ((double) averageRegionSize)) + 1.0);
469        List<InputSplit> temp = createNInputSplitsUniform(ts, n);
470        resultList.addAll(temp);
471      } else {
472        // if the total size of several small continuous regions less than the average region size,
473        // combine them into one MapReduce input split.
474        long totalSize = regionSize;
475        byte[] splitStartKey = ts.getStartRow();
476        byte[] splitEndKey = ts.getEndRow();
477        int j = i + 1;
478        while (j < splits.size()) {
479          TableSplit nextRegion = (TableSplit) splits.get(j);
480          long nextRegionSize = nextRegion.getLength();
481          if (totalSize + nextRegionSize <= averageRegionSize
482              && Bytes.equals(splitEndKey, nextRegion.getStartRow())) {
483            totalSize = totalSize + nextRegionSize;
484            splitEndKey = nextRegion.getEndRow();
485            j++;
486          } else {
487            break;
488          }
489        }
490        i = j - 1;
491        TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey, regionLocation,
492            encodedRegionName, totalSize);
493        resultList.add(t);
494      }
495    }
496    return resultList;
497  }
498
499  String reverseDNS(InetAddress ipAddress) throws UnknownHostException {
500    String hostName = this.reverseDNSCacheMap.get(ipAddress);
501    if (hostName == null) {
502      String ipAddressString = null;
503      try {
504        ipAddressString = DNS.reverseDns(ipAddress, null);
505      } catch (Exception e) {
506        // We can use InetAddress in case the jndi failed to pull up the reverse DNS entry from the
507        // name service. Also, in case of ipv6, we need to use the InetAddress since resolving
508        // reverse DNS using jndi doesn't work well with ipv6 addresses.
509        ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName();
510      }
511      if (ipAddressString == null) throw new UnknownHostException("No host found for " + ipAddress);
512      hostName = Strings.domainNamePointerToHostName(ipAddressString);
513      this.reverseDNSCacheMap.put(ipAddress, hostName);
514    }
515    return hostName;
516  }
517
518  /**
519   * Test if the given region is to be included in the InputSplit while splitting
520   * the regions of a table.
521   * <p>
522   * This optimization is effective when there is a specific reasoning to exclude an entire region from the M-R job,
523   * (and hence, not contributing to the InputSplit), given the start and end keys of the same. <br>
524   * Useful when we need to remember the last-processed top record and revisit the [last, current) interval for M-R processing,
525   * continuously. In addition to reducing InputSplits, reduces the load on the region server as well, due to the ordering of the keys.
526   * <br>
527   * <br>
528   * Note: It is possible that <code>endKey.length() == 0 </code> , for the last (recent) region.
529   * <br>
530   * Override this method, if you want to bulk exclude regions altogether from M-R. By default, no region is excluded( i.e. all regions are included).
531   *
532   *
533   * @param startKey Start key of the region
534   * @param endKey End key of the region
535   * @return true, if this region needs to be included as part of the input (default).
536   *
537   */
538  protected boolean includeRegionInSplit(final byte[] startKey, final byte [] endKey) {
539    return true;
540  }
541
542  /**
543   * Allows subclasses to get the {@link RegionLocator}.
544   */
545  protected RegionLocator getRegionLocator() {
546    if (regionLocator == null) {
547      throw new IllegalStateException(NOT_INITIALIZED);
548    }
549    return regionLocator;
550  }
551
552  /**
553   * Allows subclasses to get the {@link Table}.
554   */
555  protected Table getTable() {
556    if (table == null) {
557      throw new IllegalStateException(NOT_INITIALIZED);
558    }
559    return table;
560  }
561
562  /**
563   * Allows subclasses to get the {@link Admin}.
564   */
565  protected Admin getAdmin() {
566    if (admin == null) {
567      throw new IllegalStateException(NOT_INITIALIZED);
568    }
569    return admin;
570  }
571
572  /**
573   * Allows subclasses to initialize the table information.
574   *
575   * @param connection  The Connection to the HBase cluster. MUST be unmanaged. We will close.
576   * @param tableName  The {@link TableName} of the table to process.
577   * @throws IOException
578   */
579  protected void initializeTable(Connection connection, TableName tableName) throws IOException {
580    if (this.table != null || this.connection != null) {
581      LOG.warn("initializeTable called multiple times. Overwriting connection and table " +
582          "reference; TableInputFormatBase will not close these old references when done.");
583    }
584    this.table = connection.getTable(tableName);
585    this.regionLocator = connection.getRegionLocator(tableName);
586    this.admin = connection.getAdmin();
587    this.connection = connection;
588  }
589
590  @VisibleForTesting
591  protected RegionSizeCalculator createRegionSizeCalculator(RegionLocator locator, Admin admin)
592      throws IOException {
593    return new RegionSizeCalculator(locator, admin);
594  }
595
596  /**
597   * Gets the scan defining the actual details like columns etc.
598   *
599   * @return The internal scan instance.
600   */
601  public Scan getScan() {
602    if (this.scan == null) this.scan = new Scan();
603    return scan;
604  }
605
606  /**
607   * Sets the scan defining the actual details like columns etc.
608   *
609   * @param scan  The scan to set.
610   */
611  public void setScan(Scan scan) {
612    this.scan = scan;
613  }
614
615  /**
616   * Allows subclasses to set the {@link TableRecordReader}.
617   *
618   * @param tableRecordReader A different {@link TableRecordReader}
619   *   implementation.
620   */
621  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
622    this.tableRecordReader = tableRecordReader;
623  }
624
625  /**
626   * Handle subclass specific set up.
627   * Each of the entry points used by the MapReduce framework,
628   * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)},
629   * will call {@link #initialize(JobContext)} as a convenient centralized location to handle
630   * retrieving the necessary configuration information and calling
631   * {@link #initializeTable(Connection, TableName)}.
632   *
633   * Subclasses should implement their initialize call such that it is safe to call multiple times.
634   * The current TableInputFormatBase implementation relies on a non-null table reference to decide
635   * if an initialize call is needed, but this behavior may change in the future. In particular,
636   * it is critical that initializeTable not be called multiple times since this will leak
637   * Connection instances.
638   *
639   */
640  protected void initialize(JobContext context) throws IOException {
641  }
642
643  /**
644   * Close the Table and related objects that were initialized via
645   * {@link #initializeTable(Connection, TableName)}.
646   *
647   * @throws IOException
648   */
649  protected void closeTable() throws IOException {
650    close(admin, table, regionLocator, connection);
651    admin = null;
652    table = null;
653    regionLocator = null;
654    connection = null;
655  }
656
657  private void close(Closeable... closables) throws IOException {
658    for (Closeable c : closables) {
659      if(c != null) { c.close(); }
660    }
661  }
662
663}