001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.util.concurrent.BlockingQueue; 021import java.util.concurrent.atomic.AtomicInteger; 022import org.apache.hadoop.hbase.Abortable; 023import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; 024import org.apache.hadoop.util.StringUtils; 025import org.apache.yetus.audience.InterfaceAudience; 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028 029/** 030 * Thread to handle rpc call. Should only be used in {@link RpcExecutor} and its sub-classes. 031 */ 032@InterfaceAudience.Private 033public class RpcHandler extends Thread { 034 private static final Logger LOG = LoggerFactory.getLogger(RpcHandler.class); 035 036 /** 037 * Q to find CallRunners to run in. 038 */ 039 final BlockingQueue<CallRunner> q; 040 041 final int handlerCount; 042 final double handlerFailureThreshhold; 043 044 // metrics (shared with other handlers) 045 final AtomicInteger activeHandlerCount; 046 final AtomicInteger failedHandlerCount; 047 048 // The up-level RpcServer. 049 final Abortable abortable; 050 051 private boolean running; 052 053 RpcHandler(final String name, final double handlerFailureThreshhold, final int handlerCount, 054 final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount, 055 final AtomicInteger failedHandlerCount, final Abortable abortable) { 056 super(name); 057 setDaemon(true); 058 this.q = q; 059 this.handlerFailureThreshhold = handlerFailureThreshhold; 060 this.activeHandlerCount = activeHandlerCount; 061 this.failedHandlerCount = failedHandlerCount; 062 this.handlerCount = handlerCount; 063 this.abortable = abortable; 064 } 065 066 /** Returns A {@link CallRunner} n */ 067 protected CallRunner getCallRunner() throws InterruptedException { 068 return this.q.take(); 069 } 070 071 public void stopRunning() { 072 running = false; 073 } 074 075 @Override 076 public void run() { 077 boolean interrupted = false; 078 running = true; 079 try { 080 while (running) { 081 try { 082 run(getCallRunner()); 083 } catch (InterruptedException e) { 084 interrupted = true; 085 } 086 } 087 } catch (Exception e) { 088 LOG.warn(e.toString(), e); 089 throw e; 090 } finally { 091 if (interrupted) { 092 Thread.currentThread().interrupt(); 093 } 094 } 095 } 096 097 private void run(CallRunner cr) { 098 MonitoredRPCHandler status = RpcServer.getStatus(); 099 cr.setStatus(status); 100 try { 101 this.activeHandlerCount.incrementAndGet(); 102 cr.run(); 103 } catch (Throwable e) { 104 if (e instanceof Error) { 105 int failedCount = failedHandlerCount.incrementAndGet(); 106 if ( 107 this.handlerFailureThreshhold >= 0 108 && failedCount > handlerCount * this.handlerFailureThreshhold 109 ) { 110 String message = "Number of failed RpcServer handler runs exceeded threshhold " 111 + this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e); 112 if (abortable != null) { 113 abortable.abort(message, e); 114 } else { 115 LOG.error("Error but can't abort because abortable is null: " 116 + StringUtils.stringifyException(e)); 117 throw e; 118 } 119 } else { 120 LOG.warn("Handler errors " + StringUtils.stringifyException(e)); 121 } 122 } else { 123 LOG.warn("Handler exception " + StringUtils.stringifyException(e)); 124 } 125 } finally { 126 this.activeHandlerCount.decrementAndGet(); 127 } 128 } 129}