Class AsyncFSWAL
java.lang.Object
org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL<WALProvider.AsyncWriter>
org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL
- All Implemented Interfaces:
Closeable, AutoCloseable, WALFileLengthProvider, WAL
@LimitedPrivate("Configuration")
public class AsyncFSWAL
extends AbstractFSWAL<WALProvider.AsyncWriter>
An asynchronous implementation of FSWAL.
Here 'waitingConsumePayloads' acts as the RingBuffer in FSHLog.
For append, we process it as follows:
- In the caller thread (typically the RPC handler thread):
  - Insert the entry into 'waitingConsumePayloads'. Use the ring buffer sequence as the txid.
  - Schedule the consumer task if needed. See AbstractFSWAL.shouldScheduleConsumer() for more details.
- In the consumer task (executed in a single-threaded thread pool):
  - Poll the entry from AbstractFSWAL.waitingConsumePayloads and insert it into AbstractFSWAL.toWriteAppends.
  - Poll the entry from AbstractFSWAL.toWriteAppends, append it to the AsyncWriter, and insert it into AbstractFSWAL.unackedAppends.
  - If the buffered size reaches AbstractFSWAL.batchSize, or there is a sync request, call sync on the AsyncWriter.
  - In the callback methods:
    - If the sync succeeded, poll the entry from AbstractFSWAL.unackedAppends and drop it.
    - If the sync failed, add all the entries in AbstractFSWAL.unackedAppends back to AbstractFSWAL.toWriteAppends and wait to write them again.
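The append flow described above can be modelled with three plain queues. The following is a minimal, single-threaded sketch and not the HBase implementation: the class name AppendPipelineSketch, the durable list standing in for the WAL file, the syncSucceeds flag that simulates the writer's sync result, and the use of an entry count for batchSize (the description above speaks of a buffered size) are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical single-threaded model of the append queues; not HBase code. */
class AppendPipelineSketch {
  final Deque<String> waitingConsumePayloads = new ArrayDeque<>();
  final Deque<String> toWriteAppends = new ArrayDeque<>();
  final Deque<String> unackedAppends = new ArrayDeque<>();
  final List<String> durable = new ArrayList<>(); // stands in for the synced file
  private final int batchSize; // assumption: counted in entries, not bytes
  private int buffered;        // entries appended since the last sync
  private long nextTxid = 1;

  AppendPipelineSketch(int batchSize) {
    this.batchSize = batchSize;
  }

  /** Caller-thread step: enqueue the entry and hand back its txid. */
  long append(String entry) {
    waitingConsumePayloads.addLast(entry);
    return nextTxid++;
  }

  /** Consumer-task step; syncSucceeds simulates the outcome of the writer's sync. */
  void consume(boolean syncRequested, boolean syncSucceeds) {
    // Move everything from waitingConsumePayloads to toWriteAppends.
    while (!waitingConsumePayloads.isEmpty()) {
      toWriteAppends.addLast(waitingConsumePayloads.pollFirst());
    }
    // "Append" each entry to the writer and remember it as unacked.
    while (!toWriteAppends.isEmpty()) {
      unackedAppends.addLast(toWriteAppends.pollFirst());
      buffered++;
    }
    // Sync when the batch is full or a sync was requested.
    if (buffered >= batchSize || (syncRequested && buffered > 0)) {
      buffered = 0;
      onSyncDone(syncSucceeds);
    }
  }

  /** Callback step: drop acked entries, or queue them for rewriting in order. */
  private void onSyncDone(boolean succeeded) {
    if (succeeded) {
      while (!unackedAppends.isEmpty()) {
        durable.add(unackedAppends.pollFirst());
      }
    } else {
      while (!unackedAppends.isEmpty()) {
        toWriteAppends.addFirst(unackedAppends.pollLast());
      }
    }
  }

  public static void main(String[] args) {
    AppendPipelineSketch wal = new AppendPipelineSketch(2);
    wal.append("a");
    wal.append("b");
    wal.consume(false, false); // sync fails: entries return to toWriteAppends
    wal.consume(false, true);  // retry succeeds: entries become durable
    System.out.println(wal.durable);
  }
}
```

The sketch keeps the original order when a failed sync puts entries back, so a retry rewrites them in the same sequence. It leaves out threading and the asynchronous callbacks entirely.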
Here we only describe the logic of doReplaceWriter. The main logic of rollWriter is the same as in FSHLog.
For a normal roll request (for example, when we have reached the log roll size):
- In the log roller thread, we set AbstractFSWAL.waitingRoll(int) to true and AbstractFSWAL.readyForRolling to false, and then wait on AbstractFSWAL.readyForRolling (see AbstractFSWAL.waitForSafePoint()).
- In the consumer thread, we stop polling entries from AbstractFSWAL.waitingConsumePayloads if AbstractFSWAL.waitingRoll(int) is true, and also stop writing out the entries in AbstractFSWAL.toWriteAppends.
- If there is unflushed data in the writer, sync it.
- When all outgoing sync requests have finished, i.e., AbstractFSWAL.unackedAppends is empty, signal AbstractFSWAL.readyForRollingCond.
- Back in the log roller thread, we can now confirm that there are no outgoing entries, i.e., we have reached a safe point. So it is now safe to replace the old writer with the new writer.
- Set AbstractFSWAL.writerBroken(int) and AbstractFSWAL.waitingRoll(int) to false.
- Schedule the consumer task.
- Schedule a background task to close the old writer.
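The safe-point handshake in the steps above can be sketched with a lock and a condition variable. This is a simplified illustration under stated assumptions, not the HBase code: the class SafePointRollSketch, its methods (tryAppend, ack, roll, demo) and the string standing in for a writer are hypothetical, and the roller checks for an already-empty in-flight set itself instead of scheduling the consumer task to do so.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical model of the safe-point handshake; not HBase code. */
class SafePointRollSketch {
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition readyForRollingCond = lock.newCondition();
  private boolean waitingRoll;
  private boolean readyForRolling;
  private int unackedAppends;        // number of in-flight entries
  private String writer = "writer-1"; // a string stands in for the real writer

  /** Consumer side: write one entry out unless a roll is pending. */
  boolean tryAppend() {
    lock.lock();
    try {
      if (waitingRoll) {
        return false; // stop writing entries out while the roller waits
      }
      unackedAppends++;
      return true;
    } finally {
      lock.unlock();
    }
  }

  /** Sync callback: one in-flight entry has been acknowledged. */
  void ack() {
    lock.lock();
    try {
      unackedAppends--;
      signalIfSafe();
    } finally {
      lock.unlock();
    }
  }

  // Must be called with the lock held.
  private void signalIfSafe() {
    if (waitingRoll && unackedAppends == 0) {
      readyForRolling = true;
      readyForRollingCond.signalAll();
    }
  }

  /** Log roller side: wait for the safe point, then swap writers. */
  String roll(String nextWriter) throws InterruptedException {
    lock.lock();
    try {
      waitingRoll = true;
      readyForRolling = false;
      signalIfSafe(); // simplification: nothing in flight means we are already safe
      while (!readyForRolling) {
        readyForRollingCond.await();
      }
      String old = writer;
      writer = nextWriter;
      waitingRoll = false;
      return old; // a real implementation would close this in the background
    } finally {
      lock.unlock();
    }
  }

  boolean isWaitingRoll() {
    lock.lock();
    try { return waitingRoll; } finally { lock.unlock(); }
  }

  String currentWriter() {
    lock.lock();
    try { return writer; } finally { lock.unlock(); }
  }

  /** Runs one roll with a single in-flight entry and reports what was observed. */
  static String[] demo() {
    SafePointRollSketch wal = new SafePointRollSketch();
    wal.tryAppend(); // one entry is now in flight
    Thread roller = new Thread(() -> {
      try {
        wal.roll("writer-2");
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    roller.start();
    while (!wal.isWaitingRoll()) {
      Thread.yield(); // wait until the roller has requested the roll
    }
    boolean accepted = wal.tryAppend(); // refused while the roll is pending
    String before = wal.currentWriter(); // still the old writer: not yet safe
    wal.ack(); // last in-flight entry acked, so the roller is signalled
    try {
      roller.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return new String[] { before, wal.currentWriter(), String.valueOf(accepted) };
  }

  public static void main(String[] args) {
    System.out.println(String.join(", ", demo()));
  }
}
```

The point of the sketch is the ordering guarantee: the writer is only replaced after every in-flight entry has been acknowledged, and no new entry is written out while the roll is pending.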
-
Nested Class Summary
-
Field Summary
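Fields (Modifier and Type, Field):
- static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP
- static final String ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS
- private final Class<? extends org.apache.hbase.thirdparty.io.netty.channel.Channel> channelClass
- static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP
- static final int DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS
- private final org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup eventLoopGroup
- private AsyncFSOutput fsOut
- private static final org.slf4j.Logger LOG
- private final StreamSlowMonitor streamSlowMonitor
Fields inherited from class org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL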
abortable, blocksize, closed, closeExecutor, conf, consumeExecutor, consumer, coprocessorHost, DEFAULT_ROLL_ON_SYNC_TIME_MS, DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS, DEFAULT_SLOW_SYNC_ROLL_THRESHOLD, DEFAULT_SLOW_SYNC_TIME_MS, DEFAULT_WAL_BATCH_SIZE, DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS, DEFAULT_WAL_SYNC_TIMEOUT_MS, filenum, fs, hasConsumerTask, highestProcessedAppendTxid, highestSyncedTxid, highestUnsyncedTxid, implClassName, inflightWALClosures, listeners, LOG_NAME_COMPARATOR, logrollsize, MAX_LOGS, maxLogs, numEntries, ourFiles, prefixPathStr, RING_BUFFER_SLOT_COUNT, ROLL_ON_SYNC_TIME_MS, rollOnSyncNs, rollRequested, rollWriterLock, sequenceIdAccounting, shouldShutDownConsumeExecutorWhenClose, shutdown, SLOW_SYNC_ROLL_INTERVAL_MS, SLOW_SYNC_ROLL_THRESHOLD, SLOW_SYNC_TIME_MS, slowSyncCheckInterval, slowSyncCount, slowSyncNs, slowSyncRollThreshold, syncFutureCache, syncFutures, totalLogSize, toWriteAppends, unackedAppends, useHsync, WAL_AVOID_LOCAL_WRITES_DEFAULT, WAL_AVOID_LOCAL_WRITES_KEY, WAL_BATCH_SIZE, WAL_ROLL_MULTIPLIER, WAL_SHUTDOWN_WAIT_TIMEOUT_MS, WAL_SYNC_TIMEOUT_MS, walArchiveDir, walDir, walFile2Props, walFilePrefix, walFileSuffix, walShutdownTimeout, writer -
Constructor Summary
Constructors (Constructor, Description):
- AsyncFSWAL(org.apache.hadoop.fs.FileSystem fs, Abortable abortable, org.apache.hadoop.fs.Path rootDir, String logDir, String archiveDir, org.apache.hadoop.conf.Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, String suffix, org.apache.hadoop.fs.FileSystem remoteFs, org.apache.hadoop.fs.Path remoteWALDir, org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup eventLoopGroup, Class<? extends org.apache.hbase.thirdparty.io.netty.channel.Channel> channelClass, StreamSlowMonitor monitor)
-
Method Summary
Methods (Modifier and Type, Method, Description):
- protected final WALProvider.AsyncWriter createAsyncWriter(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path path)
- protected WALProvider.AsyncWriter createCombinedWriter(WALProvider.AsyncWriter localWriter, WALProvider.AsyncWriter remoteWriter)
- protected WALProvider.AsyncWriter createWriterInstance(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path path)
- protected void doAppend(WALProvider.AsyncWriter writer, FSWALEntry entry)
- protected boolean doCheckLogLowReplication()
- protected CompletableFuture<Long> doWriterSync(WALProvider.AsyncWriter writer, boolean shouldUseHsync, long txidWhenSyn)
- (package private) int getLogReplication(): This method gets the datanode replication count for the current WAL.
- (package private) org.apache.hadoop.hdfs.protocol.DatanodeInfo[] getPipeline(): This method gets the pipeline for the current WAL.
- protected void onWriterReplaced(WALProvider.AsyncWriter nextWriter)
Methods inherited from class org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL
abortCacheFlush, append, appendData, appendEntry, appendMarker, archive, archiveLogFile, atHeadOfRingBufferEventHandlerAppend, blockOnSync, checkLogLowReplication, checkSlowSyncCount, close, closeWriter, completeCacheFlush, computeFilename, createSingleThreadPoolConsumeExecutor, doCheckSlowSync, doCleanUpResources, doReplaceWriter, doShutdown, doSync, doSync, findRegionsToForceFlush, getCoprocessorHost, getCurrentFileName, getEarliestMemStoreSeqNum, getFilenum, getFileNumFromFileName, getFiles, getInflightWALCloseCount, getLogFileSize, getLogFileSizeIfBeingWritten, getNumLogFiles, getNumRolledLogFiles, getOldPath, getPreallocatedEventCount, getSequenceIdAccounting, getSyncedTxid, getSyncFuture, getUnflushedEntriesCount, getWALArchivePath, getWriter, init, isLogRollRequested, isUnflushedEntries, isWriterBroken, logRollAndSetupWalProps, main, markFutureDoneAndOffer, postSync, registerWALActionsListener, replaceWriter, requestLogRoll, requestLogRoll, rollWriter, rollWriter, setWaitOnShutdownInSeconds, shutdown, skipRemoteWAL, stampSequenceIdAndPublishToRingBuffer, startCacheFlush, startCacheFlush, sync, sync, sync, sync, toString, unregisterWALActionsListener, updateStore, waitForSafePoint
-
Field Details
-
LOG
-
ASYNC_WAL_USE_SHARED_EVENT_LOOP
-
DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP
-
ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS
-
DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS
-
eventLoopGroup
-
channelClass
-
fsOut
-
streamSlowMonitor
-
-
Constructor Details
-
AsyncFSWAL
public AsyncFSWAL(org.apache.hadoop.fs.FileSystem fs, Abortable abortable, org.apache.hadoop.fs.Path rootDir, String logDir, String archiveDir, org.apache.hadoop.conf.Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, String suffix, org.apache.hadoop.fs.FileSystem remoteFs, org.apache.hadoop.fs.Path remoteWALDir, org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup eventLoopGroup, Class<? extends org.apache.hbase.thirdparty.io.netty.channel.Channel> channelClass, StreamSlowMonitor monitor) throws FailedLogCloseException, IOException
- Throws:
FailedLogCloseException
IOException
-
-
Method Details
-
doWriterSync
protected CompletableFuture<Long> doWriterSync(WALProvider.AsyncWriter writer, boolean shouldUseHsync, long txidWhenSyn)
- Specified by:
doWriterSync in class AbstractFSWAL<WALProvider.AsyncWriter>
-
createAsyncWriter
protected final WALProvider.AsyncWriter createAsyncWriter(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path path) throws IOException
- Throws:
IOException
-
createWriterInstance
protected WALProvider.AsyncWriter createWriterInstance(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path path) throws IOException
- Specified by:
createWriterInstance in class AbstractFSWAL<WALProvider.AsyncWriter>
- Throws:
IOException
-
onWriterReplaced
- Specified by:
onWriterReplaced in class AbstractFSWAL<WALProvider.AsyncWriter>
-
doAppend
- Specified by:
doAppend in class AbstractFSWAL<WALProvider.AsyncWriter>
-
getPipeline
org.apache.hadoop.hdfs.protocol.DatanodeInfo[] getPipeline()
Description copied from class: AbstractFSWAL
This method gets the pipeline for the current WAL.
- Specified by:
getPipeline in class AbstractFSWAL<WALProvider.AsyncWriter>
-
getLogReplication
int getLogReplication()
Description copied from class: AbstractFSWAL
This method gets the datanode replication count for the current WAL.
- Specified by:
getLogReplication in class AbstractFSWAL<WALProvider.AsyncWriter>
-
doCheckLogLowReplication
- Specified by:
doCheckLogLowReplication in class AbstractFSWAL<WALProvider.AsyncWriter>
-
createCombinedWriter
protected WALProvider.AsyncWriter createCombinedWriter(WALProvider.AsyncWriter localWriter, WALProvider.AsyncWriter remoteWriter)
- Specified by:
createCombinedWriter in class AbstractFSWAL<WALProvider.AsyncWriter>
-