001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.compress.aircompressor;
019
020import io.airlift.compress.zstd.ZstdCompressor;
021import io.airlift.compress.zstd.ZstdDecompressor;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.OutputStream;
025import org.apache.hadoop.conf.Configurable;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.CommonConfigurationKeys;
028import org.apache.hadoop.hbase.io.compress.CompressionUtil;
029import org.apache.hadoop.io.compress.BlockCompressorStream;
030import org.apache.hadoop.io.compress.BlockDecompressorStream;
031import org.apache.hadoop.io.compress.CompressionCodec;
032import org.apache.hadoop.io.compress.CompressionInputStream;
033import org.apache.hadoop.io.compress.CompressionOutputStream;
034import org.apache.hadoop.io.compress.Compressor;
035import org.apache.hadoop.io.compress.Decompressor;
036import org.apache.yetus.audience.InterfaceAudience;
037
038/**
039 * Hadoop codec implementation for Zstandard, implemented with aircompressor.
040 * <p>
041 * Unlike the other codecs this one should be considered as under development and unstable (as in
042 * changing), reflecting the status of aircompressor's zstandard implementation.
043 * <p>
044 * NOTE: This codec is NOT data format compatible with the Hadoop native zstandard codec. There are
045 * issues with both framing and limitations of the aircompressor zstandard compressor. This codec
046 * can be used as an alternative to the native codec, if the native codec cannot be made available
047 * and/or an eventual migration will never be necessary (i.e. this codec's performance meets
048 * anticipated requirements). Once you begin using this alternative you will be locked into it.
049 */
050@InterfaceAudience.Private
051public class ZstdCodec implements Configurable, CompressionCodec {
052
053  public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize";
054  public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024;
055
056  private Configuration conf;
057
058  public ZstdCodec() {
059    conf = new Configuration();
060  }
061
062  @Override
063  public void setConf(Configuration conf) {
064    this.conf = conf;
065  }
066
067  @Override
068  public Configuration getConf() {
069    return conf;
070  }
071
072  @Override
073  public Compressor createCompressor() {
074    return new HadoopZstdCompressor();
075  }
076
077  @Override
078  public Decompressor createDecompressor() {
079    return new HadoopZstdDecompressor();
080  }
081
082  @Override
083  public CompressionInputStream createInputStream(InputStream in) throws IOException {
084    return createInputStream(in, createDecompressor());
085  }
086
087  @Override
088  public CompressionInputStream createInputStream(InputStream in, Decompressor d)
089    throws IOException {
090    return new BlockDecompressorStream(in, d, getBufferSize(conf));
091  }
092
093  @Override
094  public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
095    return createOutputStream(out, createCompressor());
096  }
097
098  @Override
099  public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
100    throws IOException {
101    int bufferSize = getBufferSize(conf);
102    return new BlockCompressorStream(out, c, bufferSize,
103      CompressionUtil.compressionOverhead(bufferSize));
104  }
105
106  @Override
107  public Class<? extends Compressor> getCompressorType() {
108    return HadoopZstdCompressor.class;
109  }
110
111  @Override
112  public Class<? extends Decompressor> getDecompressorType() {
113    return HadoopZstdDecompressor.class;
114  }
115
116  @Override
117  public String getDefaultExtension() {
118    return ".zst";
119  }
120
121  @InterfaceAudience.Private
122  public class HadoopZstdCompressor extends HadoopCompressor<ZstdCompressor> {
123
124    HadoopZstdCompressor(ZstdCompressor compressor) {
125      super(compressor, ZstdCodec.getBufferSize(conf));
126    }
127
128    HadoopZstdCompressor() {
129      this(new ZstdCompressor());
130    }
131
132    @Override
133    int getLevel(Configuration conf) {
134      return 0;
135    }
136
137    @Override
138    int getBufferSize(Configuration conf) {
139      return ZstdCodec.getBufferSize(conf);
140    }
141
142  }
143
144  @InterfaceAudience.Private
145  public class HadoopZstdDecompressor extends HadoopDecompressor<ZstdDecompressor> {
146
147    HadoopZstdDecompressor(ZstdDecompressor decompressor) {
148      super(decompressor, getBufferSize(conf));
149    }
150
151    HadoopZstdDecompressor() {
152      this(new ZstdDecompressor());
153    }
154
155  }
156
157  // Package private
158
159  static int getBufferSize(Configuration conf) {
160    return conf.getInt(ZSTD_BUFFER_SIZE_KEY,
161      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY,
162        // IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT is 0! We can't allow that.
163        ZSTD_BUFFER_SIZE_DEFAULT));
164  }
165
166}