/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.hadoop.hbase.io.compress;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.DoNotPool;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Compression-related utilities.
 * Copied from the TFile code contributed in HADOOP-3315.
 */
@InterfaceAudience.Private
public final class Compression {
  private static final Logger LOG = LoggerFactory.getLogger(Compression.class);

  /**
   * Prevents instantiation of this utility class.
   */
  private Compression() {
    super();
  }

  /**
   * A wrapper that finishes and resets the underlying
   * {@link CompressionOutputStream} on {@code flush()}, so that each flush
   * leaves the stream holding a complete, independently decompressable block.
   */
  static class FinishOnFlushCompressionStream extends FilterOutputStream {
    public FinishOnFlushCompressionStream(CompressionOutputStream cout) {
      super(cout);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      out.write(b, off, len);
    }

    @Override
    public void flush() throws IOException {
      CompressionOutputStream cout = (CompressionOutputStream) out;
      cout.finish();
      cout.flush();
      cout.resetState();
    }
  }
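
  // Illustrative sketch (not part of the original file): wrapping a codec's
  // output stream in FinishOnFlushCompressionStream means that after flush(),
  // everything written so far can be decompressed on its own. The `rawOut`,
  // `codec`, `compressor`, and `block` names below are assumed for the example:
  //
  //   CompressionOutputStream cos = codec.createOutputStream(rawOut, compressor);
  //   OutputStream os = new FinishOnFlushCompressionStream(cos);
  //   os.write(block);
  //   os.flush(); // finish + flush + resetState: the block is now self-contained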

  /**
   * Returns the classloader to load the Codec class from.
   */
  private static ClassLoader getClassLoaderForCodec() {
    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    if (cl == null) {
      cl = Compression.class.getClassLoader();
    }
    if (cl == null) {
      cl = ClassLoader.getSystemClassLoader();
    }
    if (cl == null) {
      throw new RuntimeException("A ClassLoader to load the Codec could not be determined");
    }
    return cl;
  }

  /**
   * Compression algorithms. The ordinal of these cannot change or else you
   * risk breaking all existing HFiles out there. Even the ones that are
   * not compressed! (They use the NONE algorithm.)
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value="SE_TRANSIENT_FIELD_NOT_RESTORED",
      justification="We are not serializing so doesn't apply (not sure why transient though)")
  @InterfaceAudience.Public
  public enum Algorithm {
    LZO("lzo") {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec lzoCodec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (lzoCodec == null) {
          synchronized (lock) {
            if (lzoCodec == null) {
              lzoCodec = buildCodec(conf);
            }
          }
        }
        return lzoCodec;
      }

      private CompressionCodec buildCodec(Configuration conf) {
        try {
          Class<?> externalCodec =
              getClassLoaderForCodec().loadClass("com.hadoop.compression.lzo.LzoCodec");
          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
              new Configuration(conf));
        } catch (ClassNotFoundException e) {
          throw new RuntimeException(e);
        }
      }
    },
    GZ("gz") {
      private volatile transient GzipCodec codec;
      private final transient Object lock = new Object();

      @Override
      DefaultCodec getCodec(Configuration conf) {
        if (codec == null) {
          synchronized (lock) {
            if (codec == null) {
              codec = buildCodec(conf);
            }
          }
        }
        return codec;
      }

      private GzipCodec buildCodec(Configuration conf) {
        GzipCodec gzcodec = new ReusableStreamGzipCodec();
        gzcodec.setConf(new Configuration(conf));
        return gzcodec;
      }
    },
    NONE("none") {
      @Override
      DefaultCodec getCodec(Configuration conf) {
        return null;
      }

      @Override
      public synchronized InputStream createDecompressionStream(
          InputStream downStream, Decompressor decompressor,
          int downStreamBufferSize) throws IOException {
        if (downStreamBufferSize > 0) {
          return new BufferedInputStream(downStream, downStreamBufferSize);
        }
        return downStream;
      }

      @Override
      public synchronized OutputStream createCompressionStream(
          OutputStream downStream, Compressor compressor,
          int downStreamBufferSize) throws IOException {
        if (downStreamBufferSize > 0) {
          return new BufferedOutputStream(downStream, downStreamBufferSize);
        }
        return downStream;
      }
    },
    SNAPPY("snappy") {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec snappyCodec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (snappyCodec == null) {
          synchronized (lock) {
            if (snappyCodec == null) {
              snappyCodec = buildCodec(conf);
            }
          }
        }
        return snappyCodec;
      }

      private CompressionCodec buildCodec(Configuration conf) {
        try {
          Class<?> externalCodec =
              getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.SnappyCodec");
          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
        } catch (ClassNotFoundException e) {
          throw new RuntimeException(e);
        }
      }
    },
    LZ4("lz4") {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec lz4Codec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (lz4Codec == null) {
          synchronized (lock) {
            if (lz4Codec == null) {
              lz4Codec = buildCodec(conf);
            }
          }
        }
        return lz4Codec;
      }

      private CompressionCodec buildCodec(Configuration conf) {
        try {
          Class<?> externalCodec =
              getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.Lz4Codec");
          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
        } catch (ClassNotFoundException e) {
          throw new RuntimeException(e);
        }
      }
    },
    BZIP2("bzip2") {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec bzipCodec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (bzipCodec == null) {
          synchronized (lock) {
            if (bzipCodec == null) {
              bzipCodec = buildCodec(conf);
            }
          }
        }
        return bzipCodec;
      }

      private CompressionCodec buildCodec(Configuration conf) {
        try {
          Class<?> externalCodec =
              getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.BZip2Codec");
          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
        } catch (ClassNotFoundException e) {
          throw new RuntimeException(e);
        }
      }
    },
    ZSTD("zstd") {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec zStandardCodec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (zStandardCodec == null) {
          synchronized (lock) {
            if (zStandardCodec == null) {
              zStandardCodec = buildCodec(conf);
            }
          }
        }
        return zStandardCodec;
      }

      private CompressionCodec buildCodec(Configuration conf) {
        try {
          Class<?> externalCodec =
              getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.ZStandardCodec");
          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
        } catch (ClassNotFoundException e) {
          throw new RuntimeException(e);
        }
      }
    };

    private final Configuration conf;
    private final String compressName;
    /** Data input buffer size to absorb small reads from the application. */
    private static final int DATA_IBUF_SIZE = 1 * 1024;
    /** Data output buffer size to absorb small writes from the application. */
    private static final int DATA_OBUF_SIZE = 4 * 1024;

    Algorithm(String name) {
      this.conf = new Configuration();
      this.conf.setBoolean("io.native.lib.available", true);
      this.compressName = name;
    }

    abstract CompressionCodec getCodec(Configuration conf);

    public InputStream createDecompressionStream(
        InputStream downStream, Decompressor decompressor,
        int downStreamBufferSize) throws IOException {
      CompressionCodec codec = getCodec(conf);
      // Set the internal buffer size used to read from the downstream.
      if (downStreamBufferSize > 0) {
        ((Configurable) codec).getConf().setInt("io.file.buffer.size",
            downStreamBufferSize);
      }
      CompressionInputStream cis =
          codec.createInputStream(downStream, decompressor);
      return new BufferedInputStream(cis, DATA_IBUF_SIZE);
    }

    public OutputStream createCompressionStream(
        OutputStream downStream, Compressor compressor, int downStreamBufferSize)
        throws IOException {
      OutputStream bos1;
      if (downStreamBufferSize > 0) {
        bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
      } else {
        bos1 = downStream;
      }
      CompressionOutputStream cos =
          createPlainCompressionStream(bos1, compressor);
      return new BufferedOutputStream(
          new FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE);
    }

    /**
     * Creates a compression stream without any additional wrapping into
     * buffering streams.
     */
    public CompressionOutputStream createPlainCompressionStream(
        OutputStream downStream, Compressor compressor) throws IOException {
      CompressionCodec codec = getCodec(conf);
      ((Configurable) codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
      return codec.createOutputStream(downStream, compressor);
    }

    public Compressor getCompressor() {
      CompressionCodec codec = getCodec(conf);
      if (codec != null) {
        Compressor compressor = CodecPool.getCompressor(codec);
        if (LOG.isTraceEnabled()) {
          LOG.trace("Retrieved compressor " + compressor + " from pool.");
        }
        if (compressor != null) {
          if (compressor.finished()) {
            // Somebody returned the compressor to the CodecPool but is still using it.
            LOG.warn("Compressor obtained from CodecPool is already finished()");
          }
          compressor.reset();
        }
        return compressor;
      }
      return null;
    }

    public void returnCompressor(Compressor compressor) {
      if (compressor != null) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Returning compressor " + compressor + " to pool.");
        }
        CodecPool.returnCompressor(compressor);
      }
    }

    public Decompressor getDecompressor() {
      CompressionCodec codec = getCodec(conf);
      if (codec != null) {
        Decompressor decompressor = CodecPool.getDecompressor(codec);
        if (LOG.isTraceEnabled()) {
          LOG.trace("Retrieved decompressor " + decompressor + " from pool.");
        }
        if (decompressor != null) {
          if (decompressor.finished()) {
            // Somebody returned the decompressor to the CodecPool but is still using it.
            LOG.warn("Decompressor obtained from CodecPool is already finished()");
          }
          decompressor.reset();
        }
        return decompressor;
      }
      return null;
    }

    public void returnDecompressor(Decompressor decompressor) {
      if (decompressor != null) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Returning decompressor " + decompressor + " to pool.");
        }
        CodecPool.returnDecompressor(decompressor);
        if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Ending decompressor " + decompressor);
          }
          decompressor.end();
        }
      }
    }

    public String getName() {
      return compressName;
    }
  }
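
  // Illustrative round-trip sketch (not part of the original file). Assumes a
  // writable `rawOut` stream, a readable `rawIn` stream over the compressed
  // bytes, and a `data` byte array:
  //
  //   Compression.Algorithm algo = Compression.Algorithm.GZ;
  //   Compressor compressor = algo.getCompressor();
  //   try {
  //     OutputStream os = algo.createCompressionStream(rawOut, compressor, 0);
  //     os.write(data);
  //     os.flush(); // FinishOnFlushCompressionStream completes the block
  //   } finally {
  //     algo.returnCompressor(compressor);
  //   }
  //
  //   Decompressor decompressor = algo.getDecompressor();
  //   try {
  //     InputStream is = algo.createDecompressionStream(rawIn, decompressor, 0);
  //     // ... read the uncompressed bytes from `is` ...
  //   } finally {
  //     algo.returnDecompressor(decompressor);
  //   }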

  /**
   * Returns the {@link Algorithm} whose short name (as returned by
   * {@link Algorithm#getName()}) matches {@code compressName}, or throws
   * {@link IllegalArgumentException} if the name is not recognized.
   */
  public static Algorithm getCompressionAlgorithmByName(String compressName) {
    Algorithm[] algos = Algorithm.class.getEnumConstants();

    for (Algorithm a : algos) {
      if (a.getName().equals(compressName)) {
        return a;
      }
    }

    throw new IllegalArgumentException("Unsupported compression algorithm name: " + compressName);
  }

  /**
   * Get the names of the supported compression algorithms.
   *
   * @return Array of strings, each representing a supported compression
   *         algorithm, as accepted by {@link #getCompressionAlgorithmByName(String)}.
   */
  public static String[] getSupportedAlgorithms() {
    Algorithm[] algos = Algorithm.class.getEnumConstants();

    String[] ret = new String[algos.length];
    int i = 0;
    for (Algorithm a : algos) {
      ret[i++] = a.getName();
    }

    return ret;
  }
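
  // Illustrative sketch (not part of the original file): the short names
  // returned here round-trip through getCompressionAlgorithmByName, e.g.:
  //
  //   for (String name : Compression.getSupportedAlgorithms()) {
  //     Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(name);
  //     System.out.println(name + " -> " + algo);
  //   }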

  /**
   * Decompresses data from the given stream using the configured compression
   * algorithm. It will throw an exception if the destination buffer does not
   * have enough space to hold the decompressed data.
   *
   * @param dest
   *          the output bytes buffer
   * @param destOffset
   *          start writing position of the output buffer
   * @param bufferedBoundedStream
   *          a stream to read compressed data from, bounded to the exact amount
   *          of compressed data
   * @param compressedSize
   *          compressed data size, header not included
   * @param uncompressedSize
   *          uncompressed data size, header not included
   * @param compressAlgo
   *          compression algorithm used
   * @throws IOException if reading from the stream or decompression fails
   */
  public static void decompress(byte[] dest, int destOffset,
      InputStream bufferedBoundedStream, int compressedSize,
      int uncompressedSize, Compression.Algorithm compressAlgo)
      throws IOException {

    if (dest.length - destOffset < uncompressedSize) {
      throw new IllegalArgumentException(
          "Output buffer does not have enough space to hold "
              + uncompressedSize + " decompressed bytes, available: "
              + (dest.length - destOffset));
    }

    Decompressor decompressor = null;
    try {
      decompressor = compressAlgo.getDecompressor();
      // Close the decompression stream even if readFully throws, so the
      // underlying stream is not leaked.
      try (InputStream is = compressAlgo.createDecompressionStream(
          bufferedBoundedStream, decompressor, 0)) {
        IOUtils.readFully(is, dest, destOffset, uncompressedSize);
      }
    } finally {
      if (decompressor != null) {
        compressAlgo.returnDecompressor(decompressor);
      }
    }
  }
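
  // Illustrative call sketch (not part of the original file). Assumes
  // `boundedIn` is an InputStream limited to exactly `compressedSize` bytes
  // of compressed data:
  //
  //   byte[] out = new byte[uncompressedSize];
  //   Compression.decompress(out, 0, boundedIn, compressedSize,
  //       uncompressedSize, Compression.Algorithm.GZ);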
}