/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.util.FutureUtils.consume;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;

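/**
 * Tests {@link FanOutOneBlockAsyncDFSOutput} against a mini DFS cluster: basic writes and
 * pipelined flushes, lease recovery after a datanode restart, heartbeat keep-alive, fencing via a
 * missing parent directory, and pipeline setup when a datanode is unreachable.
 */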
@Category({ MiscTests.class, MediumTests.class })
public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestFanOutOneBlockAsyncDFSOutput.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestFanOutOneBlockAsyncDFSOutput.class);
  private static DistributedFileSystem FS;
  private static EventLoopGroup EVENT_LOOP_GROUP;
  private static Class<? extends Channel> CHANNEL_CLASS;
  private static final int READ_TIMEOUT_MS = 2000;

  private static StreamSlowMonitor MONITOR;

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void setUp() throws Exception {
    UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
    startMiniDFSCluster(3);
    FS = CLUSTER.getFileSystem();
    EVENT_LOOP_GROUP = new NioEventLoopGroup();
    CHANNEL_CLASS = NioSocketChannel.class;
    MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor");
  }

  @AfterClass
  public static void tearDown() throws Exception {
    if (EVENT_LOOP_GROUP != null) {
      EVENT_LOOP_GROUP.shutdownGracefully().get();
    }
    shutdownMiniDFSCluster();
  }

  private static final Random RNG = new Random(); // This test depends on Random#setSeed

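  /**
   * Writes ten 10-byte random chunks, issuing two flushes per chunk to exercise pipelined
   * flushes, then re-reads the file and verifies its contents by replaying the same PRNG seed.
   */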
  static void writeAndVerify(FileSystem fs, Path f, AsyncFSOutput out)
    throws IOException, InterruptedException, ExecutionException {
    List<CompletableFuture<Long>> futures = new ArrayList<>();
    byte[] b = new byte[10];
    // test pipelined flush
    RNG.setSeed(12345);
    for (int i = 0; i < 10; i++) {
      RNG.nextBytes(b);
      out.write(b);
      futures.add(out.flush(false));
      futures.add(out.flush(false));
    }
    for (int i = 0; i < 10; i++) {
      assertEquals((i + 1) * b.length, futures.get(2 * i).join().longValue());
      assertEquals((i + 1) * b.length, futures.get(2 * i + 1).join().longValue());
    }
    out.close();
    assertEquals(b.length * 10, fs.getFileStatus(f).getLen());
    byte[] actual = new byte[b.length];
    RNG.setSeed(12345);
    try (FSDataInputStream in = fs.open(f)) {
      for (int i = 0; i < 10; i++) {
        in.readFully(actual);
        RNG.nextBytes(b);
        assertArrayEquals(b, actual);
      }
      assertEquals(-1, in.read());
    }
  }

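  // Write and verify through a healthy 3-replica fan-out pipeline.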
  @Test
  public void test() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
    writeAndVerify(FS, f, out);
  }

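  // Restart a datanode mid-write so the pending flush fails, then recover the lease and verify
  // that the data acked before the failure is still readable.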
  @Test
  public void testRecover() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
    byte[] b = new byte[10];
    Bytes.random(b);
    out.write(b, 0, b.length);
    out.flush(false).get();
    // restart one datanode, which breaks one of the pipeline connections
    CLUSTER.restartDataNode(0);
    out.write(b, 0, b.length);
    try {
      out.flush(false).get();
      fail("flush should fail");
    } catch (ExecutionException e) {
      // we restarted one datanode, so the flush should fail
      LOG.info("expected exception caught", e);
    }
    out.recoverAndClose(null);
    assertEquals(b.length, FS.getFileStatus(f).getLen());
    byte[] actual = new byte[b.length];
    try (FSDataInputStream in = FS.open(f)) {
      in.readFully(actual);
    }
    assertArrayEquals(b, actual);
  }

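  // Sleep for twice the datanode read timeout; the stream's heartbeats should keep the idle
  // connections alive so the write below still succeeds.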
  @Test
  public void testHeartbeat() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
    Thread.sleep(READ_TIMEOUT_MS * 2);
    // the connections to the datanodes should still be alive
    writeAndVerify(FS, f, out);
  }

  /**
   * Creating a file whose parent directory does not exist must fail instead of implicitly
   * creating the parent. This is important for fencing when recovering from a region server
   * crash.
   */
  @Test
  public void testCreateParentFailed() throws IOException {
    Path f = new Path("/" + name.getMethodName() + "/test");
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    try {
      FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
      fail("should fail because the parent directory does not exist");
    } catch (RemoteException e) {
      LOG.info("expected exception caught", e);
      assertThat(e.unwrapRemoteException(), instanceOf(FileNotFoundException.class));
    }
  }

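  // With one of the three datanodes down, creating the output should retry, exclude the
  // unreachable datanode, and succeed with a two-node pipeline.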
  @Test
  public void testConnectToDatanodeFailed() throws IOException, InterruptedException {
    // stop one datanode so connecting to it fails
    DataNodeProperties dnProp = CLUSTER.stopDataNode(0);
    Path f = new Path("/test");
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
      f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR)) {
      // the dead datanode should be excluded on retry, so only 2 DNs remain in the pipeline
      assertEquals(2, output.getPipeline().length);
    } finally {
      CLUSTER.restartDataNode(dnProp);
      CLUSTER.triggerBlockReports();
    }
  }

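  // Like testConnectToDatanodeFailed, but also verify that the unreachable datanode is recorded
  // in the ExcludeDatanodeManager.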
  @Test
  public void testExcludeFailedConnectToDatanode() throws IOException, InterruptedException {
    // stop one datanode so connecting to it fails
    DataNodeProperties dnProp = CLUSTER.stopDataNode(0);
    Path f = new Path("/test");
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    ExcludeDatanodeManager excludeDatanodeManager =
      new ExcludeDatanodeManager(HBaseConfiguration.create());
    StreamSlowMonitor streamSlowDNsMonitor =
      excludeDatanodeManager.getStreamSlowMonitor("testMonitor");
    assertEquals(0, excludeDatanodeManager.getExcludeDNs().size());
    try (FanOutOneBlockAsyncDFSOutput output =
      FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, streamSlowDNsMonitor)) {
      // the dead datanode should be excluded on retry, so only 2 DNs remain in the pipeline
      assertEquals(2, output.getPipeline().length);
      assertEquals(1, excludeDatanodeManager.getExcludeDNs().size());
    } finally {
      CLUSTER.restartDataNode(dnProp);
      CLUSTER.triggerBlockReports();
    }
  }

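  // Write a single 50 MB chunk into a 1 GB block to exercise large buffered writes.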
  @Test
  public void testWriteLargeChunk() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
      false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR);
    byte[] b = new byte[50 * 1024 * 1024];
    Bytes.random(b);
    out.write(b);
    consume(out.flush(false));
    assertEquals(b.length, out.flush(false).get().longValue());
    out.close();
    assertEquals(b.length, FS.getFileStatus(f).getLen());
    byte[] actual = new byte[b.length];
    try (FSDataInputStream in = FS.open(f)) {
      in.readFully(actual);
    }
    assertArrayEquals(b, actual);
  }
}