/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.fs;

import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.net.ServerSocket;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for the hdfs fix from HBASE-6435. Please don't add new subtests which involve starting or
 * stopping a MiniDFSCluster to this class. When the MiniDFSCluster is stopped, shutdown hooks are
 * cleared from hadoop's ShutdownHookManager in hadoop 3, which leads to a 'Failed suppression of
 * fs shutdown hook' error in the region server.
 */
@Category({ MiscTests.class, LargeTests.class })
public class TestBlockReorder {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBlockReorder.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestBlockReorder.class);

  private Configuration conf;
  private MiniDFSCluster cluster;
  private HBaseTestingUtil htu;
  private DistributedFileSystem dfs;
  private static final String host1 = "host1";
  private static final String host2 = "host2";
  private static final String host3 = "host3";

  @Rule
  public TestName name = new TestName();

  @Before
  public void setUp() throws Exception {
    htu = new HBaseTestingUtil();
    htu.getConfiguration().setInt("dfs.blocksize", 1024); // For the test with multiple blocks
    htu.getConfiguration().setInt("dfs.replication", 3);
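    // Three datanodes, each declared on its own rack and with its own hostname, so block
    // placement spreads replicas across distinct hosts that the test can tell apart.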
    htu.startMiniDFSCluster(3, new String[] { "/r1", "/r2", "/r3" },
      new String[] { host1, host2, host3 });

    conf = htu.getConfiguration();
    cluster = htu.getDFSCluster();
    dfs = (DistributedFileSystem) FileSystem.get(conf);
  }

  @After
  public void tearDown() throws Exception {
    htu.shutdownMiniCluster();
  }

  /**
   * Test that we can add a hook, and that this hook works when we try to read the file in HDFS.
   */
  @Test
  public void testBlockLocationReorder() throws Exception {
    Path p = new Path("hello");

    Assert.assertTrue(cluster.getDataNodes().size() > 1);
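    // Replicate to only 2 of the 3 datanodes, so that after we kill the datanode hosting the
    // first replica there is still a live replica to serve the read.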
    final int repCount = 2;

    // Let's write the file
    FSDataOutputStream fop = dfs.create(p, (short) repCount);
    final double toWrite = 875.5613;
    fop.writeDouble(toWrite);
    fop.close();

    // Let's check we can read it when everybody's there
    long start = EnvironmentEdgeManager.currentTime();
    FSDataInputStream fin = dfs.open(p);
    Assert.assertTrue(toWrite == fin.readDouble());
    long end = EnvironmentEdgeManager.currentTime();
    LOG.info("readtime= " + (end - start));
    fin.close();
    Assert.assertTrue((end - start) < 30 * 1000);

    // Let's kill the first location. But the first location actually returned can change, so the
    // first thing to do is to get the location, then the port.
    FileStatus f = dfs.getFileStatus(p);
    BlockLocation[] lbs;
    do {
      lbs = dfs.getFileBlockLocations(f, 0, 1);
    } while (lbs.length != 1 && lbs[0].getLength() != repCount);
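    // getNames() entries have the form "<address>:<port>"; parse out the data transfer port of
    // the first replica so we can identify (and later re-bind) the killed datanode's port.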
    final String name = lbs[0].getNames()[0];
    Assert.assertTrue(name.indexOf(':') > 0);
    String portS = name.substring(name.indexOf(':') + 1);
    final int port = Integer.parseInt(portS);
    LOG.info("port= " + port);
    int ipcPort = -1;

    // Let's find the DN to kill. cluster.getDataNodes(int) does not select by port, so we need
    // to iterate over the datanodes ourselves.
    boolean ok = false;
    final String lookup = lbs[0].getHosts()[0];
    StringBuilder sb = new StringBuilder();
    for (DataNode dn : cluster.getDataNodes()) {
      final String dnName = getHostName(dn);
      sb.append(dnName).append(' ');
      if (lookup.equals(dnName)) {
        ok = true;
        LOG.info("killing datanode " + name + " / " + lookup);
        ipcPort = dn.ipcServer.getListenerAddress().getPort();
        dn.shutdown();
        LOG.info("killed datanode " + name + " / " + lookup);
        break;
      }
    }
    Assert.assertTrue("didn't find the server to kill, was looking for " + lookup + " found " + sb,
      ok);
    LOG.info("ipc port= " + ipcPort);

    // Add the hook, with an implementation checking that we don't use the port we've just killed.
    Assert
      .assertTrue(HFileSystem.addLocationsOrderInterceptor(conf, new HFileSystem.ReorderBlocks() {
        @Override
        public void reorderBlocks(Configuration c, LocatedBlocks lbs, String src) {
          for (LocatedBlock lb : lbs.getLocatedBlocks()) {
            if (getLocatedBlockLocations(lb).length > 1) {
              DatanodeInfo[] infos = getLocatedBlockLocations(lb);
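              // If the killed host is still listed first, swap it with the second replica so the
              // client contacts a live datanode before timing out on the dead one.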
              if (infos[0].getHostName().equals(lookup)) {
                LOG.info("HFileSystem bad host, inverting");
                DatanodeInfo tmp = infos[0];
                infos[0] = infos[1];
                infos[1] = tmp;
              }
            }
          }
        }
      }));

    final int retries = 10;
    ServerSocket ss = null;
    ServerSocket ssI;
    try {
      ss = new ServerSocket(port); // Grab the freed port so connection attempts to it hang later.
      ssI = new ServerSocket(ipcPort);
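      // Hold the ipc port as well so nothing else can reuse it while the test runs; both sockets
      // accept connections but never answer, which is what produces the timeout below.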
    } catch (BindException be) {
      LOG.warn("Got bind exception trying to set up socket on " + port + " or " + ipcPort
        + ", this means that the datanode has not closed the socket or"
        + " someone else took it. This can happen, so we skip the test this time.", be);
      if (ss != null) {
        ss.close();
      }
      return;
    }

    // Without the reorder, a read that hits the killed datanode first would only succeed after a
    // timeout. The client does not always pick the same replica first, so we repeat the read
    // 'retries' times; with the reorder it should never take more than a few milliseconds.
    for (int i = 0; i < retries; i++) {
      start = EnvironmentEdgeManager.currentTime();
      fin = dfs.open(p);
      Assert.assertTrue(toWrite == fin.readDouble());
      fin.close();
      end = EnvironmentEdgeManager.currentTime();
      LOG.info("HFileSystem readtime= " + (end - start));
      Assert.assertFalse("We took too much time to read", (end - start) > 60000);
    }

    ss.close();
    ssI.close();
  }

  /**
   * Returns the datanode hostname, using getDisplayName (hadoop 2) or getHostName (hadoop 1).
   */
  private String getHostName(DataNode dn)
    throws InvocationTargetException, IllegalAccessException {
    Method m;
    try {
      m = DataNode.class.getMethod("getDisplayName");
    } catch (NoSuchMethodException e) {
      try {
        m = DataNode.class.getMethod("getHostName");
      } catch (NoSuchMethodException e1) {
        throw new RuntimeException(e1);
      }
    }

    String res = (String) m.invoke(dn);
    if (res.contains(":")) {
      return res.split(":")[0];
    } else {
      return res;
    }
  }

}