/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.annotation.Annotation;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Compression-related utilities: the set of supported compression algorithms
 * and helpers for creating compression and decompression streams.
 * Copied from the TFile implementation in HADOOP-3315.
 */
public final class Compression {
  static final Log LOG = LogFactory.getLog(Compression.class);

  /**
   * Prevent instantiation of this utility class.
   */
  private Compression() {
    super();
  }

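  /**
   * A stream wrapper that completes the current compression block on every
   * {@code flush()}: it calls {@code finish()}, flushes the underlying
   * {@link CompressionOutputStream}, and then calls {@code resetState()} so
   * that subsequent writes start a fresh block. Note that
   * {@code write(byte[], int, int)} delegates directly to the wrapped stream,
   * bypassing {@link FilterOutputStream}'s byte-at-a-time default.
   */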
  static class FinishOnFlushCompressionStream extends FilterOutputStream {
    public FinishOnFlushCompressionStream(CompressionOutputStream cout) {
      super(cout);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      out.write(b, off, len);
    }

    @Override
    public void flush() throws IOException {
      CompressionOutputStream cout = (CompressionOutputStream) out;
      cout.finish();
      cout.flush();
      cout.resetState();
    }
  }

  /**
   * Returns the classloader to load the Codec class from.
   * @return the thread context classloader if available, otherwise this
   *         class's own loader, otherwise the system classloader
   */
  private static ClassLoader getClassLoaderForCodec() {
    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    if (cl == null) {
      cl = Compression.class.getClassLoader();
    }
    if (cl == null) {
      cl = ClassLoader.getSystemClassLoader();
    }
    if (cl == null) {
      throw new RuntimeException("A ClassLoader to load the Codec could not be determined");
    }
    return cl;
  }

  /**
   * Compression algorithms. The ordinal of these cannot change or else you
   * risk breaking all existing HFiles out there. Even the ones that are
   * not compressed! (They use the NONE algorithm.)
   */
  public enum Algorithm {
    LZO("lzo") {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec lzoCodec;
      private final transient Object lock = new Object();

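      // Lazy initialization with double-checked locking: the volatile read
      // avoids taking the lock once the codec has been built.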
      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (lzoCodec == null) {
          synchronized (lock) {
            if (lzoCodec == null) {
              lzoCodec = buildCodec(conf);
            }
          }
        }
        return lzoCodec;
      }

      private CompressionCodec buildCodec(Configuration conf) {
        try {
          Class<?> externalCodec =
              getClassLoaderForCodec().loadClass("com.hadoop.compression.lzo.LzoCodec");
          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
              new Configuration(conf));
        } catch (ClassNotFoundException e) {
          throw new RuntimeException(e);
        }
      }
    },
    GZ("gz") {
      private volatile transient GzipCodec codec;
      private final transient Object lock = new Object();

      @Override
      DefaultCodec getCodec(Configuration conf) {
        if (codec == null) {
          synchronized (lock) {
            if (codec == null) {
              codec = buildCodec(conf);
            }
          }
        }
        return codec;
      }

      private GzipCodec buildCodec(Configuration conf) {
        GzipCodec gzcodec = new ReusableStreamGzipCodec();
        gzcodec.setConf(new Configuration(conf));
        return gzcodec;
      }
    },

    NONE("none") {
      @Override
      DefaultCodec getCodec(Configuration conf) {
        return null;
      }

      @Override
      public synchronized InputStream createDecompressionStream(
          InputStream downStream, Decompressor decompressor,
          int downStreamBufferSize) throws IOException {
        if (downStreamBufferSize > 0) {
          return new BufferedInputStream(downStream, downStreamBufferSize);
        }
        return downStream;
      }

      @Override
      public synchronized OutputStream createCompressionStream(
          OutputStream downStream, Compressor compressor,
          int downStreamBufferSize) throws IOException {
        if (downStreamBufferSize > 0) {
          return new BufferedOutputStream(downStream, downStreamBufferSize);
        }
        return downStream;
      }
    },
    SNAPPY("snappy") {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec snappyCodec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (snappyCodec == null) {
          synchronized (lock) {
            if (snappyCodec == null) {
              snappyCodec = buildCodec(conf);
            }
          }
        }
        return snappyCodec;
      }

      private CompressionCodec buildCodec(Configuration conf) {
        try {
          Class<?> externalCodec =
              getClassLoaderForCodec()
                  .loadClass("org.apache.hadoop.io.compress.SnappyCodec");
          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
              conf);
        } catch (ClassNotFoundException e) {
          throw new RuntimeException(e);
        }
      }
    },
    LZ4("lz4") {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec lz4Codec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (lz4Codec == null) {
          synchronized (lock) {
            if (lz4Codec == null) {
              lz4Codec = buildCodec(conf);
            }
          }
        }
        return lz4Codec;
      }

      private CompressionCodec buildCodec(Configuration conf) {
        try {
          Class<?> externalCodec =
              getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.Lz4Codec");
          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
              conf);
        } catch (ClassNotFoundException e) {
          throw new RuntimeException(e);
        }
      }
    };

    private final Configuration conf;
    private final String compressName;
    // Data input buffer size to absorb small reads from application.
    private static final int DATA_IBUF_SIZE = 1 * 1024;
    // Data output buffer size to absorb small writes from application.
    private static final int DATA_OBUF_SIZE = 4 * 1024;

    Algorithm(String name) {
      this.conf = new Configuration();
      this.conf.setBoolean("hadoop.native.lib", true);
      this.compressName = name;
    }

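    /**
     * @param conf configuration used when a codec has to be instantiated
     * @return the codec implementing this algorithm, or {@code null} for NONE
     */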
    abstract CompressionCodec getCodec(Configuration conf);

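    /**
     * Wraps the given input stream for decompression with this algorithm.
     * The returned stream is buffered with {@code DATA_IBUF_SIZE} to absorb
     * small reads from the application.
     * @param downStream the raw (compressed) input stream
     * @param decompressor a decompressor obtained from {@link #getDecompressor()}
     * @param downStreamBufferSize if positive, the buffer size used when
     *          reading from {@code downStream}
     */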
    public InputStream createDecompressionStream(
        InputStream downStream, Decompressor decompressor,
        int downStreamBufferSize) throws IOException {
      CompressionCodec codec = getCodec(conf);
      // Set the internal buffer size to read from down stream.
      if (downStreamBufferSize > 0) {
        ((Configurable) codec).getConf().setInt("io.file.buffer.size",
            downStreamBufferSize);
      }
      CompressionInputStream cis =
          codec.createInputStream(downStream, decompressor);
      return new BufferedInputStream(cis, DATA_IBUF_SIZE);
    }

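    /**
     * Wraps the given output stream for compression with this algorithm. The
     * compression stream is wrapped in a {@link FinishOnFlushCompressionStream}
     * and buffered with {@code DATA_OBUF_SIZE} to absorb small writes.
     * @param downStream the raw output stream to write compressed bytes to
     * @param compressor a compressor obtained from {@link #getCompressor()}
     * @param downStreamBufferSize if positive, the buffer size used when
     *          writing to {@code downStream}
     */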
    public OutputStream createCompressionStream(
        OutputStream downStream, Compressor compressor, int downStreamBufferSize)
        throws IOException {
      OutputStream bos1;
      if (downStreamBufferSize > 0) {
        bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
      } else {
        bos1 = downStream;
      }
      CompressionOutputStream cos =
          createPlainCompressionStream(bos1, compressor);
      return new BufferedOutputStream(new FinishOnFlushCompressionStream(cos),
          DATA_OBUF_SIZE);
    }

    /**
     * Creates a compression stream without any additional wrapping into
     * buffering streams.
     */
    CompressionOutputStream createPlainCompressionStream(
        OutputStream downStream, Compressor compressor) throws IOException {
      CompressionCodec codec = getCodec(conf);
      ((Configurable) codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
      return codec.createOutputStream(downStream, compressor);
    }

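    /**
     * Obtains a compressor for this algorithm from the Hadoop
     * {@link CodecPool}, resetting it before handing it to the caller.
     * @return a pooled compressor, or {@code null} for NONE
     */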
    public Compressor getCompressor() {
      CompressionCodec codec = getCodec(conf);
      if (codec != null) {
        Compressor compressor = CodecPool.getCompressor(codec);
        if (compressor != null) {
          if (compressor.finished()) {
            // Somebody returned the compressor to CodecPool but is still
            // using it.
            LOG.warn("Compressor obtained from CodecPool is already finished()");
          }
          compressor.reset();
        }
        return compressor;
      }
      return null;
    }

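    /**
     * Returns a compressor previously obtained from {@link #getCompressor()}
     * back to the pool. Null-safe.
     */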
    public void returnCompressor(Compressor compressor) {
      if (compressor != null) {
        CodecPool.returnCompressor(compressor);
      }
    }

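    /**
     * Obtains a decompressor for this algorithm from the Hadoop
     * {@link CodecPool}, resetting it before handing it to the caller.
     * @return a pooled decompressor, or {@code null} for NONE
     */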
    public Decompressor getDecompressor() {
      CompressionCodec codec = getCodec(conf);
      if (codec != null) {
        Decompressor decompressor = CodecPool.getDecompressor(codec);
        if (decompressor != null) {
          if (decompressor.finished()) {
            // Somebody returned the decompressor to CodecPool but is still
            // using it.
            LOG.warn("Decompressor obtained from CodecPool is already finished()");
          }
          decompressor.reset();
        }
        return decompressor;
      }
      return null;
    }

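    /**
     * Returns a decompressor previously obtained from
     * {@link #getDecompressor()} back to the pool. Null-safe.
     */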
    public void returnDecompressor(Decompressor decompressor) {
      if (decompressor != null) {
        CodecPool.returnDecompressor(decompressor);
        // CodecPool refuses to pool decompressors annotated with @DoNotPool,
        // so release their resources here rather than leaking them.
        Annotation[] annotations = decompressor.getClass().getAnnotations();
        if (annotations != null) {
          for (Annotation annotation : annotations) {
            String annoName = annotation.annotationType().getSimpleName();
            if (annoName.equals("DoNotPool")) {
              decompressor.end();
            }
          }
        }
      }
    }

    public String getName() {
      return compressName;
    }
  }

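  /**
   * Looks up an {@link Algorithm} by its text name, e.g. "gz" or "snappy".
   * @throws IllegalArgumentException if no algorithm has the given name
   */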
  public static Algorithm getCompressionAlgorithmByName(String compressName) {
    Algorithm[] algos = Algorithm.class.getEnumConstants();

    for (Algorithm a : algos) {
      if (a.getName().equals(compressName)) {
        return a;
      }
    }

    throw new IllegalArgumentException(
        "Unsupported compression algorithm name: " + compressName);
  }

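  /**
   * @return the text names of all supported algorithms, in ordinal order
   */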
  static String[] getSupportedAlgorithms() {
    Algorithm[] algos = Algorithm.class.getEnumConstants();

    String[] ret = new String[algos.length];
    int i = 0;
    for (Algorithm a : algos) {
      ret[i++] = a.getName();
    }

    return ret;
  }
}