/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.util.FutureUtils.consume;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;

@Category({ MiscTests.class, MediumTests.class })
public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestFanOutOneBlockAsyncDFSOutput.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestFanOutOneBlockAsyncDFSOutput.class);
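  // Shared fixtures, initialized once in setUp(): the mini DFS cluster's filesystem, the Netty
  // event loop group and channel class used to drive the fan-out output, and the stream slow
  // monitor passed to createOutput.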
  private static DistributedFileSystem FS;

  private static EventLoopGroup EVENT_LOOP_GROUP;

  private static Class<? extends Channel> CHANNEL_CLASS;

  private static final int READ_TIMEOUT_MS = 2000;

  private static StreamSlowMonitor MONITOR;

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void setUp() throws Exception {
    UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
    startMiniDFSCluster(3);
    FS = CLUSTER.getFileSystem();
    EVENT_LOOP_GROUP = new NioEventLoopGroup();
    CHANNEL_CLASS = NioSocketChannel.class;
    MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor");
  }

  @AfterClass
  public static void tearDown() throws Exception {
    if (EVENT_LOOP_GROUP != null) {
      EVENT_LOOP_GROUP.shutdownGracefully().get();
    }
    shutdownMiniDFSCluster();
  }

  private static final Random RNG = new Random(); // This test depends on Random#setSeed

  static void writeAndVerify(FileSystem fs, Path f, AsyncFSOutput out)
    throws IOException, InterruptedException, ExecutionException {
    List<CompletableFuture<Long>> futures = new ArrayList<>();
    byte[] b = new byte[10];
    // test pipelined flush
    RNG.setSeed(12345);
    for (int i = 0; i < 10; i++) {
      RNG.nextBytes(b);
      out.write(b);
      futures.add(out.flush(false));
      futures.add(out.flush(false));
    }
    for (int i = 0; i < 10; i++) {
      assertEquals((i + 1) * b.length, futures.get(2 * i).join().longValue());
      assertEquals((i + 1) * b.length, futures.get(2 * i + 1).join().longValue());
    }
    out.close();
    assertEquals(b.length * 10, fs.getFileStatus(f).getLen());
    byte[] actual = new byte[b.length];
    RNG.setSeed(12345);
    try (FSDataInputStream in = fs.open(f)) {
      for (int i = 0; i < 10; i++) {
        in.readFully(actual);
        RNG.nextBytes(b);
        assertArrayEquals(b, actual);
      }
      assertEquals(-1, in.read());
    }
  }

  @Test
  public void test() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
    writeAndVerify(FS, f, out);
  }

  @Test
  public void testRecover() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
    byte[] b = new byte[10];
    Bytes.random(b);
    out.write(b, 0, b.length);
    out.flush(false).get();
    // restart one datanode, which breaks one of the connections in the pipeline
    CLUSTER.restartDataNode(0);
    out.write(b, 0, b.length);
    try {
      out.flush(false).get();
      fail("flush should fail");
    } catch (ExecutionException e) {
      // we restarted one datanode so the flush should fail
      LOG.info("expected exception caught", e);
    }
    out.recoverAndClose(null);
    assertEquals(b.length, FS.getFileStatus(f).getLen());
    byte[] actual = new byte[b.length];
    try (FSDataInputStream in = FS.open(f)) {
      in.readFully(actual);
    }
    assertArrayEquals(b, actual);
  }
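  /**
   * Sleep for longer than the configured DFS client read timeout before writing again; the output
   * should keep the datanode connections alive with heartbeats, so the write still succeeds.
   */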
  @Test
  public void testHeartbeat() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
    Thread.sleep(READ_TIMEOUT_MS * 2);
    // the connections to the datanodes should still be alive
    writeAndVerify(FS, f, out);
  }

  /**
   * Creating the output with createParent set to false must fail when the parent directory does
   * not exist. This is important for fencing when recovering from an RS crash.
   */
  @Test
  public void testCreateParentFailed() throws IOException {
    Path f = new Path("/" + name.getMethodName() + "/test");
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    try {
      FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
      fail("should fail since the parent directory does not exist");
    } catch (RemoteException e) {
      LOG.info("expected exception caught", e);
      assertThat(e.unwrapRemoteException(), instanceOf(FileNotFoundException.class));
    }
  }

  @Test
  public void testConnectToDatanodeFailed()
    throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
    InvocationTargetException, InterruptedException, NoSuchFieldException {
    Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
    xceiverServerDaemonField.setAccessible(true);
    Class<?> xceiverServerClass =
      Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
    Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
    numPeersMethod.setAccessible(true);
    // make one datanode broken
    DataNodeProperties dnProp = CLUSTER.stopDataNode(0);
    Path f = new Path("/test");
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
      f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR)) {
      // the dead datanode should be excluded on retry, so only 2 DNs remain in the pipeline
      assertEquals(2, output.getPipeline().length);
    } finally {
      CLUSTER.restartDataNode(dnProp);
      CLUSTER.triggerBlockReports();
    }
  }
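  /**
   * Same failure injection as testConnectToDatanodeFailed, but with a dedicated
   * ExcludeDatanodeManager, so we can also assert that the unreachable datanode ends up in the
   * exclude cache.
   */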
  @Test
  public void testExcludeFailedConnectToDatanode()
    throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
    InvocationTargetException, InterruptedException, NoSuchFieldException {
    Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
    xceiverServerDaemonField.setAccessible(true);
    Class<?> xceiverServerClass =
      Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
    Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
    numPeersMethod.setAccessible(true);
    // make one datanode broken
    DataNodeProperties dnProp = CLUSTER.stopDataNode(0);
    Path f = new Path("/test");
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    ExcludeDatanodeManager excludeDatanodeManager =
      new ExcludeDatanodeManager(HBaseConfiguration.create());
    StreamSlowMonitor streamSlowDNsMonitor =
      excludeDatanodeManager.getStreamSlowMonitor("testMonitor");
    assertEquals(0, excludeDatanodeManager.getExcludeDNs().size());
    try (FanOutOneBlockAsyncDFSOutput output =
      FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, streamSlowDNsMonitor)) {
      // the dead datanode should be excluded on retry, so only 2 DNs remain in the pipeline
      assertEquals(2, output.getPipeline().length);
      assertEquals(1, excludeDatanodeManager.getExcludeDNs().size());
    } finally {
      CLUSTER.restartDataNode(dnProp);
      CLUSTER.triggerBlockReports();
    }
  }

  @Test
  public void testWriteLargeChunk() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
      false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR);
    byte[] b = new byte[50 * 1024 * 1024];
    Bytes.random(b);
    out.write(b);
    consume(out.flush(false));
    assertEquals(b.length, out.flush(false).get().longValue());
    out.close();
    assertEquals(b.length, FS.getFileStatus(f).getLen());
    byte[] actual = new byte[b.length];
    try (FSDataInputStream in = FS.open(f)) {
      in.readFully(actual);
    }
    assertArrayEquals(b, actual);
  }
}