001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.mapreduce; 020 021import java.io.Closeable; 022import java.io.IOException; 023import java.net.InetAddress; 024import java.net.InetSocketAddress; 025import java.net.UnknownHostException; 026import java.util.ArrayList; 027import java.util.HashMap; 028import java.util.List; 029 030import org.apache.yetus.audience.InterfaceAudience; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.HRegionLocation; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.Admin; 037import org.apache.hadoop.hbase.client.Connection; 038import org.apache.hadoop.hbase.client.RegionLocator; 039import org.apache.hadoop.hbase.client.Result; 040import org.apache.hadoop.hbase.client.Scan; 041import org.apache.hadoop.hbase.client.Table; 042import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 043import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 044import org.apache.hadoop.hbase.util.Addressing; 045import org.apache.hadoop.hbase.util.Bytes; 046import org.apache.hadoop.hbase.util.Pair; 047import org.apache.hadoop.hbase.util.Strings; 048import org.apache.hadoop.mapreduce.InputFormat; 049import org.apache.hadoop.mapreduce.InputSplit; 050import org.apache.hadoop.mapreduce.JobContext; 051import org.apache.hadoop.mapreduce.RecordReader; 052import org.apache.hadoop.mapreduce.TaskAttemptContext; 053import org.apache.hadoop.net.DNS; 054import org.apache.hadoop.util.StringUtils; 055 056/** 057 * A base for {@link TableInputFormat}s. Receives a {@link Connection}, a {@link TableName}, 058 * an {@link Scan} instance that defines the input columns etc. Subclasses may use 059 * other TableRecordReader implementations. 060 * 061 * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to 062 * function properly. Each of the entry points to this class used by the MapReduce framework, 063 * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)}, 064 * will call {@link #initialize(JobContext)} as a convenient centralized location to handle 065 * retrieving the necessary configuration information. If your subclass overrides either of these 066 * methods, either call the parent version or call initialize yourself. 067 * 068 * <p> 069 * An example of a subclass: 070 * <pre> 071 * class ExampleTIF extends TableInputFormatBase { 072 * 073 * {@literal @}Override 074 * protected void initialize(JobContext context) throws IOException { 075 * // We are responsible for the lifecycle of this connection until we hand it over in 076 * // initializeTable. 077 * Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create( 078 * job.getConfiguration())); 079 * TableName tableName = TableName.valueOf("exampleTable"); 080 * // mandatory. once passed here, TableInputFormatBase will handle closing the connection. 081 * initializeTable(connection, tableName); 082 * byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"), 083 * Bytes.toBytes("columnB") }; 084 * // optional, by default we'll get everything for the table. 085 * Scan scan = new Scan(); 086 * for (byte[] family : inputColumns) { 087 * scan.addFamily(family); 088 * } 089 * Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*")); 090 * scan.setFilter(exampleFilter); 091 * setScan(scan); 092 * } 093 * } 094 * </pre> 095 * 096 * 097 * The number of InputSplits(mappers) match the number of regions in a table by default. 098 * Set "hbase.mapreduce.tableinput.mappers.per.region" to specify how many mappers per region, set 099 * this property will disable autobalance below.\ 100 * Set "hbase.mapreduce.tif.input.autobalance" to enable autobalance, hbase will assign mappers 101 * based on average region size; For regions, whose size larger than average region size may assigned 102 * more mappers, and for smaller one, they may group together to use one mapper. If actual average 103 * region size is too big, like 50G, it is not good to only assign 1 mapper for those large regions. 104 * Use "hbase.mapreduce.tif.ave.regionsize" to set max average region size when enable "autobalanece", 105 * default mas average region size is 8G. 106 */ 107@InterfaceAudience.Public 108public abstract class TableInputFormatBase 109 extends InputFormat<ImmutableBytesWritable, Result> { 110 111 private static final Logger LOG = LoggerFactory.getLogger(TableInputFormatBase.class); 112 113 private static final String NOT_INITIALIZED = "The input format instance has not been properly " + 114 "initialized. Ensure you call initializeTable either in your constructor or initialize " + 115 "method"; 116 private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" + 117 " previous error. Please look at the previous logs lines from" + 118 " the task's full log for more details."; 119 120 /** Specify if we enable auto-balance to set number of mappers in M/R jobs. */ 121 public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.tif.input.autobalance"; 122 /** In auto-balance, we split input by ave region size, if calculated region size is too big, we can set it. */ 123 public static final String MAX_AVERAGE_REGION_SIZE = "hbase.mapreduce.tif.ave.regionsize"; 124 125 /** Set the number of Mappers for each region, all regions have same number of Mappers */ 126 public static final String NUM_MAPPERS_PER_REGION = "hbase.mapreduce.tableinput.mappers.per.region"; 127 128 129 /** Holds the details for the internal scanner. 130 * 131 * @see Scan */ 132 private Scan scan = null; 133 /** The {@link Admin}. */ 134 private Admin admin; 135 /** The {@link Table} to scan. */ 136 private Table table; 137 /** The {@link RegionLocator} of the table. */ 138 private RegionLocator regionLocator; 139 /** The reader scanning the table, can be a custom one. */ 140 private TableRecordReader tableRecordReader = null; 141 /** The underlying {@link Connection} of the table. */ 142 private Connection connection; 143 144 145 /** The reverse DNS lookup cache mapping: IPAddress => HostName */ 146 private HashMap<InetAddress, String> reverseDNSCacheMap = 147 new HashMap<>(); 148 149 /** 150 * Builds a {@link TableRecordReader}. If no {@link TableRecordReader} was provided, uses 151 * the default. 152 * 153 * @param split The split to work with. 154 * @param context The current context. 155 * @return The newly created record reader. 156 * @throws IOException When creating the reader fails. 157 * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader( 158 * org.apache.hadoop.mapreduce.InputSplit, 159 * org.apache.hadoop.mapreduce.TaskAttemptContext) 160 */ 161 @Override 162 public RecordReader<ImmutableBytesWritable, Result> createRecordReader( 163 InputSplit split, TaskAttemptContext context) 164 throws IOException { 165 // Just in case a subclass is relying on JobConfigurable magic. 166 if (table == null) { 167 initialize(context); 168 } 169 // null check in case our child overrides getTable to not throw. 170 try { 171 if (getTable() == null) { 172 // initialize() must not have been implemented in the subclass. 173 throw new IOException(INITIALIZATION_ERROR); 174 } 175 } catch (IllegalStateException exception) { 176 throw new IOException(INITIALIZATION_ERROR, exception); 177 } 178 TableSplit tSplit = (TableSplit) split; 179 LOG.info("Input split length: " + StringUtils.humanReadableInt(tSplit.getLength()) + " bytes."); 180 final TableRecordReader trr = 181 this.tableRecordReader != null ? this.tableRecordReader : new TableRecordReader(); 182 Scan sc = new Scan(this.scan); 183 sc.setStartRow(tSplit.getStartRow()); 184 sc.setStopRow(tSplit.getEndRow()); 185 trr.setScan(sc); 186 trr.setTable(getTable()); 187 return new RecordReader<ImmutableBytesWritable, Result>() { 188 189 @Override 190 public void close() throws IOException { 191 trr.close(); 192 closeTable(); 193 } 194 195 @Override 196 public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException { 197 return trr.getCurrentKey(); 198 } 199 200 @Override 201 public Result getCurrentValue() throws IOException, InterruptedException { 202 return trr.getCurrentValue(); 203 } 204 205 @Override 206 public float getProgress() throws IOException, InterruptedException { 207 return trr.getProgress(); 208 } 209 210 @Override 211 public void initialize(InputSplit inputsplit, TaskAttemptContext context) throws IOException, 212 InterruptedException { 213 trr.initialize(inputsplit, context); 214 } 215 216 @Override 217 public boolean nextKeyValue() throws IOException, InterruptedException { 218 return trr.nextKeyValue(); 219 } 220 }; 221 } 222 223 protected Pair<byte[][],byte[][]> getStartEndKeys() throws IOException { 224 return getRegionLocator().getStartEndKeys(); 225 } 226 227 /** 228 * Calculates the splits that will serve as input for the map tasks. 229 * @param context The current job context. 230 * @return The list of input splits. 231 * @throws IOException When creating the list of splits fails. 232 * @see org.apache.hadoop.mapreduce.InputFormat#getSplits( 233 * org.apache.hadoop.mapreduce.JobContext) 234 */ 235 @Override 236 public List<InputSplit> getSplits(JobContext context) throws IOException { 237 boolean closeOnFinish = false; 238 239 // Just in case a subclass is relying on JobConfigurable magic. 240 if (table == null) { 241 initialize(context); 242 closeOnFinish = true; 243 } 244 245 // null check in case our child overrides getTable to not throw. 246 try { 247 if (getTable() == null) { 248 // initialize() must not have been implemented in the subclass. 249 throw new IOException(INITIALIZATION_ERROR); 250 } 251 } catch (IllegalStateException exception) { 252 throw new IOException(INITIALIZATION_ERROR, exception); 253 } 254 255 try { 256 List<InputSplit> splits = oneInputSplitPerRegion(); 257 258 // set same number of mappers for each region 259 if (context.getConfiguration().get(NUM_MAPPERS_PER_REGION) != null) { 260 int nSplitsPerRegion = context.getConfiguration().getInt(NUM_MAPPERS_PER_REGION, 1); 261 List<InputSplit> res = new ArrayList<>(); 262 for (int i = 0; i < splits.size(); i++) { 263 List<InputSplit> tmp = createNInputSplitsUniform(splits.get(i), nSplitsPerRegion); 264 res.addAll(tmp); 265 } 266 return res; 267 } 268 269 //The default value of "hbase.mapreduce.input.autobalance" is false. 270 if (context.getConfiguration().getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false)) { 271 long maxAveRegionSize = context.getConfiguration() 272 .getLong(MAX_AVERAGE_REGION_SIZE, 8L*1073741824); //8GB 273 return calculateAutoBalancedSplits(splits, maxAveRegionSize); 274 } 275 276 // return one mapper per region 277 return splits; 278 } finally { 279 if (closeOnFinish) { 280 closeTable(); 281 } 282 } 283 } 284 285 /** 286 * Create one InputSplit per region 287 * 288 * @return The list of InputSplit for all the regions 289 * @throws IOException 290 */ 291 private List<InputSplit> oneInputSplitPerRegion() throws IOException { 292 RegionSizeCalculator sizeCalculator = 293 new RegionSizeCalculator(getRegionLocator(), getAdmin()); 294 295 TableName tableName = getTable().getName(); 296 297 Pair<byte[][], byte[][]> keys = getStartEndKeys(); 298 if (keys == null || keys.getFirst() == null || 299 keys.getFirst().length == 0) { 300 HRegionLocation regLoc = 301 getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); 302 if (null == regLoc) { 303 throw new IOException("Expecting at least one region."); 304 } 305 List<InputSplit> splits = new ArrayList<>(1); 306 long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName()); 307 TableSplit split = new TableSplit(tableName, scan, 308 HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc 309 .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize); 310 splits.add(split); 311 return splits; 312 } 313 List<InputSplit> splits = new ArrayList<>(keys.getFirst().length); 314 for (int i = 0; i < keys.getFirst().length; i++) { 315 if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { 316 continue; 317 } 318 319 byte[] startRow = scan.getStartRow(); 320 byte[] stopRow = scan.getStopRow(); 321 // determine if the given start an stop key fall into the region 322 if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || 323 Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && 324 (stopRow.length == 0 || 325 Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { 326 byte[] splitStart = startRow.length == 0 || 327 Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? 328 keys.getFirst()[i] : startRow; 329 byte[] splitStop = (stopRow.length == 0 || 330 Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && 331 keys.getSecond()[i].length > 0 ? 332 keys.getSecond()[i] : stopRow; 333 334 HRegionLocation location = getRegionLocator().getRegionLocation(keys.getFirst()[i], false); 335 // The below InetSocketAddress creation does a name resolution. 336 InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort()); 337 if (isa.isUnresolved()) { 338 LOG.warn("Failed resolve " + isa); 339 } 340 InetAddress regionAddress = isa.getAddress(); 341 String regionLocation; 342 regionLocation = reverseDNS(regionAddress); 343 344 byte[] regionName = location.getRegionInfo().getRegionName(); 345 String encodedRegionName = location.getRegionInfo().getEncodedName(); 346 long regionSize = sizeCalculator.getRegionSize(regionName); 347 TableSplit split = new TableSplit(tableName, scan, 348 splitStart, splitStop, regionLocation, encodedRegionName, regionSize); 349 splits.add(split); 350 if (LOG.isDebugEnabled()) { 351 LOG.debug("getSplits: split -> " + i + " -> " + split); 352 } 353 } 354 } 355 return splits; 356 } 357 358 /** 359 * Create n splits for one InputSplit, For now only support uniform distribution 360 * @param split A TableSplit corresponding to a range of rowkeys 361 * @param n Number of ranges after splitting. Pass 1 means no split for the range 362 * Pass 2 if you want to split the range in two; 363 * @return A list of TableSplit, the size of the list is n 364 * @throws IllegalArgumentIOException 365 */ 366 protected List<InputSplit> createNInputSplitsUniform(InputSplit split, int n) 367 throws IllegalArgumentIOException { 368 if (split == null || !(split instanceof TableSplit)) { 369 throw new IllegalArgumentIOException( 370 "InputSplit for CreateNSplitsPerRegion can not be null + " 371 + "and should be instance of TableSplit"); 372 } 373 //if n < 1, then still continue using n = 1 374 n = n < 1 ? 1 : n; 375 List<InputSplit> res = new ArrayList<>(n); 376 if (n == 1) { 377 res.add(split); 378 return res; 379 } 380 381 // Collect Region related information 382 TableSplit ts = (TableSplit) split; 383 TableName tableName = ts.getTable(); 384 String regionLocation = ts.getRegionLocation(); 385 String encodedRegionName = ts.getEncodedRegionName(); 386 long regionSize = ts.getLength(); 387 byte[] startRow = ts.getStartRow(); 388 byte[] endRow = ts.getEndRow(); 389 390 // For special case: startRow or endRow is empty 391 if (startRow.length == 0 && endRow.length == 0){ 392 startRow = new byte[1]; 393 endRow = new byte[1]; 394 startRow[0] = 0; 395 endRow[0] = -1; 396 } 397 if (startRow.length == 0 && endRow.length != 0){ 398 startRow = new byte[1]; 399 startRow[0] = 0; 400 } 401 if (startRow.length != 0 && endRow.length == 0){ 402 endRow =new byte[startRow.length]; 403 for (int k = 0; k < startRow.length; k++){ 404 endRow[k] = -1; 405 } 406 } 407 408 // Split Region into n chunks evenly 409 byte[][] splitKeys = Bytes.split(startRow, endRow, true, n-1); 410 for (int i = 0; i < splitKeys.length - 1; i++) { 411 //notice that the regionSize parameter may be not very accurate 412 TableSplit tsplit = 413 new TableSplit(tableName, scan, splitKeys[i], splitKeys[i + 1], regionLocation, 414 encodedRegionName, regionSize / n); 415 res.add(tsplit); 416 } 417 return res; 418 } 419 /** 420 * Calculates the number of MapReduce input splits for the map tasks. The number of 421 * MapReduce input splits depends on the average region size. 422 * Make it 'public' for testing 423 * 424 * @param splits The list of input splits before balance. 425 * @param maxAverageRegionSize max Average region size for one mapper 426 * @return The list of input splits. 427 * @throws IOException When creating the list of splits fails. 428 * @see org.apache.hadoop.mapreduce.InputFormat#getSplits( 429 *org.apache.hadoop.mapreduce.JobContext) 430 */ 431 public List<InputSplit> calculateAutoBalancedSplits(List<InputSplit> splits, long maxAverageRegionSize) 432 throws IOException { 433 if (splits.size() == 0) { 434 return splits; 435 } 436 List<InputSplit> resultList = new ArrayList<>(); 437 long totalRegionSize = 0; 438 for (int i = 0; i < splits.size(); i++) { 439 TableSplit ts = (TableSplit) splits.get(i); 440 totalRegionSize += ts.getLength(); 441 } 442 long averageRegionSize = totalRegionSize / splits.size(); 443 // totalRegionSize might be overflow, and the averageRegionSize must be positive. 444 if (averageRegionSize <= 0) { 445 LOG.warn("The averageRegionSize is not positive: " + averageRegionSize + ", " + 446 "set it to Long.MAX_VALUE " + splits.size()); 447 averageRegionSize = Long.MAX_VALUE / splits.size(); 448 } 449 //if averageRegionSize is too big, change it to default as 1 GB, 450 if (averageRegionSize > maxAverageRegionSize) { 451 averageRegionSize = maxAverageRegionSize; 452 } 453 // if averageRegionSize is too small, we do not need to allocate more mappers for those 'large' region 454 // set default as 16M = (default hdfs block size) / 4; 455 if (averageRegionSize < 16 * 1048576) { 456 return splits; 457 } 458 for (int i = 0; i < splits.size(); i++) { 459 TableSplit ts = (TableSplit) splits.get(i); 460 TableName tableName = ts.getTable(); 461 String regionLocation = ts.getRegionLocation(); 462 String encodedRegionName = ts.getEncodedRegionName(); 463 long regionSize = ts.getLength(); 464 465 if (regionSize >= averageRegionSize) { 466 // make this region as multiple MapReduce input split. 467 int n = (int) Math.round(Math.log(((double) regionSize) / ((double) averageRegionSize)) + 1.0); 468 List<InputSplit> temp = createNInputSplitsUniform(ts, n); 469 resultList.addAll(temp); 470 } else { 471 // if the total size of several small continuous regions less than the average region size, 472 // combine them into one MapReduce input split. 473 long totalSize = regionSize; 474 byte[] splitStartKey = ts.getStartRow(); 475 byte[] splitEndKey = ts.getEndRow(); 476 int j = i + 1; 477 while (j < splits.size()) { 478 TableSplit nextRegion = (TableSplit) splits.get(j); 479 long nextRegionSize = nextRegion.getLength(); 480 if (totalSize + nextRegionSize <= averageRegionSize) { 481 totalSize = totalSize + nextRegionSize; 482 splitEndKey = nextRegion.getEndRow(); 483 j++; 484 } else { 485 break; 486 } 487 } 488 i = j - 1; 489 TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey, regionLocation, 490 encodedRegionName, totalSize); 491 resultList.add(t); 492 } 493 } 494 return resultList; 495 } 496 497 String reverseDNS(InetAddress ipAddress) throws UnknownHostException { 498 String hostName = this.reverseDNSCacheMap.get(ipAddress); 499 if (hostName == null) { 500 String ipAddressString = null; 501 try { 502 ipAddressString = DNS.reverseDns(ipAddress, null); 503 } catch (Exception e) { 504 // We can use InetAddress in case the jndi failed to pull up the reverse DNS entry from the 505 // name service. Also, in case of ipv6, we need to use the InetAddress since resolving 506 // reverse DNS using jndi doesn't work well with ipv6 addresses. 507 ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName(); 508 } 509 if (ipAddressString == null) throw new UnknownHostException("No host found for " + ipAddress); 510 hostName = Strings.domainNamePointerToHostName(ipAddressString); 511 this.reverseDNSCacheMap.put(ipAddress, hostName); 512 } 513 return hostName; 514 } 515 516 /** 517 * Test if the given region is to be included in the InputSplit while splitting 518 * the regions of a table. 519 * <p> 520 * This optimization is effective when there is a specific reasoning to exclude an entire region from the M-R job, 521 * (and hence, not contributing to the InputSplit), given the start and end keys of the same. <br> 522 * Useful when we need to remember the last-processed top record and revisit the [last, current) interval for M-R processing, 523 * continuously. In addition to reducing InputSplits, reduces the load on the region server as well, due to the ordering of the keys. 524 * <br> 525 * <br> 526 * Note: It is possible that <code>endKey.length() == 0 </code> , for the last (recent) region. 527 * <br> 528 * Override this method, if you want to bulk exclude regions altogether from M-R. By default, no region is excluded( i.e. all regions are included). 529 * 530 * 531 * @param startKey Start key of the region 532 * @param endKey End key of the region 533 * @return true, if this region needs to be included as part of the input (default). 534 * 535 */ 536 protected boolean includeRegionInSplit(final byte[] startKey, final byte [] endKey) { 537 return true; 538 } 539 540 /** 541 * Allows subclasses to get the {@link RegionLocator}. 542 */ 543 protected RegionLocator getRegionLocator() { 544 if (regionLocator == null) { 545 throw new IllegalStateException(NOT_INITIALIZED); 546 } 547 return regionLocator; 548 } 549 550 /** 551 * Allows subclasses to get the {@link Table}. 552 */ 553 protected Table getTable() { 554 if (table == null) { 555 throw new IllegalStateException(NOT_INITIALIZED); 556 } 557 return table; 558 } 559 560 /** 561 * Allows subclasses to get the {@link Admin}. 562 */ 563 protected Admin getAdmin() { 564 if (admin == null) { 565 throw new IllegalStateException(NOT_INITIALIZED); 566 } 567 return admin; 568 } 569 570 /** 571 * Allows subclasses to initialize the table information. 572 * 573 * @param connection The Connection to the HBase cluster. MUST be unmanaged. We will close. 574 * @param tableName The {@link TableName} of the table to process. 575 * @throws IOException 576 */ 577 protected void initializeTable(Connection connection, TableName tableName) throws IOException { 578 if (this.table != null || this.connection != null) { 579 LOG.warn("initializeTable called multiple times. Overwriting connection and table " + 580 "reference; TableInputFormatBase will not close these old references when done."); 581 } 582 this.table = connection.getTable(tableName); 583 this.regionLocator = connection.getRegionLocator(tableName); 584 this.admin = connection.getAdmin(); 585 this.connection = connection; 586 } 587 588 /** 589 * Gets the scan defining the actual details like columns etc. 590 * 591 * @return The internal scan instance. 592 */ 593 public Scan getScan() { 594 if (this.scan == null) this.scan = new Scan(); 595 return scan; 596 } 597 598 /** 599 * Sets the scan defining the actual details like columns etc. 600 * 601 * @param scan The scan to set. 602 */ 603 public void setScan(Scan scan) { 604 this.scan = scan; 605 } 606 607 /** 608 * Allows subclasses to set the {@link TableRecordReader}. 609 * 610 * @param tableRecordReader A different {@link TableRecordReader} 611 * implementation. 612 */ 613 protected void setTableRecordReader(TableRecordReader tableRecordReader) { 614 this.tableRecordReader = tableRecordReader; 615 } 616 617 /** 618 * Handle subclass specific set up. 619 * Each of the entry points used by the MapReduce framework, 620 * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)}, 621 * will call {@link #initialize(JobContext)} as a convenient centralized location to handle 622 * retrieving the necessary configuration information and calling 623 * {@link #initializeTable(Connection, TableName)}. 624 * 625 * Subclasses should implement their initialize call such that it is safe to call multiple times. 626 * The current TableInputFormatBase implementation relies on a non-null table reference to decide 627 * if an initialize call is needed, but this behavior may change in the future. In particular, 628 * it is critical that initializeTable not be called multiple times since this will leak 629 * Connection instances. 630 * 631 */ 632 protected void initialize(JobContext context) throws IOException { 633 } 634 635 /** 636 * Close the Table and related objects that were initialized via 637 * {@link #initializeTable(Connection, TableName)}. 638 * 639 * @throws IOException 640 */ 641 protected void closeTable() throws IOException { 642 close(admin, table, regionLocator, connection); 643 admin = null; 644 table = null; 645 regionLocator = null; 646 connection = null; 647 } 648 649 private void close(Closeable... closables) throws IOException { 650 for (Closeable c : closables) { 651 if(c != null) { c.close(); } 652 } 653 } 654 655}