/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A base for {@link TableInputFormat}s. Receives a {@link Connection}, a {@link TableName}, and a
 * {@link Scan} instance that defines the input columns, etc. Subclasses may use other
 * TableRecordReader implementations. Subclasses MUST ensure initializeTable(Connection, TableName)
 * is called for an instance to function properly. Each of the entry points to this class used by
 * the MapReduce framework, {@link #createRecordReader(InputSplit, TaskAttemptContext)} and
 * {@link #getSplits(JobContext)}, will call {@link #initialize(JobContext)} as a convenient
 * centralized location to handle retrieving the necessary configuration information. If your
 * subclass overrides either of these methods, either call the parent version or call initialize
 * yourself.
 * <p>
 * An example of a subclass:
 *
 * <pre>
 * class ExampleTIF extends TableInputFormatBase {
 *
 *   {@literal @}Override
 *   protected void initialize(JobContext context) throws IOException {
 *     // We are responsible for the lifecycle of this connection until we hand it over in
 *     // initializeTable.
 *     Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(
 *         context.getConfiguration()));
 *     TableName tableName = TableName.valueOf("exampleTable");
 *     // mandatory. once passed here, TableInputFormatBase will handle closing the connection.
 *     initializeTable(connection, tableName);
 *     byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"),
 *         Bytes.toBytes("columnB") };
 *     // optional, by default we'll get everything for the table.
 *     Scan scan = new Scan();
 *     for (byte[] family : inputColumns) {
 *       scan.addFamily(family);
 *     }
 *     Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
 *     scan.setFilter(exampleFilter);
 *     setScan(scan);
 *   }
 * }
 * </pre>
 *
 * By default the number of InputSplits (and hence mappers) matches the number of regions in the
 * table. Set "hbase.mapreduce.tableinput.mappers.per.region" to specify a fixed number of mappers
 * per region; setting that property disables the auto-balance behavior described next. Set
 * "hbase.mapreduce.tif.input.autobalance" to enable auto-balance, in which case HBase assigns
 * mappers based on the average region size: regions larger than the average may be assigned more
 * mappers, while smaller contiguous regions may be grouped together and served by one mapper. If
 * the actual average region size is very large (say 50G), assigning only one mapper per large
 * region is inefficient, so use "hbase.mapreduce.tif.ave.regionsize" to cap the average region
 * size used when auto-balance is enabled; the default cap is 8G.
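 * <p>
 * For example, these properties can be set on the job configuration before submission. This is an
 * illustrative sketch only; the values and the {@code Job} handle named {@code job} are
 * assumptions, not part of this class:
 *
 * <pre>
 * Configuration conf = job.getConfiguration();
 * // fixed two mappers per region; setting this disables auto-balance
 * conf.setInt(TableInputFormatBase.NUM_MAPPERS_PER_REGION, 2);
 * // or instead, balance mappers by average region size, capping the average at 4G
 * conf.setBoolean(TableInputFormatBase.MAPREDUCE_INPUT_AUTOBALANCE, true);
 * conf.setLong(TableInputFormatBase.MAX_AVERAGE_REGION_SIZE, 4L * 1024 * 1024 * 1024);
 * </pre>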
 */
@InterfaceAudience.Public
public abstract class TableInputFormatBase extends InputFormat<ImmutableBytesWritable, Result> {

  private static final Logger LOG = LoggerFactory.getLogger(TableInputFormatBase.class);

  private static final String NOT_INITIALIZED = "The input format instance has not been properly "
    + "initialized. Ensure you call initializeTable either in your constructor or initialize "
    + "method";
  private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a"
    + " previous error. Please look at the previous log lines from"
    + " the task's full log for more details.";

  /** Specify if we enable auto-balance to set the number of mappers in M/R jobs. */
  public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.tif.input.autobalance";
  /**
   * When auto-balance is enabled, input is split by average region size; if the calculated average
   * region size is too big, this property caps it.
   */
  public static final String MAX_AVERAGE_REGION_SIZE = "hbase.mapreduce.tif.ave.regionsize";

  /** Set the number of mappers for each region; all regions get the same number of mappers. */
  public static final String NUM_MAPPERS_PER_REGION =
    "hbase.mapreduce.tableinput.mappers.per.region";

  /**
   * Holds the details for the internal scanner.
   * @see Scan
   */
  private Scan scan = null;
  /** The {@link Admin}. */
  private Admin admin;
  /** The {@link Table} to scan. */
  private Table table;
  /** The {@link RegionLocator} of the table. */
  private RegionLocator regionLocator;
  /** The reader scanning the table, can be a custom one. */
  private TableRecordReader tableRecordReader = null;
  /** The underlying {@link Connection} of the table. */
  private Connection connection;
  /** Used to generate splits based on region size. */
  private RegionSizeCalculator regionSizeCalculator;

  /** The reverse DNS lookup cache mapping: IPAddress => HostName */
  private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<>();

  /**
   * Builds a {@link TableRecordReader}. If no {@link TableRecordReader} was provided, uses the
   * default.
   * @param split   The split to work with.
   * @param context The current context.
   * @return The newly created record reader.
   * @throws IOException When creating the reader fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache.hadoop.mapreduce.InputSplit,
   *      org.apache.hadoop.mapreduce.TaskAttemptContext)
   */
  @Override
  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(InputSplit split,
    TaskAttemptContext context) throws IOException {
    // Just in case a subclass is relying on JobConfigurable magic.
    if (table == null) {
      initialize(context);
    }
    // null check in case our child overrides getTable to not throw.
    try {
      if (getTable() == null) {
        // initialize() must not have been implemented in the subclass.
        throw new IOException(INITIALIZATION_ERROR);
      }
    } catch (IllegalStateException exception) {
      throw new IOException(INITIALIZATION_ERROR, exception);
    }
    TableSplit tSplit = (TableSplit) split;
    LOG.info("Input split length: " + StringUtils.humanReadableInt(tSplit.getLength()) + " bytes.");
    final TableRecordReader trr =
      this.tableRecordReader != null ? this.tableRecordReader : new TableRecordReader();
    // Narrow a copy of the configured scan to the row range owned by this split.
    Scan sc = new Scan(this.scan);
    sc.withStartRow(tSplit.getStartRow());
    sc.withStopRow(tSplit.getEndRow());
    trr.setScan(sc);
    trr.setTable(getTable());
    return new RecordReader<ImmutableBytesWritable, Result>() {

      @Override
      public void close() throws IOException {
        trr.close();
        closeTable();
      }

      @Override
      public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
        return trr.getCurrentKey();
      }

      @Override
      public Result getCurrentValue() throws IOException, InterruptedException {
        return trr.getCurrentValue();
      }

      @Override
      public float getProgress() throws IOException, InterruptedException {
        return trr.getProgress();
      }

      @Override
      public void initialize(InputSplit inputsplit, TaskAttemptContext context)
        throws IOException, InterruptedException {
        trr.initialize(inputsplit, context);
      }

      @Override
      public boolean nextKeyValue() throws IOException, InterruptedException {
        return trr.nextKeyValue();
      }
    };
  }

  protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
    return getRegionLocator().getStartEndKeys();
  }
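  // A subclass that needs custom per-row handling can install its own reader before the framework
  // calls createRecordReader above. A minimal sketch, assuming a hypothetical subclass name; any
  // TableRecordReader method may be overridden as needed:
  //
  //   class FilteringTIF extends TableInputFormatBase {
  //     FilteringTIF() {
  //       setTableRecordReader(new TableRecordReader() {
  //         @Override
  //         public boolean nextKeyValue() throws IOException, InterruptedException {
  //           // e.g. skip or transform rows before handing them to the mapper
  //           return super.nextKeyValue();
  //         }
  //       });
  //     }
  //   }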
  /**
   * Calculates the splits that will serve as input for the map tasks.
   * @param context The current job context.
   * @return The list of input splits.
   * @throws IOException When creating the list of splits fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
   */
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    boolean closeOnFinish = false;

    // Just in case a subclass is relying on JobConfigurable magic.
    if (table == null) {
      initialize(context);
      closeOnFinish = true;
    }

    // null check in case our child overrides getTable to not throw.
    try {
      if (getTable() == null) {
        // initialize() must not have been implemented in the subclass.
        throw new IOException(INITIALIZATION_ERROR);
      }
    } catch (IllegalStateException exception) {
      throw new IOException(INITIALIZATION_ERROR, exception);
    }

    try {
      List<InputSplit> splits = oneInputSplitPerRegion();

      // set the same number of mappers for each region
      if (context.getConfiguration().get(NUM_MAPPERS_PER_REGION) != null) {
        int nSplitsPerRegion = context.getConfiguration().getInt(NUM_MAPPERS_PER_REGION, 1);
        List<InputSplit> res = new ArrayList<>();
        for (int i = 0; i < splits.size(); i++) {
          List<InputSplit> tmp = createNInputSplitsUniform(splits.get(i), nSplitsPerRegion);
          res.addAll(tmp);
        }
        return res;
      }

      // The default value of "hbase.mapreduce.tif.input.autobalance" is false.
      if (context.getConfiguration().getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false)) {
        long maxAveRegionSize =
          context.getConfiguration().getLong(MAX_AVERAGE_REGION_SIZE, 8L * 1073741824); // 8GB
        return calculateAutoBalancedSplits(splits, maxAveRegionSize);
      }

      // return one mapper per region
      return splits;
    } finally {
      if (closeOnFinish) {
        closeTable();
      }
    }
  }

  /**
   * Create one InputSplit per region.
   * @return The list of InputSplits for all the regions.
   * @throws IOException when fetching region information fails.
   */
  private List<InputSplit> oneInputSplitPerRegion() throws IOException {
    if (regionSizeCalculator == null) {
      // Initialize here rather than with the other resources because this involves
      // a full scan of meta, which can be heavy. We might as well only do it if/when necessary.
      regionSizeCalculator = createRegionSizeCalculator(getRegionLocator(), getAdmin());
    }

    TableName tableName = getTable().getName();

    Pair<byte[][], byte[][]> keys = getStartEndKeys();
    if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
      HRegionLocation regLoc =
        getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
      if (null == regLoc) {
        throw new IOException("Expecting at least one region.");
      }
      List<InputSplit> splits = new ArrayList<>(1);
      long regionSize = regionSizeCalculator.getRegionSize(regLoc.getRegion().getRegionName());
      // In the table input format for a single table we do not need to store the scan object in
      // the table split because it can be memory intensive and redundant information to what is
      // already stored in conf SCAN. See HBASE-25212
      TableSplit split =
        new TableSplit(tableName, null, HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
          regLoc.getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
      splits.add(split);
      return splits;
    }
    List<InputSplit> splits = new ArrayList<>(keys.getFirst().length);
    for (int i = 0; i < keys.getFirst().length; i++) {
      if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
        continue;
      }

      byte[] startRow = scan.getStartRow();
      byte[] stopRow = scan.getStopRow();
      // determine if the given start and stop keys fall into the region
      if (
        (startRow.length == 0 || keys.getSecond()[i].length == 0
          || Bytes.compareTo(startRow, keys.getSecond()[i]) < 0)
          && (stopRow.length == 0 || Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)
      ) {
        byte[] splitStart =
          startRow.length == 0 || Bytes.compareTo(keys.getFirst()[i], startRow) >= 0
            ? keys.getFirst()[i]
            : startRow;
        byte[] splitStop =
          (stopRow.length == 0 || Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0)
            && keys.getSecond()[i].length > 0 ? keys.getSecond()[i] : stopRow;

        HRegionLocation location = getRegionLocator().getRegionLocation(keys.getFirst()[i], false);
        // The below InetSocketAddress creation does a name resolution.
        InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
        if (isa.isUnresolved()) {
          LOG.warn("Failed to resolve " + isa);
        }
        InetAddress regionAddress = isa.getAddress();
        String regionLocation = reverseDNS(regionAddress);

        byte[] regionName = location.getRegion().getRegionName();
        String encodedRegionName = location.getRegion().getEncodedName();
        long regionSize = regionSizeCalculator.getRegionSize(regionName);
        // In the table input format for a single table we do not need to store the scan object in
        // the table split because it can be memory intensive and redundant information to what is
        // already stored in conf SCAN. See HBASE-25212
        TableSplit split = new TableSplit(tableName, null, splitStart, splitStop, regionLocation,
          encodedRegionName, regionSize);
        splits.add(split);
        if (LOG.isDebugEnabled()) {
          LOG.debug("getSplits: split -> " + i + " -> " + split);
        }
      }
    }
    return splits;
  }
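  // A worked sketch of the start/stop clipping in oneInputSplitPerRegion (the row keys are
  // illustrative, not from any real table): given a scan restricted to [ccc, ggg), a region
  // [bbb, fff) yields the split [ccc, fff), a region [fff, zzz) yields [fff, ggg), and a region
  // [aaa, bbb) overlaps nothing and produces no split at all.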
  /**
   * Create n splits for one InputSplit. For now only a uniform distribution is supported.
   * @param split A TableSplit corresponding to a range of rowkeys
   * @param n     Number of ranges after splitting. Pass 1 if no split is wanted for the range;
   *              pass 2 if you want to split the range in two.
   * @return A list of TableSplits; the size of the list is {@code n}
   */
  protected List<InputSplit> createNInputSplitsUniform(InputSplit split, int n)
    throws IllegalArgumentIOException {
    if (split == null || !(split instanceof TableSplit)) {
      throw new IllegalArgumentIOException(
        "InputSplit for CreateNSplitsPerRegion can not be null "
          + "and should be instance of TableSplit");
    }
    // if n < 1, then still continue using n = 1
    n = n < 1 ? 1 : n;
    List<InputSplit> res = new ArrayList<>(n);
    if (n == 1) {
      res.add(split);
      return res;
    }

    // Collect Region related information
    TableSplit ts = (TableSplit) split;
    TableName tableName = ts.getTable();
    String regionLocation = ts.getRegionLocation();
    String encodedRegionName = ts.getEncodedRegionName();
    long regionSize = ts.getLength();
    byte[] startRow = ts.getStartRow();
    byte[] endRow = ts.getEndRow();

    // For the special case where startRow or endRow is empty, substitute explicit boundary keys
    if (startRow.length == 0 && endRow.length == 0) {
      startRow = new byte[1];
      endRow = new byte[1];
      startRow[0] = 0;
      endRow[0] = -1;
    }
    if (startRow.length == 0 && endRow.length != 0) {
      startRow = new byte[1];
      startRow[0] = 0;
    }
    if (startRow.length != 0 && endRow.length == 0) {
      endRow = new byte[startRow.length];
      for (int k = 0; k < startRow.length; k++) {
        endRow[k] = -1;
      }
    }

    // Split the region into n chunks evenly
    byte[][] splitKeys = Bytes.split(startRow, endRow, true, n - 1);
    for (int i = 0; i < splitKeys.length - 1; i++) {
      // In the table input format for a single table we do not need to store the scan object in
      // the table split because it can be memory intensive and redundant information to what is
      // already stored in conf SCAN. See HBASE-25212
      // notice that the regionSize parameter may not be very accurate
      TableSplit tsplit = new TableSplit(tableName, null, splitKeys[i], splitKeys[i + 1],
        regionLocation, encodedRegionName, regionSize / n);
      res.add(tsplit);
    }
    return res;
  }
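  // A worked sketch of the uniform split above, assuming single-byte boundary keys: for the range
  // [0x00, 0x10) and n = 4, Bytes.split is asked for n - 1 = 3 intermediate keys and returns the
  // boundaries {0x00, 0x04, 0x08, 0x0c, 0x10}, producing four child splits of equal width, each
  // credited with regionSize / 4 bytes.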
  /**
   * Calculates the number of MapReduce input splits for the map tasks. The number of MapReduce
   * input splits depends on the average region size. Made public for testing.
   * @param splits               The list of input splits before balancing.
   * @param maxAverageRegionSize max average region size for one mapper
   * @return The list of input splits.
   * @throws IOException When creating the list of splits fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
   */
  public List<InputSplit> calculateAutoBalancedSplits(List<InputSplit> splits,
    long maxAverageRegionSize) throws IOException {
    if (splits.size() == 0) {
      return splits;
    }
    List<InputSplit> resultList = new ArrayList<>();
    long totalRegionSize = 0;
    for (int i = 0; i < splits.size(); i++) {
      TableSplit ts = (TableSplit) splits.get(i);
      totalRegionSize += ts.getLength();
    }
    long averageRegionSize = totalRegionSize / splits.size();
    // totalRegionSize might overflow, and the averageRegionSize must be positive.
    if (averageRegionSize <= 0) {
      LOG.warn("The averageRegionSize is not positive: " + averageRegionSize + ", "
        + "setting it to Long.MAX_VALUE / " + splits.size());
      averageRegionSize = Long.MAX_VALUE / splits.size();
    }
    // if averageRegionSize is too big, cap it at maxAverageRegionSize (default 8G)
    if (averageRegionSize > maxAverageRegionSize) {
      averageRegionSize = maxAverageRegionSize;
    }
    // if averageRegionSize is too small, we do not need to allocate more mappers for the 'large'
    // regions; the threshold is 16M = (default hdfs block size) / 4
    if (averageRegionSize < 16 * 1048576) {
      return splits;
    }
    for (int i = 0; i < splits.size(); i++) {
      TableSplit ts = (TableSplit) splits.get(i);
      TableName tableName = ts.getTable();
      String regionLocation = ts.getRegionLocation();
      String encodedRegionName = ts.getEncodedRegionName();
      long regionSize = ts.getLength();

      if (regionSize >= averageRegionSize) {
        // turn this region into multiple MapReduce input splits.
        int n =
          (int) Math.round(Math.log(((double) regionSize) / ((double) averageRegionSize)) + 1.0);
        List<InputSplit> temp = createNInputSplitsUniform(ts, n);
        resultList.addAll(temp);
      } else {
        // if the total size of several small contiguous regions is less than the average region
        // size, combine them into one MapReduce input split.
        long totalSize = regionSize;
        byte[] splitStartKey = ts.getStartRow();
        byte[] splitEndKey = ts.getEndRow();
        int j = i + 1;
        while (j < splits.size()) {
          TableSplit nextRegion = (TableSplit) splits.get(j);
          long nextRegionSize = nextRegion.getLength();
          if (
            totalSize + nextRegionSize <= averageRegionSize
              && Bytes.equals(splitEndKey, nextRegion.getStartRow())
          ) {
            totalSize = totalSize + nextRegionSize;
            splitEndKey = nextRegion.getEndRow();
            j++;
          } else {
            break;
          }
        }
        i = j - 1;
        // In the table input format for a single table we do not need to store the scan object in
        // the table split because it can be memory intensive and redundant information to what is
        // already stored in conf SCAN. See HBASE-25212
        TableSplit t = new TableSplit(tableName, null, splitStartKey, splitEndKey, regionLocation,
          encodedRegionName, totalSize);
        resultList.add(t);
      }
    }
    return resultList;
  }
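  // A worked example of the balancing above (the region sizes are illustrative): four regions of
  // 1G, 1G, 2G and 20G give an average of 6G, which is under the 8G default cap. The three small
  // regions are key-contiguous and total 4G <= 6G, so they collapse into a single split; the 20G
  // region gets round(ln(20 / 6) + 1) = round(2.2) = 2 uniform splits. The job therefore runs
  // three mappers instead of four, with far better size balance across them.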
  String reverseDNS(InetAddress ipAddress) throws UnknownHostException {
    String hostName = this.reverseDNSCacheMap.get(ipAddress);
    if (hostName == null) {
      String ipAddressString = null;
      try {
        ipAddressString = DNS.reverseDns(ipAddress, null);
      } catch (Exception e) {
        // We can use InetAddress in case the jndi failed to pull up the reverse DNS entry from the
        // name service. Also, in case of ipv6, we need to use the InetAddress since resolving
        // reverse DNS using jndi doesn't work well with ipv6 addresses.
        ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName();
      }
      if (ipAddressString == null) {
        throw new UnknownHostException("No host found for " + ipAddress);
      }
      hostName = Strings.domainNamePointerToHostName(ipAddressString);
      this.reverseDNSCacheMap.put(ipAddress, hostName);
    }
    return hostName;
  }

  /**
   * Test if the given region is to be included in the InputSplit while splitting the regions of a
   * table.
   * <p>
   * This optimization is effective when there is a specific reason to exclude an entire region
   * from the M-R job (and hence, it does not contribute an InputSplit), given the start and end
   * keys of the same. <br>
   * Useful when we need to remember the last-processed top record and revisit the [last, current)
   * interval for M-R processing, continuously. In addition to reducing InputSplits, it reduces the
   * load on the region server as well, due to the ordering of the keys. <br>
   * <br>
   * Note: It is possible that <code>endKey.length() == 0</code>, for the last (recent) region.
   * <br>
   * Override this method if you want to bulk exclude regions from M-R. By default, no region is
   * excluded (i.e. all regions are included).
   * @param startKey Start key of the region
   * @param endKey   End key of the region
   * @return true, if this region needs to be included as part of the input (default).
   */
  protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
    return true;
  }
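  // A minimal override sketch for the exclusion hook above (the cutoff key is hypothetical): keep
  // only regions that can still contain rows at or past a previously processed boundary.
  //
  //   @Override
  //   protected boolean includeRegionInSplit(byte[] startKey, byte[] endKey) {
  //     byte[] lastProcessed = Bytes.toBytes("row-2023-01-01");
  //     // an empty endKey marks the last region of the table, which always extends past it
  //     return endKey.length == 0 || Bytes.compareTo(endKey, lastProcessed) > 0;
  //   }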
  /**
   * Allows subclasses to get the {@link RegionLocator}.
   */
  protected RegionLocator getRegionLocator() {
    if (regionLocator == null) {
      throw new IllegalStateException(NOT_INITIALIZED);
    }
    return regionLocator;
  }

  /**
   * Allows subclasses to get the {@link Table}.
   */
  protected Table getTable() {
    if (table == null) {
      throw new IllegalStateException(NOT_INITIALIZED);
    }
    return table;
  }

  /**
   * Allows subclasses to get the {@link Admin}.
   */
  protected Admin getAdmin() {
    if (admin == null) {
      throw new IllegalStateException(NOT_INITIALIZED);
    }
    return admin;
  }

  /**
   * Allows subclasses to initialize the table information.
   * @param connection The Connection to the HBase cluster. MUST be unmanaged. We will close.
   * @param tableName  The {@link TableName} of the table to process.
   */
  protected void initializeTable(Connection connection, TableName tableName) throws IOException {
    if (this.table != null || this.connection != null) {
      LOG.warn("initializeTable called multiple times. Overwriting connection and table "
        + "reference; TableInputFormatBase will not close these old references when done.");
    }
    this.table = connection.getTable(tableName);
    this.regionLocator = connection.getRegionLocator(tableName);
    this.admin = connection.getAdmin();
    this.connection = connection;
    this.regionSizeCalculator = null;
  }

  @InterfaceAudience.Private
  protected RegionSizeCalculator createRegionSizeCalculator(RegionLocator locator, Admin admin)
    throws IOException {
    return new RegionSizeCalculator(locator, admin);
  }

  /**
   * Gets the scan defining the actual details like columns etc.
   * @return The internal scan instance.
   */
  public Scan getScan() {
    if (this.scan == null) {
      this.scan = new Scan();
    }
    return scan;
  }

  /**
   * Sets the scan defining the actual details like columns etc.
   * @param scan The scan to set.
   */
  public void setScan(Scan scan) {
    this.scan = scan;
  }

  /**
   * Allows subclasses to set the {@link TableRecordReader}.
   * @param tableRecordReader A different {@link TableRecordReader} implementation.
   */
  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
    this.tableRecordReader = tableRecordReader;
  }

  /**
   * Handle subclass specific set up. Each of the entry points used by the MapReduce framework,
   * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)},
   * will call {@link #initialize(JobContext)} as a convenient centralized location to handle
   * retrieving the necessary configuration information and calling
   * {@link #initializeTable(Connection, TableName)}. Subclasses should implement their initialize
   * call such that it is safe to call multiple times. The current TableInputFormatBase
   * implementation relies on a non-null table reference to decide if an initialize call is needed,
   * but this behavior may change in the future. In particular, it is critical that initializeTable
   * not be called multiple times since this will leak Connection instances.
   */
  protected void initialize(JobContext context) throws IOException {
  }

  /**
   * Close the Table and related objects that were initialized via
   * {@link #initializeTable(Connection, TableName)}.
   */
  protected void closeTable() throws IOException {
    close(admin, table, regionLocator, connection);
    admin = null;
    table = null;
    regionLocator = null;
    connection = null;
    regionSizeCalculator = null;
  }

  private void close(Closeable... closables) throws IOException {
    for (Closeable c : closables) {
      if (c != null) {
        c.close();
      }
    }
  }

}