001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.util.Collections;
021import java.util.Map;
022import java.util.concurrent.ConcurrentHashMap;
023import java.util.concurrent.atomic.AtomicBoolean;
024import java.util.concurrent.atomic.LongAdder;
025import org.apache.hadoop.metrics2.MetricHistogram;
026import org.apache.hadoop.metrics2.MetricsCollector;
027import org.apache.hadoop.metrics2.MetricsRecordBuilder;
028import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
029import org.apache.hadoop.metrics2.lib.MutableFastCounter;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034@InterfaceAudience.Private
035public class MetricsUserSourceImpl implements MetricsUserSource {
036  private static final Logger LOG = LoggerFactory.getLogger(MetricsUserSourceImpl.class);
037
038  private final String userNamePrefix;
039
040  private final String user;
041
042  private final String userGetKey;
043  private final String userScanTimeKey;
044  private final String userPutKey;
045  private final String userDeleteKey;
046  private final String userIncrementKey;
047  private final String userAppendKey;
048  private final String userReplayKey;
049  private final String userBlockBytesScannedKey;
050
051  private MetricHistogram getHisto;
052  private MetricHistogram scanTimeHisto;
053  private MetricHistogram putHisto;
054  private MetricHistogram deleteHisto;
055  private MetricHistogram incrementHisto;
056  private MetricHistogram appendHisto;
057  private MetricHistogram replayHisto;
058  private MutableFastCounter blockBytesScannedCount;
059
060  private final int hashCode;
061
062  private AtomicBoolean closed = new AtomicBoolean(false);
063  private final DynamicMetricsRegistry registry;
064
065  private ConcurrentHashMap<String, ClientMetrics> clientMetricsMap;
066
067  static class ClientMetricsImpl implements ClientMetrics {
068    private final String hostName;
069    final LongAdder readRequestsCount = new LongAdder();
070    final LongAdder writeRequestsCount = new LongAdder();
071    final LongAdder filteredRequestsCount = new LongAdder();
072
073    public ClientMetricsImpl(String hostName) {
074      this.hostName = hostName;
075    }
076
077    @Override
078    public void incrementReadRequest() {
079      readRequestsCount.increment();
080    }
081
082    @Override
083    public void incrementWriteRequest() {
084      writeRequestsCount.increment();
085    }
086
087    @Override
088    public String getHostName() {
089      return hostName;
090    }
091
092    @Override
093    public long getReadRequestsCount() {
094      return readRequestsCount.sum();
095    }
096
097    @Override
098    public long getWriteRequestsCount() {
099      return writeRequestsCount.sum();
100    }
101
102    @Override
103    public void incrementFilteredReadRequests() {
104      filteredRequestsCount.increment();
105
106    }
107
108    @Override
109    public long getFilteredReadRequests() {
110      return filteredRequestsCount.sum();
111    }
112  }
113
114  public MetricsUserSourceImpl(String user, MetricsUserAggregateSourceImpl agg) {
115    if (LOG.isDebugEnabled()) {
116      LOG.debug("Creating new MetricsUserSourceImpl for user " + user);
117    }
118
119    this.user = user;
120    this.registry = agg.getMetricsRegistry();
121
122    this.userNamePrefix = "User_" + user + "_metric_";
123
124    hashCode = userNamePrefix.hashCode();
125
126    userGetKey = userNamePrefix + MetricsRegionServerSource.GET_KEY;
127    userScanTimeKey = userNamePrefix + MetricsRegionServerSource.SCAN_TIME_KEY;
128    userPutKey = userNamePrefix + MetricsRegionServerSource.PUT_KEY;
129    userDeleteKey = userNamePrefix + MetricsRegionServerSource.DELETE_KEY;
130    userIncrementKey = userNamePrefix + MetricsRegionServerSource.INCREMENT_KEY;
131    userAppendKey = userNamePrefix + MetricsRegionServerSource.APPEND_KEY;
132    userReplayKey = userNamePrefix + MetricsRegionServerSource.REPLAY_KEY;
133    userBlockBytesScannedKey = userNamePrefix + MetricsRegionServerSource.BLOCK_BYTES_SCANNED_KEY;
134    clientMetricsMap = new ConcurrentHashMap<>();
135    agg.register(this);
136  }
137
138  @Override
139  public void register() {
140    getHisto = registry.newTimeHistogram(userGetKey);
141    scanTimeHisto = registry.newTimeHistogram(userScanTimeKey);
142    putHisto = registry.newTimeHistogram(userPutKey);
143    deleteHisto = registry.newTimeHistogram(userDeleteKey);
144    incrementHisto = registry.newTimeHistogram(userIncrementKey);
145    appendHisto = registry.newTimeHistogram(userAppendKey);
146    replayHisto = registry.newTimeHistogram(userReplayKey);
147    blockBytesScannedCount = registry.newCounter(userBlockBytesScannedKey, "", 0);
148  }
149
150  @Override
151  public void deregister() {
152    boolean wasClosed = closed.getAndSet(true);
153
154    // Has someone else already closed this for us?
155    if (wasClosed) {
156      return;
157    }
158
159    if (LOG.isDebugEnabled()) {
160      LOG.debug("Removing user Metrics for user: " + user);
161    }
162
163    registry.removeMetric(userGetKey);
164    registry.removeMetric(userScanTimeKey);
165    registry.removeMetric(userPutKey);
166    registry.removeMetric(userDeleteKey);
167    registry.removeMetric(userIncrementKey);
168    registry.removeMetric(userAppendKey);
169    registry.removeMetric(userReplayKey);
170    registry.removeMetric(userBlockBytesScannedKey);
171  }
172
173  @Override
174  public String getUser() {
175    return user;
176  }
177
178  @Override
179  public int compareTo(MetricsUserSource source) {
180    if (source == null) {
181      return -1;
182    }
183    if (!(source instanceof MetricsUserSourceImpl)) {
184      return -1;
185    }
186
187    MetricsUserSourceImpl impl = (MetricsUserSourceImpl) source;
188
189    return Long.compare(hashCode, impl.hashCode);
190  }
191
192  @Override
193  public int hashCode() {
194    return hashCode;
195  }
196
197  @Override
198  public boolean equals(Object obj) {
199    return obj == this
200      || (obj instanceof MetricsUserSourceImpl && compareTo((MetricsUserSourceImpl) obj) == 0);
201  }
202
203  void snapshot(MetricsRecordBuilder mrb, boolean ignored) {
204    // If there is a close that started be double extra sure
205    // that we're not getting any locks and not putting data
206    // into the metrics that should be removed. So early out
207    // before even getting the lock.
208    if (closed.get()) {
209      return;
210    }
211
212    // Grab the read
213    // This ensures that removes of the metrics
214    // can't happen while we are putting them back in.
215    synchronized (this) {
216
217      // It's possible that a close happened between checking
218      // the closed variable and getting the lock.
219      if (closed.get()) {
220        return;
221      }
222    }
223  }
224
225  @Override
226  public void updatePut(long t) {
227    putHisto.add(t);
228  }
229
230  @Override
231  public void updateDelete(long t) {
232    deleteHisto.add(t);
233  }
234
235  @Override
236  public void updateGet(long time, long blockBytesScanned) {
237    getHisto.add(time);
238    blockBytesScannedCount.incr(blockBytesScanned);
239  }
240
241  @Override
242  public void updateIncrement(long time, long blockBytesScanned) {
243    incrementHisto.add(time);
244    blockBytesScannedCount.incr(blockBytesScanned);
245  }
246
247  @Override
248  public void updateAppend(long time, long blockBytesScanned) {
249    appendHisto.add(time);
250    blockBytesScannedCount.incr(blockBytesScanned);
251  }
252
253  @Override
254  public void updateReplay(long t) {
255    replayHisto.add(t);
256  }
257
258  @Override
259  public void updateScan(long time, long blockBytesScanned) {
260    scanTimeHisto.add(time);
261    blockBytesScannedCount.incr(blockBytesScanned);
262  }
263
264  @Override
265  public void updateCheckAndMutate(long blockBytesScanned) {
266    blockBytesScannedCount.incr(blockBytesScanned);
267  }
268
269  @Override
270  public void getMetrics(MetricsCollector metricsCollector, boolean all) {
271    MetricsRecordBuilder mrb = metricsCollector.addRecord(this.userNamePrefix);
272    registry.snapshot(mrb, all);
273  }
274
275  @Override
276  public Map<String, ClientMetrics> getClientMetrics() {
277    return Collections.unmodifiableMap(clientMetricsMap);
278  }
279
280  @Override
281  public ClientMetrics getOrCreateMetricsClient(String client) {
282    ClientMetrics source = clientMetricsMap.get(client);
283    if (source != null) {
284      return source;
285    }
286    source = new ClientMetricsImpl(client);
287    ClientMetrics prev = clientMetricsMap.putIfAbsent(client, source);
288    if (prev != null) {
289      return prev;
290    }
291    return source;
292  }
293
294}