/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.hadoop.hbase.io.compress;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.DoNotPool;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;
44  /**
45   * Compression related stuff.
46   * Copied from hadoop-3315 tfile.
47   */
48  @InterfaceAudience.Private
49  public final class Compression {
50    static final Log LOG = LogFactory.getLog(Compression.class);
51  
52    /**
53     * Prevent the instantiation of class.
54     */
55    private Compression() {
56      super();
57    }
58  
59    static class FinishOnFlushCompressionStream extends FilterOutputStream {
60      public FinishOnFlushCompressionStream(CompressionOutputStream cout) {
61        super(cout);
62      }
63  
64      @Override
65      public void write(byte b[], int off, int len) throws IOException {
66        out.write(b, off, len);
67      }
68  
69      @Override
70      public void flush() throws IOException {
71        CompressionOutputStream cout = (CompressionOutputStream) out;
72        cout.finish();
73        cout.flush();
74        cout.resetState();
75      }
76    }
77  
78    /**
79     * Returns the classloader to load the Codec class from.
80     */
81    private static ClassLoader getClassLoaderForCodec() {
82      ClassLoader cl = Thread.currentThread().getContextClassLoader();
83      if (cl == null) {
84        cl = Compression.class.getClassLoader();
85      }
86      if (cl == null) {
87        cl = ClassLoader.getSystemClassLoader();
88      }
89      if (cl == null) {
90        throw new RuntimeException("A ClassLoader to load the Codec could not be determined");
91      }
92      return cl;
93    }
94  
95    /**
96     * Compression algorithms. The ordinal of these cannot change or else you
97     * risk breaking all existing HFiles out there.  Even the ones that are
98     * not compressed! (They use the NONE algorithm)
99     */
100   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
101       value="SE_TRANSIENT_FIELD_NOT_RESTORED",
102       justification="We are not serializing so doesn't apply (not sure why transient though)")
103   @InterfaceAudience.Public
104   @InterfaceStability.Evolving
105   public static enum Algorithm {
106     LZO("lzo") {
107       // Use base type to avoid compile-time dependencies.
108       private volatile transient CompressionCodec lzoCodec;
109       private transient Object lock = new Object();
110 
111       @Override
112       CompressionCodec getCodec(Configuration conf) {
113         if (lzoCodec == null) {
114           synchronized (lock) {
115             if (lzoCodec == null) {
116               lzoCodec = buildCodec(conf);
117             }
118           }
119         }
120         return lzoCodec;
121       }
122 
123       private CompressionCodec buildCodec(Configuration conf) {
124         try {
125           Class<?> externalCodec =
126               getClassLoaderForCodec().loadClass("com.hadoop.compression.lzo.LzoCodec");
127           return (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
128               new Configuration(conf));
129         } catch (ClassNotFoundException e) {
130           throw new RuntimeException(e);
131         }
132       }
133     },
134     GZ("gz") {
135       private volatile transient GzipCodec codec;
136       private transient Object lock = new Object();
137 
138       @Override
139       DefaultCodec getCodec(Configuration conf) {
140         if (codec == null) {
141           synchronized (lock) {
142             if (codec == null) {
143               codec = buildCodec(conf);
144             }
145           }
146         }
147 
148         return codec;
149       }
150 
151       private GzipCodec buildCodec(Configuration conf) {
152         GzipCodec gzcodec = new ReusableStreamGzipCodec();
153         gzcodec.setConf(new Configuration(conf));
154         return gzcodec;
155       }
156     },
157 
158     NONE("none") {
159       @Override
160       DefaultCodec getCodec(Configuration conf) {
161         return null;
162       }
163 
164       @Override
165       public synchronized InputStream createDecompressionStream(
166           InputStream downStream, Decompressor decompressor,
167           int downStreamBufferSize) throws IOException {
168         if (downStreamBufferSize > 0) {
169           return new BufferedInputStream(downStream, downStreamBufferSize);
170         }
171         // else {
172           // Make sure we bypass FSInputChecker buffer.
173         // return new BufferedInputStream(downStream, 1024);
174         // }
175         // }
176         return downStream;
177       }
178 
179       @Override
180       public synchronized OutputStream createCompressionStream(
181           OutputStream downStream, Compressor compressor,
182           int downStreamBufferSize) throws IOException {
183         if (downStreamBufferSize > 0) {
184           return new BufferedOutputStream(downStream, downStreamBufferSize);
185         }
186 
187         return downStream;
188       }
189     },
190     SNAPPY("snappy") {
191       // Use base type to avoid compile-time dependencies.
192       private volatile transient CompressionCodec snappyCodec;
193       private transient Object lock = new Object();
194 
195       @Override
196       CompressionCodec getCodec(Configuration conf) {
197         if (snappyCodec == null) {
198           synchronized (lock) {
199             if (snappyCodec == null) {
200               snappyCodec = buildCodec(conf);
201             }
202           }
203         }
204         return snappyCodec;
205       }
206 
207       private CompressionCodec buildCodec(Configuration conf) {
208         try {
209           Class<?> externalCodec =
210               getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.SnappyCodec");
211           return (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
212               conf);
213         } catch (ClassNotFoundException e) {
214           throw new RuntimeException(e);
215         }
216       }
217     },
218     LZ4("lz4") {
219       // Use base type to avoid compile-time dependencies.
220       private volatile transient CompressionCodec lz4Codec;
221       private transient Object lock = new Object();
222 
223       @Override
224       CompressionCodec getCodec(Configuration conf) {
225         if (lz4Codec == null) {
226           synchronized (lock) {
227             if (lz4Codec == null) {
228               lz4Codec = buildCodec(conf);
229             }
230           }
231         }
232         return lz4Codec;
233       }
234 
235       private CompressionCodec buildCodec(Configuration conf) {
236         try {
237           Class<?> externalCodec =
238               getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.Lz4Codec");
239           return (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
240               conf);
241         } catch (ClassNotFoundException e) {
242           throw new RuntimeException(e);
243         }
244       }
245   };
246 
247     private final Configuration conf;
248     private final String compressName;
249   // data input buffer size to absorb small reads from application.
250     private static final int DATA_IBUF_SIZE = 1 * 1024;
251   // data output buffer size to absorb small writes from application.
252     private static final int DATA_OBUF_SIZE = 4 * 1024;
253 
254     Algorithm(String name) {
255       this.conf = new Configuration();
256       this.conf.setBoolean("io.native.lib.available", true);
257       this.compressName = name;
258     }
259 
260     abstract CompressionCodec getCodec(Configuration conf);
261 
262     public InputStream createDecompressionStream(
263         InputStream downStream, Decompressor decompressor,
264         int downStreamBufferSize) throws IOException {
265       CompressionCodec codec = getCodec(conf);
266       // Set the internal buffer size to read from down stream.
267       if (downStreamBufferSize > 0) {
268         ((Configurable)codec).getConf().setInt("io.file.buffer.size",
269             downStreamBufferSize);
270       }
271       CompressionInputStream cis =
272           codec.createInputStream(downStream, decompressor);
273       BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
274       return bis2;
275 
276     }
277 
278     public OutputStream createCompressionStream(
279         OutputStream downStream, Compressor compressor, int downStreamBufferSize)
280         throws IOException {
281       OutputStream bos1 = null;
282       if (downStreamBufferSize > 0) {
283         bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
284       }
285       else {
286         bos1 = downStream;
287       }
288       CompressionOutputStream cos =
289           createPlainCompressionStream(bos1, compressor);
290       BufferedOutputStream bos2 =
291           new BufferedOutputStream(new FinishOnFlushCompressionStream(cos),
292               DATA_OBUF_SIZE);
293       return bos2;
294     }
295 
296     /**
297      * Creates a compression stream without any additional wrapping into
298      * buffering streams.
299      */
300     public CompressionOutputStream createPlainCompressionStream(
301         OutputStream downStream, Compressor compressor) throws IOException {
302       CompressionCodec codec = getCodec(conf);
303       ((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
304       return codec.createOutputStream(downStream, compressor);
305     }
306 
307     public Compressor getCompressor() {
308       CompressionCodec codec = getCodec(conf);
309       if (codec != null) {
310         Compressor compressor = CodecPool.getCompressor(codec);
311         if (compressor != null) {
312           if (compressor.finished()) {
313             // Somebody returns the compressor to CodecPool but is still using
314             // it.
315             LOG
316                 .warn("Compressor obtained from CodecPool is already finished()");
317             // throw new AssertionError(
318             // "Compressor obtained from CodecPool is already finished()");
319           }
320           compressor.reset();
321         }
322         return compressor;
323       }
324       return null;
325     }
326 
327     public void returnCompressor(Compressor compressor) {
328       if (compressor != null) {
329         CodecPool.returnCompressor(compressor);
330       }
331     }
332 
333     public Decompressor getDecompressor() {
334       CompressionCodec codec = getCodec(conf);
335       if (codec != null) {
336         Decompressor decompressor = CodecPool.getDecompressor(codec);
337         if (decompressor != null) {
338           if (decompressor.finished()) {
339             // Somebody returns the decompressor to CodecPool but is still using
340             // it.
341             LOG
342                 .warn("Deompressor obtained from CodecPool is already finished()");
343             // throw new AssertionError(
344             // "Decompressor obtained from CodecPool is already finished()");
345           }
346           decompressor.reset();
347         }
348         return decompressor;
349       }
350 
351       return null;
352     }
353 
354     public void returnDecompressor(Decompressor decompressor) {
355       if (decompressor != null) {
356         CodecPool.returnDecompressor(decompressor);
357         if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
358           decompressor.end();
359         }
360       }
361     }
362 
363     public String getName() {
364       return compressName;
365     }
366   }
367 
368   public static Algorithm getCompressionAlgorithmByName(String compressName) {
369     Algorithm[] algos = Algorithm.class.getEnumConstants();
370 
371     for (Algorithm a : algos) {
372       if (a.getName().equals(compressName)) {
373         return a;
374       }
375     }
376 
377     throw new IllegalArgumentException(
378         "Unsupported compression algorithm name: " + compressName);
379   }
380 
381   /**
382    * Get names of supported compression algorithms.
383    *
384    * @return Array of strings, each represents a supported compression
385    * algorithm. Currently, the following compression algorithms are supported.
386    */
387   public static String[] getSupportedAlgorithms() {
388     Algorithm[] algos = Algorithm.class.getEnumConstants();
389 
390     String[] ret = new String[algos.length];
391     int i = 0;
392     for (Algorithm a : algos) {
393       ret[i++] = a.getName();
394     }
395 
396     return ret;
397   }
398 
399   /**
400    * Decompresses data from the given stream using the configured compression
401    * algorithm. It will throw an exception if the dest buffer does not have
402    * enough space to hold the decompressed data.
403    *
404    * @param dest
405    *          the output bytes buffer
406    * @param destOffset
407    *          start writing position of the output buffer
408    * @param bufferedBoundedStream
409    *          a stream to read compressed data from, bounded to the exact amount
410    *          of compressed data
411    * @param compressedSize
412    *          compressed data size, header not included
413    * @param uncompressedSize
414    *          uncompressed data size, header not included
415    * @param compressAlgo
416    *          compression algorithm used
417    * @throws IOException
418    */
419   public static void decompress(byte[] dest, int destOffset,
420       InputStream bufferedBoundedStream, int compressedSize,
421       int uncompressedSize, Compression.Algorithm compressAlgo)
422       throws IOException {
423 
424     if (dest.length - destOffset < uncompressedSize) {
425       throw new IllegalArgumentException(
426           "Output buffer does not have enough space to hold "
427               + uncompressedSize + " decompressed bytes, available: "
428               + (dest.length - destOffset));
429     }
430 
431     Decompressor decompressor = null;
432     try {
433       decompressor = compressAlgo.getDecompressor();
434       InputStream is = compressAlgo.createDecompressionStream(
435           bufferedBoundedStream, decompressor, 0);
436 
437       IOUtils.readFully(is, dest, destOffset, uncompressedSize);
438       is.close();
439     } finally {
440       if (decompressor != null) {
441         compressAlgo.returnDecompressor(decompressor);
442       }
443     }
444   }
445 
446 }