001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
063
064/**
065 * Test {@link FSUtils}.
066 */
067@Tag(MiscTests.TAG)
068@Tag(MediumTests.TAG)
069public class TestFSUtils {
070
071  private static final Logger LOG = LoggerFactory.getLogger(TestFSUtils.class);
072
073  private HBaseTestingUtil htu;
074  private FileSystem fs;
075  private Configuration conf;
076
077  @BeforeEach
078  public void setUp() throws IOException {
079    htu = new HBaseTestingUtil();
080    fs = htu.getTestFileSystem();
081    conf = htu.getConfiguration();
082  }
083
084  @Test
085  public void testIsHDFS() throws Exception {
086    assertFalse(CommonFSUtils.isHDFS(conf));
087    MiniDFSCluster cluster = null;
088    try {
089      cluster = htu.startMiniDFSCluster(1);
090      assertTrue(CommonFSUtils.isHDFS(conf));
091      assertTrue(FSUtils.supportSafeMode(cluster.getFileSystem()));
092      FSUtils.checkDfsSafeMode(conf);
093    } finally {
094      if (cluster != null) {
095        cluster.shutdown();
096      }
097    }
098  }
099
100  @Test
101  public void testLocalFileSystemSafeMode() throws Exception {
102    conf.setClass("fs.file.impl", LocalFileSystem.class, FileSystem.class);
103    assertFalse(CommonFSUtils.isHDFS(conf));
104    assertFalse(FSUtils.supportSafeMode(FileSystem.get(conf)));
105    FSUtils.checkDfsSafeMode(conf);
106  }
107
108  private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize) throws Exception {
109    FSDataOutputStream out = fs.create(file);
110    byte[] data = new byte[dataSize];
111    out.write(data, 0, dataSize);
112    out.close();
113  }
114
115  @Test
116  public void testComputeHDFSBlocksDistributionByInputStream() throws Exception {
117    testComputeHDFSBlocksDistribution((fs, testFile) -> {
118      try (FSDataInputStream open = fs.open(testFile)) {
119        assertTrue(open instanceof HdfsDataInputStream);
120        return FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) open);
121      }
122    });
123  }
124
125  @Test
126  public void testComputeHDFSBlockDistribution() throws Exception {
127    testComputeHDFSBlocksDistribution((fs, testFile) -> {
128      FileStatus status = fs.getFileStatus(testFile);
129      return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
130    });
131  }
132
  /** Maps a file on a given file system to its HDFS block distribution. */
  @FunctionalInterface
  interface HDFSBlockDistributionFunction {
    /** Computes the block distribution for {@code path} on {@code fs}. */
    HDFSBlocksDistribution getForPath(FileSystem fs, Path path) throws IOException;
  }
137
  /**
   * Shared driver for the block-distribution tests: runs the supplied extractor against files
   * with two, three and one block(s) on 3- and 4-node mini clusters and verifies the reported
   * per-host weights. Each check retries for up to 2 seconds because the NameNode learns about
   * replicas asynchronously (see HBASE-6175).
   */
  private void testComputeHDFSBlocksDistribution(
    HDFSBlockDistributionFunction fileToBlockDistribution) throws Exception {
    final int DEFAULT_BLOCK_SIZE = 1024;
    conf.setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
    MiniDFSCluster cluster = null;
    Path testFile = null;

    try {
      // set up a cluster with 3 nodes
      String hosts[] = new String[] { "host1", "host2", "host3" };
      cluster = htu.startMiniDFSCluster(hosts);
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      // create a file with two blocks
      testFile = new Path("/test1.txt");
      WriteDataToHDFS(fs, testFile, 2 * DEFAULT_BLOCK_SIZE);

      // given the default replication factor is 3, the same as the number of
      // datanodes; the locality index for each host should be 100%,
      // or getWeight for each host should be the same as getUniqueBlocksWeights
      final long maxTime = EnvironmentEdgeManager.currentTime() + 2000;
      boolean ok;
      do {
        ok = true;

        HDFSBlocksDistribution blocksDistribution =
          fileToBlockDistribution.getForPath(fs, testFile);

        long uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight();
        for (String host : hosts) {
          long weight = blocksDistribution.getWeight(host);
          ok = (ok && uniqueBlocksTotalWeight == weight);
        }
        // retry until every host reports the full weight or the 2s deadline passes
      } while (!ok && EnvironmentEdgeManager.currentTime() < maxTime);
      assertTrue(ok);
    } finally {
      htu.shutdownMiniDFSCluster();
    }

    try {
      // set up a cluster with 4 nodes
      String hosts[] = new String[] { "host1", "host2", "host3", "host4" };
      cluster = htu.startMiniDFSCluster(hosts);
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      // create a file with three blocks
      testFile = new Path("/test2.txt");
      WriteDataToHDFS(fs, testFile, 3 * DEFAULT_BLOCK_SIZE);

      // given the default replication factor is 3, we will have total of 9
      // replica of blocks; thus the host with the highest weight should have
      // weight == 3 * DEFAULT_BLOCK_SIZE
      final long maxTime = EnvironmentEdgeManager.currentTime() + 2000;
      long weight;
      long uniqueBlocksTotalWeight;
      do {
        HDFSBlocksDistribution blocksDistribution =
          fileToBlockDistribution.getForPath(fs, testFile);
        uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight();

        String tophost = blocksDistribution.getTopHosts().get(0);
        weight = blocksDistribution.getWeight(tophost);

        // NameNode is informed asynchronously, so we may have a delay. See HBASE-6175
      } while (uniqueBlocksTotalWeight != weight && EnvironmentEdgeManager.currentTime() < maxTime);
      assertTrue(uniqueBlocksTotalWeight == weight);

    } finally {
      htu.shutdownMiniDFSCluster();
    }

    try {
      // set up a cluster with 4 nodes
      String hosts[] = new String[] { "host1", "host2", "host3", "host4" };
      cluster = htu.startMiniDFSCluster(hosts);
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      // create a file with one block
      testFile = new Path("/test3.txt");
      WriteDataToHDFS(fs, testFile, DEFAULT_BLOCK_SIZE);

      // given the default replication factor is 3, we will have total of 3
      // replica of blocks; thus there is one host without weight
      final long maxTime = EnvironmentEdgeManager.currentTime() + 2000;
      HDFSBlocksDistribution blocksDistribution;
      do {
        blocksDistribution = fileToBlockDistribution.getForPath(fs, testFile);
        // NameNode is informed asynchronously, so we may have a delay. See HBASE-6175
      } while (
        blocksDistribution.getTopHosts().size() != 3
          && EnvironmentEdgeManager.currentTime() < maxTime
      );
      assertEquals(3, blocksDistribution.getTopHosts().size(),
        "Wrong number of hosts distributing blocks.");
    } finally {
      htu.shutdownMiniDFSCluster();
    }
  }
239
240  private void writeVersionFile(Path versionFile, String version) throws IOException {
241    if (CommonFSUtils.isExists(fs, versionFile)) {
242      assertTrue(CommonFSUtils.delete(fs, versionFile, true));
243    }
244    try (FSDataOutputStream s = fs.create(versionFile)) {
245      s.writeUTF(version);
246    }
247    assertTrue(fs.exists(versionFile));
248  }
249
250  @Test
251  public void testVersion() throws DeserializationException, IOException {
252    final Path rootdir = htu.getDataTestDir();
253    final FileSystem fs = rootdir.getFileSystem(conf);
254    assertNull(FSUtils.getVersion(fs, rootdir));
255    // No meta dir so no complaint from checkVersion.
256    // Presumes it a new install. Will create version file.
257    FSUtils.checkVersion(fs, rootdir, true);
258    // Now remove the version file and create a metadir so checkVersion fails.
259    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
260    assertTrue(CommonFSUtils.isExists(fs, versionFile));
261    assertTrue(CommonFSUtils.delete(fs, versionFile, true));
262    Path metaRegionDir =
263      FSUtils.getRegionDirFromRootDir(rootdir, RegionInfoBuilder.FIRST_META_REGIONINFO);
264    FsPermission defaultPerms =
265      CommonFSUtils.getFilePermissions(fs, this.conf, HConstants.DATA_FILE_UMASK_KEY);
266    CommonFSUtils.create(fs, metaRegionDir, defaultPerms, false);
267    boolean thrown = false;
268    try {
269      FSUtils.checkVersion(fs, rootdir, true);
270    } catch (FileSystemVersionException e) {
271      thrown = true;
272    }
273    assertTrue(thrown, "Expected FileSystemVersionException");
274    // Write out a good version file. See if we can read it in and convert.
275    String version = HConstants.FILE_SYSTEM_VERSION;
276    writeVersionFile(versionFile, version);
277    FileStatus[] status = fs.listStatus(versionFile);
278    assertNotNull(status);
279    assertTrue(status.length > 0);
280    String newVersion = FSUtils.getVersion(fs, rootdir);
281    assertEquals(version.length(), newVersion.length());
282    assertEquals(version, newVersion);
283    // File will have been converted. Exercise the pb format
284    assertEquals(version, FSUtils.getVersion(fs, rootdir));
285    FSUtils.checkVersion(fs, rootdir, true);
286    // Write an old version file.
287    String oldVersion = "1";
288    writeVersionFile(versionFile, oldVersion);
289    newVersion = FSUtils.getVersion(fs, rootdir);
290    assertNotEquals(version, newVersion);
291    thrown = false;
292    try {
293      FSUtils.checkVersion(fs, rootdir, true);
294    } catch (FileSystemVersionException e) {
295      thrown = true;
296    }
297    assertTrue(thrown, "Expected FileSystemVersionException");
298  }
299
300  @Test
301  public void testPermMask() throws Exception {
302    final Path rootdir = htu.getDataTestDir();
303    final FileSystem fs = rootdir.getFileSystem(conf);
304    // default fs permission
305    FsPermission defaultFsPerm =
306      CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
307    // 'hbase.data.umask.enable' is false. We will get default fs permission.
308    assertEquals(FsPermission.getFileDefault(), defaultFsPerm);
309
310    conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
311    // first check that we don't crash if we don't have perms set
312    FsPermission defaultStartPerm =
313      CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
314    // default 'hbase.data.umask'is 000, and this umask will be used when
315    // 'hbase.data.umask.enable' is true.
316    // Therefore we will not get the real fs default in this case.
317    // Instead we will get the starting point FULL_RWX_PERMISSIONS
318    assertEquals(new FsPermission(CommonFSUtils.FULL_RWX_PERMISSIONS), defaultStartPerm);
319
320    conf.setStrings(HConstants.DATA_FILE_UMASK_KEY, "077");
321    // now check that we get the right perms
322    FsPermission filePerm =
323      CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
324    assertEquals(new FsPermission("700"), filePerm);
325
326    // then that the correct file is created
327    Path p = new Path("target" + File.separator + HBaseTestingUtil.getRandomUUID().toString());
328    try {
329      FSDataOutputStream out = FSUtils.create(conf, fs, p, filePerm, null);
330      out.close();
331      FileStatus stat = fs.getFileStatus(p);
332      assertEquals(new FsPermission("700"), stat.getPermission());
333      // and then cleanup
334    } finally {
335      fs.delete(p, true);
336    }
337  }
338
339  @Test
340  public void testDeleteAndExists() throws Exception {
341    final Path rootdir = htu.getDataTestDir();
342    final FileSystem fs = rootdir.getFileSystem(conf);
343    conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
344    FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
345    // then that the correct file is created
346    String file = HBaseTestingUtil.getRandomUUID().toString();
347    Path p = new Path(htu.getDataTestDir(), "temptarget" + File.separator + file);
348    Path p1 = new Path(htu.getDataTestDir(), "temppath" + File.separator + file);
349    try {
350      FSDataOutputStream out = FSUtils.create(conf, fs, p, perms, null);
351      out.close();
352      assertTrue(CommonFSUtils.isExists(fs, p), "The created file should be present");
353      // delete the file with recursion as false. Only the file will be deleted.
354      CommonFSUtils.delete(fs, p, false);
355      // Create another file
356      FSDataOutputStream out1 = FSUtils.create(conf, fs, p1, perms, null);
357      out1.close();
358      // delete the file with recursion as false. Still the file only will be deleted
359      CommonFSUtils.delete(fs, p1, true);
360      assertFalse(CommonFSUtils.isExists(fs, p1), "The created file should be present");
361      // and then cleanup
362    } finally {
363      CommonFSUtils.delete(fs, p, true);
364      CommonFSUtils.delete(fs, p1, true);
365    }
366  }
367
368  @Test
369  public void testFilteredStatusDoesNotThrowOnNotFound() throws Exception {
370    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
371    try {
372      assertNull(FSUtils.listStatusWithStatusFilter(cluster.getFileSystem(),
373        new Path("definitely/doesn't/exist"), null));
374    } finally {
375      cluster.shutdown();
376    }
377
378  }
379
380  @Test
381  public void testRenameAndSetModifyTime() throws Exception {
382    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
383    assertTrue(CommonFSUtils.isHDFS(conf));
384
385    FileSystem fs = FileSystem.get(conf);
386    Path testDir = htu.getDataTestDirOnTestFS("testArchiveFile");
387
388    String file = HBaseTestingUtil.getRandomUUID().toString();
389    Path p = new Path(testDir, file);
390
391    FSDataOutputStream out = fs.create(p);
392    out.close();
393    assertTrue(CommonFSUtils.isExists(fs, p), "The created file should be present");
394
395    long expect = EnvironmentEdgeManager.currentTime() + 1000;
396    assertNotEquals(expect, fs.getFileStatus(p).getModificationTime());
397
398    ManualEnvironmentEdge mockEnv = new ManualEnvironmentEdge();
399    mockEnv.setValue(expect);
400    EnvironmentEdgeManager.injectEdge(mockEnv);
401    try {
402      String dstFile = HBaseTestingUtil.getRandomUUID().toString();
403      Path dst = new Path(testDir, dstFile);
404
405      assertTrue(CommonFSUtils.renameAndSetModifyTime(fs, p, dst));
406      assertFalse(CommonFSUtils.isExists(fs, p), "The moved file should not be present");
407      assertTrue(CommonFSUtils.isExists(fs, dst), "The dst file should be present");
408
409      assertEquals(expect, fs.getFileStatus(dst).getModificationTime());
410      cluster.shutdown();
411    } finally {
412      EnvironmentEdgeManager.reset();
413    }
414  }
415
  /**
   * The default WAL storage policy should bypass the HDFS setStoragePolicy API entirely and
   * still leave the on-disk file with the HDFS default policy.
   */
  @Test
  public void testSetStoragePolicyDefault() throws Exception {
    verifyNoHDFSApiInvocationForDefaultPolicy();
    verifyFileInDirWithStoragePolicy(HConstants.DEFAULT_WAL_STORAGE_POLICY);
  }
421
422  /**
423   * Note: currently the default policy is set to defer to HDFS and this case is to verify the
424   * logic, will need to remove the check if the default policy is changed
425   */
426  private void verifyNoHDFSApiInvocationForDefaultPolicy() throws URISyntaxException, IOException {
427    FileSystem testFs = new AlwaysFailSetStoragePolicyFileSystem();
428    testFs.initialize(new URI("hdfs://localhost/"), conf);
429    // There should be no exception thrown when setting to default storage policy, which indicates
430    // the HDFS API hasn't been called
431    try {
432      CommonFSUtils.setStoragePolicy(testFs, new Path("non-exist"),
433        HConstants.DEFAULT_WAL_STORAGE_POLICY, true);
434    } catch (IOException e) {
435      org.junit.jupiter.api.Assertions
436        .fail("Should have bypassed the FS API when setting default storage policy");
437    }
438    // There should be exception thrown when given non-default storage policy, which indicates the
439    // HDFS API has been called
440    try {
441      CommonFSUtils.setStoragePolicy(testFs, new Path("non-exist"), "HOT", true);
442      fail("Should have invoked the FS API but haven't");
443    } catch (IOException e) {
444      // expected given an invalid path
445    }
446  }
447
448  class AlwaysFailSetStoragePolicyFileSystem extends DistributedFileSystem {
449    @Override
450    public void setStoragePolicy(final Path src, final String policyName) throws IOException {
451      throw new IOException("The setStoragePolicy method is invoked");
452    }
453  }
454
  /* might log a warning, but still work. (always warning on Hadoop < 2.6.0) */
  @Test
  public void testSetStoragePolicyValidButMaybeNotPresent() throws Exception {
    // ALL_SSD is a valid policy name even when no SSD storage is attached to the cluster.
    verifyFileInDirWithStoragePolicy("ALL_SSD");
  }
460
461  final String INVALID_STORAGE_POLICY = "1772";
462
463  /* should log a warning, but still work. (different warning on Hadoop < 2.6.0) */
464  @Test
465  public void testSetStoragePolicyInvalid() throws Exception {
466    verifyFileInDirWithStoragePolicy(INVALID_STORAGE_POLICY);
467  }
468
469  // Here instead of TestCommonFSUtils because we need a minicluster
470  private void verifyFileInDirWithStoragePolicy(final String policy) throws Exception {
471    conf.set(HConstants.WAL_STORAGE_POLICY, policy);
472
473    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
474    try {
475      assertTrue(CommonFSUtils.isHDFS(conf));
476
477      FileSystem fs = FileSystem.get(conf);
478      Path testDir = htu.getDataTestDirOnTestFS("testArchiveFile");
479      fs.mkdirs(testDir);
480
481      String storagePolicy =
482        conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
483      CommonFSUtils.setStoragePolicy(fs, testDir, storagePolicy);
484
485      String file = HBaseTestingUtil.getRandomUUID().toString();
486      Path p = new Path(testDir, file);
487      WriteDataToHDFS(fs, p, 4096);
488      HFileSystem hfs = new HFileSystem(fs);
489      String policySet = hfs.getStoragePolicyName(p);
490      LOG.debug("The storage policy of path " + p + " is " + policySet);
491      if (
492        policy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)
493          || policy.equals(INVALID_STORAGE_POLICY)
494      ) {
495        String hdfsDefaultPolicy = hfs.getStoragePolicyName(hfs.getHomeDirectory());
496        LOG.debug("The default hdfs storage policy (indicated by home path: "
497          + hfs.getHomeDirectory() + ") is " + hdfsDefaultPolicy);
498        assertEquals(hdfsDefaultPolicy, policySet);
499      } else {
500        assertEquals(policy, policySet);
501      }
502      // will assert existence before deleting.
503      cleanupFile(fs, testDir);
504    } finally {
505      cluster.shutdown();
506    }
507  }
508
509  /**
510   * Ugly test that ensures we can get at the hedged read counters in dfsclient. Does a bit of
511   * preading with hedged reads enabled using code taken from hdfs TestPread.
512   */
513  @Test
514  public void testDFSHedgedReadMetrics() throws Exception {
515    // Enable hedged reads and set it so the threshold is really low.
516    // Most of this test is taken from HDFS, from TestPread.
517    conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
518    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 0);
519    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
520    conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
521    // Set short retry timeouts so this test runs faster
522    conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 0);
523    conf.setBoolean("dfs.datanode.transferTo.allowed", false);
524    // disable metrics logger since it depend on commons-logging internal classes and we do not want
525    // commons-logging on our classpath
526    conf.setInt(DFSConfigKeys.DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_KEY, 0);
527    conf.setInt(DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY, 0);
528    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
529    // Get the metrics. Should be empty.
530    DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
531    assertEquals(0, metrics.getHedgedReadOps());
532    FileSystem fileSys = cluster.getFileSystem();
533    try {
534      Path p = new Path("preadtest.dat");
535      // We need > 1 blocks to test out the hedged reads.
536      DFSTestUtil.createFile(fileSys, p, 12 * blockSize, 12 * blockSize, blockSize, (short) 3,
537        seed);
538      pReadFile(fileSys, p);
539      cleanupFile(fileSys, p);
540      assertTrue(metrics.getHedgedReadOps() > 0);
541    } finally {
542      fileSys.close();
543      cluster.shutdown();
544    }
545  }
546
547  @Test
548  public void testCopyFilesParallel() throws Exception {
549    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
550    cluster.waitActive();
551    FileSystem fs = cluster.getFileSystem();
552    Path src = new Path("/src");
553    fs.mkdirs(src);
554    for (int i = 0; i < 50; i++) {
555      WriteDataToHDFS(fs, new Path(src, String.valueOf(i)), 1024);
556    }
557    Path sub = new Path(src, "sub");
558    fs.mkdirs(sub);
559    for (int i = 0; i < 50; i++) {
560      WriteDataToHDFS(fs, new Path(sub, String.valueOf(i)), 1024);
561    }
562    Path dst = new Path("/dst");
563    List<Path> allFiles = FSUtils.copyFilesParallel(fs, src, fs, dst, conf, 4);
564
565    assertEquals(102, allFiles.size());
566    FileStatus[] list = fs.listStatus(dst);
567    assertEquals(51, list.length);
568    FileStatus[] sublist = fs.listStatus(new Path(dst, "sub"));
569    assertEquals(50, sublist.length);
570  }
571
  // Below is taken from TestPread over in HDFS.
  static final int blockSize = 4096; // HDFS block size used by the pread tests
  static final long seed = 0xDEADBEEFL; // fixed seed so expected file contents are reproducible
  private Random rand = new Random(); // This test depends on Random#setSeed
576
  /**
   * Reads {@code name} back with a mix of sequential and positional reads, checking every byte
   * against the same seeded random data the file was created with, including reads that cross
   * one and two block boundaries and a read past EOF (which must throw). Taken from HDFS
   * TestPread.
   */
  private void pReadFile(FileSystem fileSys, Path name) throws IOException {
    FSDataInputStream stm = fileSys.open(name);
    byte[] expected = new byte[12 * blockSize];
    rand.setSeed(seed);
    rand.nextBytes(expected);
    // do a sanity check. Read first 4K bytes
    byte[] actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 0, expected, "Read Sanity Test");
    // now do a pread for the first 8K bytes
    actual = new byte[8192];
    doPread(stm, 0L, actual, 0, 8192);
    checkAndEraseData(actual, 0, expected, "Pread Test 1");
    // Now check to see if the normal read returns 4K-8K byte range
    actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 4096, expected, "Pread Test 2");
    // Now see if we can cross a single block boundary successfully
    // read 4K bytes from blockSize - 2K offset
    stm.readFully(blockSize - 2048, actual, 0, 4096);
    checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 3");
    // now see if we can cross two block boundaries successfully
    // read blockSize + 4K bytes from blockSize - 2K offset
    actual = new byte[blockSize + 4096];
    stm.readFully(blockSize - 2048, actual);
    checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 4");
    // now see if we can cross two block boundaries that are not cached
    // read blockSize + 4K bytes from 10*blockSize - 2K offset
    actual = new byte[blockSize + 4096];
    stm.readFully(10 * blockSize - 2048, actual);
    checkAndEraseData(actual, (10 * blockSize - 2048), expected, "Pread Test 5");
    // now check that even after all these preads, we can still read
    // bytes 8K-12K
    actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 8192, expected, "Pread Test 6");
    // done
    stm.close();
    // check block location caching
    stm = fileSys.open(name);
    stm.readFully(1, actual, 0, 4096);
    stm.readFully(4 * blockSize, actual, 0, 4096);
    stm.readFully(7 * blockSize, actual, 0, 4096);
    actual = new byte[3 * 4096];
    stm.readFully(0 * blockSize, actual, 0, 3 * 4096);
    checkAndEraseData(actual, 0, expected, "Pread Test 7");
    actual = new byte[8 * 4096];
    stm.readFully(3 * blockSize, actual, 0, 8 * 4096);
    checkAndEraseData(actual, 3 * blockSize, expected, "Pread Test 8");
    // read the tail
    stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize / 2);
    IOException res = null;
    try { // read beyond the end of the file
      stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize);
    } catch (IOException e) {
      // should throw an exception
      res = e;
    }
    assertTrue(res != null, "Error reading beyond file boundary.");

    stm.close();
  }
639
640  private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) {
641    for (int idx = 0; idx < actual.length; idx++) {
642      assertEquals(expected[from + idx], actual[idx], message + " byte " + (from + idx)
643        + " differs. expected " + expected[from + idx] + " actual " + actual[idx]);
644      actual[idx] = 0;
645    }
646  }
647
648  private void doPread(FSDataInputStream stm, long position, byte[] buffer, int offset, int length)
649    throws IOException {
650    int nread = 0;
651    // long totalRead = 0;
652    // DFSInputStream dfstm = null;
653
654    /*
655     * Disable. This counts do not add up. Some issue in original hdfs tests? if
656     * (stm.getWrappedStream() instanceof DFSInputStream) { dfstm = (DFSInputStream)
657     * (stm.getWrappedStream()); totalRead = dfstm.getReadStatistics().getTotalBytesRead(); }
658     */
659
660    while (nread < length) {
661      int nbytes = stm.read(position + nread, buffer, offset + nread, length - nread);
662      assertTrue(nbytes > 0, "Error in pread");
663      nread += nbytes;
664    }
665
666    /*
667     * Disable. This counts do not add up. Some issue in original hdfs tests? if (dfstm != null) {
668     * if (isHedgedRead) { assertTrue("Expected read statistic to be incremented", length <=
669     * dfstm.getReadStatistics().getTotalBytesRead() - totalRead); } else {
670     * assertEquals("Expected read statistic to be incremented", length, dfstm
671     * .getReadStatistics().getTotalBytesRead() - totalRead); } }
672     */
673  }
674
675  private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
676    assertTrue(fileSys.exists(name));
677    assertTrue(fileSys.delete(name, true));
678    assertTrue(!fileSys.exists(name));
679  }
680
  // Probes for StreamCapabilities by reflection at class-load time and logs the result.
  // NOTE(review): StreamCapabilities is also imported and used directly in this class, so this
  // reflective probe looks vestigial (presumably from multi-Hadoop-version support) — confirm
  // before removing, since dropping it would change the debug-log output.
  static {
    try {
      Class.forName("org.apache.hadoop.fs.StreamCapabilities");
      LOG.debug("Test thought StreamCapabilities class was present.");
    } catch (ClassNotFoundException exception) {
      LOG.debug("Test didn't think StreamCapabilities class was present.");
    }
  }
689
690  // Here instead of TestCommonFSUtils because we need a minicluster
691  @Test
692  public void checkStreamCapabilitiesOnHdfsDataOutputStream() throws Exception {
693    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
694    try (FileSystem filesystem = cluster.getFileSystem()) {
695      FSDataOutputStream stream = filesystem.create(new Path("/tmp/foobar"));
696      assertTrue(stream.hasCapability(StreamCapabilities.HSYNC));
697      assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH));
698      assertFalse(stream.hasCapability("a capability that hopefully HDFS doesn't add."));
699    } finally {
700      cluster.shutdown();
701    }
702  }
703
704  private void testIsSameHdfs(int nnport) throws IOException {
705    Configuration conf = HBaseConfiguration.create();
706    Path srcPath = new Path("hdfs://localhost:" + nnport + "/");
707    Path desPath = new Path("hdfs://127.0.0.1/");
708    FileSystem srcFs = srcPath.getFileSystem(conf);
709    FileSystem desFs = desPath.getFileSystem(conf);
710
711    assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs));
712
713    desPath = new Path("hdfs://127.0.0.1:8070/");
714    desFs = desPath.getFileSystem(conf);
715    assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs));
716
717    desPath = new Path("hdfs://127.0.1.1:" + nnport + "/");
718    desFs = desPath.getFileSystem(conf);
719    assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs));
720
721    conf.set("fs.defaultFS", "hdfs://haosong-hadoop");
722    conf.set("dfs.nameservices", "haosong-hadoop");
723    conf.set("dfs.ha.namenodes.haosong-hadoop", "nn1,nn2");
724    conf.set("dfs.client.failover.proxy.provider.haosong-hadoop",
725      "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
726
727    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.0.0.1:" + nnport);
728    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.10.2.1:8000");
729    desPath = new Path("/");
730    desFs = desPath.getFileSystem(conf);
731    assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs));
732
733    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.10.2.1:" + nnport);
734    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.0.0.1:8000");
735    desPath = new Path("/");
736    desFs = desPath.getFileSystem(conf);
737    assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs));
738  }
739
740  @Test
741  public void testIsSameHdfs() throws IOException {
742    String hadoopVersion = org.apache.hadoop.util.VersionInfo.getVersion();
743    LOG.info("hadoop version is: " + hadoopVersion);
744    boolean isHadoop3_0_0 = hadoopVersion.startsWith("3.0.0");
745    if (isHadoop3_0_0) {
746      // Hadoop 3.0.0 alpha1+ ~ 3.0.0 GA changed default nn port to 9820.
747      // See HDFS-9427
748      testIsSameHdfs(9820);
749    } else {
750      // pre hadoop 3.0.0 defaults to port 8020
751      // Hadoop 3.0.1 changed it back to port 8020. See HDFS-12990
752      testIsSameHdfs(8020);
753    }
754  }
755}