/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * Test that FileLink switches between alternate locations when the current location moves or gets
 * deleted.
 */
@Tag(IOTests.TAG)
@Tag(MediumTests.TAG)
public class TestFileLink {

  @Test
  public void testEquals() {
    Path p1 = new Path("/p1");
    Path p2 = new Path("/p2");
    Path p3 = new Path("/p3");

    assertEquals(new FileLink(), new FileLink());
    assertEquals(new FileLink(p1), new FileLink(p1));
    assertEquals(new FileLink(p1, p2), new FileLink(p1, p2));
    assertEquals(new FileLink(p1, p2, p3), new FileLink(p1, p2, p3));

    assertNotEquals(new FileLink(p1), new FileLink(p3));
    assertNotEquals(new FileLink(p1, p2), new FileLink(p1));
    assertNotEquals(new FileLink(p1, p2), new FileLink(p2));
    assertNotEquals(new FileLink(p1, p2), new FileLink(p2, p1)); // ordering important!
  }

  @Test
  public void testHashCode() {
    Path p1 = new Path("/p1");
    Path p2 = new Path("/p2");
    Path p3 = new Path("/p3");

    assertEquals(new FileLink().hashCode(), new FileLink().hashCode());
    assertEquals(new FileLink(p1).hashCode(), new FileLink(p1).hashCode());
    assertEquals(new FileLink(p1, p2).hashCode(), new FileLink(p1, p2).hashCode());
    assertEquals(new FileLink(p1, p2, p3).hashCode(), new FileLink(p1, p2, p3).hashCode());

    assertNotEquals(new FileLink(p1).hashCode(), new FileLink(p3).hashCode());
    assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p1).hashCode());
    assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p2).hashCode());
    assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p2, p1).hashCode()); // ordering
  }

  /**
   * Test that the returned link from {@link FileLink#open(FileSystem)} can be unwrapped to a
   * {@link HdfsDataInputStream} by
   * {@link FileLink#getUnderlyingFileLinkInputStream(FSDataInputStream)}
   */
  @Test
  public void testGetUnderlyingFSDataInputStream() throws Exception {
    HBaseTestingUtil testUtil = new HBaseTestingUtil();
    Configuration conf = testUtil.getConfiguration();
    conf.setInt("dfs.blocksize", 1024 * 1024);
    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);

    testUtil.startMiniDFSCluster(1);
    try {
      MiniDFSCluster cluster = testUtil.getDFSCluster();
      FileSystem fs = cluster.getFileSystem();

      Path originalPath = new Path(testUtil.getDefaultRootDirPath(), "test.file");

      writeSomeData(fs, originalPath, 256 << 20, (byte) 2);

      List<Path> files = new ArrayList<>();
      files.add(originalPath);

      FileLink link = new FileLink(files);
      // try-with-resources: the original leaked the opened stream on assertion failure.
      try (FSDataInputStream stream = link.open(fs)) {
        FSDataInputStream underlying = FileLink.getUnderlyingFileLinkInputStream(stream);
        assertTrue(underlying instanceof HdfsDataInputStream);
      }
    } finally {
      testUtil.shutdownMiniCluster();
    }
  }

  /**
   * Test, on HDFS, that the FileLink is still readable even when the current file gets renamed.
   */
  @Test
  public void testHDFSLinkReadDuringRename() throws Exception {
    HBaseTestingUtil testUtil = new HBaseTestingUtil();
    Configuration conf = testUtil.getConfiguration();
    conf.setInt("dfs.blocksize", 1024 * 1024);
    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);

    testUtil.startMiniDFSCluster(1);
    try {
      // Inside the try so a failed assertion still shuts the mini cluster down.
      MiniDFSCluster cluster = testUtil.getDFSCluster();
      FileSystem fs = cluster.getFileSystem();
      assertEquals("hdfs", fs.getUri().getScheme());

      testLinkReadDuringRename(fs, testUtil.getDefaultRootDirPath());
    } finally {
      testUtil.shutdownMiniCluster();
    }
  }

  /**
   * A DistributedFileSystem stub whose open() always reports a (remote) FileNotFoundException,
   * simulating a link whose every location is missing.
   */
  private static class MyDistributedFileSystem extends DistributedFileSystem {
    MyDistributedFileSystem() {
    }

    @Override
    public FSDataInputStream open(Path f, final int bufferSize) throws IOException {
      throw new RemoteException(FileNotFoundException.class.getName(), "");
    }

    @Override
    public Configuration getConf() {
      return new Configuration();
    }
  }

  @Test
  public void testLinkReadWithMissingFile() throws Exception {
    HBaseTestingUtil testUtil = new HBaseTestingUtil();
    FileSystem fs = new MyDistributedFileSystem();

    Path originalPath = new Path(testUtil.getDefaultRootDirPath(), "test.file");
    Path archivedPath = new Path(testUtil.getDefaultRootDirPath(), "archived.file");

    List<Path> files = new ArrayList<>();
    files.add(originalPath);
    files.add(archivedPath);

    FileLink link = new FileLink(files);
    // The RemoteException thrown by the stub must be unwrapped to FileNotFoundException.
    assertThrows(FileNotFoundException.class, () -> {
      link.open(fs);
    });
  }

  /**
   * Test, on a local filesystem, that the FileLink is still readable even when the current file
   * gets renamed.
   */
  @Test
  public void testLocalLinkReadDuringRename() throws IOException {
    HBaseTestingUtil testUtil = new HBaseTestingUtil();
    FileSystem fs = testUtil.getTestFileSystem();
    assertEquals("file", fs.getUri().getScheme());
    testLinkReadDuringRename(fs, testUtil.getDataTestDir());
  }

  /**
   * Test that link is still readable even when the current file gets renamed.
   */
  private void testLinkReadDuringRename(FileSystem fs, Path rootDir) throws IOException {
    Path originalPath = new Path(rootDir, "test.file");
    Path archivedPath = new Path(rootDir, "archived.file");

    writeSomeData(fs, originalPath, 256 << 20, (byte) 2);

    List<Path> files = new ArrayList<>();
    files.add(originalPath);
    files.add(archivedPath);

    FileLink link = new FileLink(files);
    FSDataInputStream in = link.open(fs);
    try {
      byte[] data = new byte[8192];
      long size = 0;

      // Read from origin
      int n = in.read(data);
      dataVerify(data, n, (byte) 2);
      size += n;

      // On Windows an open file cannot be renamed, so close before the move.
      if (FSUtils.WINDOWS) {
        in.close();
      }

      // Move origin to archive
      assertFalse(fs.exists(archivedPath));
      fs.rename(originalPath, archivedPath);
      assertFalse(fs.exists(originalPath));
      assertTrue(fs.exists(archivedPath));

      if (FSUtils.WINDOWS) {
        in = link.open(fs); // re-read from beginning
        in.read(data);
      }

      // Try to read to the end
      while ((n = in.read(data)) > 0) {
        dataVerify(data, n, (byte) 2);
        size += n;
      }

      assertEquals(256 << 20, size);
    } finally {
      in.close();
      if (fs.exists(originalPath)) {
        fs.delete(originalPath, true);
      }
      if (fs.exists(archivedPath)) {
        fs.delete(archivedPath, true);
      }
    }
  }

  /**
   * Test that link is still readable even when the current file gets deleted. NOTE: This test is
   * valid only on HDFS. When a file is deleted from a local file-system, it is simply 'unlinked'.
   * The inode, which contains the file's data, is not deleted until all processes have finished
   * with it. In HDFS when the request exceed the cached block locations, a query to the namenode is
   * performed, using the filename, and the deleted file doesn't exists anymore
   * (FileNotFoundException).
   */
  @Test
  public void testHDFSLinkReadDuringDelete() throws Exception {
    HBaseTestingUtil testUtil = new HBaseTestingUtil();
    Configuration conf = testUtil.getConfiguration();
    conf.setInt("dfs.blocksize", 1024 * 1024);
    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);

    testUtil.startMiniDFSCluster(1);
    try {
      // Inside the try so a failed assertion still shuts the mini cluster down.
      MiniDFSCluster cluster = testUtil.getDFSCluster();
      FileSystem fs = cluster.getFileSystem();
      assertEquals("hdfs", fs.getUri().getScheme());

      List<Path> files = new ArrayList<>();
      for (int i = 0; i < 3; i++) {
        Path path = new Path(String.format("test-data-%d", i));
        writeSomeData(fs, path, 1 << 20, (byte) i);
        files.add(path);
      }

      FileLink link = new FileLink(files);
      FSDataInputStream in = link.open(fs);
      try {
        byte[] data = new byte[8192];
        int n;

        // Switch to file 1
        n = in.read(data);
        dataVerify(data, n, (byte) 0);
        fs.delete(files.get(0), true);
        skipBuffer(in, (byte) 0);

        // Switch to file 2
        n = in.read(data);
        dataVerify(data, n, (byte) 1);
        fs.delete(files.get(1), true);
        skipBuffer(in, (byte) 1);

        // Switch to file 3
        n = in.read(data);
        dataVerify(data, n, (byte) 2);
        fs.delete(files.get(2), true);
        skipBuffer(in, (byte) 2);

        // No more files available: expect EOF or FileNotFoundException.
        try {
          n = in.read(data);
          // assertTrue instead of the raw `assert` keyword: `assert` is a
          // no-op unless the JVM runs with -ea, so the check never fired.
          assertTrue(n <= 0, "expected EOF after all link locations were deleted");
        } catch (FileNotFoundException e) {
          // expected: every location of the link has been deleted
        }
      } finally {
        in.close();
      }
    } finally {
      testUtil.shutdownMiniCluster();
    }
  }

  /**
   * Write exactly 'size' bytes with value 'v' into a new file called 'path'. (The previous
   * implementation always wrote whole 4KB chunks and could overshoot 'size' when it was not a
   * multiple of 4096.)
   */
  private void writeSomeData(FileSystem fs, Path path, long size, byte v) throws IOException {
    byte[] data = new byte[4096];
    for (int i = 0; i < data.length; i++) {
      data[i] = v;
    }

    try (FSDataOutputStream stream = fs.create(path)) {
      long written = 0;
      while (written < size) {
        int len = (int) Math.min(data.length, size - written);
        stream.write(data, 0, len);
        written += len;
      }
    }
  }

  /**
   * Verify that all bytes in 'data' have 'v' as value.
   */
  private static void dataVerify(byte[] data, int n, byte v) {
    for (int i = 0; i < n; ++i) {
      assertEquals(v, data[i]);
    }
  }

  /**
   * Read the stream until EOF while verifying every byte equals 'v'. An IOException from read()
   * is tolerated on purpose: the caller may have deleted the current link location and only
   * cares that the FileLink keeps working on its next read. A data mismatch, however, must fail
   * the test — assertEquals throws AssertionError (an Error, not an Exception), so it escapes
   * the catch below. The original code threw and then silently swallowed its own "File changed"
   * Exception, making the verification a no-op.
   */
  private static void skipBuffer(FSDataInputStream in, byte v) throws IOException {
    byte[] data = new byte[8192];
    try {
      int n;
      while ((n = in.read(data)) == data.length) {
        for (int i = 0; i < n; ++i) {
          assertEquals(v, data[i], "file content changed while skipping");
        }
      }
    } catch (IOException e) {
      // Intentionally ignored: reading past a deleted location may fail here;
      // FileLink's switch to the next location is verified by the caller.
    }
  }
}