001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertTrue; 024 025import java.io.FileNotFoundException; 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.List; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FSDataInputStream; 031import org.apache.hadoop.fs.FSDataOutputStream; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseTestingUtility; 036import org.apache.hadoop.hbase.testclassification.IOTests; 037import org.apache.hadoop.hbase.testclassification.MediumTests; 038import org.apache.hadoop.hbase.util.FSUtils; 039import org.apache.hadoop.hdfs.DistributedFileSystem; 040import org.apache.hadoop.hdfs.MiniDFSCluster; 041import org.apache.hadoop.hdfs.client.HdfsDataInputStream; 042import org.apache.hadoop.ipc.RemoteException; 043import org.junit.ClassRule; 044import org.junit.Test; 045import org.junit.experimental.categories.Category; 046 047/** 048 * Test that FileLink switches between alternate locations when the current location moves or gets 049 * deleted. 050 */ 051@Category({ IOTests.class, MediumTests.class }) 052public class TestFileLink { 053 054 @ClassRule 055 public static final HBaseClassTestRule CLASS_RULE = 056 HBaseClassTestRule.forClass(TestFileLink.class); 057 058 @Test 059 public void testEquals() { 060 Path p1 = new Path("/p1"); 061 Path p2 = new Path("/p2"); 062 Path p3 = new Path("/p3"); 063 064 assertEquals(new FileLink(), new FileLink()); 065 assertEquals(new FileLink(p1), new FileLink(p1)); 066 assertEquals(new FileLink(p1, p2), new FileLink(p1, p2)); 067 assertEquals(new FileLink(p1, p2, p3), new FileLink(p1, p2, p3)); 068 069 assertNotEquals(new FileLink(p1), new FileLink(p3)); 070 assertNotEquals(new FileLink(p1, p2), new FileLink(p1)); 071 assertNotEquals(new FileLink(p1, p2), new FileLink(p2)); 072 assertNotEquals(new FileLink(p1, p2), new FileLink(p2, p1)); // ordering important! 073 } 074 075 @Test 076 public void testHashCode() { 077 Path p1 = new Path("/p1"); 078 Path p2 = new Path("/p2"); 079 Path p3 = new Path("/p3"); 080 081 assertEquals(new FileLink().hashCode(), new FileLink().hashCode()); 082 assertEquals(new FileLink(p1).hashCode(), new FileLink(p1).hashCode()); 083 assertEquals(new FileLink(p1, p2).hashCode(), new FileLink(p1, p2).hashCode()); 084 assertEquals(new FileLink(p1, p2, p3).hashCode(), new FileLink(p1, p2, p3).hashCode()); 085 086 assertNotEquals(new FileLink(p1).hashCode(), new FileLink(p3).hashCode()); 087 assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p1).hashCode()); 088 assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p2).hashCode()); 089 assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p2, p1).hashCode()); // ordering 090 } 091 092 /** 093 * Test that the returned link from {@link FileLink#open(FileSystem)} can be unwrapped to a 094 * {@link HdfsDataInputStream} by 095 * {@link FileLink#getUnderlyingFileLinkInputStream(FSDataInputStream)} 096 */ 097 @Test 098 public void testGetUnderlyingFSDataInputStream() throws Exception { 099 HBaseTestingUtility testUtil = new HBaseTestingUtility(); 100 Configuration conf = testUtil.getConfiguration(); 101 conf.setInt("dfs.blocksize", 1024 * 1024); 102 conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024); 103 104 testUtil.startMiniDFSCluster(1); 105 try { 106 MiniDFSCluster cluster = testUtil.getDFSCluster(); 107 FileSystem fs = cluster.getFileSystem(); 108 109 Path originalPath = new Path(testUtil.getDefaultRootDirPath(), "test.file"); 110 111 writeSomeData(fs, originalPath, 256 << 20, (byte) 2); 112 113 List<Path> files = new ArrayList<Path>(); 114 files.add(originalPath); 115 116 FileLink link = new FileLink(files); 117 FSDataInputStream stream = link.open(fs); 118 119 FSDataInputStream underlying = FileLink.getUnderlyingFileLinkInputStream(stream); 120 assertTrue(underlying instanceof HdfsDataInputStream); 121 } finally { 122 testUtil.shutdownMiniCluster(); 123 } 124 } 125 126 /** 127 * Test, on HDFS, that the FileLink is still readable even when the current file gets renamed. 128 */ 129 @Test 130 public void testHDFSLinkReadDuringRename() throws Exception { 131 HBaseTestingUtility testUtil = new HBaseTestingUtility(); 132 Configuration conf = testUtil.getConfiguration(); 133 conf.setInt("dfs.blocksize", 1024 * 1024); 134 conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024); 135 136 testUtil.startMiniDFSCluster(1); 137 MiniDFSCluster cluster = testUtil.getDFSCluster(); 138 FileSystem fs = cluster.getFileSystem(); 139 assertEquals("hdfs", fs.getUri().getScheme()); 140 141 try { 142 testLinkReadDuringRename(fs, testUtil.getDefaultRootDirPath()); 143 } finally { 144 testUtil.shutdownMiniCluster(); 145 } 146 } 147 148 private static class MyDistributedFileSystem extends DistributedFileSystem { 149 MyDistributedFileSystem() { 150 } 151 152 @Override 153 public FSDataInputStream open(Path f, final int bufferSize) throws IOException { 154 throw new RemoteException(FileNotFoundException.class.getName(), ""); 155 } 156 157 @Override 158 public Configuration getConf() { 159 return new Configuration(); 160 } 161 } 162 163 @Test(expected = FileNotFoundException.class) 164 public void testLinkReadWithMissingFile() throws Exception { 165 HBaseTestingUtility testUtil = new HBaseTestingUtility(); 166 FileSystem fs = new MyDistributedFileSystem(); 167 168 Path originalPath = new Path(testUtil.getDefaultRootDirPath(), "test.file"); 169 Path archivedPath = new Path(testUtil.getDefaultRootDirPath(), "archived.file"); 170 171 List<Path> files = new ArrayList<Path>(); 172 files.add(originalPath); 173 files.add(archivedPath); 174 175 FileLink link = new FileLink(files); 176 link.open(fs); 177 } 178 179 /** 180 * Test, on a local filesystem, that the FileLink is still readable even when the current file 181 * gets renamed. 182 */ 183 @Test 184 public void testLocalLinkReadDuringRename() throws IOException { 185 HBaseTestingUtility testUtil = new HBaseTestingUtility(); 186 FileSystem fs = testUtil.getTestFileSystem(); 187 assertEquals("file", fs.getUri().getScheme()); 188 testLinkReadDuringRename(fs, testUtil.getDataTestDir()); 189 } 190 191 /** 192 * Test that link is still readable even when the current file gets renamed. 193 */ 194 private void testLinkReadDuringRename(FileSystem fs, Path rootDir) throws IOException { 195 Path originalPath = new Path(rootDir, "test.file"); 196 Path archivedPath = new Path(rootDir, "archived.file"); 197 198 writeSomeData(fs, originalPath, 256 << 20, (byte) 2); 199 200 List<Path> files = new ArrayList<>(); 201 files.add(originalPath); 202 files.add(archivedPath); 203 204 FileLink link = new FileLink(files); 205 FSDataInputStream in = link.open(fs); 206 try { 207 byte[] data = new byte[8192]; 208 long size = 0; 209 210 // Read from origin 211 int n = in.read(data); 212 dataVerify(data, n, (byte) 2); 213 size += n; 214 215 if (FSUtils.WINDOWS) { 216 in.close(); 217 } 218 219 // Move origin to archive 220 assertFalse(fs.exists(archivedPath)); 221 fs.rename(originalPath, archivedPath); 222 assertFalse(fs.exists(originalPath)); 223 assertTrue(fs.exists(archivedPath)); 224 225 if (FSUtils.WINDOWS) { 226 in = link.open(fs); // re-read from beginning 227 in.read(data); 228 } 229 230 // Try to read to the end 231 while ((n = in.read(data)) > 0) { 232 dataVerify(data, n, (byte) 2); 233 size += n; 234 } 235 236 assertEquals(256 << 20, size); 237 } finally { 238 in.close(); 239 if (fs.exists(originalPath)) fs.delete(originalPath, true); 240 if (fs.exists(archivedPath)) fs.delete(archivedPath, true); 241 } 242 } 243 244 /** 245 * Test that link is still readable even when the current file gets deleted. NOTE: This test is 246 * valid only on HDFS. When a file is deleted from a local file-system, it is simply 'unlinked'. 247 * The inode, which contains the file's data, is not deleted until all processes have finished 248 * with it. In HDFS when the request exceed the cached block locations, a query to the namenode is 249 * performed, using the filename, and the deleted file doesn't exists anymore 250 * (FileNotFoundException). 251 */ 252 @Test 253 public void testHDFSLinkReadDuringDelete() throws Exception { 254 HBaseTestingUtility testUtil = new HBaseTestingUtility(); 255 Configuration conf = testUtil.getConfiguration(); 256 conf.setInt("dfs.blocksize", 1024 * 1024); 257 conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024); 258 259 testUtil.startMiniDFSCluster(1); 260 MiniDFSCluster cluster = testUtil.getDFSCluster(); 261 FileSystem fs = cluster.getFileSystem(); 262 assertEquals("hdfs", fs.getUri().getScheme()); 263 264 try { 265 List<Path> files = new ArrayList<>(); 266 for (int i = 0; i < 3; i++) { 267 Path path = new Path(String.format("test-data-%d", i)); 268 writeSomeData(fs, path, 1 << 20, (byte) i); 269 files.add(path); 270 } 271 272 FileLink link = new FileLink(files); 273 FSDataInputStream in = link.open(fs); 274 try { 275 byte[] data = new byte[8192]; 276 int n; 277 278 // Switch to file 1 279 n = in.read(data); 280 dataVerify(data, n, (byte) 0); 281 fs.delete(files.get(0), true); 282 skipBuffer(in, (byte) 0); 283 284 // Switch to file 2 285 n = in.read(data); 286 dataVerify(data, n, (byte) 1); 287 fs.delete(files.get(1), true); 288 skipBuffer(in, (byte) 1); 289 290 // Switch to file 3 291 n = in.read(data); 292 dataVerify(data, n, (byte) 2); 293 fs.delete(files.get(2), true); 294 skipBuffer(in, (byte) 2); 295 296 // No more files available 297 try { 298 n = in.read(data); 299 assert (n <= 0); 300 } catch (FileNotFoundException e) { 301 assertTrue(true); 302 } 303 } finally { 304 in.close(); 305 } 306 } finally { 307 testUtil.shutdownMiniCluster(); 308 } 309 } 310 311 /** 312 * Write up to 'size' bytes with value 'v' into a new file called 'path'. 313 */ 314 private void writeSomeData(FileSystem fs, Path path, long size, byte v) throws IOException { 315 byte[] data = new byte[4096]; 316 for (int i = 0; i < data.length; i++) { 317 data[i] = v; 318 } 319 320 FSDataOutputStream stream = fs.create(path); 321 try { 322 long written = 0; 323 while (written < size) { 324 stream.write(data, 0, data.length); 325 written += data.length; 326 } 327 } finally { 328 stream.close(); 329 } 330 } 331 332 /** 333 * Verify that all bytes in 'data' have 'v' as value. 334 */ 335 private static void dataVerify(byte[] data, int n, byte v) { 336 for (int i = 0; i < n; ++i) { 337 assertEquals(v, data[i]); 338 } 339 } 340 341 private static void skipBuffer(FSDataInputStream in, byte v) throws IOException { 342 byte[] data = new byte[8192]; 343 try { 344 int n; 345 while ((n = in.read(data)) == data.length) { 346 for (int i = 0; i < data.length; ++i) { 347 if (data[i] != v) throw new Exception("File changed"); 348 } 349 } 350 } catch (Exception e) { 351 } 352 } 353}