001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.net.InetSocketAddress;
021import java.nio.channels.ClosedChannelException;
022import java.util.Optional;
023
024import org.apache.hadoop.hbase.CallDroppedException;
025import org.apache.hadoop.hbase.CellScanner;
026import org.apache.hadoop.hbase.HBaseInterfaceAudience;
027import org.apache.hadoop.hbase.trace.TraceUtil;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.apache.yetus.audience.InterfaceStability;
030import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
031import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
032import org.apache.hadoop.hbase.security.User;
033import org.apache.hbase.thirdparty.com.google.protobuf.Message;
034import org.apache.hadoop.hbase.util.Pair;
035import org.apache.hadoop.util.StringUtils;
036import org.apache.htrace.core.TraceScope;
037
038/**
039 * The request processing logic, which is usually executed in thread pools provided by an
040 * {@link RpcScheduler}.  Call {@link #run()} to actually execute the contained
041 * RpcServer.Call
042 */
043@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
044@InterfaceStability.Evolving
045public class CallRunner {
046
047  private static final CallDroppedException CALL_DROPPED_EXCEPTION
048    = new CallDroppedException();
049
050  private RpcCall call;
051  private RpcServerInterface rpcServer;
052  private MonitoredRPCHandler status;
053  private volatile boolean sucessful;
054
055  /**
056   * On construction, adds the size of this call to the running count of outstanding call sizes.
057   * Presumption is that we are put on a queue while we wait on an executor to run us.  During this
058   * time we occupy heap.
059   */
060  // The constructor is shutdown so only RpcServer in this class can make one of these.
061  CallRunner(final RpcServerInterface rpcServer, final RpcCall call) {
062    this.call = call;
063    this.rpcServer = rpcServer;
064    // Add size of the call to queue size.
065    if (call != null && rpcServer != null) {
066      this.rpcServer.addCallSize(call.getSize());
067    }
068  }
069
070  public RpcCall getRpcCall() {
071    return call;
072  }
073
074  /**
075   * Keep for backward compatibility.
076   * @deprecated As of release 2.0, this will be removed in HBase 3.0
077   */
078  @Deprecated
079  public ServerCall<?> getCall() {
080    return (ServerCall<?>) call;
081  }
082
083  public void setStatus(MonitoredRPCHandler status) {
084    this.status = status;
085  }
086
087  /**
088   * Cleanup after ourselves... let go of references.
089   */
090  private void cleanup() {
091    this.call = null;
092    this.rpcServer = null;
093  }
094
095  public void run() {
096    try {
097      if (call.disconnectSince() >= 0) {
098        if (RpcServer.LOG.isDebugEnabled()) {
099          RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call);
100        }
101        return;
102      }
103      call.setStartTime(System.currentTimeMillis());
104      if (call.getStartTime() > call.getDeadline()) {
105        RpcServer.LOG.warn("Dropping timed out call: " + call);
106        return;
107      }
108      this.status.setStatus("Setting up call");
109      this.status.setConnection(call.getRemoteAddress().getHostAddress(), call.getRemotePort());
110      if (RpcServer.LOG.isTraceEnabled()) {
111        Optional<User> remoteUser = call.getRequestUser();
112        RpcServer.LOG.trace(call.toShortString() + " executing as " +
113            (remoteUser.isPresent() ? remoteUser.get().getName() : "NULL principal"));
114      }
115      Throwable errorThrowable = null;
116      String error = null;
117      Pair<Message, CellScanner> resultPair = null;
118      RpcServer.CurCall.set(call);
119      TraceScope traceScope = null;
120      try {
121        if (!this.rpcServer.isStarted()) {
122          InetSocketAddress address = rpcServer.getListenerAddress();
123          throw new ServerNotRunningYetException("Server " +
124              (address != null ? address : "(channel closed)") + " is not running yet");
125        }
126        String serviceName =
127            call.getService() != null ? call.getService().getDescriptorForType().getName() : "";
128        String methodName = (call.getMethod() != null) ? call.getMethod().getName() : "";
129        String traceString = serviceName + "." + methodName;
130        traceScope = TraceUtil.createTrace(traceString);
131        // make the call
132        resultPair = this.rpcServer.call(call, this.status);
133      } catch (TimeoutIOException e){
134        RpcServer.LOG.warn("Can not complete this request in time, drop it: " + call);
135        return;
136      } catch (Throwable e) {
137        if (e instanceof ServerNotRunningYetException) {
138          // If ServerNotRunningYetException, don't spew stack trace.
139          if (RpcServer.LOG.isTraceEnabled()) {
140            RpcServer.LOG.trace(call.toShortString(), e);
141          }
142        } else {
143          // Don't dump full exception.. just String version
144          RpcServer.LOG.debug(call.toShortString() + ", exception=" + e);
145        }
146        errorThrowable = e;
147        error = StringUtils.stringifyException(e);
148        if (e instanceof Error) {
149          throw (Error)e;
150        }
151      } finally {
152        if (traceScope != null) {
153          traceScope.close();
154        }
155        RpcServer.CurCall.set(null);
156        if (resultPair != null) {
157          this.rpcServer.addCallSize(call.getSize() * -1);
158          sucessful = true;
159        }
160      }
161      // return back the RPC request read BB we can do here. It is done by now.
162      call.cleanup();
163      // Set the response
164      Message param = resultPair != null ? resultPair.getFirst() : null;
165      CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
166      call.setResponse(param, cells, errorThrowable, error);
167      call.sendResponseIfReady();
168      this.status.markComplete("Sent response");
169      this.status.pause("Waiting for a call");
170    } catch (OutOfMemoryError e) {
171      if (this.rpcServer.getErrorHandler() != null) {
172        if (this.rpcServer.getErrorHandler().checkOOME(e)) {
173          RpcServer.LOG.info(Thread.currentThread().getName() + ": exiting on OutOfMemoryError");
174          return;
175        }
176      } else {
177        // rethrow if no handler
178        throw e;
179      }
180    } catch (ClosedChannelException cce) {
181      InetSocketAddress address = rpcServer.getListenerAddress();
182      RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
183          "this means that the server " + (address != null ? address : "(channel closed)") +
184          " was processing a request but the client went away. The error message was: " +
185          cce.getMessage());
186    } catch (Exception e) {
187      RpcServer.LOG.warn(Thread.currentThread().getName()
188          + ": caught: " + StringUtils.stringifyException(e));
189    } finally {
190      if (!sucessful) {
191        this.rpcServer.addCallSize(call.getSize() * -1);
192      }
193      cleanup();
194    }
195  }
196
197  /**
198   * When we want to drop this call because of server is overloaded.
199   */
200  public void drop() {
201    try {
202      if (call.disconnectSince() >= 0) {
203        if (RpcServer.LOG.isDebugEnabled()) {
204          RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call);
205        }
206        return;
207      }
208
209      // Set the response
210      InetSocketAddress address = rpcServer.getListenerAddress();
211      call.setResponse(null, null, CALL_DROPPED_EXCEPTION, "Call dropped, server "
212        + (address != null ? address : "(channel closed)") + " is overloaded, please retry.");
213      call.sendResponseIfReady();
214    } catch (ClosedChannelException cce) {
215      InetSocketAddress address = rpcServer.getListenerAddress();
216      RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
217        "this means that the server " + (address != null ? address : "(channel closed)") +
218        " was processing a request but the client went away. The error message was: " +
219        cce.getMessage());
220    } catch (Exception e) {
221      RpcServer.LOG.warn(Thread.currentThread().getName()
222        + ": caught: " + StringUtils.stringifyException(e));
223    } finally {
224      if (!sucessful) {
225        this.rpcServer.addCallSize(call.getSize() * -1);
226      }
227      cleanup();
228    }
229  }
230}