View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.io;
20  
21  import java.util.ArrayList;
22  import java.util.Collection;
23  
24  import java.io.IOException;
25  import java.io.InputStream;
26  import java.io.FileNotFoundException;
27  import java.util.List;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.fs.FSDataInputStream;
33  import org.apache.hadoop.fs.FileSystem;
34  import org.apache.hadoop.fs.FileStatus;
35  import org.apache.hadoop.fs.Path;
36  import org.apache.hadoop.fs.PositionedReadable;
37  import org.apache.hadoop.fs.Seekable;
38  import org.apache.hadoop.hbase.util.FSUtils;
39  
40  /**
41   * The FileLink is a sort of hardlink, that allows access to a file given a set of locations.
42   *
43   * <p><b>The Problem:</b>
44   * <ul>
45   *  <li>
 *    HDFS doesn't have support for hardlinks, which makes it impossible to reference
 *    the same data blocks using different names.
48   *  </li>
49   *  <li>
 *    HBase stores files in one location (e.g. table/region/family/) and, when a file is no
 *    longer needed (e.g. compaction, region deletion, ...), moves it to an archive directory.
52   *  </li>
53   * </ul>
54   * If we want to create a reference to a file, we need to remember that it can be in its
55   * original location or in the archive folder.
56   * The FileLink class tries to abstract this concept and given a set of locations
57   * it is able to switch between them making this operation transparent for the user.
58   * {@link HFileLink} is a more concrete implementation of the {@code FileLink}.
59   *
60   * <p><b>Back-references:</b>
61   * To help the {@link org.apache.hadoop.hbase.master.cleaner.CleanerChore} to keep track of
62   * the links to a particular file, during the {@code FileLink} creation, a new file is placed
63   * inside a back-reference directory. There's one back-reference directory for each file that
64   * has links, and in the directory there's one file per link.
65   *
66   * <p>HFileLink Example
67   * <ul>
68   *  <li>
69   *      /hbase/table/region-x/cf/file-k
70   *      (Original File)
71   *  </li>
72   *  <li>
73   *      /hbase/table-cloned/region-y/cf/file-k.region-x.table
74   *     (HFileLink to the original file)
75   *  </li>
76   *  <li>
77   *      /hbase/table-2nd-cloned/region-z/cf/file-k.region-x.table
78   *      (HFileLink to the original file)
79   *  </li>
80   *  <li>
81   *      /hbase/.archive/table/region-x/.links-file-k/region-y.table-cloned
82   *      (Back-reference to the link in table-cloned)
83   *  </li>
84   *  <li>
85   *      /hbase/.archive/table/region-x/.links-file-k/region-z.table-2nd-cloned
86   *      (Back-reference to the link in table-2nd-cloned)
87   *  </li>
88   * </ul>
89   */
90  @InterfaceAudience.Private
91  public class FileLink {
92    private static final Log LOG = LogFactory.getLog(FileLink.class);
93  
  /** Prefix of the back-reference directory name: {@code .links-<hfile>/} */
95    public static final String BACK_REFERENCES_DIRECTORY_PREFIX = ".links-";
96  
97    /**
98     * FileLink InputStream that handles the switch between the original path
99     * and the alternative locations, when the file is moved.
100    */
101   private static class FileLinkInputStream extends InputStream
102       implements Seekable, PositionedReadable {
103     private FSDataInputStream in = null;
104     private Path currentPath = null;
105     private long pos = 0;
106 
107     private final FileLink fileLink;
108     private final int bufferSize;
109     private final FileSystem fs;
110 
111     public FileLinkInputStream(final FileSystem fs, final FileLink fileLink)
112         throws IOException {
113       this(fs, fileLink, FSUtils.getDefaultBufferSize(fs));
114     }
115 
116     public FileLinkInputStream(final FileSystem fs, final FileLink fileLink, int bufferSize)
117         throws IOException {
118       this.bufferSize = bufferSize;
119       this.fileLink = fileLink;
120       this.fs = fs;
121 
122       this.in = tryOpen();
123     }
124 
125     @Override
126     public int read() throws IOException {
127       int res;
128       try {
129         res = in.read();
130       } catch (FileNotFoundException e) {
131         res = tryOpen().read();
132       } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
133         res = tryOpen().read();
134       } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
135         res = tryOpen().read();
136       }
137       if (res > 0) pos += 1;
138       return res;
139     }
140 
141     @Override
142     public int read(byte[] b) throws IOException {
143        return read(b, 0, b.length);
144     }
145 
146     @Override
147     public int read(byte[] b, int off, int len) throws IOException {
148       int n;
149       try {
150         n = in.read(b, off, len);
151       } catch (FileNotFoundException e) {
152         n = tryOpen().read(b, off, len);
153       } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
154         n = tryOpen().read(b, off, len);
155       } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
156         n = tryOpen().read(b, off, len);
157       }
158       if (n > 0) pos += n;
159       assert(in.getPos() == pos);
160       return n;
161     }
162 
163     @Override
164     public int read(long position, byte[] buffer, int offset, int length) throws IOException {
165       int n;
166       try {
167         n = in.read(position, buffer, offset, length);
168       } catch (FileNotFoundException e) {
169         n = tryOpen().read(position, buffer, offset, length);
170       } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
171         n = tryOpen().read(position, buffer, offset, length);
172       } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
173         n = tryOpen().read(position, buffer, offset, length);
174       }
175       return n;
176     }
177 
178     @Override
179     public void readFully(long position, byte[] buffer) throws IOException {
180       readFully(position, buffer, 0, buffer.length);
181     }
182 
183     @Override
184     public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
185       try {
186         in.readFully(position, buffer, offset, length);
187       } catch (FileNotFoundException e) {
188         tryOpen().readFully(position, buffer, offset, length);
189       } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
190         tryOpen().readFully(position, buffer, offset, length);
191       } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
192         tryOpen().readFully(position, buffer, offset, length);
193       }
194     }
195 
196     @Override
197     public long skip(long n) throws IOException {
198       long skipped;
199 
200       try {
201         skipped = in.skip(n);
202       } catch (FileNotFoundException e) {
203         skipped = tryOpen().skip(n);
204       } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
205         skipped = tryOpen().skip(n);
206       } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
207         skipped = tryOpen().skip(n);
208       }
209 
210       if (skipped > 0) pos += skipped;
211       return skipped;
212     }
213 
214     @Override
215     public int available() throws IOException {
216       try {
217         return in.available();
218       } catch (FileNotFoundException e) {
219         return tryOpen().available();
220       } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
221         return tryOpen().available();
222       } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
223         return tryOpen().available();
224       }
225     }
226 
227     @Override
228     public void seek(long pos) throws IOException {
229       try {
230         in.seek(pos);
231       } catch (FileNotFoundException e) {
232         tryOpen().seek(pos);
233       } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
234         tryOpen().seek(pos);
235       } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
236         tryOpen().seek(pos);
237       }
238       this.pos = pos;
239     }
240 
241     @Override
242     public long getPos() throws IOException {
243       return pos;
244     }
245 
246     @Override
247     public boolean seekToNewSource(long targetPos) throws IOException {
248       boolean res;
249       try {
250         res = in.seekToNewSource(targetPos);
251       } catch (FileNotFoundException e) {
252         res = tryOpen().seekToNewSource(targetPos);
253       } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
254         res = tryOpen().seekToNewSource(targetPos);
255       } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
256         res = tryOpen().seekToNewSource(targetPos);
257       }
258       if (res) pos = targetPos;
259       return res;
260     }
261 
262     @Override
263     public void close() throws IOException {
264       in.close();
265     }
266 
267     @Override
268     public synchronized void mark(int readlimit) {
269     }
270 
271     @Override
272     public synchronized void reset() throws IOException {
273       throw new IOException("mark/reset not supported");
274     }
275 
276     @Override
277     public boolean markSupported() {
278       return false;
279     }
280 
281     /**
282      * Try to open the file from one of the available locations.
283      *
284      * @return FSDataInputStream stream of the opened file link
285      * @throws IOException on unexpected error, or file not found.
286      */
287     private FSDataInputStream tryOpen() throws IOException {
288       for (Path path: fileLink.getLocations()) {
289         if (path.equals(currentPath)) continue;
290         try {
291           in = fs.open(path, bufferSize);
292           if (pos != 0) in.seek(pos);
293           assert(in.getPos() == pos) : "Link unable to seek to the right position=" + pos;
294           if (LOG.isTraceEnabled()) {
295             if (currentPath == null) {
296               LOG.debug("link open path=" + path);
297             } else {
298               LOG.trace("link switch from path=" + currentPath + " to path=" + path);
299             }
300           }
301           currentPath = path;
302           return(in);
303         } catch (FileNotFoundException e) {
304           // Try another file location
305         }
306       }
307       throw new FileNotFoundException("Unable to open link: " + fileLink);
308     }
309   }
310 
311   private Path[] locations = null;
312 
313   protected FileLink() {
314     this.locations = null;
315   }
316 
317   /**
318    * @param originPath Original location of the file to link
319    * @param alternativePaths Alternative locations to look for the linked file
320    */
321   public FileLink(Path originPath, Path... alternativePaths) {
322     setLocations(originPath, alternativePaths);
323   }
324 
325   /**
326    * @param locations locations to look for the linked file
327    */
328   public FileLink(final Collection<Path> locations) {
329     this.locations = locations.toArray(new Path[locations.size()]);
330   }
331 
332   /**
333    * @return the locations to look for the linked file.
334    */
335   public Path[] getLocations() {
336     return locations;
337   }
338 
339   public String toString() {
340     StringBuilder str = new StringBuilder(getClass().getName());
341     str.append(" locations=[");
342     for (int i = 0; i < locations.length; ++i) {
343       if (i > 0) str.append(", ");
344       str.append(locations[i].toString());
345     }
346     str.append("]");
347     return str.toString();
348   }
349 
350   /**
351    * @return true if the file pointed by the link exists
352    */
353   public boolean exists(final FileSystem fs) throws IOException {
354     for (int i = 0; i < locations.length; ++i) {
355       if (fs.exists(locations[i])) {
356         return true;
357       }
358     }
359     return false;
360   }
361 
362   /**
363    * @return the path of the first available link.
364    */
365   public Path getAvailablePath(FileSystem fs) throws IOException {
366     for (int i = 0; i < locations.length; ++i) {
367       if (fs.exists(locations[i])) {
368         return locations[i];
369       }
370     }
371     throw new FileNotFoundException("Unable to open link: " + this);
372   }
373 
374   /**
375    * Get the FileStatus of the referenced file.
376    *
377    * @param fs {@link FileSystem} on which to get the file status
378    * @return InputStream for the hfile link.
379    * @throws IOException on unexpected error.
380    */
381   public FileStatus getFileStatus(FileSystem fs) throws IOException {
382     for (int i = 0; i < locations.length; ++i) {
383       try {
384         return fs.getFileStatus(locations[i]);
385       } catch (FileNotFoundException e) {
386         // Try another file location
387       }
388     }
389     throw new FileNotFoundException("Unable to open link: " + this);
390   }
391 
392   /**
393    * Open the FileLink for read.
394    * <p>
395    * It uses a wrapper of FSDataInputStream that is agnostic to the location
396    * of the file, even if the file switches between locations.
397    *
398    * @param fs {@link FileSystem} on which to open the FileLink
399    * @return InputStream for reading the file link.
400    * @throws IOException on unexpected error.
401    */
402   public FSDataInputStream open(final FileSystem fs) throws IOException {
403     return new FSDataInputStream(new FileLinkInputStream(fs, this));
404   }
405 
406   /**
407    * Open the FileLink for read.
408    * <p>
409    * It uses a wrapper of FSDataInputStream that is agnostic to the location
410    * of the file, even if the file switches between locations.
411    *
412    * @param fs {@link FileSystem} on which to open the FileLink
413    * @param bufferSize the size of the buffer to be used.
414    * @return InputStream for reading the file link.
415    * @throws IOException on unexpected error.
416    */
417   public FSDataInputStream open(final FileSystem fs, int bufferSize) throws IOException {
418     return new FSDataInputStream(new FileLinkInputStream(fs, this, bufferSize));
419   }
420 
421   /**
422    * NOTE: This method must be used only in the constructor!
423    * It creates a List with the specified locations for the link.
424    */
425   protected void setLocations(Path originPath, Path... alternativePaths) {
426     assert this.locations == null : "Link locations already set";
427 
428     List<Path> paths = new ArrayList<Path>(alternativePaths.length +1);
429     if (originPath != null) {
430       paths.add(originPath);
431     }
432 
433     for (int i = 0; i < alternativePaths.length; i++) {
434       if (alternativePaths[i] != null) {
435         paths.add(alternativePaths[i]);
436       }
437     }
438     this.locations = paths.toArray(new Path[0]);
439   }
440 
441   /**
442    * Get the directory to store the link back references
443    *
444    * <p>To simplify the reference count process, during the FileLink creation
445    * a back-reference is added to the back-reference directory of the specified file.
446    *
447    * @param storeDir Root directory for the link reference folder
448    * @param fileName File Name with links
449    * @return Path for the link back references.
450    */
451   public static Path getBackReferencesDir(final Path storeDir, final String fileName) {
452     return new Path(storeDir, BACK_REFERENCES_DIRECTORY_PREFIX + fileName);
453   }
454 
455   /**
456    * Get the referenced file name from the reference link directory path.
457    *
458    * @param dirPath Link references directory path
459    * @return Name of the file referenced
460    */
461   public static String getBackReferenceFileName(final Path dirPath) {
462     return dirPath.getName().substring(BACK_REFERENCES_DIRECTORY_PREFIX.length());
463   }
464 
465   /**
466    * Checks if the specified directory path is a back reference links folder.
467    *
468    * @param dirPath Directory path to verify
469    * @return True if the specified directory is a link references folder
470    */
471   public static boolean isBackReferencesDir(final Path dirPath) {
472     if (dirPath == null) return false;
473     return dirPath.getName().startsWith(BACK_REFERENCES_DIRECTORY_PREFIX);
474   }
475 }
476