001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mapreduce;
020
021import java.io.Closeable;
022import java.io.IOException;
023import java.net.InetAddress;
024import java.net.InetSocketAddress;
025import java.net.UnknownHostException;
026import java.util.ArrayList;
027import java.util.HashMap;
028import java.util.List;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.HRegionLocation;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Admin;
033import org.apache.hadoop.hbase.client.Connection;
034import org.apache.hadoop.hbase.client.RegionLocator;
035import org.apache.hadoop.hbase.client.Result;
036import org.apache.hadoop.hbase.client.Scan;
037import org.apache.hadoop.hbase.client.Table;
038import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
039import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
040import org.apache.hadoop.hbase.util.Addressing;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.hbase.util.Pair;
043import org.apache.hadoop.hbase.util.Strings;
044import org.apache.hadoop.mapreduce.InputFormat;
045import org.apache.hadoop.mapreduce.InputSplit;
046import org.apache.hadoop.mapreduce.JobContext;
047import org.apache.hadoop.mapreduce.RecordReader;
048import org.apache.hadoop.mapreduce.TaskAttemptContext;
049import org.apache.hadoop.net.DNS;
050import org.apache.hadoop.util.StringUtils;
051import org.apache.yetus.audience.InterfaceAudience;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055/**
 * A base for {@link TableInputFormat}s. Receives a {@link Connection}, a {@link TableName},
 * and a {@link Scan} instance that defines the input columns etc. Subclasses may use
 * other TableRecordReader implementations.
059 *
060 * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to
061 * function properly. Each of the entry points to this class used by the MapReduce framework,
062 * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)},
063 * will call {@link #initialize(JobContext)} as a convenient centralized location to handle
064 * retrieving the necessary configuration information. If your subclass overrides either of these
065 * methods, either call the parent version or call initialize yourself.
066 *
067 * <p>
068 * An example of a subclass:
069 * <pre>
070 *   class ExampleTIF extends TableInputFormatBase {
071 *
072 *     {@literal @}Override
073 *     protected void initialize(JobContext context) throws IOException {
074 *       // We are responsible for the lifecycle of this connection until we hand it over in
075 *       // initializeTable.
 *       Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(
 *              context.getConfiguration()));
078 *       TableName tableName = TableName.valueOf("exampleTable");
079 *       // mandatory. once passed here, TableInputFormatBase will handle closing the connection.
080 *       initializeTable(connection, tableName);
081 *       byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
082 *         Bytes.toBytes("columnB") };
083 *       // optional, by default we'll get everything for the table.
084 *       Scan scan = new Scan();
085 *       for (byte[] family : inputColumns) {
086 *         scan.addFamily(family);
087 *       }
088 *       Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
089 *       scan.setFilter(exampleFilter);
090 *       setScan(scan);
091 *     }
092 *   }
093 * </pre>
094 *
095 *
 * The number of InputSplits (mappers) matches the number of regions in a table by default.
 * Set "hbase.mapreduce.tableinput.mappers.per.region" to specify how many mappers to use per
 * region; setting this property disables the autobalance feature described below.
 * Set "hbase.mapreduce.tif.input.autobalance" to enable autobalance; HBase will then assign
 * mappers based on the average region size. Regions larger than the average region size may be
 * assigned more mappers, and smaller regions may be grouped together to share one mapper. If the
 * actual average region size is too big, e.g. 50G, it is not good to assign only 1 mapper to
 * those large regions. Use "hbase.mapreduce.tif.ave.regionsize" to set the max average region
 * size when "autobalance" is enabled; the default max average region size is 8G.
105 */
106@InterfaceAudience.Public
107public abstract class TableInputFormatBase
108    extends InputFormat<ImmutableBytesWritable, Result> {
109
110  private static final Logger LOG = LoggerFactory.getLogger(TableInputFormatBase.class);
111
112  private static final String NOT_INITIALIZED = "The input format instance has not been properly " +
113      "initialized. Ensure you call initializeTable either in your constructor or initialize " +
114      "method";
115  private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" +
116      " previous error. Please look at the previous logs lines from" +
117      " the task's full log for more details.";
118
119  /** Specify if we enable auto-balance to set number of mappers in M/R jobs. */
120  public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.tif.input.autobalance";
121  /** In auto-balance, we split input by ave region size, if calculated region size is too big, we can set it. */
122  public static final String MAX_AVERAGE_REGION_SIZE = "hbase.mapreduce.tif.ave.regionsize";
123
124  /** Set the number of Mappers for each region, all regions have same number of Mappers */
125  public static final String NUM_MAPPERS_PER_REGION = "hbase.mapreduce.tableinput.mappers.per.region";
126
127
128  /** Holds the details for the internal scanner.
129   *
130   * @see Scan */
131  private Scan scan = null;
132  /** The {@link Admin}. */
133  private Admin admin;
134  /** The {@link Table} to scan. */
135  private Table table;
136  /** The {@link RegionLocator} of the table. */
137  private RegionLocator regionLocator;
138  /** The reader scanning the table, can be a custom one. */
139  private TableRecordReader tableRecordReader = null;
140  /** The underlying {@link Connection} of the table. */
141  private Connection connection;
142
143
144  /** The reverse DNS lookup cache mapping: IPAddress => HostName */
145  private HashMap<InetAddress, String> reverseDNSCacheMap =
146      new HashMap<>();
147
148  /**
149   * Builds a {@link TableRecordReader}. If no {@link TableRecordReader} was provided, uses
150   * the default.
151   *
152   * @param split  The split to work with.
153   * @param context  The current context.
154   * @return The newly created record reader.
155   * @throws IOException When creating the reader fails.
156   * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
157   *   org.apache.hadoop.mapreduce.InputSplit,
158   *   org.apache.hadoop.mapreduce.TaskAttemptContext)
159   */
160  @Override
161  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
162      InputSplit split, TaskAttemptContext context)
163      throws IOException {
164    // Just in case a subclass is relying on JobConfigurable magic.
165    if (table == null) {
166      initialize(context);
167    }
168    // null check in case our child overrides getTable to not throw.
169    try {
170      if (getTable() == null) {
171        // initialize() must not have been implemented in the subclass.
172        throw new IOException(INITIALIZATION_ERROR);
173      }
174    } catch (IllegalStateException exception) {
175      throw new IOException(INITIALIZATION_ERROR, exception);
176    }
177    TableSplit tSplit = (TableSplit) split;
178    LOG.info("Input split length: " + StringUtils.humanReadableInt(tSplit.getLength()) + " bytes.");
179    final TableRecordReader trr =
180        this.tableRecordReader != null ? this.tableRecordReader : new TableRecordReader();
181    Scan sc = new Scan(this.scan);
182    sc.setStartRow(tSplit.getStartRow());
183    sc.setStopRow(tSplit.getEndRow());
184    trr.setScan(sc);
185    trr.setTable(getTable());
186    return new RecordReader<ImmutableBytesWritable, Result>() {
187
188      @Override
189      public void close() throws IOException {
190        trr.close();
191        closeTable();
192      }
193
194      @Override
195      public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
196        return trr.getCurrentKey();
197      }
198
199      @Override
200      public Result getCurrentValue() throws IOException, InterruptedException {
201        return trr.getCurrentValue();
202      }
203
204      @Override
205      public float getProgress() throws IOException, InterruptedException {
206        return trr.getProgress();
207      }
208
209      @Override
210      public void initialize(InputSplit inputsplit, TaskAttemptContext context) throws IOException,
211          InterruptedException {
212        trr.initialize(inputsplit, context);
213      }
214
215      @Override
216      public boolean nextKeyValue() throws IOException, InterruptedException {
217        return trr.nextKeyValue();
218      }
219    };
220  }
221
222  protected Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
223    return getRegionLocator().getStartEndKeys();
224  }
225
226  /**
227   * Calculates the splits that will serve as input for the map tasks.
228   * @param context  The current job context.
229   * @return The list of input splits.
230   * @throws IOException When creating the list of splits fails.
231   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
232   *   org.apache.hadoop.mapreduce.JobContext)
233   */
234  @Override
235  public List<InputSplit> getSplits(JobContext context) throws IOException {
236    boolean closeOnFinish = false;
237
238    // Just in case a subclass is relying on JobConfigurable magic.
239    if (table == null) {
240      initialize(context);
241      closeOnFinish = true;
242    }
243
244    // null check in case our child overrides getTable to not throw.
245    try {
246      if (getTable() == null) {
247        // initialize() must not have been implemented in the subclass.
248        throw new IOException(INITIALIZATION_ERROR);
249      }
250    } catch (IllegalStateException exception) {
251      throw new IOException(INITIALIZATION_ERROR, exception);
252    }
253
254    try {
255      List<InputSplit> splits = oneInputSplitPerRegion();
256
257      // set same number of mappers for each region
258      if (context.getConfiguration().get(NUM_MAPPERS_PER_REGION) != null) {
259        int nSplitsPerRegion = context.getConfiguration().getInt(NUM_MAPPERS_PER_REGION, 1);
260        List<InputSplit> res = new ArrayList<>();
261        for (int i = 0; i < splits.size(); i++) {
262          List<InputSplit> tmp = createNInputSplitsUniform(splits.get(i), nSplitsPerRegion);
263          res.addAll(tmp);
264        }
265        return res;
266      }
267
268      //The default value of "hbase.mapreduce.input.autobalance" is false.
269      if (context.getConfiguration().getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false)) {
270        long maxAveRegionSize = context.getConfiguration()
271            .getLong(MAX_AVERAGE_REGION_SIZE, 8L*1073741824); //8GB
272        return calculateAutoBalancedSplits(splits, maxAveRegionSize);
273      }
274
275      // return one mapper per region
276      return splits;
277    } finally {
278      if (closeOnFinish) {
279        closeTable();
280      }
281    }
282  }
283
284  /**
285   * Create one InputSplit per region
286   *
287   * @return The list of InputSplit for all the regions
288   * @throws IOException throws IOException
289   */
290  private List<InputSplit> oneInputSplitPerRegion() throws IOException {
291    RegionSizeCalculator sizeCalculator =
292        createRegionSizeCalculator(getRegionLocator(), getAdmin());
293
294    TableName tableName = getTable().getName();
295
296    Pair<byte[][], byte[][]> keys = getStartEndKeys();
297    if (keys == null || keys.getFirst() == null ||
298        keys.getFirst().length == 0) {
299      HRegionLocation regLoc =
300          getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
301      if (null == regLoc) {
302        throw new IOException("Expecting at least one region.");
303      }
304      List<InputSplit> splits = new ArrayList<>(1);
305      long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
306      // In the table input format for single table we do not need to
307      // store the scan object in table split because it can be memory intensive and redundant
308      // information to what is already stored in conf SCAN. See HBASE-25212
309      TableSplit split = new TableSplit(tableName, null,
310          HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
311          .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
312      splits.add(split);
313      return splits;
314    }
315    List<InputSplit> splits = new ArrayList<>(keys.getFirst().length);
316    for (int i = 0; i < keys.getFirst().length; i++) {
317      if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
318        continue;
319      }
320
321      byte[] startRow = scan.getStartRow();
322      byte[] stopRow = scan.getStopRow();
323      // determine if the given start an stop key fall into the region
324      if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
325          Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
326          (stopRow.length == 0 ||
327              Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
328        byte[] splitStart = startRow.length == 0 ||
329            Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
330            keys.getFirst()[i] : startRow;
331        byte[] splitStop = (stopRow.length == 0 ||
332            Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
333            keys.getSecond()[i].length > 0 ?
334            keys.getSecond()[i] : stopRow;
335
336        HRegionLocation location = getRegionLocator().getRegionLocation(keys.getFirst()[i], false);
337        // The below InetSocketAddress creation does a name resolution.
338        InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
339        if (isa.isUnresolved()) {
340          LOG.warn("Failed resolve " + isa);
341        }
342        InetAddress regionAddress = isa.getAddress();
343        String regionLocation;
344        regionLocation = reverseDNS(regionAddress);
345
346        byte[] regionName = location.getRegionInfo().getRegionName();
347        String encodedRegionName = location.getRegionInfo().getEncodedName();
348        long regionSize = sizeCalculator.getRegionSize(regionName);
349        // In the table input format for single table we do not need to
350        // store the scan object in table split because it can be memory intensive and redundant
351        // information to what is already stored in conf SCAN. See HBASE-25212
352        TableSplit split = new TableSplit(tableName, null,
353            splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
354        splits.add(split);
355        if (LOG.isDebugEnabled()) {
356          LOG.debug("getSplits: split -> " + i + " -> " + split);
357        }
358      }
359    }
360    return splits;
361  }
362
363  /**
364   * Create n splits for one InputSplit, For now only support uniform distribution
365   * @param split A TableSplit corresponding to a range of rowkeys
366   * @param n     Number of ranges after splitting.  Pass 1 means no split for the range
367   *              Pass 2 if you want to split the range in two;
368   * @return A list of TableSplit, the size of the list is n
369   * @throws IllegalArgumentIOException throws IllegalArgumentIOException
370   */
371  protected List<InputSplit> createNInputSplitsUniform(InputSplit split, int n)
372      throws IllegalArgumentIOException {
373    if (split == null || !(split instanceof TableSplit)) {
374      throw new IllegalArgumentIOException(
375          "InputSplit for CreateNSplitsPerRegion can not be null + "
376              + "and should be instance of TableSplit");
377    }
378    //if n < 1, then still continue using n = 1
379    n = n < 1 ? 1 : n;
380    List<InputSplit> res = new ArrayList<>(n);
381    if (n == 1) {
382      res.add(split);
383      return res;
384    }
385
386    // Collect Region related information
387    TableSplit ts = (TableSplit) split;
388    TableName tableName = ts.getTable();
389    String regionLocation = ts.getRegionLocation();
390    String encodedRegionName = ts.getEncodedRegionName();
391    long regionSize = ts.getLength();
392    byte[] startRow = ts.getStartRow();
393    byte[] endRow = ts.getEndRow();
394
395    // For special case: startRow or endRow is empty
396    if (startRow.length == 0 && endRow.length == 0){
397      startRow = new byte[1];
398      endRow = new byte[1];
399      startRow[0] = 0;
400      endRow[0] = -1;
401    }
402    if (startRow.length == 0 && endRow.length != 0){
403      startRow = new byte[1];
404      startRow[0] = 0;
405    }
406    if (startRow.length != 0 && endRow.length == 0){
407      endRow =new byte[startRow.length];
408      for (int k = 0; k < startRow.length; k++){
409        endRow[k] = -1;
410      }
411    }
412
413    // Split Region into n chunks evenly
414    byte[][] splitKeys = Bytes.split(startRow, endRow, true, n-1);
415    for (int i = 0; i < splitKeys.length - 1; i++) {
416      // In the table input format for single table we do not need to
417      // store the scan object in table split because it can be memory intensive and redundant
418      // information to what is already stored in conf SCAN. See HBASE-25212
419      //notice that the regionSize parameter may be not very accurate
420      TableSplit tsplit =
421          new TableSplit(tableName, null, splitKeys[i], splitKeys[i + 1], regionLocation,
422              encodedRegionName, regionSize / n);
423      res.add(tsplit);
424    }
425    return res;
426  }
427  /**
428   * Calculates the number of MapReduce input splits for the map tasks. The number of
429   * MapReduce input splits depends on the average region size.
430   * Make it 'public' for testing
431   *
432   * @param splits The list of input splits before balance.
433   * @param maxAverageRegionSize max Average region size for one mapper
434   * @return The list of input splits.
435   * @throws IOException When creating the list of splits fails.
436   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
437   *org.apache.hadoop.mapreduce.JobContext)
438   */
439  public List<InputSplit> calculateAutoBalancedSplits(List<InputSplit> splits, long maxAverageRegionSize)
440      throws IOException {
441    if (splits.size() == 0) {
442      return splits;
443    }
444    List<InputSplit> resultList = new ArrayList<>();
445    long totalRegionSize = 0;
446    for (int i = 0; i < splits.size(); i++) {
447      TableSplit ts = (TableSplit) splits.get(i);
448      totalRegionSize += ts.getLength();
449    }
450    long averageRegionSize = totalRegionSize / splits.size();
451    // totalRegionSize might be overflow, and the averageRegionSize must be positive.
452    if (averageRegionSize <= 0) {
453      LOG.warn("The averageRegionSize is not positive: " + averageRegionSize + ", " +
454          "set it to Long.MAX_VALUE " + splits.size());
455      averageRegionSize = Long.MAX_VALUE / splits.size();
456    }
457    //if averageRegionSize is too big, change it to default as 1 GB,
458    if (averageRegionSize > maxAverageRegionSize) {
459      averageRegionSize = maxAverageRegionSize;
460    }
461    // if averageRegionSize is too small, we do not need to allocate more mappers for those 'large' region
462    // set default as 16M = (default hdfs block size) / 4;
463    if (averageRegionSize < 16 * 1048576) {
464      return splits;
465    }
466    for (int i = 0; i < splits.size(); i++) {
467      TableSplit ts = (TableSplit) splits.get(i);
468      TableName tableName = ts.getTable();
469      String regionLocation = ts.getRegionLocation();
470      String encodedRegionName = ts.getEncodedRegionName();
471      long regionSize = ts.getLength();
472
473      if (regionSize >= averageRegionSize) {
474        // make this region as multiple MapReduce input split.
475        int n = (int) Math.round(Math.log(((double) regionSize) / ((double) averageRegionSize)) + 1.0);
476        List<InputSplit> temp = createNInputSplitsUniform(ts, n);
477        resultList.addAll(temp);
478      } else {
479        // if the total size of several small continuous regions less than the average region size,
480        // combine them into one MapReduce input split.
481        long totalSize = regionSize;
482        byte[] splitStartKey = ts.getStartRow();
483        byte[] splitEndKey = ts.getEndRow();
484        int j = i + 1;
485        while (j < splits.size()) {
486          TableSplit nextRegion = (TableSplit) splits.get(j);
487          long nextRegionSize = nextRegion.getLength();
488          if (totalSize + nextRegionSize <= averageRegionSize
489              && Bytes.equals(splitEndKey, nextRegion.getStartRow())) {
490            totalSize = totalSize + nextRegionSize;
491            splitEndKey = nextRegion.getEndRow();
492            j++;
493          } else {
494            break;
495          }
496        }
497        i = j - 1;
498        // In the table input format for single table we do not need to
499        // store the scan object in table split because it can be memory intensive and redundant
500        // information to what is already stored in conf SCAN. See HBASE-25212
501        TableSplit t = new TableSplit(tableName, null, splitStartKey, splitEndKey, regionLocation,
502            encodedRegionName, totalSize);
503        resultList.add(t);
504      }
505    }
506    return resultList;
507  }
508
509  String reverseDNS(InetAddress ipAddress) throws UnknownHostException {
510    String hostName = this.reverseDNSCacheMap.get(ipAddress);
511    if (hostName == null) {
512      String ipAddressString = null;
513      try {
514        ipAddressString = DNS.reverseDns(ipAddress, null);
515      } catch (Exception e) {
516        // We can use InetAddress in case the jndi failed to pull up the reverse DNS entry from the
517        // name service. Also, in case of ipv6, we need to use the InetAddress since resolving
518        // reverse DNS using jndi doesn't work well with ipv6 addresses.
519        ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName();
520      }
521      if (ipAddressString == null) {
522        throw new UnknownHostException("No host found for " + ipAddress);
523      }
524      hostName = Strings.domainNamePointerToHostName(ipAddressString);
525      this.reverseDNSCacheMap.put(ipAddress, hostName);
526    }
527    return hostName;
528  }
529
530  /**
531   * Test if the given region is to be included in the InputSplit while splitting
532   * the regions of a table.
533   * <p>
534   * This optimization is effective when there is a specific reasoning to exclude an entire region from the M-R job,
535   * (and hence, not contributing to the InputSplit), given the start and end keys of the same. <br>
536   * Useful when we need to remember the last-processed top record and revisit the [last, current) interval for M-R processing,
537   * continuously. In addition to reducing InputSplits, reduces the load on the region server as well, due to the ordering of the keys.
538   * <br>
539   * <br>
540   * Note: It is possible that <code>endKey.length() == 0 </code> , for the last (recent) region.
541   * <br>
542   * Override this method, if you want to bulk exclude regions altogether from M-R. By default, no region is excluded( i.e. all regions are included).
543   *
544   *
545   * @param startKey Start key of the region
546   * @param endKey End key of the region
547   * @return true, if this region needs to be included as part of the input (default).
548   *
549   */
550  protected boolean includeRegionInSplit(final byte[] startKey, final byte [] endKey) {
551    return true;
552  }
553
554  /**
555   * Allows subclasses to get the {@link RegionLocator}.
556   */
557  protected RegionLocator getRegionLocator() {
558    if (regionLocator == null) {
559      throw new IllegalStateException(NOT_INITIALIZED);
560    }
561    return regionLocator;
562  }
563
564  /**
565   * Allows subclasses to get the {@link Table}.
566   */
567  protected Table getTable() {
568    if (table == null) {
569      throw new IllegalStateException(NOT_INITIALIZED);
570    }
571    return table;
572  }
573
574  /**
575   * Allows subclasses to get the {@link Admin}.
576   */
577  protected Admin getAdmin() {
578    if (admin == null) {
579      throw new IllegalStateException(NOT_INITIALIZED);
580    }
581    return admin;
582  }
583
584  /**
585   * Allows subclasses to initialize the table information.
586   *
587   * @param connection  The Connection to the HBase cluster. MUST be unmanaged. We will close.
588   * @param tableName  The {@link TableName} of the table to process.
589   * @throws IOException
590   */
591  protected void initializeTable(Connection connection, TableName tableName) throws IOException {
592    if (this.table != null || this.connection != null) {
593      LOG.warn("initializeTable called multiple times. Overwriting connection and table " +
594          "reference; TableInputFormatBase will not close these old references when done.");
595    }
596    this.table = connection.getTable(tableName);
597    this.regionLocator = connection.getRegionLocator(tableName);
598    this.admin = connection.getAdmin();
599    this.connection = connection;
600  }
601
602  @InterfaceAudience.Private
603  protected RegionSizeCalculator createRegionSizeCalculator(RegionLocator locator, Admin admin)
604      throws IOException {
605    return new RegionSizeCalculator(locator, admin);
606  }
607
608  /**
609   * Gets the scan defining the actual details like columns etc.
610   *
611   * @return The internal scan instance.
612   */
613  public Scan getScan() {
614    if (this.scan == null) this.scan = new Scan();
615    return scan;
616  }
617
618  /**
619   * Sets the scan defining the actual details like columns etc.
620   *
621   * @param scan  The scan to set.
622   */
623  public void setScan(Scan scan) {
624    this.scan = scan;
625  }
626
627  /**
628   * Allows subclasses to set the {@link TableRecordReader}.
629   *
630   * @param tableRecordReader A different {@link TableRecordReader}
631   *   implementation.
632   */
633  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
634    this.tableRecordReader = tableRecordReader;
635  }
636
637  /**
638   * Handle subclass specific set up.
639   * Each of the entry points used by the MapReduce framework,
640   * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)},
641   * will call {@link #initialize(JobContext)} as a convenient centralized location to handle
642   * retrieving the necessary configuration information and calling
643   * {@link #initializeTable(Connection, TableName)}.
644   *
645   * Subclasses should implement their initialize call such that it is safe to call multiple times.
646   * The current TableInputFormatBase implementation relies on a non-null table reference to decide
647   * if an initialize call is needed, but this behavior may change in the future. In particular,
648   * it is critical that initializeTable not be called multiple times since this will leak
649   * Connection instances.
650   *
651   */
652  protected void initialize(JobContext context) throws IOException {
653  }
654
655  /**
656   * Close the Table and related objects that were initialized via
657   * {@link #initializeTable(Connection, TableName)}.
658   *
659   * @throws IOException
660   */
661  protected void closeTable() throws IOException {
662    close(admin, table, regionLocator, connection);
663    admin = null;
664    table = null;
665    regionLocator = null;
666    connection = null;
667  }
668
669  private void close(Closeable... closables) throws IOException {
670    for (Closeable c : closables) {
671      if(c != null) { c.close(); }
672    }
673  }
674
675}