/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver.wal;

import java.io.FilterInputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.NavigableMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.Text;

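/**
 * A WAL reader for the legacy, SequenceFile-based WAL format. Kept around so
 * that pre-0.96, pre-protobuf WAL files can still be read.
 */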
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX, HBaseInterfaceAudience.CONFIG})
public class SequenceFileLogReader extends ReaderBase {
  private static final Log LOG = LogFactory.getLog(SequenceFileLogReader.class);

  // Legacy stuff from pre-PB WAL metadata.
  private static final Text WAL_VERSION_KEY = new Text("version");
  // Version 1, the COMPRESSION_VERSION, is the version that introduced
  // compression.  Absence of a version meta tag means an old, version-0 file.
  private static final int COMPRESSION_VERSION = 1;
  private static final Text WAL_COMPRESSION_TYPE_KEY = new Text("compression.type");
  private static final Text DICTIONARY_COMPRESSION_TYPE = new Text("dictionary");

  /**
   * Hack just to set the correct file length up in SequenceFile.Reader.
   * See HADOOP-6307.  The below is all about setting the right length on the
   * file we are reading.  fs.getFileStatus(file).getLen() is passed down to
   * a private SequenceFile.Reader constructor, which won't work: we need to
   * ask the stream itself how much data is available.  The below is ugly.
   * It makes getPos, the first time it's called, return the length of the
   * file -- i.e. tell a lie -- just so this line up in SF.Reader's
   * constructor ends up with the right answer:
   *
   *         this.end = in.getPos() + length;
   *
   */
  private static class WALReader extends SequenceFile.Reader {

    WALReader(final FileSystem fs, final Path p, final Configuration c)
        throws IOException {
      super(fs, p, c);
    }

    @Override
    protected FSDataInputStream openFile(FileSystem fs, Path file,
        int bufferSize, long length)
        throws IOException {
      return new WALReaderFSDataInputStream(super.openFile(fs, file,
        bufferSize, length), length);
    }

    /**
     * Override just so we can intercept the first call to getPos.
     */
    static class WALReaderFSDataInputStream extends FSDataInputStream {
      private boolean firstGetPosInvocation = true;
      private long length;

      WALReaderFSDataInputStream(final FSDataInputStream is, final long l)
          throws IOException {
        super(is);
        this.length = l;
      }

      // This section can be confusing.  It is specific to how HDFS works.
      // Let me try to break it down.  This is the problem:
      //
      //  1. HDFS DataNodes update the NameNode about a file's length
      //     on block boundaries or when a file is closed.  Therefore,
      //     if an RS dies, the NN's fs.getLength() can be out of date.
      //  2. this.in.available() would work, but it returns an int and
      //     therefore breaks for files > 2GB (happens on big clusters).
      //  3. DFSInputStream.getFileLength() gets the actual length from the DNs.
      //  4. DFSInputStream is wrapped 2 levels deep: this.in.in.
      //
      // So, here we adjust getPos() using getFileLength() so the
      // SequenceFile.Reader constructor (aka: first invocation) comes out
      // with the correct end of the file:
      //         this.end = in.getPos() + length;
      @Override
      public long getPos() throws IOException {
        if (this.firstGetPosInvocation) {
          this.firstGetPosInvocation = false;
          long adjust = 0;

          try {
            Field fIn = FilterInputStream.class.getDeclaredField("in");
            fIn.setAccessible(true);
            Object realIn = fIn.get(this.in);
            // In hadoop 0.22, DFSInputStream is a standalone class.  Before this,
            // it was an inner class of DFSClient.
            if (realIn.getClass().getName().endsWith("DFSInputStream")) {
              Method getFileLength = realIn.getClass().
                getDeclaredMethod("getFileLength", new Class<?>[] {});
              getFileLength.setAccessible(true);
              long realLength = ((Long)getFileLength.
                invoke(realIn, new Object[] {})).longValue();
              assert(realLength >= this.length);
              adjust = realLength - this.length;
            } else {
              LOG.info("Input stream class: " + realIn.getClass().getName() +
                  ", not adjusting length");
            }
          } catch (Exception e) {
            SequenceFileLogReader.LOG.warn(
              "Error while trying to get accurate file length.  " +
              "Truncation / data loss may occur if RegionServers die.", e);
            throw new IOException(e);
          }

          return adjust + super.getPos();
        }
        return super.getPos();
      }
    }
  }

  // Protected for tests.
  protected SequenceFile.Reader reader;
  long entryStart = 0; // needed for logging exceptions

  public SequenceFileLogReader() {
    super();
  }

  @Override
  public void close() throws IOException {
    try {
      if (reader != null) {
        this.reader.close();
        this.reader = null;
      }
    } catch (IOException ioe) {
      throw addFileInfoToException(ioe);
    }
  }

  @Override
  public long getPosition() throws IOException {
    return reader != null ? reader.getPosition() : 0;
  }

  @Override
  public void reset() throws IOException {
    // Resetting the reader lets us see newly added data if the file is being written to.
    // We also keep the same compressionContext which was previously populated for this file.
    reader = new WALReader(fs, path, conf);
  }

  @Override
  protected String initReader(FSDataInputStream stream) throws IOException {
    // We don't use the passed-in stream; we need the length-fixing 'magic'
    // WALReader stream above instead.
    if (stream != null) {
      stream.close();
    }
    reset();
    return null;
  }

  @Override
  protected void initAfterCompression(String cellCodecClsName) throws IOException {
    // Nothing to do here
  }

  @Override
  protected void initAfterCompression() throws IOException {
    // Nothing to do here
  }

  @Override
  protected boolean hasCompression() {
    return isWALCompressionEnabled(reader.getMetadata());
  }

  @Override
  protected boolean hasTagCompression() {
    // Tag compression not supported with old SequenceFileLog Reader/Writer
    return false;
  }

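  // For reference: a compressed legacy WAL carries SequenceFile metadata along
  // the lines of {version=1, compression.type=dictionary}; a missing version
  // tag means an old, uncompressed (version 0) file.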
  /**
   * Call this method after init() has been executed.
   * @return whether WAL compression is enabled
   */
  static boolean isWALCompressionEnabled(final Metadata metadata) {
    // Check that the version is at least COMPRESSION_VERSION; a missing
    // version meta tag means version 0, which predates compression.
    Text txt = metadata.get(WAL_VERSION_KEY);
    if (txt == null || Integer.parseInt(txt.toString()) < COMPRESSION_VERSION) {
      return false;
    }
    // Now check that the compression type is present.  Currently only one value.
    txt = metadata.get(WAL_COMPRESSION_TYPE_KEY);
    return txt != null && txt.equals(DICTIONARY_COMPRESSION_TYPE);
  }

  /**
   * Fill in the passed entry with the next key/value.
   * Note that because this format deals with our legacy storage, the provided
   * Entry MUST use an {@link HLogKey} for the key.
   * @return boolean indicating if the contents of Entry have been filled in.
   */
  @Override
  protected boolean readNext(Entry e) throws IOException {
    try {
      if (!(e.getKey() instanceof HLogKey)) {
        final IllegalArgumentException exception = new IllegalArgumentException(
            "SequenceFileLogReader only works when given entries that have HLogKey for keys. This" +
            " one had '" + e.getKey().getClass() + "'");
        LOG.error("We need to use the legacy SequenceFileLogReader to handle a" +
            " pre-0.96 style WAL, but HBase internals failed to use the deprecated HLogKey class." +
            " This is a bug; please file an issue or email the developer mailing list. You will " +
            "need the following exception details when seeking help from the HBase community.",
            exception);
        throw exception;
      }
      boolean hasNext = this.reader.next((HLogKey)e.getKey(), e.getEdit());
      if (!hasNext) return false;
      // Scopes are probably in the WAL edit; move them to the key.
      NavigableMap<byte[], Integer> scopes = e.getEdit().getAndRemoveScopes();
      if (scopes != null) {
        e.getKey().readOlderScopes(scopes);
      }
      return true;
    } catch (IOException ioe) {
      throw addFileInfoToException(ioe);
    }
  }

  @Override
  protected void seekOnFs(long pos) throws IOException {
    try {
      reader.seek(pos);
    } catch (IOException ioe) {
      throw addFileInfoToException(ioe);
    }
  }

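  /**
   * Builds an exception of the same type as the passed IOException whose
   * message also carries file context (path, entry start, position and, where
   * discoverable, the reader's notion of the end of file).  Falls back to
   * returning the original exception if reflection fails.
   */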
  protected IOException addFileInfoToException(final IOException ioe)
      throws IOException {
    long pos = -1;
    try {
      pos = getPosition();
    } catch (IOException e) {
      LOG.warn("Failed getting position to add to throw", e);
    }

    // See what SequenceFile.Reader thinks is the end of the file
    long end = Long.MAX_VALUE;
    try {
      Field fEnd = SequenceFile.Reader.class.getDeclaredField("end");
      fEnd.setAccessible(true);
      end = fEnd.getLong(this.reader);
    } catch (NoSuchFieldException nfe) {
      /* reflection failure, keep going */
    } catch (IllegalAccessException iae) {
      /* reflection failure, keep going */
    } catch (Exception e) {
      /* All other cases. Should we handle it more aggressively? */
      LOG.warn("Unexpected exception when accessing the end field", e);
    }

    String msg = (this.path == null ? "" : this.path.toString()) +
      ", entryStart=" + entryStart + ", pos=" + pos +
      ((end == Long.MAX_VALUE) ? "" : ", end=" + end) +
      ", edit=" + this.edit;

    // Enhance via reflection so we don't change the original class type
    try {
      return (IOException) ioe.getClass()
        .getConstructor(String.class)
        .newInstance(msg)
        .initCause(ioe);
    } catch (NoSuchMethodException nfe) {
      /* reflection failure, keep going */
    } catch (IllegalAccessException iae) {
      /* reflection failure, keep going */
    } catch (Exception e) {
      /* All other cases. Should we handle it more aggressively? */
      LOG.warn("Unexpected exception when wrapping the original exception", e);
    }
    return ioe;
  }
}