/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.monitoring;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hbase.thirdparty.com.google.protobuf.Message;

/**
 * A MonitoredTask implementation designed for use with RPC Handlers
 * handling frequent, short duration tasks. String concatenations and object
 * allocations are avoided in methods that will be hit by every RPC call.
 */
@InterfaceAudience.Private
public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl
    implements MonitoredRPCHandler {
  /** Shared stand-in used whenever {@link #params} is currently null. */
  private static final Object[] NO_PARAMS = {};

  private String clientAddress;
  private int remotePort;
  private long rpcQueueTime;
  private long rpcStartTime;
  private String methodName = "";
  // May be null: markComplete() clears it and setRPC() stores whatever the
  // caller passed. Read it through currentParams().
  private Object [] params = {};
  private Message packet;

  public MonitoredRPCHandlerImpl() {
    super();
    // in this implementation, WAITING indicates that the handler is not
    // actively servicing an RPC call.
    setState(State.WAITING);
  }

  @Override
  public synchronized MonitoredRPCHandlerImpl clone() {
    return (MonitoredRPCHandlerImpl) super.clone();
  }

  /**
   * Gets the status of this handler; if it is currently servicing an RPC,
   * this status will include the RPC information.
   * @return a String describing the current status.
   */
  @Override
  public String getStatus() {
    if (getState() != State.RUNNING) {
      return super.getStatus();
    }
    return super.getStatus() + " from " + getClient() + ": " + getRPC();
  }

  /**
   * Accesses the queue time for the currently running RPC on the
   * monitored Handler.
   * @return the queue timestamp or -1 if there is no RPC currently running.
   */
  @Override
  public long getRPCQueueTime() {
    if (getState() != State.RUNNING) {
      return -1;
    }
    return rpcQueueTime;
  }

  /**
   * Accesses the start time for the currently running RPC on the
   * monitored Handler.
   * @return the start timestamp or -1 if there is no RPC currently running.
   */
  @Override
  public long getRPCStartTime() {
    if (getState() != State.RUNNING) {
      return -1;
    }
    return rpcStartTime;
  }

  /**
   * Produces a string representation of the method currently being serviced
   * by this Handler.
   * @return a string representing the method call without parameters
   */
  @Override
  public synchronized String getRPC() {
    return getRPC(false);
  }

  /**
   * Produces a string representation of the method currently being serviced
   * by this Handler.
   * @param withParams toggle inclusion of parameters in the RPC String
   * @return A human-readable string representation of the method call, or an
   *         empty string if no RPC is currently running.
   */
  @Override
  public synchronized String getRPC(boolean withParams) {
    if (getState() != State.RUNNING) {
      // no RPC is currently running
      return "";
    }
    StringBuilder buffer = new StringBuilder(256);
    buffer.append(methodName);
    if (withParams) {
      Object[] current = currentParams();
      buffer.append("(");
      for (int i = 0; i < current.length; i++) {
        if (i != 0) {
          buffer.append(", ");
        }
        // StringBuilder.append(Object) renders a null element as "null".
        buffer.append(current[i]);
      }
      buffer.append(")");
    }
    return buffer.toString();
  }

  /**
   * Reports the serialized size of the protobuf message recorded for the RPC
   * currently being serviced by this Handler.
   * @return the serialized size of the packet, or -1 if no RPC is currently
   *         running or no packet has been recorded for it.
   */
  @Override
  public long getRPCPacketLength() {
    if (getState() != State.RUNNING || packet == null) {
      // no RPC is currently running, or we don't have an RPC's packet info
      return -1L;
    }
    return packet.getSerializedSize();
  }

  /**
   * If an RPC call is currently running, produces a String representation of
   * the connection from which it was received.
   * @return A human-readable string representation of the address and port
   *         of the client.
   */
  @Override
  public String getClient() {
    return clientAddress + ":" + remotePort;
  }

  /**
   * Indicates to the client whether this task is monitoring a currently active
   * RPC call.
   * @return true if the monitored handler is currently servicing an RPC call.
   */
  @Override
  public boolean isRPCRunning() {
    return getState() == State.RUNNING;
  }

  /**
   * Indicates to the client whether this task is monitoring a currently active
   * RPC call to a database command. (as defined by
   * o.a.h.h.client.Operation)
   * @return true if the monitored handler is currently servicing an RPC call
   *         to a database command.
   */
  @Override
  public synchronized boolean isOperationRunning() {
    if (!isRPCRunning()) {
      return false;
    }
    for (Object param : currentParams()) {
      if (param instanceof Operation) {
        return true;
      }
    }
    return false;
  }

  /**
   * Tells this instance that it is monitoring a new RPC call.
   * @param methodName The name of the method that will be called by the RPC.
   * @param params The parameters that will be passed to the indicated method.
   * @param queueTime The queue timestamp to report for this RPC through
   *          {@link #getRPCQueueTime()}.
   */
  @Override
  public synchronized void setRPC(String methodName, Object [] params,
      long queueTime) {
    this.methodName = methodName;
    this.params = params;
    this.rpcStartTime = System.currentTimeMillis();
    this.rpcQueueTime = queueTime;
    this.state = State.RUNNING;
  }

  /**
   * Gives this instance a reference to the protobuf received by the RPC, so
   * that it can later compute its size if asked for it.
   * @param param The protobuf received by the RPC for this call
   */
  @Override
  public void setRPCPacket(Message param) {
    this.packet = param;
  }

  /**
   * Registers current handler client details.
   * @param clientAddress the address of the current client
   * @param remotePort the port from which the client connected
   */
  @Override
  public void setConnection(String clientAddress, int remotePort) {
    this.clientAddress = clientAddress;
    this.remotePort = remotePort;
  }

  /**
   * Marks the task complete and drops the references to the RPC parameters
   * and packet held by this instance.
   * @param status the status message passed on to the superclass
   */
  @Override
  public synchronized void markComplete(String status) {
    super.markComplete(status);
    this.params = null;
    this.packet = null;
  }

  /**
   * Builds a map view of this task. When an RPC is running, an additional
   * "rpcCall" entry carries the queue/start times, client address and port,
   * packet length, method name and parameters.
   * @return the map produced by the superclass, extended with RPC details
   *         when the handler state is RUNNING.
   */
  @Override
  public synchronized Map<String, Object> toMap() {
    // only include RPC info if the Handler is actively servicing an RPC call
    Map<String, Object> map = super.toMap();
    if (getState() != State.RUNNING) {
      return map;
    }
    Map<String, Object> rpcJSON = new HashMap<>();
    ArrayList<Object> paramList = new ArrayList<>();
    map.put("rpcCall", rpcJSON);
    rpcJSON.put("queuetimems", getRPCQueueTime());
    rpcJSON.put("starttimems", getRPCStartTime());
    rpcJSON.put("clientaddress", clientAddress);
    rpcJSON.put("remoteport", remotePort);
    rpcJSON.put("packetlength", getRPCPacketLength());
    rpcJSON.put("method", methodName);
    rpcJSON.put("params", paramList);
    for (Object param : currentParams()) {
      if (param instanceof byte []) {
        paramList.add(Bytes.toStringBinary((byte []) param));
      } else if (param instanceof Operation) {
        paramList.add(((Operation) param).toMap());
      } else {
        // String.valueOf matches toString() for non-null values and yields
        // "null" instead of throwing when a parameter is null.
        paramList.add(String.valueOf(param));
      }
    }
    return map;
  }

  @Override
  public String toString() {
    if (getState() != State.RUNNING) {
      return super.toString();
    }
    return super.toString()
        + ", queuetimems=" + getRPCQueueTime()
        + ", starttimems=" + getRPCStartTime()
        + ", clientaddress=" + clientAddress
        + ", remoteport=" + remotePort
        + ", packetlength=" + getRPCPacketLength()
        + ", rpcMethod=" + getRPC();
  }

  /**
   * Returns the current RPC parameters, substituting an empty array when the
   * field is null. Every caller in this class is a synchronized method.
   */
  private Object[] currentParams() {
    Object[] current = this.params;
    return current != null ? current : NO_PARAMS;
  }
}