/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;

@Category({ MiscTests.class, MediumTests.class })
public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestFanOutOneBlockAsyncDFSOutput.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestFanOutOneBlockAsyncDFSOutput.class);

  private static DistributedFileSystem FS;

  private static EventLoopGroup EVENT_LOOP_GROUP;

  private static Class<? extends Channel> CHANNEL_CLASS;

  private static int READ_TIMEOUT_MS = 2000;

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void setUp() throws Exception {
    UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
    startMiniDFSCluster(3);
    FS = CLUSTER.getFileSystem();
    EVENT_LOOP_GROUP = new NioEventLoopGroup();
    CHANNEL_CLASS = NioSocketChannel.class;
  }

  @AfterClass
  public static void tearDown() throws IOException, InterruptedException {
    if (EVENT_LOOP_GROUP != null) {
      EVENT_LOOP_GROUP.shutdownGracefully().sync();
    }
    shutdownMiniDFSCluster();
  }

  static void writeAndVerify(FileSystem fs, Path f, AsyncFSOutput out)
    throws IOException, InterruptedException, ExecutionException {
    List<CompletableFuture<Long>> futures = new ArrayList<>();
    byte[] b = new byte[10];
    Random rand = new Random(12345);
    // test pipelined flush: issue two flushes per write without waiting for either to complete
    for (int i = 0; i < 10; i++) {
      rand.nextBytes(b);
      out.write(b);
      futures.add(out.flush(false));
      futures.add(out.flush(false));
    }
    // each flush future completes with the number of bytes acked so far
    for (int i = 0; i < 10; i++) {
      assertEquals((i + 1) * b.length, futures.get(2 * i).join().longValue());
      assertEquals((i + 1) * b.length, futures.get(2 * i + 1).join().longValue());
    }
    out.close();
    assertEquals(b.length * 10, fs.getFileStatus(f).getLen());
    byte[] actual = new byte[b.length];
    // reset the seed so the generator replays the same byte sequence for verification
    rand.setSeed(12345);
    try (FSDataInputStream in = fs.open(f)) {
      for (int i = 0; i < 10; i++) {
        in.readFully(actual);
        rand.nextBytes(b);
        assertArrayEquals(b, actual);
      }
      assertEquals(-1, in.read());
    }
  }

  @Test
  public void test() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
    writeAndVerify(FS, f, out);
  }

  @Test
  public void testRecover() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
    byte[] b = new byte[10];
    ThreadLocalRandom.current().nextBytes(b);
    out.write(b, 0, b.length);
    out.flush(false).get();
    // restart one datanode, which breaks one of the connections in the pipeline
    CLUSTER.restartDataNode(0);
    out.write(b, 0, b.length);
    try {
      out.flush(false).get();
      fail("flush should fail");
    } catch (ExecutionException e) {
      // we restarted one datanode, so the flush should fail
      LOG.info("expected exception caught", e);
    }
    out.recoverAndClose(null);
    // only the first, successfully flushed chunk should survive the recovery
    assertEquals(b.length, FS.getFileStatus(f).getLen());
    byte[] actual = new byte[b.length];
    try (FSDataInputStream in = FS.open(f)) {
      in.readFully(actual);
    }
    assertArrayEquals(b, actual);
  }

  @Test
  public void testHeartbeat() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
    // sleep past the datanode read timeout; heartbeats should keep the connections alive
    Thread.sleep(READ_TIMEOUT_MS * 2);
    // the connections to the datanodes should still be alive
    writeAndVerify(FS, f, out);
  }

  /**
   * Creating the output should fail when the parent directory does not exist. This is important
   * for fencing when recovering from an RS crash.
   */
  @Test
  public void testCreateParentFailed() throws IOException {
    Path f = new Path("/" + name.getMethodName() + "/test");
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    try {
      FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
      fail("should fail since the parent directory does not exist");
    } catch (RemoteException e) {
      LOG.info("expected exception caught", e);
      assertThat(e.unwrapRemoteException(), instanceOf(FileNotFoundException.class));
    }
  }

  @Test
  public void testConnectToDatanodeFailed()
    throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
    InvocationTargetException, InterruptedException, NoSuchFieldException {
    Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
    xceiverServerDaemonField.setAccessible(true);
    Class<?> xceiverServerClass =
      Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
    Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
    numPeersMethod.setAccessible(true);
    // stop one datanode so that connecting to it fails
    DataNodeProperties dnProp = CLUSTER.stopDataNode(0);
    Path f = new Path("/test");
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
      f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS)) {
      // the dead dn should be excluded on retry, so we only have 2 DNs in the pipeline
      assertEquals(2, output.getPipeline().length);
    } finally {
      CLUSTER.restartDataNode(dnProp);
    }
  }

  @Test
  public void testWriteLargeChunk() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
      false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS);
    byte[] b = new byte[50 * 1024 * 1024];
    ThreadLocalRandom.current().nextBytes(b);
    out.write(b);
    // pipelined flushes; the second future completes with the total acked length
    out.flush(false);
    assertEquals(b.length, out.flush(false).get().longValue());
    out.close();
    assertEquals(b.length, FS.getFileStatus(f).getLen());
    byte[] actual = new byte[b.length];
    try (FSDataInputStream in = FS.open(f)) {
      in.readFully(actual);
    }
    assertArrayEquals(b, actual);
  }
}