/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.fs;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.net.ServerSocket;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for the HDFS fix from HBASE-6435. Please don't add new subtests that involve starting or
 * stopping a MiniDFSCluster to this class. When a MiniDFSCluster is stopped, shutdown hooks are
 * cleared in Hadoop's ShutdownHookManager in Hadoop 3, which leads to a 'Failed suppression of fs
 * shutdown hook' error in the region server.
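 * <p>
 * The single test installs an {@link HFileSystem.ReorderBlocks} interceptor through
 * {@code HFileSystem.addLocationsOrderInterceptor(Configuration, ReorderBlocks)} and verifies
 * that reads are redirected away from a datanode that has just been killed.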
 */
@Category({ MiscTests.class, LargeTests.class })
public class TestBlockReorder {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBlockReorder.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestBlockReorder.class);

  private Configuration conf;
  private MiniDFSCluster cluster;
  private HBaseTestingUtility htu;
  private DistributedFileSystem dfs;
  private static final String host1 = "host1";
  private static final String host2 = "host2";
  private static final String host3 = "host3";

  @Rule
  public TestName name = new TestName();

  @Before
  public void setUp() throws Exception {
    htu = new HBaseTestingUtility();
    htu.getConfiguration().setInt("dfs.blocksize", 1024); // For the test with multiple blocks
    htu.getConfiguration().setInt("dfs.replication", 3);
    htu.startMiniDFSCluster(3, new String[] { "/r1", "/r2", "/r3" },
      new String[] { host1, host2, host3 });

    conf = htu.getConfiguration();
    cluster = htu.getDFSCluster();
    dfs = (DistributedFileSystem) FileSystem.get(conf);
  }

  @After
  public void tearDown() throws Exception {
    htu.shutdownMiniCluster();
  }

  /**
   * Test that we can add a hook, and that this hook works when we try to read the file in HDFS.
   */
  @Test
  public void testBlockLocationReorder() throws Exception {
    Path p = new Path("hello");

    Assert.assertTrue(cluster.getDataNodes().size() > 1);
    final int repCount = 2;

    // Let's write the file
    FSDataOutputStream fop = dfs.create(p, (short) repCount);
    final double toWrite = 875.5613;
    fop.writeDouble(toWrite);
    fop.close();

    // Let's check we can read it when everybody's there
    long start = EnvironmentEdgeManager.currentTime();
    FSDataInputStream fin = dfs.open(p);
    Assert.assertTrue(toWrite == fin.readDouble());
    long end = EnvironmentEdgeManager.currentTime();
    LOG.info("readtime= " + (end - start));
    fin.close();
    Assert.assertTrue((end - start) < 30 * 1000);

    // Let's kill the first location. But actually the first location returned will change.
    // The first thing to do is to get the location, then the port.
    FileStatus f = dfs.getFileStatus(p);
    BlockLocation[] lbs;
    // Wait until the block is reported at exactly one location with all of its replica hosts.
    do {
      lbs = dfs.getFileBlockLocations(f, 0, 1);
    } while (lbs.length != 1 || lbs[0].getHosts().length != repCount);
    final String name = lbs[0].getNames()[0];
    Assert.assertTrue(name.indexOf(':') > 0);
    String portS = name.substring(name.indexOf(':') + 1);
    final int port = Integer.parseInt(portS);
    LOG.info("port= " + port);
    int ipcPort = -1;

    // Let's find the DN to kill. cluster.getDataNodes(int) is not on the same port, so we need
    // to iterate ourselves.
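    // The xfer port was parsed above from the block location's "host:port" name; the ipc port
    // has to be captured from the DataNode object itself before shutdown, since both ports are
    // re-bound with plain ServerSockets further down to force connection timeouts.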
    boolean ok = false;
    final String lookup = lbs[0].getHosts()[0];
    StringBuilder sb = new StringBuilder();
    for (DataNode dn : cluster.getDataNodes()) {
      final String dnName = getHostName(dn);
      sb.append(dnName).append(' ');
      if (lookup.equals(dnName)) {
        ok = true;
        LOG.info("killing datanode " + name + " / " + lookup);
        ipcPort = dn.ipcServer.getListenerAddress().getPort();
        dn.shutdown();
        LOG.info("killed datanode " + name + " / " + lookup);
        break;
      }
    }
    Assert.assertTrue("didn't find the server to kill, was looking for " + lookup + " found " + sb,
      ok);
    LOG.info("ipc port= " + ipcPort);

    // Add the hook, with an implementation checking that we don't use the port we've just killed.
    Assert
      .assertTrue(HFileSystem.addLocationsOrderInterceptor(conf, new HFileSystem.ReorderBlocks() {
        @Override
        public void reorderBlocks(Configuration c, LocatedBlocks lbs, String src) {
          for (LocatedBlock lb : lbs.getLocatedBlocks()) {
            if (lb.getLocations().length > 1) {
              DatanodeInfo[] infos = lb.getLocations();
              if (infos[0].getHostName().equals(lookup)) {
                LOG.info("HFileSystem bad host, inverting");
                DatanodeInfo tmp = infos[0];
                infos[0] = infos[1];
                infos[1] = tmp;
              }
            }
          }
        }
      }));

    final int retries = 10;
    ServerSocket ss = null;
    ServerSocket ssI;
    try {
      ss = new ServerSocket(port); // We're taking the port to have a timeout issue later.
      ssI = new ServerSocket(ipcPort);
    } catch (BindException be) {
      LOG.warn("Got bind exception trying to set up socket on " + port + " or " + ipcPort
        + ", this means that the datanode has not closed the socket or"
        + " someone else took it. It may happen, skipping this test for this time.", be);
      if (ss != null) {
        ss.close();
      }
      return;
    }

    // Now it will fail with a timeout; unfortunately it does not always connect to the same box,
    // so we try 'retries' times. With the reorder it will never last more than a few milliseconds.
    for (int i = 0; i < retries; i++) {
      start = EnvironmentEdgeManager.currentTime();
      fin = dfs.open(p);
      Assert.assertTrue(toWrite == fin.readDouble());
      fin.close();
      end = EnvironmentEdgeManager.currentTime();
      LOG.info("HFileSystem readtime= " + (end - start));
      Assert.assertFalse("We took too much time to read", (end - start) > 60000);
    }

    ss.close();
    ssI.close();
  }

  /**
   * Returns the datanode hostname, using getHostName (hadoop 1) or getDisplayName (hadoop 2).
   */
  private String getHostName(DataNode dn) throws InvocationTargetException, IllegalAccessException {
    Method m;
    try {
      m = DataNode.class.getMethod("getDisplayName");
    } catch (NoSuchMethodException e) {
      try {
        m = DataNode.class.getMethod("getHostName");
      } catch (NoSuchMethodException e1) {
        throw new RuntimeException(e1);
      }
    }

    String res = (String) m.invoke(dn);
    if (res.contains(":")) {
      return res.split(":")[0];
    } else {
      return res;
    }
  }

}