/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.hadoop.hbase.io.compress;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.DoNotPool;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Compression-related utilities: the supported compression algorithms and
 * helpers for obtaining codecs, compressors, and (de)compression streams.
 * Copied from hadoop-3315 tfile.
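 *
 * <p>An illustrative usage sketch (not part of the original source; the names
 * {@code raw} and {@code data} are placeholders):
 * <pre>
 * Compression.Algorithm algo = Compression.getCompressionAlgorithmByName("gz");
 * Compressor compressor = algo.getCompressor();  // null only for NONE
 * try {
 *   OutputStream os = algo.createCompressionStream(raw, compressor, 0);
 *   os.write(data);
 *   os.flush();  // finishes the current compressed block
 * } finally {
 *   algo.returnCompressor(compressor);  // return to the codec pool
 * }
 * </pre>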
 */
@InterfaceAudience.Private
public final class Compression {
  private static final Log LOG = LogFactory.getLog(Compression.class);

  /**
   * Prevent instantiation of this utility class.
   */
  private Compression() {
    super();
  }

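  /**
   * Wraps a CompressionOutputStream so that flush() finishes the current
   * compressed block, flushes it downstream, and resets the codec state,
   * leaving the stream ready for further output.
   */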
  static class FinishOnFlushCompressionStream extends FilterOutputStream {
    public FinishOnFlushCompressionStream(CompressionOutputStream cout) {
      super(cout);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      out.write(b, off, len);
    }

    @Override
    public void flush() throws IOException {
      CompressionOutputStream cout = (CompressionOutputStream) out;
      cout.finish();
      cout.flush();
      cout.resetState();
    }
  }

  /**
   * Returns the classloader to load the Codec class from: the current thread's
   * context classloader if set, falling back to the classloader that loaded
   * this class, then to the system classloader.
   */
  private static ClassLoader getClassLoaderForCodec() {
    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    if (cl == null) {
      cl = Compression.class.getClassLoader();
    }
    if (cl == null) {
      cl = ClassLoader.getSystemClassLoader();
    }
    if (cl == null) {
      throw new RuntimeException("A ClassLoader to load the Codec could not be determined");
    }
    return cl;
  }

  /**
   * Compression algorithms. The ordinal of these cannot change or else you
   * risk breaking all existing HFiles out there. Even the ones that are
   * not compressed! (They use the NONE algorithm)
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value="SE_TRANSIENT_FIELD_NOT_RESTORED",
      justification="We are not serializing so doesn't apply (not sure why transient though)")
  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public static enum Algorithm {
    LZO("lzo") {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec lzoCodec;
      private transient Object lock = new Object();

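      // The codec is created lazily with double-checked locking: the volatile
      // field makes the built codec safely visible to other threads, and the
      // per-constant lock ensures it is constructed only once.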
      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (lzoCodec == null) {
          synchronized (lock) {
            if (lzoCodec == null) {
              lzoCodec = buildCodec(conf);
            }
          }
        }
        return lzoCodec;
      }

      private CompressionCodec buildCodec(Configuration conf) {
        try {
          Class<?> externalCodec =
              getClassLoaderForCodec().loadClass("com.hadoop.compression.lzo.LzoCodec");
          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
              new Configuration(conf));
        } catch (ClassNotFoundException e) {
          throw new RuntimeException(e);
        }
      }
    },
    GZ("gz") {
      private volatile transient GzipCodec codec;
      private transient Object lock = new Object();

      @Override
      DefaultCodec getCodec(Configuration conf) {
        if (codec == null) {
          synchronized (lock) {
            if (codec == null) {
              codec = buildCodec(conf);
            }
          }
        }
        return codec;
      }

      private GzipCodec buildCodec(Configuration conf) {
        GzipCodec gzcodec = new ReusableStreamGzipCodec();
        gzcodec.setConf(new Configuration(conf));
        return gzcodec;
      }
    },

    NONE("none") {
      @Override
      DefaultCodec getCodec(Configuration conf) {
        return null;
      }

      @Override
      public synchronized InputStream createDecompressionStream(
          InputStream downStream, Decompressor decompressor,
          int downStreamBufferSize) throws IOException {
        if (downStreamBufferSize > 0) {
          return new BufferedInputStream(downStream, downStreamBufferSize);
        }
        return downStream;
      }

      @Override
      public synchronized OutputStream createCompressionStream(
          OutputStream downStream, Compressor compressor,
          int downStreamBufferSize) throws IOException {
        if (downStreamBufferSize > 0) {
          return new BufferedOutputStream(downStream, downStreamBufferSize);
        }
        return downStream;
      }
    },
    SNAPPY("snappy") {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec snappyCodec;
      private transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (snappyCodec == null) {
          synchronized (lock) {
            if (snappyCodec == null) {
              snappyCodec = buildCodec(conf);
            }
          }
        }
        return snappyCodec;
      }

      private CompressionCodec buildCodec(Configuration conf) {
        try {
          Class<?> externalCodec =
              getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.SnappyCodec");
          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
        } catch (ClassNotFoundException e) {
          throw new RuntimeException(e);
        }
      }
    },
    LZ4("lz4") {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec lz4Codec;
      private transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (lz4Codec == null) {
          synchronized (lock) {
            if (lz4Codec == null) {
              lz4Codec = buildCodec(conf);
            }
          }
        }
        return lz4Codec;
      }

      private CompressionCodec buildCodec(Configuration conf) {
        try {
          Class<?> externalCodec =
              getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.Lz4Codec");
          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
        } catch (ClassNotFoundException e) {
          throw new RuntimeException(e);
        }
      }
    },
    ZSTD("zstd") {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec zStandardCodec;
      private transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (zStandardCodec == null) {
          synchronized (lock) {
            if (zStandardCodec == null) {
              zStandardCodec = buildCodec(conf);
            }
          }
        }
        return zStandardCodec;
      }

      private CompressionCodec buildCodec(Configuration conf) {
        try {
          Class<?> externalCodec =
              getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.ZStandardCodec");
          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
        } catch (ClassNotFoundException e) {
          throw new RuntimeException(e);
        }
      }
    };

    private final transient Configuration conf; // FindBugs: SE_BAD_FIELD so just made it transient
    private final String compressName;
    /** data input buffer size to absorb small reads from application. */
    private static final int DATA_IBUF_SIZE = 1 * 1024;
    /** data output buffer size to absorb small writes from application. */
    private static final int DATA_OBUF_SIZE = 4 * 1024;

    Algorithm(String name) {
      this.conf = new Configuration();
      this.conf.setBoolean("io.native.lib.available", true);
      this.compressName = name;
    }

    abstract CompressionCodec getCodec(Configuration conf);

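    /**
     * Creates a decompression stream over {@code downStream} using this
     * algorithm's codec and the given {@code decompressor}. A positive
     * {@code downStreamBufferSize} is used as the codec's read buffer size;
     * the result is additionally buffered with {@link #DATA_IBUF_SIZE} to
     * absorb small reads.
     */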
    public InputStream createDecompressionStream(
        InputStream downStream, Decompressor decompressor,
        int downStreamBufferSize) throws IOException {
      CompressionCodec codec = getCodec(conf);
      // Set the internal buffer size to read from down stream.
      if (downStreamBufferSize > 0) {
        ((Configurable) codec).getConf().setInt("io.file.buffer.size",
            downStreamBufferSize);
      }
      CompressionInputStream cis =
          codec.createInputStream(downStream, decompressor);
      return new BufferedInputStream(cis, DATA_IBUF_SIZE);
    }

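    /**
     * Creates a compression stream over {@code downStream}: the downstream is
     * optionally buffered with {@code downStreamBufferSize}, wrapped in the
     * codec's compression stream, then in a
     * {@link FinishOnFlushCompressionStream} and an output buffer of
     * {@link #DATA_OBUF_SIZE} to absorb small writes.
     */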
    public OutputStream createCompressionStream(
        OutputStream downStream, Compressor compressor, int downStreamBufferSize)
        throws IOException {
      OutputStream bos1;
      if (downStreamBufferSize > 0) {
        bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
      } else {
        bos1 = downStream;
      }
      CompressionOutputStream cos =
          createPlainCompressionStream(bos1, compressor);
      return new BufferedOutputStream(
          new FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE);
    }

    /**
     * Creates a compression stream without any additional wrapping into
     * buffering streams.
     */
    public CompressionOutputStream createPlainCompressionStream(
        OutputStream downStream, Compressor compressor) throws IOException {
      CompressionCodec codec = getCodec(conf);
      ((Configurable) codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
      return codec.createOutputStream(downStream, compressor);
    }

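    /**
     * Borrows a {@link Compressor} for this algorithm's codec from the
     * {@link CodecPool} and resets it before use. Returns {@code null} for
     * {@link #NONE}, whose codec is {@code null}.
     */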
    public Compressor getCompressor() {
      CompressionCodec codec = getCodec(conf);
      if (codec != null) {
        Compressor compressor = CodecPool.getCompressor(codec);
        if (LOG.isTraceEnabled()) LOG.trace("Retrieved compressor " + compressor + " from pool.");
        if (compressor != null) {
          if (compressor.finished()) {
            // Somebody returned the compressor to the pool while still using it.
            LOG.warn("Compressor obtained from CodecPool is already finished()");
          }
          compressor.reset();
        }
        return compressor;
      }
      return null;
    }

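    /** Returns a borrowed {@link Compressor} to the {@link CodecPool}. */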
    public void returnCompressor(Compressor compressor) {
      if (compressor != null) {
        if (LOG.isTraceEnabled()) LOG.trace("Returning compressor " + compressor + " to pool.");
        CodecPool.returnCompressor(compressor);
      }
    }

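    /**
     * Borrows a {@link Decompressor} for this algorithm's codec from the
     * {@link CodecPool} and resets it before use. Returns {@code null} for
     * {@link #NONE}.
     */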
    public Decompressor getDecompressor() {
      CompressionCodec codec = getCodec(conf);
      if (codec != null) {
        Decompressor decompressor = CodecPool.getDecompressor(codec);
        if (LOG.isTraceEnabled()) LOG.trace("Retrieved decompressor " + decompressor + " from pool.");
        if (decompressor != null) {
          if (decompressor.finished()) {
            // Somebody returned the decompressor to the pool while still using it.
            LOG.warn("Decompressor obtained from CodecPool is already finished()");
          }
          decompressor.reset();
        }
        return decompressor;
      }
      return null;
    }

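    /**
     * Returns a borrowed {@link Decompressor} to the {@link CodecPool}. If the
     * decompressor is annotated with {@link DoNotPool}, it is also ended,
     * since the pool will not reuse it.
     */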
    public void returnDecompressor(Decompressor decompressor) {
      if (decompressor != null) {
        if (LOG.isTraceEnabled()) LOG.trace("Returning decompressor " + decompressor + " to pool.");
        CodecPool.returnDecompressor(decompressor);
        if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
          if (LOG.isTraceEnabled()) LOG.trace("Ending decompressor " + decompressor);
          decompressor.end();
        }
      }
    }

    public String getName() {
      return compressName;
    }
  }

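  /**
   * Returns the {@link Algorithm} whose name matches {@code compressName},
   * or throws {@link IllegalArgumentException} if none matches.
   */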
  public static Algorithm getCompressionAlgorithmByName(String compressName) {
    for (Algorithm a : Algorithm.values()) {
      if (a.getName().equals(compressName)) {
        return a;
      }
    }
    throw new IllegalArgumentException("Unsupported compression algorithm name: " + compressName);
  }

  /**
   * Get names of supported compression algorithms.
   *
   * @return Array of strings, each representing a supported compression
   * algorithm: the name of each {@link Algorithm} constant, in ordinal order.
   */
  public static String[] getSupportedAlgorithms() {
    Algorithm[] algos = Algorithm.values();
    String[] ret = new String[algos.length];
    int i = 0;
    for (Algorithm a : algos) {
      ret[i++] = a.getName();
    }
    return ret;
  }

  /**
   * Decompresses data from the given stream using the configured compression
   * algorithm. It will throw an exception if the dest buffer does not have
   * enough space to hold the decompressed data.
   *
   * @param dest
   *          the output bytes buffer
   * @param destOffset
   *          start writing position of the output buffer
   * @param bufferedBoundedStream
   *          a stream to read compressed data from, bounded to the exact amount
   *          of compressed data
   * @param compressedSize
   *          compressed data size, header not included
   * @param uncompressedSize
   *          uncompressed data size, header not included
   * @param compressAlgo
   *          compression algorithm used
   * @throws IOException
   */
  public static void decompress(byte[] dest, int destOffset,
      InputStream bufferedBoundedStream, int compressedSize,
      int uncompressedSize, Compression.Algorithm compressAlgo)
      throws IOException {

    if (dest.length - destOffset < uncompressedSize) {
      throw new IllegalArgumentException(
          "Output buffer does not have enough space to hold "
              + uncompressedSize + " decompressed bytes, available: "
              + (dest.length - destOffset));
    }

    Decompressor decompressor = null;
    try {
      decompressor = compressAlgo.getDecompressor();
      InputStream is = compressAlgo.createDecompressionStream(
          bufferedBoundedStream, decompressor, 0);
      IOUtils.readFully(is, dest, destOffset, uncompressedSize);
      is.close();
    } finally {
      if (decompressor != null) {
        compressAlgo.returnDecompressor(decompressor);
      }
    }
  }
}