@InterfaceAudience.Private public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint
ReplicationEndpoint
implementation for replicating
to another HBase cluster. For the slave cluster it selects a random number of peers using a
replication ratio. For example, if replication ratio = 0.1 and slave cluster has 100 region
servers, 10 will be selected.
A stream is considered down when we cannot contact a region server on the peer cluster for more than 55 seconds by default.
HBaseReplicationEndpoint.PeerRegionServerListener, HBaseReplicationEndpoint.SinkPeer
ReplicationEndpoint.Context, ReplicationEndpoint.ReplicateContext
Modifier and Type | Field and Description |
---|---|
private org.apache.hadoop.fs.Path |
baseNamespaceDir |
private boolean |
dropOnDeletedColumnFamilies |
private boolean |
dropOnDeletedTables |
private org.apache.hadoop.fs.Path |
hfileArchiveDir |
private boolean |
isSerial |
private long |
lastSinkFetchTime |
private static org.slf4j.Logger |
LOG |
private int |
maxRetriesMultiplier |
private int |
maxThreads |
private MetricsSource |
metrics |
private boolean |
peersSelected |
static String |
REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY
Drop edits for CFs that have been deleted from the replication source and target
|
static String |
REPLICATION_DROP_ON_DELETED_TABLE_KEY
Drop edits for tables that have been deleted from the replication source and target
|
private boolean |
replicationBulkLoadDataEnabled |
private String |
replicationClusterId |
private int |
replicationRpcLimit |
private long |
sleepForRetries |
private int |
socketTimeoutMultiplier |
private boolean |
stopping |
conf, DEFAULT_BAD_SINK_THRESHOLD, DEFAULT_REPLICATION_SOURCE_RATIO
ctx, REPLICATION_WALENTRYFILTER_CONFIG_KEY
Constructor and Description |
---|
HBaseInterClusterReplicationEndpoint() |
Modifier and Type | Method and Description |
---|---|
protected CompletableFuture<Integer> |
asyncReplicate(List<WAL.Entry> entries,
int batchIndex,
int timeout)
Replicate entries to peer cluster by async API.
|
private void |
connectToPeers() |
private List<List<WAL.Entry>> |
createBatches(List<WAL.Entry> entries)
Divide the entries into multiple batches, so that we can replicate each batch in a thread pool
concurrently.
|
private List<List<WAL.Entry>> |
createParallelBatches(List<WAL.Entry> entries) |
private List<List<WAL.Entry>> |
createSerialBatches(List<WAL.Entry> entries) |
private void |
decorateConf() |
protected void |
doStop() |
(package private) List<List<WAL.Entry>> |
filterNotExistColumnFamilyEdits(List<List<WAL.Entry>> oldEntryList) |
(package private) List<List<WAL.Entry>> |
filterNotExistTableEdits(List<List<WAL.Entry>> oldEntryList) |
private int |
getEstimatedEntrySize(WAL.Entry e) |
void |
init(ReplicationEndpoint.Context context)
Initialize the replication endpoint with the given context.
|
static boolean |
isNoSuchColumnFamilyException(Throwable io)
Check if there's a
NoSuchColumnFamilyException in the caused by stacktrace. |
protected boolean |
isPeerEnabled() |
static boolean |
isTableNotFoundException(Throwable io)
Check if there's a
TableNotFoundException in the caused by stacktrace. |
private String |
logPeerId() |
private void |
onReplicateWALEntryException(int entriesHashCode,
Throwable exception,
HBaseReplicationEndpoint.SinkPeer sinkPeer) |
private long |
parallelReplicate(ReplicationEndpoint.ReplicateContext replicateContext,
List<List<WAL.Entry>> batches) |
boolean |
replicate(ReplicationEndpoint.ReplicateContext replicateContext)
Do the shipping logic
|
protected CompletableFuture<Integer> |
replicateEntries(List<WAL.Entry> entries,
int batchIndex,
int timeout) |
private CompletableFuture<Integer> |
serialReplicateRegionEntries(org.apache.hbase.thirdparty.com.google.common.collect.PeekingIterator<WAL.Entry> walEntryPeekingIterator,
int batchIndex,
int timeout)
Here, when
HBaseInterClusterReplicationEndpoint#isSerial is true, we iterate over the
WAL WAL.Entry list; once we reach a batch limit, we send it out, and in the callback, we
send the next batch, until all entries have been sent out. |
private boolean |
sleepForRetries(String msg,
int sleepMultiplier)
Do the sleeping logic
|
abort, chooseSinks, createConnection, disconnect, doStart, fetchSlavesAddresses, getNumSinks, getPeerUUID, getReplicationSink, isAborted, reportBadSink, reportSinkSuccess, start, stop
canReplicateToSameCluster, getNamespaceTableCfWALEntryFilter, getScopeWALEntryFilter, getWALEntryfilter, isStarting, peerConfigUpdated
addListener, awaitRunning, awaitRunning, awaitRunning, awaitTerminated, awaitTerminated, awaitTerminated, doCancelStart, failureCause, isRunning, notifyFailed, notifyStarted, notifyStopped, startAsync, state, stopAsync, toString
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
awaitRunning, awaitRunning, awaitTerminated, awaitTerminated, failureCause, isRunning
private static final org.slf4j.Logger LOG
public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY
public static final String REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY
private long sleepForRetries
private int maxRetriesMultiplier
private int socketTimeoutMultiplier
private int replicationRpcLimit
private MetricsSource metrics
private boolean peersSelected
private String replicationClusterId
private int maxThreads
private org.apache.hadoop.fs.Path baseNamespaceDir
private org.apache.hadoop.fs.Path hfileArchiveDir
private boolean replicationBulkLoadDataEnabled
private boolean dropOnDeletedTables
private boolean dropOnDeletedColumnFamilies
private boolean isSerial
private long lastSinkFetchTime
private volatile boolean stopping
public HBaseInterClusterReplicationEndpoint()
public void init(ReplicationEndpoint.Context context) throws IOException
ReplicationEndpoint
init
in interface ReplicationEndpoint
init
in class HBaseReplicationEndpoint
context
- replication context
IOException
- error occurs when initializing the endpoint.
private void decorateConf()
private void connectToPeers()
private boolean sleepForRetries(String msg, int sleepMultiplier)
msg
- why we sleep
sleepMultiplier
- by how many times the default sleeping time is augmented
Returns true if sleepMultiplier
is < maxRetriesMultiplier
private int getEstimatedEntrySize(WAL.Entry e)
private List<List<WAL.Entry>> createBatches(List<WAL.Entry> entries)
public static boolean isTableNotFoundException(Throwable io)
TableNotFoundException
in the caused by stacktrace.
public static boolean isNoSuchColumnFamilyException(Throwable io)
NoSuchColumnFamilyException
in the caused by stacktrace.
List<List<WAL.Entry>> filterNotExistTableEdits(List<List<WAL.Entry>> oldEntryList)
List<List<WAL.Entry>> filterNotExistColumnFamilyEdits(List<List<WAL.Entry>> oldEntryList)
private long parallelReplicate(ReplicationEndpoint.ReplicateContext replicateContext, List<List<WAL.Entry>> batches) throws IOException
IOException
public boolean replicate(ReplicationEndpoint.ReplicateContext replicateContext)
replicateContext
- a context where WAL entries and other parameters can be obtained.
protected boolean isPeerEnabled()
protected void doStop()
doStop
in class HBaseReplicationEndpoint
protected CompletableFuture<Integer> replicateEntries(List<WAL.Entry> entries, int batchIndex, int timeout)
private void onReplicateWALEntryException(int entriesHashCode, Throwable exception, HBaseReplicationEndpoint.SinkPeer sinkPeer)
private CompletableFuture<Integer> serialReplicateRegionEntries(org.apache.hbase.thirdparty.com.google.common.collect.PeekingIterator<WAL.Entry> walEntryPeekingIterator, int batchIndex, int timeout)
HBaseInterClusterReplicationEndpoint#isSerial
is true, we iterate over the
WAL WAL.Entry
list; once we reach a batch limit, we send it out, and in the callback, we
send the next batch, until all entries have been sent out.
protected CompletableFuture<Integer> asyncReplicate(List<WAL.Entry> entries, int batchIndex, int timeout)
Copyright © 2007–2020 The Apache Software Foundation. All rights reserved.