@InterfaceAudience.Private public class ProcedureExecutor<TEnvironment> extends Object
Modifier and Type | Class and Description |
---|---|
private class |
ProcedureExecutor.KeepAliveWorkerThread |
static interface |
ProcedureExecutor.ProcedureExecutorListener |
static class |
ProcedureExecutor.Testing
Class with parameters describing how to fail/die when in testing-context.
|
private class |
ProcedureExecutor.WorkerMonitor |
private class |
ProcedureExecutor.WorkerThread |
Modifier and Type | Field and Description |
---|---|
private AtomicInteger |
activeExecutorCount |
static String |
CHECK_OWNER_SET_CONF_KEY |
private boolean |
checkOwnerSet |
private ConcurrentHashMap<Long,CompletedProcedureRetainer<TEnvironment>> |
completed
Map the procId returned by submitProcedure(), the Root-ProcID, to the Procedure.
|
private org.apache.hadoop.conf.Configuration |
conf |
private int |
corePoolSize |
(package private) static int |
DEFAULT_ACKED_EVICT_TTL |
private static boolean |
DEFAULT_CHECK_OWNER_SET |
(package private) static int |
DEFAULT_EVICT_TTL |
private static long |
DEFAULT_WORKER_KEEP_ALIVE_TIME |
private TEnvironment |
environment |
static String |
EVICT_ACKED_TTL_CONF_KEY |
static String |
EVICT_TTL_CONF_KEY |
private Executor |
forceUpdateExecutor |
private long |
keepAliveTime |
private AtomicLong |
lastProcId |
private CopyOnWriteArrayList<ProcedureExecutor.ProcedureExecutorListener> |
listeners |
private static org.slf4j.Logger |
LOG |
private int |
maxPoolSize |
private ConcurrentHashMap<NonceKey,Long> |
nonceKeysToProcIdsMap
Helper map to lookup whether the procedure already issued from the same client.
|
private ConcurrentHashMap<Long,Procedure<TEnvironment>> |
procedures
Helper map to lookup the live procedures by ID.
|
private IdLock |
procExecutionLock |
private ConcurrentHashMap<Long,RootProcedureState<TEnvironment>> |
rollbackStack
Map the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState.
|
private AtomicBoolean |
running |
private ProcedureScheduler |
scheduler
Scheduler/Queue that contains runnable procedures.
|
private ProcedureStore |
store |
(package private) ProcedureExecutor.Testing |
testing
testing is non-null when ProcedureExecutor is being tested. |
private ThreadGroup |
threadGroup
Created in the
init(int, boolean) method. |
private TimeoutExecutorThread<TEnvironment> |
timeoutExecutor
Created in the
init(int, boolean) method. |
static String |
WORKER_KEEP_ALIVE_TIME_CONF_KEY |
private AtomicLong |
workerId |
private TimeoutExecutorThread<TEnvironment> |
workerMonitorExecutor
WorkerMonitor checks for stuck workers and creates a new worker thread when necessary; for example, if
there is no worker available to assign meta, it will create a new worker thread for it, so it is very important.
|
private CopyOnWriteArrayList<ProcedureExecutor.WorkerThread> |
workerThreads
Created in the
init(int, boolean) method. |
Constructor and Description |
---|
ProcedureExecutor(org.apache.hadoop.conf.Configuration conf,
TEnvironment environment,
ProcedureStore store) |
ProcedureExecutor(org.apache.hadoop.conf.Configuration conf,
TEnvironment environment,
ProcedureStore store,
ProcedureScheduler scheduler) |
Modifier and Type | Method and Description |
---|---|
boolean |
abort(long procId)
Send an abort notification to the specified procedure.
|
boolean |
abort(long procId,
boolean mayInterruptIfRunning)
Send an abort notification to the specified procedure.
|
private Procedure.LockState |
acquireLock(Procedure<TEnvironment> proc) |
void |
addChore(ProcedureInMemoryChore<TEnvironment> chore)
Add a chore procedure to the executor
|
List<Boolean> |
bypassProcedure(List<Long> pids,
long lockWait,
boolean force,
boolean recursive)
Bypass a procedure.
|
(package private) boolean |
bypassProcedure(long pid,
long lockWait,
boolean override,
boolean recursive) |
private void |
cleanupAfterRollbackOneStep(Procedure<TEnvironment> proc) |
private void |
countDownChildren(RootProcedureState<TEnvironment> procStack,
Procedure<TEnvironment> procedure) |
NonceKey |
createNonceKey(long nonceGroup,
long nonce)
Create a NonceKey from the specified nonceGroup and nonce.
|
private void |
execCompletionCleanup(Procedure<TEnvironment> proc) |
private void |
execProcedure(RootProcedureState<TEnvironment> procStack,
Procedure<TEnvironment> procedure)
Executes
procedure
Calls the doExecute() of the procedure
If the procedure execution didn't fail (i.e. |
private void |
executeProcedure(Procedure<TEnvironment> proc) |
private Procedure.LockState |
executeRollback(long rootProcId,
RootProcedureState<TEnvironment> procStack)
Execute the rollback of the full procedure stack.
|
private Procedure.LockState |
executeRollback(Procedure<TEnvironment> proc)
Execute the rollback of the procedure step.
|
private void |
forceUpdateProcedure(long procId) |
int |
getActiveExecutorCount() |
Collection<Procedure<TEnvironment>> |
getActiveProceduresNoCopy()
Should only be used when starting up, where the procedure workers have not been started.
|
Set<Long> |
getActiveProcIds() |
(package private) int |
getCompletedSize() |
int |
getCorePoolSize() |
TEnvironment |
getEnvironment() |
long |
getKeepAliveTime(TimeUnit timeUnit) |
protected long |
getLastProcId() |
<T extends Procedure<TEnvironment>> |
getProcedure(Class<T> clazz,
long procId) |
Procedure<TEnvironment> |
getProcedure(long procId) |
List<Procedure<TEnvironment>> |
getProcedures()
Get procedures.
|
(package private) ProcedureScheduler |
getProcedureScheduler() |
IdLock |
getProcExecutionLock() |
(package private) RootProcedureState<TEnvironment> |
getProcStack(long rootProcId) |
Procedure<TEnvironment> |
getResult(long procId) |
Procedure<TEnvironment> |
getResultOrProcedure(long procId) |
(package private) Long |
getRootProcedureId(Procedure<TEnvironment> proc) |
(package private) ProcedureScheduler |
getScheduler() |
ProcedureStore |
getStore() |
int |
getWorkerThreadCount() |
private void |
handleInterruptedException(Procedure<TEnvironment> proc,
InterruptedException e) |
void |
init(int numThreads,
boolean abortOnCorruption)
Initialize the procedure executor, but do not start workers.
|
private Procedure<TEnvironment>[] |
initializeChildren(RootProcedureState<TEnvironment> procStack,
Procedure<TEnvironment> procedure,
Procedure<TEnvironment>[] subprocs) |
boolean |
isFinished(long procId)
Return true if the procedure is finished.
|
boolean |
isProcedureOwner(long procId,
User user)
Check if the user is this procedure's owner
|
private boolean |
isRootFinished(Procedure<?> proc) |
boolean |
isRunning() |
boolean |
isStarted(long procId)
Return true if the procedure is started.
|
void |
join() |
private void |
kill(String msg) |
private void |
load(boolean abortOnCorruption) |
private void |
loadProcedures(ProcedureStore.ProcedureIterator procIter,
boolean abortOnCorruption) |
private long |
nextProcId() |
private Procedure<TEnvironment> |
prepareProcedure(Procedure<TEnvironment> proc) |
private void |
procedureFinished(Procedure<TEnvironment> proc) |
private long |
pushProcedure(Procedure<TEnvironment> proc) |
void |
refreshConfiguration(org.apache.hadoop.conf.Configuration conf) |
void |
registerListener(ProcedureExecutor.ProcedureExecutorListener listener) |
long |
registerNonce(NonceKey nonceKey)
Register a nonce for a procedure that is going to be submitted.
|
private void |
releaseLock(Procedure<TEnvironment> proc,
boolean force) |
boolean |
removeChore(ProcedureInMemoryChore<TEnvironment> chore)
Remove a chore procedure from the executor
|
void |
removeResult(long procId)
Mark the specified completed procedure as ready to remove.
|
private void |
restoreLock(Procedure<TEnvironment> proc,
Set<Long> restored) |
private void |
restoreLocks() |
private void |
restoreLocks(Deque<Procedure<TEnvironment>> stack,
Set<Long> restored) |
private void |
sendProcedureAddedNotification(long procId) |
private void |
sendProcedureFinishedNotification(long procId) |
private void |
sendProcedureLoadedNotification(long procId) |
void |
setFailureResultForNonce(NonceKey nonceKey,
String procName,
User procOwner,
IOException exception)
If the operation failed before the procedure was submitted, we may want to give back the
same error to the requests with the same nonceKey.
|
void |
setKeepAliveTime(long keepAliveTime,
TimeUnit timeUnit) |
void |
startWorkers()
Start the workers.
|
void |
stop() |
private void |
submitChildrenProcedures(Procedure<TEnvironment>[] subprocs) |
long |
submitProcedure(Procedure<TEnvironment> proc)
Add a new root-procedure to the executor.
|
long |
submitProcedure(Procedure<TEnvironment> proc,
NonceKey nonceKey)
Add a new root-procedure to the executor.
|
void |
submitProcedures(Procedure<TEnvironment>[] procs)
Add a set of new root-procedures to the executor.
|
boolean |
unregisterListener(ProcedureExecutor.ProcedureExecutorListener listener) |
void |
unregisterNonceIfProcedureWasNotSubmitted(NonceKey nonceKey)
Remove the NonceKey if the procedure was not submitted to the executor.
|
private void |
updateStoreOnExec(RootProcedureState<TEnvironment> procStack,
Procedure<TEnvironment> procedure,
Procedure<TEnvironment>[] subprocs) |
private void |
yieldProcedure(Procedure<TEnvironment> proc) |
private static final org.slf4j.Logger LOG
public static final String CHECK_OWNER_SET_CONF_KEY
private static final boolean DEFAULT_CHECK_OWNER_SET
public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY
private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME
public static final String EVICT_TTL_CONF_KEY
static final int DEFAULT_EVICT_TTL
public static final String EVICT_ACKED_TTL_CONF_KEY
static final int DEFAULT_ACKED_EVICT_TTL
volatile ProcedureExecutor.Testing testing
testing
is non-null when ProcedureExecutor is being tested. Tests will try to
break PE having it fail at various junctures. When non-null, testing is set to an instance of
the below internal ProcedureExecutor.Testing
class with flags set for the particular test.
private final ConcurrentHashMap<Long,CompletedProcedureRetainer<TEnvironment>> completed
private final ConcurrentHashMap<Long,RootProcedureState<TEnvironment>> rollbackStack
private final ConcurrentHashMap<Long,Procedure<TEnvironment>> procedures
private final ConcurrentHashMap<NonceKey,Long> nonceKeysToProcIdsMap
private final CopyOnWriteArrayList<ProcedureExecutor.ProcedureExecutorListener> listeners
private org.apache.hadoop.conf.Configuration conf
private ThreadGroup threadGroup
Created in the init(int, boolean)
method. Destroyed in join()
(FIX! Doing
resource handling rather than observing in a #join is unexpected).
Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
(Should be ok).
private CopyOnWriteArrayList<ProcedureExecutor.WorkerThread> workerThreads
Created in the init(int, boolean)
method. Terminated in join()
(FIX! Doing
resource handling rather than observing in a #join is unexpected).
Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
(Should be ok).
private TimeoutExecutorThread<TEnvironment> timeoutExecutor
Created in the init(int, boolean)
method. Terminated in join()
(FIX! Doing
resource handling rather than observing in a #join is unexpected).
Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
(Should be ok).
private TimeoutExecutorThread<TEnvironment> workerMonitorExecutor
private int corePoolSize
private int maxPoolSize
private volatile long keepAliveTime
private final ProcedureScheduler scheduler
private final Executor forceUpdateExecutor
private final AtomicLong lastProcId
private final AtomicLong workerId
private final AtomicInteger activeExecutorCount
private final AtomicBoolean running
private final TEnvironment environment
private final ProcedureStore store
private final boolean checkOwnerSet
private final IdLock procExecutionLock
public ProcedureExecutor(org.apache.hadoop.conf.Configuration conf, TEnvironment environment, ProcedureStore store)
public ProcedureExecutor(org.apache.hadoop.conf.Configuration conf, TEnvironment environment, ProcedureStore store, ProcedureScheduler scheduler)
private boolean isRootFinished(Procedure<?> proc)
private void forceUpdateProcedure(long procId) throws IOException
IOException
private void load(boolean abortOnCorruption) throws IOException
IOException
private void restoreLock(Procedure<TEnvironment> proc, Set<Long> restored)
private void restoreLocks(Deque<Procedure<TEnvironment>> stack, Set<Long> restored)
private void restoreLocks()
private void loadProcedures(ProcedureStore.ProcedureIterator procIter, boolean abortOnCorruption) throws IOException
IOException
public void init(int numThreads, boolean abortOnCorruption) throws IOException
numThreads
- number of threads available for procedure execution.
abortOnCorruption
- true if you want to abort your service in case a corrupted procedure
is found on replay; otherwise false.
IOException
public void startWorkers() throws IOException
IOException
public void stop()
public void join()
public void refreshConfiguration(org.apache.hadoop.conf.Configuration conf)
public boolean isRunning()
public int getWorkerThreadCount()
public int getCorePoolSize()
public int getActiveExecutorCount()
public TEnvironment getEnvironment()
public ProcedureStore getStore()
ProcedureScheduler getScheduler()
public void setKeepAliveTime(long keepAliveTime, TimeUnit timeUnit)
public long getKeepAliveTime(TimeUnit timeUnit)
public void addChore(@Nullable ProcedureInMemoryChore<TEnvironment> chore)
chore
- the chore to add
public boolean removeChore(@Nullable ProcedureInMemoryChore<TEnvironment> chore)
chore
- the chore to remove
public NonceKey createNonceKey(long nonceGroup, long nonce)
public long registerNonce(NonceKey nonceKey)
nonceKey
- A unique identifier for this operation from the client or process.
public void unregisterNonceIfProcedureWasNotSubmitted(NonceKey nonceKey)
nonceKey
- A unique identifier for this operation from the client or process.
public void setFailureResultForNonce(NonceKey nonceKey, String procName, User procOwner, IOException exception)
nonceKey
- A unique identifier for this operation from the client or process.
procName
- name of the procedure, used to inform the user.
procOwner
- name of the owner of the procedure, used to inform the user.
exception
- the failure to report to the user.
public long submitProcedure(Procedure<TEnvironment> proc)
proc
- the new procedure to execute.
public List<Boolean> bypassProcedure(List<Long> pids, long lockWait, boolean force, boolean recursive) throws IOException
A procedure can be bypassed only if: 1. the procedure is in the RUNNABLE, WAITING or WAITING_TIMEOUT state, or it is a root procedure without any children; 2. no other worker thread is executing it; 3. no child procedure has been submitted.
If all the requirements are met, the procedure and its ancestors will be bypassed and persisted to WAL.
If the procedure is in the WAITING state, it will be set to RUNNABLE and added to the run queue. TODO: What about WAITING_TIMEOUT?
pids
- the procedure ids
lockWait
- time to wait for the lock
force
- if force is set to true, we will bypass the procedure even if it is executing.
This is for procedures which can't break out during execution (due to a bug, mostly).
In this case, bypassing the procedure is not enough, since it is already stuck
there. We need to restart the master after bypassing, and let the problematic
procedure execute with bypass=true, so that in that condition the procedure can be
successfully bypassed.
recursive
- We will do an expensive search for children of each pid. EXPENSIVE!
IOException
- IOException
boolean bypassProcedure(long pid, long lockWait, boolean override, boolean recursive) throws IOException
IOException
public long submitProcedure(Procedure<TEnvironment> proc, NonceKey nonceKey)
proc
- the new procedure to execute.
nonceKey
- the registered unique identifier for this operation from the client or process.
public void submitProcedures(Procedure<TEnvironment>[] procs)
procs
- the new procedures to execute.
private Procedure<TEnvironment> prepareProcedure(Procedure<TEnvironment> proc)
private long pushProcedure(Procedure<TEnvironment> proc)
public boolean abort(long procId)
procId
- the procedure to abort
public boolean abort(long procId, boolean mayInterruptIfRunning)
procId
- the procedure to abort
mayInterruptIfRunning
- if the proc completed at least one step, should it be aborted?
public Procedure<TEnvironment> getProcedure(long procId)
public <T extends Procedure<TEnvironment>> T getProcedure(Class<T> clazz, long procId)
public Procedure<TEnvironment> getResult(long procId)
public boolean isFinished(long procId)
procId
- the ID of the procedure to check
public boolean isStarted(long procId)
procId
- the ID of the procedure to check
public void removeResult(long procId)
procId
- the ID of the procedure to remove
public Procedure<TEnvironment> getResultOrProcedure(long procId)
public boolean isProcedureOwner(long procId, User user)
procId
- the target procedure
user
- the user
public Collection<Procedure<TEnvironment>> getActiveProceduresNoCopy()
Should only be used when starting up, where the procedure workers have not been started.
Otherwise, use getProcedures() below for most cases, as
it will do a copy and also include the finished procedures.
public List<Procedure<TEnvironment>> getProcedures()
public void registerListener(ProcedureExecutor.ProcedureExecutorListener listener)
public boolean unregisterListener(ProcedureExecutor.ProcedureExecutorListener listener)
private void sendProcedureLoadedNotification(long procId)
private void sendProcedureAddedNotification(long procId)
private void sendProcedureFinishedNotification(long procId)
private long nextProcId()
protected long getLastProcId()
public Set<Long> getActiveProcIds()
Long getRootProcedureId(Procedure<TEnvironment> proc)
private void executeProcedure(Procedure<TEnvironment> proc)
private Procedure.LockState acquireLock(Procedure<TEnvironment> proc)
private void releaseLock(Procedure<TEnvironment> proc, boolean force)
private Procedure.LockState executeRollback(long rootProcId, RootProcedureState<TEnvironment> procStack)
private void cleanupAfterRollbackOneStep(Procedure<TEnvironment> proc)
private Procedure.LockState executeRollback(Procedure<TEnvironment> proc)
private void yieldProcedure(Procedure<TEnvironment> proc)
private void execProcedure(RootProcedureState<TEnvironment> procStack, Procedure<TEnvironment> procedure)
procedure
private Procedure<TEnvironment>[] initializeChildren(RootProcedureState<TEnvironment> procStack, Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs)
private void submitChildrenProcedures(Procedure<TEnvironment>[] subprocs)
private void countDownChildren(RootProcedureState<TEnvironment> procStack, Procedure<TEnvironment> procedure)
private void updateStoreOnExec(RootProcedureState<TEnvironment> procStack, Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs)
private void handleInterruptedException(Procedure<TEnvironment> proc, InterruptedException e)
private void execCompletionCleanup(Procedure<TEnvironment> proc)
private void procedureFinished(Procedure<TEnvironment> proc)
RootProcedureState<TEnvironment> getProcStack(long rootProcId)
ProcedureScheduler getProcedureScheduler()
int getCompletedSize()
public IdLock getProcExecutionLock()
Copyright © 2007–2020 The Apache Software Foundation. All rights reserved.