@InterfaceAudience.Private public class ReplicationSourceManager extends Object implements ReplicationListener
When a region server dies, this class uses a watcher to get notified, then tries to grab a lock in order to transfer all of the dead server's queues into a local old source.
Synchronization specification:
- sources: no synchronization is needed. sources is a ConcurrentHashMap and there is a Lock for peer id in PeerProcedureHandlerImpl, so there is no race between peer operations.
- walsById: synchronization is needed. Four methods modify it: addPeer(String), removePeer(String), cleanOldLogs(NavigableSet, String, boolean, String) and preLogRoll(Path). walsById is a ConcurrentHashMap and there is a Lock for peer id in PeerProcedureHandlerImpl, so there is no race between addPeer(String) and removePeer(String). cleanOldLogs(NavigableSet, String, boolean, String) is called by ReplicationSourceInterface, so it does not race with addPeer(String). removePeer(String) terminates the ReplicationSourceInterface first and then removes the wals from walsById, so there is no race with removePeer(String) either. The only case that needs synchronization is cleanOldLogs(NavigableSet, String, boolean, String) against preLogRoll(Path).
- walsByIdRecoveredQueues: no synchronization is needed. Three methods modify it: removePeer(String), cleanOldLogs(NavigableSet, String, boolean, String) and ReplicationSourceManager.NodeFailoverWorker.run(). cleanOldLogs(NavigableSet, String, boolean, String) is called by ReplicationSourceInterface. removePeer(String) terminates the ReplicationSourceInterface first and then removes the wals from walsByIdRecoveredQueues. ReplicationSourceManager.NodeFailoverWorker.run() adds the wals to walsByIdRecoveredQueues first and then starts up a ReplicationSourceInterface, so there is no race here. ReplicationSourceManager.NodeFailoverWorker.run() and removePeer(String) already synchronize on oldsources, so no further synchronization on walsByIdRecoveredQueues is needed.
- latestPaths: synchronization is needed so that a newly opened source does not miss a new log.
- oldsources: synchronization is needed to avoid adding a recovered source for a to-be-removed peer.

| Modifier and Type | Class and Description | 
|---|---|
| (package private) class  | ReplicationSourceManager.NodeFailoverWorker: Class responsible for setting up new ReplicationSources to take care of the queues from dead region servers. | 
| private static interface  | ReplicationSourceManager.ReplicationQueueOperation | 
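
As a rough illustration of the walsById rule in the synchronization specification above, the sketch below lets the log-roll path and the cleanup path synchronize on the same map, while peer add/remove rely on an external per-peer lock. The class WalTrackerSketch, its method signatures, and the meaning given to the nested map keys (peer id, then WAL group) are assumptions made for this example; it is not the HBase implementation.

```java
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified model of the walsById locking rule; not HBase code.
final class WalTrackerSketch {
  // Assumed nesting: peer id -> WAL group -> sorted WAL names.
  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsById =
      new ConcurrentHashMap<>();

  // Peer add/remove are assumed to be serialized by an external per-peer lock.
  void addPeer(String peerId) {
    walsById.put(peerId, new ConcurrentHashMap<>());
  }

  // Log-roll path: records the new WAL for every known peer.
  void preLogRoll(String walGroup, String walName) {
    synchronized (walsById) {
      for (Map<String, NavigableSet<String>> groups : walsById.values()) {
        groups.computeIfAbsent(walGroup, g -> new TreeSet<>()).add(walName);
      }
    }
  }

  // Cleanup path: drops every WAL older than walName (and walName itself if inclusive).
  void cleanOldLogs(String peerId, String walGroup, String walName, boolean inclusive) {
    synchronized (walsById) {
      Map<String, NavigableSet<String>> groups = walsById.get(peerId);
      if (groups == null) {
        return;
      }
      NavigableSet<String> wals = groups.get(walGroup);
      if (wals != null) {
        // headSet returns a live view, so clearing it removes from the backing set.
        wals.headSet(walName, inclusive).clear();
      }
    }
  }

  // Number of WALs still tracked for a peer and WAL group.
  int walCount(String peerId, String walGroup) {
    synchronized (walsById) {
      Map<String, NavigableSet<String>> groups = walsById.get(peerId);
      NavigableSet<String> wals = groups == null ? null : groups.get(walGroup);
      return wals == null ? 0 : wals.size();
    }
  }
}
```

Because both paths take the same monitor, a WAL added by a roll cannot be lost to a cleanup that runs concurrently on the same sorted set.
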

| Modifier and Type | Field and Description | 
|---|---|
| private UUID | clusterId | 
| private org.apache.hadoop.conf.Configuration | conf | 
| private ThreadPoolExecutor | executor | 
| private org.apache.hadoop.fs.FileSystem | fs | 
| private Set<org.apache.hadoop.fs.Path> | latestPaths | 
| private static org.slf4j.Logger | LOG | 
| private org.apache.hadoop.fs.Path | logDir | 
| private org.apache.hadoop.fs.Path | oldLogDir | 
| private List<ReplicationSourceInterface> | oldsources | 
| private ReplicationQueueStorage | queueStorage | 
| private boolean | replicationForBulkLoadDataEnabled | 
| private ReplicationPeers | replicationPeers | 
| private ReplicationTracker | replicationTracker | 
| private Server | server | 
| private long | sleepBeforeFailover | 
| private ConcurrentMap<String,ReplicationSourceInterface> | sources | 
| private AtomicLong | totalBufferUsed | 
| private WALFileLengthProvider | walFileLengthProvider | 
| private ConcurrentMap<String,Map<String,NavigableSet<String>>> | walsById | 
| private ConcurrentMap<String,Map<String,NavigableSet<String>>> | walsByIdRecoveredQueues | 

| Constructor and Description | 
|---|
| ReplicationSourceManager(ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, org.apache.hadoop.conf.Configuration conf, Server server, org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path logDir, org.apache.hadoop.fs.Path oldLogDir, UUID clusterId, WALFileLengthProvider walFileLengthProvider): Creates a replication manager and sets the watch on all the other registered region servers. | 

| Modifier and Type | Method and Description | 
|---|---|
| private void | abortAndThrowIOExceptionWhenFail(ReplicationSourceManager.ReplicationQueueOperation op) | 
| private void | abortWhenFail(ReplicationSourceManager.ReplicationQueueOperation op) | 
| (package private) int | activeFailoverTaskCount() | 
| void | addHFileRefs(TableName tableName, byte[] family, List<Pair<org.apache.hadoop.fs.Path,org.apache.hadoop.fs.Path>> pairs) | 
| void | addPeer(String peerId)1. | 
| (package private) ReplicationSourceInterface | addSource(String peerId): Add a normal source for the given peer on this region server. | 
| private void | adoptAbandonedQueues() | 
| private void | cleanOldLogs(NavigableSet<String> wals, String key, boolean inclusive, String id) | 
| (package private) void | cleanOldLogs(String log, boolean inclusive, String queueId, boolean queueRecovered): Cleans a log file and all older logs from the replication queue. | 
| void | cleanUpHFileRefs(String peerId, List<String> files) | 
| private ReplicationSourceInterface | createSource(String queueId, ReplicationPeer replicationPeer): Factory method to create a replication source. | 
| private void | deleteQueue(String queueId): Delete a complete queue of wals associated with a replication source. | 
| (package private) List<String> | getAllQueues() | 
| org.apache.hadoop.fs.FileSystem | getFs(): Get the handle on the local file system. | 
| org.apache.hadoop.fs.Path | getLogDir(): Get the directory where wals are stored by their RSs. | 
| org.apache.hadoop.fs.Path | getOldLogDir(): Get the directory where wals are archived. | 
| List<ReplicationSourceInterface> | getOldSources(): Get a list of all the recovered sources of this rs. | 
| ReplicationPeers | getReplicationPeers(): Get the ReplicationPeers used by this ReplicationSourceManager. | 
| (package private) int | getSizeOfLatestPath() | 
| ReplicationSourceInterface | getSource(String peerId): Get the normal source for a given peer. | 
| List<ReplicationSourceInterface> | getSources(): Get a list of all the normal sources of this rs. | 
| String | getStats(): Get a string representation of all the sources' metrics. | 
| AtomicLong | getTotalBufferUsed() | 
| Map<String,Map<String,NavigableSet<String>>> | getWALs(): Get a copy of the wals of the normal sources on this rs. | 
| (package private) Map<String,Map<String,NavigableSet<String>>> | getWalsByIdRecoveredQueues(): Get a copy of the wals of the recovered sources on this rs. | 
| (package private) Future<?> | init(): Adds a normal source per registered peer cluster and tries to process all old region server wal queues. | 
| private void | interruptOrAbortWhenFail(ReplicationSourceManager.ReplicationQueueOperation op): Refresh replication source will terminate the old source first, then the source thread will be interrupted. | 
| void | join(): Terminate the replication on this region server. | 
| void | logPositionAndCleanOldLogs(String queueId, boolean queueRecovered, WALEntryBatch entryBatch): This method will log the current position to storage. | 
| void | postLogRoll(org.apache.hadoop.fs.Path newLog) | 
| void | preLogRoll(org.apache.hadoop.fs.Path newLog) | 
| void | refreshSources(String peerId): Close the previous replication sources of this peer id and open new sources to trigger the new replication state changes or new replication config changes. | 
| void | regionServerRemoved(String regionserver): A region server has been removed from the local cluster. | 
| void | removePeer(String peerId)1. | 
| (package private) void | removeRecoveredSource(ReplicationSourceInterface src): Clear the metrics and related replication queue of the specified old source. | 
| (package private) void | removeSource(ReplicationSourceInterface src): Clear the metrics and related replication queue of the specified old source. | 
| private void | throwIOExceptionWhenFail(ReplicationSourceManager.ReplicationQueueOperation op) | 
| private void | transferQueues(ServerName deadRS): Transfer all the queues of the specified dead region server to this region server. | 
private static final org.slf4j.Logger LOG
private final ConcurrentMap<String,ReplicationSourceInterface> sources
private final List<ReplicationSourceInterface> oldsources
private final ReplicationQueueStorage queueStorage
private final ReplicationTracker replicationTracker
private final ReplicationPeers replicationPeers
private final ConcurrentMap<String,Map<String,NavigableSet<String>>> walsById
private final ConcurrentMap<String,Map<String,NavigableSet<String>>> walsByIdRecoveredQueues
private final org.apache.hadoop.conf.Configuration conf
private final org.apache.hadoop.fs.FileSystem fs
private final Set<org.apache.hadoop.fs.Path> latestPaths
private final org.apache.hadoop.fs.Path logDir
private final org.apache.hadoop.fs.Path oldLogDir
private final WALFileLengthProvider walFileLengthProvider
private final long sleepBeforeFailover
private final ThreadPoolExecutor executor
private final boolean replicationForBulkLoadDataEnabled
private AtomicLong totalBufferUsed
public ReplicationSourceManager(ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, org.apache.hadoop.conf.Configuration conf, Server server, org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path logDir, org.apache.hadoop.fs.Path oldLogDir, UUID clusterId, WALFileLengthProvider walFileLengthProvider) throws IOException
queueStorage - the interface for manipulating replication queues
replicationPeers -
replicationTracker -
conf - the configuration to use
server - the server for this region server
fs - the file system to use
logDir - the directory that contains all wal directories of live RSs
oldLogDir - the directory where old logs are archived
clusterId -
Throws: IOException
Future<?> init() throws IOException
The returned future is for adoptAbandonedQueues task.
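
One way to picture this contract: init() starts the adoption of abandoned queues in the background and hands the caller a Future that tracks it. The sketch below is a minimal illustration using a plain ExecutorService. The class InitSketch, the adopted flag, and the body of adoptAbandonedQueues are placeholders invented for this example, not the actual HBase code.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: init() returns a Future tracking a background task.
final class InitSketch {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final AtomicBoolean adopted = new AtomicBoolean(false);

  Future<?> init() {
    // Normal sources would be added synchronously here (omitted in this sketch).
    Runnable task = this::adoptAbandonedQueues;
    return executor.submit(task);
  }

  private void adoptAbandonedQueues() {
    adopted.set(true); // placeholder for the real adoption work
  }

  // Runs init(), waits for the background task, and reports whether it ran.
  static boolean demo() {
    InitSketch sketch = new InitSketch();
    try {
      sketch.init().get(); // blocks until adoptAbandonedQueues has finished
      return sketch.adopted.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } catch (ExecutionException e) {
      return false;
    } finally {
      sketch.executor.shutdown();
    }
  }
}
```

A caller that does not care about abandoned queues can ignore the Future; a test that needs a settled state can block on it, as demo() does.
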
Throws: IOException
private void adoptAbandonedQueues()
public void addPeer(String peerId) throws IOException
peerId - the id of replication peer
Throws: IOException
public void removePeer(String peerId)
peerId - the id of the replication peer
private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer) throws IOException
queueId - the id of the replication queue
Throws: IOException
ReplicationSourceInterface addSource(String peerId) throws IOException
peerId - the id of the replication peer
Throws: IOException
public void refreshSources(String peerId) throws IOException
peerId - the id of the replication peer
Throws: IOException
void removeRecoveredSource(ReplicationSourceInterface src)
src - source to clear
void removeSource(ReplicationSourceInterface src)
src - source to clear
private void deleteQueue(String queueId)
queueId - the id of replication queue to delete
private void interruptOrAbortWhenFail(ReplicationSourceManager.ReplicationQueueOperation op)
private void abortWhenFail(ReplicationSourceManager.ReplicationQueueOperation op)
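
The names of the ...WhenFail helpers suggest a common pattern: each one runs a ReplicationQueueOperation and differs only in how it reacts to a failure. The sketch below illustrates that pattern under stated assumptions. The QueueOperation interface, its exec() method, the use of a generic Exception, and the aborted flag standing in for a server abort are all hypothetical; the page does not show the real definitions.

```java
import java.io.IOException;

// Hypothetical sketch of the "...WhenFail" helper family; not the HBase definitions.
final class QueueOpSketch {
  @FunctionalInterface
  interface QueueOperation {
    void exec() throws Exception; // stand-in for a storage-specific checked exception
  }

  private boolean aborted = false; // stand-in for aborting the region server

  // Failure aborts the server; the caller sees no exception.
  void abortWhenFail(QueueOperation op) {
    try {
      op.exec();
    } catch (Exception e) {
      aborted = true;
    }
  }

  // Failure is surfaced to the caller as an IOException.
  void throwIOExceptionWhenFail(QueueOperation op) throws IOException {
    try {
      op.exec();
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  // Failure both aborts the server and is rethrown to the caller.
  void abortAndThrowIOExceptionWhenFail(QueueOperation op) throws IOException {
    try {
      op.exec();
    } catch (Exception e) {
      aborted = true;
      throw new IOException(e);
    }
  }

  // Exercises the first two behaviours with an operation that always fails.
  static boolean demo() {
    QueueOperation failing = () -> {
      throw new IllegalStateException("storage down");
    };
    QueueOpSketch first = new QueueOpSketch();
    first.abortWhenFail(failing);
    boolean rethrown = false;
    try {
      new QueueOpSketch().throwIOExceptionWhenFail(failing);
    } catch (IOException e) {
      rethrown = true;
    }
    return first.aborted && rethrown;
  }
}
```

Passing the storage call as a lambda keeps the error policy in one place, so each call site only picks the helper whose failure behaviour it wants.
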
private void throwIOExceptionWhenFail(ReplicationSourceManager.ReplicationQueueOperation op) throws IOException
Throws: IOException
private void abortAndThrowIOExceptionWhenFail(ReplicationSourceManager.ReplicationQueueOperation op) throws IOException
Throws: IOException
public void logPositionAndCleanOldLogs(String queueId, boolean queueRecovered, WALEntryBatch entryBatch)
queueId - id of the replication queue
queueRecovered - indicates if this queue comes from another region server
entryBatch - the wal entry batch we just shipped
void cleanOldLogs(String log, boolean inclusive, String queueId, boolean queueRecovered)
log - Path to the log
inclusive - whether we should also remove the given log file
queueId - id of the replication queue
queueRecovered - Whether this is a recovered queue
private void cleanOldLogs(NavigableSet<String> wals, String key, boolean inclusive, String id)
public void preLogRoll(org.apache.hadoop.fs.Path newLog) throws IOException
Throws: IOException
public void postLogRoll(org.apache.hadoop.fs.Path newLog) throws IOException
Throws: IOException
public void regionServerRemoved(String regionserver)
Specified by: regionServerRemoved in interface ReplicationListener
regionserver - the removed region server
private void transferQueues(ServerName deadRS)
It creates one old source for any type of source of the old rs.
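
To make the failover idea concrete, the sketch below models queue storage as an in-memory map and lets surviving servers race to claim a dead server's queues, creating one recovered ("old") source entry per claimed queue. Everything here is a hypothetical simplification: the FailoverSketch class, the STORAGE map, and the recovered queue naming are assumptions for illustration and do not reflect the real ReplicationQueueStorage API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model of queue failover; not the HBase storage API.
final class FailoverSketch {
  // server name -> (queue id -> WAL names), shared by all region servers
  static final Map<String, Map<String, List<String>>> STORAGE = new ConcurrentHashMap<>();

  private final String self;
  private final List<String> oldSources = new ArrayList<>(); // one entry per recovered queue

  FailoverSketch(String self) {
    this.self = self;
  }

  void transferQueues(String deadServer) {
    Map<String, List<String>> deadQueues = STORAGE.get(deadServer);
    if (deadQueues == null) {
      return;
    }
    for (String queueId : new ArrayList<>(deadQueues.keySet())) {
      // remove() is atomic on ConcurrentHashMap, so only one survivor claims each queue.
      List<String> wals = deadQueues.remove(queueId);
      if (wals == null) {
        continue; // another server won the race for this queue
      }
      String recoveredId = queueId + "-" + deadServer; // naming is an assumption
      STORAGE.computeIfAbsent(self, s -> new ConcurrentHashMap<>()).put(recoveredId, wals);
      synchronized (oldSources) {
        oldSources.add(recoveredId);
      }
    }
  }

  // Two survivors try to take over the same dead server; the first claims both queues.
  static int demo() {
    STORAGE.clear();
    Map<String, List<String>> dead = new ConcurrentHashMap<>();
    dead.put("peer1", Arrays.asList("wal.1", "wal.2"));
    dead.put("peer2", Arrays.asList("wal.1"));
    STORAGE.put("rs-dead", dead);
    FailoverSketch a = new FailoverSketch("rs-a");
    FailoverSketch b = new FailoverSketch("rs-b");
    a.transferQueues("rs-dead");
    b.transferQueues("rs-dead");
    return a.oldSources.size() * 10 + b.oldSources.size(); // 20: a got two, b got none
  }
}
```
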
public void join()
public Map<String,Map<String,NavigableSet<String>>> getWALs()
Map<String,Map<String,NavigableSet<String>>> getWalsByIdRecoveredQueues()
public List<ReplicationSourceInterface> getSources()
public List<ReplicationSourceInterface> getOldSources()
public ReplicationSourceInterface getSource(String peerId)
List<String> getAllQueues() throws IOException
Throws: IOException
int getSizeOfLatestPath()
public AtomicLong getTotalBufferUsed()
public org.apache.hadoop.fs.Path getOldLogDir()
public org.apache.hadoop.fs.Path getLogDir()
public org.apache.hadoop.fs.FileSystem getFs()
public ReplicationPeers getReplicationPeers()
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<org.apache.hadoop.fs.Path,org.apache.hadoop.fs.Path>> pairs) throws IOException
Throws: IOException
public void cleanUpHFileRefs(String peerId, List<String> files)
int activeFailoverTaskCount()
Copyright © 2007–2020 The Apache Software Foundation. All rights reserved.