/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.fs;

import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for the HDFS fix from HBASE-6435: the hook installed by HFileSystem reorders HDFS block
 * locations so that the replica on the region server's own host is returned last. Please don't
 * add new subtests that start or stop a MiniDFSCluster to this class. When a MiniDFSCluster is
 * stopped on Hadoop 3, shutdown hooks are cleared in Hadoop's ShutdownHookManager, which leads to
 * a 'Failed suppression of fs shutdown hook' error in the region server.
 */
@Category({ MiscTests.class, LargeTests.class })
public class TestBlockReorderMultiBlocks {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBlockReorderMultiBlocks.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestBlockReorderMultiBlocks.class);

  private Configuration conf;
  private MiniDFSCluster cluster;
  private HBaseTestingUtility htu;
  private DistributedFileSystem dfs;
  private static final String host1 = "host1";
  private static final String host2 = "host2";
  private static final String host3 = "host3";

  @Rule
  public TestName name = new TestName();

  @Before
  public void setUp() throws Exception {
    htu = new HBaseTestingUtility();
    htu.getConfiguration().setInt("dfs.blocksize", 1024); // Small blocks so files span multiple blocks
    htu.getConfiguration().setInt("dfs.replication", 3);
    htu.startMiniDFSCluster(3, new String[] { "/r1", "/r2", "/r3" },
      new String[] { host1, host2, host3 });

    conf = htu.getConfiguration();
    cluster = htu.getDFSCluster();
    dfs = (DistributedFileSystem) FileSystem.get(conf);
  }

  @After
  public void tearDown() throws Exception {
    htu.shutdownMiniCluster();
  }

  /**
   * Test that the hook works within HBase, including when there are multiple blocks.
   */
  @Test
  public void testHBaseCluster() throws Exception {
    byte[] sb = Bytes.toBytes("sb");
    htu.startMiniZKCluster();

    MiniHBaseCluster hbm = htu.startMiniHBaseCluster();
    hbm.waitForActiveAndReadyMaster();
    HRegionServer targetRs =
      LoadBalancer.isTablesOnMaster(hbm.getConf()) ? hbm.getMaster() : hbm.getRegionServer(0);

    // We want a datanode with the same name as the region server, so we look up the region
    // server's hostname and start a new datanode with that name.
    String host4 = targetRs.getServerName().getHostname();
    LOG.info("Starting a new datanode with the name=" + host4);
    cluster.startDataNodes(conf, 1, true, null, new String[] { "/r4" }, new String[] { host4 },
      null);
    cluster.waitClusterUp();

    final int repCount = 3;

    // We use the region server's file system & conf as we expect it to have the hook.
    conf = targetRs.getConfiguration();
    HFileSystem rfs = (HFileSystem) targetRs.getFileSystem();
    Table h = htu.createTable(TableName.valueOf(name.getMethodName()), sb);

    // We now have 4 datanodes and a replication count of 3, so we don't know whether the datanode
    // with the same name as the region server will be used. We can't really stop an existing
    // datanode, as that would expose us to nasty HDFS bugs, so we try multiple times instead.

    // Now we need to find the log files, their locations, and look at them.

    String rootDir =
      new Path(CommonFSUtils.getWALRootDir(conf) + "/" + HConstants.HREGION_LOGDIR_NAME + "/"
        + targetRs.getServerName().toString()).toUri().getPath();

    DistributedFileSystem mdfs =
      (DistributedFileSystem) hbm.getMaster().getMasterFileSystem().getFileSystem();

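    // Repeat until we have seen ten WAL files whose last reported replica is on the new
    // datanode, rolling the WAL on each iteration so that fresh files keep appearing.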
    int nbTest = 0;
    while (nbTest < 10) {
      final List<HRegion> regions = targetRs.getRegions(h.getName());
      final CountDownLatch latch = new CountDownLatch(regions.size());
      // listen for successful log rolls
      final WALActionsListener listener = new WALActionsListener() {
        @Override
        public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
          latch.countDown();
        }
      };
      for (HRegion region : regions) {
        region.getWAL().registerWALActionsListener(listener);
      }

      htu.getAdmin().rollWALWriter(targetRs.getServerName());

      // wait
      try {
        latch.await();
      } catch (InterruptedException exception) {
        LOG.warn("Interrupted while waiting for the wal of '" + targetRs + "' to roll. If later "
          + "tests fail, it's probably because we should still be waiting.");
        Thread.currentThread().interrupt();
      }
      for (Region region : regions) {
        ((HRegion) region).getWAL().unregisterWALActionsListener(listener);
      }

      // We need a sleep as the namenode is informed asynchronously
      Thread.sleep(100);

      // insert one put to ensure a minimal size
      Put p = new Put(sb);
      p.addColumn(sb, sb, sb);
      h.put(p);

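      // List the WAL directory through the raw HDFS client to find the current log files.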
      DirectoryListing dl = dfs.getClient().listPaths(rootDir, HdfsFileStatus.EMPTY_NAME);
      HdfsFileStatus[] hfs = dl.getPartialListing();

      // As we wrote a put, we should have at least one log file.
      Assert.assertTrue(hfs.length >= 1);
      for (HdfsFileStatus hf : hfs) {
        // Because this is a live cluster, log files might get archived while we're processing.
        try {
          LOG.info("Log file found: " + hf.getLocalName() + " in " + rootDir);
          String logFile = rootDir + "/" + hf.getLocalName();
          FileStatus fsLog = rfs.getFileStatus(new Path(logFile));

          LOG.info("Checking log file: " + logFile);
          // Now check that the hook is up and running.
          // We can't call getBlockLocations directly, as it's not available in HFileSystem.
          // We try multiple times to be sure, as the order is random.

          BlockLocation[] bls = rfs.getFileBlockLocations(fsLog, 0, 1);
          if (bls.length > 0) {
            BlockLocation bl = bls[0];

            LOG.info(bl.getHosts().length + " replicas for block 0 in " + logFile + " ");
            for (int i = 0; i < bl.getHosts().length - 1; i++) {
              LOG.info(bl.getHosts()[i] + "    " + logFile);
              // None of the hosts before the last one should be the new datanode.
              Assert.assertNotEquals(host4, bl.getHosts()[i]);
            }
            String last = bl.getHosts()[bl.getHosts().length - 1];
            LOG.info(last + "    " + logFile);
            if (host4.equals(last)) {
              nbTest++;
              LOG.info(logFile + " is on the new datanode and is ok");
              if (bl.getHosts().length == 3) {
                // We can test this case from the underlying file system as well,
                // checking multiple times as the order is random.
                testFromDFS(dfs, logFile, repCount, host4);

                // now from the master
                testFromDFS(mdfs, logFile, repCount, host4);
              }
            }
          }
        } catch (FileNotFoundException exception) {
          LOG.debug("Failed to find log file '" + hf.getLocalName() + "'; it probably was "
            + "archived out from under us so we'll ignore and retry. If this test hangs "
            + "indefinitely you should treat this failure as a symptom.", exception);
        } catch (RemoteException exception) {
          if (exception.unwrapRemoteException() instanceof FileNotFoundException) {
            LOG.debug("Failed to find log file '" + hf.getLocalName() + "'; it probably was "
              + "archived out from under us so we'll ignore and retry. If this test hangs "
              + "indefinitely you should treat this failure as a symptom.", exception);
          } else {
            throw exception;
          }
        }
      }
    }
  }

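  /**
   * Asserts, directly against the namenode, that every block of {@code src} has {@code repCount}
   * locations and that the replica on {@code localhost} is reported last. The namenode learns
   * about block locations asynchronously, so each check waits for up to ten seconds, and the
   * whole check is repeated ten times because the location order is otherwise random.
   */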
  private void testFromDFS(DistributedFileSystem dfs, String src, int repCount, String localhost)
    throws Exception {
    // Multiple times as the order is random
    for (int i = 0; i < 10; i++) {
      LocatedBlocks lbs;
      // The NN gets the block list asynchronously, so we may need multiple tries to get the list
      final long max = EnvironmentEdgeManager.currentTime() + 10000;
      boolean done;
      do {
        Assert.assertTrue("Can't get enough replicas", EnvironmentEdgeManager.currentTime() < max);
        lbs = getNamenode(dfs.getClient()).getBlockLocations(src, 0, 1);
        Assert.assertNotNull("Can't get block locations for " + src, lbs);
        Assert.assertNotNull(lbs.getLocatedBlocks());
        Assert.assertTrue(lbs.getLocatedBlocks().size() > 0);

        done = true;
        for (int y = 0; y < lbs.getLocatedBlocks().size() && done; y++) {
          done = getLocatedBlockLocations(lbs.get(y)).length == repCount;
        }
      } while (!done);

      // Every block should now report the local host as its last location.
      for (int y = 0; y < lbs.getLocatedBlocks().size(); y++) {
        Assert.assertEquals(localhost,
          getLocatedBlockLocations(lbs.get(y))[repCount - 1].getHostName());
      }
    }
  }

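  /**
   * Extracts the {@link ClientProtocol} namenode proxy from the given {@link DFSClient} via
   * reflection on its {@code namenode} field.
   */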
  private static ClientProtocol getNamenode(DFSClient dfsc) throws Exception {
    Field nf = DFSClient.class.getDeclaredField("namenode");
    nf.setAccessible(true);
    return (ClientProtocol) nf.get(dfsc);
  }

}