001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertTrue; 024 025import java.io.FileNotFoundException; 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.List; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FSDataInputStream; 031import org.apache.hadoop.fs.FSDataOutputStream; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseTestingUtility; 036import org.apache.hadoop.hbase.testclassification.IOTests; 037import org.apache.hadoop.hbase.testclassification.MediumTests; 038import org.apache.hadoop.hbase.util.FSUtils; 039import org.apache.hadoop.hdfs.DistributedFileSystem; 040import org.apache.hadoop.hdfs.MiniDFSCluster; 041import org.apache.hadoop.ipc.RemoteException; 042import org.junit.ClassRule; 043import org.junit.Test; 044import org.junit.experimental.categories.Category; 045 046/** 047 * Test that FileLink switches between alternate locations 048 * when the current location moves or gets deleted. 049 */ 050@Category({IOTests.class, MediumTests.class}) 051public class TestFileLink { 052 053 @ClassRule 054 public static final HBaseClassTestRule CLASS_RULE = 055 HBaseClassTestRule.forClass(TestFileLink.class); 056 057 @Test 058 public void testEquals() { 059 Path p1 = new Path("/p1"); 060 Path p2 = new Path("/p2"); 061 Path p3 = new Path("/p3"); 062 063 assertEquals(new FileLink(), new FileLink()); 064 assertEquals(new FileLink(p1), new FileLink(p1)); 065 assertEquals(new FileLink(p1, p2), new FileLink(p1, p2)); 066 assertEquals(new FileLink(p1, p2, p3), new FileLink(p1, p2, p3)); 067 068 assertNotEquals(new FileLink(p1), new FileLink(p3)); 069 assertNotEquals(new FileLink(p1, p2), new FileLink(p1)); 070 assertNotEquals(new FileLink(p1, p2), new FileLink(p2)); 071 assertNotEquals(new FileLink(p1, p2), new FileLink(p2, p1)); // ordering important! 072 } 073 074 @Test 075 public void testHashCode() { 076 Path p1 = new Path("/p1"); 077 Path p2 = new Path("/p2"); 078 Path p3 = new Path("/p3"); 079 080 assertEquals(new FileLink().hashCode(), new FileLink().hashCode()); 081 assertEquals(new FileLink(p1).hashCode(), new FileLink(p1).hashCode()); 082 assertEquals(new FileLink(p1, p2).hashCode(), new FileLink(p1, p2).hashCode()); 083 assertEquals(new FileLink(p1, p2, p3).hashCode(), new FileLink(p1, p2, p3).hashCode()); 084 085 assertNotEquals(new FileLink(p1).hashCode(), new FileLink(p3).hashCode()); 086 assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p1).hashCode()); 087 assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p2).hashCode()); 088 assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p2, p1).hashCode()); // ordering 089 } 090 091 /** 092 * Test, on HDFS, that the FileLink is still readable 093 * even when the current file gets renamed. 094 */ 095 @Test 096 public void testHDFSLinkReadDuringRename() throws Exception { 097 HBaseTestingUtility testUtil = new HBaseTestingUtility(); 098 Configuration conf = testUtil.getConfiguration(); 099 conf.setInt("dfs.blocksize", 1024 * 1024); 100 conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024); 101 102 testUtil.startMiniDFSCluster(1); 103 MiniDFSCluster cluster = testUtil.getDFSCluster(); 104 FileSystem fs = cluster.getFileSystem(); 105 assertEquals("hdfs", fs.getUri().getScheme()); 106 107 try { 108 testLinkReadDuringRename(fs, testUtil.getDefaultRootDirPath()); 109 } finally { 110 testUtil.shutdownMiniCluster(); 111 } 112 } 113 114 private static class MyDistributedFileSystem extends DistributedFileSystem { 115 MyDistributedFileSystem() { 116 } 117 @Override 118 public FSDataInputStream open(Path f, final int bufferSize) 119 throws IOException { 120 throw new RemoteException(FileNotFoundException.class.getName(), ""); 121 } 122 @Override 123 public Configuration getConf() { 124 return new Configuration(); 125 } 126 } 127 @Test(expected = FileNotFoundException.class) 128 public void testLinkReadWithMissingFile() throws Exception { 129 HBaseTestingUtility testUtil = new HBaseTestingUtility(); 130 FileSystem fs = new MyDistributedFileSystem(); 131 132 Path originalPath = new Path(testUtil.getDefaultRootDirPath(), "test.file"); 133 Path archivedPath = new Path(testUtil.getDefaultRootDirPath(), "archived.file"); 134 135 List<Path> files = new ArrayList<Path>(); 136 files.add(originalPath); 137 files.add(archivedPath); 138 139 FileLink link = new FileLink(files); 140 link.open(fs); 141 } 142 143 /** 144 * Test, on a local filesystem, that the FileLink is still readable 145 * even when the current file gets renamed. 146 */ 147 @Test 148 public void testLocalLinkReadDuringRename() throws IOException { 149 HBaseTestingUtility testUtil = new HBaseTestingUtility(); 150 FileSystem fs = testUtil.getTestFileSystem(); 151 assertEquals("file", fs.getUri().getScheme()); 152 testLinkReadDuringRename(fs, testUtil.getDataTestDir()); 153 } 154 155 /** 156 * Test that link is still readable even when the current file gets renamed. 157 */ 158 private void testLinkReadDuringRename(FileSystem fs, Path rootDir) throws IOException { 159 Path originalPath = new Path(rootDir, "test.file"); 160 Path archivedPath = new Path(rootDir, "archived.file"); 161 162 writeSomeData(fs, originalPath, 256 << 20, (byte)2); 163 164 List<Path> files = new ArrayList<>(); 165 files.add(originalPath); 166 files.add(archivedPath); 167 168 FileLink link = new FileLink(files); 169 FSDataInputStream in = link.open(fs); 170 try { 171 byte[] data = new byte[8192]; 172 long size = 0; 173 174 // Read from origin 175 int n = in.read(data); 176 dataVerify(data, n, (byte)2); 177 size += n; 178 179 if (FSUtils.WINDOWS) { 180 in.close(); 181 } 182 183 // Move origin to archive 184 assertFalse(fs.exists(archivedPath)); 185 fs.rename(originalPath, archivedPath); 186 assertFalse(fs.exists(originalPath)); 187 assertTrue(fs.exists(archivedPath)); 188 189 if (FSUtils.WINDOWS) { 190 in = link.open(fs); // re-read from beginning 191 in.read(data); 192 } 193 194 // Try to read to the end 195 while ((n = in.read(data)) > 0) { 196 dataVerify(data, n, (byte)2); 197 size += n; 198 } 199 200 assertEquals(256 << 20, size); 201 } finally { 202 in.close(); 203 if (fs.exists(originalPath)) fs.delete(originalPath, true); 204 if (fs.exists(archivedPath)) fs.delete(archivedPath, true); 205 } 206 } 207 208 /** 209 * Test that link is still readable even when the current file gets deleted. 210 * 211 * NOTE: This test is valid only on HDFS. 212 * When a file is deleted from a local file-system, it is simply 'unlinked'. 213 * The inode, which contains the file's data, is not deleted until all 214 * processes have finished with it. 215 * In HDFS when the request exceed the cached block locations, 216 * a query to the namenode is performed, using the filename, 217 * and the deleted file doesn't exists anymore (FileNotFoundException). 218 */ 219 @Test 220 public void testHDFSLinkReadDuringDelete() throws Exception { 221 HBaseTestingUtility testUtil = new HBaseTestingUtility(); 222 Configuration conf = testUtil.getConfiguration(); 223 conf.setInt("dfs.blocksize", 1024 * 1024); 224 conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024); 225 226 testUtil.startMiniDFSCluster(1); 227 MiniDFSCluster cluster = testUtil.getDFSCluster(); 228 FileSystem fs = cluster.getFileSystem(); 229 assertEquals("hdfs", fs.getUri().getScheme()); 230 231 try { 232 List<Path> files = new ArrayList<>(); 233 for (int i = 0; i < 3; i++) { 234 Path path = new Path(String.format("test-data-%d", i)); 235 writeSomeData(fs, path, 1 << 20, (byte)i); 236 files.add(path); 237 } 238 239 FileLink link = new FileLink(files); 240 FSDataInputStream in = link.open(fs); 241 try { 242 byte[] data = new byte[8192]; 243 int n; 244 245 // Switch to file 1 246 n = in.read(data); 247 dataVerify(data, n, (byte)0); 248 fs.delete(files.get(0), true); 249 skipBuffer(in, (byte)0); 250 251 // Switch to file 2 252 n = in.read(data); 253 dataVerify(data, n, (byte)1); 254 fs.delete(files.get(1), true); 255 skipBuffer(in, (byte)1); 256 257 // Switch to file 3 258 n = in.read(data); 259 dataVerify(data, n, (byte)2); 260 fs.delete(files.get(2), true); 261 skipBuffer(in, (byte)2); 262 263 // No more files available 264 try { 265 n = in.read(data); 266 assert(n <= 0); 267 } catch (FileNotFoundException e) { 268 assertTrue(true); 269 } 270 } finally { 271 in.close(); 272 } 273 } finally { 274 testUtil.shutdownMiniCluster(); 275 } 276 } 277 278 /** 279 * Write up to 'size' bytes with value 'v' into a new file called 'path'. 280 */ 281 private void writeSomeData (FileSystem fs, Path path, long size, byte v) throws IOException { 282 byte[] data = new byte[4096]; 283 for (int i = 0; i < data.length; i++) { 284 data[i] = v; 285 } 286 287 FSDataOutputStream stream = fs.create(path); 288 try { 289 long written = 0; 290 while (written < size) { 291 stream.write(data, 0, data.length); 292 written += data.length; 293 } 294 } finally { 295 stream.close(); 296 } 297 } 298 299 /** 300 * Verify that all bytes in 'data' have 'v' as value. 301 */ 302 private static void dataVerify(byte[] data, int n, byte v) { 303 for (int i = 0; i < n; ++i) { 304 assertEquals(v, data[i]); 305 } 306 } 307 308 private static void skipBuffer(FSDataInputStream in, byte v) throws IOException { 309 byte[] data = new byte[8192]; 310 try { 311 int n; 312 while ((n = in.read(data)) == data.length) { 313 for (int i = 0; i < data.length; ++i) { 314 if (data[i] != v) 315 throw new Exception("File changed"); 316 } 317 } 318 } catch (Exception e) { 319 } 320 } 321}