Class ReplicationSource
java.lang.Object
org.apache.hadoop.hbase.replication.regionserver.ReplicationSource
- All Implemented Interfaces:
ReplicationSourceInterface
- Direct Known Subclasses:
CatalogReplicationSource, RecoveredReplicationSource
Class that handles the source of a replication stream. Currently does not handle more than 1
slave cluster. For each slave cluster it selects a random number of peers using a replication
ratio. For example, if the replication ratio is 0.1 and the slave cluster has 100 region servers, 10 will
be selected.
A stream is considered down when we cannot contact a region server on the peer cluster for more than 55 seconds by default.
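The ratio-based selection described above can be sketched as follows. This is a minimal illustration, not the HBase implementation: the class `SinkSelectionSketch`, its method names, and the rounding rule (round up, and pick at least one server when any exist) are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class SinkSelectionSketch {
    /**
     * Number of peer region servers to replicate to.
     * Assumption: the product is rounded up and at least one server is used.
     */
    public static int sinkCount(int totalServers, double ratio) {
        if (totalServers <= 0) {
            return 0;
        }
        return Math.max(1, (int) Math.ceil(totalServers * ratio));
    }

    /** Picks a random subset of the peer's region servers according to the ratio. */
    public static <T> List<T> chooseSinks(List<T> servers, double ratio, Random random) {
        List<T> shuffled = new ArrayList<>(servers); // copy so the caller's list is untouched
        Collections.shuffle(shuffled, random);
        return new ArrayList<>(shuffled.subList(0, sinkCount(shuffled.size(), ratio)));
    }
}
```

With a ratio of 0.1 and 100 servers, `sinkCount` returns 10, which matches the example in the class description.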
-
Field Summary
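Fields
Modifier and Type / Field / Description
private boolean abortOnError
private final List<WALEntryFilter> baseFilterOutWALEntries
    Base WALEntry filters for this class.
private UUID clusterId
protected org.apache.hadoop.conf.Configuration conf
private long currentBandwidth
static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS
private long defaultBandwidth
private final Predicate<org.apache.hadoop.fs.Path> filterInWALs
    WALs to replicate.
protected org.apache.hadoop.fs.FileSystem fs
private Thread initThread
private static final org.slf4j.Logger LOG
protected ReplicationSourceLogQueue logQueue
protected ReplicationSourceManager manager
private int maxRetriesMultiplier
private MetricsSource metrics
private String peerId
protected String queueId
protected int queueSizePerGroup
protected ReplicationQueueStorage queueStorage
private ReplicationEndpoint replicationEndpoint
protected ReplicationPeer replicationPeer
protected ReplicationQueueInfo replicationQueueInfo
private AtomicBoolean retryStartup
protected Server server
private long sleepForRetries
(package private) boolean sourceRunning
private AtomicBoolean startupOngoing
private ReplicationThrottler throttler
private AtomicLong totalReplicatedEdits
static final String WAIT_ON_ENDPOINT_SECONDS
private int waitOnEndpointSeconds
protected WALEntryFilter walEntryFilter
    A filter (or a chain of filters) for WAL entries; filters out edits.
private WALFileLengthProvider walFileLengthProvider
protected final ConcurrentHashMap<String,ReplicationSourceShipper> workerThreads
-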
Constructor Summary
Constructors
Constructor / Description
ReplicationSource(Predicate<org.apache.hadoop.fs.Path> replicateWAL, List<WALEntryFilter> baseFilterOutWALEntries)
-
Method Summary
Modifier and Type / Method / Description
void addHFileRefs(TableName tableName, byte[] family, List<Pair<org.apache.hadoop.fs.Path, org.apache.hadoop.fs.Path>> pairs)
    Add hfile names to the queue to be replicated.
private void checkBandwidthChangeAndResetThrottler
private void checkError(Thread t, Throwable error)
protected ReplicationSourceShipper createNewShipper(String walGroupId)
private ReplicationSourceWALReader createNewWALReader(String walGroupId, long startPosition)
private ReplicationEndpoint createReplicationEndpoint
private void decorateConf
void enqueueLog(org.apache.hadoop.fs.Path wal)
    Add a log to the list of logs to replicate
private long getCurrentBandwidth
org.apache.hadoop.fs.Path getCurrentPath
    Get the current log that's replicated
private long getFileSize(org.apache.hadoop.fs.Path currentPath)
getPeerId
    Get the id that the source is replicating to.
getQueueId
    Get the queue id that the source is replicating to
Map<String,PriorityBlockingQueue<org.apache.hadoop.fs.Path>> getQueues
getReplicationEndpoint
    Returns the replication endpoint used by this replication source
getReplicationQueueInfo
getReplicationQueueStorage
    Returns the instance of queueStorage used by this ReplicationSource.
(package private) Server getServer
getServerWALsBelongTo
    The queue of WALs belongs to only one region server.
getSourceManager
    Returns the replication source manager
getSourceMetrics
    Returns metrics of this replication source
getStats()
    Get a string representation of the current statistics for this source
long getTotalReplicatedEdits
(package private) WALEntryFilter getWalEntryFilter
    Call after initializeWALEntryFilter(UUID) else it will be null.
getWALFileLengthProvider
    Returns the wal file length provider
getWalGroupStatus
    Get the stat of replication for each wal group.
void init(org.apache.hadoop.conf.Configuration conf, org.apache.hadoop.fs.FileSystem fs, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
    Instantiation method used by region servers
private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint)
private void initialize
private void initializeWALEntryFilter(UUID peerClusterId)
boolean isPeerEnabled
    Check whether the peer is enabled or not
boolean isSourceActive
    Returns active or not
boolean isWorkerRunning
logPeerId
    Returns String to use as a log prefix that contains current peerId.
void postShipEdits(List<WAL.Entry> entries, long batchSize)
    Call this after the shipper thread ships some entries to the peer cluster.
private void retryRefreshing(Thread t, Throwable error)
private void setSourceStartupStatus(boolean initializing)
protected boolean sleepForRetries(String msg, int sleepMultiplier)
    Do the sleeping logic
startup()
    Start the replication
void terminate
    End the replication
void terminate
    End the replication
void terminate
    End the replication
void terminate
private void tryStartNewShipper(String walGroupId)
void tryThrottle(int batchSize)
    Try to throttle when the peer is configured with a bandwidth
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
isRecovered, logPositionAndCleanOldLogs
-
Field Details
-
LOG
-
queueSizePerGroup
-
logQueue
-
queueStorage
-
replicationPeer
-
conf
-
replicationQueueInfo
-
peerId
-
manager
-
server
-
sleepForRetries
-
fs
-
clusterId
-
totalReplicatedEdits
-
queueId
-
maxRetriesMultiplier
-
sourceRunning
-
metrics
-
replicationEndpoint
-
abortOnError
-
startupOngoing
-
retryStartup
-
walEntryFilter
A filter (or a chain of filters) for WAL entries; filters out edits.
-
throttler
-
defaultBandwidth
-
currentBandwidth
-
walFileLengthProvider
-
workerThreads
-
WAIT_ON_ENDPOINT_SECONDS
-
DEFAULT_WAIT_ON_ENDPOINT_SECONDS
-
waitOnEndpointSeconds
-
initThread
-
filterInWALs
WALs to replicate. Predicate that returns 'true' for WALs to replicate and 'false' for WALs to skip.
-
baseFilterOutWALEntries
Base WALEntry filters for this class. Unmodifiable. Set on construction. Filters *out* edits we do not want replicated, passed on to replication endpoints. This is the basic set. Down in #initializeWALEntryFilter this set is added to the end of the WALEntry filter chain. These are put after those that we pick up from the configured endpoints and other machinations to create the final walEntryFilter.
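The ordering described for baseFilterOutWALEntries (endpoint-supplied filters first, base filters appended at the end) can be illustrated with a small sketch. Everything here is a stand-in chosen for the example: entries are plain strings, a filter is modelled as a `UnaryOperator<String>` that returns null to drop an entry, and `FilterChainSketch` is a hypothetical class, not part of HBase.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class FilterChainSketch {
    /** Builds the chain: filters picked up from the endpoint first, base filters last. */
    public static List<UnaryOperator<String>> buildChain(
            List<UnaryOperator<String>> endpointFilters,
            List<UnaryOperator<String>> baseFilters) {
        List<UnaryOperator<String>> chain = new ArrayList<>(endpointFilters);
        chain.addAll(baseFilters); // base set goes at the end of the chain
        return chain;
    }

    /** Runs an entry through the chain; a null result means the entry was filtered out. */
    public static String apply(List<UnaryOperator<String>> chain, String entry) {
        String current = entry;
        for (UnaryOperator<String> filter : chain) {
            current = filter.apply(current);
            if (current == null) {
                return null; // dropped; later filters never see it
            }
        }
        return current;
    }
}
```

Because the base filters run last, an entry that any earlier filter drops never reaches them, while an entry that passes the earlier filters can still be dropped by the base set.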
-
-
Constructor Details
-
ReplicationSource
-
ReplicationSource
ReplicationSource(Predicate<org.apache.hadoop.fs.Path> replicateWAL, List<WALEntryFilter> baseFilterOutWALEntries)
- Parameters:
replicateWAL - Pass a filter to run against WAL Path; filter *in* WALs to replicate; i.e. return 'true' if you want to replicate the content of the WAL.
baseFilterOutWALEntries - Base set of filters you want applied always; filters *out* WALEntries so they never make it out of this ReplicationSource.
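The filter-in semantics of replicateWAL can be sketched as below. This is a hedged illustration only: `java.nio.file.Path` stands in for `org.apache.hadoop.fs.Path`, the class `WalPathFilterSketch` is hypothetical, and the rule "skip files whose name ends in .meta" is an assumed example policy, not something this page specifies.

```java
import java.nio.file.Path;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class WalPathFilterSketch {
    /** Example policy (assumption): replicate every WAL except those named "*.meta". */
    public static final Predicate<Path> SKIP_META_WALS =
        path -> !path.getFileName().toString().endsWith(".meta");

    /** Keeps only the WALs for which the predicate returns true (filter *in*). */
    public static List<Path> selectForReplication(List<Path> wals, Predicate<Path> replicateWAL) {
        return wals.stream().filter(replicateWAL).collect(Collectors.toList());
    }
}
```

Note the opposite polarity of the two constructor arguments: the path predicate returns true for what should be replicated, whereas the entry filters remove what should not be.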
-
-
Method Details
-
init
public void init(org.apache.hadoop.conf.Configuration conf, org.apache.hadoop.fs.FileSystem fs, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException
Instantiation method used by region servers
- Specified by:
init in interface ReplicationSourceInterface
- Parameters:
conf - configuration to use
fs - file system to use
manager - replication manager to ping to
server - the server for this region server
queueId - the id of our replication queue
clusterId - unique UUID for the cluster
metrics - metrics for replication source
- Throws:
IOException
-
decorateConf
-
enqueueLog
Description copied from interface: ReplicationSourceInterface
Add a log to the list of logs to replicate
- Specified by:
enqueueLog in interface ReplicationSourceInterface
- Parameters:
wal - path to the log to replicate
-
getQueues
-
addHFileRefs
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<org.apache.hadoop.fs.Path, org.apache.hadoop.fs.Path>> pairs) throws ReplicationException
Description copied from interface: ReplicationSourceInterface
Add hfile names to the queue to be replicated.
- Specified by:
addHFileRefs in interface ReplicationSourceInterface
- Parameters:
tableName - Name of the table these files belong to
family - Name of the family these files belong to
pairs - list of pairs of { HFile location in staging dir, HFile path in region dir which will be added in the queue for replication }
- Throws:
ReplicationException - If failed to add hfile references
-
createReplicationEndpoint
-
initAndStartReplicationEndpoint
private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint) throws IOException, TimeoutException
- Throws:
IOException
TimeoutException
-
initializeWALEntryFilter
-
tryStartNewShipper
-
getWalGroupStatus
Description copied from interface: ReplicationSourceInterface
Get the stat of replication for each wal group.
- Specified by:
getWalGroupStatus in interface ReplicationSourceInterface
- Returns:
stat of replication
-
getFileSize
- Throws:
IOException
-
createNewShipper
-
createNewWALReader
-
getWalEntryFilter
Call after initializeWALEntryFilter(UUID) else it will be null.
- Returns:
WAL Entry Filter Chain to use on WAL files filtering *out* WALEntry edits.
-
checkError
-
retryRefreshing
-
getReplicationEndpoint
Description copied from interface: ReplicationSourceInterface
Returns the replication endpoint used by this replication source
- Specified by:
getReplicationEndpoint in interface ReplicationSourceInterface
-
getSourceManager
Description copied from interface: ReplicationSourceInterface
Returns the replication source manager
- Specified by:
getSourceManager in interface ReplicationSourceInterface
-
tryThrottle
Description copied from interface: ReplicationSourceInterface
Try to throttle when the peer is configured with a bandwidth
- Specified by:
tryThrottle in interface ReplicationSourceInterface
- Parameters:
batchSize - size of the entries that will be pushed
- Throws:
InterruptedException
-
checkBandwidthChangeAndResetThrottler
-
getCurrentBandwidth
-
sleepForRetries
Do the sleeping logic
- Parameters:
msg - Why we sleep
sleepMultiplier - by how many times the default sleeping time is augmented
- Returns:
True if sleepMultiplier is < maxRetriesMultiplier
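The contract documented for sleepForRetries (sleep for the base time scaled by sleepMultiplier, then report whether the multiplier is still below maxRetriesMultiplier) can be sketched like this. The class `RetrySleepSketch`, its constructor, and the interrupt handling are assumptions made for illustration, not the actual implementation.

```java
public class RetrySleepSketch {
    private final long baseSleepMillis;     // plays the role of the sleepForRetries field
    private final int maxRetriesMultiplier; // upper bound for the multiplier

    public RetrySleepSketch(long baseSleepMillis, int maxRetriesMultiplier) {
        this.baseSleepMillis = baseSleepMillis;
        this.maxRetriesMultiplier = maxRetriesMultiplier;
    }

    /**
     * Sleeps for baseSleepMillis * sleepMultiplier.
     * Returns true if sleepMultiplier is < maxRetriesMultiplier, i.e. the caller
     * may still increase the multiplier on the next attempt.
     */
    public boolean sleepForRetries(String msg, int sleepMultiplier) {
        try {
            Thread.sleep(baseSleepMillis * sleepMultiplier);
        } catch (InterruptedException e) {
            // Assumption: restore the interrupt flag and let the caller decide what to do.
            Thread.currentThread().interrupt();
        }
        return sleepMultiplier < maxRetriesMultiplier;
    }
}
```

A caller would typically increment its multiplier only while the method returns true, so the wait grows with each failed attempt and then stays capped at the maximum.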
-
isPeerEnabled
Check whether the peer is enabled or not
- Specified by:
isPeerEnabled in interface ReplicationSourceInterface
- Returns:
true if the peer is enabled, otherwise false
-
initialize
-
setSourceStartupStatus
-
startup
Description copied from interface: ReplicationSourceInterface
Start the replication
- Specified by:
startup in interface ReplicationSourceInterface
-
terminate
Description copied from interface: ReplicationSourceInterface
End the replication
- Specified by:
terminate in interface ReplicationSourceInterface
- Parameters:
reason - why it's terminating
-
terminate
Description copied from interface: ReplicationSourceInterface
End the replication
- Specified by:
terminate in interface ReplicationSourceInterface
- Parameters:
reason - why it's terminating
cause - the error that's causing it
-
terminate
Description copied from interface: ReplicationSourceInterface
End the replication
- Specified by:
terminate in interface ReplicationSourceInterface
- Parameters:
reason - why it's terminating
cause - the error that's causing it
clearMetrics - removes all metrics about this Source
-
terminate
-
getQueueId
Description copied from interface: ReplicationSourceInterface
Get the queue id that the source is replicating to
- Specified by:
getQueueId in interface ReplicationSourceInterface
- Returns:
queue id
-
getPeerId
Description copied from interface: ReplicationSourceInterface
Get the id that the source is replicating to.
- Specified by:
getPeerId in interface ReplicationSourceInterface
- Returns:
peer id
-
getCurrentPath
Description copied from interface: ReplicationSourceInterface
Get the current log that's replicated
- Specified by:
getCurrentPath in interface ReplicationSourceInterface
- Returns:
the current log
-
isSourceActive
Description copied from interface: ReplicationSourceInterface
Returns active or not
- Specified by:
isSourceActive in interface ReplicationSourceInterface
-
getReplicationQueueInfo
-
isWorkerRunning
-
getStats
Description copied from interface: ReplicationSourceInterface
Get a string representation of the current statistics for this source
- Specified by:
getStats in interface ReplicationSourceInterface
- Returns:
printable stats
-
getSourceMetrics
Description copied from interface: ReplicationSourceInterface
Returns metrics of this replication source
- Specified by:
getSourceMetrics in interface ReplicationSourceInterface
-
postShipEdits
Description copied from interface: ReplicationSourceInterface
Call this after the shipper thread ships some entries to the peer cluster.
- Specified by:
postShipEdits in interface ReplicationSourceInterface
- Parameters:
entries - the entries pushed
batchSize - size of the entries pushed
-
getWALFileLengthProvider
Description copied from interface: ReplicationSourceInterface
Returns the wal file length provider
- Specified by:
getWALFileLengthProvider in interface ReplicationSourceInterface
-
getServerWALsBelongTo
Description copied from interface: ReplicationSourceInterface
The queue of WALs belongs to only one region server. This will return the name of the server to which all WALs belong.
- Specified by:
getServerWALsBelongTo in interface ReplicationSourceInterface
- Returns:
the server name which all WALs belong to
-
getServer
-
getReplicationQueueStorage
Description copied from interface: ReplicationSourceInterface
Returns the instance of queueStorage used by this ReplicationSource.
- Specified by:
getReplicationQueueStorage in interface ReplicationSourceInterface
-
logPeerId
Returns String to use as a log prefix that contains current peerId.
-
getTotalReplicatedEdits
-