/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test {@link FSUtils}.
 */
@Category({MiscTests.class, MediumTests.class})
public class TestFSUtils {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestFSUtils.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestFSUtils.class);

  private HBaseTestingUtil htu;
  private FileSystem fs;
  private Configuration conf;

  @Before
  public void setUp() throws IOException {
    htu = new HBaseTestingUtil();
    fs = htu.getTestFileSystem();
    conf = htu.getConfiguration();
  }

  @Test
  public void testIsHDFS() throws Exception {
    assertFalse(CommonFSUtils.isHDFS(conf));
    MiniDFSCluster cluster = null;
    try {
      cluster = htu.startMiniDFSCluster(1);
      assertTrue(CommonFSUtils.isHDFS(conf));
    } finally {
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }

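  /**
   * Writes {@code dataSize} bytes of zeros to {@code file} on the given filesystem, closing the
   * stream when done.
   */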
  private void writeDataToHDFS(FileSystem fs, Path file, int dataSize)
    throws Exception {
    try (FSDataOutputStream out = fs.create(file)) {
      byte[] data = new byte[dataSize];
      out.write(data, 0, dataSize);
    }
  }

  @Test
  public void testComputeHDFSBlocksDistributionByInputStream() throws Exception {
    testComputeHDFSBlocksDistribution((fs, testFile) -> {
      try (FSDataInputStream open = fs.open(testFile)) {
        assertTrue(open instanceof HdfsDataInputStream);
        return FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) open);
      }
    });
  }

  @Test
  public void testComputeHDFSBlockDistribution() throws Exception {
    testComputeHDFSBlocksDistribution((fs, testFile) -> {
      FileStatus status = fs.getFileStatus(testFile);
      return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
    });
  }

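  /** Strategy for computing the {@link HDFSBlocksDistribution} of the file at the given path. */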
  @FunctionalInterface
  interface HDFSBlockDistributionFunction {
    HDFSBlocksDistribution getForPath(FileSystem fs, Path path) throws IOException;
  }

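  /**
   * Shared body for the block distribution tests: brings up mini DFS clusters of different sizes,
   * writes files of one to three blocks, and checks the distribution computed by the supplied
   * function against the expected per-host weights.
   */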
  private void testComputeHDFSBlocksDistribution(
    HDFSBlockDistributionFunction fileToBlockDistribution) throws Exception {
    final int DEFAULT_BLOCK_SIZE = 1024;
    conf.setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
    MiniDFSCluster cluster = null;
    Path testFile = null;

    try {
      // set up a cluster with 3 nodes
      String[] hosts = new String[] { "host1", "host2", "host3" };
      cluster = htu.startMiniDFSCluster(hosts);
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      // create a file with two blocks
      testFile = new Path("/test1.txt");
      writeDataToHDFS(fs, testFile, 2 * DEFAULT_BLOCK_SIZE);

      // Given that the default replication factor is 3, the same as the number of datanodes,
      // the locality index for each host should be 100%; that is, getWeight for each host
      // should equal getUniqueBlocksTotalWeight.
      final long maxTime = EnvironmentEdgeManager.currentTime() + 2000;
      boolean ok;
      do {
        ok = true;

        HDFSBlocksDistribution blocksDistribution =
          fileToBlockDistribution.getForPath(fs, testFile);

        long uniqueBlocksTotalWeight =
          blocksDistribution.getUniqueBlocksTotalWeight();
        for (String host : hosts) {
          long weight = blocksDistribution.getWeight(host);
          ok = (ok && uniqueBlocksTotalWeight == weight);
        }
      } while (!ok && EnvironmentEdgeManager.currentTime() < maxTime);
      assertTrue(ok);
    } finally {
      htu.shutdownMiniDFSCluster();
    }

    try {
      // set up a cluster with 4 nodes
      String[] hosts = new String[] { "host1", "host2", "host3", "host4" };
      cluster = htu.startMiniDFSCluster(hosts);
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      // create a file with three blocks
      testFile = new Path("/test2.txt");
      writeDataToHDFS(fs, testFile, 3 * DEFAULT_BLOCK_SIZE);

      // Given that the default replication factor is 3, there will be 9 block replicas in total;
      // thus the host with the highest weight should have
      // weight == 3 * DEFAULT_BLOCK_SIZE.
      final long maxTime = EnvironmentEdgeManager.currentTime() + 2000;
      long weight;
      long uniqueBlocksTotalWeight;
      do {
        HDFSBlocksDistribution blocksDistribution =
          fileToBlockDistribution.getForPath(fs, testFile);
        uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight();

        String tophost = blocksDistribution.getTopHosts().get(0);
        weight = blocksDistribution.getWeight(tophost);

        // NameNode is informed asynchronously, so we may have a delay. See HBASE-6175
      } while (uniqueBlocksTotalWeight != weight &&
          EnvironmentEdgeManager.currentTime() < maxTime);
      assertEquals(uniqueBlocksTotalWeight, weight);
    } finally {
      htu.shutdownMiniDFSCluster();
    }

    try {
      // set up a cluster with 4 nodes
      String[] hosts = new String[] { "host1", "host2", "host3", "host4" };
      cluster = htu.startMiniDFSCluster(hosts);
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      // create a file with one block
      testFile = new Path("/test3.txt");
      writeDataToHDFS(fs, testFile, DEFAULT_BLOCK_SIZE);

      // Given that the default replication factor is 3, there will be 3 block replicas in total;
      // thus one of the 4 hosts will carry no weight.
      final long maxTime = EnvironmentEdgeManager.currentTime() + 2000;
      HDFSBlocksDistribution blocksDistribution;
      do {
        blocksDistribution = fileToBlockDistribution.getForPath(fs, testFile);
        // NameNode is informed asynchronously, so we may have a delay. See HBASE-6175
      } while (blocksDistribution.getTopHosts().size() != 3 &&
          EnvironmentEdgeManager.currentTime() < maxTime);
      assertEquals("Wrong number of hosts distributing blocks.", 3,
        blocksDistribution.getTopHosts().size());
    } finally {
      htu.shutdownMiniDFSCluster();
    }
  }

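  /** Replaces any existing version file at {@code versionFile} with one containing {@code version}. */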
  private void writeVersionFile(Path versionFile, String version) throws IOException {
    if (CommonFSUtils.isExists(fs, versionFile)) {
      assertTrue(CommonFSUtils.delete(fs, versionFile, true));
    }
    try (FSDataOutputStream s = fs.create(versionFile)) {
      s.writeUTF(version);
    }
    assertTrue(fs.exists(versionFile));
  }

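  /**
   * Exercises FSUtils version handling: checkVersion creates the version file on a fresh root,
   * fails when the file is missing but a meta region directory exists, reads back the current
   * version, and rejects a stale one.
   */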
  @Test
  public void testVersion() throws DeserializationException, IOException {
    final Path rootdir = htu.getDataTestDir();
    final FileSystem fs = rootdir.getFileSystem(conf);
    assertNull(FSUtils.getVersion(fs, rootdir));
    // No meta dir so no complaint from checkVersion.
    // Presumes it is a new install. Will create the version file.
    FSUtils.checkVersion(fs, rootdir, true);
    // Now remove the version file and create a metadir so checkVersion fails.
    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
    assertTrue(CommonFSUtils.isExists(fs, versionFile));
    assertTrue(CommonFSUtils.delete(fs, versionFile, true));
    Path metaRegionDir =
        FSUtils.getRegionDirFromRootDir(rootdir, RegionInfoBuilder.FIRST_META_REGIONINFO);
    FsPermission defaultPerms = CommonFSUtils.getFilePermissions(fs, this.conf,
        HConstants.DATA_FILE_UMASK_KEY);
    CommonFSUtils.create(fs, metaRegionDir, defaultPerms, false);
    boolean thrown = false;
    try {
      FSUtils.checkVersion(fs, rootdir, true);
    } catch (FileSystemVersionException e) {
      thrown = true;
    }
    assertTrue("Expected FileSystemVersionException", thrown);
    // Write out a good version file. See if we can read it in and convert.
    String version = HConstants.FILE_SYSTEM_VERSION;
    writeVersionFile(versionFile, version);
    FileStatus[] status = fs.listStatus(versionFile);
    assertNotNull(status);
    assertTrue(status.length > 0);
    String newVersion = FSUtils.getVersion(fs, rootdir);
    assertEquals(version.length(), newVersion.length());
    assertEquals(version, newVersion);
    // File will have been converted. Exercise the pb format.
    assertEquals(version, FSUtils.getVersion(fs, rootdir));
    FSUtils.checkVersion(fs, rootdir, true);
    // Write an old version file.
    String oldVersion = "1";
    writeVersionFile(versionFile, oldVersion);
    newVersion = FSUtils.getVersion(fs, rootdir);
    assertNotEquals(version, newVersion);
    thrown = false;
    try {
      FSUtils.checkVersion(fs, rootdir, true);
    } catch (FileSystemVersionException e) {
      thrown = true;
    }
    assertTrue("Expected FileSystemVersionException", thrown);
  }

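  /** Verifies that hbase.data.umask is applied to newly created files when umask handling is enabled. */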
  @Test
  public void testPermMask() throws Exception {
    final Path rootdir = htu.getDataTestDir();
    final FileSystem fs = rootdir.getFileSystem(conf);
    // default fs permission
    FsPermission defaultFsPerm = CommonFSUtils.getFilePermissions(fs, conf,
        HConstants.DATA_FILE_UMASK_KEY);
    // 'hbase.data.umask.enable' is false. We will get the default fs permission.
    assertEquals(FsPermission.getFileDefault(), defaultFsPerm);

    conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
    // first check that we don't crash if we don't have perms set
    FsPermission defaultStartPerm = CommonFSUtils.getFilePermissions(fs, conf,
        HConstants.DATA_FILE_UMASK_KEY);
    // The default 'hbase.data.umask' is 000, and this umask is used when
    // 'hbase.data.umask.enable' is true.
    // Therefore we will not get the real fs default in this case.
    // Instead we will get the starting point FULL_RWX_PERMISSIONS.
    assertEquals(new FsPermission(CommonFSUtils.FULL_RWX_PERMISSIONS), defaultStartPerm);

    conf.setStrings(HConstants.DATA_FILE_UMASK_KEY, "077");
    // now check that we get the right perms
    FsPermission filePerm = CommonFSUtils.getFilePermissions(fs, conf,
        HConstants.DATA_FILE_UMASK_KEY);
    assertEquals(new FsPermission("700"), filePerm);

    // then check that the correct file is created
    Path p = new Path("target" + File.separator + htu.getRandomUUID().toString());
    try {
      FSDataOutputStream out = FSUtils.create(conf, fs, p, filePerm, null);
      out.close();
      FileStatus stat = fs.getFileStatus(p);
      assertEquals(new FsPermission("700"), stat.getPermission());
      // and then cleanup
    } finally {
      fs.delete(p, true);
    }
  }

  @Test
  public void testDeleteAndExists() throws Exception {
    final Path rootdir = htu.getDataTestDir();
    final FileSystem fs = rootdir.getFileSystem(conf);
    conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
    FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
    // create the files to be deleted
    String file = htu.getRandomUUID().toString();
    Path p = new Path(htu.getDataTestDir(), "temptarget" + File.separator + file);
    Path p1 = new Path(htu.getDataTestDir(), "temppath" + File.separator + file);
    try {
      FSDataOutputStream out = FSUtils.create(conf, fs, p, perms, null);
      out.close();
      assertTrue("The created file should be present", CommonFSUtils.isExists(fs, p));
      // delete the file with recursion as false. Only the file will be deleted.
      CommonFSUtils.delete(fs, p, false);
      // Create another file
      FSDataOutputStream out1 = FSUtils.create(conf, fs, p1, perms, null);
      out1.close();
      // delete the file with recursion as true. Still only the file will be deleted.
      CommonFSUtils.delete(fs, p1, true);
      assertFalse("The file should have been deleted", CommonFSUtils.isExists(fs, p1));
      // and then cleanup
    } finally {
      CommonFSUtils.delete(fs, p, true);
      CommonFSUtils.delete(fs, p1, true);
    }
  }

  @Test
  public void testFilteredStatusDoesNotThrowOnNotFound() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    try {
      assertNull(FSUtils.listStatusWithStatusFilter(cluster.getFileSystem(),
        new Path("definitely/doesn't/exist"), null));
    } finally {
      cluster.shutdown();
    }
  }

  @Test
  public void testRenameAndSetModifyTime() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    assertTrue(CommonFSUtils.isHDFS(conf));

    FileSystem fs = FileSystem.get(conf);
    Path testDir = htu.getDataTestDirOnTestFS("testArchiveFile");

    String file = htu.getRandomUUID().toString();
    Path p = new Path(testDir, file);

    FSDataOutputStream out = fs.create(p);
    out.close();
    assertTrue("The created file should be present", CommonFSUtils.isExists(fs, p));

    long expect = EnvironmentEdgeManager.currentTime() + 1000;
    assertNotEquals(expect, fs.getFileStatus(p).getModificationTime());

    ManualEnvironmentEdge mockEnv = new ManualEnvironmentEdge();
    mockEnv.setValue(expect);
    EnvironmentEdgeManager.injectEdge(mockEnv);
    try {
      String dstFile = htu.getRandomUUID().toString();
      Path dst = new Path(testDir, dstFile);

      assertTrue(CommonFSUtils.renameAndSetModifyTime(fs, p, dst));
      assertFalse("The moved file should not be present", CommonFSUtils.isExists(fs, p));
      assertTrue("The dst file should be present", CommonFSUtils.isExists(fs, dst));

      assertEquals(expect, fs.getFileStatus(dst).getModificationTime());
      cluster.shutdown();
    } finally {
      EnvironmentEdgeManager.reset();
    }
  }

  @Test
  public void testSetStoragePolicyDefault() throws Exception {
    verifyNoHDFSApiInvocationForDefaultPolicy();
    verifyFileInDirWithStoragePolicy(HConstants.DEFAULT_WAL_STORAGE_POLICY);
  }

  /**
   * Note: currently the default policy is set to defer to HDFS, and this case verifies that
   * logic. The check will need to be removed if the default policy is ever changed.
   */
  private void verifyNoHDFSApiInvocationForDefaultPolicy() {
    FileSystem testFs = new AlwaysFailSetStoragePolicyFileSystem();
    // No exception should be thrown when setting the default storage policy, which indicates
    // the HDFS API hasn't been called.
    try {
      CommonFSUtils.setStoragePolicy(testFs, new Path("non-exist"),
        HConstants.DEFAULT_WAL_STORAGE_POLICY, true);
    } catch (IOException e) {
      Assert.fail("Should have bypassed the FS API when setting default storage policy");
    }
    // An exception should be thrown when a non-default storage policy is given, which indicates
    // the HDFS API has been called.
    try {
      CommonFSUtils.setStoragePolicy(testFs, new Path("non-exist"), "HOT", true);
      Assert.fail("Should have invoked the FS API but haven't");
    } catch (IOException e) {
      // expected given an invalid path
    }
  }

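  /**
   * A DistributedFileSystem whose setStoragePolicy always throws, used to detect whether the
   * HDFS API was actually invoked.
   */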
  private static class AlwaysFailSetStoragePolicyFileSystem extends DistributedFileSystem {
    @Override
    public void setStoragePolicy(final Path src, final String policyName)
            throws IOException {
      throw new IOException("The setStoragePolicy method is invoked");
    }
  }

  /* might log a warning, but still works. (always warns on Hadoop < 2.6.0) */
  @Test
  public void testSetStoragePolicyValidButMaybeNotPresent() throws Exception {
    verifyFileInDirWithStoragePolicy("ALL_SSD");
  }

  private static final String INVALID_STORAGE_POLICY = "1772";

  /* should log a warning, but still works. (different warning on Hadoop < 2.6.0) */
  @Test
  public void testSetStoragePolicyInvalid() throws Exception {
    verifyFileInDirWithStoragePolicy(INVALID_STORAGE_POLICY);
  }

  // Here instead of TestCommonFSUtils because we need a minicluster
  private void verifyFileInDirWithStoragePolicy(final String policy) throws Exception {
    conf.set(HConstants.WAL_STORAGE_POLICY, policy);

    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    try {
      assertTrue(CommonFSUtils.isHDFS(conf));

      FileSystem fs = FileSystem.get(conf);
      Path testDir = htu.getDataTestDirOnTestFS("testArchiveFile");
      fs.mkdirs(testDir);

      String storagePolicy =
          conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
      CommonFSUtils.setStoragePolicy(fs, testDir, storagePolicy);

      String file = htu.getRandomUUID().toString();
      Path p = new Path(testDir, file);
      writeDataToHDFS(fs, p, 4096);
      HFileSystem hfs = new HFileSystem(fs);
      String policySet = hfs.getStoragePolicyName(p);
      LOG.debug("The storage policy of path " + p + " is " + policySet);
      if (policy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)
              || policy.equals(INVALID_STORAGE_POLICY)) {
        String hdfsDefaultPolicy = hfs.getStoragePolicyName(hfs.getHomeDirectory());
        LOG.debug("The default hdfs storage policy (indicated by home path: "
                + hfs.getHomeDirectory() + ") is " + hdfsDefaultPolicy);
        Assert.assertEquals(hdfsDefaultPolicy, policySet);
      } else {
        Assert.assertEquals(policy, policySet);
      }
      // will assert existence before deleting.
      cleanupFile(fs, testDir);
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Ugly test that ensures we can get at the hedged read counters in the DFSClient.
   * Does a bit of positional reading (pread) with hedged reads enabled, using code taken from
   * HDFS's TestPread.
   */
  @Test
  public void testDFSHedgedReadMetrics() throws Exception {
    // Enable hedged reads and set it so the threshold is really low.
    // Most of this test is taken from HDFS, from TestPread.
    conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 0);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
    conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
    // Set short retry timeouts so this test runs faster
    conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 0);
    conf.setBoolean("dfs.datanode.transferTo.allowed", false);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    // Get the metrics. Should be empty.
    DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
    assertEquals(0, metrics.getHedgedReadOps());
    FileSystem fileSys = cluster.getFileSystem();
    try {
      Path p = new Path("preadtest.dat");
      // We need > 1 block to test out the hedged reads.
      DFSTestUtil.createFile(fileSys, p, 12 * blockSize, 12 * blockSize,
        blockSize, (short) 3, seed);
      pReadFile(fileSys, p);
      cleanupFile(fileSys, p);
      assertTrue(metrics.getHedgedReadOps() > 0);
    } finally {
      fileSys.close();
      cluster.shutdown();
    }
  }

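  /**
   * Copies a small directory tree with FSUtils.copyFilesParallel and verifies the number of files
   * reported and present at the destination.
   */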
  @Test
  public void testCopyFilesParallel() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    cluster.waitActive();
    FileSystem fs = cluster.getFileSystem();
    Path src = new Path("/src");
    fs.mkdirs(src);
    for (int i = 0; i < 50; i++) {
      writeDataToHDFS(fs, new Path(src, String.valueOf(i)), 1024);
    }
    Path sub = new Path(src, "sub");
    fs.mkdirs(sub);
    for (int i = 0; i < 50; i++) {
      writeDataToHDFS(fs, new Path(sub, String.valueOf(i)), 1024);
    }
    Path dst = new Path("/dst");
    List<Path> allFiles = FSUtils.copyFilesParallel(fs, src, fs, dst, conf, 4);

    assertEquals(102, allFiles.size());
    FileStatus[] list = fs.listStatus(dst);
    assertEquals(51, list.length);
    FileStatus[] sublist = fs.listStatus(new Path(dst, "sub"));
    assertEquals(50, sublist.length);
  }

  // Below is taken from TestPread over in HDFS.
  static final int blockSize = 4096;
  static final long seed = 0xDEADBEEFL;

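  /**
   * Exercises positional reads within and across block boundaries, then verifies that reading
   * past the end of the file fails; ported from HDFS's TestPread.
   */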
  private void pReadFile(FileSystem fileSys, Path name) throws IOException {
    FSDataInputStream stm = fileSys.open(name);
    byte[] expected = new byte[12 * blockSize];
    Random rand = new Random(seed);
    rand.nextBytes(expected);
    // do a sanity check. Read first 4K bytes
    byte[] actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 0, expected, "Read Sanity Test");
    // now do a pread for the first 8K bytes
    actual = new byte[8192];
    doPread(stm, 0L, actual, 0, 8192);
    checkAndEraseData(actual, 0, expected, "Pread Test 1");
    // Now check to see if the normal read returns 4K-8K byte range
    actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 4096, expected, "Pread Test 2");
    // Now see if we can cross a single block boundary successfully
    // read 4K bytes from blockSize - 2K offset
    stm.readFully(blockSize - 2048, actual, 0, 4096);
    checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 3");
    // now see if we can cross two block boundaries successfully
    // read blockSize + 4K bytes from blockSize - 2K offset
    actual = new byte[blockSize + 4096];
    stm.readFully(blockSize - 2048, actual);
    checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 4");
    // now see if we can cross two block boundaries that are not cached
    // read blockSize + 4K bytes from 10*blockSize - 2K offset
    actual = new byte[blockSize + 4096];
    stm.readFully(10 * blockSize - 2048, actual);
    checkAndEraseData(actual, (10 * blockSize - 2048), expected, "Pread Test 5");
    // now check that even after all these preads, we can still read
    // bytes 8K-12K
    actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 8192, expected, "Pread Test 6");
    // done
    stm.close();
    // check block location caching
    stm = fileSys.open(name);
    stm.readFully(1, actual, 0, 4096);
    stm.readFully(4 * blockSize, actual, 0, 4096);
    stm.readFully(7 * blockSize, actual, 0, 4096);
    actual = new byte[3 * 4096];
    stm.readFully(0 * blockSize, actual, 0, 3 * 4096);
    checkAndEraseData(actual, 0, expected, "Pread Test 7");
    actual = new byte[8 * 4096];
    stm.readFully(3 * blockSize, actual, 0, 8 * 4096);
    checkAndEraseData(actual, 3 * blockSize, expected, "Pread Test 8");
    // read the tail
    stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize / 2);
    IOException res = null;
    try { // read beyond the end of the file
      stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize);
    } catch (IOException e) {
      // should throw an exception
      res = e;
    }
    assertNotNull("Expected an IOException when reading beyond the file boundary", res);

    stm.close();
  }

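  /**
   * Asserts that {@code actual} matches {@code expected} starting at offset {@code from}, then
   * zeroes out {@code actual} for reuse.
   */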
  private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) {
    for (int idx = 0; idx < actual.length; idx++) {
      assertEquals(message + " byte " + (from + idx) + " differs. expected " +
        expected[from + idx] + " actual " + actual[idx],
        expected[from + idx], actual[idx]);
      actual[idx] = 0;
    }
  }

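  /**
   * Positional read of {@code length} bytes at {@code position} into {@code buffer}, looping
   * until the requested range has been read in full.
   */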
  private void doPread(FSDataInputStream stm, long position, byte[] buffer,
      int offset, int length) throws IOException {
    int nread = 0;
    // long totalRead = 0;
    // DFSInputStream dfstm = null;

    /* Disabled. These counts do not add up; some issue in the original HDFS tests?
    if (stm.getWrappedStream() instanceof DFSInputStream) {
      dfstm = (DFSInputStream) (stm.getWrappedStream());
      totalRead = dfstm.getReadStatistics().getTotalBytesRead();
    } */

    while (nread < length) {
      int nbytes =
          stm.read(position + nread, buffer, offset + nread, length - nread);
      assertTrue("Error in pread", nbytes > 0);
      nread += nbytes;
    }

    /* Disabled. These counts do not add up; some issue in the original HDFS tests?
    if (dfstm != null) {
      if (isHedgedRead) {
        assertTrue("Expected read statistic to be incremented",
          length <= dfstm.getReadStatistics().getTotalBytesRead() - totalRead);
      } else {
        assertEquals("Expected read statistic to be incremented", length, dfstm
            .getReadStatistics().getTotalBytesRead() - totalRead);
      }
    }*/
  }

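  /** Asserts the file exists, deletes it recursively, and asserts it is gone. */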
  private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
    assertTrue(fileSys.exists(name));
    assertTrue(fileSys.delete(name, true));
    assertFalse(fileSys.exists(name));
  }

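  // Log at class load time whether the Hadoop on the classpath has the StreamCapabilities class;
  // availability varies across Hadoop versions.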
  static {
    try {
      Class.forName("org.apache.hadoop.fs.StreamCapabilities");
      LOG.debug("Test thought StreamCapabilities class was present.");
    } catch (ClassNotFoundException exception) {
      LOG.debug("Test didn't think StreamCapabilities class was present.");
    }
  }

  // Here instead of TestCommonFSUtils because we need a minicluster
  @Test
  public void checkStreamCapabilitiesOnHdfsDataOutputStream() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    try (FileSystem filesystem = cluster.getFileSystem()) {
      FSDataOutputStream stream = filesystem.create(new Path("/tmp/foobar"));
      assertTrue(stream.hasCapability(StreamCapabilities.HSYNC));
      assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH));
      assertFalse(stream.hasCapability("a capability that hopefully HDFS doesn't add."));
    } finally {
      cluster.shutdown();
    }
  }

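  /**
   * Checks FSUtils.isSameHdfs against single-namenode and HA configurations, using the given
   * namenode port for the source filesystem.
   */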
  private void testIsSameHdfs(int nnport) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Path srcPath = new Path("hdfs://localhost:" + nnport + "/");
    Path desPath = new Path("hdfs://127.0.0.1/");
    FileSystem srcFs = srcPath.getFileSystem(conf);
    FileSystem desFs = desPath.getFileSystem(conf);

    assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs));

    desPath = new Path("hdfs://127.0.0.1:8070/");
    desFs = desPath.getFileSystem(conf);
    assertFalse(FSUtils.isSameHdfs(conf, srcFs, desFs));

    desPath = new Path("hdfs://127.0.1.1:" + nnport + "/");
    desFs = desPath.getFileSystem(conf);
    assertFalse(FSUtils.isSameHdfs(conf, srcFs, desFs));

    conf.set("fs.defaultFS", "hdfs://haosong-hadoop");
    conf.set("dfs.nameservices", "haosong-hadoop");
    conf.set("dfs.ha.namenodes.haosong-hadoop", "nn1,nn2");
    conf.set("dfs.client.failover.proxy.provider.haosong-hadoop",
      "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.0.0.1:" + nnport);
    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.10.2.1:8000");
    desPath = new Path("/");
    desFs = desPath.getFileSystem(conf);
    assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs));

    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.10.2.1:" + nnport);
    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.0.0.1:8000");
    desPath = new Path("/");
    desFs = desPath.getFileSystem(conf);
    assertFalse(FSUtils.isSameHdfs(conf, srcFs, desFs));
  }

  @Test
  public void testIsSameHdfs() throws IOException {
    String hadoopVersion = org.apache.hadoop.util.VersionInfo.getVersion();
    LOG.info("hadoop version is: " + hadoopVersion);
    boolean isHadoop3_0_0 = hadoopVersion.startsWith("3.0.0");
    if (isHadoop3_0_0) {
      // Hadoop 3.0.0 alpha1+ ~ 3.0.0 GA changed the default NN port to 9820.
      // See HDFS-9427.
      testIsSameHdfs(9820);
    } else {
      // Pre-3.0.0 Hadoop defaults to port 8020, and Hadoop 3.0.1 changed it back to 8020.
      // See HDFS-12990.
      testIsSameHdfs(8020);
    }
  }
}