001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.net.InetSocketAddress; 021import java.nio.channels.ClosedChannelException; 022import java.util.Optional; 023 024import org.apache.hadoop.hbase.CallDroppedException; 025import org.apache.hadoop.hbase.CellScanner; 026import org.apache.hadoop.hbase.HBaseInterfaceAudience; 027import org.apache.hadoop.hbase.trace.TraceUtil; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.apache.yetus.audience.InterfaceStability; 030import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 031import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; 032import org.apache.hadoop.hbase.security.User; 033import org.apache.hbase.thirdparty.com.google.protobuf.Message; 034import org.apache.hadoop.hbase.util.Pair; 035import org.apache.hadoop.util.StringUtils; 036import org.apache.htrace.core.TraceScope; 037 038/** 039 * The request processing logic, which is usually executed in thread pools provided by an 040 * {@link RpcScheduler}. Call {@link #run()} to actually execute the contained 041 * RpcServer.Call 042 */ 043@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) 044@InterfaceStability.Evolving 045public class CallRunner { 046 047 private static final CallDroppedException CALL_DROPPED_EXCEPTION 048 = new CallDroppedException(); 049 050 private RpcCall call; 051 private RpcServerInterface rpcServer; 052 private MonitoredRPCHandler status; 053 private volatile boolean sucessful; 054 055 /** 056 * On construction, adds the size of this call to the running count of outstanding call sizes. 057 * Presumption is that we are put on a queue while we wait on an executor to run us. During this 058 * time we occupy heap. 059 */ 060 // The constructor is shutdown so only RpcServer in this class can make one of these. 061 CallRunner(final RpcServerInterface rpcServer, final RpcCall call) { 062 this.call = call; 063 this.rpcServer = rpcServer; 064 // Add size of the call to queue size. 065 if (call != null && rpcServer != null) { 066 this.rpcServer.addCallSize(call.getSize()); 067 } 068 } 069 070 public RpcCall getRpcCall() { 071 return call; 072 } 073 074 /** 075 * Keep for backward compatibility. 076 * @deprecated As of release 2.0, this will be removed in HBase 3.0 077 */ 078 @Deprecated 079 public ServerCall<?> getCall() { 080 return (ServerCall<?>) call; 081 } 082 083 public void setStatus(MonitoredRPCHandler status) { 084 this.status = status; 085 } 086 087 /** 088 * Cleanup after ourselves... let go of references. 089 */ 090 private void cleanup() { 091 this.call = null; 092 this.rpcServer = null; 093 } 094 095 public void run() { 096 try { 097 if (call.disconnectSince() >= 0) { 098 if (RpcServer.LOG.isDebugEnabled()) { 099 RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call); 100 } 101 return; 102 } 103 call.setStartTime(System.currentTimeMillis()); 104 if (call.getStartTime() > call.getDeadline()) { 105 RpcServer.LOG.warn("Dropping timed out call: " + call); 106 return; 107 } 108 this.status.setStatus("Setting up call"); 109 this.status.setConnection(call.getRemoteAddress().getHostAddress(), call.getRemotePort()); 110 if (RpcServer.LOG.isTraceEnabled()) { 111 Optional<User> remoteUser = call.getRequestUser(); 112 RpcServer.LOG.trace(call.toShortString() + " executing as " + 113 (remoteUser.isPresent() ? remoteUser.get().getName() : "NULL principal")); 114 } 115 Throwable errorThrowable = null; 116 String error = null; 117 Pair<Message, CellScanner> resultPair = null; 118 RpcServer.CurCall.set(call); 119 TraceScope traceScope = null; 120 try { 121 if (!this.rpcServer.isStarted()) { 122 InetSocketAddress address = rpcServer.getListenerAddress(); 123 throw new ServerNotRunningYetException("Server " + 124 (address != null ? address : "(channel closed)") + " is not running yet"); 125 } 126 String serviceName = 127 call.getService() != null ? call.getService().getDescriptorForType().getName() : ""; 128 String methodName = (call.getMethod() != null) ? call.getMethod().getName() : ""; 129 String traceString = serviceName + "." + methodName; 130 traceScope = TraceUtil.createTrace(traceString); 131 // make the call 132 resultPair = this.rpcServer.call(call, this.status); 133 } catch (TimeoutIOException e){ 134 RpcServer.LOG.warn("Can not complete this request in time, drop it: " + call); 135 return; 136 } catch (Throwable e) { 137 if (e instanceof ServerNotRunningYetException) { 138 // If ServerNotRunningYetException, don't spew stack trace. 139 if (RpcServer.LOG.isTraceEnabled()) { 140 RpcServer.LOG.trace(call.toShortString(), e); 141 } 142 } else { 143 // Don't dump full exception.. just String version 144 RpcServer.LOG.debug(call.toShortString() + ", exception=" + e); 145 } 146 errorThrowable = e; 147 error = StringUtils.stringifyException(e); 148 if (e instanceof Error) { 149 throw (Error)e; 150 } 151 } finally { 152 if (traceScope != null) { 153 traceScope.close(); 154 } 155 RpcServer.CurCall.set(null); 156 if (resultPair != null) { 157 this.rpcServer.addCallSize(call.getSize() * -1); 158 sucessful = true; 159 } 160 } 161 // return back the RPC request read BB we can do here. It is done by now. 162 call.cleanup(); 163 // Set the response 164 Message param = resultPair != null ? resultPair.getFirst() : null; 165 CellScanner cells = resultPair != null ? resultPair.getSecond() : null; 166 call.setResponse(param, cells, errorThrowable, error); 167 call.sendResponseIfReady(); 168 this.status.markComplete("Sent response"); 169 this.status.pause("Waiting for a call"); 170 } catch (OutOfMemoryError e) { 171 if (this.rpcServer.getErrorHandler() != null) { 172 if (this.rpcServer.getErrorHandler().checkOOME(e)) { 173 RpcServer.LOG.info(Thread.currentThread().getName() + ": exiting on OutOfMemoryError"); 174 return; 175 } 176 } else { 177 // rethrow if no handler 178 throw e; 179 } 180 } catch (ClosedChannelException cce) { 181 InetSocketAddress address = rpcServer.getListenerAddress(); 182 RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " + 183 "this means that the server " + (address != null ? address : "(channel closed)") + 184 " was processing a request but the client went away. The error message was: " + 185 cce.getMessage()); 186 } catch (Exception e) { 187 RpcServer.LOG.warn(Thread.currentThread().getName() 188 + ": caught: " + StringUtils.stringifyException(e)); 189 } finally { 190 if (!sucessful) { 191 this.rpcServer.addCallSize(call.getSize() * -1); 192 } 193 cleanup(); 194 } 195 } 196 197 /** 198 * When we want to drop this call because of server is overloaded. 199 */ 200 public void drop() { 201 try { 202 if (call.disconnectSince() >= 0) { 203 if (RpcServer.LOG.isDebugEnabled()) { 204 RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call); 205 } 206 return; 207 } 208 209 // Set the response 210 InetSocketAddress address = rpcServer.getListenerAddress(); 211 call.setResponse(null, null, CALL_DROPPED_EXCEPTION, "Call dropped, server " 212 + (address != null ? address : "(channel closed)") + " is overloaded, please retry."); 213 call.sendResponseIfReady(); 214 } catch (ClosedChannelException cce) { 215 InetSocketAddress address = rpcServer.getListenerAddress(); 216 RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " + 217 "this means that the server " + (address != null ? address : "(channel closed)") + 218 " was processing a request but the client went away. The error message was: " + 219 cce.getMessage()); 220 } catch (Exception e) { 221 RpcServer.LOG.warn(Thread.currentThread().getName() 222 + ": caught: " + StringUtils.stringifyException(e)); 223 } finally { 224 if (!sucessful) { 225 this.rpcServer.addCallSize(call.getSize() * -1); 226 } 227 cleanup(); 228 } 229 } 230}