001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.asyncfs;
019
020import static org.junit.Assert.assertTrue;
021import static org.junit.Assert.fail;
022
023import java.util.ArrayList;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Map;
027import java.util.concurrent.CompletableFuture;
028import java.util.concurrent.CyclicBarrier;
029import java.util.concurrent.ExecutionException;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
033import org.apache.hadoop.hbase.testclassification.MediumTests;
034import org.apache.hadoop.hbase.testclassification.MiscTests;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hdfs.DistributedFileSystem;
037import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
038import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
039import org.apache.hadoop.hdfs.server.datanode.DataNode;
040import org.junit.AfterClass;
041import org.junit.BeforeClass;
042import org.junit.ClassRule;
043import org.junit.Rule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046import org.junit.rules.TestName;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
051import org.apache.hbase.thirdparty.io.netty.channel.Channel;
052import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
053import org.apache.hbase.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter;
054import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
055import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
056import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
057import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
058
059/**
060 * Testcase for HBASE-26679, here we introduce a separate test class and not put the testcase in
061 * {@link TestFanOutOneBlockAsyncDFSOutput} because we will send heartbeat to DN when there is no
062 * out going packet, the timeout is controlled by
063 * {@link TestFanOutOneBlockAsyncDFSOutput#READ_TIMEOUT_MS},which is 2 seconds, it will keep sending
064 * package out and DN will respond immedately and then mess up the testing handler added by us. So
065 * in this test class we use the default value for timeout which is 60 seconds and it is enough for
066 * this test.
067 */
068@Category({ MiscTests.class, MediumTests.class })
069public class TestFanOutOneBlockAsyncDFSOutputHang extends AsyncFSTestBase {
070
071  @ClassRule
072  public static final HBaseClassTestRule CLASS_RULE =
073    HBaseClassTestRule.forClass(TestFanOutOneBlockAsyncDFSOutputHang.class);
074
075  private static final Logger LOG =
076    LoggerFactory.getLogger(TestFanOutOneBlockAsyncDFSOutputHang.class);
077
078  private static DistributedFileSystem FS;
079
080  private static EventLoopGroup EVENT_LOOP_GROUP;
081
082  private static Class<? extends Channel> CHANNEL_CLASS;
083
084  private static StreamSlowMonitor MONITOR;
085
086  private static FanOutOneBlockAsyncDFSOutput OUT;
087
088  @Rule
089  public TestName name = new TestName();
090
091  @BeforeClass
092  public static void setUp() throws Exception {
093    startMiniDFSCluster(2);
094    FS = CLUSTER.getFileSystem();
095    EVENT_LOOP_GROUP = new NioEventLoopGroup();
096    CHANNEL_CLASS = NioSocketChannel.class;
097    MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor");
098    Path f = new Path("/testHang");
099    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
100    OUT = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 2,
101      FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
102  }
103
104  @AfterClass
105  public static void tearDown() throws Exception {
106    if (OUT != null) {
107      OUT.recoverAndClose(null);
108    }
109    if (EVENT_LOOP_GROUP != null) {
110      EVENT_LOOP_GROUP.shutdownGracefully().get();
111    }
112    shutdownMiniDFSCluster();
113  }
114
115  /**
116   * <pre>
117   * This test is for HBASE-26679. Consider there are two dataNodes: dn1 and dn2,dn2 is a slow DN.
118   * The threads sequence before HBASE-26679 is:
119   * 1.We write some data to {@link FanOutOneBlockAsyncDFSOutput} and then flush it, there are one
120   *   {@link FanOutOneBlockAsyncDFSOutput.Callback} in
121   *   {@link FanOutOneBlockAsyncDFSOutput#waitingAckQueue}.
122   * 2.The ack from dn1 arrives firstly and triggers Netty to invoke
123   *   {@link FanOutOneBlockAsyncDFSOutput#completed} with dn1's channel, then in
124   *   {@link FanOutOneBlockAsyncDFSOutput#completed}, dn1's channel is removed from
125   *   {@link FanOutOneBlockAsyncDFSOutput.Callback#unfinishedReplicas}.
126   * 3.But dn2 responds slowly, before dn2 sending ack,dn1 is shut down or have a exception,
127   *   so {@link FanOutOneBlockAsyncDFSOutput#failed} is triggered by Netty with dn1's channel,
128   *   and because the {@link FanOutOneBlockAsyncDFSOutput.Callback#unfinishedReplicas} does not
129   *   contain dn1's channel,the {@link FanOutOneBlockAsyncDFSOutput.Callback} is skipped in
130   *   {@link FanOutOneBlockAsyncDFSOutput#failed} method,and
131   *   {@link FanOutOneBlockAsyncDFSOutput#state} is set to
132   *   {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN},and dn1,dn2 are all closed at the end of
133   *   {@link FanOutOneBlockAsyncDFSOutput#failed}.
134   * 4.{@link FanOutOneBlockAsyncDFSOutput#failed} is triggered again by dn2 because it is closed,
135   *   but because {@link FanOutOneBlockAsyncDFSOutput#state} is already
136   *   {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN},the whole
137   *   {@link FanOutOneBlockAsyncDFSOutput#failed} is skipped. So wait on the future
138   *   returned by {@link FanOutOneBlockAsyncDFSOutput#flush} would be stuck for ever.
139   * After HBASE-26679, for above step 4,even if the {@link FanOutOneBlockAsyncDFSOutput#state}
140   * is already {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN}, we would still try to trigger
141   * {@link FanOutOneBlockAsyncDFSOutput.Callback#future}.
142   * </pre>
143   */
144  @Test
145  public void testFlushHangWhenOneDataNodeFailedBeforeOtherDataNodeAck() throws Exception {
146
147    DataNodeProperties firstDataNodeProperties = null;
148    try {
149
150      final CyclicBarrier dn1AckReceivedCyclicBarrier = new CyclicBarrier(2);
151      Map<Channel, DatanodeInfo> datanodeInfoMap = OUT.getDatanodeInfoMap();
152      Iterator<Map.Entry<Channel, DatanodeInfo>> iterator = datanodeInfoMap.entrySet().iterator();
153      assertTrue(iterator.hasNext());
154      Map.Entry<Channel, DatanodeInfo> dn1Entry = iterator.next();
155      Channel dn1Channel = dn1Entry.getKey();
156      DatanodeInfo dn1DatanodeInfo = dn1Entry.getValue();
157      final List<String> protobufDecoderNames = new ArrayList<String>();
158      dn1Channel.pipeline().forEach((entry) -> {
159        if (ProtobufDecoder.class.isInstance(entry.getValue())) {
160          protobufDecoderNames.add(entry.getKey());
161        }
162      });
163      assertTrue(protobufDecoderNames.size() == 1);
164      dn1Channel.pipeline().addAfter(protobufDecoderNames.get(0), "dn1AckReceivedHandler",
165        new ChannelInboundHandlerAdapter() {
166          @Override
167          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
168            super.channelRead(ctx, msg);
169            dn1AckReceivedCyclicBarrier.await();
170          }
171        });
172
173      assertTrue(iterator.hasNext());
174      Map.Entry<Channel, DatanodeInfo> dn2Entry = iterator.next();
175      Channel dn2Channel = dn2Entry.getKey();
176
177      /**
178       * Here we add a {@link ChannelInboundHandlerAdapter} to eat all the responses to simulate a
179       * slow dn2.
180       */
181      dn2Channel.pipeline().addFirst(new ChannelInboundHandlerAdapter() {
182
183        @Override
184        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
185          if (!(msg instanceof ByteBuf)) {
186            ctx.fireChannelRead(msg);
187          } else {
188            ((ByteBuf) msg).release();
189          }
190        }
191      });
192
193      byte[] b = new byte[10];
194      Bytes.random(b);
195      OUT.write(b, 0, b.length);
196      CompletableFuture<Long> future = OUT.flush(false);
197      /**
198       * Wait for ack from dn1.
199       */
200      dn1AckReceivedCyclicBarrier.await();
201      /**
202       * First ack is received from dn1,we could stop dn1 now.
203       */
204      firstDataNodeProperties = findAndKillFirstDataNode(dn1DatanodeInfo);
205      assertTrue(firstDataNodeProperties != null);
206      try {
207        /**
208         * Before HBASE-26679,here we should be stuck, after HBASE-26679,we would fail soon with
209         * {@link ExecutionException}.
210         */
211        future.get();
212        fail();
213      } catch (ExecutionException e) {
214        assertTrue(e != null);
215        LOG.info("expected exception caught when get future", e);
216      }
217      /**
218       * Make sure all the data node channel are closed.
219       */
220      datanodeInfoMap.keySet().forEach(ch -> {
221        try {
222          ch.closeFuture().get();
223        } catch (InterruptedException | ExecutionException e) {
224          throw new RuntimeException(e);
225        }
226      });
227    } finally {
228      if (firstDataNodeProperties != null) {
229        CLUSTER.restartDataNode(firstDataNodeProperties);
230      }
231    }
232  }
233
234  private static DataNodeProperties findAndKillFirstDataNode(DatanodeInfo firstDatanodeInfo) {
235    assertTrue(firstDatanodeInfo != null);
236    ArrayList<DataNode> dataNodes = CLUSTER.getDataNodes();
237    ArrayList<Integer> foundIndexes = new ArrayList<Integer>();
238    int index = 0;
239    for (DataNode dataNode : dataNodes) {
240      if (firstDatanodeInfo.getXferAddr().equals(dataNode.getDatanodeId().getXferAddr())) {
241        foundIndexes.add(index);
242      }
243      index++;
244    }
245    assertTrue(foundIndexes.size() == 1);
246    return CLUSTER.stopDataNode(foundIndexes.get(0));
247  }
248
249}