001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import io.opentelemetry.api.trace.Span;
021import io.opentelemetry.api.trace.StatusCode;
022import io.opentelemetry.context.Scope;
023import java.net.InetSocketAddress;
024import java.nio.channels.ClosedChannelException;
025import org.apache.hadoop.hbase.CallDroppedException;
026import org.apache.hadoop.hbase.CellScanner;
027import org.apache.hadoop.hbase.HBaseInterfaceAudience;
028import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
029import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
030import org.apache.hadoop.hbase.security.User;
031import org.apache.hadoop.hbase.server.trace.IpcServerSpanBuilder;
032import org.apache.hadoop.hbase.trace.TraceUtil;
033import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
034import org.apache.hadoop.hbase.util.Pair;
035import org.apache.hadoop.util.StringUtils;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.apache.yetus.audience.InterfaceStability;
038
039import org.apache.hbase.thirdparty.com.google.protobuf.Message;
040
041/**
042 * The request processing logic, which is usually executed in thread pools provided by an
043 * {@link RpcScheduler}. Call {@link #run()} to actually execute the contained RpcServer.Call
044 */
045@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
046@InterfaceStability.Evolving
047public class CallRunner {
048
049  private static final CallDroppedException CALL_DROPPED_EXCEPTION = new CallDroppedException();
050
051  private RpcCall call;
052  private RpcServerInterface rpcServer;
053  private MonitoredRPCHandler status;
054  private final Span span;
055  private volatile boolean successful;
056
057  /**
058   * On construction, adds the size of this call to the running count of outstanding call sizes.
059   * Presumption is that we are put on a queue while we wait on an executor to run us. During this
060   * time we occupy heap.
061   */
062  // The constructor is shutdown so only RpcServer in this class can make one of these.
063  CallRunner(final RpcServerInterface rpcServer, final RpcCall call) {
064    this.call = call;
065    this.rpcServer = rpcServer;
066    this.span = Span.current();
067    // Add size of the call to queue size.
068    if (call != null && rpcServer != null) {
069      this.rpcServer.addCallSize(call.getSize());
070    }
071  }
072
073  public RpcCall getRpcCall() {
074    return call;
075  }
076
077  public void setStatus(MonitoredRPCHandler status) {
078    this.status = status;
079  }
080
081  /**
082   * Cleanup after ourselves... let go of references.
083   */
084  private void cleanup() {
085    this.call.cleanup();
086    this.call = null;
087    this.rpcServer = null;
088  }
089
090  public void run() {
091    try (Scope ignored = span.makeCurrent()) {
092      if (call.disconnectSince() >= 0) {
093        RpcServer.LOG.debug("{}: skipped {}", Thread.currentThread().getName(), call);
094        span.addEvent("Client disconnect detected");
095        span.setStatus(StatusCode.OK);
096        return;
097      }
098      call.setStartTime(EnvironmentEdgeManager.currentTime());
099      if (call.getStartTime() > call.getDeadline()) {
100        RpcServer.LOG.warn("Dropping timed out call: {}", call);
101        this.rpcServer.getMetrics().callTimedOut();
102        span.addEvent("Call deadline exceeded");
103        span.setStatus(StatusCode.OK);
104        return;
105      }
106      this.status.setStatus("Setting up call");
107      this.status.setConnection(call.getRemoteAddress().getHostAddress(), call.getRemotePort());
108      if (RpcServer.LOG.isTraceEnabled()) {
109        RpcServer.LOG.trace("{} executing as {}", call.toShortString(),
110          call.getRequestUser().map(User::getName).orElse("NULL principal"));
111      }
112      Throwable errorThrowable = null;
113      String error = null;
114      Pair<Message, CellScanner> resultPair = null;
115      RpcServer.CurCall.set(call);
116      final Span ipcServerSpan = new IpcServerSpanBuilder(call).build();
117      try (Scope ignored1 = ipcServerSpan.makeCurrent()) {
118        if (!this.rpcServer.isStarted()) {
119          InetSocketAddress address = rpcServer.getListenerAddress();
120          throw new ServerNotRunningYetException(
121            "Server " + (address != null ? address : "(channel closed)") + " is not running yet");
122        }
123        // make the call
124        resultPair = this.rpcServer.call(call, this.status);
125      } catch (TimeoutIOException e) {
126        RpcServer.LOG.warn("Can not complete this request in time, drop it: {}", call);
127        TraceUtil.setError(ipcServerSpan, e);
128        return;
129      } catch (Throwable e) {
130        TraceUtil.setError(ipcServerSpan, e);
131        if (e instanceof ServerNotRunningYetException) {
132          // If ServerNotRunningYetException, don't spew stack trace.
133          if (RpcServer.LOG.isTraceEnabled()) {
134            RpcServer.LOG.trace(call.toShortString(), e);
135          }
136        } else {
137          // Don't dump full exception.. just String version
138          RpcServer.LOG.debug("{}, exception={}", call.toShortString(), e);
139        }
140        errorThrowable = e;
141        error = StringUtils.stringifyException(e);
142        if (e instanceof Error) {
143          throw (Error) e;
144        }
145      } finally {
146        RpcServer.CurCall.set(null);
147        if (resultPair != null) {
148          this.rpcServer.addCallSize(call.getSize() * -1);
149          ipcServerSpan.setStatus(StatusCode.OK);
150          successful = true;
151        }
152        ipcServerSpan.end();
153      }
154      this.status.markComplete("To send response");
155      // return the RPC request read BB we can do here. It is done by now.
156      call.cleanup();
157      // Set the response
158      Message param = resultPair != null ? resultPair.getFirst() : null;
159      CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
160      call.setResponse(param, cells, errorThrowable, error);
161      call.sendResponseIfReady();
162      // don't touch `span` here because its status and `end()` are managed in `call#setResponse()`
163    } catch (OutOfMemoryError e) {
164      TraceUtil.setError(span, e);
165      if (
166        this.rpcServer.getErrorHandler() != null && this.rpcServer.getErrorHandler().checkOOME(e)
167      ) {
168        RpcServer.LOG.info("{}: exiting on OutOfMemoryError", Thread.currentThread().getName());
169        // exception intentionally swallowed
170      } else {
171        // rethrow if no handler
172        throw e;
173      }
174    } catch (ClosedChannelException cce) {
175      InetSocketAddress address = rpcServer.getListenerAddress();
176      RpcServer.LOG.warn(
177        "{}: caught a ClosedChannelException, " + "this means that the server "
178          + (address != null ? address : "(channel closed)")
179          + " was processing a request but the client went away. The error message was: {}",
180        Thread.currentThread().getName(), cce.getMessage());
181      TraceUtil.setError(span, cce);
182    } catch (Exception e) {
183      RpcServer.LOG.warn("{}: caught: {}", Thread.currentThread().getName(),
184        StringUtils.stringifyException(e));
185      TraceUtil.setError(span, e);
186    } finally {
187      if (!successful) {
188        this.rpcServer.addCallSize(call.getSize() * -1);
189      }
190
191      if (this.status.isRPCRunning()) {
192        this.status.markComplete("Call error");
193      }
194      this.status.pause("Waiting for a call");
195      cleanup();
196      span.end();
197    }
198  }
199
200  /**
201   * When we want to drop this call because of server is overloaded.
202   */
203  public void drop() {
204    try (Scope ignored = span.makeCurrent()) {
205      if (call.disconnectSince() >= 0) {
206        RpcServer.LOG.debug("{}: skipped {}", Thread.currentThread().getName(), call);
207        span.addEvent("Client disconnect detected");
208        span.setStatus(StatusCode.OK);
209        return;
210      }
211
212      // Set the response
213      InetSocketAddress address = rpcServer.getListenerAddress();
214      call.setResponse(null, null, CALL_DROPPED_EXCEPTION, "Call dropped, server "
215        + (address != null ? address : "(channel closed)") + " is overloaded, please retry.");
216      TraceUtil.setError(span, CALL_DROPPED_EXCEPTION);
217      call.sendResponseIfReady();
218      this.rpcServer.getMetrics().exception(CALL_DROPPED_EXCEPTION);
219    } catch (ClosedChannelException cce) {
220      InetSocketAddress address = rpcServer.getListenerAddress();
221      RpcServer.LOG.warn(
222        "{}: caught a ClosedChannelException, " + "this means that the server "
223          + (address != null ? address : "(channel closed)")
224          + " was processing a request but the client went away. The error message was: {}",
225        Thread.currentThread().getName(), cce.getMessage());
226      TraceUtil.setError(span, cce);
227    } catch (Exception e) {
228      RpcServer.LOG.warn("{}: caught: {}", Thread.currentThread().getName(),
229        StringUtils.stringifyException(e));
230      TraceUtil.setError(span, e);
231    } finally {
232      if (!successful) {
233        this.rpcServer.addCallSize(call.getSize() * -1);
234      }
235      cleanup();
236      span.end();
237    }
238  }
239}