001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.regionserver;
020
021import java.util.Collections;
022import java.util.Map;
023import java.util.concurrent.ConcurrentHashMap;
024import java.util.concurrent.atomic.AtomicBoolean;
025import java.util.concurrent.atomic.LongAdder;
026
027import org.apache.hadoop.metrics2.MetricHistogram;
028import org.apache.hadoop.metrics2.MetricsCollector;
029import org.apache.hadoop.metrics2.MetricsRecordBuilder;
030import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035@InterfaceAudience.Private
036public class MetricsUserSourceImpl implements MetricsUserSource {
037  private static final Logger LOG = LoggerFactory.getLogger(MetricsUserSourceImpl.class);
038
039  private final String userNamePrefix;
040
041  private final String user;
042
043  private final String userGetKey;
044  private final String userScanTimeKey;
045  private final String userPutKey;
046  private final String userDeleteKey;
047  private final String userIncrementKey;
048  private final String userAppendKey;
049  private final String userReplayKey;
050
051  private MetricHistogram getHisto;
052  private MetricHistogram scanTimeHisto;
053  private MetricHistogram putHisto;
054  private MetricHistogram deleteHisto;
055  private MetricHistogram incrementHisto;
056  private MetricHistogram appendHisto;
057  private MetricHistogram replayHisto;
058
059  private final int hashCode;
060
061  private AtomicBoolean closed = new AtomicBoolean(false);
062  private final MetricsUserAggregateSourceImpl agg;
063  private final DynamicMetricsRegistry registry;
064
065  private ConcurrentHashMap<String, ClientMetrics> clientMetricsMap;
066
067  static class ClientMetricsImpl implements ClientMetrics {
068    private final String hostName;
069    final LongAdder readRequestsCount = new LongAdder();
070    final LongAdder writeRequestsCount = new LongAdder();
071    final LongAdder filteredRequestsCount = new LongAdder();
072
073    public ClientMetricsImpl(String hostName) {
074      this.hostName = hostName;
075    }
076
077    @Override public void incrementReadRequest() {
078      readRequestsCount.increment();
079    }
080
081    @Override public void incrementWriteRequest() {
082      writeRequestsCount.increment();
083    }
084
085    @Override public String getHostName() {
086      return hostName;
087    }
088
089    @Override public long getReadRequestsCount() {
090      return readRequestsCount.sum();
091    }
092
093    @Override public long getWriteRequestsCount() {
094      return writeRequestsCount.sum();
095    }
096
097    @Override public void incrementFilteredReadRequests() {
098      filteredRequestsCount.increment();
099
100    }
101
102    @Override public long getFilteredReadRequests() {
103      return filteredRequestsCount.sum();
104    }
105  }
106
107  public MetricsUserSourceImpl(String user, MetricsUserAggregateSourceImpl agg) {
108    if (LOG.isDebugEnabled()) {
109      LOG.debug("Creating new MetricsUserSourceImpl for user " + user);
110    }
111
112    this.user = user;
113    this.agg = agg;
114    this.registry = agg.getMetricsRegistry();
115
116    this.userNamePrefix = "user_" + user + "_metric_";
117
118    hashCode = userNamePrefix.hashCode();
119
120    userGetKey = userNamePrefix + MetricsRegionServerSource.GET_KEY;
121    userScanTimeKey = userNamePrefix + MetricsRegionServerSource.SCAN_TIME_KEY;
122    userPutKey = userNamePrefix + MetricsRegionServerSource.PUT_KEY;
123    userDeleteKey = userNamePrefix + MetricsRegionServerSource.DELETE_KEY;
124    userIncrementKey = userNamePrefix + MetricsRegionServerSource.INCREMENT_KEY;
125    userAppendKey = userNamePrefix + MetricsRegionServerSource.APPEND_KEY;
126    userReplayKey = userNamePrefix + MetricsRegionServerSource.REPLAY_KEY;
127    clientMetricsMap = new ConcurrentHashMap<>();
128    agg.register(this);
129  }
130
131  @Override
132  public void register() {
133    synchronized (this) {
134      getHisto = registry.newTimeHistogram(userGetKey);
135      scanTimeHisto = registry.newTimeHistogram(userScanTimeKey);
136      putHisto = registry.newTimeHistogram(userPutKey);
137      deleteHisto = registry.newTimeHistogram(userDeleteKey);
138      incrementHisto = registry.newTimeHistogram(userIncrementKey);
139      appendHisto = registry.newTimeHistogram(userAppendKey);
140      replayHisto = registry.newTimeHistogram(userReplayKey);
141    }
142  }
143
144  @Override
145  public void deregister() {
146    boolean wasClosed = closed.getAndSet(true);
147
148    // Has someone else already closed this for us?
149    if (wasClosed) {
150      return;
151    }
152
153    if (LOG.isDebugEnabled()) {
154      LOG.debug("Removing user Metrics for user: " + user);
155    }
156
157    synchronized (this) {
158      registry.removeMetric(userGetKey);
159      registry.removeMetric(userScanTimeKey);
160      registry.removeMetric(userPutKey);
161      registry.removeMetric(userDeleteKey);
162      registry.removeMetric(userIncrementKey);
163      registry.removeMetric(userAppendKey);
164      registry.removeMetric(userReplayKey);
165    }
166  }
167
168  @Override
169  public String getUser() {
170    return user;
171  }
172
173  @Override
174  public int compareTo(MetricsUserSource source) {
175    if (source == null) {
176      return -1;
177    }
178    if (!(source instanceof MetricsUserSourceImpl)) {
179      return -1;
180    }
181
182    MetricsUserSourceImpl impl = (MetricsUserSourceImpl) source;
183
184    return Long.compare(hashCode, impl.hashCode);
185  }
186
187  @Override
188  public int hashCode() {
189    return hashCode;
190  }
191
192  @Override
193  public boolean equals(Object obj) {
194    return obj == this ||
195        (obj instanceof MetricsUserSourceImpl && compareTo((MetricsUserSourceImpl) obj) == 0);
196  }
197
198  void snapshot(MetricsRecordBuilder mrb, boolean ignored) {
199    // If there is a close that started be double extra sure
200    // that we're not getting any locks and not putting data
201    // into the metrics that should be removed. So early out
202    // before even getting the lock.
203    if (closed.get()) {
204      return;
205    }
206
207    // Grab the read
208    // This ensures that removes of the metrics
209    // can't happen while we are putting them back in.
210    synchronized (this) {
211
212      // It's possible that a close happened between checking
213      // the closed variable and getting the lock.
214      if (closed.get()) {
215        return;
216      }
217    }
218  }
219
220  @Override
221  public void updatePut(long t) {
222    putHisto.add(t);
223  }
224
225  @Override
226  public void updateDelete(long t) {
227    deleteHisto.add(t);
228  }
229
230  @Override
231  public void updateGet(long t) {
232    getHisto.add(t);
233  }
234
235  @Override
236  public void updateIncrement(long t) {
237    incrementHisto.add(t);
238  }
239
240  @Override
241  public void updateAppend(long t) {
242    appendHisto.add(t);
243  }
244
245  @Override
246  public void updateReplay(long t) {
247    replayHisto.add(t);
248  }
249
250  @Override
251  public void updateScanTime(long t) {
252    scanTimeHisto.add(t);
253  }
254
255  @Override public void getMetrics(MetricsCollector metricsCollector, boolean all) {
256    MetricsRecordBuilder mrb = metricsCollector.addRecord(this.userNamePrefix);
257    registry.snapshot(mrb, all);
258  }
259
260  @Override public Map<String, ClientMetrics> getClientMetrics() {
261    return Collections.unmodifiableMap(clientMetricsMap);
262  }
263
264  @Override public ClientMetrics getOrCreateMetricsClient(String client) {
265    ClientMetrics source = clientMetricsMap.get(client);
266    if (source != null) {
267      return source;
268    }
269    source = new ClientMetricsImpl(client);
270    ClientMetrics prev = clientMetricsMap.putIfAbsent(client, source);
271    if (prev != null) {
272      return prev;
273    }
274    return source;
275  }
276
277}