Class ReplicationSourceManager
java.lang.Object
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager
This class is responsible for managing all the replication sources. There are two classes of
sources:
- Normal sources are persistent, with one per peer cluster
- Old sources are recovered from a failed region server; their only goal is to finish replicating the WAL queue that server had
When a region server dies, this class uses a watcher to get notified and tries to grab a lock in order to transfer all the queues into a local old source.
Synchronization specification:
- No need to synchronize on sources. sources is a ConcurrentHashMap and there is a Lock for peer id in PeerProcedureHandlerImpl, so there is no race for peer operations.
- Need to synchronize on walsById. Four methods modify it: addPeer(String), removePeer(String), cleanOldLogs(String, boolean, ReplicationSourceInterface) and postLogRoll(Path). walsById is a ConcurrentHashMap and there is a Lock for peer id in PeerProcedureHandlerImpl, so there is no race between addPeer(String) and removePeer(String). cleanOldLogs(String, boolean, ReplicationSourceInterface) is called by ReplicationSourceInterface, so there is no race with addPeer(String). removePeer(String) terminates the ReplicationSourceInterface first, then removes the wals from walsById, so there is no race with removePeer(String). The only case that needs synchronization is cleanOldLogs(String, boolean, ReplicationSourceInterface) and postLogRoll(Path).
- No need to synchronize on walsByIdRecoveredQueues. Three methods modify it: removePeer(String), cleanOldLogs(String, boolean, ReplicationSourceInterface) and claimQueue(ReplicationQueueId). cleanOldLogs(String, boolean, ReplicationSourceInterface) is called by ReplicationSourceInterface. removePeer(String) terminates the ReplicationSourceInterface first, then removes the wals from walsByIdRecoveredQueues. claimQueue(ReplicationQueueId) adds the wals to walsByIdRecoveredQueues first, then starts up a ReplicationSourceInterface. So there is no race here. claimQueue(ReplicationQueueId) and removePeer(String) already synchronize on oldsources, so there is no need to synchronize on walsByIdRecoveredQueues.
- Need to synchronize on latestPaths so that a newly opened source does not miss a new log.
- Need to synchronize on oldsources to avoid adding a recovered source for a to-be-removed peer.
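The last rule can be illustrated with a minimal sketch. It is not the HBase implementation: the class name, the string-based model of peers and recovered sources, and the boolean return value are all assumptions made for illustration. It only shows why checking the peer and adding the recovered source under the same lock on oldsources prevents a recovered source from outliving its peer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the manager; only the locking pattern is illustrated.
final class OldSourcesSketch {
  private final Set<String> peers = ConcurrentHashMap.newKeySet();
  // Each recovered source is modeled as the string "peerId/deadServer" (an assumption).
  private final List<String> oldsources = new ArrayList<>();

  void addPeer(String peerId) {
    peers.add(peerId);
  }

  // Drop the peer first, then remove its recovered sources under the lock.
  void removePeer(String peerId) {
    peers.remove(peerId);
    synchronized (oldsources) {
      oldsources.removeIf(src -> src.startsWith(peerId + "/"));
    }
  }

  // Returns false when the peer is already gone, so no recovered source is leaked.
  boolean claimQueue(String peerId, String deadServer) {
    synchronized (oldsources) {
      if (!peers.contains(peerId)) {
        return false;
      }
      oldsources.add(peerId + "/" + deadServer);
      return true;
    }
  }

  int recoveredSourceCount() {
    synchronized (oldsources) {
      return oldsources.size();
    }
  }
}
```

Whichever thread wins the lock, the outcome is consistent: either the claim sees the peer missing and backs off, or the source is added and the later removePeer sweep deletes it.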
-
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
private static interface
-
Field Summary
Fields
Modifier and Type | Field | Description
private final UUID
private final org.apache.hadoop.conf.Configuration
private final ThreadPoolExecutor
private final org.apache.hadoop.fs.FileSystem
private final MetricsReplicationGlobalSourceSource
private static final org.slf4j.Logger
private final org.apache.hadoop.fs.Path
private final int
private final org.apache.hadoop.fs.Path
private final List<ReplicationSourceInterface>
private final ReplicationQueueStorage | | Storage for queues that need persistence; e.g.
private final ReplicationPeers
private final Server
private final long
private final long
private final ConcurrentMap<String,ReplicationSourceInterface>
private final SyncReplicationPeerMappingManager
private final long
private AtomicLong
private final WALFactory
private final ConcurrentMap<ReplicationQueueId,Map<String,NavigableSet<String>>>
private final ConcurrentMap<ReplicationQueueId,Map<String,NavigableSet<org.apache.hadoop.fs.Path>>>
-
Constructor Summary
Constructors
Constructor | Description
ReplicationSourceManager(ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, org.apache.hadoop.conf.Configuration conf, Server server, org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path logDir, org.apache.hadoop.fs.Path oldLogDir, UUID clusterId, WALFactory walFactory, SyncReplicationPeerMappingManager syncReplicationPeerMappingManager, MetricsReplicationGlobalSourceSource globalMetrics) | Creates a replication manager and sets the watch on all the other registered region servers
-
Method Summary
Modifier and Type | Method | Description
private void
private void
(package private) boolean | acquireBufferQuota(long size) | Add the size to totalBufferUsed and check if it exceeds totalBufferLimit.
(package private) boolean | acquireWALEntryBufferQuota(WALEntryBatch walEntryBatch, WAL.Entry entry) | Acquire the buffer quota for WAL.Entry which is added to WALEntryBatch.
(package private) int
void | addHFileRefs(TableName tableName, byte[] family, List<Pair<org.apache.hadoop.fs.Path, org.apache.hadoop.fs.Path>> pairs)
void | | Add peer to replicationPeers; add the normal source and related replication queue; add HFile Refs
private void | addRecoveredSource(ReplicationSourceInterface src, ReplicationPeerImpl oldPeer, ReplicationQueueId claimedQueueId, PriorityQueue<org.apache.hadoop.fs.Path> walFiles)
(package private) void | | Add a normal source for the given peer on this region server.
private long | addTotalBufferUsed(long size)
(package private) boolean | checkBufferQuota(String peerId) | Check if totalBufferUsed exceeds totalBufferLimit for peer.
(package private) void | claimQueue(ReplicationQueueId queueId)
(package private) void | claimQueue(ReplicationQueueId queueId, boolean syncUp) | Claim a replication queue.
(package private) void | cleanOldLogs(String log, boolean inclusive, ReplicationSourceInterface source) | Cleans a log file and all older logs from replication queue.
private void | cleanOldLogs(NavigableSet<String> wals, ReplicationSourceInterface source)
void | cleanUpHFileRefs(String peerId, List<String> files)
private ReplicationSourceInterface | createRefreshedSource(ReplicationQueueId queueId, ReplicationPeer peer)
private ReplicationSourceInterface | createSource(ReplicationQueueData queueData, ReplicationPeer replicationPeer)
private void | deleteQueue(ReplicationQueueId queueId) | Delete a complete queue of wals associated with a replication source
void | drainSources(String peerId) | This is used when we transit a sync replication peer to SyncReplicationState.STANDBY.
(package private) void
org.apache.hadoop.fs.FileSystem | getFs() | Get the handle on the local file system
(package private) MetricsReplicationGlobalSourceSource
(package private) Set<org.apache.hadoop.fs.Path>
org.apache.hadoop.fs.Path | | Get the directory where wals are stored by their RSs
org.apache.hadoop.fs.Path | | Get the directory where wals are archived
| | Get a list of all the recovered sources of this rs
(package private) ReplicationQueueStorage
| | Get the ReplicationPeers used by this ReplicationSourceManager
(package private) int
| | Get the normal source for a given peer
| | Get a list of all the normal sources of this rs
| getStats() | Get a string representation of all the sources' metrics
long | | Returns the maximum size in bytes of edits held in memory which are pending replication across all sources inside this RegionServer.
long
private PriorityQueue<org.apache.hadoop.fs.Path> | getWALFilesToReplicate(ServerName sourceRS, boolean syncUp, Map<String, ReplicationGroupOffset> offsets)
| getWALs() | Get a copy of the wals of the normal sources on this rs
(package private) void | init() | Adds a normal source per registered peer cluster.
private void | | Refresh replication source will terminate the old source first, then the source thread will be interrupted.
void | join() | Terminate the replication on this region server
void | logPositionAndCleanOldLogs(ReplicationSourceInterface source, WALEntryBatch entryBatch) | This method will log the current position to storage.
void | postLogRoll(org.apache.hadoop.fs.Path newLog)
void | refreshSources(String peerId) | Close the previous replication sources of this peer id and open new sources to trigger the new replication state changes or new replication config changes.
(package private) void | releaseBufferQuota(long size) | To release the buffer quota which acquired by acquireBufferQuota(long).
(package private) long | releaseWALEntryBatchBufferQuota(WALEntryBatch walEntryBatch) | To release the buffer quota of WALEntryBatch which acquired by acquireWALEntryBufferQuota(org.apache.hadoop.hbase.replication.regionserver.WALEntryBatch, org.apache.hadoop.hbase.wal.WAL.Entry).
void | removePeer(String peerId) | Remove peer for replicationPeers; remove all the recovered sources for the specified id and related replication queues; remove the normal source and related replication queue; remove HFile Refs
private boolean | | Clear the metrics and related replication queue of the specified old source
private void | removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
(package private) void | | Clear the metrics and related replication queue of the specified old source
private boolean | shouldReplicate(ReplicationGroupOffset offset, String wal) | Check whether we should replicate the given wal.
private void
-
Field Details
-
LOG
-
sources
-
oldsources
-
queueStorage
Storage for queues that need persistence, e.g. replication state, so that it can be recovered after a crash. queueStorage upkeep is spread about this class, and the storage is passed to ReplicationSource instances so that they can do updates themselves. Not all ReplicationSource instances keep state.
-
replicationPeers
-
clusterId
-
server
-
walsById
-
walsByIdRecoveredQueues
private final ConcurrentMap<ReplicationQueueId,Map<String,NavigableSet<org.apache.hadoop.fs.Path>>> walsByIdRecoveredQueues
-
syncReplicationPeerMappingManager
-
conf
-
fs
-
latestPaths
-
logDir
-
oldLogDir
-
walFactory
-
sleepBeforeFailover
-
executor
-
totalBufferUsed
-
sleepForRetries
-
maxRetriesMultiplier
-
totalBufferLimit
-
globalMetrics
-
-
Constructor Details
-
ReplicationSourceManager
public ReplicationSourceManager(ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, org.apache.hadoop.conf.Configuration conf, Server server, org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path logDir, org.apache.hadoop.fs.Path oldLogDir, UUID clusterId, WALFactory walFactory, SyncReplicationPeerMappingManager syncReplicationPeerMappingManager, MetricsReplicationGlobalSourceSource globalMetrics) throws IOException
Creates a replication manager and sets the watch on all the other registered region servers
- Parameters:
queueStorage - the interface for manipulating replication queues
conf - the configuration to use
server - the server for this region server
fs - the file system to use
logDir - the directory that contains all wal directories of live RSs
oldLogDir - the directory where old logs are archived
- Throws:
IOException
-
-
Method Details
-
init
Adds a normal source per registered peer cluster.
- Throws:
IOException
-
addPeer
- Add peer to replicationPeers
- Add the normal source and related replication queue
- Add HFile Refs
- Parameters:
peerId - the id of the replication peer
- Throws:
IOException
-
removePeer
- Remove the peer from replicationPeers
- Remove all the recovered sources for the specified id and related replication queues
- Remove the normal source and related replication queue
- Remove HFile Refs
- Parameters:
peerId - the id of the replication peer
-
createSource
private ReplicationSourceInterface createSource(ReplicationQueueData queueData, ReplicationPeer replicationPeer) throws IOException
- Parameters:
queueId - the id of the replication queue to associate the ReplicationSource with.
- Returns:
- a new 'classic' user-space replication source.
- Throws:
IOException
- See Also:
-
for creating a ReplicationSource for meta.
-
addSource
Add a normal source for the given peer on this region server. Meanwhile, add a new replication queue to storage. For the newly added peer, we only need to enqueue the latest log of each wal group and do replication. We add an init parameter to indicate whether this is part of the initialization process. If so, we should skip adding the replication queues, as this may introduce a deadlock between region server start up and the hbase:replication table coming online.
- Parameters:
peerId - the id of the replication peer
init - whether this call is part of the initialization process
- Throws:
IOException
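The "latest log of each wal group" behaviour, together with the rule that latestPaths must be synchronized so a newly opened source does not miss a new log, can be sketched as follows. This is a simplified model under stated assumptions, not the HBase code: wal names and group prefixes are plain strings, a source is just a list of enqueued wal names, and the class and method bodies are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: wal names are strings and a source is a list of enqueued wals.
final class LatestPathsSketch {
  // wal group prefix -> most recently rolled wal of that group
  private final Map<String, String> latestPaths = new HashMap<>();
  // peer id -> wals enqueued for that peer's normal source
  private final Map<String, List<String>> sources = new HashMap<>();

  // Analogue of postLogRoll: remember the new wal and hand it to every existing source.
  void postLogRoll(String walGroup, String newWal) {
    synchronized (latestPaths) {
      latestPaths.put(walGroup, newWal);
      for (List<String> queue : sources.values()) {
        queue.add(newWal);
      }
    }
  }

  // Analogue of addSource: a new source starts from the latest wal of each group.
  void addSource(String peerId) {
    synchronized (latestPaths) {
      sources.put(peerId, new ArrayList<>(latestPaths.values()));
    }
  }

  // Returns a copy of what has been enqueued for the peer so far.
  List<String> enqueued(String peerId) {
    synchronized (latestPaths) {
      return new ArrayList<>(sources.getOrDefault(peerId, new ArrayList<>()));
    }
  }
}
```

Because both methods hold the same lock, a log roll either happens before the source is registered (and shows up as the latest wal) or after (and is appended to the source's queue); it cannot fall in between.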
-
drainSources
This is used when we transit a sync replication peer to SyncReplicationState.STANDBY.
When transiting to SyncReplicationState.STANDBY, we can remove all the pending wal files for a replication peer as we do not need to replicate them any more. This is necessary; otherwise, when we transit back to SyncReplicationState.DOWNGRADE_ACTIVE later, the stale data will be replicated again and cause inconsistency.
See HBASE-20426 for more details.
- Parameters:
peerId - the id of the sync replication peer
- Throws:
IOException
ReplicationException
-
createRefreshedSource
private ReplicationSourceInterface createRefreshedSource(ReplicationQueueId queueId, ReplicationPeer peer) throws IOException, ReplicationException
- Throws:
IOException
ReplicationException
-
refreshSources
Close the previous replication sources of this peer id and open new sources to trigger the new replication state changes or new replication config changes. Here we do not need to change the replication queue storage; we only need to enqueue all logs to the new replication source.
- Parameters:
peerId - the id of the replication peer
- Throws:
ReplicationException
IOException
-
removeRecoveredSource
Clear the metrics and related replication queue of the specified old source
- Parameters:
src - source to clear
-
finishRecoveredSource
-
removeSource
Clear the metrics and related replication queue of the specified old source
- Parameters:
src - source to clear
-
deleteQueue
Delete a complete queue of wals associated with a replication source
- Parameters:
queueId - the id of replication queue to delete
-
interruptOrAbortWhenFail
Refreshing a replication source terminates the old source first, which interrupts the source thread. This interruption needs to be handled instead of aborting the region server.
-
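A minimal sketch of the "interrupt or abort" idea follows. Everything in it is hypothetical: the QueueOperation interface stands in for ReplicationQueueOperation, the aborted flag stands in for aborting the region server, and rethrowing as IllegalStateException is only one possible way to let an interrupted source thread unwind. The page does not specify how the real method distinguishes the two cases.

```java
// Hypothetical names throughout; only the control flow is illustrated.
final class FailureHandlingSketch {
  // Stand-in for a replication queue operation that may fail.
  interface QueueOperation {
    void exec() throws Exception;
  }

  private boolean aborted = false;

  boolean isAborted() {
    return aborted;
  }

  void interruptOrAbortWhenFail(QueueOperation op) {
    try {
      op.exec();
    } catch (Exception e) {
      if (e instanceof InterruptedException || e.getCause() instanceof InterruptedException) {
        // Expected during a refresh: restore the interrupt flag and let the source thread exit.
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Replication source was interrupted", e);
      }
      // Any other failure is treated as fatal; this flag stands in for a server abort.
      aborted = true;
    }
  }
}
```

The design point is that an interruption caused by terminating the old source is a normal event, so it should stop only the affected thread and leave the server running.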
abortWhenFail
-
throwIOExceptionWhenFail
private void throwIOExceptionWhenFail(ReplicationSourceManager.ReplicationQueueOperation op) throws IOException
- Throws:
IOException
-
abortAndThrowIOExceptionWhenFail
private void abortAndThrowIOExceptionWhenFail(ReplicationSourceManager.ReplicationQueueOperation op) throws IOException
- Throws:
IOException
-
logPositionAndCleanOldLogs
This method logs the current position to storage and also cleans old logs from the replication queue.
- Parameters:
source - the replication source
entryBatch - the wal entry batch we just shipped
-
cleanOldLogs
Cleans a log file and all older logs from the replication queue. Called when we are sure that a log file is closed and has no more entries.
- Parameters:
log - Path to the log
inclusive - whether we should also remove the given log file
source - the replication source
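The effect of the inclusive flag can be shown with a small sketch built on NavigableSet.headSet. It assumes that wal names in one group sort lexicographically in age order (here a fixed-width numeric suffix); the class name, the sample wal names, and the returned set of removed names are illustrative and not taken from the HBase implementation.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

final class CleanOldLogsSketch {
  // Removes every wal that sorts before `log`, plus `log` itself when inclusive is true.
  // Returns the names that were removed.
  static NavigableSet<String> cleanOldLogs(NavigableSet<String> wals, String log,
      boolean inclusive) {
    // Copy the head view first so the result survives the removal below.
    NavigableSet<String> removed = new TreeSet<>(wals.headSet(log, inclusive));
    wals.removeAll(removed);
    return removed;
  }

  public static void main(String[] args) {
    NavigableSet<String> wals = new TreeSet<>();
    wals.add("rs1.1000");
    wals.add("rs1.2000");
    wals.add("rs1.3000");
    System.out.println("removed: " + cleanOldLogs(wals, "rs1.2000", false));
    System.out.println("remaining: " + wals);
  }
}
```

With inclusive set to false the given log stays in the queue, which suits a log that may still be read; with true it is dropped together with everything older.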
-
removeRemoteWALs
private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals) throws IOException
- Throws:
IOException
-
postLogRoll
- Throws:
IOException
-
shouldReplicate
Check whether we should replicate the given wal.
- Parameters:
wal - the file name of the wal
- Returns:
- true means we should replicate the given wal, otherwise false.
-
claimQueue
-
getWALFilesToReplicate
private PriorityQueue<org.apache.hadoop.fs.Path> getWALFilesToReplicate(ServerName sourceRS, boolean syncUp, Map<String, ReplicationGroupOffset> offsets) throws IOException
- Throws:
IOException
-
addRecoveredSource
private void addRecoveredSource(ReplicationSourceInterface src, ReplicationPeerImpl oldPeer, ReplicationQueueId claimedQueueId, PriorityQueue<org.apache.hadoop.fs.Path> walFiles) -
claimQueue
Claim a replication queue. We add a flag to indicate whether we are called by ReplicationSyncUp. For a normal queue-claiming operation, we are the last step of an SCP, so we can assume that all the WAL files are under the oldWALs directory. But for ReplicationSyncUp, we may want to claim the replication queue for a region server which has not been processed by SCP yet, so we still need to look at its WALs directory.
- Parameters:
queueId - the replication queue id we want to claim
syncUp - whether we are called by ReplicationSyncUp
-
join
Terminate the replication on this region server -
getWALs
Get a copy of the wals of the normal sources on this rs
- Returns:
- a sorted set of wal names
-
getSources
Get a list of all the normal sources of this rs
- Returns:
- list of all normal sources
-
getOldSources
Get a list of all the recovered sources of this rs
- Returns:
- list of all recovered sources
-
getSource
Get the normal source for a given peer
- Returns:
- the normal source for the given peer if it exists, otherwise null.
-
getSizeOfLatestPath
int getSizeOfLatestPath() -
getLastestPath
Set<org.apache.hadoop.fs.Path> getLastestPath() -
getTotalBufferUsed
-
getTotalBufferLimit
Returns the maximum size in bytes of edits held in memory which are pending replication across all sources inside this RegionServer. -
getOldLogDir
Get the directory where wals are archived
- Returns:
- the directory where wals are archived
-
getLogDir
Get the directory where wals are stored by their RSs
- Returns:
- the directory where wals are stored by their RSs
-
getFs
Get the handle on the local file system
- Returns:
- Handle on the local file system
-
getReplicationPeers
Get the ReplicationPeers used by this ReplicationSourceManager
- Returns:
- the ReplicationPeers used by this ReplicationSourceManager
-
getStats
Get a string representation of all the sources' metrics -
addHFileRefs
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<org.apache.hadoop.fs.Path, org.apache.hadoop.fs.Path>> pairs) throws IOException
- Throws:
IOException
-
cleanUpHFileRefs
-
activeFailoverTaskCount
int activeFailoverTaskCount() -
getGlobalMetrics
-
getQueueStorage
-
acquireWALEntryBufferQuota
Acquire the buffer quota for a WAL.Entry which is added to a WALEntryBatch.
- Parameters:
entry - the wal entry which is added to the WALEntryBatch and should acquire buffer quota.
- Returns:
- true if we should clear buffer and push all
-
releaseWALEntryBatchBufferQuota
Release the buffer quota of a WALEntryBatch that was acquired by acquireWALEntryBufferQuota(org.apache.hadoop.hbase.replication.regionserver.WALEntryBatch, org.apache.hadoop.hbase.wal.WAL.Entry).
- Returns:
- the released buffer quota size.
-
acquireBufferQuota
Add the size to totalBufferUsed and check if it exceeds totalBufferLimit.
- Returns:
- true if totalBufferUsed exceeds totalBufferLimit, in which case we should stop increasing the buffer and ship all.
-
releaseBufferQuota
Release the buffer quota that was acquired by acquireBufferQuota(long).
-
addTotalBufferUsed
-
checkBufferQuota
Check if totalBufferUsed exceeds totalBufferLimit for peer.
- Returns:
- true if totalBufferUsed is not more than totalBufferLimit.
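The acquire, release and check operations described above can be sketched around a single AtomicLong counter. This is an illustration under assumptions rather than the HBase code: the class name and constructor are hypothetical, the peerId argument of checkBufferQuota is omitted, and the boundary handling simply follows the wording on this page ("exceeds" for acquire, "not more than" for check).

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical holder for the shared counter; not the real ReplicationSourceManager.
final class BufferQuotaSketch {
  private final AtomicLong totalBufferUsed = new AtomicLong();
  private final long totalBufferLimit;

  BufferQuotaSketch(long totalBufferLimit) {
    this.totalBufferLimit = totalBufferLimit;
  }

  // Adds size to the running total; true means the limit is exceeded and the caller should ship.
  boolean acquireBufferQuota(long size) {
    if (size < 0) {
      throw new IllegalArgumentException("size should not be negative");
    }
    return totalBufferUsed.addAndGet(size) > totalBufferLimit;
  }

  // Gives back quota that was previously acquired.
  void releaseBufferQuota(long size) {
    if (size < 0) {
      throw new IllegalArgumentException("size should not be negative");
    }
    totalBufferUsed.addAndGet(-size);
  }

  // True while usage is not more than the limit, so reading may continue.
  boolean checkBufferQuota() {
    return totalBufferUsed.get() <= totalBufferLimit;
  }

  long getTotalBufferUsed() {
    return totalBufferUsed.get();
  }
}
```

A reader would typically call checkBufferQuota before reading more entries, acquireBufferQuota as each entry is batched, and releaseBufferQuota once the batch has shipped, so the counter reflects only edits still held in memory.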
-