Class TableInputFormatBase
java.lang.Object
org.apache.hadoop.mapreduce.InputFormat<ImmutableBytesWritable,Result>
org.apache.hadoop.hbase.mapreduce.TableInputFormatBase
- Direct Known Subclasses:
TableInputFormat
@Public
public abstract class TableInputFormatBase
extends org.apache.hadoop.mapreduce.InputFormat<ImmutableBytesWritable,Result>
A base for TableInputFormats. Receives a Connection, a TableName, and a Scan instance that defines the input columns etc. Subclasses may use other TableRecordReader implementations.
Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to function properly. Each of the entry points to this class used by the MapReduce framework, createRecordReader(InputSplit, TaskAttemptContext) and getSplits(JobContext), will call initialize(JobContext) as a convenient centralized location to handle retrieving the necessary configuration information. If your subclass overrides either of these methods, either call the parent version or call initialize yourself.
An example of a subclass:
    class ExampleTIF extends TableInputFormatBase {

      @Override
      protected void initialize(JobContext context) throws IOException {
        // We are responsible for the lifecycle of this connection until we hand it over in
        // initializeTable.
        Connection connection = ConnectionFactory.createConnection(
            HBaseConfiguration.create(context.getConfiguration()));
        TableName tableName = TableName.valueOf("exampleTable");
        // mandatory. once passed here, TableInputFormatBase will handle closing the connection.
        initializeTable(connection, tableName);
        byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"),
            Bytes.toBytes("columnB") };
        // optional, by default we'll get everything for the table.
        Scan scan = new Scan();
        for (byte[] family : inputColumns) {
          scan.addFamily(family);
        }
        Filter exampleFilter =
            new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
        scan.setFilter(exampleFilter);
        setScan(scan);
      }
    }

By default, the number of InputSplits (mappers) matches the number of regions in the table.
Set "hbase.mapreduce.tableinput.mappers.per.region" to specify how many mappers to use per region. Setting this property disables the auto-balance behavior described below.
Set "hbase.mapreduce.tif.input.autobalance" to enable auto-balance, in which HBase assigns mappers based on the average region size: a region larger than the average may be assigned more than one mapper, and smaller regions may be grouped together to share one mapper. If the actual average region size is too large (for example 50G), assigning only one mapper to each of those large regions is not a good fit. Use "hbase.mapreduce.tif.ave.regionsize" to set the maximum average region size when auto-balance is enabled; the default maximum average region size is 8G.
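The interaction between the two split-related properties described above can be sketched in plain Java. This is a minimal illustration, not the HBase implementation: the class SplitStrategySketch, the Strategy enum, and the use of a Map as a stand-in for the job configuration are all hypothetical, and the sketch assumes auto-balance is off unless the property is set to true.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a plain Map stands in for the job configuration.
class SplitStrategySketch {
  // Property names quoted from the class description.
  static final String NUM_MAPPERS_PER_REGION = "hbase.mapreduce.tableinput.mappers.per.region";
  static final String INPUT_AUTOBALANCE = "hbase.mapreduce.tif.input.autobalance";

  /** Hypothetical labels for the three behaviors the description mentions. */
  enum Strategy { ONE_SPLIT_PER_REGION, FIXED_MAPPERS_PER_REGION, AUTO_BALANCE }

  /** Assumed precedence: an explicit mappers-per-region value disables auto-balance. */
  static Strategy choose(Map<String, String> conf) {
    if (conf.get(NUM_MAPPERS_PER_REGION) != null) {
      return Strategy.FIXED_MAPPERS_PER_REGION;
    }
    // Assumption: auto-balance is disabled when the property is absent.
    if (Boolean.parseBoolean(conf.getOrDefault(INPUT_AUTOBALANCE, "false"))) {
      return Strategy.AUTO_BALANCE;
    }
    return Strategy.ONE_SPLIT_PER_REGION;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    System.out.println(choose(conf));
    conf.put(INPUT_AUTOBALANCE, "true");
    System.out.println(choose(conf));
    conf.put(NUM_MAPPERS_PER_REGION, "3");
    System.out.println(choose(conf));
  }
}
```

In a real job the same property names would be set on the job's configuration rather than on a Map.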
-
Field Summary
Modifier and Type | Field | Description
private Admin | admin | The Admin.
private Connection | connection | The underlying Connection of the table.
private static final String | INITIALIZATION_ERROR |
private static final org.slf4j.Logger | LOG |
static final String | MAPREDUCE_INPUT_AUTOBALANCE | Specify whether auto-balance is enabled to set the number of mappers in M/R jobs.
static final String | MAX_AVERAGE_REGION_SIZE | In auto-balance, input is split by average region size; if the calculated average region size is too big, this sets the maximum.
private static final String | NOT_INITIALIZED |
static final String | NUM_MAPPERS_PER_REGION | Set the number of mappers for each region; all regions get the same number of mappers.
private RegionLocator | regionLocator | The RegionLocator of the table.
private RegionSizeCalculator | regionSizeCalculator | Used to generate splits based on region size.
private HashMap<InetAddress,String> | reverseDNSCacheMap | The reverse DNS lookup cache mapping: IPAddress => HostName
private Scan | scan | Holds the details for the internal scanner.
private Table | table | The Table to scan.
private TableRecordReader | tableRecordReader | The reader scanning the table, can be a custom one.
-
Constructor Summary
-
Method Summary
Modifier and Type | Method | Description
List<org.apache.hadoop.mapreduce.InputSplit> | calculateAutoBalancedSplits(List<org.apache.hadoop.mapreduce.InputSplit> splits, long maxAverageRegionSize) | Calculates the number of MapReduce input splits for the map tasks.
private void | close |
protected void | closeTable | Close the Table and related objects that were initialized via initializeTable(Connection, TableName).
protected List<org.apache.hadoop.mapreduce.InputSplit> | createNInputSplitsUniform(org.apache.hadoop.mapreduce.InputSplit split, int n) | Create n splits for one InputSplit. For now, only uniform distribution is supported.
org.apache.hadoop.mapreduce.RecordReader<ImmutableBytesWritable,Result> | createRecordReader(org.apache.hadoop.mapreduce.InputSplit split, org.apache.hadoop.mapreduce.TaskAttemptContext context) | Builds a TableRecordReader.
protected RegionSizeCalculator | createRegionSizeCalculator(RegionLocator locator, Admin admin) |
protected Admin | getAdmin() | Allows subclasses to get the Admin.
protected RegionLocator | getRegionLocator | Allows subclasses to get the RegionLocator.
Scan | getScan() | Gets the scan defining the actual details like columns etc.
List<org.apache.hadoop.mapreduce.InputSplit> | getSplits(org.apache.hadoop.mapreduce.JobContext context) | Calculates the splits that will serve as input for the map tasks.
protected Pair<byte[][],byte[][]> | getStartEndKeys |
protected Table | getTable() | Allows subclasses to get the Table.
protected boolean | includeRegionInSplit(byte[] startKey, byte[] endKey) | Test if the given region is to be included in the InputSplit while splitting the regions of a table.
protected void | initialize(org.apache.hadoop.mapreduce.JobContext context) | Handle subclass specific set up.
protected void | initializeTable(Connection connection, TableName tableName) | Allows subclasses to initialize the table information.
private List<org.apache.hadoop.mapreduce.InputSplit> | oneInputSplitPerRegion | Create one InputSplit per region.
(package private) String | reverseDNS(InetAddress ipAddress) |
void | setScan | Sets the scan defining the actual details like columns etc.
protected void | setTableRecordReader(TableRecordReader tableRecordReader) | Allows subclasses to set the TableRecordReader.
-
Field Details
-
LOG
-
NOT_INITIALIZED
- See Also:
-
INITIALIZATION_ERROR
- See Also:
-
MAPREDUCE_INPUT_AUTOBALANCE
Specify whether auto-balance is enabled to set the number of mappers in M/R jobs.
- See Also:
-
MAX_AVERAGE_REGION_SIZE
In auto-balance, input is split by average region size; if the calculated average region size is too big, this sets the maximum.
- See Also:
-
NUM_MAPPERS_PER_REGION
Set the number of mappers for each region; all regions get the same number of mappers.
- See Also:
-
scan
Holds the details for the internal scanner.
- See Also:
-
admin
The Admin.
-
table
The Table to scan.
-
regionLocator
The RegionLocator of the table.
-
tableRecordReader
The reader scanning the table, can be a custom one.
-
connection
The underlying Connection of the table.
-
regionSizeCalculator
Used to generate splits based on region size.
-
reverseDNSCacheMap
The reverse DNS lookup cache mapping: IPAddress => HostName
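An address-to-hostname cache of this kind can be sketched as follows. This is an illustration only: the class ReverseDnsCacheSketch, its pluggable resolver function, and the lookups counter are hypothetical and do not describe how this class performs the actual reverse DNS lookup.

```java
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of an IPAddress => HostName cache.
class ReverseDnsCacheSketch {
  private final Map<InetAddress, String> cache = new HashMap<>();
  private final Function<InetAddress, String> resolver; // stand-in for the real lookup
  int lookups = 0; // counts how often the resolver was actually invoked

  ReverseDnsCacheSketch(Function<InetAddress, String> resolver) {
    this.resolver = resolver;
  }

  /** Returns the cached host name, resolving and storing it on the first request. */
  String hostNameFor(InetAddress address) {
    String cached = cache.get(address);
    if (cached == null) {
      lookups++;
      cached = resolver.apply(address);
      cache.put(address, cached);
    }
    return cached;
  }

  public static void main(String[] args) {
    ReverseDnsCacheSketch sketch =
        new ReverseDnsCacheSketch(addr -> "host-" + addr.getHostAddress());
    InetAddress loopback = InetAddress.getLoopbackAddress();
    sketch.hostNameFor(loopback);
    sketch.hostNameFor(loopback); // served from the cache
    System.out.println("resolver calls: " + sketch.lookups);
  }
}
```

A plain HashMap is not thread-safe, so a sketch like this is only suitable where a single thread computes the splits.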
-
-
Constructor Details
-
TableInputFormatBase
public TableInputFormatBase()
-
-
Method Details
-
createRecordReader
public org.apache.hadoop.mapreduce.RecordReader<ImmutableBytesWritable,Result> createRecordReader(org.apache.hadoop.mapreduce.InputSplit split, org.apache.hadoop.mapreduce.TaskAttemptContext context) throws IOException
Builds a TableRecordReader. If no TableRecordReader was provided, uses the default.
- Specified by:
createRecordReader in class org.apache.hadoop.mapreduce.InputFormat<ImmutableBytesWritable,Result>
- Parameters:
split - The split to work with.
context - The current context.
- Returns:
- The newly created record reader.
- Throws:
IOException - When creating the reader fails.
- See Also:
-
InputFormat.createRecordReader(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext)
-
getStartEndKeys
- Throws:
IOException
-
getSplits
public List<org.apache.hadoop.mapreduce.InputSplit> getSplits(org.apache.hadoop.mapreduce.JobContext context) throws IOException
Calculates the splits that will serve as input for the map tasks.
- Specified by:
getSplits in class org.apache.hadoop.mapreduce.InputFormat<ImmutableBytesWritable,Result>
- Parameters:
context - The current job context.
- Returns:
- The list of input splits.
- Throws:
IOException - When creating the list of splits fails.
- See Also:
-
InputFormat.getSplits(org.apache.hadoop.mapreduce.JobContext)
-
oneInputSplitPerRegion
Create one InputSplit per region.
- Returns:
- The list of InputSplit for all the regions
- Throws:
IOException
-
createNInputSplitsUniform
protected List<org.apache.hadoop.mapreduce.InputSplit> createNInputSplitsUniform(org.apache.hadoop.mapreduce.InputSplit split, int n) throws IllegalArgumentIOException
Create n splits for one InputSplit. For now, only uniform distribution is supported.
- Parameters:
split - A TableSplit corresponding to a range of row keys.
n - Number of ranges after splitting. Pass 1 to leave the range unsplit; pass 2 to split the range in two.
- Returns:
- A list of TableSplit; the size of the list is n.
- Throws:
IllegalArgumentIOException
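To illustrate what a uniform division of a row-key range into n pieces can look like, here is a minimal sketch using only the Java standard library. It is not the HBase implementation: UniformKeyRangeSketch and its methods are hypothetical, and the sketch assumes both keys have the same length and that the start key sorts before the end key in unsigned byte order.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: cut a key range into n roughly equal sub-ranges.
class UniformKeyRangeSketch {

  /**
   * Returns n + 1 boundary keys that cut [start, end) into n sub-ranges.
   * Assumes equal-length keys with start < end in unsigned byte order.
   * If the range holds fewer than n distinct keys, some boundaries repeat.
   */
  static List<byte[]> boundaries(byte[] start, byte[] end, int n) {
    if (n < 1 || start.length != end.length) {
      throw new IllegalArgumentException("need n >= 1 and equal-length keys");
    }
    BigInteger lo = new BigInteger(1, start); // signum 1: read the bytes as unsigned
    BigInteger hi = new BigInteger(1, end);
    if (lo.compareTo(hi) >= 0) {
      throw new IllegalArgumentException("start must sort before end");
    }
    BigInteger width = hi.subtract(lo);
    List<byte[]> result = new ArrayList<>();
    for (int i = 0; i <= n; i++) {
      BigInteger offset = width.multiply(BigInteger.valueOf(i)).divide(BigInteger.valueOf(n));
      result.add(toFixedLength(lo.add(offset), start.length));
    }
    return result;
  }

  /** Converts a non-negative value back to a big-endian key of the given length. */
  static byte[] toFixedLength(BigInteger value, int length) {
    byte[] raw = value.toByteArray(); // may carry a leading sign byte or be shorter
    byte[] out = new byte[length];
    int copy = Math.min(raw.length, length);
    System.arraycopy(raw, raw.length - copy, out, length - copy, copy);
    return out;
  }

  public static void main(String[] args) {
    for (byte[] key : boundaries(new byte[] {0x00}, new byte[] {0x04}, 4)) {
      System.out.println(key[0]);
    }
  }
}
```

Consecutive boundary pairs give the sub-ranges, so n = 1 returns just the original start and end keys.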
-
calculateAutoBalancedSplits
public List<org.apache.hadoop.mapreduce.InputSplit> calculateAutoBalancedSplits(List<org.apache.hadoop.mapreduce.InputSplit> splits, long maxAverageRegionSize) throws IOException
Calculates the number of MapReduce input splits for the map tasks. The number of MapReduce input splits depends on the average region size. This method is public for testing.
- Parameters:
splits - The list of input splits before balancing.
maxAverageRegionSize - The maximum average region size for one mapper.
- Returns:
- The list of input splits.
- Throws:
IOException - When creating the list of splits fails.
- See Also:
-
InputFormat.getSplits(org.apache.hadoop.mapreduce.JobContext)
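The idea of balancing by average region size can be sketched as follows. This is a simplified model under stated assumptions, not the algorithm this method uses: AutoBalanceSketch and plan are hypothetical, a large region is assumed to be cut into ceil(size / target) equal pieces, and consecutive small regions are assumed to be merged while the group stays within the target.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of size-based balancing; the policy details are assumptions.
class AutoBalanceSketch {
  // 8G default maximum average region size, as stated in the class description.
  static final long DEFAULT_MAX_AVERAGE_REGION_SIZE = 8L * 1024 * 1024 * 1024;

  /** Returns, per planned mapper, the approximate number of bytes it would read. */
  static List<Long> plan(long[] regionSizes, long maxAverageRegionSize) {
    long total = 0;
    for (long size : regionSizes) {
      total += size;
    }
    long average = regionSizes.length == 0 ? 1 : total / regionSizes.length;
    // Cap the target at the configured maximum and avoid a zero divisor.
    long target = Math.max(1, Math.min(average, maxAverageRegionSize));

    List<Long> mappers = new ArrayList<>();
    int i = 0;
    while (i < regionSizes.length) {
      long size = regionSizes[i];
      if (size > target) {
        // Large region: several mappers (integer division drops any remainder).
        long pieces = (size + target - 1) / target;
        for (long p = 0; p < pieces; p++) {
          mappers.add(size / pieces);
        }
        i++;
      } else {
        // Small regions: merge neighbours while the group stays within the target.
        long group = size;
        i++;
        while (i < regionSizes.length && group + regionSizes[i] <= target) {
          group += regionSizes[i];
          i++;
        }
        mappers.add(group);
      }
    }
    return mappers;
  }

  public static void main(String[] args) {
    System.out.println(plan(new long[] {10, 10, 100, 40}, DEFAULT_MAX_AVERAGE_REGION_SIZE));
  }
}
```

With two 50G regions and the 8G cap, this model plans seven mappers per region instead of one, which is the situation the maximum average region size is meant to address.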
-
reverseDNS
- Throws:
UnknownHostException
-
includeRegionInSplit
Test if the given region is to be included in the InputSplit while splitting the regions of a table.
This optimization is effective when there is a specific reason to exclude an entire region from the M-R job (so that it does not contribute to the InputSplit), given the start and end keys of that region.
Useful when we need to remember the last-processed top record and revisit the [last, current) interval for M-R processing, continuously. In addition to reducing InputSplits, this reduces the load on the region server as well, due to the ordering of the keys.
Note: It is possible that endKey.length == 0 for the last (recent) region.
Override this method if you want to bulk exclude regions altogether from M-R. By default, no region is excluded (i.e. all regions are included).
- Parameters:
startKey - Start key of the region
endKey - End key of the region
- Returns:
- true, if this region needs to be included as part of the input (default).
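The [last, current) use case can be illustrated with a small overlap test that an override might delegate to. This is a sketch, not part of the API: RegionIntervalSketch, overlaps, and the last and current parameters are hypothetical, and an empty end key is treated as "no upper bound", matching the note about the last region.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper for deciding whether a region intersects a row interval.
class RegionIntervalSketch {

  /**
   * Returns true when the region [startKey, endKey) overlaps the interval [last, current).
   * Keys are compared as unsigned bytes; an empty endKey means the region is unbounded above.
   */
  static boolean overlaps(byte[] startKey, byte[] endKey, byte[] last, byte[] current) {
    boolean startsBeforeCurrent = Arrays.compareUnsigned(startKey, current) < 0;
    boolean endsAfterLast = endKey.length == 0 || Arrays.compareUnsigned(endKey, last) > 0;
    return startsBeforeCurrent && endsAfterLast;
  }

  static byte[] key(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    byte[] last = key("k");
    byte[] current = key("p");
    System.out.println(overlaps(key("a"), key("c"), last, current));
    System.out.println(overlaps(key("a"), key("m"), last, current));
    System.out.println(overlaps(key("n"), new byte[0], last, current));
  }
}
```

A subclass could return the result of such a check from includeRegionInSplit so that regions entirely outside the interval produce no splits.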
-
getRegionLocator
Allows subclasses to get the RegionLocator.
-
getTable
Allows subclasses to get the Table.
-
getAdmin
Allows subclasses to get the Admin.
-
initializeTable
Allows subclasses to initialize the table information.
- Parameters:
connection - The Connection to the HBase cluster. MUST be unmanaged. We will close it.
tableName - The TableName of the table to process.
- Throws:
IOException
-
createRegionSizeCalculator
@Private protected RegionSizeCalculator createRegionSizeCalculator(RegionLocator locator, Admin admin) throws IOException
- Throws:
IOException
-
getScan
Gets the scan defining the actual details like columns etc.
- Returns:
- The internal scan instance.
-
setScan
Sets the scan defining the actual details like columns etc.
- Parameters:
scan - The scan to set.
-
setTableRecordReader
Allows subclasses to set the TableRecordReader.
- Parameters:
tableRecordReader - A different TableRecordReader implementation.
-
initialize
Handle subclass specific set up. Each of the entry points used by the MapReduce framework, createRecordReader(InputSplit, TaskAttemptContext) and getSplits(JobContext), will call initialize(JobContext) as a convenient centralized location to handle retrieving the necessary configuration information and calling initializeTable(Connection, TableName).
Subclasses should implement their initialize call such that it is safe to call multiple times. The current TableInputFormatBase implementation relies on a non-null table reference to decide if an initialize call is needed, but this behavior may change in the future. In particular, it is critical that initializeTable not be called multiple times since this will leak Connection instances.
- Throws:
IOException
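The requirement that initialize be safe to call multiple times can be met with a simple guard. The following is a stand-alone sketch of that pattern with stand-in types: IdempotentInitSketch, FakeTable, and the opened counter are hypothetical and only model the idea of creating a connection once.

```java
// Hypothetical sketch of an initialize() that tolerates repeated calls.
class IdempotentInitSketch {
  /** Stand-in for the table handle that initializeTable would set up. */
  static class FakeTable {}

  int opened = 0;           // how many connection-like resources were created
  private FakeTable table;  // stays null until the first successful initialize()

  void initialize() {
    if (table != null) {
      return; // already set up; creating another connection here would leak it
    }
    opened++;                // stands in for creating the Connection
    table = new FakeTable(); // stands in for initializeTable(connection, tableName)
  }

  public static void main(String[] args) {
    IdempotentInitSketch sketch = new IdempotentInitSketch();
    sketch.initialize(); // e.g. triggered from getSplits
    sketch.initialize(); // e.g. triggered from createRecordReader
    System.out.println("resources opened: " + sketch.opened);
  }
}
```

Because the base class's own check on the table reference may change in the future, keeping a guard like this inside the subclass avoids depending on it.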
-
closeTable
Close the Table and related objects that were initialized via initializeTable(Connection, TableName).
- Throws:
IOException
-
close
- Throws:
IOException
-