/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.util.StringUtils;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * A base for {@link TableInputFormat}s. Receives a {@link Connection}, a {@link TableName},
 * and a {@link Scan} instance that defines the input columns, etc. Subclasses may use
 * other TableRecordReader implementations.
 *
 * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to
 * function properly. Each of the entry points to this class used by the MapReduce framework,
 * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)},
 * will call {@link #initialize(JobContext)} as a convenient centralized location to handle
 * retrieving the necessary configuration information. If your subclass overrides either of these
 * methods, either call the parent version or call initialize yourself.
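 *
 * <p>
 * For example, an override of {@link #getSplits(JobContext)} that keeps the parent behavior
 * might look like the following (a minimal sketch; the pre-processing step is a placeholder):
 * <pre>
 *   {@literal @}Override
 *   public List&lt;InputSplit&gt; getSplits(JobContext context) throws IOException {
 *     // subclass-specific pre-processing goes here; the parent version calls
 *     // initialize(context) before computing the splits
 *     return super.getSplits(context);
 *   }
 * </pre>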
 *
 * <p>
 * An example of a subclass:
 * <pre>
 *   class ExampleTIF extends TableInputFormatBase {
 *
 *     {@literal @}Override
 *     protected void initialize(JobContext context) throws IOException {
 *       // We are responsible for the lifecycle of this connection until we hand it over in
 *       // initializeTable.
 *       Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(
 *           context.getConfiguration()));
 *       TableName tableName = TableName.valueOf("exampleTable");
 *       // mandatory. once passed here, TableInputFormatBase will handle closing the connection.
 *       initializeTable(connection, tableName);
 *       byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
 *           Bytes.toBytes("columnB") };
 *       // optional, by default we'll get everything for the table.
 *       Scan scan = new Scan();
 *       for (byte[] family : inputColumns) {
 *         scan.addFamily(family);
 *       }
 *       Filter exampleFilter =
 *           new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
 *       scan.setFilter(exampleFilter);
 *       setScan(scan);
 *     }
 *   }
 * </pre>
 *
 * The number of InputSplits (mappers) matches the number of regions in a table by default.
 * Set "hbase.mapreduce.tableinput.mappers.per.region" to specify how many mappers to use per
 * region; setting this property disables the autobalance behavior described below.
 * Set "hbase.mapreduce.tif.input.autobalance" to enable autobalancing, in which case mappers are
 * assigned based on the average region size: regions larger than the average may be assigned
 * multiple mappers, while smaller adjacent regions may be grouped together under one mapper. If
 * the actual average region size is very large, say 50G, it is not desirable to assign only one
 * mapper to each large region. Use "hbase.mapreduce.tif.ave.regionsize" to cap the average
 * region size used for this calculation; the default cap is 8G.
 */
@InterfaceAudience.Public
public abstract class TableInputFormatBase
    extends InputFormat<ImmutableBytesWritable, Result> {

  private static final Logger LOG = LoggerFactory.getLogger(TableInputFormatBase.class);

  private static final String NOT_INITIALIZED = "The input format instance has not been properly " +
      "initialized. Ensure you call initializeTable either in your constructor or initialize " +
      "method";
  private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" +
      " previous error. Please look at the previous log lines from" +
      " the task's full log for more details.";

  /** Specifies whether auto-balancing is used to set the number of mappers in M/R jobs. */
  public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.tif.input.autobalance";
  /** In auto-balance mode, input is split by average region size; if the calculated average is
   * too large, this property caps it. */
  public static final String MAX_AVERAGE_REGION_SIZE = "hbase.mapreduce.tif.ave.regionsize";

  /** Sets the number of mappers for each region; all regions get the same number of mappers. */
  public static final String NUM_MAPPERS_PER_REGION = "hbase.mapreduce.tableinput.mappers.per.region";


  /** Holds the details for the internal scanner.
   *
   * @see Scan */
  private Scan scan = null;
  /** The {@link Admin}. */
  private Admin admin;
  /** The {@link Table} to scan. */
  private Table table;
  /** The {@link RegionLocator} of the table. */
  private RegionLocator regionLocator;
  /** The reader scanning the table, can be a custom one. */
  private TableRecordReader tableRecordReader = null;
  /** The underlying {@link Connection} of the table. */
  private Connection connection;


  /** The reverse DNS lookup cache mapping: IPAddress => HostName */
  private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<>();

  /**
   * Builds a {@link TableRecordReader}. If no {@link TableRecordReader} was provided, uses
   * the default.
   *
   * @param split The split to work with.
   * @param context The current context.
   * @return The newly created record reader.
   * @throws IOException When creating the reader fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
   *   org.apache.hadoop.mapreduce.InputSplit,
   *   org.apache.hadoop.mapreduce.TaskAttemptContext)
   */
  @Override
  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
      InputSplit split, TaskAttemptContext context)
      throws IOException {
    // Just in case a subclass is relying on JobConfigurable magic.
    if (table == null) {
      initialize(context);
    }
    // null check in case our child overrides getTable to not throw.
    try {
      if (getTable() == null) {
        // initialize() must not have been implemented in the subclass.
        throw new IOException(INITIALIZATION_ERROR);
      }
    } catch (IllegalStateException exception) {
      throw new IOException(INITIALIZATION_ERROR, exception);
    }
    TableSplit tSplit = (TableSplit) split;
    LOG.info("Input split length: " + StringUtils.humanReadableInt(tSplit.getLength()) + " bytes.");
    final TableRecordReader trr =
        this.tableRecordReader != null ? this.tableRecordReader : new TableRecordReader();
    Scan sc = new Scan(this.scan);
    sc.setStartRow(tSplit.getStartRow());
    sc.setStopRow(tSplit.getEndRow());
    trr.setScan(sc);
    trr.setTable(getTable());
    return new RecordReader<ImmutableBytesWritable, Result>() {

      @Override
      public void close() throws IOException {
        trr.close();
        closeTable();
      }

      @Override
      public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
        return trr.getCurrentKey();
      }

      @Override
      public Result getCurrentValue() throws IOException, InterruptedException {
        return trr.getCurrentValue();
      }

      @Override
      public float getProgress() throws IOException, InterruptedException {
        return trr.getProgress();
      }

      @Override
      public void initialize(InputSplit inputsplit, TaskAttemptContext context) throws IOException,
          InterruptedException {
        trr.initialize(inputsplit, context);
      }

      @Override
      public boolean nextKeyValue() throws IOException, InterruptedException {
        return trr.nextKeyValue();
      }
    };
  }

  protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
    return getRegionLocator().getStartEndKeys();
  }

  /**
   * Calculates the splits that will serve as input for the map tasks.
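   * <p>
   * For example, a job driver that wants two mappers per region instead of the default one
   * could set (a minimal sketch; {@code job} stands for the driver's Job instance, and the
   * constants are the configuration keys defined on this class):
   * <pre>
   *   Configuration conf = job.getConfiguration();
   *   conf.setInt(TableInputFormatBase.NUM_MAPPERS_PER_REGION, 2);
   *   // or instead let the framework balance mappers by region size:
   *   // conf.setBoolean(TableInputFormatBase.MAPREDUCE_INPUT_AUTOBALANCE, true);
   * </pre>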
   *
   * @param context The current job context.
   * @return The list of input splits.
   * @throws IOException When creating the list of splits fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
   *   org.apache.hadoop.mapreduce.JobContext)
   */
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    boolean closeOnFinish = false;

    // Just in case a subclass is relying on JobConfigurable magic.
    if (table == null) {
      initialize(context);
      closeOnFinish = true;
    }

    // null check in case our child overrides getTable to not throw.
    try {
      if (getTable() == null) {
        // initialize() must not have been implemented in the subclass.
        throw new IOException(INITIALIZATION_ERROR);
      }
    } catch (IllegalStateException exception) {
      throw new IOException(INITIALIZATION_ERROR, exception);
    }

    try {
      List<InputSplit> splits = oneInputSplitPerRegion();

      // set the same number of mappers for each region
      if (context.getConfiguration().get(NUM_MAPPERS_PER_REGION) != null) {
        int nSplitsPerRegion = context.getConfiguration().getInt(NUM_MAPPERS_PER_REGION, 1);
        List<InputSplit> res = new ArrayList<>();
        for (int i = 0; i < splits.size(); i++) {
          List<InputSplit> tmp = createNInputSplitsUniform(splits.get(i), nSplitsPerRegion);
          res.addAll(tmp);
        }
        return res;
      }

      // The default value of "hbase.mapreduce.tif.input.autobalance" is false.
      if (context.getConfiguration().getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false)) {
        long maxAveRegionSize = context.getConfiguration()
            .getLong(MAX_AVERAGE_REGION_SIZE, 8L * 1073741824); // 8GB
        return calculateAutoBalancedSplits(splits, maxAveRegionSize);
      }

      // return one mapper per region
      return splits;
    } finally {
      if (closeOnFinish) {
        closeTable();
      }
    }
  }

  /**
   * Create one InputSplit per region.
   *
   * @return The list of InputSplits for all the regions
   * @throws IOException When creating the list of splits fails.
   */
  private List<InputSplit> oneInputSplitPerRegion() throws IOException {
    RegionSizeCalculator sizeCalculator =
        createRegionSizeCalculator(getRegionLocator(), getAdmin());

    TableName tableName = getTable().getName();

    Pair<byte[][], byte[][]> keys = getStartEndKeys();
    if (keys == null || keys.getFirst() == null ||
        keys.getFirst().length == 0) {
      HRegionLocation regLoc =
          getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
      if (null == regLoc) {
        throw new IOException("Expecting at least one region.");
      }
      List<InputSplit> splits = new ArrayList<>(1);
      long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
      TableSplit split = new TableSplit(tableName, scan,
          HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
              .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
      splits.add(split);
      return splits;
    }
    List<InputSplit> splits = new ArrayList<>(keys.getFirst().length);
    for (int i = 0; i < keys.getFirst().length; i++) {
      if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
        continue;
      }

      byte[] startRow = scan.getStartRow();
      byte[] stopRow = scan.getStopRow();
      // determine if the given start and stop keys fall into the region
      if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
          Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
          (stopRow.length == 0 ||
          Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
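        // Clamp the split to the intersection of the scan range and the region: start at the
        // later of the two start keys and stop at the earlier of the two stop keys, treating
        // empty keys as unbounded.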
        byte[] splitStart = startRow.length == 0 ||
            Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
            keys.getFirst()[i] : startRow;
        byte[] splitStop = (stopRow.length == 0 ||
            Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
            keys.getSecond()[i].length > 0 ?
            keys.getSecond()[i] : stopRow;

        HRegionLocation location = getRegionLocator().getRegionLocation(keys.getFirst()[i], false);
        // The below InetSocketAddress creation does a name resolution.
        InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
        if (isa.isUnresolved()) {
          LOG.warn("Failed to resolve " + isa);
        }
        InetAddress regionAddress = isa.getAddress();
        String regionLocation;
        regionLocation = reverseDNS(regionAddress);

        byte[] regionName = location.getRegionInfo().getRegionName();
        String encodedRegionName = location.getRegionInfo().getEncodedName();
        long regionSize = sizeCalculator.getRegionSize(regionName);
        TableSplit split = new TableSplit(tableName, scan,
            splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
        splits.add(split);
        if (LOG.isDebugEnabled()) {
          LOG.debug("getSplits: split -> " + i + " -> " + split);
        }
      }
    }
    return splits;
  }

  /**
   * Create n splits for one InputSplit. For now, only a uniform distribution is supported.
   *
   * @param split A TableSplit corresponding to a range of rowkeys
   * @param n Number of ranges after splitting. Pass 1 to leave the range unsplit;
   *          pass 2 to split the range in two.
   * @return A list of TableSplits; the size of the list is n
   * @throws IllegalArgumentIOException When the InputSplit is null or not a TableSplit.
   */
  protected List<InputSplit> createNInputSplitsUniform(InputSplit split, int n)
      throws IllegalArgumentIOException {
    if (split == null || !(split instanceof TableSplit)) {
      throw new IllegalArgumentIOException(
          "InputSplit for CreateNSplitsPerRegion can not be null "
              + "and should be instance of TableSplit");
    }
    // if n < 1, then still continue using n = 1
    n = n < 1 ? 1 : n;
    List<InputSplit> res = new ArrayList<>(n);
    if (n == 1) {
      res.add(split);
      return res;
    }

    // Collect region-related information
    TableSplit ts = (TableSplit) split;
    TableName tableName = ts.getTable();
    String regionLocation = ts.getRegionLocation();
    String encodedRegionName = ts.getEncodedRegionName();
    long regionSize = ts.getLength();
    byte[] startRow = ts.getStartRow();
    byte[] endRow = ts.getEndRow();

    // Special cases: an empty startRow or endRow stands for an unbounded key, so substitute
    // the smallest/largest possible keys before splitting the range.
    if (startRow.length == 0 && endRow.length == 0) {
      startRow = new byte[1];
      endRow = new byte[1];
      startRow[0] = 0;
      endRow[0] = -1;
    }
    if (startRow.length == 0 && endRow.length != 0) {
      startRow = new byte[1];
      startRow[0] = 0;
    }
    if (startRow.length != 0 && endRow.length == 0) {
      endRow = new byte[startRow.length];
      for (int k = 0; k < startRow.length; k++) {
        endRow[k] = -1;
      }
    }

    // Split the region into n chunks evenly
    byte[][] splitKeys = Bytes.split(startRow, endRow, true, n - 1);
    for (int i = 0; i < splitKeys.length - 1; i++) {
      // notice that the regionSize parameter may not be very accurate
      TableSplit tsplit =
          new TableSplit(tableName, scan, splitKeys[i], splitKeys[i + 1], regionLocation,
              encodedRegionName, regionSize / n);
      res.add(tsplit);
    }
    return res;
  }

  /**
   * Calculates the number of MapReduce input splits for the map tasks. The number of
   * MapReduce input splits depends on the average region size.
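   * <p>
   * For example (a worked illustration of the logic below), given four contiguous regions of
   * sizes 1G, 20G, 1G and 2G, the average region size is 6G: the 20G region is split across two
   * mappers, the first 1G region keeps its own mapper, and the trailing 1G and 2G regions are
   * combined under a single mapper.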
   * Made 'public' for testing.
   *
   * @param splits The list of input splits before balancing.
   * @param maxAverageRegionSize the maximum average region size to use for one mapper
   * @return The list of input splits.
   * @throws IOException When creating the list of splits fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
   *   org.apache.hadoop.mapreduce.JobContext)
   */
  public List<InputSplit> calculateAutoBalancedSplits(List<InputSplit> splits, long maxAverageRegionSize)
      throws IOException {
    if (splits.size() == 0) {
      return splits;
    }
    List<InputSplit> resultList = new ArrayList<>();
    long totalRegionSize = 0;
    for (int i = 0; i < splits.size(); i++) {
      TableSplit ts = (TableSplit) splits.get(i);
      totalRegionSize += ts.getLength();
    }
    long averageRegionSize = totalRegionSize / splits.size();
    // totalRegionSize might overflow, and the averageRegionSize must be positive.
    if (averageRegionSize <= 0) {
      LOG.warn("The averageRegionSize is not positive: " + averageRegionSize + ", " +
          "set it to Long.MAX_VALUE / " + splits.size());
      averageRegionSize = Long.MAX_VALUE / splits.size();
    }
    // if averageRegionSize is too big, cap it at maxAverageRegionSize (default 8G)
    if (averageRegionSize > maxAverageRegionSize) {
      averageRegionSize = maxAverageRegionSize;
    }
    // if averageRegionSize is too small, we do not need to allocate more mappers for the
    // 'large' regions; the floor is 16M = (default hdfs block size) / 4
    if (averageRegionSize < 16 * 1048576) {
      return splits;
    }
    for (int i = 0; i < splits.size(); i++) {
      TableSplit ts = (TableSplit) splits.get(i);
      TableName tableName = ts.getTable();
      String regionLocation = ts.getRegionLocation();
      String encodedRegionName = ts.getEncodedRegionName();
      long regionSize = ts.getLength();

      if (regionSize >= averageRegionSize) {
        // turn this region into multiple MapReduce input splits.
        int n = (int) Math.round(Math.log(((double) regionSize) / ((double) averageRegionSize)) + 1.0);
        List<InputSplit> temp = createNInputSplitsUniform(ts, n);
        resultList.addAll(temp);
      } else {
        // if the total size of several small contiguous regions is less than the average region
        // size, combine them into one MapReduce input split.
        long totalSize = regionSize;
        byte[] splitStartKey = ts.getStartRow();
        byte[] splitEndKey = ts.getEndRow();
        int j = i + 1;
        while (j < splits.size()) {
          TableSplit nextRegion = (TableSplit) splits.get(j);
          long nextRegionSize = nextRegion.getLength();
          if (totalSize + nextRegionSize <= averageRegionSize
              && Bytes.equals(splitEndKey, nextRegion.getStartRow())) {
            totalSize = totalSize + nextRegionSize;
            splitEndKey = nextRegion.getEndRow();
            j++;
          } else {
            break;
          }
        }
        i = j - 1;
        TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey, regionLocation,
            encodedRegionName, totalSize);
        resultList.add(t);
      }
    }
    return resultList;
  }
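  /**
   * Performs a reverse DNS lookup on the given address and caches the result, so each region
   * server address is only resolved once per instance of this input format.
   *
   * @param ipAddress The address to resolve.
   * @return The host name for the address.
   * @throws UnknownHostException When no host name can be determined for the address.
   */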
  String reverseDNS(InetAddress ipAddress) throws UnknownHostException {
    String hostName = this.reverseDNSCacheMap.get(ipAddress);
    if (hostName == null) {
      String ipAddressString = null;
      try {
        ipAddressString = DNS.reverseDns(ipAddress, null);
      } catch (Exception e) {
        // We can use InetAddress in case the jndi failed to pull up the reverse DNS entry from
        // the name service. Also, in case of ipv6, we need to use the InetAddress since resolving
        // reverse DNS using jndi doesn't work well with ipv6 addresses.
        ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName();
      }
      if (ipAddressString == null) {
        throw new UnknownHostException("No host found for " + ipAddress);
      }
      hostName = Strings.domainNamePointerToHostName(ipAddressString);
      this.reverseDNSCacheMap.put(ipAddress, hostName);
    }
    return hostName;
  }

  /**
   * Test if the given region is to be included in the InputSplits while splitting
   * the regions of a table.
   * <p>
   * This optimization is effective when there is a specific reason to exclude an entire region
   * from the M-R job (so that it does not contribute an InputSplit), given the start and end
   * keys of that region. It is useful when we need to remember the last-processed top record
   * and revisit the [last, current) interval for M-R processing continuously. Besides reducing
   * the number of InputSplits, it also reduces the load on the region server, thanks to the
   * ordering of the keys.
   * <br>
   * Note: it is possible that <code>endKey.length() == 0</code> for the last (most recent)
   * region.
   * <br>
   * Override this method if you want to bulk-exclude regions from M-R. By default, no region
   * is excluded (i.e. all regions are included).
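   * <p>
   * For example (a minimal sketch), a subclass that skips every region whose start key begins
   * with a hypothetical "archive" prefix could override this as:
   * <pre>
   *   {@literal @}Override
   *   protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
   *     return !Bytes.startsWith(startKey, Bytes.toBytes("archive"));
   *   }
   * </pre>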
   *
   * @param startKey Start key of the region
   * @param endKey End key of the region
   * @return true, if this region needs to be included as part of the input (default).
   */
  protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
    return true;
  }

  /**
   * Allows subclasses to get the {@link RegionLocator}.
   */
  protected RegionLocator getRegionLocator() {
    if (regionLocator == null) {
      throw new IllegalStateException(NOT_INITIALIZED);
    }
    return regionLocator;
  }

  /**
   * Allows subclasses to get the {@link Table}.
   */
  protected Table getTable() {
    if (table == null) {
      throw new IllegalStateException(NOT_INITIALIZED);
    }
    return table;
  }

  /**
   * Allows subclasses to get the {@link Admin}.
   */
  protected Admin getAdmin() {
    if (admin == null) {
      throw new IllegalStateException(NOT_INITIALIZED);
    }
    return admin;
  }

  /**
   * Allows subclasses to initialize the table information.
   *
   * @param connection The Connection to the HBase cluster. MUST be unmanaged. We will close.
   * @param tableName The {@link TableName} of the table to process.
   * @throws IOException When setting up the table fails.
   */
  protected void initializeTable(Connection connection, TableName tableName) throws IOException {
    if (this.table != null || this.connection != null) {
      LOG.warn("initializeTable called multiple times. Overwriting connection and table " +
          "reference; TableInputFormatBase will not close these old references when done.");
    }
    this.table = connection.getTable(tableName);
    this.regionLocator = connection.getRegionLocator(tableName);
    this.admin = connection.getAdmin();
    this.connection = connection;
  }

  @VisibleForTesting
  protected RegionSizeCalculator createRegionSizeCalculator(RegionLocator locator, Admin admin)
      throws IOException {
    return new RegionSizeCalculator(locator, admin);
  }

  /**
   * Gets the scan defining the actual details like columns etc.
   *
   * @return The internal scan instance.
   */
  public Scan getScan() {
    if (this.scan == null) {
      this.scan = new Scan();
    }
    return scan;
  }

  /**
   * Sets the scan defining the actual details like columns etc.
   *
   * @param scan The scan to set.
   */
  public void setScan(Scan scan) {
    this.scan = scan;
  }

  /**
   * Allows subclasses to set the {@link TableRecordReader}.
   *
   * @param tableRecordReader A different {@link TableRecordReader}
   *   implementation.
   */
  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
    this.tableRecordReader = tableRecordReader;
  }

  /**
   * Handle subclass-specific set up.
   * Each of the entry points used by the MapReduce framework,
   * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)},
   * will call {@link #initialize(JobContext)} as a convenient centralized location to handle
   * retrieving the necessary configuration information and calling
   * {@link #initializeTable(Connection, TableName)}.
   *
   * Subclasses should implement their initialize call such that it is safe to call multiple times.
   * The current TableInputFormatBase implementation relies on a non-null table reference to decide
   * if an initialize call is needed, but this behavior may change in the future. In particular,
   * it is critical that initializeTable not be called multiple times since this will leak
   * Connection instances.
   */
  protected void initialize(JobContext context) throws IOException {
  }

  /**
   * Close the Table and related objects that were initialized via
   * {@link #initializeTable(Connection, TableName)}.
   *
   * @throws IOException When closing the table or its related objects fails.
   */
  protected void closeTable() throws IOException {
    close(admin, table, regionLocator, connection);
    admin = null;
    table = null;
    regionLocator = null;
    connection = null;
  }

  private void close(Closeable... closables) throws IOException {
    for (Closeable c : closables) {
      if (c != null) {
        c.close();
      }
    }
  }

}