001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security;
019
020import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
021import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
022import static org.junit.jupiter.api.Assertions.assertEquals;
023import static org.junit.jupiter.api.Assertions.assertNull;
024import static org.mockito.Mockito.mock;
025import static org.mockito.Mockito.when;
026
027import java.io.File;
028import java.io.IOException;
029import java.net.InetSocketAddress;
030import java.nio.file.Path;
031import java.nio.file.Paths;
032import java.security.GeneralSecurityException;
033import java.security.Security;
034import java.time.Duration;
035import java.util.ArrayList;
036import java.util.List;
037import java.util.Objects;
038import java.util.concurrent.CountDownLatch;
039import java.util.concurrent.TimeUnit;
040import java.util.stream.Stream;
041import org.apache.commons.io.FileUtils;
042import org.apache.hadoop.conf.Configuration;
043import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
044import org.apache.hadoop.hbase.HBaseConfiguration;
045import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
046import org.apache.hadoop.hbase.HBaseServerBase;
047import org.apache.hadoop.hbase.ServerName;
048import org.apache.hadoop.hbase.io.FileChangeWatcher;
049import org.apache.hadoop.hbase.io.crypto.tls.KeyStoreFileType;
050import org.apache.hadoop.hbase.io.crypto.tls.X509KeyType;
051import org.apache.hadoop.hbase.io.crypto.tls.X509TestContext;
052import org.apache.hadoop.hbase.io.crypto.tls.X509TestContextProvider;
053import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
054import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
055import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
056import org.apache.hadoop.hbase.ipc.HBaseRpcController;
057import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
058import org.apache.hadoop.hbase.ipc.NettyRpcClient;
059import org.apache.hadoop.hbase.ipc.NettyRpcServer;
060import org.apache.hadoop.hbase.ipc.RpcScheduler;
061import org.apache.hadoop.hbase.ipc.RpcServer;
062import org.apache.hadoop.hbase.net.Address;
063import org.apache.hadoop.hbase.testclassification.RPCTests;
064import org.apache.hadoop.hbase.testclassification.SmallTests;
065import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
066import org.bouncycastle.jce.provider.BouncyCastleProvider;
067import org.bouncycastle.operator.OperatorCreationException;
068import org.junit.jupiter.api.AfterAll;
069import org.junit.jupiter.api.AfterEach;
070import org.junit.jupiter.api.BeforeAll;
071import org.junit.jupiter.api.BeforeEach;
072import org.junit.jupiter.api.Tag;
073import org.junit.jupiter.api.TestTemplate;
074import org.junit.jupiter.params.provider.Arguments;
075import org.slf4j.Logger;
076import org.slf4j.LoggerFactory;
077
078import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
079import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
080
081import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
082import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos;
083
084@Tag(RPCTests.TAG)
085@Tag(SmallTests.TAG)
086@HBaseParameterizedTestTemplate(name = "{index}: keyType={0}, storeFileType={1}")
087public class TestNettyTLSIPCFileWatcher {
088
089  private static final Logger LOG = LoggerFactory.getLogger(TestNettyTLSIPCFileWatcher.class);
090
091  private static final Configuration CONF = HBaseConfiguration.create();
092  private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil(CONF);
093  private static HBaseServerBase<?> SERVER;
094  private static X509TestContextProvider PROVIDER;
095  private static NettyEventLoopGroupConfig EVENT_LOOP_GROUP_CONFIG;
096
097  private X509TestContext x509TestContext;
098
099  private X509KeyType keyType;
100
101  private KeyStoreFileType storeFileType;
102
103  public static Stream<Arguments> parameters() {
104    List<Arguments> params = new ArrayList<>();
105    for (X509KeyType caKeyType : X509KeyType.values()) {
106      for (KeyStoreFileType ks : KeyStoreFileType.values()) {
107        params.add(Arguments.of(caKeyType, ks));
108      }
109    }
110    return params.stream();
111  }
112
113  public TestNettyTLSIPCFileWatcher(X509KeyType keyType, KeyStoreFileType storeFileType) {
114    this.keyType = keyType;
115    this.storeFileType = storeFileType;
116  }
117
118  @BeforeAll
119  public static void setUpBeforeClass() throws IOException {
120    Security.addProvider(new BouncyCastleProvider());
121    File dir =
122      new File(UTIL.getDataTestDir(TestNettyTLSIPCFileWatcher.class.getSimpleName()).toString())
123        .getCanonicalFile();
124    FileUtils.forceMkdir(dir);
125    // server must enable tls
126    CONF.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_ENABLED, true);
127    PROVIDER = new X509TestContextProvider(CONF, dir);
128    EVENT_LOOP_GROUP_CONFIG =
129      NettyEventLoopGroupConfig.setup(CONF, TestNettyTLSIPCFileWatcher.class.getSimpleName());
130    SERVER = mock(HBaseServerBase.class);
131    when(SERVER.getEventLoopGroupConfig()).thenReturn(EVENT_LOOP_GROUP_CONFIG);
132  }
133
134  @AfterAll
135  public static void tearDownAfterClass() throws InterruptedException {
136    Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME);
137    EVENT_LOOP_GROUP_CONFIG.group().shutdownGracefully().sync();
138    UTIL.cleanupTestDir();
139  }
140
141  @BeforeEach
142  public void setUp() throws IOException {
143    x509TestContext = PROVIDER.get(keyType, keyType, "keyPa$$word".toCharArray());
144    x509TestContext.setConfigurations(storeFileType, storeFileType);
145    CONF.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, false);
146    CONF.setBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, true);
147    CONF.setBoolean(X509Util.TLS_CERT_RELOAD, true);
148    CONF.setLong(X509Util.HBASE_TLS_FILEPOLL_INTERVAL_MILLIS, 10);
149  }
150
151  @AfterEach
152  public void tearDown() {
153    x509TestContext.clearConfigurations();
154    x509TestContext.getConf().unset(X509Util.TLS_CONFIG_OCSP);
155    x509TestContext.getConf().unset(X509Util.TLS_CONFIG_CLR);
156    x509TestContext.getConf().unset(X509Util.TLS_CONFIG_PROTOCOL);
157    x509TestContext.getConf().unset(X509Util.HBASE_TLS_FILEPOLL_INTERVAL_MILLIS);
158    System.clearProperty("com.sun.net.ssl.checkRevocation");
159    System.clearProperty("com.sun.security.enableCRLDP");
160    Security.setProperty("ocsp.enable", Boolean.FALSE.toString());
161    Security.setProperty("com.sun.security.enableCRLDP", Boolean.FALSE.toString());
162  }
163
164  @TestTemplate
165  public void testReplaceServerKeystore() throws IOException, ServiceException,
166    GeneralSecurityException, OperatorCreationException, InterruptedException {
167    Configuration clientConf = new Configuration(CONF);
168    RpcServer rpcServer = createRpcServer("testRpcServer",
169      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
170      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
171
172    try {
173      rpcServer.start();
174
175      try (AbstractRpcClient<?> client = new NettyRpcClient(clientConf)) {
176        TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
177          newBlockingStub(client, rpcServer.getListenerAddress());
178        HBaseRpcController pcrc = new HBaseRpcControllerImpl();
179        String message = "hello";
180        assertEquals(message,
181          stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build())
182            .getMessage());
183        assertNull(pcrc.cellScanner());
184      }
185
186      // truststore file change latch
187      final CountDownLatch latch = new CountDownLatch(1);
188      final Path trustStorePath = Paths.get(CONF.get(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION));
189      createAndStartFileWatcher(trustStorePath, latch, Duration.ofMillis(20));
190
191      // Replace keystore
192      x509TestContext.regenerateStores(keyType, keyType, storeFileType, storeFileType);
193
194      if (!latch.await(1, TimeUnit.SECONDS)) {
195        throw new AssertionError("Timed out waiting for truststore file to be changed");
196      }
197
198      try (AbstractRpcClient<?> client = new NettyRpcClient(clientConf)) {
199        TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
200          newBlockingStub(client, rpcServer.getListenerAddress());
201        HBaseRpcController pcrc = new HBaseRpcControllerImpl();
202        String message = "hello";
203        assertEquals(message,
204          stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build())
205            .getMessage());
206        assertNull(pcrc.cellScanner());
207      }
208
209    } finally {
210      rpcServer.stop();
211    }
212  }
213
214  @TestTemplate
215  public void testReplaceClientAndServerKeystore() throws GeneralSecurityException, IOException,
216    OperatorCreationException, ServiceException, InterruptedException {
217    Configuration clientConf = new Configuration(CONF);
218    RpcServer rpcServer = createRpcServer("testRpcServer",
219      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
220      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
221
222    try {
223      rpcServer.start();
224
225      try (AbstractRpcClient<?> client = new NettyRpcClient(clientConf)) {
226        TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
227          newBlockingStub(client, rpcServer.getListenerAddress());
228        HBaseRpcController pcrc = new HBaseRpcControllerImpl();
229        String message = "hello";
230        assertEquals(message,
231          stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build())
232            .getMessage());
233        assertNull(pcrc.cellScanner());
234
235        // truststore file change latch
236        final CountDownLatch latch = new CountDownLatch(1);
237
238        final Path trustStorePath = Paths.get(CONF.get(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION));
239        createAndStartFileWatcher(trustStorePath, latch, Duration.ofMillis(20));
240
241        // Replace keystore and cancel client connections
242        x509TestContext.regenerateStores(keyType, keyType, storeFileType, storeFileType);
243        client.cancelConnections(
244          ServerName.valueOf(Address.fromSocketAddress(rpcServer.getListenerAddress()), 0L));
245
246        if (!latch.await(1, TimeUnit.SECONDS)) {
247          throw new AssertionError("Timed out waiting for truststore file to be changed");
248        }
249
250        assertEquals(message,
251          stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build())
252            .getMessage());
253        assertNull(pcrc.cellScanner());
254      }
255    } finally {
256      rpcServer.stop();
257    }
258  }
259
260  private RpcServer createRpcServer(String name,
261    List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
262    Configuration conf, RpcScheduler scheduler) throws IOException {
263    return new NettyRpcServer(SERVER, name, services, bindAddress, conf, scheduler, true);
264  }
265
266  private void createAndStartFileWatcher(Path trustStorePath, CountDownLatch latch,
267    Duration duration) throws IOException {
268    FileChangeWatcher fileChangeWatcher = new FileChangeWatcher(trustStorePath,
269      Objects.toString(trustStorePath.getFileName()), duration, watchEventFilePath -> {
270        LOG.info("File " + watchEventFilePath.getFileName() + " has been changed.");
271        latch.countDown();
272      });
273    fileChangeWatcher.start();
274  }
275}