001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import io.opentelemetry.api.trace.Span; 021import io.opentelemetry.api.trace.StatusCode; 022import io.opentelemetry.context.Scope; 023import java.net.InetSocketAddress; 024import java.nio.channels.ClosedChannelException; 025import org.apache.hadoop.hbase.CallDroppedException; 026import org.apache.hadoop.hbase.ExtendedCellScanner; 027import org.apache.hadoop.hbase.HBaseInterfaceAudience; 028import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 029import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; 030import org.apache.hadoop.hbase.security.User; 031import org.apache.hadoop.hbase.server.trace.IpcServerSpanBuilder; 032import org.apache.hadoop.hbase.trace.TraceUtil; 033import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 034import org.apache.hadoop.hbase.util.Pair; 035import org.apache.hadoop.util.StringUtils; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.apache.yetus.audience.InterfaceStability; 038 039import org.apache.hbase.thirdparty.com.google.protobuf.Message; 040 041/** 042 * The request processing logic, which is usually executed in thread pools provided by an 043 * {@link RpcScheduler}. Call {@link #run()} to actually execute the contained RpcServer.Call 044 */ 045@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX }) 046@InterfaceStability.Evolving 047public class CallRunner { 048 049 private static final CallDroppedException CALL_DROPPED_EXCEPTION = new CallDroppedException(); 050 051 private RpcCall call; 052 private RpcServerInterface rpcServer; 053 private MonitoredRPCHandler status; 054 private final Span span; 055 private volatile boolean successful; 056 057 /** 058 * On construction, adds the size of this call to the running count of outstanding call sizes. 059 * Presumption is that we are put on a queue while we wait on an executor to run us. During this 060 * time we occupy heap. 061 */ 062 // The constructor is shutdown so only RpcServer in this class can make one of these. 063 CallRunner(final RpcServerInterface rpcServer, final RpcCall call) { 064 this.call = call; 065 this.rpcServer = rpcServer; 066 this.span = Span.current(); 067 // Add size of the call to queue size. 068 if (call != null && rpcServer != null) { 069 this.rpcServer.addCallSize(call.getSize()); 070 } 071 } 072 073 public RpcCall getRpcCall() { 074 return call; 075 } 076 077 public void setStatus(MonitoredRPCHandler status) { 078 this.status = status; 079 } 080 081 /** 082 * Cleanup after ourselves... let go of references. 083 */ 084 private void cleanup() { 085 this.call.cleanup(); 086 this.call = null; 087 this.rpcServer = null; 088 } 089 090 public void run() { 091 try (Scope ignored = span.makeCurrent()) { 092 if (call.disconnectSince() >= 0) { 093 RpcServer.LOG.debug("{}: skipped {}", Thread.currentThread().getName(), call); 094 span.addEvent("Client disconnect detected"); 095 span.setStatus(StatusCode.OK); 096 return; 097 } 098 call.setStartTime(EnvironmentEdgeManager.currentTime()); 099 if (call.getStartTime() > call.getDeadline()) { 100 RpcServer.LOG.warn("Dropping timed out call: {}", call); 101 this.rpcServer.getMetrics().callTimedOut(); 102 span.addEvent("Call deadline exceeded"); 103 span.setStatus(StatusCode.OK); 104 return; 105 } 106 this.status.setStatus("Setting up call"); 107 this.status.setConnection(call.getRemoteAddress().getHostAddress(), call.getRemotePort()); 108 if (RpcServer.LOG.isTraceEnabled()) { 109 RpcServer.LOG.trace("{} executing as {}", call.toShortString(), 110 call.getRequestUser().map(User::getName).orElse("NULL principal")); 111 } 112 Throwable errorThrowable = null; 113 String error = null; 114 Pair<Message, ExtendedCellScanner> resultPair = null; 115 RpcServer.CurCall.set(call); 116 final Span ipcServerSpan = new IpcServerSpanBuilder(call).build(); 117 try (Scope ignored1 = ipcServerSpan.makeCurrent()) { 118 if (!this.rpcServer.isStarted()) { 119 InetSocketAddress address = rpcServer.getListenerAddress(); 120 throw new ServerNotRunningYetException( 121 "Server " + (address != null ? address : "(channel closed)") + " is not running yet"); 122 } 123 // make the call 124 resultPair = this.rpcServer.call(call, this.status); 125 } catch (TimeoutIOException e) { 126 RpcServer.LOG.warn("Can not complete this request in time, drop it: {}", call); 127 TraceUtil.setError(ipcServerSpan, e); 128 return; 129 } catch (Throwable e) { 130 TraceUtil.setError(ipcServerSpan, e); 131 if (e instanceof ServerNotRunningYetException) { 132 // If ServerNotRunningYetException, don't spew stack trace. 133 if (RpcServer.LOG.isTraceEnabled()) { 134 RpcServer.LOG.trace(call.toShortString(), e); 135 } 136 } else { 137 // Don't dump full exception.. just String version 138 RpcServer.LOG.debug("{}, exception={}", call.toShortString(), e); 139 } 140 errorThrowable = e; 141 error = StringUtils.stringifyException(e); 142 if (e instanceof Error) { 143 throw (Error) e; 144 } 145 } finally { 146 RpcServer.CurCall.set(null); 147 if (resultPair != null) { 148 this.rpcServer.addCallSize(call.getSize() * -1); 149 ipcServerSpan.setStatus(StatusCode.OK); 150 successful = true; 151 } 152 ipcServerSpan.end(); 153 } 154 this.status.markComplete("To send response"); 155 // return the RPC request read BB we can do here. It is done by now. 156 call.cleanup(); 157 // Set the response 158 Message param = resultPair != null ? resultPair.getFirst() : null; 159 ExtendedCellScanner cells = resultPair != null ? resultPair.getSecond() : null; 160 call.setResponse(param, cells, errorThrowable, error); 161 call.sendResponseIfReady(); 162 // don't touch `span` here because its status and `end()` are managed in `call#setResponse()` 163 } catch (OutOfMemoryError e) { 164 TraceUtil.setError(span, e); 165 if ( 166 this.rpcServer.getErrorHandler() != null && this.rpcServer.getErrorHandler().checkOOME(e) 167 ) { 168 RpcServer.LOG.info("{}: exiting on OutOfMemoryError", Thread.currentThread().getName()); 169 // exception intentionally swallowed 170 } else { 171 // rethrow if no handler 172 throw e; 173 } 174 } catch (ClosedChannelException cce) { 175 InetSocketAddress address = rpcServer.getListenerAddress(); 176 RpcServer.LOG.warn( 177 "{}: caught a ClosedChannelException, " + "this means that the server " 178 + (address != null ? address : "(channel closed)") 179 + " was processing a request but the client went away. The error message was: {}", 180 Thread.currentThread().getName(), cce.getMessage()); 181 TraceUtil.setError(span, cce); 182 } catch (Exception e) { 183 RpcServer.LOG.warn("{}: caught: {}", Thread.currentThread().getName(), 184 StringUtils.stringifyException(e)); 185 TraceUtil.setError(span, e); 186 } finally { 187 if (!successful) { 188 this.rpcServer.addCallSize(call.getSize() * -1); 189 } 190 191 if (this.status.isRPCRunning()) { 192 this.status.markComplete("Call error"); 193 } 194 this.status.pause("Waiting for a call"); 195 cleanup(); 196 span.end(); 197 } 198 } 199 200 /** 201 * When we want to drop this call because of server is overloaded. 202 */ 203 public void drop() { 204 try (Scope ignored = span.makeCurrent()) { 205 if (call.disconnectSince() >= 0) { 206 RpcServer.LOG.debug("{}: skipped {}", Thread.currentThread().getName(), call); 207 span.addEvent("Client disconnect detected"); 208 span.setStatus(StatusCode.OK); 209 return; 210 } 211 212 // Set the response 213 InetSocketAddress address = rpcServer.getListenerAddress(); 214 call.setResponse(null, null, CALL_DROPPED_EXCEPTION, "Call dropped, server " 215 + (address != null ? address : "(channel closed)") + " is overloaded, please retry."); 216 TraceUtil.setError(span, CALL_DROPPED_EXCEPTION); 217 call.sendResponseIfReady(); 218 this.rpcServer.getMetrics().exception(CALL_DROPPED_EXCEPTION); 219 } catch (ClosedChannelException cce) { 220 InetSocketAddress address = rpcServer.getListenerAddress(); 221 RpcServer.LOG.warn( 222 "{}: caught a ClosedChannelException, " + "this means that the server " 223 + (address != null ? address : "(channel closed)") 224 + " was processing a request but the client went away. The error message was: {}", 225 Thread.currentThread().getName(), cce.getMessage()); 226 TraceUtil.setError(span, cce); 227 } catch (Exception e) { 228 RpcServer.LOG.warn("{}: caught: {}", Thread.currentThread().getName(), 229 StringUtils.stringifyException(e)); 230 TraceUtil.setError(span, e); 231 } finally { 232 if (!successful) { 233 this.rpcServer.addCallSize(call.getSize() * -1); 234 } 235 cleanup(); 236 span.end(); 237 } 238 } 239}