001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.ipc;
020
021import java.util.ArrayList;
022import java.util.Comparator;
023import java.util.List;
024import java.util.Locale;
025import java.util.concurrent.BlockingQueue;
026import java.util.concurrent.LinkedBlockingQueue;
027import java.util.concurrent.ThreadLocalRandom;
028import java.util.concurrent.atomic.AtomicInteger;
029import java.util.concurrent.atomic.LongAdder;
030import java.util.Map;
031import java.util.HashMap;
032
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.Abortable;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
041import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
042import org.apache.hadoop.hbase.util.ReflectionUtils;
043import org.apache.hadoop.util.StringUtils;
044
045import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
046import org.apache.hbase.thirdparty.com.google.common.base.Strings;
047
048/**
049 * Runs the CallRunners passed here via {@link #dispatch(CallRunner)}. Subclass and add particular
050 * scheduling behavior.
051 */
052@InterfaceAudience.Private
053public abstract class RpcExecutor {
054  private static final Logger LOG = LoggerFactory.getLogger(RpcExecutor.class);
055
056  protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250;
057  public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "hbase.ipc.server.callqueue.handler.factor";
058
059  /** max delay in msec used to bound the deprioritized requests */
060  public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY = "hbase.ipc.server.queue.max.call.delay";
061
062  /**
063   * The default, 'fifo', has the least friction but is dumb. If set to 'deadline', uses a priority
064   * queue and deprioritizes long-running scans. Sorting by priority comes at a cost, reduced
065   * throughput.
066   */
067  public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
068  public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
069  public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
070  public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
071  public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE;
072
073  // These 3 are only used by Codel executor
074  public static final String CALL_QUEUE_CODEL_TARGET_DELAY = "hbase.ipc.server.callqueue.codel.target.delay";
075  public static final String CALL_QUEUE_CODEL_INTERVAL = "hbase.ipc.server.callqueue.codel.interval";
076  public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD = "hbase.ipc.server.callqueue.codel.lifo.threshold";
077
078  public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 100;
079  public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
080  public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8;
081
082  private LongAdder numGeneralCallsDropped = new LongAdder();
083  private LongAdder numLifoModeSwitches = new LongAdder();
084
085  protected final int numCallQueues;
086  protected final List<BlockingQueue<CallRunner>> queues;
087  private final Class<? extends BlockingQueue> queueClass;
088  private final Object[] queueInitArgs;
089
090  private final PriorityFunction priority;
091
092  protected volatile int currentQueueLimit;
093
094  private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
095  private final List<Handler> handlers;
096  private final int handlerCount;
097  private final AtomicInteger failedHandlerCount = new AtomicInteger(0);
098
099  private String name;
100  private boolean running;
101
102  private Configuration conf = null;
103  private Abortable abortable = null;
104
105  public RpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
106      final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
107    this(name, handlerCount, conf.get(CALL_QUEUE_TYPE_CONF_KEY,
108      CALL_QUEUE_TYPE_CONF_DEFAULT), maxQueueLength, priority, conf, abortable);
109  }
110
111  public RpcExecutor(final String name, final int handlerCount, final String callQueueType,
112      final int maxQueueLength, final PriorityFunction priority, final Configuration conf,
113      final Abortable abortable) {
114    this.name = Strings.nullToEmpty(name);
115    this.conf = conf;
116    this.abortable = abortable;
117
118    float callQueuesHandlersFactor = this.conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.1f);
119    this.numCallQueues = computeNumCallQueues(handlerCount, callQueuesHandlersFactor);
120    this.queues = new ArrayList<>(this.numCallQueues);
121
122    this.handlerCount = Math.max(handlerCount, this.numCallQueues);
123    this.handlers = new ArrayList<>(this.handlerCount);
124
125    this.priority = priority;
126
127    if (isDeadlineQueueType(callQueueType)) {
128      this.name += ".Deadline";
129      this.queueInitArgs = new Object[] { maxQueueLength,
130        new CallPriorityComparator(conf, this.priority) };
131      this.queueClass = BoundedPriorityBlockingQueue.class;
132    } else if (isCodelQueueType(callQueueType)) {
133      this.name += ".Codel";
134      int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
135        CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
136      int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
137      double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
138        CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
139      queueInitArgs = new Object[] { maxQueueLength, codelTargetDelay, codelInterval,
140          codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches };
141      queueClass = AdaptiveLifoCoDelCallQueue.class;
142    } else {
143      this.name += ".Fifo";
144      queueInitArgs = new Object[] { maxQueueLength };
145      queueClass = LinkedBlockingQueue.class;
146    }
147
148    LOG.info("Instantiated {} with queueClass={}; " +
149        "numCallQueues={}, maxQueueLength={}, handlerCount={}",
150        this.name, this.queueClass, this.numCallQueues, maxQueueLength, this.handlerCount);
151  }
152
153  protected int computeNumCallQueues(final int handlerCount, final float callQueuesHandlersFactor) {
154    return Math.max(1, Math.round(handlerCount * callQueuesHandlersFactor));
155  }
156
157  public Map<String, Long> getCallQueueCountsSummary() {
158    HashMap<String, Long> callQueueMethodTotalCount = new HashMap<>();
159
160    for(BlockingQueue<CallRunner> queue: queues) {
161      for (CallRunner cr:queue) {
162        RpcCall rpcCall = cr.getRpcCall();
163
164        String method;
165
166        if (null==rpcCall.getMethod() ||
167             StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) {
168          method = "Unknown";
169        }
170
171        callQueueMethodTotalCount.put(method, 1+callQueueMethodTotalCount.getOrDefault(method, 0L));
172      }
173    }
174
175    return callQueueMethodTotalCount;
176  }
177
178  public Map<String, Long> getCallQueueSizeSummary() {
179    HashMap<String, Long> callQueueMethodTotalSize = new HashMap<>();
180
181    for(BlockingQueue<CallRunner> queue: queues) {
182      for (CallRunner cr:queue) {
183        RpcCall rpcCall = cr.getRpcCall();
184        String method;
185
186        if (null==rpcCall.getMethod() ||
187          StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) {
188          method = "Unknown";
189        }
190
191        long size = rpcCall.getSize();
192
193        callQueueMethodTotalSize.put(method, size+callQueueMethodTotalSize.getOrDefault(method, 0L));
194      }
195    }
196
197    return callQueueMethodTotalSize;
198  }
199
200
201  protected void initializeQueues(final int numQueues) {
202    if (queueInitArgs.length > 0) {
203      currentQueueLimit = (int) queueInitArgs[0];
204      queueInitArgs[0] = Math.max((int) queueInitArgs[0], DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
205    }
206    for (int i = 0; i < numQueues; ++i) {
207      queues.add(ReflectionUtils.newInstance(queueClass, queueInitArgs));
208    }
209  }
210
211  public void start(final int port) {
212    running = true;
213    startHandlers(port);
214  }
215
216  public void stop() {
217    running = false;
218    for (Thread handler : handlers) {
219      handler.interrupt();
220    }
221  }
222
  /**
   * Add the request to the executor queue.
   * @param callTask the call to enqueue
   * @return implementation-defined; NOTE(review): presumably whether the call was accepted by
   *         a queue - confirm against the subclasses
   * @throws InterruptedException declared so that implementations may block while enqueueing
   */
  public abstract boolean dispatch(final CallRunner callTask) throws InterruptedException;
225
226  /** Returns the list of request queues */
227  protected List<BlockingQueue<CallRunner>> getQueues() {
228    return queues;
229  }
230
231  protected void startHandlers(final int port) {
232    List<BlockingQueue<CallRunner>> callQueues = getQueues();
233    startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port, activeHandlerCount);
234  }
235
236  /**
237   * Override if providing alternate Handler implementation.
238   */
239  protected Handler getHandler(final String name, final double handlerFailureThreshhold,
240      final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount) {
241    return new Handler(name, handlerFailureThreshhold, q, activeHandlerCount);
242  }
243
244  /**
245   * Start up our handlers.
246   */
247  protected void startHandlers(final String nameSuffix, final int numHandlers,
248      final List<BlockingQueue<CallRunner>> callQueues, final int qindex, final int qsize,
249      final int port, final AtomicInteger activeHandlerCount) {
250    final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
251    double handlerFailureThreshhold = conf == null ? 1.0 : conf.getDouble(
252      HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
253      HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
254    for (int i = 0; i < numHandlers; i++) {
255      final int index = qindex + (i % qsize);
256      String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue=" + index
257          + ",port=" + port;
258      Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index),
259        activeHandlerCount);
260      handler.start();
261      handlers.add(handler);
262    }
263    LOG.debug("Started handlerCount={} with threadPrefix={}, numCallQueues={}, port={}",
264        handlers.size(), threadPrefix, qsize, port);
265  }
266
267  /**
268   * Handler thread run the {@link CallRunner#run()} in.
269   */
270  protected class Handler extends Thread {
271    /**
272     * Q to find CallRunners to run in.
273     */
274    final BlockingQueue<CallRunner> q;
275
276    final double handlerFailureThreshhold;
277
278    // metrics (shared with other handlers)
279    final AtomicInteger activeHandlerCount;
280
281    Handler(final String name, final double handlerFailureThreshhold,
282        final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount) {
283      super(name);
284      setDaemon(true);
285      this.q = q;
286      this.handlerFailureThreshhold = handlerFailureThreshhold;
287      this.activeHandlerCount = activeHandlerCount;
288    }
289
290    /**
291     * @return A {@link CallRunner}
292     * @throws InterruptedException
293     */
294    protected CallRunner getCallRunner() throws InterruptedException {
295      return this.q.take();
296    }
297
298    @Override
299    public void run() {
300      boolean interrupted = false;
301      try {
302        while (running) {
303          try {
304            run(getCallRunner());
305          } catch (InterruptedException e) {
306            interrupted = true;
307          }
308        }
309      } catch (Exception e) {
310        LOG.warn(e.toString(), e);
311        throw e;
312      } finally {
313        if (interrupted) {
314          Thread.currentThread().interrupt();
315        }
316      }
317    }
318
319    private void run(CallRunner cr) {
320      MonitoredRPCHandler status = RpcServer.getStatus();
321      cr.setStatus(status);
322      try {
323        this.activeHandlerCount.incrementAndGet();
324        cr.run();
325      } catch (Throwable e) {
326        if (e instanceof Error) {
327          int failedCount = failedHandlerCount.incrementAndGet();
328          if (this.handlerFailureThreshhold >= 0
329              && failedCount > handlerCount * this.handlerFailureThreshhold) {
330            String message = "Number of failed RpcServer handler runs exceeded threshhold "
331                + this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e);
332            if (abortable != null) {
333              abortable.abort(message, e);
334            } else {
335              LOG.error("Error but can't abort because abortable is null: "
336                  + StringUtils.stringifyException(e));
337              throw e;
338            }
339          } else {
340            LOG.warn("Handler errors " + StringUtils.stringifyException(e));
341          }
342        } else {
343          LOG.warn("Handler  exception " + StringUtils.stringifyException(e));
344        }
345      } finally {
346        this.activeHandlerCount.decrementAndGet();
347      }
348    }
349  }
350
351  public static abstract class QueueBalancer {
352    /**
353     * @return the index of the next queue to which a request should be inserted
354     */
355    public abstract int getNextQueue();
356  }
357
358  public static QueueBalancer getBalancer(int queueSize) {
359    Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least 1");
360    if (queueSize == 1) {
361      return ONE_QUEUE;
362    } else {
363      return new RandomQueueBalancer(queueSize);
364    }
365  }
366
367  /**
368   * All requests go to the first queue, at index 0
369   */
370  private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
371    @Override
372    public int getNextQueue() {
373      return 0;
374    }
375  };
376
377  /**
378   * Queue balancer that just randomly selects a queue in the range [0, num queues).
379   */
380  private static class RandomQueueBalancer extends QueueBalancer {
381    private final int queueSize;
382
383    public RandomQueueBalancer(int queueSize) {
384      this.queueSize = queueSize;
385    }
386
387    @Override
388    public int getNextQueue() {
389      return ThreadLocalRandom.current().nextInt(queueSize);
390    }
391  }
392
393  /**
394   * Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true. It
395   * uses the calculated "deadline" e.g. to deprioritize long-running job If multiple requests have
396   * the same deadline BoundedPriorityBlockingQueue will order them in FIFO (first-in-first-out)
397   * manner.
398   */
399  private static class CallPriorityComparator implements Comparator<CallRunner> {
400    private final static int DEFAULT_MAX_CALL_DELAY = 5000;
401
402    private final PriorityFunction priority;
403    private final int maxDelay;
404
405    public CallPriorityComparator(final Configuration conf, final PriorityFunction priority) {
406      this.priority = priority;
407      this.maxDelay = conf.getInt(QUEUE_MAX_CALL_DELAY_CONF_KEY, DEFAULT_MAX_CALL_DELAY);
408    }
409
410    @Override
411    public int compare(CallRunner a, CallRunner b) {
412      RpcCall callA = a.getRpcCall();
413      RpcCall callB = b.getRpcCall();
414      long deadlineA = priority.getDeadline(callA.getHeader(), callA.getParam());
415      long deadlineB = priority.getDeadline(callB.getHeader(), callB.getParam());
416      deadlineA = callA.getReceiveTime() + Math.min(deadlineA, maxDelay);
417      deadlineB = callB.getReceiveTime() + Math.min(deadlineB, maxDelay);
418      return Long.compare(deadlineA, deadlineB);
419    }
420  }
421
422  public static boolean isDeadlineQueueType(final String callQueueType) {
423    return callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
424  }
425
426  public static boolean isCodelQueueType(final String callQueueType) {
427    return callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
428  }
429
430  public static boolean isFifoQueueType(final String callQueueType) {
431    return callQueueType.equals(CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
432  }
433
434  public long getNumGeneralCallsDropped() {
435    return numGeneralCallsDropped.longValue();
436  }
437
438  public long getNumLifoModeSwitches() {
439    return numLifoModeSwitches.longValue();
440  }
441
442  public int getActiveHandlerCount() {
443    return activeHandlerCount.get();
444  }
445
  /**
   * @return always 0 in this base class. NOTE(review): presumably overridden by executors
   *         that keep separate write handlers - confirm against the subclasses
   */
  public int getActiveWriteHandlerCount() {
    return 0;
  }
449
  /**
   * @return always 0 in this base class. NOTE(review): presumably overridden by executors
   *         that keep separate read handlers - confirm against the subclasses
   */
  public int getActiveReadHandlerCount() {
    return 0;
  }
453
  /**
   * @return always 0 in this base class. NOTE(review): presumably overridden by executors
   *         that keep separate scan handlers - confirm against the subclasses
   */
  public int getActiveScanHandlerCount() {
    return 0;
  }
457
458  /** Returns the length of the pending queue */
459  public int getQueueLength() {
460    int length = 0;
461    for (final BlockingQueue<CallRunner> queue: queues) {
462      length += queue.size();
463    }
464    return length;
465  }
466
  /**
   * @return always 0 in this base class. NOTE(review): presumably overridden by executors
   *         that keep separate read queues - confirm against the subclasses
   */
  public int getReadQueueLength() {
    return 0;
  }
470
  /**
   * @return always 0 in this base class. NOTE(review): presumably overridden by executors
   *         that keep separate scan queues - confirm against the subclasses
   */
  public int getScanQueueLength() {
    return 0;
  }
474
  /**
   * @return always 0 in this base class. NOTE(review): presumably overridden by executors
   *         that keep separate write queues - confirm against the subclasses
   */
  public int getWriteQueueLength() {
    return 0;
  }
478
479  public String getName() {
480    return this.name;
481  }
482
483  /**
484   * Update current soft limit for executor's call queues
485   * @param conf updated configuration
486   */
487  public void resizeQueues(Configuration conf) {
488    String configKey = RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH;
489    if (name != null && name.toLowerCase(Locale.ROOT).contains("priority")) {
490      configKey = RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH;
491    }
492    currentQueueLimit = conf.getInt(configKey, currentQueueLimit);
493  }
494
495  public void onConfigurationChange(Configuration conf) {
496    // update CoDel Scheduler tunables
497    int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
498      CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
499    int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
500    double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
501      CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
502
503    for (BlockingQueue<CallRunner> queue : queues) {
504      if (queue instanceof AdaptiveLifoCoDelCallQueue) {
505        ((AdaptiveLifoCoDelCallQueue) queue).updateTunables(codelTargetDelay, codelInterval,
506          codelLifoThreshold);
507      }
508    }
509  }
510}