Class TableInputFormatBase
java.lang.Object
org.apache.hadoop.mapreduce.InputFormat<ImmutableBytesWritable,Result>
org.apache.hadoop.hbase.mapreduce.TableInputFormatBase
- Direct Known Subclasses:
TableInputFormat
@Public
public abstract class TableInputFormatBase
extends org.apache.hadoop.mapreduce.InputFormat<ImmutableBytesWritable,Result>
A base for TableInputFormats. Receives a Connection, a TableName, and a Scan instance that defines the input columns, etc. Subclasses may use other TableRecordReader implementations. Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to function properly. Each of the entry points to this class used by the MapReduce framework, createRecordReader(InputSplit, TaskAttemptContext) and getSplits(JobContext), will call initialize(JobContext) as a convenient centralized location to handle retrieving the necessary configuration information. If your subclass overrides either of these methods, either call the parent version or call initialize yourself.
An example of a subclass:
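import java.io.IOException;

import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.JobContext;

class ExampleTIF extends TableInputFormatBase {
  @Override
  protected void initialize(JobContext context) throws IOException {
    // We are responsible for the lifecycle of this connection until we hand it over in
    // initializeTable.
    Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(
        context.getConfiguration()));
    TableName tableName = TableName.valueOf("exampleTable");
    // Mandatory. Once passed here, TableInputFormatBase will handle closing the connection.
    initializeTable(connection, tableName);
    byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"),
        Bytes.toBytes("columnB") };
    // Optional; by default we get every column of the table.
    Scan scan = new Scan();
    for (byte[] family : inputColumns) {
      scan.addFamily(family);
    }
    // Keep only rows whose key matches the regular expression "aa.*".
    Filter exampleFilter = new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("aa.*"));
    scan.setFilter(exampleFilter);
    setScan(scan);
  }
}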
By default, the number of InputSplits (mappers) matches the number of regions in the table. Set "hbase.mapreduce.tableinput.mappers.per.region" to specify how many mappers to use per region; setting this property disables the autobalance behavior described below. Set "hbase.mapreduce.tif.input.autobalance" to enable autobalance, in which HBase assigns mappers based on the average region size: regions larger than the average region size may be assigned more mappers, and smaller regions may be grouped together to share one mapper. If the actual average region size is very large (for example, 50G), assigning only one mapper to each of those large regions is not desirable. Use "hbase.mapreduce.tif.ave.regionsize" to set the maximum average region size when autobalance is enabled; the default maximum average region size is 8G.
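The precedence between these settings can be sketched without HBase on the classpath. This is a minimal, hypothetical sketch: a plain Map<String, String> stands in for the job's Hadoop Configuration, MapperStrategySketch and describe are illustrative names, and "8G" is assumed to mean 8 GiB. Only the three property names come from the text above.

```java
import java.util.Map;

// Hypothetical sketch: a plain Map stands in for Hadoop's Configuration so the
// precedence rules can be shown without HBase on the classpath.
final class MapperStrategySketch {
    static final String NUM_MAPPERS_PER_REGION = "hbase.mapreduce.tableinput.mappers.per.region";
    static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.tif.input.autobalance";
    static final String MAX_AVERAGE_REGION_SIZE = "hbase.mapreduce.tif.ave.regionsize";
    // Assumes "8G" in the text means 8 GiB.
    static final long DEFAULT_MAX_AVERAGE_REGION_SIZE = 8L * 1024 * 1024 * 1024;

    /** Describes which split strategy the documented settings would select. */
    static String describe(Map<String, String> conf) {
        String perRegion = conf.get(NUM_MAPPERS_PER_REGION);
        if (perRegion != null) {
            // An explicit mappers-per-region value takes precedence and disables autobalance.
            return "uniform: " + Integer.parseInt(perRegion.trim()) + " mapper(s) per region";
        }
        if (Boolean.parseBoolean(conf.getOrDefault(MAPREDUCE_INPUT_AUTOBALANCE, "false"))) {
            long cap = Long.parseLong(conf.getOrDefault(MAX_AVERAGE_REGION_SIZE,
                    Long.toString(DEFAULT_MAX_AVERAGE_REGION_SIZE)).trim());
            return "autobalance: max average region size " + cap + " bytes";
        }
        // Neither property set: one InputSplit (mapper) per region.
        return "default: 1 mapper per region";
    }
}
```

In a real job, the same keys would be set on the job's Configuration (for example with setInt, setBoolean, or setLong) before submission.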
Field Summary
Fields (modifier and type, name: description):
- private Admin admin: The Admin.
- private Connection connection: The underlying Connection of the table.
- private static final String INITIALIZATION_ERROR
- private static final org.slf4j.Logger LOG
- static final String MAPREDUCE_INPUT_AUTOBALANCE: Specifies whether auto-balance is enabled for setting the number of mappers in M/R jobs.
- static final String MAX_AVERAGE_REGION_SIZE: In auto-balance mode, input is split by average region size; if the calculated average region size is too large, this setting caps it.
- private static final String NOT_INITIALIZED
- static final String NUM_MAPPERS_PER_REGION: Sets the number of mappers for each region; all regions get the same number of mappers.
- private RegionLocator regionLocator: The RegionLocator of the table.
- private RegionSizeCalculator regionSizeCalculator: Used to generate splits based on region size.
- private HashMap<InetAddress,String> reverseDNSCacheMap: The reverse DNS lookup cache mapping: IPAddress => HostName
- private Scan scan: Holds the details for the internal scanner.
- private Table table: The Table to scan.
- private TableRecordReader tableRecordReader: The reader scanning the table; can be a custom one.
Constructor Summary
Constructors:
- TableInputFormatBase()
Method Summary
Methods (modifier and type, method: description):
- List<org.apache.hadoop.mapreduce.InputSplit> calculateAutoBalancedSplits(List<org.apache.hadoop.mapreduce.InputSplit> splits, long maxAverageRegionSize): Calculates the number of MapReduce input splits for the map tasks.
- private void close
- protected void closeTable: Closes the Table and related objects that were initialized via initializeTable(Connection, TableName).
- protected List<org.apache.hadoop.mapreduce.InputSplit> createNInputSplitsUniform(org.apache.hadoop.mapreduce.InputSplit split, int n): Creates n splits for one InputSplit; currently only uniform distribution is supported.
- org.apache.hadoop.mapreduce.RecordReader<ImmutableBytesWritable,Result> createRecordReader(org.apache.hadoop.mapreduce.InputSplit split, org.apache.hadoop.mapreduce.TaskAttemptContext context): Builds a TableRecordReader.
- protected RegionSizeCalculator createRegionSizeCalculator(RegionLocator locator, Admin admin)
- protected Admin getAdmin(): Allows subclasses to get the Admin.
- protected RegionLocator getRegionLocator: Allows subclasses to get the RegionLocator.
- Scan getScan(): Gets the scan defining the actual details like columns, etc.
- List<org.apache.hadoop.mapreduce.InputSplit> getSplits(org.apache.hadoop.mapreduce.JobContext context): Calculates the splits that will serve as input for the map tasks.
- protected Pair<byte[][],byte[][]> getStartEndKeys
- protected Table getTable(): Allows subclasses to get the Table.
- protected boolean includeRegionInSplit(byte[] startKey, byte[] endKey): Tests whether the given region is to be included in the InputSplit while splitting the regions of a table.
- protected void initialize(org.apache.hadoop.mapreduce.JobContext context): Handles subclass-specific setup.
- protected void initializeTable(Connection connection, TableName tableName): Allows subclasses to initialize the table information.
- private List<org.apache.hadoop.mapreduce.InputSplit> oneInputSplitPerRegion: Creates one InputSplit per region.
- (package private) String reverseDNS(InetAddress ipAddress)
- void setScan(Scan scan): Sets the scan defining the actual details like columns, etc.
- protected void setTableRecordReader(TableRecordReader tableRecordReader): Allows subclasses to set the TableRecordReader.
-
Field Details
-
LOG
-
NOT_INITIALIZED
-
INITIALIZATION_ERROR
-
MAPREDUCE_INPUT_AUTOBALANCE
Specifies whether auto-balance is enabled for setting the number of mappers in M/R jobs.
-
MAX_AVERAGE_REGION_SIZE
In auto-balance mode, input is split by average region size; if the calculated average region size is too large, this setting caps it.
-
NUM_MAPPERS_PER_REGION
Sets the number of mappers for each region; all regions get the same number of mappers.
-
scan
Holds the details for the internal scanner.
-
admin
The Admin.
-
table
The Table to scan.
-
regionLocator
The RegionLocator of the table.
-
tableRecordReader
The reader scanning the table; can be a custom one.
-
connection
The underlying Connection of the table.
-
regionSizeCalculator
Used to generate splits based on region size.
-
reverseDNSCacheMap
The reverse DNS lookup cache mapping: IPAddress => HostName
-
-
Constructor Details
-
TableInputFormatBase
public TableInputFormatBase()
-
-
Method Details
-
createRecordReader
public org.apache.hadoop.mapreduce.RecordReader<ImmutableBytesWritable,Result> createRecordReader(org.apache.hadoop.mapreduce.InputSplit split, org.apache.hadoop.mapreduce.TaskAttemptContext context) throws IOException
Builds a TableRecordReader. If no TableRecordReader was provided, uses the default.
- Specified by:
createRecordReader in class org.apache.hadoop.mapreduce.InputFormat<ImmutableBytesWritable,Result>
- Parameters:
split - The split to work with.
context - The current context.
- Returns:
The newly created record reader.
- Throws:
IOException - When creating the reader fails.
- See Also:
InputFormat.createRecordReader(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext)
-
getStartEndKeys
- Throws:
IOException
-
getSplits
public List<org.apache.hadoop.mapreduce.InputSplit> getSplits(org.apache.hadoop.mapreduce.JobContext context) throws IOException
Calculates the splits that will serve as input for the map tasks.
- Specified by:
getSplits in class org.apache.hadoop.mapreduce.InputFormat<ImmutableBytesWritable,Result>
- Parameters:
context - The current job context.
- Returns:
The list of input splits.
- Throws:
IOException - When creating the list of splits fails.
- See Also:
InputFormat.getSplits(org.apache.hadoop.mapreduce.JobContext)
-
oneInputSplitPerRegion
Creates one InputSplit per region.
- Returns:
The list of InputSplits for all the regions.
- Throws:
IOException
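A region-per-split pass can be sketched as follows. This is a hypothetical, stdlib-only sketch: regions arrive as parallel start/end key arrays (the shape a getStartEndKeys-style Pair would provide), and, as an assumption not stated in this page, each region's range is clipped to the scan's start and stop rows, with non-overlapping regions skipped. An empty key means "unbounded" on that side, keys compare as unsigned bytes, and the includeRegionInSplit filter is omitted for brevity. OneSplitPerRegionSketch and KeyRange are illustrative names, not HBase types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; an empty key means "unbounded" on that side.
final class OneSplitPerRegionSketch {
    static final class KeyRange {
        final byte[] start;
        final byte[] end;
        KeyRange(byte[] start, byte[] end) {
            this.start = start;
            this.end = end;
        }
    }

    static List<KeyRange> split(byte[][] startKeys, byte[][] endKeys, byte[] scanStart, byte[] scanStop) {
        List<KeyRange> out = new ArrayList<>();
        for (int i = 0; i < startKeys.length; i++) {
            byte[] rs = startKeys[i];
            byte[] re = endKeys[i];
            // Skip regions that do not overlap [scanStart, scanStop).
            boolean scanStartsBeforeRegionEnd = scanStart.length == 0 || re.length == 0
                    || Arrays.compareUnsigned(scanStart, re) < 0;
            boolean scanStopsAfterRegionStart = scanStop.length == 0
                    || Arrays.compareUnsigned(scanStop, rs) > 0;
            if (!scanStartsBeforeRegionEnd || !scanStopsAfterRegionStart) {
                continue;
            }
            // Clip the region's range to the scan's range.
            byte[] s = (scanStart.length == 0 || Arrays.compareUnsigned(rs, scanStart) >= 0)
                    ? rs : scanStart;
            byte[] e = (re.length > 0 && (scanStop.length == 0 || Arrays.compareUnsigned(re, scanStop) <= 0))
                    ? re : scanStop;
            out.add(new KeyRange(s, e));
        }
        return out;
    }
}
```

A real implementation would wrap each range in a TableSplit together with its region's location rather than returning bare key ranges.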
-
createNInputSplitsUniform
protected List<org.apache.hadoop.mapreduce.InputSplit> createNInputSplitsUniform(org.apache.hadoop.mapreduce.InputSplit split, int n) throws IllegalArgumentIOException
Creates n splits for one InputSplit. Currently only uniform distribution is supported.
- Parameters:
split - A TableSplit corresponding to a range of row keys.
n - Number of ranges after splitting. Pass 1 for no split of the range; pass 2 to split the range in two.
- Returns:
A list of TableSplits; the size of the list is n.
- Throws:
IllegalArgumentIOException
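How the row-key range is divided is not specified here. One common way to cut a key range into n uniform pieces is to treat the keys as unsigned big-endian integers and interpolate between them, which this hypothetical sketch does with BigInteger. Assumptions: both keys are right-padded with zero bytes to a common length, the end key is non-empty and sorts after the start key (an empty end key for the last region is not handled), and UniformKeySplitSketch is an illustrative name rather than HBase code.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of uniform key-range splitting; not HBase's implementation.
final class UniformKeySplitSketch {
    /** Returns n + 1 boundary keys that cut [start, end) into n ranges of (nearly) equal key width. */
    static List<byte[]> boundaries(byte[] start, byte[] end, int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be >= 1");
        }
        int len = Math.max(start.length, end.length);
        // Right-pad both keys with zero bytes to a common length and read them as unsigned integers.
        BigInteger lo = new BigInteger(1, Arrays.copyOf(start, len));
        BigInteger hi = new BigInteger(1, Arrays.copyOf(end, len));
        if (hi.compareTo(lo) <= 0) {
            throw new IllegalArgumentException("end key must sort after start key (empty end key not handled)");
        }
        BigInteger step = hi.subtract(lo).divide(BigInteger.valueOf(n));
        if (step.signum() == 0) {
            throw new IllegalArgumentException("key range too narrow for " + n + " splits");
        }
        List<byte[]> keys = new ArrayList<>();
        keys.add(start);
        for (int i = 1; i < n; i++) {
            keys.add(toFixedLength(lo.add(step.multiply(BigInteger.valueOf(i))), len));
        }
        keys.add(end);
        return keys;
    }

    /** Encodes a non-negative value as exactly len big-endian bytes (the value must fit). */
    private static byte[] toFixedLength(BigInteger value, int len) {
        byte[] raw = value.toByteArray(); // may carry a leading 0x00 sign byte
        byte[] out = new byte[len];
        int copy = Math.min(raw.length, len);
        System.arraycopy(raw, raw.length - copy, out, len - copy, copy);
        return out;
    }
}
```

For example, splitting [0x00, 0x64) into 4 ranges yields the inner boundaries 0x19, 0x32, and 0x4B (25, 50, 75).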
-
calculateAutoBalancedSplits
public List<org.apache.hadoop.mapreduce.InputSplit> calculateAutoBalancedSplits(List<org.apache.hadoop.mapreduce.InputSplit> splits, long maxAverageRegionSize) throws IOException
Calculates the number of MapReduce input splits for the map tasks. The number of MapReduce input splits depends on the average region size. This method is public for testing.
- Parameters:
splits - The list of input splits before balancing.
maxAverageRegionSize - The maximum average region size for one mapper.
- Returns:
The list of input splits.
- Throws:
IOException - When creating the list of splits fails.
- See Also:
InputFormat.getSplits(org.apache.hadoop.mapreduce.JobContext)
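The balancing this method performs can be modeled in a few lines. This is a simplified, hypothetical sketch, not HBase's implementation: each region is reduced to its size in bytes, regions are assumed to be listed in row-key order, a region larger than the (capped) average is assumed to get ceil(size / average) slices, and adjacent smaller regions are grouped while their combined size stays within the average. The real calculateAutoBalancedSplits may use a different formula.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of auto-balancing; regions are reduced to their sizes.
final class AutoBalanceSketch {
    /** One planned mapper input: regions first..last, or slice `slice` of `slices` of one region. */
    static final class Planned {
        final int first, last, slice, slices;
        final long bytes;
        Planned(int first, int last, int slice, int slices, long bytes) {
            this.first = first; this.last = last; this.slice = slice; this.slices = slices; this.bytes = bytes;
        }
    }

    static List<Planned> plan(long[] regionSizes, long maxAverageRegionSize) {
        List<Planned> out = new ArrayList<>();
        if (regionSizes.length == 0) {
            return out;
        }
        long total = 0;
        for (long size : regionSizes) {
            total += size;
        }
        // Average region size, capped by the configured maximum (kept >= 1 to avoid dividing by zero).
        long avg = Math.max(1, Math.min(total / regionSizes.length, maxAverageRegionSize));
        int i = 0;
        while (i < regionSizes.length) {
            long size = regionSizes[i];
            if (size > avg) {
                // Large region: ceil(size / avg) uniform slices (assumed formula).
                int n = (int) ((size + avg - 1) / avg);
                for (int k = 0; k < n; k++) {
                    out.add(new Planned(i, i, k, n, size / n));
                }
                i++;
            } else {
                // Small region: absorb following regions while the group stays within avg.
                long sum = size;
                int j = i + 1;
                while (j < regionSizes.length && sum + regionSizes[j] <= avg) {
                    sum += regionSizes[j];
                    j++;
                }
                out.add(new Planned(i, j - 1, 0, 1, sum));
                i = j;
            }
        }
        return out;
    }
}
```

For region sizes {10, 2, 3, 1, 30} and a cap of 100, the average is 9: region 0 becomes 2 slices, regions 1 to 3 (6 bytes in total) share one split, and region 4 becomes 4 slices, for 7 splits in total.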
-
reverseDNS
- Throws:
UnknownHostException
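The reverseDNSCacheMap field (IPAddress => HostName) suggests that reverse lookups are cached per address so that each address is resolved only once. A minimal, hypothetical sketch of that memoization follows; the resolver function is injected (an assumption, so the sketch can be exercised without network access), and ReverseDnsCacheSketch is an illustrative name.

```java
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of a memoized reverse lookup keyed by InetAddress.
final class ReverseDnsCacheSketch {
    private final Map<InetAddress, String> cache = new HashMap<>();
    private final Function<InetAddress, String> resolver;
    int lookups = 0; // number of uncached resolutions performed

    ReverseDnsCacheSketch(Function<InetAddress, String> resolver) {
        this.resolver = resolver;
    }

    String reverseDNS(InetAddress ipAddress) {
        String hostName = cache.get(ipAddress);
        if (hostName == null) {
            // First time we see this address: resolve it and remember the answer.
            lookups++;
            hostName = resolver.apply(ipAddress);
            cache.put(ipAddress, hostName);
        }
        return hostName;
    }
}
```

In production the resolver could be InetAddress::getCanonicalHostName. Like the HashMap field it mirrors, the sketch is not thread-safe and assumes single-threaded use.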
-
includeRegionInSplit
Tests whether the given region is to be included in the InputSplit while splitting the regions of a table. This optimization is effective when there is a specific reason to exclude an entire region from the M-R job (so that it does not contribute to the InputSplit), given the region's start and end keys.
It is useful when we need to remember the last-processed top record and revisit the [last, current) interval for M-R processing, continuously. In addition to reducing InputSplits, this also reduces the load on the region server, due to the ordering of the keys.
Note: It is possible that endKey.length == 0 for the last (recent) region.
Override this method if you want to bulk exclude regions altogether from M-R. By default, no region is excluded (i.e., all regions are included).
- Parameters:
startKey - Start key of the region.
endKey - End key of the region.
- Returns:
true if this region needs to be included as part of the input (default).
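The [last, current) use case can be expressed as an overlap test on each region's key range. This hypothetical sketch holds the two bounds (lastProcessed inclusive, current exclusive; both are illustrative names) and treats an empty end key as an unbounded last region. Keys are compared as unsigned bytes with Arrays.compareUnsigned, matching HBase's lexicographic row ordering. In a real subclass, the method body would go into an override of includeRegionInSplit(byte[] startKey, byte[] endKey).

```java
import java.util.Arrays;

// Hypothetical filter: keep only regions that overlap [lastProcessed, current).
final class IncludeRegionSketch {
    private final byte[] lastProcessed; // inclusive lower bound
    private final byte[] current;       // exclusive upper bound

    IncludeRegionSketch(byte[] lastProcessed, byte[] current) {
        this.lastProcessed = lastProcessed;
        this.current = current;
    }

    /** Same shape as includeRegionInSplit(byte[] startKey, byte[] endKey). */
    boolean includeRegionInSplit(byte[] startKey, byte[] endKey) {
        // An empty end key marks the last region, which is unbounded above.
        boolean endsAfterLast = endKey.length == 0
                || Arrays.compareUnsigned(endKey, lastProcessed) > 0;
        // Exclude regions that start at or after the exclusive upper bound.
        boolean startsBeforeCurrent = Arrays.compareUnsigned(startKey, current) < 0;
        return endsAfterLast && startsBeforeCurrent;
    }
}
```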
-
getRegionLocator
Allows subclasses to get the RegionLocator.
-
getTable
Allows subclasses to get the Table.
-
getAdmin
Allows subclasses to get the Admin.
-
initializeTable
Allows subclasses to initialize the table information.
- Parameters:
connection - The Connection to the HBase cluster. MUST be unmanaged; TableInputFormatBase takes ownership and will close it.
tableName - The TableName of the table to process.
- Throws:
IOException
-
createRegionSizeCalculator
@Private protected RegionSizeCalculator createRegionSizeCalculator(RegionLocator locator, Admin admin) throws IOException
- Throws:
IOException
-
getScan
Gets the scan defining the actual details like columns, etc.
- Returns:
The internal scan instance.
-
setScan
Sets the scan defining the actual details like columns, etc.
- Parameters:
scan - The scan to set.
-
setTableRecordReader
Allows subclasses to set the TableRecordReader.
- Parameters:
tableRecordReader - A different TableRecordReader implementation.
-
initialize
Handles subclass-specific setup. Each of the entry points used by the MapReduce framework, createRecordReader(InputSplit, TaskAttemptContext) and getSplits(JobContext), will call initialize(JobContext) as a convenient centralized location to handle retrieving the necessary configuration information and calling initializeTable(Connection, TableName). Subclasses should implement their initialize call such that it is safe to call multiple times. The current TableInputFormatBase implementation relies on a non-null table reference to decide if an initialize call is needed, but this behavior may change in the future. In particular, it is critical that initializeTable not be called multiple times, since this will leak Connection instances.
- Throws:
IOException
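The guard described above (initialize only while no table reference is set, and never call initializeTable twice) can be sketched without HBase. In this hypothetical sketch, a String stands in for the Table, a counter stands in for Connection instances handed over, and LazyInitBase, ExampleLazyTIF, and ensureInitialized are illustrative names; the real entry points take JobContext and TaskAttemptContext arguments.

```java
// Hypothetical sketch of the lazy-initialization guard; not HBase code.
abstract class LazyInitBase {
    private String table;      // stands in for the Table reference
    int connectionsOpened = 0; // counts simulated Connection hand-overs

    /** Subclass hook; must call initializeTable exactly once per instance. */
    protected abstract void initialize(String jobConf);

    protected final void initializeTable(String tableName) {
        if (table != null) {
            // A second call would orphan the first connection, so fail loudly instead.
            throw new IllegalStateException("initializeTable called twice; a Connection would leak");
        }
        connectionsOpened++;   // the base class now owns (and must later close) the connection
        table = tableName;
    }

    /** Mirrors the entry points: initialize only when no table reference is set yet. */
    private void ensureInitialized(String jobConf) {
        if (table == null) {
            initialize(jobConf);
        }
    }

    final String getSplits(String jobConf) {
        ensureInitialized(jobConf);
        return "splits for " + table;
    }

    final String createRecordReader(String jobConf) {
        ensureInitialized(jobConf);
        return "reader for " + table;
    }
}

// A subclass in the style of the class-level example.
final class ExampleLazyTIF extends LazyInitBase {
    @Override
    protected void initialize(String jobConf) {
        initializeTable("exampleTable");
    }
}
```

Throwing on a second initializeTable call is a choice of this sketch to make the connection leak visible; the text above only says such calls must be avoided.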
-
closeTable
Closes the Table and related objects that were initialized via initializeTable(Connection, TableName).
- Throws:
IOException
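Because initializeTable takes ownership of an unmanaged Connection, closeTable has to release the Table and its related objects. A hypothetical, null-safe close-all helper is sketched below; the continue-after-failure behavior, with later failures attached as suppressed exceptions, is a design choice of this sketch and not documented behavior of closeTable or the private close method. CloseAllSketch and closeAll are illustrative names.

```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical helper; not the private close method of TableInputFormatBase.
final class CloseAllSketch {
    /** Closes every non-null resource in order; rethrows the first failure after trying all. */
    static void closeAll(Closeable... resources) throws IOException {
        IOException first = null;
        for (Closeable resource : resources) {
            if (resource == null) {
                continue; // e.g. an object that was never initialized
            }
            try {
                resource.close();
            } catch (IOException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e); // keep later failures visible
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```

For example, closeAll(table, regionLocator, admin, connection) would attempt every close even if an earlier one fails; that argument order is also an assumption of the sketch.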
-
close
- Throws:
IOException
-