001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.io;
020
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collection;
024import java.io.IOException;
025import java.io.InputStream;
026import java.io.FileNotFoundException;
027import java.util.List;
028
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032import org.apache.hadoop.fs.CanSetDropBehind;
033import org.apache.hadoop.fs.CanSetReadahead;
034import org.apache.hadoop.fs.CanUnbuffer;
035import org.apache.hadoop.fs.FSDataInputStream;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.fs.FileStatus;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.fs.PositionedReadable;
040import org.apache.hadoop.fs.Seekable;
041import org.apache.hadoop.hbase.util.FSUtils;
042import org.apache.hadoop.ipc.RemoteException;
043
044/**
045 * The FileLink is a sort of hardlink, that allows access to a file given a set of locations.
046 *
047 * <p><b>The Problem:</b>
048 * <ul>
049 *  <li>
 *    HDFS doesn't have support for hardlinks, and this makes it impossible to reference
 *    the same data blocks using different names.
052 *  </li>
053 *  <li>
 *    HBase stores files in one location (e.g. table/region/family/) and when the file is not
055 *    needed anymore (e.g. compaction, region deletion, ...) moves it to an archive directory.
056 *  </li>
057 * </ul>
058 * If we want to create a reference to a file, we need to remember that it can be in its
059 * original location or in the archive folder.
060 * The FileLink class tries to abstract this concept and given a set of locations
061 * it is able to switch between them making this operation transparent for the user.
062 * {@link HFileLink} is a more concrete implementation of the {@code FileLink}.
063 *
064 * <p><b>Back-references:</b>
065 * To help the {@link org.apache.hadoop.hbase.master.cleaner.CleanerChore} to keep track of
066 * the links to a particular file, during the {@code FileLink} creation, a new file is placed
067 * inside a back-reference directory. There's one back-reference directory for each file that
068 * has links, and in the directory there's one file per link.
069 *
070 * <p>HFileLink Example
071 * <ul>
072 *  <li>
073 *      /hbase/table/region-x/cf/file-k
074 *      (Original File)
075 *  </li>
076 *  <li>
077 *      /hbase/table-cloned/region-y/cf/file-k.region-x.table
078 *     (HFileLink to the original file)
079 *  </li>
080 *  <li>
081 *      /hbase/table-2nd-cloned/region-z/cf/file-k.region-x.table
082 *      (HFileLink to the original file)
083 *  </li>
084 *  <li>
085 *      /hbase/.archive/table/region-x/.links-file-k/region-y.table-cloned
086 *      (Back-reference to the link in table-cloned)
087 *  </li>
088 *  <li>
089 *      /hbase/.archive/table/region-x/.links-file-k/region-z.table-2nd-cloned
090 *      (Back-reference to the link in table-2nd-cloned)
091 *  </li>
092 * </ul>
093 */
094@InterfaceAudience.Private
095public class FileLink {
096  private static final Logger LOG = LoggerFactory.getLogger(FileLink.class);
097
098  /** Define the Back-reference directory name prefix: .links-&lt;hfile&gt;/ */
099  public static final String BACK_REFERENCES_DIRECTORY_PREFIX = ".links-";
100
101  /**
102   * FileLink InputStream that handles the switch between the original path
103   * and the alternative locations, when the file is moved.
104   */
105  private static class FileLinkInputStream extends InputStream
106      implements Seekable, PositionedReadable, CanSetDropBehind, CanSetReadahead, CanUnbuffer {
107    private FSDataInputStream in = null;
108    private Path currentPath = null;
109    private long pos = 0;
110
111    private final FileLink fileLink;
112    private final int bufferSize;
113    private final FileSystem fs;
114
115    public FileLinkInputStream(final FileSystem fs, final FileLink fileLink)
116        throws IOException {
117      this(fs, fileLink, FSUtils.getDefaultBufferSize(fs));
118    }
119
120    public FileLinkInputStream(final FileSystem fs, final FileLink fileLink, int bufferSize)
121        throws IOException {
122      this.bufferSize = bufferSize;
123      this.fileLink = fileLink;
124      this.fs = fs;
125
126      this.in = tryOpen();
127    }
128
129    @Override
130    public int read() throws IOException {
131      int res;
132      try {
133        res = in.read();
134      } catch (FileNotFoundException e) {
135        res = tryOpen().read();
136      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
137        res = tryOpen().read();
138      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
139        res = tryOpen().read();
140      }
141      if (res > 0) pos += 1;
142      return res;
143    }
144
145    @Override
146    public int read(byte[] b) throws IOException {
147       return read(b, 0, b.length);
148    }
149
150    @Override
151    public int read(byte[] b, int off, int len) throws IOException {
152      int n;
153      try {
154        n = in.read(b, off, len);
155      } catch (FileNotFoundException e) {
156        n = tryOpen().read(b, off, len);
157      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
158        n = tryOpen().read(b, off, len);
159      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
160        n = tryOpen().read(b, off, len);
161      }
162      if (n > 0) pos += n;
163      assert(in.getPos() == pos);
164      return n;
165    }
166
167    @Override
168    public int read(long position, byte[] buffer, int offset, int length) throws IOException {
169      int n;
170      try {
171        n = in.read(position, buffer, offset, length);
172      } catch (FileNotFoundException e) {
173        n = tryOpen().read(position, buffer, offset, length);
174      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
175        n = tryOpen().read(position, buffer, offset, length);
176      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
177        n = tryOpen().read(position, buffer, offset, length);
178      }
179      return n;
180    }
181
182    @Override
183    public void readFully(long position, byte[] buffer) throws IOException {
184      readFully(position, buffer, 0, buffer.length);
185    }
186
187    @Override
188    public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
189      try {
190        in.readFully(position, buffer, offset, length);
191      } catch (FileNotFoundException e) {
192        tryOpen().readFully(position, buffer, offset, length);
193      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
194        tryOpen().readFully(position, buffer, offset, length);
195      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
196        tryOpen().readFully(position, buffer, offset, length);
197      }
198    }
199
200    @Override
201    public long skip(long n) throws IOException {
202      long skipped;
203
204      try {
205        skipped = in.skip(n);
206      } catch (FileNotFoundException e) {
207        skipped = tryOpen().skip(n);
208      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
209        skipped = tryOpen().skip(n);
210      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
211        skipped = tryOpen().skip(n);
212      }
213
214      if (skipped > 0) pos += skipped;
215      return skipped;
216    }
217
218    @Override
219    public int available() throws IOException {
220      try {
221        return in.available();
222      } catch (FileNotFoundException e) {
223        return tryOpen().available();
224      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
225        return tryOpen().available();
226      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
227        return tryOpen().available();
228      }
229    }
230
231    @Override
232    public void seek(long pos) throws IOException {
233      try {
234        in.seek(pos);
235      } catch (FileNotFoundException e) {
236        tryOpen().seek(pos);
237      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
238        tryOpen().seek(pos);
239      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
240        tryOpen().seek(pos);
241      }
242      this.pos = pos;
243    }
244
245    @Override
246    public long getPos() throws IOException {
247      return pos;
248    }
249
250    @Override
251    public boolean seekToNewSource(long targetPos) throws IOException {
252      boolean res;
253      try {
254        res = in.seekToNewSource(targetPos);
255      } catch (FileNotFoundException e) {
256        res = tryOpen().seekToNewSource(targetPos);
257      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
258        res = tryOpen().seekToNewSource(targetPos);
259      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
260        res = tryOpen().seekToNewSource(targetPos);
261      }
262      if (res) pos = targetPos;
263      return res;
264    }
265
266    @Override
267    public void close() throws IOException {
268      in.close();
269    }
270
271    @Override
272    public synchronized void mark(int readlimit) {
273    }
274
275    @Override
276    public synchronized void reset() throws IOException {
277      throw new IOException("mark/reset not supported");
278    }
279
280    @Override
281    public boolean markSupported() {
282      return false;
283    }
284
285    @Override
286    public void unbuffer() {
287      if (in == null) {
288        return;
289      }
290      in.unbuffer();
291    }
292
293    /**
294     * Try to open the file from one of the available locations.
295     *
296     * @return FSDataInputStream stream of the opened file link
297     * @throws IOException on unexpected error, or file not found.
298     */
299    private FSDataInputStream tryOpen() throws IOException {
300      for (Path path: fileLink.getLocations()) {
301        if (path.equals(currentPath)) continue;
302        try {
303          in = fs.open(path, bufferSize);
304          if (pos != 0) in.seek(pos);
305          assert(in.getPos() == pos) : "Link unable to seek to the right position=" + pos;
306          if (LOG.isTraceEnabled()) {
307            if (currentPath == null) {
308              LOG.debug("link open path=" + path);
309            } else {
310              LOG.trace("link switch from path=" + currentPath + " to path=" + path);
311            }
312          }
313          currentPath = path;
314          return(in);
315        } catch (FileNotFoundException e) {
316          // Try another file location
317        } catch (RemoteException re) {
318          IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
319          if (!(ioe instanceof FileNotFoundException)) throw re;
320        }
321      }
322      throw new FileNotFoundException(this.fileLink.toString());
323    }
324
325    @Override
326    public void setReadahead(Long readahead) throws IOException, UnsupportedOperationException {
327      in.setReadahead(readahead);
328    }
329
330    @Override
331    public void setDropBehind(Boolean dropCache) throws IOException, UnsupportedOperationException {
332      in.setDropBehind(dropCache);
333    }
334  }
335
336  private Path[] locations = null;
337
338  protected FileLink() {
339    this.locations = null;
340  }
341
342  /**
343   * @param originPath Original location of the file to link
344   * @param alternativePaths Alternative locations to look for the linked file
345   */
346  public FileLink(Path originPath, Path... alternativePaths) {
347    setLocations(originPath, alternativePaths);
348  }
349
350  /**
351   * @param locations locations to look for the linked file
352   */
353  public FileLink(final Collection<Path> locations) {
354    this.locations = locations.toArray(new Path[locations.size()]);
355  }
356
357  /**
358   * @return the locations to look for the linked file.
359   */
360  public Path[] getLocations() {
361    return locations;
362  }
363
364  @Override
365  public String toString() {
366    StringBuilder str = new StringBuilder(getClass().getSimpleName());
367    str.append(" locations=[");
368    for (int i = 0; i < locations.length; ++i) {
369      if (i > 0) str.append(", ");
370      str.append(locations[i].toString());
371    }
372    str.append("]");
373    return str.toString();
374  }
375
376  /**
377   * @return true if the file pointed by the link exists
378   */
379  public boolean exists(final FileSystem fs) throws IOException {
380    for (int i = 0; i < locations.length; ++i) {
381      if (fs.exists(locations[i])) {
382        return true;
383      }
384    }
385    return false;
386  }
387
388  /**
389   * @return the path of the first available link.
390   */
391  public Path getAvailablePath(FileSystem fs) throws IOException {
392    for (int i = 0; i < locations.length; ++i) {
393      if (fs.exists(locations[i])) {
394        return locations[i];
395      }
396    }
397    throw new FileNotFoundException(toString());
398  }
399
400  /**
401   * Get the FileStatus of the referenced file.
402   *
403   * @param fs {@link FileSystem} on which to get the file status
404   * @return InputStream for the hfile link.
405   * @throws IOException on unexpected error.
406   */
407  public FileStatus getFileStatus(FileSystem fs) throws IOException {
408    for (int i = 0; i < locations.length; ++i) {
409      try {
410        return fs.getFileStatus(locations[i]);
411      } catch (FileNotFoundException e) {
412        // Try another file location
413      }
414    }
415    throw new FileNotFoundException(toString());
416  }
417
418  /**
419   * Open the FileLink for read.
420   * <p>
421   * It uses a wrapper of FSDataInputStream that is agnostic to the location
422   * of the file, even if the file switches between locations.
423   *
424   * @param fs {@link FileSystem} on which to open the FileLink
425   * @return InputStream for reading the file link.
426   * @throws IOException on unexpected error.
427   */
428  public FSDataInputStream open(final FileSystem fs) throws IOException {
429    return new FSDataInputStream(new FileLinkInputStream(fs, this));
430  }
431
432  /**
433   * Open the FileLink for read.
434   * <p>
435   * It uses a wrapper of FSDataInputStream that is agnostic to the location
436   * of the file, even if the file switches between locations.
437   *
438   * @param fs {@link FileSystem} on which to open the FileLink
439   * @param bufferSize the size of the buffer to be used.
440   * @return InputStream for reading the file link.
441   * @throws IOException on unexpected error.
442   */
443  public FSDataInputStream open(final FileSystem fs, int bufferSize) throws IOException {
444    return new FSDataInputStream(new FileLinkInputStream(fs, this, bufferSize));
445  }
446
447  /**
448   * NOTE: This method must be used only in the constructor!
449   * It creates a List with the specified locations for the link.
450   */
451  protected void setLocations(Path originPath, Path... alternativePaths) {
452    assert this.locations == null : "Link locations already set";
453
454    List<Path> paths = new ArrayList<>(alternativePaths.length +1);
455    if (originPath != null) {
456      paths.add(originPath);
457    }
458
459    for (int i = 0; i < alternativePaths.length; i++) {
460      if (alternativePaths[i] != null) {
461        paths.add(alternativePaths[i]);
462      }
463    }
464    this.locations = paths.toArray(new Path[0]);
465  }
466
467  /**
468   * Get the directory to store the link back references
469   *
470   * <p>To simplify the reference count process, during the FileLink creation
471   * a back-reference is added to the back-reference directory of the specified file.
472   *
473   * @param storeDir Root directory for the link reference folder
474   * @param fileName File Name with links
475   * @return Path for the link back references.
476   */
477  public static Path getBackReferencesDir(final Path storeDir, final String fileName) {
478    return new Path(storeDir, BACK_REFERENCES_DIRECTORY_PREFIX + fileName);
479  }
480
481  /**
482   * Get the referenced file name from the reference link directory path.
483   *
484   * @param dirPath Link references directory path
485   * @return Name of the file referenced
486   */
487  public static String getBackReferenceFileName(final Path dirPath) {
488    return dirPath.getName().substring(BACK_REFERENCES_DIRECTORY_PREFIX.length());
489  }
490
491  /**
492   * Checks if the specified directory path is a back reference links folder.
493   *
494   * @param dirPath Directory path to verify
495   * @return True if the specified directory is a link references folder
496   */
497  public static boolean isBackReferencesDir(final Path dirPath) {
498    if (dirPath == null) return false;
499    return dirPath.getName().startsWith(BACK_REFERENCES_DIRECTORY_PREFIX);
500  }
501
502  @Override
503  public boolean equals(Object obj) {
504    if (obj == null) {
505      return false;
506    }
507    // Assumes that the ordering of locations between objects are the same. This is true for the
508    // current subclasses already (HFileLink, WALLink). Otherwise, we may have to sort the locations
509    // or keep them presorted
510    if (this.getClass().equals(obj.getClass())) {
511      return Arrays.equals(this.locations, ((FileLink) obj).locations);
512    }
513
514    return false;
515  }
516
517  @Override
518  public int hashCode() {
519    return Arrays.hashCode(locations);
520  }
521}
522