/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.util.FutureUtils.consume;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
@Tag(MiscTests.TAG)
@Tag(MediumTests.TAG)
public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {

  private static final Logger LOG = LoggerFactory.getLogger(TestFanOutOneBlockAsyncDFSOutput.class);
  private static DistributedFileSystem FS;
  private static EventLoopGroup EVENT_LOOP_GROUP;
  private static Class<? extends Channel> CHANNEL_CLASS;
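  // Deliberately short client read timeout: testHeartbeat sleeps well past it to show that the
  // output's heartbeat keeps idle datanode connections open.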
  private static final int READ_TIMEOUT_MS = 2000;

  private static StreamSlowMonitor MONITOR;

  private Path file;

  @BeforeAll
  public static void setUp() throws Exception {
    UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
    startMiniDFSCluster(3);
    FS = CLUSTER.getFileSystem();
    EVENT_LOOP_GROUP = new NioEventLoopGroup();
    CHANNEL_CLASS = NioSocketChannel.class;
    MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor");
  }

  @AfterAll
  public static void tearDown() throws Exception {
    if (EVENT_LOOP_GROUP != null) {
      EVENT_LOOP_GROUP.shutdownGracefully().get();
    }
    shutdownMiniDFSCluster();
  }

  private static final Random RNG = new Random(); // writeAndVerify relies on Random#setSeed

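  /**
   * Writes ten seeded-random 10-byte chunks, issuing two pipelined flushes per write, and asserts
   * that every flush future completes at the expected file length. Then re-reads the file and
   * verifies the contents by replaying the same seed.
   */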
  static void writeAndVerify(FileSystem fs, Path f, AsyncFSOutput out)
    throws IOException, InterruptedException, ExecutionException {
    List<CompletableFuture<Long>> futures = new ArrayList<>();
    byte[] b = new byte[10];
    // test pipelined flush
    RNG.setSeed(12345);
    for (int i = 0; i < 10; i++) {
      RNG.nextBytes(b);
      out.write(b);
      futures.add(out.flush(false));
      futures.add(out.flush(false));
    }
    for (int i = 0; i < 10; i++) {
      assertEquals((i + 1) * b.length, futures.get(2 * i).join().longValue());
      assertEquals((i + 1) * b.length, futures.get(2 * i + 1).join().longValue());
    }
    out.close();
    assertEquals(b.length * 10, fs.getFileStatus(f).getLen());
    byte[] actual = new byte[b.length];
    RNG.setSeed(12345);
    try (FSDataInputStream in = fs.open(f)) {
      for (int i = 0; i < 10; i++) {
        in.readFully(actual);
        RNG.nextBytes(b);
        assertArrayEquals(b, actual);
      }
      assertEquals(-1, in.read());
    }
  }

  @BeforeEach
  public void setUpBeforeEach(TestInfo testInfo) {
    file = new Path("/" + testInfo.getTestMethod().get().getName());
  }

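  /** Happy path: create an output against all three datanodes, write, and verify. */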
  @Test
  public void test() throws IOException, InterruptedException, ExecutionException {
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
    writeAndVerify(FS, file, out);
  }

  /**
   * Writes a chunk, restarts a datanode to break the pipeline, asserts that the next flush fails,
   * then recovers and closes, verifying that the data written before the failure survives. The
   * method is named so it sorts first under NAME_ASCENDING ordering; an ugly hack to avoid
   * flakiness.
   */
  @Test
  public void test0Recover() throws IOException, InterruptedException, ExecutionException {
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
    byte[] b = new byte[10];
    Bytes.random(b);
    out.write(b, 0, b.length);
    out.flush(false).get();
    // restart one datanode, which breaks one of the pipeline connections
    CLUSTER.restartDataNode(0);
    out.write(b, 0, b.length);
    ExecutionException e = assertThrows(ExecutionException.class, () -> out.flush(false).get());
    LOG.info("expected exception caught", e);
    out.recoverAndClose(null);
    assertEquals(b.length, FS.getFileStatus(file).getLen());
    byte[] actual = new byte[b.length];
    try (FSDataInputStream in = FS.open(file)) {
      in.readFully(actual);
    }
    assertArrayEquals(b, actual);
  }

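  /**
   * Sleeps for twice the read timeout before writing; the output's background heartbeat should
   * keep the otherwise idle datanode connections alive.
   */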
  @Test
  public void testHeartbeat() throws IOException, InterruptedException, ExecutionException {
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
    Thread.sleep(READ_TIMEOUT_MS * 2);
    // the connections to the datanodes should still be alive
    writeAndVerify(FS, file, out);
  }

  /**
   * Creating an output with createParent=false under a missing parent must fail with
   * FileNotFoundException. This is important for fencing when recovering from an RS crash.
   */
  @Test
  public void testCreateParentFailed() throws IOException {
    Path f = new Path(file, "test");
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    RemoteException e = assertThrows(RemoteException.class,
      () -> FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true));
    LOG.info("expected exception caught", e);
    assertThat(e.unwrapRemoteException(), instanceOf(FileNotFoundException.class));
  }

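  /**
   * Stops one datanode before creating the output; the create retry logic should exclude the dead
   * node, leaving a two-datanode pipeline.
   */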
  @Test
  public void testConnectToDatanodeFailed()
    throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
    InvocationTargetException, InterruptedException, NoSuchFieldException {
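    // Reflective handles to DataXceiverServer#getNumPeers; the xceiver peer count is not exposed
    // through a public HDFS API.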
    Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
    xceiverServerDaemonField.setAccessible(true);
    Class<?> xceiverServerClass =
      Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
    Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
    numPeersMethod.setAccessible(true);
    // stop one datanode so its connection cannot be established
    DataNodeProperties dnProp = CLUSTER.stopDataNode(0);
    Path f = new Path("/test");
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    try (FanOutOneBlockAsyncDFSOutput output =
      FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true)) {
      // the dead DN should be excluded on retry, so only two DNs remain in the pipeline
      assertEquals(2, output.getPipeline().length);
    } finally {
      CLUSTER.restartDataNode(dnProp);
      CLUSTER.triggerBlockReports();
    }
  }

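  /**
   * Like testConnectToDatanodeFailed, but additionally verifies that the unreachable datanode is
   * registered with the ExcludeDatanodeManager.
   */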
  @Test
  public void testExcludeFailedConnectToDatanode()
    throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
    InvocationTargetException, InterruptedException, NoSuchFieldException {
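    // Same reflective handles as in testConnectToDatanodeFailed; getNumPeers is not public.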
    Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
    xceiverServerDaemonField.setAccessible(true);
    Class<?> xceiverServerClass =
      Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
    Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
    numPeersMethod.setAccessible(true);
    // stop one datanode so its connection cannot be established
    DataNodeProperties dnProp = CLUSTER.stopDataNode(0);
    Path f = new Path("/test");
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    ExcludeDatanodeManager excludeDatanodeManager =
      new ExcludeDatanodeManager(HBaseConfiguration.create());
    StreamSlowMonitor streamSlowDNsMonitor =
      excludeDatanodeManager.getStreamSlowMonitor("testMonitor");
    assertEquals(0, excludeDatanodeManager.getExcludeDNs().size());
    try (FanOutOneBlockAsyncDFSOutput output =
      FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, streamSlowDNsMonitor, true)) {
      // the dead DN should be excluded on retry, so only two DNs remain in the pipeline
      assertEquals(2, output.getPipeline().length);
      assertEquals(1, excludeDatanodeManager.getExcludeDNs().size());
    } finally {
      CLUSTER.restartDataNode(dnProp);
      CLUSTER.triggerBlockReports();
    }
  }

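  /**
   * Writes a single 50 MB chunk into a 1 GB block and verifies the flushed length and the
   * round-tripped contents.
   */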
  @Test
  public void testWriteLargeChunk() throws IOException, InterruptedException, ExecutionException {
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
      true, false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR, true);
    byte[] b = new byte[50 * 1024 * 1024];
    Bytes.random(b);
    out.write(b);
    consume(out.flush(false));
    assertEquals(b.length, out.flush(false).get().longValue());
    out.close();
    assertEquals(b.length, FS.getFileStatus(file).getLen());
    byte[] actual = new byte[b.length];
    try (FSDataInputStream in = FS.open(file)) {
      in.readFully(actual);
    }
    assertArrayEquals(b, actual);
  }
}