View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.io;
20  
21  import java.util.ArrayList;
22  import java.util.Arrays;
23  import java.util.Collection;
24  import java.io.IOException;
25  import java.io.InputStream;
26  import java.io.FileNotFoundException;
27  import java.util.List;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.fs.FSDataInputStream;
33  import org.apache.hadoop.fs.FileSystem;
34  import org.apache.hadoop.fs.FileStatus;
35  import org.apache.hadoop.fs.Path;
36  import org.apache.hadoop.fs.PositionedReadable;
37  import org.apache.hadoop.fs.Seekable;
38  import org.apache.hadoop.hbase.util.FSUtils;
39  
40  /**
41   * The FileLink is a sort of hardlink, that allows access to a file given a set of locations.
42   *
43   * <p><b>The Problem:</b>
44   * <ul>
45   *  <li>
46   *    HDFS doesn't have support for hardlinks, and this makes it impossible to reference
47   *    the same data blocks using different names.
48   *  </li>
49   *  <li>
50   *    HBase stores files in one location (e.g. table/region/family/) and when a file is not
51   *    needed anymore (e.g. compaction, region deletion, ...) it is moved to an archive directory.
52   *  </li>
53   * </ul>
54   * If we want to create a reference to a file, we need to remember that it can be in its
55   * original location or in the archive folder.
56   * The FileLink class tries to abstract this concept and given a set of locations
57   * it is able to switch between them making this operation transparent for the user.
58   * {@link HFileLink} is a more concrete implementation of the {@code FileLink}.
59   *
60   * <p><b>Back-references:</b>
61   * To help the {@link org.apache.hadoop.hbase.master.cleaner.CleanerChore} to keep track of
62   * the links to a particular file, during the {@code FileLink} creation, a new file is placed
63   * inside a back-reference directory. There's one back-reference directory for each file that
64   * has links, and in the directory there's one file per link.
65   *
66   * <p>HFileLink Example
67   * <ul>
68   *  <li>
69   *      /hbase/table/region-x/cf/file-k
70   *      (Original File)
71   *  </li>
72   *  <li>
73   *      /hbase/table-cloned/region-y/cf/file-k.region-x.table
74   *     (HFileLink to the original file)
75   *  </li>
76   *  <li>
77   *      /hbase/table-2nd-cloned/region-z/cf/file-k.region-x.table
78   *      (HFileLink to the original file)
79   *  </li>
80   *  <li>
81   *      /hbase/.archive/table/region-x/.links-file-k/region-y.table-cloned
82   *      (Back-reference to the link in table-cloned)
83   *  </li>
84   *  <li>
85   *      /hbase/.archive/table/region-x/.links-file-k/region-z.table-2nd-cloned
86   *      (Back-reference to the link in table-2nd-cloned)
87   *  </li>
88   * </ul>
89   */
90  @InterfaceAudience.Private
91  public class FileLink {
92    private static final Log LOG = LogFactory.getLog(FileLink.class);
93  
94    /** Define the Back-reference directory name prefix: .links-&lt;hfile&gt;/ */
95    public static final String BACK_REFERENCES_DIRECTORY_PREFIX = ".links-";
96  
97    /**
98     * FileLink InputStream that handles the switch between the original path
99     * and the alternative locations, when the file is moved.
100    */
101   private static class FileLinkInputStream extends InputStream
102       implements Seekable, PositionedReadable {
103     private FSDataInputStream in = null;
104     private Path currentPath = null;
105     private long pos = 0;
106 
107     private final FileLink fileLink;
108     private final int bufferSize;
109     private final FileSystem fs;
110 
111     public FileLinkInputStream(final FileSystem fs, final FileLink fileLink)
112         throws IOException {
113       this(fs, fileLink, FSUtils.getDefaultBufferSize(fs));
114     }
115 
116     public FileLinkInputStream(final FileSystem fs, final FileLink fileLink, int bufferSize)
117         throws IOException {
118       this.bufferSize = bufferSize;
119       this.fileLink = fileLink;
120       this.fs = fs;
121 
122       this.in = tryOpen();
123     }
124 
125     @Override
126     public int read() throws IOException {
127       int res;
128       try {
129         res = in.read();
130       } catch (FileNotFoundException e) {
131         res = tryOpen().read();
132       } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
133         res = tryOpen().read();
134       } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
135         res = tryOpen().read();
136       }
137       if (res > 0) pos += 1;
138       return res;
139     }
140 
141     @Override
142     public int read(byte[] b) throws IOException {
143        return read(b, 0, b.length);
144     }
145 
146     @Override
147     public int read(byte[] b, int off, int len) throws IOException {
148       int n;
149       try {
150         n = in.read(b, off, len);
151       } catch (FileNotFoundException e) {
152         n = tryOpen().read(b, off, len);
153       } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
154         n = tryOpen().read(b, off, len);
155       } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
156         n = tryOpen().read(b, off, len);
157       }
158       if (n > 0) pos += n;
159       assert(in.getPos() == pos);
160       return n;
161     }
162 
163     @Override
164     public int read(long position, byte[] buffer, int offset, int length) throws IOException {
165       int n;
166       try {
167         n = in.read(position, buffer, offset, length);
168       } catch (FileNotFoundException e) {
169         n = tryOpen().read(position, buffer, offset, length);
170       } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
171         n = tryOpen().read(position, buffer, offset, length);
172       } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
173         n = tryOpen().read(position, buffer, offset, length);
174       }
175       return n;
176     }
177 
178     @Override
179     public void readFully(long position, byte[] buffer) throws IOException {
180       readFully(position, buffer, 0, buffer.length);
181     }
182 
183     @Override
184     public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
185       try {
186         in.readFully(position, buffer, offset, length);
187       } catch (FileNotFoundException e) {
188         tryOpen().readFully(position, buffer, offset, length);
189       } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
190         tryOpen().readFully(position, buffer, offset, length);
191       } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
192         tryOpen().readFully(position, buffer, offset, length);
193       }
194     }
195 
196     @Override
197     public long skip(long n) throws IOException {
198       long skipped;
199 
200       try {
201         skipped = in.skip(n);
202       } catch (FileNotFoundException e) {
203         skipped = tryOpen().skip(n);
204       } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
205         skipped = tryOpen().skip(n);
206       } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
207         skipped = tryOpen().skip(n);
208       }
209 
210       if (skipped > 0) pos += skipped;
211       return skipped;
212     }
213 
214     @Override
215     public int available() throws IOException {
216       try {
217         return in.available();
218       } catch (FileNotFoundException e) {
219         return tryOpen().available();
220       } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
221         return tryOpen().available();
222       } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
223         return tryOpen().available();
224       }
225     }
226 
227     @Override
228     public void seek(long pos) throws IOException {
229       try {
230         in.seek(pos);
231       } catch (FileNotFoundException e) {
232         tryOpen().seek(pos);
233       } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
234         tryOpen().seek(pos);
235       } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
236         tryOpen().seek(pos);
237       }
238       this.pos = pos;
239     }
240 
241     @Override
242     public long getPos() throws IOException {
243       return pos;
244     }
245 
246     @Override
247     public boolean seekToNewSource(long targetPos) throws IOException {
248       boolean res;
249       try {
250         res = in.seekToNewSource(targetPos);
251       } catch (FileNotFoundException e) {
252         res = tryOpen().seekToNewSource(targetPos);
253       } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
254         res = tryOpen().seekToNewSource(targetPos);
255       } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
256         res = tryOpen().seekToNewSource(targetPos);
257       }
258       if (res) pos = targetPos;
259       return res;
260     }
261 
262     @Override
263     public void close() throws IOException {
264       in.close();
265     }
266 
267     @Override
268     public synchronized void mark(int readlimit) {
269     }
270 
271     @Override
272     public synchronized void reset() throws IOException {
273       throw new IOException("mark/reset not supported");
274     }
275 
276     @Override
277     public boolean markSupported() {
278       return false;
279     }
280 
281     /**
282      * Try to open the file from one of the available locations.
283      *
284      * @return FSDataInputStream stream of the opened file link
285      * @throws IOException on unexpected error, or file not found.
286      */
287     private FSDataInputStream tryOpen() throws IOException {
288       for (Path path: fileLink.getLocations()) {
289         if (path.equals(currentPath)) continue;
290         try {
291           in = fs.open(path, bufferSize);
292           if (pos != 0) in.seek(pos);
293           assert(in.getPos() == pos) : "Link unable to seek to the right position=" + pos;
294           if (LOG.isTraceEnabled()) {
295             if (currentPath == null) {
296               LOG.debug("link open path=" + path);
297             } else {
298               LOG.trace("link switch from path=" + currentPath + " to path=" + path);
299             }
300           }
301           currentPath = path;
302           return(in);
303         } catch (FileNotFoundException e) {
304           // Try another file location
305         }
306       }
307       throw new FileNotFoundException("Unable to open link: " + fileLink);
308     }
309   }
310 
311   private Path[] locations = null;
312 
313   protected FileLink() {
314     this.locations = null;
315   }
316 
317   /**
318    * @param originPath Original location of the file to link
319    * @param alternativePaths Alternative locations to look for the linked file
320    */
321   public FileLink(Path originPath, Path... alternativePaths) {
322     setLocations(originPath, alternativePaths);
323   }
324 
325   /**
326    * @param locations locations to look for the linked file
327    */
328   public FileLink(final Collection<Path> locations) {
329     this.locations = locations.toArray(new Path[locations.size()]);
330   }
331 
332   /**
333    * @return the locations to look for the linked file.
334    */
335   public Path[] getLocations() {
336     return locations;
337   }
338 
339   @Override
340   public String toString() {
341     StringBuilder str = new StringBuilder(getClass().getName());
342     str.append(" locations=[");
343     for (int i = 0; i < locations.length; ++i) {
344       if (i > 0) str.append(", ");
345       str.append(locations[i].toString());
346     }
347     str.append("]");
348     return str.toString();
349   }
350 
351   /**
352    * @return true if the file pointed by the link exists
353    */
354   public boolean exists(final FileSystem fs) throws IOException {
355     for (int i = 0; i < locations.length; ++i) {
356       if (fs.exists(locations[i])) {
357         return true;
358       }
359     }
360     return false;
361   }
362 
363   /**
364    * @return the path of the first available link.
365    */
366   public Path getAvailablePath(FileSystem fs) throws IOException {
367     for (int i = 0; i < locations.length; ++i) {
368       if (fs.exists(locations[i])) {
369         return locations[i];
370       }
371     }
372     throw new FileNotFoundException("Unable to open link: " + this);
373   }
374 
375   /**
376    * Get the FileStatus of the referenced file.
377    *
378    * @param fs {@link FileSystem} on which to get the file status
379    * @return InputStream for the hfile link.
380    * @throws IOException on unexpected error.
381    */
382   public FileStatus getFileStatus(FileSystem fs) throws IOException {
383     for (int i = 0; i < locations.length; ++i) {
384       try {
385         return fs.getFileStatus(locations[i]);
386       } catch (FileNotFoundException e) {
387         // Try another file location
388       }
389     }
390     throw new FileNotFoundException("Unable to open link: " + this);
391   }
392 
393   /**
394    * Open the FileLink for read.
395    * <p>
396    * It uses a wrapper of FSDataInputStream that is agnostic to the location
397    * of the file, even if the file switches between locations.
398    *
399    * @param fs {@link FileSystem} on which to open the FileLink
400    * @return InputStream for reading the file link.
401    * @throws IOException on unexpected error.
402    */
403   public FSDataInputStream open(final FileSystem fs) throws IOException {
404     return new FSDataInputStream(new FileLinkInputStream(fs, this));
405   }
406 
407   /**
408    * Open the FileLink for read.
409    * <p>
410    * It uses a wrapper of FSDataInputStream that is agnostic to the location
411    * of the file, even if the file switches between locations.
412    *
413    * @param fs {@link FileSystem} on which to open the FileLink
414    * @param bufferSize the size of the buffer to be used.
415    * @return InputStream for reading the file link.
416    * @throws IOException on unexpected error.
417    */
418   public FSDataInputStream open(final FileSystem fs, int bufferSize) throws IOException {
419     return new FSDataInputStream(new FileLinkInputStream(fs, this, bufferSize));
420   }
421 
422   /**
423    * NOTE: This method must be used only in the constructor!
424    * It creates a List with the specified locations for the link.
425    */
426   protected void setLocations(Path originPath, Path... alternativePaths) {
427     assert this.locations == null : "Link locations already set";
428 
429     List<Path> paths = new ArrayList<Path>(alternativePaths.length +1);
430     if (originPath != null) {
431       paths.add(originPath);
432     }
433 
434     for (int i = 0; i < alternativePaths.length; i++) {
435       if (alternativePaths[i] != null) {
436         paths.add(alternativePaths[i]);
437       }
438     }
439     this.locations = paths.toArray(new Path[0]);
440   }
441 
442   /**
443    * Get the directory to store the link back references
444    *
445    * <p>To simplify the reference count process, during the FileLink creation
446    * a back-reference is added to the back-reference directory of the specified file.
447    *
448    * @param storeDir Root directory for the link reference folder
449    * @param fileName File Name with links
450    * @return Path for the link back references.
451    */
452   public static Path getBackReferencesDir(final Path storeDir, final String fileName) {
453     return new Path(storeDir, BACK_REFERENCES_DIRECTORY_PREFIX + fileName);
454   }
455 
456   /**
457    * Get the referenced file name from the reference link directory path.
458    *
459    * @param dirPath Link references directory path
460    * @return Name of the file referenced
461    */
462   public static String getBackReferenceFileName(final Path dirPath) {
463     return dirPath.getName().substring(BACK_REFERENCES_DIRECTORY_PREFIX.length());
464   }
465 
466   /**
467    * Checks if the specified directory path is a back reference links folder.
468    *
469    * @param dirPath Directory path to verify
470    * @return True if the specified directory is a link references folder
471    */
472   public static boolean isBackReferencesDir(final Path dirPath) {
473     if (dirPath == null) return false;
474     return dirPath.getName().startsWith(BACK_REFERENCES_DIRECTORY_PREFIX);
475   }
476 
477   @Override
478   public boolean equals(Object obj) {
479     if (obj == null) {
480       return false;
481     }
482     // Assumes that the ordering of locations between objects are the same. This is true for the
483     // current subclasses already (HFileLink, WALLink). Otherwise, we may have to sort the locations
484     // or keep them presorted
485     if (this.getClass().equals(obj.getClass())) {
486       return Arrays.equals(this.locations, ((FileLink) obj).locations);
487     }
488 
489     return false;
490   }
491 
492   @Override
493   public int hashCode() {
494     return Arrays.hashCode(locations);
495   }
496 }
497