/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;

/**
 * Runs the CallRunners passed here via {@link #dispatch(CallRunner)}. Subclass and add particular
 * scheduling behavior.
 * <p>
 * The constructor only selects the queue implementation and its constructor arguments; the queues
 * themselves are created by {@link #initializeQueues(int)} and the handler threads by
 * {@link #start(int)}.
 */
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
@InterfaceStability.Evolving
public abstract class RpcExecutor {
  private static final Logger LOG = LoggerFactory.getLogger(RpcExecutor.class);

  /** Lower bound for the hard limit (capacity argument) handed to every queue. */
  protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250;
  /** Handler factor used when {@link #CALL_QUEUE_HANDLER_FACTOR_CONF_KEY} is not configured. */
  protected static final float DEFAULT_CALL_QUEUE_HANDLER_FACTOR = 0.1f;
  /**
   * Sentinel for "no max queue length supplied"; the soft limit is then derived from the number of
   * handlers per queue.
   */
  protected static final int UNDEFINED_MAX_CALLQUEUE_LENGTH = -1;
  /** Ratio, expected in [0.0, 1.0], of call queues to handlers. */
  public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
    "hbase.ipc.server.callqueue.handler.factor";

  /** max delay in msec used to bound the de-prioritized requests */
  public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY =
    "hbase.ipc.server.queue.max.call.delay";

  /**
   * The default, 'fifo', has the least friction but is dumb. If set to 'deadline', uses a priority
   * queue and de-prioritizes long-running scans. Sorting by priority comes at a cost, reduced
   * throughput.
   */
  public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
  public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
  public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
  public static final String CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE = "pluggable";
  public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
  public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE;

  public static final String CALL_QUEUE_QUEUE_BALANCER_CLASS =
    "hbase.ipc.server.callqueue.balancer.class";
  public static final Class<?> CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT = RandomQueueBalancer.class;

  // These 3 are only used by Codel executor
  public static final String CALL_QUEUE_CODEL_TARGET_DELAY =
    "hbase.ipc.server.callqueue.codel.target.delay";
  public static final String CALL_QUEUE_CODEL_INTERVAL =
    "hbase.ipc.server.callqueue.codel.interval";
  public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD =
    "hbase.ipc.server.callqueue.codel.lifo.threshold";

  public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 5;
  public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
  public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8;

  /** Fully qualified class name of the queue to load when the queue type is 'pluggable'. */
  public static final String PLUGGABLE_CALL_QUEUE_CLASS_NAME =
    "hbase.ipc.server.callqueue.pluggable.queue.class.name";
  public static final String PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED =
    "hbase.ipc.server.callqueue.pluggable.queue.fast.path.enabled";

  // Counters handed to the CoDel queues as constructor arguments and exposed through the getters.
  private final LongAdder numGeneralCallsDropped = new LongAdder();
  private final LongAdder numLifoModeSwitches = new LongAdder();

  protected final int numCallQueues;
  protected final List<BlockingQueue<CallRunner>> queues;
  private final Class<? extends BlockingQueue> queueClass;
  private final Object[] queueInitArgs;

  // this is soft limit of the queue, not size/capacity.
  protected volatile int currentQueueLimit;
  // While initializing we will use hard limit as the capacity of queue, it will let us dynamically
  // change the queue limit
  protected final int queueHardLimit;

  private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
  private final List<RpcHandler> handlers;
  private final int handlerCount;
  private final AtomicInteger failedHandlerCount = new AtomicInteger(0);

  // Not final: the constructor appends a queue-type suffix (".Deadline", ".Codel", ".Fifo").
  private String name;

  private final Configuration conf;
  private final Abortable abortable;

  /**
   * Creates an executor whose queue type is read from {@link #CALL_QUEUE_TYPE_CONF_KEY}, falling
   * back to {@link #CALL_QUEUE_TYPE_CONF_DEFAULT}.
   * @param name           executor name, used as a prefix for handler thread names
   * @param handlerCount   requested number of handler threads
   * @param maxQueueLength soft limit per queue, or {@link #UNDEFINED_MAX_CALLQUEUE_LENGTH}
   * @param priority       priority function handed to deadline and pluggable queues
   * @param conf           configuration to read tunables from
   * @param abortable      passed on to every handler created by this executor
   */
  public RpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
    final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
    this(name, handlerCount, conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_CONF_DEFAULT),
      maxQueueLength, priority, conf, abortable);
  }

  /**
   * Creates an executor for an explicit queue type.
   * @param name           executor name ({@code null} is treated as the empty string); a suffix
   *                       naming the queue type is appended for deadline, codel and fifo queues
   * @param handlerCount   requested number of handler threads; raised to the number of call queues
   *                       if it is smaller
   * @param callQueueType  one of the {@code CALL_QUEUE_TYPE_*_CONF_VALUE} constants; any other
   *                       value, including {@code null}, selects the fifo queue
   * @param maxQueueLength soft limit per queue, or {@link #UNDEFINED_MAX_CALLQUEUE_LENGTH} to
   *                       derive it from the number of handlers per queue
   * @param priority       priority function handed to deadline and pluggable queues
   * @param conf           configuration to read tunables from
   * @param abortable      passed on to every handler created by this executor
   * @throws PluggableRpcQueueNotFound if the pluggable type is selected but its queue class cannot
   *                                   be loaded
   */
  public RpcExecutor(final String name, final int handlerCount, final String callQueueType,
    int maxQueueLength, final PriorityFunction priority, final Configuration conf,
    final Abortable abortable) {
    this.name = Strings.nullToEmpty(name);
    this.conf = conf;
    this.abortable = abortable;

    float callQueuesHandlersFactor = getCallQueueHandlerFactor(conf);
    if (
      Float.compare(callQueuesHandlersFactor, 1.0f) > 0
        || Float.compare(0.0f, callQueuesHandlersFactor) > 0
    ) {
      LOG.warn(
        CALL_QUEUE_HANDLER_FACTOR_CONF_KEY + " is *ILLEGAL*, it should be in range [0.0, 1.0]");
      // For callQueuesHandlersFactor > 1.0, we just set it 1.0f.
      if (Float.compare(callQueuesHandlersFactor, 1.0f) > 0) {
        LOG.warn("Set " + CALL_QUEUE_HANDLER_FACTOR_CONF_KEY + " 1.0f");
        callQueuesHandlersFactor = 1.0f;
      } else {
        // But for callQueuesHandlersFactor < 0.0, following method #computeNumCallQueues
        // will compute max(1, -x) => 1 which has same effect of default value.
        LOG.warn("Set " + CALL_QUEUE_HANDLER_FACTOR_CONF_KEY + " default value 0.0f");
      }
    }
    this.numCallQueues = computeNumCallQueues(handlerCount, callQueuesHandlersFactor);
    this.queues = new ArrayList<>(this.numCallQueues);

    // Never run with fewer handlers than queues.
    this.handlerCount = Math.max(handlerCount, this.numCallQueues);
    this.handlers = new ArrayList<>(this.handlerCount);

    // If soft limit of queue is not provided, then calculate using
    // DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER
    if (maxQueueLength == UNDEFINED_MAX_CALLQUEUE_LENGTH) {
      int handlerCountPerQueue = this.handlerCount / this.numCallQueues;
      maxQueueLength = handlerCountPerQueue * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER;
    }
    currentQueueLimit = maxQueueLength;
    queueHardLimit = Math.max(maxQueueLength, DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);

    if (isDeadlineQueueType(callQueueType)) {
      this.name += ".Deadline";
      this.queueInitArgs =
        new Object[] { queueHardLimit, new CallPriorityComparator(conf, priority) };
      this.queueClass = BoundedPriorityBlockingQueue.class;
    } else if (isCodelQueueType(callQueueType)) {
      this.name += ".Codel";
      int codelTargetDelay =
        conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY, CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
      int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
      double codelLifoThreshold =
        conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD, CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
      this.queueInitArgs = new Object[] { queueHardLimit, codelTargetDelay, codelInterval,
        codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches, currentQueueLimit };
      this.queueClass = AdaptiveLifoCoDelCallQueue.class;
    } else if (isPluggableQueueType(callQueueType)) {
      // No name suffix is appended for the pluggable type.
      this.queueClass = getPluggableQueueClass().orElseThrow(() -> new PluggableRpcQueueNotFound(
        "Pluggable call queue failed to load and selected call" + " queue type required"));
      this.queueInitArgs = new Object[] { queueHardLimit, priority, conf };
    } else {
      this.name += ".Fifo";
      this.queueInitArgs = new Object[] { queueHardLimit };
      this.queueClass = LinkedBlockingQueue.class;
    }

    LOG.info(
      "Instantiated {} with queueClass={}; "
        + "numCallQueues={}, maxQueueLength={}, handlerCount={}",
      this.name, this.queueClass, this.numCallQueues, maxQueueLength, this.handlerCount);
  }

  /**
   * Computes how many call queues to create.
   * @param handlerCount             requested number of handlers
   * @param callQueuesHandlersFactor ratio of queues to handlers
   * @return {@code handlerCount * callQueuesHandlersFactor} rounded to the nearest integer, but
   *         never less than 1
   */
  protected int computeNumCallQueues(final int handlerCount, final float callQueuesHandlersFactor) {
    return Math.max(1, Math.round(handlerCount * callQueuesHandlersFactor));
  }

  /**
   * Return the {@link Descriptors.MethodDescriptor#getName()} from {@code callRunner} or "Unknown".
   */
  private static String getMethodName(final CallRunner callRunner) {
    return Optional.ofNullable(callRunner).map(CallRunner::getRpcCall).map(RpcCall::getMethod)
      .map(Descriptors.MethodDescriptor::getName).orElse("Unknown");
  }

  /**
   * Return the {@link RpcCall#getSize()} from {@code callRunner} or 0L.
   */
  private static long getRpcCallSize(final CallRunner callRunner) {
    return Optional.ofNullable(callRunner).map(CallRunner::getRpcCall).map(RpcCall::getSize)
      .orElse(0L);
  }

  /** Returns the number of currently queued calls, keyed by RPC method name. */
  public Map<String, Long> getCallQueueCountsSummary() {
    return queues.stream().flatMap(Collection::stream).map(RpcExecutor::getMethodName)
      .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
  }

  /** Returns the summed {@link RpcCall#getSize()} of currently queued calls, keyed by method. */
  public Map<String, Long> getCallQueueSizeSummary() {
    return queues.stream().flatMap(Collection::stream)
      .map(callRunner -> new Pair<>(getMethodName(callRunner), getRpcCallSize(callRunner)))
      .collect(Collectors.groupingBy(Pair::getFirst, Collectors.summingLong(Pair::getSecond)));
  }

  /**
   * Reflectively creates {@code numQueues} queues of the class chosen in the constructor and
   * appends them to {@link #queues}.
   * @param numQueues number of queues to create
   */
  protected void initializeQueues(final int numQueues) {
    for (int i = 0; i < numQueues; ++i) {
      queues.add(ReflectionUtils.newInstance(queueClass, queueInitArgs));
    }
  }

  /**
   * Starts the handler threads.
   * @param port port number, used only in the handler thread names
   */
  public void start(final int port) {
    startHandlers(port);
  }

  /** Asks every handler started so far to stop running and interrupts it. */
  public void stop() {
    for (RpcHandler handler : handlers) {
      handler.stopRunning();
      handler.interrupt();
    }
  }

  /**
   * Add the request to the executor queue
   * @param callTask the call to enqueue
   * @return defined by the implementing subclass; presumably whether the call was accepted
   */
  public abstract boolean dispatch(final CallRunner callTask);

  /** Returns the list of request queues */
  protected List<BlockingQueue<CallRunner>> getQueues() {
    return queues;
  }

  /**
   * Starts {@link #handlerCount} handlers spread over all queues returned by {@link #getQueues()}.
   * @param port port number, used only in the handler thread names
   */
  protected void startHandlers(final int port) {
    List<BlockingQueue<CallRunner>> callQueues = getQueues();
    startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port, activeHandlerCount);
  }

  /**
   * Override if providing alternate Handler implementation.
   */
  protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold,
    final int handlerCount, final BlockingQueue<CallRunner> q,
    final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount,
    final Abortable abortable) {
    return new RpcHandler(name, handlerFailureThreshhold, handlerCount, q, activeHandlerCount,
      failedHandlerCount, abortable);
  }

  /**
   * Start up our handlers.
   * <p>
   * Handler {@code i} is bound to queue {@code qindex + (i % qsize)}, i.e. handlers are assigned
   * round-robin over the {@code qsize} queues starting at {@code qindex}.
   * @param nameSuffix         appended to the executor name in thread names; may be {@code null}
   * @param numHandlers        number of handlers to create and start
   * @param callQueues         list holding the queues to serve
   * @param qindex             index in {@code callQueues} of the first queue to serve
   * @param qsize              number of consecutive queues to serve; must be positive when
   *                           {@code numHandlers} is positive
   * @param port               port number, used only in the handler thread names
   * @param activeHandlerCount counter handed to every created handler
   */
  protected void startHandlers(final String nameSuffix, final int numHandlers,
    final List<BlockingQueue<CallRunner>> callQueues, final int qindex, final int qsize,
    final int port, final AtomicInteger activeHandlerCount) {
    final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
    double handlerFailureThreshhold = conf == null
      ? 1.0
      : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
        HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
    for (int i = 0; i < numHandlers; i++) {
      final int index = qindex + (i % qsize);
      // handlers.size() rather than i: numbering continues across repeated startHandlers calls.
      final String handlerName = "RpcServer." + threadPrefix + ".handler=" + handlers.size()
        + ",queue=" + index + ",port=" + port;
      RpcHandler handler = getHandler(handlerName, handlerFailureThreshhold, handlerCount,
        callQueues.get(index), activeHandlerCount, failedHandlerCount, abortable);
      handler.start();
      handlers.add(handler);
    }
    LOG.debug("Started handlerCount={} with threadPrefix={}, numCallQueues={}, port={}",
      handlers.size(), threadPrefix, qsize, port);
  }

  /**
   * All requests go to the first queue, at index 0
   */
  private static final QueueBalancer ONE_QUEUE = val -> 0;

  /**
   * Returns the balancer that picks a queue for each call.
   * @param executorName name passed to the balancer's constructor
   * @param conf         configuration; {@link #CALL_QUEUE_QUEUE_BALANCER_CLASS} selects the
   *                     balancer class when there is more than one queue
   * @param queues       the queues to balance over; must not be empty
   * @return a balancer that always answers 0 for a single queue, otherwise a reflectively created
   *         instance of the configured class
   * @throws IllegalArgumentException if {@code queues} is empty
   */
  protected static QueueBalancer getBalancer(final String executorName, final Configuration conf,
    final List<BlockingQueue<CallRunner>> queues) {
    Preconditions.checkArgument(queues.size() > 0, "Queue size is <= 0, must be at least 1");
    if (queues.size() == 1) {
      return ONE_QUEUE;
    }
    Class<?> balancerClass =
      conf.getClass(CALL_QUEUE_QUEUE_BALANCER_CLASS, CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT);
    return (QueueBalancer) ReflectionUtils.newInstance(balancerClass, conf, executorName, queues);
  }

  /**
   * Comparator used by the call queue when {@link #CALL_QUEUE_TYPE_CONF_KEY} is set to
   * {@link #CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE}. It uses the calculated "deadline" e.g. to
   * deprioritize long-running job. If multiple requests have the same deadline
   * BoundedPriorityBlockingQueue will order them in FIFO (first-in-first-out) manner.
   */
  private static class CallPriorityComparator implements Comparator<CallRunner> {
    private static final int DEFAULT_MAX_CALL_DELAY = 5000;

    private final PriorityFunction priority;
    // Upper bound applied to the deadline reported by the priority function.
    private final int maxDelay;

    public CallPriorityComparator(final Configuration conf, final PriorityFunction priority) {
      this.priority = priority;
      this.maxDelay = conf.getInt(QUEUE_MAX_CALL_DELAY_CONF_KEY, DEFAULT_MAX_CALL_DELAY);
    }

    /**
     * Orders calls by {@code receiveTime + min(deadline, maxDelay)}, smallest first.
     */
    @Override
    public int compare(CallRunner a, CallRunner b) {
      return Long.compare(effectiveDeadline(a.getRpcCall()), effectiveDeadline(b.getRpcCall()));
    }

    /** Receive time of {@code call} plus its deadline, the latter capped at {@link #maxDelay}. */
    private long effectiveDeadline(final RpcCall call) {
      long deadline = priority.getDeadline(call.getHeader(), call.getParam());
      return call.getReceiveTime() + Math.min(deadline, maxDelay);
    }
  }

  /** Returns true if {@code callQueueType} is 'deadline'; false otherwise, including for null. */
  public static boolean isDeadlineQueueType(final String callQueueType) {
    return CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE.equals(callQueueType);
  }

  /** Returns true if {@code callQueueType} is 'codel'; false otherwise, including for null. */
  public static boolean isCodelQueueType(final String callQueueType) {
    return CALL_QUEUE_TYPE_CODEL_CONF_VALUE.equals(callQueueType);
  }

  /** Returns true if {@code callQueueType} is 'fifo'; false otherwise, including for null. */
  public static boolean isFifoQueueType(final String callQueueType) {
    return CALL_QUEUE_TYPE_FIFO_CONF_VALUE.equals(callQueueType);
  }

  /** Returns true if {@code callQueueType} is 'pluggable'; false otherwise, including for null. */
  public static boolean isPluggableQueueType(String callQueueType) {
    return CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE.equals(callQueueType);
  }

  /**
   * Returns true if {@code callQueueType} is 'pluggable' and
   * {@link #PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED} is set to true in {@code conf}.
   */
  public static boolean isPluggableQueueWithFastPath(String callQueueType, Configuration conf) {
    return isPluggableQueueType(callQueueType)
      && conf.getBoolean(PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED, false);
  }

  /**
   * Loads the class named by {@link #PLUGGABLE_CALL_QUEUE_CLASS_NAME}.
   * @return the class, or empty (after logging the reason) if the key is not set, the class is not
   *         on the classpath, or it is not a {@link BlockingQueue}
   */
  private Optional<Class<? extends BlockingQueue<CallRunner>>> getPluggableQueueClass() {
    String queueClassName = conf.get(PLUGGABLE_CALL_QUEUE_CLASS_NAME);

    if (queueClassName == null) {
      LOG.error(
        "Pluggable queue class config at " + PLUGGABLE_CALL_QUEUE_CLASS_NAME + " was not found");
      return Optional.empty();
    }

    try {
      Class<?> clazz = Class.forName(queueClassName);

      if (!BlockingQueue.class.isAssignableFrom(clazz)) {
        LOG.error("Pluggable Queue class {} does not extend BlockingQueue<CallRunner>",
          queueClassName);
        return Optional.empty();
      }

      // Only the raw BlockingQueue type can be verified at runtime; the element type is trusted.
      @SuppressWarnings("unchecked")
      Class<? extends BlockingQueue<CallRunner>> pluggableClass =
        (Class<? extends BlockingQueue<CallRunner>>) clazz;
      return Optional.of(pluggableClass);
    } catch (ClassNotFoundException exception) {
      // Pass the exception as the last argument so the cause is not lost from the log.
      LOG.error("Could not find {} on the classpath to load.", queueClassName, exception);
      return Optional.empty();
    }
  }

  public long getNumGeneralCallsDropped() {
    return numGeneralCallsDropped.longValue();
  }

  public long getNumLifoModeSwitches() {
    return numLifoModeSwitches.longValue();
  }

  public int getActiveHandlerCount() {
    return activeHandlerCount.get();
  }

  /** Always 0 in this base class. */
  public int getActiveWriteHandlerCount() {
    return 0;
  }

  /** Always 0 in this base class. */
  public int getActiveReadHandlerCount() {
    return 0;
  }

  /** Always 0 in this base class. */
  public int getActiveScanHandlerCount() {
    return 0;
  }

  /** Returns the length of the pending queue */
  public int getQueueLength() {
    int length = 0;
    for (final BlockingQueue<CallRunner> queue : queues) {
      length += queue.size();
    }
    return length;
  }

  /** Always 0 in this base class. */
  public int getReadQueueLength() {
    return 0;
  }

  /** Always 0 in this base class. */
  public int getScanQueueLength() {
    return 0;
  }

  /** Always 0 in this base class. */
  public int getWriteQueueLength() {
    return 0;
  }

  /** Returns the executor name, including any queue-type suffix added by the constructor. */
  public String getName() {
    return this.name;
  }

  /**
   * Update current soft limit for executor's call queues.
   * <p>
   * The configuration key is chosen from the executor name (case-insensitively): names containing
   * "priority", "replication" or "bulkload" use their dedicated key, anything else the general
   * one. A requested limit above {@link #queueHardLimit} is rejected with a warning and the
   * current soft limit is kept.
   * @param conf updated configuration
   */
  public void resizeQueues(Configuration conf) {
    String configKey = RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH;
    if (name != null) {
      final String lowerCaseName = name.toLowerCase(Locale.ROOT);
      if (lowerCaseName.contains("priority")) {
        configKey = RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH;
      } else if (lowerCaseName.contains("replication")) {
        configKey = RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH;
      } else if (lowerCaseName.contains("bulkload")) {
        configKey = RpcScheduler.IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH;
      }
    }
    // Snapshot the volatile once so the default and the fallback are the same value.
    final int queueLimit = currentQueueLimit;
    int newQueueLimit = conf.getInt(configKey, queueLimit);
    if (newQueueLimit > queueHardLimit) {
      LOG.warn(
        "Requested soft limit {} exceeds queue hard limit/capacity {}. "
          + "A region server restart is required to grow the underlying queue.",
        newQueueLimit, queueHardLimit);
      newQueueLimit = queueLimit;
    }
    currentQueueLimit = newQueueLimit;
  }

  /**
   * Propagates a configuration change to the queues: CoDel queues get their tunables and the
   * current soft limit refreshed, any other queue that is a {@link ConfigurationObserver} is
   * notified directly.
   * @param conf updated configuration
   */
  public void onConfigurationChange(Configuration conf) {
    // update CoDel Scheduler tunables
    int codelTargetDelay =
      conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY, CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
    int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
    double codelLifoThreshold =
      conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD, CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);

    for (BlockingQueue<CallRunner> queue : queues) {
      if (queue instanceof AdaptiveLifoCoDelCallQueue) {
        // current queue Limit for executor is already updated as part of resizeQueues, we need to
        // let codel queue also make aware of it
        ((AdaptiveLifoCoDelCallQueue) queue).updateTunables(codelTargetDelay, codelInterval,
          codelLifoThreshold, currentQueueLimit);
      } else if (queue instanceof ConfigurationObserver) {
        ((ConfigurationObserver) queue).onConfigurationChange(conf);
      }
    }
  }

  /**
   * Reads the queue-to-handler ratio from {@code conf}.
   * @param conf configuration to read from
   * @return the value of {@link #CALL_QUEUE_HANDLER_FACTOR_CONF_KEY}, or
   *         {@link #DEFAULT_CALL_QUEUE_HANDLER_FACTOR} if unset
   */
  protected float getCallQueueHandlerFactor(Configuration conf) {
    return conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, DEFAULT_CALL_QUEUE_HANDLER_FACTOR);
  }
}