001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.asyncfs;
019
020import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
021import static org.hamcrest.CoreMatchers.instanceOf;
022import static org.hamcrest.MatcherAssert.assertThat;
023import static org.junit.Assert.assertArrayEquals;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.fail;
026
027import java.io.FileNotFoundException;
028import java.io.IOException;
029import java.lang.reflect.Field;
030import java.lang.reflect.InvocationTargetException;
031import java.lang.reflect.Method;
032import java.util.ArrayList;
033import java.util.List;
034import java.util.Random;
035import java.util.concurrent.CompletableFuture;
036import java.util.concurrent.ExecutionException;
037import java.util.concurrent.ThreadLocalRandom;
038import org.apache.hadoop.fs.FSDataInputStream;
039import org.apache.hadoop.fs.FileSystem;
040import org.apache.hadoop.fs.Path;
041import org.apache.hadoop.hbase.HBaseClassTestRule;
042import org.apache.hadoop.hbase.testclassification.MediumTests;
043import org.apache.hadoop.hbase.testclassification.MiscTests;
044import org.apache.hadoop.hdfs.DistributedFileSystem;
045import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
046import org.apache.hadoop.hdfs.server.datanode.DataNode;
047import org.apache.hadoop.ipc.RemoteException;
048import org.junit.AfterClass;
049import org.junit.BeforeClass;
050import org.junit.ClassRule;
051import org.junit.Rule;
052import org.junit.Test;
053import org.junit.experimental.categories.Category;
054import org.junit.rules.TestName;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058import org.apache.hbase.thirdparty.io.netty.channel.Channel;
059import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
060import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
061import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
062import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
063
064@Category({ MiscTests.class, MediumTests.class })
065public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
066
067  @ClassRule
068  public static final HBaseClassTestRule CLASS_RULE =
069    HBaseClassTestRule.forClass(TestFanOutOneBlockAsyncDFSOutput.class);
070
071  private static final Logger LOG = LoggerFactory.getLogger(TestFanOutOneBlockAsyncDFSOutput.class);
072
073  private static DistributedFileSystem FS;
074
075  private static EventLoopGroup EVENT_LOOP_GROUP;
076
077  private static Class<? extends Channel> CHANNEL_CLASS;
078
079  private static int READ_TIMEOUT_MS = 2000;
080
081  @Rule
082  public TestName name = new TestName();
083
084  @BeforeClass
085  public static void setUp() throws Exception {
086    UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
087    startMiniDFSCluster(3);
088    FS = CLUSTER.getFileSystem();
089    EVENT_LOOP_GROUP = new NioEventLoopGroup();
090    CHANNEL_CLASS = NioSocketChannel.class;
091  }
092
093  @AfterClass
094  public static void tearDown() throws IOException, InterruptedException {
095    if (EVENT_LOOP_GROUP != null) {
096      EVENT_LOOP_GROUP.shutdownGracefully().sync();
097    }
098    shutdownMiniDFSCluster();
099  }
100
101  static void writeAndVerify(FileSystem fs, Path f, AsyncFSOutput out)
102    throws IOException, InterruptedException, ExecutionException {
103    List<CompletableFuture<Long>> futures = new ArrayList<>();
104    byte[] b = new byte[10];
105    Random rand = new Random(12345);
106    // test pipelined flush
107    for (int i = 0; i < 10; i++) {
108      rand.nextBytes(b);
109      out.write(b);
110      futures.add(out.flush(false));
111      futures.add(out.flush(false));
112    }
113    for (int i = 0; i < 10; i++) {
114      assertEquals((i + 1) * b.length, futures.get(2 * i).join().longValue());
115      assertEquals((i + 1) * b.length, futures.get(2 * i + 1).join().longValue());
116    }
117    out.close();
118    assertEquals(b.length * 10, fs.getFileStatus(f).getLen());
119    byte[] actual = new byte[b.length];
120    rand.setSeed(12345);
121    try (FSDataInputStream in = fs.open(f)) {
122      for (int i = 0; i < 10; i++) {
123        in.readFully(actual);
124        rand.nextBytes(b);
125        assertArrayEquals(b, actual);
126      }
127      assertEquals(-1, in.read());
128    }
129  }
130
131  @Test
132  public void test() throws IOException, InterruptedException, ExecutionException {
133    Path f = new Path("/" + name.getMethodName());
134    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
135    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
136      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
137    writeAndVerify(FS, f, out);
138  }
139
140  @Test
141  public void testRecover() throws IOException, InterruptedException, ExecutionException {
142    Path f = new Path("/" + name.getMethodName());
143    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
144    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
145      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
146    byte[] b = new byte[10];
147    ThreadLocalRandom.current().nextBytes(b);
148    out.write(b, 0, b.length);
149    out.flush(false).get();
150    // restart one datanode which causes one connection broken
151    CLUSTER.restartDataNode(0);
152    out.write(b, 0, b.length);
153    try {
154      out.flush(false).get();
155      fail("flush should fail");
156    } catch (ExecutionException e) {
157      // we restarted one datanode so the flush should fail
158      LOG.info("expected exception caught", e);
159    }
160    out.recoverAndClose(null);
161    assertEquals(b.length, FS.getFileStatus(f).getLen());
162    byte[] actual = new byte[b.length];
163    try (FSDataInputStream in = FS.open(f)) {
164      in.readFully(actual);
165    }
166    assertArrayEquals(b, actual);
167  }
168
169  @Test
170  public void testHeartbeat() throws IOException, InterruptedException, ExecutionException {
171    Path f = new Path("/" + name.getMethodName());
172    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
173    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
174      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
175    Thread.sleep(READ_TIMEOUT_MS * 2);
176    // the connection to datanode should still alive.
177    writeAndVerify(FS, f, out);
178  }
179
180  /**
181   * This is important for fencing when recover from RS crash.
182   */
183  @Test
184  public void testCreateParentFailed() throws IOException {
185    Path f = new Path("/" + name.getMethodName() + "/test");
186    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
187    try {
188      FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
189        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
190      fail("should fail with parent does not exist");
191    } catch (RemoteException e) {
192      LOG.info("expected exception caught", e);
193      assertThat(e.unwrapRemoteException(), instanceOf(FileNotFoundException.class));
194    }
195  }
196
197  @Test
198  public void testConnectToDatanodeFailed()
199    throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
200    InvocationTargetException, InterruptedException, NoSuchFieldException {
201    Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
202    xceiverServerDaemonField.setAccessible(true);
203    Class<?> xceiverServerClass =
204      Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
205    Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
206    numPeersMethod.setAccessible(true);
207    // make one datanode broken
208    DataNodeProperties dnProp = CLUSTER.stopDataNode(0);
209    Path f = new Path("/test");
210    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
211    try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
212      f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS)) {
213      // should exclude the dead dn when retry so here we only have 2 DNs in pipeline
214      assertEquals(2, output.getPipeline().length);
215    } finally {
216      CLUSTER.restartDataNode(dnProp);
217    }
218  }
219
220  @Test
221  public void testWriteLargeChunk() throws IOException, InterruptedException, ExecutionException {
222    Path f = new Path("/" + name.getMethodName());
223    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
224    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
225      false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS);
226    byte[] b = new byte[50 * 1024 * 1024];
227    ThreadLocalRandom.current().nextBytes(b);
228    out.write(b);
229    out.flush(false);
230    assertEquals(b.length, out.flush(false).get().longValue());
231    out.close();
232    assertEquals(b.length, FS.getFileStatus(f).getLen());
233    byte[] actual = new byte[b.length];
234    try (FSDataInputStream in = FS.open(f)) {
235      in.readFully(actual);
236    }
237    assertArrayEquals(b, actual);
238  }
239}