001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.DEFAULT_HBASE_SERVER_NETTY_TLS_WRAP_SIZE;
021import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_ENABLED;
022import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT;
023import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_WRAP_SIZE;
024
025import java.io.IOException;
026import java.io.InterruptedIOException;
027import java.net.InetSocketAddress;
028import java.net.SocketAddress;
029import java.security.cert.Certificate;
030import java.security.cert.X509Certificate;
031import java.util.List;
032import java.util.concurrent.CountDownLatch;
033import java.util.concurrent.atomic.AtomicReference;
034import javax.net.ssl.SSLPeerUnverifiedException;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.HBaseInterfaceAudience;
037import org.apache.hadoop.hbase.HBaseServerBase;
038import org.apache.hadoop.hbase.Server;
039import org.apache.hadoop.hbase.exceptions.X509Exception;
040import org.apache.hadoop.hbase.io.FileChangeWatcher;
041import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
042import org.apache.hadoop.hbase.security.HBasePolicyProvider;
043import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
044import org.apache.hadoop.hbase.util.NettyUnsafeUtils;
045import org.apache.hadoop.hbase.util.Pair;
046import org.apache.hadoop.hbase.util.ReflectionUtils;
047import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
048import org.apache.yetus.audience.InterfaceAudience;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap;
053import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
054import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
055import org.apache.hbase.thirdparty.io.netty.buffer.UnpooledByteBufAllocator;
056import org.apache.hbase.thirdparty.io.netty.channel.Channel;
057import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
058import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
059import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
060import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
061import org.apache.hbase.thirdparty.io.netty.channel.ServerChannel;
062import org.apache.hbase.thirdparty.io.netty.channel.WriteBufferWaterMark;
063import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup;
064import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup;
065import org.apache.hbase.thirdparty.io.netty.handler.ssl.OptionalSslHandler;
066import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContext;
067import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslHandler;
068import org.apache.hbase.thirdparty.io.netty.util.concurrent.GlobalEventExecutor;
069
070/**
071 * An RPC server with Netty4 implementation.
072 * @since 2.0.0
073 */
074@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.CONFIG })
075public class NettyRpcServer extends RpcServer {
076  public static final Logger LOG = LoggerFactory.getLogger(NettyRpcServer.class);
077
078  /**
079   * Name of property to change the byte buf allocator for the netty channels. Default is no value,
080   * which causes us to use PooledByteBufAllocator. Valid settings here are "pooled", "unpooled",
081   * and "heap", or, the name of a class implementing ByteBufAllocator.
082   * <p>
083   * "pooled" and "unpooled" may prefer direct memory depending on netty configuration, which is
084   * controlled by platform specific code and documented system properties.
085   * <p>
086   * "heap" will prefer heap arena allocations.
087   */
088  public static final String HBASE_NETTY_ALLOCATOR_KEY = "hbase.netty.rpcserver.allocator";
089  static final String POOLED_ALLOCATOR_TYPE = "pooled";
090  static final String UNPOOLED_ALLOCATOR_TYPE = "unpooled";
091  static final String HEAP_ALLOCATOR_TYPE = "heap";
092
093  /**
094   * Low watermark for pending outbound bytes of a single netty channel. If the high watermark was
095   * exceeded, channel will have setAutoRead to true again. The server will start reading incoming
096   * bytes (requests) from the client channel.
097   */
098  public static final String CHANNEL_WRITABLE_LOW_WATERMARK_KEY =
099    "hbase.server.netty.writable.watermark.low";
100  private static final int CHANNEL_WRITABLE_LOW_WATERMARK_DEFAULT = 0;
101
102  /**
103   * High watermark for pending outbound bytes of a single netty channel. If the number of pending
104   * outbound bytes exceeds this threshold, setAutoRead will be false for the channel. The server
105   * will stop reading incoming requests from the client channel.
106   * <p>
107   * Note: any requests already in the call queue will still be processed.
108   */
109  public static final String CHANNEL_WRITABLE_HIGH_WATERMARK_KEY =
110    "hbase.server.netty.writable.watermark.high";
111  private static final int CHANNEL_WRITABLE_HIGH_WATERMARK_DEFAULT = 0;
112
113  /**
114   * Fatal watermark for pending outbound bytes of a single netty channel. If the number of pending
115   * outbound bytes exceeds this threshold, the connection will be forcibly closed so that memory
116   * can be reclaimed. The client will have to re-establish a new connection and retry any in-flight
117   * requests.
118   * <p>
119   * Note: must be higher than the high watermark, otherwise it's ignored.
120   */
121  public static final String CHANNEL_WRITABLE_FATAL_WATERMARK_KEY =
122    "hbase.server.netty.writable.watermark.fatal";
123  private static final int CHANNEL_WRITABLE_FATAL_WATERMARK_DEFAULT = 0;
124
125  private final InetSocketAddress bindAddress;
126
127  private final CountDownLatch closed = new CountDownLatch(1);
128  private final Channel serverChannel;
129  final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true);
130  private final ByteBufAllocator channelAllocator;
131  private final AtomicReference<SslContext> sslContextForServer = new AtomicReference<>();
132  private final AtomicReference<FileChangeWatcher> keyStoreWatcher = new AtomicReference<>();
133  private final AtomicReference<FileChangeWatcher> trustStoreWatcher = new AtomicReference<>();
134
135  private volatile int writeBufferFatalThreshold;
136  private volatile WriteBufferWaterMark writeBufferWaterMark;
137
138  public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services,
139    InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler,
140    boolean reservoirEnabled) throws IOException {
141    super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
142    this.bindAddress = bindAddress;
143    this.channelAllocator = getChannelAllocator(conf);
144    // Get the event loop group configuration from the server class if available.
145    NettyEventLoopGroupConfig config = null;
146    if (server instanceof HBaseServerBase) {
147      config = ((HBaseServerBase<?>) server).getEventLoopGroupConfig();
148    }
149    if (config == null) {
150      config = new NettyEventLoopGroupConfig(conf, "NettyRpcServer");
151    }
152
153    // call before creating bootstrap below so that the necessary configs can be set
154    configureNettyWatermarks(conf);
155
156    EventLoopGroup eventLoopGroup = config.group();
157    Class<? extends ServerChannel> channelClass = config.serverChannelClass();
158    ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass)
159      .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
160      .childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
161      .childOption(ChannelOption.SO_REUSEADDR, true)
162      .childHandler(new ChannelInitializer<Channel>() {
163        @Override
164        protected void initChannel(Channel ch) throws Exception {
165          ch.config().setWriteBufferWaterMark(writeBufferWaterMark);
166          ch.config().setAllocator(channelAllocator);
167          ChannelPipeline pipeline = ch.pipeline();
168
169          NettyServerRpcConnection conn = createNettyServerRpcConnection(ch);
170
171          if (conf.getBoolean(HBASE_SERVER_NETTY_TLS_ENABLED, false)) {
172            initSSL(pipeline, conn, conf.getBoolean(HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, true));
173          }
174          pipeline
175            .addLast(NettyRpcServerPreambleHandler.DECODER_NAME,
176              NettyRpcServerPreambleHandler.createDecoder())
177            .addLast(new NettyRpcServerPreambleHandler(NettyRpcServer.this, conn))
178            // We need NettyRpcServerResponseEncoder here because NettyRpcServerPreambleHandler may
179            // send RpcResponse to client.
180            .addLast(NettyRpcServerResponseEncoder.NAME, new NettyRpcServerResponseEncoder(metrics))
181            // Add writability handler after the response encoder, so we can abort writes before
182            // they get encoded, if the fatal threshold is exceeded. We pass in suppliers here so
183            // that the handler configs can be live updated via update_config.
184            .addLast(NettyRpcServerChannelWritabilityHandler.NAME,
185              new NettyRpcServerChannelWritabilityHandler(metrics, () -> writeBufferFatalThreshold,
186                () -> isWritabilityBackpressureEnabled()));
187        }
188      });
189    try {
190      serverChannel = bootstrap.bind(this.bindAddress).sync().channel();
191      LOG.info("Bind to {}", serverChannel.localAddress());
192    } catch (InterruptedException e) {
193      throw new InterruptedIOException(e.getMessage());
194    }
195    initReconfigurable(conf);
196    this.scheduler.init(new RpcSchedulerContext(this));
197  }
198
199  @Override
200  public void onConfigurationChange(Configuration newConf) {
201    super.onConfigurationChange(newConf);
202    configureNettyWatermarks(newConf);
203  }
204
205  private void configureNettyWatermarks(Configuration conf) {
206    int watermarkLow =
207      conf.getInt(CHANNEL_WRITABLE_LOW_WATERMARK_KEY, CHANNEL_WRITABLE_LOW_WATERMARK_DEFAULT);
208    int watermarkHigh =
209      conf.getInt(CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, CHANNEL_WRITABLE_HIGH_WATERMARK_DEFAULT);
210    int fatalThreshold =
211      conf.getInt(CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, CHANNEL_WRITABLE_FATAL_WATERMARK_DEFAULT);
212
213    WriteBufferWaterMark oldWaterMark = writeBufferWaterMark;
214    int oldFatalThreshold = writeBufferFatalThreshold;
215
216    boolean disabled = false;
217    if (watermarkHigh == 0 && watermarkLow == 0) {
218      // if both are 0, use the netty default, which we will treat as "disabled".
219      // when disabled, we won't manage autoRead in response to writability changes.
220      writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
221      disabled = true;
222    } else {
223      // netty checks pendingOutboundBytes < watermarkLow. It can never be less than 0, so set to
224      // 1 to avoid confusing behavior.
225      if (watermarkLow == 0) {
226        LOG.warn(
227          "Detected a {} value of 0, which is impossible to achieve "
228            + "due to how netty evaluates these thresholds, setting to 1",
229          CHANNEL_WRITABLE_LOW_WATERMARK_KEY);
230        watermarkLow = 1;
231      }
232
233      // netty validates the watermarks and throws an exception if high < low, fail more gracefully
234      // by disabling the watermarks and warning.
235      if (watermarkHigh <= watermarkLow) {
236        LOG.warn(
237          "Detected {} value {}, lower than {} value {}. This will fail netty validation, "
238            + "so disabling",
239          CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, watermarkHigh, CHANNEL_WRITABLE_LOW_WATERMARK_KEY,
240          watermarkLow);
241        writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
242      } else {
243        writeBufferWaterMark = new WriteBufferWaterMark(watermarkLow, watermarkHigh);
244      }
245
246      // only apply this check when watermark is enabled. this way we give the operator some
247      // flexibility if they want to try enabling fatal threshold without backpressure.
248      if (fatalThreshold > 0 && fatalThreshold <= watermarkHigh) {
249        LOG.warn("Detected a {} value of {}, which is lower than the {} value of {}, ignoring.",
250          CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, fatalThreshold, CHANNEL_WRITABLE_HIGH_WATERMARK_KEY,
251          watermarkHigh);
252        fatalThreshold = 0;
253      }
254    }
255
256    writeBufferFatalThreshold = fatalThreshold;
257
258    if (
259      oldWaterMark != null && (oldWaterMark.low() != writeBufferWaterMark.low()
260        || oldWaterMark.high() != writeBufferWaterMark.high()
261        || oldFatalThreshold != writeBufferFatalThreshold)
262    ) {
263      LOG.info("Updated netty outbound write buffer watermarks: low={}, high={}, fatal={}",
264        disabled ? "disabled" : writeBufferWaterMark.low(),
265        disabled ? "disabled" : writeBufferWaterMark.high(),
266        writeBufferFatalThreshold <= 0 ? "disabled" : writeBufferFatalThreshold);
267    }
268
269    // update any existing channels
270    for (Channel channel : allChannels) {
271      channel.config().setWriteBufferWaterMark(writeBufferWaterMark);
272      // if disabling watermark, set auto read to true in case channel had been exceeding
273      // previous watermark
274      if (disabled) {
275        channel.config().setAutoRead(true);
276      }
277    }
278  }
279
280  public boolean isWritabilityBackpressureEnabled() {
281    return writeBufferWaterMark != WriteBufferWaterMark.DEFAULT;
282  }
283
284  private ByteBufAllocator getChannelAllocator(Configuration conf) throws IOException {
285    final String value = conf.get(HBASE_NETTY_ALLOCATOR_KEY);
286    if (value != null) {
287      if (POOLED_ALLOCATOR_TYPE.equalsIgnoreCase(value)) {
288        LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName());
289        return PooledByteBufAllocator.DEFAULT;
290      } else if (UNPOOLED_ALLOCATOR_TYPE.equalsIgnoreCase(value)) {
291        LOG.info("Using {} for buffer allocation", UnpooledByteBufAllocator.class.getName());
292        return UnpooledByteBufAllocator.DEFAULT;
293      } else if (HEAP_ALLOCATOR_TYPE.equalsIgnoreCase(value)) {
294        LOG.info("Using {} for buffer allocation", HeapByteBufAllocator.class.getName());
295        return HeapByteBufAllocator.DEFAULT;
296      } else {
297        // If the value is none of the recognized labels, treat it as a class name. This allows the
298        // user to supply a custom implementation, perhaps for debugging.
299        try {
300          // ReflectionUtils throws UnsupportedOperationException if there are any problems.
301          ByteBufAllocator alloc = (ByteBufAllocator) ReflectionUtils.newInstance(value);
302          LOG.info("Using {} for buffer allocation", value);
303          return alloc;
304        } catch (ClassCastException | UnsupportedOperationException e) {
305          throw new IOException(e);
306        }
307      }
308    } else {
309      LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName());
310      return PooledByteBufAllocator.DEFAULT;
311    }
312  }
313
314  // will be overridden in tests
315  @InterfaceAudience.Private
316  protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
317    return new NettyServerRpcConnection(NettyRpcServer.this, channel);
318  }
319
320  @Override
321  public synchronized void start() {
322    if (started) {
323      return;
324    }
325    authTokenSecretMgr = createSecretManager();
326    if (authTokenSecretMgr != null) {
327      // Start AuthenticationTokenSecretManager in synchronized way to avoid race conditions in
328      // LeaderElector start. See HBASE-25875
329      synchronized (authTokenSecretMgr) {
330        setSecretManager(authTokenSecretMgr);
331        authTokenSecretMgr.start();
332      }
333    }
334    this.authManager = new ServiceAuthorizationManager();
335    HBasePolicyProvider.init(conf, authManager);
336    scheduler.start();
337    started = true;
338  }
339
340  @Override
341  public synchronized void stop() {
342    if (!running) {
343      return;
344    }
345    LOG.info("Stopping server on " + this.serverChannel.localAddress());
346    FileChangeWatcher ks = keyStoreWatcher.getAndSet(null);
347    if (ks != null) {
348      ks.stop();
349    }
350    FileChangeWatcher ts = trustStoreWatcher.getAndSet(null);
351    if (ts != null) {
352      ts.stop();
353    }
354    if (authTokenSecretMgr != null) {
355      authTokenSecretMgr.stop();
356      authTokenSecretMgr = null;
357    }
358    allChannels.close().awaitUninterruptibly();
359    serverChannel.close();
360    scheduler.stop();
361    closed.countDown();
362    running = false;
363  }
364
365  @Override
366  public synchronized void join() throws InterruptedException {
367    closed.await();
368  }
369
370  @Override
371  public synchronized InetSocketAddress getListenerAddress() {
372    return ((InetSocketAddress) serverChannel.localAddress());
373  }
374
375  @Override
376  public void setSocketSendBufSize(int size) {
377  }
378
379  @Override
380  public int getNumOpenConnections() {
381    return allChannels.size();
382  }
383
384  private void initSSL(ChannelPipeline p, NettyServerRpcConnection conn, boolean supportPlaintext)
385    throws X509Exception, IOException {
386    SslContext nettySslContext = getSslContext();
387
388    /*
389     * our HostnameVerifier gets the host name from SSLEngine, so we have to construct the engine
390     * properly by passing the remote address
391     */
392
393    if (supportPlaintext) {
394      SocketAddress remoteAddress = p.channel().remoteAddress();
395      OptionalSslHandler optionalSslHandler;
396
397      if (remoteAddress instanceof InetSocketAddress) {
398        InetSocketAddress remoteInetAddress = (InetSocketAddress) remoteAddress;
399        optionalSslHandler = new OptionalSslHandlerWithHostPort(nettySslContext,
400          remoteInetAddress.getHostString(), remoteInetAddress.getPort());
401      } else {
402        optionalSslHandler = new OptionalSslHandler(nettySslContext);
403      }
404
405      p.addLast("ssl", optionalSslHandler);
406      LOG.debug("Dual mode SSL handler added for channel: {}", p.channel());
407    } else {
408      SocketAddress remoteAddress = p.channel().remoteAddress();
409      SslHandler sslHandler;
410
411      if (remoteAddress instanceof InetSocketAddress) {
412        InetSocketAddress remoteInetAddress = (InetSocketAddress) remoteAddress;
413        sslHandler = nettySslContext.newHandler(p.channel().alloc(),
414          remoteInetAddress.getHostString(), remoteInetAddress.getPort());
415      } else {
416        sslHandler = nettySslContext.newHandler(p.channel().alloc());
417      }
418
419      sslHandler.setWrapDataSize(
420        conf.getInt(HBASE_SERVER_NETTY_TLS_WRAP_SIZE, DEFAULT_HBASE_SERVER_NETTY_TLS_WRAP_SIZE));
421
422      sslHandler.handshakeFuture()
423        .addListener(future -> sslHandshakeCompleteHandler(conn, sslHandler, remoteAddress));
424
425      p.addLast("ssl", sslHandler);
426      LOG.debug("SSL handler added for channel: {}", p.channel());
427    }
428  }
429
430  static void sslHandshakeCompleteHandler(NettyServerRpcConnection conn, SslHandler sslHandler,
431    SocketAddress remoteAddress) {
432    try {
433      Certificate[] certificates = sslHandler.engine().getSession().getPeerCertificates();
434      if (certificates != null && certificates.length > 0) {
435        X509Certificate[] x509Certificates = new X509Certificate[certificates.length];
436        for (int i = 0; i < x509Certificates.length; i++) {
437          x509Certificates[i] = (X509Certificate) certificates[i];
438        }
439        conn.clientCertificateChain = x509Certificates;
440      } else if (sslHandler.engine().getNeedClientAuth()) {
441        LOG.debug(
442          "Could not get peer certificate on TLS connection from {}, although one is required",
443          remoteAddress);
444      }
445    } catch (SSLPeerUnverifiedException e) {
446      if (sslHandler.engine().getNeedClientAuth()) {
447        LOG.debug(
448          "Could not get peer certificate on TLS connection from {}, although one is required",
449          remoteAddress, e);
450      }
451    } catch (Exception e) {
452      LOG.debug("Unexpected error getting peer certificate for TLS connection from {}",
453        remoteAddress, e);
454    }
455  }
456
457  SslContext getSslContext() throws X509Exception, IOException {
458    SslContext result = sslContextForServer.get();
459    if (result == null) {
460      result = X509Util.createSslContextForServer(conf);
461      if (!sslContextForServer.compareAndSet(null, result)) {
462        // lost the race, another thread already set the value
463        result = sslContextForServer.get();
464      } else if (
465        keyStoreWatcher.get() == null && trustStoreWatcher.get() == null
466          && conf.getBoolean(X509Util.TLS_CERT_RELOAD, false)
467      ) {
468        X509Util.enableCertFileReloading(conf, keyStoreWatcher, trustStoreWatcher,
469          () -> sslContextForServer.set(null));
470      }
471    }
472    return result;
473  }
474
475  public int getWriteBufferFatalThreshold() {
476    return writeBufferFatalThreshold;
477  }
478
479  public Pair<Long, Long> getTotalAndMaxNettyOutboundBytes() {
480    long total = 0;
481    long max = 0;
482    for (Channel channel : allChannels) {
483      long outbound = NettyUnsafeUtils.getTotalPendingOutboundBytes(channel);
484      total += outbound;
485      max = Math.max(max, outbound);
486    }
487    return Pair.newPair(total, max);
488  }
489}