@InterfaceAudience.Private public class WALProcedureStore extends ProcedureStoreBase
start(int)
, then recoverLease()
,
then ProcedureStore.load(ProcedureLoader)
.
In recoverLease()
, we will get the lease by closing all the existing wal files(by
calling recoverFileLease), and creating a new wal writer. And we will also get the list of all
the old wal files.
FIXME: notice that the current recover lease implementation is problematic, it can not deal with
the races if there are two master both wants to acquire the lease...
In ProcedureStore.load(ProcedureLoader)
method, we will load all the active procedures. See the
comments of this method for more details.
The actual logging way is a bit like our FileSystem based WAL implementation as RS side. There is
a slots
, which is more like the ring buffer, and in the insert, update and delete
methods we will put thing into the slots
and wait. And there is a background sync
thread(see the syncLoop()
method) which get data from the slots
and write them
to the FileSystem, and notify the caller that we have finished.
TODO: try using disruptor to increase performance and simplify the logic?
The storeTracker
keeps track of the modified procedures in the newest wal file, which is
also the one being written currently. And the deleted bits in it are for all the procedures, not
only the ones in the newest wal file. And when rolling a log, we will first store it in the
trailer of the current wal file, and then reset its modified bits, so that it can start to track
the modified procedures for the new wal file.
The holdingCleanupTracker
is used to test whether we are safe to delete the oldest wal
file. When there are log rolling and there are more than 1 wal files, we will make use of it. It
will first be initialized to the oldest file's tracker(which is stored in the trailer), using the
method ProcedureStoreTracker.resetTo(ProcedureStoreTracker, boolean)
, and then merge it
with the tracker of every newer wal files, using the
ProcedureStoreTracker.setDeletedIfModifiedInBoth(ProcedureStoreTracker)
.
If we find out
that all the modified procedures for the oldest wal file are modified or deleted in newer wal
files, then we can delete it. This is because that, every time we call
ProcedureStore.insert(Procedure[])
or ProcedureStore.update(Procedure)
, we will
persist the full state of a Procedure, so the earlier wal records for this procedure can all be
deleted.for printing content of a single WAL.
,
to parse a directory of MasterWALProcs.
Modifier and Type | Class and Description |
---|---|
static interface |
WALProcedureStore.LeaseRecovery |
private static class |
WALProcedureStore.PushType |
static class |
WALProcedureStore.SyncMetrics |
ProcedureStore.ProcedureIterator, ProcedureStore.ProcedureLoader, ProcedureStore.ProcedureStoreListener
Constructor and Description |
---|
WALProcedureStore(org.apache.hadoop.conf.Configuration conf,
org.apache.hadoop.fs.Path walDir,
org.apache.hadoop.fs.Path walArchiveDir,
WALProcedureStore.LeaseRecovery leaseRecovery) |
WALProcedureStore(org.apache.hadoop.conf.Configuration conf,
WALProcedureStore.LeaseRecovery leaseRecovery) |
Modifier and Type | Method and Description |
---|---|
private ByteSlot |
acquireSlot() |
private void |
buildHoldingCleanupTracker() |
private void |
closeCurrentLogStream(boolean abort) |
void |
delete(long procId)
The specified procId was removed from the executor,
due to completion, abort or failure.
|
private void |
delete(long[] procIds) |
void |
delete(long[] procIds,
int offset,
int count)
The specified procIds were removed from the executor,
due to completion, abort or failure.
|
void |
delete(Procedure<?> proc,
long[] subProcIds)
The parent procedure completed.
|
ArrayList<ProcedureWALFile> |
getActiveLogs() |
Set<ProcedureWALFile> |
getCorruptedLogs() |
org.apache.hadoop.fs.FileSystem |
getFileSystem() |
protected org.apache.hadoop.fs.Path |
getLogFilePath(long logId) |
private org.apache.hadoop.fs.FileStatus[] |
getLogFiles() |
private static long |
getLogIdFromName(String name) |
private static long |
getMaxLogId(org.apache.hadoop.fs.FileStatus[] logFiles)
Make sure that the file set are gotten by calling
getLogFiles() , where we will sort
the file set by log id. |
long |
getMillisFromLastRoll() |
long |
getMillisToNextPeriodicRoll() |
int |
getNumThreads() |
ProcedureStoreTracker |
getStoreTracker() |
ArrayList<WALProcedureStore.SyncMetrics> |
getSyncMetrics() |
(package private) org.apache.hadoop.fs.Path |
getWalArchiveDir() |
org.apache.hadoop.fs.Path |
getWALDir() |
private ProcedureWALFile |
initOldLog(org.apache.hadoop.fs.FileStatus logFile,
org.apache.hadoop.fs.Path walArchiveDir)
Loads given log file and it's tracker.
|
private long |
initOldLogs(org.apache.hadoop.fs.FileStatus[] logFiles)
Make sure that the file set are gotten by calling
getLogFiles() , where we will sort
the file set by log id. |
private void |
initTrackerFromOldLogs()
If last log's tracker is not null, use it as
storeTracker . |
void |
insert(Procedure<?>[] procs)
Serialize a set of new procedures.
|
void |
insert(Procedure<?> proc,
Procedure<?>[] subprocs)
When a procedure is submitted to the executor insert(proc, null) will be called.
|
private boolean |
isSyncAborted() |
void |
load(ProcedureStore.ProcedureLoader loader)
Load the Procedures in the store.
|
private void |
periodicRoll() |
(package private) void |
periodicRollForTesting() |
private long |
pushData(WALProcedureStore.PushType type,
ByteSlot slot,
long procId,
long[] subProcIds) |
void |
recoverLease()
Acquire the lease for the procedure store.
|
private void |
releaseSlot(ByteSlot slot) |
private void |
removeAllLogs(long lastLogId,
String why)
Remove all logs with logId <=
lastLogId . |
private void |
removeInactiveLogs() |
(package private) void |
removeInactiveLogsForTesting() |
private boolean |
removeLogFile(ProcedureWALFile log,
org.apache.hadoop.fs.Path walArchiveDir) |
private boolean |
rollWriter() |
(package private) boolean |
rollWriter(long logId) |
boolean |
rollWriterForTesting() |
private boolean |
rollWriterWithRetries() |
private void |
sendStopSignal() |
int |
setRunningProcedureCount(int count)
Set the number of procedure running.
|
void |
start(int numSlots)
Start/Open the procedure store
|
void |
stop(boolean abort)
Stop/Close the procedure store
|
private void |
syncLoop() |
private long |
syncSlots() |
protected long |
syncSlots(org.apache.hadoop.fs.FSDataOutputStream stream,
ByteSlot[] slots,
int offset,
int count) |
protected void |
syncStream(org.apache.hadoop.fs.FSDataOutputStream stream) |
private void |
tryCleanupLogsOnLoad() |
private boolean |
tryRollWriter() |
void |
update(Procedure<?> proc)
The specified procedure was executed,
and the new state should be written to the store.
|
private void |
updateStoreTracker(WALProcedureStore.PushType type,
long procId,
long[] subProcIds) |
isRunning, registerListener, sendAbortProcessSignal, sendForceUpdateSignal, sendPostSyncSignal, setRunning, unregisterListener
private static final org.slf4j.Logger LOG
public static final String LOG_PREFIX
public static final String MASTER_PROCEDURE_LOGDIR
public static final String WAL_COUNT_WARN_THRESHOLD_CONF_KEY
private static final int DEFAULT_WAL_COUNT_WARN_THRESHOLD
public static final String EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY
private static final boolean DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY
public static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY
private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL
public static final String WAIT_BEFORE_ROLL_CONF_KEY
private static final int DEFAULT_WAIT_BEFORE_ROLL
public static final String ROLL_RETRIES_CONF_KEY
private static final int DEFAULT_ROLL_RETRIES
public static final String MAX_SYNC_FAILURE_ROLL_CONF_KEY
private static final int DEFAULT_MAX_SYNC_FAILURE_ROLL
public static final String PERIODIC_ROLL_CONF_KEY
private static final int DEFAULT_PERIODIC_ROLL
public static final String SYNC_WAIT_MSEC_CONF_KEY
private static final int DEFAULT_SYNC_WAIT_MSEC
public static final String USE_HSYNC_CONF_KEY
private static final boolean DEFAULT_USE_HSYNC
public static final String ROLL_THRESHOLD_CONF_KEY
private static final long DEFAULT_ROLL_THRESHOLD
public static final String STORE_WAL_SYNC_STATS_COUNT
private static final int DEFAULT_SYNC_STATS_COUNT
private final LinkedList<ProcedureWALFile> logs
private final ProcedureStoreTracker holdingCleanupTracker
private final ProcedureStoreTracker storeTracker
private final ReentrantLock lock
private final WALProcedureStore.LeaseRecovery leaseRecovery
private final org.apache.hadoop.conf.Configuration conf
private final org.apache.hadoop.fs.FileSystem fs
private final org.apache.hadoop.fs.Path walDir
private final org.apache.hadoop.fs.Path walArchiveDir
private final boolean enforceStreamCapability
private final AtomicReference<Throwable> syncException
private final AtomicBoolean loading
private final AtomicBoolean inSync
private final AtomicLong totalSynced
private final AtomicLong lastRollTs
private final AtomicLong syncId
private LinkedTransferQueue<ByteSlot> slotsCache
private Set<ProcedureWALFile> corruptedLogs
private org.apache.hadoop.fs.FSDataOutputStream stream
private int runningProcCount
private long flushLogId
private int syncMaxSlot
private int slotIndex
private Thread syncThread
private int walCountWarnThreshold
private int maxRetriesBeforeRoll
private int maxSyncFailureRoll
private int waitBeforeRoll
private int rollRetries
private int periodicRollMsec
private long rollThreshold
private boolean useHsync
private int syncWaitMsec
private org.apache.hbase.thirdparty.org.apache.commons.collections4.queue.CircularFifoQueue<WALProcedureStore.SyncMetrics> syncMetricsQueue
private static final org.apache.hadoop.fs.PathFilter WALS_PATH_FILTER
private static final Comparator<org.apache.hadoop.fs.FileStatus> FILE_STATUS_ID_COMPARATOR
public WALProcedureStore(org.apache.hadoop.conf.Configuration conf, WALProcedureStore.LeaseRecovery leaseRecovery) throws IOException
IOException
public WALProcedureStore(org.apache.hadoop.conf.Configuration conf, org.apache.hadoop.fs.Path walDir, org.apache.hadoop.fs.Path walArchiveDir, WALProcedureStore.LeaseRecovery leaseRecovery) throws IOException
IOException
public void start(int numSlots) throws IOException
ProcedureStore
IOException
public void stop(boolean abort)
ProcedureStore
abort
- true if the stop is an abortprivate void sendStopSignal()
public int getNumThreads()
public int setRunningProcedureCount(int count)
ProcedureStore
count
).public ProcedureStoreTracker getStoreTracker()
public ArrayList<ProcedureWALFile> getActiveLogs()
public Set<ProcedureWALFile> getCorruptedLogs()
public void recoverLease() throws IOException
ProcedureStore
IOException
public void load(ProcedureStore.ProcedureLoader loader) throws IOException
ProcedureStore
loader
- the ProcedureLoader that will handle the store-load eventsIOException
private void tryCleanupLogsOnLoad()
public void insert(Procedure<?> proc, Procedure<?>[] subprocs)
ProcedureStore
proc
- the procedure to serialize and write to the store.subprocs
- the newly created child of the proc.public void insert(Procedure<?>[] procs)
ProcedureStore
procs
- the procedures to serialize and write to the store.public void update(Procedure<?> proc)
ProcedureStore
proc
- the procedure to serialize and write to the store.public void delete(long procId)
ProcedureStore
procId
- the ID of the procedure to remove.public void delete(Procedure<?> proc, long[] subProcIds)
ProcedureStore
proc
- the parent procedure to serialize and write to the store.subProcIds
- the IDs of the sub-procedure to remove.public void delete(long[] procIds, int offset, int count)
ProcedureStore
procIds
- the IDs of the procedures to remove.offset
- the array offset from where to start to deletecount
- the number of IDs to deleteprivate void delete(long[] procIds)
private ByteSlot acquireSlot()
private void releaseSlot(ByteSlot slot)
private long pushData(WALProcedureStore.PushType type, ByteSlot slot, long procId, long[] subProcIds)
private void updateStoreTracker(WALProcedureStore.PushType type, long procId, long[] subProcIds)
private boolean isSyncAborted()
public ArrayList<WALProcedureStore.SyncMetrics> getSyncMetrics()
protected long syncSlots(org.apache.hadoop.fs.FSDataOutputStream stream, ByteSlot[] slots, int offset, int count) throws IOException
IOException
protected void syncStream(org.apache.hadoop.fs.FSDataOutputStream stream) throws IOException
IOException
private boolean rollWriterWithRetries()
private boolean tryRollWriter()
public long getMillisToNextPeriodicRoll()
public long getMillisFromLastRoll()
void periodicRollForTesting() throws IOException
IOException
public boolean rollWriterForTesting() throws IOException
IOException
void removeInactiveLogsForTesting() throws Exception
Exception
private void periodicRoll() throws IOException
IOException
private boolean rollWriter() throws IOException
IOException
boolean rollWriter(long logId) throws IOException
IOException
private void closeCurrentLogStream(boolean abort)
private void removeInactiveLogs() throws IOException
IOException
private void buildHoldingCleanupTracker()
private void removeAllLogs(long lastLogId, String why)
lastLogId
.private boolean removeLogFile(ProcedureWALFile log, org.apache.hadoop.fs.Path walArchiveDir)
public org.apache.hadoop.fs.Path getWALDir()
org.apache.hadoop.fs.Path getWalArchiveDir()
public org.apache.hadoop.fs.FileSystem getFileSystem()
protected org.apache.hadoop.fs.Path getLogFilePath(long logId) throws IOException
IOException
private static long getLogIdFromName(String name)
private org.apache.hadoop.fs.FileStatus[] getLogFiles() throws IOException
IOException
private static long getMaxLogId(org.apache.hadoop.fs.FileStatus[] logFiles)
getLogFiles()
, where we will sort
the file set by log id.private long initOldLogs(org.apache.hadoop.fs.FileStatus[] logFiles) throws IOException
getLogFiles()
, where we will sort
the file set by log id.IOException
private void initTrackerFromOldLogs()
storeTracker
. Otherwise, set storeTracker
as partial, and let ProcedureWALFormatReader
rebuild it using entries in the log.private ProcedureWALFile initOldLog(org.apache.hadoop.fs.FileStatus logFile, org.apache.hadoop.fs.Path walArchiveDir) throws IOException
IOException
Copyright © 2007–2019 The Apache Software Foundation. All rights reserved.