@InterfaceAudience.Private public class ReplicationSource extends Thread implements ReplicationSourceInterface
A stream is considered down when we cannot contact a region server on the peer cluster for more than 55 seconds by default.
Modifier and Type | Class and Description |
---|---|
static class |
ReplicationSource.LogsComparator
Comparator used to compare logs together based on their start time
|
Thread.State, Thread.UncaughtExceptionHandler
MAX_PRIORITY, MIN_PRIORITY, NORM_PRIORITY
Constructor and Description |
---|
ReplicationSource() |
Modifier and Type | Method and Description |
---|---|
void |
addHFileRefs(TableName tableName,
byte[] family,
List<Pair<org.apache.hadoop.fs.Path,org.apache.hadoop.fs.Path>> pairs)
Add hfile names to the queue to be replicated.
|
private void |
checkBandwidthChangeAndResetThrottler() |
private void |
decorateConf() |
void |
enqueueLog(org.apache.hadoop.fs.Path log)
Add a log to the list of logs to replicate
|
private long |
getCurrentBandwidth() |
org.apache.hadoop.fs.Path |
getCurrentPath()
Get the current log that's replicated
|
String |
getPeerClusterZnode()
Get the id that the source is replicating to
|
String |
getPeerId()
Get the id that the source is replicating to.
|
ReplicationEndpoint |
getReplicationEndpoint() |
ServerName |
getServerWALsBelongTo()
The queue of WALs only belongs to one region server.
|
ReplicationSourceManager |
getSourceManager() |
MetricsSource |
getSourceMetrics() |
String |
getStats()
Get a string representation of the current statistics
for this source
|
Thread.UncaughtExceptionHandler |
getUncaughtExceptionHandler() |
WALFileLengthProvider |
getWALFileLengthProvider() |
void |
init(org.apache.hadoop.conf.Configuration conf,
org.apache.hadoop.fs.FileSystem fs,
ReplicationSourceManager manager,
ReplicationQueues replicationQueues,
ReplicationPeers replicationPeers,
Server server,
String peerClusterZnode,
UUID clusterId,
ReplicationEndpoint replicationEndpoint,
WALFileLengthProvider walFileLengthProvider,
MetricsSource metrics)
Instantiation method used by region servers
|
private void |
initializeWALEntryFilter() |
boolean |
isPeerEnabled()
Check whether the peer is enabled.
|
boolean |
isSourceActive() |
void |
postShipEdits(List<WAL.Entry> entries,
int batchSize)
Call this after the shipper thread ships some entries to the peer cluster.
|
void |
run() |
protected boolean |
sleepForRetries(String msg,
int sleepMultiplier)
Do the sleeping logic
|
protected ReplicationSourceWALReader |
startNewWALReader(String threadName,
String walGroupId,
PriorityBlockingQueue<org.apache.hadoop.fs.Path> queue,
long startPosition) |
void |
startup()
Start the replication
|
void |
terminate(String reason)
End the replication
|
void |
terminate(String reason,
Exception cause)
End the replication
|
void |
terminate(String reason,
Exception cause,
boolean join) |
protected void |
tryStartNewShipper(String walGroupId,
PriorityBlockingQueue<org.apache.hadoop.fs.Path> queue) |
void |
tryThrottle(int batchSize)
Try to throttle when the peer is configured with a bandwidth limit.
|
private void |
uninitialize() |
activeCount, checkAccess, clone, countStackFrames, currentThread, destroy, dumpStack, enumerate, getAllStackTraces, getContextClassLoader, getDefaultUncaughtExceptionHandler, getId, getName, getPriority, getStackTrace, getState, getThreadGroup, holdsLock, interrupt, interrupted, isAlive, isDaemon, isInterrupted, join, join, join, resume, setContextClassLoader, setDaemon, setDefaultUncaughtExceptionHandler, setName, setPriority, setUncaughtExceptionHandler, sleep, sleep, start, stop, stop, suspend, toString, yield
private static final org.slf4j.Logger LOG
private Map<String,PriorityBlockingQueue<org.apache.hadoop.fs.Path>> queues
protected int queueSizePerGroup
protected ReplicationQueues replicationQueues
private ReplicationPeers replicationPeers
protected org.apache.hadoop.conf.Configuration conf
protected ReplicationQueueInfo replicationQueueInfo
protected ReplicationSourceManager manager
private long sleepForRetries
protected org.apache.hadoop.fs.FileSystem fs
private UUID peerClusterId
private AtomicLong totalReplicatedEdits
protected String peerClusterZnode
private int maxRetriesMultiplier
private volatile boolean sourceRunning
private MetricsSource metrics
private int logQueueWarnThreshold
private ReplicationEndpoint replicationEndpoint
protected WALEntryFilter walEntryFilter
private ReplicationThrottler throttler
private long defaultBandwidth
private long currentBandwidth
private WALFileLengthProvider walFileLengthProvider
protected final ConcurrentHashMap<String,ReplicationSourceShipper> workerThreads
private AtomicLong totalBufferUsed
public static final String WAIT_ON_ENDPOINT_SECONDS
public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS
private int waitOnEndpointSeconds
public ReplicationSource()
public void init(org.apache.hadoop.conf.Configuration conf, org.apache.hadoop.fs.FileSystem fs, ReplicationSourceManager manager, ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server, String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException
init
in interface ReplicationSourceInterface
conf
- configuration to use
fs
- file system to use
manager
- replication manager to ping to
server
- the server for this region server
peerClusterZnode
- the name of our znode
clusterId
- unique UUID for the cluster
replicationEndpoint
- the replication endpoint implementation
metrics
- metrics for replication source
Throws:
IOException
private void decorateConf()
public void enqueueLog(org.apache.hadoop.fs.Path log)
ReplicationSourceInterface
enqueueLog
in interface ReplicationSourceInterface
log
- path to the log to replicate
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<org.apache.hadoop.fs.Path,org.apache.hadoop.fs.Path>> pairs) throws ReplicationException
ReplicationSourceInterface
addHFileRefs
in interface ReplicationSourceInterface
tableName
- Name of the table these files belong to
family
- Name of the family these files belong to
pairs
- list of pairs of { HFile location in staging dir, HFile path in region dir which
will be added in the queue for replication}
Throws:
ReplicationException
- If adding the HFile references fails
private void initializeWALEntryFilter()
protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<org.apache.hadoop.fs.Path> queue)
protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId, PriorityBlockingQueue<org.apache.hadoop.fs.Path> queue, long startPosition)
public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler()
getUncaughtExceptionHandler
in class Thread
public ReplicationEndpoint getReplicationEndpoint()
getReplicationEndpoint
in interface ReplicationSourceInterface
public ReplicationSourceManager getSourceManager()
getSourceManager
in interface ReplicationSourceInterface
public void tryThrottle(int batchSize) throws InterruptedException
ReplicationSourceInterface
tryThrottle
in interface ReplicationSourceInterface
batchSize
- size of the entries that will be pushed
Throws:
InterruptedException
private void checkBandwidthChangeAndResetThrottler()
private long getCurrentBandwidth()
private void uninitialize()
protected boolean sleepForRetries(String msg, int sleepMultiplier)
msg
- Why we sleep
sleepMultiplier
- by how many times the default sleeping time is augmented
Returns:
true if sleepMultiplier is < maxRetriesMultiplier
public boolean isPeerEnabled()
isPeerEnabled
in interface ReplicationSourceInterface
public void startup()
ReplicationSourceInterface
startup
in interface ReplicationSourceInterface
public void terminate(String reason)
ReplicationSourceInterface
terminate
in interface ReplicationSourceInterface
reason
- why it's terminating
public void terminate(String reason, Exception cause)
ReplicationSourceInterface
terminate
in interface ReplicationSourceInterface
reason
- why it's terminating
cause
- the error that's causing it
public String getPeerClusterZnode()
ReplicationSourceInterface
getPeerClusterZnode
in interface ReplicationSourceInterface
public String getPeerId()
ReplicationSourceInterface
getPeerId
in interface ReplicationSourceInterface
public org.apache.hadoop.fs.Path getCurrentPath()
ReplicationSourceInterface
getCurrentPath
in interface ReplicationSourceInterface
public boolean isSourceActive()
isSourceActive
in interface ReplicationSourceInterface
public String getStats()
ReplicationSourceInterface
getStats
in interface ReplicationSourceInterface
public MetricsSource getSourceMetrics()
getSourceMetrics
in interface ReplicationSourceInterface
public void postShipEdits(List<WAL.Entry> entries, int batchSize)
ReplicationSourceInterface
postShipEdits
in interface ReplicationSourceInterface
entries
- the entries that were pushed
batchSize
- total size of the entries that were pushed
public WALFileLengthProvider getWALFileLengthProvider()
getWALFileLengthProvider
in interface ReplicationSourceInterface
public ServerName getServerWALsBelongTo()
ReplicationSourceInterface
getServerWALsBelongTo
in interface ReplicationSourceInterface
Copyright © 2007–2019 The Apache Software Foundation. All rights reserved.