View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.monitoring;
20  
21  import java.util.ArrayList;
22  import java.util.HashMap;
23  import java.util.Map;
24  
25  import org.apache.hadoop.hbase.classification.InterfaceAudience;
26  import org.apache.hadoop.hbase.client.Operation;
27  import org.apache.hadoop.hbase.util.Bytes;
28  
29  import com.google.protobuf.Message;
30  
31  /**
32   * A MonitoredTask implementation designed for use with RPC Handlers 
33   * handling frequent, short duration tasks. String concatenations and object 
34   * allocations are avoided in methods that will be hit by every RPC call.
35   */
36  @InterfaceAudience.Private
37  public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl 
38    implements MonitoredRPCHandler {
39    private String clientAddress;
40    private int remotePort;
41    private long rpcQueueTime;
42    private long rpcStartTime;
43    private String methodName = "";
44    private Object [] params = {};
45    private Message packet;
46  
47    public MonitoredRPCHandlerImpl() {
48      super();
49      // in this implementation, WAITING indicates that the handler is not 
50      // actively servicing an RPC call.
51      setState(State.WAITING);
52    }
53  
54    @Override
55    public synchronized MonitoredRPCHandlerImpl clone() {
56      return (MonitoredRPCHandlerImpl) super.clone();
57    }
58  
59    /**
60     * Gets the status of this handler; if it is currently servicing an RPC, 
61     * this status will include the RPC information.
62     * @return a String describing the current status.
63     */
64    @Override
65    public String getStatus() {
66      if (getState() != State.RUNNING) {
67        return super.getStatus();
68      }
69      return super.getStatus() + " from " + getClient() + ": " + getRPC();
70    }
71  
72    /**
73     * Accesses the queue time for the currently running RPC on the 
74     * monitored Handler.
75     * @return the queue timestamp or -1 if there is no RPC currently running.
76     */
77    public long getRPCQueueTime() {
78      if (getState() != State.RUNNING) {
79        return -1;
80      }
81      return rpcQueueTime;
82    }
83  
84    /**
85     * Accesses the start time for the currently running RPC on the 
86     * monitored Handler.
87     * @return the start timestamp or -1 if there is no RPC currently running.
88     */
89    public long getRPCStartTime() {
90      if (getState() != State.RUNNING) {
91        return -1;
92      }
93      return rpcStartTime;
94    }
95  
96    /**
97     * Produces a string representation of the method currently being serviced
98     * by this Handler.
99     * @return a string representing the method call without parameters
100    */
101   public String getRPC() {
102     return getRPC(false);
103   }
104 
105   /**
106    * Produces a string representation of the method currently being serviced
107    * by this Handler.
108    * @param withParams toggle inclusion of parameters in the RPC String
109    * @return A human-readable string representation of the method call.
110    */
111   public synchronized String getRPC(boolean withParams) {
112     if (getState() != State.RUNNING) {
113       // no RPC is currently running
114       return "";
115     }
116     StringBuilder buffer = new StringBuilder(256);
117     buffer.append(methodName);
118     if (withParams) {
119       buffer.append("(");
120       for (int i = 0; i < params.length; i++) {
121         if (i != 0)
122           buffer.append(", ");
123         buffer.append(params[i]);
124       }
125       buffer.append(")");
126     }
127     return buffer.toString();
128   }
129 
130   /**
131    * Produces a string representation of the method currently being serviced
132    * by this Handler.
133    * @return A human-readable string representation of the method call.
134    */
135   public long getRPCPacketLength() {
136     if (getState() != State.RUNNING || packet == null) {
137       // no RPC is currently running, or we don't have an RPC's packet info
138       return -1L;
139     }
140     return packet.getSerializedSize();
141   }
142 
143   /**
144    * If an RPC call is currently running, produces a String representation of 
145    * the connection from which it was received.
146    * @return A human-readable string representation of the address and port 
147    *  of the client.
148    */
149   public String getClient() {
150     return clientAddress + ":" + remotePort;
151   }
152 
153   /**
154    * Indicates to the client whether this task is monitoring a currently active 
155    * RPC call.
156    * @return true if the monitored handler is currently servicing an RPC call.
157    */
158   public boolean isRPCRunning() {
159     return getState() == State.RUNNING;
160   }
161 
162   /**
163    * Indicates to the client whether this task is monitoring a currently active 
164    * RPC call to a database command. (as defined by 
165    * o.a.h.h.client.Operation)
166    * @return true if the monitored handler is currently servicing an RPC call
167    * to a database command.
168    */
169   public boolean isOperationRunning() {
170     if(!isRPCRunning()) {
171       return false;
172     }
173     for(Object param : params) {
174       if (param instanceof Operation) {
175         return true;
176       }
177     }
178     return false;
179   }
180 
181   /**
182    * Tells this instance that it is monitoring a new RPC call.
183    * @param methodName The name of the method that will be called by the RPC.
184    * @param params The parameters that will be passed to the indicated method.
185    */
186   public synchronized void setRPC(String methodName, Object [] params, 
187       long queueTime) {
188     this.methodName = methodName;
189     this.params = params;
190     this.rpcStartTime = System.currentTimeMillis();
191     this.rpcQueueTime = queueTime;
192     this.state = State.RUNNING;
193   }
194 
195   /**
196    * Gives this instance a reference to the protobuf received by the RPC, so 
197    * that it can later compute its size if asked for it.
198    * @param param The protobuf received by the RPC for this call
199    */
200   public void setRPCPacket(Message param) {
201     this.packet = param;
202   }
203 
204   /**
205    * Registers current handler client details.
206    * @param clientAddress the address of the current client
207    * @param remotePort the port from which the client connected
208    */
209   public void setConnection(String clientAddress, int remotePort) {
210     this.clientAddress = clientAddress;
211     this.remotePort = remotePort;
212   }
213 
214   @Override
215   public void markComplete(String status) {
216     super.markComplete(status);
217     this.params = null;
218     this.packet = null;
219   }
220 
221   public synchronized Map<String, Object> toMap() {
222     // only include RPC info if the Handler is actively servicing an RPC call
223     Map<String, Object> map = super.toMap();
224     if (getState() != State.RUNNING) {
225       return map;
226     }
227     Map<String, Object> rpcJSON = new HashMap<String, Object>();
228     ArrayList paramList = new ArrayList();
229     map.put("rpcCall", rpcJSON);
230     rpcJSON.put("queuetimems", getRPCQueueTime());
231     rpcJSON.put("starttimems", getRPCStartTime());
232     rpcJSON.put("clientaddress", clientAddress);
233     rpcJSON.put("remoteport", remotePort);
234     rpcJSON.put("packetlength", getRPCPacketLength());
235     rpcJSON.put("method", methodName);
236     rpcJSON.put("params", paramList);
237     for(Object param : params) {
238       if(param instanceof byte []) {
239         paramList.add(Bytes.toStringBinary((byte []) param));
240       } else if (param instanceof Operation) {
241         paramList.add(((Operation) param).toMap());
242       } else {
243         paramList.add(param.toString());
244       }
245     }
246     return map;
247   }
248 
249   @Override
250   public String toString() {
251     if (getState() != State.RUNNING) {
252       return super.toString();
253     }
254     return super.toString() + ", rpcMethod=" + getRPC();
255   }
256 }