@InterfaceAudience.Private public class FSHLog extends AbstractFSWAL<WALProvider.Writer>
Modifier and Type | Class and Description |
---|---|
(package private) class |
FSHLog.RingBufferEventHandler
Handler that is run by the disruptor ringbuffer consumer.
|
(package private) static class |
FSHLog.RingBufferExceptionHandler
Exception handler to pass the disruptor ringbuffer.
|
(package private) static class |
FSHLog.SafePointZigZagLatch
This class is used for coordinating two threads, holding one thread at a 'safe point' while the
orchestrating thread does some work that requires the first thread to be paused.
|
private class |
FSHLog.SyncRunner
Thread that runs the hdfs sync call.
|
WAL.Entry, WAL.Reader
Modifier and Type | Field and Description |
---|---|
private AtomicInteger |
closeErrorCount |
private int |
closeErrorsTolerated
Number of log close errors tolerated before we abort
|
private ExecutorService |
closeExecutor |
private AtomicInteger |
consecutiveLogRolls |
private static int |
DEFAULT_FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS |
private static int |
DEFAULT_LOW_REPLICATION_ROLL_LIMIT |
private static int |
DEFAULT_MAX_BATCH_COUNT |
private static int |
DEFAULT_ROLL_ERRORS_TOLERATED |
private static int |
DEFAULT_SYNCER_COUNT |
private com.lmax.disruptor.dsl.Disruptor<RingBufferTruck> |
disruptor
The nexus at which all incoming handlers meet.
|
static long |
FIXED_OVERHEAD |
private static String |
FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS |
private org.apache.hadoop.fs.FSDataOutputStream |
hdfs_out
FSDataOutputStream associated with the current SequenceFile.writer
|
private static org.slf4j.Logger |
LOG |
private static String |
LOW_REPLICATION_ROLL_LIMIT |
private boolean |
lowReplicationRollEnabled |
private int |
lowReplicationRollLimit |
private static String |
MAX_BATCH_COUNT |
private int |
minTolerableReplication |
private FSHLog.RingBufferEventHandler |
ringBufferEventHandler
This fellow is run by the above appendExecutor service but it is all about batching up appends
and syncs; it may shut down without cleaning out the last few appends or syncs.
|
private static String |
ROLL_ERRORS_TOLERATED |
private static String |
SYNCER_COUNT |
private static String |
TOLERABLE_LOW_REPLICATION |
private int |
waitOnShutdownInSeconds |
abortable, blocksize, closed, conf, coprocessorHost, DEFAULT_ROLL_ON_SYNC_TIME_MS, DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS, DEFAULT_SLOW_SYNC_ROLL_THRESHOLD, DEFAULT_SLOW_SYNC_TIME_MS, DEFAULT_WAL_SYNC_TIMEOUT_MS, filenum, fs, highestSyncedTxid, highestUnsyncedTxid, implClassName, inflightWALClosures, listeners, LOG_NAME_COMPARATOR, logrollsize, MAX_LOGS, maxLogs, numEntries, ourFiles, prefixPathStr, RING_BUFFER_SLOT_COUNT, ROLL_ON_SYNC_TIME_MS, rollOnSyncNs, rollRequested, rollWriterLock, sequenceIdAccounting, shutdown, SLOW_SYNC_ROLL_INTERVAL_MS, SLOW_SYNC_ROLL_THRESHOLD, SLOW_SYNC_TIME_MS, slowSyncCheckInterval, slowSyncCount, slowSyncNs, slowSyncRollThreshold, syncFutureCache, totalLogSize, useHsync, WAL_ROLL_MULTIPLIER, WAL_SYNC_TIMEOUT_MS, walArchiveDir, walDir, walFile2Props, walFilePrefix, walFileSuffix, writer
Constructor and Description |
---|
FSHLog(org.apache.hadoop.fs.FileSystem fs,
Abortable abortable,
org.apache.hadoop.fs.Path root,
String logDir,
org.apache.hadoop.conf.Configuration conf) |
FSHLog(org.apache.hadoop.fs.FileSystem fs,
Abortable abortable,
org.apache.hadoop.fs.Path rootDir,
String logDir,
String archiveDir,
org.apache.hadoop.conf.Configuration conf,
List<WALActionsListener> listeners,
boolean failIfWALExists,
String prefix,
String suffix)
Create an edit log at the given
dir location. |
FSHLog(org.apache.hadoop.fs.FileSystem fs,
org.apache.hadoop.fs.Path root,
String logDir,
org.apache.hadoop.conf.Configuration conf)
Constructor.
|
FSHLog(org.apache.hadoop.fs.FileSystem fs,
org.apache.hadoop.fs.Path rootDir,
String logDir,
String archiveDir,
org.apache.hadoop.conf.Configuration conf,
List<WALActionsListener> listeners,
boolean failIfWALExists,
String prefix,
String suffix) |
Modifier and Type | Method and Description |
---|---|
protected void |
afterCreatingZigZagLatch()
Used to manufacture race condition reliably.
|
protected long |
append(RegionInfo hri,
WALKeyImpl key,
WALEdit edits,
boolean inMemstore)
Append a set of edits to the WAL.
|
protected void |
beforeWaitOnSafePoint() |
private void |
checkLogRoll()
Schedule a log roll if needed.
|
private void |
closeWriter(WALProvider.Writer writer,
org.apache.hadoop.fs.Path path,
boolean syncCloseCall) |
protected WALProvider.Writer |
createWriterInstance(org.apache.hadoop.fs.Path path)
This method allows subclasses to inject different writers without having to extend other
methods like rollWriter().
|
protected void |
doAppend(WALProvider.Writer writer,
FSWALEntry entry) |
protected boolean |
doCheckLogLowReplication()
Returns true if number of replicas for the WAL is lower than threshold
|
protected void |
doReplaceWriter(org.apache.hadoop.fs.Path oldPath,
org.apache.hadoop.fs.Path newPath,
WALProvider.Writer nextWriter)
Notice that you need to clear the
AbstractFSWAL.rollRequested flag in this method, as the new writer
will begin to work before returning from this method. |
protected void |
doShutdown() |
protected void |
doSync(boolean forceSync) |
protected void |
doSync(long txid,
boolean forceSync) |
(package private) int |
getLogReplication()
This method gets the datanode replication count for the current WAL.
|
(package private) OutputStream |
getOutputStream()
Currently, we need to expose the writer's OutputStream to tests so that they can manipulate the
default behavior (such as setting the maxRecoveryErrorCount value).
|
(package private) org.apache.hadoop.hdfs.protocol.DatanodeInfo[] |
getPipeline()
This method gets the pipeline for the current WAL.
|
protected long |
getSequenceOnRingBuffer() |
(package private) WALProvider.Writer |
getWriter() |
(package private) boolean |
isLowReplicationRollEnabled() |
private void |
preemptiveSync(ProtobufLogWriter nextWriter)
Run a sync after opening to set up the pipeline.
|
private SyncFuture |
publishSyncOnRingBuffer(boolean forceSync) |
protected SyncFuture |
publishSyncOnRingBuffer(long sequence,
boolean forceSync) |
private void |
publishSyncThenBlockOnCompletion(boolean forceSync) |
(package private) void |
setWriter(WALProvider.Writer writer) |
abortCacheFlush, appendData, appendEntry, appendMarker, archive, archiveLogFile, atHeadOfRingBufferEventHandlerAppend, blockOnSync, checkLogLowReplication, close, completeCacheFlush, computeFilename, doCheckSlowSync, findRegionsToForceFlush, getCoprocessorHost, getCurrentFileName, getEarliestMemStoreSeqNum, getEarliestMemStoreSeqNum, getFilenum, getFileNumFromFileName, getFiles, getInflightWALCloseCount, getLogFileSize, getLogFileSizeIfBeingWritten, getNumLogFiles, getNumRolledLogFiles, getOldPath, getPreallocatedEventCount, getSyncFuture, getUnflushedEntriesCount, getWALArchivePath, init, isLogRollRequested, isUnflushedEntries, logRollAndSetupWalProps, main, postSync, registerWALActionsListener, replaceWriter, requestLogRoll, requestLogRoll, rollWriter, rollWriter, shutdown, stampSequenceIdAndPublishToRingBuffer, startCacheFlush, startCacheFlush, sync, sync, sync, sync, toString, unregisterWALActionsListener, updateStore
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
getTimestamp
private static final org.slf4j.Logger LOG
private static final String TOLERABLE_LOW_REPLICATION
private static final String LOW_REPLICATION_ROLL_LIMIT
private static final int DEFAULT_LOW_REPLICATION_ROLL_LIMIT
private static final String ROLL_ERRORS_TOLERATED
private static final int DEFAULT_ROLL_ERRORS_TOLERATED
private static final String SYNCER_COUNT
private static final int DEFAULT_SYNCER_COUNT
private static final String MAX_BATCH_COUNT
private static final int DEFAULT_MAX_BATCH_COUNT
private static final String FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS
private static final int DEFAULT_FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS
private final com.lmax.disruptor.dsl.Disruptor<RingBufferTruck> disruptor
private final FSHLog.RingBufferEventHandler ringBufferEventHandler
private org.apache.hadoop.fs.FSDataOutputStream hdfs_out
private final int minTolerableReplication
private final AtomicInteger consecutiveLogRolls
private final int lowReplicationRollLimit
private volatile boolean lowReplicationRollEnabled
private final int closeErrorsTolerated
private final AtomicInteger closeErrorCount
private final int waitOnShutdownInSeconds
private final ExecutorService closeExecutor
public static final long FIXED_OVERHEAD
public FSHLog(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path root, String logDir, org.apache.hadoop.conf.Configuration conf) throws IOException
fs
- filesystem handle
root
- path for stored and archived wals
logDir
- dir where wals are stored
conf
- configuration to use
IOException
public FSHLog(org.apache.hadoop.fs.FileSystem fs, Abortable abortable, org.apache.hadoop.fs.Path root, String logDir, org.apache.hadoop.conf.Configuration conf) throws IOException
IOException
public FSHLog(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path rootDir, String logDir, String archiveDir, org.apache.hadoop.conf.Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, String suffix) throws IOException
IOException
public FSHLog(org.apache.hadoop.fs.FileSystem fs, Abortable abortable, org.apache.hadoop.fs.Path rootDir, String logDir, String archiveDir, org.apache.hadoop.conf.Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, String suffix) throws IOException
Create an edit log at the given dir
location. You should never have to load an
existing log. If there is a log at startup, it should have already been processed and deleted
by the time the WAL object is started up.
fs
- filesystem handle
rootDir
- path to where logs and oldlogs
logDir
- dir where wals are stored
archiveDir
- dir where wals are archived
conf
- configuration to use
listeners
- Listeners on WAL events. Listeners passed here will be registered before
we do anything else; e.g. the Constructor AbstractFSWAL.rollWriter().
failIfWALExists
- If true IOException will be thrown if files related to this wal already
exist.
prefix
- should always be hostname and port in distributed env and it will be URL
encoded before being used. If prefix is null, "wal" will be used
suffix
- will be url encoded. null is treated as empty. non-empty must start with
AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER
IOException
OutputStream getOutputStream()
private void preemptiveSync(ProtobufLogWriter nextWriter)
protected WALProvider.Writer createWriterInstance(org.apache.hadoop.fs.Path path) throws IOException
createWriterInstance
in class AbstractFSWAL<WALProvider.Writer>
IOException
protected void afterCreatingZigZagLatch()
beforeWaitOnSafePoint()
protected void beforeWaitOnSafePoint()
afterCreatingZigZagLatch()
protected void doAppend(WALProvider.Writer writer, FSWALEntry entry) throws IOException
doAppend
in class AbstractFSWAL<WALProvider.Writer>
IOException
protected void doReplaceWriter(org.apache.hadoop.fs.Path oldPath, org.apache.hadoop.fs.Path newPath, WALProvider.Writer nextWriter) throws IOException
AbstractFSWAL
Notice that you need to clear the AbstractFSWAL.rollRequested
flag in this method, as the new writer
will begin to work before returning from this method. If we clear the flag after returning from
this call, we may miss a roll request. The implementation class should choose a proper place to
clear the AbstractFSWAL.rollRequested
flag so we do not miss a roll request, typically before you
start writing to the new writer.
doReplaceWriter
in class AbstractFSWAL<WALProvider.Writer>
IOException
private void closeWriter(WALProvider.Writer writer, org.apache.hadoop.fs.Path path, boolean syncCloseCall) throws IOException
IOException
protected void doShutdown() throws IOException
doShutdown
in class AbstractFSWAL<WALProvider.Writer>
IOException
protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore) throws IOException
AbstractFSWAL
Append a set of edits to the WAL. The key
will have the region edit/sequence id filled in.
NOTE: This append, at a time that is usually after this call returns, starts an mvcc
transaction by calling 'begin' wherein we assign this update a sequenceid. At assignment
time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must
'complete' this mvcc transaction by calling
MultiVersionConcurrencyControl#complete(...) or a variant otherwise mvcc will get stuck. Do it
in the finally of a try/finally block within which this append lives and any subsequent
operations like sync or update of memstore, etc. Get the WriteEntry to pass mvcc out of the
passed in WALKey walKey
parameter. Be warned that the WriteEntry is not
immediately available on return from this method. It WILL be available subsequent to a sync of
this append; otherwise, you will just have to wait on the WriteEntry to get filled in.
append
in class AbstractFSWAL<WALProvider.Writer>
hri
- the regioninfo associated with append
key
- Modified by this call; we add to it this edit's region edit/sequence id.
edits
- Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
sequence id that is after all currently appended edits.
inMemstore
- Always true except for case where we are writing a region event meta marker
edit, for example, a compaction completion record into the WAL or noting a
Region Open event. In these cases the entry is just so we can finish an
unfinished compaction after a crash when the new Server reads the WAL on
recovery, etc. These transition event 'Markers' do not go via the memstore.
When inMemstore is false, we presume a Marker event edit.
key
will have the region edit/sequence id
in it.
IOException
private void checkLogRoll()
protected boolean doCheckLogLowReplication()
doCheckLogLowReplication
in class AbstractFSWAL<WALProvider.Writer>
protected long getSequenceOnRingBuffer()
private SyncFuture publishSyncOnRingBuffer(boolean forceSync)
protected SyncFuture publishSyncOnRingBuffer(long sequence, boolean forceSync)
private void publishSyncThenBlockOnCompletion(boolean forceSync) throws IOException
IOException
int getLogReplication()
If the pipeline isn't started yet or is empty, you will get the default replication factor. Therefore, if this function returns 0, it means you are not properly running with the HDFS-826 patch.
getLogReplication
in class AbstractFSWAL<WALProvider.Writer>
protected void doSync(boolean forceSync) throws IOException
doSync
in class AbstractFSWAL<WALProvider.Writer>
IOException
protected void doSync(long txid, boolean forceSync) throws IOException
doSync
in class AbstractFSWAL<WALProvider.Writer>
IOException
boolean isLowReplicationRollEnabled()
org.apache.hadoop.hdfs.protocol.DatanodeInfo[] getPipeline()
getPipeline
in class AbstractFSWAL<WALProvider.Writer>
WALProvider.Writer getWriter()
void setWriter(WALProvider.Writer writer)
Copyright © 2007–2020 The Apache Software Foundation. All rights reserved.