001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.DEFAULT_HBASE_SERVER_NETTY_TLS_WRAP_SIZE;
021import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_ENABLED;
022import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT;
023import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_WRAP_SIZE;
024import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.TLS_CONFIG_REVERSE_DNS_LOOKUP_ENABLED;
025
026import java.io.IOException;
027import java.io.InterruptedIOException;
028import java.net.InetSocketAddress;
029import java.net.SocketAddress;
030import java.security.cert.Certificate;
031import java.security.cert.X509Certificate;
032import java.util.List;
033import java.util.concurrent.CountDownLatch;
034import java.util.concurrent.atomic.AtomicReference;
035import javax.net.ssl.SSLPeerUnverifiedException;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.HBaseInterfaceAudience;
038import org.apache.hadoop.hbase.HBaseServerBase;
039import org.apache.hadoop.hbase.Server;
040import org.apache.hadoop.hbase.exceptions.X509Exception;
041import org.apache.hadoop.hbase.io.FileChangeWatcher;
042import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
043import org.apache.hadoop.hbase.security.HBasePolicyProvider;
044import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
045import org.apache.hadoop.hbase.util.NettyUnsafeUtils;
046import org.apache.hadoop.hbase.util.Pair;
047import org.apache.hadoop.hbase.util.ReflectionUtils;
048import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
049import org.apache.yetus.audience.InterfaceAudience;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap;
054import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
055import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
056import org.apache.hbase.thirdparty.io.netty.buffer.UnpooledByteBufAllocator;
057import org.apache.hbase.thirdparty.io.netty.channel.Channel;
058import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
059import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
060import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
061import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
062import org.apache.hbase.thirdparty.io.netty.channel.ServerChannel;
063import org.apache.hbase.thirdparty.io.netty.channel.WriteBufferWaterMark;
064import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup;
065import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup;
066import org.apache.hbase.thirdparty.io.netty.handler.ssl.OptionalSslHandler;
067import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContext;
068import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslHandler;
069import org.apache.hbase.thirdparty.io.netty.util.concurrent.GlobalEventExecutor;
070
071/**
072 * An RPC server with Netty4 implementation.
073 * @since 2.0.0
074 */
075@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.CONFIG })
076public class NettyRpcServer extends RpcServer {
077  public static final Logger LOG = LoggerFactory.getLogger(NettyRpcServer.class);
078
079  /**
080   * Name of property to change the byte buf allocator for the netty channels. Default is no value,
081   * which causes us to use PooledByteBufAllocator. Valid settings here are "pooled", "unpooled",
082   * and "heap", or, the name of a class implementing ByteBufAllocator.
083   * <p>
084   * "pooled" and "unpooled" may prefer direct memory depending on netty configuration, which is
085   * controlled by platform specific code and documented system properties.
086   * <p>
087   * "heap" will prefer heap arena allocations.
088   */
089  public static final String HBASE_NETTY_ALLOCATOR_KEY = "hbase.netty.rpcserver.allocator";
090  static final String POOLED_ALLOCATOR_TYPE = "pooled";
091  static final String UNPOOLED_ALLOCATOR_TYPE = "unpooled";
092  static final String HEAP_ALLOCATOR_TYPE = "heap";
093
  /**
   * Low watermark for pending outbound bytes of a single netty channel. If the high watermark was
   * exceeded, the channel will have autoRead set back to true once the pending outbound bytes drop
   * below this value, and the server will resume reading incoming bytes (requests) from the client
   * channel.
   */
099  public static final String CHANNEL_WRITABLE_LOW_WATERMARK_KEY =
100    "hbase.server.netty.writable.watermark.low";
101  private static final int CHANNEL_WRITABLE_LOW_WATERMARK_DEFAULT = 0;
102
103  /**
104   * High watermark for pending outbound bytes of a single netty channel. If the number of pending
105   * outbound bytes exceeds this threshold, setAutoRead will be false for the channel. The server
106   * will stop reading incoming requests from the client channel.
107   * <p>
108   * Note: any requests already in the call queue will still be processed.
109   */
110  public static final String CHANNEL_WRITABLE_HIGH_WATERMARK_KEY =
111    "hbase.server.netty.writable.watermark.high";
112  private static final int CHANNEL_WRITABLE_HIGH_WATERMARK_DEFAULT = 0;
113
114  /**
115   * Fatal watermark for pending outbound bytes of a single netty channel. If the number of pending
116   * outbound bytes exceeds this threshold, the connection will be forcibly closed so that memory
117   * can be reclaimed. The client will have to re-establish a new connection and retry any in-flight
118   * requests.
119   * <p>
120   * Note: must be higher than the high watermark, otherwise it's ignored.
121   */
122  public static final String CHANNEL_WRITABLE_FATAL_WATERMARK_KEY =
123    "hbase.server.netty.writable.watermark.fatal";
124  private static final int CHANNEL_WRITABLE_FATAL_WATERMARK_DEFAULT = 0;
125
126  private final InetSocketAddress bindAddress;
127
128  private final CountDownLatch closed = new CountDownLatch(1);
129  private final Channel serverChannel;
130  final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true);
131  private final ByteBufAllocator channelAllocator;
132  private final AtomicReference<SslContext> sslContextForServer = new AtomicReference<>();
133  private final AtomicReference<FileChangeWatcher> keyStoreWatcher = new AtomicReference<>();
134  private final AtomicReference<FileChangeWatcher> trustStoreWatcher = new AtomicReference<>();
135
136  private volatile int writeBufferFatalThreshold;
137  private volatile WriteBufferWaterMark writeBufferWaterMark;
138
139  public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services,
140    InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler,
141    boolean reservoirEnabled) throws IOException {
142    super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
143    this.bindAddress = bindAddress;
144    this.channelAllocator = getChannelAllocator(conf);
145    // Get the event loop group configuration from the server class if available.
146    NettyEventLoopGroupConfig config = null;
147    if (server instanceof HBaseServerBase) {
148      config = ((HBaseServerBase<?>) server).getEventLoopGroupConfig();
149    }
150    if (config == null) {
151      config = new NettyEventLoopGroupConfig(conf, "NettyRpcServer");
152    }
153
154    // call before creating bootstrap below so that the necessary configs can be set
155    configureNettyWatermarks(conf);
156
157    EventLoopGroup eventLoopGroup = config.group();
158    Class<? extends ServerChannel> channelClass = config.serverChannelClass();
159    ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass)
160      .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
161      .childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
162      .childOption(ChannelOption.SO_REUSEADDR, true)
163      .childHandler(new ChannelInitializer<Channel>() {
164        @Override
165        protected void initChannel(Channel ch) throws Exception {
166          ch.config().setWriteBufferWaterMark(writeBufferWaterMark);
167          ch.config().setAllocator(channelAllocator);
168          ChannelPipeline pipeline = ch.pipeline();
169
170          NettyServerRpcConnection conn = createNettyServerRpcConnection(ch);
171
172          if (conf.getBoolean(HBASE_SERVER_NETTY_TLS_ENABLED, false)) {
173            initSSL(pipeline, conn, conf.getBoolean(HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, true));
174          }
175          pipeline
176            .addLast(NettyRpcServerPreambleHandler.DECODER_NAME,
177              NettyRpcServerPreambleHandler.createDecoder())
178            .addLast(new NettyRpcServerPreambleHandler(NettyRpcServer.this, conn))
179            // We need NettyRpcServerResponseEncoder here because NettyRpcServerPreambleHandler may
180            // send RpcResponse to client.
181            .addLast(NettyRpcServerResponseEncoder.NAME, new NettyRpcServerResponseEncoder(metrics))
182            // Add writability handler after the response encoder, so we can abort writes before
183            // they get encoded, if the fatal threshold is exceeded. We pass in suppliers here so
184            // that the handler configs can be live updated via update_config.
185            .addLast(NettyRpcServerChannelWritabilityHandler.NAME,
186              new NettyRpcServerChannelWritabilityHandler(metrics, () -> writeBufferFatalThreshold,
187                () -> isWritabilityBackpressureEnabled()));
188        }
189      });
190    try {
191      serverChannel = bootstrap.bind(this.bindAddress).sync().channel();
192      LOG.info("Bind to {}", serverChannel.localAddress());
193    } catch (InterruptedException e) {
194      throw new InterruptedIOException(e.getMessage());
195    }
196    initReconfigurable(conf);
197    this.scheduler.init(new RpcSchedulerContext(this));
198  }
199
200  @Override
201  public void onConfigurationChange(Configuration newConf) {
202    super.onConfigurationChange(newConf);
203    configureNettyWatermarks(newConf);
204  }
205
206  private void configureNettyWatermarks(Configuration conf) {
207    int watermarkLow =
208      conf.getInt(CHANNEL_WRITABLE_LOW_WATERMARK_KEY, CHANNEL_WRITABLE_LOW_WATERMARK_DEFAULT);
209    int watermarkHigh =
210      conf.getInt(CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, CHANNEL_WRITABLE_HIGH_WATERMARK_DEFAULT);
211    int fatalThreshold =
212      conf.getInt(CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, CHANNEL_WRITABLE_FATAL_WATERMARK_DEFAULT);
213
214    WriteBufferWaterMark oldWaterMark = writeBufferWaterMark;
215    int oldFatalThreshold = writeBufferFatalThreshold;
216
217    boolean disabled = false;
218    if (watermarkHigh == 0 && watermarkLow == 0) {
219      // if both are 0, use the netty default, which we will treat as "disabled".
220      // when disabled, we won't manage autoRead in response to writability changes.
221      writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
222      disabled = true;
223    } else {
224      // netty checks pendingOutboundBytes < watermarkLow. It can never be less than 0, so set to
225      // 1 to avoid confusing behavior.
226      if (watermarkLow == 0) {
227        LOG.warn(
228          "Detected a {} value of 0, which is impossible to achieve "
229            + "due to how netty evaluates these thresholds, setting to 1",
230          CHANNEL_WRITABLE_LOW_WATERMARK_KEY);
231        watermarkLow = 1;
232      }
233
234      // netty validates the watermarks and throws an exception if high < low, fail more gracefully
235      // by disabling the watermarks and warning.
236      if (watermarkHigh <= watermarkLow) {
237        LOG.warn(
238          "Detected {} value {}, lower than {} value {}. This will fail netty validation, "
239            + "so disabling",
240          CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, watermarkHigh, CHANNEL_WRITABLE_LOW_WATERMARK_KEY,
241          watermarkLow);
242        writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
243      } else {
244        writeBufferWaterMark = new WriteBufferWaterMark(watermarkLow, watermarkHigh);
245      }
246
247      // only apply this check when watermark is enabled. this way we give the operator some
248      // flexibility if they want to try enabling fatal threshold without backpressure.
249      if (fatalThreshold > 0 && fatalThreshold <= watermarkHigh) {
250        LOG.warn("Detected a {} value of {}, which is lower than the {} value of {}, ignoring.",
251          CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, fatalThreshold, CHANNEL_WRITABLE_HIGH_WATERMARK_KEY,
252          watermarkHigh);
253        fatalThreshold = 0;
254      }
255    }
256
257    writeBufferFatalThreshold = fatalThreshold;
258
259    if (
260      oldWaterMark != null && (oldWaterMark.low() != writeBufferWaterMark.low()
261        || oldWaterMark.high() != writeBufferWaterMark.high()
262        || oldFatalThreshold != writeBufferFatalThreshold)
263    ) {
264      LOG.info("Updated netty outbound write buffer watermarks: low={}, high={}, fatal={}",
265        disabled ? "disabled" : writeBufferWaterMark.low(),
266        disabled ? "disabled" : writeBufferWaterMark.high(),
267        writeBufferFatalThreshold <= 0 ? "disabled" : writeBufferFatalThreshold);
268    }
269
270    // update any existing channels
271    for (Channel channel : allChannels) {
272      channel.config().setWriteBufferWaterMark(writeBufferWaterMark);
273      // if disabling watermark, set auto read to true in case channel had been exceeding
274      // previous watermark
275      if (disabled) {
276        channel.config().setAutoRead(true);
277      }
278    }
279  }
280
281  public boolean isWritabilityBackpressureEnabled() {
282    return writeBufferWaterMark != WriteBufferWaterMark.DEFAULT;
283  }
284
285  private ByteBufAllocator getChannelAllocator(Configuration conf) throws IOException {
286    final String value = conf.get(HBASE_NETTY_ALLOCATOR_KEY);
287    if (value != null) {
288      if (POOLED_ALLOCATOR_TYPE.equalsIgnoreCase(value)) {
289        LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName());
290        return PooledByteBufAllocator.DEFAULT;
291      } else if (UNPOOLED_ALLOCATOR_TYPE.equalsIgnoreCase(value)) {
292        LOG.info("Using {} for buffer allocation", UnpooledByteBufAllocator.class.getName());
293        return UnpooledByteBufAllocator.DEFAULT;
294      } else if (HEAP_ALLOCATOR_TYPE.equalsIgnoreCase(value)) {
295        LOG.info("Using {} for buffer allocation", HeapByteBufAllocator.class.getName());
296        return HeapByteBufAllocator.DEFAULT;
297      } else {
298        // If the value is none of the recognized labels, treat it as a class name. This allows the
299        // user to supply a custom implementation, perhaps for debugging.
300        try {
301          // ReflectionUtils throws UnsupportedOperationException if there are any problems.
302          ByteBufAllocator alloc = (ByteBufAllocator) ReflectionUtils.newInstance(value);
303          LOG.info("Using {} for buffer allocation", value);
304          return alloc;
305        } catch (ClassCastException | UnsupportedOperationException e) {
306          throw new IOException(e);
307        }
308      }
309    } else {
310      LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName());
311      return PooledByteBufAllocator.DEFAULT;
312    }
313  }
314
315  // will be overridden in tests
316  @InterfaceAudience.Private
317  protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
318    return new NettyServerRpcConnection(NettyRpcServer.this, channel);
319  }
320
321  @Override
322  public synchronized void start() {
323    if (started) {
324      return;
325    }
326    authTokenSecretMgr = createSecretManager();
327    if (authTokenSecretMgr != null) {
328      // Start AuthenticationTokenSecretManager in synchronized way to avoid race conditions in
329      // LeaderElector start. See HBASE-25875
330      synchronized (authTokenSecretMgr) {
331        setSecretManager(authTokenSecretMgr);
332        authTokenSecretMgr.start();
333      }
334    }
335    this.authManager = new ServiceAuthorizationManager();
336    HBasePolicyProvider.init(conf, authManager);
337    scheduler.start();
338    started = true;
339  }
340
341  @Override
342  public synchronized void stop() {
343    if (!running) {
344      return;
345    }
346    LOG.info("Stopping server on " + this.serverChannel.localAddress());
347    FileChangeWatcher ks = keyStoreWatcher.getAndSet(null);
348    if (ks != null) {
349      ks.stop();
350    }
351    FileChangeWatcher ts = trustStoreWatcher.getAndSet(null);
352    if (ts != null) {
353      ts.stop();
354    }
355    if (authTokenSecretMgr != null) {
356      authTokenSecretMgr.stop();
357      authTokenSecretMgr = null;
358    }
359    allChannels.close().awaitUninterruptibly();
360    serverChannel.close();
361    scheduler.stop();
362    closed.countDown();
363    running = false;
364  }
365
366  @Override
367  public synchronized void join() throws InterruptedException {
368    closed.await();
369  }
370
371  @Override
372  public synchronized InetSocketAddress getListenerAddress() {
373    return ((InetSocketAddress) serverChannel.localAddress());
374  }
375
376  @Override
377  public void setSocketSendBufSize(int size) {
378  }
379
380  @Override
381  public int getNumOpenConnections() {
382    return allChannels.size();
383  }
384
385  private void initSSL(ChannelPipeline p, NettyServerRpcConnection conn, boolean supportPlaintext)
386    throws X509Exception, IOException {
387    SslContext nettySslContext = getSslContext();
388
389    if (supportPlaintext) {
390      p.addLast("ssl", new OptionalSslHandler(nettySslContext));
391      LOG.debug("Dual mode SSL handler added for channel: {}", p.channel());
392    } else {
393      SocketAddress remoteAddress = p.channel().remoteAddress();
394      SslHandler sslHandler;
395
396      if (remoteAddress instanceof InetSocketAddress) {
397        InetSocketAddress remoteInetAddress = (InetSocketAddress) remoteAddress;
398        String host;
399
400        if (conf.getBoolean(TLS_CONFIG_REVERSE_DNS_LOOKUP_ENABLED, true)) {
401          host = remoteInetAddress.getHostName();
402        } else {
403          host = remoteInetAddress.getHostString();
404        }
405
406        int port = remoteInetAddress.getPort();
407
408        /*
409         * our HostnameVerifier gets the host name from SSLEngine, so we have to construct the
410         * engine properly by passing the remote address
411         */
412        sslHandler = nettySslContext.newHandler(p.channel().alloc(), host, port);
413      } else {
414        sslHandler = nettySslContext.newHandler(p.channel().alloc());
415      }
416
417      sslHandler.setWrapDataSize(
418        conf.getInt(HBASE_SERVER_NETTY_TLS_WRAP_SIZE, DEFAULT_HBASE_SERVER_NETTY_TLS_WRAP_SIZE));
419
420      sslHandler.handshakeFuture()
421        .addListener(future -> sslHandshakeCompleteHandler(conn, sslHandler, remoteAddress));
422
423      p.addLast("ssl", sslHandler);
424      LOG.debug("SSL handler added for channel: {}", p.channel());
425    }
426  }
427
428  static void sslHandshakeCompleteHandler(NettyServerRpcConnection conn, SslHandler sslHandler,
429    SocketAddress remoteAddress) {
430    try {
431      Certificate[] certificates = sslHandler.engine().getSession().getPeerCertificates();
432      if (certificates != null && certificates.length > 0) {
433        X509Certificate[] x509Certificates = new X509Certificate[certificates.length];
434        for (int i = 0; i < x509Certificates.length; i++) {
435          x509Certificates[i] = (X509Certificate) certificates[i];
436        }
437        conn.clientCertificateChain = x509Certificates;
438      } else if (sslHandler.engine().getNeedClientAuth()) {
439        LOG.debug(
440          "Could not get peer certificate on TLS connection from {}, although one is required",
441          remoteAddress);
442      }
443    } catch (SSLPeerUnverifiedException e) {
444      if (sslHandler.engine().getNeedClientAuth()) {
445        LOG.debug(
446          "Could not get peer certificate on TLS connection from {}, although one is required",
447          remoteAddress, e);
448      }
449    } catch (Exception e) {
450      LOG.debug("Unexpected error getting peer certificate for TLS connection from {}",
451        remoteAddress, e);
452    }
453  }
454
455  SslContext getSslContext() throws X509Exception, IOException {
456    SslContext result = sslContextForServer.get();
457    if (result == null) {
458      result = X509Util.createSslContextForServer(conf);
459      if (!sslContextForServer.compareAndSet(null, result)) {
460        // lost the race, another thread already set the value
461        result = sslContextForServer.get();
462      } else if (
463        keyStoreWatcher.get() == null && trustStoreWatcher.get() == null
464          && conf.getBoolean(X509Util.TLS_CERT_RELOAD, false)
465      ) {
466        X509Util.enableCertFileReloading(conf, keyStoreWatcher, trustStoreWatcher,
467          () -> sslContextForServer.set(null));
468      }
469    }
470    return result;
471  }
472
473  public int getWriteBufferFatalThreshold() {
474    return writeBufferFatalThreshold;
475  }
476
477  public Pair<Long, Long> getTotalAndMaxNettyOutboundBytes() {
478    long total = 0;
479    long max = 0;
480    for (Channel channel : allChannels) {
481      long outbound = NettyUnsafeUtils.getTotalPendingOutboundBytes(channel);
482      total += outbound;
483      max = Math.max(max, outbound);
484    }
485    return Pair.newPair(total, max);
486  }
487}