/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

/**
 * RPC executor that layers the fast-path feature (the one also found in
 * {@link FastPathBalancedQueueRpcExecutor}) on top of {@link RWQueueRpcExecutor}.
 * <p>
 * One deque of {@link FastPathRpcHandler}s is kept per request category (read, write, scan).
 * {@link #dispatch(CallRunner)} first tries to take a handler from the deque matching the request
 * and give it the call directly; only when that deque is empty does the call go through the
 * inherited {@code dispatchTo(...)}.
 */
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
@InterfaceStability.Evolving
public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor {

  // One deque per request category. Each deque is handed to the handlers created for that
  // category in getHandler() and polled in dispatch().
  // NOTE(review): presumably a handler puts itself on its deque while it has no work -- confirm
  // in FastPathRpcHandler.
  private final Deque<FastPathRpcHandler> readHandlerStack = new ConcurrentLinkedDeque<>();
  private final Deque<FastPathRpcHandler> writeHandlerStack = new ConcurrentLinkedDeque<>();
  private final Deque<FastPathRpcHandler> scanHandlerStack = new ConcurrentLinkedDeque<>();

  /**
   * Creates the executor; every argument is passed unchanged to the {@link RWQueueRpcExecutor}
   * constructor.
   */
  public FastPathRWQueueRpcExecutor(String name, int handlerCount, int maxQueueLength,
    PriorityFunction priority, Configuration conf, Abortable abortable) {
    super(name, handlerCount, maxQueueLength, priority, conf, abortable);
  }

  /**
   * Builds a {@link FastPathRpcHandler} bound to the deque that corresponds to the handler's name.
   * <p>
   * The deque is selected by substring: a name containing {@code "read"} gets the read deque,
   * otherwise a name containing {@code "write"} gets the write deque, and any other name gets the
   * scan deque. All remaining arguments are forwarded to the handler constructor as given.
   * @param name handler name, also used to select the deque
   * @return the newly created fast-path handler
   */
  @Override
  protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold,
    final int handlerCount, final BlockingQueue<CallRunner> q,
    final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount,
    final Abortable abortable) {
    // NOTE(review): contains() matches anywhere in the name, and "read" is tested first, so a
    // name that merely includes e.g. "thread" would also land on the read deque -- verify against
    // the handler naming scheme used by the caller.
    final Deque<FastPathRpcHandler> categoryStack;
    if (name.contains("read")) {
      categoryStack = readHandlerStack;
    } else if (name.contains("write")) {
      categoryStack = writeHandlerStack;
    } else {
      categoryStack = scanHandlerStack;
    }
    return new FastPathRpcHandler(name, handlerFailureThreshhold, handlerCount, q,
      activeHandlerCount, failedHandlerCount, abortable, categoryStack);
  }

  /**
   * Dispatches a call, preferring a handler taken straight from the matching deque.
   * <p>
   * The call is classified as write, scan or (by default) read. If the deque for that category
   * yields a handler, the call is loaded into it and the result of
   * {@code loadCallRunner(callTask)} is returned. Otherwise the result of the inherited
   * {@code dispatchTo(...)} is returned.
   * @param callTask the call to dispatch
   * @return whatever the chosen path ({@code loadCallRunner} or {@code dispatchTo}) returns
   */
  @Override
  public boolean dispatch(final CallRunner callTask) {
    final RpcCall rpcCall = callTask.getRpcCall();
    // Both classifications are computed up front; dispatchTo() needs both flags on the fallback.
    final boolean toWriteQueue = isWriteRequest(rpcCall.getHeader(), rpcCall.getParam());
    final boolean toScanQueue = shouldDispatchToScanQueue(callTask);

    // Write takes precedence over scan; anything else is treated as a read.
    final FastPathRpcHandler available;
    if (toWriteQueue) {
      available = writeHandlerStack.poll();
    } else if (toScanQueue) {
      available = scanHandlerStack.poll();
    } else {
      available = readHandlerStack.poll();
    }

    // poll() yields null when the deque is empty: no fast path, use the inherited dispatch.
    if (available == null) {
      return dispatchTo(toWriteQueue, toScanQueue, callTask);
    }
    return available.loadCallRunner(callTask);
  }
}