View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver.wal;
21  
22  import java.io.FilterInputStream;
23  import java.io.IOException;
24  import java.lang.reflect.Field;
25  import java.lang.reflect.Method;
26  import java.util.NavigableMap;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.classification.InterfaceAudience;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.fs.FSDataInputStream;
33  import org.apache.hadoop.fs.FileSystem;
34  import org.apache.hadoop.fs.Path;
35  import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
36  import org.apache.hadoop.io.SequenceFile;
37  import org.apache.hadoop.io.SequenceFile.Metadata;
38  import org.apache.hadoop.io.Text;
39  
40  @InterfaceAudience.Private
41  public class SequenceFileLogReader extends ReaderBase {
42    private static final Log LOG = LogFactory.getLog(SequenceFileLogReader.class);
43  
44    // Legacy stuff from pre-PB WAL metadata.
45    private static final Text WAL_VERSION_KEY = new Text("version");
46    // Let the version be 1.  Let absence of a version meta tag be old, version 0.
47    // Set this version '1' to be the version that introduces compression,
48    // the COMPRESSION_VERSION.
49    private static final int COMPRESSION_VERSION = 1;
50    private static final Text WAL_COMPRESSION_TYPE_KEY = new Text("compression.type");
51    private static final Text DICTIONARY_COMPRESSION_TYPE = new Text("dictionary");
52  
53    /**
54     * Hack just to set the correct file length up in SequenceFile.Reader.
55     * See HADOOP-6307.  The below is all about setting the right length on the
56     * file we are reading.  fs.getFileStatus(file).getLen() is passed down to
57     * a private SequenceFile.Reader constructor.  This won't work.  Need to do
58     * the available on the stream.  The below is ugly.  It makes getPos, the
59     * first time its called, return length of the file -- i.e. tell a lie -- just
60     * so this line up in SF.Reader's constructor ends up with right answer:
61     *
62     *         this.end = in.getPos() + length;
63     *
64     */
65    private static class WALReader extends SequenceFile.Reader {
66  
67      WALReader(final FileSystem fs, final Path p, final Configuration c)
68      throws IOException {
69        super(fs, p, c);
70      }
71  
72      @Override
73      protected FSDataInputStream openFile(FileSystem fs, Path file,
74        int bufferSize, long length)
75      throws IOException {
76        return new WALReaderFSDataInputStream(super.openFile(fs, file,
77          bufferSize, length), length);
78      }
79  
80      /**
81       * Override just so can intercept first call to getPos.
82       */
83      static class WALReaderFSDataInputStream extends FSDataInputStream {
84        private boolean firstGetPosInvocation = true;
85        private long length;
86  
87        WALReaderFSDataInputStream(final FSDataInputStream is, final long l)
88        throws IOException {
89          super(is);
90          this.length = l;
91        }
92  
93        // This section can be confusing.  It is specific to how HDFS works.
94        // Let me try to break it down.  This is the problem:
95        //
96        //  1. HDFS DataNodes update the NameNode about a filename's length
97        //     on block boundaries or when a file is closed. Therefore,
98        //     if an RS dies, then the NN's fs.getLength() can be out of date
99        //  2. this.in.available() would work, but it returns int &
100       //     therefore breaks for files > 2GB (happens on big clusters)
101       //  3. DFSInputStream.getFileLength() gets the actual length from the DNs
102       //  4. DFSInputStream is wrapped 2 levels deep : this.in.in
103       //
104       // So, here we adjust getPos() using getFileLength() so the
105       // SequenceFile.Reader constructor (aka: first invocation) comes out
106       // with the correct end of the file:
107       //         this.end = in.getPos() + length;
108       @Override
109       public long getPos() throws IOException {
110         if (this.firstGetPosInvocation) {
111           this.firstGetPosInvocation = false;
112           long adjust = 0;
113 
114           try {
115             Field fIn = FilterInputStream.class.getDeclaredField("in");
116             fIn.setAccessible(true);
117             Object realIn = fIn.get(this.in);
118             // In hadoop 0.22, DFSInputStream is a standalone class.  Before this,
119             // it was an inner class of DFSClient.
120             if (realIn.getClass().getName().endsWith("DFSInputStream")) {
121               Method getFileLength = realIn.getClass().
122                 getDeclaredMethod("getFileLength", new Class<?> []{});
123               getFileLength.setAccessible(true);
124               long realLength = ((Long)getFileLength.
125                 invoke(realIn, new Object []{})).longValue();
126               assert(realLength >= this.length);
127               adjust = realLength - this.length;
128             } else {
129               LOG.info("Input stream class: " + realIn.getClass().getName() +
130                   ", not adjusting length");
131             }
132           } catch(Exception e) {
133             SequenceFileLogReader.LOG.warn(
134               "Error while trying to get accurate file length.  " +
135               "Truncation / data loss may occur if RegionServers die.", e);
136           }
137 
138           return adjust + super.getPos();
139         }
140         return super.getPos();
141       }
142     }
143   }
144 
145   // Protected for tests.
146   protected SequenceFile.Reader reader;
147   long entryStart = 0; // needed for logging exceptions
148 
149   public SequenceFileLogReader() {
150     super();
151   }
152 
153   @Override
154   public void close() throws IOException {
155     try {
156       if (reader != null) {
157         this.reader.close();
158         this.reader = null;
159       }
160     } catch (IOException ioe) {
161       throw addFileInfoToException(ioe);
162     }
163   }
164 
165   @Override
166   public long getPosition() throws IOException {
167     return reader != null ? reader.getPosition() : 0;
168   }
169 
170   @Override
171   public void reset() throws IOException {
172     // Resetting the reader lets us see newly added data if the file is being written to
173     // We also keep the same compressionContext which was previously populated for this file
174     reader = new WALReader(fs, path, conf);
175   }
176 
177   @Override
178   protected String initReader(FSDataInputStream stream) throws IOException {
179     // We don't use the stream because we have to have the magic stream above.
180     if (stream != null) {
181       stream.close();
182     }
183     reset();
184     return null;
185   }
186   
187   @Override
188   protected void initAfterCompression(String cellCodecClsName) throws IOException {
189     // Nothing to do here
190   }
191 
192   @Override
193   protected boolean hasCompression() {
194     return isWALCompressionEnabled(reader.getMetadata());
195   }
196 
197   @Override
198   protected boolean hasTagCompression() {
199     // Tag compression not supported with old SequenceFileLog Reader/Writer
200     return false;
201   }
202 
203   /**
204    * Call this method after init() has been executed
205    * @return whether WAL compression is enabled
206    */
207   static boolean isWALCompressionEnabled(final Metadata metadata) {
208     // Check version is >= VERSION?
209     Text txt = metadata.get(WAL_VERSION_KEY);
210     if (txt == null || Integer.parseInt(txt.toString()) < COMPRESSION_VERSION) {
211       return false;
212     }
213     // Now check that compression type is present.  Currently only one value.
214     txt = metadata.get(WAL_COMPRESSION_TYPE_KEY);
215     return txt != null && txt.equals(DICTIONARY_COMPRESSION_TYPE);
216   }
217 
218 
219   @Override
220   protected boolean readNext(Entry e) throws IOException {
221     try {
222       boolean hasNext = this.reader.next(e.getKey(), e.getEdit());
223       if (!hasNext) return false;
224       // Scopes are probably in WAL edit, move to key
225       NavigableMap<byte[], Integer> scopes = e.getEdit().getAndRemoveScopes();
226       if (scopes != null) {
227         e.getKey().readOlderScopes(scopes);
228       }
229       return true;
230     } catch (IOException ioe) {
231       throw addFileInfoToException(ioe);
232     }
233   }
234 
235   @Override
236   protected void seekOnFs(long pos) throws IOException {
237     try {
238       reader.seek(pos);
239     } catch (IOException ioe) {
240       throw addFileInfoToException(ioe);
241     }
242   }
243 
244   protected IOException addFileInfoToException(final IOException ioe)
245   throws IOException {
246     long pos = -1;
247     try {
248       pos = getPosition();
249     } catch (IOException e) {
250       LOG.warn("Failed getting position to add to throw", e);
251     }
252 
253     // See what SequenceFile.Reader thinks is the end of the file
254     long end = Long.MAX_VALUE;
255     try {
256       Field fEnd = SequenceFile.Reader.class.getDeclaredField("end");
257       fEnd.setAccessible(true);
258       end = fEnd.getLong(this.reader);
259     } catch(NoSuchFieldException nfe) {
260        /* reflection failure, keep going */
261     } catch(IllegalAccessException iae) {
262        /* reflection failure, keep going */
263     } catch(Exception e) {
264        /* All other cases. Should we handle it more aggressively? */
265        LOG.warn("Unexpected exception when accessing the end field", e);
266     }
267  
268     String msg = (this.path == null? "": this.path.toString()) +
269       ", entryStart=" + entryStart + ", pos=" + pos +
270       ((end == Long.MAX_VALUE) ? "" : ", end=" + end) +
271       ", edit=" + this.edit;
272 
273     // Enhance via reflection so we don't change the original class type
274     try {
275       return (IOException) ioe.getClass()
276         .getConstructor(String.class)
277         .newInstance(msg)
278         .initCause(ioe);
279     } catch(NoSuchMethodException nfe) {
280        /* reflection failure, keep going */
281     } catch(IllegalAccessException iae) {
282        /* reflection failure, keep going */
283     } catch(Exception e) {
284        /* All other cases. Should we handle it more aggressively? */
285        LOG.warn("Unexpected exception when accessing the end field", e);
286     }
287     return ioe;
288   }
289 }