1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.monitoring;
20
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.Map;
24
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.hbase.client.Operation;
27 import org.apache.hadoop.hbase.util.Bytes;
28
29 import com.google.protobuf.Message;
30
31
32
33
34
35
36 @InterfaceAudience.Private
37 public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl
38 implements MonitoredRPCHandler {
39 private String clientAddress;
40 private int remotePort;
41 private long rpcQueueTime;
42 private long rpcStartTime;
43 private String methodName = "";
44 private Object [] params = {};
45 private Message packet;
46
47 public MonitoredRPCHandlerImpl() {
48 super();
49
50
51 setState(State.WAITING);
52 }
53
54 @Override
55 public synchronized MonitoredRPCHandlerImpl clone() {
56 return (MonitoredRPCHandlerImpl) super.clone();
57 }
58
59
60
61
62
63
64 @Override
65 public String getStatus() {
66 if (getState() != State.RUNNING) {
67 return super.getStatus();
68 }
69 return super.getStatus() + " from " + getClient() + ": " + getRPC();
70 }
71
72
73
74
75
76
77 public long getRPCQueueTime() {
78 if (getState() != State.RUNNING) {
79 return -1;
80 }
81 return rpcQueueTime;
82 }
83
84
85
86
87
88
89 public long getRPCStartTime() {
90 if (getState() != State.RUNNING) {
91 return -1;
92 }
93 return rpcStartTime;
94 }
95
96
97
98
99
100
101 public String getRPC() {
102 return getRPC(false);
103 }
104
105
106
107
108
109
110
111 public synchronized String getRPC(boolean withParams) {
112 if (getState() != State.RUNNING) {
113
114 return "";
115 }
116 StringBuilder buffer = new StringBuilder(256);
117 buffer.append(methodName);
118 if (withParams) {
119 buffer.append("(");
120 for (int i = 0; i < params.length; i++) {
121 if (i != 0)
122 buffer.append(", ");
123 buffer.append(params[i]);
124 }
125 buffer.append(")");
126 }
127 return buffer.toString();
128 }
129
130
131
132
133
134
135 public long getRPCPacketLength() {
136 if (getState() != State.RUNNING || packet == null) {
137
138 return -1L;
139 }
140 return packet.getSerializedSize();
141 }
142
143
144
145
146
147
148
149 public String getClient() {
150 return clientAddress + ":" + remotePort;
151 }
152
153
154
155
156
157
158 public boolean isRPCRunning() {
159 return getState() == State.RUNNING;
160 }
161
162
163
164
165
166
167
168
169 public boolean isOperationRunning() {
170 if(!isRPCRunning()) {
171 return false;
172 }
173 for(Object param : params) {
174 if (param instanceof Operation) {
175 return true;
176 }
177 }
178 return false;
179 }
180
181
182
183
184
185
186 public synchronized void setRPC(String methodName, Object [] params,
187 long queueTime) {
188 this.methodName = methodName;
189 this.params = params;
190 this.rpcStartTime = System.currentTimeMillis();
191 this.rpcQueueTime = queueTime;
192 this.state = State.RUNNING;
193 }
194
195
196
197
198
199
200 public void setRPCPacket(Message param) {
201 this.packet = param;
202 }
203
204
205
206
207
208
209 public void setConnection(String clientAddress, int remotePort) {
210 this.clientAddress = clientAddress;
211 this.remotePort = remotePort;
212 }
213
214 @Override
215 public void markComplete(String status) {
216 super.markComplete(status);
217 this.params = null;
218 this.packet = null;
219 }
220
221 public synchronized Map<String, Object> toMap() {
222
223 Map<String, Object> map = super.toMap();
224 if (getState() != State.RUNNING) {
225 return map;
226 }
227 Map<String, Object> rpcJSON = new HashMap<String, Object>();
228 ArrayList paramList = new ArrayList();
229 map.put("rpcCall", rpcJSON);
230 rpcJSON.put("queuetimems", getRPCQueueTime());
231 rpcJSON.put("starttimems", getRPCStartTime());
232 rpcJSON.put("clientaddress", clientAddress);
233 rpcJSON.put("remoteport", remotePort);
234 rpcJSON.put("packetlength", getRPCPacketLength());
235 rpcJSON.put("method", methodName);
236 rpcJSON.put("params", paramList);
237 for(Object param : params) {
238 if(param instanceof byte []) {
239 paramList.add(Bytes.toStringBinary((byte []) param));
240 } else if (param instanceof Operation) {
241 paramList.add(((Operation) param).toMap());
242 } else {
243 paramList.add(param.toString());
244 }
245 }
246 return map;
247 }
248
249 @Override
250 public String toString() {
251 if (getState() != State.RUNNING) {
252 return super.toString();
253 }
254 return super.toString() + ", rpcMethod=" + getRPC();
255 }
256 }