001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.monitoring;
019
020import java.util.ArrayList;
021import java.util.HashMap;
022import java.util.Map;
023import org.apache.hadoop.hbase.client.Operation;
024import org.apache.hadoop.hbase.util.Bytes;
025import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
026import org.apache.yetus.audience.InterfaceAudience;
027
028import org.apache.hbase.thirdparty.com.google.protobuf.Message;
029
030/**
031 * A MonitoredTask implementation designed for use with RPC Handlers handling frequent, short
032 * duration tasks. String concatenations and object allocations are avoided in methods that will be
033 * hit by every RPC call.
034 */
035@InterfaceAudience.Private
036public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl implements MonitoredRPCHandler {
037  private String clientAddress;
038  private int remotePort;
039  private long rpcQueueTime;
040  private long rpcStartTime;
041  private String methodName = "";
042  private Object[] params = {};
043  private Message packet;
044  private boolean snapshot = false;
045  private Map<String, Object> callInfoMap = new HashMap<>();
046
047  public MonitoredRPCHandlerImpl(String description) {
048    super(false, description);
049    // in this implementation, WAITING indicates that the handler is not
050    // actively servicing an RPC call.
051    setState(State.WAITING);
052  }
053
054  @Override
055  public synchronized MonitoredRPCHandlerImpl clone() {
056    MonitoredRPCHandlerImpl clone = (MonitoredRPCHandlerImpl) super.clone();
057    clone.callInfoMap = generateCallInfoMap();
058    clone.snapshot = true;
059    return clone;
060  }
061
062  /**
063   * Gets the status of this handler; if it is currently servicing an RPC, this status will include
064   * the RPC information.
065   * @return a String describing the current status.
066   */
067  @Override
068  public String getStatus() {
069    if (getState() != State.RUNNING) {
070      return super.getStatus();
071    }
072    return super.getStatus() + " from " + getClient() + ": " + getRPC();
073  }
074
075  /**
076   * Accesses the queue time for the currently running RPC on the monitored Handler.
077   * @return the queue timestamp or -1 if there is no RPC currently running.
078   */
079  @Override
080  public long getRPCQueueTime() {
081    if (getState() != State.RUNNING) {
082      return -1;
083    }
084    return rpcQueueTime;
085  }
086
087  /**
088   * Accesses the start time for the currently running RPC on the monitored Handler.
089   * @return the start timestamp or -1 if there is no RPC currently running.
090   */
091  @Override
092  public long getRPCStartTime() {
093    if (getState() != State.RUNNING) {
094      return -1;
095    }
096    return rpcStartTime;
097  }
098
099  /**
100   * Produces a string representation of the method currently being serviced by this Handler.
101   * @return a string representing the method call without parameters
102   */
103  @Override
104  public synchronized String getRPC() {
105    return getRPC(false);
106  }
107
108  /**
109   * Produces a string representation of the method currently being serviced by this Handler.
110   * @param withParams toggle inclusion of parameters in the RPC String
111   * @return A human-readable string representation of the method call.
112   */
113  @Override
114  public synchronized String getRPC(boolean withParams) {
115    if (getState() != State.RUNNING) {
116      // no RPC is currently running
117      return "";
118    }
119    StringBuilder buffer = new StringBuilder(256);
120    buffer.append(methodName);
121    if (withParams) {
122      buffer.append("(");
123      for (int i = 0; i < params.length; i++) {
124        if (i != 0) buffer.append(", ");
125        buffer.append(params[i]);
126      }
127      buffer.append(")");
128    }
129    return buffer.toString();
130  }
131
132  /**
133   * Produces a string representation of the method currently being serviced by this Handler.
134   * @return A human-readable string representation of the method call.
135   */
136  @Override
137  public long getRPCPacketLength() {
138    if (getState() != State.RUNNING || packet == null) {
139      // no RPC is currently running, or we don't have an RPC's packet info
140      return -1L;
141    }
142    return packet.getSerializedSize();
143  }
144
145  /**
146   * If an RPC call is currently running, produces a String representation of the connection from
147   * which it was received.
148   * @return A human-readable string representation of the address and port of the client.
149   */
150  @Override
151  public String getClient() {
152    return clientAddress + ":" + remotePort;
153  }
154
155  /**
156   * Indicates to the client whether this task is monitoring a currently active RPC call.
157   * @return true if the monitored handler is currently servicing an RPC call.
158   */
159  @Override
160  public boolean isRPCRunning() {
161    return getState() == State.RUNNING;
162  }
163
164  /**
165   * Indicates to the client whether this task is monitoring a currently active RPC call to a
166   * database command. (as defined by o.a.h.h.client.Operation)
167   * @return true if the monitored handler is currently servicing an RPC call to a database command.
168   */
169  @Override
170  public synchronized boolean isOperationRunning() {
171    if (!isRPCRunning()) {
172      return false;
173    }
174    for (Object param : params) {
175      if (param instanceof Operation) {
176        return true;
177      }
178    }
179    return false;
180  }
181
182  /**
183   * Tells this instance that it is monitoring a new RPC call.
184   * @param methodName The name of the method that will be called by the RPC.
185   * @param params     The parameters that will be passed to the indicated method.
186   */
187  @Override
188  public synchronized void setRPC(String methodName, Object[] params, long queueTime) {
189    this.methodName = methodName;
190    this.params = params;
191    long now = EnvironmentEdgeManager.currentTime();
192    this.rpcStartTime = now;
193    setWarnTime(now);
194    this.rpcQueueTime = queueTime;
195    this.state = State.RUNNING;
196  }
197
198  /**
199   * Gives this instance a reference to the protobuf received by the RPC, so that it can later
200   * compute its size if asked for it.
201   * @param param The protobuf received by the RPC for this call
202   */
203  @Override
204  public void setRPCPacket(Message param) {
205    this.packet = param;
206  }
207
208  /**
209   * Registers current handler client details.
210   * @param clientAddress the address of the current client
211   * @param remotePort    the port from which the client connected
212   */
213  @Override
214  public void setConnection(String clientAddress, int remotePort) {
215    this.clientAddress = clientAddress;
216    this.remotePort = remotePort;
217  }
218
219  @Override
220  public synchronized void markComplete(String status) {
221    super.markComplete(status);
222    this.params = null;
223    this.packet = null;
224  }
225
226  @Override
227  public synchronized Map<String, Object> toMap() {
228    return this.snapshot ? this.callInfoMap : generateCallInfoMap();
229  }
230
231  private Map<String, Object> generateCallInfoMap() {
232    // only include RPC info if the Handler is actively servicing an RPC call
233    Map<String, Object> map = super.toMap();
234    if (getState() != State.RUNNING) {
235      return map;
236    }
237    Map<String, Object> rpcJSON = new HashMap<>();
238    ArrayList<Object> paramList = new ArrayList<>();
239    map.put("rpcCall", rpcJSON);
240    rpcJSON.put("queuetimems", getRPCQueueTime());
241    rpcJSON.put("starttimems", getRPCStartTime());
242    rpcJSON.put("clientaddress", clientAddress);
243    rpcJSON.put("remoteport", remotePort);
244    rpcJSON.put("packetlength", getRPCPacketLength());
245    rpcJSON.put("method", methodName);
246    rpcJSON.put("params", paramList);
247    for (Object param : params) {
248      if (param instanceof byte[]) {
249        paramList.add(Bytes.toStringBinary((byte[]) param));
250      } else if (param instanceof Operation) {
251        paramList.add(((Operation) param).toMap());
252      } else {
253        paramList.add(param.toString());
254      }
255    }
256    return map;
257  }
258
259  @Override
260  public String toString() {
261    if (getState() != State.RUNNING) {
262      return super.toString();
263    }
264    return super.toString() + ", queuetimems=" + getRPCQueueTime() + ", starttimems="
265      + getRPCStartTime() + ", clientaddress=" + clientAddress + ", remoteport=" + remotePort
266      + ", packetlength=" + getRPCPacketLength() + ", rpcMethod=" + getRPC();
267  }
268}