/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver.wal;

import java.io.FilterInputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

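/**
 * An HLog.Reader implementation that reads write-ahead log files written
 * as SequenceFiles.
 *
 * A minimal usage sketch (illustrative only, not from the original source;
 * the fs, path and conf values are assumed to point at an existing WAL):
 * <pre>
 *   HLog.Reader reader = new SequenceFileLogReader();
 *   reader.init(fs, path, conf);
 *   for (HLog.Entry entry = reader.next(); entry != null; entry = reader.next()) {
 *     // process entry.getKey() and entry.getEdit()
 *   }
 *   reader.close();
 * </pre>
 */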
public class SequenceFileLogReader implements HLog.Reader {
  private static final Log LOG = LogFactory.getLog(SequenceFileLogReader.class);

  /**
   * Hack just to set the correct file length up in SequenceFile.Reader.
   * See HADOOP-6307.  The below is all about setting the right length on the
   * file we are reading.  fs.getFileStatus(file).getLen() is passed down to
   * a private SequenceFile.Reader constructor, but that length can be stale;
   * what we need is the length actually available on the stream.  The below
   * is ugly.  It makes getPos, the first time it's called, return the real
   * length of the file -- i.e. tell a lie -- just so this line up in
   * SF.Reader's constructor ends up with the right answer:
   *
   *         this.end = in.getPos() + length;
   */
  static class WALReader extends SequenceFile.Reader {

    WALReader(final FileSystem fs, final Path p, final Configuration c)
    throws IOException {
      super(fs, p, c);
    }

    @Override
    protected FSDataInputStream openFile(FileSystem fs, Path file,
      int bufferSize, long length)
    throws IOException {
      return new WALReaderFSDataInputStream(super.openFile(fs, file,
        bufferSize, length), length);
    }

    /**
     * Call this method after init() has been executed.
     *
     * @return whether WAL compression is enabled
     */
    public boolean isWALCompressionEnabled() {
      return SequenceFileLogWriter.isWALCompressionEnabled(this.getMetadata());
    }

    /**
     * Override just so we can intercept the first call to getPos.
     */
    static class WALReaderFSDataInputStream extends FSDataInputStream {
      private boolean firstGetPosInvocation = true;
      private long length;

      WALReaderFSDataInputStream(final FSDataInputStream is, final long l)
      throws IOException {
        super(is);
        this.length = l;
      }

      // This section can be confusing.  It is specific to how HDFS works.
      // Let me try to break it down.  This is the problem:
      //
      //  1. HDFS DataNodes update the NameNode about a filename's length
      //     on block boundaries or when a file is closed. Therefore,
      //     if an RS dies, then the NN's fs.getLength() can be out of date
      //  2. this.in.available() would work, but it returns int &
      //     therefore breaks for files > 2GB (happens on big clusters)
      //  3. DFSInputStream.getFileLength() gets the actual length from the DNs
      //  4. DFSInputStream is wrapped 2 levels deep : this.in.in
      //
      // So, here we adjust getPos() using getFileLength() so the
      // SequenceFile.Reader constructor (aka: first invocation) comes out
      // with the correct end of the file:
      //         this.end = in.getPos() + length;
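      //
      // A worked example with illustrative (made-up) numbers: suppose the NN
      // still reports length=1000 for a file whose DNs actually hold 1500
      // bytes.  Then this.length=1000, getFileLength() returns realLength=1500,
      // and adjust=500.  Assuming the stream is still at position 0 when
      // SF.Reader first calls getPos(), that call returns 0 + 500 = 500, so
      // SF.Reader computes end = 500 + 1000 = 1500 -- the true end of the file.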
      @Override
      public long getPos() throws IOException {
        if (this.firstGetPosInvocation) {
          this.firstGetPosInvocation = false;
          long adjust = 0;

          try {
            Field fIn = FilterInputStream.class.getDeclaredField("in");
            fIn.setAccessible(true);
            Object realIn = fIn.get(this.in);
            // In hadoop 0.22, DFSInputStream is a standalone class.  Before this,
            // it was an inner class of DFSClient.
            if (realIn.getClass().getName().endsWith("DFSInputStream")) {
              Method getFileLength = realIn.getClass().
                getDeclaredMethod("getFileLength", new Class<?>[] {});
              getFileLength.setAccessible(true);
              long realLength = ((Long)getFileLength.
                invoke(realIn, new Object[] {})).longValue();
              assert(realLength >= this.length);
              adjust = realLength - this.length;
            } else {
              LOG.info("Input stream class: " + realIn.getClass().getName() +
                  ", not adjusting length");
            }
          } catch (Exception e) {
            SequenceFileLogReader.LOG.warn(
              "Error while trying to get accurate file length.  " +
              "Truncation / data loss may occur if RegionServers die.", e);
          }

          return adjust + super.getPos();
        }
        return super.getPos();
      }
    }
  }

  Configuration conf;
  WALReader reader;
  FileSystem fs;

  // Needed for logging exceptions
  Path path;
  int edit = 0;
  long entryStart = 0;
  boolean emptyCompressionContext = true;
  /**
   * Compression context to use while reading.  Can be null if no compression.
   */
  protected CompressionContext compressionContext = null;

  protected Class<? extends HLogKey> keyClass;
  private WALEditCodec codec;

  /**
   * Default constructor.
   */
  public SequenceFileLogReader() {
  }

  /**
   * This constructor allows a specific HLogKey implementation to override that
   * which would otherwise be chosen via configuration property.
   *
   * @param keyClass the HLogKey implementation to use for entries read by this reader
   */
  public SequenceFileLogReader(Class<? extends HLogKey> keyClass) {
    this.keyClass = keyClass;
  }

  @Override
  public void init(FileSystem fs, Path path, Configuration conf)
      throws IOException {
    this.conf = conf;
    this.path = path;
    reader = new WALReader(fs, path, conf);

    this.fs = fs;

    // If compression is enabled, new dictionaries are created here.
    boolean compression = reader.isWALCompressionEnabled();
    if (compression) {
      try {
        if (compressionContext == null) {
          compressionContext = new CompressionContext(LRUDictionary.class);
        } else {
          compressionContext.clear();
        }
      } catch (Exception e) {
        throw new IOException("Failed to initialize CompressionContext", e);
      }
    }

    // Set up the codec.
    this.codec = WALEditCodec.create(conf, compressionContext);
  }

  @Override
  public void close() throws IOException {
    try {
      if (reader != null) {
        this.reader.close();
        this.reader = null;
      }
    } catch (IOException ioe) {
      throw addFileInfoToException(ioe);
    }
  }

  @Override
  public HLog.Entry next() throws IOException {
    return next(null);
  }

  @Override
  public HLog.Entry next(HLog.Entry reuse) throws IOException {
    this.entryStart = this.reader.getPosition();
    HLog.Entry e = reuse;
    if (e == null) {
      HLogKey key;
      if (keyClass == null) {
        key = HLog.newKey(conf);
      } else {
        try {
          key = keyClass.newInstance();
        } catch (InstantiationException ie) {
          throw new IOException(ie);
        } catch (IllegalAccessException iae) {
          throw new IOException(iae);
        }
      }

      WALEdit val = new WALEdit();
      e = new HLog.Entry(key, val);
    }

    boolean b = false;
    try {
      e.getEdit().setCodec(codec);
      if (compressionContext != null) {
        e.getKey().setCompressionContext(compressionContext);
      }
      b = this.reader.next(e.getKey(), e.getEdit());
    } catch (IOException ioe) {
      throw addFileInfoToException(ioe);
    }
    edit++;
    if (compressionContext != null && emptyCompressionContext) {
      emptyCompressionContext = false;
    }
    return b ? e : null;
  }
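
  // A reuse pattern for callers that want to avoid allocating a fresh
  // key/edit pair per record (illustrative sketch, not part of the original
  // source):
  //
  //   HLog.Entry entry = new HLog.Entry(HLog.newKey(conf), new WALEdit());
  //   while (reader.next(entry) != null) {
  //     // process entry
  //   }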

  @Override
  public void seek(long pos) throws IOException {
    if (compressionContext != null && emptyCompressionContext) {
      // The compression dictionary has not been populated yet, and entries
      // can only be decompressed once every earlier entry has been read, so
      // read forward from the start until we reach pos.
      while (next() != null) {
        if (getPosition() == pos) {
          emptyCompressionContext = false;
          break;
        }
      }
    }
    try {
      reader.seek(pos);
    } catch (IOException ioe) {
      throw addFileInfoToException(ioe);
    }
  }

  @Override
  public long getPosition() throws IOException {
    return reader != null ? reader.getPosition() : 0;
  }

  protected IOException addFileInfoToException(final IOException ioe)
  throws IOException {
    long pos = -1;
    try {
      pos = getPosition();
    } catch (IOException e) {
      LOG.warn("Failed getting position to add to throw", e);
    }

    // See what SequenceFile.Reader thinks is the end of the file
    long end = Long.MAX_VALUE;
    try {
      Field fEnd = SequenceFile.Reader.class.getDeclaredField("end");
      fEnd.setAccessible(true);
      end = fEnd.getLong(this.reader);
    } catch (Exception e) { /* reflection fail. keep going */ }

    String msg = (this.path == null ? "" : this.path.toString()) +
      ", entryStart=" + entryStart + ", pos=" + pos +
      ((end == Long.MAX_VALUE) ? "" : ", end=" + end) +
      ", edit=" + this.edit;
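    // The resulting message looks something like the following, where all
    // values are illustrative:
    //   /hbase/.logs/host,60020,123/wal.456, entryStart=2100, pos=2248, end=64000000, edit=3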

    // Enhance via reflection so we don't change the original class type
    try {
      return (IOException) ioe.getClass()
        .getConstructor(String.class)
        .newInstance(msg)
        .initCause(ioe);
    } catch (Exception e) { /* reflection fail. keep going */ }

    return ioe;
  }

  @Override
  public void reset() throws IOException {
    // Resetting the reader lets us see newly added data if the file is being written to.
    // We also keep the same compressionContext, which was previously populated for this file.
    reader = new WALReader(fs, path, conf);
  }
}