001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.compress;
019
020import java.io.BufferedInputStream;
021import java.io.BufferedOutputStream;
022import java.io.FilterOutputStream;
023import java.io.IOException;
024import java.io.InputStream;
025import java.io.OutputStream;
026import org.apache.hadoop.conf.Configurable;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HBaseConfiguration;
029import org.apache.hadoop.io.compress.CodecPool;
030import org.apache.hadoop.io.compress.CompressionCodec;
031import org.apache.hadoop.io.compress.CompressionInputStream;
032import org.apache.hadoop.io.compress.CompressionOutputStream;
033import org.apache.hadoop.io.compress.Compressor;
034import org.apache.hadoop.io.compress.Decompressor;
035import org.apache.hadoop.io.compress.DoNotPool;
036import org.apache.hadoop.util.ReflectionUtils;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * Compression related stuff. Copied from hadoop-3315 tfile.
043 */
044@InterfaceAudience.Private
045public final class Compression {
046  private static final Logger LOG = LoggerFactory.getLogger(Compression.class);
047
048  // LZO
049
050  public static final String LZO_CODEC_CLASS_KEY = "hbase.io.compress.lzo.codec";
051  public static final String LZO_CODEC_CLASS_DEFAULT = "com.hadoop.compression.lzo.LzoCodec";
052
053  // GZ
054
055  public static final String GZ_CODEC_CLASS_KEY = "hbase.io.compress.gz.codec";
056  // Our ReusableStreamGzipCodec fixes an inefficiency in Hadoop's Gzip codec, allowing us to
057  // reuse compression streams, but still requires the Hadoop native codec.
058  public static final String GZ_CODEC_CLASS_DEFAULT =
059    "org.apache.hadoop.hbase.io.compress.ReusableStreamGzipCodec";
060
061  // SNAPPY
062
063  public static final String SNAPPY_CODEC_CLASS_KEY = "hbase.io.compress.snappy.codec";
064  public static final String SNAPPY_CODEC_CLASS_DEFAULT =
065    "org.apache.hadoop.io.compress.SnappyCodec";
066
067  // LZ4
068
069  public static final String LZ4_CODEC_CLASS_KEY = "hbase.io.compress.lz4.codec";
070  public static final String LZ4_CODEC_CLASS_DEFAULT = "org.apache.hadoop.io.compress.Lz4Codec";
071
072  // ZSTD
073
074  public static final String ZSTD_CODEC_CLASS_KEY = "hbase.io.compress.zstd.codec";
075  public static final String ZSTD_CODEC_CLASS_DEFAULT =
076    "org.apache.hadoop.io.compress.ZStandardCodec";
077
078  // BZIP2
079
080  public static final String BZIP2_CODEC_CLASS_KEY = "hbase.io.compress.bzip2.codec";
081  public static final String BZIP2_CODEC_CLASS_DEFAULT = "org.apache.hadoop.io.compress.BZip2Codec";
082
083  // LZMA
084
085  public static final String LZMA_CODEC_CLASS_KEY = "hbase.io.compress.lzma.codec";
086  public static final String LZMA_CODEC_CLASS_DEFAULT =
087    "org.apache.hadoop.hbase.io.compress.xz.LzmaCodec";
088
089  // Brotli
090
091  public static final String BROTLI_CODEC_CLASS_KEY = "hbase.io.compress.brotli.codec";
092  public static final String BROTLI_CODEC_CLASS_DEFAULT =
093    "org.apache.hadoop.hbase.io.compress.brotli.BrotliCodec";
094
095  /**
096   * Prevent the instantiation of class.
097   */
098  private Compression() {
099    super();
100  }
101
102  static class FinishOnFlushCompressionStream extends FilterOutputStream {
103    public FinishOnFlushCompressionStream(CompressionOutputStream cout) {
104      super(cout);
105    }
106
107    @Override
108    public void write(byte b[], int off, int len) throws IOException {
109      out.write(b, off, len);
110    }
111
112    @Override
113    public void flush() throws IOException {
114      CompressionOutputStream cout = (CompressionOutputStream) out;
115      cout.finish();
116      cout.flush();
117      cout.resetState();
118    }
119  }
120
121  /**
122   * Returns the classloader to load the Codec class from.
123   */
124  private static ClassLoader getClassLoaderForCodec() {
125    ClassLoader cl = Thread.currentThread().getContextClassLoader();
126    if (cl == null) {
127      cl = Compression.class.getClassLoader();
128    }
129    if (cl == null) {
130      cl = ClassLoader.getSystemClassLoader();
131    }
132    if (cl == null) {
133      throw new RuntimeException("A ClassLoader to load the Codec could not be determined");
134    }
135    return cl;
136  }
137
138  /**
139   * Compression algorithms. The ordinal of these cannot change or else you risk breaking all
140   * existing HFiles out there. Even the ones that are not compressed! (They use the NONE algorithm)
141   */
142  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "SE_TRANSIENT_FIELD_NOT_RESTORED",
143      justification = "We are not serializing so doesn't apply (not sure why transient though)")
144  @SuppressWarnings("ImmutableEnumChecker")
145  @InterfaceAudience.Public
146  public static enum Algorithm {
147    // LZO is GPL and requires extra install to setup. See
148    // https://stackoverflow.com/questions/23441142/class-com-hadoop-compression-lzo-lzocodec-not-found-for-spark-on-cdh-5
149    LZO("lzo", LZO_CODEC_CLASS_KEY, LZO_CODEC_CLASS_DEFAULT) {
150      // Use base type to avoid compile-time dependencies.
151      private volatile transient CompressionCodec lzoCodec;
152      private final transient Object lock = new Object();
153
154      @Override
155      CompressionCodec getCodec(Configuration conf) {
156        if (lzoCodec == null) {
157          synchronized (lock) {
158            if (lzoCodec == null) {
159              lzoCodec = buildCodec(conf, this);
160            }
161          }
162        }
163        return lzoCodec;
164      }
165
166      @Override
167      public CompressionCodec reload(Configuration conf) {
168        synchronized (lock) {
169          lzoCodec = buildCodec(conf, this);
170          LOG.warn("Reloaded configuration for {}", name());
171          return lzoCodec;
172        }
173      }
174    },
175
176    GZ("gz", GZ_CODEC_CLASS_KEY, GZ_CODEC_CLASS_DEFAULT) {
177      private volatile transient CompressionCodec gzCodec;
178      private final transient Object lock = new Object();
179
180      @Override
181      CompressionCodec getCodec(Configuration conf) {
182        if (gzCodec == null) {
183          synchronized (lock) {
184            if (gzCodec == null) {
185              gzCodec = buildCodec(conf, this);
186            }
187          }
188        }
189        return gzCodec;
190      }
191
192      @Override
193      public CompressionCodec reload(Configuration conf) {
194        synchronized (lock) {
195          gzCodec = buildCodec(conf, this);
196          LOG.warn("Reloaded configuration for {}", name());
197          return gzCodec;
198        }
199      }
200    },
201
202    NONE("none", "", "") {
203      @Override
204      CompressionCodec getCodec(Configuration conf) {
205        return null;
206      }
207
208      @Override
209      public CompressionCodec reload(Configuration conf) {
210        return null;
211      }
212
213      @Override
214      public synchronized InputStream createDecompressionStream(InputStream downStream,
215        Decompressor decompressor, int downStreamBufferSize) throws IOException {
216        if (downStreamBufferSize > 0) {
217          return new BufferedInputStream(downStream, downStreamBufferSize);
218        }
219        return downStream;
220      }
221
222      @Override
223      public synchronized OutputStream createCompressionStream(OutputStream downStream,
224        Compressor compressor, int downStreamBufferSize) throws IOException {
225        if (downStreamBufferSize > 0) {
226          return new BufferedOutputStream(downStream, downStreamBufferSize);
227        }
228
229        return downStream;
230      }
231    },
232    SNAPPY("snappy", SNAPPY_CODEC_CLASS_KEY, SNAPPY_CODEC_CLASS_DEFAULT) {
233      // Use base type to avoid compile-time dependencies.
234      private volatile transient CompressionCodec snappyCodec;
235      private final transient Object lock = new Object();
236
237      @Override
238      CompressionCodec getCodec(Configuration conf) {
239        if (snappyCodec == null) {
240          synchronized (lock) {
241            if (snappyCodec == null) {
242              snappyCodec = buildCodec(conf, this);
243            }
244          }
245        }
246        return snappyCodec;
247      }
248
249      @Override
250      public CompressionCodec reload(Configuration conf) {
251        synchronized (lock) {
252          snappyCodec = buildCodec(conf, this);
253          LOG.warn("Reloaded configuration for {}", name());
254          return snappyCodec;
255        }
256      }
257    },
258    LZ4("lz4", LZ4_CODEC_CLASS_KEY, LZ4_CODEC_CLASS_DEFAULT) {
259      // Use base type to avoid compile-time dependencies.
260      private volatile transient CompressionCodec lz4Codec;
261      private final transient Object lock = new Object();
262
263      @Override
264      CompressionCodec getCodec(Configuration conf) {
265        if (lz4Codec == null) {
266          synchronized (lock) {
267            if (lz4Codec == null) {
268              lz4Codec = buildCodec(conf, this);
269            }
270          }
271        }
272        return lz4Codec;
273      }
274
275      @Override
276      public CompressionCodec reload(Configuration conf) {
277        synchronized (lock) {
278          lz4Codec = buildCodec(conf, this);
279          LOG.warn("Reloaded configuration for {}", name());
280          return lz4Codec;
281        }
282      }
283    },
284    BZIP2("bzip2", BZIP2_CODEC_CLASS_KEY, BZIP2_CODEC_CLASS_DEFAULT) {
285      // Use base type to avoid compile-time dependencies.
286      private volatile transient CompressionCodec bzipCodec;
287      private final transient Object lock = new Object();
288
289      @Override
290      CompressionCodec getCodec(Configuration conf) {
291        if (bzipCodec == null) {
292          synchronized (lock) {
293            if (bzipCodec == null) {
294              bzipCodec = buildCodec(conf, this);
295            }
296          }
297        }
298        return bzipCodec;
299      }
300
301      @Override
302      public CompressionCodec reload(Configuration conf) {
303        synchronized (lock) {
304          bzipCodec = buildCodec(conf, this);
305          LOG.warn("Reloaded configuration for {}", name());
306          return bzipCodec;
307        }
308      }
309    },
310    ZSTD("zstd", ZSTD_CODEC_CLASS_KEY, ZSTD_CODEC_CLASS_DEFAULT) {
311      // Use base type to avoid compile-time dependencies.
312      private volatile transient CompressionCodec zStandardCodec;
313      private final transient Object lock = new Object();
314
315      @Override
316      CompressionCodec getCodec(Configuration conf) {
317        if (zStandardCodec == null) {
318          synchronized (lock) {
319            if (zStandardCodec == null) {
320              zStandardCodec = buildCodec(conf, this);
321            }
322          }
323        }
324        return zStandardCodec;
325      }
326
327      @Override
328      public CompressionCodec reload(Configuration conf) {
329        synchronized (lock) {
330          zStandardCodec = buildCodec(conf, this);
331          LOG.warn("Reloaded configuration for {}", name());
332          return zStandardCodec;
333        }
334      }
335    },
336    LZMA("lzma", LZMA_CODEC_CLASS_KEY, LZMA_CODEC_CLASS_DEFAULT) {
337      // Use base type to avoid compile-time dependencies.
338      private volatile transient CompressionCodec lzmaCodec;
339      private final transient Object lock = new Object();
340
341      @Override
342      CompressionCodec getCodec(Configuration conf) {
343        if (lzmaCodec == null) {
344          synchronized (lock) {
345            if (lzmaCodec == null) {
346              lzmaCodec = buildCodec(conf, this);
347            }
348          }
349        }
350        return lzmaCodec;
351      }
352
353      @Override
354      public CompressionCodec reload(Configuration conf) {
355        synchronized (lock) {
356          lzmaCodec = buildCodec(conf, this);
357          LOG.warn("Reloaded configuration for {}", name());
358          return lzmaCodec;
359        }
360      }
361    },
362
363    BROTLI("brotli", BROTLI_CODEC_CLASS_KEY, BROTLI_CODEC_CLASS_DEFAULT) {
364      // Use base type to avoid compile-time dependencies.
365      private volatile transient CompressionCodec brotliCodec;
366      private final transient Object lock = new Object();
367
368      @Override
369      CompressionCodec getCodec(Configuration conf) {
370        if (brotliCodec == null) {
371          synchronized (lock) {
372            if (brotliCodec == null) {
373              brotliCodec = buildCodec(conf, this);
374            }
375          }
376        }
377        return brotliCodec;
378      }
379
380      @Override
381      public CompressionCodec reload(Configuration conf) {
382        synchronized (lock) {
383          brotliCodec = buildCodec(conf, this);
384          LOG.warn("Reloaded configuration for {}", name());
385          return brotliCodec;
386        }
387      }
388    };
389
    /**
     * Configuration created by the constructor for this constant; it is the one handed to
     * {@link #getCodec(Configuration)} by the stream and (de)compressor accessors of this enum.
     */
    private final Configuration conf;
    /** Algorithm name as returned by {@link #getName()}, e.g. "gz" or "none". */
    private final String compressName;
    /** Configuration key naming the codec implementation class; empty for NONE. */
    private final String confKey;
    /** Codec class name used when {@link #confKey} is not set; empty for NONE. */
    private final String confDefault;
    /** data input buffer size to absorb small reads from application. */
    private static final int DATA_IBUF_SIZE = 1 * 1024;
    /** data output buffer size to absorb small writes from application. */
    private static final int DATA_OBUF_SIZE = 4 * 1024;
398
399    Algorithm(String name, String confKey, String confDefault) {
400      this.conf = HBaseConfiguration.create();
401      this.conf.setBoolean("io.native.lib.available", true);
402      this.compressName = name;
403      this.confKey = confKey;
404      this.confDefault = confDefault;
405    }
406
    /**
     * Returns the codec backing this algorithm.
     * <p>
     * The constants declared in this enum that have a codec build it from {@code conf} on the
     * first call and cache it; later calls return the cached instance without consulting
     * {@code conf} again. {@code NONE} has no codec and returns {@code null}.
     * @param conf configuration used to build the codec if it has not been built yet
     * @return the codec for this algorithm, or {@code null} if the algorithm has none
     */
    abstract CompressionCodec getCodec(Configuration conf);
408
    /**
     * Reload configuration for the given algorithm.
     * <p>
     * NOTE: Experts only. This can only be done safely during process startup, before the
     * algorithm's codecs are in use. If the codec implementation is changed, the new implementation
     * may not be fully compatible with what was loaded at static initialization time, leading to
     * potential data corruption. Mostly used by unit tests.
     * @param conf configuration
     * @return the codec newly built from {@code conf}, which replaces the previously cached one;
     *         {@code null} for {@code NONE}
     */
    public abstract CompressionCodec reload(Configuration conf);
419
420    public InputStream createDecompressionStream(InputStream downStream, Decompressor decompressor,
421      int downStreamBufferSize) throws IOException {
422      CompressionCodec codec = getCodec(conf);
423      // Set the internal buffer size to read from down stream.
424      if (downStreamBufferSize > 0) {
425        ((Configurable) codec).getConf().setInt("io.file.buffer.size", downStreamBufferSize);
426      }
427      CompressionInputStream cis = codec.createInputStream(downStream, decompressor);
428      BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
429      return bis2;
430
431    }
432
433    public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor,
434      int downStreamBufferSize) throws IOException {
435      OutputStream bos1 = null;
436      if (downStreamBufferSize > 0) {
437        bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
438      } else {
439        bos1 = downStream;
440      }
441      CompressionOutputStream cos = createPlainCompressionStream(bos1, compressor);
442      BufferedOutputStream bos2 =
443        new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE);
444      return bos2;
445    }
446
447    /**
448     * Creates a compression stream without any additional wrapping into buffering streams.
449     */
450    public CompressionOutputStream createPlainCompressionStream(OutputStream downStream,
451      Compressor compressor) throws IOException {
452      CompressionCodec codec = getCodec(conf);
453      ((Configurable) codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
454      return codec.createOutputStream(downStream, compressor);
455    }
456
457    public Compressor getCompressor() {
458      CompressionCodec codec = getCodec(conf);
459      if (codec != null) {
460        Compressor compressor = CodecPool.getCompressor(codec);
461        if (LOG.isTraceEnabled()) LOG.trace("Retrieved compressor " + compressor + " from pool.");
462        if (compressor != null) {
463          if (compressor.finished()) {
464            // Somebody returns the compressor to CodecPool but is still using it.
465            LOG.warn("Compressor obtained from CodecPool is already finished()");
466          }
467          compressor.reset();
468        }
469        return compressor;
470      }
471      return null;
472    }
473
474    public void returnCompressor(Compressor compressor) {
475      if (compressor != null) {
476        if (LOG.isTraceEnabled()) LOG.trace("Returning compressor " + compressor + " to pool.");
477        CodecPool.returnCompressor(compressor);
478      }
479    }
480
481    public Decompressor getDecompressor() {
482      CompressionCodec codec = getCodec(conf);
483      if (codec != null) {
484        Decompressor decompressor = CodecPool.getDecompressor(codec);
485        if (LOG.isTraceEnabled())
486          LOG.trace("Retrieved decompressor " + decompressor + " from pool.");
487        if (decompressor != null) {
488          if (decompressor.finished()) {
489            // Somebody returns the decompressor to CodecPool but is still using it.
490            LOG.warn("Decompressor {} obtained from CodecPool is already finished", decompressor);
491          }
492          decompressor.reset();
493        }
494        return decompressor;
495      }
496
497      return null;
498    }
499
500    public void returnDecompressor(Decompressor decompressor) {
501      if (decompressor != null) {
502        if (LOG.isTraceEnabled()) LOG.trace("Returning decompressor " + decompressor + " to pool.");
503        CodecPool.returnDecompressor(decompressor);
504        if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
505          if (LOG.isTraceEnabled()) LOG.trace("Ending decompressor " + decompressor);
506          decompressor.end();
507        }
508      }
509    }
510
511    public String getName() {
512      return compressName;
513    }
514  }
515
516  public static Algorithm getCompressionAlgorithmByName(String compressName) {
517    Algorithm[] algos = Algorithm.class.getEnumConstants();
518
519    for (Algorithm a : algos) {
520      if (a.getName().equals(compressName)) {
521        return a;
522      }
523    }
524
525    throw new IllegalArgumentException("Unsupported compression algorithm name: " + compressName);
526  }
527
528  /**
529   * Get names of supported compression algorithms.
530   * @return Array of strings, each represents a supported compression algorithm. Currently, the
531   *         following compression algorithms are supported.
532   */
533  public static String[] getSupportedAlgorithms() {
534    Algorithm[] algos = Algorithm.class.getEnumConstants();
535
536    String[] ret = new String[algos.length];
537    int i = 0;
538    for (Algorithm a : algos) {
539      ret[i++] = a.getName();
540    }
541
542    return ret;
543  }
544
545  /**
546   * Load a codec implementation for an algorithm using the supplied configuration.
547   * @param conf the configuration to use
548   * @param algo the algorithm to implement
549   */
550  private static CompressionCodec buildCodec(final Configuration conf, final Algorithm algo) {
551    try {
552      String codecClassName = conf.get(algo.confKey, algo.confDefault);
553      if (codecClassName == null) {
554        throw new RuntimeException("No codec configured for " + algo.confKey);
555      }
556      Class<?> codecClass = getClassLoaderForCodec().loadClass(codecClassName);
557      CompressionCodec codec =
558        (CompressionCodec) ReflectionUtils.newInstance(codecClass, new Configuration(conf));
559      LOG.info("Loaded codec {} for compression algorithm {}", codec.getClass().getCanonicalName(),
560        algo.name());
561      return codec;
562    } catch (ClassNotFoundException e) {
563      throw new RuntimeException(e);
564    }
565  }
566
567  public static void main(String[] args) throws Exception {
568    Configuration conf = HBaseConfiguration.create();
569    java.util.Map<String, CompressionCodec> implMap = new java.util.HashMap<>();
570    for (Algorithm algo : Algorithm.class.getEnumConstants()) {
571      try {
572        implMap.put(algo.name(), algo.getCodec(conf));
573      } catch (Exception e) {
574        // Ignore failures to load codec native implementations while building the report.
575        // We are to report what is configured.
576      }
577    }
578    for (Algorithm algo : Algorithm.class.getEnumConstants()) {
579      System.out.println(algo.name() + ":");
580      System.out.println("    name: " + algo.getName());
581      System.out.println("    confKey: " + algo.confKey);
582      System.out.println("    confDefault: " + algo.confDefault);
583      CompressionCodec codec = implMap.get(algo.name());
584      System.out.println(
585        "    implClass: " + (codec != null ? codec.getClass().getCanonicalName() : "<none>"));
586    }
587  }
588
589}