/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A base for {@link TableInputFormat}s. Receives a {@link Connection}, a {@link TableName}, and a
 * {@link Scan} instance that defines the input columns etc. Subclasses may use other
 * TableRecordReader implementations. Subclasses MUST ensure initializeTable(Connection, TableName)
 * is called for an instance to function properly. Each of the entry points to this class used by
 * the MapReduce framework, {@link #createRecordReader(InputSplit, TaskAttemptContext)} and
 * {@link #getSplits(JobContext)}, will call {@link #initialize(JobContext)} as a convenient
 * centralized location to handle retrieving the necessary configuration information. If your
 * subclass overrides either of these methods, either call the parent version or call initialize
 * yourself.
 * <p>
 * An example of a subclass:
 *
 * <pre>
 * class ExampleTIF extends TableInputFormatBase {
 *
 *   {@literal @}Override
 *   protected void initialize(JobContext context) throws IOException {
 *     // We are responsible for the lifecycle of this connection until we hand it over in
 *     // initializeTable.
 *     Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(
 *       context.getConfiguration()));
 *     TableName tableName = TableName.valueOf("exampleTable");
 *     // mandatory. once passed here, TableInputFormatBase will handle closing the connection.
 *     initializeTable(connection, tableName);
 *     byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"),
 *       Bytes.toBytes("columnB") };
 *     // optional, by default we'll get everything for the table.
 *     Scan scan = new Scan();
 *     for (byte[] family : inputColumns) {
 *       scan.addFamily(family);
 *     }
 *     Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
 *     scan.setFilter(exampleFilter);
 *     setScan(scan);
 *   }
 * }
 * </pre>
 *
 * The number of InputSplits (mappers) matches the number of regions in a table by default. Set
 * "hbase.mapreduce.tableinput.mappers.per.region" to specify how many mappers to use per region;
 * setting this property disables the auto-balance feature described below. Set
 * "hbase.mapreduce.tif.input.autobalance" to enable auto-balance instead: HBase will then assign
 * mappers based on the average region size. Regions larger than the average may be assigned more
 * than one mapper, while consecutive smaller regions may be grouped together to share one mapper.
 * If the actual average region size is very large, e.g. 50G, it is not good to assign only one
 * mapper to each such region, so use "hbase.mapreduce.tif.ave.regionsize" to cap the average
 * region size used when auto-balance is enabled; the default cap is 8G.
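 * <p>
 * For example, a minimal sketch of configuring these properties (the values shown are
 * illustrative, not recommendations):
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * // Fixed fan-out: two mappers per region. Setting this disables auto-balance.
 * conf.setInt("hbase.mapreduce.tableinput.mappers.per.region", 2);
 * // Alternatively, let HBase balance mappers by region size, capping the average at 4G.
 * conf.setBoolean("hbase.mapreduce.tif.input.autobalance", true);
 * conf.setLong("hbase.mapreduce.tif.ave.regionsize", 4L * 1024 * 1024 * 1024);
 * </pre>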
 */
@InterfaceAudience.Public
public abstract class TableInputFormatBase extends InputFormat<ImmutableBytesWritable, Result> {

  private static final Logger LOG = LoggerFactory.getLogger(TableInputFormatBase.class);

  private static final String NOT_INITIALIZED = "The input format instance has not been properly "
    + "initialized. Ensure you call initializeTable either in your constructor or initialize "
    + "method";
  private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a"
    + " previous error. Please look at the previous log lines from"
    + " the task's full log for more details.";

  /** Specifies whether to enable auto-balance when computing the number of mappers in M/R jobs. */
  public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.tif.input.autobalance";
  /**
   * When auto-balance is enabled, input is split by the average region size; if the calculated
   * average is too large, use this property to cap it.
   */
  public static final String MAX_AVERAGE_REGION_SIZE = "hbase.mapreduce.tif.ave.regionsize";

  /** Sets the number of mappers for each region; all regions get the same number of mappers. */
  public static final String NUM_MAPPERS_PER_REGION =
    "hbase.mapreduce.tableinput.mappers.per.region";

  /**
   * Holds the details for the internal scanner.
   * @see Scan
   */
  private Scan scan = null;
  /** The {@link Admin}. */
  private Admin admin;
  /** The {@link Table} to scan. */
  private Table table;
  /** The {@link RegionLocator} of the table. */
  private RegionLocator regionLocator;
  /** The reader scanning the table, can be a custom one. */
  private TableRecordReader tableRecordReader = null;
  /** The underlying {@link Connection} of the table. */
  private Connection connection;
  /** Used to generate splits based on region size. */
  private RegionSizeCalculator regionSizeCalculator;

  /** The reverse DNS lookup cache mapping: IPAddress => HostName */
  private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<>();

  /**
   * Builds a {@link TableRecordReader}. If no {@link TableRecordReader} was provided, uses the
   * default.
   * @param split   The split to work with.
   * @param context The current context.
   * @return The newly created record reader.
   * @throws IOException When creating the reader fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
   *      org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext)
   */
  @Override
  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(InputSplit split,
    TaskAttemptContext context) throws IOException {
    // Just in case a subclass is relying on JobConfigurable magic.
    if (table == null) {
      initialize(context);
    }
    // null check in case our child overrides getTable to not throw.
    try {
      if (getTable() == null) {
        // initialize() must not have been implemented in the subclass.
        throw new IOException(INITIALIZATION_ERROR);
      }
    } catch (IllegalStateException exception) {
      throw new IOException(INITIALIZATION_ERROR, exception);
    }
    TableSplit tSplit = (TableSplit) split;
    LOG.info("Input split length: " + StringUtils.humanReadableInt(tSplit.getLength()) + " bytes.");
    final TableRecordReader trr =
      this.tableRecordReader != null ? this.tableRecordReader : new TableRecordReader();
    Scan sc = new Scan(this.scan);
    sc.setStartRow(tSplit.getStartRow());
    sc.setStopRow(tSplit.getEndRow());
    trr.setScan(sc);
    trr.setTable(getTable());
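    // Wrap the reader so that closing it also closes the table, region locator, admin, and
    // connection acquired in initializeTable (via closeTable below).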
    return new RecordReader<ImmutableBytesWritable, Result>() {

      @Override
      public void close() throws IOException {
        trr.close();
        closeTable();
      }

      @Override
      public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
        return trr.getCurrentKey();
      }

      @Override
      public Result getCurrentValue() throws IOException, InterruptedException {
        return trr.getCurrentValue();
      }

      @Override
      public float getProgress() throws IOException, InterruptedException {
        return trr.getProgress();
      }

      @Override
      public void initialize(InputSplit inputsplit, TaskAttemptContext context)
        throws IOException, InterruptedException {
        trr.initialize(inputsplit, context);
      }

      @Override
      public boolean nextKeyValue() throws IOException, InterruptedException {
        return trr.nextKeyValue();
      }
    };
  }

  protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
    return getRegionLocator().getStartEndKeys();
  }

  /**
   * Calculates the splits that will serve as input for the map tasks.
   * @param context The current job context.
   * @return The list of input splits.
   * @throws IOException When creating the list of splits fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
   */
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    boolean closeOnFinish = false;

    // Just in case a subclass is relying on JobConfigurable magic.
    if (table == null) {
      initialize(context);
      closeOnFinish = true;
    }

    // null check in case our child overrides getTable to not throw.
    try {
      if (getTable() == null) {
        // initialize() must not have been implemented in the subclass.
        throw new IOException(INITIALIZATION_ERROR);
      }
    } catch (IllegalStateException exception) {
      throw new IOException(INITIALIZATION_ERROR, exception);
    }

    try {
      List<InputSplit> splits = oneInputSplitPerRegion();

      // set the same number of mappers for each region
      if (context.getConfiguration().get(NUM_MAPPERS_PER_REGION) != null) {
        int nSplitsPerRegion = context.getConfiguration().getInt(NUM_MAPPERS_PER_REGION, 1);
        List<InputSplit> res = new ArrayList<>();
        for (int i = 0; i < splits.size(); i++) {
          List<InputSplit> tmp = createNInputSplitsUniform(splits.get(i), nSplitsPerRegion);
          res.addAll(tmp);
        }
        return res;
      }

      // The default value of "hbase.mapreduce.tif.input.autobalance" is false.
      if (context.getConfiguration().getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false)) {
        long maxAveRegionSize =
          context.getConfiguration().getLong(MAX_AVERAGE_REGION_SIZE, 8L * 1073741824); // 8GB
        return calculateAutoBalancedSplits(splits, maxAveRegionSize);
      }

      // return one mapper per region
      return splits;
    } finally {
      if (closeOnFinish) {
        closeTable();
      }
    }
  }

  /**
   * Create one InputSplit per region.
   * @return The list of InputSplits for all the regions.
   * @throws IOException When reading region information fails.
   */
  private List<InputSplit> oneInputSplitPerRegion() throws IOException {
    if (regionSizeCalculator == null) {
      // Initialize here rather than with the other resources because this involves
      // a full scan of meta, which can be heavy. We might as well only do it if/when necessary.
      regionSizeCalculator = createRegionSizeCalculator(getRegionLocator(), getAdmin());
    }

    TableName tableName = getTable().getName();

    Pair<byte[][], byte[][]> keys = getStartEndKeys();
    if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
      HRegionLocation regLoc =
        getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
      if (null == regLoc) {
        throw new IOException("Expecting at least one region.");
      }
      List<InputSplit> splits = new ArrayList<>(1);
      long regionSize = regionSizeCalculator.getRegionSize(regLoc.getRegion().getRegionName());
      // In the table input format for single table we do not need to
      // store the scan object in table split because it can be memory intensive and redundant
      // information to what is already stored in conf SCAN. See HBASE-25212
      TableSplit split =
        new TableSplit(tableName, null, HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
          regLoc.getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
      splits.add(split);
      return splits;
    }
    List<InputSplit> splits = new ArrayList<>(keys.getFirst().length);
    for (int i = 0; i < keys.getFirst().length; i++) {
      if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
        continue;
      }

      byte[] startRow = scan.getStartRow();
      byte[] stopRow = scan.getStopRow();
      // determine if the given start and stop keys fall into the region
      if (
        (startRow.length == 0 || keys.getSecond()[i].length == 0
          || Bytes.compareTo(startRow, keys.getSecond()[i]) < 0)
          && (stopRow.length == 0 || Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)
      ) {
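        // Clip the split to the scan range: start at the later of (region start, scan start),
        // stop at the earlier of (region end, scan stop); an empty key means unbounded.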
        byte[] splitStart =
          startRow.length == 0 || Bytes.compareTo(keys.getFirst()[i], startRow) >= 0
            ? keys.getFirst()[i]
            : startRow;
        byte[] splitStop =
          (stopRow.length == 0 || Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0)
            && keys.getSecond()[i].length > 0 ? keys.getSecond()[i] : stopRow;

        HRegionLocation location = getRegionLocator().getRegionLocation(keys.getFirst()[i], false);
        // The below InetSocketAddress creation does a name resolution.
        InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
        if (isa.isUnresolved()) {
          LOG.warn("Failed to resolve " + isa);
        }
        InetAddress regionAddress = isa.getAddress();
        String regionLocation;
        regionLocation = reverseDNS(regionAddress);

        byte[] regionName = location.getRegion().getRegionName();
        String encodedRegionName = location.getRegion().getEncodedName();
        long regionSize = regionSizeCalculator.getRegionSize(regionName);
        // In the table input format for single table we do not need to
        // store the scan object in table split because it can be memory intensive and redundant
        // information to what is already stored in conf SCAN. See HBASE-25212
        TableSplit split = new TableSplit(tableName, null, splitStart, splitStop, regionLocation,
          encodedRegionName, regionSize);
        splits.add(split);
        if (LOG.isDebugEnabled()) {
          LOG.debug("getSplits: split -> " + i + " -> " + split);
        }
      }
    }
    return splits;
  }

  /**
   * Create n splits for one InputSplit. For now, only uniform distribution is supported.
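   * <p>
   * A rough sketch of the behavior (row keys assumed for illustration): splitting the range
   * ["a", "e") with n = 2 yields two splits covering about ["a", "c") and ["c", "e"), with the
   * intermediate key chosen by {@link Bytes#split(byte[], byte[], boolean, int)}.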
   * @param split A TableSplit corresponding to a range of rowkeys
   * @param n     Number of ranges after splitting. Pass 1 to leave the range unsplit; pass 2 to
   *              split the range in two.
   * @return A list of TableSplits; the size of the list is n.
   * @throws IllegalArgumentIOException When the InputSplit is null or not a TableSplit.
   */
  protected List<InputSplit> createNInputSplitsUniform(InputSplit split, int n)
    throws IllegalArgumentIOException {
    if (split == null || !(split instanceof TableSplit)) {
      throw new IllegalArgumentIOException(
        "InputSplit for CreateNSplitsPerRegion can not be null "
          + "and should be instance of TableSplit");
    }
    // if n < 1, then still continue using n = 1
    n = n < 1 ? 1 : n;
    List<InputSplit> res = new ArrayList<>(n);
    if (n == 1) {
      res.add(split);
      return res;
    }

    // Collect Region related information
    TableSplit ts = (TableSplit) split;
    TableName tableName = ts.getTable();
    String regionLocation = ts.getRegionLocation();
    String encodedRegionName = ts.getEncodedRegionName();
    long regionSize = ts.getLength();
    byte[] startRow = ts.getStartRow();
    byte[] endRow = ts.getEndRow();

    // For the special cases where startRow or endRow is empty, substitute the smallest (0x00)
    // and largest (0xFF...) possible keys so the range can be split arithmetically.
    if (startRow.length == 0 && endRow.length == 0) {
      startRow = new byte[1];
      endRow = new byte[1];
      startRow[0] = 0;
      endRow[0] = -1;
    }
    if (startRow.length == 0 && endRow.length != 0) {
      startRow = new byte[1];
      startRow[0] = 0;
    }
    if (startRow.length != 0 && endRow.length == 0) {
      endRow = new byte[startRow.length];
      for (int k = 0; k < startRow.length; k++) {
        endRow[k] = -1;
      }
    }

    // Split the region into n chunks evenly
    byte[][] splitKeys = Bytes.split(startRow, endRow, true, n - 1);
    for (int i = 0; i < splitKeys.length - 1; i++) {
      // In the table input format for single table we do not need to
      // store the scan object in table split because it can be memory intensive and redundant
      // information to what is already stored in conf SCAN. See HBASE-25212
      // notice that the regionSize parameter may not be very accurate
      TableSplit tsplit = new TableSplit(tableName, null, splitKeys[i], splitKeys[i + 1],
        regionLocation, encodedRegionName, regionSize / n);
      res.add(tsplit);
    }
    return res;
  }

  /**
   * Calculates the number of MapReduce input splits for the map tasks. The number of MapReduce
   * input splits depends on the average region size. Made 'public' for testing.
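   * <p>
   * An illustrative sketch (region sizes assumed): with an average region size of 8G, a 32G
   * region gets round(ln(32 / 8) + 1) = 2 splits, while consecutive 2G and 3G regions are
   * combined into a single 5G split provided their key ranges are adjacent.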
   * @param splits               The list of input splits before balancing.
   * @param maxAverageRegionSize max average region size for one mapper
   * @return The list of input splits.
   * @throws IOException When creating the list of splits fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
   */
  public List<InputSplit> calculateAutoBalancedSplits(List<InputSplit> splits,
    long maxAverageRegionSize) throws IOException {
    if (splits.size() == 0) {
      return splits;
    }
    List<InputSplit> resultList = new ArrayList<>();
    long totalRegionSize = 0;
    for (int i = 0; i < splits.size(); i++) {
      TableSplit ts = (TableSplit) splits.get(i);
      totalRegionSize += ts.getLength();
    }
    long averageRegionSize = totalRegionSize / splits.size();
    // totalRegionSize might overflow, and the averageRegionSize must be positive.
    if (averageRegionSize <= 0) {
      LOG.warn("The averageRegionSize is not positive: " + averageRegionSize
        + ", setting it to Long.MAX_VALUE / " + splits.size());
      averageRegionSize = Long.MAX_VALUE / splits.size();
    }
    // if averageRegionSize is too big, cap it at maxAverageRegionSize (default 8G)
    if (averageRegionSize > maxAverageRegionSize) {
      averageRegionSize = maxAverageRegionSize;
    }
    // if averageRegionSize is too small, we do not need to allocate more mappers for those
    // 'large' regions; the threshold is 16M (a quarter of the classic 64M HDFS block size)
    if (averageRegionSize < 16 * 1048576) {
      return splits;
    }
    for (int i = 0; i < splits.size(); i++) {
      TableSplit ts = (TableSplit) splits.get(i);
      TableName tableName = ts.getTable();
      String regionLocation = ts.getRegionLocation();
      String encodedRegionName = ts.getEncodedRegionName();
      long regionSize = ts.getLength();

      if (regionSize >= averageRegionSize) {
        // turn this region into multiple MapReduce input splits.
        int n =
          (int) Math.round(Math.log(((double) regionSize) / ((double) averageRegionSize)) + 1.0);
        List<InputSplit> temp = createNInputSplitsUniform(ts, n);
        resultList.addAll(temp);
      } else {
        // if the total size of several small contiguous regions is less than the average region
        // size, combine them into one MapReduce input split.
        long totalSize = regionSize;
        byte[] splitStartKey = ts.getStartRow();
        byte[] splitEndKey = ts.getEndRow();
        int j = i + 1;
        while (j < splits.size()) {
          TableSplit nextRegion = (TableSplit) splits.get(j);
          long nextRegionSize = nextRegion.getLength();
          if (
            totalSize + nextRegionSize <= averageRegionSize
              && Bytes.equals(splitEndKey, nextRegion.getStartRow())
          ) {
            totalSize = totalSize + nextRegionSize;
            splitEndKey = nextRegion.getEndRow();
            j++;
          } else {
            break;
          }
        }
        i = j - 1;
        // In the table input format for single table we do not need to
        // store the scan object in table split because it can be memory intensive and redundant
        // information to what is already stored in conf SCAN. See HBASE-25212
        TableSplit t = new TableSplit(tableName, null, splitStartKey, splitEndKey, regionLocation,
          encodedRegionName, totalSize);
        resultList.add(t);
      }
    }
    return resultList;
  }

  String reverseDNS(InetAddress ipAddress) throws UnknownHostException {
    String hostName = this.reverseDNSCacheMap.get(ipAddress);
    if (hostName == null) {
      String ipAddressString = null;
      try {
        ipAddressString = DNS.reverseDns(ipAddress, null);
      } catch (Exception e) {
        // We can use InetAddress in case the jndi failed to pull up the reverse DNS entry from the
        // name service. Also, in case of ipv6, we need to use the InetAddress since resolving
        // reverse DNS using jndi doesn't work well with ipv6 addresses.
        ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName();
      }
      if (ipAddressString == null) {
        throw new UnknownHostException("No host found for " + ipAddress);
      }
      hostName = Strings.domainNamePointerToHostName(ipAddressString);
      this.reverseDNSCacheMap.put(ipAddress, hostName);
    }
    return hostName;
  }

  /**
   * Test if the given region is to be included in the InputSplit while splitting the regions of a
   * table.
   * <p>
   * This optimization is effective when there is a specific reason to exclude an entire region
   * from the M-R job (and hence, not contributing to the InputSplit), given the start and end keys
   * of the same. <br>
   * Useful when we need to remember the last-processed top record and revisit the [last, current)
   * interval for M-R processing, continuously. In addition to reducing InputSplits, it reduces the
   * load on the region server as well, due to the ordering of the keys. <br>
   * <br>
   * Note: It is possible that <code>endKey.length() == 0</code>, for the last (recent) region.
   * <br>
   * Override this method if you want to bulk exclude regions altogether from M-R. By default, no
   * region is excluded (i.e. all regions are included).
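   * <p>
   * A minimal sketch of an override (the cutoff key is assumed for illustration):
   *
   * <pre>
   * {@literal @}Override
   * protected boolean includeRegionInSplit(byte[] startKey, byte[] endKey) {
   *   // skip regions that end before the rows of interest begin
   *   return endKey.length == 0 || Bytes.compareTo(endKey, Bytes.toBytes("row-0100")) &gt; 0;
   * }
   * </pre>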
   * @param startKey Start key of the region
   * @param endKey   End key of the region
   * @return true, if this region needs to be included as part of the input (default).
   */
  protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
    return true;
  }

  /**
   * Allows subclasses to get the {@link RegionLocator}.
   */
  protected RegionLocator getRegionLocator() {
    if (regionLocator == null) {
      throw new IllegalStateException(NOT_INITIALIZED);
    }
    return regionLocator;
  }

  /**
   * Allows subclasses to get the {@link Table}.
   */
  protected Table getTable() {
    if (table == null) {
      throw new IllegalStateException(NOT_INITIALIZED);
    }
    return table;
  }

  /**
   * Allows subclasses to get the {@link Admin}.
   */
  protected Admin getAdmin() {
    if (admin == null) {
      throw new IllegalStateException(NOT_INITIALIZED);
    }
    return admin;
  }

  /**
   * Allows subclasses to initialize the table information.
   * @param connection The Connection to the HBase cluster. MUST be unmanaged. We will close.
   * @param tableName  The {@link TableName} of the table to process.
   * @throws IOException When setting up the table fails.
   */
  protected void initializeTable(Connection connection, TableName tableName) throws IOException {
    if (this.table != null || this.connection != null) {
      LOG.warn("initializeTable called multiple times. Overwriting connection and table "
        + "reference; TableInputFormatBase will not close these old references when done.");
    }
    this.table = connection.getTable(tableName);
    this.regionLocator = connection.getRegionLocator(tableName);
    this.admin = connection.getAdmin();
    this.connection = connection;
    this.regionSizeCalculator = null;
  }

  @InterfaceAudience.Private
  protected RegionSizeCalculator createRegionSizeCalculator(RegionLocator locator, Admin admin)
    throws IOException {
    return new RegionSizeCalculator(locator, admin);
  }

  /**
   * Gets the scan defining the actual details like columns etc.
   * @return The internal scan instance.
   */
  public Scan getScan() {
    if (this.scan == null) {
      this.scan = new Scan();
    }
    return scan;
  }

  /**
   * Sets the scan defining the actual details like columns etc.
   * @param scan The scan to set.
   */
  public void setScan(Scan scan) {
    this.scan = scan;
  }

  /**
   * Allows subclasses to set the {@link TableRecordReader}.
   * @param tableRecordReader A different {@link TableRecordReader} implementation.
624 */ 625 protected void setTableRecordReader(TableRecordReader tableRecordReader) { 626 this.tableRecordReader = tableRecordReader; 627 } 628 629 /** 630 * Handle subclass specific set up. Each of the entry points used by the MapReduce framework, 631 * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)}, 632 * will call {@link #initialize(JobContext)} as a convenient centralized location to handle 633 * retrieving the necessary configuration information and calling 634 * {@link #initializeTable(Connection, TableName)}. Subclasses should implement their initialize 635 * call such that it is safe to call multiple times. The current TableInputFormatBase 636 * implementation relies on a non-null table reference to decide if an initialize call is needed, 637 * but this behavior may change in the future. In particular, it is critical that initializeTable 638 * not be called multiple times since this will leak Connection instances. 639 */ 640 protected void initialize(JobContext context) throws IOException { 641 } 642 643 /** 644 * Close the Table and related objects that were initialized via 645 * {@link #initializeTable(Connection, TableName)}. n 646 */ 647 protected void closeTable() throws IOException { 648 close(admin, table, regionLocator, connection); 649 admin = null; 650 table = null; 651 regionLocator = null; 652 connection = null; 653 regionSizeCalculator = null; 654 } 655 656 private void close(Closeable... closables) throws IOException { 657 for (Closeable c : closables) { 658 if (c != null) { 659 c.close(); 660 } 661 } 662 } 663 664}