/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.AccessControlException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The FileLink is a sort of hardlink that allows access to a file given a set of locations.
 *
 * <p><b>The Problem:</b>
 * <ul>
 *  <li>
 *    HDFS doesn't support hardlinks, which makes it impossible to reference
 *    the same data blocks using different names.
 *  </li>
 *  <li>
 *    HBase stores files in one location (e.g. table/region/family/) and, when the file is no
 *    longer needed (e.g. compaction, region deletion, ...), moves it to an archive directory.
 *  </li>
 * </ul>
 * If we want to create a reference to a file, we need to remember that it can be in its
 * original location or in the archive folder.
 * The FileLink class abstracts this concept: given a set of locations, it is able to switch
 * between them, making the operation transparent to the user.
 * {@link HFileLink} is a more concrete implementation of the {@code FileLink}.
 *
 * <p><b>Back-references:</b>
 * To help the {@link org.apache.hadoop.hbase.master.cleaner.CleanerChore} keep track of
 * the links to a particular file, a new file is placed inside a back-reference directory when
 * the {@code FileLink} is created. There's one back-reference directory for each file that
 * has links, and in that directory there's one file per link.
 *
 * <p>HFileLink Example
 * <ul>
 *  <li>
 *      /hbase/table/region-x/cf/file-k
 *      (Original File)
 *  </li>
 *  <li>
 *      /hbase/table-cloned/region-y/cf/file-k.region-x.table
 *      (HFileLink to the original file)
 *  </li>
 *  <li>
 *      /hbase/table-2nd-cloned/region-z/cf/file-k.region-x.table
 *      (HFileLink to the original file)
 *  </li>
 *  <li>
 *      /hbase/.archive/table/region-x/.links-file-k/region-y.table-cloned
 *      (Back-reference to the link in table-cloned)
 *  </li>
 *  <li>
 *      /hbase/.archive/table/region-x/.links-file-k/region-z.table-2nd-cloned
 *      (Back-reference to the link in table-2nd-cloned)
 *  </li>
 * </ul>
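 *
 * <p>Usage example (an illustrative sketch; the paths and the {@code fs} instance below are
 * hypothetical):
 * <pre>{@code
 * FileLink link = new FileLink(
 *   new Path("/hbase/table/region-x/cf/file-k"),            // original location
 *   new Path("/hbase/.archive/table/region-x/cf/file-k"));  // archive location
 * try (FSDataInputStream in = link.open(fs)) {
 *   int firstByte = in.read();  // keeps working even if the file moves to the archive location
 * }
 * }</pre>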
 */
@InterfaceAudience.Private
public class FileLink {
  private static final Logger LOG = LoggerFactory.getLogger(FileLink.class);

  /** Define the Back-reference directory name prefix: .links-&lt;hfile&gt;/ */
  public static final String BACK_REFERENCES_DIRECTORY_PREFIX = ".links-";

  /**
   * FileLink InputStream that handles the switch between the original path
   * and the alternative locations when the file is moved.
   */
  private static class FileLinkInputStream extends InputStream
      implements Seekable, PositionedReadable, CanSetDropBehind, CanSetReadahead, CanUnbuffer {
    private FSDataInputStream in = null;
    private Path currentPath = null;
    private long pos = 0;

    private final FileLink fileLink;
    private final int bufferSize;
    private final FileSystem fs;

    public FileLinkInputStream(final FileSystem fs, final FileLink fileLink)
        throws IOException {
      this(fs, fileLink, CommonFSUtils.getDefaultBufferSize(fs));
    }

    public FileLinkInputStream(final FileSystem fs, final FileLink fileLink, int bufferSize)
        throws IOException {
      this.bufferSize = bufferSize;
      this.fileLink = fileLink;
      this.fs = fs;

      this.in = tryOpen();
    }

    @Override
    public int read() throws IOException {
      int res;
      try {
        res = in.read();
      } catch (FileNotFoundException e) {
        res = tryOpen().read();
      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
        res = tryOpen().read();
      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
        res = tryOpen().read();
      }
      // read() returns the byte value (0-255) or -1 at EOF, so any non-negative result means
      // one byte was consumed and the tracked position must advance.
      if (res >= 0) pos += 1;
      return res;
    }

    @Override
    public int read(byte[] b) throws IOException {
      return read(b, 0, b.length);
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
      int n;
      try {
        n = in.read(b, off, len);
      } catch (FileNotFoundException e) {
        n = tryOpen().read(b, off, len);
      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
        n = tryOpen().read(b, off, len);
      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
        n = tryOpen().read(b, off, len);
      }
      if (n > 0) pos += n;
      assert(in.getPos() == pos);
      return n;
    }

    @Override
    public int read(long position, byte[] buffer, int offset, int length) throws IOException {
      int n;
      try {
        n = in.read(position, buffer, offset, length);
      } catch (FileNotFoundException e) {
        n = tryOpen().read(position, buffer, offset, length);
      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
        n = tryOpen().read(position, buffer, offset, length);
      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
        n = tryOpen().read(position, buffer, offset, length);
      }
      return n;
    }

    @Override
    public void readFully(long position, byte[] buffer) throws IOException {
      readFully(position, buffer, 0, buffer.length);
    }

    @Override
    public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
      try {
        in.readFully(position, buffer, offset, length);
      } catch (FileNotFoundException e) {
        tryOpen().readFully(position, buffer, offset, length);
      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
        tryOpen().readFully(position, buffer, offset, length);
      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
        tryOpen().readFully(position, buffer, offset, length);
      }
    }

    @Override
    public long skip(long n) throws IOException {
      long skipped;

      try {
        skipped = in.skip(n);
      } catch (FileNotFoundException e) {
        skipped = tryOpen().skip(n);
      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
        skipped = tryOpen().skip(n);
      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
        skipped = tryOpen().skip(n);
      }

      if (skipped > 0) pos += skipped;
      return skipped;
    }

    @Override
    public int available() throws IOException {
      try {
        return in.available();
      } catch (FileNotFoundException e) {
        return tryOpen().available();
      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
        return tryOpen().available();
      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
        return tryOpen().available();
      }
    }

    @Override
    public void seek(long pos) throws IOException {
      try {
        in.seek(pos);
      } catch (FileNotFoundException e) {
        tryOpen().seek(pos);
      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
        tryOpen().seek(pos);
      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
        tryOpen().seek(pos);
      }
      this.pos = pos;
    }

    @Override
    public long getPos() throws IOException {
      return pos;
    }

    @Override
    public boolean seekToNewSource(long targetPos) throws IOException {
      boolean res;
      try {
        res = in.seekToNewSource(targetPos);
      } catch (FileNotFoundException e) {
        res = tryOpen().seekToNewSource(targetPos);
      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
        res = tryOpen().seekToNewSource(targetPos);
      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
        res = tryOpen().seekToNewSource(targetPos);
      }
      if (res) pos = targetPos;
      return res;
    }

    @Override
    public void close() throws IOException {
      in.close();
    }

    @Override
    public synchronized void mark(int readlimit) {
    }

    @Override
    public synchronized void reset() throws IOException {
      throw new IOException("mark/reset not supported");
    }

    @Override
    public boolean markSupported() {
      return false;
    }

    @Override
    public void unbuffer() {
      if (in == null) {
        return;
      }
      in.unbuffer();
    }

    /**
     * Try to open the file from one of the available locations.
     *
     * @return FSDataInputStream stream of the opened file link
     * @throws IOException on unexpected error, or file not found.
     */
    private FSDataInputStream tryOpen() throws IOException {
      IOException exception = null;
      for (Path path: fileLink.getLocations()) {
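        // Skip the location that is currently open; it is the one that just failed (or was
        // already tried), so only the alternative locations are attempted.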
        if (path.equals(currentPath)) continue;
        try {
          in = fs.open(path, bufferSize);
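          // Restore the read position reached on the previous location in the new stream.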
          if (pos != 0) in.seek(pos);
          assert(in.getPos() == pos) : "Link unable to seek to the right position=" + pos;
          if (LOG.isTraceEnabled()) {
            if (currentPath == null) {
              LOG.trace("link open path=" + path);
            } else {
              LOG.trace("link switch from path=" + currentPath + " to path=" + path);
            }
          }
          currentPath = path;
          return in;
        } catch (FileNotFoundException | AccessControlException | RemoteException e) {
          exception = FileLink.handleAccessLocationException(fileLink, e, exception);
        }
      }
      throw exception;
    }

    @Override
    public void setReadahead(Long readahead) throws IOException, UnsupportedOperationException {
      in.setReadahead(readahead);
    }

    @Override
    public void setDropBehind(Boolean dropCache) throws IOException, UnsupportedOperationException {
      in.setDropBehind(dropCache);
    }
  }

  private Path[] locations = null;

  protected FileLink() {
    this.locations = null;
  }

  /**
   * @param originPath Original location of the file to link
   * @param alternativePaths Alternative locations to look for the linked file
   */
  public FileLink(Path originPath, Path... alternativePaths) {
    setLocations(originPath, alternativePaths);
  }

  /**
   * @param locations locations to look for the linked file
   */
  public FileLink(final Collection<Path> locations) {
    this.locations = locations.toArray(new Path[locations.size()]);
  }

  /**
   * @return the locations to look for the linked file.
   */
  public Path[] getLocations() {
    return locations;
  }

  @Override
  public String toString() {
    StringBuilder str = new StringBuilder(getClass().getSimpleName());
    str.append(" locations=[");
    for (int i = 0; i < locations.length; ++i) {
      if (i > 0) str.append(", ");
      str.append(locations[i].toString());
    }
    str.append("]");
    return str.toString();
  }

  /**
   * @return true if the file pointed to by the link exists
   */
  public boolean exists(final FileSystem fs) throws IOException {
    for (int i = 0; i < locations.length; ++i) {
      if (fs.exists(locations[i])) {
        return true;
      }
    }
    return false;
  }

  /**
   * @return the first available path of the linked file.
   */
  public Path getAvailablePath(FileSystem fs) throws IOException {
    for (int i = 0; i < locations.length; ++i) {
      if (fs.exists(locations[i])) {
        return locations[i];
      }
    }
    throw new FileNotFoundException(toString());
  }

  /**
   * Get the FileStatus of the referenced file.
   *
   * @param fs {@link FileSystem} on which to get the file status
   * @return FileStatus of the referenced file.
   * @throws IOException on unexpected error.
   */
  public FileStatus getFileStatus(FileSystem fs) throws IOException {
    IOException exception = null;
    for (int i = 0; i < locations.length; ++i) {
      try {
        return fs.getFileStatus(locations[i]);
      } catch (FileNotFoundException | AccessControlException e) {
        exception = handleAccessLocationException(this, e, exception);
      }
    }
    throw exception;
  }

  /**
   * Handle exceptions which are thrown when accessing the locations of a file link.
   * @param fileLink the file link
   * @param newException the exception caught while accessing the current location
   * @param previousException the previous exception caught while accessing the other locations
   * @return an AccessControlException if accessing one of the locations caught one, otherwise a
   *         FileNotFoundException. The AccessControlException is thrown when the user scan
   *         snapshot feature is enabled, see
   *         {@link org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclController}.
   * @throws IOException if the exception is neither an AccessControlException nor a
   *           FileNotFoundException
   */
  private static IOException handleAccessLocationException(FileLink fileLink,
      IOException newException, IOException previousException) throws IOException {
    if (newException instanceof RemoteException) {
      newException = ((RemoteException) newException)
          .unwrapRemoteException(FileNotFoundException.class, AccessControlException.class);
    }
    if (newException instanceof FileNotFoundException) {
      // Try another file location
      if (previousException == null) {
        previousException = new FileNotFoundException(fileLink.toString());
      }
    } else if (newException instanceof AccessControlException) {
      // Try another file location
      previousException = newException;
    } else {
      throw newException;
    }
    return previousException;
  }

  /**
   * Open the FileLink for read.
   * <p>
   * It uses an {@link FSDataInputStream} wrapper that is agnostic to the location of the file,
   * so reads keep working even if the file switches between locations.
   *
   * @param fs {@link FileSystem} on which to open the FileLink
   * @return InputStream for reading the file link.
   * @throws IOException on unexpected error.
   */
  public FSDataInputStream open(final FileSystem fs) throws IOException {
    return new FSDataInputStream(new FileLinkInputStream(fs, this));
  }

  /**
   * Open the FileLink for read.
   * <p>
   * It uses an {@link FSDataInputStream} wrapper that is agnostic to the location of the file,
   * so reads keep working even if the file switches between locations.
   *
   * @param fs {@link FileSystem} on which to open the FileLink
   * @param bufferSize the size of the buffer to be used.
   * @return InputStream for reading the file link.
   * @throws IOException on unexpected error.
   */
  public FSDataInputStream open(final FileSystem fs, int bufferSize) throws IOException {
    return new FSDataInputStream(new FileLinkInputStream(fs, this, bufferSize));
  }

  /**
   * NOTE: This method must be used only in the constructor!
   * It creates a List with the specified locations for the link.
   */
  protected void setLocations(Path originPath, Path... alternativePaths) {
    assert this.locations == null : "Link locations already set";

    List<Path> paths = new ArrayList<>(alternativePaths.length + 1);
    if (originPath != null) {
      paths.add(originPath);
    }

    for (int i = 0; i < alternativePaths.length; i++) {
      if (alternativePaths[i] != null) {
        paths.add(alternativePaths[i]);
      }
    }
    this.locations = paths.toArray(new Path[0]);
  }

  /**
   * Get the directory to store the link back references
   *
   * <p>To simplify the reference count process, during the FileLink creation
   * a back-reference is added to the back-reference directory of the specified file.
   *
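   * <p>For example (illustrative), {@code getBackReferencesDir(storeDir, "file-k")} resolves
   * to {@code storeDir/.links-file-k}.
   *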
   * @param storeDir Root directory for the link reference folder
   * @param fileName File Name with links
   * @return Path for the link back references.
   */
  public static Path getBackReferencesDir(final Path storeDir, final String fileName) {
    return new Path(storeDir, BACK_REFERENCES_DIRECTORY_PREFIX + fileName);
  }

  /**
   * Get the referenced file name from the reference link directory path.
   *
   * @param dirPath Link references directory path
   * @return Name of the file referenced
   */
  public static String getBackReferenceFileName(final Path dirPath) {
    return dirPath.getName().substring(BACK_REFERENCES_DIRECTORY_PREFIX.length());
  }

  /**
   * Checks if the specified directory path is a back reference links folder.
   * @param dirPath Directory path to verify
   * @return True if the specified directory is a link references folder
   */
  public static boolean isBackReferencesDir(final Path dirPath) {
    if (dirPath == null) {
      return false;
    }
    return dirPath.getName().startsWith(BACK_REFERENCES_DIRECTORY_PREFIX);
  }

  @Override
  public boolean equals(Object obj) {
    if (obj == null) {
      return false;
    }
    // Assumes that the ordering of locations between objects is the same. This is true for the
    // current subclasses (HFileLink, WALLink). Otherwise, we would have to sort the locations
    // or keep them presorted.
    if (this.getClass().equals(obj.getClass())) {
      return Arrays.equals(this.locations, ((FileLink) obj).locations);
    }

    return false;
  }

  @Override
  public int hashCode() {
    return Arrays.hashCode(locations);
  }
}