001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 021import org.apache.hbase.thirdparty.io.netty.util.Timeout; 022import org.apache.hbase.thirdparty.io.netty.util.TimerTask; 023 024import java.io.IOException; 025import java.net.UnknownHostException; 026import java.util.concurrent.TimeUnit; 027 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.yetus.audience.InterfaceAudience; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033import org.apache.hadoop.hbase.codec.Codec; 034import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; 035import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 036import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; 037import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; 038import org.apache.hadoop.hbase.security.AuthMethod; 039import org.apache.hadoop.hbase.security.SecurityInfo; 040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 041import org.apache.hadoop.io.Text; 042import org.apache.hadoop.io.compress.CompressionCodec; 043import org.apache.hadoop.security.SecurityUtil; 044import org.apache.hadoop.security.UserGroupInformation; 045import org.apache.hadoop.security.token.Token; 046import org.apache.hadoop.security.token.TokenIdentifier; 047import org.apache.hadoop.security.token.TokenSelector; 048 049/** 050 * Base class for ipc connection. 051 */ 052@InterfaceAudience.Private 053abstract class RpcConnection { 054 055 private static final Logger LOG = LoggerFactory.getLogger(RpcConnection.class); 056 057 protected final ConnectionId remoteId; 058 059 protected final AuthMethod authMethod; 060 061 protected final boolean useSasl; 062 063 protected final Token<? extends TokenIdentifier> token; 064 065 protected final String serverPrincipal; // server's krb5 principal name 066 067 protected final int reloginMaxBackoff; // max pause before relogin on sasl failure 068 069 protected final Codec codec; 070 071 protected final CompressionCodec compressor; 072 073 protected final HashedWheelTimer timeoutTimer; 074 075 protected final Configuration conf; 076 077 protected static String CRYPTO_AES_ENABLED_KEY = "hbase.rpc.crypto.encryption.aes.enabled"; 078 079 protected static boolean CRYPTO_AES_ENABLED_DEFAULT = false; 080 081 // the last time we were picked up from connection pool. 082 protected long lastTouched; 083 084 protected RpcConnection(Configuration conf, HashedWheelTimer timeoutTimer, ConnectionId remoteId, 085 String clusterId, boolean isSecurityEnabled, Codec codec, CompressionCodec compressor) 086 throws IOException { 087 if (remoteId.getAddress().isUnresolved()) { 088 throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName()); 089 } 090 this.timeoutTimer = timeoutTimer; 091 this.codec = codec; 092 this.compressor = compressor; 093 this.conf = conf; 094 095 UserGroupInformation ticket = remoteId.getTicket().getUGI(); 096 SecurityInfo securityInfo = SecurityInfo.getInfo(remoteId.getServiceName()); 097 this.useSasl = isSecurityEnabled; 098 Token<? extends TokenIdentifier> token = null; 099 String serverPrincipal = null; 100 if (useSasl && securityInfo != null) { 101 AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind(); 102 if (tokenKind != null) { 103 TokenSelector<? extends TokenIdentifier> tokenSelector = AbstractRpcClient.TOKEN_HANDLERS 104 .get(tokenKind); 105 if (tokenSelector != null) { 106 token = tokenSelector.selectToken(new Text(clusterId), ticket.getTokens()); 107 } else if (LOG.isDebugEnabled()) { 108 LOG.debug("No token selector found for type " + tokenKind); 109 } 110 } 111 String serverKey = securityInfo.getServerPrincipal(); 112 if (serverKey == null) { 113 throw new IOException("Can't obtain server Kerberos config key from SecurityInfo"); 114 } 115 serverPrincipal = SecurityUtil.getServerPrincipal(conf.get(serverKey), 116 remoteId.address.getAddress().getCanonicalHostName().toLowerCase()); 117 if (LOG.isDebugEnabled()) { 118 LOG.debug("RPC Server Kerberos principal name for service=" + remoteId.getServiceName() 119 + " is " + serverPrincipal); 120 } 121 } 122 this.token = token; 123 this.serverPrincipal = serverPrincipal; 124 if (!useSasl) { 125 authMethod = AuthMethod.SIMPLE; 126 } else if (token != null) { 127 authMethod = AuthMethod.DIGEST; 128 } else { 129 authMethod = AuthMethod.KERBEROS; 130 } 131 132 // Log if debug AND non-default auth, else if trace enabled. 133 // No point logging obvious. 134 if ((LOG.isDebugEnabled() && !authMethod.equals(AuthMethod.SIMPLE)) || 135 LOG.isTraceEnabled()) { 136 // Only log if not default auth. 137 LOG.debug("Use " + authMethod + " authentication for service " + remoteId.serviceName 138 + ", sasl=" + useSasl); 139 } 140 reloginMaxBackoff = conf.getInt("hbase.security.relogin.maxbackoff", 5000); 141 this.remoteId = remoteId; 142 } 143 144 private UserInformation getUserInfo(UserGroupInformation ugi) { 145 if (ugi == null || authMethod == AuthMethod.DIGEST) { 146 // Don't send user for token auth 147 return null; 148 } 149 UserInformation.Builder userInfoPB = UserInformation.newBuilder(); 150 if (authMethod == AuthMethod.KERBEROS) { 151 // Send effective user for Kerberos auth 152 userInfoPB.setEffectiveUser(ugi.getUserName()); 153 } else if (authMethod == AuthMethod.SIMPLE) { 154 // Send both effective user and real user for simple auth 155 userInfoPB.setEffectiveUser(ugi.getUserName()); 156 if (ugi.getRealUser() != null) { 157 userInfoPB.setRealUser(ugi.getRealUser().getUserName()); 158 } 159 } 160 return userInfoPB.build(); 161 } 162 163 protected UserGroupInformation getUGI() { 164 UserGroupInformation ticket = remoteId.getTicket().getUGI(); 165 if (authMethod == AuthMethod.KERBEROS) { 166 if (ticket != null && ticket.getRealUser() != null) { 167 ticket = ticket.getRealUser(); 168 } 169 } 170 return ticket; 171 } 172 173 protected boolean shouldAuthenticateOverKrb() throws IOException { 174 UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); 175 UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); 176 UserGroupInformation realUser = currentUser.getRealUser(); 177 return authMethod == AuthMethod.KERBEROS && loginUser != null && 178 // Make sure user logged in using Kerberos either keytab or TGT 179 loginUser.hasKerberosCredentials() && 180 // relogin only in case it is the login user (e.g. JT) 181 // or superuser (like oozie). 182 (loginUser.equals(currentUser) || loginUser.equals(realUser)); 183 } 184 185 protected void relogin() throws IOException { 186 if (UserGroupInformation.isLoginKeytabBased()) { 187 UserGroupInformation.getLoginUser().reloginFromKeytab(); 188 } else { 189 UserGroupInformation.getLoginUser().reloginFromTicketCache(); 190 } 191 } 192 193 protected void scheduleTimeoutTask(final Call call) { 194 if (call.timeout > 0) { 195 call.timeoutTask = timeoutTimer.newTimeout(new TimerTask() { 196 197 @Override 198 public void run(Timeout timeout) throws Exception { 199 call.setTimeout(new CallTimeoutException("Call id=" + call.id + ", waitTime=" 200 + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout=" 201 + call.timeout)); 202 callTimeout(call); 203 } 204 }, call.timeout, TimeUnit.MILLISECONDS); 205 } 206 } 207 208 protected byte[] getConnectionHeaderPreamble() { 209 // Assemble the preamble up in a buffer first and then send it. Writing individual elements, 210 // they are getting sent across piecemeal according to wireshark and then server is messing 211 // up the reading on occasion (the passed in stream is not buffered yet). 212 213 // Preamble is six bytes -- 'HBas' + VERSION + AUTH_CODE 214 int rpcHeaderLen = HConstants.RPC_HEADER.length; 215 byte[] preamble = new byte[rpcHeaderLen + 2]; 216 System.arraycopy(HConstants.RPC_HEADER, 0, preamble, 0, rpcHeaderLen); 217 preamble[rpcHeaderLen] = HConstants.RPC_CURRENT_VERSION; 218 synchronized (this) { 219 preamble[rpcHeaderLen + 1] = authMethod.code; 220 } 221 return preamble; 222 } 223 224 protected ConnectionHeader getConnectionHeader() { 225 ConnectionHeader.Builder builder = ConnectionHeader.newBuilder(); 226 builder.setServiceName(remoteId.getServiceName()); 227 UserInformation userInfoPB; 228 if ((userInfoPB = getUserInfo(remoteId.ticket.getUGI())) != null) { 229 builder.setUserInfo(userInfoPB); 230 } 231 if (this.codec != null) { 232 builder.setCellBlockCodecClass(this.codec.getClass().getCanonicalName()); 233 } 234 if (this.compressor != null) { 235 builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName()); 236 } 237 builder.setVersionInfo(ProtobufUtil.getVersionInfo()); 238 boolean isCryptoAESEnable = conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT); 239 // if Crypto AES enable, setup Cipher transformation 240 if (isCryptoAESEnable) { 241 builder.setRpcCryptoCipherTransformation( 242 conf.get("hbase.rpc.crypto.encryption.aes.cipher.transform", "AES/CTR/NoPadding")); 243 } 244 return builder.build(); 245 } 246 247 protected abstract void callTimeout(Call call); 248 249 public ConnectionId remoteId() { 250 return remoteId; 251 } 252 253 public long getLastTouched() { 254 return lastTouched; 255 } 256 257 public void setLastTouched(long lastTouched) { 258 this.lastTouched = lastTouched; 259 } 260 261 /** 262 * Tell the idle connection sweeper whether we could be swept. 263 */ 264 public abstract boolean isActive(); 265 266 /** 267 * Just close connection. Do not need to remove from connection pool. 268 */ 269 public abstract void shutdown(); 270 271 public abstract void sendRequest(Call call, HBaseRpcController hrc) throws IOException; 272 273 /** 274 * Does the clean up work after the connection is removed from the connection pool 275 */ 276 public abstract void cleanupConnection(); 277}