/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.fs;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for the HDFS fix from HBASE-6435.
 *
 * Please do not add new subtests that start or stop a MiniDFSCluster to this class. On Hadoop 3,
 * stopping a MiniDFSCluster clears the shutdown hooks in Hadoop's ShutdownHookManager, which
 * leads to a 'Failed suppression of fs shutdown hook' error in the region server.
 */
@Category({MiscTests.class, LargeTests.class})
public class TestBlockReorderMultiBlocks {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestBlockReorderMultiBlocks.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestBlockReorderMultiBlocks.class);

  private Configuration conf;
  private MiniDFSCluster cluster;
  private HBaseTestingUtility htu;
  private DistributedFileSystem dfs;
  private static final String host1 = "host1";
  private static final String host2 = "host2";
  private static final String host3 = "host3";

  @Rule
  public TestName name = new TestName();

  @Before
  public void setUp() throws Exception {
    htu = new HBaseTestingUtility();
    htu.getConfiguration().setInt("dfs.blocksize", 1024); // For the test with multiple blocks
    htu.getConfiguration().setInt("dfs.replication", 3);
    htu.startMiniDFSCluster(3,
        new String[]{"/r1", "/r2", "/r3"}, new String[]{host1, host2, host3});

    conf = htu.getConfiguration();
    cluster = htu.getDFSCluster();
    dfs = (DistributedFileSystem) FileSystem.get(conf);
  }

  @After
  public void tearDown() throws Exception {
    htu.shutdownMiniCluster();
  }

  /**
   * Test that the hook works within HBase, including when there are multiple blocks.
   */
  @Test
  public void testHBaseCluster() throws Exception {
    byte[] sb = Bytes.toBytes("sb");
    htu.startMiniZKCluster();

    MiniHBaseCluster hbm = htu.startMiniHBaseCluster();
    hbm.waitForActiveAndReadyMaster();
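    // The WAL files we inspect belong to the server hosting the test table's regions: the master
    // when tables are allowed on it (LoadBalancer.isTablesOnMaster), otherwise the first
    // region server.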
    HRegionServer targetRs = LoadBalancer.isTablesOnMaster(hbm.getConf()) ? hbm.getMaster() :
      hbm.getRegionServer(0);

    // We want a datanode with the same hostname as the region server, so we take the region
    // server's hostname and start a new datanode with it.
    String host4 = targetRs.getServerName().getHostname();
    LOG.info("Starting a new datanode with the name=" + host4);
    cluster.startDataNodes(conf, 1, true, null, new String[]{"/r4"}, new String[]{host4}, null);
    cluster.waitClusterUp();

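    // Expected replication factor for the WAL files, matching dfs.replication=3 from setUp().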
    final int repCount = 3;

    // We use the region server's file system & conf as we expect them to have the hook.
    conf = targetRs.getConfiguration();
    HFileSystem rfs = (HFileSystem) targetRs.getFileSystem();
    Table h = htu.createTable(TableName.valueOf(name.getMethodName()), sb);

    // We have 4 datanodes and a replication count of 3, so we don't know whether the datanode
    // with the same name as the region server will be used. We can't really stop an existing
    // datanode, as that would expose us to nasty HDFS bugs/issues, so we try multiple times.

    // Now we need to find the log file, its locations, and look at it.

    String rootDir = new Path(CommonFSUtils.getWALRootDir(conf) + "/" +
      HConstants.HREGION_LOGDIR_NAME + "/" + targetRs.getServerName().toString()).toUri().getPath();

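    // Also grab the master's view of the file system so the reordering can be verified from a
    // second client (see the testFromDFS(mdfs, ...) call below).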
    DistributedFileSystem mdfs = (DistributedFileSystem)
        hbm.getMaster().getMasterFileSystem().getFileSystem();


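    // With 4 datanodes and a replication factor of 3, only some WALs will get a replica on the
    // datanode co-located with the region server (host4). Keep rolling new WALs until 10 of them
    // have been seen with that replica ordered last.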
    int nbTest = 0;
    while (nbTest < 10) {
      final List<HRegion> regions = targetRs.getRegions(h.getName());
      final CountDownLatch latch = new CountDownLatch(regions.size());
      // listen for successful log rolls
      final WALActionsListener listener = new WALActionsListener() {
            @Override
            public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
              latch.countDown();
            }
          };
      for (HRegion region : regions) {
        region.getWAL().registerWALActionsListener(listener);
      }

      htu.getAdmin().rollWALWriter(targetRs.getServerName());

      // wait
      try {
        latch.await();
      } catch (InterruptedException exception) {
        LOG.warn("Interrupted while waiting for the wal of '" + targetRs + "' to roll. If later " +
            "tests fail, it's probably because we should still be waiting.");
        Thread.currentThread().interrupt();
      }
      for (Region region : regions) {
        ((HRegion)region).getWAL().unregisterWALActionsListener(listener);
      }

      // We need a sleep as the namenode is informed asynchronously
      Thread.sleep(100);

      // insert one put to ensure a minimal size
      Put p = new Put(sb);
      p.addColumn(sb, sb, sb);
      h.put(p);

      DirectoryListing dl = dfs.getClient().listPaths(rootDir, HdfsFileStatus.EMPTY_NAME);
      HdfsFileStatus[] hfs = dl.getPartialListing();

      // As we wrote a put, we should have at least one log file.
      Assert.assertTrue(hfs.length >= 1);
      for (HdfsFileStatus hf : hfs) {
        // Because this is a live cluster, log files might get archived while we're processing
        try {
          LOG.info("Log file found: " + hf.getLocalName() + " in " + rootDir);
          String logFile = rootDir + "/" + hf.getLocalName();
          FileStatus fsLog = rfs.getFileStatus(new Path(logFile));

          LOG.info("Checking log file: " + logFile);
          // Now checking that the hook is up and running
          // We can't call getBlockLocations directly, as it's not available in HFileSystem.
          // We try multiple times to be sure, as the order is random.

          BlockLocation[] bls = rfs.getFileBlockLocations(fsLog, 0, 1);
          if (bls.length > 0) {
            BlockLocation bl = bls[0];

            LOG.info(bl.getHosts().length + " replicas for block 0 in " + logFile + " ");
            for (int i = 0; i < bl.getHosts().length - 1; i++) {
              LOG.info(bl.getHosts()[i] + "    " + logFile);
              // Compare hostname values, not references.
              Assert.assertNotEquals(host4, bl.getHosts()[i]);
            }
            String last = bl.getHosts()[bl.getHosts().length - 1];
            LOG.info(last + "    " + logFile);
            if (host4.equals(last)) {
              nbTest++;
              LOG.info(logFile + " is on the new datanode and is ok");
              if (bl.getHosts().length == 3) {
                // We can test this case from the file system as well
                // Checking the underlying file system. Multiple times as the order is random
                testFromDFS(dfs, logFile, repCount, host4);

                // now from the master
                testFromDFS(mdfs, logFile, repCount, host4);
              }
            }
          }
        } catch (FileNotFoundException exception) {
          LOG.debug("Failed to find log file '" + hf.getLocalName() + "'; it probably was " +
              "archived out from under us so we'll ignore and retry. If this test hangs " +
              "indefinitely you should treat this failure as a symptom.", exception);
        } catch (RemoteException exception) {
          if (exception.unwrapRemoteException() instanceof FileNotFoundException) {
            LOG.debug("Failed to find log file '" + hf.getLocalName() + "'; it probably was " +
                "archived out from under us so we'll ignore and retry. If this test hangs " +
                "indefinitely you should treat this failure as a symptom.", exception);
          } else {
            throw exception;
          }
        }
      }
    }
  }

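  /**
   * Asks the namenode directly for the block locations of {@code src} and, once all
   * {@code repCount} replicas are reported, checks that the replica on {@code localhost} is
   * ordered last, which is what the HBASE-6435 block reordering arranges for WAL files.
   */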
  private void testFromDFS(DistributedFileSystem dfs, String src, int repCount, String localhost)
      throws Exception {
    // Multiple times as the order is random
    for (int i = 0; i < 10; i++) {
      LocatedBlocks l;
      // The NN gets the block list asynchronously, so we may need multiple tries to get the list
      final long max = System.currentTimeMillis() + 10000;
      boolean done;
      do {
        Assert.assertTrue("Can't get enough replicas.", System.currentTimeMillis() < max);
        l = getNamenode(dfs.getClient()).getBlockLocations(src, 0, 1);
        Assert.assertNotNull("Can't get block locations for " + src, l);
        Assert.assertNotNull(l.getLocatedBlocks());
        Assert.assertTrue(l.getLocatedBlocks().size() > 0);

        done = true;
        for (int y = 0; y < l.getLocatedBlocks().size() && done; y++) {
          done = (l.get(y).getLocations().length == repCount);
        }
      } while (!done);

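      // The reorder hook should have pushed the replica on the region server's host to the end
      // of the location list for every block.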
      for (int y = 0; y < l.getLocatedBlocks().size() && done; y++) {
        Assert.assertEquals(localhost, l.get(y).getLocations()[repCount - 1].getHostName());
      }
    }
  }

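  /**
   * Reaches the private 'namenode' field of DFSClient through reflection, so the test can query
   * block locations straight from the ClientProtocol proxy.
   */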
  private static ClientProtocol getNamenode(DFSClient dfsc) throws Exception {
    Field nf = DFSClient.class.getDeclaredField("namenode");
    nf.setAccessible(true);
    return (ClientProtocol) nf.get(dfsc);
  }

}