001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
import java.io.DataInput;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.security.sasl.SaslException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ExtendedCellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.security.AuthMethod;
import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.security.SecurityInfo;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
import org.apache.hbase.thirdparty.io.netty.util.TimerTask;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.SecurityPreamableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
074
075/**
076 * Base class for ipc connection.
077 */
078@InterfaceAudience.Private
079abstract class RpcConnection {
080
081  private static final Logger LOG = LoggerFactory.getLogger(RpcConnection.class);
082
083  protected final ConnectionId remoteId;
084
085  protected final boolean useSasl;
086
087  protected final Token<? extends TokenIdentifier> token;
088
089  protected final SecurityInfo securityInfo;
090
091  protected final int reloginMaxBackoff; // max pause before relogin on sasl failure
092
093  protected final Codec codec;
094
095  protected final CompressionCodec compressor;
096
097  protected final CellBlockBuilder cellBlockBuilder;
098
099  protected final MetricsConnection metrics;
100  private final Map<String, byte[]> connectionAttributes;
101
102  protected final HashedWheelTimer timeoutTimer;
103
104  protected final Configuration conf;
105
106  protected static String CRYPTO_AES_ENABLED_KEY = "hbase.rpc.crypto.encryption.aes.enabled";
107
108  protected static boolean CRYPTO_AES_ENABLED_DEFAULT = false;
109
110  // the last time we were picked up from connection pool.
111  protected long lastTouched;
112
113  protected SaslClientAuthenticationProvider provider;
114
115  // Record the server principal which we have successfully authenticated with the remote server
116  // this is used to save the extra round trip with server when there are multiple candidate server
117  // principals for a given rpc service, like ClientMetaService.
118  // See HBASE-28321 for more details.
119  private String lastSucceededServerPrincipal;
120
121  protected RpcConnection(Configuration conf, HashedWheelTimer timeoutTimer, ConnectionId remoteId,
122    String clusterId, boolean isSecurityEnabled, Codec codec, CompressionCodec compressor,
123    CellBlockBuilder cellBlockBuilder, MetricsConnection metrics,
124    Map<String, byte[]> connectionAttributes) throws IOException {
125    this.timeoutTimer = timeoutTimer;
126    this.codec = codec;
127    this.compressor = compressor;
128    this.cellBlockBuilder = cellBlockBuilder;
129    this.conf = conf;
130    this.metrics = metrics;
131    this.connectionAttributes = connectionAttributes;
132    User ticket = remoteId.getTicket();
133    this.securityInfo = SecurityInfo.getInfo(remoteId.getServiceName());
134    this.useSasl = isSecurityEnabled;
135
136    // Choose the correct Token and AuthenticationProvider for this client to use
137    SaslClientAuthenticationProviders providers =
138      SaslClientAuthenticationProviders.getInstance(conf);
139    Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> pair;
140    if (useSasl && securityInfo != null) {
141      pair = providers.selectProvider(clusterId, ticket);
142      if (pair == null) {
143        if (LOG.isTraceEnabled()) {
144          LOG.trace("Found no valid authentication method from providers={} with tokens={}",
145            providers.toString(), ticket.getTokens());
146        }
147        throw new RuntimeException("Found no valid authentication method from options");
148      }
149    } else if (!useSasl) {
150      // Hack, while SIMPLE doesn't go via SASL.
151      pair = providers.getSimpleProvider();
152    } else {
153      throw new RuntimeException("Could not compute valid client authentication provider");
154    }
155
156    this.provider = pair.getFirst();
157    this.token = pair.getSecond();
158
159    LOG.debug("Using {} authentication for service={}, sasl={}",
160      provider.getSaslAuthMethod().getName(), remoteId.serviceName, useSasl);
161    reloginMaxBackoff = conf.getInt("hbase.security.relogin.maxbackoff", 5000);
162    this.remoteId = remoteId;
163  }
164
165  protected final void scheduleTimeoutTask(final Call call) {
166    if (call.timeout > 0) {
167      call.timeoutTask = timeoutTimer.newTimeout(new TimerTask() {
168
169        @Override
170        public void run(Timeout timeout) throws Exception {
171          call.setTimeout(new CallTimeoutException(call.toShortString() + ", waitTime="
172            + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + "ms, rpcTimeout="
173            + call.timeout + "ms"));
174          callTimeout(call);
175        }
176      }, call.timeout, TimeUnit.MILLISECONDS);
177    }
178  }
179
180  // will be overridden in tests
181  protected byte[] getConnectionHeaderPreamble() {
182    // Assemble the preamble up in a buffer first and then send it. Writing individual elements,
183    // they are getting sent across piecemeal according to wireshark and then server is messing
184    // up the reading on occasion (the passed in stream is not buffered yet).
185    int rpcHeaderLen = HConstants.RPC_HEADER.length;
186    // Preamble is six bytes -- 'HBas' + VERSION + AUTH_CODE
187    byte[] preamble = new byte[rpcHeaderLen + 2];
188    System.arraycopy(HConstants.RPC_HEADER, 0, preamble, 0, rpcHeaderLen);
189    preamble[rpcHeaderLen] = HConstants.RPC_CURRENT_VERSION;
190    synchronized (this) {
191      preamble[preamble.length - 1] = provider.getSaslAuthMethod().getCode();
192    }
193    return preamble;
194  }
195
196  protected final ConnectionHeader getConnectionHeader() {
197    final ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
198    builder.setServiceName(remoteId.getServiceName());
199    final UserInformation userInfoPB = provider.getUserInfo(remoteId.ticket);
200    if (userInfoPB != null) {
201      builder.setUserInfo(userInfoPB);
202    }
203    if (this.codec != null) {
204      builder.setCellBlockCodecClass(this.codec.getClass().getCanonicalName());
205    }
206    if (this.compressor != null) {
207      builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName());
208    }
209    if (connectionAttributes != null && !connectionAttributes.isEmpty()) {
210      HBaseProtos.NameBytesPair.Builder attributeBuilder = HBaseProtos.NameBytesPair.newBuilder();
211      for (Map.Entry<String, byte[]> attribute : connectionAttributes.entrySet()) {
212        attributeBuilder.setName(attribute.getKey());
213        attributeBuilder.setValue(UnsafeByteOperations.unsafeWrap(attribute.getValue()));
214        builder.addAttribute(attributeBuilder.build());
215      }
216    }
217    builder.setVersionInfo(ProtobufUtil.getVersionInfo());
218    boolean isCryptoAESEnable = conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT);
219    // if Crypto AES enable, setup Cipher transformation
220    if (isCryptoAESEnable) {
221      builder.setRpcCryptoCipherTransformation(
222        conf.get("hbase.rpc.crypto.encryption.aes.cipher.transform", "AES/CTR/NoPadding"));
223    }
224    return builder.build();
225  }
226
227  protected final InetSocketAddress getRemoteInetAddress(MetricsConnection metrics)
228    throws UnknownHostException {
229    if (metrics != null) {
230      metrics.incrNsLookups();
231    }
232    InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
233    if (remoteAddr.isUnresolved()) {
234      if (metrics != null) {
235        metrics.incrNsLookupsFailed();
236      }
237      throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
238    }
239    return remoteAddr;
240  }
241
242  private static boolean useCanonicalHostname(Configuration conf) {
243    return !conf.getBoolean(
244      SecurityConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS,
245      SecurityConstants.DEFAULT_UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS);
246  }
247
248  private static String getHostnameForServerPrincipal(Configuration conf, InetAddress addr) {
249    final String hostname;
250    if (useCanonicalHostname(conf)) {
251      hostname = addr.getCanonicalHostName();
252      if (hostname.equals(addr.getHostAddress())) {
253        LOG.warn("Canonical hostname for SASL principal is the same with IP address: " + hostname
254          + ", " + addr.getHostName() + ". Check DNS configuration or consider "
255          + SecurityConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS + "=true");
256      }
257    } else {
258      hostname = addr.getHostName();
259    }
260
261    return hostname.toLowerCase();
262  }
263
264  private static String getServerPrincipal(Configuration conf, String serverKey, InetAddress server)
265    throws IOException {
266    String hostname = getHostnameForServerPrincipal(conf, server);
267    return SecurityUtil.getServerPrincipal(conf.get(serverKey), hostname);
268  }
269
270  protected final boolean isKerberosAuth() {
271    return provider.getSaslAuthMethod().getCode() == AuthMethod.KERBEROS.code;
272  }
273
274  protected final Set<String> getServerPrincipals() throws IOException {
275    // for authentication method other than kerberos, we do not need to know the server principal
276    if (!isKerberosAuth()) {
277      return Collections.singleton(HConstants.EMPTY_STRING);
278    }
279    // if we have successfully authenticated last time, just return the server principal we use last
280    // time
281    if (lastSucceededServerPrincipal != null) {
282      return Collections.singleton(lastSucceededServerPrincipal);
283    }
284    InetAddress server =
285      new InetSocketAddress(remoteId.address.getHostName(), remoteId.address.getPort())
286        .getAddress();
287    // Even if we have multiple config key in security info, it is still possible that we configured
288    // the same principal for them, so here we use a Set
289    Set<String> serverPrincipals = new TreeSet<>();
290    for (String serverPrincipalKey : securityInfo.getServerPrincipals()) {
291      serverPrincipals.add(getServerPrincipal(conf, serverPrincipalKey, server));
292    }
293    return serverPrincipals;
294  }
295
296  protected final <T> T randomSelect(Collection<T> c) {
297    int select = ThreadLocalRandom.current().nextInt(c.size());
298    int index = 0;
299    for (T t : c) {
300      if (index == select) {
301        return t;
302      }
303      index++;
304    }
305    return null;
306  }
307
308  protected final String chooseServerPrincipal(Set<String> candidates, Call securityPreambleCall)
309    throws SaslException {
310    String principal =
311      ((SecurityPreamableResponse) securityPreambleCall.response).getServerPrincipal();
312    if (!candidates.contains(principal)) {
313      // this means the server returns principal which is not in our candidates, it could be a
314      // malicious server, stop connecting
315      throw new SaslException(remoteId.address + " tells us to use server principal " + principal
316        + " which is not expected, should be one of " + candidates);
317    }
318    return principal;
319  }
320
321  protected final void saslNegotiationDone(String serverPrincipal, boolean succeed) {
322    LOG.debug("sasl negotiation done with serverPrincipal = {}, succeed = {}", serverPrincipal,
323      succeed);
324    if (succeed) {
325      this.lastSucceededServerPrincipal = serverPrincipal;
326    } else {
327      // clear the recorded principal if authentication failed
328      this.lastSucceededServerPrincipal = null;
329    }
330  }
331
332  protected abstract void callTimeout(Call call);
333
334  public ConnectionId remoteId() {
335    return remoteId;
336  }
337
338  public long getLastTouched() {
339    return lastTouched;
340  }
341
342  public void setLastTouched(long lastTouched) {
343    this.lastTouched = lastTouched;
344  }
345
346  /**
347   * Tell the idle connection sweeper whether we could be swept.
348   */
349  public abstract boolean isActive();
350
351  /**
352   * Just close connection. Do not need to remove from connection pool.
353   */
354  public abstract void shutdown();
355
356  public abstract void sendRequest(Call call, HBaseRpcController hrc) throws IOException;
357
358  /**
359   * Does the clean up work after the connection is removed from the connection pool
360   */
361  public abstract void cleanupConnection();
362
363  protected final Call createSecurityPreambleCall(RpcCallback<Call> callback) {
364    return new Call(-1, null, null, null, SecurityPreamableResponse.getDefaultInstance(), 0, 0,
365      Collections.emptyMap(), callback, MetricsConnection.newCallStats());
366  }
367
368  private <T extends InputStream & DataInput> void finishCall(ResponseHeader responseHeader, T in,
369    Call call) throws IOException {
370    Message value;
371    if (call.responseDefaultType != null) {
372      Message.Builder builder = call.responseDefaultType.newBuilderForType();
373      if (!builder.mergeDelimitedFrom(in)) {
374        // The javadoc of mergeDelimitedFrom says returning false means the stream reaches EOF
375        // before reading any bytes out, so here we need to manually finish create the EOFException
376        // and finish the call
377        call.setException(new EOFException("EOF while reading response with type: "
378          + call.responseDefaultType.getClass().getName()));
379        return;
380      }
381      value = builder.build();
382    } else {
383      value = null;
384    }
385    ExtendedCellScanner cellBlockScanner;
386    if (responseHeader.hasCellBlockMeta()) {
387      int size = responseHeader.getCellBlockMeta().getLength();
388      // Maybe we could read directly from the ByteBuf.
389      // The problem here is that we do not know when to release it.
390      byte[] cellBlock = new byte[size];
391      in.readFully(cellBlock);
392      cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock);
393    } else {
394      cellBlockScanner = null;
395    }
396    call.setResponse(value, cellBlockScanner);
397  }
398
399  <T extends InputStream & DataInput> void readResponse(T in, Map<Integer, Call> id2Call,
400    Call preambleCall, Consumer<RemoteException> fatalConnectionErrorConsumer) throws IOException {
401    int totalSize = in.readInt();
402    ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
403    int id = responseHeader.getCallId();
404    if (LOG.isTraceEnabled()) {
405      LOG.trace("got response header " + TextFormat.shortDebugString(responseHeader)
406        + ", totalSize: " + totalSize + " bytes");
407    }
408    RemoteException remoteExc;
409    if (responseHeader.hasException()) {
410      ExceptionResponse exceptionResponse = responseHeader.getException();
411      remoteExc = IPCUtil.createRemoteException(exceptionResponse);
412      if (IPCUtil.isFatalConnectionException(exceptionResponse)) {
413        // Here we will cleanup all calls so do not need to fall back, just return.
414        fatalConnectionErrorConsumer.accept(remoteExc);
415        if (preambleCall != null) {
416          preambleCall.setException(remoteExc);
417        }
418        return;
419      }
420    } else {
421      remoteExc = null;
422    }
423    if (id < 0) {
424      LOG.debug("process preamble call response with response type {}",
425        preambleCall != null
426          ? preambleCall.responseDefaultType.getDescriptorForType().getName()
427          : "null");
428      if (preambleCall == null) {
429        // fall through so later we will skip this response
430        LOG.warn("Got a negative call id {} but there is no preamble call", id);
431      } else {
432        if (remoteExc != null) {
433          preambleCall.setException(remoteExc);
434        } else {
435          finishCall(responseHeader, in, preambleCall);
436        }
437        return;
438      }
439    }
440    Call call = id2Call.remove(id);
441    if (call == null) {
442      // So we got a response for which we have no corresponding 'call' here on the client-side.
443      // We probably timed out waiting, cleaned up all references, and now the server decides
444      // to return a response. There is nothing we can do w/ the response at this stage. Clean
445      // out the wire of the response so its out of the way and we can get other responses on
446      // this connection.
447      if (LOG.isDebugEnabled()) {
448        int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
449        int whatIsLeftToRead = totalSize - readSoFar;
450        LOG.debug("Unknown callId: " + id + ", skipping over this response of " + whatIsLeftToRead
451          + " bytes");
452      }
453      return;
454    }
455    call.callStats.setResponseSizeBytes(totalSize);
456    if (remoteExc != null) {
457      call.setException(remoteExc);
458      return;
459    }
460    try {
461      finishCall(responseHeader, in, call);
462    } catch (IOException e) {
463      // As the call has been removed from id2Call map, if we hit an exception here, the
464      // exceptionCaught method can not help us finish the call, so here we need to catch the
465      // exception and finish it
466      call.setException(e);
467      // throw the exception out, the upper layer should determine whether this is a critical
468      // problem
469      throw e;
470    }
471  }
472}