001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.junit.jupiter.api.Assertions.assertArrayEquals;
021import static org.junit.jupiter.api.Assertions.assertNotNull;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023import static org.mockito.Mockito.mock;
024import static org.mockito.Mockito.when;
025
026import java.io.ByteArrayInputStream;
027import java.net.InetSocketAddress;
028import java.net.SocketAddress;
029import java.nio.charset.StandardCharsets;
030import java.security.cert.Certificate;
031import java.security.cert.CertificateException;
032import java.security.cert.CertificateFactory;
033import java.security.cert.X509Certificate;
034import java.util.Arrays;
035import java.util.stream.Stream;
036import javax.net.ssl.SSLEngine;
037import javax.net.ssl.SSLPeerUnverifiedException;
038import javax.net.ssl.SSLSession;
039import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
040import org.apache.hadoop.hbase.HBaseTestingUtil;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.client.Get;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.Result;
045import org.apache.hadoop.hbase.client.Table;
046import org.apache.hadoop.hbase.client.TableDescriptor;
047import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
048import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
049import org.apache.hadoop.hbase.testclassification.MediumTests;
050import org.apache.hadoop.hbase.testclassification.RPCTests;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.apache.hadoop.hbase.util.LoadTestKVGenerator;
053import org.junit.jupiter.api.AfterEach;
054import org.junit.jupiter.api.BeforeEach;
055import org.junit.jupiter.api.Tag;
056import org.junit.jupiter.api.TestInfo;
057import org.junit.jupiter.api.TestTemplate;
058import org.junit.jupiter.params.provider.Arguments;
059
060import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslHandler;
061
062@Tag(RPCTests.TAG)
063@Tag(MediumTests.TAG)
064@HBaseParameterizedTestTemplate(name = "{index}: allocatorType={0}")
065public class TestNettyRpcServer {
066
067  private static final byte[] FAMILY = Bytes.toBytes("f");
068  private static final byte[] QUALIFIER = Bytes.toBytes("q");
069  private static final int NUM_ROWS = 100;
070  private static final int MIN_LEN = 1000;
071  private static final int MAX_LEN = 1000000;
072  protected static final LoadTestKVGenerator GENERATOR = new LoadTestKVGenerator(MIN_LEN, MAX_LEN);
073  protected static HBaseTestingUtil TEST_UTIL;
074  protected TableName tableName;
075  protected final String allocatorType;
076
077  public TestNettyRpcServer(String allocatorType) {
078    this.allocatorType = allocatorType;
079  }
080
081  public static Stream<Arguments> parameters() {
082    return Arrays
083      .stream(
084        new Object[] { NettyRpcServer.POOLED_ALLOCATOR_TYPE, NettyRpcServer.UNPOOLED_ALLOCATOR_TYPE,
085          NettyRpcServer.HEAP_ALLOCATOR_TYPE, SimpleByteBufAllocator.class.getName() })
086      .map(Arguments::of);
087  }
088
089  @BeforeEach
090  public void setUpTable(TestInfo testInfo) {
091    String sanitizedAllocatorType = allocatorType.replaceAll("[^a-zA-Z0-9_.-]", "_");
092    tableName =
093      TableName.valueOf(testInfo.getTestMethod().get().getName() + "_" + sanitizedAllocatorType);
094  }
095
096  @BeforeEach
097  public void setup() throws Exception {
098    // A subclass may have already created TEST_UTIL and is now upcalling to us
099    if (TEST_UTIL == null) {
100      TEST_UTIL = new HBaseTestingUtil();
101    }
102    TEST_UTIL.getConfiguration().set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY,
103      NettyRpcServer.class.getName());
104    TEST_UTIL.getConfiguration().set(NettyRpcServer.HBASE_NETTY_ALLOCATOR_KEY, allocatorType);
105    TEST_UTIL.startMiniCluster();
106  }
107
108  @AfterEach
109  public void tearDown() throws Exception {
110    TEST_UTIL.shutdownMiniCluster();
111  }
112
113  @TestTemplate
114  public void testNettyRpcServer() throws Exception {
115    doTest(tableName);
116  }
117
118  protected void doTest(TableName tableName) throws Exception {
119    // Splitting just complicates the test scenario, disable it
120    final TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName)
121      .setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName()).build();
122    try (Table table =
123      TEST_UTIL.createTable(desc, new byte[][] { FAMILY }, TEST_UTIL.getConfiguration())) {
124      // put some test data
125      for (int i = 0; i < NUM_ROWS; i++) {
126        final byte[] rowKey = Bytes.toBytes(LoadTestKVGenerator.md5PrefixedKey(i));
127        final byte[] v = GENERATOR.generateRandomSizeValue(rowKey, QUALIFIER);
128        table.put(new Put(rowKey).addColumn(FAMILY, QUALIFIER, v));
129      }
130      // read to verify it.
131      for (int i = 0; i < NUM_ROWS; i++) {
132        final byte[] rowKey = Bytes.toBytes(LoadTestKVGenerator.md5PrefixedKey(i));
133        final Result r = table.get(new Get(rowKey).addColumn(FAMILY, QUALIFIER));
134        assertNotNull(r, "Result was empty");
135        final byte[] v = r.getValue(FAMILY, QUALIFIER);
136        assertNotNull(v, "Result did not contain expected value");
137        assertTrue(LoadTestKVGenerator.verify(v, rowKey, QUALIFIER), "Value was not verified");
138      }
139    }
140  }
141
142  private static final String CERTIFICATE = "-----BEGIN CERTIFICATE-----\n"
143    + "MIIEITCCAwmgAwIBAgIUaLL8vLOhWLCLXVHEJqXJhfmsTB8wDQYJKoZIhvcNAQEL\n"
144    + "BQAwgawxCzAJBgNVBAYTAlVTMRYwFAYDVQQIDA1NYXNzYWNodXNldHRzMRIwEAYD\n"
145    + "VQQHDAlDYW1icmlkZ2UxGDAWBgNVBAoMD25ldHR5IHRlc3QgY2FzZTEYMBYGA1UE\n"
146    + "CwwPbmV0dHkgdGVzdCBjYXNlMRgwFgYDVQQDDA9uZXR0eSB0ZXN0IGNhc2UxIzAh\n"
147    + "BgkqhkiG9w0BCQEWFGNjb25uZWxsQGh1YnNwb3QuY29tMB4XDTI0MDEyMTE5MzMy\n"
148    + "MFoXDTI1MDEyMDE5MzMyMFowgawxCzAJBgNVBAYTAlVTMRYwFAYDVQQIDA1NYXNz\n"
149    + "YWNodXNldHRzMRIwEAYDVQQHDAlDYW1icmlkZ2UxGDAWBgNVBAoMD25ldHR5IHRl\n"
150    + "c3QgY2FzZTEYMBYGA1UECwwPbmV0dHkgdGVzdCBjYXNlMRgwFgYDVQQDDA9uZXR0\n"
151    + "eSB0ZXN0IGNhc2UxIzAhBgkqhkiG9w0BCQEWFGNjb25uZWxsQGh1YnNwb3QuY29t\n"
152    + "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+qzEZpQMjVdLj0siUcG\n"
153    + "y8LIHOW4S+tgHIKFkF865qWq6FVGbROe2Z0f5W6yIamZkdxzptT0iv+8S5okNNeW\n"
154    + "2NbsN/HNJIRtWfxku1Jh1gBqSkAYIjXyq7+20hIaJTzzxqike9M/Lc14EGb33Ja/\n"
155    + "kDPRV3UtiM3Ntf3eALXKbrWptkbgQngCaTgtfg8IkMAEpP270wZ9fW0lDHv3NPPt\n"
156    + "Zt0QSJzWSqWfu+l4ayvcUQYyNJesx9YmTHSJu69lvT4QApoX8FEiHfNCJ28R50CS\n"
157    + "aIgOpCWUvkH7rqx0p9q393uJRS/S6RlLbU30xUN1fNrVmP/XAapfy+R0PSgiUi8o\n"
158    + "EQIDAQABozkwNzAWBgNVHRIEDzANggt3d3cuZm9vLmNvbTAdBgNVHQ4EFgQUl4FD\n"
159    + "Y8jJ/JHJR68YqPsGUjUJuwgwDQYJKoZIhvcNAQELBQADggEBADVzivYz2M0qsWUc\n"
160    + "jXjCHymwTIr+7ud10um53FbYEAfKWsIY8Pp35fKpFzUwc5wVdCnLU86K/YMKRzNB\n"
161    + "zL2Auow3PJFRvXecOv7dWxNlNneLDcwbVrdNRu6nQXmZUgyz0oUKuJbF+JGtI+7W\n"
162    + "kRw7yhBfki+UCSQWeDqvaWzgmA4Us0N8NFq3euAs4xFbMMPMQWrT9Z7DGchCeRiB\n"
163    + "dkQBvh88vbR3v2Saq14W4Wt5rj2++vXWGQSeAQL6nGbOwc3ohW6isNNV0eGQQTmS\n"
164    + "khS2d/JDZq2XL5RGexf3CA6YYzWiTr9YZHNjuobvLH7mVnA2c8n6Zty/UhfnuK1x\n" + "JbkleFk=\n"
165    + "-----END CERTIFICATE-----";
166
167  @TestTemplate
168  public void testHandshakeCompleteHandler()
169    throws SSLPeerUnverifiedException, CertificateException {
170    NettyServerRpcConnection conn = mock(NettyServerRpcConnection.class);
171    SslHandler sslHandler = mock(SslHandler.class);
172    SocketAddress remoteAddress = new InetSocketAddress("localhost", 5555);
173    SSLEngine engine = mock(SSLEngine.class);
174    SSLSession session = mock(SSLSession.class);
175    CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");
176    X509Certificate x509Certificate = (X509Certificate) certificateFactory
177      .generateCertificate(new ByteArrayInputStream(CERTIFICATE.getBytes(StandardCharsets.UTF_8)));
178    Certificate[] certificates = new Certificate[] { x509Certificate };
179
180    when(sslHandler.engine()).thenReturn(engine);
181    when(engine.getSession()).thenReturn(session);
182    when(session.getPeerCertificates()).thenReturn(certificates);
183
184    NettyRpcServer.sslHandshakeCompleteHandler(conn, sslHandler, remoteAddress);
185
186    assertArrayEquals(certificates, conn.clientCertificateChain);
187  }
188
189}