001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.monitoring;
020
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.Map;
024
025import org.apache.yetus.audience.InterfaceAudience;
026import org.apache.hadoop.hbase.client.Operation;
027import org.apache.hadoop.hbase.util.Bytes;
028
029import org.apache.hbase.thirdparty.com.google.protobuf.Message;
030
031/**
032 * A MonitoredTask implementation designed for use with RPC Handlers 
033 * handling frequent, short duration tasks. String concatenations and object 
034 * allocations are avoided in methods that will be hit by every RPC call.
035 */
036@InterfaceAudience.Private
037public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl 
038  implements MonitoredRPCHandler {
039  private String clientAddress;
040  private int remotePort;
041  private long rpcQueueTime;
042  private long rpcStartTime;
043  private String methodName = "";
044  private Object [] params = {};
045  private Message packet;
046
047  public MonitoredRPCHandlerImpl() {
048    super();
049    // in this implementation, WAITING indicates that the handler is not 
050    // actively servicing an RPC call.
051    setState(State.WAITING);
052  }
053
054  @Override
055  public synchronized MonitoredRPCHandlerImpl clone() {
056    return (MonitoredRPCHandlerImpl) super.clone();
057  }
058
059  /**
060   * Gets the status of this handler; if it is currently servicing an RPC, 
061   * this status will include the RPC information.
062   * @return a String describing the current status.
063   */
064  @Override
065  public String getStatus() {
066    if (getState() != State.RUNNING) {
067      return super.getStatus();
068    }
069    return super.getStatus() + " from " + getClient() + ": " + getRPC();
070  }
071
072  /**
073   * Accesses the queue time for the currently running RPC on the 
074   * monitored Handler.
075   * @return the queue timestamp or -1 if there is no RPC currently running.
076   */
077  @Override
078  public long getRPCQueueTime() {
079    if (getState() != State.RUNNING) {
080      return -1;
081    }
082    return rpcQueueTime;
083  }
084
085  /**
086   * Accesses the start time for the currently running RPC on the 
087   * monitored Handler.
088   * @return the start timestamp or -1 if there is no RPC currently running.
089   */
090  @Override
091  public long getRPCStartTime() {
092    if (getState() != State.RUNNING) {
093      return -1;
094    }
095    return rpcStartTime;
096  }
097
098  /**
099   * Produces a string representation of the method currently being serviced
100   * by this Handler.
101   * @return a string representing the method call without parameters
102   */
103  @Override
104  public synchronized String getRPC() {
105    return getRPC(false);
106  }
107
108  /**
109   * Produces a string representation of the method currently being serviced
110   * by this Handler.
111   * @param withParams toggle inclusion of parameters in the RPC String
112   * @return A human-readable string representation of the method call.
113   */
114  @Override
115  public synchronized String getRPC(boolean withParams) {
116    if (getState() != State.RUNNING) {
117      // no RPC is currently running
118      return "";
119    }
120    StringBuilder buffer = new StringBuilder(256);
121    buffer.append(methodName);
122    if (withParams) {
123      buffer.append("(");
124      for (int i = 0; i < params.length; i++) {
125        if (i != 0)
126          buffer.append(", ");
127        buffer.append(params[i]);
128      }
129      buffer.append(")");
130    }
131    return buffer.toString();
132  }
133
134  /**
135   * Produces a string representation of the method currently being serviced
136   * by this Handler.
137   * @return A human-readable string representation of the method call.
138   */
139  @Override
140  public long getRPCPacketLength() {
141    if (getState() != State.RUNNING || packet == null) {
142      // no RPC is currently running, or we don't have an RPC's packet info
143      return -1L;
144    }
145    return packet.getSerializedSize();
146  }
147
148  /**
149   * If an RPC call is currently running, produces a String representation of 
150   * the connection from which it was received.
151   * @return A human-readable string representation of the address and port 
152   *  of the client.
153   */
154  @Override
155  public String getClient() {
156    return clientAddress + ":" + remotePort;
157  }
158
159  /**
160   * Indicates to the client whether this task is monitoring a currently active 
161   * RPC call.
162   * @return true if the monitored handler is currently servicing an RPC call.
163   */
164  @Override
165  public boolean isRPCRunning() {
166    return getState() == State.RUNNING;
167  }
168
169  /**
170   * Indicates to the client whether this task is monitoring a currently active 
171   * RPC call to a database command. (as defined by 
172   * o.a.h.h.client.Operation)
173   * @return true if the monitored handler is currently servicing an RPC call
174   * to a database command.
175   */
176  @Override
177  public synchronized boolean isOperationRunning() {
178    if(!isRPCRunning()) {
179      return false;
180    }
181    for(Object param : params) {
182      if (param instanceof Operation) {
183        return true;
184      }
185    }
186    return false;
187  }
188
189  /**
190   * Tells this instance that it is monitoring a new RPC call.
191   * @param methodName The name of the method that will be called by the RPC.
192   * @param params The parameters that will be passed to the indicated method.
193   */
194  @Override
195  public synchronized void setRPC(String methodName, Object [] params, 
196      long queueTime) {
197    this.methodName = methodName;
198    this.params = params;
199    long now = System.currentTimeMillis();
200    this.rpcStartTime = now;
201    setWarnTime(now);
202    this.rpcQueueTime = queueTime;
203    this.state = State.RUNNING;
204  }
205
206  /**
207   * Gives this instance a reference to the protobuf received by the RPC, so 
208   * that it can later compute its size if asked for it.
209   * @param param The protobuf received by the RPC for this call
210   */
211  @Override
212  public void setRPCPacket(Message param) {
213    this.packet = param;
214  }
215
216  /**
217   * Registers current handler client details.
218   * @param clientAddress the address of the current client
219   * @param remotePort the port from which the client connected
220   */
221  @Override
222  public void setConnection(String clientAddress, int remotePort) {
223    this.clientAddress = clientAddress;
224    this.remotePort = remotePort;
225  }
226
227  @Override
228  public synchronized void markComplete(String status) {
229    super.markComplete(status);
230    this.params = null;
231    this.packet = null;
232  }
233
234  @Override
235  public synchronized Map<String, Object> toMap() {
236    // only include RPC info if the Handler is actively servicing an RPC call
237    Map<String, Object> map = super.toMap();
238    if (getState() != State.RUNNING) {
239      return map;
240    }
241    Map<String, Object> rpcJSON = new HashMap<>();
242    ArrayList paramList = new ArrayList();
243    map.put("rpcCall", rpcJSON);
244    rpcJSON.put("queuetimems", getRPCQueueTime());
245    rpcJSON.put("starttimems", getRPCStartTime());
246    rpcJSON.put("clientaddress", clientAddress);
247    rpcJSON.put("remoteport", remotePort);
248    rpcJSON.put("packetlength", getRPCPacketLength());
249    rpcJSON.put("method", methodName);
250    rpcJSON.put("params", paramList);
251    for(Object param : params) {
252      if(param instanceof byte []) {
253        paramList.add(Bytes.toStringBinary((byte []) param));
254      } else if (param instanceof Operation) {
255        paramList.add(((Operation) param).toMap());
256      } else {
257        paramList.add(param.toString());
258      }
259    }
260    return map;
261  }
262
263  @Override
264  public String toString() {
265    if (getState() != State.RUNNING) {
266      return super.toString();
267    }
268    return super.toString()
269        + ", queuetimems=" + getRPCQueueTime()
270        + ", starttimems=" + getRPCStartTime()
271        + ", clientaddress=" + clientAddress
272        + ", remoteport=" + remotePort
273        + ", packetlength=" + getRPCPacketLength()
274        + ", rpcMethod=" + getRPC();
275  }
276}