001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022
023import java.io.BufferedInputStream;
024import java.io.DataInputStream;
025import java.io.DataOutputStream;
026import java.io.IOException;
027import java.net.InetSocketAddress;
028import java.net.Socket;
029import java.util.Arrays;
030import java.util.stream.Stream;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.HBaseConfiguration;
033import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.client.MetricsConnection;
036import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
037import org.apache.hadoop.hbase.security.AuthMethod;
038import org.apache.hadoop.hbase.testclassification.MediumTests;
039import org.apache.hadoop.hbase.testclassification.RPCTests;
040import org.junit.jupiter.api.AfterEach;
041import org.junit.jupiter.api.BeforeEach;
042import org.junit.jupiter.api.Tag;
043import org.junit.jupiter.api.TestTemplate;
044import org.junit.jupiter.params.provider.Arguments;
045
046import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
047
048import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
049import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
050import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos;
051import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto;
052import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
056
057@Tag(RPCTests.TAG)
058@Tag(MediumTests.TAG)
059@HBaseParameterizedTestTemplate(name = "{index}: rpcServerImpl={0}")
060public class TestRpcServerSlowConnectionSetup {
061
062  private RpcServer server;
063
064  private Socket socket;
065  private final Class<? extends RpcServer> rpcServerImpl;
066
067  public TestRpcServerSlowConnectionSetup(Class<? extends RpcServer> rpcServerImpl) {
068    this.rpcServerImpl = rpcServerImpl;
069  }
070
071  public static Stream<Arguments> parameters() {
072    return Arrays.stream(new Object[] { SimpleRpcServer.class, NettyRpcServer.class })
073      .map(Arguments::of);
074  }
075
076  @BeforeEach
077  public void setUp() throws IOException {
078    Configuration conf = HBaseConfiguration.create();
079    conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl.getName());
080    server = RpcServerFactory.createRpcServer(null, "testRpcServer",
081      Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
082      new InetSocketAddress("localhost", 0), conf, new FifoRpcScheduler(conf, 1));
083    server.start();
084    socket = new Socket("localhost", server.getListenerAddress().getPort());
085  }
086
087  @AfterEach
088  public void tearDown() throws IOException {
089    if (socket != null) {
090      socket.close();
091    }
092    if (server != null) {
093      server.stop();
094    }
095  }
096
097  @TestTemplate
098  public void test() throws IOException, InterruptedException {
099    int rpcHeaderLen = HConstants.RPC_HEADER.length;
100    byte[] preamble = new byte[rpcHeaderLen + 2];
101    System.arraycopy(HConstants.RPC_HEADER, 0, preamble, 0, rpcHeaderLen);
102    preamble[rpcHeaderLen] = HConstants.RPC_CURRENT_VERSION;
103    preamble[rpcHeaderLen + 1] = AuthMethod.SIMPLE.code;
104    socket.getOutputStream().write(preamble, 0, rpcHeaderLen + 1);
105    socket.getOutputStream().flush();
106    Thread.sleep(5000);
107    socket.getOutputStream().write(preamble, rpcHeaderLen + 1, 1);
108    socket.getOutputStream().flush();
109
110    ConnectionHeader header = ConnectionHeader.newBuilder()
111      .setServiceName(TestRpcServiceProtos.TestProtobufRpcProto.getDescriptor().getName())
112      .setVersionInfo(ProtobufUtil.getVersionInfo()).build();
113    DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
114    dos.writeInt(header.getSerializedSize());
115    header.writeTo(dos);
116    dos.flush();
117
118    int callId = 10;
119    Call call = new Call(callId, TestProtobufRpcProto.getDescriptor().findMethodByName("ping"),
120      EmptyRequestProto.getDefaultInstance(), null, EmptyResponseProto.getDefaultInstance(), 1000,
121      HConstants.NORMAL_QOS, null, null, MetricsConnection.newCallStats());
122    RequestHeader requestHeader = IPCUtil.buildRequestHeader(call, null);
123    dos.writeInt(IPCUtil.getTotalSizeWhenWrittenDelimited(requestHeader, call.param));
124    requestHeader.writeDelimitedTo(dos);
125    call.param.writeDelimitedTo(dos);
126    dos.flush();
127
128    DataInputStream dis = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
129    int size = dis.readInt();
130    ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(dis);
131    assertEquals(callId, responseHeader.getCallId());
132    EmptyResponseProto.Builder builder = EmptyResponseProto.newBuilder();
133    builder.mergeDelimitedFrom(dis);
134    assertEquals(size, IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader, builder.build()));
135  }
136}