001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.mapreduce; 020 021import java.io.Closeable; 022import java.io.IOException; 023import java.net.InetAddress; 024import java.net.InetSocketAddress; 025import java.net.UnknownHostException; 026import java.util.ArrayList; 027import java.util.HashMap; 028import java.util.List; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.HRegionLocation; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.Admin; 033import org.apache.hadoop.hbase.client.Connection; 034import org.apache.hadoop.hbase.client.RegionLocator; 035import org.apache.hadoop.hbase.client.Result; 036import org.apache.hadoop.hbase.client.Scan; 037import org.apache.hadoop.hbase.client.Table; 038import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 039import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 040import org.apache.hadoop.hbase.util.Addressing; 041import org.apache.hadoop.hbase.util.Bytes; 042import org.apache.hadoop.hbase.util.Pair; 043import org.apache.hadoop.hbase.util.Strings; 044import org.apache.hadoop.mapreduce.InputFormat; 045import org.apache.hadoop.mapreduce.InputSplit; 046import org.apache.hadoop.mapreduce.JobContext; 047import org.apache.hadoop.mapreduce.RecordReader; 048import org.apache.hadoop.mapreduce.TaskAttemptContext; 049import org.apache.hadoop.net.DNS; 050import org.apache.hadoop.util.StringUtils; 051import org.apache.yetus.audience.InterfaceAudience; 052import org.slf4j.Logger; 053import org.slf4j.LoggerFactory; 054 055/** 056 * A base for {@link TableInputFormat}s. Receives a {@link Connection}, a {@link TableName}, 057 * an {@link Scan} instance that defines the input columns etc. Subclasses may use 058 * other TableRecordReader implementations. 059 * 060 * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to 061 * function properly. Each of the entry points to this class used by the MapReduce framework, 062 * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)}, 063 * will call {@link #initialize(JobContext)} as a convenient centralized location to handle 064 * retrieving the necessary configuration information. If your subclass overrides either of these 065 * methods, either call the parent version or call initialize yourself. 066 * 067 * <p> 068 * An example of a subclass: 069 * <pre> 070 * class ExampleTIF extends TableInputFormatBase { 071 * 072 * {@literal @}Override 073 * protected void initialize(JobContext context) throws IOException { 074 * // We are responsible for the lifecycle of this connection until we hand it over in 075 * // initializeTable. 076 * Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create( 077 * job.getConfiguration())); 078 * TableName tableName = TableName.valueOf("exampleTable"); 079 * // mandatory. once passed here, TableInputFormatBase will handle closing the connection. 080 * initializeTable(connection, tableName); 081 * byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"), 082 * Bytes.toBytes("columnB") }; 083 * // optional, by default we'll get everything for the table. 084 * Scan scan = new Scan(); 085 * for (byte[] family : inputColumns) { 086 * scan.addFamily(family); 087 * } 088 * Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*")); 089 * scan.setFilter(exampleFilter); 090 * setScan(scan); 091 * } 092 * } 093 * </pre> 094 * 095 * 096 * The number of InputSplits(mappers) match the number of regions in a table by default. 097 * Set "hbase.mapreduce.tableinput.mappers.per.region" to specify how many mappers per region, set 098 * this property will disable autobalance below.\ 099 * Set "hbase.mapreduce.tif.input.autobalance" to enable autobalance, hbase will assign mappers 100 * based on average region size; For regions, whose size larger than average region size may assigned 101 * more mappers, and for smaller one, they may group together to use one mapper. If actual average 102 * region size is too big, like 50G, it is not good to only assign 1 mapper for those large regions. 103 * Use "hbase.mapreduce.tif.ave.regionsize" to set max average region size when enable "autobalanece", 104 * default mas average region size is 8G. 105 */ 106@InterfaceAudience.Public 107public abstract class TableInputFormatBase 108 extends InputFormat<ImmutableBytesWritable, Result> { 109 110 private static final Logger LOG = LoggerFactory.getLogger(TableInputFormatBase.class); 111 112 private static final String NOT_INITIALIZED = "The input format instance has not been properly " + 113 "initialized. Ensure you call initializeTable either in your constructor or initialize " + 114 "method"; 115 private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" + 116 " previous error. Please look at the previous logs lines from" + 117 " the task's full log for more details."; 118 119 /** Specify if we enable auto-balance to set number of mappers in M/R jobs. */ 120 public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.tif.input.autobalance"; 121 /** In auto-balance, we split input by ave region size, if calculated region size is too big, we can set it. */ 122 public static final String MAX_AVERAGE_REGION_SIZE = "hbase.mapreduce.tif.ave.regionsize"; 123 124 /** Set the number of Mappers for each region, all regions have same number of Mappers */ 125 public static final String NUM_MAPPERS_PER_REGION = "hbase.mapreduce.tableinput.mappers.per.region"; 126 127 128 /** Holds the details for the internal scanner. 129 * 130 * @see Scan */ 131 private Scan scan = null; 132 /** The {@link Admin}. */ 133 private Admin admin; 134 /** The {@link Table} to scan. */ 135 private Table table; 136 /** The {@link RegionLocator} of the table. */ 137 private RegionLocator regionLocator; 138 /** The reader scanning the table, can be a custom one. */ 139 private TableRecordReader tableRecordReader = null; 140 /** The underlying {@link Connection} of the table. */ 141 private Connection connection; 142 143 144 /** The reverse DNS lookup cache mapping: IPAddress => HostName */ 145 private HashMap<InetAddress, String> reverseDNSCacheMap = 146 new HashMap<>(); 147 148 /** 149 * Builds a {@link TableRecordReader}. If no {@link TableRecordReader} was provided, uses 150 * the default. 151 * 152 * @param split The split to work with. 153 * @param context The current context. 154 * @return The newly created record reader. 155 * @throws IOException When creating the reader fails. 156 * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader( 157 * org.apache.hadoop.mapreduce.InputSplit, 158 * org.apache.hadoop.mapreduce.TaskAttemptContext) 159 */ 160 @Override 161 public RecordReader<ImmutableBytesWritable, Result> createRecordReader( 162 InputSplit split, TaskAttemptContext context) 163 throws IOException { 164 // Just in case a subclass is relying on JobConfigurable magic. 165 if (table == null) { 166 initialize(context); 167 } 168 // null check in case our child overrides getTable to not throw. 169 try { 170 if (getTable() == null) { 171 // initialize() must not have been implemented in the subclass. 172 throw new IOException(INITIALIZATION_ERROR); 173 } 174 } catch (IllegalStateException exception) { 175 throw new IOException(INITIALIZATION_ERROR, exception); 176 } 177 TableSplit tSplit = (TableSplit) split; 178 LOG.info("Input split length: " + StringUtils.humanReadableInt(tSplit.getLength()) + " bytes."); 179 final TableRecordReader trr = 180 this.tableRecordReader != null ? this.tableRecordReader : new TableRecordReader(); 181 Scan sc = new Scan(this.scan); 182 sc.setStartRow(tSplit.getStartRow()); 183 sc.setStopRow(tSplit.getEndRow()); 184 trr.setScan(sc); 185 trr.setTable(getTable()); 186 return new RecordReader<ImmutableBytesWritable, Result>() { 187 188 @Override 189 public void close() throws IOException { 190 trr.close(); 191 closeTable(); 192 } 193 194 @Override 195 public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException { 196 return trr.getCurrentKey(); 197 } 198 199 @Override 200 public Result getCurrentValue() throws IOException, InterruptedException { 201 return trr.getCurrentValue(); 202 } 203 204 @Override 205 public float getProgress() throws IOException, InterruptedException { 206 return trr.getProgress(); 207 } 208 209 @Override 210 public void initialize(InputSplit inputsplit, TaskAttemptContext context) throws IOException, 211 InterruptedException { 212 trr.initialize(inputsplit, context); 213 } 214 215 @Override 216 public boolean nextKeyValue() throws IOException, InterruptedException { 217 return trr.nextKeyValue(); 218 } 219 }; 220 } 221 222 protected Pair<byte[][],byte[][]> getStartEndKeys() throws IOException { 223 return getRegionLocator().getStartEndKeys(); 224 } 225 226 /** 227 * Calculates the splits that will serve as input for the map tasks. 228 * @param context The current job context. 229 * @return The list of input splits. 230 * @throws IOException When creating the list of splits fails. 231 * @see org.apache.hadoop.mapreduce.InputFormat#getSplits( 232 * org.apache.hadoop.mapreduce.JobContext) 233 */ 234 @Override 235 public List<InputSplit> getSplits(JobContext context) throws IOException { 236 boolean closeOnFinish = false; 237 238 // Just in case a subclass is relying on JobConfigurable magic. 239 if (table == null) { 240 initialize(context); 241 closeOnFinish = true; 242 } 243 244 // null check in case our child overrides getTable to not throw. 245 try { 246 if (getTable() == null) { 247 // initialize() must not have been implemented in the subclass. 248 throw new IOException(INITIALIZATION_ERROR); 249 } 250 } catch (IllegalStateException exception) { 251 throw new IOException(INITIALIZATION_ERROR, exception); 252 } 253 254 try { 255 List<InputSplit> splits = oneInputSplitPerRegion(); 256 257 // set same number of mappers for each region 258 if (context.getConfiguration().get(NUM_MAPPERS_PER_REGION) != null) { 259 int nSplitsPerRegion = context.getConfiguration().getInt(NUM_MAPPERS_PER_REGION, 1); 260 List<InputSplit> res = new ArrayList<>(); 261 for (int i = 0; i < splits.size(); i++) { 262 List<InputSplit> tmp = createNInputSplitsUniform(splits.get(i), nSplitsPerRegion); 263 res.addAll(tmp); 264 } 265 return res; 266 } 267 268 //The default value of "hbase.mapreduce.input.autobalance" is false. 269 if (context.getConfiguration().getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false)) { 270 long maxAveRegionSize = context.getConfiguration() 271 .getLong(MAX_AVERAGE_REGION_SIZE, 8L*1073741824); //8GB 272 return calculateAutoBalancedSplits(splits, maxAveRegionSize); 273 } 274 275 // return one mapper per region 276 return splits; 277 } finally { 278 if (closeOnFinish) { 279 closeTable(); 280 } 281 } 282 } 283 284 /** 285 * Create one InputSplit per region 286 * 287 * @return The list of InputSplit for all the regions 288 * @throws IOException throws IOException 289 */ 290 private List<InputSplit> oneInputSplitPerRegion() throws IOException { 291 RegionSizeCalculator sizeCalculator = 292 createRegionSizeCalculator(getRegionLocator(), getAdmin()); 293 294 TableName tableName = getTable().getName(); 295 296 Pair<byte[][], byte[][]> keys = getStartEndKeys(); 297 if (keys == null || keys.getFirst() == null || 298 keys.getFirst().length == 0) { 299 HRegionLocation regLoc = 300 getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); 301 if (null == regLoc) { 302 throw new IOException("Expecting at least one region."); 303 } 304 List<InputSplit> splits = new ArrayList<>(1); 305 long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName()); 306 // In the table input format for single table we do not need to 307 // store the scan object in table split because it can be memory intensive and redundant 308 // information to what is already stored in conf SCAN. See HBASE-25212 309 TableSplit split = new TableSplit(tableName, null, 310 HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc 311 .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize); 312 splits.add(split); 313 return splits; 314 } 315 List<InputSplit> splits = new ArrayList<>(keys.getFirst().length); 316 for (int i = 0; i < keys.getFirst().length; i++) { 317 if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { 318 continue; 319 } 320 321 byte[] startRow = scan.getStartRow(); 322 byte[] stopRow = scan.getStopRow(); 323 // determine if the given start an stop key fall into the region 324 if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || 325 Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && 326 (stopRow.length == 0 || 327 Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { 328 byte[] splitStart = startRow.length == 0 || 329 Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? 330 keys.getFirst()[i] : startRow; 331 byte[] splitStop = (stopRow.length == 0 || 332 Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && 333 keys.getSecond()[i].length > 0 ? 334 keys.getSecond()[i] : stopRow; 335 336 HRegionLocation location = getRegionLocator().getRegionLocation(keys.getFirst()[i], false); 337 // The below InetSocketAddress creation does a name resolution. 338 InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort()); 339 if (isa.isUnresolved()) { 340 LOG.warn("Failed resolve " + isa); 341 } 342 InetAddress regionAddress = isa.getAddress(); 343 String regionLocation; 344 regionLocation = reverseDNS(regionAddress); 345 346 byte[] regionName = location.getRegionInfo().getRegionName(); 347 String encodedRegionName = location.getRegionInfo().getEncodedName(); 348 long regionSize = sizeCalculator.getRegionSize(regionName); 349 // In the table input format for single table we do not need to 350 // store the scan object in table split because it can be memory intensive and redundant 351 // information to what is already stored in conf SCAN. See HBASE-25212 352 TableSplit split = new TableSplit(tableName, null, 353 splitStart, splitStop, regionLocation, encodedRegionName, regionSize); 354 splits.add(split); 355 if (LOG.isDebugEnabled()) { 356 LOG.debug("getSplits: split -> " + i + " -> " + split); 357 } 358 } 359 } 360 return splits; 361 } 362 363 /** 364 * Create n splits for one InputSplit, For now only support uniform distribution 365 * @param split A TableSplit corresponding to a range of rowkeys 366 * @param n Number of ranges after splitting. Pass 1 means no split for the range 367 * Pass 2 if you want to split the range in two; 368 * @return A list of TableSplit, the size of the list is n 369 * @throws IllegalArgumentIOException throws IllegalArgumentIOException 370 */ 371 protected List<InputSplit> createNInputSplitsUniform(InputSplit split, int n) 372 throws IllegalArgumentIOException { 373 if (split == null || !(split instanceof TableSplit)) { 374 throw new IllegalArgumentIOException( 375 "InputSplit for CreateNSplitsPerRegion can not be null + " 376 + "and should be instance of TableSplit"); 377 } 378 //if n < 1, then still continue using n = 1 379 n = n < 1 ? 1 : n; 380 List<InputSplit> res = new ArrayList<>(n); 381 if (n == 1) { 382 res.add(split); 383 return res; 384 } 385 386 // Collect Region related information 387 TableSplit ts = (TableSplit) split; 388 TableName tableName = ts.getTable(); 389 String regionLocation = ts.getRegionLocation(); 390 String encodedRegionName = ts.getEncodedRegionName(); 391 long regionSize = ts.getLength(); 392 byte[] startRow = ts.getStartRow(); 393 byte[] endRow = ts.getEndRow(); 394 395 // For special case: startRow or endRow is empty 396 if (startRow.length == 0 && endRow.length == 0){ 397 startRow = new byte[1]; 398 endRow = new byte[1]; 399 startRow[0] = 0; 400 endRow[0] = -1; 401 } 402 if (startRow.length == 0 && endRow.length != 0){ 403 startRow = new byte[1]; 404 startRow[0] = 0; 405 } 406 if (startRow.length != 0 && endRow.length == 0){ 407 endRow =new byte[startRow.length]; 408 for (int k = 0; k < startRow.length; k++){ 409 endRow[k] = -1; 410 } 411 } 412 413 // Split Region into n chunks evenly 414 byte[][] splitKeys = Bytes.split(startRow, endRow, true, n-1); 415 for (int i = 0; i < splitKeys.length - 1; i++) { 416 // In the table input format for single table we do not need to 417 // store the scan object in table split because it can be memory intensive and redundant 418 // information to what is already stored in conf SCAN. See HBASE-25212 419 //notice that the regionSize parameter may be not very accurate 420 TableSplit tsplit = 421 new TableSplit(tableName, null, splitKeys[i], splitKeys[i + 1], regionLocation, 422 encodedRegionName, regionSize / n); 423 res.add(tsplit); 424 } 425 return res; 426 } 427 /** 428 * Calculates the number of MapReduce input splits for the map tasks. The number of 429 * MapReduce input splits depends on the average region size. 430 * Make it 'public' for testing 431 * 432 * @param splits The list of input splits before balance. 433 * @param maxAverageRegionSize max Average region size for one mapper 434 * @return The list of input splits. 435 * @throws IOException When creating the list of splits fails. 436 * @see org.apache.hadoop.mapreduce.InputFormat#getSplits( 437 *org.apache.hadoop.mapreduce.JobContext) 438 */ 439 public List<InputSplit> calculateAutoBalancedSplits(List<InputSplit> splits, long maxAverageRegionSize) 440 throws IOException { 441 if (splits.size() == 0) { 442 return splits; 443 } 444 List<InputSplit> resultList = new ArrayList<>(); 445 long totalRegionSize = 0; 446 for (int i = 0; i < splits.size(); i++) { 447 TableSplit ts = (TableSplit) splits.get(i); 448 totalRegionSize += ts.getLength(); 449 } 450 long averageRegionSize = totalRegionSize / splits.size(); 451 // totalRegionSize might be overflow, and the averageRegionSize must be positive. 452 if (averageRegionSize <= 0) { 453 LOG.warn("The averageRegionSize is not positive: " + averageRegionSize + ", " + 454 "set it to Long.MAX_VALUE " + splits.size()); 455 averageRegionSize = Long.MAX_VALUE / splits.size(); 456 } 457 //if averageRegionSize is too big, change it to default as 1 GB, 458 if (averageRegionSize > maxAverageRegionSize) { 459 averageRegionSize = maxAverageRegionSize; 460 } 461 // if averageRegionSize is too small, we do not need to allocate more mappers for those 'large' region 462 // set default as 16M = (default hdfs block size) / 4; 463 if (averageRegionSize < 16 * 1048576) { 464 return splits; 465 } 466 for (int i = 0; i < splits.size(); i++) { 467 TableSplit ts = (TableSplit) splits.get(i); 468 TableName tableName = ts.getTable(); 469 String regionLocation = ts.getRegionLocation(); 470 String encodedRegionName = ts.getEncodedRegionName(); 471 long regionSize = ts.getLength(); 472 473 if (regionSize >= averageRegionSize) { 474 // make this region as multiple MapReduce input split. 475 int n = (int) Math.round(Math.log(((double) regionSize) / ((double) averageRegionSize)) + 1.0); 476 List<InputSplit> temp = createNInputSplitsUniform(ts, n); 477 resultList.addAll(temp); 478 } else { 479 // if the total size of several small continuous regions less than the average region size, 480 // combine them into one MapReduce input split. 481 long totalSize = regionSize; 482 byte[] splitStartKey = ts.getStartRow(); 483 byte[] splitEndKey = ts.getEndRow(); 484 int j = i + 1; 485 while (j < splits.size()) { 486 TableSplit nextRegion = (TableSplit) splits.get(j); 487 long nextRegionSize = nextRegion.getLength(); 488 if (totalSize + nextRegionSize <= averageRegionSize 489 && Bytes.equals(splitEndKey, nextRegion.getStartRow())) { 490 totalSize = totalSize + nextRegionSize; 491 splitEndKey = nextRegion.getEndRow(); 492 j++; 493 } else { 494 break; 495 } 496 } 497 i = j - 1; 498 // In the table input format for single table we do not need to 499 // store the scan object in table split because it can be memory intensive and redundant 500 // information to what is already stored in conf SCAN. See HBASE-25212 501 TableSplit t = new TableSplit(tableName, null, splitStartKey, splitEndKey, regionLocation, 502 encodedRegionName, totalSize); 503 resultList.add(t); 504 } 505 } 506 return resultList; 507 } 508 509 String reverseDNS(InetAddress ipAddress) throws UnknownHostException { 510 String hostName = this.reverseDNSCacheMap.get(ipAddress); 511 if (hostName == null) { 512 String ipAddressString = null; 513 try { 514 ipAddressString = DNS.reverseDns(ipAddress, null); 515 } catch (Exception e) { 516 // We can use InetAddress in case the jndi failed to pull up the reverse DNS entry from the 517 // name service. Also, in case of ipv6, we need to use the InetAddress since resolving 518 // reverse DNS using jndi doesn't work well with ipv6 addresses. 519 ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName(); 520 } 521 if (ipAddressString == null) { 522 throw new UnknownHostException("No host found for " + ipAddress); 523 } 524 hostName = Strings.domainNamePointerToHostName(ipAddressString); 525 this.reverseDNSCacheMap.put(ipAddress, hostName); 526 } 527 return hostName; 528 } 529 530 /** 531 * Test if the given region is to be included in the InputSplit while splitting 532 * the regions of a table. 533 * <p> 534 * This optimization is effective when there is a specific reasoning to exclude an entire region from the M-R job, 535 * (and hence, not contributing to the InputSplit), given the start and end keys of the same. <br> 536 * Useful when we need to remember the last-processed top record and revisit the [last, current) interval for M-R processing, 537 * continuously. In addition to reducing InputSplits, reduces the load on the region server as well, due to the ordering of the keys. 538 * <br> 539 * <br> 540 * Note: It is possible that <code>endKey.length() == 0 </code> , for the last (recent) region. 541 * <br> 542 * Override this method, if you want to bulk exclude regions altogether from M-R. By default, no region is excluded( i.e. all regions are included). 543 * 544 * 545 * @param startKey Start key of the region 546 * @param endKey End key of the region 547 * @return true, if this region needs to be included as part of the input (default). 548 * 549 */ 550 protected boolean includeRegionInSplit(final byte[] startKey, final byte [] endKey) { 551 return true; 552 } 553 554 /** 555 * Allows subclasses to get the {@link RegionLocator}. 556 */ 557 protected RegionLocator getRegionLocator() { 558 if (regionLocator == null) { 559 throw new IllegalStateException(NOT_INITIALIZED); 560 } 561 return regionLocator; 562 } 563 564 /** 565 * Allows subclasses to get the {@link Table}. 566 */ 567 protected Table getTable() { 568 if (table == null) { 569 throw new IllegalStateException(NOT_INITIALIZED); 570 } 571 return table; 572 } 573 574 /** 575 * Allows subclasses to get the {@link Admin}. 576 */ 577 protected Admin getAdmin() { 578 if (admin == null) { 579 throw new IllegalStateException(NOT_INITIALIZED); 580 } 581 return admin; 582 } 583 584 /** 585 * Allows subclasses to initialize the table information. 586 * 587 * @param connection The Connection to the HBase cluster. MUST be unmanaged. We will close. 588 * @param tableName The {@link TableName} of the table to process. 589 * @throws IOException 590 */ 591 protected void initializeTable(Connection connection, TableName tableName) throws IOException { 592 if (this.table != null || this.connection != null) { 593 LOG.warn("initializeTable called multiple times. Overwriting connection and table " + 594 "reference; TableInputFormatBase will not close these old references when done."); 595 } 596 this.table = connection.getTable(tableName); 597 this.regionLocator = connection.getRegionLocator(tableName); 598 this.admin = connection.getAdmin(); 599 this.connection = connection; 600 } 601 602 @InterfaceAudience.Private 603 protected RegionSizeCalculator createRegionSizeCalculator(RegionLocator locator, Admin admin) 604 throws IOException { 605 return new RegionSizeCalculator(locator, admin); 606 } 607 608 /** 609 * Gets the scan defining the actual details like columns etc. 610 * 611 * @return The internal scan instance. 612 */ 613 public Scan getScan() { 614 if (this.scan == null) this.scan = new Scan(); 615 return scan; 616 } 617 618 /** 619 * Sets the scan defining the actual details like columns etc. 620 * 621 * @param scan The scan to set. 622 */ 623 public void setScan(Scan scan) { 624 this.scan = scan; 625 } 626 627 /** 628 * Allows subclasses to set the {@link TableRecordReader}. 629 * 630 * @param tableRecordReader A different {@link TableRecordReader} 631 * implementation. 632 */ 633 protected void setTableRecordReader(TableRecordReader tableRecordReader) { 634 this.tableRecordReader = tableRecordReader; 635 } 636 637 /** 638 * Handle subclass specific set up. 639 * Each of the entry points used by the MapReduce framework, 640 * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)}, 641 * will call {@link #initialize(JobContext)} as a convenient centralized location to handle 642 * retrieving the necessary configuration information and calling 643 * {@link #initializeTable(Connection, TableName)}. 644 * 645 * Subclasses should implement their initialize call such that it is safe to call multiple times. 646 * The current TableInputFormatBase implementation relies on a non-null table reference to decide 647 * if an initialize call is needed, but this behavior may change in the future. In particular, 648 * it is critical that initializeTable not be called multiple times since this will leak 649 * Connection instances. 650 * 651 */ 652 protected void initialize(JobContext context) throws IOException { 653 } 654 655 /** 656 * Close the Table and related objects that were initialized via 657 * {@link #initializeTable(Connection, TableName)}. 658 * 659 * @throws IOException 660 */ 661 protected void closeTable() throws IOException { 662 close(admin, table, regionLocator, connection); 663 admin = null; 664 table = null; 665 regionLocator = null; 666 connection = null; 667 } 668 669 private void close(Closeable... closables) throws IOException { 670 for (Closeable c : closables) { 671 if(c != null) { c.close(); } 672 } 673 } 674 675}