@InterfaceAudience.Private public class ReplicationSourceManager extends Object
When a region server dies, this class uses a watcher to get notified, and it tries to grab a lock in order to transfer all of the dead server's queues into a local old source.
Synchronization specification:
- sources: No synchronization is needed on sources. It is a ConcurrentHashMap and there is a Lock per peer id in PeerProcedureHandlerImpl, so there is no race between peer operations.
- walsById: Synchronization is needed on walsById. Four methods modify it: addPeer(String), removePeer(String), cleanOldLogs(String, boolean, ReplicationSourceInterface) and postLogRoll(Path). walsById is a ConcurrentHashMap and there is a Lock per peer id in PeerProcedureHandlerImpl, so there is no race between addPeer(String) and removePeer(String). cleanOldLogs(String, boolean, ReplicationSourceInterface) is called by the ReplicationSourceInterface, so it cannot race with addPeer(String). removePeer(String) first terminates the ReplicationSourceInterface and then removes the wals from walsById, so cleanOldLogs cannot race with removePeer(String) either. The only case that needs synchronization is cleanOldLogs(String, boolean, ReplicationSourceInterface) against postLogRoll(Path).
- walsByIdRecoveredQueues: No synchronization is needed on walsByIdRecoveredQueues. Three methods modify it: removePeer(String), cleanOldLogs(String, boolean, ReplicationSourceInterface) and claimQueue(ReplicationQueueId). cleanOldLogs(String, boolean, ReplicationSourceInterface) is called by the ReplicationSourceInterface. removePeer(String) first terminates the ReplicationSourceInterface and then removes the wals from walsByIdRecoveredQueues, and claimQueue(ReplicationQueueId) first adds the wals to walsByIdRecoveredQueues and then starts up a ReplicationSourceInterface, so there is no race here. claimQueue(ReplicationQueueId) and removePeer(String) already synchronize on oldsources, so there is no need to also synchronize on walsByIdRecoveredQueues.
- latestPaths: Synchronization is needed on latestPaths so that a newly opened source does not miss a new log.
- oldsources: Synchronization is needed on oldsources to avoid adding a recovered source for a peer that is about to be removed.

Modifier and Type | Class and Description |
---|---|
private static interface | ReplicationSourceManager.ReplicationQueueOperation |
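The oldsources rule in the synchronization specification can be illustrated with a minimal, single-JVM sketch. RecoveredSourcesSketch, its string-encoded recovered sources and its peer set are hypothetical stand-ins for oldsources, ReplicationSourceInterface and ReplicationPeers. The sketch assumes that claimQueue re-checks that the peer still exists while holding the same lock that removePeer holds, so a claim can never add a recovered source for a peer that removePeer has already dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a recovered source is represented as a "peerId/deadServer" string.
class RecoveredSourcesSketch {
  // Guarded by its own monitor, like oldsources in the specification.
  private final List<String> oldsources = new ArrayList<>();
  // Hypothetical stand-in for replicationPeers.
  private final Set<String> peers = ConcurrentHashMap.newKeySet();

  void addPeer(String peerId) {
    peers.add(peerId);
  }

  // Returns false (and adds nothing) if the peer was removed before this claim got the lock.
  boolean claimQueue(String peerId, String deadServer) {
    synchronized (oldsources) {
      if (!peers.contains(peerId)) {
        return false;
      }
      oldsources.add(peerId + "/" + deadServer);
      return true;
    }
  }

  // Dropping the peer and its recovered sources is atomic with respect to claimQueue.
  void removePeer(String peerId) {
    synchronized (oldsources) {
      peers.remove(peerId);
      oldsources.removeIf(src -> src.startsWith(peerId + "/"));
    }
  }

  int recoveredSourceCount() {
    synchronized (oldsources) {
      return oldsources.size();
    }
  }
}
```

Because the existence check and the insertion happen under one monitor, a claim either completes before removePeer (and is then cleaned up by it) or observes that the peer is gone.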
Constructor and Description |
---|
ReplicationSourceManager(ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, org.apache.hadoop.conf.Configuration conf, Server server, org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path logDir, org.apache.hadoop.fs.Path oldLogDir, UUID clusterId, WALFactory walFactory, SyncReplicationPeerMappingManager syncReplicationPeerMappingManager, MetricsReplicationGlobalSourceSource globalMetrics): Creates a replication manager and sets the watch on all the other registered region servers. |

Modifier and Type | Method and Description |
---|---|
private void | abortAndThrowIOExceptionWhenFail(ReplicationSourceManager.ReplicationQueueOperation op) |
private void | abortWhenFail(ReplicationSourceManager.ReplicationQueueOperation op) |
(package private) boolean | acquireBufferQuota(long size): Add the size to totalBufferUsed and check if it exceeds totalBufferLimit. |
(package private) boolean | acquireWALEntryBufferQuota(WALEntryBatch walEntryBatch, WAL.Entry entry): Acquire the buffer quota for a WAL.Entry that is added to a WALEntryBatch. |
(package private) int | activeFailoverTaskCount() |
void | addHFileRefs(TableName tableName, byte[] family, List<Pair<org.apache.hadoop.fs.Path,org.apache.hadoop.fs.Path>> pairs) |
void | addPeer(String peerId): Add the peer to replicationPeers; add the normal source and related replication queue; add HFile refs. |
private void | addRecoveredSource(ReplicationSourceInterface src, ReplicationPeerImpl oldPeer, ReplicationQueueId claimedQueueId, PriorityQueue<org.apache.hadoop.fs.Path> walFiles) |
(package private) void | addSource(String peerId, boolean init): Add a normal source for the given peer on this region server. |
private long | addTotalBufferUsed(long size) |
(package private) boolean | checkBufferQuota(String peerId): Check if totalBufferUsed exceeds totalBufferLimit for the peer. |
(package private) void | claimQueue(ReplicationQueueId queueId) |
(package private) void | claimQueue(ReplicationQueueId queueId, boolean syncUp): Claim a replication queue. |
private void | cleanOldLogs(NavigableSet<String> wals, ReplicationSourceInterface source) |
(package private) void | cleanOldLogs(String log, boolean inclusive, ReplicationSourceInterface source): Cleans a log file and all older logs from the replication queue. |
void | cleanUpHFileRefs(String peerId, List<String> files) |
private ReplicationSourceInterface | createRefreshedSource(ReplicationQueueId queueId, ReplicationPeer peer) |
private ReplicationSourceInterface | createSource(ReplicationQueueData queueData, ReplicationPeer replicationPeer) |
private void | deleteQueue(ReplicationQueueId queueId): Delete a complete queue of wals associated with a replication source. |
void | drainSources(String peerId): This is used when we transition a sync replication peer to SyncReplicationState.STANDBY. |
(package private) void | finishRecoveredSource(ReplicationSourceInterface src) |
org.apache.hadoop.fs.FileSystem | getFs(): Get the handle on the local file system. |
(package private) MetricsReplicationGlobalSourceSource | getGlobalMetrics() |
(package private) Set<org.apache.hadoop.fs.Path> | getLastestPath() |
org.apache.hadoop.fs.Path | getLogDir(): Get the directory where wals are stored by their RSs. |
org.apache.hadoop.fs.Path | getOldLogDir(): Get the directory where wals are archived. |
List<ReplicationSourceInterface> | getOldSources(): Get a list of all the recovered sources of this rs. |
(package private) ReplicationQueueStorage | getQueueStorage() |
ReplicationPeers | getReplicationPeers(): Get the ReplicationPeers used by this ReplicationSourceManager. |
(package private) int | getSizeOfLatestPath() |
ReplicationSourceInterface | getSource(String peerId): Get the normal source for a given peer. |
List<ReplicationSourceInterface> | getSources(): Get a list of all the normal sources of this rs. |
String | getStats(): Get a string representation of all the sources' metrics. |
long | getTotalBufferLimit(): Returns the maximum size in bytes of edits held in memory which are pending replication across all sources inside this RegionServer. |
long | getTotalBufferUsed() |
private PriorityQueue<org.apache.hadoop.fs.Path> | getWALFilesToReplicate(ServerName sourceRS, boolean syncUp, Map<String,ReplicationGroupOffset> offsets) |
Map<ReplicationQueueId,Map<String,NavigableSet<String>>> | getWALs(): Get a copy of the wals of the normal sources on this rs. |
(package private) void | init(): Adds a normal source per registered peer cluster. |
private void | interruptOrAbortWhenFail(ReplicationSourceManager.ReplicationQueueOperation op): Refreshing a replication source terminates the old source first, and then the source thread is interrupted. |
void | join(): Terminate the replication on this region server. |
void | logPositionAndCleanOldLogs(ReplicationSourceInterface source, WALEntryBatch entryBatch): This method will log the current position to storage. |
void | postLogRoll(org.apache.hadoop.fs.Path newLog) |
void | refreshSources(String peerId): Close the previous replication sources of this peer id and open new sources to trigger the new replication state changes or new replication config changes. |
(package private) void | releaseBufferQuota(long size): Release the buffer quota acquired by acquireBufferQuota(long). |
(package private) long | releaseWALEntryBatchBufferQuota(WALEntryBatch walEntryBatch): Release the buffer quota of a WALEntryBatch that was acquired by acquireWALEntryBufferQuota(org.apache.hadoop.hbase.replication.regionserver.WALEntryBatch, org.apache.hadoop.hbase.wal.WAL.Entry). |
void | removePeer(String peerId): Remove the peer from replicationPeers; remove all the recovered sources for the specified id and related replication queues; remove the normal source and related replication queue; remove HFile refs. |
private boolean | removeRecoveredSource(ReplicationSourceInterface src): Clear the metrics and related replication queue of the specified old source. |
private void | removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals) |
(package private) void | removeSource(ReplicationSourceInterface src): Clear the metrics and related replication queue of the specified old source. |
private boolean | shouldReplicate(ReplicationGroupOffset offset, String wal): Check whether we should replicate the given wal. |
private void | throwIOExceptionWhenFail(ReplicationSourceManager.ReplicationQueueOperation op) |

private static final org.slf4j.Logger LOG
private final ConcurrentMap<String,ReplicationSourceInterface> sources
private final List<ReplicationSourceInterface> oldsources
private final ReplicationQueueStorage queueStorage
private final ReplicationPeers replicationPeers
private final ConcurrentMap<ReplicationQueueId,Map<String,NavigableSet<String>>> walsById
private final ConcurrentMap<ReplicationQueueId,Map<String,NavigableSet<org.apache.hadoop.fs.Path>>> walsByIdRecoveredQueues
private final SyncReplicationPeerMappingManager syncReplicationPeerMappingManager
private final org.apache.hadoop.conf.Configuration conf
private final org.apache.hadoop.fs.FileSystem fs
private final Map<String,org.apache.hadoop.fs.Path> latestPaths
private final org.apache.hadoop.fs.Path logDir
private final org.apache.hadoop.fs.Path oldLogDir
private final WALFactory walFactory
private final long sleepBeforeFailover
private final ThreadPoolExecutor executor
private AtomicLong totalBufferUsed
private final long sleepForRetries
private final int maxRetriesMultiplier
private final long totalBufferLimit
private final MetricsReplicationGlobalSourceSource globalMetrics
public ReplicationSourceManager(ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, org.apache.hadoop.conf.Configuration conf, Server server, org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path logDir, org.apache.hadoop.fs.Path oldLogDir, UUID clusterId, WALFactory walFactory, SyncReplicationPeerMappingManager syncReplicationPeerMappingManager, MetricsReplicationGlobalSourceSource globalMetrics) throws IOException
queueStorage - the interface for manipulating replication queues
conf - the configuration to use
server - the server for this region server
fs - the file system to use
logDir - the directory that contains all wal directories of live RSs
oldLogDir - the directory where old logs are archived
IOException
void init() throws IOException
IOException
public void addPeer(String peerId) throws IOException
peerId - the id of the replication peer
IOException
public void removePeer(String peerId)
peerId - the id of the replication peer
private ReplicationSourceInterface createSource(ReplicationQueueData queueData, ReplicationPeer replicationPeer) throws IOException
queueId - the id of the replication queue to associate the ReplicationSource with.
IOException
for creating a ReplicationSource for meta.
void addSource(String peerId, boolean init) throws IOException
The init parameter indicates whether this call is part of the initialization process. If so, we skip adding the replication queues, as doing so may introduce a deadlock between region server startup and the hbase:replication table coming online.
peerId - the id of the replication peer
init - whether this call is part of the initialization process
IOException
public void drainSources(String peerId) throws IOException, ReplicationException
This is used when we transition a sync replication peer to SyncReplicationState.STANDBY.
When transitioning to SyncReplicationState.STANDBY, we can remove all the pending wal files for a replication peer, as we do not need to replicate them any more. This is also necessary: otherwise, when we transition back to SyncReplicationState.DOWNGRADE_ACTIVE later, the stale data will be replicated again and cause inconsistency.
See HBASE-20426 for more details.
peerId - the id of the sync replication peer
IOException
ReplicationException
private ReplicationSourceInterface createRefreshedSource(ReplicationQueueId queueId, ReplicationPeer peer) throws IOException, ReplicationException
IOException
ReplicationException
public void refreshSources(String peerId) throws ReplicationException, IOException
peerId - the id of the replication peer
ReplicationException
IOException
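A minimal sketch of the refresh step, assuming the normal sources live in a ConcurrentMap keyed by peer id (as the sources field does) and that refreshing means installing a new source and terminating the one it displaces. RefreshSketch and RefreshableSource are hypothetical stand-ins; recovered sources and the real createRefreshedSource logic are left out.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for ReplicationSourceInterface: tracks only its generation and state.
class RefreshableSource {
  final String peerId;
  final int generation;
  volatile String terminatedReason; // null while the source is running

  RefreshableSource(String peerId, int generation) {
    this.peerId = peerId;
    this.generation = generation;
  }

  void terminate(String reason) {
    terminatedReason = reason;
  }
}

// Hypothetical sketch of swapping a peer's normal source.
class RefreshSketch {
  private final ConcurrentMap<String, RefreshableSource> sources = new ConcurrentHashMap<>();

  // Assumption: calls are serialized per peer id, as the synchronization specification states.
  RefreshableSource refreshSources(String peerId) {
    RefreshableSource previous = sources.get(peerId);
    int generation = previous == null ? 0 : previous.generation + 1;
    RefreshableSource fresh = new RefreshableSource(peerId, generation);
    RefreshableSource replaced = sources.put(peerId, fresh);
    if (replaced != null) {
      replaced.terminate("Peer " + peerId + " state or config changed");
    }
    return fresh;
  }

  RefreshableSource getSource(String peerId) {
    return sources.get(peerId);
  }
}
```

Installing the new source before terminating the old one means getSource never sees a missing entry; the description above closes first and then opens, so treat the ordering here as a choice of this sketch rather than the original behavior.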
private boolean removeRecoveredSource(ReplicationSourceInterface src)
src - source to clear
void finishRecoveredSource(ReplicationSourceInterface src)
void removeSource(ReplicationSourceInterface src)
src - source to clear
private void deleteQueue(ReplicationQueueId queueId)
queueId - the id of the replication queue to delete
private void interruptOrAbortWhenFail(ReplicationSourceManager.ReplicationQueueOperation op)
private void abortWhenFail(ReplicationSourceManager.ReplicationQueueOperation op)
private void throwIOExceptionWhenFail(ReplicationSourceManager.ReplicationQueueOperation op) throws IOException
IOException
private void abortAndThrowIOExceptionWhenFail(ReplicationSourceManager.ReplicationQueueOperation op) throws IOException
IOException
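The ...WhenFail helpers wrap a ReplicationSourceManager.ReplicationQueueOperation and differ only in how they react when the operation fails: abort the server, rethrow as IOException, or both. The sketch below assumes the operation is a single-method functional interface that may throw a checked exception; QueueOpException, QueueOperation and FailurePolicySketch are hypothetical, abort only records a reason instead of stopping a server, and interruptOrAbortWhenFail is omitted.

```java
import java.io.IOException;

// Hypothetical stand-in for the checked exception a replication queue operation may throw.
class QueueOpException extends Exception {
  QueueOpException(String message) {
    super(message);
  }
}

// Hypothetical shape of ReplicationSourceManager.ReplicationQueueOperation.
@FunctionalInterface
interface QueueOperation {
  void exec() throws QueueOpException;
}

class FailurePolicySketch {
  // Stand-in for a server abort: remembers why instead of shutting anything down.
  String abortReason;

  void abort(String why, Throwable cause) {
    abortReason = why + ": " + cause.getMessage();
  }

  // Failure is fatal for the server, but the caller does not see an exception.
  void abortWhenFail(QueueOperation op) {
    try {
      op.exec();
    } catch (QueueOpException e) {
      abort("Failed to operate on replication queue", e);
    }
  }

  // Failure is reported to the caller only.
  void throwIOExceptionWhenFail(QueueOperation op) throws IOException {
    try {
      op.exec();
    } catch (QueueOpException e) {
      throw new IOException(e);
    }
  }

  // Failure both aborts the server and is reported to the caller.
  void abortAndThrowIOExceptionWhenFail(QueueOperation op) throws IOException {
    try {
      op.exec();
    } catch (QueueOpException e) {
      abort("Failed to operate on replication queue", e);
      throw new IOException(e);
    }
  }
}
```

For example, `policy.abortWhenFail(() -> storage.removeWAL(...))` would express "this update must succeed or the server must stop", where `storage.removeWAL` is a placeholder for any queue-storage call.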
public void logPositionAndCleanOldLogs(ReplicationSourceInterface source, WALEntryBatch entryBatch)
source - the replication source
entryBatch - the wal entry batch we just shipped
void cleanOldLogs(String log, boolean inclusive, ReplicationSourceInterface source)
log - Path to the log
inclusive - whether we should also remove the given log file
source - the replication source
private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals) throws IOException
IOException
private void cleanOldLogs(NavigableSet<String> wals, ReplicationSourceInterface source)
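cleanOldLogs(String, boolean, ReplicationSourceInterface) removes a log and everything older from a sorted set of WAL names, which maps naturally onto NavigableSet.headSet(log, inclusive). The sketch below also shows the one race the synchronization specification calls out for walsById: a log roll adding a WAL while cleanOldLogs trims the same set, so both take the walsById monitor. WalsByIdSketch is hypothetical: it keys by peer id instead of ReplicationQueueId, passes the WAL group explicitly, and assumes WAL names within a group sort in creation order.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of walsById: peerId -> (WAL group -> WAL names, sorted oldest first).
class WalsByIdSketch {
  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsById =
      new ConcurrentHashMap<>();

  // Peer add/remove are assumed to be serialized per peer id elsewhere, so no extra lock here.
  void addPeer(String peerId) {
    walsById.putIfAbsent(peerId, new HashMap<>());
  }

  // A log roll appends the new WAL to every peer's set: this is what races with cleanOldLogs.
  void postLogRoll(String walGroup, String newWal) {
    synchronized (walsById) {
      for (Map<String, NavigableSet<String>> wals : walsById.values()) {
        wals.computeIfAbsent(walGroup, k -> new TreeSet<>()).add(newWal);
      }
    }
  }

  // Drops every WAL older than `log` in the group, and `log` itself when inclusive is true.
  void cleanOldLogs(String peerId, String walGroup, String log, boolean inclusive) {
    synchronized (walsById) {
      Map<String, NavigableSet<String>> wals = walsById.get(peerId);
      NavigableSet<String> group = wals == null ? null : wals.get(walGroup);
      if (group != null) {
        group.headSet(log, inclusive).clear(); // headSet is a live view, so clear() edits group
      }
    }
  }

  // Returns a copy so callers never touch the guarded set without the lock.
  NavigableSet<String> snapshot(String peerId, String walGroup) {
    synchronized (walsById) {
      Map<String, NavigableSet<String>> wals = walsById.get(peerId);
      NavigableSet<String> group = wals == null ? null : wals.get(walGroup);
      return group == null ? new TreeSet<String>() : new TreeSet<String>(group);
    }
  }
}
```

With wal.100, wal.200 and wal.300 tracked, cleanOldLogs(peer, group, "wal.200", false) leaves wal.200 and wal.300, and the inclusive variant leaves only wal.300.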
public void postLogRoll(org.apache.hadoop.fs.Path newLog) throws IOException
IOException
private boolean shouldReplicate(ReplicationGroupOffset offset, String wal)
Check whether we should replicate the given wal.
wal - the file name of the wal
Returns true if we should replicate the given wal, otherwise false.
void claimQueue(ReplicationQueueId queueId)
private PriorityQueue<org.apache.hadoop.fs.Path> getWALFilesToReplicate(ServerName sourceRS, boolean syncUp, Map<String,ReplicationGroupOffset> offsets) throws IOException
IOException
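getWALFilesToReplicate(ServerName, boolean, Map<String,ReplicationGroupOffset>) and shouldReplicate(ReplicationGroupOffset, String) decide which WALs of a claimed queue still need shipping and return them as a PriorityQueue. The sketch below is a simplified model, not the HBase logic: it assumes WAL names end in ".<timestamp>", groups them by the prefix before the last '.', reduces each ReplicationGroupOffset to the name of the WAL the group had reached, and ignores the position inside that WAL. WalSelectionSketch and its helpers are hypothetical.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical helpers: assume WAL names look like "<group>.<timestamp>".
class WalSelectionSketch {
  static String group(String wal) {
    return wal.substring(0, wal.lastIndexOf('.'));
  }

  static long timestamp(String wal) {
    return Long.parseLong(wal.substring(wal.lastIndexOf('.') + 1));
  }

  // offsetWal is the WAL the group had reached, or null if nothing was recorded.
  // Simplification: a WAL is replicated unless it is strictly older than that WAL.
  static boolean shouldReplicate(String offsetWal, String wal) {
    return offsetWal == null || timestamp(wal) >= timestamp(offsetWal);
  }

  // Keeps the WALs that still need replication, ordered oldest first.
  static PriorityQueue<String> walFilesToReplicate(List<String> wals, Map<String, String> offsets) {
    Comparator<String> byTimestamp = Comparator.comparingLong(WalSelectionSketch::timestamp);
    PriorityQueue<String> queue = new PriorityQueue<>(byTimestamp);
    for (String wal : wals) {
      if (shouldReplicate(offsets.get(group(wal)), wal)) {
        queue.add(wal);
      }
    }
    return queue;
  }
}
```

For example, if group rs1.default has reached rs1.default.200, the file rs1.default.100 is skipped, while a group with no recorded offset is replicated from its oldest WAL.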
private void addRecoveredSource(ReplicationSourceInterface src, ReplicationPeerImpl oldPeer, ReplicationQueueId claimedQueueId, PriorityQueue<org.apache.hadoop.fs.Path> walFiles)
void claimQueue(ReplicationQueueId queueId, boolean syncUp)
queueId - the replication queue id we want to claim
syncUp - whether we are called by ReplicationSyncUp
public void join()
public Map<ReplicationQueueId,Map<String,NavigableSet<String>>> getWALs()
public List<ReplicationSourceInterface> getSources()
public List<ReplicationSourceInterface> getOldSources()
public ReplicationSourceInterface getSource(String peerId)
int getSizeOfLatestPath()
Set<org.apache.hadoop.fs.Path> getLastestPath()
public long getTotalBufferUsed()
public long getTotalBufferLimit()
public org.apache.hadoop.fs.Path getOldLogDir()
public org.apache.hadoop.fs.Path getLogDir()
public org.apache.hadoop.fs.FileSystem getFs()
public ReplicationPeers getReplicationPeers()
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<org.apache.hadoop.fs.Path,org.apache.hadoop.fs.Path>> pairs) throws IOException
IOException
public void cleanUpHFileRefs(String peerId, List<String> files)
int activeFailoverTaskCount()
MetricsReplicationGlobalSourceSource getGlobalMetrics()
ReplicationQueueStorage getQueueStorage()
boolean acquireWALEntryBufferQuota(WALEntryBatch walEntryBatch, WAL.Entry entry)
Acquire the buffer quota for a WAL.Entry that is added to a WALEntryBatch.
entry - the wal entry that is added to the WALEntryBatch and should acquire buffer quota
long releaseWALEntryBatchBufferQuota(WALEntryBatch walEntryBatch)
Release the buffer quota of a WALEntryBatch that was acquired by acquireWALEntryBufferQuota(org.apache.hadoop.hbase.replication.regionserver.WALEntryBatch, org.apache.hadoop.hbase.wal.WAL.Entry).
boolean acquireBufferQuota(long size)
Add the size to totalBufferUsed and check if it exceeds totalBufferLimit.
Returns true if totalBufferUsed exceeds totalBufferLimit, in which case we should stop increasing the buffer and ship all.
void releaseBufferQuota(long size)
Release the buffer quota acquired by acquireBufferQuota(long).
private long addTotalBufferUsed(long size)
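The quota methods described here share one counter, totalBufferUsed, checked against totalBufferLimit. A minimal sketch of that bookkeeping, assuming an AtomicLong counter (the totalBufferUsed field is declared as AtomicLong) and using a hypothetical Batch class and a plain entry size in place of WALEntryBatch and WAL.Entry. The comparisons follow the wording of this page: acquireBufferQuota reports true once usage exceeds the limit, and checkBufferQuota reports true while usage is not more than it.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the region-server-wide buffer accounting.
class BufferQuotaSketch {
  // Hypothetical stand-in for WALEntryBatch: records how many bytes of quota its entries hold.
  static final class Batch {
    long usedBufferSize;
  }

  private final AtomicLong totalBufferUsed = new AtomicLong();
  private final long totalBufferLimit;

  BufferQuotaSketch(long totalBufferLimit) {
    this.totalBufferLimit = totalBufferLimit;
  }

  private long addTotalBufferUsed(long size) {
    return totalBufferUsed.addAndGet(size);
  }

  // Adds size; true means the limit is now exceeded and the caller should ship what it has.
  boolean acquireBufferQuota(long size) {
    if (size < 0) {
      throw new IllegalArgumentException("size must not be negative: " + size);
    }
    return addTotalBufferUsed(size) > totalBufferLimit;
  }

  void releaseBufferQuota(long size) {
    if (size < 0) {
      throw new IllegalArgumentException("size must not be negative: " + size);
    }
    addTotalBufferUsed(-size);
  }

  // Assumption: the real method takes a WAL.Entry; here the caller passes its size directly.
  boolean acquireWALEntryBufferQuota(Batch batch, long entrySize) {
    batch.usedBufferSize += entrySize;
    return acquireBufferQuota(entrySize);
  }

  // Releases everything the batch acquired and returns that amount.
  long releaseWALEntryBatchBufferQuota(Batch batch) {
    long size = batch.usedBufferSize;
    if (size > 0) {
      releaseBufferQuota(size);
    }
    return size;
  }

  // True while usage is not more than the limit; peerId is unused in this sketch.
  boolean checkBufferQuota(String peerId) {
    return totalBufferUsed.get() <= totalBufferLimit;
  }

  long getTotalBufferUsed() {
    return totalBufferUsed.get();
  }
}
```

Because the batch remembers its own total, a reader can release a whole shipped batch in one call instead of tracking individual entries.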
boolean checkBufferQuota(String peerId)
Check if totalBufferUsed exceeds totalBufferLimit for the peer.
Returns true if totalBufferUsed is not more than totalBufferLimit.

Copyright © 2007–2020 The Apache Software Foundation. All rights reserved.