/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Tests for {@link FanOutOneBlockAsyncDFSOutput}, run against a mini DFS cluster that is started
 * with three datanodes and shared by all test methods of this class.
 */
@Category({ MiscTests.class, MediumTests.class })
public class TestFanOutOneBlockAsyncDFSOutput {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestFanOutOneBlockAsyncDFSOutput.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestFanOutOneBlockAsyncDFSOutput.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private static DistributedFileSystem FS;

  private static EventLoopGroup EVENT_LOOP_GROUP;

  private static Class<? extends Channel> CHANNEL_CLASS;

  /** Value, in milliseconds, installed as the DFS client socket timeout for the mini cluster. */
  private static final int READ_TIMEOUT_MS = 2000;

  @Rule
  public TestName name = new TestName();

  /**
   * Sets the DFS client socket timeout to {@link #READ_TIMEOUT_MS}, starts a mini DFS cluster with
   * three datanodes and creates the NIO event loop group used by every test.
   */
  @BeforeClass
  public static void setUp() throws Exception {
    TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
    TEST_UTIL.startMiniDFSCluster(3);
    FS = TEST_UTIL.getDFSCluster().getFileSystem();
    EVENT_LOOP_GROUP = new NioEventLoopGroup();
    CHANNEL_CLASS = NioSocketChannel.class;
  }

  /**
   * Shuts down the event loop group (if it was created) and then the mini DFS cluster.
   */
  @AfterClass
  public static void tearDown() throws IOException, InterruptedException {
    try {
      if (EVENT_LOOP_GROUP != null) {
        EVENT_LOOP_GROUP.shutdownGracefully().sync();
      }
    } finally {
      // Always stop the mini cluster, even if stopping the event loop group failed or was
      // interrupted; otherwise the cluster would be left running.
      TEST_UTIL.shutdownMiniDFSCluster();
    }
  }

  /**
   * Writes ten 10-byte chunks of seeded pseudo-random data to {@code out}, issuing two flushes
   * after every write, checks the length reported by every flush, closes {@code out} and finally
   * reads the file back to verify both its length and its content.
   * @param fs the file system used to stat and re-read the file
   * @param f the path {@code out} is writing to
   * @param out the output under test; it is closed by this method
   */
  static void writeAndVerify(FileSystem fs, Path f, AsyncFSOutput out)
      throws IOException, InterruptedException, ExecutionException {
    List<CompletableFuture<Long>> futures = new ArrayList<>();
    byte[] b = new byte[10];
    Random rand = new Random(12345);
    // test pipelined flush
    for (int i = 0; i < 10; i++) {
      rand.nextBytes(b);
      out.write(b);
      futures.add(out.flush(false));
      futures.add(out.flush(false));
    }
    // both flushes issued after the i-th write must report the length written so far
    for (int i = 0; i < 10; i++) {
      assertEquals((i + 1) * b.length, futures.get(2 * i).join().longValue());
      assertEquals((i + 1) * b.length, futures.get(2 * i + 1).join().longValue());
    }
    out.close();
    assertEquals(b.length * 10, fs.getFileStatus(f).getLen());
    byte[] actual = new byte[b.length];
    // re-seed so that the generator replays exactly the chunks that were written above
    rand.setSeed(12345);
    try (FSDataInputStream in = fs.open(f)) {
      for (int i = 0; i < 10; i++) {
        in.readFully(actual);
        rand.nextBytes(b);
        assertArrayEquals(b, actual);
      }
      // nothing may follow the ten chunks
      assertEquals(-1, in.read());
    }
  }

  /**
   * Creates an output with replication 3 and runs the basic write/flush/read-back verification.
   */
  @Test
  public void test() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
    writeAndVerify(FS, f, out);
  }

  /**
   * Restarts a datanode in the middle of writing, expects the following flush to fail, and then
   * checks that after {@code recoverAndClose} the file contains exactly the data that had been
   * flushed successfully before the restart.
   */
  @Test
  public void testRecover() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
    byte[] b = new byte[10];
    ThreadLocalRandom.current().nextBytes(b);
    out.write(b, 0, b.length);
    out.flush(false).get();
    // restart one datanode which causes one connection broken
    TEST_UTIL.getDFSCluster().restartDataNode(0);
    out.write(b, 0, b.length);
    try {
      out.flush(false).get();
      fail("flush should fail");
    } catch (ExecutionException e) {
      // we restarted one datanode so the flush should fail
      LOG.info("expected exception caught", e);
    }
    out.recoverAndClose(null);
    // only the first, successfully flushed chunk is expected in the file
    assertEquals(b.length, FS.getFileStatus(f).getLen());
    byte[] actual = new byte[b.length];
    try (FSDataInputStream in = FS.open(f)) {
      in.readFully(actual);
    }
    assertArrayEquals(b, actual);
  }

  /**
   * Leaves a freshly created output idle for twice {@link #READ_TIMEOUT_MS} and then verifies that
   * it can still be written to.
   */
  @Test
  public void testHeartbeat() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
    Thread.sleep(READ_TIMEOUT_MS * 2);
    // the connection to the datanodes should still be alive.
    writeAndVerify(FS, f, out);
  }

  /**
   * Creating an output below a parent directory that does not exist must fail with a
   * {@link FileNotFoundException} wrapped in a {@link RemoteException}.
   * <p>
   * This is important for fencing when recovering from an RS crash.
   */
  @Test
  public void testCreateParentFailed() throws IOException {
    Path f = new Path("/" + name.getMethodName() + "/test");
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    try {
      FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
      fail("should fail with parent does not exist");
    } catch (RemoteException e) {
      LOG.info("expected exception caught", e);
      assertThat(e.unwrapRemoteException(), instanceOf(FileNotFoundException.class));
    }
  }

  /**
   * Stops one datanode, creates an output with replication 3 and checks that the resulting
   * pipeline consists of two datanodes only. The stopped datanode is restarted afterwards
   * regardless of the outcome.
   */
  @Test
  public void testConnectToDatanodeFailed()
      throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
      InvocationTargetException, InterruptedException, NoSuchFieldException {
    // make one datanode broken
    DataNodeProperties dnProp = TEST_UTIL.getDFSCluster().stopDataNode(0);
    // derive the path from the method name, like the other tests do, so that this test does not
    // share the file "/test" with the test() method
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
      f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS)) {
      // should exclude the dead dn when retry so here we only have 2 DNs in pipeline
      assertEquals(2, output.getPipeline().length);
    } finally {
      TEST_UTIL.getDFSCluster().restartDataNode(dnProp);
    }
  }

  /**
   * Writes a single 50 MB chunk to an output created with a 1 GB block size, flushes twice and
   * verifies the flushed length, the final file length and the file content.
   */
  @Test
  public void testWriteLargeChunk() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
      false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS);
    byte[] b = new byte[50 * 1024 * 1024];
    ThreadLocalRandom.current().nextBytes(b);
    out.write(b);
    // first flush is issued without waiting for it; only the second one is awaited below
    out.flush(false);
    assertEquals(b.length, out.flush(false).get().longValue());
    out.close();
    assertEquals(b.length, FS.getFileStatus(f).getLen());
    byte[] actual = new byte[b.length];
    try (FSDataInputStream in = FS.open(f)) {
      in.readFully(actual);
    }
    assertArrayEquals(b, actual);
  }
}