1 package org.apache.hadoop.hbase.ipc;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 import java.net.InetSocketAddress;
20 import java.nio.channels.ClosedChannelException;
21
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.hadoop.hbase.CellScanner;
25 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.classification.InterfaceStability;
28 import org.apache.hadoop.hbase.ipc.RpcServer.Call;
29 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
30 import org.apache.hadoop.hbase.util.Pair;
31 import org.apache.hadoop.security.UserGroupInformation;
32 import org.apache.hadoop.util.StringUtils;
33 import org.apache.htrace.Trace;
34 import org.apache.htrace.TraceScope;
35
36 import com.google.protobuf.Message;
37
38
39
40
41
42
43 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
44 @InterfaceStability.Evolving
45 public class CallRunner {
46 private static final Log LOG = LogFactory.getLog(CallRunner.class);
47
48 private Call call;
49 private RpcServerInterface rpcServer;
50 private MonitoredRPCHandler status;
51 private volatile boolean sucessful;
52
53
54
55
56
57
58
59 CallRunner(final RpcServerInterface rpcServer, final Call call) {
60 this.call = call;
61 this.rpcServer = rpcServer;
62
63 this.rpcServer.addCallSize(call.getSize());
64 }
65
66 public Call getCall() {
67 return call;
68 }
69
70 public void setStatus(MonitoredRPCHandler status) {
71 this.status = status;
72 }
73
74
75
76
77 private void cleanup() {
78 this.call = null;
79 this.rpcServer = null;
80 }
81
82 public void run() {
83 try {
84 if (!call.connection.channel.isOpen()) {
85 if (RpcServer.LOG.isDebugEnabled()) {
86 RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call);
87 }
88 return;
89 }
90 this.status.setStatus("Setting up call");
91 this.status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort());
92 if (RpcServer.LOG.isTraceEnabled()) {
93 UserGroupInformation remoteUser = call.connection.ugi;
94 RpcServer.LOG.trace(call.toShortString() + " executing as " +
95 ((remoteUser == null) ? "NULL principal" : remoteUser.getUserName()));
96 }
97 Throwable errorThrowable = null;
98 String error = null;
99 Pair<Message, CellScanner> resultPair = null;
100 RpcServer.CurCall.set(call);
101 TraceScope traceScope = null;
102 try {
103 if (!this.rpcServer.isStarted()) {
104 InetSocketAddress address = rpcServer.getListenerAddress();
105 throw new ServerNotRunningYetException("Server " +
106 (address != null ? address : "(channel closed)") + " is not running yet");
107 }
108 if (call.tinfo != null) {
109 traceScope = Trace.startSpan(call.toTraceString(), call.tinfo);
110 }
111
112 resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
113 call.timestamp, this.status);
114 } catch (Throwable e) {
115 RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e);
116 errorThrowable = e;
117 error = StringUtils.stringifyException(e);
118 if (e instanceof Error) {
119 throw (Error)e;
120 }
121 } finally {
122 if (traceScope != null) {
123 traceScope.close();
124 }
125 RpcServer.CurCall.set(null);
126 if (resultPair != null) {
127 this.rpcServer.addCallSize(call.getSize() * -1);
128 sucessful = true;
129 }
130 }
131
132 Message param = resultPair != null ? resultPair.getFirst() : null;
133 CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
134 call.setResponse(param, cells, errorThrowable, error);
135 call.sendResponseIfReady();
136 this.status.markComplete("Sent response");
137 this.status.pause("Waiting for a call");
138 } catch (OutOfMemoryError e) {
139 if (this.rpcServer.getErrorHandler() != null) {
140 if (this.rpcServer.getErrorHandler().checkOOME(e)) {
141 RpcServer.LOG.info(Thread.currentThread().getName() + ": exiting on OutOfMemoryError");
142 return;
143 }
144 } else {
145
146 throw e;
147 }
148 } catch (ClosedChannelException cce) {
149 InetSocketAddress address = rpcServer.getListenerAddress();
150 RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
151 "this means that the server " + (address != null ? address : "(channel closed)") +
152 " was processing a request but the client went away. The error message was: " +
153 cce.getMessage());
154 } catch (Exception e) {
155 RpcServer.LOG.warn(Thread.currentThread().getName()
156 + ": caught: " + StringUtils.stringifyException(e));
157 } finally {
158 if (!sucessful) {
159 this.rpcServer.addCallSize(call.getSize() * -1);
160 }
161 cleanup();
162 }
163 }
164 }