@InterfaceAudience.LimitedPrivate(value="Configuration") public class AsyncFSWAL extends AbstractFSWAL<WALProvider.AsyncWriter>
Here 'waitingConsumePayloads' acts as the RingBuffer in FSHLog.
For append, we process it as follows:
See shouldScheduleConsumer() for more details.
Poll the entry from waitingConsumePayloads and insert it into toWriteAppends.
Poll the entry from toWriteAppends, append it to the AsyncWriter, and insert it into unackedAppends.
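If the buffered size reaches batchSize, or there is a sync request, then we call sync on the AsyncWriter.
If the sync succeeds, poll the entry from unackedAppends and drop it.
If the sync fails, add all the entries in unackedAppends back to toWriteAppends and wait for writing them again.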
Here we only describe the logic of doReplaceWriter. The main logic of rollWriter is the same as in
FSHLog.
For a normal roll request(for example, we have reached the log roll size):
In the log roller thread, set waitingRoll(int) to true and readyForRolling to false, and then wait on readyForRolling (see waitForSafePoint()).
In the consumer thread, stop polling entries from waitingConsumePayloads if waitingRoll(int) is true, and also stop writing the entries in toWriteAppends out.
When all outstanding sync requests have finished, i.e. unackedAppends is empty, signal the readyForRollingCond.
Set writerBroken(int) and waitingRoll(int) to false.
WAL.Entry, WAL.Reader
abortable, blocksize, closed, conf, coprocessorHost, DEFAULT_ROLL_ON_SYNC_TIME_MS, DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS, DEFAULT_SLOW_SYNC_ROLL_THRESHOLD, DEFAULT_SLOW_SYNC_TIME_MS, DEFAULT_WAL_SYNC_TIMEOUT_MS, filenum, fs, highestSyncedTxid, highestUnsyncedTxid, implClassName, inflightWALClosures, listeners, LOG_NAME_COMPARATOR, logrollsize, MAX_LOGS, maxLogs, numEntries, ourFiles, prefixPathStr, RING_BUFFER_SLOT_COUNT, ROLL_ON_SYNC_TIME_MS, rollOnSyncNs, rollRequested, rollWriterLock, sequenceIdAccounting, shutdown, SLOW_SYNC_ROLL_INTERVAL_MS, SLOW_SYNC_ROLL_THRESHOLD, SLOW_SYNC_TIME_MS, slowSyncCheckInterval, slowSyncCount, slowSyncNs, slowSyncRollThreshold, syncFutureCache, totalLogSize, useHsync, WAL_ROLL_MULTIPLIER, WAL_SYNC_TIMEOUT_MS, walArchiveDir, walDir, walFile2Props, walFilePrefix, walFileSuffix, writer
Constructor and Description |
---|
AsyncFSWAL(org.apache.hadoop.fs.FileSystem fs,
Abortable abortable,
org.apache.hadoop.fs.Path rootDir,
String logDir,
String archiveDir,
org.apache.hadoop.conf.Configuration conf,
List<WALActionsListener> listeners,
boolean failIfWALExists,
String prefix,
String suffix,
org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup eventLoopGroup,
Class<? extends org.apache.hbase.thirdparty.io.netty.channel.Channel> channelClass,
StreamSlowMonitor monitor) |
AsyncFSWAL(org.apache.hadoop.fs.FileSystem fs,
org.apache.hadoop.fs.Path rootDir,
String logDir,
String archiveDir,
org.apache.hadoop.conf.Configuration conf,
List<WALActionsListener> listeners,
boolean failIfWALExists,
String prefix,
String suffix,
org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup eventLoopGroup,
Class<? extends org.apache.hbase.thirdparty.io.netty.channel.Channel> channelClass) |
Modifier and Type | Method and Description |
---|---|
protected long |
append(RegionInfo hri,
WALKeyImpl key,
WALEdit edits,
boolean inMemstore)
Append a set of edits to the WAL.
|
private void |
appendAndSync() |
protected long |
closeWriter(WALProvider.AsyncWriter writer,
org.apache.hadoop.fs.Path path) |
private void |
consume() |
protected WALProvider.AsyncWriter |
createWriterInstance(org.apache.hadoop.fs.Path path) |
protected void |
doAppend(WALProvider.AsyncWriter writer,
FSWALEntry entry) |
protected boolean |
doCheckLogLowReplication() |
protected void |
doReplaceWriter(org.apache.hadoop.fs.Path oldPath,
org.apache.hadoop.fs.Path newPath,
WALProvider.AsyncWriter nextWriter)
Notice that you need to clear the
AbstractFSWAL.rollRequested flag in this method, as the new writer
will begin to work before returning from this method. |
protected void |
doShutdown() |
protected void |
doSync(boolean forceSync) |
protected void |
doSync(long txid,
boolean forceSync) |
private static int |
epoch(int epochAndState) |
private int |
finishSync() |
private int |
finishSyncLowerThanTxid(long txid) |
private static long |
getLastTxid(Deque<FSWALEntry> queue) |
(package private) int |
getLogReplication()
This method gets the datanode replication count for the current WAL.
|
(package private) org.apache.hadoop.hdfs.protocol.DatanodeInfo[] |
getPipeline()
This method gets the pipeline for the current WAL.
|
private boolean |
isHsync(long beginTxid,
long endTxid) |
private void |
markFutureDoneAndOffer(SyncFuture future,
long txid,
Throwable t)
Helper that marks the future as DONE and offers it back to the cache.
|
private boolean |
shouldScheduleConsumer() |
private void |
sync(WALProvider.AsyncWriter writer) |
private void |
syncCompleted(long epochWhenSync,
WALProvider.AsyncWriter writer,
long processedTxid,
long startTimeNs) |
private void |
syncFailed(long epochWhenSync,
Throwable error) |
private boolean |
trySetReadyForRolling() |
private void |
waitForSafePoint() |
private static boolean |
waitingRoll(int epochAndState) |
private static boolean |
writerBroken(int epochAndState) |
abortCacheFlush, appendData, appendEntry, appendMarker, archive, archiveLogFile, atHeadOfRingBufferEventHandlerAppend, blockOnSync, checkLogLowReplication, close, completeCacheFlush, computeFilename, doCheckSlowSync, findRegionsToForceFlush, getCoprocessorHost, getCurrentFileName, getEarliestMemStoreSeqNum, getEarliestMemStoreSeqNum, getFilenum, getFileNumFromFileName, getFiles, getInflightWALCloseCount, getLogFileSize, getLogFileSizeIfBeingWritten, getNumLogFiles, getNumRolledLogFiles, getOldPath, getPreallocatedEventCount, getSyncFuture, getUnflushedEntriesCount, getWALArchivePath, init, isLogRollRequested, isUnflushedEntries, logRollAndSetupWalProps, main, postSync, registerWALActionsListener, replaceWriter, requestLogRoll, requestLogRoll, rollWriter, rollWriter, shutdown, stampSequenceIdAndPublishToRingBuffer, startCacheFlush, startCacheFlush, sync, sync, sync, sync, toString, unregisterWALActionsListener, updateStore
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
getTimestamp
private static final org.slf4j.Logger LOG
private static final Comparator<SyncFuture> SEQ_COMPARATOR
public static final String WAL_BATCH_SIZE
public static final long DEFAULT_WAL_BATCH_SIZE
public static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP
public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP
public static final String ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS
public static final int DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS
private final org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup eventLoopGroup
private final ExecutorService consumeExecutor
private final Class<? extends org.apache.hbase.thirdparty.io.netty.channel.Channel> channelClass
private final Lock consumeLock
private final Supplier<Boolean> hasConsumerTask
private static final int MAX_EPOCH
private volatile int epochAndState
private boolean readyForRolling
private final Condition readyForRollingCond
private final com.lmax.disruptor.RingBuffer<RingBufferTruck> waitingConsumePayloads
private final com.lmax.disruptor.Sequence waitingConsumePayloadsGatingSequence
private final AtomicBoolean consumerScheduled
private final long batchSize
private final ExecutorService closeExecutor
private volatile AsyncFSOutput fsOut
private final Deque<FSWALEntry> toWriteAppends
private final Deque<FSWALEntry> unackedAppends
private final SortedSet<SyncFuture> syncFutures
private long highestProcessedAppendTxid
private long fileLengthAtLastSync
private long highestProcessedAppendTxidAtLastSync
private final int waitOnShutdownInSeconds
private final StreamSlowMonitor streamSlowMonitor
public AsyncFSWAL(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path rootDir, String logDir, String archiveDir, org.apache.hadoop.conf.Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, String suffix, org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup eventLoopGroup, Class<? extends org.apache.hbase.thirdparty.io.netty.channel.Channel> channelClass) throws FailedLogCloseException, IOException
FailedLogCloseException
IOException
public AsyncFSWAL(org.apache.hadoop.fs.FileSystem fs, Abortable abortable, org.apache.hadoop.fs.Path rootDir, String logDir, String archiveDir, org.apache.hadoop.conf.Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, String suffix, org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup eventLoopGroup, Class<? extends org.apache.hbase.thirdparty.io.netty.channel.Channel> channelClass, StreamSlowMonitor monitor) throws FailedLogCloseException, IOException
FailedLogCloseException
IOException
private void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable t)
private static boolean waitingRoll(int epochAndState)
private static boolean writerBroken(int epochAndState)
private static int epoch(int epochAndState)
private boolean trySetReadyForRolling()
private void syncFailed(long epochWhenSync, Throwable error)
private void syncCompleted(long epochWhenSync, WALProvider.AsyncWriter writer, long processedTxid, long startTimeNs)
private boolean isHsync(long beginTxid, long endTxid)
private void sync(WALProvider.AsyncWriter writer)
private int finishSyncLowerThanTxid(long txid)
private int finishSync()
private static long getLastTxid(Deque<FSWALEntry> queue)
private void appendAndSync()
private void consume()
private boolean shouldScheduleConsumer()
protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore) throws IOException
AbstractFSWAL
Append a set of edits to the WAL. On return, key will have the region edit/sequence id filled in.
NOTE: This append, at a time that is usually after this call returns, starts an mvcc
transaction by calling 'begin', wherein we assign this update a sequenceid. At assignment
time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must
'complete' this mvcc transaction by calling
MultiVersionConcurrencyControl#complete(...) or a variant; otherwise mvcc will get stuck. Do it
in the finally of a try/finally block within which this append lives and any subsequent
operations like sync or update of memstore, etc. Get the WriteEntry to pass mvcc out of the
passed in WALKey walKey
parameter. Be warned that the WriteEntry is not
immediately available on return from this method. It WILL be available subsequent to a sync of
this append; otherwise, you will just have to wait on the WriteEntry to get filled in.
append
in class AbstractFSWAL<WALProvider.AsyncWriter>
hri
- the regioninfo associated with append
key
- Modified by this call; we add to it this edit's region edit/sequence id.
edits
- Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
sequence id that is after all currently appended edits.
inMemstore
- Always true except for case where we are writing a region event meta marker
edit, for example, a compaction completion record into the WAL or noting a
Region Open event. In these cases the entry is just so we can finish an
unfinished compaction after a crash when the new Server reads the WAL on
recovery, etc. These transition event 'Markers' do not go via the memstore.
When inMemstore is false, we presume a Marker event edit.
key
will have the region edit/sequence id
in it.
IOException
protected void doSync(boolean forceSync) throws IOException
doSync
in class AbstractFSWAL<WALProvider.AsyncWriter>
IOException
protected void doSync(long txid, boolean forceSync) throws IOException
doSync
in class AbstractFSWAL<WALProvider.AsyncWriter>
IOException
protected WALProvider.AsyncWriter createWriterInstance(org.apache.hadoop.fs.Path path) throws IOException
createWriterInstance
in class AbstractFSWAL<WALProvider.AsyncWriter>
IOException
private void waitForSafePoint()
protected final long closeWriter(WALProvider.AsyncWriter writer, org.apache.hadoop.fs.Path path)
protected void doReplaceWriter(org.apache.hadoop.fs.Path oldPath, org.apache.hadoop.fs.Path newPath, WALProvider.AsyncWriter nextWriter) throws IOException
AbstractFSWAL
Notice that you need to clear the AbstractFSWAL.rollRequested
flag in this method, as the new writer
will begin to work before returning from this method. If we clear the flag after returning from
this call, we may miss a roll request. The implementation class should choose a proper place to
clear the AbstractFSWAL.rollRequested
flag so we do not miss a roll request, typically before you
start writing to the new writer.
doReplaceWriter
in class AbstractFSWAL<WALProvider.AsyncWriter>
IOException
protected void doShutdown() throws IOException
doShutdown
in class AbstractFSWAL<WALProvider.AsyncWriter>
IOException
protected void doAppend(WALProvider.AsyncWriter writer, FSWALEntry entry)
doAppend
in class AbstractFSWAL<WALProvider.AsyncWriter>
org.apache.hadoop.hdfs.protocol.DatanodeInfo[] getPipeline()
AbstractFSWAL
getPipeline
in class AbstractFSWAL<WALProvider.AsyncWriter>
int getLogReplication()
AbstractFSWAL
getLogReplication
in class AbstractFSWAL<WALProvider.AsyncWriter>
protected boolean doCheckLogLowReplication()
doCheckLogLowReplication
in class AbstractFSWAL<WALProvider.AsyncWriter>
Copyright © 2007–2020 The Apache Software Foundation. All rights reserved.