/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.TreeMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;

/**
 * Implementation of {@link HLog.Writer} that delegates to
 * SequenceFile.Writer.
 */
public class SequenceFileLogWriter implements HLog.Writer {
  static final Text WAL_VERSION_KEY = new Text("version");
  // Version 1 (the COMPRESSION_VERSION) is the version that introduces
  // compression.  Absence of a version meta tag means the old format, version 0.
  private static final int COMPRESSION_VERSION = 1;
  static final int VERSION = COMPRESSION_VERSION;
  static final Text WAL_VERSION = new Text("" + VERSION);
  static final Text WAL_COMPRESSION_TYPE_KEY = new Text("compression.type");
  static final Text DICTIONARY_COMPRESSION_TYPE = new Text("dictionary");

  private final Log LOG = LogFactory.getLog(this.getClass());
  // The sequence file we delegate to.
  private SequenceFile.Writer writer;
  // This is the FSDataOutputStream instance that is the 'out' instance
  // in the SequenceFile.Writer 'writer' instance above.
  private FSDataOutputStream writer_out;

  private Class<? extends HLogKey> keyClass;

  /**
   * Context used by our wal dictionary compressor.  Null if we're not to do
   * our custom dictionary compression.  This custom WAL compression is distinct
   * from SequenceFile's native compression.
   */
  private CompressionContext compressionContext;

  private Method syncFs = null;
  private Method hflush = null;
  private WALEditCodec codec;

  /**
   * Default constructor.
   */
  public SequenceFileLogWriter() {
    super();
  }

  /**
   * This constructor allows a specific HLogKey implementation to override the
   * one that would otherwise be chosen via configuration property.
   *
   * @param keyClass HLogKey implementation to use for entries written by this
   *          writer
   */
  public SequenceFileLogWriter(Class<? extends HLogKey> keyClass) {
    this.keyClass = keyClass;
  }
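
  // Illustrative sketch (not from the original source): a caller that wants its
  // WAL entries keyed by a custom HLogKey subclass can pass that class here
  // instead of relying on the configured default resolved by HLog.getKeyClass().
  // "MyHLogKey" is a hypothetical subclass used purely for this example.
  //
  //   HLog.Writer walWriter = new SequenceFileLogWriter(MyHLogKey.class);
  //   walWriter.init(fs, path, conf);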

  /**
   * Create sequence file Metadata for our WAL file with version and compression
   * type (if any).
   * @param conf configuration (currently unused)
   * @param compress whether to record our custom dictionary compression
   * @return Metadata instance.
   */
  private static Metadata createMetadata(final Configuration conf,
      final boolean compress) {
    TreeMap<Text, Text> metaMap = new TreeMap<Text, Text>();
    metaMap.put(WAL_VERSION_KEY, WAL_VERSION);
    if (compress) {
      // Currently we only do one compression type.
      metaMap.put(WAL_COMPRESSION_TYPE_KEY, DICTIONARY_COMPRESSION_TYPE);
    }
    return new Metadata(metaMap);
  }
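
  // For illustration (a sketch derived from the method above, not output captured
  // from a real file): with compress == true the SequenceFile header metadata
  // would carry roughly these entries; with compress == false only the version
  // entry is present.
  //
  //   version          -> "1"            (WAL_VERSION_KEY -> WAL_VERSION)
  //   compression.type -> "dictionary"   (WAL_COMPRESSION_TYPE_KEY -> DICTIONARY_COMPRESSION_TYPE)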

  /**
   * Check whether the given SequenceFile metadata says this WAL uses our
   * dictionary compression.  Call this once the file's metadata is available
   * (on the writing side, after init() has been executed).
   *
   * @param metadata the SequenceFile Metadata of the WAL file
   * @return whether WAL compression is enabled
   */
  static boolean isWALCompressionEnabled(final Metadata metadata) {
    // Check that the version is at least COMPRESSION_VERSION.
    Text txt = metadata.get(WAL_VERSION_KEY);
    if (txt == null || Integer.parseInt(txt.toString()) < COMPRESSION_VERSION) {
      return false;
    }
    // Now check that compression type is present.  Currently only one value.
    txt = metadata.get(WAL_COMPRESSION_TYPE_KEY);
    return txt != null && txt.equals(DICTIONARY_COMPRESSION_TYPE);
  }
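
  // Minimal sketch of the reading side (illustrative; the variable names and the
  // surrounding reader code are assumptions, not part of this class):
  //
  //   SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
  //   boolean compressed = isWALCompressionEnabled(reader.getMetadata());
  //   // A reader would then set up its own CompressionContext before decoding entries.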

  @Override
  public void init(FileSystem fs, Path path, Configuration conf)
  throws IOException {
    // Should we do our custom WAL compression?
    boolean compress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
    if (compress) {
      try {
        if (this.compressionContext == null) {
          this.compressionContext = new CompressionContext(LRUDictionary.class);
        } else {
          this.compressionContext.clear();
        }
      } catch (Exception e) {
        throw new IOException("Failed to initiate CompressionContext", e);
      }
    }

    // Resolve the key class; a constructor-supplied override takes precedence
    // over the configured default.
    if (null == keyClass) {
      keyClass = HLog.getKeyClass(conf);
    }

    // Create a SF.Writer instance.
    try {
      // reflection for a version of SequenceFile.createWriter that doesn't
      // automatically create the parent directory (see HBASE-2312)
      this.writer = (SequenceFile.Writer) SequenceFile.class
        .getMethod("createWriter", new Class[] {FileSystem.class,
            Configuration.class, Path.class, Class.class, Class.class,
            Integer.TYPE, Short.TYPE, Long.TYPE, Boolean.TYPE,
            CompressionType.class, CompressionCodec.class, Metadata.class})
        .invoke(null, new Object[] {fs, conf, path, keyClass,
            WALEdit.class,
            Integer.valueOf(fs.getConf().getInt("io.file.buffer.size", 4096)),
            Short.valueOf((short)
              conf.getInt("hbase.regionserver.hlog.replication",
              FSUtils.getDefaultReplication(fs, path))),
            Long.valueOf(conf.getLong("hbase.regionserver.hlog.blocksize",
                FSUtils.getDefaultBlockSize(fs, path))),
            Boolean.valueOf(false) /*createParent*/,
            SequenceFile.CompressionType.NONE, new DefaultCodec(),
            createMetadata(conf, compress)
            });
    } catch (InvocationTargetException ite) {
      // The method was found and invoked, but threw its own exception.
      throw new IOException(ite.getCause());
    } catch (Exception e) {
      // Ignore other exceptions: they mean the reflective lookup failed
      // (the HADOOP-6840 createWriter is not available), so fall back below.
    }

    // if reflection failed, use the old createWriter
    if (this.writer == null) {
      LOG.debug("new createWriter -- HADOOP-6840 -- not available");
      this.writer = SequenceFile.createWriter(fs, conf, path,
        keyClass, WALEdit.class,
        fs.getConf().getInt("io.file.buffer.size", 4096),
        (short) conf.getInt("hbase.regionserver.hlog.replication",
          FSUtils.getDefaultReplication(fs, path)),
        conf.getLong("hbase.regionserver.hlog.blocksize",
          FSUtils.getDefaultBlockSize(fs, path)),
        SequenceFile.CompressionType.NONE,
        new DefaultCodec(),
        null,
        createMetadata(conf, compress));
    } else {
      LOG.debug("using new createWriter -- HADOOP-6840");
    }

    // set up the WALEditCodec
    this.codec = WALEditCodec.create(conf, compressionContext);
    this.writer_out = getSequenceFilePrivateFSDataOutputStreamAccessible();
    this.syncFs = getSyncFs();
    this.hflush = getHFlush();
    String msg = "Path=" + path +
      ", syncFs=" + (this.syncFs != null) +
      ", hflush=" + (this.hflush != null) +
      ", compression=" + compress;
    if (this.syncFs != null || this.hflush != null) {
      LOG.debug(msg);
    } else {
      LOG.warn("No sync support! " + msg);
    }
  }
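
  // Typical lifecycle, shown only as a sketch (error handling omitted, and the
  // construction of the HLogKey/WALEdit pair is elided because callers normally
  // go through HLog rather than using this writer directly):
  //
  //   SequenceFileLogWriter writer = new SequenceFileLogWriter();
  //   writer.init(fs, path, conf);              // must run before any append
  //   writer.append(new HLog.Entry(key, edit));
  //   writer.sync();                            // uses syncFs or hflush when available
  //   writer.close();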

  /**
   * Do the dirty work of seeing whether syncFs is available on the backing
   * writer.  It will be available in branch-0.20-append and in CDH3.
   * @return The syncFs method or null if not available.
   * @throws IOException
   */
  private Method getSyncFs()
  throws IOException {
    Method m = null;
    try {
      // Reflect for the writer.syncFs() method; present when the running
      // Hadoop carries the HDFS-200 sync support.
      m = this.writer.getClass().getMethod("syncFs", new Class<?> []{});
    } catch (SecurityException e) {
      throw new IOException("Failed test for syncFs", e);
    } catch (NoSuchMethodException e) {
      // Not available
    }
    return m;
  }

  /**
   * See if hflush (Hadoop 0.21 and 0.22) is available.
   * @return The hflush method or null if not available.
   * @throws IOException
   */
  private Method getHFlush()
  throws IOException {
    Method m = null;
    try {
      Class<? extends OutputStream> c = getWriterFSDataOutputStream().getClass();
      m = c.getMethod("hflush", new Class<?> []{});
    } catch (SecurityException e) {
      throw new IOException("Failed test for hflush", e);
    } catch (NoSuchMethodException e) {
      // Ignore
    }
    return m;
  }

  // Get at the private FSDataOutputStream inside SequenceFile.Writer so we can
  // call sync on it.  Make it accessible.
  private FSDataOutputStream getSequenceFilePrivateFSDataOutputStreamAccessible()
  throws IOException {
    FSDataOutputStream out = null;
    final Field fields [] = this.writer.getClass().getDeclaredFields();
    final String fieldName = "out";
    for (int i = 0; i < fields.length; ++i) {
      if (fieldName.equals(fields[i].getName())) {
        try {
          // Make the 'out' field up in SF.Writer accessible.
          fields[i].setAccessible(true);
          out = (FSDataOutputStream)fields[i].get(this.writer);
          break;
        } catch (IllegalAccessException ex) {
          throw new IOException("Accessing " + fieldName, ex);
        } catch (SecurityException e) {
          // Cannot make the field accessible; warn and fall through so that
          // hflush-based sync is simply reported as unavailable.
          LOG.warn("Cannot access " + fieldName + " in " +
            this.writer.getClass().getName(), e);
        }
      }
    }
    return out;
  }

  @Override
  public void append(HLog.Entry entry) throws IOException {
    entry.getEdit().setCodec(this.codec);
    entry.getKey().setCompressionContext(compressionContext);

    try {
      this.writer.append(entry.getKey(), entry.getEdit());
    } catch (NullPointerException npe) {
      // Concurrent close...
      throw new IOException(npe);
    }
  }

  @Override
  public void close() throws IOException {
    if (this.writer != null) {
      try {
        this.writer.close();
      } catch (NullPointerException npe) {
        // An NPE can come up from down in DFSClient$DFSOutputStream#close
        LOG.warn(npe);
      }
      this.writer = null;
    }
  }

  @Override
  public void sync() throws IOException {
    if (this.syncFs != null) {
      try {
        this.syncFs.invoke(this.writer, HLog.NO_ARGS);
      } catch (Exception e) {
        throw new IOException("Reflection", e);
      }
    } else if (this.hflush != null) {
      try {
        this.hflush.invoke(getWriterFSDataOutputStream(), HLog.NO_ARGS);
      } catch (Exception e) {
        throw new IOException("Reflection", e);
      }
    }
  }

  @Override
  public long getLength() throws IOException {
    try {
      return this.writer.getLength();
    } catch (NullPointerException npe) {
      // Concurrent close...
      throw new IOException(npe);
    }
  }

  /**
   * @return The FSDataOutputStream used inside SequenceFile.Writer, made
   * accessible, or null if not available.
   */
  public FSDataOutputStream getWriterFSDataOutputStream() {
    return this.writer_out;
  }
}