001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.io.InputStream;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Collection;
026import java.util.List;
027import org.apache.hadoop.fs.CanSetDropBehind;
028import org.apache.hadoop.fs.CanSetReadahead;
029import org.apache.hadoop.fs.CanUnbuffer;
030import org.apache.hadoop.fs.FSDataInputStream;
031import org.apache.hadoop.fs.FileStatus;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.fs.PositionedReadable;
035import org.apache.hadoop.fs.Seekable;
036import org.apache.hadoop.hbase.util.CommonFSUtils;
037import org.apache.hadoop.ipc.RemoteException;
038import org.apache.hadoop.security.AccessControlException;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/**
044 * The FileLink is a sort of hardlink, that allows access to a file given a set of locations.
045 * <p>
046 * <b>The Problem:</b>
047 * <ul>
048 * <li>HDFS doesn't have support for hardlinks, and this make impossible to referencing the same
049 * data blocks using different names.</li>
050 * <li>HBase store files in one location (e.g. table/region/family/) and when the file is not needed
051 * anymore (e.g. compaction, region deletion, ...) moves it to an archive directory.</li>
052 * </ul>
053 * If we want to create a reference to a file, we need to remember that it can be in its original
054 * location or in the archive folder. The FileLink class tries to abstract this concept and given a
055 * set of locations it is able to switch between them making this operation transparent for the
056 * user. {@link HFileLink} is a more concrete implementation of the {@code FileLink}.
057 * <p>
058 * <b>Back-references:</b> To help the {@link org.apache.hadoop.hbase.master.cleaner.CleanerChore}
059 * to keep track of the links to a particular file, during the {@code FileLink} creation, a new file
060 * is placed inside a back-reference directory. There's one back-reference directory for each file
061 * that has links, and in the directory there's one file per link.
062 * <p>
063 * HFileLink Example
064 * <ul>
065 * <li>/hbase/table/region-x/cf/file-k (Original File)</li>
066 * <li>/hbase/table-cloned/region-y/cf/file-k.region-x.table (HFileLink to the original file)</li>
067 * <li>/hbase/table-2nd-cloned/region-z/cf/file-k.region-x.table (HFileLink to the original file)
068 * </li>
069 * <li>/hbase/.archive/table/region-x/.links-file-k/region-y.table-cloned (Back-reference to the
070 * link in table-cloned)</li>
071 * <li>/hbase/.archive/table/region-x/.links-file-k/region-z.table-2nd-cloned (Back-reference to the
072 * link in table-2nd-cloned)</li>
073 * </ul>
074 */
075@InterfaceAudience.Private
076public class FileLink {
077  private static final Logger LOG = LoggerFactory.getLogger(FileLink.class);
078
079  /** Define the Back-reference directory name prefix: .links-&lt;hfile&gt;/ */
080  public static final String BACK_REFERENCES_DIRECTORY_PREFIX = ".links-";
081
082  /**
083   * FileLink InputStream that handles the switch between the original path and the alternative
084   * locations, when the file is moved.
085   */
086  private static class FileLinkInputStream extends InputStream
087    implements Seekable, PositionedReadable, CanSetDropBehind, CanSetReadahead, CanUnbuffer {
088    private FSDataInputStream in = null;
089    private Path currentPath = null;
090    private long pos = 0;
091
092    private final FileLink fileLink;
093    private final int bufferSize;
094    private final FileSystem fs;
095
096    public FileLinkInputStream(final FileSystem fs, final FileLink fileLink) throws IOException {
097      this(fs, fileLink, CommonFSUtils.getDefaultBufferSize(fs));
098    }
099
100    public FileLinkInputStream(final FileSystem fs, final FileLink fileLink, int bufferSize)
101      throws IOException {
102      this.bufferSize = bufferSize;
103      this.fileLink = fileLink;
104      this.fs = fs;
105
106      this.in = tryOpen();
107    }
108
109    private FSDataInputStream getUnderlyingInputStream() {
110      return in;
111    }
112
113    @Override
114    public int read() throws IOException {
115      int res;
116      try {
117        res = in.read();
118      } catch (FileNotFoundException e) {
119        res = tryOpen().read();
120      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
121        res = tryOpen().read();
122      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
123        res = tryOpen().read();
124      }
125      if (res > 0) pos += 1;
126      return res;
127    }
128
129    @Override
130    public int read(byte[] b) throws IOException {
131      return read(b, 0, b.length);
132    }
133
134    @Override
135    public int read(byte[] b, int off, int len) throws IOException {
136      int n;
137      try {
138        n = in.read(b, off, len);
139      } catch (FileNotFoundException e) {
140        n = tryOpen().read(b, off, len);
141      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
142        n = tryOpen().read(b, off, len);
143      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
144        n = tryOpen().read(b, off, len);
145      }
146      if (n > 0) pos += n;
147      assert (in.getPos() == pos);
148      return n;
149    }
150
151    @Override
152    public int read(long position, byte[] buffer, int offset, int length) throws IOException {
153      int n;
154      try {
155        n = in.read(position, buffer, offset, length);
156      } catch (FileNotFoundException e) {
157        n = tryOpen().read(position, buffer, offset, length);
158      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
159        n = tryOpen().read(position, buffer, offset, length);
160      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
161        n = tryOpen().read(position, buffer, offset, length);
162      }
163      return n;
164    }
165
166    @Override
167    public void readFully(long position, byte[] buffer) throws IOException {
168      readFully(position, buffer, 0, buffer.length);
169    }
170
171    @Override
172    public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
173      try {
174        in.readFully(position, buffer, offset, length);
175      } catch (FileNotFoundException e) {
176        tryOpen().readFully(position, buffer, offset, length);
177      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
178        tryOpen().readFully(position, buffer, offset, length);
179      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
180        tryOpen().readFully(position, buffer, offset, length);
181      }
182    }
183
184    @Override
185    public long skip(long n) throws IOException {
186      long skipped;
187
188      try {
189        skipped = in.skip(n);
190      } catch (FileNotFoundException e) {
191        skipped = tryOpen().skip(n);
192      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
193        skipped = tryOpen().skip(n);
194      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
195        skipped = tryOpen().skip(n);
196      }
197
198      if (skipped > 0) pos += skipped;
199      return skipped;
200    }
201
202    @Override
203    public int available() throws IOException {
204      try {
205        return in.available();
206      } catch (FileNotFoundException e) {
207        return tryOpen().available();
208      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
209        return tryOpen().available();
210      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
211        return tryOpen().available();
212      }
213    }
214
215    @Override
216    public void seek(long pos) throws IOException {
217      try {
218        in.seek(pos);
219      } catch (FileNotFoundException e) {
220        tryOpen().seek(pos);
221      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
222        tryOpen().seek(pos);
223      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
224        tryOpen().seek(pos);
225      }
226      this.pos = pos;
227    }
228
229    @Override
230    public long getPos() throws IOException {
231      return pos;
232    }
233
234    @Override
235    public boolean seekToNewSource(long targetPos) throws IOException {
236      boolean res;
237      try {
238        res = in.seekToNewSource(targetPos);
239      } catch (FileNotFoundException e) {
240        res = tryOpen().seekToNewSource(targetPos);
241      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
242        res = tryOpen().seekToNewSource(targetPos);
243      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
244        res = tryOpen().seekToNewSource(targetPos);
245      }
246      if (res) pos = targetPos;
247      return res;
248    }
249
250    @Override
251    public void close() throws IOException {
252      in.close();
253    }
254
255    @Override
256    public synchronized void mark(int readlimit) {
257    }
258
259    @Override
260    public synchronized void reset() throws IOException {
261      throw new IOException("mark/reset not supported");
262    }
263
264    @Override
265    public boolean markSupported() {
266      return false;
267    }
268
269    @Override
270    public void unbuffer() {
271      if (in == null) {
272        return;
273      }
274      in.unbuffer();
275    }
276
277    /**
278     * Try to open the file from one of the available locations.
279     * @return FSDataInputStream stream of the opened file link
280     * @throws IOException on unexpected error, or file not found.
281     */
282    private FSDataInputStream tryOpen() throws IOException {
283      IOException exception = null;
284      for (Path path : fileLink.getLocations()) {
285        if (path.equals(currentPath)) continue;
286        try {
287          in = fs.open(path, bufferSize);
288          if (pos != 0) in.seek(pos);
289          assert (in.getPos() == pos) : "Link unable to seek to the right position=" + pos;
290          if (LOG.isTraceEnabled()) {
291            if (currentPath == null) {
292              LOG.debug("link open path=" + path);
293            } else {
294              LOG.trace("link switch from path=" + currentPath + " to path=" + path);
295            }
296          }
297          currentPath = path;
298          return (in);
299        } catch (FileNotFoundException | AccessControlException | RemoteException e) {
300          exception = FileLink.handleAccessLocationException(fileLink, e, exception);
301        }
302      }
303      throw exception;
304    }
305
306    @Override
307    public void setReadahead(Long readahead) throws IOException, UnsupportedOperationException {
308      in.setReadahead(readahead);
309    }
310
311    @Override
312    public void setDropBehind(Boolean dropCache) throws IOException, UnsupportedOperationException {
313      in.setDropBehind(dropCache);
314    }
315  }
316
317  private Path[] locations = null;
318
319  protected FileLink() {
320    this.locations = null;
321  }
322
323  /**
324   * @param originPath       Original location of the file to link
325   * @param alternativePaths Alternative locations to look for the linked file
326   */
327  public FileLink(Path originPath, Path... alternativePaths) {
328    setLocations(originPath, alternativePaths);
329  }
330
331  /**
332   * @param locations locations to look for the linked file
333   */
334  public FileLink(final Collection<Path> locations) {
335    this.locations = locations.toArray(new Path[locations.size()]);
336  }
337
338  /** Returns the locations to look for the linked file. */
339  public Path[] getLocations() {
340    return locations;
341  }
342
343  @Override
344  public String toString() {
345    StringBuilder str = new StringBuilder(getClass().getSimpleName());
346    str.append(" locations=[");
347    for (int i = 0; i < locations.length; ++i) {
348      if (i > 0) str.append(", ");
349      str.append(locations[i].toString());
350    }
351    str.append("]");
352    return str.toString();
353  }
354
355  /** Returns true if the file pointed by the link exists */
356  public boolean exists(final FileSystem fs) throws IOException {
357    for (int i = 0; i < locations.length; ++i) {
358      if (fs.exists(locations[i])) {
359        return true;
360      }
361    }
362    return false;
363  }
364
365  /** Returns the path of the first available link. */
366  public Path getAvailablePath(FileSystem fs) throws IOException {
367    for (int i = 0; i < locations.length; ++i) {
368      if (fs.exists(locations[i])) {
369        return locations[i];
370      }
371    }
372    throw new FileNotFoundException(toString());
373  }
374
375  /**
376   * Get the FileStatus of the referenced file.
377   * @param fs {@link FileSystem} on which to get the file status
378   * @return InputStream for the hfile link.
379   * @throws IOException on unexpected error.
380   */
381  public FileStatus getFileStatus(FileSystem fs) throws IOException {
382    IOException exception = null;
383    for (int i = 0; i < locations.length; ++i) {
384      try {
385        return fs.getFileStatus(locations[i]);
386      } catch (FileNotFoundException | AccessControlException e) {
387        exception = handleAccessLocationException(this, e, exception);
388      }
389    }
390    throw exception;
391  }
392
393  /**
394   * Handle exceptions which are thrown when access locations of file link
395   * @param fileLink          the file link
396   * @param newException      the exception caught by access the current location
397   * @param previousException the previous exception caught by access the other locations
398   * @return return AccessControlException if access one of the locations caught, otherwise return
399   *         FileNotFoundException. The AccessControlException is threw if user scan snapshot
400   *         feature is enabled, see
401   *         {@link org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclController}.
402   * @throws IOException if the exception is neither AccessControlException nor
403   *                     FileNotFoundException
404   */
405  private static IOException handleAccessLocationException(FileLink fileLink,
406    IOException newException, IOException previousException) throws IOException {
407    if (newException instanceof RemoteException) {
408      newException = ((RemoteException) newException)
409        .unwrapRemoteException(FileNotFoundException.class, AccessControlException.class);
410    }
411    if (newException instanceof FileNotFoundException) {
412      // Try another file location
413      if (previousException == null) {
414        previousException = new FileNotFoundException(fileLink.toString());
415      }
416    } else if (newException instanceof AccessControlException) {
417      // Try another file location
418      previousException = newException;
419    } else {
420      throw newException;
421    }
422    return previousException;
423  }
424
425  /**
426   * Open the FileLink for read.
427   * <p>
428   * It uses a wrapper of FSDataInputStream that is agnostic to the location of the file, even if
429   * the file switches between locations.
430   * @param fs {@link FileSystem} on which to open the FileLink
431   * @return InputStream for reading the file link.
432   * @throws IOException on unexpected error.
433   */
434  public FSDataInputStream open(final FileSystem fs) throws IOException {
435    return new FSDataInputStream(new FileLinkInputStream(fs, this));
436  }
437
438  /**
439   * Open the FileLink for read.
440   * <p>
441   * It uses a wrapper of FSDataInputStream that is agnostic to the location of the file, even if
442   * the file switches between locations.
443   * @param fs         {@link FileSystem} on which to open the FileLink
444   * @param bufferSize the size of the buffer to be used.
445   * @return InputStream for reading the file link.
446   * @throws IOException on unexpected error.
447   */
448  public FSDataInputStream open(final FileSystem fs, int bufferSize) throws IOException {
449    return new FSDataInputStream(new FileLinkInputStream(fs, this, bufferSize));
450  }
451
452  /**
453   * If the passed FSDataInputStream is backed by a FileLink, returns the underlying InputStream for
454   * the resolved link target. Otherwise, returns null.
455   */
456  public static FSDataInputStream getUnderlyingFileLinkInputStream(FSDataInputStream stream) {
457    if (stream.getWrappedStream() instanceof FileLinkInputStream) {
458      return ((FileLinkInputStream) stream.getWrappedStream()).getUnderlyingInputStream();
459    }
460    return null;
461  }
462
463  /**
464   * NOTE: This method must be used only in the constructor! It creates a List with the specified
465   * locations for the link.
466   */
467  protected void setLocations(Path originPath, Path... alternativePaths) {
468    assert this.locations == null : "Link locations already set";
469
470    List<Path> paths = new ArrayList<>(alternativePaths.length + 1);
471    if (originPath != null) {
472      paths.add(originPath);
473    }
474
475    for (int i = 0; i < alternativePaths.length; i++) {
476      if (alternativePaths[i] != null) {
477        paths.add(alternativePaths[i]);
478      }
479    }
480    this.locations = paths.toArray(new Path[0]);
481  }
482
483  /**
484   * Get the directory to store the link back references
485   * <p>
486   * To simplify the reference count process, during the FileLink creation a back-reference is added
487   * to the back-reference directory of the specified file.
488   * @param storeDir Root directory for the link reference folder
489   * @param fileName File Name with links
490   * @return Path for the link back references.
491   */
492  public static Path getBackReferencesDir(final Path storeDir, final String fileName) {
493    return new Path(storeDir, BACK_REFERENCES_DIRECTORY_PREFIX + fileName);
494  }
495
496  /**
497   * Get the referenced file name from the reference link directory path.
498   * @param dirPath Link references directory path
499   * @return Name of the file referenced
500   */
501  public static String getBackReferenceFileName(final Path dirPath) {
502    return dirPath.getName().substring(BACK_REFERENCES_DIRECTORY_PREFIX.length());
503  }
504
505  /**
506   * Checks if the specified directory path is a back reference links folder.
507   * @param dirPath Directory path to verify
508   * @return True if the specified directory is a link references folder
509   */
510  public static boolean isBackReferencesDir(final Path dirPath) {
511    if (dirPath == null) {
512      return false;
513    }
514    return dirPath.getName().startsWith(BACK_REFERENCES_DIRECTORY_PREFIX);
515  }
516
517  @Override
518  public boolean equals(Object obj) {
519    if (obj == null) {
520      return false;
521    }
522    // Assumes that the ordering of locations between objects are the same. This is true for the
523    // current subclasses already (HFileLink, WALLink). Otherwise, we may have to sort the locations
524    // or keep them presorted
525    if (this.getClass().equals(obj.getClass())) {
526      return Arrays.equals(this.locations, ((FileLink) obj).locations);
527    }
528
529    return false;
530  }
531
532  @Override
533  public int hashCode() {
534    return Arrays.hashCode(locations);
535  }
536}