/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * A balanced queue executor that adds a fast path. Since the underlying queues are FIFO and make
 * no ordering promises beyond that, a Call does not have to be queued when a Handler is already
 * waiting for work: the Call is passed directly to that Handler thread instead. The intent is to
 * keep the hot Handlers busy rather than letting them go cold and lose context. The idea is taken
 * from Apache Kudu (incubating); see
 * https://gerrit.cloudera.org/#/c/2938/7/src/kudu/rpc/service_queue.h
 */
@InterfaceAudience.Private
public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
  // NOTE: this relies on the default behavior of BalancedQueueRpcExecutor being FIFO!

  /**
   * Handlers that are currently waiting for work. This same instance is given to every
   * {@link FastPathRpcHandler} created by {@link #getHandler}.
   */
  private final Deque<FastPathRpcHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();

  /**
   * Creates the executor; all arguments are forwarded unchanged to the superclass constructor.
   */
  public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount,
    final int maxQueueLength, final PriorityFunction priority, final Configuration conf,
    final Abortable abortable) {
    super(name, handlerCount, maxQueueLength, priority, conf, abortable);
  }

  /**
   * Creates the executor with an explicit call queue type; all arguments are forwarded unchanged
   * to the superclass constructor.
   */
  public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount,
    final String callQueueType, final int maxQueueLength, final PriorityFunction priority,
    final Configuration conf, final Abortable abortable) {
    super(name, handlerCount, callQueueType, maxQueueLength, priority, conf, abortable);
  }

  /**
   * Builds a {@link FastPathRpcHandler} wired to this executor's stack of waiting Handlers.
   * @return a new fast-path handler constructed from the supplied arguments
   */
  @Override
  protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold,
    final int handlerCount, final BlockingQueue<CallRunner> q,
    final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount,
    final Abortable abortable) {
    return new FastPathRpcHandler(name, handlerFailureThreshhold, handlerCount, q,
      activeHandlerCount, failedHandlerCount, abortable, fastPathHandlerStack);
  }

  /**
   * Dispatches the call, preferring a Handler that is already waiting over the regular queues.
   * @param callTask the call to run
   * @return {@code false} if the current queue limit is zero; otherwise the result of loading the
   *         call into a waiting Handler or, when none is waiting, of the superclass dispatch
   */
  @Override
  public boolean dispatch(CallRunner callTask) {
    // The fast-path Handlers do not look at queue limits. A limit of zero means we are completely
    // shut down, so the call must be refused before any Handler gets a chance to take it.
    if (currentQueueLimit == 0) {
      return false;
    }
    FastPathRpcHandler waitingHandler = popReadyHandler();
    if (waitingHandler == null) {
      // Nobody is idle; go through the normal queuing path.
      return super.dispatch(callTask);
    }
    return waitingHandler.loadCallRunner(callTask);
  }

  /**
   * Removes and returns the Handler at the head of the waiting stack.
   * @return a ready-to-go Handler, or {@code null} if none is currently waiting
   */
  private FastPathRpcHandler popReadyHandler() {
    return fastPathHandlerStack.pollFirst();
  }
}