001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.security; 019 020import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; 021import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertNull; 024import static org.mockito.Mockito.mock; 025import static org.mockito.Mockito.when; 026 027import java.io.File; 028import java.io.IOException; 029import java.net.InetSocketAddress; 030import java.nio.file.Path; 031import java.nio.file.Paths; 032import java.security.GeneralSecurityException; 033import java.security.Security; 034import java.time.Duration; 035import java.util.ArrayList; 036import java.util.List; 037import java.util.Objects; 038import java.util.concurrent.CountDownLatch; 039import java.util.concurrent.TimeUnit; 040import org.apache.commons.io.FileUtils; 041import org.apache.hadoop.conf.Configuration; 042import org.apache.hadoop.hbase.HBaseClassTestRule; 043import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 044import org.apache.hadoop.hbase.HBaseConfiguration; 045import org.apache.hadoop.hbase.HBaseServerBase; 046import org.apache.hadoop.hbase.ServerName; 047import org.apache.hadoop.hbase.io.FileChangeWatcher; 048import org.apache.hadoop.hbase.io.crypto.tls.KeyStoreFileType; 049import org.apache.hadoop.hbase.io.crypto.tls.X509KeyType; 050import org.apache.hadoop.hbase.io.crypto.tls.X509TestContext; 051import org.apache.hadoop.hbase.io.crypto.tls.X509TestContextProvider; 052import org.apache.hadoop.hbase.io.crypto.tls.X509Util; 053import org.apache.hadoop.hbase.ipc.AbstractRpcClient; 054import org.apache.hadoop.hbase.ipc.FifoRpcScheduler; 055import org.apache.hadoop.hbase.ipc.HBaseRpcController; 056import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl; 057import org.apache.hadoop.hbase.ipc.NettyRpcClient; 058import org.apache.hadoop.hbase.ipc.NettyRpcServer; 059import org.apache.hadoop.hbase.ipc.RpcScheduler; 060import org.apache.hadoop.hbase.ipc.RpcServer; 061import org.apache.hadoop.hbase.net.Address; 062import org.apache.hadoop.hbase.testclassification.MediumTests; 063import org.apache.hadoop.hbase.testclassification.RPCTests; 064import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig; 065import org.bouncycastle.jce.provider.BouncyCastleProvider; 066import org.bouncycastle.operator.OperatorCreationException; 067import org.junit.After; 068import org.junit.AfterClass; 069import org.junit.Before; 070import org.junit.BeforeClass; 071import org.junit.ClassRule; 072import org.junit.Test; 073import org.junit.experimental.categories.Category; 074import org.junit.runner.RunWith; 075import org.junit.runners.Parameterized; 076import org.slf4j.Logger; 077import org.slf4j.LoggerFactory; 078 079import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 080import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 081 082import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos; 083import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos; 084 085@RunWith(Parameterized.class) 086@Category({ RPCTests.class, MediumTests.class }) 087public class TestNettyTLSIPCFileWatcher { 088 089 @ClassRule 090 public static final HBaseClassTestRule CLASS_RULE = 091 HBaseClassTestRule.forClass(TestNettyTLSIPCFileWatcher.class); 092 093 private static final Logger LOG = LoggerFactory.getLogger(TestNettyTLSIPCFileWatcher.class); 094 095 private static final Configuration CONF = HBaseConfiguration.create(); 096 private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil(CONF); 097 private static HBaseServerBase<?> SERVER; 098 private static X509TestContextProvider PROVIDER; 099 private static NettyEventLoopGroupConfig EVENT_LOOP_GROUP_CONFIG; 100 101 private X509TestContext x509TestContext; 102 103 @Parameterized.Parameter(0) 104 public X509KeyType keyType; 105 106 @Parameterized.Parameter(1) 107 public KeyStoreFileType storeFileType; 108 109 @Parameterized.Parameters(name = "{index}: keyType={0}, storeFileType={1}") 110 public static List<Object[]> data() { 111 List<Object[]> params = new ArrayList<>(); 112 for (X509KeyType caKeyType : X509KeyType.values()) { 113 for (KeyStoreFileType ks : KeyStoreFileType.values()) { 114 params.add(new Object[] { caKeyType, ks }); 115 } 116 } 117 return params; 118 } 119 120 @BeforeClass 121 public static void setUpBeforeClass() throws IOException { 122 Security.addProvider(new BouncyCastleProvider()); 123 File dir = 124 new File(UTIL.getDataTestDir(TestNettyTLSIPCFileWatcher.class.getSimpleName()).toString()) 125 .getCanonicalFile(); 126 FileUtils.forceMkdir(dir); 127 // server must enable tls 128 CONF.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_ENABLED, true); 129 PROVIDER = new X509TestContextProvider(CONF, dir); 130 EVENT_LOOP_GROUP_CONFIG = 131 NettyEventLoopGroupConfig.setup(CONF, TestNettyTLSIPCFileWatcher.class.getSimpleName()); 132 SERVER = mock(HBaseServerBase.class); 133 when(SERVER.getEventLoopGroupConfig()).thenReturn(EVENT_LOOP_GROUP_CONFIG); 134 } 135 136 @AfterClass 137 public static void tearDownAfterClass() throws InterruptedException { 138 Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME); 139 EVENT_LOOP_GROUP_CONFIG.group().shutdownGracefully().sync(); 140 UTIL.cleanupTestDir(); 141 } 142 143 @Before 144 public void setUp() throws IOException { 145 x509TestContext = PROVIDER.get(keyType, keyType, "keyPa$$word".toCharArray()); 146 x509TestContext.setConfigurations(storeFileType, storeFileType); 147 CONF.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, false); 148 CONF.setBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, true); 149 CONF.setBoolean(X509Util.TLS_CERT_RELOAD, true); 150 CONF.setLong(X509Util.HBASE_TLS_FILEPOLL_INTERVAL_MILLIS, 10); 151 } 152 153 @After 154 public void tearDown() { 155 x509TestContext.clearConfigurations(); 156 x509TestContext.getConf().unset(X509Util.TLS_CONFIG_OCSP); 157 x509TestContext.getConf().unset(X509Util.TLS_CONFIG_CLR); 158 x509TestContext.getConf().unset(X509Util.TLS_CONFIG_PROTOCOL); 159 x509TestContext.getConf().unset(X509Util.HBASE_TLS_FILEPOLL_INTERVAL_MILLIS); 160 System.clearProperty("com.sun.net.ssl.checkRevocation"); 161 System.clearProperty("com.sun.security.enableCRLDP"); 162 Security.setProperty("ocsp.enable", Boolean.FALSE.toString()); 163 Security.setProperty("com.sun.security.enableCRLDP", Boolean.FALSE.toString()); 164 } 165 166 @Test 167 public void testReplaceServerKeystore() throws IOException, ServiceException, 168 GeneralSecurityException, OperatorCreationException, InterruptedException { 169 Configuration clientConf = new Configuration(CONF); 170 RpcServer rpcServer = createRpcServer("testRpcServer", 171 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 172 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 173 174 try { 175 rpcServer.start(); 176 177 try (AbstractRpcClient<?> client = new NettyRpcClient(clientConf)) { 178 TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub = 179 newBlockingStub(client, rpcServer.getListenerAddress()); 180 HBaseRpcController pcrc = new HBaseRpcControllerImpl(); 181 String message = "hello"; 182 assertEquals(message, 183 stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build()) 184 .getMessage()); 185 assertNull(pcrc.cellScanner()); 186 } 187 188 // truststore file change latch 189 final CountDownLatch latch = new CountDownLatch(1); 190 final Path trustStorePath = Paths.get(CONF.get(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION)); 191 createAndStartFileWatcher(trustStorePath, latch, Duration.ofMillis(20)); 192 193 // Replace keystore 194 x509TestContext.regenerateStores(keyType, keyType, storeFileType, storeFileType); 195 196 if (!latch.await(1, TimeUnit.SECONDS)) { 197 throw new AssertionError("Timed out waiting for truststore file to be changed"); 198 } 199 200 try (AbstractRpcClient<?> client = new NettyRpcClient(clientConf)) { 201 TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub = 202 newBlockingStub(client, rpcServer.getListenerAddress()); 203 HBaseRpcController pcrc = new HBaseRpcControllerImpl(); 204 String message = "hello"; 205 assertEquals(message, 206 stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build()) 207 .getMessage()); 208 assertNull(pcrc.cellScanner()); 209 } 210 211 } finally { 212 rpcServer.stop(); 213 } 214 } 215 216 @Test 217 public void testReplaceClientAndServerKeystore() throws GeneralSecurityException, IOException, 218 OperatorCreationException, ServiceException, InterruptedException { 219 Configuration clientConf = new Configuration(CONF); 220 RpcServer rpcServer = createRpcServer("testRpcServer", 221 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 222 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 223 224 try { 225 rpcServer.start(); 226 227 try (AbstractRpcClient<?> client = new NettyRpcClient(clientConf)) { 228 TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub = 229 newBlockingStub(client, rpcServer.getListenerAddress()); 230 HBaseRpcController pcrc = new HBaseRpcControllerImpl(); 231 String message = "hello"; 232 assertEquals(message, 233 stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build()) 234 .getMessage()); 235 assertNull(pcrc.cellScanner()); 236 237 // truststore file change latch 238 final CountDownLatch latch = new CountDownLatch(1); 239 240 final Path trustStorePath = Paths.get(CONF.get(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION)); 241 createAndStartFileWatcher(trustStorePath, latch, Duration.ofMillis(20)); 242 243 // Replace keystore and cancel client connections 244 x509TestContext.regenerateStores(keyType, keyType, storeFileType, storeFileType); 245 client.cancelConnections( 246 ServerName.valueOf(Address.fromSocketAddress(rpcServer.getListenerAddress()), 0L)); 247 248 if (!latch.await(1, TimeUnit.SECONDS)) { 249 throw new AssertionError("Timed out waiting for truststore file to be changed"); 250 } 251 252 assertEquals(message, 253 stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build()) 254 .getMessage()); 255 assertNull(pcrc.cellScanner()); 256 } 257 } finally { 258 rpcServer.stop(); 259 } 260 } 261 262 private RpcServer createRpcServer(String name, 263 List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress, 264 Configuration conf, RpcScheduler scheduler) throws IOException { 265 return new NettyRpcServer(SERVER, name, services, bindAddress, conf, scheduler, true); 266 } 267 268 private void createAndStartFileWatcher(Path trustStorePath, CountDownLatch latch, 269 Duration duration) throws IOException { 270 FileChangeWatcher fileChangeWatcher = new FileChangeWatcher(trustStorePath, 271 Objects.toString(trustStorePath.getFileName()), duration, watchEventFilePath -> { 272 LOG.info("File " + watchEventFilePath.getFileName() + " has been changed."); 273 latch.countDown(); 274 }); 275 fileChangeWatcher.start(); 276 } 277}