View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver.wal;
21  
22  import java.io.FilterInputStream;
23  import java.io.IOException;
24  import java.lang.reflect.Field;
25  import java.lang.reflect.Method;
26  import java.util.NavigableMap;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.classification.InterfaceAudience;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.fs.FSDataInputStream;
33  import org.apache.hadoop.fs.FileSystem;
34  import org.apache.hadoop.fs.Path;
35  import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
36  import org.apache.hadoop.io.SequenceFile;
37  import org.apache.hadoop.io.SequenceFile.Metadata;
38  import org.apache.hadoop.io.Text;
39  
40  @InterfaceAudience.Private
41  public class SequenceFileLogReader extends ReaderBase {
42    private static final Log LOG = LogFactory.getLog(SequenceFileLogReader.class);
43  
44    // Legacy stuff from pre-PB WAL metadata.
45    private static final Text WAL_VERSION_KEY = new Text("version");
46    // Let the version be 1.  Let absence of a version meta tag be old, version 0.
47    // Set this version '1' to be the version that introduces compression,
48    // the COMPRESSION_VERSION.
49    private static final int COMPRESSION_VERSION = 1;
50    private static final Text WAL_COMPRESSION_TYPE_KEY = new Text("compression.type");
51    private static final Text DICTIONARY_COMPRESSION_TYPE = new Text("dictionary");
52  
53    /**
54     * Hack just to set the correct file length up in SequenceFile.Reader.
55     * See HADOOP-6307.  The below is all about setting the right length on the
56     * file we are reading.  fs.getFileStatus(file).getLen() is passed down to
57     * a private SequenceFile.Reader constructor.  This won't work.  Need to do
58     * the available on the stream.  The below is ugly.  It makes getPos, the
59     * first time its called, return length of the file -- i.e. tell a lie -- just
60     * so this line up in SF.Reader's constructor ends up with right answer:
61     *
62     *         this.end = in.getPos() + length;
63     *
64     */
65    private static class WALReader extends SequenceFile.Reader {
66  
67      WALReader(final FileSystem fs, final Path p, final Configuration c)
68      throws IOException {
69        super(fs, p, c);
70      }
71  
72      @Override
73      protected FSDataInputStream openFile(FileSystem fs, Path file,
74        int bufferSize, long length)
75      throws IOException {
76        return new WALReaderFSDataInputStream(super.openFile(fs, file,
77          bufferSize, length), length);
78      }
79  
80      /**
81       * Override just so can intercept first call to getPos.
82       */
83      static class WALReaderFSDataInputStream extends FSDataInputStream {
84        private boolean firstGetPosInvocation = true;
85        private long length;
86  
87        WALReaderFSDataInputStream(final FSDataInputStream is, final long l)
88        throws IOException {
89          super(is);
90          this.length = l;
91        }
92  
93        // This section can be confusing.  It is specific to how HDFS works.
94        // Let me try to break it down.  This is the problem:
95        //
96        //  1. HDFS DataNodes update the NameNode about a filename's length
97        //     on block boundaries or when a file is closed. Therefore,
98        //     if an RS dies, then the NN's fs.getLength() can be out of date
99        //  2. this.in.available() would work, but it returns int &
100       //     therefore breaks for files > 2GB (happens on big clusters)
101       //  3. DFSInputStream.getFileLength() gets the actual length from the DNs
102       //  4. DFSInputStream is wrapped 2 levels deep : this.in.in
103       //
104       // So, here we adjust getPos() using getFileLength() so the
105       // SequenceFile.Reader constructor (aka: first invocation) comes out
106       // with the correct end of the file:
107       //         this.end = in.getPos() + length;
108       @Override
109       public long getPos() throws IOException {
110         if (this.firstGetPosInvocation) {
111           this.firstGetPosInvocation = false;
112           long adjust = 0;
113 
114           try {
115             Field fIn = FilterInputStream.class.getDeclaredField("in");
116             fIn.setAccessible(true);
117             Object realIn = fIn.get(this.in);
118             // In hadoop 0.22, DFSInputStream is a standalone class.  Before this,
119             // it was an inner class of DFSClient.
120             if (realIn.getClass().getName().endsWith("DFSInputStream")) {
121               Method getFileLength = realIn.getClass().
122                 getDeclaredMethod("getFileLength", new Class<?> []{});
123               getFileLength.setAccessible(true);
124               long realLength = ((Long)getFileLength.
125                 invoke(realIn, new Object []{})).longValue();
126               assert(realLength >= this.length);
127               adjust = realLength - this.length;
128             } else {
129               LOG.info("Input stream class: " + realIn.getClass().getName() +
130                   ", not adjusting length");
131             }
132           } catch(Exception e) {
133             SequenceFileLogReader.LOG.warn(
134               "Error while trying to get accurate file length.  " +
135               "Truncation / data loss may occur if RegionServers die.", e);
136           }
137 
138           return adjust + super.getPos();
139         }
140         return super.getPos();
141       }
142     }
143   }
144 
145   // Protected for tests.
146   protected SequenceFile.Reader reader;
147   long entryStart = 0; // needed for logging exceptions
148 
149   public SequenceFileLogReader() {
150     super();
151   }
152 
153   @Override
154   public void close() throws IOException {
155     try {
156       if (reader != null) {
157         this.reader.close();
158         this.reader = null;
159       }
160     } catch (IOException ioe) {
161       throw addFileInfoToException(ioe);
162     }
163   }
164 
165   @Override
166   public long getPosition() throws IOException {
167     return reader != null ? reader.getPosition() : 0;
168   }
169 
170   @Override
171   public void reset() throws IOException {
172     // Resetting the reader lets us see newly added data if the file is being written to
173     // We also keep the same compressionContext which was previously populated for this file
174     reader = new WALReader(fs, path, conf);
175   }
176 
177   @Override
178   protected void initReader(FSDataInputStream stream) throws IOException {
179     // We don't use the stream because we have to have the magic stream above.
180     if (stream != null) {
181       stream.close();
182     }
183     reset();
184   }
185   
186   @Override
187   protected void initAfterCompression() throws IOException {
188     // Nothing to do here
189   }
190 
191   @Override
192   protected boolean hasCompression() {
193     return isWALCompressionEnabled(reader.getMetadata());
194   }
195 
196   @Override
197   protected boolean hasTagCompression() {
198     // Tag compression not supported with old SequenceFileLog Reader/Writer
199     return false;
200   }
201 
202   /**
203    * Call this method after init() has been executed
204    * @return whether WAL compression is enabled
205    */
206   static boolean isWALCompressionEnabled(final Metadata metadata) {
207     // Check version is >= VERSION?
208     Text txt = metadata.get(WAL_VERSION_KEY);
209     if (txt == null || Integer.parseInt(txt.toString()) < COMPRESSION_VERSION) {
210       return false;
211     }
212     // Now check that compression type is present.  Currently only one value.
213     txt = metadata.get(WAL_COMPRESSION_TYPE_KEY);
214     return txt != null && txt.equals(DICTIONARY_COMPRESSION_TYPE);
215   }
216 
217 
218   @Override
219   protected boolean readNext(Entry e) throws IOException {
220     try {
221       boolean hasNext = this.reader.next(e.getKey(), e.getEdit());
222       if (!hasNext) return false;
223       // Scopes are probably in WAL edit, move to key
224       NavigableMap<byte[], Integer> scopes = e.getEdit().getAndRemoveScopes();
225       if (scopes != null) {
226         e.getKey().readOlderScopes(scopes);
227       }
228       return true;
229     } catch (IOException ioe) {
230       throw addFileInfoToException(ioe);
231     }
232   }
233 
234   @Override
235   protected void seekOnFs(long pos) throws IOException {
236     try {
237       reader.seek(pos);
238     } catch (IOException ioe) {
239       throw addFileInfoToException(ioe);
240     }
241   }
242 
243   protected IOException addFileInfoToException(final IOException ioe)
244   throws IOException {
245     long pos = -1;
246     try {
247       pos = getPosition();
248     } catch (IOException e) {
249       LOG.warn("Failed getting position to add to throw", e);
250     }
251 
252     // See what SequenceFile.Reader thinks is the end of the file
253     long end = Long.MAX_VALUE;
254     try {
255       Field fEnd = SequenceFile.Reader.class.getDeclaredField("end");
256       fEnd.setAccessible(true);
257       end = fEnd.getLong(this.reader);
258     } catch(NoSuchFieldException nfe) {
259        /* reflection failure, keep going */
260     } catch(IllegalAccessException iae) {
261        /* reflection failure, keep going */
262     } catch(Exception e) {
263        /* All other cases. Should we handle it more aggressively? */
264        LOG.warn("Unexpected exception when accessing the end field", e);
265     }
266  
267     String msg = (this.path == null? "": this.path.toString()) +
268       ", entryStart=" + entryStart + ", pos=" + pos +
269       ((end == Long.MAX_VALUE) ? "" : ", end=" + end) +
270       ", edit=" + this.edit;
271 
272     // Enhance via reflection so we don't change the original class type
273     try {
274       return (IOException) ioe.getClass()
275         .getConstructor(String.class)
276         .newInstance(msg)
277         .initCause(ioe);
278     } catch(NoSuchMethodException nfe) {
279        /* reflection failure, keep going */
280     } catch(IllegalAccessException iae) {
281        /* reflection failure, keep going */
282     } catch(Exception e) {
283        /* All other cases. Should we handle it more aggressively? */
284        LOG.warn("Unexpected exception when accessing the end field", e);
285     }
286     return ioe;
287   }
288 }