@InterfaceAudience.Private public class ReplicationSink extends Object
This class is responsible for replicating the edits coming from another cluster.
This replication process is currently waiting for the edits to be applied before the method can return. This means that the replication of edits is synchronized (after reading from WALs in ReplicationSource) and that a single region server cannot receive edits from two sources at the same time.
This class uses the native HBase client in order to replicate entries.
TODO: make this class more like ReplicationSource wrt log handling.
Modifier and Type | Field and Description |
---|---|
private org.apache.hadoop.conf.Configuration |
conf |
private long |
hfilesReplicated |
private static org.slf4j.Logger |
LOG |
private MetricsSink |
metrics |
private SourceFSConfigurationProvider |
provider |
private int |
rowSizeWarnThreshold
Row size threshold for multi requests above which a warning is logged
|
private AsyncConnection |
sharedAsyncConnection
This shared
AsyncConnection is used for handling wal replication. |
private Object |
sharedAsyncConnectionLock |
private Connection |
sharedConnection
This shared
Connection is used for handling bulk load hfiles replication. |
private Object |
sharedConnectionLock |
private AtomicLong |
totalReplicatedEdits |
private WALEntrySinkFilter |
walEntrySinkFilter |
Constructor and Description |
---|
ReplicationSink(org.apache.hadoop.conf.Configuration conf)
Create a sink for replication
|
Modifier and Type | Method and Description |
---|---|
private void |
addFamilyAndItsHFilePathToTableInMap(byte[] family,
String pathToHfileFromNS,
List<Pair<byte[],List<String>>> familyHFilePathsList) |
private void |
addNewTableEntryInMap(Map<String,List<Pair<byte[],List<String>>>> bulkLoadHFileMap,
byte[] family,
String pathToHfileFromNS,
String tableName) |
private <K1,K2,V> List<V> |
addToHashMultiMap(Map<K1,Map<K2,List<V>>> map,
K1 key1,
K2 key2,
V value)
Simple helper to add a value to a map from key to (a list of) values. TODO: Make a general utility method.
Returns the list of values corresponding to key1 and key2.
|
private void |
batch(TableName tableName,
Collection<List<Row>> allRows,
int batchRowSizeThreshold)
Do the changes and handle the pool
|
private void |
buildBulkLoadHFileMap(Map<String,List<Pair<byte[],List<String>>>> bulkLoadHFileMap,
TableName table,
org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor bld) |
private void |
decorateConf()
decorate the Configuration object to make replication more receptive to delays: lessen the
timeout and numTries.
|
private AsyncConnection |
getAsyncConnection()
Return the shared
AsyncConnection which is used for handling wal replication. |
private Connection |
getConnection()
Return the shared
Connection which is used for handling bulk load hfiles replication. |
private String |
getHFilePath(TableName table,
org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor bld,
String storeFile,
byte[] family) |
MetricsSink |
getSinkMetrics()
Get replication Sink Metrics
|
String |
getStats()
Get a string representation of this sink's metrics
|
private boolean |
isNewRowOrType(Cell previousCell,
Cell cell)
Returns true if we have crossed over onto a new row or type.
|
void |
replicateEntries(List<org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry> entries,
CellScanner cells,
String replicationClusterId,
String sourceBaseNamespaceDirPath,
String sourceHFileArchiveDirPath)
Replicate this array of entries directly into the local cluster using the native client.
|
private WALEntrySinkFilter |
setupWALEntrySinkFilter() |
void |
stopReplicationSinkServices()
stop the thread pool executor.
|
private UUID |
toUUID(org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID uuid) |
private static final org.slf4j.Logger LOG
private final org.apache.hadoop.conf.Configuration conf
private volatile Connection sharedConnection
This shared Connection is used for handling bulk load hfiles replication.
private volatile AsyncConnection sharedAsyncConnection
This shared AsyncConnection is used for handling wal replication.
private final MetricsSink metrics
private final AtomicLong totalReplicatedEdits
private final Object sharedConnectionLock
private final Object sharedAsyncConnectionLock
private long hfilesReplicated
private SourceFSConfigurationProvider provider
private WALEntrySinkFilter walEntrySinkFilter
private final int rowSizeWarnThreshold
public ReplicationSink(org.apache.hadoop.conf.Configuration conf) throws IOException
conf - conf object
IOException - thrown when HDFS goes bad or bad file name
private WALEntrySinkFilter setupWALEntrySinkFilter() throws IOException
IOException
private void decorateConf()
public void replicateEntries(List<org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry> entries, CellScanner cells, String replicationClusterId, String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath) throws IOException
replicationClusterId - Id which will uniquely identify source cluster FS client configurations in the replication configuration directory
sourceBaseNamespaceDirPath - Path that points to the source cluster base namespace directory
sourceHFileArchiveDirPath - Path that points to the source cluster hfile archive directory
IOException - If failed to replicate the data
private void buildBulkLoadHFileMap(Map<String,List<Pair<byte[],List<String>>>> bulkLoadHFileMap, TableName table, org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor bld) throws IOException
IOException
private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS, List<Pair<byte[],List<String>>> familyHFilePathsList)
private void addNewTableEntryInMap(Map<String,List<Pair<byte[],List<String>>>> bulkLoadHFileMap, byte[] family, String pathToHfileFromNS, String tableName)
private String getHFilePath(TableName table, org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor bld, String storeFile, byte[] family)
private boolean isNewRowOrType(Cell previousCell, Cell cell)
private <K1,K2,V> List<V> addToHashMultiMap(Map<K1,Map<K2,List<V>>> map, K1 key1, K2 key2, V value)
public void stopReplicationSinkServices()
private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold) throws IOException
tableName - table to insert into
allRows - list of actions
batchRowSizeThreshold - rowSize threshold for batch mutation
IOException
private Connection getConnection() throws IOException
Returns the shared Connection which is used for handling bulk load hfiles replication.
IOException
private AsyncConnection getAsyncConnection() throws IOException
Returns the shared AsyncConnection which is used for handling wal replication.
IOException
public String getStats()
public MetricsSink getSinkMetrics()
Copyright © 2007–2020 The Apache Software Foundation. All rights reserved.