/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.util.FutureUtils.consume;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.FixMethodOrder;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;

@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@Category({ MiscTests.class, MediumTests.class })
public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestFanOutOneBlockAsyncDFSOutput.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestFanOutOneBlockAsyncDFSOutput.class);
  private static DistributedFileSystem FS;
  private static EventLoopGroup EVENT_LOOP_GROUP;
  private static Class<? extends Channel> CHANNEL_CLASS;
  private static final int READ_TIMEOUT_MS = 2000;

  private static StreamSlowMonitor MONITOR;

  @Rule
  public TestName name = new TestName();
  @BeforeClass
  public static void setUp() throws Exception {
    UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
    startMiniDFSCluster(3);
    FS = CLUSTER.getFileSystem();
    EVENT_LOOP_GROUP = new NioEventLoopGroup();
    CHANNEL_CLASS = NioSocketChannel.class;
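    // StreamSlowMonitor watches a write pipeline for slow datanodes so that they can be excluded
    // from future pipelines; most tests share this instance, while testExcludeFailedConnectToDatanode
    // below creates its own ExcludeDatanodeManager in order to assert on its state.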
    MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor");
  }

  @AfterClass
  public static void tearDown() throws Exception {
    if (EVENT_LOOP_GROUP != null) {
      EVENT_LOOP_GROUP.shutdownGracefully().get();
    }
    shutdownMiniDFSCluster();
  }
  private static final Random RNG = new Random(); // This test depends on Random#setSeed

  static void writeAndVerify(FileSystem fs, Path f, AsyncFSOutput out)
    throws IOException, InterruptedException, ExecutionException {
    List<CompletableFuture<Long>> futures = new ArrayList<>();
    byte[] b = new byte[10];
    // test pipelined flush: issue two flushes per write without waiting, so the flushes overlap
    RNG.setSeed(12345);
    for (int i = 0; i < 10; i++) {
      RNG.nextBytes(b);
      out.write(b);
      futures.add(out.flush(false));
      futures.add(out.flush(false));
    }
    // both flush futures of each round should complete with the cumulative length written so far
    for (int i = 0; i < 10; i++) {
      assertEquals((i + 1) * b.length, futures.get(2 * i).join().longValue());
      assertEquals((i + 1) * b.length, futures.get(2 * i + 1).join().longValue());
    }
    out.close();
    assertEquals(b.length * 10, fs.getFileStatus(f).getLen());
    byte[] actual = new byte[b.length];
    // re-seed so that nextBytes regenerates exactly the bytes that were written above
    RNG.setSeed(12345);
    try (FSDataInputStream in = fs.open(f)) {
      for (int i = 0; i < 10; i++) {
        in.readFully(actual);
        RNG.nextBytes(b);
        assertArrayEquals(b, actual);
      }
      assertEquals(-1, in.read());
    }
  }

  @Test
  public void test() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
    writeAndVerify(FS, f, out);
  }

  /**
   * The test method has been renamed so that it sorts first under NAME_ASCENDING. This is an ugly
   * hack to avoid flakiness.
   */
  @Test
  public void test0Recover() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
    byte[] b = new byte[10];
    Bytes.random(b);
    out.write(b, 0, b.length);
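    // flush(false) returns a future that completes with the acked file length; get() blocks until
    // every datanode in the pipeline has acked the data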
    out.flush(false).get();
    // restart one datanode, which breaks one of the connections in the pipeline
    CLUSTER.restartDataNode(0);
    out.write(b, 0, b.length);
    try {
      out.flush(false).get();
      fail("flush should fail");
    } catch (ExecutionException e) {
      // we restarted one datanode, so the flush should fail
      LOG.info("expected exception caught", e);
    }
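    // recoverAndClose abandons the broken pipeline and closes the file at the last acked length,
    // so only the first chunk, which was flushed successfully above, is expected to persist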
    out.recoverAndClose(null);
    assertEquals(b.length, FS.getFileStatus(f).getLen());
    byte[] actual = new byte[b.length];
    try (FSDataInputStream in = FS.open(f)) {
      in.readFully(actual);
    }
    assertArrayEquals(b, actual);
  }

  @Test
  public void testHeartbeat() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
    Thread.sleep(READ_TIMEOUT_MS * 2);
    // sleeping past the read timeout should not kill the stream; heartbeats keep the connections
    // to the datanodes alive
    writeAndVerify(FS, f, out);
  }

  /**
   * This is important for fencing when recovering from a RS crash: we pass createParent = false,
   * so creating the output must fail if the parent directory does not exist, instead of silently
   * recreating it.
   */
  @Test
  public void testCreateParentFailed() throws IOException {
    Path f = new Path("/" + name.getMethodName() + "/test");
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    try {
      FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
      fail("should fail since the parent directory does not exist");
    } catch (RemoteException e) {
      LOG.info("expected exception caught", e);
      assertThat(e.unwrapRemoteException(), instanceOf(FileNotFoundException.class));
    }
  }

  @Test
  public void testConnectToDatanodeFailed()
    throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
    InvocationTargetException, InterruptedException, NoSuchFieldException {
    Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
    xceiverServerDaemonField.setAccessible(true);
    Class<?> xceiverServerClass =
      Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
    Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
    numPeersMethod.setAccessible(true);
    // stop one datanode so that connecting to it fails
    DataNodeProperties dnProp = CLUSTER.stopDataNode(0);
    Path f = new Path("/test");
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    try (FanOutOneBlockAsyncDFSOutput output =
      FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true)) {
      // the dead datanode should be excluded on retry, so there are only 2 DNs in the pipeline
      assertEquals(2, output.getPipeline().length);
    } finally {
      CLUSTER.restartDataNode(dnProp);
      CLUSTER.triggerBlockReports();
    }
  }

  @Test
  public void testExcludeFailedConnectToDatanode()
    throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
    InvocationTargetException, InterruptedException, NoSuchFieldException {
    Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
    xceiverServerDaemonField.setAccessible(true);
    Class<?> xceiverServerClass =
      Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
    Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
    numPeersMethod.setAccessible(true);
    // stop one datanode so that connecting to it fails
    DataNodeProperties dnProp = CLUSTER.stopDataNode(0);
    Path f = new Path("/test");
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
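    // use a fresh ExcludeDatanodeManager so we can assert that the failed datanode is recorded;
    // the manager tracks datanodes that failed or were slow and keeps them out of new pipelines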
    ExcludeDatanodeManager excludeDatanodeManager =
      new ExcludeDatanodeManager(HBaseConfiguration.create());
    StreamSlowMonitor streamSlowDNsMonitor =
      excludeDatanodeManager.getStreamSlowMonitor("testMonitor");
    assertEquals(0, excludeDatanodeManager.getExcludeDNs().size());
    try (FanOutOneBlockAsyncDFSOutput output =
      FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, streamSlowDNsMonitor, true)) {
      // the dead datanode should be excluded on retry, so there are only 2 DNs in the pipeline
      assertEquals(2, output.getPipeline().length);
      assertEquals(1, excludeDatanodeManager.getExcludeDNs().size());
    } finally {
      CLUSTER.restartDataNode(dnProp);
      CLUSTER.triggerBlockReports();
    }
  }

  @Test
  public void testWriteLargeChunk() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
      false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR, true);
    byte[] b = new byte[50 * 1024 * 1024];
    Bytes.random(b);
    out.write(b);
    consume(out.flush(false));
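    // consume() only attaches a callback that logs a failure of the first flush's future, so the
    // future is not silently dropped; the assertion below is made on a second flush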
    assertEquals(b.length, out.flush(false).get().longValue());
    out.close();
    assertEquals(b.length, FS.getFileStatus(f).getLen());
    byte[] actual = new byte[b.length];
    try (FSDataInputStream in = FS.open(f)) {
      in.readFully(actual);
    }
    assertArrayEquals(b, actual);
  }
}