001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.stream.Stream;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
026import org.apache.hadoop.hbase.codec.Codec;
027import org.apache.hadoop.hbase.testclassification.MediumTests;
028import org.apache.hadoop.hbase.testclassification.RPCTests;
029import org.apache.hadoop.hbase.util.JVM;
030import org.junit.jupiter.api.AfterAll;
031import org.junit.jupiter.api.BeforeAll;
032import org.junit.jupiter.api.Tag;
033import org.junit.jupiter.params.provider.Arguments;
034
035import org.apache.hbase.thirdparty.io.netty.channel.epoll.EpollEventLoopGroup;
036import org.apache.hbase.thirdparty.io.netty.channel.epoll.EpollSocketChannel;
037import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
038import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
039
040@Tag(RPCTests.TAG)
041@Tag(MediumTests.TAG)
042@HBaseParameterizedTestTemplate(name = "{index}: rpcServerImpl={0}, EventLoop={1}")
043public class TestNettyIPC extends AbstractTestIPC {
044
045  private static List<String> getEventLoopTypes() {
046    List<String> types = new ArrayList<>();
047    types.add("nio");
048    types.add("perClientNio");
049    if (JVM.isLinux() && JVM.isAmd64()) {
050      types.add("epoll");
051    }
052    return types;
053  }
054
055  public static Stream<Arguments> parameters() {
056    List<Arguments> params = new ArrayList<>();
057    for (String eventLoopType : getEventLoopTypes()) {
058      params.add(Arguments.of(SimpleRpcServer.class, eventLoopType));
059      params.add(Arguments.of(NettyRpcServer.class, eventLoopType));
060    }
061    return params.stream();
062  }
063
064  private String eventLoopType;
065
066  public TestNettyIPC(Class<? extends RpcServer> rpcServerImpl, String eventLoopType) {
067    super(rpcServerImpl);
068    this.eventLoopType = eventLoopType;
069  }
070
071  private static NioEventLoopGroup NIO;
072
073  private static EpollEventLoopGroup EPOLL;
074
075  @BeforeAll
076  public static void setUpBeforeClass() {
077    NIO = new NioEventLoopGroup();
078    if (JVM.isLinux() && JVM.isAmd64()) {
079      EPOLL = new EpollEventLoopGroup();
080    }
081  }
082
083  @AfterAll
084  public static void tearDownAfterClass() {
085    if (NIO != null) {
086      NIO.shutdownGracefully();
087    }
088    if (EPOLL != null) {
089      EPOLL.shutdownGracefully();
090    }
091  }
092
093  private void setConf(Configuration conf) {
094    switch (eventLoopType) {
095      case "nio":
096        NettyRpcClientConfigHelper.setEventLoopConfig(conf, NIO, NioSocketChannel.class);
097        break;
098      case "epoll":
099        NettyRpcClientConfigHelper.setEventLoopConfig(conf, EPOLL, EpollSocketChannel.class);
100        break;
101      case "perClientNio":
102        NettyRpcClientConfigHelper.createEventLoopPerClient(conf);
103        break;
104      default:
105        break;
106    }
107  }
108
109  @Override
110  protected NettyRpcClient createRpcClientNoCodec(Configuration conf) {
111    setConf(conf);
112    return new NettyRpcClient(conf) {
113
114      @Override
115      protected Codec getCodec() {
116        return null;
117      }
118
119    };
120  }
121
122  @Override
123  protected NettyRpcClient createRpcClient(Configuration conf) {
124    setConf(conf);
125    return new NettyRpcClient(conf);
126  }
127
128  @Override
129  protected NettyRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) {
130    setConf(conf);
131    return new NettyRpcClient(conf) {
132
133      @Override
134      protected boolean isTcpNoDelay() {
135        throw new RuntimeException("Injected fault");
136      }
137    };
138  }
139
140  @Override
141  protected AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf) {
142    return new NettyRpcClient(conf) {
143
144      @Override
145      protected NettyRpcConnection createConnection(ConnectionId remoteId) throws IOException {
146        return new BadAuthNettyRpcConnection(this, remoteId);
147      }
148    };
149  }
150}