001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.net.InetSocketAddress; 021import java.nio.channels.ClosedChannelException; 022import java.util.Optional; 023 024import org.apache.hadoop.hbase.CallDroppedException; 025import org.apache.hadoop.hbase.CellScanner; 026import org.apache.hadoop.hbase.HBaseInterfaceAudience; 027import org.apache.hadoop.hbase.trace.TraceUtil; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.apache.yetus.audience.InterfaceStability; 030import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 031import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; 032import org.apache.hadoop.hbase.security.User; 033import org.apache.hbase.thirdparty.com.google.protobuf.Message; 034import org.apache.hadoop.hbase.util.Pair; 035import org.apache.hadoop.util.StringUtils; 036import org.apache.htrace.core.TraceScope; 037 038/** 039 * The request processing logic, which is usually executed in thread pools provided by an 040 * {@link RpcScheduler}. Call {@link #run()} to actually execute the contained 041 * RpcServer.Call 042 */ 043@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) 044@InterfaceStability.Evolving 045public class CallRunner { 046 047 private static final CallDroppedException CALL_DROPPED_EXCEPTION 048 = new CallDroppedException(); 049 050 private RpcCall call; 051 private RpcServerInterface rpcServer; 052 private MonitoredRPCHandler status; 053 private volatile boolean sucessful; 054 055 /** 056 * On construction, adds the size of this call to the running count of outstanding call sizes. 057 * Presumption is that we are put on a queue while we wait on an executor to run us. During this 058 * time we occupy heap. 059 */ 060 // The constructor is shutdown so only RpcServer in this class can make one of these. 061 CallRunner(final RpcServerInterface rpcServer, final RpcCall call) { 062 this.call = call; 063 this.rpcServer = rpcServer; 064 // Add size of the call to queue size. 065 if (call != null && rpcServer != null) { 066 this.rpcServer.addCallSize(call.getSize()); 067 } 068 } 069 070 public RpcCall getRpcCall() { 071 return call; 072 } 073 074 /** 075 * Keep for backward compatibility. 076 * @deprecated As of release 2.0, this will be removed in HBase 3.0 077 */ 078 @Deprecated 079 public ServerCall<?> getCall() { 080 return (ServerCall<?>) call; 081 } 082 083 public void setStatus(MonitoredRPCHandler status) { 084 this.status = status; 085 } 086 087 /** 088 * Cleanup after ourselves... let go of references. 089 */ 090 private void cleanup() { 091 this.call.cleanup(); 092 this.call = null; 093 this.rpcServer = null; 094 } 095 096 public void run() { 097 try { 098 if (call.disconnectSince() >= 0) { 099 if (RpcServer.LOG.isDebugEnabled()) { 100 RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call); 101 } 102 return; 103 } 104 call.setStartTime(System.currentTimeMillis()); 105 if (call.getStartTime() > call.getDeadline()) { 106 RpcServer.LOG.warn("Dropping timed out call: " + call); 107 return; 108 } 109 this.status.setStatus("Setting up call"); 110 this.status.setConnection(call.getRemoteAddress().getHostAddress(), call.getRemotePort()); 111 if (RpcServer.LOG.isTraceEnabled()) { 112 Optional<User> remoteUser = call.getRequestUser(); 113 RpcServer.LOG.trace(call.toShortString() + " executing as " + 114 (remoteUser.isPresent() ? remoteUser.get().getName() : "NULL principal")); 115 } 116 Throwable errorThrowable = null; 117 String error = null; 118 Pair<Message, CellScanner> resultPair = null; 119 RpcServer.CurCall.set(call); 120 TraceScope traceScope = null; 121 try { 122 if (!this.rpcServer.isStarted()) { 123 InetSocketAddress address = rpcServer.getListenerAddress(); 124 throw new ServerNotRunningYetException("Server " + 125 (address != null ? address : "(channel closed)") + " is not running yet"); 126 } 127 String serviceName = 128 call.getService() != null ? call.getService().getDescriptorForType().getName() : ""; 129 String methodName = (call.getMethod() != null) ? call.getMethod().getName() : ""; 130 String traceString = serviceName + "." + methodName; 131 traceScope = TraceUtil.createTrace(traceString); 132 // make the call 133 resultPair = this.rpcServer.call(call, this.status); 134 } catch (TimeoutIOException e){ 135 RpcServer.LOG.warn("Can not complete this request in time, drop it: " + call); 136 return; 137 } catch (Throwable e) { 138 if (e instanceof ServerNotRunningYetException) { 139 // If ServerNotRunningYetException, don't spew stack trace. 140 if (RpcServer.LOG.isTraceEnabled()) { 141 RpcServer.LOG.trace(call.toShortString(), e); 142 } 143 } else { 144 // Don't dump full exception.. just String version 145 RpcServer.LOG.debug(call.toShortString() + ", exception=" + e); 146 } 147 errorThrowable = e; 148 error = StringUtils.stringifyException(e); 149 if (e instanceof Error) { 150 throw (Error)e; 151 } 152 } finally { 153 if (traceScope != null) { 154 traceScope.close(); 155 } 156 RpcServer.CurCall.set(null); 157 if (resultPair != null) { 158 this.rpcServer.addCallSize(call.getSize() * -1); 159 sucessful = true; 160 } 161 } 162 // return back the RPC request read BB we can do here. It is done by now. 163 call.cleanup(); 164 // Set the response 165 Message param = resultPair != null ? resultPair.getFirst() : null; 166 CellScanner cells = resultPair != null ? resultPair.getSecond() : null; 167 call.setResponse(param, cells, errorThrowable, error); 168 call.sendResponseIfReady(); 169 this.status.markComplete("Sent response"); 170 this.status.pause("Waiting for a call"); 171 } catch (OutOfMemoryError e) { 172 if (this.rpcServer.getErrorHandler() != null) { 173 if (this.rpcServer.getErrorHandler().checkOOME(e)) { 174 RpcServer.LOG.info(Thread.currentThread().getName() + ": exiting on OutOfMemoryError"); 175 return; 176 } 177 } else { 178 // rethrow if no handler 179 throw e; 180 } 181 } catch (ClosedChannelException cce) { 182 InetSocketAddress address = rpcServer.getListenerAddress(); 183 RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " + 184 "this means that the server " + (address != null ? address : "(channel closed)") + 185 " was processing a request but the client went away. The error message was: " + 186 cce.getMessage()); 187 } catch (Exception e) { 188 RpcServer.LOG.warn(Thread.currentThread().getName() 189 + ": caught: " + StringUtils.stringifyException(e)); 190 } finally { 191 if (!sucessful) { 192 this.rpcServer.addCallSize(call.getSize() * -1); 193 } 194 cleanup(); 195 } 196 } 197 198 /** 199 * When we want to drop this call because of server is overloaded. 200 */ 201 public void drop() { 202 try { 203 if (call.disconnectSince() >= 0) { 204 if (RpcServer.LOG.isDebugEnabled()) { 205 RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call); 206 } 207 return; 208 } 209 210 // Set the response 211 InetSocketAddress address = rpcServer.getListenerAddress(); 212 call.setResponse(null, null, CALL_DROPPED_EXCEPTION, "Call dropped, server " 213 + (address != null ? address : "(channel closed)") + " is overloaded, please retry."); 214 call.sendResponseIfReady(); 215 } catch (ClosedChannelException cce) { 216 InetSocketAddress address = rpcServer.getListenerAddress(); 217 RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " + 218 "this means that the server " + (address != null ? address : "(channel closed)") + 219 " was processing a request but the client went away. The error message was: " + 220 cce.getMessage()); 221 } catch (Exception e) { 222 RpcServer.LOG.warn(Thread.currentThread().getName() 223 + ": caught: " + StringUtils.stringifyException(e)); 224 } finally { 225 if (!sucessful) { 226 this.rpcServer.addCallSize(call.getSize() * -1); 227 } 228 cleanup(); 229 } 230 } 231}