001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.net.InetSocketAddress;
021import java.nio.channels.ClosedChannelException;
022import java.util.Optional;
023
024import org.apache.hadoop.hbase.CallDroppedException;
025import org.apache.hadoop.hbase.CellScanner;
026import org.apache.hadoop.hbase.HBaseInterfaceAudience;
027import org.apache.hadoop.hbase.trace.TraceUtil;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.apache.yetus.audience.InterfaceStability;
030import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
031import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
032import org.apache.hadoop.hbase.security.User;
033import org.apache.hbase.thirdparty.com.google.protobuf.Message;
034import org.apache.hadoop.hbase.util.Pair;
035import org.apache.hadoop.util.StringUtils;
036import org.apache.htrace.core.TraceScope;
037
038/**
039 * The request processing logic, which is usually executed in thread pools provided by an
040 * {@link RpcScheduler}.  Call {@link #run()} to actually execute the contained
041 * RpcServer.Call
042 */
043@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
044@InterfaceStability.Evolving
045public class CallRunner {
046
047  private static final CallDroppedException CALL_DROPPED_EXCEPTION
048    = new CallDroppedException();
049
050  private RpcCall call;
051  private RpcServerInterface rpcServer;
052  private MonitoredRPCHandler status;
053  private volatile boolean sucessful;
054
055  /**
056   * On construction, adds the size of this call to the running count of outstanding call sizes.
057   * Presumption is that we are put on a queue while we wait on an executor to run us.  During this
058   * time we occupy heap.
059   */
060  // The constructor is shutdown so only RpcServer in this class can make one of these.
061  CallRunner(final RpcServerInterface rpcServer, final RpcCall call) {
062    this.call = call;
063    this.rpcServer = rpcServer;
064    // Add size of the call to queue size.
065    if (call != null && rpcServer != null) {
066      this.rpcServer.addCallSize(call.getSize());
067    }
068  }
069
070  public RpcCall getRpcCall() {
071    return call;
072  }
073
074  /**
075   * Keep for backward compatibility.
076   * @deprecated As of release 2.0, this will be removed in HBase 3.0
077   */
078  @Deprecated
079  public ServerCall<?> getCall() {
080    return (ServerCall<?>) call;
081  }
082
083  public void setStatus(MonitoredRPCHandler status) {
084    this.status = status;
085  }
086
087  /**
088   * Cleanup after ourselves... let go of references.
089   */
090  private void cleanup() {
091    this.call.cleanup();
092    this.call = null;
093    this.rpcServer = null;
094  }
095
096  public void run() {
097    try {
098      if (call.disconnectSince() >= 0) {
099        if (RpcServer.LOG.isDebugEnabled()) {
100          RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call);
101        }
102        return;
103      }
104      call.setStartTime(System.currentTimeMillis());
105      if (call.getStartTime() > call.getDeadline()) {
106        RpcServer.LOG.warn("Dropping timed out call: " + call);
107        return;
108      }
109      this.status.setStatus("Setting up call");
110      this.status.setConnection(call.getRemoteAddress().getHostAddress(), call.getRemotePort());
111      if (RpcServer.LOG.isTraceEnabled()) {
112        Optional<User> remoteUser = call.getRequestUser();
113        RpcServer.LOG.trace(call.toShortString() + " executing as " +
114            (remoteUser.isPresent() ? remoteUser.get().getName() : "NULL principal"));
115      }
116      Throwable errorThrowable = null;
117      String error = null;
118      Pair<Message, CellScanner> resultPair = null;
119      RpcServer.CurCall.set(call);
120      TraceScope traceScope = null;
121      try {
122        if (!this.rpcServer.isStarted()) {
123          InetSocketAddress address = rpcServer.getListenerAddress();
124          throw new ServerNotRunningYetException("Server " +
125              (address != null ? address : "(channel closed)") + " is not running yet");
126        }
127        String serviceName =
128            call.getService() != null ? call.getService().getDescriptorForType().getName() : "";
129        String methodName = (call.getMethod() != null) ? call.getMethod().getName() : "";
130        String traceString = serviceName + "." + methodName;
131        traceScope = TraceUtil.createTrace(traceString);
132        // make the call
133        resultPair = this.rpcServer.call(call, this.status);
134      } catch (TimeoutIOException e){
135        RpcServer.LOG.warn("Can not complete this request in time, drop it: " + call);
136        return;
137      } catch (Throwable e) {
138        if (e instanceof ServerNotRunningYetException) {
139          // If ServerNotRunningYetException, don't spew stack trace.
140          if (RpcServer.LOG.isTraceEnabled()) {
141            RpcServer.LOG.trace(call.toShortString(), e);
142          }
143        } else {
144          // Don't dump full exception.. just String version
145          RpcServer.LOG.debug(call.toShortString() + ", exception=" + e);
146        }
147        errorThrowable = e;
148        error = StringUtils.stringifyException(e);
149        if (e instanceof Error) {
150          throw (Error)e;
151        }
152      } finally {
153        if (traceScope != null) {
154          traceScope.close();
155        }
156        RpcServer.CurCall.set(null);
157        if (resultPair != null) {
158          this.rpcServer.addCallSize(call.getSize() * -1);
159          sucessful = true;
160        }
161      }
162      // return back the RPC request read BB we can do here. It is done by now.
163      call.cleanup();
164      // Set the response
165      Message param = resultPair != null ? resultPair.getFirst() : null;
166      CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
167      call.setResponse(param, cells, errorThrowable, error);
168      call.sendResponseIfReady();
169      this.status.markComplete("Sent response");
170      this.status.pause("Waiting for a call");
171    } catch (OutOfMemoryError e) {
172      if (this.rpcServer.getErrorHandler() != null) {
173        if (this.rpcServer.getErrorHandler().checkOOME(e)) {
174          RpcServer.LOG.info(Thread.currentThread().getName() + ": exiting on OutOfMemoryError");
175          return;
176        }
177      } else {
178        // rethrow if no handler
179        throw e;
180      }
181    } catch (ClosedChannelException cce) {
182      InetSocketAddress address = rpcServer.getListenerAddress();
183      RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
184          "this means that the server " + (address != null ? address : "(channel closed)") +
185          " was processing a request but the client went away. The error message was: " +
186          cce.getMessage());
187    } catch (Exception e) {
188      RpcServer.LOG.warn(Thread.currentThread().getName()
189          + ": caught: " + StringUtils.stringifyException(e));
190    } finally {
191      if (!sucessful) {
192        this.rpcServer.addCallSize(call.getSize() * -1);
193      }
194      cleanup();
195    }
196  }
197
198  /**
199   * When we want to drop this call because of server is overloaded.
200   */
201  public void drop() {
202    try {
203      if (call.disconnectSince() >= 0) {
204        if (RpcServer.LOG.isDebugEnabled()) {
205          RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call);
206        }
207        return;
208      }
209
210      // Set the response
211      InetSocketAddress address = rpcServer.getListenerAddress();
212      call.setResponse(null, null, CALL_DROPPED_EXCEPTION, "Call dropped, server "
213        + (address != null ? address : "(channel closed)") + " is overloaded, please retry.");
214      call.sendResponseIfReady();
215    } catch (ClosedChannelException cce) {
216      InetSocketAddress address = rpcServer.getListenerAddress();
217      RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
218        "this means that the server " + (address != null ? address : "(channel closed)") +
219        " was processing a request but the client went away. The error message was: " +
220        cce.getMessage());
221    } catch (Exception e) {
222      RpcServer.LOG.warn(Thread.currentThread().getName()
223        + ": caught: " + StringUtils.stringifyException(e));
224    } finally {
225      if (!sucessful) {
226        this.rpcServer.addCallSize(call.getSize() * -1);
227      }
228      cleanup();
229    }
230  }
231}