/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.util.FutureUtils.consume;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;

@Tag(MiscTests.TAG)
@Tag(MediumTests.TAG)
public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {

  private static final Logger LOG =
    LoggerFactory.getLogger(TestFanOutOneBlockAsyncDFSOutput.class);
  private static DistributedFileSystem FS;
  private static EventLoopGroup EVENT_LOOP_GROUP;
  private static Class<? extends Channel> CHANNEL_CLASS;
  private static int READ_TIMEOUT_MS = 2000;

  private static StreamSlowMonitor MONITOR;

  private Path file;

  @BeforeAll
  public static void setUp() throws Exception {
    UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
    startMiniDFSCluster(3);
    FS = CLUSTER.getFileSystem();
    EVENT_LOOP_GROUP = new NioEventLoopGroup();
    CHANNEL_CLASS = NioSocketChannel.class;
    MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor");
  }

  @AfterAll
  public static void tearDown() throws Exception {
    if (EVENT_LOOP_GROUP != null) {
      EVENT_LOOP_GROUP.shutdownGracefully().get();
    }
    shutdownMiniDFSCluster();
  }

  private static final Random RNG = new Random(); // This test depends on Random#setSeed

  // Write ten 10-byte chunks with pipelined flushes, then read the file back and verify that its
  // content matches what was written.
  static void writeAndVerify(FileSystem fs, Path f, AsyncFSOutput out)
    throws IOException, InterruptedException, ExecutionException {
    List<CompletableFuture<Long>> futures = new ArrayList<>();
    byte[] b = new byte[10];
    // test pipelined flush
    RNG.setSeed(12345);
    for (int i = 0; i < 10; i++) {
      RNG.nextBytes(b);
      out.write(b);
      futures.add(out.flush(false));
      futures.add(out.flush(false));
    }
    for (int i = 0; i < 10; i++) {
      assertEquals((i + 1) * b.length, futures.get(2 * i).join().longValue());
      assertEquals((i + 1) * b.length, futures.get(2 * i + 1).join().longValue());
    }
    out.close();
    assertEquals(b.length * 10, fs.getFileStatus(f).getLen());
    byte[] actual = new byte[b.length];
    RNG.setSeed(12345);
    try (FSDataInputStream in = fs.open(f)) {
      for (int i = 0; i < 10; i++) {
        in.readFully(actual);
        RNG.nextBytes(b);
        assertArrayEquals(b, actual);
      }
      assertEquals(-1, in.read());
    }
  }

  @BeforeEach
  public void setUpBeforeEach(TestInfo testInfo) {
    file = new Path("/" + testInfo.getTestMethod().get().getName());
  }

  @Test
  public void test() throws IOException, InterruptedException, ExecutionException {
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
    writeAndVerify(FS, file, out);
  }

  /**
   * The test method has been renamed so that it sorts first in NAME_ASCENDING order. It's an ugly
   * hack to avoid flakiness.
   */
  @Test
  public void test0Recover() throws IOException, InterruptedException, ExecutionException {
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
    byte[] b = new byte[10];
    Bytes.random(b);
    out.write(b, 0, b.length);
    out.flush(false).get();
    // restart one datanode, which breaks one of the pipeline connections
    CLUSTER.restartDataNode(0);
    out.write(b, 0, b.length);
    ExecutionException e = assertThrows(ExecutionException.class, () -> out.flush(false).get());
    LOG.info("expected exception caught", e);
    out.recoverAndClose(null);
    assertEquals(b.length, FS.getFileStatus(file).getLen());
    byte[] actual = new byte[b.length];
    try (FSDataInputStream in = FS.open(file)) {
      in.readFully(actual);
    }
    assertArrayEquals(b, actual);
  }

  @Test
  public void testHeartbeat() throws IOException, InterruptedException, ExecutionException {
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
    Thread.sleep(READ_TIMEOUT_MS * 2);
    // the connections to the datanodes should still be alive
    writeAndVerify(FS, file, out);
  }

  /**
   * This is important for fencing when recovering from an RS crash.
   */
  @Test
  public void testCreateParentFailed() throws IOException {
    Path f = new Path(file, "test");
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    RemoteException e = assertThrows(RemoteException.class,
      () -> FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true));
    LOG.info("expected exception caught", e);
    assertThat(e.unwrapRemoteException(), instanceOf(FileNotFoundException.class));
  }

  @Test
  public void testConnectToDatanodeFailed()
    throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
    InvocationTargetException, InterruptedException, NoSuchFieldException {
    Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
    xceiverServerDaemonField.setAccessible(true);
    Class<?> xceiverServerClass =
      Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
    Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
    numPeersMethod.setAccessible(true);
    // stop one datanode so that connecting to it fails
    DataNodeProperties dnProp = CLUSTER.stopDataNode(0);
    Path f = new Path("/test");
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    try (FanOutOneBlockAsyncDFSOutput output =
      FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true)) {
      // the dead DN should be excluded on retry, so only 2 DNs remain in the pipeline
      assertEquals(2, output.getPipeline().length);
    } finally {
      CLUSTER.restartDataNode(dnProp);
      CLUSTER.triggerBlockReports();
    }
  }

  @Test
  public void testExcludeFailedConnectToDatanode()
    throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
    InvocationTargetException, InterruptedException, NoSuchFieldException {
    Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
    xceiverServerDaemonField.setAccessible(true);
    Class<?> xceiverServerClass =
      Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
    Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
    numPeersMethod.setAccessible(true);
    // stop one datanode so that connecting to it fails
    DataNodeProperties dnProp = CLUSTER.stopDataNode(0);
    Path f = new Path("/test");
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    ExcludeDatanodeManager excludeDatanodeManager =
      new ExcludeDatanodeManager(HBaseConfiguration.create());
    StreamSlowMonitor streamSlowDNsMonitor =
      excludeDatanodeManager.getStreamSlowMonitor("testMonitor");
    assertEquals(0, excludeDatanodeManager.getExcludeDNs().size());
    try (FanOutOneBlockAsyncDFSOutput output =
      FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, streamSlowDNsMonitor, true)) {
      // the dead DN should be excluded on retry, so only 2 DNs remain in the pipeline and the
      // exclude manager should have recorded the failed DN
      assertEquals(2, output.getPipeline().length);
      assertEquals(1, excludeDatanodeManager.getExcludeDNs().size());
    } finally {
      CLUSTER.restartDataNode(dnProp);
      CLUSTER.triggerBlockReports();
    }
  }

  @Test
  public void testWriteLargeChunk() throws IOException, InterruptedException, ExecutionException {
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
      true, false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR, true);
    byte[] b = new byte[50 * 1024 * 1024];
    Bytes.random(b);
    out.write(b);
    consume(out.flush(false));
    assertEquals(b.length, out.flush(false).get().longValue());
    out.close();
    assertEquals(b.length, FS.getFileStatus(file).getLen());
    byte[] actual = new byte[b.length];
    try (FSDataInputStream in = FS.open(file)) {
      in.readFully(actual);
    }
    assertArrayEquals(b, actual);
  }
}