001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.net.InetSocketAddress;
023import java.util.List;
024import java.util.concurrent.CountDownLatch;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.CellScanner;
027import org.apache.hadoop.hbase.HBaseInterfaceAudience;
028import org.apache.hadoop.hbase.Server;
029import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
030import org.apache.hadoop.hbase.regionserver.HRegionServer;
031import org.apache.hadoop.hbase.security.HBasePolicyProvider;
032import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
033import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
034import org.apache.hadoop.hbase.util.Pair;
035import org.apache.hadoop.hbase.util.ReflectionUtils;
036import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
042import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
043import org.apache.hbase.thirdparty.com.google.protobuf.Message;
044import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap;
045import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
046import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
047import org.apache.hbase.thirdparty.io.netty.buffer.UnpooledByteBufAllocator;
048import org.apache.hbase.thirdparty.io.netty.channel.Channel;
049import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
050import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
051import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
052import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
053import org.apache.hbase.thirdparty.io.netty.channel.ServerChannel;
054import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup;
055import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup;
056import org.apache.hbase.thirdparty.io.netty.handler.codec.FixedLengthFrameDecoder;
057import org.apache.hbase.thirdparty.io.netty.util.concurrent.GlobalEventExecutor;
058
059/**
060 * An RPC server with Netty4 implementation.
061 * @since 2.0.0
062 */
063@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.CONFIG })
064public class NettyRpcServer extends RpcServer {
065  public static final Logger LOG = LoggerFactory.getLogger(NettyRpcServer.class);
066
067  /**
068   * Name of property to change the byte buf allocator for the netty channels. Default is no value,
069   * which causes us to use PooledByteBufAllocator. Valid settings here are "pooled", "unpooled",
070   * and "heap", or, the name of a class implementing ByteBufAllocator.
071   * <p>
072   * "pooled" and "unpooled" may prefer direct memory depending on netty configuration, which is
073   * controlled by platform specific code and documented system properties.
074   * <p>
075   * "heap" will prefer heap arena allocations.
076   */
077  public static final String HBASE_NETTY_ALLOCATOR_KEY = "hbase.netty.rpcserver.allocator";
078  static final String POOLED_ALLOCATOR_TYPE = "pooled";
079  static final String UNPOOLED_ALLOCATOR_TYPE = "unpooled";
080  static final String HEAP_ALLOCATOR_TYPE = "heap";
081
082  private final InetSocketAddress bindAddress;
083
084  private final CountDownLatch closed = new CountDownLatch(1);
085  private final Channel serverChannel;
086  final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true);
087  private final ByteBufAllocator channelAllocator;
088
089  public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services,
090    InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler,
091    boolean reservoirEnabled) throws IOException {
092    super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
093    this.bindAddress = bindAddress;
094    this.channelAllocator = getChannelAllocator(conf);
095    // Get the event loop group configuration from the server class if available.
096    NettyEventLoopGroupConfig config = null;
097    if (server instanceof HRegionServer) {
098      config = ((HRegionServer) server).getEventLoopGroupConfig();
099    }
100    if (config == null) {
101      config = new NettyEventLoopGroupConfig(conf, "NettyRpcServer");
102    }
103    EventLoopGroup eventLoopGroup = config.group();
104    Class<? extends ServerChannel> channelClass = config.serverChannelClass();
105    ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass)
106      .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
107      .childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
108      .childOption(ChannelOption.SO_REUSEADDR, true)
109      .childHandler(new ChannelInitializer<Channel>() {
110        @Override
111        protected void initChannel(Channel ch) throws Exception {
112          ch.config().setAllocator(channelAllocator);
113          ChannelPipeline pipeline = ch.pipeline();
114          FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6);
115          preambleDecoder.setSingleDecode(true);
116          pipeline.addLast("preambleDecoder", preambleDecoder);
117          pipeline.addLast("preambleHandler", createNettyRpcServerPreambleHandler());
118          pipeline.addLast("frameDecoder", new NettyRpcFrameDecoder(maxRequestSize));
119          pipeline.addLast("decoder", new NettyRpcServerRequestDecoder(allChannels, metrics));
120          pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics));
121        }
122      });
123    try {
124      serverChannel = bootstrap.bind(this.bindAddress).sync().channel();
125      LOG.info("Bind to {}", serverChannel.localAddress());
126    } catch (InterruptedException e) {
127      throw new InterruptedIOException(e.getMessage());
128    }
129    initReconfigurable(conf);
130    this.scheduler.init(new RpcSchedulerContext(this));
131  }
132
133  private ByteBufAllocator getChannelAllocator(Configuration conf) throws IOException {
134    final String value = conf.get(HBASE_NETTY_ALLOCATOR_KEY);
135    if (value != null) {
136      if (POOLED_ALLOCATOR_TYPE.equalsIgnoreCase(value)) {
137        LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName());
138        return PooledByteBufAllocator.DEFAULT;
139      } else if (UNPOOLED_ALLOCATOR_TYPE.equalsIgnoreCase(value)) {
140        LOG.info("Using {} for buffer allocation", UnpooledByteBufAllocator.class.getName());
141        return UnpooledByteBufAllocator.DEFAULT;
142      } else if (HEAP_ALLOCATOR_TYPE.equalsIgnoreCase(value)) {
143        LOG.info("Using {} for buffer allocation", HeapByteBufAllocator.class.getName());
144        return HeapByteBufAllocator.DEFAULT;
145      } else {
146        // If the value is none of the recognized labels, treat it as a class name. This allows the
147        // user to supply a custom implementation, perhaps for debugging.
148        try {
149          // ReflectionUtils throws UnsupportedOperationException if there are any problems.
150          ByteBufAllocator alloc = (ByteBufAllocator) ReflectionUtils.newInstance(value);
151          LOG.info("Using {} for buffer allocation", value);
152          return alloc;
153        } catch (ClassCastException | UnsupportedOperationException e) {
154          throw new IOException(e);
155        }
156      }
157    } else {
158      LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName());
159      return PooledByteBufAllocator.DEFAULT;
160    }
161  }
162
163  @InterfaceAudience.Private
164  protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() {
165    return new NettyRpcServerPreambleHandler(NettyRpcServer.this);
166  }
167
168  @Override
169  public synchronized void start() {
170    if (started) {
171      return;
172    }
173    authTokenSecretMgr = createSecretManager();
174    if (authTokenSecretMgr != null) {
175      // Start AuthenticationTokenSecretManager in synchronized way to avoid race conditions in
176      // LeaderElector start. See HBASE-25875
177      synchronized (authTokenSecretMgr) {
178        setSecretManager(authTokenSecretMgr);
179        authTokenSecretMgr.start();
180      }
181    }
182    this.authManager = new ServiceAuthorizationManager();
183    HBasePolicyProvider.init(conf, authManager);
184    scheduler.start();
185    started = true;
186  }
187
188  @Override
189  public synchronized void stop() {
190    if (!running) {
191      return;
192    }
193    LOG.info("Stopping server on " + this.serverChannel.localAddress());
194    if (authTokenSecretMgr != null) {
195      authTokenSecretMgr.stop();
196      authTokenSecretMgr = null;
197    }
198    allChannels.close().awaitUninterruptibly();
199    serverChannel.close();
200    scheduler.stop();
201    closed.countDown();
202    running = false;
203  }
204
205  @Override
206  public synchronized void join() throws InterruptedException {
207    closed.await();
208  }
209
210  @Override
211  public synchronized InetSocketAddress getListenerAddress() {
212    return ((InetSocketAddress) serverChannel.localAddress());
213  }
214
215  @Override
216  public void setSocketSendBufSize(int size) {
217  }
218
219  @Override
220  public int getNumOpenConnections() {
221    int channelsCount = allChannels.size();
222    // allChannels also contains the server channel, so exclude that from the count.
223    return channelsCount > 0 ? channelsCount - 1 : channelsCount;
224  }
225
226  @Override
227  public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
228    Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
229    throws IOException {
230    return call(service, md, param, cellScanner, receiveTime, status,
231      EnvironmentEdgeManager.currentTime(), 0);
232  }
233
234  @Override
235  public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
236    Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
237    long startTime, int timeout) throws IOException {
238    NettyServerCall fakeCall = new NettyServerCall(-1, service, md, null, param, cellScanner, null,
239      -1, null, receiveTime, timeout, bbAllocator, cellBlockBuilder, null);
240    return call(fakeCall, status);
241  }
242}