
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.hadoop.hbase.io.compress;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.DoNotPool;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Compression-related utilities.
 * Copied from the hadoop-3315 TFile implementation.
 */
@InterfaceAudience.Private
public final class Compression {
  static final Log LOG = LogFactory.getLog(Compression.class);

  /**
   * Prevents instantiation of this utility class.
   */
  private Compression() {
    super();
  }
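
  /**
   * A {@link FilterOutputStream} whose {@code flush()} first finishes the
   * wrapped {@link CompressionOutputStream} (forcing all buffered data through
   * the compressor and out to the underlying stream) and then resets its state
   * so that more data can be written afterwards. This lets callers flush a
   * compressed stream at record boundaries without closing it.
   */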
  static class FinishOnFlushCompressionStream extends FilterOutputStream {
    public FinishOnFlushCompressionStream(CompressionOutputStream cout) {
      super(cout);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      out.write(b, off, len);
    }

    @Override
    public void flush() throws IOException {
      CompressionOutputStream cout = (CompressionOutputStream) out;
      cout.finish();
      cout.flush();
      cout.resetState();
    }
  }

  /**
   * Returns the classloader to load the Codec class from.
   */
  private static ClassLoader getClassLoaderForCodec() {
    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    if (cl == null) {
      cl = Compression.class.getClassLoader();
    }
    if (cl == null) {
      cl = ClassLoader.getSystemClassLoader();
    }
    if (cl == null) {
      throw new RuntimeException("A ClassLoader to load the Codec could not be determined");
    }
    return cl;
  }

  /**
   * Compression algorithms. The ordinal of these cannot change or else you
   * risk breaking all existing HFiles out there. Even the ones that are
   * not compressed! (They use the NONE algorithm.)
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value="SE_TRANSIENT_FIELD_NOT_RESTORED",
      justification="We are not serializing so doesn't apply (not sure why transient though)")
  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public enum Algorithm {
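    // Each codec-backed constant below lazily creates its CompressionCodec via
    // double-checked locking: the codec field is volatile so a fully
    // constructed instance is safely published, and the per-constant lock
    // ensures the codec is built at most once.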
106     LZO("lzo") {
107       // Use base type to avoid compile-time dependencies.
108       private volatile transient CompressionCodec lzoCodec;
109       private transient Object lock = new Object();
110 
111       @Override
112       CompressionCodec getCodec(Configuration conf) {
113         if (lzoCodec == null) {
114           synchronized (lock) {
115             if (lzoCodec == null) {
116               lzoCodec = buildCodec(conf);
117             }
118           }
119         }
120         return lzoCodec;
121       }
122 
123       private CompressionCodec buildCodec(Configuration conf) {
124         try {
125           Class<?> externalCodec =
126               ClassLoader.getSystemClassLoader()
127                   .loadClass("com.hadoop.compression.lzo.LzoCodec");
128           return (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
129               new Configuration(conf));
130         } catch (ClassNotFoundException e) {
131           throw new RuntimeException(e);
132         }
133       }
134     },
135     GZ("gz") {
136       private volatile transient GzipCodec codec;
137       private transient Object lock = new Object();
138 
139       @Override
140       DefaultCodec getCodec(Configuration conf) {
141         if (codec == null) {
142           synchronized (lock) {
143             if (codec == null) {
144               codec = buildCodec(conf);
145             }
146           }
147         }
148 
149         return codec;
150       }
151 
152       private GzipCodec buildCodec(Configuration conf) {
153         GzipCodec gzcodec = new ReusableStreamGzipCodec();
154         gzcodec.setConf(new Configuration(conf));
155         return gzcodec;
156       }
157     },

    NONE("none") {
      @Override
      DefaultCodec getCodec(Configuration conf) {
        return null;
      }

      @Override
      public synchronized InputStream createDecompressionStream(
          InputStream downStream, Decompressor decompressor,
          int downStreamBufferSize) throws IOException {
        if (downStreamBufferSize > 0) {
          return new BufferedInputStream(downStream, downStreamBufferSize);
        }
        return downStream;
      }

      @Override
      public synchronized OutputStream createCompressionStream(
          OutputStream downStream, Compressor compressor,
          int downStreamBufferSize) throws IOException {
        if (downStreamBufferSize > 0) {
          return new BufferedOutputStream(downStream, downStreamBufferSize);
        }
        return downStream;
      }
    },
191     SNAPPY("snappy") {
192       // Use base type to avoid compile-time dependencies.
193       private volatile transient CompressionCodec snappyCodec;
194       private transient Object lock = new Object();
195 
196       @Override
197       CompressionCodec getCodec(Configuration conf) {
198         if (snappyCodec == null) {
199           synchronized (lock) {
200             if (snappyCodec == null) {
201               snappyCodec = buildCodec(conf);
202             }
203           }
204         }
205         return snappyCodec;
206       }
207 
208       private CompressionCodec buildCodec(Configuration conf) {
209         try {
210           Class<?> externalCodec =
211               ClassLoader.getSystemClassLoader()
212                   .loadClass("org.apache.hadoop.io.compress.SnappyCodec");
213           return (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
214               conf);
215         } catch (ClassNotFoundException e) {
216           throw new RuntimeException(e);
217         }
218       }
219     },
220     LZ4("lz4") {
221       // Use base type to avoid compile-time dependencies.
222       private volatile transient CompressionCodec lz4Codec;
223       private transient Object lock = new Object();
224 
225       @Override
226       CompressionCodec getCodec(Configuration conf) {
227         if (lz4Codec == null) {
228           synchronized (lock) {
229             if (lz4Codec == null) {
230               lz4Codec = buildCodec(conf);
231             }
232           }
233         }
234         return lz4Codec;
235       }
236 
237       private CompressionCodec buildCodec(Configuration conf) {
238         try {
239           Class<?> externalCodec =
240               getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.Lz4Codec");
241           return (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
242               conf);
243         } catch (ClassNotFoundException e) {
244           throw new RuntimeException(e);
245         }
246       }
247   };
248 

    private final Configuration conf;
    private final String compressName;
    // Data input buffer size to absorb small reads from application.
    private static final int DATA_IBUF_SIZE = 1 * 1024;
    // Data output buffer size to absorb small writes from application.
    private static final int DATA_OBUF_SIZE = 4 * 1024;

    Algorithm(String name) {
      this.conf = new Configuration();
      this.conf.setBoolean("hadoop.native.lib", true);
      this.compressName = name;
    }

    abstract CompressionCodec getCodec(Configuration conf);
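
    /**
     * Wraps the down stream in the codec's {@link CompressionInputStream} and
     * an additional small {@link BufferedInputStream}, so that small
     * application reads do not hit the codec directly.
     */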
    public InputStream createDecompressionStream(
        InputStream downStream, Decompressor decompressor,
        int downStreamBufferSize) throws IOException {
      CompressionCodec codec = getCodec(conf);
      // Set the internal buffer size to read from down stream.
      if (downStreamBufferSize > 0) {
        ((Configurable) codec).getConf().setInt("io.file.buffer.size",
            downStreamBufferSize);
      }
      CompressionInputStream cis =
          codec.createInputStream(downStream, decompressor);
      return new BufferedInputStream(cis, DATA_IBUF_SIZE);
    }
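
    /**
     * Builds the write-side stream stack: an optional buffer below the codec,
     * the codec's {@link CompressionOutputStream} wrapped in a
     * {@link FinishOnFlushCompressionStream} so that {@code flush()} completes
     * the current compressed block, and a small buffer on top to absorb small
     * application writes.
     */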
    public OutputStream createCompressionStream(
        OutputStream downStream, Compressor compressor, int downStreamBufferSize)
        throws IOException {
      OutputStream bos1 = null;
      if (downStreamBufferSize > 0) {
        bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
      } else {
        bos1 = downStream;
      }
      CompressionOutputStream cos =
          createPlainCompressionStream(bos1, compressor);
      BufferedOutputStream bos2 =
          new BufferedOutputStream(new FinishOnFlushCompressionStream(cos),
              DATA_OBUF_SIZE);
      return bos2;
    }

    /**
     * Creates a compression stream without any additional wrapping into
     * buffering streams.
     */
    public CompressionOutputStream createPlainCompressionStream(
        OutputStream downStream, Compressor compressor) throws IOException {
      CompressionCodec codec = getCodec(conf);
      ((Configurable) codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
      return codec.createOutputStream(downStream, compressor);
    }
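
    /**
     * Borrows a {@link Compressor} from Hadoop's {@link CodecPool}. Callers
     * must hand it back via {@link #returnCompressor(Compressor)} when done.
     * Returns {@code null} if this algorithm has no codec (NONE).
     */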
    public Compressor getCompressor() {
      CompressionCodec codec = getCodec(conf);
      if (codec != null) {
        Compressor compressor = CodecPool.getCompressor(codec);
        if (compressor != null) {
          if (compressor.finished()) {
            // Somebody returned the compressor to the CodecPool but is still
            // using it.
            LOG.warn("Compressor obtained from CodecPool is already finished()");
          }
          compressor.reset();
        }
        return compressor;
      }
      return null;
    }

    public void returnCompressor(Compressor compressor) {
      if (compressor != null) {
        CodecPool.returnCompressor(compressor);
      }
    }

    public Decompressor getDecompressor() {
      CompressionCodec codec = getCodec(conf);
      if (codec != null) {
        Decompressor decompressor = CodecPool.getDecompressor(codec);
        if (decompressor != null) {
          if (decompressor.finished()) {
            // Somebody returned the decompressor to the CodecPool but is still
            // using it.
            LOG.warn("Decompressor obtained from CodecPool is already finished()");
          }
          decompressor.reset();
        }
        return decompressor;
      }
      return null;
    }

    public void returnDecompressor(Decompressor decompressor) {
      if (decompressor != null) {
        CodecPool.returnDecompressor(decompressor);
        if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
          decompressor.end();
        }
      }
    }

    public String getName() {
      return compressName;
    }
  }
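
  /**
   * Returns the {@link Algorithm} whose name (see {@link Algorithm#getName()})
   * matches the given string. A minimal illustrative lookup, assuming
   * {@code rawOut} is some destination {@link OutputStream}:
   * <pre>
   *   Compression.Algorithm algo = Compression.getCompressionAlgorithmByName("gz");
   *   Compressor compressor = algo.getCompressor();
   *   OutputStream os = algo.createCompressionStream(rawOut, compressor, 0);
   * </pre>
   *
   * @throws IllegalArgumentException if no supported algorithm has the given name
   */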
  public static Algorithm getCompressionAlgorithmByName(String compressName) {
    for (Algorithm a : Algorithm.values()) {
      if (a.getName().equals(compressName)) {
        return a;
      }
    }
    throw new IllegalArgumentException(
        "Unsupported compression algorithm name: " + compressName);
  }

  /**
   * Get names of supported compression algorithms.
   *
   * @return an array of strings, each naming a supported compression algorithm
   */
  public static String[] getSupportedAlgorithms() {
    Algorithm[] algos = Algorithm.values();
    String[] ret = new String[algos.length];
    int i = 0;
    for (Algorithm a : algos) {
      ret[i++] = a.getName();
    }
    return ret;
  }

  /**
   * Decompresses data from the given stream using the configured compression
   * algorithm. It will throw an exception if the dest buffer does not have
   * enough space to hold the decompressed data.
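   *
   * <p>An illustrative call (the stream and size variables here are
   * placeholders, not part of this API):
   * <pre>
   *   byte[] dest = new byte[uncompressedSize];
   *   Compression.decompress(dest, 0, boundedStream, compressedSize,
   *       uncompressedSize, Compression.Algorithm.GZ);
   * </pre>
   *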
   * @param dest
   *          the output byte buffer
   * @param destOffset
   *          start writing position of the output buffer
   * @param bufferedBoundedStream
   *          a stream to read compressed data from, bounded to the exact amount
   *          of compressed data
   * @param compressedSize
   *          compressed data size, header not included
   * @param uncompressedSize
   *          uncompressed data size, header not included
   * @param compressAlgo
   *          compression algorithm used
   * @throws IOException if the compressed stream cannot be read or decompressed
   */
  public static void decompress(byte[] dest, int destOffset,
      InputStream bufferedBoundedStream, int compressedSize,
      int uncompressedSize, Compression.Algorithm compressAlgo)
      throws IOException {

    if (dest.length - destOffset < uncompressedSize) {
      throw new IllegalArgumentException(
          "Output buffer does not have enough space to hold "
              + uncompressedSize + " decompressed bytes, available: "
              + (dest.length - destOffset));
    }

    Decompressor decompressor = null;
    try {
      decompressor = compressAlgo.getDecompressor();
      InputStream is = compressAlgo.createDecompressionStream(
          bufferedBoundedStream, decompressor, 0);
      // Close the stream even if the read fails, so it is not leaked.
      try {
        IOUtils.readFully(is, dest, destOffset, uncompressedSize);
      } finally {
        is.close();
      }
    } finally {
      if (decompressor != null) {
        compressAlgo.returnDecompressor(decompressor);
      }
    }
  }

}