001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.net.InetSocketAddress;
021import java.nio.channels.ClosedChannelException;
022import java.util.Optional;
023
024import org.apache.hadoop.hbase.CallDroppedException;
025import org.apache.hadoop.hbase.CellScanner;
026import org.apache.hadoop.hbase.HBaseInterfaceAudience;
027import org.apache.hadoop.hbase.trace.TraceUtil;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.apache.yetus.audience.InterfaceStability;
030import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
031import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
032import org.apache.hadoop.hbase.security.User;
033import org.apache.hbase.thirdparty.com.google.protobuf.Message;
034import org.apache.hadoop.hbase.util.Pair;
035import org.apache.hadoop.util.StringUtils;
036
037/**
038 * The request processing logic, which is usually executed in thread pools provided by an
039 * {@link RpcScheduler}.  Call {@link #run()} to actually execute the contained
040 * RpcServer.Call
041 */
042@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
043@InterfaceStability.Evolving
044public class CallRunner {
045
046  private static final CallDroppedException CALL_DROPPED_EXCEPTION
047    = new CallDroppedException();
048
049  private RpcCall call;
050  private RpcServerInterface rpcServer;
051  private MonitoredRPCHandler status;
052  private volatile boolean sucessful;
053
054  /**
055   * On construction, adds the size of this call to the running count of outstanding call sizes.
056   * Presumption is that we are put on a queue while we wait on an executor to run us.  During this
057   * time we occupy heap.
058   */
059  // The constructor is shutdown so only RpcServer in this class can make one of these.
060  CallRunner(final RpcServerInterface rpcServer, final RpcCall call) {
061    this.call = call;
062    this.rpcServer = rpcServer;
063    // Add size of the call to queue size.
064    if (call != null && rpcServer != null) {
065      this.rpcServer.addCallSize(call.getSize());
066    }
067  }
068
069  public RpcCall getRpcCall() {
070    return call;
071  }
072
073  /**
074   * Keep for backward compatibility.
075   * @deprecated As of release 2.0, this will be removed in HBase 3.0
076   */
077  @Deprecated
078  public ServerCall<?> getCall() {
079    return (ServerCall<?>) call;
080  }
081
082  public void setStatus(MonitoredRPCHandler status) {
083    this.status = status;
084  }
085
086  /**
087   * Cleanup after ourselves... let go of references.
088   */
089  private void cleanup() {
090    this.call = null;
091    this.rpcServer = null;
092  }
093
094  public void run() {
095    try {
096      if (call.disconnectSince() >= 0) {
097        if (RpcServer.LOG.isDebugEnabled()) {
098          RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call);
099        }
100        return;
101      }
102      call.setStartTime(System.currentTimeMillis());
103      if (call.getStartTime() > call.getDeadline()) {
104        RpcServer.LOG.warn("Dropping timed out call: " + call);
105        return;
106      }
107      this.status.setStatus("Setting up call");
108      this.status.setConnection(call.getRemoteAddress().getHostAddress(), call.getRemotePort());
109      if (RpcServer.LOG.isTraceEnabled()) {
110        Optional<User> remoteUser = call.getRequestUser();
111        RpcServer.LOG.trace(call.toShortString() + " executing as " +
112            (remoteUser.isPresent() ? "NULL principal" : remoteUser.get().getName()));
113      }
114      Throwable errorThrowable = null;
115      String error = null;
116      Pair<Message, CellScanner> resultPair = null;
117      RpcServer.CurCall.set(call);
118      try {
119        if (!this.rpcServer.isStarted()) {
120          InetSocketAddress address = rpcServer.getListenerAddress();
121          throw new ServerNotRunningYetException("Server " +
122              (address != null ? address : "(channel closed)") + " is not running yet");
123        }
124        String serviceName =
125            call.getService() != null ? call.getService().getDescriptorForType().getName() : "";
126        String methodName = (call.getMethod() != null) ? call.getMethod().getName() : "";
127        String traceString = serviceName + "." + methodName;
128        TraceUtil.createTrace(traceString);
129        // make the call
130        resultPair = this.rpcServer.call(call, this.status);
131      } catch (TimeoutIOException e){
132        RpcServer.LOG.warn("Can not complete this request in time, drop it: " + call);
133        return;
134      } catch (Throwable e) {
135        if (e instanceof ServerNotRunningYetException) {
136          // If ServerNotRunningYetException, don't spew stack trace.
137          if (RpcServer.LOG.isTraceEnabled()) {
138            RpcServer.LOG.trace(call.toShortString(), e);
139          }
140        } else {
141          // Don't dump full exception.. just String version
142          RpcServer.LOG.debug(call.toShortString() + ", exception=" + e);
143        }
144        errorThrowable = e;
145        error = StringUtils.stringifyException(e);
146        if (e instanceof Error) {
147          throw (Error)e;
148        }
149      } finally {
150        RpcServer.CurCall.set(null);
151        if (resultPair != null) {
152          this.rpcServer.addCallSize(call.getSize() * -1);
153          sucessful = true;
154        }
155      }
156      // return back the RPC request read BB we can do here. It is done by now.
157      call.cleanup();
158      // Set the response
159      Message param = resultPair != null ? resultPair.getFirst() : null;
160      CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
161      call.setResponse(param, cells, errorThrowable, error);
162      call.sendResponseIfReady();
163      this.status.markComplete("Sent response");
164      this.status.pause("Waiting for a call");
165    } catch (OutOfMemoryError e) {
166      if (this.rpcServer.getErrorHandler() != null) {
167        if (this.rpcServer.getErrorHandler().checkOOME(e)) {
168          RpcServer.LOG.info(Thread.currentThread().getName() + ": exiting on OutOfMemoryError");
169          return;
170        }
171      } else {
172        // rethrow if no handler
173        throw e;
174      }
175    } catch (ClosedChannelException cce) {
176      InetSocketAddress address = rpcServer.getListenerAddress();
177      RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
178          "this means that the server " + (address != null ? address : "(channel closed)") +
179          " was processing a request but the client went away. The error message was: " +
180          cce.getMessage());
181    } catch (Exception e) {
182      RpcServer.LOG.warn(Thread.currentThread().getName()
183          + ": caught: " + StringUtils.stringifyException(e));
184    } finally {
185      if (!sucessful) {
186        this.rpcServer.addCallSize(call.getSize() * -1);
187      }
188      cleanup();
189    }
190  }
191
192  /**
193   * When we want to drop this call because of server is overloaded.
194   */
195  public void drop() {
196    try {
197      if (call.disconnectSince() >= 0) {
198        if (RpcServer.LOG.isDebugEnabled()) {
199          RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call);
200        }
201        return;
202      }
203
204      // Set the response
205      InetSocketAddress address = rpcServer.getListenerAddress();
206      call.setResponse(null, null, CALL_DROPPED_EXCEPTION, "Call dropped, server "
207        + (address != null ? address : "(channel closed)") + " is overloaded, please retry.");
208      call.sendResponseIfReady();
209    } catch (ClosedChannelException cce) {
210      InetSocketAddress address = rpcServer.getListenerAddress();
211      RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
212        "this means that the server " + (address != null ? address : "(channel closed)") +
213        " was processing a request but the client went away. The error message was: " +
214        cce.getMessage());
215    } catch (Exception e) {
216      RpcServer.LOG.warn(Thread.currentThread().getName()
217        + ": caught: " + StringUtils.stringifyException(e));
218    } finally {
219      if (!sucessful) {
220        this.rpcServer.addCallSize(call.getSize() * -1);
221      }
222      cleanup();
223    }
224  }
225}