@InterfaceAudience.Private @InterfaceStability.Evolving public class WALProcedureStore extends ProcedureStoreBase
Modifier and Type | Class and Description |
---|---|
static interface |
WALProcedureStore.LeaseRecovery |
private static class |
WALProcedureStore.PushType |
ProcedureStore.ProcedureIterator, ProcedureStore.ProcedureLoader, ProcedureStore.ProcedureStoreListener
Constructor and Description |
---|
WALProcedureStore(org.apache.hadoop.conf.Configuration conf,
org.apache.hadoop.fs.FileSystem fs,
org.apache.hadoop.fs.Path logDir,
WALProcedureStore.LeaseRecovery leaseRecovery) |
Modifier and Type | Method and Description |
---|---|
private ByteSlot |
acquireSlot() |
private void |
closeStream() |
void |
delete(long procId)
The specified procId was removed from the executor,
due to completion, abort or failure.
|
ArrayList<ProcedureWALFile> |
getActiveLogs() |
Set<ProcedureWALFile> |
getCorruptedLogs() |
org.apache.hadoop.fs.FileSystem |
getFileSystem() |
org.apache.hadoop.fs.Path |
getLogDir() |
protected org.apache.hadoop.fs.Path |
getLogFilePath(long logId) |
private org.apache.hadoop.fs.FileStatus[] |
getLogFiles() |
private static long |
getLogIdFromName(String name) |
private static long |
getMaxLogId(org.apache.hadoop.fs.FileStatus[] logFiles) |
private long |
getMillisFromLastRoll() |
private long |
getMillisToNextPeriodicRoll() |
int |
getNumThreads() |
ProcedureStoreTracker |
getStoreTracker() |
private ProcedureWALFile |
initOldLog(org.apache.hadoop.fs.FileStatus logFile) |
private long |
initOldLogs(org.apache.hadoop.fs.FileStatus[] logFiles) |
private void |
initTrackerFromOldLogs() |
void |
insert(Procedure proc,
Procedure[] subprocs)
When a procedure is submitted to the executor, insert(proc, null) will be called.
|
private boolean |
isSyncAborted() |
void |
load(ProcedureStore.ProcedureLoader loader)
Load the Procedures in the store.
|
private void |
periodicRoll() |
protected void |
periodicRollForTesting() |
private long |
pushData(WALProcedureStore.PushType type,
ByteSlot slot,
long procId,
long[] subProcIds) |
void |
recoverLease()
Acquire the lease for the procedure store.
|
private void |
releaseSlot(ByteSlot slot) |
private void |
removeAllLogs(long lastLogId) |
private void |
removeInactiveLogs() |
private boolean |
removeLogFile(ProcedureWALFile log) |
private boolean |
rollWriter() |
private boolean |
rollWriter(long logId) |
protected boolean |
rollWriterForTesting() |
private boolean |
rollWriterOrDie() |
private void |
sendStopSignal() |
void |
start(int numSlots)
Start/Open the procedure store
|
void |
stop(boolean abort)
Stop/Close the procedure store
|
private void |
syncLoop() |
private long |
syncSlots() |
protected long |
syncSlots(org.apache.hadoop.fs.FSDataOutputStream stream,
ByteSlot[] slots,
int offset,
int count) |
private boolean |
tryRollWriter() |
void |
update(Procedure proc)
The specified procedure was executed,
and the new state should be written to the store.
|
private void |
updateStoreTracker(WALProcedureStore.PushType type,
long procId,
long[] subProcIds) |
isRunning, registerListener, sendAbortProcessSignal, sendPostSyncSignal, setRunning, unregisterListener
private static final org.apache.commons.logging.Log LOG
public static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY
private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL
public static final String WAIT_BEFORE_ROLL_CONF_KEY
private static final int DEFAULT_WAIT_BEFORE_ROLL
public static final String ROLL_RETRIES_CONF_KEY
private static final int DEFAULT_ROLL_RETRIES
public static final String MAX_SYNC_FAILURE_ROLL_CONF_KEY
private static final int DEFAULT_MAX_SYNC_FAILURE_ROLL
public static final String PERIODIC_ROLL_CONF_KEY
private static final int DEFAULT_PERIODIC_ROLL
public static final String SYNC_WAIT_MSEC_CONF_KEY
private static final int DEFAULT_SYNC_WAIT_MSEC
public static final String USE_HSYNC_CONF_KEY
private static final boolean DEFAULT_USE_HSYNC
public static final String ROLL_THRESHOLD_CONF_KEY
private static final long DEFAULT_ROLL_THRESHOLD
private final LinkedList<ProcedureWALFile> logs
private final ProcedureStoreTracker storeTracker
private final ReentrantLock lock
private final Condition waitCond
private final Condition slotCond
private final Condition syncCond
private final WALProcedureStore.LeaseRecovery leaseRecovery
private final org.apache.hadoop.conf.Configuration conf
private final org.apache.hadoop.fs.FileSystem fs
private final org.apache.hadoop.fs.Path logDir
private final AtomicReference<Throwable> syncException
private final AtomicBoolean loading
private final AtomicBoolean inSync
private final AtomicLong totalSynced
private final AtomicLong lastRollTs
private LinkedTransferQueue<ByteSlot> slotsCache
private Set<ProcedureWALFile> corruptedLogs
private org.apache.hadoop.fs.FSDataOutputStream stream
private long flushLogId
private int slotIndex
private Thread syncThread
private ByteSlot[] slots
private int maxRetriesBeforeRoll
private int maxSyncFailureRoll
private int waitBeforeRoll
private int rollRetries
private int periodicRollMsec
private long rollThreshold
private boolean useHsync
private int syncWaitMsec
private static final org.apache.hadoop.fs.PathFilter WALS_PATH_FILTER
private static final Comparator<org.apache.hadoop.fs.FileStatus> FILE_STATUS_ID_COMPARATOR
public WALProcedureStore(org.apache.hadoop.conf.Configuration conf, org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path logDir, WALProcedureStore.LeaseRecovery leaseRecovery)
public void start(int numSlots) throws IOException
ProcedureStore
IOException
public void stop(boolean abort)
ProcedureStore
abort
- true if the stop is an abort
private void sendStopSignal()
public int getNumThreads()
public ProcedureStoreTracker getStoreTracker()
public ArrayList<ProcedureWALFile> getActiveLogs()
public Set<ProcedureWALFile> getCorruptedLogs()
public void recoverLease() throws IOException
ProcedureStore
IOException
public void load(ProcedureStore.ProcedureLoader loader) throws IOException
ProcedureStore
loader
- the ProcedureLoader that will handle the store-load events
IOException
public void insert(Procedure proc, Procedure[] subprocs)
ProcedureStore
proc
- the procedure to serialize and write to the store.
subprocs
- the newly created child of the proc.
public void update(Procedure proc)
ProcedureStore
proc
- the procedure to serialize and write to the store.
public void delete(long procId)
ProcedureStore
procId
- the ID of the procedure to remove.
private ByteSlot acquireSlot()
private void releaseSlot(ByteSlot slot)
private long pushData(WALProcedureStore.PushType type, ByteSlot slot, long procId, long[] subProcIds)
private void updateStoreTracker(WALProcedureStore.PushType type, long procId, long[] subProcIds)
private boolean isSyncAborted()
protected long syncSlots(org.apache.hadoop.fs.FSDataOutputStream stream, ByteSlot[] slots, int offset, int count) throws IOException
IOException
private boolean rollWriterOrDie()
private boolean tryRollWriter()
private long getMillisToNextPeriodicRoll()
private long getMillisFromLastRoll()
protected void periodicRollForTesting() throws IOException
IOException
protected boolean rollWriterForTesting() throws IOException
IOException
private void periodicRoll() throws IOException
IOException
private boolean rollWriter() throws IOException
IOException
private boolean rollWriter(long logId) throws IOException
IOException
private void closeStream()
private void removeInactiveLogs()
private void removeAllLogs(long lastLogId)
private boolean removeLogFile(ProcedureWALFile log)
public org.apache.hadoop.fs.Path getLogDir()
public org.apache.hadoop.fs.FileSystem getFileSystem()
protected org.apache.hadoop.fs.Path getLogFilePath(long logId) throws IOException
IOException
private static long getLogIdFromName(String name)
private org.apache.hadoop.fs.FileStatus[] getLogFiles() throws IOException
IOException
private static long getMaxLogId(org.apache.hadoop.fs.FileStatus[] logFiles)
private long initOldLogs(org.apache.hadoop.fs.FileStatus[] logFiles) throws IOException
IOException
private void initTrackerFromOldLogs()
private ProcedureWALFile initOldLog(org.apache.hadoop.fs.FileStatus logFile) throws IOException
IOException
Copyright © 2007–2019 The Apache Software Foundation. All rights reserved.