/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.fs;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.net.ServerSocket;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for the HDFS fix from HBASE-6435: reordering block locations so that the client
 * does not try a dead datanode first.
 *
 * Please don't add new subtests that involve starting/stopping a MiniDFSCluster to this class.
 * When a MiniDFSCluster is stopped, the shutdown hooks are cleared in Hadoop's
 * ShutdownHookManager (Hadoop 3), which leads to a 'Failed suppression of fs shutdown hook'
 * error in the region server.
 */
@Category({MiscTests.class, LargeTests.class})
public class TestBlockReorder {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestBlockReorder.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestBlockReorder.class);

  private Configuration conf;
  private MiniDFSCluster cluster;
  private HBaseTestingUtility htu;
  private DistributedFileSystem dfs;
  private static final String host1 = "host1";
  private static final String host2 = "host2";
  private static final String host3 = "host3";

  @Rule
  public TestName name = new TestName();

  @Before
  public void setUp() throws Exception {
    htu = new HBaseTestingUtility();
    htu.getConfiguration().setInt("dfs.blocksize", 1024); // For the test with multiple blocks
    htu.getConfiguration().setInt("dfs.replication", 3);
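    // Three datanodes, each with its own rack and hostname, so that a replica of every
    // block is still available after a single datanode is killed.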
    htu.startMiniDFSCluster(3,
        new String[]{"/r1", "/r2", "/r3"}, new String[]{host1, host2, host3});

    conf = htu.getConfiguration();
    cluster = htu.getDFSCluster();
    dfs = (DistributedFileSystem) FileSystem.get(conf);
  }

  @After
  public void tearDown() throws Exception {
    htu.shutdownMiniCluster();
  }

  /**
   * Test that we can add a hook, and that the hook takes effect when we read a file from HDFS.
   */
  @Test
  public void testBlockLocationReorder() throws Exception {
    Path p = new Path("hello");

    Assert.assertTrue(cluster.getDataNodes().size() > 1);
    final int repCount = 2;

    // Let's write the file
    FSDataOutputStream fop = dfs.create(p, (short) repCount);
    final double toWrite = 875.5613;
    fop.writeDouble(toWrite);
    fop.close();

    // Let's check we can read it when everybody's there
    long start = System.currentTimeMillis();
    FSDataInputStream fin = dfs.open(p);
    Assert.assertEquals(toWrite, fin.readDouble(), 0.0);
    long end = System.currentTimeMillis();
    LOG.info("readtime= " + (end - start));
    fin.close();
    Assert.assertTrue((end - start) < 30 * 1000);

    // Let's kill the datanode holding the first location. But the first location returned
    // will change, so first get the location, then extract the port from its name.
    FileStatus f = dfs.getFileStatus(p);
    BlockLocation[] lbs;
    do {
      // Wait until the single block is reported with all repCount replicas in place.
      lbs = dfs.getFileBlockLocations(f, 0, 1);
    } while (lbs.length != 1 || lbs[0].getHosts().length != repCount);
    final String dnAddress = lbs[0].getNames()[0];
    Assert.assertTrue(dnAddress.indexOf(':') > 0);
    String portS = dnAddress.substring(dnAddress.indexOf(':') + 1);
    final int port = Integer.parseInt(portS);
    LOG.info("port= " + port);
    int ipcPort = -1;

    // Find the datanode to kill. The order of cluster.getDataNodes() does not match the
    // port we just extracted, so iterate and match on the hostname ourselves.
    boolean ok = false;
    final String lookup = lbs[0].getHosts()[0];
    StringBuilder sb = new StringBuilder();
    for (DataNode dn : cluster.getDataNodes()) {
      final String dnName = getHostName(dn);
      sb.append(dnName).append(' ');
      if (lookup.equals(dnName)) {
        ok = true;
        LOG.info("killing datanode " + dnAddress + " / " + lookup);
        ipcPort = dn.ipcServer.getListenerAddress().getPort();
        dn.shutdown();
        LOG.info("killed datanode " + dnAddress + " / " + lookup);
        break;
      }
    }
    Assert.assertTrue(
        "didn't find the server to kill, was looking for " + lookup + " found " + sb, ok);
    LOG.info("ipc port= " + ipcPort);

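    // HFileSystem.addLocationsOrderInterceptor is expected to wrap the client-to-namenode
    // protocol so that every located-blocks response passes through the ReorderBlocks hook
    // before the DFS client picks a replica to read from.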
    // Add the hook, with an implementation checking that we don't use the port we've just killed.
    Assert.assertTrue(HFileSystem.addLocationsOrderInterceptor(conf,
        new HFileSystem.ReorderBlocks() {
          @Override
          public void reorderBlocks(Configuration c, LocatedBlocks lbs, String src) {
            for (LocatedBlock lb : lbs.getLocatedBlocks()) {
              if (lb.getLocations().length > 1) {
                DatanodeInfo[] infos = lb.getLocations();
                if (infos[0].getHostName().equals(lookup)) {
                  LOG.info("HFileSystem bad host, inverting");
                  DatanodeInfo tmp = infos[0];
                  infos[0] = infos[1];
                  infos[1] = tmp;
                }
              }
            }
          }
        }));

    final int retries = 10;
    ServerSocket ss = null;
    ServerSocket ssI;
    try {
      // Take over the dead datanode's ports so that any client that still tries them
      // hangs until a connect timeout instead of failing fast.
      ss = new ServerSocket(port);
      ssI = new ServerSocket(ipcPort);
    } catch (BindException be) {
      LOG.warn("Got bind exception trying to set up socket on " + port + " or " + ipcPort +
          ", this means that the datanode has not closed the socket or" +
          " someone else took it. This can happen; skipping the test this time.", be);
      if (ss != null) {
        ss.close();
      }
      return;
    }

    // Now a read that picks the dead replica first would hang until a timeout; unfortunately
    // the client does not always connect to the same box, so we retry several times. With the
    // reorder hook in place, a read should never take more than a few milliseconds.
    for (int i = 0; i < retries; i++) {
      start = System.currentTimeMillis();

      fin = dfs.open(p);
      Assert.assertEquals(toWrite, fin.readDouble(), 0.0);
      fin.close();
      end = System.currentTimeMillis();
      LOG.info("HFileSystem readtime= " + (end - start));
      Assert.assertFalse("We took too much time to read", (end - start) > 60000);
    }

    ss.close();
    ssI.close();
  }

  /**
   * Returns the hostname of a datanode, using getDisplayName (Hadoop 2) with a fallback to
   * getHostName (Hadoop 1), looked up via reflection so both versions are supported.
   */
  private String getHostName(DataNode dn) throws InvocationTargetException, IllegalAccessException {
    Method m;
    try {
      m = DataNode.class.getMethod("getDisplayName");
    } catch (NoSuchMethodException e) {
      try {
        m = DataNode.class.getMethod("getHostName");
      } catch (NoSuchMethodException e1) {
        throw new RuntimeException(e1);
      }
    }

    String res = (String) m.invoke(dn);
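    // The returned name may be of the form "host:port"; keep only the host part so it
    // matches the hostnames reported by getFileBlockLocations().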
    if (res.contains(":")) {
      return res.split(":")[0];
    } else {
      return res;
    }
  }

}