001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.asyncfs;
019
020import static org.junit.jupiter.api.Assertions.assertFalse;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022import static org.mockito.Mockito.mockConstruction;
023import static org.mockito.Mockito.verify;
024import static org.mockito.Mockito.verifyNoMoreInteractions;
025
026import java.io.IOException;
027import java.lang.reflect.Method;
028import java.util.Optional;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
031import org.apache.hadoop.hbase.testclassification.MediumTests;
032import org.apache.hadoop.hbase.testclassification.MiscTests;
033import org.apache.hadoop.hdfs.DFSClient;
034import org.apache.hadoop.hdfs.DFSOutputStream;
035import org.apache.hadoop.hdfs.DistributedFileSystem;
036import org.apache.hadoop.hdfs.DummyDFSOutputStream;
037import org.junit.jupiter.api.AfterAll;
038import org.junit.jupiter.api.BeforeAll;
039import org.junit.jupiter.api.Tag;
040import org.junit.jupiter.api.Test;
041import org.mockito.MockedConstruction;
042
043import org.apache.hbase.thirdparty.io.netty.channel.Channel;
044import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
045import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
046import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
047import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
048
049/**
050 * Make sure lease renewal works. Since it is in a background thread, normal read/write test can not
051 * verify it.
052 * <p>
053 * See HBASE-28955 for more details.
054 */
055@Tag(MiscTests.TAG)
056@Tag(MediumTests.TAG)
057public class TestLeaseRenewal extends AsyncFSTestBase {
058
059  private static DistributedFileSystem FS;
060  private static EventLoopGroup EVENT_LOOP_GROUP;
061  private static Class<? extends Channel> CHANNEL_CLASS;
062  private static StreamSlowMonitor MONITOR;
063
064  @BeforeAll
065  public static void setUp() throws Exception {
066    startMiniDFSCluster(3);
067    FS = CLUSTER.getFileSystem();
068    EVENT_LOOP_GROUP = new NioEventLoopGroup();
069    CHANNEL_CLASS = NioSocketChannel.class;
070    MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor");
071  }
072
073  @AfterAll
074  public static void tearDown() throws Exception {
075    if (EVENT_LOOP_GROUP != null) {
076      EVENT_LOOP_GROUP.shutdownGracefully().get();
077    }
078    shutdownMiniDFSCluster();
079  }
080
081  private FanOutOneBlockAsyncDFSOutput create(String file)
082    throws IllegalArgumentException, IOException {
083    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
084    return FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new Path("/test_lease_renew"), true,
085      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
086  }
087
088  @Test
089  public void testLeaseRenew() throws IOException {
090    DFSClient client = FS.getClient();
091    assertFalse(client.renewLease());
092
093    FanOutOneBlockAsyncDFSOutput out = create("/test_lease_renew");
094    assertTrue(client.renewLease());
095    client.closeAllFilesBeingWritten(false);
096    assertTrue(out.isClosed());
097
098    assertFalse(client.renewLease());
099
100    out = create("/test_lease_renew");
101    assertTrue(client.renewLease());
102    client.closeAllFilesBeingWritten(true);
103    assertTrue(out.isClosed());
104  }
105
106  private Optional<Method> getUniqKeyMethod() {
107    try {
108      return Optional.of(DFSOutputStream.class.getMethod("getUniqKey"));
109    } catch (NoSuchMethodException e) {
110      // should be hadoop 3.3 or below
111      return Optional.empty();
112    }
113  }
114
115  @Test
116  public void testEnsureMethodsCalledWhenLeaseRenewal() throws Exception {
117    try (MockedConstruction<DummyDFSOutputStream> mocked =
118      mockConstruction(DummyDFSOutputStream.class)) {
119      try (FanOutOneBlockAsyncDFSOutput out = create("/methods_for_lease_renewal")) {
120        DummyDFSOutputStream dummy = mocked.constructed().get(0);
121        assertTrue(FS.getClient().renewLease());
122        Optional<Method> getUniqKeyMethod = getUniqKeyMethod();
123        if (getUniqKeyMethod.isPresent()) {
124          getUniqKeyMethod.get().invoke(verify(dummy));
125          Method getNamespaceMethod = DFSOutputStream.class.getMethod("getNamespace");
126          getNamespaceMethod.invoke(verify(dummy));
127        } else {
128          verify(dummy).getFileId();
129        }
130        verifyNoMoreInteractions(dummy);
131      }
132    }
133  }
134
135  private void verifyGetUniqKey(DummyDFSOutputStream dummy) throws Exception {
136    Optional<Method> getUniqKeyMethod = getUniqKeyMethod();
137    if (getUniqKeyMethod.isPresent()) {
138      getUniqKeyMethod.get().invoke(verify(dummy));
139    } else {
140      verify(dummy).getFileId();
141    }
142  }
143
144  @Test
145  public void testEnsureMethodsCalledWhenClosing() throws Exception {
146    try (MockedConstruction<DummyDFSOutputStream> mocked =
147      mockConstruction(DummyDFSOutputStream.class)) {
148      try (FanOutOneBlockAsyncDFSOutput out = create("/methods_for_closing")) {
149        DummyDFSOutputStream dummy = mocked.constructed().get(0);
150        verifyGetUniqKey(dummy);
151        FS.getClient().closeAllFilesBeingWritten(false);
152        verify(dummy).close();
153
154        verifyNoMoreInteractions(dummy);
155      }
156    }
157  }
158
159  @Test
160  public void testEnsureMethodsCalledWhenAborting() throws Exception {
161    try (MockedConstruction<DummyDFSOutputStream> mocked =
162      mockConstruction(DummyDFSOutputStream.class)) {
163      try (FanOutOneBlockAsyncDFSOutput out = create("/methods_for_aborting")) {
164        DummyDFSOutputStream dummy = mocked.constructed().get(0);
165        verifyGetUniqKey(dummy);
166        FS.getClient().closeAllFilesBeingWritten(true);
167        verify(dummy).abort();
168        verifyNoMoreInteractions(dummy);
169      }
170    }
171  }
172}