@InterfaceAudience.Private public class ReplicationSource extends Object implements ReplicationSourceInterface
A replication stream is considered down when we cannot contact a region server on the peer cluster for more than 55 seconds (the default).
Constructor and Description |
---|
ReplicationSource() |
ReplicationSource(Predicate<org.apache.hadoop.fs.Path> replicateWAL,
List<WALEntryFilter> baseFilterOutWALEntries) |
Modifier and Type | Method and Description |
---|---|
void |
addHFileRefs(TableName tableName,
byte[] family,
List<Pair<org.apache.hadoop.fs.Path,org.apache.hadoop.fs.Path>> pairs)
Add hfile names to the queue to be replicated.
|
private void |
checkBandwidthChangeAndResetThrottler() |
protected ReplicationSourceShipper |
createNewShipper(String walGroupId) |
private ReplicationSourceWALReader |
createNewWALReader(String walGroupId,
long startPosition) |
private ReplicationEndpoint |
createReplicationEndpoint() |
private void |
decorateConf() |
void |
enqueueLog(org.apache.hadoop.fs.Path wal)
Add a log to the list of logs to replicate
|
private long |
getCurrentBandwidth() |
org.apache.hadoop.fs.Path |
getCurrentPath()
Get the current log that's replicated
|
private long |
getFileSize(org.apache.hadoop.fs.Path currentPath) |
String |
getPeerId()
Get the id that the source is replicating to.
|
String |
getQueueId()
Get the queue id that the source is replicating to
|
Map<String,PriorityBlockingQueue<org.apache.hadoop.fs.Path>> |
getQueues() |
ReplicationEndpoint |
getReplicationEndpoint()
Returns the replication endpoint used by this replication source
|
ReplicationQueueInfo |
getReplicationQueueInfo() |
ReplicationQueueStorage |
getReplicationQueueStorage()
Returns the instance of queueStorage used by this ReplicationSource.
|
(package private) Server |
getServer() |
ServerName |
getServerWALsBelongTo()
The queue of WALs belongs to only one region server.
|
ReplicationSourceManager |
getSourceManager()
Returns the replication source manager
|
MetricsSource |
getSourceMetrics()
Returns metrics of this replication source
|
String |
getStats()
Get a string representation of the current statistics for this source
|
(package private) WALEntryFilter |
getWalEntryFilter()
Call only after
initializeWALEntryFilter(UUID) has run; otherwise it returns null. |
WALFileLengthProvider |
getWALFileLengthProvider()
Returns the wal file length provider
|
Map<String,ReplicationStatus> |
getWalGroupStatus()
Get the replication status of each WAL group.
|
void |
init(org.apache.hadoop.conf.Configuration conf,
org.apache.hadoop.fs.FileSystem fs,
ReplicationSourceManager manager,
ReplicationQueueStorage queueStorage,
ReplicationPeer replicationPeer,
Server server,
String queueId,
UUID clusterId,
WALFileLengthProvider walFileLengthProvider,
MetricsSource metrics)
Instantiation method used by region servers
|
private void |
initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint) |
private void |
initialize() |
private void |
initializeWALEntryFilter(UUID peerClusterId) |
boolean |
isPeerEnabled()
Check whether the peer is enabled.
|
boolean |
isSourceActive()
Returns active or not
|
boolean |
isWorkerRunning() |
String |
logPeerId()
Returns String to use as a log prefix that contains current peerId.
|
void |
postShipEdits(List<WAL.Entry> entries,
int batchSize)
Call this after the shipper thread ships some entries to the peer cluster.
|
private void |
setSourceStartupStatus(boolean initializing) |
protected boolean |
sleepForRetries(String msg,
int sleepMultiplier)
Do the sleeping logic
|
ReplicationSourceInterface |
startup()
Start the replication
|
void |
terminate(String reason)
End the replication
|
void |
terminate(String reason,
Exception cause)
End the replication
|
void |
terminate(String reason,
Exception cause,
boolean clearMetrics)
End the replication
|
void |
terminate(String reason,
Exception cause,
boolean clearMetrics,
boolean join) |
private void |
tryStartNewShipper(String walGroupId) |
void |
tryThrottle(int batchSize)
Try to throttle when the peer is configured with a bandwidth limit.
|
protected void |
uncaughtException(Thread t,
Throwable e,
ReplicationSourceManager manager,
String peerId) |
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
isRecovered, logPositionAndCleanOldLogs
private static final org.slf4j.Logger LOG
protected int queueSizePerGroup
protected ReplicationSourceLogQueue logQueue
protected ReplicationQueueStorage queueStorage
protected ReplicationPeer replicationPeer
protected org.apache.hadoop.conf.Configuration conf
protected ReplicationQueueInfo replicationQueueInfo
protected ReplicationSourceManager manager
private long sleepForRetries
protected org.apache.hadoop.fs.FileSystem fs
private AtomicLong totalReplicatedEdits
private int maxRetriesMultiplier
volatile boolean sourceRunning
private MetricsSource metrics
private volatile ReplicationEndpoint replicationEndpoint
private boolean abortOnError
private AtomicBoolean startupOngoing
private AtomicBoolean retryStartup
protected volatile WALEntryFilter walEntryFilter
private ReplicationThrottler throttler
private long defaultBandwidth
private long currentBandwidth
private WALFileLengthProvider walFileLengthProvider
protected final ConcurrentHashMap<String,ReplicationSourceShipper> workerThreads
private AtomicLong totalBufferUsed
public static final String WAIT_ON_ENDPOINT_SECONDS
public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS
private int waitOnEndpointSeconds
private Thread initThread
private final Predicate<org.apache.hadoop.fs.Path> filterInWALs
private final List<WALEntryFilter> baseFilterOutWALEntries
walEntryFilter
.WALEntryFilter
ReplicationSource()
ReplicationSource(Predicate<org.apache.hadoop.fs.Path> replicateWAL, List<WALEntryFilter> baseFilterOutWALEntries)
replicateWAL
- Pass a filter to run against WAL Path; filter *in* WALs to
replicate; i.e. return 'true' if you want to replicate the
content of the WAL.
baseFilterOutWALEntries
- Base set of filters you want applied always; filters *out*
WALEntries so they never make it out of this ReplicationSource.
public void init(org.apache.hadoop.conf.Configuration conf, org.apache.hadoop.fs.FileSystem fs, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException
init
in interface ReplicationSourceInterface
conf
- configuration to use
fs
- file system to use
manager
- replication manager to ping to
server
- the server for this region server
queueId
- the id of our replication queue
clusterId
- unique UUID for the cluster
metrics
- metrics for replication source
IOException
private void decorateConf()
public void enqueueLog(org.apache.hadoop.fs.Path wal)
ReplicationSourceInterface
enqueueLog
in interface ReplicationSourceInterface
wal
- path to the log to replicate
@InterfaceAudience.Private public Map<String,PriorityBlockingQueue<org.apache.hadoop.fs.Path>> getQueues()
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<org.apache.hadoop.fs.Path,org.apache.hadoop.fs.Path>> pairs) throws ReplicationException
ReplicationSourceInterface
addHFileRefs
in interface ReplicationSourceInterface
tableName
- Name of the table these files belong to
family
- Name of the family these files belong to
pairs
- list of pairs of { HFile location in staging dir, HFile path in region dir
which will be added in the queue for replication}
ReplicationException
- If adding the hfile references fails
private ReplicationEndpoint createReplicationEndpoint() throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException
private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint) throws IOException, TimeoutException
IOException
TimeoutException
private void initializeWALEntryFilter(UUID peerClusterId)
private void tryStartNewShipper(String walGroupId)
public Map<String,ReplicationStatus> getWalGroupStatus()
ReplicationSourceInterface
getWalGroupStatus
in interface ReplicationSourceInterface
private long getFileSize(org.apache.hadoop.fs.Path currentPath) throws IOException
IOException
protected ReplicationSourceShipper createNewShipper(String walGroupId)
private ReplicationSourceWALReader createNewWALReader(String walGroupId, long startPosition)
WALEntryFilter getWalEntryFilter()
initializeWALEntryFilter(UUID)
else it will be null.
protected final void uncaughtException(Thread t, Throwable e, ReplicationSourceManager manager, String peerId)
public ReplicationEndpoint getReplicationEndpoint()
ReplicationSourceInterface
getReplicationEndpoint
in interface ReplicationSourceInterface
public ReplicationSourceManager getSourceManager()
ReplicationSourceInterface
getSourceManager
in interface ReplicationSourceInterface
public void tryThrottle(int batchSize) throws InterruptedException
ReplicationSourceInterface
tryThrottle
in interface ReplicationSourceInterface
batchSize
- size of the entries that will be pushed
InterruptedException
private void checkBandwidthChangeAndResetThrottler()
private long getCurrentBandwidth()
protected boolean sleepForRetries(String msg, int sleepMultiplier)
msg
- Why we sleep
sleepMultiplier
- by how many times the default sleeping time is augmented
Returns: true if sleepMultiplier is < maxRetriesMultiplier
public boolean isPeerEnabled()
isPeerEnabled
in interface ReplicationSourceInterface
private void initialize()
private void setSourceStartupStatus(boolean initializing)
public ReplicationSourceInterface startup()
ReplicationSourceInterface
startup
in interface ReplicationSourceInterface
public void terminate(String reason)
ReplicationSourceInterface
terminate
in interface ReplicationSourceInterface
reason
- why it's terminating
public void terminate(String reason, Exception cause)
ReplicationSourceInterface
terminate
in interface ReplicationSourceInterface
reason
- why it's terminating
cause
- the error that's causing it
public void terminate(String reason, Exception cause, boolean clearMetrics)
ReplicationSourceInterface
terminate
in interface ReplicationSourceInterface
reason
- why it's terminating
cause
- the error that's causing it
clearMetrics
- removes all metrics about this Source
public String getQueueId()
ReplicationSourceInterface
getQueueId
in interface ReplicationSourceInterface
public String getPeerId()
ReplicationSourceInterface
getPeerId
in interface ReplicationSourceInterface
public org.apache.hadoop.fs.Path getCurrentPath()
ReplicationSourceInterface
getCurrentPath
in interface ReplicationSourceInterface
public boolean isSourceActive()
ReplicationSourceInterface
isSourceActive
in interface ReplicationSourceInterface
public ReplicationQueueInfo getReplicationQueueInfo()
public boolean isWorkerRunning()
public String getStats()
ReplicationSourceInterface
getStats
in interface ReplicationSourceInterface
public MetricsSource getSourceMetrics()
ReplicationSourceInterface
getSourceMetrics
in interface ReplicationSourceInterface
public void postShipEdits(List<WAL.Entry> entries, int batchSize)
ReplicationSourceInterface
postShipEdits
in interface ReplicationSourceInterface
entries
- entries that were pushed
batchSize
- size of the entries pushed
public WALFileLengthProvider getWALFileLengthProvider()
ReplicationSourceInterface
getWALFileLengthProvider
in interface ReplicationSourceInterface
public ServerName getServerWALsBelongTo()
ReplicationSourceInterface
getServerWALsBelongTo
in interface ReplicationSourceInterface
public ReplicationQueueStorage getReplicationQueueStorage()
ReplicationSourceInterface
getReplicationQueueStorage
in interface ReplicationSourceInterface
Copyright © 2007–2020 The Apache Software Foundation. All rights reserved.