/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.metrics2.MetricHistogram;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableFastCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Metrics source for a single user.
 * <p>
 * Holds one time histogram per operation type (get, scan, put, delete, increment, append, replay)
 * plus a block-bytes-scanned counter. Every metric is stored in the registry obtained from the
 * aggregate source under a key of the form {@code User_<user>_metric_<operation key>}. The class
 * also keeps a map of per-client request counters keyed by the client string passed to
 * {@link #getOrCreateMetricsClient}.
 */
@InterfaceAudience.Private
public class MetricsUserSourceImpl implements MetricsUserSource {
  private static final Logger LOG = LoggerFactory.getLogger(MetricsUserSourceImpl.class);

  private final String userNamePrefix;

  private final String user;

  private final String userGetKey;
  private final String userScanTimeKey;
  private final String userPutKey;
  private final String userDeleteKey;
  private final String userIncrementKey;
  private final String userAppendKey;
  private final String userReplayKey;
  private final String userBlockBytesScannedKey;

  // Assigned by register(), not by the constructor, so these cannot be final.
  private MetricHistogram getHisto;
  private MetricHistogram scanTimeHisto;
  private MetricHistogram putHisto;
  private MetricHistogram deleteHisto;
  private MetricHistogram incrementHisto;
  private MetricHistogram appendHisto;
  private MetricHistogram replayHisto;
  private MutableFastCounter blockBytesScannedCount;

  // Hash of userNamePrefix, computed once in the constructor.
  private final int hashCode;

  // Flipped to true by the first deregister() call; later calls become no-ops.
  private final AtomicBoolean closed = new AtomicBoolean(false);
  private final DynamicMetricsRegistry registry;

  private final ConcurrentHashMap<String, ClientMetrics> clientMetricsMap;

  /**
   * Request counters and identifying attributes for one client of this user.
   * <p>
   * The host name is stored exactly as given. Host address, user name, client version and service
   * name fall back to {@code "Unknown"} when {@code null} is supplied.
   */
  static class ClientMetricsImpl implements ClientMetrics {
    private final String hostName;
    final LongAdder readRequestsCount = new LongAdder();
    final LongAdder writeRequestsCount = new LongAdder();
    final LongAdder filteredRequestsCount = new LongAdder();
    private final String hostAddress;
    private final String userName;
    private final String clientVersion;
    private final String serviceName;

    public ClientMetricsImpl(String hostName, String hostAddress, String userName,
      String clientVersion, String serviceName) {
      this.hostName = hostName;
      this.hostAddress = hostAddress != null ? hostAddress : "Unknown";
      this.userName = userName != null ? userName : "Unknown";
      this.clientVersion = clientVersion != null ? clientVersion : "Unknown";
      this.serviceName = serviceName != null ? serviceName : "Unknown";
    }

    @Override
    public void incrementReadRequest() {
      readRequestsCount.increment();
    }

    @Override
    public void incrementWriteRequest() {
      writeRequestsCount.increment();
    }

    @Override
    public String getHostName() {
      return hostName;
    }

    @Override
    public long getReadRequestsCount() {
      return readRequestsCount.sum();
    }

    @Override
    public long getWriteRequestsCount() {
      return writeRequestsCount.sum();
    }

    @Override
    public void incrementFilteredReadRequests() {
      filteredRequestsCount.increment();
    }

    @Override
    public long getFilteredReadRequests() {
      return filteredRequestsCount.sum();
    }

    @Override
    public String getHostAddress() {
      return hostAddress;
    }

    @Override
    public String getUserName() {
      return userName;
    }

    @Override
    public String getClientVersion() {
      return clientVersion;
    }

    @Override
    public String getServiceName() {
      return serviceName;
    }
  }

  /**
   * Builds the metric keys for {@code user} and hands this instance to {@code agg}.
   * @param user name of the user the metrics belong to; used verbatim inside every metric key
   * @param agg  aggregate source that supplies the metrics registry and with which this instance
   *             is registered
   */
  public MetricsUserSourceImpl(String user, MetricsUserAggregateSourceImpl agg) {
    LOG.debug("Creating new MetricsUserSourceImpl for user {}", user);

    this.user = user;
    this.registry = agg.getMetricsRegistry();

    this.userNamePrefix = "User_" + user + "_metric_";

    hashCode = userNamePrefix.hashCode();

    userGetKey = userNamePrefix + MetricsRegionServerSource.GET_KEY;
    userScanTimeKey = userNamePrefix + MetricsRegionServerSource.SCAN_TIME_KEY;
    userPutKey = userNamePrefix + MetricsRegionServerSource.PUT_KEY;
    userDeleteKey = userNamePrefix + MetricsRegionServerSource.DELETE_KEY;
    userIncrementKey = userNamePrefix + MetricsRegionServerSource.INCREMENT_KEY;
    userAppendKey = userNamePrefix + MetricsRegionServerSource.APPEND_KEY;
    userReplayKey = userNamePrefix + MetricsRegionServerSource.REPLAY_KEY;
    userBlockBytesScannedKey = userNamePrefix + MetricsRegionServerSource.BLOCK_BYTES_SCANNED_KEY;
    clientMetricsMap = new ConcurrentHashMap<>();
    // Must stay the last statement: every field above is assigned before `this` is handed out.
    agg.register(this);
  }

  /**
   * Creates the per-user histograms and the block-bytes-scanned counter in the registry.
   */
  @Override
  public void register() {
    getHisto = registry.newTimeHistogram(userGetKey);
    scanTimeHisto = registry.newTimeHistogram(userScanTimeKey);
    putHisto = registry.newTimeHistogram(userPutKey);
    deleteHisto = registry.newTimeHistogram(userDeleteKey);
    incrementHisto = registry.newTimeHistogram(userIncrementKey);
    appendHisto = registry.newTimeHistogram(userAppendKey);
    replayHisto = registry.newTimeHistogram(userReplayKey);
    blockBytesScannedCount = registry.newCounter(userBlockBytesScannedKey, "", 0);
  }

  /**
   * Removes this user's metrics from the registry. Only the first call does any work; subsequent
   * calls return immediately.
   */
  @Override
  public void deregister() {
    boolean wasClosed = closed.getAndSet(true);

    // Has someone else already closed this for us?
    if (wasClosed) {
      return;
    }

    LOG.debug("Removing user Metrics for user: {}", user);

    registry.removeMetric(userGetKey);
    registry.removeMetric(userScanTimeKey);
    registry.removeMetric(userPutKey);
    registry.removeMetric(userDeleteKey);
    registry.removeMetric(userIncrementKey);
    registry.removeMetric(userAppendKey);
    registry.removeMetric(userReplayKey);
    registry.removeMetric(userBlockBytesScannedKey);
  }

  @Override
  public String getUser() {
    return user;
  }

  /**
   * Orders sources by the cached hash of their metric-name prefix and, when the hashes tie, by
   * user name, so that two different users are never reported as equal merely because their
   * prefixes collide in {@link String#hashCode()}.
   * @param source the source to compare against
   * @return {@code -1} when {@code source} is {@code null} or not a {@code MetricsUserSourceImpl};
   *         otherwise a negative, zero or positive value per the ordering described above
   */
  @Override
  public int compareTo(MetricsUserSource source) {
    // instanceof is false for null, so this covers both the null and the foreign-type case.
    if (!(source instanceof MetricsUserSourceImpl)) {
      return -1;
    }

    MetricsUserSourceImpl impl = (MetricsUserSourceImpl) source;

    int byHash = Integer.compare(hashCode, impl.hashCode);
    if (byHash != 0) {
      return byHash;
    }
    return compareUsers(user, impl.user);
  }

  /** Null-safe lexicographic comparison of two user names; {@code null} sorts first. */
  private static int compareUsers(String left, String right) {
    if (left == null) {
      return right == null ? 0 : -1;
    }
    return right == null ? 1 : left.compareTo(right);
  }

  @Override
  public int hashCode() {
    return hashCode;
  }

  @Override
  public boolean equals(Object obj) {
    return obj == this
      || (obj instanceof MetricsUserSourceImpl && compareTo((MetricsUserSourceImpl) obj) == 0);
  }

  /**
   * Checks the closed flag, first without and then while holding this instance's monitor.
   * Nothing is written to {@code mrb} here.
   * @param mrb     record builder; not used by the current implementation
   * @param ignored not used
   */
  void snapshot(MetricsRecordBuilder mrb, boolean ignored) {
    // If a close has already started, return early without taking the monitor at all.
    if (closed.get()) {
      return;
    }

    // Take this instance's monitor before doing any work.
    synchronized (this) {

      // It's possible that a close happened between checking
      // the closed variable and getting the lock.
      if (closed.get()) {
        return;
      }
    }
  }

  @Override
  public void updatePut(long t) {
    putHisto.add(t);
  }

  @Override
  public void updateDelete(long t) {
    deleteHisto.add(t);
  }

  @Override
  public void updateGet(long time, long blockBytesScanned) {
    getHisto.add(time);
    blockBytesScannedCount.incr(blockBytesScanned);
  }

  @Override
  public void updateIncrement(long time, long blockBytesScanned) {
    incrementHisto.add(time);
    blockBytesScannedCount.incr(blockBytesScanned);
  }

  @Override
  public void updateAppend(long time, long blockBytesScanned) {
    appendHisto.add(time);
    blockBytesScannedCount.incr(blockBytesScanned);
  }

  @Override
  public void updateReplay(long t) {
    replayHisto.add(t);
  }

  @Override
  public void updateScan(long time, long blockBytesScanned) {
    scanTimeHisto.add(time);
    blockBytesScannedCount.incr(blockBytesScanned);
  }

  @Override
  public void updateCheckAndMutate(long blockBytesScanned) {
    blockBytesScannedCount.incr(blockBytesScanned);
  }

  /**
   * Adds a record named after this user's metric prefix to the collector and snapshots the
   * registry into it.
   */
  @Override
  public void getMetrics(MetricsCollector metricsCollector, boolean all) {
    MetricsRecordBuilder mrb = metricsCollector.addRecord(this.userNamePrefix);
    registry.snapshot(mrb, all);
  }

  /**
   * @return a read-only view of the per-client metrics, keyed by the client string that was passed
   *         to {@link #getOrCreateMetricsClient}
   */
  @Override
  public Map<String, ClientMetrics> getClientMetrics() {
    return Collections.unmodifiableMap(clientMetricsMap);
  }

  /**
   * Returns the metrics object already stored for {@code client}, or creates and stores a new one.
   * When two callers race to create the entry, both receive the instance that was stored first.
   * @param client        map key; also used as the host name of a newly created entry
   * @param hostAddress   client host address, may be {@code null}
   * @param userName      client user name, may be {@code null}
   * @param clientVersion client version, may be {@code null}
   * @param serviceName   service name, may be {@code null}
   * @return the metrics instance stored for {@code client}
   */
  @Override
  public ClientMetrics getOrCreateMetricsClient(String client, String hostAddress, String userName,
    String clientVersion, String serviceName) {
    // Plain lookup first so the common "already present" case allocates nothing.
    ClientMetrics source = clientMetricsMap.get(client);
    if (source != null) {
      return source;
    }
    source = new ClientMetricsImpl(client, hostAddress, userName, clientVersion, serviceName);
    ClientMetrics prev = clientMetricsMap.putIfAbsent(client, source);
    return prev != null ? prev : source;
  }

}