001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.net.InetAddress;
022import java.util.Optional;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
025import org.apache.hadoop.hbase.ipc.RpcServer;
026import org.apache.hadoop.hbase.ipc.ServerRpcConnection;
027import org.apache.hadoop.hbase.security.User;
028import org.apache.hadoop.hbase.security.UserProvider;
029import org.apache.hadoop.hbase.util.LossyCounting;
030import org.apache.yetus.audience.InterfaceAudience;
031
032import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
033
034@InterfaceAudience.Private
035public class MetricsUserAggregateImpl implements MetricsUserAggregate {
036
037  /** Provider for mapping principal names to Users */
038  private final UserProvider userProvider;
039
040  private final MetricsUserAggregateSource source;
041  private final LossyCounting<MetricsUserSource> userMetricLossyCounting;
042
043  public MetricsUserAggregateImpl(Configuration conf) {
044    source = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
045      .getUserAggregate();
046    userMetricLossyCounting = new LossyCounting<>("userMetrics", conf, source::deregister);
047    this.userProvider = UserProvider.instantiate(conf);
048  }
049
050  /**
051   * Returns the active user to which authorization checks should be applied. If we are in the
052   * context of an RPC call, the remote user is used, otherwise the currently logged in user is
053   * used.
054   */
055  private String getActiveUser() {
056    Optional<User> user = RpcServer.getRequestUser();
057    if (!user.isPresent()) {
058      // for non-rpc handling, fallback to system user
059      try {
060        user = Optional.of(userProvider.getCurrent());
061      } catch (IOException ignore) {
062      }
063    }
064    return user.map(User::getShortName).orElse(null);
065  }
066
067  @Override
068  public MetricsUserAggregateSource getSource() {
069    return source;
070  }
071
072  @Override
073  public void updatePut(long t) {
074    String user = getActiveUser();
075    if (user != null) {
076      MetricsUserSource userSource = getOrCreateMetricsUser(user);
077      userSource.updatePut(t);
078      incrementClientWriteMetrics(userSource);
079    }
080
081  }
082
083  private String getClient() {
084    Optional<InetAddress> ipOptional = RpcServer.getRemoteAddress();
085    return ipOptional.map(InetAddress::getHostName).orElse(null);
086  }
087
088  private void incrementClientReadMetrics(MetricsUserSource userSource) {
089    ClientConnectionContext ctx = getClientConnectionContext();
090    String client = getClient();
091    if (client != null && userSource != null) {
092      userSource.getOrCreateMetricsClient(client, ctx.hostAddress, ctx.userName, ctx.clientVersion,
093        ctx.serviceName).incrementReadRequest();
094    }
095  }
096
097  private void incrementFilteredReadRequests(MetricsUserSource userSource) {
098    ClientConnectionContext ctx = getClientConnectionContext();
099    String client = getClient();
100    if (client != null && userSource != null) {
101      userSource.getOrCreateMetricsClient(client, ctx.hostAddress, ctx.userName, ctx.clientVersion,
102        ctx.serviceName).incrementFilteredReadRequests();
103    }
104  }
105
106  private void incrementClientWriteMetrics(MetricsUserSource userSource) {
107    ClientConnectionContext ctx = getClientConnectionContext();
108    String client = getClient();
109    if (client != null && userSource != null) {
110      userSource.getOrCreateMetricsClient(client, ctx.hostAddress, ctx.userName, ctx.clientVersion,
111        ctx.serviceName).incrementWriteRequest();
112    }
113  }
114
115  @Override
116  public void updateDelete(long t) {
117    String user = getActiveUser();
118    if (user != null) {
119      MetricsUserSource userSource = getOrCreateMetricsUser(user);
120      userSource.updateDelete(t);
121      incrementClientWriteMetrics(userSource);
122    }
123  }
124
125  @Override
126  public void updateGet(long time, long blockBytesScanned) {
127    String user = getActiveUser();
128    if (user != null) {
129      MetricsUserSource userSource = getOrCreateMetricsUser(user);
130      userSource.updateGet(time, blockBytesScanned);
131    }
132  }
133
134  @Override
135  public void updateIncrement(long time, long blockBytesScanned) {
136    String user = getActiveUser();
137    if (user != null) {
138      MetricsUserSource userSource = getOrCreateMetricsUser(user);
139      userSource.updateIncrement(time, blockBytesScanned);
140      incrementClientWriteMetrics(userSource);
141    }
142  }
143
144  @Override
145  public void updateAppend(long time, long blockBytesScanned) {
146    String user = getActiveUser();
147    if (user != null) {
148      MetricsUserSource userSource = getOrCreateMetricsUser(user);
149      userSource.updateAppend(time, blockBytesScanned);
150      incrementClientWriteMetrics(userSource);
151    }
152  }
153
154  @Override
155  public void updateReplay(long t) {
156    String user = getActiveUser();
157    if (user != null) {
158      MetricsUserSource userSource = getOrCreateMetricsUser(user);
159      userSource.updateReplay(t);
160      incrementClientWriteMetrics(userSource);
161    }
162  }
163
164  @Override
165  public void updateScan(long time, long blockBytesScanned) {
166    String user = getActiveUser();
167    if (user != null) {
168      MetricsUserSource userSource = getOrCreateMetricsUser(user);
169      userSource.updateScan(time, blockBytesScanned);
170    }
171  }
172
173  @Override
174  public void updateCheckAndMutate(long blockBytesScanned) {
175    String user = getActiveUser();
176    if (user != null) {
177      MetricsUserSource userSource = getOrCreateMetricsUser(user);
178      userSource.updateCheckAndMutate(blockBytesScanned);
179    }
180  }
181
182  @Override
183  public void updateFilteredReadRequests() {
184    String user = getActiveUser();
185    if (user != null) {
186      MetricsUserSource userSource = getOrCreateMetricsUser(user);
187      incrementFilteredReadRequests(userSource);
188    }
189  }
190
191  @Override
192  public void updateReadRequestCount() {
193    String user = getActiveUser();
194    if (user != null) {
195      MetricsUserSource userSource = getOrCreateMetricsUser(user);
196      incrementClientReadMetrics(userSource);
197    }
198  }
199
200  private MetricsUserSource getOrCreateMetricsUser(String user) {
201    MetricsUserSource userSource = source.getOrCreateMetricsUser(user);
202    userMetricLossyCounting.add(userSource);
203    return userSource;
204  }
205
206  private ClientConnectionContext getClientConnectionContext() {
207    String hostAddress = null;
208    String userName = "Unknown";
209    String clientVersion = "Unknown";
210    String serviceName = "Unknown";
211
212    Optional<ServerRpcConnection> rpcConnectionOptional = RpcServer.getCurrentServerRpcConnection();
213    if (rpcConnectionOptional.isPresent()) {
214      ServerRpcConnection rpcConnection = rpcConnectionOptional.get();
215      hostAddress = rpcConnection.getHostAddress();
216      RPCProtos.ConnectionHeader connectionHeader = rpcConnection.getConnectionHeader();
217      if (connectionHeader.hasUserInfo()) {
218        RPCProtos.UserInformation userInfoProto = connectionHeader.getUserInfo();
219        if (userInfoProto.hasEffectiveUser()) {
220          userName = userInfoProto.getEffectiveUser();
221        }
222      }
223      if (connectionHeader.hasVersionInfo()) {
224        clientVersion = connectionHeader.getVersionInfo().getVersion();
225      }
226      if (connectionHeader.hasServiceName()) {
227        serviceName = connectionHeader.getServiceName();
228      }
229    }
230    return new ClientConnectionContext(hostAddress, userName, clientVersion, serviceName);
231  }
232
233  private static class ClientConnectionContext {
234    final String hostAddress;
235    final String userName;
236    final String clientVersion;
237    final String serviceName;
238
239    ClientConnectionContext(String hostAddress, String userName, String clientVersion,
240      String serviceName) {
241      this.hostAddress = hostAddress;
242      this.userName = userName;
243      this.clientVersion = clientVersion;
244      this.serviceName = serviceName;
245    }
246
247    public String toString() {
248      return "ClientConnectionContext{hostAddress=" + hostAddress + ", userName=" + userName
249        + ", clientVersion=" + clientVersion + ", serviceName=" + serviceName + "}";
250    }
251  }
252}