/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * Test that FileLink switches between alternate locations when the current location moves or gets
 * deleted.
 */
@Tag(IOTests.TAG)
@Tag(MediumTests.TAG)
public class TestFileLink {

  @Test
  public void testEquals() {
    Path p1 = new Path("/p1");
    Path p2 = new Path("/p2");
    Path p3 = new Path("/p3");

    assertEquals(new FileLink(), new FileLink());
    assertEquals(new FileLink(p1), new FileLink(p1));
    assertEquals(new FileLink(p1, p2), new FileLink(p1, p2));
    assertEquals(new FileLink(p1, p2, p3), new FileLink(p1, p2, p3));

    assertNotEquals(new FileLink(p1), new FileLink(p3));
    assertNotEquals(new FileLink(p1, p2), new FileLink(p1));
    assertNotEquals(new FileLink(p1, p2), new FileLink(p2));
    assertNotEquals(new FileLink(p1, p2), new FileLink(p2, p1)); // ordering important!
  }

  @Test
  public void testHashCode() {
    Path p1 = new Path("/p1");
    Path p2 = new Path("/p2");
    Path p3 = new Path("/p3");

    assertEquals(new FileLink().hashCode(), new FileLink().hashCode());
    assertEquals(new FileLink(p1).hashCode(), new FileLink(p1).hashCode());
    assertEquals(new FileLink(p1, p2).hashCode(), new FileLink(p1, p2).hashCode());
    assertEquals(new FileLink(p1, p2, p3).hashCode(), new FileLink(p1, p2, p3).hashCode());

    assertNotEquals(new FileLink(p1).hashCode(), new FileLink(p3).hashCode());
    assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p1).hashCode());
    assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p2).hashCode());
    assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p2, p1).hashCode()); // ordering
  }

  /**
   * Test that the stream returned from {@link FileLink#open(FileSystem)} can be unwrapped to a
   * {@link HdfsDataInputStream} by
   * {@link FileLink#getUnderlyingFileLinkInputStream(FSDataInputStream)}.
   */
  @Test
  public void testGetUnderlyingFSDataInputStream() throws Exception {
    HBaseTestingUtil testUtil = new HBaseTestingUtil();
    Configuration conf = testUtil.getConfiguration();
    conf.setInt("dfs.blocksize", 1024 * 1024);
    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);

    testUtil.startMiniDFSCluster(1);
    try {
      MiniDFSCluster cluster = testUtil.getDFSCluster();
      FileSystem fs = cluster.getFileSystem();

      Path originalPath = new Path(testUtil.getDefaultRootDirPath(), "test.file");

      writeSomeData(fs, originalPath, 256 << 20, (byte) 2);

      List<Path> files = new ArrayList<>();
      files.add(originalPath);

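      // Open the file through a FileLink with a single location and check that the stream it
      // returns can be unwrapped to the underlying HDFS input stream.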
      FileLink link = new FileLink(files);
      FSDataInputStream stream = link.open(fs);

      FSDataInputStream underlying = FileLink.getUnderlyingFileLinkInputStream(stream);
      assertTrue(underlying instanceof HdfsDataInputStream);
    } finally {
      testUtil.shutdownMiniCluster();
    }
  }

  /**
   * Test, on HDFS, that the FileLink is still readable even when the current file gets renamed.
   */
  @Test
  public void testHDFSLinkReadDuringRename() throws Exception {
    HBaseTestingUtil testUtil = new HBaseTestingUtil();
    Configuration conf = testUtil.getConfiguration();
    conf.setInt("dfs.blocksize", 1024 * 1024);
    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);

    testUtil.startMiniDFSCluster(1);
    MiniDFSCluster cluster = testUtil.getDFSCluster();
    FileSystem fs = cluster.getFileSystem();
    assertEquals("hdfs", fs.getUri().getScheme());

    try {
      testLinkReadDuringRename(fs, testUtil.getDefaultRootDirPath());
    } finally {
      testUtil.shutdownMiniCluster();
    }
  }

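  /**
   * A {@link DistributedFileSystem} stub whose {@code open()} always fails with a
   * {@link RemoteException} wrapping a {@link FileNotFoundException}, simulating a file that is
   * missing from every location.
   */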
  private static class MyDistributedFileSystem extends DistributedFileSystem {
    MyDistributedFileSystem() {
    }

    @Override
    public FSDataInputStream open(Path f, final int bufferSize) throws IOException {
      throw new RemoteException(FileNotFoundException.class.getName(), "");
    }

    @Override
    public Configuration getConf() {
      return new Configuration();
    }
  }

  @Test
  public void testLinkReadWithMissingFile() throws Exception {
    HBaseTestingUtil testUtil = new HBaseTestingUtil();
    FileSystem fs = new MyDistributedFileSystem();

    Path originalPath = new Path(testUtil.getDefaultRootDirPath(), "test.file");
    Path archivedPath = new Path(testUtil.getDefaultRootDirPath(), "archived.file");

    List<Path> files = new ArrayList<>();
    files.add(originalPath);
    files.add(archivedPath);

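    // Every location fails with a wrapped FileNotFoundException, so once all locations are
    // exhausted the link itself should surface a FileNotFoundException.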
    FileLink link = new FileLink(files);
    assertThrows(FileNotFoundException.class, () -> {
      link.open(fs);
    });
  }

  /**
   * Test, on a local filesystem, that the FileLink is still readable even when the current file
   * gets renamed.
   */
  @Test
  public void testLocalLinkReadDuringRename() throws IOException {
    HBaseTestingUtil testUtil = new HBaseTestingUtil();
    FileSystem fs = testUtil.getTestFileSystem();
    assertEquals("file", fs.getUri().getScheme());
    testLinkReadDuringRename(fs, testUtil.getDataTestDir());
  }

  /**
   * Test that the link is still readable even when the current file gets renamed.
   */
  private void testLinkReadDuringRename(FileSystem fs, Path rootDir) throws IOException {
    Path originalPath = new Path(rootDir, "test.file");
    Path archivedPath = new Path(rootDir, "archived.file");

    writeSomeData(fs, originalPath, 256 << 20, (byte) 2);

    List<Path> files = new ArrayList<>();
    files.add(originalPath);
    files.add(archivedPath);

    FileLink link = new FileLink(files);
    FSDataInputStream in = link.open(fs);
    try {
      byte[] data = new byte[8192];
      long size = 0;

      // Read from origin
      int n = in.read(data);
      dataVerify(data, n, (byte) 2);
      size += n;

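      // On Windows the local filesystem typically refuses to rename a file that still has an
      // open handle, so close the stream before the rename and reopen the link afterwards.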
      if (FSUtils.WINDOWS) {
        in.close();
      }

      // Move origin to archive
      assertFalse(fs.exists(archivedPath));
      fs.rename(originalPath, archivedPath);
      assertFalse(fs.exists(originalPath));
      assertTrue(fs.exists(archivedPath));

      if (FSUtils.WINDOWS) {
        in = link.open(fs); // re-read from beginning
        in.read(data);
      }

      // Try to read to the end
      while ((n = in.read(data)) > 0) {
        dataVerify(data, n, (byte) 2);
        size += n;
      }

      assertEquals(256 << 20, size);
    } finally {
      in.close();
      if (fs.exists(originalPath)) {
        fs.delete(originalPath, true);
      }
      if (fs.exists(archivedPath)) {
        fs.delete(archivedPath, true);
      }
    }
  }

  /**
   * Test that the link is still readable even when the current file gets deleted. NOTE: This test
   * is valid only on HDFS. When a file is deleted from a local file-system, it is simply
   * 'unlinked': the inode, which contains the file's data, is not removed until all processes are
   * finished with it. On HDFS, once a read goes past the cached block locations, a query to the
   * namenode is performed using the filename, and the deleted file no longer exists
   * (FileNotFoundException).
   */
  @Test
  public void testHDFSLinkReadDuringDelete() throws Exception {
    HBaseTestingUtil testUtil = new HBaseTestingUtil();
    Configuration conf = testUtil.getConfiguration();
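    // Shrink the block size and the client prefetch window so reads quickly run past the cached
    // block locations and have to go back to the namenode (see the NOTE above).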
    conf.setInt("dfs.blocksize", 1024 * 1024);
    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);

    testUtil.startMiniDFSCluster(1);
    MiniDFSCluster cluster = testUtil.getDFSCluster();
    FileSystem fs = cluster.getFileSystem();
    assertEquals("hdfs", fs.getUri().getScheme());

    try {
      List<Path> files = new ArrayList<>();
      for (int i = 0; i < 3; i++) {
        Path path = new Path(String.format("test-data-%d", i));
        writeSomeData(fs, path, 1 << 20, (byte) i);
        files.add(path);
      }

      FileLink link = new FileLink(files);
      FSDataInputStream in = link.open(fs);
      try {
        byte[] data = new byte[8192];
        int n;

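        // For each location in turn: read a chunk and verify its fill byte, delete the file
        // underneath the open stream, then drain the rest of it so the next read is forced to
        // switch to the next location.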
        // Read from the first file (fill byte 0)
        n = in.read(data);
        dataVerify(data, n, (byte) 0);
        fs.delete(files.get(0), true);
        skipBuffer(in, (byte) 0);

        // Switch to the second file (fill byte 1)
        n = in.read(data);
        dataVerify(data, n, (byte) 1);
        fs.delete(files.get(1), true);
        skipBuffer(in, (byte) 1);

        // Switch to the third file (fill byte 2)
        n = in.read(data);
        dataVerify(data, n, (byte) 2);
        fs.delete(files.get(2), true);
        skipBuffer(in, (byte) 2);

        // No more files available
        try {
          n = in.read(data);
          assertTrue(n <= 0);
        } catch (FileNotFoundException e) {
          // expected: every location has been deleted
        }
      } finally {
        in.close();
      }
    } finally {
      testUtil.shutdownMiniCluster();
    }
  }

  /**
   * Write 'size' bytes (rounded up to the 4 KB write-buffer size) with value 'v' into a new file
   * called 'path'.
   */
  private void writeSomeData(FileSystem fs, Path path, long size, byte v) throws IOException {
    byte[] data = new byte[4096];
    for (int i = 0; i < data.length; i++) {
      data[i] = v;
    }

    FSDataOutputStream stream = fs.create(path);
    try {
      long written = 0;
      while (written < size) {
        stream.write(data, 0, data.length);
        written += data.length;
      }
    } finally {
      stream.close();
    }
  }

  /**
   * Verify that the first 'n' bytes of 'data' have value 'v'.
   */
  private static void dataVerify(byte[] data, int n, byte v) {
    for (int i = 0; i < n; ++i) {
      assertEquals(v, data[i]);
    }
  }

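  /**
   * Drain whatever is still readable from the current location, checking that every byte has
   * value 'v'; read failures are swallowed since the underlying file may already be deleted.
   */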
  private static void skipBuffer(FSDataInputStream in, byte v) throws IOException {
    byte[] data = new byte[8192];
    try {
      int n;
      while ((n = in.read(data)) == data.length) {
        for (int i = 0; i < data.length; ++i) {
          if (data[i] != v) {
            throw new Exception("File changed");
          }
        }
      }
    } catch (Exception e) {
      // ignore: the current file may already have been deleted underneath us
    }
  }
}