001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.monitoring; 020 021import java.util.ArrayList; 022import java.util.HashMap; 023import java.util.Map; 024 025import org.apache.yetus.audience.InterfaceAudience; 026import org.apache.hadoop.hbase.client.Operation; 027import org.apache.hadoop.hbase.util.Bytes; 028 029import org.apache.hbase.thirdparty.com.google.protobuf.Message; 030 031/** 032 * A MonitoredTask implementation designed for use with RPC Handlers 033 * handling frequent, short duration tasks. String concatenations and object 034 * allocations are avoided in methods that will be hit by every RPC call. 035 */ 036@InterfaceAudience.Private 037public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl 038 implements MonitoredRPCHandler { 039 private String clientAddress; 040 private int remotePort; 041 private long rpcQueueTime; 042 private long rpcStartTime; 043 private String methodName = ""; 044 private Object [] params = {}; 045 private Message packet; 046 047 public MonitoredRPCHandlerImpl() { 048 super(); 049 // in this implementation, WAITING indicates that the handler is not 050 // actively servicing an RPC call. 051 setState(State.WAITING); 052 } 053 054 @Override 055 public synchronized MonitoredRPCHandlerImpl clone() { 056 return (MonitoredRPCHandlerImpl) super.clone(); 057 } 058 059 /** 060 * Gets the status of this handler; if it is currently servicing an RPC, 061 * this status will include the RPC information. 062 * @return a String describing the current status. 063 */ 064 @Override 065 public String getStatus() { 066 if (getState() != State.RUNNING) { 067 return super.getStatus(); 068 } 069 return super.getStatus() + " from " + getClient() + ": " + getRPC(); 070 } 071 072 /** 073 * Accesses the queue time for the currently running RPC on the 074 * monitored Handler. 075 * @return the queue timestamp or -1 if there is no RPC currently running. 076 */ 077 @Override 078 public long getRPCQueueTime() { 079 if (getState() != State.RUNNING) { 080 return -1; 081 } 082 return rpcQueueTime; 083 } 084 085 /** 086 * Accesses the start time for the currently running RPC on the 087 * monitored Handler. 088 * @return the start timestamp or -1 if there is no RPC currently running. 089 */ 090 @Override 091 public long getRPCStartTime() { 092 if (getState() != State.RUNNING) { 093 return -1; 094 } 095 return rpcStartTime; 096 } 097 098 /** 099 * Produces a string representation of the method currently being serviced 100 * by this Handler. 101 * @return a string representing the method call without parameters 102 */ 103 @Override 104 public synchronized String getRPC() { 105 return getRPC(false); 106 } 107 108 /** 109 * Produces a string representation of the method currently being serviced 110 * by this Handler. 111 * @param withParams toggle inclusion of parameters in the RPC String 112 * @return A human-readable string representation of the method call. 113 */ 114 @Override 115 public synchronized String getRPC(boolean withParams) { 116 if (getState() != State.RUNNING) { 117 // no RPC is currently running 118 return ""; 119 } 120 StringBuilder buffer = new StringBuilder(256); 121 buffer.append(methodName); 122 if (withParams) { 123 buffer.append("("); 124 for (int i = 0; i < params.length; i++) { 125 if (i != 0) 126 buffer.append(", "); 127 buffer.append(params[i]); 128 } 129 buffer.append(")"); 130 } 131 return buffer.toString(); 132 } 133 134 /** 135 * Produces a string representation of the method currently being serviced 136 * by this Handler. 137 * @return A human-readable string representation of the method call. 138 */ 139 @Override 140 public long getRPCPacketLength() { 141 if (getState() != State.RUNNING || packet == null) { 142 // no RPC is currently running, or we don't have an RPC's packet info 143 return -1L; 144 } 145 return packet.getSerializedSize(); 146 } 147 148 /** 149 * If an RPC call is currently running, produces a String representation of 150 * the connection from which it was received. 151 * @return A human-readable string representation of the address and port 152 * of the client. 153 */ 154 @Override 155 public String getClient() { 156 return clientAddress + ":" + remotePort; 157 } 158 159 /** 160 * Indicates to the client whether this task is monitoring a currently active 161 * RPC call. 162 * @return true if the monitored handler is currently servicing an RPC call. 163 */ 164 @Override 165 public boolean isRPCRunning() { 166 return getState() == State.RUNNING; 167 } 168 169 /** 170 * Indicates to the client whether this task is monitoring a currently active 171 * RPC call to a database command. (as defined by 172 * o.a.h.h.client.Operation) 173 * @return true if the monitored handler is currently servicing an RPC call 174 * to a database command. 175 */ 176 @Override 177 public synchronized boolean isOperationRunning() { 178 if(!isRPCRunning()) { 179 return false; 180 } 181 for(Object param : params) { 182 if (param instanceof Operation) { 183 return true; 184 } 185 } 186 return false; 187 } 188 189 /** 190 * Tells this instance that it is monitoring a new RPC call. 191 * @param methodName The name of the method that will be called by the RPC. 192 * @param params The parameters that will be passed to the indicated method. 193 */ 194 @Override 195 public synchronized void setRPC(String methodName, Object [] params, 196 long queueTime) { 197 this.methodName = methodName; 198 this.params = params; 199 long now = System.currentTimeMillis(); 200 this.rpcStartTime = now; 201 setWarnTime(now); 202 this.rpcQueueTime = queueTime; 203 this.state = State.RUNNING; 204 } 205 206 /** 207 * Gives this instance a reference to the protobuf received by the RPC, so 208 * that it can later compute its size if asked for it. 209 * @param param The protobuf received by the RPC for this call 210 */ 211 @Override 212 public void setRPCPacket(Message param) { 213 this.packet = param; 214 } 215 216 /** 217 * Registers current handler client details. 218 * @param clientAddress the address of the current client 219 * @param remotePort the port from which the client connected 220 */ 221 @Override 222 public void setConnection(String clientAddress, int remotePort) { 223 this.clientAddress = clientAddress; 224 this.remotePort = remotePort; 225 } 226 227 @Override 228 public synchronized void markComplete(String status) { 229 super.markComplete(status); 230 this.params = null; 231 this.packet = null; 232 } 233 234 @Override 235 public synchronized Map<String, Object> toMap() { 236 // only include RPC info if the Handler is actively servicing an RPC call 237 Map<String, Object> map = super.toMap(); 238 if (getState() != State.RUNNING) { 239 return map; 240 } 241 Map<String, Object> rpcJSON = new HashMap<>(); 242 ArrayList paramList = new ArrayList(); 243 map.put("rpcCall", rpcJSON); 244 rpcJSON.put("queuetimems", getRPCQueueTime()); 245 rpcJSON.put("starttimems", getRPCStartTime()); 246 rpcJSON.put("clientaddress", clientAddress); 247 rpcJSON.put("remoteport", remotePort); 248 rpcJSON.put("packetlength", getRPCPacketLength()); 249 rpcJSON.put("method", methodName); 250 rpcJSON.put("params", paramList); 251 for(Object param : params) { 252 if(param instanceof byte []) { 253 paramList.add(Bytes.toStringBinary((byte []) param)); 254 } else if (param instanceof Operation) { 255 paramList.add(((Operation) param).toMap()); 256 } else { 257 paramList.add(param.toString()); 258 } 259 } 260 return map; 261 } 262 263 @Override 264 public String toString() { 265 if (getState() != State.RUNNING) { 266 return super.toString(); 267 } 268 return super.toString() 269 + ", queuetimems=" + getRPCQueueTime() 270 + ", starttimems=" + getRPCStartTime() 271 + ", clientaddress=" + clientAddress 272 + ", remoteport=" + remotePort 273 + ", packetlength=" + getRPCPacketLength() 274 + ", rpcMethod=" + getRPC(); 275 } 276}