@InterfaceAudience.Private public class ReplicationSourceShipper extends Thread
Modifier and Type | Class and Description |
---|---|
static class |
ReplicationSourceShipper.WorkerState |
Thread.State, Thread.UncaughtExceptionHandler
Modifier and Type | Field and Description |
---|---|
private org.apache.hadoop.conf.Configuration |
conf |
private org.apache.hadoop.fs.Path |
currentPath |
private long |
currentPosition |
private int |
DEFAULT_TIMEOUT |
protected ReplicationSourceWALReader |
entryReader |
private int |
getEntriesTimeout |
private static org.slf4j.Logger |
LOG |
protected ReplicationSourceLogQueue |
logQueue |
protected int |
maxRetriesMultiplier |
private int |
shipEditsTimeout |
protected long |
sleepForRetries |
private ReplicationSource |
source |
private ReplicationSourceShipper.WorkerState |
state |
protected String |
walGroupId |
MAX_PRIORITY, MIN_PRIORITY, NORM_PRIORITY
Constructor and Description |
---|
ReplicationSourceShipper(org.apache.hadoop.conf.Configuration conf,
String walGroupId,
ReplicationSourceLogQueue logQueue,
ReplicationSource source) |
Modifier and Type | Method and Description |
---|---|
private void |
cleanUpHFileRefs(WALEdit edit) |
(package private) void |
clearWALEntryBatch()
Attempts to properly update
ReplicationSourceManager.totalBufferUser, in case
there were unprocessed entries batched by the reader to the shipper, but the shipper didn't
manage to ship those because the replication source is being terminated. |
private int |
getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch)
Gets the batch entry size, excluding bulk load file sizes.
|
(package private) org.apache.hadoop.fs.Path |
getCurrentPath() |
(package private) long |
getCurrentPosition() |
(package private) long |
getStartPosition() |
protected boolean |
isActive() |
boolean |
isFinished() |
protected void |
noMoreData() |
protected void |
postFinish() |
void |
run() |
(package private) void |
setWALReader(ReplicationSourceWALReader entryReader) |
protected void |
setWorkerState(ReplicationSourceShipper.WorkerState state) |
private void |
shipEdits(WALEntryBatch entryBatch)
Do the shipping logic
|
boolean |
sleepForRetries(String msg,
int sleepMultiplier)
Do the sleeping logic
|
void |
startup(Thread.UncaughtExceptionHandler handler) |
(package private) void |
stopWorker() |
private boolean |
updateLogPosition(WALEntryBatch batch) |
activeCount, checkAccess, clone, countStackFrames, currentThread, destroy, dumpStack, enumerate, getAllStackTraces, getContextClassLoader, getDefaultUncaughtExceptionHandler, getId, getName, getPriority, getStackTrace, getState, getThreadGroup, getUncaughtExceptionHandler, holdsLock, interrupt, interrupted, isAlive, isDaemon, isInterrupted, join, join, join, resume, setContextClassLoader, setDaemon, setDefaultUncaughtExceptionHandler, setName, setPriority, setUncaughtExceptionHandler, sleep, sleep, start, stop, stop, suspend, toString, yield
private static final org.slf4j.Logger LOG
private final org.apache.hadoop.conf.Configuration conf
protected final String walGroupId
protected final ReplicationSourceLogQueue logQueue
private final ReplicationSource source
private volatile long currentPosition
private org.apache.hadoop.fs.Path currentPath
private volatile ReplicationSourceShipper.WorkerState state
protected ReplicationSourceWALReader entryReader
protected final long sleepForRetries
protected final int maxRetriesMultiplier
private final int DEFAULT_TIMEOUT
private final int getEntriesTimeout
private final int shipEditsTimeout
public ReplicationSourceShipper(org.apache.hadoop.conf.Configuration conf, String walGroupId, ReplicationSourceLogQueue logQueue, ReplicationSource source)
protected void noMoreData()
protected void postFinish()
private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch)
private void shipEdits(WALEntryBatch entryBatch)
private void cleanUpHFileRefs(WALEdit edit) throws IOException
Throws: IOException
private boolean updateLogPosition(WALEntryBatch batch)
public void startup(Thread.UncaughtExceptionHandler handler)
org.apache.hadoop.fs.Path getCurrentPath()
long getCurrentPosition()
void setWALReader(ReplicationSourceWALReader entryReader)
long getStartPosition()
protected boolean isActive()
protected final void setWorkerState(ReplicationSourceShipper.WorkerState state)
void stopWorker()
public boolean isFinished()
public boolean sleepForRetries(String msg, int sleepMultiplier)
Parameters:
msg - why we sleep
sleepMultiplier - by how many times the default sleeping time is augmented
Returns: true if sleepMultiplier is < maxRetriesMultiplier
void clearWALEntryBatch()
Attempts to properly update ReplicationSourceManager.totalBufferUser, in case
there were unprocessed entries batched by the reader to the shipper, but the shipper didn't
manage to ship those because the replication source is being terminated. In that case, it
iterates through the batched entries and decreases the pending entries size from
ReplicationSourceManager.totalBufferUser.
NOTES: 1) This method should only be called upon replication source termination. It
blocks waiting for both shipper and reader threads to terminate, to make sure there are no race
conditions when updating ReplicationSourceManager.totalBufferUser. 2) It does not
attempt to terminate the reader and shipper threads. Their interruption/termination must have
been triggered prior to calling this method.
Copyright © 2007–2020 The Apache Software Foundation. All rights reserved.