View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations under
15   * the License.
16   */
17  package org.apache.hadoop.hbase.io.compress;
18  
19  import java.io.BufferedInputStream;
20  import java.io.BufferedOutputStream;
21  import java.io.FilterOutputStream;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.io.OutputStream;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.hbase.classification.InterfaceAudience;
29  import org.apache.hadoop.hbase.classification.InterfaceStability;
30  import org.apache.hadoop.conf.Configurable;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.io.IOUtils;
33  import org.apache.hadoop.io.compress.CodecPool;
34  import org.apache.hadoop.io.compress.CompressionCodec;
35  import org.apache.hadoop.io.compress.CompressionInputStream;
36  import org.apache.hadoop.io.compress.CompressionOutputStream;
37  import org.apache.hadoop.io.compress.Compressor;
38  import org.apache.hadoop.io.compress.Decompressor;
39  import org.apache.hadoop.io.compress.DefaultCodec;
40  import org.apache.hadoop.io.compress.DoNotPool;
41  import org.apache.hadoop.io.compress.GzipCodec;
42  import org.apache.hadoop.util.ReflectionUtils;
43  
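/**
 * Compression utilities: the supported compression algorithms plus helpers for
 * creating compression and decompression streams and for borrowing
 * compressors and decompressors from the codec pool.
 * Copied from hadoop-3315 tfile.
 */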
48  @InterfaceAudience.Private
49  public final class Compression {
50    static final Log LOG = LogFactory.getLog(Compression.class);
51  
52    /**
53     * Prevent the instantiation of class.
54     */
55    private Compression() {
56      super();
57    }
58  
59    static class FinishOnFlushCompressionStream extends FilterOutputStream {
60      public FinishOnFlushCompressionStream(CompressionOutputStream cout) {
61        super(cout);
62      }
63  
64      @Override
65      public void write(byte b[], int off, int len) throws IOException {
66        out.write(b, off, len);
67      }
68  
69      @Override
70      public void flush() throws IOException {
71        CompressionOutputStream cout = (CompressionOutputStream) out;
72        cout.finish();
73        cout.flush();
74        cout.resetState();
75      }
76    }
77  
78    /**
79     * Returns the classloader to load the Codec class from.
80     */
81    private static ClassLoader getClassLoaderForCodec() {
82      ClassLoader cl = Thread.currentThread().getContextClassLoader();
83      if (cl == null) {
84        cl = Compression.class.getClassLoader();
85      }
86      if (cl == null) {
87        cl = ClassLoader.getSystemClassLoader();
88      }
89      if (cl == null) {
90        throw new RuntimeException("A ClassLoader to load the Codec could not be determined");
91      }
92      return cl;
93    }
94  
95    /**
96     * Compression algorithms. The ordinal of these cannot change or else you
97     * risk breaking all existing HFiles out there.  Even the ones that are
98     * not compressed! (They use the NONE algorithm)
99     */
100   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
101       value="SE_TRANSIENT_FIELD_NOT_RESTORED",
102       justification="We are not serializing so doesn't apply (not sure why transient though)")
103   @InterfaceAudience.Public
104   @InterfaceStability.Evolving
105   public static enum Algorithm {
106     LZO("lzo") {
107       // Use base type to avoid compile-time dependencies.
108       private volatile transient CompressionCodec lzoCodec;
109       private transient Object lock = new Object();
110 
111       @Override
112       CompressionCodec getCodec(Configuration conf) {
113         if (lzoCodec == null) {
114           synchronized (lock) {
115             if (lzoCodec == null) {
116               lzoCodec = buildCodec(conf);
117             }
118           }
119         }
120         return lzoCodec;
121       }
122 
123       private CompressionCodec buildCodec(Configuration conf) {
124         try {
125           Class<?> externalCodec =
126               getClassLoaderForCodec().loadClass("com.hadoop.compression.lzo.LzoCodec");
127           return (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
128               new Configuration(conf));
129         } catch (ClassNotFoundException e) {
130           throw new RuntimeException(e);
131         }
132       }
133     },
134     GZ("gz") {
135       private volatile transient GzipCodec codec;
136       private transient Object lock = new Object();
137 
138       @Override
139       DefaultCodec getCodec(Configuration conf) {
140         if (codec == null) {
141           synchronized (lock) {
142             if (codec == null) {
143               codec = buildCodec(conf);
144             }
145           }
146         }
147 
148         return codec;
149       }
150 
151       private GzipCodec buildCodec(Configuration conf) {
152         GzipCodec gzcodec = new ReusableStreamGzipCodec();
153         gzcodec.setConf(new Configuration(conf));
154         return gzcodec;
155       }
156     },
157 
158     NONE("none") {
159       @Override
160       DefaultCodec getCodec(Configuration conf) {
161         return null;
162       }
163 
164       @Override
165       public synchronized InputStream createDecompressionStream(
166           InputStream downStream, Decompressor decompressor,
167           int downStreamBufferSize) throws IOException {
168         if (downStreamBufferSize > 0) {
169           return new BufferedInputStream(downStream, downStreamBufferSize);
170         }
171         return downStream;
172       }
173 
174       @Override
175       public synchronized OutputStream createCompressionStream(
176           OutputStream downStream, Compressor compressor,
177           int downStreamBufferSize) throws IOException {
178         if (downStreamBufferSize > 0) {
179           return new BufferedOutputStream(downStream, downStreamBufferSize);
180         }
181 
182         return downStream;
183       }
184     },
185     SNAPPY("snappy") {
186       // Use base type to avoid compile-time dependencies.
187       private volatile transient CompressionCodec snappyCodec;
188       private transient Object lock = new Object();
189 
190       @Override
191       CompressionCodec getCodec(Configuration conf) {
192         if (snappyCodec == null) {
193           synchronized (lock) {
194             if (snappyCodec == null) {
195               snappyCodec = buildCodec(conf);
196             }
197           }
198         }
199         return snappyCodec;
200       }
201 
202       private CompressionCodec buildCodec(Configuration conf) {
203         try {
204           Class<?> externalCodec =
205               getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.SnappyCodec");
206           return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
207         } catch (ClassNotFoundException e) {
208           throw new RuntimeException(e);
209         }
210       }
211     },
212     LZ4("lz4") {
213       // Use base type to avoid compile-time dependencies.
214       private volatile transient CompressionCodec lz4Codec;
215       private transient Object lock = new Object();
216 
217       @Override
218       CompressionCodec getCodec(Configuration conf) {
219         if (lz4Codec == null) {
220           synchronized (lock) {
221             if (lz4Codec == null) {
222               lz4Codec = buildCodec(conf);
223             }
224           }
225         }
226         return lz4Codec;
227       }
228 
229       private CompressionCodec buildCodec(Configuration conf) {
230         try {
231           Class<?> externalCodec =
232               getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.Lz4Codec");
233           return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
234         } catch (ClassNotFoundException e) {
235           throw new RuntimeException(e);
236         }
237       }
238   };
239 
240     private final Configuration conf;
241     private final String compressName;
242     /** data input buffer size to absorb small reads from application. */
243     private static final int DATA_IBUF_SIZE = 1 * 1024;
244     /** data output buffer size to absorb small writes from application. */
245     private static final int DATA_OBUF_SIZE = 4 * 1024;
246 
247     Algorithm(String name) {
248       this.conf = new Configuration();
249       this.conf.setBoolean("io.native.lib.available", true);
250       this.compressName = name;
251     }
252 
253     abstract CompressionCodec getCodec(Configuration conf);
254 
255     public InputStream createDecompressionStream(
256         InputStream downStream, Decompressor decompressor,
257         int downStreamBufferSize) throws IOException {
258       CompressionCodec codec = getCodec(conf);
259       // Set the internal buffer size to read from down stream.
260       if (downStreamBufferSize > 0) {
261         ((Configurable)codec).getConf().setInt("io.file.buffer.size",
262             downStreamBufferSize);
263       }
264       CompressionInputStream cis =
265           codec.createInputStream(downStream, decompressor);
266       BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
267       return bis2;
268 
269     }
270 
271     public OutputStream createCompressionStream(
272         OutputStream downStream, Compressor compressor, int downStreamBufferSize)
273         throws IOException {
274       OutputStream bos1 = null;
275       if (downStreamBufferSize > 0) {
276         bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
277       }
278       else {
279         bos1 = downStream;
280       }
281       CompressionOutputStream cos =
282           createPlainCompressionStream(bos1, compressor);
283       BufferedOutputStream bos2 =
284           new BufferedOutputStream(new FinishOnFlushCompressionStream(cos),
285               DATA_OBUF_SIZE);
286       return bos2;
287     }
288 
289     /**
290      * Creates a compression stream without any additional wrapping into
291      * buffering streams.
292      */
293     public CompressionOutputStream createPlainCompressionStream(
294         OutputStream downStream, Compressor compressor) throws IOException {
295       CompressionCodec codec = getCodec(conf);
296       ((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
297       return codec.createOutputStream(downStream, compressor);
298     }
299 
300     public Compressor getCompressor() {
301       CompressionCodec codec = getCodec(conf);
302       if (codec != null) {
303         Compressor compressor = CodecPool.getCompressor(codec);
304         if (LOG.isTraceEnabled()) LOG.trace("Retrieved compressor " + compressor + " from pool.");
305         if (compressor != null) {
306           if (compressor.finished()) {
307             // Somebody returns the compressor to CodecPool but is still using it.
308             LOG.warn("Compressor obtained from CodecPool is already finished()");
309           }
310           compressor.reset();
311         }
312         return compressor;
313       }
314       return null;
315     }
316 
317     public void returnCompressor(Compressor compressor) {
318       if (compressor != null) {
319         if (LOG.isTraceEnabled()) LOG.trace("Returning compressor " + compressor + " to pool.");
320         CodecPool.returnCompressor(compressor);
321       }
322     }
323 
324     public Decompressor getDecompressor() {
325       CompressionCodec codec = getCodec(conf);
326       if (codec != null) {
327         Decompressor decompressor = CodecPool.getDecompressor(codec);
328         if (LOG.isTraceEnabled()) LOG.trace("Retrieved decompressor " + decompressor + " from pool.");
329         if (decompressor != null) {
330           if (decompressor.finished()) {
331             // Somebody returns the decompressor to CodecPool but is still using it.
332             LOG.warn("Deompressor obtained from CodecPool is already finished()");
333           }
334           decompressor.reset();
335         }
336         return decompressor;
337       }
338 
339       return null;
340     }
341 
342     public void returnDecompressor(Decompressor decompressor) {
343       if (decompressor != null) {
344         if (LOG.isTraceEnabled()) LOG.trace("Returning decompressor " + decompressor + " to pool.");
345         CodecPool.returnDecompressor(decompressor);
346         if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
347           if (LOG.isTraceEnabled()) LOG.trace("Ending decompressor " + decompressor);
348           decompressor.end();
349         }
350       }
351     }
352 
353     public String getName() {
354       return compressName;
355     }
356   }
357 
358   public static Algorithm getCompressionAlgorithmByName(String compressName) {
359     Algorithm[] algos = Algorithm.class.getEnumConstants();
360 
361     for (Algorithm a : algos) {
362       if (a.getName().equals(compressName)) {
363         return a;
364       }
365     }
366 
367     throw new IllegalArgumentException("Unsupported compression algorithm name: " + compressName);
368   }
369 
370   /**
371    * Get names of supported compression algorithms.
372    *
373    * @return Array of strings, each represents a supported compression
374    * algorithm. Currently, the following compression algorithms are supported.
375    */
376   public static String[] getSupportedAlgorithms() {
377     Algorithm[] algos = Algorithm.class.getEnumConstants();
378 
379     String[] ret = new String[algos.length];
380     int i = 0;
381     for (Algorithm a : algos) {
382       ret[i++] = a.getName();
383     }
384 
385     return ret;
386   }
387 
388   /**
389    * Decompresses data from the given stream using the configured compression
390    * algorithm. It will throw an exception if the dest buffer does not have
391    * enough space to hold the decompressed data.
392    *
393    * @param dest
394    *          the output bytes buffer
395    * @param destOffset
396    *          start writing position of the output buffer
397    * @param bufferedBoundedStream
398    *          a stream to read compressed data from, bounded to the exact amount
399    *          of compressed data
400    * @param compressedSize
401    *          compressed data size, header not included
402    * @param uncompressedSize
403    *          uncompressed data size, header not included
404    * @param compressAlgo
405    *          compression algorithm used
406    * @throws IOException
407    */
408   public static void decompress(byte[] dest, int destOffset,
409       InputStream bufferedBoundedStream, int compressedSize,
410       int uncompressedSize, Compression.Algorithm compressAlgo)
411       throws IOException {
412 
413     if (dest.length - destOffset < uncompressedSize) {
414       throw new IllegalArgumentException(
415           "Output buffer does not have enough space to hold "
416               + uncompressedSize + " decompressed bytes, available: "
417               + (dest.length - destOffset));
418     }
419 
420     Decompressor decompressor = null;
421     try {
422       decompressor = compressAlgo.getDecompressor();
423       InputStream is = compressAlgo.createDecompressionStream(
424           bufferedBoundedStream, decompressor, 0);
425 
426       IOUtils.readFully(is, dest, destOffset, uncompressedSize);
427       is.close();
428     } finally {
429       if (decompressor != null) {
430         compressAlgo.returnDecompressor(decompressor);
431       }
432     }
433   }
434 }