001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.ipc; 020 021import java.util.ArrayList; 022import java.util.Comparator; 023import java.util.List; 024import java.util.Locale; 025import java.util.concurrent.BlockingQueue; 026import java.util.concurrent.LinkedBlockingQueue; 027import java.util.concurrent.ThreadLocalRandom; 028import java.util.concurrent.atomic.AtomicInteger; 029import java.util.concurrent.atomic.LongAdder; 030import java.util.Map; 031import java.util.HashMap; 032 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.Abortable; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; 041import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue; 042import org.apache.hadoop.hbase.util.ReflectionUtils; 043import org.apache.hadoop.util.StringUtils; 044 045import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 046import org.apache.hbase.thirdparty.com.google.common.base.Strings; 047 048/** 049 * Runs the CallRunners passed here via {@link #dispatch(CallRunner)}. Subclass and add particular 050 * scheduling behavior. 051 */ 052@InterfaceAudience.Private 053public abstract class RpcExecutor { 054 private static final Logger LOG = LoggerFactory.getLogger(RpcExecutor.class); 055 056 protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250; 057 public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "hbase.ipc.server.callqueue.handler.factor"; 058 059 /** max delay in msec used to bound the deprioritized requests */ 060 public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY = "hbase.ipc.server.queue.max.call.delay"; 061 062 /** 063 * The default, 'fifo', has the least friction but is dumb. If set to 'deadline', uses a priority 064 * queue and deprioritizes long-running scans. Sorting by priority comes at a cost, reduced 065 * throughput. 066 */ 067 public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel"; 068 public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline"; 069 public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo"; 070 public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type"; 071 public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE; 072 073 // These 3 are only used by Codel executor 074 public static final String CALL_QUEUE_CODEL_TARGET_DELAY = "hbase.ipc.server.callqueue.codel.target.delay"; 075 public static final String CALL_QUEUE_CODEL_INTERVAL = "hbase.ipc.server.callqueue.codel.interval"; 076 public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD = "hbase.ipc.server.callqueue.codel.lifo.threshold"; 077 078 public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 100; 079 public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100; 080 public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8; 081 082 private LongAdder numGeneralCallsDropped = new LongAdder(); 083 private LongAdder numLifoModeSwitches = new LongAdder(); 084 085 protected final int numCallQueues; 086 protected final List<BlockingQueue<CallRunner>> queues; 087 private final Class<? extends BlockingQueue> queueClass; 088 private final Object[] queueInitArgs; 089 090 private final PriorityFunction priority; 091 092 protected volatile int currentQueueLimit; 093 094 private final AtomicInteger activeHandlerCount = new AtomicInteger(0); 095 private final List<Handler> handlers; 096 private final int handlerCount; 097 private final AtomicInteger failedHandlerCount = new AtomicInteger(0); 098 099 private String name; 100 private boolean running; 101 102 private Configuration conf = null; 103 private Abortable abortable = null; 104 105 public RpcExecutor(final String name, final int handlerCount, final int maxQueueLength, 106 final PriorityFunction priority, final Configuration conf, final Abortable abortable) { 107 this(name, handlerCount, conf.get(CALL_QUEUE_TYPE_CONF_KEY, 108 CALL_QUEUE_TYPE_CONF_DEFAULT), maxQueueLength, priority, conf, abortable); 109 } 110 111 public RpcExecutor(final String name, final int handlerCount, final String callQueueType, 112 final int maxQueueLength, final PriorityFunction priority, final Configuration conf, 113 final Abortable abortable) { 114 this.name = Strings.nullToEmpty(name); 115 this.conf = conf; 116 this.abortable = abortable; 117 118 float callQueuesHandlersFactor = this.conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.1f); 119 this.numCallQueues = computeNumCallQueues(handlerCount, callQueuesHandlersFactor); 120 this.queues = new ArrayList<>(this.numCallQueues); 121 122 this.handlerCount = Math.max(handlerCount, this.numCallQueues); 123 this.handlers = new ArrayList<>(this.handlerCount); 124 125 this.priority = priority; 126 127 if (isDeadlineQueueType(callQueueType)) { 128 this.name += ".Deadline"; 129 this.queueInitArgs = new Object[] { maxQueueLength, 130 new CallPriorityComparator(conf, this.priority) }; 131 this.queueClass = BoundedPriorityBlockingQueue.class; 132 } else if (isCodelQueueType(callQueueType)) { 133 this.name += ".Codel"; 134 int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY, 135 CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY); 136 int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL); 137 double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD, 138 CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD); 139 queueInitArgs = new Object[] { maxQueueLength, codelTargetDelay, codelInterval, 140 codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches }; 141 queueClass = AdaptiveLifoCoDelCallQueue.class; 142 } else { 143 this.name += ".Fifo"; 144 queueInitArgs = new Object[] { maxQueueLength }; 145 queueClass = LinkedBlockingQueue.class; 146 } 147 148 LOG.info("Instantiated {} with queueClass={}; " + 149 "numCallQueues={}, maxQueueLength={}, handlerCount={}", 150 this.name, this.queueClass, this.numCallQueues, maxQueueLength, this.handlerCount); 151 } 152 153 protected int computeNumCallQueues(final int handlerCount, final float callQueuesHandlersFactor) { 154 return Math.max(1, Math.round(handlerCount * callQueuesHandlersFactor)); 155 } 156 157 public Map<String, Long> getCallQueueCountsSummary() { 158 HashMap<String, Long> callQueueMethodTotalCount = new HashMap<>(); 159 160 for(BlockingQueue<CallRunner> queue: queues) { 161 for (CallRunner cr:queue) { 162 RpcCall rpcCall = cr.getRpcCall(); 163 164 String method; 165 166 if (null==rpcCall.getMethod() || 167 StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) { 168 method = "Unknown"; 169 } 170 171 callQueueMethodTotalCount.put(method, 1+callQueueMethodTotalCount.getOrDefault(method, 0L)); 172 } 173 } 174 175 return callQueueMethodTotalCount; 176 } 177 178 public Map<String, Long> getCallQueueSizeSummary() { 179 HashMap<String, Long> callQueueMethodTotalSize = new HashMap<>(); 180 181 for(BlockingQueue<CallRunner> queue: queues) { 182 for (CallRunner cr:queue) { 183 RpcCall rpcCall = cr.getRpcCall(); 184 String method; 185 186 if (null==rpcCall.getMethod() || 187 StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) { 188 method = "Unknown"; 189 } 190 191 long size = rpcCall.getSize(); 192 193 callQueueMethodTotalSize.put(method, size+callQueueMethodTotalSize.getOrDefault(method, 0L)); 194 } 195 } 196 197 return callQueueMethodTotalSize; 198 } 199 200 201 protected void initializeQueues(final int numQueues) { 202 if (queueInitArgs.length > 0) { 203 currentQueueLimit = (int) queueInitArgs[0]; 204 queueInitArgs[0] = Math.max((int) queueInitArgs[0], DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT); 205 } 206 for (int i = 0; i < numQueues; ++i) { 207 queues.add(ReflectionUtils.newInstance(queueClass, queueInitArgs)); 208 } 209 } 210 211 public void start(final int port) { 212 running = true; 213 startHandlers(port); 214 } 215 216 public void stop() { 217 running = false; 218 for (Thread handler : handlers) { 219 handler.interrupt(); 220 } 221 } 222 223 /** Add the request to the executor queue */ 224 public abstract boolean dispatch(final CallRunner callTask) throws InterruptedException; 225 226 /** Returns the list of request queues */ 227 protected List<BlockingQueue<CallRunner>> getQueues() { 228 return queues; 229 } 230 231 protected void startHandlers(final int port) { 232 List<BlockingQueue<CallRunner>> callQueues = getQueues(); 233 startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port, activeHandlerCount); 234 } 235 236 /** 237 * Override if providing alternate Handler implementation. 238 */ 239 protected Handler getHandler(final String name, final double handlerFailureThreshhold, 240 final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount) { 241 return new Handler(name, handlerFailureThreshhold, q, activeHandlerCount); 242 } 243 244 /** 245 * Start up our handlers. 246 */ 247 protected void startHandlers(final String nameSuffix, final int numHandlers, 248 final List<BlockingQueue<CallRunner>> callQueues, final int qindex, final int qsize, 249 final int port, final AtomicInteger activeHandlerCount) { 250 final String threadPrefix = name + Strings.nullToEmpty(nameSuffix); 251 double handlerFailureThreshhold = conf == null ? 1.0 : conf.getDouble( 252 HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT, 253 HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT); 254 for (int i = 0; i < numHandlers; i++) { 255 final int index = qindex + (i % qsize); 256 String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue=" + index 257 + ",port=" + port; 258 Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index), 259 activeHandlerCount); 260 handler.start(); 261 handlers.add(handler); 262 } 263 LOG.debug("Started handlerCount={} with threadPrefix={}, numCallQueues={}, port={}", 264 handlers.size(), threadPrefix, qsize, port); 265 } 266 267 /** 268 * Handler thread run the {@link CallRunner#run()} in. 269 */ 270 protected class Handler extends Thread { 271 /** 272 * Q to find CallRunners to run in. 273 */ 274 final BlockingQueue<CallRunner> q; 275 276 final double handlerFailureThreshhold; 277 278 // metrics (shared with other handlers) 279 final AtomicInteger activeHandlerCount; 280 281 Handler(final String name, final double handlerFailureThreshhold, 282 final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount) { 283 super(name); 284 setDaemon(true); 285 this.q = q; 286 this.handlerFailureThreshhold = handlerFailureThreshhold; 287 this.activeHandlerCount = activeHandlerCount; 288 } 289 290 /** 291 * @return A {@link CallRunner} 292 * @throws InterruptedException 293 */ 294 protected CallRunner getCallRunner() throws InterruptedException { 295 return this.q.take(); 296 } 297 298 @Override 299 public void run() { 300 boolean interrupted = false; 301 try { 302 while (running) { 303 try { 304 run(getCallRunner()); 305 } catch (InterruptedException e) { 306 interrupted = true; 307 } 308 } 309 } catch (Exception e) { 310 LOG.warn(e.toString(), e); 311 throw e; 312 } finally { 313 if (interrupted) { 314 Thread.currentThread().interrupt(); 315 } 316 } 317 } 318 319 private void run(CallRunner cr) { 320 MonitoredRPCHandler status = RpcServer.getStatus(); 321 cr.setStatus(status); 322 try { 323 this.activeHandlerCount.incrementAndGet(); 324 cr.run(); 325 } catch (Throwable e) { 326 if (e instanceof Error) { 327 int failedCount = failedHandlerCount.incrementAndGet(); 328 if (this.handlerFailureThreshhold >= 0 329 && failedCount > handlerCount * this.handlerFailureThreshhold) { 330 String message = "Number of failed RpcServer handler runs exceeded threshhold " 331 + this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e); 332 if (abortable != null) { 333 abortable.abort(message, e); 334 } else { 335 LOG.error("Error but can't abort because abortable is null: " 336 + StringUtils.stringifyException(e)); 337 throw e; 338 } 339 } else { 340 LOG.warn("Handler errors " + StringUtils.stringifyException(e)); 341 } 342 } else { 343 LOG.warn("Handler exception " + StringUtils.stringifyException(e)); 344 } 345 } finally { 346 this.activeHandlerCount.decrementAndGet(); 347 } 348 } 349 } 350 351 public static abstract class QueueBalancer { 352 /** 353 * @return the index of the next queue to which a request should be inserted 354 */ 355 public abstract int getNextQueue(); 356 } 357 358 public static QueueBalancer getBalancer(int queueSize) { 359 Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least 1"); 360 if (queueSize == 1) { 361 return ONE_QUEUE; 362 } else { 363 return new RandomQueueBalancer(queueSize); 364 } 365 } 366 367 /** 368 * All requests go to the first queue, at index 0 369 */ 370 private static QueueBalancer ONE_QUEUE = new QueueBalancer() { 371 @Override 372 public int getNextQueue() { 373 return 0; 374 } 375 }; 376 377 /** 378 * Queue balancer that just randomly selects a queue in the range [0, num queues). 379 */ 380 private static class RandomQueueBalancer extends QueueBalancer { 381 private final int queueSize; 382 383 public RandomQueueBalancer(int queueSize) { 384 this.queueSize = queueSize; 385 } 386 387 @Override 388 public int getNextQueue() { 389 return ThreadLocalRandom.current().nextInt(queueSize); 390 } 391 } 392 393 /** 394 * Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true. It 395 * uses the calculated "deadline" e.g. to deprioritize long-running job If multiple requests have 396 * the same deadline BoundedPriorityBlockingQueue will order them in FIFO (first-in-first-out) 397 * manner. 398 */ 399 private static class CallPriorityComparator implements Comparator<CallRunner> { 400 private final static int DEFAULT_MAX_CALL_DELAY = 5000; 401 402 private final PriorityFunction priority; 403 private final int maxDelay; 404 405 public CallPriorityComparator(final Configuration conf, final PriorityFunction priority) { 406 this.priority = priority; 407 this.maxDelay = conf.getInt(QUEUE_MAX_CALL_DELAY_CONF_KEY, DEFAULT_MAX_CALL_DELAY); 408 } 409 410 @Override 411 public int compare(CallRunner a, CallRunner b) { 412 RpcCall callA = a.getRpcCall(); 413 RpcCall callB = b.getRpcCall(); 414 long deadlineA = priority.getDeadline(callA.getHeader(), callA.getParam()); 415 long deadlineB = priority.getDeadline(callB.getHeader(), callB.getParam()); 416 deadlineA = callA.getReceiveTime() + Math.min(deadlineA, maxDelay); 417 deadlineB = callB.getReceiveTime() + Math.min(deadlineB, maxDelay); 418 return Long.compare(deadlineA, deadlineB); 419 } 420 } 421 422 public static boolean isDeadlineQueueType(final String callQueueType) { 423 return callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); 424 } 425 426 public static boolean isCodelQueueType(final String callQueueType) { 427 return callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE); 428 } 429 430 public static boolean isFifoQueueType(final String callQueueType) { 431 return callQueueType.equals(CALL_QUEUE_TYPE_FIFO_CONF_VALUE); 432 } 433 434 public long getNumGeneralCallsDropped() { 435 return numGeneralCallsDropped.longValue(); 436 } 437 438 public long getNumLifoModeSwitches() { 439 return numLifoModeSwitches.longValue(); 440 } 441 442 public int getActiveHandlerCount() { 443 return activeHandlerCount.get(); 444 } 445 446 public int getActiveWriteHandlerCount() { 447 return 0; 448 } 449 450 public int getActiveReadHandlerCount() { 451 return 0; 452 } 453 454 public int getActiveScanHandlerCount() { 455 return 0; 456 } 457 458 /** Returns the length of the pending queue */ 459 public int getQueueLength() { 460 int length = 0; 461 for (final BlockingQueue<CallRunner> queue: queues) { 462 length += queue.size(); 463 } 464 return length; 465 } 466 467 public int getReadQueueLength() { 468 return 0; 469 } 470 471 public int getScanQueueLength() { 472 return 0; 473 } 474 475 public int getWriteQueueLength() { 476 return 0; 477 } 478 479 public String getName() { 480 return this.name; 481 } 482 483 /** 484 * Update current soft limit for executor's call queues 485 * @param conf updated configuration 486 */ 487 public void resizeQueues(Configuration conf) { 488 String configKey = RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH; 489 if (name != null && name.toLowerCase(Locale.ROOT).contains("priority")) { 490 configKey = RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH; 491 } 492 currentQueueLimit = conf.getInt(configKey, currentQueueLimit); 493 } 494 495 public void onConfigurationChange(Configuration conf) { 496 // update CoDel Scheduler tunables 497 int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY, 498 CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY); 499 int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL); 500 double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD, 501 CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD); 502 503 for (BlockingQueue<CallRunner> queue : queues) { 504 if (queue instanceof AdaptiveLifoCoDelCallQueue) { 505 ((AdaptiveLifoCoDelCallQueue) queue).updateTunables(codelTargetDelay, codelInterval, 506 codelLifoThreshold); 507 } 508 } 509 } 510}