@InterfaceAudience.Public
public abstract class TableInputFormatBase
extends org.apache.hadoop.mapreduce.InputFormat<ImmutableBytesWritable,Result>
A base for TableInputFormats. Receives a Connection, a TableName, and a Scan instance that defines the input columns etc. Subclasses may use other TableRecordReader implementations.
Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to function properly. Each of the entry points to this class used by the MapReduce framework, createRecordReader(InputSplit, TaskAttemptContext) and getSplits(JobContext), will call initialize(JobContext) as a convenient centralized location to handle retrieving the necessary configuration information. If your subclass overrides either of these methods, either call the parent version or call initialize yourself.
An example of a subclass:

    import java.io.IOException;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
    import org.apache.hadoop.hbase.filter.Filter;
    import org.apache.hadoop.hbase.filter.RegexStringComparator;
    import org.apache.hadoop.hbase.filter.RowFilter;
    import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.mapreduce.JobContext;

    class ExampleTIF extends TableInputFormatBase {
      @Override
      protected void initialize(JobContext context) throws IOException {
        // We are responsible for the lifecycle of this connection until we hand it over in
        // initializeTable.
        Connection connection = ConnectionFactory.createConnection(
            HBaseConfiguration.create(context.getConfiguration()));
        TableName tableName = TableName.valueOf("exampleTable");
        // Mandatory. Once passed here, TableInputFormatBase will handle closing the connection.
        initializeTable(connection, tableName);
        byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") };
        // Optional. By default we'll get everything for the table.
        Scan scan = new Scan();
        for (byte[] family : inputColumns) {
          scan.addFamily(family);
        }
        Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
        scan.setFilter(exampleFilter);
        setScan(scan);
      }
    }

By default, the number of InputSplits (mappers) matches the number of regions in the table. Set "hbase.mapreduce.tableinput.mappers.per.region" to specify how many mappers to use per region; setting this property disables the auto-balance behavior described next. Set "hbase.mapreduce.tif.input.autobalance" to enable auto-balance, in which HBase assigns mappers based on the average region size: a region larger than the average may be assigned more mappers, and smaller regions may be grouped together to share one mapper. If the actual average region size is too big, e.g. 50G, assigning only one mapper to each of those large regions is not good. Use "hbase.mapreduce.tif.ave.regionsize" to set the maximum average region size when auto-balance is enabled; the default maximum average region size is 8G.
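The auto-balance rule described above can be illustrated with a simplified, self-contained model. This is only a sketch of the idea (more mappers for regions well above the average size, small neighbouring regions grouped under one mapper, and the average capped by the configured maximum). It is not HBase's calculateAutoBalancedSplits code: the class AutoBalanceSketch, the method plan, and the rounding rule are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of auto-balancing; NOT the HBase implementation. */
final class AutoBalanceSketch {

  /**
   * Returns the number of bytes each planned mapper would handle.
   * regionSizes are given in row-key order; maxAverage caps the average used.
   */
  static List<Long> plan(long[] regionSizes, long maxAverage) {
    List<Long> mappers = new ArrayList<>();
    if (regionSizes.length == 0) {
      return mappers;
    }
    long total = 0;
    for (long size : regionSizes) {
      total += size;
    }
    long average = Math.min(total / regionSizes.length, maxAverage);
    if (average <= 0) {
      // No size information: fall back to one mapper per region.
      for (long size : regionSizes) {
        mappers.add(size);
      }
      return mappers;
    }
    boolean grouping = false; // true while small regions are being collected
    long grouped = 0;         // bytes collected in the current group
    for (long size : regionSizes) {
      if (size > average) {
        if (grouping) {
          mappers.add(grouped);
          grouping = false;
          grouped = 0;
        }
        // Assumed rule: roughly one mapper per "average" worth of data.
        int n = (int) Math.max(1L, Math.round((double) size / average));
        for (int i = 0; i < n; i++) {
          mappers.add(size / n + (i < size % n ? 1 : 0));
        }
      } else {
        if (grouping && grouped + size > average) {
          mappers.add(grouped); // group is full, start a new one
          grouped = 0;
        }
        grouping = true;
        grouped += size;
      }
    }
    if (grouping) {
      mappers.add(grouped);
    }
    return mappers;
  }
}
```

In this model, lowering the cap passed as maxAverage increases the number of mappers assigned to large regions, which is the effect the "hbase.mapreduce.tif.ave.regionsize" setting is described as having.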
Modifier and Type | Field and Description |
---|---|
private Admin | admin: The Admin. |
private Connection | connection: The underlying Connection of the table. |
private static String | INITIALIZATION_ERROR |
private static org.slf4j.Logger | LOG |
static String | MAPREDUCE_INPUT_AUTOBALANCE: Specify if we enable auto-balance to set number of mappers in M/R jobs. |
static String | MAX_AVERAGE_REGION_SIZE: In auto-balance, we split input by ave region size, if calculated region size is too big, we can set it. |
private static String | NOT_INITIALIZED |
static String | NUM_MAPPERS_PER_REGION: Set the number of Mappers for each region, all regions have same number of Mappers |
private RegionLocator | regionLocator: The RegionLocator of the table. |
private HashMap<InetAddress,String> | reverseDNSCacheMap: The reverse DNS lookup cache mapping: IPAddress => HostName |
private Scan | scan: Holds the details for the internal scanner. |
private Table | table: The Table to scan. |
private TableRecordReader | tableRecordReader: The reader scanning the table, can be a custom one. |
Constructor and Description |
---|
TableInputFormatBase() |
Modifier and Type | Method and Description |
---|---|
List<org.apache.hadoop.mapreduce.InputSplit> | calculateAutoBalancedSplits(List<org.apache.hadoop.mapreduce.InputSplit> splits, long maxAverageRegionSize): Calculates the number of MapReduce input splits for the map tasks. |
private void | close(Closeable... closables) |
protected void | closeTable(): Close the Table and related objects that were initialized via initializeTable(Connection, TableName). |
protected List<org.apache.hadoop.mapreduce.InputSplit> | createNInputSplitsUniform(org.apache.hadoop.mapreduce.InputSplit split, int n): Create n splits for one InputSplit, For now only support uniform distribution |
org.apache.hadoop.mapreduce.RecordReader<ImmutableBytesWritable,Result> | createRecordReader(org.apache.hadoop.mapreduce.InputSplit split, org.apache.hadoop.mapreduce.TaskAttemptContext context): Builds a TableRecordReader. |
protected Admin | getAdmin(): Allows subclasses to get the Admin. |
protected RegionLocator | getRegionLocator(): Allows subclasses to get the RegionLocator. |
Scan | getScan(): Gets the scan defining the actual details like columns etc. |
List<org.apache.hadoop.mapreduce.InputSplit> | getSplits(org.apache.hadoop.mapreduce.JobContext context): Calculates the splits that will serve as input for the map tasks. |
protected Pair<byte[][],byte[][]> | getStartEndKeys() |
protected Table | getTable(): Allows subclasses to get the Table. |
protected boolean | includeRegionInSplit(byte[] startKey, byte[] endKey): Test if the given region is to be included in the InputSplit while splitting the regions of a table. |
protected void | initialize(org.apache.hadoop.mapreduce.JobContext context): Handle subclass specific set up. |
protected void | initializeTable(Connection connection, TableName tableName): Allows subclasses to initialize the table information. |
private List<org.apache.hadoop.mapreduce.InputSplit> | oneInputSplitPerRegion(): Create one InputSplit per region |
(package private) String | reverseDNS(InetAddress ipAddress) |
void | setScan(Scan scan): Sets the scan defining the actual details like columns etc. |
protected void | setTableRecordReader(TableRecordReader tableRecordReader): Allows subclasses to set the TableRecordReader. |
private static final org.slf4j.Logger LOG
private static final String NOT_INITIALIZED
private static final String INITIALIZATION_ERROR
public static final String MAPREDUCE_INPUT_AUTOBALANCE
public static final String MAX_AVERAGE_REGION_SIZE
public static final String NUM_MAPPERS_PER_REGION
private RegionLocator regionLocator
The RegionLocator of the table.
private TableRecordReader tableRecordReader
private Connection connection
The underlying Connection of the table.
private HashMap<InetAddress,String> reverseDNSCacheMap
public TableInputFormatBase()
public org.apache.hadoop.mapreduce.RecordReader<ImmutableBytesWritable,Result> createRecordReader(org.apache.hadoop.mapreduce.InputSplit split, org.apache.hadoop.mapreduce.TaskAttemptContext context) throws IOException
Builds a TableRecordReader. If no TableRecordReader was provided, uses the default.
Specified by: createRecordReader in class org.apache.hadoop.mapreduce.InputFormat<ImmutableBytesWritable,Result>
Parameters:
split - The split to work with.
context - The current context.
Throws:
IOException - When creating the reader fails.
See Also: InputFormat.createRecordReader(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext)
protected Pair<byte[][],byte[][]> getStartEndKeys() throws IOException
Throws:
IOException
public List<org.apache.hadoop.mapreduce.InputSplit> getSplits(org.apache.hadoop.mapreduce.JobContext context) throws IOException
Calculates the splits that will serve as input for the map tasks.
Specified by: getSplits in class org.apache.hadoop.mapreduce.InputFormat<ImmutableBytesWritable,Result>
Parameters:
context - The current job context.
Throws:
IOException - When creating the list of splits fails.
See Also: InputFormat.getSplits(org.apache.hadoop.mapreduce.JobContext)
private List<org.apache.hadoop.mapreduce.InputSplit> oneInputSplitPerRegion() throws IOException
Create one InputSplit per region.
Throws:
IOException
protected List<org.apache.hadoop.mapreduce.InputSplit> createNInputSplitsUniform(org.apache.hadoop.mapreduce.InputSplit split, int n) throws IllegalArgumentIOException
Create n splits for one InputSplit. For now, only uniform distribution is supported.
Parameters:
split - A TableSplit corresponding to a range of row keys.
n - Number of ranges after splitting. Pass 1 to leave the range unsplit; pass 2 to split the range in two.
Throws:
IllegalArgumentIOException
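To make "uniform distribution" concrete, the sketch below divides a row-key range into n contiguous sub-ranges by treating the keys as unsigned big-endian integers. It is an illustration under stated assumptions, not the HBase implementation: the class UniformSplitSketch and the method boundaries are hypothetical, keys are zero-padded to a common width, and an empty end key (as for the last region) is not handled.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of uniform key-range splitting; NOT HBase code. */
final class UniformSplitSketch {

  /** Returns n + 1 boundary keys: start, n - 1 interior points, and end. */
  static List<byte[]> boundaries(byte[] start, byte[] end, int n) {
    if (n < 1) {
      throw new IllegalArgumentException("n must be at least 1");
    }
    // Assumption: shorter keys are right-padded with zero bytes.
    int width = Math.max(start.length, end.length);
    BigInteger lo = new BigInteger(1, Arrays.copyOf(start, width));
    BigInteger hi = new BigInteger(1, Arrays.copyOf(end, width));
    if (lo.compareTo(hi) >= 0) {
      throw new IllegalArgumentException("start must sort before end");
    }
    BigInteger span = hi.subtract(lo);
    List<byte[]> keys = new ArrayList<>();
    for (int i = 0; i <= n; i++) {
      BigInteger offset = span.multiply(BigInteger.valueOf(i)).divide(BigInteger.valueOf(n));
      keys.add(toFixedWidth(lo.add(offset), width));
    }
    return keys;
  }

  /** Converts a non-negative value back to exactly width bytes, big-endian. */
  private static byte[] toFixedWidth(BigInteger value, int width) {
    byte[] raw = value.toByteArray(); // may carry a sign byte or be shorter than width
    byte[] out = new byte[width];
    int copy = Math.min(raw.length, width);
    System.arraycopy(raw, raw.length - copy, out, width - copy, copy);
    return out;
  }
}
```

Consecutive pairs of the returned keys form the sub-ranges, so n = 1 yields just the original start and end keys.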
public List<org.apache.hadoop.mapreduce.InputSplit> calculateAutoBalancedSplits(List<org.apache.hadoop.mapreduce.InputSplit> splits, long maxAverageRegionSize) throws IOException
Calculates the number of MapReduce input splits for the map tasks.
Parameters:
splits - The list of input splits before balance.
maxAverageRegionSize - Maximum average region size for one mapper.
Throws:
IOException - When creating the list of splits fails.
See Also: InputFormat.getSplits(org.apache.hadoop.mapreduce.JobContext)
String reverseDNS(InetAddress ipAddress) throws UnknownHostException
Throws:
UnknownHostException
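The reverse DNS cache (reverseDNSCacheMap, IPAddress => HostName) can be pictured as a memoized lookup keyed by InetAddress. The sketch below is an illustration, not the HBase code: the class ReverseDnsCacheSketch and its injectable resolver function are hypothetical, and the resolver stands in for a real DNS query so that the example runs offline.

```java
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical memoized reverse lookup; NOT the HBase implementation. */
final class ReverseDnsCacheSketch {
  private final Map<InetAddress, String> cache = new HashMap<>();
  // Assumption: the actual lookup is injected, standing in for a DNS query.
  private final Function<InetAddress, String> resolver;
  int lookups; // number of times the resolver was actually invoked

  ReverseDnsCacheSketch(Function<InetAddress, String> resolver) {
    this.resolver = resolver;
  }

  /** Returns the cached host name, resolving it only on the first request. */
  String reverseDNS(InetAddress ipAddress) {
    String hostName = cache.get(ipAddress);
    if (hostName == null) {
      lookups++;
      hostName = resolver.apply(ipAddress);
      cache.put(ipAddress, hostName);
    }
    return hostName;
  }
}
```

Because many regions are typically served from the same host, a cache of this shape would let repeated lookups for one address reuse the first result.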
protected boolean includeRegionInSplit(byte[] startKey, byte[] endKey)
Test if the given region is to be included in the InputSplit while splitting the regions of a table.
This optimization is effective when there is a specific reason to exclude an entire region from the M-R job (so that it does not contribute to the InputSplit), given its start and end keys. It is useful when we need to remember the last-processed top record and revisit the [last, current) interval for M-R processing continuously. In addition to reducing InputSplits, it reduces the load on the region server as well, due to the ordering of the keys.
Note: It is possible that endKey.length == 0 for the last (most recent) region.
Override this method if you want to bulk exclude regions altogether from M-R. By default, no region is excluded (i.e. all regions are included).
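As an illustration of the "last-processed record" use case, the sketch below shows the kind of key comparison an override might perform. It is self-contained and does not extend TableInputFormatBase. The class RegionFilterSketch, the field lastProcessed, and the choice of an unsigned lexicographic comparison through java.util.Arrays.compareUnsigned (Java 9 or later) are assumptions.

```java
import java.util.Arrays;

/** Hypothetical region filter; mirrors the shape of includeRegionInSplit only. */
final class RegionFilterSketch {
  // Assumption: the caller remembers the last row key it has already processed.
  private final byte[] lastProcessed;

  RegionFilterSketch(byte[] lastProcessed) {
    this.lastProcessed = lastProcessed.clone();
  }

  /** Returns true if the region [startKey, endKey) may hold unprocessed rows. */
  boolean includeRegionInSplit(byte[] startKey, byte[] endKey) {
    if (endKey.length == 0) {
      return true; // the last region has an open end, so always keep it
    }
    // Every row in the region sorts before endKey. If endKey does not sort
    // after lastProcessed, the whole region was already handled.
    return Arrays.compareUnsigned(endKey, lastProcessed) > 0;
  }
}
```

In a real subclass the same comparison would live in an overridden includeRegionInSplit, with the last-processed key obtained from wherever the job stores its progress.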
Parameters:
startKey - Start key of the region
endKey - End key of the region
protected RegionLocator getRegionLocator()
Allows subclasses to get the RegionLocator.
protected void initializeTable(Connection connection, TableName tableName) throws IOException
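Allows subclasses to initialize the table information.
Parameters:
connection - The Connection to the HBase cluster. MUST be unmanaged. This class will close it.
tableName - The TableName of the table to process.
Throws:
IOException
public Scan getScan()
Gets the scan defining the actual details like columns etc.
public void setScan(Scan scan)
Sets the scan defining the actual details like columns etc.
Parameters:
scan - The scan to set.
protected void setTableRecordReader(TableRecordReader tableRecordReader)
Allows subclasses to set the TableRecordReader.
Parameters:
tableRecordReader - A different TableRecordReader implementation.
protected void initialize(org.apache.hadoop.mapreduce.JobContext context) throws IOException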
Handle subclass specific set up. Each of the entry points used by the MapReduce framework, createRecordReader(InputSplit, TaskAttemptContext) and getSplits(JobContext), will call initialize(JobContext) as a convenient centralized location to handle retrieving the necessary configuration information and calling initializeTable(Connection, TableName).
Subclasses should implement their initialize call such that it is safe to call multiple times. The current TableInputFormatBase implementation relies on a non-null table reference to decide if an initialize call is needed, but this behavior may change in the future. In particular, it is critical that initializeTable not be called multiple times since this will leak Connection instances.
Throws:
IOException
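A minimal sketch of an initialize routine that is safe to call repeatedly. It uses a stand-in Closeable instead of a real Connection so that it runs without HBase; the names IdempotentInitSketch, resource, and openCount are hypothetical. The guard resembles the non-null check described above, so the resource is opened only once however often initialize is called.

```java
import java.io.Closeable;
import java.io.IOException;

/** Hypothetical illustration of an idempotent initialize; NOT HBase code. */
final class IdempotentInitSketch {
  // Stand-in for the Connection handed to initializeTable (assumption).
  private Closeable resource;
  int openCount; // how many times a resource was actually opened

  /** Opens the resource on the first call; later calls are no-ops. */
  void initialize() {
    if (resource != null) {
      return; // already initialized, so nothing is opened or leaked
    }
    openCount++;
    resource = () -> { }; // placeholder for creating a real connection
  }

  /** Releases the resource so that a later initialize may open a new one. */
  void close() throws IOException {
    if (resource != null) {
      resource.close();
      resource = null;
    }
  }
}
```

A subclass that keeps its own guard of this kind does not depend on the base class's current non-null table check, which the text above notes may change.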
protected void closeTable() throws IOException
Close the Table and related objects that were initialized via initializeTable(Connection, TableName).
Throws:
IOException
private void close(Closeable... closables) throws IOException
Throws:
IOException
Copyright © 2007–2019 The Apache Software Foundation. All rights reserved.