001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertNull; 025import static org.junit.Assert.assertTrue; 026 027import java.io.File; 028import java.io.IOException; 029import java.util.Random; 030import java.util.UUID; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FSDataInputStream; 033import org.apache.hadoop.fs.FSDataOutputStream; 034import org.apache.hadoop.fs.FileStatus; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.fs.Path; 037import org.apache.hadoop.fs.permission.FsPermission; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.HBaseTestingUtility; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.HDFSBlocksDistribution; 042import org.apache.hadoop.hbase.client.RegionInfoBuilder; 043import org.apache.hadoop.hbase.exceptions.DeserializationException; 044import org.apache.hadoop.hbase.fs.HFileSystem; 045import org.apache.hadoop.hbase.testclassification.MediumTests; 046import org.apache.hadoop.hbase.testclassification.MiscTests; 047import org.apache.hadoop.hdfs.DFSConfigKeys; 048import org.apache.hadoop.hdfs.DFSHedgedReadMetrics; 049import org.apache.hadoop.hdfs.DFSTestUtil; 050import org.apache.hadoop.hdfs.DistributedFileSystem; 051import org.apache.hadoop.hdfs.MiniDFSCluster; 052import org.junit.Assert; 053import org.junit.Before; 054import org.junit.ClassRule; 055import org.junit.Test; 056import org.junit.experimental.categories.Category; 057import org.slf4j.Logger; 058import org.slf4j.LoggerFactory; 059 060/** 061 * Test {@link FSUtils}. 062 */ 063@Category({MiscTests.class, MediumTests.class}) 064public class TestFSUtils { 065 066 @ClassRule 067 public static final HBaseClassTestRule CLASS_RULE = 068 HBaseClassTestRule.forClass(TestFSUtils.class); 069 070 private static final Logger LOG = LoggerFactory.getLogger(TestFSUtils.class); 071 072 private HBaseTestingUtility htu; 073 private FileSystem fs; 074 private Configuration conf; 075 076 @Before 077 public void setUp() throws IOException { 078 htu = new HBaseTestingUtility(); 079 fs = htu.getTestFileSystem(); 080 conf = htu.getConfiguration(); 081 } 082 083 @Test public void testIsHDFS() throws Exception { 084 assertFalse(FSUtils.isHDFS(conf)); 085 MiniDFSCluster cluster = null; 086 try { 087 cluster = htu.startMiniDFSCluster(1); 088 assertTrue(FSUtils.isHDFS(conf)); 089 } finally { 090 if (cluster != null) cluster.shutdown(); 091 } 092 } 093 094 private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize) 095 throws Exception { 096 FSDataOutputStream out = fs.create(file); 097 byte [] data = new byte[dataSize]; 098 out.write(data, 0, dataSize); 099 out.close(); 100 } 101 102 @Test public void testcomputeHDFSBlocksDistribution() throws Exception { 103 final int DEFAULT_BLOCK_SIZE = 1024; 104 conf.setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE); 105 MiniDFSCluster cluster = null; 106 Path testFile = null; 107 108 try { 109 // set up a cluster with 3 nodes 110 String hosts[] = new String[] { "host1", "host2", "host3" }; 111 cluster = htu.startMiniDFSCluster(hosts); 112 cluster.waitActive(); 113 FileSystem fs = cluster.getFileSystem(); 114 115 // create a file with two blocks 116 testFile = new Path("/test1.txt"); 117 WriteDataToHDFS(fs, testFile, 2*DEFAULT_BLOCK_SIZE); 118 119 // given the default replication factor is 3, the same as the number of 120 // datanodes; the locality index for each host should be 100%, 121 // or getWeight for each host should be the same as getUniqueBlocksWeights 122 final long maxTime = System.currentTimeMillis() + 2000; 123 boolean ok; 124 do { 125 ok = true; 126 FileStatus status = fs.getFileStatus(testFile); 127 HDFSBlocksDistribution blocksDistribution = 128 FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen()); 129 long uniqueBlocksTotalWeight = 130 blocksDistribution.getUniqueBlocksTotalWeight(); 131 for (String host : hosts) { 132 long weight = blocksDistribution.getWeight(host); 133 ok = (ok && uniqueBlocksTotalWeight == weight); 134 } 135 } while (!ok && System.currentTimeMillis() < maxTime); 136 assertTrue(ok); 137 } finally { 138 htu.shutdownMiniDFSCluster(); 139 } 140 141 142 try { 143 // set up a cluster with 4 nodes 144 String hosts[] = new String[] { "host1", "host2", "host3", "host4" }; 145 cluster = htu.startMiniDFSCluster(hosts); 146 cluster.waitActive(); 147 FileSystem fs = cluster.getFileSystem(); 148 149 // create a file with three blocks 150 testFile = new Path("/test2.txt"); 151 WriteDataToHDFS(fs, testFile, 3*DEFAULT_BLOCK_SIZE); 152 153 // given the default replication factor is 3, we will have total of 9 154 // replica of blocks; thus the host with the highest weight should have 155 // weight == 3 * DEFAULT_BLOCK_SIZE 156 final long maxTime = System.currentTimeMillis() + 2000; 157 long weight; 158 long uniqueBlocksTotalWeight; 159 do { 160 FileStatus status = fs.getFileStatus(testFile); 161 HDFSBlocksDistribution blocksDistribution = 162 FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen()); 163 uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight(); 164 165 String tophost = blocksDistribution.getTopHosts().get(0); 166 weight = blocksDistribution.getWeight(tophost); 167 168 // NameNode is informed asynchronously, so we may have a delay. See HBASE-6175 169 } while (uniqueBlocksTotalWeight != weight && System.currentTimeMillis() < maxTime); 170 assertTrue(uniqueBlocksTotalWeight == weight); 171 172 } finally { 173 htu.shutdownMiniDFSCluster(); 174 } 175 176 177 try { 178 // set up a cluster with 4 nodes 179 String hosts[] = new String[] { "host1", "host2", "host3", "host4" }; 180 cluster = htu.startMiniDFSCluster(hosts); 181 cluster.waitActive(); 182 FileSystem fs = cluster.getFileSystem(); 183 184 // create a file with one block 185 testFile = new Path("/test3.txt"); 186 WriteDataToHDFS(fs, testFile, DEFAULT_BLOCK_SIZE); 187 188 // given the default replication factor is 3, we will have total of 3 189 // replica of blocks; thus there is one host without weight 190 final long maxTime = System.currentTimeMillis() + 2000; 191 HDFSBlocksDistribution blocksDistribution; 192 do { 193 FileStatus status = fs.getFileStatus(testFile); 194 blocksDistribution = FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen()); 195 // NameNode is informed asynchronously, so we may have a delay. See HBASE-6175 196 } 197 while (blocksDistribution.getTopHosts().size() != 3 && System.currentTimeMillis() < maxTime); 198 assertEquals("Wrong number of hosts distributing blocks.", 3, 199 blocksDistribution.getTopHosts().size()); 200 } finally { 201 htu.shutdownMiniDFSCluster(); 202 } 203 } 204 205 private void writeVersionFile(Path versionFile, String version) throws IOException { 206 if (FSUtils.isExists(fs, versionFile)) { 207 assertTrue(FSUtils.delete(fs, versionFile, true)); 208 } 209 try (FSDataOutputStream s = fs.create(versionFile)) { 210 s.writeUTF(version); 211 } 212 assertTrue(fs.exists(versionFile)); 213 } 214 215 @Test 216 public void testVersion() throws DeserializationException, IOException { 217 final Path rootdir = htu.getDataTestDir(); 218 final FileSystem fs = rootdir.getFileSystem(conf); 219 assertNull(FSUtils.getVersion(fs, rootdir)); 220 // No meta dir so no complaint from checkVersion. 221 // Presumes it a new install. Will create version file. 222 FSUtils.checkVersion(fs, rootdir, true); 223 // Now remove the version file and create a metadir so checkVersion fails. 224 Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); 225 assertTrue(FSUtils.isExists(fs, versionFile)); 226 assertTrue(FSUtils.delete(fs, versionFile, true)); 227 Path metaRegionDir = 228 FSUtils.getRegionDirFromRootDir(rootdir, RegionInfoBuilder.FIRST_META_REGIONINFO); 229 FsPermission defaultPerms = FSUtils.getFilePermissions(fs, this.conf, 230 HConstants.DATA_FILE_UMASK_KEY); 231 FSUtils.create(fs, metaRegionDir, defaultPerms, false); 232 boolean thrown = false; 233 try { 234 FSUtils.checkVersion(fs, rootdir, true); 235 } catch (FileSystemVersionException e) { 236 thrown = true; 237 } 238 assertTrue("Expected FileSystemVersionException", thrown); 239 // Write out a good version file. See if we can read it in and convert. 240 String version = HConstants.FILE_SYSTEM_VERSION; 241 writeVersionFile(versionFile, version); 242 FileStatus [] status = fs.listStatus(versionFile); 243 assertNotNull(status); 244 assertTrue(status.length > 0); 245 String newVersion = FSUtils.getVersion(fs, rootdir); 246 assertEquals(version.length(), newVersion.length()); 247 assertEquals(version, newVersion); 248 // File will have been converted. Exercise the pb format 249 assertEquals(version, FSUtils.getVersion(fs, rootdir)); 250 FSUtils.checkVersion(fs, rootdir, true); 251 // Write an old version file. 252 String oldVersion = "1"; 253 writeVersionFile(versionFile, oldVersion); 254 newVersion = FSUtils.getVersion(fs, rootdir); 255 assertNotEquals(version, newVersion); 256 thrown = false; 257 try { 258 FSUtils.checkVersion(fs, rootdir, true); 259 } catch (FileSystemVersionException e) { 260 thrown = true; 261 } 262 assertTrue("Expected FileSystemVersionException", thrown); 263 } 264 265 @Test 266 public void testPermMask() throws Exception { 267 final Path rootdir = htu.getDataTestDir(); 268 final FileSystem fs = rootdir.getFileSystem(conf); 269 // default fs permission 270 FsPermission defaultFsPerm = FSUtils.getFilePermissions(fs, conf, 271 HConstants.DATA_FILE_UMASK_KEY); 272 // 'hbase.data.umask.enable' is false. We will get default fs permission. 273 assertEquals(FsPermission.getFileDefault(), defaultFsPerm); 274 275 conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true); 276 // first check that we don't crash if we don't have perms set 277 FsPermission defaultStartPerm = FSUtils.getFilePermissions(fs, conf, 278 HConstants.DATA_FILE_UMASK_KEY); 279 // default 'hbase.data.umask'is 000, and this umask will be used when 280 // 'hbase.data.umask.enable' is true. 281 // Therefore we will not get the real fs default in this case. 282 // Instead we will get the starting point FULL_RWX_PERMISSIONS 283 assertEquals(new FsPermission(FSUtils.FULL_RWX_PERMISSIONS), defaultStartPerm); 284 285 conf.setStrings(HConstants.DATA_FILE_UMASK_KEY, "077"); 286 // now check that we get the right perms 287 FsPermission filePerm = FSUtils.getFilePermissions(fs, conf, 288 HConstants.DATA_FILE_UMASK_KEY); 289 assertEquals(new FsPermission("700"), filePerm); 290 291 // then that the correct file is created 292 Path p = new Path("target" + File.separator + UUID.randomUUID().toString()); 293 try { 294 FSDataOutputStream out = FSUtils.create(conf, fs, p, filePerm, null); 295 out.close(); 296 FileStatus stat = fs.getFileStatus(p); 297 assertEquals(new FsPermission("700"), stat.getPermission()); 298 // and then cleanup 299 } finally { 300 fs.delete(p, true); 301 } 302 } 303 304 @Test 305 public void testDeleteAndExists() throws Exception { 306 final Path rootdir = htu.getDataTestDir(); 307 final FileSystem fs = rootdir.getFileSystem(conf); 308 conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true); 309 FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY); 310 // then that the correct file is created 311 String file = UUID.randomUUID().toString(); 312 Path p = new Path(htu.getDataTestDir(), "temptarget" + File.separator + file); 313 Path p1 = new Path(htu.getDataTestDir(), "temppath" + File.separator + file); 314 try { 315 FSDataOutputStream out = FSUtils.create(conf, fs, p, perms, null); 316 out.close(); 317 assertTrue("The created file should be present", FSUtils.isExists(fs, p)); 318 // delete the file with recursion as false. Only the file will be deleted. 319 FSUtils.delete(fs, p, false); 320 // Create another file 321 FSDataOutputStream out1 = FSUtils.create(conf, fs, p1, perms, null); 322 out1.close(); 323 // delete the file with recursion as false. Still the file only will be deleted 324 FSUtils.delete(fs, p1, true); 325 assertFalse("The created file should be present", FSUtils.isExists(fs, p1)); 326 // and then cleanup 327 } finally { 328 FSUtils.delete(fs, p, true); 329 FSUtils.delete(fs, p1, true); 330 } 331 } 332 333 @Test 334 public void testFilteredStatusDoesNotThrowOnNotFound() throws Exception { 335 MiniDFSCluster cluster = htu.startMiniDFSCluster(1); 336 try { 337 assertNull(FSUtils.listStatusWithStatusFilter(cluster.getFileSystem(), new Path("definitely/doesn't/exist"), null)); 338 } finally { 339 cluster.shutdown(); 340 } 341 342 } 343 344 @Test 345 public void testRenameAndSetModifyTime() throws Exception { 346 MiniDFSCluster cluster = htu.startMiniDFSCluster(1); 347 assertTrue(FSUtils.isHDFS(conf)); 348 349 FileSystem fs = FileSystem.get(conf); 350 Path testDir = htu.getDataTestDirOnTestFS("testArchiveFile"); 351 352 String file = UUID.randomUUID().toString(); 353 Path p = new Path(testDir, file); 354 355 FSDataOutputStream out = fs.create(p); 356 out.close(); 357 assertTrue("The created file should be present", FSUtils.isExists(fs, p)); 358 359 long expect = System.currentTimeMillis() + 1000; 360 assertNotEquals(expect, fs.getFileStatus(p).getModificationTime()); 361 362 ManualEnvironmentEdge mockEnv = new ManualEnvironmentEdge(); 363 mockEnv.setValue(expect); 364 EnvironmentEdgeManager.injectEdge(mockEnv); 365 try { 366 String dstFile = UUID.randomUUID().toString(); 367 Path dst = new Path(testDir , dstFile); 368 369 assertTrue(FSUtils.renameAndSetModifyTime(fs, p, dst)); 370 assertFalse("The moved file should not be present", FSUtils.isExists(fs, p)); 371 assertTrue("The dst file should be present", FSUtils.isExists(fs, dst)); 372 373 assertEquals(expect, fs.getFileStatus(dst).getModificationTime()); 374 cluster.shutdown(); 375 } finally { 376 EnvironmentEdgeManager.reset(); 377 } 378 } 379 380 @Test 381 public void testSetStoragePolicyDefault() throws Exception { 382 verifyNoHDFSApiInvocationForDefaultPolicy(); 383 verifyFileInDirWithStoragePolicy(HConstants.DEFAULT_WAL_STORAGE_POLICY); 384 } 385 386 /** 387 * Note: currently the default policy is set to defer to HDFS and this case is to verify the 388 * logic, will need to remove the check if the default policy is changed 389 */ 390 private void verifyNoHDFSApiInvocationForDefaultPolicy() { 391 FileSystem testFs = new AlwaysFailSetStoragePolicyFileSystem(); 392 // There should be no exception thrown when setting to default storage policy, which indicates 393 // the HDFS API hasn't been called 394 try { 395 FSUtils.setStoragePolicy(testFs, new Path("non-exist"), HConstants.DEFAULT_WAL_STORAGE_POLICY, 396 true); 397 } catch (IOException e) { 398 Assert.fail("Should have bypassed the FS API when setting default storage policy"); 399 } 400 // There should be exception thrown when given non-default storage policy, which indicates the 401 // HDFS API has been called 402 try { 403 FSUtils.setStoragePolicy(testFs, new Path("non-exist"), "HOT", true); 404 Assert.fail("Should have invoked the FS API but haven't"); 405 } catch (IOException e) { 406 // expected given an invalid path 407 } 408 } 409 410 class AlwaysFailSetStoragePolicyFileSystem extends DistributedFileSystem { 411 @Override 412 public void setStoragePolicy(final Path src, final String policyName) 413 throws IOException { 414 throw new IOException("The setStoragePolicy method is invoked"); 415 } 416 } 417 418 /* might log a warning, but still work. (always warning on Hadoop < 2.6.0) */ 419 @Test 420 public void testSetStoragePolicyValidButMaybeNotPresent() throws Exception { 421 verifyFileInDirWithStoragePolicy("ALL_SSD"); 422 } 423 424 final String INVALID_STORAGE_POLICY = "1772"; 425 426 /* should log a warning, but still work. (different warning on Hadoop < 2.6.0) */ 427 @Test 428 public void testSetStoragePolicyInvalid() throws Exception { 429 verifyFileInDirWithStoragePolicy(INVALID_STORAGE_POLICY); 430 } 431 432 // Here instead of TestCommonFSUtils because we need a minicluster 433 private void verifyFileInDirWithStoragePolicy(final String policy) throws Exception { 434 conf.set(HConstants.WAL_STORAGE_POLICY, policy); 435 436 MiniDFSCluster cluster = htu.startMiniDFSCluster(1); 437 try { 438 assertTrue(FSUtils.isHDFS(conf)); 439 440 FileSystem fs = FileSystem.get(conf); 441 Path testDir = htu.getDataTestDirOnTestFS("testArchiveFile"); 442 fs.mkdirs(testDir); 443 444 String storagePolicy = 445 conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY); 446 FSUtils.setStoragePolicy(fs, testDir, storagePolicy); 447 448 String file = UUID.randomUUID().toString(); 449 Path p = new Path(testDir, file); 450 WriteDataToHDFS(fs, p, 4096); 451 HFileSystem hfs = new HFileSystem(fs); 452 String policySet = hfs.getStoragePolicyName(p); 453 LOG.debug("The storage policy of path " + p + " is " + policySet); 454 if (policy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY) 455 || policy.equals(INVALID_STORAGE_POLICY)) { 456 String hdfsDefaultPolicy = hfs.getStoragePolicyName(hfs.getHomeDirectory()); 457 LOG.debug("The default hdfs storage policy (indicated by home path: " 458 + hfs.getHomeDirectory() + ") is " + hdfsDefaultPolicy); 459 Assert.assertEquals(hdfsDefaultPolicy, policySet); 460 } else { 461 Assert.assertEquals(policy, policySet); 462 } 463 // will assert existance before deleting. 464 cleanupFile(fs, testDir); 465 } finally { 466 cluster.shutdown(); 467 } 468 } 469 470 /** 471 * Ugly test that ensures we can get at the hedged read counters in dfsclient. 472 * Does a bit of preading with hedged reads enabled using code taken from hdfs TestPread. 473 * @throws Exception 474 */ 475 @Test public void testDFSHedgedReadMetrics() throws Exception { 476 // Enable hedged reads and set it so the threshold is really low. 477 // Most of this test is taken from HDFS, from TestPread. 478 conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5); 479 conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 0); 480 conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096); 481 conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096); 482 // Set short retry timeouts so this test runs faster 483 conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 0); 484 conf.setBoolean("dfs.datanode.transferTo.allowed", false); 485 MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); 486 // Get the metrics. Should be empty. 487 DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf); 488 assertEquals(0, metrics.getHedgedReadOps()); 489 FileSystem fileSys = cluster.getFileSystem(); 490 try { 491 Path p = new Path("preadtest.dat"); 492 // We need > 1 blocks to test out the hedged reads. 493 DFSTestUtil.createFile(fileSys, p, 12 * blockSize, 12 * blockSize, 494 blockSize, (short) 3, seed); 495 pReadFile(fileSys, p); 496 cleanupFile(fileSys, p); 497 assertTrue(metrics.getHedgedReadOps() > 0); 498 } finally { 499 fileSys.close(); 500 cluster.shutdown(); 501 } 502 } 503 504 // Below is taken from TestPread over in HDFS. 505 static final int blockSize = 4096; 506 static final long seed = 0xDEADBEEFL; 507 508 private void pReadFile(FileSystem fileSys, Path name) throws IOException { 509 FSDataInputStream stm = fileSys.open(name); 510 byte[] expected = new byte[12 * blockSize]; 511 Random rand = new Random(seed); 512 rand.nextBytes(expected); 513 // do a sanity check. Read first 4K bytes 514 byte[] actual = new byte[4096]; 515 stm.readFully(actual); 516 checkAndEraseData(actual, 0, expected, "Read Sanity Test"); 517 // now do a pread for the first 8K bytes 518 actual = new byte[8192]; 519 doPread(stm, 0L, actual, 0, 8192); 520 checkAndEraseData(actual, 0, expected, "Pread Test 1"); 521 // Now check to see if the normal read returns 4K-8K byte range 522 actual = new byte[4096]; 523 stm.readFully(actual); 524 checkAndEraseData(actual, 4096, expected, "Pread Test 2"); 525 // Now see if we can cross a single block boundary successfully 526 // read 4K bytes from blockSize - 2K offset 527 stm.readFully(blockSize - 2048, actual, 0, 4096); 528 checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 3"); 529 // now see if we can cross two block boundaries successfully 530 // read blockSize + 4K bytes from blockSize - 2K offset 531 actual = new byte[blockSize + 4096]; 532 stm.readFully(blockSize - 2048, actual); 533 checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 4"); 534 // now see if we can cross two block boundaries that are not cached 535 // read blockSize + 4K bytes from 10*blockSize - 2K offset 536 actual = new byte[blockSize + 4096]; 537 stm.readFully(10 * blockSize - 2048, actual); 538 checkAndEraseData(actual, (10 * blockSize - 2048), expected, "Pread Test 5"); 539 // now check that even after all these preads, we can still read 540 // bytes 8K-12K 541 actual = new byte[4096]; 542 stm.readFully(actual); 543 checkAndEraseData(actual, 8192, expected, "Pread Test 6"); 544 // done 545 stm.close(); 546 // check block location caching 547 stm = fileSys.open(name); 548 stm.readFully(1, actual, 0, 4096); 549 stm.readFully(4*blockSize, actual, 0, 4096); 550 stm.readFully(7*blockSize, actual, 0, 4096); 551 actual = new byte[3*4096]; 552 stm.readFully(0*blockSize, actual, 0, 3*4096); 553 checkAndEraseData(actual, 0, expected, "Pread Test 7"); 554 actual = new byte[8*4096]; 555 stm.readFully(3*blockSize, actual, 0, 8*4096); 556 checkAndEraseData(actual, 3*blockSize, expected, "Pread Test 8"); 557 // read the tail 558 stm.readFully(11*blockSize+blockSize/2, actual, 0, blockSize/2); 559 IOException res = null; 560 try { // read beyond the end of the file 561 stm.readFully(11*blockSize+blockSize/2, actual, 0, blockSize); 562 } catch (IOException e) { 563 // should throw an exception 564 res = e; 565 } 566 assertTrue("Error reading beyond file boundary.", res != null); 567 568 stm.close(); 569 } 570 571 private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) { 572 for (int idx = 0; idx < actual.length; idx++) { 573 assertEquals(message+" byte "+(from+idx)+" differs. expected "+ 574 expected[from+idx]+" actual "+actual[idx], 575 actual[idx], expected[from+idx]); 576 actual[idx] = 0; 577 } 578 } 579 580 private void doPread(FSDataInputStream stm, long position, byte[] buffer, 581 int offset, int length) throws IOException { 582 int nread = 0; 583 // long totalRead = 0; 584 // DFSInputStream dfstm = null; 585 586 /* Disable. This counts do not add up. Some issue in original hdfs tests? 587 if (stm.getWrappedStream() instanceof DFSInputStream) { 588 dfstm = (DFSInputStream) (stm.getWrappedStream()); 589 totalRead = dfstm.getReadStatistics().getTotalBytesRead(); 590 } */ 591 592 while (nread < length) { 593 int nbytes = 594 stm.read(position + nread, buffer, offset + nread, length - nread); 595 assertTrue("Error in pread", nbytes > 0); 596 nread += nbytes; 597 } 598 599 /* Disable. This counts do not add up. Some issue in original hdfs tests? 600 if (dfstm != null) { 601 if (isHedgedRead) { 602 assertTrue("Expected read statistic to be incremented", 603 length <= dfstm.getReadStatistics().getTotalBytesRead() - totalRead); 604 } else { 605 assertEquals("Expected read statistic to be incremented", length, dfstm 606 .getReadStatistics().getTotalBytesRead() - totalRead); 607 } 608 }*/ 609 } 610 611 private void cleanupFile(FileSystem fileSys, Path name) throws IOException { 612 assertTrue(fileSys.exists(name)); 613 assertTrue(fileSys.delete(name, true)); 614 assertTrue(!fileSys.exists(name)); 615 } 616 617 618 private static final boolean STREAM_CAPABILITIES_IS_PRESENT; 619 static { 620 boolean tmp = false; 621 try { 622 Class.forName("org.apache.hadoop.fs.StreamCapabilities"); 623 tmp = true; 624 LOG.debug("Test thought StreamCapabilities class was present."); 625 } catch (ClassNotFoundException exception) { 626 LOG.debug("Test didn't think StreamCapabilities class was present."); 627 } finally { 628 STREAM_CAPABILITIES_IS_PRESENT = tmp; 629 } 630 } 631 632 // Here instead of TestCommonFSUtils because we need a minicluster 633 @Test 634 public void checkStreamCapabilitiesOnHdfsDataOutputStream() throws Exception { 635 MiniDFSCluster cluster = htu.startMiniDFSCluster(1); 636 try (FileSystem filesystem = cluster.getFileSystem()) { 637 FSDataOutputStream stream = filesystem.create(new Path("/tmp/foobar")); 638 assertTrue(FSUtils.hasCapability(stream, "hsync")); 639 assertTrue(FSUtils.hasCapability(stream, "hflush")); 640 assertNotEquals("We expect HdfsDataOutputStream to say it has a dummy capability iff the " + 641 "StreamCapabilities class is not defined.", 642 STREAM_CAPABILITIES_IS_PRESENT, 643 FSUtils.hasCapability(stream, "a capability that hopefully HDFS doesn't add.")); 644 } finally { 645 cluster.shutdown(); 646 } 647 } 648 649}