/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Testcase for HBASE-26679. We introduce a separate test class instead of putting the testcase in
 * {@link TestFanOutOneBlockAsyncDFSOutput} because a heartbeat is sent to the DN when there is no
 * outgoing packet, and there the heartbeat timeout is controlled by
 * {@link TestFanOutOneBlockAsyncDFSOutput#READ_TIMEOUT_MS}, which is 2 seconds. That would keep
 * sending packets out, the DN would respond immediately, and the test handlers we add here would
 * be messed up. So in this test class we use the default timeout of 60 seconds, which is long
 * enough for this test.
 */
@Tag(MiscTests.TAG)
@Tag(MediumTests.TAG)
public class TestFanOutOneBlockAsyncDFSOutputHang extends AsyncFSTestBase {

  private static final Logger LOG =
    LoggerFactory.getLogger(TestFanOutOneBlockAsyncDFSOutputHang.class);

  private static DistributedFileSystem FS;

  private static EventLoopGroup EVENT_LOOP_GROUP;

  private static Class<? extends Channel> CHANNEL_CLASS;

  private static StreamSlowMonitor MONITOR;

  private static FanOutOneBlockAsyncDFSOutput OUT;

  @BeforeAll
  public static void setUp() throws Exception {
    startMiniDFSCluster(2);
    FS = CLUSTER.getFileSystem();
    EVENT_LOOP_GROUP = new NioEventLoopGroup();
    CHANNEL_CLASS = NioSocketChannel.class;
    MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor");
    Path f = new Path("/testHang");
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    OUT = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 2,
      FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
  }

  @AfterAll
  public static void tearDown() throws Exception {
    if (OUT != null) {
      OUT.recoverAndClose(null);
    }
    if (EVENT_LOOP_GROUP != null) {
      EVENT_LOOP_GROUP.shutdownGracefully().get();
    }
    shutdownMiniDFSCluster();
  }

  /**
   * <pre>
   * This test is for HBASE-26679. Consider two datanodes, dn1 and dn2, where dn2 is a slow DN.
   * The sequence of events before HBASE-26679 is:
   * 1.We write some data to {@link FanOutOneBlockAsyncDFSOutput} and then flush it, so there is
   *   one {@link FanOutOneBlockAsyncDFSOutput.Callback} in
   *   {@link FanOutOneBlockAsyncDFSOutput#waitingAckQueue}.
   * 2.The ack from dn1 arrives first and triggers Netty to invoke
   *   {@link FanOutOneBlockAsyncDFSOutput#completed} with dn1's channel, then in
   *   {@link FanOutOneBlockAsyncDFSOutput#completed}, dn1's channel is removed from
   *   {@link FanOutOneBlockAsyncDFSOutput.Callback#unfinishedReplicas}.
   * 3.But dn2 responds slowly; before dn2 sends its ack, dn1 is shut down or hits an exception,
   *   so {@link FanOutOneBlockAsyncDFSOutput#failed} is triggered by Netty with dn1's channel,
   *   and because {@link FanOutOneBlockAsyncDFSOutput.Callback#unfinishedReplicas} no longer
   *   contains dn1's channel, the {@link FanOutOneBlockAsyncDFSOutput.Callback} is skipped in
   *   the {@link FanOutOneBlockAsyncDFSOutput#failed} method,
   *   {@link FanOutOneBlockAsyncDFSOutput#state} is set to
   *   {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN}, and both dn1 and dn2 are closed at the
   *   end of {@link FanOutOneBlockAsyncDFSOutput#failed}.
   * 4.{@link FanOutOneBlockAsyncDFSOutput#failed} is triggered again by dn2 because it is closed,
   *   but because {@link FanOutOneBlockAsyncDFSOutput#state} is already
   *   {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN}, the whole
   *   {@link FanOutOneBlockAsyncDFSOutput#failed} is skipped. So waiting on the future
   *   returned by {@link FanOutOneBlockAsyncDFSOutput#flush} would be stuck forever.
   * After HBASE-26679, for step 4 above, even if the {@link FanOutOneBlockAsyncDFSOutput#state}
   * is already {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN}, we still try to complete
   * {@link FanOutOneBlockAsyncDFSOutput.Callback#future}.
   * </pre>
   */
  @Test
  public void testFlushHangWhenOneDataNodeFailedBeforeOtherDataNodeAck() throws Exception {

    DataNodeProperties firstDataNodeProperties = null;
    try {

      final CyclicBarrier dn1AckReceivedCyclicBarrier = new CyclicBarrier(2);
      Map<Channel, DatanodeInfo> datanodeInfoMap = OUT.getDatanodeInfoMap();
      Iterator<Map.Entry<Channel, DatanodeInfo>> iterator = datanodeInfoMap.entrySet().iterator();
      assertTrue(iterator.hasNext());
      Map.Entry<Channel, DatanodeInfo> dn1Entry = iterator.next();
      Channel dn1Channel = dn1Entry.getKey();
      DatanodeInfo dn1DatanodeInfo = dn1Entry.getValue();
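      // Locate the ProtobufDecoder in dn1's pipeline so we can hook in right after dn1's ack has
      // been decoded and signal the barrier below.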
      final List<String> protobufDecoderNames = new ArrayList<>();
      dn1Channel.pipeline().forEach((entry) -> {
        if (ProtobufDecoder.class.isInstance(entry.getValue())) {
          protobufDecoderNames.add(entry.getKey());
        }
      });
      assertEquals(1, protobufDecoderNames.size());
      dn1Channel.pipeline().addAfter(protobufDecoderNames.get(0), "dn1AckReceivedHandler",
        new ChannelInboundHandlerAdapter() {
          @Override
          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            super.channelRead(ctx, msg);
            dn1AckReceivedCyclicBarrier.await();
          }
        });

      assertTrue(iterator.hasNext());
      Map.Entry<Channel, DatanodeInfo> dn2Entry = iterator.next();
      Channel dn2Channel = dn2Entry.getKey();

      /**
       * Here we add a {@link ChannelInboundHandlerAdapter} that swallows all responses from dn2 to
       * simulate a slow dn2.
       */
      dn2Channel.pipeline().addFirst(new ChannelInboundHandlerAdapter() {

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
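          // This handler sits first in the pipeline, so dn2's acks arrive here as raw ByteBufs;
          // releasing them means dn2 never appears to respond. Anything else is passed through.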
          if (!(msg instanceof ByteBuf)) {
            ctx.fireChannelRead(msg);
          } else {
            ((ByteBuf) msg).release();
          }
        }
      });

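      // Write a small random payload and flush it so a single Callback is queued in
      // waitingAckQueue, waiting for acks from both datanodes.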
      byte[] b = new byte[10];
      Bytes.random(b);
      OUT.write(b, 0, b.length);
      CompletableFuture<Long> future = OUT.flush(false);
      /**
       * Wait for the ack from dn1.
       */
      dn1AckReceivedCyclicBarrier.await();
      /**
       * The first ack has been received from dn1, so we can stop dn1 now.
       */
      firstDataNodeProperties = findAndKillFirstDataNode(dn1DatanodeInfo);
      assertNotNull(firstDataNodeProperties);
      try {
        /**
         * Before HBASE-26679 we would get stuck here; after HBASE-26679 we fail soon with an
         * {@link ExecutionException}.
         */
        future.get();
        fail();
      } catch (ExecutionException e) {
        assertNotNull(e);
        LOG.info("expected exception caught when getting the future", e);
      }
      /**
       * Make sure all the datanode channels are closed.
       */
      datanodeInfoMap.keySet().forEach(ch -> {
        try {
          ch.closeFuture().get();
        } catch (InterruptedException | ExecutionException e) {
          throw new RuntimeException(e);
        }
      });
    } finally {
      if (firstDataNodeProperties != null) {
        CLUSTER.restartDataNode(firstDataNodeProperties);
      }
    }
  }

  private static DataNodeProperties findAndKillFirstDataNode(DatanodeInfo firstDatanodeInfo) {
    assertNotNull(firstDatanodeInfo);
    ArrayList<DataNode> dataNodes = CLUSTER.getDataNodes();
    ArrayList<Integer> foundIndexes = new ArrayList<>();
    int index = 0;
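    // DatanodeInfo and DataNode are different types, so find the DN to stop by matching its
    // transfer address; exactly one running datanode should match dn1.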
    for (DataNode dataNode : dataNodes) {
      if (firstDatanodeInfo.getXferAddr().equals(dataNode.getDatanodeId().getXferAddr())) {
        foundIndexes.add(index);
      }
      index++;
    }
    assertEquals(1, foundIndexes.size());
    return CLUSTER.stopDataNode(foundIndexes.get(0));
  }

}