001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.util.Deque;
021import java.util.concurrent.BlockingQueue;
022import java.util.concurrent.Semaphore;
023import java.util.concurrent.atomic.AtomicInteger;
024import org.apache.hadoop.hbase.Abortable;
025import org.apache.yetus.audience.InterfaceAudience;
026
027@InterfaceAudience.Private
028public class FastPathRpcHandler extends RpcHandler {
029  // Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque
030  // if an empty queue of CallRunners so we are available for direct handoff when one comes in.
031  final Deque<FastPathRpcHandler> fastPathHandlerStack;
032  // Semaphore to coordinate loading of fastpathed loadedTask and our running it.
033  // UNFAIR synchronization.
034  private Semaphore semaphore = new Semaphore(0);
035  // The task we get when fast-pathing.
036  private CallRunner loadedCallRunner;
037
038  FastPathRpcHandler(String name, double handlerFailureThreshhold, int handlerCount,
039    BlockingQueue<CallRunner> q, AtomicInteger activeHandlerCount, AtomicInteger failedHandlerCount,
040    final Abortable abortable, final Deque<FastPathRpcHandler> fastPathHandlerStack) {
041    super(name, handlerFailureThreshhold, handlerCount, q, activeHandlerCount, failedHandlerCount,
042      abortable);
043    this.fastPathHandlerStack = fastPathHandlerStack;
044  }
045
046  @Override
047  protected CallRunner getCallRunner() throws InterruptedException {
048    // Get a callrunner if one in the Q.
049    CallRunner cr = this.q.poll();
050    if (cr == null) {
051      // Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for
052      // the fastpath handoff done via fastPathHandlerStack.
053      if (this.fastPathHandlerStack != null) {
054        this.fastPathHandlerStack.push(this);
055        this.semaphore.acquire();
056        cr = this.loadedCallRunner;
057        this.loadedCallRunner = null;
058      } else {
059        // No fastpath available. Block until a task comes available.
060        cr = super.getCallRunner();
061      }
062    }
063    return cr;
064  }
065
066  /**
067   * @param cr Task gotten via fastpath.
068   * @return True if we successfully loaded our task
069   */
070  boolean loadCallRunner(final CallRunner cr) {
071    this.loadedCallRunner = cr;
072    this.semaphore.release();
073    return true;
074  }
075}