Class AsyncFSWAL
java.lang.Object
org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL<WALProvider.AsyncWriter>
org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL
- All Implemented Interfaces:
Closeable, AutoCloseable, WALFileLengthProvider, WAL
@LimitedPrivate("Configuration")
public class AsyncFSWAL
extends AbstractFSWAL<WALProvider.AsyncWriter>
An asynchronous implementation of FSWAL.
Here 'waitingConsumePayloads' acts as the RingBuffer in FSHLog.
For append, we process it as follows:
- In the caller thread (typically the RPC handler thread):
  - Insert the entry into 'waitingConsumePayloads'. Use the ring buffer sequence as the txid.
  - Schedule the consumer task if needed. See AbstractFSWAL.shouldScheduleConsumer() for more details.
- In the consumer task (executed in a single-threaded thread pool):
  - Poll the entry from AbstractFSWAL.waitingConsumePayloads and insert it into AbstractFSWAL.toWriteAppends.
  - Poll the entry from AbstractFSWAL.toWriteAppends, append it to the AsyncWriter, and insert it into AbstractFSWAL.unackedAppends.
  - If the buffered size reaches AbstractFSWAL.batchSize, or there is a sync request, call sync on the AsyncWriter.
  - In the callback methods:
    - If the sync succeeded, poll the entry from AbstractFSWAL.unackedAppends and drop it.
    - If the sync failed, add all the entries in AbstractFSWAL.unackedAppends back to AbstractFSWAL.toWriteAppends and wait to write them again.
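The append flow described above can be illustrated with a small, single-threaded model. This is only a sketch under stated assumptions, not the HBase implementation: the class AppendPipelineSketch, its methods, the use of String entries, and measuring the buffered size by string length are all hypothetical. Only the three queue names and the batch-size idea come from the description.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical single-threaded model of the three append queues; not HBase code. */
class AppendPipelineSketch {
    final Deque<String> waitingConsumePayloads = new ArrayDeque<>();
    final Deque<String> toWriteAppends = new ArrayDeque<>();
    final Deque<String> unackedAppends = new ArrayDeque<>();
    /** Stands in for data that a successful sync has made durable. */
    final List<String> durable = new ArrayList<>();
    final int batchSize;
    int buffered = 0;

    AppendPipelineSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Caller thread: enqueue the entry for the consumer task. */
    void append(String entry) {
        waitingConsumePayloads.addLast(entry);
    }

    /**
     * Consumer task: move entries through the queues, then sync if the batch
     * is full or a sync was requested. Returns true if a sync was issued.
     * The syncSucceeds flag simulates the outcome reported to the callback.
     */
    boolean consume(boolean syncRequested, boolean syncSucceeds) {
        while (!waitingConsumePayloads.isEmpty()) {
            toWriteAppends.addLast(waitingConsumePayloads.pollFirst());
        }
        while (!toWriteAppends.isEmpty()) {
            String entry = toWriteAppends.pollFirst();
            buffered += entry.length();      // models "append it to the writer"
            unackedAppends.addLast(entry);
        }
        boolean shouldSync = buffered >= batchSize
            || (syncRequested && !unackedAppends.isEmpty());
        if (shouldSync) {
            onSyncDone(syncSucceeds);
        }
        return shouldSync;
    }

    /** Callback: drop acknowledged entries, or queue them for rewriting. */
    void onSyncDone(boolean succeeded) {
        if (succeeded) {
            while (!unackedAppends.isEmpty()) {
                durable.add(unackedAppends.pollFirst());
            }
        } else {
            // Re-insert at the head of toWriteAppends, preserving order.
            while (!unackedAppends.isEmpty()) {
                toWriteAppends.addFirst(unackedAppends.pollLast());
            }
        }
        buffered = 0;
    }
}
```

In this model a failed sync leaves the entries at the head of toWriteAppends in their original order, so the next consumer run writes them again before any newer entries.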
Here we only describe the logic of doReplaceWriter. The main logic of rollWriter is the same as in FSHLog.
For a normal roll request (for example, when we have reached the log roll size):
- In the log roller thread, we set AbstractFSWAL.waitingRoll(int) to true and AbstractFSWAL.readyForRolling to false, and then wait on AbstractFSWAL.readyForRolling (see AbstractFSWAL.waitForSafePoint()).
- In the consumer thread, we stop polling entries from AbstractFSWAL.waitingConsumePayloads if AbstractFSWAL.waitingRoll(int) is true, and also stop writing out the entries in AbstractFSWAL.toWriteAppends.
- If there is unflushed data in the writer, sync it.
- When all outgoing sync requests have finished, i.e., AbstractFSWAL.unackedAppends is empty, signal AbstractFSWAL.readyForRollingCond.
- Back in the log roller thread, we can now confirm that there are no outgoing entries, i.e., we have reached a safe point. It is now safe to replace the old writer with the new writer.
- Set AbstractFSWAL.writerBroken(int) and AbstractFSWAL.waitingRoll(int) to false.
- Schedule the consumer task.
- Schedule a background task to close the old writer.
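The safe-point handshake between the log roller and the consumer can be sketched with a lock and a condition variable. This is a simplified model under stated assumptions, not the HBase implementation: SafePointSketch and all of its methods are hypothetical, unackedAppends is reduced to a counter, the writer is a String, and the roller checks the counter directly where the real design schedules the consumer task. Resetting writerBroken and closing the old writer in the background are omitted.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical model of the safe-point handshake; not HBase code. */
class SafePointSketch {
    private final ReentrantLock consumeLock = new ReentrantLock();
    private final Condition readyForRollingCond = consumeLock.newCondition();
    private boolean waitingRoll;
    private boolean readyForRolling;
    private int unackedAppends;          // models the size of unackedAppends
    private String writer = "old";

    /** Consumer side: refuses to write while a roll is pending. */
    boolean tryAppend() {
        consumeLock.lock();
        try {
            if (waitingRoll) {
                return false;
            }
            unackedAppends++;
            return true;
        } finally {
            consumeLock.unlock();
        }
    }

    /** Sync callback: once nothing is outstanding, signal the roller. */
    void ack() {
        consumeLock.lock();
        try {
            unackedAppends--;
            if (waitingRoll && unackedAppends == 0) {
                readyForRolling = true;
                readyForRollingCond.signalAll();
            }
        } finally {
            consumeLock.unlock();
        }
    }

    /** Roller thread: wait for the safe point, then swap the writer. */
    void replaceWriter(String nextWriter) throws InterruptedException {
        consumeLock.lock();
        try {
            waitingRoll = true;
            // Simplification: check directly instead of scheduling the consumer.
            readyForRolling = unackedAppends == 0;
            while (!readyForRolling) {
                readyForRollingCond.await();
            }
            writer = nextWriter;
            waitingRoll = false;
        } finally {
            consumeLock.unlock();
        }
    }

    boolean isWaitingRoll() {
        consumeLock.lock();
        try {
            return waitingRoll;
        } finally {
            consumeLock.unlock();
        }
    }

    String currentWriter() {
        consumeLock.lock();
        try {
            return writer;
        } finally {
            consumeLock.unlock();
        }
    }

    /** Runs one roll with a single outstanding append and reports what was observed. */
    static String demo() {
        SafePointSketch wal = new SafePointSketch();
        wal.tryAppend();
        Thread roller = new Thread(() -> {
            try {
                wal.replaceWriter("new");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        roller.start();
        try {
            while (!wal.isWaitingRoll()) {
                Thread.sleep(1);
            }
            String during = wal.currentWriter() + "," + wal.tryAppend();
            wal.ack();
            roller.join();
            return during + "," + wal.currentWriter();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

While the roll is pending, the old writer stays in place and new appends are refused. The swap happens only after the last outstanding append is acknowledged.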
-
Nested Class Summary
-
Field Summary
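Modifier and Type, Field
static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP
static final String ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS
private final Class<? extends org.apache.hbase.thirdparty.io.netty.channel.Channel> channelClass
static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP
static final int DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS
private final org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup eventLoopGroup
private AsyncFSOutput fsOut
private static final org.slf4j.Logger LOG
private final StreamSlowMonitor streamSlowMonitor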
Fields inherited from class org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL
abortable, blocksize, closed, closeExecutor, conf, consumeExecutor, consumer, coprocessorHost, DEFAULT_ROLL_ON_SYNC_TIME_MS, DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS, DEFAULT_SLOW_SYNC_ROLL_THRESHOLD, DEFAULT_SLOW_SYNC_TIME_MS, DEFAULT_WAL_BATCH_SIZE, DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS, DEFAULT_WAL_SYNC_TIMEOUT_MS, filenum, fs, hasConsumerTask, highestProcessedAppendTxid, highestSyncedTxid, highestUnsyncedTxid, implClassName, inflightWALClosures, listeners, LOG_NAME_COMPARATOR, logrollsize, MAX_LOGS, maxLogs, numEntries, ourFiles, prefixPathStr, RING_BUFFER_SLOT_COUNT, ROLL_ON_SYNC_TIME_MS, rollOnSyncNs, rollRequested, rollWriterLock, sequenceIdAccounting, shouldShutDownConsumeExecutorWhenClose, shutdown, SLOW_SYNC_ROLL_INTERVAL_MS, SLOW_SYNC_ROLL_THRESHOLD, SLOW_SYNC_TIME_MS, slowSyncCheckInterval, slowSyncCount, slowSyncNs, slowSyncRollThreshold, syncFutureCache, syncFutures, totalLogSize, toWriteAppends, unackedAppends, useHsync, WAL_AVOID_LOCAL_WRITES_DEFAULT, WAL_AVOID_LOCAL_WRITES_KEY, WAL_BATCH_SIZE, WAL_ROLL_MULTIPLIER, WAL_SHUTDOWN_WAIT_TIMEOUT_MS, WAL_SYNC_TIMEOUT_MS, walArchiveDir, walDir, walFile2Props, walFilePrefix, walFileSuffix, walShutdownTimeout, writer
-
Constructor Summary
Constructor
AsyncFSWAL(org.apache.hadoop.fs.FileSystem fs, Abortable abortable, org.apache.hadoop.fs.Path rootDir, String logDir, String archiveDir, org.apache.hadoop.conf.Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, String suffix, org.apache.hadoop.fs.FileSystem remoteFs, org.apache.hadoop.fs.Path remoteWALDir, org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup eventLoopGroup, Class<? extends org.apache.hbase.thirdparty.io.netty.channel.Channel> channelClass, StreamSlowMonitor monitor)
-
Method Summary
Modifier and Type, Method, Description
protected final WALProvider.AsyncWriter createAsyncWriter(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path path)
protected WALProvider.AsyncWriter createCombinedWriter(WALProvider.AsyncWriter localWriter, WALProvider.AsyncWriter remoteWriter)
protected WALProvider.AsyncWriter createWriterInstance(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path path)
protected void doAppend(WALProvider.AsyncWriter writer, FSWALEntry entry)
protected boolean doCheckLogLowReplication()
protected CompletableFuture<Long> doWriterSync(WALProvider.AsyncWriter writer, boolean shouldUseHsync, long txidWhenSyn)
(package private) int getLogReplication()
  This method gets the datanode replication count for the current WAL.
(package private) org.apache.hadoop.hdfs.protocol.DatanodeInfo[] getPipeline()
  This method gets the pipeline for the current WAL.
protected void onWriterReplaced(WALProvider.AsyncWriter nextWriter)
Methods inherited from class org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL
abortCacheFlush, append, appendData, appendEntry, appendMarker, archive, archiveLogFile, atHeadOfRingBufferEventHandlerAppend, blockOnSync, checkLogLowReplication, checkSlowSyncCount, close, closeWriter, completeCacheFlush, computeFilename, createSingleThreadPoolConsumeExecutor, doCheckSlowSync, doCleanUpResources, doReplaceWriter, doShutdown, doSync, doSync, findRegionsToForceFlush, getCoprocessorHost, getCurrentFileName, getEarliestMemStoreSeqNum, getFilenum, getFileNumFromFileName, getFiles, getInflightWALCloseCount, getLogFileSize, getLogFileSizeIfBeingWritten, getNumLogFiles, getNumRolledLogFiles, getOldPath, getPreallocatedEventCount, getSequenceIdAccounting, getSyncedTxid, getSyncFuture, getUnflushedEntriesCount, getWALArchivePath, getWriter, init, isLogRollRequested, isUnflushedEntries, isWriterBroken, logRollAndSetupWalProps, main, markFutureDoneAndOffer, postSync, registerWALActionsListener, replaceWriter, requestLogRoll, requestLogRoll, rollWriter, rollWriter, setWaitOnShutdownInSeconds, shutdown, skipRemoteWAL, stampSequenceIdAndPublishToRingBuffer, startCacheFlush, startCacheFlush, sync, sync, sync, sync, toString, unregisterWALActionsListener, updateStore, waitForSafePoint
-
Field Details
-
LOG
-
ASYNC_WAL_USE_SHARED_EVENT_LOOP
-
DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP
-
ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS
-
DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS
-
eventLoopGroup
-
channelClass
-
fsOut
-
streamSlowMonitor
-
-
Constructor Details
-
AsyncFSWAL
public AsyncFSWAL(org.apache.hadoop.fs.FileSystem fs, Abortable abortable, org.apache.hadoop.fs.Path rootDir, String logDir, String archiveDir, org.apache.hadoop.conf.Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, String suffix, org.apache.hadoop.fs.FileSystem remoteFs, org.apache.hadoop.fs.Path remoteWALDir, org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup eventLoopGroup, Class<? extends org.apache.hbase.thirdparty.io.netty.channel.Channel> channelClass, StreamSlowMonitor monitor) throws FailedLogCloseException, IOException
- Throws:
FailedLogCloseException
IOException
-
-
Method Details
-
doWriterSync
protected CompletableFuture<Long> doWriterSync(WALProvider.AsyncWriter writer, boolean shouldUseHsync, long txidWhenSyn)
- Specified by:
doWriterSync in class AbstractFSWAL<WALProvider.AsyncWriter>
-
createAsyncWriter
protected final WALProvider.AsyncWriter createAsyncWriter(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path path) throws IOException
- Throws:
IOException
-
createWriterInstance
protected WALProvider.AsyncWriter createWriterInstance(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path path) throws IOException
- Specified by:
createWriterInstance in class AbstractFSWAL<WALProvider.AsyncWriter>
- Throws:
IOException
-
onWriterReplaced
protected void onWriterReplaced(WALProvider.AsyncWriter nextWriter)
- Specified by:
onWriterReplaced in class AbstractFSWAL<WALProvider.AsyncWriter>
-
doAppend
protected void doAppend(WALProvider.AsyncWriter writer, FSWALEntry entry)
- Specified by:
doAppend in class AbstractFSWAL<WALProvider.AsyncWriter>
-
getPipeline
org.apache.hadoop.hdfs.protocol.DatanodeInfo[] getPipeline()
Description copied from class: AbstractFSWAL
This method gets the pipeline for the current WAL.
- Specified by:
getPipeline in class AbstractFSWAL<WALProvider.AsyncWriter>
-
getLogReplication
int getLogReplication()
Description copied from class: AbstractFSWAL
This method gets the datanode replication count for the current WAL.
- Specified by:
getLogReplication in class AbstractFSWAL<WALProvider.AsyncWriter>
-
doCheckLogLowReplication
protected boolean doCheckLogLowReplication()
- Specified by:
doCheckLogLowReplication in class AbstractFSWAL<WALProvider.AsyncWriter>
-
createCombinedWriter
protected WALProvider.AsyncWriter createCombinedWriter(WALProvider.AsyncWriter localWriter, WALProvider.AsyncWriter remoteWriter)
- Specified by:
createCombinedWriter in class AbstractFSWAL<WALProvider.AsyncWriter>
-