001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.util.Deque; 021import java.util.concurrent.BlockingQueue; 022import java.util.concurrent.ConcurrentLinkedDeque; 023import java.util.concurrent.Semaphore; 024import java.util.concurrent.atomic.AtomicInteger; 025 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.Abortable; 028import org.apache.yetus.audience.InterfaceAudience; 029 030/** 031 * Balanced queue executor with a fastpath. Because this is FIFO, it has no respect for 032 * ordering so a fast path skipping the queuing of Calls if an Handler is available, is possible. 033 * Just pass the Call direct to waiting Handler thread. Try to keep the hot Handlers bubbling 034 * rather than let them go cold and lose context. Idea taken from Apace Kudu (incubating). See 035 * https://gerrit.cloudera.org/#/c/2938/7/src/kudu/rpc/service_queue.h 036 */ 037@InterfaceAudience.Private 038public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor { 039 // Depends on default behavior of BalancedQueueRpcExecutor being FIFO! 040 041 /* 042 * Stack of Handlers waiting for work. 043 */ 044 private final Deque<FastPathHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>(); 045 046 public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount, 047 final int maxQueueLength, final PriorityFunction priority, final Configuration conf, 048 final Abortable abortable) { 049 super(name, handlerCount, maxQueueLength, priority, conf, abortable); 050 } 051 052 public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount, 053 final String callQueueType, final int maxQueueLength, final PriorityFunction priority, 054 final Configuration conf, final Abortable abortable) { 055 super(name, handlerCount, callQueueType, maxQueueLength, priority, conf, abortable); 056 } 057 058 @Override 059 protected Handler getHandler(String name, double handlerFailureThreshhold, 060 BlockingQueue<CallRunner> q, AtomicInteger activeHandlerCount) { 061 return new FastPathHandler(name, handlerFailureThreshhold, q, activeHandlerCount, 062 fastPathHandlerStack); 063 } 064 065 @Override 066 public boolean dispatch(CallRunner callTask) throws InterruptedException { 067 //FastPathHandlers don't check queue limits, so if we're completely shut down 068 //we have to prevent ourselves from using the handler in the first place 069 if (currentQueueLimit == 0){ 070 return false; 071 } 072 FastPathHandler handler = popReadyHandler(); 073 return handler != null? handler.loadCallRunner(callTask): super.dispatch(callTask); 074 } 075 076 /** 077 * @return Pop a Handler instance if one available ready-to-go or else return null. 078 */ 079 private FastPathHandler popReadyHandler() { 080 return this.fastPathHandlerStack.poll(); 081 } 082 083 class FastPathHandler extends Handler { 084 // Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque 085 // if an empty queue of CallRunners so we are available for direct handoff when one comes in. 086 final Deque<FastPathHandler> fastPathHandlerStack; 087 // Semaphore to coordinate loading of fastpathed loadedTask and our running it. 088 // UNFAIR synchronization. 089 private Semaphore semaphore = new Semaphore(0); 090 // The task we get when fast-pathing. 091 private CallRunner loadedCallRunner; 092 093 FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue<CallRunner> q, 094 final AtomicInteger activeHandlerCount, 095 final Deque<FastPathHandler> fastPathHandlerStack) { 096 super(name, handlerFailureThreshhold, q, activeHandlerCount); 097 this.fastPathHandlerStack = fastPathHandlerStack; 098 } 099 100 @Override 101 protected CallRunner getCallRunner() throws InterruptedException { 102 // Get a callrunner if one in the Q. 103 CallRunner cr = this.q.poll(); 104 if (cr == null) { 105 // Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for 106 // the fastpath handoff done via fastPathHandlerStack. 107 if (this.fastPathHandlerStack != null) { 108 this.fastPathHandlerStack.push(this); 109 this.semaphore.acquire(); 110 cr = this.loadedCallRunner; 111 this.loadedCallRunner = null; 112 } else { 113 // No fastpath available. Block until a task comes available. 114 cr = super.getCallRunner(); 115 } 116 } 117 return cr; 118 } 119 120 /** 121 * @param cr Task gotten via fastpath. 122 * @return True if we successfully loaded our task 123 */ 124 boolean loadCallRunner(final CallRunner cr) { 125 this.loadedCallRunner = cr; 126 this.semaphore.release(); 127 return true; 128 } 129 } 130}