001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.net.InetAddress;
022import java.util.Optional;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
025import org.apache.hadoop.hbase.ipc.RpcServer;
026import org.apache.hadoop.hbase.security.User;
027import org.apache.hadoop.hbase.security.UserProvider;
028import org.apache.hadoop.hbase.util.LossyCounting;
029import org.apache.yetus.audience.InterfaceAudience;
030
031@InterfaceAudience.Private
032public class MetricsUserAggregateImpl implements MetricsUserAggregate {
033
034  /** Provider for mapping principal names to Users */
035  private final UserProvider userProvider;
036
037  private final MetricsUserAggregateSource source;
038  private final LossyCounting<MetricsUserSource> userMetricLossyCounting;
039
040  public MetricsUserAggregateImpl(Configuration conf) {
041    source = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
042      .getUserAggregate();
043    userMetricLossyCounting = new LossyCounting<>("userMetrics", conf, source::deregister);
044    this.userProvider = UserProvider.instantiate(conf);
045  }
046
047  /**
048   * Returns the active user to which authorization checks should be applied. If we are in the
049   * context of an RPC call, the remote user is used, otherwise the currently logged in user is
050   * used.
051   */
052  private String getActiveUser() {
053    Optional<User> user = RpcServer.getRequestUser();
054    if (!user.isPresent()) {
055      // for non-rpc handling, fallback to system user
056      try {
057        user = Optional.of(userProvider.getCurrent());
058      } catch (IOException ignore) {
059      }
060    }
061    return user.map(User::getShortName).orElse(null);
062  }
063
064  @Override
065  public MetricsUserAggregateSource getSource() {
066    return source;
067  }
068
069  @Override
070  public void updatePut(long t) {
071    String user = getActiveUser();
072    if (user != null) {
073      MetricsUserSource userSource = getOrCreateMetricsUser(user);
074      userSource.updatePut(t);
075      incrementClientWriteMetrics(userSource);
076    }
077
078  }
079
080  private String getClient() {
081    Optional<InetAddress> ipOptional = RpcServer.getRemoteAddress();
082    return ipOptional.map(InetAddress::getHostName).orElse(null);
083  }
084
085  private void incrementClientReadMetrics(MetricsUserSource userSource) {
086    String client = getClient();
087    if (client != null && userSource != null) {
088      userSource.getOrCreateMetricsClient(client).incrementReadRequest();
089    }
090  }
091
092  private void incrementFilteredReadRequests(MetricsUserSource userSource) {
093    String client = getClient();
094    if (client != null && userSource != null) {
095      userSource.getOrCreateMetricsClient(client).incrementFilteredReadRequests();
096    }
097  }
098
099  private void incrementClientWriteMetrics(MetricsUserSource userSource) {
100    String client = getClient();
101    if (client != null && userSource != null) {
102      userSource.getOrCreateMetricsClient(client).incrementWriteRequest();
103    }
104  }
105
106  @Override
107  public void updateDelete(long t) {
108    String user = getActiveUser();
109    if (user != null) {
110      MetricsUserSource userSource = getOrCreateMetricsUser(user);
111      userSource.updateDelete(t);
112      incrementClientWriteMetrics(userSource);
113    }
114  }
115
116  @Override
117  public void updateGet(long time, long blockBytesScanned) {
118    String user = getActiveUser();
119    if (user != null) {
120      MetricsUserSource userSource = getOrCreateMetricsUser(user);
121      userSource.updateGet(time, blockBytesScanned);
122    }
123  }
124
125  @Override
126  public void updateIncrement(long time, long blockBytesScanned) {
127    String user = getActiveUser();
128    if (user != null) {
129      MetricsUserSource userSource = getOrCreateMetricsUser(user);
130      userSource.updateIncrement(time, blockBytesScanned);
131      incrementClientWriteMetrics(userSource);
132    }
133  }
134
135  @Override
136  public void updateAppend(long time, long blockBytesScanned) {
137    String user = getActiveUser();
138    if (user != null) {
139      MetricsUserSource userSource = getOrCreateMetricsUser(user);
140      userSource.updateAppend(time, blockBytesScanned);
141      incrementClientWriteMetrics(userSource);
142    }
143  }
144
145  @Override
146  public void updateReplay(long t) {
147    String user = getActiveUser();
148    if (user != null) {
149      MetricsUserSource userSource = getOrCreateMetricsUser(user);
150      userSource.updateReplay(t);
151      incrementClientWriteMetrics(userSource);
152    }
153  }
154
155  @Override
156  public void updateScan(long time, long blockBytesScanned) {
157    String user = getActiveUser();
158    if (user != null) {
159      MetricsUserSource userSource = getOrCreateMetricsUser(user);
160      userSource.updateScan(time, blockBytesScanned);
161    }
162  }
163
164  @Override
165  public void updateCheckAndMutate(long blockBytesScanned) {
166    String user = getActiveUser();
167    if (user != null) {
168      MetricsUserSource userSource = getOrCreateMetricsUser(user);
169      userSource.updateCheckAndMutate(blockBytesScanned);
170    }
171  }
172
173  @Override
174  public void updateFilteredReadRequests() {
175    String user = getActiveUser();
176    if (user != null) {
177      MetricsUserSource userSource = getOrCreateMetricsUser(user);
178      incrementFilteredReadRequests(userSource);
179    }
180  }
181
182  @Override
183  public void updateReadRequestCount() {
184    String user = getActiveUser();
185    if (user != null) {
186      MetricsUserSource userSource = getOrCreateMetricsUser(user);
187      incrementClientReadMetrics(userSource);
188    }
189  }
190
191  private MetricsUserSource getOrCreateMetricsUser(String user) {
192    MetricsUserSource userSource = source.getOrCreateMetricsUser(user);
193    userMetricLossyCounting.add(userSource);
194    return userSource;
195  }
196}