001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.util.Deque; 021import java.util.concurrent.BlockingQueue; 022import java.util.concurrent.ConcurrentLinkedDeque; 023import java.util.concurrent.Semaphore; 024import java.util.concurrent.atomic.AtomicInteger; 025 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.Abortable; 028import org.apache.yetus.audience.InterfaceAudience; 029 030/** 031 * Balanced queue executor with a fastpath. Because this is FIFO, it has no respect for 032 * ordering so a fast path skipping the queuing of Calls if an Handler is available, is possible. 033 * Just pass the Call direct to waiting Handler thread. Try to keep the hot Handlers bubbling 034 * rather than let them go cold and lose context. Idea taken from Apace Kudu (incubating). See 035 * https://gerrit.cloudera.org/#/c/2938/7/src/kudu/rpc/service_queue.h 036 */ 037@InterfaceAudience.Private 038public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor { 039 // Depends on default behavior of BalancedQueueRpcExecutor being FIFO! 040 041 /* 042 * Stack of Handlers waiting for work. 043 */ 044 private final Deque<FastPathHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>(); 045 046 public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount, 047 final int maxQueueLength, final PriorityFunction priority, final Configuration conf, 048 final Abortable abortable) { 049 super(name, handlerCount, maxQueueLength, priority, conf, abortable); 050 } 051 052 public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount, 053 final String callQueueType, final int maxQueueLength, final PriorityFunction priority, 054 final Configuration conf, final Abortable abortable) { 055 super(name, handlerCount, callQueueType, maxQueueLength, priority, conf, abortable); 056 } 057 058 @Override 059 protected Handler getHandler(String name, double handlerFailureThreshhold, 060 BlockingQueue<CallRunner> q, AtomicInteger activeHandlerCount) { 061 return new FastPathHandler(name, handlerFailureThreshhold, q, activeHandlerCount, 062 fastPathHandlerStack); 063 } 064 065 @Override 066 public boolean dispatch(CallRunner callTask) throws InterruptedException { 067 FastPathHandler handler = popReadyHandler(); 068 return handler != null? handler.loadCallRunner(callTask): super.dispatch(callTask); 069 } 070 071 /** 072 * @return Pop a Handler instance if one available ready-to-go or else return null. 073 */ 074 private FastPathHandler popReadyHandler() { 075 return this.fastPathHandlerStack.poll(); 076 } 077 078 class FastPathHandler extends Handler { 079 // Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque 080 // if an empty queue of CallRunners so we are available for direct handoff when one comes in. 081 final Deque<FastPathHandler> fastPathHandlerStack; 082 // Semaphore to coordinate loading of fastpathed loadedTask and our running it. 083 // UNFAIR synchronization. 084 private Semaphore semaphore = new Semaphore(0); 085 // The task we get when fast-pathing. 086 private CallRunner loadedCallRunner; 087 088 FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue<CallRunner> q, 089 final AtomicInteger activeHandlerCount, 090 final Deque<FastPathHandler> fastPathHandlerStack) { 091 super(name, handlerFailureThreshhold, q, activeHandlerCount); 092 this.fastPathHandlerStack = fastPathHandlerStack; 093 } 094 095 @Override 096 protected CallRunner getCallRunner() throws InterruptedException { 097 // Get a callrunner if one in the Q. 098 CallRunner cr = this.q.poll(); 099 if (cr == null) { 100 // Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for 101 // the fastpath handoff done via fastPathHandlerStack. 102 if (this.fastPathHandlerStack != null) { 103 this.fastPathHandlerStack.push(this); 104 this.semaphore.acquire(); 105 cr = this.loadedCallRunner; 106 this.loadedCallRunner = null; 107 } else { 108 // No fastpath available. Block until a task comes available. 109 cr = super.getCallRunner(); 110 } 111 } 112 return cr; 113 } 114 115 /** 116 * @param cr Task gotten via fastpath. 117 * @return True if we successfully loaded our task 118 */ 119 boolean loadCallRunner(final CallRunner cr) { 120 this.loadedCallRunner = cr; 121 this.semaphore.release(); 122 return true; 123 } 124 } 125}