/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.fs;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.net.ServerSocket;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for the HDFS fix from HBASE-6435.
 *
 * Please don't add new subtests that start / stop a MiniDFSCluster to this class. When a
 * MiniDFSCluster is stopped, hadoop 3 clears the shutdown hooks in its ShutdownHookManager,
 * which leads to a 'Failed suppression of fs shutdown hook' error in the region server.
 */
@Category({ MiscTests.class, LargeTests.class })
public class TestBlockReorder {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestBlockReorder.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestBlockReorder.class);

  private Configuration conf;
  private MiniDFSCluster cluster;
  private HBaseTestingUtility htu;
  private DistributedFileSystem dfs;
  private static final String host1 = "host1";
  private static final String host2 = "host2";
  private static final String host3 = "host3";

  @Rule
  public TestName name = new TestName();

  @Before
  public void setUp() throws Exception {
    htu = new HBaseTestingUtility();
    htu.getConfiguration().setInt("dfs.blocksize", 1024); // For the test with multiple blocks
    htu.getConfiguration().setInt("dfs.replication", 3);
    htu.startMiniDFSCluster(3,
        new String[] { "/r1", "/r2", "/r3" }, new String[] { host1, host2, host3 });

    conf = htu.getConfiguration();
    cluster = htu.getDFSCluster();
    dfs = (DistributedFileSystem) FileSystem.get(conf);
  }

  @After
  public void tearDown() throws Exception {
    htu.shutdownMiniCluster();
  }

  /**
   * Test that we can add a hook, and that this hook is invoked when we read a file from HDFS.
   */
  @Test
  public void testBlockLocationReorder() throws Exception {
    Path p = new Path("hello");

    Assert.assertTrue(cluster.getDataNodes().size() > 1);
    final int repCount = 2;

    // Let's write the file
    FSDataOutputStream fop = dfs.create(p, (short) repCount);
    final double toWrite = 875.5613;
    fop.writeDouble(toWrite);
    fop.close();

    // Let's check we can read it when everybody's there
    long start = System.currentTimeMillis();
    FSDataInputStream fin = dfs.open(p);
    Assert.assertEquals(toWrite, fin.readDouble(), 0.0);
    long end = System.currentTimeMillis();
    LOG.info("readtime= " + (end - start));
    fin.close();
    Assert.assertTrue((end - start) < 30 * 1000);

    // Let's kill the first location. Note that the first location returned may change,
    // so the first thing to do is to get the location, then the port.
    FileStatus f = dfs.getFileStatus(p);
    BlockLocation[] lbs;
    do {
      lbs = dfs.getFileBlockLocations(f, 0, 1);
    } while (lbs.length != 1 && lbs[0].getLength() != repCount);
    final String name = lbs[0].getNames()[0];
    Assert.assertTrue(name.indexOf(':') > 0);
    String portS = name.substring(name.indexOf(':') + 1);
    final int port = Integer.parseInt(portS);
    LOG.info("port= " + port);
    int ipcPort = -1;

    // Let's find the DN to kill. cluster.getDataNodes(int) is not indexed by port, so we need
    // to iterate ourselves.
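    // The lookup below matches on hostname: setUp() started each datanode with a distinct
    // hostname (host1 / host2 / host3), so a hostname uniquely identifies one datanode.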
    boolean ok = false;
    final String lookup = lbs[0].getHosts()[0];
    StringBuilder sb = new StringBuilder();
    for (DataNode dn : cluster.getDataNodes()) {
      final String dnName = getHostName(dn);
      sb.append(dnName).append(' ');
      if (lookup.equals(dnName)) {
        ok = true;
        LOG.info("killing datanode " + name + " / " + lookup);
        ipcPort = dn.ipcServer.getListenerAddress().getPort();
        dn.shutdown();
        LOG.info("killed datanode " + name + " / " + lookup);
        break;
      }
    }
    Assert.assertTrue(
        "didn't find the server to kill, was looking for " + lookup + " found " + sb, ok);
    LOG.info("ipc port= " + ipcPort);

    // Add the hook, with an implementation checking that we don't use the port we've just killed.
    Assert.assertTrue(HFileSystem.addLocationsOrderInterceptor(conf,
        new HFileSystem.ReorderBlocks() {
          @Override
          public void reorderBlocks(Configuration c, LocatedBlocks lbs, String src) {
            for (LocatedBlock lb : lbs.getLocatedBlocks()) {
              if (lb.getLocations().length > 1) {
                DatanodeInfo[] infos = lb.getLocations();
                if (infos[0].getHostName().equals(lookup)) {
                  // Move the dead datanode out of first position so reads don't try it first.
                  LOG.info("HFileSystem bad host, inverting");
                  DatanodeInfo tmp = infos[0];
                  infos[0] = infos[1];
                  infos[1] = tmp;
                }
              }
            }
          }
        }));

    final int retries = 10;
    ServerSocket ss = null;
    ServerSocket ssI;
    try {
      ss = new ServerSocket(port); // We're taking the port so reads against it time out later.
      ssI = new ServerSocket(ipcPort);
    } catch (BindException be) {
      LOG.warn("Got bind exception trying to set up socket on " + port + " or " + ipcPort +
          ", this means that the datanode has not closed the socket or" +
          " someone else took it. It may happen, skipping this test for this time.", be);
      if (ss != null) {
        ss.close();
      }
      return;
    }

    // Now a read would fail with a timeout if it went to the dead datanode. Unfortunately the
    // client does not always connect to the same box, so we try 'retries' times; with the
    // reorder hook in place a read should never take more than a few milliseconds.
    for (int i = 0; i < retries; i++) {
      start = System.currentTimeMillis();

      fin = dfs.open(p);
      Assert.assertEquals(toWrite, fin.readDouble(), 0.0);
      fin.close();
      end = System.currentTimeMillis();
      LOG.info("HFileSystem readtime= " + (end - start));
      Assert.assertFalse("We took too much time to read", (end - start) > 60000);
    }

    ss.close();
    ssI.close();
  }

  /**
   * Gets the datanode's hostname, using getDisplayName (hadoop 2) or falling back to
   * getHostName (hadoop 1).
   */
  private String getHostName(DataNode dn)
      throws InvocationTargetException, IllegalAccessException {
    Method m;
    try {
      m = DataNode.class.getMethod("getDisplayName");
    } catch (NoSuchMethodException e) {
      try {
        m = DataNode.class.getMethod("getHostName");
      } catch (NoSuchMethodException e1) {
        throw new RuntimeException(e1);
      }
    }

    String res = (String) m.invoke(dn);
    if (res.contains(":")) {
      // Strip a trailing ":port" if present, keeping only the hostname.
      return res.split(":")[0];
    } else {
      return res;
    }
  }

}