/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.fs;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.net.ServerSocket;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for the HDFS fix from HBASE-6435. Please don't add new subtests that involve starting or
 * stopping a MiniDFSCluster to this class. When a MiniDFSCluster is stopped, Hadoop 3 clears the
 * shutdown hooks in its ShutdownHookManager, which leads to 'Failed suppression of fs shutdown
 * hook' errors in the region server.
 */
@Category({ MiscTests.class, LargeTests.class })
public class TestBlockReorder {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBlockReorder.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestBlockReorder.class);

  private Configuration conf;
  private MiniDFSCluster cluster;
  private HBaseTestingUtility htu;
  private DistributedFileSystem dfs;
  private static final String host1 = "host1";
  private static final String host2 = "host2";
  private static final String host3 = "host3";

  @Rule
  public TestName name = new TestName();

  @Before
  public void setUp() throws Exception {
    htu = new HBaseTestingUtility();
    htu.getConfiguration().setInt("dfs.blocksize", 1024); // For the test with multiple blocks
    htu.getConfiguration().setInt("dfs.replication", 3);
    htu.startMiniDFSCluster(3, new String[] { "/r1", "/r2", "/r3" },
      new String[] { host1, host2, host3 });

    conf = htu.getConfiguration();
    cluster = htu.getDFSCluster();
    dfs = (DistributedFileSystem) FileSystem.get(conf);
  }

  @After
  public void tearDown() throws Exception {
    htu.shutdownMiniCluster();
  }

  /**
   * Tests that we can add a hook, and that this hook works when we read the file from HDFS.
   */
  @Test
  public void testBlockLocationReorder() throws Exception {
    Path p = new Path("hello");

    Assert.assertTrue(cluster.getDataNodes().size() > 1);
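    // Replicate to only 2 of the 3 datanodes so that the block stays readable after we kill one
    // of the hosts holding a replica.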
    final int repCount = 2;

    // Let's write the file
    FSDataOutputStream fop = dfs.create(p, (short) repCount);
    final double toWrite = 875.5613;
    fop.writeDouble(toWrite);
    fop.close();

    // Let's check we can read it when everybody's there
    long start = EnvironmentEdgeManager.currentTime();
    FSDataInputStream fin = dfs.open(p);
    Assert.assertTrue(toWrite == fin.readDouble());
    long end = EnvironmentEdgeManager.currentTime();
    LOG.info("readtime= " + (end - start));
    fin.close();
    Assert.assertTrue((end - start) < 30 * 1000);

    // Let's kill the first location. Note that the first location actually returned may change,
    // so start by getting the block location and then the datanode port.
    FileStatus f = dfs.getFileStatus(p);
    BlockLocation[] lbs;
    // Wait until the single block of the file is reported with the expected number of replicas.
    do {
      lbs = dfs.getFileBlockLocations(f, 0, 1);
    } while (lbs.length != 1 || lbs[0].getHosts().length != repCount);
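    // Block location names have the form "host:port"; grab the data transfer port of the first
    // replica, which is the datanode we are going to kill.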
    final String name = lbs[0].getNames()[0];
    Assert.assertTrue(name.indexOf(':') > 0);
    String portS = name.substring(name.indexOf(':') + 1);
    final int port = Integer.parseInt(portS);
    LOG.info("port= " + port);
    int ipcPort = -1;

    // Let's find the datanode to kill. cluster.getDataNodes(int) is not indexed by port, so we
    // need to iterate over the datanodes ourselves.
    boolean ok = false;
    final String lookup = lbs[0].getHosts()[0];
    StringBuilder sb = new StringBuilder();
    for (DataNode dn : cluster.getDataNodes()) {
      final String dnName = getHostName(dn);
      sb.append(dnName).append(' ');
      if (lookup.equals(dnName)) {
        ok = true;
        LOG.info("killing datanode " + name + " / " + lookup);
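        // Remember the ipc port so that we can bind it ourselves once the datanode is down.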
        ipcPort = dn.ipcServer.getListenerAddress().getPort();
        dn.shutdown();
        LOG.info("killed datanode " + name + " / " + lookup);
        break;
      }
    }
    Assert.assertTrue("didn't find the server to kill, was looking for " + lookup + " found " + sb,
      ok);
    LOG.info("ipc port= " + ipcPort);

    // Add the hook, with an implementation checking that we don't use the port we've just killed.
    Assert
      .assertTrue(HFileSystem.addLocationsOrderInterceptor(conf, new HFileSystem.ReorderBlocks() {
        @Override
        public void reorderBlocks(Configuration c, LocatedBlocks lbs, String src) {
          for (LocatedBlock lb : lbs.getLocatedBlocks()) {
            if (lb.getLocations().length > 1) {
              DatanodeInfo[] infos = lb.getLocations();
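              // If the datanode we just killed is listed first, swap it with the second replica
              // so that the client does not try the dead node first.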
              if (infos[0].getHostName().equals(lookup)) {
                LOG.info("HFileSystem bad host, inverting");
                DatanodeInfo tmp = infos[0];
                infos[0] = infos[1];
                infos[1] = tmp;
              }
            }
          }
        }
      }));

    final int retries = 10;
    ServerSocket ss = null;
    ServerSocket ssI;
    try {
      ss = new ServerSocket(port); // Take the port so connections to it time out later.
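      // Also bind the (now free) ipc port so that nothing else can take it while the datanode
      // is down.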
      ssI = new ServerSocket(ipcPort);
    } catch (BindException be) {
      LOG.warn("Got bind exception trying to set up socket on " + port + " or " + ipcPort
        + ", this means that the datanode has not closed the socket or"
        + " someone else took it. This can happen, so we skip the test this time.", be);
      if (ss != null) {
        ss.close();
      }
      return;
    }

    // Without the reorder, a read would now fail with a timeout; unfortunately it does not always
    // connect to the same box, so we try 'retries' times. With the reorder it should never take
    // more than a few milliseconds.
    for (int i = 0; i < retries; i++) {
      start = EnvironmentEdgeManager.currentTime();
      fin = dfs.open(p);
      Assert.assertTrue(toWrite == fin.readDouble());
      fin.close();
      end = EnvironmentEdgeManager.currentTime();
      LOG.info("HFileSystem readtime= " + (end - start));
      Assert.assertFalse("We took too much time to read", (end - start) > 60000);
    }

    ss.close();
    ssI.close();
  }

  /**
   * Gets the hostname, using getDisplayName (hadoop 2) or getHostName (hadoop 1).
   */
  private String getHostName(DataNode dn) throws InvocationTargetException, IllegalAccessException {
    Method m;
    try {
      m = DataNode.class.getMethod("getDisplayName");
    } catch (NoSuchMethodException e) {
      try {
        m = DataNode.class.getMethod("getHostName");
      } catch (NoSuchMethodException e1) {
        throw new RuntimeException(e1);
      }
    }

    String res = (String) m.invoke(dn);
    if (res.contains(":")) {
      return res.split(":")[0];
    } else {
      return res;
    }
  }

}