Class ReplicationSource
java.lang.Object
org.apache.hadoop.hbase.replication.regionserver.ReplicationSource
- All Implemented Interfaces:
ReplicationSourceInterface
- Direct Known Subclasses:
RecoveredReplicationSource
Class that handles the source of a replication stream. Currently does not handle more than 1
slave cluster. For each slave cluster it selects a random subset of the peer's region servers,
sized by a replication ratio. For example, if the replication ratio is 0.1 and the slave cluster
has 100 region servers, 10 will be selected.
A stream is considered down when we cannot contact a region server on the peer cluster for more than 55 seconds by default.
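The ratio-based selection described above can be illustrated with a minimal sketch. The class and method names here (SinkSelectionSketch, sinkCount, selectSinks) are hypothetical and not part of ReplicationSource. Rounding the product up and clamping it to the list size are assumptions made for the illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical illustration only; not the HBase implementation.
class SinkSelectionSketch {
  // Number of peer region servers to use for a given ratio.
  // Assumption: the product is rounded up and never exceeds the number of servers.
  static int sinkCount(int peerServers, double ratio) {
    int wanted = (int) Math.ceil(peerServers * ratio);
    return Math.min(peerServers, Math.max(0, wanted));
  }

  // Pick a random subset of the peer's region servers, sized by the ratio.
  static List<String> selectSinks(List<String> peerServers, double ratio, Random random) {
    List<String> shuffled = new ArrayList<>(peerServers);
    Collections.shuffle(shuffled, random);
    return new ArrayList<>(shuffled.subList(0, sinkCount(shuffled.size(), ratio)));
  }
}
```

With 100 servers and a ratio of 0.1, sinkCount returns 10, matching the example in the class description.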
-
Field Summary
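Modifier and Type | Field | Description
private boolean | abortOnError |
private final List<WALEntryFilter> | baseFilterOutWALEntries | Base WALEntry filters for this class.
private UUID | clusterId |
protected org.apache.hadoop.conf.Configuration | conf |
private long | currentBandwidth |
static final int | DEFAULT_WAIT_ON_ENDPOINT_SECONDS |
private long | defaultBandwidth |
private final Predicate<org.apache.hadoop.fs.Path> | filterInWALs | WALs to replicate.
protected org.apache.hadoop.fs.FileSystem | fs |
private Thread | initThread |
private static final org.slf4j.Logger | LOG |
protected ReplicationSourceLogQueue | logQueue |
protected ReplicationSourceManager | manager |
private int | maxRetriesMultiplier |
private MetricsSource | metrics |
protected ReplicationQueueId | queueId |
protected int | queueSizePerGroup |
protected ReplicationQueueStorage | queueStorage |
private ReplicationEndpoint | replicationEndpoint |
protected ReplicationPeer | replicationPeer |
private AtomicBoolean | retryStartup |
protected Server | server |
private long | sleepForRetries |
(package private) boolean | sourceRunning |
protected org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap<String,ReplicationGroupOffset> | startOffsets |
private AtomicBoolean | startupOngoing |
private ReplicationThrottler | throttler |
private AtomicLong | totalReplicatedEdits |
static final String | WAIT_ON_ENDPOINT_SECONDS |
private int | waitOnEndpointSeconds |
protected WALEntryFilter | walEntryFilter | A filter (or a chain of filters) for WAL entries; filters out edits.
private WALFileLengthProvider | walFileLengthProvider |
protected final ConcurrentHashMap<String,ReplicationSourceShipper> | workerThreads |
-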
Constructor Summary
Constructor | Description
ReplicationSource(Predicate<org.apache.hadoop.fs.Path> replicateWAL, List<WALEntryFilter> baseFilterOutWALEntries) |
-
Method Summary
Modifier and Type | Method | Description
void | addHFileRefs(TableName tableName, byte[] family, List<Pair<org.apache.hadoop.fs.Path, org.apache.hadoop.fs.Path>> pairs) | Add hfile names to the queue to be replicated.
private void | checkBandwidthChangeAndResetThrottler |
private void | checkError(Thread t, Throwable error) |
protected final ReplicationSourceShipper | createNewShipper(String walGroupId) |
protected ReplicationSourceShipper | createNewShipper(String walGroupId, ReplicationSourceWALReader walReader) |
private ReplicationSourceWALReader | createNewWALReader(String walGroupId, long startPosition) |
private ReplicationEndpoint | createReplicationEndpoint |
private void | decorateConf |
void | enqueueLog(org.apache.hadoop.fs.Path wal) | Add a log to the list of logs to replicate
private long | getCurrentBandwidth |
org.apache.hadoop.fs.Path | getCurrentPath | Get the current log that's replicated
private long | getFileSize(org.apache.hadoop.fs.Path currentPath) |
 | getPeer() | Get the replication peer instance.
 | getQueueId | Get the queue id that the source is replicating to
Map<String,PriorityBlockingQueue<org.apache.hadoop.fs.Path>> | getQueues |
 | getReplicationEndpoint | Returns the replication endpoint used by this replication source
 | getReplicationQueueStorage | Returns the instance of queueStorage used by this ReplicationSource.
(package private) Server | getServer |
 | getServerWALsBelongTo | The queue of WALs only belong to one region server.
 | getSourceManager | Returns the replication source manager
 | getSourceMetrics | Returns metrics of this replication source
private long | getStartOffset(String walGroupId) |
 | getStats() | Get a string representation of the current statistics for this source
long | getTotalReplicatedEdits |
(package private) WALEntryFilter | getWalEntryFilter | Call after initializeWALEntryFilter(UUID), else it will be null.
 | getWALFileLengthProvider | Returns the wal file length provider
 | getWalGroupStatus | Get the stat of replication for each wal group.
void | init(org.apache.hadoop.conf.Configuration conf, org.apache.hadoop.fs.FileSystem fs, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, ReplicationQueueData queueData, UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) | Instantiation method used by region servers
private void | initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint) |
private void | initialize |
private void | initializeWALEntryFilter(UUID peerClusterId) |
boolean | isSourceActive | Returns active or not
boolean | isWorkerRunning |
 | logPeerId |
void | postShipEdits(List<WAL.Entry> entries, long batchSize) | Call this after the shipper thread ships some entries to the peer cluster.
(package private) void | removeWorker(ReplicationSourceShipper worker) |
private void | retryRefreshing(Thread t, Throwable error) |
private void | setSourceStartupStatus(boolean initializing) |
private boolean | sleepForRetries(String msg, int sleepMultiplier) | Do the sleeping logic
protected final void | startShipper(ReplicationSourceShipper worker) |
protected void | startShippers |
 | startup() | Start the replication
void | terminate | End the replication
void | terminate | End the replication
void | terminate | End the replication
private void | terminate |
private void | tryStartNewShipper(String walGroupId) |
void | tryThrottle(int batchSize) | Try to throttle when the peer is configured with a bandwidth
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
getPeerId, isPeerEnabled, isRecovered, isSyncReplication, logPositionAndCleanOldLogs
-
Field Details
-
LOG
-
queueSizePerGroup
-
logQueue
-
queueStorage
-
replicationPeer
-
conf
-
manager
-
server
-
sleepForRetries
-
fs
-
clusterId
-
totalReplicatedEdits
-
queueId
-
startOffsets
protected org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap<String,ReplicationGroupOffset> startOffsets
-
maxRetriesMultiplier
-
sourceRunning
-
metrics
-
replicationEndpoint
-
abortOnError
-
startupOngoing
-
retryStartup
-
walEntryFilter
A filter (or a chain of filters) for WAL entries; filters out edits.
-
throttler
-
defaultBandwidth
-
currentBandwidth
-
walFileLengthProvider
-
workerThreads
-
WAIT_ON_ENDPOINT_SECONDS
-
DEFAULT_WAIT_ON_ENDPOINT_SECONDS
-
waitOnEndpointSeconds
-
initThread
-
filterInWALs
WALs to replicate. Predicate that returns 'true' for WALs to replicate and 'false' for WALs to skip.
-
baseFilterOutWALEntries
Base WALEntry filters for this class. Unmodifiable. Set on construction. Filters *out* edits we do not want replicated, passed on to replication endpoints. This is the basic set. Down in #initializeWALEntryFilter this set is added to the end of the WALEntry filter chain. These are put after those that we pick up from the configured endpoints and other machinations to create the final walEntryFilter.
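The ordering described for baseFilterOutWALEntries (endpoint-supplied filters first, base filters appended at the end of the chain) can be sketched as follows. This is an illustration under stated assumptions, not the HBase code. EntryFilter is a hypothetical stand-in for WALEntryFilter, entries are modeled as plain strings, and a filter is assumed to return null to drop an entry.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; the types here are stand-ins, not HBase classes.
class FilterChainSketch {
  // Stand-in filter: returns the entry to keep it, or null to filter it out.
  interface EntryFilter {
    String filter(String entry);
  }

  // Endpoint-supplied filters go first; the base set is added to the end of the chain.
  static List<EntryFilter> buildChain(List<EntryFilter> endpointFilters,
      List<EntryFilter> baseFilters) {
    List<EntryFilter> chain = new ArrayList<>(endpointFilters);
    chain.addAll(baseFilters);
    return chain;
  }

  // Run an entry through the chain in order; stop as soon as one filter drops it.
  static String apply(List<EntryFilter> chain, String entry) {
    String current = entry;
    for (EntryFilter f : chain) {
      if (current == null) {
        return null;
      }
      current = f.filter(current);
    }
    return current;
  }
}
```

Because the base filters sit at the end, they see only entries that the endpoint-supplied filters have already let through.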
-
Constructor Details
-
ReplicationSource
-
ReplicationSource
ReplicationSource(Predicate<org.apache.hadoop.fs.Path> replicateWAL, List<WALEntryFilter> baseFilterOutWALEntries)
- Parameters:
replicateWAL - Pass a filter to run against WAL Path; filter *in* WALs to Replicate; i.e. return 'true' if you want to replicate the content of the WAL.
baseFilterOutWALEntries - Base set of filters you want applied always; filters *out* WALEntries so they never make it out of this ReplicationSource.
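The filter-*in* semantics of replicateWAL can be shown with a small sketch: a WAL is replicated only when the predicate returns true for its path. WAL paths are modeled as strings instead of org.apache.hadoop.fs.Path to keep the example self-contained, and the class name, method name, and ".meta" suffix rule used in the usage note are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical illustration only; paths are plain strings rather than Hadoop Path objects.
class WalPathFilterSketch {
  // Keep only the WALs for which the predicate returns true (filter *in*).
  static List<String> walsToReplicate(List<String> walPaths, Predicate<String> replicateWAL) {
    List<String> kept = new ArrayList<>();
    for (String wal : walPaths) {
      if (replicateWAL.test(wal)) {
        kept.add(wal);
      }
    }
    return kept;
  }
}
```

For instance, passing the predicate `p -> !p.endsWith(".meta")` would skip every path with that (assumed) suffix and replicate the rest.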
-
-
Method Details
-
init
public void init(org.apache.hadoop.conf.Configuration conf, org.apache.hadoop.fs.FileSystem fs, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, ReplicationQueueData queueData, UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException
Instantiation method used by region servers
- Specified by:
init in interface ReplicationSourceInterface
- Parameters:
conf - configuration to use
fs - file system to use
manager - replication manager to ping to
server - the server for this region server
queueData - the id and offsets of our replication queue
clusterId - unique UUID for the cluster
metrics - metrics for replication source
queueStorage - the replication queue storage
replicationPeer - the replication peer
walFileLengthProvider - for getting the length of the WAL file which is currently being written
- Throws:
IOException
-
decorateConf
-
enqueueLog
Description copied from interface: ReplicationSourceInterface
Add a log to the list of logs to replicate
- Specified by:
enqueueLog in interface ReplicationSourceInterface
- Parameters:
wal - path to the log to replicate
-
getQueues
-
addHFileRefs
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<org.apache.hadoop.fs.Path, org.apache.hadoop.fs.Path>> pairs) throws ReplicationException
Description copied from interface: ReplicationSourceInterface
Add hfile names to the queue to be replicated.
- Specified by:
addHFileRefs in interface ReplicationSourceInterface
- Parameters:
tableName - Name of the table these files belong to
family - Name of the family these files belong to
pairs - list of pairs of { HFile location in staging dir, HFile path in region dir which will be added in the queue for replication }
- Throws:
ReplicationException - If failed to add hfile references
-
createReplicationEndpoint
-
initAndStartReplicationEndpoint
private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint) throws IOException, TimeoutException
- Throws:
IOException
TimeoutException
-
initializeWALEntryFilter
-
getStartOffset
-
createNewShipper
-
startShipper
-
tryStartNewShipper
-
getWalGroupStatus
Description copied from interface: ReplicationSourceInterface
Get the stat of replication for each wal group.
- Specified by:
getWalGroupStatus in interface ReplicationSourceInterface
- Returns:
- stat of replication
-
getFileSize
- Throws:
IOException
-
createNewShipper
protected ReplicationSourceShipper createNewShipper(String walGroupId, ReplicationSourceWALReader walReader)
-
createNewWALReader
-
getWalEntryFilter
Call after initializeWALEntryFilter(UUID), else it will be null.
- Returns:
- WAL Entry Filter Chain to use on WAL files filtering *out* WALEntry edits.
-
checkError
-
retryRefreshing
-
getReplicationEndpoint
Description copied from interface: ReplicationSourceInterface
Returns the replication endpoint used by this replication source
- Specified by:
getReplicationEndpoint in interface ReplicationSourceInterface
-
getSourceManager
Description copied from interface: ReplicationSourceInterface
Returns the replication source manager
- Specified by:
getSourceManager in interface ReplicationSourceInterface
-
tryThrottle
Description copied from interface: ReplicationSourceInterface
Try to throttle when the peer is configured with a bandwidth
- Specified by:
tryThrottle in interface ReplicationSourceInterface
- Parameters:
batchSize - size of the entries that will be pushed
- Throws:
InterruptedException
-
checkBandwidthChangeAndResetThrottler
-
getCurrentBandwidth
-
sleepForRetries
Do the sleeping logic
- Parameters:
msg - Why we sleep
sleepMultiplier - by how many times the default sleeping time is augmented
- Returns:
- True if sleepMultiplier is < maxRetriesMultiplier
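The contract documented for sleepForRetries (sleep for the default time multiplied by sleepMultiplier, then report whether the multiplier is still below maxRetriesMultiplier) can be sketched as below. This is a minimal illustration, not the class's actual code: the parameter baseSleepMs, the helper nextMultiplier, and the interrupt handling are assumptions.

```java
// Hypothetical illustration only; not the HBase implementation.
class RetrySleepSketch {
  // Sleep for baseSleepMs * sleepMultiplier, then report whether the caller
  // may still increase the multiplier (true if sleepMultiplier < maxRetriesMultiplier).
  static boolean sleepForRetries(long baseSleepMs, int sleepMultiplier, int maxRetriesMultiplier) {
    try {
      Thread.sleep(baseSleepMs * sleepMultiplier);
    } catch (InterruptedException e) {
      // Assumption: restore the interrupt flag and let the caller decide what to do.
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  // One possible caller pattern: grow the multiplier only while the method returns true.
  static int nextMultiplier(long baseSleepMs, int sleepMultiplier, int maxRetriesMultiplier) {
    return sleepForRetries(baseSleepMs, sleepMultiplier, maxRetriesMultiplier)
        ? sleepMultiplier + 1
        : sleepMultiplier;
  }
}
```

Under this pattern the back-off grows linearly with each retry and stops growing once the multiplier reaches maxRetriesMultiplier.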
-
initialize
-
startShippers
-
setSourceStartupStatus
-
startup
Description copied from interface: ReplicationSourceInterface
Start the replication
- Specified by:
startup in interface ReplicationSourceInterface
-
terminate
Description copied from interface: ReplicationSourceInterface
End the replication
- Specified by:
terminate in interface ReplicationSourceInterface
- Parameters:
reason - why it's terminating
-
terminate
Description copied from interface: ReplicationSourceInterface
End the replication
- Specified by:
terminate in interface ReplicationSourceInterface
- Parameters:
reason - why it's terminating
cause - the error that's causing it
-
terminate
Description copied from interface: ReplicationSourceInterface
End the replication
- Specified by:
terminate in interface ReplicationSourceInterface
- Parameters:
reason - why it's terminating
cause - the error that's causing it
clearMetrics - removes all metrics about this Source
-
terminate
-
getQueueId
Description copied from interface: ReplicationSourceInterface
Get the queue id that the source is replicating to
- Specified by:
getQueueId in interface ReplicationSourceInterface
- Returns:
- queue id
-
getCurrentPath
Description copied from interface: ReplicationSourceInterface
Get the current log that's replicated
- Specified by:
getCurrentPath in interface ReplicationSourceInterface
- Returns:
- the current log
-
isSourceActive
Description copied from interface: ReplicationSourceInterface
Returns active or not
- Specified by:
isSourceActive in interface ReplicationSourceInterface
-
isWorkerRunning
-
getStats
Description copied from interface: ReplicationSourceInterface
Get a string representation of the current statistics for this source
- Specified by:
getStats in interface ReplicationSourceInterface
- Returns:
- printable stats
-
getSourceMetrics
Description copied from interface: ReplicationSourceInterface
Returns metrics of this replication source
- Specified by:
getSourceMetrics in interface ReplicationSourceInterface
-
postShipEdits
Description copied from interface: ReplicationSourceInterface
Call this after the shipper thread ships some entries to the peer cluster.
- Specified by:
postShipEdits in interface ReplicationSourceInterface
- Parameters:
entries - the entries that were pushed
batchSize - size of the entries pushed
-
getWALFileLengthProvider
Description copied from interface: ReplicationSourceInterface
Returns the wal file length provider
- Specified by:
getWALFileLengthProvider in interface ReplicationSourceInterface
-
getServerWALsBelongTo
Description copied from interface: ReplicationSourceInterface
The WALs in the queue belong to only one region server. This will return the server name which all WALs belong to.
- Specified by:
getServerWALsBelongTo in interface ReplicationSourceInterface
- Returns:
- the server name which all WALs belong to
-
getPeer
Description copied from interface: ReplicationSourceInterface
Get the replication peer instance.
- Specified by:
getPeer in interface ReplicationSourceInterface
- Returns:
- the replication peer instance
-
getServer
-
getReplicationQueueStorage
Description copied from interface: ReplicationSourceInterface
Returns the instance of queueStorage used by this ReplicationSource.
- Specified by:
getReplicationQueueStorage in interface ReplicationSourceInterface
-
removeWorker
-
logPeerId
-
getTotalReplicatedEdits
-