/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;

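/**
 * Tests {@link FanOutOneBlockAsyncDFSOutput} against a mini DFS cluster: plain pipelined writes,
 * recovery after a datanode failure, idle-connection heartbeats, fencing when the parent
 * directory is missing, and large chunk writes.
 */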
@Category({ MiscTests.class, MediumTests.class })
public class TestFanOutOneBlockAsyncDFSOutput {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestFanOutOneBlockAsyncDFSOutput.class);

  private static final Logger LOG =
      LoggerFactory.getLogger(TestFanOutOneBlockAsyncDFSOutput.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private static DistributedFileSystem FS;

  private static EventLoopGroup EVENT_LOOP_GROUP;

  private static Class<? extends Channel> CHANNEL_CLASS;

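  // Shortened DFS client read timeout; testHeartbeat sleeps past it to verify that idle
  // connections are kept alive.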
  private static final int READ_TIMEOUT_MS = 2000;

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void setUp() throws Exception {
    TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
    TEST_UTIL.startMiniDFSCluster(3);
    FS = TEST_UTIL.getDFSCluster().getFileSystem();
    EVENT_LOOP_GROUP = new NioEventLoopGroup();
    CHANNEL_CLASS = NioSocketChannel.class;
  }

  @AfterClass
  public static void tearDown() throws IOException, InterruptedException {
    if (EVENT_LOOP_GROUP != null) {
      EVENT_LOOP_GROUP.shutdownGracefully().sync();
    }
    TEST_UTIL.shutdownMiniDFSCluster();
  }

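  /**
   * Writes ten 10-byte random chunks, issuing two pipelined flushes per write, verifies the
   * acked lengths, then reads the file back and checks every byte.
   */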
  static void writeAndVerify(FileSystem fs, Path f, AsyncFSOutput out)
      throws IOException, InterruptedException, ExecutionException {
    List<CompletableFuture<Long>> futures = new ArrayList<>();
    byte[] b = new byte[10];
    Random rand = new Random(12345);
    // test pipelined flush: issue two flushes per write without waiting for them
    for (int i = 0; i < 10; i++) {
      rand.nextBytes(b);
      out.write(b);
      futures.add(out.flush(false));
      futures.add(out.flush(false));
    }
    // both flushes of each round should ack the same length
    for (int i = 0; i < 10; i++) {
      assertEquals((i + 1) * b.length, futures.get(2 * i).join().longValue());
      assertEquals((i + 1) * b.length, futures.get(2 * i + 1).join().longValue());
    }
    out.close();
    assertEquals(b.length * 10, fs.getFileStatus(f).getLen());
    byte[] actual = new byte[b.length];
    // reset the seed so the same byte sequence can be regenerated for verification
    rand.setSeed(12345);
    try (FSDataInputStream in = fs.open(f)) {
      for (int i = 0; i < 10; i++) {
        in.readFully(actual);
        rand.nextBytes(b);
        assertArrayEquals(b, actual);
      }
      assertEquals(-1, in.read());
    }
  }

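  // Basic happy path: create the output, then write and verify.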
  @Test
  public void test() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
    writeAndVerify(FS, f, out);
  }

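  // Restart a datanode mid-write to break the pipeline, then verify that recoverAndClose
  // salvages the data that was already acked.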
  @Test
  public void testRecover() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
    byte[] b = new byte[10];
    ThreadLocalRandom.current().nextBytes(b);
    out.write(b, 0, b.length);
    out.flush(false).get();
    // restart one datanode, which breaks one of the connections in the pipeline
    TEST_UTIL.getDFSCluster().restartDataNode(0);
    out.write(b, 0, b.length);
    try {
      out.flush(false).get();
      fail("flush should fail");
    } catch (ExecutionException e) {
      // the flush is expected to fail because we restarted one datanode
      LOG.info("expected exception caught", e);
    }
    out.recoverAndClose(null);
    // only the first, successfully flushed chunk should survive
    assertEquals(b.length, FS.getFileStatus(f).getLen());
    byte[] actual = new byte[b.length];
    try (FSDataInputStream in = FS.open(f)) {
      in.readFully(actual);
    }
    assertArrayEquals(b, actual);
  }

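  // The datanode connections should survive a pause longer than the read timeout, since the
  // output keeps idle connections alive with heartbeats.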
  @Test
  public void testHeartbeat() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
    Thread.sleep(READ_TIMEOUT_MS * 2);
    // the connections to the datanodes should still be alive
    writeAndVerify(FS, f, out);
  }

  /**
   * Creating the output with createParent set to false should fail when the parent directory
   * does not exist. This is important for fencing when recovering from an RS crash.
   */
  @Test
  public void testCreateParentFailed() throws IOException {
    Path f = new Path("/" + name.getMethodName() + "/test");
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    try {
      FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
      fail("should fail because the parent directory does not exist");
    } catch (RemoteException e) {
      LOG.info("expected exception caught", e);
      assertThat(e.unwrapRemoteException(), instanceOf(FileNotFoundException.class));
    }
  }

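  // Creating the output with one datanode down should retry and build a pipeline from the
  // remaining datanodes.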
  @Test
  public void testConnectToDatanodeFailed()
      throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
      InvocationTargetException, InterruptedException, NoSuchFieldException {
    Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
    xceiverServerDaemonField.setAccessible(true);
    Class<?> xceiverServerClass =
      Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
    Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
    numPeersMethod.setAccessible(true);
    // stop one datanode so that connecting to it fails
    DataNodeProperties dnProp = TEST_UTIL.getDFSCluster().stopDataNode(0);
    Path f = new Path("/test");
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
      f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS)) {
      // the dead DN should be excluded on retry, so only 2 DNs remain in the pipeline
      assertEquals(2, output.getPipeline().length);
    } finally {
      TEST_UTIL.getDFSCluster().restartDataNode(dnProp);
    }
  }

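  // Write a single 50 MB chunk into a file with a 1 GB block size and verify it round-trips
  // intact.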
  @Test
  public void testWriteLargeChunk() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
      false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS);
    byte[] b = new byte[50 * 1024 * 1024];
    ThreadLocalRandom.current().nextBytes(b);
    out.write(b);
    // pipeline two flushes; the second one should see the whole chunk acked
    out.flush(false);
    assertEquals(b.length, out.flush(false).get().longValue());
    out.close();
    assertEquals(b.length, FS.getFileStatus(f).getLen());
    byte[] actual = new byte[b.length];
    try (FSDataInputStream in = FS.open(f)) {
      in.readFully(actual);
    }
    assertArrayEquals(b, actual);
  }
}