Class ReplicationSink
java.lang.Object
org.apache.hadoop.hbase.replication.regionserver.ReplicationSink
This class is responsible for replicating the edits coming from another cluster.
The replication process currently waits for the edits to be applied before the method can return. This means that the replication of edits is synchronized (after reading from WALs in ReplicationSource) and that a single region server cannot receive edits from two sources at the same time.
This class uses the native HBase client in order to replicate entries.
TODO: make this class more like ReplicationSource with respect to log handling.
-
Field Summary
Fields (Modifier and Type, Field, Description):
private final org.apache.hadoop.conf.Configuration conf
private long hfilesReplicated
private static final org.slf4j.Logger LOG
private final MetricsSink metrics
private SourceFSConfigurationProvider provider
private boolean replicationSinkTrackerEnabled
private final int rowSizeWarnThreshold - Row size threshold for multi requests above which a warning is logged
private final RegionServerCoprocessorHost rsServerHost
private AsyncClusterConnection sharedConn
private final Object sharedConnLock
private final AtomicLong totalReplicatedEdits
private WALEntrySinkFilter walEntrySinkFilter
-
Constructor Summary
Constructors (Constructor, Description):
ReplicationSink(org.apache.hadoop.conf.Configuration conf, RegionServerCoprocessorHost rsServerHost) - Create a sink for replication
-
Method Summary
Methods (Modifier and Type, Method, Description):
private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS, List<Pair<byte[], List<String>>> familyHFilePathsList)
private void addNewTableEntryInMap(Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, byte[] family, String pathToHfileFromNS, String tableName)
private <K1,K2,V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2, V value) - Simple helper to add a value to a map from key to (a list of) values. TODO: Make a general utility method
private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold) - Do the changes and handle the pool
private void buildBulkLoadHFileMap(Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table, org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor bld)
private void decorateConf - Decorate the Configuration object to make replication more receptive to delays: lessen the timeout and numTries.
private AsyncClusterConnection getConnection
private String getHFilePath(TableName table, org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor bld, String storeFile, byte[] family)
MetricsSink getSinkMetrics - Get replication Sink Metrics
String getStats() - Get a string representation of this sink's metrics
private boolean isNewRowOrType(ExtendedCell previousCell, ExtendedCell cell) - Returns true if we have crossed over onto a new row or type
private Put processReplicationMarkerEntry
void replicateEntries(List<org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry> entries, ExtendedCellScanner cells, String replicationClusterId, String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath) - Replicate this array of entries directly into the local cluster using the native client.
private WALEntrySinkFilter setupWALEntrySinkFilter
void stopReplicationSinkServices - Stop the thread pool executor.
private UUID toUUID(org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID uuid)
-
Field Details
-
LOG
-
conf
-
metrics
-
totalReplicatedEdits
-
hfilesReplicated
-
provider
-
walEntrySinkFilter
-
rowSizeWarnThreshold
Row size threshold for multi requests above which a warning is logged.
-
replicationSinkTrackerEnabled
-
rsServerHost
-
-
Constructor Details
-
ReplicationSink
public ReplicationSink(org.apache.hadoop.conf.Configuration conf, RegionServerCoprocessorHost rsServerHost) throws IOException
Create a sink for replication.
Parameters:
conf - conf object
Throws:
IOException - thrown when HDFS goes bad or bad file name
-
-
Method Details
-
setupWALEntrySinkFilter
Throws:
IOException
-
decorateConf
Decorate the Configuration object to make replication more receptive to delays: lessen the timeout and numTries.
-
replicateEntries
public void replicateEntries(List<org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry> entries, ExtendedCellScanner cells, String replicationClusterId, String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath) throws IOException
Replicate this array of entries directly into the local cluster using the native client. Only operates against the raw protobuf type, saving on a conversion from pb to pojo.
Parameters:
entries - WAL entries to be replicated.
cells - cell scanner for iteration.
replicationClusterId - Id which will uniquely identify source cluster FS client configurations in the replication configuration directory
sourceBaseNamespaceDirPath - Path that points to the source cluster base namespace directory
sourceHFileArchiveDirPath - Path that points to the source cluster hfile archive directory
Throws:
IOException - If failed to replicate the data
-
processReplicationMarkerEntry
Throws:
IOException
-
buildBulkLoadHFileMap
private void buildBulkLoadHFileMap(Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table, org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor bld) throws IOException
Throws:
IOException
-
addFamilyAndItsHFilePathToTableInMap
-
addNewTableEntryInMap
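The bulk load helpers above share one structure: a map from table name to a list of (family, HFile paths) pairs. The following is a minimal sketch of how such a structure could be filled, assuming a new table entry is created on first use and paths for an already-seen family are appended to that family's list. The class `BulkLoadMapSketch`, the method `addHFilePath`, and the use of `Map.Entry` in place of HBase's `Pair` are hypothetical stand-ins, not the actual implementation.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; Map.Entry stands in for HBase's Pair type.
final class BulkLoadMapSketch {
  private BulkLoadMapSketch() {}

  /** Records one HFile path for (tableName, family), creating entries as needed. */
  static void addHFilePath(
      Map<String, List<Map.Entry<byte[], List<String>>>> bulkLoadHFileMap,
      String tableName, byte[] family, String pathToHfileFromNS) {
    // Assumed behavior: first sighting of a table creates its (empty) family list.
    List<Map.Entry<byte[], List<String>>> familyHFilePathsList =
        bulkLoadHFileMap.computeIfAbsent(tableName, k -> new ArrayList<>());
    for (Map.Entry<byte[], List<String>> familyAndPaths : familyHFilePathsList) {
      // byte[] uses identity equality, so compare the contents explicitly.
      if (Arrays.equals(familyAndPaths.getKey(), family)) {
        familyAndPaths.getValue().add(pathToHfileFromNS);
        return;
      }
    }
    // Family not seen yet for this table: start a new (family, paths) pair.
    List<String> paths = new ArrayList<>();
    paths.add(pathToHfileFromNS);
    familyHFilePathsList.add(new SimpleEntry<>(family, paths));
  }
}
```

Scanning the list linearly keeps the sketch close to the list-of-pairs shape in the signatures above; a table with many families might prefer a map keyed by a wrapped family name.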
-
getHFilePath
private String getHFilePath(TableName table, org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor bld, String storeFile, byte[] family)
-
isNewRowOrType
Returns true if we have crossed over onto a new row or type.
-
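A check of this kind can be sketched as follows. This is an illustration under stated assumptions, not the class's actual code: `SimpleCell` is a hypothetical stand-in for `ExtendedCell` that carries only a row key and a type code, and a missing previous cell is assumed to count as a new row.

```java
import java.util.Arrays;

// Hypothetical illustration only; SimpleCell is not an HBase type.
final class RowOrTypeSketch {
  private RowOrTypeSketch() {}

  /** Stand-in for a cell: just the row key and an arbitrary type code. */
  static final class SimpleCell {
    final byte[] row;
    final byte type;

    SimpleCell(byte[] row, byte type) {
      this.row = row;
      this.type = type;
    }
  }

  /** True when there is no previous cell, or the type or the row key differs. */
  static boolean isNewRowOrType(SimpleCell previousCell, SimpleCell cell) {
    return previousCell == null
        || previousCell.type != cell.type
        || !Arrays.equals(previousCell.row, cell.row);
  }
}
```

A caller iterating over cells in order could use the result to decide when to start a new mutation rather than adding to the current one.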
toUUID
-
addToHashMultiMap
private <K1,K2,V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2, V value)
Simple helper to add a value to a map from key to (a list of) values. TODO: Make a general utility method.
Returns:
the list of values corresponding to key1 and key2
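The two-level map helper described here can be sketched with `Map.computeIfAbsent`. The signature mirrors the one documented for addToHashMultiMap, but the body, the choice of `HashMap` and `ArrayList` for the inner containers, and the wrapper class `HashMultiMapSketch` are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not the HBase implementation.
final class HashMultiMapSketch {
  private HashMultiMapSketch() {}

  /**
   * Adds value under (key1, key2), creating the inner map and list on first use.
   * Returns the list of values now stored for key1 and key2.
   */
  static <K1, K2, V> List<V> addToHashMultiMap(
      Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2, V value) {
    Map<K2, List<V>> innerMap = map.computeIfAbsent(key1, k -> new HashMap<>());
    List<V> values = innerMap.computeIfAbsent(key2, k -> new ArrayList<>());
    values.add(value);
    return values;
  }

  public static void main(String[] args) {
    Map<String, Map<String, List<String>>> rows = new HashMap<>();
    addToHashMultiMap(rows, "table1", "clusterA", "edit-1");
    List<String> values = addToHashMultiMap(rows, "table1", "clusterA", "edit-2");
    System.out.println(values.size()); // two values stored under the same key pair
  }
}
```

Returning the live list lets a caller inspect its size after each add, for example to decide when a group of values has grown large enough to flush.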
-
stopReplicationSinkServices
Stop the thread pool executor. It is called when the regionserver is stopped.
-
batch
private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold) throws IOException
Do the changes and handle the pool.
Parameters:
tableName - table to insert into
allRows - list of actions
batchRowSizeThreshold - rowSize threshold for batch mutation
Throws:
IOException
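The row size threshold passed to batch can be pictured with the sketch below, which only covers the warning side: each list of rows whose size exceeds the threshold produces one logged warning. The class `BatchThresholdSketch`, the method `warnOnLargeBatches`, the message text, and the use of `java.util.logging` are all assumptions for illustration; submitting the rows to the table is left out.

```java
import java.util.Collection;
import java.util.List;
import java.util.logging.Logger;

// Hypothetical illustration only; not the HBase implementation.
final class BatchThresholdSketch {
  private static final Logger LOG =
      Logger.getLogger(BatchThresholdSketch.class.getName());

  private BatchThresholdSketch() {}

  /** Logs a warning per oversized batch and returns how many were oversized. */
  static <R> int warnOnLargeBatches(
      String tableName, Collection<List<R>> allRows, int batchRowSizeThreshold) {
    int oversized = 0;
    for (List<R> rows : allRows) {
      // "Above" is read here as strictly greater than the threshold.
      if (rows.size() > batchRowSizeThreshold) {
        LOG.warning("Batch for table " + tableName + " has " + rows.size()
            + " rows, above the threshold of " + batchRowSizeThreshold);
        oversized++;
      }
      // A real sink would apply `rows` to the target table at this point.
    }
    return oversized;
  }
}
```

Returning the count is a convenience of the sketch so the behavior can be checked without inspecting log output.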
-
getConnection
Throws:
IOException
-
getStats
Get a string representation of this sink's metrics.
Returns:
string with the total replicated edits count and the date of the last edit that was applied
-
getSinkMetrics
Get replication Sink Metrics
-