Class AsyncFSWAL
java.lang.Object
org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL<WALProvider.AsyncWriter>
org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL
- All Implemented Interfaces:
Closeable, AutoCloseable, WALFileLengthProvider, WAL
@LimitedPrivate("Configuration")
public class AsyncFSWAL
extends AbstractFSWAL<WALProvider.AsyncWriter>
An asynchronous implementation of FSWAL.
Here 'waitingConsumePayloads' acts as the RingBuffer in FSHLog.
For append, we process it as follows:
- In the caller thread (typically, in the rpc handler thread):
  - Insert the entry into 'waitingConsumePayloads'. Use ringbuffer sequence as txid.
  - Schedule the consumer task if needed. See shouldScheduleConsumer() for more details.
- In the consumer task (executed in a single threaded thread pool):
  - Poll the entry from waitingConsumePayloads and insert it into toWriteAppends.
  - Poll the entry from toWriteAppends, append it to the AsyncWriter, and insert it into unackedAppends.
  - If the buffered size reaches batchSize, or there is a sync request, then we call sync on the AsyncWriter.
  - In the callback methods:
    - If succeeded, poll the entry from unackedAppends and drop it.
    - If failed, add all the entries in unackedAppends back to toWriteAppends and wait for writing them again.
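The append flow above can be pictured with a toy, single-threaded model. This is a minimal sketch under stated assumptions, not the HBase implementation: the class AppendPipelineSketch and its methods are hypothetical, entries are reduced to their txids, and batchSize is counted in entries here rather than in buffered bytes.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of the three queues an entry passes through (hypothetical class). */
final class AppendPipelineSketch {
  // Stand-ins for waitingConsumePayloads, toWriteAppends and unackedAppends.
  private final Deque<Long> waitingConsumePayloads = new ArrayDeque<>();
  private final Deque<Long> toWriteAppends = new ArrayDeque<>();
  private final Deque<Long> unackedAppends = new ArrayDeque<>();
  private final int batchSize; // assumption: counted in entries, not bytes
  private long nextTxid = 1;
  private int buffered = 0;

  AppendPipelineSketch(int batchSize) {
    this.batchSize = batchSize;
  }

  /** Caller thread: enqueue the entry and use the queue sequence as the txid. */
  long append() {
    long txid = nextTxid++;
    waitingConsumePayloads.addLast(txid);
    return txid;
  }

  /** Consumer task: move entries through the queues; returns true if a sync is issued. */
  boolean consume(boolean syncRequested) {
    while (!waitingConsumePayloads.isEmpty()) {
      toWriteAppends.addLast(waitingConsumePayloads.pollFirst());
    }
    while (!toWriteAppends.isEmpty()) {
      // "Append to the writer", then remember the entry until it is acknowledged.
      unackedAppends.addLast(toWriteAppends.pollFirst());
      buffered++;
    }
    boolean sync = buffered > 0 && (buffered >= batchSize || syncRequested);
    if (sync) {
      buffered = 0;
    }
    return sync;
  }

  /** Sync callback, success: drop every entry up to and including processedTxid. */
  void syncCompleted(long processedTxid) {
    while (!unackedAppends.isEmpty() && unackedAppends.peekFirst() <= processedTxid) {
      unackedAppends.pollFirst();
    }
  }

  /** Sync callback, failure: put unacked entries back, in order, to be written again. */
  void syncFailed() {
    while (!unackedAppends.isEmpty()) {
      toWriteAppends.addFirst(unackedAppends.pollLast());
    }
  }

  int unackedCount() {
    return unackedAppends.size();
  }

  int toWriteCount() {
    return toWriteAppends.size();
  }
}
```

Keeping entries in unackedAppends until the sync callback fires is what makes the failure path possible: nothing is dropped until it has been acknowledged.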
Here we only describe the logic of doReplaceWriter. The main logic of rollWriter is the same as in
FSHLog.
For a normal roll request (for example, we have reached the log roll size):
- In the log roller thread, we will set waitingRoll(int) to true and readyForRolling to false, and then wait on readyForRolling (see waitForSafePoint()).
- In the consumer thread, we will stop polling entries from waitingConsumePayloads if waitingRoll(int) is true, and also stop writing out the entries in toWriteAppends.
- If there is unflushed data in the writer, sync it.
- When all outgoing sync requests are finished, i.e., unackedAppends is empty, signal the readyForRollingCond.
- Back in the log roller thread, we can now confirm that there are no outgoing entries, i.e., we have reached a safe point. So it is safe to replace the old writer with the new writer now.
- Set writerBroken(int) and waitingRoll(int) to false.
- Schedule the consumer task.
- Schedule a background task to close the old writer.
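The safe-point handshake in the roll steps above can be sketched with a plain Lock and Condition. This is a simplified illustration, not the actual code: SafePointSketch, consumerTick() and demo() are hypothetical names, waitingRoll is a plain boolean here instead of a bit in epochAndState, and the in-flight entries are reduced to a counter.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Toy model of the roller/consumer handshake (hypothetical class). */
final class SafePointSketch {
  private final Lock consumeLock = new ReentrantLock();
  private final Condition readyForRollingCond = consumeLock.newCondition();
  private boolean waitingRoll;
  private boolean readyForRolling;
  private int unacked; // stand-in for the size of unackedAppends
  private int rolls;

  SafePointSketch(int unacked) {
    this.unacked = unacked;
  }

  /** Log roller thread: request a roll, wait for the safe point, then swap the writer. */
  void roll() {
    consumeLock.lock();
    try {
      waitingRoll = true;
      readyForRolling = false;
      while (!readyForRolling) {
        readyForRollingCond.awaitUninterruptibly();
      }
      // Safe point reached: nothing is in flight, so the writer could be replaced here.
      rolls++;
      waitingRoll = false;
    } finally {
      consumeLock.unlock();
    }
  }

  /** Consumer thread: acknowledge one in-flight entry and signal if a roll is waiting. */
  void consumerTick() {
    consumeLock.lock();
    try {
      if (unacked > 0) {
        unacked--;
      }
      if (waitingRoll && !readyForRolling && unacked == 0) {
        readyForRolling = true;
        readyForRollingCond.signalAll();
      }
    } finally {
      consumeLock.unlock();
    }
  }

  int rolls() {
    consumeLock.lock();
    try {
      return rolls;
    } finally {
      consumeLock.unlock();
    }
  }

  int unacked() {
    consumeLock.lock();
    try {
      return unacked;
    } finally {
      consumeLock.unlock();
    }
  }

  /** Runs one roll against a background consumer; true if the roll saw no in-flight entries. */
  static boolean demo() {
    SafePointSketch s = new SafePointSketch(3);
    Thread consumer = new Thread(() -> {
      while (s.rolls() == 0) {
        s.consumerTick();
        Thread.yield();
      }
    });
    consumer.setDaemon(true);
    consumer.start();
    s.roll();
    return s.rolls() == 1 && s.unacked() == 0;
  }
}
```

The roller never inspects the queues itself; it only waits on the condition, which keeps all queue access on the consumer side.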
-
Nested Class Summary
-
Field Summary
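Fields
Modifier and Type | Field | Description
static final String | ASYNC_WAL_USE_SHARED_EVENT_LOOP
static final String | ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS
private final long | batchSize
private final Class<? extends org.apache.hbase.thirdparty.io.netty.channel.Channel> | channelClass
private final ExecutorService | consumeExecutor
private final Lock | consumeLock
private final Runnable | consumer
private final AtomicBoolean | consumerScheduled
static final boolean | DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP
static final int | DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS
static final long | DEFAULT_WAL_BATCH_SIZE
private int | epochAndState
private final org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup | eventLoopGroup
private long | fileLengthAtLastSync
private AsyncFSOutput | fsOut
private long | highestProcessedAppendTxid
private long | highestProcessedAppendTxidAtLastSync
private static final org.slf4j.Logger | LOG
private static final int | MAX_EPOCH
private boolean | readyForRolling
private final Condition | readyForRollingCond
private static final Comparator<SyncFuture> | SEQ_COMPARATOR
private final StreamSlowMonitor | streamSlowMonitor
private final SortedSet<SyncFuture> | syncFutures
private final Deque<FSWALEntry> | toWriteAppends
private final Deque<FSWALEntry> | unackedAppends
private final com.lmax.disruptor.RingBuffer<RingBufferTruck> | waitingConsumePayloads
private final com.lmax.disruptor.Sequence | waitingConsumePayloadsGatingSequence
private final int | waitOnShutdownInSeconds
static final String | WAL_BATCH_SIZE
Fields inherited from class org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL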
abortable, blocksize, closed, closeExecutor, conf, coprocessorHost, DEFAULT_ROLL_ON_SYNC_TIME_MS, DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS, DEFAULT_SLOW_SYNC_ROLL_THRESHOLD, DEFAULT_SLOW_SYNC_TIME_MS, DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS, DEFAULT_WAL_SYNC_TIMEOUT_MS, filenum, fs, highestSyncedTxid, highestUnsyncedTxid, implClassName, inflightWALClosures, listeners, LOG_NAME_COMPARATOR, logrollsize, MAX_LOGS, maxLogs, numEntries, ourFiles, prefixPathStr, RING_BUFFER_SLOT_COUNT, ROLL_ON_SYNC_TIME_MS, rollOnSyncNs, rollRequested, rollWriterLock, sequenceIdAccounting, shutdown, SLOW_SYNC_ROLL_INTERVAL_MS, SLOW_SYNC_ROLL_THRESHOLD, SLOW_SYNC_TIME_MS, slowSyncCheckInterval, slowSyncCount, slowSyncNs, slowSyncRollThreshold, syncFutureCache, totalLogSize, useHsync, WAL_AVOID_LOCAL_WRITES_DEFAULT, WAL_AVOID_LOCAL_WRITES_KEY, WAL_ROLL_MULTIPLIER, WAL_SHUTDOWN_WAIT_TIMEOUT_MS, WAL_SYNC_TIMEOUT_MS, walArchiveDir, walDir, walFile2Props, walFilePrefix, walFileSuffix, walShutdownTimeout, writer -
Constructor Summary
Constructors
Constructor | Description
AsyncFSWAL(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path rootDir, String logDir, String archiveDir, org.apache.hadoop.conf.Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, String suffix, org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup eventLoopGroup, Class<? extends org.apache.hbase.thirdparty.io.netty.channel.Channel> channelClass)
AsyncFSWAL(org.apache.hadoop.fs.FileSystem fs, Abortable abortable, org.apache.hadoop.fs.Path rootDir, String logDir, String archiveDir, org.apache.hadoop.conf.Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, String suffix, org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup eventLoopGroup, Class<? extends org.apache.hbase.thirdparty.io.netty.channel.Channel> channelClass, StreamSlowMonitor monitor)
-
Method Summary
Modifier and Type | Method | Description
protected long | append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore) | Append a set of edits to the WAL.
private void | appendAndSync
private void | closeWriter(WALProvider.AsyncWriter writer, org.apache.hadoop.fs.Path path)
private void | consume()
protected WALProvider.AsyncWriter | createWriterInstance(org.apache.hadoop.fs.Path path)
protected void | doAppend(WALProvider.AsyncWriter writer, FSWALEntry entry)
protected boolean | doCheckLogLowReplication
protected void | doReplaceWriter(org.apache.hadoop.fs.Path oldPath, org.apache.hadoop.fs.Path newPath, WALProvider.AsyncWriter nextWriter) | Notice that you need to clear the AbstractFSWAL.rollRequested flag in this method, as the new writer will begin to work before returning from this method.
protected void | doShutdown
protected void | doSync(boolean forceSync)
protected void | doSync(long txid, boolean forceSync)
private static int | epoch(int epochAndState)
private int | finishSync
private int | finishSyncLowerThanTxid(long txid)
private static long | getLastTxid(Deque<FSWALEntry> queue)
(package private) int | getLogReplication() | This method gets the datanode replication count for the current WAL.
(package private) org.apache.hadoop.hdfs.protocol.DatanodeInfo[] | getPipeline() | This method gets the pipeline for the current WAL.
private boolean | isHsync(long beginTxid, long endTxid)
private void | markFutureDoneAndOffer(SyncFuture future, long txid, Throwable t) | Helper that marks the future as DONE and offers it back to the cache.
private boolean | shouldScheduleConsumer()
private void | sync(WALProvider.AsyncWriter writer)
private void | syncCompleted(long epochWhenSync, WALProvider.AsyncWriter writer, long processedTxid, long startTimeNs)
private void | syncFailed(long epochWhenSync, Throwable error)
private boolean | trySetReadyForRolling
private void | waitForSafePoint()
private static boolean | waitingRoll(int epochAndState)
private static boolean | writerBroken(int epochAndState)
Methods inherited from class org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL
abortCacheFlush, appendData, appendEntry, appendMarker, archive, archiveLogFile, atHeadOfRingBufferEventHandlerAppend, blockOnSync, checkLogLowReplication, close, completeCacheFlush, computeFilename, doCheckSlowSync, findRegionsToForceFlush, getCoprocessorHost, getCurrentFileName, getEarliestMemStoreSeqNum, getEarliestMemStoreSeqNum, getFilenum, getFileNumFromFileName, getFiles, getInflightWALCloseCount, getLogFileSize, getLogFileSizeIfBeingWritten, getNumLogFiles, getNumRolledLogFiles, getOldPath, getPreallocatedEventCount, getSyncFuture, getUnflushedEntriesCount, getWALArchivePath, getWriter, init, isLogRollRequested, isUnflushedEntries, logRollAndSetupWalProps, main, markClosedAndClean, postSync, registerWALActionsListener, replaceWriter, requestLogRoll, requestLogRoll, rollWriter, rollWriter, shutdown, stampSequenceIdAndPublishToRingBuffer, startCacheFlush, startCacheFlush, sync, sync, sync, sync, toString, unregisterWALActionsListener, updateStore
-
Field Details
-
LOG
-
SEQ_COMPARATOR
-
WAL_BATCH_SIZE
- See Also:
-
DEFAULT_WAL_BATCH_SIZE
- See Also:
-
ASYNC_WAL_USE_SHARED_EVENT_LOOP
- See Also:
-
DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP
- See Also:
-
ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS
- See Also:
-
DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS
- See Also:
-
eventLoopGroup
-
consumeExecutor
-
channelClass
-
consumeLock
-
consumer
-
hasConsumerTask
-
MAX_EPOCH
- See Also:
-
epochAndState
-
readyForRolling
-
readyForRollingCond
-
waitingConsumePayloads
-
waitingConsumePayloadsGatingSequence
-
consumerScheduled
-
batchSize
-
fsOut
-
toWriteAppends
-
unackedAppends
-
syncFutures
-
highestProcessedAppendTxid
-
fileLengthAtLastSync
-
highestProcessedAppendTxidAtLastSync
-
waitOnShutdownInSeconds
-
streamSlowMonitor
-
-
Constructor Details
-
AsyncFSWAL
public AsyncFSWAL(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path rootDir, String logDir, String archiveDir, org.apache.hadoop.conf.Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, String suffix, org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup eventLoopGroup, Class<? extends org.apache.hbase.thirdparty.io.netty.channel.Channel> channelClass) throws FailedLogCloseException, IOException
- Throws:
FailedLogCloseException
IOException
-
AsyncFSWAL
public AsyncFSWAL(org.apache.hadoop.fs.FileSystem fs, Abortable abortable, org.apache.hadoop.fs.Path rootDir, String logDir, String archiveDir, org.apache.hadoop.conf.Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, String suffix, org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup eventLoopGroup, Class<? extends org.apache.hbase.thirdparty.io.netty.channel.Channel> channelClass, StreamSlowMonitor monitor) throws FailedLogCloseException, IOException
- Throws:
FailedLogCloseException
IOException
-
-
Method Details
-
markFutureDoneAndOffer
Helper that marks the future as DONE and offers it back to the cache.
-
waitingRoll
-
writerBroken
-
epoch
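The three static helpers waitingRoll(int), writerBroken(int) and epoch(int) all decode the single epochAndState field. The bit layout is not stated on this page, so the following is only a sketch under an assumed layout: bit 0 holds the waiting-roll flag, bit 1 holds the writer-broken flag, and the remaining bits hold the epoch. The class name and the pack helper are hypothetical.

```java
/** Decoding a packed epoch-and-state int (assumed layout, hypothetical class). */
final class EpochAndStateSketch {
  private EpochAndStateSketch() {
  }

  // Assumption: bit 0 = waiting roll.
  static boolean waitingRoll(int epochAndState) {
    return (epochAndState & 1) != 0;
  }

  // Assumption: bit 1 = writer broken.
  static boolean writerBroken(int epochAndState) {
    return ((epochAndState >>> 1) & 1) != 0;
  }

  // Assumption: the epoch occupies all bits above the two flags.
  static int epoch(int epochAndState) {
    return epochAndState >>> 2;
  }

  /** Hypothetical inverse of the three decoders, for illustration only. */
  static int pack(int epoch, boolean writerBroken, boolean waitingRoll) {
    return (epoch << 2) | (writerBroken ? 2 : 0) | (waitingRoll ? 1 : 0);
  }
}
```

Packing the flags and the epoch into one int lets a reader take a consistent snapshot of all three with a single read.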
-
trySetReadyForRolling
-
syncFailed
-
syncCompleted
private void syncCompleted(long epochWhenSync, WALProvider.AsyncWriter writer, long processedTxid, long startTimeNs)
-
isHsync
-
sync
-
finishSyncLowerThanTxid
-
finishSync
-
getLastTxid
-
appendAndSync
-
consume
-
shouldScheduleConsumer
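The class description says callers "schedule the consumer task if needed". One common way to make that decision with an AtomicBoolean such as consumerScheduled is a compare-and-set, so that only one caller wins. The sketch below assumes this approach and also assumes that no scheduling happens while a roll is pending or the writer is broken; the class name, the plain boolean flags and consumerFinished() are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Toy "schedule at most one consumer" check (hypothetical class). */
final class ConsumerSchedulingSketch {
  private final AtomicBoolean consumerScheduled = new AtomicBoolean(false);
  // Assumption: plain flags here; the real class derives them from epochAndState.
  private volatile boolean writerBroken;
  private volatile boolean waitingRoll;

  /** Returns true for exactly one caller until the consumer reports it has finished. */
  boolean shouldScheduleConsumer() {
    if (writerBroken || waitingRoll) {
      return false;
    }
    return consumerScheduled.compareAndSet(false, true);
  }

  /** Called by the consumer when it has drained its work, allowing it to be rescheduled. */
  void consumerFinished() {
    consumerScheduled.set(false);
  }

  void setWaitingRoll(boolean value) {
    waitingRoll = value;
  }

  void setWriterBroken(boolean value) {
    writerBroken = value;
  }
}
```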
-
append
protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore) throws IOException
Description copied from class: AbstractFSWAL
Append a set of edits to the WAL. The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must have its region edit/sequence id assigned else it messes up our unification of mvcc and sequenceid. On return key will have the region edit/sequence id filled in.
NOTE: This append, at a time that is usually after this call returns, starts an mvcc transaction by calling 'begin', in which we assign this update a sequenceid. At assignment time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must 'complete' this mvcc transaction by calling MultiVersionConcurrencyControl#complete(...) or a variant, otherwise mvcc will get stuck. Do it in the finally of a try/finally block within which this append lives, along with any subsequent operations like sync or update of memstore, etc. Get the WriteEntry to pass mvcc out of the passed in WALKey walKey parameter. Be warned that the WriteEntry is not immediately available on return from this method. It WILL be available subsequent to a sync of this append; otherwise, you will just have to wait on the WriteEntry to get filled in.
- Specified by:
append in class AbstractFSWAL<WALProvider.AsyncWriter>
- Parameters:
hri - the regioninfo associated with append
key - Modified by this call; we add to it this edit's region edit/sequence id.
edits - Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit sequence id that is after all currently appended edits.
inMemstore - Always true except for case where we are writing a region event meta marker edit, for example, a compaction completion record into the WAL or noting a Region Open event. In these cases the entry is just so we can finish an unfinished compaction after a crash when the new Server reads the WAL on recovery, etc. These transition event 'Markers' do not go via the memstore. When inMemstore is false, we presume a Marker event edit.
- Returns:
Returns a 'transaction id' and key will have the region edit/sequence id in it.
- Throws:
IOException
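The try/finally discipline described for append can be illustrated with stand-in types. This is a sketch only: ToyMvcc and ToyKey are hypothetical replacements for MultiVersionConcurrencyControl and the WAL key, and unlike the real method this toy append assigns the write entry synchronously.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Toy illustration of completing an mvcc transaction in a finally block (hypothetical). */
final class AppendUsageSketch {
  /** Hypothetical stand-in for MultiVersionConcurrencyControl. */
  static final class ToyMvcc {
    final AtomicInteger open = new AtomicInteger();
    private long next = 1;

    long begin() {
      open.incrementAndGet();
      return next++;
    }

    void complete(long writeEntry) {
      open.decrementAndGet();
    }
  }

  /** Hypothetical stand-in for the WAL key that carries the write entry. */
  static final class ToyKey {
    Long writeEntry; // null until the append has begun the mvcc transaction
  }

  /** Toy append: begins the mvcc transaction and records the entry on the key. */
  static long append(ToyMvcc mvcc, ToyKey key) {
    key.writeEntry = mvcc.begin();
    return key.writeEntry;
  }

  /** Append, then sync and update the memstore; always complete the mvcc transaction. */
  static void write(ToyMvcc mvcc, boolean failSync) {
    ToyKey key = new ToyKey();
    try {
      append(mvcc, key);
      if (failSync) {
        throw new IllegalStateException("simulated sync failure");
      }
      // A memstore update would go here.
    } finally {
      if (key.writeEntry != null) {
        mvcc.complete(key.writeEntry);
      }
    }
  }
}
```

Because the completion sits in the finally block, a failed sync still releases the transaction and later readers are not blocked behind it.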
-
doSync
- Specified by:
doSync in class AbstractFSWAL<WALProvider.AsyncWriter>
- Throws:
IOException
-
doSync
- Specified by:
doSync in class AbstractFSWAL<WALProvider.AsyncWriter>
- Throws:
IOException
-
createWriterInstance
protected WALProvider.AsyncWriter createWriterInstance(org.apache.hadoop.fs.Path path) throws IOException
- Specified by:
createWriterInstance in class AbstractFSWAL<WALProvider.AsyncWriter>
- Throws:
IOException
-
waitForSafePoint
-
closeWriter
-
doReplaceWriter
protected void doReplaceWriter(org.apache.hadoop.fs.Path oldPath, org.apache.hadoop.fs.Path newPath, WALProvider.AsyncWriter nextWriter) throws IOException
Description copied from class: AbstractFSWAL
Notice that you need to clear the AbstractFSWAL.rollRequested flag in this method, as the new writer will begin to work before returning from this method. If we clear the flag after returning from this call, we may miss a roll request. The implementation class should choose a proper place to clear the AbstractFSWAL.rollRequested flag, so we do not miss a roll request, typically before you start writing to the new writer.
- Specified by:
doReplaceWriter in class AbstractFSWAL<WALProvider.AsyncWriter>
- Throws:
IOException
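The ordering requirement for the rollRequested flag can be shown with a toy model. This is a sketch under stated assumptions, not the real method: RollRequestedSketch is hypothetical, the writer is reduced to a String, and the Runnable stands for whatever the new writer does once it is live, which may include requesting another roll.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Toy model of clearing the roll-request flag before the new writer goes live (hypothetical). */
final class RollRequestedSketch {
  final AtomicBoolean rollRequested = new AtomicBoolean(true);
  volatile String writer = "old";

  void doReplaceWriter(String nextWriter, Runnable newWriterActivity) {
    // Clear the flag first, so a request raised by the new writer is not wiped out.
    rollRequested.set(false);
    writer = nextWriter;
    // From here on the new writer is working and may set rollRequested again.
    newWriterActivity.run();
  }
}
```

If the flag were cleared after newWriterActivity ran instead, a roll requested by the new writer would be overwritten and lost.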
-
doShutdown
- Specified by:
doShutdown in class AbstractFSWAL<WALProvider.AsyncWriter>
- Throws:
IOException
-
doAppend
- Specified by:
doAppend in class AbstractFSWAL<WALProvider.AsyncWriter>
-
getPipeline
org.apache.hadoop.hdfs.protocol.DatanodeInfo[] getPipeline()
Description copied from class: AbstractFSWAL
This method gets the pipeline for the current WAL.
- Specified by:
getPipeline in class AbstractFSWAL<WALProvider.AsyncWriter>
-
getLogReplication
int getLogReplication()
Description copied from class: AbstractFSWAL
This method gets the datanode replication count for the current WAL.
- Specified by:
getLogReplication in class AbstractFSWAL<WALProvider.AsyncWriter>
-
doCheckLogLowReplication
- Specified by:
doCheckLogLowReplication in class AbstractFSWAL<WALProvider.AsyncWriter>
-