@InterfaceAudience.Private public abstract class RpcExecutor extends Object
dispatch(CallRunner)
. Subclass and add particular
scheduling behavior.Modifier and Type | Class and Description |
---|---|
private static class |
RpcExecutor.CallPriorityComparator
Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true.
|
protected class |
RpcExecutor.Handler
Handler thread run the
CallRunner.run() in. |
static class |
RpcExecutor.QueueBalancer |
private static class |
RpcExecutor.RandomQueueBalancer
Queue balancer that just randomly selects a queue in the range [0, num queues).
|
Constructor and Description |
---|
RpcExecutor(String name,
int handlerCount,
int maxQueueLength,
PriorityFunction priority,
org.apache.hadoop.conf.Configuration conf,
Abortable abortable) |
RpcExecutor(String name,
int handlerCount,
String callQueueType,
int maxQueueLength,
PriorityFunction priority,
org.apache.hadoop.conf.Configuration conf,
Abortable abortable) |
Modifier and Type | Method and Description |
---|---|
protected int |
computeNumCallQueues(int handlerCount,
float callQueuesHandlersFactor) |
abstract boolean |
dispatch(CallRunner callTask)
Add the request to the executor queue
|
int |
getActiveHandlerCount() |
int |
getActiveReadHandlerCount() |
int |
getActiveScanHandlerCount() |
int |
getActiveWriteHandlerCount() |
static RpcExecutor.QueueBalancer |
getBalancer(int queueSize) |
Map<String,Long> |
getCallQueueCountsSummary() |
Map<String,Long> |
getCallQueueSizeSummary() |
protected RpcExecutor.Handler |
getHandler(String name,
double handlerFailureThreshhold,
BlockingQueue<CallRunner> q,
AtomicInteger activeHandlerCount)
Override if providing alternate Handler implementation.
|
String |
getName() |
long |
getNumGeneralCallsDropped() |
long |
getNumLifoModeSwitches() |
int |
getQueueLength()
Returns the length of the pending queue
|
protected List<BlockingQueue<CallRunner>> |
getQueues()
Returns the list of request queues
|
int |
getReadQueueLength() |
int |
getScanQueueLength() |
int |
getWriteQueueLength() |
protected void |
initializeQueues(int numQueues) |
static boolean |
isCodelQueueType(String callQueueType) |
static boolean |
isDeadlineQueueType(String callQueueType) |
static boolean |
isFifoQueueType(String callQueueType) |
void |
onConfigurationChange(org.apache.hadoop.conf.Configuration conf) |
void |
resizeQueues(org.apache.hadoop.conf.Configuration conf)
Update current soft limit for executor's call queues
|
void |
start(int port) |
protected void |
startHandlers(int port) |
protected void |
startHandlers(String nameSuffix,
int numHandlers,
List<BlockingQueue<CallRunner>> callQueues,
int qindex,
int qsize,
int port,
AtomicInteger activeHandlerCount)
Start up our handlers.
|
void |
stop() |
private static final org.slf4j.Logger LOG
protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT
public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY
public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY
public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE
public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE
public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE
public static final String CALL_QUEUE_TYPE_CONF_KEY
public static final String CALL_QUEUE_TYPE_CONF_DEFAULT
public static final String CALL_QUEUE_CODEL_TARGET_DELAY
public static final String CALL_QUEUE_CODEL_INTERVAL
public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD
public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY
public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL
public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD
private LongAdder numGeneralCallsDropped
private LongAdder numLifoModeSwitches
protected final int numCallQueues
protected final List<BlockingQueue<CallRunner>> queues
private final Class<? extends BlockingQueue> queueClass
private final Object[] queueInitArgs
private final PriorityFunction priority
protected volatile int currentQueueLimit
private final AtomicInteger activeHandlerCount
private final List<RpcExecutor.Handler> handlers
private final int handlerCount
private final AtomicInteger failedHandlerCount
private boolean running
private org.apache.hadoop.conf.Configuration conf
private static RpcExecutor.QueueBalancer ONE_QUEUE
public RpcExecutor(String name, int handlerCount, int maxQueueLength, PriorityFunction priority, org.apache.hadoop.conf.Configuration conf, Abortable abortable)
public RpcExecutor(String name, int handlerCount, String callQueueType, int maxQueueLength, PriorityFunction priority, org.apache.hadoop.conf.Configuration conf, Abortable abortable)
protected int computeNumCallQueues(int handlerCount, float callQueuesHandlersFactor)
public Map<String,Long> getCallQueueCountsSummary()
public Map<String,Long> getCallQueueSizeSummary()
protected void initializeQueues(int numQueues)
public void start(int port)
public void stop()
public abstract boolean dispatch(CallRunner callTask) throws InterruptedException
InterruptedException
protected List<BlockingQueue<CallRunner>> getQueues()
protected void startHandlers(int port)
protected RpcExecutor.Handler getHandler(String name, double handlerFailureThreshhold, BlockingQueue<CallRunner> q, AtomicInteger activeHandlerCount)
protected void startHandlers(String nameSuffix, int numHandlers, List<BlockingQueue<CallRunner>> callQueues, int qindex, int qsize, int port, AtomicInteger activeHandlerCount)
public static RpcExecutor.QueueBalancer getBalancer(int queueSize)
public static boolean isDeadlineQueueType(String callQueueType)
public static boolean isCodelQueueType(String callQueueType)
public static boolean isFifoQueueType(String callQueueType)
public long getNumGeneralCallsDropped()
public long getNumLifoModeSwitches()
public int getActiveHandlerCount()
public int getActiveWriteHandlerCount()
public int getActiveReadHandlerCount()
public int getActiveScanHandlerCount()
public int getQueueLength()
public int getReadQueueLength()
public int getScanQueueLength()
public int getWriteQueueLength()
public void resizeQueues(org.apache.hadoop.conf.Configuration conf)
conf
- updated configurationpublic void onConfigurationChange(org.apache.hadoop.conf.Configuration conf)
Copyright © 2007–2020 The Apache Software Foundation. All rights reserved.