@InterfaceAudience.Private public class FSHLog extends Object implements WAL
An implementation of WAL that goes against FileSystem; i.e. it keeps WALs in HDFS.
Only one WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled. This is done internally by the implementation.
As data is flushed from the MemStore to other on-disk structures (files sorted by key, hfiles), a WAL becomes obsolete. We can let go of all log edits/entries for a region up to the sequence id that has been flushed. Much of the work below is done keeping account of these region sequence ids: what has been flushed out to hfiles, and what is still only in the WAL and in memory.
It is only practical to delete entire files. Thus, we delete an entire on-disk file F when all of the edits in F have a log-sequence-id that's older (smaller) than the most-recent flush.
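As a minimal sketch of that deletion rule, assume a simplified model in which each WAL file remembers, per region, the highest sequence id it contains, and the memstore side reports, per region, the lowest sequence id not yet flushed. A file can be let go only when none of its edits is still needed. The class and method names (WalObsolescence, isArchivable) are hypothetical, and regions are plain strings rather than encoded region names.

```java
import java.util.Map;

/** Hypothetical helper illustrating when a whole WAL file may be archived. */
final class WalObsolescence {
  private WalObsolescence() {}

  /**
   * @param highestSeqIdInFile per region, the highest sequence id written to this WAL file
   * @param lowestUnflushedSeqId per region, the lowest sequence id still only in the memstore;
   *        a region absent from this map is assumed to have nothing unflushed
   * @return true if every edit in the file is already persisted in hfiles
   */
  static boolean isArchivable(Map<String, Long> highestSeqIdInFile,
                              Map<String, Long> lowestUnflushedSeqId) {
    for (Map.Entry<String, Long> e : highestSeqIdInFile.entrySet()) {
      Long lowestUnflushed = lowestUnflushedSeqId.get(e.getKey());
      // An edit is still needed if its sequence id is >= the oldest unflushed one.
      if (lowestUnflushed != null && e.getValue() >= lowestUnflushed) {
        return false;
      }
    }
    return true;
  }
}
```

For example, a file holding region r1 up to sequence id 10 is archivable once r1's lowest unflushed id is 11, but not while any region in it still has an unflushed id at or below its highest id in the file.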
To read a WAL, call WALFactory.createReader(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path).
Modifier and Type | Class and Description |
---|---|
(package private) class | FSHLog.RingBufferEventHandler: Handler that is run by the disruptor ring buffer consumer. |
(package private) static class | FSHLog.RingBufferExceptionHandler: Exception handler to pass the disruptor ring buffer. |
(package private) static class | FSHLog.SafePointZigZagLatch: Used for coordinating two threads, holding one thread at a 'safe point' while the orchestrating thread does some work that requires the first thread paused: e.g. |
private class | FSHLog.SyncRunner: Thread that runs the HDFS sync call. |
Nested classes/interfaces inherited from interface WAL: WAL.Entry, WAL.Reader
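The coordination that FSHLog.SafePointZigZagLatch is described as providing can be sketched with two CountDownLatch instances: the paused thread signals that it has reached the safe point and then waits to be released, while the orchestrating thread waits for that signal, does its work, and releases it. This is a simplified, hypothetical analogue (ZigZagLatch and its method names are illustrative); it does not reproduce the real package-private class's interface.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

/** Hypothetical two-latch "zig-zag": park a worker at a safe point while another thread works. */
final class ZigZagLatch {
  private final CountDownLatch safePointAttained = new CountDownLatch(1);
  private final CountDownLatch safePointReleased = new CountDownLatch(1);

  /** Orchestrator: blocks until the worker reaches the safe point. */
  void waitSafePoint() throws InterruptedException {
    safePointAttained.await();
  }

  /** Worker: announces arrival, then parks until the orchestrator releases it. */
  void safePointAttained() throws InterruptedException {
    safePointAttained.countDown();
    safePointReleased.await();
  }

  /** Orchestrator: lets the worker continue once the paused-work section is done. */
  void releaseSafePoint() {
    safePointReleased.countDown();
  }

  /** Runs one round and records the order of events. */
  static List<String> demo() throws InterruptedException {
    List<String> events = new CopyOnWriteArrayList<>();
    ZigZagLatch latch = new ZigZagLatch();
    Thread worker = new Thread(() -> {
      try {
        events.add("worker: at safe point");
        latch.safePointAttained();
        events.add("worker: resumed");
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    worker.start();
    latch.waitSafePoint();
    events.add("orchestrator: working while worker is paused");
    latch.releaseSafePoint();
    worker.join();
    return events;
  }
}
```

The latches give a fixed ordering: the worker's arrival happens before the orchestrator's work, which happens before the worker resumes.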
Modifier and Type | Field and Description |
---|---|
private ExecutorService | appendExecutor: An ExecutorService that runs the disruptor AppendEventHandler append executor. |
private NavigableMap<org.apache.hadoop.fs.Path,Map<byte[],Long>> | byWalRegionSequenceIds: Map of WAL log file to the latest sequence ids of all regions it has entries of. |
private ThreadLocal<SyncFuture> | cachedSyncFutures: Map of SyncFutures owned by Thread objects. |
private DrainBarrier | closeBarrier: The barrier used to ensure that close() waits for all log rolls and flushes to finish. |
private boolean | closed |
private AtomicInteger | closeErrorCount |
private int | closeErrorsTolerated: Number of log close errors tolerated before we abort. |
protected org.apache.hadoop.conf.Configuration | conf: Configuration object. |
private AtomicInteger | consecutiveLogRolls |
private WALCoprocessorHost | coprocessorHost |
private static int | DEFAULT_SLOW_SYNC_TIME_MS |
private static int | DEFAULT_WAL_SYNC_TIMEOUT_MS |
private com.lmax.disruptor.dsl.Disruptor<RingBufferTruck> | disruptor: The nexus at which all incoming handlers meet. |
private AtomicLong | filenum |
static long | FIXED_OVERHEAD |
protected org.apache.hadoop.fs.FileSystem | fs: File system instance. |
private org.apache.hadoop.fs.Path | fullPathArchiveDir: Directory where old logs are kept. |
private org.apache.hadoop.fs.Path | fullPathLogDir: WAL directory, where all WAL files are placed. |
private org.apache.hadoop.fs.FSDataOutputStream | hdfs_out: FSDataOutputStream associated with the current SequenceFile.writer. |
private AtomicLong | highestSyncedSequence: Updated to the ring buffer sequence of the last successful sync call. |
private long | highestUnsyncedSequence: The highest known outstanding unsync'd WALEdit sequence number, where the sequence number is the ring buffer sequence. |
private long | lastTimeCheckLowReplication |
private List<WALActionsListener> | listeners: Listeners that are called on WAL events. |
private static org.apache.commons.logging.Log | LOG |
(package private) Comparator<org.apache.hadoop.fs.Path> | LOG_NAME_COMPARATOR: WAL comparator; it compares the timestamp (log filenum) present in the log file name. |
private String | logFilePrefix: Prefix of a WAL file, usually the name of the region server it is hosted on. |
private String | logFileSuffix: Suffix included on generated WAL file names. |
private long | logrollsize |
private boolean | lowReplicationRollEnabled |
private int | lowReplicationRollLimit |
private int | maxLogs |
private int | minTolerableReplication |
private AtomicInteger | numEntries |
private org.apache.hadoop.fs.PathFilter | ourFiles: Matches just those WAL files that belong to this WAL instance. |
private String | prefixPathStr: Prefix used when checking for WAL membership. |
private FSHLog.RingBufferEventHandler | ringBufferEventHandler: Run by the appendExecutor service; it is all about batching up appends and syncs. It may shut down without cleaning out the last few appends or syncs. |
private ReentrantLock | rollWriterLock: Makes sure only one log roll runs at a time. |
private SequenceIdAccounting | sequenceIdAccounting: Does the accounting of sequence ids in the WAL subsystem. |
private AtomicBoolean | shutdown |
private int | slowSyncNs |
private AtomicLong | totalLogSize: The total size of the WAL. |
private long | walSyncTimeout |
(package private) WALProvider.Writer | writer: Current log file. |
Constructor and Description |
---|
FSHLog(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path root, String logDir, org.apache.hadoop.conf.Configuration conf): Constructor. |
FSHLog(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path rootDir, String logDir, String archiveDir, org.apache.hadoop.conf.Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, String suffix): Create an edit log at the given dir location. |
Modifier and Type | Method and Description |
---|---|
void | abortCacheFlush(byte[] encodedRegionName): Abort a cache flush. |
protected void | afterCreatingZigZagLatch(): Used to manufacture race condition reliably. |
long | append(HTableDescriptor htd, HRegionInfo hri, WALKey key, WALEdit edits, boolean inMemstore): Append a set of edits to the WAL. |
private void | archiveLogFile(org.apache.hadoop.fs.Path p) |
(package private) void | atHeadOfRingBufferEventHandlerAppend(): Exposed for testing only. |
protected void | beforeWaitOnSafePoint() |
private org.apache.htrace.Span | blockOnSync(SyncFuture syncFuture) |
private int | calculateMaxLogFiles(float memstoreSizeRatio, long logRollSize) |
void | checkLogRoll(): Schedule a log roll if needed. |
private boolean | checkLowReplication() |
private void | cleanOldLogs(): Archive old logs. |
void | close(): Caller no longer needs any edits from this WAL. |
void | completeCacheFlush(byte[] encodedRegionName): Complete the cache flush. |
protected org.apache.hadoop.fs.Path | computeFilename(long filenum): Convenience method that computes a new filename with a given file-number. |
private IOException | convertInterruptedExceptionToIOException(InterruptedException ie) |
protected WALProvider.Writer | createWriterInstance(org.apache.hadoop.fs.Path path): Allows subclasses to inject different writers without having to extend other methods like rollWriter(). |
private static IOException | ensureIOException(Throwable t) |
(package private) byte[][] | findRegionsToForceFlush(): If the number of un-archived WAL files is greater than the maximum allowed, checks the first (oldest) WAL file and returns those regions which should be flushed so that it can be archived. |
WALCoprocessorHost | getCoprocessorHost() |
org.apache.hadoop.fs.Path | getCurrentFileName(): Convenience method that computes the filename using the current WAL file-number. |
long | getEarliestMemstoreSeqNum(byte[] encodedRegionName): Gets the earliest unflushed sequence id in the memstore for the region. |
long | getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName): Gets the earliest unflushed sequence id in the memstore for the store. |
protected long | getFileNumFromFileName(org.apache.hadoop.fs.Path fileName): A log file has a creation timestamp (in ms) in its file name (filenum). |
protected org.apache.hadoop.fs.FileStatus[] | getFiles(): Get the backing files associated with this WAL. |
long | getLastTimeCheckLowReplication() |
long | getLogFileSize() |
(package private) int | getLogReplication(): Gets the datanode replication count for the current WAL. |
private org.apache.hadoop.fs.Path | getNewPath(): Retrieves the next path to use for writing. |
int | getNumLogFiles() |
int | getNumRolledLogFiles() |
(package private) org.apache.hadoop.fs.Path | getOldPath() |
(package private) OutputStream | getOutputStream(): Currently, we need to expose the writer's OutputStream to tests so that they can manipulate the default behavior (such as setting the maxRecoveryErrorCount value; see, for example, TestWALReplay.testReplayEditsWrittenIntoWAL()). |
(package private) org.apache.hadoop.hdfs.protocol.DatanodeInfo[] | getPipeLine(): Gets the pipeline for the current WAL. |
private long | getSequenceOnRingBuffer() |
private SyncFuture | getSyncFuture(long sequence, org.apache.htrace.Span span) |
(package private) long | getUnflushedEntriesCount() |
static org.apache.hadoop.fs.Path | getWALArchivePath(org.apache.hadoop.fs.Path archiveDir, org.apache.hadoop.fs.Path p) |
(package private) boolean | isLowReplicationRollEnabled() |
(package private) boolean | isUnflushedEntries() |
static void | main(String[] args): Pass one or more log file names and it will either dump out a text version on stdout or split the specified log files. |
private long | postAppend(WAL.Entry e, long elapsedTime) |
private void | postSync(long timeInNanos, int handlerSyncs) |
private void | preemptiveSync(ProtobufLogWriter nextWriter): Run a sync after opening to set up the pipeline. |
private SyncFuture | publishSyncOnRingBuffer(long sequence) |
private SyncFuture | publishSyncOnRingBuffer(long sequence, org.apache.htrace.Span span) |
private SyncFuture | publishSyncOnRingBuffer(org.apache.htrace.Span span) |
private org.apache.htrace.Span | publishSyncThenBlockOnCompletion(org.apache.htrace.Span span) |
void | registerWALActionsListener(WALActionsListener listener): Registers a WALActionsListener. |
(package private) org.apache.hadoop.fs.Path | replaceWriter(org.apache.hadoop.fs.Path oldPath, org.apache.hadoop.fs.Path newPath, WALProvider.Writer nextWriter, org.apache.hadoop.fs.FSDataOutputStream nextHdfsOut): Cleans up the current writer, closing it, and then puts in place the passed-in nextWriter. |
void | requestLogRoll() |
private void | requestLogRoll(boolean tooFewReplicas) |
byte[][] | rollWriter(): Roll the log writer. |
byte[][] | rollWriter(boolean force): Roll the log writer. |
void | shutdown(): Stop accepting new writes. |
private static void | split(org.apache.hadoop.conf.Configuration conf, org.apache.hadoop.fs.Path p) |
Long | startCacheFlush(byte[] encodedRegionName, Set<byte[]> families): WAL keeps track of the sequence numbers that are not yet flushed in memstores in order to be able to do accounting to figure out which WALs can be let go. |
void | sync(): Sync what we have in the WAL. |
void | sync(long txid): Sync the WAL if the txid was not already sync'd. |
private void | tellListenersAboutPostLogRoll(org.apache.hadoop.fs.Path oldPath, org.apache.hadoop.fs.Path newPath): Tell listeners about post log roll. |
private void | tellListenersAboutPreLogRoll(org.apache.hadoop.fs.Path oldPath, org.apache.hadoop.fs.Path newPath): Tell listeners about pre log roll. |
String | toString(): Human-readable identifying information about the state of this WAL. |
boolean | unregisterWALActionsListener(WALActionsListener listener): Unregisters a WALActionsListener. |
private static void | usage() |
private static final org.apache.commons.logging.Log LOG
private static final int DEFAULT_SLOW_SYNC_TIME_MS
private static final int DEFAULT_WAL_SYNC_TIMEOUT_MS
private final com.lmax.disruptor.dsl.Disruptor<RingBufferTruck> disruptor
private final ExecutorService appendExecutor
private final FSHLog.RingBufferEventHandler ringBufferEventHandler
private final ThreadLocal<SyncFuture> cachedSyncFutures
Map of SyncFutures owned by Thread objects. Used so we reuse SyncFutures.
A ThreadLocal is used so the JVM can GC the terminated thread for us. See HBASE-21228.
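The per-thread reuse described above can be sketched with ThreadLocal.withInitial: each thread gets one reusable holder that is re-armed for every request instead of allocating a new object. ReusableSyncFuture and SyncFutureCache are hypothetical stand-ins, not the real SyncFuture API.

```java
/** Hypothetical stand-in for SyncFuture: a reusable, per-thread request holder. */
final class ReusableSyncFuture {
  private long ringBufferSequence = -1;

  /** Re-arms this instance for a new sync request instead of allocating a fresh object. */
  ReusableSyncFuture reset(long sequence) {
    this.ringBufferSequence = sequence;
    return this;
  }

  long getRingBufferSequence() {
    return ringBufferSequence;
  }
}

final class SyncFutureCache {
  // One holder per thread. The value lives in the owning Thread's thread-local map, so once
  // a handler thread terminates and is collected, its holder becomes garbage too.
  private final ThreadLocal<ReusableSyncFuture> cached =
      ThreadLocal.withInitial(ReusableSyncFuture::new);

  ReusableSyncFuture getSyncFuture(long sequence) {
    return cached.get().reset(sequence);
  }
}
```

Repeated calls on one thread return the same re-armed instance, while each other thread gets its own.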
private volatile long highestUnsyncedSequence
private final AtomicLong highestSyncedSequence
Updated to the ring buffer sequence of the last successful sync call. This can be less than highestUnsyncedSequence for the case where we have an append for which a sync has not yet come in. Maintained by the syncing threads.
protected final org.apache.hadoop.fs.FileSystem fs
private final org.apache.hadoop.fs.Path fullPathLogDir
private final org.apache.hadoop.fs.Path fullPathArchiveDir
private final org.apache.hadoop.fs.PathFilter ourFiles
private final String logFilePrefix
private final String logFileSuffix
private final String prefixPathStr
private final WALCoprocessorHost coprocessorHost
protected final org.apache.hadoop.conf.Configuration conf
private final List<WALActionsListener> listeners
private org.apache.hadoop.fs.FSDataOutputStream hdfs_out
private final int minTolerableReplication
private final int slowSyncNs
private final long walSyncTimeout
private final AtomicInteger consecutiveLogRolls
private final int lowReplicationRollLimit
private volatile boolean lowReplicationRollEnabled
private SequenceIdAccounting sequenceIdAccounting
volatile WALProvider.Writer writer
private final DrainBarrier closeBarrier
private final ReentrantLock rollWriterLock
private volatile boolean closed
private final AtomicBoolean shutdown
private final AtomicLong filenum
private final AtomicInteger numEntries
private final long logrollsize
private AtomicLong totalLogSize
private final int maxLogs
private final int closeErrorsTolerated
private final AtomicInteger closeErrorCount
private volatile long lastTimeCheckLowReplication
final Comparator<org.apache.hadoop.fs.Path> LOG_NAME_COMPARATOR
private NavigableMap<org.apache.hadoop.fs.Path,Map<byte[],Long>> byWalRegionSequenceIds
public static final long FIXED_OVERHEAD
public FSHLog(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path root, String logDir, org.apache.hadoop.conf.Configuration conf) throws IOException
Parameters:
fs - filesystem handle
root - path for stored and archived WALs
logDir - dir where WALs are stored
conf - configuration to use
Throws:
IOException
public FSHLog(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path rootDir, String logDir, String archiveDir, org.apache.hadoop.conf.Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, String suffix) throws IOException
Create an edit log at the given dir location.
You should never have to load an existing log. If there is a log at startup, it should have already been processed and deleted by the time the WAL object is started up.
Parameters:
fs - filesystem handle
rootDir - path to where logs and oldlogs are stored
logDir - dir where WALs are stored
archiveDir - dir where WALs are archived
conf - configuration to use
listeners - Listeners on WAL events. Listeners passed here will be registered before we do anything else; e.g. the Constructor rollWriter().
failIfWALExists - If true, an IOException will be thrown if files related to this WAL already exist.
prefix - should always be hostname and port in a distributed env; it will be URL encoded before being used. If prefix is null, "wal" will be used.
suffix - will be URL encoded. null is treated as empty. A non-empty suffix must start with DefaultWALProvider.WAL_FILE_NAME_DELIMITER.
Throws:
IOException
public void registerWALActionsListener(WALActionsListener listener)
Specified by:
registerWALActionsListener in interface WAL
public boolean unregisterWALActionsListener(WALActionsListener listener)
Specified by:
unregisterWALActionsListener in interface WAL
public WALCoprocessorHost getCoprocessorHost()
Specified by:
getCoprocessorHost in interface WAL
private int calculateMaxLogFiles(float memstoreSizeRatio, long logRollSize)
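The source does not document how calculateMaxLogFiles combines its two inputs. A plausible sketch, assuming the limit is roughly twice the heap budget for memstores expressed in units of the roll size, is shown below; treat the formula as an assumption rather than the actual implementation. MaxLogsEstimate is a hypothetical name, and the explicit maxHeapBytes parameter is added so the arithmetic can be checked without depending on the running JVM's heap.

```java
/** Hypothetical estimate of how many WAL files to allow before forcing flushes. */
final class MaxLogsEstimate {
  private MaxLogsEstimate() {}

  /** Assumption: limit = round(maxHeap * memstoreSizeRatio * 2 / logRollSize). */
  static int calculateMaxLogFiles(long maxHeapBytes, float memstoreSizeRatio, long logRollSize) {
    if (logRollSize <= 0) {
      throw new IllegalArgumentException("logRollSize must be positive");
    }
    // Heap bytes memstores may occupy, doubled, then counted in WAL files of logRollSize bytes.
    double budgetBytes = maxHeapBytes * (double) memstoreSizeRatio * 2;
    return (int) Math.round(budgetBytes / logRollSize);
  }

  /** Same estimate, using this JVM's maximum heap size. */
  static int calculateMaxLogFiles(float memstoreSizeRatio, long logRollSize) {
    return calculateMaxLogFiles(Runtime.getRuntime().maxMemory(), memstoreSizeRatio, logRollSize);
  }
}
```

With a 1 GB heap, a memstore ratio of 0.4 and a 100 MB roll size, the sketch yields round(1e9 * 0.4 * 2 / 1e8) = 8 files.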
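protected org.apache.hadoop.fs.FileStatus[] getFiles() throws IOException
Throws:
IOException
OutputStream getOutputStream()
Currently, we need to expose the writer's OutputStream to tests so that they can manipulate the default behavior (such as setting the maxRecoveryErrorCount value; see, for example, TestWALReplay.testReplayEditsWrittenIntoWAL()). This is done using reflection on the underlying HDFS OutputStream.
NOTE: This could be removed once Hadoop1 support is removed.
public byte[][] rollWriter() throws FailedLogCloseException, IOException
Description copied from interface: WAL
The implementation is synchronized in order to make sure there's one rollWriter running at any given time.
Specified by:
rollWriter in interface WAL
Returns:
If lots of logs, flush the returned regions so next time through we can clean logs. Returns null if nothing to flush. Names are actual region names as returned by HRegionInfo.getEncodedName()
Throws:
FailedLogCloseException
IOException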
private org.apache.hadoop.fs.Path getNewPath() throws IOException
Throws:
IOException
org.apache.hadoop.fs.Path getOldPath()
private void tellListenersAboutPreLogRoll(org.apache.hadoop.fs.Path oldPath, org.apache.hadoop.fs.Path newPath) throws IOException
Throws:
IOException
private void tellListenersAboutPostLogRoll(org.apache.hadoop.fs.Path oldPath, org.apache.hadoop.fs.Path newPath) throws IOException
Throws:
IOException
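private void preemptiveSync(ProtobufLogWriter nextWriter)
Parameters:
nextWriter -
startTimeNanos -
public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException
Description copied from interface: WAL
The implementation is synchronized in order to make sure there's one rollWriter running at any given time.
Specified by:
rollWriter in interface WAL
Parameters:
force - If true, force creation of a new writer even if no entries have been written to the current writer
Returns:
If lots of logs, flush the returned regions so next time through we can clean logs. Returns null if nothing to flush. Names are actual region names as returned by HRegionInfo.getEncodedName()
Throws:
FailedLogCloseException
IOException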
protected WALProvider.Writer createWriterInstance(org.apache.hadoop.fs.Path path) throws IOException
Throws:
IOException
private void cleanOldLogs() throws IOException
Throws:
IOException
byte[][] findRegionsToForceFlush() throws IOException
Throws:
IOException
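The selection that findRegionsToForceFlush performs can be sketched under a simplified model: un-archived WAL files keyed by file number (so the first entry is the oldest), each mapping region to the highest sequence id it holds, plus each region's lowest unflushed sequence id. Regions whose edits in the oldest file are still unflushed are the ones to flush. ForceFlushPicker is hypothetical and returns region names as strings rather than byte[][].

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

/** Hypothetical picker of regions that pin the oldest WAL file. */
final class ForceFlushPicker {
  private ForceFlushPicker() {}

  /**
   * @param unarchivedWals file number -> (region -> highest sequence id in that file)
   * @param lowestUnflushedSeqId region -> lowest sequence id still only in the memstore
   * @param maxLogs number of un-archived WAL files tolerated before forcing flushes
   */
  static List<String> findRegionsToForceFlush(NavigableMap<Long, Map<String, Long>> unarchivedWals,
      Map<String, Long> lowestUnflushedSeqId, int maxLogs) {
    List<String> regions = new ArrayList<>();
    if (unarchivedWals.size() <= maxLogs) {
      return regions; // under the limit: nothing to force
    }
    // Lowest file number = oldest file.
    Map<String, Long> oldest = unarchivedWals.firstEntry().getValue();
    for (Map.Entry<String, Long> e : oldest.entrySet()) {
      Long lowest = lowestUnflushedSeqId.get(e.getKey());
      // The region still has unflushed edits in the oldest file, so it blocks archiving.
      if (lowest != null && e.getValue() >= lowest) {
        regions.add(e.getKey());
      }
    }
    return regions;
  }
}
```

Flushing the returned regions advances their lowest unflushed ids past the oldest file's edits, after which that file can be archived.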
protected void afterCreatingZigZagLatch()
See Also:
beforeWaitOnSafePoint()
protected void beforeWaitOnSafePoint()
See Also:
afterCreatingZigZagLatch()
org.apache.hadoop.fs.Path replaceWriter(org.apache.hadoop.fs.Path oldPath, org.apache.hadoop.fs.Path newPath, WALProvider.Writer nextWriter, org.apache.hadoop.fs.FSDataOutputStream nextHdfsOut) throws IOException
Cleans up the current writer, closing it, and then puts in place the passed-in nextWriter.
In the case of creating a new WAL, oldPath will be null.
In the case of rolling over from one file to the next, none of the params will be null.
In the case of closing out this FSHLog with no further use, newPath, nextWriter, and nextHdfsOut will be null.
Parameters:
oldPath - may be null
newPath - may be null
nextWriter - may be null
nextHdfsOut - may be null
Returns:
newPath
Throws:
IOException - if there is a problem flushing or closing the underlying FS
long getUnflushedEntriesCount()
boolean isUnflushedEntries()
public static org.apache.hadoop.fs.Path getWALArchivePath(org.apache.hadoop.fs.Path archiveDir, org.apache.hadoop.fs.Path p)
private void archiveLogFile(org.apache.hadoop.fs.Path p) throws IOException
Throws:
IOException
protected org.apache.hadoop.fs.Path computeFilename(long filenum)
Parameters:
filenum - to use
public org.apache.hadoop.fs.Path getCurrentFileName()
public String toString()
protected long getFileNumFromFileName(org.apache.hadoop.fs.Path fileName)
A log file has a creation timestamp (in ms) in its file name (filenum).
This helper method returns the creation timestamp from a given log file. It extracts the timestamp assuming the filename is created with the computeFilename(long filenum) method.
Parameters:
fileName - the log file path
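The naming scheme implied by computeFilename and getFileNumFromFileName can be sketched with plain strings. The layout assumed here (directory, prefix, a "." delimiter, the file number, then the suffix) is an assumption based on the prefix/suffix parameter descriptions; WalFileNames is a hypothetical helper, not the HBase API. Ordering by the parsed number, as LOG_NAME_COMPARATOR is described to do, differs from plain lexicographic ordering.

```java
import java.util.Comparator;

/** Hypothetical helper: build WAL names from a file number and parse the number back out. */
final class WalFileNames {
  private final String logDir;
  private final String prefix;
  private final String suffix;

  WalFileNames(String logDir, String prefix, String suffix) {
    this.logDir = logDir;
    this.prefix = prefix;
    this.suffix = suffix == null ? "" : suffix; // null suffix treated as empty
  }

  /** Full path for a file number (e.g. a creation timestamp in ms). */
  String computeFilename(long filenum) {
    if (filenum < 0) {
      throw new IllegalArgumentException("WAL file number must be non-negative");
    }
    return logDir + "/" + prefix + "." + filenum + suffix;
  }

  /** Inverse of computeFilename: strip directory/prefix and suffix, parse what remains. */
  long getFileNumFromFileName(String fileName) {
    String prefixPath = logDir + "/" + prefix + ".";
    if (!fileName.startsWith(prefixPath) || !fileName.endsWith(suffix)) {
      throw new IllegalArgumentException(fileName + " does not belong to this WAL");
    }
    return Long.parseLong(
        fileName.substring(prefixPath.length(), fileName.length() - suffix.length()));
  }

  /** Orders WAL paths by embedded file number, oldest first. */
  Comparator<String> logNameComparator() {
    return Comparator.comparingLong(this::getFileNumFromFileName);
  }
}
```

For instance, file numbers 3 and 20 sort as 3 then 20 with this comparator, whereas a lexicographic sort of the full paths would put 20 first.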
public void close() throws IOException
Specified by:
close in interface Closeable
close in interface AutoCloseable
close in interface WAL
Throws:
IOException
public void shutdown() throws IOException
Specified by:
shutdown in interface WAL
Throws:
IOException
public long append(HTableDescriptor htd, HRegionInfo hri, WALKey key, WALEdit edits, boolean inMemstore) throws IOException
Description copied from interface: WAL
On return, key will have the region edit/sequence id filled in.
Specified by:
append in interface WAL
Parameters:
htd - used to give scope for replication. TODO: refactor out in favor of table name and info
key - Modified by this call; we add to it this edit's region edit/sequence id.
edits - Edits to append. MAY CONTAIN NO EDITS for the case where we want to get an edit sequence id that is after all currently appended edits.
inMemstore - Always true except for the case where we are writing a compaction completion record into the WAL; in this case the entry is just so we can finish an unfinished compaction; it is not an edit for memstore.
Returns:
a 'transaction id'; key will have the region edit/sequence id in it.
Throws:
IOException
public void checkLogRoll()
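checkLogRoll is summarized only as "schedule a log roll if needed". A minimal sketch of such a check, assuming the roll threshold is a fixed fraction (multiplier) of the block size and that the check gives up immediately (tryLock) if a roll is already in progress, might look like this. LogRollCheck, its suppliers, and the rollRequests counter are hypothetical; the counter stands in for requestLogRoll.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

/** Hypothetical sketch of a "schedule a log roll if needed" check. */
final class LogRollCheck {
  private final ReentrantLock rollWriterLock = new ReentrantLock();
  private final long logRollSize;
  private final LongSupplier currentWriterLength;
  private final BooleanSupplier tooFewReplicas;
  private int rollRequests; // stands in for calls to requestLogRoll(...)

  LogRollCheck(long blockSize, float multiplier, LongSupplier currentWriterLength,
      BooleanSupplier tooFewReplicas) {
    // Assumption: roll once the current file exceeds this fraction of a block.
    this.logRollSize = (long) (blockSize * multiplier);
    this.currentWriterLength = currentWriterLength;
    this.tooFewReplicas = tooFewReplicas;
  }

  void checkLogRoll() {
    // If another thread holds the lock it is mid-roll, so there is nothing to schedule.
    if (!rollWriterLock.tryLock()) {
      return;
    }
    boolean lowReplication;
    try {
      lowReplication = tooFewReplicas.getAsBoolean();
    } finally {
      rollWriterLock.unlock();
    }
    if (lowReplication || currentWriterLength.getAsLong() > logRollSize) {
      rollRequests++;
    }
  }

  int rollRequests() {
    return rollRequests;
  }
}
```

Using tryLock rather than lock keeps the check cheap: a caller never queues behind an in-progress roll just to learn that a roll is happening.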
private boolean checkLowReplication()
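A simplified sketch of the low-replication bookkeeping suggested by the minTolerableReplication, lowReplicationRollLimit, consecutiveLogRolls and lowReplicationRollEnabled fields: request a roll while the pipeline has too few replicas, stop after a bounded number of consecutive attempts, and re-enable only once a writer that has actually taken some entries reports healthy replication. The exact rules are assumptions for illustration; LowReplicationPolicy is hypothetical.

```java
/** Hypothetical low-replication roll policy; returns true when a roll should be requested. */
final class LowReplicationPolicy {
  private final int minTolerableReplication;
  private final int lowReplicationRollLimit;
  private int consecutiveLogRolls;
  private boolean lowReplicationRollEnabled = true;

  LowReplicationPolicy(int minTolerableReplication, int lowReplicationRollLimit) {
    this.minTolerableReplication = minTolerableReplication;
    this.lowReplicationRollLimit = lowReplicationRollLimit;
  }

  boolean isLowReplicationRollEnabled() {
    return lowReplicationRollEnabled;
  }

  /**
   * @param currentReplicas replica count of the current WAL's pipeline (0 = unknown)
   * @param entriesInCurrentWal entries written to the current WAL so far
   */
  boolean checkLowReplication(int currentReplicas, int entriesInCurrentWal) {
    if (currentReplicas != 0 && currentReplicas < minTolerableReplication) {
      if (!lowReplicationRollEnabled) {
        return false;
      }
      if (consecutiveLogRolls < lowReplicationRollLimit) {
        consecutiveLogRolls++;
        return true;
      }
      // Rolling keeps yielding under-replicated files (too few live datanodes):
      // stop rolling for this reason until replication recovers.
      consecutiveLogRolls = 0;
      lowReplicationRollEnabled = false;
      return false;
    }
    if (currentReplicas >= minTolerableReplication && !lowReplicationRollEnabled
        && entriesInCurrentWal > 1) {
      // Only a writer that has seen real traffic counts as evidence of recovery.
      lowReplicationRollEnabled = true;
    }
    return false;
  }
}
```

The entriesInCurrentWal > 1 guard reflects the assumption that a freshly rolled writer may report full replication before it has been exercised, so it should not count as evidence that the cluster recovered.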
private SyncFuture publishSyncOnRingBuffer(long sequence)
private long getSequenceOnRingBuffer()
private SyncFuture publishSyncOnRingBuffer(org.apache.htrace.Span span)
private SyncFuture publishSyncOnRingBuffer(long sequence, org.apache.htrace.Span span)
private org.apache.htrace.Span publishSyncThenBlockOnCompletion(org.apache.htrace.Span span) throws IOException
Throws:
IOException
private org.apache.htrace.Span blockOnSync(SyncFuture syncFuture) throws IOException
Throws:
IOException
private IOException convertInterruptedExceptionToIOException(InterruptedException ie)
private SyncFuture getSyncFuture(long sequence, org.apache.htrace.Span span)
private void postSync(long timeInNanos, int handlerSyncs)
private long postAppend(WAL.Entry e, long elapsedTime)
int getLogReplication()
public void sync() throws IOException
Specified by:
sync in interface WAL
Throws:
IOException
public void sync(long txid) throws IOException
Specified by:
sync in interface WAL
Parameters:
txid - Transaction id to sync to.
Throws:
IOException
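The "sync only if the txid was not already sync'd" contract can be sketched with a high-water mark: append hands out increasing txids, and sync(txid) returns immediately if an earlier sync already covered that txid. The sketch is single-process and hypothetical (SyncTracker); FSHLog itself publishes syncs through the disruptor ring buffer and tracks ring-buffer sequences, which is not modelled here.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical txid high-water-mark tracker for "sync if not already synced". */
final class SyncTracker {
  private final AtomicLong highestUnsyncedTxid = new AtomicLong();
  private final AtomicLong highestSyncedTxid = new AtomicLong();
  private final AtomicLong physicalSyncs = new AtomicLong();

  /** Stands in for append(): returns the txid to pass to sync(txid). */
  long append() {
    return highestUnsyncedTxid.incrementAndGet();
  }

  void sync(long txid) {
    if (highestSyncedTxid.get() >= txid) {
      return; // an earlier sync already made this txid durable
    }
    // Stand-in for the real filesystem sync; it covers everything appended so far.
    long covered = highestUnsyncedTxid.get();
    physicalSyncs.incrementAndGet();
    // Never move the mark backwards if a concurrent sync recorded a higher value.
    highestSyncedTxid.accumulateAndGet(covered, Math::max);
  }

  long physicalSyncs() {
    return physicalSyncs.get();
  }
}
```

Here two appends followed by sync(b) and then sync(a) cost one physical sync, since the second call is already covered.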
public void requestLogRoll()
private void requestLogRoll(boolean tooFewReplicas)
public int getNumRolledLogFiles()
public int getNumLogFiles()
public long getLogFileSize()
public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families)
Description copied from interface: WAL
Currently, it is expected that the update lock is held for the region; i.e. no concurrent appends while we set up cache flush.
Specified by:
startCacheFlush in interface WAL
Parameters:
families - Families to flush. May be a subset of all families in the region.
Returns:
HConstants.NO_SEQNUM if we are flushing the whole region OR if we are flushing a subset of all families but there are no edits in those families not being flushed; in other words, this is effectively the same as a flush of the whole region even though we were passed a subset of families. Otherwise, it returns the sequence id of the oldest/lowest outstanding edit.
See Also:
WAL.completeCacheFlush(byte[]), WAL.abortCacheFlush(byte[])
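The start/complete/abort cache-flush protocol and the return value of startCacheFlush can be sketched with per-region, per-family maps of the lowest unflushed sequence id. In FSHLog this bookkeeping lives behind the sequenceIdAccounting field; the FlushAccounting class below is a hypothetical, single-threaded model (string regions and families, -1 standing in for HConstants.NO_SEQNUM), not that class.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of cache-flush sequence id accounting. */
final class FlushAccounting {
  static final long NO_SEQNUM = -1L; // stand-in for HConstants.NO_SEQNUM

  // region -> family -> lowest sequence id still only in the memstore
  private final Map<String, Map<String, Long>> unflushed = new HashMap<>();
  // region -> family -> lowest sequence id of edits currently being flushed
  private final Map<String, Map<String, Long>> flushing = new HashMap<>();

  /** Records an append; ids only grow, so the first id seen per family is the lowest. */
  void onAppend(String region, String family, long seqId) {
    unflushed.computeIfAbsent(region, r -> new HashMap<>()).putIfAbsent(family, seqId);
  }

  /** Moves the flushed families aside; returns the lowest id left in other families, or NO_SEQNUM. */
  long startCacheFlush(String region, Set<String> families) {
    Map<String, Long> perFamily = unflushed.getOrDefault(region, new HashMap<>());
    Map<String, Long> moved = new HashMap<>();
    for (String family : families) {
      Long seq = perFamily.remove(family);
      if (seq != null) {
        moved.put(family, seq);
      }
    }
    flushing.put(region, moved);
    long lowestRemaining = NO_SEQNUM;
    for (long seq : perFamily.values()) {
      if (lowestRemaining == NO_SEQNUM || seq < lowestRemaining) {
        lowestRemaining = seq;
      }
    }
    return lowestRemaining;
  }

  /** Flushed edits are now in hfiles, so their ids no longer pin any WAL. */
  void completeCacheFlush(String region) {
    flushing.remove(region);
  }

  /** Flush failed: restore the moved ids, keeping the lower id if new edits arrived meanwhile. */
  void abortCacheFlush(String region) {
    Map<String, Long> moved = flushing.remove(region);
    if (moved == null) {
      return;
    }
    Map<String, Long> perFamily = unflushed.computeIfAbsent(region, r -> new HashMap<>());
    moved.forEach((family, seq) -> perFamily.merge(family, seq, Math::min));
  }

  /** Lowest id for the region that is not yet safely in an hfile, or NO_SEQNUM. */
  long getEarliestMemstoreSeqNum(String region) {
    long lowest = NO_SEQNUM;
    for (Map<String, Map<String, Long>> m : List.of(unflushed, flushing)) {
      for (long seq : m.getOrDefault(region, Map.of()).values()) {
        if (lowest == NO_SEQNUM || seq < lowest) {
          lowest = seq;
        }
      }
    }
    return lowest;
  }
}
```

Flushing all of a region's families leaves nothing outstanding, so startCacheFlush returns NO_SEQNUM in that case, matching the return-value description above.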
public void completeCacheFlush(byte[] encodedRegionName)
Specified by:
completeCacheFlush in interface WAL
Parameters:
encodedRegionName - Encoded region name.
See Also:
WAL.startCacheFlush(byte[], Set), WAL.abortCacheFlush(byte[])
public void abortCacheFlush(byte[] encodedRegionName)
Specified by:
abortCacheFlush in interface WAL
Parameters:
encodedRegionName - Encoded region name.
boolean isLowReplicationRollEnabled()
private static void split(org.apache.hadoop.conf.Configuration conf, org.apache.hadoop.fs.Path p) throws IOException
Throws:
IOException
public long getEarliestMemstoreSeqNum(byte[] encodedRegionName)
Specified by:
getEarliestMemstoreSeqNum in interface WAL
Parameters:
encodedRegionName - The region to get the number for.
public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName)
Specified by:
getEarliestMemstoreSeqNum in interface WAL
Parameters:
encodedRegionName - The region to get the number for.
familyName - The family to get the number for.
void atHeadOfRingBufferEventHandlerAppend()
private static IOException ensureIOException(Throwable t)
private static void usage()
public static void main(String[] args) throws IOException
Pass one or more log file names and it will either dump out a text version on stdout or split the specified log files.
Parameters:
args -
Throws:
IOException
org.apache.hadoop.hdfs.protocol.DatanodeInfo[] getPipeLine()
public long getLastTimeCheckLowReplication()
Copyright © 2007–2019 The Apache Software Foundation. All rights reserved.