001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with this
004 * work for additional information regarding copyright ownership. The ASF
005 * licenses this file to you under the Apache License, Version 2.0 (the
006 * "License"); you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
014 * License for the specific language governing permissions and limitations under
015 * the License.
016 */
017package org.apache.hadoop.hbase.io.compress;
018
019import java.io.BufferedInputStream;
020import java.io.BufferedOutputStream;
021import java.io.FilterOutputStream;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.OutputStream;
025
026import org.apache.hadoop.conf.Configurable;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.io.util.BlockIOUtils;
029import org.apache.hadoop.hbase.nio.ByteBuff;
030import org.apache.hadoop.io.compress.CodecPool;
031import org.apache.hadoop.io.compress.CompressionCodec;
032import org.apache.hadoop.io.compress.CompressionInputStream;
033import org.apache.hadoop.io.compress.CompressionOutputStream;
034import org.apache.hadoop.io.compress.Compressor;
035import org.apache.hadoop.io.compress.Decompressor;
036import org.apache.hadoop.io.compress.DefaultCodec;
037import org.apache.hadoop.io.compress.DoNotPool;
038import org.apache.hadoop.io.compress.GzipCodec;
039import org.apache.hadoop.util.ReflectionUtils;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
045 * Compression related stuff.
046 * Copied from hadoop-3315 tfile.
047 */
048@InterfaceAudience.Private
049public final class Compression {
050  private static final Logger LOG = LoggerFactory.getLogger(Compression.class);
051
052  /**
053   * Prevent the instantiation of class.
054   */
055  private Compression() {
056    super();
057  }
058
059  static class FinishOnFlushCompressionStream extends FilterOutputStream {
060    public FinishOnFlushCompressionStream(CompressionOutputStream cout) {
061      super(cout);
062    }
063
064    @Override
065    public void write(byte b[], int off, int len) throws IOException {
066      out.write(b, off, len);
067    }
068
069    @Override
070    public void flush() throws IOException {
071      CompressionOutputStream cout = (CompressionOutputStream) out;
072      cout.finish();
073      cout.flush();
074      cout.resetState();
075    }
076  }
077
078  /**
079   * Returns the classloader to load the Codec class from.
080   */
081  private static ClassLoader getClassLoaderForCodec() {
082    ClassLoader cl = Thread.currentThread().getContextClassLoader();
083    if (cl == null) {
084      cl = Compression.class.getClassLoader();
085    }
086    if (cl == null) {
087      cl = ClassLoader.getSystemClassLoader();
088    }
089    if (cl == null) {
090      throw new RuntimeException("A ClassLoader to load the Codec could not be determined");
091    }
092    return cl;
093  }
094
095  /**
096   * Compression algorithms. The ordinal of these cannot change or else you
097   * risk breaking all existing HFiles out there.  Even the ones that are
098   * not compressed! (They use the NONE algorithm)
099   */
100  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
101      value="SE_TRANSIENT_FIELD_NOT_RESTORED",
102      justification="We are not serializing so doesn't apply (not sure why transient though)")
103  @InterfaceAudience.Public
104  public static enum Algorithm {
105    LZO("lzo") {
106      // Use base type to avoid compile-time dependencies.
107      private volatile transient CompressionCodec lzoCodec;
108      private final transient Object lock = new Object();
109
110      @Override
111      CompressionCodec getCodec(Configuration conf) {
112        if (lzoCodec == null) {
113          synchronized (lock) {
114            if (lzoCodec == null) {
115              lzoCodec = buildCodec(conf);
116            }
117          }
118        }
119        return lzoCodec;
120      }
121
122      private CompressionCodec buildCodec(Configuration conf) {
123        try {
124          Class<?> externalCodec =
125              getClassLoaderForCodec().loadClass("com.hadoop.compression.lzo.LzoCodec");
126          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
127              new Configuration(conf));
128        } catch (ClassNotFoundException e) {
129          throw new RuntimeException(e);
130        }
131      }
132    },
133    GZ("gz") {
134      private volatile transient GzipCodec codec;
135      private final transient Object lock = new Object();
136
137      @Override
138      DefaultCodec getCodec(Configuration conf) {
139        if (codec == null) {
140          synchronized (lock) {
141            if (codec == null) {
142              codec = buildCodec(conf);
143            }
144          }
145        }
146
147        return codec;
148      }
149
150      private GzipCodec buildCodec(Configuration conf) {
151        GzipCodec gzcodec = new ReusableStreamGzipCodec();
152        gzcodec.setConf(new Configuration(conf));
153        return gzcodec;
154      }
155    },
156
157    NONE("none") {
158      @Override
159      DefaultCodec getCodec(Configuration conf) {
160        return null;
161      }
162
163      @Override
164      public synchronized InputStream createDecompressionStream(
165          InputStream downStream, Decompressor decompressor,
166          int downStreamBufferSize) throws IOException {
167        if (downStreamBufferSize > 0) {
168          return new BufferedInputStream(downStream, downStreamBufferSize);
169        }
170        return downStream;
171      }
172
173      @Override
174      public synchronized OutputStream createCompressionStream(
175          OutputStream downStream, Compressor compressor,
176          int downStreamBufferSize) throws IOException {
177        if (downStreamBufferSize > 0) {
178          return new BufferedOutputStream(downStream, downStreamBufferSize);
179        }
180
181        return downStream;
182      }
183    },
184    SNAPPY("snappy") {
185      // Use base type to avoid compile-time dependencies.
186      private volatile transient CompressionCodec snappyCodec;
187      private final transient Object lock = new Object();
188
189      @Override
190      CompressionCodec getCodec(Configuration conf) {
191        if (snappyCodec == null) {
192          synchronized (lock) {
193            if (snappyCodec == null) {
194              snappyCodec = buildCodec(conf);
195            }
196          }
197        }
198        return snappyCodec;
199      }
200
201      private CompressionCodec buildCodec(Configuration conf) {
202        try {
203          Class<?> externalCodec =
204              getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.SnappyCodec");
205          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
206        } catch (ClassNotFoundException e) {
207          throw new RuntimeException(e);
208        }
209      }
210    },
211    LZ4("lz4") {
212      // Use base type to avoid compile-time dependencies.
213      private volatile transient CompressionCodec lz4Codec;
214      private final transient Object lock = new Object();
215
216      @Override
217      CompressionCodec getCodec(Configuration conf) {
218        if (lz4Codec == null) {
219          synchronized (lock) {
220            if (lz4Codec == null) {
221              lz4Codec = buildCodec(conf);
222            }
223          }
224        }
225        return lz4Codec;
226      }
227
228      private CompressionCodec buildCodec(Configuration conf) {
229        try {
230          Class<?> externalCodec =
231              getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.Lz4Codec");
232          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
233        } catch (ClassNotFoundException e) {
234          throw new RuntimeException(e);
235        }
236      }
237    },
238    BZIP2("bzip2") {
239      // Use base type to avoid compile-time dependencies.
240      private volatile transient CompressionCodec bzipCodec;
241      private final transient Object lock = new Object();
242
243      @Override
244      CompressionCodec getCodec(Configuration conf) {
245        if (bzipCodec == null) {
246          synchronized (lock) {
247            if (bzipCodec == null) {
248              bzipCodec = buildCodec(conf);
249            }
250          }
251        }
252        return bzipCodec;
253      }
254
255      private CompressionCodec buildCodec(Configuration conf) {
256        try {
257          Class<?> externalCodec =
258              getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.BZip2Codec");
259          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
260        } catch (ClassNotFoundException e) {
261          throw new RuntimeException(e);
262        }
263      }
264    },
265    ZSTD("zstd") {
266      // Use base type to avoid compile-time dependencies.
267      private volatile transient CompressionCodec zStandardCodec;
268      private final transient Object lock = new Object();
269
270      @Override
271      CompressionCodec getCodec(Configuration conf) {
272        if (zStandardCodec == null) {
273          synchronized (lock) {
274            if (zStandardCodec == null) {
275              zStandardCodec = buildCodec(conf);
276            }
277          }
278        }
279        return zStandardCodec;
280      }
281
282      private CompressionCodec buildCodec(Configuration conf) {
283        try {
284          Class<?> externalCodec =
285              getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.ZStandardCodec");
286          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
287        } catch (ClassNotFoundException e) {
288          throw new RuntimeException(e);
289        }
290      }
291    };
292
293    private final Configuration conf;
294    private final String compressName;
295    /** data input buffer size to absorb small reads from application. */
296    private static final int DATA_IBUF_SIZE = 1 * 1024;
297    /** data output buffer size to absorb small writes from application. */
298    private static final int DATA_OBUF_SIZE = 4 * 1024;
299
300    Algorithm(String name) {
301      this.conf = new Configuration();
302      this.conf.setBoolean("io.native.lib.available", true);
303      this.compressName = name;
304    }
305
    /**
     * Returns the codec backing this algorithm. The constants in this enum that use a codec
     * build it on the first call and return the same instance afterwards; {@code NONE}
     * returns {@code null}.
     *
     * @param conf configuration used when the codec is built
     * @return the codec, or {@code null} if the algorithm has none
     */
    abstract CompressionCodec getCodec(Configuration conf);
307
308    public InputStream createDecompressionStream(
309        InputStream downStream, Decompressor decompressor,
310        int downStreamBufferSize) throws IOException {
311      CompressionCodec codec = getCodec(conf);
312      // Set the internal buffer size to read from down stream.
313      if (downStreamBufferSize > 0) {
314        ((Configurable)codec).getConf().setInt("io.file.buffer.size",
315            downStreamBufferSize);
316      }
317      CompressionInputStream cis =
318          codec.createInputStream(downStream, decompressor);
319      BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
320      return bis2;
321
322    }
323
324    public OutputStream createCompressionStream(
325        OutputStream downStream, Compressor compressor, int downStreamBufferSize)
326        throws IOException {
327      OutputStream bos1 = null;
328      if (downStreamBufferSize > 0) {
329        bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
330      }
331      else {
332        bos1 = downStream;
333      }
334      CompressionOutputStream cos =
335          createPlainCompressionStream(bos1, compressor);
336      BufferedOutputStream bos2 =
337          new BufferedOutputStream(new FinishOnFlushCompressionStream(cos),
338              DATA_OBUF_SIZE);
339      return bos2;
340    }
341
342    /**
343     * Creates a compression stream without any additional wrapping into
344     * buffering streams.
345     */
346    public CompressionOutputStream createPlainCompressionStream(
347        OutputStream downStream, Compressor compressor) throws IOException {
348      CompressionCodec codec = getCodec(conf);
349      ((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
350      return codec.createOutputStream(downStream, compressor);
351    }
352
353    public Compressor getCompressor() {
354      CompressionCodec codec = getCodec(conf);
355      if (codec != null) {
356        Compressor compressor = CodecPool.getCompressor(codec);
357        if (LOG.isTraceEnabled()) LOG.trace("Retrieved compressor " + compressor + " from pool.");
358        if (compressor != null) {
359          if (compressor.finished()) {
360            // Somebody returns the compressor to CodecPool but is still using it.
361            LOG.warn("Compressor obtained from CodecPool is already finished()");
362          }
363          compressor.reset();
364        }
365        return compressor;
366      }
367      return null;
368    }
369
370    public void returnCompressor(Compressor compressor) {
371      if (compressor != null) {
372        if (LOG.isTraceEnabled()) LOG.trace("Returning compressor " + compressor + " to pool.");
373        CodecPool.returnCompressor(compressor);
374      }
375    }
376
377    public Decompressor getDecompressor() {
378      CompressionCodec codec = getCodec(conf);
379      if (codec != null) {
380        Decompressor decompressor = CodecPool.getDecompressor(codec);
381        if (LOG.isTraceEnabled()) LOG.trace("Retrieved decompressor " + decompressor + " from pool.");
382        if (decompressor != null) {
383          if (decompressor.finished()) {
384            // Somebody returns the decompressor to CodecPool but is still using it.
385            LOG.warn("Deompressor obtained from CodecPool is already finished()");
386          }
387          decompressor.reset();
388        }
389        return decompressor;
390      }
391
392      return null;
393    }
394
395    public void returnDecompressor(Decompressor decompressor) {
396      if (decompressor != null) {
397        if (LOG.isTraceEnabled()) LOG.trace("Returning decompressor " + decompressor + " to pool.");
398        CodecPool.returnDecompressor(decompressor);
399        if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
400          if (LOG.isTraceEnabled()) LOG.trace("Ending decompressor " + decompressor);
401          decompressor.end();
402        }
403      }
404    }
405
406    public String getName() {
407      return compressName;
408    }
409  }
410
411  public static Algorithm getCompressionAlgorithmByName(String compressName) {
412    Algorithm[] algos = Algorithm.class.getEnumConstants();
413
414    for (Algorithm a : algos) {
415      if (a.getName().equals(compressName)) {
416        return a;
417      }
418    }
419
420    throw new IllegalArgumentException("Unsupported compression algorithm name: " + compressName);
421  }
422
423  /**
424   * Get names of supported compression algorithms.
425   *
426   * @return Array of strings, each represents a supported compression
427   * algorithm. Currently, the following compression algorithms are supported.
428   */
429  public static String[] getSupportedAlgorithms() {
430    Algorithm[] algos = Algorithm.class.getEnumConstants();
431
432    String[] ret = new String[algos.length];
433    int i = 0;
434    for (Algorithm a : algos) {
435      ret[i++] = a.getName();
436    }
437
438    return ret;
439  }
440
441  /**
442   * Decompresses data from the given stream using the configured compression algorithm. It will
443   * throw an exception if the dest buffer does not have enough space to hold the decompressed data.
444   * @param dest the output buffer
445   * @param bufferedBoundedStream a stream to read compressed data from, bounded to the exact amount
446   *          of compressed data
447   * @param uncompressedSize uncompressed data size, header not included
448   * @param compressAlgo compression algorithm used
449   * @throws IOException if any IO error happen
450   */
451  public static void decompress(ByteBuff dest, InputStream bufferedBoundedStream,
452      int uncompressedSize, Compression.Algorithm compressAlgo) throws IOException {
453    if (dest.remaining() < uncompressedSize) {
454      throw new IllegalArgumentException("Output buffer does not have enough space to hold "
455          + uncompressedSize + " decompressed bytes, available: " + dest.remaining());
456    }
457
458    Decompressor decompressor = null;
459    try {
460      decompressor = compressAlgo.getDecompressor();
461      try (InputStream is =
462          compressAlgo.createDecompressionStream(bufferedBoundedStream, decompressor, 0)) {
463        BlockIOUtils.readFullyWithHeapBuffer(is, dest, uncompressedSize);
464      }
465    } finally {
466      if (decompressor != null) {
467        compressAlgo.returnDecompressor(decompressor);
468      }
469    }
470  }
471}