Class ReplicationSourceManager
java.lang.Object
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager
This class is responsible for managing all the replication sources. There are two classes of sources:
- Normal sources are persistent and there is one per peer cluster
- Old sources are recovered from a failed region server and our only goal is to finish replicating the WAL queue it had
When a region server dies, this class uses a watcher to get notified and tries to grab a lock in order to transfer all the queues into a local old source.
Synchronization specification:
- No need to synchronize on sources. sources is a ConcurrentHashMap and there is a Lock for peer id in PeerProcedureHandlerImpl, so there is no race for peer operations.
- Need to synchronize on walsById. Four methods modify it: addPeer(String), removePeer(String), cleanOldLogs(String, boolean, ReplicationSourceInterface) and postLogRoll(Path). walsById is a ConcurrentHashMap and there is a Lock for peer id in PeerProcedureHandlerImpl, so there is no race between addPeer(String) and removePeer(String). cleanOldLogs(String, boolean, ReplicationSourceInterface) is called by ReplicationSourceInterface, so there is no race with addPeer(String). removePeer(String) terminates the ReplicationSourceInterface first and then removes the wals from walsById, so there is no race with removePeer(String). The only case that needs synchronization is cleanOldLogs(String, boolean, ReplicationSourceInterface) and postLogRoll(Path).
- No need to synchronize on walsByIdRecoveredQueues. Three methods modify it: removePeer(String), cleanOldLogs(String, boolean, ReplicationSourceInterface) and claimQueue(ReplicationQueueId). cleanOldLogs(String, boolean, ReplicationSourceInterface) is called by ReplicationSourceInterface. removePeer(String) terminates the ReplicationSourceInterface first and then removes the wals from walsByIdRecoveredQueues. claimQueue(ReplicationQueueId) adds the wals to walsByIdRecoveredQueues first and then starts up a ReplicationSourceInterface. So there is no race here. For claimQueue(ReplicationQueueId) and removePeer(String), there is already synchronization on oldsources, so there is no need to synchronize on walsByIdRecoveredQueues.
- Need to synchronize on latestPaths so that a newly opened source does not miss a new log.
- Need to synchronize on oldsources to avoid adding a recovered source for a to-be-removed peer.
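The walsById rule above can be illustrated with a small model. This is a minimal sketch, not the HBase implementation: the class name WalsByIdSketch, the String queue ids, and the helper methods addQueue and snapshot are all hypothetical. It only shows the shape of the rule: peer-level registration relies on the concurrent map (plus an external per-peer lock, assumed here), while the two methods that can race on the inner sets take a shared monitor.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified model of the walsById locking rule.
class WalsByIdSketch {
  // queue id -> (wal group -> sorted wal names); the inner collections are not thread safe.
  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsById =
    new ConcurrentHashMap<>();

  // Peer operations are assumed to be serialized by an external per-peer lock,
  // so registering a queue relies only on the concurrent map.
  void addQueue(String queueId) {
    walsById.putIfAbsent(queueId, new HashMap<>());
  }

  // Records a newly rolled WAL for every queue; mutates inner sets, so it takes the monitor.
  void postLogRoll(String walGroup, String walName) {
    synchronized (walsById) {
      for (Map<String, NavigableSet<String>> byGroup : walsById.values()) {
        byGroup.computeIfAbsent(walGroup, g -> new TreeSet<>()).add(walName);
      }
    }
  }

  // Removes already shipped WALs; races with postLogRoll, so it takes the same monitor.
  boolean cleanOldLogs(String queueId, String walGroup, Collection<String> shipped) {
    synchronized (walsById) {
      Map<String, NavigableSet<String>> byGroup = walsById.get(queueId);
      NavigableSet<String> wals = byGroup == null ? null : byGroup.get(walGroup);
      return wals != null && wals.removeAll(shipped);
    }
  }

  // Returns a copy of the WAL names tracked for one queue and group.
  NavigableSet<String> snapshot(String queueId, String walGroup) {
    synchronized (walsById) {
      NavigableSet<String> copy = new TreeSet<String>();
      Map<String, NavigableSet<String>> byGroup = walsById.get(queueId);
      if (byGroup != null && byGroup.get(walGroup) != null) {
        copy.addAll(byGroup.get(walGroup));
      }
      return copy;
    }
  }

  public static void main(String[] args) {
    WalsByIdSketch sketch = new WalsByIdSketch();
    sketch.addQueue("peer1");
    sketch.postLogRoll("rs1", "rs1.0001");
    sketch.postLogRoll("rs1", "rs1.0002");
    sketch.cleanOldLogs("peer1", "rs1", Collections.singleton("rs1.0001"));
    System.out.println(sketch.snapshot("peer1", "rs1"));
  }
}
```

In this model the concurrent map protects only the top-level mapping; the nested sets are plain collections, which is why the two mutating methods share one monitor.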
-
Nested Class Summary
Modifier and Type    Class    Description
private static interface
-
Field Summary
Modifier and Type    Field    Description
private final UUID
private final org.apache.hadoop.conf.Configuration
private final ThreadPoolExecutor
private final org.apache.hadoop.fs.FileSystem
private final MetricsReplicationGlobalSourceSource
private static final org.slf4j.Logger
private final org.apache.hadoop.fs.Path
private final int
private final org.apache.hadoop.fs.Path
private final List<ReplicationSourceInterface>
private final ReplicationQueueStorage    Storage for queues that need persistence; e.g.
private final ReplicationPeers
private final Server
private final long
private final long
private final ConcurrentMap<String, ReplicationSourceInterface>
private final SyncReplicationPeerMappingManager
private final long
private AtomicLong
private final WALFactory
private final ConcurrentMap<ReplicationQueueId, Map<String, NavigableSet<String>>>
private final ConcurrentMap<ReplicationQueueId, Map<String, NavigableSet<org.apache.hadoop.fs.Path>>>
-
Constructor Summary
Constructor    Description
ReplicationSourceManager(ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, org.apache.hadoop.conf.Configuration conf, Server server, org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path logDir, org.apache.hadoop.fs.Path oldLogDir, UUID clusterId, WALFactory walFactory, SyncReplicationPeerMappingManager syncReplicationPeerMappingManager, MetricsReplicationGlobalSourceSource globalMetrics)    Creates a replication manager and sets the watch on all the other registered region servers
-
Method Summary
Modifier and Type    Method    Description
private void
private void
(package private) boolean    acquireBufferQuota(long size)    Add the size to totalBufferUsed and check if it exceeds totalBufferLimit.
(package private) boolean    acquireWALEntryBufferQuota(WALEntryBatch walEntryBatch, WAL.Entry entry)    Acquire the buffer quota for the WAL.Entry which is added to the WALEntryBatch.
(package private) int
void    addHFileRefs(TableName tableName, byte[] family, List<Pair<org.apache.hadoop.fs.Path, org.apache.hadoop.fs.Path>> pairs)
void    Add peer to replicationPeers; add the normal source and related replication queue; add HFile Refs
private void    addRecoveredSource(ReplicationSourceInterface src, ReplicationPeerImpl oldPeer, ReplicationQueueId claimedQueueId, PriorityQueue<org.apache.hadoop.fs.Path> walFiles)
(package private) void    Add a normal source for the given peer on this region server.
private long    addTotalBufferUsed(long size)
(package private) boolean    checkBufferQuota(String peerId)    Check if totalBufferUsed exceeds totalBufferLimit for peer.
(package private) void    claimQueue(ReplicationQueueId queueId)
(package private) void    claimQueue(ReplicationQueueId queueId, boolean syncUp)    Claim a replication queue.
(package private) void    cleanOldLogs(String log, boolean inclusive, ReplicationSourceInterface source)    Cleans a log file and all older logs from replication queue.
private void    cleanOldLogs(NavigableSet<String> wals, ReplicationSourceInterface source)
void    cleanUpHFileRefs(String peerId, List<String> files)
private ReplicationSourceInterface    createRefreshedSource(ReplicationQueueId queueId, ReplicationPeer peer)
private ReplicationSourceInterface    createSource(ReplicationQueueData queueData, ReplicationPeer replicationPeer)
private void    deleteQueue(ReplicationQueueId queueId)    Delete a complete queue of wals associated with a replication source
void    drainSources(String peerId)    This is used when we transit a sync replication peer to SyncReplicationState.STANDBY.
(package private) void
org.apache.hadoop.fs.FileSystem    getFs()    Get the handle on the local file system
(package private) MetricsReplicationGlobalSourceSource
(package private) Set<org.apache.hadoop.fs.Path>
org.apache.hadoop.fs.Path    Get the directory where wals are stored by their RSs
org.apache.hadoop.fs.Path    Get the directory where wals are archived
Get a list of all the recovered sources of this rs
(package private) ReplicationQueueStorage
Get the ReplicationPeers used by this ReplicationSourceManager
(package private) int
Get the normal source for a given peer
Get a list of all the normal sources of this rs
getStats()    Get a string representation of all the sources' metrics
long    Returns the maximum size in bytes of edits held in memory which are pending replication across all sources inside this RegionServer.
long
private PriorityQueue<org.apache.hadoop.fs.Path>    getWALFilesToReplicate(ServerName sourceRS, boolean syncUp, Map<String, ReplicationGroupOffset> offsets)
getWALs()    Get a copy of the wals of the normal sources on this rs
(package private) void    init()    Adds a normal source per registered peer cluster.
private void    Refreshing a replication source terminates the old source first, and then the source thread is interrupted.
void    join()    Terminate the replication on this region server
void    logPositionAndCleanOldLogs(ReplicationSourceInterface source, WALEntryBatch entryBatch)    This method will log the current position to storage.
void    postLogRoll(org.apache.hadoop.fs.Path newLog)
void    refreshSources(String peerId)    Close the previous replication sources of this peer id and open new sources to trigger the new replication state changes or new replication config changes.
(package private) void    releaseBufferQuota(long size)    Releases the buffer quota that was acquired by acquireBufferQuota(long).
(package private) long    releaseWALEntryBatchBufferQuota(WALEntryBatch walEntryBatch)    Releases the buffer quota of a WALEntryBatch that was acquired by acquireWALEntryBufferQuota(org.apache.hadoop.hbase.replication.regionserver.WALEntryBatch, org.apache.hadoop.hbase.wal.WAL.Entry).
void    removePeer(String peerId)    Remove peer from replicationPeers; remove all the recovered sources for the specified id and related replication queues; remove the normal source and related replication queue; remove HFile Refs
private boolean    Clear the metrics and related replication queue of the specified old source
private void    removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
(package private) void    Clear the metrics and related replication queue of the specified old source
private boolean    shouldReplicate(ReplicationGroupOffset offset, String wal)    Check whether we should replicate the given wal.
private void
-
Field Details
-
LOG
-
sources
-
oldsources
-
queueStorage
Storage for queues that need persistence, e.g. replication state, so that it can be recovered after a crash. queueStorage upkeep is spread across this class, and the storage is passed to ReplicationSource instances so they can do updates themselves. Not all ReplicationSource instances keep state.
-
replicationPeers
-
clusterId
-
server
-
walsById
-
walsByIdRecoveredQueues
private final ConcurrentMap<ReplicationQueueId, Map<String, NavigableSet<org.apache.hadoop.fs.Path>>> walsByIdRecoveredQueues
-
syncReplicationPeerMappingManager
-
conf
-
fs
-
latestPaths
-
logDir
-
oldLogDir
-
walFactory
-
sleepBeforeFailover
-
executor
-
totalBufferUsed
-
sleepForRetries
-
maxRetriesMultiplier
-
totalBufferLimit
-
globalMetrics
-
-
Constructor Details
-
ReplicationSourceManager
public ReplicationSourceManager(ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, org.apache.hadoop.conf.Configuration conf, Server server, org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path logDir, org.apache.hadoop.fs.Path oldLogDir, UUID clusterId, WALFactory walFactory, SyncReplicationPeerMappingManager syncReplicationPeerMappingManager, MetricsReplicationGlobalSourceSource globalMetrics) throws IOException
Creates a replication manager and sets the watch on all the other registered region servers
- Parameters:
queueStorage - the interface for manipulating replication queues
conf - the configuration to use
server - the server for this region server
fs - the file system to use
logDir - the directory that contains all wal directories of live RSs
oldLogDir - the directory where old logs are archived
- Throws:
IOException
-
-
Method Details
-
init
Adds a normal source per registered peer cluster.
- Throws:
IOException
-
addPeer
- Add peer to replicationPeers
- Add the normal source and related replication queue
- Add HFile Refs
- Parameters:
peerId - the id of replication peer
- Throws:
IOException
-
removePeer
- Remove peer from replicationPeers
- Remove all the recovered sources for the specified id and related replication queues
- Remove the normal source and related replication queue
- Remove HFile Refs
- Parameters:
peerId - the id of the replication peer
-
createSource
private ReplicationSourceInterface createSource(ReplicationQueueData queueData, ReplicationPeer replicationPeer) throws IOException
- Parameters:
queueId - the id of the replication queue to associate the ReplicationSource with.
- Returns:
- a new 'classic' user-space replication source.
- Throws:
IOException
- See Also:
-
for creating a ReplicationSource for meta.
-
addSource
Add a normal source for the given peer on this region server. Meanwhile, add a new replication queue to storage. For a newly added peer, we only need to enqueue the latest log of each wal group and start replication. The init parameter indicates whether this call is part of the initialization process. If so, we skip adding the replication queues, as doing so may introduce a deadlock between region server start up and the hbase:replication table coming online.
- Parameters:
peerId - the id of the replication peer
init - whether this call is part of the initialization process
- Throws:
IOException
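The role of the init flag can be sketched as follows. This is only an illustrative model under stated assumptions: the class AddSourceSketch, the String-based latestPaths map, and the persistedQueueEntries list standing in for queue storage are hypothetical, and whether the real method still enqueues WALs in memory during initialization is an assumption made here. The sketch enqueues the latest WAL of each group and writes to the storage stand-in only when the call is not part of initialization.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of "enqueue the latest log of each wal group, skip storage on init".
class AddSourceSketch {
  // wal group -> name of the latest wal in that group (stand-in for latestPaths)
  final Map<String, String> latestPaths = new TreeMap<>();
  // stand-in for the persistent replication queue storage
  final List<String> persistedQueueEntries = new ArrayList<>();

  // Returns the wals handed to the new in-memory source.
  List<String> addSource(String peerId, boolean init) {
    List<String> enqueued = new ArrayList<>();
    // Hold the latestPaths monitor so a concurrent log roll cannot slip in unnoticed.
    synchronized (latestPaths) {
      for (Map.Entry<String, String> entry : latestPaths.entrySet()) {
        enqueued.add(entry.getValue());
        if (!init) {
          // Only touch queue storage outside of initialization (assumption of this sketch).
          persistedQueueEntries.add(peerId + ":" + entry.getKey() + ":" + entry.getValue());
        }
      }
    }
    return enqueued;
  }

  public static void main(String[] args) {
    AddSourceSketch sketch = new AddSourceSketch();
    sketch.latestPaths.put("groupA", "groupA.0007");
    sketch.latestPaths.put("groupB", "groupB.0003");
    System.out.println(sketch.addSource("peer1", true));
    System.out.println(sketch.persistedQueueEntries.size());
    sketch.addSource("peer1", false);
    System.out.println(sketch.persistedQueueEntries.size());
  }
}
```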
-
drainSources
This is used when we transit a sync replication peer to SyncReplicationState.STANDBY.
When transiting to SyncReplicationState.STANDBY, we can remove all the pending wal files for a replication peer as we do not need to replicate them any more. This is necessary; otherwise, when we transit back to SyncReplicationState.DOWNGRADE_ACTIVE later, the stale data will be replicated again and cause inconsistency.
See HBASE-20426 for more details.
- Parameters:
peerId - the id of the sync replication peer
- Throws:
IOException
ReplicationException
-
createRefreshedSource
private ReplicationSourceInterface createRefreshedSource(ReplicationQueueId queueId, ReplicationPeer peer) throws IOException, ReplicationException
- Throws:
IOException
ReplicationException
-
refreshSources
Close the previous replication sources of this peer id and open new sources to trigger the new replication state changes or new replication config changes. Here we do not need to change the replication queue storage; we only need to enqueue all logs to the new replication source.
- Parameters:
peerId - the id of the replication peer
- Throws:
ReplicationException
IOException
-
removeRecoveredSource
Clear the metrics and related replication queue of the specified old source
- Parameters:
src - source to clear
-
finishRecoveredSource
-
removeSource
Clear the metrics and related replication queue of the specified old source
- Parameters:
src - source to clear
-
deleteQueue
Delete a complete queue of wals associated with a replication source
- Parameters:
queueId - the id of replication queue to delete
-
interruptOrAbortWhenFail
Refreshing a replication source terminates the old source first, and then the source thread is interrupted. This needs to be handled instead of aborting the region server.
-
abortWhenFail
-
throwIOExceptionWhenFail
private void throwIOExceptionWhenFail(ReplicationSourceManager.ReplicationQueueOperation op) throws IOException
- Throws:
IOException
-
abortAndThrowIOExceptionWhenFail
private void abortAndThrowIOExceptionWhenFail(ReplicationSourceManager.ReplicationQueueOperation op) throws IOException
- Throws:
IOException
-
logPositionAndCleanOldLogs
This method will log the current position to storage, and also clean old logs from the replication queue.
- Parameters:
source - the replication source
entryBatch - the wal entry batch we just shipped
-
cleanOldLogs
Cleans a log file and all older logs from replication queue. Called when we are sure that a log file is closed and has no more entries.
- Parameters:
log - Path to the log
inclusive - whether we should also remove the given log file
source - the replication source
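The effect of the inclusive parameter can be shown with a sorted set. This is a minimal sketch, assuming WAL names sort lexicographically by age (for example because they end in a fixed-width sequence number); the class CleanOldLogsSketch and the method logsToClean are hypothetical and do not reflect how HBase selects the logs internally.

```java
import java.util.Arrays;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical helper showing which wals a "clean up to this log" request would cover.
class CleanOldLogsSketch {
  // Returns every wal sorted before `log`, plus `log` itself when inclusive is true.
  static NavigableSet<String> logsToClean(NavigableSet<String> wals, String log,
      boolean inclusive) {
    // headSet returns a view; copy it so the caller can modify `wals` afterwards.
    return new TreeSet<>(wals.headSet(log, inclusive));
  }

  public static void main(String[] args) {
    NavigableSet<String> wals =
      new TreeSet<>(Arrays.asList("wal.0001", "wal.0002", "wal.0003"));
    System.out.println(logsToClean(wals, "wal.0002", false));
    System.out.println(logsToClean(wals, "wal.0002", true));
  }
}
```

With inclusive set to false the given log is kept, which fits a log that may still receive entries; with true it is removed along with the older ones.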
-
removeRemoteWALs
private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals) throws IOException
- Throws:
IOException
-
postLogRoll
- Throws:
IOException
-
shouldReplicate
Check whether we should replicate the given wal.
- Parameters:
wal - the file name of the wal
- Returns:
true means we should replicate the given wal, otherwise false.
-
claimQueue
-
getWALFilesToReplicate
private PriorityQueue<org.apache.hadoop.fs.Path> getWALFilesToReplicate(ServerName sourceRS, boolean syncUp, Map<String, ReplicationGroupOffset> offsets) throws IOException
- Throws:
IOException
-
addRecoveredSource
private void addRecoveredSource(ReplicationSourceInterface src, ReplicationPeerImpl oldPeer, ReplicationQueueId claimedQueueId, PriorityQueue<org.apache.hadoop.fs.Path> walFiles)
-
claimQueue
Claim a replication queue. We add a flag to indicate whether we are called by ReplicationSyncUp. For a normal claim queue operation, we are the last step of an SCP, so we can assume that all the WAL files are under the oldWALs directory. But for ReplicationSyncUp, we may want to claim the replication queue for a region server which has not been processed by SCP yet, so we still need to look at its WALs directory.
- Parameters:
queueId - the replication queue id we want to claim
syncUp - whether we are called by ReplicationSyncUp
-
join
Terminate the replication on this region server
-
getWALs
Get a copy of the wals of the normal sources on this rs
- Returns:
- a sorted set of wal names
-
getSources
Get a list of all the normal sources of this rs
- Returns:
- list of all normal sources
-
getOldSources
Get a list of all the recovered sources of this rs
- Returns:
- list of all recovered sources
-
getSource
Get the normal source for a given peer
- Returns:
- the normal source for the given peer if it exists, otherwise null.
-
getSizeOfLatestPath
int getSizeOfLatestPath()
-
getLastestPath
Set<org.apache.hadoop.fs.Path> getLastestPath()
-
getTotalBufferUsed
-
getTotalBufferLimit
Returns the maximum size in bytes of edits held in memory which are pending replication across all sources inside this RegionServer.
-
getOldLogDir
Get the directory where wals are archived
- Returns:
- the directory where wals are archived
-
getLogDir
Get the directory where wals are stored by their RSs
- Returns:
- the directory where wals are stored by their RSs
-
getFs
Get the handle on the local file system
- Returns:
- Handle on the local file system
-
getReplicationPeers
Get the ReplicationPeers used by this ReplicationSourceManager
- Returns:
- the ReplicationPeers used by this ReplicationSourceManager
-
getStats
Get a string representation of all the sources' metrics
-
addHFileRefs
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<org.apache.hadoop.fs.Path, org.apache.hadoop.fs.Path>> pairs) throws IOException
- Throws:
IOException
-
cleanUpHFileRefs
-
activeFailoverTaskCount
int activeFailoverTaskCount()
-
getGlobalMetrics
-
getQueueStorage
-
acquireWALEntryBufferQuota
Acquire the buffer quota for the WAL.Entry which is added to the WALEntryBatch.
- Parameters:
entry - the wal entry which is added to WALEntryBatch and should acquire buffer quota.
- Returns:
- true if we should clear buffer and push all
-
releaseWALEntryBatchBufferQuota
Releases the buffer quota of a WALEntryBatch that was acquired by acquireWALEntryBufferQuota(org.apache.hadoop.hbase.replication.regionserver.WALEntryBatch, org.apache.hadoop.hbase.wal.WAL.Entry).
- Returns:
- the released buffer quota size.
-
acquireBufferQuota
Add the size to totalBufferUsed and check if it exceeds totalBufferLimit.
- Returns:
- true if totalBufferUsed exceeds totalBufferLimit, in which case we should stop increasing the buffer and ship all.
-
releaseBufferQuota
Releases the buffer quota that was acquired by acquireBufferQuota(long).
-
addTotalBufferUsed
-
checkBufferQuota
Check if totalBufferUsed exceeds totalBufferLimit for peer.
- Returns:
- true if totalBufferUsed is not more than totalBufferLimit.
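The acquire, release and check methods described above can be modelled with a single atomic counter. The following is a minimal sketch, not the HBase code: the class BufferQuotaSketch is hypothetical, the comparisons follow the wording of the descriptions above ("exceeds" and "not more than"), and the per-peer argument of checkBufferQuota is left out.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of a shared buffer quota tracked by one atomic counter.
class BufferQuotaSketch {
  private final AtomicLong totalBufferUsed = new AtomicLong();
  private final long totalBufferLimit;

  BufferQuotaSketch(long totalBufferLimit) {
    this.totalBufferLimit = totalBufferLimit;
  }

  // Adds size to the running total; true means the limit is exceeded and
  // the caller should stop buffering and ship what it has.
  boolean acquireBufferQuota(long size) {
    if (size < 0) {
      throw new IllegalArgumentException("size should not be negative");
    }
    return totalBufferUsed.addAndGet(size) > totalBufferLimit;
  }

  // Gives back quota previously taken with acquireBufferQuota.
  void releaseBufferQuota(long size) {
    if (size < 0) {
      throw new IllegalArgumentException("size should not be negative");
    }
    totalBufferUsed.addAndGet(-size);
  }

  // True while the running total is not more than the limit.
  boolean checkBufferQuota() {
    return totalBufferUsed.get() <= totalBufferLimit;
  }

  long getTotalBufferUsed() {
    return totalBufferUsed.get();
  }

  public static void main(String[] args) {
    BufferQuotaSketch quota = new BufferQuotaSketch(100);
    System.out.println(quota.acquireBufferQuota(60));
    System.out.println(quota.acquireBufferQuota(50));
    quota.releaseBufferQuota(50);
    System.out.println(quota.checkBufferQuota());
  }
}
```

Using addAndGet keeps the update and the comparison consistent without a lock, which suits a counter shared by many source threads.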
-