/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.fs;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.net.ServerSocket;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for the HDFS fix from HBASE-6435.
 *
 * Please don't add new tests that start or stop a MiniDFSCluster to this class. On Hadoop 3,
 * stopping a MiniDFSCluster clears the shutdown hooks registered in Hadoop's
 * ShutdownHookManager, which leads to a 'Failed suppression of fs shutdown hook' error in the
 * region server.
 */
@Category({MiscTests.class, LargeTests.class})
public class TestBlockReorder {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestBlockReorder.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestBlockReorder.class);

  private Configuration conf;
  private MiniDFSCluster cluster;
  private HBaseTestingUtil htu;
  private DistributedFileSystem dfs;
  private static final String host1 = "host1";
  private static final String host2 = "host2";
  private static final String host3 = "host3";

  @Rule
  public TestName name = new TestName();

  @Before
  public void setUp() throws Exception {
    htu = new HBaseTestingUtil();
    htu.getConfiguration().setInt("dfs.blocksize", 1024); // For the test with multiple blocks
    htu.getConfiguration().setInt("dfs.replication", 3);
    htu.startMiniDFSCluster(3,
        new String[]{"/r1", "/r2", "/r3"}, new String[]{host1, host2, host3});

    conf = htu.getConfiguration();
    cluster = htu.getDFSCluster();
    dfs = (DistributedFileSystem) FileSystem.get(conf);
  }

  @After
  public void tearDown() throws Exception {
    htu.shutdownMiniCluster();
  }

  /**
   * Test that we can add a hook, and that this hook works when we try to read the file in HDFS.
   */
  @Test
  public void testBlockLocationReorder() throws Exception {
    Path p = new Path("hello");

    Assert.assertTrue(cluster.getDataNodes().size() > 1);
    final int repCount = 2;

    // Let's write the file
    FSDataOutputStream fop = dfs.create(p, (short) repCount);
    final double toWrite = 875.5613;
    fop.writeDouble(toWrite);
    fop.close();

    // Let's check we can read it when everybody's there
    long start = EnvironmentEdgeManager.currentTime();
    FSDataInputStream fin = dfs.open(p);
    Assert.assertTrue(toWrite == fin.readDouble());
    long end = EnvironmentEdgeManager.currentTime();
    LOG.info("readtime= " + (end - start));
    fin.close();
    Assert.assertTrue((end - start) < 30 * 1000);

    // Let's kill the datanode holding the first location. Note that the first location returned
    // will change afterwards. The first thing to do is to get the location, then the port.
    FileStatus f = dfs.getFileStatus(p);
    BlockLocation[] lbs;
    // Poll until the namenode reports a single block location with the expected replica count.
    do {
      lbs = dfs.getFileBlockLocations(f, 0, 1);
    } while (lbs.length != 1 || lbs[0].getHosts().length != repCount);
    final String name = lbs[0].getNames()[0];
    Assert.assertTrue(name.indexOf(':') > 0);
    // getNames() entries have the form host:port; extract the data transfer port.
    String portS = name.substring(name.indexOf(':') + 1);
    final int port = Integer.parseInt(portS);
    LOG.info("port= " + port);
    int ipcPort = -1;

    // Let's find the DN to kill. cluster.getDataNodes(int) does not index datanodes by port,
    // so we need to iterate over them ourselves.
    boolean ok = false;
    final String lookup = lbs[0].getHosts()[0];
    StringBuilder sb = new StringBuilder();
    for (DataNode dn : cluster.getDataNodes()) {
      final String dnName = getHostName(dn);
      sb.append(dnName).append(' ');
      if (lookup.equals(dnName)) {
        ok = true;
        LOG.info("killing datanode " + name + " / " + lookup);
        ipcPort = dn.ipcServer.getListenerAddress().getPort();
        dn.shutdown();
        LOG.info("killed datanode " + name + " / " + lookup);
        break;
      }
    }
    Assert.assertTrue(
        "didn't find the server to kill, was looking for " + lookup + " found " + sb, ok);
    LOG.info("ipc port= " + ipcPort);

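    // HFileSystem.ReorderBlocks lets HBase rewrite the order of the block locations returned by
    // the namenode before the DFS client uses them; below we register one that demotes the host
    // of the datanode we just killed.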
    // Add the hook, with an implementation checking that we don't use the port we've just killed.
    Assert.assertTrue(HFileSystem.addLocationsOrderInterceptor(conf,
        new HFileSystem.ReorderBlocks() {
          @Override
          public void reorderBlocks(Configuration c, LocatedBlocks lbs, String src) {
            for (LocatedBlock lb : lbs.getLocatedBlocks()) {
              if (lb.getLocations().length > 1) {
                DatanodeInfo[] infos = lb.getLocations();
                if (infos[0].getHostName().equals(lookup)) {
                  LOG.info("HFileSystem bad host, inverting");
                  DatanodeInfo tmp = infos[0];
                  infos[0] = infos[1];
                  infos[1] = tmp;
                }
              }
            }
          }
        }));

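    // Occupy both the dead datanode's data transfer port and its IPC port so that a client which
    // still tries the killed replica first hangs until the timeout instead of failing fast with a
    // connection refused error.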
    final int retries = 10;
    ServerSocket ss = null;
    ServerSocket ssI;
    try {
      ss = new ServerSocket(port); // We're taking the port to have a timeout issue later.
      ssI = new ServerSocket(ipcPort);
    } catch (BindException be) {
      LOG.warn("Got bind exception trying to set up socket on " + port + " or " + ipcPort +
          ", this means that the datanode has not yet closed the socket or someone else took it." +
          " This can happen; skipping this test for this time.", be);
      if (ss != null) {
        ss.close();
      }
      return;
    }

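    // From here on, a read that tries the killed replica first would block on the occupied port
    // until the timeout; the interceptor registered above has to demote that replica for the
    // reads below to stay fast.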
    // The client does not always pick the same replica first, so we read 'retries' times; with
    // the reorder in place each read should never last more than a few milliseconds.
    for (int i = 0; i < retries; i++) {
      start = EnvironmentEdgeManager.currentTime();
      fin = dfs.open(p);
      Assert.assertTrue(toWrite == fin.readDouble());
      fin.close();
      end = EnvironmentEdgeManager.currentTime();
      LOG.info("HFileSystem readtime= " + (end - start));
      Assert.assertFalse("We took too much time to read", (end - start) > 60000);
    }

    ss.close();
    ssI.close();
  }

  /**
   * Returns the datanode's hostname, using getDisplayName (Hadoop 2) when available and falling
   * back to getHostName (Hadoop 1).
   */
  private String getHostName(DataNode dn) throws InvocationTargetException, IllegalAccessException {
    Method m;
    try {
      m = DataNode.class.getMethod("getDisplayName");
    } catch (NoSuchMethodException e) {
      try {
        m = DataNode.class.getMethod("getHostName");
      } catch (NoSuchMethodException e1) {
        throw new RuntimeException(e1);
      }
    }

    String res = (String) m.invoke(dn);
    if (res.contains(":")) {
      // The name may include a port (host:port); strip it off.
      return res.split(":")[0];
    } else {
      return res;
    }
  }

}