/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.net.InetAddress;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerRpcConnection;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.LossyCounting;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;

/**
 * {@link MetricsUserAggregate} implementation that resolves the active user for each operation and
 * forwards the measurement to that user's {@link MetricsUserSource}. For request counters it also
 * updates a per-client metric keyed by the remote host and the connection details of the current
 * RPC.
 * <p>
 * Every {@code update*} method is a no-op when no active user can be determined.
 */
@InterfaceAudience.Private
public class MetricsUserAggregateImpl implements MetricsUserAggregate {

  /** Provider for mapping principal names to Users */
  private final UserProvider userProvider;

  /** Aggregate source that owns the per-user metric sources. */
  private final MetricsUserAggregateSource source;

  /**
   * Tracks the user sources touched through this aggregate. It is constructed with
   * {@code source::deregister} as its callback (presumably invoked when an entry is dropped by the
   * counter; see {@link LossyCounting}).
   */
  private final LossyCounting<MetricsUserSource> userMetricLossyCounting;

  /**
   * Creates the aggregate, looking up the shared {@link MetricsUserAggregateSource} and
   * instantiating the {@link UserProvider} from the given configuration.
   * @param conf configuration handed to {@link LossyCounting} and {@link UserProvider}
   */
  public MetricsUserAggregateImpl(Configuration conf) {
    source = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
      .getUserAggregate();
    userMetricLossyCounting = new LossyCounting<>("userMetrics", conf, source::deregister);
    this.userProvider = UserProvider.instantiate(conf);
  }

  /**
   * Returns the short name of the active user. If we are in the context of an RPC call, the remote
   * user is used, otherwise the currently logged in user is used.
   * @return the user's short name, or {@code null} when no user could be resolved
   */
  private String getActiveUser() {
    Optional<User> user = RpcServer.getRequestUser();
    if (!user.isPresent()) {
      // for non-rpc handling, fallback to system user
      try {
        // ofNullable rather than of: a provider that yields no current user must result in
        // "no user" (null return below) instead of a NullPointerException.
        user = Optional.ofNullable(userProvider.getCurrent());
      } catch (IOException ignored) {
        // Best effort: when the current user cannot be resolved, fall through and return null
        // so that callers skip the metric update.
      }
    }
    return user.map(User::getShortName).orElse(null);
  }

  @Override
  public MetricsUserAggregateSource getSource() {
    return source;
  }

  /**
   * Records a put for the active user and counts it as a write request for the calling client.
   * @param t value forwarded to {@link MetricsUserSource#updatePut(long)}
   */
  @Override
  public void updatePut(long t) {
    String user = getActiveUser();
    if (user != null) {
      MetricsUserSource userSource = getOrCreateMetricsUser(user);
      userSource.updatePut(t);
      incrementClientWriteMetrics(userSource);
    }
  }

  /**
   * Returns the host name of the remote address of the current RPC call.
   * <p>
   * NOTE(review): {@link InetAddress#getHostName()} may perform a reverse name lookup when the
   * address was not created from a host name; confirm this is acceptable on this path.
   * @return the remote host name, or {@code null} when there is no remote address
   */
  private String getClient() {
    Optional<InetAddress> ipOptional = RpcServer.getRemoteAddress();
    return ipOptional.map(InetAddress::getHostName).orElse(null);
  }

  /**
   * Increments the read-request counter of the calling client under the given user source. Does
   * nothing when the user source is {@code null} or no remote client is known.
   */
  private void incrementClientReadMetrics(MetricsUserSource userSource) {
    if (userSource == null) {
      return;
    }
    String client = getClient();
    if (client == null) {
      return;
    }
    // Build the connection context only once we know it will be used.
    ClientConnectionContext ctx = getClientConnectionContext();
    userSource.getOrCreateMetricsClient(client, ctx.hostAddress, ctx.userName, ctx.clientVersion,
      ctx.serviceName).incrementReadRequest();
  }

  /**
   * Increments the filtered-read-request counter of the calling client under the given user
   * source. Does nothing when the user source is {@code null} or no remote client is known.
   */
  private void incrementFilteredReadRequests(MetricsUserSource userSource) {
    if (userSource == null) {
      return;
    }
    String client = getClient();
    if (client == null) {
      return;
    }
    // Build the connection context only once we know it will be used.
    ClientConnectionContext ctx = getClientConnectionContext();
    userSource.getOrCreateMetricsClient(client, ctx.hostAddress, ctx.userName, ctx.clientVersion,
      ctx.serviceName).incrementFilteredReadRequests();
  }

  /**
   * Increments the write-request counter of the calling client under the given user source. Does
   * nothing when the user source is {@code null} or no remote client is known.
   */
  private void incrementClientWriteMetrics(MetricsUserSource userSource) {
    if (userSource == null) {
      return;
    }
    String client = getClient();
    if (client == null) {
      return;
    }
    // Build the connection context only once we know it will be used.
    ClientConnectionContext ctx = getClientConnectionContext();
    userSource.getOrCreateMetricsClient(client, ctx.hostAddress, ctx.userName, ctx.clientVersion,
      ctx.serviceName).incrementWriteRequest();
  }

  /**
   * Records a delete for the active user and counts it as a write request for the calling client.
   * @param t value forwarded to {@link MetricsUserSource#updateDelete(long)}
   */
  @Override
  public void updateDelete(long t) {
    String user = getActiveUser();
    if (user != null) {
      MetricsUserSource userSource = getOrCreateMetricsUser(user);
      userSource.updateDelete(t);
      incrementClientWriteMetrics(userSource);
    }
  }

  /**
   * Records a get for the active user. No per-client counter is touched here.
   */
  @Override
  public void updateGet(long time, long blockBytesScanned) {
    String user = getActiveUser();
    if (user != null) {
      MetricsUserSource userSource = getOrCreateMetricsUser(user);
      userSource.updateGet(time, blockBytesScanned);
    }
  }

  /**
   * Records an increment for the active user and counts it as a write request for the calling
   * client.
   */
  @Override
  public void updateIncrement(long time, long blockBytesScanned) {
    String user = getActiveUser();
    if (user != null) {
      MetricsUserSource userSource = getOrCreateMetricsUser(user);
      userSource.updateIncrement(time, blockBytesScanned);
      incrementClientWriteMetrics(userSource);
    }
  }

  /**
   * Records an append for the active user and counts it as a write request for the calling client.
   */
  @Override
  public void updateAppend(long time, long blockBytesScanned) {
    String user = getActiveUser();
    if (user != null) {
      MetricsUserSource userSource = getOrCreateMetricsUser(user);
      userSource.updateAppend(time, blockBytesScanned);
      incrementClientWriteMetrics(userSource);
    }
  }

  /**
   * Records a replay for the active user and counts it as a write request for the calling client.
   * @param t value forwarded to {@link MetricsUserSource#updateReplay(long)}
   */
  @Override
  public void updateReplay(long t) {
    String user = getActiveUser();
    if (user != null) {
      MetricsUserSource userSource = getOrCreateMetricsUser(user);
      userSource.updateReplay(t);
      incrementClientWriteMetrics(userSource);
    }
  }

  /**
   * Records a scan for the active user. No per-client counter is touched here.
   */
  @Override
  public void updateScan(long time, long blockBytesScanned) {
    String user = getActiveUser();
    if (user != null) {
      MetricsUserSource userSource = getOrCreateMetricsUser(user);
      userSource.updateScan(time, blockBytesScanned);
    }
  }

  /**
   * Records a check-and-mutate for the active user. No per-client counter is touched here.
   */
  @Override
  public void updateCheckAndMutate(long blockBytesScanned) {
    String user = getActiveUser();
    if (user != null) {
      MetricsUserSource userSource = getOrCreateMetricsUser(user);
      userSource.updateCheckAndMutate(blockBytesScanned);
    }
  }

  /**
   * Counts one filtered read request for the calling client under the active user.
   */
  @Override
  public void updateFilteredReadRequests() {
    String user = getActiveUser();
    if (user != null) {
      MetricsUserSource userSource = getOrCreateMetricsUser(user);
      incrementFilteredReadRequests(userSource);
    }
  }

  /**
   * Counts one read request for the calling client under the active user.
   */
  @Override
  public void updateReadRequestCount() {
    String user = getActiveUser();
    if (user != null) {
      MetricsUserSource userSource = getOrCreateMetricsUser(user);
      incrementClientReadMetrics(userSource);
    }
  }

  /**
   * Looks up (or creates) the metric source for the given user and adds it to the lossy counter.
   * @param user short name of the user
   * @return the user's metric source as returned by the aggregate source
   */
  private MetricsUserSource getOrCreateMetricsUser(String user) {
    MetricsUserSource userSource = source.getOrCreateMetricsUser(user);
    userMetricLossyCounting.add(userSource);
    return userSource;
  }

  /**
   * Collects connection details of the current RPC connection. When there is no current
   * connection, or the connection header lacks a field, the defaults are used: {@code null} for
   * the host address and {@code "Unknown"} for user name, client version and service name.
   */
  private ClientConnectionContext getClientConnectionContext() {
    String hostAddress = null;
    String userName = "Unknown";
    String clientVersion = "Unknown";
    String serviceName = "Unknown";

    Optional<ServerRpcConnection> rpcConnectionOptional = RpcServer.getCurrentServerRpcConnection();
    if (rpcConnectionOptional.isPresent()) {
      ServerRpcConnection rpcConnection = rpcConnectionOptional.get();
      hostAddress = rpcConnection.getHostAddress();
      RPCProtos.ConnectionHeader connectionHeader = rpcConnection.getConnectionHeader();
      if (connectionHeader.hasUserInfo()) {
        RPCProtos.UserInformation userInfoProto = connectionHeader.getUserInfo();
        if (userInfoProto.hasEffectiveUser()) {
          userName = userInfoProto.getEffectiveUser();
        }
      }
      if (connectionHeader.hasVersionInfo()) {
        clientVersion = connectionHeader.getVersionInfo().getVersion();
      }
      if (connectionHeader.hasServiceName()) {
        serviceName = connectionHeader.getServiceName();
      }
    }
    return new ClientConnectionContext(hostAddress, userName, clientVersion, serviceName);
  }

  /** Immutable holder for the connection details passed to the per-client metrics. */
  private static final class ClientConnectionContext {
    final String hostAddress;
    final String userName;
    final String clientVersion;
    final String serviceName;

    ClientConnectionContext(String hostAddress, String userName, String clientVersion,
      String serviceName) {
      this.hostAddress = hostAddress;
      this.userName = userName;
      this.clientVersion = clientVersion;
      this.serviceName = serviceName;
    }

    @Override
    public String toString() {
      return "ClientConnectionContext{hostAddress=" + hostAddress + ", userName=" + userName
        + ", clientVersion=" + clientVersion + ", serviceName=" + serviceName + "}";
    }
  }
}