/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.fs;

import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for the hdfs fix from HBASE-6435. Please don't add new subtests that involve starting or
 * stopping a MiniDFSCluster to this class. When a MiniDFSCluster is stopped, shutdown hooks are
 * cleared in hadoop's ShutdownHookManager in hadoop 3, which leads to a 'Failed suppression of fs
 * shutdown hook' error in the region server.
 */
@Tag(MiscTests.TAG)
@Tag(LargeTests.TAG)
public class TestBlockReorderMultiBlocks {

  private static final Logger LOG = LoggerFactory.getLogger(TestBlockReorderMultiBlocks.class);

  private Configuration conf;
  private MiniDFSCluster cluster;
  private HBaseTestingUtil htu;
  private DistributedFileSystem dfs;
  private static final String host1 = "host1";
  private static final String host2 = "host2";
  private static final String host3 = "host3";

  @BeforeEach
  public void setUp() throws Exception {
    htu = new HBaseTestingUtil();
    // Use tiny 1kB blocks so that the WAL files written during the test span multiple blocks
    htu.getConfiguration().setInt("dfs.blocksize", 1024);
    htu.getConfiguration().setInt("dfs.replication", 3);
    htu.startMiniDFSCluster(3, new String[] { "/r1", "/r2", "/r3" },
      new String[] { host1, host2, host3 });

    conf = htu.getConfiguration();
    cluster = htu.getDFSCluster();
    dfs = (DistributedFileSystem) FileSystem.get(conf);
  }

  @AfterEach
  public void tearDown() throws Exception {
    htu.shutdownMiniCluster();
  }

  /**
   * Test that the hook works within HBase, including when there are multiple blocks.
   */
  @Test
  public void testHBaseCluster(TestInfo testInfo) throws Exception {
    byte[] sb = Bytes.toBytes("sb");
    htu.startMiniZKCluster();

    SingleProcessHBaseCluster hbm = htu.startMiniHBaseCluster();
    hbm.waitForActiveAndReadyMaster();
    HRegionServer targetRs = hbm.getRegionServer(0);

    // We want to have a datanode with the same name as the region server, so
    // we get the region server's name and start a new datanode with that name.
    String host4 = targetRs.getServerName().getHostname();
    LOG.info("Starting a new datanode with the name=" + host4);
    cluster.startDataNodes(conf, 1, true, null, new String[] { "/r4" }, new String[] { host4 },
      null);
    cluster.waitClusterUp();

    final int repCount = 3;

    // We use the region server's file system & conf as we expect them to have the hook.
    conf = targetRs.getConfiguration();
    HFileSystem rfs = (HFileSystem) targetRs.getFileSystem();
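    // HFileSystem is the wrapper that carries the hook under test: block locations fetched
    // through it should come back reordered, with the replica that lives on the region server's
    // own host moved to the end of the list. The assertions below verify exactly that ordering.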
    Table h = htu.createTable(TableName.valueOf(testInfo.getTestMethod().get().getName()), sb);

    // Now, we have 4 datanodes and a replication count of 3, so we don't know whether the
    // datanode with the same name as the region server will be used. We can't really stop an
    // existing datanode, as this would make us hit nasty hdfs bugs/issues, so we try multiple
    // times instead.
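    // Each iteration below rolls a fresh WAL file, giving a new, independent chance for host4 to
    // receive a replica; we keep going until ten files have been verified on it.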

    // Now we need to find the log file, its locations, and look at it

    String rootDir =
      new Path(CommonFSUtils.getWALRootDir(conf) + "/" + HConstants.HREGION_LOGDIR_NAME + "/"
        + targetRs.getServerName().toString()).toUri().getPath();

    DistributedFileSystem mdfs =
      (DistributedFileSystem) hbm.getMaster().getMasterFileSystem().getFileSystem();

    int nbTest = 0;
    while (nbTest < 10) {
      final List<HRegion> regions = targetRs.getRegions(h.getName());
      final CountDownLatch latch = new CountDownLatch(regions.size());
      // listen for successful log rolls
      final WALActionsListener listener = new WALActionsListener() {
        @Override
        public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
          latch.countDown();
        }
      };
      for (HRegion region : regions) {
        region.getWAL().registerWALActionsListener(listener);
      }

      htu.getAdmin().rollWALWriter(targetRs.getServerName());

      // wait
      try {
        latch.await();
      } catch (InterruptedException exception) {
        LOG.warn("Interrupted while waiting for the wal of '" + targetRs + "' to roll. If later "
          + "tests fail, it's probably because we should still be waiting.");
        Thread.currentThread().interrupt();
      }
      for (Region region : regions) {
        ((HRegion) region).getWAL().unregisterWALActionsListener(listener);
      }

      // We need a sleep as the namenode is informed asynchronously
      Thread.sleep(100);

      // insert one put to ensure a minimal size
      Put p = new Put(sb);
      p.addColumn(sb, sb, sb);
      h.put(p);

      DirectoryListing dl = dfs.getClient().listPaths(rootDir, HdfsFileStatus.EMPTY_NAME);
      HdfsFileStatus[] hfs = dl.getPartialListing();

      // As we wrote a put, we should have at least one log file.
      assertTrue(hfs.length >= 1);
      for (HdfsFileStatus hf : hfs) {
        // Because this is a live cluster, log files might get archived while we're processing
        try {
          LOG.info("Log file found: " + hf.getLocalName() + " in " + rootDir);
          String logFile = rootDir + "/" + hf.getLocalName();
          FileStatus fsLog = rfs.getFileStatus(new Path(logFile));

          LOG.info("Checking log file: " + logFile);
          // Now check that the hook is up and running.
          // We can't call getBlockLocations directly, as it's not exposed by HFileSystem.
          // We try multiple times to be sure, as the replica order is random.

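          // If the reorder hook is active, host4 (the region server's own host) should only ever
          // appear in the last position of the returned hosts, never earlier.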
          BlockLocation[] bls = rfs.getFileBlockLocations(fsLog, 0, 1);
          if (bls.length > 0) {
            BlockLocation bl = bls[0];

            LOG.info(bl.getHosts().length + " replicas for block 0 in " + logFile + " ");
            for (int i = 0; i < bl.getHosts().length - 1; i++) {
              LOG.info(bl.getHosts()[i] + "    " + logFile);
              assertNotEquals(host4, bl.getHosts()[i]);
            }
            String last = bl.getHosts()[bl.getHosts().length - 1];
            LOG.info(last + "    " + logFile);
            if (host4.equals(last)) {
              nbTest++;
              LOG.info(logFile + " is on the new datanode and is ok");
              if (bl.getHosts().length == 3) {
                // We can test this case from the file system as well
                // Checking the underlying file system. Multiple times as the order is random
                testFromDFS(dfs, logFile, repCount, host4);

                // now from the master
                testFromDFS(mdfs, logFile, repCount, host4);
              }
            }
          }
        } catch (FileNotFoundException exception) {
          LOG.debug("Failed to find log file '" + hf.getLocalName() + "'; it probably was "
            + "archived out from under us so we'll ignore and retry. If this test hangs "
            + "indefinitely you should treat this failure as a symptom.", exception);
        } catch (RemoteException exception) {
          if (exception.unwrapRemoteException() instanceof FileNotFoundException) {
            LOG.debug("Failed to find log file '" + hf.getLocalName() + "'; it probably was "
              + "archived out from under us so we'll ignore and retry. If this test hangs "
              + "indefinitely you should treat this failure as a symptom.", exception);
          } else {
            throw exception;
          }
        }
      }
    }
  }

  private void testFromDFS(DistributedFileSystem dfs, String src, int repCount, String localhost)
    throws Exception {
    // Multiple times as the order is random
    for (int i = 0; i < 10; i++) {
      LocatedBlocks lbs;
      // The NN gets the block list asynchronously, so we may need multiple tries to get the list
      final long max = EnvironmentEdgeManager.currentTime() + 10000;
      boolean done;
      do {
        assertTrue(EnvironmentEdgeManager.currentTime() < max, "Can't get enough replicas");
        lbs = getNamenode(dfs.getClient()).getBlockLocations(src, 0, 1);
        assertNotNull(lbs, "Can't get block locations for " + src);
        assertNotNull(lbs.getLocatedBlocks());
        assertTrue(lbs.getLocatedBlocks().size() > 0);

        done = true;
        for (int y = 0; y < lbs.getLocatedBlocks().size() && done; y++) {
          done = getLocatedBlockLocations(lbs.get(y)).length == repCount;
        }
      } while (!done);

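      // Every block now reports repCount replicas; if the reorder hook did its job, the local
      // host is the last entry in each block's location list.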
      for (int y = 0; y < lbs.getLocatedBlocks().size(); y++) {
        assertEquals(localhost, getLocatedBlockLocations(lbs.get(y))[repCount - 1].getHostName());
      }
    }
  }

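  /**
   * The DFSClient's namenode proxy is not publicly accessible, so we reach it through reflection
   * to query block locations straight from the namenode.
   */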
  private static ClientProtocol getNamenode(DFSClient dfsc) throws Exception {
    Field nf = DFSClient.class.getDeclaredField("namenode");
    nf.setAccessible(true);
    return (ClientProtocol) nf.get(dfsc);
  }

}