001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.fs;
019
020import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.lang.reflect.InvocationTargetException;
025import java.lang.reflect.Method;
026import java.net.BindException;
027import java.net.ServerSocket;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.BlockLocation;
030import org.apache.hadoop.fs.FSDataInputStream;
031import org.apache.hadoop.fs.FSDataOutputStream;
032import org.apache.hadoop.fs.FileStatus;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.testclassification.LargeTests;
037import org.apache.hadoop.hbase.testclassification.MiscTests;
038import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
039import org.apache.hadoop.hdfs.DistributedFileSystem;
040import org.apache.hadoop.hdfs.MiniDFSCluster;
041import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
042import org.apache.hadoop.hdfs.protocol.LocatedBlock;
043import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
044import org.apache.hadoop.hdfs.server.datanode.DataNode;
045import org.junit.jupiter.api.AfterEach;
046import org.junit.jupiter.api.BeforeEach;
047import org.junit.jupiter.api.Tag;
048import org.junit.jupiter.api.Test;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052/**
053 * Tests for the hdfs fix from HBASE-6435. Please don't add new subtest which involves starting /
054 * stopping MiniDFSCluster in this class. When stopping MiniDFSCluster, shutdown hooks would be
055 * cleared in hadoop's ShutdownHookManager in hadoop 3. This leads to 'Failed suppression of fs
056 * shutdown hook' error in region server.
057 */
058@Tag(MiscTests.TAG)
059@Tag(LargeTests.TAG)
060public class TestBlockReorder {
061
062  private static final Logger LOG = LoggerFactory.getLogger(TestBlockReorder.class);
063
064  private Configuration conf;
065  private MiniDFSCluster cluster;
066  private HBaseTestingUtil htu;
067  private DistributedFileSystem dfs;
068  private static final String host1 = "host1";
069  private static final String host2 = "host2";
070  private static final String host3 = "host3";
071
072  @BeforeEach
073  public void setUp() throws Exception {
074    htu = new HBaseTestingUtil();
075    htu.getConfiguration().setInt("dfs.blocksize", 1024);// For the test with multiple blocks
076    htu.getConfiguration().setInt("dfs.replication", 3);
077    htu.startMiniDFSCluster(3, new String[] { "/r1", "/r2", "/r3" },
078      new String[] { host1, host2, host3 });
079
080    conf = htu.getConfiguration();
081    cluster = htu.getDFSCluster();
082    dfs = (DistributedFileSystem) FileSystem.get(conf);
083  }
084
085  @AfterEach
  public void tearDown() throws Exception {
087    htu.shutdownMiniCluster();
088  }
089
090  /**
091   * Test that we're can add a hook, and that this hook works when we try to read the file in HDFS.
092   */
093  @Test
094  public void testBlockLocationReorder() throws Exception {
095    Path p = new Path("hello");
096
    assertTrue(cluster.getDataNodes().size() > 1);
098    final int repCount = 2;
099
100    // Let's write the file
101    FSDataOutputStream fop = dfs.create(p, (short) repCount);
102    final double toWrite = 875.5613;
103    fop.writeDouble(toWrite);
104    fop.close();
105
106    // Let's check we can read it when everybody's there
107    long start = EnvironmentEdgeManager.currentTime();
108    FSDataInputStream fin = dfs.open(p);
    assertEquals(toWrite, fin.readDouble());
110    long end = EnvironmentEdgeManager.currentTime();
111    LOG.info("readtime= " + (end - start));
112    fin.close();
113    assertTrue((end - start) < 30 * 1000);
114
    // Let's kill the first location. Note that the first location returned may change.
    // The first thing to do is to get the location, then the port.
    FileStatus f = dfs.getFileStatus(p);
    BlockLocation[] lbs;
    // Wait until the namenode reports a single block location carrying all repCount replicas.
    do {
      lbs = dfs.getFileBlockLocations(f, 0, 1);
    } while (lbs.length != 1 || lbs[0].getHosts().length != repCount);
122    final String name = lbs[0].getNames()[0];
123    assertTrue(name.indexOf(':') > 0);
124    String portS = name.substring(name.indexOf(':') + 1);
125    final int port = Integer.parseInt(portS);
126    LOG.info("port= " + port);
127    int ipcPort = -1;
128
    // Let's find the DN to kill. cluster.getDataNodes(int) indexes by position, not by port, so
    // we need to iterate ourselves.
131    boolean ok = false;
132    final String lookup = lbs[0].getHosts()[0];
133    StringBuilder sb = new StringBuilder();
134    for (DataNode dn : cluster.getDataNodes()) {
135      final String dnName = getHostName(dn);
136      sb.append(dnName).append(' ');
137      if (lookup.equals(dnName)) {
138        ok = true;
139        LOG.info("killing datanode " + name + " / " + lookup);
140        ipcPort = dn.ipcServer.getListenerAddress().getPort();
141        dn.shutdown();
142        LOG.info("killed datanode " + name + " / " + lookup);
143        break;
144      }
145    }
146    assertTrue(ok, "didn't find the server to kill, was looking for " + lookup + " found " + sb);
147    LOG.info("ipc port= " + ipcPort);
148
149    // Add the hook, with an implementation checking that we don't use the port we've just killed.
150    assertTrue(HFileSystem.addLocationsOrderInterceptor(conf, new HFileSystem.ReorderBlocks() {
151      @Override
152      public void reorderBlocks(Configuration c, LocatedBlocks lbs, String src) {
153        for (LocatedBlock lb : lbs.getLocatedBlocks()) {
154          if (getLocatedBlockLocations(lb).length > 1) {
155            DatanodeInfo[] infos = getLocatedBlockLocations(lb);
156            if (infos[0].getHostName().equals(lookup)) {
157              LOG.info("HFileSystem bad host, inverting");
158              DatanodeInfo tmp = infos[0];
159              infos[0] = infos[1];
160              infos[1] = tmp;
161            }
162          }
163        }
164      }
165    }));
166
167    final int retries = 10;
168    ServerSocket ss = null;
169    ServerSocket ssI;
170    try {
      ss = new ServerSocket(port); // We're taking the port to provoke a timeout later.
172      ssI = new ServerSocket(ipcPort);
173    } catch (BindException be) {
174      LOG.warn("Got bind exception trying to set up socket on " + port + " or " + ipcPort
175        + ", this means that the datanode has not closed the socket or"
176        + " someone else took it. It may happen, skipping this test for this time.", be);
177      if (ss != null) {
178        ss.close();
179      }
180      return;
181    }
182
    // Now it would fail with a timeout, but unfortunately it does not always connect to the same
    // box, so we try 'retries' times; with the reorder it should never take more than a few
    // milliseconds.
185    for (int i = 0; i < retries; i++) {
186      start = EnvironmentEdgeManager.currentTime();
187      fin = dfs.open(p);
      assertEquals(toWrite, fin.readDouble());
189      fin.close();
190      end = EnvironmentEdgeManager.currentTime();
191      LOG.info("HFileSystem readtime= " + (end - start));
192      assertFalse((end - start) > 60000, "We took too much time to read");
193    }
194
195    ss.close();
196    ssI.close();
197  }
198
199  /**
200   * Allow to get the hostname, using getHostName (hadoop 1) or getDisplayName (hadoop 2)
201   */
202  private String getHostName(DataNode dn) throws InvocationTargetException, IllegalAccessException {
203    Method m;
204    try {
205      m = DataNode.class.getMethod("getDisplayName");
206    } catch (NoSuchMethodException e) {
207      try {
208        m = DataNode.class.getMethod("getHostName");
209      } catch (NoSuchMethodException e1) {
210        throw new RuntimeException(e1);
211      }
212    }
213
214    String res = (String) m.invoke(dn);
215    if (res.contains(":")) {
216      return res.split(":")[0];
217    } else {
218      return res;
219    }
220  }
221
222}