/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
044import org.slf4j.LoggerFactory; 045 046import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 047import org.apache.hbase.thirdparty.io.netty.channel.Channel; 048import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 049import org.apache.hbase.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter; 050import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; 051import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 052import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; 053import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; 054 055/** 056 * Testcase for HBASE-26679, here we introduce a separate test class and not put the testcase in 057 * {@link TestFanOutOneBlockAsyncDFSOutput} because we will send heartbeat to DN when there is no 058 * out going packet, the timeout is controlled by 059 * {@link TestFanOutOneBlockAsyncDFSOutput#READ_TIMEOUT_MS},which is 2 seconds, it will keep sending 060 * package out and DN will respond immedately and then mess up the testing handler added by us. So 061 * in this test class we use the default value for timeout which is 60 seconds and it is enough for 062 * this test. 063 */ 064@Tag(MiscTests.TAG) 065@Tag(MediumTests.TAG) 066public class TestFanOutOneBlockAsyncDFSOutputHang extends AsyncFSTestBase { 067 068 private static final Logger LOG = 069 LoggerFactory.getLogger(TestFanOutOneBlockAsyncDFSOutputHang.class); 070 071 private static DistributedFileSystem FS; 072 073 private static EventLoopGroup EVENT_LOOP_GROUP; 074 075 private static Class<? 
extends Channel> CHANNEL_CLASS; 076 077 private static StreamSlowMonitor MONITOR; 078 079 private static FanOutOneBlockAsyncDFSOutput OUT; 080 081 @BeforeAll 082 public static void setUp() throws Exception { 083 startMiniDFSCluster(2); 084 FS = CLUSTER.getFileSystem(); 085 EVENT_LOOP_GROUP = new NioEventLoopGroup(); 086 CHANNEL_CLASS = NioSocketChannel.class; 087 MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor"); 088 Path f = new Path("/testHang"); 089 EventLoop eventLoop = EVENT_LOOP_GROUP.next(); 090 OUT = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 2, 091 FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true); 092 } 093 094 @AfterAll 095 public static void tearDown() throws Exception { 096 if (OUT != null) { 097 OUT.recoverAndClose(null); 098 } 099 if (EVENT_LOOP_GROUP != null) { 100 EVENT_LOOP_GROUP.shutdownGracefully().get(); 101 } 102 shutdownMiniDFSCluster(); 103 } 104 105 /** 106 * <pre> 107 * This test is for HBASE-26679. Consider there are two dataNodes: dn1 and dn2,dn2 is a slow DN. 108 * The threads sequence before HBASE-26679 is: 109 * 1.We write some data to {@link FanOutOneBlockAsyncDFSOutput} and then flush it, there are one 110 * {@link FanOutOneBlockAsyncDFSOutput.Callback} in 111 * {@link FanOutOneBlockAsyncDFSOutput#waitingAckQueue}. 112 * 2.The ack from dn1 arrives firstly and triggers Netty to invoke 113 * {@link FanOutOneBlockAsyncDFSOutput#completed} with dn1's channel, then in 114 * {@link FanOutOneBlockAsyncDFSOutput#completed}, dn1's channel is removed from 115 * {@link FanOutOneBlockAsyncDFSOutput.Callback#unfinishedReplicas}. 
116 * 3.But dn2 responds slowly, before dn2 sending ack,dn1 is shut down or have a exception, 117 * so {@link FanOutOneBlockAsyncDFSOutput#failed} is triggered by Netty with dn1's channel, 118 * and because the {@link FanOutOneBlockAsyncDFSOutput.Callback#unfinishedReplicas} does not 119 * contain dn1's channel,the {@link FanOutOneBlockAsyncDFSOutput.Callback} is skipped in 120 * {@link FanOutOneBlockAsyncDFSOutput#failed} method,and 121 * {@link FanOutOneBlockAsyncDFSOutput#state} is set to 122 * {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN},and dn1,dn2 are all closed at the end of 123 * {@link FanOutOneBlockAsyncDFSOutput#failed}. 124 * 4.{@link FanOutOneBlockAsyncDFSOutput#failed} is triggered again by dn2 because it is closed, 125 * but because {@link FanOutOneBlockAsyncDFSOutput#state} is already 126 * {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN},the whole 127 * {@link FanOutOneBlockAsyncDFSOutput#failed} is skipped. So wait on the future 128 * returned by {@link FanOutOneBlockAsyncDFSOutput#flush} would be stuck for ever. 129 * After HBASE-26679, for above step 4,even if the {@link FanOutOneBlockAsyncDFSOutput#state} 130 * is already {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN}, we would still try to trigger 131 * {@link FanOutOneBlockAsyncDFSOutput.Callback#future}. 
132 * </pre> 133 */ 134 @Test 135 public void testFlushHangWhenOneDataNodeFailedBeforeOtherDataNodeAck() throws Exception { 136 137 DataNodeProperties firstDataNodeProperties = null; 138 try { 139 140 final CyclicBarrier dn1AckReceivedCyclicBarrier = new CyclicBarrier(2); 141 Map<Channel, DatanodeInfo> datanodeInfoMap = OUT.getDatanodeInfoMap(); 142 Iterator<Map.Entry<Channel, DatanodeInfo>> iterator = datanodeInfoMap.entrySet().iterator(); 143 assertTrue(iterator.hasNext()); 144 Map.Entry<Channel, DatanodeInfo> dn1Entry = iterator.next(); 145 Channel dn1Channel = dn1Entry.getKey(); 146 DatanodeInfo dn1DatanodeInfo = dn1Entry.getValue(); 147 final List<String> protobufDecoderNames = new ArrayList<String>(); 148 dn1Channel.pipeline().forEach((entry) -> { 149 if (ProtobufDecoder.class.isInstance(entry.getValue())) { 150 protobufDecoderNames.add(entry.getKey()); 151 } 152 }); 153 assertTrue(protobufDecoderNames.size() == 1); 154 dn1Channel.pipeline().addAfter(protobufDecoderNames.get(0), "dn1AckReceivedHandler", 155 new ChannelInboundHandlerAdapter() { 156 @Override 157 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 158 super.channelRead(ctx, msg); 159 dn1AckReceivedCyclicBarrier.await(); 160 } 161 }); 162 163 assertTrue(iterator.hasNext()); 164 Map.Entry<Channel, DatanodeInfo> dn2Entry = iterator.next(); 165 Channel dn2Channel = dn2Entry.getKey(); 166 167 /** 168 * Here we add a {@link ChannelInboundHandlerAdapter} to eat all the responses to simulate a 169 * slow dn2. 
170 */ 171 dn2Channel.pipeline().addFirst(new ChannelInboundHandlerAdapter() { 172 173 @Override 174 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 175 if (!(msg instanceof ByteBuf)) { 176 ctx.fireChannelRead(msg); 177 } else { 178 ((ByteBuf) msg).release(); 179 } 180 } 181 }); 182 183 byte[] b = new byte[10]; 184 Bytes.random(b); 185 OUT.write(b, 0, b.length); 186 CompletableFuture<Long> future = OUT.flush(false); 187 /** 188 * Wait for ack from dn1. 189 */ 190 dn1AckReceivedCyclicBarrier.await(); 191 /** 192 * First ack is received from dn1,we could stop dn1 now. 193 */ 194 firstDataNodeProperties = findAndKillFirstDataNode(dn1DatanodeInfo); 195 assertTrue(firstDataNodeProperties != null); 196 try { 197 /** 198 * Before HBASE-26679,here we should be stuck, after HBASE-26679,we would fail soon with 199 * {@link ExecutionException}. 200 */ 201 future.get(); 202 fail(); 203 } catch (ExecutionException e) { 204 assertTrue(e != null); 205 LOG.info("expected exception caught when get future", e); 206 } 207 /** 208 * Make sure all the data node channel are closed. 
209 */ 210 datanodeInfoMap.keySet().forEach(ch -> { 211 try { 212 ch.closeFuture().get(); 213 } catch (InterruptedException | ExecutionException e) { 214 throw new RuntimeException(e); 215 } 216 }); 217 } finally { 218 if (firstDataNodeProperties != null) { 219 CLUSTER.restartDataNode(firstDataNodeProperties); 220 } 221 } 222 } 223 224 private static DataNodeProperties findAndKillFirstDataNode(DatanodeInfo firstDatanodeInfo) { 225 assertTrue(firstDatanodeInfo != null); 226 ArrayList<DataNode> dataNodes = CLUSTER.getDataNodes(); 227 ArrayList<Integer> foundIndexes = new ArrayList<Integer>(); 228 int index = 0; 229 for (DataNode dataNode : dataNodes) { 230 if (firstDatanodeInfo.getXferAddr().equals(dataNode.getDatanodeId().getXferAddr())) { 231 foundIndexes.add(index); 232 } 233 index++; 234 } 235 assertTrue(foundIndexes.size() == 1); 236 return CLUSTER.stopDataNode(foundIndexes.get(0)); 237 } 238 239}