001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.monitoring; 019 020import java.util.ArrayList; 021import java.util.HashMap; 022import java.util.Map; 023import org.apache.hadoop.hbase.client.Operation; 024import org.apache.hadoop.hbase.util.Bytes; 025import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 026import org.apache.yetus.audience.InterfaceAudience; 027 028import org.apache.hbase.thirdparty.com.google.protobuf.Message; 029 030/** 031 * A MonitoredTask implementation designed for use with RPC Handlers handling frequent, short 032 * duration tasks. String concatenations and object allocations are avoided in methods that will be 033 * hit by every RPC call. 034 */ 035@InterfaceAudience.Private 036public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl implements MonitoredRPCHandler { 037 private String clientAddress; 038 private int remotePort; 039 private long rpcQueueTime; 040 private long rpcStartTime; 041 private String methodName = ""; 042 private Object[] params = {}; 043 private Message packet; 044 private boolean snapshot = false; 045 private Map<String, Object> callInfoMap = new HashMap<>(); 046 047 public MonitoredRPCHandlerImpl() { 048 super(false); 049 // in this implementation, WAITING indicates that the handler is not 050 // actively servicing an RPC call. 051 setState(State.WAITING); 052 } 053 054 @Override 055 public synchronized MonitoredRPCHandlerImpl clone() { 056 MonitoredRPCHandlerImpl clone = (MonitoredRPCHandlerImpl) super.clone(); 057 clone.callInfoMap = generateCallInfoMap(); 058 clone.snapshot = true; 059 return clone; 060 } 061 062 /** 063 * Gets the status of this handler; if it is currently servicing an RPC, this status will include 064 * the RPC information. 065 * @return a String describing the current status. 066 */ 067 @Override 068 public String getStatus() { 069 if (getState() != State.RUNNING) { 070 return super.getStatus(); 071 } 072 return super.getStatus() + " from " + getClient() + ": " + getRPC(); 073 } 074 075 /** 076 * Accesses the queue time for the currently running RPC on the monitored Handler. 077 * @return the queue timestamp or -1 if there is no RPC currently running. 078 */ 079 @Override 080 public long getRPCQueueTime() { 081 if (getState() != State.RUNNING) { 082 return -1; 083 } 084 return rpcQueueTime; 085 } 086 087 /** 088 * Accesses the start time for the currently running RPC on the monitored Handler. 089 * @return the start timestamp or -1 if there is no RPC currently running. 090 */ 091 @Override 092 public long getRPCStartTime() { 093 if (getState() != State.RUNNING) { 094 return -1; 095 } 096 return rpcStartTime; 097 } 098 099 /** 100 * Produces a string representation of the method currently being serviced by this Handler. 101 * @return a string representing the method call without parameters 102 */ 103 @Override 104 public synchronized String getRPC() { 105 return getRPC(false); 106 } 107 108 /** 109 * Produces a string representation of the method currently being serviced by this Handler. 110 * @param withParams toggle inclusion of parameters in the RPC String 111 * @return A human-readable string representation of the method call. 112 */ 113 @Override 114 public synchronized String getRPC(boolean withParams) { 115 if (getState() != State.RUNNING) { 116 // no RPC is currently running 117 return ""; 118 } 119 StringBuilder buffer = new StringBuilder(256); 120 buffer.append(methodName); 121 if (withParams) { 122 buffer.append("("); 123 for (int i = 0; i < params.length; i++) { 124 if (i != 0) buffer.append(", "); 125 buffer.append(params[i]); 126 } 127 buffer.append(")"); 128 } 129 return buffer.toString(); 130 } 131 132 /** 133 * Produces a string representation of the method currently being serviced by this Handler. 134 * @return A human-readable string representation of the method call. 135 */ 136 @Override 137 public long getRPCPacketLength() { 138 if (getState() != State.RUNNING || packet == null) { 139 // no RPC is currently running, or we don't have an RPC's packet info 140 return -1L; 141 } 142 return packet.getSerializedSize(); 143 } 144 145 /** 146 * If an RPC call is currently running, produces a String representation of the connection from 147 * which it was received. 148 * @return A human-readable string representation of the address and port of the client. 149 */ 150 @Override 151 public String getClient() { 152 return clientAddress + ":" + remotePort; 153 } 154 155 /** 156 * Indicates to the client whether this task is monitoring a currently active RPC call. 157 * @return true if the monitored handler is currently servicing an RPC call. 158 */ 159 @Override 160 public boolean isRPCRunning() { 161 return getState() == State.RUNNING; 162 } 163 164 /** 165 * Indicates to the client whether this task is monitoring a currently active RPC call to a 166 * database command. (as defined by o.a.h.h.client.Operation) 167 * @return true if the monitored handler is currently servicing an RPC call to a database command. 168 */ 169 @Override 170 public synchronized boolean isOperationRunning() { 171 if (!isRPCRunning()) { 172 return false; 173 } 174 for (Object param : params) { 175 if (param instanceof Operation) { 176 return true; 177 } 178 } 179 return false; 180 } 181 182 /** 183 * Tells this instance that it is monitoring a new RPC call. 184 * @param methodName The name of the method that will be called by the RPC. 185 * @param params The parameters that will be passed to the indicated method. 186 */ 187 @Override 188 public synchronized void setRPC(String methodName, Object[] params, long queueTime) { 189 this.methodName = methodName; 190 this.params = params; 191 long now = EnvironmentEdgeManager.currentTime(); 192 this.rpcStartTime = now; 193 setWarnTime(now); 194 this.rpcQueueTime = queueTime; 195 this.state = State.RUNNING; 196 } 197 198 /** 199 * Gives this instance a reference to the protobuf received by the RPC, so that it can later 200 * compute its size if asked for it. 201 * @param param The protobuf received by the RPC for this call 202 */ 203 @Override 204 public void setRPCPacket(Message param) { 205 this.packet = param; 206 } 207 208 /** 209 * Registers current handler client details. 210 * @param clientAddress the address of the current client 211 * @param remotePort the port from which the client connected 212 */ 213 @Override 214 public void setConnection(String clientAddress, int remotePort) { 215 this.clientAddress = clientAddress; 216 this.remotePort = remotePort; 217 } 218 219 @Override 220 public synchronized void markComplete(String status) { 221 super.markComplete(status); 222 this.params = null; 223 this.packet = null; 224 } 225 226 @Override 227 public synchronized Map<String, Object> toMap() { 228 return this.snapshot ? this.callInfoMap : generateCallInfoMap(); 229 } 230 231 private Map<String, Object> generateCallInfoMap() { 232 // only include RPC info if the Handler is actively servicing an RPC call 233 Map<String, Object> map = super.toMap(); 234 if (getState() != State.RUNNING) { 235 return map; 236 } 237 Map<String, Object> rpcJSON = new HashMap<>(); 238 ArrayList<Object> paramList = new ArrayList<>(); 239 map.put("rpcCall", rpcJSON); 240 rpcJSON.put("queuetimems", getRPCQueueTime()); 241 rpcJSON.put("starttimems", getRPCStartTime()); 242 rpcJSON.put("clientaddress", clientAddress); 243 rpcJSON.put("remoteport", remotePort); 244 rpcJSON.put("packetlength", getRPCPacketLength()); 245 rpcJSON.put("method", methodName); 246 rpcJSON.put("params", paramList); 247 for (Object param : params) { 248 if (param instanceof byte[]) { 249 paramList.add(Bytes.toStringBinary((byte[]) param)); 250 } else if (param instanceof Operation) { 251 paramList.add(((Operation) param).toMap()); 252 } else { 253 paramList.add(param.toString()); 254 } 255 } 256 return map; 257 } 258 259 @Override 260 public String toString() { 261 if (getState() != State.RUNNING) { 262 return super.toString(); 263 } 264 return super.toString() + ", queuetimems=" + getRPCQueueTime() + ", starttimems=" 265 + getRPCStartTime() + ", clientaddress=" + clientAddress + ", remoteport=" + remotePort 266 + ", packetlength=" + getRPCPacketLength() + ", rpcMethod=" + getRPC(); 267 } 268}