001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
021import static org.junit.Assert.assertEquals;
022
023import java.io.BufferedInputStream;
024import java.io.DataInputStream;
025import java.io.DataOutputStream;
026import java.io.IOException;
027import java.net.InetSocketAddress;
028import java.net.Socket;
029import java.util.Arrays;
030import java.util.List;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HBaseConfiguration;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.client.MetricsConnection;
036import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
037import org.apache.hadoop.hbase.security.AuthMethod;
038import org.apache.hadoop.hbase.testclassification.MediumTests;
039import org.apache.hadoop.hbase.testclassification.RPCTests;
040import org.junit.After;
041import org.junit.Before;
042import org.junit.ClassRule;
043import org.junit.Test;
044import org.junit.experimental.categories.Category;
045import org.junit.runner.RunWith;
046import org.junit.runners.Parameterized;
047import org.junit.runners.Parameterized.Parameter;
048import org.junit.runners.Parameterized.Parameters;
049
050import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
051
052import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
053import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
054import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos;
055import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto;
056import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
057import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
058import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
060
061@RunWith(Parameterized.class)
062@Category({ RPCTests.class, MediumTests.class })
063public class TestRpcServerSlowConnectionSetup {
064
065  @ClassRule
066  public static final HBaseClassTestRule CLASS_RULE =
067    HBaseClassTestRule.forClass(TestRpcServerSlowConnectionSetup.class);
068
069  private RpcServer server;
070
071  private Socket socket;
072
073  @Parameter
074  public Class<? extends RpcServer> rpcServerImpl;
075
076  @Parameters(name = "{index}: rpcServerImpl={0}")
077  public static List<Object[]> params() {
078    return Arrays.asList(new Object[] { SimpleRpcServer.class },
079      new Object[] { NettyRpcServer.class });
080  }
081
082  @Before
083  public void setUp() throws IOException {
084    Configuration conf = HBaseConfiguration.create();
085    conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl.getName());
086    server = RpcServerFactory.createRpcServer(null, "testRpcServer",
087      Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
088      new InetSocketAddress("localhost", 0), conf, new FifoRpcScheduler(conf, 1));
089    server.start();
090    socket = new Socket("localhost", server.getListenerAddress().getPort());
091  }
092
093  @After
094  public void tearDown() throws IOException {
095    if (socket != null) {
096      socket.close();
097    }
098    if (server != null) {
099      server.stop();
100    }
101  }
102
103  @Test
104  public void test() throws IOException, InterruptedException {
105    int rpcHeaderLen = HConstants.RPC_HEADER.length;
106    byte[] preamble = new byte[rpcHeaderLen + 2];
107    System.arraycopy(HConstants.RPC_HEADER, 0, preamble, 0, rpcHeaderLen);
108    preamble[rpcHeaderLen] = HConstants.RPC_CURRENT_VERSION;
109    preamble[rpcHeaderLen + 1] = AuthMethod.SIMPLE.code;
110    socket.getOutputStream().write(preamble, 0, rpcHeaderLen + 1);
111    socket.getOutputStream().flush();
112    Thread.sleep(5000);
113    socket.getOutputStream().write(preamble, rpcHeaderLen + 1, 1);
114    socket.getOutputStream().flush();
115
116    ConnectionHeader header = ConnectionHeader.newBuilder()
117      .setServiceName(TestRpcServiceProtos.TestProtobufRpcProto.getDescriptor().getName())
118      .setVersionInfo(ProtobufUtil.getVersionInfo()).build();
119    DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
120    dos.writeInt(header.getSerializedSize());
121    header.writeTo(dos);
122    dos.flush();
123
124    int callId = 10;
125    Call call = new Call(callId, TestProtobufRpcProto.getDescriptor().findMethodByName("ping"),
126      EmptyRequestProto.getDefaultInstance(), null, EmptyResponseProto.getDefaultInstance(), 1000,
127      HConstants.NORMAL_QOS, null, MetricsConnection.newCallStats());
128    RequestHeader requestHeader = IPCUtil.buildRequestHeader(call, null);
129    dos.writeInt(IPCUtil.getTotalSizeWhenWrittenDelimited(requestHeader, call.param));
130    requestHeader.writeDelimitedTo(dos);
131    call.param.writeDelimitedTo(dos);
132    dos.flush();
133
134    DataInputStream dis = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
135    int size = dis.readInt();
136    ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(dis);
137    assertEquals(callId, responseHeader.getCallId());
138    EmptyResponseProto.Builder builder = EmptyResponseProto.newBuilder();
139    builder.mergeDelimitedFrom(dis);
140    assertEquals(size, IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader, builder.build()));
141  }
142}