/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.ByteArrayInputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.stream.Stream;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LoadTestKVGenerator;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.params.provider.Arguments;

import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslHandler;

/**
 * Runs a mini cluster configured to use {@link NettyRpcServer} once per allocator type, writes and
 * reads back a set of rows through it, and separately checks the SSL handshake-complete hook.
 */
@Tag(RPCTests.TAG)
@Tag(MediumTests.TAG)
@HBaseParameterizedTestTemplate(name = "{index}: allocatorType={0}")
public class TestNettyRpcServer {

  private static final byte[] FAMILY = Bytes.toBytes("f");
  private static final byte[] QUALIFIER = Bytes.toBytes("q");
  private static final int NUM_ROWS = 100;
  private static final int MIN_LEN = 1000;
  private static final int MAX_LEN = 1000000;
  protected static final LoadTestKVGenerator GENERATOR = new LoadTestKVGenerator(MIN_LEN, MAX_LEN);
  protected static HBaseTestingUtil TEST_UTIL;
  protected TableName tableName;
  protected final String allocatorType;

  /**
   * @param allocatorType value stored under {@link NettyRpcServer#HBASE_NETTY_ALLOCATOR_KEY} for
   *                      this run
   */
  public TestNettyRpcServer(String allocatorType) {
    this.allocatorType = allocatorType;
  }

  /**
   * Supplies one argument set per allocator type under test.
   * @return a stream with a single-element {@link Arguments} for each allocator type
   */
  public static Stream<Arguments> parameters() {
    Object[] allocatorTypes =
      { NettyRpcServer.POOLED_ALLOCATOR_TYPE, NettyRpcServer.UNPOOLED_ALLOCATOR_TYPE,
        NettyRpcServer.HEAP_ALLOCATOR_TYPE, SimpleByteBufAllocator.class.getName() };
    return Arrays.stream(allocatorTypes).map(Arguments::of);
  }

  /**
   * Derives {@link #tableName} from the running test method's name and the allocator type.
   */
  @BeforeEach
  public void setUpTable(TestInfo testInfo) {
    // Anything outside [a-zA-Z0-9_.-] in the allocator type is replaced with an underscore.
    String allocatorSuffix = allocatorType.replaceAll("[^a-zA-Z0-9_.-]", "_");
    String methodName = testInfo.getTestMethod().get().getName();
    tableName = TableName.valueOf(methodName + "_" + allocatorSuffix);
  }

  /**
   * Points the cluster configuration at {@link NettyRpcServer} with the allocator type under test
   * and starts a mini cluster.
   */
  @BeforeEach
  public void setup() throws Exception {
    // A subclass may have already created TEST_UTIL and is now upcalling to us
    if (TEST_UTIL == null) {
      TEST_UTIL = new HBaseTestingUtil();
    }
    TEST_UTIL.getConfiguration().set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY,
      NettyRpcServer.class.getName());
    TEST_UTIL.getConfiguration().set(NettyRpcServer.HBASE_NETTY_ALLOCATOR_KEY, allocatorType);
    TEST_UTIL.startMiniCluster();
  }

  /**
   * Shuts the mini cluster down after each test invocation.
   */
  @AfterEach
  public void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @TestTemplate
  public void testNettyRpcServer() throws Exception {
    doTest(tableName);
  }

  /**
   * Creates the given table, writes {@code NUM_ROWS} generated values to it and then reads every
   * row back, verifying each value against its row key and qualifier.
   * @param tableName name of the table to create and exercise
   */
  protected void doTest(TableName tableName) throws Exception {
    // Splitting just complicates the test scenario, disable it
    final TableDescriptor descriptor = TableDescriptorBuilder.newBuilder(tableName)
      .setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName()).build();
    try (Table table =
      TEST_UTIL.createTable(descriptor, new byte[][] { FAMILY }, TEST_UTIL.getConfiguration())) {
      writeRows(table);
      verifyRows(table);
    }
  }

  /** Builds the row key for the given row index. */
  private static byte[] rowKeyFor(int index) {
    return Bytes.toBytes(LoadTestKVGenerator.md5PrefixedKey(index));
  }

  /** Puts one generated value per row into the table. */
  private static void writeRows(Table table) throws Exception {
    for (int row = 0; row < NUM_ROWS; row++) {
      final byte[] key = rowKeyFor(row);
      final byte[] value = GENERATOR.generateRandomSizeValue(key, QUALIFIER);
      table.put(new Put(key).addColumn(FAMILY, QUALIFIER, value));
    }
  }

  /** Reads each row back and checks that its value verifies against the row key and qualifier. */
  private static void verifyRows(Table table) throws Exception {
    for (int row = 0; row < NUM_ROWS; row++) {
      final byte[] key = rowKeyFor(row);
      final Result result = table.get(new Get(key).addColumn(FAMILY, QUALIFIER));
      assertNotNull(result, "Result was empty");
      final byte[] value = result.getValue(FAMILY, QUALIFIER);
      assertNotNull(value, "Result did not contain expected value");
      assertTrue(LoadTestKVGenerator.verify(value, key, QUALIFIER), "Value was not verified");
    }
  }

  /** PEM-encoded X.509 certificate returned by the mocked SSL session as the peer certificate. */
  private static final String CERTIFICATE = "-----BEGIN CERTIFICATE-----\n"
    + "MIIEITCCAwmgAwIBAgIUaLL8vLOhWLCLXVHEJqXJhfmsTB8wDQYJKoZIhvcNAQEL\n"
    + "BQAwgawxCzAJBgNVBAYTAlVTMRYwFAYDVQQIDA1NYXNzYWNodXNldHRzMRIwEAYD\n"
    + "VQQHDAlDYW1icmlkZ2UxGDAWBgNVBAoMD25ldHR5IHRlc3QgY2FzZTEYMBYGA1UE\n"
    + "CwwPbmV0dHkgdGVzdCBjYXNlMRgwFgYDVQQDDA9uZXR0eSB0ZXN0IGNhc2UxIzAh\n"
    + "BgkqhkiG9w0BCQEWFGNjb25uZWxsQGh1YnNwb3QuY29tMB4XDTI0MDEyMTE5MzMy\n"
    + "MFoXDTI1MDEyMDE5MzMyMFowgawxCzAJBgNVBAYTAlVTMRYwFAYDVQQIDA1NYXNz\n"
    + "YWNodXNldHRzMRIwEAYDVQQHDAlDYW1icmlkZ2UxGDAWBgNVBAoMD25ldHR5IHRl\n"
    + "c3QgY2FzZTEYMBYGA1UECwwPbmV0dHkgdGVzdCBjYXNlMRgwFgYDVQQDDA9uZXR0\n"
    + "eSB0ZXN0IGNhc2UxIzAhBgkqhkiG9w0BCQEWFGNjb25uZWxsQGh1YnNwb3QuY29t\n"
    + "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+qzEZpQMjVdLj0siUcG\n"
    + "y8LIHOW4S+tgHIKFkF865qWq6FVGbROe2Z0f5W6yIamZkdxzptT0iv+8S5okNNeW\n"
    + "2NbsN/HNJIRtWfxku1Jh1gBqSkAYIjXyq7+20hIaJTzzxqike9M/Lc14EGb33Ja/\n"
    + "kDPRV3UtiM3Ntf3eALXKbrWptkbgQngCaTgtfg8IkMAEpP270wZ9fW0lDHv3NPPt\n"
    + "Zt0QSJzWSqWfu+l4ayvcUQYyNJesx9YmTHSJu69lvT4QApoX8FEiHfNCJ28R50CS\n"
    + "aIgOpCWUvkH7rqx0p9q393uJRS/S6RlLbU30xUN1fNrVmP/XAapfy+R0PSgiUi8o\n"
    + "EQIDAQABozkwNzAWBgNVHRIEDzANggt3d3cuZm9vLmNvbTAdBgNVHQ4EFgQUl4FD\n"
    + "Y8jJ/JHJR68YqPsGUjUJuwgwDQYJKoZIhvcNAQELBQADggEBADVzivYz2M0qsWUc\n"
    + "jXjCHymwTIr+7ud10um53FbYEAfKWsIY8Pp35fKpFzUwc5wVdCnLU86K/YMKRzNB\n"
    + "zL2Auow3PJFRvXecOv7dWxNlNneLDcwbVrdNRu6nQXmZUgyz0oUKuJbF+JGtI+7W\n"
    + "kRw7yhBfki+UCSQWeDqvaWzgmA4Us0N8NFq3euAs4xFbMMPMQWrT9Z7DGchCeRiB\n"
    + "dkQBvh88vbR3v2Saq14W4Wt5rj2++vXWGQSeAQL6nGbOwc3ohW6isNNV0eGQQTmS\n"
    + "khS2d/JDZq2XL5RGexf3CA6YYzWiTr9YZHNjuobvLH7mVnA2c8n6Zty/UhfnuK1x\n"
    + "JbkleFk=\n"
    + "-----END CERTIFICATE-----";

  /** Parses {@link #CERTIFICATE} into a single-element certificate chain. */
  private static Certificate[] parseCertificateChain() throws CertificateException {
    CertificateFactory factory = CertificateFactory.getInstance("X.509");
    byte[] pemBytes = CERTIFICATE.getBytes(StandardCharsets.UTF_8);
    X509Certificate parsed =
      (X509Certificate) factory.generateCertificate(new ByteArrayInputStream(pemBytes));
    return new Certificate[] { parsed };
  }

  /**
   * Checks that {@link NettyRpcServer#sslHandshakeCompleteHandler} leaves the peer certificates
   * reported by the SSL session in the connection's {@code clientCertificateChain}.
   */
  @TestTemplate
  public void testHandshakeCompleteHandler()
    throws SSLPeerUnverifiedException, CertificateException {
    Certificate[] expectedChain = parseCertificateChain();

    // Mock the handler -> engine -> session chain so the session reports the parsed certificate.
    SSLSession sslSession = mock(SSLSession.class);
    SSLEngine sslEngine = mock(SSLEngine.class);
    SslHandler sslHandler = mock(SslHandler.class);
    NettyServerRpcConnection connection = mock(NettyServerRpcConnection.class);
    SocketAddress remoteAddress = new InetSocketAddress("localhost", 5555);

    when(sslHandler.engine()).thenReturn(sslEngine);
    when(sslEngine.getSession()).thenReturn(sslSession);
    when(sslSession.getPeerCertificates()).thenReturn(expectedChain);

    NettyRpcServer.sslHandshakeCompleteHandler(connection, sslHandler, remoteAddress);

    assertArrayEquals(expectedChain, connection.clientCertificateChain);
  }

}