/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test {@link FSUtils}.
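 * Covers HDFS block distribution, version-file handling, permission masks, delete and rename
 * helpers, storage policies, hedged read metrics, parallel file copy, stream capabilities, and
 * {@code isSameHdfs} checks, mostly against a MiniDFSCluster.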
 */
@Category({MiscTests.class, MediumTests.class})
public class TestFSUtils {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestFSUtils.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestFSUtils.class);

  private HBaseTestingUtility htu;
  private FileSystem fs;
  private Configuration conf;

  @Before
  public void setUp() throws IOException {
    htu = new HBaseTestingUtility();
    fs = htu.getTestFileSystem();
    conf = htu.getConfiguration();
  }

  @Test
  public void testIsHDFS() throws Exception {
    assertFalse(CommonFSUtils.isHDFS(conf));
    MiniDFSCluster cluster = null;
    try {
      cluster = htu.startMiniDFSCluster(1);
      assertTrue(CommonFSUtils.isHDFS(conf));
    } finally {
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }

  private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize) throws Exception {
    FSDataOutputStream out = fs.create(file);
    byte[] data = new byte[dataSize];
    out.write(data, 0, dataSize);
    out.close();
  }

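  /**
   * Verifies {@code computeHDFSBlocksDistribution} against three mini-cluster layouts: a
   * two-block file fully replicated across three hosts, a three-block file on four hosts, and a
   * single-block file on four hosts.
   */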
  @Test
  public void testcomputeHDFSBlocksDistribution() throws Exception {
    final int DEFAULT_BLOCK_SIZE = 1024;
    conf.setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
    MiniDFSCluster cluster = null;
    Path testFile = null;

    try {
      // set up a cluster with 3 nodes
      String[] hosts = new String[] { "host1", "host2", "host3" };
      cluster = htu.startMiniDFSCluster(hosts);
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      // create a file with two blocks
      testFile = new Path("/test1.txt");
      WriteDataToHDFS(fs, testFile, 2 * DEFAULT_BLOCK_SIZE);

      // given the default replication factor is 3, the same as the number of datanodes,
      // the locality index for each host should be 100%; that is, getWeight for each host
      // should be the same as getUniqueBlocksTotalWeight
      final long maxTime = System.currentTimeMillis() + 2000;
      boolean ok;
      do {
        ok = true;
        FileStatus status = fs.getFileStatus(testFile);
        HDFSBlocksDistribution blocksDistribution =
          FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
        long uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight();
        for (String host : hosts) {
          long weight = blocksDistribution.getWeight(host);
          ok = (ok && uniqueBlocksTotalWeight == weight);
        }
      } while (!ok && System.currentTimeMillis() < maxTime);
      assertTrue(ok);
    } finally {
      htu.shutdownMiniDFSCluster();
    }

    try {
      // set up a cluster with 4 nodes
      String[] hosts = new String[] { "host1", "host2", "host3", "host4" };
      cluster = htu.startMiniDFSCluster(hosts);
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      // create a file with three blocks
      testFile = new Path("/test2.txt");
      WriteDataToHDFS(fs, testFile, 3 * DEFAULT_BLOCK_SIZE);

      // given the default replication factor is 3, we will have a total of 9 block replicas;
      // thus the host with the highest weight should have weight == 3 * DEFAULT_BLOCK_SIZE
      final long maxTime = System.currentTimeMillis() + 2000;
      long weight;
      long uniqueBlocksTotalWeight;
      do {
        FileStatus status = fs.getFileStatus(testFile);
        HDFSBlocksDistribution blocksDistribution =
          FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
        uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight();

        String tophost = blocksDistribution.getTopHosts().get(0);
        weight = blocksDistribution.getWeight(tophost);

        // The NameNode is informed asynchronously, so we may have a delay. See HBASE-6175
      } while (uniqueBlocksTotalWeight != weight && System.currentTimeMillis() < maxTime);
      assertTrue(uniqueBlocksTotalWeight == weight);
    } finally {
      htu.shutdownMiniDFSCluster();
    }

    try {
      // set up a cluster with 4 nodes
      String[] hosts = new String[] { "host1", "host2", "host3", "host4" };
      cluster = htu.startMiniDFSCluster(hosts);
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      // create a file with one block
      testFile = new Path("/test3.txt");
      WriteDataToHDFS(fs, testFile, DEFAULT_BLOCK_SIZE);

      // given the default replication factor is 3, we will have a total of 3 block replicas;
      // thus there is one host without weight
      final long maxTime = System.currentTimeMillis() + 2000;
      HDFSBlocksDistribution blocksDistribution;
      do {
        FileStatus status = fs.getFileStatus(testFile);
        blocksDistribution = FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
        // The NameNode is informed asynchronously, so we may have a delay. See HBASE-6175
      } while (blocksDistribution.getTopHosts().size() != 3
          && System.currentTimeMillis() < maxTime);
      assertEquals("Wrong number of hosts distributing blocks.", 3,
        blocksDistribution.getTopHosts().size());
    } finally {
      htu.shutdownMiniDFSCluster();
    }
  }

  private void writeVersionFile(Path versionFile, String version) throws IOException {
    if (CommonFSUtils.isExists(fs, versionFile)) {
      assertTrue(CommonFSUtils.delete(fs, versionFile, true));
    }
    try (FSDataOutputStream s = fs.create(versionFile)) {
      s.writeUTF(version);
    }
    assertTrue(fs.exists(versionFile));
  }

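  /**
   * Exercises getVersion/checkVersion: a fresh root directory gets a version file created, a
   * missing version file alongside an existing meta region directory fails the check, and an
   * outdated version string fails it as well.
   */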
  @Test
  public void testVersion() throws DeserializationException, IOException {
    final Path rootdir = htu.getDataTestDir();
    final FileSystem fs = rootdir.getFileSystem(conf);
    assertNull(FSUtils.getVersion(fs, rootdir));
    // No meta dir so no complaint from checkVersion.
    // Presumes it a new install. Will create version file.
    FSUtils.checkVersion(fs, rootdir, true);
    // Now remove the version file and create a metadir so checkVersion fails.
    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
    assertTrue(CommonFSUtils.isExists(fs, versionFile));
    assertTrue(CommonFSUtils.delete(fs, versionFile, true));
    Path metaRegionDir =
      FSUtils.getRegionDirFromRootDir(rootdir, RegionInfoBuilder.FIRST_META_REGIONINFO);
    FsPermission defaultPerms =
      CommonFSUtils.getFilePermissions(fs, this.conf, HConstants.DATA_FILE_UMASK_KEY);
    CommonFSUtils.create(fs, metaRegionDir, defaultPerms, false);
    boolean thrown = false;
    try {
      FSUtils.checkVersion(fs, rootdir, true);
    } catch (FileSystemVersionException e) {
      thrown = true;
    }
    assertTrue("Expected FileSystemVersionException", thrown);
    // Write out a good version file. See if we can read it in and convert.
    String version = HConstants.FILE_SYSTEM_VERSION;
    writeVersionFile(versionFile, version);
    FileStatus[] status = fs.listStatus(versionFile);
    assertNotNull(status);
    assertTrue(status.length > 0);
    String newVersion = FSUtils.getVersion(fs, rootdir);
    assertEquals(version.length(), newVersion.length());
    assertEquals(version, newVersion);
    // File will have been converted. Exercise the pb format.
    assertEquals(version, FSUtils.getVersion(fs, rootdir));
    FSUtils.checkVersion(fs, rootdir, true);
    // Write an old version file.
    String oldVersion = "1";
    writeVersionFile(versionFile, oldVersion);
    newVersion = FSUtils.getVersion(fs, rootdir);
    assertNotEquals(version, newVersion);
    thrown = false;
    try {
      FSUtils.checkVersion(fs, rootdir, true);
    } catch (FileSystemVersionException e) {
      thrown = true;
    }
    assertTrue("Expected FileSystemVersionException", thrown);
  }

  @Test
  public void testPermMask() throws Exception {
    final Path rootdir = htu.getDataTestDir();
    final FileSystem fs = rootdir.getFileSystem(conf);
    // default fs permission
    FsPermission defaultFsPerm =
      CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
    // 'hbase.data.umask.enable' is false, so we get the default fs permission.
    assertEquals(FsPermission.getFileDefault(), defaultFsPerm);

    conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
    // first check that we don't crash if we don't have perms set
    FsPermission defaultStartPerm =
      CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
    // The default 'hbase.data.umask' is 000, and this umask is used when
    // 'hbase.data.umask.enable' is true.
    // Therefore we will not get the real fs default in this case.
    // Instead we will get the starting point FULL_RWX_PERMISSIONS.
    assertEquals(new FsPermission(CommonFSUtils.FULL_RWX_PERMISSIONS), defaultStartPerm);

    conf.setStrings(HConstants.DATA_FILE_UMASK_KEY, "077");
    // now check that we get the right perms
    FsPermission filePerm =
      CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
    assertEquals(new FsPermission("700"), filePerm);

    // then that the correct file is created
    Path p = new Path("target" + File.separator + htu.getRandomUUID().toString());
    try {
      FSDataOutputStream out = FSUtils.create(conf, fs, p, filePerm, null);
      out.close();
      FileStatus stat = fs.getFileStatus(p);
      assertEquals(new FsPermission("700"), stat.getPermission());
      // and then cleanup
    } finally {
      fs.delete(p, true);
    }
  }

  @Test
  public void testDeleteAndExists() throws Exception {
    final Path rootdir = htu.getDataTestDir();
    final FileSystem fs = rootdir.getFileSystem(conf);
    conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
    FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
    // then that the correct file is created
    String file = htu.getRandomUUID().toString();
    Path p = new Path(htu.getDataTestDir(), "temptarget" + File.separator + file);
    Path p1 = new Path(htu.getDataTestDir(), "temppath" + File.separator + file);
    try {
      FSDataOutputStream out = FSUtils.create(conf, fs, p, perms, null);
      out.close();
      assertTrue("The created file should be present", CommonFSUtils.isExists(fs, p));
      // delete the file with recursion as false. Only the file will be deleted.
      CommonFSUtils.delete(fs, p, false);
      // Create another file
      FSDataOutputStream out1 = FSUtils.create(conf, fs, p1, perms, null);
      out1.close();
      // delete the file with recursion as true. Still only the file will be deleted.
      CommonFSUtils.delete(fs, p1, true);
      assertFalse("The deleted file should not be present", CommonFSUtils.isExists(fs, p1));
      // and then cleanup
    } finally {
      CommonFSUtils.delete(fs, p, true);
      CommonFSUtils.delete(fs, p1, true);
    }
  }

  @Test
  public void testFilteredStatusDoesNotThrowOnNotFound() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    try {
      assertNull(FSUtils.listStatusWithStatusFilter(cluster.getFileSystem(),
        new Path("definitely/doesn't/exist"), null));
    } finally {
      cluster.shutdown();
    }
  }

  @Test
  public void testRenameAndSetModifyTime() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    assertTrue(CommonFSUtils.isHDFS(conf));

    FileSystem fs = FileSystem.get(conf);
    Path testDir = htu.getDataTestDirOnTestFS("testArchiveFile");

    String file = htu.getRandomUUID().toString();
    Path p = new Path(testDir, file);

    FSDataOutputStream out = fs.create(p);
    out.close();
    assertTrue("The created file should be present", CommonFSUtils.isExists(fs, p));

    long expect = System.currentTimeMillis() + 1000;
    assertNotEquals(expect, fs.getFileStatus(p).getModificationTime());

    ManualEnvironmentEdge mockEnv = new ManualEnvironmentEdge();
    mockEnv.setValue(expect);
    EnvironmentEdgeManager.injectEdge(mockEnv);
    try {
      String dstFile = htu.getRandomUUID().toString();
      Path dst = new Path(testDir, dstFile);

      assertTrue(CommonFSUtils.renameAndSetModifyTime(fs, p, dst));
      assertFalse("The moved file should not be present", CommonFSUtils.isExists(fs, p));
      assertTrue("The dst file should be present", CommonFSUtils.isExists(fs, dst));

      assertEquals(expect, fs.getFileStatus(dst).getModificationTime());
      cluster.shutdown();
    } finally {
      EnvironmentEdgeManager.reset();
    }
  }

  @Test
  public void testSetStoragePolicyDefault() throws Exception {
    verifyNoHDFSApiInvocationForDefaultPolicy();
    verifyFileInDirWithStoragePolicy(HConstants.DEFAULT_WAL_STORAGE_POLICY);
  }

  /**
   * Note: currently the default policy is set to defer to HDFS and this case is to verify that
   * logic; the check will need to be removed if the default policy is changed.
   */
  private void verifyNoHDFSApiInvocationForDefaultPolicy() {
    FileSystem testFs = new AlwaysFailSetStoragePolicyFileSystem();
    // There should be no exception thrown when setting the default storage policy, which
    // indicates the HDFS API hasn't been called
    try {
      CommonFSUtils.setStoragePolicy(testFs, new Path("non-exist"),
        HConstants.DEFAULT_WAL_STORAGE_POLICY, true);
    } catch (IOException e) {
      Assert.fail("Should have bypassed the FS API when setting default storage policy");
    }
    // There should be an exception thrown when given a non-default storage policy, which
    // indicates the HDFS API has been called
    try {
      CommonFSUtils.setStoragePolicy(testFs, new Path("non-exist"), "HOT", true);
      Assert.fail("Should have invoked the FS API but haven't");
    } catch (IOException e) {
      // expected given an invalid path
    }
  }

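  /**
   * A DistributedFileSystem whose setStoragePolicy always throws, so the test above can detect
   * whether the HDFS API was actually invoked.
   */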
  class AlwaysFailSetStoragePolicyFileSystem extends DistributedFileSystem {
    @Override
    public void setStoragePolicy(final Path src, final String policyName) throws IOException {
      throw new IOException("The setStoragePolicy method is invoked");
    }
  }

  /* might log a warning, but still work. (always warning on Hadoop < 2.6.0) */
  @Test
  public void testSetStoragePolicyValidButMaybeNotPresent() throws Exception {
    verifyFileInDirWithStoragePolicy("ALL_SSD");
  }

  final String INVALID_STORAGE_POLICY = "1772";

  /* should log a warning, but still work. (different warning on Hadoop < 2.6.0) */
  @Test
  public void testSetStoragePolicyInvalid() throws Exception {
    verifyFileInDirWithStoragePolicy(INVALID_STORAGE_POLICY);
  }

  // Here instead of TestCommonFSUtils because we need a minicluster
  private void verifyFileInDirWithStoragePolicy(final String policy) throws Exception {
    conf.set(HConstants.WAL_STORAGE_POLICY, policy);

    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    try {
      assertTrue(CommonFSUtils.isHDFS(conf));

      FileSystem fs = FileSystem.get(conf);
      Path testDir = htu.getDataTestDirOnTestFS("testArchiveFile");
      fs.mkdirs(testDir);

      String storagePolicy =
        conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
      CommonFSUtils.setStoragePolicy(fs, testDir, storagePolicy);

      String file = htu.getRandomUUID().toString();
      Path p = new Path(testDir, file);
      WriteDataToHDFS(fs, p, 4096);
      HFileSystem hfs = new HFileSystem(fs);
      String policySet = hfs.getStoragePolicyName(p);
      LOG.debug("The storage policy of path " + p + " is " + policySet);
      if (policy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)
          || policy.equals(INVALID_STORAGE_POLICY)) {
        String hdfsDefaultPolicy = hfs.getStoragePolicyName(hfs.getHomeDirectory());
        LOG.debug("The default hdfs storage policy (indicated by home path: "
            + hfs.getHomeDirectory() + ") is " + hdfsDefaultPolicy);
        Assert.assertEquals(hdfsDefaultPolicy, policySet);
      } else {
        Assert.assertEquals(policy, policySet);
      }
      // will assert existence before deleting.
      cleanupFile(fs, testDir);
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Ugly test that ensures we can get at the hedged read counters in dfsclient. Does a bit of
   * preading with hedged reads enabled, using code taken from hdfs TestPread.
   */
  @Test
  public void testDFSHedgedReadMetrics() throws Exception {
    // Enable hedged reads and set it so the threshold is really low.
    // Most of this test is taken from HDFS, from TestPread.
    conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 0);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
    conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
    // Set short retry timeouts so this test runs faster
    conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 0);
    conf.setBoolean("dfs.datanode.transferTo.allowed", false);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    // Get the metrics. Should be empty.
    DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
    assertEquals(0, metrics.getHedgedReadOps());
    FileSystem fileSys = cluster.getFileSystem();
    try {
      Path p = new Path("preadtest.dat");
      // We need > 1 block to test out the hedged reads.
      DFSTestUtil.createFile(fileSys, p, 12 * blockSize, 12 * blockSize, blockSize, (short) 3,
        seed);
      pReadFile(fileSys, p);
      cleanupFile(fileSys, p);
      assertTrue(metrics.getHedgedReadOps() > 0);
    } finally {
      fileSys.close();
      cluster.shutdown();
    }
  }

  @Test
  public void testCopyFilesParallel() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    cluster.waitActive();
    FileSystem fs = cluster.getFileSystem();
    Path src = new Path("/src");
    fs.mkdirs(src);
    for (int i = 0; i < 50; i++) {
      WriteDataToHDFS(fs, new Path(src, String.valueOf(i)), 1024);
    }
    Path sub = new Path(src, "sub");
    fs.mkdirs(sub);
    for (int i = 0; i < 50; i++) {
      WriteDataToHDFS(fs, new Path(sub, String.valueOf(i)), 1024);
    }
    Path dst = new Path("/dst");
    List<Path> allFiles = FSUtils.copyFilesParallel(fs, src, fs, dst, conf, 4);

    assertEquals(102, allFiles.size());
    FileStatus[] list = fs.listStatus(dst);
    assertEquals(51, list.length);
    FileStatus[] sublist = fs.listStatus(new Path(dst, "sub"));
    assertEquals(50, sublist.length);
  }

  // Below is taken from TestPread over in HDFS.
  static final int blockSize = 4096;
  static final long seed = 0xDEADBEEFL;

  private void pReadFile(FileSystem fileSys, Path name) throws IOException {
    FSDataInputStream stm = fileSys.open(name);
    byte[] expected = new byte[12 * blockSize];
    Random rand = new Random(seed);
    rand.nextBytes(expected);
    // do a sanity check. Read the first 4K bytes
    byte[] actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 0, expected, "Read Sanity Test");
    // now do a pread for the first 8K bytes
    actual = new byte[8192];
    doPread(stm, 0L, actual, 0, 8192);
    checkAndEraseData(actual, 0, expected, "Pread Test 1");
    // Now check to see if the normal read returns the 4K-8K byte range
    actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 4096, expected, "Pread Test 2");
    // Now see if we can cross a single block boundary successfully:
    // read 4K bytes from blockSize - 2K offset
    stm.readFully(blockSize - 2048, actual, 0, 4096);
    checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 3");
    // now see if we can cross two block boundaries successfully:
    // read blockSize + 4K bytes from blockSize - 2K offset
    actual = new byte[blockSize + 4096];
    stm.readFully(blockSize - 2048, actual);
    checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 4");
    // now see if we can cross two block boundaries that are not cached:
    // read blockSize + 4K bytes from 10*blockSize - 2K offset
    actual = new byte[blockSize + 4096];
    stm.readFully(10 * blockSize - 2048, actual);
    checkAndEraseData(actual, (10 * blockSize - 2048), expected, "Pread Test 5");
    // now check that even after all these preads, we can still read bytes 8K-12K
    actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 8192, expected, "Pread Test 6");
    // done
    stm.close();
    // check block location caching
    stm = fileSys.open(name);
    stm.readFully(1, actual, 0, 4096);
    stm.readFully(4 * blockSize, actual, 0, 4096);
    stm.readFully(7 * blockSize, actual, 0, 4096);
    actual = new byte[3 * 4096];
    stm.readFully(0 * blockSize, actual, 0, 3 * 4096);
    checkAndEraseData(actual, 0, expected, "Pread Test 7");
    actual = new byte[8 * 4096];
    stm.readFully(3 * blockSize, actual, 0, 8 * 4096);
    checkAndEraseData(actual, 3 * blockSize, expected, "Pread Test 8");
    // read the tail
    stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize / 2);
    IOException res = null;
    try { // read beyond the end of the file
      stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize);
    } catch (IOException e) {
      // should throw an exception
      res = e;
    }
    assertTrue("Expected an exception when reading beyond the file boundary.", res != null);

    stm.close();
  }

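  /**
   * Asserts that {@code actual} matches {@code expected} starting at offset {@code from},
   * zeroing out {@code actual} as it is verified so the buffer can be reused.
   */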
  private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) {
    for (int idx = 0; idx < actual.length; idx++) {
      assertEquals(message + " byte " + (from + idx) + " differs. expected "
          + expected[from + idx] + " actual " + actual[idx],
        expected[from + idx], actual[idx]);
      actual[idx] = 0;
    }
  }

  private void doPread(FSDataInputStream stm, long position, byte[] buffer, int offset,
      int length) throws IOException {
    int nread = 0;
    // long totalRead = 0;
    // DFSInputStream dfstm = null;

    /* Disabled. These counts do not add up. Some issue in the original hdfs tests?
    if (stm.getWrappedStream() instanceof DFSInputStream) {
      dfstm = (DFSInputStream) (stm.getWrappedStream());
      totalRead = dfstm.getReadStatistics().getTotalBytesRead();
    } */

    while (nread < length) {
      int nbytes = stm.read(position + nread, buffer, offset + nread, length - nread);
      assertTrue("Error in pread", nbytes > 0);
      nread += nbytes;
    }

    /* Disabled. These counts do not add up. Some issue in the original hdfs tests?
    if (dfstm != null) {
      if (isHedgedRead) {
        assertTrue("Expected read statistic to be incremented",
          length <= dfstm.getReadStatistics().getTotalBytesRead() - totalRead);
      } else {
        assertEquals("Expected read statistic to be incremented", length,
          dfstm.getReadStatistics().getTotalBytesRead() - totalRead);
      }
    } */
  }

  private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
    assertTrue(fileSys.exists(name));
    assertTrue(fileSys.delete(name, true));
    assertTrue(!fileSys.exists(name));
  }

  static {
    try {
      Class.forName("org.apache.hadoop.fs.StreamCapabilities");
      LOG.debug("Test thought StreamCapabilities class was present.");
    } catch (ClassNotFoundException exception) {
      LOG.debug("Test didn't think StreamCapabilities class was present.");
    }
  }

  // Here instead of TestCommonFSUtils because we need a minicluster
  @Test
  public void checkStreamCapabilitiesOnHdfsDataOutputStream() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    try (FileSystem filesystem = cluster.getFileSystem()) {
      FSDataOutputStream stream = filesystem.create(new Path("/tmp/foobar"));
      assertTrue(stream.hasCapability(StreamCapabilities.HSYNC));
      assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH));
      assertFalse(stream.hasCapability("a capability that hopefully HDFS doesn't add."));
    } finally {
      cluster.shutdown();
    }
  }

  private void testIsSameHdfs(int nnport) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Path srcPath = new Path("hdfs://localhost:" + nnport + "/");
    Path desPath = new Path("hdfs://127.0.0.1/");
    FileSystem srcFs = srcPath.getFileSystem(conf);
    FileSystem desFs = desPath.getFileSystem(conf);

    assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs));

    desPath = new Path("hdfs://127.0.0.1:8070/");
    desFs = desPath.getFileSystem(conf);
    assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs));

    desPath = new Path("hdfs://127.0.1.1:" + nnport + "/");
    desFs = desPath.getFileSystem(conf);
    assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs));

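    // Simulate an HA nameservice. isSameHdfs should report a match only when one of the
    // configured namenode addresses is the source namenode's host and port.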
    conf.set("fs.defaultFS", "hdfs://haosong-hadoop");
    conf.set("dfs.nameservices", "haosong-hadoop");
    conf.set("dfs.ha.namenodes.haosong-hadoop", "nn1,nn2");
    conf.set("dfs.client.failover.proxy.provider.haosong-hadoop",
      "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.0.0.1:" + nnport);
    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.10.2.1:8000");
    desPath = new Path("/");
    desFs = desPath.getFileSystem(conf);
    assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs));

    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.10.2.1:" + nnport);
    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.0.0.1:8000");
    desPath = new Path("/");
    desFs = desPath.getFileSystem(conf);
    assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs));
  }

  @Test
  public void testIsSameHdfs() throws IOException {
    String hadoopVersion = org.apache.hadoop.util.VersionInfo.getVersion();
    LOG.info("hadoop version is: " + hadoopVersion);
    boolean isHadoop3_0_0 = hadoopVersion.startsWith("3.0.0");
    if (isHadoop3_0_0) {
      // Hadoop 3.0.0 alpha1+ through 3.0.0 GA changed the default namenode port to 9820.
      // See HDFS-9427.
      testIsSameHdfs(9820);
    } else {
      // Pre-3.0.0 Hadoop defaults to port 8020.
      // Hadoop 3.0.1 changed it back to port 8020. See HDFS-12990.
      testIsSameHdfs(8020);
    }
  }
}