001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.ipc; 020 021import java.util.ArrayList; 022import java.util.Comparator; 023import java.util.List; 024import java.util.Locale; 025import java.util.concurrent.BlockingQueue; 026import java.util.concurrent.LinkedBlockingQueue; 027import java.util.concurrent.ThreadLocalRandom; 028import java.util.concurrent.atomic.AtomicInteger; 029import java.util.concurrent.atomic.LongAdder; 030import java.util.Map; 031import java.util.HashMap; 032 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.Abortable; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; 041import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue; 042import org.apache.hadoop.hbase.util.ReflectionUtils; 043import org.apache.hadoop.util.StringUtils; 044 045import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 046import org.apache.hbase.thirdparty.com.google.common.base.Strings; 047 048/** 049 * Runs the CallRunners passed here via {@link #dispatch(CallRunner)}. Subclass and add particular 050 * scheduling behavior. 051 */ 052@InterfaceAudience.Private 053public abstract class RpcExecutor { 054 private static final Logger LOG = LoggerFactory.getLogger(RpcExecutor.class); 055 056 protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250; 057 public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "hbase.ipc.server.callqueue.handler.factor"; 058 059 /** max delay in msec used to bound the deprioritized requests */ 060 public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY = "hbase.ipc.server.queue.max.call.delay"; 061 062 /** 063 * The default, 'fifo', has the least friction but is dumb. If set to 'deadline', uses a priority 064 * queue and deprioritizes long-running scans. Sorting by priority comes at a cost, reduced 065 * throughput. 066 */ 067 public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel"; 068 public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline"; 069 public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo"; 070 public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type"; 071 public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE; 072 073 // These 3 are only used by Codel executor 074 public static final String CALL_QUEUE_CODEL_TARGET_DELAY = "hbase.ipc.server.callqueue.codel.target.delay"; 075 public static final String CALL_QUEUE_CODEL_INTERVAL = "hbase.ipc.server.callqueue.codel.interval"; 076 public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD = "hbase.ipc.server.callqueue.codel.lifo.threshold"; 077 078 public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 100; 079 public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100; 080 public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8; 081 082 private LongAdder numGeneralCallsDropped = new LongAdder(); 083 private LongAdder numLifoModeSwitches = new LongAdder(); 084 085 protected final int numCallQueues; 086 protected final List<BlockingQueue<CallRunner>> queues; 087 private final Class<? extends BlockingQueue> queueClass; 088 private final Object[] queueInitArgs; 089 090 private final PriorityFunction priority; 091 092 protected volatile int currentQueueLimit; 093 094 private final AtomicInteger activeHandlerCount = new AtomicInteger(0); 095 private final List<Handler> handlers; 096 private final int handlerCount; 097 private final AtomicInteger failedHandlerCount = new AtomicInteger(0); 098 099 private String name; 100 private boolean running; 101 102 private Configuration conf = null; 103 private Abortable abortable = null; 104 105 public RpcExecutor(final String name, final int handlerCount, final int maxQueueLength, 106 final PriorityFunction priority, final Configuration conf, final Abortable abortable) { 107 this(name, handlerCount, conf.get(CALL_QUEUE_TYPE_CONF_KEY, 108 CALL_QUEUE_TYPE_CONF_DEFAULT), maxQueueLength, priority, conf, abortable); 109 } 110 111 public RpcExecutor(final String name, final int handlerCount, final String callQueueType, 112 final int maxQueueLength, final PriorityFunction priority, final Configuration conf, 113 final Abortable abortable) { 114 this.name = Strings.nullToEmpty(name); 115 this.conf = conf; 116 this.abortable = abortable; 117 118 float callQueuesHandlersFactor = this.conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.1f); 119 if (Float.compare(callQueuesHandlersFactor, 1.0f) > 0 || 120 Float.compare(0.0f, callQueuesHandlersFactor) > 0) { 121 LOG.warn(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY + 122 " is *ILLEGAL*, it should be in range [0.0, 1.0]"); 123 // For callQueuesHandlersFactor > 1.0, we just set it 1.0f. 124 if (Float.compare(callQueuesHandlersFactor, 1.0f) > 0) { 125 LOG.warn("Set " + CALL_QUEUE_HANDLER_FACTOR_CONF_KEY + " 1.0f"); 126 callQueuesHandlersFactor = 1.0f; 127 } else { 128 // But for callQueuesHandlersFactor < 0.0, following method #computeNumCallQueues 129 // will compute max(1, -x) => 1 which has same effect of default value. 130 LOG.warn("Set " + CALL_QUEUE_HANDLER_FACTOR_CONF_KEY + " default value 0.0f"); 131 } 132 } 133 this.numCallQueues = computeNumCallQueues(handlerCount, callQueuesHandlersFactor); 134 this.queues = new ArrayList<>(this.numCallQueues); 135 136 this.handlerCount = Math.max(handlerCount, this.numCallQueues); 137 this.handlers = new ArrayList<>(this.handlerCount); 138 139 this.priority = priority; 140 141 if (isDeadlineQueueType(callQueueType)) { 142 this.name += ".Deadline"; 143 this.queueInitArgs = new Object[] { maxQueueLength, 144 new CallPriorityComparator(conf, this.priority) }; 145 this.queueClass = BoundedPriorityBlockingQueue.class; 146 } else if (isCodelQueueType(callQueueType)) { 147 this.name += ".Codel"; 148 int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY, 149 CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY); 150 int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL); 151 double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD, 152 CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD); 153 queueInitArgs = new Object[] { maxQueueLength, codelTargetDelay, codelInterval, 154 codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches }; 155 queueClass = AdaptiveLifoCoDelCallQueue.class; 156 } else { 157 this.name += ".Fifo"; 158 queueInitArgs = new Object[] { maxQueueLength }; 159 queueClass = LinkedBlockingQueue.class; 160 } 161 162 LOG.info("Instantiated {} with queueClass={}; " + 163 "numCallQueues={}, maxQueueLength={}, handlerCount={}", 164 this.name, this.queueClass, this.numCallQueues, maxQueueLength, this.handlerCount); 165 } 166 167 protected int computeNumCallQueues(final int handlerCount, final float callQueuesHandlersFactor) { 168 return Math.max(1, Math.round(handlerCount * callQueuesHandlersFactor)); 169 } 170 171 public Map<String, Long> getCallQueueCountsSummary() { 172 HashMap<String, Long> callQueueMethodTotalCount = new HashMap<>(); 173 174 for(BlockingQueue<CallRunner> queue: queues) { 175 for (CallRunner cr:queue) { 176 RpcCall rpcCall = cr.getRpcCall(); 177 178 String method; 179 180 if (null==rpcCall.getMethod() || 181 StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) { 182 method = "Unknown"; 183 } 184 185 callQueueMethodTotalCount.put(method, 1+callQueueMethodTotalCount.getOrDefault(method, 0L)); 186 } 187 } 188 189 return callQueueMethodTotalCount; 190 } 191 192 public Map<String, Long> getCallQueueSizeSummary() { 193 HashMap<String, Long> callQueueMethodTotalSize = new HashMap<>(); 194 195 for(BlockingQueue<CallRunner> queue: queues) { 196 for (CallRunner cr:queue) { 197 RpcCall rpcCall = cr.getRpcCall(); 198 String method; 199 200 if (null==rpcCall.getMethod() || 201 StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) { 202 method = "Unknown"; 203 } 204 205 long size = rpcCall.getSize(); 206 207 callQueueMethodTotalSize.put(method, size+callQueueMethodTotalSize.getOrDefault(method, 0L)); 208 } 209 } 210 211 return callQueueMethodTotalSize; 212 } 213 214 215 protected void initializeQueues(final int numQueues) { 216 if (queueInitArgs.length > 0) { 217 currentQueueLimit = (int) queueInitArgs[0]; 218 queueInitArgs[0] = Math.max((int) queueInitArgs[0], DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT); 219 } 220 for (int i = 0; i < numQueues; ++i) { 221 queues.add(ReflectionUtils.newInstance(queueClass, queueInitArgs)); 222 } 223 } 224 225 public void start(final int port) { 226 running = true; 227 startHandlers(port); 228 } 229 230 public void stop() { 231 running = false; 232 for (Thread handler : handlers) { 233 handler.interrupt(); 234 } 235 } 236 237 /** Add the request to the executor queue */ 238 public abstract boolean dispatch(final CallRunner callTask) throws InterruptedException; 239 240 /** Returns the list of request queues */ 241 protected List<BlockingQueue<CallRunner>> getQueues() { 242 return queues; 243 } 244 245 protected void startHandlers(final int port) { 246 List<BlockingQueue<CallRunner>> callQueues = getQueues(); 247 startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port, activeHandlerCount); 248 } 249 250 /** 251 * Override if providing alternate Handler implementation. 252 */ 253 protected Handler getHandler(final String name, final double handlerFailureThreshhold, 254 final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount) { 255 return new Handler(name, handlerFailureThreshhold, q, activeHandlerCount); 256 } 257 258 /** 259 * Start up our handlers. 260 */ 261 protected void startHandlers(final String nameSuffix, final int numHandlers, 262 final List<BlockingQueue<CallRunner>> callQueues, final int qindex, final int qsize, 263 final int port, final AtomicInteger activeHandlerCount) { 264 final String threadPrefix = name + Strings.nullToEmpty(nameSuffix); 265 double handlerFailureThreshhold = conf == null ? 1.0 : conf.getDouble( 266 HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT, 267 HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT); 268 for (int i = 0; i < numHandlers; i++) { 269 final int index = qindex + (i % qsize); 270 String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue=" + index 271 + ",port=" + port; 272 Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index), 273 activeHandlerCount); 274 handler.start(); 275 handlers.add(handler); 276 } 277 LOG.debug("Started handlerCount={} with threadPrefix={}, numCallQueues={}, port={}", 278 handlers.size(), threadPrefix, qsize, port); 279 } 280 281 /** 282 * Handler thread run the {@link CallRunner#run()} in. 283 */ 284 protected class Handler extends Thread { 285 /** 286 * Q to find CallRunners to run in. 287 */ 288 final BlockingQueue<CallRunner> q; 289 290 final double handlerFailureThreshhold; 291 292 // metrics (shared with other handlers) 293 final AtomicInteger activeHandlerCount; 294 295 Handler(final String name, final double handlerFailureThreshhold, 296 final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount) { 297 super(name); 298 setDaemon(true); 299 this.q = q; 300 this.handlerFailureThreshhold = handlerFailureThreshhold; 301 this.activeHandlerCount = activeHandlerCount; 302 } 303 304 /** 305 * @return A {@link CallRunner} 306 * @throws InterruptedException 307 */ 308 protected CallRunner getCallRunner() throws InterruptedException { 309 return this.q.take(); 310 } 311 312 @Override 313 public void run() { 314 boolean interrupted = false; 315 try { 316 while (running) { 317 try { 318 run(getCallRunner()); 319 } catch (InterruptedException e) { 320 interrupted = true; 321 } 322 } 323 } catch (Exception e) { 324 LOG.warn(e.toString(), e); 325 throw e; 326 } finally { 327 if (interrupted) { 328 Thread.currentThread().interrupt(); 329 } 330 } 331 } 332 333 private void run(CallRunner cr) { 334 MonitoredRPCHandler status = RpcServer.getStatus(); 335 cr.setStatus(status); 336 try { 337 this.activeHandlerCount.incrementAndGet(); 338 cr.run(); 339 } catch (Throwable e) { 340 if (e instanceof Error) { 341 int failedCount = failedHandlerCount.incrementAndGet(); 342 if (this.handlerFailureThreshhold >= 0 343 && failedCount > handlerCount * this.handlerFailureThreshhold) { 344 String message = "Number of failed RpcServer handler runs exceeded threshhold " 345 + this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e); 346 if (abortable != null) { 347 abortable.abort(message, e); 348 } else { 349 LOG.error("Error but can't abort because abortable is null: " 350 + StringUtils.stringifyException(e)); 351 throw e; 352 } 353 } else { 354 LOG.warn("Handler errors " + StringUtils.stringifyException(e)); 355 } 356 } else { 357 LOG.warn("Handler exception " + StringUtils.stringifyException(e)); 358 } 359 } finally { 360 this.activeHandlerCount.decrementAndGet(); 361 } 362 } 363 } 364 365 public static abstract class QueueBalancer { 366 /** 367 * @return the index of the next queue to which a request should be inserted 368 */ 369 public abstract int getNextQueue(); 370 } 371 372 public static QueueBalancer getBalancer(int queueSize) { 373 Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least 1"); 374 if (queueSize == 1) { 375 return ONE_QUEUE; 376 } else { 377 return new RandomQueueBalancer(queueSize); 378 } 379 } 380 381 /** 382 * All requests go to the first queue, at index 0 383 */ 384 private static QueueBalancer ONE_QUEUE = new QueueBalancer() { 385 @Override 386 public int getNextQueue() { 387 return 0; 388 } 389 }; 390 391 /** 392 * Queue balancer that just randomly selects a queue in the range [0, num queues). 393 */ 394 private static class RandomQueueBalancer extends QueueBalancer { 395 private final int queueSize; 396 397 public RandomQueueBalancer(int queueSize) { 398 this.queueSize = queueSize; 399 } 400 401 @Override 402 public int getNextQueue() { 403 return ThreadLocalRandom.current().nextInt(queueSize); 404 } 405 } 406 407 /** 408 * Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true. It 409 * uses the calculated "deadline" e.g. to deprioritize long-running job If multiple requests have 410 * the same deadline BoundedPriorityBlockingQueue will order them in FIFO (first-in-first-out) 411 * manner. 412 */ 413 private static class CallPriorityComparator implements Comparator<CallRunner> { 414 private final static int DEFAULT_MAX_CALL_DELAY = 5000; 415 416 private final PriorityFunction priority; 417 private final int maxDelay; 418 419 public CallPriorityComparator(final Configuration conf, final PriorityFunction priority) { 420 this.priority = priority; 421 this.maxDelay = conf.getInt(QUEUE_MAX_CALL_DELAY_CONF_KEY, DEFAULT_MAX_CALL_DELAY); 422 } 423 424 @Override 425 public int compare(CallRunner a, CallRunner b) { 426 RpcCall callA = a.getRpcCall(); 427 RpcCall callB = b.getRpcCall(); 428 long deadlineA = priority.getDeadline(callA.getHeader(), callA.getParam()); 429 long deadlineB = priority.getDeadline(callB.getHeader(), callB.getParam()); 430 deadlineA = callA.getReceiveTime() + Math.min(deadlineA, maxDelay); 431 deadlineB = callB.getReceiveTime() + Math.min(deadlineB, maxDelay); 432 return Long.compare(deadlineA, deadlineB); 433 } 434 } 435 436 public static boolean isDeadlineQueueType(final String callQueueType) { 437 return callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); 438 } 439 440 public static boolean isCodelQueueType(final String callQueueType) { 441 return callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE); 442 } 443 444 public static boolean isFifoQueueType(final String callQueueType) { 445 return callQueueType.equals(CALL_QUEUE_TYPE_FIFO_CONF_VALUE); 446 } 447 448 public long getNumGeneralCallsDropped() { 449 return numGeneralCallsDropped.longValue(); 450 } 451 452 public long getNumLifoModeSwitches() { 453 return numLifoModeSwitches.longValue(); 454 } 455 456 public int getActiveHandlerCount() { 457 return activeHandlerCount.get(); 458 } 459 460 public int getActiveWriteHandlerCount() { 461 return 0; 462 } 463 464 public int getActiveReadHandlerCount() { 465 return 0; 466 } 467 468 public int getActiveScanHandlerCount() { 469 return 0; 470 } 471 472 /** Returns the length of the pending queue */ 473 public int getQueueLength() { 474 int length = 0; 475 for (final BlockingQueue<CallRunner> queue: queues) { 476 length += queue.size(); 477 } 478 return length; 479 } 480 481 public int getReadQueueLength() { 482 return 0; 483 } 484 485 public int getScanQueueLength() { 486 return 0; 487 } 488 489 public int getWriteQueueLength() { 490 return 0; 491 } 492 493 public String getName() { 494 return this.name; 495 } 496 497 /** 498 * Update current soft limit for executor's call queues 499 * @param conf updated configuration 500 */ 501 public void resizeQueues(Configuration conf) { 502 String configKey = RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH; 503 if (name != null && name.toLowerCase(Locale.ROOT).contains("priority")) { 504 configKey = RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH; 505 } 506 currentQueueLimit = conf.getInt(configKey, currentQueueLimit); 507 } 508 509 public void onConfigurationChange(Configuration conf) { 510 // update CoDel Scheduler tunables 511 int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY, 512 CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY); 513 int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL); 514 double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD, 515 CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD); 516 517 for (BlockingQueue<CallRunner> queue : queues) { 518 if (queue instanceof AdaptiveLifoCoDelCallQueue) { 519 ((AdaptiveLifoCoDelCallQueue) queue).updateTunables(codelTargetDelay, codelInterval, 520 codelLifoThreshold); 521 } 522 } 523 } 524}