/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.metrics2.MetricHistogram;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableFastCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Metrics source for a single user. It owns a set of per-user time histograms (get, scan time,
 * put, delete, increment, append, replay) and a block-bytes-scanned counter, all of which live in
 * the registry obtained from the aggregate source passed to the constructor. It also keeps a map
 * of per-client request counters keyed by the client string handed to
 * {@link #getOrCreateMetricsClient(String)}.
 */
@InterfaceAudience.Private
public class MetricsUserSourceImpl implements MetricsUserSource {
  private static final Logger LOG = LoggerFactory.getLogger(MetricsUserSourceImpl.class);

  /** Common prefix of every metric key of this user: {@code "User_" + user + "_metric_"}. */
  private final String userNamePrefix;

  private final String user;

  // Registry keys, each one is userNamePrefix + the matching MetricsRegionServerSource constant.
  private final String userGetKey;
  private final String userScanTimeKey;
  private final String userPutKey;
  private final String userDeleteKey;
  private final String userIncrementKey;
  private final String userAppendKey;
  private final String userReplayKey;
  private final String userBlockBytesScannedKey;

  // Assigned in register(); they stay null until register() has run.
  private MetricHistogram getHisto;
  private MetricHistogram scanTimeHisto;
  private MetricHistogram putHisto;
  private MetricHistogram deleteHisto;
  private MetricHistogram incrementHisto;
  private MetricHistogram appendHisto;
  private MetricHistogram replayHisto;
  private MutableFastCounter blockBytesScannedCount;

  /** Cached {@code userNamePrefix.hashCode()}. */
  private final int hashCode;

  /** Flipped to true by the first call to {@link #deregister()}. */
  private final AtomicBoolean closed = new AtomicBoolean(false);
  private final DynamicMetricsRegistry registry;

  private final ConcurrentHashMap<String, ClientMetrics> clientMetricsMap;

  /**
   * Request counters for one client, backed by {@link LongAdder}s.
   */
  static class ClientMetricsImpl implements ClientMetrics {
    private final String hostName;
    final LongAdder readRequestsCount = new LongAdder();
    final LongAdder writeRequestsCount = new LongAdder();
    final LongAdder filteredRequestsCount = new LongAdder();

    /**
     * @param hostName value later returned by {@link #getHostName()}
     */
    public ClientMetricsImpl(String hostName) {
      this.hostName = hostName;
    }

    @Override
    public void incrementReadRequest() {
      readRequestsCount.increment();
    }

    @Override
    public void incrementWriteRequest() {
      writeRequestsCount.increment();
    }

    @Override
    public String getHostName() {
      return hostName;
    }

    @Override
    public long getReadRequestsCount() {
      return readRequestsCount.sum();
    }

    @Override
    public long getWriteRequestsCount() {
      return writeRequestsCount.sum();
    }

    @Override
    public void incrementFilteredReadRequests() {
      filteredRequestsCount.increment();
    }

    @Override
    public long getFilteredReadRequests() {
      return filteredRequestsCount.sum();
    }
  }

  /**
   * Builds the metric keys for {@code user}, takes the registry from {@code agg} and finally
   * hands this instance to {@code agg.register(this)}.
   * @param user user name embedded in every metric key
   * @param agg  aggregate source that supplies the registry and receives this instance
   */
  public MetricsUserSourceImpl(String user, MetricsUserAggregateSourceImpl agg) {
    LOG.debug("Creating new MetricsUserSourceImpl for user {}", user);

    this.user = user;
    this.registry = agg.getMetricsRegistry();

    this.userNamePrefix = "User_" + user + "_metric_";

    hashCode = userNamePrefix.hashCode();

    userGetKey = userNamePrefix + MetricsRegionServerSource.GET_KEY;
    userScanTimeKey = userNamePrefix + MetricsRegionServerSource.SCAN_TIME_KEY;
    userPutKey = userNamePrefix + MetricsRegionServerSource.PUT_KEY;
    userDeleteKey = userNamePrefix + MetricsRegionServerSource.DELETE_KEY;
    userIncrementKey = userNamePrefix + MetricsRegionServerSource.INCREMENT_KEY;
    userAppendKey = userNamePrefix + MetricsRegionServerSource.APPEND_KEY;
    userReplayKey = userNamePrefix + MetricsRegionServerSource.REPLAY_KEY;
    userBlockBytesScannedKey = userNamePrefix + MetricsRegionServerSource.BLOCK_BYTES_SCANNED_KEY;
    clientMetricsMap = new ConcurrentHashMap<>();
    // Must stay last: every field above is assigned before this instance escapes.
    // NOTE(review): presumably this is what ends up invoking register() - confirm in the aggregate.
    agg.register(this);
  }

  /**
   * Obtains the per-user time histograms and the block-bytes-scanned counter from the registry
   * and stores them in the fields used by the {@code update*} methods.
   */
  @Override
  public void register() {
    getHisto = registry.newTimeHistogram(userGetKey);
    scanTimeHisto = registry.newTimeHistogram(userScanTimeKey);
    putHisto = registry.newTimeHistogram(userPutKey);
    deleteHisto = registry.newTimeHistogram(userDeleteKey);
    incrementHisto = registry.newTimeHistogram(userIncrementKey);
    appendHisto = registry.newTimeHistogram(userAppendKey);
    replayHisto = registry.newTimeHistogram(userReplayKey);
    blockBytesScannedCount = registry.newCounter(userBlockBytesScannedKey, "", 0);
  }

  /**
   * Marks this source closed and removes all of its metric keys from the registry. Only the first
   * call does the removal; later calls return immediately.
   */
  @Override
  public void deregister() {
    // Has someone else already closed this for us?
    if (closed.getAndSet(true)) {
      return;
    }

    LOG.debug("Removing user Metrics for user: {}", user);

    registry.removeMetric(userGetKey);
    registry.removeMetric(userScanTimeKey);
    registry.removeMetric(userPutKey);
    registry.removeMetric(userDeleteKey);
    registry.removeMetric(userIncrementKey);
    registry.removeMetric(userAppendKey);
    registry.removeMetric(userReplayKey);
    registry.removeMetric(userBlockBytesScannedKey);
  }

  @Override
  public String getUser() {
    return user;
  }

  /**
   * Orders sources by the cached hash of their metric prefix and, when the hashes are equal, by
   * the prefix itself. The tie-break keeps two different users whose prefixes happen to share a
   * hash code from comparing as equal.
   * @param source the source to compare with; may be {@code null}
   * @return {@code -1} when {@code source} is {@code null} or not a
   *         {@link MetricsUserSourceImpl}; otherwise a negative, zero or positive value
   */
  @Override
  public int compareTo(MetricsUserSource source) {
    if (!(source instanceof MetricsUserSourceImpl)) {
      // instanceof is false for null as well, so this covers both early-out cases.
      return -1;
    }

    MetricsUserSourceImpl impl = (MetricsUserSourceImpl) source;

    int byHash = Integer.compare(hashCode, impl.hashCode);
    if (byHash != 0) {
      return byHash;
    }
    // Same hash: only identical prefixes (i.e. the same user string) may compare as 0.
    return userNamePrefix.compareTo(impl.userNamePrefix);
  }

  @Override
  public int hashCode() {
    return hashCode;
  }

  /**
   * Two sources are equal when they are the same instance or when {@link #compareTo} returns 0,
   * i.e. when they were built for the same metric prefix.
   */
  @Override
  public boolean equals(Object obj) {
    if (obj == this) {
      return true;
    }
    if (!(obj instanceof MetricsUserSourceImpl)) {
      return false;
    }
    return compareTo((MetricsUserSourceImpl) obj) == 0;
  }

  /**
   * Checks the closed flag, once without and once while holding this object's monitor. Nothing is
   * written to {@code mrb} by the code in this method.
   * @param mrb     record builder; not used by the current body
   * @param ignored not used
   */
  void snapshot(MetricsRecordBuilder mrb, boolean ignored) {
    // If a close has already started, be extra sure that we do not take
    // any lock and do not put data into metrics that are about to be
    // removed: bail out before even acquiring the monitor.
    if (closed.get()) {
      return;
    }

    // Take this object's monitor before touching the metrics.
    // NOTE(review): deregister() does not synchronize on this, so holding the monitor here does
    // not by itself keep a concurrent removal out - confirm the intended locking.
    synchronized (this) {

      // It's possible that a close happened between checking
      // the closed variable and getting the lock.
      if (closed.get()) {
        return;
      }
    }
  }

  /**
   * @param t value added to the put histogram
   */
  @Override
  public void updatePut(long t) {
    putHisto.add(t);
  }

  /**
   * @param t value added to the delete histogram
   */
  @Override
  public void updateDelete(long t) {
    deleteHisto.add(t);
  }

  /**
   * @param time              value added to the get histogram
   * @param blockBytesScanned amount added to the block-bytes-scanned counter
   */
  @Override
  public void updateGet(long time, long blockBytesScanned) {
    getHisto.add(time);
    blockBytesScannedCount.incr(blockBytesScanned);
  }

  /**
   * @param time              value added to the increment histogram
   * @param blockBytesScanned amount added to the block-bytes-scanned counter
   */
  @Override
  public void updateIncrement(long time, long blockBytesScanned) {
    incrementHisto.add(time);
    blockBytesScannedCount.incr(blockBytesScanned);
  }

  /**
   * @param time              value added to the append histogram
   * @param blockBytesScanned amount added to the block-bytes-scanned counter
   */
  @Override
  public void updateAppend(long time, long blockBytesScanned) {
    appendHisto.add(time);
    blockBytesScannedCount.incr(blockBytesScanned);
  }

  /**
   * @param t value added to the replay histogram
   */
  @Override
  public void updateReplay(long t) {
    replayHisto.add(t);
  }

  /**
   * @param time              value added to the scan-time histogram
   * @param blockBytesScanned amount added to the block-bytes-scanned counter
   */
  @Override
  public void updateScan(long time, long blockBytesScanned) {
    scanTimeHisto.add(time);
    blockBytesScannedCount.incr(blockBytesScanned);
  }

  /**
   * @param blockBytesScanned amount added to the block-bytes-scanned counter
   */
  @Override
  public void updateCheckAndMutate(long blockBytesScanned) {
    blockBytesScannedCount.incr(blockBytesScanned);
  }

  /**
   * Adds a record named after this user's metric prefix to the collector and snapshots the
   * registry into it.
   * @param metricsCollector collector that receives the record
   * @param all              passed through unchanged to the registry snapshot
   */
  @Override
  public void getMetrics(MetricsCollector metricsCollector, boolean all) {
    MetricsRecordBuilder mrb = metricsCollector.addRecord(this.userNamePrefix);
    registry.snapshot(mrb, all);
  }

  /**
   * @return a read-only view of the live client map; entries added later are visible through it
   */
  @Override
  public Map<String, ClientMetrics> getClientMetrics() {
    return Collections.unmodifiableMap(clientMetricsMap);
  }

  /**
   * Returns the metrics object registered for {@code client}, creating and registering one when
   * none exists yet. When several threads race on the same key they all receive the instance
   * that made it into the map.
   * @param client key of the client entry
   * @return the entry stored in the map for {@code client}
   */
  @Override
  public ClientMetrics getOrCreateMetricsClient(String client) {
    // Plain lookup first so the common "already present" case allocates nothing.
    ClientMetrics existing = clientMetricsMap.get(client);
    if (existing != null) {
      return existing;
    }
    ClientMetrics created = new ClientMetricsImpl(client);
    ClientMetrics raced = clientMetricsMap.putIfAbsent(client, created);
    // A non-null result means another thread registered an entry first; use theirs.
    return raced != null ? raced : created;
  }

}