/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;

/**
 * Runs the CallRunners passed here via {@link #dispatch(CallRunner)}. Subclass and add particular
 * scheduling behavior.
 * <p>
 * The constructor only selects the queue implementation and its constructor arguments; the queues
 * themselves are created by {@link #initializeQueues(int)} and the handler threads by
 * {@link #start(int)}.
 */
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
@InterfaceStability.Evolving
public abstract class RpcExecutor {
  private static final Logger LOG = LoggerFactory.getLogger(RpcExecutor.class);

  protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250;
  protected static final float DEFAULT_CALL_QUEUE_HANDLER_FACTOR = 0.1f;
  protected static final int UNDEFINED_MAX_CALLQUEUE_LENGTH = -1;
  public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
    "hbase.ipc.server.callqueue.handler.factor";

  /** max delay in msec used to bound the de-prioritized requests */
  public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY =
    "hbase.ipc.server.queue.max.call.delay";

  /**
   * One of the call queue types selectable through {@link #CALL_QUEUE_TYPE_CONF_KEY}. The default,
   * 'fifo', has the least friction but is dumb. If set to 'deadline', uses a priority queue and
   * de-prioritizes long-running scans. Sorting by priority comes at a cost, reduced throughput.
   */
  public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
  public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
  public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
  public static final String CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE = "pluggable";
  public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
  public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE;

  public static final String CALL_QUEUE_QUEUE_BALANCER_CLASS =
    "hbase.ipc.server.callqueue.balancer.class";
  public static final Class<?> CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT = RandomQueueBalancer.class;

  // These 3 are only used by Codel executor
  public static final String CALL_QUEUE_CODEL_TARGET_DELAY =
    "hbase.ipc.server.callqueue.codel.target.delay";
  public static final String CALL_QUEUE_CODEL_INTERVAL =
    "hbase.ipc.server.callqueue.codel.interval";
  public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD =
    "hbase.ipc.server.callqueue.codel.lifo.threshold";

  public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 100;
  public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
  public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8;

  public static final String PLUGGABLE_CALL_QUEUE_CLASS_NAME =
    "hbase.ipc.server.callqueue.pluggable.queue.class.name";
  public static final String PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED =
    "hbase.ipc.server.callqueue.pluggable.queue.fast.path.enabled";

  // Counters handed to the CoDel queues as constructor arguments; exposed through the getters.
  private final LongAdder numGeneralCallsDropped = new LongAdder();
  private final LongAdder numLifoModeSwitches = new LongAdder();

  protected final int numCallQueues;
  protected final List<BlockingQueue<CallRunner>> queues;
  private final Class<? extends BlockingQueue> queueClass;
  private final Object[] queueInitArgs;

  protected volatile int currentQueueLimit;

  private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
  private final List<RpcHandler> handlers;
  private final int handlerCount;
  private final AtomicInteger failedHandlerCount = new AtomicInteger(0);

  private String name;

  private final Configuration conf;
  private final Abortable abortable;

  /**
   * Creates an executor whose call queue type is read from {@link #CALL_QUEUE_TYPE_CONF_KEY},
   * falling back to {@link #CALL_QUEUE_TYPE_CONF_DEFAULT}.
   */
  public RpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
    final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
    this(name, handlerCount, conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_CONF_DEFAULT),
      maxQueueLength, priority, conf, abortable);
  }

  /**
   * Creates an executor with an explicit call queue type.
   * @param name           executor name; {@code null} is treated as the empty string. A suffix
   *                       naming the queue type is appended for the deadline, codel and fifo types.
   * @param handlerCount   requested number of handlers; raised to the number of call queues if
   *                       smaller
   * @param callQueueType  one of the {@code CALL_QUEUE_TYPE_*_CONF_VALUE} constants; any other
   *                       non-null value selects the FIFO queue
   * @param maxQueueLength soft limit per queue, or {@link #UNDEFINED_MAX_CALLQUEUE_LENGTH} to
   *                       derive it from the number of handlers per queue
   * @param priority       priority function handed to the deadline comparator or pluggable queue
   * @param conf           configuration to read tunables from
   * @param abortable      handed to every handler created by this executor
   * @throws NullPointerException      if {@code callQueueType} is {@code null}
   * @throws PluggableRpcQueueNotFound if the pluggable type is selected but its class cannot be
   *                                   loaded
   */
  public RpcExecutor(final String name, final int handlerCount, final String callQueueType,
    int maxQueueLength, final PriorityFunction priority, final Configuration conf,
    final Abortable abortable) {
    this.name = Strings.nullToEmpty(name);
    this.conf = conf;
    this.abortable = abortable;

    float callQueuesHandlersFactor = getCallQueueHandlerFactor(conf);
    if (
      Float.compare(callQueuesHandlersFactor, 1.0f) > 0
        || Float.compare(0.0f, callQueuesHandlersFactor) > 0
    ) {
      LOG.warn(
        CALL_QUEUE_HANDLER_FACTOR_CONF_KEY + " is *ILLEGAL*, it should be in range [0.0, 1.0]");
      // For callQueuesHandlersFactor > 1.0, we just set it 1.0f.
      if (Float.compare(callQueuesHandlersFactor, 1.0f) > 0) {
        LOG.warn("Set " + CALL_QUEUE_HANDLER_FACTOR_CONF_KEY + " 1.0f");
        callQueuesHandlersFactor = 1.0f;
      } else {
        // But for callQueuesHandlersFactor < 0.0, following method #computeNumCallQueues
        // will compute max(1, -x) => 1 which has same effect of default value.
        LOG.warn("Set " + CALL_QUEUE_HANDLER_FACTOR_CONF_KEY + " default value 0.0f");
      }
    }
    this.numCallQueues = computeNumCallQueues(handlerCount, callQueuesHandlersFactor);
    this.queues = new ArrayList<>(this.numCallQueues);

    // Never run with fewer handlers than queues.
    this.handlerCount = Math.max(handlerCount, this.numCallQueues);
    this.handlers = new ArrayList<>(this.handlerCount);

    // If soft limit of queue is not provided, then calculate using
    // DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER
    if (maxQueueLength == UNDEFINED_MAX_CALLQUEUE_LENGTH) {
      int handlerCountPerQueue = this.handlerCount / this.numCallQueues;
      maxQueueLength = handlerCountPerQueue * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER;
    }

    // The type predicates below tolerate null, so fail fast here instead of silently selecting
    // the FIFO queue for a missing type.
    Preconditions.checkNotNull(callQueueType, "callQueueType must not be null");

    if (isDeadlineQueueType(callQueueType)) {
      this.name += ".Deadline";
      this.queueInitArgs =
        new Object[] { maxQueueLength, new CallPriorityComparator(conf, priority) };
      this.queueClass = BoundedPriorityBlockingQueue.class;
    } else if (isCodelQueueType(callQueueType)) {
      this.name += ".Codel";
      int codelTargetDelay =
        conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY, CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
      int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
      double codelLifoThreshold =
        conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD, CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
      this.queueInitArgs = new Object[] { maxQueueLength, codelTargetDelay, codelInterval,
        codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches };
      this.queueClass = AdaptiveLifoCoDelCallQueue.class;
    } else if (isPluggableQueueType(callQueueType)) {
      // The configured type explicitly asks for a pluggable queue, so a missing class is fatal.
      this.queueClass = getPluggableQueueClass().orElseThrow(() -> new PluggableRpcQueueNotFound(
        "Pluggable call queue failed to load and selected call queue type required"));
      this.queueInitArgs = new Object[] { maxQueueLength, priority, conf };
    } else {
      this.name += ".Fifo";
      this.queueInitArgs = new Object[] { maxQueueLength };
      this.queueClass = LinkedBlockingQueue.class;
    }

    LOG.info(
      "Instantiated {} with queueClass={}; "
        + "numCallQueues={}, maxQueueLength={}, handlerCount={}",
      this.name, this.queueClass, this.numCallQueues, maxQueueLength, this.handlerCount);
  }

  /**
   * Returns {@code round(handlerCount * callQueuesHandlersFactor)}, but never less than 1.
   */
  protected int computeNumCallQueues(final int handlerCount, final float callQueuesHandlersFactor) {
    return Math.max(1, Math.round(handlerCount * callQueuesHandlersFactor));
  }

  /**
   * Return the {@link Descriptors.MethodDescriptor#getName()} from {@code callRunner} or "Unknown".
   */
  private static String getMethodName(final CallRunner callRunner) {
    return Optional.ofNullable(callRunner).map(CallRunner::getRpcCall).map(RpcCall::getMethod)
      .map(Descriptors.MethodDescriptor::getName).orElse("Unknown");
  }

  /**
   * Return the {@link RpcCall#getSize()} from {@code callRunner} or 0L.
   */
  private static long getRpcCallSize(final CallRunner callRunner) {
    return Optional.ofNullable(callRunner).map(CallRunner::getRpcCall).map(RpcCall::getSize)
      .orElse(0L);
  }

  /** Returns the number of currently queued calls, keyed by RPC method name. */
  public Map<String, Long> getCallQueueCountsSummary() {
    return queues.stream().flatMap(Collection::stream).map(RpcExecutor::getMethodName)
      .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
  }

  /** Returns the summed call size of currently queued calls, keyed by RPC method name. */
  public Map<String, Long> getCallQueueSizeSummary() {
    return queues.stream().flatMap(Collection::stream)
      .map(callRunner -> new Pair<>(getMethodName(callRunner), getRpcCallSize(callRunner)))
      .collect(Collectors.groupingBy(Pair::getFirst, Collectors.summingLong(Pair::getSecond)));
  }

  // This method can only be called ONCE per executor instance.
  // Before calling: queueInitArgs[0] contains the soft limit (desired queue capacity)
  // After calling: queueInitArgs[0] is set to hard limit and currentQueueLimit stores the original
  // soft limit.
  // Multiple calls would incorrectly use the hard limit as the soft limit.
  // As all the queues have the same initArgs and queueClass, there should be no need to call this
  // again.
  protected void initializeQueues(final int numQueues) {
    if (!queues.isEmpty()) {
      throw new RuntimeException("Queues are already initialized");
    }
    if (queueInitArgs.length > 0) {
      final int softLimit = (int) queueInitArgs[0];
      currentQueueLimit = softLimit;
      queueInitArgs[0] = Math.max(softLimit, DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
    }
    for (int i = 0; i < numQueues; ++i) {
      queues.add(ReflectionUtils.newInstance(queueClass, queueInitArgs));
    }
  }

  /** Starts the handler threads; {@code port} is only used in the handler thread names. */
  public void start(final int port) {
    startHandlers(port);
  }

  /** Asks every started handler to stop running and interrupts it. */
  public void stop() {
    for (RpcHandler handler : handlers) {
      handler.stopRunning();
      handler.interrupt();
    }
  }

  /** Add the request to the executor queue */
  public abstract boolean dispatch(final CallRunner callTask);

  /** Returns the list of request queues */
  protected List<BlockingQueue<CallRunner>> getQueues() {
    return queues;
  }

  /** Starts {@code handlerCount} handlers spread over all queues returned by {@link #getQueues()}. */
  protected void startHandlers(final int port) {
    List<BlockingQueue<CallRunner>> callQueues = getQueues();
    startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port, activeHandlerCount);
  }

  /**
   * Override if providing alternate Handler implementation.
   */
  protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold,
    final int handlerCount, final BlockingQueue<CallRunner> q,
    final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount,
    final Abortable abortable) {
    return new RpcHandler(name, handlerFailureThreshhold, handlerCount, q, activeHandlerCount,
      failedHandlerCount, abortable);
  }

  /**
   * Start up our handlers.
   * <p>
   * Handler {@code i} is bound to the queue at index {@code qindex + (i % qsize)}, so handlers are
   * assigned round-robin over the {@code qsize} queues starting at {@code qindex}.
   * @param nameSuffix         appended to the executor name in thread names; may be {@code null}
   * @param numHandlers        number of handlers to create and start
   * @param callQueues         queues the handlers take calls from
   * @param qindex             index of the first queue to use
   * @param qsize              number of consecutive queues to use
   * @param port               only used in the handler thread names
   * @param activeHandlerCount counter passed to every created handler
   */
  protected void startHandlers(final String nameSuffix, final int numHandlers,
    final List<BlockingQueue<CallRunner>> callQueues, final int qindex, final int qsize,
    final int port, final AtomicInteger activeHandlerCount) {
    final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
    double handlerFailureThreshhold = conf == null
      ? 1.0
      : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
        HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
    for (int i = 0; i < numHandlers; i++) {
      final int index = qindex + (i % qsize);
      // handlers.size() gives a running handler number across successive calls to this method.
      final String handlerName = "RpcServer." + threadPrefix + ".handler=" + handlers.size()
        + ",queue=" + index + ",port=" + port;
      RpcHandler handler = getHandler(handlerName, handlerFailureThreshhold, handlerCount,
        callQueues.get(index), activeHandlerCount, failedHandlerCount, abortable);
      handler.start();
      handlers.add(handler);
    }
    LOG.debug("Started handlerCount={} with threadPrefix={}, numCallQueues={}, port={}",
      handlers.size(), threadPrefix, qsize, port);
  }

  /**
   * All requests go to the first queue, at index 0
   */
  private static final QueueBalancer ONE_QUEUE = val -> 0;

  /**
   * Returns the balancer used to pick a queue for a call: a constant balancer when there is a
   * single queue, otherwise an instance of the class configured under
   * {@link #CALL_QUEUE_QUEUE_BALANCER_CLASS}.
   * @throws IllegalArgumentException if {@code queues} is empty
   */
  protected static QueueBalancer getBalancer(final String executorName, final Configuration conf,
    final List<BlockingQueue<CallRunner>> queues) {
    Preconditions.checkArgument(queues.size() > 0, "Queue size is <= 0, must be at least 1");
    if (queues.size() == 1) {
      return ONE_QUEUE;
    } else {
      Class<?> balancerClass =
        conf.getClass(CALL_QUEUE_QUEUE_BALANCER_CLASS, CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT);
      return (QueueBalancer) ReflectionUtils.newInstance(balancerClass, conf, executorName, queues);
    }
  }

  /**
   * Comparator used by the "normal callQueue" if {@link #CALL_QUEUE_TYPE_CONF_KEY} is set to
   * {@link #CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE}. It uses the calculated "deadline" e.g. to
   * deprioritize long-running jobs. If multiple requests have the same deadline,
   * BoundedPriorityBlockingQueue will order them in FIFO (first-in-first-out) manner.
   */
  private static class CallPriorityComparator implements Comparator<CallRunner> {
    private static final int DEFAULT_MAX_CALL_DELAY = 5000;

    private final PriorityFunction priority;
    private final int maxDelay;

    public CallPriorityComparator(final Configuration conf, final PriorityFunction priority) {
      this.priority = priority;
      this.maxDelay = conf.getInt(QUEUE_MAX_CALL_DELAY_CONF_KEY, DEFAULT_MAX_CALL_DELAY);
    }

    @Override
    public int compare(CallRunner a, CallRunner b) {
      RpcCall callA = a.getRpcCall();
      RpcCall callB = b.getRpcCall();
      long deadlineA = priority.getDeadline(callA.getHeader(), callA.getParam());
      long deadlineB = priority.getDeadline(callB.getHeader(), callB.getParam());
      // Cap the per-call delay at maxDelay, then order by receive time plus that delay.
      deadlineA = callA.getReceiveTime() + Math.min(deadlineA, maxDelay);
      deadlineB = callB.getReceiveTime() + Math.min(deadlineB, maxDelay);
      return Long.compare(deadlineA, deadlineB);
    }
  }

  /** Returns true if {@code callQueueType} is the deadline type; false for {@code null}. */
  public static boolean isDeadlineQueueType(final String callQueueType) {
    return CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE.equals(callQueueType);
  }

  /** Returns true if {@code callQueueType} is the CoDel type; false for {@code null}. */
  public static boolean isCodelQueueType(final String callQueueType) {
    return CALL_QUEUE_TYPE_CODEL_CONF_VALUE.equals(callQueueType);
  }

  /** Returns true if {@code callQueueType} is the FIFO type; false for {@code null}. */
  public static boolean isFifoQueueType(final String callQueueType) {
    return CALL_QUEUE_TYPE_FIFO_CONF_VALUE.equals(callQueueType);
  }

  /** Returns true if {@code callQueueType} is the pluggable type; false for {@code null}. */
  public static boolean isPluggableQueueType(String callQueueType) {
    return CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE.equals(callQueueType);
  }

  /**
   * Returns true if {@code callQueueType} is the pluggable type and
   * {@link #PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED} is set to true in {@code conf}.
   */
  public static boolean isPluggableQueueWithFastPath(String callQueueType, Configuration conf) {
    return isPluggableQueueType(callQueueType)
      && conf.getBoolean(PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED, false);
  }

  /**
   * Loads the queue class named by {@link #PLUGGABLE_CALL_QUEUE_CLASS_NAME}.
   * @return the class, or empty if the key is unset, the class is not on the classpath, or it is
   *         not a {@link BlockingQueue}; each failure is logged at error level
   */
  private Optional<Class<? extends BlockingQueue<CallRunner>>> getPluggableQueueClass() {
    String queueClassName = conf.get(PLUGGABLE_CALL_QUEUE_CLASS_NAME);

    if (queueClassName == null) {
      LOG.error(
        "Pluggable queue class config at " + PLUGGABLE_CALL_QUEUE_CLASS_NAME + " was not found");
      return Optional.empty();
    }

    try {
      Class<?> clazz = Class.forName(queueClassName);

      if (BlockingQueue.class.isAssignableFrom(clazz)) {
        // The element type is erased at runtime, so only the raw BlockingQueue bound can be
        // verified here; the CallRunner element type is taken on trust.
        @SuppressWarnings("unchecked")
        Class<? extends BlockingQueue<CallRunner>> pluggableQueueClass =
          (Class<? extends BlockingQueue<CallRunner>>) clazz;
        return Optional.of(pluggableQueueClass);
      } else {
        LOG.error("Pluggable Queue class {} does not extend BlockingQueue<CallRunner>",
          queueClassName);
        return Optional.empty();
      }
    } catch (ClassNotFoundException exception) {
      // Pass the exception as the last argument so the cause and stack trace are logged.
      LOG.error("Could not find {} on the classpath to load.", queueClassName, exception);
      return Optional.empty();
    }
  }

  /** Returns the current value of the dropped-calls counter handed to the CoDel queues. */
  public long getNumGeneralCallsDropped() {
    return numGeneralCallsDropped.longValue();
  }

  /** Returns the current value of the LIFO-mode-switch counter handed to the CoDel queues. */
  public long getNumLifoModeSwitches() {
    return numLifoModeSwitches.longValue();
  }

  /** Returns the current value of the active handler counter shared with the handlers. */
  public int getActiveHandlerCount() {
    return activeHandlerCount.get();
  }

  /** Always 0 in this base implementation. */
  public int getActiveWriteHandlerCount() {
    return 0;
  }

  /** Always 0 in this base implementation. */
  public int getActiveReadHandlerCount() {
    return 0;
  }

  /** Always 0 in this base implementation. */
  public int getActiveScanHandlerCount() {
    return 0;
  }

  /** Returns the length of the pending queue */
  public int getQueueLength() {
    int length = 0;
    for (final BlockingQueue<CallRunner> queue : queues) {
      length += queue.size();
    }
    return length;
  }

  /** Always 0 in this base implementation. */
  public int getReadQueueLength() {
    return 0;
  }

  /** Always 0 in this base implementation. */
  public int getScanQueueLength() {
    return 0;
  }

  /** Always 0 in this base implementation. */
  public int getWriteQueueLength() {
    return 0;
  }

  /** Returns the executor name, including any queue type suffix added by the constructor. */
  public String getName() {
    return this.name;
  }

  /**
   * Update current soft limit for executor's call queues. The configuration key is chosen from the
   * executor name (priority, replication, bulkload, otherwise the general key); if the key is
   * unset the current limit is kept. Only {@link #currentQueueLimit} changes; the queues
   * themselves are not touched.
   * @param conf updated configuration
   */
  public void resizeQueues(Configuration conf) {
    String configKey = RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH;
    if (name != null) {
      final String lowerCaseName = name.toLowerCase(Locale.ROOT);
      if (lowerCaseName.contains("priority")) {
        configKey = RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH;
      } else if (lowerCaseName.contains("replication")) {
        configKey = RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH;
      } else if (lowerCaseName.contains("bulkload")) {
        configKey = RpcScheduler.IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH;
      }
    }
    final int queueLimit = currentQueueLimit;
    currentQueueLimit = conf.getInt(configKey, queueLimit);
  }

  /**
   * Re-reads the three CoDel tunables from {@code conf} and applies them to every
   * {@link AdaptiveLifoCoDelCallQueue}. Any other queue that implements
   * {@link ConfigurationObserver} is handed {@code conf} directly.
   * @param conf updated configuration
   */
  public void onConfigurationChange(Configuration conf) {
    // update CoDel Scheduler tunables
    int codelTargetDelay =
      conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY, CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
    int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
    double codelLifoThreshold =
      conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD, CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);

    for (BlockingQueue<CallRunner> queue : queues) {
      if (queue instanceof AdaptiveLifoCoDelCallQueue) {
        ((AdaptiveLifoCoDelCallQueue) queue).updateTunables(codelTargetDelay, codelInterval,
          codelLifoThreshold);
      } else if (queue instanceof ConfigurationObserver) {
        ((ConfigurationObserver) queue).onConfigurationChange(conf);
      }
    }
  }

  /**
   * Returns the value of {@link #CALL_QUEUE_HANDLER_FACTOR_CONF_KEY}, defaulting to
   * {@link #DEFAULT_CALL_QUEUE_HANDLER_FACTOR}. Called from the constructor.
   */
  protected float getCallQueueHandlerFactor(Configuration conf) {
    return conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, DEFAULT_CALL_QUEUE_HANDLER_FACTOR);
  }
}