/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test {@link FSUtils}.
 */
@Category({MiscTests.class, MediumTests.class})
public class TestFSUtils {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestFSUtils.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestFSUtils.class);

  private HBaseTestingUtility htu;
  private FileSystem fs;
  private Configuration conf;

  @Before
  public void setUp() throws IOException {
    htu = new HBaseTestingUtility();
    fs = htu.getTestFileSystem();
    conf = htu.getConfiguration();
  }

  @Test public void testIsHDFS() throws Exception {
    assertFalse(FSUtils.isHDFS(conf));
    MiniDFSCluster cluster = null;
    try {
      cluster = htu.startMiniDFSCluster(1);
      assertTrue(FSUtils.isHDFS(conf));
    } finally {
      if (cluster != null) cluster.shutdown();
    }
  }

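  /**
   * Writes {@code dataSize} bytes of zeros to {@code file} on the given filesystem and closes
   * the stream.
   */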
  private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize)
    throws Exception {
    FSDataOutputStream out = fs.create(file);
    byte[] data = new byte[dataSize];
    out.write(data, 0, dataSize);
    out.close();
  }

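  /**
   * Exercises computeHDFSBlocksDistribution against three mini-cluster layouts: a two-block file
   * on three datanodes (every host should carry the full unique-blocks weight), a three-block
   * file on four datanodes (the top host should carry the full unique-blocks weight), and a
   * one-block file on four datanodes (exactly three hosts should report blocks).
   */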
  @Test public void testcomputeHDFSBlocksDistribution() throws Exception {
    final int DEFAULT_BLOCK_SIZE = 1024;
    conf.setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
    MiniDFSCluster cluster = null;
    Path testFile = null;

    try {
      // set up a cluster with 3 nodes
      String hosts[] = new String[] { "host1", "host2", "host3" };
      cluster = htu.startMiniDFSCluster(hosts);
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      // create a file with two blocks
      testFile = new Path("/test1.txt");
      WriteDataToHDFS(fs, testFile, 2*DEFAULT_BLOCK_SIZE);

      // Given the default replication factor of 3, which equals the number of datanodes,
      // the locality index for each host should be 100%; that is, getWeight for each host
      // should equal getUniqueBlocksTotalWeight.
      final long maxTime = System.currentTimeMillis() + 2000;
      boolean ok;
      do {
        ok = true;
        FileStatus status = fs.getFileStatus(testFile);
        HDFSBlocksDistribution blocksDistribution =
          FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
        long uniqueBlocksTotalWeight =
          blocksDistribution.getUniqueBlocksTotalWeight();
        for (String host : hosts) {
          long weight = blocksDistribution.getWeight(host);
          ok = (ok && uniqueBlocksTotalWeight == weight);
        }
      } while (!ok && System.currentTimeMillis() < maxTime);
      assertTrue(ok);
    } finally {
      htu.shutdownMiniDFSCluster();
    }

    try {
      // set up a cluster with 4 nodes
      String hosts[] = new String[] { "host1", "host2", "host3", "host4" };
      cluster = htu.startMiniDFSCluster(hosts);
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      // create a file with three blocks
      testFile = new Path("/test2.txt");
      WriteDataToHDFS(fs, testFile, 3*DEFAULT_BLOCK_SIZE);

      // Given the default replication factor of 3, we will have a total of 9 block replicas;
      // thus the host with the highest weight should have
      // weight == 3 * DEFAULT_BLOCK_SIZE
      final long maxTime = System.currentTimeMillis() + 2000;
      long weight;
      long uniqueBlocksTotalWeight;
      do {
        FileStatus status = fs.getFileStatus(testFile);
        HDFSBlocksDistribution blocksDistribution =
          FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
        uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight();

        String tophost = blocksDistribution.getTopHosts().get(0);
        weight = blocksDistribution.getWeight(tophost);

        // NameNode is informed asynchronously, so we may have a delay. See HBASE-6175
      } while (uniqueBlocksTotalWeight != weight && System.currentTimeMillis() < maxTime);
      assertTrue(uniqueBlocksTotalWeight == weight);

    } finally {
      htu.shutdownMiniDFSCluster();
    }

    try {
      // set up a cluster with 4 nodes
      String hosts[] = new String[] { "host1", "host2", "host3", "host4" };
      cluster = htu.startMiniDFSCluster(hosts);
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      // create a file with one block
      testFile = new Path("/test3.txt");
      WriteDataToHDFS(fs, testFile, DEFAULT_BLOCK_SIZE);

      // Given the default replication factor of 3, we will have a total of 3 block replicas;
      // thus one of the four hosts will carry no weight
      final long maxTime = System.currentTimeMillis() + 2000;
      HDFSBlocksDistribution blocksDistribution;
      do {
        FileStatus status = fs.getFileStatus(testFile);
        blocksDistribution = FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
        // NameNode is informed asynchronously, so we may have a delay. See HBASE-6175
      }
      while (blocksDistribution.getTopHosts().size() != 3 && System.currentTimeMillis() < maxTime);
      assertEquals("Wrong number of hosts distributing blocks.", 3,
        blocksDistribution.getTopHosts().size());
    } finally {
      htu.shutdownMiniDFSCluster();
    }
  }

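  /**
   * Deletes any existing file at {@code versionFile} and writes {@code version} into a fresh one.
   */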
  private void writeVersionFile(Path versionFile, String version) throws IOException {
    if (FSUtils.isExists(fs, versionFile)) {
      assertTrue(FSUtils.delete(fs, versionFile, true));
    }
    try (FSDataOutputStream s = fs.create(versionFile)) {
      s.writeUTF(version);
    }
    assertTrue(fs.exists(versionFile));
  }

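  /**
   * Verifies hbase.version handling: checkVersion creates the file on a fresh root dir, fails
   * when the file is missing but a meta region dir exists, round-trips the current version, and
   * rejects an outdated version file.
   */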
  @Test
  public void testVersion() throws DeserializationException, IOException {
    final Path rootdir = htu.getDataTestDir();
    final FileSystem fs = rootdir.getFileSystem(conf);
    assertNull(FSUtils.getVersion(fs, rootdir));
    // No meta dir, so no complaint from checkVersion.
    // Presumes it's a new install and will create the version file.
    FSUtils.checkVersion(fs, rootdir, true);
    // Now remove the version file and create a metadir so checkVersion fails.
    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
    assertTrue(FSUtils.isExists(fs, versionFile));
    assertTrue(FSUtils.delete(fs, versionFile, true));
    Path metaRegionDir =
        FSUtils.getRegionDirFromRootDir(rootdir, RegionInfoBuilder.FIRST_META_REGIONINFO);
    FsPermission defaultPerms = FSUtils.getFilePermissions(fs, this.conf,
        HConstants.DATA_FILE_UMASK_KEY);
    FSUtils.create(fs, metaRegionDir, defaultPerms, false);
    boolean thrown = false;
    try {
      FSUtils.checkVersion(fs, rootdir, true);
    } catch (FileSystemVersionException e) {
      thrown = true;
    }
    assertTrue("Expected FileSystemVersionException", thrown);
    // Write out a good version file.  See if we can read it in and convert.
    String version = HConstants.FILE_SYSTEM_VERSION;
    writeVersionFile(versionFile, version);
    FileStatus[] status = fs.listStatus(versionFile);
    assertNotNull(status);
    assertTrue(status.length > 0);
    String newVersion = FSUtils.getVersion(fs, rootdir);
    assertEquals(version.length(), newVersion.length());
    assertEquals(version, newVersion);
    // File will have been converted. Exercise the pb format
    assertEquals(version, FSUtils.getVersion(fs, rootdir));
    FSUtils.checkVersion(fs, rootdir, true);
    // Write an old version file.
    String oldVersion = "1";
    writeVersionFile(versionFile, oldVersion);
    newVersion = FSUtils.getVersion(fs, rootdir);
    assertNotEquals(version, newVersion);
    thrown = false;
    try {
      FSUtils.checkVersion(fs, rootdir, true);
    } catch (FileSystemVersionException e) {
      thrown = true;
    }
    assertTrue("Expected FileSystemVersionException", thrown);
  }

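  /**
   * Verifies that getFilePermissions honours hbase.data.umask.enable and hbase.data.umask, and
   * that files created through FSUtils.create carry the resulting permission.
   */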
  @Test
  public void testPermMask() throws Exception {
    final Path rootdir = htu.getDataTestDir();
    final FileSystem fs = rootdir.getFileSystem(conf);
    // default fs permission
    FsPermission defaultFsPerm = FSUtils.getFilePermissions(fs, conf,
        HConstants.DATA_FILE_UMASK_KEY);
    // 'hbase.data.umask.enable' is false. We will get default fs permission.
    assertEquals(FsPermission.getFileDefault(), defaultFsPerm);

    conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
    // first check that we don't crash if we don't have perms set
    FsPermission defaultStartPerm = FSUtils.getFilePermissions(fs, conf,
        HConstants.DATA_FILE_UMASK_KEY);
    // The default 'hbase.data.umask' is 000, and that umask is applied when
    // 'hbase.data.umask.enable' is true, so we will not get the real fs default here.
    // Instead we get the starting point FULL_RWX_PERMISSIONS.
    assertEquals(new FsPermission(FSUtils.FULL_RWX_PERMISSIONS), defaultStartPerm);

    conf.setStrings(HConstants.DATA_FILE_UMASK_KEY, "077");
    // now check that we get the right perms
    FsPermission filePerm = FSUtils.getFilePermissions(fs, conf,
        HConstants.DATA_FILE_UMASK_KEY);
    assertEquals(new FsPermission("700"), filePerm);

    // then that the correct file is created
    Path p = new Path("target" + File.separator + htu.getRandomUUID().toString());
    try {
      FSDataOutputStream out = FSUtils.create(conf, fs, p, filePerm, null);
      out.close();
      FileStatus stat = fs.getFileStatus(p);
      assertEquals(new FsPermission("700"), stat.getPermission());
      // and then cleanup
    } finally {
      fs.delete(p, true);
    }
  }

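  /**
   * Verifies FSUtils.create/isExists/delete on plain files, with both the non-recursive and
   * recursive delete flags.
   */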
  @Test
  public void testDeleteAndExists() throws Exception {
    final Path rootdir = htu.getDataTestDir();
    final FileSystem fs = rootdir.getFileSystem(conf);
    conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
    FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
    // Create files under two different parent directories and verify FSUtils.delete on them.
    String file = htu.getRandomUUID().toString();
    Path p = new Path(htu.getDataTestDir(), "temptarget" + File.separator + file);
    Path p1 = new Path(htu.getDataTestDir(), "temppath" + File.separator + file);
    try {
      FSDataOutputStream out = FSUtils.create(conf, fs, p, perms, null);
      out.close();
      assertTrue("The created file should be present", FSUtils.isExists(fs, p));
      // delete the file with recursion as false. Only the file will be deleted.
      FSUtils.delete(fs, p, false);
      // Create another file
      FSDataOutputStream out1 = FSUtils.create(conf, fs, p1, perms, null);
      out1.close();
      // delete the file with recursion as true. Still only the file itself will be deleted.
      FSUtils.delete(fs, p1, true);
      assertFalse("The deleted file should not be present", FSUtils.isExists(fs, p1));
      // and then cleanup
    } finally {
      FSUtils.delete(fs, p, true);
      FSUtils.delete(fs, p1, true);
    }
  }

  @Test
  public void testFilteredStatusDoesNotThrowOnNotFound() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    try {
      assertNull(FSUtils.listStatusWithStatusFilter(cluster.getFileSystem(),
        new Path("definitely/doesn't/exist"), null));
    } finally {
      cluster.shutdown();
    }
  }

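  /**
   * Verifies that renameAndSetModifyTime moves the file and stamps the destination with the
   * current time, using an injected ManualEnvironmentEdge as the clock.
   */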
  @Test
  public void testRenameAndSetModifyTime() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    assertTrue(FSUtils.isHDFS(conf));

    FileSystem fs = FileSystem.get(conf);
    Path testDir = htu.getDataTestDirOnTestFS("testArchiveFile");

    String file = htu.getRandomUUID().toString();
    Path p = new Path(testDir, file);

    FSDataOutputStream out = fs.create(p);
    out.close();
    assertTrue("The created file should be present", FSUtils.isExists(fs, p));

    long expect = System.currentTimeMillis() + 1000;
    assertNotEquals(expect, fs.getFileStatus(p).getModificationTime());

    ManualEnvironmentEdge mockEnv = new ManualEnvironmentEdge();
    mockEnv.setValue(expect);
    EnvironmentEdgeManager.injectEdge(mockEnv);
    try {
      String dstFile = htu.getRandomUUID().toString();
      Path dst = new Path(testDir, dstFile);

      assertTrue(FSUtils.renameAndSetModifyTime(fs, p, dst));
      assertFalse("The moved file should not be present", FSUtils.isExists(fs, p));
      assertTrue("The dst file should be present", FSUtils.isExists(fs, dst));

      assertEquals(expect, fs.getFileStatus(dst).getModificationTime());
      cluster.shutdown();
    } finally {
      EnvironmentEdgeManager.reset();
    }
  }

  @Test
  public void testSetStoragePolicyDefault() throws Exception {
    verifyNoHDFSApiInvocationForDefaultPolicy();
    verifyFileInDirWithStoragePolicy(HConstants.DEFAULT_WAL_STORAGE_POLICY);
  }

  /**
   * Note: the default policy is currently "defer to HDFS", and this case verifies that logic.
   * Remove this check if the default policy ever changes.
   */
  private void verifyNoHDFSApiInvocationForDefaultPolicy() {
    FileSystem testFs = new AlwaysFailSetStoragePolicyFileSystem();
    // There should be no exception thrown when setting to default storage policy, which indicates
    // the HDFS API hasn't been called
    try {
      FSUtils.setStoragePolicy(testFs, new Path("non-exist"), HConstants.DEFAULT_WAL_STORAGE_POLICY,
        true);
    } catch (IOException e) {
      Assert.fail("Should have bypassed the FS API when setting default storage policy");
    }
    // An exception should be thrown for a non-default storage policy, which indicates the
    // HDFS API has been called
    try {
      FSUtils.setStoragePolicy(testFs, new Path("non-exist"), "HOT", true);
      Assert.fail("Should have invoked the FS API but did not");
    } catch (IOException e) {
      // expected: the mock file system below throws whenever setStoragePolicy is invoked
    }
  }

  class AlwaysFailSetStoragePolicyFileSystem extends DistributedFileSystem {
    @Override
    public void setStoragePolicy(final Path src, final String policyName)
            throws IOException {
      throw new IOException("The setStoragePolicy method is invoked");
    }
  }

  /* might log a warning, but still work. (always warning on Hadoop < 2.6.0) */
  @Test
  public void testSetStoragePolicyValidButMaybeNotPresent() throws Exception {
    verifyFileInDirWithStoragePolicy("ALL_SSD");
  }

  final String INVALID_STORAGE_POLICY = "1772";

  /* should log a warning, but still work. (different warning on Hadoop < 2.6.0) */
  @Test
  public void testSetStoragePolicyInvalid() throws Exception {
    verifyFileInDirWithStoragePolicy(INVALID_STORAGE_POLICY);
  }

  // Here instead of TestCommonFSUtils because we need a minicluster
  private void verifyFileInDirWithStoragePolicy(final String policy) throws Exception {
    conf.set(HConstants.WAL_STORAGE_POLICY, policy);

    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    try {
      assertTrue(FSUtils.isHDFS(conf));

      FileSystem fs = FileSystem.get(conf);
      Path testDir = htu.getDataTestDirOnTestFS("testArchiveFile");
      fs.mkdirs(testDir);

      String storagePolicy =
          conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
      FSUtils.setStoragePolicy(fs, testDir, storagePolicy);

      String file = htu.getRandomUUID().toString();
      Path p = new Path(testDir, file);
      WriteDataToHDFS(fs, p, 4096);
      HFileSystem hfs = new HFileSystem(fs);
      String policySet = hfs.getStoragePolicyName(p);
      LOG.debug("The storage policy of path " + p + " is " + policySet);
      if (policy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)
              || policy.equals(INVALID_STORAGE_POLICY)) {
        String hdfsDefaultPolicy = hfs.getStoragePolicyName(hfs.getHomeDirectory());
        LOG.debug("The default hdfs storage policy (indicated by home path: "
                + hfs.getHomeDirectory() + ") is " + hdfsDefaultPolicy);
        Assert.assertEquals(hdfsDefaultPolicy, policySet);
      } else {
        Assert.assertEquals(policy, policySet);
      }
      // cleanupFile will assert existence before deleting.
      cleanupFile(fs, testDir);
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Ugly test that ensures we can get at the hedged read counters in dfsclient.
   * Does a bit of preading with hedged reads enabled using code taken from hdfs TestPread.
   * @throws Exception
   */
  @Test public void testDFSHedgedReadMetrics() throws Exception {
    // Enable hedged reads and set it so the threshold is really low.
    // Most of this test is taken from HDFS, from TestPread.
    conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 0);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
    conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
    // Set short retry timeouts so this test runs faster
    conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 0);
    conf.setBoolean("dfs.datanode.transferTo.allowed", false);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    // Get the metrics.  Should be empty.
    DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
    assertEquals(0, metrics.getHedgedReadOps());
    FileSystem fileSys = cluster.getFileSystem();
    try {
      Path p = new Path("preadtest.dat");
      // We need > 1 blocks to test out the hedged reads.
      DFSTestUtil.createFile(fileSys, p, 12 * blockSize, 12 * blockSize,
        blockSize, (short) 3, seed);
      pReadFile(fileSys, p);
      cleanupFile(fileSys, p);
      assertTrue(metrics.getHedgedReadOps() > 0);
    } finally {
      fileSys.close();
      cluster.shutdown();
    }
  }

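  /**
   * Verifies that copyFilesParallel copies a source directory (50 files plus a "sub" directory
   * with 50 more) to the destination and preserves the layout.
   */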
  @Test
  public void testCopyFilesParallel() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    cluster.waitActive();
    FileSystem fs = cluster.getFileSystem();
    Path src = new Path("/src");
    fs.mkdirs(src);
    for (int i = 0; i < 50; i++) {
      WriteDataToHDFS(fs, new Path(src, String.valueOf(i)), 1024);
    }
    Path sub = new Path(src, "sub");
    fs.mkdirs(sub);
    for (int i = 0; i < 50; i++) {
      WriteDataToHDFS(fs, new Path(sub, String.valueOf(i)), 1024);
    }
    Path dst = new Path("/dst");
    List<Path> allFiles = FSUtils.copyFilesParallel(fs, src, fs, dst, conf, 4);

    assertEquals(102, allFiles.size());
    FileStatus[] list = fs.listStatus(dst);
    assertEquals(51, list.length);
    FileStatus[] sublist = fs.listStatus(new Path(dst, "sub"));
    assertEquals(50, sublist.length);
  }

  // Below is taken from TestPread over in HDFS.
  static final int blockSize = 4096;
  static final long seed = 0xDEADBEEFL;

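  /**
   * Positional-read checks lifted from HDFS TestPread: reads various ranges of a 12-block file,
   * crossing block boundaries, and expects an IOException when reading past the end of the file.
   */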
  private void pReadFile(FileSystem fileSys, Path name) throws IOException {
    FSDataInputStream stm = fileSys.open(name);
    byte[] expected = new byte[12 * blockSize];
    Random rand = new Random(seed);
    rand.nextBytes(expected);
    // do a sanity check. Read first 4K bytes
    byte[] actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 0, expected, "Read Sanity Test");
    // now do a pread for the first 8K bytes
    actual = new byte[8192];
    doPread(stm, 0L, actual, 0, 8192);
    checkAndEraseData(actual, 0, expected, "Pread Test 1");
    // Now check to see if the normal read returns 4K-8K byte range
    actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 4096, expected, "Pread Test 2");
    // Now see if we can cross a single block boundary successfully
    // read 4K bytes from blockSize - 2K offset
    stm.readFully(blockSize - 2048, actual, 0, 4096);
    checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 3");
    // now see if we can cross two block boundaries successfully
    // read blockSize + 4K bytes from blockSize - 2K offset
    actual = new byte[blockSize + 4096];
    stm.readFully(blockSize - 2048, actual);
    checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 4");
    // now see if we can cross two block boundaries that are not cached
    // read blockSize + 4K bytes from 10*blockSize - 2K offset
    actual = new byte[blockSize + 4096];
    stm.readFully(10 * blockSize - 2048, actual);
    checkAndEraseData(actual, (10 * blockSize - 2048), expected, "Pread Test 5");
    // now check that even after all these preads, we can still read
    // bytes 8K-12K
    actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 8192, expected, "Pread Test 6");
    // done
    stm.close();
    // check block location caching
    stm = fileSys.open(name);
    stm.readFully(1, actual, 0, 4096);
    stm.readFully(4*blockSize, actual, 0, 4096);
    stm.readFully(7*blockSize, actual, 0, 4096);
    actual = new byte[3*4096];
    stm.readFully(0*blockSize, actual, 0, 3*4096);
    checkAndEraseData(actual, 0, expected, "Pread Test 7");
    actual = new byte[8*4096];
    stm.readFully(3*blockSize, actual, 0, 8*4096);
    checkAndEraseData(actual, 3*blockSize, expected, "Pread Test 8");
    // read the tail
    stm.readFully(11*blockSize+blockSize/2, actual, 0, blockSize/2);
    IOException res = null;
    try { // read beyond the end of the file
      stm.readFully(11*blockSize+blockSize/2, actual, 0, blockSize);
    } catch (IOException e) {
      // should throw an exception
      res = e;
    }
    assertTrue("Error reading beyond file boundary.", res != null);

    stm.close();
  }

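  /**
   * Asserts that {@code actual} matches {@code expected} starting at offset {@code from}, then
   * zeroes {@code actual} so the buffer can be reused.
   */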
  private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) {
    for (int idx = 0; idx < actual.length; idx++) {
      assertEquals(message+" byte "+(from+idx)+" differs. expected "+
                        expected[from+idx]+" actual "+actual[idx],
                        expected[from+idx], actual[idx]);
      actual[idx] = 0;
    }
  }

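  /**
   * Positional read of {@code length} bytes at {@code position} into {@code buffer}, looping
   * until the requested range has been fully read.
   */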
  private void doPread(FSDataInputStream stm, long position, byte[] buffer,
      int offset, int length) throws IOException {
    int nread = 0;
    // long totalRead = 0;
    // DFSInputStream dfstm = null;

    /* Disabled. These counts do not add up. Some issue in original hdfs tests?
    if (stm.getWrappedStream() instanceof DFSInputStream) {
      dfstm = (DFSInputStream) (stm.getWrappedStream());
      totalRead = dfstm.getReadStatistics().getTotalBytesRead();
    } */

    while (nread < length) {
      int nbytes =
          stm.read(position + nread, buffer, offset + nread, length - nread);
      assertTrue("Error in pread", nbytes > 0);
      nread += nbytes;
    }

    /* Disabled. These counts do not add up. Some issue in original hdfs tests?
    if (dfstm != null) {
      if (isHedgedRead) {
        assertTrue("Expected read statistic to be incremented",
          length <= dfstm.getReadStatistics().getTotalBytesRead() - totalRead);
      } else {
        assertEquals("Expected read statistic to be incremented", length, dfstm
            .getReadStatistics().getTotalBytesRead() - totalRead);
      }
    }*/
  }

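  /**
   * Asserts that {@code name} exists, deletes it recursively, and asserts that it is gone.
   */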
  private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
    assertTrue(fileSys.exists(name));
    assertTrue(fileSys.delete(name, true));
    assertTrue(!fileSys.exists(name));
  }

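  /**
   * Whether the running Hadoop exposes org.apache.hadoop.fs.StreamCapabilities. Detected
   * reflectively because older Hadoop releases do not ship the class.
   */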
  private static final boolean STREAM_CAPABILITIES_IS_PRESENT;
  static {
    boolean tmp = false;
    try {
      Class.forName("org.apache.hadoop.fs.StreamCapabilities");
      tmp = true;
      LOG.debug("Test thought StreamCapabilities class was present.");
    } catch (ClassNotFoundException exception) {
      LOG.debug("Test didn't think StreamCapabilities class was present.");
    } finally {
      STREAM_CAPABILITIES_IS_PRESENT = tmp;
    }
  }

  // Here instead of TestCommonFSUtils because we need a minicluster
  @Test
  public void checkStreamCapabilitiesOnHdfsDataOutputStream() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    try (FileSystem filesystem = cluster.getFileSystem()) {
      FSDataOutputStream stream = filesystem.create(new Path("/tmp/foobar"));
      assertTrue(FSUtils.hasCapability(stream, "hsync"));
      assertTrue(FSUtils.hasCapability(stream, "hflush"));
      assertNotEquals("We expect HdfsDataOutputStream to say it has a dummy capability iff the " +
          "StreamCapabilities class is not defined.",
          STREAM_CAPABILITIES_IS_PRESENT,
          FSUtils.hasCapability(stream, "a capability that hopefully HDFS doesn't add."));
    } finally {
      cluster.shutdown();
    }
  }

}