001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.util.concurrent.BlockingQueue;
021import java.util.concurrent.atomic.AtomicInteger;
022import org.apache.hadoop.hbase.Abortable;
023import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
024import org.apache.hadoop.util.StringUtils;
025import org.apache.yetus.audience.InterfaceAudience;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029/**
030 * Thread to handle rpc call. Should only be used in {@link RpcExecutor} and its sub-classes.
031 */
032@InterfaceAudience.Private
033public class RpcHandler extends Thread {
034  private static final Logger LOG = LoggerFactory.getLogger(RpcHandler.class);
035
036  /**
037   * Q to find CallRunners to run in.
038   */
039  final BlockingQueue<CallRunner> q;
040
041  final int handlerCount;
042  final double handlerFailureThreshhold;
043
044  // metrics (shared with other handlers)
045  final AtomicInteger activeHandlerCount;
046  final AtomicInteger failedHandlerCount;
047
048  // The up-level RpcServer.
049  final Abortable abortable;
050
051  private boolean running;
052
053  RpcHandler(final String name, final double handlerFailureThreshhold, final int handlerCount,
054    final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount,
055    final AtomicInteger failedHandlerCount, final Abortable abortable) {
056    super(name);
057    setDaemon(true);
058    this.q = q;
059    this.handlerFailureThreshhold = handlerFailureThreshhold;
060    this.activeHandlerCount = activeHandlerCount;
061    this.failedHandlerCount = failedHandlerCount;
062    this.handlerCount = handlerCount;
063    this.abortable = abortable;
064  }
065
066  /** Returns A {@link CallRunner} n */
067  protected CallRunner getCallRunner() throws InterruptedException {
068    return this.q.take();
069  }
070
071  public void stopRunning() {
072    running = false;
073  }
074
075  @Override
076  public void run() {
077    boolean interrupted = false;
078    running = true;
079    try {
080      while (running) {
081        try {
082          run(getCallRunner());
083        } catch (InterruptedException e) {
084          interrupted = true;
085        }
086      }
087    } catch (Exception e) {
088      LOG.warn(e.toString(), e);
089      throw e;
090    } finally {
091      if (interrupted) {
092        Thread.currentThread().interrupt();
093      }
094    }
095  }
096
097  private void run(CallRunner cr) {
098    MonitoredRPCHandler status = RpcServer.getStatus();
099    cr.setStatus(status);
100    try {
101      this.activeHandlerCount.incrementAndGet();
102      cr.run();
103    } catch (Throwable e) {
104      if (e instanceof Error) {
105        int failedCount = failedHandlerCount.incrementAndGet();
106        if (
107          this.handlerFailureThreshhold >= 0
108            && failedCount > handlerCount * this.handlerFailureThreshhold
109        ) {
110          String message = "Number of failed RpcServer handler runs exceeded threshhold "
111            + this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e);
112          if (abortable != null) {
113            abortable.abort(message, e);
114          } else {
115            LOG.error("Error but can't abort because abortable is null: "
116              + StringUtils.stringifyException(e));
117            throw e;
118          }
119        } else {
120          LOG.warn("Handler errors " + StringUtils.stringifyException(e));
121        }
122      } else {
123        LOG.warn("Handler  exception " + StringUtils.stringifyException(e));
124      }
125    } finally {
126      this.activeHandlerCount.decrementAndGet();
127    }
128  }
129}