1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  package org.apache.hadoop.hbase.io.compress;
18  
19  import java.io.BufferedInputStream;
20  import java.io.BufferedOutputStream;
21  import java.io.FilterOutputStream;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.io.OutputStream;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.hbase.classification.InterfaceAudience;
29  import org.apache.hadoop.hbase.classification.InterfaceStability;
30  import org.apache.hadoop.conf.Configurable;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.io.IOUtils;
33  import org.apache.hadoop.io.compress.CodecPool;
34  import org.apache.hadoop.io.compress.CompressionCodec;
35  import org.apache.hadoop.io.compress.CompressionInputStream;
36  import org.apache.hadoop.io.compress.CompressionOutputStream;
37  import org.apache.hadoop.io.compress.Compressor;
38  import org.apache.hadoop.io.compress.Decompressor;
39  import org.apache.hadoop.io.compress.DefaultCodec;
40  import org.apache.hadoop.io.compress.DoNotPool;
41  import org.apache.hadoop.io.compress.GzipCodec;
42  import org.apache.hadoop.util.ReflectionUtils;
43  
44  
45  
46  
47  
48  @InterfaceAudience.Private
49  public final class Compression {
50    private static final Log LOG = LogFactory.getLog(Compression.class);
51  
52    
53  
54  
55    private Compression() {
56      super();
57    }
58  
59    static class FinishOnFlushCompressionStream extends FilterOutputStream {
60      public FinishOnFlushCompressionStream(CompressionOutputStream cout) {
61        super(cout);
62      }
63  
64      @Override
65      public void write(byte b[], int off, int len) throws IOException {
66        out.write(b, off, len);
67      }
68  
69      @Override
70      public void flush() throws IOException {
71        CompressionOutputStream cout = (CompressionOutputStream) out;
72        cout.finish();
73        cout.flush();
74        cout.resetState();
75      }
76    }
77  
78    
79  
80  
81    private static ClassLoader getClassLoaderForCodec() {
82      ClassLoader cl = Thread.currentThread().getContextClassLoader();
83      if (cl == null) {
84        cl = Compression.class.getClassLoader();
85      }
86      if (cl == null) {
87        cl = ClassLoader.getSystemClassLoader();
88      }
89      if (cl == null) {
90        throw new RuntimeException("A ClassLoader to load the Codec could not be determined");
91      }
92      return cl;
93    }
94  
95    
96  
97  
98  
99  
100   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
101       value="SE_TRANSIENT_FIELD_NOT_RESTORED",
102       justification="We are not serializing so doesn't apply (not sure why transient though)")
103   @InterfaceAudience.Public
104   @InterfaceStability.Evolving
105   public static enum Algorithm {
106     LZO("lzo") {
107       
108       private volatile transient CompressionCodec lzoCodec;
109       private transient Object lock = new Object();
110 
111       @Override
112       CompressionCodec getCodec(Configuration conf) {
113         if (lzoCodec == null) {
114           synchronized (lock) {
115             if (lzoCodec == null) {
116               lzoCodec = buildCodec(conf);
117             }
118           }
119         }
120         return lzoCodec;
121       }
122 
123       private CompressionCodec buildCodec(Configuration conf) {
124         try {
125           Class<?> externalCodec =
126               getClassLoaderForCodec().loadClass("com.hadoop.compression.lzo.LzoCodec");
127           return (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
128               new Configuration(conf));
129         } catch (ClassNotFoundException e) {
130           throw new RuntimeException(e);
131         }
132       }
133     },
134     GZ("gz") {
135       private volatile transient GzipCodec codec;
136       private transient Object lock = new Object();
137 
138       @Override
139       DefaultCodec getCodec(Configuration conf) {
140         if (codec == null) {
141           synchronized (lock) {
142             if (codec == null) {
143               codec = buildCodec(conf);
144             }
145           }
146         }
147 
148         return codec;
149       }
150 
151       private GzipCodec buildCodec(Configuration conf) {
152         GzipCodec gzcodec = new ReusableStreamGzipCodec();
153         gzcodec.setConf(new Configuration(conf));
154         return gzcodec;
155       }
156     },
157 
158     NONE("none") {
159       @Override
160       DefaultCodec getCodec(Configuration conf) {
161         return null;
162       }
163 
164       @Override
165       public synchronized InputStream createDecompressionStream(
166           InputStream downStream, Decompressor decompressor,
167           int downStreamBufferSize) throws IOException {
168         if (downStreamBufferSize > 0) {
169           return new BufferedInputStream(downStream, downStreamBufferSize);
170         }
171         return downStream;
172       }
173 
174       @Override
175       public synchronized OutputStream createCompressionStream(
176           OutputStream downStream, Compressor compressor,
177           int downStreamBufferSize) throws IOException {
178         if (downStreamBufferSize > 0) {
179           return new BufferedOutputStream(downStream, downStreamBufferSize);
180         }
181 
182         return downStream;
183       }
184     },
185     SNAPPY("snappy") {
186       
187       private volatile transient CompressionCodec snappyCodec;
188       private transient Object lock = new Object();
189 
190       @Override
191       CompressionCodec getCodec(Configuration conf) {
192         if (snappyCodec == null) {
193           synchronized (lock) {
194             if (snappyCodec == null) {
195               snappyCodec = buildCodec(conf);
196             }
197           }
198         }
199         return snappyCodec;
200       }
201 
202       private CompressionCodec buildCodec(Configuration conf) {
203         try {
204           Class<?> externalCodec =
205               getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.SnappyCodec");
206           return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
207         } catch (ClassNotFoundException e) {
208           throw new RuntimeException(e);
209         }
210       }
211     },
212     LZ4("lz4") {
213       
214       private volatile transient CompressionCodec lz4Codec;
215       private transient Object lock = new Object();
216 
217       @Override
218       CompressionCodec getCodec(Configuration conf) {
219         if (lz4Codec == null) {
220           synchronized (lock) {
221             if (lz4Codec == null) {
222               lz4Codec = buildCodec(conf);
223             }
224           }
225         }
226         return lz4Codec;
227       }
228 
229       private CompressionCodec buildCodec(Configuration conf) {
230         try {
231           Class<?> externalCodec =
232               getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.Lz4Codec");
233           return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
234         } catch (ClassNotFoundException e) {
235           throw new RuntimeException(e);
236         }
237       }
238   },
239   ZSTD("zstd") {
240     
241     private volatile transient CompressionCodec zStandardCodec;
242     private transient Object lock = new Object();
243 
244     @Override
245     CompressionCodec getCodec(Configuration conf) {
246       if (zStandardCodec == null) {
247         synchronized (lock) {
248           if (zStandardCodec == null) {
249             zStandardCodec = buildCodec(conf);
250           }
251         }
252       }
253       return zStandardCodec;
254     }
255 
256     private CompressionCodec buildCodec(Configuration conf) {
257       try {
258         Class<?> externalCodec =
259             getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.ZStandardCodec");
260         return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
261       } catch (ClassNotFoundException e) {
262         throw new RuntimeException(e);
263       }
264     }
265   };
266 
267     private final transient Configuration conf; 
268     private final String compressName;
269     
270     private static final int DATA_IBUF_SIZE = 1 * 1024;
271     
272     private static final int DATA_OBUF_SIZE = 4 * 1024;
273 
274     Algorithm(String name) {
275       this.conf = new Configuration();
276       this.conf.setBoolean("io.native.lib.available", true);
277       this.compressName = name;
278     }
279 
280     abstract CompressionCodec getCodec(Configuration conf);
281 
282     public InputStream createDecompressionStream(
283         InputStream downStream, Decompressor decompressor,
284         int downStreamBufferSize) throws IOException {
285       CompressionCodec codec = getCodec(conf);
286       
287       if (downStreamBufferSize > 0) {
288         ((Configurable)codec).getConf().setInt("io.file.buffer.size",
289             downStreamBufferSize);
290       }
291       CompressionInputStream cis =
292           codec.createInputStream(downStream, decompressor);
293       BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
294       return bis2;
295 
296     }
297 
298     public OutputStream createCompressionStream(
299         OutputStream downStream, Compressor compressor, int downStreamBufferSize)
300         throws IOException {
301       OutputStream bos1 = null;
302       if (downStreamBufferSize > 0) {
303         bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
304       }
305       else {
306         bos1 = downStream;
307       }
308       CompressionOutputStream cos =
309           createPlainCompressionStream(bos1, compressor);
310       BufferedOutputStream bos2 =
311           new BufferedOutputStream(new FinishOnFlushCompressionStream(cos),
312               DATA_OBUF_SIZE);
313       return bos2;
314     }
315 
316     
317 
318 
319 
320     public CompressionOutputStream createPlainCompressionStream(
321         OutputStream downStream, Compressor compressor) throws IOException {
322       CompressionCodec codec = getCodec(conf);
323       ((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
324       return codec.createOutputStream(downStream, compressor);
325     }
326 
327     public Compressor getCompressor() {
328       CompressionCodec codec = getCodec(conf);
329       if (codec != null) {
330         Compressor compressor = CodecPool.getCompressor(codec);
331         if (LOG.isTraceEnabled()) LOG.trace("Retrieved compressor " + compressor + " from pool.");
332         if (compressor != null) {
333           if (compressor.finished()) {
334             
335             LOG.warn("Compressor obtained from CodecPool is already finished()");
336           }
337           compressor.reset();
338         }
339         return compressor;
340       }
341       return null;
342     }
343 
344     public void returnCompressor(Compressor compressor) {
345       if (compressor != null) {
346         if (LOG.isTraceEnabled()) LOG.trace("Returning compressor " + compressor + " to pool.");
347         CodecPool.returnCompressor(compressor);
348       }
349     }
350 
351     public Decompressor getDecompressor() {
352       CompressionCodec codec = getCodec(conf);
353       if (codec != null) {
354         Decompressor decompressor = CodecPool.getDecompressor(codec);
355         if (LOG.isTraceEnabled()) LOG.trace("Retrieved decompressor " + decompressor + " from pool.");
356         if (decompressor != null) {
357           if (decompressor.finished()) {
358             
359             LOG.warn("Deompressor obtained from CodecPool is already finished()");
360           }
361           decompressor.reset();
362         }
363         return decompressor;
364       }
365 
366       return null;
367     }
368 
369     public void returnDecompressor(Decompressor decompressor) {
370       if (decompressor != null) {
371         if (LOG.isTraceEnabled()) LOG.trace("Returning decompressor " + decompressor + " to pool.");
372         CodecPool.returnDecompressor(decompressor);
373         if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
374           if (LOG.isTraceEnabled()) LOG.trace("Ending decompressor " + decompressor);
375           decompressor.end();
376         }
377       }
378     }
379 
380     public String getName() {
381       return compressName;
382     }
383   }
384 
385   public static Algorithm getCompressionAlgorithmByName(String compressName) {
386     Algorithm[] algos = Algorithm.class.getEnumConstants();
387 
388     for (Algorithm a : algos) {
389       if (a.getName().equals(compressName)) {
390         return a;
391       }
392     }
393 
394     throw new IllegalArgumentException("Unsupported compression algorithm name: " + compressName);
395   }
396 
397   
398 
399 
400 
401 
402 
403   public static String[] getSupportedAlgorithms() {
404     Algorithm[] algos = Algorithm.class.getEnumConstants();
405 
406     String[] ret = new String[algos.length];
407     int i = 0;
408     for (Algorithm a : algos) {
409       ret[i++] = a.getName();
410     }
411 
412     return ret;
413   }
414 
415   
416 
417 
418 
419 
420 
421 
422 
423 
424 
425 
426 
427 
428 
429 
430 
431 
432 
433 
434 
435   public static void decompress(byte[] dest, int destOffset,
436       InputStream bufferedBoundedStream, int compressedSize,
437       int uncompressedSize, Compression.Algorithm compressAlgo)
438       throws IOException {
439 
440     if (dest.length - destOffset < uncompressedSize) {
441       throw new IllegalArgumentException(
442           "Output buffer does not have enough space to hold "
443               + uncompressedSize + " decompressed bytes, available: "
444               + (dest.length - destOffset));
445     }
446 
447     Decompressor decompressor = null;
448     try {
449       decompressor = compressAlgo.getDecompressor();
450       InputStream is = compressAlgo.createDecompressionStream(
451           bufferedBoundedStream, decompressor, 0);
452 
453       IOUtils.readFully(is, dest, destOffset, uncompressedSize);
454       is.close();
455     } finally {
456       if (decompressor != null) {
457         compressAlgo.returnDecompressor(decompressor);
458       }
459     }
460   }
461 }