/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Optional;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.DummyDFSOutputStream;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.mockito.MockedConstruction;

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Make sure lease renewal works. Since it is in a background thread, normal read/write test can not
 * verify it.
 * <p>
 * See HBASE-28955 for more details.
 */
@Tag(MiscTests.TAG)
@Tag(MediumTests.TAG)
public class TestLeaseRenewal extends AsyncFSTestBase {

  private static DistributedFileSystem FS;
  private static EventLoopGroup EVENT_LOOP_GROUP;
  private static Class<? extends Channel> CHANNEL_CLASS;
  private static StreamSlowMonitor MONITOR;

  /**
   * Starts the mini DFS cluster and initializes the file system, event loop group, channel class
   * and slow-stream monitor shared by every test in this class.
   */
  @BeforeAll
  public static void setUp() throws Exception {
    startMiniDFSCluster(3);
    FS = CLUSTER.getFileSystem();
    EVENT_LOOP_GROUP = new NioEventLoopGroup();
    CHANNEL_CLASS = NioSocketChannel.class;
    MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor");
  }

  /**
   * Shuts down the event loop group (if it was created) and then the mini DFS cluster.
   */
  @AfterAll
  public static void tearDown() throws Exception {
    // setUp may have failed before the group was created, so guard against null.
    if (EVENT_LOOP_GROUP != null) {
      EVENT_LOOP_GROUP.shutdownGracefully().get();
    }
    shutdownMiniDFSCluster();
  }

  /**
   * Opens a {@link FanOutOneBlockAsyncDFSOutput} on the shared file system.
   * @param file path of the file to create
   * @return the newly created output
   * @throws IOException if the output can not be created
   */
  private FanOutOneBlockAsyncDFSOutput create(String file)
    throws IllegalArgumentException, IOException {
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    // Use the caller-supplied path: every test passes its own file name and must not end up
    // writing to one shared, hard-coded file.
    return FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new Path(file), true, false,
      (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
  }

  /**
   * Checks that {@link DFSClient#renewLease()} returns true only while an output is open, and
   * that {@code closeAllFilesBeingWritten} leaves the output closed, both when called with
   * {@code false} and when called with {@code true}.
   */
  @Test
  public void testLeaseRenew() throws IOException {
    DFSClient client = FS.getClient();
    // Nothing is open yet.
    assertFalse(client.renewLease());

    FanOutOneBlockAsyncDFSOutput out = create("/test_lease_renew");
    assertTrue(client.renewLease());
    client.closeAllFilesBeingWritten(false);
    assertTrue(out.isClosed());

    // The only output has been closed, so renewLease() reports false again.
    assertFalse(client.renewLease());

    out = create("/test_lease_renew");
    assertTrue(client.renewLease());
    client.closeAllFilesBeingWritten(true);
    assertTrue(out.isClosed());
  }

  /**
   * Looks up {@code DFSOutputStream.getUniqKey()} reflectively.
   * @return the method, or an empty Optional when the Hadoop version on the classpath does not
   *         declare it
   */
  private Optional<Method> getUniqKeyMethod() {
    try {
      return Optional.of(DFSOutputStream.class.getMethod("getUniqKey"));
    } catch (NoSuchMethodException e) {
      // should be hadoop 3.3 or below
      return Optional.empty();
    }
  }

  /**
   * Verifies which methods of the dummy output stream have been invoked after a lease renewal:
   * {@code getUniqKey()} and {@code getNamespace()} when {@code getUniqKey()} exists, otherwise
   * {@code getFileId()} - and nothing else.
   */
  @Test
  public void testEnsureMethodsCalledWhenLeaseRenewal() throws Exception {
    try (MockedConstruction<DummyDFSOutputStream> mocked =
      mockConstruction(DummyDFSOutputStream.class)) {
      try (FanOutOneBlockAsyncDFSOutput out = create("/methods_for_lease_renewal")) {
        DummyDFSOutputStream dummy = mocked.constructed().get(0);
        assertTrue(FS.getClient().renewLease());
        Optional<Method> getUniqKeyMethod = getUniqKeyMethod();
        if (getUniqKeyMethod.isPresent()) {
          // Invoked reflectively on the verify() proxy because the methods may not exist at
          // compile time.
          getUniqKeyMethod.get().invoke(verify(dummy));
          Method getNamespaceMethod = DFSOutputStream.class.getMethod("getNamespace");
          getNamespaceMethod.invoke(verify(dummy));
        } else {
          verify(dummy).getFileId();
        }
        verifyNoMoreInteractions(dummy);
      }
    }
  }

  /**
   * Verifies that the key lookup was performed on the dummy stream: {@code getUniqKey()} when
   * that method exists, otherwise {@code getFileId()}.
   * @param dummy the mocked stream to verify against
   */
  private void verifyGetUniqKey(DummyDFSOutputStream dummy) throws Exception {
    Optional<Method> getUniqKeyMethod = getUniqKeyMethod();
    if (getUniqKeyMethod.isPresent()) {
      getUniqKeyMethod.get().invoke(verify(dummy));
    } else {
      verify(dummy).getFileId();
    }
  }

  /**
   * Verifies that {@code closeAllFilesBeingWritten(false)} invokes {@code close()} on the dummy
   * stream and, apart from the key lookup, nothing else.
   */
  @Test
  public void testEnsureMethodsCalledWhenClosing() throws Exception {
    try (MockedConstruction<DummyDFSOutputStream> mocked =
      mockConstruction(DummyDFSOutputStream.class)) {
      try (FanOutOneBlockAsyncDFSOutput out = create("/methods_for_closing")) {
        DummyDFSOutputStream dummy = mocked.constructed().get(0);
        verifyGetUniqKey(dummy);
        FS.getClient().closeAllFilesBeingWritten(false);
        verify(dummy).close();

        verifyNoMoreInteractions(dummy);
      }
    }
  }

  /**
   * Verifies that {@code closeAllFilesBeingWritten(true)} invokes {@code abort()} on the dummy
   * stream and, apart from the key lookup, nothing else.
   */
  @Test
  public void testEnsureMethodsCalledWhenAborting() throws Exception {
    try (MockedConstruction<DummyDFSOutputStream> mocked =
      mockConstruction(DummyDFSOutputStream.class)) {
      try (FanOutOneBlockAsyncDFSOutput out = create("/methods_for_aborting")) {
        DummyDFSOutputStream dummy = mocked.constructed().get(0);
        verifyGetUniqKey(dummy);
        FS.getClient().closeAllFilesBeingWritten(true);
        verify(dummy).abort();
        verifyNoMoreInteractions(dummy);
      }
    }
  }
}