/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
044import org.apache.hadoop.hbase.HBaseTestingUtil; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.HDFSBlocksDistribution; 047import org.apache.hadoop.hbase.client.RegionInfoBuilder; 048import org.apache.hadoop.hbase.exceptions.DeserializationException; 049import org.apache.hadoop.hbase.fs.HFileSystem; 050import org.apache.hadoop.hbase.testclassification.MediumTests; 051import org.apache.hadoop.hbase.testclassification.MiscTests; 052import org.apache.hadoop.hdfs.DFSConfigKeys; 053import org.apache.hadoop.hdfs.DFSHedgedReadMetrics; 054import org.apache.hadoop.hdfs.DFSTestUtil; 055import org.apache.hadoop.hdfs.DistributedFileSystem; 056import org.apache.hadoop.hdfs.MiniDFSCluster; 057import org.apache.hadoop.hdfs.client.HdfsDataInputStream; 058import org.junit.jupiter.api.BeforeEach; 059import org.junit.jupiter.api.Tag; 060import org.junit.jupiter.api.Test; 061import org.slf4j.Logger; 062import org.slf4j.LoggerFactory; 063 064/** 065 * Test {@link FSUtils}. 
066 */ 067@Tag(MiscTests.TAG) 068@Tag(MediumTests.TAG) 069public class TestFSUtils { 070 071 private static final Logger LOG = LoggerFactory.getLogger(TestFSUtils.class); 072 073 private HBaseTestingUtil htu; 074 private FileSystem fs; 075 private Configuration conf; 076 077 @BeforeEach 078 public void setUp() throws IOException { 079 htu = new HBaseTestingUtil(); 080 fs = htu.getTestFileSystem(); 081 conf = htu.getConfiguration(); 082 } 083 084 @Test 085 public void testIsHDFS() throws Exception { 086 assertFalse(CommonFSUtils.isHDFS(conf)); 087 MiniDFSCluster cluster = null; 088 try { 089 cluster = htu.startMiniDFSCluster(1); 090 assertTrue(CommonFSUtils.isHDFS(conf)); 091 assertTrue(FSUtils.supportSafeMode(cluster.getFileSystem())); 092 FSUtils.checkDfsSafeMode(conf); 093 } finally { 094 if (cluster != null) { 095 cluster.shutdown(); 096 } 097 } 098 } 099 100 @Test 101 public void testLocalFileSystemSafeMode() throws Exception { 102 conf.setClass("fs.file.impl", LocalFileSystem.class, FileSystem.class); 103 assertFalse(CommonFSUtils.isHDFS(conf)); 104 assertFalse(FSUtils.supportSafeMode(FileSystem.get(conf))); 105 FSUtils.checkDfsSafeMode(conf); 106 } 107 108 private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize) throws Exception { 109 FSDataOutputStream out = fs.create(file); 110 byte[] data = new byte[dataSize]; 111 out.write(data, 0, dataSize); 112 out.close(); 113 } 114 115 @Test 116 public void testComputeHDFSBlocksDistributionByInputStream() throws Exception { 117 testComputeHDFSBlocksDistribution((fs, testFile) -> { 118 try (FSDataInputStream open = fs.open(testFile)) { 119 assertTrue(open instanceof HdfsDataInputStream); 120 return FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) open); 121 } 122 }); 123 } 124 125 @Test 126 public void testComputeHDFSBlockDistribution() throws Exception { 127 testComputeHDFSBlocksDistribution((fs, testFile) -> { 128 FileStatus status = fs.getFileStatus(testFile); 129 return 
FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen()); 130 }); 131 } 132 133 @FunctionalInterface 134 interface HDFSBlockDistributionFunction { 135 HDFSBlocksDistribution getForPath(FileSystem fs, Path path) throws IOException; 136 } 137 138 private void testComputeHDFSBlocksDistribution( 139 HDFSBlockDistributionFunction fileToBlockDistribution) throws Exception { 140 final int DEFAULT_BLOCK_SIZE = 1024; 141 conf.setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE); 142 MiniDFSCluster cluster = null; 143 Path testFile = null; 144 145 try { 146 // set up a cluster with 3 nodes 147 String hosts[] = new String[] { "host1", "host2", "host3" }; 148 cluster = htu.startMiniDFSCluster(hosts); 149 cluster.waitActive(); 150 FileSystem fs = cluster.getFileSystem(); 151 152 // create a file with two blocks 153 testFile = new Path("/test1.txt"); 154 WriteDataToHDFS(fs, testFile, 2 * DEFAULT_BLOCK_SIZE); 155 156 // given the default replication factor is 3, the same as the number of 157 // datanodes; the locality index for each host should be 100%, 158 // or getWeight for each host should be the same as getUniqueBlocksWeights 159 final long maxTime = EnvironmentEdgeManager.currentTime() + 2000; 160 boolean ok; 161 do { 162 ok = true; 163 164 HDFSBlocksDistribution blocksDistribution = 165 fileToBlockDistribution.getForPath(fs, testFile); 166 167 long uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight(); 168 for (String host : hosts) { 169 long weight = blocksDistribution.getWeight(host); 170 ok = (ok && uniqueBlocksTotalWeight == weight); 171 } 172 } while (!ok && EnvironmentEdgeManager.currentTime() < maxTime); 173 assertTrue(ok); 174 } finally { 175 htu.shutdownMiniDFSCluster(); 176 } 177 178 try { 179 // set up a cluster with 4 nodes 180 String hosts[] = new String[] { "host1", "host2", "host3", "host4" }; 181 cluster = htu.startMiniDFSCluster(hosts); 182 cluster.waitActive(); 183 FileSystem fs = cluster.getFileSystem(); 184 185 // create a file 
with three blocks 186 testFile = new Path("/test2.txt"); 187 WriteDataToHDFS(fs, testFile, 3 * DEFAULT_BLOCK_SIZE); 188 189 // given the default replication factor is 3, we will have total of 9 190 // replica of blocks; thus the host with the highest weight should have 191 // weight == 3 * DEFAULT_BLOCK_SIZE 192 final long maxTime = EnvironmentEdgeManager.currentTime() + 2000; 193 long weight; 194 long uniqueBlocksTotalWeight; 195 do { 196 HDFSBlocksDistribution blocksDistribution = 197 fileToBlockDistribution.getForPath(fs, testFile); 198 uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight(); 199 200 String tophost = blocksDistribution.getTopHosts().get(0); 201 weight = blocksDistribution.getWeight(tophost); 202 203 // NameNode is informed asynchronously, so we may have a delay. See HBASE-6175 204 } while (uniqueBlocksTotalWeight != weight && EnvironmentEdgeManager.currentTime() < maxTime); 205 assertTrue(uniqueBlocksTotalWeight == weight); 206 207 } finally { 208 htu.shutdownMiniDFSCluster(); 209 } 210 211 try { 212 // set up a cluster with 4 nodes 213 String hosts[] = new String[] { "host1", "host2", "host3", "host4" }; 214 cluster = htu.startMiniDFSCluster(hosts); 215 cluster.waitActive(); 216 FileSystem fs = cluster.getFileSystem(); 217 218 // create a file with one block 219 testFile = new Path("/test3.txt"); 220 WriteDataToHDFS(fs, testFile, DEFAULT_BLOCK_SIZE); 221 222 // given the default replication factor is 3, we will have total of 3 223 // replica of blocks; thus there is one host without weight 224 final long maxTime = EnvironmentEdgeManager.currentTime() + 2000; 225 HDFSBlocksDistribution blocksDistribution; 226 do { 227 blocksDistribution = fileToBlockDistribution.getForPath(fs, testFile); 228 // NameNode is informed asynchronously, so we may have a delay. 
See HBASE-6175 229 } while ( 230 blocksDistribution.getTopHosts().size() != 3 231 && EnvironmentEdgeManager.currentTime() < maxTime 232 ); 233 assertEquals(3, blocksDistribution.getTopHosts().size(), 234 "Wrong number of hosts distributing blocks."); 235 } finally { 236 htu.shutdownMiniDFSCluster(); 237 } 238 } 239 240 private void writeVersionFile(Path versionFile, String version) throws IOException { 241 if (CommonFSUtils.isExists(fs, versionFile)) { 242 assertTrue(CommonFSUtils.delete(fs, versionFile, true)); 243 } 244 try (FSDataOutputStream s = fs.create(versionFile)) { 245 s.writeUTF(version); 246 } 247 assertTrue(fs.exists(versionFile)); 248 } 249 250 @Test 251 public void testVersion() throws DeserializationException, IOException { 252 final Path rootdir = htu.getDataTestDir(); 253 final FileSystem fs = rootdir.getFileSystem(conf); 254 assertNull(FSUtils.getVersion(fs, rootdir)); 255 // No meta dir so no complaint from checkVersion. 256 // Presumes it a new install. Will create version file. 257 FSUtils.checkVersion(fs, rootdir, true); 258 // Now remove the version file and create a metadir so checkVersion fails. 259 Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); 260 assertTrue(CommonFSUtils.isExists(fs, versionFile)); 261 assertTrue(CommonFSUtils.delete(fs, versionFile, true)); 262 Path metaRegionDir = 263 FSUtils.getRegionDirFromRootDir(rootdir, RegionInfoBuilder.FIRST_META_REGIONINFO); 264 FsPermission defaultPerms = 265 CommonFSUtils.getFilePermissions(fs, this.conf, HConstants.DATA_FILE_UMASK_KEY); 266 CommonFSUtils.create(fs, metaRegionDir, defaultPerms, false); 267 boolean thrown = false; 268 try { 269 FSUtils.checkVersion(fs, rootdir, true); 270 } catch (FileSystemVersionException e) { 271 thrown = true; 272 } 273 assertTrue(thrown, "Expected FileSystemVersionException"); 274 // Write out a good version file. See if we can read it in and convert. 
275 String version = HConstants.FILE_SYSTEM_VERSION; 276 writeVersionFile(versionFile, version); 277 FileStatus[] status = fs.listStatus(versionFile); 278 assertNotNull(status); 279 assertTrue(status.length > 0); 280 String newVersion = FSUtils.getVersion(fs, rootdir); 281 assertEquals(version.length(), newVersion.length()); 282 assertEquals(version, newVersion); 283 // File will have been converted. Exercise the pb format 284 assertEquals(version, FSUtils.getVersion(fs, rootdir)); 285 FSUtils.checkVersion(fs, rootdir, true); 286 // Write an old version file. 287 String oldVersion = "1"; 288 writeVersionFile(versionFile, oldVersion); 289 newVersion = FSUtils.getVersion(fs, rootdir); 290 assertNotEquals(version, newVersion); 291 thrown = false; 292 try { 293 FSUtils.checkVersion(fs, rootdir, true); 294 } catch (FileSystemVersionException e) { 295 thrown = true; 296 } 297 assertTrue(thrown, "Expected FileSystemVersionException"); 298 } 299 300 @Test 301 public void testPermMask() throws Exception { 302 final Path rootdir = htu.getDataTestDir(); 303 final FileSystem fs = rootdir.getFileSystem(conf); 304 // default fs permission 305 FsPermission defaultFsPerm = 306 CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY); 307 // 'hbase.data.umask.enable' is false. We will get default fs permission. 308 assertEquals(FsPermission.getFileDefault(), defaultFsPerm); 309 310 conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true); 311 // first check that we don't crash if we don't have perms set 312 FsPermission defaultStartPerm = 313 CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY); 314 // default 'hbase.data.umask'is 000, and this umask will be used when 315 // 'hbase.data.umask.enable' is true. 316 // Therefore we will not get the real fs default in this case. 
317 // Instead we will get the starting point FULL_RWX_PERMISSIONS 318 assertEquals(new FsPermission(CommonFSUtils.FULL_RWX_PERMISSIONS), defaultStartPerm); 319 320 conf.setStrings(HConstants.DATA_FILE_UMASK_KEY, "077"); 321 // now check that we get the right perms 322 FsPermission filePerm = 323 CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY); 324 assertEquals(new FsPermission("700"), filePerm); 325 326 // then that the correct file is created 327 Path p = new Path("target" + File.separator + HBaseTestingUtil.getRandomUUID().toString()); 328 try { 329 FSDataOutputStream out = FSUtils.create(conf, fs, p, filePerm, null); 330 out.close(); 331 FileStatus stat = fs.getFileStatus(p); 332 assertEquals(new FsPermission("700"), stat.getPermission()); 333 // and then cleanup 334 } finally { 335 fs.delete(p, true); 336 } 337 } 338 339 @Test 340 public void testDeleteAndExists() throws Exception { 341 final Path rootdir = htu.getDataTestDir(); 342 final FileSystem fs = rootdir.getFileSystem(conf); 343 conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true); 344 FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY); 345 // then that the correct file is created 346 String file = HBaseTestingUtil.getRandomUUID().toString(); 347 Path p = new Path(htu.getDataTestDir(), "temptarget" + File.separator + file); 348 Path p1 = new Path(htu.getDataTestDir(), "temppath" + File.separator + file); 349 try { 350 FSDataOutputStream out = FSUtils.create(conf, fs, p, perms, null); 351 out.close(); 352 assertTrue(CommonFSUtils.isExists(fs, p), "The created file should be present"); 353 // delete the file with recursion as false. Only the file will be deleted. 354 CommonFSUtils.delete(fs, p, false); 355 // Create another file 356 FSDataOutputStream out1 = FSUtils.create(conf, fs, p1, perms, null); 357 out1.close(); 358 // delete the file with recursion as false. 
Still the file only will be deleted 359 CommonFSUtils.delete(fs, p1, true); 360 assertFalse(CommonFSUtils.isExists(fs, p1), "The created file should be present"); 361 // and then cleanup 362 } finally { 363 CommonFSUtils.delete(fs, p, true); 364 CommonFSUtils.delete(fs, p1, true); 365 } 366 } 367 368 @Test 369 public void testFilteredStatusDoesNotThrowOnNotFound() throws Exception { 370 MiniDFSCluster cluster = htu.startMiniDFSCluster(1); 371 try { 372 assertNull(FSUtils.listStatusWithStatusFilter(cluster.getFileSystem(), 373 new Path("definitely/doesn't/exist"), null)); 374 } finally { 375 cluster.shutdown(); 376 } 377 378 } 379 380 @Test 381 public void testRenameAndSetModifyTime() throws Exception { 382 MiniDFSCluster cluster = htu.startMiniDFSCluster(1); 383 assertTrue(CommonFSUtils.isHDFS(conf)); 384 385 FileSystem fs = FileSystem.get(conf); 386 Path testDir = htu.getDataTestDirOnTestFS("testArchiveFile"); 387 388 String file = HBaseTestingUtil.getRandomUUID().toString(); 389 Path p = new Path(testDir, file); 390 391 FSDataOutputStream out = fs.create(p); 392 out.close(); 393 assertTrue(CommonFSUtils.isExists(fs, p), "The created file should be present"); 394 395 long expect = EnvironmentEdgeManager.currentTime() + 1000; 396 assertNotEquals(expect, fs.getFileStatus(p).getModificationTime()); 397 398 ManualEnvironmentEdge mockEnv = new ManualEnvironmentEdge(); 399 mockEnv.setValue(expect); 400 EnvironmentEdgeManager.injectEdge(mockEnv); 401 try { 402 String dstFile = HBaseTestingUtil.getRandomUUID().toString(); 403 Path dst = new Path(testDir, dstFile); 404 405 assertTrue(CommonFSUtils.renameAndSetModifyTime(fs, p, dst)); 406 assertFalse(CommonFSUtils.isExists(fs, p), "The moved file should not be present"); 407 assertTrue(CommonFSUtils.isExists(fs, dst), "The dst file should be present"); 408 409 assertEquals(expect, fs.getFileStatus(dst).getModificationTime()); 410 cluster.shutdown(); 411 } finally { 412 EnvironmentEdgeManager.reset(); 413 } 414 } 415 416 
@Test 417 public void testSetStoragePolicyDefault() throws Exception { 418 verifyNoHDFSApiInvocationForDefaultPolicy(); 419 verifyFileInDirWithStoragePolicy(HConstants.DEFAULT_WAL_STORAGE_POLICY); 420 } 421 422 /** 423 * Note: currently the default policy is set to defer to HDFS and this case is to verify the 424 * logic, will need to remove the check if the default policy is changed 425 */ 426 private void verifyNoHDFSApiInvocationForDefaultPolicy() throws URISyntaxException, IOException { 427 FileSystem testFs = new AlwaysFailSetStoragePolicyFileSystem(); 428 testFs.initialize(new URI("hdfs://localhost/"), conf); 429 // There should be no exception thrown when setting to default storage policy, which indicates 430 // the HDFS API hasn't been called 431 try { 432 CommonFSUtils.setStoragePolicy(testFs, new Path("non-exist"), 433 HConstants.DEFAULT_WAL_STORAGE_POLICY, true); 434 } catch (IOException e) { 435 org.junit.jupiter.api.Assertions 436 .fail("Should have bypassed the FS API when setting default storage policy"); 437 } 438 // There should be exception thrown when given non-default storage policy, which indicates the 439 // HDFS API has been called 440 try { 441 CommonFSUtils.setStoragePolicy(testFs, new Path("non-exist"), "HOT", true); 442 fail("Should have invoked the FS API but haven't"); 443 } catch (IOException e) { 444 // expected given an invalid path 445 } 446 } 447 448 class AlwaysFailSetStoragePolicyFileSystem extends DistributedFileSystem { 449 @Override 450 public void setStoragePolicy(final Path src, final String policyName) throws IOException { 451 throw new IOException("The setStoragePolicy method is invoked"); 452 } 453 } 454 455 /* might log a warning, but still work. 
(always warning on Hadoop < 2.6.0) */ 456 @Test 457 public void testSetStoragePolicyValidButMaybeNotPresent() throws Exception { 458 verifyFileInDirWithStoragePolicy("ALL_SSD"); 459 } 460 461 final String INVALID_STORAGE_POLICY = "1772"; 462 463 /* should log a warning, but still work. (different warning on Hadoop < 2.6.0) */ 464 @Test 465 public void testSetStoragePolicyInvalid() throws Exception { 466 verifyFileInDirWithStoragePolicy(INVALID_STORAGE_POLICY); 467 } 468 469 // Here instead of TestCommonFSUtils because we need a minicluster 470 private void verifyFileInDirWithStoragePolicy(final String policy) throws Exception { 471 conf.set(HConstants.WAL_STORAGE_POLICY, policy); 472 473 MiniDFSCluster cluster = htu.startMiniDFSCluster(1); 474 try { 475 assertTrue(CommonFSUtils.isHDFS(conf)); 476 477 FileSystem fs = FileSystem.get(conf); 478 Path testDir = htu.getDataTestDirOnTestFS("testArchiveFile"); 479 fs.mkdirs(testDir); 480 481 String storagePolicy = 482 conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY); 483 CommonFSUtils.setStoragePolicy(fs, testDir, storagePolicy); 484 485 String file = HBaseTestingUtil.getRandomUUID().toString(); 486 Path p = new Path(testDir, file); 487 WriteDataToHDFS(fs, p, 4096); 488 HFileSystem hfs = new HFileSystem(fs); 489 String policySet = hfs.getStoragePolicyName(p); 490 LOG.debug("The storage policy of path " + p + " is " + policySet); 491 if ( 492 policy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY) 493 || policy.equals(INVALID_STORAGE_POLICY) 494 ) { 495 String hdfsDefaultPolicy = hfs.getStoragePolicyName(hfs.getHomeDirectory()); 496 LOG.debug("The default hdfs storage policy (indicated by home path: " 497 + hfs.getHomeDirectory() + ") is " + hdfsDefaultPolicy); 498 assertEquals(hdfsDefaultPolicy, policySet); 499 } else { 500 assertEquals(policy, policySet); 501 } 502 // will assert existence before deleting. 
503 cleanupFile(fs, testDir); 504 } finally { 505 cluster.shutdown(); 506 } 507 } 508 509 /** 510 * Ugly test that ensures we can get at the hedged read counters in dfsclient. Does a bit of 511 * preading with hedged reads enabled using code taken from hdfs TestPread. 512 */ 513 @Test 514 public void testDFSHedgedReadMetrics() throws Exception { 515 // Enable hedged reads and set it so the threshold is really low. 516 // Most of this test is taken from HDFS, from TestPread. 517 conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5); 518 conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 0); 519 conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096); 520 conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096); 521 // Set short retry timeouts so this test runs faster 522 conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 0); 523 conf.setBoolean("dfs.datanode.transferTo.allowed", false); 524 // disable metrics logger since it depend on commons-logging internal classes and we do not want 525 // commons-logging on our classpath 526 conf.setInt(DFSConfigKeys.DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_KEY, 0); 527 conf.setInt(DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY, 0); 528 MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); 529 // Get the metrics. Should be empty. 530 DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf); 531 assertEquals(0, metrics.getHedgedReadOps()); 532 FileSystem fileSys = cluster.getFileSystem(); 533 try { 534 Path p = new Path("preadtest.dat"); 535 // We need > 1 blocks to test out the hedged reads. 
536 DFSTestUtil.createFile(fileSys, p, 12 * blockSize, 12 * blockSize, blockSize, (short) 3, 537 seed); 538 pReadFile(fileSys, p); 539 cleanupFile(fileSys, p); 540 assertTrue(metrics.getHedgedReadOps() > 0); 541 } finally { 542 fileSys.close(); 543 cluster.shutdown(); 544 } 545 } 546 547 @Test 548 public void testCopyFilesParallel() throws Exception { 549 MiniDFSCluster cluster = htu.startMiniDFSCluster(1); 550 cluster.waitActive(); 551 FileSystem fs = cluster.getFileSystem(); 552 Path src = new Path("/src"); 553 fs.mkdirs(src); 554 for (int i = 0; i < 50; i++) { 555 WriteDataToHDFS(fs, new Path(src, String.valueOf(i)), 1024); 556 } 557 Path sub = new Path(src, "sub"); 558 fs.mkdirs(sub); 559 for (int i = 0; i < 50; i++) { 560 WriteDataToHDFS(fs, new Path(sub, String.valueOf(i)), 1024); 561 } 562 Path dst = new Path("/dst"); 563 List<Path> allFiles = FSUtils.copyFilesParallel(fs, src, fs, dst, conf, 4); 564 565 assertEquals(102, allFiles.size()); 566 FileStatus[] list = fs.listStatus(dst); 567 assertEquals(51, list.length); 568 FileStatus[] sublist = fs.listStatus(new Path(dst, "sub")); 569 assertEquals(50, sublist.length); 570 } 571 572 // Below is taken from TestPread over in HDFS. 573 static final int blockSize = 4096; 574 static final long seed = 0xDEADBEEFL; 575 private Random rand = new Random(); // This test depends on Random#setSeed 576 577 private void pReadFile(FileSystem fileSys, Path name) throws IOException { 578 FSDataInputStream stm = fileSys.open(name); 579 byte[] expected = new byte[12 * blockSize]; 580 rand.setSeed(seed); 581 rand.nextBytes(expected); 582 // do a sanity check. 
Read first 4K bytes 583 byte[] actual = new byte[4096]; 584 stm.readFully(actual); 585 checkAndEraseData(actual, 0, expected, "Read Sanity Test"); 586 // now do a pread for the first 8K bytes 587 actual = new byte[8192]; 588 doPread(stm, 0L, actual, 0, 8192); 589 checkAndEraseData(actual, 0, expected, "Pread Test 1"); 590 // Now check to see if the normal read returns 4K-8K byte range 591 actual = new byte[4096]; 592 stm.readFully(actual); 593 checkAndEraseData(actual, 4096, expected, "Pread Test 2"); 594 // Now see if we can cross a single block boundary successfully 595 // read 4K bytes from blockSize - 2K offset 596 stm.readFully(blockSize - 2048, actual, 0, 4096); 597 checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 3"); 598 // now see if we can cross two block boundaries successfully 599 // read blockSize + 4K bytes from blockSize - 2K offset 600 actual = new byte[blockSize + 4096]; 601 stm.readFully(blockSize - 2048, actual); 602 checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 4"); 603 // now see if we can cross two block boundaries that are not cached 604 // read blockSize + 4K bytes from 10*blockSize - 2K offset 605 actual = new byte[blockSize + 4096]; 606 stm.readFully(10 * blockSize - 2048, actual); 607 checkAndEraseData(actual, (10 * blockSize - 2048), expected, "Pread Test 5"); 608 // now check that even after all these preads, we can still read 609 // bytes 8K-12K 610 actual = new byte[4096]; 611 stm.readFully(actual); 612 checkAndEraseData(actual, 8192, expected, "Pread Test 6"); 613 // done 614 stm.close(); 615 // check block location caching 616 stm = fileSys.open(name); 617 stm.readFully(1, actual, 0, 4096); 618 stm.readFully(4 * blockSize, actual, 0, 4096); 619 stm.readFully(7 * blockSize, actual, 0, 4096); 620 actual = new byte[3 * 4096]; 621 stm.readFully(0 * blockSize, actual, 0, 3 * 4096); 622 checkAndEraseData(actual, 0, expected, "Pread Test 7"); 623 actual = new byte[8 * 4096]; 624 stm.readFully(3 * 
blockSize, actual, 0, 8 * 4096); 625 checkAndEraseData(actual, 3 * blockSize, expected, "Pread Test 8"); 626 // read the tail 627 stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize / 2); 628 IOException res = null; 629 try { // read beyond the end of the file 630 stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize); 631 } catch (IOException e) { 632 // should throw an exception 633 res = e; 634 } 635 assertTrue(res != null, "Error reading beyond file boundary."); 636 637 stm.close(); 638 } 639 640 private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) { 641 for (int idx = 0; idx < actual.length; idx++) { 642 assertEquals(expected[from + idx], actual[idx], message + " byte " + (from + idx) 643 + " differs. expected " + expected[from + idx] + " actual " + actual[idx]); 644 actual[idx] = 0; 645 } 646 } 647 648 private void doPread(FSDataInputStream stm, long position, byte[] buffer, int offset, int length) 649 throws IOException { 650 int nread = 0; 651 // long totalRead = 0; 652 // DFSInputStream dfstm = null; 653 654 /* 655 * Disable. This counts do not add up. Some issue in original hdfs tests? if 656 * (stm.getWrappedStream() instanceof DFSInputStream) { dfstm = (DFSInputStream) 657 * (stm.getWrappedStream()); totalRead = dfstm.getReadStatistics().getTotalBytesRead(); } 658 */ 659 660 while (nread < length) { 661 int nbytes = stm.read(position + nread, buffer, offset + nread, length - nread); 662 assertTrue(nbytes > 0, "Error in pread"); 663 nread += nbytes; 664 } 665 666 /* 667 * Disable. This counts do not add up. Some issue in original hdfs tests? 
if (dfstm != null) { 668 * if (isHedgedRead) { assertTrue("Expected read statistic to be incremented", length <= 669 * dfstm.getReadStatistics().getTotalBytesRead() - totalRead); } else { 670 * assertEquals("Expected read statistic to be incremented", length, dfstm 671 * .getReadStatistics().getTotalBytesRead() - totalRead); } } 672 */ 673 } 674 675 private void cleanupFile(FileSystem fileSys, Path name) throws IOException { 676 assertTrue(fileSys.exists(name)); 677 assertTrue(fileSys.delete(name, true)); 678 assertTrue(!fileSys.exists(name)); 679 } 680 681 static { 682 try { 683 Class.forName("org.apache.hadoop.fs.StreamCapabilities"); 684 LOG.debug("Test thought StreamCapabilities class was present."); 685 } catch (ClassNotFoundException exception) { 686 LOG.debug("Test didn't think StreamCapabilities class was present."); 687 } 688 } 689 690 // Here instead of TestCommonFSUtils because we need a minicluster 691 @Test 692 public void checkStreamCapabilitiesOnHdfsDataOutputStream() throws Exception { 693 MiniDFSCluster cluster = htu.startMiniDFSCluster(1); 694 try (FileSystem filesystem = cluster.getFileSystem()) { 695 FSDataOutputStream stream = filesystem.create(new Path("/tmp/foobar")); 696 assertTrue(stream.hasCapability(StreamCapabilities.HSYNC)); 697 assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH)); 698 assertFalse(stream.hasCapability("a capability that hopefully HDFS doesn't add.")); 699 } finally { 700 cluster.shutdown(); 701 } 702 } 703 704 private void testIsSameHdfs(int nnport) throws IOException { 705 Configuration conf = HBaseConfiguration.create(); 706 Path srcPath = new Path("hdfs://localhost:" + nnport + "/"); 707 Path desPath = new Path("hdfs://127.0.0.1/"); 708 FileSystem srcFs = srcPath.getFileSystem(conf); 709 FileSystem desFs = desPath.getFileSystem(conf); 710 711 assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs)); 712 713 desPath = new Path("hdfs://127.0.0.1:8070/"); 714 desFs = desPath.getFileSystem(conf); 715 
assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs)); 716 717 desPath = new Path("hdfs://127.0.1.1:" + nnport + "/"); 718 desFs = desPath.getFileSystem(conf); 719 assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs)); 720 721 conf.set("fs.defaultFS", "hdfs://haosong-hadoop"); 722 conf.set("dfs.nameservices", "haosong-hadoop"); 723 conf.set("dfs.ha.namenodes.haosong-hadoop", "nn1,nn2"); 724 conf.set("dfs.client.failover.proxy.provider.haosong-hadoop", 725 "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); 726 727 conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.0.0.1:" + nnport); 728 conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.10.2.1:8000"); 729 desPath = new Path("/"); 730 desFs = desPath.getFileSystem(conf); 731 assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs)); 732 733 conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.10.2.1:" + nnport); 734 conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.0.0.1:8000"); 735 desPath = new Path("/"); 736 desFs = desPath.getFileSystem(conf); 737 assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs)); 738 } 739 740 @Test 741 public void testIsSameHdfs() throws IOException { 742 String hadoopVersion = org.apache.hadoop.util.VersionInfo.getVersion(); 743 LOG.info("hadoop version is: " + hadoopVersion); 744 boolean isHadoop3_0_0 = hadoopVersion.startsWith("3.0.0"); 745 if (isHadoop3_0_0) { 746 // Hadoop 3.0.0 alpha1+ ~ 3.0.0 GA changed default nn port to 9820. 747 // See HDFS-9427 748 testIsSameHdfs(9820); 749 } else { 750 // pre hadoop 3.0.0 defaults to port 8020 751 // Hadoop 3.0.1 changed it back to port 8020. See HDFS-12990 752 testIsSameHdfs(8020); 753 } 754 } 755}