/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.fs;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for the HDFS fix from HBASE-6435. Please don't add new subtests that involve starting or
 * stopping a MiniDFSCluster to this class. When a MiniDFSCluster is stopped, its shutdown hooks
 * are cleared from Hadoop's ShutdownHookManager (on Hadoop 3), which leads to a 'Failed
 * suppression of fs shutdown hook' error in the region server.
 */
@Category({ MiscTests.class, LargeTests.class })
public class TestBlockReorderMultiBlocks {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBlockReorderMultiBlocks.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestBlockReorderMultiBlocks.class);

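  // Mini cluster handles plus the fixed hostnames assigned to the initial three datanodes.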
  private Configuration conf;
  private MiniDFSCluster cluster;
  private HBaseTestingUtility htu;
  private DistributedFileSystem dfs;
  private static final String host1 = "host1";
  private static final String host2 = "host2";
  private static final String host3 = "host3";

  @Rule
  public TestName name = new TestName();

  @Before
  public void setUp() throws Exception {
    htu = new HBaseTestingUtility();
    htu.getConfiguration().setInt("dfs.blocksize", 1024); // For the test with multiple blocks
    htu.getConfiguration().setInt("dfs.replication", 3);
    htu.startMiniDFSCluster(3, new String[] { "/r1", "/r2", "/r3" },
      new String[] { host1, host2, host3 });

    conf = htu.getConfiguration();
    cluster = htu.getDFSCluster();
    dfs = (DistributedFileSystem) FileSystem.get(conf);
  }

  @After
  public void tearDown() throws Exception {
    htu.shutdownMiniCluster();
  }

  /**
   * Test that the hook works within HBase, including when there are multiple blocks.
   */
  @Test
  public void testHBaseCluster() throws Exception {
    byte[] sb = Bytes.toBytes("sb");
    htu.startMiniZKCluster();

    MiniHBaseCluster hbm = htu.startMiniHBaseCluster();
    hbm.waitForActiveAndReadyMaster();
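    // The WAL files we want to inspect live on the server that hosts user regions: the master
    // when tables are placed on the master, otherwise the first region server.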
    HRegionServer targetRs =
      LoadBalancer.isTablesOnMaster(hbm.getConf()) ? hbm.getMaster() : hbm.getRegionServer(0);

    // We want a datanode with the same hostname as the region server, so we're going to get the
    // region server's hostname and start a new datanode with that name.
    String host4 = targetRs.getServerName().getHostname();
    LOG.info("Starting a new datanode with the name=" + host4);
    cluster.startDataNodes(conf, 1, true, null, new String[] { "/r4" }, new String[] { host4 },
      null);
    cluster.waitClusterUp();

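    // Replication factor configured in setUp(); every WAL block should get three replicas.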
    final int repCount = 3;

    // We use the region server's file system & conf as we expect it to have the hook.
    conf = targetRs.getConfiguration();
    HFileSystem rfs = (HFileSystem) targetRs.getFileSystem();
    Table h = htu.createTable(TableName.valueOf(name.getMethodName()), sb);

    // Now we have 4 datanodes and a replication count of 3, so we don't know whether the datanode
    // with the same name as the region server will be used. We can't really stop an existing
    // datanode, as that would expose us to nasty HDFS bugs/issues, so we try multiple times.

    // Now we need to find the log file, get its block locations and look at them.

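    // The directory (HREGION_LOGDIR_NAME under the WAL root) that holds the target server's
    // active WAL files.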
    String rootDir =
      new Path(CommonFSUtils.getWALRootDir(conf) + "/" + HConstants.HREGION_LOGDIR_NAME + "/"
        + targetRs.getServerName().toString()).toUri().getPath();

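    // Also grab the master's view of the file system so the block ordering can be verified from
    // the master's side as well.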
    DistributedFileSystem mdfs =
      (DistributedFileSystem) hbm.getMaster().getMasterFileSystem().getFileSystem();

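    // Loop until the reordering has been observed ten times; each iteration rolls the WALs so
    // that fresh files (and fresh block placements) are created.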
    int nbTest = 0;
    while (nbTest < 10) {
      final List<HRegion> regions = targetRs.getRegions(h.getName());
      final CountDownLatch latch = new CountDownLatch(regions.size());
      // listen for successful log rolls
      final WALActionsListener listener = new WALActionsListener() {
        @Override
        public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
          latch.countDown();
        }
      };
      for (HRegion region : regions) {
        region.getWAL().registerWALActionsListener(listener);
      }

      htu.getAdmin().rollWALWriter(targetRs.getServerName());

      // wait for the roll notifications from every region's WAL
      try {
        latch.await();
      } catch (InterruptedException exception) {
        LOG.warn("Interrupted while waiting for the wal of '" + targetRs + "' to roll. If later "
          + "tests fail, it's probably because we should still be waiting.");
        Thread.currentThread().interrupt();
      }
      for (Region region : regions) {
        ((HRegion) region).getWAL().unregisterWALActionsListener(listener);
      }

      // We need a sleep as the namenode is informed asynchronously
      Thread.sleep(100);

      // insert one Put so the freshly rolled log file has a minimal size
      Put p = new Put(sb);
      p.addColumn(sb, sb, sb);
      h.put(p);

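      // List the WAL directory straight through the DFS client so we get the raw HDFS file
      // statuses for every log file currently present.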
      DirectoryListing dl = dfs.getClient().listPaths(rootDir, HdfsFileStatus.EMPTY_NAME);
      HdfsFileStatus[] hfs = dl.getPartialListing();

      // As we wrote a put, we should have at least one log file.
      Assert.assertTrue(hfs.length >= 1);
      for (HdfsFileStatus hf : hfs) {
        // Because this is a live cluster, log files might get archived while we're processing
        try {
          LOG.info("Log file found: " + hf.getLocalName() + " in " + rootDir);
          String logFile = rootDir + "/" + hf.getLocalName();
          FileStatus fsLog = rfs.getFileStatus(new Path(logFile));

          LOG.info("Checking log file: " + logFile);
          // Now check that the hook is up and running.
          // We can't call getBlockLocations directly, as it's not exposed by HFileSystem.
          // We try multiple times to be sure, as the order is random.

          BlockLocation[] bls = rfs.getFileBlockLocations(fsLog, 0, 1);
          if (bls.length > 0) {
            BlockLocation bl = bls[0];

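            // When the block has a replica on the region server's own host (host4), the
            // HBASE-6435 reorder hook should have moved it to the end of the location list, so
            // host4 must never appear in an earlier slot.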
            LOG.info(bl.getHosts().length + " replicas for block 0 in " + logFile + " ");
            for (int i = 0; i < bl.getHosts().length - 1; i++) {
              LOG.info(bl.getHosts()[i] + "    " + logFile);
              Assert.assertNotEquals(host4, bl.getHosts()[i]);
            }
            String last = bl.getHosts()[bl.getHosts().length - 1];
            LOG.info(last + "    " + logFile);
            if (host4.equals(last)) {
              nbTest++;
              LOG.info(logFile + " is on the new datanode and is ok");
              if (bl.getHosts().length == 3) {
                // We can test this case from the underlying file system as well, checking it
                // multiple times as the order is random.
                testFromDFS(dfs, logFile, repCount, host4);

                // now from the master
                testFromDFS(mdfs, logFile, repCount, host4);
              }
            }
          }
        } catch (FileNotFoundException exception) {
          LOG.debug("Failed to find log file '" + hf.getLocalName() + "'; it probably was "
            + "archived out from under us so we'll ignore and retry. If this test hangs "
            + "indefinitely you should treat this failure as a symptom.", exception);
        } catch (RemoteException exception) {
          if (exception.unwrapRemoteException() instanceof FileNotFoundException) {
            LOG.debug("Failed to find log file '" + hf.getLocalName() + "'; it probably was "
              + "archived out from under us so we'll ignore and retry. If this test hangs "
              + "indefinitely you should treat this failure as a symptom.", exception);
          } else {
            throw exception;
          }
        }
      }
    }
  }

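  /**
   * Asserts, by querying the namenode behind the given file system's client, that each block
   * returned for {@code src} has {@code repCount} replicas and that the last replica is on
   * {@code localhost}.
   */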
  private void testFromDFS(DistributedFileSystem dfs, String src, int repCount, String localhost)
    throws Exception {
    // Multiple times as the order is random
    for (int i = 0; i < 10; i++) {
      LocatedBlocks l;
      // The NN gets the block list asynchronously, so we may need multiple tries to get the list
      final long max = EnvironmentEdgeManager.currentTime() + 10000;
      boolean done;
      do {
        Assert.assertTrue("Can't get enough replicas", EnvironmentEdgeManager.currentTime() < max);
        l = getNamenode(dfs.getClient()).getBlockLocations(src, 0, 1);
        Assert.assertNotNull("Can't get block locations for " + src, l);
        Assert.assertNotNull(l.getLocatedBlocks());
        Assert.assertTrue(l.getLocatedBlocks().size() > 0);

        done = true;
        for (int y = 0; y < l.getLocatedBlocks().size() && done; y++) {
          done = (l.get(y).getLocations().length == repCount);
        }
      } while (!done);

      for (int y = 0; y < l.getLocatedBlocks().size() && done; y++) {
        Assert.assertEquals(localhost, l.get(y).getLocations()[repCount - 1].getHostName());
      }
    }
  }

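  /**
   * {@link DFSClient} does not expose its namenode handle, so reach in via reflection to obtain
   * the raw {@link ClientProtocol}.
   */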
  private static ClientProtocol getNamenode(DFSClient dfsc) throws Exception {
    Field nf = DFSClient.class.getDeclaredField("namenode");
    nf.setAccessible(true);
    return (ClientProtocol) nf.get(dfsc);
  }

}