/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Test that FileLink switches between alternate locations
 * when the current location moves or gets deleted.
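 * <p>
 * A FileLink wraps an ordered list of candidate paths; reads are served from the
 * first location that exists and transparently fail over to the next one. An
 * illustrative sketch (the paths here are hypothetical, not part of this test):
 * <pre>
 * FileLink link = new FileLink(new Path("/data/current/f"), new Path("/data/archive/f"));
 * try (FSDataInputStream in = link.open(fs)) {
 *   in.read(buffer); // served from whichever location currently holds the file
 * }
 * </pre>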
 */
@Category({IOTests.class, MediumTests.class})
public class TestFileLink {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestFileLink.class);

  @Test
  public void testEquals() {
    Path p1 = new Path("/p1");
    Path p2 = new Path("/p2");
    Path p3 = new Path("/p3");

    assertEquals(new FileLink(), new FileLink());
    assertEquals(new FileLink(p1), new FileLink(p1));
    assertEquals(new FileLink(p1, p2), new FileLink(p1, p2));
    assertEquals(new FileLink(p1, p2, p3), new FileLink(p1, p2, p3));

    assertNotEquals(new FileLink(p1), new FileLink(p3));
    assertNotEquals(new FileLink(p1, p2), new FileLink(p1));
    assertNotEquals(new FileLink(p1, p2), new FileLink(p2));
    assertNotEquals(new FileLink(p1, p2), new FileLink(p2, p1)); // ordering important!
  }

  @Test
  public void testHashCode() {
    Path p1 = new Path("/p1");
    Path p2 = new Path("/p2");
    Path p3 = new Path("/p3");

    assertEquals(new FileLink().hashCode(), new FileLink().hashCode());
    assertEquals(new FileLink(p1).hashCode(), new FileLink(p1).hashCode());
    assertEquals(new FileLink(p1, p2).hashCode(), new FileLink(p1, p2).hashCode());
    assertEquals(new FileLink(p1, p2, p3).hashCode(), new FileLink(p1, p2, p3).hashCode());

    assertNotEquals(new FileLink(p1).hashCode(), new FileLink(p3).hashCode());
    assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p1).hashCode());
    assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p2).hashCode());
    assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p2, p1).hashCode()); // ordering
  }

  /**
   * Test, on HDFS, that the FileLink is still readable
   * even when the current file gets renamed.
   */
  @Test
  public void testHDFSLinkReadDuringRename() throws Exception {
    HBaseTestingUtility testUtil = new HBaseTestingUtility();
    Configuration conf = testUtil.getConfiguration();
    conf.setInt("dfs.blocksize", 1024 * 1024);
    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);

    testUtil.startMiniDFSCluster(1);
    MiniDFSCluster cluster = testUtil.getDFSCluster();
    FileSystem fs = cluster.getFileSystem();
    assertEquals("hdfs", fs.getUri().getScheme());

    try {
      testLinkReadDuringRename(fs, testUtil.getDefaultRootDirPath());
    } finally {
      testUtil.shutdownMiniCluster();
    }
  }

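  /**
   * Stub DistributedFileSystem whose open() always throws a RemoteException
   * wrapping FileNotFoundException, simulating a file that is missing from
   * every location so that FileLink.open() exhausts all alternatives.
   */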
  private static class MyDistributedFileSystem extends DistributedFileSystem {
    MyDistributedFileSystem() {
    }

    @Override
    public FSDataInputStream open(Path f, final int bufferSize) throws IOException {
      throw new RemoteException(FileNotFoundException.class.getName(), "");
    }

    @Override
    public Configuration getConf() {
      return new Configuration();
    }
  }

  @Test(expected = FileNotFoundException.class)
  public void testLinkReadWithMissingFile() throws Exception {
    HBaseTestingUtility testUtil = new HBaseTestingUtility();
    FileSystem fs = new MyDistributedFileSystem();

    Path originalPath = new Path(testUtil.getDefaultRootDirPath(), "test.file");
    Path archivedPath = new Path(testUtil.getDefaultRootDirPath(), "archived.file");

    List<Path> files = new ArrayList<>();
    files.add(originalPath);
    files.add(archivedPath);

    FileLink link = new FileLink(files);
    link.open(fs);
  }

  /**
   * Test, on a local filesystem, that the FileLink is still readable
   * even when the current file gets renamed.
   */
  @Test
  public void testLocalLinkReadDuringRename() throws IOException {
    HBaseTestingUtility testUtil = new HBaseTestingUtility();
    FileSystem fs = testUtil.getTestFileSystem();
    assertEquals("file", fs.getUri().getScheme());
    testLinkReadDuringRename(fs, testUtil.getDataTestDir());
  }

  /**
   * Test that the link is still readable even when the current file gets renamed.
   */
  private void testLinkReadDuringRename(FileSystem fs, Path rootDir) throws IOException {
    Path originalPath = new Path(rootDir, "test.file");
    Path archivedPath = new Path(rootDir, "archived.file");

    writeSomeData(fs, originalPath, 256 << 20, (byte)2);

    List<Path> files = new ArrayList<>();
    files.add(originalPath);
    files.add(archivedPath);

    FileLink link = new FileLink(files);
    FSDataInputStream in = link.open(fs);
    try {
      byte[] data = new byte[8192];
      long size = 0;

      // Read from the original location
      int n = in.read(data);
      dataVerify(data, n, (byte)2);
      size += n;

      if (FSUtils.WINDOWS) {
        // On Windows the local filesystem will not rename a file with an open handle
        in.close();
      }

      // Move the original to the archive location
      assertFalse(fs.exists(archivedPath));
      fs.rename(originalPath, archivedPath);
      assertFalse(fs.exists(originalPath));
      assertTrue(fs.exists(archivedPath));

      if (FSUtils.WINDOWS) {
        in = link.open(fs); // re-read from the beginning
        in.read(data);
      }

      // Try to read to the end
      while ((n = in.read(data)) > 0) {
        dataVerify(data, n, (byte)2);
        size += n;
      }

      assertEquals(256 << 20, size);
    } finally {
      in.close();
      if (fs.exists(originalPath)) {
        fs.delete(originalPath, true);
      }
      if (fs.exists(archivedPath)) {
        fs.delete(archivedPath, true);
      }
    }
  }

  /**
   * Test that the link is still readable even when the current file gets deleted.
   *
   * NOTE: This test is valid only on HDFS.
   * When a file is deleted from a local file-system, it is simply 'unlinked':
   * the inode, which contains the file's data, is not removed until every
   * process holding the file open has finished with it.
   * On HDFS, once a read goes beyond the cached block locations, the client
   * queries the namenode by file name, and for a deleted file that lookup
   * fails with a FileNotFoundException.
   */
  @Test
  public void testHDFSLinkReadDuringDelete() throws Exception {
    HBaseTestingUtility testUtil = new HBaseTestingUtility();
    Configuration conf = testUtil.getConfiguration();
    conf.setInt("dfs.blocksize", 1024 * 1024);
    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);

    testUtil.startMiniDFSCluster(1);
    MiniDFSCluster cluster = testUtil.getDFSCluster();
    FileSystem fs = cluster.getFileSystem();
    assertEquals("hdfs", fs.getUri().getScheme());

    try {
      List<Path> files = new ArrayList<>();
      for (int i = 0; i < 3; i++) {
        Path path = new Path(String.format("test-data-%d", i));
        writeSomeData(fs, path, 1 << 20, (byte)i);
        files.add(path);
      }

      FileLink link = new FileLink(files);
      FSDataInputStream in = link.open(fs);
      try {
        byte[] data = new byte[8192];
        int n;

        // Read from the first file, then delete it and drain the prefetched blocks
        n = in.read(data);
        dataVerify(data, n, (byte)0);
        fs.delete(files.get(0), true);
        skipBuffer(in, (byte)0);

        // The link should have switched to the second file
        n = in.read(data);
        dataVerify(data, n, (byte)1);
        fs.delete(files.get(1), true);
        skipBuffer(in, (byte)1);

        // The link should have switched to the third file
        n = in.read(data);
        dataVerify(data, n, (byte)2);
        fs.delete(files.get(2), true);
        skipBuffer(in, (byte)2);

        // No more locations are available: expect EOF or FileNotFoundException
        try {
          n = in.read(data);
          assertTrue(n <= 0);
        } catch (FileNotFoundException e) {
          // expected once every location has been deleted
        }
      } finally {
        in.close();
      }
    } finally {
      testUtil.shutdownMiniCluster();
    }
  }

  /**
   * Fill a new file at 'path' with byte value 'v'; at least 'size' bytes are
   * written (rounded up to a whole 4 KB buffer).
   */
  private void writeSomeData(FileSystem fs, Path path, long size, byte v) throws IOException {
    byte[] data = new byte[4096];
    for (int i = 0; i < data.length; i++) {
      data[i] = v;
    }

    FSDataOutputStream stream = fs.create(path);
    try {
      long written = 0;
      while (written < size) {
        stream.write(data, 0, data.length);
        written += data.length;
      }
    } finally {
      stream.close();
    }
  }

  /**
   * Verify that the first 'n' bytes of 'data' all have value 'v'.
   */
  private static void dataVerify(byte[] data, int n, byte v) {
    for (int i = 0; i < n; ++i) {
      assertEquals(v, data[i]);
    }
  }

  /**
   * Read ahead to the end of the current file, verifying that every byte has
   * value 'v'. An IOException partway through is expected here (the underlying
   * file may have been deleted) and is deliberately swallowed, so that the
   * caller's next read fails over to an alternate FileLink location.
   */
  private static void skipBuffer(FSDataInputStream in, byte v) throws IOException {
    byte[] data = new byte[8192];
    try {
      int n;
      while ((n = in.read(data)) == data.length) {
        for (int i = 0; i < data.length; ++i) {
          if (data[i] != v) {
            throw new Exception("File changed");
          }
        }
      }
    } catch (Exception e) {
      // expected when the current file no longer exists
    }
  }
}