001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.net.InetSocketAddress;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.List;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.DoNotRetryIOException;
027import org.apache.hadoop.hbase.HBaseClassTestRule;
028import org.apache.hadoop.hbase.Server;
029import org.apache.hadoop.hbase.codec.Codec;
030import org.apache.hadoop.hbase.nio.ByteBuff;
031import org.apache.hadoop.hbase.testclassification.MediumTests;
032import org.apache.hadoop.hbase.testclassification.RPCTests;
033import org.apache.hadoop.hbase.util.JVM;
034import org.junit.AfterClass;
035import org.junit.BeforeClass;
036import org.junit.ClassRule;
037import org.junit.experimental.categories.Category;
038import org.junit.runner.RunWith;
039import org.junit.runners.Parameterized;
040import org.junit.runners.Parameterized.Parameter;
041import org.junit.runners.Parameterized.Parameters;
042import org.apache.hbase.thirdparty.io.netty.channel.Channel;
043import org.apache.hbase.thirdparty.io.netty.channel.epoll.EpollEventLoopGroup;
044import org.apache.hbase.thirdparty.io.netty.channel.epoll.EpollSocketChannel;
045import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
046import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
047
048@RunWith(Parameterized.class)
049@Category({ RPCTests.class, MediumTests.class })
050public class TestNettyIPC extends AbstractTestIPC {
051
052  @ClassRule
053  public static final HBaseClassTestRule CLASS_RULE =
054      HBaseClassTestRule.forClass(TestNettyIPC.class);
055
056  @Parameters(name = "{index}: EventLoop={0}")
057  public static Collection<Object[]> parameters() {
058    List<Object[]> params = new ArrayList<>();
059    params.add(new Object[] { "nio" });
060    params.add(new Object[] { "perClientNio" });
061    if (JVM.isLinux() && JVM.isAmd64()) {
062      params.add(new Object[] { "epoll" });
063    }
064    return params;
065  }
066
067  @Parameter
068  public String eventLoopType;
069
070  private static NioEventLoopGroup NIO;
071
072  private static EpollEventLoopGroup EPOLL;
073
074  @BeforeClass
075  public static void setUpBeforeClass() {
076    NIO = new NioEventLoopGroup();
077    if (JVM.isLinux() && JVM.isAmd64()) {
078      EPOLL = new EpollEventLoopGroup();
079    }
080  }
081
082  @AfterClass
083  public static void tearDownAfterClass() {
084    if (NIO != null) {
085      NIO.shutdownGracefully();
086    }
087    if (EPOLL != null) {
088      EPOLL.shutdownGracefully();
089    }
090  }
091
092  private void setConf(Configuration conf) {
093    switch (eventLoopType) {
094      case "nio":
095        NettyRpcClientConfigHelper.setEventLoopConfig(conf, NIO, NioSocketChannel.class);
096        break;
097      case "epoll":
098        NettyRpcClientConfigHelper.setEventLoopConfig(conf, EPOLL, EpollSocketChannel.class);
099        break;
100      case "perClientNio":
101        NettyRpcClientConfigHelper.createEventLoopPerClient(conf);
102        break;
103      default:
104        break;
105    }
106  }
107
108  @Override
109  protected RpcServer createRpcServer(Server server, String name,
110      List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
111      Configuration conf, RpcScheduler scheduler) throws IOException {
112    return new NettyRpcServer(server, name, services, bindAddress, conf, scheduler, true);
113  }
114
115  @Override
116  protected NettyRpcClient createRpcClientNoCodec(Configuration conf) {
117    setConf(conf);
118    return new NettyRpcClient(conf) {
119
120      @Override
121      Codec getCodec() {
122        return null;
123      }
124
125    };
126  }
127
128  @Override
129  protected NettyRpcClient createRpcClient(Configuration conf) {
130    setConf(conf);
131    return new NettyRpcClient(conf);
132  }
133
134  @Override
135  protected NettyRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) {
136    setConf(conf);
137    return new NettyRpcClient(conf) {
138
139      @Override
140      boolean isTcpNoDelay() {
141        throw new RuntimeException("Injected fault");
142      }
143    };
144  }
145
146  private static class TestFailingRpcServer extends NettyRpcServer {
147
148    TestFailingRpcServer(Server server, String name,
149        List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
150        Configuration conf, RpcScheduler scheduler) throws IOException {
151      super(server, name, services, bindAddress, conf, scheduler, true);
152    }
153
154    static final class FailingConnection extends NettyServerRpcConnection {
155      private FailingConnection(TestFailingRpcServer rpcServer, Channel channel) {
156        super(rpcServer, channel);
157      }
158
159      @Override
160      public void processRequest(ByteBuff buf) throws IOException, InterruptedException {
161        // this will throw exception after the connection header is read, and an RPC is sent
162        // from client
163        throw new DoNotRetryIOException("Failing for test");
164      }
165    }
166
167    @Override
168    protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() {
169      return new NettyRpcServerPreambleHandler(TestFailingRpcServer.this) {
170        @Override
171        protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
172          return new FailingConnection(TestFailingRpcServer.this, channel);
173        }
174      };
175    }
176  }
177
178  @Override
179  protected RpcServer createTestFailingRpcServer(Server server, String name,
180      List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
181      Configuration conf, RpcScheduler scheduler) throws IOException {
182    return new TestFailingRpcServer(server, name, services, bindAddress, conf, scheduler);
183  }
184}