/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test {@link FSUtils}.
 */
@Category({MiscTests.class, MediumTests.class})
public class TestFSUtils {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestFSUtils.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestFSUtils.class);

  private HBaseTestingUtility htu;
  private FileSystem fs;
  private Configuration conf;

  @Before
  public void setUp() throws IOException {
    htu = new HBaseTestingUtility();
    fs = htu.getTestFileSystem();
    conf = htu.getConfiguration();
  }

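  /**
   * {@link CommonFSUtils#isHDFS(Configuration)} should be false on the local test filesystem and
   * true once a mini DFS cluster is started (which makes HDFS the default filesystem).
   */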
  @Test
  public void testIsHDFS() throws Exception {
    assertFalse(CommonFSUtils.isHDFS(conf));
    MiniDFSCluster cluster = null;
    try {
      cluster = htu.startMiniDFSCluster(1);
      assertTrue(CommonFSUtils.isHDFS(conf));
    } finally {
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }

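  /** Writes {@code dataSize} zero bytes to {@code file} on the given filesystem. */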
  private void writeDataToHDFS(FileSystem fs, Path file, int dataSize)
    throws Exception {
    FSDataOutputStream out = fs.create(file);
    byte[] data = new byte[dataSize];
    out.write(data, 0, dataSize);
    out.close();
  }

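  /**
   * Verifies that {@link FSUtils#computeHDFSBlocksDistribution} reports the expected per-host
   * weights and top hosts for one-, two- and three-block files on small mini DFS clusters.
   */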
  @Test
  public void testComputeHDFSBlocksDistribution() throws Exception {
    final int DEFAULT_BLOCK_SIZE = 1024;
    conf.setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
    MiniDFSCluster cluster = null;
    Path testFile = null;

    try {
      // set up a cluster with 3 nodes
      String[] hosts = new String[] { "host1", "host2", "host3" };
      cluster = htu.startMiniDFSCluster(hosts);
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      // create a file with two blocks
      testFile = new Path("/test1.txt");
      writeDataToHDFS(fs, testFile, 2 * DEFAULT_BLOCK_SIZE);

      // Given the default replication factor is 3, the same as the number of datanodes,
      // the locality index for each host should be 100%; that is, getWeight for each host
      // should equal getUniqueBlocksTotalWeight.
      final long maxTime = System.currentTimeMillis() + 2000;
      boolean ok;
      do {
        ok = true;
        FileStatus status = fs.getFileStatus(testFile);
        HDFSBlocksDistribution blocksDistribution =
          FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
        long uniqueBlocksTotalWeight =
          blocksDistribution.getUniqueBlocksTotalWeight();
        for (String host : hosts) {
          long weight = blocksDistribution.getWeight(host);
          ok = (ok && uniqueBlocksTotalWeight == weight);
        }
      } while (!ok && System.currentTimeMillis() < maxTime);
      assertTrue(ok);
    } finally {
      htu.shutdownMiniDFSCluster();
    }

    try {
      // set up a cluster with 4 nodes
      String[] hosts = new String[] { "host1", "host2", "host3", "host4" };
      cluster = htu.startMiniDFSCluster(hosts);
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      // create a file with three blocks
      testFile = new Path("/test2.txt");
      writeDataToHDFS(fs, testFile, 3 * DEFAULT_BLOCK_SIZE);

      // Given the default replication factor is 3, we will have a total of 9 block replicas;
      // thus the host with the highest weight should have weight == 3 * DEFAULT_BLOCK_SIZE
      final long maxTime = System.currentTimeMillis() + 2000;
      long weight;
      long uniqueBlocksTotalWeight;
      do {
        FileStatus status = fs.getFileStatus(testFile);
        HDFSBlocksDistribution blocksDistribution =
          FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
        uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight();

        String tophost = blocksDistribution.getTopHosts().get(0);
        weight = blocksDistribution.getWeight(tophost);

        // NameNode is informed asynchronously, so we may have a delay. See HBASE-6175
      } while (uniqueBlocksTotalWeight != weight && System.currentTimeMillis() < maxTime);
      assertEquals(uniqueBlocksTotalWeight, weight);
    } finally {
      htu.shutdownMiniDFSCluster();
    }

    try {
      // set up a cluster with 4 nodes
      String[] hosts = new String[] { "host1", "host2", "host3", "host4" };
      cluster = htu.startMiniDFSCluster(hosts);
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      // create a file with one block
      testFile = new Path("/test3.txt");
      writeDataToHDFS(fs, testFile, DEFAULT_BLOCK_SIZE);

      // Given the default replication factor is 3, we will have a total of 3 block replicas;
      // thus one of the four hosts will carry no weight
      final long maxTime = System.currentTimeMillis() + 2000;
      HDFSBlocksDistribution blocksDistribution;
      do {
        FileStatus status = fs.getFileStatus(testFile);
        blocksDistribution = FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
        // NameNode is informed asynchronously, so we may have a delay. See HBASE-6175
      }
      while (blocksDistribution.getTopHosts().size() != 3 && System.currentTimeMillis() < maxTime);
      assertEquals("Wrong number of hosts distributing blocks.", 3,
        blocksDistribution.getTopHosts().size());
    } finally {
      htu.shutdownMiniDFSCluster();
    }
  }

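  /** Writes {@code version} to {@code versionFile}, replacing any existing file. */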
  private void writeVersionFile(Path versionFile, String version) throws IOException {
    if (CommonFSUtils.isExists(fs, versionFile)) {
      assertTrue(CommonFSUtils.delete(fs, versionFile, true));
    }
    try (FSDataOutputStream s = fs.create(versionFile)) {
      s.writeUTF(version);
    }
    assertTrue(fs.exists(versionFile));
  }

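  /**
   * Exercises version file handling: {@link FSUtils#checkVersion} should create the version file
   * on a fresh root dir, complain when it is missing but a meta region dir exists, round-trip the
   * current version, and reject an outdated one.
   */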
  @Test
  public void testVersion() throws DeserializationException, IOException {
    final Path rootdir = htu.getDataTestDir();
    final FileSystem fs = rootdir.getFileSystem(conf);
    assertNull(FSUtils.getVersion(fs, rootdir));
    // No meta dir so no complaint from checkVersion.
    // Presumes it's a new install. Will create the version file.
    FSUtils.checkVersion(fs, rootdir, true);
    // Now remove the version file and create a meta dir so checkVersion fails.
    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
    assertTrue(CommonFSUtils.isExists(fs, versionFile));
    assertTrue(CommonFSUtils.delete(fs, versionFile, true));
    Path metaRegionDir =
        FSUtils.getRegionDirFromRootDir(rootdir, RegionInfoBuilder.FIRST_META_REGIONINFO);
    FsPermission defaultPerms = CommonFSUtils.getFilePermissions(fs, this.conf,
        HConstants.DATA_FILE_UMASK_KEY);
    CommonFSUtils.create(fs, metaRegionDir, defaultPerms, false);
    boolean thrown = false;
    try {
      FSUtils.checkVersion(fs, rootdir, true);
    } catch (FileSystemVersionException e) {
      thrown = true;
    }
    assertTrue("Expected FileSystemVersionException", thrown);
    // Write out a good version file. See if we can read it in and convert.
    String version = HConstants.FILE_SYSTEM_VERSION;
    writeVersionFile(versionFile, version);
    FileStatus[] status = fs.listStatus(versionFile);
    assertNotNull(status);
    assertTrue(status.length > 0);
    String newVersion = FSUtils.getVersion(fs, rootdir);
    assertEquals(version.length(), newVersion.length());
    assertEquals(version, newVersion);
    // File will have been converted. Exercise the pb format
    assertEquals(version, FSUtils.getVersion(fs, rootdir));
    FSUtils.checkVersion(fs, rootdir, true);
    // Write an old version file.
    String oldVersion = "1";
    writeVersionFile(versionFile, oldVersion);
    newVersion = FSUtils.getVersion(fs, rootdir);
    assertNotEquals(version, newVersion);
    thrown = false;
    try {
      FSUtils.checkVersion(fs, rootdir, true);
    } catch (FileSystemVersionException e) {
      thrown = true;
    }
    assertTrue("Expected FileSystemVersionException", thrown);
  }

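  /**
   * {@link CommonFSUtils#getFilePermissions} should return the filesystem default when the umask
   * is disabled, full rwx with the default (000) umask, and the masked permission (700 for umask
   * 077) otherwise; files created through {@link FSUtils#create} should carry that permission.
   */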
  @Test
  public void testPermMask() throws Exception {
    final Path rootdir = htu.getDataTestDir();
    final FileSystem fs = rootdir.getFileSystem(conf);
    // default fs permission
    FsPermission defaultFsPerm = CommonFSUtils.getFilePermissions(fs, conf,
        HConstants.DATA_FILE_UMASK_KEY);
    // 'hbase.data.umask.enable' is false. We will get default fs permission.
    assertEquals(FsPermission.getFileDefault(), defaultFsPerm);

    conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
    // first check that we don't crash if we don't have perms set
    FsPermission defaultStartPerm = CommonFSUtils.getFilePermissions(fs, conf,
        HConstants.DATA_FILE_UMASK_KEY);
    // The default 'hbase.data.umask' is 000, and this umask is used when
    // 'hbase.data.umask.enable' is true.
    // Therefore we will not get the real fs default in this case.
    // Instead we will get the starting point FULL_RWX_PERMISSIONS
    assertEquals(new FsPermission(CommonFSUtils.FULL_RWX_PERMISSIONS), defaultStartPerm);

    conf.setStrings(HConstants.DATA_FILE_UMASK_KEY, "077");
    // now check that we get the right perms
    FsPermission filePerm = CommonFSUtils.getFilePermissions(fs, conf,
        HConstants.DATA_FILE_UMASK_KEY);
    assertEquals(new FsPermission("700"), filePerm);

    // then that the correct file is created
    Path p = new Path("target" + File.separator + htu.getRandomUUID().toString());
    try {
      FSDataOutputStream out = FSUtils.create(conf, fs, p, filePerm, null);
      out.close();
      FileStatus stat = fs.getFileStatus(p);
      assertEquals(new FsPermission("700"), stat.getPermission());
      // and then cleanup
    } finally {
      fs.delete(p, true);
    }
  }

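  /**
   * Files created through {@link FSUtils#create} should be removable via
   * {@link CommonFSUtils#delete} whether or not recursion is requested, with
   * {@link CommonFSUtils#isExists} reflecting the change.
   */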
  @Test
  public void testDeleteAndExists() throws Exception {
    final Path rootdir = htu.getDataTestDir();
    final FileSystem fs = rootdir.getFileSystem(conf);
    conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
    FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
    // then that the correct file is created
    String file = htu.getRandomUUID().toString();
    Path p = new Path(htu.getDataTestDir(), "temptarget" + File.separator + file);
    Path p1 = new Path(htu.getDataTestDir(), "temppath" + File.separator + file);
    try {
      FSDataOutputStream out = FSUtils.create(conf, fs, p, perms, null);
      out.close();
      assertTrue("The created file should be present", CommonFSUtils.isExists(fs, p));
      // delete the file with recursion as false. Only the file will be deleted.
      CommonFSUtils.delete(fs, p, false);
      // Create another file
      FSDataOutputStream out1 = FSUtils.create(conf, fs, p1, perms, null);
      out1.close();
      // delete the file with recursion as true. Still only the file will be deleted.
      CommonFSUtils.delete(fs, p1, true);
      assertFalse("The created file should not be present", CommonFSUtils.isExists(fs, p1));
      // and then cleanup
    } finally {
      CommonFSUtils.delete(fs, p, true);
      CommonFSUtils.delete(fs, p1, true);
    }
  }

  @Test
  public void testFilteredStatusDoesNotThrowOnNotFound() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    try {
      assertNull(FSUtils.listStatusWithStatusFilter(cluster.getFileSystem(),
        new Path("definitely/doesn't/exist"), null));
    } finally {
      cluster.shutdown();
    }
  }

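  /**
   * {@link CommonFSUtils#renameAndSetModifyTime} should move the file and stamp it with the
   * current time supplied by the injected {@link ManualEnvironmentEdge}.
   */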
  @Test
  public void testRenameAndSetModifyTime() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    assertTrue(CommonFSUtils.isHDFS(conf));

    FileSystem fs = FileSystem.get(conf);
    Path testDir = htu.getDataTestDirOnTestFS("testArchiveFile");

    String file = htu.getRandomUUID().toString();
    Path p = new Path(testDir, file);

    FSDataOutputStream out = fs.create(p);
    out.close();
    assertTrue("The created file should be present", CommonFSUtils.isExists(fs, p));

    long expect = System.currentTimeMillis() + 1000;
    assertNotEquals(expect, fs.getFileStatus(p).getModificationTime());

    ManualEnvironmentEdge mockEnv = new ManualEnvironmentEdge();
    mockEnv.setValue(expect);
    EnvironmentEdgeManager.injectEdge(mockEnv);
    try {
      String dstFile = htu.getRandomUUID().toString();
      Path dst = new Path(testDir, dstFile);

      assertTrue(CommonFSUtils.renameAndSetModifyTime(fs, p, dst));
      assertFalse("The moved file should not be present", CommonFSUtils.isExists(fs, p));
      assertTrue("The dst file should be present", CommonFSUtils.isExists(fs, dst));

      assertEquals(expect, fs.getFileStatus(dst).getModificationTime());
    } finally {
      EnvironmentEdgeManager.reset();
      // Shut the cluster down even if an assertion above failed.
      cluster.shutdown();
    }
  }

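  /**
   * Setting the default WAL storage policy (currently defer-to-HDFS) should bypass the HDFS API
   * and leave the file with whatever policy HDFS itself applies.
   */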
  @Test
  public void testSetStoragePolicyDefault() throws Exception {
    verifyNoHDFSApiInvocationForDefaultPolicy();
    verifyFileInDirWithStoragePolicy(HConstants.DEFAULT_WAL_STORAGE_POLICY);
  }

  /**
   * Note: currently the default policy is to defer to HDFS, and this case verifies that logic.
   * The check will need to be removed if the default policy changes.
   */
  private void verifyNoHDFSApiInvocationForDefaultPolicy() {
    FileSystem testFs = new AlwaysFailSetStoragePolicyFileSystem();
    // There should be no exception thrown when setting the default storage policy, which indicates
    // the HDFS API hasn't been called
    try {
      CommonFSUtils.setStoragePolicy(testFs, new Path("non-exist"),
        HConstants.DEFAULT_WAL_STORAGE_POLICY, true);
    } catch (IOException e) {
      Assert.fail("Should have bypassed the FS API when setting default storage policy");
    }
    // There should be an exception thrown when given a non-default storage policy, which indicates
    // the HDFS API has been called
    try {
      CommonFSUtils.setStoragePolicy(testFs, new Path("non-exist"), "HOT", true);
      Assert.fail("Should have invoked the FS API but did not");
    } catch (IOException e) {
      // expected given an invalid path
    }
  }

  class AlwaysFailSetStoragePolicyFileSystem extends DistributedFileSystem {
    @Override
    public void setStoragePolicy(final Path src, final String policyName) throws IOException {
      throw new IOException("The setStoragePolicy method is invoked");
    }
  }

  /* might log a warning, but still work. (always warning on Hadoop < 2.6.0) */
  @Test
  public void testSetStoragePolicyValidButMaybeNotPresent() throws Exception {
    verifyFileInDirWithStoragePolicy("ALL_SSD");
  }

  private static final String INVALID_STORAGE_POLICY = "1772";

  /* should log a warning, but still work. (different warning on Hadoop < 2.6.0) */
  @Test
  public void testSetStoragePolicyInvalid() throws Exception {
    verifyFileInDirWithStoragePolicy(INVALID_STORAGE_POLICY);
  }

  // Here instead of TestCommonFSUtils because we need a minicluster
  private void verifyFileInDirWithStoragePolicy(final String policy) throws Exception {
    conf.set(HConstants.WAL_STORAGE_POLICY, policy);

    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    try {
      assertTrue(CommonFSUtils.isHDFS(conf));

      FileSystem fs = FileSystem.get(conf);
      Path testDir = htu.getDataTestDirOnTestFS("testArchiveFile");
      fs.mkdirs(testDir);

      String storagePolicy =
          conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
      CommonFSUtils.setStoragePolicy(fs, testDir, storagePolicy);

      String file = htu.getRandomUUID().toString();
      Path p = new Path(testDir, file);
      writeDataToHDFS(fs, p, 4096);
      HFileSystem hfs = new HFileSystem(fs);
      String policySet = hfs.getStoragePolicyName(p);
      LOG.debug("The storage policy of path {} is {}", p, policySet);
      if (policy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)
          || policy.equals(INVALID_STORAGE_POLICY)) {
        String hdfsDefaultPolicy = hfs.getStoragePolicyName(hfs.getHomeDirectory());
        LOG.debug("The default hdfs storage policy (indicated by home path: {}) is {}",
          hfs.getHomeDirectory(), hdfsDefaultPolicy);
        Assert.assertEquals(hdfsDefaultPolicy, policySet);
      } else {
        Assert.assertEquals(policy, policySet);
      }
      // will assert existence before deleting.
      cleanupFile(fs, testDir);
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Ugly test that ensures we can get at the hedged read counters in the DFS client. Does a bit
   * of preading with hedged reads enabled, using code taken from hdfs TestPread.
   */
  @Test
  public void testDFSHedgedReadMetrics() throws Exception {
    // Enable hedged reads and set it so the threshold is really low.
    // Most of this test is taken from HDFS, from TestPread.
    conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 0);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
    conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
    // Set short retry timeouts so this test runs faster
    conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 0);
    conf.setBoolean("dfs.datanode.transferTo.allowed", false);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    // Get the metrics. Should be empty.
    DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
    assertEquals(0, metrics.getHedgedReadOps());
    FileSystem fileSys = cluster.getFileSystem();
    try {
      Path p = new Path("preadtest.dat");
      // We need > 1 blocks to test out the hedged reads.
      DFSTestUtil.createFile(fileSys, p, 12 * blockSize, 12 * blockSize,
        blockSize, (short) 3, seed);
      pReadFile(fileSys, p);
      cleanupFile(fileSys, p);
      assertTrue(metrics.getHedgedReadOps() > 0);
    } finally {
      fileSys.close();
      cluster.shutdown();
    }
  }

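  /**
   * {@link FSUtils#copyFilesParallel} should recursively copy the source directory tree to the
   * destination and return the paths it copied.
   */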
  @Test
  public void testCopyFilesParallel() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    cluster.waitActive();
    FileSystem fs = cluster.getFileSystem();
    Path src = new Path("/src");
    fs.mkdirs(src);
    for (int i = 0; i < 50; i++) {
      writeDataToHDFS(fs, new Path(src, String.valueOf(i)), 1024);
    }
    Path sub = new Path(src, "sub");
    fs.mkdirs(sub);
    for (int i = 0; i < 50; i++) {
      writeDataToHDFS(fs, new Path(sub, String.valueOf(i)), 1024);
    }
    Path dst = new Path("/dst");
    List<Path> allFiles = FSUtils.copyFilesParallel(fs, src, fs, dst, conf, 4);

    assertEquals(102, allFiles.size());
    FileStatus[] list = fs.listStatus(dst);
    assertEquals(51, list.length);
    FileStatus[] sublist = fs.listStatus(new Path(dst, "sub"));
    assertEquals(50, sublist.length);
  }

  // Below is taken from TestPread over in HDFS.
  static final int blockSize = 4096;
  static final long seed = 0xDEADBEEFL;

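  /**
   * Positional-read checks lifted from HDFS TestPread: reads various ranges of a 12-block file,
   * including reads that span block boundaries and a read past EOF that must fail.
   */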
  private void pReadFile(FileSystem fileSys, Path name) throws IOException {
    FSDataInputStream stm = fileSys.open(name);
    byte[] expected = new byte[12 * blockSize];
    Random rand = new Random(seed);
    rand.nextBytes(expected);
    // do a sanity check. Read first 4K bytes
    byte[] actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 0, expected, "Read Sanity Test");
    // now do a pread for the first 8K bytes
    actual = new byte[8192];
    doPread(stm, 0L, actual, 0, 8192);
    checkAndEraseData(actual, 0, expected, "Pread Test 1");
    // Now check to see if the normal read returns 4K-8K byte range
    actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 4096, expected, "Pread Test 2");
    // Now see if we can cross a single block boundary successfully
    // read 4K bytes from blockSize - 2K offset
    stm.readFully(blockSize - 2048, actual, 0, 4096);
    checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 3");
    // now see if we can cross two block boundaries successfully
    // read blockSize + 4K bytes from blockSize - 2K offset
    actual = new byte[blockSize + 4096];
    stm.readFully(blockSize - 2048, actual);
    checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 4");
    // now see if we can cross two block boundaries that are not cached
    // read blockSize + 4K bytes from 10*blockSize - 2K offset
    actual = new byte[blockSize + 4096];
    stm.readFully(10 * blockSize - 2048, actual);
    checkAndEraseData(actual, (10 * blockSize - 2048), expected, "Pread Test 5");
    // now check that even after all these preads, we can still read
    // bytes 8K-12K
    actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 8192, expected, "Pread Test 6");
    // done
    stm.close();
    // check block location caching
    stm = fileSys.open(name);
    stm.readFully(1, actual, 0, 4096);
    stm.readFully(4 * blockSize, actual, 0, 4096);
    stm.readFully(7 * blockSize, actual, 0, 4096);
    actual = new byte[3 * 4096];
    stm.readFully(0 * blockSize, actual, 0, 3 * 4096);
    checkAndEraseData(actual, 0, expected, "Pread Test 7");
    actual = new byte[8 * 4096];
    stm.readFully(3 * blockSize, actual, 0, 8 * 4096);
    checkAndEraseData(actual, 3 * blockSize, expected, "Pread Test 8");
    // read the tail
    stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize / 2);
    IOException res = null;
    try { // read beyond the end of the file
      stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize);
    } catch (IOException e) {
      // should throw an exception
      res = e;
    }
    assertNotNull("Error reading beyond file boundary.", res);

    stm.close();
  }

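  /**
   * Asserts that {@code actual} matches {@code expected} starting at offset {@code from},
   * zeroing each byte of {@code actual} as it is checked.
   */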
  private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) {
    for (int idx = 0; idx < actual.length; idx++) {
      assertEquals(message + " byte " + (from + idx) + " differs. expected " +
          expected[from + idx] + " actual " + actual[idx],
        expected[from + idx], actual[idx]);
      actual[idx] = 0;
    }
  }

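  /** Positionally reads {@code length} bytes at {@code position} into {@code buffer}, looping until filled. */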
  private void doPread(FSDataInputStream stm, long position, byte[] buffer,
      int offset, int length) throws IOException {
    int nread = 0;
    // long totalRead = 0;
    // DFSInputStream dfstm = null;

    /* Disabled. These counts do not add up. Some issue in the original hdfs tests?
    if (stm.getWrappedStream() instanceof DFSInputStream) {
      dfstm = (DFSInputStream) (stm.getWrappedStream());
      totalRead = dfstm.getReadStatistics().getTotalBytesRead();
    } */

    while (nread < length) {
      int nbytes =
          stm.read(position + nread, buffer, offset + nread, length - nread);
      assertTrue("Error in pread", nbytes > 0);
      nread += nbytes;
    }

    /* Disabled. These counts do not add up. Some issue in the original hdfs tests?
    if (dfstm != null) {
      if (isHedgedRead) {
        assertTrue("Expected read statistic to be incremented",
          length <= dfstm.getReadStatistics().getTotalBytesRead() - totalRead);
      } else {
        assertEquals("Expected read statistic to be incremented", length, dfstm
            .getReadStatistics().getTotalBytesRead() - totalRead);
      }
    }*/
  }

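  /** Deletes {@code name}, asserting it exists beforehand and is gone afterwards. */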
  private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
    assertTrue(fileSys.exists(name));
    assertTrue(fileSys.delete(name, true));
    assertFalse(fileSys.exists(name));
  }

  static {
    try {
      Class.forName("org.apache.hadoop.fs.StreamCapabilities");
      LOG.debug("Test thought StreamCapabilities class was present.");
    } catch (ClassNotFoundException exception) {
      LOG.debug("Test didn't think StreamCapabilities class was present.");
    }
  }

  // Here instead of TestCommonFSUtils because we need a minicluster
  @Test
  public void checkStreamCapabilitiesOnHdfsDataOutputStream() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    try (FileSystem filesystem = cluster.getFileSystem()) {
      FSDataOutputStream stream = filesystem.create(new Path("/tmp/foobar"));
      assertTrue(stream.hasCapability(StreamCapabilities.HSYNC));
      assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH));
      assertFalse(stream.hasCapability("a capability that hopefully HDFS doesn't add."));
    } finally {
      cluster.shutdown();
    }
  }

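  /**
   * Drives {@link FSUtils#isSameHdfs} against filesystems that differ only in hostname, port, or
   * HA nameservice configuration, using {@code nnport} as the namenode port of the source fs.
   */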
  private void testIsSameHdfs(int nnport) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Path srcPath = new Path("hdfs://localhost:" + nnport + "/");
    Path desPath = new Path("hdfs://127.0.0.1/");
    FileSystem srcFs = srcPath.getFileSystem(conf);
    FileSystem desFs = desPath.getFileSystem(conf);

    assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs));

    desPath = new Path("hdfs://127.0.0.1:8070/");
    desFs = desPath.getFileSystem(conf);
    assertFalse(FSUtils.isSameHdfs(conf, srcFs, desFs));

    desPath = new Path("hdfs://127.0.1.1:" + nnport + "/");
    desFs = desPath.getFileSystem(conf);
    assertFalse(FSUtils.isSameHdfs(conf, srcFs, desFs));

    conf.set("fs.defaultFS", "hdfs://haosong-hadoop");
    conf.set("dfs.nameservices", "haosong-hadoop");
    conf.set("dfs.ha.namenodes.haosong-hadoop", "nn1,nn2");
    conf.set("dfs.client.failover.proxy.provider.haosong-hadoop",
      "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.0.0.1:" + nnport);
    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.10.2.1:8000");
    desPath = new Path("/");
    desFs = desPath.getFileSystem(conf);
    assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs));

    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.10.2.1:" + nnport);
    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.0.0.1:8000");
    desPath = new Path("/");
    desFs = desPath.getFileSystem(conf);
    assertFalse(FSUtils.isSameHdfs(conf, srcFs, desFs));
  }

  @Test
  public void testIsSameHdfs() throws IOException {
    String hadoopVersion = org.apache.hadoop.util.VersionInfo.getVersion();
    LOG.info("hadoop version is: {}", hadoopVersion);
    boolean isHadoop3_0_0 = hadoopVersion.startsWith("3.0.0");
    if (isHadoop3_0_0) {
      // Hadoop 3.0.0 alpha1+ ~ 3.0.0 GA changed the default nn port to 9820.
      // See HDFS-9427.
      testIsSameHdfs(9820);
    } else {
      // Pre-Hadoop 3.0.0 defaults to port 8020.
      // Hadoop 3.0.1 changed it back to port 8020. See HDFS-12990.
      testIsSameHdfs(8020);
    }
  }
}