001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.util.ArrayList; 021import java.util.Collection; 022import java.util.Comparator; 023import java.util.List; 024import java.util.Locale; 025import java.util.Map; 026import java.util.Optional; 027import java.util.concurrent.BlockingQueue; 028import java.util.concurrent.LinkedBlockingQueue; 029import java.util.concurrent.atomic.AtomicInteger; 030import java.util.concurrent.atomic.LongAdder; 031import java.util.function.Function; 032import java.util.stream.Collectors; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.Abortable; 035import org.apache.hadoop.hbase.HBaseInterfaceAudience; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.conf.ConfigurationObserver; 038import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue; 039import org.apache.hadoop.hbase.util.Pair; 040import org.apache.hadoop.hbase.util.ReflectionUtils; 041import org.apache.yetus.audience.InterfaceAudience; 042import org.apache.yetus.audience.InterfaceStability; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 047import org.apache.hbase.thirdparty.com.google.common.base.Strings; 048import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; 049 050/** 051 * Runs the CallRunners passed here via {@link #dispatch(CallRunner)}. Subclass and add particular 052 * scheduling behavior. 053 */ 054@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX }) 055@InterfaceStability.Evolving 056public abstract class RpcExecutor { 057 private static final Logger LOG = LoggerFactory.getLogger(RpcExecutor.class); 058 059 protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250; 060 public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = 061 "hbase.ipc.server.callqueue.handler.factor"; 062 063 /** max delay in msec used to bound the de-prioritized requests */ 064 public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY = 065 "hbase.ipc.server.queue.max.call.delay"; 066 067 /** 068 * The default, 'fifo', has the least friction but is dumb. If set to 'deadline', uses a priority 069 * queue and de-prioritizes long-running scans. Sorting by priority comes at a cost, reduced 070 * throughput. 071 */ 072 public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel"; 073 public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline"; 074 public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo"; 075 public static final String CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE = "pluggable"; 076 public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type"; 077 public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE; 078 079 public static final String CALL_QUEUE_QUEUE_BALANCER_CLASS = 080 "hbase.ipc.server.callqueue.balancer.class"; 081 public static final Class<?> CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT = RandomQueueBalancer.class; 082 083 // These 3 are only used by Codel executor 084 public static final String CALL_QUEUE_CODEL_TARGET_DELAY = 085 "hbase.ipc.server.callqueue.codel.target.delay"; 086 public static final String CALL_QUEUE_CODEL_INTERVAL = 087 "hbase.ipc.server.callqueue.codel.interval"; 088 public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD = 089 "hbase.ipc.server.callqueue.codel.lifo.threshold"; 090 091 public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 100; 092 public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100; 093 public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8; 094 095 public static final String PLUGGABLE_CALL_QUEUE_CLASS_NAME = 096 "hbase.ipc.server.callqueue.pluggable.queue.class.name"; 097 public static final String PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED = 098 "hbase.ipc.server.callqueue.pluggable.queue.fast.path.enabled"; 099 100 private final LongAdder numGeneralCallsDropped = new LongAdder(); 101 private final LongAdder numLifoModeSwitches = new LongAdder(); 102 103 protected final int numCallQueues; 104 protected final List<BlockingQueue<CallRunner>> queues; 105 private final Class<? extends BlockingQueue> queueClass; 106 private final Object[] queueInitArgs; 107 108 protected volatile int currentQueueLimit; 109 110 private final AtomicInteger activeHandlerCount = new AtomicInteger(0); 111 private final List<RpcHandler> handlers; 112 private final int handlerCount; 113 private final AtomicInteger failedHandlerCount = new AtomicInteger(0); 114 115 private String name; 116 117 private final Configuration conf; 118 private final Abortable abortable; 119 120 public RpcExecutor(final String name, final int handlerCount, final int maxQueueLength, 121 final PriorityFunction priority, final Configuration conf, final Abortable abortable) { 122 this(name, handlerCount, conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_CONF_DEFAULT), 123 maxQueueLength, priority, conf, abortable); 124 } 125 126 public RpcExecutor(final String name, final int handlerCount, final String callQueueType, 127 final int maxQueueLength, final PriorityFunction priority, final Configuration conf, 128 final Abortable abortable) { 129 this.name = Strings.nullToEmpty(name); 130 this.conf = conf; 131 this.abortable = abortable; 132 133 float callQueuesHandlersFactor = this.conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.1f); 134 if ( 135 Float.compare(callQueuesHandlersFactor, 1.0f) > 0 136 || Float.compare(0.0f, callQueuesHandlersFactor) > 0 137 ) { 138 LOG.warn( 139 CALL_QUEUE_HANDLER_FACTOR_CONF_KEY + " is *ILLEGAL*, it should be in range [0.0, 1.0]"); 140 // For callQueuesHandlersFactor > 1.0, we just set it 1.0f. 141 if (Float.compare(callQueuesHandlersFactor, 1.0f) > 0) { 142 LOG.warn("Set " + CALL_QUEUE_HANDLER_FACTOR_CONF_KEY + " 1.0f"); 143 callQueuesHandlersFactor = 1.0f; 144 } else { 145 // But for callQueuesHandlersFactor < 0.0, following method #computeNumCallQueues 146 // will compute max(1, -x) => 1 which has same effect of default value. 147 LOG.warn("Set " + CALL_QUEUE_HANDLER_FACTOR_CONF_KEY + " default value 0.0f"); 148 } 149 } 150 this.numCallQueues = computeNumCallQueues(handlerCount, callQueuesHandlersFactor); 151 this.queues = new ArrayList<>(this.numCallQueues); 152 153 this.handlerCount = Math.max(handlerCount, this.numCallQueues); 154 this.handlers = new ArrayList<>(this.handlerCount); 155 156 if (isDeadlineQueueType(callQueueType)) { 157 this.name += ".Deadline"; 158 this.queueInitArgs = 159 new Object[] { maxQueueLength, new CallPriorityComparator(conf, priority) }; 160 this.queueClass = BoundedPriorityBlockingQueue.class; 161 } else if (isCodelQueueType(callQueueType)) { 162 this.name += ".Codel"; 163 int codelTargetDelay = 164 conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY, CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY); 165 int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL); 166 double codelLifoThreshold = 167 conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD, CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD); 168 this.queueInitArgs = new Object[] { maxQueueLength, codelTargetDelay, codelInterval, 169 codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches }; 170 this.queueClass = AdaptiveLifoCoDelCallQueue.class; 171 } else if (isPluggableQueueType(callQueueType)) { 172 Optional<Class<? extends BlockingQueue<CallRunner>>> pluggableQueueClass = 173 getPluggableQueueClass(); 174 175 if (!pluggableQueueClass.isPresent()) { 176 throw new PluggableRpcQueueNotFound( 177 "Pluggable call queue failed to load and selected call" + " queue type required"); 178 } else { 179 this.queueInitArgs = new Object[] { maxQueueLength, priority, conf }; 180 this.queueClass = pluggableQueueClass.get(); 181 } 182 } else { 183 this.name += ".Fifo"; 184 this.queueInitArgs = new Object[] { maxQueueLength }; 185 this.queueClass = LinkedBlockingQueue.class; 186 } 187 188 LOG.info( 189 "Instantiated {} with queueClass={}; " 190 + "numCallQueues={}, maxQueueLength={}, handlerCount={}", 191 this.name, this.queueClass, this.numCallQueues, maxQueueLength, this.handlerCount); 192 } 193 194 protected int computeNumCallQueues(final int handlerCount, final float callQueuesHandlersFactor) { 195 return Math.max(1, Math.round(handlerCount * callQueuesHandlersFactor)); 196 } 197 198 /** 199 * Return the {@link Descriptors.MethodDescriptor#getName()} from {@code callRunner} or "Unknown". 200 */ 201 private static String getMethodName(final CallRunner callRunner) { 202 return Optional.ofNullable(callRunner).map(CallRunner::getRpcCall).map(RpcCall::getMethod) 203 .map(Descriptors.MethodDescriptor::getName).orElse("Unknown"); 204 } 205 206 /** 207 * Return the {@link RpcCall#getSize()} from {@code callRunner} or 0L. 208 */ 209 private static long getRpcCallSize(final CallRunner callRunner) { 210 return Optional.ofNullable(callRunner).map(CallRunner::getRpcCall).map(RpcCall::getSize) 211 .orElse(0L); 212 } 213 214 public Map<String, Long> getCallQueueCountsSummary() { 215 return queues.stream().flatMap(Collection::stream).map(RpcExecutor::getMethodName) 216 .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); 217 } 218 219 public Map<String, Long> getCallQueueSizeSummary() { 220 return queues.stream().flatMap(Collection::stream) 221 .map(callRunner -> new Pair<>(getMethodName(callRunner), getRpcCallSize(callRunner))) 222 .collect(Collectors.groupingBy(Pair::getFirst, Collectors.summingLong(Pair::getSecond))); 223 } 224 225 protected void initializeQueues(final int numQueues) { 226 if (queueInitArgs.length > 0) { 227 currentQueueLimit = (int) queueInitArgs[0]; 228 queueInitArgs[0] = Math.max((int) queueInitArgs[0], DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT); 229 } 230 for (int i = 0; i < numQueues; ++i) { 231 queues.add(ReflectionUtils.newInstance(queueClass, queueInitArgs)); 232 } 233 } 234 235 public void start(final int port) { 236 startHandlers(port); 237 } 238 239 public void stop() { 240 for (RpcHandler handler : handlers) { 241 handler.stopRunning(); 242 handler.interrupt(); 243 } 244 } 245 246 /** Add the request to the executor queue */ 247 public abstract boolean dispatch(final CallRunner callTask); 248 249 /** Returns the list of request queues */ 250 protected List<BlockingQueue<CallRunner>> getQueues() { 251 return queues; 252 } 253 254 protected void startHandlers(final int port) { 255 List<BlockingQueue<CallRunner>> callQueues = getQueues(); 256 startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port, activeHandlerCount); 257 } 258 259 /** 260 * Override if providing alternate Handler implementation. 261 */ 262 protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold, 263 final int handlerCount, final BlockingQueue<CallRunner> q, 264 final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount, 265 final Abortable abortable) { 266 return new RpcHandler(name, handlerFailureThreshhold, handlerCount, q, activeHandlerCount, 267 failedHandlerCount, abortable); 268 } 269 270 /** 271 * Start up our handlers. 272 */ 273 protected void startHandlers(final String nameSuffix, final int numHandlers, 274 final List<BlockingQueue<CallRunner>> callQueues, final int qindex, final int qsize, 275 final int port, final AtomicInteger activeHandlerCount) { 276 final String threadPrefix = name + Strings.nullToEmpty(nameSuffix); 277 double handlerFailureThreshhold = conf == null 278 ? 1.0 279 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT, 280 HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT); 281 for (int i = 0; i < numHandlers; i++) { 282 final int index = qindex + (i % qsize); 283 String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue=" + index 284 + ",port=" + port; 285 RpcHandler handler = getHandler(name, handlerFailureThreshhold, handlerCount, 286 callQueues.get(index), activeHandlerCount, failedHandlerCount, abortable); 287 handler.start(); 288 handlers.add(handler); 289 } 290 LOG.debug("Started handlerCount={} with threadPrefix={}, numCallQueues={}, port={}", 291 handlers.size(), threadPrefix, qsize, port); 292 } 293 294 /** 295 * All requests go to the first queue, at index 0 296 */ 297 private static final QueueBalancer ONE_QUEUE = val -> 0; 298 299 public static QueueBalancer getBalancer(final String executorName, final Configuration conf, 300 final List<BlockingQueue<CallRunner>> queues) { 301 Preconditions.checkArgument(queues.size() > 0, "Queue size is <= 0, must be at least 1"); 302 if (queues.size() == 1) { 303 return ONE_QUEUE; 304 } else { 305 Class<?> balancerClass = 306 conf.getClass(CALL_QUEUE_QUEUE_BALANCER_CLASS, CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT); 307 return (QueueBalancer) ReflectionUtils.newInstance(balancerClass, conf, executorName, queues); 308 } 309 } 310 311 /** 312 * Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true. It 313 * uses the calculated "deadline" e.g. to deprioritize long-running job If multiple requests have 314 * the same deadline BoundedPriorityBlockingQueue will order them in FIFO (first-in-first-out) 315 * manner. 316 */ 317 private static class CallPriorityComparator implements Comparator<CallRunner> { 318 private final static int DEFAULT_MAX_CALL_DELAY = 5000; 319 320 private final PriorityFunction priority; 321 private final int maxDelay; 322 323 public CallPriorityComparator(final Configuration conf, final PriorityFunction priority) { 324 this.priority = priority; 325 this.maxDelay = conf.getInt(QUEUE_MAX_CALL_DELAY_CONF_KEY, DEFAULT_MAX_CALL_DELAY); 326 } 327 328 @Override 329 public int compare(CallRunner a, CallRunner b) { 330 RpcCall callA = a.getRpcCall(); 331 RpcCall callB = b.getRpcCall(); 332 long deadlineA = priority.getDeadline(callA.getHeader(), callA.getParam()); 333 long deadlineB = priority.getDeadline(callB.getHeader(), callB.getParam()); 334 deadlineA = callA.getReceiveTime() + Math.min(deadlineA, maxDelay); 335 deadlineB = callB.getReceiveTime() + Math.min(deadlineB, maxDelay); 336 return Long.compare(deadlineA, deadlineB); 337 } 338 } 339 340 public static boolean isDeadlineQueueType(final String callQueueType) { 341 return callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); 342 } 343 344 public static boolean isCodelQueueType(final String callQueueType) { 345 return callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE); 346 } 347 348 public static boolean isFifoQueueType(final String callQueueType) { 349 return callQueueType.equals(CALL_QUEUE_TYPE_FIFO_CONF_VALUE); 350 } 351 352 public static boolean isPluggableQueueType(String callQueueType) { 353 return callQueueType.equals(CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE); 354 } 355 356 public static boolean isPluggableQueueWithFastPath(String callQueueType, Configuration conf) { 357 return isPluggableQueueType(callQueueType) 358 && conf.getBoolean(PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED, false); 359 } 360 361 private Optional<Class<? extends BlockingQueue<CallRunner>>> getPluggableQueueClass() { 362 String queueClassName = conf.get(PLUGGABLE_CALL_QUEUE_CLASS_NAME); 363 364 if (queueClassName == null) { 365 LOG.error( 366 "Pluggable queue class config at " + PLUGGABLE_CALL_QUEUE_CLASS_NAME + " was not found"); 367 return Optional.empty(); 368 } 369 370 try { 371 Class<?> clazz = Class.forName(queueClassName); 372 373 if (BlockingQueue.class.isAssignableFrom(clazz)) { 374 return Optional.of((Class<? extends BlockingQueue<CallRunner>>) clazz); 375 } else { 376 LOG.error( 377 "Pluggable Queue class " + queueClassName + " does not extend BlockingQueue<CallRunner>"); 378 return Optional.empty(); 379 } 380 } catch (ClassNotFoundException exception) { 381 LOG.error("Could not find " + queueClassName + " on the classpath to load."); 382 return Optional.empty(); 383 } 384 } 385 386 public long getNumGeneralCallsDropped() { 387 return numGeneralCallsDropped.longValue(); 388 } 389 390 public long getNumLifoModeSwitches() { 391 return numLifoModeSwitches.longValue(); 392 } 393 394 public int getActiveHandlerCount() { 395 return activeHandlerCount.get(); 396 } 397 398 public int getActiveWriteHandlerCount() { 399 return 0; 400 } 401 402 public int getActiveReadHandlerCount() { 403 return 0; 404 } 405 406 public int getActiveScanHandlerCount() { 407 return 0; 408 } 409 410 /** Returns the length of the pending queue */ 411 public int getQueueLength() { 412 int length = 0; 413 for (final BlockingQueue<CallRunner> queue : queues) { 414 length += queue.size(); 415 } 416 return length; 417 } 418 419 public int getReadQueueLength() { 420 return 0; 421 } 422 423 public int getScanQueueLength() { 424 return 0; 425 } 426 427 public int getWriteQueueLength() { 428 return 0; 429 } 430 431 public String getName() { 432 return this.name; 433 } 434 435 /** 436 * Update current soft limit for executor's call queues 437 * @param conf updated configuration 438 */ 439 public void resizeQueues(Configuration conf) { 440 String configKey = RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH; 441 if (name != null) { 442 if (name.toLowerCase(Locale.ROOT).contains("priority")) { 443 configKey = RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH; 444 } else if (name.toLowerCase(Locale.ROOT).contains("replication")) { 445 configKey = RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH; 446 } else if (name.toLowerCase(Locale.ROOT).contains("bulkload")) { 447 configKey = RpcScheduler.IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH; 448 } 449 } 450 final int queueLimit = currentQueueLimit; 451 currentQueueLimit = conf.getInt(configKey, queueLimit); 452 } 453 454 public void onConfigurationChange(Configuration conf) { 455 // update CoDel Scheduler tunables 456 int codelTargetDelay = 457 conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY, CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY); 458 int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL); 459 double codelLifoThreshold = 460 conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD, CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD); 461 462 for (BlockingQueue<CallRunner> queue : queues) { 463 if (queue instanceof AdaptiveLifoCoDelCallQueue) { 464 ((AdaptiveLifoCoDelCallQueue) queue).updateTunables(codelTargetDelay, codelInterval, 465 codelLifoThreshold); 466 } else if (queue instanceof ConfigurationObserver) { 467 ((ConfigurationObserver) queue).onConfigurationChange(conf); 468 } 469 } 470 } 471}