@InterfaceAudience.Private public class ReplicationSink extends Object
This class is responsible for replicating the edits coming from another cluster.
This replication process is currently waiting for the edits to be applied before the method can return. This means that the replication of edits is synchronized (after reading from WALs in ReplicationSource) and that a single region server cannot receive edits from two sources at the same time
This class uses the native HBase client in order to replicate entries.
TODO make this class more like ReplicationSource wrt log handlingModifier and Type | Field and Description |
---|---|
private org.apache.hadoop.conf.Configuration |
conf |
private long |
hfilesReplicated |
private static org.slf4j.Logger |
LOG |
private MetricsSink |
metrics |
private SourceFSConfigurationProvider |
provider |
private Connection |
sharedHtableCon |
private Object |
sharedHtableConLock |
private AtomicLong |
totalReplicatedEdits |
private WALEntrySinkFilter |
walEntrySinkFilter |
Constructor and Description |
---|
ReplicationSink(org.apache.hadoop.conf.Configuration conf,
Stoppable stopper)
Create a sink for replication
|
Modifier and Type | Method and Description |
---|---|
private void |
addFamilyAndItsHFilePathToTableInMap(byte[] family,
String pathToHfileFromNS,
List<Pair<byte[],List<String>>> familyHFilePathsList) |
private void |
addNewTableEntryInMap(Map<String,List<Pair<byte[],List<String>>>> bulkLoadHFileMap,
byte[] family,
String pathToHfileFromNS,
String tableName) |
private <K1,K2,V> List<V> |
addToHashMultiMap(Map<K1,Map<K2,List<V>>> map,
K1 key1,
K2 key2,
V value)
Simple helper to a map from key to (a list of) values
TODO: Make a general utility method
|
protected void |
batch(TableName tableName,
Collection<List<Row>> allRows)
Do the changes and handle the pool
|
private void |
buildBulkLoadHFileMap(Map<String,List<Pair<byte[],List<String>>>> bulkLoadHFileMap,
TableName table,
Cell cell) |
private void |
decorateConf()
decorate the Configuration object to make replication more receptive to delays:
lessen the timeout and numTries.
|
private Connection |
getConnection() |
private String |
getHFilePath(TableName table,
org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor bld,
String storeFile,
byte[] family) |
MetricsSink |
getSinkMetrics()
Get replication Sink Metrics
|
String |
getStats()
Get a string representation of this sink's metrics
|
private boolean |
isNewRowOrType(Cell previousCell,
Cell cell) |
void |
replicateEntries(List<org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry> entries,
CellScanner cells,
String replicationClusterId,
String sourceBaseNamespaceDirPath,
String sourceHFileArchiveDirPath)
Replicate this array of entries directly into the local cluster using the native client.
|
private WALEntrySinkFilter |
setupWALEntrySinkFilter() |
void |
stopReplicationSinkServices()
stop the thread pool executor.
|
private UUID |
toUUID(org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID uuid) |
private static final org.slf4j.Logger LOG
private final org.apache.hadoop.conf.Configuration conf
private volatile Connection sharedHtableCon
private final MetricsSink metrics
private final AtomicLong totalReplicatedEdits
private final Object sharedHtableConLock
private long hfilesReplicated
private SourceFSConfigurationProvider provider
private WALEntrySinkFilter walEntrySinkFilter
public ReplicationSink(org.apache.hadoop.conf.Configuration conf, Stoppable stopper) throws IOException
conf
- conf objectstopper
- boolean to tell this thread to stopIOException
- thrown when HDFS goes bad or bad file nameprivate WALEntrySinkFilter setupWALEntrySinkFilter() throws IOException
IOException
private void decorateConf()
public void replicateEntries(List<org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry> entries, CellScanner cells, String replicationClusterId, String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath) throws IOException
replicationClusterId
- Id which will uniquely identify source cluster FS client
configurations in the replication configuration directorysourceBaseNamespaceDirPath
- Path that point to the source cluster base namespace
directorysourceHFileArchiveDirPath
- Path that point to the source cluster hfile archive directoryIOException
- If failed to replicate the dataprivate void buildBulkLoadHFileMap(Map<String,List<Pair<byte[],List<String>>>> bulkLoadHFileMap, TableName table, Cell cell) throws IOException
IOException
private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS, List<Pair<byte[],List<String>>> familyHFilePathsList)
private void addNewTableEntryInMap(Map<String,List<Pair<byte[],List<String>>>> bulkLoadHFileMap, byte[] family, String pathToHfileFromNS, String tableName)
private String getHFilePath(TableName table, org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor bld, String storeFile, byte[] family)
private boolean isNewRowOrType(Cell previousCell, Cell cell)
previousCell
- cell
- private <K1,K2,V> List<V> addToHashMultiMap(Map<K1,Map<K2,List<V>>> map, K1 key1, K2 key2, V value)
map
- key1
- key2
- value
- public void stopReplicationSinkServices()
protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException
tableName
- table to insert intoallRows
- list of actionsIOException
private Connection getConnection() throws IOException
IOException
public String getStats()
public MetricsSink getSinkMetrics()
Copyright © 2007–2019 The Apache Software Foundation. All rights reserved.