001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.util.Deque;
021import java.util.concurrent.BlockingQueue;
022import java.util.concurrent.ConcurrentLinkedDeque;
023import java.util.concurrent.atomic.AtomicInteger;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.Abortable;
026import org.apache.hadoop.hbase.HBaseInterfaceAudience;
027import org.apache.yetus.audience.InterfaceAudience;
028import org.apache.yetus.audience.InterfaceStability;
029
030/**
031 * RPC Executor that extends {@link RWQueueRpcExecutor} with fast-path feature, used in
032 * {@link FastPathBalancedQueueRpcExecutor}.
033 */
034@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
035@InterfaceStability.Evolving
036public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor {
037
038  private final Deque<FastPathRpcHandler> readHandlerStack = new ConcurrentLinkedDeque<>();
039  private final Deque<FastPathRpcHandler> writeHandlerStack = new ConcurrentLinkedDeque<>();
040  private final Deque<FastPathRpcHandler> scanHandlerStack = new ConcurrentLinkedDeque<>();
041
042  public FastPathRWQueueRpcExecutor(String name, int handlerCount, int maxQueueLength,
043    PriorityFunction priority, Configuration conf, Abortable abortable) {
044    super(name, handlerCount, maxQueueLength, priority, conf, abortable);
045  }
046
047  @Override
048  protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold,
049    final int handlerCount, final BlockingQueue<CallRunner> q,
050    final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount,
051    final Abortable abortable) {
052    Deque<FastPathRpcHandler> handlerStack = name.contains("read") ? readHandlerStack
053      : name.contains("write") ? writeHandlerStack
054      : scanHandlerStack;
055    return new FastPathRpcHandler(name, handlerFailureThreshhold, handlerCount, q,
056      activeHandlerCount, failedHandlerCount, abortable, handlerStack);
057  }
058
059  @Override
060  public boolean dispatch(final CallRunner callTask) {
061    RpcCall call = callTask.getRpcCall();
062    boolean shouldDispatchToWriteQueue = isWriteRequest(call.getHeader(), call.getParam());
063    boolean shouldDispatchToScanQueue = shouldDispatchToScanQueue(callTask);
064    FastPathRpcHandler handler = shouldDispatchToWriteQueue ? writeHandlerStack.poll()
065      : shouldDispatchToScanQueue ? scanHandlerStack.poll()
066      : readHandlerStack.poll();
067    return handler != null
068      ? handler.loadCallRunner(callTask)
069      : dispatchTo(shouldDispatchToWriteQueue, shouldDispatchToScanQueue, callTask);
070  }
071}