/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Test that FileLink switches between alternate locations when the current location moves or gets
 * deleted.
 */
@Category({ IOTests.class, MediumTests.class })
public class TestFileLink {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestFileLink.class);

  @Test
  public void testEquals() {
    Path p1 = new Path("/p1");
    Path p2 = new Path("/p2");
    Path p3 = new Path("/p3");

    assertEquals(new FileLink(), new FileLink());
    assertEquals(new FileLink(p1), new FileLink(p1));
    assertEquals(new FileLink(p1, p2), new FileLink(p1, p2));
    assertEquals(new FileLink(p1, p2, p3), new FileLink(p1, p2, p3));

    assertNotEquals(new FileLink(p1), new FileLink(p3));
    assertNotEquals(new FileLink(p1, p2), new FileLink(p1));
    assertNotEquals(new FileLink(p1, p2), new FileLink(p2));
    assertNotEquals(new FileLink(p1, p2), new FileLink(p2, p1)); // ordering important!
  }

  @Test
  public void testHashCode() {
    Path p1 = new Path("/p1");
    Path p2 = new Path("/p2");
    Path p3 = new Path("/p3");

    assertEquals(new FileLink().hashCode(), new FileLink().hashCode());
    assertEquals(new FileLink(p1).hashCode(), new FileLink(p1).hashCode());
    assertEquals(new FileLink(p1, p2).hashCode(), new FileLink(p1, p2).hashCode());
    assertEquals(new FileLink(p1, p2, p3).hashCode(), new FileLink(p1, p2, p3).hashCode());

    assertNotEquals(new FileLink(p1).hashCode(), new FileLink(p3).hashCode());
    assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p1).hashCode());
    assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p2).hashCode());
    assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p2, p1).hashCode()); // ordering
  }

  /**
   * Test that the stream returned by {@link FileLink#open(FileSystem)} can be unwrapped to the
   * underlying {@link HdfsDataInputStream} by
   * {@link FileLink#getUnderlyingFileLinkInputStream(FSDataInputStream)}.
   */
  @Test
  public void testGetUnderlyingFSDataInputStream() throws Exception {
    HBaseTestingUtil testUtil = new HBaseTestingUtil();
    Configuration conf = testUtil.getConfiguration();
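    // Tune the mini cluster: 1 MB blocks, with block locations prefetched 2 MB at a time.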
    conf.setInt("dfs.blocksize", 1024 * 1024);
    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);

    testUtil.startMiniDFSCluster(1);
    try {
      MiniDFSCluster cluster = testUtil.getDFSCluster();
      FileSystem fs = cluster.getFileSystem();

      Path originalPath = new Path(testUtil.getDefaultRootDirPath(), "test.file");

      writeSomeData(fs, originalPath, 256 << 20, (byte) 2);

      List<Path> files = new ArrayList<>();
      files.add(originalPath);

      FileLink link = new FileLink(files);
      FSDataInputStream stream = link.open(fs);

      FSDataInputStream underlying = FileLink.getUnderlyingFileLinkInputStream(stream);
      assertTrue(underlying instanceof HdfsDataInputStream);
    } finally {
      testUtil.shutdownMiniCluster();
    }
  }

  /**
   * Test, on HDFS, that the FileLink is still readable even when the current file gets renamed.
   */
  @Test
  public void testHDFSLinkReadDuringRename() throws Exception {
    HBaseTestingUtil testUtil = new HBaseTestingUtil();
    Configuration conf = testUtil.getConfiguration();
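    // Tune the mini cluster: 1 MB blocks, with block locations prefetched 2 MB at a time.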
    conf.setInt("dfs.blocksize", 1024 * 1024);
    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);

    testUtil.startMiniDFSCluster(1);
    MiniDFSCluster cluster = testUtil.getDFSCluster();
    FileSystem fs = cluster.getFileSystem();
    assertEquals("hdfs", fs.getUri().getScheme());

    try {
      testLinkReadDuringRename(fs, testUtil.getDefaultRootDirPath());
    } finally {
      testUtil.shutdownMiniCluster();
    }
  }

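  /**
   * A stub filesystem whose open() always fails with a RemoteException wrapping
   * FileNotFoundException, simulating a file that is missing from every location.
   */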
  private static class MyDistributedFileSystem extends DistributedFileSystem {
    MyDistributedFileSystem() {
    }

    @Override
    public FSDataInputStream open(Path f, final int bufferSize) throws IOException {
      throw new RemoteException(FileNotFoundException.class.getName(), "");
    }

    @Override
    public Configuration getConf() {
      return new Configuration();
    }
  }

  @Test(expected = FileNotFoundException.class)
  public void testLinkReadWithMissingFile() throws Exception {
    HBaseTestingUtil testUtil = new HBaseTestingUtil();
    FileSystem fs = new MyDistributedFileSystem();

    Path originalPath = new Path(testUtil.getDefaultRootDirPath(), "test.file");
    Path archivedPath = new Path(testUtil.getDefaultRootDirPath(), "archived.file");

    List<Path> files = new ArrayList<>();
    files.add(originalPath);
    files.add(archivedPath);

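    // Both locations sit on the stub filesystem, so every attempt fails and
    // FileLink.open() is expected to surface the FileNotFoundException.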
    FileLink link = new FileLink(files);
    link.open(fs);
  }

  /**
   * Test, on a local filesystem, that the FileLink is still readable even when the current file
   * gets renamed.
   */
  @Test
  public void testLocalLinkReadDuringRename() throws IOException {
    HBaseTestingUtil testUtil = new HBaseTestingUtil();
    FileSystem fs = testUtil.getTestFileSystem();
    assertEquals("file", fs.getUri().getScheme());
    testLinkReadDuringRename(fs, testUtil.getDataTestDir());
  }

  /**
   * Test that the link is still readable even when the current file gets renamed.
   */
  private void testLinkReadDuringRename(FileSystem fs, Path rootDir) throws IOException {
    Path originalPath = new Path(rootDir, "test.file");
    Path archivedPath = new Path(rootDir, "archived.file");

    writeSomeData(fs, originalPath, 256 << 20, (byte) 2);

    List<Path> files = new ArrayList<>();
    files.add(originalPath);
    files.add(archivedPath);

    FileLink link = new FileLink(files);
    FSDataInputStream in = link.open(fs);
    try {
      byte[] data = new byte[8192];
      long size = 0;

      // Read from origin
      int n = in.read(data);
      dataVerify(data, n, (byte) 2);
      size += n;

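      // On Windows the local filesystem cannot rename a file that is held open,
      // so release the stream before moving the file.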
      if (FSUtils.WINDOWS) {
        in.close();
      }

      // Move origin to archive
      assertFalse(fs.exists(archivedPath));
      fs.rename(originalPath, archivedPath);
      assertFalse(fs.exists(originalPath));
      assertTrue(fs.exists(archivedPath));

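      // The stream was closed before the rename on Windows; reopen from the start
      // and consume the chunk that was already counted above.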
      if (FSUtils.WINDOWS) {
        in = link.open(fs); // re-read from beginning
        in.read(data);
      }

      // Try to read to the end
      while ((n = in.read(data)) > 0) {
        dataVerify(data, n, (byte) 2);
        size += n;
      }

      assertEquals(256 << 20, size);
    } finally {
      in.close();
      if (fs.exists(originalPath)) {
        fs.delete(originalPath, true);
      }
      if (fs.exists(archivedPath)) {
        fs.delete(archivedPath, true);
      }
    }
  }

  /**
   * Test that the link is still readable even when the current file gets deleted. NOTE: This test
   * is valid only on HDFS. When a file is deleted from a local filesystem it is simply 'unlinked':
   * the inode, which contains the file's data, is not removed until all processes have finished
   * with it. On HDFS, once a read moves past the cached block locations, the client queries the
   * NameNode by filename, and the deleted file no longer exists (FileNotFoundException).
   */
  @Test
  public void testHDFSLinkReadDuringDelete() throws Exception {
    HBaseTestingUtil testUtil = new HBaseTestingUtil();
    Configuration conf = testUtil.getConfiguration();
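    // Tune the mini cluster: 1 MB blocks, with block locations prefetched 2 MB at a time.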
    conf.setInt("dfs.blocksize", 1024 * 1024);
    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);

    testUtil.startMiniDFSCluster(1);
    MiniDFSCluster cluster = testUtil.getDFSCluster();
    FileSystem fs = cluster.getFileSystem();
    assertEquals("hdfs", fs.getUri().getScheme());

    try {
      List<Path> files = new ArrayList<>();
      for (int i = 0; i < 3; i++) {
        Path path = new Path(String.format("test-data-%d", i));
        writeSomeData(fs, path, 1 << 20, (byte) i);
        files.add(path);
      }

      FileLink link = new FileLink(files);
      FSDataInputStream in = link.open(fs);
      try {
        byte[] data = new byte[8192];
        int n;

        // Switch to file 1
        n = in.read(data);
        dataVerify(data, n, (byte) 0);
        fs.delete(files.get(0), true);
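        // Drain the rest of the deleted file from the client's cached blocks so the
        // next read hits the NameNode and forces FileLink to switch location.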
        skipBuffer(in, (byte) 0);

        // Switch to file 2
        n = in.read(data);
        dataVerify(data, n, (byte) 1);
        fs.delete(files.get(1), true);
        skipBuffer(in, (byte) 1);

        // Switch to file 3
        n = in.read(data);
        dataVerify(data, n, (byte) 2);
        fs.delete(files.get(2), true);
        skipBuffer(in, (byte) 2);

        // No more files available
        try {
          n = in.read(data);
          assertTrue(n <= 0);
        } catch (FileNotFoundException e) {
          // expected: every location has been deleted
        }
      } finally {
        in.close();
      }
    } finally {
      testUtil.shutdownMiniCluster();
    }
  }

  /**
   * Write 'size' bytes (rounded up to the 4 KB buffer length) with value 'v' into a new file
   * called 'path'.
   */
  private void writeSomeData(FileSystem fs, Path path, long size, byte v) throws IOException {
    byte[] data = new byte[4096];
    for (int i = 0; i < data.length; i++) {
      data[i] = v;
    }

    FSDataOutputStream stream = fs.create(path);
    try {
      long written = 0;
      while (written < size) {
        stream.write(data, 0, data.length);
        written += data.length;
      }
    } finally {
      stream.close();
    }
  }

  /**
   * Verify that all bytes in 'data' have 'v' as value.
   */
  private static void dataVerify(byte[] data, int n, byte v) {
    for (int i = 0; i < n; ++i) {
      assertEquals(v, data[i]);
    }
  }

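  /**
   * Read until a short read, EOF, or error. Byte values are checked against 'v', but any failure
   * (including the IOException thrown once the file is gone) is deliberately swallowed: the caller
   * only wants the stream positioned past the current file's buffered data.
   */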
  private static void skipBuffer(FSDataInputStream in, byte v) throws IOException {
    byte[] data = new byte[8192];
    try {
      int n;
      // Keep reading full buffers; a short read, EOF, or exception ends the drain.
      while ((n = in.read(data)) == data.length) {
        for (int i = 0; i < data.length; ++i) {
          if (data[i] != v) {
            throw new Exception("File changed");
          }
        }
      }
    } catch (Exception e) {
      // Expected once the underlying file has been deleted; ignore.
    }
  }
}