/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.fs;

import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.net.ServerSocket;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for the hdfs fix from HBASE-6435. Please don't add new subtest which involves starting /
 * stopping MiniDFSCluster in this class. When stopping MiniDFSCluster, shutdown hooks would be
 * cleared in hadoop's ShutdownHookManager in hadoop 3. This leads to 'Failed suppression of fs
 * shutdown hook' error in region server.
 */
@Tag(MiscTests.TAG)
@Tag(LargeTests.TAG)
public class TestBlockReorder {

  private static final Logger LOG = LoggerFactory.getLogger(TestBlockReorder.class);

  private Configuration conf;
  private MiniDFSCluster cluster;
  private HBaseTestingUtil htu;
  private DistributedFileSystem dfs;
  private static final String host1 = "host1";
  private static final String host2 = "host2";
  private static final String host3 = "host3";

  /**
   * Starts a 3-datanode MiniDFSCluster with distinct racks and hostnames so that block placement
   * is deterministic enough for the reorder test.
   */
  @BeforeEach
  public void setUp() throws Exception {
    htu = new HBaseTestingUtil();
    htu.getConfiguration().setInt("dfs.blocksize", 1024);// For the test with multiple blocks
    htu.getConfiguration().setInt("dfs.replication", 3);
    htu.startMiniDFSCluster(3, new String[] { "/r1", "/r2", "/r3" },
      new String[] { host1, host2, host3 });

    conf = htu.getConfiguration();
    cluster = htu.getDFSCluster();
    dfs = (DistributedFileSystem) FileSystem.get(conf);
  }

  // NOTE(review): despite the legacy name, this runs after EACH test (@AfterEach, not @AfterAll).
  // The name is kept to avoid churning external references; JUnit invokes it via the annotation.
  @AfterEach
  public void tearDownAfterClass() throws Exception {
    htu.shutdownMiniCluster();
  }

  /**
   * Test that we can add a hook, and that this hook works when we try to read the file in HDFS.
   * The scenario: write a small file, kill the datanode hosting the first reported replica, grab
   * that datanode's ports so connections to them time out instead of failing fast, install a
   * {@link HFileSystem.ReorderBlocks} hook that moves the dead host out of first position, and
   * verify reads stay fast.
   */
  @Test
  public void testBlockLocationReorder() throws Exception {
    Path p = new Path("hello");

    assertTrue(cluster.getDataNodes().size() > 1);
    final int repCount = 2;

    // Let's write the file; a single double fits in one block.
    FSDataOutputStream fop = dfs.create(p, (short) repCount);
    final double toWrite = 875.5613;
    fop.writeDouble(toWrite);
    fop.close();

    // Let's check we can read it when everybody's there.
    long start = EnvironmentEdgeManager.currentTime();
    FSDataInputStream fin = dfs.open(p);
    assertEquals(toWrite, fin.readDouble());
    long end = EnvironmentEdgeManager.currentTime();
    LOG.info("readtime= " + (end - start));
    fin.close();
    assertTrue((end - start) < 30 * 1000);

    // Let's kill the first location. But actually the first location returned will change.
    // The first thing to do is to get the location, then the port.
    FileStatus f = dfs.getFileStatus(p);
    BlockLocation[] lbs;
    do {
      lbs = dfs.getFileBlockLocations(f, 0, 1);
      // NOTE(review): the second clause compares the block's byte length (8 here) to the replica
      // count, so it is always true and the loop effectively waits only for lbs.length == 1. It
      // looks like lbs[0].getHosts().length was intended — confirm before changing, since a
      // "fixed" condition could spin until replication completes. Kept as-is to preserve behavior.
    } while (lbs.length != 1 && lbs[0].getLength() != repCount);
    final String name = lbs[0].getNames()[0];
    assertTrue(name.indexOf(':') > 0);
    String portS = name.substring(name.indexOf(':') + 1);
    final int port = Integer.parseInt(portS);
    LOG.info("port= " + port);
    int ipcPort = -1;

    // Let's find the DN to kill. cluster.getDataNodes(int) is not on the same port, so we need
    // to iterate ourselves.
    boolean ok = false;
    final String lookup = lbs[0].getHosts()[0];
    StringBuilder sb = new StringBuilder();
    for (DataNode dn : cluster.getDataNodes()) {
      final String dnName = getHostName(dn);
      sb.append(dnName).append(' ');
      if (lookup.equals(dnName)) {
        ok = true;
        LOG.info("killing datanode " + name + " / " + lookup);
        ipcPort = dn.ipcServer.getListenerAddress().getPort();
        dn.shutdown();
        LOG.info("killed datanode " + name + " / " + lookup);
        break;
      }
    }
    assertTrue(ok, "didn't find the server to kill, was looking for " + lookup + " found " + sb);
    LOG.info("ipc port= " + ipcPort);

    // Add the hook, with an implementation checking that we don't use the port we've just killed:
    // if the dead host is first in a multi-replica location list, swap it with the second entry.
    assertTrue(HFileSystem.addLocationsOrderInterceptor(conf, new HFileSystem.ReorderBlocks() {
      @Override
      public void reorderBlocks(Configuration c, LocatedBlocks lbs, String src) {
        for (LocatedBlock lb : lbs.getLocatedBlocks()) {
          if (getLocatedBlockLocations(lb).length > 1) {
            DatanodeInfo[] infos = getLocatedBlockLocations(lb);
            if (infos[0].getHostName().equals(lookup)) {
              LOG.info("HFileSystem bad host, inverting");
              DatanodeInfo tmp = infos[0];
              infos[0] = infos[1];
              infos[1] = tmp;
            }
          }
        }
      }
    }));

    final int retries = 10;
    ServerSocket ss = null;
    ServerSocket ssI;
    try {
      ss = new ServerSocket(port);// We're taking the port to have a timeout issue later.
      ssI = new ServerSocket(ipcPort);
    } catch (BindException be) {
      LOG.warn("Got bind exception trying to set up socket on " + port + " or " + ipcPort
        + ", this means that the datanode has not closed the socket or"
        + " someone else took it. It may happen, skipping this test for this time.", be);
      if (ss != null) {
        ss.close();
      }
      return;
    }

    // Now it will fail with a timeout, unfortunately it does not always connect to the same box,
    // so we try retries times; with the reorder it will never last more than a few milli seconds.
    // Close the sockets in a finally so a failed read/assert doesn't leak the two bound ports.
    try {
      for (int i = 0; i < retries; i++) {
        start = EnvironmentEdgeManager.currentTime();
        fin = dfs.open(p);
        assertEquals(toWrite, fin.readDouble());
        fin.close();
        end = EnvironmentEdgeManager.currentTime();
        LOG.info("HFileSystem readtime= " + (end - start));
        assertFalse((end - start) > 60000, "We took too much time to read");
      }
    } finally {
      ss.close();
      ssI.close();
    }
  }

  /**
   * Allow to get the hostname, using getHostName (hadoop 1) or getDisplayName (hadoop 2) via
   * reflection; strips any trailing ":port" suffix from the returned name.
   */
  private String getHostName(DataNode dn) throws InvocationTargetException, IllegalAccessException {
    Method m;
    try {
      m = DataNode.class.getMethod("getDisplayName");
    } catch (NoSuchMethodException e) {
      try {
        m = DataNode.class.getMethod("getHostName");
      } catch (NoSuchMethodException e1) {
        throw new RuntimeException(e1);
      }
    }

    String res = (String) m.invoke(dn);
    if (res.contains(":")) {
      return res.split(":")[0];
    } else {
      return res;
    }
  }

}