001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.io;
020
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collection;
024import java.io.IOException;
025import java.io.InputStream;
026import java.io.FileNotFoundException;
027import java.util.List;
028
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032import org.apache.hadoop.fs.CanSetDropBehind;
033import org.apache.hadoop.fs.CanSetReadahead;
034import org.apache.hadoop.fs.FSDataInputStream;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.FileStatus;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.fs.PositionedReadable;
039import org.apache.hadoop.fs.Seekable;
040import org.apache.hadoop.hbase.util.FSUtils;
041import org.apache.hadoop.ipc.RemoteException;
042
043/**
044 * The FileLink is a sort of hardlink, that allows access to a file given a set of locations.
045 *
046 * <p><b>The Problem:</b>
047 * <ul>
048 *  <li>
049 *    HDFS doesn't have support for hardlinks, and this make impossible to referencing
050 *    the same data blocks using different names.
051 *  </li>
052 *  <li>
053 *    HBase store files in one location (e.g. table/region/family/) and when the file is not
054 *    needed anymore (e.g. compaction, region deletion, ...) moves it to an archive directory.
055 *  </li>
056 * </ul>
057 * If we want to create a reference to a file, we need to remember that it can be in its
058 * original location or in the archive folder.
059 * The FileLink class tries to abstract this concept and given a set of locations
060 * it is able to switch between them making this operation transparent for the user.
061 * {@link HFileLink} is a more concrete implementation of the {@code FileLink}.
062 *
063 * <p><b>Back-references:</b>
064 * To help the {@link org.apache.hadoop.hbase.master.cleaner.CleanerChore} to keep track of
065 * the links to a particular file, during the {@code FileLink} creation, a new file is placed
066 * inside a back-reference directory. There's one back-reference directory for each file that
067 * has links, and in the directory there's one file per link.
068 *
069 * <p>HFileLink Example
070 * <ul>
071 *  <li>
072 *      /hbase/table/region-x/cf/file-k
073 *      (Original File)
074 *  </li>
075 *  <li>
076 *      /hbase/table-cloned/region-y/cf/file-k.region-x.table
077 *     (HFileLink to the original file)
078 *  </li>
079 *  <li>
080 *      /hbase/table-2nd-cloned/region-z/cf/file-k.region-x.table
081 *      (HFileLink to the original file)
082 *  </li>
083 *  <li>
084 *      /hbase/.archive/table/region-x/.links-file-k/region-y.table-cloned
085 *      (Back-reference to the link in table-cloned)
086 *  </li>
087 *  <li>
088 *      /hbase/.archive/table/region-x/.links-file-k/region-z.table-2nd-cloned
089 *      (Back-reference to the link in table-2nd-cloned)
090 *  </li>
091 * </ul>
092 */
093@InterfaceAudience.Private
094public class FileLink {
095  private static final Logger LOG = LoggerFactory.getLogger(FileLink.class);
096
097  /** Define the Back-reference directory name prefix: .links-&lt;hfile&gt;/ */
098  public static final String BACK_REFERENCES_DIRECTORY_PREFIX = ".links-";
099
100  /**
101   * FileLink InputStream that handles the switch between the original path
102   * and the alternative locations, when the file is moved.
103   */
104  private static class FileLinkInputStream extends InputStream
105      implements Seekable, PositionedReadable, CanSetDropBehind, CanSetReadahead {
106    private FSDataInputStream in = null;
107    private Path currentPath = null;
108    private long pos = 0;
109
110    private final FileLink fileLink;
111    private final int bufferSize;
112    private final FileSystem fs;
113
114    public FileLinkInputStream(final FileSystem fs, final FileLink fileLink)
115        throws IOException {
116      this(fs, fileLink, FSUtils.getDefaultBufferSize(fs));
117    }
118
119    public FileLinkInputStream(final FileSystem fs, final FileLink fileLink, int bufferSize)
120        throws IOException {
121      this.bufferSize = bufferSize;
122      this.fileLink = fileLink;
123      this.fs = fs;
124
125      this.in = tryOpen();
126    }
127
128    @Override
129    public int read() throws IOException {
130      int res;
131      try {
132        res = in.read();
133      } catch (FileNotFoundException e) {
134        res = tryOpen().read();
135      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
136        res = tryOpen().read();
137      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
138        res = tryOpen().read();
139      }
140      if (res > 0) pos += 1;
141      return res;
142    }
143
144    @Override
145    public int read(byte[] b) throws IOException {
146       return read(b, 0, b.length);
147    }
148
149    @Override
150    public int read(byte[] b, int off, int len) throws IOException {
151      int n;
152      try {
153        n = in.read(b, off, len);
154      } catch (FileNotFoundException e) {
155        n = tryOpen().read(b, off, len);
156      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
157        n = tryOpen().read(b, off, len);
158      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
159        n = tryOpen().read(b, off, len);
160      }
161      if (n > 0) pos += n;
162      assert(in.getPos() == pos);
163      return n;
164    }
165
166    @Override
167    public int read(long position, byte[] buffer, int offset, int length) throws IOException {
168      int n;
169      try {
170        n = in.read(position, buffer, offset, length);
171      } catch (FileNotFoundException e) {
172        n = tryOpen().read(position, buffer, offset, length);
173      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
174        n = tryOpen().read(position, buffer, offset, length);
175      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
176        n = tryOpen().read(position, buffer, offset, length);
177      }
178      return n;
179    }
180
181    @Override
182    public void readFully(long position, byte[] buffer) throws IOException {
183      readFully(position, buffer, 0, buffer.length);
184    }
185
186    @Override
187    public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
188      try {
189        in.readFully(position, buffer, offset, length);
190      } catch (FileNotFoundException e) {
191        tryOpen().readFully(position, buffer, offset, length);
192      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
193        tryOpen().readFully(position, buffer, offset, length);
194      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
195        tryOpen().readFully(position, buffer, offset, length);
196      }
197    }
198
199    @Override
200    public long skip(long n) throws IOException {
201      long skipped;
202
203      try {
204        skipped = in.skip(n);
205      } catch (FileNotFoundException e) {
206        skipped = tryOpen().skip(n);
207      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
208        skipped = tryOpen().skip(n);
209      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
210        skipped = tryOpen().skip(n);
211      }
212
213      if (skipped > 0) pos += skipped;
214      return skipped;
215    }
216
217    @Override
218    public int available() throws IOException {
219      try {
220        return in.available();
221      } catch (FileNotFoundException e) {
222        return tryOpen().available();
223      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
224        return tryOpen().available();
225      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
226        return tryOpen().available();
227      }
228    }
229
230    @Override
231    public void seek(long pos) throws IOException {
232      try {
233        in.seek(pos);
234      } catch (FileNotFoundException e) {
235        tryOpen().seek(pos);
236      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
237        tryOpen().seek(pos);
238      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
239        tryOpen().seek(pos);
240      }
241      this.pos = pos;
242    }
243
244    @Override
245    public long getPos() throws IOException {
246      return pos;
247    }
248
249    @Override
250    public boolean seekToNewSource(long targetPos) throws IOException {
251      boolean res;
252      try {
253        res = in.seekToNewSource(targetPos);
254      } catch (FileNotFoundException e) {
255        res = tryOpen().seekToNewSource(targetPos);
256      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
257        res = tryOpen().seekToNewSource(targetPos);
258      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
259        res = tryOpen().seekToNewSource(targetPos);
260      }
261      if (res) pos = targetPos;
262      return res;
263    }
264
265    @Override
266    public void close() throws IOException {
267      in.close();
268    }
269
270    @Override
271    public synchronized void mark(int readlimit) {
272    }
273
274    @Override
275    public synchronized void reset() throws IOException {
276      throw new IOException("mark/reset not supported");
277    }
278
279    @Override
280    public boolean markSupported() {
281      return false;
282    }
283
284    /**
285     * Try to open the file from one of the available locations.
286     *
287     * @return FSDataInputStream stream of the opened file link
288     * @throws IOException on unexpected error, or file not found.
289     */
290    private FSDataInputStream tryOpen() throws IOException {
291      for (Path path: fileLink.getLocations()) {
292        if (path.equals(currentPath)) continue;
293        try {
294          in = fs.open(path, bufferSize);
295          if (pos != 0) in.seek(pos);
296          assert(in.getPos() == pos) : "Link unable to seek to the right position=" + pos;
297          if (LOG.isTraceEnabled()) {
298            if (currentPath == null) {
299              LOG.debug("link open path=" + path);
300            } else {
301              LOG.trace("link switch from path=" + currentPath + " to path=" + path);
302            }
303          }
304          currentPath = path;
305          return(in);
306        } catch (FileNotFoundException e) {
307          // Try another file location
308        } catch (RemoteException re) {
309          IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
310          if (!(ioe instanceof FileNotFoundException)) throw re;
311        }
312      }
313      throw new FileNotFoundException("Unable to open link: " + fileLink);
314    }
315
316    @Override
317    public void setReadahead(Long readahead) throws IOException, UnsupportedOperationException {
318      in.setReadahead(readahead);
319    }
320
321    @Override
322    public void setDropBehind(Boolean dropCache) throws IOException, UnsupportedOperationException {
323      in.setDropBehind(dropCache);
324    }
325  }
326
327  private Path[] locations = null;
328
329  protected FileLink() {
330    this.locations = null;
331  }
332
333  /**
334   * @param originPath Original location of the file to link
335   * @param alternativePaths Alternative locations to look for the linked file
336   */
337  public FileLink(Path originPath, Path... alternativePaths) {
338    setLocations(originPath, alternativePaths);
339  }
340
341  /**
342   * @param locations locations to look for the linked file
343   */
344  public FileLink(final Collection<Path> locations) {
345    this.locations = locations.toArray(new Path[locations.size()]);
346  }
347
348  /**
349   * @return the locations to look for the linked file.
350   */
351  public Path[] getLocations() {
352    return locations;
353  }
354
355  @Override
356  public String toString() {
357    StringBuilder str = new StringBuilder(getClass().getName());
358    str.append(" locations=[");
359    for (int i = 0; i < locations.length; ++i) {
360      if (i > 0) str.append(", ");
361      str.append(locations[i].toString());
362    }
363    str.append("]");
364    return str.toString();
365  }
366
367  /**
368   * @return true if the file pointed by the link exists
369   */
370  public boolean exists(final FileSystem fs) throws IOException {
371    for (int i = 0; i < locations.length; ++i) {
372      if (fs.exists(locations[i])) {
373        return true;
374      }
375    }
376    return false;
377  }
378
379  /**
380   * @return the path of the first available link.
381   */
382  public Path getAvailablePath(FileSystem fs) throws IOException {
383    for (int i = 0; i < locations.length; ++i) {
384      if (fs.exists(locations[i])) {
385        return locations[i];
386      }
387    }
388    throw new FileNotFoundException("Unable to open link: " + this);
389  }
390
391  /**
392   * Get the FileStatus of the referenced file.
393   *
394   * @param fs {@link FileSystem} on which to get the file status
395   * @return InputStream for the hfile link.
396   * @throws IOException on unexpected error.
397   */
398  public FileStatus getFileStatus(FileSystem fs) throws IOException {
399    for (int i = 0; i < locations.length; ++i) {
400      try {
401        return fs.getFileStatus(locations[i]);
402      } catch (FileNotFoundException e) {
403        // Try another file location
404      }
405    }
406    throw new FileNotFoundException("Unable to open link: " + this);
407  }
408
409  /**
410   * Open the FileLink for read.
411   * <p>
412   * It uses a wrapper of FSDataInputStream that is agnostic to the location
413   * of the file, even if the file switches between locations.
414   *
415   * @param fs {@link FileSystem} on which to open the FileLink
416   * @return InputStream for reading the file link.
417   * @throws IOException on unexpected error.
418   */
419  public FSDataInputStream open(final FileSystem fs) throws IOException {
420    return new FSDataInputStream(new FileLinkInputStream(fs, this));
421  }
422
423  /**
424   * Open the FileLink for read.
425   * <p>
426   * It uses a wrapper of FSDataInputStream that is agnostic to the location
427   * of the file, even if the file switches between locations.
428   *
429   * @param fs {@link FileSystem} on which to open the FileLink
430   * @param bufferSize the size of the buffer to be used.
431   * @return InputStream for reading the file link.
432   * @throws IOException on unexpected error.
433   */
434  public FSDataInputStream open(final FileSystem fs, int bufferSize) throws IOException {
435    return new FSDataInputStream(new FileLinkInputStream(fs, this, bufferSize));
436  }
437
438  /**
439   * NOTE: This method must be used only in the constructor!
440   * It creates a List with the specified locations for the link.
441   */
442  protected void setLocations(Path originPath, Path... alternativePaths) {
443    assert this.locations == null : "Link locations already set";
444
445    List<Path> paths = new ArrayList<>(alternativePaths.length +1);
446    if (originPath != null) {
447      paths.add(originPath);
448    }
449
450    for (int i = 0; i < alternativePaths.length; i++) {
451      if (alternativePaths[i] != null) {
452        paths.add(alternativePaths[i]);
453      }
454    }
455    this.locations = paths.toArray(new Path[0]);
456  }
457
458  /**
459   * Get the directory to store the link back references
460   *
461   * <p>To simplify the reference count process, during the FileLink creation
462   * a back-reference is added to the back-reference directory of the specified file.
463   *
464   * @param storeDir Root directory for the link reference folder
465   * @param fileName File Name with links
466   * @return Path for the link back references.
467   */
468  public static Path getBackReferencesDir(final Path storeDir, final String fileName) {
469    return new Path(storeDir, BACK_REFERENCES_DIRECTORY_PREFIX + fileName);
470  }
471
472  /**
473   * Get the referenced file name from the reference link directory path.
474   *
475   * @param dirPath Link references directory path
476   * @return Name of the file referenced
477   */
478  public static String getBackReferenceFileName(final Path dirPath) {
479    return dirPath.getName().substring(BACK_REFERENCES_DIRECTORY_PREFIX.length());
480  }
481
482  /**
483   * Checks if the specified directory path is a back reference links folder.
484   *
485   * @param dirPath Directory path to verify
486   * @return True if the specified directory is a link references folder
487   */
488  public static boolean isBackReferencesDir(final Path dirPath) {
489    if (dirPath == null) return false;
490    return dirPath.getName().startsWith(BACK_REFERENCES_DIRECTORY_PREFIX);
491  }
492
493  @Override
494  public boolean equals(Object obj) {
495    if (obj == null) {
496      return false;
497    }
498    // Assumes that the ordering of locations between objects are the same. This is true for the
499    // current subclasses already (HFileLink, WALLink). Otherwise, we may have to sort the locations
500    // or keep them presorted
501    if (this.getClass().equals(obj.getClass())) {
502      return Arrays.equals(this.locations, ((FileLink) obj).locations);
503    }
504
505    return false;
506  }
507
508  @Override
509  public int hashCode() {
510    return Arrays.hashCode(locations);
511  }
512}
513