001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.util.Deque; 021import java.util.concurrent.BlockingQueue; 022import java.util.concurrent.Semaphore; 023import java.util.concurrent.atomic.AtomicInteger; 024import org.apache.hadoop.hbase.Abortable; 025import org.apache.yetus.audience.InterfaceAudience; 026 027@InterfaceAudience.Private 028public class FastPathRpcHandler extends RpcHandler { 029 // Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque 030 // if an empty queue of CallRunners so we are available for direct handoff when one comes in. 031 final Deque<FastPathRpcHandler> fastPathHandlerStack; 032 // Semaphore to coordinate loading of fastpathed loadedTask and our running it. 033 // UNFAIR synchronization. 034 private Semaphore semaphore = new Semaphore(0); 035 // The task we get when fast-pathing. 036 private CallRunner loadedCallRunner; 037 038 FastPathRpcHandler(String name, double handlerFailureThreshhold, int handlerCount, 039 BlockingQueue<CallRunner> q, AtomicInteger activeHandlerCount, AtomicInteger failedHandlerCount, 040 final Abortable abortable, final Deque<FastPathRpcHandler> fastPathHandlerStack) { 041 super(name, handlerFailureThreshhold, handlerCount, q, activeHandlerCount, failedHandlerCount, 042 abortable); 043 this.fastPathHandlerStack = fastPathHandlerStack; 044 } 045 046 @Override 047 protected CallRunner getCallRunner() throws InterruptedException { 048 // Get a callrunner if one in the Q. 049 CallRunner cr = this.q.poll(); 050 if (cr == null) { 051 // Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for 052 // the fastpath handoff done via fastPathHandlerStack. 053 if (this.fastPathHandlerStack != null) { 054 this.fastPathHandlerStack.push(this); 055 this.semaphore.acquire(); 056 cr = this.loadedCallRunner; 057 this.loadedCallRunner = null; 058 } else { 059 // No fastpath available. Block until a task comes available. 060 cr = super.getCallRunner(); 061 } 062 } 063 return cr; 064 } 065 066 /** 067 * @param cr Task gotten via fastpath. 068 * @return True if we successfully loaded our task 069 */ 070 boolean loadCallRunner(final CallRunner cr) { 071 this.loadedCallRunner = cr; 072 this.semaphore.release(); 073 return true; 074 } 075}