View Javadoc

1   /**
2    * Copyright 2011 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.monitoring;
21  
22  import org.apache.hadoop.hbase.client.Operation;
23  import org.apache.hadoop.hbase.io.WritableWithSize;
24  import org.apache.hadoop.hbase.util.Bytes;
25  import org.apache.hadoop.io.Writable;
26  
27  import org.codehaus.jackson.map.ObjectMapper;
28  
29  import java.io.IOException;
30  import java.util.ArrayList;
31  import java.util.HashMap;
32  import java.util.Map;
33  
34  /**
35   * A MonitoredTask implementation designed for use with RPC Handlers 
36   * handling frequent, short duration tasks. String concatenations and object 
37   * allocations are avoided in methods that will be hit by every RPC call.
38   */
39  public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl 
40    implements MonitoredRPCHandler {
41    private String clientAddress;
42    private int remotePort;
43    private long rpcQueueTime;
44    private long rpcStartTime;
45    private String methodName = "";
46    private Object [] params = {};
47    private Writable packet;
48  
49    public MonitoredRPCHandlerImpl() {
50      super();
51      // in this implementation, WAITING indicates that the handler is not 
52      // actively servicing an RPC call.
53      setState(State.WAITING);
54    }
55  
56    @Override
57    public synchronized MonitoredRPCHandlerImpl clone() {
58      return (MonitoredRPCHandlerImpl) super.clone();
59    }
60  
61    /**
62     * Gets the status of this handler; if it is currently servicing an RPC, 
63     * this status will include the RPC information.
64     * @return a String describing the current status.
65     */
66    @Override
67    public String getStatus() {
68      if (getState() != State.RUNNING) {
69        return super.getStatus();
70      }
71      return super.getStatus() + " from " + getClient() + ": " + getRPC();
72    }
73  
74    /**
75     * Accesses the queue time for the currently running RPC on the 
76     * monitored Handler.
77     * @return the queue timestamp or -1 if there is no RPC currently running.
78     */
79    public long getRPCQueueTime() {
80      if (getState() != State.RUNNING) {
81        return -1;
82      }
83      return rpcQueueTime;
84    }
85  
86    /**
87     * Accesses the start time for the currently running RPC on the 
88     * monitored Handler.
89     * @return the start timestamp or -1 if there is no RPC currently running.
90     */
91    public long getRPCStartTime() {
92      if (getState() != State.RUNNING) {
93        return -1;
94      }
95      return rpcStartTime;
96    }
97  
98    /**
99     * Produces a string representation of the method currently being serviced
100    * by this Handler.
101    * @return a string representing the method call without parameters
102    */
103   public String getRPC() {
104     return getRPC(false);
105   }
106 
107   /**
108    * Produces a string representation of the method currently being serviced
109    * by this Handler.
110    * @param withParams toggle inclusion of parameters in the RPC String
111    * @return A human-readable string representation of the method call.
112    */
113   public synchronized String getRPC(boolean withParams) {
114     if (getState() != State.RUNNING) {
115       // no RPC is currently running
116       return "";
117     }
118     StringBuilder buffer = new StringBuilder(256);
119     buffer.append(methodName);
120     if (withParams) {
121       buffer.append("(");
122       for (int i = 0; i < params.length; i++) {
123         if (i != 0)
124           buffer.append(", ");
125         buffer.append(params[i]);
126       }
127       buffer.append(")");
128     }
129     return buffer.toString();
130   }
131 
132   /**
133    * Produces a string representation of the method currently being serviced
134    * by this Handler.
135    * @return A human-readable string representation of the method call.
136    */
137   public long getRPCPacketLength() {
138     if (getState() != State.RUNNING || packet == null) {
139       // no RPC is currently running, or we don't have an RPC's packet info
140       return -1L;
141     }
142     if (!(packet instanceof WritableWithSize)) {
143       // the packet passed to us doesn't expose size information
144       return -1L;
145     }
146     return ((WritableWithSize) packet).getWritableSize();
147   }
148 
149   /**
150    * If an RPC call is currently running, produces a String representation of 
151    * the connection from which it was received.
152    * @return A human-readable string representation of the address and port 
153    *  of the client.
154    */
155   public String getClient() {
156     return clientAddress + ":" + remotePort;
157   }
158 
159   /**
160    * Indicates to the client whether this task is monitoring a currently active 
161    * RPC call.
162    * @return true if the monitored handler is currently servicing an RPC call.
163    */
164   public boolean isRPCRunning() {
165     return getState() == State.RUNNING;
166   }
167 
168   /**
169    * Indicates to the client whether this task is monitoring a currently active 
170    * RPC call to a database command. (as defined by 
171    * o.a.h.h.client.Operation)
172    * @return true if the monitored handler is currently servicing an RPC call
173    * to a database command.
174    */
175   public boolean isOperationRunning() {
176     if(!isRPCRunning()) {
177       return false;
178     }
179     for(Object param : params) {
180       if (param instanceof Operation) {
181         return true;
182       }
183     }
184     return false;
185   }
186 
187   /**
188    * Tells this instance that it is monitoring a new RPC call.
189    * @param methodName The name of the method that will be called by the RPC.
190    * @param params The parameters that will be passed to the indicated method.
191    */
192   public synchronized void setRPC(String methodName, Object [] params, 
193       long queueTime) {
194     this.methodName = methodName;
195     this.params = params;
196     this.rpcStartTime = System.currentTimeMillis();
197     this.rpcQueueTime = queueTime;
198     this.state = State.RUNNING;
199   }
200 
201   /**
202    * Gives this instance a reference to the Writable received by the RPC, so 
203    * that it can later compute its size if asked for it.
204    * @param param The Writable received by the RPC for this call
205    */
206   public void setRPCPacket(Writable param) {
207     this.packet = param;
208   }
209 
210   /**
211    * Registers current handler client details.
212    * @param clientAddress the address of the current client
213    * @param remotePort the port from which the client connected
214    */
215   public void setConnection(String clientAddress, int remotePort) {
216     this.clientAddress = clientAddress;
217     this.remotePort = remotePort;
218   }
219 
220   @Override
221   public void markComplete(String status) {
222     super.markComplete(status);
223     this.params = null;
224     this.packet = null;
225   }
226 
227   public synchronized Map<String, Object> toMap() {
228     // only include RPC info if the Handler is actively servicing an RPC call
229     Map<String, Object> map = super.toMap();
230     if (getState() != State.RUNNING) {
231       return map;
232     }
233     Map<String, Object> rpcJSON = new HashMap<String, Object>();
234     ArrayList paramList = new ArrayList();
235     map.put("rpcCall", rpcJSON);
236     rpcJSON.put("queuetimems", getRPCQueueTime());
237     rpcJSON.put("starttimems", getRPCStartTime());
238     rpcJSON.put("clientaddress", clientAddress);
239     rpcJSON.put("remoteport", remotePort);
240     rpcJSON.put("packetlength", getRPCPacketLength());
241     rpcJSON.put("method", methodName);
242     rpcJSON.put("params", paramList);
243     for(Object param : params) {
244       if(param instanceof byte []) {
245         paramList.add(Bytes.toStringBinary((byte []) param));
246       } else if (param instanceof Operation) {
247         paramList.add(((Operation) param).toMap());
248       } else {
249         paramList.add(param.toString());
250       }
251     }
252     return map;
253   }
254 
255   @Override
256   public String toString() {
257     if (getState() != State.RUNNING) {
258       return super.toString();
259     }
260     return super.toString() + ", rpcMethod=" + getRPC();
261   }
262 
263 }