001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.net.InetSocketAddress;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.List;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.DoNotRetryIOException;
027import org.apache.hadoop.hbase.HBaseClassTestRule;
028import org.apache.hadoop.hbase.Server;
029import org.apache.hadoop.hbase.codec.Codec;
030import org.apache.hadoop.hbase.nio.ByteBuff;
031import org.apache.hadoop.hbase.testclassification.RPCTests;
032import org.apache.hadoop.hbase.testclassification.SmallTests;
033import org.apache.hadoop.hbase.util.JVM;
034import org.junit.AfterClass;
035import org.junit.BeforeClass;
036import org.junit.ClassRule;
037import org.junit.experimental.categories.Category;
038import org.junit.runner.RunWith;
039import org.junit.runners.Parameterized;
040import org.junit.runners.Parameterized.Parameter;
041import org.junit.runners.Parameterized.Parameters;
042
043import org.apache.hbase.thirdparty.io.netty.channel.Channel;
044import org.apache.hbase.thirdparty.io.netty.channel.epoll.EpollEventLoopGroup;
045import org.apache.hbase.thirdparty.io.netty.channel.epoll.EpollSocketChannel;
046import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
047import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
048
049@RunWith(Parameterized.class)
050@Category({ RPCTests.class, SmallTests.class })
051public class TestNettyIPC extends AbstractTestIPC {
052
053  @ClassRule
054  public static final HBaseClassTestRule CLASS_RULE =
055      HBaseClassTestRule.forClass(TestNettyIPC.class);
056
057  @Parameters(name = "{index}: EventLoop={0}")
058  public static Collection<Object[]> parameters() {
059    List<Object[]> params = new ArrayList<>();
060    params.add(new Object[] { "nio" });
061    params.add(new Object[] { "perClientNio" });
062    if (JVM.isLinux() && JVM.isAmd64()) {
063      params.add(new Object[] { "epoll" });
064    }
065    return params;
066  }
067
068  @Parameter
069  public String eventLoopType;
070
071  private static NioEventLoopGroup NIO;
072
073  private static EpollEventLoopGroup EPOLL;
074
075  @BeforeClass
076  public static void setUpBeforeClass() {
077    NIO = new NioEventLoopGroup();
078    if (JVM.isLinux() && JVM.isAmd64()) {
079      EPOLL = new EpollEventLoopGroup();
080    }
081  }
082
083  @AfterClass
084  public static void tearDownAfterClass() {
085    if (NIO != null) {
086      NIO.shutdownGracefully();
087    }
088    if (EPOLL != null) {
089      EPOLL.shutdownGracefully();
090    }
091  }
092
093  private void setConf(Configuration conf) {
094    switch (eventLoopType) {
095      case "nio":
096        NettyRpcClientConfigHelper.setEventLoopConfig(conf, NIO, NioSocketChannel.class);
097        break;
098      case "epoll":
099        NettyRpcClientConfigHelper.setEventLoopConfig(conf, EPOLL, EpollSocketChannel.class);
100        break;
101      case "perClientNio":
102        NettyRpcClientConfigHelper.createEventLoopPerClient(conf);
103        break;
104      default:
105        break;
106    }
107  }
108
109  @Override
110  protected RpcServer createRpcServer(Server server, String name,
111      List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
112      Configuration conf, RpcScheduler scheduler) throws IOException {
113    return new NettyRpcServer(server, name, services, bindAddress, conf, scheduler, true);
114  }
115
116  @Override
117  protected NettyRpcClient createRpcClientNoCodec(Configuration conf) {
118    setConf(conf);
119    return new NettyRpcClient(conf) {
120
121      @Override
122      Codec getCodec() {
123        return null;
124      }
125
126    };
127  }
128
129  @Override
130  protected NettyRpcClient createRpcClient(Configuration conf) {
131    setConf(conf);
132    return new NettyRpcClient(conf);
133  }
134
135  @Override
136  protected NettyRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) {
137    setConf(conf);
138    return new NettyRpcClient(conf) {
139
140      @Override
141      boolean isTcpNoDelay() {
142        throw new RuntimeException("Injected fault");
143      }
144    };
145  }
146
147  private static class TestFailingRpcServer extends NettyRpcServer {
148
149    TestFailingRpcServer(Server server, String name,
150        List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
151        Configuration conf, RpcScheduler scheduler) throws IOException {
152      super(server, name, services, bindAddress, conf, scheduler, true);
153    }
154
155    static final class FailingConnection extends NettyServerRpcConnection {
156      private FailingConnection(TestFailingRpcServer rpcServer, Channel channel) {
157        super(rpcServer, channel);
158      }
159
160      @Override
161      public void processRequest(ByteBuff buf) throws IOException, InterruptedException {
162        // this will throw exception after the connection header is read, and an RPC is sent
163        // from client
164        throw new DoNotRetryIOException("Failing for test");
165      }
166    }
167
168    @Override
169    protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() {
170      return new NettyRpcServerPreambleHandler(TestFailingRpcServer.this) {
171        @Override
172        protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
173          return new FailingConnection(TestFailingRpcServer.this, channel);
174        }
175      };
176    }
177  }
178
179  @Override
180  protected RpcServer createTestFailingRpcServer(Server server, String name,
181      List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
182      Configuration conf, RpcScheduler scheduler) throws IOException {
183    return new TestFailingRpcServer(server, name, services, bindAddress, conf, scheduler);
184  }
185}