001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.ipc;
020
021import java.util.ArrayList;
022import java.util.Comparator;
023import java.util.List;
024import java.util.Locale;
025import java.util.concurrent.BlockingQueue;
026import java.util.concurrent.LinkedBlockingQueue;
027import java.util.concurrent.ThreadLocalRandom;
028import java.util.concurrent.atomic.AtomicInteger;
029import java.util.concurrent.atomic.LongAdder;
030import java.util.Map;
031import java.util.HashMap;
032
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.Abortable;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
041import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
042import org.apache.hadoop.hbase.util.ReflectionUtils;
043import org.apache.hadoop.util.StringUtils;
044
045import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
046import org.apache.hbase.thirdparty.com.google.common.base.Strings;
047
048/**
049 * Runs the CallRunners passed here via {@link #dispatch(CallRunner)}. Subclass and add particular
050 * scheduling behavior.
051 */
052@InterfaceAudience.Private
053public abstract class RpcExecutor {
054  private static final Logger LOG = LoggerFactory.getLogger(RpcExecutor.class);
055
056  protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250;
057  public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "hbase.ipc.server.callqueue.handler.factor";
058
059  /** max delay in msec used to bound the deprioritized requests */
060  public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY = "hbase.ipc.server.queue.max.call.delay";
061
062  /**
063   * The default, 'fifo', has the least friction but is dumb. If set to 'deadline', uses a priority
064   * queue and deprioritizes long-running scans. Sorting by priority comes at a cost, reduced
065   * throughput.
066   */
067  public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
068  public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
069  public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
070  public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
071  public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE;
072
073  // These 3 are only used by Codel executor
074  public static final String CALL_QUEUE_CODEL_TARGET_DELAY = "hbase.ipc.server.callqueue.codel.target.delay";
075  public static final String CALL_QUEUE_CODEL_INTERVAL = "hbase.ipc.server.callqueue.codel.interval";
076  public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD = "hbase.ipc.server.callqueue.codel.lifo.threshold";
077
078  public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 100;
079  public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
080  public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8;
081
082  private LongAdder numGeneralCallsDropped = new LongAdder();
083  private LongAdder numLifoModeSwitches = new LongAdder();
084
085  protected final int numCallQueues;
086  protected final List<BlockingQueue<CallRunner>> queues;
087  private final Class<? extends BlockingQueue> queueClass;
088  private final Object[] queueInitArgs;
089
090  private final PriorityFunction priority;
091
092  protected volatile int currentQueueLimit;
093
094  private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
095  private final List<Handler> handlers;
096  private final int handlerCount;
097  private final AtomicInteger failedHandlerCount = new AtomicInteger(0);
098
099  private String name;
100  private boolean running;
101
102  private Configuration conf = null;
103  private Abortable abortable = null;
104
105  public RpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
106      final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
107    this(name, handlerCount, conf.get(CALL_QUEUE_TYPE_CONF_KEY,
108      CALL_QUEUE_TYPE_CONF_DEFAULT), maxQueueLength, priority, conf, abortable);
109  }
110
111  public RpcExecutor(final String name, final int handlerCount, final String callQueueType,
112      final int maxQueueLength, final PriorityFunction priority, final Configuration conf,
113      final Abortable abortable) {
114    this.name = Strings.nullToEmpty(name);
115    this.conf = conf;
116    this.abortable = abortable;
117
118    float callQueuesHandlersFactor = this.conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.1f);
119    if (Float.compare(callQueuesHandlersFactor, 1.0f) > 0 ||
120        Float.compare(0.0f, callQueuesHandlersFactor) > 0) {
121      LOG.warn(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY +
122        " is *ILLEGAL*, it should be in range [0.0, 1.0]");
123      // For callQueuesHandlersFactor > 1.0, we just set it 1.0f.
124      if (Float.compare(callQueuesHandlersFactor, 1.0f) > 0) {
125        LOG.warn("Set " + CALL_QUEUE_HANDLER_FACTOR_CONF_KEY + " 1.0f");
126        callQueuesHandlersFactor = 1.0f;
127      } else {
128        // But for callQueuesHandlersFactor < 0.0, following method #computeNumCallQueues
129        // will compute max(1, -x) => 1 which has same effect of default value.
130        LOG.warn("Set " + CALL_QUEUE_HANDLER_FACTOR_CONF_KEY + " default value 0.0f");
131      }
132    }
133    this.numCallQueues = computeNumCallQueues(handlerCount, callQueuesHandlersFactor);
134    this.queues = new ArrayList<>(this.numCallQueues);
135
136    this.handlerCount = Math.max(handlerCount, this.numCallQueues);
137    this.handlers = new ArrayList<>(this.handlerCount);
138
139    this.priority = priority;
140
141    if (isDeadlineQueueType(callQueueType)) {
142      this.name += ".Deadline";
143      this.queueInitArgs = new Object[] { maxQueueLength,
144        new CallPriorityComparator(conf, this.priority) };
145      this.queueClass = BoundedPriorityBlockingQueue.class;
146    } else if (isCodelQueueType(callQueueType)) {
147      this.name += ".Codel";
148      int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
149        CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
150      int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
151      double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
152        CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
153      queueInitArgs = new Object[] { maxQueueLength, codelTargetDelay, codelInterval,
154          codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches };
155      queueClass = AdaptiveLifoCoDelCallQueue.class;
156    } else {
157      this.name += ".Fifo";
158      queueInitArgs = new Object[] { maxQueueLength };
159      queueClass = LinkedBlockingQueue.class;
160    }
161
162    LOG.info("Instantiated {} with queueClass={}; " +
163        "numCallQueues={}, maxQueueLength={}, handlerCount={}",
164        this.name, this.queueClass, this.numCallQueues, maxQueueLength, this.handlerCount);
165  }
166
167  protected int computeNumCallQueues(final int handlerCount, final float callQueuesHandlersFactor) {
168    return Math.max(1, Math.round(handlerCount * callQueuesHandlersFactor));
169  }
170
171  public Map<String, Long> getCallQueueCountsSummary() {
172    HashMap<String, Long> callQueueMethodTotalCount = new HashMap<>();
173
174    for(BlockingQueue<CallRunner> queue: queues) {
175      for (CallRunner cr:queue) {
176        RpcCall rpcCall = cr.getRpcCall();
177
178        String method;
179
180        if (null==rpcCall.getMethod() ||
181             StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) {
182          method = "Unknown";
183        }
184
185        callQueueMethodTotalCount.put(method, 1+callQueueMethodTotalCount.getOrDefault(method, 0L));
186      }
187    }
188
189    return callQueueMethodTotalCount;
190  }
191
192  public Map<String, Long> getCallQueueSizeSummary() {
193    HashMap<String, Long> callQueueMethodTotalSize = new HashMap<>();
194
195    for(BlockingQueue<CallRunner> queue: queues) {
196      for (CallRunner cr:queue) {
197        RpcCall rpcCall = cr.getRpcCall();
198        String method;
199
200        if (null==rpcCall.getMethod() ||
201          StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) {
202          method = "Unknown";
203        }
204
205        long size = rpcCall.getSize();
206
207        callQueueMethodTotalSize.put(method, size+callQueueMethodTotalSize.getOrDefault(method, 0L));
208      }
209    }
210
211    return callQueueMethodTotalSize;
212  }
213
214
215  protected void initializeQueues(final int numQueues) {
216    if (queueInitArgs.length > 0) {
217      currentQueueLimit = (int) queueInitArgs[0];
218      queueInitArgs[0] = Math.max((int) queueInitArgs[0], DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
219    }
220    for (int i = 0; i < numQueues; ++i) {
221      queues.add(ReflectionUtils.newInstance(queueClass, queueInitArgs));
222    }
223  }
224
225  public void start(final int port) {
226    running = true;
227    startHandlers(port);
228  }
229
230  public void stop() {
231    running = false;
232    for (Thread handler : handlers) {
233      handler.interrupt();
234    }
235  }
236
237  /** Add the request to the executor queue */
238  public abstract boolean dispatch(final CallRunner callTask) throws InterruptedException;
239
240  /** Returns the list of request queues */
241  protected List<BlockingQueue<CallRunner>> getQueues() {
242    return queues;
243  }
244
245  protected void startHandlers(final int port) {
246    List<BlockingQueue<CallRunner>> callQueues = getQueues();
247    startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port, activeHandlerCount);
248  }
249
250  /**
251   * Override if providing alternate Handler implementation.
252   */
253  protected Handler getHandler(final String name, final double handlerFailureThreshhold,
254      final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount) {
255    return new Handler(name, handlerFailureThreshhold, q, activeHandlerCount);
256  }
257
258  /**
259   * Start up our handlers.
260   */
261  protected void startHandlers(final String nameSuffix, final int numHandlers,
262      final List<BlockingQueue<CallRunner>> callQueues, final int qindex, final int qsize,
263      final int port, final AtomicInteger activeHandlerCount) {
264    final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
265    double handlerFailureThreshhold = conf == null ? 1.0 : conf.getDouble(
266      HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
267      HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
268    for (int i = 0; i < numHandlers; i++) {
269      final int index = qindex + (i % qsize);
270      String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue=" + index
271          + ",port=" + port;
272      Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index),
273        activeHandlerCount);
274      handler.start();
275      handlers.add(handler);
276    }
277    LOG.debug("Started handlerCount={} with threadPrefix={}, numCallQueues={}, port={}",
278        handlers.size(), threadPrefix, qsize, port);
279  }
280
281  /**
282   * Handler thread run the {@link CallRunner#run()} in.
283   */
284  protected class Handler extends Thread {
285    /**
286     * Q to find CallRunners to run in.
287     */
288    final BlockingQueue<CallRunner> q;
289
290    final double handlerFailureThreshhold;
291
292    // metrics (shared with other handlers)
293    final AtomicInteger activeHandlerCount;
294
295    Handler(final String name, final double handlerFailureThreshhold,
296        final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount) {
297      super(name);
298      setDaemon(true);
299      this.q = q;
300      this.handlerFailureThreshhold = handlerFailureThreshhold;
301      this.activeHandlerCount = activeHandlerCount;
302    }
303
304    /**
305     * @return A {@link CallRunner}
306     * @throws InterruptedException
307     */
308    protected CallRunner getCallRunner() throws InterruptedException {
309      return this.q.take();
310    }
311
312    @Override
313    public void run() {
314      boolean interrupted = false;
315      try {
316        while (running) {
317          try {
318            run(getCallRunner());
319          } catch (InterruptedException e) {
320            interrupted = true;
321          }
322        }
323      } catch (Exception e) {
324        LOG.warn(e.toString(), e);
325        throw e;
326      } finally {
327        if (interrupted) {
328          Thread.currentThread().interrupt();
329        }
330      }
331    }
332
333    private void run(CallRunner cr) {
334      MonitoredRPCHandler status = RpcServer.getStatus();
335      cr.setStatus(status);
336      try {
337        this.activeHandlerCount.incrementAndGet();
338        cr.run();
339      } catch (Throwable e) {
340        if (e instanceof Error) {
341          int failedCount = failedHandlerCount.incrementAndGet();
342          if (this.handlerFailureThreshhold >= 0
343              && failedCount > handlerCount * this.handlerFailureThreshhold) {
344            String message = "Number of failed RpcServer handler runs exceeded threshhold "
345                + this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e);
346            if (abortable != null) {
347              abortable.abort(message, e);
348            } else {
349              LOG.error("Error but can't abort because abortable is null: "
350                  + StringUtils.stringifyException(e));
351              throw e;
352            }
353          } else {
354            LOG.warn("Handler errors " + StringUtils.stringifyException(e));
355          }
356        } else {
357          LOG.warn("Handler  exception " + StringUtils.stringifyException(e));
358        }
359      } finally {
360        this.activeHandlerCount.decrementAndGet();
361      }
362    }
363  }
364
365  public static abstract class QueueBalancer {
366    /**
367     * @return the index of the next queue to which a request should be inserted
368     */
369    public abstract int getNextQueue();
370  }
371
372  public static QueueBalancer getBalancer(int queueSize) {
373    Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least 1");
374    if (queueSize == 1) {
375      return ONE_QUEUE;
376    } else {
377      return new RandomQueueBalancer(queueSize);
378    }
379  }
380
381  /**
382   * All requests go to the first queue, at index 0
383   */
384  private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
385    @Override
386    public int getNextQueue() {
387      return 0;
388    }
389  };
390
391  /**
392   * Queue balancer that just randomly selects a queue in the range [0, num queues).
393   */
394  private static class RandomQueueBalancer extends QueueBalancer {
395    private final int queueSize;
396
397    public RandomQueueBalancer(int queueSize) {
398      this.queueSize = queueSize;
399    }
400
401    @Override
402    public int getNextQueue() {
403      return ThreadLocalRandom.current().nextInt(queueSize);
404    }
405  }
406
407  /**
408   * Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true. It
409   * uses the calculated "deadline" e.g. to deprioritize long-running job If multiple requests have
410   * the same deadline BoundedPriorityBlockingQueue will order them in FIFO (first-in-first-out)
411   * manner.
412   */
413  private static class CallPriorityComparator implements Comparator<CallRunner> {
414    private final static int DEFAULT_MAX_CALL_DELAY = 5000;
415
416    private final PriorityFunction priority;
417    private final int maxDelay;
418
419    public CallPriorityComparator(final Configuration conf, final PriorityFunction priority) {
420      this.priority = priority;
421      this.maxDelay = conf.getInt(QUEUE_MAX_CALL_DELAY_CONF_KEY, DEFAULT_MAX_CALL_DELAY);
422    }
423
424    @Override
425    public int compare(CallRunner a, CallRunner b) {
426      RpcCall callA = a.getRpcCall();
427      RpcCall callB = b.getRpcCall();
428      long deadlineA = priority.getDeadline(callA.getHeader(), callA.getParam());
429      long deadlineB = priority.getDeadline(callB.getHeader(), callB.getParam());
430      deadlineA = callA.getReceiveTime() + Math.min(deadlineA, maxDelay);
431      deadlineB = callB.getReceiveTime() + Math.min(deadlineB, maxDelay);
432      return Long.compare(deadlineA, deadlineB);
433    }
434  }
435
436  public static boolean isDeadlineQueueType(final String callQueueType) {
437    return callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
438  }
439
440  public static boolean isCodelQueueType(final String callQueueType) {
441    return callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
442  }
443
444  public static boolean isFifoQueueType(final String callQueueType) {
445    return callQueueType.equals(CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
446  }
447
448  public long getNumGeneralCallsDropped() {
449    return numGeneralCallsDropped.longValue();
450  }
451
452  public long getNumLifoModeSwitches() {
453    return numLifoModeSwitches.longValue();
454  }
455
456  public int getActiveHandlerCount() {
457    return activeHandlerCount.get();
458  }
459
460  public int getActiveWriteHandlerCount() {
461    return 0;
462  }
463
464  public int getActiveReadHandlerCount() {
465    return 0;
466  }
467
468  public int getActiveScanHandlerCount() {
469    return 0;
470  }
471
472  /** Returns the length of the pending queue */
473  public int getQueueLength() {
474    int length = 0;
475    for (final BlockingQueue<CallRunner> queue: queues) {
476      length += queue.size();
477    }
478    return length;
479  }
480
481  public int getReadQueueLength() {
482    return 0;
483  }
484
485  public int getScanQueueLength() {
486    return 0;
487  }
488
489  public int getWriteQueueLength() {
490    return 0;
491  }
492
493  public String getName() {
494    return this.name;
495  }
496
497  /**
498   * Update current soft limit for executor's call queues
499   * @param conf updated configuration
500   */
501  public void resizeQueues(Configuration conf) {
502    String configKey = RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH;
503    if (name != null && name.toLowerCase(Locale.ROOT).contains("priority")) {
504      configKey = RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH;
505    }
506    currentQueueLimit = conf.getInt(configKey, currentQueueLimit);
507  }
508
509  public void onConfigurationChange(Configuration conf) {
510    // update CoDel Scheduler tunables
511    int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
512      CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
513    int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
514    double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
515      CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
516
517    for (BlockingQueue<CallRunner> queue : queues) {
518      if (queue instanceof AdaptiveLifoCoDelCallQueue) {
519        ((AdaptiveLifoCoDelCallQueue) queue).updateTunables(codelTargetDelay, codelInterval,
520          codelLifoThreshold);
521      }
522    }
523  }
524}