Class ReplicationSourceManager
java.lang.Object
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager
This class is responsible for managing all the replication sources. There are two classes of sources:
- Normal sources are persistent and one per peer cluster
- Old sources are recovered from a failed region server and our only goal is to finish replicating the WAL queue it had
When a region server dies, this class uses a watcher to get notified, and it tries to grab a lock in order to transfer all the queues into a local old source.
Synchronization specification:
- No synchronization is needed on sources. sources is a ConcurrentHashMap and there is a Lock for each peer id in PeerProcedureHandlerImpl, so there is no race between peer operations.
- Synchronization is needed on walsById. Four methods modify it: addPeer(String), removePeer(String), cleanOldLogs(NavigableSet, String, boolean, String) and preLogRoll(Path). walsById is a ConcurrentHashMap and there is a Lock for each peer id in PeerProcedureHandlerImpl, so there is no race between addPeer(String) and removePeer(String). cleanOldLogs(NavigableSet, String, boolean, String) is called by the ReplicationSourceInterface, so it cannot race with addPeer(String). removePeer(String) terminates the ReplicationSourceInterface first and then removes the wals from walsById, so it cannot race with removePeer(String) either. The only case that needs synchronization is cleanOldLogs(NavigableSet, String, boolean, String) against preLogRoll(Path).
- No synchronization is needed on walsByIdRecoveredQueues. Three methods modify it: removePeer(String), cleanOldLogs(NavigableSet, String, boolean, String) and claimQueue(ServerName, String). cleanOldLogs(NavigableSet, String, boolean, String) is called by the ReplicationSourceInterface. removePeer(String) terminates the ReplicationSourceInterface first and then removes the wals from walsByIdRecoveredQueues. claimQueue(ServerName, String) adds the wals to walsByIdRecoveredQueues first and then starts up a ReplicationSourceInterface. So there is no race here. claimQueue(ServerName, String) and removePeer(String) already synchronize on oldsources, so no synchronization on walsByIdRecoveredQueues is needed.
- Synchronization is needed on latestPaths so that a newly opened source does not miss a new log.
- Synchronization is needed on oldsources to avoid adding a recovered source for a peer that is about to be removed.
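The walsById rule above (only the cleaner and the log-roll path need a common lock) can be modeled in a few lines. This is a minimal sketch, not the HBase implementation: WalTracker and its methods addQueue, onLogRoll and cleanOlderThan are hypothetical stand-ins for addPeer, preLogRoll and cleanOldLogs, and it assumes wal names sort in creation order. Both mutating paths lock the same monitor, so a log roll cannot add a wal to a set while a cleaner is trimming it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of the walsById locking rule; not the HBase implementation.
class WalTracker {
  // queueId -> wal group -> wal names (assumed to sort in creation order)
  final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsById =
      new ConcurrentHashMap<>();

  // In this sketch, queue registration relies only on putIfAbsent being atomic.
  void addQueue(String queueId) {
    walsById.putIfAbsent(queueId, new HashMap<>());
  }

  // Stand-in for preLogRoll: record the new wal in every tracked queue.
  void onLogRoll(String walGroup, String newWal) {
    synchronized (walsById) {
      for (Map<String, NavigableSet<String>> groups : walsById.values()) {
        groups.computeIfAbsent(walGroup, g -> new TreeSet<>()).add(newWal);
      }
    }
  }

  // Stand-in for cleanOldLogs: drop every wal strictly older than keepFrom.
  void cleanOlderThan(String queueId, String walGroup, String keepFrom) {
    synchronized (walsById) {
      Map<String, NavigableSet<String>> groups = walsById.get(queueId);
      NavigableSet<String> wals = groups == null ? null : groups.get(walGroup);
      if (wals != null) {
        wals.headSet(keepFrom, false).clear();
      }
    }
  }
}
```

For example, after two log rolls add wal.1 and wal.2 to group g1, cleanOlderThan("peer1", "g1", "wal.2") leaves only wal.2 in that queue.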
-
Nested Class Summary
Nested Classes
- private static interface ReplicationSourceManager.ReplicationQueueOperation
-
Field Summary
Fields
- (package private) AtomicReference<ReplicationSourceInterface> catalogReplicationSource: A special ReplicationSource for hbase:meta Region Read Replicas.
- private final UUID clusterId
- private final org.apache.hadoop.conf.Configuration conf
- private final ThreadPoolExecutor executor
- private final org.apache.hadoop.fs.FileSystem fs
- private final MetricsReplicationGlobalSourceSource globalMetrics
- private static final org.slf4j.Logger LOG
- private final org.apache.hadoop.fs.Path logDir
- private final org.apache.hadoop.fs.Path oldLogDir
- private final List<ReplicationSourceInterface> oldsources
- private final ReplicationQueueStorage queueStorage: Storage for queues that need persistence, e.g. replication state, so it can be recovered after a crash.
- private final boolean replicationForBulkLoadDataEnabled
- private final ReplicationPeers replicationPeers
- private final Server server
- private final long sleepBeforeFailover
- private final ConcurrentMap<String, ReplicationSourceInterface> sources
- private final long totalBufferLimit
- private AtomicLong totalBufferUsed
- private final WALFactory walFactory
- private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsById
- private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues
-
Constructor Summary
Constructors
- ReplicationSourceManager(ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, org.apache.hadoop.conf.Configuration conf, Server server, org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path logDir, org.apache.hadoop.fs.Path oldLogDir, UUID clusterId, WALFactory walFactory, MetricsReplicationGlobalSourceSource globalMetrics): Creates a replication manager and sets the watch on all the other registered region servers.
-
Method Summary
Methods
- private void abortAndThrowIOExceptionWhenFail(ReplicationSourceManager.ReplicationQueueOperation op)
- private void abortWhenFail
- (package private) boolean acquireBufferQuota(long size): Add the size to totalBufferUsed and check if it exceeds totalBufferLimit.
- (package private) boolean acquireWALEntryBufferQuota(WALEntryBatch walEntryBatch, WAL.Entry entry): Acquire the buffer quota for a WAL.Entry which is added to a WALEntryBatch.
- (package private) int activeFailoverTaskCount()
- ReplicationSourceInterface addCatalogReplicationSource(RegionInfo regionInfo): Add an hbase:meta Catalog replication source.
- void addHFileRefs(TableName tableName, byte[] family, List<Pair<org.apache.hadoop.fs.Path, org.apache.hadoop.fs.Path>> pairs)
- void addPeer(String peerId): Add the peer, its normal source and related replication queue, and its HFile refs.
- (package private) ReplicationSourceInterface addSource(String peerId): Add a normal source for the given peer on this region server.
- private long addTotalBufferUsed(long size)
- (package private) boolean checkBufferQuota(String peerId): Check if totalBufferUsed exceeds totalBufferLimit for the peer.
- (package private) void claimQueue(ServerName deadRS, String queue)
- (package private) void cleanOldLogs(String log, boolean inclusive, String queueId, boolean queueRecovered): Cleans a log file and all older logs from the replication queue.
- private void cleanOldLogs(NavigableSet<String> wals, String key, boolean inclusive, String id)
- void cleanUpHFileRefs(String peerId, List<String> files)
- private ReplicationSourceInterface createCatalogReplicationSource(RegionInfo regionInfo): Create, initialize, and start the Catalog ReplicationSource.
- private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer)
- private void deleteQueue(String queueId): Delete a complete queue of wals associated with a replication source.
- org.apache.hadoop.fs.FileSystem getFs(): Get the handle on the local file system.
- (package private) MetricsReplicationGlobalSourceSource getGlobalMetrics()
- (package private) Set<org.apache.hadoop.fs.Path> getLastestPath()
- org.apache.hadoop.fs.Path getLogDir(): Get the directory where wals are stored by their RSs.
- org.apache.hadoop.fs.Path getOldLogDir(): Get the directory where wals are archived.
- getOldSources(): Get a list of all the recovered sources of this rs.
- (package private) ReplicationQueueStorage getQueueStorage()
- getReplicationPeers(): Get the ReplicationPeers used by this ReplicationSourceManager.
- (package private) int getSizeOfLatestPath()
- getSource: Get the normal source for a given peer.
- getSources(): Get a list of all the normal sources of this rs.
- getStats(): Get a string representation of all the sources' metrics.
- long getTotalBufferLimit(): Returns the maximum size in bytes of edits held in memory which are pending replication across all sources inside this RegionServer.
- long getTotalBufferUsed()
- getWALs(): Get a copy of the wals of the normal sources on this rs.
- getWalsByIdRecoveredQueues(): Get a copy of the wals of the recovered sources on this rs.
- (package private) void init(): Adds a normal source per registered peer cluster.
- private void interruptOrAbortWhenFail: Refreshing a replication source terminates the old source first, which interrupts the source thread.
- void join(): Terminate the replication on this region server.
- void logPositionAndCleanOldLogs(ReplicationSourceInterface source, WALEntryBatch entryBatch): Log the current position to storage.
- void postLogRoll(org.apache.hadoop.fs.Path newLog)
- void preLogRoll(org.apache.hadoop.fs.Path newLog)
- void refreshSources(String peerId): Close the previous replication sources of this peer id and open new sources to trigger the new replication state changes or new replication config changes.
- (package private) void releaseBufferQuota(long size): Release the buffer quota acquired by acquireBufferQuota(long).
- (package private) long releaseWALEntryBatchBufferQuota(WALEntryBatch walEntryBatch): Release the buffer quota of a WALEntryBatch that was acquired by acquireWALEntryBufferQuota(WALEntryBatch, WAL.Entry).
- void removeCatalogReplicationSource(RegionInfo regionInfo): Remove the hbase:meta Catalog replication source.
- void removePeer(String peerId): Remove the peer, its recovered and normal sources with their replication queues, and its HFile refs.
- (package private) void removeRecoveredSource: Clear the metrics and related replication queue of the specified old source.
- (package private) void removeSource: Clear the metrics and related replication queue of the specified normal source.
- private void throwIOExceptionWhenFail(ReplicationSourceManager.ReplicationQueueOperation op)
-
Field Details
-
LOG
-
sources
-
oldsources
-
queueStorage
Storage for queues that need persistence, e.g. replication state, so it can be recovered after a crash. Upkeep of queueStorage is spread across this class; queueStorage is also passed to ReplicationSource instances so they can do updates themselves. Not all ReplicationSource instances keep state. -
replicationPeers
-
clusterId
-
server
-
walsById
-
walsByIdRecoveredQueues
-
conf
-
fs
-
latestPaths
-
logDir
-
oldLogDir
-
walFactory
-
sleepBeforeFailover
-
executor
-
replicationForBulkLoadDataEnabled
-
totalBufferUsed
-
totalBufferLimit
-
globalMetrics
-
catalogReplicationSource
A special ReplicationSource for hbase:meta Region Read Replicas. Usually this reference remains empty. If an hbase:meta Region is opened on this server, we create an instance of an hbase:meta CatalogReplicationSource, and it lives for the life of the Server thereafter; i.e. we do not shut it down even if hbase:meta moves away from this server (in case it later gets moved back). We synchronize on this instance while testing for presence and, if absent, while creating it, so that the source is only created and started once.
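The create-once rule for catalogReplicationSource amounts to a check-then-create performed under one lock. The following is a hypothetical generic holder (CatalogSourceHolder, getOrCreate and the creation counter are illustrative names, not HBase API); it only shows why synchronizing on the AtomicReference makes the presence test and the creation a single atomic step, so the factory runs at most once.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical sketch of "create and start once" under a lock; not HBase code.
class CatalogSourceHolder<S> {
  private final AtomicReference<S> ref = new AtomicReference<>();
  private int creations = 0; // guarded by ref's monitor; counts factory calls

  S getOrCreate(Supplier<S> factory) {
    synchronized (ref) {
      S existing = ref.get();
      if (existing != null) {
        return existing; // reuse: the source lives for the life of the server
      }
      S created = factory.get(); // create (and, in the real class, start) once
      creations++;
      ref.set(created);
      return created;
    }
  }

  int creations() {
    synchronized (ref) {
      return creations;
    }
  }
}
```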
-
-
Constructor Details
-
ReplicationSourceManager
public ReplicationSourceManager(ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, org.apache.hadoop.conf.Configuration conf, Server server, org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path logDir, org.apache.hadoop.fs.Path oldLogDir, UUID clusterId, WALFactory walFactory, MetricsReplicationGlobalSourceSource globalMetrics) throws IOException
Creates a replication manager and sets the watch on all the other registered region servers.
- Parameters:
queueStorage - the interface for manipulating replication queues
conf - the configuration to use
server - the server for this region server
fs - the file system to use
logDir - the directory that contains all wal directories of live RSs
oldLogDir - the directory where old logs are archived
- Throws:
IOException
-
-
Method Details
-
init
Adds a normal source per registered peer cluster.
- Throws:
IOException
-
addPeer
1. Add peer to replicationPeers
2. Add the normal source and related replication queue
3. Add HFile refs
- Parameters:
peerId - the id of the replication peer
- Throws:
IOException
-
removePeer
1. Remove peer from replicationPeers
2. Remove all the recovered sources for the specified id and related replication queues
3. Remove the normal source and related replication queue
4. Remove HFile refs
- Parameters:
peerId - the id of the replication peer
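The ordering in removePeer (terminate each source before dropping its wals, and handle recovered sources under the oldsources lock) can be modeled as below. This is a hypothetical sketch: PeerRemovalSketch and its Source class are illustrative, the wal maps are simplified to lists, and steps 1 and 4 (replicationPeers and HFile refs) are left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the removePeer ordering; not the HBase implementation.
class PeerRemovalSketch {
  static final class Source {
    final String peerId;
    final String queueId;
    volatile boolean running = true;

    Source(String peerId, String queueId) {
      this.peerId = peerId;
      this.queueId = queueId;
    }

    void terminate() {
      running = false;
    }
  }

  final Map<String, Source> sources = new ConcurrentHashMap<>(); // peerId -> normal source
  final List<Source> oldsources = new ArrayList<>();             // recovered sources
  final Map<String, List<String>> walsById = new ConcurrentHashMap<>();
  final Map<String, List<String>> walsByIdRecoveredQueues = new ConcurrentHashMap<>();

  void removePeer(String peerId) {
    // Step 1 (removing the peer from replicationPeers) is out of scope here.
    // Step 2: recovered sources, under the oldsources lock.
    synchronized (oldsources) {
      for (Iterator<Source> it = oldsources.iterator(); it.hasNext(); ) {
        Source src = it.next();
        if (src.peerId.equals(peerId)) {
          src.terminate();                             // stop the source first...
          walsByIdRecoveredQueues.remove(src.queueId); // ...then drop its wals
          it.remove();
        }
      }
    }
    // Step 3: the normal source, in the same terminate-then-remove order.
    Source src = sources.remove(peerId);
    if (src != null) {
      src.terminate();
      walsById.remove(src.queueId);
    }
    // Step 4 (removing HFile refs) is also out of scope.
  }
}
```

Because a terminated source no longer calls cleanOldLogs, removing its wals afterwards cannot race with that source, which is the argument the synchronization specification makes.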
-
createSource
private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer) throws IOException
- Parameters:
queueId - the id of the replication queue to associate the ReplicationSource with.
- Returns:
- a new 'classic' user-space replication source.
- Throws:
IOException
-
addSource
Add a normal source for the given peer on this region server. Meanwhile, add a new replication queue to storage. For a newly added peer, we only need to enqueue the latest log of each wal group and start replicating.
- Parameters:
peerId - the id of the replication peer
- Returns:
- the source that was created
- Throws:
IOException
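Seeding a newly added peer with only the latest log of each wal group can be sketched as follows. This is a minimal sketch assuming the latest wal of each group is available as a map from group name to wal name (a stand-in for whatever latestPaths holds); NewPeerQueueSketch and seedQueue are hypothetical names, and writing the queue to storage is not modeled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical sketch: seed a new peer's queue with only the latest wal per group.
class NewPeerQueueSketch {
  // latestWalPerGroup: wal group -> name of the most recent wal in that group (assumed shape)
  static Map<String, NavigableSet<String>> seedQueue(Map<String, String> latestWalPerGroup) {
    Map<String, NavigableSet<String>> queue = new HashMap<>();
    for (Map.Entry<String, String> e : latestWalPerGroup.entrySet()) {
      NavigableSet<String> wals = new TreeSet<>();
      wals.add(e.getValue()); // older wals predate the peer, so they are not enqueued
      queue.put(e.getKey(), wals);
    }
    return queue;
  }
}
```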
-
refreshSources
Close the previous replication sources of this peer id and open new sources to trigger the new replication state changes or new replication config changes. The replication queue storage does not need to change here; we only need to enqueue all logs to the new replication source.
- Parameters:
peerId - the id of the replication peer
- Throws:
IOException
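A refresh that leaves queue storage alone and re-enqueues every tracked wal into a fresh source can be sketched like this. It is hypothetical: RefreshSketch and its Source class are illustrative, it assumes a normal source's queue id equals its peer id, and it takes the walsById monitor while reading so that a concurrent cleaner (as in the synchronization specification) cannot trim the sets mid-iteration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of refreshing one peer's source; not the HBase implementation.
class RefreshSketch {
  static final class Source {
    final List<String> queue = new ArrayList<>();
    volatile boolean terminated;

    void enqueueLog(String wal) {
      queue.add(wal);
    }

    void terminate() {
      terminated = true;
    }
  }

  final Map<String, Source> sources = new ConcurrentHashMap<>();
  // queueId (assumed equal to peerId for a normal source) -> wal group -> tracked wals
  final Map<String, Map<String, NavigableSet<String>>> walsById = new ConcurrentHashMap<>();

  Source refreshSources(String peerId) {
    Source old = sources.remove(peerId);
    if (old != null) {
      old.terminate(); // close the previous source first
    }
    Source fresh = new Source();
    // Queue storage is left untouched; every tracked wal is re-enqueued instead.
    synchronized (walsById) {
      for (NavigableSet<String> wals : walsById.getOrDefault(peerId, Map.of()).values()) {
        wals.forEach(fresh::enqueueLog);
      }
    }
    sources.put(peerId, fresh);
    return fresh;
  }
}
```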
-
removeRecoveredSource
Clear the metrics and related replication queue of the specified old source
- Parameters:
src - source to clear
-
removeSource
Clear the metrics and related replication queue of the specified normal source
- Parameters:
src - source to clear
-
deleteQueue
Delete a complete queue of wals associated with a replication source
- Parameters:
queueId - the id of the replication queue to delete
-
interruptOrAbortWhenFail
Refreshing a replication source terminates the old source first, which interrupts the source thread. That interruption must be handled here instead of aborting the region server. -
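One way to model the interrupt-versus-abort decision that interruptOrAbortWhenFail describes is shown below. It is a hypothetical sketch: QueueOperation and Aborter stand in for ReplicationSourceManager.ReplicationQueueOperation and the server's abort hook, whose exact signatures are not shown on this page, and treating any InterruptedException in the cause chain as "the source is being terminated" is an assumption. Instead of aborting, the sketch rethrows an unchecked exception so the interrupted source thread can unwind.

```java
// Hypothetical sketch of the interrupt-versus-abort decision; not the HBase implementation.
class FailureHandlingSketch {
  @FunctionalInterface
  interface QueueOperation { // stand-in for ReplicationQueueOperation
    void exec() throws Exception;
  }

  interface Aborter { // stand-in for the region server's abort hook
    void abort(String why, Throwable cause);
  }

  static void interruptOrAbortWhenFail(QueueOperation op, Aborter server) {
    try {
      op.exec();
    } catch (Exception e) {
      if (causedByInterrupt(e)) {
        // The source is being terminated (for example by a refresh): unwind the
        // source thread with an unchecked exception instead of killing the server.
        throw new IllegalStateException("Interrupted; the replication source may be terminated", e);
      }
      server.abort("Failed to operate on replication queue", e);
    }
  }

  // Walks the cause chain looking for an InterruptedException.
  static boolean causedByInterrupt(Throwable t) {
    for (Throwable c = t; c != null; c = c.getCause()) {
      if (c instanceof InterruptedException) {
        return true;
      }
    }
    return false;
  }
}
```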
abortWhenFail
-
throwIOExceptionWhenFail
private void throwIOExceptionWhenFail(ReplicationSourceManager.ReplicationQueueOperation op) throws IOException
- Throws:
IOException
-
abortAndThrowIOExceptionWhenFail
private void abortAndThrowIOExceptionWhenFail(ReplicationSourceManager.ReplicationQueueOperation op) throws IOException
- Throws:
IOException
-
logPositionAndCleanOldLogs
Logs the current position to storage and also cleans old logs from the replication queue.
- Parameters:
entryBatch - the wal entry batch we just shipped
-
cleanOldLogs
Cleans a log file and all older logs from the replication queue. Called when we are sure that a log file is closed and has no more entries.
- Parameters:
log - path to the log
inclusive - whether we should also remove the given log file
queueId - id of the replication queue
queueRecovered - whether this is a recovered queue
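"Clean a log and all older logs" maps naturally onto java.util.NavigableSet.headSet(toElement, inclusive), which returns a live view of the smaller elements. The following is a minimal sketch for a single wal group, assuming wal names sort in creation order; CleanOldLogsSketch is a hypothetical name, and the removed names are returned because a real implementation would also have to drop them from queue storage, which is not modeled here.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical sketch of "clean a log and all older logs" for one wal group.
class CleanOldLogsSketch {
  /** Removes and returns every wal older than log (and log itself if inclusive). */
  static NavigableSet<String> cleanOldLogs(NavigableSet<String> wals, String log, boolean inclusive) {
    NavigableSet<String> view = wals.headSet(log, inclusive); // live view backed by wals
    NavigableSet<String> removed = new TreeSet<>(view);       // copy before clearing
    view.clear();                                             // also removes them from wals
    return removed;
  }
}
```

With wals {wal.1, wal.2, wal.3}, cleaning wal.2 inclusively removes wal.1 and wal.2 and leaves wal.3; with inclusive set to false, only wal.1 is removed.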
-
preLogRoll
- Throws:
IOException
-
postLogRoll
- Throws:
IOException
-
claimQueue
-
join
Terminate the replication on this region server -
getWALs
Get a copy of the wals of the normal sources on this rs
- Returns:
- a sorted set of wal names
-
getWalsByIdRecoveredQueues
Get a copy of the wals of the recovered sources on this rs
- Returns:
- a sorted set of wal names
-
getSources
Get a list of all the normal sources of this rs
- Returns:
- list of all normal sources
-
getOldSources
Get a list of all the recovered sources of this rs
- Returns:
- list of all recovered sources
-
getSource
Get the normal source for a given peer
- Returns:
- the normal source for the given peer if it exists, otherwise null.
-
getAllQueues
- Throws:
IOException
-
getSizeOfLatestPath
int getSizeOfLatestPath() -
getLastestPath
Set<org.apache.hadoop.fs.Path> getLastestPath() -
getTotalBufferUsed
-
getTotalBufferLimit
Returns the maximum size in bytes of edits held in memory which are pending replication across all sources inside this RegionServer. -
getOldLogDir
Get the directory where wals are archived
- Returns:
- the directory where wals are archived
-
getLogDir
Get the directory where wals are stored by their RSs
- Returns:
- the directory where wals are stored by their RSs
-
getFs
Get the handle on the local file system
- Returns:
- Handle on the local file system
-
getReplicationPeers
Get the ReplicationPeers used by this ReplicationSourceManager
- Returns:
- the ReplicationPeers used by this ReplicationSourceManager
-
getStats
Get a string representation of all the sources' metrics -
addHFileRefs
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<org.apache.hadoop.fs.Path, org.apache.hadoop.fs.Path>> pairs) throws IOException
- Throws:
IOException
-
cleanUpHFileRefs
-
activeFailoverTaskCount
int activeFailoverTaskCount() -
getGlobalMetrics
-
addCatalogReplicationSource
public ReplicationSourceInterface addCatalogReplicationSource(RegionInfo regionInfo) throws IOException
Add an hbase:meta Catalog replication source. Called on open of an hbase:meta Region. The source is created once only; if it already exists, the existing one is used. -
removeCatalogReplicationSource
Remove the hbase:meta Catalog replication source. Called when we close hbase:meta. -
createCatalogReplicationSource
private ReplicationSourceInterface createCatalogReplicationSource(RegionInfo regionInfo) throws IOException
Create, initialize, and start the Catalog ReplicationSource. Presumes it is called only once (the caller must ensure this). This ReplicationSource is NOT created via ReplicationSourceFactory. -
getQueueStorage
-
acquireWALEntryBufferQuota
Acquire the buffer quota for a WAL.Entry which is added to a WALEntryBatch.
- Parameters:
entry - the wal entry which is added to a WALEntryBatch and should acquire buffer quota.
- Returns:
- true if we should clear the buffer and push all
-
releaseWALEntryBatchBufferQuota
Release the buffer quota of a WALEntryBatch that was acquired by acquireWALEntryBufferQuota(org.apache.hadoop.hbase.replication.regionserver.WALEntryBatch, org.apache.hadoop.hbase.wal.WAL.Entry).
- Returns:
- the released buffer quota size.
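The pairing of acquireWALEntryBufferQuota and releaseWALEntryBatchBufferQuota can be sketched as per-batch accounting against one shared counter. This assumes each batch remembers how many bytes it has charged to totalBufferUsed and that an entry's size is passed in as a long; BatchQuotaSketch, Batch and usedBufferSize are hypothetical names, and the "ship now" threshold uses a strict "exceeds" comparison as the acquireBufferQuota description states.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of per-batch quota accounting against a shared limit.
class BatchQuotaSketch {
  static final class Batch {
    long usedBufferSize; // bytes this batch has charged to the shared counter
  }

  final AtomicLong totalBufferUsed = new AtomicLong();
  final long totalBufferLimit;

  BatchQuotaSketch(long totalBufferLimit) {
    this.totalBufferLimit = totalBufferLimit;
  }

  /** Charges an entry to the batch and the shared counter; true means "ship now". */
  boolean acquireWALEntryBufferQuota(Batch batch, long entrySize) {
    batch.usedBufferSize += entrySize;
    return totalBufferUsed.addAndGet(entrySize) > totalBufferLimit; // exceeds the limit
  }

  /** Returns everything the batch charged to the shared counter; yields the released size. */
  long releaseWALEntryBatchBufferQuota(Batch batch) {
    long size = batch.usedBufferSize;
    if (size > 0) {
      totalBufferUsed.addAndGet(-size);
      batch.usedBufferSize = 0;
    }
    return size;
  }
}
```

Releasing per batch rather than per entry means a shipped batch gives back its whole charge in one step, which is why the release method returns the released size.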
-
acquireBufferQuota
Add the size to totalBufferUsed and check if it exceeds totalBufferLimit.
- Returns:
- true if totalBufferUsed exceeds totalBufferLimit, in which case we should stop increasing the buffer and ship all.
-
releaseBufferQuota
Release the buffer quota acquired by acquireBufferQuota(long). -
addTotalBufferUsed
-
checkBufferQuota
Check if totalBufferUsed exceeds totalBufferLimit for the peer.
- Returns:
- true if totalBufferUsed is not more than totalBufferLimit.
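Note the opposite polarity of the size-only helpers: acquireBufferQuota returns true when the limit is exceeded, while checkBufferQuota returns true while there is still room. The sketch below is hypothetical (BufferQuotaSketch and used() are illustrative names), backs totalBufferUsed with an AtomicLong as the field summary lists, and drops the peerId parameter of checkBufferQuota because its role is not described on this page.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the size-only quota helpers; not the HBase implementation.
class BufferQuotaSketch {
  private final AtomicLong totalBufferUsed = new AtomicLong();
  private final long totalBufferLimit;

  BufferQuotaSketch(long totalBufferLimit) {
    this.totalBufferLimit = totalBufferLimit;
  }

  /** Adds size; true means the limit is now exceeded and callers should ship. */
  boolean acquireBufferQuota(long size) {
    return totalBufferUsed.addAndGet(size) > totalBufferLimit;
  }

  /** Gives back a size previously acquired. */
  void releaseBufferQuota(long size) {
    totalBufferUsed.addAndGet(-size);
  }

  /** Opposite polarity: true means there is still room (used is not more than limit). */
  boolean checkBufferQuota() {
    return totalBufferUsed.get() <= totalBufferLimit;
  }

  long used() {
    return totalBufferUsed.get();
  }
}
```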
-